This is an automated email from the ASF dual-hosted git repository.
atoomula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 7008e0d Bugfix: Recent CSM refactor was causing some metrics to not
be emitted. Fixed <task>-restore-time metric.
7008e0d is described below
commit 7008e0d4130d429063493ce0bd413fe52d9463f1
Author: Ray Matharu <[email protected]>
AuthorDate: Fri Mar 8 12:21:20 2019 -0800
Bugfix: Recent CSM refactor was causing some metrics to not be emitted.
Fixed <task>-restore-time metric.
Author: Ray Matharu <[email protected]>
Reviewers: prateekm
Closes #942 from rmatharu/test-metricsfix and squashes the following
commits:
c5a072f4 [Ray Matharu] minor fix
3c1e25ad [Ray Matharu] minor
b13b485a [Ray Matharu] minor
7e60ad8b [Ray Matharu] minor
7634a470 [Ray Matharu] removing CSM's registerMetrics
78ee37e1 [Ray Matharu] removing unused imports
f207f03d [Ray Matharu] minor
8aec4fa7 [Ray Matharu] minor
0c121262 [Ray Matharu] Fixing metrics after moving sideInputs to CSM
8bd3b19f [Ray Matharu] Merge branch 'master' of
https://github.com/apache/samza
6b58c862 [Ray Matharu] Merge branch 'master' of
https://github.com/apache/samza
89be3652 [Ray Matharu] Merge branch 'master' of
https://github.com/apache/samza
6fe29268 [Ray Matharu] Merge branch 'master' of
https://github.com/apache/samza
96e3d8f3 [Ray Matharu] Merge branch 'master' of
https://github.com/apache/samza
40f68a61 [Ray Matharu] Merge branch 'master' of
https://github.com/apache/samza
497602ab [Ray Matharu] Merge branch 'master' of
https://github.com/apache/samza
1a72dc48 [Ray Matharu] Merge branch 'master' of
https://github.com/apache/samza
36c0b339 [Ray Matharu] Merge branch 'master' of
https://github.com/apache/samza
12ca96bb [Ray Matharu] Merge branch 'master' of
https://github.com/apache/samza
ee7daac8 [Ray Matharu] Merge branch 'master' of
https://github.com/apache/samza
08006871 [Ray Matharu] Merge branch 'master' of
https://github.com/apache/samza
916f66ae [Ray Matharu] Merge branch 'master' of
https://github.com/apache/samza
2c09b081 [Ray Matharu] Rocksdb bug fix
---
.../org/apache/samza/container/SamzaContainer.scala | 9 +++++----
.../apache/samza/container/SamzaContainerMetrics.scala | 4 ++--
.../scala/org/apache/samza/container/TaskInstance.scala | 7 -------
.../apache/samza/storage/ContainerStorageManager.java | 17 +++++++++--------
.../apache/samza/system/SystemConsumersMetrics.scala | 7 +++++--
.../java/org/apache/samza/task/TestAsyncRunLoop.java | 1 -
.../org/apache/samza/container/TestSamzaContainer.scala | 1 +
.../samza/processor/StreamProcessorTestUtils.scala | 1 +
8 files changed, 23 insertions(+), 24 deletions(-)
diff --git
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 5df4678..ab89396 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -594,7 +594,6 @@ object SamzaContainer extends Logging {
offsetManager = offsetManager,
storageManager = storageManager,
tableManager = tableManager,
- reporters = reporters,
systemStreamPartitions = taskSSPs -- taskSideInputSSPs,
exceptionHandler =
TaskInstanceExceptionHandler(taskInstanceMetrics.get(taskName).get, config),
jobModel = jobModel,
@@ -663,6 +662,7 @@ object SamzaContainer extends Logging {
new SamzaContainer(
config = config,
taskInstances = taskInstances,
+ taskInstanceMetrics = taskInstanceMetrics,
runLoop = runLoop,
systemAdmins = systemAdmins,
consumerMultiplexer = consumerMultiplexer,
@@ -700,6 +700,7 @@ object SamzaContainer extends Logging {
class SamzaContainer(
config: Config,
taskInstances: Map[TaskName, TaskInstance],
+ taskInstanceMetrics: Map[TaskName, TaskInstanceMetrics],
runLoop: Runnable,
systemAdmins: SystemAdmins,
consumerMultiplexer: SystemConsumers,
@@ -879,9 +880,9 @@ class SamzaContainer(
}
def startMetrics {
- info("Registering task instances with metrics.")
-
- taskInstances.values.foreach(_.registerMetrics)
+ info("Registering task instance metrics.")
+ reporters.values.foreach(reporter =>
+ taskInstanceMetrics.values.foreach(taskMetrics =>
reporter.register(taskMetrics.source, taskMetrics.registry)))
info("Starting JVM metrics.")
diff --git
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
index d5cf6c6..326156b 100644
---
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
+++
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
@@ -51,8 +51,8 @@ class SamzaContainerMetrics(
val exceptions = newListGauge[DiagnosticsExceptionEvent]("exceptions")
- def addStoreRestorationGauge(taskName: TaskName, storeName: String) {
- taskStoreRestorationMetrics.put(taskName, newGauge("%s-%s-restore-time"
format(taskName.toString, storeName), -1L))
+ def addStoresRestorationGauge(taskName: TaskName) {
+ taskStoreRestorationMetrics.put(taskName, newGauge("%s-restore-time"
format(taskName.toString), -1L))
}
}
diff --git
a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index 0c8102b..fa17f24 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -52,7 +52,6 @@ class TaskInstance(
val offsetManager: OffsetManager = new OffsetManager,
storageManager: TaskStorageManager = null,
tableManager: TableManager = null,
- reporters: Map[String, MetricsReporter] = Map(),
val systemStreamPartitions: Set[SystemStreamPartition] = Set(),
val exceptionHandler: TaskInstanceExceptionHandler = new
TaskInstanceExceptionHandler,
jobModel: JobModel = null,
@@ -105,12 +104,6 @@ class TaskInstance(
val streamsToDeleteCommittedMessages: Set[String] =
config.getStreamIds.filter(config.getDeleteCommittedMessages).map(config.getPhysicalName).toSet
- def registerMetrics {
- debug("Registering metrics for taskName: %s" format taskName)
-
- reporters.values.foreach(_.register(metrics.source, metrics.registry))
- }
-
def registerOffsets {
debug("Registering offsets for taskName: %s" format taskName)
offsetManager.register(taskName, systemStreamPartitions)
diff --git
a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
index ad9637d..da61a35 100644
---
a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
+++
b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
@@ -113,7 +113,8 @@ public class ContainerStorageManager {
private static final Logger LOG =
LoggerFactory.getLogger(ContainerStorageManager.class);
private static final String RESTORE_THREAD_NAME = "Samza Restore Thread-%d";
private static final String SIDEINPUTS_FLUSH_THREAD_NAME = "SideInputs Flush
Thread";
- private static final String SIDEINPUTS_METRICS_NAME =
"samza-container-%s-sideinputs";
+ private static final String SIDEINPUTS_METRICS_PREFIX = "side-inputs-";
+ // We use a prefix to differentiate the SystemConsumersMetrics for
side-inputs from the ones in SamzaContainer
/** Maps containing relevant per-task objects */
private final Map<TaskName, Map<String, StorageEngine>> taskStores;
@@ -215,7 +216,7 @@ public class ContainerStorageManager {
this.storeConsumers = createStoreIndexedMap(this.changelogSystemStreams,
storeSystemConsumers);
// creating task restore managers
- this.taskRestoreManagers = createTaskRestoreManagers(systemAdmins, clock);
+ this.taskRestoreManagers = createTaskRestoreManagers(systemAdmins, clock,
this.samzaContainerMetrics);
// create side input storage managers
sideInputStorageManagers = createSideInputStorageManagers(clock);
@@ -229,15 +230,15 @@ public class ContainerStorageManager {
scala.collection.immutable.Map<SystemStream, SystemStreamMetadata>
inputStreamMetadata =
streamMetadataCache.getStreamMetadata(JavaConversions.asScalaSet(
this.sideInputSystemStreams.values().stream().flatMap(Set::stream).collect(Collectors.toSet())).toSet(),
false);
- SystemConsumersMetrics systemConsumersMetrics = new
SystemConsumersMetrics(
- new MetricsRegistryMap(String.format(SIDEINPUTS_METRICS_NAME,
containerModel.getId())));
+ SystemConsumersMetrics sideInputSystemConsumersMetrics = new
SystemConsumersMetrics(samzaContainerMetrics.registry(),
SIDEINPUTS_METRICS_PREFIX);
+ // we use the same registry as samza-container-metrics
MessageChooser chooser = DefaultChooser.apply(inputStreamMetadata, new
RoundRobinChooserFactory(), config,
- systemConsumersMetrics.registry(), systemAdmins);
+ sideInputSystemConsumersMetrics.registry(), systemAdmins);
sideInputSystemConsumers =
new SystemConsumers(chooser,
ScalaJavaUtil.toScalaMap(this.sideInputConsumers), serdeManager,
- systemConsumersMetrics,
SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT(),
SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR(),
+ sideInputSystemConsumersMetrics,
SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT(),
SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR(),
SystemConsumers.DEFAULT_POLL_INTERVAL_MS(),
ScalaJavaUtil.toScalaFunction(() -> System.nanoTime()));
}
@@ -336,11 +337,12 @@ public class ContainerStorageManager {
return storeConsumers;
}
- private Map<TaskName, TaskRestoreManager>
createTaskRestoreManagers(SystemAdmins systemAdmins, Clock clock) {
+ private Map<TaskName, TaskRestoreManager>
createTaskRestoreManagers(SystemAdmins systemAdmins, Clock clock,
SamzaContainerMetrics samzaContainerMetrics) {
Map<TaskName, TaskRestoreManager> taskRestoreManagers = new HashMap<>();
containerModel.getTasks().forEach((taskName, taskModel) -> {
taskRestoreManagers.put(taskName,
new TaskRestoreManager(taskModel, changelogSystemStreams,
getNonSideInputStores(taskName), systemAdmins, clock));
+ samzaContainerMetrics.addStoresRestorationGauge(taskName);
});
return taskRestoreManagers;
}
@@ -573,7 +575,6 @@ public class ContainerStorageManager {
return
this.sideInputStorageManagers.values().stream().collect(Collectors.toSet());
}
-
public void start() throws SamzaException {
restoreStores();
if (sideInputsPresent()) {
diff --git
a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala
b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala
index 43d381b..afdce08 100644
---
a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala
+++
b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala
@@ -19,12 +19,13 @@
package org.apache.samza.system
-import org.apache.samza.metrics.MetricsRegistry
import org.apache.samza.metrics.MetricsRegistryMap
import org.apache.samza.metrics.Counter
import org.apache.samza.metrics.MetricsHelper
+import org.apache.samza.metrics.ReadableMetricsRegistry
-class SystemConsumersMetrics(val registry: MetricsRegistry = new
MetricsRegistryMap) extends MetricsHelper {
+class SystemConsumersMetrics(val registry: ReadableMetricsRegistry = new
MetricsRegistryMap,
+ val prefix: String = "") extends MetricsHelper {
val choseNull = newCounter("chose-null")
val choseObject = newCounter("chose-object")
val deserializationError = newCounter("deserialization error")
@@ -55,6 +56,8 @@ class SystemConsumersMetrics(val registry: MetricsRegistry =
new MetricsRegistry
}
}
+ override def getPrefix: String = prefix
+
def registerSystemStreamPartition(systemStreamPartition:
SystemStreamPartition) {
systemStreamMessagesChosen += systemStreamPartition ->
newCounter("%s-%s-%d-messages-chosen" format (systemStreamPartition.getSystem,
systemStreamPartition.getStream,
systemStreamPartition.getPartition.getPartitionId))
}
diff --git
a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
index acaecdb..48f8619 100644
--- a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
+++ b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
@@ -97,7 +97,6 @@ public class TestAsyncRunLoop {
manager,
null,
null,
- null,
sspSet,
new TaskInstanceExceptionHandler(taskInstanceMetrics, new
scala.collection.immutable.HashSet<String>()),
null,
diff --git
a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index d1f60bc..e75fe54 100644
---
a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++
b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -280,6 +280,7 @@ class TestSamzaContainer extends AssertionsForJUnit with
MockitoSugar {
this.samzaContainer = new SamzaContainer(
this.config,
Map(TASK_NAME -> this.taskInstance),
+ Map(TASK_NAME -> new TaskInstanceMetrics),
this.runLoop,
this.systemAdmins,
this.consumerMultiplexer,
diff --git
a/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
b/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
index 9bb485a..3ff651b 100644
---
a/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
+++
b/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
@@ -63,6 +63,7 @@ object StreamProcessorTestUtils {
val container = new SamzaContainer(
config = config,
taskInstances = Map(taskName -> taskInstance),
+ taskInstanceMetrics = Map(taskName -> new TaskInstanceMetrics),
runLoop = mockRunloop,
systemAdmins = adminMultiplexer,
consumerMultiplexer = consumerMultiplexer,