This is an automated email from the ASF dual-hosted git repository.

cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 3054a86  SAMZA-2706: Clean up specific handling of 
diagnostics-specific metrics reporter (#1550)
3054a86 is described below

commit 3054a86ecca7fd1909fd230042c698507e4eba05
Author: Cameron Lee <[email protected]>
AuthorDate: Mon Nov 8 12:54:12 2021 -0800

    SAMZA-2706: Clean up specific handling of diagnostics-specific metrics 
reporter (#1550)
    
    API changes:
    1. In the YARN application master, the diagnosticsreporter metrics reporter 
is now given a processorId of "ApplicationMaster" instead of 
"samza-container-ApplicationMaster".
    2. When using MetricsReporterLoader.getMetricsReporters, if 
diagnosticsreporter is in metrics.reporters, then the diagnosticsreporter 
metrics reporter is always created.
    3. The diagnosticsreporter metrics reporter may still be created even if 
job.diagnostics.enabled is set to false. Note that Samza automatically creates 
the stream for the diagnosticsreporter metrics reporter only if diagnostics is 
enabled (since the DiagnosticsManager also uses that same stream), so if there 
is a case in which the diagnosticsreporter metrics reporter is needed while 
diagnostics is disabled, then the stream for the reporter needs to be created 
through some other means.
---
 .../clustermanager/ContainerProcessManager.java    | 24 ++++++------------
 .../apache/samza/processor/StreamProcessor.java    | 16 +++---------
 .../apache/samza/runtime/ContainerLaunchUtil.java  | 14 +++--------
 .../org/apache/samza/util/DiagnosticsUtil.java     | 29 +++++-----------------
 .../apache/samza/util/MetricsReporterLoader.java   | 12 +--------
 .../org/apache/samza/util/TestDiagnosticsUtil.java | 17 +++----------
 6 files changed, 26 insertions(+), 86 deletions(-)

diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
index 143e0b3..4ef9c68 100644
--- 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
@@ -44,13 +44,11 @@ import 
org.apache.samza.metrics.ContainerProcessManagerMetrics;
 import org.apache.samza.metrics.JvmMetrics;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.metrics.MetricsReporter;
-import org.apache.samza.metrics.reporter.MetricsSnapshotReporter;
 import org.apache.samza.util.DiagnosticsUtil;
 import org.apache.samza.util.MetricsReporterLoader;
 import org.apache.samza.util.ReflectionUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Option;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
@@ -105,7 +103,7 @@ public class ContainerProcessManager implements 
ClusterResourceManager.Callback
   // The ContainerManager manages control actions for both active & standby 
containers
   private final ContainerManager containerManager;
 
-  private final Option<DiagnosticsManager> diagnosticsManager;
+  private final Optional<DiagnosticsManager> diagnosticsManager;
 
   private final LocalityManager localityManager;
 
@@ -161,15 +159,9 @@ public class ContainerProcessManager implements 
ClusterResourceManager.Callback
     String jobName = new JobConfig(config).getName().get();
     String jobId = new JobConfig(config).getJobId();
     Optional<String> execEnvContainerId = 
Optional.ofNullable(System.getenv(EXEC_ENV_CONTAINER_ID_SYS_PROPERTY));
-    Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>> 
diagnosticsManagerReporterPair =
-        DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, 
state.jobModelManager.jobModel(), METRICS_SOURCE_NAME, execEnvContainerId, 
config);
-
-    if (diagnosticsManagerReporterPair.isPresent()) {
-      diagnosticsManager = 
Option.apply(diagnosticsManagerReporterPair.get().getKey());
-      
metricsReporters.put(MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS,
 diagnosticsManagerReporterPair.get().getValue());
-    } else {
-      diagnosticsManager = Option.empty();
-    }
+    this.diagnosticsManager =
+        DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, 
state.jobModelManager.jobModel(), METRICS_SOURCE_NAME,
+            execEnvContainerId, config);
 
     this.localityManager = localityManager;
     // Wire all metrics to all reporters
@@ -201,7 +193,7 @@ public class ContainerProcessManager implements 
ClusterResourceManager.Callback
 
     this.clusterResourceManager = resourceManager;
     this.containerManager = containerManager;
-    this.diagnosticsManager = Option.empty();
+    this.diagnosticsManager = Optional.empty();
     this.localityManager = localityManager;
     this.containerAllocator = allocator.orElseGet(
       () -> new ContainerAllocator(this.clusterResourceManager, 
clusterManagerConfig, state,
@@ -240,7 +232,7 @@ public class ContainerProcessManager implements 
ClusterResourceManager.Callback
       metricsReporters.values().forEach(reporter -> reporter.start());
     }
 
-    if (diagnosticsManager.isDefined()) {
+    if (diagnosticsManager.isPresent()) {
       diagnosticsManager.get().start();
     }
 
@@ -295,7 +287,7 @@ public class ContainerProcessManager implements 
ClusterResourceManager.Callback
       Thread.currentThread().interrupt();
     }
 
-    if (diagnosticsManager.isDefined()) {
+    if (diagnosticsManager.isPresent()) {
       try {
         diagnosticsManager.get().stop();
       } catch (InterruptedException e) {
@@ -398,7 +390,7 @@ public class ContainerProcessManager implements 
ClusterResourceManager.Callback
         onResourceCompletedWithUnknownStatus(resourceStatus, containerId, 
processorId, exitStatus);
     }
 
-    if (diagnosticsManager.isDefined()) {
+    if (diagnosticsManager.isPresent()) {
       diagnosticsManager.get().addProcessorStopEvent(processorId, 
resourceStatus.getContainerId(), hostName, exitStatus);
     }
   }
diff --git 
a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java 
b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index ea5308f..92eca0b 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -31,12 +31,10 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.samza.annotation.InterfaceStability;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.JobCoordinatorConfig;
-import org.apache.samza.config.MetricsConfig;
 import org.apache.samza.config.TaskConfig;
 import org.apache.samza.container.SamzaContainer;
 import org.apache.samza.container.SamzaContainerListener;
@@ -55,7 +53,6 @@ import org.apache.samza.metadatastore.MetadataStore;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.metrics.MetricsReporter;
-import org.apache.samza.metrics.reporter.MetricsSnapshotReporter;
 import org.apache.samza.runtime.ProcessorLifecycleListener;
 import org.apache.samza.startpoint.StartpointManager;
 import org.apache.samza.task.TaskFactory;
@@ -381,17 +378,11 @@ public class StreamProcessor {
 
   @VisibleForTesting
   SamzaContainer createSamzaContainer(String processorId, JobModel jobModel) {
-
-    // Creating diagnostics manager and reporter, and wiring it respectively
+    // Creating diagnostics manager and wiring it respectively
     String jobName = new JobConfig(config).getName().get();
     String jobId = new JobConfig(config).getJobId();
-    Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>> 
diagnosticsManagerReporterPair =
+    Optional<DiagnosticsManager> diagnosticsManager =
         DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, jobModel, 
processorId, Optional.empty(), config);
-    Option<DiagnosticsManager> diagnosticsManager = Option.empty();
-    if (diagnosticsManagerReporterPair.isPresent()) {
-      diagnosticsManager = 
Option.apply(diagnosticsManagerReporterPair.get().getKey());
-      
this.customMetricsReporter.put(MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS,
 diagnosticsManagerReporterPair.get().getValue());
-    }
 
     // Metadata store lifecycle managed outside of the SamzaContainer.
     // All manager lifecycles are managed in the SamzaContainer including 
startpointManager
@@ -415,7 +406,8 @@ public class StreamProcessor {
         metricsRegistryMap, this.taskFactory, 
JobContextImpl.fromConfigWithDefaults(this.config, jobModel),
         
Option.apply(this.applicationDefinedContainerContextFactoryOptional.orElse(null)),
         
Option.apply(this.applicationDefinedTaskContextFactoryOptional.orElse(null)),
-        Option.apply(this.externalContextOptional.orElse(null)), null, 
startpointManager, diagnosticsManager);
+        Option.apply(this.externalContextOptional.orElse(null)), null, 
startpointManager,
+        Option.apply(diagnosticsManager.orElse(null)));
   }
 
   private static JobCoordinator createJobCoordinator(Config config, String 
processorId, MetricsRegistry metricsRegistry, MetadataStore metadataStore) {
diff --git 
a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java 
b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
index a114e7b..89ec196 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
@@ -22,13 +22,11 @@ package org.apache.samza.runtime;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.samza.SamzaException;
 import org.apache.samza.application.descriptors.ApplicationDescriptor;
 import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.MetricsConfig;
 import org.apache.samza.config.ShellCommandConfig;
 import org.apache.samza.container.ContainerHeartbeatMonitor;
 import org.apache.samza.container.ExecutionContainerIdManager;
@@ -48,7 +46,6 @@ import org.apache.samza.logging.LoggingContextHolder;
 import org.apache.samza.metadatastore.MetadataStore;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.metrics.MetricsReporter;
-import org.apache.samza.metrics.reporter.MetricsSnapshotReporter;
 import org.apache.samza.startpoint.StartpointManager;
 import org.apache.samza.task.TaskFactory;
 import org.apache.samza.task.TaskFactoryUtil;
@@ -121,12 +118,9 @@ public class ContainerLaunchUtil {
       Map<String, MetricsReporter> metricsReporters = 
loadMetricsReporters(appDesc, containerId, config);
 
       // Creating diagnostics manager and reporter, and wiring it respectively
-      Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>> 
diagnosticsManagerReporterPair = 
DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, jobModel, containerId, 
execEnvContainerIdOptional, config);
-      Option<DiagnosticsManager> diagnosticsManager = Option.empty();
-      if (diagnosticsManagerReporterPair.isPresent()) {
-        diagnosticsManager = 
Option.apply(diagnosticsManagerReporterPair.get().getKey());
-        
metricsReporters.put(MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS,
 diagnosticsManagerReporterPair.get().getValue());
-      }
+      Optional<DiagnosticsManager> diagnosticsManager =
+          DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, jobModel, 
containerId, execEnvContainerIdOptional,
+              config);
       MetricsRegistryMap metricsRegistryMap = new MetricsRegistryMap();
 
       SamzaContainer container = SamzaContainer$.MODULE$.apply(
@@ -140,7 +134,7 @@ public class ContainerLaunchUtil {
           Option.apply(externalContextOptional.orElse(null)),
           localityManager,
           startpointManager,
-          diagnosticsManager);
+          Option.apply(diagnosticsManager.orElse(null)));
 
       ProcessorLifecycleListener processorLifecycleListener = 
appDesc.getProcessorLifecycleListenerFactory()
           .createInstance(new ProcessorContext() { }, config);
diff --git 
a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java 
b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
index b1e4206..f141d92 100644
--- a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
@@ -21,8 +21,6 @@ package org.apache.samza.util;
 import java.io.File;
 import java.time.Duration;
 import java.util.Optional;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.ClusterManagerConfig;
 import org.apache.samza.config.Config;
@@ -36,11 +34,9 @@ import org.apache.samza.config.TaskConfig;
 import org.apache.samza.diagnostics.DiagnosticsManager;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.metrics.MetricsRegistryMap;
-import org.apache.samza.metrics.MetricsReporterFactory;
 import org.apache.samza.metrics.reporter.Metrics;
 import org.apache.samza.metrics.reporter.MetricsHeader;
 import org.apache.samza.metrics.reporter.MetricsSnapshot;
-import org.apache.samza.metrics.reporter.MetricsSnapshotReporter;
 import org.apache.samza.runtime.LocalContainerRunner;
 import org.apache.samza.serializers.JsonSerde;
 import org.apache.samza.serializers.MetricsSnapshotSerdeV2;
@@ -95,27 +91,14 @@ public class DiagnosticsUtil {
    * if diagnostics is enabled.
    * execEnvContainerId is the ID assigned to the container by the cluster 
manager (e.g., YARN).
    */
-  public static Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>> 
buildDiagnosticsManager(String jobName,
+  public static Optional<DiagnosticsManager> buildDiagnosticsManager(String 
jobName,
       String jobId, JobModel jobModel, String containerId, Optional<String> 
execEnvContainerId, Config config) {
 
     JobConfig jobConfig = new JobConfig(config);
     MetricsConfig metricsConfig = new MetricsConfig(config);
-    Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>> 
diagnosticsManagerReporterPair = Optional.empty();
+    Optional<DiagnosticsManager> diagnosticsManagerOptional = Optional.empty();
 
     if (jobConfig.getDiagnosticsEnabled()) {
-
-      // Diagnostics MetricReporter init
-      String diagnosticsReporterName = 
MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS;
-      String diagnosticsFactoryClassName = 
metricsConfig.getMetricsFactoryClass(diagnosticsReporterName)
-          .orElseThrow(() -> new SamzaException(
-              String.format("Diagnostics reporter %s missing .class config", 
diagnosticsReporterName)));
-      MetricsReporterFactory metricsReporterFactory =
-          ReflectionUtil.getObj(diagnosticsFactoryClassName, 
MetricsReporterFactory.class);
-      MetricsSnapshotReporter diagnosticsReporter =
-          (MetricsSnapshotReporter) 
metricsReporterFactory.getMetricsReporter(diagnosticsReporterName,
-              "samza-container-" + containerId, config);
-
-      // DiagnosticsManager init
       ClusterManagerConfig clusterManagerConfig = new 
ClusterManagerConfig(config);
       int containerMemoryMb = clusterManagerConfig.getContainerMemoryMb();
       int containerNumCores = clusterManagerConfig.getNumCores();
@@ -125,12 +108,12 @@ public class DiagnosticsUtil {
       String samzaVersion = Util.getSamzaVersion();
       String hostName = Util.getLocalHost().getHostName();
       Optional<String> diagnosticsReporterStreamName =
-          
metricsConfig.getMetricsSnapshotReporterStream(diagnosticsReporterName);
+          
metricsConfig.getMetricsSnapshotReporterStream(MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS);
 
       if (!diagnosticsReporterStreamName.isPresent()) {
         throw new ConfigException(
             "Missing required config: " + 
String.format(MetricsConfig.METRICS_SNAPSHOT_REPORTER_STREAM,
-                diagnosticsReporterName));
+                MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS));
       }
       SystemStream diagnosticsSystemStream = 
StreamUtil.getSystemStreamFromNames(diagnosticsReporterStreamName.get());
 
@@ -153,10 +136,10 @@ public class DiagnosticsUtil {
               diagnosticsSystemStream, systemProducer,
               Duration.ofMillis(new TaskConfig(config).getShutdownMs()), 
jobConfig.getAutosizingEnabled(), config);
 
-      diagnosticsManagerReporterPair = Optional.of(new 
ImmutablePair<>(diagnosticsManager, diagnosticsReporter));
+      diagnosticsManagerOptional = Optional.of(diagnosticsManager);
     }
 
-    return diagnosticsManagerReporterPair;
+    return diagnosticsManagerOptional;
   }
 
   public static void createDiagnosticsStream(Config config) {
diff --git 
a/samza-core/src/main/java/org/apache/samza/util/MetricsReporterLoader.java 
b/samza-core/src/main/java/org/apache/samza/util/MetricsReporterLoader.java
index 4e50efa..55baa13 100644
--- a/samza-core/src/main/java/org/apache/samza/util/MetricsReporterLoader.java
+++ b/samza-core/src/main/java/org/apache/samza/util/MetricsReporterLoader.java
@@ -21,7 +21,6 @@ package org.apache.samza.util;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.MetricsConfig;
 import org.apache.samza.metrics.MetricsReporter;
@@ -38,16 +37,7 @@ public class MetricsReporterLoader {
 
   public static Map<String, MetricsReporter> getMetricsReporters(MetricsConfig 
metricsConfig, String containerName) {
     Map<String, MetricsReporter> metricsReporters = new HashMap<>();
-
-    String diagnosticsReporterName = 
MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS;
-
-    // Exclude creation of diagnostics-reporter, because it is created 
manually in SamzaContainer (to allow sharing of
-    // sysProducer between reporter and diagnosticsManager
-    List<String> metricsReporterNames = metricsConfig.getMetricReporterNames()
-        .stream()
-        .filter(reporterName -> !reporterName.equals(diagnosticsReporterName))
-        .collect(Collectors.toList());
-
+    List<String> metricsReporterNames = metricsConfig.getMetricReporterNames();
     for (String metricsReporterName : metricsReporterNames) {
       String metricsFactoryClassName = 
metricsConfig.getMetricsFactoryClass(metricsReporterName)
           .orElseThrow(() -> new SamzaException(
diff --git 
a/samza-core/src/test/java/org/apache/samza/util/TestDiagnosticsUtil.java 
b/samza-core/src/test/java/org/apache/samza/util/TestDiagnosticsUtil.java
index d17dac1..e10c551 100644
--- a/samza-core/src/test/java/org/apache/samza/util/TestDiagnosticsUtil.java
+++ b/samza-core/src/test/java/org/apache/samza/util/TestDiagnosticsUtil.java
@@ -22,7 +22,6 @@ package org.apache.samza.util;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
@@ -31,8 +30,6 @@ import org.apache.samza.config.SystemConfig;
 import org.apache.samza.diagnostics.DiagnosticsManager;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.metrics.MetricsRegistry;
-import org.apache.samza.metrics.MetricsReporterFactory;
-import org.apache.samza.metrics.reporter.MetricsSnapshotReporter;
 import org.apache.samza.system.SystemFactory;
 import org.apache.samza.system.SystemProducer;
 import org.junit.Assert;
@@ -48,7 +45,6 @@ import static org.mockito.Mockito.*;
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({ReflectionUtil.class})
 public class TestDiagnosticsUtil {
-
   private static final String STREAM_NAME = "someStreamName";
   private static final String JOB_NAME = "someJob";
   private static final String JOB_ID = "someId";
@@ -58,27 +54,20 @@ public class TestDiagnosticsUtil {
   public static final String SYSTEM_FACTORY = 
"com.foo.system.SomeSystemFactory";
 
   @Test
-  public void testBuildDiagnosticsManagerReturnsConfiguredReporter() {
+  public void testBuildDiagnosticsManager() {
     Config config = new MapConfig(buildTestConfigs());
     JobModel mockJobModel = mock(JobModel.class);
     SystemFactory systemFactory = mock(SystemFactory.class);
     SystemProducer mockProducer = mock(SystemProducer.class);
-    MetricsReporterFactory metricsReporterFactory = 
mock(MetricsReporterFactory.class);
-    MetricsSnapshotReporter mockReporter = mock(MetricsSnapshotReporter.class);
-
     when(systemFactory.getProducer(anyString(), any(Config.class), 
any(MetricsRegistry.class), anyString())).thenReturn(mockProducer);
-    when(metricsReporterFactory.getMetricsReporter(anyString(), anyString(), 
any(Config.class))).thenReturn(
-        mockReporter);
     PowerMockito.mockStatic(ReflectionUtil.class);
-    when(ReflectionUtil.getObj(REPORTER_FACTORY, 
MetricsReporterFactory.class)).thenReturn(metricsReporterFactory);
     when(ReflectionUtil.getObj(SYSTEM_FACTORY, 
SystemFactory.class)).thenReturn(systemFactory);
 
-    Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>> 
managerReporterPair =
+    Optional<DiagnosticsManager> diagnosticsManager =
         DiagnosticsUtil.buildDiagnosticsManager(JOB_NAME, JOB_ID, 
mockJobModel, CONTAINER_ID, Optional.of(ENV_ID),
             config);
 
-    Assert.assertTrue(managerReporterPair.isPresent());
-    Assert.assertEquals(mockReporter, managerReporterPair.get().getValue());
+    Assert.assertTrue(diagnosticsManager.isPresent());
   }
 
   private Map<String, String> buildTestConfigs() {

Reply via email to