Repository: samza
Updated Branches:
  refs/heads/master a47e8819f -> ec1934ca8


SAMZA-1076; getKafkaChangelogEnabledStores() should use 
StorageConfig.getChangelogStream

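getKafkaChangelogEnabledStores() should use StorageConfig.getChangelogStream to 
get each store's changelog system and stream (system.stream), instead of using 
the raw stores.*.changelog config value directly.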

Author: Boris Shkolnik <[email protected]>

Reviewers: xiliu <[email protected]>

Closes #39 from sborya/KafkConfigForChangelogStream


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/ec1934ca
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/ec1934ca
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/ec1934ca

Branch: refs/heads/master
Commit: ec1934ca895a57bd2199bd0a34413284914718d9
Parents: a47e881
Author: Boris Shkolnik <[email protected]>
Authored: Fri Jan 6 12:22:16 2017 -0800
Committer: vjagadish1989 <[email protected]>
Committed: Fri Jan 6 12:22:16 2017 -0800

----------------------------------------------------------------------
 .../scala/org/apache/samza/config/KafkaConfig.scala    | 13 +++++++++----
 .../org/apache/samza/config/TestKafkaConfig.scala      |  8 ++++++--
 2 files changed, 15 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/ec1934ca/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala 
b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index 973ab8c..9320cf7 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -121,14 +121,19 @@ class KafkaConfig(config: Config) extends 
ScalaMapConfig(config) {
   def getKafkaChangelogEnabledStores() = {
     val changelogConfigs = 
config.regexSubset(KafkaConfig.CHANGELOG_STREAM_NAMES_REGEX).asScala
     var storeToChangelog = Map[String, String]()
-    for((changelogConfig, changelogName) <- changelogConfigs){
+    val storageConfig = new StorageConfig(config)
+    val pattern = Pattern.compile(KafkaConfig.CHANGELOG_STREAM_NAMES_REGEX)
+
+    for((changelogConfig, cn) <- changelogConfigs){
       // Lookup the factory for this particular stream and verify if it's a 
kafka system
+
+      val matcher = pattern.matcher(changelogConfig)
+      val storeName = if(matcher.find()) matcher.group(1) else throw new 
SamzaException("Unable to find store name in the changelog configuration: " + 
changelogConfig + " with SystemStream: " + cn)
+
+      val changelogName = 
storageConfig.getChangelogStream(storeName).getOrElse(throw new 
SamzaException("unable to get SystemStream for store:" + changelogConfig));
       val systemStream = Util.getSystemStreamFromNames(changelogName)
       val factoryName = 
config.getSystemFactory(systemStream.getSystem).getOrElse(new 
SamzaException("Unable to determine factory for system: " + 
systemStream.getSystem))
       if(classOf[KafkaSystemFactory].getCanonicalName == factoryName){
-        val pattern = Pattern.compile(KafkaConfig.CHANGELOG_STREAM_NAMES_REGEX)
-        val matcher = pattern.matcher(changelogConfig)
-        val storeName = if(matcher.find()) matcher.group(1) else throw new 
SamzaException("Unable to find store name in the changelog configuration: " + 
changelogConfig + " with SystemStream: " + systemStream)
         storeToChangelog += storeName -> systemStream.getStream
       }
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/ec1934ca/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala 
b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
index 9c9c71e..d626f1c 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
@@ -121,15 +121,19 @@ class TestKafkaConfig {
     props.setProperty("systems." + SYSTEM_NAME + ".samza.factory", 
"org.apache.samza.system.kafka.KafkaSystemFactory")
     props.setProperty("stores.test1.changelog", "kafka.mychangelog1")
     props.setProperty("stores.test2.changelog", "kafka.mychangelog2")
+    props.setProperty("job.changelog.system", "kafka")
+    props.setProperty("stores.test3.changelog", "otherstream")
     props.setProperty("stores.test1.changelog.kafka.cleanup.policy", "delete")
     
     val mapConfig = new MapConfig(props.toMap[String, String])
     val kafkaConfig = new KafkaConfig(mapConfig)
     
assertEquals(kafkaConfig.getChangelogKafkaProperties("test1").getProperty("cleanup.policy"),
 "delete")
     
assertEquals(kafkaConfig.getChangelogKafkaProperties("test2").getProperty("cleanup.policy"),
 "compact")
+    
assertEquals(kafkaConfig.getChangelogKafkaProperties("test3").getProperty("cleanup.policy"),
 "compact")
     val storeToChangelog = kafkaConfig.getKafkaChangelogEnabledStores()
-    assertEquals(storeToChangelog.get("test1").getOrElse(""), "mychangelog1")
-    assertEquals(storeToChangelog.get("test2").getOrElse(""), "mychangelog2")
+    assertEquals("mychangelog1", storeToChangelog.get("test1").getOrElse(""))
+    assertEquals("mychangelog2", storeToChangelog.get("test2").getOrElse(""))
+    assertEquals("otherstream", storeToChangelog.get("test3").getOrElse(""))
   }
   
   @Test

Reply via email to