This is an automated email from the ASF dual-hosted git repository.

divijv pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new caf0b67fbbb KAFKA-16368: Update defaults for LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_DEFAULT and NUM_RECOVERY_THREADS_PER_DATA_DIR_CONFIG (#18106)
caf0b67fbbb is described below

commit caf0b67fbbb9993103d3f31b71c74eac0a6e6c29
Author: Jason Taylor <jasta...@amazon.com>
AuthorDate: Thu Jan 16 14:30:00 2025 +0000

    KAFKA-16368: Update defaults for LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_DEFAULT and NUM_RECOVERY_THREADS_PER_DATA_DIR_CONFIG (#18106)
    
    Reviewers: Divij Vaidya <di...@amazon.com>
---
 core/src/test/scala/unit/kafka/log/LogCleanerTest.scala          | 5 ++++-
 core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala   | 2 ++
 docs/upgrade.html                                                | 9 +++++++++
 .../java/org/apache/kafka/server/config/ServerLogConfigs.java    | 4 ++--
 .../kafka/tiered/storage/integration/BaseDeleteSegmentsTest.java | 3 +--
 .../streams/integration/FineGrainedAutoResetIntegrationTest.java | 6 +++++-
 .../streams/integration/KStreamAggregationIntegrationTest.java   | 6 +++++-
 .../streams/integration/SmokeTestDriverIntegrationTest.java      | 7 ++++++-
 8 files changed, 34 insertions(+), 8 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 9100cc7af21..303db7442d1 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -1918,7 +1918,10 @@ class LogCleanerTest extends Logging {
 
   @Test
   def testCleanTombstone(): Unit = {
-    val logConfig = new LogConfig(new Properties())
+    val properties = new Properties()
+    // This test uses future timestamps beyond the default of 1 hour.
+    properties.put(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG, Long.MaxValue.toString)
+    val logConfig = new LogConfig(properties)
 
     val log = makeLog(config = logConfig)
     val cleaner = makeCleaner(10)
diff --git a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
index 9bbfa7242c3..b86a5608c3d 100644
--- a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
@@ -87,6 +87,8 @@ class DumpLogSegmentsTest {
   private def createTestLog = {
     val props = new Properties
     props.setProperty(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, "128")
+    // This test uses future timestamps beyond the default of 1 hour.
+    props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG, Long.MaxValue.toString)
     log = UnifiedLog(
       dir = logDir,
       config = new LogConfig(props),
diff --git a/docs/upgrade.html b/docs/upgrade.html
index aa9820cab5e..b8816fee95b 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -318,6 +318,15 @@
                     KIP-714 is now enabled for Kafka Streams via <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1076%3A++Metrics+for+client+applications+KIP-714+extension">KIP-1076</a>.
                    This allows collecting not only the metrics of the internally used clients of a Kafka Streams application via a broker-side plugin,
                    but also the <a href="/{{version}}/documentation/#kafka_streams_monitoring">metrics</a> of the Kafka Streams runtime itself.
+                </li>
+                <li>
+                    The default value of 'num.recovery.threads.per.data.dir' has been changed from 1 to 2. The impact of this is faster
+                    recovery after an unclean shutdown, at the expense of extra I/O cycles. See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1030%3A+Change+constraints+and+default+values+for+various+configurations">KIP-1030</a>
+                </li>
+                <li>
+                    The default value of 'message.timestamp.after.max.ms' has been changed from Long.MAX_VALUE to 1 hour. The impact of this is that messages with a
+                    timestamp more than 1 hour in the future will be rejected when message.timestamp.type=CreateTime is set.
+                    See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1030%3A+Change+constraints+and+default+values+for+various+configurations">KIP-1030</a>
                 </li>
             </ul>
         </li>
diff --git a/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java b/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java
index 4cd83ef1acf..fa7ed93850f 100644
--- a/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java
+++ b/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java
@@ -118,7 +118,7 @@ public class ServerLogConfigs {
             "If log.message.timestamp.type=CreateTime, the message will be 
rejected if the difference in timestamps exceeds " +
             "this specified threshold. This configuration is ignored if 
log.message.timestamp.type=LogAppendTime.";
     public static final String LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG = 
ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG);
-    public static final long LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_DEFAULT = 
Long.MAX_VALUE;
+    public static final long LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_DEFAULT = 
3600000; // 1 hour
     public static final String LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_DOC = "This 
configuration sets the allowable timestamp difference between the " +
             "message timestamp and the broker's timestamp. The message 
timestamp can be later than or equal to the broker's " +
             "timestamp, with the maximum allowable difference determined by 
the value set in this configuration. " +
@@ -126,7 +126,7 @@ public class ServerLogConfigs {
             "this specified threshold. This configuration is ignored if 
log.message.timestamp.type=LogAppendTime.";
 
     public static final String NUM_RECOVERY_THREADS_PER_DATA_DIR_CONFIG = 
"num.recovery.threads.per.data.dir";
-    public static final int NUM_RECOVERY_THREADS_PER_DATA_DIR_DEFAULT = 1;
+    public static final int NUM_RECOVERY_THREADS_PER_DATA_DIR_DEFAULT = 2;
     public static final String NUM_RECOVERY_THREADS_PER_DATA_DIR_DOC = "The 
number of threads per data directory to be used for log recovery at startup and 
flushing at shutdown";
 
     public static final String AUTO_CREATE_TOPICS_ENABLE_CONFIG = 
"auto.create.topics.enable";
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/BaseDeleteSegmentsTest.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/BaseDeleteSegmentsTest.java
index 11c3ca7ad09..d94bb571cbe 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/BaseDeleteSegmentsTest.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/BaseDeleteSegmentsTest.java
@@ -23,7 +23,6 @@ import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
 
 import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_SEGMENT;
 
@@ -56,7 +55,7 @@ public abstract class BaseDeleteSegmentsTest extends TieredStorageTestHarness {
                 .expectSegmentToBeOffloaded(broker0, topicA, p0, 2, new KeyValueSpec("k2", "v2"))
                 .expectEarliestLocalOffsetInLogDirectory(topicA, p0, 3L)
                 .produceWithTimestamp(topicA, p0, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"),
-                        new KeyValueSpec("k2", "v2"), new KeyValueSpec("k3", "v3", System.currentTimeMillis() + TimeUnit.DAYS.toMillis(1)))
+                        new KeyValueSpec("k2", "v2"), new KeyValueSpec("k3", "v3", System.currentTimeMillis()))
                 // update the topic config such that it triggers the deletion of segments
                 .updateTopicConfig(topicA, configsToBeAdded(), Collections.emptyList())
                 // expect that the three offloaded remote log segments are deleted
diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
index 3ea17e1a098..7f2459cab31 100644
--- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
+++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
@@ -58,6 +58,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.regex.Pattern;
 
+import static org.apache.kafka.common.utils.Utils.mkProperties;
 import static org.apache.kafka.test.TestUtils.waitForCondition;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
@@ -76,7 +77,10 @@ public class FineGrainedAutoResetIntegrationTest {
     private static final String OUTPUT_TOPIC_4 = "outputTopic_4";
     private static final String OUTPUT_TOPIC_5 = "outputTopic_5";
 
-    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(
+            NUM_BROKERS,
+            mkProperties(
+                    Collections.singletonMap("log.message.timestamp.after.max.ms", String.valueOf(Long.MAX_VALUE))));
 
     @BeforeAll
     public static void startCluster() throws IOException, InterruptedException {
diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index 97840cd4a8c..bf0d54bc5c0 100644
--- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -90,6 +90,7 @@ import java.util.concurrent.TimeUnit;
 import static java.time.Duration.ofMillis;
 import static java.time.Duration.ofMinutes;
 import static java.time.Instant.ofEpochMilli;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
 import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -102,7 +103,10 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 public class KStreamAggregationIntegrationTest {
     private static final int NUM_BROKERS = 1;
 
-    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(
+            NUM_BROKERS,
+            mkProperties(
+                    Collections.singletonMap("log.message.timestamp.after.max.ms", String.valueOf(Long.MAX_VALUE))));
 
     @BeforeAll
     public static void startCluster() throws Exception {
diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
index eb35666675e..b95e11df4c6 100644
--- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
+++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
@@ -35,10 +35,12 @@ import org.junit.jupiter.params.provider.CsvSource;
 import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 
+import static org.apache.kafka.common.utils.Utils.mkProperties;
 import static org.apache.kafka.streams.tests.SmokeTestDriver.generate;
 import static org.apache.kafka.streams.tests.SmokeTestDriver.verify;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -46,7 +48,10 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 @Timeout(600)
 @Tag("integration")
 public class SmokeTestDriverIntegrationTest {
-    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3);
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(
+            3,
+            mkProperties(
+                    Collections.singletonMap("log.message.timestamp.after.max.ms", String.valueOf(Long.MAX_VALUE))));
 
     @BeforeAll
     public static void startCluster() throws IOException {
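
As a quick reference for operators picking up these new defaults: the sketch below is illustrative and not part of this commit; it shows how the previous behavior can be restored in server.properties, using the old default values visible in the diff above.

    # Sketch: restore the pre-KIP-1030 defaults changed by this commit.
    # Old default was 1 recovery thread per data directory.
    num.recovery.threads.per.data.dir=1
    # Old default was Long.MAX_VALUE, i.e. effectively no limit on future timestamps.
    log.message.timestamp.after.max.ms=9223372036854775807

The same limit can also be relaxed per topic via the message.timestamp.after.max.ms topic config, which is how the test changes above opt out of the new 1 hour default.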
