[
https://issues.apache.org/jira/browse/FLINK-5663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15843125#comment-15843125
]
ASF GitHub Bot commented on FLINK-5663:
---------------------------------------
GitHub user StefanRRichter opened a pull request:
https://github.com/apache/flink/pull/3228
[FLINK-5663] Prevent leaking SafetyNetCloseableRegistry through
InheritableThreadLocal
This PR prevents the `SafetyNetCloseableRegistry` from leaking into pooled
threads through `InheritableThreadLocal`.
As a first step, we use `ThreadLocal` instead of `InheritableThreadLocal` to
hold the closeable registries.
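
The leak mechanism can be reproduced outside Flink. The following is a minimal sketch, not code from this PR: the class name, the holder fields, and the string that stands in for a registry are all hypothetical. It relies only on the documented JDK behavior that a new thread copies the `InheritableThreadLocal` values of the thread that creates it, and that a thread pool creates its worker in the submitting thread.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class InheritanceDemo {
    // Hypothetical stand-ins for the per-thread registry holder.
    static final ThreadLocal<String> PLAIN = new ThreadLocal<>();
    static final ThreadLocal<String> INHERITABLE = new InheritableThreadLocal<>();

    /** Returns what a freshly created pool thread sees: {plain, inheritable}. */
    static String[] valuesSeenByPoolThread() throws Exception {
        PLAIN.set("registry-of-task-A");
        INHERITABLE.set("registry-of-task-A");
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            // The worker thread is created on first submit, by the calling thread,
            // so it copies the caller's inheritable values at construction time.
            Callable<String[]> probe = () -> new String[] {PLAIN.get(), INHERITABLE.get()};
            return pool.submit(probe).get();
        } finally {
            pool.shutdown();
            PLAIN.remove();
            INHERITABLE.remove();
        }
    }

    public static void main(String[] args) throws Exception {
        String[] seen = valuesSeenByPoolThread();
        System.out.println("ThreadLocal value in pool thread: " + seen[0]);
        System.out.println("InheritableThreadLocal value in pool thread: " + seen[1]);
    }
}
```

The pooled thread sees `null` for the plain `ThreadLocal` but still sees the creating task's value for the `InheritableThreadLocal`. Because pool threads outlive the task that created them, an inherited registry can already be closed by the time a later task uses it.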
Additionally, we create safety nets for the file system at the scope
of the checkpointing thread. We hope that this already covers most
cases. Other threads could also create safety nets for their own scope
right now.
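
Scoping a safety net to a single thread could look roughly like the sketch below. This is an illustration under assumptions, not the Flink implementation: `ScopedRegistry` and its methods are hypothetical names, and only the error message text is borrowed from the stack trace in the issue.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical per-thread safety net; not Flink's SafetyNetCloseableRegistry.
final class ScopedRegistry implements Closeable {
    // Plain ThreadLocal: child and pooled threads do not inherit the registry.
    private static final ThreadLocal<ScopedRegistry> CURRENT = new ThreadLocal<>();

    private final Deque<Closeable> open = new ArrayDeque<>();
    private boolean closed;

    /** Creates a registry and binds it to the calling thread. */
    static ScopedRegistry openForCurrentThread() {
        ScopedRegistry registry = new ScopedRegistry();
        CURRENT.set(registry);
        return registry;
    }

    /** Returns the registry bound to the calling thread, or null if there is none. */
    static ScopedRegistry current() {
        return CURRENT.get();
    }

    synchronized void register(Closeable resource) throws IOException {
        if (closed) {
            resource.close();
            throw new IOException(
                "Cannot register Closeable, registry is already closed. Closing argument.");
        }
        open.push(resource);
    }

    /** Closes all registered resources; intended to be called by the owning thread. */
    @Override
    public synchronized void close() {
        closed = true;
        if (CURRENT.get() == this) {
            CURRENT.remove();
        }
        while (!open.isEmpty()) {
            try {
                open.pop().close();
            } catch (IOException ignored) {
                // Best effort: keep closing the remaining resources.
            }
        }
    }
}
```

A thread would wrap its work in `try (ScopedRegistry r = ScopedRegistry.openForCurrentThread()) { ... }`, so that every stream registered during that scope is closed when the scope ends, and no other thread ever sees the registry.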
As a last change, we made the reaper thread a singleton, because we could
potentially create more registries now and it is not required to have one
reaper thread per registry.
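
One way to share a single reaper thread among many registries is a lazily started singleton, sketched below. The names `SharedReaper` and `DemoRegistry` are hypothetical and the cleanup step is left as a comment; how the PR actually manages the reaper's lifecycle may differ.

```java
import java.lang.ref.ReferenceQueue;

// Hypothetical shared reaper; a sketch of the singleton idea only.
final class SharedReaper extends Thread {
    private static SharedReaper instance; // guarded by the class lock

    private final ReferenceQueue<Object> queue = new ReferenceQueue<>();

    private SharedReaper() {
        super("SharedReaper");
        setDaemon(true); // must not keep the JVM alive
    }

    /** Returns the single reaper, starting it on first use. */
    static synchronized SharedReaper get() {
        if (instance == null) {
            instance = new SharedReaper();
            instance.start();
        }
        return instance;
    }

    /** Queue on which all registries enqueue their phantom references. */
    ReferenceQueue<Object> queue() {
        return queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                queue.remove(); // blocks until a reference is enqueued
                // A real implementation would close the leaked resource here.
            }
        } catch (InterruptedException e) {
            // Interrupted: let the thread exit.
        }
    }
}

// Every registry obtains the same reaper instead of starting its own thread.
final class DemoRegistry {
    final SharedReaper reaper = SharedReaper.get();
}
```

With this shape, creating additional registries (for example one per checkpointing thread) adds no further threads.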
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/StefanRRichter/flink safetyNet2
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/3228.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #3228
----
commit 21e2a31ece3c56a9d79cb127f9829f770ebe56cf
Author: Stefan Richter <[email protected]>
Date: 2017-01-27T15:32:35Z
[FLINK-5663] Prevent leaking safetynet closeable registry
----
> Checkpoint fails because of closed registry
> -------------------------------------------
>
> Key: FLINK-5663
> URL: https://issues.apache.org/jira/browse/FLINK-5663
> Project: Flink
> Issue Type: Bug
> Components: State Backends, Checkpointing
> Reporter: Ufuk Celebi
> Assignee: Stefan Richter
>
> While testing the 1.2.0 release I got the following Exception:
> {code}
> 2017-01-26 17:29:20,602 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source (3/8) (2dbce778c4e53a39dec3558e868ceef4) switched from RUNNING to FAILED.
> java.lang.Exception: Error while triggering checkpoint 2 for Source: Custom Source (3/8)
> at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1117)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: Could not perform checkpoint 2 for operator Source: Custom Source (3/8).
> at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:533)
> at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1108)
> ... 5 more
> Caused by: java.lang.Exception: Could not complete snapshot 2 for operator Source: Custom Source (3/8).
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:372)
> at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1116)
> at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1052)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:640)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:585)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:528)
> ... 6 more
> Caused by: java.io.IOException: Could not flush and close the file system output stream to file:/Users/uce/Desktop/1-2-testing/fs/83889867a493a1dc80f6c588c071b679/chk-2/e4415d0d-719c-48df-91a9-3171ba468152 in order to obtain the stream state handle
> at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:333)
> at org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:200)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357)
> ... 11 more
> Caused by: java.io.IOException: Could not open output stream for state backend
> at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:368)
> at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:225)
> at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:305)
> ... 13 more
> Caused by: java.io.IOException: Cannot register Closeable, registry is already closed. Closing argument.
> at org.apache.flink.util.AbstractCloseableRegistry.registerClosable(AbstractCloseableRegistry.java:63)
> at org.apache.flink.core.fs.ClosingFSDataOutputStream.wrapSafe(ClosingFSDataOutputStream.java:99)
> at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:123)
> at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:359)
> ... 15 more
> {code}
> The job recovered and kept running.
> [[email protected]] Can this be a race with the closable registry?