Jiangfei Liu created FLINK-28156:
------------------------------------

             Summary: Flink KafkaSource with Bounded Throw Exception
                 Key: FLINK-28156
                 URL: https://issues.apache.org/jira/browse/FLINK-28156
             Project: Flink
          Issue Type: Bug
          Components: API / DataStream, Connectors / Kafka
    Affects Versions: 1.14.3
            Reporter: Jiangfei Liu


I want to use KafkaSource consume topic between commited offset and last-offset,

but throw a exception

 KafkaSource.<String>builder()
.setBootstrapServers("10.18.34.43:9092,10.18.34.44:9092,10.18.34.45:9092")
.setTopics(topic)
.setGroupId(groupId)
// .setStartingOffsets(OffsetsInitializer.timestamp(1655717760000L))
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
.setValueOnlyDeserializer(new SimpleStringSchema())
.setBounded(OffsetsInitializer.latest())
.build();

 

Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received 
unexpected exception while polling the records
    at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:146)
    at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:101)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    ... 1 more
Caused by: java.lang.IllegalStateException: Consumer is not subscribed to any 
topics or assigned any partitions
    at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1223)
    at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
    at 
org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.fetch(KafkaPartitionSplitReader.java:99)
    at 
org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:56)
    at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:138)
    ... 6 more



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to