linqigeng created FLINK-37537:
---------------------------------
Summary: Duplicate key when job failover in SchemaOperator
Key: FLINK-37537
URL: https://issues.apache.org/jira/browse/FLINK-37537
Project: Flink
Issue Type: Bug
Components: Flink CDC
Affects Versions: cdc-3.3.0
Reporter: linqigeng
When SchemaOperator receives a SchemaChangeEvent and an exception causes a
failover, the latest CreateTableEvent is automatically re-issued once the job
is restored from the previous checkpoint. When the AddColumnEvent is
subsequently processed, `SchemaMergingUtils#coerceRow` throws a `Duplicate key`
exception.
{code:java}
java.lang.IllegalStateException: Duplicate key not_show (attempted merging values TINYINT and TINYINT)
at java.base/java.util.stream.Collectors.duplicateKeyException(Unknown Source)
at java.base/java.util.stream.Collectors.lambda$uniqKeysMapAccumulator$1(Unknown Source)
at java.base/java.util.stream.ReduceOps$3ReducingSink.accept(Unknown Source)
at java.base/java.util.LinkedList$LLSpliterator.forEachRemaining(Unknown Source)
at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source)
at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(Unknown Source)
at java.base/java.util.stream.AbstractPipeline.evaluate(Unknown Source)
at java.base/java.util.stream.ReferencePipeline.collect(Unknown Source)
at org.apache.flink.cdc.common.utils.SchemaMergingUtils.coerceRow(SchemaMergingUtils.java:270)
at org.apache.flink.cdc.common.utils.SchemaMergingUtils.coerceRow(SchemaMergingUtils.java:253)
at org.apache.flink.cdc.runtime.operators.schema.common.SchemaDerivator.coerceDataRecord(SchemaDerivator.java:334)
at org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator.handleDataChangeEvent(SchemaOperator.java:227)
at org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator.processElement(SchemaOperator.java:152)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:310)
at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter$OutputCollector.collect(MySqlRecordEmitter.java:147)
at java.base/java.util.Collections$SingletonList.forEach(Unknown Source)
at org.apache.flink.cdc.debezium.event.DebeziumEventDeserializationSchema.deserialize(DebeziumEventDeserializationSchema.java:93)
at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitElement(MySqlRecordEmitter.java:120)
at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.processElement(MySqlRecordEmitter.java:101)
at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlPipelineRecordEmitter.processElement(MySqlPipelineRecordEmitter.java:121)
at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:73)
at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:46)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:144)
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:421)
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:579)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:909)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:858)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
at java.base/java.lang.Thread.run(Unknown Source)
{code}
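
The top frames of the trace (`Collectors.uniqKeysMapAccumulator` and `Collectors.duplicateKeyException`) are what the JDK's `Collectors.toMap` without a merge function runs through when two stream elements map to the same key. The following is a minimal standalone sketch of that failure mode, not the Flink CDC code: `Col`, `applyAddColumn`, `applyAddColumnIdempotent` and `indexByName` are hypothetical names, and the sketch assumes the duplicate arises because the re-issued CreateTableEvent already contains the column that the AddColumnEvent then adds a second time.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class DuplicateKeyDemo {

    /** Hypothetical stand-in for a schema column; NOT the Flink CDC Column class. */
    static final class Col {
        final String name;
        final String type;

        Col(String name, String type) {
            this.name = name;
            this.type = type;
        }
    }

    /** Naive (assumed) handling: appends the column even if the name already exists. */
    static List<Col> applyAddColumn(List<Col> schema, Col added) {
        List<Col> result = new ArrayList<>(schema);
        result.add(added);
        return result;
    }

    /** Idempotent variant: leaves the schema unchanged if the column name is already present. */
    static List<Col> applyAddColumnIdempotent(List<Col> schema, Col added) {
        for (Col existing : schema) {
            if (existing.name.equals(added.name)) {
                return schema;
            }
        }
        return applyAddColumn(schema, added);
    }

    /** Two-argument toMap: throws IllegalStateException when a column name repeats. */
    static Map<String, String> indexByName(List<Col> schema) {
        return schema.stream().collect(Collectors.toMap(c -> c.name, c -> c.type));
    }

    public static void main(String[] args) {
        // Assumed state after restore: the re-issued schema already contains not_show.
        List<Col> restored =
                Arrays.asList(new Col("id", "INT"), new Col("not_show", "TINYINT"));
        Col added = new Col("not_show", "TINYINT");

        try {
            indexByName(applyAddColumn(restored, added));
        } catch (IllegalStateException e) {
            System.out.println("naive replay failed: " + e.getMessage());
        }

        Map<String, String> ok = indexByName(applyAddColumnIdempotent(restored, added));
        System.out.println("idempotent replay: " + ok.keySet());
    }
}
```

The idempotent variant is only one possible direction for a fix; whether the duplicate should instead be avoided when the CreateTableEvent is re-issued is a design decision for the SchemaOperator itself.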