This is an automated email from the ASF dual-hosted git repository.

rayman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new f3d9494  Adding two additional params to DiagnosticsStreamMessage 
(#1126)
f3d9494 is described below

commit f3d949453e0b67d6e8f09c5c91dd4d17feb06f7d
Author: rmatharu <[email protected]>
AuthorDate: Tue Aug 6 17:24:23 2019 -0700

    Adding two additional params to DiagnosticsStreamMessage (#1126)
    
    Adding two additional params to DiagnosticsStreamMessage
---
 .../org/apache/samza/util/DiagnosticsUtil.java     |  7 ++-
 .../samza/diagnostics/DiagnosticsManager.java      | 58 ++++++++++++++++------
 .../diagnostics/DiagnosticsStreamMessage.java      | 30 ++++++++++-
 .../samza/diagnostics/TestDiagnosticsManager.java  |  6 ++-
 4 files changed, 83 insertions(+), 18 deletions(-)

diff --git 
a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java 
b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
index 4bc0f24..a3245a1 100644
--- a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
@@ -94,13 +94,16 @@ public class DiagnosticsUtil {
   public static Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>> 
buildDiagnosticsManager(String jobName,
       String jobId, JobModel jobModel, String containerId, Optional<String> 
execEnvContainerId, Config config) {
 
+    JobConfig jobConfig = new JobConfig(config);
     Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>> 
diagnosticsManagerReporterPair = Optional.empty();
 
-    if (new JobConfig(config).getDiagnosticsEnabled()) {
+    if (jobConfig.getDiagnosticsEnabled()) {
 
       ClusterManagerConfig clusterManagerConfig = new 
ClusterManagerConfig(config);
       int containerMemoryMb = clusterManagerConfig.getContainerMemoryMb();
       int containerNumCores = clusterManagerConfig.getNumCores();
+      long maxHeapSizeBytes = Runtime.getRuntime().maxMemory();
+      int containerThreadPoolSize = jobConfig.getThreadPoolSize();
 
       // Diagnostic stream, producer, and reporter related parameters
       String diagnosticsReporterName = 
MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS;
@@ -129,7 +132,7 @@ public class DiagnosticsUtil {
           systemFactory.getProducer(diagnosticsSystemStream.getSystem(), 
config, new MetricsRegistryMap());
       DiagnosticsManager diagnosticsManager =
           new DiagnosticsManager(jobName, jobId, jobModel.getContainers(), 
containerMemoryMb, containerNumCores,
-              new StorageConfig(config).getNumStoresWithChangelog(), 
containerId, execEnvContainerId.orElse(""),
+              new StorageConfig(config).getNumStoresWithChangelog(), 
maxHeapSizeBytes, containerThreadPoolSize, containerId, 
execEnvContainerId.orElse(""),
               taskClassVersion, samzaVersion, hostName, 
diagnosticsSystemStream, systemProducer,
               Duration.ofMillis(new TaskConfig(config).getShutdownMs()));
 
diff --git 
a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java
 
b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java
index aa41940..ed5179e 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java
+++ 
b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java
@@ -65,9 +65,11 @@ public class DiagnosticsManager {
   private final Instant resetTime;
 
   // Job-related params
-  private final Integer containerMemoryMb;
-  private final Integer containerNumCores;
-  private final Integer numStoresWithChangelog;
+  private final int containerMemoryMb;
+  private final int containerNumCores;
+  private final int numStoresWithChangelog;
+  private final long maxHeapSizeBytes;
+  private final int containerThreadPoolSize;
   private final Map<String, ContainerModel> containerModels;
   private boolean jobParamsEmitted = false;
 
@@ -79,22 +81,46 @@ public class DiagnosticsManager {
   private final Duration terminationDuration; // duration to wait when 
terminating the scheduler
   private final SystemStream diagnosticSystemStream;
 
-  public DiagnosticsManager(String jobName, String jobId, Map<String, 
ContainerModel> containerModels,
-      Integer containerMemoryMb, Integer containerNumCores, Integer 
numStoresWithChangelog, String containerId,
-      String executionEnvContainerId, String taskClassVersion, String 
samzaVersion, String hostname,
-      SystemStream diagnosticSystemStream, SystemProducer systemProducer, 
Duration terminationDuration) {
-
-    this(jobName, jobId, containerModels, containerMemoryMb, 
containerNumCores, numStoresWithChangelog, containerId,
-        executionEnvContainerId, taskClassVersion, samzaVersion, hostname, 
diagnosticSystemStream, systemProducer,
+  public DiagnosticsManager(String jobName,
+      String jobId,
+      Map<String, ContainerModel> containerModels,
+      int containerMemoryMb,
+      int containerNumCores,
+      int numStoresWithChangelog,
+      long maxHeapSizeBytes,
+      int containerThreadPoolSize,
+      String containerId,
+      String executionEnvContainerId,
+      String taskClassVersion,
+      String samzaVersion,
+      String hostname,
+      SystemStream diagnosticSystemStream,
+      SystemProducer systemProducer,
+      Duration terminationDuration) {
+
+    this(jobName, jobId, containerModels, containerMemoryMb, 
containerNumCores, numStoresWithChangelog, maxHeapSizeBytes, 
containerThreadPoolSize,
+        containerId, executionEnvContainerId, taskClassVersion, samzaVersion, 
hostname, diagnosticSystemStream, systemProducer,
         terminationDuration, Executors.newSingleThreadScheduledExecutor(
             new 
ThreadFactoryBuilder().setNameFormat(PUBLISH_THREAD_NAME).setDaemon(true).build()));
   }
 
   @VisibleForTesting
-  DiagnosticsManager(String jobName, String jobId, Map<String, ContainerModel> 
containerModels,
-      int containerMemoryMb, int containerNumCores, int 
numStoresWithChangelog, String containerId,
-      String executionEnvContainerId, String taskClassVersion, String 
samzaVersion, String hostname,
-      SystemStream diagnosticSystemStream, SystemProducer systemProducer, 
Duration terminationDuration,
+  DiagnosticsManager(String jobName,
+      String jobId,
+      Map<String, ContainerModel> containerModels,
+      int containerMemoryMb,
+      int containerNumCores,
+      int numStoresWithChangelog,
+      long maxHeapSizeBytes,
+      int containerThreadPoolSize,
+      String containerId,
+      String executionEnvContainerId,
+      String taskClassVersion,
+      String samzaVersion,
+      String hostname,
+      SystemStream diagnosticSystemStream,
+      SystemProducer systemProducer,
+      Duration terminationDuration,
       ScheduledExecutorService executorService) {
     this.jobName = jobName;
     this.jobId = jobId;
@@ -102,6 +128,8 @@ public class DiagnosticsManager {
     this.containerMemoryMb = containerMemoryMb;
     this.containerNumCores = containerNumCores;
     this.numStoresWithChangelog = numStoresWithChangelog;
+    this.maxHeapSizeBytes = maxHeapSizeBytes;
+    this.containerThreadPoolSize = containerThreadPoolSize;
     this.containerId = containerId;
     this.executionEnvContainerId = executionEnvContainerId;
     this.taskClassVersion = taskClassVersion;
@@ -185,6 +213,8 @@ public class DiagnosticsManager {
           diagnosticsStreamMessage.addContainerNumCores(containerNumCores);
           
diagnosticsStreamMessage.addNumStoresWithChangelog(numStoresWithChangelog);
           diagnosticsStreamMessage.addContainerModels(containerModels);
+          diagnosticsStreamMessage.addMaxHeapSize(maxHeapSizeBytes);
+          
diagnosticsStreamMessage.addContainerThreadPoolSize(containerThreadPoolSize);
         }
 
         // Add stop event list to the message
diff --git 
a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsStreamMessage.java
 
b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsStreamMessage.java
index 6840912..81642d5 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsStreamMessage.java
+++ 
b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsStreamMessage.java
@@ -55,7 +55,9 @@ public class DiagnosticsStreamMessage {
   private static final String STOP_EVENT_LIST_METRIC_NAME = "stopEvents";
   private static final String CONTAINER_MB_METRIC_NAME = "containerMemoryMb";
   private static final String CONTAINER_NUM_CORES_METRIC_NAME = 
"containerNumCores";
-  public static final String CONTAINER_NUM_STORES_WITH_CHANGELOG_METRIC_NAME = 
"numStoresWithChangelog";
+  private static final String CONTAINER_NUM_STORES_WITH_CHANGELOG_METRIC_NAME 
= "numStoresWithChangelog";
+  private static final String CONTAINER_MAX_CONFIGURED_HEAP_METRIC_NAME = 
"maxHeap";
+  private static final String CONTAINER_THREAD_POOL_SIZE_METRIC_NAME = 
"containerThreadPoolSize";
   private static final String CONTAINER_MODELS_METRIC_NAME = "containerModels";
 
   private final MetricsHeader metricsHeader;
@@ -98,6 +100,22 @@ public class DiagnosticsStreamMessage {
   }
 
   /**
+   * Add the configured max heap size in bytes.
+   * @param maxHeapSize the parameter value.
+   */
+  public void addMaxHeapSize(Long maxHeapSize) {
+    addToMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER, 
CONTAINER_MAX_CONFIGURED_HEAP_METRIC_NAME, maxHeapSize);
+  }
+
+  /**
+   * Add the configured container thread pool size.
+   * @param threadPoolSize the parameter value.
+   */
+  public void addContainerThreadPoolSize(Integer threadPoolSize) {
+    addToMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER, 
CONTAINER_THREAD_POOL_SIZE_METRIC_NAME, threadPoolSize);
+  }
+
+  /**
    * Add a map of container models (indexed by containerID) to the message.
    * @param containerModelMap the container models map
    */
@@ -185,6 +203,14 @@ public class DiagnosticsStreamMessage {
         CONTAINER_NUM_STORES_WITH_CHANGELOG_METRIC_NAME);
   }
 
+  public Long getMaxHeapSize() {
+    return (Long) getFromMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER, 
CONTAINER_MAX_CONFIGURED_HEAP_METRIC_NAME);
+  }
+
+  public Integer getContainerThreadPoolSize() {
+    return (Integer) getFromMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER, 
CONTAINER_THREAD_POOL_SIZE_METRIC_NAME);
+  }
+
   public Map<String, ContainerModel> getContainerModels() {
     return deserializeContainerModelMap((String) 
getFromMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER, 
CONTAINER_MODELS_METRIC_NAME));
   }
@@ -210,6 +236,8 @@ public class DiagnosticsStreamMessage {
       diagnosticsStreamMessage.addContainerMb((Integer) 
diagnosticsManagerGroupMap.get(CONTAINER_MB_METRIC_NAME));
       diagnosticsStreamMessage.addNumStoresWithChangelog((Integer) 
diagnosticsManagerGroupMap.get(CONTAINER_NUM_STORES_WITH_CHANGELOG_METRIC_NAME));
       
diagnosticsStreamMessage.addContainerModels(deserializeContainerModelMap((String)
 diagnosticsManagerGroupMap.get(CONTAINER_MODELS_METRIC_NAME)));
+      diagnosticsStreamMessage.addMaxHeapSize((Long) 
diagnosticsManagerGroupMap.get(CONTAINER_MAX_CONFIGURED_HEAP_METRIC_NAME));
+      diagnosticsStreamMessage.addContainerThreadPoolSize((Integer) 
diagnosticsManagerGroupMap.get(CONTAINER_THREAD_POOL_SIZE_METRIC_NAME));
 
       
diagnosticsStreamMessage.addProcessorStopEvents((List<ProcessorStopEvent>) 
diagnosticsManagerGroupMap.get(STOP_EVENT_LIST_METRIC_NAME));
     }
diff --git 
a/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsManager.java
 
b/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsManager.java
index c69b278..33d16e3 100644
--- 
a/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsManager.java
+++ 
b/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsManager.java
@@ -53,6 +53,8 @@ public class TestDiagnosticsManager {
   private String samzaVersion = "1.3.0";
   private String hostname = "sample host name";
   private int containerMb = 1024;
+  private int containerThreadPoolSize = 2;
+  private long maxHeapSize = 900;
   private int numStoresWithChangelog = 2;
   private int containerNumCores = 2;
   private Map<String, ContainerModel> containerModels = 
TestDiagnosticsStreamMessage.getSampleContainerModels();
@@ -73,7 +75,7 @@ public class TestDiagnosticsManager {
           });
 
     this.diagnosticsManager =
-        new DiagnosticsManager(jobName, jobId, containerModels, containerMb, 
containerNumCores, numStoresWithChangelog,
+        new DiagnosticsManager(jobName, jobId, containerModels, containerMb, 
containerNumCores, numStoresWithChangelog, maxHeapSize, containerThreadPoolSize,
             "0", executionEnvContainerId, taskClassVersion, samzaVersion, 
hostname, diagnosticsSystemStream,
             mockSystemProducer, Duration.ofSeconds(1), mockExecutorService);
 
@@ -202,6 +204,8 @@ public class TestDiagnosticsManager {
         
DiagnosticsStreamMessage.convertToDiagnosticsStreamMessage(metricsSnapshot);
 
     Assert.assertEquals(containerMb, 
diagnosticsStreamMessage.getContainerMb().intValue());
+    Assert.assertEquals(maxHeapSize, 
diagnosticsStreamMessage.getMaxHeapSize().longValue());
+    Assert.assertEquals(containerThreadPoolSize, 
diagnosticsStreamMessage.getContainerThreadPoolSize().intValue());
     Assert.assertEquals(exceptionEventList, 
diagnosticsStreamMessage.getExceptionEvents());
     Assert.assertEquals(diagnosticsStreamMessage.getProcessorStopEvents(), 
Arrays.asList(new ProcessorStopEvent("0", executionEnvContainerId, hostname, 
101)));
     Assert.assertEquals(containerModels, 
diagnosticsStreamMessage.getContainerModels());

Reply via email to