Repository: samza Updated Branches: refs/heads/master e94abca72 -> 1b9391b9e
ChangeLog should not require KafkaSystemAdmin Author: Boris S <[email protected]> Reviewers: Xinyu Liu <[email protected]> Closes #325 from sborya/ChangeLogRequireKafkaSystemAdmin Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/1b9391b9 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/1b9391b9 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/1b9391b9 Branch: refs/heads/master Commit: 1b9391b9e23a38b898590148dfb3b1e503bdd447 Parents: e94abca Author: Boris S <[email protected]> Authored: Fri Oct 13 15:04:22 2017 -0700 Committer: Boris S <[email protected]> Committed: Fri Oct 13 15:04:22 2017 -0700 ---------------------------------------------------------------------- .../scala/org/apache/samza/storage/TaskStorageManager.scala | 3 ++- .../src/main/scala/org/apache/samza/config/KafkaConfig.scala | 5 +---- .../test/scala/org/apache/samza/config/TestKafkaConfig.scala | 8 ++++++++ 3 files changed, 11 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/1b9391b9/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala index 0879e9a..c8c935a 100644 --- a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala +++ b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala @@ -205,13 +205,14 @@ class TaskStorageManager( } private def validateChangelogStreams() = { - info("Validating change log streams") + info("Validating change log streams: " + changeLogSystemStreams) for ((storeName, systemStream) <- changeLogSystemStreams) { val systemAdmin = systemAdmins .getOrElse(systemStream.getSystem, throw new SamzaException("Unable to get systemAdmin for store " + storeName + " and systemStream" + systemStream)) val changelogSpec = StreamSpec.createChangeLogStreamSpec(systemStream.getStream, systemStream.getSystem, changeLogStreamPartitions) + systemAdmin.validateStream(changelogSpec) } http://git-wip-us.apache.org/repos/asf/samza/blob/1b9391b9/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala index d926dcb..9c33b16 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala @@ -29,7 +29,6 @@ import org.apache.kafka.clients.producer.ProducerConfig import org.apache.kafka.common.serialization.ByteArraySerializer import org.apache.samza.SamzaException import org.apache.samza.config.SystemConfig.Config2System -import org.apache.samza.system.kafka.KafkaSystemFactory import org.apache.samza.util.{Logging, Util} import scala.collection.JavaConverters._ @@ -235,9 +234,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) { val changelogName = storageConfig.getChangelogStream(storeName).getOrElse(throw new SamzaException("unable to get SystemStream for store:" + changelogConfig)); val systemStream = Util.getSystemStreamFromNames(changelogName) val factoryName = config.getSystemFactory(systemStream.getSystem).getOrElse(new SamzaException("Unable to determine factory for system: " + systemStream.getSystem)) - if (classOf[KafkaSystemFactory].getCanonicalName == factoryName) { - storeToChangelog += storeName -> systemStream.getStream - } + storeToChangelog += storeName -> systemStream.getStream } storeToChangelog } http://git-wip-us.apache.org/repos/asf/samza/blob/1b9391b9/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala index 9d1e99b..0474cbe 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala @@ -130,6 +130,14 @@ class TestKafkaConfig { assertEquals("mychangelog1", storeToChangelog.get("test1").getOrElse("")) assertEquals("mychangelog2", storeToChangelog.get("test2").getOrElse("")) assertEquals("otherstream", storeToChangelog.get("test3").getOrElse("")) + + props.setProperty("systems." + SYSTEM_NAME + ".samza.factory", "org.apache.samza.system.kafka.SomeOtherFactory") + val mapConfig1 = new MapConfig(props.asScala.asJava) + val kafkaConfig1 = new KafkaConfig(mapConfig) + val storeToChangelog1 = kafkaConfig.getKafkaChangelogEnabledStores() + assertEquals("mychangelog1", storeToChangelog1.get("test1").getOrElse("")) + assertEquals("mychangelog2", storeToChangelog1.get("test2").getOrElse("")) + assertEquals("otherstream", storeToChangelog1.get("test3").getOrElse("")) } @Test
