[
https://issues.apache.org/jira/browse/FLINK-30424?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17647955#comment-17647955
]
Ran Tao edited comment on FLINK-30424 at 12/15/22 9:23 AM:
-----------------------------------------------------------
Hi [~gaoyunhaii], thanks for reviewing. I have added the JM and TM logs
below.
*We can see the recovery log in the JM:*
2022-12-15 16:55:05,747 INFO [pool-10-thread-1]
org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Starting RPC
endpoint for
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager at
akka://flink/user/rpc/resourcemanager_2 .
2022-12-15 16:55:05,757 INFO [flink-akka.actor.default-dispatcher-4]
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
Starting the resource manager.
2022-12-15 16:55:05,763 INFO [flink-akka.actor.default-dispatcher-4]
org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager []
- Starting the slot manager.
2022-12-15 16:55:05,823 INFO [jobmanager-io-thread-1]
org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils [] -
Recovering checkpoints from
ZooKeeperStateHandleStore\{namespace='flink/antc4flink87003521/jobs/20a4149a35ee6a00aba5356c361bf4df/checkpoints'}.
2022-12-15 16:55:05,859 INFO [jobmanager-io-thread-1]
org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils [] -
Found 0 checkpoints in
ZooKeeperStateHandleStore\{namespace='flink/antc4flink87003521/jobs/20a4149a35ee6a00aba5356c361bf4df/checkpoints'}.
2022-12-15 16:55:05,859 INFO [jobmanager-io-thread-1]
org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils [] -
Trying to fetch 0 checkpoints from storage.
2022-12-15 16:55:05,868 INFO [jobmanager-io-thread-1]
org.apache.flink.runtime.util.ZooKeeperUtils [] - Initialized
DefaultCompletedCheckpointStore in
'ZooKeeperStateHandleStore\{namespace='flink/antc4flink87003521/jobs/20a4149a35ee6a00aba5356c361bf4df/checkpoints'}'
with /checkpoints.
2022-12-15 16:55:05,962 INFO [jobmanager-io-thread-1]
org.apache.flink.runtime.jobmaster.JobMaster [] - Running
initialization on master for job antc4flink87003521
(20a4149a35ee6a00aba5356c361bf4df).
2022-12-15 16:55:05,962 INFO [jobmanager-io-thread-1]
org.apache.flink.runtime.jobmaster.JobMaster [] - Successfully
ran initialization on master in 0 ms.
2022-12-15 16:55:06,251 INFO [jobmanager-io-thread-1]
org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology [] - Built
1 new pipelined regions in 1 ms, total 1 pipelined regions currently.
2022-12-15 16:55:06,349 INFO [jobmanager-io-thread-1]
org.apache.flink.runtime.jobmaster.JobMaster [] - Using
job/cluster config to configure application-defined state backend:
EmbeddedRocksDBStateBackend\{, localRocksDbDirectories=null,
enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=4,
writeBatchSize=2097152}
2022-12-15 16:55:06,352 INFO [jobmanager-io-thread-1]
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - Using
predefined options: DEFAULT.
2022-12-15 16:55:06,353 INFO [jobmanager-io-thread-1]
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - Using
application-defined options factory:
DefaultConfigurableOptionsFactory\{configuredOptions={state.backend.antkv.perf-num-occurrence=1000,
state.backend.antkv.ttl.ms=129600000,
state.backend.antkv.index-block-restart-interval=16,
state.backend.antkv.learned.index.enabled=false,
state.backend.antkv.level-compaction-dynamic-level-bytes.enabled=false,
state.backend.antkv.cache-num-shard-bits=1,
state.backend.antkv.level-for-learned-index=0,
state.backend.antkv.statistics.enabled=false}}.
2022-12-15 16:55:06,354 INFO [jobmanager-io-thread-1]
org.apache.flink.runtime.jobmaster.JobMaster [] - Using
application-defined state backend: EmbeddedRocksDBStateBackend\{,
localRocksDbDirectories=null, enableIncrementalCheckpointing=TRUE,
numberOfTransferThreads=4, writeBatchSize=2097152}
2022-12-15 16:55:06,355 INFO [jobmanager-io-thread-1]
org.apache.flink.runtime.state.StateBackendLoader [] - State backend
loader loads the state backend as EmbeddedRocksDBStateBackend
2022-12-15 16:55:06,360 INFO [jobmanager-io-thread-1]
org.apache.flink.runtime.jobmaster.JobMaster [] - Using
job/cluster config to configure application-defined checkpoint storage:
org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage@5e36d9c3
2022-12-15 16:55:06,547 INFO [jobmanager-io-thread-1]
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - No checkpoint
found during restore.
2022-12-15 16:55:06,548 INFO [jobmanager-io-thread-1]
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Starting job
20a4149a35ee6a00aba5356c361bf4df from savepoint
dfs://f-blink-test.sh.aliyun-inc.com:10290/blink/blink-operator/antc4flink87003521/chk-50/_metadata
()
2022-12-15 16:55:06,768 INFO [jobmanager-io-thread-1]
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Reset the
checkpoint ID of job 20a4149a35ee6a00aba5356c361bf4df to 51.
2022-12-15 16:55:06,768 INFO [jobmanager-io-thread-1]
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Restoring job
20a4149a35ee6a00aba5356c361bf4df from Checkpoint 50 @ 0 for
20a4149a35ee6a00aba5356c361bf4df located at
dfs://f-blink-test.sh.aliyun-inc.com:10290/blink/blink-operator/antc4flink87003521/chk-50.
2022-12-15 16:55:06,840 INFO [jobmanager-io-thread-1]
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - No master
state to restore
2022-12-15 16:55:06,840 INFO [jobmanager-io-thread-1]
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator
[] - Resetting coordinator to checkpoint.
2022-12-15 16:55:06,842 INFO [Thread-18]
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Closing
SourceCoordinator for source Source: TableSourceScan(table=[[default_catalog,
default_database, demo_in]], fields=[f0, f1, cnt]).
2022-12-15 16:55:06,842 INFO [Thread-18]
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Source
coordinator for source Source: TableSourceScan(table=[[default_catalog,
default_database, demo_in]], fields=[f0, f1, cnt]) closed.
2022-12-15 16:55:06,844 INFO [jobmanager-io-thread-1]
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Restoring
SplitEnumerator of source Source: TableSourceScan(table=[[default_catalog,
default_database, demo_in]], fields=[f0, f1, cnt]) from checkpoint.
2022-12-15 16:55:06,862 INFO [jobmanager-io-thread-1]
org.apache.flink.runtime.jobmaster.JobMaster [] - Using
failover strategy
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@5f14baca
for antc4flink87003521 (20a4149a35ee6a00aba5356c361bf4df).
2022-12-15 16:55:06,937 INFO [flink-akka.actor.default-dispatcher-16]
org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] -
Starting DefaultLeaderRetrievalService with
ZookeeperLeaderRetrievalDriver\{connectionInformationPath='/resource_manager/connection_info'}.
2022-12-15 16:55:06,937 INFO [flink-akka.actor.default-dispatcher-16]
org.apache.flink.runtime.jobmaster.JobMaster [] - Starting
execution of job 'antc4flink87003521' (20a4149a35ee6a00aba5356c361bf4df) under
job master id b54febaa53b5412ad03a423f37644f07.
2022-12-15 16:55:06,941 INFO [flink-akka.actor.default-dispatcher-16]
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Starting
split enumerator for source Source: TableSourceScan(table=[[default_catalog,
default_database, demo_in]], fields=[f0, f1, cnt]).
2022-12-15 16:55:06,943 INFO [flink-akka.actor.default-dispatcher-16]
org.apache.flink.runtime.jobmaster.JobMaster [] - Starting
scheduling with scheduling strategy
[org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy]
2022-12-15 16:55:06,943 INFO [flink-akka.actor.default-dispatcher-16]
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job
flink_demo (20a4149a35ee6a00aba5356c361bf4df) switched from state CREATED to
RUNNING.
2022-12-15 16:55:06,943 INFO [SourceCoordinator-Source:
TableSourceScan(table=[[default_catalog, default_database, demo_in]],
fields=[f0, f1, cnt])]
com.alipay.flink.connectors.demo.source.enumerator.demoSourceEnumerator [] -
Starting the demoSourceEnumerator with partition discovery interval of 60000 ms.
*But if no partitions have changed, the discovery thread just checks and returns.
Although I know the job was recovered, the TM log contains no logs related
to recovery:*
2022-12-15 16:55:30,114 INFO [main]
org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Starting RPC
endpoint for org.apache.flink.runtime.taskexecutor.TaskExecutor at
akka://flink/user/rpc/taskmanager_0 .
2022-12-15 16:55:30,177 INFO [flink-akka.actor.default-dispatcher-4]
org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] -
Starting DefaultLeaderRetrievalService with
ZookeeperLeaderRetrievalDriver\{connectionInformationPath='/resource_manager/connection_info'}.
2022-12-15 16:55:30,178 INFO [flink-akka.actor.default-dispatcher-4]
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Start job
leader service.
2022-12-15 16:55:30,180 INFO [flink-akka.actor.default-dispatcher-4]
org.apache.flink.runtime.filecache.FileCache [] - User file
cache uses directory
/opt/flink/flink-dist-cache-2a59b734-6c51-4379-b18e-3d6dac87f7af
2022-12-15 16:55:30,182 INFO [flink-akka.actor.default-dispatcher-4]
org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Connecting to
ResourceManager
akka.tcp://[email protected]:6123/user/rpc/resourcemanager_2(b54febaa53b5412ad03a423f37644f07).
2022-12-15 16:55:30,426 INFO [flink-akka.actor.default-dispatcher-4]
org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Resolved
ResourceManager address, beginning registration
2022-12-15 16:55:30,518 INFO [flink-akka.actor.default-dispatcher-15]
org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Successful
registration at resource manager
akka.tcp://[email protected]:6123/user/rpc/resourcemanager_2 under registration
id e85390bf49682d3d723733ac807adb15.
2022-12-15 16:55:30,549 INFO [flink-akka.actor.default-dispatcher-15]
org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Receive slot
request cd0f4d4ab80dbc66fe2c42dce23a143a for job
20a4149a35ee6a00aba5356c361bf4df from resource manager with leader id
b54febaa53b5412ad03a423f37644f07.
2022-12-15 16:55:30,554 INFO [flink-akka.actor.default-dispatcher-15]
org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Allocated
slot for cd0f4d4ab80dbc66fe2c42dce23a143a.
2022-12-15 16:55:30,555 INFO [flink-akka.actor.default-dispatcher-15]
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Add job
20a4149a35ee6a00aba5356c361bf4df for job leader monitoring.
2022-12-15 16:55:30,557 INFO [flink-akka.actor.default-dispatcher-15]
org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] -
Starting DefaultLeaderRetrievalService with
ZookeeperLeaderRetrievalDriver\{connectionInformationPath='/20a4149a35ee6a00aba5356c361bf4df/connection_info'}.
2022-12-15 16:55:30,561 INFO [flink-akka.actor.default-dispatcher-15]
org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Receive slot
request eac343b1f8cef50bd6f636f3595b58bf for job
20a4149a35ee6a00aba5356c361bf4df from resource manager with leader id
b54febaa53b5412ad03a423f37644f07.
2022-12-15 16:55:30,561 INFO [flink-akka.actor.default-dispatcher-15]
org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Allocated
slot for eac343b1f8cef50bd6f636f3595b58bf.
2022-12-15 16:55:30,562 INFO [main-EventThread]
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Try to
register at job manager akka.tcp://[email protected]:6123/user/rpc/jobmanager_1
with leader id d03a423f-3764-4f07-b54f-ebaa53b5412a.
2022-12-15 16:55:30,585 INFO [flink-akka.actor.default-dispatcher-15]
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Resolved
JobManager address, beginning registration
2022-12-15 16:55:30,618 INFO [flink-akka.actor.default-dispatcher-15]
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Successful
registration at job manager
akka.tcp://[email protected]:6123/user/rpc/jobmanager_1 for job
20a4149a35ee6a00aba5356c361bf4df.
2022-12-15 16:55:30,619 INFO [flink-akka.actor.default-dispatcher-4]
org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Establish
JobManager connection for job 20a4149a35ee6a00aba5356c361bf4df.
2022-12-15 16:55:30,622 INFO [flink-akka.actor.default-dispatcher-4]
org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Offer
reserved slots to the leader of job 20a4149a35ee6a00aba5356c361bf4df.
2022-12-15 16:55:30,648 INFO [flink-akka.actor.default-dispatcher-4]
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot
cd0f4d4ab80dbc66fe2c42dce23a143a.
2022-12-15 16:55:30,648 INFO [flink-akka.actor.default-dispatcher-4]
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot
eac343b1f8cef50bd6f636f3595b58bf.
2022-12-15 16:56:19,892 INFO [flink-akka.actor.default-dispatcher-15]
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot
cd0f4d4ab80dbc66fe2c42dce23a143a.
2022-12-15 16:56:19,907 INFO [flink-akka.actor.default-dispatcher-15]
org.apache.flink.runtime.state.changelog.StateChangelogStorageLoader [] -
Creating a changelog storage with name 'memory'.
2022-12-15 16:56:19,925 INFO [flink-akka.actor.default-dispatcher-15]
org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received task
Source: TableSourceScan(table=[[default_catalog, default_database, demo_in]],
fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) (1/2)#0
(9293cae434983862cfcae1f732528bc7), deploy into slot with allocation id
cd0f4d4ab80dbc66fe2c42dce23a143a.
2022-12-15 16:56:19,926 INFO [Source: TableSourceScan(table=[[default_catalog,
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1])
(1/2)#0] org.apache.flink.runtime.taskmanager.Task [] -
Source: TableSourceScan(table=[[default_catalog, default_database, demo_in]],
fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) (1/2)#0
(9293cae434983862cfcae1f732528bc7) switched from CREATED to DEPLOYING.
2022-12-15 16:56:19,930 INFO [flink-akka.actor.default-dispatcher-15]
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot
eac343b1f8cef50bd6f636f3595b58bf.
2022-12-15 16:56:19,929 INFO [Source: TableSourceScan(table=[[default_catalog,
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1])
(1/2)#0] org.apache.flink.runtime.taskmanager.Task [] -
Loading JAR files for task Source: TableSourceScan(table=[[default_catalog,
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1])
(1/2)#0 (9293cae434983862cfcae1f732528bc7) [DEPLOYING].
2022-12-15 16:56:19,932 INFO [flink-akka.actor.default-dispatcher-15]
org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received task
Source: TableSourceScan(table=[[default_catalog, default_database, demo_in]],
fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) (2/2)#0
(3e38d62bdebbb01fd55b0e96deed0856), deploy into slot with allocation id
eac343b1f8cef50bd6f636f3595b58bf.
2022-12-15 16:56:19,933 INFO [Source: TableSourceScan(table=[[default_catalog,
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1])
(2/2)#0] org.apache.flink.runtime.taskmanager.Task [] -
Source: TableSourceScan(table=[[default_catalog, default_database, demo_in]],
fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) (2/2)#0
(3e38d62bdebbb01fd55b0e96deed0856) switched from CREATED to DEPLOYING.
2022-12-15 16:56:19,933 INFO [Source: TableSourceScan(table=[[default_catalog,
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1])
(2/2)#0] org.apache.flink.runtime.taskmanager.Task [] -
Loading JAR files for task Source: TableSourceScan(table=[[default_catalog,
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1])
(2/2)#0 (3e38d62bdebbb01fd55b0e96deed0856) [DEPLOYING].
2022-12-15 16:56:20,000 INFO [Thread-3]
com.alibaba.dfs.common.PerformanceCounter [] - PERF:
dfs://f-blink-test.sh.aliyun-inc.com:10290/blink/ha/antc4flink87003521.OPEN
count 1 latency(us): avg 12369 deviation 0
time dist1: [>10000 1 100.00%]
time dist2: [<20000 1 100.00%]
2022-12-15 16:56:20,001 INFO [Thread-3]
com.alibaba.dfs.common.PerformanceCounter [] - PERF:
dfs://f-blink-test.sh.aliyun-inc.com:10290/blink/ha/antc4flink87003521.READ_SYS
count 1 latency(us): avg 35153 deviation 0 Bps: avg 29834011.0 deviation 0.0
time dist1: [>10000 1 100.00%]
time dist2: [<40000 1 100.00%]
2022-12-15 16:56:20,716 INFO [Source: TableSourceScan(table=[[default_catalog,
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1])
(2/2)#0]
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager [] -
Reuse old classloader because the cached libraries is not compatible with new
task.
2022-12-15 16:56:20,839 INFO [Source: TableSourceScan(table=[[default_catalog,
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1])
(1/2)#0] org.apache.flink.streaming.runtime.tasks.StreamTask [] -
Using job/cluster config to configure application-defined state backend:
EmbeddedRocksDBStateBackend\{, localRocksDbDirectories=null,
enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=4,
writeBatchSize=2097152}
2022-12-15 16:56:20,839 INFO [Source: TableSourceScan(table=[[default_catalog,
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1])
(2/2)#0] org.apache.flink.streaming.runtime.tasks.StreamTask [] -
Using job/cluster config to configure application-defined state backend:
EmbeddedRocksDBStateBackend\{, localRocksDbDirectories=null,
enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=4,
writeBatchSize=2097152}
2022-12-15 16:56:20,841 INFO [Source: TableSourceScan(table=[[default_catalog,
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1])
(1/2)#0] org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend
[] - Using predefined options: DEFAULT.
2022-12-15 16:56:20,843 INFO [Source: TableSourceScan(table=[[default_catalog,
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1])
(2/2)#0] org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend
[] - Using predefined options: DEFAULT.
2022-12-15 16:56:20,844 INFO [Source: TableSourceScan(table=[[default_catalog,
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1])
(2/2)#0] org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend
[] - Using application-defined options factory:
DefaultConfigurableOptionsFactory\{configuredOptions={state.backend.antkv.perf-num-occurrence=1000,
state.backend.antkv.ttl.ms=129600000,
state.backend.antkv.index-block-restart-interval=16,
state.backend.antkv.learned.index.enabled=false,
state.backend.antkv.level-compaction-dynamic-level-bytes.enabled=false,
state.backend.antkv.cache-num-shard-bits=1,
state.backend.antkv.level-for-learned-index=0,
state.backend.antkv.statistics.enabled=false}}.
2022-12-15 16:56:20,844 INFO [Source: TableSourceScan(table=[[default_catalog,
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1])
(1/2)#0] org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend
[] - Using application-defined options factory:
DefaultConfigurableOptionsFactory\{configuredOptions={state.backend.antkv.perf-num-occurrence=1000,
state.backend.antkv.ttl.ms=129600000,
state.backend.antkv.index-block-restart-interval=16,
state.backend.antkv.learned.index.enabled=false,
state.backend.antkv.level-compaction-dynamic-level-bytes.enabled=false,
state.backend.antkv.cache-num-shard-bits=1,
state.backend.antkv.level-for-learned-index=0,
state.backend.antkv.statistics.enabled=false}}.
2022-12-15 16:56:20,845 INFO [Source: TableSourceScan(table=[[default_catalog,
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1])
(1/2)#0] org.apache.flink.streaming.runtime.tasks.StreamTask [] -
Using application-defined state backend: EmbeddedRocksDBStateBackend\{,
localRocksDbDirectories=null, enableIncrementalCheckpointing=TRUE,
numberOfTransferThreads=4, writeBatchSize=2097152}
2022-12-15 16:56:20,845 INFO [Source: TableSourceScan(table=[[default_catalog,
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1])
(2/2)#0] org.apache.flink.streaming.runtime.tasks.StreamTask [] -
Using application-defined state backend: EmbeddedRocksDBStateBackend\{,
localRocksDbDirectories=null, enableIncrementalCheckpointing=TRUE,
numberOfTransferThreads=4, writeBatchSize=2097152}
2022-12-15 16:56:20,845 INFO [Source: TableSourceScan(table=[[default_catalog,
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1])
(1/2)#0] org.apache.flink.runtime.state.StateBackendLoader [] -
State backend loader loads the state backend as EmbeddedRocksDBStateBackend
2022-12-15 16:56:20,845 INFO [Source: TableSourceScan(table=[[default_catalog,
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1])
(2/2)#0] org.apache.flink.runtime.state.StateBackendLoader [] -
State backend loader loads the state backend as EmbeddedRocksDBStateBackend
2022-12-15 16:56:20,850 INFO [Source: TableSourceScan(table=[[default_catalog,
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1])
(1/2)#0] org.apache.flink.streaming.runtime.tasks.StreamTask [] -
Using job/cluster config to configure application-defined checkpoint storage:
org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage@4413f5e9
2022-12-15 16:56:20,850 INFO [Source: TableSourceScan(table=[[default_catalog,
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1])
(2/2)#0] org.apache.flink.streaming.runtime.tasks.StreamTask [] -
Using job/cluster config to configure application-defined checkpoint storage:
org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage@1095af08
2022-12-15 16:56:20,863 INFO [Source: TableSourceScan(table=[[default_catalog,
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1])
(1/2)#0] org.apache.flink.runtime.taskmanager.Task [] -
Source: TableSourceScan(table=[[default_catalog, default_database, demo_in]],
fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) (1/2)#0
(9293cae434983862cfcae1f732528bc7) switched from DEPLOYING to INITIALIZING.
2022-12-15 16:56:20,863 INFO [Source: TableSourceScan(table=[[default_catalog,
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1])
(2/2)#0] org.apache.flink.runtime.taskmanager.Task [] -
Source: TableSourceScan(table=[[default_catalog, default_database, demo_in]],
fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) (2/2)#0
(3e38d62bdebbb01fd55b0e96deed0856) switched from DEPLOYING to INITIALIZING.
{color:#00875a}2022-12-15 16:56:21,160 INFO [Source:
TableSourceScan(table=[[default_catalog, default_database, demo_in]],
fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) (2/2)#0]
org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding
split(s) to reader: [[Partition: test_topic-2, startingOffset: 1670947200000,
queueCursor: MTY3MTA3NTYzNTY4NDg2ODY3MA==, stoppingOffset:
9223372036854775807]]{color}
2022-12-15 16:56:21,160 INFO [Source: TableSourceScan(table=[[default_catalog,
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1])
(1/2)#0] org.apache.flink.connector.base.source.reader.SourceReaderBase [] -
Adding split(s) to reader: [[Partition: test_topic-1, startingOffset:
1670947200000, queueCursor: MTY2OTYxNDE0MTI1ODYyMzkzMQ==, stoppingOffset:
9223372036854775807]]
2022-12-15 16:56:21,161 INFO [Source: TableSourceScan(table=[[default_catalog,
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1])
(1/2)#0] com.alipay.flink.connectors.common.source.reader.SplitReaderSupplier
[] - Add new split reader.
2022-12-15 16:56:21,161 INFO [Source: TableSourceScan(table=[[default_catalog,
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1])
(2/2)#0] com.alipay.flink.connectors.common.source.reader.SplitReaderSupplier
[] - Add new split reader.
2022-12-15 16:56:21,171 INFO [Source Data Fetcher for Source:
TableSourceScan(table=[[default_catalog, default_database, demo_in]],
fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) (1/2)#0]
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] -
Starting split fetcher 0
2022-12-15 16:56:21,171 INFO [Source Data Fetcher for Source:
TableSourceScan(table=[[default_catalog, default_database, demo_in]],
fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) (2/2)#0]
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] -
Starting split fetcher 0
2022-12-15 16:56:21,177 INFO [Source: TableSourceScan(table=[[default_catalog,
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1])
(1/2)#0] org.apache.flink.runtime.taskmanager.Task [] -
Source: TableSourceScan(table=[[default_catalog, default_database, demo_in]],
fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) (1/2)#0
(9293cae434983862cfcae1f732528bc7) switched from INITIALIZING to RUNNING.
2022-12-15 16:56:21,177 INFO [Source: TableSourceScan(table=[[default_catalog,
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1])
(2/2)#0] org.apache.flink.runtime.taskmanager.Task [] -
Source: TableSourceScan(table=[[default_catalog, default_database, demo_in]],
fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) (2/2)#0
(3e38d62bdebbb01fd55b0e96deed0856) switched from INITIALIZING to RUNNING.
*There is only the green line, {color:#00875a}Adding split(s) to reader{color},
but this log is emitted both when the partition discovery thread adds new
partition splits and when the task recovers. We cannot distinguish whether the
task recovered from split state or new partitions were added. There are no logs
related to restore on the TM side.*
So I think we can add a task restore log on the TM side. It would be useful for
debugging and troubleshooting.
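A minimal sketch of what such a log could look like. This is only an illustration: `SplitAddLog`, its method names, and the message wording are hypothetical and are not existing Flink classes or log lines. The idea is that the reader side emits a different message depending on whether the splits come from restored state or from the enumerator after partition discovery.

```java
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical helper (NOT part of Flink): builds distinct log messages
 * for splits restored from state and for newly discovered splits.
 */
final class SplitAddLog {

    private SplitAddLog() {}

    /** Message for splits read back from checkpointed reader state. */
    static String restored(long checkpointId, List<String> splitIds) {
        return "Restoring " + splitIds.size() + " split(s) from checkpoint "
                + checkpointId + ": " + splitIds;
    }

    /** Message for splits assigned after partition discovery. */
    static String discovered(List<String> splitIds) {
        return "Adding " + splitIds.size()
                + " newly discovered split(s) to reader: " + splitIds;
    }

    public static void main(String[] args) {
        // Checkpoint id 50 and split test_topic-2 are taken from the logs above;
        // test_topic-3 is a made-up split for the discovery case.
        System.out.println(restored(50L, Arrays.asList("test_topic-2")));
        System.out.println(discovered(Arrays.asList("test_topic-3")));
    }
}
```

With two separate messages like these, the restore case and the new-partition case could be told apart directly in the TM log.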
*However, if the partitions have not changed, the discovery thread just checks
and returns. Although I know the job was recovered, the TM log contains no
entries related to the recovery:*
2022-12-15 16:55:30,114 INFO [main]
org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Starting RPC
endpoint for org.apache.flink.runtime.taskexecutor.TaskExecutor at
akka://flink/user/rpc/taskmanager_0 .
2022-12-15 16:55:30,177 INFO [flink-akka.actor.default-dispatcher-4]
org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] -
Starting DefaultLeaderRetrievalService with
ZookeeperLeaderRetrievalDriver\{connectionInformationPath='/resource_manager/connection_info'}.
2022-12-15 16:55:30,178 INFO [flink-akka.actor.default-dispatcher-4]
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Start job
leader service.
2022-12-15 16:55:30,180 INFO [flink-akka.actor.default-dispatcher-4]
org.apache.flink.runtime.filecache.FileCache [] - User file
cache uses directory
/opt/flink/flink-dist-cache-2a59b734-6c51-4379-b18e-3d6dac87f7af
2022-12-15 16:55:30,182 INFO [flink-akka.actor.default-dispatcher-4]
org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Connecting to
ResourceManager
akka.tcp://[email protected]:6123/user/rpc/resourcemanager_2(b54febaa53b5412ad03a423f37644f07).
2022-12-15 16:55:30,426 INFO [flink-akka.actor.default-dispatcher-4]
org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Resolved
ResourceManager address, beginning registration
2022-12-15 16:55:30,518 INFO [flink-akka.actor.default-dispatcher-15]
org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Successful
registration at resource manager
akka.tcp://[email protected]:6123/user/rpc/resourcemanager_2 under registration
id e85390bf49682d3d723733ac807adb15.
2022-12-15 16:55:30,549 INFO [flink-akka.actor.default-dispatcher-15]
org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Receive slot
request cd0f4d4ab80dbc66fe2c42dce23a143a for job
20a4149a35ee6a00aba5356c361bf4df from resource manager with leader id
b54febaa53b5412ad03a423f37644f07.
2022-12-15 16:55:30,554 INFO [flink-akka.actor.default-dispatcher-15]
org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Allocated
slot for cd0f4d4ab80dbc66fe2c42dce23a143a.
2022-12-15 16:55:30,555 INFO [flink-akka.actor.default-dispatcher-15]
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Add job
20a4149a35ee6a00aba5356c361bf4df for job leader monitoring.
2022-12-15 16:55:30,557 INFO [flink-akka.actor.default-dispatcher-15]
org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] -
Starting DefaultLeaderRetrievalService with
ZookeeperLeaderRetrievalDriver\{connectionInformationPath='/20a4149a35ee6a00aba5356c361bf4df/connection_info'}.
2022-12-15 16:55:30,561 INFO [flink-akka.actor.default-dispatcher-15]
org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Receive slot
request eac343b1f8cef50bd6f636f3595b58bf for job
20a4149a35ee6a00aba5356c361bf4df from resource manager with leader id
b54febaa53b5412ad03a423f37644f07.
2022-12-15 16:55:30,561 INFO [flink-akka.actor.default-dispatcher-15]
org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Allocated
slot for eac343b1f8cef50bd6f636f3595b58bf.
2022-12-15 16:55:30,562 INFO [main-EventThread]
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Try to
register at job manager akka.tcp://[email protected]:6123/user/rpc/jobmanager_1
with leader id d03a423f-3764-4f07-b54f-ebaa53b5412a.
2022-12-15 16:55:30,585 INFO [flink-akka.actor.default-dispatcher-15]
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Resolved
JobManager address, beginning registration
2022-12-15 16:55:30,618 INFO [flink-akka.actor.default-dispatcher-15]
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Successful
registration at job manager
akka.tcp://[email protected]:6123/user/rpc/jobmanager_1 for job
20a4149a35ee6a00aba5356c361bf4df.
2022-12-15 16:55:30,619 INFO [flink-akka.actor.default-dispatcher-4]
org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Establish
JobManager connection for job 20a4149a35ee6a00aba5356c361bf4df.
2022-12-15 16:55:30,622 INFO [flink-akka.actor.default-dispatcher-4]
org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Offer
reserved slots to the leader of job 20a4149a35ee6a00aba5356c361bf4df.
2022-12-15 16:55:30,648 INFO [flink-akka.actor.default-dispatcher-4]
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot
cd0f4d4ab80dbc66fe2c42dce23a143a.
2022-12-15 16:55:30,648 INFO [flink-akka.actor.default-dispatcher-4]
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot
eac343b1f8cef50bd6f636f3595b58bf.
2022-12-15 16:56:19,892 INFO [flink-akka.actor.default-dispatcher-15]
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot
cd0f4d4ab80dbc66fe2c42dce23a143a.
2022-12-15 16:56:19,907 INFO [flink-akka.actor.default-dispatcher-15]
org.apache.flink.runtime.state.changelog.StateChangelogStorageLoader [] -
Creating a changelog storage with name 'memory'.
2022-12-15 16:56:19,925 INFO [flink-akka.actor.default-dispatcher-15]
org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received task
Source: TableSourceScan(table=[[default_catalog, default_database, demo_in]],
fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) (1/2)#0
(9293cae434983862cfcae1f732528bc7), deploy into slot with allocation id
cd0f4d4ab80dbc66fe2c42dce23a143a.
2022-12-15 16:56:19,926 INFO [Source: TableSourceScan(table=[[default_catalog,
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1])
(1/2)#0] org.apache.flink.runtime.taskmanager.Task [] -
Source: TableSourceScan(table=[[default_catalog, default_database, demo_in]],
fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) (1/2)#0
(9293cae434983862cfcae1f732528bc7) switched from CREATED to DEPLOYING.
2022-12-15 16:56:19,930 INFO [flink-akka.actor.default-dispatcher-15]
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot
eac343b1f8cef50bd6f636f3595b58bf.
2022-12-15 16:56:19,929 INFO [Source: TableSourceScan(table=[[default_catalog,
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1])
(1/2)#0] org.apache.flink.runtime.taskmanager.Task [] -
Loading JAR files for task Source: TableSourceScan(table=[[default_catalog,
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1])
(1/2)#0 (9293cae434983862cfcae1f732528bc7) [DEPLOYING].
2022-12-15 16:56:19,932 INFO [flink-akka.actor.default-dispatcher-15]
org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received task
Source: TableSourceScan(table=[[default_catalog, default_database, demo_in]],
fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) (2/2)#0
(3e38d62bdebbb01fd55b0e96deed0856), deploy into slot with allocation id
eac343b1f8cef50bd6f636f3595b58bf.
2022-12-15 16:56:19,933 INFO [Source: TableSourceScan(table=[[default_catalog,
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1])
(2/2)#0] org.apache.flink.runtime.taskmanager.Task [] -
Source: TableSourceScan(table=[[default_catalog, default_database, demo_in]],
fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) (2/2)#0
(3e38d62bdebbb01fd55b0e96deed0856) switched from CREATED to DEPLOYING.
2022-12-15 16:56:19,933 INFO [Source: TableSourceScan(table=[[default_catalog,
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1])
(2/2)#0] org.apache.flink.runtime.taskmanager.Task [] -
Loading JAR files for task Source: TableSourceScan(table=[[default_catalog,
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1])
(2/2)#0 (3e38d62bdebbb01fd55b0e96deed0856) [DEPLOYING].
2022-12-15 16:56:20,000 INFO [Thread-3]
com.alibaba.dfs.common.PerformanceCounter [] - PERF:
dfs://f-blink-test.sh.aliyun-inc.com:10290/blink/ha/antc4flink87003521.OPEN
count 1 latency(us): avg 12369 deviation 0
time dist1: [>10000 1 100.00%]
time dist2: [<20000 1 100.00%]
2022-12-15 16:56:20,001 INFO [Thread-3]
com.alibaba.dfs.common.PerformanceCounter [] - PERF:
dfs://f-blink-test.sh.aliyun-inc.com:10290/blink/ha/antc4flink87003521.READ_SYS
count 1 latency(us): avg 35153 deviation 0 Bps: avg 29834011.0 deviation 0.0
time dist1: [>10000 1 100.00%]
time dist2: [<40000 1 100.00%]
2022-12-15 16:56:20,716 INFO [Source: TableSourceScan(table=[[default_catalog,
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1])
(2/2)#0]
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager [] -
Reuse old classloader because the cached libraries is not compatible with new
task.
2022-12-15 16:56:20,839 INFO [Source: TableSourceScan(table=[[default_catalog,
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1])
(1/2)#0] org.apache.flink.streaming.runtime.tasks.StreamTask [] -
Using job/cluster config to configure application-defined state backend:
EmbeddedRocksDBStateBackend\{, localRocksDbDirectories=null,
enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=4,
writeBatchSize=2097152}
2022-12-15 16:56:20,839 INFO [Source: TableSourceScan(table=[[default_catalog,
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1])
(2/2)#0] org.apache.flink.streaming.runtime.tasks.StreamTask [] -
Using job/cluster config to configure application-defined state backend:
EmbeddedRocksDBStateBackend\{, localRocksDbDirectories=null,
enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=4,
writeBatchSize=2097152}
2022-12-15 16:56:20,841 INFO [Source: TableSourceScan(table=[[default_catalog,
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1])
(1/2)#0] org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend
[] - Using predefined options: DEFAULT.
2022-12-15 16:56:20,843 INFO [Source: TableSourceScan(table=[[default_catalog,
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1])
(2/2)#0] org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend
[] - Using predefined options: DEFAULT.
2022-12-15 16:56:20,844 INFO [Source: TableSourceScan(table=[[default_catalog,
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1])
(2/2)#0] org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend
[] - Using application-defined options factory:
DefaultConfigurableOptionsFactory\{configuredOptions={state.backend.antkv.perf-num-occurrence=1000,
state.backend.antkv.ttl.ms=129600000,
state.backend.antkv.index-block-restart-interval=16,
state.backend.antkv.learned.index.enabled=false,
state.backend.antkv.level-compaction-dynamic-level-bytes.enabled=false,
state.backend.antkv.cache-num-shard-bits=1,
state.backend.antkv.level-for-learned-index=0,
state.backend.antkv.statistics.enabled=false}}.
2022-12-15 16:56:20,844 INFO [Source: TableSourceScan(table=[[default_catalog,
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1])
(1/2)#0] org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend
[] - Using application-defined options factory:
DefaultConfigurableOptionsFactory\{configuredOptions={state.backend.antkv.perf-num-occurrence=1000,
state.backend.antkv.ttl.ms=129600000,
state.backend.antkv.index-block-restart-interval=16,
state.backend.antkv.learned.index.enabled=false,
state.backend.antkv.level-compaction-dynamic-level-bytes.enabled=false,
state.backend.antkv.cache-num-shard-bits=1,
state.backend.antkv.level-for-learned-index=0,
state.backend.antkv.statistics.enabled=false}}.
2022-12-15 16:56:20,845 INFO [Source: TableSourceScan(table=[[default_catalog,
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1])
(1/2)#0] org.apache.flink.streaming.runtime.tasks.StreamTask [] -
Using application-defined state backend: EmbeddedRocksDBStateBackend\{,
localRocksDbDirectories=null, enableIncrementalCheckpointing=TRUE,
numberOfTransferThreads=4, writeBatchSize=2097152}
2022-12-15 16:56:20,845 INFO [Source: TableSourceScan(table=[[default_catalog,
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1])
(2/2)#0] org.apache.flink.streaming.runtime.tasks.StreamTask [] -
Using application-defined state backend: EmbeddedRocksDBStateBackend\{,
localRocksDbDirectories=null, enableIncrementalCheckpointing=TRUE,
numberOfTransferThreads=4, writeBatchSize=2097152}
2022-12-15 16:56:20,845 INFO [Source: TableSourceScan(table=[[default_catalog,
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1])
(1/2)#0] org.apache.flink.runtime.state.StateBackendLoader [] -
State backend loader loads the state backend as EmbeddedRocksDBStateBackend
2022-12-15 16:56:20,845 INFO [Source: TableSourceScan(table=[[default_catalog,
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1])
(2/2)#0] org.apache.flink.runtime.state.StateBackendLoader [] -
State backend loader loads the state backend as EmbeddedRocksDBStateBackend
2022-12-15 16:56:20,850 INFO [Source: TableSourceScan(table=[[default_catalog,
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1])
(1/2)#0] org.apache.flink.streaming.runtime.tasks.StreamTask [] -
Using job/cluster config to configure application-defined checkpoint storage:
org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage@4413f5e9
2022-12-15 16:56:20,850 INFO [Source: TableSourceScan(table=[[default_catalog,
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1])
(2/2)#0] org.apache.flink.streaming.runtime.tasks.StreamTask [] -
Using job/cluster config to configure application-defined checkpoint storage:
org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage@1095af08
2022-12-15 16:56:20,863 INFO [Source: TableSourceScan(table=[[default_catalog,
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1])
(1/2)#0] org.apache.flink.runtime.taskmanager.Task [] -
Source: TableSourceScan(table=[[default_catalog, default_database, demo_in]],
fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) (1/2)#0
(9293cae434983862cfcae1f732528bc7) switched from DEPLOYING to INITIALIZING.
2022-12-15 16:56:20,863 INFO [Source: TableSourceScan(table=[[default_catalog,
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1])
(2/2)#0] org.apache.flink.runtime.taskmanager.Task [] -
Source: TableSourceScan(table=[[default_catalog, default_database, demo_in]],
fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) (2/2)#0
(3e38d62bdebbb01fd55b0e96deed0856) switched from DEPLOYING to INITIALIZING.
{color:#00875a}2022-12-15 16:56:21,160 INFO [Source:
TableSourceScan(table=[[default_catalog, default_database, demo_in]],
fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) (2/2)#0]
org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding
split(s) to reader: [[Partition: test_topic-2, startingOffset: 1670947200000,
queueCursor: MTY3MTA3NTYzNTY4NDg2ODY3MA==, stoppingOffset:
9223372036854775807]]{color}
2022-12-15 16:56:21,160 INFO [Source: TableSourceScan(table=[[default_catalog,
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1])
(1/2)#0] org.apache.flink.connector.base.source.reader.SourceReaderBase [] -
Adding split(s) to reader: [[Partition: test_topic-1, startingOffset:
1670947200000, queueCursor: MTY2OTYxNDE0MTI1ODYyMzkzMQ==, stoppingOffset:
9223372036854775807]]
2022-12-15 16:56:21,161 INFO [Source: TableSourceScan(table=[[default_catalog,
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1])
(1/2)#0] com.alipay.flink.connectors.common.source.reader.SplitReaderSupplier
[] - Add new split reader.
2022-12-15 16:56:21,161 INFO [Source: TableSourceScan(table=[[default_catalog,
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1])
(2/2)#0] com.alipay.flink.connectors.common.source.reader.SplitReaderSupplier
[] - Add new split reader.
2022-12-15 16:56:21,171 INFO [Source Data Fetcher for Source:
TableSourceScan(table=[[default_catalog, default_database, demo_in]],
fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) (1/2)#0]
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] -
Starting split fetcher 0
2022-12-15 16:56:21,171 INFO [Source Data Fetcher for Source:
TableSourceScan(table=[[default_catalog, default_database, demo_in]],
fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) (2/2)#0]
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] -
Starting split fetcher 0
2022-12-15 16:56:21,177 INFO [Source: TableSourceScan(table=[[default_catalog,
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1])
(1/2)#0] org.apache.flink.runtime.taskmanager.Task [] -
Source: TableSourceScan(table=[[default_catalog, default_database, demo_in]],
fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) (1/2)#0
(9293cae434983862cfcae1f732528bc7) switched from INITIALIZING to RUNNING.
2022-12-15 16:56:21,177 INFO [Source: TableSourceScan(table=[[default_catalog,
default_database, demo_in]], fields=[f0, f1, cnt]) -> Calc(select=[f0, f1])
(2/2)#0] org.apache.flink.runtime.taskmanager.Task [] -
Source: TableSourceScan(table=[[default_catalog, default_database, demo_in]],
fields=[f0, f1, cnt]) -> Calc(select=[f0, f1]) (2/2)#0
(3e38d62bdebbb01fd55b0e96deed0856) switched from INITIALIZING to RUNNING.
*The only hint is the green line, {color:#00875a}Adding split(s) to
reader{color}, but this message is logged both when the partition discovery
thread adds new partition splits and when the reader recovers. We cannot tell
whether the task recovered its splits from split state or received newly
discovered partitions. There are no restore-related logs on the TM side.*
So I think we can add a task restore log on the TM side. It would be useful
for debugging and troubleshooting.
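
To make the proposal concrete, here is a minimal, self-contained sketch of the
kind of message I have in mind. It is not the actual Flink code: the class
RestoreLogSketch, the enum SplitOrigin and the method describe are hypothetical
names used only for illustration, and the message wording is just a suggestion.

```java
import java.util.List;

// Hypothetical sketch, not Flink's API: shows how a reader-side log message
// could state where its splits came from.
public class RestoreLogSketch {

    // Where the splits handed to the reader originate (hypothetical enum).
    public enum SplitOrigin {
        RESTORED_FROM_STATE,   // splits read back from checkpointed reader state
        ASSIGNED_BY_ENUMERATOR // splits newly assigned, e.g. after partition discovery
    }

    // Builds a log message that names the origin explicitly, so the two
    // situations no longer share the same "Adding split(s) to reader" line.
    public static String describe(SplitOrigin origin, List<String> splits) {
        if (origin == SplitOrigin.RESTORED_FROM_STATE) {
            return "Restoring " + splits.size()
                    + " split(s) from reader state: " + splits;
        }
        return "Adding " + splits.size()
                + " newly assigned split(s) to reader: " + splits;
    }

    public static void main(String[] args) {
        // Recovery case: the split comes from the restored checkpoint.
        System.out.println(
                describe(SplitOrigin.RESTORED_FROM_STATE, List.of("test_topic-1")));
        // Discovery case: the enumerator assigns a newly found partition.
        System.out.println(
                describe(SplitOrigin.ASSIGNED_BY_ENUMERATOR, List.of("test_topic-2")));
    }
}
```

With distinct messages like these, searching the TM log for "Restoring" would
be enough to confirm that a task recovered its splits from state.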
> Add source operator restore readerState log to distinguish split is from
> newPartitions or split state
> -----------------------------------------------------------------------------------------------------
>
> Key: FLINK-30424
> URL: https://issues.apache.org/jira/browse/FLINK-30424
> Project: Flink
> Issue Type: Improvement
> Components: API / DataStream
> Affects Versions: 1.16.0, 1.15.3, 1.16.1
> Reporter: Ran Tao
> Assignee: Ran Tao
> Priority: Major
> Labels: pull-request-available
>
> When a job starts for the first time, we can find 'assignPartitions' in the
> log. But if the source recovers from state, we cannot tell whether the
> newPartitions come from the timed discovery thread or from the reader task
> state.
> We can add a helper log to distinguish the two cases and confirm that the
> reader is using split state when recovering. It is very useful for
> troubleshooting.
>
>