Repository: apex-malhar Updated Branches: refs/heads/master 7d9386d2a -> 3f30b81a6
APEXMALHAR-2199 #closes #380 #resolve #comment Simplify the zookeeper url parser to use whatever user specified and support chroot path Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/3f30b81a Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/3f30b81a Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/3f30b81a Branch: refs/heads/master Commit: 3f30b81a6e123879f33f95bc025c35808860fedc Parents: 7d9386d Author: Siyuan Hua <[email protected]> Authored: Fri Aug 26 16:58:12 2016 -0700 Committer: Siyuan Hua <[email protected]> Committed: Fri Aug 26 16:58:28 2016 -0700 ---------------------------------------------------------------------- .../contrib/kafka/HighlevelKafkaConsumer.java | 2 +- .../contrib/kafka/KafkaConsumer.java | 27 ++++++++------------ .../contrib/kafka/KafkaMetadataUtil.java | 8 ++++-- .../contrib/kafka/KafkaInputOperatorTest.java | 10 +++----- 4 files changed, 22 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/3f30b81a/contrib/src/main/java/com/datatorrent/contrib/kafka/HighlevelKafkaConsumer.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/HighlevelKafkaConsumer.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/HighlevelKafkaConsumer.java index 2f7cece..5b9c5ed 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/kafka/HighlevelKafkaConsumer.java +++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/HighlevelKafkaConsumer.java @@ -122,7 +122,7 @@ public class HighlevelKafkaConsumer extends KafkaConsumer // create high level consumer for every cluster Properties config = new Properties(); config.putAll(consumerConfig); - config.setProperty("zookeeper.connect", Joiner.on(',').join(zookeeperMap.get(cluster))); + 
config.setProperty("zookeeper.connect", zookeeperMap.get(cluster).iterator().next()); // create consumer connector will start a daemon thread to monitor the metadata change // we want to start this thread until the operator is activated standardConsumer.put(cluster, kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(config))); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/3f30b81a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaConsumer.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaConsumer.java index 805fdc4..a67ff48 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaConsumer.java +++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaConsumer.java @@ -86,8 +86,16 @@ public abstract class KafkaConsumer implements Closeable protected String topic = "default_topic"; /** - * A zookeeper map keyed by cluster id - * It's mandatory field + * A zookeeper map keyed by cluster id. + * It's a mandatory field <br> + * Each cluster should have only one connection string containing all nodes in the cluster <br> + * A zookeeper chroot path is also supported <br> + * + * Single cluster zookeeper example: <br> + * node1:2181,node2:2181,node3:2181/your/kafka/data <br> + * Multi-cluster zookeeper example: <br> + * cluster1::node1:2181,node2:2181,node3:2181/cluster1;cluster2::node1:2181/cluster2 + * */ @NotNull @Bind(JavaSerializer.class) @@ -535,20 +543,7 @@ public abstract class KafkaConsumer implements Closeable for (String zk : zookeeper.split(";")) { String[] parts = zk.split("::"); String clusterId = parts.length == 1 ? KafkaPartition.DEFAULT_CLUSTERID : parts[0]; - String[] hostNames = parts.length == 1 ?
parts[0].split(",") : parts[1].split(","); - String portId = ""; - for (int idx = hostNames.length - 1; idx >= 0; idx--) { - String[] zkParts = hostNames[idx].split(":"); - if (zkParts.length == 2) { - portId = zkParts[1]; - } - if (!portId.isEmpty() && portId != "") { - theClusters.put(clusterId, zkParts[0] + ":" + portId); - } else { - throw new IllegalArgumentException("Wrong zookeeper string: " + zookeeper + "\n" - + " Expected format should be cluster1::zookeeper1,zookeeper2:port1;cluster2::zookeeper3:port2 or zookeeper1:port1,zookeeper:port2"); - } - } + theClusters.put(clusterId, parts.length == 1 ? parts[0] : parts[1]); } return theClusters; } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/3f30b81a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaMetadataUtil.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaMetadataUtil.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaMetadataUtil.java index af5045a..b9d4b1b 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaMetadataUtil.java +++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaMetadataUtil.java @@ -105,10 +105,14 @@ public class KafkaMetadataUtil }}); } - + /** + * There is always only one string in zkHost + * @param zkHost + * @return + */ public static Set<String> getBrokers(Set<String> zkHost){ - ZkClient zkclient = new ZkClient(StringUtils.join(zkHost, ',') ,30000, 30000, ZKStringSerializer$.MODULE$); + ZkClient zkclient = new ZkClient(zkHost.iterator().next(), 30000, 30000, ZKStringSerializer$.MODULE$); Set<String> brokerHosts = new HashSet<String>(); for (Broker b : JavaConversions.asJavaIterable(ZkUtils.getAllBrokersInCluster(zkclient))) { brokerHosts.add(b.connectionString()); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/3f30b81a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java 
---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java index e4a4dec..f3af37f 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java +++ b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java @@ -488,14 +488,12 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase consumer.setTopic(TEST_TOPIC); testMeta.operator.setConsumer(consumer); - testMeta.operator.setZookeeper("cluster1::node0,node1,node2:2181,node3:2182;cluster2::node4:2181"); + testMeta.operator.setZookeeper("cluster1::node0,node1,node2:2181,node3:2182/chroot/dir;cluster2::node4:2181"); latch.await(500, TimeUnit.MILLISECONDS); - Assert.assertEquals("Total size of clusters ", 5, testMeta.operator.getConsumer().zookeeperMap.size()); - Assert.assertEquals("Number of nodes in cluster1 ", 4, testMeta.operator.getConsumer().zookeeperMap.get("cluster1").size()); - Assert.assertEquals("Nodes in cluster1 ", "[node0:2181, node2:2181, node3:2182, node1:2181]", testMeta.operator.getConsumer().zookeeperMap.get("cluster1").toString()); - Assert.assertEquals("Number of nodes in cluster2 ", 1, testMeta.operator.getConsumer().zookeeperMap.get("cluster2").size()); - Assert.assertEquals("Nodes in cluster2 ", "[node4:2181]", testMeta.operator.getConsumer().zookeeperMap.get("cluster2").toString()); + Assert.assertEquals("Total size of clusters ", 2, testMeta.operator.getConsumer().zookeeperMap.size()); + Assert.assertEquals("Connection url for cluster1 ", "node0,node1,node2:2181,node3:2182/chroot/dir", testMeta.operator.getConsumer().zookeeperMap.get("cluster1").iterator().next()); + Assert.assertEquals("Connection url for cluster 2 ", "node4:2181", testMeta.operator.getConsumer().zookeeperMap.get("cluster2").iterator().next()); } }
