[ https://issues.apache.org/jira/browse/FLINK-36891?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Leonard Xu reassigned FLINK-36891:
----------------------------------
Assignee: Sergei Morozov
> MySQL CDC connector produces corrupted state in case of serialization failure
> -----------------------------------------------------------------------------
>
> Key: FLINK-36891
> URL: https://issues.apache.org/jira/browse/FLINK-36891
> Project: Flink
> Issue Type: Bug
> Components: Flink CDC
> Affects Versions: cdc-3.2.1
> Reporter: Sergei Morozov
> Assignee: Sergei Morozov
> Priority: Major
> Labels: pull-request-available
>
> {{PendingSplitsStateSerializer}} keeps a reusable {{DataOutputSerializer}}
> instance in {{SERIALIZER_CACHE}}. If a call to {{serialize()}} fails with an
> exception, the partial output of that call is left in the cached buffer, and
> the value returned by the next call contains those leftover bytes followed by
> the new output.
> As a result, the checkpointed state is corrupted and cannot be deserialized.
> Example serialization failure:
> {code:java}
> org.apache.flink.runtime.checkpoint.CheckpointException: Trigger checkpoint failure.
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$getCheckpointException$19(CheckpointCoordinator.java:2162)
>     at java.base/java.util.Optional.orElseGet(Optional.java:364)
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.getCheckpointException(CheckpointCoordinator.java:2161)
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.onTriggerFailure(CheckpointCoordinator.java:930)
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.onTriggerFailure(CheckpointCoordinator.java:908)
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$startTriggeringCheckpoint$7(CheckpointCoordinator.java:636)
>     at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930)
>     at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907)
>     at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
>     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>     at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
>     at java.base/java.lang.Thread.run(Thread.java:831)
> Caused by: java.util.ConcurrentModificationException: null
>     at java.base/java.util.HashMap$HashIterator.nextNode(HashMap.java:1584)
>     at java.base/java.util.HashMap$EntryIterator.next(HashMap.java:1617)
>     at java.base/java.util.HashMap$EntryIterator.next(HashMap.java:1615)
>     at org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplitSerializer.writeTableSchemas(MySqlSplitSerializer.java:194)
>     at org.apache.flink.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer.serializeSnapshotPendingSplitsState(PendingSplitsStateSerializer.java:161)
>     at org.apache.flink.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer.serializeHybridPendingSplitsState(PendingSplitsStateSerializer.java:178)
>     at org.apache.flink.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer.serialize(PendingSplitsStateSerializer.java:84)
>     at org.apache.flink.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer.serialize(PendingSplitsStateSerializer.java:45)
>     at org.apache.flink.runtime.source.coordinator.SourceCoordinator.writeCheckpointBytes(SourceCoordinator.java:462)
>     at org.apache.flink.runtime.source.coordinator.SourceCoordinator.toBytes(SourceCoordinator.java:447)
>     at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$checkpointCoordinator$6(SourceCoordinator.java:321)
>     at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$9(SourceCoordinator.java:406)
>     at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40)
>     ... 6 common frames omitted
> {code}
>
> Example deserialization failure:
> {code:java}
> java.lang.IllegalArgumentException: Invalid identifier:
>     at org.apache.flink.cdc.connectors.shaded.io.debezium.relational.TableIdParser$TableIdTokenizer.tokenize(TableIdParser.java:75)
>     at org.apache.flink.cdc.connectors.shaded.io.debezium.text.TokenStream.start(TokenStream.java:446)
>     at org.apache.flink.cdc.connectors.shaded.io.debezium.relational.TableIdParser.parse(TableIdParser.java:31)
>     at org.apache.flink.cdc.connectors.shaded.io.debezium.relational.TableId.parseParts(TableId.java:51)
>     at org.apache.flink.cdc.connectors.shaded.io.debezium.relational.TableId.parse(TableId.java:40)
>     at org.apache.flink.cdc.connectors.shaded.io.debezium.relational.TableId.parse(TableId.java:27)
>     at org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplitSerializer.readTableSchemas(MySqlSplitSerializer.java:210)
>     at org.apache.flink.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer.deserializeSnapshotPendingSplitsState(PendingSplitsStateSerializer.java:283)
>     at org.apache.flink.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer.deserializeHybridPendingSplitsState(PendingSplitsStateSerializer.java:318)
>     at org.apache.flink.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer.deserializePendingSplitsState(PendingSplitsStateSerializer.java:139)
>     at org.apache.flink.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer.deserialize(PendingSplitsStateSerializer.java:108)
>     at org.apache.flink.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer.deserialize(PendingSplitsStateSerializer.java:45)
>     at org.apache.flink.runtime.source.coordinator.SourceCoordinator.deserializeCheckpoint(SourceCoordinator.java:489)
>     at org.apache.flink.runtime.source.coordinator.SourceCoordinator.resetToCheckpoint(SourceCoordinator.java:384)
>     at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.resetAndStart(RecreateOnResetOperatorCoordinator.java:390)
>     at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.lambda$resetToCheckpoint$6(RecreateOnResetOperatorCoordinator.java:144)
>     at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
>     at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
>     at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>     at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2143)
>     at org.apache.flink.runtime.operators.coordination.ComponentClosingUtils.lambda$closeAsyncWithTimeout$0(ComponentClosingUtils.java:77)
>     at java.base/java.lang.Thread.run(Thread.java:831)
> {code}
>
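
The failure mode described in the issue can be illustrated with a minimal, self-contained sketch. It does not use the connector's code: a {{ByteArrayOutputStream}} stands in for {{DataOutputSerializer}}, and the class {{CachedBufferSketch}}, the field {{BUFFER_CACHE}} and all method names are hypothetical. The sketch assumes the cached buffer is cleared only at the end of a successful call, and shows one possible remedy (clearing it in a {{finally}} block), which is not necessarily the approach taken in the linked pull request.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;

// Hypothetical stand-in for a serializer that reuses a per-thread buffer.
final class CachedBufferSketch {

    // Plays the role of SERIALIZER_CACHE: one reusable buffer per thread.
    private static final ThreadLocal<ByteArrayOutputStream> BUFFER_CACHE =
            ThreadLocal.withInitial(ByteArrayOutputStream::new);

    // Assumed buggy pattern: the buffer is reset only after a successful write.
    static byte[] serializeUnsafe(List<String> values) {
        ByteArrayOutputStream buffer = BUFFER_CACHE.get();
        writeValues(new DataOutputStream(buffer), values);
        byte[] result = buffer.toByteArray();
        buffer.reset(); // never reached if writeValues throws
        return result;
    }

    // Possible remedy: reset the buffer whether or not the write succeeded.
    static byte[] serializeSafe(List<String> values) {
        ByteArrayOutputStream buffer = BUFFER_CACHE.get();
        try {
            writeValues(new DataOutputStream(buffer), values);
            return buffer.toByteArray();
        } finally {
            buffer.reset();
        }
    }

    // Writes a count followed by each value; a null element simulates a
    // failure in the middle of serialization, after some bytes were written.
    private static void writeValues(DataOutputStream out, List<String> values) {
        try {
            out.writeInt(values.size());
            for (String value : values) {
                if (value == null) {
                    throw new IllegalStateException("simulated mid-serialization failure");
                }
                out.writeUTF(value);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In this sketch, calling {{serializeUnsafe}} with a list that contains {{null}} and then again with a valid list returns the leftover bytes of the failed call followed by the new payload, which a reader would then misparse. {{serializeSafe}} returns only the new payload because the buffer is always reset.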
--
This message was sent by Atlassian Jira
(v8.20.10#820010)