Repository: samza Updated Branches: refs/heads/master cc1ca2c9d -> e6049b7dd
SAMZA-1457: Set retention for internal streams for Batch application

For intermediate streams, checkpoint and changelog, we need to set a short retention period for batch.

Author: Xinyu Liu <[email protected]>
Author: xiliu <[email protected]>

Reviewers: Jagadish V <[email protected]>

Closes #328 from xinyuiscool/SAMZA-1457

Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/e6049b7d
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/e6049b7d
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/e6049b7d

Branch: refs/heads/master
Commit: e6049b7ddca385b5a1a363020b9d711f4a0b93b6
Parents: cc1ca2c
Author: Xinyu Liu <[email protected]>
Authored: Tue Oct 24 18:20:32 2017 -0700
Committer: xiliu <[email protected]>
Committed: Tue Oct 24 18:20:32 2017 -0700

----------------------------------------------------------------------
 .../samza/system/kafka/KafkaStreamSpec.java | 9 +++
 .../kafka/KafkaCheckpointManagerFactory.scala | 21 ++-----
 .../org/apache/samza/config/KafkaConfig.scala | 37 +++++++++++-
 .../samza/system/kafka/KafkaSystemAdmin.scala | 9 ++-
 .../samza/system/kafka/KafkaSystemFactory.scala | 22 ++++++-
 .../TestKafkaCheckpointManagerFactory.java | 51 +++++++++++++++++
 .../kafka/TestKafkaSystemFactoryJava.java | 60 ++++++++++++++++++++
 .../kafka/TestKafkaCheckpointManager.scala | 6 +-
 .../apache/samza/config/TestKafkaConfig.scala | 13 +++++
 9 files changed, 204 insertions(+), 24 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/samza/blob/e6049b7d/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java
index a49c022..de7b7b0 100644
---
a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java +++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java @@ -167,6 +167,15 @@ public class KafkaStreamSpec extends StreamSpec { return new KafkaStreamSpec(getId(), getPhysicalName(), getSystemName(), partitionCount, getReplicationFactor(), getProperties()); } + /** + * Make a copy of the spec with new properties + * @param properties properties of the Kafka stream + * @return new instance of {@link KafkaStreamSpec} + */ + public KafkaStreamSpec copyWithProperties(Properties properties) { + return new KafkaStreamSpec(getId(), getPhysicalName(), getSystemName(), getPartitionCount(), getReplicationFactor(), properties); + } + public int getReplicationFactor() { return replicationFactor; } http://git-wip-us.apache.org/repos/asf/samza/blob/e6049b7d/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala index 402248f..8ac347c 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala @@ -21,11 +21,13 @@ package org.apache.samza.checkpoint.kafka import java.util.Properties +import com.google.common.collect.ImmutableMap import kafka.utils.ZkUtils import org.apache.samza.SamzaException import org.apache.samza.checkpoint.{CheckpointManager, CheckpointManagerFactory} +import org.apache.samza.config.ApplicationConfig.ApplicationMode import org.apache.samza.config.JobConfig.Config2Job -import org.apache.samza.config.{Config, KafkaConfig, SystemConfig} +import org.apache.samza.config._ import 
org.apache.samza.metrics.MetricsRegistry import org.apache.samza.system.SystemFactory import org.apache.samza.util.{ClientUtilTopicMetadataStore, KafkaUtil, Logging, Util, _} @@ -38,21 +40,6 @@ object KafkaCheckpointManagerFactory { // on log compacted topics. Details in SAMZA-586. "compression.type" -> "none") - // Set the checkpoint topic configs to have a very small segment size and - // enable log compaction. This keeps job startup time small since there - // are fewer useless (overwritten) messages to read from the checkpoint - // topic. - def getCheckpointTopicProperties(config: Config) = { - val segmentBytes: Int = if (config == null) { - KafkaConfig.DEFAULT_CHECKPOINT_SEGMENT_BYTES - } else { - new KafkaConfig(config).getCheckpointSegmentBytes() - } - (new Properties /: Map( - "cleanup.policy" -> "compact", - "segment.bytes" -> String.valueOf(segmentBytes))) { case (props, (k, v)) => props.put(k, v); props } - } - /** * Get the checkpoint system and system factory from the configuration * @param config @@ -113,6 +100,6 @@ class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Loggin connectZk, config.getSystemStreamPartitionGrouperFactory, // To find out the SSPGrouperFactory class so it can be included/verified in the key config.failOnCheckpointValidation, - checkpointTopicProperties = getCheckpointTopicProperties(config)) + checkpointTopicProperties = new KafkaConfig(config).getCheckpointTopicProperties()) } } http://git-wip-us.apache.org/repos/asf/samza/blob/e6049b7d/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala index 9c33b16..1c1cdbd 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala @@ 
-21,13 +21,16 @@ package org.apache.samza.config import java.util +import java.util.concurrent.TimeUnit import java.util.regex.Pattern import java.util.{Properties, UUID} +import com.google.common.collect.ImmutableMap import kafka.consumer.ConsumerConfig import org.apache.kafka.clients.producer.ProducerConfig import org.apache.kafka.common.serialization.ByteArraySerializer import org.apache.samza.SamzaException +import org.apache.samza.config.ApplicationConfig.ApplicationMode import org.apache.samza.config.SystemConfig.Config2System import org.apache.samza.util.{Logging, Util} @@ -76,6 +79,8 @@ object KafkaConfig { */ val CONSUMER_FETCH_THRESHOLD_BYTES = SystemConfig.SYSTEM_PREFIX + "samza.fetch.threshold.bytes" + val DEFAULT_RETENTION_MS_FOR_BATCH = TimeUnit.DAYS.toMillis(1) + implicit def Config2Kafka(config: Config) = new KafkaConfig(config) } @@ -243,13 +248,43 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) { def getChangelogKafkaProperties(name: String) = { val filteredConfigs = config.subset(KafkaConfig.CHANGELOG_STREAM_KAFKA_SETTINGS format name, true) val kafkaChangeLogProperties = new Properties - kafkaChangeLogProperties.setProperty("cleanup.policy", "compact") + + val appConfig = new ApplicationConfig(config) + if (appConfig.getAppMode == ApplicationMode.STREAM) { + kafkaChangeLogProperties.setProperty("cleanup.policy", "compact") + } else{ + kafkaChangeLogProperties.setProperty("cleanup.policy", "compact,delete") + kafkaChangeLogProperties.setProperty("retention.ms", String.valueOf(KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH)) + } kafkaChangeLogProperties.setProperty("segment.bytes", KafkaConfig.CHANGELOG_DEFAULT_SEGMENT_SIZE) kafkaChangeLogProperties.setProperty("delete.retention.ms", String.valueOf(new StorageConfig(config).getChangeLogDeleteRetentionInMs(name))) filteredConfigs.asScala.foreach { kv => kafkaChangeLogProperties.setProperty(kv._1, kv._2) } kafkaChangeLogProperties } + // Set the checkpoint topic configs to have a 
very small segment size and + // enable log compaction. This keeps job startup time small since there + // are fewer useless (overwritten) messages to read from the checkpoint + // topic. + def getCheckpointTopicProperties() = { + val segmentBytes: Int = getCheckpointSegmentBytes() + val appConfig = new ApplicationConfig(config) + val isStreamMode = appConfig.getAppMode == ApplicationMode.STREAM + val properties = new Properties() + + if (isStreamMode) { + properties.putAll(ImmutableMap.of( + "cleanup.policy", "compact", + "segment.bytes", String.valueOf(segmentBytes))) + } else { + properties.putAll(ImmutableMap.of( + "cleanup.policy", "compact,delete", + "retention.ms", String.valueOf(KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH), + "segment.bytes", String.valueOf(segmentBytes))) + } + properties + } + // kafka config def getKafkaSystemConsumerConfig( systemName: String, clientId: String, http://git-wip-us.apache.org/repos/asf/samza/blob/e6049b7d/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala index a2256c8..013b292 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala @@ -139,7 +139,12 @@ class KafkaSystemAdmin( * Replication factor for the Changelog topic in kafka * Kafka properties to be used during the Changelog topic creation */ - topicMetaInformation: Map[String, ChangelogInfo] = Map[String, ChangelogInfo]()) extends ExtendedSystemAdmin with Logging { + topicMetaInformation: Map[String, ChangelogInfo] = Map[String, ChangelogInfo](), + + /** + * Kafka properties to be used during the intermediate topic creation + */ + intermediateStreamProperties: Map[String, 
Properties] = Map()) extends ExtendedSystemAdmin with Logging { import KafkaSystemAdmin._ @@ -450,6 +455,8 @@ class KafkaSystemAdmin( new KafkaStreamSpec(spec.getId, topicName, systemName, spec.getPartitionCount, topicMeta.replicationFactor, topicMeta.kafkaProps) } else if (spec.isCoordinatorStream){ new KafkaStreamSpec(spec.getId, spec.getPhysicalName, systemName, 1, coordinatorStreamReplicationFactor, coordinatorStreamProperties) + } else if (intermediateStreamProperties.contains(spec.getId)) { + KafkaStreamSpec.fromSpec(spec).copyWithProperties(intermediateStreamProperties(spec.getId)) } else { KafkaStreamSpec.fromSpec(spec) } http://git-wip-us.apache.org/repos/asf/samza/blob/e6049b7d/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala index e0b5540..a480042 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala @@ -22,8 +22,9 @@ package org.apache.samza.system.kafka import java.util.Properties import kafka.utils.ZkUtils import org.apache.samza.SamzaException +import org.apache.samza.config.ApplicationConfig.ApplicationMode import org.apache.samza.util.{Logging, KafkaUtil, ExponentialSleepStrategy, ClientUtilTopicMetadataStore} -import org.apache.samza.config.Config +import org.apache.samza.config.{KafkaConfig, ApplicationConfig, StreamConfig, Config} import org.apache.samza.metrics.MetricsRegistry import org.apache.samza.config.KafkaConfig.Config2Kafka import org.apache.samza.config.TaskConfig.Config2Task @@ -125,6 +126,8 @@ class KafkaSystemFactory extends SystemFactory with Logging { (topicName, changelogInfo) }}.toMap + + val intermediateStreamProperties: 
Map[String, Properties] = getIntermediateStreamProperties(config) new KafkaSystemAdmin( systemName, bootstrapServers, @@ -134,7 +137,8 @@ class KafkaSystemFactory extends SystemFactory with Logging { timeout, bufferSize, clientId, - topicMetaInformation) + topicMetaInformation, + intermediateStreamProperties) } def getCoordinatorTopicProperties(config: Config) = { @@ -144,4 +148,18 @@ class KafkaSystemFactory extends SystemFactory with Logging { "segment.bytes" -> segmentBytes)) { case (props, (k, v)) => props.put(k, v); props } } + def getIntermediateStreamProperties(config : Config): Map[String, Properties] = { + val appConfig = new ApplicationConfig(config) + if (appConfig.getAppMode == ApplicationMode.BATCH) { + val streamConfig = new StreamConfig(config) + streamConfig.getStreamIds().filter(streamConfig.getIsIntermediate(_)).map(streamId => { + val properties = new Properties() + properties.putAll(streamConfig.getStreamProperties(streamId)) + properties.putIfAbsent("retention.ms", String.valueOf(KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH)) + (streamId, properties) + }).toMap + } else { + Map() + } + } } http://git-wip-us.apache.org/repos/asf/samza/blob/e6049b7d/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaCheckpointManagerFactory.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaCheckpointManagerFactory.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaCheckpointManagerFactory.java new file mode 100644 index 0000000..1846ea8 --- /dev/null +++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaCheckpointManagerFactory.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.system.kafka; + +import org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory; +import org.apache.samza.config.ApplicationConfig; +import org.apache.samza.config.KafkaConfig; +import org.apache.samza.config.MapConfig; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +import static org.junit.Assert.assertEquals; + +public class TestKafkaCheckpointManagerFactory { + + @Test + public void testGetCheckpointTopicProperties() { + Map<String, String> config = new HashMap<>(); + Properties properties = new KafkaConfig(new MapConfig(config)).getCheckpointTopicProperties(); + + assertEquals(properties.getProperty("cleanup.policy"), "compact"); + assertEquals(properties.getProperty("segment.bytes"), String.valueOf(KafkaConfig.DEFAULT_CHECKPOINT_SEGMENT_BYTES())); + + config.put(ApplicationConfig.APP_MODE, ApplicationConfig.ApplicationMode.BATCH.name()); + properties = new KafkaConfig(new MapConfig(config)).getCheckpointTopicProperties(); + + assertEquals(properties.getProperty("cleanup.policy"), "compact,delete"); + assertEquals(properties.getProperty("segment.bytes"), String.valueOf(KafkaConfig.DEFAULT_CHECKPOINT_SEGMENT_BYTES())); + assertEquals(properties.getProperty("retention.ms"), String.valueOf(KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH())); + } +} 
http://git-wip-us.apache.org/repos/asf/samza/blob/e6049b7d/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemFactoryJava.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemFactoryJava.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemFactoryJava.java new file mode 100644 index 0000000..a886bab --- /dev/null +++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemFactoryJava.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.system.kafka; + +import org.apache.samza.config.ApplicationConfig; +import org.apache.samza.config.KafkaConfig; +import org.apache.samza.config.MapConfig; +import org.junit.Test; +import scala.collection.JavaConversions; + +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestKafkaSystemFactoryJava { + + @Test + public void testGetIntermediateStreamProperties() { + Map<String, String> config = new HashMap<>(); + KafkaSystemFactory factory = new KafkaSystemFactory(); + Map<String, Properties> properties = JavaConversions.mapAsJavaMap( + factory.getIntermediateStreamProperties(new MapConfig(config))); + assertTrue(properties.isEmpty()); + + // no properties for stream + config.put("streams.test.samza.intermediate", "true"); + config.put("streams.test.compression.type", "lz4"); //some random config + properties = JavaConversions.mapAsJavaMap( + factory.getIntermediateStreamProperties(new MapConfig(config))); + assertTrue(properties.isEmpty()); + + config.put(ApplicationConfig.APP_MODE, ApplicationConfig.ApplicationMode.BATCH.name()); + properties = JavaConversions.mapAsJavaMap( + factory.getIntermediateStreamProperties(new MapConfig(config))); + assertTrue(!properties.isEmpty()); + Properties prop = properties.get("test"); + assertEquals(prop.getProperty("retention.ms"), String.valueOf(KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH())); + assertEquals(prop.getProperty("compression.type"), "lz4"); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/e6049b7d/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala 
b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala index 3337a36..eba2033 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala @@ -56,7 +56,7 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness { val checkpointTopic = "checkpoint-topic" val serdeCheckpointTopic = "checkpoint-topic-invalid-serde" - val checkpointTopicConfig = KafkaCheckpointManagerFactory.getCheckpointTopicProperties(null) + val checkpointTopicConfig = new org.apache.samza.config.KafkaConfig(new MapConfig()).getCheckpointTopicProperties() val zkSecure = JaasUtils.isZkSecurityEnabled() @@ -307,7 +307,7 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness { connectZk = () => ZkUtils(zkConnect, 6000, 6000, zkSecure), systemStreamPartitionGrouperFactoryString = systemStreamPartitionGrouperFactoryString, failOnCheckpointValidation = failOnTopicValidation, - checkpointTopicProperties = KafkaCheckpointManagerFactory.getCheckpointTopicProperties(new MapConfig(Map[String, String]().asJava))) + checkpointTopicProperties = new org.apache.samza.config.KafkaConfig(new MapConfig()).getCheckpointTopicProperties()) // CheckpointManager with a specific checkpoint topic private def getKafkaCheckpointManager = getKafkaCheckpointManagerWithParam(checkpointTopic) @@ -329,7 +329,7 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness { systemStreamPartitionGrouperFactoryString = systemStreamPartitionGrouperFactoryString, failOnCheckpointValidation = failOnTopicValidation, serde = new InvalideSerde(exception), - checkpointTopicProperties = KafkaCheckpointManagerFactory.getCheckpointTopicProperties(new MapConfig(Map[String, String]().asJava))) + checkpointTopicProperties = new org.apache.samza.config.KafkaConfig(new MapConfig()).getCheckpointTopicProperties()) class 
InvalideSerde(exception: String) extends CheckpointSerde { override def fromBytes(bytes: Array[Byte]): Checkpoint = { http://git-wip-us.apache.org/repos/asf/samza/blob/e6049b7d/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala index 0474cbe..19b2cc6 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala @@ -138,6 +138,19 @@ class TestKafkaConfig { assertEquals("mychangelog1", storeToChangelog1.get("test1").getOrElse("")) assertEquals("mychangelog2", storeToChangelog1.get("test2").getOrElse("")) assertEquals("otherstream", storeToChangelog1.get("test3").getOrElse("")) + + props.setProperty(ApplicationConfig.APP_MODE, ApplicationConfig.ApplicationMode.BATCH.name()) + val batchMapConfig = new MapConfig(props.asScala.asJava) + val batchKafkaConfig = new KafkaConfig(batchMapConfig) + assertEquals(batchKafkaConfig.getChangelogKafkaProperties("test1").getProperty("cleanup.policy"), "delete") + assertEquals(batchKafkaConfig.getChangelogKafkaProperties("test1").getProperty("retention.ms"), + String.valueOf(KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH)) + assertEquals(batchKafkaConfig.getChangelogKafkaProperties("test2").getProperty("cleanup.policy"), "compact,delete") + assertEquals(batchKafkaConfig.getChangelogKafkaProperties("test2").getProperty("retention.ms"), + String.valueOf(KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH)) + assertEquals(batchKafkaConfig.getChangelogKafkaProperties("test3").getProperty("cleanup.policy"), "compact,delete") + assertEquals(batchKafkaConfig.getChangelogKafkaProperties("test3").getProperty("retention.ms"), + String.valueOf(KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH)) } @Test
