This is an automated email from the ASF dual-hosted git repository.

divijv pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new beda2f3917c KAFKA-16368: Update default linger.ms to 5ms for KIP-1030 (#18080)
beda2f3917c is described below

commit beda2f3917c7cca493b8f3228e1d40703359c9e4
Author: Jason Taylor <jasta...@amazon.com>
AuthorDate: Thu Jan 16 09:50:06 2025 +0000

    KAFKA-16368: Update default linger.ms to 5ms for KIP-1030 (#18080)
    
    Reviewers: Ismael Juma <ism...@juma.me.uk>, Divij Vaidya <di...@amazon.com>
---
 .../org/apache/kafka/clients/producer/ProducerConfig.java | 15 ++++++++++-----
 .../kafka/api/ProducerSendWhileDeletionTest.scala         |  3 ++-
 .../scala/integration/kafka/api/TransactionsTest.scala    |  6 ++++--
 .../kafka/server/DynamicBrokerReconfigurationTest.scala   |  3 ++-
 docs/upgrade.html                                         |  3 +++
 5 files changed, 21 insertions(+), 9 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index acafe119e77..23dd02bda98 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -93,8 +93,11 @@ public class ProducerConfig extends AbstractConfig {
                                                   + "<p>"
                                                   + "Note: This setting gives the upper bound of the batch size to be sent. If we have fewer than this many bytes accumulated "
                                                   + "for this partition, we will 'linger' for the <code>linger.ms</code> time waiting for more records to show up. "
-                                                 + "This <code>linger.ms</code> setting defaults to 0, which means we'll immediately send out a record even the accumulated "
-                                                 + "batch size is under this <code>batch.size</code> setting.";
+                                                 + "This <code>linger.ms</code> setting defaults to 5, which means the producer will wait for 5ms or until the record batch is "
+                                                 + "of <code>batch.size</code> (whichever happens first) before sending the record batch. Note that broker backpressure can "
+                                                 + "result in a higher effective linger time than this setting. "
+                                                 + "The default changed from 0 to 5 in Apache Kafka 4.0 as the efficiency gains from larger batches typically result in "
+                                                 + "similar or lower producer latency despite the increased linger.";
 
     /** <code>partitioner.adaptive.partitioning.enable</code> */
     public static final String PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG = "partitioner.adaptive.partitioning.enable";
@@ -147,8 +150,10 @@ public class ProducerConfig extends AbstractConfig {
                                                  + "of as analogous to Nagle's algorithm in TCP. This setting gives the upper bound on the delay for batching: once "
                                                  + "we get <code>" + BATCH_SIZE_CONFIG + "</code> worth of records for a partition it will be sent immediately regardless of this "
                                                  + "setting, however if we have fewer than this many bytes accumulated for this partition we will 'linger' for the "
-                                                + "specified time waiting for more records to show up. This setting defaults to 0 (i.e. no delay). Setting <code>" + LINGER_MS_CONFIG + "=5</code>, "
-                                                + "for example, would have the effect of reducing the number of requests sent but would add up to 5ms of latency to records sent in the absence of load.";
+                                                + "specified time waiting for more records to show up. This setting defaults to 5 (i.e. 5ms delay). Setting <code>" + LINGER_MS_CONFIG + "=50</code>, "
+                                                + "for example, would have the effect of reducing the number of requests sent but would add up to 50ms of latency to records sent in the absence of load. "
+                                                + "The default changed from 0 to 5 in Apache Kafka 4.0 as the efficiency gains from larger batches typically result in "
+                                                + "similar or lower producer latency despite the increased linger.";
 
     /** <code>request.timeout.ms</code> */
     public static final String REQUEST_TIMEOUT_MS_CONFIG = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG;
@@ -383,7 +388,7 @@ public class ProducerConfig extends AbstractConfig {
                                 .define(PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG, Type.BOOLEAN, true, Importance.LOW, PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_DOC)
                                 .define(PARTITIONER_AVAILABILITY_TIMEOUT_MS_CONFIG, Type.LONG, 0, atLeast(0), Importance.LOW, PARTITIONER_AVAILABILITY_TIMEOUT_MS_DOC)
                                 .define(PARTITIONER_IGNORE_KEYS_CONFIG, Type.BOOLEAN, false, Importance.MEDIUM, PARTITIONER_IGNORE_KEYS_DOC)
-                                .define(LINGER_MS_CONFIG, Type.LONG, 0, atLeast(0), Importance.MEDIUM, LINGER_MS_DOC)
+                                .define(LINGER_MS_CONFIG, Type.LONG, 5, atLeast(0), Importance.MEDIUM, LINGER_MS_DOC)
                                 .define(DELIVERY_TIMEOUT_MS_CONFIG, Type.INT, 120 * 1000, atLeast(0), Importance.MEDIUM, DELIVERY_TIMEOUT_MS_DOC)
                                 .define(CLIENT_ID_CONFIG, Type.STRING, "", Importance.MEDIUM, CommonClientConfigs.CLIENT_ID_DOC)
                                 .define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(CommonClientConfigs.SEND_BUFFER_LOWER_BOUND), Importance.MEDIUM, CommonClientConfigs.SEND_BUFFER_DOC)
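
The new default only takes effect when linger.ms is left unset; applications that prefer the pre-4.0 behaviour can still pin it to 0 explicitly. A minimal sketch of doing so with the producer API above (the bootstrap address, topic name and serializer choices are illustrative placeholders, not part of this commit):

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class LingerDefaultSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative address
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            // Leaving linger.ms unset picks up the new 5ms default; uncomment to restore the old 0ms behaviour.
            // props.put(ProducerConfig.LINGER_MS_CONFIG, "0");
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                producer.send(new ProducerRecord<>("demo-topic", "key", "value")); // may linger up to 5ms by default
            }
        }
    }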
diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala
index 8d3e76e7448..0ee52530e57 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala
@@ -34,6 +34,7 @@ import scala.jdk.CollectionConverters._
 class ProducerSendWhileDeletionTest extends IntegrationTestHarness {
   val producerCount: Int = 1
   val brokerCount: Int = 2
+  val defaultLingerMs: Int = 5;
 
   serverConfig.put(ServerLogConfigs.NUM_PARTITIONS_CONFIG, 2.toString)
   serverConfig.put(ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG, 2.toString)
@@ -41,7 +42,7 @@ class ProducerSendWhileDeletionTest extends IntegrationTestHarness {
 
   producerConfig.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000L.toString)
   producerConfig.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 10000.toString)
-  producerConfig.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 10000.toString)
+  producerConfig.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, (10000 + defaultLingerMs).toString)
 
   /**
    * Tests that Producer gets self-recovered when a topic is deleted mid-way of produce.
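
The extra defaultLingerMs in the test above accounts for the producer's sanity check that an explicitly configured delivery.timeout.ms is no smaller than request.timeout.ms + linger.ms; with the default linger now 5ms, a 10000/10000 pairing would no longer satisfy it. A small sketch of a configuration that keeps that invariant (class name and values are illustrative):

    import java.util.Properties;

    import org.apache.kafka.clients.producer.ProducerConfig;

    public class DeliveryTimeoutSketch {
        public static void main(String[] args) {
            int requestTimeoutMs = 10000;
            int lingerMs = 5; // the new default
            Properties props = new Properties();
            props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, Integer.toString(requestTimeoutMs));
            props.put(ProducerConfig.LINGER_MS_CONFIG, Integer.toString(lingerMs));
            // Keep delivery.timeout.ms >= request.timeout.ms + linger.ms, mirroring the test change above.
            props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.toString(requestTimeoutMs + lingerMs));
            System.out.println("delivery.timeout.ms = " + props.get(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG));
        }
    }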
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
index b5b51c6f498..b32fea75ca6 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -717,8 +717,9 @@ class TransactionsTest extends IntegrationTestHarness {
     "kraft,consumer,false",
   ))
   def testBumpTransactionalEpochWithTV2Disabled(quorum: String, groupProtocol: String, isTV2Enabled: Boolean): Unit = {
+    val defaultLinger = 5;
     val producer = createTransactionalProducer("transactionalProducer",
-      deliveryTimeoutMs = 5000, requestTimeoutMs = 5000)
+      deliveryTimeoutMs = 5000 + defaultLinger, requestTimeoutMs = 5000)
     val consumer = transactionalConsumers.head
     try {
       // Create a topic with RF=1 so that a single broker failure will render it unavailable
@@ -783,8 +784,9 @@ class TransactionsTest extends IntegrationTestHarness {
     "kraft, consumer, true"
   ))
   def testBumpTransactionalEpochWithTV2Enabled(quorum: String, groupProtocol: String, isTV2Enabled: Boolean): Unit = {
+    val defaultLinger = 5;
     val producer = createTransactionalProducer("transactionalProducer",
-      deliveryTimeoutMs = 5000, requestTimeoutMs = 5000)
+      deliveryTimeoutMs = 5000 + defaultLinger, requestTimeoutMs = 5000)
     val consumer = transactionalConsumers.head
 
     try {
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 545e4359fdc..26c65f46603 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -1349,7 +1349,8 @@ val configEntries = props.asScala.map { case (k, v) => new AlterConfigOp(new Con
     private var _retries = Int.MaxValue
     private var _acks = -1
     private var _requestTimeoutMs = 30000
-    private var _deliveryTimeoutMs = 30000
+    private val defaultLingerMs = 5;
+    private var _deliveryTimeoutMs = 30000 + defaultLingerMs
 
     def maxRetries(retries: Int): ProducerBuilder = { _retries = retries; this }
     def acks(acks: Int): ProducerBuilder = { _acks = acks; this }
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 30505e6c92a..aa9820cab5e 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -209,6 +209,9 @@
                         </li>
                         <li>The deprecated <code>sendOffsetsToTransaction(Map&lt;TopicPartition, OffsetAndMetadata&gt;, String)</code> method has been removed from the Producer API.
                         </li>
+                        <li>The default <code>linger.ms</code> changed from 0 to 5 in Apache Kafka 4.0 as the efficiency gains from larger batches typically result in
+                            similar or lower producer latency despite the increased linger.
+                        </li>
                     </ul>
                 </li>
                 <li><b>Admin client</b>
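
As the upgrade note above describes, a batch is now sent when either batch.size is reached or linger.ms (5ms by default) elapses, whichever comes first. A sketch of tuning the two together for larger batches (class name, address, topic and values are illustrative, not prescriptive):

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.ByteArraySerializer;

    public class BatchTuningSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative address
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
            props.put(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(64 * 1024)); // send once 64 KB accumulates...
            props.put(ProducerConfig.LINGER_MS_CONFIG, "50");                         // ...or after 50 ms, whichever comes first
            try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
                for (int i = 0; i < 1000; i++) {
                    producer.send(new ProducerRecord<>("demo-topic", new byte[128])); // records accumulate into larger batches
                }
            }
        }
    }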
