This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 213466b KAFKA-7190: KIP-443; Remove streams overrides on repartition topics (#6511) 213466b is described below commit 213466b3d4fd21b332c0b6882fea36cf1affef1c Author: Guozhang Wang <wangg...@gmail.com> AuthorDate: Tue Apr 2 17:12:32 2019 -0700 KAFKA-7190: KIP-443; Remove streams overrides on repartition topics (#6511) * remove streams overrides on segment.ms and segment.index.bytes * kip comments --- .../main/java/org/apache/kafka/streams/StreamsConfig.java | 15 +++++---------- .../processor/internals/RepartitionTopicConfig.java | 6 ++---- .../streams/integration/InternalTopicIntegrationTest.java | 4 ++-- .../processor/internals/InternalTopologyBuilderTest.java | 2 +- 4 files changed, 10 insertions(+), 17 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 2ba7312..f607d1d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -1003,17 +1003,12 @@ public class StreamsConfig extends AbstractConfig { // verify that producer batch config is no larger than segment size, then add topic configs required for creating topics final Map<String, Object> topicProps = originalsWithPrefix(TOPIC_PREFIX, false); + final Map<String, Object> producerProps = getClientPropsWithPrefix(PRODUCER_PREFIX, ProducerConfig.configNames()); - if (topicProps.containsKey(topicPrefix(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG))) { - final int segmentSize = Integer.parseInt(topicProps.get(topicPrefix(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG)).toString()); - final Map<String, Object> producerProps = getClientPropsWithPrefix(PRODUCER_PREFIX, ProducerConfig.configNames()); - final int batchSize; - if (producerProps.containsKey(ProducerConfig.BATCH_SIZE_CONFIG)) { - batchSize = Integer.parseInt(producerProps.get(ProducerConfig.BATCH_SIZE_CONFIG).toString()); - } else { - final ProducerConfig producerDefaultConfig = new ProducerConfig(new Properties()); - batchSize = producerDefaultConfig.getInt(ProducerConfig.BATCH_SIZE_CONFIG); - } + if (topicProps.containsKey(topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG)) && + producerProps.containsKey(ProducerConfig.BATCH_SIZE_CONFIG)) { + final int segmentSize = Integer.parseInt(topicProps.get(topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG)).toString()); + final int batchSize = Integer.parseInt(producerProps.get(ProducerConfig.BATCH_SIZE_CONFIG).toString()); if (segmentSize < batchSize) { throw new IllegalArgumentException(String.format("Specified topic segment size %d is is smaller than the configured producer batch size %d, this will cause produced batch not able to be appended to the topic", diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopicConfig.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopicConfig.java index 466520e..7161a3f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopicConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopicConfig.java @@ -33,10 +33,8 @@ public class RepartitionTopicConfig extends InternalTopicConfig { static { final Map<String, String> tempTopicDefaultOverrides = new HashMap<>(); tempTopicDefaultOverrides.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE); - tempTopicDefaultOverrides.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, "52428800"); // 50 MB - tempTopicDefaultOverrides.put(TopicConfig.SEGMENT_BYTES_CONFIG, "52428800"); // 50 MB - tempTopicDefaultOverrides.put(TopicConfig.SEGMENT_MS_CONFIG, "600000"); // 10 min - tempTopicDefaultOverrides.put(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(Long.MAX_VALUE)); // Infinity + tempTopicDefaultOverrides.put(TopicConfig.SEGMENT_BYTES_CONFIG, "52428800"); // 50 MB + tempTopicDefaultOverrides.put(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(-1)); // Infinity REPARTITION_TOPIC_DEFAULT_OVERRIDES = Collections.unmodifiableMap(tempTopicDefaultOverrides); } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java index 8bcaf5d..345b581 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java @@ -171,7 +171,7 @@ public class InternalTopicIntegrationTest { final Properties repartitionProps = getTopicProperties(appID + "-Counts-repartition"); assertEquals(LogConfig.Delete(), repartitionProps.getProperty(LogConfig.CleanupPolicyProp())); - assertEquals(5, repartitionProps.size()); + assertEquals(3, repartitionProps.size()); } @Test @@ -216,6 +216,6 @@ public class InternalTopicIntegrationTest { final Properties repartitionProps = getTopicProperties(appID + "-CountWindows-repartition"); assertEquals(LogConfig.Delete(), repartitionProps.getProperty(LogConfig.CleanupPolicyProp())); - assertEquals(5, repartitionProps.size()); + assertEquals(3, repartitionProps.size()); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java index 1a46af8..fbeae18 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java @@ -643,7 +643,7 @@ public class InternalTopologyBuilderTest { final InternalTopologyBuilder.TopicsInfo topicsInfo = builder.topicGroups().values().iterator().next(); final InternalTopicConfig topicConfig = topicsInfo.repartitionSourceTopics.get("appId-foo"); final Map<String, String> properties = topicConfig.getProperties(Collections.<String, String>emptyMap(), 10000); - assertEquals(5, properties.size()); + assertEquals(3, properties.size()); assertEquals(String.valueOf(Long.MAX_VALUE), properties.get(TopicConfig.RETENTION_MS_CONFIG)); assertEquals(TopicConfig.CLEANUP_POLICY_DELETE, properties.get(TopicConfig.CLEANUP_POLICY_CONFIG)); assertEquals("appId-foo", topicConfig.name());