[
https://issues.apache.org/jira/browse/FLINK-28440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17682677#comment-17682677
]
Feifan Wang commented on FLINK-28440:
-------------------------------------
Hi [~Yanfei Lei], [~ym], [~roman], we also encountered the changelog
FileNotFoundException during restore. After some investigation, I found two
causes for this problem. Both involve pre-emptive upload and
BatchingStateChangeUploadScheduler:
h3. *First case :*
Pre-emptive upload causes the stale StateChangeSet to be uploaded into the same
file along with the latest StateChangeSet.
!image-2023-02-01-00-51-54-506.png|width=617,height=342!
The small square with a number in the middle represents a StateChange, and the
number in it indicates the SequenceNumber to which the StateChange belongs.
StateChange-3 triggers a pre-emptive upload, and then StateChange-1 and
StateChange-3 are uploaded to the same file.
{code:java}
if (activeChangeSetSize >= preEmptivePersistThresholdInBytes) {
    LOG.info(
            "pre-emptively flush {}MB of appended changes to the common store",
            activeChangeSetSize / 1024 / 1024);
    persistInternal(notUploaded.isEmpty() ? activeSequenceNumber : notUploaded.firstKey());
}
{code}
When checkpoint-3 completes, checkpoint-2 is subsumed. StateChange-1 is then
truncated, which in turn causes file-3 to be deleted. Restoring from
checkpoint-3 then fails with a FileNotFoundException.
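The sequence above can be reproduced with a toy model. This is only a sketch and not Flink code: the class SharedFileTruncationDemo, its methods, and the string file names are hypothetical and merely mimic the bookkeeping described here (one file holding the change sets of SequenceNumber 1 and 3, followed by a truncation that covers only SequenceNumber 1).

```java
import java.util.HashSet;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;

/** Toy model of the first case. Hypothetical names, not the Flink implementation. */
class SharedFileTruncationDemo {
    /** Files currently present in the simulated changelog store. */
    final Set<String> store = new HashSet<>();
    /** File that holds the change set of each sequence number. */
    final NavigableMap<Integer, String> fileBySqn = new TreeMap<>();

    /** Puts the change sets of all given sequence numbers into one file. */
    void uploadTogether(String file, int... sqns) {
        store.add(file);
        for (int sqn : sqns) {
            fileBySqn.put(sqn, file);
        }
    }

    /**
     * Naive truncation: deletes the file of every change set below {@code to},
     * without checking whether a newer change set lives in the same file.
     */
    void truncate(int to) {
        NavigableMap<Integer, String> stale = fileBySqn.headMap(to, false);
        for (String file : stale.values()) {
            store.remove(file);
        }
        stale.clear();
    }

    /** A restore succeeds only if the file of the change set still exists. */
    boolean canRestore(int sqn) {
        String file = fileBySqn.get(sqn);
        return file != null && store.contains(file);
    }

    public static void main(String[] args) {
        SharedFileTruncationDemo demo = new SharedFileTruncationDemo();
        demo.uploadTogether("file-3", 1, 3); // stale SQN 1 shares a file with SQN 3
        demo.truncate(2);                    // only SQN 1 is truncated
        System.out.println(demo.canRestore(3)); // prints "false"
    }
}
```

In this model the truncation of SequenceNumber 1 removes file-3 even though SequenceNumber 3 still points at it, which corresponds to the FileNotFoundException seen on restore.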
h3. *Second case :*
!image-2023-02-01-01-10-01-521.png|width=484,height=391!
StateChange-0, 1 and 2 are uploaded in the same file due to
dstl.dfs.batch.persist-delay and pre-emptive upload. Checkpoint-1 is subsumed
after checkpoint-2 completes, and file-1 is then deleted. But checkpoint-2
still needs file-1 (it needs StateChange-2).
h3. *Summary :*
The root of the problem is FsStateChangelogWriter#notifyStateNotUsed.
!image-2023-02-01-01-19-12-182.png|width=759,height=312!
I suggest recording the largest SequenceNumber in each StreamStateHandle in the
FsStateChangelogWriter and checking it before calling
changelogRegistry.notUsed().
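A minimal sketch of that check, under stated assumptions: MaxSqnGuardSketch and its methods are hypothetical, handles are modelled as plain strings, sequence numbers as ints, and the releasedHandles list stands in for the call to changelogRegistry.notUsed(). The real signature of FsStateChangelogWriter#notifyStateNotUsed is not reproduced here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Sketch of the proposed guard. Hypothetical names, not the Flink implementation. */
class MaxSqnGuardSketch {
    /** Largest sequence number whose change set was written into each handle. */
    final Map<String, Integer> largestSqnByHandle = new HashMap<>();
    /** Handles that would have been passed to changelogRegistry.notUsed(). */
    final List<String> releasedHandles = new ArrayList<>();

    /** Remembers that the change set of {@code sqn} was uploaded into {@code handle}. */
    void recordUpload(String handle, int sqn) {
        largestSqnByHandle.merge(handle, sqn, Integer::max);
    }

    /**
     * Called when change sets below {@code truncateTo} (exclusive) are no longer
     * needed. The handle is released only if every change set it contains is
     * below that bound.
     */
    void notifyStateNotUsed(String handle, int truncateTo) {
        Integer largest = largestSqnByHandle.get(handle);
        if (largest != null && largest < truncateTo) {
            largestSqnByHandle.remove(handle);
            releasedHandles.add(handle); // stand-in for changelogRegistry.notUsed(...)
        }
    }

    public static void main(String[] args) {
        MaxSqnGuardSketch writer = new MaxSqnGuardSketch();
        writer.recordUpload("file-3", 1);
        writer.recordUpload("file-3", 3);
        writer.notifyStateNotUsed("file-3", 2); // SQN 3 still needed: file is kept
        System.out.println(writer.releasedHandles.isEmpty()); // prints "true"
    }
}
```

With this guard, truncating SequenceNumber 1 leaves a file that also contains SequenceNumber 3 untouched; the file is released only once the truncation bound has passed its largest SequenceNumber.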
WDYT [~ym], [~roman], [~Yanfei Lei]?
> EventTimeWindowCheckpointingITCase failed with restore
> ------------------------------------------------------
>
> Key: FLINK-28440
> URL: https://issues.apache.org/jira/browse/FLINK-28440
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing, Runtime / State Backends
> Affects Versions: 1.16.0, 1.17.0
> Reporter: Huang Xingbo
> Assignee: Yanfei Lei
> Priority: Critical
> Labels: auto-deprioritized-critical, test-stability
> Attachments: image-2023-02-01-00-51-54-506.png,
> image-2023-02-01-01-10-01-521.png, image-2023-02-01-01-19-12-182.png
>
>
> {code:java}
> Caused by: java.lang.Exception: Exception while creating
> StreamOperatorStateContext.
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:256)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268)
> at
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:722)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:698)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:665)
> at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
> at
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed
> state backend for WindowOperator_0a448493b4782967b150582570326227_(2/4) from
> any of the 1 provided restore options.
> at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:353)
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:165)
> ... 11 more
> Caused by: java.lang.RuntimeException: java.io.FileNotFoundException:
> /tmp/junit1835099326935900400/junit1113650082510421526/52ee65b7-033f-4429-8ddd-adbe85e27ced
> (No such file or directory)
> at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
> at
> org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.advance(StateChangelogHandleStreamHandleReader.java:87)
> at
> org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.hasNext(StateChangelogHandleStreamHandleReader.java:69)
> at
> org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.readBackendHandle(ChangelogBackendRestoreOperation.java:96)
> at
> org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.restore(ChangelogBackendRestoreOperation.java:75)
> at
> org.apache.flink.state.changelog.ChangelogStateBackend.restore(ChangelogStateBackend.java:92)
> at
> org.apache.flink.state.changelog.AbstractChangelogStateBackend.createKeyedStateBackend(AbstractChangelogStateBackend.java:136)
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:336)
> at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
> at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
> ... 13 more
> Caused by: java.io.FileNotFoundException:
> /tmp/junit1835099326935900400/junit1113650082510421526/52ee65b7-033f-4429-8ddd-adbe85e27ced
> (No such file or directory)
> at java.io.FileInputStream.open0(Native Method)
> at java.io.FileInputStream.open(FileInputStream.java:195)
> at java.io.FileInputStream.<init>(FileInputStream.java:138)
> at
> org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50)
> at
> org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:134)
> at
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:87)
> at
> org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:72)
> at
> org.apache.flink.changelog.fs.StateChangeFormat.read(StateChangeFormat.java:92)
> at
> org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.advance(StateChangelogHandleStreamHandleReader.java:85)
> ... 21 more
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=37772&view=logs&j=4d4a0d10-fca2-5507-8eed-c07f0bdf4887&t=7b25afdf-cc6c-566f-5459-359dc2585798&l=8916
> Other tests where this stack trace was observed in test failures are
> {{ChangelogRecoveryITCase}} (FLINK-30107) and
> {{ChangelogRecoverySwitchStateBackendITCase}} (FLINK-28898).