[ 
https://issues.apache.org/jira/browse/FLINK-8497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16451774#comment-16451774
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-8497:
--------------------------------------------

Hi [~alexey.lesnov], I gave you contributor permissions and assigned you to the 
task.

Please feel free to work on it. You can also pick up unassigned tasks that you 
would like to work on in the future.

> KafkaConsumer throws NPE if topic doesn't exist
> -----------------------------------------------
>
>                 Key: FLINK-8497
>                 URL: https://issues.apache.org/jira/browse/FLINK-8497
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.4.0
>            Reporter: chris snow
>            Priority: Minor
>
> If I accidentally set the kafka consumer with a topic that doesn't exist:
> {code:java}
> FlinkKafkaConsumer011 kafkaConsumer = new FlinkKafkaConsumer011(
>    "does_not_exist",
>     new JSONKeyValueDeserializationSchema(false),
>     properties
>     );
> DataStream<String> input = env.addSource(kafkaConsumer);{code}
> Flink throws NPE
> {code:java}
> Caused by: java.lang.NullPointerException
> at 
> org.apache.flink.streaming.connectors.kafka.internal.Kafka09PartitionDiscoverer.getAllPartitionsForTopics(Kafka09PartitionDiscoverer.java:75)
> at 
> org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.discoverPartitions(AbstractPartitionDiscoverer.java:128)
> at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:415)
> at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:748){code}
> Maybe Flink could through an IllegalStateException("Topic not found")?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to