[ 
https://issues.apache.org/jira/browse/FLINK-28440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17682476#comment-17682476
 ] 

Yanfei Lei edited comment on FLINK-28440 at 2/3/23 2:17 AM:
------------------------------------------------------------

I suspect this is caused by a race between the snapshot thread and the 
materialization thread.
h2. The DSTL lifecycle in a checkpoint:

TM side:
 # {*}Snapshot sync phase{*}: ChangelogKeyedStateBackend.snapshot() -> 
RunnableFuture stateChangelogWriter.persist()
 # {*}Snapshot async phase{*}: The uploader uploads the DSTL and calls 
TaskChangelogRegistry.startTracking()
 # {*}Snapshot async phase{*}: The upload callback builds the snapshot result: 
FsStateChangelogWriter.handleUploadSuccess()
 # {*}Snapshot async phase{*}: The snapshot result is reported to the JM: 
SubtaskCheckpointCoordinatorImpl.checkpointState() -> finishAndReportAsync()

JM side:
 # {*}JM writes the metadata file{*}: 
CheckpointCoordinator.completePendingCheckpoint() -> finalizeCheckpoint() -> 
Checkpoints.storeCheckpointMetadata
 # {*}JM notifies the TM that the checkpoint is complete{*}: 
CheckpointCoordinator.completePendingCheckpoint() -> 
cleanupAfterCompletedCheckpoint() -> sendAcknowledgeMessages()

TM side:
 * *[{color:#ff0000}Bug{color}]* A pre-emptive upload is triggered 
immediately after the last checkpoint. Because uploads are batched, the two 
changesets (the last checkpoint's and the pre-emptive one) end up in the same 
file. If the task on the TM is canceled at this point, *truncateAndClose()* is 
called and deletes the pre-emptively uploaded file, which also deletes part of 
the last checkpoint's changeset.
 * ChangelogKeyedStateBackend.notifyCheckpointComplete() -> 
StateChangelogWriter.confirm() -> TaskChangelogRegistry.stopTracking()
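
To make the failure mode concrete, here is a minimal toy model of it. This is only a sketch under my own assumptions: SharedFileRaceSketch and its methods are hypothetical names, not Flink classes, and the real deletion goes through the writer and TaskChangelogRegistry. It only shows why deleting per file is unsafe once a batch upload puts two changesets into one file.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Toy model of the race; none of these names are Flink classes. */
final class SharedFileRaceSketch {
    /** Files that currently exist in the simulated DSTL storage. */
    private final Set<String> storage = new HashSet<>();
    /** Which file holds each changeset, keyed by sequence number (SQN). */
    private final Map<Long, String> fileBySqn = new HashMap<>();

    /** Batch upload: all given changesets are written into a single file. */
    void uploadBatch(String file, long... sqns) {
        storage.add(file);
        for (long sqn : sqns) {
            fileBySqn.put(sqn, file);
        }
    }

    /** Naive truncation: deletes the whole file that holds the given changeset. */
    void truncateFrom(long sqn) {
        storage.remove(fileBySqn.get(sqn));
    }

    /** A changeset can be restored only if its file still exists. */
    boolean isReadable(long sqn) {
        String file = fileBySqn.get(sqn);
        return file != null && storage.contains(file);
    }

    public static void main(String[] args) {
        SharedFileRaceSketch dstl = new SharedFileRaceSketch();
        // SQN 1 belongs to the last checkpoint, SQN 2 to the pre-emptive upload.
        dstl.uploadBatch("file-1", 1L, 2L);
        System.out.println(dstl.isReadable(1L)); // true
        // The task is canceled and the pre-emptive changeset is truncated.
        dstl.truncateFrom(2L);
        System.out.println(dstl.isReadable(1L)); // false: the checkpoint lost data
    }
}
```

In this model the restore of SQN 1 fails in the same way as the FileNotFoundException in the issue description: the metadata still points at a file that truncation has removed.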

 

After discussing with [~Feifan Wang] offline, we plan to record the SQN range 
covered by each StreamStateHandle and check that range before discarding the 
handle.

!image-2023-02-03-10-09-07-586.png!
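
A rough sketch of the range check, to illustrate the idea only. SqnRange and RangeCheckedRegistry are hypothetical names invented for this example, handles are represented by plain string IDs, and the actual change in TaskChangelogRegistry may look quite different.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical half-open SQN range [from, to); not a Flink class. */
final class SqnRange {
    final long from; // inclusive
    final long to;   // exclusive

    SqnRange(long from, long to) {
        if (from > to) {
            throw new IllegalArgumentException("from must not exceed to");
        }
        this.from = from;
        this.to = to;
    }

    /** True if every SQN of the other range also lies within this range. */
    boolean contains(SqnRange other) {
        return from <= other.from && other.to <= to;
    }
}

/** Hypothetical registry that remembers which SQNs each uploaded handle covers. */
final class RangeCheckedRegistry {
    private final Map<String, SqnRange> rangeByHandle = new HashMap<>();

    void startTracking(String handleId, SqnRange range) {
        rangeByHandle.put(handleId, range);
    }

    /**
     * Discards the handle only if all SQNs it covers are being discarded.
     * Returns false and keeps the handle if it still holds other changesets.
     */
    boolean discardIfCovered(String handleId, SqnRange discarded) {
        SqnRange held = rangeByHandle.get(handleId);
        if (held == null || !discarded.contains(held)) {
            return false;
        }
        rangeByHandle.remove(handleId);
        return true;
    }

    boolean isTracked(String handleId) {
        return rangeByHandle.containsKey(handleId);
    }
}
```

With a file covering SQNs [1, 3), a request to discard only [2, 3) is refused because SQN 1 still belongs to the last checkpoint; the file is released only once the whole range [1, 3) is no longer needed.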


was (Author: yanfei lei):
I suspect this is caused by a race between the snapshot thread and the 
materialization thread.
h2. The DSTL lifecycle in a checkpoint:

TM side:
 # {*}Snapshot sync phase{*}: ChangelogKeyedStateBackend.snapshot() -> 
RunnableFuture stateChangelogWriter.persist()
 # {*}Snapshot async phase{*}: The uploader uploads the DSTL and calls 
TaskChangelogRegistry.startTracking()
 # {*}Snapshot async phase{*}: The upload callback builds the snapshot result: 
FsStateChangelogWriter.handleUploadSuccess()
 # {*}Snapshot async phase{*}: The snapshot result is reported to the JM: 
SubtaskCheckpointCoordinatorImpl.checkpointState() -> finishAndReportAsync()

JM side:
 # {*}JM writes the metadata file{*}: 
CheckpointCoordinator.completePendingCheckpoint() -> finalizeCheckpoint() -> 
Checkpoints.storeCheckpointMetadata
 # {*}JM notifies the TM that the checkpoint is complete{*}: 
CheckpointCoordinator.completePendingCheckpoint() -> 
cleanupAfterCompletedCheckpoint() -> sendAcknowledgeMessages()

TM side:
 * *[{color:#FF0000}Bug{color}]* If the next materialization finishes and 
*deletes the original DSTL files* just before a job failover, the job tries to 
*restore from the previous materialization plus some DSTL files* that the JM 
wrote into the metadata file, but those files no longer exist. The deletion 
path is: PeriodicMaterializationManager.handleMaterializationResult() -> 
changelogTruncateHelper.materialized() -> truncate() -> 
TaskChangelogRegistry.notUsed()
 * ChangelogKeyedStateBackend.notifyCheckpointComplete() -> 
StateChangelogWriter.confirm() -> TaskChangelogRegistry.stopTracking()

h2. How to reproduce:
 # Comment out stopTracking() to simulate "the truncate materialization happens 
before checkpoint complete"

{code:java}
public void stopTracking(StreamStateHandle handle) {
    LOG.info(
            "stop tracking state, key: {}, state: {}",
            handle.getStreamStateHandleID(),
            handle);
    // entries.remove(handle.getStreamStateHandleID());
}
{code}
     2. Enable changelog explicitly in EventTimeWindowCheckpointingITCase, and 
set the checkpoint interval to 100ms and the materialization interval to 5s.

     3. I suspect that ChangelogRecoveryITCase#testMaterialization() 
(FLINK-30107) fails for the same reason; it can be reproduced with the same 
method.

[~roman] what do you think? If this suspicion is correct, I think it will be 
somewhat difficult to fix. Do you have any suggestions?

> EventTimeWindowCheckpointingITCase failed with restore
> ------------------------------------------------------
>
>                 Key: FLINK-28440
>                 URL: https://issues.apache.org/jira/browse/FLINK-28440
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing, Runtime / State Backends
>    Affects Versions: 1.16.0, 1.17.0
>            Reporter: Huang Xingbo
>            Assignee: Yanfei Lei
>            Priority: Critical
>              Labels: auto-deprioritized-critical, pull-request-available, 
> test-stability
>         Attachments: image-2023-02-01-00-51-54-506.png, 
> image-2023-02-01-01-10-01-521.png, image-2023-02-01-01-19-12-182.png, 
> image-2023-02-01-16-47-23-756.png, image-2023-02-01-16-57-43-889.png, 
> image-2023-02-02-10-52-56-599.png, image-2023-02-03-10-09-07-586.png
>
>
> {code:java}
> Caused by: java.lang.Exception: Exception while creating 
> StreamOperatorStateContext.
>       at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:256)
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268)
>       at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:722)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:698)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:665)
>       at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
>       at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
>       at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed 
> state backend for WindowOperator_0a448493b4782967b150582570326227_(2/4) from 
> any of the 1 provided restore options.
>       at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
>       at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:353)
>       at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:165)
>       ... 11 more
> Caused by: java.lang.RuntimeException: java.io.FileNotFoundException: 
> /tmp/junit1835099326935900400/junit1113650082510421526/52ee65b7-033f-4429-8ddd-adbe85e27ced
>  (No such file or directory)
>       at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
>       at 
> org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.advance(StateChangelogHandleStreamHandleReader.java:87)
>       at 
> org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.hasNext(StateChangelogHandleStreamHandleReader.java:69)
>       at 
> org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.readBackendHandle(ChangelogBackendRestoreOperation.java:96)
>       at 
> org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.restore(ChangelogBackendRestoreOperation.java:75)
>       at 
> org.apache.flink.state.changelog.ChangelogStateBackend.restore(ChangelogStateBackend.java:92)
>       at 
> org.apache.flink.state.changelog.AbstractChangelogStateBackend.createKeyedStateBackend(AbstractChangelogStateBackend.java:136)
>       at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:336)
>       at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
>       at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>       ... 13 more
> Caused by: java.io.FileNotFoundException: 
> /tmp/junit1835099326935900400/junit1113650082510421526/52ee65b7-033f-4429-8ddd-adbe85e27ced
>  (No such file or directory)
>       at java.io.FileInputStream.open0(Native Method)
>       at java.io.FileInputStream.open(FileInputStream.java:195)
>       at java.io.FileInputStream.<init>(FileInputStream.java:138)
>       at 
> org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50)
>       at 
> org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:134)
>       at 
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:87)
>       at 
> org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:72)
>       at 
> org.apache.flink.changelog.fs.StateChangeFormat.read(StateChangeFormat.java:92)
>       at 
> org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.advance(StateChangelogHandleStreamHandleReader.java:85)
>       ... 21 more
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=37772&view=logs&j=4d4a0d10-fca2-5507-8eed-c07f0bdf4887&t=7b25afdf-cc6c-566f-5459-359dc2585798&l=8916
> Other tests whose failures showed this stack trace are 
> {{ChangelogRecoveryITCase}} (FLINK-30107) and 
> {{ChangelogRecoverySwitchStateBackendITCase}} (FLINK-28898).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
