[
https://issues.apache.org/jira/browse/FLINK-39315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18081568#comment-18081568
]
Ran Tao edited comment on FLINK-39315 at 5/18/26 4:12 AM:
----------------------------------------------------------
The task hangs because the FlushEvent cannot be aligned; it stays stuck until
the timeout below occurs.
{code:java}
2026-05-14 19:41:18,573 INFO
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap
[] - Application FAILED:
java.util.concurrent.CompletionException:
org.apache.flink.client.deployment.application.UnsuccessfulExecutionException:
Application Status: FAILED
at
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$unwrapJobResultException$7(ApplicationDispatcherBootstrap.java:410)
~[flink-dist-1.16-byted-SNAPSHOT.jar:1.16-byted-SNAPSHOT]
at
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
~[?:1.8.0_362]
at
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
~[?:1.8.0_362]
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
~[?:1.8.0_362]
at
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
~[?:1.8.0_362]
at
org.apache.flink.client.deployment.application.JobStatusPollingUtils.lambda$null$2(JobStatusPollingUtils.java:101)
~[flink-dist-1.16-byted-SNAPSHOT.jar:1.16-byted-SNAPSHOT]
at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
~[?:1.8.0_362]
at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
~[?:1.8.0_362]
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
~[?:1.8.0_362]
at
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
~[?:1.8.0_362]
at
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:268)
~[?:?]
at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
~[?:1.8.0_362]
at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
~[?:1.8.0_362]
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
~[?:1.8.0_362]
at
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
~[?:1.8.0_362]
at
org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1277)
~[flink-dist-1.16-byted-SNAPSHOT.jar:1.16-byted-SNAPSHOT]
at
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
~[flink-rpc-akka_c0d780d6-328a-465b-a944-a71296145164.jar:1.16-byted-SNAPSHOT]
at
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
~[flink-rpc-akka_c0d780d6-328a-465b-a944-a71296145164.jar:1.16-byted-SNAPSHOT]
at
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
~[flink-rpc-akka_c0d780d6-328a-465b-a944-a71296145164.jar:1.16-byted-SNAPSHOT]
at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
~[?:1.8.0_362]
at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
~[?:1.8.0_362]
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
~[?:1.8.0_362]
at
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
~[?:1.8.0_362]
at
org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47)
~[flink-rpc-akka_c0d780d6-328a-465b-a944-a71296145164.jar:1.16-byted-SNAPSHOT]
at akka.dispatch.OnComplete.internal(Future.scala:300)
~[flink-rpc-akka_c0d780d6-328a-465b-a944-a71296145164.jar:1.16-byted-SNAPSHOT]
at akka.dispatch.OnComplete.internal(Future.scala:297)
~[flink-rpc-akka_c0d780d6-328a-465b-a944-a71296145164.jar:1.16-byted-SNAPSHOT]
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
~[flink-rpc-akka_c0d780d6-328a-465b-a944-a71296145164.jar:1.16-byted-SNAPSHOT]
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
~[flink-rpc-akka_c0d780d6-328a-465b-a944-a71296145164.jar:1.16-byted-SNAPSHOT]
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
~[flink-rpc-akka_c0d780d6-328a-465b-a944-a71296145164.jar:1.16-byted-SNAPSHOT]
at
org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
~[flink-rpc-akka_c0d780d6-328a-465b-a944-a71296145164.jar:1.16-byted-SNAPSHOT]
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
~[flink-rpc-akka_c0d780d6-328a-465b-a944-a71296145164.jar:1.16-byted-SNAPSHOT]
at
scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
~[flink-rpc-akka_c0d780d6-328a-465b-a944-a71296145164.jar:1.16-byted-SNAPSHOT]
at
scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
~[flink-rpc-akka_c0d780d6-328a-465b-a944-a71296145164.jar:1.16-byted-SNAPSHOT]
at
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
~[flink-rpc-akka_c0d780d6-328a-465b-a944-a71296145164.jar:1.16-byted-SNAPSHOT]
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:621)
~[flink-rpc-akka_c0d780d6-328a-465b-a944-a71296145164.jar:1.16-byted-SNAPSHOT]
at
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:24)
~[flink-rpc-akka_c0d780d6-328a-465b-a944-a71296145164.jar:1.16-byted-SNAPSHOT]
at
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23)
~[flink-rpc-akka_c0d780d6-328a-465b-a944-a71296145164.jar:1.16-byted-SNAPSHOT]
at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532)
~[flink-rpc-akka_c0d780d6-328a-465b-a944-a71296145164.jar:1.16-byted-SNAPSHOT]
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
~[flink-rpc-akka_c0d780d6-328a-465b-a944-a71296145164.jar:1.16-byted-SNAPSHOT]
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
~[flink-rpc-akka_c0d780d6-328a-465b-a944-a71296145164.jar:1.16-byted-SNAPSHOT]
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
~[flink-rpc-akka_c0d780d6-328a-465b-a944-a71296145164.jar:1.16-byted-SNAPSHOT]
at
akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63)
~[flink-rpc-akka_c0d780d6-328a-465b-a944-a71296145164.jar:1.16-byted-SNAPSHOT]
at
akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100)
~[flink-rpc-akka_c0d780d6-328a-465b-a944-a71296145164.jar:1.16-byted-SNAPSHOT]
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
[flink-rpc-akka_c0d780d6-328a-465b-a944-a71296145164.jar:1.16-byted-SNAPSHOT]
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
[flink-rpc-akka_c0d780d6-328a-465b-a944-a71296145164.jar:1.16-byted-SNAPSHOT]
at
akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100)
[flink-rpc-akka_c0d780d6-328a-465b-a944-a71296145164.jar:1.16-byted-SNAPSHOT]
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
[flink-rpc-akka_c0d780d6-328a-465b-a944-a71296145164.jar:1.16-byted-SNAPSHOT]
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
[flink-rpc-akka_c0d780d6-328a-465b-a944-a71296145164.jar:1.16-byted-SNAPSHOT]
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
[?:1.8.0_362]
at
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
[?:1.8.0_362]
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
[?:1.8.0_362]
at
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
[?:1.8.0_362]
Caused by:
org.apache.flink.client.deployment.application.UnsuccessfulExecutionException:
Application Status: FAILED
at
org.apache.flink.client.deployment.application.UnsuccessfulExecutionException.fromJobResult(UnsuccessfulExecutionException.java:71)
~[flink-dist-1.16-byted-SNAPSHOT.jar:1.16-byted-SNAPSHOT]
... 52 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution
failed.
at
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
~[flink-dist-1.16-byted-SNAPSHOT.jar:1.16-byted-SNAPSHOT]
at
org.apache.flink.client.deployment.application.UnsuccessfulExecutionException.fromJobResult(UnsuccessfulExecutionException.java:60)
~[flink-dist-1.16-byted-SNAPSHOT.jar:1.16-byted-SNAPSHOT]
... 52 more
Caused by: org.apache.flink.util.FlinkException: Global failure triggered by
OperatorCoordinator for 'SchemaMapper -> Assign Bucket' (operator
9899a42c64d67ef3172b7e3be3c1bbb9).
at
org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:662)
~[flink-dist-1.16-byted-SNAPSHOT.jar:1.16-byted-SNAPSHOT]
at
org.apache.flink.cdc.runtime.operators.schema.common.SchemaRegistry.failJob(SchemaRegistry.java:392)
~[?:?]
at
org.apache.flink.cdc.runtime.operators.schema.distributed.SchemaCoordinator.lambda$handleSchemaEvolveRequest$1(SchemaCoordinator.java:291)
~[?:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
~[?:1.8.0_362]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_362]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
~[?:1.8.0_362]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
~[?:1.8.0_362]
at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_362]
Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to apply schema
change event.
at
org.apache.flink.cdc.runtime.operators.schema.distributed.SchemaCoordinator.lambda$handleSchemaEvolveRequest$1(SchemaCoordinator.java:291)
~[?:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
~[?:1.8.0_362]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_362]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
~[?:1.8.0_362]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
~[?:1.8.0_362]
at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_362]
Caused by: java.util.concurrent.TimeoutException: Loop checking time limit has
exceeded.
at
org.apache.flink.cdc.runtime.operators.schema.common.SchemaRegistry.loopWhen(SchemaRegistry.java:377)
~[?:?]
at
org.apache.flink.cdc.runtime.operators.schema.distributed.SchemaCoordinator.startSchemaChange(SchemaCoordinator.java:318)
~[?:?]
at
org.apache.flink.cdc.runtime.operators.schema.distributed.SchemaCoordinator.lambda$handleSchemaEvolveRequest$1(SchemaCoordinator.java:289)
~[?:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
~[?:1.8.0_362]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_362]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
~[?:1.8.0_362]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
~[?:1.8.0_362]
at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_362]{code}
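The innermost cause in the trace above is a TimeoutException raised from SchemaRegistry.loopWhen with the message "Loop checking time limit has exceeded." As a rough illustration of how a bounded wait of this kind behaves, here is a minimal sketch. It is not the Flink CDC implementation: the class name, the method signature, and the parameters (stillWaiting, limit, pollInterval) are all assumptions made for the example.

```java
import java.time.Duration;
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;

/** Hypothetical sketch of a wait loop with a deadline; not the real SchemaRegistry code. */
final class LoopWhenSketch {

    /**
     * Re-checks the condition until it clears or the time limit passes.
     * All parameter names are assumptions made for this example.
     */
    static void loopWhen(BooleanSupplier stillWaiting, Duration limit, Duration pollInterval)
            throws TimeoutException, InterruptedException {
        long deadline = System.nanoTime() + limit.toNanos();
        while (stillWaiting.getAsBoolean()) {
            // Overflow-safe comparison of nanoTime values
            if (System.nanoTime() - deadline >= 0) {
                throw new TimeoutException("Loop checking time limit has exceeded.");
            }
            Thread.sleep(pollInterval.toMillis());
        }
    }

    public static void main(String[] args) throws Exception {
        // A condition that never clears, standing in for a flush that is never aligned
        try {
            loopWhen(() -> true, Duration.ofMillis(200), Duration.ofMillis(20));
        } catch (TimeoutException e) {
            System.out.println("timed out: " + e.getMessage());
        }
    }
}
```

If the condition never clears, the caller only sees a failure once the limit has elapsed, which matches the observation that the task appears hung until the timeout occurs.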
was (Author: lemonjing): the same log output as above, without the introductory sentence.
> MySql cdc connector could get stuck in backfill binlog reading after a
> failover within snapshot phase when the MySql table is being continuously
> written
> --------------------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-39315
> URL: https://issues.apache.org/jira/browse/FLINK-39315
> Project: Flink
> Issue Type: Bug
> Components: Flink CDC
> Affects Versions: cdc-3.4.0, cdc-3.5.0
> Reporter: Cong Cheng
> Assignee: Cong Cheng
> Priority: Major
> Labels: pull-request-available
>
> h3. Summary
> This issue is different from FLINK-39207. Even with FLINK-39207 fixed, the
> MySQL CDC source can still hang during the snapshot backfill phase.
> When `MySqlSourceReader` processes multiple snapshot splits sequentially
> (reusing the same `SnapshotSplitReader` and the same `BinaryLogClient`
> across splits), the job can hang indefinitely in
> `SnapshotSplitReader.pollWithBuffer()` during the snapshot backfill phase,
> waiting for `BINLOG_END` while the queue remains empty.
> h3. Root Cause Analysis
> # `SnapshotSplitReader.pollWithBuffer()` keeps polling
> `ChangeEventQueue.poll()` until it receives the `BINLOG_END` watermark event;
> otherwise it will wait indefinitely.
> # The MySQL CDC implementation reuses a single `BinaryLogClient` instance
> across split executions (via reusing `StatefulTaskContext` /
> `MySqlTaskContextImpl`).
> # `StatefulTaskContext.configure()` rebuilds `ChangeEventQueue` /
> `EventDispatcher` / `SignalEventDispatcher` for each split, so each split has
> a different target queue/dispatcher.
> # In `MySqlStreamingChangeEventSource.execute()`, each execution registers
> multiple `BinaryLogClient` event/lifecycle listeners (e.g. the main event
> listener, lifecycle listener, `onEvent`, debug listener), but the
> implementation does not unregister these listeners when the execution
> finishes (it only disconnects the client).
> # Therefore, listeners from previous splits accumulate and remain active
> when later splits start the backfill binlog reading.
> # During backfill of a later split, binlog events will still trigger
> callbacks of an old split’s listener/task. When the current binlog offset
> advances to a point that satisfies the old split’s stop condition (e.g.
> `currentOffset >= oldEndingOffset`, common under continuous writes), the old
> listener can:
> ## stop the shared `StoppableChangeEventSourceContext` (by calling
> `stopChangeEventSource()`), causing the *current* split’s backfill loop to
> exit prematurely; and/or
> ## dispatch `BINLOG_END` via the old `SignalEventDispatcher` into the old
> `ChangeEventQueue` (because the old listener holds the old dispatcher/queue
> created in its split’s `StatefulTaskContext.configure()`).
> # As a result, the current `pollWithBuffer()` is polling the new queue and
> never receives `BINLOG_END`, while the backfill thread has already stopped
> without surfacing an exception, leading to a deadlock/hang.
> h3. Steps to Reproduce
> # Configure a MySQL CDC source with `scan.incremental.snapshot.chunk.size`
> set to a large value so that each snapshot split takes a long time to read;
> # Keep writing continuously to the MySQL source table so binlog offsets
> advance;
> # Trigger a TaskManager failover while the job is in the snapshot phase;
> # Observe that the job hangs after processing the first split.
> A regression test is also included in the PR (e.g.,
> SnapshotSplitReaderTest#testMultipleSplitsWithBackfill). It reproduces the
> hang on the buggy version if the fix code is commented out and passes
> with the fix applied.
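The listener accumulation described in the root cause analysis above can be reproduced in miniature. The following is a simplified, single-threaded sketch with hypothetical stand-in classes (SharedClient, backfill, runTwoSplits); none of them are the real Flink CDC or BinaryLogClient types, and the shared boolean flag only approximates the shared `StoppableChangeEventSourceContext`. It shows a stale listener from the first split stopping the second split's backfill and writing `BINLOG_END` into the old queue, and it shows that unregistering the listener when a split finishes avoids this.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

/** Hypothetical stand-ins only; not the real Flink CDC or binlog client classes. */
final class ListenerLeakSketch {

    /** Stand-in for a binlog client that is reused across splits. */
    static final class SharedClient {
        private final List<Consumer<Long>> listeners = new ArrayList<>();

        void register(Consumer<Long> listener) { listeners.add(listener); }
        void unregister(Consumer<Long> listener) { listeners.remove(listener); }

        /** Delivers one binlog offset to every listener that is still registered. */
        void emit(long offset) {
            for (Consumer<Long> listener : new ArrayList<>(listeners)) {
                listener.accept(offset);
            }
        }
    }

    /**
     * Runs the backfill of one split against the shared client.
     * Returns whether this split's own queue received BINLOG_END.
     */
    static boolean backfill(SharedClient client, boolean[] running, long startOffset,
                            long endingOffset, boolean unregisterOnFinish) {
        Queue<String> queue = new ArrayDeque<>(); // a fresh queue for every split
        Consumer<Long> listener = offset -> {
            if (running[0] && offset >= endingOffset) {
                running[0] = false;       // stops whichever split is currently running
                queue.add("BINLOG_END");  // always lands in the queue this listener captured
            }
        };
        client.register(listener);
        running[0] = true;
        for (long offset = startOffset; running[0]; offset++) {
            client.emit(offset);
        }
        if (unregisterOnFinish) {
            client.unregister(listener); // the cleanup that prevents stale callbacks
        }
        return queue.contains("BINLOG_END");
    }

    /** Processes two splits in sequence; element i tells whether split i saw BINLOG_END. */
    static boolean[] runTwoSplits(boolean unregisterOnFinish) {
        SharedClient client = new SharedClient();
        boolean[] running = {false}; // stand-in for the shared stoppable context
        boolean first = backfill(client, running, 1, 10, unregisterOnFinish);
        boolean second = backfill(client, running, 11, 20, unregisterOnFinish);
        return new boolean[] {first, second};
    }

    public static void main(String[] args) {
        boolean[] leaky = runTwoSplits(false);
        boolean[] fixed = runTwoSplits(true);
        System.out.println("leaky: second split got BINLOG_END = " + leaky[1]); // false
        System.out.println("fixed: second split got BINLOG_END = " + fixed[1]); // true
    }
}
```

In the leaky run, the first split's listener fires on offset 11 because 11 is already past its ending offset of 10, so the second split's loop exits with its own queue still empty. A real reader polling that queue for `BINLOG_END` would then wait forever.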