Repository: samza Updated Branches: refs/heads/master c98d7b0d8 -> 3235929bc
SAMZA-1733: Adding containerID to metric header Adding containerID to MetricsHeader (published by MetricsSnapshotReporter). It is populated using the value set for the env variable ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID Author: [email protected] <[email protected]> Reviewers: Yi Pan<[email protected]> Closes #572 from rmatharu/containerid Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/3235929b Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/3235929b Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/3235929b Branch: refs/heads/master Commit: 3235929bc55f9bc0f7cc9a8b7ad2994200cb7f35 Parents: c98d7b0 Author: [email protected] <[email protected]> Authored: Wed Jul 25 15:03:02 2018 -0700 Committer: Jagadish <[email protected]> Committed: Wed Jul 25 15:03:02 2018 -0700 ---------------------------------------------------------------------- .../org/apache/samza/metrics/reporter/MetricsHeader.scala | 3 +++ .../samza/metrics/reporter/MetricsSnapshotReporter.scala | 6 +++++- .../apache/samza/serializers/TestMetricsSnapshotSerde.scala | 3 ++- 3 files changed, 10 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/3235929b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsHeader.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsHeader.scala b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsHeader.scala index 8359b17..2fef04e 100644 --- a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsHeader.scala +++ b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsHeader.scala @@ -29,6 +29,7 @@ object MetricsHeader { map.get("job-name").toString, map.get("job-id").toString, map.get("container-name").toString, + map.get("exec-env-container-id").toString, map.get("source").toString, map.get("version").toString, map.get("samza-version").toString, @@ -45,6 +46,7 @@ class MetricsHeader( @BeanProperty val jobName: String, @BeanProperty val jobId: String, @BeanProperty val containerName: String, + @BeanProperty val execEnvironmentContainerId: String, @BeanProperty val source: String, @BeanProperty val version: String, @BeanProperty val samzaVersion: String, @@ -57,6 +59,7 @@ class MetricsHeader( map.put("job-name", jobName) map.put("job-id", jobId) map.put("container-name", containerName) + map.put("exec-env-container-id", execEnvironmentContainerId) map.put("source", source) map.put("version", version) map.put("samza-version", samzaVersion) http://git-wip-us.apache.org/repos/asf/samza/blob/3235929b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala index d300e90..eca22ff 100644 --- a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala +++ b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala @@ -31,6 +31,8 @@ import java.util.Map import java.util.concurrent.Executors import java.util.concurrent.TimeUnit +import org.apache.samza.config.ShellCommandConfig + import scala.collection.JavaConverters._ /** @@ -55,6 +57,8 @@ class MetricsSnapshotReporter( serializer: Serializer[MetricsSnapshot] = null, clock: () => Long = () => { System.currentTimeMillis }) extends MetricsReporter with Runnable with Logging { + val execEnvironmentContainerId = Option[String](System.getenv(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID)).getOrElse("") + val executor = Executors.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder().setNameFormat("Samza MetricsSnapshotReporter Thread-%d").setDaemon(true).build()) val resetTime = clock() @@ -125,7 +129,7 @@ class MetricsSnapshotReporter( metricsMsg.put(group, groupMsg) }) - val header = new MetricsHeader(jobName, jobId, containerName, source, version, samzaVersion, host, clock(), resetTime) + val header = new MetricsHeader(jobName, jobId, containerName, execEnvironmentContainerId, source, version, samzaVersion, host, clock(), resetTime) val metrics = new Metrics(metricsMsg) debug("Flushing metrics for %s to %s with header and map: header=%s, map=%s." format (source, out, header.getAsMap, metrics.getAsMap)) http://git-wip-us.apache.org/repos/asf/samza/blob/3235929b/samza-core/src/test/scala/org/apache/samza/serializers/TestMetricsSnapshotSerde.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestMetricsSnapshotSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestMetricsSnapshotSerde.scala index 5bc0be6..360e6fa 100644 --- a/samza-core/src/test/scala/org/apache/samza/serializers/TestMetricsSnapshotSerde.scala +++ b/samza-core/src/test/scala/org/apache/samza/serializers/TestMetricsSnapshotSerde.scala @@ -22,6 +22,7 @@ package org.apache.samza.serializers import java.util.HashMap import java.util.Map +import org.apache.samza.config.ShellCommandConfig import org.apache.samza.metrics.reporter.MetricsSnapshot import org.apache.samza.metrics.reporter.MetricsHeader import org.apache.samza.metrics.reporter.Metrics @@ -33,7 +34,7 @@ class TestMetricsSnapshotSerde { @Ignore @Test def testMetricsSerdeShouldSerializeAndDeserializeAMetric { - val header = new MetricsHeader("test", "testjobid", "task", "test", "version", "samzaversion", "host", 1L, 2L) + val header = new MetricsHeader("test-jobName", "testjobid", "samza-container-0", "test exec env container id", "test source", "version", "samzaversion", "host", 1L, 2L) val metricsMap = new HashMap[String, Object]() metricsMap.put("test2", "foo") val metricsGroupMap = new HashMap[String, Map[String, Object]]()
