Sergei Morozov created FLINK-36891:
--------------------------------------

             Summary: MySQL CDC connector produces corrupted state in case of serialization failure
                 Key: FLINK-36891
                 URL: https://issues.apache.org/jira/browse/FLINK-36891
             Project: Flink
          Issue Type: Bug
          Components: Flink CDC
    Affects Versions: cdc-3.2.1
            Reporter: Sergei Morozov
{{PendingSplitsStateSerializer}} keeps a {{DataOutputSerializer}} instance in {{SERIALIZER_CACHE}} and reuses it across calls. If a call to {{serialize()}} fails with an exception, the cached buffer is not reset. The bytes returned by the next call therefore also contain the partial output of the failed serialization. The resulting checkpoint state is corrupted and cannot be deserialized on restore.

Example serialization failure. Here a {{ConcurrentModificationException}} is thrown while {{MySqlSplitSerializer.writeTableSchemas()}} iterates a {{HashMap}}:

{code:java}
org.apache.flink.runtime.checkpoint.CheckpointException: Trigger checkpoint failure.
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$getCheckpointException$19(CheckpointCoordinator.java:2162)
	at java.base/java.util.Optional.orElseGet(Optional.java:364)
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.getCheckpointException(CheckpointCoordinator.java:2161)
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.onTriggerFailure(CheckpointCoordinator.java:930)
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.onTriggerFailure(CheckpointCoordinator.java:908)
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$startTriggeringCheckpoint$7(CheckpointCoordinator.java:636)
	at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930)
	at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907)
	at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
	at java.base/java.lang.Thread.run(Thread.java:831)
Caused by: java.util.ConcurrentModificationException: null
	at java.base/java.util.HashMap$HashIterator.nextNode(HashMap.java:1584)
	at java.base/java.util.HashMap$EntryIterator.next(HashMap.java:1617)
	at java.base/java.util.HashMap$EntryIterator.next(HashMap.java:1615)
	at org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplitSerializer.writeTableSchemas(MySqlSplitSerializer.java:194)
	at org.apache.flink.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer.serializeSnapshotPendingSplitsState(PendingSplitsStateSerializer.java:161)
	at org.apache.flink.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer.serializeHybridPendingSplitsState(PendingSplitsStateSerializer.java:178)
	at org.apache.flink.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer.serialize(PendingSplitsStateSerializer.java:84)
	at org.apache.flink.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer.serialize(PendingSplitsStateSerializer.java:45)
	at org.apache.flink.runtime.source.coordinator.SourceCoordinator.writeCheckpointBytes(SourceCoordinator.java:462)
	at org.apache.flink.runtime.source.coordinator.SourceCoordinator.toBytes(SourceCoordinator.java:447)
	at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$checkpointCoordinator$6(SourceCoordinator.java:321)
	at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$9(SourceCoordinator.java:406)
	at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40)
	... 6 common frames omitted
{code}

Example deserialization failure, when restoring from the corrupted checkpoint:

{code:java}
java.lang.IllegalArgumentException: Invalid identifier:
	at org.apache.flink.cdc.connectors.shaded.io.debezium.relational.TableIdParser$TableIdTokenizer.tokenize(TableIdParser.java:75)
	at org.apache.flink.cdc.connectors.shaded.io.debezium.text.TokenStream.start(TokenStream.java:446)
	at org.apache.flink.cdc.connectors.shaded.io.debezium.relational.TableIdParser.parse(TableIdParser.java:31)
	at org.apache.flink.cdc.connectors.shaded.io.debezium.relational.TableId.parseParts(TableId.java:51)
	at org.apache.flink.cdc.connectors.shaded.io.debezium.relational.TableId.parse(TableId.java:40)
	at org.apache.flink.cdc.connectors.shaded.io.debezium.relational.TableId.parse(TableId.java:27)
	at org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplitSerializer.readTableSchemas(MySqlSplitSerializer.java:210)
	at org.apache.flink.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer.deserializeSnapshotPendingSplitsState(PendingSplitsStateSerializer.java:283)
	at org.apache.flink.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer.deserializeHybridPendingSplitsState(PendingSplitsStateSerializer.java:318)
	at org.apache.flink.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer.deserializePendingSplitsState(PendingSplitsStateSerializer.java:139)
	at org.apache.flink.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer.deserialize(PendingSplitsStateSerializer.java:108)
	at org.apache.flink.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer.deserialize(PendingSplitsStateSerializer.java:45)
	at org.apache.flink.runtime.source.coordinator.SourceCoordinator.deserializeCheckpoint(SourceCoordinator.java:489)
	at org.apache.flink.runtime.source.coordinator.SourceCoordinator.resetToCheckpoint(SourceCoordinator.java:384)
	at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.resetAndStart(RecreateOnResetOperatorCoordinator.java:390)
	at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.lambda$resetToCheckpoint$6(RecreateOnResetOperatorCoordinator.java:144)
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
	at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2143)
	at org.apache.flink.runtime.operators.coordination.ComponentClosingUtils.lambda$closeAsyncWithTimeout$0(ComponentClosingUtils.java:77)
	at java.base/java.lang.Thread.run(Thread.java:831)
{code}
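The failure mode described in this report is a cached serialization buffer that is not reset when {{serialize()}} throws. It can be reproduced outside Flink with the minimal, hypothetical sketch below, which rests on three assumptions:

* A {{ThreadLocal}}-cached {{ByteArrayOutputStream}} stands in for the cached {{DataOutputSerializer}}.
* The state uses a made-up format (a count followed by table names) rather than the real pending-splits layout.
* A {{null}} table name simulates the mid-write exception.

{{serializeBuggy}} resets the buffer only after a successful write, mirroring the behavior described above. {{serializeFixed}} shows one possible fix, resetting the buffer in a {{finally}} block. This illustrates the pattern only. It is not the connector's actual code or its actual fix.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: ByteArrayOutputStream stands in for Flink's DataOutputSerializer,
// and the "state" is a made-up format (count + table names), not the real pending-splits layout.
class CachedSerializerDemo {

    // One cached buffer per thread, reused across calls (assumed caching scheme).
    private static final ThreadLocal<ByteArrayOutputStream> BUGGY_CACHE =
            ThreadLocal.withInitial(ByteArrayOutputStream::new);
    private static final ThreadLocal<ByteArrayOutputStream> FIXED_CACHE =
            ThreadLocal.withInitial(ByteArrayOutputStream::new);

    // Writes the state; a null name simulates a failure thrown after some bytes were written
    // (standing in for the ConcurrentModificationException in the report).
    static void writeState(DataOutputStream out, List<String> tables) throws IOException {
        out.writeInt(tables.size());
        for (String table : tables) {
            if (table == null) {
                throw new IllegalStateException("simulated failure mid-serialization");
            }
            out.writeUTF(table);
        }
    }

    // Buggy pattern: the buffer is reset only after a successful write.
    static byte[] serializeBuggy(List<String> tables) {
        ByteArrayOutputStream buf = BUGGY_CACHE.get();
        try {
            writeState(new DataOutputStream(buf), tables);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        byte[] result = buf.toByteArray();
        buf.reset(); // skipped when writeState throws, so stale bytes stay in the buffer
        return result;
    }

    // One possible fix: always reset the buffer, whether the write succeeds or fails.
    static byte[] serializeFixed(List<String> tables) {
        ByteArrayOutputStream buf = FIXED_CACHE.get();
        try {
            writeState(new DataOutputStream(buf), tables);
            return buf.toByteArray(); // the copy is taken before the finally block runs
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            buf.reset();
        }
    }

    // Reads the state back in the same made-up format.
    static List<String> readState(byte[] bytes) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            int count = in.readInt();
            List<String> tables = new ArrayList<>(count);
            for (int i = 0; i < count; i++) {
                tables.add(in.readUTF());
            }
            return tables;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Here is what happens in the sketch after a failed {{serializeBuggy(Arrays.asList("db.orders", null))}}:

* A following {{serializeBuggy(Arrays.asList("db.orders"))}} returns 30 bytes instead of the 15 a clean run produces.
* Decoding those 30 bytes yields {{["db.orders", ""]}}, which includes a spurious empty table name.
* {{serializeFixed}} returns the same 15 bytes as a clean run.

Resetting the buffer before each write, rather than after, would work equally well. What matters is that no code path leaves stale bytes in the shared buffer.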