Repository: samza
Updated Branches:
  refs/heads/master 4c55a837f -> 2b7c0bdd5


SAMZA-991: Continue to report old AppMaster metrics to maintain backward 
compatibility


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/2b7c0bdd
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/2b7c0bdd
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/2b7c0bdd

Branch: refs/heads/master
Commit: 2b7c0bdd5b4c8fa94e21d77a79acd403aa53cb83
Parents: 4c55a83
Author: Jagadish Venkatraman <[email protected]>
Authored: Tue Aug 2 16:58:27 2016 -0700
Committer: Yi Pan (Data Infrastructure) <[email protected]>
Committed: Tue Aug 2 16:58:27 2016 -0700

----------------------------------------------------------------------
 .../job/yarn/YarnClusterResourceManager.java    |  5 ++
 .../samza/job/yarn/SamzaAppMasterMetrics.scala  | 83 ++++++++++++++++++++
 2 files changed, 88 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/2b7c0bdd/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
 
b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
index 1fac7f4..96d3d7c 100644
--- 
a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
+++ 
b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
@@ -101,6 +101,8 @@ public class YarnClusterResourceManager extends 
ClusterResourceManager implement
   private final ConcurrentHashMap<SamzaResource, Container> allocatedResources 
= new ConcurrentHashMap<>();
   private final ConcurrentHashMap<SamzaResourceRequest, 
AMRMClient.ContainerRequest> requestsMap = new ConcurrentHashMap<>();
 
+  private final SamzaAppMasterMetrics metrics;
+
   final AtomicBoolean started = new AtomicBoolean(false);
   private final Object lock = new Object();
 
@@ -119,6 +121,7 @@ public class YarnClusterResourceManager extends 
ClusterResourceManager implement
     hConfig.set("fs.http.impl", HttpFileSystem.class.getName());
 
     MetricsRegistryMap registry = new MetricsRegistryMap();
+    metrics = new SamzaAppMasterMetrics(config, samzaAppState, registry);
 
     // parse configs from the Yarn environment
     String containerIdStr = 
System.getenv(ApplicationConstants.Environment.CONTAINER_ID.toString());
@@ -157,6 +160,7 @@ public class YarnClusterResourceManager extends 
ClusterResourceManager implement
       log.info("Attempting to start an already started ContainerManager");
       return;
     }
+    metrics.start();
     service.onInit();
     log.info("Starting YarnContainerManager.");
     amClient.init(hConfig);
@@ -317,6 +321,7 @@ public class YarnClusterResourceManager extends 
ClusterResourceManager implement
     amClient.stop();
     log.info("Stopping the AM service " );
     service.onShutdown();
+    metrics.stop();
 
     if(status != SamzaApplicationState.SamzaAppStatus.UNDEFINED) {
       cleanupStagingDir();

http://git-wip-us.apache.org/repos/asf/samza/blob/2b7c0bdd/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
 
b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
new file mode 100644
index 0000000..8a5b4aa
--- /dev/null
+++ 
b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.yarn
+
+import org.apache.samza.clustermanager.SamzaApplicationState
+import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.metrics.JvmMetrics
+import org.apache.samza.config.Config
+import org.apache.samza.task.TaskContext
+import org.apache.samza.Partition
+import org.apache.samza.metrics.MetricsRegistry
+import org.apache.samza.config.StreamConfig.Config2Stream
+import org.apache.samza.config.MetricsConfig.Config2Metrics
+import org.apache.samza.metrics.MetricsReporterFactory
+import org.apache.samza.util.Util
+import org.apache.samza.metrics.ReadableMetricsRegistry
+import org.apache.samza.util.Logging
+import org.apache.samza.SamzaException
+import java.util.Timer
+import java.util.TimerTask
+import org.apache.samza.metrics.MetricsHelper
+
+object SamzaAppMasterMetrics {
+  val sourceName = "ApplicationMaster"
+}
+
+/**
+ * Responsible for wiring up Samza's metrics. Given that Samza has a metric
+ * registry, we might as well use it. This class takes Samza's application
+ * master state, and converts it to metrics.
+ *
+ * The companion object's `sourceName` ("ApplicationMaster") is the source
+ * name used both when asking each factory for a reporter and when
+ * registering the registry with that reporter.
+ *
+ * Construction: for every name returned by `config.getMetricReporterNames`
+ * the factory class name is looked up with `config.getMetricsFactoryClass`;
+ * a missing entry raises a SamzaException. The factory is instantiated via
+ * `Util.getObj`, asked for a reporter, and the supplied `registry` is
+ * registered with that reporter. The results are kept in `reporters`, a map
+ * from reporter name to reporter instance.
+ *
+ * `start()` creates one gauge per piece of application-master state and then
+ * starts every reporter. The gauges read `state` lazily through closures:
+ *   - running-containers   : state.runningContainers.size
+ *   - needed-containers    : state.neededContainers.get()
+ *   - completed-containers : state.completedContainers.get()
+ *   - failed-containers    : state.failedContainers.get()
+ *   - released-containers  : state.releasedContainers.get()
+ *   - container-count      : state.containerCount
+ *   - job-healthy          : 1 when state.jobHealthy.get() is true, else 0
+ * The local vals holding the gauges are not read again inside `start()`.
+ * NOTE(review): `newGauge` is presumably inherited from MetricsHelper and
+ * presumably registers against `registry` -- confirm there. Because the
+ * gauges are created in `start()` rather than at construction, the effect of
+ * calling `start()` more than once depends on how `newGauge` treats an
+ * already-used name -- TODO confirm.
+ *
+ * `stop()` stops every reporter held in `reporters`.
+ *
+ * @param config   job configuration; supplies the metric reporter names and
+ *                 their factory class names
+ * @param state    application state whose fields back the gauges
+ * @param registry registry handed to each reporter under `sourceName`
+ */
+class SamzaAppMasterMetrics(
+                             val config: Config,
+                             val state: SamzaApplicationState,
+                             val registry: ReadableMetricsRegistry) extends 
MetricsHelper with Logging {
+
+  val reporters = config.getMetricReporterNames.map(reporterName => {
+    val metricsFactoryClassName = config
+      .getMetricsFactoryClass(reporterName)
+      .getOrElse(throw new SamzaException("Metrics reporter %s missing .class 
config" format reporterName))
+
+    val reporter =
+      Util
+        .getObj[MetricsReporterFactory](metricsFactoryClassName)
+        .getMetricsReporter(reporterName, SamzaAppMasterMetrics.sourceName, 
config)
+
+    reporter.register(SamzaAppMasterMetrics.sourceName, registry)
+    (reporterName, reporter)
+  }).toMap
+
+  def start() {
+    val mRunningContainers = newGauge("running-containers", () => 
state.runningContainers.size)
+    val mNeededContainers = newGauge("needed-containers", () => 
state.neededContainers.get())
+    val mCompletedContainers = newGauge("completed-containers", () => 
state.completedContainers.get())
+    val mFailedContainers = newGauge("failed-containers", () => 
state.failedContainers.get())
+    val mReleasedContainers = newGauge("released-containers", () => 
state.releasedContainers.get())
+    val mContainers = newGauge("container-count", () => state.containerCount)
+    val mJobHealthy = newGauge("job-healthy", () => if 
(state.jobHealthy.get()) 1 else 0)
+
+    reporters.values.foreach(_.start)
+  }
+
+  def stop() {
+    reporters.values.foreach(_.stop)
+  }
+}

Reply via email to