[
https://issues.apache.org/jira/browse/FLINK-30561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated FLINK-30561:
-----------------------------------
Labels: pull-request-available (was: )
> ChangelogStreamHandleReaderWithCache cause FileNotFoundException
> ----------------------------------------------------------------
>
> Key: FLINK-30561
> URL: https://issues.apache.org/jira/browse/FLINK-30561
> Project: Flink
> Issue Type: Bug
> Components: Runtime / State Backends
> Affects Versions: 1.16.0
> Reporter: Feifan Wang
> Priority: Major
> Labels: pull-request-available
>
> When a job with state changelog enabled keeps restarting, the following
> exception may occur:
> {code:java}
> java.lang.RuntimeException: java.io.FileNotFoundException: /data1/hadoop/yarn/nm-local-dir/usercache/hadoop-rt/appcache/application_1671689962742_1333392/dstl-cache-file/dstl6215344559415829831.tmp (No such file or directory)
>     at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
>     at org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.advance(StateChangelogHandleStreamHandleReader.java:87)
>     at org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.hasNext(StateChangelogHandleStreamHandleReader.java:69)
>     at org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.readBackendHandle(ChangelogBackendRestoreOperation.java:107)
>     at org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.restore(ChangelogBackendRestoreOperation.java:78)
>     at org.apache.flink.state.changelog.ChangelogStateBackend.restore(ChangelogStateBackend.java:94)
>     at org.apache.flink.state.changelog.AbstractChangelogStateBackend.createKeyedStateBackend(AbstractChangelogStateBackend.java:136)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:336)
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:353)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:165)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:265)
>     at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:726)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:702)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:669)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.FileNotFoundException: /data1/hadoop/yarn/nm-local-dir/usercache/hadoop-rt/appcache/application_1671689962742_1333392/dstl-cache-file/dstl6215344559415829831.tmp (No such file or directory)
>     at java.io.FileInputStream.open0(Native Method)
>     at java.io.FileInputStream.open(FileInputStream.java:195)
>     at java.io.FileInputStream.<init>(FileInputStream.java:138)
>     at org.apache.flink.changelog.fs.ChangelogStreamHandleReaderWithCache.openAndSeek(ChangelogStreamHandleReaderWithCache.java:158)
>     at org.apache.flink.changelog.fs.ChangelogStreamHandleReaderWithCache.openAndSeek(ChangelogStreamHandleReaderWithCache.java:95)
>     at org.apache.flink.changelog.fs.StateChangeIteratorImpl.read(StateChangeIteratorImpl.java:42)
>     at org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.advance(StateChangelogHandleStreamHandleReader.java:85)
>     ... 21 more
> {code}
> *Problem causes:*
> # *_ChangelogStreamHandleReaderWithCache_* uses RefCountedFile to manage the
> local cache file. The reference count is incremented when an input stream is
> opened from the cache file and decremented when that input stream is
> closed, so each input stream must be closed exactly once.
> # _*StateChangelogHandleStreamHandleReader#getChanges()*_ may close the
> input stream twice. If changeIterator.read(tuple2.f0, tuple2.f1) throws an
> exception (for example, when the task is canceled for other reasons during
> restore), the current state change iterator has already been closed in
> advance() and is then closed a second time in close().
> {code:java}
> private void advance() {
>     while (!current.hasNext() && handleIterator.hasNext()) {
>         try {
>             current.close();
>             Tuple2<StreamStateHandle, Long> tuple2 = handleIterator.next();
>             LOG.debug("read at {} from {}", tuple2.f1, tuple2.f0);
>             current = changeIterator.read(tuple2.f0, tuple2.f1);
>         } catch (Exception e) {
>             ExceptionUtils.rethrow(e);
>         }
>     }
> }
>
> @Override
> public void close() throws Exception {
>     current.close();
> }
> {code}
> So we should make sure the current state change iterator is closed only
> once. I suggest making the following changes to
> _*StateChangelogHandleStreamHandleReader*_:
> {code:java}
> private boolean currentClosed = false;
>
> private void advance() {
>     while (!current.hasNext() && handleIterator.hasNext()) {
>         try {
>             current.close();
>             currentClosed = true;
>             Tuple2<StreamStateHandle, Long> tuple2 = handleIterator.next();
>             LOG.debug("read at {} from {}", tuple2.f1, tuple2.f0);
>             current = changeIterator.read(tuple2.f0, tuple2.f1);
>             currentClosed = false;
>         } catch (Exception e) {
>             ExceptionUtils.rethrow(e);
>         }
>     }
> }
>
> @Override
> public void close() throws Exception {
>     if (!currentClosed) {
>         current.close();
>     }
> }
> {code}
>
> cc [~yuanmei] , [~roman] .
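
To illustrate why a second close is harmful with a reference-counted cache file, here is a minimal, self-contained sketch. It is not Flink code: CountedFile, UnguardedReader, GuardedReader and survivesDoubleClose are hypothetical names made up for this example, and the assumption that the cache itself holds one reference to the file is mine, not something stated in the issue. For brevity the sketch puts the close-once guard inside the reader, whereas the proposal above keeps it in the outer iterator.

```java
import java.io.Closeable;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.atomic.AtomicInteger;

public class DoubleCloseSketch {

    /** Hypothetical stand-in for a ref-counted cache file; NOT Flink's RefCountedFile. */
    static final class CountedFile {
        private final Path path;
        // Assumption: the cache holds one reference of its own.
        private final AtomicInteger refs = new AtomicInteger(1);

        CountedFile(Path path) {
            this.path = path;
        }

        void retain() {
            refs.incrementAndGet();
        }

        void release() throws IOException {
            // The file is deleted as soon as the count reaches zero.
            if (refs.decrementAndGet() == 0) {
                Files.deleteIfExists(path);
            }
        }

        boolean exists() {
            return Files.exists(path);
        }
    }

    /** Releases one reference on every close() call, so a double close over-releases. */
    static class UnguardedReader implements Closeable {
        private final CountedFile file;

        UnguardedReader(CountedFile file) {
            this.file = file;
            file.retain();
        }

        @Override
        public void close() throws IOException {
            file.release();
        }
    }

    /** Same reader, but close() only releases the first time it is called. */
    static final class GuardedReader extends UnguardedReader {
        private boolean closed;

        GuardedReader(CountedFile file) {
            super(file);
        }

        @Override
        public void close() throws IOException {
            if (!closed) {
                closed = true;
                super.close();
            }
        }
    }

    /** Closes a reader twice and reports whether the cached file is still on disk. */
    static boolean survivesDoubleClose(boolean guarded) throws IOException {
        Path tmp = Files.createTempFile("dstl-sketch", ".tmp");
        CountedFile file = new CountedFile(tmp);
        Closeable reader = guarded ? new GuardedReader(file) : new UnguardedReader(file);
        reader.close();
        reader.close(); // second close, as in the failure path described above
        boolean stillThere = file.exists();
        Files.deleteIfExists(tmp); // clean up the temp file in either case
        return stillThere;
    }

    public static void main(String[] args) throws IOException {
        System.out.println("unguarded: " + survivesDoubleClose(false)); // prints "unguarded: false"
        System.out.println("guarded: " + survivesDoubleClose(true));    // prints "guarded: true"
    }
}
```

In the unguarded case the second close drops the count to zero while the cache still expects the file to be there, which matches the symptom in the stack trace: a later open of the cached path fails with FileNotFoundException.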
--
This message was sent by Atlassian Jira
(v8.20.10#820010)