This is an automated email from the ASF dual-hosted git repository.

pmaheshwari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 07cdbdc  SAMZA-2227: Fix to ensure that new topics detected by regex-monitor are picked up during next deployment of app
07cdbdc is described below

commit 07cdbdcdf69833661009e1d5735177667c93607b
Author: Ray Matharu <[email protected]>
AuthorDate: Mon Jun 3 17:28:14 2019 -0700

    SAMZA-2227: Fix to ensure that new topics detected by regex-monitor are picked up during next deployment of app
    
    Author: Ray Matharu <[email protected]>
    
    Reviewers: Bharath Kumarasubramaniam <[email protected]>, Jagadish Venkatraman <[email protected]>
    
    Closes #1055 from rmatharu/regex-fix
---
 .../clustermanager/ClusterBasedJobCoordinator.java |  8 +++--
 .../org/apache/samza/job/model/JobModelUtil.java   |  7 ++++
 .../scala/org/apache/samza/config/JobConfig.scala  |  2 ++
 .../apache/samza/config/RegExTopicGenerator.scala  | 41 +++++++++++-----------
 .../apache/samza/coordinator/JobModelManager.scala | 22 +++++++++---
 .../apache/samza/system/StreamMetadataCache.scala  |  1 -
 .../main/scala/org/apache/samza/util/Util.scala    | 27 ++++++++------
 .../org/apache/samza/config/KafkaConfig.scala      |  6 ----
 .../samza/config/TestRegExTopicGenerator.scala     |  6 ++--
 9 files changed, 71 insertions(+), 49 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
index 9044361..1dfdb04 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
@@ -191,7 +191,10 @@ public class ClusterBasedJobCoordinator {
     // The systemAdmins should be started before partitionMonitor can be used. 
And it should be stopped when this coordinator is stopped.
     systemAdmins = new SystemAdmins(config);
     partitionMonitor = getPartitionCountMonitor(config, systemAdmins);
-    inputStreamRegexMonitor = getInputRegexMonitor(config, systemAdmins);
+
+    Set<SystemStream> inputSystemStreams = 
JobModelUtil.getSystemStreams(jobModelManager.jobModel());
+    inputStreamRegexMonitor = getInputRegexMonitor(config, systemAdmins, 
inputSystemStreams);
+
     clusterManagerConfig = new ClusterManagerConfig(config);
     isJmxEnabled = clusterManagerConfig.getJmxEnabledOnJobCoordinator();
 
@@ -333,7 +336,7 @@ public class ClusterBasedJobCoordinator {
       }));
   }
 
-  private Optional<StreamRegexMonitor> getInputRegexMonitor(Config config, 
SystemAdmins systemAdmins) {
+  private Optional<StreamRegexMonitor> getInputRegexMonitor(Config config, 
SystemAdmins systemAdmins, Set<SystemStream> inputStreamsToMonitor) {
 
     // if input regex monitor is not enabled return empty
     if (new JobConfig(config).getMonitorRegexEnabled()) {
@@ -342,7 +345,6 @@ public class ClusterBasedJobCoordinator {
     }
 
     StreamMetadataCache streamMetadata = new StreamMetadataCache(systemAdmins, 
0, SystemClock.instance());
-    Set<SystemStream> inputStreamsToMonitor = new 
TaskConfigJava(config).getAllInputStreams();
     if (inputStreamsToMonitor.isEmpty()) {
       throw new SamzaException("Input streams to a job can not be empty.");
     }
diff --git 
a/samza-core/src/main/java/org/apache/samza/job/model/JobModelUtil.java 
b/samza-core/src/main/java/org/apache/samza/job/model/JobModelUtil.java
index de8a349..c236266 100644
--- a/samza-core/src/main/java/org/apache/samza/job/model/JobModelUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/job/model/JobModelUtil.java
@@ -23,7 +23,9 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 import org.apache.samza.container.TaskName;
+import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamPartition;
 
 /**
@@ -55,5 +57,10 @@ public class JobModelUtil {
     return taskToSSPs;
   }
 
+  public static Set<SystemStream> getSystemStreams(JobModel jobModel) {
+    Map<TaskName, Set<SystemStreamPartition>> taskToSSPs = 
getTaskToSystemStreamPartitions(jobModel);
+    return taskToSSPs.values().stream().flatMap(taskSSPs -> 
taskSSPs.stream().map(ssp -> 
ssp.getSystemStream())).collect(Collectors.toSet());
+  }
+
   private JobModelUtil() { }
 }
diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala 
b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
index f8f6851..e49134a 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
@@ -223,6 +223,8 @@ class JobConfig(config: Config) extends 
ScalaMapConfig(config) with Logging {
 
   def getRegexResolvedSystem(rewriterName: String) = 
getOption(JobConfig.REGEX_RESOLVED_SYSTEM format rewriterName)
 
+  def getRegexResolvedInheritedConfig(rewriterName: String) = 
config.subset((JobConfig.REGEX_INHERITED_CONFIG format rewriterName) + ".", 
true)
+
   def getStreamJobFactoryClass = getOption(JobConfig.STREAM_JOB_FACTORY_CLASS)
 
   def getJobId = getOption(JobConfig.JOB_ID).getOrElse("1")
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala 
b/samza-core/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
similarity index 77%
rename from 
samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
rename to 
samza-core/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
index 654354b..5af55c7 100644
--- 
a/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
@@ -19,17 +19,14 @@
 
 package org.apache.samza.config
 
-import org.I0Itec.zkclient.ZkClient
-import kafka.utils.ZkUtils
-import org.apache.samza.config.KafkaConfig.{Config2Kafka}
-import org.apache.samza.config.JobConfig.{REGEX_RESOLVED_STREAMS}
 import org.apache.samza.SamzaException
-import org.apache.samza.util.{Logging, StreamUtil}
+import org.apache.samza.config.JobConfig.REGEX_RESOLVED_STREAMS
+import org.apache.samza.config.TaskConfig.Config2Task
+import org.apache.samza.system.{StreamMetadataCache, SystemAdmins, 
SystemStream}
+import org.apache.samza.util.{Logging, StreamUtil, SystemClock}
 
-import collection.JavaConverters._
+import scala.collection.JavaConverters._
 import scala.collection._
-import org.apache.samza.config.TaskConfig.Config2Task
-import org.apache.samza.system.SystemStream
 
 /**
  * Dynamically determine the Kafka topics to use as input streams to the task 
via a regular expression.
@@ -45,19 +42,20 @@ import org.apache.samza.system.SystemStream
  * task.inputs=kafka.somestream
  * systems.kafka.streams.somestream.foo=bar
  *
- * @see samza.config.KafkaConfig.getRegexResolvedStreams
+ * @see samza.config.JobConfig.getRegexResolvedStreams
  *
  */
 class RegExTopicGenerator extends ConfigRewriter with Logging {
 
   def rewrite(rewriterName: String, config: Config): Config = {
-    val regex = config
+    val jobConfig = new JobConfig(config)
+    val regex = jobConfig
       .getRegexResolvedStreams(rewriterName)
       .getOrElse(throw new SamzaException("No %s defined in config" format 
REGEX_RESOLVED_STREAMS))
-    val systemName = config
+    val systemName = jobConfig
       .getRegexResolvedSystem(rewriterName)
       .getOrElse(throw new SamzaException("No system defined for %s." format 
rewriterName))
-    val topics = getTopicsFromZK(rewriterName, config)
+    val topics = getTopicsFromSystemAdmin(rewriterName, config)
     val existingInputStreams = config.getInputStreams
     val newInputStreams = new mutable.HashSet[SystemStream]
     val keysAndValsToAdd = new mutable.HashMap[String, String]
@@ -79,7 +77,7 @@ class RegExTopicGenerator extends ConfigRewriter with Logging 
{
       newInputStreams.add(m)
 
       // For each topic that matched, generate all the specified configs
-      config
+      jobConfig
         .getRegexResolvedInheritedConfig(rewriterName)
         .asScala
         .foreach(kv => keysAndValsToAdd.put("systems." + m.getSystem + 
".streams." + m.getStream + "." + kv._1, kv._2))
@@ -97,19 +95,20 @@ class RegExTopicGenerator extends ConfigRewriter with 
Logging {
     new MapConfig(((keysAndValsToAdd ++ config.asScala) += 
inputStreams).asJava)
   }
 
-  def getTopicsFromZK(rewriterName: String, config: Config): Seq[String] = {
-    val systemName = config
+  def getTopicsFromSystemAdmin(rewriterName: String, config: Config): 
Seq[String] = {
+    val systemName = new JobConfig(config)
       .getRegexResolvedSystem(rewriterName)
       .getOrElse(throw new SamzaException("No system defined in config for 
rewriter %s." format rewriterName))
-    val consumerConfig = 
KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, systemName, "")
-    val zkConnect = Option(consumerConfig.getZkConnect)
-      .getOrElse(throw new SamzaException("No zookeeper.connect for system %s 
defined in config." format systemName))
-    val zkClient = new ZkClient(zkConnect, 6000, 6000)
 
+    var systemStreams = Seq.empty[String]
+    val systemAdmin = new SystemConfig(config).getSystemAdmin(systemName)
     try {
-      ZkUtils(zkClient, isZkSecurityEnabled = false).getAllTopics()
+      systemAdmin.start()
+      systemStreams =
+        systemStreams ++ 
systemAdmin.getAllSystemStreams.asScala.map(systemStream => 
systemStream.getStream).toSeq
     } finally {
-      zkClient.close()
+      systemAdmin.stop();
     }
+    systemStreams
   }
 }
diff --git 
a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala 
b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
index 135f4c2..54ae3a8 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
@@ -22,7 +22,7 @@ package org.apache.samza.coordinator
 import java.util
 import java.util.concurrent.atomic.AtomicReference
 
-import org.apache.samza.Partition
+import org.apache.samza.{Partition, SamzaException}
 import org.apache.samza.config._
 import org.apache.samza.config.JobConfig.Config2Job
 import org.apache.samza.config.TaskConfig.Config2Task
@@ -48,8 +48,8 @@ import org.apache.samza.metrics.MetricsRegistry
 import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.runtime.LocationId
 import org.apache.samza.system._
-import org.apache.samza.util.Logging
-import org.apache.samza.util.Util
+import org.apache.samza.util.Util.rewriteConfig
+import org.apache.samza.util.{Logging, Util}
 
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
@@ -267,7 +267,21 @@ object JobModelManager extends Logging {
     * @return the input {@see SystemStreamPartition} of the samza job.
     */
   private def getInputStreamPartitions(config: Config, streamMetadataCache: 
StreamMetadataCache): Set[SystemStreamPartition] = {
-    val inputSystemStreams = config.getInputStreams
+
+
+    def invokeRegexTopicRewriter(config: Config): Config = {
+      config.getConfigRewriters match {
+        case Some(rewriters) => rewriters.split(",").
+          filter(rewriterName => config.getConfigRewriterClass(rewriterName)
+            .getOrElse(throw new SamzaException("Unable to find class config 
for config rewriter %s." format rewriterName))
+            .equalsIgnoreCase(classOf[RegExTopicGenerator].getName)).
+          foldLeft(config)(Util.applyRewriter(_, _))
+        case _ => config
+      }
+    }
+
+    // Expand regex input, if a regex-rewriter is defined in config
+    val inputSystemStreams = invokeRegexTopicRewriter(config).getInputStreams
 
     // Get the set of partitions for each SystemStream from the stream metadata
     streamMetadataCache
diff --git 
a/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala 
b/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala
index abd6942..565ec88 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala
@@ -88,7 +88,6 @@ class StreamMetadataCache (
   /**
     * Returns the list of System Streams for this system.
     * @param systemName
-    * @param pattern
     */
   def getAllSystemStreams(systemName: String): mutable.Set[SystemStream] = {
     val systemAdmin = systemAdmins.getSystemAdmin(systemName)
diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala 
b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
index eba6930..e31d82f 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
@@ -148,18 +148,25 @@ object Util extends Logging {
    * @return re-written config
    */
   def rewriteConfig(config: Config): Config = {
-    def rewrite(c: Config, rewriterName: String): Config = {
-      val rewriterClassName = config
-              .getConfigRewriterClass(rewriterName)
-              .getOrElse(throw new SamzaException("Unable to find class config 
for config rewriter %s." format rewriterName))
-      val rewriter = Util.getObj(rewriterClassName, classOf[ConfigRewriter])
-      info("Re-writing config with " + rewriter)
-      rewriter.rewrite(rewriterName, c)
-    }
-
     config.getConfigRewriters match {
-      case Some(rewriters) => rewriters.split(",").foldLeft(config)(rewrite(_, 
_))
+      case Some(rewriters) => 
rewriters.split(",").foldLeft(config)(applyRewriter(_, _))
       case _ => config
     }
   }
+
+  /**
+    * Re-writes configuration using a ConfigRewriter, defined with the given 
rewriterName in config.
+    * @param config the config to re-write
+    * @param rewriterName the name of the rewriter to apply
+    * @return the rewritten config
+    */
+  def applyRewriter(config: Config, rewriterName: String): Config = {
+    val rewriterClassName = config
+      .getConfigRewriterClass(rewriterName)
+      .getOrElse(throw new SamzaException("Unable to find class config for 
config rewriter %s." format rewriterName))
+    val rewriter = ReflectionUtil.getObj(this.getClass.getClassLoader, 
rewriterClassName, classOf[ConfigRewriter])
+    info("Re-writing config with " + rewriter)
+    rewriter.rewrite(rewriterName, config)
+  }
+
 }
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala 
b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index ff71c92..c5eb40c 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -238,12 +238,6 @@ class KafkaConfig(config: Config) extends 
ScalaMapConfig(config) {
       }.toMap
   }
 
-  // regex resolver
-  def getRegexResolvedStreams(rewriterName: String) = 
getOption(JobConfig.REGEX_RESOLVED_STREAMS format rewriterName)
-
-  def getRegexResolvedSystem(rewriterName: String) = 
getOption(JobConfig.REGEX_RESOLVED_SYSTEM format rewriterName)
-
-  def getRegexResolvedInheritedConfig(rewriterName: String) = 
config.subset((JobConfig.REGEX_INHERITED_CONFIG format rewriterName) + ".", 
true)
 
   /**
     * Gets the replication factor for the changelog topics. Uses the following 
precedence.
diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestRegExTopicGenerator.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestRegExTopicGenerator.scala
index 8b19292..16a3985 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/config/TestRegExTopicGenerator.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestRegExTopicGenerator.scala
@@ -20,10 +20,8 @@
 package org.apache.samza.config
 
 import collection.JavaConverters._
-
 import org.junit.Assert._
 import org.junit.Test
-
 import JobConfig._
 
 class TestRegExTopicGenerator {
@@ -48,7 +46,7 @@ class TestRegExTopicGenerator {
 
     // Don't actually talk to ZooKeeper
     val rewriter = new RegExTopicGenerator() {
-      override def getTopicsFromZK(rewriterName: String, config: Config): 
Seq[String] = List("catdog", "dogtired", "cow", "scaredycat", "Homer", 
"crazycat")
+      override def getTopicsFromSystemAdmin(rewriterName: String, config: 
Config): Seq[String] = List("catdog", "dogtired", "cow", "scaredycat", "Homer", 
"crazycat")
     }
 
     val rewritten = rewriter.rewrite(REWRITER_NAME, config)
@@ -79,7 +77,7 @@ class TestRegExTopicGenerator {
       getRegexConfigInherited + ".config.zorp" -> "morp")
 
     val rewriter = new RegExTopicGenerator() {
-      override def getTopicsFromZK(rewriterName: String, config: Config): 
Seq[String] = List("yoyoyo")
+      override def getTopicsFromSystemAdmin(rewriterName: String, config: 
Config): Seq[String] = List("yoyoyo")
     }
 
     val config = rewriter.rewrite(REWRITER_NAME, new MapConfig(map.asJava))

Reply via email to