This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3f9d2c2db06 KAFKA-18433: Add BatchSize to ShareFetch request (1/N) (#18439)
3f9d2c2db06 is described below

commit 3f9d2c2db061a10fa9db20a88e4b509e34191eb5
Author: Andrew Schofield <[email protected]>
AuthorDate: Wed Jan 8 15:29:43 2025 +0000

    KAFKA-18433: Add BatchSize to ShareFetch request (1/N) (#18439)
    
    Reviewers: Apoorv Mittal <[email protected]>, Manikumar Reddy <[email protected]>
---
 .../apache/kafka/clients/consumer/internals/ShareSessionHandler.java | 2 +-
 .../java/org/apache/kafka/common/requests/ShareFetchRequest.java     | 3 ++-
 clients/src/main/resources/common/message/ShareFetchRequest.json     | 4 +++-
 .../scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala   | 5 +++--
 4 files changed, 9 insertions(+), 5 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandler.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandler.java
index 100c9ce61b6..27cfdc5981e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandler.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandler.java
@@ -171,7 +171,7 @@ public class ShareSessionHandler {
 
         return ShareFetchRequest.Builder.forConsumer(
                 groupId, nextMetadata, fetchConfig.maxWaitMs,
-                fetchConfig.minBytes, fetchConfig.maxBytes, fetchConfig.fetchSize,
+                fetchConfig.minBytes, fetchConfig.maxBytes, fetchConfig.fetchSize, fetchConfig.maxPollRecords,
                 added, removed, acknowledgementBatches);
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java
index 7ed14b4bdb1..f1a5753fef1 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java
@@ -49,7 +49,7 @@ public class ShareFetchRequest extends AbstractRequest {
         }
 
         public static Builder forConsumer(String groupId, ShareRequestMetadata metadata,
-                                          int maxWait, int minBytes, int maxBytes, int fetchSize,
+                                          int maxWait, int minBytes, int maxBytes, int fetchSize, int batchSize,
                                           List<TopicIdPartition> send, List<TopicIdPartition> forget,
                                           Map<TopicIdPartition, List<ShareFetchRequestData.AcknowledgementBatch>> acknowledgementsMap) {
             ShareFetchRequestData data = new ShareFetchRequestData();
@@ -67,6 +67,7 @@ public class ShareFetchRequest extends AbstractRequest {
             data.setMaxWaitMs(maxWait);
             data.setMinBytes(minBytes);
             data.setMaxBytes(maxBytes);
+            data.setBatchSize(batchSize);
 
             // Build a map of topics to fetch keyed by topic ID, and within each a map of partitions keyed by index
             Map<Uuid, Map<Integer, ShareFetchRequestData.FetchPartition>> fetchMap = new HashMap<>();
diff --git a/clients/src/main/resources/common/message/ShareFetchRequest.json b/clients/src/main/resources/common/message/ShareFetchRequest.json
index 6af76797996..b0b91b82228 100644
--- a/clients/src/main/resources/common/message/ShareFetchRequest.json
+++ b/clients/src/main/resources/common/message/ShareFetchRequest.json
@@ -37,6 +37,8 @@
       "about": "The minimum bytes to accumulate in the response." },
     { "name": "MaxBytes", "type": "int32", "versions": "0+", "default": "0x7fffffff",
       "about": "The maximum bytes to fetch.  See KIP-74 for cases where this limit may not be honored." },
+    { "name": "BatchSize", "type": "int32", "versions": "0+",
+      "about": "The optimal number of records for batches of acquired records and acknowledgements." },
     { "name": "Topics", "type": "[]FetchTopic", "versions": "0+",
       "about": "The topics to fetch.", "fields": [
       { "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The unique topic ID."},
@@ -45,7 +47,7 @@
         { "name": "PartitionIndex", "type": "int32", "versions": "0+",
           "about": "The partition index." },
         { "name": "PartitionMaxBytes", "type": "int32", "versions": "0+",
-          "about": "The maximum bytes to fetch from this partition. 0 when only acknowledgement with no fetching is required. See KIP-74 for cases where this limit may not be honored." },
+          "about": "TO BE REMOVED. The maximum bytes to fetch from this partition. 0 when only acknowledgement with no fetching is required. See KIP-74 for cases where this limit may not be honored." },
         { "name": "AcknowledgementBatches", "type": "[]AcknowledgementBatch", "versions": "0+",
           "about": "Record batches to acknowledge.", "fields": [
           { "name": "FirstOffset", "type": "int64", "versions": "0+",
diff --git a/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala b/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
index 73f9fce42e6..211faeab405 100644
--- a/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
@@ -2366,8 +2366,9 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
                                       acknowledgementsMap: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]],
                                       maxWaitMs: Int = MAX_WAIT_MS,
                                       minBytes: Int = 0,
-                                      maxBytes: Int = Int.MaxValue): ShareFetchRequest = {
-    ShareFetchRequest.Builder.forConsumer(groupId, metadata, maxWaitMs, minBytes, maxBytes, maxPartitionBytes, send.asJava, forget.asJava, acknowledgementsMap.asJava)
+                                      maxBytes: Int = Int.MaxValue,
+                                      batchSize: Int = 500): ShareFetchRequest = {
+    ShareFetchRequest.Builder.forConsumer(groupId, metadata, maxWaitMs, minBytes, maxBytes, maxPartitionBytes, batchSize, send.asJava, forget.asJava, acknowledgementsMap.asJava)
       .build()
   }
   

Reply via email to