This is an automated email from the ASF dual-hosted git repository.

rayman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new c8f8757  Emitting numPersistentStores instead of num stores with changelog from DiagnosticsManager (#1145)
c8f8757 is described below

commit c8f875740b25b8edaf25d17c43cd5d409f278215
Author: rmatharu <[email protected]>
AuthorDate: Mon Aug 26 11:54:46 2019 -0700

    Emitting numPersistentStores instead of num stores with changelog from DiagnosticsManager (#1145)
    
    * Emitting numPersistentStores instead of num stores with changelog
---
 .../java/org/apache/samza/config/StorageConfig.java     | 13 +++++++++----
 .../java/org/apache/samza/util/DiagnosticsUtil.java     |  2 +-
 .../apache/samza/diagnostics/DiagnosticsManager.java    | 12 ++++++------
 .../samza/diagnostics/DiagnosticsStreamMessage.java     | 17 +++++++++--------
 .../samza/diagnostics/TestDiagnosticsManager.java       | 10 +++++-----
 .../samza/diagnostics/TestDiagnosticsStreamMessage.java |  6 +++---
 6 files changed, 33 insertions(+), 27 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
index 0bb9b99..7bc6cb4 100644
--- a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
@@ -63,6 +63,8 @@ public class StorageConfig extends MapConfig {
   static final String SIDE_INPUTS_PROCESSOR_FACTORY = STORE_PREFIX + 
"%s.side.inputs.processor.factory";
   static final String SIDE_INPUTS_PROCESSOR_SERIALIZED_INSTANCE =
       STORE_PREFIX + "%s.side.inputs.processor.serialized.instance";
+  static final String INMEMORY_KV_STORAGE_ENGINE_FACTORY =
+      
"org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStorageEngineFactory";
 
   public StorageConfig(Config config) {
     super(config);
@@ -225,10 +227,13 @@ public class StorageConfig extends MapConfig {
   }
 
   /**
-   * Helper method to get the number of stores configured with a changelog.
+   * Helper method to get the number of persistent stores.
    */
-  public int getNumStoresWithChangelog() {
-    Config subConfig = subset(STORE_PREFIX, true);
-    return new Long(subConfig.keySet().stream().filter(key -> 
key.endsWith(CHANGELOG_SUFFIX)).count()).intValue();
+  public int getNumPersistentStores() {
+    return (int) getStoreNames().stream()
+        .map(storeName -> getStorageFactoryClassName(storeName))
+        .filter(factoryName -> factoryName.isPresent())
+        .filter(factoryName -> 
!factoryName.get().equals(INMEMORY_KV_STORAGE_ENGINE_FACTORY))
+        .count();
   }
 }
diff --git a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
index a3245a1..2870153 100644
--- a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
@@ -132,7 +132,7 @@ public class DiagnosticsUtil {
           systemFactory.getProducer(diagnosticsSystemStream.getSystem(), 
config, new MetricsRegistryMap());
       DiagnosticsManager diagnosticsManager =
           new DiagnosticsManager(jobName, jobId, jobModel.getContainers(), 
containerMemoryMb, containerNumCores,
-              new StorageConfig(config).getNumStoresWithChangelog(), 
maxHeapSizeBytes, containerThreadPoolSize, containerId, 
execEnvContainerId.orElse(""),
+              new StorageConfig(config).getNumPersistentStores(), 
maxHeapSizeBytes, containerThreadPoolSize, containerId, 
execEnvContainerId.orElse(""),
               taskClassVersion, samzaVersion, hostName, 
diagnosticsSystemStream, systemProducer,
               Duration.ofMillis(new TaskConfig(config).getShutdownMs()));
 
diff --git a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java
index ed5179e..80ddf0d 100644
--- a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java
+++ b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java
@@ -67,7 +67,7 @@ public class DiagnosticsManager {
   // Job-related params
   private final int containerMemoryMb;
   private final int containerNumCores;
-  private final int numStoresWithChangelog;
+  private final int numPersistentStores;
   private final long maxHeapSizeBytes;
   private final int containerThreadPoolSize;
   private final Map<String, ContainerModel> containerModels;
@@ -86,7 +86,7 @@ public class DiagnosticsManager {
       Map<String, ContainerModel> containerModels,
       int containerMemoryMb,
       int containerNumCores,
-      int numStoresWithChangelog,
+      int numPersistentStores,
       long maxHeapSizeBytes,
       int containerThreadPoolSize,
       String containerId,
@@ -98,7 +98,7 @@ public class DiagnosticsManager {
       SystemProducer systemProducer,
       Duration terminationDuration) {
 
-    this(jobName, jobId, containerModels, containerMemoryMb, 
containerNumCores, numStoresWithChangelog, maxHeapSizeBytes, 
containerThreadPoolSize,
+    this(jobName, jobId, containerModels, containerMemoryMb, 
containerNumCores, numPersistentStores, maxHeapSizeBytes, 
containerThreadPoolSize,
         containerId, executionEnvContainerId, taskClassVersion, samzaVersion, 
hostname, diagnosticSystemStream, systemProducer,
         terminationDuration, Executors.newSingleThreadScheduledExecutor(
             new 
ThreadFactoryBuilder().setNameFormat(PUBLISH_THREAD_NAME).setDaemon(true).build()));
@@ -110,7 +110,7 @@ public class DiagnosticsManager {
       Map<String, ContainerModel> containerModels,
       int containerMemoryMb,
       int containerNumCores,
-      int numStoresWithChangelog,
+      int numPersistentStores,
       long maxHeapSizeBytes,
       int containerThreadPoolSize,
       String containerId,
@@ -127,7 +127,7 @@ public class DiagnosticsManager {
     this.containerModels = containerModels;
     this.containerMemoryMb = containerMemoryMb;
     this.containerNumCores = containerNumCores;
-    this.numStoresWithChangelog = numStoresWithChangelog;
+    this.numPersistentStores = numPersistentStores;
     this.maxHeapSizeBytes = maxHeapSizeBytes;
     this.containerThreadPoolSize = containerThreadPoolSize;
     this.containerId = containerId;
@@ -211,7 +211,7 @@ public class DiagnosticsManager {
         if (!jobParamsEmitted) {
           diagnosticsStreamMessage.addContainerMb(containerMemoryMb);
           diagnosticsStreamMessage.addContainerNumCores(containerNumCores);
-          
diagnosticsStreamMessage.addNumStoresWithChangelog(numStoresWithChangelog);
+          diagnosticsStreamMessage.addNumPersistentStores(numPersistentStores);
           diagnosticsStreamMessage.addContainerModels(containerModels);
           diagnosticsStreamMessage.addMaxHeapSize(maxHeapSizeBytes);
           
diagnosticsStreamMessage.addContainerThreadPoolSize(containerThreadPoolSize);
diff --git a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsStreamMessage.java b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsStreamMessage.java
index 81642d5..de12bde 100644
--- a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsStreamMessage.java
+++ b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsStreamMessage.java
@@ -55,7 +55,7 @@ public class DiagnosticsStreamMessage {
   private static final String STOP_EVENT_LIST_METRIC_NAME = "stopEvents";
   private static final String CONTAINER_MB_METRIC_NAME = "containerMemoryMb";
   private static final String CONTAINER_NUM_CORES_METRIC_NAME = 
"containerNumCores";
-  private static final String CONTAINER_NUM_STORES_WITH_CHANGELOG_METRIC_NAME 
= "numStoresWithChangelog";
+  private static final String CONTAINER_NUM_PERSISTENT_STORES_METRIC_NAME = 
"numPersistentStores";
   private static final String CONTAINER_MAX_CONFIGURED_HEAP_METRIC_NAME = 
"maxHeap";
   private static final String CONTAINER_THREAD_POOL_SIZE_METRIC_NAME = 
"containerThreadPoolSize";
   private static final String CONTAINER_MODELS_METRIC_NAME = "containerModels";
@@ -92,11 +92,11 @@ public class DiagnosticsStreamMessage {
 
   /**
    * Add the num stores with changelog parameter to the message.
-   * @param numStoresWithChangelog the parameter value.
+   * @param numPersistentStores the parameter value.
    */
-  public void addNumStoresWithChangelog(Integer numStoresWithChangelog) {
-    addToMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER, 
CONTAINER_NUM_STORES_WITH_CHANGELOG_METRIC_NAME,
-        numStoresWithChangelog);
+  public void addNumPersistentStores(Integer numPersistentStores) {
+    addToMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER, 
CONTAINER_NUM_PERSISTENT_STORES_METRIC_NAME,
+        numPersistentStores);
   }
 
   /**
@@ -198,9 +198,9 @@ public class DiagnosticsStreamMessage {
     return (Integer) getFromMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER, 
CONTAINER_NUM_CORES_METRIC_NAME);
   }
 
-  public Integer getNumStoresWithChangelog() {
+  public Integer getNumPersistentStores() {
     return (Integer) getFromMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER,
-        CONTAINER_NUM_STORES_WITH_CHANGELOG_METRIC_NAME);
+        CONTAINER_NUM_PERSISTENT_STORES_METRIC_NAME);
   }
 
   public Long getMaxHeapSize() {
@@ -234,7 +234,8 @@ public class DiagnosticsStreamMessage {
 
       diagnosticsStreamMessage.addContainerNumCores((Integer) 
diagnosticsManagerGroupMap.get(CONTAINER_NUM_CORES_METRIC_NAME));
       diagnosticsStreamMessage.addContainerMb((Integer) 
diagnosticsManagerGroupMap.get(CONTAINER_MB_METRIC_NAME));
-      diagnosticsStreamMessage.addNumStoresWithChangelog((Integer) 
diagnosticsManagerGroupMap.get(CONTAINER_NUM_STORES_WITH_CHANGELOG_METRIC_NAME));
+      diagnosticsStreamMessage.addNumPersistentStores((Integer) 
diagnosticsManagerGroupMap.get(
+          CONTAINER_NUM_PERSISTENT_STORES_METRIC_NAME));
       
diagnosticsStreamMessage.addContainerModels(deserializeContainerModelMap((String)
 diagnosticsManagerGroupMap.get(CONTAINER_MODELS_METRIC_NAME)));
       diagnosticsStreamMessage.addMaxHeapSize((Long) 
diagnosticsManagerGroupMap.get(CONTAINER_MAX_CONFIGURED_HEAP_METRIC_NAME));
       diagnosticsStreamMessage.addContainerThreadPoolSize((Integer) 
diagnosticsManagerGroupMap.get(CONTAINER_THREAD_POOL_SIZE_METRIC_NAME));
diff --git a/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsManager.java b/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsManager.java
index 33d16e3..d1acdd2 100644
--- a/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsManager.java
+++ b/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsManager.java
@@ -55,7 +55,7 @@ public class TestDiagnosticsManager {
   private int containerMb = 1024;
   private int containerThreadPoolSize = 2;
   private long maxHeapSize = 900;
-  private int numStoresWithChangelog = 2;
+  private int numPersistentStores = 2;
   private int containerNumCores = 2;
   private Map<String, ContainerModel> containerModels = 
TestDiagnosticsStreamMessage.getSampleContainerModels();
   private Collection<DiagnosticsExceptionEvent> exceptionEventList = 
TestDiagnosticsStreamMessage.getExceptionList();
@@ -75,7 +75,7 @@ public class TestDiagnosticsManager {
           });
 
     this.diagnosticsManager =
-        new DiagnosticsManager(jobName, jobId, containerModels, containerMb, 
containerNumCores, numStoresWithChangelog, maxHeapSize, containerThreadPoolSize,
+        new DiagnosticsManager(jobName, jobId, containerModels, containerMb, 
containerNumCores, numPersistentStores, maxHeapSize, containerThreadPoolSize,
             "0", executionEnvContainerId, taskClassVersion, samzaVersion, 
hostname, diagnosticsSystemStream,
             mockSystemProducer, Duration.ofSeconds(1), mockExecutorService);
 
@@ -136,7 +136,7 @@ public class TestDiagnosticsManager {
         Arrays.asList(new ProcessorStopEvent("0", executionEnvContainerId, 
hostname, 102)));
     Assert.assertNull(diagnosticsStreamMessage.getContainerModels());
     Assert.assertNull(diagnosticsStreamMessage.getContainerNumCores());
-    Assert.assertNull(diagnosticsStreamMessage.getNumStoresWithChangelog());
+    Assert.assertNull(diagnosticsStreamMessage.getNumPersistentStores());
   }
 
   @Test
@@ -169,7 +169,7 @@ public class TestDiagnosticsManager {
     Assert.assertNull(diagnosticsStreamMessage.getProcessorStopEvents());
     Assert.assertNull(diagnosticsStreamMessage.getContainerModels());
     Assert.assertNull(diagnosticsStreamMessage.getContainerNumCores());
-    Assert.assertNull(diagnosticsStreamMessage.getNumStoresWithChangelog());
+    Assert.assertNull(diagnosticsStreamMessage.getNumPersistentStores());
   }
 
   @After
@@ -210,7 +210,7 @@ public class TestDiagnosticsManager {
     Assert.assertEquals(diagnosticsStreamMessage.getProcessorStopEvents(), 
Arrays.asList(new ProcessorStopEvent("0", executionEnvContainerId, hostname, 
101)));
     Assert.assertEquals(containerModels, 
diagnosticsStreamMessage.getContainerModels());
     Assert.assertEquals(containerNumCores, 
diagnosticsStreamMessage.getContainerNumCores().intValue());
-    Assert.assertEquals(numStoresWithChangelog, 
diagnosticsStreamMessage.getNumStoresWithChangelog().intValue());
+    Assert.assertEquals(numPersistentStores, 
diagnosticsStreamMessage.getNumPersistentStores().intValue());
   }
 
   private class MockSystemProducer implements SystemProducer {
diff --git a/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsStreamMessage.java b/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsStreamMessage.java
index 81bc577..cd506b2 100644
--- a/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsStreamMessage.java
+++ b/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsStreamMessage.java
@@ -54,7 +54,7 @@ public class TestDiagnosticsStreamMessage {
 
     diagnosticsStreamMessage.addContainerMb(1024);
     diagnosticsStreamMessage.addContainerNumCores(2);
-    diagnosticsStreamMessage.addNumStoresWithChangelog(3);
+    diagnosticsStreamMessage.addNumPersistentStores(3);
 
     
diagnosticsStreamMessage.addProcessorStopEvents(getProcessorStopEventList());
     return diagnosticsStreamMessage;
@@ -106,7 +106,7 @@ public class TestDiagnosticsStreamMessage {
 
     Assert.assertEquals(1024, (int) diagnosticsStreamMessage.getContainerMb());
     Assert.assertEquals(2, (int) 
diagnosticsStreamMessage.getContainerNumCores());
-    Assert.assertEquals(3, (int) 
diagnosticsStreamMessage.getNumStoresWithChangelog());
+    Assert.assertEquals(3, (int) 
diagnosticsStreamMessage.getNumPersistentStores());
     Assert.assertEquals(exceptionEventList, 
diagnosticsStreamMessage.getExceptionEvents());
     Assert.assertEquals(getSampleContainerModels(), 
diagnosticsStreamMessage.getContainerModels());
     Assert.assertEquals(diagnosticsStreamMessage.getProcessorStopEvents(), 
getProcessorStopEventList());
@@ -135,7 +135,7 @@ public class TestDiagnosticsStreamMessage {
     Map<String, Map<String, Object>> metricsMap = 
metricsSnapshot.getMetrics().getAsMap();
     
Assert.assertTrue(metricsMap.get("org.apache.samza.container.SamzaContainerMetrics").containsKey("exceptions"));
     
Assert.assertTrue(metricsMap.get(DiagnosticsManager.class.getName()).containsKey("containerModels"));
-    
Assert.assertTrue(metricsMap.get(DiagnosticsManager.class.getName()).containsKey("numStoresWithChangelog"));
+    
Assert.assertTrue(metricsMap.get(DiagnosticsManager.class.getName()).containsKey("numPersistentStores"));
     
Assert.assertTrue(metricsMap.get(DiagnosticsManager.class.getName()).containsKey("containerNumCores"));
     
Assert.assertTrue(metricsMap.get(DiagnosticsManager.class.getName()).containsKey("containerMemoryMb"));
     
Assert.assertTrue(metricsMap.get(DiagnosticsManager.class.getName()).containsKey("stopEvents"));

Reply via email to