NIFI-1684 This closes #308. Fixed ZkClient connection leak

Signed-off-by: joewitt <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c3d54ab7
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c3d54ab7
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c3d54ab7

Branch: refs/heads/master
Commit: c3d54ab7246fba5ea1432d949c79f97f0e97f25c
Parents: 9912f18
Author: Oleg Zhurakousky <[email protected]>
Authored: Mon Mar 28 21:48:37 2016 -0400
Committer: joewitt <[email protected]>
Committed: Tue Mar 29 09:56:24 2016 -0400

----------------------------------------------------------------------
 .../apache/nifi/processors/kafka/GetKafka.java | 13 +++---
 .../nifi/processors/kafka/KafkaUtils.java | 43 ++++++++++++++------
 2 files changed, 37 insertions(+), 19 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/nifi/blob/c3d54ab7/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
index e06befb..7660305 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
@@ -256,13 +256,14 @@ public class GetKafka extends AbstractProcessor {
             props.setProperty("consumer.timeout.ms", "1");
         }
 
+        int partitionCount = KafkaUtils.retrievePartitionCountForTopic(
+                context.getProperty(ZOOKEEPER_CONNECTION_STRING).getValue(), context.getProperty(TOPIC).getValue());
+
         final ConsumerConfig consumerConfig = new ConsumerConfig(props);
         consumer = Consumer.createJavaConsumerConnector(consumerConfig);
 
         final Map<String, Integer> topicCountMap = new HashMap<>(1);
 
-        int partitionCount = KafkaUtils.retrievePartitionCountForTopic(context.getProperty(ZOOKEEPER_CONNECTION_STRING).getValue(), context.getProperty(TOPIC).getValue());
-
         int concurrentTaskToUse = context.getMaxConcurrentTasks();
         if (context.getMaxConcurrentTasks() < partitionCount){
             this.getLogger().warn("The amount of concurrent tasks '" + context.getMaxConcurrentTasks() + "' configured for "
@@ -346,14 +347,14 @@ public class GetKafka extends AbstractProcessor {
                 try {
                     f.get(this.deadlockTimeout, TimeUnit.MILLISECONDS);
                 } catch (InterruptedException e) {
-                    this.consumerStreamsReady.set(false);
+                    shutdownConsumer();
                     f.cancel(true);
                     Thread.currentThread().interrupt();
                     getLogger().warn("Interrupted while waiting to get connection", e);
                 } catch (ExecutionException e) {
                     throw new IllegalStateException(e);
                 } catch (TimeoutException e) {
-                    this.consumerStreamsReady.set(false);
+                    shutdownConsumer();
                     f.cancel(true);
                     getLogger().warn("Timed out after " + this.deadlockTimeout + " milliseconds while waiting to get connection", e);
                 }
@@ -374,14 +375,14 @@ public class GetKafka extends AbstractProcessor {
             try {
                 consumptionFuture.get(this.deadlockTimeout, TimeUnit.MILLISECONDS);
             } catch (InterruptedException e) {
-                this.consumerStreamsReady.set(false);
+                shutdownConsumer();
                 consumptionFuture.cancel(true);
                 Thread.currentThread().interrupt();
                 getLogger().warn("Interrupted while consuming messages", e);
             } catch (ExecutionException e) {
                 throw new IllegalStateException(e);
             } catch (TimeoutException e) {
-                this.consumerStreamsReady.set(false);
+                shutdownConsumer();
                 consumptionFuture.cancel(true);
                 getLogger().warn("Timed out after " + this.deadlockTimeout + " milliseconds while consuming messages", e);
             }
http://git-wip-us.apache.org/repos/asf/nifi/blob/c3d54ab7/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java
index a725c2b..8ddea61 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java
@@ -33,25 +33,42 @@ import scala.collection.JavaConversions;
  */
 class KafkaUtils {
 
+
     /**
      * Will retrieve the amount of partitions for a given Kafka topic.
      */
     static int retrievePartitionCountForTopic(String zookeeperConnectionString, String topicName) {
-        ZkClient zkClient = new ZkClient(zookeeperConnectionString);
+        ZkClient zkClient = null;
 
-        zkClient.setZkSerializer(new ZkSerializer() {
-            @Override
-            public byte[] serialize(Object o) throws ZkMarshallingError {
-                return ZKStringSerializer.serialize(o);
-            }
+        try {
+            zkClient = new ZkClient(zookeeperConnectionString);
+            zkClient.setZkSerializer(new ZkSerializer() {
+                @Override
+                public byte[] serialize(Object o) throws ZkMarshallingError {
+                    return ZKStringSerializer.serialize(o);
+                }
 
-            @Override
-            public Object deserialize(byte[] bytes) throws ZkMarshallingError {
-                return ZKStringSerializer.deserialize(bytes);
+                @Override
+                public Object deserialize(byte[] bytes) throws ZkMarshallingError {
+                    return ZKStringSerializer.deserialize(bytes);
+                }
+            });
+            scala.collection.Set<TopicMetadata> topicMetadatas = AdminUtils
+                    .fetchTopicMetadataFromZk(JavaConversions.asScalaSet(Collections.singleton(topicName)), zkClient);
+            if (topicMetadatas != null && topicMetadatas.size() > 0) {
+                return JavaConversions.asJavaSet(topicMetadatas).iterator().next().partitionsMetadata().size();
+            } else {
+                throw new IllegalStateException("Failed to get metadata for topic " + topicName);
             }
-        });
-        scala.collection.Set<TopicMetadata> topicMetadatas = AdminUtils
-                .fetchTopicMetadataFromZk(JavaConversions.asScalaSet(Collections.singleton(topicName)), zkClient);
-        return topicMetadatas.size();
+        } catch (Exception e) {
+            throw new IllegalStateException("Failed to retrieve partitions for topic " + topicName, e);
+        } finally {
+            try {
+                zkClient.close();
+            } catch (Exception e2) {
+                // ignore
+            }
+        }
     }
+
 }
