Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/589#discussion_r28227162
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicUtils.java ---
    @@ -65,36 +75,145 @@ public void createTopic(String topicName, int numOfPartitions, int replicationFa
                                LOG.warn("Kafka topic \"{}\" already exists. Returning without action.", topicName);
                        }
                } else {
    +                   LOG.info("Connecting zookeeper");
    +
    +                   initZkClient();
                        AdminUtils.createTopic(zkClient, topicName, numOfPartitions, replicationFactor, topicConfig);
    +                   closeZkClient();
    +           }
    +   }
    +
    +   public String getBrokerList(String topicName) {
    +           return getBrokerAddressList(getBrokerAddresses(topicName));
    +   }
    +
    +   public String getBrokerList(String topicName, int partitionId) {
    +           return getBrokerAddressList(getBrokerAddresses(topicName, partitionId));
    +   }
    +
    +   public Set<String> getBrokerAddresses(String topicName) {
    +           int numOfPartitions = getNumberOfPartitions(topicName);
    +
    +           HashSet<String> brokers = new HashSet<String>();
    +           for (int i = 0; i < numOfPartitions; i++) {
    +                   brokers.addAll(getBrokerAddresses(topicName, i));
                }
    +           return brokers;
    +   }
    +
    +   public Set<String> getBrokerAddresses(String topicName, int partitionId) {
    +           PartitionMetadata partitionMetadata = waitAndGetPartitionMetadata(topicName, partitionId);
    +           Collection<Broker> inSyncReplicas = JavaConversions.asJavaCollection(partitionMetadata.isr());
    +
    +           HashSet<String> addresses = new HashSet<String>();
    +           for (Broker broker : inSyncReplicas) {
    +                   addresses.add(broker.connectionString());
    +           }
    +           return addresses;
    +   }
    +
    +   private static String getBrokerAddressList(Set<String> brokerAddresses) {
    +           StringBuilder brokerAddressList = new StringBuilder("");
    +           for (String broker : brokerAddresses) {
    +                   brokerAddressList.append(broker);
    +                   brokerAddressList.append(',');
    +           }
    +           brokerAddressList.deleteCharAt(brokerAddressList.length() - 1);
    +
    +           return brokerAddressList.toString();
        }
     
        public int getNumberOfPartitions(String topicName) {
    -           Seq<PartitionMetadata> partitionMetadataSeq = getTopicInfo(topicName).partitionsMetadata();
    +           Seq<PartitionMetadata> partitionMetadataSeq = getTopicMetadata(topicName).partitionsMetadata();
                return JavaConversions.asJavaCollection(partitionMetadataSeq).size();
        }
     
    -   public String getLeaderBrokerAddressForTopic(String topicName) {
    -           TopicMetadata topicInfo = getTopicInfo(topicName);
    +   public PartitionMetadata waitAndGetPartitionMetadata(String topicName, int partitionId) {
    +           PartitionMetadata partitionMetadata;
    +           while (true) {
    +                   try {
    +                           partitionMetadata = getPartitionMetadata(topicName, partitionId);
    +                           return partitionMetadata;
    +                   } catch (LeaderNotAvailableException e) {
    +                           // try fetching metadata again
    +                   }
    --- End diff --
    
    We might end up in an infinite loop here: if the leader never becomes available, the `while (true)` loop keeps retrying forever.
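
One way to avoid the unbounded loop is to cap the number of attempts and pause between them. The following is only a minimal sketch of that idea, not the change made in this pull request. `BoundedRetry`, `TransientException`, `maxAttempts`, and `sleepMillis` are hypothetical names; `TransientException` stands in for Kafka's `LeaderNotAvailableException` so that the example compiles without Kafka on the classpath.

```java
import java.util.function.Supplier;

// Hypothetical helper: retries a call a bounded number of times instead of looping forever.
final class BoundedRetry {

    // Hypothetical stand-in for Kafka's LeaderNotAvailableException.
    public static class TransientException extends RuntimeException {
        public TransientException(String message) {
            super(message);
        }
    }

    private BoundedRetry() {
    }

    // Calls `call` until it succeeds or `maxAttempts` attempts have failed
    // with a TransientException; sleeps `sleepMillis` between attempts.
    public static <T> T retry(Supplier<T> call, int maxAttempts, long sleepMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        TransientException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return call.get();
            } catch (TransientException e) {
                last = e; // remember the failure and try again
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(sleepMillis);
                } catch (InterruptedException ie) {
                    // Restore the interrupt flag and stop retrying.
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while waiting to retry", ie);
                }
            }
        }
        throw new IllegalStateException("Gave up after " + maxAttempts + " attempts", last);
    }
}
```

Under these assumptions, the body of `waitAndGetPartitionMetadata` could wrap the `getPartitionMetadata(topicName, partitionId)` call in such a helper, so that a leader that never becomes available surfaces as an exception rather than a hang.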

