Repository: nifi Updated Branches: refs/heads/master 68cfc8c61 -> 148b4497b
NIFI-1629 This closes #282. downgraded Kafka back to 0.8 - added context.yield to PutKafka - added lifecycle hooks to defend from Kafka deadlocks NIFI-1629 changed thread pool implementation in Get/PutKafka Signed-off-by: joewitt <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/148b4497 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/148b4497 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/148b4497 Branch: refs/heads/master Commit: 148b4497b4a31f292e9b5b2fba3ee19555bf808b Parents: 68cfc8c Author: Oleg Zhurakousky <[email protected]> Authored: Wed Mar 16 09:55:08 2016 -0400 Committer: joewitt <[email protected]> Committed: Wed Mar 16 15:38:46 2016 -0400 ---------------------------------------------------------------------- .../nifi-kafka-processors/pom.xml | 4 +- .../apache/nifi/processors/kafka/GetKafka.java | 82 +++++++++++++++++++- .../nifi/processors/kafka/KafkaUtils.java | 3 +- .../apache/nifi/processors/kafka/PutKafka.java | 52 ++++++++++++- .../nifi/processors/kafka/TestGetKafka.java | 15 +++- .../nifi/processors/kafka/TestPutKafka.java | 12 --- 6 files changed, 143 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/148b4497/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/pom.xml b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/pom.xml index 908cb00..cbabc1c 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/pom.xml +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/pom.xml @@ -37,12 +37,12 @@ <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> - <version>0.9.0.0</version> + <version>0.8.2.2</version> </dependency> <dependency> 
<groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> - <version>0.9.0.0</version> + <version>0.8.2.2</version> <exclusions> <!-- Transitive dependencies excluded because they are located in a legacy Maven repository, which Maven 3 doesn't support. --> http://git-wip-us.apache.org/repos/asf/nifi/blob/148b4497/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java index 5e7a7ae..7057dff 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java @@ -28,8 +28,14 @@ import java.util.Map.Entry; import java.util.Properties; import java.util.Set; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.nifi.annotation.behavior.DynamicProperty; @@ -40,6 +46,7 @@ import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; import 
org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.Validator; @@ -174,6 +181,10 @@ public class GetKafka extends AbstractProcessor { private final AtomicBoolean consumerStreamsReady = new AtomicBoolean(); + private volatile long deadlockTimeout; + + private volatile ExecutorService executor; + @Override protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { final PropertyDescriptor clientNameWithDefault = new PropertyDescriptor.Builder() @@ -287,6 +298,18 @@ public class GetKafka extends AbstractProcessor { consumer.shutdown(); } } + if (this.executor != null) { + this.executor.shutdown(); + try { + if (!this.executor.awaitTermination(30000, TimeUnit.MILLISECONDS)) { + this.executor.shutdownNow(); + getLogger().warn("Executor did not stop in 30 sec. Terminated."); + } + this.executor = null; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } } @Override @@ -297,6 +320,14 @@ public class GetKafka extends AbstractProcessor { .build(); } + @OnScheduled + public void schedule(ProcessContext context) { + this.deadlockTimeout = context.getProperty(KAFKA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS) * 2; + if (this.executor == null || this.executor.isShutdown()) { + this.executor = Executors.newCachedThreadPool(); + } + } + @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { /* @@ -305,12 +336,55 @@ public class GetKafka extends AbstractProcessor { */ synchronized (this.consumerStreamsReady) { if (!this.consumerStreamsReady.get()) { - this.createConsumers(context); + Future<Void> f = this.executor.submit(new Callable<Void>() { + @Override + public Void call() throws Exception { + createConsumers(context); + return null; + } + }); + try { + f.get(this.deadlockTimeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + this.consumerStreamsReady.set(false); + 
f.cancel(true); + Thread.currentThread().interrupt(); + getLogger().warn("Interrupted while waiting to get connection", e); + } catch (ExecutionException e) { + throw new IllegalStateException(e); + } catch (TimeoutException e) { + this.consumerStreamsReady.set(false); + f.cancel(true); + getLogger().warn("Timed out after " + this.deadlockTimeout + " milliseconds while waiting to get connection", e); + } } } - ConsumerIterator<byte[], byte[]> iterator = this.getStreamIterator(); - if (iterator != null) { - this.consumeFromKafka(context, session, iterator); + //=== + if (this.consumerStreamsReady.get()) { + Future<Void> consumptionFuture = this.executor.submit(new Callable<Void>() { + @Override + public Void call() throws Exception { + ConsumerIterator<byte[], byte[]> iterator = getStreamIterator(); + if (iterator != null) { + consumeFromKafka(context, session, iterator); + } + return null; + } + }); + try { + consumptionFuture.get(this.deadlockTimeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + this.consumerStreamsReady.set(false); + consumptionFuture.cancel(true); + Thread.currentThread().interrupt(); + getLogger().warn("Interrupted while consuming messages", e); + } catch (ExecutionException e) { + throw new IllegalStateException(e); + } catch (TimeoutException e) { + this.consumerStreamsReady.set(false); + consumptionFuture.cancel(true); + getLogger().warn("Timed out after " + this.deadlockTimeout + " milliseconds while consuming messages", e); + } } } http://git-wip-us.apache.org/repos/asf/nifi/blob/148b4497/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java index d09ac4a..a725c2b 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java @@ -25,7 +25,6 @@ import org.I0Itec.zkclient.serialize.ZkSerializer; import kafka.admin.AdminUtils; import kafka.api.TopicMetadata; import kafka.utils.ZKStringSerializer; -import kafka.utils.ZkUtils; import scala.collection.JavaConversions; /** @@ -52,7 +51,7 @@ class KafkaUtils { } }); scala.collection.Set<TopicMetadata> topicMetadatas = AdminUtils - .fetchTopicMetadataFromZk(JavaConversions.asScalaSet(Collections.singleton(topicName)), ZkUtils.apply(zkClient, false)); + .fetchTopicMetadataFromZk(JavaConversions.asScalaSet(Collections.singleton(topicName)), zkClient); return topicMetadatas.size(); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/148b4497/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java index 1cb28d8..f91099e 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java @@ -30,10 +30,16 @@ import java.util.Map.Entry; import java.util.Properties; import java.util.Set; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; import 
java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; import java.util.regex.Pattern; @@ -255,6 +261,9 @@ public class PutKafka extends AbstractSessionFactoryProcessor { private volatile Producer<byte[], byte[]> producer; + private volatile ExecutorService executor; + private volatile long deadlockTimeout; + @Override protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { final PropertyDescriptor clientName = new PropertyDescriptor.Builder() @@ -316,10 +325,26 @@ public class PutKafka extends AbstractSessionFactoryProcessor { for (final FlowFileMessageBatch batch : activeBatches) { batch.cancelOrComplete(); } + if (this.executor != null) { + this.executor.shutdown(); + try { + if (!this.executor.awaitTermination(30000, TimeUnit.MILLISECONDS)) { + this.executor.shutdownNow(); + getLogger().warn("Executor did not stop in 30 sec. 
Terminated."); + } + this.executor = null; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } } @OnScheduled public void createProducer(final ProcessContext context) { + this.deadlockTimeout = context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS) * 2; + if (this.executor == null || this.executor.isShutdown()) { + this.executor = Executors.newCachedThreadPool(); + } producer = new KafkaProducer<byte[], byte[]>(createConfig(context), new ByteArraySerializer(), new ByteArraySerializer()); } @@ -421,6 +446,7 @@ public class PutKafka extends AbstractSessionFactoryProcessor { .build(); } + @Override public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException { FlowFileMessageBatch batch; @@ -430,10 +456,32 @@ public class PutKafka extends AbstractSessionFactoryProcessor { final ProcessSession session = sessionFactory.createSession(); final FlowFile flowFile = session.get(); - if (flowFile == null) { - return; + if (flowFile != null){ + Future<Void> consumptionFuture = this.executor.submit(new Callable<Void>() { + @Override + public Void call() throws Exception { + doOnTrigger(context, session, flowFile); + return null; + } + }); + try { + consumptionFuture.get(this.deadlockTimeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + consumptionFuture.cancel(true); + Thread.currentThread().interrupt(); + getLogger().warn("Interrupted while sending messages", e); + } catch (ExecutionException e) { + throw new IllegalStateException(e); + } catch (TimeoutException e) { + consumptionFuture.cancel(true); + getLogger().warn("Timed out after " + this.deadlockTimeout + " milliseconds while sending messages", e); + } + } else { + context.yield(); } + } + private void doOnTrigger(final ProcessContext context, ProcessSession session, final FlowFile flowFile) throws ProcessException { final String topic = 
context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue(); final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue(); final byte[] keyBytes = key == null ? null : key.getBytes(StandardCharsets.UTF_8); http://git-wip-us.apache.org/repos/asf/nifi/blob/148b4497/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestGetKafka.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestGetKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestGetKafka.java index 69ff48c..dfcf0d9 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestGetKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestGetKafka.java @@ -16,12 +16,11 @@ */ package org.apache.nifi.processors.kafka; +import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Iterator; import java.util.List; - -import kafka.consumer.ConsumerIterator; -import kafka.message.MessageAndMetadata; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.log4j.BasicConfigurator; import org.apache.nifi.processor.ProcessContext; @@ -35,6 +34,9 @@ import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import kafka.consumer.ConsumerIterator; +import kafka.message.MessageAndMetadata; + public class TestGetKafka { @BeforeClass @@ -119,6 +121,13 @@ public class TestGetKafka { @Override public void createConsumers(ProcessContext context) { + try { + Field f = GetKafka.class.getDeclaredField("consumerStreamsReady"); + f.setAccessible(true); + ((AtomicBoolean) f.get(this)).set(true); + } catch (Exception e) { + 
throw new IllegalStateException(e); + } } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/148b4497/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java index e12ec2a..2f5da5c 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java @@ -27,7 +27,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; import org.apache.kafka.clients.producer.BufferExhaustedException; import org.apache.kafka.clients.producer.Callback; @@ -475,16 +474,5 @@ public class TestPutKafka { @Override public void close() { } - - @Override - public void close(long arg0, TimeUnit arg1) { - // ignore, not used in test - } - - @Override - public void flush() { - // ignore, not used in test - } } - }
