This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 169e2119979 KAFKA-19892: Add acq lock timeout ms field to share ack 
response. (#20901)
169e2119979 is described below

commit 169e21199791d02d195a739044734b89847266af
Author: Sushant Mahajan <[email protected]>
AuthorDate: Mon Nov 17 20:26:35 2025 +0530

    KAFKA-19892: Add acq lock timeout ms field to share ack response. (#20901)
    
    * Add field `AcquisitionLockTImeoutMs` to `ShareAcknowledgeResponse` to
    communicate the timeout to the share consumer.
    * This becomes useful with addition of RENEW ack type in KIP-1222.
    * Tests have been updated.
    
    Reviewers: Andrew Schofield <[email protected]>
---
 .../apache/kafka/common/requests/ShareAcknowledgeResponse.java |  7 ++++---
 .../resources/common/message/ShareAcknowledgeResponse.json     |  2 ++
 .../consumer/internals/ShareConsumeRequestManagerTest.java     | 10 +++++-----
 core/src/main/scala/kafka/server/KafkaApis.scala               |  3 ++-
 .../unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala   |  7 +++++++
 5 files changed, 20 insertions(+), 9 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ShareAcknowledgeResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/ShareAcknowledgeResponse.java
index d303a852b79..33dcc117f2b 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/ShareAcknowledgeResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ShareAcknowledgeResponse.java
@@ -107,13 +107,13 @@ public class ShareAcknowledgeResponse extends 
AbstractResponse {
     public static ShareAcknowledgeResponse of(Errors error,
                                               int throttleTimeMs,
                                               LinkedHashMap<TopicIdPartition, 
ShareAcknowledgeResponseData.PartitionData> responseData,
-                                              List<Node> nodeEndpoints) {
-        return new ShareAcknowledgeResponse(toMessage(error, throttleTimeMs, 
responseData.entrySet().iterator(), nodeEndpoints));
+                                              List<Node> nodeEndpoints, int 
acquisitionLockTimeout) {
+        return new ShareAcknowledgeResponse(toMessage(error, throttleTimeMs, 
responseData.entrySet().iterator(), nodeEndpoints, acquisitionLockTimeout));
     }
 
     public static ShareAcknowledgeResponseData toMessage(Errors error, int 
throttleTimeMs,
                                                          
Iterator<Map.Entry<TopicIdPartition, 
ShareAcknowledgeResponseData.PartitionData>> partIterator,
-                                                         List<Node> 
nodeEndpoints) {
+                                                         List<Node> 
nodeEndpoints, int acquisitionLockTimeout) {
         ShareAcknowledgeResponseData.ShareAcknowledgeTopicResponseCollection 
topicResponses = new 
ShareAcknowledgeResponseData.ShareAcknowledgeTopicResponseCollection();
         while (partIterator.hasNext()) {
             Map.Entry<TopicIdPartition, 
ShareAcknowledgeResponseData.PartitionData> entry = partIterator.next();
@@ -140,6 +140,7 @@ public class ShareAcknowledgeResponse extends 
AbstractResponse {
                         .setRack(endpoint.rack())));
         return data.setThrottleTimeMs(throttleTimeMs)
                 .setErrorCode(error.code())
+                .setAcquisitionLockTimeoutMs(acquisitionLockTimeout)
                 .setResponses(topicResponses);
     }
 }
diff --git 
a/clients/src/main/resources/common/message/ShareAcknowledgeResponse.json 
b/clients/src/main/resources/common/message/ShareAcknowledgeResponse.json
index 786ee78eb8e..3768621de4a 100644
--- a/clients/src/main/resources/common/message/ShareAcknowledgeResponse.json
+++ b/clients/src/main/resources/common/message/ShareAcknowledgeResponse.json
@@ -43,6 +43,8 @@
       "about": "The top level response error code." },
     { "name": "ErrorMessage", "type": "string", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
       "about": "The top-level error message, or null if there was no error." },
+    { "name": "AcquisitionLockTimeoutMs", "type": "int32", "versions": "2+",
+      "about": "The time in milliseconds for which the acquired records are 
locked." },
     { "name": "Responses", "type": "[]ShareAcknowledgeTopicResponse", 
"versions": "0+",
       "about": "The response topics.", "fields": [
       { "name": "TopicId", "type": "uuid", "versions": "0+", "mapKey": true,
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
index f199999a8f0..13a0af65f91 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
@@ -2626,25 +2626,25 @@ public class ShareConsumeRequestManagerTest {
 
     private ShareAcknowledgeResponse emptyAcknowledgeResponse() {
         Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData> 
partitions = Collections.emptyMap();
-        return ShareAcknowledgeResponse.of(Errors.NONE, 0, new 
LinkedHashMap<>(partitions), Collections.emptyList());
+        return ShareAcknowledgeResponse.of(Errors.NONE, 0, new 
LinkedHashMap<>(partitions), Collections.emptyList(), 0);
     }
 
     private ShareAcknowledgeResponse 
acknowledgeResponseWithTopLevelError(TopicIdPartition tp, Errors error) {
         Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData> 
partitions = Map.of(tp,
                 partitionDataForAcknowledge(tp, Errors.NONE));
-        return ShareAcknowledgeResponse.of(error, 0, new 
LinkedHashMap<>(partitions), Collections.emptyList());
+        return ShareAcknowledgeResponse.of(error, 0, new 
LinkedHashMap<>(partitions), Collections.emptyList(), 0);
     }
 
     private ShareAcknowledgeResponse fullAcknowledgeResponse(TopicIdPartition 
tp, Errors error) {
         Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData> 
partitions = Map.of(tp,
                 partitionDataForAcknowledge(tp, error));
-        return ShareAcknowledgeResponse.of(Errors.NONE, 0, new 
LinkedHashMap<>(partitions), Collections.emptyList());
+        return ShareAcknowledgeResponse.of(Errors.NONE, 0, new 
LinkedHashMap<>(partitions), Collections.emptyList(), 0);
     }
 
     private ShareAcknowledgeResponse 
fullAcknowledgeResponse(Map<TopicIdPartition, Errors> partitionErrorsMap) {
         Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData> 
partitions = new HashMap<>();
         partitionErrorsMap.forEach((tip, error) -> partitions.put(tip, 
partitionDataForAcknowledge(tip, error)));
-        return ShareAcknowledgeResponse.of(Errors.NONE, 0, new 
LinkedHashMap<>(partitions), Collections.emptyList());
+        return ShareAcknowledgeResponse.of(Errors.NONE, 0, new 
LinkedHashMap<>(partitions), Collections.emptyList(), 0);
     }
 
     private ShareAcknowledgeResponse fullAcknowledgeResponse(TopicIdPartition 
tp,
@@ -2653,7 +2653,7 @@ public class ShareConsumeRequestManagerTest {
                                                              List<Node> 
nodeEndpoints) {
         Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData> 
partitions = Map.of(tp,
             partitionDataForAcknowledge(tp, error, currentLeader));
-        return ShareAcknowledgeResponse.of(Errors.NONE, 0, new 
LinkedHashMap<>(partitions), nodeEndpoints);
+        return ShareAcknowledgeResponse.of(Errors.NONE, 0, new 
LinkedHashMap<>(partitions), nodeEndpoints, 0);
     }
 
     private ShareFetchResponseData.PartitionData 
partitionDataForFetch(TopicIdPartition tp,
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index c31c404eef9..b18f14794f1 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -4035,7 +4035,8 @@ class KafkaApis(val requestChannel: RequestChannel,
       Errors.NONE,
       0,
       partitions,
-      nodeEndpoints.values.toList.asJava
+      nodeEndpoints.values.toList.asJava,
+      config.shareGroupConfig.shareGroupRecordLockDurationMs
     )
   }
 
diff --git 
a/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
index 9538e2a425f..a5520ab764c 100644
--- 
a/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
@@ -87,6 +87,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
     val shareAcknowledgeResponse = 
IntegrationTestUtils.sendAndReceive[ShareAcknowledgeResponse](shareAcknowledgeRequest,
 socket)
 
     assertEquals(Errors.UNSUPPORTED_VERSION.code, 
shareAcknowledgeResponse.data.errorCode)
+    assertEquals(0, shareAcknowledgeResponse.data.acquisitionLockTimeoutMs)
   }
 
   @ClusterTests(
@@ -505,6 +506,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
       .setErrorCode(Errors.NONE.code())
 
     val acknowledgePartitionData = 
shareAcknowledgeResponseData.responses().stream().findFirst().get().partitions().get(0)
+    assertEquals(30000, 
shareAcknowledgeResponseData.acquisitionLockTimeoutMs())
     compareAcknowledgeResponsePartitions(expectedAcknowledgePartitionData, 
acknowledgePartitionData)
 
     // Producing 10 more records to the topic
@@ -730,6 +732,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
     assertEquals(1, shareAcknowledgeResponseData.responses().size())
     assertEquals(topicId, 
shareAcknowledgeResponseData.responses().stream().findFirst().get().topicId())
     assertEquals(1, 
shareAcknowledgeResponseData.responses().stream().findFirst().get().partitions().size())
+    assertEquals(30000, 
shareAcknowledgeResponseData.acquisitionLockTimeoutMs())
 
     val expectedAcknowledgePartitionData = new 
ShareAcknowledgeResponseData.PartitionData()
       .setPartitionIndex(PARTITION)
@@ -952,6 +955,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
     assertEquals(1, shareAcknowledgeResponseData.responses().size())
     assertEquals(topicId, 
shareAcknowledgeResponseData.responses().stream().findFirst().get().topicId())
     assertEquals(1, 
shareAcknowledgeResponseData.responses().stream().findFirst().get().partitions().size())
+    assertEquals(30000, 
shareAcknowledgeResponseData.acquisitionLockTimeoutMs())
 
     val expectedAcknowledgePartitionData = new 
ShareAcknowledgeResponseData.PartitionData()
       .setPartitionIndex(PARTITION)
@@ -1183,6 +1187,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
     assertEquals(1, shareAcknowledgeResponseData.responses().size())
     assertEquals(topicId, 
shareAcknowledgeResponseData.responses().stream().findFirst().get().topicId())
     assertEquals(1, 
shareAcknowledgeResponseData.responses().stream().findFirst().get().partitions().size())
+    assertEquals(30000, 
shareAcknowledgeResponseData.acquisitionLockTimeoutMs())
 
     var expectedAcknowledgePartitionData = new 
ShareAcknowledgeResponseData.PartitionData()
       .setPartitionIndex(PARTITION)
@@ -1766,6 +1771,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
 
     val shareAcknowledgeResponseData = shareAcknowledgeResponse.data()
     assertEquals(Errors.INVALID_SHARE_SESSION_EPOCH.code, 
shareAcknowledgeResponseData.errorCode)
+    assertEquals(0, shareAcknowledgeResponseData.acquisitionLockTimeoutMs())
   }
 
   @ClusterTests(
@@ -1913,6 +1919,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
 
     val shareAcknowledgeResponseData = shareAcknowledgeResponse.data()
     assertEquals(Errors.INVALID_SHARE_SESSION_EPOCH.code, 
shareAcknowledgeResponseData.errorCode)
+    assertEquals(0, shareAcknowledgeResponseData.acquisitionLockTimeoutMs())
   }
 
   @ClusterTests(

Reply via email to