This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 169e2119979 KAFKA-19892: Add acq lock timeout ms field to share ack
response. (#20901)
169e2119979 is described below
commit 169e21199791d02d195a739044734b89847266af
Author: Sushant Mahajan <[email protected]>
AuthorDate: Mon Nov 17 20:26:35 2025 +0530
KAFKA-19892: Add acq lock timeout ms field to share ack response. (#20901)
* Add field `AcquisitionLockTimeoutMs` to `ShareAcknowledgeResponse` to
communicate the timeout to the share consumer.
* This becomes useful with the addition of the RENEW ack type in KIP-1222.
* Tests have been updated.
Reviewers: Andrew Schofield <[email protected]>
---
.../apache/kafka/common/requests/ShareAcknowledgeResponse.java | 7 ++++---
.../resources/common/message/ShareAcknowledgeResponse.json | 2 ++
.../consumer/internals/ShareConsumeRequestManagerTest.java | 10 +++++-----
core/src/main/scala/kafka/server/KafkaApis.scala | 3 ++-
.../unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala | 7 +++++++
5 files changed, 20 insertions(+), 9 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/ShareAcknowledgeResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/ShareAcknowledgeResponse.java
index d303a852b79..33dcc117f2b 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/ShareAcknowledgeResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/ShareAcknowledgeResponse.java
@@ -107,13 +107,13 @@ public class ShareAcknowledgeResponse extends
AbstractResponse {
public static ShareAcknowledgeResponse of(Errors error,
int throttleTimeMs,
LinkedHashMap<TopicIdPartition,
ShareAcknowledgeResponseData.PartitionData> responseData,
- List<Node> nodeEndpoints) {
- return new ShareAcknowledgeResponse(toMessage(error, throttleTimeMs,
responseData.entrySet().iterator(), nodeEndpoints));
+ List<Node> nodeEndpoints, int
acquisitionLockTimeout) {
+ return new ShareAcknowledgeResponse(toMessage(error, throttleTimeMs,
responseData.entrySet().iterator(), nodeEndpoints, acquisitionLockTimeout));
}
public static ShareAcknowledgeResponseData toMessage(Errors error, int
throttleTimeMs,
Iterator<Map.Entry<TopicIdPartition,
ShareAcknowledgeResponseData.PartitionData>> partIterator,
- List<Node>
nodeEndpoints) {
+ List<Node>
nodeEndpoints, int acquisitionLockTimeout) {
ShareAcknowledgeResponseData.ShareAcknowledgeTopicResponseCollection
topicResponses = new
ShareAcknowledgeResponseData.ShareAcknowledgeTopicResponseCollection();
while (partIterator.hasNext()) {
Map.Entry<TopicIdPartition,
ShareAcknowledgeResponseData.PartitionData> entry = partIterator.next();
@@ -140,6 +140,7 @@ public class ShareAcknowledgeResponse extends
AbstractResponse {
.setRack(endpoint.rack())));
return data.setThrottleTimeMs(throttleTimeMs)
.setErrorCode(error.code())
+ .setAcquisitionLockTimeoutMs(acquisitionLockTimeout)
.setResponses(topicResponses);
}
}
diff --git
a/clients/src/main/resources/common/message/ShareAcknowledgeResponse.json
b/clients/src/main/resources/common/message/ShareAcknowledgeResponse.json
index 786ee78eb8e..3768621de4a 100644
--- a/clients/src/main/resources/common/message/ShareAcknowledgeResponse.json
+++ b/clients/src/main/resources/common/message/ShareAcknowledgeResponse.json
@@ -43,6 +43,8 @@
"about": "The top level response error code." },
{ "name": "ErrorMessage", "type": "string", "versions": "0+",
"nullableVersions": "0+", "default": "null",
"about": "The top-level error message, or null if there was no error." },
+ { "name": "AcquisitionLockTimeoutMs", "type": "int32", "versions": "2+",
+ "about": "The time in milliseconds for which the acquired records are
locked." },
{ "name": "Responses", "type": "[]ShareAcknowledgeTopicResponse",
"versions": "0+",
"about": "The response topics.", "fields": [
{ "name": "TopicId", "type": "uuid", "versions": "0+", "mapKey": true,
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
index f199999a8f0..13a0af65f91 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
@@ -2626,25 +2626,25 @@ public class ShareConsumeRequestManagerTest {
private ShareAcknowledgeResponse emptyAcknowledgeResponse() {
Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData>
partitions = Collections.emptyMap();
- return ShareAcknowledgeResponse.of(Errors.NONE, 0, new
LinkedHashMap<>(partitions), Collections.emptyList());
+ return ShareAcknowledgeResponse.of(Errors.NONE, 0, new
LinkedHashMap<>(partitions), Collections.emptyList(), 0);
}
private ShareAcknowledgeResponse
acknowledgeResponseWithTopLevelError(TopicIdPartition tp, Errors error) {
Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData>
partitions = Map.of(tp,
partitionDataForAcknowledge(tp, Errors.NONE));
- return ShareAcknowledgeResponse.of(error, 0, new
LinkedHashMap<>(partitions), Collections.emptyList());
+ return ShareAcknowledgeResponse.of(error, 0, new
LinkedHashMap<>(partitions), Collections.emptyList(), 0);
}
private ShareAcknowledgeResponse fullAcknowledgeResponse(TopicIdPartition
tp, Errors error) {
Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData>
partitions = Map.of(tp,
partitionDataForAcknowledge(tp, error));
- return ShareAcknowledgeResponse.of(Errors.NONE, 0, new
LinkedHashMap<>(partitions), Collections.emptyList());
+ return ShareAcknowledgeResponse.of(Errors.NONE, 0, new
LinkedHashMap<>(partitions), Collections.emptyList(), 0);
}
private ShareAcknowledgeResponse
fullAcknowledgeResponse(Map<TopicIdPartition, Errors> partitionErrorsMap) {
Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData>
partitions = new HashMap<>();
partitionErrorsMap.forEach((tip, error) -> partitions.put(tip,
partitionDataForAcknowledge(tip, error)));
- return ShareAcknowledgeResponse.of(Errors.NONE, 0, new
LinkedHashMap<>(partitions), Collections.emptyList());
+ return ShareAcknowledgeResponse.of(Errors.NONE, 0, new
LinkedHashMap<>(partitions), Collections.emptyList(), 0);
}
private ShareAcknowledgeResponse fullAcknowledgeResponse(TopicIdPartition
tp,
@@ -2653,7 +2653,7 @@ public class ShareConsumeRequestManagerTest {
List<Node>
nodeEndpoints) {
Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData>
partitions = Map.of(tp,
partitionDataForAcknowledge(tp, error, currentLeader));
- return ShareAcknowledgeResponse.of(Errors.NONE, 0, new
LinkedHashMap<>(partitions), nodeEndpoints);
+ return ShareAcknowledgeResponse.of(Errors.NONE, 0, new
LinkedHashMap<>(partitions), nodeEndpoints, 0);
}
private ShareFetchResponseData.PartitionData
partitionDataForFetch(TopicIdPartition tp,
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index c31c404eef9..b18f14794f1 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -4035,7 +4035,8 @@ class KafkaApis(val requestChannel: RequestChannel,
Errors.NONE,
0,
partitions,
- nodeEndpoints.values.toList.asJava
+ nodeEndpoints.values.toList.asJava,
+ config.shareGroupConfig.shareGroupRecordLockDurationMs
)
}
diff --git
a/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
b/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
index 9538e2a425f..a5520ab764c 100644
---
a/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
+++
b/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
@@ -87,6 +87,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
val shareAcknowledgeResponse =
IntegrationTestUtils.sendAndReceive[ShareAcknowledgeResponse](shareAcknowledgeRequest,
socket)
assertEquals(Errors.UNSUPPORTED_VERSION.code,
shareAcknowledgeResponse.data.errorCode)
+ assertEquals(0, shareAcknowledgeResponse.data.acquisitionLockTimeoutMs)
}
@ClusterTests(
@@ -505,6 +506,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
.setErrorCode(Errors.NONE.code())
val acknowledgePartitionData =
shareAcknowledgeResponseData.responses().stream().findFirst().get().partitions().get(0)
+ assertEquals(30000,
shareAcknowledgeResponseData.acquisitionLockTimeoutMs())
compareAcknowledgeResponsePartitions(expectedAcknowledgePartitionData,
acknowledgePartitionData)
// Producing 10 more records to the topic
@@ -730,6 +732,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
assertEquals(1, shareAcknowledgeResponseData.responses().size())
assertEquals(topicId,
shareAcknowledgeResponseData.responses().stream().findFirst().get().topicId())
assertEquals(1,
shareAcknowledgeResponseData.responses().stream().findFirst().get().partitions().size())
+ assertEquals(30000,
shareAcknowledgeResponseData.acquisitionLockTimeoutMs())
val expectedAcknowledgePartitionData = new
ShareAcknowledgeResponseData.PartitionData()
.setPartitionIndex(PARTITION)
@@ -952,6 +955,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
assertEquals(1, shareAcknowledgeResponseData.responses().size())
assertEquals(topicId,
shareAcknowledgeResponseData.responses().stream().findFirst().get().topicId())
assertEquals(1,
shareAcknowledgeResponseData.responses().stream().findFirst().get().partitions().size())
+ assertEquals(30000,
shareAcknowledgeResponseData.acquisitionLockTimeoutMs())
val expectedAcknowledgePartitionData = new
ShareAcknowledgeResponseData.PartitionData()
.setPartitionIndex(PARTITION)
@@ -1183,6 +1187,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
assertEquals(1, shareAcknowledgeResponseData.responses().size())
assertEquals(topicId,
shareAcknowledgeResponseData.responses().stream().findFirst().get().topicId())
assertEquals(1,
shareAcknowledgeResponseData.responses().stream().findFirst().get().partitions().size())
+ assertEquals(30000,
shareAcknowledgeResponseData.acquisitionLockTimeoutMs())
var expectedAcknowledgePartitionData = new
ShareAcknowledgeResponseData.PartitionData()
.setPartitionIndex(PARTITION)
@@ -1766,6 +1771,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
val shareAcknowledgeResponseData = shareAcknowledgeResponse.data()
assertEquals(Errors.INVALID_SHARE_SESSION_EPOCH.code,
shareAcknowledgeResponseData.errorCode)
+ assertEquals(0, shareAcknowledgeResponseData.acquisitionLockTimeoutMs())
}
@ClusterTests(
@@ -1913,6 +1919,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
val shareAcknowledgeResponseData = shareAcknowledgeResponse.data()
assertEquals(Errors.INVALID_SHARE_SESSION_EPOCH.code,
shareAcknowledgeResponseData.errorCode)
+ assertEquals(0, shareAcknowledgeResponseData.acquisitionLockTimeoutMs())
}
@ClusterTests(