[FLINK-5895] [runtime] Decrease logging aggressiveness of FileSystemSafetyNet
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f6e6e7ec Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f6e6e7ec Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f6e6e7ec Branch: refs/heads/master Commit: f6e6e7ecf4d287f76698302417a9ff2ffc869477 Parents: 15ae922 Author: Stephan Ewen <[email protected]> Authored: Thu Feb 23 16:21:46 2017 +0100 Committer: Stephan Ewen <[email protected]> Committed: Fri Feb 24 12:15:18 2017 +0100 ---------------------------------------------------------------------- .../java/org/apache/flink/core/fs/FileSystemSafetyNet.java | 7 ------- .../main/java/org/apache/flink/runtime/taskmanager/Task.java | 7 +++++++ 2 files changed, 7 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f6e6e7ec/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java index b18cb13..eb28504 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java @@ -21,9 +21,6 @@ package org.apache.flink.core.fs; import org.apache.flink.annotation.Internal; import org.apache.flink.util.IOUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.net.URI; import static org.apache.flink.util.Preconditions.checkState; @@ -65,8 +62,6 @@ import static org.apache.flink.util.Preconditions.checkState; @Internal public class FileSystemSafetyNet { - private static final Logger LOG = LoggerFactory.getLogger(FileSystemSafetyNet.class); - /** The map from thread to the safety net registry for that thread */ private static final ThreadLocal<SafetyNetCloseableRegistry> REGISTRIES = new ThreadLocal<>(); @@ -93,7 +88,6 @@ public class FileSystemSafetyNet { SafetyNetCloseableRegistry newRegistry = new SafetyNetCloseableRegistry(); REGISTRIES.set(newRegistry); - LOG.info("Created new CloseableRegistry {} for {}", newRegistry, Thread.currentThread().getName()); } /** @@ -107,7 +101,6 @@ public class FileSystemSafetyNet { public static void closeSafetyNetAndGuardedResourcesForThread() { SafetyNetCloseableRegistry registry = REGISTRIES.get(); if (null != registry) { - LOG.info("Ensuring all FileSystem streams are closed for {}", Thread.currentThread().getName()); REGISTRIES.remove(); IOUtils.closeQuietly(registry); } http://git-wip-us.apache.org/repos/asf/flink/blob/f6e6e7ec/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index c9f17b8..8732c60 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -553,6 +553,7 @@ public class Task implements Runnable, TaskActions { // ---------------------------- // activate safety net for task thread + LOG.info("Creating FileSystem stream leak safety net for task {}", this); FileSystemSafetyNet.initializeSafetyNetForThread(); // first of all, get a user-code classloader @@ -792,6 +793,7 @@ public class Task implements Runnable, TaskActions { removeCachedFiles(distributedCacheEntries, fileCache); // close and de-activate safety net for task thread + LOG.info("Ensuring all FileSystem streams are closed for task {}", this); FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread(); notifyFinalState(); @@ -1138,7 +1140,9 @@ public class Task implements Runnable, TaskActions { @Override public void run() { // activate safety net for checkpointing thread + LOG.debug("Creating FileSystem stream leak safety net for {}", Thread.currentThread().getName()); FileSystemSafetyNet.initializeSafetyNetForThread(); + try { boolean success = statefulTask.triggerCheckpoint(checkpointMetaData, checkpointOptions); if (!success) { @@ -1159,6 +1163,9 @@ public class Task implements Runnable, TaskActions { } } finally { // close and de-activate safety net for checkpointing thread + LOG.debug("Ensuring all FileSystem streams are closed for {}", + Thread.currentThread().getName()); + FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread(); } }
