Repository: samza Updated Branches: refs/heads/master 531b35e9f -> aac736060
SAMZA-1929: Automatically adjust changelog topic level settings, when RocksDB TTL is set When a TTL is set on a RocksDB store, the default use of a log-compacted topic for its changelog isn't appropriate; ideally, the changelog topic should expire data in line with the store's TTL. So we adjust the changelog topic settings when 1) a TTL is set for the RocksDB store, 2) the changelog is enabled, and 3) the user has not set these topic settings explicitly. In that case we 1) disable log compaction on the Kafka changelog topic (cleanup.policy=delete) and 2) set the topic retention (retention.ms) to the same value as the RocksDB TTL. Users should be able to override this behavior through TableDescriptor.withConfig(). Author: Wei Song <[email protected]> Reviewers: Prateek Maheshwari <[email protected]> Closes #682 from weisong44/SAMZA-1929 and squashes the following commits: bedfd280 [Wei Song] Merge branch 'master' into SAMZA-1929 097958c8 [Wei Song] Merge remote-tracking branch 'upstream/master' 9c35a0e1 [Wei Song] Updated comments to indicate cleanup.policy=compact,delete is available in 0.11.1.57 and later, for now use delete 9c7410cc [Wei Song] Merge branch 'master' into SAMZA-1929 05822f0a [Wei Song] Merge remote-tracking branch 'upstream/master' dbda1c17 [Wei Song] SAMZA-1929: Automatically adjust changelog topic level settings, when RocksDB TTL is set f7480505 [Wei Song] Merge remote-tracking branch 'upstream/master' 7706ab1f [Wei Song] Merge remote-tracking branch 'upstream/master' f5731b10 [Wei Song] Merge remote-tracking branch 'upstream/master' 1e5de45a [Wei Song] Merge remote-tracking branch 'upstream/master' c85604e0 [Wei Song] Merge remote-tracking branch 'upstream/master' 242d8442 [Wei Song] Merge remote-tracking branch 'upstream/master' ec7d8409 [Wei Song] Merge remote-tracking branch 'upstream/master' e19b4dc9 [Wei Song] Merge remote-tracking branch 'upstream/master' 8ee78441 [Wei Song] Merge remote-tracking branch 'upstream/master' 1c6a2eae [Wei Song] Merge remote-tracking branch 'upstream/master' a6c94add [Wei Song] Merge remote-tracking branch 'upstream/master' 41299b5b [Wei Song] Merge 
remote-tracking branch 'upstream/master' 239a0950 [Wei Song] Merge remote-tracking branch 'upstream/master' eca00204 [Wei Song] Merge remote-tracking branch 'upstream/master' 51562391 [Wei Song] Merge remote-tracking branch 'upstream/master' de708f5e [Wei Song] Merge remote-tracking branch 'upstream/master' df2f8d7b [Wei Song] Merge remote-tracking branch 'upstream/master' f28b491d [Wei Song] Merge remote-tracking branch 'upstream/master' 4782c61d [Wei Song] Merge remote-tracking branch 'upstream/master' 0440f75f [Wei Song] Merge remote-tracking branch 'upstream/master' aae0f380 [Wei Song] Merge remote-tracking branch 'upstream/master' a15a7c9a [Wei Song] Merge remote-tracking branch 'upstream/master' 5cbf9af9 [Wei Song] Merge remote-tracking branch 'upstream/master' 3f7ed71f [Wei Song] Added self to committer list Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/aac73606 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/aac73606 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/aac73606 Branch: refs/heads/master Commit: aac73606096f39eaa6041d82617f114bad0df515 Parents: 531b35e Author: Wei Song <[email protected]> Authored: Fri Oct 5 15:23:37 2018 -0700 Committer: Wei Song <[email protected]> Committed: Fri Oct 5 15:23:37 2018 -0700 ---------------------------------------------------------------------- .../org/apache/samza/config/KafkaConfig.scala | 25 +++++++++++++------- .../apache/samza/config/TestKafkaConfig.scala | 18 ++++++++++++-- 2 files changed, 33 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/aac73606/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala 
b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala index ef43e72..e5cca36 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala @@ -251,15 +251,24 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) { val kafkaChangeLogProperties = new Properties val appConfig = new ApplicationConfig(config) - // SAMZA-1600: do not use the combination of "compact,delete" as cleanup policy until we pick up Kafka broker 0.11.0.3, + // SAMZA-1600: do not use the combination of "compact,delete" as cleanup policy until we pick up Kafka broker 0.11.0.57, // 1.0.2, or 1.1.0 (see KAFKA-6568) - // if (appConfig.getAppMode == ApplicationMode.STREAM) { - // kafkaChangeLogProperties.setProperty("cleanup.policy", "compact") - // } else{ - // kafkaChangeLogProperties.setProperty("cleanup.policy", "compact,delete") - // kafkaChangeLogProperties.setProperty("retention.ms", String.valueOf(KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH)) - // } - kafkaChangeLogProperties.setProperty("cleanup.policy", "compact") + + // Adjust changelog topic setting, when TTL is set on a RocksDB store + // - Disable log compaction on Kafka changelog topic + // - Set topic TTL to be the same as RocksDB TTL + Option(config.get("stores.%s.rocksdb.ttl.ms" format name)) match { + case Some(rocksDbTtl) => + if (!config.containsKey("stores.%s.changelog.kafka.cleanup.policy" format name)) { + kafkaChangeLogProperties.setProperty("cleanup.policy", "delete") + if (!config.containsKey("stores.%s.changelog.kafka.retention.ms" format name)) { + kafkaChangeLogProperties.setProperty("retention.ms", String.valueOf(rocksDbTtl)) + } + } + case _ => + kafkaChangeLogProperties.setProperty("cleanup.policy", "compact") + } + kafkaChangeLogProperties.setProperty("segment.bytes", KafkaConfig.CHANGELOG_DEFAULT_SEGMENT_SIZE) kafkaChangeLogProperties.setProperty("delete.retention.ms", String.valueOf(new 
StorageConfig(config).getChangeLogDeleteRetentionInMs(name))) filteredConfigs.asScala.foreach { kv => kafkaChangeLogProperties.setProperty(kv._1, kv._2) } http://git-wip-us.apache.org/repos/asf/samza/blob/aac73606/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala index a4fe686..b8467b8 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala @@ -120,6 +120,11 @@ class TestKafkaConfig { props.setProperty("job.changelog.system", "kafka") props.setProperty("stores.test3.changelog", "otherstream") props.setProperty("stores.test1.changelog.kafka.cleanup.policy", "delete") + props.setProperty("stores.test4.rocksdb.ttl.ms", "3600") + props.setProperty("stores.test5.rocksdb.ttl.ms", "3600") + props.setProperty("stores.test5.changelog.kafka.retention.ms", "1000") + props.setProperty("stores.test6.rocksdb.ttl.ms", "3600") + props.setProperty("stores.test6.changelog.kafka.cleanup.policy", "compact") val mapConfig = new MapConfig(props.asScala.asJava) val kafkaConfig = new KafkaConfig(mapConfig) @@ -130,15 +135,24 @@ class TestKafkaConfig { assertEquals("mychangelog1", storeToChangelog.get("test1").getOrElse("")) assertEquals("mychangelog2", storeToChangelog.get("test2").getOrElse("")) assertEquals("otherstream", storeToChangelog.get("test3").getOrElse("")) + assertNull(kafkaConfig.getChangelogKafkaProperties("test1").getProperty("retention.ms")) + assertNull(kafkaConfig.getChangelogKafkaProperties("test2").getProperty("retention.ms")) props.setProperty("systems." 
+ SYSTEM_NAME + ".samza.factory", "org.apache.samza.system.kafka.SomeOtherFactory") - val mapConfig1 = new MapConfig(props.asScala.asJava) - val kafkaConfig1 = new KafkaConfig(mapConfig) val storeToChangelog1 = kafkaConfig.getKafkaChangelogEnabledStores() assertEquals("mychangelog1", storeToChangelog1.get("test1").getOrElse("")) assertEquals("mychangelog2", storeToChangelog1.get("test2").getOrElse("")) assertEquals("otherstream", storeToChangelog1.get("test3").getOrElse("")) + assertEquals(kafkaConfig.getChangelogKafkaProperties("test4").getProperty("cleanup.policy"), "delete") + assertEquals(kafkaConfig.getChangelogKafkaProperties("test4").getProperty("retention.ms"), "3600") + + assertEquals(kafkaConfig.getChangelogKafkaProperties("test5").getProperty("cleanup.policy"), "delete") + assertEquals(kafkaConfig.getChangelogKafkaProperties("test5").getProperty("retention.ms"), "1000") + + assertEquals(kafkaConfig.getChangelogKafkaProperties("test6").getProperty("cleanup.policy"), "compact") + assertNull(kafkaConfig.getChangelogKafkaProperties("test6").getProperty("retention.ms")) + props.setProperty(ApplicationConfig.APP_MODE, ApplicationConfig.ApplicationMode.BATCH.name()) val batchMapConfig = new MapConfig(props.asScala.asJava) val batchKafkaConfig = new KafkaConfig(batchMapConfig)
