Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/5929#discussion_r185732772
--- Diff:
flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09PartitionDiscoverer.java
---
@@ -74,7 +74,12 @@ protected void initializeConnections() {
try {
for (String topic : topics) {
- for (PartitionInfo partitionInfo : kafkaConsumer.partitionsFor(topic)) {
+ List<PartitionInfo> topicPartitions = kafkaConsumer.partitionsFor(topic);
+ if (topicPartitions == null) {
+ throw new IllegalStateException("The topic " + topic + " does not exist");
--- End diff --
I fear that this might be too aggressive.
IMO, it is fine if the user has, say, 3 topics and one of them doesn't
actually exist.
What we should handle is the case where there are no partitions at all
across all of the provided topics.
Perhaps for a missing topic we should only log that it has no partitions?
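
To make the suggestion concrete, here is a rough sketch of the more lenient behavior, not a proposed patch. It assumes a hypothetical `partitionsFor` lookup function standing in for `kafkaConsumer.partitionsFor(topic)` (which, per the diff, may return null), uses plain integers instead of `PartitionInfo`, and uses `java.util.logging` rather than the connector's own logger. The class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.logging.Logger;

class LenientPartitionDiscovery {

    private static final Logger LOG =
            Logger.getLogger(LenientPartitionDiscovery.class.getName());

    /**
     * Collects "topic-partition" labels across all topics.
     * partitionsFor is a hypothetical stand-in for
     * kafkaConsumer.partitionsFor(topic) and may return null.
     */
    static List<String> discover(
            List<String> topics,
            Function<String, List<Integer>> partitionsFor) {

        List<String> partitions = new ArrayList<>();
        for (String topic : topics) {
            List<Integer> topicPartitions = partitionsFor.apply(topic);
            if (topicPartitions == null || topicPartitions.isEmpty()) {
                // A single missing topic is only logged, not treated as fatal.
                LOG.warning("Topic " + topic + " has no partitions; skipping it.");
                continue;
            }
            for (Integer partition : topicPartitions) {
                partitions.add(topic + "-" + partition);
            }
        }

        // Fail only if nothing was found across all provided topics.
        if (partitions.isEmpty()) {
            throw new IllegalStateException(
                    "No partitions found for any of the topics " + topics);
        }
        return partitions;
    }
}
```

With this shape, subscribing to three topics where one is missing still yields the partitions of the other two, and the exception is reserved for the case where every topic comes back empty.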
---