Repository: nifi
Updated Branches:
  refs/heads/master f120952ab -> a68f87f96

NIFI-1665 This closes #296. fixed GetKafka to reset consumer in case of timeout

NIFI-1665 polishing

Signed-off-by: joewitt <[email protected]>

Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a68f87f9
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a68f87f9
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a68f87f9

Branch: refs/heads/master
Commit: a68f87f96efdfb3c13d2032410aae38857f51183
Parents: f120952
Author: Oleg Zhurakousky <[email protected]>
Authored: Tue Mar 22 11:19:39 2016 -0400
Committer: joewitt <[email protected]>
Committed: Tue Mar 22 21:48:47 2016 -0600

----------------------------------------------------------------------
 .../main/java/org/apache/nifi/processors/kafka/GetKafka.java | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/nifi/blob/a68f87f9/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
index 7057dff..e06befb 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
@@ -323,9 +323,6 @@ public class GetKafka extends AbstractProcessor {
     @OnScheduled
     public void schedule(ProcessContext context) {
         this.deadlockTimeout = context.getProperty(KAFKA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS) * 2;
-        if (this.executor == null || this.executor.isShutdown()) {
-            this.executor = Executors.newCachedThreadPool();
-        }
     }
 
     @Override
@@ -335,6 +332,9 @@ public class GetKafka extends AbstractProcessor {
          * of onTrigger. Will be reset to 'false' in the event of exception
          */
         synchronized (this.consumerStreamsReady) {
+            if (this.executor == null || this.executor.isShutdown()) {
+                this.executor = Executors.newCachedThreadPool();
+            }
             if (!this.consumerStreamsReady.get()) {
                 Future<Void> f = this.executor.submit(new Callable<Void>() {
                     @Override
