[
https://issues.apache.org/jira/browse/FLINK-31414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17699928#comment-17699928
]
Feifan Wang commented on FLINK-31414:
-------------------------------------
Thanks for the reply, [~pnowojski]. Sorry for the lack of clarity in the
previous description. Let me answer your questions first:
{quote}the stack trace doesn't match to the master code, so I'm not sure what
Flink version you are using?
{quote}
It is based on *release-1.16.1*, with some bug fixes cherry-picked.
{quote}doesn't the error message "switched from RUNNING to FAILED" refer to
actually subtask/task switching to FAILED state, contradicting your statement
that the exception is being ignored?
{quote}
Yes, it is a subtask switching to the FAILED state. What I mean is that the
exception thrown in the alignment timer task is ignored, so the subtask thread
keeps executing and later triggers the exception I posted above.
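To illustrate the mechanism, here is a minimal sketch in plain JDK code (not Flink code). It assumes the timer callback ends up wrapped in a java.util.concurrent.FutureTask, which is consistent with the FutureTask.run frame in the stack trace of the log. The class and method names are hypothetical and exist only for this illustration.

```java
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

// Hypothetical demo class; it only mimics the shape of the problem.
final class SwallowedTimerException {

    // Stand-in for the alignment timer callback: it always fails.
    static Void failingTimerAction() throws Exception {
        throw new IOException("simulated checkpoint failure");
    }

    // Returns true only if the exception propagates out of FutureTask.run().
    static boolean exceptionEscapesFutureTaskRun() {
        FutureTask<Void> task =
                new FutureTask<>(SwallowedTimerException::failingTimerAction);
        try {
            // FutureTask.run() catches the Throwable and stores it in the future,
            // so the calling thread continues as if nothing happened.
            task.run();
            return false;
        } catch (Throwable t) {
            return true;
        }
    }

    // The failure becomes visible only when someone calls get() on the future.
    static String causeSeenViaGet() {
        FutureTask<Void> task =
                new FutureTask<>(SwallowedTimerException::failingTimerAction);
        task.run();
        try {
            task.get();
            return null;
        } catch (ExecutionException e) {
            return e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("escaped run(): " + exceptionEscapesFutureTaskRun());
        System.out.println("cause via get(): " + causeSeenViaGet());
    }
}
```

If nothing calls get() on the future, the failure never reaches the thread that executed run(). That matches what I observe: the mailbox thread keeps processing input after the timer task has failed.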
Here is a more complete log (I changed some log levels from DEBUG to INFO):
{code:java}
2023-03-10 12:09:42,416 INFO [MV_J_PV -> mv-join-after-operator ->
extract-event-identifier (4517/4800)#1]
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler
- MV_J_PV -> mv-join-after-operator -> extract-event-identifier
(4517/4800)#1
(cb2e56879557c676c9897cda44fe3c9e_4f7e0f4c19a43f929bda6907ee1f3150_4516_1):
Received barrier from channel InputChannelInfo{gateIdx=1, inputChannelIdx=586}
@ 17.
2023-03-10 12:09:42,673 INFO [MV_J_PV -> mv-join-after-operator ->
extract-event-identifier (4517/4800)#1]
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl - MV_J_PV
-> mv-join-after-operator -> extract-event-identifier (4517/4800)#1
starting checkpoint 17
(CheckpointOptions{checkpointType=CheckpointType{name='Checkpoint',
sharingFilesStrategy=FORWARD_BACKWARD}, targetLocation=(default),
alignmentType=UNALIGNED, alignedCheckpointTimeout=9223372036854775807})
2023-03-10 12:09:42,673 INFO [MV_J_PV -> mv-join-after-operator ->
extract-event-identifier (4517/4800)#1]
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl - MV_J_PV
-> mv-join-after-operator -> extract-event-identifier (4517/4800)#1 put
ChannelStateWriteResult : 17
2023-03-10 12:09:42,675 INFO [MV_J_PV -> mv-join-after-operator ->
extract-event-identifier (4517/4800)#1]
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler
- MV_J_PV -> mv-join-after-operator -> extract-event-identifier
(4517/4800)#1
(cb2e56879557c676c9897cda44fe3c9e_4f7e0f4c19a43f929bda6907ee1f3150_4516_1):
Triggering checkpoint 17 on the barrier announcement at 1678421367671.
2023-03-10 12:09:42,675 INFO [MV_J_PV -> mv-join-after-operator ->
extract-event-identifier (4517/4800)#1]
org.apache.flink.streaming.runtime.tasks.StreamTask -
triggerCheckpointOnBarrier Starting checkpoint 17
CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD} on
task MV_J_PV -> mv-join-after-operator -> extract-event-identifier
(4517/4800)#1
2023-03-10 12:09:42,675 INFO [MV_J_PV -> mv-join-after-operator ->
extract-event-identifier (4517/4800)#1]
org.apache.flink.streaming.runtime.tasks.StreamTask - Starting
checkpoint 17 CheckpointType{name='Checkpoint',
sharingFilesStrategy=FORWARD_BACKWARD} on task MV_J_PV ->
mv-join-after-operator -> extract-event-identifier (4517/4800)#1
2023-03-10 12:09:42,675 INFO [MV_J_PV -> mv-join-after-operator ->
extract-event-identifier (4517/4800)#1]
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl - MV_J_PV
-> mv-join-after-operator -> extract-event-identifier (4517/4800)#1
requested write result, checkpoint 17
2023-03-10 12:09:42,676 INFO [MV_J_PV -> mv-join-after-operator ->
extract-event-identifier (4517/4800)#1]
org.apache.flink.state.changelog.ChangelogKeyedStateBackend - snapshot of
MV_J_PV -> mv-join-after-operator -> extract-event-identifier
(4517/4800)#1 for checkpoint 17, change range: 39..46, materialization ID 4
2023-03-10 12:09:42,677 INFO [MV_J_PV -> mv-join-after-operator ->
extract-event-identifier (4517/4800)#1]
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain - Could not
complete snapshot 17 for operator MV_J_PV -> mv-join-after-operator ->
extract-event-identifier (4517/4800)#1. Failure reason: Checkpoint was declined.
org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete
snapshot 17 for operator MV_J_PV -> mv-join-after-operator ->
extract-event-identifier (4517/4800)#1. Failure reason: Checkpoint was declined.
at
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:269)
at
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:173)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:345)
at
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:228)
at
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:213)
at
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:192)
at
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:730)
at
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:363)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$13(StreamTask.java:1291)
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1279)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1236)
at
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
at
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
at
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
at
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:489)
at
org.apache.flink.streaming.runtime.io.checkpointing.AlternatingCollectingBarriers.alignmentTimeout(AlternatingCollectingBarriers.java:50)
at
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$registerAlignmentTimer$3(SingleCheckpointBarrierHandler.java:321)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
at
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: The upload for 44 has already failed previously
at
org.apache.flink.changelog.fs.FsStateChangelogWriter.ensureCanPersist(FsStateChangelogWriter.java:429)
at
org.apache.flink.changelog.fs.FsStateChangelogWriter.persistInternal(FsStateChangelogWriter.java:213)
at
org.apache.flink.changelog.fs.FsStateChangelogWriter.persist(FsStateChangelogWriter.java:208)
at
org.apache.flink.state.changelog.ChangelogKeyedStateBackend.snapshot(ChangelogKeyedStateBackend.java:402)
at
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:246)
... 31 more
Caused by:
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.FSLimitException$MaxDirectoryItemsExceededException):
The directory item limit of
/flink-yg-test01/feifan-changelog-test/dstl/460359d4d9311744142797ba23e69d16/dstl
is exceeded: limit=1048576 items=1048576
at
org.apache.hadoop.hdfs.server.namenode.FSDirectory.verifyMaxDirItems(FSDirectory.java:1056)
at
org.apache.hadoop.hdfs.server.namenode.FSDirectory.addLastINode(FSDirectory.java:1101)
at
org.apache.hadoop.hdfs.server.namenode.FSDirectory.addINode(FSDirectory.java:967)
at
org.apache.hadoop.hdfs.server.namenode.FSDirectory.addFile(FSDirectory.java:531)
at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2997)
at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2856)
at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2691)
at
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:815)
at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:450)
at
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:713)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:975)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1002)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:923)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1726)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2786) at
org.apache.hadoop.ipc.Client.call(Client.java:1578)
at org.apache.hadoop.ipc.Client.call(Client.java:1505)
at
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:228)
at com.sun.proxy.$Proxy22.create(Unknown Source)
at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:332)
at sun.reflect.GeneratedMethodAccessor181.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:252)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104)
at com.sun.proxy.$Proxy23.create(Unknown Source)
at sun.reflect.GeneratedMethodAccessor181.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.hadoop.hdfs.RpcResponseHandler.invoke(RpcResponseHandler.java:55)
at com.sun.proxy.$Proxy23.create(Unknown Source)
at
org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1982)
at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1937)
at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1871)
at
org.apache.hadoop.hdfs.DistributedFileSystem$7.doCall(DistributedFileSystem.java:452)
at
org.apache.hadoop.hdfs.DistributedFileSystem$7.doCall(DistributedFileSystem.java:448)
at
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at
org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:448)
at
org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:391)
at org.apache.hadoop.fs.FilterFileSystem.create(FilterFileSystem.java:179)
at
org.apache.hadoop.fs.viewfs.ChRootedFileSystem.create(ChRootedFileSystem.java:189)
at
org.apache.hadoop.fs.viewfs.ViewFileSystem.create(ViewFileSystem.java:323)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1027)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1008)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:905)
at
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:154)
at
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:37)
at
org.apache.flink.changelog.fs.DuplicatingStateChangeFsUploader.prepareStream(DuplicatingStateChangeFsUploader.java:96)
at
org.apache.flink.changelog.fs.AbstractStateChangeFsUploader.uploadInternal(AbstractStateChangeFsUploader.java:76)
at
org.apache.flink.changelog.fs.AbstractStateChangeFsUploader.upload(AbstractStateChangeFsUploader.java:69)
at
org.apache.flink.changelog.fs.BatchingStateChangeUploadScheduler$1.tryExecute(BatchingStateChangeUploadScheduler.java:310)
at
org.apache.flink.changelog.fs.BatchingStateChangeUploadScheduler$1.tryExecute(BatchingStateChangeUploadScheduler.java:307)
at
org.apache.flink.changelog.fs.RetryingExecutor$RetriableActionAttempt.run(RetryingExecutor.java:225)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
2023-03-10 12:09:42,677 INFO [MV_J_PV -> mv-join-after-operator ->
extract-event-identifier (4517/4800)#1]
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl - MV_J_PV
-> mv-join-after-operator -> extract-event-identifier (4517/4800)#1
aborting, checkpoint 17, cleanup:true
2023-03-10 12:09:42,723 INFO [MV_J_PV -> mv-join-after-operator ->
extract-event-identifier (4517/4800)#1]
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler
- MV_J_PV -> mv-join-after-operator -> extract-event-identifier
(4517/4800)#1
(cb2e56879557c676c9897cda44fe3c9e_4f7e0f4c19a43f929bda6907ee1f3150_4516_1):
Received barrier from channel InputChannelInfo{gateIdx=0, inputChannelIdx=324}
@ 17.
2023-03-10 12:09:42,723 INFO [MV_J_PV -> mv-join-after-operator ->
extract-event-identifier (4517/4800)#1]
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl - MV_J_PV
-> mv-join-after-operator -> extract-event-identifier (4517/4800)#1
starting checkpoint 17
(CheckpointOptions{checkpointType=CheckpointType{name='Checkpoint',
sharingFilesStrategy=FORWARD_BACKWARD}, targetLocation=(default),
alignmentType=UNALIGNED, alignedCheckpointTimeout=9223372036854775807})
2023-03-10 12:09:42,724 INFO [MV_J_PV -> mv-join-after-operator ->
extract-event-identifier (4517/4800)#1]
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl - MV_J_PV
-> mv-join-after-operator -> extract-event-identifier (4517/4800)#1 put
ChannelStateWriteResult : 17
2023-03-10 12:09:42,724 INFO [Channel state writer MV_J_PV ->
mv-join-after-operator -> extract-event-identifier (4517/4800)#1]
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl
- MV_J_PV -> mv-join-after-operator -> extract-event-identifier
(4517/4800)#1 discarding 645 drained requests
2023-03-10 12:09:42,724 INFO [MV_J_PV -> mv-join-after-operator ->
extract-event-identifier (4517/4800)#1]
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl
- MV_J_PV -> mv-join-after-operator -> extract-event-identifier
(4517/4800)#1 discarding 1023 drained requests
2023-03-10 12:09:42,725 INFO [MV_J_PV -> mv-join-after-operator ->
extract-event-identifier (4517/4800)#1]
org.apache.flink.state.common.PeriodicMaterializationManager - Shutting down
PeriodicMaterializationManager.
2023-03-10 12:09:42,725 INFO [MV_J_PV -> mv-join-after-operator ->
extract-event-identifier (4517/4800)#1]
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl - MV_J_PV
-> mv-join-after-operator -> extract-event-identifier (4517/4800)#1
aborting, checkpoint 17, cleanup:false
2023-03-10 12:09:42,726 INFO [MV_J_PV -> mv-join-after-operator ->
extract-event-identifier (4517/4800)#1]
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl
- MV_J_PV -> mv-join-after-operator -> extract-event-identifier
(4517/4800)#1 discarding 1 drained requests
2023-03-10 12:09:43,099 INFO [MV_J_PV -> mv-join-after-operator ->
extract-event-identifier (4517/4800)#1]
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend - Closed
RocksDB State Backend. Cleaning up RocksDB working directory
/data1/hadoop/yarn/nm-local-dir/usercache/hadoop-rt/appcache/application_1671247042382_4677267/tm_container_e74_1671247042382_4677267_01_000039/tmp/job_460359d4d9311744142797ba23e69d16_op_KeyedCoProcessOperator_4f7e0f4c19a43f929bda6907ee1f3150__4517_4800__uuid_785a6359-d191-4691-9272-d0590d2d696b.
2023-03-10 12:09:43,125 WARN [MV_J_PV -> mv-join-after-operator ->
extract-event-identifier (4517/4800)#1]
org.apache.flink.runtime.taskmanager.Task - MV_J_PV ->
mv-join-after-operator -> extract-event-identifier (4517/4800)#1
(cb2e56879557c676c9897cda44fe3c9e_4f7e0f4c19a43f929bda6907ee1f3150_4516_1)
switched from RUNNING to FAILED with failure cause: java.lang.RuntimeException:
unable to send request to worker
at
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.enqueue(ChannelStateWriterImpl.java:247)
at
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.addInputData(ChannelStateWriterImpl.java:161)
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.prepareSnapshot(StreamTaskNetworkInput.java:103)
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.prepareSnapshot(StreamOneInputProcessor.java:83)
at
org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.prepareSnapshot(StreamMultipleInputProcessor.java:122)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.prepareInputSnapshot(StreamTask.java:518)
at
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.prepareInflightDataSnapshot(SubtaskCheckpointCoordinatorImpl.java:655)
at
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.initInputsCheckpoint(SubtaskCheckpointCoordinatorImpl.java:515)
at
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.initInputsCheckpoint(SingleCheckpointBarrierHandler.java:516)
at
org.apache.flink.streaming.runtime.io.checkpointing.AlternatingCollectingBarriers.alignmentTimeout(AlternatingCollectingBarriers.java:46)
at
org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlternatingAlignedBarrierHandlerState.barrierReceived(AbstractAlternatingAlignedBarrierHandlerState.java:54)
at
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
at
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
at
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
at
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
at
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
at
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at
org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:85)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
at
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
at java.lang.Thread.run(Thread.java:748)
Suppressed: java.io.IOException: java.lang.IllegalStateException: writer
not found for request start 17
at
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.close(ChannelStateWriteRequestExecutorImpl.java:175)
at
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.close(ChannelStateWriterImpl.java:235)
at
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.cancel(SubtaskCheckpointCoordinatorImpl.java:564)
at
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.close(SubtaskCheckpointCoordinatorImpl.java:551)
at org.apache.flink.util.IOUtils.closeAll(IOUtils.java:255)
at
org.apache.flink.core.fs.AutoCloseableRegistry.doClose(AutoCloseableRegistry.java:72)
at
org.apache.flink.util.AbstractAutoCloseableRegistry.close(AbstractAutoCloseableRegistry.java:127)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUp(StreamTask.java:943)
at
org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$0(Task.java:917)
at
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
at
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
... 3 more
Caused by: java.lang.IllegalStateException: writer not found for request
start 17
at
org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
at
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.dispatchInternal(ChannelStateWriteRequestDispatcherImpl.java:75)
at
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.dispatch(ChannelStateWriteRequestDispatcherImpl.java:62)
at
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.loop(ChannelStateWriteRequestExecutorImpl.java:96)
at
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.run(ChannelStateWriteRequestExecutorImpl.java:75)
... 1 more
Caused by: java.lang.IllegalStateException: not running
at
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.ensureRunning(ChannelStateWriteRequestExecutorImpl.java:152)
at
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.submitInternal(ChannelStateWriteRequestExecutorImpl.java:144)
at
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.submit(ChannelStateWriteRequestExecutorImpl.java:128)
at
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.enqueue(ChannelStateWriterImpl.java:244)
... 27 more
[CIRCULAR REFERENCE:java.lang.IllegalStateException: writer not found for
request start 17]
2023-03-10 12:09:43,125 INFO [MV_J_PV -> mv-join-after-operator ->
extract-event-identifier (4517/4800)#1]
org.apache.flink.runtime.taskmanager.Task - Freeing task
resources for MV_J_PV -> mv-join-after-operator ->
extract-event-identifier (4517/4800)#1
(cb2e56879557c676c9897cda44fe3c9e_4f7e0f4c19a43f929bda6907ee1f3150_4516_1).
{code}
I will try to write a test later to demonstrate the problem.
> exceptions in the alignment timer are ignored
> ---------------------------------------------
>
> Key: FLINK-31414
> URL: https://issues.apache.org/jira/browse/FLINK-31414
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing
> Reporter: Feifan Wang
> Priority: Major
> Labels: pull-request-available
>
> The alignment timer task in alternating aligned checkpoints runs as a future
> task in the mailbox thread, causing the exceptions
> ([SingleCheckpointBarrierHandler#registerAlignmentTimer()|https://github.com/apache/flink/blob/65ab8e820a3714d2134dfb4c9772a10c998bd45a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java#L327])
> to be ignored. These exceptions should have failed the task, but instead
> the same checkpoint calls initInputsCheckpoint twice in my test.
>
> {code:java}
> switched from RUNNING to FAILED with failure cause:
> java.lang.RuntimeException: unable to send request to worker
> at
> org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.enqueue(ChannelStateWriterImpl.java:247)
> at
> org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.addInputData(ChannelStateWriterImpl.java:161)
> at
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.prepareSnapshot(StreamTaskNetworkInput.java:103)
> at
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.prepareSnapshot(StreamOneInputProcessor.java:83)
> at
> org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.prepareSnapshot(StreamMultipleInputProcessor.java:122)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.prepareInputSnapshot(StreamTask.java:518)
> at
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.prepareInflightDataSnapshot(SubtaskCheckpointCoordinatorImpl.java:655)
> at
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.initInputsCheckpoint(SubtaskCheckpointCoordinatorImpl.java:515)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.initInputsCheckpoint(SingleCheckpointBarrierHandler.java:516)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.AlternatingCollectingBarriers.alignmentTimeout(AlternatingCollectingBarriers.java:46)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlternatingAlignedBarrierHandlerState.barrierReceived(AbstractAlternatingAlignedBarrierHandlerState.java:54)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
> at
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
> at
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> at
> org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:85)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
> at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
> at
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
> at java.lang.Thread.run(Thread.java:748)
> Suppressed: java.io.IOException: java.lang.IllegalStateException:
> writer not found for request start 17
> at
> org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.close(ChannelStateWriteRequestExecutorImpl.java:175)
> at
> org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.close(ChannelStateWriterImpl.java:235)
> at
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.cancel(SubtaskCheckpointCoordinatorImpl.java:564)
> at
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.close(SubtaskCheckpointCoordinatorImpl.java:551)
> at org.apache.flink.util.IOUtils.closeAll(IOUtils.java:255)
> at
> org.apache.flink.core.fs.AutoCloseableRegistry.doClose(AutoCloseableRegistry.java:72)
> at
> org.apache.flink.util.AbstractAutoCloseableRegistry.close(AbstractAutoCloseableRegistry.java:127)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUp(StreamTask.java:943)
> at
> org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$0(Task.java:917)
> at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
> at
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
> ... 3 more
> Caused by: java.lang.IllegalStateException: writer not found for
> request start 17
> at
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
> at
> org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.dispatchInternal(ChannelStateWriteRequestDispatcherImpl.java:75)
> at
> org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.dispatch(ChannelStateWriteRequestDispatcherImpl.java:62)
> at
> org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.loop(ChannelStateWriteRequestExecutorImpl.java:96)
> at
> org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.run(ChannelStateWriteRequestExecutorImpl.java:75)
> ... 1 more
> Caused by: java.lang.IllegalStateException: not running
> at
> org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.ensureRunning(ChannelStateWriteRequestExecutorImpl.java:152)
> at
> org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.submitInternal(ChannelStateWriteRequestExecutorImpl.java:144)
> at
> org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.submit(ChannelStateWriteRequestExecutorImpl.java:128)
> at
> org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.enqueue(ChannelStateWriterImpl.java:244)
> ... 27 more
> [CIRCULAR REFERENCE:java.lang.IllegalStateException: writer not found
> for request start 17] {code}
>
>
> see :
> [BarrierAlignmentUtil#createRegisterTimerCallback()|https://github.com/apache/flink/blob/65ab8e820a3714d2134dfb4c9772a10c998bd45a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/BarrierAlignmentUtil.java#L50]
>