This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4fd1bedaba2 KAFKA-12392 Deprecate '--max-partition-memory-bytes'
option in ConsoleProducer (#20952)
4fd1bedaba2 is described below
commit 4fd1bedaba2ab9dda932457a2efa55186c15c069
Author: Ming-Yen Chung <[email protected]>
AuthorDate: Mon Nov 24 15:00:51 2025 +0800
KAFKA-12392 Deprecate '--max-partition-memory-bytes' option in
ConsoleProducer (#20952)
This patch implements
[KIP-1231](https://cwiki.apache.org/confluence/x/xQl3Fw) where we
replace `--max-partition-memory-bytes` with existing `--batch-size`.
Reviewers: Chia-Ping Tsai <[email protected]>
---
checkstyle/suppressions.xml | 2 +-
docs/upgrade.html | 4 ++++
.../java/org/apache/kafka/tools/ConsoleProducer.java | 19 ++++++++++++++-----
3 files changed, 19 insertions(+), 6 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 1bf8431f11a..40b50d2b5ad 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -279,7 +279,7 @@
<suppress checks="BooleanExpressionComplexity"
files="(StreamsResetter|DefaultMessageFormatter).java"/>
<suppress checks="NPathComplexity"
-
files="(AclCommand|DefaultMessageFormatter|ProducerPerformance|StreamsResetter|Agent|TransactionalMessageCopier|ReplicaVerificationTool|LineMessageReader).java"/>
+
files="(AclCommand|DefaultMessageFormatter|ProducerPerformance|StreamsResetter|Agent|TransactionalMessageCopier|ReplicaVerificationTool|LineMessageReader|ConsoleProducer).java"/>
<suppress checks="ImportControl"
files="SignalLogger.java"/>
<suppress checks="IllegalImport"
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 785250b6d52..1796b974d9d 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -25,6 +25,10 @@
<h5><a id="upgrade_420_notable" href="#upgrade_420_notable">Notable changes in
4.2.0</a></h5>
<ul>
+ <li>
+ The <code>--max-partition-memory-bytes</code> option in
<code>kafka-console-producer</code>
+ is deprecated and will be removed in Kafka 5.0. Please use
<code>--batch-size</code> instead.
+ </li>
<li>
Queues for Kafka (<a
href="https://cwiki.apache.org/confluence/x/4hA0Dw">KIP-932</a>) is
production-ready in Apache Kafka 4.2. This feature introduces a new kind of
group called
share groups, as an alternative to consumer groups. Consumers in a
share group cooperatively consume records from topics, without assigning each
partition to just one consumer.
diff --git a/tools/src/main/java/org/apache/kafka/tools/ConsoleProducer.java
b/tools/src/main/java/org/apache/kafka/tools/ConsoleProducer.java
index b5f117b7218..c216e2ee8fd 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ConsoleProducer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ConsoleProducer.java
@@ -126,6 +126,7 @@ public class ConsoleProducer {
private final OptionSpec<Long> metadataExpiryMsOpt;
private final OptionSpec<Long> maxBlockMsOpt;
private final OptionSpec<Long> maxMemoryBytesOpt;
+ @Deprecated(since = "4.2", forRemoval = true)
private final OptionSpec<Integer> maxPartitionMemoryBytesOpt;
private final OptionSpec<String> messageReaderOpt;
private final OptionSpec<Integer> socketBufferSizeOpt;
@@ -156,8 +157,11 @@ public class ConsoleProducer {
.withOptionalArg()
.describedAs("compression-codec")
.ofType(String.class);
- batchSizeOpt = parser.accepts("batch-size", "Number of messages to
send in a single batch if they are not being sent synchronously. " +
- "please note that this option will be replaced if
max-partition-memory-bytes is also set")
+ batchSizeOpt = parser.accepts("batch-size", "The buffer size in
bytes allocated for a partition. " +
+ "When records are received which are smaller than
this size the producer " +
+ "will attempt to optimistically group them
together until this size is reached. " +
+ "This is the option to control batch.size in
producer configs. " +
+ "Please note that this option will be replaced if
max-partition-memory-bytes is also set.")
.withRequiredArg()
.describedAs("size")
.ofType(Integer.class)
@@ -212,9 +216,11 @@ public class ConsoleProducer {
.ofType(Long.class)
.defaultsTo(32 * 1024 * 1024L);
maxPartitionMemoryBytesOpt =
parser.accepts("max-partition-memory-bytes",
- "The buffer size allocated for a partition. When
records are received which are smaller than this size the producer " +
+ "(Deprecated) The buffer size in bytes allocated
for a partition. " +
+ "When records are received which are
smaller than this size the producer " +
"will attempt to optimistically group them
together until this size is reached. " +
- "This is the option to control
`batch.size` in producer configs.")
+ "This is the option to control batch.size
in producer configs. " +
+ "This option will be removed in Apache
Kafka 5.0. Use --batch-size instead.")
.withRequiredArg()
.describedAs("memory in bytes per partition")
.ofType(Integer.class)
@@ -335,6 +341,10 @@ public class ConsoleProducer {
readerPropertyOpt = propertyOpt;
}
+ if (options.has(maxPartitionMemoryBytesOpt)) {
+ System.out.println("Warning: --max-partition-memory-bytes is
deprecated and will be removed in Apache Kafka 5.0. Use --batch-size instead.");
+ }
+
try {
ToolsUtils.validateBootstrapServer(options.valueOf(bootstrapServerOpt));
} catch (IllegalArgumentException e) {
@@ -398,7 +408,6 @@ public class ConsoleProducer {
CommandLineUtils.maybeMergeOptions(props, RETRY_BACKOFF_MS_CONFIG,
options, retryBackoffMsOpt);
CommandLineUtils.maybeMergeOptions(props, SEND_BUFFER_CONFIG,
options, socketBufferSizeOpt);
CommandLineUtils.maybeMergeOptions(props, BUFFER_MEMORY_CONFIG,
options, maxMemoryBytesOpt);
- // We currently have 2 options to set the batch.size value. We'll
deprecate/remove one of them in KIP-717.
CommandLineUtils.maybeMergeOptions(props, BATCH_SIZE_CONFIG,
options, batchSizeOpt);
CommandLineUtils.maybeMergeOptions(props, BATCH_SIZE_CONFIG,
options, maxPartitionMemoryBytesOpt);
CommandLineUtils.maybeMergeOptions(props, METADATA_MAX_AGE_CONFIG,
options, metadataExpiryMsOpt);