Repository: flink
Updated Branches:
  refs/heads/master 5fa389014 -> 62192c783


[hotfix] [metrics] Supply name to view/reporter thread


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/57208e65
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/57208e65
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/57208e65

Branch: refs/heads/master
Commit: 57208e650f94edece537ccdeb1a2c19b4f7ccca8
Parents: 5fa3890
Author: zentol <[email protected]>
Authored: Mon Nov 14 15:53:47 2016 +0100
Committer: zentol <[email protected]>
Committed: Mon Nov 14 15:53:47 2016 +0100

----------------------------------------------------------------------
 .../flink/runtime/metrics/MetricRegistry.java   | 25 +++++++++++++++++++-
 1 file changed, 24 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/57208e65/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
index b514fd1..c333c26 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
@@ -42,7 +42,9 @@ import java.util.List;
 import java.util.TimerTask;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * A MetricRegistry keeps track of all registered {@link Metric Metrics}. It 
serves as the
@@ -73,7 +75,7 @@ public class MetricRegistry {
 
                List<Tuple2<String, Configuration>> reporterConfigurations = 
config.getReporterConfigurations();
 
-               this.executor = Executors.newSingleThreadScheduledExecutor();
+               this.executor = Executors.newSingleThreadScheduledExecutor(new 
MetricRegistryThreadFactory());
 
                if (reporterConfigurations.isEmpty()) {
                        // no reporters defined
@@ -313,4 +315,25 @@ public class MetricRegistry {
                        }
                }
        }
+
+       /**
+        * {@link ThreadFactory} that gives the threads it creates a recognizable name of the
+        * form "Flink-MetricRegistry-N", where N starts at 1 and increases with each thread.
+        *
+        * <p>Every created thread is forced to be a non-daemon thread with normal priority,
+        * regardless of the settings of the thread that triggered its creation.
+        */
+       private static final class MetricRegistryThreadFactory implements ThreadFactory {
+               /** Thread group that every created thread is placed in. */
+               private final ThreadGroup group;
+               /** Name suffix of the next thread to be created; the first thread gets 1. */
+               private final AtomicInteger threadNumber = new AtomicInteger(1);
+
+               MetricRegistryThreadFactory() {
+                       // Prefer the security manager's thread group when one is installed,
+                       // otherwise fall back to the group of the thread creating this factory.
+                       SecurityManager s = System.getSecurityManager();
+                       group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
+               }
+
+               /**
+                * Creates a new, not yet started, named thread for the given task.
+                *
+                * @param r the task the new thread will run
+                * @return a non-daemon thread with {@link Thread#NORM_PRIORITY}
+                */
+               @Override
+               public Thread newThread(Runnable r) {
+                       // getAndIncrement (not incrementAndGet): the counter starts at 1, so the
+                       // first thread must be named "...-1" rather than "...-2".
+                       Thread t = new Thread(group, r, "Flink-MetricRegistry-" + threadNumber.getAndIncrement(), 0);
+                       // Normalize the daemon flag and priority of the new thread.
+                       if (t.isDaemon()) {
+                               t.setDaemon(false);
+                       }
+                       if (t.getPriority() != Thread.NORM_PRIORITY) {
+                               t.setPriority(Thread.NORM_PRIORITY);
+                       }
+                       return t;
+               }
+       }
 }

Reply via email to