Repository: nifi
Updated Branches:
  refs/heads/master 2afc739ab -> 8d6e12fdc


NIFI-2739: Call KafkaConsumer.wakeup() if consumer is blocking for at least 30 seconds when OnUnscheduled is called


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/8d6e12fd
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/8d6e12fd
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/8d6e12fd

Branch: refs/heads/master
Commit: 8d6e12fdc4002f6eba52edcf0670df40217eb43a
Parents: 2afc739
Author: Mark Payne <[email protected]>
Authored: Tue Sep 6 20:27:10 2016 -0400
Committer: Mark Payne <[email protected]>
Committed: Wed Sep 7 08:56:23 2016 -0400

----------------------------------------------------------------------
 .../kafka/pubsub/ConsumeKafka_0_10.java         | 42 ++++++++++++++++++
 .../processors/kafka/pubsub/ConsumerLease.java  |  8 ++++
 .../processors/kafka/pubsub/ConsumeKafka.java   | 46 +++++++++++++++++++-
 .../processors/kafka/pubsub/ConsumerLease.java  |  8 ++++
 4 files changed, 102 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/8d6e12fd/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java
index e799876..41c8cc6 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java
@@ -21,12 +21,14 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.nifi.annotation.behavior.DynamicProperty;
 import org.apache.nifi.annotation.behavior.InputRequirement;
@@ -35,6 +37,7 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
@@ -157,6 +160,7 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
     static final Set<Relationship> RELATIONSHIPS;
 
     private volatile ConsumerPool consumerPool = null;
+    private final Set<ConsumerLease> activeLeases = 
Collections.synchronizedSet(new HashSet<>());
 
     static {
         List<PropertyDescriptor> descriptors = new ArrayList<>();
@@ -239,6 +243,37 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
         return new ConsumerPool(maxLeases, demarcator, props, topics, 
maxUncommittedTime, keyEncoding, securityProtocol, bootstrapServers, log);
     }
 
+    @OnUnscheduled
+    public void interruptActiveThreads() {
+        // There are known issues with the Kafka client library that result in 
the client code hanging
+        // indefinitely when unable to communicate with the broker. In order 
to address this, we will wait
+        // up to 30 seconds for the Threads to finish and then will call 
Consumer.wakeup() to trigger the
+        // thread to wakeup when it is blocked, waiting on a response.
+        final long nanosToWait = TimeUnit.SECONDS.toNanos(5L);
+        final long start = System.nanoTime();
+        while (System.nanoTime() - start < nanosToWait && 
!activeLeases.isEmpty()) {
+            try {
+                Thread.sleep(100L);
+            } catch (final InterruptedException ie) {
+                Thread.currentThread().interrupt();
+                return;
+            }
+        }
+
+        if (!activeLeases.isEmpty()) {
+            int count = 0;
+            for (final ConsumerLease lease : activeLeases) {
+                getLogger().info("Consumer {} has not finished after waiting 
30 seconds; will attempt to wake-up the lease", new Object[] {lease});
+                lease.wakeup();
+                count++;
+            }
+
+            getLogger().info("Woke up {} consumers", new Object[] {count});
+        }
+
+        activeLeases.clear();
+    }
+
     @Override
     public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
         final ConsumerPool pool = getConsumerPool(context);
@@ -252,6 +287,8 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
                 context.yield();
                 return;
             }
+
+            activeLeases.add(lease);
             try {
                 while (this.isScheduled() && lease.continuePolling()) {
                     lease.poll();
@@ -259,12 +296,17 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
                 if (this.isScheduled() && !lease.commit()) {
                     context.yield();
                 }
+            } catch (final WakeupException we) {
+                getLogger().warn("Was interrupted while trying to communicate 
with Kafka with lease {}. "
+                    + "Will roll back session and discard any partially 
received data.", new Object[] {lease});
             } catch (final KafkaException kex) {
                 getLogger().error("Exception while interacting with Kafka so 
will close the lease {} due to {}",
                         new Object[]{lease, kex}, kex);
             } catch (final Throwable t) {
                 getLogger().error("Exception while processing data from kafka 
so will close the lease {} due to {}",
                         new Object[]{lease, t}, t);
+            } finally {
+                activeLeases.remove(lease);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/8d6e12fd/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
index f7a2e57..c611fa2 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -29,6 +29,7 @@ import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
@@ -249,6 +250,13 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
     }
 
     /**
+     * Trigger the consumer's {@link KafkaConsumer#wakeup() wakeup()} method.
+     */
+    public void wakeup() {
+        kafkaConsumer.wakeup();
+    }
+
+    /**
      * Abstract method that is intended to be extended by the pool that created
      * this ConsumerLease object. It should ensure that the session given to
      * create this session is rolled back and that the underlying kafka 
consumer

http://git-wip-us.apache.org/repos/asf/nifi/blob/8d6e12fd/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
index 0b8a752..c311e2a 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
@@ -21,12 +21,14 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.nifi.annotation.behavior.DynamicProperty;
 import org.apache.nifi.annotation.behavior.InputRequirement;
@@ -35,6 +37,7 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
@@ -157,6 +160,7 @@ public class ConsumeKafka extends AbstractProcessor {
     static final Set<Relationship> RELATIONSHIPS;
 
     private volatile ConsumerPool consumerPool = null;
+    private final Set<ConsumerLease> activeLeases = 
Collections.synchronizedSet(new HashSet<>());
 
     static {
         List<PropertyDescriptor> descriptors = new ArrayList<>();
@@ -239,6 +243,37 @@ public class ConsumeKafka extends AbstractProcessor {
         return new ConsumerPool(maxLeases, demarcator, props, topics, 
maxUncommittedTime, keyEncoding, securityProtocol, bootstrapServers, log);
     }
 
+    @OnUnscheduled
+    public void interruptActiveThreads() {
+        // There are known issues with the Kafka client library that result in 
the client code hanging
+        // indefinitely when unable to communicate with the broker. In order 
to address this, we will wait
+        // up to 30 seconds for the Threads to finish and then will call 
Consumer.wakeup() to trigger the
+        // thread to wakeup when it is blocked, waiting on a response.
+        final long nanosToWait = TimeUnit.SECONDS.toNanos(5L);
+        final long start = System.nanoTime();
+        while (System.nanoTime() - start < nanosToWait && 
!activeLeases.isEmpty()) {
+            try {
+                Thread.sleep(100L);
+            } catch (final InterruptedException ie) {
+                Thread.currentThread().interrupt();
+                return;
+            }
+        }
+
+        if (!activeLeases.isEmpty()) {
+            int count = 0;
+            for (final ConsumerLease lease : activeLeases) {
+                getLogger().info("Consumer {} has not finished after waiting 
30 seconds; will attempt to wake-up the lease", new Object[] {lease});
+                lease.wakeup();
+                count++;
+            }
+
+            getLogger().info("Woke up {} consumers", new Object[] {count});
+        }
+
+        activeLeases.clear();
+    }
+
     @Override
     public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
         final ConsumerPool pool = getConsumerPool(context);
@@ -252,6 +287,8 @@ public class ConsumeKafka extends AbstractProcessor {
                 context.yield();
                 return;
             }
+
+            activeLeases.add(lease);
             try {
                 while (this.isScheduled() && lease.continuePolling()) {
                     lease.poll();
@@ -259,12 +296,17 @@ public class ConsumeKafka extends AbstractProcessor {
                 if (this.isScheduled() && !lease.commit()) {
                     context.yield();
                 }
+            } catch (final WakeupException we) {
+                getLogger().warn("Was interrupted while trying to communicate 
with Kafka with lease {}. "
+                    + "Will roll back session and discard any partially 
received data.", new Object[] {lease});
             } catch (final KafkaException kex) {
                 getLogger().error("Exception while interacting with Kafka so 
will close the lease {} due to {}",
-                        new Object[]{lease, kex}, kex);
+                    new Object[] {lease, kex}, kex);
             } catch (final Throwable t) {
                 getLogger().error("Exception while processing data from kafka 
so will close the lease {} due to {}",
-                        new Object[]{lease, t}, t);
+                    new Object[] {lease, t}, t);
+            } finally {
+                activeLeases.remove(lease);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/8d6e12fd/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
index 5b8ba1c..ad66bb2 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -29,6 +29,7 @@ import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
@@ -249,6 +250,13 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
     }
 
     /**
+     * Trigger the consumer's {@link KafkaConsumer#wakeup() wakeup()} method.
+     */
+    public void wakeup() {
+        kafkaConsumer.wakeup();
+    }
+
+    /**
      * Abstract method that is intended to be extended by the pool that created
      * this ConsumerLease object. It should ensure that the session given to
      * create this session is rolled back and that the underlying kafka 
consumer

Reply via email to