[ 
https://issues.apache.org/jira/browse/FLINK-32416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18052561#comment-18052561
 ] 

wangkang commented on FLINK-32416:
----------------------------------

Recently, while verifying this feature, I found that after switching a job 
from the Kafka source to the dynamic Kafka source, the job cannot recover from 
its existing checkpoint state, likely because the checkpointed split state is 
incompatible.

 
Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to deserialize value
at org.apache.flink.streaming.api.operators.util.SimpleVersionedListState$DeserializingIterator.next(SimpleVersionedListState.java:140)
at java.util.Iterator.forEachRemaining(Iterator.java:116)
at org.apache.flink.util.CollectionUtil.iterableToList(CollectionUtil.java:119)
at org.apache.flink.streaming.api.operators.SourceOperator.open(SourceOperator.java:358)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException
at java.io.DataInputStream.readFully(DataInputStream.java:197)
at java.io.DataInputStream.readLong(DataInputStream.java:416)
at org.apache.flink.connector.kafka.source.split.KafkaPartitionSplitSerializer.deserialize(KafkaPartitionSplitSerializer.java:67)
at org.apache.flink.connector.kafka.dynamic.source.split.DynamicKafkaSourceSplitSerializer.deserialize(DynamicKafkaSourceSplitSerializer.java:71)
at org.apache.flink.connector.kafka.dynamic.source.split.DynamicKafkaSourceSplitSerializer.deserialize(DynamicKafkaSourceSplitSerializer.java:35)
at org.apache.flink.core.io.SimpleVersionedSerialization.readVersionAndDeSerialize(SimpleVersionedSerialization.java:227)
at org.apache.flink.streaming.api.operators.util.SimpleVersionedListState$DeserializingIterator.next(SimpleVersionedListState.java:138)
 
If this incompatibility is confirmed, we should add compatibility notes to the 
documentation.
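
A minimal Java sketch of how a layout mismatch of this kind can end in an 
EOFException inside DataInputStream.readLong, as in the trace above. The two 
byte layouts, the class name SplitFormatMismatch, and its methods are 
hypothetical and only illustrate the failure mode; they are not the actual 
Flink serializer formats.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;

public class SplitFormatMismatch {

    // Hypothetical "plain" layout: topic, partition, starting offset.
    public static byte[] writePlainSplit(String topic, int partition, long offset)
            throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(topic);
            out.writeInt(partition);
            out.writeLong(offset);
        }
        return bytes.toByteArray();
    }

    // Hypothetical "wrapped" layout: a cluster id followed by the plain layout.
    public static byte[] writeWrappedSplit(
            String clusterId, String topic, int partition, long offset) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(clusterId);
            out.write(writePlainSplit(topic, partition, offset));
        }
        return bytes.toByteArray();
    }

    // Reader that always expects the wrapped layout.
    public static String readWrappedSplit(byte[] data) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            String clusterId = in.readUTF();
            String topic = in.readUTF();
            int partition = in.readInt();
            long offset = in.readLong();
            return clusterId + "/" + topic + "-" + partition + "@" + offset;
        }
    }

    public static void main(String[] args) throws IOException {
        // State written and read with the same layout round-trips.
        byte[] wrapped = writeWrappedSplit("c1", "orders", 3, 42L);
        System.out.println("wrapped state: " + readWrappedSplit(wrapped));

        // State written in the plain layout but read as wrapped: the reader
        // misinterprets the fields and runs out of bytes before the final long.
        byte[] plain = writePlainSplit("orders", 3, 42L);
        try {
            readWrappedSplit(plain);
        } catch (EOFException e) {
            System.out.println("plain state read as wrapped: " + e);
        }
    }
}
```

In this sketch the reader consumes the topic as if it were the cluster id, so 
every later field is shifted and the last readLong finds fewer than 8 bytes. 
Other inputs could fail differently or even decode to wrong values silently, 
which is why a documented compatibility note matters.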

> Initial DynamicKafkaSource Implementation 
> ------------------------------------------
>
>                 Key: FLINK-32416
>                 URL: https://issues.apache.org/jira/browse/FLINK-32416
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Connectors / Kafka
>    Affects Versions: kafka-3.1.0
>            Reporter: Mason Chen
>            Assignee: Mason Chen
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: kafka-3.1.0
>
>
> Implementation that supports unbounded and bounded modes, with a default 
> implementation of KafkaMetadataService.


