[ https://issues.apache.org/jira/browse/FLINK-30561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17655240#comment-17655240 ]

Yanfei Lei commented on FLINK-30561:
------------------------------------

Hi [~Feifan Wang], could you please share under what circumstances the error occurs?

> This happens when changeIterator.read(tuple2.f0, tuple2.f1) throws an 
> exception (for example, when the task is canceled for other reasons during 
> the restore process) 

IIUC, when the task is canceled for other reasons, the whole job is canceled. On 
the next restart, the `FileNotFoundException` will not affect the new run, and 
the refCount would be reset. Will the previous refCount still be used after the 
job is canceled?

> ChangelogStreamHandleReaderWithCache cause FileNotFoundException
> ----------------------------------------------------------------
>
>                 Key: FLINK-30561
>                 URL: https://issues.apache.org/jira/browse/FLINK-30561
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>    Affects Versions: 1.16.0
>            Reporter: Feifan Wang
>            Priority: Major
>
> When a job with state changelog enabled continues to restart, the following 
> exceptions may occur :
> {code:java}
> java.lang.RuntimeException: java.io.FileNotFoundException: 
> /data1/hadoop/yarn/nm-local-dir/usercache/hadoop-rt/appcache/application_1671689962742_1333392/dstl-cache-file/dstl6215344559415829831.tmp
>  (No such file or directory)
>     at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
>     at 
> org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.advance(StateChangelogHandleStreamHandleReader.java:87)
>     at 
> org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.hasNext(StateChangelogHandleStreamHandleReader.java:69)
>     at 
> org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.readBackendHandle(ChangelogBackendRestoreOperation.java:107)
>     at 
> org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.restore(ChangelogBackendRestoreOperation.java:78)
>     at 
> org.apache.flink.state.changelog.ChangelogStateBackend.restore(ChangelogStateBackend.java:94)
>     at 
> org.apache.flink.state.changelog.AbstractChangelogStateBackend.createKeyedStateBackend(AbstractChangelogStateBackend.java:136)
>     at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:336)
>     at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
>     at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>     at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:353)
>     at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:165)
>     at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:265)
>     at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:726)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:702)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:669)
>     at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
>     at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.FileNotFoundException: 
> /data1/hadoop/yarn/nm-local-dir/usercache/hadoop-rt/appcache/application_1671689962742_1333392/dstl-cache-file/dstl6215344559415829831.tmp
>  (No such file or directory)
>     at java.io.FileInputStream.open0(Native Method)
>     at java.io.FileInputStream.open(FileInputStream.java:195)
>     at java.io.FileInputStream.<init>(FileInputStream.java:138)
>     at 
> org.apache.flink.changelog.fs.ChangelogStreamHandleReaderWithCache.openAndSeek(ChangelogStreamHandleReaderWithCache.java:158)
>     at 
> org.apache.flink.changelog.fs.ChangelogStreamHandleReaderWithCache.openAndSeek(ChangelogStreamHandleReaderWithCache.java:95)
>     at 
> org.apache.flink.changelog.fs.StateChangeIteratorImpl.read(StateChangeIteratorImpl.java:42)
>     at 
> org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.advance(StateChangelogHandleStreamHandleReader.java:85)
>     ... 21 more {code}
> *Problem causes:*
>  # *_ChangelogStreamHandleReaderWithCache_* uses RefCountedFile to manage the 
> local cache file. The reference count is incremented when an input stream is 
> opened from the cache file, and decremented when the input stream is closed. 
> So each input stream must be closed exactly once.
>  # _*StateChangelogHandleStreamHandleReader#getChanges()*_ may close the input 
> stream twice. When changeIterator.read(tuple2.f0, tuple2.f1) throws an 
> exception (for example, when the task is canceled for other reasons during 
> the restore process), the current state change iterator is closed twice.
> {code:java}
> private void advance() {
>     while (!current.hasNext() && handleIterator.hasNext()) {
>         try {
>             current.close();
>             Tuple2<StreamStateHandle, Long> tuple2 = handleIterator.next();
>             LOG.debug("read at {} from {}", tuple2.f1, tuple2.f0);
>             current = changeIterator.read(tuple2.f0, tuple2.f1);
>         } catch (Exception e) {
>             ExceptionUtils.rethrow(e);
>         }
>     }
> }
> @Override
> public void close() throws Exception {
>     current.close();
> }{code}
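The effect of such a double close can be reproduced in isolation. The sketch below uses a hypothetical `RefCountedResource` as a stand-in for Flink's RefCountedFile (not the actual API): the backing file is deleted when the count reaches zero, so releasing the same stream twice deletes the cache file while other readers may still need it.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a ref-counted cache file: the file is
// deleted as soon as the reference count drops to zero.
class RefCountedResource {
    private final AtomicInteger refCount = new AtomicInteger(1); // owner's reference
    private boolean deleted = false;

    void retain() { refCount.incrementAndGet(); }

    // Each close of a stream releases once; a double close over-decrements.
    void release() {
        if (refCount.decrementAndGet() == 0) {
            deleted = true; // the cache file would be removed here
        }
    }

    boolean isDeleted() { return deleted; }
}

public class DoubleCloseDemo {
    public static void main(String[] args) {
        RefCountedResource cacheFile = new RefCountedResource();
        cacheFile.retain();   // a reader opens a stream        -> count = 2
        cacheFile.release();  // stream closed in advance()     -> count = 1
        cacheFile.release();  // same stream closed in close()  -> count = 0, file deleted
        // The owner still holds a reference, yet the file is already gone:
        System.out.println(cacheFile.isDeleted()); // prints "true"
    }
}
```

The next attempt to open a stream from the deleted cache file is what surfaces as the `FileNotFoundException` above.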
> So we should make sure the current state change iterator is closed only once. 
> I suggest making the following changes to 
> _StateChangelogHandleStreamHandleReader_ :
> {code:java}
> private boolean currentClosed = false;
> private void advance() {
>     while (!current.hasNext() && handleIterator.hasNext()) {
>         try {
>             current.close();
>             currentClosed = true;
>             Tuple2<StreamStateHandle, Long> tuple2 = handleIterator.next();
>             LOG.debug("read at {} from {}", tuple2.f1, tuple2.f0);
>             current = changeIterator.read(tuple2.f0, tuple2.f1);
>             currentClosed = false;
>         } catch (Exception e) {
>             ExceptionUtils.rethrow(e);
>         }
>     }
> }
> @Override
> public void close() throws Exception {
>     if (!currentClosed) {
>         current.close();
>     }
> }{code}
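The guard can be exercised in a self-contained way. The sketch below uses hypothetical `CountingCloseable` and `GuardedReader` classes (not the actual Flink types) to show that with the `currentClosed` flag, a failure in advance() followed by the cleanup close() still results in exactly one close call:

```java
// Hypothetical resource that records how many times close() was called.
class CountingCloseable implements AutoCloseable {
    int closeCalls = 0;

    @Override
    public void close() { closeCalls++; }
}

// Mirrors the proposed fix: remember that `current` is already closed.
class GuardedReader implements AutoCloseable {
    final CountingCloseable current = new CountingCloseable();
    private boolean currentClosed = false;

    // Mirrors advance(): the current iterator is closed before the next
    // handle is read; if the read then throws, currentClosed stays true.
    void advanceAndFail() throws Exception {
        current.close();
        currentClosed = true;
        throw new Exception("simulated read failure during restore");
    }

    @Override
    public void close() {
        if (!currentClosed) {
            current.close();
        }
    }
}

public class GuardedCloseDemo {
    public static void main(String[] args) {
        GuardedReader reader = new GuardedReader();
        try {
            reader.advanceAndFail();
        } catch (Exception e) {
            // the restore path rethrows; cleanup then calls close()
        } finally {
            reader.close();
        }
        System.out.println(reader.current.closeCalls); // prints "1", not 2
    }
}
```

Without the flag, the finally-style cleanup would close `current` a second time and release the cache file's reference count once too often.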
>  
> cc [~yuanmei] , [~roman] .



--
This message was sent by Atlassian Jira
(v8.20.10#820010)