Repository: nifi
Updated Branches:
  refs/heads/master 287d3ef53 -> 133838a93


NIFI-1233 upgraded to Kafka 0.9.0.0

Signed-off-by: jpercivall <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/133838a9
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/133838a9
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/133838a9

Branch: refs/heads/master
Commit: 133838a93fa5a5771e9812a14845003ad8782c96
Parents: 287d3ef
Author: Oleg Zhurakousky <[email protected]>
Authored: Tue Jan 12 10:56:49 2016 -0500
Committer: jpercivall <[email protected]>
Committed: Tue Jan 12 18:21:38 2016 -0500

----------------------------------------------------------------------
 .../nifi-kafka-bundle/nifi-kafka-processors/pom.xml      |  6 +++---
 .../org/apache/nifi/processors/kafka/KafkaUtils.java     |  4 +++-
 .../java/org/apache/nifi/processors/kafka/PutKafka.java  |  5 ++---
 .../org/apache/nifi/processors/kafka/TestPutKafka.java   | 11 +++++++++++
 4 files changed, 19 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/133838a9/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/pom.xml b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/pom.xml
index 2eee050..fb59e69 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/pom.xml
@@ -37,12 +37,12 @@
         <dependency>
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clients</artifactId>
-                   <version>0.8.2.2</version>
+                   <version>0.9.0.0</version>
                </dependency>
         <dependency>
             <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka_2.9.1</artifactId>
-            <version>0.8.2.2</version>
+            <artifactId>kafka_2.10</artifactId>
+            <version>0.9.0.0</version>
             <exclusions>
                 <!-- Transitive dependencies excluded because they are located 
                 in a legacy Maven repository, which Maven 3 doesn't support. -->

http://git-wip-us.apache.org/repos/asf/nifi/blob/133838a9/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java
index 657d88b..d09ac4a 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java
@@ -25,6 +25,7 @@ import org.I0Itec.zkclient.serialize.ZkSerializer;
 import kafka.admin.AdminUtils;
 import kafka.api.TopicMetadata;
 import kafka.utils.ZKStringSerializer;
+import kafka.utils.ZkUtils;
 import scala.collection.JavaConversions;
 
 /**
@@ -38,6 +39,7 @@ class KafkaUtils {
      */
     static int retrievePartitionCountForTopic(String zookeeperConnectionString, String topicName) {
         ZkClient zkClient = new ZkClient(zookeeperConnectionString);
+
         zkClient.setZkSerializer(new ZkSerializer() {
             @Override
             public byte[] serialize(Object o) throws ZkMarshallingError {
@@ -50,7 +52,7 @@ class KafkaUtils {
             }
         });
         scala.collection.Set<TopicMetadata> topicMetadatas = AdminUtils
-                .fetchTopicMetadataFromZk(JavaConversions.asScalaSet(Collections.singleton(topicName)), zkClient);
+                .fetchTopicMetadataFromZk(JavaConversions.asScalaSet(Collections.singleton(topicName)), ZkUtils.apply(zkClient, false));
         return topicMetadatas.size();
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/133838a9/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
index b5766e4..febb666 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
@@ -20,14 +20,15 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
-import java.util.Map.Entry;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -75,8 +76,6 @@ import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.stream.io.util.NonThreadSafeCircularBuffer;
 import org.apache.nifi.util.LongHolder;
 
-import scala.actors.threadpool.Arrays;
-
 @SupportsBatching
 @InputRequirement(Requirement.INPUT_REQUIRED)
 @Tags({"Apache", "Kafka", "Put", "Send", "Message", "PubSub"})

http://git-wip-us.apache.org/repos/asf/nifi/blob/133838a9/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
index 17d1cc8..e12ec2a 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
@@ -27,6 +27,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.kafka.clients.producer.BufferExhaustedException;
 import org.apache.kafka.clients.producer.Callback;
@@ -474,6 +475,16 @@ public class TestPutKafka {
         @Override
         public void close() {
         }
+
+        @Override
+        public void close(long arg0, TimeUnit arg1) {
+            // ignore, not used in test
+        }
+
+        @Override
+        public void flush() {
+            // ignore, not used in test
+        }
     }
 
 }

Reply via email to