NIFI-1218: Upgraded Kafka to the 0.9.0.0 client API. Tested and validated that it is still compatible with 0.8.* Kafka brokers.
Signed-off-by: jpercivall <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/37635232 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/37635232 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/37635232 Branch: refs/heads/master Commit: 37635232c708ed5bdb63033751a74a9aca40b12f Parents: b19ff7c Author: Oleg Zhurakousky <[email protected]> Authored: Wed Dec 16 12:49:44 2015 -0500 Committer: jpercivall <[email protected]> Committed: Wed Dec 16 14:25:28 2015 -0500 ---------------------------------------------------------------------- .../nifi-kafka-bundle/nifi-kafka-processors/pom.xml | 6 +++--- .../org/apache/nifi/processors/kafka/KafkaUtils.java | 3 ++- .../org/apache/nifi/processors/kafka/PutKafka.java | 3 +-- .../org/apache/nifi/processors/kafka/TestPutKafka.java | 13 +++++++++++++ 4 files changed, 19 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/37635232/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/pom.xml b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/pom.xml index afbffbe..9fb5589 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/pom.xml +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/pom.xml @@ -37,12 +37,12 @@ <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> - <version>0.8.2.2</version> + <version>0.9.0.0</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> - <artifactId>kafka_2.9.1</artifactId> - <version>0.8.2.2</version> + <artifactId>kafka_2.10</artifactId> + <version>0.9.0.0</version> <exclusions> <!-- Transitive dependencies excluded because they are located in a legacy Maven repository, which 
Maven 3 doesn't support. --> http://git-wip-us.apache.org/repos/asf/nifi/blob/37635232/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java index 657d88b..a1e9f65 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java @@ -25,6 +25,7 @@ import org.I0Itec.zkclient.serialize.ZkSerializer; import kafka.admin.AdminUtils; import kafka.api.TopicMetadata; import kafka.utils.ZKStringSerializer; +import kafka.utils.ZkUtils; import scala.collection.JavaConversions; /** @@ -50,7 +51,7 @@ class KafkaUtils { } }); scala.collection.Set<TopicMetadata> topicMetadatas = AdminUtils - .fetchTopicMetadataFromZk(JavaConversions.asScalaSet(Collections.singleton(topicName)), zkClient); + .fetchTopicMetadataFromZk(JavaConversions.asScalaSet(Collections.singleton(topicName)), ZkUtils.apply(zkClient, false)); return topicMetadatas.size(); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/37635232/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java index b5766e4..eec03ea 100644 --- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Comparator; @@ -75,8 +76,6 @@ import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.stream.io.util.NonThreadSafeCircularBuffer; import org.apache.nifi.util.LongHolder; -import scala.actors.threadpool.Arrays; - @SupportsBatching @InputRequirement(Requirement.INPUT_REQUIRED) @Tags({"Apache", "Kafka", "Put", "Send", "Message", "PubSub"}) http://git-wip-us.apache.org/repos/asf/nifi/blob/37635232/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java index 17d1cc8..e352840 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java @@ -27,6 +27,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import org.apache.kafka.clients.producer.BufferExhaustedException; import org.apache.kafka.clients.producer.Callback; @@ -474,6 +475,18 @@ public class TestPutKafka { @Override public void close() 
{ } + + @Override + public void close(long arg0, TimeUnit arg1) { + // TODO Auto-generated method stub + + } + + @Override + public void flush() { + // TODO Auto-generated method stub + + } } }
