Repository: apex-malhar Updated Branches: refs/heads/master 7b019fa1b -> 9b62506bd
APEXMALHAR-2135 Upgrade Kafka dependency to 0.8.2.1 Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/9b62506b Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/9b62506b Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/9b62506b Branch: refs/heads/master Commit: 9b62506bd38436218d616ba6199185dd3bb7b71b Parents: 7b019fa Author: Thomas Weise <[email protected]> Authored: Sat Jul 9 21:30:30 2016 +0200 Committer: Thomas Weise <[email protected]> Committed: Sat Jul 9 21:30:30 2016 +0200 ---------------------------------------------------------------------- contrib/pom.xml | 2 +- .../datatorrent/contrib/kafka/KafkaMetadataUtil.java | 14 +++++++------- 2 files changed, 8 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b62506b/contrib/pom.xml ---------------------------------------------------------------------- diff --git a/contrib/pom.xml b/contrib/pom.xml index dcb44c1..1c5be13 100755 --- a/contrib/pom.xml +++ b/contrib/pom.xml @@ -244,7 +244,7 @@ <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> - <version>0.8.1.1</version> + <version>0.8.2.1</version> <optional>true</optional> <exclusions> <exclusion> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b62506b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaMetadataUtil.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaMetadataUtil.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaMetadataUtil.java index f6057cd..af5045a 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaMetadataUtil.java +++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaMetadataUtil.java @@ -59,9 +59,9 @@ import kafka.utils.ZkUtils; */ public class KafkaMetadataUtil { - + public static final String PRODUCER_PROP_PARTITIONER = "partitioner.class"; - + public static final String PRODUCER_PROP_BROKERLIST = "metadata.broker.list"; private static Logger logger = LoggerFactory.getLogger(KafkaMetadataUtil.class); @@ -88,7 +88,7 @@ public class KafkaMetadataUtil } return tmd.partitionsMetadata(); } - + /** * @param brokers in multiple clusters, keyed by cluster id * @param topic @@ -104,14 +104,14 @@ public class KafkaMetadataUtil return getPartitionsForTopic(new HashSet<String>(bs), topic); }}); } - - + + public static Set<String> getBrokers(Set<String> zkHost){ - + ZkClient zkclient = new ZkClient(StringUtils.join(zkHost, ',') ,30000, 30000, ZKStringSerializer$.MODULE$); Set<String> brokerHosts = new HashSet<String>(); for (Broker b : JavaConversions.asJavaIterable(ZkUtils.getAllBrokersInCluster(zkclient))) { - brokerHosts.add(b.getConnectionString()); + brokerHosts.add(b.connectionString()); } zkclient.close(); return brokerHosts;
