This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 9e28134  SAMZA-2532: Refactor MetricsSnapshotReporter and MetricsSnapshotReporterFactory (#1368)
9e28134 is described below

commit 9e281345441b79994822f1383f06d39b9e74ba67
Author: Abhishek Shivanna <[email protected]>
AuthorDate: Wed Jun 3 12:22:17 2020 -0700

    SAMZA-2532: Refactor MetricsSnapshotReporter and MetricsSnapshotReporterFactory (#1368)
---
 .../metrics/reporter/MetricsSnapshotReporter.scala |  34 ++++---
 .../reporter/MetricsSnapshotReporterFactory.scala  | 103 +++++++++++++--------
 .../samza/metrics/TestMetricsSnapshotReporter.java |  97 +++++++++++++++++--
 3 files changed, 173 insertions(+), 61 deletions(-)

diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala
index f9cf819..7a0d79f 100644
--- a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala
@@ -43,12 +43,12 @@ import scala.collection.JavaConverters._
  * taskName // container_567890
  * host // eat1-app128.gird
  * version // 0.0.1
-  * blacklist // Regex of metrics to ignore when flushing
+ * blacklist // Regex of metrics to ignore when flushing
  */
 class MetricsSnapshotReporter(
   producer: SystemProducer,
   out: SystemStream,
-  pollingInterval: Int,
+  reportingInterval: Int,
   jobName: String,
   jobId: String,
   containerName: String,
@@ -67,8 +67,8 @@ class MetricsSnapshotReporter(
   var registries = List[(String, ReadableMetricsRegistry)]()
   var blacklistedMetrics = Set[String]()
 
-  info("got metrics snapshot reporter properties [job name: %s, job id: %s, 
containerName: %s, version: %s, samzaVersion: %s, host: %s, pollingInterval %s]"
-    format (jobName, jobId, containerName, version, samzaVersion, host, 
pollingInterval))
+  info("got metrics snapshot reporter properties [job name: %s, job id: %s, 
containerName: %s, version: %s, samzaVersion: %s, host: %s, reportingInterval 
%s]"
+    format(jobName, jobId, containerName, version, samzaVersion, host, 
reportingInterval))
 
   def start {
     info("Starting producer.")
@@ -77,7 +77,7 @@ class MetricsSnapshotReporter(
 
     info("Starting reporter timer.")
 
-    executor.scheduleWithFixedDelay(this, 0, pollingInterval, TimeUnit.SECONDS)
+    executor.scheduleWithFixedDelay(this, 0, reportingInterval, 
TimeUnit.SECONDS)
   }
 
   def register(source: String, registry: ReadableMetricsRegistry) {
@@ -91,7 +91,7 @@ class MetricsSnapshotReporter(
   def stop = {
 
     // Scheduling an event with 0 delay to ensure flushing of metrics one last 
time before shutdown
-    executor.schedule(this,0, TimeUnit.SECONDS)
+    executor.schedule(this, 0, TimeUnit.SECONDS)
 
     info("Stopping reporter timer.")
     // Allow the scheduled task above to finish, and block for termination 
(for max 60 seconds)
@@ -106,9 +106,20 @@ class MetricsSnapshotReporter(
     }
   }
 
-  def run {
-    debug("Begin flushing metrics.")
+  def run() {
+    try {
+      innerRun()
+    } catch {
+      case e: Exception =>
+        // Ignore all exceptions - because subsequent executions of this 
scheduled task will be suppressed
+        // by the executor if the current task throws an unhandled exception.
+        warn("Error while reporting metrics. Will retry in " + 
reportingInterval + " seconds.", e)
+    }
+
+  }
 
+  def innerRun(): Unit = {
+    debug("Begin flushing metrics.")
     for ((source, registry) <- registries) {
       debug("Flushing metrics for %s." format source)
 
@@ -140,7 +151,7 @@ class MetricsSnapshotReporter(
         val header = new MetricsHeader(jobName, jobId, containerName, 
execEnvironmentContainerId, source, version, samzaVersion, host, clock(), 
resetTime)
         val metrics = new Metrics(metricsMsg)
 
-        debug("Flushing metrics for %s to %s with header and map: header=%s, 
map=%s." format(source, out, header.getAsMap, metrics.getAsMap))
+        debug("Flushing metrics for %s to %s with header and map: header=%s, 
map=%s." format(source, out, header.getAsMap, metrics.getAsMap()))
 
         val metricsSnapshot = new MetricsSnapshot(header, metrics)
         val maybeSerialized = if (serializer != null) {
@@ -160,12 +171,9 @@ class MetricsSnapshotReporter(
         }
       }
     }
-
-
     debug("Finished flushing metrics.")
   }
 
-
   def shouldIgnore(group: String, metricName: String) = {
     var isBlacklisted = blacklist.isDefined
     val fullMetricName = group + "." + metricName
@@ -173,7 +181,7 @@ class MetricsSnapshotReporter(
     if (isBlacklisted && !blacklistedMetrics.contains(fullMetricName)) {
       if (fullMetricName.matches(blacklist.get)) {
         blacklistedMetrics += fullMetricName
-        info("Blacklisted metric %s because it matched blacklist regex: %s" 
format(fullMetricName, blacklist.get))
+        debug("Blacklisted metric %s because it matched blacklist regex: %s" 
format(fullMetricName, blacklist.get))
       } else {
         isBlacklisted = false
       }
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
index 441d834..2f9a0ba 100644
--- a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
@@ -19,53 +19,47 @@
 
 package org.apache.samza.metrics.reporter
 
-import org.apache.samza.util.{Logging, ReflectionUtil, StreamUtil, Util}
 import org.apache.samza.SamzaException
-import org.apache.samza.config.{Config, JobConfig, MetricsConfig, 
SerializerConfig, StreamConfig, SystemConfig}
-import org.apache.samza.metrics.MetricsReporter
-import org.apache.samza.metrics.MetricsReporterFactory
-import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.serializers.{MetricsSnapshotSerdeV2, SerdeFactory}
-import org.apache.samza.system.SystemFactory
+import org.apache.samza.config._
+import org.apache.samza.metrics.{MetricsRegistryMap, MetricsReporter, 
MetricsReporterFactory}
+import org.apache.samza.serializers.{MetricsSnapshotSerdeV2, Serde, 
SerdeFactory}
+import org.apache.samza.system.{SystemFactory, SystemProducer, SystemStream}
 import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
+import org.apache.samza.util.{Logging, ReflectionUtil, StreamUtil, Util}
 
 class MetricsSnapshotReporterFactory extends MetricsReporterFactory with 
Logging {
-  def getMetricsReporter(name: String, containerName: String, config: Config): 
MetricsReporter = {
-    info("Creating new metrics snapshot reporter.")
-
-    val jobConfig = new JobConfig(config)
-    val jobName = JavaOptionals.toRichOptional(jobConfig.getName).toOption
-      .getOrElse(throw new SamzaException("Job name must be defined in 
config."))
-    val jobId = jobConfig.getJobId
-
-    val metricsConfig = new MetricsConfig(config)
-    val metricsSystemStreamName = 
JavaOptionals.toRichOptional(metricsConfig.getMetricsSnapshotReporterStream(name))
-      .toOption
-      .getOrElse(throw new SamzaException("No metrics stream defined in 
config."))
-
-    val systemStream = 
StreamUtil.getSystemStreamFromNames(metricsSystemStreamName)
-
-    info("Got system stream %s." format systemStream)
-
-    val systemName = systemStream.getSystem
 
+  protected def getProducer(reporterName: String, config: Config, registry: 
MetricsRegistryMap): SystemProducer = {
     val systemConfig = new SystemConfig(config)
+    val systemName = getSystemStream(reporterName, config).getSystem
     val systemFactoryClassName = 
JavaOptionals.toRichOptional(systemConfig.getSystemFactory(systemName)).toOption
       .getOrElse(throw new SamzaException("Trying to fetch system factory for 
system %s, which isn't defined in config." format systemName))
-
     val systemFactory = ReflectionUtil.getObj(systemFactoryClassName, 
classOf[SystemFactory])
 
     info("Got system factory %s." format systemFactory)
+    val producer = systemFactory.getProducer(systemName, config, registry)
+    info("Got producer %s." format producer)
 
-    val registry = new MetricsRegistryMap
+    producer
+  }
 
-    val producer = systemFactory.getProducer(systemName, config, registry)
+  protected def getSystemStream(reporterName: String, config: Config): 
SystemStream = {
+    val metricsConfig = new MetricsConfig(config)
+    val metricsSystemStreamName = 
JavaOptionals.toRichOptional(metricsConfig.getMetricsSnapshotReporterStream(reporterName))
+      .toOption
+      .getOrElse(throw new SamzaException("No metrics stream defined in 
config."))
+    val systemStream = 
StreamUtil.getSystemStreamFromNames(metricsSystemStreamName)
+    info("Got system stream %s." format systemStream)
+    systemStream
+  }
 
-    info("Got producer %s." format producer)
+  protected def getSerde(reporterName: String, config: Config): 
Serde[MetricsSnapshot] = {
     val streamConfig = new StreamConfig(config)
+    val systemConfig = new SystemConfig(config)
+    val systemStream = getSystemStream(reporterName, config)
 
     val streamSerdeName = streamConfig.getStreamMsgSerde(systemStream)
-    val systemSerdeName = systemConfig.getSystemMsgSerde(systemName)
+    val systemSerdeName = 
systemConfig.getSystemMsgSerde(systemStream.getSystem)
     val serdeName = streamSerdeName.orElse(systemSerdeName.orElse(null))
     val serializerConfig = new SerializerConfig(config)
     val serde = if (serdeName != null) {
@@ -77,29 +71,62 @@ class MetricsSnapshotReporterFactory extends MetricsReporterFactory with Logging
     } else {
       new MetricsSnapshotSerdeV2
     }
-
     info("Got serde %s." format serde)
+    serde
+  }
+
 
-    val pollingInterval: Int = 
metricsConfig.getMetricsSnapshotReporterInterval(name)
+  protected def getBlacklist(reporterName: String, config: Config): 
Option[String] = {
+    val metricsConfig = new MetricsConfig(config)
+    val blacklist = 
JavaOptionals.toRichOptional(metricsConfig.getMetricsSnapshotReporterBlacklist(reporterName)).toOption
+    info("Got blacklist as: %s" format blacklist)
+    blacklist
+  }
 
-    info("Setting polling interval to %d" format pollingInterval)
+  protected def getReportingInterval(reporterName: String, config: Config): 
Int = {
+    val metricsConfig = new MetricsConfig(config)
+    val reportingInterval = 
metricsConfig.getMetricsSnapshotReporterInterval(reporterName)
+    info("Got reporting interval: %d" format reportingInterval)
+    reportingInterval
+  }
+
+  protected def getJobId(config: Config): String = {
+    val jobConfig = new JobConfig(config)
+    jobConfig.getJobId
+  }
+
+  protected def getJobName(config: Config): String = {
+    val jobConfig = new JobConfig(config)
+    JavaOptionals.toRichOptional(jobConfig.getName).toOption
+      .getOrElse(throw new SamzaException("Job name must be defined in 
config."))
+  }
+
+
+  def getMetricsReporter(reporterName: String, containerName: String, config: 
Config): MetricsReporter = {
+    info("Creating new metrics snapshot reporter.")
+    val registry = new MetricsRegistryMap
 
-    val blacklist = 
JavaOptionals.toRichOptional(metricsConfig.getMetricsSnapshotReporterBlacklist(name)).toOption
-    info("Setting blacklist to %s" format blacklist)
+    val systemStream = getSystemStream(reporterName, config)
+    val producer = getProducer(reporterName, config, registry)
+    val reportingInterval = getReportingInterval(reporterName, config);
+    val jobName = getJobName(config)
+    val jobId = getJobId(config)
+    val serde = getSerde(reporterName, config)
+    val blacklist = getBlacklist(reporterName, config)
 
     val reporter = new MetricsSnapshotReporter(
       producer,
       systemStream,
-      pollingInterval,
+      reportingInterval,
       jobName,
       jobId,
       containerName,
       Util.getTaskClassVersion(config),
-      Util.getSamzaVersion(),
+      Util.getSamzaVersion,
       Util.getLocalHost.getHostName,
       serde, blacklist)
 
-    reporter.register(this.getClass.getSimpleName.toString, registry)
+    reporter.register(this.getClass.getSimpleName, registry)
 
     reporter
   }
diff --git a/samza-core/src/test/java/org/apache/samza/metrics/TestMetricsSnapshotReporter.java b/samza-core/src/test/java/org/apache/samza/metrics/TestMetricsSnapshotReporter.java
index 1ddf70f..1f69a7e 100644
--- a/samza-core/src/test/java/org/apache/samza/metrics/TestMetricsSnapshotReporter.java
+++ b/samza-core/src/test/java/org/apache/samza/metrics/TestMetricsSnapshotReporter.java
@@ -19,23 +19,54 @@
 
 package org.apache.samza.metrics;
 
+import java.util.List;
+import java.util.Map;
+import org.apache.samza.metrics.reporter.MetricsSnapshot;
 import org.apache.samza.metrics.reporter.MetricsSnapshotReporter;
 import org.apache.samza.serializers.MetricsSnapshotSerdeV2;
+import org.apache.samza.serializers.Serializer;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemProducer;
 import org.apache.samza.system.SystemStream;
-import org.apache.samza.system.inmemory.InMemorySystemProducer;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 import scala.Some;
 import scala.runtime.AbstractFunction0;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.eq;
+
 
 public class TestMetricsSnapshotReporter {
+
   private MetricsSnapshotReporter metricsSnapshotReporter;
   private static final String BLACKLIST_ALL = ".*";
   private static final String BLACKLIST_NONE = "";
   private static final String BLACKLIST_GROUPS = 
".*(SystemConsumersMetrics|CachedStoreMetrics).*";
   private static final String BLACKLIST_ALL_BUT_TWO_GROUPS = 
"^(?!.*?(?:SystemConsumersMetrics|CachedStoreMetrics)).*$";
 
+  private static final SystemStream SYSTEM_STREAM = new SystemStream("test 
system", "test stream");
+  private static final String JOB_NAME = "test job";
+  private static final String JOB_ID = "test jobID";
+  private static final String CONTAINER_NAME = "samza-container-0";
+  private static final String TASK_VERSION = "test version";
+  private static final String SAMZA_VERSION = "test samza version";
+  private static final String HOSTNAME = "test host";
+  private static final int REPORTING_INTERVAL = 60000;
+
+  private Serializer<MetricsSnapshot> serializer;
+  private SystemProducer producer;
+
+  @Before
+  public void setup() {
+    producer = mock(SystemProducer.class);
+    serializer = new MetricsSnapshotSerdeV2();
+  }
+
   @Test
   public void testBlacklistAll() {
     this.metricsSnapshotReporter = getMetricsSnapshotReporter(BLACKLIST_ALL);
@@ -101,15 +132,61 @@ public class TestMetricsSnapshotReporter {
             "poll-count"));
   }
 
+  @Test
+  public void testMetricsEmission() {
+    // setup
+    serializer = null;
+    String source = "testSource";
+    String group = "someGroup";
+    String metricName = "someName";
+    MetricsRegistryMap registry = new MetricsRegistryMap();
+
+    metricsSnapshotReporter = 
getMetricsSnapshotReporter(TestMetricsSnapshotReporter.BLACKLIST_NONE);
+    registry.newGauge(group, metricName, 42);
+    metricsSnapshotReporter.register(source, registry);
+
+    ArgumentCaptor<OutgoingMessageEnvelope> 
outgoingMessageEnvelopeArgumentCaptor =
+        ArgumentCaptor.forClass(OutgoingMessageEnvelope.class);
+
+    // run
+    metricsSnapshotReporter.run();
+
+    // assert
+    verify(producer, times(1)).send(eq(source), 
outgoingMessageEnvelopeArgumentCaptor.capture());
+    verify(producer, times(1)).flush(eq(source));
+
+    List<OutgoingMessageEnvelope> envelopes = 
outgoingMessageEnvelopeArgumentCaptor.getAllValues();
+
+    Assert.assertEquals(1, envelopes.size());
+
+    MetricsSnapshot metricsSnapshot = (MetricsSnapshot) 
envelopes.get(0).getMessage();
+
+    Assert.assertEquals(JOB_NAME, metricsSnapshot.getHeader().getJobName());
+    Assert.assertEquals(JOB_ID, metricsSnapshot.getHeader().getJobId());
+    Assert.assertEquals(CONTAINER_NAME, 
metricsSnapshot.getHeader().getContainerName());
+    Assert.assertEquals(source, metricsSnapshot.getHeader().getSource());
+    Assert.assertEquals(SAMZA_VERSION, 
metricsSnapshot.getHeader().getSamzaVersion());
+    Assert.assertEquals(TASK_VERSION, 
metricsSnapshot.getHeader().getVersion());
+    Assert.assertEquals(HOSTNAME, metricsSnapshot.getHeader().getHost());
+
+    Map<String, Map<String, Object>> metricMap = 
metricsSnapshot.getMetrics().getAsMap();
+    Assert.assertEquals(1, metricMap.size());
+    Assert.assertTrue(metricMap.containsKey(group));
+    Assert.assertTrue(metricMap.get(group).containsKey(metricName));
+    Assert.assertEquals(42, metricMap.get(group).get(metricName));
+  }
+
   private MetricsSnapshotReporter getMetricsSnapshotReporter(String blacklist) 
{
-    return new MetricsSnapshotReporter(new InMemorySystemProducer("test 
system", null),
-        new SystemStream("test system", "test stream"), 60000, "test job", 
"test jobID", "samza-container-0",
-        "test version", "test samza version", "test host", new 
MetricsSnapshotSerdeV2(), new Some<>(blacklist),
-        new AbstractFunction0<Object>() {
-          @Override
-          public Object apply() {
-            return System.currentTimeMillis();
-          }
-        });
+    return new MetricsSnapshotReporter(producer, SYSTEM_STREAM, 
REPORTING_INTERVAL, JOB_NAME, JOB_ID, CONTAINER_NAME,
+        TASK_VERSION, SAMZA_VERSION, HOSTNAME, serializer, new 
Some<>(blacklist), getClock());
+  }
+
+  private AbstractFunction0<Object> getClock() {
+    return new AbstractFunction0<Object>() {
+      @Override
+      public Object apply() {
+        return System.currentTimeMillis();
+      }
+    };
   }
 }

Reply via email to