Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/589#discussion_r28227162 --- Diff: flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicUtils.java --- @@ -65,36 +75,145 @@ public void createTopic(String topicName, int numOfPartitions, int replicationFa LOG.warn("Kafka topic \"{}\" already exists. Returning without action.", topicName); } } else { + LOG.info("Connecting zookeeper"); + + initZkClient(); AdminUtils.createTopic(zkClient, topicName, numOfPartitions, replicationFactor, topicConfig); + closeZkClient(); + } + } + + public String getBrokerList(String topicName) { + return getBrokerAddressList(getBrokerAddresses(topicName)); + } + + public String getBrokerList(String topicName, int partitionId) { + return getBrokerAddressList(getBrokerAddresses(topicName, partitionId)); + } + + public Set<String> getBrokerAddresses(String topicName) { + int numOfPartitions = getNumberOfPartitions(topicName); + + HashSet<String> brokers = new HashSet<String>(); + for (int i = 0; i < numOfPartitions; i++) { + brokers.addAll(getBrokerAddresses(topicName, i)); } + return brokers; + } + + public Set<String> getBrokerAddresses(String topicName, int partitionId) { + PartitionMetadata partitionMetadata = waitAndGetPartitionMetadata(topicName, partitionId); + Collection<Broker> inSyncReplicas = JavaConversions.asJavaCollection(partitionMetadata.isr()); + + HashSet<String> addresses = new HashSet<String>(); + for (Broker broker : inSyncReplicas) { + addresses.add(broker.connectionString()); + } + return addresses; + } + + private static String getBrokerAddressList(Set<String> brokerAddresses) { + StringBuilder brokerAddressList = new StringBuilder(""); + for (String broker : brokerAddresses) { + brokerAddressList.append(broker); + brokerAddressList.append(','); + } + brokerAddressList.deleteCharAt(brokerAddressList.length() - 1); + + return 
brokerAddressList.toString(); } public int getNumberOfPartitions(String topicName) { - Seq<PartitionMetadata> partitionMetadataSeq = getTopicInfo(topicName).partitionsMetadata(); + Seq<PartitionMetadata> partitionMetadataSeq = getTopicMetadata(topicName).partitionsMetadata(); return JavaConversions.asJavaCollection(partitionMetadataSeq).size(); } - public String getLeaderBrokerAddressForTopic(String topicName) { - TopicMetadata topicInfo = getTopicInfo(topicName); + public PartitionMetadata waitAndGetPartitionMetadata(String topicName, int partitionId) { + PartitionMetadata partitionMetadata; + while (true) { + try { + partitionMetadata = getPartitionMetadata(topicName, partitionId); + return partitionMetadata; + } catch (LeaderNotAvailableException e) { + // try fetching metadata again + } --- End diff -- We might end up in an infinite loop here.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---