[ https://issues.apache.org/jira/browse/FLINK-30424?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17647955#comment-17647955 ]

Ran Tao (lemonjing) edited comment on FLINK-30424 at 12/15/22 9:28 AM:
-----------------------------------------------------------

Hi [~gaoyunhaii], thanks for reviewing. I have added the JM and TM logs below.

*We can see the recovery logs in the JM:*
2022-12-15 16:55:05,747 INFO  [pool-10-thread-1] 
org.apache.flink.runtime.rpc.akka.AkkaRpcService             [] - Starting RPC 
endpoint for 
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager at 
akka://flink/user/rpc/resourcemanager_2 .
2022-12-15 16:55:05,757 INFO  [flink-akka.actor.default-dispatcher-4] 
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
Starting the resource manager.
2022-12-15 16:55:05,763 INFO  [flink-akka.actor.default-dispatcher-4] 
org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager [] 
- Starting the slot manager.
2022-12-15 16:55:05,823 INFO  [jobmanager-io-thread-1] 
org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils [] - 
Recovering checkpoints from 
ZooKeeperStateHandleStore\{namespace='flink/antc4flink87003521/jobs/20a4149a35ee6a00aba5356c361bf4df/checkpoints'}.
2022-12-15 16:55:05,859 INFO  [jobmanager-io-thread-1] 
org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils [] - 
Found 0 checkpoints in 
ZooKeeperStateHandleStore\{namespace='flink/antc4flink87003521/jobs/20a4149a35ee6a00aba5356c361bf4df/checkpoints'}.
2022-12-15 16:55:05,859 INFO  [jobmanager-io-thread-1] 
org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils [] - 
Trying to fetch 0 checkpoints from storage.
2022-12-15 16:55:05,868 INFO  [jobmanager-io-thread-1] 
org.apache.flink.runtime.util.ZooKeeperUtils                 [] - Initialized 
DefaultCompletedCheckpointStore in 
'ZooKeeperStateHandleStore\{namespace='flink/antc4flink87003521/jobs/20a4149a35ee6a00aba5356c361bf4df/checkpoints'}'
 with /checkpoints.
2022-12-15 16:55:05,962 INFO  [jobmanager-io-thread-1] 
org.apache.flink.runtime.jobmaster.JobMaster                 [] - Running 
initialization on master for job antc4flink87003521 
(20a4149a35ee6a00aba5356c361bf4df).
2022-12-15 16:55:05,962 INFO  [jobmanager-io-thread-1] 
org.apache.flink.runtime.jobmaster.JobMaster                 [] - Successfully 
ran initialization on master in 0 ms.
2022-12-15 16:55:06,251 INFO  [jobmanager-io-thread-1] 
org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology [] - Built 
1 new pipelined regions in 1 ms, total 1 pipelined regions currently.
2022-12-15 16:55:06,349 INFO  [jobmanager-io-thread-1] 
org.apache.flink.runtime.jobmaster.JobMaster                 [] - Using 
job/cluster config to configure application-defined state backend: 
EmbeddedRocksDBStateBackend\{, localRocksDbDirectories=null, 
enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=4, 
writeBatchSize=2097152}
2022-12-15 16:55:06,352 INFO  [jobmanager-io-thread-1] 
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - Using 
predefined options: DEFAULT.
2022-12-15 16:55:06,353 INFO  [jobmanager-io-thread-1] 
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - Using 
application-defined options factory: 
DefaultConfigurableOptionsFactory\{configuredOptions={state.backend.antkv.perf-num-occurrence=1000,
 state.backend.antkv.ttl.ms=129600000, 
state.backend.antkv.index-block-restart-interval=16, 
state.backend.antkv.learned.index.enabled=false, 
state.backend.antkv.level-compaction-dynamic-level-bytes.enabled=false, 
state.backend.antkv.cache-num-shard-bits=1, 
state.backend.antkv.level-for-learned-index=0, 
state.backend.antkv.statistics.enabled=false}}.
2022-12-15 16:55:06,354 INFO  [jobmanager-io-thread-1] 
org.apache.flink.runtime.jobmaster.JobMaster                 [] - Using 
application-defined state backend: EmbeddedRocksDBStateBackend\{, 
localRocksDbDirectories=null, enableIncrementalCheckpointing=TRUE, 
numberOfTransferThreads=4, writeBatchSize=2097152}
2022-12-15 16:55:06,355 INFO  [jobmanager-io-thread-1] 
org.apache.flink.runtime.state.StateBackendLoader            [] - State backend 
loader loads the state backend as EmbeddedRocksDBStateBackend
2022-12-15 16:55:06,360 INFO  [jobmanager-io-thread-1] 
org.apache.flink.runtime.jobmaster.JobMaster                 [] - Using 
job/cluster config to configure application-defined checkpoint storage: 
org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage@5e36d9c3
2022-12-15 16:55:06,547 INFO  [jobmanager-io-thread-1] 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - No checkpoint 
found during restore.
2022-12-15 16:55:06,548 INFO  [jobmanager-io-thread-1] 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Starting job 
20a4149a35ee6a00aba5356c361bf4df from savepoint 
dfs://f-blink-test.sh.aliyun-inc.com:10290/blink/blink-operator/antc4flink87003521/chk-50/_metadata
 ()
2022-12-15 16:55:06,768 INFO  [jobmanager-io-thread-1] 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Reset the 
checkpoint ID of job 20a4149a35ee6a00aba5356c361bf4df to 51.
2022-12-15 16:55:06,768 INFO  [jobmanager-io-thread-1] 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Restoring job 
20a4149a35ee6a00aba5356c361bf4df from Checkpoint 50 @ 0 for 
20a4149a35ee6a00aba5356c361bf4df located at 
dfs://f-blink-test.sh.aliyun-inc.com:10290/blink/blink-operator/antc4flink87003521/chk-50.
2022-12-15 16:55:06,840 INFO  [jobmanager-io-thread-1] 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - No master 
state to restore
2022-12-15 16:55:06,840 INFO  [jobmanager-io-thread-1] 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator
 [] - Resetting coordinator to checkpoint.
2022-12-15 16:55:06,842 INFO  [Thread-18] 
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Closing 
SourceCoordinator for source Source: TableSourceScan(table=[[default_catalog, 
default_database, demo_in]], fields=[f0, f1, cnt]).
2022-12-15 16:55:06,842 INFO  [Thread-18] 
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Source 
coordinator for source Source: TableSourceScan(table=[[default_catalog, 
default_database, demo_in]], fields=[f0, f1, cnt]) closed.
2022-12-15 16:55:06,844 INFO  [jobmanager-io-thread-1] 
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Restoring 
SplitEnumerator of source Source: TableSourceScan(table=[[default_catalog, 
default_database, demo_in]], fields=[f0, f1, cnt]) from checkpoint.
2022-12-15 16:55:06,862 INFO  [jobmanager-io-thread-1] 
org.apache.flink.runtime.jobmaster.JobMaster                 [] - Using 
failover strategy 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@5f14baca
 for antc4flink87003521 (20a4149a35ee6a00aba5356c361bf4df).
2022-12-15 16:55:06,937 INFO  [flink-akka.actor.default-dispatcher-16] 
org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - 
Starting DefaultLeaderRetrievalService with 
ZookeeperLeaderRetrievalDriver\{connectionInformationPath='/resource_manager/connection_info'}.
2022-12-15 16:55:06,937 INFO  [flink-akka.actor.default-dispatcher-16] 
org.apache.flink.runtime.jobmaster.JobMaster                 [] - Starting 
execution of job 'antc4flink87003521' (20a4149a35ee6a00aba5356c361bf4df) under 
job master id b54febaa53b5412ad03a423f37644f07.
2022-12-15 16:55:06,941 INFO  [flink-akka.actor.default-dispatcher-16] 
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Starting 
split enumerator for source Source: TableSourceScan(table=[[default_catalog, 
default_database, demo_in]], fields=[f0, f1, cnt]).
2022-12-15 16:55:06,943 INFO  [flink-akka.actor.default-dispatcher-16] 
org.apache.flink.runtime.jobmaster.JobMaster                 [] - Starting 
scheduling with scheduling strategy 
[org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy]
2022-12-15 16:55:06,943 INFO  [flink-akka.actor.default-dispatcher-16] 
org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job 
flink_demo (20a4149a35ee6a00aba5356c361bf4df) switched from state CREATED to 
RUNNING.
2022-12-15 16:55:06,943 INFO  [SourceCoordinator-Source: 
TableSourceScan(table=[[default_catalog, default_database, demo_in]], 
fields=[f0, f1, cnt])] 
com.alipay.flink.connectors.demo.source.enumerator.demoSourceEnumerator [] - 
Starting the demoSourceEnumerator with partition discovery interval of 60000 ms.

*However, if no partitions change, the partition discovery thread just checks and
returns. Although I know the job was recovered, the TM log contains nothing
related to the recovery:*

2022-12-15 16:55:30,114 INFO  [main] 
org.apache.flink.runtime.rpc.akka.AkkaRpcService             [] - Starting RPC 
endpoint for org.apache.flink.runtime.taskexecutor.TaskExecutor at 
akka://flink/user/rpc/taskmanager_0 .
2022-12-15 16:55:30,177 INFO  [flink-akka.actor.default-dispatcher-4] 
org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - 
Starting DefaultLeaderRetrievalService with 
ZookeeperLeaderRetrievalDriver\{connectionInformationPath='/resource_manager/connection_info'}.
2022-12-15 16:55:30,178 INFO  [flink-akka.actor.default-dispatcher-4] 
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Start job 
leader service.
2022-12-15 16:55:30,180 INFO  [flink-akka.actor.default-dispatcher-4] 
org.apache.flink.runtime.filecache.FileCache                 [] - User file 
cache uses directory 
/opt/flink/flink-dist-cache-2a59b734-6c51-4379-b18e-3d6dac87f7af
2022-12-15 16:55:30,182 INFO  [flink-akka.actor.default-dispatcher-4] 
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Connecting to 
ResourceManager 
akka.tcp://[email protected]:6123/user/rpc/resourcemanager_2(b54febaa53b5412ad03a423f37644f07).
2022-12-15 16:55:30,426 INFO  [flink-akka.actor.default-dispatcher-4] 
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Resolved 
ResourceManager address, beginning registration
2022-12-15 16:55:30,518 INFO  [flink-akka.actor.default-dispatcher-15] 
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Successful 
registration at resource manager 
akka.tcp://[email protected]:6123/user/rpc/resourcemanager_2 under registration 
id e85390bf49682d3d723733ac807adb15.
2022-12-15 16:55:30,549 INFO  [flink-akka.actor.default-dispatcher-15] 
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Receive slot 
request cd0f4d4ab80dbc66fe2c42dce23a143a for job 
20a4149a35ee6a00aba5356c361bf4df from resource manager with leader id 
b54febaa53b5412ad03a423f37644f07.
2022-12-15 16:55:30,554 INFO  [flink-akka.actor.default-dispatcher-15] 
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Allocated 
slot for cd0f4d4ab80dbc66fe2c42dce23a143a.
2022-12-15 16:55:30,555 INFO  [flink-akka.actor.default-dispatcher-15] 
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Add job 
20a4149a35ee6a00aba5356c361bf4df for job leader monitoring.
2022-12-15 16:55:30,557 INFO  [flink-akka.actor.default-dispatcher-15] 
org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - 
Starting DefaultLeaderRetrievalService with 
ZookeeperLeaderRetrievalDriver\{connectionInformationPath='/20a4149a35ee6a00aba5356c361bf4df/connection_info'}.
2022-12-15 16:55:30,561 INFO  [flink-akka.actor.default-dispatcher-15] 
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Receive slot 
request eac343b1f8cef50bd6f636f3595b58bf for job 
20a4149a35ee6a00aba5356c361bf4df from resource manager with leader id 
b54febaa53b5412ad03a423f37644f07.
2022-12-15 16:55:30,561 INFO  [flink-akka.actor.default-dispatcher-15] 
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Allocated 
slot for eac343b1f8cef50bd6f636f3595b58bf.
2022-12-15 16:55:30,562 INFO  [main-EventThread] 
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Try to 
register at job manager akka.tcp://[email protected]:6123/user/rpc/jobmanager_1 
with leader id d03a423f-3764-4f07-b54f-ebaa53b5412a.
2022-12-15 16:55:30,585 INFO  [flink-akka.actor.default-dispatcher-15] 
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Resolved 
JobManager address, beginning registration
2022-12-15 16:55:30,618 INFO  [flink-akka.actor.default-dispatcher-15] 
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Successful 
registration at job manager 
akka.tcp://[email protected]:6123/user/rpc/jobmanager_1 for job 
20a4149a35ee6a00aba5356c361bf4df.
2022-12-15 16:55:30,619 INFO  [flink-akka.actor.default-dispatcher-4] 
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Establish 
JobManager connection for job 20a4149a35ee6a00aba5356c361bf4df.
2022-12-15 16:55:30,622 INFO  [flink-akka.actor.default-dispatcher-4] 
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Offer 
reserved slots to the leader of job 20a4149a35ee6a00aba5356c361bf4df.
2022-12-15 16:55:30,648 INFO  [flink-akka.actor.default-dispatcher-4] 
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot 
cd0f4d4ab80dbc66fe2c42dce23a143a.
2022-12-15 16:55:30,648 INFO  [flink-akka.actor.default-dispatcher-4] 
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot 
eac343b1f8cef50bd6f636f3595b58bf.
2022-12-15 16:56:19,892 INFO  [flink-akka.actor.default-dispatcher-15] 
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot 
cd0f4d4ab80dbc66fe2c42dce23a143a.
2022-12-15 16:56:19,907 INFO  [flink-akka.actor.default-dispatcher-15] 
org.apache.flink.runtime.state.changelog.StateChangelogStorageLoader [] - 
Creating a changelog storage with name 'memory'.
2022-12-15 16:56:19,925 INFO  [flink-akka.actor.default-dispatcher-15] 
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Received task 
Source: TableSourceScan(table=[[default_catalog, default_database, demo_in]], 
fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) (1/2)#0 
(9293cae434983862cfcae1f732528bc7), deploy into slot with allocation id 
cd0f4d4ab80dbc66fe2c42dce23a143a.
2022-12-15 16:56:19,926 INFO  [Source: TableSourceScan(table=[[default_catalog, 
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) 
(1/2)#0] org.apache.flink.runtime.taskmanager.Task                    [] - 
Source: TableSourceScan(table=[[default_catalog, default_database, demo_in]], 
fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) (1/2)#0 
(9293cae434983862cfcae1f732528bc7) switched from CREATED to DEPLOYING.
2022-12-15 16:56:19,930 INFO  [flink-akka.actor.default-dispatcher-15] 
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot 
eac343b1f8cef50bd6f636f3595b58bf.
2022-12-15 16:56:19,929 INFO  [Source: TableSourceScan(table=[[default_catalog, 
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) 
(1/2)#0] org.apache.flink.runtime.taskmanager.Task                    [] - 
Loading JAR files for task Source: TableSourceScan(table=[[default_catalog, 
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) 
(1/2)#0 (9293cae434983862cfcae1f732528bc7) [DEPLOYING].
2022-12-15 16:56:19,932 INFO  [flink-akka.actor.default-dispatcher-15] 
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Received task 
Source: TableSourceScan(table=[[default_catalog, default_database, demo_in]], 
fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) (2/2)#0 
(3e38d62bdebbb01fd55b0e96deed0856), deploy into slot with allocation id 
eac343b1f8cef50bd6f636f3595b58bf.
2022-12-15 16:56:19,933 INFO  [Source: TableSourceScan(table=[[default_catalog, 
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) 
(2/2)#0] org.apache.flink.runtime.taskmanager.Task                    [] - 
Source: TableSourceScan(table=[[default_catalog, default_database, demo_in]], 
fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) (2/2)#0 
(3e38d62bdebbb01fd55b0e96deed0856) switched from CREATED to DEPLOYING.
2022-12-15 16:56:19,933 INFO  [Source: TableSourceScan(table=[[default_catalog, 
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) 
(2/2)#0] org.apache.flink.runtime.taskmanager.Task                    [] - 
Loading JAR files for task Source: TableSourceScan(table=[[default_catalog, 
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) 
(2/2)#0 (3e38d62bdebbb01fd55b0e96deed0856) [DEPLOYING].
2022-12-15 16:56:20,000 INFO  [Thread-3] 
com.alibaba.dfs.common.PerformanceCounter                    [] - PERF: 
dfs://f-blink-test.sh.aliyun-inc.com:10290/blink/ha/antc4flink87003521.OPEN 
count 1 latency(us): avg 12369 deviation 0
time dist1:  [>10000 1 100.00%]
time dist2:  [<20000 1 100.00%]
2022-12-15 16:56:20,001 INFO  [Thread-3] 
com.alibaba.dfs.common.PerformanceCounter                    [] - PERF: 
dfs://f-blink-test.sh.aliyun-inc.com:10290/blink/ha/antc4flink87003521.READ_SYS 
count 1 latency(us): avg 35153 deviation 0 Bps: avg 29834011.0 deviation 0.0
time dist1:  [>10000 1 100.00%]
time dist2:  [<40000 1 100.00%]
2022-12-15 16:56:20,716 INFO  [Source: TableSourceScan(table=[[default_catalog, 
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) 
(2/2)#0] 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager [] - 
Reuse old classloader because the cached libraries is not compatible with new 
task.
2022-12-15 16:56:20,839 INFO  [Source: TableSourceScan(table=[[default_catalog, 
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) 
(1/2)#0] org.apache.flink.streaming.runtime.tasks.StreamTask          [] - 
Using job/cluster config to configure application-defined state backend: 
EmbeddedRocksDBStateBackend\{, localRocksDbDirectories=null, 
enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=4, 
writeBatchSize=2097152}
2022-12-15 16:56:20,839 INFO  [Source: TableSourceScan(table=[[default_catalog, 
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) 
(2/2)#0] org.apache.flink.streaming.runtime.tasks.StreamTask          [] - 
Using job/cluster config to configure application-defined state backend: 
EmbeddedRocksDBStateBackend\{, localRocksDbDirectories=null, 
enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=4, 
writeBatchSize=2097152}
2022-12-15 16:56:20,841 INFO  [Source: TableSourceScan(table=[[default_catalog, 
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) 
(1/2)#0] org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend 
[] - Using predefined options: DEFAULT.
2022-12-15 16:56:20,843 INFO  [Source: TableSourceScan(table=[[default_catalog, 
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) 
(2/2)#0] org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend 
[] - Using predefined options: DEFAULT.
2022-12-15 16:56:20,844 INFO  [Source: TableSourceScan(table=[[default_catalog, 
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) 
(2/2)#0] org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend 
[] - Using application-defined options factory: 
DefaultConfigurableOptionsFactory\{configuredOptions={state.backend.antkv.perf-num-occurrence=1000,
 state.backend.antkv.ttl.ms=129600000, 
state.backend.antkv.index-block-restart-interval=16, 
state.backend.antkv.learned.index.enabled=false, 
state.backend.antkv.level-compaction-dynamic-level-bytes.enabled=false, 
state.backend.antkv.cache-num-shard-bits=1, 
state.backend.antkv.level-for-learned-index=0, 
state.backend.antkv.statistics.enabled=false}}.
2022-12-15 16:56:20,844 INFO  [Source: TableSourceScan(table=[[default_catalog, 
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) 
(1/2)#0] org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend 
[] - Using application-defined options factory: 
DefaultConfigurableOptionsFactory\{configuredOptions={state.backend.antkv.perf-num-occurrence=1000,
 state.backend.antkv.ttl.ms=129600000, 
state.backend.antkv.index-block-restart-interval=16, 
state.backend.antkv.learned.index.enabled=false, 
state.backend.antkv.level-compaction-dynamic-level-bytes.enabled=false, 
state.backend.antkv.cache-num-shard-bits=1, 
state.backend.antkv.level-for-learned-index=0, 
state.backend.antkv.statistics.enabled=false}}.
2022-12-15 16:56:20,845 INFO  [Source: TableSourceScan(table=[[default_catalog, 
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) 
(1/2)#0] org.apache.flink.streaming.runtime.tasks.StreamTask          [] - 
Using application-defined state backend: EmbeddedRocksDBStateBackend\{, 
localRocksDbDirectories=null, enableIncrementalCheckpointing=TRUE, 
numberOfTransferThreads=4, writeBatchSize=2097152}
2022-12-15 16:56:20,845 INFO  [Source: TableSourceScan(table=[[default_catalog, 
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) 
(2/2)#0] org.apache.flink.streaming.runtime.tasks.StreamTask          [] - 
Using application-defined state backend: EmbeddedRocksDBStateBackend\{, 
localRocksDbDirectories=null, enableIncrementalCheckpointing=TRUE, 
numberOfTransferThreads=4, writeBatchSize=2097152}
2022-12-15 16:56:20,845 INFO  [Source: TableSourceScan(table=[[default_catalog, 
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) 
(1/2)#0] org.apache.flink.runtime.state.StateBackendLoader            [] - 
State backend loader loads the state backend as EmbeddedRocksDBStateBackend
2022-12-15 16:56:20,845 INFO  [Source: TableSourceScan(table=[[default_catalog, 
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) 
(2/2)#0] org.apache.flink.runtime.state.StateBackendLoader            [] - 
State backend loader loads the state backend as EmbeddedRocksDBStateBackend
2022-12-15 16:56:20,850 INFO  [Source: TableSourceScan(table=[[default_catalog, 
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) 
(1/2)#0] org.apache.flink.streaming.runtime.tasks.StreamTask          [] - 
Using job/cluster config to configure application-defined checkpoint storage: 
org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage@4413f5e9
2022-12-15 16:56:20,850 INFO  [Source: TableSourceScan(table=[[default_catalog, 
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) 
(2/2)#0] org.apache.flink.streaming.runtime.tasks.StreamTask          [] - 
Using job/cluster config to configure application-defined checkpoint storage: 
org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage@1095af08
2022-12-15 16:56:20,863 INFO  [Source: TableSourceScan(table=[[default_catalog, 
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) 
(1/2)#0] org.apache.flink.runtime.taskmanager.Task                    [] - 
Source: TableSourceScan(table=[[default_catalog, default_database, demo_in]], 
fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) (1/2)#0 
(9293cae434983862cfcae1f732528bc7) switched from DEPLOYING to INITIALIZING.
2022-12-15 16:56:20,863 INFO  [Source: TableSourceScan(table=[[default_catalog, 
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) 
(2/2)#0] org.apache.flink.runtime.taskmanager.Task                    [] - 
Source: TableSourceScan(table=[[default_catalog, default_database, demo_in]], 
fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) (2/2)#0 
(3e38d62bdebbb01fd55b0e96deed0856) switched from DEPLOYING to INITIALIZING.
{color:#00875a}2022-12-15 16:56:21,160 INFO  [Source: 
TableSourceScan(table=[[default_catalog, default_database, demo_in]], 
fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) (2/2)#0] 
org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding 
split(s) to reader: [[Partition: test_topic-2, startingOffset: 1670947200000, 
queueCursor: MTY3MTA3NTYzNTY4NDg2ODY3MA==, stoppingOffset: 
9223372036854775807]]{color}
2022-12-15 16:56:21,160 INFO  [Source: TableSourceScan(table=[[default_catalog, 
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) 
(1/2)#0] org.apache.flink.connector.base.source.reader.SourceReaderBase [] - 
Adding split(s) to reader: [[Partition: test_topic-1, startingOffset: 
1670947200000, queueCursor: MTY2OTYxNDE0MTI1ODYyMzkzMQ==, stoppingOffset: 
9223372036854775807]]
2022-12-15 16:56:21,161 INFO  [Source: TableSourceScan(table=[[default_catalog, 
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) 
(1/2)#0] com.alipay.flink.connectors.common.source.reader.SplitReaderSupplier 
[] - Add new split reader.
2022-12-15 16:56:21,161 INFO  [Source: TableSourceScan(table=[[default_catalog, 
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) 
(2/2)#0] com.alipay.flink.connectors.common.source.reader.SplitReaderSupplier 
[] - Add new split reader.
2022-12-15 16:56:21,171 INFO  [Source Data Fetcher for Source: 
TableSourceScan(table=[[default_catalog, default_database, demo_in]], 
fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) (1/2)#0] 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - 
Starting split fetcher 0
2022-12-15 16:56:21,171 INFO  [Source Data Fetcher for Source: 
TableSourceScan(table=[[default_catalog, default_database, demo_in]], 
fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) (2/2)#0] 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - 
Starting split fetcher 0
2022-12-15 16:56:21,177 INFO  [Source: TableSourceScan(table=[[default_catalog, 
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) 
(1/2)#0] org.apache.flink.runtime.taskmanager.Task                    [] - 
Source: TableSourceScan(table=[[default_catalog, default_database, demo_in]], 
fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) (1/2)#0 
(9293cae434983862cfcae1f732528bc7) switched from INITIALIZING to RUNNING.
2022-12-15 16:56:21,177 INFO  [Source: TableSourceScan(table=[[default_catalog, 
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) 
(2/2)#0] org.apache.flink.runtime.taskmanager.Task                    [] - 
Source: TableSourceScan(table=[[default_catalog, default_database, demo_in]], 
fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) (2/2)#0 
(3e38d62bdebbb01fd55b0e96deed0856) switched from INITIALIZING to RUNNING.

*{color:#172b4d}The only hint is the green "Adding split(s) to reader" line, but
this message is logged both when the partition discovery thread adds splits for
new partitions and when splits are restored during recovery, so a split added
this way may be either a restored split or a newly discovered one. We cannot
tell whether the task recovered from split state or received splits for newly
added partitions. There are no restore-related logs on the TM side.{color}*
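To make the ambiguity concrete, here is a minimal Java sketch. It is a simplified model, not Flink's actual code: all class and method names are hypothetical, and the JM-side enumerator and TM-side reader are collapsed into one class for brevity. Both the restore path and the partition discovery path hand splits to the same `addSplits` method, which logs the same message either way. Discovery only produces splits for partitions it has not seen before and otherwise returns without logging, matching the behavior described above.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified model of the two paths that add splits to a
// source reader. Not Flink's actual classes.
public class SplitPathsSketch {

    // Collected messages, standing in for the TM log.
    static final List<String> LOG = new ArrayList<>();

    // Plays the role of the reader's addSplits(): the message is the same
    // regardless of where the splits came from.
    static void addSplits(List<String> splits) {
        LOG.add("Adding split(s) to reader: " + splits);
    }

    // Path 1: recovery, splits come from the restored operator state.
    static void restoreFromState(List<String> restoredSplits) {
        if (!restoredSplits.isEmpty()) {
            addSplits(restoredSplits);
        }
    }

    // Path 2: periodic partition discovery. Only partitions not seen before
    // become new splits; if nothing changed it returns without logging.
    static void discoverPartitions(Set<String> known, Set<String> discovered) {
        Set<String> added = new HashSet<>(discovered);
        added.removeAll(known);
        if (added.isEmpty()) {
            return;
        }
        known.addAll(added);
        addSplits(new ArrayList<>(added));
    }

    public static void main(String[] args) {
        Set<String> known = new HashSet<>(List.of("test_topic-1", "test_topic-2"));
        restoreFromState(List.of("test_topic-2"));                                          // recovery
        discoverPartitions(known, Set.of("test_topic-1", "test_topic-2"));                  // no change
        discoverPartitions(known, Set.of("test_topic-1", "test_topic-2", "test_topic-3"));  // new partition
        LOG.forEach(System.out::println);
    }
}
```

Running `main` prints two lines that differ only in the split list. Nothing in them says which one came from recovery.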

So I think we should add a task restore log on the TM side. It would be useful
for debugging and troubleshooting.
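One possible shape for the proposed TM-side log is shown below. This is a hypothetical sketch, not a patch against Flink. The restore path emits its own message, naming the task and the checkpoint, before it passes the splits to the unchanged `addSplits`. The sketch assumes the restored checkpoint ID is available to the task. The class name, method names and message format are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the proposed TM-side restore log. Names and the
// message format are illustrative only, not an existing Flink API.
public class RestoreLogSketch {

    // Collected messages, standing in for the TM log.
    static final List<String> LOG = new ArrayList<>();

    // Unchanged: the reader still logs the generic message.
    static void addSplits(List<String> splits) {
        LOG.add("Adding split(s) to reader: " + splits);
    }

    // Restore path with the proposed extra log line. Assumes the task knows
    // which checkpoint it is restoring from.
    static void restoreSplits(String taskName, long checkpointId, List<String> restored) {
        if (restored.isEmpty()) {
            LOG.add(taskName + ": no split state to restore from checkpoint " + checkpointId);
            return;
        }
        LOG.add(String.format("%s: restoring %d split(s) from checkpoint %d: %s",
                taskName, restored.size(), checkpointId, restored));
        addSplits(restored);
    }

    public static void main(String[] args) {
        restoreSplits("Source (2/2)#0", 50L, List.of("test_topic-2"));
        LOG.forEach(System.out::println);
    }
}
```

Running `main` prints the restore line followed by the usual `Adding split(s) to reader: [test_topic-2]` line, so restored splits can be told apart from newly discovered ones. `addSplits` stays unchanged, so the existing message still appears on both paths.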


ZookeeperLeaderRetrievalDriver\{connectionInformationPath='/resource_manager/connection_info'}.
2022-12-15 16:55:06,937 INFO  [flink-akka.actor.default-dispatcher-16] 
org.apache.flink.runtime.jobmaster.JobMaster                 [] - Starting 
execution of job 'antc4flink87003521' (20a4149a35ee6a00aba5356c361bf4df) under 
job master id b54febaa53b5412ad03a423f37644f07.
2022-12-15 16:55:06,941 INFO  [flink-akka.actor.default-dispatcher-16] 
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Starting 
split enumerator for source Source: TableSourceScan(table=[[default_catalog, 
default_database, demo_in]], fields=[f0, f1, cnt]).
2022-12-15 16:55:06,943 INFO  [flink-akka.actor.default-dispatcher-16] 
org.apache.flink.runtime.jobmaster.JobMaster                 [] - Starting 
scheduling with scheduling strategy 
[org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy]
2022-12-15 16:55:06,943 INFO  [flink-akka.actor.default-dispatcher-16] 
org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job 
flink_demo (20a4149a35ee6a00aba5356c361bf4df) switched from state CREATED to 
RUNNING.
2022-12-15 16:55:06,943 INFO  [SourceCoordinator-Source: 
TableSourceScan(table=[[default_catalog, default_database, demo_in]], 
fields=[f0, f1, cnt])] 
com.alipay.flink.connectors.demo.source.enumerator.demoSourceEnumerator [] - 
Starting the demoSourceEnumerator with partition discovery interval of 60000 ms.

*But if the partitions don't change, the discovery thread just checks and 
returns. Although I know the job was recovered, the TM log contains no 
recovery-related lines:*

 

2022-12-15 16:55:30,114 INFO  [main] 
org.apache.flink.runtime.rpc.akka.AkkaRpcService             [] - Starting RPC 
endpoint for org.apache.flink.runtime.taskexecutor.TaskExecutor at 
akka://flink/user/rpc/taskmanager_0 .
2022-12-15 16:55:30,177 INFO  [flink-akka.actor.default-dispatcher-4] 
org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - 
Starting DefaultLeaderRetrievalService with 
ZookeeperLeaderRetrievalDriver\{connectionInformationPath='/resource_manager/connection_info'}.
2022-12-15 16:55:30,178 INFO  [flink-akka.actor.default-dispatcher-4] 
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Start job 
leader service.
2022-12-15 16:55:30,180 INFO  [flink-akka.actor.default-dispatcher-4] 
org.apache.flink.runtime.filecache.FileCache                 [] - User file 
cache uses directory 
/opt/flink/flink-dist-cache-2a59b734-6c51-4379-b18e-3d6dac87f7af
2022-12-15 16:55:30,182 INFO  [flink-akka.actor.default-dispatcher-4] 
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Connecting to 
ResourceManager 
akka.tcp://[email protected]:6123/user/rpc/resourcemanager_2(b54febaa53b5412ad03a423f37644f07).
2022-12-15 16:55:30,426 INFO  [flink-akka.actor.default-dispatcher-4] 
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Resolved 
ResourceManager address, beginning registration
2022-12-15 16:55:30,518 INFO  [flink-akka.actor.default-dispatcher-15] 
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Successful 
registration at resource manager 
akka.tcp://[email protected]:6123/user/rpc/resourcemanager_2 under registration 
id e85390bf49682d3d723733ac807adb15.
2022-12-15 16:55:30,549 INFO  [flink-akka.actor.default-dispatcher-15] 
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Receive slot 
request cd0f4d4ab80dbc66fe2c42dce23a143a for job 
20a4149a35ee6a00aba5356c361bf4df from resource manager with leader id 
b54febaa53b5412ad03a423f37644f07.
2022-12-15 16:55:30,554 INFO  [flink-akka.actor.default-dispatcher-15] 
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Allocated 
slot for cd0f4d4ab80dbc66fe2c42dce23a143a.
2022-12-15 16:55:30,555 INFO  [flink-akka.actor.default-dispatcher-15] 
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Add job 
20a4149a35ee6a00aba5356c361bf4df for job leader monitoring.
2022-12-15 16:55:30,557 INFO  [flink-akka.actor.default-dispatcher-15] 
org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - 
Starting DefaultLeaderRetrievalService with 
ZookeeperLeaderRetrievalDriver\{connectionInformationPath='/20a4149a35ee6a00aba5356c361bf4df/connection_info'}.
2022-12-15 16:55:30,561 INFO  [flink-akka.actor.default-dispatcher-15] 
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Receive slot 
request eac343b1f8cef50bd6f636f3595b58bf for job 
20a4149a35ee6a00aba5356c361bf4df from resource manager with leader id 
b54febaa53b5412ad03a423f37644f07.
2022-12-15 16:55:30,561 INFO  [flink-akka.actor.default-dispatcher-15] 
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Allocated 
slot for eac343b1f8cef50bd6f636f3595b58bf.
2022-12-15 16:55:30,562 INFO  [main-EventThread] 
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Try to 
register at job manager akka.tcp://[email protected]:6123/user/rpc/jobmanager_1 
with leader id d03a423f-3764-4f07-b54f-ebaa53b5412a.
2022-12-15 16:55:30,585 INFO  [flink-akka.actor.default-dispatcher-15] 
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Resolved 
JobManager address, beginning registration
2022-12-15 16:55:30,618 INFO  [flink-akka.actor.default-dispatcher-15] 
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Successful 
registration at job manager 
akka.tcp://[email protected]:6123/user/rpc/jobmanager_1 for job 
20a4149a35ee6a00aba5356c361bf4df.
2022-12-15 16:55:30,619 INFO  [flink-akka.actor.default-dispatcher-4] 
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Establish 
JobManager connection for job 20a4149a35ee6a00aba5356c361bf4df.
2022-12-15 16:55:30,622 INFO  [flink-akka.actor.default-dispatcher-4] 
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Offer 
reserved slots to the leader of job 20a4149a35ee6a00aba5356c361bf4df.
2022-12-15 16:55:30,648 INFO  [flink-akka.actor.default-dispatcher-4] 
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot 
cd0f4d4ab80dbc66fe2c42dce23a143a.
2022-12-15 16:55:30,648 INFO  [flink-akka.actor.default-dispatcher-4] 
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot 
eac343b1f8cef50bd6f636f3595b58bf.
2022-12-15 16:56:19,892 INFO  [flink-akka.actor.default-dispatcher-15] 
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot 
cd0f4d4ab80dbc66fe2c42dce23a143a.
2022-12-15 16:56:19,907 INFO  [flink-akka.actor.default-dispatcher-15] 
org.apache.flink.runtime.state.changelog.StateChangelogStorageLoader [] - 
Creating a changelog storage with name 'memory'.
2022-12-15 16:56:19,925 INFO  [flink-akka.actor.default-dispatcher-15] 
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Received task 
Source: TableSourceScan(table=[[default_catalog, default_database, demo_in]], 
fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) (1/2)#0 
(9293cae434983862cfcae1f732528bc7), deploy into slot with allocation id 
cd0f4d4ab80dbc66fe2c42dce23a143a.
2022-12-15 16:56:19,926 INFO  [Source: TableSourceScan(table=[[default_catalog, 
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) 
(1/2)#0] org.apache.flink.runtime.taskmanager.Task                    [] - 
Source: TableSourceScan(table=[[default_catalog, default_database, demo_in]], 
fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) (1/2)#0 
(9293cae434983862cfcae1f732528bc7) switched from CREATED to DEPLOYING.
2022-12-15 16:56:19,930 INFO  [flink-akka.actor.default-dispatcher-15] 
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot 
eac343b1f8cef50bd6f636f3595b58bf.
2022-12-15 16:56:19,929 INFO  [Source: TableSourceScan(table=[[default_catalog, 
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) 
(1/2)#0] org.apache.flink.runtime.taskmanager.Task                    [] - 
Loading JAR files for task Source: TableSourceScan(table=[[default_catalog, 
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) 
(1/2)#0 (9293cae434983862cfcae1f732528bc7) [DEPLOYING].
2022-12-15 16:56:19,932 INFO  [flink-akka.actor.default-dispatcher-15] 
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Received task 
Source: TableSourceScan(table=[[default_catalog, default_database, demo_in]], 
fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) (2/2)#0 
(3e38d62bdebbb01fd55b0e96deed0856), deploy into slot with allocation id 
eac343b1f8cef50bd6f636f3595b58bf.
2022-12-15 16:56:19,933 INFO  [Source: TableSourceScan(table=[[default_catalog, 
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) 
(2/2)#0] org.apache.flink.runtime.taskmanager.Task                    [] - 
Source: TableSourceScan(table=[[default_catalog, default_database, demo_in]], 
fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) (2/2)#0 
(3e38d62bdebbb01fd55b0e96deed0856) switched from CREATED to DEPLOYING.
2022-12-15 16:56:19,933 INFO  [Source: TableSourceScan(table=[[default_catalog, 
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) 
(2/2)#0] org.apache.flink.runtime.taskmanager.Task                    [] - 
Loading JAR files for task Source: TableSourceScan(table=[[default_catalog, 
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) 
(2/2)#0 (3e38d62bdebbb01fd55b0e96deed0856) [DEPLOYING].
2022-12-15 16:56:20,000 INFO  [Thread-3] 
com.alibaba.dfs.common.PerformanceCounter                    [] - PERF: 
dfs://f-blink-test.sh.aliyun-inc.com:10290/blink/ha/antc4flink87003521.OPEN 
count 1 latency(us): avg 12369 deviation 0
time dist1:  [>10000 1 100.00%]
time dist2:  [<20000 1 100.00%]
2022-12-15 16:56:20,001 INFO  [Thread-3] 
com.alibaba.dfs.common.PerformanceCounter                    [] - PERF: 
dfs://f-blink-test.sh.aliyun-inc.com:10290/blink/ha/antc4flink87003521.READ_SYS 
count 1 latency(us): avg 35153 deviation 0 Bps: avg 29834011.0 deviation 0.0
time dist1:  [>10000 1 100.00%]
time dist2:  [<40000 1 100.00%]
2022-12-15 16:56:20,716 INFO  [Source: TableSourceScan(table=[[default_catalog, 
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) 
(2/2)#0] 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager [] - 
Reuse old classloader because the cached libraries is not compatible with new 
task.
2022-12-15 16:56:20,839 INFO  [Source: TableSourceScan(table=[[default_catalog, 
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) 
(1/2)#0] org.apache.flink.streaming.runtime.tasks.StreamTask          [] - 
Using job/cluster config to configure application-defined state backend: 
EmbeddedRocksDBStateBackend\{, localRocksDbDirectories=null, 
enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=4, 
writeBatchSize=2097152}
2022-12-15 16:56:20,839 INFO  [Source: TableSourceScan(table=[[default_catalog, 
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) 
(2/2)#0] org.apache.flink.streaming.runtime.tasks.StreamTask          [] - 
Using job/cluster config to configure application-defined state backend: 
EmbeddedRocksDBStateBackend\{, localRocksDbDirectories=null, 
enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=4, 
writeBatchSize=2097152}
2022-12-15 16:56:20,841 INFO  [Source: TableSourceScan(table=[[default_catalog, 
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) 
(1/2)#0] org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend 
[] - Using predefined options: DEFAULT.
2022-12-15 16:56:20,843 INFO  [Source: TableSourceScan(table=[[default_catalog, 
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) 
(2/2)#0] org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend 
[] - Using predefined options: DEFAULT.
2022-12-15 16:56:20,844 INFO  [Source: TableSourceScan(table=[[default_catalog, 
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) 
(2/2)#0] org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend 
[] - Using application-defined options factory: 
DefaultConfigurableOptionsFactory\{configuredOptions={state.backend.antkv.perf-num-occurrence=1000,
 state.backend.antkv.ttl.ms=129600000, 
state.backend.antkv.index-block-restart-interval=16, 
state.backend.antkv.learned.index.enabled=false, 
state.backend.antkv.level-compaction-dynamic-level-bytes.enabled=false, 
state.backend.antkv.cache-num-shard-bits=1, 
state.backend.antkv.level-for-learned-index=0, 
state.backend.antkv.statistics.enabled=false}}.
2022-12-15 16:56:20,844 INFO  [Source: TableSourceScan(table=[[default_catalog, 
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) 
(1/2)#0] org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend 
[] - Using application-defined options factory: 
DefaultConfigurableOptionsFactory\{configuredOptions={state.backend.antkv.perf-num-occurrence=1000,
 state.backend.antkv.ttl.ms=129600000, 
state.backend.antkv.index-block-restart-interval=16, 
state.backend.antkv.learned.index.enabled=false, 
state.backend.antkv.level-compaction-dynamic-level-bytes.enabled=false, 
state.backend.antkv.cache-num-shard-bits=1, 
state.backend.antkv.level-for-learned-index=0, 
state.backend.antkv.statistics.enabled=false}}.
2022-12-15 16:56:20,845 INFO  [Source: TableSourceScan(table=[[default_catalog, 
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) 
(1/2)#0] org.apache.flink.streaming.runtime.tasks.StreamTask          [] - 
Using application-defined state backend: EmbeddedRocksDBStateBackend\{, 
localRocksDbDirectories=null, enableIncrementalCheckpointing=TRUE, 
numberOfTransferThreads=4, writeBatchSize=2097152}
2022-12-15 16:56:20,845 INFO  [Source: TableSourceScan(table=[[default_catalog, 
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) 
(2/2)#0] org.apache.flink.streaming.runtime.tasks.StreamTask          [] - 
Using application-defined state backend: EmbeddedRocksDBStateBackend\{, 
localRocksDbDirectories=null, enableIncrementalCheckpointing=TRUE, 
numberOfTransferThreads=4, writeBatchSize=2097152}
2022-12-15 16:56:20,845 INFO  [Source: TableSourceScan(table=[[default_catalog, 
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) 
(1/2)#0] org.apache.flink.runtime.state.StateBackendLoader            [] - 
State backend loader loads the state backend as EmbeddedRocksDBStateBackend
2022-12-15 16:56:20,845 INFO  [Source: TableSourceScan(table=[[default_catalog, 
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) 
(2/2)#0] org.apache.flink.runtime.state.StateBackendLoader            [] - 
State backend loader loads the state backend as EmbeddedRocksDBStateBackend
2022-12-15 16:56:20,850 INFO  [Source: TableSourceScan(table=[[default_catalog, 
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) 
(1/2)#0] org.apache.flink.streaming.runtime.tasks.StreamTask          [] - 
Using job/cluster config to configure application-defined checkpoint storage: 
org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage@4413f5e9
2022-12-15 16:56:20,850 INFO  [Source: TableSourceScan(table=[[default_catalog, 
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) 
(2/2)#0] org.apache.flink.streaming.runtime.tasks.StreamTask          [] - 
Using job/cluster config to configure application-defined checkpoint storage: 
org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage@1095af08
2022-12-15 16:56:20,863 INFO  [Source: TableSourceScan(table=[[default_catalog, 
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) 
(1/2)#0] org.apache.flink.runtime.taskmanager.Task                    [] - 
Source: TableSourceScan(table=[[default_catalog, default_database, demo_in]], 
fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) (1/2)#0 
(9293cae434983862cfcae1f732528bc7) switched from DEPLOYING to INITIALIZING.
2022-12-15 16:56:20,863 INFO  [Source: TableSourceScan(table=[[default_catalog, 
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) 
(2/2)#0] org.apache.flink.runtime.taskmanager.Task                    [] - 
Source: TableSourceScan(table=[[default_catalog, default_database, demo_in]], 
fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) (2/2)#0 
(3e38d62bdebbb01fd55b0e96deed0856) switched from DEPLOYING to INITIALIZING.
{color:#00875a}2022-12-15 16:56:21,160 INFO  [Source: 
TableSourceScan(table=[[default_catalog, default_database, demo_in]], 
fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) (2/2)#0] 
org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding 
split(s) to reader: [[Partition: test_topic-2, startingOffset: 1670947200000, 
queueCursor: MTY3MTA3NTYzNTY4NDg2ODY3MA==, stoppingOffset: 
9223372036854775807]]{color}
2022-12-15 16:56:21,160 INFO  [Source: TableSourceScan(table=[[default_catalog, 
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) 
(1/2)#0] org.apache.flink.connector.base.source.reader.SourceReaderBase [] - 
Adding split(s) to reader: [[Partition: test_topic-1, startingOffset: 
1670947200000, queueCursor: MTY2OTYxNDE0MTI1ODYyMzkzMQ==, stoppingOffset: 
9223372036854775807]]
2022-12-15 16:56:21,161 INFO  [Source: TableSourceScan(table=[[default_catalog, 
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) 
(1/2)#0] com.alipay.flink.connectors.common.source.reader.SplitReaderSupplier 
[] - Add new split reader.
2022-12-15 16:56:21,161 INFO  [Source: TableSourceScan(table=[[default_catalog, 
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) 
(2/2)#0] com.alipay.flink.connectors.common.source.reader.SplitReaderSupplier 
[] - Add new split reader.
2022-12-15 16:56:21,171 INFO  [Source Data Fetcher for Source: 
TableSourceScan(table=[[default_catalog, default_database, demo_in]], 
fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) (1/2)#0] 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - 
Starting split fetcher 0
2022-12-15 16:56:21,171 INFO  [Source Data Fetcher for Source: 
TableSourceScan(table=[[default_catalog, default_database, demo_in]], 
fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) (2/2)#0] 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - 
Starting split fetcher 0
2022-12-15 16:56:21,177 INFO  [Source: TableSourceScan(table=[[default_catalog, 
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) 
(1/2)#0] org.apache.flink.runtime.taskmanager.Task                    [] - 
Source: TableSourceScan(table=[[default_catalog, default_database, demo_in]], 
fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) (1/2)#0 
(9293cae434983862cfcae1f732528bc7) switched from INITIALIZING to RUNNING.
2022-12-15 16:56:21,177 INFO  [Source: TableSourceScan(table=[[default_catalog, 
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) 
(2/2)#0] org.apache.flink.runtime.taskmanager.Task                    [] - 
Source: TableSourceScan(table=[[default_catalog, default_database, demo_in]], 
fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) (2/2)#0 
(3e38d62bdebbb01fd55b0e96deed0856) switched from INITIALIZING to RUNNING.

*Only the green {color:#00875a}Adding split(s){color:#172b4d} to reader line 
hints at it, but this log is printed both when the partition discovery thread 
adds splits for new partitions (newPartitionsSplit) and during recovery, so a 
later "Adding split(s)" line can refer to either restored splits or newly added 
ones. We cannot tell whether the task recovered from split state or picked up 
newly discovered partitions. There are no restore-related logs on the TM 
side{color}{color}.* 

So I think we should add a restore log on the TM side. It would be useful for 
debugging and troubleshooting.
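A rough sketch of the idea, assuming the source operator keeps its restored splits in a readerState list (the name comes from the issue title) and hands them to the reader when it opens. Everything below is a hypothetical, self-contained stand-in, not the real SourceOperator; the restore message wording is illustrative only. The point is that a dedicated line is emitted only on the restore path, before the shared "Adding split(s)" line.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the proposed change: splits taken from the restored
// reader state are logged with a dedicated message before being handed to the
// reader, so a restore can be told apart from later partition discovery.
class RestoreAwareSourceOperator {

    // Captured log lines (stand-in for a real logger).
    final List<String> log = new ArrayList<>();

    // Stand-in for the operator's restored readerState.
    private final List<String> readerState;

    RestoreAwareSourceOperator(List<String> restoredSplits) {
        this.readerState = new ArrayList<>(restoredSplits);
    }

    void open() {
        if (!readerState.isEmpty()) {
            // New, restore-specific log line (illustrative wording).
            log.add("Restoring state for " + readerState.size() + " split(s) to reader.");
            addSplits(readerState);
        }
    }

    // Shared path for restored splits and for splits from partition discovery.
    void addSplits(List<String> splits) {
        log.add("Adding split(s) to reader: " + splits);
    }

    public static void main(String[] args) {
        RestoreAwareSourceOperator op =
                new RestoreAwareSourceOperator(List.of("test_topic-2"));
        op.open();                             // restore path
        op.addSplits(List.of("test_topic-3")); // later discovery path
        op.log.forEach(System.out::println);
    }
}
```

On a fresh start the reader state is empty, so no restore line is printed and any "Adding split(s)" line must come from the enumerator.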

> Add source operator restore readerState log to distinguish split is from 
> newPartitions or split state
> -----------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-30424
>                 URL: https://issues.apache.org/jira/browse/FLINK-30424
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / DataStream
>    Affects Versions: 1.16.0, 1.15.3, 1.16.1
>            Reporter: Ran Tao
>            Assignee: Ran Tao
>            Priority: Major
>              Labels: pull-request-available
>
> When a job starts for the first time, we can find 'assignPartitions' in the log. But if 
> the source recovers from state, we cannot tell whether the new partitions come from 
> the periodic discovery thread or from the reader task state.  
> We can add a helper log to distinguish the two and to confirm that the reader uses 
> split state when recovering. It's very useful for troubleshooting.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
