Feifan Wang created FLINK-30561:
-----------------------------------
Summary: ChangelogStreamHandleReaderWithCache causes FileNotFoundException
Key: FLINK-30561
URL: https://issues.apache.org/jira/browse/FLINK-30561
Project: Flink
Issue Type: Bug
Components: Runtime / State Backends
Affects Versions: 1.16.0
Reporter: Feifan Wang
When a job with state changelog enabled restarts repeatedly, the following
exception may occur:
{code:java}
java.lang.RuntimeException: java.io.FileNotFoundException: /data1/hadoop/yarn/nm-local-dir/usercache/hadoop-rt/appcache/application_1671689962742_1333392/dstl-cache-file/dstl6215344559415829831.tmp (No such file or directory)
    at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
    at org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.advance(StateChangelogHandleStreamHandleReader.java:87)
    at org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.hasNext(StateChangelogHandleStreamHandleReader.java:69)
    at org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.readBackendHandle(ChangelogBackendRestoreOperation.java:107)
    at org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.restore(ChangelogBackendRestoreOperation.java:78)
    at org.apache.flink.state.changelog.ChangelogStateBackend.restore(ChangelogStateBackend.java:94)
    at org.apache.flink.state.changelog.AbstractChangelogStateBackend.createKeyedStateBackend(AbstractChangelogStateBackend.java:136)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:336)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:353)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:165)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:265)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:726)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:702)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:669)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: /data1/hadoop/yarn/nm-local-dir/usercache/hadoop-rt/appcache/application_1671689962742_1333392/dstl-cache-file/dstl6215344559415829831.tmp (No such file or directory)
    at java.io.FileInputStream.open0(Native Method)
    at java.io.FileInputStream.open(FileInputStream.java:195)
    at java.io.FileInputStream.<init>(FileInputStream.java:138)
    at org.apache.flink.changelog.fs.ChangelogStreamHandleReaderWithCache.openAndSeek(ChangelogStreamHandleReaderWithCache.java:158)
    at org.apache.flink.changelog.fs.ChangelogStreamHandleReaderWithCache.openAndSeek(ChangelogStreamHandleReaderWithCache.java:95)
    at org.apache.flink.changelog.fs.StateChangeIteratorImpl.read(StateChangeIteratorImpl.java:42)
    at org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.advance(StateChangelogHandleStreamHandleReader.java:85)
    ... 21 more {code}
*Problem causes:*
# *_ChangelogStreamHandleReaderWithCache_* uses _RefCountedFile_ to manage the local cache file. The reference count is incremented when an input stream is opened from the cache file and decremented when that input stream is closed, so each input stream must be closed exactly once.
# _*StateChangelogHandleStreamHandleReader#getChanges()*_ may close the same input stream twice. When changeIterator.read(tuple2.f0, tuple2.f1) throws an exception (for example, when the task is canceled for other reasons during the restore process), advance() has already closed the current state change iterator, and close() then closes it a second time; see the sketch after the code block below for the consequence.
{code:java}
private void advance() {
    while (!current.hasNext() && handleIterator.hasNext()) {
        try {
            // first close of 'current'
            current.close();
            Tuple2<StreamStateHandle, Long> tuple2 = handleIterator.next();
            LOG.debug("read at {} from {}", tuple2.f1, tuple2.f0);
            // if read() throws here, 'current' still points at the closed iterator
            current = changeIterator.read(tuple2.f0, tuple2.f1);
        } catch (Exception e) {
            ExceptionUtils.rethrow(e);
        }
    }
}

@Override
public void close() throws Exception {
    // second close of the already-closed iterator after a failed read()
    current.close();
}{code}
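Why the double close is fatal: the second close decrements the cache file's reference count a second time, so the count can reach zero and delete the file while another stream still needs it. Here is a minimal, self-contained sketch of that mechanism (the class _RefCountedCacheFile_ and its methods are hypothetical stand-ins for illustration, not the actual Flink _RefCountedFile_ API):
{code:java}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the ref-counted local cache file: one reference
// per open input stream; the file is deleted once the count drops to zero.
public class RefCountedCacheFile {
    private final Path path;
    private final AtomicInteger refCount = new AtomicInteger(0);

    RefCountedCacheFile(Path path) {
        this.path = path;
    }

    // called when an input stream is opened from the cache file
    void retain() {
        refCount.incrementAndGet();
    }

    // called when an input stream is closed
    void release() throws IOException {
        if (refCount.decrementAndGet() == 0) {
            Files.deleteIfExists(path); // the cache file is gone for everyone
        }
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("dstl", ".tmp");
        RefCountedCacheFile cache = new RefCountedCacheFile(tmp);

        cache.retain();  // stream A opened
        cache.retain();  // stream B opened
        cache.release(); // stream A closed once
        cache.release(); // stream A closed AGAIN (the bug): count reaches zero
        // stream B now points at a deleted file -> FileNotFoundException
        System.out.println("cache file exists: " + Files.exists(tmp)); // false
    }
}
{code}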
So we should make sure the current state change iterator is closed exactly once. I suggest making the following changes to _StateChangelogHandleStreamHandleReader_:
{code:java}
private boolean currentClosed = false;

private void advance() {
    while (!current.hasNext() && handleIterator.hasNext()) {
        try {
            current.close();
            currentClosed = true;
            Tuple2<StreamStateHandle, Long> tuple2 = handleIterator.next();
            LOG.debug("read at {} from {}", tuple2.f1, tuple2.f0);
            current = changeIterator.read(tuple2.f0, tuple2.f1);
            currentClosed = false;
        } catch (Exception e) {
            ExceptionUtils.rethrow(e);
        }
    }
}

@Override
public void close() throws Exception {
    // skip the close if advance() already closed 'current'
    if (!currentClosed) {
        current.close();
    }
}{code}
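An alternative that avoids the extra flag (just a sketch, assuming _current_ is declared as _CloseableIterator<StateChange>_ and that _CloseableIterator.empty()_ is available, as in the Flink codebase): reset _current_ to an empty iterator immediately after closing it, so any later close() is a harmless no-op.
{code:java}
// Sketch of the alternative inside advance() (an assumption, not the
// proposal above): never leave 'current' pointing at a closed iterator.
current.close();
current = CloseableIterator.empty(); // closing this again is a no-op
Tuple2<StreamStateHandle, Long> tuple2 = handleIterator.next();
LOG.debug("read at {} from {}", tuple2.f1, tuple2.f0);
current = changeIterator.read(tuple2.f0, tuple2.f1);
{code}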
cc [~yuanmei], [~roman].