Repository: samza Updated Branches: refs/heads/master a47e8819f -> ec1934ca8
SAMZA-1076; getKafkaChangelogEnabledStores() should use StorageConfig.getChangelo… getKafkaChangelogEnabledStores() should use StorageConfig.getChangelogStream to get changelog system.stream Author: Boris Shkolnik <[email protected]> Reviewers: xiliu <[email protected]> Closes #39 from sborya/KafkConfigForChangelogStream Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/ec1934ca Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/ec1934ca Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/ec1934ca Branch: refs/heads/master Commit: ec1934ca895a57bd2199bd0a34413284914718d9 Parents: a47e881 Author: Boris Shkolnik <[email protected]> Authored: Fri Jan 6 12:22:16 2017 -0800 Committer: vjagadish1989 <[email protected]> Committed: Fri Jan 6 12:22:16 2017 -0800 ---------------------------------------------------------------------- .../scala/org/apache/samza/config/KafkaConfig.scala | 13 +++++++++---- .../org/apache/samza/config/TestKafkaConfig.scala | 8 ++++++-- 2 files changed, 15 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/ec1934ca/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala index 973ab8c..9320cf7 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala @@ -121,14 +121,19 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) { def getKafkaChangelogEnabledStores() = { val changelogConfigs = config.regexSubset(KafkaConfig.CHANGELOG_STREAM_NAMES_REGEX).asScala var storeToChangelog = Map[String, String]() - for((changelogConfig, 
changelogName) <- changelogConfigs){ + val storageConfig = new StorageConfig(config) + val pattern = Pattern.compile(KafkaConfig.CHANGELOG_STREAM_NAMES_REGEX) + + for((changelogConfig, cn) <- changelogConfigs){ // Lookup the factory for this particular stream and verify if it's a kafka system + + val matcher = pattern.matcher(changelogConfig) + val storeName = if(matcher.find()) matcher.group(1) else throw new SamzaException("Unable to find store name in the changelog configuration: " + changelogConfig + " with SystemStream: " + cn) + + val changelogName = storageConfig.getChangelogStream(storeName).getOrElse(throw new SamzaException("unable to get SystemStream for store:" + changelogConfig)); val systemStream = Util.getSystemStreamFromNames(changelogName) val factoryName = config.getSystemFactory(systemStream.getSystem).getOrElse(new SamzaException("Unable to determine factory for system: " + systemStream.getSystem)) if(classOf[KafkaSystemFactory].getCanonicalName == factoryName){ - val pattern = Pattern.compile(KafkaConfig.CHANGELOG_STREAM_NAMES_REGEX) - val matcher = pattern.matcher(changelogConfig) - val storeName = if(matcher.find()) matcher.group(1) else throw new SamzaException("Unable to find store name in the changelog configuration: " + changelogConfig + " with SystemStream: " + systemStream) storeToChangelog += storeName -> systemStream.getStream } } http://git-wip-us.apache.org/repos/asf/samza/blob/ec1934ca/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala index 9c9c71e..d626f1c 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala @@ -121,15 +121,19 @@ class TestKafkaConfig { 
props.setProperty("systems." + SYSTEM_NAME + ".samza.factory", "org.apache.samza.system.kafka.KafkaSystemFactory") props.setProperty("stores.test1.changelog", "kafka.mychangelog1") props.setProperty("stores.test2.changelog", "kafka.mychangelog2") + props.setProperty("job.changelog.system", "kafka") + props.setProperty("stores.test3.changelog", "otherstream") props.setProperty("stores.test1.changelog.kafka.cleanup.policy", "delete") val mapConfig = new MapConfig(props.toMap[String, String]) val kafkaConfig = new KafkaConfig(mapConfig) assertEquals(kafkaConfig.getChangelogKafkaProperties("test1").getProperty("cleanup.policy"), "delete") assertEquals(kafkaConfig.getChangelogKafkaProperties("test2").getProperty("cleanup.policy"), "compact") + assertEquals(kafkaConfig.getChangelogKafkaProperties("test3").getProperty("cleanup.policy"), "compact") val storeToChangelog = kafkaConfig.getKafkaChangelogEnabledStores() - assertEquals(storeToChangelog.get("test1").getOrElse(""), "mychangelog1") - assertEquals(storeToChangelog.get("test2").getOrElse(""), "mychangelog2") + assertEquals("mychangelog1", storeToChangelog.get("test1").getOrElse("")) + assertEquals("mychangelog2", storeToChangelog.get("test2").getOrElse("")) + assertEquals("otherstream", storeToChangelog.get("test3").getOrElse("")) } @Test
