Mark Payne created NIFI-1665:
--------------------------------

             Summary: GetKafka continually throws NullPointerException if it 
ever fails to write out message
                 Key: NIFI-1665
                 URL: https://issues.apache.org/jira/browse/NIFI-1665
             Project: Apache NiFi
          Issue Type: Bug
            Reporter: Mark Payne


If an Exception is thrown in GetKafka's consumeFromKafka method, it enters the 
following block:

{code}
        catch (final Exception e) {
            this.shutdownConsumer();
            getLogger().error("Failed to receive FlowFile from Kafka due to 
{}", new Object[]{e});
            if (flowFile != null) {
                session.remove(flowFile);
            }
{code}

This call to shutdownConsumer performs the following:

{code}
if (this.executor != null) {
            this.executor.shutdown();
            try {
                if (!this.executor.awaitTermination(30000, 
TimeUnit.MILLISECONDS)) {
                    this.executor.shutdownNow();
                    getLogger().warn("Executor did not stop in 30 sec. 
Terminated.");
                }
                this.executor = null;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
{code}

Now that this.executor is set to null, the onTrigger method will continually 
throw NullPointerException because it attempts to call executor.submit:

{code}
        synchronized (this.consumerStreamsReady) {
            if (!this.consumerStreamsReady.get()) {
                Future<Void> f = this.executor.submit(new Callable<Void>() {
...
{code}

and also

{code}
        if (this.consumerStreamsReady.get()) {
            Future<Void> consumptionFuture = this.executor.submit(new 
Callable<Void>() {
                @Override
                public Void call() throws Exception {
...
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to