Github user alexpf commented on a diff in the pull request:
https://github.com/apache/flink/pull/5929#discussion_r186110995
--- Diff:
flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09PartitionDiscoverer.java
---
@@ -74,7 +74,12 @@ protected void initializeConnections() {
try {
for (String topic : topics) {
- for (PartitionInfo partitionInfo : kafkaConsumer.partitionsFor(topic)) {
+ List<PartitionInfo> topicPartitions = kafkaConsumer.partitionsFor(topic);
+ if (topicPartitions == null) {
+ throw new IllegalStateException("The topic " + topic + " does not exist");
--- End diff --
I've taken a look at it. Ideally, the go/fail decision should be made one
level higher, in `FlinkKafkaConsumerBase`. It uses partition discovery both
for the initial seed and later at runtime, so it is the one that should
decide. The problem I see is that
`AbstractPartitionDiscoverer#discoverPartitions` doesn't just fetch the list of
available partitions; it also filters them down to those applicable to the
current task. So once we get the partition list back, we can't tell whether it
is empty because nothing was found or because the partitions were filtered
out.
The only way I currently see to communicate this difference is to
introduce a new, specific exception and catch it in
`FlinkKafkaConsumerBase`.
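
To make the idea concrete, here is a rough, self-contained sketch of what I mean. Every name in it (`PartitionDiscoverySketch`, `TopicNotFoundException`, `decide`, the modulo-based filter, the metadata map standing in for `kafkaConsumer.partitionsFor`) is made up for illustration and is not existing Flink code; it only shows how a dedicated exception lets the caller tell "topic missing" apart from "nothing assigned to this subtask".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch only: none of these names exist in Flink.
final class PartitionDiscoverySketch {

    /** Hypothetical exception: no metadata was returned for the topic. */
    static final class TopicNotFoundException extends RuntimeException {
        TopicNotFoundException(String topic) {
            super("The topic " + topic + " does not exist");
        }
    }

    /** Stand-in for the discoverer: fetches partitions, then filters them for one subtask. */
    static List<Integer> discoverPartitions(
            Map<String, List<Integer>> metadata, String topic, int subtaskIndex, int parallelism) {
        // A null lookup result mimics partitionsFor() returning null for a missing topic.
        List<Integer> all = metadata.get(topic);
        if (all == null) {
            throw new TopicNotFoundException(topic);
        }
        // Assumed filter: keep only the partitions that map to this subtask.
        List<Integer> mine = new ArrayList<>();
        for (int partition : all) {
            if (partition % parallelism == subtaskIndex) {
                mine.add(partition);
            }
        }
        return mine;
    }

    /** Stand-in for the consumer base: this is where the go/fail decision is made. */
    static String decide(
            Map<String, List<Integer>> metadata, String topic, int subtaskIndex, int parallelism) {
        try {
            List<Integer> partitions = discoverPartitions(metadata, topic, subtaskIndex, parallelism);
            // An empty list now unambiguously means "filtered out", not "topic missing".
            return partitions.isEmpty() ? "IDLE" : "CONSUME " + partitions;
        } catch (TopicNotFoundException e) {
            return "FAIL: " + e.getMessage();
        }
    }
}
```

With something like this, the caller could choose to fail on the initial seed but only log and retry during periodic discovery, since both paths go through the same catch block.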
---