This is an automated email from the ASF dual-hosted git repository.
pmaheshwari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 07cdbdc SAMZA-2227: Fix to ensure that new topics detected by regex-monitor are picked up during next deployment of app
07cdbdc is described below
commit 07cdbdcdf69833661009e1d5735177667c93607b
Author: Ray Matharu <[email protected]>
AuthorDate: Mon Jun 3 17:28:14 2019 -0700
SAMZA-2227: Fix to ensure that new topics detected by regex-monitor are picked up during next deployment of app
Author: Ray Matharu <[email protected]>
Reviewers: Bharath Kumarasubramaniam <[email protected]>, Jagadish Venkatraman <[email protected]>
Closes #1055 from rmatharu/regex-fix
---
.../clustermanager/ClusterBasedJobCoordinator.java | 8 +++--
.../org/apache/samza/job/model/JobModelUtil.java | 7 ++++
.../scala/org/apache/samza/config/JobConfig.scala | 2 ++
.../apache/samza/config/RegExTopicGenerator.scala | 41 +++++++++++-----------
.../apache/samza/coordinator/JobModelManager.scala | 22 +++++++++---
.../apache/samza/system/StreamMetadataCache.scala | 1 -
.../main/scala/org/apache/samza/util/Util.scala | 27 ++++++++------
.../org/apache/samza/config/KafkaConfig.scala | 6 ----
.../samza/config/TestRegExTopicGenerator.scala | 6 ++--
9 files changed, 71 insertions(+), 49 deletions(-)
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
index 9044361..1dfdb04 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
@@ -191,7 +191,10 @@ public class ClusterBasedJobCoordinator {
// The systemAdmins should be started before partitionMonitor can be used.
And it should be stopped when this coordinator is stopped.
systemAdmins = new SystemAdmins(config);
partitionMonitor = getPartitionCountMonitor(config, systemAdmins);
- inputStreamRegexMonitor = getInputRegexMonitor(config, systemAdmins);
+
+ Set<SystemStream> inputSystemStreams =
JobModelUtil.getSystemStreams(jobModelManager.jobModel());
+ inputStreamRegexMonitor = getInputRegexMonitor(config, systemAdmins,
inputSystemStreams);
+
clusterManagerConfig = new ClusterManagerConfig(config);
isJmxEnabled = clusterManagerConfig.getJmxEnabledOnJobCoordinator();
@@ -333,7 +336,7 @@ public class ClusterBasedJobCoordinator {
}));
}
- private Optional<StreamRegexMonitor> getInputRegexMonitor(Config config,
SystemAdmins systemAdmins) {
+ private Optional<StreamRegexMonitor> getInputRegexMonitor(Config config,
SystemAdmins systemAdmins, Set<SystemStream> inputStreamsToMonitor) {
// if input regex monitor is not enabled return empty
if (new JobConfig(config).getMonitorRegexEnabled()) {
@@ -342,7 +345,6 @@ public class ClusterBasedJobCoordinator {
}
StreamMetadataCache streamMetadata = new StreamMetadataCache(systemAdmins,
0, SystemClock.instance());
- Set<SystemStream> inputStreamsToMonitor = new
TaskConfigJava(config).getAllInputStreams();
if (inputStreamsToMonitor.isEmpty()) {
throw new SamzaException("Input streams to a job can not be empty.");
}
diff --git a/samza-core/src/main/java/org/apache/samza/job/model/JobModelUtil.java b/samza-core/src/main/java/org/apache/samza/job/model/JobModelUtil.java
index de8a349..c236266 100644
--- a/samza-core/src/main/java/org/apache/samza/job/model/JobModelUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/job/model/JobModelUtil.java
@@ -23,7 +23,9 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
import org.apache.samza.container.TaskName;
+import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamPartition;
/**
@@ -55,5 +57,10 @@ public class JobModelUtil {
return taskToSSPs;
}
+ public static Set<SystemStream> getSystemStreams(JobModel jobModel) {
+ Map<TaskName, Set<SystemStreamPartition>> taskToSSPs =
getTaskToSystemStreamPartitions(jobModel);
+ return taskToSSPs.values().stream().flatMap(taskSSPs ->
taskSSPs.stream().map(ssp ->
ssp.getSystemStream())).collect(Collectors.toSet());
+ }
+
private JobModelUtil() { }
}
diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
index f8f6851..e49134a 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
@@ -223,6 +223,8 @@ class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging {
def getRegexResolvedSystem(rewriterName: String) =
getOption(JobConfig.REGEX_RESOLVED_SYSTEM format rewriterName)
+ def getRegexResolvedInheritedConfig(rewriterName: String) =
config.subset((JobConfig.REGEX_INHERITED_CONFIG format rewriterName) + ".",
true)
+
def getStreamJobFactoryClass = getOption(JobConfig.STREAM_JOB_FACTORY_CLASS)
def getJobId = getOption(JobConfig.JOB_ID).getOrElse("1")
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala b/samza-core/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
similarity index 77%
rename from samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
rename to samza-core/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
index 654354b..5af55c7 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
@@ -19,17 +19,14 @@
package org.apache.samza.config
-import org.I0Itec.zkclient.ZkClient
-import kafka.utils.ZkUtils
-import org.apache.samza.config.KafkaConfig.{Config2Kafka}
-import org.apache.samza.config.JobConfig.{REGEX_RESOLVED_STREAMS}
import org.apache.samza.SamzaException
-import org.apache.samza.util.{Logging, StreamUtil}
+import org.apache.samza.config.JobConfig.REGEX_RESOLVED_STREAMS
+import org.apache.samza.config.TaskConfig.Config2Task
+import org.apache.samza.system.{StreamMetadataCache, SystemAdmins,
SystemStream}
+import org.apache.samza.util.{Logging, StreamUtil, SystemClock}
-import collection.JavaConverters._
+import scala.collection.JavaConverters._
import scala.collection._
-import org.apache.samza.config.TaskConfig.Config2Task
-import org.apache.samza.system.SystemStream
/**
* Dynamically determine the Kafka topics to use as input streams to the task
via a regular expression.
@@ -45,19 +42,20 @@ import org.apache.samza.system.SystemStream
* task.inputs=kafka.somestream
* systems.kafka.streams.somestream.foo=bar
*
- * @see samza.config.KafkaConfig.getRegexResolvedStreams
+ * @see samza.config.JobConfig.getRegexResolvedStreams
*
*/
class RegExTopicGenerator extends ConfigRewriter with Logging {
def rewrite(rewriterName: String, config: Config): Config = {
- val regex = config
+ val jobConfig = new JobConfig(config)
+ val regex = jobConfig
.getRegexResolvedStreams(rewriterName)
.getOrElse(throw new SamzaException("No %s defined in config" format
REGEX_RESOLVED_STREAMS))
- val systemName = config
+ val systemName = jobConfig
.getRegexResolvedSystem(rewriterName)
.getOrElse(throw new SamzaException("No system defined for %s." format
rewriterName))
- val topics = getTopicsFromZK(rewriterName, config)
+ val topics = getTopicsFromSystemAdmin(rewriterName, config)
val existingInputStreams = config.getInputStreams
val newInputStreams = new mutable.HashSet[SystemStream]
val keysAndValsToAdd = new mutable.HashMap[String, String]
@@ -79,7 +77,7 @@ class RegExTopicGenerator extends ConfigRewriter with Logging {
newInputStreams.add(m)
// For each topic that matched, generate all the specified configs
- config
+ jobConfig
.getRegexResolvedInheritedConfig(rewriterName)
.asScala
.foreach(kv => keysAndValsToAdd.put("systems." + m.getSystem +
".streams." + m.getStream + "." + kv._1, kv._2))
@@ -97,19 +95,20 @@ class RegExTopicGenerator extends ConfigRewriter with Logging {
new MapConfig(((keysAndValsToAdd ++ config.asScala) +=
inputStreams).asJava)
}
- def getTopicsFromZK(rewriterName: String, config: Config): Seq[String] = {
- val systemName = config
+ def getTopicsFromSystemAdmin(rewriterName: String, config: Config):
Seq[String] = {
+ val systemName = new JobConfig(config)
.getRegexResolvedSystem(rewriterName)
.getOrElse(throw new SamzaException("No system defined in config for
rewriter %s." format rewriterName))
- val consumerConfig =
KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, systemName, "")
- val zkConnect = Option(consumerConfig.getZkConnect)
- .getOrElse(throw new SamzaException("No zookeeper.connect for system %s
defined in config." format systemName))
- val zkClient = new ZkClient(zkConnect, 6000, 6000)
+ var systemStreams = Seq.empty[String]
+ val systemAdmin = new SystemConfig(config).getSystemAdmin(systemName)
try {
- ZkUtils(zkClient, isZkSecurityEnabled = false).getAllTopics()
+ systemAdmin.start()
+ systemStreams =
+ systemStreams ++
systemAdmin.getAllSystemStreams.asScala.map(systemStream =>
systemStream.getStream).toSeq
} finally {
- zkClient.close()
+ systemAdmin.stop();
}
+ systemStreams
}
}
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
index 135f4c2..54ae3a8 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
@@ -22,7 +22,7 @@ package org.apache.samza.coordinator
import java.util
import java.util.concurrent.atomic.AtomicReference
-import org.apache.samza.Partition
+import org.apache.samza.{Partition, SamzaException}
import org.apache.samza.config._
import org.apache.samza.config.JobConfig.Config2Job
import org.apache.samza.config.TaskConfig.Config2Task
@@ -48,8 +48,8 @@ import org.apache.samza.metrics.MetricsRegistry
import org.apache.samza.metrics.MetricsRegistryMap
import org.apache.samza.runtime.LocationId
import org.apache.samza.system._
-import org.apache.samza.util.Logging
-import org.apache.samza.util.Util
+import org.apache.samza.util.Util.rewriteConfig
+import org.apache.samza.util.{Logging, Util}
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
@@ -267,7 +267,21 @@ object JobModelManager extends Logging {
* @return the input {@see SystemStreamPartition} of the samza job.
*/
private def getInputStreamPartitions(config: Config, streamMetadataCache:
StreamMetadataCache): Set[SystemStreamPartition] = {
- val inputSystemStreams = config.getInputStreams
+
+
+ def invokeRegexTopicRewriter(config: Config): Config = {
+ config.getConfigRewriters match {
+ case Some(rewriters) => rewriters.split(",").
+ filter(rewriterName => config.getConfigRewriterClass(rewriterName)
+ .getOrElse(throw new SamzaException("Unable to find class config
for config rewriter %s." format rewriterName))
+ .equalsIgnoreCase(classOf[RegExTopicGenerator].getName)).
+ foldLeft(config)(Util.applyRewriter(_, _))
+ case _ => config
+ }
+ }
+
+ // Expand regex input, if a regex-rewriter is defined in config
+ val inputSystemStreams = invokeRegexTopicRewriter(config).getInputStreams
// Get the set of partitions for each SystemStream from the stream metadata
streamMetadataCache
diff --git a/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala b/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala
index abd6942..565ec88 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala
@@ -88,7 +88,6 @@ class StreamMetadataCache (
/**
* Returns the list of System Streams for this system.
* @param systemName
- * @param pattern
*/
def getAllSystemStreams(systemName: String): mutable.Set[SystemStream] = {
val systemAdmin = systemAdmins.getSystemAdmin(systemName)
diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
index eba6930..e31d82f 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
@@ -148,18 +148,25 @@ object Util extends Logging {
* @return re-written config
*/
def rewriteConfig(config: Config): Config = {
- def rewrite(c: Config, rewriterName: String): Config = {
- val rewriterClassName = config
- .getConfigRewriterClass(rewriterName)
- .getOrElse(throw new SamzaException("Unable to find class config
for config rewriter %s." format rewriterName))
- val rewriter = Util.getObj(rewriterClassName, classOf[ConfigRewriter])
- info("Re-writing config with " + rewriter)
- rewriter.rewrite(rewriterName, c)
- }
-
config.getConfigRewriters match {
- case Some(rewriters) => rewriters.split(",").foldLeft(config)(rewrite(_,
_))
+ case Some(rewriters) =>
rewriters.split(",").foldLeft(config)(applyRewriter(_, _))
case _ => config
}
}
+
+ /**
+ * Re-writes configuration using a ConfigRewriter, defined with the given
rewriterName in config.
+ * @param config the config to re-write
+ * @param rewriterName the name of the rewriter to apply
+ * @return the rewritten config
+ */
+ def applyRewriter(config: Config, rewriterName: String): Config = {
+ val rewriterClassName = config
+ .getConfigRewriterClass(rewriterName)
+ .getOrElse(throw new SamzaException("Unable to find class config for
config rewriter %s." format rewriterName))
+ val rewriter = ReflectionUtil.getObj(this.getClass.getClassLoader,
rewriterClassName, classOf[ConfigRewriter])
+ info("Re-writing config with " + rewriter)
+ rewriter.rewrite(rewriterName, config)
+ }
+
}
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index ff71c92..c5eb40c 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -238,12 +238,6 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
}.toMap
}
- // regex resolver
- def getRegexResolvedStreams(rewriterName: String) =
getOption(JobConfig.REGEX_RESOLVED_STREAMS format rewriterName)
-
- def getRegexResolvedSystem(rewriterName: String) =
getOption(JobConfig.REGEX_RESOLVED_SYSTEM format rewriterName)
-
- def getRegexResolvedInheritedConfig(rewriterName: String) =
config.subset((JobConfig.REGEX_INHERITED_CONFIG format rewriterName) + ".",
true)
/**
* Gets the replication factor for the changelog topics. Uses the following
precedence.
diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestRegExTopicGenerator.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestRegExTopicGenerator.scala
index 8b19292..16a3985 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/config/TestRegExTopicGenerator.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestRegExTopicGenerator.scala
@@ -20,10 +20,8 @@
package org.apache.samza.config
import collection.JavaConverters._
-
import org.junit.Assert._
import org.junit.Test
-
import JobConfig._
class TestRegExTopicGenerator {
@@ -48,7 +46,7 @@ class TestRegExTopicGenerator {
// Don't actually talk to ZooKeeper
val rewriter = new RegExTopicGenerator() {
- override def getTopicsFromZK(rewriterName: String, config: Config):
Seq[String] = List("catdog", "dogtired", "cow", "scaredycat", "Homer",
"crazycat")
+ override def getTopicsFromSystemAdmin(rewriterName: String, config:
Config): Seq[String] = List("catdog", "dogtired", "cow", "scaredycat", "Homer",
"crazycat")
}
val rewritten = rewriter.rewrite(REWRITER_NAME, config)
@@ -79,7 +77,7 @@ class TestRegExTopicGenerator {
getRegexConfigInherited + ".config.zorp" -> "morp")
val rewriter = new RegExTopicGenerator() {
- override def getTopicsFromZK(rewriterName: String, config: Config):
Seq[String] = List("yoyoyo")
+ override def getTopicsFromSystemAdmin(rewriterName: String, config:
Config): Seq[String] = List("yoyoyo")
}
val config = rewriter.rewrite(REWRITER_NAME, new MapConfig(map.asJava))