[ https://issues.apache.org/jira/browse/FLINK-32416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18052561#comment-18052561 ]
wangkang commented on FLINK-32416:
----------------------------------
Recently, while verifying this feature, I found that after switching a job
from the Kafka source to the dynamic Kafka source, the job cannot recover from
its existing checkpoint state, likely because the serialized split state is
incompatible between the two sources:
Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to deserialize value
at org.apache.flink.streaming.api.operators.util.SimpleVersionedListState$DeserializingIterator.next(SimpleVersionedListState.java:140)
at java.util.Iterator.forEachRemaining(Iterator.java:116)
at org.apache.flink.util.CollectionUtil.iterableToList(CollectionUtil.java:119)
at org.apache.flink.streaming.api.operators.SourceOperator.open(SourceOperator.java:358)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException
at java.io.DataInputStream.readFully(DataInputStream.java:197)
at java.io.DataInputStream.readLong(DataInputStream.java:416)
at org.apache.flink.connector.kafka.source.split.KafkaPartitionSplitSerializer.deserialize(KafkaPartitionSplitSerializer.java:67)
at org.apache.flink.connector.kafka.dynamic.source.split.DynamicKafkaSourceSplitSerializer.deserialize(DynamicKafkaSourceSplitSerializer.java:71)
at org.apache.flink.connector.kafka.dynamic.source.split.DynamicKafkaSourceSplitSerializer.deserialize(DynamicKafkaSourceSplitSerializer.java:35)
at org.apache.flink.core.io.SimpleVersionedSerialization.readVersionAndDeSerialize(SimpleVersionedSerialization.java:227)
at org.apache.flink.streaming.api.operators.util.SimpleVersionedListState$DeserializingIterator.next(SimpleVersionedListState.java:138)
If this incompatibility is confirmed, we should add compatibility instructions
to the documentation.
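To illustrate how a layout mismatch of this kind can end in an EOFException
inside readLong, here is a minimal standalone sketch. It does not use the
connector classes. The class SplitFormatMismatchSketch, its methods, and both
byte layouts (plain split: topic, partition, two offsets; dynamic split: cluster
id and nested version in front of the plain split) are assumptions made for
illustration, not the verified formats of KafkaPartitionSplitSerializer or
DynamicKafkaSourceSplitSerializer.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;

/** Standalone illustration; the layouts below are assumed, not taken from the connector. */
public class SplitFormatMismatchSketch {

    // Assumed "plain" layout: topic, partition, starting offset, stopping offset.
    static byte[] writePlainSplit(String topic, int partition, long start, long stop)
            throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(topic);
            out.writeInt(partition);
            out.writeLong(start);
            out.writeLong(stop);
        }
        return bytes.toByteArray();
    }

    // Reads the assumed plain layout from the current stream position.
    static String readPlainSplit(DataInputStream in) throws IOException {
        String topic = in.readUTF();
        int partition = in.readInt();
        long start = in.readLong();
        long stop = in.readLong();
        return topic + "-" + partition + "@" + start + ".." + stop;
    }

    // Assumed "dynamic" layout: cluster id, nested serializer version, then a plain split.
    static String readAsDynamicSplit(byte[] serialized) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
            String clusterId = in.readUTF(); // consumes what was written as the topic
            in.readInt();                    // consumes what was written as the partition
            return clusterId + "/" + readPlainSplit(in); // too few bytes remain for this
        }
    }

    public static void main(String[] args) throws IOException {
        // State written in the plain layout, then restored with the dynamic reader.
        byte[] oldState = writePlainSplit("topic", 0, 42L, Long.MIN_VALUE);
        try {
            System.out.println(readAsDynamicSplit(oldState));
        } catch (EOFException e) {
            System.out.println("EOFException: state is shorter than the dynamic layout expects");
        }
    }
}
```

In this sketch the dynamic reader consumes the topic and partition as its own
prefix, so the nested read starts in the middle of the offsets and runs out of
bytes on its second readLong. Whether the real serializers fail for exactly
this reason would need to be checked against the connector source.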
> Initial DynamicKafkaSource Implementation
> ------------------------------------------
>
> Key: FLINK-32416
> URL: https://issues.apache.org/jira/browse/FLINK-32416
> Project: Flink
> Issue Type: Sub-task
> Components: Connectors / Kafka
> Affects Versions: kafka-3.1.0
> Reporter: Mason Chen
> Assignee: Mason Chen
> Priority: Major
> Labels: pull-request-available
> Fix For: kafka-3.1.0
>
>
> Implementation that supports unbounded and bounded modes, with a default
> implementation of KafkaMetadataService.