This is an automated email from the ASF dual-hosted git repository.
cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 3054a86 SAMZA-2706: Clean up specific handling of
diagnostics-specific metrics reporter (#1550)
3054a86 is described below
commit 3054a86ecca7fd1909fd230042c698507e4eba05
Author: Cameron Lee <[email protected]>
AuthorDate: Mon Nov 8 12:54:12 2021 -0800
SAMZA-2706: Clean up specific handling of diagnostics-specific metrics
reporter (#1550)
API changes:
1. In the YARN application master, the diagnosticsreporter metrics reporter is
now given a processorId of "ApplicationMaster" instead of
"samza-container-ApplicationMaster".
2. When using MetricsReporterLoader.getMetricsReporters, if
diagnosticsreporter is in metrics.reporters, then the diagnosticsreporter
metrics reporter is always created.
3. The diagnosticsreporter metrics reporter may still be created even if
job.diagnostics.enabled is set to false. Note that Samza will only
automatically create the stream for the diagnosticsreporter metrics reporter if
diagnostics is enabled (since the DiagnosticsManager also uses that same
stream), so if the diagnosticsreporter metrics reporter is needed while
diagnostics is disabled, the stream for the reporter must be created through
some other means.
---
.../clustermanager/ContainerProcessManager.java | 24 ++++++------------
.../apache/samza/processor/StreamProcessor.java | 16 +++---------
.../apache/samza/runtime/ContainerLaunchUtil.java | 14 +++--------
.../org/apache/samza/util/DiagnosticsUtil.java | 29 +++++-----------------
.../apache/samza/util/MetricsReporterLoader.java | 12 +--------
.../org/apache/samza/util/TestDiagnosticsUtil.java | 17 +++----------
6 files changed, 26 insertions(+), 86 deletions(-)
diff --git
a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
index 143e0b3..4ef9c68 100644
---
a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
+++
b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
@@ -44,13 +44,11 @@ import
org.apache.samza.metrics.ContainerProcessManagerMetrics;
import org.apache.samza.metrics.JvmMetrics;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.metrics.MetricsReporter;
-import org.apache.samza.metrics.reporter.MetricsSnapshotReporter;
import org.apache.samza.util.DiagnosticsUtil;
import org.apache.samza.util.MetricsReporterLoader;
import org.apache.samza.util.ReflectionUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.Option;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -105,7 +103,7 @@ public class ContainerProcessManager implements
ClusterResourceManager.Callback
// The ContainerManager manages control actions for both active & standby
containers
private final ContainerManager containerManager;
- private final Option<DiagnosticsManager> diagnosticsManager;
+ private final Optional<DiagnosticsManager> diagnosticsManager;
private final LocalityManager localityManager;
@@ -161,15 +159,9 @@ public class ContainerProcessManager implements
ClusterResourceManager.Callback
String jobName = new JobConfig(config).getName().get();
String jobId = new JobConfig(config).getJobId();
Optional<String> execEnvContainerId =
Optional.ofNullable(System.getenv(EXEC_ENV_CONTAINER_ID_SYS_PROPERTY));
- Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>>
diagnosticsManagerReporterPair =
- DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId,
state.jobModelManager.jobModel(), METRICS_SOURCE_NAME, execEnvContainerId,
config);
-
- if (diagnosticsManagerReporterPair.isPresent()) {
- diagnosticsManager =
Option.apply(diagnosticsManagerReporterPair.get().getKey());
-
metricsReporters.put(MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS,
diagnosticsManagerReporterPair.get().getValue());
- } else {
- diagnosticsManager = Option.empty();
- }
+ this.diagnosticsManager =
+ DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId,
state.jobModelManager.jobModel(), METRICS_SOURCE_NAME,
+ execEnvContainerId, config);
this.localityManager = localityManager;
// Wire all metrics to all reporters
@@ -201,7 +193,7 @@ public class ContainerProcessManager implements
ClusterResourceManager.Callback
this.clusterResourceManager = resourceManager;
this.containerManager = containerManager;
- this.diagnosticsManager = Option.empty();
+ this.diagnosticsManager = Optional.empty();
this.localityManager = localityManager;
this.containerAllocator = allocator.orElseGet(
() -> new ContainerAllocator(this.clusterResourceManager,
clusterManagerConfig, state,
@@ -240,7 +232,7 @@ public class ContainerProcessManager implements
ClusterResourceManager.Callback
metricsReporters.values().forEach(reporter -> reporter.start());
}
- if (diagnosticsManager.isDefined()) {
+ if (diagnosticsManager.isPresent()) {
diagnosticsManager.get().start();
}
@@ -295,7 +287,7 @@ public class ContainerProcessManager implements
ClusterResourceManager.Callback
Thread.currentThread().interrupt();
}
- if (diagnosticsManager.isDefined()) {
+ if (diagnosticsManager.isPresent()) {
try {
diagnosticsManager.get().stop();
} catch (InterruptedException e) {
@@ -398,7 +390,7 @@ public class ContainerProcessManager implements
ClusterResourceManager.Callback
onResourceCompletedWithUnknownStatus(resourceStatus, containerId,
processorId, exitStatus);
}
- if (diagnosticsManager.isDefined()) {
+ if (diagnosticsManager.isPresent()) {
diagnosticsManager.get().addProcessorStopEvent(processorId,
resourceStatus.getContainerId(), hostName, exitStatus);
}
}
diff --git
a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index ea5308f..92eca0b 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -31,12 +31,10 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.samza.annotation.InterfaceStability;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.JobCoordinatorConfig;
-import org.apache.samza.config.MetricsConfig;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.container.SamzaContainer;
import org.apache.samza.container.SamzaContainerListener;
@@ -55,7 +53,6 @@ import org.apache.samza.metadatastore.MetadataStore;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.metrics.MetricsReporter;
-import org.apache.samza.metrics.reporter.MetricsSnapshotReporter;
import org.apache.samza.runtime.ProcessorLifecycleListener;
import org.apache.samza.startpoint.StartpointManager;
import org.apache.samza.task.TaskFactory;
@@ -381,17 +378,11 @@ public class StreamProcessor {
@VisibleForTesting
SamzaContainer createSamzaContainer(String processorId, JobModel jobModel) {
-
- // Creating diagnostics manager and reporter, and wiring it respectively
+ // Creating diagnostics manager and wiring it respectively
String jobName = new JobConfig(config).getName().get();
String jobId = new JobConfig(config).getJobId();
- Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>>
diagnosticsManagerReporterPair =
+ Optional<DiagnosticsManager> diagnosticsManager =
DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, jobModel,
processorId, Optional.empty(), config);
- Option<DiagnosticsManager> diagnosticsManager = Option.empty();
- if (diagnosticsManagerReporterPair.isPresent()) {
- diagnosticsManager =
Option.apply(diagnosticsManagerReporterPair.get().getKey());
-
this.customMetricsReporter.put(MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS,
diagnosticsManagerReporterPair.get().getValue());
- }
// Metadata store lifecycle managed outside of the SamzaContainer.
// All manager lifecycles are managed in the SamzaContainer including
startpointManager
@@ -415,7 +406,8 @@ public class StreamProcessor {
metricsRegistryMap, this.taskFactory,
JobContextImpl.fromConfigWithDefaults(this.config, jobModel),
Option.apply(this.applicationDefinedContainerContextFactoryOptional.orElse(null)),
Option.apply(this.applicationDefinedTaskContextFactoryOptional.orElse(null)),
- Option.apply(this.externalContextOptional.orElse(null)), null,
startpointManager, diagnosticsManager);
+ Option.apply(this.externalContextOptional.orElse(null)), null,
startpointManager,
+ Option.apply(diagnosticsManager.orElse(null)));
}
private static JobCoordinator createJobCoordinator(Config config, String
processorId, MetricsRegistry metricsRegistry, MetadataStore metadataStore) {
diff --git
a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
index a114e7b..89ec196 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
@@ -22,13 +22,11 @@ package org.apache.samza.runtime;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.samza.SamzaException;
import org.apache.samza.application.descriptors.ApplicationDescriptor;
import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.MetricsConfig;
import org.apache.samza.config.ShellCommandConfig;
import org.apache.samza.container.ContainerHeartbeatMonitor;
import org.apache.samza.container.ExecutionContainerIdManager;
@@ -48,7 +46,6 @@ import org.apache.samza.logging.LoggingContextHolder;
import org.apache.samza.metadatastore.MetadataStore;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.metrics.MetricsReporter;
-import org.apache.samza.metrics.reporter.MetricsSnapshotReporter;
import org.apache.samza.startpoint.StartpointManager;
import org.apache.samza.task.TaskFactory;
import org.apache.samza.task.TaskFactoryUtil;
@@ -121,12 +118,9 @@ public class ContainerLaunchUtil {
Map<String, MetricsReporter> metricsReporters =
loadMetricsReporters(appDesc, containerId, config);
// Creating diagnostics manager and reporter, and wiring it respectively
- Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>>
diagnosticsManagerReporterPair =
DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, jobModel, containerId,
execEnvContainerIdOptional, config);
- Option<DiagnosticsManager> diagnosticsManager = Option.empty();
- if (diagnosticsManagerReporterPair.isPresent()) {
- diagnosticsManager =
Option.apply(diagnosticsManagerReporterPair.get().getKey());
-
metricsReporters.put(MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS,
diagnosticsManagerReporterPair.get().getValue());
- }
+ Optional<DiagnosticsManager> diagnosticsManager =
+ DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, jobModel,
containerId, execEnvContainerIdOptional,
+ config);
MetricsRegistryMap metricsRegistryMap = new MetricsRegistryMap();
SamzaContainer container = SamzaContainer$.MODULE$.apply(
@@ -140,7 +134,7 @@ public class ContainerLaunchUtil {
Option.apply(externalContextOptional.orElse(null)),
localityManager,
startpointManager,
- diagnosticsManager);
+ Option.apply(diagnosticsManager.orElse(null)));
ProcessorLifecycleListener processorLifecycleListener =
appDesc.getProcessorLifecycleListenerFactory()
.createInstance(new ProcessorContext() { }, config);
diff --git
a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
index b1e4206..f141d92 100644
--- a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
@@ -21,8 +21,6 @@ package org.apache.samza.util;
import java.io.File;
import java.time.Duration;
import java.util.Optional;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.samza.SamzaException;
import org.apache.samza.config.ClusterManagerConfig;
import org.apache.samza.config.Config;
@@ -36,11 +34,9 @@ import org.apache.samza.config.TaskConfig;
import org.apache.samza.diagnostics.DiagnosticsManager;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.metrics.MetricsRegistryMap;
-import org.apache.samza.metrics.MetricsReporterFactory;
import org.apache.samza.metrics.reporter.Metrics;
import org.apache.samza.metrics.reporter.MetricsHeader;
import org.apache.samza.metrics.reporter.MetricsSnapshot;
-import org.apache.samza.metrics.reporter.MetricsSnapshotReporter;
import org.apache.samza.runtime.LocalContainerRunner;
import org.apache.samza.serializers.JsonSerde;
import org.apache.samza.serializers.MetricsSnapshotSerdeV2;
@@ -95,27 +91,14 @@ public class DiagnosticsUtil {
* if diagnostics is enabled.
* execEnvContainerId is the ID assigned to the container by the cluster
manager (e.g., YARN).
*/
- public static Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>>
buildDiagnosticsManager(String jobName,
+ public static Optional<DiagnosticsManager> buildDiagnosticsManager(String
jobName,
String jobId, JobModel jobModel, String containerId, Optional<String>
execEnvContainerId, Config config) {
JobConfig jobConfig = new JobConfig(config);
MetricsConfig metricsConfig = new MetricsConfig(config);
- Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>>
diagnosticsManagerReporterPair = Optional.empty();
+ Optional<DiagnosticsManager> diagnosticsManagerOptional = Optional.empty();
if (jobConfig.getDiagnosticsEnabled()) {
-
- // Diagnostics MetricReporter init
- String diagnosticsReporterName =
MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS;
- String diagnosticsFactoryClassName =
metricsConfig.getMetricsFactoryClass(diagnosticsReporterName)
- .orElseThrow(() -> new SamzaException(
- String.format("Diagnostics reporter %s missing .class config",
diagnosticsReporterName)));
- MetricsReporterFactory metricsReporterFactory =
- ReflectionUtil.getObj(diagnosticsFactoryClassName,
MetricsReporterFactory.class);
- MetricsSnapshotReporter diagnosticsReporter =
- (MetricsSnapshotReporter)
metricsReporterFactory.getMetricsReporter(diagnosticsReporterName,
- "samza-container-" + containerId, config);
-
- // DiagnosticsManager init
ClusterManagerConfig clusterManagerConfig = new
ClusterManagerConfig(config);
int containerMemoryMb = clusterManagerConfig.getContainerMemoryMb();
int containerNumCores = clusterManagerConfig.getNumCores();
@@ -125,12 +108,12 @@ public class DiagnosticsUtil {
String samzaVersion = Util.getSamzaVersion();
String hostName = Util.getLocalHost().getHostName();
Optional<String> diagnosticsReporterStreamName =
-
metricsConfig.getMetricsSnapshotReporterStream(diagnosticsReporterName);
+
metricsConfig.getMetricsSnapshotReporterStream(MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS);
if (!diagnosticsReporterStreamName.isPresent()) {
throw new ConfigException(
"Missing required config: " +
String.format(MetricsConfig.METRICS_SNAPSHOT_REPORTER_STREAM,
- diagnosticsReporterName));
+ MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS));
}
SystemStream diagnosticsSystemStream =
StreamUtil.getSystemStreamFromNames(diagnosticsReporterStreamName.get());
@@ -153,10 +136,10 @@ public class DiagnosticsUtil {
diagnosticsSystemStream, systemProducer,
Duration.ofMillis(new TaskConfig(config).getShutdownMs()),
jobConfig.getAutosizingEnabled(), config);
- diagnosticsManagerReporterPair = Optional.of(new
ImmutablePair<>(diagnosticsManager, diagnosticsReporter));
+ diagnosticsManagerOptional = Optional.of(diagnosticsManager);
}
- return diagnosticsManagerReporterPair;
+ return diagnosticsManagerOptional;
}
public static void createDiagnosticsStream(Config config) {
diff --git
a/samza-core/src/main/java/org/apache/samza/util/MetricsReporterLoader.java
b/samza-core/src/main/java/org/apache/samza/util/MetricsReporterLoader.java
index 4e50efa..55baa13 100644
--- a/samza-core/src/main/java/org/apache/samza/util/MetricsReporterLoader.java
+++ b/samza-core/src/main/java/org/apache/samza/util/MetricsReporterLoader.java
@@ -21,7 +21,6 @@ package org.apache.samza.util;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
import org.apache.samza.SamzaException;
import org.apache.samza.config.MetricsConfig;
import org.apache.samza.metrics.MetricsReporter;
@@ -38,16 +37,7 @@ public class MetricsReporterLoader {
public static Map<String, MetricsReporter> getMetricsReporters(MetricsConfig
metricsConfig, String containerName) {
Map<String, MetricsReporter> metricsReporters = new HashMap<>();
-
- String diagnosticsReporterName =
MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS;
-
- // Exclude creation of diagnostics-reporter, because it is created
manually in SamzaContainer (to allow sharing of
- // sysProducer between reporter and diagnosticsManager
- List<String> metricsReporterNames = metricsConfig.getMetricReporterNames()
- .stream()
- .filter(reporterName -> !reporterName.equals(diagnosticsReporterName))
- .collect(Collectors.toList());
-
+ List<String> metricsReporterNames = metricsConfig.getMetricReporterNames();
for (String metricsReporterName : metricsReporterNames) {
String metricsFactoryClassName =
metricsConfig.getMetricsFactoryClass(metricsReporterName)
.orElseThrow(() -> new SamzaException(
diff --git
a/samza-core/src/test/java/org/apache/samza/util/TestDiagnosticsUtil.java
b/samza-core/src/test/java/org/apache/samza/util/TestDiagnosticsUtil.java
index d17dac1..e10c551 100644
--- a/samza-core/src/test/java/org/apache/samza/util/TestDiagnosticsUtil.java
+++ b/samza-core/src/test/java/org/apache/samza/util/TestDiagnosticsUtil.java
@@ -22,7 +22,6 @@ package org.apache.samza.util;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
@@ -31,8 +30,6 @@ import org.apache.samza.config.SystemConfig;
import org.apache.samza.diagnostics.DiagnosticsManager;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.metrics.MetricsRegistry;
-import org.apache.samza.metrics.MetricsReporterFactory;
-import org.apache.samza.metrics.reporter.MetricsSnapshotReporter;
import org.apache.samza.system.SystemFactory;
import org.apache.samza.system.SystemProducer;
import org.junit.Assert;
@@ -48,7 +45,6 @@ import static org.mockito.Mockito.*;
@RunWith(PowerMockRunner.class)
@PrepareForTest({ReflectionUtil.class})
public class TestDiagnosticsUtil {
-
private static final String STREAM_NAME = "someStreamName";
private static final String JOB_NAME = "someJob";
private static final String JOB_ID = "someId";
@@ -58,27 +54,20 @@ public class TestDiagnosticsUtil {
public static final String SYSTEM_FACTORY =
"com.foo.system.SomeSystemFactory";
@Test
- public void testBuildDiagnosticsManagerReturnsConfiguredReporter() {
+ public void testBuildDiagnosticsManager() {
Config config = new MapConfig(buildTestConfigs());
JobModel mockJobModel = mock(JobModel.class);
SystemFactory systemFactory = mock(SystemFactory.class);
SystemProducer mockProducer = mock(SystemProducer.class);
- MetricsReporterFactory metricsReporterFactory =
mock(MetricsReporterFactory.class);
- MetricsSnapshotReporter mockReporter = mock(MetricsSnapshotReporter.class);
-
when(systemFactory.getProducer(anyString(), any(Config.class),
any(MetricsRegistry.class), anyString())).thenReturn(mockProducer);
- when(metricsReporterFactory.getMetricsReporter(anyString(), anyString(),
any(Config.class))).thenReturn(
- mockReporter);
PowerMockito.mockStatic(ReflectionUtil.class);
- when(ReflectionUtil.getObj(REPORTER_FACTORY,
MetricsReporterFactory.class)).thenReturn(metricsReporterFactory);
when(ReflectionUtil.getObj(SYSTEM_FACTORY,
SystemFactory.class)).thenReturn(systemFactory);
- Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>>
managerReporterPair =
+ Optional<DiagnosticsManager> diagnosticsManager =
DiagnosticsUtil.buildDiagnosticsManager(JOB_NAME, JOB_ID,
mockJobModel, CONTAINER_ID, Optional.of(ENV_ID),
config);
- Assert.assertTrue(managerReporterPair.isPresent());
- Assert.assertEquals(mockReporter, managerReporterPair.get().getValue());
+ Assert.assertTrue(diagnosticsManager.isPresent());
}
private Map<String, String> buildTestConfigs() {