GitHub user alexpf commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5929#discussion_r185745293
  
    --- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09PartitionDiscoverer.java ---
    @@ -74,7 +74,12 @@ protected void initializeConnections() {
     
                try {
                        for (String topic : topics) {
    -                           for (PartitionInfo partitionInfo : kafkaConsumer.partitionsFor(topic)) {
    +                           List<PartitionInfo> topicPartitions = kafkaConsumer.partitionsFor(topic);
    +                           if (topicPartitions == null) {
    +                                   throw new IllegalStateException("The topic " + topic + " does not exist");
    --- End diff --
    
    Currently, it just throws an NPE as soon as it runs into the first non-existent topic.
    Taking into account both your and Stephan's comments, I think the most consistent behavior would be to return the partitions that we are able to find. So, if none of the topics exist, the result is an empty list.
    But for an empty list, AbstractPartitionDiscoverer's discoverPartitions() itself throws a RuntimeException, so we end up back in the initial situation. Is that OK?
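
To make the proposal concrete, here is a minimal sketch of the "return what we can find" behavior. It is not the actual Flink code. The class name `PartitionLookupSketch`, the helper `collectPartitions`, and the `Function` standing in for `kafkaConsumer.partitionsFor(topic)` are all hypothetical, and partitions are modeled as plain integers so the example runs without Kafka on the classpath.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class PartitionLookupSketch {

    // Hypothetical helper: gathers partitions for every topic that exists and
    // skips topics whose lookup returns null, instead of failing on the first one.
    // 'lookup' is an assumed stand-in for kafkaConsumer.partitionsFor(topic).
    public static List<String> collectPartitions(
            List<String> topics, Function<String, List<Integer>> lookup) {
        List<String> found = new ArrayList<>();
        for (String topic : topics) {
            List<Integer> partitions = lookup.apply(topic);
            if (partitions == null) {
                continue; // non-existent topic: skip it rather than throw
            }
            for (Integer partition : partitions) {
                found.add(topic + "-" + partition);
            }
        }
        return found; // empty when none of the topics exist
    }

    public static void main(String[] args) {
        // Fake cluster metadata: only "orders" exists.
        Map<String, List<Integer>> cluster = new HashMap<>();
        cluster.put("orders", Arrays.asList(0, 1));

        List<String> result =
                collectPartitions(Arrays.asList("orders", "missing"), cluster::get);
        System.out.println(result); // prints [orders-0, orders-1]
    }
}
```

With this shape, a missing topic is only fatal when every requested topic is missing: the helper then returns an empty list, and it is the caller (discoverPartitions(), as noted above) that turns the empty result into an exception.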

