[ 
https://issues.apache.org/jira/browse/FLINK-2325?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger resolved FLINK-2325.
-----------------------------------
       Resolution: Fixed
    Fix Version/s: 0.9.1
                   0.10

I just verified the behavior of the new {{FlinkKafkaSource}} for consuming from 
topics which don't exist.
It works. The log looks like this:

{code}
12:01:02,233 INFO  
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer  - Trying to get 
topic metadata from broker localhost:9092 in try 0/3
12:01:02,566 WARN  
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer  - Error while 
getting metadata from broker localhost:9092 to find partitions for 
thatDoesNotExist151
kafka.common.LeaderNotAvailableException
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
        at java.lang.Class.newInstance(Class.java:442)
        at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
        at kafka.common.ErrorMapping.exceptionFor(ErrorMapping.scala)
        at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.getPartitionsForTopic(FlinkKafkaConsumer.java:625)
        at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:280)
        at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082.<init>(FlinkKafkaConsumer082.java:49)
        at com.dataartisans.KafkaStringReader.main(KafkaStringReader.java:38)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
12:01:02,569 INFO  
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer  - Trying to get 
topic metadata from broker localhost:9092 in try 1/3
12:01:02,574 INFO  
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer  - Topic 
thatDoesNotExist151 has 1 partitions
12:01:02,680 INFO  org.apache.flink.streaming.util.ClusterUtil                  
 - Running on mini cluster
{code}

There is a retry logic for getting the topic metadata. The first call to get 
the metadata fails, because the topic does not yet exist. In the second call, 
we see that Kafka has created the topic with one partition.
The consumer is then able to consume the messages from the topic.

> PersistentKafkaSource throws ArrayIndexOutOfBoundsException if reading from a 
> topic that is created after starting the Source
> -----------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-2325
>                 URL: https://issues.apache.org/jira/browse/FLINK-2325
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 0.9
>            Reporter: Rico Bergmann
>            Assignee: Robert Metzger
>             Fix For: 0.10, 0.9.1
>
>
> I'm creating a PersistentKafkaSource reading from a specified topic from 
> Kafka, that is at the time the PersistentKafkaSource is started (via open(.)) 
> not yet present. That's why the number of partitions, that is read in the 
> open(.) function is 0, which leads to arrays of length 0 (lastOffsets and 
> committedOffsets).
> May be it is better to check, whether numberOfPartitions returns 0 and if so, 
> to take the default number of partitions from Kafka config?
> Stacktrace:
> java.lang.ArrayIndexOutOfBoundsException: 0
>       at 
> org.apache.flink.streaming.connectors.kafka.api.persistent.PersistentKafkaSource.run(PersistentKafkaSource.java:180)
>       at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:49)
>       at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.invoke(SourceStreamTask.java:55)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>       at java.lang.Thread.run(Thread.java:745)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to