This is an automated email from the ASF dual-hosted git repository.
rayman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new f3d9494 Adding two additional params to DiagnosticsStreamMessage
(#1126)
f3d9494 is described below
commit f3d949453e0b67d6e8f09c5c91dd4d17feb06f7d
Author: rmatharu <[email protected]>
AuthorDate: Tue Aug 6 17:24:23 2019 -0700
Adding two additional params to DiagnosticsStreamMessage (#1126)
Adding two additional params to DiagnosticsStreamMessage
---
.../org/apache/samza/util/DiagnosticsUtil.java | 7 ++-
.../samza/diagnostics/DiagnosticsManager.java | 58 ++++++++++++++++------
.../diagnostics/DiagnosticsStreamMessage.java | 30 ++++++++++-
.../samza/diagnostics/TestDiagnosticsManager.java | 6 ++-
4 files changed, 83 insertions(+), 18 deletions(-)
diff --git
a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
index 4bc0f24..a3245a1 100644
--- a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
@@ -94,13 +94,16 @@ public class DiagnosticsUtil {
public static Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>>
buildDiagnosticsManager(String jobName,
String jobId, JobModel jobModel, String containerId, Optional<String>
execEnvContainerId, Config config) {
+ JobConfig jobConfig = new JobConfig(config);
Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>>
diagnosticsManagerReporterPair = Optional.empty();
- if (new JobConfig(config).getDiagnosticsEnabled()) {
+ if (jobConfig.getDiagnosticsEnabled()) {
ClusterManagerConfig clusterManagerConfig = new
ClusterManagerConfig(config);
int containerMemoryMb = clusterManagerConfig.getContainerMemoryMb();
int containerNumCores = clusterManagerConfig.getNumCores();
+ long maxHeapSizeBytes = Runtime.getRuntime().maxMemory();
+ int containerThreadPoolSize = jobConfig.getThreadPoolSize();
// Diagnostic stream, producer, and reporter related parameters
String diagnosticsReporterName =
MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS;
@@ -129,7 +132,7 @@ public class DiagnosticsUtil {
systemFactory.getProducer(diagnosticsSystemStream.getSystem(),
config, new MetricsRegistryMap());
DiagnosticsManager diagnosticsManager =
new DiagnosticsManager(jobName, jobId, jobModel.getContainers(),
containerMemoryMb, containerNumCores,
- new StorageConfig(config).getNumStoresWithChangelog(),
containerId, execEnvContainerId.orElse(""),
+ new StorageConfig(config).getNumStoresWithChangelog(),
maxHeapSizeBytes, containerThreadPoolSize, containerId,
execEnvContainerId.orElse(""),
taskClassVersion, samzaVersion, hostName,
diagnosticsSystemStream, systemProducer,
Duration.ofMillis(new TaskConfig(config).getShutdownMs()));
diff --git
a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java
b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java
index aa41940..ed5179e 100644
---
a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java
+++
b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java
@@ -65,9 +65,11 @@ public class DiagnosticsManager {
private final Instant resetTime;
// Job-related params
- private final Integer containerMemoryMb;
- private final Integer containerNumCores;
- private final Integer numStoresWithChangelog;
+ private final int containerMemoryMb;
+ private final int containerNumCores;
+ private final int numStoresWithChangelog;
+ private final long maxHeapSizeBytes;
+ private final int containerThreadPoolSize;
private final Map<String, ContainerModel> containerModels;
private boolean jobParamsEmitted = false;
@@ -79,22 +81,46 @@ public class DiagnosticsManager {
private final Duration terminationDuration; // duration to wait when
terminating the scheduler
private final SystemStream diagnosticSystemStream;
- public DiagnosticsManager(String jobName, String jobId, Map<String,
ContainerModel> containerModels,
- Integer containerMemoryMb, Integer containerNumCores, Integer
numStoresWithChangelog, String containerId,
- String executionEnvContainerId, String taskClassVersion, String
samzaVersion, String hostname,
- SystemStream diagnosticSystemStream, SystemProducer systemProducer,
Duration terminationDuration) {
-
- this(jobName, jobId, containerModels, containerMemoryMb,
containerNumCores, numStoresWithChangelog, containerId,
- executionEnvContainerId, taskClassVersion, samzaVersion, hostname,
diagnosticSystemStream, systemProducer,
+ public DiagnosticsManager(String jobName,
+ String jobId,
+ Map<String, ContainerModel> containerModels,
+ int containerMemoryMb,
+ int containerNumCores,
+ int numStoresWithChangelog,
+ long maxHeapSizeBytes,
+ int containerThreadPoolSize,
+ String containerId,
+ String executionEnvContainerId,
+ String taskClassVersion,
+ String samzaVersion,
+ String hostname,
+ SystemStream diagnosticSystemStream,
+ SystemProducer systemProducer,
+ Duration terminationDuration) {
+
+ this(jobName, jobId, containerModels, containerMemoryMb,
containerNumCores, numStoresWithChangelog, maxHeapSizeBytes,
containerThreadPoolSize,
+ containerId, executionEnvContainerId, taskClassVersion, samzaVersion,
hostname, diagnosticSystemStream, systemProducer,
terminationDuration, Executors.newSingleThreadScheduledExecutor(
new
ThreadFactoryBuilder().setNameFormat(PUBLISH_THREAD_NAME).setDaemon(true).build()));
}
@VisibleForTesting
- DiagnosticsManager(String jobName, String jobId, Map<String, ContainerModel>
containerModels,
- int containerMemoryMb, int containerNumCores, int
numStoresWithChangelog, String containerId,
- String executionEnvContainerId, String taskClassVersion, String
samzaVersion, String hostname,
- SystemStream diagnosticSystemStream, SystemProducer systemProducer,
Duration terminationDuration,
+ DiagnosticsManager(String jobName,
+ String jobId,
+ Map<String, ContainerModel> containerModels,
+ int containerMemoryMb,
+ int containerNumCores,
+ int numStoresWithChangelog,
+ long maxHeapSizeBytes,
+ int containerThreadPoolSize,
+ String containerId,
+ String executionEnvContainerId,
+ String taskClassVersion,
+ String samzaVersion,
+ String hostname,
+ SystemStream diagnosticSystemStream,
+ SystemProducer systemProducer,
+ Duration terminationDuration,
ScheduledExecutorService executorService) {
this.jobName = jobName;
this.jobId = jobId;
@@ -102,6 +128,8 @@ public class DiagnosticsManager {
this.containerMemoryMb = containerMemoryMb;
this.containerNumCores = containerNumCores;
this.numStoresWithChangelog = numStoresWithChangelog;
+ this.maxHeapSizeBytes = maxHeapSizeBytes;
+ this.containerThreadPoolSize = containerThreadPoolSize;
this.containerId = containerId;
this.executionEnvContainerId = executionEnvContainerId;
this.taskClassVersion = taskClassVersion;
@@ -185,6 +213,8 @@ public class DiagnosticsManager {
diagnosticsStreamMessage.addContainerNumCores(containerNumCores);
diagnosticsStreamMessage.addNumStoresWithChangelog(numStoresWithChangelog);
diagnosticsStreamMessage.addContainerModels(containerModels);
+ diagnosticsStreamMessage.addMaxHeapSize(maxHeapSizeBytes);
+
diagnosticsStreamMessage.addContainerThreadPoolSize(containerThreadPoolSize);
}
// Add stop event list to the message
diff --git
a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsStreamMessage.java
b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsStreamMessage.java
index 6840912..81642d5 100644
---
a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsStreamMessage.java
+++
b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsStreamMessage.java
@@ -55,7 +55,9 @@ public class DiagnosticsStreamMessage {
private static final String STOP_EVENT_LIST_METRIC_NAME = "stopEvents";
private static final String CONTAINER_MB_METRIC_NAME = "containerMemoryMb";
private static final String CONTAINER_NUM_CORES_METRIC_NAME =
"containerNumCores";
- public static final String CONTAINER_NUM_STORES_WITH_CHANGELOG_METRIC_NAME =
"numStoresWithChangelog";
+ private static final String CONTAINER_NUM_STORES_WITH_CHANGELOG_METRIC_NAME
= "numStoresWithChangelog";
+ private static final String CONTAINER_MAX_CONFIGURED_HEAP_METRIC_NAME =
"maxHeap";
+ private static final String CONTAINER_THREAD_POOL_SIZE_METRIC_NAME =
"containerThreadPoolSize";
private static final String CONTAINER_MODELS_METRIC_NAME = "containerModels";
private final MetricsHeader metricsHeader;
@@ -98,6 +100,22 @@ public class DiagnosticsStreamMessage {
}
/**
+ * Add the configured max heap size in bytes.
+ * @param maxHeapSize the parameter value.
+ */
+ public void addMaxHeapSize(Long maxHeapSize) {
+ addToMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER,
CONTAINER_MAX_CONFIGURED_HEAP_METRIC_NAME, maxHeapSize);
+ }
+
+ /**
+ * Add the configured container thread pool size.
+ * @param threadPoolSize the parameter value.
+ */
+ public void addContainerThreadPoolSize(Integer threadPoolSize) {
+ addToMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER,
CONTAINER_THREAD_POOL_SIZE_METRIC_NAME, threadPoolSize);
+ }
+
+ /**
* Add a map of container models (indexed by containerID) to the message.
* @param containerModelMap the container models map
*/
@@ -185,6 +203,14 @@ public class DiagnosticsStreamMessage {
CONTAINER_NUM_STORES_WITH_CHANGELOG_METRIC_NAME);
}
+ public Long getMaxHeapSize() {
+ return (Long) getFromMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER,
CONTAINER_MAX_CONFIGURED_HEAP_METRIC_NAME);
+ }
+
+ public Integer getContainerThreadPoolSize() {
+ return (Integer) getFromMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER,
CONTAINER_THREAD_POOL_SIZE_METRIC_NAME);
+ }
+
public Map<String, ContainerModel> getContainerModels() {
return deserializeContainerModelMap((String)
getFromMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER,
CONTAINER_MODELS_METRIC_NAME));
}
@@ -210,6 +236,8 @@ public class DiagnosticsStreamMessage {
diagnosticsStreamMessage.addContainerMb((Integer)
diagnosticsManagerGroupMap.get(CONTAINER_MB_METRIC_NAME));
diagnosticsStreamMessage.addNumStoresWithChangelog((Integer)
diagnosticsManagerGroupMap.get(CONTAINER_NUM_STORES_WITH_CHANGELOG_METRIC_NAME));
diagnosticsStreamMessage.addContainerModels(deserializeContainerModelMap((String)
diagnosticsManagerGroupMap.get(CONTAINER_MODELS_METRIC_NAME)));
+ diagnosticsStreamMessage.addMaxHeapSize((Long)
diagnosticsManagerGroupMap.get(CONTAINER_MAX_CONFIGURED_HEAP_METRIC_NAME));
+ diagnosticsStreamMessage.addContainerThreadPoolSize((Integer)
diagnosticsManagerGroupMap.get(CONTAINER_THREAD_POOL_SIZE_METRIC_NAME));
diagnosticsStreamMessage.addProcessorStopEvents((List<ProcessorStopEvent>)
diagnosticsManagerGroupMap.get(STOP_EVENT_LIST_METRIC_NAME));
}
diff --git
a/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsManager.java
b/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsManager.java
index c69b278..33d16e3 100644
---
a/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsManager.java
+++
b/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsManager.java
@@ -53,6 +53,8 @@ public class TestDiagnosticsManager {
private String samzaVersion = "1.3.0";
private String hostname = "sample host name";
private int containerMb = 1024;
+ private int containerThreadPoolSize = 2;
+ private long maxHeapSize = 900;
private int numStoresWithChangelog = 2;
private int containerNumCores = 2;
private Map<String, ContainerModel> containerModels =
TestDiagnosticsStreamMessage.getSampleContainerModels();
@@ -73,7 +75,7 @@ public class TestDiagnosticsManager {
});
this.diagnosticsManager =
- new DiagnosticsManager(jobName, jobId, containerModels, containerMb,
containerNumCores, numStoresWithChangelog,
+ new DiagnosticsManager(jobName, jobId, containerModels, containerMb,
containerNumCores, numStoresWithChangelog, maxHeapSize, containerThreadPoolSize,
"0", executionEnvContainerId, taskClassVersion, samzaVersion,
hostname, diagnosticsSystemStream,
mockSystemProducer, Duration.ofSeconds(1), mockExecutorService);
@@ -202,6 +204,8 @@ public class TestDiagnosticsManager {
DiagnosticsStreamMessage.convertToDiagnosticsStreamMessage(metricsSnapshot);
Assert.assertEquals(containerMb,
diagnosticsStreamMessage.getContainerMb().intValue());
+ Assert.assertEquals(maxHeapSize,
diagnosticsStreamMessage.getMaxHeapSize().longValue());
+ Assert.assertEquals(containerThreadPoolSize,
diagnosticsStreamMessage.getContainerThreadPoolSize().intValue());
Assert.assertEquals(exceptionEventList,
diagnosticsStreamMessage.getExceptionEvents());
Assert.assertEquals(diagnosticsStreamMessage.getProcessorStopEvents(),
Arrays.asList(new ProcessorStopEvent("0", executionEnvContainerId, hostname,
101)));
Assert.assertEquals(containerModels,
diagnosticsStreamMessage.getContainerModels());