[ 
https://issues.apache.org/jira/browse/FLINK-28440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17683655#comment-17683655
 ] 

Feifan Wang commented on FLINK-28440:
-------------------------------------

Hi [~ym], [~roman], I discussed this offline with [~Yanfei Lei]. We think 
there are two situations that cause the changelog-file-not-found problem.
h2. Case-1 :

The first is the "first case" I mentioned above:

!image-2023-02-03-12-03-16-155.png|width=708,height=315!

 
 # checkpoint-1 is triggered and completed, upTo sqn=1.
 # materialization-1 is triggered, upTo sqn=2. ChangeSet[sqn=1] is added to 
{*}_notUploaded_{*}.
 # materialization-1 completes, upTo sqn=3.
 # checkpoint-2 is triggered and completed, upTo sqn=3. Since materialization-1 
had already completed when checkpoint-2 was triggered, checkpoint-2 starts from 
sqn=2. ChangeSet[sqn=1] stays in notUploaded.
 # checkpoint-3 is triggered and completed, upTo sqn=4.
 # Before chk-2 is subsumed, ChangeSet[sqn=4] triggers a pre-emptive upload; 
ChangeSet[sqn=1] and ChangeSet[sqn=4] are saved in the same file.
 # checkpoint-2 is subsumed, and FsStateChangelogWriter tries to truncate to 
sqn=2. ChangeSet[sqn=1] is deleted, which also deletes ChangeSet[sqn=4] because 
it is in the same file.
 # checkpoint-4 is triggered and completed. It contains changelog 
[sqn=2 ~ sqn=4], but ChangeSet[sqn=4] has already been deleted.
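
The sequence above can be reproduced with a toy model. This is only a sketch to illustrate the failure mode, not Flink code: the class name, its methods, and the file names "file-A" and "file-B" are all made up, and it assumes that sqn=2 and sqn=3 were uploaded into a separate file for the earlier checkpoints.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Toy model of Case-1; all names here are hypothetical, not Flink classes. */
class SharedFileTruncationSketch {

    /** Maps each change set's sqn to the file it was uploaded into. */
    private final Map<Integer, String> fileBySqn = new HashMap<>();

    /** Files that still exist on the (simulated) file system. */
    private final Set<String> existingFiles = new HashSet<>();

    /** Uploads several change sets into one file, as a batched upload does. */
    void upload(String file, int... sqns) {
        existingFiles.add(file);
        for (int sqn : sqns) {
            fileBySqn.put(sqn, file);
        }
    }

    /** Naive truncation: deletes the whole file of every change set below 'to'. */
    void truncate(int to) {
        for (Map.Entry<Integer, String> entry : fileBySqn.entrySet()) {
            if (entry.getKey() < to) {
                existingFiles.remove(entry.getValue());
            }
        }
    }

    /** Returns the sqns in [from, to] whose backing file no longer exists. */
    List<Integer> missing(int from, int to) {
        List<Integer> result = new ArrayList<>();
        for (int sqn = from; sqn <= to; sqn++) {
            String file = fileBySqn.get(sqn);
            if (file == null || !existingFiles.contains(file)) {
                result.add(sqn);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        SharedFileTruncationSketch writer = new SharedFileTruncationSketch();
        writer.upload("file-A", 2, 3); // assumed: uploaded for the earlier checkpoints
        writer.upload("file-B", 1, 4); // pre-emptive upload batches sqn=1 with sqn=4
        writer.truncate(2);            // checkpoint-2 subsumed, truncate to sqn=2
        // checkpoint-4 needs [sqn=2 ~ sqn=4], but sqn=4 went away with file-B
        System.out.println("missing for checkpoint-4: " + writer.missing(2, 4));
    }
}
```

In this model the truncation only intends to drop sqn=1, yet sqn=4 is reported as missing because deletion works at file granularity.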

h2. Case-2 :

Case-2 was provided by Yanfei:

!image-2023-02-03-12-03-56-614.png|width=632,height=308!
 # checkpoint-1 is triggered, upTo sqn=1, but BatchingStateChangeUploadScheduler 
is still waiting for the persist-delay.
 # ChangeSet[sqn=1] triggers a pre-emptive upload; ChangeSet[sqn=0] and 
ChangeSet[sqn=1] are saved in the same file, and checkpoint-1 completes.
 # The task is canceled for other reasons before checkpoint-1 is confirmed 
(the JobManager has completed checkpoint-1, but the confirmation message has 
not yet reached the task). FsStateChangelogWriter tries to truncateAndClose 
from sqn=1. ChangeSet[sqn=1] is deleted, which also deletes ChangeSet[sqn=0] 
because it is in the same file.

h2. How to reproduce :

FsStateChangelogWriterTest#testChangelogFileAvailable and 
FsStateChangelogWriterTest#testChangelogFileAvailableAgain [in this 
PR|https://github.com/apache/flink/pull/21812] correspond to case-1 and case-2 
above respectively.
h2. Problem analysis :

As I mentioned above, I think the source of the problem is 
FsStateChangelogWriter#notifyStateNotUsed. A StreamStateHandle may correspond to 
multiple UploadResults, so we cannot assume that the entire StreamStateHandle is 
no longer needed by this backend just because one of its UploadResults is no 
longer used.

!image-2023-02-01-01-19-12-182.png|width=652,height=268!
h2. Proposal :

I propose recording the reference count of each StreamStateHandle in the 
TaskChangelogRegistry instead of the backend collection. 
TaskChangelogRegistry#notUsed(streamStateHandle, uploadResult) would only 
decrement the reference count by one, and delete the streamStateHandle when the 
reference count reaches zero.
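
A minimal sketch of the reference-counting idea, to make the proposal concrete. It is not the actual TaskChangelogRegistry: the class and method names are hypothetical, handles are represented by plain strings, and the "deletion" is only recorded in a list rather than discarding a file.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Sketch of per-handle reference counting; names are hypothetical stand-ins. */
class RefCountingRegistrySketch {

    /** Number of upload results still referencing each handle (keyed by file name). */
    private final Map<String, Integer> refCounts = new HashMap<>();

    /** Handles that were actually deleted, recorded for inspection. */
    private final List<String> deleted = new ArrayList<>();

    /** Called once per upload result stored in the given handle. */
    synchronized void startTracking(String handle) {
        refCounts.merge(handle, 1, Integer::sum);
    }

    /** Releases one reference; deletes the handle only when none remain. */
    synchronized void notUsed(String handle) {
        Integer count = refCounts.get(handle);
        if (count == null) {
            return; // unknown or already deleted: nothing to do
        }
        if (count == 1) {
            refCounts.remove(handle);
            deleted.add(handle); // a real implementation would discard the file here
        } else {
            refCounts.put(handle, count - 1);
        }
    }

    /** Returns a copy of the handles deleted so far. */
    synchronized List<String> deletedHandles() {
        return new ArrayList<>(deleted);
    }

    public static void main(String[] args) {
        RefCountingRegistrySketch registry = new RefCountingRegistrySketch();
        registry.startTracking("file-B"); // reference held by ChangeSet[sqn=1]
        registry.startTracking("file-B"); // reference held by ChangeSet[sqn=4]
        registry.notUsed("file-B");       // truncation releases sqn=1 only
        System.out.println(registry.deletedHandles()); // prints []
        registry.notUsed("file-B");       // sqn=4 is released later
        System.out.println(registry.deletedHandles()); // prints [file-B]
    }
}
```

With this counting, the truncation in Case-1 would release only the reference held by ChangeSet[sqn=1], and the shared file would survive until ChangeSet[sqn=4] is released as well.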

What do you think, [~ym], [~roman]?

> EventTimeWindowCheckpointingITCase failed with restore
> ------------------------------------------------------
>
>                 Key: FLINK-28440
>                 URL: https://issues.apache.org/jira/browse/FLINK-28440
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing, Runtime / State Backends
>    Affects Versions: 1.16.0, 1.17.0
>            Reporter: Huang Xingbo
>            Assignee: Yanfei Lei
>            Priority: Critical
>              Labels: auto-deprioritized-critical, pull-request-available, 
> test-stability
>         Attachments: image-2023-02-01-00-51-54-506.png, 
> image-2023-02-01-01-10-01-521.png, image-2023-02-01-01-19-12-182.png, 
> image-2023-02-01-16-47-23-756.png, image-2023-02-01-16-57-43-889.png, 
> image-2023-02-02-10-52-56-599.png, image-2023-02-03-10-09-07-586.png, 
> image-2023-02-03-12-03-16-155.png, image-2023-02-03-12-03-56-614.png
>
>
> {code:java}
> Caused by: java.lang.Exception: Exception while creating 
> StreamOperatorStateContext.
>       at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:256)
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268)
>       at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:722)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:698)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:665)
>       at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
>       at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
>       at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed 
> state backend for WindowOperator_0a448493b4782967b150582570326227_(2/4) from 
> any of the 1 provided restore options.
>       at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
>       at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:353)
>       at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:165)
>       ... 11 more
> Caused by: java.lang.RuntimeException: java.io.FileNotFoundException: 
> /tmp/junit1835099326935900400/junit1113650082510421526/52ee65b7-033f-4429-8ddd-adbe85e27ced
>  (No such file or directory)
>       at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
>       at 
> org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.advance(StateChangelogHandleStreamHandleReader.java:87)
>       at 
> org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.hasNext(StateChangelogHandleStreamHandleReader.java:69)
>       at 
> org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.readBackendHandle(ChangelogBackendRestoreOperation.java:96)
>       at 
> org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.restore(ChangelogBackendRestoreOperation.java:75)
>       at 
> org.apache.flink.state.changelog.ChangelogStateBackend.restore(ChangelogStateBackend.java:92)
>       at 
> org.apache.flink.state.changelog.AbstractChangelogStateBackend.createKeyedStateBackend(AbstractChangelogStateBackend.java:136)
>       at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:336)
>       at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
>       at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>       ... 13 more
> Caused by: java.io.FileNotFoundException: 
> /tmp/junit1835099326935900400/junit1113650082510421526/52ee65b7-033f-4429-8ddd-adbe85e27ced
>  (No such file or directory)
>       at java.io.FileInputStream.open0(Native Method)
>       at java.io.FileInputStream.open(FileInputStream.java:195)
>       at java.io.FileInputStream.<init>(FileInputStream.java:138)
>       at 
> org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50)
>       at 
> org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:134)
>       at 
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:87)
>       at 
> org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:72)
>       at 
> org.apache.flink.changelog.fs.StateChangeFormat.read(StateChangeFormat.java:92)
>       at 
> org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.advance(StateChangelogHandleStreamHandleReader.java:85)
>       ... 21 more
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=37772&view=logs&j=4d4a0d10-fca2-5507-8eed-c07f0bdf4887&t=7b25afdf-cc6c-566f-5459-359dc2585798&l=8916
> Other tests where this stacktrace was observed in test failures are 
> {{ChangelogRecoveryITCase}} (FLINK-30107) and 
> {{ChangelogRecoverySwitchStateBackendITCase}} (FLINK-28898).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
