[FLINK-7813] [metrics] Replace MetricRegistryThreadFactory. This closes #4803.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e14f2dbd Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e14f2dbd Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e14f2dbd Branch: refs/heads/master Commit: e14f2dbddbf1b4c4dc29cf20e47b6b04c5876109 Parents: 7d026aa Author: zentol <[email protected]> Authored: Wed Oct 11 15:13:00 2017 +0200 Committer: zentol <[email protected]> Committed: Wed Oct 18 12:51:00 2017 +0200 ---------------------------------------------------------------------- .../flink/runtime/metrics/MetricRegistry.java | 26 ++------------------ 1 file changed, 2 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e14f2dbd/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java index ec4cf1c..2e07370 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java @@ -33,6 +33,7 @@ import org.apache.flink.runtime.metrics.dump.MetricQueryService; import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup; import org.apache.flink.runtime.metrics.groups.FrontMetricGroup; import org.apache.flink.runtime.metrics.scope.ScopeFormats; +import org.apache.flink.runtime.util.ExecutorThreadFactory; import org.apache.flink.util.Preconditions; import akka.actor.ActorRef; @@ -47,9 +48,7 @@ import java.util.List; import java.util.TimerTask; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadFactory; import 
java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import scala.concurrent.Await; import scala.concurrent.Future; @@ -86,7 +85,7 @@ public class MetricRegistry { List<Tuple2<String, Configuration>> reporterConfigurations = config.getReporterConfigurations(); - this.executor = Executors.newSingleThreadScheduledExecutor(new MetricRegistryThreadFactory()); + this.executor = Executors.newSingleThreadScheduledExecutor(new ExecutorThreadFactory("Flink-MetricRegistry")); if (reporterConfigurations.isEmpty()) { // no reporters defined @@ -402,25 +401,4 @@ public class MetricRegistry { } } } - - private static final class MetricRegistryThreadFactory implements ThreadFactory { - private final ThreadGroup group; - private final AtomicInteger threadNumber = new AtomicInteger(1); - - MetricRegistryThreadFactory() { - SecurityManager s = System.getSecurityManager(); - group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); - } - - public Thread newThread(Runnable r) { - Thread t = new Thread(group, r, "Flink-MetricRegistry-" + threadNumber.getAndIncrement(), 0); - if (t.isDaemon()) { - t.setDaemon(false); - } - if (t.getPriority() != Thread.NORM_PRIORITY) { - t.setPriority(Thread.NORM_PRIORITY); - } - return t; - } - } }
