[ https://issues.apache.org/jira/browse/FLINK-2325?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Robert Metzger resolved FLINK-2325.
-----------------------------------
       Resolution: Fixed
    Fix Version/s: 0.9.1
                   0.10

I just verified the behavior of the new {{FlinkKafkaSource}} for consuming from topics which don't exist. It works.
The log looks like this:
{code}
12:01:02,233 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer - Trying to get topic metadata from broker localhost:9092 in try 0/3
12:01:02,566 WARN  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer - Error while getting metadata from broker localhost:9092 to find partitions for thatDoesNotExist151
kafka.common.LeaderNotAvailableException
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
	at java.lang.Class.newInstance(Class.java:442)
	at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
	at kafka.common.ErrorMapping.exceptionFor(ErrorMapping.scala)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.getPartitionsForTopic(FlinkKafkaConsumer.java:625)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:280)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082.<init>(FlinkKafkaConsumer082.java:49)
	at com.dataartisans.KafkaStringReader.main(KafkaStringReader.java:38)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
12:01:02,569 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer - Trying to get topic metadata from broker localhost:9092 in try 1/3
12:01:02,574 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer - Topic thatDoesNotExist151 has 1 partitions
12:01:02,680 INFO  org.apache.flink.streaming.util.ClusterUtil - Running on mini cluster
{code}
There is a retry logic for getting the topic metadata. The first call to get the metadata fails, because the topic does not yet exist. In the second call, we see that Kafka has created the topic with one partition.
The consumer is then able to consume the messages from the topic.

> PersistentKafkaSource throws ArrayIndexOutOfBoundsException if reading from a
> topic that is created after starting the Source
> -----------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-2325
>                 URL: https://issues.apache.org/jira/browse/FLINK-2325
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 0.9
>            Reporter: Rico Bergmann
>            Assignee: Robert Metzger
>             Fix For: 0.10, 0.9.1
>
>
> I'm creating a PersistentKafkaSource reading from a specified topic from
> Kafka, that is at the time the PersistentKafkaSource is started (via open(.))
> not yet present. That's why the number of partitions, that is read in the
> open(.) function is 0, which leads to arrays of length 0 (lastOffsets and
> committedOffsets).
> May be it is better to check, whether numberOfPartitions returns 0 and if so,
> to take the default number of partitions from Kafka config?
> Stacktrace:
> java.lang.ArrayIndexOutOfBoundsException: 0
> 	at org.apache.flink.streaming.connectors.kafka.api.persistent.PersistentKafkaSource.run(PersistentKafkaSource.java:180)
> 	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:49)
> 	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.invoke(SourceStreamTask.java:55)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> 	at java.lang.Thread.run(Thread.java:745)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
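
The retry behaviour described in the resolution comment (a first metadata request that fails while the topic is being created, followed by a successful second request) can be sketched roughly as follows. This is a minimal, self-contained illustration and not the connector's actual code: the class {{MetadataRetrySketch}}, the method {{getPartitionsWithRetry}} and the {{pauseMillis}} parameter are hypothetical names, and the lookup is passed in as a plain {{Callable}} instead of a real Kafka request. The sketch also treats an empty partition list as a failure, which is one possible way to avoid the zero-length arrays mentioned in the original report.

```java
import java.util.List;
import java.util.concurrent.Callable;

// Hypothetical sketch of a bounded retry around a topic-metadata lookup.
// Nothing here is taken from the Flink or Kafka code base.
final class MetadataRetrySketch {

    private MetadataRetrySketch() {}

    /**
     * Calls {@code lookup} up to {@code maxTries} times (attempts are numbered
     * from 0, as in the log above) and returns the first non-empty result.
     */
    static List<Integer> getPartitionsWithRetry(
            Callable<List<Integer>> lookup, int maxTries, long pauseMillis) throws Exception {
        Exception lastError = null;
        for (int attempt = 0; attempt < maxTries; attempt++) {
            try {
                List<Integer> partitions = lookup.call();
                // An empty result is treated like a failed attempt, so callers
                // never size their offset arrays from zero partitions.
                if (partitions != null && !partitions.isEmpty()) {
                    return partitions;
                }
            } catch (InterruptedException e) {
                // Preserve the interrupt instead of retrying.
                Thread.currentThread().interrupt();
                throw e;
            } catch (Exception e) {
                // Remember the failure (e.g. a "leader not available" error) and retry.
                lastError = e;
            }
            if (attempt < maxTries - 1) {
                Thread.sleep(pauseMillis);
            }
        }
        throw new IllegalStateException(
                "Could not find partitions after " + maxTries + " tries", lastError);
    }
}
```

With a lookup that fails once and then reports a single partition, {{getPartitionsWithRetry(lookup, 3, 0L)}} returns after the second call, which mirrors the "try 0/3" and "try 1/3" entries in the log. If every attempt fails or returns no partitions, the sketch throws instead of continuing with an empty list.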