[ https://issues.apache.org/jira/browse/FLINK-37975?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
tongyp updated FLINK-37975:
---------------------------
    Summary: mysql sync a lot sharding table to doris, Post Transform failed  (was: sync a lot sharding table to doris, Post Transform failed)

> mysql sync a lot sharding table to doris, Post Transform failed
> ---------------------------------------------------------------
>
>                 Key: FLINK-37975
>                 URL: https://issues.apache.org/jira/browse/FLINK-37975
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>    Affects Versions: cdc-3.4.0
>            Reporter: tongyp
>            Priority: Major
>
> Environment: Flink CDC 3.4 pipeline + Flink 1.20.1.
> I sync about 10000 sharded tables from MySQL to Doris, and the task fails. The JobManager log is:
>
> 2025-06-17 17:41:25,211 INFO org.apache.flink.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator [] - The enumerator assigns split MySqlSnapshotSplit{tableId=xxx, splitId='xx', splitKeyType=[`id` BIGINT NOT NULL], splitStart=null, splitEnd=null, highWatermark=null} to subtask 0
> 2025-06-17 17:41:26,070 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Transform:Data -> SchemaOperator -> PrePartition (1/1) (219052134362b9a800a3f2246605fbac_90bea66de1c231edf33913ecd54406c1_0_0) switched from RUNNING to FAILED on localhost:44643-f3a1f2 @ localhost (dataPort=40666).
> org.apache.flink.util.SerializedThrowable: org.apache.flink.cdc.runtime.operators.transform.exceptions.TransformException: Failed to post-transform with
> CreateTableEvent{xxx}
> for table
> xx
> from schema
> columns={xxx)
> to schema
> columns={xxx).
>     at org.apache.flink.cdc.runtime.operators.transform.PostTransformOperator.processElement(PostTransformOperator.java:146) ~[?:?]
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:238) ~[flink-dist-1.20.1.jar:1.20.1]
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:157) ~[flink-dist-1.20.1.jar:1.20.1]
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:114) ~[flink-dist-1.20.1.jar:1.20.1]
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.20.1.jar:1.20.1]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638) ~[flink-dist-1.20.1.jar:1.20.1]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-dist-1.20.1.jar:1.20.1]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973) ~[flink-dist-1.20.1.jar:1.20.1]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917) ~[flink-dist-1.20.1.jar:1.20.1]
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970) ~[flink-dist-1.20.1.jar:1.20.1]
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949) ~[flink-dist-1.20.1.jar:1.20.1]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763) ~[flink-dist-1.20.1.jar:1.20.1]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) ~[flink-dist-1.20.1.jar:1.20.1]
>     at java.lang.Thread.run(Thread.java:829) ~[?:?]
> Caused by: org.apache.flink.util.SerializedThrowable: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:92) ~[flink-dist-1.20.1.jar:1.20.1]
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50) ~[flink-dist-1.20.1.jar:1.20.1]
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist-1.20.1.jar:1.20.1]
>     at java.util.Optional.ifPresent(Optional.java:183) ~[?:?]
>     at org.apache.flink.cdc.runtime.operators.transform.PostTransformOperator.processElementInternal(PostTransformOperator.java:175) ~[?:?]
>     at org.apache.flink.cdc.runtime.operators.transform.PostTransformOperator.processElement(PostTransformOperator.java:130) ~[?:?]
>     ... 13 more
> Caused by: org.apache.flink.util.SerializedThrowable: java.lang.IllegalStateException: Failed to send request to coordinator: SchemaChangeRequest{xxx}
>     at org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator.sendRequestToCoordinator(SchemaOperator.java:241) ~[?:?]
>     at org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator.requestSchemaChange(SchemaOperator.java:227) ~[?:?]
>     at org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator.handleSchemaChangeEvent(SchemaOperator.java:173) ~[?:?]
>     at org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator.processElement(SchemaOperator.java:148) ~[?:?]
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75) ~[flink-dist-1.20.1.jar:1.20.1]
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50) ~[flink-dist-1.20.1.jar:1.20.1]
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist-1.20.1.jar:1.20.1]
>     at java.util.Optional.ifPresent(Optional.java:183) ~[?:?]
>     at org.apache.flink.cdc.runtime.operators.transform.PostTransformOperator.processElementInternal(PostTransformOperator.java:175) ~[?:?]
>     at org.apache.flink.cdc.runtime.operators.transform.PostTransformOperator.processElement(PostTransformOperator.java:130) ~[?:?]
>     ... 13 more
> Caused by: org.apache.flink.util.SerializedThrowable: java.util.concurrent.ExecutionException: java.util.concurrent.TimeoutException: Invocation of [RemoteRpcInvocation(JobMasterOperatorEventGateway.sendRequestToCoordinator(OperatorID, SerializedValue))] at recipient [pekko.tcp://flink@localhost:6123/user/rpc/jobmanager_2] timed out. This is usually caused by: 1) Pekko failed sending the message silently, due to problems like oversized payload or serialization failures. In that case, you should find detailed error information in the logs. 2) The recipient needs more time for responding, due to problems like slow machines or network jitters. In that case, you can try to increase pekko.ask.timeout.
>     at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) ~[?:?]
>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2028) ~[?:?]
>     at org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator.sendRequestToCoordinator(SchemaOperator.java:238) ~[?:?]
>     at org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator.requestSchemaChange(SchemaOperator.java:227) ~[?:?]
>     at org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator.handleSchemaChangeEvent(SchemaOperator.java:173) ~[?:?]
>     at org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator.processElement(SchemaOperator.java:148) ~[?:?]
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75) ~[flink-dist-1.20.1.jar:1.20.1]
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50) ~[flink-dist-1.20.1.jar:1.20.1]
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist-1.20.1.jar:1.20.1]
>     at java.util.Optional.ifPresent(Optional.java:183) ~[?:?]
>     at org.apache.flink.cdc.runtime.operators.transform.PostTransformOperator.processElementInternal(PostTransformOperator.java:175) ~[?:?]
>     at org.apache.flink.cdc.runtime.operators.transform.PostTransformOperator.processElement(PostTransformOperator.java:130) ~[?:?]
>     ... 13 more
> Caused by: org.apache.flink.util.SerializedThrowable: java.util.concurrent.TimeoutException: Invocation of [RemoteRpcInvocation(JobMasterOperatorEventGateway.sendRequestToCoordinator(OperatorID, SerializedValue))] at recipient [pekko.tcp://flink@localhost:6123/user/rpc/jobmanager_2] timed out. This is usually caused by: 1) Pekko failed sending the message silently, due to problems like oversized payload or serialization failures. In that case, you should find detailed error information in the logs. 2) The recipient needs more time for responding, due to problems like slow machines or network jitters. In that case, you can try to increase pekko.ask.timeout.
>

--
This message was sent by Atlassian Jira
(v8.20.10#820010)