This is an automated email from the ASF dual-hosted git repository.
cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new f9c0e2b SAMZA-2704: Wire in diagnostics reporter for Kubernetes job
coordinator (#1553)
f9c0e2b is described below
commit f9c0e2bb165f3d3cb93820d95a4a3f27920519b0
Author: Cameron Lee <[email protected]>
AuthorDate: Wed Nov 17 12:45:12 2021 -0800
SAMZA-2704: Wire in diagnostics reporter for Kubernetes job coordinator
(#1553)
API changes: N/A
---
.../clustermanager/JobCoordinatorLaunchUtil.java | 8 +--
.../samza/coordinator/CoordinationConstants.java | 5 ++
.../StaticResourceJobCoordinator.java | 57 +++++++++++++++++++---
.../samza/diagnostics/DiagnosticsManager.java | 19 ++++----
.../TestStaticResourceJobCoordinator.java | 49 +++++++++++++++++--
5 files changed, 116 insertions(+), 22 deletions(-)
diff --git
a/samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
b/samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
index 053b913..837f2c6 100644
---
a/samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
+++
b/samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
@@ -33,6 +33,7 @@ import org.apache.samza.config.JobConfig;
import org.apache.samza.config.JobCoordinatorConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.MetricsConfig;
+import org.apache.samza.coordinator.CoordinationConstants;
import org.apache.samza.coordinator.JobCoordinator;
import org.apache.samza.coordinator.JobCoordinatorFactory;
import org.apache.samza.coordinator.NoProcessorJobCoordinatorListener;
@@ -55,7 +56,6 @@ import org.slf4j.LoggerFactory;
*/
public class JobCoordinatorLaunchUtil {
private static final Logger LOG =
LoggerFactory.getLogger(JobCoordinatorLaunchUtil.class);
- private static final String JOB_COORDINATOR_SOURCE_NAME = "JobCoordinator";
/**
* There is no processor associated with this job coordinator, so adding a
placeholder value.
*/
@@ -115,9 +115,11 @@ public class JobCoordinatorLaunchUtil {
jobCoordinatorFactory.getJobCoordinator(JOB_COORDINATOR_PROCESSOR_ID_PLACEHOLDER,
finalConfig, metrics,
metadataStore);
Map<String, MetricsReporter> metricsReporters =
- MetricsReporterLoader.getMetricsReporters(new
MetricsConfig(finalConfig), JOB_COORDINATOR_SOURCE_NAME);
+ MetricsReporterLoader.getMetricsReporters(new
MetricsConfig(finalConfig),
+ CoordinationConstants.JOB_COORDINATOR_CONTAINER_NAME);
metricsReporters.values()
- .forEach(metricsReporter ->
metricsReporter.register(JOB_COORDINATOR_SOURCE_NAME, metrics));
+ .forEach(
+ metricsReporter ->
metricsReporter.register(CoordinationConstants.JOB_COORDINATOR_CONTAINER_NAME,
metrics));
metricsReporters.values().forEach(MetricsReporter::start);
CountDownLatch waitForShutdownLatch = new CountDownLatch(1);
jobCoordinator.setListener(new
NoProcessorJobCoordinatorListener(waitForShutdownLatch));
diff --git
a/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationConstants.java
b/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationConstants.java
index 22268a8..64524df 100644
---
a/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationConstants.java
+++
b/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationConstants.java
@@ -36,4 +36,9 @@ public final class CoordinationConstants {
private static final String YARN_CONTAINER_EXECUTION_ID_PARAM_FORMAT =
YARN_EXECUTION_ENVIRONMENT_CONTAINER_ID + "=" + "%s";
public static final String YARN_CONTAINER_HEARTBEAT_ENDPOINT_FORMAT =
YARN_CONTAINER_HEARTBEAT_SERVLET_FORMAT + "?" +
YARN_CONTAINER_EXECUTION_ID_PARAM_FORMAT;
+
+ /**
+ * Container name to use for job coordinator in components like metrics and
diagnostics.
+ */
+ public static final String JOB_COORDINATOR_CONTAINER_NAME = "JobCoordinator";
}
diff --git
a/samza-core/src/main/java/org/apache/samza/coordinator/staticresource/StaticResourceJobCoordinator.java
b/samza-core/src/main/java/org/apache/samza/coordinator/staticresource/StaticResourceJobCoordinator.java
index 9f4810e..25038e9 100644
---
a/samza-core/src/main/java/org/apache/samza/coordinator/staticresource/StaticResourceJobCoordinator.java
+++
b/samza-core/src/main/java/org/apache/samza/coordinator/staticresource/StaticResourceJobCoordinator.java
@@ -25,6 +25,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.annotations.VisibleForTesting;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigException;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.coordinator.CoordinationConstants;
import org.apache.samza.coordinator.JobCoordinator;
import org.apache.samza.coordinator.JobCoordinatorListener;
import org.apache.samza.coordinator.JobModelHelper;
@@ -37,6 +40,7 @@ import org.apache.samza.coordinator.StreamRegexMonitorFactory;
import org.apache.samza.coordinator.communication.CoordinatorCommunication;
import org.apache.samza.coordinator.communication.JobInfoServingContext;
import org.apache.samza.coordinator.lifecycle.JobRestartSignal;
+import org.apache.samza.diagnostics.DiagnosticsManager;
import org.apache.samza.job.JobCoordinatorMetadata;
import org.apache.samza.job.JobMetadataChange;
import org.apache.samza.job.metadata.JobCoordinatorMetadataManager;
@@ -47,6 +51,7 @@ import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.startpoint.StartpointManager;
import org.apache.samza.storage.ChangelogStreamManager;
import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.util.DiagnosticsUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -75,21 +80,27 @@ public class StaticResourceJobCoordinator implements
JobCoordinator {
private final String processorId;
private final Config config;
- private Optional<JobCoordinatorListener> jobCoordinatorListener =
Optional.empty();
+ private volatile Optional<JobCoordinatorListener> jobCoordinatorListener =
Optional.empty();
+
+ /**
+ * {@link DiagnosticsManager} constructed using a {@link JobModel}, so this
can only be constructed within
+ * {@link #start} after the job model is built.
+ */
+ private volatile Optional<DiagnosticsManager> currentDiagnosticsManager =
Optional.empty();
/**
* Job model is calculated during {@link #start()}, so it is not immediately
available.
*/
- private Optional<JobModel> currentJobModel = Optional.empty();
+ private volatile Optional<JobModel> currentJobModel = Optional.empty();
/**
* {@link JobModelMonitors} depend on job model, so they are only available
after {@link #start()}.
*/
- private Optional<JobModelMonitors> currentJobModelMonitors =
Optional.empty();
+ private volatile Optional<JobModelMonitors> currentJobModelMonitors =
Optional.empty();
/**
* Keeps track of if the job coordinator has completed all preparation for
running the job, including
* publishing a new job model and starting the job model monitors.
*/
- private AtomicBoolean jobPreparationComplete = new AtomicBoolean(false);
+ private final AtomicBoolean jobPreparationComplete = new
AtomicBoolean(false);
StaticResourceJobCoordinator(String processorId, JobModelHelper
jobModelHelper,
JobInfoServingContext jobModelServingContext, CoordinatorCommunication
coordinatorCommunication,
@@ -123,6 +134,7 @@ public class StaticResourceJobCoordinator implements
JobCoordinator {
doSetLoggingContextConfig(jobModel.getConfig());
// monitors should be created right after job model is calculated (see
jobModelMonitors() for more details)
JobModelMonitors jobModelMonitors = jobModelMonitors(jobModel);
+ Optional<DiagnosticsManager> diagnosticsManager =
diagnosticsManager(jobModel);
JobCoordinatorMetadata newMetadata =
this.jobCoordinatorMetadataManager.generateJobCoordinatorMetadata(jobModel,
jobModel.getConfig());
Set<JobMetadataChange> jobMetadataChanges =
checkForMetadataChanges(newMetadata);
@@ -139,10 +151,14 @@ public class StaticResourceJobCoordinator implements
JobCoordinator {
this.jobRestartSignal.restartJob();
} else {
prepareWorkerExecution(jobModel, newMetadata, jobMetadataChanges);
- this.coordinatorCommunication.start();
+ // save components that depend on job model in order to manage
lifecycle or access later
+ this.currentDiagnosticsManager = diagnosticsManager;
+ this.currentJobModelMonitors = Optional.of(jobModelMonitors);
this.currentJobModel = Optional.of(jobModel);
+ // lifecycle: start components
+ this.coordinatorCommunication.start();
this.jobCoordinatorListener.ifPresent(listener ->
listener.onNewJobModel(this.processorId, jobModel));
- this.currentJobModelMonitors = Optional.of(jobModelMonitors);
+ this.currentDiagnosticsManager.ifPresent(DiagnosticsManager::start);
jobModelMonitors.start();
this.jobPreparationComplete.set(true);
}
@@ -157,6 +173,7 @@ public class StaticResourceJobCoordinator implements
JobCoordinator {
try {
this.jobCoordinatorListener.ifPresent(JobCoordinatorListener::onJobModelExpired);
if (this.jobPreparationComplete.get()) {
+
this.currentDiagnosticsManager.ifPresent(StaticResourceJobCoordinator::quietlyStop);
this.currentJobModelMonitors.ifPresent(JobModelMonitors::stop);
this.coordinatorCommunication.stop();
}
@@ -212,6 +229,14 @@ public class StaticResourceJobCoordinator implements
JobCoordinator {
LoggingContextHolder.INSTANCE.setConfig(config);
}
+ private Optional<DiagnosticsManager> diagnosticsManager(JobModel jobModel) {
+ JobConfig jobConfig = new JobConfig(this.config);
+ String jobName = jobConfig.getName().orElseThrow(() -> new
ConfigException("Missing job name"));
+ // TODO SAMZA-2705: construct execEnvContainerId for diagnostics
+ return buildDiagnosticsManager(jobName, jobConfig.getJobId(), jobModel,
+ CoordinationConstants.JOB_COORDINATOR_CONTAINER_NAME,
Optional.empty(), this.config);
+ }
+
/**
* Run set up steps so that workers can begin processing:
* 1. Persist job coordinator metadata
@@ -234,13 +259,33 @@ public class StaticResourceJobCoordinator implements
JobCoordinator {
}
}
+ /**
+ * Wrapper around {@link MetadataResourceUtil} constructor so it can be
stubbed during testing.
+ */
@VisibleForTesting
MetadataResourceUtil metadataResourceUtil(JobModel jobModel) {
return new MetadataResourceUtil(jobModel, this.metrics, this.config);
}
+ /**
+ * Wrapper around {@link DiagnosticsUtil#buildDiagnosticsManager} so it can
be stubbed during testing.
+ */
+ @VisibleForTesting
+ Optional<DiagnosticsManager> buildDiagnosticsManager(String jobName,
+ String jobId, JobModel jobModel, String containerId, Optional<String>
execEnvContainerId, Config config) {
+ return DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, jobModel,
containerId, execEnvContainerId, config);
+ }
+
private Set<JobMetadataChange>
checkForMetadataChanges(JobCoordinatorMetadata newMetadata) {
JobCoordinatorMetadata previousMetadata =
this.jobCoordinatorMetadataManager.readJobCoordinatorMetadata();
return
this.jobCoordinatorMetadataManager.checkForMetadataChanges(newMetadata,
previousMetadata);
}
+
+ private static void quietlyStop(DiagnosticsManager diagnosticsManager) {
+ try {
+ diagnosticsManager.stop();
+ } catch (InterruptedException e) {
+ LOG.error("Exception while stopping diagnostics manager", e);
+ }
+ }
}
diff --git
a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java
b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java
index 93ca566..4389fcc 100644
---
a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java
+++
b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java
@@ -175,16 +175,17 @@ public class DiagnosticsManager {
}
public void stop() throws InterruptedException {
- scheduler.shutdown();
-
- // Allow any scheduled publishes to finish, and block for termination
- scheduler.awaitTermination(terminationDuration.toMillis(),
TimeUnit.MILLISECONDS);
-
- if (!scheduler.isTerminated()) {
- LOG.warn("Unable to terminate scheduler");
- scheduler.shutdownNow();
+ try {
+ scheduler.shutdown();
+ // Allow any scheduled publishes to finish, and block for termination
+ scheduler.awaitTermination(terminationDuration.toMillis(),
TimeUnit.MILLISECONDS);
+ } finally {
+ if (!scheduler.isTerminated()) {
+ LOG.warn("Unable to terminate scheduler");
+ scheduler.shutdownNow();
+ }
+ this.systemProducer.stop();
}
- this.systemProducer.stop();
}
public void addExceptionEvent(DiagnosticsExceptionEvent
diagnosticsExceptionEvent) {
diff --git
a/samza-core/src/test/java/org/apache/samza/coordinator/staticresource/TestStaticResourceJobCoordinator.java
b/samza-core/src/test/java/org/apache/samza/coordinator/staticresource/TestStaticResourceJobCoordinator.java
index 03d5776..9386883 100644
---
a/samza-core/src/test/java/org/apache/samza/coordinator/staticresource/TestStaticResourceJobCoordinator.java
+++
b/samza-core/src/test/java/org/apache/samza/coordinator/staticresource/TestStaticResourceJobCoordinator.java
@@ -20,6 +20,7 @@ package org.apache.samza.coordinator.staticresource;
import java.io.IOException;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -28,7 +29,10 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.samza.Partition;
import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
import org.apache.samza.container.TaskName;
+import org.apache.samza.coordinator.CoordinationConstants;
import org.apache.samza.coordinator.JobCoordinatorListener;
import org.apache.samza.coordinator.JobModelHelper;
import org.apache.samza.coordinator.MetadataResourceUtil;
@@ -39,6 +43,7 @@ import org.apache.samza.coordinator.StreamRegexMonitorFactory;
import org.apache.samza.coordinator.communication.CoordinatorCommunication;
import org.apache.samza.coordinator.communication.JobInfoServingContext;
import org.apache.samza.coordinator.lifecycle.JobRestartSignal;
+import org.apache.samza.diagnostics.DiagnosticsManager;
import org.apache.samza.job.JobCoordinatorMetadata;
import org.apache.samza.job.JobMetadataChange;
import org.apache.samza.job.metadata.JobCoordinatorMetadataManager;
@@ -79,6 +84,8 @@ import static org.mockito.Mockito.when;
* actions (trigger callbacks which will change the job model, trigger
shutdown) to check the coordination flow.
*/
public class TestStaticResourceJobCoordinator {
+ private static final String JOB_NAME = "my-samza-job";
+ private static final String JOB_ID = "123";
private static final String PROCESSOR_ID = "samza-job-coordinator";
private static final SystemStream SYSTEM_STREAM = new SystemStream("system",
"stream");
private static final TaskName TASK_NAME = new TaskName("Partition " + 0);
@@ -114,16 +121,18 @@ public class TestStaticResourceJobCoordinator {
@Mock
private SystemAdmins systemAdmins;
@Mock
- private Config config;
- @Mock
private JobCoordinatorListener jobCoordinatorListener;
+ @Mock
+ private DiagnosticsManager diagnosticsManager;
+ private Config config;
private StaticResourceJobCoordinator staticResourceJobCoordinator;
@Before
public void setup() {
MockitoAnnotations.initMocks(this);
when(this.changelogStreamManager.readPartitionMapping()).thenReturn(this.changelogPartitionMapping);
+ this.config = config();
this.staticResourceJobCoordinator =
spy(new StaticResourceJobCoordinator(PROCESSOR_ID,
this.jobModelHelper, this.jobModelServingContext,
this.coordinatorCommunication, this.jobCoordinatorMetadataManager,
this.streamPartitionCountMonitorFactory,
@@ -141,11 +150,13 @@ public class TestStaticResourceJobCoordinator {
StreamRegexMonitor streamRegexMonitor = setupStreamRegexMonitor(jobModel,
jobModelConfig);
JobCoordinatorMetadata newMetadata = setupJobCoordinatorMetadata(jobModel,
jobModelConfig,
ImmutableSet.copyOf(Arrays.asList(JobMetadataChange.values())), false);
+ setUpDiagnosticsManager(jobModel);
MetadataResourceUtil metadataResourceUtil = metadataResourceUtil(jobModel);
this.staticResourceJobCoordinator.start();
assertEquals(jobModel, this.staticResourceJobCoordinator.getJobModel());
verifyStartLifecycle();
verify(this.staticResourceJobCoordinator).doSetLoggingContextConfig(jobModelConfig);
+ verify(this.diagnosticsManager).start();
verifyPrepareWorkerExecutionAndMonitor(jobModel, metadataResourceUtil,
streamPartitionCountMonitor,
streamRegexMonitor, newMetadata, SINGLE_SSP_FANOUT);
verify(this.jobCoordinatorListener).onNewJobModel(PROCESSOR_ID, jobModel);
@@ -158,11 +169,13 @@ public class TestStaticResourceJobCoordinator {
StreamPartitionCountMonitor streamPartitionCountMonitor =
setupStreamPartitionCountMonitor(jobModelConfig);
StreamRegexMonitor streamRegexMonitor = setupStreamRegexMonitor(jobModel,
jobModelConfig);
setupJobCoordinatorMetadata(jobModel, jobModelConfig, ImmutableSet.of(),
true);
+ setUpDiagnosticsManager(jobModel);
MetadataResourceUtil metadataResourceUtil = metadataResourceUtil(jobModel);
this.staticResourceJobCoordinator.start();
assertEquals(jobModel, this.staticResourceJobCoordinator.getJobModel());
verifyStartLifecycle();
verify(this.staticResourceJobCoordinator).doSetLoggingContextConfig(jobModelConfig);
+ verify(this.diagnosticsManager).start();
verifyPrepareWorkerExecutionAndMonitor(jobModel, metadataResourceUtil,
streamPartitionCountMonitor,
streamRegexMonitor, null, null);
verify(this.jobCoordinatorListener).onNewJobModel(PROCESSOR_ID, jobModel);
@@ -175,6 +188,7 @@ public class TestStaticResourceJobCoordinator {
StreamPartitionCountMonitor streamPartitionCountMonitor =
setupStreamPartitionCountMonitor(jobModelConfig);
StreamRegexMonitor streamRegexMonitor = setupStreamRegexMonitor(jobModel,
jobModelConfig);
setupJobCoordinatorMetadata(jobModel, jobModelConfig,
ImmutableSet.of(JobMetadataChange.JOB_MODEL), true);
+ setUpDiagnosticsManager(jobModel);
this.staticResourceJobCoordinator.start();
verifyStartLifecycle();
verify(this.jobRestartSignal).restartJob();
@@ -190,11 +204,13 @@ public class TestStaticResourceJobCoordinator {
StreamRegexMonitor streamRegexMonitor = setupStreamRegexMonitor(jobModel,
jobModelConfig);
JobCoordinatorMetadata newMetadata = setupJobCoordinatorMetadata(jobModel,
jobModelConfig,
ImmutableSet.of(JobMetadataChange.NEW_DEPLOYMENT,
JobMetadataChange.JOB_MODEL), true);
+ setUpDiagnosticsManager(jobModel);
MetadataResourceUtil metadataResourceUtil = metadataResourceUtil(jobModel);
this.staticResourceJobCoordinator.start();
assertEquals(jobModel, this.staticResourceJobCoordinator.getJobModel());
verifyStartLifecycle();
verify(this.staticResourceJobCoordinator).doSetLoggingContextConfig(jobModelConfig);
+ verify(this.diagnosticsManager).start();
verifyPrepareWorkerExecutionAndMonitor(jobModel, metadataResourceUtil,
streamPartitionCountMonitor,
streamRegexMonitor, newMetadata, SINGLE_SSP_FANOUT);
verify(this.jobCoordinatorListener).onNewJobModel(PROCESSOR_ID, jobModel);
@@ -216,6 +232,9 @@ public class TestStaticResourceJobCoordinator {
when(this.streamRegexMonitorFactory.build(any(), any(),
any())).thenReturn(Optional.empty());
JobCoordinatorMetadata newMetadata = setupJobCoordinatorMetadata(jobModel,
jobModelConfig,
ImmutableSet.copyOf(Arrays.asList(JobMetadataChange.values())), false);
+ doReturn(Optional.empty()).when(this.staticResourceJobCoordinator)
+ .buildDiagnosticsManager(JOB_NAME, JOB_ID, jobModel,
+ CoordinationConstants.JOB_COORDINATOR_CONTAINER_NAME,
Optional.empty(), this.config);
MetadataResourceUtil metadataResourceUtil = metadataResourceUtil(jobModel);
this.staticResourceJobCoordinator.start();
assertEquals(jobModel, this.staticResourceJobCoordinator.getJobModel());
@@ -227,13 +246,14 @@ public class TestStaticResourceJobCoordinator {
}
@Test
- public void testStopAfterStart() {
+ public void testStopAfterStart() throws InterruptedException {
Config jobModelConfig = mock(Config.class);
JobModel jobModel = setupJobModel(jobModelConfig);
StreamPartitionCountMonitor streamPartitionCountMonitor =
setupStreamPartitionCountMonitor(jobModelConfig);
StreamRegexMonitor streamRegexMonitor = setupStreamRegexMonitor(jobModel,
jobModelConfig);
setupJobCoordinatorMetadata(jobModel, jobModelConfig,
ImmutableSet.copyOf(Arrays.asList(JobMetadataChange.values())), false);
+ setUpDiagnosticsManager(jobModel);
metadataResourceUtil(jobModel);
// call start in order to set up monitors
this.staticResourceJobCoordinator.start();
@@ -241,6 +261,7 @@ public class TestStaticResourceJobCoordinator {
this.staticResourceJobCoordinator.stop();
verify(this.jobCoordinatorListener).onJobModelExpired();
+ verify(this.diagnosticsManager).stop();
verify(streamPartitionCountMonitor).stop();
verify(streamRegexMonitor).stop();
verify(this.coordinatorCommunication).stop();
@@ -263,6 +284,9 @@ public class TestStaticResourceJobCoordinator {
when(this.streamRegexMonitorFactory.build(any(), any(),
any())).thenReturn(Optional.empty());
setupJobCoordinatorMetadata(jobModel, jobModelConfig,
ImmutableSet.copyOf(Arrays.asList(JobMetadataChange.values())), false);
+ doReturn(Optional.empty()).when(this.staticResourceJobCoordinator)
+ .buildDiagnosticsManager(JOB_NAME, JOB_ID, jobModel,
+ CoordinationConstants.JOB_COORDINATOR_CONTAINER_NAME,
Optional.empty(), this.config);
metadataResourceUtil(jobModel);
// call start in order to set up monitors
this.staticResourceJobCoordinator.start();
@@ -299,10 +323,12 @@ public class TestStaticResourceJobCoordinator {
StreamRegexMonitor streamRegexMonitor = setupStreamRegexMonitor(jobModel,
jobModelConfig);
JobCoordinatorMetadata newMetadata = setupJobCoordinatorMetadata(jobModel,
jobModelConfig,
ImmutableSet.of(JobMetadataChange.NEW_DEPLOYMENT,
JobMetadataChange.JOB_MODEL), true);
+ setUpDiagnosticsManager(jobModel);
MetadataResourceUtil metadataResourceUtil = metadataResourceUtil(jobModel);
this.staticResourceJobCoordinator.start();
verifyStartLifecycle();
verify(this.staticResourceJobCoordinator).doSetLoggingContextConfig(jobModelConfig);
+ verify(this.diagnosticsManager).start();
verifyPrepareWorkerExecutionAndMonitor(jobModel, metadataResourceUtil,
streamPartitionCountMonitor,
streamRegexMonitor, newMetadata, SINGLE_SSP_FANOUT);
// call the callback from the monitor
@@ -322,10 +348,12 @@ public class TestStaticResourceJobCoordinator {
callbackArgumentCaptor.capture())).thenReturn(Optional.of(streamRegexMonitor));
JobCoordinatorMetadata newMetadata = setupJobCoordinatorMetadata(jobModel,
jobModelConfig,
ImmutableSet.of(JobMetadataChange.NEW_DEPLOYMENT,
JobMetadataChange.JOB_MODEL), true);
+ setUpDiagnosticsManager(jobModel);
MetadataResourceUtil metadataResourceUtil = metadataResourceUtil(jobModel);
this.staticResourceJobCoordinator.start();
verifyStartLifecycle();
verify(this.staticResourceJobCoordinator).doSetLoggingContextConfig(jobModelConfig);
+ verify(this.diagnosticsManager).start();
verifyPrepareWorkerExecutionAndMonitor(jobModel, metadataResourceUtil,
streamPartitionCountMonitor,
streamRegexMonitor, newMetadata, SINGLE_SSP_FANOUT);
// call the callback from the monitor
@@ -393,6 +421,12 @@ public class TestStaticResourceJobCoordinator {
return metadataResourceUtil;
}
+ private void setUpDiagnosticsManager(JobModel expectedJobModel) {
+
doReturn(Optional.of(this.diagnosticsManager)).when(this.staticResourceJobCoordinator)
+ .buildDiagnosticsManager(JOB_NAME, JOB_ID, expectedJobModel,
+ CoordinationConstants.JOB_COORDINATOR_CONTAINER_NAME,
Optional.empty(), this.config);
+ }
+
private void verifyStartLifecycle() {
verify(this.systemAdmins).start();
verify(this.startpointManager).start();
@@ -437,6 +471,13 @@ public class TestStaticResourceJobCoordinator {
verify(this.staticResourceJobCoordinator,
never()).metadataResourceUtil(any());
verify(this.startpointManager, never()).fanOut(any());
verifyZeroInteractions(this.jobModelServingContext,
this.coordinatorCommunication, streamPartitionCountMonitor,
- streamRegexMonitor, this.jobCoordinatorListener);
+ streamRegexMonitor, this.jobCoordinatorListener,
this.diagnosticsManager);
+ }
+
+ private static Config config() {
+ Map<String, String> configMap = new HashMap<>();
+ configMap.put(JobConfig.JOB_NAME, JOB_NAME);
+ configMap.put(JobConfig.JOB_ID, JOB_ID);
+ return new MapConfig(configMap);
}
}
\ No newline at end of file