Mark Payne created NIFI-1665:
--------------------------------
Summary: GetKafka continually throws NullPointerException if it
ever fails to write out message
Key: NIFI-1665
URL: https://issues.apache.org/jira/browse/NIFI-1665
Project: Apache NiFi
Issue Type: Bug
Reporter: Mark Payne
If an Exception is thrown in GetKafka's consumeFromKafka method, it enters the
following block:
{code}
catch (final Exception e) {
this.shutdownConsumer();
getLogger().error("Failed to receive FlowFile from Kafka due to
{}", new Object[]{e});
if (flowFile != null) {
session.remove(flowFile);
}
{code}
This call to shutdownConsumer performs the following:
{code}
if (this.executor != null) {
this.executor.shutdown();
try {
if (!this.executor.awaitTermination(30000,
TimeUnit.MILLISECONDS)) {
this.executor.shutdownNow();
getLogger().warn("Executor did not stop in 30 sec.
Terminated.");
}
this.executor = null;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
{code}
Now that this.executor is set to null, the onTrigger method will continually
throw NullPointerException because it attempts to call executor.submit:
{code}
synchronized (this.consumerStreamsReady) {
if (!this.consumerStreamsReady.get()) {
Future<Void> f = this.executor.submit(new Callable<Void>() {
...
{code}
and also
{code}
if (this.consumerStreamsReady.get()) {
Future<Void> consumptionFuture = this.executor.submit(new
Callable<Void>() {
@Override
public Void call() throws Exception {
...
{code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)