[
https://issues.apache.org/jira/browse/FLINK-28440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17682833#comment-17682833
]
Yanfei Lei commented on FLINK-28440:
------------------------------------
[~Feifan Wang] Thanks for the investigation. I don't think the materialization
in Figure 1 is quite accurate. Generally speaking, the materialization interval
is much longer than the checkpoint interval. Each materialization records all
previous state changes in a snapshot of the delegated state backend (such as
RocksDB) and cleans up the existing DSTL files. Without materialization, the
DSTL files would grow indefinitely.
As you mentioned, the subsequent checkpoint will contain the state changes of
the previous checkpoint:
{code:java}
chk-1: m0,c0,c1
chk-2: m0,c0,c1,c2,c3
{code}
Before chk-1 is subsumed, the TM calls `TaskChangelogRegistry.stopTracking()` to
transfer control of the DSTL files to the JM. In most cases, the JM decides when
to delete files. On the JM side, each
`ChangelogStateBackendHandle.registerSharedStates` call updates the
`SharedStateEntry.lastUsedCheckpointID` of the DSTL files. For example, when
chk-1 is subsumed, the lastUsedCheckpointID of *c0* and *c1* is
{*}chk-2{*}, so c0 and c1 would *not* be discarded.
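The JM-side bookkeeping described above can be sketched with a toy model. This is not Flink's actual shared state registry: the class name `ToySharedStateRegistry`, the method `unregisterUnusedState`, and the use of plain strings as file handles are assumptions made for illustration, and the materialized part m0 is left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Toy model of JM-side tracking of DSTL files (hypothetical, not Flink code). */
class ToySharedStateRegistry {
    // DSTL file name -> ID of the latest checkpoint that registered it
    private final Map<String, Long> lastUsedCheckpointId = new HashMap<>();

    /** Called once per checkpoint with every DSTL file that checkpoint references. */
    void registerSharedStates(long checkpointId, List<String> files) {
        for (String file : files) {
            // keep the highest checkpoint ID that has used this file
            lastUsedCheckpointId.merge(file, checkpointId, Math::max);
        }
    }

    /** Drops and returns files last used by a checkpoint older than the lowest retained one. */
    List<String> unregisterUnusedState(long lowestRetainedCheckpointId) {
        List<String> discarded = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastUsedCheckpointId.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() < lowestRetainedCheckpointId) {
                discarded.add(entry.getKey());
                it.remove();
            }
        }
        return discarded;
    }
}
```

In this model, registering chk-1 with c0,c1 and chk-2 with c0,c1,c2,c3 and then subsuming chk-1 discards nothing, because c0 and c1 were last used by chk-2.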
On the TM side, if `TaskChangelogRegistry.stopTracking()` has been called
before `TaskChangelogRegistry.notUsed()`, then `notUsed()` returns directly at
the first if statement:
{code:java}
public void notUsed(StreamStateHandle handle, UUID backendId) {
    PhysicalStateHandleID key = handle.getStreamStateHandleID();
    Set<UUID> backends = entries.get(key);
    if (backends == null) {
        // if stopTracking() was called before this, return directly; nothing is discarded
        return;
    }
    backends.remove(backendId);
    if (backends.isEmpty() && entries.remove(key) != null) {
        scheduleDiscard(handle);
    }
}
{code}
Your test does not take `TaskChangelogRegistry.stopTracking()` into account, so
it may be inaccurate.
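To show the effect a test would need to account for, here is a self-contained toy model of the TM-side registry. It is a sketch only: the class name `ToyTaskChangelogRegistry`, the `startTracking` helper, the `discarded` list, and the use of strings in place of `StreamStateHandle` are assumptions for illustration, and discarding is recorded in a list instead of being scheduled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

/** Toy model of TM-side DSTL file ownership (hypothetical, not Flink code). */
class ToyTaskChangelogRegistry {
    // file handle -> backends that still reference the file
    private final Map<String, Set<UUID>> entries = new HashMap<>();
    // files the TM decided to delete, in order
    final List<String> discarded = new ArrayList<>();

    /** The TM owns the file and tracks which backends use it. */
    void startTracking(String handle, Set<UUID> backendIds) {
        entries.put(handle, new HashSet<>(backendIds));
    }

    /** Ownership moves to the JM; the TM forgets the file. */
    void stopTracking(String handle) {
        entries.remove(handle);
    }

    /** Mirrors the notUsed() logic quoted above. */
    void notUsed(String handle, UUID backendId) {
        Set<UUID> backends = entries.get(handle);
        if (backends == null) {
            return; // no longer tracked by the TM, so the TM must not discard it
        }
        backends.remove(backendId);
        if (backends.isEmpty() && entries.remove(handle) != null) {
            discarded.add(handle);
        }
    }
}
```

In this model, a file for which `stopTracking()` ran before `notUsed()` never ends up in `discarded`, while a file that is still tracked does.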
> EventTimeWindowCheckpointingITCase failed with restore
> ------------------------------------------------------
>
> Key: FLINK-28440
> URL: https://issues.apache.org/jira/browse/FLINK-28440
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing, Runtime / State Backends
> Affects Versions: 1.16.0, 1.17.0
> Reporter: Huang Xingbo
> Assignee: Yanfei Lei
> Priority: Critical
> Labels: auto-deprioritized-critical, pull-request-available,
> test-stability
> Attachments: image-2023-02-01-00-51-54-506.png,
> image-2023-02-01-01-10-01-521.png, image-2023-02-01-01-19-12-182.png
>
>
> {code:java}
> Caused by: java.lang.Exception: Exception while creating
> StreamOperatorStateContext.
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:256)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268)
> at
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:722)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:698)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:665)
> at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
> at
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed
> state backend for WindowOperator_0a448493b4782967b150582570326227_(2/4) from
> any of the 1 provided restore options.
> at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:353)
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:165)
> ... 11 more
> Caused by: java.lang.RuntimeException: java.io.FileNotFoundException:
> /tmp/junit1835099326935900400/junit1113650082510421526/52ee65b7-033f-4429-8ddd-adbe85e27ced
> (No such file or directory)
> at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
> at
> org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.advance(StateChangelogHandleStreamHandleReader.java:87)
> at
> org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.hasNext(StateChangelogHandleStreamHandleReader.java:69)
> at
> org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.readBackendHandle(ChangelogBackendRestoreOperation.java:96)
> at
> org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.restore(ChangelogBackendRestoreOperation.java:75)
> at
> org.apache.flink.state.changelog.ChangelogStateBackend.restore(ChangelogStateBackend.java:92)
> at
> org.apache.flink.state.changelog.AbstractChangelogStateBackend.createKeyedStateBackend(AbstractChangelogStateBackend.java:136)
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:336)
> at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
> at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
> ... 13 more
> Caused by: java.io.FileNotFoundException:
> /tmp/junit1835099326935900400/junit1113650082510421526/52ee65b7-033f-4429-8ddd-adbe85e27ced
> (No such file or directory)
> at java.io.FileInputStream.open0(Native Method)
> at java.io.FileInputStream.open(FileInputStream.java:195)
> at java.io.FileInputStream.<init>(FileInputStream.java:138)
> at
> org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50)
> at
> org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:134)
> at
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:87)
> at
> org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:72)
> at
> org.apache.flink.changelog.fs.StateChangeFormat.read(StateChangeFormat.java:92)
> at
> org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.advance(StateChangelogHandleStreamHandleReader.java:85)
> ... 21 more
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=37772&view=logs&j=4d4a0d10-fca2-5507-8eed-c07f0bdf4887&t=7b25afdf-cc6c-566f-5459-359dc2585798&l=8916
> Other tests in which this stack trace was observed are
> {{ChangelogRecoveryITCase}} (FLINK-30107) and
> {{ChangelogRecoverySwitchStateBackendITCase}} (FLINK-28898).