[ https://issues.apache.org/jira/browse/FLINK-5663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15843170#comment-15843170 ]
ASF GitHub Bot commented on FLINK-5663:
---------------------------------------
Github user StephanEwen commented on the issue:
https://github.com/apache/flink/pull/3228
Making the registry not inherited is a good quick fix.
How about moving the initialization and closing of that registry into the
task's trigger checkpoint action?
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java#L1119
That way it is tied to the dedicated asynchronous thread, which I think is
safer. With the registry initialization/closing in the `triggerCheckpoint(...)`
method, that method must never be called by the main thread. That probably does
not happen currently, but it is an easy constraint to violate accidentally.
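A minimal sketch of the suggestion, assuming a hypothetical `SafetyNetRegistry` held in a plain (non-inheritable) `ThreadLocal` and a hypothetical `runCheckpointAction` wrapper standing in for the task's trigger checkpoint action; none of these names are Flink APIs. The registry is created when the asynchronous action starts and closed in a `finally` block when it ends, so its lifetime is bound to the thread that runs the checkpoint rather than to whichever thread happened to spawn it.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class CheckpointActionSketch {

    /** Hypothetical per-thread registry of streams to close when the action ends. */
    static final class SafetyNetRegistry implements Closeable {
        private final List<Closeable> resources = new ArrayList<>();
        private boolean closed;

        synchronized void register(Closeable c) throws IOException {
            if (closed) {
                c.close(); // close the argument so it cannot leak
                throw new IOException("Cannot register Closeable, registry is already closed.");
            }
            resources.add(c);
        }

        @Override
        public synchronized void close() {
            closed = true;
            for (Closeable c : resources) {
                try {
                    c.close();
                } catch (IOException ignored) {
                    // best effort: keep closing the remaining resources
                }
            }
            resources.clear();
        }
    }

    // Plain ThreadLocal, NOT InheritableThreadLocal: threads created later do not
    // copy (and later trip over) this thread's registry.
    static final ThreadLocal<SafetyNetRegistry> REGISTRY = new ThreadLocal<>();

    /** Hypothetical stand-in for the task's trigger checkpoint action. */
    static Runnable runCheckpointAction(Runnable checkpoint) {
        return () -> {
            SafetyNetRegistry registry = new SafetyNetRegistry();
            REGISTRY.set(registry);
            try {
                checkpoint.run();
            } finally {
                // Unset and close on the same thread that created the registry.
                REGISTRY.remove();
                registry.close();
            }
        };
    }

    public static void main(String[] args) throws Exception {
        Thread t = new Thread(runCheckpointAction(() ->
                System.out.println("registry present: " + (REGISTRY.get() != null))));
        t.start();
        t.join();
        System.out.println("main thread registry: " + REGISTRY.get());
    }
}
```

Run as-is, the checkpoint thread sees its own registry while the main thread never has one, which is the "tied to the dedicated asynchronous thread" property described above.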
There is also some additional refactoring in this pull request that makes
the reaper thread a static variable, shared across all registries. Currently,
that thread never seems to be stopped by anyone. How about factoring out the
changes for the single static reaper thread into a separate issue (which we do
not merge for 1.2) and addressing the thread stopping there as well (via an
atomic count of how many registries are currently open, stopping the thread
when the count drops to zero and starting it when the count rises from zero)?
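A sketch of the proposed follow-up, assuming a hypothetical `SharedReaper` whose `acquire()`/`release()` are called when a registry is opened and closed; the names and the placeholder reaping loop are illustrative, not Flink code. The reaper thread starts when the number of open registries goes from zero to one and is interrupted when it drops back to zero. Here the count and the start/stop decision share one lock, because a bare atomic counter on its own would still let a "stop at zero" and a concurrent "start from zero" interleave.

```java
public final class SharedReaper {

    private static final Object LOCK = new Object();
    private static int openRegistries = 0; // guarded by LOCK
    private static Thread reaper = null;   // guarded by LOCK

    /** Called when a registry is opened; starts the reaper on 0 -> 1. */
    public static void acquire() {
        synchronized (LOCK) {
            if (openRegistries++ == 0) {
                reaper = new Thread(SharedReaper::reapLoop, "registry-reaper");
                reaper.setDaemon(true);
                reaper.start();
            }
        }
    }

    /** Called when a registry is closed; stops the reaper on 1 -> 0. */
    public static void release() {
        synchronized (LOCK) {
            if (openRegistries <= 0) {
                throw new IllegalStateException("release() without matching acquire()");
            }
            if (--openRegistries == 0) {
                reaper.interrupt();
                reaper = null;
            }
        }
    }

    /** True while at least one open registry keeps the reaper alive. */
    public static boolean isRunning() {
        synchronized (LOCK) {
            return reaper != null;
        }
    }

    /** Placeholder loop; a real reaper would clean up leaked resources here. */
    private static void reapLoop() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                return; // interrupted by release(): let the thread exit
            }
        }
    }

    public static void main(String[] args) {
        acquire();
        acquire();
        release();
        System.out.println(isRunning()); // true: one registry still open
        release();
        System.out.println(isRunning()); // false: count dropped to zero
    }
}
```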
> Checkpoint fails because of closed registry
> -------------------------------------------
>
> Key: FLINK-5663
> URL: https://issues.apache.org/jira/browse/FLINK-5663
> Project: Flink
> Issue Type: Bug
> Components: State Backends, Checkpointing
> Reporter: Ufuk Celebi
> Assignee: Stefan Richter
>
> While testing the 1.2.0 release I got the following Exception:
> {code}
> 2017-01-26 17:29:20,602 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source (3/8) (2dbce778c4e53a39dec3558e868ceef4) switched from RUNNING to FAILED.
> java.lang.Exception: Error while triggering checkpoint 2 for Source: Custom Source (3/8)
>     at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1117)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: Could not perform checkpoint 2 for operator Source: Custom Source (3/8).
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:533)
>     at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1108)
>     ... 5 more
> Caused by: java.lang.Exception: Could not complete snapshot 2 for operator Source: Custom Source (3/8).
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:372)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1116)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1052)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:640)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:585)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:528)
>     ... 6 more
> Caused by: java.io.IOException: Could not flush and close the file system output stream to file:/Users/uce/Desktop/1-2-testing/fs/83889867a493a1dc80f6c588c071b679/chk-2/e4415d0d-719c-48df-91a9-3171ba468152 in order to obtain the stream state handle
>     at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:333)
>     at org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:200)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357)
>     ... 11 more
> Caused by: java.io.IOException: Could not open output stream for state backend
>     at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:368)
>     at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:225)
>     at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:305)
>     ... 13 more
> Caused by: java.io.IOException: Cannot register Closeable, registry is already closed. Closing argument.
>     at org.apache.flink.util.AbstractCloseableRegistry.registerClosable(AbstractCloseableRegistry.java:63)
>     at org.apache.flink.core.fs.ClosingFSDataOutputStream.wrapSafe(ClosingFSDataOutputStream.java:99)
>     at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:123)
>     at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:359)
>     ... 15 more
> {code}
> The job recovered and kept running.
> Can this be a race with the closable registry?
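One way the reported failure could arise, sketched under the assumption (suggested by the "not inherited" quick fix discussed above) that the registry lives in an `InheritableThreadLocal`. `Registry`, `REGISTRY`, and `runScenario` are hypothetical names, not Flink classes. A pooled checkpoint thread created while the task thread's registry is set copies that reference; once the owning thread closes its registry, the next registration from the pooled thread is rejected with the same message as in the log. This illustrates a stale inherited registry; it does not prove that this is the actual cause in Flink.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class InheritedRegistryRace {

    /** Hypothetical registry that refuses registrations once closed. */
    static final class Registry implements Closeable {
        private boolean closed;

        synchronized void register(Closeable c) throws IOException {
            if (closed) {
                c.close(); // close the argument, as the log message describes
                throw new IOException("Cannot register Closeable, registry is already closed. Closing argument.");
            }
        }

        @Override
        public synchronized void close() {
            closed = true;
        }
    }

    // Assumption: the registry is held in an InheritableThreadLocal, so any thread
    // constructed while it is set copies the reference.
    static final InheritableThreadLocal<Registry> REGISTRY = new InheritableThreadLocal<>();

    /** Returns the error seen by the pooled "checkpoint" thread, or "ok". */
    static String runScenario() {
        Registry taskRegistry = new Registry();
        REGISTRY.set(taskRegistry);
        ExecutorService checkpointPool = Executors.newSingleThreadExecutor();
        try {
            // The worker thread is constructed here, on this thread, and inherits the registry.
            checkpointPool.submit(() -> { }).get();
            taskRegistry.close(); // the owning thread closes its registry
            REGISTRY.remove();    // ...but the worker still holds its inherited copy
            return checkpointPool.submit(() -> {
                try {
                    REGISTRY.get().register(() -> { });
                    return "ok";
                } catch (IOException e) {
                    return e.getMessage();
                }
            }).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            checkpointPool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(runScenario());
    }
}
```

With a plain `ThreadLocal` instead, the worker would see no registry at all, which is why making the registry non-inherited turns this silent sharing into an explicit ownership question.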
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)