This is an automated email from the ASF dual-hosted git repository.
bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 9e28134 SAMZA-2532: Refactor MetricsSnapshotReporter and
MetricsSnapshotReporterFactory (#1368)
9e28134 is described below
commit 9e281345441b79994822f1383f06d39b9e74ba67
Author: Abhishek Shivanna <[email protected]>
AuthorDate: Wed Jun 3 12:22:17 2020 -0700
SAMZA-2532: Refactor MetricsSnapshotReporter and
MetricsSnapshotReporterFactory (#1368)
---
.../metrics/reporter/MetricsSnapshotReporter.scala | 34 ++++---
.../reporter/MetricsSnapshotReporterFactory.scala | 103 +++++++++++++--------
.../samza/metrics/TestMetricsSnapshotReporter.java | 97 +++++++++++++++++--
3 files changed, 173 insertions(+), 61 deletions(-)
diff --git
a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala
b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala
index f9cf819..7a0d79f 100644
---
a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala
+++
b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala
@@ -43,12 +43,12 @@ import scala.collection.JavaConverters._
* taskName // container_567890
* host // eat1-app128.gird
* version // 0.0.1
- * blacklist // Regex of metrics to ignore when flushing
+ * blacklist // Regex of metrics to ignore when flushing
*/
class MetricsSnapshotReporter(
producer: SystemProducer,
out: SystemStream,
- pollingInterval: Int,
+ reportingInterval: Int,
jobName: String,
jobId: String,
containerName: String,
@@ -67,8 +67,8 @@ class MetricsSnapshotReporter(
var registries = List[(String, ReadableMetricsRegistry)]()
var blacklistedMetrics = Set[String]()
- info("got metrics snapshot reporter properties [job name: %s, job id: %s,
containerName: %s, version: %s, samzaVersion: %s, host: %s, pollingInterval %s]"
- format (jobName, jobId, containerName, version, samzaVersion, host,
pollingInterval))
+ info("got metrics snapshot reporter properties [job name: %s, job id: %s,
containerName: %s, version: %s, samzaVersion: %s, host: %s, reportingInterval
%s]"
+ format(jobName, jobId, containerName, version, samzaVersion, host,
reportingInterval))
def start {
info("Starting producer.")
@@ -77,7 +77,7 @@ class MetricsSnapshotReporter(
info("Starting reporter timer.")
- executor.scheduleWithFixedDelay(this, 0, pollingInterval, TimeUnit.SECONDS)
+ executor.scheduleWithFixedDelay(this, 0, reportingInterval,
TimeUnit.SECONDS)
}
def register(source: String, registry: ReadableMetricsRegistry) {
@@ -91,7 +91,7 @@ class MetricsSnapshotReporter(
def stop = {
// Scheduling an event with 0 delay to ensure flushing of metrics one last
time before shutdown
- executor.schedule(this,0, TimeUnit.SECONDS)
+ executor.schedule(this, 0, TimeUnit.SECONDS)
info("Stopping reporter timer.")
// Allow the scheduled task above to finish, and block for termination
(for max 60 seconds)
@@ -106,9 +106,20 @@ class MetricsSnapshotReporter(
}
}
- def run {
- debug("Begin flushing metrics.")
+ /**
+  * Entry point executed each time the reporter is run by the scheduled executor.
+  *
+  * Delegates the actual flush to innerRun() and logs, rather than rethrows, any failure:
+  * a periodic task that throws has all of its subsequent executions suppressed by the executor.
+  */
+ def run(): Unit = {
+   try {
+     innerRun()
+   } catch {
+     // NonFatal also covers non-fatal Errors (which matching on Exception misses) while letting
+     // InterruptedException and VM-level errors propagate instead of being silently swallowed.
+     case scala.util.control.NonFatal(e) =>
+       warn(s"Error while reporting metrics. Will retry in $reportingInterval seconds.", e)
+   }
+ }
+ def innerRun(): Unit = {
+ debug("Begin flushing metrics.")
for ((source, registry) <- registries) {
debug("Flushing metrics for %s." format source)
@@ -140,7 +151,7 @@ class MetricsSnapshotReporter(
val header = new MetricsHeader(jobName, jobId, containerName,
execEnvironmentContainerId, source, version, samzaVersion, host, clock(),
resetTime)
val metrics = new Metrics(metricsMsg)
- debug("Flushing metrics for %s to %s with header and map: header=%s,
map=%s." format(source, out, header.getAsMap, metrics.getAsMap))
+ debug("Flushing metrics for %s to %s with header and map: header=%s,
map=%s." format(source, out, header.getAsMap, metrics.getAsMap()))
val metricsSnapshot = new MetricsSnapshot(header, metrics)
val maybeSerialized = if (serializer != null) {
@@ -160,12 +171,9 @@ class MetricsSnapshotReporter(
}
}
}
-
-
debug("Finished flushing metrics.")
}
-
def shouldIgnore(group: String, metricName: String) = {
var isBlacklisted = blacklist.isDefined
val fullMetricName = group + "." + metricName
@@ -173,7 +181,7 @@ class MetricsSnapshotReporter(
if (isBlacklisted && !blacklistedMetrics.contains(fullMetricName)) {
if (fullMetricName.matches(blacklist.get)) {
blacklistedMetrics += fullMetricName
- info("Blacklisted metric %s because it matched blacklist regex: %s"
format(fullMetricName, blacklist.get))
+ debug("Blacklisted metric %s because it matched blacklist regex: %s"
format(fullMetricName, blacklist.get))
} else {
isBlacklisted = false
}
diff --git
a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
index 441d834..2f9a0ba 100644
---
a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
+++
b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
@@ -19,53 +19,47 @@
package org.apache.samza.metrics.reporter
-import org.apache.samza.util.{Logging, ReflectionUtil, StreamUtil, Util}
import org.apache.samza.SamzaException
-import org.apache.samza.config.{Config, JobConfig, MetricsConfig,
SerializerConfig, StreamConfig, SystemConfig}
-import org.apache.samza.metrics.MetricsReporter
-import org.apache.samza.metrics.MetricsReporterFactory
-import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.serializers.{MetricsSnapshotSerdeV2, SerdeFactory}
-import org.apache.samza.system.SystemFactory
+import org.apache.samza.config._
+import org.apache.samza.metrics.{MetricsRegistryMap, MetricsReporter,
MetricsReporterFactory}
+import org.apache.samza.serializers.{MetricsSnapshotSerdeV2, Serde,
SerdeFactory}
+import org.apache.samza.system.{SystemFactory, SystemProducer, SystemStream}
import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
+import org.apache.samza.util.{Logging, ReflectionUtil, StreamUtil, Util}
class MetricsSnapshotReporterFactory extends MetricsReporterFactory with
Logging {
- def getMetricsReporter(name: String, containerName: String, config: Config):
MetricsReporter = {
- info("Creating new metrics snapshot reporter.")
-
- val jobConfig = new JobConfig(config)
- val jobName = JavaOptionals.toRichOptional(jobConfig.getName).toOption
- .getOrElse(throw new SamzaException("Job name must be defined in
config."))
- val jobId = jobConfig.getJobId
-
- val metricsConfig = new MetricsConfig(config)
- val metricsSystemStreamName =
JavaOptionals.toRichOptional(metricsConfig.getMetricsSnapshotReporterStream(name))
- .toOption
- .getOrElse(throw new SamzaException("No metrics stream defined in
config."))
-
- val systemStream =
StreamUtil.getSystemStreamFromNames(metricsSystemStreamName)
-
- info("Got system stream %s." format systemStream)
-
- val systemName = systemStream.getSystem
+ protected def getProducer(reporterName: String, config: Config, registry:
MetricsRegistryMap): SystemProducer = {
val systemConfig = new SystemConfig(config)
+ val systemName = getSystemStream(reporterName, config).getSystem
val systemFactoryClassName =
JavaOptionals.toRichOptional(systemConfig.getSystemFactory(systemName)).toOption
.getOrElse(throw new SamzaException("Trying to fetch system factory for
system %s, which isn't defined in config." format systemName))
-
val systemFactory = ReflectionUtil.getObj(systemFactoryClassName,
classOf[SystemFactory])
info("Got system factory %s." format systemFactory)
+ val producer = systemFactory.getProducer(systemName, config, registry)
+ info("Got producer %s." format producer)
- val registry = new MetricsRegistryMap
+ producer
+ }
- val producer = systemFactory.getProducer(systemName, config, registry)
+ /**
+  * Resolves the system stream that the named reporter writes its metrics snapshots to.
+  *
+  * @param reporterName name of the reporter whose stream setting is looked up
+  * @param config job configuration
+  * @return the system stream parsed from the configured stream name
+  * @throws SamzaException if no metrics stream is configured for the reporter
+  */
+ protected def getSystemStream(reporterName: String, config: Config): SystemStream = {
+   val configuredStream = new MetricsConfig(config).getMetricsSnapshotReporterStream(reporterName)
+   val streamName = JavaOptionals.toRichOptional(configuredStream).toOption match {
+     case Some(name) => name
+     case None => throw new SamzaException("No metrics stream defined in config.")
+   }
+   val resolved = StreamUtil.getSystemStreamFromNames(streamName)
+   info("Got system stream %s.".format(resolved))
+   resolved
+ }
- info("Got producer %s." format producer)
+ protected def getSerde(reporterName: String, config: Config):
Serde[MetricsSnapshot] = {
val streamConfig = new StreamConfig(config)
+ val systemConfig = new SystemConfig(config)
+ val systemStream = getSystemStream(reporterName, config)
val streamSerdeName = streamConfig.getStreamMsgSerde(systemStream)
- val systemSerdeName = systemConfig.getSystemMsgSerde(systemName)
+ val systemSerdeName =
systemConfig.getSystemMsgSerde(systemStream.getSystem)
val serdeName = streamSerdeName.orElse(systemSerdeName.orElse(null))
val serializerConfig = new SerializerConfig(config)
val serde = if (serdeName != null) {
@@ -77,29 +71,62 @@ class MetricsSnapshotReporterFactory extends
MetricsReporterFactory with Logging
} else {
new MetricsSnapshotSerdeV2
}
-
info("Got serde %s." format serde)
+ serde
+ }
+
- val pollingInterval: Int =
metricsConfig.getMetricsSnapshotReporterInterval(name)
+ /**
+  * Reads the optional metric blacklist regex configured for the named reporter.
+  *
+  * @return the configured regex, or None when the reporter has no blacklist
+  */
+ protected def getBlacklist(reporterName: String, config: Config): Option[String] = {
+   val configured = new MetricsConfig(config).getMetricsSnapshotReporterBlacklist(reporterName)
+   val regex = JavaOptionals.toRichOptional(configured).toOption
+   info("Got blacklist as: %s".format(regex))
+   regex
+ }
- info("Setting polling interval to %d" format pollingInterval)
+ /** Reads and logs the snapshot reporting interval configured for the named reporter. */
+ protected def getReportingInterval(reporterName: String, config: Config): Int = {
+   val interval = new MetricsConfig(config).getMetricsSnapshotReporterInterval(reporterName)
+   info("Got reporting interval: %d".format(interval))
+   interval
+ }
+
+ /** Returns the job id exposed by the job-level view of the given config. */
+ protected def getJobId(config: Config): String = new JobConfig(config).getJobId
+
+ /**
+  * Returns the job name from the given config.
+  *
+  * @throws SamzaException if the job name is not defined
+  */
+ protected def getJobName(config: Config): String = {
+   val maybeName = JavaOptionals.toRichOptional(new JobConfig(config).getName).toOption
+   maybeName match {
+     case Some(jobName) => jobName
+     case None => throw new SamzaException("Job name must be defined in config.")
+   }
+ }
+
+
+ def getMetricsReporter(reporterName: String, containerName: String, config:
Config): MetricsReporter = {
+ info("Creating new metrics snapshot reporter.")
+ val registry = new MetricsRegistryMap
- val blacklist =
JavaOptionals.toRichOptional(metricsConfig.getMetricsSnapshotReporterBlacklist(name)).toOption
- info("Setting blacklist to %s" format blacklist)
+ val systemStream = getSystemStream(reporterName, config)
+ val producer = getProducer(reporterName, config, registry)
+ val reportingInterval = getReportingInterval(reporterName, config);
+ val jobName = getJobName(config)
+ val jobId = getJobId(config)
+ val serde = getSerde(reporterName, config)
+ val blacklist = getBlacklist(reporterName, config)
val reporter = new MetricsSnapshotReporter(
producer,
systemStream,
- pollingInterval,
+ reportingInterval,
jobName,
jobId,
containerName,
Util.getTaskClassVersion(config),
- Util.getSamzaVersion(),
+ Util.getSamzaVersion,
Util.getLocalHost.getHostName,
serde, blacklist)
- reporter.register(this.getClass.getSimpleName.toString, registry)
+ reporter.register(this.getClass.getSimpleName, registry)
reporter
}
diff --git
a/samza-core/src/test/java/org/apache/samza/metrics/TestMetricsSnapshotReporter.java
b/samza-core/src/test/java/org/apache/samza/metrics/TestMetricsSnapshotReporter.java
index 1ddf70f..1f69a7e 100644
---
a/samza-core/src/test/java/org/apache/samza/metrics/TestMetricsSnapshotReporter.java
+++
b/samza-core/src/test/java/org/apache/samza/metrics/TestMetricsSnapshotReporter.java
@@ -19,23 +19,54 @@
package org.apache.samza.metrics;
+import java.util.List;
+import java.util.Map;
+import org.apache.samza.metrics.reporter.MetricsSnapshot;
import org.apache.samza.metrics.reporter.MetricsSnapshotReporter;
import org.apache.samza.serializers.MetricsSnapshotSerdeV2;
+import org.apache.samza.serializers.Serializer;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemProducer;
import org.apache.samza.system.SystemStream;
-import org.apache.samza.system.inmemory.InMemorySystemProducer;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
+import org.mockito.ArgumentCaptor;
import scala.Some;
import scala.runtime.AbstractFunction0;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.eq;
+
public class TestMetricsSnapshotReporter {
+
private MetricsSnapshotReporter metricsSnapshotReporter;
private static final String BLACKLIST_ALL = ".*";
private static final String BLACKLIST_NONE = "";
private static final String BLACKLIST_GROUPS =
".*(SystemConsumersMetrics|CachedStoreMetrics).*";
private static final String BLACKLIST_ALL_BUT_TWO_GROUPS =
"^(?!.*?(?:SystemConsumersMetrics|CachedStoreMetrics)).*$";
+ private static final SystemStream SYSTEM_STREAM = new SystemStream("test
system", "test stream");
+ private static final String JOB_NAME = "test job";
+ private static final String JOB_ID = "test jobID";
+ private static final String CONTAINER_NAME = "samza-container-0";
+ private static final String TASK_VERSION = "test version";
+ private static final String SAMZA_VERSION = "test samza version";
+ private static final String HOSTNAME = "test host";
+ private static final int REPORTING_INTERVAL = 60000;
+
+ private Serializer<MetricsSnapshot> serializer;
+ private SystemProducer producer;
+
+ /** Gives each test a fresh mocked producer and a MetricsSnapshotSerdeV2 serializer. */
+ @Before
+ public void setup() {
+   this.serializer = new MetricsSnapshotSerdeV2();
+   this.producer = mock(SystemProducer.class);
+ }
+
@Test
public void testBlacklistAll() {
this.metricsSnapshotReporter = getMetricsSnapshotReporter(BLACKLIST_ALL);
@@ -101,15 +132,61 @@ public class TestMetricsSnapshotReporter {
"poll-count"));
}
+ /**
+  * Runs the reporter once against the mocked producer, with the serializer set to null, and
+  * checks that exactly one envelope is sent and flushed for the registered source, carrying the
+  * expected header fields and the single registered gauge.
+  */
+ @Test
+ public void testMetricsEmission() {
+   // setup: the serializer is nulled so the captured message can be cast to MetricsSnapshot below
+   serializer = null;
+   final String source = "testSource";
+   final String group = "someGroup";
+   final String metricName = "someName";
+
+   metricsSnapshotReporter = getMetricsSnapshotReporter(TestMetricsSnapshotReporter.BLACKLIST_NONE);
+   MetricsRegistryMap registry = new MetricsRegistryMap();
+   registry.newGauge(group, metricName, 42);
+   metricsSnapshotReporter.register(source, registry);
+
+   ArgumentCaptor<OutgoingMessageEnvelope> envelopeCaptor =
+       ArgumentCaptor.forClass(OutgoingMessageEnvelope.class);
+
+   // run
+   metricsSnapshotReporter.run();
+
+   // assert: one send and one flush for the registered source
+   verify(producer, times(1)).send(eq(source), envelopeCaptor.capture());
+   verify(producer, times(1)).flush(eq(source));
+
+   List<OutgoingMessageEnvelope> sent = envelopeCaptor.getAllValues();
+   Assert.assertEquals(1, sent.size());
+
+   MetricsSnapshot snapshot = (MetricsSnapshot) sent.get(0).getMessage();
+
+   // header fields must match the values the reporter was constructed with
+   Assert.assertEquals(JOB_NAME, snapshot.getHeader().getJobName());
+   Assert.assertEquals(JOB_ID, snapshot.getHeader().getJobId());
+   Assert.assertEquals(CONTAINER_NAME, snapshot.getHeader().getContainerName());
+   Assert.assertEquals(source, snapshot.getHeader().getSource());
+   Assert.assertEquals(SAMZA_VERSION, snapshot.getHeader().getSamzaVersion());
+   Assert.assertEquals(TASK_VERSION, snapshot.getHeader().getVersion());
+   Assert.assertEquals(HOSTNAME, snapshot.getHeader().getHost());
+
+   // the metrics payload must hold only the gauge registered above
+   Map<String, Map<String, Object>> metricsByGroup = snapshot.getMetrics().getAsMap();
+   Assert.assertEquals(1, metricsByGroup.size());
+   Assert.assertTrue(metricsByGroup.containsKey(group));
+   Assert.assertTrue(metricsByGroup.get(group).containsKey(metricName));
+   Assert.assertEquals(42, metricsByGroup.get(group).get(metricName));
+ }
+
+
private MetricsSnapshotReporter getMetricsSnapshotReporter(String blacklist)
{
- return new MetricsSnapshotReporter(new InMemorySystemProducer("test
system", null),
- new SystemStream("test system", "test stream"), 60000, "test job",
"test jobID", "samza-container-0",
- "test version", "test samza version", "test host", new
MetricsSnapshotSerdeV2(), new Some<>(blacklist),
- new AbstractFunction0<Object>() {
- @Override
- public Object apply() {
- return System.currentTimeMillis();
- }
- });
+ return new MetricsSnapshotReporter(producer, SYSTEM_STREAM,
REPORTING_INTERVAL, JOB_NAME, JOB_ID, CONTAINER_NAME,
+ TASK_VERSION, SAMZA_VERSION, HOSTNAME, serializer, new
Some<>(blacklist), getClock());
+ }
+
+ private AbstractFunction0<Object> getClock() {
+ return new AbstractFunction0<Object>() {
+ @Override
+ public Object apply() {
+ return System.currentTimeMillis();
+ }
+ };
}
}