[
https://issues.apache.org/jira/browse/FLINK-5663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15843316#comment-15843316
]
ASF GitHub Bot commented on FLINK-5663:
---------------------------------------
GitHub user StefanRRichter opened a pull request:
https://github.com/apache/flink/pull/3229
[FLINK-5663] Prevent leaking SafetyNetCloseableRegistry through Inher…
This PR prevents the SafetyNetCloseableRegistry from leaking into pooled
threads through InheritableThreadLocal.
As a first step, we use ThreadLocal instead of InheritableThreadLocal to hold
the closeable registries.
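The leak comes from how `InheritableThreadLocal` works. A new thread copies the creating thread's inheritable values when it is constructed, and a pooled thread keeps that copy for its whole lifetime, even after the creating thread has moved on. The following self-contained Java sketch compares this with a plain `ThreadLocal` using a single-thread executor. The class name `ThreadLocalLeakDemo` and the value `"registry-of-task-A"` are made up for illustration and are not taken from Flink:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadLocalLeakDemo {

    // Copied into every thread that is created while the creating thread has a value set.
    static final InheritableThreadLocal<String> INHERITED = new InheritableThreadLocal<>();
    // Never copied: visible only to the thread that set it.
    static final ThreadLocal<String> PLAIN = new ThreadLocal<>();

    /** Returns {inherited, plain} as seen by the pool thread, for a first and a second job. */
    static String[][] observe() throws Exception {
        // One worker thread, created lazily on the first submit and reused afterwards.
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            // The submitting "task" thread has its value set when the worker is created,
            // so the worker copies the inheritable value at construction time.
            INHERITED.set("registry-of-task-A");
            PLAIN.set("registry-of-task-A");
            String[] first = pool.submit(() -> new String[] {INHERITED.get(), PLAIN.get()}).get();

            // Task A is done and clears its own state. In the scenario of this issue,
            // the registry would be closed instead, with the same effect on the worker.
            INHERITED.remove();
            PLAIN.remove();

            // The reused worker still holds the stale inherited value.
            String[] second = pool.submit(() -> new String[] {INHERITED.get(), PLAIN.get()}).get();
            return new String[][] {first, second};
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        String[][] r = observe();
        System.out.println("first job:  inherited=" + r[0][0] + ", plain=" + r[0][1]);
        System.out.println("second job: inherited=" + r[1][0] + ", plain=" + r[1][1]);
    }
}
```

Running `main` prints `inherited=registry-of-task-A, plain=null` for both jobs. The worker keeps the stale inherited value, while the plain `ThreadLocal` is never visible outside the thread that set it.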
Additionally, we create safety nets for the file system at the scope
of the checkpointing thread. We hope that this already covers most
cases. Other threads could, in principle, already create safety nets for
their own scope as well.
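Creating a safety net "at the scope of" a thread roughly means three things. The thread installs a registry when it starts its work, resources created during that work are registered with it, and at the end the registry is closed and the thread-local is cleared. Below is a minimal sketch of that pattern. `ThreadScopedSafetyNet`, `guard`, and `runWithSafetyNet` are hypothetical names chosen for illustration. They are not the classes or methods this PR adds to Flink:

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical sketch, not Flink's API: a safety net bound to the scope of a single thread. */
public final class ThreadScopedSafetyNet {

    // Plain ThreadLocal: threads created or reused by this thread never see its net.
    private static final ThreadLocal<List<Closeable>> NET = new ThreadLocal<>();

    private ThreadScopedSafetyNet() {}

    /** Registers a resource with the calling thread's net; a no-op if no net is installed. */
    public static void guard(Closeable resource) {
        List<Closeable> net = NET.get();
        if (net != null) {
            net.add(resource);
        }
    }

    /** Runs body with a fresh net and closes every guarded resource when the scope ends. */
    public static <T> T runWithSafetyNet(Supplier<T> body) {
        NET.set(new ArrayList<>());
        try {
            return body.get();
        } finally {
            List<Closeable> net = NET.get();
            // Remove first, so a pooled thread that runs other work later starts without a net.
            NET.remove();
            for (Closeable resource : net) {
                try {
                    // Per the Closeable contract, closing an already closed resource has no effect.
                    resource.close();
                } catch (IOException ignored) {
                    // Best effort: keep closing the remaining resources.
                }
            }
        }
    }
}
```

The net lives in a plain `ThreadLocal` and is removed in `finally`. As a result, a thread pool that later reuses the same thread cannot pick up a registry that belongs to a scope that has already ended.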
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/StefanRRichter/flink safetyNetThreadLocal
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/3229.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #3229
----
commit 31ae956fdd83a2f65bf22c3ad601d4d65ad61439
Author: Stefan Richter
Date: 2017-01-27T18:47:12Z
[FLINK-5663] Prevent leaking SafetyNetCloseableRegistry through
InheritableThreadLocal
----
> Checkpoint fails because of closed registry
> -------------------------------------------
>
> Key: FLINK-5663
> URL: https://issues.apache.org/jira/browse/FLINK-5663
> Project: Flink
> Issue Type: Bug
> Components: State Backends, Checkpointing
> Reporter: Ufuk Celebi
> Assignee: Stefan Richter
>
> While testing the 1.2.0 release, I got the following exception:
> {code}
> 2017-01-26 17:29:20,602 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source (3/8) (2dbce778c4e53a39dec3558e868ceef4) switched from RUNNING to FAILED.
> java.lang.Exception: Error while triggering checkpoint 2 for Source: Custom Source (3/8)
>     at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1117)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: Could not perform checkpoint 2 for operator Source: Custom Source (3/8).
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:533)
>     at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1108)
>     ... 5 more
> Caused by: java.lang.Exception: Could not complete snapshot 2 for operator Source: Custom Source (3/8).
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:372)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1116)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1052)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:640)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:585)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:528)
>     ... 6 more
> Caused by: java.io.IOException: Could not flush and close the file system output stream to file:/Users/uce/Desktop/1-2-testing/fs/83889867a493a1dc80f6c588c071b679/chk-2/e4415d0d-719c-48df-91a9-3171ba468152 in order to obtain the stream state handle
>     at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:333)
>     at org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:200)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357)
>     ... 11 more
> Caused by: java.io.IOException: Could not open output stream for state backend
>     at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:368)
>     at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:225)
>     at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:305)
>     ... 13 more
> Caused by: java.io.IOException: Cannot register Closeable, registry is already closed. Closing argument.
>     at org.apache.flink.util.AbstractCloseableRegistry.registerClosable(AbstractCloseableRegistry.java:63)
>     at org.apache.flink.core.fs.ClosingFSDataOutputStream.wrapSafe(ClosingFSDataOutputStream.java:99)
>     at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:123)
>     at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:359)
>     ... 15 more
> {code}
> The job recovered and kept running.
> Can this be a race with the closable registry?
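The root cause in the trace is a registry that refuses a new stream because it has already been closed. It also closes the stream it rejects, so the resource does not leak. One plausible reading, consistent with the pull request above, is that the pooled thread running `Task$3.run` still held a registry inherited from another thread, and that registry had since been closed. The sketch below mirrors only the behaviour implied by the exception message. `ClosingRegistry` is a hypothetical stand-in for `AbstractCloseableRegistry`, not Flink's code:

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical sketch: a registry that closes its resources and rejects late registrations. */
public final class ClosingRegistry implements Closeable {

    private final Deque<Closeable> registered = new ArrayDeque<>();
    private boolean closed;

    public synchronized void register(Closeable resource) throws IOException {
        if (closed) {
            // Do not leak the rejected resource: close it, then report the failure.
            resource.close();
            throw new IOException("Cannot register Closeable, registry is already closed. Closing argument.");
        }
        registered.push(resource);
    }

    @Override
    public synchronized void close() throws IOException {
        closed = true;
        IOException first = null;
        // Close in reverse registration order and keep going if one close fails.
        while (!registered.isEmpty()) {
            try {
                registered.pop().close();
            } catch (IOException e) {
                if (first == null) {
                    first = e;
                } else {
                    first.addSuppressed(e);
                }
            }
        }
        if (first != null) {
            throw first;
        }
    }

    public synchronized boolean isClosed() {
        return closed;
    }
}
```

Any thread that still points at such a registry after `close()` fails every registration, no matter which task is running on it at that time.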
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)