Repository: samza Updated Branches: refs/heads/master 4c0e50581 -> 36c4c4b02
Minor: KafkaConfig should treat empty changelog name as no changelog. If a store changelog stream name is empty, treat it as a non-changelogged store instead of throwing an exception. Author: Prateek Maheshwari <[email protected]> Reviewers: Jagadish Venkatraman <[email protected]> Closes #546 from prateekm/kafka-changelog Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/36c4c4b0 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/36c4c4b0 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/36c4c4b0 Branch: refs/heads/master Commit: 36c4c4b02df36027fc3516407085175fb565bd70 Parents: 4c0e505 Author: Prateek Maheshwari <[email protected]> Authored: Wed Jun 6 11:00:06 2018 -0700 Committer: Prateek Maheshwari <[email protected]> Committed: Wed Jun 6 11:00:06 2018 -0700 ---------------------------------------------------------------------- .../main/scala/org/apache/samza/config/KafkaConfig.scala | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/36c4c4b0/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala index 124c85a..07f4710 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala @@ -236,10 +236,11 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) { val matcher = pattern.matcher(changelogConfig) val storeName = if (matcher.find()) matcher.group(1) else throw new SamzaException("Unable to find store name in the changelog configuration: " + changelogConfig + " with SystemStream: " + cn) - val changelogName = 
storageConfig.getChangelogStream(storeName).getOrElse(throw new SamzaException("unable to get SystemStream for store:" + changelogConfig)); - val systemStream = Util.getSystemStreamFromNames(changelogName) - val factoryName = config.getSystemFactory(systemStream.getSystem).getOrElse(new SamzaException("Unable to determine factory for system: " + systemStream.getSystem)) - storeToChangelog += storeName -> systemStream.getStream + storageConfig.getChangelogStream(storeName).foreach(changelogName => { + val systemStream = Util.getSystemStreamFromNames(changelogName) + val factoryName = config.getSystemFactory(systemStream.getSystem).getOrElse(new SamzaException("Unable to determine factory for system: " + systemStream.getSystem)) + storeToChangelog += storeName -> systemStream.getStream + }) } storeToChangelog }
