[ 
https://issues.apache.org/jira/browse/FLINK-39315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18081568#comment-18081568
 ] 

Ran Tao commented on FLINK-39315:
---------------------------------

 
{code:java}
2026-05-14 19:41:18,573 INFO  
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap 
[] - Application FAILED: 
java.util.concurrent.CompletionException: 
org.apache.flink.client.deployment.application.UnsuccessfulExecutionException: 
Application Status: FAILED
  at 
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$unwrapJobResultException$7(ApplicationDispatcherBootstrap.java:410)
 ~[flink-dist-1.16-byted-SNAPSHOT.jar:1.16-byted-SNAPSHOT]
  at 
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616) 
~[?:1.8.0_362]
  at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
 ~[?:1.8.0_362]
  at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) 
~[?:1.8.0_362]
  at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) 
~[?:1.8.0_362]
  at 
org.apache.flink.client.deployment.application.JobStatusPollingUtils.lambda$null$2(JobStatusPollingUtils.java:101)
 ~[flink-dist-1.16-byted-SNAPSHOT.jar:1.16-byted-SNAPSHOT]
  at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
 ~[?:1.8.0_362]
  at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
 ~[?:1.8.0_362]
  at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) 
~[?:1.8.0_362]
  at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) 
~[?:1.8.0_362]
  at 
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:268)
 ~[?:?]
  at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
 ~[?:1.8.0_362]
  at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
 ~[?:1.8.0_362]
  at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) 
~[?:1.8.0_362]
  at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) 
~[?:1.8.0_362]
  at 
org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1277) 
~[flink-dist-1.16-byted-SNAPSHOT.jar:1.16-byted-SNAPSHOT]
  at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
 ~[flink-rpc-akka_c0d780d6-328a-465b-a944-a71296145164.jar:1.16-byted-SNAPSHOT]
  at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
 ~[flink-rpc-akka_c0d780d6-328a-465b-a944-a71296145164.jar:1.16-byted-SNAPSHOT]
  at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
 ~[flink-rpc-akka_c0d780d6-328a-465b-a944-a71296145164.jar:1.16-byted-SNAPSHOT]
  at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
 ~[?:1.8.0_362]
  at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
 ~[?:1.8.0_362]
  at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) 
~[?:1.8.0_362]
  at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) 
~[?:1.8.0_362]
  at 
org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47)
 ~[flink-rpc-akka_c0d780d6-328a-465b-a944-a71296145164.jar:1.16-byted-SNAPSHOT]
  at akka.dispatch.OnComplete.internal(Future.scala:300) 
~[flink-rpc-akka_c0d780d6-328a-465b-a944-a71296145164.jar:1.16-byted-SNAPSHOT]
  at akka.dispatch.OnComplete.internal(Future.scala:297) 
~[flink-rpc-akka_c0d780d6-328a-465b-a944-a71296145164.jar:1.16-byted-SNAPSHOT]
  at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224) 
~[flink-rpc-akka_c0d780d6-328a-465b-a944-a71296145164.jar:1.16-byted-SNAPSHOT]
  at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221) 
~[flink-rpc-akka_c0d780d6-328a-465b-a944-a71296145164.jar:1.16-byted-SNAPSHOT]
  at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60) 
~[flink-rpc-akka_c0d780d6-328a-465b-a944-a71296145164.jar:1.16-byted-SNAPSHOT]
  at 
org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
 ~[flink-rpc-akka_c0d780d6-328a-465b-a944-a71296145164.jar:1.16-byted-SNAPSHOT]
  at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68) 
~[flink-rpc-akka_c0d780d6-328a-465b-a944-a71296145164.jar:1.16-byted-SNAPSHOT]
  at 
scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
 ~[flink-rpc-akka_c0d780d6-328a-465b-a944-a71296145164.jar:1.16-byted-SNAPSHOT]
  at 
scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
 ~[flink-rpc-akka_c0d780d6-328a-465b-a944-a71296145164.jar:1.16-byted-SNAPSHOT]
  at 
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284) 
~[flink-rpc-akka_c0d780d6-328a-465b-a944-a71296145164.jar:1.16-byted-SNAPSHOT]
  at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:621) 
~[flink-rpc-akka_c0d780d6-328a-465b-a944-a71296145164.jar:1.16-byted-SNAPSHOT]
  at 
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:24)
 ~[flink-rpc-akka_c0d780d6-328a-465b-a944-a71296145164.jar:1.16-byted-SNAPSHOT]
  at 
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23)
 ~[flink-rpc-akka_c0d780d6-328a-465b-a944-a71296145164.jar:1.16-byted-SNAPSHOT]
  at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532) 
~[flink-rpc-akka_c0d780d6-328a-465b-a944-a71296145164.jar:1.16-byted-SNAPSHOT]
  at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29) 
~[flink-rpc-akka_c0d780d6-328a-465b-a944-a71296145164.jar:1.16-byted-SNAPSHOT]
  at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29) 
~[flink-rpc-akka_c0d780d6-328a-465b-a944-a71296145164.jar:1.16-byted-SNAPSHOT]
  at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60) 
~[flink-rpc-akka_c0d780d6-328a-465b-a944-a71296145164.jar:1.16-byted-SNAPSHOT]
  at 
akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63)
 ~[flink-rpc-akka_c0d780d6-328a-465b-a944-a71296145164.jar:1.16-byted-SNAPSHOT]
  at 
akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100)
 ~[flink-rpc-akka_c0d780d6-328a-465b-a944-a71296145164.jar:1.16-byted-SNAPSHOT]
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12) 
[flink-rpc-akka_c0d780d6-328a-465b-a944-a71296145164.jar:1.16-byted-SNAPSHOT]
  at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81) 
[flink-rpc-akka_c0d780d6-328a-465b-a944-a71296145164.jar:1.16-byted-SNAPSHOT]
  at 
akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100) 
[flink-rpc-akka_c0d780d6-328a-465b-a944-a71296145164.jar:1.16-byted-SNAPSHOT]
  at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49) 
[flink-rpc-akka_c0d780d6-328a-465b-a944-a71296145164.jar:1.16-byted-SNAPSHOT]
  at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
 [flink-rpc-akka_c0d780d6-328a-465b-a944-a71296145164.jar:1.16-byted-SNAPSHOT]
  at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) 
[?:1.8.0_362]
  at 
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) 
[?:1.8.0_362]
  at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) 
[?:1.8.0_362]
  at 
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) 
[?:1.8.0_362]
Caused by: 
org.apache.flink.client.deployment.application.UnsuccessfulExecutionException: 
Application Status: FAILED
  at 
org.apache.flink.client.deployment.application.UnsuccessfulExecutionException.fromJobResult(UnsuccessfulExecutionException.java:71)
 ~[flink-dist-1.16-byted-SNAPSHOT.jar:1.16-byted-SNAPSHOT]
  ... 52 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution 
failed.
  at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
 ~[flink-dist-1.16-byted-SNAPSHOT.jar:1.16-byted-SNAPSHOT]
  at 
org.apache.flink.client.deployment.application.UnsuccessfulExecutionException.fromJobResult(UnsuccessfulExecutionException.java:60)
 ~[flink-dist-1.16-byted-SNAPSHOT.jar:1.16-byted-SNAPSHOT]
  ... 52 more
Caused by: org.apache.flink.util.FlinkException: Global failure triggered by 
OperatorCoordinator for 'SchemaMapper -> Assign Bucket' (operator 
9899a42c64d67ef3172b7e3be3c1bbb9).
  at 
org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:662)
 ~[flink-dist-1.16-byted-SNAPSHOT.jar:1.16-byted-SNAPSHOT]
  at 
org.apache.flink.cdc.runtime.operators.schema.common.SchemaRegistry.failJob(SchemaRegistry.java:392)
 ~[?:?]
  at 
org.apache.flink.cdc.runtime.operators.schema.distributed.SchemaCoordinator.lambda$handleSchemaEvolveRequest$1(SchemaCoordinator.java:291)
 ~[?:?]
  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
~[?:1.8.0_362]
  at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_362]
  at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
~[?:1.8.0_362]
  at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
~[?:1.8.0_362]
  at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_362]
Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to apply schema 
change event.
  at 
org.apache.flink.cdc.runtime.operators.schema.distributed.SchemaCoordinator.lambda$handleSchemaEvolveRequest$1(SchemaCoordinator.java:291)
 ~[?:?]
  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
~[?:1.8.0_362]
  at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_362]
  at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
~[?:1.8.0_362]
  at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
~[?:1.8.0_362]
  at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_362]
Caused by: java.util.concurrent.TimeoutException: Loop checking time limit has 
exceeded.
  at 
org.apache.flink.cdc.runtime.operators.schema.common.SchemaRegistry.loopWhen(SchemaRegistry.java:377)
 ~[?:?]
  at 
org.apache.flink.cdc.runtime.operators.schema.distributed.SchemaCoordinator.startSchemaChange(SchemaCoordinator.java:318)
 ~[?:?]
  at 
org.apache.flink.cdc.runtime.operators.schema.distributed.SchemaCoordinator.lambda$handleSchemaEvolveRequest$1(SchemaCoordinator.java:289)
 ~[?:?]
  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
~[?:1.8.0_362]
  at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_362]
  at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
~[?:1.8.0_362]
  at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
~[?:1.8.0_362]
  at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_362]{code}
 

> MySql cdc connector could get stuck in backfill binlog reading after a 
> failover within snapshot phase when the MySql table is being continuously 
> written
> --------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-39315
>                 URL: https://issues.apache.org/jira/browse/FLINK-39315
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>    Affects Versions: cdc-3.4.0, cdc-3.5.0
>            Reporter: Cong Cheng
>            Assignee: Cong Cheng
>            Priority: Major
>              Labels: pull-request-available
>
> h3. Summary
> This issue is different from FLINK-39207. Even with FLINK-39207 fixed, the 
> MySQL CDC source can still hang during the snapshot backfill phase when 
> processing multiple snapshot splits sequentially while reusing the same 
> `SnapshotSplitReader` (and with it the same `BinaryLogClient`).
> In that case the job can hang indefinitely in 
> `SnapshotSplitReader.pollWithBuffer()` during the snapshot backfill phase, 
> waiting for `BINLOG_END` while the queue remains empty.
> h3. Root Cause Analysis
>  # `SnapshotSplitReader.pollWithBuffer()` keeps polling 
> `ChangeEventQueue.poll()` until it receives the `BINLOG_END` watermark event; 
> otherwise it will wait indefinitely.
>  # The MySQL CDC implementation reuses a single `BinaryLogClient` instance 
> across split executions (via reusing `StatefulTaskContext` / 
> `MySqlTaskContextImpl`).
>  # `StatefulTaskContext.configure()` rebuilds `ChangeEventQueue` / 
> `EventDispatcher` / `SignalEventDispatcher` for each split, so each split has 
> a different target queue/dispatcher.
>  # In `MySqlStreamingChangeEventSource.execute()`, each execution registers 
> multiple `BinaryLogClient` event/lifecycle listeners (e.g. the main event 
> listener, lifecycle listener, `onEvent`, debug listener), but the 
> implementation does not unregister these listeners when the execution 
> finishes (it only disconnects the client).
>  # Therefore, listeners from previous splits accumulate and remain active 
> when later splits start the backfill binlog reading.
>  # During backfill of a later split, binlog events will still trigger 
> callbacks of an old split’s listener/task. When the current binlog offset 
> advances to a point that satisfies the old split’s stop condition (e.g. 
> `currentOffset >= oldEndingOffset`, common under continuous writes), the old 
> listener can:
>  ## stop the shared `StoppableChangeEventSourceContext` (by calling 
> `stopChangeEventSource()`), causing the *current* split’s backfill loop to 
> exit prematurely; and/or
>  ## dispatch `BINLOG_END` via the old `SignalEventDispatcher` into the old 
> `ChangeEventQueue` (because the old listener holds the old dispatcher/queue 
> created in its split’s `StatefulTaskContext.configure()`).
>  # As a result, the current `pollWithBuffer()` is polling the new queue and 
> never receives `BINLOG_END`, while the backfill thread has already stopped 
> without surfacing an exception, leading to a deadlock/hang.
> h3. Steps to Reproduce
>  # Configure a MySQL CDC source with `scan.incremental.snapshot.chunk.size` 
> set to a large value, so that each snapshot split takes a long time to read;
>  # Keep writing continuously to the MySQL source table so that binlog offsets 
> keep advancing;
>  # Trigger a TaskManager failover while the job is in the snapshot phase;
>  # Observe that the job hangs after processing the first split.
> A regression test is also included in the PR (e.g., 
> `SnapshotSplitReaderTest#testMultipleSplitsWithBackfill`). It reproduces the 
> hang on the buggy version when the fix is commented out, and passes with the 
> fix applied.
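
The listener accumulation described under "Root Cause Analysis" above can be illustrated with a minimal, self-contained sketch. It does not use the real Flink CDC or binlog client classes: `SharedClient`, `SplitTask`, `runBackfill` and `secondSplitFinishes` are hypothetical stand-ins, and the offsets are made-up numbers. The sketch only assumes what the description states: one client is reused across splits, each split gets its own queue, the stop flag is shared, and listeners are not unregistered when a split finishes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.LongConsumer;

class StaleListenerSketch {

    /** Hypothetical stand-in for a binlog client that is reused across splits. */
    static class SharedClient {
        private final List<LongConsumer> listeners = new ArrayList<>();

        void register(LongConsumer listener) { listeners.add(listener); }

        void unregister(LongConsumer listener) { listeners.remove(listener); }

        void emit(long offset) {
            // Every registered listener sees every event, including stale ones.
            for (LongConsumer listener : new ArrayList<>(listeners)) {
                listener.accept(offset);
            }
        }
    }

    /** Hypothetical per-split state: its own queue, but a shared stop flag. */
    static class SplitTask {
        final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        final LongConsumer listener;

        SplitTask(long endingOffset, boolean[] sharedStopFlag) {
            listener = offset -> {
                if (offset >= endingOffset) {
                    sharedStopFlag[0] = true;   // stops whichever backfill is running
                    queue.add("BINLOG_END");    // always lands in this split's own queue
                }
            };
        }
    }

    /** Simulates one backfill: feed offsets to the client until the stop flag is set. */
    static void runBackfill(SharedClient client, SplitTask task, boolean[] stopped,
                            long startOffset, boolean unregisterOnFinish) {
        stopped[0] = false;
        client.register(task.listener);
        for (long offset = startOffset; !stopped[0] && offset <= 200; offset++) {
            client.emit(offset);
        }
        if (unregisterOnFinish) {
            client.unregister(task.listener);
        }
    }

    /** Returns true if the second split's queue receives BINLOG_END. */
    static boolean secondSplitFinishes(boolean unregisterOnFinish) {
        SharedClient client = new SharedClient();
        boolean[] stopped = {false};

        SplitTask first = new SplitTask(10, stopped);
        runBackfill(client, first, stopped, 1, unregisterOnFinish);

        SplitTask second = new SplitTask(100, stopped);
        runBackfill(client, second, stopped, 11, unregisterOnFinish);

        // A real reader would block here; the sketch just checks the queue once.
        return "BINLOG_END".equals(second.queue.poll());
    }

    public static void main(String[] args) {
        System.out.println("without cleanup: " + secondSplitFinishes(false));
        System.out.println("with cleanup: " + secondSplitFinishes(true));
    }
}
```

Without cleanup, the first split's listener fires again at offset 11, sets the shared stop flag and writes `BINLOG_END` into the old queue, so the second split's queue stays empty. With the listener unregistered after each split, the second backfill runs to its own ending offset and receives `BINLOG_END` in its own queue.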



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
