Repository: flink
Updated Branches:
  refs/heads/release-1.2 db3c5f388 -> b5ec14641


[FLINK-5895] [runtime] Decrease logging aggressiveness of FileSystemSafetyNet


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/74b29f5a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/74b29f5a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/74b29f5a

Branch: refs/heads/release-1.2
Commit: 74b29f5a3dc4f1413bbf8addb6b4234a5bfe6581
Parents: db3c5f3
Author: Stephan Ewen <[email protected]>
Authored: Thu Feb 23 16:21:46 2017 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Thu Feb 23 16:26:01 2017 +0100

----------------------------------------------------------------------
 .../src/main/java/org/apache/flink/core/fs/FileSystem.java   | 6 ------
 .../main/java/org/apache/flink/runtime/taskmanager/Task.java | 8 ++++++++
 2 files changed, 8 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/74b29f5a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java 
b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index d8efcbc..991c718 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -33,8 +33,6 @@ import org.apache.flink.core.fs.local.LocalFileSystem;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.OperatingSystem;
 import org.apache.flink.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 import java.io.File;
@@ -86,8 +84,6 @@ public abstract class FileSystem {
 
        private static final String HADOOP_WRAPPER_SCHEME = "hdwrapper";
 
-       private static final Logger LOG = 
LoggerFactory.getLogger(FileSystem.class);
-
        /** This lock guards the methods {@link #initOutPathLocalFS(Path, 
WriteMode, boolean)} and
         * {@link #initOutPathDistFS(Path, WriteMode, boolean)} which are 
otherwise susceptible to races */
        private static final ReentrantLock OUTPUT_DIRECTORY_INIT_LOCK = new 
ReentrantLock(true);
@@ -107,7 +103,6 @@ public abstract class FileSystem {
 
                SafetyNetCloseableRegistry newRegistry = new 
SafetyNetCloseableRegistry();
                REGISTRIES.set(newRegistry);
-               LOG.info("Created new CloseableRegistry " + newRegistry + " for 
{}", Thread.currentThread().getName());
        }
 
        /**
@@ -118,7 +113,6 @@ public abstract class FileSystem {
        public static void 
closeAndDisposeFileSystemCloseableRegistryForThread() {
                SafetyNetCloseableRegistry registry = REGISTRIES.get();
                if (null != registry) {
-                       LOG.info("Ensuring all FileSystem streams are closed 
for {}", Thread.currentThread().getName());
                        REGISTRIES.remove();
                        IOUtils.closeQuietly(registry);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/74b29f5a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index ff81827..d242d7a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -539,6 +539,7 @@ public class Task implements Runnable, TaskActions {
                        // ----------------------------
 
                        // activate safety net for task thread
+                       LOG.info("Creating FileSystem stream leak safety net 
for task {}", this);
                        
FileSystem.createAndSetFileSystemCloseableRegistryForThread();
 
                        // first of all, get a user-code classloader
@@ -763,7 +764,9 @@ public class Task implements Runnable, TaskActions {
 
                                // remove all files in the distributed cache
                                removeCachedFiles(distributedCacheEntries, 
fileCache);
+
                                // close and de-activate safety net for task 
thread
+                               LOG.info("Ensuring all FileSystem streams are 
closed for task {}", this);
                                
FileSystem.closeAndDisposeFileSystemCloseableRegistryForThread();
 
                                notifyFinalState();
@@ -1106,7 +1109,9 @@ public class Task implements Runnable, TaskActions {
                                        @Override
                                        public void run() {
                                                // activate safety net for 
checkpointing thread
+                                               LOG.debug("Creating FileSystem 
stream leak safety net for {}", Thread.currentThread().getName());
                                                
FileSystem.createAndSetFileSystemCloseableRegistryForThread();
+
                                                try {
                                                        boolean success = 
statefulTask.triggerCheckpoint(checkpointMetaData);
                                                        if (!success) {
@@ -1127,6 +1132,9 @@ public class Task implements Runnable, TaskActions {
                                                        }
                                                } finally {
                                                        // close and 
de-activate safety net for checkpointing thread
+                                                       LOG.debug("Ensuring all 
FileSystem streams are closed for {}",
+                                                                       
Thread.currentThread().getName());
+
                                                        
FileSystem.closeAndDisposeFileSystemCloseableRegistryForThread();
                                                }
                                        }

Reply via email to