Repository: flink
Updated Branches:
  refs/heads/master 0aa9db078 -> 50b665677


[FLINK-5663] [runtime] Prevent leaking SafetyNetCloseableRegistry through InheritableThreadLocal

This closes #3229


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/50b66567
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/50b66567
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/50b66567

Branch: refs/heads/master
Commit: 50b665677831529ee492ceda40a3c8fd750d62ff
Parents: 0aa9db0
Author: Stefan Richter <s.rich...@data-artisans.com>
Authored: Fri Jan 27 19:47:12 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Jan 29 14:58:30 2017 +0100

----------------------------------------------------------------------
 .../org/apache/flink/core/fs/FileSystem.java    | 21 +++---
 .../core/fs/SafetyNetCloseableRegistryTest.java | 71 +++++++++++++++++++-
 .../apache/flink/runtime/taskmanager/Task.java  | 12 +++-
 3 files changed, 88 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/50b66567/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java 
b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index 33addbb..d8efcbc 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -32,7 +32,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.local.LocalFileSystem;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.OperatingSystem;
-
+import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -78,7 +78,7 @@ public abstract class FileSystem {
 
        // 
------------------------------------------------------------------------
 
-       private static final InheritableThreadLocal<SafetyNetCloseableRegistry> 
REGISTRIES = new InheritableThreadLocal<>();
+       private static final ThreadLocal<SafetyNetCloseableRegistry> REGISTRIES 
= new ThreadLocal<>();
 
        private static final String HADOOP_WRAPPER_FILESYSTEM_CLASS = 
"org.apache.flink.runtime.fs.hdfs.HadoopFileSystem";
 
@@ -99,14 +99,15 @@ public abstract class FileSystem {
         * main thread.
         */
        @Internal
-       public static void createFileSystemCloseableRegistryForTask() {
+       public static void createAndSetFileSystemCloseableRegistryForThread() {
                SafetyNetCloseableRegistry oldRegistry = REGISTRIES.get();
-               if (null != oldRegistry) {
-                       IOUtils.closeQuietly(oldRegistry);
-                       LOG.warn("Found existing SafetyNetCloseableRegistry. 
Closed and replaced it.");
-               }
+               Preconditions.checkState(null == oldRegistry,
+                               "Found old CloseableRegistry " + oldRegistry +
+                                               ". This indicates a leak of the 
InheritableThreadLocal through a ThreadPool!");
+
                SafetyNetCloseableRegistry newRegistry = new 
SafetyNetCloseableRegistry();
                REGISTRIES.set(newRegistry);
+               LOG.info("Created new CloseableRegistry " + newRegistry + " for 
{}", Thread.currentThread().getName());
        }
 
        /**
@@ -114,7 +115,7 @@ public abstract class FileSystem {
         * main thread or when the task should be canceled.
         */
        @Internal
-       public static void disposeFileSystemCloseableRegistryForTask() {
+       public static void 
closeAndDisposeFileSystemCloseableRegistryForThread() {
                SafetyNetCloseableRegistry registry = REGISTRIES.get();
                if (null != registry) {
                        LOG.info("Ensuring all FileSystem streams are closed 
for {}", Thread.currentThread().getName());
@@ -123,7 +124,7 @@ public abstract class FileSystem {
                }
        }
 
-       private static FileSystem wrapWithSafetyNetWhenInTask(FileSystem fs) {
+       private static FileSystem wrapWithSafetyNetWhenActivated(FileSystem fs) 
{
                SafetyNetCloseableRegistry reg = REGISTRIES.get();
                return reg != null ? new SafetyNetWrapperFileSystem(fs, reg) : 
fs;
        }
@@ -306,7 +307,7 @@ public abstract class FileSystem {
         *         thrown if a reference to the file system instance could not 
be obtained
         */
        public static FileSystem get(URI uri) throws IOException {
-               return wrapWithSafetyNetWhenInTask(getUnguardedFileSystem(uri));
+               return 
wrapWithSafetyNetWhenActivated(getUnguardedFileSystem(uri));
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/50b66567/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
 
b/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
index 6628407..40856b4 100644
--- 
a/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
@@ -19,11 +19,11 @@
 package org.apache.flink.core.fs;
 
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Test;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class SafetyNetCloseableRegistryTest {
@@ -32,7 +32,6 @@ public class SafetyNetCloseableRegistryTest {
        private SafetyNetCloseableRegistry closeableRegistry;
        private AtomicInteger unclosedCounter;
 
-       @Before
        public void setup() {
                this.closeableRegistry = new SafetyNetCloseableRegistry();
                this.unclosedCounter = new AtomicInteger(0);
@@ -56,8 +55,74 @@ public class SafetyNetCloseableRegistryTest {
        }
 
        @Test
+       public void testCorrectScopesForSafetyNet() throws Exception {
+               Thread t1 = new Thread() {
+                       @Override
+                       public void run() {
+                               try {
+                                       FileSystem fs1 = 
FileSystem.getLocalFileSystem();
+                                       // ensure no safety net in place
+                                       Assert.assertFalse(fs1 instanceof 
SafetyNetWrapperFileSystem);
+                                       
FileSystem.createAndSetFileSystemCloseableRegistryForThread();
+                                       fs1 = FileSystem.getLocalFileSystem();
+                                       // ensure safety net is in place now
+                                       Assert.assertTrue(fs1 instanceof 
SafetyNetWrapperFileSystem);
+                                       Path tmp = new 
Path(fs1.getWorkingDirectory(), UUID.randomUUID().toString());
+                                       try (FSDataOutputStream stream = 
fs1.create(tmp, false)) {
+                                               Thread t2 = new Thread() {
+                                                       @Override
+                                                       public void run() {
+                                                               FileSystem fs2 
= FileSystem.getLocalFileSystem();
+                                                               // ensure the 
safety net does not leak here
+                                                               
Assert.assertFalse(fs2 instanceof SafetyNetWrapperFileSystem);
+                                                               
FileSystem.createAndSetFileSystemCloseableRegistryForThread();
+                                                               fs2 = 
FileSystem.getLocalFileSystem();
+                                                               // ensure we 
can bring another safety net in place
+                                                               
Assert.assertTrue(fs2 instanceof SafetyNetWrapperFileSystem);
+                                                               
FileSystem.closeAndDisposeFileSystemCloseableRegistryForThread();
+                                                               fs2 = 
FileSystem.getLocalFileSystem();
+                                                               // and that we 
can remove it again
+                                                               
Assert.assertFalse(fs2 instanceof SafetyNetWrapperFileSystem);
+                                                       }
+                                               };
+                                               t2.start();
+                                               try {
+                                                       t2.join();
+                                               } catch (InterruptedException 
e) {
+                                                       Assert.fail();
+                                               }
+
+                                               //ensure stream is still open 
and was never closed by any interferences
+                                               stream.write(42);
+                                               
FileSystem.closeAndDisposeFileSystemCloseableRegistryForThread();
+
+                                               // ensure leaking stream was 
closed
+                                               try {
+                                                       stream.write(43);
+                                                       Assert.fail();
+                                               } catch (IOException ignore) {
+
+                                               }
+                                               fs1 = 
FileSystem.getLocalFileSystem();
+                                               // ensure safety net was removed
+                                               Assert.assertFalse(fs1 
instanceof SafetyNetWrapperFileSystem);
+                                       } finally {
+                                               fs1.delete(tmp, false);
+                                       }
+                               } catch (Exception e) {
+                                       e.printStackTrace();
+                                       Assert.fail();
+                               }
+                       }
+               };
+               t1.start();
+               t1.join();
+       }
+
+       @Test
        public void testClose() throws Exception {
 
+               setup();
                startThreads(Integer.MAX_VALUE);
 
                for (int i = 0; i < 5; ++i) {
@@ -98,7 +163,7 @@ public class SafetyNetCloseableRegistryTest {
 
        @Test
        public void testSafetyNetClose() throws Exception {
-
+               setup();
                startThreads(20);
 
                joinThreads();

http://git-wip-us.apache.org/repos/asf/flink/blob/50b66567/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 6925d0f..c2e6d09 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -550,8 +550,8 @@ public class Task implements Runnable, TaskActions {
                        //  check for canceling as a shortcut
                        // ----------------------------
 
-                       // init closeable registry for this task
-                       FileSystem.createFileSystemCloseableRegistryForTask();
+                       // activate safety net for task thread
+                       
FileSystem.createAndSetFileSystemCloseableRegistryForThread();
 
                        // first of all, get a user-code classloader
                        // this may involve downloading the job's JAR files 
and/or classes
@@ -775,7 +775,8 @@ public class Task implements Runnable, TaskActions {
 
                                // remove all files in the distributed cache
                                removeCachedFiles(distributedCacheEntries, 
fileCache);
-                               
FileSystem.disposeFileSystemCloseableRegistryForTask();
+                               // close and de-activate safety net for task 
thread
+                               
FileSystem.closeAndDisposeFileSystemCloseableRegistryForThread();
 
                                notifyFinalState();
                        }
@@ -1115,6 +1116,8 @@ public class Task implements Runnable, TaskActions {
                                Runnable runnable = new Runnable() {
                                        @Override
                                        public void run() {
+                                               // activate safety net for 
checkpointing thread
+                                               
FileSystem.createAndSetFileSystemCloseableRegistryForThread();
                                                try {
                                                        boolean success = 
statefulTask.triggerCheckpoint(checkpointMetaData);
                                                        if (!success) {
@@ -1133,6 +1136,9 @@ public class Task implements Runnable, TaskActions {
                                                                        "{} 
({}) while being not in state running.", checkpointID,
                                                                        
taskNameWithSubtask, executionId, t);
                                                        }
+                                               } finally {
+                                                       // close and 
de-activate safety net for checkpointing thread
+                                                       
FileSystem.closeAndDisposeFileSystemCloseableRegistryForThread();
                                                }
                                        }
                                };

Reply via email to