Revert "NIFI-1218 upgraded Kafka to 0.9.0.0 client API. Tested and validated that it is still compatible with 0.8.* Kafka brokers"
This reverts commit 37635232c708ed5bdb63033751a74a9aca40b12f. Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/fb514837 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/fb514837 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/fb514837 Branch: refs/heads/master Commit: fb514837770ea082c349822a689da3ff7400690f Parents: 51821e0 Author: joewitt <[email protected]> Authored: Sat Dec 19 00:07:34 2015 -0500 Committer: joewitt <[email protected]> Committed: Sat Dec 19 00:07:34 2015 -0500 ---------------------------------------------------------------------- .../nifi-kafka-bundle/nifi-kafka-processors/pom.xml | 6 +++--- .../org/apache/nifi/processors/kafka/KafkaUtils.java | 3 +-- .../org/apache/nifi/processors/kafka/PutKafka.java | 3 ++- .../org/apache/nifi/processors/kafka/TestPutKafka.java | 13 ------------- 4 files changed, 6 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/fb514837/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/pom.xml b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/pom.xml index 9fb5589..afbffbe 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/pom.xml +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/pom.xml @@ -37,12 +37,12 @@ <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> - <version>0.9.0.0</version> + <version>0.8.2.2</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> - <artifactId>kafka_2.10</artifactId> - <version>0.9.0.0</version> + <artifactId>kafka_2.9.1</artifactId> + <version>0.8.2.2</version> <exclusions> <!-- Transitive dependencies excluded because they are located in a legacy Maven repository, 
which Maven 3 doesn't support. --> http://git-wip-us.apache.org/repos/asf/nifi/blob/fb514837/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java index a1e9f65..657d88b 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java @@ -25,7 +25,6 @@ import org.I0Itec.zkclient.serialize.ZkSerializer; import kafka.admin.AdminUtils; import kafka.api.TopicMetadata; import kafka.utils.ZKStringSerializer; -import kafka.utils.ZkUtils; import scala.collection.JavaConversions; /** @@ -51,7 +50,7 @@ class KafkaUtils { } }); scala.collection.Set<TopicMetadata> topicMetadatas = AdminUtils - .fetchTopicMetadataFromZk(JavaConversions.asScalaSet(Collections.singleton(topicName)), ZkUtils.apply(zkClient, false)); + .fetchTopicMetadataFromZk(JavaConversions.asScalaSet(Collections.singleton(topicName)), zkClient); return topicMetadatas.size(); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/fb514837/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java index eec03ea..b5766e4 100644 --- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java @@ -20,7 +20,6 @@ import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Comparator; @@ -76,6 +75,8 @@ import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.stream.io.util.NonThreadSafeCircularBuffer; import org.apache.nifi.util.LongHolder; +import scala.actors.threadpool.Arrays; + @SupportsBatching @InputRequirement(Requirement.INPUT_REQUIRED) @Tags({"Apache", "Kafka", "Put", "Send", "Message", "PubSub"}) http://git-wip-us.apache.org/repos/asf/nifi/blob/fb514837/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java index e352840..17d1cc8 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java @@ -27,7 +27,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; import org.apache.kafka.clients.producer.BufferExhaustedException; import org.apache.kafka.clients.producer.Callback; @@ -475,18 +474,6 @@ public class TestPutKafka { @Override public void close() 
{ } - - @Override - public void close(long arg0, TimeUnit arg1) { - // TODO Auto-generated method stub - - } - - @Override - public void flush() { - // TODO Auto-generated method stub - - } } }
