[https://issues.apache.org/jira/browse/FLINK-28440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17682476#comment-17682476]
Yanfei Lei edited comment on FLINK-28440 at 2/3/23 2:18 AM:
------------------------------------------------------------
I suspect that it is caused by the snapshot thread and the materialization
thread running concurrently.
h2. The DSTL (Durable Short-term Log) lifecycle in a checkpoint:
TM side:
# {*}Snapshot sync phase{*}: ChangelogKeyedStateBackend.snapshot() ->
RunnableFuture stateChangelogWriter.persist()
# {*}Snapshot async phase{*}: The uploader uploads the DSTL and calls
TaskChangelogRegistry.startTracking()
# {*}Snapshot async phase{*}: The upload callback builds the snapshot result:
FsStateChangelogWriter.handleUploadSuccess()
# {*}Snapshot async phase{*}: The snapshot result is reported to the JM:
SubtaskCheckpointCoordinatorImpl.checkpointState() -> finishAndReportAsync()
JM side:
# {*}JM writes the metadata file{*}:
CheckpointCoordinator.completePendingCheckpoint() -> finalizeCheckpoint() ->
Checkpoints.storeCheckpointMetadata()
# {*}JM notifies the TM that the checkpoint is completed{*}:
CheckpointCoordinator.completePendingCheckpoint() ->
cleanupAfterCompletedCheckpoint() -> sendAcknowledgeMessages()
TM side:
* *[{color:#ff0000}Bug{color}]* A pre-emptive upload is triggered
immediately after the last checkpoint. Because uploads are batched, the
pre-emptive changeset happens to be placed in the same file as part of the
last checkpoint's changeset. If the task on the TM is canceled at this moment,
*truncateAndClose()* is called and the pre-emptive file is deleted; since that
file also holds part of the last checkpoint's changeset, that data is deleted too.
* ChangelogKeyedStateBackend.notifyCheckpointComplete() ->
StateChangelogWriter.confirm() -> TaskChangelogRegistry.stopTracking()
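A heavily simplified model of the race in the [Bug] step, assuming one
changeset per SQN, uploaded files modeled as entries in an in-memory map, and
a naive cleanup that deletes the file behind every unconfirmed handle.
ChangelogRaceModel, Handle, uploadBatch, naiveTruncateAndClose and canRestore
are hypothetical names for illustration, not Flink APIs:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the race; none of these classes are Flink APIs.
// "Files" are entries in an in-memory map, and each changeset is reduced to one SQN.
final class ChangelogRaceModel {

    /** A handle pointing at one changeset inside a (possibly shared) uploaded file. */
    static final class Handle {
        final String fileId;
        final long sqn; // sequence number of the changeset

        Handle(String fileId, long sqn) {
            this.fileId = fileId;
            this.sqn = sqn;
        }
    }

    // Simulated durable storage: file id -> SQNs of the changesets stored in that file.
    final Map<String, List<Long>> storage = new HashMap<>();
    private int nextFileId = 0;

    /** Batched upload: every changeset in the batch is written into ONE file. */
    List<Handle> uploadBatch(List<Long> changesetSqns) {
        String fileId = "file-" + (nextFileId++);
        storage.put(fileId, new ArrayList<>(changesetSqns));
        List<Handle> handles = new ArrayList<>();
        for (long sqn : changesetSqns) {
            handles.add(new Handle(fileId, sqn));
        }
        return handles;
    }

    /** Naive cleanup on cancellation: delete the file behind every unconfirmed handle. */
    void naiveTruncateAndClose(List<Handle> unconfirmed) {
        for (Handle h : unconfirmed) {
            // Also deletes every other changeset that shares this file.
            storage.remove(h.fileId);
        }
    }

    /** Restore can only succeed if the file behind the handle still exists. */
    boolean canRestore(Handle h) {
        return storage.containsKey(h.fileId);
    }

    public static void main(String[] args) {
        ChangelogRaceModel m = new ChangelogRaceModel();
        // SQN 1 belongs to the last checkpoint, SQN 2 to the pre-emptive upload;
        // batching puts both into the same file.
        List<Handle> handles = m.uploadBatch(List.of(1L, 2L));
        Handle checkpointed = handles.get(0);
        Handle preEmptive = handles.get(1);

        // The task is canceled before the pre-emptive upload is confirmed.
        m.naiveTruncateAndClose(List.of(preEmptive));

        // The completed checkpoint still references the deleted file.
        System.out.println("checkpoint restorable: " + m.canRestore(checkpointed));
    }
}
```

Running main prints "checkpoint restorable: false", which in the real system
surfaces as the FileNotFoundException on restore quoted in the issue below.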
After discussing with [~Feifan Wang] offline, we plan to record the SQN range
covered by each StreamStateHandle and check that range before discarding the
StreamStateHandle.
!image-2023-02-03-10-09-07-586.png|width=504,height=314!
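A minimal sketch of the planned SQN-range check, assuming each uploaded file
(StreamStateHandle) covers a contiguous, half-open SQN range and that
truncateAndClose() drops every change with SQN >= some truncation point.
SqnRangeGuard, recordUpload and discardIfUnused are hypothetical names, not
the actual Flink change:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the planned fix: remember the SQN range each uploaded file
// covers and consult it before discarding the file. Names are illustrative only.
final class SqnRangeGuard {

    /** Half-open SQN range [from, to) covered by one uploaded file. */
    static final class SqnRange {
        final long from;
        final long to; // recorded for completeness; the discard check only needs 'from'

        SqnRange(long from, long to) {
            if (from > to) {
                throw new IllegalArgumentException("from > to");
            }
            this.from = from;
            this.to = to;
        }
    }

    // file id -> SQN range of the changes stored in that file
    private final Map<String, SqnRange> ranges = new HashMap<>();
    // Simulated storage: ids of files that still exist.
    final Set<String> existing = new HashSet<>();

    void recordUpload(String fileId, long fromSqn, long toSqnExclusive) {
        ranges.put(fileId, new SqnRange(fromSqn, toSqnExclusive));
        existing.add(fileId);
    }

    /**
     * Called on truncateAndClose: changes with SQN >= truncateFrom are being dropped.
     * A file is deleted only if ALL of its changes are being dropped; if it also holds
     * changes below truncateFrom (e.g. part of the last checkpoint), it is kept.
     */
    boolean discardIfUnused(String fileId, long truncateFrom) {
        SqnRange r = ranges.get(fileId);
        if (r == null || r.from < truncateFrom) {
            return false; // unknown range, or shared with retained changes: keep the file
        }
        existing.remove(fileId);
        ranges.remove(fileId);
        return true;
    }

    public static void main(String[] args) {
        SqnRangeGuard guard = new SqnRangeGuard();
        // One batched file holds SQN 0..9 (last checkpoint) and 10..14 (pre-emptive).
        guard.recordUpload("shared", 0, 15);
        // A later file holds only pre-emptive changes.
        guard.recordUpload("tail", 15, 20);

        // Cancellation: drop everything from SQN 10 onwards.
        System.out.println(guard.discardIfUnused("shared", 10)); // false: checkpoint still needs it
        System.out.println(guard.discardIfUnused("tail", 10));   // true: safe to delete
    }
}
```

The trade-off of this conservative rule is that a shared file survives
cancellation even though part of its content is no longer needed; presumably
its deletion is then left to the regular cleanup of the checkpoint that still
references it.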
> EventTimeWindowCheckpointingITCase failed with restore
> ------------------------------------------------------
>
> Key: FLINK-28440
> URL: https://issues.apache.org/jira/browse/FLINK-28440
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing, Runtime / State Backends
> Affects Versions: 1.16.0, 1.17.0
> Reporter: Huang Xingbo
> Assignee: Yanfei Lei
> Priority: Critical
> Labels: auto-deprioritized-critical, pull-request-available,
> test-stability
> Attachments: image-2023-02-01-00-51-54-506.png,
> image-2023-02-01-01-10-01-521.png, image-2023-02-01-01-19-12-182.png,
> image-2023-02-01-16-47-23-756.png, image-2023-02-01-16-57-43-889.png,
> image-2023-02-02-10-52-56-599.png, image-2023-02-03-10-09-07-586.png
>
>
> {code:java}
> Caused by: java.lang.Exception: Exception while creating StreamOperatorStateContext.
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:256)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268)
>     at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:722)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:698)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:665)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for WindowOperator_0a448493b4782967b150582570326227_(2/4) from any of the 1 provided restore options.
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:353)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:165)
>     ... 11 more
> Caused by: java.lang.RuntimeException: java.io.FileNotFoundException: /tmp/junit1835099326935900400/junit1113650082510421526/52ee65b7-033f-4429-8ddd-adbe85e27ced (No such file or directory)
>     at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
>     at org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.advance(StateChangelogHandleStreamHandleReader.java:87)
>     at org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.hasNext(StateChangelogHandleStreamHandleReader.java:69)
>     at org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.readBackendHandle(ChangelogBackendRestoreOperation.java:96)
>     at org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.restore(ChangelogBackendRestoreOperation.java:75)
>     at org.apache.flink.state.changelog.ChangelogStateBackend.restore(ChangelogStateBackend.java:92)
>     at org.apache.flink.state.changelog.AbstractChangelogStateBackend.createKeyedStateBackend(AbstractChangelogStateBackend.java:136)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:336)
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>     ... 13 more
> Caused by: java.io.FileNotFoundException: /tmp/junit1835099326935900400/junit1113650082510421526/52ee65b7-033f-4429-8ddd-adbe85e27ced (No such file or directory)
>     at java.io.FileInputStream.open0(Native Method)
>     at java.io.FileInputStream.open(FileInputStream.java:195)
>     at java.io.FileInputStream.<init>(FileInputStream.java:138)
>     at org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50)
>     at org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:134)
>     at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:87)
>     at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:72)
>     at org.apache.flink.changelog.fs.StateChangeFormat.read(StateChangeFormat.java:92)
>     at org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.advance(StateChangelogHandleStreamHandleReader.java:85)
>     ... 21 more
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=37772&view=logs&j=4d4a0d10-fca2-5507-8eed-c07f0bdf4887&t=7b25afdf-cc6c-566f-5459-359dc2585798&l=8916
> Other tests in which this stack trace was observed in failures are
> {{ChangelogRecoveryITCase}} (FLINK-30107) and
> {{ChangelogRecoverySwitchStateBackendITCase}} (FLINK-28898).