This is an automated email from the ASF dual-hosted git repository.
jagadish pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 6b0c20a SAMZA-2251: Minor diagnostics manager change to emit
additional job details
6b0c20a is described below
commit 6b0c20ae63f9f15da89f35dbb15189300cfab371
Author: Ray Matharu <[email protected]>
AuthorDate: Mon Jun 24 17:40:28 2019 -0700
SAMZA-2251: Minor diagnostics manager change to emit additional job details
Author: Ray Matharu <[email protected]>
Reviewers: Cameron Lee<[email protected]>
Closes #1083 from rmatharu/test-diagnostics-improvements
---
.../clustermanager/ContainerProcessManager.java | 8 +-
.../org/apache/samza/config/StorageConfig.java | 8 +
.../apache/samza/processor/StreamProcessor.java | 2 +-
.../apache/samza/runtime/ContainerLaunchUtil.java | 2 +-
.../org/apache/samza/util/DiagnosticsUtil.java | 35 ++-
.../samza/diagnostics/DiagnosticsManager.java | 118 ++++++---
.../diagnostics/DiagnosticsStreamMessage.java | 284 +++++++++++++++++++++
.../samza/diagnostics/ProcessorStopEvent.java | 63 +++++
.../samza/diagnostics/TestDiagnosticsManager.java | 245 ++++++++++++++++++
.../diagnostics/TestDiagnosticsStreamMessage.java | 148 +++++++++++
10 files changed, 862 insertions(+), 51 deletions(-)
diff --git
a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
index f75e217..b2cf6b9 100644
---
a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
+++
b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
@@ -146,7 +146,7 @@ public class ContainerProcessManager implements
ClusterResourceManager.Callback
String jobId = new JobConfig(config).getJobId();
Optional<String> execEnvContainerId =
Optional.ofNullable(System.getenv(EXEC_ENV_CONTAINER_ID_SYS_PROPERTY));
Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>>
diagnosticsManagerReporterPair =
- DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId,
METRICS_SOURCE_NAME, execEnvContainerId, config);
+ DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId,
state.jobModelManager.jobModel(), METRICS_SOURCE_NAME, execEnvContainerId,
config);
if (diagnosticsManagerReporterPair.isPresent()) {
diagnosticsManager =
Option.apply(diagnosticsManagerReporterPair.get().getKey());
@@ -308,11 +308,13 @@ public class ContainerProcessManager implements
ClusterResourceManager.Callback
public void onResourceCompleted(SamzaResourceStatus resourceStatus) {
String containerId = resourceStatus.getContainerId();
String processorId = null;
+ String hostName = null;
for (Map.Entry<String, SamzaResource> entry:
state.runningProcessors.entrySet()) {
if
(entry.getValue().getContainerId().equals(resourceStatus.getContainerId())) {
log.info("Container ID: {} matched running Processor ID: {} on host:
{}", containerId, entry.getKey(), entry.getValue().getHost());
processorId = entry.getKey();
+ hostName = entry.getValue().getHost();
break;
}
}
@@ -435,6 +437,10 @@ public class ContainerProcessManager implements
ClusterResourceManager.Callback
}
}
+
+ if (this.diagnosticsManager.isDefined()) {
+ this.diagnosticsManager.get().addProcessorStopEvent(processorId,
resourceStatus.getContainerId(), hostName, exitStatus);
+ }
}
@Override
diff --git
a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
index bc38e56..32e0d8e 100644
--- a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
@@ -205,4 +205,12 @@ public class StorageConfig extends MapConfig {
Config subConfig = subset(STORE_PREFIX, true);
return subConfig.keySet().stream().anyMatch(key ->
key.endsWith(CHANGELOG_SUFFIX));
}
+
+ /**
+ * Helper method to get the number of stores configured with a changelog.
+ */
+ public int getNumStoresWithChangelog() {
+ Config subConfig = subset(STORE_PREFIX, true);
+ return new Long(subConfig.keySet().stream().filter(key ->
key.endsWith(CHANGELOG_SUFFIX)).count()).intValue();
+ }
}
diff --git
a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index c90c8f4..75dc62d 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -341,7 +341,7 @@ public class StreamProcessor {
String jobName = new JobConfig(config).getName().get();
String jobId = new JobConfig(config).getJobId();
Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>>
diagnosticsManagerReporterPair =
- DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, processorId,
Optional.empty(), config);
+ DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, jobModel,
processorId, Optional.empty(), config);
Option<DiagnosticsManager> diagnosticsManager = Option.empty();
if (diagnosticsManagerReporterPair.isPresent()) {
diagnosticsManager =
Option.apply(diagnosticsManagerReporterPair.get().getKey());
diff --git
a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
index e748dd2..c84a4a1 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
@@ -99,7 +99,7 @@ public class ContainerLaunchUtil {
Map<String, MetricsReporter> metricsReporters =
loadMetricsReporters(appDesc, containerId, config);
// Creating diagnostics manager and reporter, and wiring it respectively
- Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>>
diagnosticsManagerReporterPair =
DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, containerId,
execEnvContainerId, config);
+ Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>>
diagnosticsManagerReporterPair =
DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, jobModel, containerId,
execEnvContainerId, config);
Option<DiagnosticsManager> diagnosticsManager = Option.empty();
if (diagnosticsManagerReporterPair.isPresent()) {
diagnosticsManager =
Option.apply(diagnosticsManagerReporterPair.get().getKey());
diff --git
a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
index c290e6c..637c01b 100644
--- a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
@@ -24,13 +24,16 @@ import java.util.Optional;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.samza.SamzaException;
+import org.apache.samza.config.ClusterManagerConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.ConfigException;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MetricsConfig;
+import org.apache.samza.config.StorageConfig;
import org.apache.samza.config.SystemConfig;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.diagnostics.DiagnosticsManager;
+import org.apache.samza.job.model.JobModel;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.metrics.reporter.Metrics;
import org.apache.samza.metrics.reporter.MetricsHeader;
@@ -50,7 +53,6 @@ import scala.Option;
public class DiagnosticsUtil {
private static final Logger log =
LoggerFactory.getLogger(DiagnosticsUtil.class);
-
// Write a file in the samza.log.dir named {exec-env-container-id}.metadata
that contains
// metadata about the container such as containerId, jobName, jobId,
hostname, timestamp, version info, and others.
// The file contents are serialized using {@link JsonSerde}.
@@ -61,9 +63,9 @@ public class DiagnosticsUtil {
if (metadataFile.isDefined()) {
MetricsHeader metricsHeader =
- new MetricsHeader(jobName, jobId, "samza-container-" + containerId,
execEnvContainerId.orElse(""), LocalContainerRunner.class.getName(),
- Util.getTaskClassVersion(config), Util.getSamzaVersion(),
Util.getLocalHost().getHostName(),
- System.currentTimeMillis(), System.currentTimeMillis());
+ new MetricsHeader(jobName, jobId, "samza-container-" + containerId,
execEnvContainerId.orElse(""),
+ LocalContainerRunner.class.getName(),
Util.getTaskClassVersion(config), Util.getSamzaVersion(),
+ Util.getLocalHost().getHostName(), System.currentTimeMillis(),
System.currentTimeMillis());
class MetadataFileContents {
public final String version;
@@ -76,26 +78,30 @@ public class DiagnosticsUtil {
}
MetricsSnapshot metricsSnapshot = new MetricsSnapshot(metricsHeader, new
Metrics());
- MetadataFileContents metadataFileContents = new
MetadataFileContents("1", new String(new
MetricsSnapshotSerdeV2().toBytes(metricsSnapshot)));
+ MetadataFileContents metadataFileContents =
+ new MetadataFileContents("1", new String(new
MetricsSnapshotSerdeV2().toBytes(metricsSnapshot)));
FileUtil.writeToTextFile(metadataFile.get(), new String(new
JsonSerde<>().toBytes(metadataFileContents)), false);
} else {
log.info("Skipping writing metadata file.");
}
}
-
/**
* Create a pair of DiagnosticsManager and Reporter for the given jobName,
jobId, containerId, and execEnvContainerId,
* if diagnostics is enabled.
* execEnvContainerId is the ID assigned to the container by the cluster
manager (e.g., YARN).
*/
- public static Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>>
buildDiagnosticsManager(String jobName, String jobId,
- String containerId, Optional<String> execEnvContainerId, Config config) {
+ public static Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>>
buildDiagnosticsManager(String jobName,
+ String jobId, JobModel jobModel, String containerId, Optional<String>
execEnvContainerId, Config config) {
Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>>
diagnosticsManagerReporterPair = Optional.empty();
if (new JobConfig(config).getDiagnosticsEnabled()) {
+ ClusterManagerConfig clusterManagerConfig = new
ClusterManagerConfig(config);
+ int containerMemoryMb = clusterManagerConfig.getContainerMemoryMb();
+ int containerNumCores = clusterManagerConfig.getNumCores();
+
// Diagnostic stream, producer, and reporter related parameters
String diagnosticsReporterName =
MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS;
MetricsConfig metricsConfig = new MetricsConfig(config);
@@ -111,16 +117,21 @@ public class DiagnosticsUtil {
SystemStream diagnosticsSystemStream =
StreamUtil.getSystemStreamFromNames(diagnosticsReporterStreamName.get());
- Optional<String> diagnosticsSystemFactoryName = new
SystemConfig(config).getSystemFactory(diagnosticsSystemStream.getSystem());
+ Optional<String> diagnosticsSystemFactoryName =
+ new
SystemConfig(config).getSystemFactory(diagnosticsSystemStream.getSystem());
if (!diagnosticsSystemFactoryName.isPresent()) {
throw new SamzaException("Missing factory in config for system " +
diagnosticsSystemStream.getSystem());
}
// Create a systemProducer for giving to diagnostic-reporter and
diagnosticsManager
SystemFactory systemFactory =
Util.getObj(diagnosticsSystemFactoryName.get(), SystemFactory.class);
- SystemProducer systemProducer =
systemFactory.getProducer(diagnosticsSystemStream.getSystem(), config, new
MetricsRegistryMap());
- DiagnosticsManager diagnosticsManager = new DiagnosticsManager(jobName,
jobId, containerId, execEnvContainerId.orElse(""), taskClassVersion,
- samzaVersion, hostName, diagnosticsSystemStream, systemProducer,
Duration.ofMillis(new TaskConfig(config).getShutdownMs()));
+ SystemProducer systemProducer =
+ systemFactory.getProducer(diagnosticsSystemStream.getSystem(),
config, new MetricsRegistryMap());
+ DiagnosticsManager diagnosticsManager =
+ new DiagnosticsManager(jobName, jobId, jobModel.getContainers(),
containerMemoryMb, containerNumCores,
+ new StorageConfig(config).getNumStoresWithChangelog(),
containerId, execEnvContainerId.orElse(""),
+ taskClassVersion, samzaVersion, hostName,
diagnosticsSystemStream, systemProducer,
+ Duration.ofMillis(new TaskConfig(config).getShutdownMs()));
Option<String> blacklist =
ScalaJavaUtil.JavaOptionals$.MODULE$.toRichOptional(
metricsConfig.getMetricsSnapshotReporterBlacklist(diagnosticsReporterName)).toOption();
diff --git
a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java
b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java
index daaefe4..aa41940 100644
---
a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java
+++
b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java
@@ -19,25 +19,23 @@
package org.apache.samza.diagnostics;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.lang.reflect.InvocationTargetException;
import java.time.Duration;
import java.time.Instant;
-import java.util.Collection;
+import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import org.apache.samza.metrics.reporter.Metrics;
-import org.apache.samza.metrics.reporter.MetricsHeader;
-import org.apache.samza.metrics.reporter.MetricsSnapshot;
+import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.serializers.MetricsSnapshotSerdeV2;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemProducer;
import org.apache.samza.system.SystemStream;
-import org.apache.samza.util.DiagnosticsUtil;
import org.apache.samza.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,8 +53,6 @@ public class DiagnosticsManager {
// Period size for pushing data to the diagnostic stream
private static final String PUBLISH_THREAD_NAME = "DiagnosticsManager
Thread-%d";
- private static final String METRICS_GROUP_NAME =
"org.apache.samza.container.SamzaContainerMetrics";
- // Using SamzaContainerMetrics as the group name to maintain compatibility
with existing diagnostics
// Parameters used for populating the MetricHeader when sending
diagnostic-stream messages
private final String jobName;
@@ -68,31 +64,58 @@ public class DiagnosticsManager {
private final String hostname;
private final Instant resetTime;
- private SystemProducer systemProducer; // SystemProducer for writing
diagnostics data
- private BoundedList<DiagnosticsExceptionEvent> exceptions; // A BoundedList
for storing DiagnosticExceptionEvent
+ // Job-related params
+ private final Integer containerMemoryMb;
+ private final Integer containerNumCores;
+ private final Integer numStoresWithChangelog;
+ private final Map<String, ContainerModel> containerModels;
+ private boolean jobParamsEmitted = false;
+
+ private final SystemProducer systemProducer; // SystemProducer for writing
diagnostics data
+ private final BoundedList<DiagnosticsExceptionEvent> exceptions; // A
BoundedList for storing DiagnosticExceptionEvent
+ private final ConcurrentLinkedQueue<ProcessorStopEvent> processorStopEvents;
+ // A queue for storing ProcessorStopEvents until they are published
private final ScheduledExecutorService scheduler; // Scheduler for pushing
data to the diagnostic stream
private final Duration terminationDuration; // duration to wait when
terminating the scheduler
private final SystemStream diagnosticSystemStream;
- public DiagnosticsManager(String jobName, String jobId, String containerId,
String executionEnvContainerId,
- String taskClassVersion, String samzaVersion, String hostname,
SystemStream diagnosticSystemStream,
- SystemProducer systemProducer, Duration terminationDuration) {
+ public DiagnosticsManager(String jobName, String jobId, Map<String,
ContainerModel> containerModels,
+ Integer containerMemoryMb, Integer containerNumCores, Integer
numStoresWithChangelog, String containerId,
+ String executionEnvContainerId, String taskClassVersion, String
samzaVersion, String hostname,
+ SystemStream diagnosticSystemStream, SystemProducer systemProducer,
Duration terminationDuration) {
+
+ this(jobName, jobId, containerModels, containerMemoryMb,
containerNumCores, numStoresWithChangelog, containerId,
+ executionEnvContainerId, taskClassVersion, samzaVersion, hostname,
diagnosticSystemStream, systemProducer,
+ terminationDuration, Executors.newSingleThreadScheduledExecutor(
+ new
ThreadFactoryBuilder().setNameFormat(PUBLISH_THREAD_NAME).setDaemon(true).build()));
+ }
+
+ @VisibleForTesting
+ DiagnosticsManager(String jobName, String jobId, Map<String, ContainerModel>
containerModels,
+ int containerMemoryMb, int containerNumCores, int
numStoresWithChangelog, String containerId,
+ String executionEnvContainerId, String taskClassVersion, String
samzaVersion, String hostname,
+ SystemStream diagnosticSystemStream, SystemProducer systemProducer,
Duration terminationDuration,
+ ScheduledExecutorService executorService) {
this.jobName = jobName;
this.jobId = jobId;
+ this.containerModels = containerModels;
+ this.containerMemoryMb = containerMemoryMb;
+ this.containerNumCores = containerNumCores;
+ this.numStoresWithChangelog = numStoresWithChangelog;
this.containerId = containerId;
this.executionEnvContainerId = executionEnvContainerId;
this.taskClassVersion = taskClassVersion;
this.samzaVersion = samzaVersion;
this.hostname = hostname;
- resetTime = Instant.now();
-
- this.systemProducer = systemProducer;
this.diagnosticSystemStream = diagnosticSystemStream;
+ this.systemProducer = systemProducer;
+ this.terminationDuration = terminationDuration;
+ this.processorStopEvents = new ConcurrentLinkedQueue<>();
this.exceptions = new BoundedList<>("exceptions"); // Create a BoundedList
with default size and time parameters
- this.scheduler = Executors.newSingleThreadScheduledExecutor(
- new
ThreadFactoryBuilder().setNameFormat(PUBLISH_THREAD_NAME).setDaemon(true).build());
- this.terminationDuration = terminationDuration;
+ this.scheduler = executorService;
+
+ resetTime = Instant.now();
try {
@@ -141,36 +164,59 @@ public class DiagnosticsManager {
this.exceptions.add(diagnosticsExceptionEvent);
}
+ public void addProcessorStopEvent(String processorId, String resourceId,
String host, int exitStatus) {
+ this.processorStopEvents.add(new ProcessorStopEvent(processorId,
resourceId, host, exitStatus));
+ LOG.info("Added stop event for Container Id: {}, resource Id: {}, host:
{}, exitStatus: {}", processorId,
+ resourceId, host, exitStatus);
+ }
+
private class DiagnosticsStreamPublisher implements Runnable {
@Override
public void run() {
- // Publish exception events if there are any
- Collection<DiagnosticsExceptionEvent> exceptionList =
exceptions.getValues();
+ try {
+ DiagnosticsStreamMessage diagnosticsStreamMessage =
+ new DiagnosticsStreamMessage(jobName, jobId, "samza-container-" +
containerId, executionEnvContainerId,
+ taskClassVersion, samzaVersion, hostname,
System.currentTimeMillis(), resetTime.toEpochMilli());
+
+ // Add job-related params to the message (if not already published)
+ if (!jobParamsEmitted) {
+ diagnosticsStreamMessage.addContainerMb(containerMemoryMb);
+ diagnosticsStreamMessage.addContainerNumCores(containerNumCores);
+
diagnosticsStreamMessage.addNumStoresWithChangelog(numStoresWithChangelog);
+ diagnosticsStreamMessage.addContainerModels(containerModels);
+ }
- if (!exceptionList.isEmpty()) {
+ // Add stop event list to the message
+ diagnosticsStreamMessage.addProcessorStopEvents(new
ArrayList(processorStopEvents));
- // Create the metricHeader
- MetricsHeader metricsHeader = new MetricsHeader(jobName, jobId,
"samza-container-" + containerId, executionEnvContainerId,
- DiagnosticsUtil.class.getName(), taskClassVersion, samzaVersion,
hostname, System.currentTimeMillis(),
- resetTime.toEpochMilli());
+ // Add exception events to the message
+
diagnosticsStreamMessage.addDiagnosticsExceptionEvents(exceptions.getValues());
- Map<String, Map<String, Object>> metricsMessage = new HashMap<>();
- metricsMessage.putIfAbsent(METRICS_GROUP_NAME, new HashMap<>());
- metricsMessage.get(METRICS_GROUP_NAME).put(exceptions.getName(),
exceptionList);
- MetricsSnapshot metricsSnapshot = new MetricsSnapshot(metricsHeader,
new Metrics(metricsMessage));
+ if (!diagnosticsStreamMessage.isEmpty()) {
- try {
systemProducer.send(DiagnosticsManager.class.getName(),
- new OutgoingMessageEnvelope(diagnosticSystemStream,
metricsHeader.getHost(), null,
- new MetricsSnapshotSerdeV2().toBytes(metricsSnapshot)));
+ new OutgoingMessageEnvelope(diagnosticSystemStream, hostname,
null,
+ new
MetricsSnapshotSerdeV2().toBytes(diagnosticsStreamMessage.convertToMetricsSnapshot())));
+ systemProducer.flush(DiagnosticsManager.class.getName());
+
+ // Remove stop events from list after successful publish
+ if (diagnosticsStreamMessage.getProcessorStopEvents() != null) {
+
processorStopEvents.removeAll(diagnosticsStreamMessage.getProcessorStopEvents());
+ }
// Remove exceptions from list after successful publish to
diagnostics stream
- exceptions.remove(exceptionList);
- } catch (Exception e) {
- LOG.error("Exception when flushing exceptions", e);
+ if (diagnosticsStreamMessage.getExceptionEvents() != null) {
+ exceptions.remove(diagnosticsStreamMessage.getExceptionEvents());
+ }
+
+ // Emit jobParams once
+ jobParamsEmitted = true;
}
+ } catch (Exception e) {
+ LOG.error("Exception when flushing diagnosticsStreamMessage", e);
}
}
}
+
}
diff --git
a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsStreamMessage.java
b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsStreamMessage.java
new file mode 100644
index 0000000..6840912
--- /dev/null
+++
b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsStreamMessage.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.diagnostics;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.metrics.reporter.Metrics;
+import org.apache.samza.metrics.reporter.MetricsHeader;
+import org.apache.samza.metrics.reporter.MetricsSnapshot;
+import org.apache.samza.serializers.model.SamzaObjectMapper;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Defines the contents for any message emitted to the diagnostic stream by
the {@link DiagnosticsManager}.
+ * All contents are stored in a {@link MetricsHeader} and a metricsMessage map
which combine to get a {@link MetricsSnapshot},
+ * which can be serialized using serdes ({@link
org.apache.samza.serializers.MetricsSnapshotSerdeV2}).
+ * This class serializes {@link ContainerModel} using {@link
SamzaObjectMapper} before adding to the metrics message.
+ *
+ */
+public class DiagnosticsStreamMessage {
+ private static final Logger LOG =
LoggerFactory.getLogger(DiagnosticsStreamMessage.class);
+
+ public static final String GROUP_NAME_FOR_DIAGNOSTICS_MANAGER =
DiagnosticsManager.class.getName();
+ // Using DiagnosticsManager as the group name for processor-stop-events,
job-related params, and container model
+
+ private static final String SAMZACONTAINER_METRICS_GROUP_NAME =
"org.apache.samza.container.SamzaContainerMetrics";
+ // Using SamzaContainerMetrics as the group name for exceptions to maintain
compatibility with existing diagnostics
+ private static final String EXCEPTION_LIST_METRIC_NAME = "exceptions";
+
+ private static final String STOP_EVENT_LIST_METRIC_NAME = "stopEvents";
+ private static final String CONTAINER_MB_METRIC_NAME = "containerMemoryMb";
+ private static final String CONTAINER_NUM_CORES_METRIC_NAME =
"containerNumCores";
+ public static final String CONTAINER_NUM_STORES_WITH_CHANGELOG_METRIC_NAME =
"numStoresWithChangelog";
+ private static final String CONTAINER_MODELS_METRIC_NAME = "containerModels";
+
+ private final MetricsHeader metricsHeader;
+ private final Map<String, Map<String, Object>> metricsMessage;
+
+ public DiagnosticsStreamMessage(String jobName, String jobId, String
containerName, String executionEnvContainerId,
+ String taskClassVersion, String samzaVersion, String hostname, long
timestamp, long resetTimestamp) {
+
+ // Create the metricHeader
+ metricsHeader =
+ new MetricsHeader(jobName, jobId, containerName,
executionEnvContainerId, DiagnosticsManager.class.getName(),
+ taskClassVersion, samzaVersion, hostname, timestamp,
resetTimestamp);
+
+ this.metricsMessage = new HashMap<>();
+ }
+
+ /**
+ * Add the container memory mb parameter to the message.
+ * @param containerMemoryMb the memory mb parameter value.
+ */
+ public void addContainerMb(Integer containerMemoryMb) {
+ addToMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER,
CONTAINER_MB_METRIC_NAME, containerMemoryMb);
+ }
+
+ /**
+ * Add the container num cores parameter to the message.
+ * @param containerNumCores the num core parameter value.
+ */
+ public void addContainerNumCores(Integer containerNumCores) {
+ addToMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER,
CONTAINER_NUM_CORES_METRIC_NAME, containerNumCores);
+ }
+
+ /**
+ * Add the num stores with changelog parameter to the message.
+ * @param numStoresWithChangelog the parameter value.
+ */
+ public void addNumStoresWithChangelog(Integer numStoresWithChangelog) {
+ addToMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER,
CONTAINER_NUM_STORES_WITH_CHANGELOG_METRIC_NAME,
+ numStoresWithChangelog);
+ }
+
+ /**
+ * Add a map of container models (indexed by containerID) to the message.
+ * @param containerModelMap the container models map
+ */
+ public void addContainerModels(Map<String, ContainerModel>
containerModelMap) {
+ if (containerModelMap != null && !containerModelMap.isEmpty()) {
+ addToMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER,
CONTAINER_MODELS_METRIC_NAME,
+ serializeContainerModelMap(containerModelMap));
+ }
+ }
+
+ /**
+ * Add a list of {@link DiagnosticsExceptionEvent}s to the message.
+ * @param exceptionList the list to add.
+ */
+ public void
addDiagnosticsExceptionEvents(Collection<DiagnosticsExceptionEvent>
exceptionList) {
+ if (exceptionList != null && !exceptionList.isEmpty()) {
+ addToMetricsMessage(SAMZACONTAINER_METRICS_GROUP_NAME,
EXCEPTION_LIST_METRIC_NAME, exceptionList);
+ }
+ }
+
+ /**
+ * Add a list of {@link org.apache.samza.diagnostics.ProcessorStopEvent}s to
the message.
+ * @param stopEventList the list to add.
+ */
+ public void addProcessorStopEvents(List<ProcessorStopEvent> stopEventList) {
+ if (stopEventList != null && !stopEventList.isEmpty()) {
+ addToMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER,
STOP_EVENT_LIST_METRIC_NAME, stopEventList);
+ }
+ }
+
+ /**
+ * Convert this message into a {@link MetricsSnapshot}, useful for
serde-deserde using {@link org.apache.samza.serializers.MetricsSnapshotSerde}.
+ * @return a {@link MetricsSnapshot} built from this message's header and metrics map.
+ */
+ public MetricsSnapshot convertToMetricsSnapshot() {
+ MetricsSnapshot metricsSnapshot = new MetricsSnapshot(metricsHeader, new
Metrics(metricsMessage));
+ return metricsSnapshot;
+ }
+
+ /**
+ * Check if the message has no contents.
+ * @return True if the message is empty, false otherwise.
+ */
+ public boolean isEmpty() {
+ return metricsMessage.isEmpty();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ DiagnosticsStreamMessage that = (DiagnosticsStreamMessage) o;
+ return metricsHeader.getAsMap().equals(that.metricsHeader.getAsMap()) &&
metricsMessage.equals(that.metricsMessage);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(metricsHeader, metricsMessage);
+ }
+
+ public Collection<ProcessorStopEvent> getProcessorStopEvents() {
+ return (Collection<ProcessorStopEvent>)
getFromMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER,
+ STOP_EVENT_LIST_METRIC_NAME);
+ }
+
+ public Collection<DiagnosticsExceptionEvent> getExceptionEvents() {
+ return (Collection<DiagnosticsExceptionEvent>)
getFromMetricsMessage(SAMZACONTAINER_METRICS_GROUP_NAME,
+ EXCEPTION_LIST_METRIC_NAME);
+ }
+
+ public Integer getContainerMb() {
+ return (Integer) getFromMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER,
CONTAINER_MB_METRIC_NAME);
+ }
+
+ public Integer getContainerNumCores() {
+ return (Integer) getFromMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER,
CONTAINER_NUM_CORES_METRIC_NAME);
+ }
+
+ public Integer getNumStoresWithChangelog() {
+ return (Integer) getFromMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER,
+ CONTAINER_NUM_STORES_WITH_CHANGELOG_METRIC_NAME);
+ }
+
+ public Map<String, ContainerModel> getContainerModels() {
+ return deserializeContainerModelMap((String)
getFromMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER,
CONTAINER_MODELS_METRIC_NAME));
+ }
+
+ // Helper method to get a {@link DiagnosticsStreamMessage} from a {@link
MetricsSnapshot}.
+ // This is typically used when deserializing messages from a
diagnostics-stream.
+ // @param metricsSnapshot the snapshot to convert
+ public static DiagnosticsStreamMessage
convertToDiagnosticsStreamMessage(MetricsSnapshot metricsSnapshot) {
+ DiagnosticsStreamMessage diagnosticsStreamMessage =
+ new DiagnosticsStreamMessage(metricsSnapshot.getHeader().getJobName(),
metricsSnapshot.getHeader().getJobId(),
+ metricsSnapshot.getHeader().getContainerName(),
metricsSnapshot.getHeader().getExecEnvironmentContainerId(),
+ metricsSnapshot.getHeader().getVersion(),
metricsSnapshot.getHeader().getSamzaVersion(),
+ metricsSnapshot.getHeader().getHost(),
metricsSnapshot.getHeader().getTime(),
+ metricsSnapshot.getHeader().getResetTime());
+
+ Map<String, Map<String, Object>> metricsMap =
metricsSnapshot.getMetrics().getAsMap();
+ Map<String, Object> diagnosticsManagerGroupMap =
metricsMap.get(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER);
+ Map<String, Object> containerMetricsGroupMap =
metricsMap.get(SAMZACONTAINER_METRICS_GROUP_NAME);
+
+ if (diagnosticsManagerGroupMap != null) {
+
+ diagnosticsStreamMessage.addContainerNumCores((Integer)
diagnosticsManagerGroupMap.get(CONTAINER_NUM_CORES_METRIC_NAME));
+ diagnosticsStreamMessage.addContainerMb((Integer)
diagnosticsManagerGroupMap.get(CONTAINER_MB_METRIC_NAME));
+ diagnosticsStreamMessage.addNumStoresWithChangelog((Integer)
diagnosticsManagerGroupMap.get(CONTAINER_NUM_STORES_WITH_CHANGELOG_METRIC_NAME));
+
diagnosticsStreamMessage.addContainerModels(deserializeContainerModelMap((String)
diagnosticsManagerGroupMap.get(CONTAINER_MODELS_METRIC_NAME)));
+
+
diagnosticsStreamMessage.addProcessorStopEvents((List<ProcessorStopEvent>)
diagnosticsManagerGroupMap.get(STOP_EVENT_LIST_METRIC_NAME));
+ }
+
+ if (containerMetricsGroupMap != null &&
containerMetricsGroupMap.containsKey(EXCEPTION_LIST_METRIC_NAME)) {
+ diagnosticsStreamMessage.addDiagnosticsExceptionEvents(
+ (Collection<DiagnosticsExceptionEvent>)
containerMetricsGroupMap.get(EXCEPTION_LIST_METRIC_NAME));
+ }
+
+ return diagnosticsStreamMessage;
+ }
+
+ /**
+ * Helper method to use {@link SamzaObjectMapper} to serialize {@link
ContainerModel}s.
+ * We use SamzaObjectMapper for ContainerModels, rather than using
ObjectMapper (in MetricsSnapshotSerdeV2)
+ * because MetricsSnapshotSerdeV2 enables default typing, which writes type
information for all containerModel (and
+ * underlying) classes, deserializing which requires a large number of
jackson related changes to those classes
+ * (annotations and/or mixins). We cannot disable default typing to avoid
backward incompatibility. This is why
+ * we serde-deserde ContainerModel explicitly using SamzaObjectMapper (which
is also used for reads-writes to coordinator
+ * stream).
+ * {@link SamzaObjectMapper} provides several conventions and optimizations
for serializing containerModels.
+ * @param containerModelMap map of container models to serialize.
+ * @return the serialized string, or null if serialization fails (the error is logged).
+ */
+ private static String serializeContainerModelMap(Map<String, ContainerModel>
containerModelMap) {
+ ObjectMapper samzaObjectMapper = SamzaObjectMapper.getObjectMapper();
+ try {
+ return samzaObjectMapper.writeValueAsString(containerModelMap);
+ } catch (IOException e) {
+ LOG.error("Exception in serializing container model ", e);
+ }
+
+ return null;
+ }
+
+ /**
+ * Helper method to use {@link SamzaObjectMapper} to deserialize {@link
ContainerModel}s.
+ * {@link SamzaObjectMapper} provides several conventions and optimizations
for deserializing containerModels.
+ * @return the deserialized map, or null if the input is null or deserialization fails (the error is logged).
+ */
+ private static Map<String, ContainerModel> deserializeContainerModelMap(
+ String serializedContainerModel) {
+ Map<String, ContainerModel> containerModelMap = null;
+ ObjectMapper samzaObjectMapper = SamzaObjectMapper.getObjectMapper();
+
+ try {
+ if (serializedContainerModel != null) {
+ containerModelMap =
samzaObjectMapper.readValue(serializedContainerModel, new
TypeReference<Map<String, ContainerModel>>() {
+ });
+ }
+ } catch (IOException e) {
+ LOG.error("Exception in deserializing container model ", e);
+ }
+
+ return containerModelMap;
+ }
+
+ private void addToMetricsMessage(String groupName, String metricName, Object
value) {
+ if (value != null) {
+ metricsMessage.putIfAbsent(groupName, new HashMap<>());
+ metricsMessage.get(groupName).put(metricName, value);
+ }
+ }
+
+ private Object getFromMetricsMessage(String groupName, String metricName) {
+ if (metricsMessage.containsKey(groupName) && metricsMessage.get(groupName)
!= null) {
+ return metricsMessage.get(groupName).get(metricName);
+ } else {
+ return null;
+ }
+ }
+}
diff --git
a/samza-core/src/main/scala/org/apache/samza/diagnostics/ProcessorStopEvent.java
b/samza-core/src/main/scala/org/apache/samza/diagnostics/ProcessorStopEvent.java
new file mode 100644
index 0000000..786fb78
--- /dev/null
+++
b/samza-core/src/main/scala/org/apache/samza/diagnostics/ProcessorStopEvent.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.diagnostics;
+
+import java.util.Objects;
+
+
+/**
+ * Encapsulates information (emitted to the diagnostics stream) about the stopping of a
+ * processor. The information emitted comprises the processorId, resourceId, host and
+ * exit status.
+ *
+ * <p>Two events are equal when all four fields are equal.
+ */
+public class ProcessorStopEvent {
+  /** Id of the processor that stopped. */
+  public final String processorId;
+  /** Id of the resource the processor was associated with. */
+  public final String resourceId;
+  /** Host associated with the stopped processor. */
+  public final String host;
+  /** Exit status reported for the processor. */
+  public final int exitStatus;
+
+  // No-arg constructor, required for deserialization with jackson; it only fills in
+  // placeholder values.
+  private ProcessorStopEvent() {
+    this("", "", "", -1);
+  }
+
+  /**
+   * Creates a stop event.
+   *
+   * @param processorId id of the processor that stopped.
+   * @param resourceId id of the resource the processor was associated with.
+   * @param host host associated with the processor.
+   * @param exitStatus exit status reported for the processor.
+   */
+  public ProcessorStopEvent(String processorId, String resourceId, String host,
+      int exitStatus) {
+    this.processorId = processorId;
+    this.resourceId = resourceId;
+    this.host = host;
+    this.exitStatus = exitStatus;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    ProcessorStopEvent that = (ProcessorStopEvent) o;
+    return exitStatus == that.exitStatus
+        && Objects.equals(processorId, that.processorId)
+        && Objects.equals(resourceId, that.resourceId)
+        && Objects.equals(host, that.host);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(processorId, resourceId, host, exitStatus);
+  }
+
+  // Readable form so that logs and failed assertions show the field values rather than
+  // an identity hash.
+  @Override
+  public String toString() {
+    return "ProcessorStopEvent{processorId='" + processorId + "', resourceId='" + resourceId
+        + "', host='" + host + "', exitStatus=" + exitStatus + '}';
+  }
+}
diff --git
a/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsManager.java
b/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsManager.java
new file mode 100644
index 0000000..c69b278
--- /dev/null
+++
b/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsManager.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.diagnostics;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.metrics.reporter.MetricsSnapshot;
+import org.apache.samza.serializers.MetricsSnapshotSerdeV2;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemProducer;
+import org.apache.samza.system.SystemStream;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+
+/**
+ * Unit tests for {@link DiagnosticsManager}. The scheduled executor is mocked so that the
+ * task handed to it runs synchronously, and a {@link MockSystemProducer} records every
+ * envelope sent to the diagnostics stream.
+ */
+public class TestDiagnosticsManager {
+  private final SystemStream diagnosticsSystemStream =
+      new SystemStream("kafka", "test stream");
+
+  private final String jobName = "Testjob";
+  private final String jobId = "test job id";
+  private final String executionEnvContainerId = "exec container id";
+  private final String taskClassVersion = "0.0.1";
+  private final String samzaVersion = "1.3.0";
+  private final String hostname = "sample host name";
+  private final int containerMb = 1024;
+  private final int numStoresWithChangelog = 2;
+  private final int containerNumCores = 2;
+  private final Map<String, ContainerModel> containerModels =
+      TestDiagnosticsStreamMessage.getSampleContainerModels();
+  private final Collection<DiagnosticsExceptionEvent> exceptionEventList =
+      TestDiagnosticsStreamMessage.getExceptionList();
+
+  private DiagnosticsManager diagnosticsManager;
+  private MockSystemProducer mockSystemProducer;
+
+  @Before
+  public void setup() {
+
+    // Mocked system producer for publishing to diagnostics stream
+    mockSystemProducer = new MockSystemProducer();
+
+    // Mocked scheduled executor service which does a synchronous run() on scheduling
+    ScheduledExecutorService mockExecutorService =
+        Mockito.mock(ScheduledExecutorService.class);
+    Mockito.when(mockExecutorService.scheduleWithFixedDelay(Mockito.any(),
+        Mockito.anyLong(), Mockito.anyLong(),
+        Mockito.eq(TimeUnit.SECONDS))).thenAnswer(invocation -> {
+          ((Runnable) invocation.getArguments()[0]).run();
+          return Mockito.mock(ScheduledFuture.class);
+        });
+
+    this.diagnosticsManager =
+        new DiagnosticsManager(jobName, jobId, containerModels, containerMb,
+            containerNumCores, numStoresWithChangelog, "0", executionEnvContainerId,
+            taskClassVersion, samzaVersion, hostname, diagnosticsSystemStream,
+            mockSystemProducer, Duration.ofSeconds(1), mockExecutorService);
+
+    // Seed the manager with the data every test expects in the first published message
+    exceptionEventList.forEach(
+        diagnosticsExceptionEvent ->
+            this.diagnosticsManager.addExceptionEvent(diagnosticsExceptionEvent));
+
+    this.diagnosticsManager.addProcessorStopEvent("0", executionEnvContainerId,
+        hostname, 101);
+  }
+
+  @Test
+  public void testDiagnosticsStreamFirstMessagePublish() {
+    // invoking start will do a synchronous publish to the stream because of our mocked
+    // scheduled exec service
+    this.diagnosticsManager.start();
+    Assert.assertEquals("One message should have been published", 1,
+        mockSystemProducer.getEnvelopeList().size());
+    OutgoingMessageEnvelope outgoingMessageEnvelope =
+        mockSystemProducer.getEnvelopeList().get(0);
+    validateOutgoingMessageEnvelope(outgoingMessageEnvelope);
+  }
+
+  @Test
+  public void testNoDualPublish() {
+    // Across two successive run() invocations only a single message should be published
+    this.diagnosticsManager.start();
+    this.diagnosticsManager.start();
+
+    Assert.assertEquals("One message should have been published", 1,
+        mockSystemProducer.getEnvelopeList().size());
+    OutgoingMessageEnvelope outgoingMessageEnvelope =
+        mockSystemProducer.getEnvelopeList().get(0);
+    validateMetricsHeader(outgoingMessageEnvelope);
+    validateOutgoingMessageEnvelope(outgoingMessageEnvelope);
+  }
+
+  @Test
+  public void testSecondPublishWithProcessorStopInSecondMessage() {
+    // Across two successive run() invocations two messages should be published if stop
+    // events are added
+    this.diagnosticsManager.start();
+    this.diagnosticsManager.addProcessorStopEvent("0", executionEnvContainerId,
+        hostname, 102);
+    this.diagnosticsManager.start();
+
+    Assert.assertEquals("Two messages should have been published", 2,
+        mockSystemProducer.getEnvelopeList().size());
+
+    // Validate the first message
+    OutgoingMessageEnvelope outgoingMessageEnvelope =
+        mockSystemProducer.getEnvelopeList().get(0);
+    validateMetricsHeader(outgoingMessageEnvelope);
+    validateOutgoingMessageEnvelope(outgoingMessageEnvelope);
+
+    // Validate the second message's header
+    outgoingMessageEnvelope = mockSystemProducer.getEnvelopeList().get(1);
+    validateMetricsHeader(outgoingMessageEnvelope);
+
+    // Validate the second message's body (should be all empty except for the
+    // processor-stop-event)
+    DiagnosticsStreamMessage diagnosticsStreamMessage =
+        toDiagnosticsStreamMessage(outgoingMessageEnvelope);
+
+    Assert.assertNull(diagnosticsStreamMessage.getContainerMb());
+    Assert.assertNull(diagnosticsStreamMessage.getExceptionEvents());
+    Assert.assertEquals(
+        Arrays.asList(new ProcessorStopEvent("0", executionEnvContainerId, hostname, 102)),
+        diagnosticsStreamMessage.getProcessorStopEvents());
+    Assert.assertNull(diagnosticsStreamMessage.getContainerModels());
+    Assert.assertNull(diagnosticsStreamMessage.getContainerNumCores());
+    Assert.assertNull(diagnosticsStreamMessage.getNumStoresWithChangelog());
+  }
+
+  @Test
+  public void testSecondPublishWithExceptionInSecondMessage() {
+    // Across two successive run() invocations two messages should be published if
+    // exception events are added
+    this.diagnosticsManager.start();
+    DiagnosticsExceptionEvent diagnosticsExceptionEvent =
+        new DiagnosticsExceptionEvent(System.currentTimeMillis(),
+            new RuntimeException("exception"), new HashMap<>());
+    this.diagnosticsManager.addExceptionEvent(diagnosticsExceptionEvent);
+    this.diagnosticsManager.start();
+
+    Assert.assertEquals("Two messages should have been published", 2,
+        mockSystemProducer.getEnvelopeList().size());
+
+    // Validate the first message
+    OutgoingMessageEnvelope outgoingMessageEnvelope =
+        mockSystemProducer.getEnvelopeList().get(0);
+    validateMetricsHeader(outgoingMessageEnvelope);
+    validateOutgoingMessageEnvelope(outgoingMessageEnvelope);
+
+    // Validate the second message's header
+    outgoingMessageEnvelope = mockSystemProducer.getEnvelopeList().get(1);
+    validateMetricsHeader(outgoingMessageEnvelope);
+
+    // Validate the second message's body (should be all empty except for the
+    // exception event)
+    DiagnosticsStreamMessage diagnosticsStreamMessage =
+        toDiagnosticsStreamMessage(outgoingMessageEnvelope);
+
+    Assert.assertNull(diagnosticsStreamMessage.getContainerMb());
+    Assert.assertEquals(Arrays.asList(diagnosticsExceptionEvent),
+        diagnosticsStreamMessage.getExceptionEvents());
+    Assert.assertNull(diagnosticsStreamMessage.getProcessorStopEvents());
+    Assert.assertNull(diagnosticsStreamMessage.getContainerModels());
+    Assert.assertNull(diagnosticsStreamMessage.getContainerNumCores());
+    Assert.assertNull(diagnosticsStreamMessage.getNumStoresWithChangelog());
+  }
+
+  @After
+  public void teardown() throws Exception {
+    this.diagnosticsManager.stop();
+  }
+
+  // Deserializes the envelope's payload into the metrics snapshot it carries.
+  private static MetricsSnapshot toMetricsSnapshot(OutgoingMessageEnvelope envelope) {
+    return new MetricsSnapshotSerdeV2().fromBytes((byte[]) envelope.getMessage());
+  }
+
+  // Converts the envelope's payload into a diagnostics stream message.
+  private static DiagnosticsStreamMessage toDiagnosticsStreamMessage(
+      OutgoingMessageEnvelope envelope) {
+    return DiagnosticsStreamMessage.convertToDiagnosticsStreamMessage(
+        toMetricsSnapshot(envelope));
+  }
+
+  // Checks the envelope's destination stream and every header field of its snapshot.
+  private void validateMetricsHeader(OutgoingMessageEnvelope outgoingMessageEnvelope) {
+    // Validate the outgoing message
+    Assert.assertEquals(diagnosticsSystemStream,
+        outgoingMessageEnvelope.getSystemStream());
+    MetricsSnapshot metricsSnapshot = toMetricsSnapshot(outgoingMessageEnvelope);
+
+    // Validate all header fields
+    Assert.assertEquals(jobName, metricsSnapshot.getHeader().getJobName());
+    Assert.assertEquals(jobId, metricsSnapshot.getHeader().getJobId());
+    Assert.assertEquals(executionEnvContainerId,
+        metricsSnapshot.getHeader().getExecEnvironmentContainerId());
+    Assert.assertEquals(taskClassVersion, metricsSnapshot.getHeader().getVersion());
+    Assert.assertEquals(samzaVersion, metricsSnapshot.getHeader().getSamzaVersion());
+    Assert.assertEquals(hostname, metricsSnapshot.getHeader().getHost());
+    Assert.assertEquals(DiagnosticsManager.class.getName(),
+        metricsSnapshot.getHeader().getSource());
+  }
+
+  // Checks that the envelope carries everything that setup() seeded into the manager.
+  private void validateOutgoingMessageEnvelope(
+      OutgoingMessageEnvelope outgoingMessageEnvelope) {
+    // Validate the diagnostics stream message
+    DiagnosticsStreamMessage diagnosticsStreamMessage =
+        toDiagnosticsStreamMessage(outgoingMessageEnvelope);
+
+    Assert.assertEquals(containerMb,
+        diagnosticsStreamMessage.getContainerMb().intValue());
+    Assert.assertEquals(exceptionEventList,
+        diagnosticsStreamMessage.getExceptionEvents());
+    Assert.assertEquals(
+        Arrays.asList(new ProcessorStopEvent("0", executionEnvContainerId, hostname, 101)),
+        diagnosticsStreamMessage.getProcessorStopEvents());
+    Assert.assertEquals(containerModels,
+        diagnosticsStreamMessage.getContainerModels());
+    Assert.assertEquals(containerNumCores,
+        diagnosticsStreamMessage.getContainerNumCores().intValue());
+    Assert.assertEquals(numStoresWithChangelog,
+        diagnosticsStreamMessage.getNumStoresWithChangelog().intValue());
+  }
+
+  /**
+   * {@link SystemProducer} that only records the envelopes passed to
+   * {@link #send(String, OutgoingMessageEnvelope)}; all other operations are no-ops.
+   */
+  private static class MockSystemProducer implements SystemProducer {
+
+    private final List<OutgoingMessageEnvelope> envelopeList = new ArrayList<>();
+
+    @Override
+    public void start() {
+
+    }
+
+    @Override
+    public void stop() {
+
+    }
+
+    @Override
+    public void register(String source) {
+
+    }
+
+    @Override
+    public void send(String source, OutgoingMessageEnvelope envelope) {
+      envelopeList.add(envelope);
+    }
+
+    @Override
+    public void flush(String source) {
+
+    }
+
+    public List<OutgoingMessageEnvelope> getEnvelopeList() {
+      return this.envelopeList;
+    }
+  }
+}
diff --git
a/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsStreamMessage.java
b/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsStreamMessage.java
new file mode 100644
index 0000000..81bc577
--- /dev/null
+++
b/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsStreamMessage.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.diagnostics;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.Partition;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.metrics.reporter.MetricsSnapshot;
+import org.apache.samza.system.SystemStreamPartition;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+/**
+ * Unit tests for {@link DiagnosticsStreamMessage}. Also supplies the sample exception
+ * events and container models that {@link TestDiagnosticsManager} uses as fixtures.
+ */
+public class TestDiagnosticsStreamMessage {
+
+  private final String jobName = "Testjob";
+  private final String jobId = "test job id";
+  private final String containerName = "sample container name";
+  private final String executionEnvContainerId = "exec container id";
+  private final String taskClassVersion = "0.0.1";
+  private final String samzaVersion = "1.3.0";
+  private final String hostname = "sample host name";
+  private final long timestamp = System.currentTimeMillis();
+  private final long resetTimestamp = System.currentTimeMillis();
+
+  // Builds a message carrying the sample header, resource figures and stop events.
+  private DiagnosticsStreamMessage getDiagnosticsStreamMessage() {
+    DiagnosticsStreamMessage diagnosticsStreamMessage =
+        new DiagnosticsStreamMessage(jobName, jobId, containerName,
+            executionEnvContainerId, taskClassVersion, samzaVersion, hostname,
+            timestamp, resetTimestamp);
+
+    diagnosticsStreamMessage.addContainerMb(1024);
+    diagnosticsStreamMessage.addContainerNumCores(2);
+    diagnosticsStreamMessage.addNumStoresWithChangelog(3);
+
+    diagnosticsStreamMessage.addProcessorStopEvents(getProcessorStopEventList());
+    return diagnosticsStreamMessage;
+  }
+
+  /**
+   * @return a collection holding a single sample exception event (an exception with a
+   *         nested cause), obtained from a {@link BoundedList}.
+   */
+  public static Collection<DiagnosticsExceptionEvent> getExceptionList() {
+    BoundedList<DiagnosticsExceptionEvent> boundedList = new BoundedList<>("exceptions");
+    DiagnosticsExceptionEvent diagnosticsExceptionEvent =
+        new DiagnosticsExceptionEvent(1,
+            new Exception("this is a samza exception", new Exception("cause")),
+            new HashMap<>());
+
+    boundedList.add(diagnosticsExceptionEvent);
+    return boundedList.getValues();
+  }
+
+  /**
+   * @return a list holding a single sample stop event for processor "0" with exit
+   *         status 101.
+   */
+  public List<ProcessorStopEvent> getProcessorStopEventList() {
+    List<ProcessorStopEvent> stopEventList = new ArrayList<>();
+    stopEventList.add(new ProcessorStopEvent("0", executionEnvContainerId, hostname, 101));
+    return stopEventList;
+  }
+
+  /**
+   * @return a map with one container model ("0") that holds two tasks, each consuming one
+   *         partition of the "test-stream" stream on the "kafka" system.
+   */
+  public static Map<String, ContainerModel> getSampleContainerModels() {
+    Map<String, ContainerModel> containerModels = new HashMap<>();
+    Map<TaskName, TaskModel> tasks = new HashMap<>();
+
+    Set<SystemStreamPartition> sspsForTask1 = new HashSet<>();
+    sspsForTask1.add(new SystemStreamPartition("kafka", "test-stream", new Partition(0)));
+    tasks.put(new TaskName("Partition 0"),
+        new TaskModel(new TaskName("Partition 0"), sspsForTask1, new Partition(0)));
+
+    Set<SystemStreamPartition> sspsForTask2 = new HashSet<>();
+    sspsForTask2.add(new SystemStreamPartition("kafka", "test-stream", new Partition(1)));
+    tasks.put(new TaskName("Partition 1"),
+        new TaskModel(new TaskName("Partition 1"), sspsForTask2, new Partition(1)));
+
+    containerModels.put("0", new ContainerModel("0", tasks));
+    return containerModels;
+  }
+
+  /**
+   * Tests basic operations on {@link DiagnosticsStreamMessage}.
+   */
+  @Test
+  public void basicTest() {
+
+    DiagnosticsStreamMessage diagnosticsStreamMessage = getDiagnosticsStreamMessage();
+    Collection<DiagnosticsExceptionEvent> exceptionEventList = getExceptionList();
+    diagnosticsStreamMessage.addDiagnosticsExceptionEvents(exceptionEventList);
+    diagnosticsStreamMessage.addProcessorStopEvents(getProcessorStopEventList());
+    diagnosticsStreamMessage.addContainerModels(getSampleContainerModels());
+
+    Assert.assertEquals(1024, (int) diagnosticsStreamMessage.getContainerMb());
+    Assert.assertEquals(2, (int) diagnosticsStreamMessage.getContainerNumCores());
+    Assert.assertEquals(3, (int) diagnosticsStreamMessage.getNumStoresWithChangelog());
+    Assert.assertEquals(exceptionEventList,
+        diagnosticsStreamMessage.getExceptionEvents());
+    Assert.assertEquals(getSampleContainerModels(),
+        diagnosticsStreamMessage.getContainerModels());
+    Assert.assertEquals(getProcessorStopEventList(),
+        diagnosticsStreamMessage.getProcessorStopEvents());
+  }
+
+  /**
+   * Tests serialization and deserialization of a {@link DiagnosticsStreamMessage}
+   */
+  @Test
+  public void serdeTest() {
+    DiagnosticsStreamMessage diagnosticsStreamMessage = getDiagnosticsStreamMessage();
+    Collection<DiagnosticsExceptionEvent> exceptionEventList = getExceptionList();
+    diagnosticsStreamMessage.addDiagnosticsExceptionEvents(exceptionEventList);
+    diagnosticsStreamMessage.addProcessorStopEvents(getProcessorStopEventList());
+    diagnosticsStreamMessage.addContainerModels(getSampleContainerModels());
+
+    // The header of the converted snapshot must carry the values given at construction
+    MetricsSnapshot metricsSnapshot = diagnosticsStreamMessage.convertToMetricsSnapshot();
+    Assert.assertEquals(jobName, metricsSnapshot.getHeader().getJobName());
+    Assert.assertEquals(jobId, metricsSnapshot.getHeader().getJobId());
+    Assert.assertEquals(executionEnvContainerId,
+        metricsSnapshot.getHeader().getExecEnvironmentContainerId());
+    Assert.assertEquals(taskClassVersion, metricsSnapshot.getHeader().getVersion());
+    Assert.assertEquals(samzaVersion, metricsSnapshot.getHeader().getSamzaVersion());
+    Assert.assertEquals(hostname, metricsSnapshot.getHeader().getHost());
+    Assert.assertEquals(DiagnosticsManager.class.getName(),
+        metricsSnapshot.getHeader().getSource());
+
+    // Every piece of data added above must appear under its metric group
+    Map<String, Map<String, Object>> metricsMap = metricsSnapshot.getMetrics().getAsMap();
+    Assert.assertTrue(metricsMap.get("org.apache.samza.container.SamzaContainerMetrics")
+        .containsKey("exceptions"));
+    Map<String, Object> diagnosticsGroup =
+        metricsMap.get(DiagnosticsManager.class.getName());
+    Assert.assertTrue(diagnosticsGroup.containsKey("containerModels"));
+    Assert.assertTrue(diagnosticsGroup.containsKey("numStoresWithChangelog"));
+    Assert.assertTrue(diagnosticsGroup.containsKey("containerNumCores"));
+    Assert.assertTrue(diagnosticsGroup.containsKey("containerMemoryMb"));
+    Assert.assertTrue(diagnosticsGroup.containsKey("stopEvents"));
+
+    // Converting the snapshot back must reproduce the original message
+    DiagnosticsStreamMessage convertedDiagnosticsStreamMessage =
+        DiagnosticsStreamMessage.convertToDiagnosticsStreamMessage(metricsSnapshot);
+
+    Assert.assertEquals(diagnosticsStreamMessage, convertedDiagnosticsStreamMessage);
+  }
+}