[ https://issues.apache.org/jira/browse/FLINK-36891?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Leonard Xu resolved FLINK-36891.
--------------------------------
    Resolution: Fixed

Fixed via master(3.3-SNAPSHOT): b50d1728e083c2364ae104c9050a4599d1301ab9

> MySQL CDC connector produces corrupted state in case of serialization failure
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-36891
>                 URL: https://issues.apache.org/jira/browse/FLINK-36891
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>    Affects Versions: cdc-3.2.1
>            Reporter: Sergei Morozov
>            Assignee: Sergei Morozov
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: cdc-3.3.0
>
>
> {{PendingSplitsStateSerializer}} maintains a {{DataOutputSerializer}} instance 
> stored in {{SERIALIZER_CACHE}}. If a call to {{serialize()}} fails with 
> an exception, the cached instance keeps the bytes written so far, so the 
> value returned by the next call also contains the partial output of the 
> failed call.
> The checkpointed state is therefore corrupted and cannot be deserialized.
> Example serialization failure:
> {code:java}
> org.apache.flink.runtime.checkpoint.CheckpointException: Trigger checkpoint failure.
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$getCheckpointException$19(CheckpointCoordinator.java:2162)
>     at java.base/java.util.Optional.orElseGet(Optional.java:364)
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.getCheckpointException(CheckpointCoordinator.java:2161)
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.onTriggerFailure(CheckpointCoordinator.java:930)
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.onTriggerFailure(CheckpointCoordinator.java:908)
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$startTriggeringCheckpoint$7(CheckpointCoordinator.java:636)
>     at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930)
>     at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907)
>     at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
>     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>     at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
>     at java.base/java.lang.Thread.run(Thread.java:831)
> Caused by: java.util.ConcurrentModificationException: null
>     at java.base/java.util.HashMap$HashIterator.nextNode(HashMap.java:1584)
>     at java.base/java.util.HashMap$EntryIterator.next(HashMap.java:1617)
>     at java.base/java.util.HashMap$EntryIterator.next(HashMap.java:1615)
>     at org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplitSerializer.writeTableSchemas(MySqlSplitSerializer.java:194)
>     at org.apache.flink.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer.serializeSnapshotPendingSplitsState(PendingSplitsStateSerializer.java:161)
>     at org.apache.flink.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer.serializeHybridPendingSplitsState(PendingSplitsStateSerializer.java:178)
>     at org.apache.flink.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer.serialize(PendingSplitsStateSerializer.java:84)
>     at org.apache.flink.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer.serialize(PendingSplitsStateSerializer.java:45)
>     at org.apache.flink.runtime.source.coordinator.SourceCoordinator.writeCheckpointBytes(SourceCoordinator.java:462)
>     at org.apache.flink.runtime.source.coordinator.SourceCoordinator.toBytes(SourceCoordinator.java:447)
>     at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$checkpointCoordinator$6(SourceCoordinator.java:321)
>     at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$9(SourceCoordinator.java:406)
>     at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40)
>     ... 6 common frames omitted
> {code}
>  
> Example deserialization failure:
> {code:java}
> java.lang.IllegalArgumentException: Invalid identifier: 
>       at org.apache.flink.cdc.connectors.shaded.io.debezium.relational.TableIdParser$TableIdTokenizer.tokenize(TableIdParser.java:75)
>       at org.apache.flink.cdc.connectors.shaded.io.debezium.text.TokenStream.start(TokenStream.java:446)
>       at org.apache.flink.cdc.connectors.shaded.io.debezium.relational.TableIdParser.parse(TableIdParser.java:31)
>       at org.apache.flink.cdc.connectors.shaded.io.debezium.relational.TableId.parseParts(TableId.java:51)
>       at org.apache.flink.cdc.connectors.shaded.io.debezium.relational.TableId.parse(TableId.java:40)
>       at org.apache.flink.cdc.connectors.shaded.io.debezium.relational.TableId.parse(TableId.java:27)
>       at org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplitSerializer.readTableSchemas(MySqlSplitSerializer.java:210)
>       at org.apache.flink.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer.deserializeSnapshotPendingSplitsState(PendingSplitsStateSerializer.java:283)
>       at org.apache.flink.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer.deserializeHybridPendingSplitsState(PendingSplitsStateSerializer.java:318)
>       at org.apache.flink.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer.deserializePendingSplitsState(PendingSplitsStateSerializer.java:139)
>       at org.apache.flink.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer.deserialize(PendingSplitsStateSerializer.java:108)
>       at org.apache.flink.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer.deserialize(PendingSplitsStateSerializer.java:45)
>       at org.apache.flink.runtime.source.coordinator.SourceCoordinator.deserializeCheckpoint(SourceCoordinator.java:489)
>       at org.apache.flink.runtime.source.coordinator.SourceCoordinator.resetToCheckpoint(SourceCoordinator.java:384)
>       at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.resetAndStart(RecreateOnResetOperatorCoordinator.java:390)
>       at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.lambda$resetToCheckpoint$6(RecreateOnResetOperatorCoordinator.java:144)
>       at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
>       at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
>       at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>       at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2143)
>       at org.apache.flink.runtime.operators.coordination.ComponentClosingUtils.lambda$closeAsyncWithTimeout$0(ComponentClosingUtils.java:77)
>       at java.base/java.lang.Thread.run(Thread.java:831)
> {code}
>  
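
The failure mode described in the issue (a cached output buffer that still holds partial bytes after a failed serialize() call) can be illustrated with a minimal, JDK-only sketch. This is not the connector's code and not the actual fix from the commit above. The class name CachedBufferSketch, the methods serializeBuggy and serializeSafe, and the use of a ThreadLocal ByteArrayOutputStream as a stand-in for the cached DataOutputSerializer are all assumptions made for illustration. The failure is simulated with an IllegalStateException rather than the ConcurrentModificationException seen in the trace.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;

public class CachedBufferSketch {

    // Stand-in for the cached serializer: one reusable buffer per thread.
    private static final ThreadLocal<ByteArrayOutputStream> BUFFER_CACHE =
            ThreadLocal.withInitial(ByteArrayOutputStream::new);

    // Hypothetical payload writer: fails midway when it meets a null element.
    private static void writeItems(ByteArrayOutputStream buffer, List<String> items) {
        try {
            DataOutputStream out = new DataOutputStream(buffer);
            out.writeInt(items.size());
            for (String item : items) {
                if (item == null) {
                    throw new IllegalStateException("simulated serialization failure");
                }
                out.writeUTF(item);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Buggy variant: the buffer is reset only after a successful call.
    static byte[] serializeBuggy(List<String> items) {
        ByteArrayOutputStream buffer = BUFFER_CACHE.get();
        writeItems(buffer, items);
        byte[] result = buffer.toByteArray();
        buffer.reset(); // never reached when writeItems throws
        return result;
    }

    // Safer variant: the buffer is reset in finally, so a failure leaves nothing behind.
    static byte[] serializeSafe(List<String> items) {
        ByteArrayOutputStream buffer = BUFFER_CACHE.get();
        try {
            writeItems(buffer, items);
            return buffer.toByteArray();
        } finally {
            buffer.reset();
        }
    }

    public static void main(String[] args) {
        List<String> good = Arrays.asList("db.table_a");
        List<String> bad = Arrays.asList("db.table_a", null);

        byte[] expected = serializeSafe(good);

        try {
            serializeBuggy(bad);
        } catch (IllegalStateException e) {
            // The cached buffer now holds the partial output of the failed call.
        }
        byte[] corrupted = serializeBuggy(good);
        // prints "buggy output matches expected: false"
        System.out.println("buggy output matches expected: " + Arrays.equals(expected, corrupted));

        try {
            serializeSafe(bad);
        } catch (IllegalStateException e) {
            // The finally block has already discarded the partial output.
        }
        byte[] clean = serializeSafe(good);
        // prints "safe output matches expected: true"
        System.out.println("safe output matches expected: " + Arrays.equals(expected, clean));
    }
}
```

In the buggy variant, the bytes returned after a failure start with the leftover prefix of the failed call, which is consistent with a reader later hitting an unparsable value such as the empty table identifier in the deserialization trace. Resetting in finally is only one option; clearing the buffer at the start of every call, or not caching it at all, would avoid the same problem.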



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
