Repository: nifi Updated Branches: refs/heads/master 287d3ef53 -> 133838a93
NIFI-1233 upgraded to Kafka 0.9.0.0 Signed-off-by: jpercivall <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/133838a9 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/133838a9 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/133838a9 Branch: refs/heads/master Commit: 133838a93fa5a5771e9812a14845003ad8782c96 Parents: 287d3ef Author: Oleg Zhurakousky <[email protected]> Authored: Tue Jan 12 10:56:49 2016 -0500 Committer: jpercivall <[email protected]> Committed: Tue Jan 12 18:21:38 2016 -0500 ---------------------------------------------------------------------- .../nifi-kafka-bundle/nifi-kafka-processors/pom.xml | 6 +++--- .../org/apache/nifi/processors/kafka/KafkaUtils.java | 4 +++- .../java/org/apache/nifi/processors/kafka/PutKafka.java | 5 ++--- .../org/apache/nifi/processors/kafka/TestPutKafka.java | 11 +++++++++++ 4 files changed, 19 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/133838a9/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/pom.xml b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/pom.xml index 2eee050..fb59e69 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/pom.xml +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/pom.xml @@ -37,12 +37,12 @@ <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> - <version>0.8.2.2</version> + <version>0.9.0.0</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> - <artifactId>kafka_2.9.1</artifactId> - <version>0.8.2.2</version> + <artifactId>kafka_2.10</artifactId> + <version>0.9.0.0</version > <exclusions> <!-- Transitive dependencies excluded because they are located in a legacy Maven repository, which Maven 3 doesn't support. --> http://git-wip-us.apache.org/repos/asf/nifi/blob/133838a9/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java index 657d88b..d09ac4a 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java @@ -25,6 +25,7 @@ import org.I0Itec.zkclient.serialize.ZkSerializer; import kafka.admin.AdminUtils; import kafka.api.TopicMetadata; import kafka.utils.ZKStringSerializer; +import kafka.utils.ZkUtils; import scala.collection.JavaConversions; /** @@ -38,6 +39,7 @@ class KafkaUtils { */ static int retrievePartitionCountForTopic(String zookeeperConnectionString, String topicName) { ZkClient zkClient = new ZkClient(zookeeperConnectionString); + zkClient.setZkSerializer(new ZkSerializer() { @Override public byte[] serialize(Object o) throws ZkMarshallingError { @@ -50,7 +52,7 @@ class KafkaUtils { } }); scala.collection.Set<TopicMetadata> topicMetadatas = AdminUtils - .fetchTopicMetadataFromZk(JavaConversions.asScalaSet(Collections.singleton(topicName)), zkClient); + .fetchTopicMetadataFromZk(JavaConversions.asScalaSet(Collections.singleton(topicName)), ZkUtils.apply(zkClient, false)); return topicMetadatas.size(); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/133838a9/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java index b5766e4..febb666 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java @@ -20,14 +20,15 @@ import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashSet; import java.util.List; +import java.util.Map.Entry; import java.util.Properties; import java.util.Set; -import java.util.Map.Entry; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -75,8 +76,6 @@ import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.stream.io.util.NonThreadSafeCircularBuffer; import org.apache.nifi.util.LongHolder; -import scala.actors.threadpool.Arrays; - @SupportsBatching @InputRequirement(Requirement.INPUT_REQUIRED) @Tags({"Apache", "Kafka", "Put", "Send", "Message", "PubSub"}) http://git-wip-us.apache.org/repos/asf/nifi/blob/133838a9/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java index 17d1cc8..e12ec2a 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java @@ -27,6 +27,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import org.apache.kafka.clients.producer.BufferExhaustedException; import org.apache.kafka.clients.producer.Callback; @@ -474,6 +475,16 @@ public class TestPutKafka { @Override public void close() { } + + @Override + public void close(long arg0, TimeUnit arg1) { + // ignore, not used in test + } + + @Override + public void flush() { + // ignore, not used in test + } } }
