[
https://issues.apache.org/jira/browse/FLINK-8497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16462134#comment-16462134
]
ASF GitHub Bot commented on FLINK-8497:
---------------------------------------
Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/5929#discussion_r185732772
--- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09PartitionDiscoverer.java ---
@@ -74,7 +74,12 @@ protected void initializeConnections() {
try {
for (String topic : topics) {
- for (PartitionInfo partitionInfo : kafkaConsumer.partitionsFor(topic)) {
+ List<PartitionInfo> topicPartitions = kafkaConsumer.partitionsFor(topic);
+ if (topicPartitions == null) {
+ throw new IllegalStateException("The topic " + topic + " does not exist");
--- End diff --
I fear that this might be too aggressive.
IMO, it is fine if the user has, say, 3 topics and only one of them
doesn't exist.
What we should handle is the case where there are no partitions at all
across all provided topics.
For a single missing topic, perhaps we should only log that the topic has
no partitions?
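A minimal sketch of that lenient behaviour, not the actual connector code. The class name LenientPartitionDiscovery, the discover helper, and the "topic-partition" string format are made up for illustration, and a plain Function stands in for kafkaConsumer.partitionsFor so the example runs without Kafka on the classpath. It uses java.util.logging only to stay self-contained. A topic whose lookup returns null is logged and skipped, and the exception is thrown only when no partitions are found across all topics:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.logging.Logger;

// Hypothetical illustration class; it is not part of Flink.
final class LenientPartitionDiscovery {

    private static final Logger LOG = Logger.getLogger("LenientPartitionDiscovery");

    /**
     * Collects "topic-partition" names for all given topics.
     * partitionsFor is an assumed stand-in for kafkaConsumer.partitionsFor(topic);
     * it is expected to return null when the topic does not exist.
     */
    static List<String> discover(List<String> topics,
                                 Function<String, List<Integer>> partitionsFor) {
        List<String> partitions = new ArrayList<>();
        for (String topic : topics) {
            List<Integer> topicPartitions = partitionsFor.apply(topic);
            if (topicPartitions == null) {
                // One missing topic is tolerated: log it and keep going.
                LOG.warning("Topic " + topic + " has no partitions; skipping it");
                continue;
            }
            for (Integer partition : topicPartitions) {
                partitions.add(topic + "-" + partition);
            }
        }
        if (partitions.isEmpty()) {
            // Fail only when nothing at all was found across the provided topics.
            throw new IllegalStateException(
                "No partitions found for any of the topics " + topics);
        }
        return partitions;
    }

    public static void main(String[] args) {
        // Fake cluster metadata: only "orders" exists.
        Map<String, List<Integer>> metadata = new HashMap<>();
        metadata.put("orders", Arrays.asList(0, 1));

        // A mix of existing and missing topics succeeds with a warning.
        System.out.println(discover(Arrays.asList("orders", "does_not_exist"), metadata::get));

        // Only missing topics: discovery fails with IllegalStateException.
        try {
            discover(Arrays.asList("does_not_exist"), metadata::get);
        } catch (IllegalStateException e) {
            System.out.println("Failed as expected: " + e.getMessage());
        }
    }
}
```

Whether the all-topics-missing case should fail the job or also just be logged is a separate design decision for the PR; the sketch only shows where each check would sit.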
> KafkaConsumer throws NPE if topic doesn't exist
> -----------------------------------------------
>
> Key: FLINK-8497
> URL: https://issues.apache.org/jira/browse/FLINK-8497
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.4.0
> Reporter: chris snow
> Assignee: Aleksei Lesnov
> Priority: Minor
>
> If I accidentally configure the Kafka consumer with a topic that doesn't exist:
> {code:java}
> FlinkKafkaConsumer011 kafkaConsumer = new FlinkKafkaConsumer011(
> "does_not_exist",
> new JSONKeyValueDeserializationSchema(false),
> properties
> );
> DataStream<String> input = env.addSource(kafkaConsumer);{code}
> Flink throws an NPE:
> {code:java}
> Caused by: java.lang.NullPointerException
> at org.apache.flink.streaming.connectors.kafka.internal.Kafka09PartitionDiscoverer.getAllPartitionsForTopics(Kafka09PartitionDiscoverer.java:75)
> at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.discoverPartitions(AbstractPartitionDiscoverer.java:128)
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:415)
> at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:748){code}
> Maybe Flink could throw an IllegalStateException("Topic not found")?
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)