SAMZA-817 - Add metrics to gauge effectiveness of host-affinity solution

Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/94f1e7ec
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/94f1e7ec
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/94f1e7ec

Branch: refs/heads/samza-sql
Commit: 94f1e7ecdcd6d0d15aac9e0a8a050ee183d89b68
Parents: 6bc141b
Author: Navina <[email protected]>
Authored: Thu Nov 19 00:23:31 2015 -0800
Committer: Navina <[email protected]>
Committed: Thu Nov 19 00:23:31 2015 -0800

----------------------------------------------------------------------
 .../apache/samza/container/SamzaContainer.scala | 13 +++-
 .../samza/container/SamzaContainerMetrics.scala | 12 +++-
 .../samza/container/TestSamzaContainer.scala    | 64 +++++++++++++++++++-
 .../job/yarn/AbstractContainerAllocator.java    |  1 +
 .../apache/samza/job/yarn/ContainerUtil.java    |  9 +++
 .../job/yarn/HostAwareContainerAllocator.java   |  2 +-
 .../apache/samza/job/yarn/SamzaAppState.java    |  4 ++
 .../samza/job/yarn/SamzaAppMasterMetrics.scala  |  9 +++
 8 files changed, 105 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/94f1e7ec/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 3787b85..db6074b 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -404,6 +404,7 @@ object SamzaContainer extends Logging {
             val systemConsumer = systemFactories
               .getOrElse(changeLogSystemStream.getSystem, throw new 
SamzaException("Changelog system %s for store %s does not exist in the config." 
format (changeLogSystemStream, storeName)))
               .getConsumer(changeLogSystemStream.getSystem, config, 
taskInstanceMetrics.registry)
+            samzaContainerMetrics.addStoreRestorationGauge(taskName, storeName)
             (storeName, systemConsumer)
         }.toMap
 
@@ -622,8 +623,16 @@ class SamzaContainer(
 
   def startStores {
     info("Starting task instance stores.")
-
-    taskInstances.values.foreach(_.startStores)
+    // Start each task's stores in turn and time every task separately, so the
+    // restore time can be reported per task.
+    taskInstances.values.foreach(taskInstance => {
+      val startTime = System.currentTimeMillis()
+      taskInstance.startStores
+      // Wall-clock milliseconds spent in startStores for this task.
+      val timeToRestore = System.currentTimeMillis() - startTime
+      // The Java map returns null when no gauge was registered for this task;
+      // Option(...) turns that into None so such tasks are simply skipped.
+      Option(metrics.taskStoreRestorationMetrics.get(taskInstance.taskName))
+        .foreach(_.set(timeToRestore))
+    })
   }
 
   def startTask {

http://git-wip-us.apache.org/repos/asf/samza/blob/94f1e7ec/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
index 127f3a1..6fae650 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
@@ -19,9 +19,9 @@
 
 package org.apache.samza.container
 
-import org.apache.samza.metrics.ReadableMetricsRegistry
-import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.metrics.MetricsHelper
+import java.util
+
+import org.apache.samza.metrics.{Gauge, ReadableMetricsRegistry, 
MetricsRegistryMap, MetricsHelper}
 
 class SamzaContainerMetrics(
   val source: String = "unknown",
@@ -38,4 +38,10 @@ class SamzaContainerMetrics(
   val processNs = newTimer("process-ns")
   val commitNs = newTimer("commit-ns")
   val utilization = newGauge("event-loop-utilization", 0.0F);
+
+  val taskStoreRestorationMetrics: util.Map[TaskName, Gauge[Long]] = new 
util.HashMap[TaskName, Gauge[Long]]()
+
+  /**
+   * Creates a gauge named "<taskName>-<storeName>-restore-time", initialised to -1,
+   * and records it in taskStoreRestorationMetrics under the task's name.
+   *
+   * The map holds one entry per task, so a later call for the same task replaces
+   * the gauge that was stored for it before.
+   *
+   * @param taskName  task the gauge belongs to; also the map key
+   * @param storeName store name used to build the gauge name
+   */
+  def addStoreRestorationGauge(taskName: TaskName, storeName: String) {
+    val gaugeName = "%s-%s-restore-time" format(taskName.toString, storeName)
+    val restoreGauge = newGauge(gaugeName, -1L)
+    taskStoreRestorationMetrics.put(taskName, restoreGauge)
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/94f1e7ec/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index d91b1da..365ff0a 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -20,6 +20,11 @@
 package org.apache.samza.container
 
 import java.util
+import org.apache.samza.storage.TaskStorageManager
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+import org.scalatest.mock.MockitoSugar
+
 import scala.collection.JavaConversions._
 import org.apache.samza.Partition
 import org.apache.samza.config.Config
@@ -30,7 +35,6 @@ import org.apache.samza.coordinator.server.JobServlet
 import org.apache.samza.job.model.ContainerModel
 import org.apache.samza.job.model.JobModel
 import org.apache.samza.job.model.TaskModel
-import org.apache.samza.serializers.SerdeManager
 import org.apache.samza.system.IncomingMessageEnvelope
 import org.apache.samza.system.StreamMetadataCache
 import org.apache.samza.system.SystemConsumer
@@ -53,10 +57,10 @@ import org.junit.Test
 import org.scalatest.junit.AssertionsForJUnit
 import java.lang.Thread.UncaughtExceptionHandler
 import org.apache.samza.serializers._
-import org.apache.samza.SamzaException
 import org.apache.samza.checkpoint.{Checkpoint, CheckpointManager}
+import org.mockito.Mockito._
 
-class TestSamzaContainer extends AssertionsForJUnit {
+class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
   @Test
   def testReadJobModel {
     val config = new MapConfig(Map("a" -> "b"))
@@ -202,6 +206,60 @@ class TestSamzaContainer extends AssertionsForJUnit {
     t.join
     assertTrue(caughtException)
   }
+
+  @Test
+  def testStartStoresIncrementsCounter {
+    // A task whose process() does nothing; only store start-up is exercised here.
+    val noOpTask = new StreamTask {
+      def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator) {
+      }
+    }
+    val emptyConfig = new MapConfig
+    val name = new TaskName("taskName")
+    val consumers = new SystemConsumers(new RoundRobinChooser, Map[String, SystemConsumer]())
+    val producers = new SystemProducers(Map[String, SystemProducer](), new SerdeManager)
+    val taskCollector = new TaskInstanceCollector(producers)
+    val context = new SamzaContainerContext(0, emptyConfig, Set[TaskName](name))
+
+    // Stub the storage manager's init so that it sleeps for a millisecond
+    // (presumably reached through startStores; the test relies on that delay).
+    val slowStorageManager = mock[TaskStorageManager]
+    val delayedInit = new Answer[String] {
+      override def answer(invocation: InvocationOnMock): String = {
+        Thread.sleep(1)
+        ""
+      }
+    }
+    when(slowStorageManager.init).thenAnswer(delayedInit)
+
+    val instance: TaskInstance = new TaskInstance(
+      noOpTask,
+      name,
+      emptyConfig,
+      new TaskInstanceMetrics,
+      null,
+      consumers,
+      taskCollector,
+      context,
+      storageManager = slowStorageManager
+    )
+    val metricsUnderTest = new SamzaContainerMetrics()
+    metricsUnderTest.addStoreRestorationGauge(name, "store")
+    val container = new SamzaContainer(
+      containerContext = context,
+      taskInstances = Map(name -> instance),
+      runLoop = null,
+      consumerMultiplexer = consumers,
+      producerMultiplexer = producers,
+      metrics = metricsUnderTest,
+      jmxServer = null
+    )
+    container.startStores
+
+    // The gauge registered for the task must exist and hold a value of at least 1.
+    val restoreGauges = metricsUnderTest.taskStoreRestorationMetrics
+    assertNotNull(restoreGauges)
+    assertNotNull(restoreGauges.get(name))
+    assertTrue(restoreGauges.get(name).getValue >= 1)
+  }
 }
 
 class MockCheckpointManager extends CheckpointManager {

http://git-wip-us.apache.org/repos/asf/samza/blob/94f1e7ec/samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java
index 6edd477..9ee2dac 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java
@@ -100,6 +100,7 @@ public abstract class AbstractContainerAllocator implements Runnable {
         expectedContainerId,
         preferredHost);
     containerRequestState.updateRequestState(request);
+    containerUtil.incrementContainerRequests();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/94f1e7ec/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerUtil.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerUtil.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerUtil.java
index 55dbfea..1fb6a5f 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerUtil.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerUtil.java
@@ -77,6 +77,15 @@ public class ContainerUtil {
     this.nmClient = nmClient;
   }
 
+  /**
+   * Adds one to the {@code containerRequests} counter held in {@code state}.
+   */
+  public void incrementContainerRequests() {
+    state.containerRequests.getAndIncrement();
+  }
+
+  /**
+   * Adds one to the {@code matchedContainerRequests} counter held in {@code state},
+   * then hands the container to {@link #runContainer(int, Container)}.
+   *
+   * @param samzaContainerId ID of the Samza container, passed through to runContainer
+   * @param container        container passed through to runContainer
+   */
+  public void runMatchedContainer(int samzaContainerId, Container container) {
+    state.matchedContainerRequests.getAndIncrement();
+    this.runContainer(samzaContainerId, container);
+  }
+
   public void runContainer(int samzaContainerId, Container container) {
     String containerIdStr = ConverterUtils.toString(container.getId());
     log.info("Got available container ID ({}) for container: {}", 
samzaContainerId, container);

http://git-wip-us.apache.org/repos/asf/samza/blob/94f1e7ec/samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java
index ad1587d..ff22dbf 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java
@@ -76,7 +76,7 @@ public class HostAwareContainerAllocator extends AbstractContainerAllocator {
             containerRequestState.updateStateAfterAssignment(request, 
preferredHost, container);
 
             log.info("Running {} on {}", expectedContainerId, 
container.getId());
-            containerUtil.runContainer(expectedContainerId, container);
+            containerUtil.runMatchedContainer(expectedContainerId, container);
           } else {
             // No allocated container on preferredHost
             log.info(

http://git-wip-us.apache.org/repos/asf/samza/blob/94f1e7ec/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java
index 3df927e..bc5b606 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java
@@ -158,6 +158,10 @@ public class SamzaAppState {
    */
   public AtomicBoolean jobHealthy = new AtomicBoolean(true);
 
+  public AtomicInteger containerRequests = new AtomicInteger(0);
+
+  public AtomicInteger matchedContainerRequests = new AtomicInteger(0);
+
   public SamzaAppState(JobCoordinator jobCoordinator,
                        int taskId,
                        ContainerId amContainerId,

http://git-wip-us.apache.org/repos/asf/samza/blob/94f1e7ec/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
index ae5a674..054d8b6 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
@@ -76,6 +76,15 @@ class SamzaAppMasterMetrics(
     val mRpcPort = newGauge("rpc-port", () => state.rpcUrl.getPort)
     val mAppAttemptId = newGauge("app-attempt-id", () => 
state.appAttemptId.toString)
     val mJobHealthy = newGauge("job-healthy", () => if 
(state.jobHealthy.get()) 1 else 0)
+    // Ratio of matchedContainerRequests to containerRequests, reported as a Float.
+    // Reports 0 while no container requests have been counted.
+    val mLocalityMatchedRequests = newGauge(
+      "locality-matched",
+      () => {
+        // Read the denominator once so the zero check and the division use the same value.
+        val requested = state.containerRequests.get()
+        if (requested != 0) {
+          // Convert before dividing: Int / Int truncates, which would report 0
+          // for every match rate below 100%.
+          state.matchedContainerRequests.get().toFloat / requested
+        } else {
+          0.0F
+        }
+      })
 
     jvm.start
     reporters.values.foreach(_.start)

Reply via email to