This is an automated email from the ASF dual-hosted git repository. divijv pushed a commit to branch 4.0 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push: new beda2f3917c KAFKA-16368: Update default linger.ms to 5ms for KIP-1030 (#18080) beda2f3917c is described below commit beda2f3917c7cca493b8f3228e1d40703359c9e4 Author: Jason Taylor <jasta...@amazon.com> AuthorDate: Thu Jan 16 09:50:06 2025 +0000 KAFKA-16368: Update default linger.ms to 5ms for KIP-1030 (#18080) Reviewers: Ismael Juma <ism...@juma.me.uk>, Divij Vaidya <di...@amazon.com> --- .../org/apache/kafka/clients/producer/ProducerConfig.java | 15 ++++++++++----- .../kafka/api/ProducerSendWhileDeletionTest.scala | 3 ++- .../scala/integration/kafka/api/TransactionsTest.scala | 6 ++++-- .../kafka/server/DynamicBrokerReconfigurationTest.scala | 3 ++- docs/upgrade.html | 3 +++ 5 files changed, 21 insertions(+), 9 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index acafe119e77..23dd02bda98 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -93,8 +93,11 @@ public class ProducerConfig extends AbstractConfig { + "<p>" + "Note: This setting gives the upper bound of the batch size to be sent. If we have fewer than this many bytes accumulated " + "for this partition, we will 'linger' for the <code>linger.ms</code> time waiting for more records to show up. " - + "This <code>linger.ms</code> setting defaults to 0, which means we'll immediately send out a record even the accumulated " - + "batch size is under this <code>batch.size</code> setting."; + + "This <code>linger.ms</code> setting defaults to 5, which means the producer will wait for 5ms or until the record batch is " + + "of <code>batch.size</code>(whichever happens first) before sending the record batch. 
Note that broker backpressure can " + "result in a higher effective linger time than this setting. " + "The default changed from 0 to 5 in Apache Kafka 4.0 as the efficiency gains from larger batches typically result in " + "similar or lower producer latency despite the increased linger."; /** <code>partitioner.adaptive.partitioning.enable</code> */ public static final String PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG = "partitioner.adaptive.partitioning.enable"; @@ -147,8 +150,10 @@ public class ProducerConfig extends AbstractConfig { + "of as analogous to Nagle's algorithm in TCP. This setting gives the upper bound on the delay for batching: once " + "we get <code>" + BATCH_SIZE_CONFIG + "</code> worth of records for a partition it will be sent immediately regardless of this " + "setting, however if we have fewer than this many bytes accumulated for this partition we will 'linger' for the " - + "specified time waiting for more records to show up. This setting defaults to 0 (i.e. no delay). Setting <code>" + LINGER_MS_CONFIG + "=5</code>, " - + "for example, would have the effect of reducing the number of requests sent but would add up to 5ms of latency to records sent in the absence of load."; + + "specified time waiting for more records to show up. This setting defaults to 5 (i.e. 5ms delay). Increasing <code>" + LINGER_MS_CONFIG + "=50</code>, " + + "for example, would have the effect of reducing the number of requests sent but would add up to 50ms of latency to records sent in the absence of load. "
+ + "The default changed from 0 to 5 in Apache Kafka 4.0 as the efficiency gains from larger batches typically result in " + + "similar or lower producer latency despite the increased linger."; /** <code>request.timeout.ms</code> */ public static final String REQUEST_TIMEOUT_MS_CONFIG = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG; @@ -383,7 +388,7 @@ public class ProducerConfig extends AbstractConfig { .define(PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG, Type.BOOLEAN, true, Importance.LOW, PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_DOC) .define(PARTITIONER_AVAILABILITY_TIMEOUT_MS_CONFIG, Type.LONG, 0, atLeast(0), Importance.LOW, PARTITIONER_AVAILABILITY_TIMEOUT_MS_DOC) .define(PARTITIONER_IGNORE_KEYS_CONFIG, Type.BOOLEAN, false, Importance.MEDIUM, PARTITIONER_IGNORE_KEYS_DOC) - .define(LINGER_MS_CONFIG, Type.LONG, 0, atLeast(0), Importance.MEDIUM, LINGER_MS_DOC) + .define(LINGER_MS_CONFIG, Type.LONG, 5, atLeast(0), Importance.MEDIUM, LINGER_MS_DOC) .define(DELIVERY_TIMEOUT_MS_CONFIG, Type.INT, 120 * 1000, atLeast(0), Importance.MEDIUM, DELIVERY_TIMEOUT_MS_DOC) .define(CLIENT_ID_CONFIG, Type.STRING, "", Importance.MEDIUM, CommonClientConfigs.CLIENT_ID_DOC) .define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(CommonClientConfigs.SEND_BUFFER_LOWER_BOUND), Importance.MEDIUM, CommonClientConfigs.SEND_BUFFER_DOC) diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala index 8d3e76e7448..0ee52530e57 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala @@ -34,6 +34,7 @@ import scala.jdk.CollectionConverters._ class ProducerSendWhileDeletionTest extends IntegrationTestHarness { val producerCount: Int = 1 val brokerCount: Int = 2 + val defaultLingerMs: Int = 5; serverConfig.put(ServerLogConfigs.NUM_PARTITIONS_CONFIG, 2.toString) 
serverConfig.put(ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG, 2.toString) @@ -41,7 +42,7 @@ class ProducerSendWhileDeletionTest extends IntegrationTestHarness { producerConfig.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000L.toString) producerConfig.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 10000.toString) - producerConfig.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 10000.toString) + producerConfig.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, (10000 + defaultLingerMs).toString) /** * Tests that Producer gets self-recovered when a topic is deleted mid-way of produce. diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala index b5b51c6f498..b32fea75ca6 100644 --- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala +++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala @@ -717,8 +717,9 @@ class TransactionsTest extends IntegrationTestHarness { "kraft,consumer,false", )) def testBumpTransactionalEpochWithTV2Disabled(quorum: String, groupProtocol: String, isTV2Enabled: Boolean): Unit = { + val defaultLinger = 5; val producer = createTransactionalProducer("transactionalProducer", - deliveryTimeoutMs = 5000, requestTimeoutMs = 5000) + deliveryTimeoutMs = 5000 + defaultLinger, requestTimeoutMs = 5000) val consumer = transactionalConsumers.head try { // Create a topic with RF=1 so that a single broker failure will render it unavailable @@ -783,8 +784,9 @@ class TransactionsTest extends IntegrationTestHarness { "kraft, consumer, true" )) def testBumpTransactionalEpochWithTV2Enabled(quorum: String, groupProtocol: String, isTV2Enabled: Boolean): Unit = { + val defaultLinger = 5; val producer = createTransactionalProducer("transactionalProducer", - deliveryTimeoutMs = 5000, requestTimeoutMs = 5000) + deliveryTimeoutMs = 5000 + defaultLinger, requestTimeoutMs = 5000) val consumer = transactionalConsumers.head try { diff --git 
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala index 545e4359fdc..26c65f46603 100644 --- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala +++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala @@ -1349,7 +1349,8 @@ val configEntries = props.asScala.map { case (k, v) => new AlterConfigOp(new Con private var _retries = Int.MaxValue private var _acks = -1 private var _requestTimeoutMs = 30000 - private var _deliveryTimeoutMs = 30000 + private val defaultLingerMs = 5; + private var _deliveryTimeoutMs = 30000 + defaultLingerMs def maxRetries(retries: Int): ProducerBuilder = { _retries = retries; this } def acks(acks: Int): ProducerBuilder = { _acks = acks; this } diff --git a/docs/upgrade.html b/docs/upgrade.html index 30505e6c92a..aa9820cab5e 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -209,6 +209,9 @@ </li> <li>The deprecated <code>sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata>, String)</code> method has been removed from the Producer API. </li> + <li>The default <code>linger.ms</code> changed from 0 to 5 in Apache Kafka 4.0 as the efficiency gains from larger batches typically result in + similar or lower producer latency despite the increased linger. + </li> </ul> </li> <li><b>Admin client</b>