Repository: flink
Updated Branches:
  refs/heads/master 0aa9db078 -> 50b665677
[FLINK-5663] [runtime] Prevent leaking SafetyNetCloseableRegistry through InheritableThreadLocal

This closes #3229

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/50b66567
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/50b66567
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/50b66567

Branch: refs/heads/master
Commit: 50b665677831529ee492ceda40a3c8fd750d62ff
Parents: 0aa9db0
Author: Stefan Richter <s.rich...@data-artisans.com>
Authored: Fri Jan 27 19:47:12 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Jan 29 14:58:30 2017 +0100

----------------------------------------------------------------------
 .../org/apache/flink/core/fs/FileSystem.java | 21 +++---
 .../core/fs/SafetyNetCloseableRegistryTest.java | 71 +++++++++++++++++++-
 .../apache/flink/runtime/taskmanager/Task.java | 12 +++-
 3 files changed, 88 insertions(+), 16 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/50b66567/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index 33addbb..d8efcbc 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -32,7 +32,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.local.LocalFileSystem;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.OperatingSystem;
-
+import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -78,7 +78,7 @@ public abstract class FileSystem {
 // ------------------------------------------------------------------------
- private 
static final InheritableThreadLocal<SafetyNetCloseableRegistry> REGISTRIES = new InheritableThreadLocal<>(); + private static final ThreadLocal<SafetyNetCloseableRegistry> REGISTRIES = new ThreadLocal<>(); private static final String HADOOP_WRAPPER_FILESYSTEM_CLASS = "org.apache.flink.runtime.fs.hdfs.HadoopFileSystem"; @@ -99,14 +99,15 @@ public abstract class FileSystem { * main thread. */ @Internal - public static void createFileSystemCloseableRegistryForTask() { + public static void createAndSetFileSystemCloseableRegistryForThread() { SafetyNetCloseableRegistry oldRegistry = REGISTRIES.get(); - if (null != oldRegistry) { - IOUtils.closeQuietly(oldRegistry); - LOG.warn("Found existing SafetyNetCloseableRegistry. Closed and replaced it."); - } + Preconditions.checkState(null == oldRegistry, + "Found old CloseableRegistry " + oldRegistry + + ". This indicates a leak of the InheritableThreadLocal through a ThreadPool!"); + SafetyNetCloseableRegistry newRegistry = new SafetyNetCloseableRegistry(); REGISTRIES.set(newRegistry); + LOG.info("Created new CloseableRegistry " + newRegistry + " for {}", Thread.currentThread().getName()); } /** @@ -114,7 +115,7 @@ public abstract class FileSystem { * main thread or when the task should be canceled. */ @Internal - public static void disposeFileSystemCloseableRegistryForTask() { + public static void closeAndDisposeFileSystemCloseableRegistryForThread() { SafetyNetCloseableRegistry registry = REGISTRIES.get(); if (null != registry) { LOG.info("Ensuring all FileSystem streams are closed for {}", Thread.currentThread().getName()); @@ -123,7 +124,7 @@ public abstract class FileSystem { } } - private static FileSystem wrapWithSafetyNetWhenInTask(FileSystem fs) { + private static FileSystem wrapWithSafetyNetWhenActivated(FileSystem fs) { SafetyNetCloseableRegistry reg = REGISTRIES.get(); return reg != null ? 
new SafetyNetWrapperFileSystem(fs, reg) : fs; } @@ -306,7 +307,7 @@ public abstract class FileSystem { * thrown if a reference to the file system instance could not be obtained */ public static FileSystem get(URI uri) throws IOException { - return wrapWithSafetyNetWhenInTask(getUnguardedFileSystem(uri)); + return wrapWithSafetyNetWhenActivated(getUnguardedFileSystem(uri)); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/50b66567/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java index 6628407..40856b4 100644 --- a/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java +++ b/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java @@ -19,11 +19,11 @@ package org.apache.flink.core.fs; import org.junit.Assert; -import org.junit.Before; import org.junit.Test; import java.io.Closeable; import java.io.IOException; +import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; public class SafetyNetCloseableRegistryTest { @@ -32,7 +32,6 @@ public class SafetyNetCloseableRegistryTest { private SafetyNetCloseableRegistry closeableRegistry; private AtomicInteger unclosedCounter; - @Before public void setup() { this.closeableRegistry = new SafetyNetCloseableRegistry(); this.unclosedCounter = new AtomicInteger(0); @@ -56,8 +55,74 @@ public class SafetyNetCloseableRegistryTest { } @Test + public void testCorrectScopesForSafetyNet() throws Exception { + Thread t1 = new Thread() { + @Override + public void run() { + try { + FileSystem fs1 = FileSystem.getLocalFileSystem(); + // ensure no safety net in place + Assert.assertFalse(fs1 instanceof SafetyNetWrapperFileSystem); + 
FileSystem.createAndSetFileSystemCloseableRegistryForThread(); + fs1 = FileSystem.getLocalFileSystem(); + // ensure safety net is in place now + Assert.assertTrue(fs1 instanceof SafetyNetWrapperFileSystem); + Path tmp = new Path(fs1.getWorkingDirectory(), UUID.randomUUID().toString()); + try (FSDataOutputStream stream = fs1.create(tmp, false)) { + Thread t2 = new Thread() { + @Override + public void run() { + FileSystem fs2 = FileSystem.getLocalFileSystem(); + // ensure the safety net does not leak here + Assert.assertFalse(fs2 instanceof SafetyNetWrapperFileSystem); + FileSystem.createAndSetFileSystemCloseableRegistryForThread(); + fs2 = FileSystem.getLocalFileSystem(); + // ensure we can bring another safety net in place + Assert.assertTrue(fs2 instanceof SafetyNetWrapperFileSystem); + FileSystem.closeAndDisposeFileSystemCloseableRegistryForThread(); + fs2 = FileSystem.getLocalFileSystem(); + // and that we can remove it again + Assert.assertFalse(fs2 instanceof SafetyNetWrapperFileSystem); + } + }; + t2.start(); + try { + t2.join(); + } catch (InterruptedException e) { + Assert.fail(); + } + + //ensure stream is still open and was never closed by any interferences + stream.write(42); + FileSystem.closeAndDisposeFileSystemCloseableRegistryForThread(); + + // ensure leaking stream was closed + try { + stream.write(43); + Assert.fail(); + } catch (IOException ignore) { + + } + fs1 = FileSystem.getLocalFileSystem(); + // ensure safety net was removed + Assert.assertFalse(fs1 instanceof SafetyNetWrapperFileSystem); + } finally { + fs1.delete(tmp, false); + } + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(); + } + } + }; + t1.start(); + t1.join(); + } + + @Test public void testClose() throws Exception { + setup(); startThreads(Integer.MAX_VALUE); for (int i = 0; i < 5; ++i) { @@ -98,7 +163,7 @@ public class SafetyNetCloseableRegistryTest { @Test public void testSafetyNetClose() throws Exception { - + setup(); startThreads(20); joinThreads(); 
http://git-wip-us.apache.org/repos/asf/flink/blob/50b66567/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index 6925d0f..c2e6d09 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -550,8 +550,8 @@ public class Task implements Runnable, TaskActions { // check for canceling as a shortcut // ---------------------------- - // init closeable registry for this task - FileSystem.createFileSystemCloseableRegistryForTask(); + // activate safety net for task thread + FileSystem.createAndSetFileSystemCloseableRegistryForThread(); // first of all, get a user-code classloader // this may involve downloading the job's JAR files and/or classes @@ -775,7 +775,8 @@ public class Task implements Runnable, TaskActions { // remove all files in the distributed cache removeCachedFiles(distributedCacheEntries, fileCache); - FileSystem.disposeFileSystemCloseableRegistryForTask(); + // close and de-activate safety net for task thread + FileSystem.closeAndDisposeFileSystemCloseableRegistryForThread(); notifyFinalState(); } @@ -1115,6 +1116,8 @@ public class Task implements Runnable, TaskActions { Runnable runnable = new Runnable() { @Override public void run() { + // activate safety net for checkpointing thread + FileSystem.createAndSetFileSystemCloseableRegistryForThread(); try { boolean success = statefulTask.triggerCheckpoint(checkpointMetaData); if (!success) { @@ -1133,6 +1136,9 @@ public class Task implements Runnable, TaskActions { "{} ({}) while being not in state running.", checkpointID, taskNameWithSubtask, executionId, t); } + } finally { + // close and de-activate safety net for checkpointing thread + 
FileSystem.closeAndDisposeFileSystemCloseableRegistryForThread(); } } };