This is an automated email from the ASF dual-hosted git repository.

cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new a731312  SAMZA-2669: Inject MetricsRegistry into SamzaContainer so that it can be shared in higher layers (#1518)
a731312 is described below

commit a7313129659bd068ab11176bcf8ea4597be3713b
Author: Cameron Lee <[email protected]>
AuthorDate: Tue Aug 24 14:38:02 2021 -0700

    SAMZA-2669: Inject MetricsRegistry into SamzaContainer so that it can be shared in higher layers (#1518)
    
    Changes:
    1. Inject a MetricsRegistryMap as an argument into SamzaContainer.apply
    2. Remove unused name field from MetricsRegistryMap.
    
    API changes and upgrade/usage instructions: MetricsRegistryMap no longer
    has a "name" associated with it. There is no longer a constructor for
    MetricsRegistryMap that accepts a name, and MetricsRegistryMap no longer
    has a getName method. Since MetricsRegistryMap is in samza-core, it is not
    a public API, but this note is included here just in case.
---
 .../java/org/apache/samza/processor/StreamProcessor.java     | 12 +++++++++---
 .../java/org/apache/samza/runtime/ContainerLaunchUtil.java   |  2 ++
 .../scala/org/apache/samza/container/SamzaContainer.scala    |  3 ++-
 .../scala/org/apache/samza/job/local/ThreadJobFactory.scala  |  3 ++-
 .../scala/org/apache/samza/metrics/MetricsRegistryMap.scala  |  6 +-----
 .../apache/samza/storage/kv/TestRocksDbKeyValueStore.scala   |  4 ++--
 .../org/apache/samza/system/mock/MockSystemConsumer.java     |  2 +-
 .../samza/webapp/TestYarnContainerHeartbeatServlet.java      |  2 +-
 8 files changed, 20 insertions(+), 14 deletions(-)

diff --git 
a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java 
b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index 47c1754..ea5308f 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -404,12 +404,18 @@ public class StreamProcessor {
       LOGGER.warn("StartpointManager cannot be instantiated because no 
metadata store defined for this stream processor");
     }
 
+    /*
+     * StreamProcessor has a metricsRegistry instance variable, but StreamProcessor registers its metrics on its own
+     * with the reporters. Therefore, don't reuse the StreamProcessor.metricsRegistry, because SamzaContainer also
+     * registers the registry, and that will result in unnecessary duplicate metrics.
+     */
+    MetricsRegistryMap metricsRegistryMap = new MetricsRegistryMap();
+
     return SamzaContainer.apply(processorId, jobModel, 
ScalaJavaUtil.toScalaMap(this.customMetricsReporter),
-        this.taskFactory, JobContextImpl.fromConfigWithDefaults(this.config, 
jobModel),
+        metricsRegistryMap, this.taskFactory, 
JobContextImpl.fromConfigWithDefaults(this.config, jobModel),
         
Option.apply(this.applicationDefinedContainerContextFactoryOptional.orElse(null)),
         
Option.apply(this.applicationDefinedTaskContextFactoryOptional.orElse(null)),
-        Option.apply(this.externalContextOptional.orElse(null)), null, 
startpointManager,
-        diagnosticsManager);
+        Option.apply(this.externalContextOptional.orElse(null)), null, 
startpointManager, diagnosticsManager);
   }
 
   private static JobCoordinator createJobCoordinator(Config config, String 
processorId, MetricsRegistry metricsRegistry, MetadataStore metadataStore) {
diff --git 
a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java 
b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
index 004c416..6c04e0a 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
@@ -126,10 +126,12 @@ public class ContainerLaunchUtil {
         diagnosticsManager = 
Option.apply(diagnosticsManagerReporterPair.get().getKey());
         
metricsReporters.put(MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS,
 diagnosticsManagerReporterPair.get().getValue());
       }
+      MetricsRegistryMap metricsRegistryMap = new MetricsRegistryMap();
 
       SamzaContainer container = SamzaContainer$.MODULE$.apply(
           containerId, jobModel,
           ScalaJavaUtil.toScalaMap(metricsReporters),
+          metricsRegistryMap,
           taskFactory,
           JobContextImpl.fromConfigWithDefaults(config, jobModel),
           
Option.apply(appDesc.getApplicationContainerContextFactory().orElse(null)),
diff --git 
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index fe27552..c1a6f0d 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -124,6 +124,8 @@ object SamzaContainer extends Logging {
     containerId: String,
     jobModel: JobModel,
     customReporters: Map[String, MetricsReporter] = Map[String, 
MetricsReporter](),
+    // TODO SAMZA-2671: there is further room for improvement for metrics wiring in general
+    registry: MetricsRegistryMap,
     taskFactory: TaskFactory[_],
     jobContext: JobContext,
     applicationContainerContextFactoryOption: 
Option[ApplicationContainerContextFactory[ApplicationContainerContext]],
@@ -154,7 +156,6 @@ object SamzaContainer extends Logging {
     startupLog("Using configuration: %s" format config)
     startupLog("Using container model: %s" format containerModel)
 
-    val registry = new MetricsRegistryMap(containerName)
     val samzaContainerMetrics = new SamzaContainerMetrics(containerName, 
registry)
     val systemProducersMetrics = new SystemProducersMetrics(registry)
     val systemConsumersMetrics = new SystemConsumersMetrics(registry)
diff --git 
a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala 
b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
index 9b5a073..cf56070 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
@@ -69,7 +69,7 @@ class ThreadJobFactory extends StreamJobFactory with Logging {
     }
 
     val metricsRegistry = new MetricsRegistryMap()
-    val coordinatorStreamStore: CoordinatorStreamStore = new 
CoordinatorStreamStore(config, new MetricsRegistryMap())
+    val coordinatorStreamStore: CoordinatorStreamStore = new 
CoordinatorStreamStore(config, metricsRegistry)
     coordinatorStreamStore.init()
 
     val changelogStreamManager = new ChangelogStreamManager(new 
NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, 
SetChangelogMapping.TYPE))
@@ -148,6 +148,7 @@ class ThreadJobFactory extends StreamJobFactory with 
Logging {
         containerId,
         jobModel,
         Map[String, MetricsReporter](),
+        metricsRegistry,
         taskFactory,
         JobContextImpl.fromConfigWithDefaults(config, jobModel),
         Option(appDesc.getApplicationContainerContextFactory.orElse(null)),
diff --git 
a/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala 
b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala
index 9751f68..4c167d3 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala
@@ -26,7 +26,7 @@ import java.util.concurrent.ConcurrentHashMap
  * A class that holds all metrics registered with it. It can be registered
  * with one or more MetricReporters to flush metrics.
  */
-class MetricsRegistryMap(val name: String) extends ReadableMetricsRegistry 
with Logging {
+class MetricsRegistryMap extends ReadableMetricsRegistry with Logging {
   var listeners = Set[ReadableMetricsRegistryListener]()
 
   /*
@@ -34,8 +34,6 @@ class MetricsRegistryMap(val name: String) extends 
ReadableMetricsRegistry with
    */
   val metrics = new ConcurrentHashMap[String, ConcurrentHashMap[String, 
Metric]]
 
-  def this() = this("unknown")
-
   def newCounter(group: String, counter: Counter) = {
     debug("Add new counter %s %s %s." format (group, counter.getName, counter))
     putAndGetGroup(group).putIfAbsent(counter.getName, counter)
@@ -82,8 +80,6 @@ class MetricsRegistryMap(val name: String) extends 
ReadableMetricsRegistry with
     metrics.get(group)
   }
 
-  def getName = name
-
   def getGroups = metrics.keySet()
 
   def getGroup(group: String) = metrics.get(group)
diff --git 
a/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala
 
b/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala
index 54dca8f..e6e1c62 100644
--- 
a/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala
+++ 
b/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala
@@ -199,7 +199,7 @@ class TestRocksDbKeyValueStore
 
   @Test
   def testMetricsConfig(): Unit = {
-    val registry = new MetricsRegistryMap("registrymap")
+    val registry = new MetricsRegistryMap()
     val metrics = new KeyValueStoreMetrics("dbstore", registry)
 
     val map = new util.HashMap[String, String]()
@@ -229,7 +229,7 @@ class TestRocksDbKeyValueStore
 
   @Test
   def testRocksDBMetricsWithBulkLoadRWRecreate(): Unit = {
-    val registry = new MetricsRegistryMap("registrymap")
+    val registry = new MetricsRegistryMap()
     val metrics = new KeyValueStoreMetrics("dbstore", registry)
 
     // Sample metric values for estimate-num-keys metrics
diff --git 
a/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemConsumer.java 
b/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemConsumer.java
index 2e27a4c..c39dae3 100644
--- 
a/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemConsumer.java
+++ 
b/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemConsumer.java
@@ -67,7 +67,7 @@ public class MockSystemConsumer extends BlockingEnvelopeMap {
    *          How long each thread should sleep between batch writes.
    */
   public MockSystemConsumer(int messagesPerBatch, int threadCount, int 
brokerSleepMs) {
-    super(new MetricsRegistryMap("test-container-performance"), new Clock() {
+    super(new MetricsRegistryMap(), new Clock() {
       @Override
       public long currentTimeMillis() {
         return System.currentTimeMillis();
diff --git 
a/samza-yarn/src/test/java/org/apache/samza/webapp/TestYarnContainerHeartbeatServlet.java
 
b/samza-yarn/src/test/java/org/apache/samza/webapp/TestYarnContainerHeartbeatServlet.java
index b69c52a..f3b5980 100644
--- 
a/samza-yarn/src/test/java/org/apache/samza/webapp/TestYarnContainerHeartbeatServlet.java
+++ 
b/samza-yarn/src/test/java/org/apache/samza/webapp/TestYarnContainerHeartbeatServlet.java
@@ -55,7 +55,7 @@ public class TestYarnContainerHeartbeatServlet {
   public void setup()
       throws Exception {
     container = mock(YarnContainer.class);
-    ReadableMetricsRegistry registry = new MetricsRegistryMap("test-registry");
+    ReadableMetricsRegistry registry = new MetricsRegistryMap();
 
     yarnAppState =
         new YarnAppState(-1, 
ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), 
"testHost", 1, 1);

Reply via email to