Repository: samza
Updated Branches:
  refs/heads/master 4c0e50581 -> 36c4c4b02


Minor: KafkaConfig should treat empty changelog name as no changelog.

If a store changelog stream name is empty, treat is as a non-changelogged store 
instead of throwing an exception.

Author: Prateek Maheshwari <[email protected]>

Reviewers: Jagadish Venkatraman <[email protected]>

Closes #546 from prateekm/kafka-changelog


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/36c4c4b0
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/36c4c4b0
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/36c4c4b0

Branch: refs/heads/master
Commit: 36c4c4b02df36027fc3516407085175fb565bd70
Parents: 4c0e505
Author: Prateek Maheshwari <[email protected]>
Authored: Wed Jun 6 11:00:06 2018 -0700
Committer: Prateek Maheshwari <[email protected]>
Committed: Wed Jun 6 11:00:06 2018 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/samza/config/KafkaConfig.scala    | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/36c4c4b0/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala 
b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index 124c85a..07f4710 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -236,10 +236,11 @@ class KafkaConfig(config: Config) extends 
ScalaMapConfig(config) {
       val matcher = pattern.matcher(changelogConfig)
       val storeName = if (matcher.find()) matcher.group(1) else throw new 
SamzaException("Unable to find store name in the changelog configuration: " + 
changelogConfig + " with SystemStream: " + cn)
 
-      val changelogName = 
storageConfig.getChangelogStream(storeName).getOrElse(throw new 
SamzaException("unable to get SystemStream for store:" + changelogConfig));
-      val systemStream = Util.getSystemStreamFromNames(changelogName)
-      val factoryName = 
config.getSystemFactory(systemStream.getSystem).getOrElse(new 
SamzaException("Unable to determine factory for system: " + 
systemStream.getSystem))
-      storeToChangelog += storeName -> systemStream.getStream
+      storageConfig.getChangelogStream(storeName).foreach(changelogName => {
+        val systemStream = Util.getSystemStreamFromNames(changelogName)
+        val factoryName = 
config.getSystemFactory(systemStream.getSystem).getOrElse(new 
SamzaException("Unable to determine factory for system: " + 
systemStream.getSystem))
+        storeToChangelog += storeName -> systemStream.getStream
+      })
     }
     storeToChangelog
   }

Reply via email to