[ https://issues.apache.org/jira/browse/FLINK-32416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18052561#comment-18052561 ]

wangkang edited comment on FLINK-32416 at 1/17/26 12:18 PM:
------------------------------------------------------------

Recently, while verifying this feature, I found that when a job is switched from 
the regular Kafka source to the dynamic Kafka source, it cannot recover from its 
checkpoint state, likely because the state formats are incompatible.

If this issue does exist, we should add compatibility instructions to the 
documentation.

 

The compatibility error is as follows:
Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to deserialize value
	at org.apache.flink.streaming.api.operators.util.SimpleVersionedListState$DeserializingIterator.next(SimpleVersionedListState.java:140)
	at java.util.Iterator.forEachRemaining(Iterator.java:116)
	at org.apache.flink.util.CollectionUtil.iterableToList(CollectionUtil.java:119)
	at org.apache.flink.streaming.api.operators.SourceOperator.open(SourceOperator.java:358)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException
	at java.io.DataInputStream.readFully(DataInputStream.java:197)
	at java.io.DataInputStream.readLong(DataInputStream.java:416)
	at org.apache.flink.connector.kafka.source.split.KafkaPartitionSplitSerializer.deserialize(KafkaPartitionSplitSerializer.java:67)
	at org.apache.flink.connector.kafka.dynamic.source.split.DynamicKafkaSourceSplitSerializer.deserialize(DynamicKafkaSourceSplitSerializer.java:71)
	at org.apache.flink.connector.kafka.dynamic.source.split.DynamicKafkaSourceSplitSerializer.deserialize(DynamicKafkaSourceSplitSerializer.java:35)
	at org.apache.flink.core.io.SimpleVersionedSerialization.readVersionAndDeSerialize(SimpleVersionedSerialization.java:227)
	at org.apache.flink.streaming.api.operators.util.SimpleVersionedListState$DeserializingIterator.next(SimpleVersionedListState.java:138)
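To illustrate why this kind of split-serializer mismatch surfaces as an EOFException rather than a clearer error, here is a minimal, self-contained sketch. The wire formats below are hypothetical simplifications (they are not the actual KafkaPartitionSplit or DynamicKafkaSourceSplit formats): the point is only that a reader which expects extra fields runs off the end of bytes written by the older serializer, exactly as readLong does in the trace above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;

public class SplitFormatMismatch {

    // Hypothetical "old" format: topic, partition, starting offset.
    static byte[] serializePlainSplit(String topic, int partition, long startingOffset)
            throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(baos)) {
            out.writeUTF(topic);
            out.writeInt(partition);
            out.writeLong(startingOffset);
        }
        return baos.toByteArray();
    }

    // Hypothetical "new" format: same fields plus one extra long.
    // Fed bytes from the old format, the final readLong() has nothing
    // left to read and DataInputStream throws EOFException.
    static void deserializeExtendedSplit(byte[] serialized) throws IOException {
        try (DataInputStream in =
                new DataInputStream(new ByteArrayInputStream(serialized))) {
            String topic = in.readUTF();
            int partition = in.readInt();
            long startingOffset = in.readLong();
            long stoppingOffset = in.readLong(); // not present in old bytes
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] oldCheckpointBytes = serializePlainSplit("orders", 0, 42L);
        try {
            deserializeExtendedSplit(oldCheckpointBytes);
            System.out.println("restored");
        } catch (EOFException e) {
            System.out.println("EOFException: state format mismatch");
        }
    }
}
```

This is why the failure only appears at restore time: serialization of new state succeeds, and the mismatch is detected only when old checkpoint bytes are fed to the new deserializer.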



> Initial DynamicKafkaSource Implementation 
> ------------------------------------------
>
>                 Key: FLINK-32416
>                 URL: https://issues.apache.org/jira/browse/FLINK-32416
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Connectors / Kafka
>    Affects Versions: kafka-3.1.0
>            Reporter: Mason Chen
>            Assignee: Mason Chen
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: kafka-3.1.0
>
>
> Implementation that supports unbounded and bounded modes, with a default 
> implementation of KafkaMetadataService.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
