Repository: flink Updated Branches: refs/heads/release-1.2 db3c5f388 -> b5ec14641
[FLINK-5895] [runtime] Decrease logging aggressiveness of FileSystemSafetyNet Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/74b29f5a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/74b29f5a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/74b29f5a Branch: refs/heads/release-1.2 Commit: 74b29f5a3dc4f1413bbf8addb6b4234a5bfe6581 Parents: db3c5f3 Author: Stephan Ewen <[email protected]> Authored: Thu Feb 23 16:21:46 2017 +0100 Committer: Stephan Ewen <[email protected]> Committed: Thu Feb 23 16:26:01 2017 +0100 ---------------------------------------------------------------------- .../src/main/java/org/apache/flink/core/fs/FileSystem.java | 6 ------ .../main/java/org/apache/flink/runtime/taskmanager/Task.java | 8 ++++++++ 2 files changed, 8 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/74b29f5a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java index d8efcbc..991c718 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java @@ -33,8 +33,6 @@ import org.apache.flink.core.fs.local.LocalFileSystem; import org.apache.flink.util.IOUtils; import org.apache.flink.util.OperatingSystem; import org.apache.flink.util.Preconditions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import javax.annotation.Nullable; import java.io.File; @@ -86,8 +84,6 @@ public abstract class FileSystem { private static final String HADOOP_WRAPPER_SCHEME = "hdwrapper"; - private static final Logger LOG = LoggerFactory.getLogger(FileSystem.class); - /** This lock guards the methods {@link #initOutPathLocalFS(Path, WriteMode, boolean)} and * {@link #initOutPathDistFS(Path, WriteMode, boolean)} which are otherwise susceptible to races */ private static final ReentrantLock OUTPUT_DIRECTORY_INIT_LOCK = new ReentrantLock(true); @@ -107,7 +103,6 @@ public abstract class FileSystem { SafetyNetCloseableRegistry newRegistry = new SafetyNetCloseableRegistry(); REGISTRIES.set(newRegistry); - LOG.info("Created new CloseableRegistry " + newRegistry + " for {}", Thread.currentThread().getName()); } /** @@ -118,7 +113,6 @@ public abstract class FileSystem { public static void closeAndDisposeFileSystemCloseableRegistryForThread() { SafetyNetCloseableRegistry registry = REGISTRIES.get(); if (null != registry) { - LOG.info("Ensuring all FileSystem streams are closed for {}", Thread.currentThread().getName()); REGISTRIES.remove(); IOUtils.closeQuietly(registry); } http://git-wip-us.apache.org/repos/asf/flink/blob/74b29f5a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index ff81827..d242d7a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -539,6 +539,7 @@ public class Task implements Runnable, TaskActions { // ---------------------------- // activate safety net for task thread + LOG.info("Creating FileSystem stream leak safety net for task {}", this); FileSystem.createAndSetFileSystemCloseableRegistryForThread(); // first of all, get a user-code classloader @@ -763,7 +764,9 @@ public class Task implements Runnable, TaskActions { // remove all files in the distributed cache removeCachedFiles(distributedCacheEntries, fileCache); + // close and de-activate safety net for task thread + LOG.info("Ensuring all FileSystem streams are closed for task {}", this); FileSystem.closeAndDisposeFileSystemCloseableRegistryForThread(); notifyFinalState(); @@ -1106,7 +1109,9 @@ public class Task implements Runnable, TaskActions { @Override public void run() { // activate safety net for checkpointing thread + LOG.debug("Creating FileSystem stream leak safety net for {}", Thread.currentThread().getName()); FileSystem.createAndSetFileSystemCloseableRegistryForThread(); + try { boolean success = statefulTask.triggerCheckpoint(checkpointMetaData); if (!success) { @@ -1127,6 +1132,9 @@ public class Task implements Runnable, TaskActions { } } finally { // close and de-activate safety net for checkpointing thread + LOG.debug("Ensuring all FileSystem streams are closed for {}", + Thread.currentThread().getName()); + FileSystem.closeAndDisposeFileSystemCloseableRegistryForThread(); } }
