SAMZA-817 - Add metrics to gauge effectiveness of host-affinity solution
Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/94f1e7ec Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/94f1e7ec Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/94f1e7ec Branch: refs/heads/samza-sql Commit: 94f1e7ecdcd6d0d15aac9e0a8a050ee183d89b68 Parents: 6bc141b Author: Navina <[email protected]> Authored: Thu Nov 19 00:23:31 2015 -0800 Committer: Navina <[email protected]> Committed: Thu Nov 19 00:23:31 2015 -0800 ---------------------------------------------------------------------- .../apache/samza/container/SamzaContainer.scala | 13 +++- .../samza/container/SamzaContainerMetrics.scala | 12 +++- .../samza/container/TestSamzaContainer.scala | 64 +++++++++++++++++++- .../job/yarn/AbstractContainerAllocator.java | 1 + .../apache/samza/job/yarn/ContainerUtil.java | 9 +++ .../job/yarn/HostAwareContainerAllocator.java | 2 +- .../apache/samza/job/yarn/SamzaAppState.java | 4 ++ .../samza/job/yarn/SamzaAppMasterMetrics.scala | 9 +++ 8 files changed, 105 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/94f1e7ec/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index 3787b85..db6074b 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -404,6 +404,7 @@ object SamzaContainer extends Logging { val systemConsumer = systemFactories .getOrElse(changeLogSystemStream.getSystem, throw new SamzaException("Changelog system %s for store %s does not exist in the config."
format (changeLogSystemStream, storeName))) .getConsumer(changeLogSystemStream.getSystem, config, taskInstanceMetrics.registry) + samzaContainerMetrics.addStoreRestorationGauge(taskName, storeName) /* called once per changelog store of this task, before the store's consumer is returned */ (storeName, systemConsumer) }.toMap @@ -622,8 +623,16 @@ class SamzaContainer( def startStores { info("Starting task instance stores.") - - taskInstances.values.foreach(_.startStores) + taskInstances.values.foreach(taskInstance => { + val startTime = System.currentTimeMillis() /* wall-clock start of this task's whole startStores call */ + taskInstance.startStores + // Measuring the time to restore the stores + val timeToRestore = System.currentTimeMillis() - startTime /* elapsed milliseconds */ + val taskGauge = metrics.taskStoreRestorationMetrics.getOrElse(taskInstance.taskName, null) /* null when no restore-time gauge was registered for this task (gauges are only added for changelog stores) */ + if (taskGauge != null) { + taskGauge.set(timeToRestore) + } + }) } def startTask { http://git-wip-us.apache.org/repos/asf/samza/blob/94f1e7ec/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala index 127f3a1..6fae650 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala @@ -19,9 +19,9 @@ package org.apache.samza.container -import org.apache.samza.metrics.ReadableMetricsRegistry -import org.apache.samza.metrics.MetricsRegistryMap -import org.apache.samza.metrics.MetricsHelper +import java.util + +import org.apache.samza.metrics.{Gauge, ReadableMetricsRegistry, MetricsRegistryMap, MetricsHelper} class SamzaContainerMetrics( val source: String = "unknown", @@ -38,4 +38,10 @@ class SamzaContainerMetrics( val processNs = newTimer("process-ns") val commitNs = newTimer("commit-ns") val utilization = newGauge("event-loop-utilization", 0.0F); + + val taskStoreRestorationMetrics:
util.Map[TaskName, Gauge[Long]] = new util.HashMap[TaskName, Gauge[Long]]() /* restore time in ms per task; stays -1 until that task's stores have been restored */ + + def addStoreRestorationGauge(taskName: TaskName, storeName: String) { /* one gauge per task: startStores times all of a task's stores together, and a per-store gauge kept in this per-task map would be overwritten by each later store, leaving the earlier ones registered at -1 forever. storeName is kept for call-site compatibility. */ + if (!taskStoreRestorationMetrics.containsKey(taskName)) taskStoreRestorationMetrics.put(taskName, newGauge("%s-restore-time" format taskName.toString, -1L)) + } } http://git-wip-us.apache.org/repos/asf/samza/blob/94f1e7ec/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala index d91b1da..365ff0a 100644 --- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala +++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala @@ -20,6 +20,11 @@ package org.apache.samza.container import java.util +import org.apache.samza.storage.TaskStorageManager +import org.mockito.invocation.InvocationOnMock +import org.mockito.stubbing.Answer +import org.scalatest.mock.MockitoSugar + import scala.collection.JavaConversions._ import org.apache.samza.Partition import org.apache.samza.config.Config @@ -30,7 +35,6 @@ import org.apache.samza.coordinator.server.JobServlet import org.apache.samza.job.model.ContainerModel import org.apache.samza.job.model.JobModel import org.apache.samza.job.model.TaskModel -import org.apache.samza.serializers.SerdeManager import org.apache.samza.system.IncomingMessageEnvelope import org.apache.samza.system.StreamMetadataCache import org.apache.samza.system.SystemConsumer @@ -53,10 +57,10 @@ import org.junit.Test import org.scalatest.junit.AssertionsForJUnit import java.lang.Thread.UncaughtExceptionHandler import org.apache.samza.serializers._ -import org.apache.samza.SamzaException import org.apache.samza.checkpoint.{Checkpoint, CheckpointManager} +import org.mockito.Mockito._ -class TestSamzaContainer extends AssertionsForJUnit { +class
TestSamzaContainer extends AssertionsForJUnit with MockitoSugar { @Test def testReadJobModel { val config = new MapConfig(Map("a" -> "b")) @@ -202,6 +206,60 @@ class TestSamzaContainer extends AssertionsForJUnit { t.join assertTrue(caughtException) } + + @Test + def testStartStoresUpdatesRestoreTimeGauge { + val task = new StreamTask { + def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator) { + } + } + val config = new MapConfig + val taskName = new TaskName("taskName") + val consumerMultiplexer = new SystemConsumers( + new RoundRobinChooser, + Map[String, SystemConsumer]()) + val producerMultiplexer = new SystemProducers( + Map[String, SystemProducer](), + new SerdeManager) + val collector = new TaskInstanceCollector(producerMultiplexer) + val containerContext = new SamzaContainerContext(0, config, Set[TaskName](taskName)) + val mockTaskStorageManager = mock[TaskStorageManager] + + when(mockTaskStorageManager.init).thenAnswer(new Answer[String] { + override def answer(invocation: InvocationOnMock): String = { + Thread.sleep(1) /* makes the measured restore time expected to be >= 1 ms for the assertion below */ + "" + } + }) + + val taskInstance: TaskInstance = new TaskInstance( + task, + taskName, + config, + new TaskInstanceMetrics, + null, + consumerMultiplexer, + collector, + containerContext, + storageManager = mockTaskStorageManager + ) + val containerMetrics = new SamzaContainerMetrics() + containerMetrics.addStoreRestorationGauge(taskName, "store") + val container = new SamzaContainer( + containerContext = containerContext, + taskInstances = Map(taskName -> taskInstance), + runLoop = null, + consumerMultiplexer = consumerMultiplexer, + producerMultiplexer = producerMultiplexer, + metrics = containerMetrics, + jmxServer = null + ) + container.startStores + assertNotNull(containerMetrics.taskStoreRestorationMetrics) + assertNotNull(containerMetrics.taskStoreRestorationMetrics.get(taskName)) + assertTrue(containerMetrics.taskStoreRestorationMetrics.get(taskName).getValue >= 1) + + } } class
MockCheckpointManager extends CheckpointManager { http://git-wip-us.apache.org/repos/asf/samza/blob/94f1e7ec/samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java index 6edd477..9ee2dac 100644 --- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java +++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java @@ -100,6 +100,7 @@ public abstract class AbstractContainerAllocator implements Runnable { expectedContainerId, preferredHost); containerRequestState.updateRequestState(request); + containerUtil.incrementContainerRequests(); /* one increment per request recorded in containerRequestState */ } /** http://git-wip-us.apache.org/repos/asf/samza/blob/94f1e7ec/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerUtil.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerUtil.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerUtil.java index 55dbfea..1fb6a5f 100644 --- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerUtil.java +++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerUtil.java @@ -77,6 +77,15 @@ public class ContainerUtil { this.nmClient = nmClient; } + public void incrementContainerRequests() { /* bumps state.containerRequests, the denominator of the "locality-matched" gauge */ + state.containerRequests.incrementAndGet(); + } + + public void runMatchedContainer(int samzaContainerId, Container container) { /* same as runContainer, but first counts the container as matched to its preferred host (state.matchedContainerRequests) */ + state.matchedContainerRequests.incrementAndGet(); + runContainer(samzaContainerId, container); + } + public void runContainer(int samzaContainerId, Container container) { String containerIdStr = ConverterUtils.toString(container.getId()); log.info("Got available container ID ({}) for container: {}", samzaContainerId,
container); http://git-wip-us.apache.org/repos/asf/samza/blob/94f1e7ec/samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java index ad1587d..ff22dbf 100644 --- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java +++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java @@ -76,7 +76,7 @@ public class HostAwareContainerAllocator extends AbstractContainerAllocator { containerRequestState.updateStateAfterAssignment(request, preferredHost, container); log.info("Running {} on {}", expectedContainerId, container.getId()); - containerUtil.runContainer(expectedContainerId, container); + containerUtil.runMatchedContainer(expectedContainerId, container); /* this branch ran the container on its preferredHost, so count it as a locality match */ } else { // No allocated container on preferredHost log.info( http://git-wip-us.apache.org/repos/asf/samza/blob/94f1e7ec/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java index 3df927e..bc5b606 100644 --- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java +++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java @@ -158,6 +158,10 @@ public class SamzaAppState { */ public AtomicBoolean jobHealthy = new AtomicBoolean(true); + public AtomicInteger containerRequests = new AtomicInteger(0); /* number of container requests made by the allocator */ + + public AtomicInteger matchedContainerRequests = new AtomicInteger(0); /* number of containers run on their preferred host */ + public SamzaAppState(JobCoordinator jobCoordinator, int taskId, ContainerId amContainerId,
http://git-wip-us.apache.org/repos/asf/samza/blob/94f1e7ec/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala index ae5a674..054d8b6 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala @@ -76,6 +76,15 @@ class SamzaAppMasterMetrics( val mRpcPort = newGauge("rpc-port", () => state.rpcUrl.getPort) val mAppAttemptId = newGauge("app-attempt-id", () => state.appAttemptId.toString) val mJobHealthy = newGauge("job-healthy", () => if (state.jobHealthy.get()) 1 else 0) + val mLocalityMatchedRequests = newGauge( /* fraction of container requests that were run on their preferred host; 0.0 before any request is made */ + "locality-matched", + () => { + if (state.containerRequests.get() != 0) { + state.matchedContainerRequests.get().toDouble / state.containerRequests.get() /* floating-point division: Int / Int would truncate any partial match ratio to 0 */ + } else { + 0.0 + } + }) jvm.start reporters.values.foreach(_.start)
