This is an automated email from the ASF dual-hosted git repository.

cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 27b1495  SAMZA-2672: Extract creation of stream monitors into separate classes for reuse (#1520)
27b1495 is described below

commit 27b1495589e992cf1606ebfc2e34699077d60b73
Author: Cameron Lee <[email protected]>
AuthorDate: Mon Aug 30 14:26:40 2021 -0700

    SAMZA-2672: Extract creation of stream monitors into separate classes for reuse (#1520)
    
    Changes:
    1. Extract construction logic into factory classes for StreamPartitionCountMonitor and StreamRegexMonitor.
    2. Update job coordinator classes to use the factories.
---
 .../clustermanager/ClusterBasedJobCoordinator.java | 99 +++++++---------------
 .../coordinator/StreamPartitionCountMonitor.java   |  4 +
 .../StreamPartitionCountMonitorFactory.java        | 55 ++++++++++++
 .../coordinator/StreamRegexMonitorFactory.java     | 91 ++++++++++++++++++++
 .../java/org/apache/samza/zk/ZkJobCoordinator.java | 11 +--
 .../org/apache/samza/zk/TestZkJobCoordinator.java  |  2 +-
 6 files changed, 182 insertions(+), 80 deletions(-)

diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
index b62678b..286a040 100644
--- 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
@@ -18,13 +18,11 @@
  */
 package org.apache.samza.clustermanager;
 
-import com.google.common.annotations.VisibleForTesting;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.regex.Pattern;
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.samza.SamzaException;
 import 
org.apache.samza.clustermanager.container.placement.ContainerPlacementMetadataStore;
 import 
org.apache.samza.clustermanager.container.placement.ContainerPlacementRequestAllocator;
@@ -33,7 +31,6 @@ import org.apache.samza.config.ClusterManagerConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.StorageConfig;
-import org.apache.samza.config.TaskConfig;
 import org.apache.samza.container.ExecutionContainerIdManager;
 import org.apache.samza.container.LocalityManager;
 import org.apache.samza.container.TaskName;
@@ -43,7 +40,9 @@ import org.apache.samza.coordinator.JobModelManager;
 import org.apache.samza.coordinator.MetadataResourceUtil;
 import org.apache.samza.coordinator.PartitionChangeException;
 import org.apache.samza.coordinator.StreamPartitionCountMonitor;
+import org.apache.samza.coordinator.StreamPartitionCountMonitorFactory;
 import org.apache.samza.coordinator.StreamRegexMonitor;
+import org.apache.samza.coordinator.StreamRegexMonitorFactory;
 import 
org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
 import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
 import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
@@ -63,7 +62,6 @@ import org.apache.samza.storage.StateBackendAdmin;
 import org.apache.samza.storage.StateBackendFactory;
 import org.apache.samza.system.StreamMetadataCache;
 import org.apache.samza.system.SystemAdmins;
-import org.apache.samza.system.SystemStream;
 import org.apache.samza.util.DiagnosticsUtil;
 import org.apache.samza.util.ReflectionUtil;
 import org.apache.samza.util.SystemClock;
@@ -146,7 +144,7 @@ public class ClusterBasedJobCoordinator {
   /**
    * The input topic partition count monitor
    */
-  private final Optional<StreamPartitionCountMonitor> partitionMonitor;
+  private final StreamPartitionCountMonitor partitionMonitor;
 
   /**
    * The input stream regex monitor
@@ -207,10 +205,9 @@ public class ClusterBasedJobCoordinator {
     this.state = new SamzaApplicationState(jobModelManager);
     // The systemAdmins should be started before partitionMonitor can be used. 
And it should be stopped when this coordinator is stopped.
     this.systemAdmins = new SystemAdmins(config, 
this.getClass().getSimpleName());
+    // SAMZA-2684: should these monitors be using the config from the job 
model calculation?
     this.partitionMonitor = getPartitionCountMonitor(config, systemAdmins);
-
-    Set<SystemStream> inputSystemStreams = 
JobModelUtil.getSystemStreams(jobModelManager.jobModel());
-    this.inputStreamRegexMonitor = getInputRegexMonitor(config, systemAdmins, 
inputSystemStreams);
+    this.inputStreamRegexMonitor = 
getInputRegexMonitor(jobModelManager.jobModel(), config, systemAdmins);
 
     ClusterManagerConfig clusterManagerConfig = new 
ClusterManagerConfig(config);
     this.isJmxEnabled = clusterManagerConfig.getJmxEnabledOnJobCoordinator();
@@ -312,7 +309,7 @@ public class ClusterBasedJobCoordinator {
 
       containerProcessManager.start();
       systemAdmins.start();
-      partitionMonitor.ifPresent(StreamPartitionCountMonitor::start);
+      partitionMonitor.start();
       inputStreamRegexMonitor.ifPresent(StreamRegexMonitor::start);
 
       // containerPlacementRequestAllocator thread has to start after the cpm 
is started
@@ -380,7 +377,7 @@ public class ClusterBasedJobCoordinator {
    */
   private void onShutDown() {
     try {
-      partitionMonitor.ifPresent(StreamPartitionCountMonitor::stop);
+      partitionMonitor.stop();
       inputStreamRegexMonitor.ifPresent(StreamRegexMonitor::stop);
       systemAdmins.stop();
       shutDowncontainerPlacementRequestAllocatorAndUtils();
@@ -415,70 +412,32 @@ public class ClusterBasedJobCoordinator {
     }
   }
 
-  private Optional<StreamPartitionCountMonitor> 
getPartitionCountMonitor(Config config, SystemAdmins systemAdmins) {
-    StreamMetadataCache streamMetadata = new StreamMetadataCache(systemAdmins, 
0, SystemClock.instance());
-    Set<SystemStream> inputStreamsToMonitor = new 
TaskConfig(config).getAllInputStreams();
-    if (inputStreamsToMonitor.isEmpty()) {
-      throw new SamzaException("Input streams to a job can not be empty.");
-    }
-
-    return Optional.of(new StreamPartitionCountMonitor(inputStreamsToMonitor, 
streamMetadata, metrics,
-        new JobConfig(config).getMonitorPartitionChangeFrequency(), 
streamsChanged -> {
+  private StreamPartitionCountMonitor getPartitionCountMonitor(Config config, 
SystemAdmins systemAdmins) {
+    StreamMetadataCache streamMetadataCache = new 
StreamMetadataCache(systemAdmins, 0, SystemClock.instance());
+    return new StreamPartitionCountMonitorFactory(streamMetadataCache, 
this.metrics).build(config, streamsChanged -> {
       // Fail the jobs with durable state store. Otherwise, application 
state.status remains UNDEFINED s.t. YARN job will be restarted
       if (hasDurableStores) {
-        LOG.error("Input topic partition count changed in a job with durable 
state. Failing the job. " +
-            "Changed topics: {}", streamsChanged.toString());
+        LOG.error(
+            "Input topic partition count changed in a job with durable state. 
Failing the job. Changed topics: {}",
+            streamsChanged.toString());
         state.status = SamzaApplicationState.SamzaAppStatus.FAILED;
       }
-      coordinatorException = new PartitionChangeException("Input topic 
partition count changes detected for topics: " + streamsChanged.toString());
-    }));
+      coordinatorException = new PartitionChangeException(
+          "Input topic partition count changes detected for topics: " + 
streamsChanged.toString());
+    });
   }
 
-  private Optional<StreamRegexMonitor> getInputRegexMonitor(Config config, 
SystemAdmins systemAdmins, Set<SystemStream> inputStreamsToMonitor) {
-    JobConfig jobConfig = new JobConfig(config);
-
-    // if input regex monitor is not enabled return empty
-    if (jobConfig.getMonitorRegexDisabled()) {
-      LOG.info("StreamRegexMonitor is disabled.");
-      return Optional.empty();
-    }
-
-    StreamMetadataCache streamMetadata = new StreamMetadataCache(systemAdmins, 
0, SystemClock.instance());
-    if (inputStreamsToMonitor.isEmpty()) {
-      throw new SamzaException("Input streams to a job can not be empty.");
-    }
-
-    // First list all rewriters
-    Optional<String> rewritersList = jobConfig.getConfigRewriters();
-
-    // if no rewriter is defined, there is nothing to monitor
-    if (!rewritersList.isPresent()) {
-      LOG.warn("No config rewriters are defined. No StreamRegexMonitor 
created.");
-      return Optional.empty();
-    }
-
-    // Compile a map of each input-system to its corresponding 
input-monitor-regex patterns
-    Map<String, Pattern> inputRegexesToMonitor = 
jobConfig.getMonitorRegexPatternMap(rewritersList.get());
-
-    // if there are no regexes to monitor
-    if (inputRegexesToMonitor.isEmpty()) {
-      LOG.info("No input regexes are defined. No StreamRegexMonitor created.");
-      return Optional.empty();
-    }
-
-    return Optional.of(new StreamRegexMonitor(inputStreamsToMonitor, 
inputRegexesToMonitor, streamMetadata, metrics,
-        jobConfig.getMonitorRegexFrequency(), new 
StreamRegexMonitor.Callback() {
-          @Override
-          public void onInputStreamsChanged(Set<SystemStream> initialInputSet, 
Set<SystemStream> newInputStreams,
-              Map<String, Pattern> regexesMonitored) {
-            if (hasDurableStores) {
-              LOG.error("New input system-streams discovered. Failing the job. 
New input streams: {}" +
-                  " Existing input streams: {}", newInputStreams, 
inputStreamsToMonitor);
-              state.status = SamzaApplicationState.SamzaAppStatus.FAILED;
-            }
-            coordinatorException = new InputStreamsDiscoveredException("New 
input streams discovered: " + newInputStreams);
-          }
-        }));
+  private Optional<StreamRegexMonitor> getInputRegexMonitor(JobModel jobModel, 
Config config, SystemAdmins systemAdmins) {
+    StreamMetadataCache streamMetadataCache = new 
StreamMetadataCache(systemAdmins, 0, SystemClock.instance());
+    return new StreamRegexMonitorFactory(streamMetadataCache, 
this.metrics).build(jobModel, config,
+      (initialInputSet, newInputStreams, regexesMonitored) -> {
+        if (hasDurableStores) {
+          LOG.error("New input system-streams discovered. Failing the job. New 
input streams: {}"
+              + " Existing input streams: {}", newInputStreams, 
initialInputSet);
+          state.status = SamzaApplicationState.SamzaAppStatus.FAILED;
+        }
+        coordinatorException = new InputStreamsDiscoveredException("New input 
streams discovered: " + newInputStreams);
+      });
   }
 
   // The following two methods are package-private and for testing only
@@ -490,7 +449,7 @@ public class ClusterBasedJobCoordinator {
 
   @VisibleForTesting
   StreamPartitionCountMonitor getPartitionMonitor() {
-    return partitionMonitor.get();
+    return partitionMonitor;
   }
 
   @VisibleForTesting
diff --git 
a/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java
 
b/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java
index ee4490f..10efafc 100644
--- 
a/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java
+++ 
b/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java
@@ -52,6 +52,10 @@ public class StreamPartitionCountMonitor {
   private final StreamMetadataCache metadataCache;
   private final int monitorPeriodMs;
   private final Map<SystemStream, Gauge<Integer>> gauges;
+  /**
+   * TODO get the initial metadata from the job model instead of querying it, 
so that there isn't a chance that the
+   * initial metadata has a mismatch with the job model
+   */
   private final Map<SystemStream, SystemStreamMetadata> initialMetadata;
   private final Callback callbackMethod;
 
diff --git 
a/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitorFactory.java
 
b/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitorFactory.java
new file mode 100644
index 0000000..4801b95
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitorFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import java.util.Set;
+import com.google.common.base.Preconditions;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemStream;
+
+
+/**
+ * Common place for reading configs and wiring {@link 
StreamPartitionCountMonitor}.
+ */
+public class StreamPartitionCountMonitorFactory {
+  private final StreamMetadataCache streamMetadataCache;
+  private final MetricsRegistry metrics;
+
+  public StreamPartitionCountMonitorFactory(StreamMetadataCache 
streamMetadataCache, MetricsRegistry metrics) {
+    this.streamMetadataCache = Preconditions.checkNotNull(streamMetadataCache);
+    this.metrics = Preconditions.checkNotNull(metrics);
+  }
+
+  /**
+   * Build {@link StreamPartitionCountMonitor} for input streams.
+   */
+  public StreamPartitionCountMonitor build(Config config, 
StreamPartitionCountMonitor.Callback callback) {
+    Set<SystemStream> inputStreamsToMonitor = new 
TaskConfig(config).getAllInputStreams();
+    if (inputStreamsToMonitor.isEmpty()) {
+      throw new SamzaException("Input streams to a job can not be empty.");
+    }
+    return new StreamPartitionCountMonitor(inputStreamsToMonitor, 
this.streamMetadataCache, this.metrics,
+        new JobConfig(config).getMonitorPartitionChangeFrequency(), callback);
+  }
+}
diff --git 
a/samza-core/src/main/java/org/apache/samza/coordinator/StreamRegexMonitorFactory.java
 
b/samza-core/src/main/java/org/apache/samza/coordinator/StreamRegexMonitorFactory.java
new file mode 100644
index 0000000..83e13f1
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/coordinator/StreamRegexMonitorFactory.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.regex.Pattern;
+import com.google.common.base.Preconditions;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.JobModelUtil;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Common place for reading configs and wiring {@link StreamRegexMonitor}.
+ */
+public class StreamRegexMonitorFactory {
+  private static final Logger LOG = 
LoggerFactory.getLogger(StreamRegexMonitorFactory.class);
+
+  private final StreamMetadataCache streamMetadataCache;
+  private final MetricsRegistry metrics;
+
+  public StreamRegexMonitorFactory(StreamMetadataCache streamMetadataCache, 
MetricsRegistry metrics) {
+    this.streamMetadataCache = Preconditions.checkNotNull(streamMetadataCache);
+    this.metrics = Preconditions.checkNotNull(metrics);
+  }
+
+  /**
+   * Build a {@link StreamRegexMonitor} for input streams for the job model.
+   */
+  public Optional<StreamRegexMonitor> build(JobModel jobModel, Config config, 
StreamRegexMonitor.Callback callback) {
+    JobConfig jobConfig = new JobConfig(config);
+
+    // if input regex monitor is not enabled return empty
+    if (jobConfig.getMonitorRegexDisabled()) {
+      LOG.info("StreamRegexMonitor is disabled.");
+      return Optional.empty();
+    }
+
+    Set<SystemStream> inputStreamsToMonitor = 
JobModelUtil.getSystemStreams(jobModel);
+    if (inputStreamsToMonitor.isEmpty()) {
+      throw new SamzaException("Input streams to a job can not be empty.");
+    }
+
+    // First list all rewriters
+    Optional<String> rewritersList = jobConfig.getConfigRewriters();
+
+    // if no rewriter is defined, there is nothing to monitor
+    if (!rewritersList.isPresent()) {
+      LOG.warn("No config rewriters are defined. No StreamRegexMonitor 
created.");
+      return Optional.empty();
+    }
+
+    // Compile a map of each input-system to its corresponding 
input-monitor-regex patterns
+    Map<String, Pattern> inputRegexesToMonitor = 
jobConfig.getMonitorRegexPatternMap(rewritersList.get());
+
+    // if there are no regexes to monitor
+    if (inputRegexesToMonitor.isEmpty()) {
+      LOG.info("No input regexes are defined. No StreamRegexMonitor created.");
+      return Optional.empty();
+    }
+
+    return Optional.of(
+        new StreamRegexMonitor(inputStreamsToMonitor, inputRegexesToMonitor, 
this.streamMetadataCache, this.metrics,
+            jobConfig.getMonitorRegexFrequency(), callback));
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java 
b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index 17a8d5c..f41ee8c 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -35,7 +35,6 @@ import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.StorageConfig;
-import org.apache.samza.config.TaskConfig;
 import org.apache.samza.config.ZkConfig;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.container.grouper.task.GrouperMetadata;
@@ -46,6 +45,7 @@ import org.apache.samza.coordinator.JobModelCalculator;
 import org.apache.samza.coordinator.LeaderElectorListener;
 import org.apache.samza.coordinator.MetadataResourceUtil;
 import org.apache.samza.coordinator.StreamPartitionCountMonitor;
+import org.apache.samza.coordinator.StreamPartitionCountMonitorFactory;
 import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
 import 
org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
 import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
@@ -62,7 +62,6 @@ import org.apache.samza.runtime.LocationIdProviderFactory;
 import org.apache.samza.startpoint.StartpointManager;
 import org.apache.samza.system.StreamMetadataCache;
 import org.apache.samza.system.SystemAdmins;
-import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.util.ReflectionUtil;
 import org.apache.samza.util.SystemClock;
@@ -404,13 +403,7 @@ public class ZkJobCoordinator implements JobCoordinator {
   @VisibleForTesting
   StreamPartitionCountMonitor getPartitionCountMonitor() {
     StreamMetadataCache streamMetadata = new StreamMetadataCache(systemAdmins, 
0, SystemClock.instance());
-    Set<SystemStream> inputStreamsToMonitor = new 
TaskConfig(config).getAllInputStreams();
-
-    return new StreamPartitionCountMonitor(
-        inputStreamsToMonitor,
-        streamMetadata,
-        metrics.getMetricsRegistry(),
-        new JobConfig(config).getMonitorPartitionChangeFrequency(),
+    return new StreamPartitionCountMonitorFactory(streamMetadata, 
metrics.getMetricsRegistry()).build(config,
       streamsChanged -> {
         if (leaderElector.amILeader()) {
           debounceTimer.scheduleAfterDebounceTime(ON_PROCESSOR_CHANGE, 0, 
this::doOnProcessorChange);
diff --git 
a/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java 
b/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
index ce81070..0ccb8d9 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
@@ -285,7 +285,7 @@ public class TestZkJobCoordinator {
     StreamPartitionCountMonitor monitor = 
Mockito.mock(StreamPartitionCountMonitor.class);
     zkJobCoordinator.debounceTimer = mockDebounceTimer;
     zkJobCoordinator.streamPartitionCountMonitor = monitor;
-    when(zkJobCoordinator.getPartitionCountMonitor()).thenReturn(monitor);
+    doReturn(monitor).when(zkJobCoordinator).getPartitionCountMonitor();
 
     ZkJobCoordinator.LeaderElectorListenerImpl listener = zkJobCoordinator.new 
LeaderElectorListenerImpl();
 

Reply via email to