This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3f9d2c2db06 KAFKA-18433: Add BatchSize to ShareFetch request (1/N)
(#18439)
3f9d2c2db06 is described below
commit 3f9d2c2db061a10fa9db20a88e4b509e34191eb5
Author: Andrew Schofield <[email protected]>
AuthorDate: Wed Jan 8 15:29:43 2025 +0000
KAFKA-18433: Add BatchSize to ShareFetch request (1/N) (#18439)
Reviewers: Apoorv Mittal <[email protected]>, Manikumar Reddy
<[email protected]>
---
.../apache/kafka/clients/consumer/internals/ShareSessionHandler.java | 2 +-
.../java/org/apache/kafka/common/requests/ShareFetchRequest.java | 3 ++-
clients/src/main/resources/common/message/ShareFetchRequest.json | 4 +++-
.../scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala | 5 +++--
4 files changed, 9 insertions(+), 5 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandler.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandler.java
index 100c9ce61b6..27cfdc5981e 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandler.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandler.java
@@ -171,7 +171,7 @@ public class ShareSessionHandler {
return ShareFetchRequest.Builder.forConsumer(
groupId, nextMetadata, fetchConfig.maxWaitMs,
- fetchConfig.minBytes, fetchConfig.maxBytes,
fetchConfig.fetchSize,
+ fetchConfig.minBytes, fetchConfig.maxBytes,
fetchConfig.fetchSize, fetchConfig.maxPollRecords,
added, removed, acknowledgementBatches);
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java
index 7ed14b4bdb1..f1a5753fef1 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java
@@ -49,7 +49,7 @@ public class ShareFetchRequest extends AbstractRequest {
}
public static Builder forConsumer(String groupId, ShareRequestMetadata
metadata,
- int maxWait, int minBytes, int
maxBytes, int fetchSize,
+ int maxWait, int minBytes, int
maxBytes, int fetchSize, int batchSize,
List<TopicIdPartition> send,
List<TopicIdPartition> forget,
Map<TopicIdPartition,
List<ShareFetchRequestData.AcknowledgementBatch>> acknowledgementsMap) {
ShareFetchRequestData data = new ShareFetchRequestData();
@@ -67,6 +67,7 @@ public class ShareFetchRequest extends AbstractRequest {
data.setMaxWaitMs(maxWait);
data.setMinBytes(minBytes);
data.setMaxBytes(maxBytes);
+ data.setBatchSize(batchSize);
// Build a map of topics to fetch keyed by topic ID, and within
each a map of partitions keyed by index
Map<Uuid, Map<Integer, ShareFetchRequestData.FetchPartition>>
fetchMap = new HashMap<>();
diff --git a/clients/src/main/resources/common/message/ShareFetchRequest.json
b/clients/src/main/resources/common/message/ShareFetchRequest.json
index 6af76797996..b0b91b82228 100644
--- a/clients/src/main/resources/common/message/ShareFetchRequest.json
+++ b/clients/src/main/resources/common/message/ShareFetchRequest.json
@@ -37,6 +37,8 @@
"about": "The minimum bytes to accumulate in the response." },
{ "name": "MaxBytes", "type": "int32", "versions": "0+", "default":
"0x7fffffff",
"about": "The maximum bytes to fetch. See KIP-74 for cases where this
limit may not be honored." },
+ { "name": "BatchSize", "type": "int32", "versions": "0+",
+ "about": "The optimal number of records for batches of acquired records
and acknowledgements." },
{ "name": "Topics", "type": "[]FetchTopic", "versions": "0+",
"about": "The topics to fetch.", "fields": [
{ "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The
unique topic ID."},
@@ -45,7 +47,7 @@
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
"about": "The partition index." },
{ "name": "PartitionMaxBytes", "type": "int32", "versions": "0+",
- "about": "The maximum bytes to fetch from this partition. 0 when
only acknowledgement with no fetching is required. See KIP-74 for cases where
this limit may not be honored." },
+ "about": "TO BE REMOVED. The maximum bytes to fetch from this
partition. 0 when only acknowledgement with no fetching is required. See KIP-74
for cases where this limit may not be honored." },
{ "name": "AcknowledgementBatches", "type": "[]AcknowledgementBatch",
"versions": "0+",
"about": "Record batches to acknowledge.", "fields": [
{ "name": "FirstOffset", "type": "int64", "versions": "0+",
diff --git
a/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
b/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
index 73f9fce42e6..211faeab405 100644
---
a/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
+++
b/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
@@ -2366,8 +2366,9 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
acknowledgementsMap:
Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]],
maxWaitMs: Int = MAX_WAIT_MS,
minBytes: Int = 0,
- maxBytes: Int = Int.MaxValue):
ShareFetchRequest = {
- ShareFetchRequest.Builder.forConsumer(groupId, metadata, maxWaitMs,
minBytes, maxBytes, maxPartitionBytes, send.asJava, forget.asJava,
acknowledgementsMap.asJava)
+ maxBytes: Int = Int.MaxValue,
+ batchSize: Int = 500): ShareFetchRequest
= {
+ ShareFetchRequest.Builder.forConsumer(groupId, metadata, maxWaitMs,
minBytes, maxBytes, maxPartitionBytes, batchSize, send.asJava, forget.asJava,
acknowledgementsMap.asJava)
.build()
}