Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/5929#discussion_r185759080
--- Diff:
flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09PartitionDiscoverer.java
---
@@ -74,7 +74,12 @@ protected void initializeConnections() {
try {
for (String topic : topics) {
- for (PartitionInfo partitionInfo : kafkaConsumer.partitionsFor(topic)) {
+ List<PartitionInfo> topicPartitions = kafkaConsumer.partitionsFor(topic);
+ if (topicPartitions == null) {
+ throw new IllegalStateException("The topic " + topic + " does not exist");
--- End diff --
I think the `RuntimeException` in
`AbstractPartitionDiscoverer#discoverPartitions` may also need to be revisited.
As far as I understand it, we should only fail the job if the first discovery
(which seeds the initial partitions that the connector consumes) is empty
across all partitions. Otherwise, it should be OK if, while the job is running,
partition discovery fails to fetch partition metadata on some discovery
attempt.
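
To make the suggestion concrete, here is a rough sketch of the behavior I have in mind. It is not the actual `AbstractPartitionDiscoverer` code: the class name `LenientDiscoverySketch`, the `seeded` flag, and the `Supplier` standing in for the metadata fetch are all hypothetical, and partitions are represented as plain strings for brevity.

```java
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical sketch only; not Flink's AbstractPartitionDiscoverer. */
class LenientDiscoverySketch {

    /** True once at least one discovery attempt has returned partitions. */
    private boolean seeded = false;

    /**
     * Runs one discovery attempt.
     *
     * @param fetch hypothetical stand-in for the metadata lookup; it may
     *              return null or an empty list when nothing could be fetched
     */
    List<String> discover(Supplier<List<String>> fetch) {
        List<String> found = fetch.get();
        boolean empty = (found == null || found.isEmpty());

        if (empty && !seeded) {
            // No initial partitions to consume from: fail the job.
            throw new IllegalStateException(
                "Initial partition discovery found no partitions");
        }
        if (empty) {
            // A later attempt came back empty: tolerate it and let the
            // next discovery attempt try again.
            return Collections.emptyList();
        }

        seeded = true;
        return found;
    }
}
```

With this shape, an empty result only fails the job until the first successful discovery; afterwards an empty or failed fetch is treated as "no new partitions" for that attempt.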
---