This is an automated email from the ASF dual-hosted git repository.
cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 27b1495 SAMZA-2672: Extract creation of stream monitors into separate
classes for reuse (#1520)
27b1495 is described below
commit 27b1495589e992cf1606ebfc2e34699077d60b73
Author: Cameron Lee <[email protected]>
AuthorDate: Mon Aug 30 14:26:40 2021 -0700
SAMZA-2672: Extract creation of stream monitors into separate classes for
reuse (#1520)
Changes:
1. Extract construction logic into factory classes for
StreamPartitionCountMonitor and StreamRegexMonitor.
2. Update job coordinator classes to use the factories.
---
.../clustermanager/ClusterBasedJobCoordinator.java | 99 +++++++---------------
.../coordinator/StreamPartitionCountMonitor.java | 4 +
.../StreamPartitionCountMonitorFactory.java | 55 ++++++++++++
.../coordinator/StreamRegexMonitorFactory.java | 91 ++++++++++++++++++++
.../java/org/apache/samza/zk/ZkJobCoordinator.java | 11 +--
.../org/apache/samza/zk/TestZkJobCoordinator.java | 2 +-
6 files changed, 182 insertions(+), 80 deletions(-)
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
index b62678b..286a040 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
@@ -18,13 +18,11 @@
*/
package org.apache.samza.clustermanager;
-import com.google.common.annotations.VisibleForTesting;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
-import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.regex.Pattern;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.samza.SamzaException;
import
org.apache.samza.clustermanager.container.placement.ContainerPlacementMetadataStore;
import
org.apache.samza.clustermanager.container.placement.ContainerPlacementRequestAllocator;
@@ -33,7 +31,6 @@ import org.apache.samza.config.ClusterManagerConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.StorageConfig;
-import org.apache.samza.config.TaskConfig;
import org.apache.samza.container.ExecutionContainerIdManager;
import org.apache.samza.container.LocalityManager;
import org.apache.samza.container.TaskName;
@@ -43,7 +40,9 @@ import org.apache.samza.coordinator.JobModelManager;
import org.apache.samza.coordinator.MetadataResourceUtil;
import org.apache.samza.coordinator.PartitionChangeException;
import org.apache.samza.coordinator.StreamPartitionCountMonitor;
+import org.apache.samza.coordinator.StreamPartitionCountMonitorFactory;
import org.apache.samza.coordinator.StreamRegexMonitor;
+import org.apache.samza.coordinator.StreamRegexMonitorFactory;
import
org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
@@ -63,7 +62,6 @@ import org.apache.samza.storage.StateBackendAdmin;
import org.apache.samza.storage.StateBackendFactory;
import org.apache.samza.system.StreamMetadataCache;
import org.apache.samza.system.SystemAdmins;
-import org.apache.samza.system.SystemStream;
import org.apache.samza.util.DiagnosticsUtil;
import org.apache.samza.util.ReflectionUtil;
import org.apache.samza.util.SystemClock;
@@ -146,7 +144,7 @@ public class ClusterBasedJobCoordinator {
/**
* The input topic partition count monitor
*/
- private final Optional<StreamPartitionCountMonitor> partitionMonitor;
+ private final StreamPartitionCountMonitor partitionMonitor;
/**
* The input stream regex monitor
@@ -207,10 +205,9 @@ public class ClusterBasedJobCoordinator {
this.state = new SamzaApplicationState(jobModelManager);
// The systemAdmins should be started before partitionMonitor can be used. And it should be stopped when this coordinator is stopped.
this.systemAdmins = new SystemAdmins(config,
this.getClass().getSimpleName());
+ // SAMZA-2684: should these monitors be using the config from the job model calculation?
this.partitionMonitor = getPartitionCountMonitor(config, systemAdmins);
-
- Set<SystemStream> inputSystemStreams =
JobModelUtil.getSystemStreams(jobModelManager.jobModel());
- this.inputStreamRegexMonitor = getInputRegexMonitor(config, systemAdmins,
inputSystemStreams);
+ this.inputStreamRegexMonitor =
getInputRegexMonitor(jobModelManager.jobModel(), config, systemAdmins);
ClusterManagerConfig clusterManagerConfig = new
ClusterManagerConfig(config);
this.isJmxEnabled = clusterManagerConfig.getJmxEnabledOnJobCoordinator();
@@ -312,7 +309,7 @@ public class ClusterBasedJobCoordinator {
containerProcessManager.start();
systemAdmins.start();
- partitionMonitor.ifPresent(StreamPartitionCountMonitor::start);
+ partitionMonitor.start();
inputStreamRegexMonitor.ifPresent(StreamRegexMonitor::start);
// containerPlacementRequestAllocator thread has to start after the cpm is started
@@ -380,7 +377,7 @@ public class ClusterBasedJobCoordinator {
*/
private void onShutDown() {
try {
- partitionMonitor.ifPresent(StreamPartitionCountMonitor::stop);
+ partitionMonitor.stop();
inputStreamRegexMonitor.ifPresent(StreamRegexMonitor::stop);
systemAdmins.stop();
shutDowncontainerPlacementRequestAllocatorAndUtils();
@@ -415,70 +412,32 @@ public class ClusterBasedJobCoordinator {
}
}
- private Optional<StreamPartitionCountMonitor>
getPartitionCountMonitor(Config config, SystemAdmins systemAdmins) {
- StreamMetadataCache streamMetadata = new StreamMetadataCache(systemAdmins,
0, SystemClock.instance());
- Set<SystemStream> inputStreamsToMonitor = new
TaskConfig(config).getAllInputStreams();
- if (inputStreamsToMonitor.isEmpty()) {
- throw new SamzaException("Input streams to a job can not be empty.");
- }
-
- return Optional.of(new StreamPartitionCountMonitor(inputStreamsToMonitor,
streamMetadata, metrics,
- new JobConfig(config).getMonitorPartitionChangeFrequency(),
streamsChanged -> {
+ private StreamPartitionCountMonitor getPartitionCountMonitor(Config config,
SystemAdmins systemAdmins) {
+ StreamMetadataCache streamMetadataCache = new
StreamMetadataCache(systemAdmins, 0, SystemClock.instance());
+ return new StreamPartitionCountMonitorFactory(streamMetadataCache,
this.metrics).build(config, streamsChanged -> {
// Fail the jobs with durable state store. Otherwise, application state.status remains UNDEFINED s.t. YARN job will be restarted
if (hasDurableStores) {
- LOG.error("Input topic partition count changed in a job with durable
state. Failing the job. " +
- "Changed topics: {}", streamsChanged.toString());
+ LOG.error(
+ "Input topic partition count changed in a job with durable state.
Failing the job. Changed topics: {}",
+ streamsChanged.toString());
state.status = SamzaApplicationState.SamzaAppStatus.FAILED;
}
- coordinatorException = new PartitionChangeException("Input topic
partition count changes detected for topics: " + streamsChanged.toString());
- }));
+ coordinatorException = new PartitionChangeException(
+ "Input topic partition count changes detected for topics: " +
streamsChanged.toString());
+ });
}
- private Optional<StreamRegexMonitor> getInputRegexMonitor(Config config,
SystemAdmins systemAdmins, Set<SystemStream> inputStreamsToMonitor) {
- JobConfig jobConfig = new JobConfig(config);
-
- // if input regex monitor is not enabled return empty
- if (jobConfig.getMonitorRegexDisabled()) {
- LOG.info("StreamRegexMonitor is disabled.");
- return Optional.empty();
- }
-
- StreamMetadataCache streamMetadata = new StreamMetadataCache(systemAdmins,
0, SystemClock.instance());
- if (inputStreamsToMonitor.isEmpty()) {
- throw new SamzaException("Input streams to a job can not be empty.");
- }
-
- // First list all rewriters
- Optional<String> rewritersList = jobConfig.getConfigRewriters();
-
- // if no rewriter is defined, there is nothing to monitor
- if (!rewritersList.isPresent()) {
- LOG.warn("No config rewriters are defined. No StreamRegexMonitor
created.");
- return Optional.empty();
- }
-
- // Compile a map of each input-system to its corresponding
input-monitor-regex patterns
- Map<String, Pattern> inputRegexesToMonitor =
jobConfig.getMonitorRegexPatternMap(rewritersList.get());
-
- // if there are no regexes to monitor
- if (inputRegexesToMonitor.isEmpty()) {
- LOG.info("No input regexes are defined. No StreamRegexMonitor created.");
- return Optional.empty();
- }
-
- return Optional.of(new StreamRegexMonitor(inputStreamsToMonitor,
inputRegexesToMonitor, streamMetadata, metrics,
- jobConfig.getMonitorRegexFrequency(), new
StreamRegexMonitor.Callback() {
- @Override
- public void onInputStreamsChanged(Set<SystemStream> initialInputSet,
Set<SystemStream> newInputStreams,
- Map<String, Pattern> regexesMonitored) {
- if (hasDurableStores) {
- LOG.error("New input system-streams discovered. Failing the job.
New input streams: {}" +
- " Existing input streams: {}", newInputStreams,
inputStreamsToMonitor);
- state.status = SamzaApplicationState.SamzaAppStatus.FAILED;
- }
- coordinatorException = new InputStreamsDiscoveredException("New
input streams discovered: " + newInputStreams);
- }
- }));
+ private Optional<StreamRegexMonitor> getInputRegexMonitor(JobModel jobModel,
Config config, SystemAdmins systemAdmins) {
+ StreamMetadataCache streamMetadataCache = new
StreamMetadataCache(systemAdmins, 0, SystemClock.instance());
+ return new StreamRegexMonitorFactory(streamMetadataCache,
this.metrics).build(jobModel, config,
+ (initialInputSet, newInputStreams, regexesMonitored) -> {
+ if (hasDurableStores) {
+ LOG.error("New input system-streams discovered. Failing the job. New
input streams: {}"
+ + " Existing input streams: {}", newInputStreams,
initialInputSet);
+ state.status = SamzaApplicationState.SamzaAppStatus.FAILED;
+ }
+ coordinatorException = new InputStreamsDiscoveredException("New input
streams discovered: " + newInputStreams);
+ });
}
// The following two methods are package-private and for testing only
@@ -490,7 +449,7 @@ public class ClusterBasedJobCoordinator {
@VisibleForTesting
StreamPartitionCountMonitor getPartitionMonitor() {
- return partitionMonitor.get();
+ return partitionMonitor;
}
@VisibleForTesting
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java b/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java
index ee4490f..10efafc 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java
@@ -52,6 +52,10 @@ public class StreamPartitionCountMonitor {
private final StreamMetadataCache metadataCache;
private final int monitorPeriodMs;
private final Map<SystemStream, Gauge<Integer>> gauges;
+ /**
+ * TODO get the initial metadata from the job model instead of querying it, so that there isn't a chance that the
+ * initial metadata has a mismatch with the job model
+ */
private final Map<SystemStream, SystemStreamMetadata> initialMetadata;
private final Callback callbackMethod;
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitorFactory.java b/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitorFactory.java
new file mode 100644
index 0000000..4801b95
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitorFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import java.util.Set;
+import com.google.common.base.Preconditions;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemStream;
+
+
+/**
+ * Common place for reading configs and wiring {@link StreamPartitionCountMonitor}.
+ */
+public class StreamPartitionCountMonitorFactory {
+ private final StreamMetadataCache streamMetadataCache;
+ private final MetricsRegistry metrics;
+
+ public StreamPartitionCountMonitorFactory(StreamMetadataCache
streamMetadataCache, MetricsRegistry metrics) {
+ this.streamMetadataCache = Preconditions.checkNotNull(streamMetadataCache);
+ this.metrics = Preconditions.checkNotNull(metrics);
+ }
+
+ /**
+ * Build {@link StreamPartitionCountMonitor} for input streams.
+ */
+ public StreamPartitionCountMonitor build(Config config,
StreamPartitionCountMonitor.Callback callback) {
+ Set<SystemStream> inputStreamsToMonitor = new
TaskConfig(config).getAllInputStreams();
+ if (inputStreamsToMonitor.isEmpty()) {
+ throw new SamzaException("Input streams to a job can not be empty.");
+ }
+ return new StreamPartitionCountMonitor(inputStreamsToMonitor,
this.streamMetadataCache, this.metrics,
+ new JobConfig(config).getMonitorPartitionChangeFrequency(), callback);
+ }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/StreamRegexMonitorFactory.java b/samza-core/src/main/java/org/apache/samza/coordinator/StreamRegexMonitorFactory.java
new file mode 100644
index 0000000..83e13f1
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/StreamRegexMonitorFactory.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.regex.Pattern;
+import com.google.common.base.Preconditions;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.JobModelUtil;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Common place for reading configs and wiring {@link StreamRegexMonitor}.
+ */
+public class StreamRegexMonitorFactory {
+ private static final Logger LOG =
LoggerFactory.getLogger(StreamRegexMonitorFactory.class);
+
+ private final StreamMetadataCache streamMetadataCache;
+ private final MetricsRegistry metrics;
+
+ public StreamRegexMonitorFactory(StreamMetadataCache streamMetadataCache,
MetricsRegistry metrics) {
+ this.streamMetadataCache = Preconditions.checkNotNull(streamMetadataCache);
+ this.metrics = Preconditions.checkNotNull(metrics);
+ }
+
+ /**
+ * Build a {@link StreamRegexMonitor} for input streams for the job model.
+ */
+ public Optional<StreamRegexMonitor> build(JobModel jobModel, Config config,
StreamRegexMonitor.Callback callback) {
+ JobConfig jobConfig = new JobConfig(config);
+
+ // if input regex monitor is not enabled return empty
+ if (jobConfig.getMonitorRegexDisabled()) {
+ LOG.info("StreamRegexMonitor is disabled.");
+ return Optional.empty();
+ }
+
+ Set<SystemStream> inputStreamsToMonitor =
JobModelUtil.getSystemStreams(jobModel);
+ if (inputStreamsToMonitor.isEmpty()) {
+ throw new SamzaException("Input streams to a job can not be empty.");
+ }
+
+ // First list all rewriters
+ Optional<String> rewritersList = jobConfig.getConfigRewriters();
+
+ // if no rewriter is defined, there is nothing to monitor
+ if (!rewritersList.isPresent()) {
+ LOG.warn("No config rewriters are defined. No StreamRegexMonitor
created.");
+ return Optional.empty();
+ }
+
+ // Compile a map of each input-system to its corresponding input-monitor-regex patterns
+ Map<String, Pattern> inputRegexesToMonitor =
jobConfig.getMonitorRegexPatternMap(rewritersList.get());
+
+ // if there are no regexes to monitor
+ if (inputRegexesToMonitor.isEmpty()) {
+ LOG.info("No input regexes are defined. No StreamRegexMonitor created.");
+ return Optional.empty();
+ }
+
+ return Optional.of(
+ new StreamRegexMonitor(inputStreamsToMonitor, inputRegexesToMonitor,
this.streamMetadataCache, this.metrics,
+ jobConfig.getMonitorRegexFrequency(), callback));
+ }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index 17a8d5c..f41ee8c 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -35,7 +35,6 @@ import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.StorageConfig;
-import org.apache.samza.config.TaskConfig;
import org.apache.samza.config.ZkConfig;
import org.apache.samza.container.TaskName;
import org.apache.samza.container.grouper.task.GrouperMetadata;
@@ -46,6 +45,7 @@ import org.apache.samza.coordinator.JobModelCalculator;
import org.apache.samza.coordinator.LeaderElectorListener;
import org.apache.samza.coordinator.MetadataResourceUtil;
import org.apache.samza.coordinator.StreamPartitionCountMonitor;
+import org.apache.samza.coordinator.StreamPartitionCountMonitorFactory;
import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
import
org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
@@ -62,7 +62,6 @@ import org.apache.samza.runtime.LocationIdProviderFactory;
import org.apache.samza.startpoint.StartpointManager;
import org.apache.samza.system.StreamMetadataCache;
import org.apache.samza.system.SystemAdmins;
-import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.util.ReflectionUtil;
import org.apache.samza.util.SystemClock;
@@ -404,13 +403,7 @@ public class ZkJobCoordinator implements JobCoordinator {
@VisibleForTesting
StreamPartitionCountMonitor getPartitionCountMonitor() {
StreamMetadataCache streamMetadata = new StreamMetadataCache(systemAdmins,
0, SystemClock.instance());
- Set<SystemStream> inputStreamsToMonitor = new
TaskConfig(config).getAllInputStreams();
-
- return new StreamPartitionCountMonitor(
- inputStreamsToMonitor,
- streamMetadata,
- metrics.getMetricsRegistry(),
- new JobConfig(config).getMonitorPartitionChangeFrequency(),
+ return new StreamPartitionCountMonitorFactory(streamMetadata,
metrics.getMetricsRegistry()).build(config,
streamsChanged -> {
if (leaderElector.amILeader()) {
debounceTimer.scheduleAfterDebounceTime(ON_PROCESSOR_CHANGE, 0,
this::doOnProcessorChange);
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
index ce81070..0ccb8d9 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
@@ -285,7 +285,7 @@ public class TestZkJobCoordinator {
StreamPartitionCountMonitor monitor =
Mockito.mock(StreamPartitionCountMonitor.class);
zkJobCoordinator.debounceTimer = mockDebounceTimer;
zkJobCoordinator.streamPartitionCountMonitor = monitor;
- when(zkJobCoordinator.getPartitionCountMonitor()).thenReturn(monitor);
+ doReturn(monitor).when(zkJobCoordinator).getPartitionCountMonitor();
ZkJobCoordinator.LeaderElectorListenerImpl listener = zkJobCoordinator.new
LeaderElectorListenerImpl();