Repository: samza
Updated Branches:
  refs/heads/master 4c55a837f -> 2b7c0bdd5
SAMZA-991: Continue to report old AppMaster metrics to maintain backward compatibility Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/2b7c0bdd Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/2b7c0bdd Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/2b7c0bdd Branch: refs/heads/master Commit: 2b7c0bdd5b4c8fa94e21d77a79acd403aa53cb83 Parents: 4c55a83 Author: Jagadish Venkatraman <[email protected]> Authored: Tue Aug 2 16:58:27 2016 -0700 Committer: Yi Pan (Data Infrastructure) <[email protected]> Committed: Tue Aug 2 16:58:27 2016 -0700 ---------------------------------------------------------------------- .../job/yarn/YarnClusterResourceManager.java | 5 ++ .../samza/job/yarn/SamzaAppMasterMetrics.scala | 83 ++++++++++++++++++++ 2 files changed, 88 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/2b7c0bdd/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java index 1fac7f4..96d3d7c 100644 --- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java +++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java @@ -101,6 +101,8 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement private final ConcurrentHashMap<SamzaResource, Container> allocatedResources = new ConcurrentHashMap<>(); private final ConcurrentHashMap<SamzaResourceRequest, AMRMClient.ContainerRequest> requestsMap = new ConcurrentHashMap<>(); + private final SamzaAppMasterMetrics metrics; + final AtomicBoolean started = new AtomicBoolean(false); private final Object lock = new 
Object(); @@ -119,6 +121,7 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement hConfig.set("fs.http.impl", HttpFileSystem.class.getName()); MetricsRegistryMap registry = new MetricsRegistryMap(); + metrics = new SamzaAppMasterMetrics(config, samzaAppState, registry); // parse configs from the Yarn environment String containerIdStr = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.toString()); @@ -157,6 +160,7 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement log.info("Attempting to start an already started ContainerManager"); return; } + metrics.start(); service.onInit(); log.info("Starting YarnContainerManager."); amClient.init(hConfig); @@ -317,6 +321,7 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement amClient.stop(); log.info("Stopping the AM service " ); service.onShutdown(); + metrics.stop(); if(status != SamzaApplicationState.SamzaAppStatus.UNDEFINED) { cleanupStagingDir(); http://git-wip-us.apache.org/repos/asf/samza/blob/2b7c0bdd/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala new file mode 100644 index 0000000..8a5b4aa --- /dev/null +++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.job.yarn + +import org.apache.samza.clustermanager.SamzaApplicationState +import org.apache.samza.metrics.MetricsRegistryMap +import org.apache.samza.metrics.JvmMetrics +import org.apache.samza.config.Config +import org.apache.samza.task.TaskContext +import org.apache.samza.Partition +import org.apache.samza.metrics.MetricsRegistry +import org.apache.samza.config.StreamConfig.Config2Stream +import org.apache.samza.config.MetricsConfig.Config2Metrics +import org.apache.samza.metrics.MetricsReporterFactory +import org.apache.samza.util.Util +import org.apache.samza.metrics.ReadableMetricsRegistry +import org.apache.samza.util.Logging +import org.apache.samza.SamzaException +import java.util.Timer +import java.util.TimerTask +import org.apache.samza.metrics.MetricsHelper + +object SamzaAppMasterMetrics { + val sourceName = "ApplicationMaster" +} + +/** + * Responsible for wiring up Samza's metrics. Given that Samza has a metric + * registry, we might as well use it. This class takes Samza's application + * master state, and converts it to metrics. 
+ */ +class SamzaAppMasterMetrics( + val config: Config, + val state: SamzaApplicationState, + val registry: ReadableMetricsRegistry) extends MetricsHelper with Logging { + + val reporters = config.getMetricReporterNames.map(reporterName => { + val metricsFactoryClassName = config + .getMetricsFactoryClass(reporterName) + .getOrElse(throw new SamzaException("Metrics reporter %s missing .class config" format reporterName)) + + val reporter = + Util + .getObj[MetricsReporterFactory](metricsFactoryClassName) + .getMetricsReporter(reporterName, SamzaAppMasterMetrics.sourceName, config) + + reporter.register(SamzaAppMasterMetrics.sourceName, registry) + (reporterName, reporter) + }).toMap + + def start() { + val mRunningContainers = newGauge("running-containers", () => state.runningContainers.size) + val mNeededContainers = newGauge("needed-containers", () => state.neededContainers.get()) + val mCompletedContainers = newGauge("completed-containers", () => state.completedContainers.get()) + val mFailedContainers = newGauge("failed-containers", () => state.failedContainers.get()) + val mReleasedContainers = newGauge("released-containers", () => state.releasedContainers.get()) + val mContainers = newGauge("container-count", () => state.containerCount) + val mJobHealthy = newGauge("job-healthy", () => if (state.jobHealthy.get()) 1 else 0) + + reporters.values.foreach(_.start) + } + + def stop() { + reporters.values.foreach(_.stop) + } +}
