Updated Branches:
  refs/heads/master 15c4587b4 -> afc2fb7af

SAMZA-96; add names to daemon threads for JvmMetrics and MetricsSnapshotReporter


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/afc2fb7a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/afc2fb7a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/afc2fb7a

Branch: refs/heads/master
Commit: afc2fb7af974aa7780f8ea57a202fe7202780548
Parents: 15c4587
Author: Steve Yates <[email protected]>
Authored: Thu Feb 6 09:51:33 2014 -0800
Committer: Chris Riccomini <[email protected]>
Committed: Thu Feb 6 09:51:33 2014 -0800

----------------------------------------------------------------------
 .../org/apache/samza/metrics/JvmMetrics.scala   | 10 ++++--
 .../reporter/MetricsSnapshotReporter.scala      |  9 ++++-
 .../apache/samza/util/DaemonThreadFactory.scala | 11 +++++-
 .../samza/util/TestDaemonThreadFactory.scala    | 36 ++++++++++++++++++++
 .../apache/samza/system/kafka/BrokerProxy.scala |  9 +++++
 5 files changed, 71 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/afc2fb7a/samza-core/src/main/scala/org/apache/samza/metrics/JvmMetrics.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/metrics/JvmMetrics.scala 
b/samza-core/src/main/scala/org/apache/samza/metrics/JvmMetrics.scala
index 301a5a0..ed1e8af 100644
--- a/samza-core/src/main/scala/org/apache/samza/metrics/JvmMetrics.scala
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/JvmMetrics.scala
@@ -26,10 +26,16 @@ import java.lang.Thread.State._
 import java.util.concurrent.Executors
 import java.util.concurrent.TimeUnit
 import grizzled.slf4j.Logging
-import org.apache.samza.util.Util
 import org.apache.samza.util.DaemonThreadFactory
 
 /**
+ *  Companion object for class JvmMetrics encapsulating various constants
+ */
+object JvmMetrics {
+  val JVM_METRICS_THREAD_NAME_PREFIX = "JVM-METRICS"
+}
+
+/**
  * Straight up ripoff of Hadoop's metrics2 JvmMetrics class.
  */
 class JvmMetrics(val registry: MetricsRegistry) extends MetricsHelper with 
Runnable with Logging {
@@ -39,7 +45,7 @@ class JvmMetrics(val registry: MetricsRegistry) extends 
MetricsHelper with Runna
   val gcBeans = ManagementFactory.getGarbageCollectorMXBeans()
   val threadMXBean = ManagementFactory.getThreadMXBean()
   var gcBeanCounters = Map[String, (Counter, Counter)]()
-  val executor = Executors.newScheduledThreadPool(1, new DaemonThreadFactory)
+  val executor = Executors.newScheduledThreadPool(1, new 
DaemonThreadFactory(JvmMetrics.JVM_METRICS_THREAD_NAME_PREFIX))
 
   // jvm metrics
   val gMemNonHeapUsedM = newGauge("mem-non-heap-used-mb", 0.0F)

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/afc2fb7a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala
 
b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala
index 15af7aa..9a56754 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala
@@ -37,6 +37,13 @@ import org.apache.samza.system.SystemStream
 import org.apache.samza.system.OutgoingMessageEnvelope
 
 /**
+ *  Companion object for class MetricsSnapshotReporter encapsulating various 
constants
+ */
+object MetricsSnapshotReporter {
+  val METRIC_SNAPSHOT_REPORTER_THREAD_NAME_PREFIX = "METRIC-SNAPSHOT-REPORTER"
+}
+
+/**
  * MetricsSnapshotReporter is a generic metrics reporter that sends metrics to 
a stream.
  *
  * jobName // my-samza-job
@@ -57,7 +64,7 @@ class MetricsSnapshotReporter(
   serializer: Serializer[MetricsSnapshot] = null,
   clock: () => Long = () => { System.currentTimeMillis }) extends 
MetricsReporter with Runnable with Logging {
 
-  val executor = Executors.newScheduledThreadPool(1, new DaemonThreadFactory)
+  val executor = Executors.newScheduledThreadPool(1, new 
DaemonThreadFactory(MetricsSnapshotReporter.METRIC_SNAPSHOT_REPORTER_THREAD_NAME_PREFIX))
   val resetTime = clock()
   var registries = List[(String, ReadableMetricsRegistry)]()
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/afc2fb7a/samza-core/src/main/scala/org/apache/samza/util/DaemonThreadFactory.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/util/DaemonThreadFactory.scala 
b/samza-core/src/main/scala/org/apache/samza/util/DaemonThreadFactory.scala
index 04e67a2..d2015ab 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/DaemonThreadFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/DaemonThreadFactory.scala
@@ -21,10 +21,19 @@ package org.apache.samza.util
 
 import java.util.concurrent.ThreadFactory
 
-class DaemonThreadFactory extends ThreadFactory {
+
+object ThreadNamePrefix {
+  val SAMZA_THREAD_NAME_PREFIX = "SAMZA-"
+}
+
+class DaemonThreadFactory(name: String) extends ThreadFactory {
+
   def newThread(r: Runnable) = {
     val thread = new Thread(r)
     thread.setDaemon(true)
+    if (name.nonEmpty) {
+      thread.setName(ThreadNamePrefix.SAMZA_THREAD_NAME_PREFIX+name)
+    }
     thread
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/afc2fb7a/samza-core/src/test/scala/org/apache/samza/util/TestDaemonThreadFactory.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/scala/org/apache/samza/util/TestDaemonThreadFactory.scala 
b/samza-core/src/test/scala/org/apache/samza/util/TestDaemonThreadFactory.scala
new file mode 100644
index 0000000..6353378
--- /dev/null
+++ 
b/samza-core/src/test/scala/org/apache/samza/util/TestDaemonThreadFactory.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.util
+
+import org.junit.Assert._
+import org.junit.Test
+
+class TestDaemonThreadFactory {
+  @Test
+  def testDaemonThreadFactoryCanCreatThreadGivenName() {
+    val testThreadName = "JvmMetrics"
+    val dtf = new DaemonThreadFactory(testThreadName)
+    val threadWithName = dtf.newThread(new Runnable {
+      def run() {
+        //Not testing this particular method
+      }
+    })
+    assertEquals(threadWithName.getName, 
ThreadNamePrefix.SAMZA_THREAD_NAME_PREFIX+testThreadName)
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/afc2fb7a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
index 89730db..5095e70 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
@@ -31,6 +31,14 @@ import java.nio.channels.ClosedByInterruptException
 import java.util.Map.Entry
 import scala.collection.mutable
 import kafka.consumer.ConsumerConfig
+import org.apache.samza.util.ThreadNamePrefix.SAMZA_THREAD_NAME_PREFIX
+
+/**
+ *  Companion object for class JvmMetrics encapsulating various constants
+ */
+object BrokerProxy {
+  val BROKER_PROXY_THREAD_NAME_PREFIX = "BROKER-PROXY"
+}
 
 /**
  * A BrokerProxy consolidates Kafka fetches meant for a particular broker and 
retrieves them all at once, providing
@@ -238,6 +246,7 @@ abstract class BrokerProxy(
     info("Starting " + toString)
 
     thread.setDaemon(true)
+    
thread.setName(SAMZA_THREAD_NAME_PREFIX+BrokerProxy.BROKER_PROXY_THREAD_NAME_PREFIX)
     thread.start
   }
 

Reply via email to