Sergei Morozov created FLINK-36891:
--------------------------------------

             Summary: MySQL CDC connector produces corrupted state in case of serialization failure
                 Key: FLINK-36891
                 URL: https://issues.apache.org/jira/browse/FLINK-36891
             Project: Flink
          Issue Type: Bug
          Components: Flink CDC
    Affects Versions: cdc-3.2.1
            Reporter: Sergei Morozov


{{PendingSplitsStateSerializer}} reuses a {{DataOutputSerializer}} instance 
stored in {{SERIALIZER_CACHE}}. If a call to {{serialize()}} fails with an 
exception, the value returned by the next call also contains the partial 
output of the failed serialization.

As a result, this state is corrupted and cannot be deserialized.
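
The failure mode can be reproduced outside Flink with a minimal sketch. This is not the connector's code: {{ReusedBufferDemo}}, {{BUFFER_CACHE}}, {{serializeLeaky()}} and {{serializeSafe()}} are hypothetical names, and a plain {{ByteArrayOutputStream}} stands in for {{DataOutputSerializer}}. It assumes the cached buffer is cleared only on the success path, and shows one possible remedy (clearing it in a {{finally}} block).

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;

class ReusedBufferDemo {
    // Hypothetical stand-in for SERIALIZER_CACHE: one reusable buffer per thread.
    private static final ThreadLocal<ByteArrayOutputStream> BUFFER_CACHE =
            ThreadLocal.withInitial(ByteArrayOutputStream::new);

    // Writes the element count followed by each identifier.
    // A null element makes writeUTF throw a NullPointerException midway.
    private static void write(ByteArrayOutputStream buffer, List<String> tableIds) {
        try {
            DataOutputStream out = new DataOutputStream(buffer);
            out.writeInt(tableIds.size());
            for (String id : tableIds) {
                out.writeUTF(id);
            }
            out.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Leaky variant: the buffer is reset only when serialization succeeds,
    // so bytes written before a failure stay in the cached buffer.
    static byte[] serializeLeaky(List<String> tableIds) {
        ByteArrayOutputStream buffer = BUFFER_CACHE.get();
        write(buffer, tableIds);
        byte[] result = buffer.toByteArray();
        buffer.reset(); // never reached if write() throws
        return result;
    }

    // Safer variant: the buffer is reset whether or not serialization succeeds.
    static byte[] serializeSafe(List<String> tableIds) {
        ByteArrayOutputStream buffer = BUFFER_CACHE.get();
        try {
            write(buffer, tableIds);
            return buffer.toByteArray();
        } finally {
            buffer.reset();
        }
    }

    public static void main(String[] args) {
        try {
            serializeLeaky(Arrays.asList("db.t1", null)); // fails after partial output
        } catch (NullPointerException expected) {
            // The cached buffer still holds the partial output.
        }
        // The next result is prefixed with the leftovers of the failed call.
        System.out.println(serializeLeaky(Arrays.asList("db.t2")).length);

        try {
            serializeSafe(Arrays.asList("db.t1", null));
        } catch (NullPointerException expected) {
            // The buffer was reset in the finally block.
        }
        System.out.println(serializeSafe(Arrays.asList("db.t2")).length);
    }
}
```

In this sketch the leaky call after a failure returns 22 bytes instead of the expected 11; a reader of those bytes would decode the stale prefix first and misinterpret everything that follows.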

Example serialization failure:
{code:java}
org.apache.flink.runtime.checkpoint.CheckpointException: Trigger checkpoint failure.
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$getCheckpointException$19(CheckpointCoordinator.java:2162)
    at java.base/java.util.Optional.orElseGet(Optional.java:364)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.getCheckpointException(CheckpointCoordinator.java:2161)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.onTriggerFailure(CheckpointCoordinator.java:930)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.onTriggerFailure(CheckpointCoordinator.java:908)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$startTriggeringCheckpoint$7(CheckpointCoordinator.java:636)
    at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930)
    at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907)
    at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
    at java.base/java.lang.Thread.run(Thread.java:831)
Caused by: java.util.ConcurrentModificationException: null
    at java.base/java.util.HashMap$HashIterator.nextNode(HashMap.java:1584)
    at java.base/java.util.HashMap$EntryIterator.next(HashMap.java:1617)
    at java.base/java.util.HashMap$EntryIterator.next(HashMap.java:1615)
    at org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplitSerializer.writeTableSchemas(MySqlSplitSerializer.java:194)
    at org.apache.flink.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer.serializeSnapshotPendingSplitsState(PendingSplitsStateSerializer.java:161)
    at org.apache.flink.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer.serializeHybridPendingSplitsState(PendingSplitsStateSerializer.java:178)
    at org.apache.flink.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer.serialize(PendingSplitsStateSerializer.java:84)
    at org.apache.flink.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer.serialize(PendingSplitsStateSerializer.java:45)
    at org.apache.flink.runtime.source.coordinator.SourceCoordinator.writeCheckpointBytes(SourceCoordinator.java:462)
    at org.apache.flink.runtime.source.coordinator.SourceCoordinator.toBytes(SourceCoordinator.java:447)
    at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$checkpointCoordinator$6(SourceCoordinator.java:321)
    at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$9(SourceCoordinator.java:406)
    at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40)
    ... 6 common frames omitted
{code}
 

Example deserialization failure:
{code:java}
java.lang.IllegalArgumentException: Invalid identifier: 
        at org.apache.flink.cdc.connectors.shaded.io.debezium.relational.TableIdParser$TableIdTokenizer.tokenize(TableIdParser.java:75)
        at org.apache.flink.cdc.connectors.shaded.io.debezium.text.TokenStream.start(TokenStream.java:446)
        at org.apache.flink.cdc.connectors.shaded.io.debezium.relational.TableIdParser.parse(TableIdParser.java:31)
        at org.apache.flink.cdc.connectors.shaded.io.debezium.relational.TableId.parseParts(TableId.java:51)
        at org.apache.flink.cdc.connectors.shaded.io.debezium.relational.TableId.parse(TableId.java:40)
        at org.apache.flink.cdc.connectors.shaded.io.debezium.relational.TableId.parse(TableId.java:27)
        at org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplitSerializer.readTableSchemas(MySqlSplitSerializer.java:210)
        at org.apache.flink.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer.deserializeSnapshotPendingSplitsState(PendingSplitsStateSerializer.java:283)
        at org.apache.flink.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer.deserializeHybridPendingSplitsState(PendingSplitsStateSerializer.java:318)
        at org.apache.flink.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer.deserializePendingSplitsState(PendingSplitsStateSerializer.java:139)
        at org.apache.flink.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer.deserialize(PendingSplitsStateSerializer.java:108)
        at org.apache.flink.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer.deserialize(PendingSplitsStateSerializer.java:45)
        at org.apache.flink.runtime.source.coordinator.SourceCoordinator.deserializeCheckpoint(SourceCoordinator.java:489)
        at org.apache.flink.runtime.source.coordinator.SourceCoordinator.resetToCheckpoint(SourceCoordinator.java:384)
        at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.resetAndStart(RecreateOnResetOperatorCoordinator.java:390)
        at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.lambda$resetToCheckpoint$6(RecreateOnResetOperatorCoordinator.java:144)
        at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
        at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
        at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2143)
        at org.apache.flink.runtime.operators.coordination.ComponentClosingUtils.lambda$closeAsyncWithTimeout$0(ComponentClosingUtils.java:77)
        at java.base/java.lang.Thread.run(Thread.java:831)
{code}
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
