Github user alexpf commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5929#discussion_r186110995
  
    --- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09PartitionDiscoverer.java ---
    @@ -74,7 +74,12 @@ protected void initializeConnections() {
     
                try {
                        for (String topic : topics) {
    -                           for (PartitionInfo partitionInfo : kafkaConsumer.partitionsFor(topic)) {
    +                           List<PartitionInfo> topicPartitions = kafkaConsumer.partitionsFor(topic);
    +                           if (topicPartitions == null) {
    +                                   throw new IllegalStateException("The topic " + topic + " does not exist");
    --- End diff --
    
    I've taken a look at it. Ideally we should make the go/fail decision one level higher, in `FlinkKafkaConsumerBase`. It uses partition discovery both for the initial seed and later at runtime, so it is the component that should decide. The problem I see here is that `AbstractPartitionDiscoverer#discoverPartitions` doesn't just fetch the list of available partitions; it also filters out the partitions not assigned to the current task. So, once we get the partition list back, we can't tell whether it is empty because nothing was found or because all partitions were filtered out.
    The only way to communicate this difference, as I see it now, is to introduce a new, specific exception and catch it in `FlinkKafkaConsumerBase`.
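    To make the idea concrete, here is a minimal sketch of what that could look like. The class and method names below (`UnknownTopicException`, the stand-ins for the discoverer and consumer base) are illustrative assumptions, not the actual Flink API:

    ```java
    import java.util.Collections;
    import java.util.List;

    public class PartitionDiscoverySketch {

        // Hypothetical dedicated exception: thrown only when the topic is
        // missing entirely, never when partitions were merely filtered out.
        static class UnknownTopicException extends RuntimeException {
            UnknownTopicException(String topic) {
                super("The topic " + topic + " does not exist");
            }
        }

        // Stand-in for kafkaConsumer.partitionsFor(topic), which returns
        // null for a topic that does not exist.
        static List<String> partitionsFor(String topic) {
            return "existing-topic".equals(topic)
                    ? Collections.singletonList(topic + "-0")
                    : null;
        }

        // Sketch of the discoverer side: throw the specific exception instead
        // of returning an empty list, which would be indistinguishable from
        // the post-filtered case.
        static List<String> discoverPartitions(String topic) {
            List<String> partitions = partitionsFor(topic);
            if (partitions == null) {
                throw new UnknownTopicException(topic);
            }
            return partitions; // task-level filtering would happen after this
        }

        public static void main(String[] args) {
            // Sketch of the consumer-base side: the go/fail decision is made
            // one level higher by catching the specific exception.
            try {
                discoverPartitions("missing-topic");
                System.out.println("go");
            } catch (UnknownTopicException e) {
                System.out.println("fail: " + e.getMessage());
            }
        }
    }
    ```

    The point of the dedicated exception type is that an empty (but non-null) result can still mean "everything was filtered for this subtask", which is a normal outcome, while the exception unambiguously means the topic is absent.
    
    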

