[ 
https://issues.apache.org/jira/browse/FLINK-31414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17699928#comment-17699928
 ] 

Feifan Wang commented on FLINK-31414:
-------------------------------------

Thanks for the reply, [~pnowojski], and sorry for the lack of clarity in my 
previous description. Let me answer your questions first:
{quote}the stack trace doesn't match to the master code, so I'm not sure what 
Flink version you are using?
{quote}
It is based on *release-1.16.1*, with some bug fixes cherry-picked.
{quote}doesn't the error message "switched from RUNNING to FAILED" refer to 
actually subtask/task switching to FAILED state, contradicting your statement 
that the exception is being ignored?
{quote}
Yes, it is the subtask switching to FAILED state. What I mean is that the 
exception thrown in the alignment timer task is ignored, so the subtask thread 
keeps running and later triggers the exception I posted above.
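The "ignored" exception matches how java.util.concurrent.FutureTask behaves. In the first stack trace in the log, lambda$registerAlignmentTimer$3 is invoked from FutureTask.run() inside a mailbox Mail. FutureTask.run() catches any exception and stores it in the future instead of rethrowing it, so the exception only surfaces if someone calls get(). The following is a minimal JDK-only sketch of that behavior. timerBody() is a hypothetical stand-in for the timer callback, not Flink code.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

// JDK-only sketch: an exception thrown inside a FutureTask does not escape run().
class SwallowedTimerException {

    // Hypothetical stand-in for the alignment timer callback; it fails the way
    // triggerGlobalCheckpoint() did in the log.
    static void timerBody() {
        throw new IllegalStateException("simulated failure in the alignment timer");
    }

    // Runs the callback the way the mailbox runs it: by calling FutureTask.run().
    // Returns true only if the exception propagates out of run().
    static boolean exceptionEscapesRun() {
        FutureTask<Void> task = new FutureTask<>(SwallowedTimerException::timerBody, null);
        try {
            task.run(); // FutureTask catches the exception and stores it internally
            return false;
        } catch (RuntimeException e) {
            return true;
        }
    }

    // The stored exception is only visible to a caller of get().
    static Throwable causeFromGet() {
        FutureTask<Void> task = new FutureTask<>(SwallowedTimerException::timerBody, null);
        task.run();
        try {
            task.get();
            return null;
        } catch (ExecutionException e) {
            return e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return e;
        }
    }

    public static void main(String[] args) {
        System.out.println("escaped run(): " + exceptionEscapesRun()); // prints "escaped run(): false"
        System.out.println("cause from get(): " + causeFromGet());
    }
}
```

Because nothing calls get() on the timer's future, the task thread never sees the failure and continues processing input.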

Here is a more complete log (I changed some log levels from DEBUG to INFO):

 
{code:java}
2023-03-10 12:09:42,416 INFO  [MV_J_PV -> mv-join-after-operator -> 
extract-event-identifier (4517/4800)#1] 
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler
  - MV_J_PV -> mv-join-after-operator -> extract-event-identifier 
(4517/4800)#1 
(cb2e56879557c676c9897cda44fe3c9e_4f7e0f4c19a43f929bda6907ee1f3150_4516_1): 
Received barrier from channel InputChannelInfo{gateIdx=1, inputChannelIdx=586} 
@ 17.
2023-03-10 12:09:42,673 INFO  [MV_J_PV -> mv-join-after-operator -> 
extract-event-identifier (4517/4800)#1] 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl  - MV_J_PV 
-> mv-join-after-operator -> extract-event-identifier (4517/4800)#1 
starting checkpoint 17 
(CheckpointOptions{checkpointType=CheckpointType{name='Checkpoint', 
sharingFilesStrategy=FORWARD_BACKWARD}, targetLocation=(default), 
alignmentType=UNALIGNED, alignedCheckpointTimeout=9223372036854775807})
2023-03-10 12:09:42,673 INFO  [MV_J_PV -> mv-join-after-operator -> 
extract-event-identifier (4517/4800)#1] 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl  - MV_J_PV 
-> mv-join-after-operator -> extract-event-identifier (4517/4800)#1 put 
ChannelStateWriteResult : 17
2023-03-10 12:09:42,675 INFO  [MV_J_PV -> mv-join-after-operator -> 
extract-event-identifier (4517/4800)#1] 
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler
  - MV_J_PV -> mv-join-after-operator -> extract-event-identifier 
(4517/4800)#1 
(cb2e56879557c676c9897cda44fe3c9e_4f7e0f4c19a43f929bda6907ee1f3150_4516_1): 
Triggering checkpoint 17 on the barrier announcement at 1678421367671.
2023-03-10 12:09:42,675 INFO  [MV_J_PV -> mv-join-after-operator -> 
extract-event-identifier (4517/4800)#1] 
org.apache.flink.streaming.runtime.tasks.StreamTask           - 
triggerCheckpointOnBarrier Starting checkpoint 17 
CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD} on 
task MV_J_PV -> mv-join-after-operator -> extract-event-identifier 
(4517/4800)#1
2023-03-10 12:09:42,675 INFO  [MV_J_PV -> mv-join-after-operator -> 
extract-event-identifier (4517/4800)#1] 
org.apache.flink.streaming.runtime.tasks.StreamTask           - Starting 
checkpoint 17 CheckpointType{name='Checkpoint', 
sharingFilesStrategy=FORWARD_BACKWARD} on task MV_J_PV -> 
mv-join-after-operator -> extract-event-identifier (4517/4800)#1
2023-03-10 12:09:42,675 INFO  [MV_J_PV -> mv-join-after-operator -> 
extract-event-identifier (4517/4800)#1] 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl  - MV_J_PV 
-> mv-join-after-operator -> extract-event-identifier (4517/4800)#1 
requested write result, checkpoint 17
2023-03-10 12:09:42,676 INFO  [MV_J_PV -> mv-join-after-operator -> 
extract-event-identifier (4517/4800)#1] 
org.apache.flink.state.changelog.ChangelogKeyedStateBackend   - snapshot of 
MV_J_PV -> mv-join-after-operator -> extract-event-identifier 
(4517/4800)#1 for checkpoint 17, change range: 39..46, materialization ID 4
2023-03-10 12:09:42,677 INFO  [MV_J_PV -> mv-join-after-operator -> 
extract-event-identifier (4517/4800)#1] 
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain  - Could not 
complete snapshot 17 for operator MV_J_PV -> mv-join-after-operator -> 
extract-event-identifier (4517/4800)#1. Failure reason: Checkpoint was declined.
org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete 
snapshot 17 for operator MV_J_PV -> mv-join-after-operator -> 
extract-event-identifier (4517/4800)#1. Failure reason: Checkpoint was declined.
    at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:269)
    at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:173)
    at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:345)
    at 
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:228)
    at 
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:213)
    at 
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:192)
    at 
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:730)
    at 
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:363)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$13(StreamTask.java:1291)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1279)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1236)
    at 
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
    at 
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
    at 
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
    at 
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:489)
    at 
org.apache.flink.streaming.runtime.io.checkpointing.AlternatingCollectingBarriers.alignmentTimeout(AlternatingCollectingBarriers.java:50)
    at 
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$registerAlignmentTimer$3(SingleCheckpointBarrierHandler.java:321)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
    at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
    at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
    at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
    at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
    at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: The upload for 44 has already failed previously
    at 
org.apache.flink.changelog.fs.FsStateChangelogWriter.ensureCanPersist(FsStateChangelogWriter.java:429)
    at 
org.apache.flink.changelog.fs.FsStateChangelogWriter.persistInternal(FsStateChangelogWriter.java:213)
    at 
org.apache.flink.changelog.fs.FsStateChangelogWriter.persist(FsStateChangelogWriter.java:208)
    at 
org.apache.flink.state.changelog.ChangelogKeyedStateBackend.snapshot(ChangelogKeyedStateBackend.java:402)
    at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:246)
    ... 31 more
Caused by: 
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.FSLimitException$MaxDirectoryItemsExceededException):
 The directory item limit of 
/flink-yg-test01/feifan-changelog-test/dstl/460359d4d9311744142797ba23e69d16/dstl
 is exceeded: limit=1048576 items=1048576
    at 
org.apache.hadoop.hdfs.server.namenode.FSDirectory.verifyMaxDirItems(FSDirectory.java:1056)
    at 
org.apache.hadoop.hdfs.server.namenode.FSDirectory.addLastINode(FSDirectory.java:1101)
    at 
org.apache.hadoop.hdfs.server.namenode.FSDirectory.addINode(FSDirectory.java:967)
    at 
org.apache.hadoop.hdfs.server.namenode.FSDirectory.addFile(FSDirectory.java:531)
    at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2997)
    at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2856)
    at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2691)
    at 
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:815)
    at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:450)
    at 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:713)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:975)
    at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1002)
    at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:923)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1726)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2786)
    at org.apache.hadoop.ipc.Client.call(Client.java:1578)
    at org.apache.hadoop.ipc.Client.call(Client.java:1505)
    at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:228)
    at com.sun.proxy.$Proxy22.create(Unknown Source)
    at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:332)
    at sun.reflect.GeneratedMethodAccessor181.invoke(Unknown Source)
    at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:252)
    at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104)
    at com.sun.proxy.$Proxy23.create(Unknown Source)
    at sun.reflect.GeneratedMethodAccessor181.invoke(Unknown Source)
    at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at 
org.apache.hadoop.hdfs.RpcResponseHandler.invoke(RpcResponseHandler.java:55)
    at com.sun.proxy.$Proxy23.create(Unknown Source)
    at 
org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1982)
    at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1937)
    at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1871)
    at 
org.apache.hadoop.hdfs.DistributedFileSystem$7.doCall(DistributedFileSystem.java:452)
    at 
org.apache.hadoop.hdfs.DistributedFileSystem$7.doCall(DistributedFileSystem.java:448)
    at 
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at 
org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:448)
    at 
org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:391)
    at org.apache.hadoop.fs.FilterFileSystem.create(FilterFileSystem.java:179)
    at 
org.apache.hadoop.fs.viewfs.ChRootedFileSystem.create(ChRootedFileSystem.java:189)
    at 
org.apache.hadoop.fs.viewfs.ViewFileSystem.create(ViewFileSystem.java:323)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1027)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1008)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:905)
    at 
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:154)
    at 
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:37)
    at 
org.apache.flink.changelog.fs.DuplicatingStateChangeFsUploader.prepareStream(DuplicatingStateChangeFsUploader.java:96)
    at 
org.apache.flink.changelog.fs.AbstractStateChangeFsUploader.uploadInternal(AbstractStateChangeFsUploader.java:76)
    at 
org.apache.flink.changelog.fs.AbstractStateChangeFsUploader.upload(AbstractStateChangeFsUploader.java:69)
    at 
org.apache.flink.changelog.fs.BatchingStateChangeUploadScheduler$1.tryExecute(BatchingStateChangeUploadScheduler.java:310)
    at 
org.apache.flink.changelog.fs.BatchingStateChangeUploadScheduler$1.tryExecute(BatchingStateChangeUploadScheduler.java:307)
    at 
org.apache.flink.changelog.fs.RetryingExecutor$RetriableActionAttempt.run(RetryingExecutor.java:225)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
2023-03-10 12:09:42,677 INFO  [MV_J_PV -> mv-join-after-operator -> 
extract-event-identifier (4517/4800)#1] 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl  - MV_J_PV 
-> mv-join-after-operator -> extract-event-identifier (4517/4800)#1 
aborting, checkpoint 17, cleanup:true
2023-03-10 12:09:42,723 INFO  [MV_J_PV -> mv-join-after-operator -> 
extract-event-identifier (4517/4800)#1] 
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler
  - MV_J_PV -> mv-join-after-operator -> extract-event-identifier 
(4517/4800)#1 
(cb2e56879557c676c9897cda44fe3c9e_4f7e0f4c19a43f929bda6907ee1f3150_4516_1): 
Received barrier from channel InputChannelInfo{gateIdx=0, inputChannelIdx=324} 
@ 17.
2023-03-10 12:09:42,723 INFO  [MV_J_PV -> mv-join-after-operator -> 
extract-event-identifier (4517/4800)#1] 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl  - MV_J_PV 
-> mv-join-after-operator -> extract-event-identifier (4517/4800)#1 
starting checkpoint 17 
(CheckpointOptions{checkpointType=CheckpointType{name='Checkpoint', 
sharingFilesStrategy=FORWARD_BACKWARD}, targetLocation=(default), 
alignmentType=UNALIGNED, alignedCheckpointTimeout=9223372036854775807})
2023-03-10 12:09:42,724 INFO  [MV_J_PV -> mv-join-after-operator -> 
extract-event-identifier (4517/4800)#1] 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl  - MV_J_PV 
-> mv-join-after-operator -> extract-event-identifier (4517/4800)#1 put 
ChannelStateWriteResult : 17
2023-03-10 12:09:42,724 INFO  [Channel state writer MV_J_PV -> 
mv-join-after-operator -> extract-event-identifier (4517/4800)#1] 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl
  - MV_J_PV -> mv-join-after-operator -> extract-event-identifier 
(4517/4800)#1 discarding 645 drained requests
2023-03-10 12:09:42,724 INFO  [MV_J_PV -> mv-join-after-operator -> 
extract-event-identifier (4517/4800)#1] 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl
  - MV_J_PV -> mv-join-after-operator -> extract-event-identifier 
(4517/4800)#1 discarding 1023 drained requests
2023-03-10 12:09:42,725 INFO  [MV_J_PV -> mv-join-after-operator -> 
extract-event-identifier (4517/4800)#1] 
org.apache.flink.state.common.PeriodicMaterializationManager  - Shutting down 
PeriodicMaterializationManager.
2023-03-10 12:09:42,725 INFO  [MV_J_PV -> mv-join-after-operator -> 
extract-event-identifier (4517/4800)#1] 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl  - MV_J_PV 
-> mv-join-after-operator -> extract-event-identifier (4517/4800)#1 
aborting, checkpoint 17, cleanup:false
2023-03-10 12:09:42,726 INFO  [MV_J_PV -> mv-join-after-operator -> 
extract-event-identifier (4517/4800)#1] 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl
  - MV_J_PV -> mv-join-after-operator -> extract-event-identifier 
(4517/4800)#1 discarding 1 drained requests
2023-03-10 12:09:43,099 INFO  [MV_J_PV -> mv-join-after-operator -> 
extract-event-identifier (4517/4800)#1] 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - Closed 
RocksDB State Backend. Cleaning up RocksDB working directory 
/data1/hadoop/yarn/nm-local-dir/usercache/hadoop-rt/appcache/application_1671247042382_4677267/tm_container_e74_1671247042382_4677267_01_000039/tmp/job_460359d4d9311744142797ba23e69d16_op_KeyedCoProcessOperator_4f7e0f4c19a43f929bda6907ee1f3150__4517_4800__uuid_785a6359-d191-4691-9272-d0590d2d696b.
2023-03-10 12:09:43,125 WARN  [MV_J_PV -> mv-join-after-operator -> 
extract-event-identifier (4517/4800)#1] 
org.apache.flink.runtime.taskmanager.Task                     - MV_J_PV -> 
mv-join-after-operator -> extract-event-identifier (4517/4800)#1 
(cb2e56879557c676c9897cda44fe3c9e_4f7e0f4c19a43f929bda6907ee1f3150_4516_1) 
switched from RUNNING to FAILED with failure cause: java.lang.RuntimeException: 
unable to send request to worker
    at 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.enqueue(ChannelStateWriterImpl.java:247)
    at 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.addInputData(ChannelStateWriterImpl.java:161)
    at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.prepareSnapshot(StreamTaskNetworkInput.java:103)
    at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.prepareSnapshot(StreamOneInputProcessor.java:83)
    at 
org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.prepareSnapshot(StreamMultipleInputProcessor.java:122)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.prepareInputSnapshot(StreamTask.java:518)
    at 
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.prepareInflightDataSnapshot(SubtaskCheckpointCoordinatorImpl.java:655)
    at 
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.initInputsCheckpoint(SubtaskCheckpointCoordinatorImpl.java:515)
    at 
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.initInputsCheckpoint(SingleCheckpointBarrierHandler.java:516)
    at 
org.apache.flink.streaming.runtime.io.checkpointing.AlternatingCollectingBarriers.alignmentTimeout(AlternatingCollectingBarriers.java:46)
    at 
org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlternatingAlignedBarrierHandlerState.barrierReceived(AbstractAlternatingAlignedBarrierHandlerState.java:54)
    at 
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
    at 
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
    at 
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
    at 
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
    at 
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
    at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
    at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at 
org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:85)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
    at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
    at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
    at java.lang.Thread.run(Thread.java:748)
    Suppressed: java.io.IOException: java.lang.IllegalStateException: writer 
not found for request start 17
        at 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.close(ChannelStateWriteRequestExecutorImpl.java:175)
        at 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.close(ChannelStateWriterImpl.java:235)
        at 
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.cancel(SubtaskCheckpointCoordinatorImpl.java:564)
        at 
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.close(SubtaskCheckpointCoordinatorImpl.java:551)
        at org.apache.flink.util.IOUtils.closeAll(IOUtils.java:255)
        at 
org.apache.flink.core.fs.AutoCloseableRegistry.doClose(AutoCloseableRegistry.java:72)
        at 
org.apache.flink.util.AbstractAutoCloseableRegistry.close(AbstractAutoCloseableRegistry.java:127)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUp(StreamTask.java:943)
        at 
org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$0(Task.java:917)
        at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
        at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
        ... 3 more
    Caused by: java.lang.IllegalStateException: writer not found for request 
start 17
        at 
org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
        at 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.dispatchInternal(ChannelStateWriteRequestDispatcherImpl.java:75)
        at 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.dispatch(ChannelStateWriteRequestDispatcherImpl.java:62)
        at 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.loop(ChannelStateWriteRequestExecutorImpl.java:96)
        at 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.run(ChannelStateWriteRequestExecutorImpl.java:75)
        ... 1 more
Caused by: java.lang.IllegalStateException: not running
    at 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.ensureRunning(ChannelStateWriteRequestExecutorImpl.java:152)
    at 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.submitInternal(ChannelStateWriteRequestExecutorImpl.java:144)
    at 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.submit(ChannelStateWriteRequestExecutorImpl.java:128)
    at 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.enqueue(ChannelStateWriterImpl.java:244)
    ... 27 more
    [CIRCULAR REFERENCE:java.lang.IllegalStateException: writer not found for 
request start 17]
2023-03-10 12:09:43,125 INFO  [MV_J_PV -> mv-join-after-operator -> 
extract-event-identifier (4517/4800)#1] 
org.apache.flink.runtime.taskmanager.Task                     - Freeing task 
resources for MV_J_PV -> mv-join-after-operator -> 
extract-event-identifier (4517/4800)#1 
(cb2e56879557c676c9897cda44fe3c9e_4f7e0f4c19a43f929bda6907ee1f3150_4516_1). 
{code}
 

 

Later, I will try to write a test that reproduces the problem.
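The log is consistent with the following sequence for checkpoint 17:

1. The alignment timer ran alignmentTimeout(). It called initInputsCheckpoint() (AlternatingCollectingBarriers.java:46 in the second trace) and then triggerGlobalCheckpoint() (line 50 in the first trace).
2. triggerGlobalCheckpoint() threw because the changelog upload had already failed. The FutureTask swallowed the exception, and the handler apparently stayed in its collecting state.
3. When the barrier from channel 324 arrived at 12:09:42,723, barrierReceived() ran alignmentTimeout() again. As a result, initInputsCheckpoint() was entered a second time for the same checkpoint, after its channel state had already been aborted.

The following is a minimal toy model of that control flow only. It assumes the simplified states and barrierReceived() condition shown. ToyAlternatingHandler and all its members are hypothetical, not Flink classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.FutureTask;

// Toy model with hypothetical names; it is NOT Flink's barrier handler, only the
// control flow inferred from the two stack traces for checkpoint 17.
class ToyAlternatingHandler {
    enum State { COLLECTING_ALIGNED, UNALIGNED_TRIGGERED }

    State state = State.COLLECTING_ALIGNED;
    final List<Long> initInputsCalls = new ArrayList<>();

    // Same order as in the traces: initInputsCheckpoint (line 46), then
    // triggerGlobalCheckpoint (line 50), then the state transition.
    void alignmentTimeout(long checkpointId) {
        initInputsCheckpoint(checkpointId);
        triggerGlobalCheckpoint(checkpointId);
        state = State.UNALIGNED_TRIGGERED; // never reached if the call above throws
    }

    void initInputsCheckpoint(long checkpointId) {
        initInputsCalls.add(checkpointId);
    }

    // Assumption: always declined, like the snapshot that failed on the changelog upload.
    void triggerGlobalCheckpoint(long checkpointId) {
        throw new IllegalStateException("checkpoint " + checkpointId + " was declined");
    }

    // Simplified assumption: a later barrier re-runs the timeout transition if the
    // handler is still collecting and the alignment has already timed out.
    void barrierReceived(long checkpointId, boolean alignmentTimedOut) {
        if (state == State.COLLECTING_ALIGNED && alignmentTimedOut) {
            alignmentTimeout(checkpointId);
        }
    }

    // Returns how many times initInputsCheckpoint ran for checkpoint 17.
    static int simulate() {
        ToyAlternatingHandler handler = new ToyAlternatingHandler();
        // 1. Timer fires inside a FutureTask: the exception is stored, the task keeps running.
        new FutureTask<Void>(() -> handler.alignmentTimeout(17L), null).run();
        // 2. A barrier from another channel arrives while the state is unchanged.
        try {
            handler.barrierReceived(17L, true);
        } catch (IllegalStateException e) {
            // In the real log the second attempt failed earlier, inside initInputsCheckpoint
            // ("unable to send request to worker"), and that exception did fail the task.
        }
        return handler.initInputsCalls.size();
    }

    public static void main(String[] args) {
        System.out.println("initInputsCheckpoint calls: " + simulate()); // prints "initInputsCheckpoint calls: 2"
    }
}
```

Under these assumptions, the second call happens only because the first failure never reached the task thread. If the timer's exception had failed the task, the barrier from channel 324 would not have been processed.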

> exceptions in the alignment timer are ignored
> ---------------------------------------------
>
>                 Key: FLINK-31414
>                 URL: https://issues.apache.org/jira/browse/FLINK-31414
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>            Reporter: Feifan Wang
>            Priority: Major
>              Labels: pull-request-available
>
> The alignment timer task in alternating aligned checkpoints runs as a future 
> task in the mailbox thread, so exceptions thrown by it 
> ([SingleCheckpointBarrierHandler#registerAlignmentTimer()|https://github.com/apache/flink/blob/65ab8e820a3714d2134dfb4c9772a10c998bd45a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java#L327])
>  are ignored. These exceptions should fail the task; instead, in my test they 
> cause initInputsCheckpoint to be called twice for the same checkpoint.
>  
> {code:java}
>  switched from RUNNING to FAILED with failure cause: 
> java.lang.RuntimeException: unable to send request to worker
>         at 
> org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.enqueue(ChannelStateWriterImpl.java:247)
>         at 
> org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.addInputData(ChannelStateWriterImpl.java:161)
>         at 
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.prepareSnapshot(StreamTaskNetworkInput.java:103)
>         at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.prepareSnapshot(StreamOneInputProcessor.java:83)
>         at 
> org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.prepareSnapshot(StreamMultipleInputProcessor.java:122)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.prepareInputSnapshot(StreamTask.java:518)
>         at 
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.prepareInflightDataSnapshot(SubtaskCheckpointCoordinatorImpl.java:655)
>         at 
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.initInputsCheckpoint(SubtaskCheckpointCoordinatorImpl.java:515)
>         at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.initInputsCheckpoint(SingleCheckpointBarrierHandler.java:516)
>         at 
> org.apache.flink.streaming.runtime.io.checkpointing.AlternatingCollectingBarriers.alignmentTimeout(AlternatingCollectingBarriers.java:46)
>         at 
> org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlternatingAlignedBarrierHandlerState.barrierReceived(AbstractAlternatingAlignedBarrierHandlerState.java:54)
>         at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
>         at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
>         at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
>         at 
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
>         at 
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
>         at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
>         at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>         at 
> org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:85)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
>         at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
>         at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
>         at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
>         at java.lang.Thread.run(Thread.java:748)
>         Suppressed: java.io.IOException: java.lang.IllegalStateException: 
> writer not found for request start 17
>                 at 
> org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.close(ChannelStateWriteRequestExecutorImpl.java:175)
>                 at 
> org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.close(ChannelStateWriterImpl.java:235)
>                 at 
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.cancel(SubtaskCheckpointCoordinatorImpl.java:564)
>                 at 
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.close(SubtaskCheckpointCoordinatorImpl.java:551)
>                 at org.apache.flink.util.IOUtils.closeAll(IOUtils.java:255)
>                 at 
> org.apache.flink.core.fs.AutoCloseableRegistry.doClose(AutoCloseableRegistry.java:72)
>                 at 
> org.apache.flink.util.AbstractAutoCloseableRegistry.close(AbstractAutoCloseableRegistry.java:127)
>                 at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUp(StreamTask.java:943)
>                 at 
> org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$0(Task.java:917)
>                 at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
>                 at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
>                 ... 3 more
>         Caused by: java.lang.IllegalStateException: writer not found for 
> request start 17
>                 at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
>                 at 
> org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.dispatchInternal(ChannelStateWriteRequestDispatcherImpl.java:75)
>                 at 
> org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.dispatch(ChannelStateWriteRequestDispatcherImpl.java:62)
>                 at 
> org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.loop(ChannelStateWriteRequestExecutorImpl.java:96)
>                 at 
> org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.run(ChannelStateWriteRequestExecutorImpl.java:75)
>                 ... 1 more
> Caused by: java.lang.IllegalStateException: not running
>         at 
> org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.ensureRunning(ChannelStateWriteRequestExecutorImpl.java:152)
>         at 
> org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.submitInternal(ChannelStateWriteRequestExecutorImpl.java:144)
>         at 
> org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.submit(ChannelStateWriteRequestExecutorImpl.java:128)
>         at 
> org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.enqueue(ChannelStateWriterImpl.java:244)
>         ... 27 more
>         [CIRCULAR REFERENCE:java.lang.IllegalStateException: writer not found 
> for request start 17] {code}
>  
>  
> See: 
> [BarrierAlignmentUtil#createRegisterTimerCallback()|https://github.com/apache/flink/blob/65ab8e820a3714d2134dfb4c9772a10c998bd45a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/BarrierAlignmentUtil.java#L50]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
