Repository: flink Updated Branches: refs/heads/master 5fa389014 -> 62192c783
[hotfix] [metrics] Supply name to view/reporter thread Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/57208e65 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/57208e65 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/57208e65 Branch: refs/heads/master Commit: 57208e650f94edece537ccdeb1a2c19b4f7ccca8 Parents: 5fa3890 Author: zentol <[email protected]> Authored: Mon Nov 14 15:53:47 2016 +0100 Committer: zentol <[email protected]> Committed: Mon Nov 14 15:53:47 2016 +0100 ---------------------------------------------------------------------- .../flink/runtime/metrics/MetricRegistry.java | 25 +++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/57208e65/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java index b514fd1..c333c26 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java @@ -42,7 +42,9 @@ import java.util.List; import java.util.TimerTask; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; /** * A MetricRegistry keeps track of all registered {@link Metric Metrics}. It serves as the @@ -73,7 +75,7 @@ public class MetricRegistry { List<Tuple2<String, Configuration>> reporterConfigurations = config.getReporterConfigurations(); - this.executor = Executors.newSingleThreadScheduledExecutor(); + this.executor = Executors.newSingleThreadScheduledExecutor(new MetricRegistryThreadFactory()); if (reporterConfigurations.isEmpty()) { // no reporters defined @@ -313,4 +315,25 @@ public class MetricRegistry { } } } + + private static final class MetricRegistryThreadFactory implements ThreadFactory { + private final ThreadGroup group; + private final AtomicInteger threadNumber = new AtomicInteger(1); + + MetricRegistryThreadFactory() { + SecurityManager s = System.getSecurityManager(); + group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); + } + + public Thread newThread(Runnable r) { + Thread t = new Thread(group, r, "Flink-MetricRegistry-" + threadNumber.incrementAndGet(), 0); + if (t.isDaemon()) { + t.setDaemon(false); + } + if (t.getPriority() != Thread.NORM_PRIORITY) { + t.setPriority(Thread.NORM_PRIORITY); + } + return t; + } + } }
