[ https://issues.apache.org/jira/browse/FLINK-37537?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17938825#comment-17938825 ]

tianzhu.wen commented on FLINK-37537:
-------------------------------------

To help troubleshoot this effectively, could you share the steps to reproduce 
it (including test data, if applicable)? That would let me replicate the issue 
locally and precisely.

> Duplicate key when job failover in SchemaOperator
> -------------------------------------------------
>
>                 Key: FLINK-37537
>                 URL: https://issues.apache.org/jira/browse/FLINK-37537
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>    Affects Versions: cdc-3.3.0
>            Reporter: linqigeng
>            Priority: Major
>
> When SchemaOperator receives a SchemaChangeEvent and an exception causes a 
> failover, the latest CreateTableEvent is issued automatically when the job 
> is restored from the previous checkpoint. Processing the AddColumnEvent 
> later then causes a `Duplicate key` exception in 
> `SchemaMergingUtils#coerceRow`.
> {code:java}
> java.lang.IllegalStateException: Duplicate key not_show (attempted merging values TINYINT and TINYINT)
>     at java.base/java.util.stream.Collectors.duplicateKeyException(Unknown Source)
>     at java.base/java.util.stream.Collectors.lambda$uniqKeysMapAccumulator$1(Unknown Source)
>     at java.base/java.util.stream.ReduceOps$3ReducingSink.accept(Unknown Source)
>     at java.base/java.util.LinkedList$LLSpliterator.forEachRemaining(Unknown Source)
>     at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source)
>     at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source)
>     at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(Unknown Source)
>     at java.base/java.util.stream.AbstractPipeline.evaluate(Unknown Source)
>     at java.base/java.util.stream.ReferencePipeline.collect(Unknown Source)
>     at org.apache.flink.cdc.common.utils.SchemaMergingUtils.coerceRow(SchemaMergingUtils.java:270)
>     at org.apache.flink.cdc.common.utils.SchemaMergingUtils.coerceRow(SchemaMergingUtils.java:253)
>     at org.apache.flink.cdc.runtime.operators.schema.common.SchemaDerivator.coerceDataRecord(SchemaDerivator.java:334)
>     at org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator.handleDataChangeEvent(SchemaOperator.java:227)
>     at org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator.processElement(SchemaOperator.java:152)
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
>     at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:310)
>     at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
>     at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter$OutputCollector.collect(MySqlRecordEmitter.java:147)
>     at java.base/java.util.Collections$SingletonList.forEach(Unknown Source)
>     at org.apache.flink.cdc.debezium.event.DebeziumEventDeserializationSchema.deserialize(DebeziumEventDeserializationSchema.java:93)
>     at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitElement(MySqlRecordEmitter.java:120)
>     at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.processElement(MySqlRecordEmitter.java:101)
>     at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlPipelineRecordEmitter.processElement(MySqlPipelineRecordEmitter.java:121)
>     at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:73)
>     at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:46)
>     at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:144)
>     at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:421)
>     at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:579)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:909)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:858)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
>     at java.base/java.lang.Thread.run(Unknown Source)
> {code}
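The failure mode described in the report can be modeled with a minimal, self-contained Java sketch. It does not use any Flink CDC classes: `DuplicateColumnDemo`, `Column`, `applyAddColumn`, `applyAddColumnIdempotent` and `typesByName` are hypothetical stand-ins. The sketch assumes that the schema restored after failover already contains the column added by the AddColumnEvent, and that the coercion step builds a name-to-type map with `Collectors.toMap` without a merge function (consistent with the `uniqKeysMapAccumulator` frame in the trace). The idempotent variant only illustrates one possible guard; it is not the project's fix.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class DuplicateColumnDemo {

    // Hypothetical, simplified column: just a name and a type string.
    static final class Column {
        final String name;
        final String type;

        Column(String name, String type) {
            this.name = name;
            this.type = type;
        }
    }

    // Hypothetical non-idempotent AddColumn: appends the column unconditionally,
    // so applying it to a schema that already has the column duplicates it.
    static List<Column> applyAddColumn(List<Column> schema, Column added) {
        List<Column> result = new ArrayList<>(schema);
        result.add(added);
        return result;
    }

    // Hypothetical idempotent AddColumn: leaves the schema unchanged when a
    // column with the same name already exists.
    static List<Column> applyAddColumnIdempotent(List<Column> schema, Column added) {
        boolean exists = schema.stream().anyMatch(c -> c.name.equals(added.name));
        return exists ? schema : applyAddColumn(schema, added);
    }

    // Builds a name -> type lookup. Collectors.toMap without a merge function
    // throws IllegalStateException when two elements map to the same key.
    static Map<String, String> typesByName(List<Column> schema) {
        return schema.stream().collect(Collectors.toMap(c -> c.name, c -> c.type));
    }

    public static void main(String[] args) {
        // Assumed state after restore: the schema already includes "not_show".
        List<Column> restored = List.of(
                new Column("id", "BIGINT"),
                new Column("not_show", "TINYINT"));
        // The AddColumnEvent for "not_show" is then processed again.
        Column added = new Column("not_show", "TINYINT");

        try {
            typesByName(applyAddColumn(restored, added));
        } catch (IllegalStateException e) {
            // On JDK 9+ prints: Duplicate key not_show (attempted merging values TINYINT and TINYINT)
            System.out.println(e.getMessage());
        }

        // Prints 2: the idempotent variant keeps a single "not_show" column.
        System.out.println(typesByName(applyAddColumnIdempotent(restored, added)).size());
    }
}
```

On JDK 9 and later, the exception message produced by this sketch has the same shape as the one in the reported trace; older JDKs format the `toMap` duplicate-key message differently.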



--
This message was sent by Atlassian Jira
(v8.20.10#820010)