[ 
https://issues.apache.org/jira/browse/FLINK-35296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17847197#comment-17847197
 ] 

Yechao Chen commented on FLINK-35296:
-------------------------------------

Our version is 3.0.1.

Log:
 
{code:java}
2024-05-17 14:14:29.787 INFO com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader - Handling split change SplitAddition:[[MySqlSnapshotSplit{tableId=ads_brand.data_convert_nodup_log, splitId='ads_brand.data_convert_nodup_log:21075', splitKeyType=[`id` BIGINT NOT NULL], splitStart=[626055826], splitEnd=[626085527], highWatermark=null}]]
2024-05-17 14:14:29.787 INFO org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher - Finished reading from splits [ads_brand.data_convert_nodup_log:21074]
2024-05-17 14:14:29.787 INFO com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader - Split read by poll split,binlogRead=false,snpshotRead=true
2024-05-17 14:14:29.787 WARN io.debezium.connector.mysql.MySqlConnection - Database configuration option 'serverTimezone' is set but is obsolete, please use 'connectionTimeZone' instead
2024-05-17 14:14:29.821 INFO com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext - Starting offset is initialized to {ts_sec=0, file=, pos=0, kind=EARLIEST, row=0, event=0}
2024-05-17 14:14:29.822 INFO com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader - Snapshot currentReader submitSplit has finished,nextSplit=MySqlSnapshotSplit{tableId=ads_brand.data_convert_nodup_log, splitId='ads_brand.data_convert_nodup_log:21075', splitKeyType=[`id` BIGINT NOT NULL], splitStart=[626055826], splitEnd=[626085527], highWatermark=null}
2024-05-17 14:14:29.825 INFO com.ververica.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask - Snapshot step 1 - Determining low watermark {ts_sec=0, file=mysql-bin.146439, pos=332807150, kind=SPECIFIC, gtids=, row=0, event=0} for split MySqlSnapshotSplit{tableId=ads_brand.data_convert_nodup_log, splitId='ads_brand.data_convert_nodup_log:21075', splitKeyType=[`id` BIGINT NOT NULL], splitStart=[626055826], splitEnd=[626085527], highWatermark=null}
2024-05-17 14:14:29.825 INFO com.ververica.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask - Snapshot step 2 - Snapshotting data
2024-05-17 14:14:29.825 INFO com.ververica.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask - Exporting data from split 'ads_brand.data_convert_nodup_log:21075' of table ads_brand.data_convert_nodup_log
2024-05-17 14:14:29.825 INFO com.ververica.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask - For split 'ads_brand.data_convert_nodup_log:21075' of table ads_brand.data_convert_nodup_log using select statement: 'SELECT * FROM `ads_brand`.`data_convert_nodup_log` WHERE `id` >= ? AND NOT (`id` = ?) AND `id` <= ?'
2024-05-17 14:14:29.967 INFO org.apache.flink.connector.base.source.reader.SourceReaderBase - Finished reading split(s) [ads_brand.data_convert_nodup_log:21074]
2024-05-17 14:14:29.967 INFO com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader - Split read has finished,check read next split,requestNextSplit=true
2024-05-17 14:14:29.970 INFO com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader - Source reader 1 adds split MySqlSnapshotSplit{tableId=ads_brand.data_convert_nodup_log, splitId='ads_brand.data_convert_nodup_log:21076', splitKeyType=[`id` BIGINT NOT NULL], splitStart=[626085527], splitEnd=null, highWatermark=null}
2024-05-17 14:14:29.970 INFO org.apache.flink.connector.base.source.reader.SourceReaderBase - Adding split(s) to reader: [MySqlSnapshotSplit{tableId=ads_brand.data_convert_nodup_log, splitId='ads_brand.data_convert_nodup_log:21076', splitKeyType=[`id` BIGINT NOT NULL], splitStart=[626085527], splitEnd=null, highWatermark=null}]
2024-05-17 14:14:30.214 INFO com.ververica.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask - Finished exporting 15917 records for split 'ads_brand.data_convert_nodup_log:21075', total duration '00:00:00.389'
2024-05-17 14:14:30.217 INFO com.ververica.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask - Snapshot step 3 - Determining high watermark {ts_sec=0, file=mysql-bin.146439, pos=333744355, kind=SPECIFIC, gtids=, row=0, event=0} for split MySqlSnapshotSplit{tableId=ads_brand.data_convert_nodup_log, splitId='ads_brand.data_convert_nodup_log:21075', splitKeyType=[`id` BIGINT NOT NULL], splitStart=[626055826], splitEnd=[626085527], highWatermark=null}
2024-05-17 14:14:30.218 INFO com.ververica.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader - Mysql Snapshot read has finished,currentSnapshotSplit=MySqlSnapshotSplit{tableId=ads_brand.data_convert_nodup_log, splitId='ads_brand.data_convert_nodup_log:21075', splitKeyType=[`id` BIGINT NOT NULL], splitStart=[626055826], splitEnd=[626085527], highWatermark=null}
2024-05-17 14:14:30.218 INFO com.ververica.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader - Back fill to starting execute binlog read task,currentSnapshotSplit = MySqlSnapshotSplit{tableId=ads_brand.data_convert_nodup_log, splitId='ads_brand.data_convert_nodup_log:21075', splitKeyType=[`id` BIGINT NOT NULL], splitStart=[626055826], splitEnd=[626085527], highWatermark=null}
2024-05-17 14:14:30.218 INFO io.debezium.util.Threads - Requested thread factory for connector MySqlConnector, id = mysql_binlog_source named = binlog-client
2024-05-17 14:14:30.218 INFO com.ververica.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader - Starting binlog read task,currentSnapshotSplit=MySqlSnapshotSplit{tableId=ads_brand.data_convert_nodup_log, splitId='ads_brand.data_convert_nodup_log:21075', splitKeyType=[`id` BIGINT NOT NULL], splitStart=[626055826], splitEnd=[626085527], highWatermark=null}
2024-05-17 14:14:30.225 INFO io.debezium.connector.mysql.MySqlStreamingChangeEventSource - Skip 0 events on streaming start
2024-05-17 14:14:30.225 INFO io.debezium.connector.mysql.MySqlStreamingChangeEventSource - Skip 0 rows on streaming start
2024-05-17 14:14:30.225 INFO io.debezium.util.Threads - Creating thread debezium-mysqlconnector-mysql_binlog_source-binlog-client
2024-05-17 14:14:30.228 INFO io.debezium.util.Threads - Creating thread debezium-mysqlconnector-mysql_binlog_source-binlog-client
2024-05-17 14:14:30.243 INFO io.debezium.connector.mysql.MySqlStreamingChangeEventSource - Connected to MySQL binlog at mysql-xima-slave-073.ximalaya.local:3306, starting at MySqlOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.mysql.Source:STRUCT}, sourceInfo=SourceInfo [currentGtid=null, currentBinlogFilename=mysql-bin.146439, currentBinlogPosition=332807150, currentRowNumber=0, serverId=0, sourceTime=null, threadId=-1, currentQuery=null, tableIds=[], databaseName=null], snapshotCompleted=false, transactionContext=TransactionContext [currentTransactionId=null, perTableEventCount={}, totalEventCount=0], restartGtidSet=null, currentGtidSet=null, restartBinlogFilename=mysql-bin.146439, restartBinlogPosition=332807150, restartRowsToSkip=0, restartEventsToSkip=0, currentEventLengthInBytes=0, inTransaction=false, transactionId=null, incrementalSnapshotContext =IncrementalSnapshotContext [windowOpened=false, chunkEndPosition=null, dataCollectionsToSnapshot=[], lastEventKeySent=null, maximumKey=null]]
2024-05-17 14:14:30.243 INFO io.debezium.util.Threads - Creating thread debezium-mysqlconnector-mysql_binlog_source-binlog-client
2024-05-17 14:14:30.243 INFO io.debezium.connector.mysql.MySqlStreamingChangeEventSource - Waiting for keepalive thread to start
2024-05-17 14:14:30.296 INFO com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader - CurrentReader has init finished,dataIt=false,currentReader.class=SnapshotSplitReader
2024-05-17 14:14:30.299 INFO io.debezium.jdbc.JdbcConnection - Connection gracefully closed
2024-05-17 14:14:30.300 INFO io.debezium.connector.mysql.MySqlStreamingChangeEventSource - Stopped reading binlog after 0 events, last recorded offset: {transaction_id=null, ts_sec=1715926470, file=mysql-bin.146439, pos=333963403, server_id=3089103, event=4}
2024-05-17 14:14:35.243 INFO io.debezium.connector.mysql.MySqlStreamingChangeEventSource - Client disconnect is successfully,taskId=0
2024-05-17 14:14:35.244 INFO com.ververica.cdc.connectors.mysql.debezium.task.MySqlBinlogSplitReadTask - Binlog split read task execute has finished,startOffset={ts_sec=0, file=mysql-bin.146439, pos=332807150, kind=SPECIFIC, gtids=, row=0, event=0},endOffset={ts_sec=0, file=mysql-bin.146439, pos=333744355, kind=SPECIFIC, gtids=, row=0, event=0}
2024-05-17 14:14:35.244 INFO com.ververica.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader - Binlog read task has finished,currentSnapshotSplit=MySqlSnapshotSplit{tableId=ads_brand.data_convert_nodup_log, splitId='ads_brand.data_convert_nodup_log:21075', splitKeyType=[`id` BIGINT NOT NULL], splitStart=[626055826], splitEnd=[626085527], highWatermark=null}
2024-05-17 14:14:35.244 INFO com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader - Source record has fetch finished,subtaskId=1
2024-05-17 14:14:35.244 INFO com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader - Handling split change SplitAddition:[[MySqlSnapshotSplit{tableId=ads_brand.data_convert_nodup_log, splitId='ads_brand.data_convert_nodup_log:21076', splitKeyType=[`id` BIGINT NOT NULL], splitStart=[626085527], splitEnd=null, highWatermark=null}]]
2024-05-17 14:14:35.244 INFO org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher - Finished reading from splits [ads_brand.data_convert_nodup_log:21075]
2024-05-17 14:14:35.244 INFO com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader - Split read by poll split,binlogRead=false,snpshotRead=true
2024-05-17 14:14:35.244 WARN io.debezium.connector.mysql.MySqlConnection - Database configuration option 'serverTimezone' is set but is obsolete, please use 'connectionTimeZone' instead
2024-05-17 14:14:35.282 INFO com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext - Starting offset is initialized to {ts_sec=0, file=, pos=0, kind=EARLIEST, row=0, event=0}
2024-05-17 14:14:35.283 INFO com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader - Snapshot currentReader submitSplit has finished,nextSplit=MySqlSnapshotSplit{tableId=ads_brand.data_convert_nodup_log, splitId='ads_brand.data_convert_nodup_log:21076', splitKeyType=[`id` BIGINT NOT NULL], splitStart=[626085527], splitEnd=null, highWatermark=null}
2024-05-17 14:14:35.286 INFO com.ververica.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask - Snapshot step 1 - Determining low watermark {ts_sec=0, file=mysql-bin.146439, pos=346179811, kind=SPECIFIC, gtids=, row=0, event=0} for split MySqlSnapshotSplit{tableId=ads_brand.data_convert_nodup_log, splitId='ads_brand.data_convert_nodup_log:21076', splitKeyType=[`id` BIGINT NOT NULL], splitStart=[626085527], splitEnd=null, highWatermark=null}
2024-05-17 14:14:35.286 INFO com.ververica.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask - Snapshot step 2 - Snapshotting data
2024-05-17 14:14:35.286 INFO com.ververica.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask - Exporting data from split 'ads_brand.data_convert_nodup_log:21076' of table ads_brand.data_convert_nodup_log
2024-05-17 14:14:35.286 INFO com.ververica.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask - For split 'ads_brand.data_convert_nodup_log:21076' of table ads_brand.data_convert_nodup_log using select statement: 'SELECT * FROM `ads_brand`.`data_convert_nodup_log` WHERE `id` >= ?'
2024-05-17 14:14:35.422 INFO org.apache.flink.connector.base.source.reader.SourceReaderBase - Finished reading split(s) [ads_brand.data_convert_nodup_log:21075]
2024-05-17 14:14:35.422 INFO com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader - Split read has finished,check read next split,requestNextSplit=true
2024-05-17 14:14:39.726 INFO com.ververica.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask - Finished exporting 175433 records for split 'ads_brand.data_convert_nodup_log:21076', total duration '00:00:04.44'
2024-05-17 14:14:39.729 INFO com.ververica.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask - Snapshot step 3 - Determining high watermark {ts_sec=0, file=mysql-bin.146439, pos=355392617, kind=SPECIFIC, gtids=, row=0, event=0} for split MySqlSnapshotSplit{tableId=ads_brand.data_convert_nodup_log, splitId='ads_brand.data_convert_nodup_log:21076', splitKeyType=[`id` BIGINT NOT NULL], splitStart=[626085527], splitEnd=null, highWatermark=null}
2024-05-17 14:14:39.730 INFO com.ververica.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader - Mysql Snapshot read has finished,currentSnapshotSplit=MySqlSnapshotSplit{tableId=ads_brand.data_convert_nodup_log, splitId='ads_brand.data_convert_nodup_log:21076', splitKeyType=[`id` BIGINT NOT NULL], splitStart=[626085527], splitEnd=null, highWatermark=null}
2024-05-17 14:14:39.730 INFO com.ververica.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader - Back fill to starting execute binlog read task,currentSnapshotSplit = MySqlSnapshotSplit{tableId=ads_brand.data_convert_nodup_log, splitId='ads_brand.data_convert_nodup_log:21076', splitKeyType=[`id` BIGINT NOT NULL], splitStart=[626085527], splitEnd=null, highWatermark=null}
2024-05-17 14:14:39.730 INFO io.debezium.util.Threads - Requested thread factory for connector MySqlConnector, id = mysql_binlog_source named = binlog-client
2024-05-17 14:14:39.730 INFO com.ververica.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader - Starting binlog read task,currentSnapshotSplit=MySqlSnapshotSplit{tableId=ads_brand.data_convert_nodup_log, splitId='ads_brand.data_convert_nodup_log:21076', splitKeyType=[`id` BIGINT NOT NULL], splitStart=[626085527], splitEnd=null, highWatermark=null}
2024-05-17 14:14:39.737 INFO io.debezium.connector.mysql.MySqlStreamingChangeEventSource - Skip 0 events on streaming start
2024-05-17 14:14:39.738 INFO io.debezium.connector.mysql.MySqlStreamingChangeEventSource - Skip 0 rows on streaming start
2024-05-17 14:14:39.738 INFO io.debezium.util.Threads - Creating thread debezium-mysqlconnector-mysql_binlog_source-binlog-client
2024-05-17 14:14:39.740 INFO io.debezium.util.Threads - Creating thread debezium-mysqlconnector-mysql_binlog_source-binlog-client
2024-05-17 14:14:39.756 INFO io.debezium.connector.mysql.MySqlStreamingChangeEventSource - Connected to MySQL binlog at mysql-xima-slave-073.ximalaya.local:3306, starting at MySqlOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.mysql.Source:STRUCT}, sourceInfo=SourceInfo [currentGtid=null, currentBinlogFilename=mysql-bin.146439, currentBinlogPosition=346179811, currentRowNumber=0, serverId=0, sourceTime=null, threadId=-1, currentQuery=null, tableIds=[], databaseName=null], snapshotCompleted=false, transactionContext=TransactionContext [currentTransactionId=null, perTableEventCount={}, totalEventCount=0], restartGtidSet=null, currentGtidSet=null, restartBinlogFilename=mysql-bin.146439, restartBinlogPosition=346179811, restartRowsToSkip=0, restartEventsToSkip=0, currentEventLengthInBytes=0, inTransaction=false, transactionId=null, incrementalSnapshotContext =IncrementalSnapshotContext [windowOpened=false, chunkEndPosition=null, dataCollectionsToSnapshot=[], lastEventKeySent=null, maximumKey=null]]
2024-05-17 14:14:39.756 INFO io.debezium.connector.mysql.MySqlStreamingChangeEventSource - Waiting for keepalive thread to start
2024-05-17 14:14:39.756 INFO io.debezium.util.Threads - Creating thread debezium-mysqlconnector-mysql_binlog_source-binlog-client
2024-05-17 14:14:39.872 INFO io.debezium.connector.mysql.MySqlStreamingChangeEventSource - Keepalive thread is running
2024-05-17 14:14:39.956 INFO io.debezium.connector.mysql.MySqlStreamingChangeEventSource - Stopped reading binlog after 0 events, last recorded offset: {transaction_id=null, ts_sec=1715926478, file=mysql-bin.146439, pos=352990413, server_id=3089103, event=18}
2024-05-17 14:14:39.956 INFO io.debezium.connector.mysql.MySqlStreamingChangeEventSource - Client disconnect is successfully,taskId=0
2024-05-17 14:14:39.956 INFO com.ververica.cdc.connectors.mysql.debezium.task.MySqlBinlogSplitReadTask - Binlog split read task execute has finished,startOffset={ts_sec=0, file=mysql-bin.146439, pos=346179811, kind=SPECIFIC, gtids=, row=0, event=0},endOffset={ts_sec=0, file=mysql-bin.146439, pos=355392617, kind=SPECIFIC, gtids=, row=0, event=0}
2024-05-17 14:14:39.956 INFO com.ververica.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader - Binlog read task has finished,currentSnapshotSplit=MySqlSnapshotSplit{tableId=ads_brand.data_convert_nodup_log, splitId='ads_brand.data_convert_nodup_log:21076', splitKeyType=[`id` BIGINT NOT NULL], splitStart=[626085527], splitEnd=null, highWatermark=null}
2024-05-17 14:14:40.154 INFO com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader - CurrentReader has init finished,dataIt=false,currentReader.class=SnapshotSplitReader
2024-05-17 14:14:40.157 INFO io.debezium.jdbc.JdbcConnection - Connection gracefully closed
2024-05-17 14:14:40.157 INFO com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader - Source record has fetch finished,subtaskId=1
2024-05-17 14:14:40.157 INFO org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher - Finished reading from splits [ads_brand.data_convert_nodup_log:21076]
2024-05-17 14:14:41.920 INFO org.apache.flink.connector.base.source.reader.SourceReaderBase - Finished reading split(s) [ads_brand.data_convert_nodup_log:21076]
2024-05-17 14:14:41.920 INFO com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader - Split read has finished,check read next split,requestNextSplit=true
2024-05-17 14:14:41.920 INFO org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager - Closing splitFetcher 0 because it is idle.
2024-05-17 14:14:41.920 INFO org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher - Shutting down split fetcher 0
2024-05-17 14:14:41.920 INFO org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher - Split fetcher 0 exited.

{code}
Thread dump:
{code:java}
"AsyncOperations-thread-115" Id=42816 TIMED_WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@3621aba6
    at sun.misc.Unsafe.park(Native Method)
    -  waiting on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@3621aba6
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
    at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1073)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

"AsyncOperations-thread-114" Id=42815 TIMED_WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@437cadf8
    at sun.misc.Unsafe.park(Native Method)
    -  waiting on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@437cadf8
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
    at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1073)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

"flink-taskexecutor-io-thread-8" Id=42812 WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@3678563e
    at sun.misc.Unsafe.park(Native Method)
    -  waiting on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@3678563e
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

"flink-taskexecutor-io-thread-7" Id=42810 WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@3678563e
    at sun.misc.Unsafe.park(Native Method)
    -  waiting on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@3678563e
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

"flink-taskexecutor-io-thread-6" Id=42809 WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@3678563e
    at sun.misc.Unsafe.park(Native Method)
    -  waiting on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@3678563e
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

"flink-taskexecutor-io-thread-5" Id=42808 WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@3678563e
    at sun.misc.Unsafe.park(Native Method)
    -  waiting on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@3678563e
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

"flink-metrics-22" Id=42805 TIMED_WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@26025fa1
    at sun.misc.Unsafe.park(Native Method)
    -  waiting on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@26025fa1
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
    at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1073)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

"flink-taskexecutor-io-thread-4" Id=42760 WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@3678563e
    at sun.misc.Unsafe.park(Native Method)
    -  waiting on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@3678563e
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

"flink-taskexecutor-io-thread-3" Id=42759 WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@3678563e
    at sun.misc.Unsafe.park(Native Method)
    -  waiting on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@3678563e
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

"flink-taskexecutor-io-thread-2" Id=42758 WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@3678563e
    at sun.misc.Unsafe.park(Native Method)
    -  waiting on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@3678563e
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

"flink-taskexecutor-io-thread-1" Id=42757 WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@3678563e
    at sun.misc.Unsafe.park(Native Method)
    -  waiting on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@3678563e
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

"blc-mysql-xima-slave-073.ximalaya.local:3306" Id=37065 TIMED_WAITING on io.debezium.connector.base.ChangeEventQueue@2b0adb40
    at java.lang.Object.wait(Native Method)
    -  waiting on io.debezium.connector.base.ChangeEventQueue@2b0adb40
    at io.debezium.connector.base.ChangeEventQueue.doEnqueue(ChangeEventQueue.java:204)
    at io.debezium.connector.base.ChangeEventQueue.enqueue(ChangeEventQueue.java:169)
    at com.ververica.cdc.connectors.mysql.debezium.dispatcher.SignalEventDispatcher.dispatchWatermarkEvent(SignalEventDispatcher.java:91)
    at com.ververica.cdc.connectors.mysql.debezium.task.MySqlBinlogSplitReadTask.handleEvent(MySqlBinlogSplitReadTask.java:105)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$execute$25(MySqlStreamingChangeEventSource.java:1095)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource$$Lambda$1436/136127176.onEvent(Unknown Source)
    at com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1246)
    at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:1072)
    at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:631)
    at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:932)
    at java.lang.Thread.run(Thread.java:750)

    Number of locked synchronizers = 1
    - java.util.concurrent.locks.ReentrantLock$NonfairSync@10cd2e64

"debezium-reader-0" Id=37064 TIMED_WAITING on java.util.concurrent.locks.ReentrantLock$NonfairSync@10cd2e64 owned by "blc-mysql-xima-slave-073.ximalaya.local:3306" Id=37065
    at sun.misc.Unsafe.park(Native Method)
    -  waiting on java.util.concurrent.locks.ReentrantLock$NonfairSync@10cd2e64
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireNanos(AbstractQueuedSynchronizer.java:934)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireNanos(AbstractQueuedSynchronizer.java:1247)
    at java.util.concurrent.locks.ReentrantLock.tryLock(ReentrantLock.java:442)
    at com.github.shyiko.mysql.binlog.BinaryLogClient.tryLockInterruptibly(BinaryLogClient.java:1335)
    at com.github.shyiko.mysql.binlog.BinaryLogClient.terminateConnect(BinaryLogClient.java:1329)
    at com.github.shyiko.mysql.binlog.BinaryLogClient.disconnect(BinaryLogClient.java:1298)
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.execute(MySqlStreamingChangeEventSource.java:1247)
    at com.ververica.cdc.connectors.mysql.debezium.task.MySqlBinlogSplitReadTask.execute(MySqlBinlogSplitReadTask.java:84)
    at com.ververica.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader.backfill(SnapshotSplitReader.java:197)
    at com.ververica.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader.lambda$submitSplit$1(SnapshotSplitReader.java:153)
    at com.ververica.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader$$Lambda$1405/15047152.run(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

    Number of locked synchronizers = 1
    - java.util.concurrent.ThreadPoolExecutor$Worker@4fe23624

"flink-metrics-pekko.remote.default-remote-dispatcher-16" Id=35313 WAITING on org.apache.pekko.dispatch.ForkJoinExecutorConfigurator$PekkoForkJoinPool@1f4176d7
    at sun.misc.Unsafe.park(Native Method)
    -  waiting on 
org.apache.pekko.dispatch.ForkJoinExecutorConfigurator$PekkoForkJoinPool@1f4176d7
    at java.util.concurrent.ForkJoinPool.awaitWork(ForkJoinPool.java:1824)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1693)
    at 
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)"flink-pekko.remote.default-remote-dispatcher-19"
 Id=35312 WAITING on 
org.apache.pekko.dispatch.ForkJoinExecutorConfigurator$PekkoForkJoinPool@6b7c0aa7
    at sun.misc.Unsafe.park(Native Method)
    -  waiting on 
org.apache.pekko.dispatch.ForkJoinExecutorConfigurator$PekkoForkJoinPool@6b7c0aa7
    at java.util.concurrent.ForkJoinPool.awaitWork(ForkJoinPool.java:1824)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1693)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)

"I/O client dispatch - 46d5c4b9-a728-491a-9996-0a583ca6026e" Id=472 WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@123bb7a2
    at sun.misc.Unsafe.park(Native Method)
    -  waiting on 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@123bb7a2
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at 
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1081)
    at 
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
    at 
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

"I/O client dispatch - 4b2937c5-9326-4fdd-bb93-6679fbc30c33" Id=447 WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@6f96ae7e
    at sun.misc.Unsafe.park(Native Method)
    -  waiting on 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@6f96ae7e
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at 
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1081)
    at 
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
    at 
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

"I/O client dispatch - fbb42f49-a225-419f-845b-1a41d73a763f" Id=203 WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@6f96ae7e
    at sun.misc.Unsafe.park(Native Method)
    -  waiting on 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@6f96ae7e
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at 
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1081)
    at 
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
    at 
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

"I/O client dispatch - 84ea7fea-2803-460c-84c3-27496c47c789" Id=202 WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@123bb7a2
    at sun.misc.Unsafe.park(Native Method)
    -  waiting on 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@123bb7a2
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at 
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1081)
    at 
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
    at 
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

"mysql-cj-abandoned-connection-cleanup" Id=101 TIMED_WAITING on java.lang.ref.ReferenceQueue$Lock@7bf58a95
    at java.lang.Object.wait(Native Method)
    -  waiting on java.lang.ref.ReferenceQueue$Lock@7bf58a95
    at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:144)
    at 
com.mysql.cj.jdbc.AbandonedConnectionCleanupThread.run(AbandonedConnectionCleanupThread.java:91)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

    Number of locked synchronizers = 1
    - java.util.concurrent.ThreadPoolExecutor$Worker@43f4dc77

"System Time Trigger for Source: source_table[4] -> Calc[5] -> ConstraintEnforcer[6] -> Sink: sink_table[6] (1/2)#0" Id=100 TIMED_WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@70579e7e
    at sun.misc.Unsafe.park(Native Method)
    -  waiting on 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@70579e7e
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
    at 
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
    at 
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
    at 
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

"System Time Trigger for Source: source_table[4] -> Calc[5] -> ConstraintEnforcer[6] -> Sink: sink_table[6] (2/2)#0" Id=99 TIMED_WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@3f9ea171
    at sun.misc.Unsafe.park(Native Method)
    -  waiting on 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@3f9ea171
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
    at 
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
    at 
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
    at 
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

"Time Trigger for Source: source_table[4] -> Calc[5] -> ConstraintEnforcer[6] -> Sink: sink_table[6] (1/2)#0" Id=97 TIMED_WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@65210da2
    at sun.misc.Unsafe.park(Native Method)
    -  waiting on 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@65210da2
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
    at 
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
    at 
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
    at 
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

"Time Trigger for Source: source_table[4] -> Calc[5] -> ConstraintEnforcer[6] -> Sink: sink_table[6] (2/2)#0" Id=95 TIMED_WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@325792b1
    at sun.misc.Unsafe.park(Native Method)
    -  waiting on 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@325792b1
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
    at 
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
    at 
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
    at 
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

"Source Data Fetcher for Source: source_table[4] -> Calc[5] -> ConstraintEnforcer[6] -> Sink: sink_table[6] (1/2)#0" Id=94 TIMED_WAITING on java.util.concurrent.locks.ReentrantLock$NonfairSync@10cd2e64 owned by "blc-mysql-xima-slave-073.ximalaya.local:3306" Id=37065
    at sun.misc.Unsafe.park(Native Method)
    -  waiting on java.util.concurrent.locks.ReentrantLock$NonfairSync@10cd2e64
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireNanos(AbstractQueuedSynchronizer.java:934)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireNanos(AbstractQueuedSynchronizer.java:1247)
    at java.util.concurrent.locks.ReentrantLock.tryLock(ReentrantLock.java:442)
    at 
com.github.shyiko.mysql.binlog.BinaryLogClient.tryLockInterruptibly(BinaryLogClient.java:1335)
    at 
com.github.shyiko.mysql.binlog.BinaryLogClient.terminateConnect(BinaryLogClient.java:1329)
    at 
com.github.shyiko.mysql.binlog.BinaryLogClient.disconnect(BinaryLogClient.java:1298)
    at 
com.ververica.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader.close(SnapshotSplitReader.java:392)
    at 
com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader.closeSnapshotReader(MySqlSplitReader.java:282)
    at 
com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader.forRecords(MySqlSplitReader.java:205)
    at 
com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader.pollSplitRecords(MySqlSplitReader.java:138)
    at 
com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader.fetch(MySqlSplitReader.java:87)
    at 
org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
    at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
    at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

    Number of locked synchronizers = 1
    - java.util.concurrent.ThreadPoolExecutor$Worker@b382b59

"StarRocks-Sink-Manager" Id=92 TIMED_WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@7ebf5eeb
    at sun.misc.Unsafe.park(Native Method)
    -  waiting on 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@7ebf5eeb
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2163)
    at 
com.starrocks.data.load.stream.v2.StreamLoadManagerV2.lambda$init$0(StreamLoadManagerV2.java:161)
    at 
com.starrocks.data.load.stream.v2.StreamLoadManagerV2$$Lambda$1159/452173932.run(Unknown
 Source)
    at java.lang.Thread.run(Thread.java:750)

"StarRocks-Sink-Manager" Id=91 TIMED_WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@3460ed64
    at sun.misc.Unsafe.park(Native Method)
    -  waiting on 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@3460ed64
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2163)
    at 
com.starrocks.data.load.stream.v2.StreamLoadManagerV2.lambda$init$0(StreamLoadManagerV2.java:161)
    at 
com.starrocks.data.load.stream.v2.StreamLoadManagerV2$$Lambda$1159/452173932.run(Unknown
 Source)
    at java.lang.Thread.run(Thread.java:750)

"Thread-25" Id=89 RUNNABLE (in native)

"Thread-24" Id=88 RUNNABLE (in native)

"org.apache.hadoop.fs.FileSystem$Statistics$StatisticsDataReferenceCleaner" Id=87 WAITING on java.lang.ref.ReferenceQueue$Lock@59c1ba57
    at java.lang.Object.wait(Native Method)
    -  waiting on java.lang.ref.ReferenceQueue$Lock@59c1ba57
    at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:144)
    at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:165)
    at 
org.apache.hadoop.fs.FileSystem$Statistics$StatisticsDataReferenceCleaner.run(FileSystem.java:4021)
    at java.lang.Thread.run(Thread.java:750)

"Source: source_table[4] -> Calc[5] -> ConstraintEnforcer[6] -> Sink: sink_table[6] (2/2)#0" Id=86 TIMED_WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@578d79c4
    at sun.misc.Unsafe.park(Native Method)
    -  waiting on 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@578d79c4
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2163)
    at 
org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:149)
    at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:363)
    at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
    at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807)
    at 
org.apache.flink.runtime.taskmanager.Task$$Lambda$1236/1566291491.run(Unknown 
Source)
    at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
    at java.lang.Thread.run(Thread.java:750)

"CloseableReaperThread" Id=85 WAITING on java.lang.ref.ReferenceQueue$Lock@53413dd8
    at java.lang.Object.wait(Native Method)
    -  waiting on java.lang.ref.ReferenceQueue$Lock@53413dd8
    at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:144)
    at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:165)
    at org.apache.flink.core.fs.SafetyNetCloseableRegistry$CloseableReaperThread.run(SafetyNetCloseableRegistry.java:215)

"Source: source_table[4] -> Calc[5] -> ConstraintEnforcer[6] -> Sink: sink_table[6] (1/2)#0" Id=84 TIMED_WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@6079e119
    at sun.misc.Unsafe.park(Native Method)
    -  waiting on 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@6079e119
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2163)
    at 
org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:149)
    at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:363)
    at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
    at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807)
    at 
org.apache.flink.runtime.taskmanager.Task$$Lambda$1236/1566291491.run(Unknown 
Source)
    at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
    at java.lang.Thread.run(Thread.java:750)

"flink-pekko.actor.default-dispatcher-18" Id=83 WAITING on org.apache.pekko.dispatch.ForkJoinExecutorConfigurator$PekkoForkJoinPool@34beec43
    at sun.misc.Unsafe.park(Native Method)
    -  waiting on 
org.apache.pekko.dispatch.ForkJoinExecutorConfigurator$PekkoForkJoinPool@34beec43
    at java.util.concurrent.ForkJoinPool.awaitWork(ForkJoinPool.java:1824)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1693)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)

"pool-6-thread-1" Id=81 WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@8c1f049
    at sun.misc.Unsafe.park(Native Method)
    -  waiting on 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@8c1f049
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at 
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1081)
    at 
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
    at 
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

"taskmanager_0-main-scheduler-thread-1" Id=80 TIMED_WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@52b4a374
    at sun.misc.Unsafe.park(Native Method)
    -  waiting on 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@52b4a374
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
    at 
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
    at 
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
    at 
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

"flink-pekko.actor.default-dispatcher-17" Id=79 WAITING on org.apache.pekko.dispatch.ForkJoinExecutorConfigurator$PekkoForkJoinPool@34beec43
    at sun.misc.Unsafe.park(Native Method)
    -  waiting on 
org.apache.pekko.dispatch.ForkJoinExecutorConfigurator$PekkoForkJoinPool@34beec43
    at java.util.concurrent.ForkJoinPool.awaitWork(ForkJoinPool.java:1824)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1693)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)

"flink-pekko.actor.default-dispatcher-16" Id=78 WAITING on org.apache.pekko.dispatch.ForkJoinExecutorConfigurator$PekkoForkJoinPool@34beec43
    at sun.misc.Unsafe.park(Native Method)
    -  waiting on 
org.apache.pekko.dispatch.ForkJoinExecutorConfigurator$PekkoForkJoinPool@34beec43
    at java.util.concurrent.ForkJoinPool.awaitWork(ForkJoinPool.java:1824)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1693)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)

"Hashed wheel timer #1" Id=25 TIMED_WAITING
    at java.lang.Thread.sleep(Native Method)
    at 
org.jboss.netty.util.HashedWheelTimer$Worker.waitForNextTick(HashedWheelTimer.java:445)
    at 
org.jboss.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:364)
    at 
org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
    at java.lang.Thread.run(Thread.java:750)

"Flink Netty Server (0) Thread 0" Id=72 RUNNABLE (in native)
    at 
org.apache.flink.shaded.netty4.io.netty.channel.epoll.Native.epollWait(Native 
Method)
    at 
org.apache.flink.shaded.netty4.io.netty.channel.epoll.Native.epollWait(Native.java:209)
    at 
org.apache.flink.shaded.netty4.io.netty.channel.epoll.Native.epollWait(Native.java:202)
    at 
org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.epollWaitNoTimerChange(EpollEventLoop.java:306)
    at 
org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:363)
    at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at 
org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at java.lang.Thread.run(Thread.java:750)

"Flink-Metric-View-Updater-thread-1" Id=71 TIMED_WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@7deae112
    at sun.misc.Unsafe.park(Native Method)
    -  waiting on 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@7deae112
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
    at 
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
    at 
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
    at 
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

"IOManager reader thread #6" Id=68 WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@3ef5d618
    at sun.misc.Unsafe.park(Native Method)
    -  waiting on 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@3ef5d618
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at 
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
    at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$ReaderThread.run(IOManagerAsync.java:372)

"IOManager reader thread #5" Id=67 WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@39b12a37
    at sun.misc.Unsafe.park(Native Method)
    -  waiting on 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@39b12a37
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at 
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
    at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$ReaderThread.run(IOManagerAsync.java:372)

"IOManager reader thread #4" Id=66 WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@4e698176
    at sun.misc.Unsafe.park(Native Method)
    -  waiting on 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@4e698176
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at 
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
    at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$ReaderThread.run(IOManagerAsync.java:372)

"IOManager reader thread #3" Id=65 WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@64211bd8
    at sun.misc.Unsafe.park(Native Method)
    -  waiting on 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@64211bd8
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at 
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
    at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$ReaderThread.run(IOManagerAsync.java:372)

"IOManager reader thread #2" Id=64 WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@74f45452
    at sun.misc.Unsafe.park(Native Method)
    -  waiting on 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@74f45452
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at 
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
    at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$ReaderThread.run(IOManagerAsync.java:372)

"IOManager reader thread #1" Id=63 WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@44a42869
    at sun.misc.Unsafe.park(Native Method)
    -  waiting on 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@44a42869
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at 
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
    at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$ReaderThread.run(IOManagerAsync.java:372)

"IOManager writer thread #6" Id=62 WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@46c463ba
    at sun.misc.Unsafe.park(Native Method)
    -  waiting on 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@46c463ba
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at 
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
    at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:482)

"IOManager writer thread #5" Id=61 WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@2ad75f61
    at sun.misc.Unsafe.park(Native Method)
    -  waiting on 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@2ad75f61
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at 
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
    at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:482)

"IOManager writer thread #4" Id=60 WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@7c57c8ad
    at sun.misc.Unsafe.park(Native Method)
    -  waiting on 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@7c57c8ad
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at 
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
    at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:482)

"IOManager writer thread #3" Id=59 WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@1955502e
    at sun.misc.Unsafe.park(Native Method)
    -  waiting on 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@1955502e
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at 
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
    at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:482)

"IOManager writer thread #2" Id=58 WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@7c69390d
    at sun.misc.Unsafe.park(Native Method)
    -  waiting on 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@7c69390d
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at 
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
    at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:482)

"IOManager writer thread #1" Id=57 WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@6cd55db6
    at sun.misc.Unsafe.park(Native Method)
    -  waiting on 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@6cd55db6
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at 
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
    at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:482)

"Timer-1" Id=55 TIMED_WAITING on java.util.TaskQueue@2e85e221
    at java.lang.Object.wait(Native Method)
    -  waiting on java.util.TaskQueue@2e85e221
    at java.util.TimerThread.mainLoop(Timer.java:552)
    at java.util.TimerThread.run(Timer.java:505)

"Timer-0" Id=53 TIMED_WAITING on java.util.TaskQueue@1de213ff
    at java.lang.Object.wait(Native Method)
    -  waiting on java.util.TaskQueue@1de213ff
    at java.util.TimerThread.mainLoop(Timer.java:552)
    at java.util.TimerThread.run(Timer.java:505)

"New I/O server boss #12" Id=49 RUNNABLE (in native)
    at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
    at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
    at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
    at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
    -  locked sun.nio.ch.Util$3@28f6f01f
    -  locked java.util.Collections$UnmodifiableSet@6e08205a
    -  locked sun.nio.ch.EPollSelectorImpl@6f9d4350
    at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
    at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:101)
    at 
org.jboss.netty.channel.socket.nio.NioServerBoss.select(NioServerBoss.java:163)
    at 
org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:212)
    at 
org.jboss.netty.channel.socket.nio.NioServerBoss.run(NioServerBoss.java:42)
    at 
org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
    at 
org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

    Number of locked synchronizers = 1
    - java.util.concurrent.ThreadPoolExecutor$Worker@19d14b65

"New I/O worker #11" Id=48 RUNNABLE
    at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
    at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
    at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
    at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
    -  locked sun.nio.ch.Util$3@626dc8b7
    -  locked java.util.Collections$UnmodifiableSet@7d84baed
    -  locked sun.nio.ch.EPollSelectorImpl@52215ff7
    at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
    at 
org.jboss.netty.channel.socket.nio.SelectorUtil.select(SelectorUtil.java:68)
    at 
org.jboss.netty.channel.socket.nio.AbstractNioSelector.select(AbstractNioSelector.java:434)
    at 
org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:212)
    at 
org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
    at 
org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
    at 
org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

    Number of locked synchronizers = 1
    - java.util.concurrent.ThreadPoolExecutor$Worker@529e2c3

"New I/O worker #10" Id=47 RUNNABLE
    at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
    at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
    at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
    at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
    -  locked sun.nio.ch.Util$3@57a31120
    -  locked java.util.Collections$UnmodifiableSet@5b874313
    -  locked sun.nio.ch.EPollSelectorImpl@30cc95c
    at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
    at 
org.jboss.netty.channel.socket.nio.SelectorUtil.select(SelectorUtil.java:68)
    at 
org.jboss.netty.channel.socket.nio.AbstractNioSelector.select(AbstractNioSelector.java:434)
    at 
org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:212)
    at 
org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
    at 
org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
    at 
org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

    Number of locked synchronizers = 1
    - java.util.concurrent.ThreadPoolExecutor$Worker@28b3a765

"New I/O boss #9" Id=46 RUNNABLE
    at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
    at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
    at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
    at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
    -  locked sun.nio.ch.Util$3@5b8ae02d
    -  locked java.util.Collections$UnmodifiableSet@14efc06d
    -  locked sun.nio.ch.EPollSelectorImpl@75119638
    at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
    at 
org.jboss.netty.channel.socket.nio.SelectorUtil.select(SelectorUtil.java:68)
    at 
org.jboss.netty.channel.socket.nio.AbstractNioSelector.select(AbstractNioSelector.java:434)
    at 
org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:212)
    at 
org.jboss.netty.channel.socket.nio.NioClientBoss.run(NioClientBoss.java:42)
    at 
org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
    at 
org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

    Number of locked synchronizers = 1
    - java.util.concurrent.ThreadPoolExecutor$Worker@6db124b7

"New I/O worker #8" Id=44 RUNNABLE
    at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
    at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
    at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
    at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
    -  locked sun.nio.ch.Util$3@10b2e7ef
    -  locked java.util.Collections$UnmodifiableSet@1cb5e65c
    -  locked sun.nio.ch.EPollSelectorImpl@76b3b10f
    at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
    at 
org.jboss.netty.channel.socket.nio.SelectorUtil.select(SelectorUtil.java:68)
    at 
org.jboss.netty.channel.socket.nio.AbstractNioSelector.select(AbstractNioSelector.java:434)
    at 
org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:212)
    at 
org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
    at 
org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
    at 
org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

    Number of locked synchronizers = 1
    - java.util.concurrent.ThreadPoolExecutor$Worker@698ead5e

"New I/O worker #7" Id=43 RUNNABLE
    at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
    at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
    at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
    at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
    -  locked sun.nio.ch.Util$3@f6a49b7
    -  locked java.util.Collections$UnmodifiableSet@8605a2a
    -  locked sun.nio.ch.EPollSelectorImpl@3bd4ca7c
    at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
    at 
org.jboss.netty.channel.socket.nio.SelectorUtil.select(SelectorUtil.java:68)
    at 
org.jboss.netty.channel.socket.nio.AbstractNioSelector.select(AbstractNioSelector.java:434)
    at 
org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:212)
    at 
org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
    at 
org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
    at 
org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

    Number of locked synchronizers = 1
    - java.util.concurrent.ThreadPoolExecutor$Worker@67494fc7

"flink-metrics-pekko.remote.default-remote-dispatcher-6" Id=41 TIMED_WAITING on org.apache.pekko.dispatch.ForkJoinExecutorConfigurator$PekkoForkJoinPool@1f4176d7
    at sun.misc.Unsafe.park(Native Method)
    -  waiting on 
org.apache.pekko.dispatch.ForkJoinExecutorConfigurator$PekkoForkJoinPool@1f4176d7
    at java.util.concurrent.ForkJoinPool.awaitWork(ForkJoinPool.java:1824)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1693)
    at 
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)

"flink-metrics-scheduler-1" Id=36 TIMED_WAITING
    at java.lang.Thread.sleep(Native Method)
    at 
org.apache.pekko.actor.LightArrayRevolverScheduler.waitNanos(LightArrayRevolverScheduler.scala:99)
    at 
org.apache.pekko.actor.LightArrayRevolverScheduler$$anon$3.nextTick(LightArrayRevolverScheduler.scala:310)
    at 
org.apache.pekko.actor.LightArrayRevolverScheduler$$anon$3.run(LightArrayRevolverScheduler.scala:280)
    at java.lang.Thread.run(Thread.java:750)

"Flink-Metric-Reporter-thread-1" Id=35 TIMED_WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@50042fc0
    at sun.misc.Unsafe.park(Native Method)
    -  waiting on 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@50042fc0
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
    at 
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
    at 
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
    at 
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

"MetricManager$$anon$1" Id=34 TIMED_WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@d7ef386
    at sun.misc.Unsafe.park(Native Method)
    -  waiting on 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@d7ef386
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
    at 
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
    at 
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
    at 
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

"kafka-producer-network-thread | producer-1" Id=33 RUNNABLE (in native)
    at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
    at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
    at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
    at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
    -  locked sun.nio.ch.Util$3@7454909
    -  locked java.util.Collections$UnmodifiableSet@1adff4b7
    -  locked sun.nio.ch.EPollSelectorImpl@6bd4b203
    at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
    at org.apache.kafka.common.network.Selector.select(Selector.java:873)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:465)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:560)
    at 
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:328)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243)
    at java.lang.Thread.run(Thread.java:750)

"Thread-4" Id=32 RUNNABLE
    at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
    at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
    at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
    at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
    -  locked sun.nio.ch.Util$3@11cc8e3c
    -  locked java.util.Collections$UnmodifiableSet@3c6c800d
    -  locked sun.nio.ch.EPollSelectorImpl@5973a17b
    at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
    at sun.net.httpserver.ServerImpl$Dispatcher.run(ServerImpl.java:453)
    at java.lang.Thread.run(Thread.java:750)

"idle-timeout-task" Id=31 TIMED_WAITING on java.util.TaskQueue@177954fc
    at java.lang.Object.wait(Native Method)
    -  waiting on java.util.TaskQueue@177954fc
    at java.util.TimerThread.mainLoop(Timer.java:552)
    at java.util.TimerThread.run(Timer.java:505)

"New I/O server boss #6" Id=29 RUNNABLE (in native)
    at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
    at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
    at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
    at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
    -  locked sun.nio.ch.Util$3@62b19541
    -  locked java.util.Collections$UnmodifiableSet@3a050df2
    -  locked sun.nio.ch.EPollSelectorImpl@7e390bb0
    at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
    at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:101)
    at 
org.jboss.netty.channel.socket.nio.NioServerBoss.select(NioServerBoss.java:163)
    at 
org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:212)
    at 
org.jboss.netty.channel.socket.nio.NioServerBoss.run(NioServerBoss.java:42)
    at 
org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
    at 
org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

    Number of locked synchronizers = 1
    - java.util.concurrent.ThreadPoolExecutor$Worker@19d921eb

"New I/O worker #5" Id=28 RUNNABLE
    at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
    at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
    at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
    at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
    -  locked sun.nio.ch.Util$3@23ba6e3f
    -  locked java.util.Collections$UnmodifiableSet@4dcfec94
    -  locked sun.nio.ch.EPollSelectorImpl@5fb71242
    at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
    at 
org.jboss.netty.channel.socket.nio.SelectorUtil.select(SelectorUtil.java:68)
    at 
org.jboss.netty.channel.socket.nio.AbstractNioSelector.select(AbstractNioSelector.java:434)
    at 
org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:212)
    at 
org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
    at 
org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
    at 
org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

    Number of locked synchronizers = 1
    - java.util.concurrent.ThreadPoolExecutor$Worker@776f755d

"New I/O worker #4" Id=27 RUNNABLE
    at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
    at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
    at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
    at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
    -  locked sun.nio.ch.Util$3@6965f68f
    -  locked java.util.Collections$UnmodifiableSet@195eb60a
    -  locked sun.nio.ch.EPollSelectorImpl@2bc752f9
    at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
    at 
org.jboss.netty.channel.socket.nio.SelectorUtil.select(SelectorUtil.java:68)
    at 
org.jboss.netty.channel.socket.nio.AbstractNioSelector.select(AbstractNioSelector.java:434)
    at 
org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:212)
    at 
org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
    at 
org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
    at 
org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

    Number of locked synchronizers = 1
    - java.util.concurrent.ThreadPoolExecutor$Worker@4a61a06

"New I/O boss #3" Id=26 RUNNABLE
    at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
    at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
    at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
    at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
    -  locked sun.nio.ch.Util$3@4c0570c1
    -  locked java.util.Collections$UnmodifiableSet@a123640
    -  locked sun.nio.ch.EPollSelectorImpl@66103fee
    at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
    at 
org.jboss.netty.channel.socket.nio.SelectorUtil.select(SelectorUtil.java:68)
    at 
org.jboss.netty.channel.socket.nio.AbstractNioSelector.select(AbstractNioSelector.java:434)
    at 
org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:212)
    at 
org.jboss.netty.channel.socket.nio.NioClientBoss.run(NioClientBoss.java:42)
    at 
org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
    at 
org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

    Number of locked synchronizers = 1
    - java.util.concurrent.ThreadPoolExecutor$Worker@4cdb5b07

"New I/O worker #2" Id=24 RUNNABLE
    at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
    at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
    at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
    at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
    -  locked sun.nio.ch.Util$3@7751e94f
    -  locked java.util.Collections$UnmodifiableSet@10350c1c
    -  locked sun.nio.ch.EPollSelectorImpl@b0e986c
    at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
    at 
org.jboss.netty.channel.socket.nio.SelectorUtil.select(SelectorUtil.java:68)
    at 
org.jboss.netty.channel.socket.nio.AbstractNioSelector.select(AbstractNioSelector.java:434)
    at 
org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:212)
    at 
org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
    at 
org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
    at 
org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

    Number of locked synchronizers = 1
    - java.util.concurrent.ThreadPoolExecutor$Worker@7918f451

"New I/O worker #1" Id=23 RUNNABLE
    at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
    at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
    at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
    at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
    -  locked sun.nio.ch.Util$3@6c8629ac
    -  locked java.util.Collections$UnmodifiableSet@14e5bee2
    -  locked sun.nio.ch.EPollSelectorImpl@2c08158e
    at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
    at 
org.jboss.netty.channel.socket.nio.SelectorUtil.select(SelectorUtil.java:68)
    at 
org.jboss.netty.channel.socket.nio.AbstractNioSelector.select(AbstractNioSelector.java:434)
    at 
org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:212)
    at 
org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
    at 
org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
    at 
org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

    Number of locked synchronizers = 1
    - java.util.concurrent.ThreadPoolExecutor$Worker@293a14d3

"flink-pekko.remote.default-remote-dispatcher-6" Id=21 TIMED_WAITING on org.apache.pekko.dispatch.ForkJoinExecutorConfigurator$PekkoForkJoinPool@6b7c0aa7
    at sun.misc.Unsafe.park(Native Method)
    -  waiting on 
org.apache.pekko.dispatch.ForkJoinExecutorConfigurator$PekkoForkJoinPool@6b7c0aa7
    at java.util.concurrent.ForkJoinPool.awaitWork(ForkJoinPool.java:1824)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1693)
    at 
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)

"flink-pekko.actor.default-dispatcher-5" Id=20 RUNNABLE
    at sun.management.ThreadImpl.dumpThreads0(Native Method)
    at sun.management.ThreadImpl.dumpAllThreads(ThreadImpl.java:496)
    at sun.management.ThreadImpl.dumpAllThreads(ThreadImpl.java:484)
    at org.apache.flink.runtime.util.JvmUtils.createThreadDump(JvmUtils.java:50)
    at 
org.apache.flink.runtime.rest.messages.ThreadDumpInfo.dumpAndCreate(ThreadDumpInfo.java:59)
    at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.requestThreadDump(TaskExecutor.java:1350)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:309)
    at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor$$Lambda$797/432379350.get(Unknown
 Source)
    at 
org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
    at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:307)
    at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:222)
    at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
    at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor$$Lambda$523/886869080.apply(Unknown
 Source)
    at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
    at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
    at 
org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
    at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
    at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
    at 
org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
    at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
    at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
    at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
    at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
    at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at 
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at 
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)

"flink-scheduler-1" Id=16 TIMED_WAITING
    at java.lang.Thread.sleep(Native Method)
    at 
org.apache.pekko.actor.LightArrayRevolverScheduler.waitNanos(LightArrayRevolverScheduler.scala:99)
    at 
org.apache.pekko.actor.LightArrayRevolverScheduler$$anon$3.nextTick(LightArrayRevolverScheduler.scala:310)
    at 
org.apache.pekko.actor.LightArrayRevolverScheduler$$anon$3.run(LightArrayRevolverScheduler.scala:280)
    at java.lang.Thread.run(Thread.java:750)

"logback-1" Id=9 WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@533a259b
    at sun.misc.Unsafe.park(Native Method)
    -  waiting on 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@533a259b
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at 
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1081)
    at 
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
    at 
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

"Signal Dispatcher" Id=5 RUNNABLE

"Finalizer" Id=3 WAITING on java.lang.ref.ReferenceQueue$Lock@e2cbd19
    at java.lang.Object.wait(Native Method)
    -  waiting on java.lang.ref.ReferenceQueue$Lock@e2cbd19
    at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:144)
    at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:165)
    at 
java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:188)

"Reference Handler" Id=2 WAITING on java.lang.ref.Reference$Lock@37870090
    at java.lang.Object.wait(Native Method)
    -  waiting on java.lang.ref.Reference$Lock@37870090
    at java.lang.Object.wait(Object.java:502)
    at java.lang.ref.Reference.tryHandlePending(Reference.java:191)
    at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:153)

"main" Id=1 WAITING on java.util.concurrent.CompletableFuture$Signaller@5b4fd78b
    at sun.misc.Unsafe.park(Native Method)
    -  waiting on java.util.concurrent.CompletableFuture$Signaller@5b4fd78b
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at 
java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
    at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
    at 
java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
    at 
org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManager(TaskManagerRunner.java:497)
    at 
org.apache.flink.runtime.taskexecutor.TaskManagerRunner.lambda$runTaskManagerProcessSecurely$5(TaskManagerRunner.java:535)
    at 
org.apache.flink.runtime.taskexecutor.TaskManagerRunner$$Lambda$55/1423983012.call(Unknown
 Source)
    at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext$$Lambda$56/127791068.run(Unknown
 Source)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1878)
    at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at 
org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManagerProcessSecurely(TaskManagerRunner.java:535)
    at 
org.apache.flink.yarn.YarnTaskExecutorRunner.runTaskManagerSecurely(YarnTaskExecutorRunner.java:94)
    at 
org.apache.flink.yarn.YarnTaskExecutorRunner.main(YarnTaskExecutorRunner.java:68)
 {code}

> Flink mysql-cdc connector stops reading data
> --------------------------------------------
>
>                 Key: FLINK-35296
>                 URL: https://issues.apache.org/jira/browse/FLINK-35296
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>    Affects Versions: cdc-3.1.0
>            Reporter: Gang Yang
>            Priority: Major
>             Fix For: cdc-3.2.0
>
>         Attachments: image-2024-05-06-17-42-19-059.png, 
> image-2024-05-14-11-25-55-565.png
>
>
> *Background:*
> The job consumes data from sharded databases and tables (sub-databases and 
> sub-tables) matched by regular expressions, with scan.startup.mode=initial.
> *Problems:*
> 1. The problem occurs during the snapshot data synchronization phase;
> 2. After the task runs normally for a period of time, it stops reading data, 
> even though a lot of data remains in the upstream MySQL table;
> 3. When the task is restarted from its saved state, it reads normally for a 
> period of time and then stops reading again.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
