Repository: nifi
Updated Branches:
  refs/heads/master 68cfc8c61 -> 148b4497b


NIFI-1629 This closes #282. downgraded Kafka back to 0.8 - added context.yield 
to PutKafka - added lifecycle hooks to defend from Kafka deadlocks

NIFI-1629 changed thread pool implementation in Get/PutKafka

Signed-off-by: joewitt <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/148b4497
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/148b4497
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/148b4497

Branch: refs/heads/master
Commit: 148b4497b4a31f292e9b5b2fba3ee19555bf808b
Parents: 68cfc8c
Author: Oleg Zhurakousky <[email protected]>
Authored: Wed Mar 16 09:55:08 2016 -0400
Committer: joewitt <[email protected]>
Committed: Wed Mar 16 15:38:46 2016 -0400

----------------------------------------------------------------------
 .../nifi-kafka-processors/pom.xml               |  4 +-
 .../apache/nifi/processors/kafka/GetKafka.java  | 82 +++++++++++++++++++-
 .../nifi/processors/kafka/KafkaUtils.java       |  3 +-
 .../apache/nifi/processors/kafka/PutKafka.java  | 52 ++++++++++++-
 .../nifi/processors/kafka/TestGetKafka.java     | 15 +++-
 .../nifi/processors/kafka/TestPutKafka.java     | 12 ---
 6 files changed, 143 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/148b4497/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/pom.xml 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/pom.xml
index 908cb00..cbabc1c 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/pom.xml
@@ -37,12 +37,12 @@
         <dependency>
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clients</artifactId>
-                   <version>0.9.0.0</version>
+                   <version>0.8.2.2</version>
                </dependency>
         <dependency>
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka_2.10</artifactId>
-            <version>0.9.0.0</version>
+            <version>0.8.2.2</version>
             <exclusions>
                 <!-- Transitive dependencies excluded because they are located 
                 in a legacy Maven repository, which Maven 3 doesn't support. 
-->

http://git-wip-us.apache.org/repos/asf/nifi/blob/148b4497/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
index 5e7a7ae..7057dff 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
@@ -28,8 +28,14 @@ import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.nifi.annotation.behavior.DynamicProperty;
@@ -40,6 +46,7 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.Validator;
@@ -174,6 +181,10 @@ public class GetKafka extends AbstractProcessor {
 
     private final AtomicBoolean consumerStreamsReady = new AtomicBoolean();
 
+    private volatile long deadlockTimeout;
+
+    private volatile ExecutorService executor;
+
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         final PropertyDescriptor clientNameWithDefault = new 
PropertyDescriptor.Builder()
@@ -287,6 +298,18 @@ public class GetKafka extends AbstractProcessor {
                 consumer.shutdown();
             }
         }
+        if (this.executor != null) {
+            this.executor.shutdown();
+            try {
+                if (!this.executor.awaitTermination(30000, 
TimeUnit.MILLISECONDS)) {
+                    this.executor.shutdownNow();
+                    getLogger().warn("Executor did not stop in 30 sec. 
Terminated.");
+                }
+                this.executor = null;
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+        }
     }
 
     @Override
@@ -297,6 +320,14 @@ public class GetKafka extends AbstractProcessor {
                 .build();
     }
 
+    @OnScheduled
+    public void schedule(ProcessContext context) {
+        this.deadlockTimeout = 
context.getProperty(KAFKA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS) * 2;
+        if (this.executor == null || this.executor.isShutdown()) {
+            this.executor = Executors.newCachedThreadPool();
+        }
+    }
+
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
         /*
@@ -305,12 +336,55 @@ public class GetKafka extends AbstractProcessor {
          */
         synchronized (this.consumerStreamsReady) {
             if (!this.consumerStreamsReady.get()) {
-                this.createConsumers(context);
+                Future<Void> f = this.executor.submit(new Callable<Void>() {
+                    @Override
+                    public Void call() throws Exception {
+                        createConsumers(context);
+                        return null;
+                    }
+                });
+                try {
+                    f.get(this.deadlockTimeout, TimeUnit.MILLISECONDS);
+                } catch (InterruptedException e) {
+                    this.consumerStreamsReady.set(false);
+                    f.cancel(true);
+                    Thread.currentThread().interrupt();
+                    getLogger().warn("Interrupted while waiting to get 
connection", e);
+                } catch (ExecutionException e) {
+                    throw new IllegalStateException(e);
+                } catch (TimeoutException e) {
+                    this.consumerStreamsReady.set(false);
+                    f.cancel(true);
+                    getLogger().warn("Timed out after " + this.deadlockTimeout 
+ " milliseconds while waiting to get connection", e);
+                }
             }
         }
-        ConsumerIterator<byte[], byte[]> iterator = this.getStreamIterator();
-        if (iterator != null) {
-            this.consumeFromKafka(context, session, iterator);
+        //===
+        if (this.consumerStreamsReady.get()) {
+            Future<Void> consumptionFuture = this.executor.submit(new 
Callable<Void>() {
+                @Override
+                public Void call() throws Exception {
+                    ConsumerIterator<byte[], byte[]> iterator = 
getStreamIterator();
+                    if (iterator != null) {
+                        consumeFromKafka(context, session, iterator);
+                    }
+                    return null;
+                }
+            });
+            try {
+                consumptionFuture.get(this.deadlockTimeout, 
TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+                this.consumerStreamsReady.set(false);
+                consumptionFuture.cancel(true);
+                Thread.currentThread().interrupt();
+                getLogger().warn("Interrupted while consuming messages", e);
+            } catch (ExecutionException e) {
+                throw new IllegalStateException(e);
+            } catch (TimeoutException e) {
+                this.consumerStreamsReady.set(false);
+                consumptionFuture.cancel(true);
+                getLogger().warn("Timed out after " + this.deadlockTimeout + " 
milliseconds while consuming messages", e);
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/148b4497/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java
index d09ac4a..a725c2b 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java
@@ -25,7 +25,6 @@ import org.I0Itec.zkclient.serialize.ZkSerializer;
 import kafka.admin.AdminUtils;
 import kafka.api.TopicMetadata;
 import kafka.utils.ZKStringSerializer;
-import kafka.utils.ZkUtils;
 import scala.collection.JavaConversions;
 
 /**
@@ -52,7 +51,7 @@ class KafkaUtils {
             }
         });
         scala.collection.Set<TopicMetadata> topicMetadatas = AdminUtils
-                
.fetchTopicMetadataFromZk(JavaConversions.asScalaSet(Collections.singleton(topicName)),
 ZkUtils.apply(zkClient, false));
+                
.fetchTopicMetadataFromZk(JavaConversions.asScalaSet(Collections.singleton(topicName)),
 zkClient);
         return topicMetadatas.size();
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/148b4497/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
index 1cb28d8..f91099e 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
@@ -30,10 +30,16 @@ import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.regex.Pattern;
 
@@ -255,6 +261,9 @@ public class PutKafka extends 
AbstractSessionFactoryProcessor {
 
     private volatile Producer<byte[], byte[]> producer;
 
+    private volatile ExecutorService executor;
+    private volatile long deadlockTimeout;
+
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         final PropertyDescriptor clientName = new PropertyDescriptor.Builder()
@@ -316,10 +325,26 @@ public class PutKafka extends 
AbstractSessionFactoryProcessor {
         for (final FlowFileMessageBatch batch : activeBatches) {
             batch.cancelOrComplete();
         }
+        if (this.executor != null) {
+            this.executor.shutdown();
+            try {
+                if (!this.executor.awaitTermination(30000, 
TimeUnit.MILLISECONDS)) {
+                    this.executor.shutdownNow();
+                    getLogger().warn("Executor did not stop in 30 sec. 
Terminated.");
+                }
+                this.executor = null;
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+        }
     }
 
     @OnScheduled
     public void createProducer(final ProcessContext context) {
+        this.deadlockTimeout = 
context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS) * 2;
+        if (this.executor == null || this.executor.isShutdown()) {
+            this.executor = Executors.newCachedThreadPool();
+        }
         producer = new KafkaProducer<byte[], byte[]>(createConfig(context), 
new ByteArraySerializer(), new ByteArraySerializer());
     }
 
@@ -421,6 +446,7 @@ public class PutKafka extends 
AbstractSessionFactoryProcessor {
                 .build();
     }
 
+
     @Override
     public void onTrigger(final ProcessContext context, final 
ProcessSessionFactory sessionFactory) throws ProcessException {
         FlowFileMessageBatch batch;
@@ -430,10 +456,32 @@ public class PutKafka extends 
AbstractSessionFactoryProcessor {
 
         final ProcessSession session = sessionFactory.createSession();
         final FlowFile flowFile = session.get();
-        if (flowFile == null) {
-            return;
+        if (flowFile != null){
+            Future<Void> consumptionFuture = this.executor.submit(new 
Callable<Void>() {
+                @Override
+                public Void call() throws Exception {
+                    doOnTrigger(context, session, flowFile);
+                    return null;
+                }
+            });
+            try {
+                consumptionFuture.get(this.deadlockTimeout, 
TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+                consumptionFuture.cancel(true);
+                Thread.currentThread().interrupt();
+                getLogger().warn("Interrupted while sending messages", e);
+            } catch (ExecutionException e) {
+                throw new IllegalStateException(e);
+            } catch (TimeoutException e) {
+                consumptionFuture.cancel(true);
+                getLogger().warn("Timed out after " + this.deadlockTimeout + " 
milliseconds while sending messages", e);
+            }
+        } else {
+            context.yield();
         }
+    }
 
+    private void doOnTrigger(final ProcessContext context, ProcessSession 
session, final FlowFile flowFile) throws ProcessException {
         final String topic = 
context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
         final String key = 
context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
         final byte[] keyBytes = key == null ? null : 
key.getBytes(StandardCharsets.UTF_8);

http://git-wip-us.apache.org/repos/asf/nifi/blob/148b4497/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestGetKafka.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestGetKafka.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestGetKafka.java
index 69ff48c..dfcf0d9 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestGetKafka.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestGetKafka.java
@@ -16,12 +16,11 @@
  */
 package org.apache.nifi.processors.kafka;
 
+import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
-
-import kafka.consumer.ConsumerIterator;
-import kafka.message.MessageAndMetadata;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.log4j.BasicConfigurator;
 import org.apache.nifi.processor.ProcessContext;
@@ -35,6 +34,9 @@ import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import kafka.consumer.ConsumerIterator;
+import kafka.message.MessageAndMetadata;
+
 public class TestGetKafka {
 
     @BeforeClass
@@ -119,6 +121,13 @@ public class TestGetKafka {
 
         @Override
         public void createConsumers(ProcessContext context) {
+            try {
+                Field f = 
GetKafka.class.getDeclaredField("consumerStreamsReady");
+                f.setAccessible(true);
+                ((AtomicBoolean) f.get(this)).set(true);
+            } catch (Exception e) {
+                throw new IllegalStateException(e);
+            }
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/148b4497/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
index e12ec2a..2f5da5c 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
@@ -27,7 +27,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.kafka.clients.producer.BufferExhaustedException;
 import org.apache.kafka.clients.producer.Callback;
@@ -475,16 +474,5 @@ public class TestPutKafka {
         @Override
         public void close() {
         }
-
-        @Override
-        public void close(long arg0, TimeUnit arg1) {
-            // ignore, not used in test
-        }
-
-        @Override
-        public void flush() {
-            // ignore, not used in test
-        }
     }
-
 }

Reply via email to