rmatharu commented on a change in pull request #1083: SAMZA-2251: Minor 
diagnostics manager change to emit additional job details
URL: https://github.com/apache/samza/pull/1083#discussion_r296427257
 
 

 ##########
 File path: 
samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsStreamMessage.java
 ##########
 @@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.diagnostics;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.metrics.reporter.Metrics;
+import org.apache.samza.metrics.reporter.MetricsHeader;
+import org.apache.samza.metrics.reporter.MetricsSnapshot;
+import org.apache.samza.serializers.model.SamzaObjectMapper;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Defines the contents for any message emitted to the diagnostic stream by 
the {@link DiagnosticsManager}.
+ * All contents are stored in a {@link MetricsHeader} and a metricsMessage map 
which combine to get a {@link MetricsSnapshot},
+ * which can be serialized using serdes ({@link 
org.apache.samza.serializers.MetricsSnapshotSerdeV2}).
+ * This class serializes {@link ContainerModel} using {@link 
SamzaObjectMapper} before adding to the metrics message.
+ *
+ */
+public class DiagnosticsStreamMessage {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DiagnosticsStreamMessage.class);
+
+  public static final String GROUP_NAME_FOR_DIAGNOSTICS_MANAGER = 
DiagnosticsManager.class.getName();
+  // Using DiagnosticsManager as the group name for processor-stop-events, 
job-related params, and container model
+
+  private static final String SAMZACONTAINER_METRICS_GROUP_NAME = 
"org.apache.samza.container.SamzaContainerMetrics";
+  // Using SamzaContainerMetrics as the group name for exceptions to maintain 
compatibility with existing diagnostics
+  private static final String EXCEPTION_LIST_METRIC_NAME = "exceptions";
+
+  private static final String STOP_EVENT_LIST_METRIC_NAME = "stopEvents";
+  private static final String CONTAINER_MB_METRIC_NAME = "containerMemoryMb";
+  private static final String CONTAINER_NUM_CORES_METRIC_NAME = 
"containerNumCores";
+  public static final String CONTAINER_NUM_STORES_WITH_CHANGELOG_METRIC_NAME = 
"numStoresWithChangelog";
+  private static final String CONTAINER_MODELS_METRIC_NAME = "containerModels";
+
+  private final MetricsHeader metricsHeader;
+  private final Map<String, Map<String, Object>> metricsMessage;
+
+  public DiagnosticsStreamMessage(String jobName, String jobId, String 
containerName, String executionEnvContainerId,
+      String taskClassVersion, String samzaVersion, String hostname, long 
timestamp, long resetTimestamp) {
+
+    // Create the metricHeader
+    metricsHeader =
+        new MetricsHeader(jobName, jobId, containerName, 
executionEnvContainerId, DiagnosticsManager.class.getName(),
+            taskClassVersion, samzaVersion, hostname, timestamp, 
resetTimestamp);
+
+    this.metricsMessage = new HashMap<>();
+  }
+
+  /**
+   * Add the container memory mb parameter to the message.
+   * @param containerMemoryMb the memory mb parameter value.
+   */
+  public void addContainerMb(int containerMemoryMb) {
+    addToMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER, 
CONTAINER_MB_METRIC_NAME, containerMemoryMb);
+  }
+
+  /**
+   * Add the container num cores parameter to the message.
+   * @param containerNumCores the num core parameter value.
+   */
+  public void addContainerNumCores(int containerNumCores) {
+    addToMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER, 
CONTAINER_NUM_CORES_METRIC_NAME, containerNumCores);
+  }
+
+  /**
+   * Add the num stores with changelog parameter to the message.
+   * @param numStoresWithChangelog the parameter value.
+   */
+  public void addNumStoresWithChangelog(int numStoresWithChangelog) {
+    addToMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER, 
CONTAINER_NUM_STORES_WITH_CHANGELOG_METRIC_NAME,
+        numStoresWithChangelog);
+  }
+
+  /**
+   * Add a map of container models (indexed by containerID) to the message.
+   * @param containerModelMap the container models map
+   */
+  public void addContainerModels(Map<String, ContainerModel> 
containerModelMap) {
+    if (containerModelMap != null && !containerModelMap.isEmpty()) {
+      addToMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER, 
CONTAINER_MODELS_METRIC_NAME,
+          serializeContainerModelMap(containerModelMap));
+    }
+  }
+
+  /**
+   * Add a list of {@link DiagnosticsExceptionEvent}s to the message.
+   * @param exceptionList the list to add.
+   */
+  public void 
addDiagnosticsExceptionEvents(Collection<DiagnosticsExceptionEvent> 
exceptionList) {
+    if (exceptionList != null && !exceptionList.isEmpty()) {
+      addToMetricsMessage(SAMZACONTAINER_METRICS_GROUP_NAME, 
EXCEPTION_LIST_METRIC_NAME, exceptionList);
+    }
+  }
+
+  /**
+   * Add a list of {@link 
org.apache.samza.diagnostics.DiagnosticsManager.ProcessorStopEvent}s to add to 
the list.
+   * @param stopEventList the list to add.
+   */
+  public void 
addProcessorStopEvents(List<DiagnosticsManager.ProcessorStopEvent> 
stopEventList) {
+    if (stopEventList != null && !stopEventList.isEmpty()) {
+      addToMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER, 
STOP_EVENT_LIST_METRIC_NAME, stopEventList);
+    }
+  }
+
+  /**
+   * Convert this message into a {@link MetricsSnapshot}, useful for 
serde-deserde using {@link org.apache.samza.serializers.MetricsSnapshotSerde}.
+   * @return
+   */
+  public MetricsSnapshot convertToMetricsSnapshot() {
+    MetricsSnapshot metricsSnapshot = new MetricsSnapshot(metricsHeader, new 
Metrics(metricsMessage));
+    return metricsSnapshot;
+  }
+
+  /**
+   * Check if the message has no contents.
+   * @return True if the message is empty, false otherwise.
+   */
+  public boolean isEmpty() {
+    return metricsMessage.isEmpty();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    DiagnosticsStreamMessage that = (DiagnosticsStreamMessage) o;
+    return metricsHeader.getAsMap().equals(that.metricsHeader.getAsMap()) && 
metricsMessage.equals(that.metricsMessage);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(metricsHeader, metricsMessage);
+  }
+
+  public Collection<DiagnosticsManager.ProcessorStopEvent> 
getProcessorStopEvents() {
+    return (Collection<DiagnosticsManager.ProcessorStopEvent>) 
getFromMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER,
+        STOP_EVENT_LIST_METRIC_NAME);
+  }
+
+  public Collection<DiagnosticsExceptionEvent> getExceptionEvents() {
+    return (Collection<DiagnosticsExceptionEvent>) 
getFromMetricsMessage(SAMZACONTAINER_METRICS_GROUP_NAME,
+        EXCEPTION_LIST_METRIC_NAME);
+  }
+
+  public int getContainerMb() {
+    return (int) getFromMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER, 
CONTAINER_MB_METRIC_NAME);
+  }
+
+  public int getContainerNumCores() {
+    return (int) getFromMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER, 
CONTAINER_NUM_CORES_METRIC_NAME);
+  }
+
+  public int getNumStoresWithChangelog() {
+    return (int) getFromMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER,
+        CONTAINER_NUM_STORES_WITH_CHANGELOG_METRIC_NAME);
+  }
+
+  public Map<String, ContainerModel> getContainerModels() {
+    return deserializeContainerModelMap(
+        (Map<String, Object>) 
getFromMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER, 
CONTAINER_MODELS_METRIC_NAME));
+  }
+
+  // Helper method to get a {@link DiagnosticsStreamMessage} from a {@link 
MetricsSnapshot}.
+  //   * This is typically used when deserializing messages from a 
diagnostics-stream.
+  //   * @param metricsSnapshot
+  public static DiagnosticsStreamMessage 
convertToDiagnosticsStreamMessage(MetricsSnapshot metricsSnapshot) {
+    DiagnosticsStreamMessage diagnosticsStreamMessage =
+        new DiagnosticsStreamMessage(metricsSnapshot.getHeader().getJobName(), 
metricsSnapshot.getHeader().getJobId(),
+            metricsSnapshot.getHeader().getContainerName(), 
metricsSnapshot.getHeader().getExecEnvironmentContainerId(),
+            metricsSnapshot.getHeader().getVersion(), 
metricsSnapshot.getHeader().getSamzaVersion(),
+            metricsSnapshot.getHeader().getHost(), 
metricsSnapshot.getHeader().getTime(),
+            metricsSnapshot.getHeader().getResetTime());
+
+    if 
(metricsSnapshot.getMetrics().getAsMap().containsKey(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER))
 {
+      diagnosticsStreamMessage.addContainerNumCores((int) 
metricsSnapshot.getMetrics()
+          .getAsMap()
+          .get(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER)
+          .get(CONTAINER_NUM_CORES_METRIC_NAME));
+
+      diagnosticsStreamMessage.addContainerMb((int) 
metricsSnapshot.getMetrics()
+          .getAsMap()
+          .get(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER)
+          .get(CONTAINER_MB_METRIC_NAME));
+
+      diagnosticsStreamMessage.addNumStoresWithChangelog((int) 
metricsSnapshot.getMetrics()
+          .getAsMap()
+          .get(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER)
+          .get(CONTAINER_NUM_STORES_WITH_CHANGELOG_METRIC_NAME));
+
+      diagnosticsStreamMessage.addContainerModels(deserializeContainerModelMap(
+          (Map<String, Object>) metricsSnapshot.getMetrics()
+              .getAsMap()
+              .get(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER)
+              .get(CONTAINER_MODELS_METRIC_NAME)));
+
+      diagnosticsStreamMessage.addProcessorStopEvents(
+          (List<DiagnosticsManager.ProcessorStopEvent>) 
metricsSnapshot.getMetrics()
+              .getAsMap()
+              .get(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER)
+              .get(STOP_EVENT_LIST_METRIC_NAME));
+    }
+
+    if 
(metricsSnapshot.getMetrics().getAsMap().containsKey(SAMZACONTAINER_METRICS_GROUP_NAME))
 {
+      diagnosticsStreamMessage.addDiagnosticsExceptionEvents(
+          (Collection<DiagnosticsExceptionEvent>) metricsSnapshot.getMetrics()
+              .getAsMap()
+              .get(SAMZACONTAINER_METRICS_GROUP_NAME)
+              .get(EXCEPTION_LIST_METRIC_NAME));
+    }
+
+    return diagnosticsStreamMessage;
+  }
+
+  /**
+   * Helper method to use {@link SamzaObjectMapper} to serialize {@link 
ContainerModel}s.
+   * {@link SamzaObjectMapper} provides several conventions and optimizations 
for serializing containerModels.
+   * @param containerModelMap map of container models to serialize.
+   * @return
+   */
+  private static Map<String, String> serializeContainerModelMap(Map<String, 
ContainerModel> containerModelMap) {
+    Map<String, String> serializedContainerModelMap = new HashMap<>();
+    ObjectMapper samzaObjectMapper = SamzaObjectMapper.getObjectMapper();
+    String serializedContainerModel;
+
+    for (Map.Entry<String, ContainerModel> containerModelEntry : 
containerModelMap.entrySet()) {
+      serializedContainerModel = "";
 
 Review comment:
   yes, that gets handled; so that we can get the container count, even if the 
container-model doesnt get serialized for any reason.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to