[
https://issues.apache.org/jira/browse/FLINK-24855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17441746#comment-17441746
]
Chesnay Schepler commented on FLINK-24855:
------------------------------------------
Are there any other errors being logged?
Looking at {{ThreadPoolExecutor#processWorkerExit}}, its comment states that
the method is called by the worker thread. Is that the SourceCoordinator thread
then?
The method also starts a new worker (creating a new thread) to replace the lost
worker (that is, itself?). But if the worker thread is the one running this
method, then it must still be alive, so it would make sense that the factory
throws an error.
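To check that reading without any Flink classes, here is a minimal JDK-only
sketch. The class {{WorkerReplacementDemo}}, its {{Observation}} holder and the
recording factory are hypothetical names made up for this illustration; unlike
{{CoordinatorExecutorThreadFactory}}, the factory only records what it sees
instead of throwing.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class WorkerReplacementDemo {

    /** What the factory observed when it was asked for a replacement worker. */
    static final class Observation {
        volatile boolean previousStillAlive;
        volatile boolean calledByPreviousWorker;
    }

    static Observation observeReplacement() throws InterruptedException {
        Observation seen = new Observation();
        CountDownLatch replaced = new CountDownLatch(1);
        AtomicReference<Thread> previous = new AtomicReference<>();

        // Hypothetical stand-in for the coordinator thread factory:
        // it records the state of the previous thread rather than failing.
        ThreadFactory factory = runnable -> {
            Thread old = previous.get();
            if (old != null) {
                seen.previousStillAlive = old.isAlive();
                seen.calledByPreviousWorker = Thread.currentThread() == old;
                replaced.countDown();
            }
            Thread t = new Thread(runnable, "demo-worker");
            // Swallow the RuntimeException below to keep the output quiet.
            t.setUncaughtExceptionHandler((thread, error) -> { });
            previous.set(t);
            return t;
        };

        ExecutorService executor = Executors.newSingleThreadExecutor(factory);
        try {
            // execute() (not submit()) lets the exception kill the worker.
            executor.execute(() -> {
                throw new RuntimeException("kill the worker");
            });
            if (!replaced.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("no replacement worker requested");
            }
        } finally {
            executor.shutdownNow();
        }
        return seen;
    }

    public static void main(String[] args) throws InterruptedException {
        Observation seen = observeReplacement();
        System.out.println("previous worker still alive: " + seen.previousStillAlive);
        System.out.println("called by previous worker:   " + seen.calledByPreviousWorker);
    }
}
```

If the reading above is right, both flags should come out as {{true}}: the
dying worker requests its own replacement from inside {{processWorkerExit}},
so any "is the old thread still alive?" check in the factory sees a live thread.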
FLINK-24545 mentions cases where this happens due to idleness, but exceptions
also seem to cause the same issue:
{code}
final SourceCoordinatorProvider.CoordinatorExecutorThreadFactory factory =
        new SourceCoordinatorProvider.CoordinatorExecutorThreadFactory(
                "op", Thread.currentThread().getContextClassLoader());
final ExecutorService scheduledExecutorService =
        Executors.newSingleThreadExecutor(factory);
scheduledExecutorService.execute(
        () -> {
            try {
                // wait a bit to ensure the second operation is queued
                Thread.sleep(1000);
            } catch (InterruptedException e) {
            }
            throw new RuntimeException();
            // this also works: Thread.currentThread().stop();
        });
scheduledExecutorService.submit(() -> {}).get();
{code}
[~sewen], you mentioned in FLINK-22454 that we should consider removing this
check altogether. Did we follow up on that?
> Source Coordinator Thread already exists. There should never be more than one
> thread driving the actions of a Source Coordinator.
> ---------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-24855
> URL: https://issues.apache.org/jira/browse/FLINK-24855
> Project: Flink
> Issue Type: Bug
> Components: API / Core, Runtime / Coordination
> Affects Versions: 1.13.3
> Environment: flink 1.13.3
> flink-cdc 2.1
> Reporter: WangMinChao
> Priority: Blocker
>
>
> When I am synchronizing large tables, have the following problems :
> 2021-11-09 20:33:04,222 INFO
> com.ververica.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator []
> - Assign split MySqlSnapshotSplit\{tableId=db.table, splitId='db.table:383',
> splitKeyType=[`id` BIGINT NOT NULL], splitStart=[9798290],
> splitEnd=[9823873], highWatermark=null} to subtask 1
> 2021-11-09 20:33:04,248 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering
> checkpoint 101 (type=CHECKPOINT) @ 1636461183945 for job
> 3cee105643cfee78b80cd0a41143b5c1.
> 2021-11-09 20:33:10,734 ERROR
> org.apache.flink.runtime.util.FatalExitExceptionHandler [] - FATAL: Thread
> 'SourceCoordinator-Source: mysqlcdc-source -> Sink: kafka-sink' produced an
> uncaught exception. Stopping the process...
> java.lang.Error: Source Coordinator Thread already exists. There should never
> be more than one thread driving the actions of a Source Coordinator. Existing
> Thread: Thread[SourceCoordinator-Source: mysqlcdc-source -> Sink:
> kafka-sink,5,main]
> at
> org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider$CoordinatorExecutorThreadFactory.newThread(SourceCoordinatorProvider.java:119)
> [flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.<init>(ThreadPoolExecutor.java:619)
> ~[?:1.8.0_191]
> at
> java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:932)
> ~[?:1.8.0_191]
> at
> java.util.concurrent.ThreadPoolExecutor.processWorkerExit(ThreadPoolExecutor.java:1025)
> ~[?:1.8.0_191]
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1167)
> ~[?:1.8.0_191]
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> ~[?:1.8.0_191]