[
https://issues.apache.org/jira/browse/FLINK-8497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16463984#comment-16463984
]
ASF GitHub Bot commented on FLINK-8497:
---------------------------------------
Github user alexpf commented on a diff in the pull request:
https://github.com/apache/flink/pull/5929#discussion_r186110995
--- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09PartitionDiscoverer.java ---
@@ -74,7 +74,12 @@ protected void initializeConnections() {
 	try {
 		for (String topic : topics) {
-			for (PartitionInfo partitionInfo : kafkaConsumer.partitionsFor(topic)) {
+			List<PartitionInfo> topicPartitions = kafkaConsumer.partitionsFor(topic);
+			if (topicPartitions == null) {
+				throw new IllegalStateException("The topic " + topic + " does not exist");
--- End diff ---
I've taken a look into it. Ideally we have to make the go/fail decision one
level higher, at `FlinkKafkaConsumerBase`. It uses the partition discovery both
for the initial seed and later on at runtime, so that's the component that should
make the call. The problem I see here is that
`AbstractPartitionDiscoverer#discoverPartitions` doesn't just fetch the list of
available partitions, it also filters that list down to the partitions applicable
to the current task. So once we get the partition list back, we can't tell whether
the list is empty because nothing was found, or because the partitions have been
post-filtered.
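Just to illustrate the ambiguity, here is a rough paraphrase of the discovery path (not the actual Flink source; `isAssignedToThisSubtask` is a made-up stand-in for the real filtering logic, while `getAllPartitionsForTopics` is the method from the stack trace below):
{code:java}
// Rough paraphrase of the discovery path, for illustration only.
public List<KafkaTopicPartition> discoverPartitions() {
    // 1. Fetch every partition of the subscribed topics from the broker.
    List<KafkaTopicPartition> discovered = getAllPartitionsForTopics(topics);

    // 2. Post-filter: keep only the partitions this parallel subtask is responsible for.
    //    isAssignedToThisSubtask() is a hypothetical helper, not actual Flink API.
    discovered.removeIf(partition -> !isAssignedToThisSubtask(partition));

    // An empty result can mean "the topic has no partitions / does not exist"
    // OR "all partitions belong to other subtasks" -- the caller cannot tell which.
    return discovered;
}
{code}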
The only way to communicate this difference, as I see it now, is to
introduce a new, specific exception and catch it at the
`FlinkKafkaConsumerBase` level.
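For example (a sketch of the idea only; the exception name is made up and not an existing Flink class, and the surrounding code is abbreviated):
{code:java}
// Hypothetical exception type -- the name is illustrative only.
public class UnknownTopicException extends RuntimeException {
    public UnknownTopicException(String topic) {
        super("The topic " + topic + " does not exist");
    }
}

// In the discoverer (sketch), instead of letting the null slip through:
for (String topic : topics) {
    List<PartitionInfo> topicPartitions = kafkaConsumer.partitionsFor(topic);
    if (topicPartitions == null) {
        // Signal "topic not found" explicitly rather than failing later with an NPE
        // or returning an ambiguous empty list.
        throw new UnknownTopicException(topic);
    }
    // ... convert PartitionInfo into KafkaTopicPartition as before ...
}

// In FlinkKafkaConsumerBase#open (sketch) -- the place that can make the go/fail decision:
try {
    subscribedPartitions = partitionDiscoverer.discoverPartitions();
} catch (UnknownTopicException e) {
    // Here we know the topic is really missing, not just filtered away for this subtask.
    throw new IllegalStateException(e.getMessage(), e);
}
{code}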
> KafkaConsumer throws NPE if topic doesn't exist
> -----------------------------------------------
>
> Key: FLINK-8497
> URL: https://issues.apache.org/jira/browse/FLINK-8497
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.4.0
> Reporter: chris snow
> Assignee: Aleksei Lesnov
> Priority: Minor
>
> If I accidentally configure the Kafka consumer with a topic that doesn't exist:
> {code:java}
> FlinkKafkaConsumer011<ObjectNode> kafkaConsumer = new FlinkKafkaConsumer011<>(
>     "does_not_exist",
>     new JSONKeyValueDeserializationSchema(false),
>     properties
> );
> DataStream<ObjectNode> input = env.addSource(kafkaConsumer);{code}
> Flink throws an NPE:
> {code:java}
> Caused by: java.lang.NullPointerException
> at org.apache.flink.streaming.connectors.kafka.internal.Kafka09PartitionDiscoverer.getAllPartitionsForTopics(Kafka09PartitionDiscoverer.java:75)
> at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.discoverPartitions(AbstractPartitionDiscoverer.java:128)
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:415)
> at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:748){code}
> Maybe Flink could throw an IllegalStateException("Topic not found")?
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)