[ 
https://issues.apache.org/jira/browse/FLINK-37975?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tongyp updated FLINK-37975:
---------------------------
    Summary: mysql sync a lot sharding table to doris, Post Transform failed  
(was: sync a lot sharding table to doris, Post Transform failed)

> mysql sync a lot sharding table to doris, Post Transform failed
> ---------------------------------------------------------------
>
>                 Key: FLINK-37975
>                 URL: https://issues.apache.org/jira/browse/FLINK-37975
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>    Affects Versions: cdc-3.4.0
>            Reporter: tongyp
>            Priority: Major
>
> env: flinkcdc 3.4 pipeline + flink 1.20.1
> i sync about 10000 sharding tables to doris,the task will failed。the 
> jobmanager log is
>  
> 2025-06-17 17:41:25,211 INFO  
> org.apache.flink.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator 
> [] - The enumerator assigns split MySqlSnapshotSplit{tableId=xxx, 
> splitId='xx', splitKeyType=[`id` BIGINT NOT NULL], splitStart=null, 
> splitEnd=null, highWatermark=null} to subtask 0
> 2025-06-17 17:41:26,070 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - 
> Transform:Data -> SchemaOperator -> PrePartition (1/1) 
> (219052134362b9a800a3f2246605fbac_90bea66de1c231edf33913ecd54406c1_0_0) 
> switched from RUNNING to FAILED on localhost:44643-f3a1f2 @ localhost 
> (dataPort=40666).
> org.apache.flink.util.SerializedThrowable: 
> org.apache.flink.cdc.runtime.operators.transform.exceptions.TransformException:
>  Failed to post-transform with
> CreateTableEvent\{xxx}
> for table
> xx
> from schema
> columns={xxx)
> to schema
> columns=
> {xxx). at 
> org.apache.flink.cdc.runtime.operators.transform.PostTransformOperator.processElement(PostTransformOperator.java:146)
>  ~[?:?] at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:238)
>  ~[flink-dist-1.20.1.jar:1.20.1] at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:157)
>  ~[flink-dist-1.20.1.jar:1.20.1] at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:114)
>  ~[flink-dist-1.20.1.jar:1.20.1] at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>  ~[flink-dist-1.20.1.jar:1.20.1] at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
>  ~[flink-dist-1.20.1.jar:1.20.1] at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>  ~[flink-dist-1.20.1.jar:1.20.1] at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
>  ~[flink-dist-1.20.1.jar:1.20.1] at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
>  ~[flink-dist-1.20.1.jar:1.20.1] at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
>  ~[flink-dist-1.20.1.jar:1.20.1] at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949) 
> ~[flink-dist-1.20.1.jar:1.20.1] at 
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763) 
> ~[flink-dist-1.20.1.jar:1.20.1] at 
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) 
> ~[flink-dist-1.20.1.jar:1.20.1] at java.lang.Thread.run(Thread.java:829) 
> ~[?:?] Caused by: org.apache.flink.util.SerializedThrowable: 
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
> Could not forward element to next operator at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:92)
>  ~[flink-dist-1.20.1.jar:1.20.1] at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
>  ~[flink-dist-1.20.1.jar:1.20.1] at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
>  ~[flink-dist-1.20.1.jar:1.20.1] at 
> java.util.Optional.ifPresent(Optional.java:183) ~[?:?] at 
> org.apache.flink.cdc.runtime.operators.transform.PostTransformOperator.processElementInternal(PostTransformOperator.java:175)
>  ~[?:?] at 
> org.apache.flink.cdc.runtime.operators.transform.PostTransformOperator.processElement(PostTransformOperator.java:130)
>  ~[?:?] ... 13 more Caused by: org.apache.flink.util.SerializedThrowable: 
> java.lang.IllegalStateException: Failed to send request to coordinator: 
> SchemaChangeRequest\\{xxx}
> at 
> org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator.sendRequestToCoordinator(SchemaOperator.java:241)
>  ~[?:?]
> at 
> org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator.requestSchemaChange(SchemaOperator.java:227)
>  ~[?:?]
> at 
> org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator.handleSchemaChangeEvent(SchemaOperator.java:173)
>  ~[?:?]
> at 
> org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator.processElement(SchemaOperator.java:148)
>  ~[?:?]
> at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
>  ~[flink-dist-1.20.1.jar:1.20.1]
> at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
>  ~[flink-dist-1.20.1.jar:1.20.1]
> at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
>  ~[flink-dist-1.20.1.jar:1.20.1]
> at java.util.Optional.ifPresent(Optional.java:183) ~[?:?]
> at 
> org.apache.flink.cdc.runtime.operators.transform.PostTransformOperator.processElementInternal(PostTransformOperator.java:175)
>  ~[?:?]
> at 
> org.apache.flink.cdc.runtime.operators.transform.PostTransformOperator.processElement(PostTransformOperator.java:130)
>  ~[?:?]
> ... 13 more
> Caused by: org.apache.flink.util.SerializedThrowable: 
> java.util.concurrent.ExecutionException: 
> java.util.concurrent.TimeoutException: Invocation of 
> [RemoteRpcInvocation(JobMasterOperatorEventGateway.sendRequestToCoordinator(OperatorID,
>  SerializedValue))] at recipient 
> [pekko.tcp://flink@localhost:6123/user/rpc/jobmanager_2] timed out. This is 
> usually caused by: 1) Pekko failed sending the message silently, due to 
> problems like oversized payload or serialization failures. In that case, you 
> should find detailed error information in the logs. 2) The recipient needs 
> more time for responding, due to problems like slow machines or network 
> jitters. In that case, you can try to increase pekko.ask.timeout.
> at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) 
> ~[?:?]
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2028) 
> ~[?:?]
> at 
> org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator.sendRequestToCoordinator(SchemaOperator.java:238)
>  ~[?:?]
> at 
> org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator.requestSchemaChange(SchemaOperator.java:227)
>  ~[?:?]
> at 
> org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator.handleSchemaChangeEvent(SchemaOperator.java:173)
>  ~[?:?]
> at 
> org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator.processElement(SchemaOperator.java:148)
>  ~[?:?]
> at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
>  ~[flink-dist-1.20.1.jar:1.20.1]
> at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
>  ~[flink-dist-1.20.1.jar:1.20.1]
> at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
>  ~[flink-dist-1.20.1.jar:1.20.1]
> at java.util.Optional.ifPresent(Optional.java:183) ~[?:?]
> at 
> org.apache.flink.cdc.runtime.operators.transform.PostTransformOperator.processElementInternal(PostTransformOperator.java:175)
>  ~[?:?]
> at 
> org.apache.flink.cdc.runtime.operators.transform.PostTransformOperator.processElement(PostTransformOperator.java:130)
>  ~[?:?]
> ... 13 more
> Caused by: org.apache.flink.util.SerializedThrowable: 
> java.util.concurrent.TimeoutException: Invocation of 
> [RemoteRpcInvocation(JobMasterOperatorEventGateway.sendRequestToCoordinator(OperatorID,
>  SerializedValue))] at recipient 
> [pekko.tcp://flink@localhost:6123/user/rpc/jobmanager_2] timed out. This is 
> usually caused by: 1) Pekko failed sending the message silently, due to 
> problems like oversized payload or serialization failures. In that case, you 
> should find detailed error information in the logs. 2) The recipient needs 
> more time for responding, due to problems like slow machines or network 
> jitters. In that case, you can try to increase pekko.ask.timeout.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to