This is an automated email from the ASF dual-hosted git repository.
cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new a731312 SAMZA-2669: Inject MetricsRegistry into SamzaContainer so
that it can be shared in higher layers (#1518)
a731312 is described below
commit a7313129659bd068ab11176bcf8ea4597be3713b
Author: Cameron Lee <[email protected]>
AuthorDate: Tue Aug 24 14:38:02 2021 -0700
SAMZA-2669: Inject MetricsRegistry into SamzaContainer so that it can be
shared in higher layers (#1518)
Changes:
1. Inject a MetricsRegistryMap as an argument into SamzaContainer.apply
2. Remove unused name field from MetricsRegistryMap.
API changes and upgrade/usage instructions: MetricsRegistryMap no longer
has a "name" associated with it. There is no longer a constructor for
MetricsRegistryMap which accepts a name, and MetricsRegistryMap no longer has a
getName method. Since MetricsRegistryMap is in samza-core, this is not a public
API, but putting a note here just in case.
---
.../java/org/apache/samza/processor/StreamProcessor.java | 12 +++++++++---
.../java/org/apache/samza/runtime/ContainerLaunchUtil.java | 2 ++
.../scala/org/apache/samza/container/SamzaContainer.scala | 3 ++-
.../scala/org/apache/samza/job/local/ThreadJobFactory.scala | 3 ++-
.../scala/org/apache/samza/metrics/MetricsRegistryMap.scala | 6 +-----
.../apache/samza/storage/kv/TestRocksDbKeyValueStore.scala | 4 ++--
.../org/apache/samza/system/mock/MockSystemConsumer.java | 2 +-
.../samza/webapp/TestYarnContainerHeartbeatServlet.java | 2 +-
8 files changed, 20 insertions(+), 14 deletions(-)
diff --git
a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index 47c1754..ea5308f 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -404,12 +404,18 @@ public class StreamProcessor {
LOGGER.warn("StartpointManager cannot be instantiated because no
metadata store defined for this stream processor");
}
+ /*
+ * StreamProcessor has a metricsRegistry instance variable, but
StreamProcessor registers its metrics on its own
+ * with the reporters. Therefore, don't reuse the
StreamProcessor.metricsRegistry, because SamzaContainer also
+ * registers the registry, and that will result in unnecessary duplicate
metrics.
+ */
+ MetricsRegistryMap metricsRegistryMap = new MetricsRegistryMap();
+
return SamzaContainer.apply(processorId, jobModel,
ScalaJavaUtil.toScalaMap(this.customMetricsReporter),
- this.taskFactory, JobContextImpl.fromConfigWithDefaults(this.config,
jobModel),
+ metricsRegistryMap, this.taskFactory,
JobContextImpl.fromConfigWithDefaults(this.config, jobModel),
Option.apply(this.applicationDefinedContainerContextFactoryOptional.orElse(null)),
Option.apply(this.applicationDefinedTaskContextFactoryOptional.orElse(null)),
- Option.apply(this.externalContextOptional.orElse(null)), null,
startpointManager,
- diagnosticsManager);
+ Option.apply(this.externalContextOptional.orElse(null)), null,
startpointManager, diagnosticsManager);
}
private static JobCoordinator createJobCoordinator(Config config, String
processorId, MetricsRegistry metricsRegistry, MetadataStore metadataStore) {
diff --git
a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
index 004c416..6c04e0a 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
@@ -126,10 +126,12 @@ public class ContainerLaunchUtil {
diagnosticsManager =
Option.apply(diagnosticsManagerReporterPair.get().getKey());
metricsReporters.put(MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS,
diagnosticsManagerReporterPair.get().getValue());
}
+ MetricsRegistryMap metricsRegistryMap = new MetricsRegistryMap();
SamzaContainer container = SamzaContainer$.MODULE$.apply(
containerId, jobModel,
ScalaJavaUtil.toScalaMap(metricsReporters),
+ metricsRegistryMap,
taskFactory,
JobContextImpl.fromConfigWithDefaults(config, jobModel),
Option.apply(appDesc.getApplicationContainerContextFactory().orElse(null)),
diff --git
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index fe27552..c1a6f0d 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -124,6 +124,8 @@ object SamzaContainer extends Logging {
containerId: String,
jobModel: JobModel,
customReporters: Map[String, MetricsReporter] = Map[String,
MetricsReporter](),
+ // TODO SAMZA-2671: there is further room for improvement for metrics
wiring in general
+ registry: MetricsRegistryMap,
taskFactory: TaskFactory[_],
jobContext: JobContext,
applicationContainerContextFactoryOption:
Option[ApplicationContainerContextFactory[ApplicationContainerContext]],
@@ -154,7 +156,6 @@ object SamzaContainer extends Logging {
startupLog("Using configuration: %s" format config)
startupLog("Using container model: %s" format containerModel)
- val registry = new MetricsRegistryMap(containerName)
val samzaContainerMetrics = new SamzaContainerMetrics(containerName,
registry)
val systemProducersMetrics = new SystemProducersMetrics(registry)
val systemConsumersMetrics = new SystemConsumersMetrics(registry)
diff --git
a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
index 9b5a073..cf56070 100644
---
a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
+++
b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
@@ -69,7 +69,7 @@ class ThreadJobFactory extends StreamJobFactory with Logging {
}
val metricsRegistry = new MetricsRegistryMap()
- val coordinatorStreamStore: CoordinatorStreamStore = new
CoordinatorStreamStore(config, new MetricsRegistryMap())
+ val coordinatorStreamStore: CoordinatorStreamStore = new
CoordinatorStreamStore(config, metricsRegistry)
coordinatorStreamStore.init()
val changelogStreamManager = new ChangelogStreamManager(new
NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore,
SetChangelogMapping.TYPE))
@@ -148,6 +148,7 @@ class ThreadJobFactory extends StreamJobFactory with
Logging {
containerId,
jobModel,
Map[String, MetricsReporter](),
+ metricsRegistry,
taskFactory,
JobContextImpl.fromConfigWithDefaults(config, jobModel),
Option(appDesc.getApplicationContainerContextFactory.orElse(null)),
diff --git
a/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala
b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala
index 9751f68..4c167d3 100644
---
a/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala
+++
b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala
@@ -26,7 +26,7 @@ import java.util.concurrent.ConcurrentHashMap
* A class that holds all metrics registered with it. It can be registered
* with one or more MetricReporters to flush metrics.
*/
-class MetricsRegistryMap(val name: String) extends ReadableMetricsRegistry
with Logging {
+class MetricsRegistryMap extends ReadableMetricsRegistry with Logging {
var listeners = Set[ReadableMetricsRegistryListener]()
/*
@@ -34,8 +34,6 @@ class MetricsRegistryMap(val name: String) extends
ReadableMetricsRegistry with
*/
val metrics = new ConcurrentHashMap[String, ConcurrentHashMap[String,
Metric]]
- def this() = this("unknown")
-
def newCounter(group: String, counter: Counter) = {
debug("Add new counter %s %s %s." format (group, counter.getName, counter))
putAndGetGroup(group).putIfAbsent(counter.getName, counter)
@@ -82,8 +80,6 @@ class MetricsRegistryMap(val name: String) extends
ReadableMetricsRegistry with
metrics.get(group)
}
- def getName = name
-
def getGroups = metrics.keySet()
def getGroup(group: String) = metrics.get(group)
diff --git
a/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala
b/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala
index 54dca8f..e6e1c62 100644
---
a/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala
+++
b/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala
@@ -199,7 +199,7 @@ class TestRocksDbKeyValueStore
@Test
def testMetricsConfig(): Unit = {
- val registry = new MetricsRegistryMap("registrymap")
+ val registry = new MetricsRegistryMap()
val metrics = new KeyValueStoreMetrics("dbstore", registry)
val map = new util.HashMap[String, String]()
@@ -229,7 +229,7 @@ class TestRocksDbKeyValueStore
@Test
def testRocksDBMetricsWithBulkLoadRWRecreate(): Unit = {
- val registry = new MetricsRegistryMap("registrymap")
+ val registry = new MetricsRegistryMap()
val metrics = new KeyValueStoreMetrics("dbstore", registry)
// Sample metric values for estimate-num-keys metrics
diff --git
a/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemConsumer.java
b/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemConsumer.java
index 2e27a4c..c39dae3 100644
---
a/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemConsumer.java
+++
b/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemConsumer.java
@@ -67,7 +67,7 @@ public class MockSystemConsumer extends BlockingEnvelopeMap {
* How long each thread should sleep between batch writes.
*/
public MockSystemConsumer(int messagesPerBatch, int threadCount, int
brokerSleepMs) {
- super(new MetricsRegistryMap("test-container-performance"), new Clock() {
+ super(new MetricsRegistryMap(), new Clock() {
@Override
public long currentTimeMillis() {
return System.currentTimeMillis();
diff --git
a/samza-yarn/src/test/java/org/apache/samza/webapp/TestYarnContainerHeartbeatServlet.java
b/samza-yarn/src/test/java/org/apache/samza/webapp/TestYarnContainerHeartbeatServlet.java
index b69c52a..f3b5980 100644
---
a/samza-yarn/src/test/java/org/apache/samza/webapp/TestYarnContainerHeartbeatServlet.java
+++
b/samza-yarn/src/test/java/org/apache/samza/webapp/TestYarnContainerHeartbeatServlet.java
@@ -55,7 +55,7 @@ public class TestYarnContainerHeartbeatServlet {
public void setup()
throws Exception {
container = mock(YarnContainer.class);
- ReadableMetricsRegistry registry = new MetricsRegistryMap("test-registry");
+ ReadableMetricsRegistry registry = new MetricsRegistryMap();
yarnAppState =
new YarnAppState(-1,
ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"),
"testHost", 1, 1);