Github user alexpf commented on a diff in the pull request:
https://github.com/apache/flink/pull/5929#discussion_r185745293
--- Diff:
flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09PartitionDiscoverer.java
---
@@ -74,7 +74,12 @@ protected void initializeConnections() {
try {
for (String topic : topics) {
- for (PartitionInfo partitionInfo : kafkaConsumer.partitionsFor(topic)) {
+ List<PartitionInfo> topicPartitions = kafkaConsumer.partitionsFor(topic);
+ if (topicPartitions == null) {
+ throw new IllegalStateException("The topic " + topic + " does not exist");
--- End diff --
Currently, it just throws an NPE when it runs into the first non-existent topic.
Taking both your and Stephan's comments into account, I think the most
consistent behavior would be to return whatever partitions we are able to find.
So if none of the topics exist, the result is an empty list.
But in the case of an empty list, AbstractPartitionDiscoverer's
discoverPartitions() itself throws a RuntimeException, so we end up back in the
initial situation. Is that OK?
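
To make the proposal concrete, here is a minimal, self-contained sketch of the "return what we can find" behavior. It is not the actual Flink code: `PartitionLookupSketch`, `collectPartitions`, and the `partitionsFor` function are hypothetical stand-ins (the function plays the role of `kafkaConsumer.partitionsFor(topic)`, assumed to return null for a missing topic), partitions are simplified to strings, and the exception message is illustrative only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for the discoverer; not the actual Flink class.
final class PartitionLookupSketch {

    // partitionsFor stands in for kafkaConsumer.partitionsFor(topic), which is
    // assumed here to return null when the topic does not exist.
    static List<String> collectPartitions(
            List<String> topics, Function<String, List<Integer>> partitionsFor) {
        List<String> found = new ArrayList<>();
        for (String topic : topics) {
            List<Integer> partitions = partitionsFor.apply(topic);
            if (partitions == null) {
                continue; // skip the missing topic instead of hitting an NPE
            }
            for (Integer partition : partitions) {
                found.add(topic + "-" + partition);
            }
        }
        return found;
    }

    // Mirrors the check described for discoverPartitions(): an empty result
    // is still treated as an error, so the caller sees an exception anyway.
    static List<String> discoverPartitions(
            List<String> topics, Function<String, List<Integer>> partitionsFor) {
        List<String> found = collectPartitions(topics, partitionsFor);
        if (found.isEmpty()) {
            throw new RuntimeException("No partitions found for topics " + topics);
        }
        return found;
    }
}
```

With this shape, a mix of existing and missing topics yields only the partitions that exist, while a list made up entirely of missing topics still ends in a RuntimeException from the outer check, which is the situation the question above is about.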
---