Repository: nifi
Updated Branches:
  refs/heads/master f120952ab -> a68f87f96


NIFI-1665 This closes #296. fixed GetKafka to reset consumer in case of timeout

NIFI-1665 polishing

Signed-off-by: joewitt <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a68f87f9
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a68f87f9
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a68f87f9

Branch: refs/heads/master
Commit: a68f87f96efdfb3c13d2032410aae38857f51183
Parents: f120952
Author: Oleg Zhurakousky <[email protected]>
Authored: Tue Mar 22 11:19:39 2016 -0400
Committer: joewitt <[email protected]>
Committed: Tue Mar 22 21:48:47 2016 -0600

----------------------------------------------------------------------
 .../main/java/org/apache/nifi/processors/kafka/GetKafka.java   | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/a68f87f9/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
index 7057dff..e06befb 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
@@ -323,9 +323,6 @@ public class GetKafka extends AbstractProcessor {
     @OnScheduled
     public void schedule(ProcessContext context) {
         this.deadlockTimeout = context.getProperty(KAFKA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS) * 2;
-        if (this.executor == null || this.executor.isShutdown()) {
-            this.executor = Executors.newCachedThreadPool();
-        }
     }
 
     @Override
@@ -335,6 +332,9 @@ public class GetKafka extends AbstractProcessor {
          * of onTrigger. Will be reset to 'false' in the event of exception
          */
         synchronized (this.consumerStreamsReady) {
+            if (this.executor == null || this.executor.isShutdown()) {
+                this.executor = Executors.newCachedThreadPool();
+            }
             if (!this.consumerStreamsReady.get()) {
                 Future<Void> f = this.executor.submit(new Callable<Void>() {
                     @Override

Reply via email to