This is an automated email from the ASF dual-hosted git repository.

divijv pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
     new caf0b67fbbb KAFKA-16368: Update defaults for LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_DEFAULT and NUM_RECOVERY_THREADS_PER_DATA_DIR_CONFIG (#18106)
caf0b67fbbb is described below

commit caf0b67fbbb9993103d3f31b71c74eac0a6e6c29
Author: Jason Taylor <jasta...@amazon.com>
AuthorDate: Thu Jan 16 14:30:00 2025 +0000

    KAFKA-16368: Update defaults for LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_DEFAULT and NUM_RECOVERY_THREADS_PER_DATA_DIR_CONFIG (#18106)

    Reviewers: Divij Vaidya <di...@amazon.com>
---
 core/src/test/scala/unit/kafka/log/LogCleanerTest.scala          | 5 ++++-
 core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala   | 2 ++
 docs/upgrade.html                                                | 9 +++++++++
 .../java/org/apache/kafka/server/config/ServerLogConfigs.java    | 4 ++--
 .../kafka/tiered/storage/integration/BaseDeleteSegmentsTest.java | 3 +--
 .../streams/integration/FineGrainedAutoResetIntegrationTest.java | 6 +++++-
 .../streams/integration/KStreamAggregationIntegrationTest.java   | 6 +++++-
 .../streams/integration/SmokeTestDriverIntegrationTest.java      | 7 ++++++-
 8 files changed, 34 insertions(+), 8 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 9100cc7af21..303db7442d1 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -1918,7 +1918,10 @@ class LogCleanerTest extends Logging {
 
   @Test
   def testCleanTombstone(): Unit = {
-    val logConfig = new LogConfig(new Properties())
+    val properties = new Properties()
+    // This test uses future timestamps beyond the default of 1 hour.
+    properties.put(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG, Long.MaxValue.toString)
+    val logConfig = new LogConfig(properties)
 
     val log = makeLog(config = logConfig)
     val cleaner = makeCleaner(10)
diff --git a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
index 9bbfa7242c3..b86a5608c3d 100644
--- a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
@@ -87,6 +87,8 @@ class DumpLogSegmentsTest {
   private def createTestLog = {
     val props = new Properties
     props.setProperty(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, "128")
+    // This test uses future timestamps beyond the default of 1 hour.
+    props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG, Long.MaxValue.toString)
     log = UnifiedLog(
       dir = logDir,
       config = new LogConfig(props),
diff --git a/docs/upgrade.html b/docs/upgrade.html
index aa9820cab5e..b8816fee95b 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -318,6 +318,15 @@
                 KIP-714 is now enabled for Kafka Streams via <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1076%3A++Metrics+for+client+applications+KIP-714+extension">KIP-1076</a>.
                 This allows one to collect not only the metrics of the internally used clients of a Kafka Streams application via a broker-side plugin,
                 but also the <a href="/{{version}}/documentation/#kafka_streams_monitoring">metrics</a> of the Kafka Streams runtime itself.
+            </li>
+            <li>The default value of 'num.recovery.threads.per.data.dir' has been changed from 1 to 2. The impact of this is faster
+                recovery after an unclean shutdown, at the expense of extra IO cycles.
+ See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1030%3A+Change+constraints+and+default+values+for+various+configurations">KIP-1030</a> + </li> + <li> + The default value of 'message.timestamp.after.max.ms' has been changed from Long.Max to 1 hour. The impact of this messages with a + timestamp of more than 1 hour in the future will be rejected when message.timestamp.type=CreateTime is set. + See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1030%3A+Change+constraints+and+default+values+for+various+configurations">KIP-1030</a> </li> </ul> </li> diff --git a/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java b/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java index 4cd83ef1acf..fa7ed93850f 100644 --- a/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java +++ b/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java @@ -118,7 +118,7 @@ public class ServerLogConfigs { "If log.message.timestamp.type=CreateTime, the message will be rejected if the difference in timestamps exceeds " + "this specified threshold. This configuration is ignored if log.message.timestamp.type=LogAppendTime."; public static final String LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG); - public static final long LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_DEFAULT = Long.MAX_VALUE; + public static final long LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_DEFAULT = 3600000; // 1 hour public static final String LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_DOC = "This configuration sets the allowable timestamp difference between the " + "message timestamp and the broker's timestamp. The message timestamp can be later than or equal to the broker's " + "timestamp, with the maximum allowable difference determined by the value set in this configuration. " + @@ -126,7 +126,7 @@ public class ServerLogConfigs { "this specified threshold. 
 
     public static final String NUM_RECOVERY_THREADS_PER_DATA_DIR_CONFIG = "num.recovery.threads.per.data.dir";
-    public static final int NUM_RECOVERY_THREADS_PER_DATA_DIR_DEFAULT = 1;
+    public static final int NUM_RECOVERY_THREADS_PER_DATA_DIR_DEFAULT = 2;
     public static final String NUM_RECOVERY_THREADS_PER_DATA_DIR_DOC = "The number of threads per data directory to be used for log recovery at startup and flushing at shutdown";
 
     public static final String AUTO_CREATE_TOPICS_ENABLE_CONFIG = "auto.create.topics.enable";
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/BaseDeleteSegmentsTest.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/BaseDeleteSegmentsTest.java
index 11c3ca7ad09..d94bb571cbe 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/BaseDeleteSegmentsTest.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/BaseDeleteSegmentsTest.java
@@ -23,7 +23,6 @@ import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
 
 import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_SEGMENT;
 
@@ -56,7 +55,7 @@ public abstract class BaseDeleteSegmentsTest extends TieredStorageTestHarness {
             .expectSegmentToBeOffloaded(broker0, topicA, p0, 2, new KeyValueSpec("k2", "v2"))
             .expectEarliestLocalOffsetInLogDirectory(topicA, p0, 3L)
             .produceWithTimestamp(topicA, p0, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"),
-                new KeyValueSpec("k2", "v2"), new KeyValueSpec("k3", "v3", System.currentTimeMillis() + TimeUnit.DAYS.toMillis(1)))
+                new KeyValueSpec("k2", "v2"), new KeyValueSpec("k3", "v3", System.currentTimeMillis()))
             // update the topic config such that it triggers the deletion of segments
             .updateTopicConfig(topicA, configsToBeAdded(), Collections.emptyList())
             // expect that the three offloaded remote log segments are deleted
diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
index 3ea17e1a098..7f2459cab31 100644
--- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
+++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
@@ -58,6 +58,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.regex.Pattern;
 
+import static org.apache.kafka.common.utils.Utils.mkProperties;
 import static org.apache.kafka.test.TestUtils.waitForCondition;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
@@ -76,7 +77,10 @@ public class FineGrainedAutoResetIntegrationTest {
     private static final String OUTPUT_TOPIC_4 = "outputTopic_4";
     private static final String OUTPUT_TOPIC_5 = "outputTopic_5";
 
-    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(
+        NUM_BROKERS,
+        mkProperties(
+            Collections.singletonMap("log.message.timestamp.after.max.ms", String.valueOf(Long.MAX_VALUE))));
 
     @BeforeAll
     public static void startCluster() throws IOException, InterruptedException {
diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index 97840cd4a8c..bf0d54bc5c0 100644
--- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -90,6 +90,7 @@ import java.util.concurrent.TimeUnit;
 import static java.time.Duration.ofMillis;
 import static java.time.Duration.ofMinutes;
 import static java.time.Instant.ofEpochMilli;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
 import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -102,7 +103,10 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 public class KStreamAggregationIntegrationTest {
     private static final int NUM_BROKERS = 1;
 
-    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(
+        NUM_BROKERS,
+        mkProperties(
+            Collections.singletonMap("log.message.timestamp.after.max.ms", String.valueOf(Long.MAX_VALUE))));
 
     @BeforeAll
     public static void startCluster() throws Exception {
diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
index eb35666675e..b95e11df4c6 100644
--- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
+++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
@@ -35,10 +35,12 @@ import org.junit.jupiter.params.provider.CsvSource;
 import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 
+import static org.apache.kafka.common.utils.Utils.mkProperties;
 import static org.apache.kafka.streams.tests.SmokeTestDriver.generate;
 import static org.apache.kafka.streams.tests.SmokeTestDriver.verify;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -46,7 +48,10 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 @Timeout(600)
 @Tag("integration")
 public class SmokeTestDriverIntegrationTest {
-    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3);
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(
+        3,
+        mkProperties(
+            Collections.singletonMap("log.message.timestamp.after.max.ms", String.valueOf(Long.MAX_VALUE))));
 
     @BeforeAll
     public static void startCluster() throws IOException {
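
The upgrade note above on 'message.timestamp.after.max.ms' is easiest to see from the producer side: with message.timestamp.type=CreateTime, a record stamped more than one hour ahead of the broker clock is now rejected. Below is a minimal sketch of that behavior, not part of this commit; the bootstrap address and the topic name "events" are illustrative assumptions.

import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.InvalidTimestampException;
import org.apache.kafka.common.serialization.StringSerializer;

public class FutureTimestampExample {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Stamp the record two hours ahead, beyond the new one-hour default.
            long futureTimestamp = System.currentTimeMillis() + 2 * 60 * 60 * 1000L;
            ProducerRecord<String, String> record =
                new ProducerRecord<>("events", null, futureTimestamp, "key", "value");
            try {
                producer.send(record).get();
                System.out.println("Accepted (the limit must have been overridden).");
            } catch (ExecutionException e) {
                // Under the new default the broker rejects the record and the
                // send future completes with an InvalidTimestampException cause.
                if (e.getCause() instanceof InvalidTimestampException) {
                    System.out.println("Rejected as expected: " + e.getCause().getMessage());
                } else {
                    throw new RuntimeException(e);
                }
            }
        }
    }
}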
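
Deployments that depend on the old behavior can restore both pre-4.0 defaults explicitly; this mirrors the 'log.message.timestamp.after.max.ms' override the streams tests in this commit pass to EmbeddedKafkaCluster. A minimal sketch of the broker-side overrides, with the same two keys applying to server.properties on a standalone broker:

import java.util.Properties;

// Pre-4.0 broker defaults, restored explicitly. Built as Properties to match
// the style of the EmbeddedKafkaCluster overrides in the tests above.
public class LegacyTimestampAndRecoveryDefaults {
    public static Properties legacyOverrides() {
        Properties brokerProps = new Properties();
        // Accept arbitrarily far-future timestamps again (old default: Long.MAX_VALUE).
        brokerProps.put("log.message.timestamp.after.max.ms", String.valueOf(Long.MAX_VALUE));
        // Use a single recovery thread per data directory again (old default: 1).
        brokerProps.put("num.recovery.threads.per.data.dir", "1");
        return brokerProps;
    }
}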
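
The timestamp bound can also be relaxed per topic rather than broker-wide, since 'message.timestamp.after.max.ms' is a topic-level config. A sketch using the standard Admin client and the TopicConfig constant this commit already references; the topic name "events" and the bootstrap address are again illustrative, and whether a per-topic override is appropriate depends on the deployment:

import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;

public class RelaxTopicTimestampBound {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative address

        try (Admin admin = Admin.create(props)) {
            // Lift the one-hour bound back to Long.MAX_VALUE for a single topic.
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "events");
            AlterConfigOp op = new AlterConfigOp(
                new ConfigEntry(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG,
                    String.valueOf(Long.MAX_VALUE)),
                AlterConfigOp.OpType.SET);
            admin.incrementalAlterConfigs(Map.of(topic, List.of(op))).all().get();
        }
    }
}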