This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 214bb07d6a4 KAFKA-19845: [1/N] Renew acks in share consumer (#20838)
214bb07d6a4 is described below
commit 214bb07d6a47f67b37b7c3aed86ebebe7f7ac992
Author: Andrew Schofield <[email protected]>
AuthorDate: Tue Nov 11 20:21:51 2025 +0000
KAFKA-19845: [1/N] Renew acks in share consumer (#20838)
Implements AcknowledgeType.RENEW in the share consumer as part of
KIP-1222. There will be a future PR with additional tests.
Reviewers: Apoorv Mittal <[email protected]>, Shivsundar R
<[email protected]>, Abhinav Dixit <[email protected]>
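As a sketch of what KIP-1222 renewal looks like from the application side (modelled on the integration tests below; the bootstrap address, topic, group id and the needsMoreTime() helper are illustrative, not part of this commit):

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaShareConsumer;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
props.put(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, "explicit");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

try (KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props)) {
    consumer.subscribe(List.of("topic"));
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    for (ConsumerRecord<String, String> record : records) {
        if (needsMoreTime(record)) {   // hypothetical application check
            // RENEW extends the acquisition lock, so the record stays with
            // this consumer rather than being accepted, released or rejected.
            consumer.acknowledge(record, AcknowledgeType.RENEW);
        } else {
            consumer.acknowledge(record, AcknowledgeType.ACCEPT);
        }
    }
    consumer.commitSync();
}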
---
.../kafka/clients/consumer/ShareConsumerTest.java | 92 ++++++++++++++++-
.../consumer/internals/Acknowledgements.java | 9 ++
.../internals/ShareConsumeRequestManager.java | 114 +++++++++++++--------
.../consumer/internals/ShareConsumerImpl.java | 70 ++++++++++---
.../clients/consumer/internals/ShareFetch.java | 65 +++++++++++-
.../consumer/internals/ShareInFlightBatch.java | 90 ++++++++++++++--
.../consumer/internals/ShareSessionHandler.java | 50 +++++++--
.../consumer/internals/events/BackgroundEvent.java | 1 +
.../ShareRenewAcknowledgementsCompleteEvent.java | 42 ++++++++
.../common/requests/ShareAcknowledgeRequest.java | 3 +-
.../kafka/common/requests/ShareFetchRequest.java | 6 +-
.../common/message/ShareFetchRequest.json | 2 +-
.../consumer/internals/AcknowledgementsTest.java | 25 ++++-
.../internals/ShareCompletedFetchTest.java | 98 +++++++++---------
.../internals/ShareConsumeRequestManagerTest.java | 87 ++++++++++++++--
.../consumer/internals/ShareConsumerImplTest.java | 87 ++++++++++++++++
.../internals/ShareFetchCollectorTest.java | 68 ++++++++++++
.../kafka/server/share/SharePartitionManager.java | 4 +-
.../kafka/api/AuthorizerIntegrationTest.scala | 2 +-
.../server/ShareFetchAcknowledgeRequestTest.scala | 11 +-
20 files changed, 772 insertions(+), 154 deletions(-)
diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
index 2f7e72fce95..4cc6c195eac 100644
--- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
+++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
@@ -133,6 +133,7 @@ import static org.junit.jupiter.api.Assertions.fail;
public class ShareConsumerTest {
private final ClusterInstance cluster;
private final TopicPartition tp = new TopicPartition("topic", 0);
+ private Uuid tpId;
private final TopicPartition tp2 = new TopicPartition("topic2", 0);
private final TopicPartition warmupTp = new TopicPartition("warmup", 0);
private List<TopicPartition> sgsTopicPartitions;
@@ -151,7 +152,7 @@ public class ShareConsumerTest {
public void setup() {
try {
this.cluster.waitForReadyBrokers();
- createTopic("topic");
+ tpId = createTopic("topic");
createTopic("topic2");
sgsTopicPartitions = IntStream.range(0, 3)
.mapToObj(part -> new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, part))
@@ -2906,6 +2907,95 @@ public class ShareConsumerTest {
}
}
+ @ClusterTest
+ public void testRenewAcknowledgementOnPoll() {
+ alterShareAutoOffsetReset("group1", "earliest");
+ try (Producer<byte[], byte[]> producer = createProducer();
+ ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
+ "group1",
+ Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT))
+ ) {
+ AtomicInteger acknowledgementsCommitted = new AtomicInteger(0);
+ shareConsumer.setAcknowledgementCommitCallback((offsetsByTopicPartition, exception) ->
+ offsetsByTopicPartition.forEach((tip, offsets) -> acknowledgementsCommitted.addAndGet(offsets.size())));
+
+ for (int i = 0; i < 10; i++) {
+ ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), ("Message " + i).getBytes());
+ producer.send(record);
+ }
+ producer.flush();
+
+ shareConsumer.subscribe(List.of(tp.topic()));
+ ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 10);
+ assertEquals(10, records.count());
+
+ int count = 0;
+ for (ConsumerRecord<byte[], byte[]> record : records) {
+ if (count % 2 == 0) {
+ shareConsumer.acknowledge(record, AcknowledgeType.ACCEPT);
+ } else {
+ shareConsumer.acknowledge(record, AcknowledgeType.RENEW);
+ }
+ count++;
+ }
+
+ // Get the remaining 5 records, which were renewed.
+ records = waitedPoll(shareConsumer, 2500L, 5);
+ assertEquals(5, records.count());
+ for (ConsumerRecord<byte[], byte[]> record : records) {
+ shareConsumer.acknowledge(record, AcknowledgeType.ACCEPT);
+ }
+
+ shareConsumer.commitSync();
+ assertEquals(15, acknowledgementsCommitted.get());
+ }
+ }
+
+ @ClusterTest
+ public void testRenewAcknowledgementOnCommitSync() {
+ alterShareAutoOffsetReset("group1", "earliest");
+ try (Producer<byte[], byte[]> producer = createProducer();
+ ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
+ "group1",
+ Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT))
+ ) {
+ AtomicInteger acknowledgementsCommitted = new AtomicInteger(0);
+ shareConsumer.setAcknowledgementCommitCallback((offsetsByTopicPartition, exception) ->
+ offsetsByTopicPartition.forEach((tip, offsets) -> acknowledgementsCommitted.addAndGet(offsets.size())));
+
+ for (int i = 0; i < 10; i++) {
+ ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), ("Message " + i).getBytes());
+ producer.send(record);
+ }
+ producer.flush();
+
+ shareConsumer.subscribe(List.of(tp.topic()));
+ ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 10);
+ assertEquals(10, records.count());
+
+ int count = 0;
+ Map<TopicIdPartition, Optional<KafkaException>> result;
+ for (ConsumerRecord<byte[], byte[]> record : records) {
+ if (count % 2 == 0) {
+ shareConsumer.acknowledge(record, AcknowledgeType.ACCEPT);
+ } else {
+ shareConsumer.acknowledge(record, AcknowledgeType.RENEW);
+ }
+ result = shareConsumer.commitSync();
+ assertEquals(1, result.size());
+ assertEquals(Optional.empty(), result.get(new TopicIdPartition(tpId, tp.partition(), tp.topic())));
+ count++;
+ }
+
+ // Get the remaining 5 records, which were renewed.
+ records = waitedPoll(shareConsumer, 2500L, 5);
+ assertEquals(5, records.count());
+ for (ConsumerRecord<byte[], byte[]> record : records) {
+ shareConsumer.acknowledge(record, AcknowledgeType.ACCEPT);
+ }
+ }
+ }
+
/**
* Util class to encapsulate state for a consumer/producer
* being executed by an {@link ExecutorService}.
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Acknowledgements.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Acknowledgements.java
index 5bce77651b9..a60d24520d0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Acknowledgements.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Acknowledgements.java
@@ -143,6 +143,15 @@ public class Acknowledgements {
return acknowledgeException;
}
+ /**
+ * Whether an acknowledgement error code was received in the response from the broker.
+ *
+ * @return Whether an acknowledgement error code was received in the response from the broker.
+ */
+ public boolean isCompletedExceptionally() {
+ return acknowledgeException != null;
+ }
+
/**
* Merges two sets of acknowledgements. If there are overlapping acknowledgements, the
* merged set wins.
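A sketch of the contract the new isCompletedExceptionally() method adds, using only calls exercised by the tests in this commit:

Acknowledgements acks = Acknowledgements.empty();
acks.add(0L, AcknowledgeType.RENEW);

// Completing with an exception marks the acknowledgements as completed,
// but exceptionally: a record renewed this way must not be returned to
// the in-flight set (see ShareInFlightBatch.renew() later in this diff).
acks.complete(Errors.NOT_LEADER_OR_FOLLOWER.exception());
assert acks.isCompleted();
assert acks.isCompletedExceptionally();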
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
index 5b5214c834a..eff925d342e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollRes
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackEvent;
+import org.apache.kafka.clients.consumer.internals.events.ShareRenewAcknowledgementsCompleteEvent;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
@@ -200,7 +201,6 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
}
}
-
// Iterate over the session handlers to see if there are acknowledgements to be sent for partitions
// which are no longer part of the current subscription.
// We fail acknowledgements for records fetched from a previous leader.
@@ -228,6 +228,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
} else {
log.debug("Leader for the partition is down or has changed, failing Acknowledgements for partition {}", tip);
acks.complete(Errors.NOT_LEADER_OR_FOLLOWER.exception());
+ sendShareRenewAcknowledgementsCompleteEvent(Map.of(tip, acks));
maybeSendShareAcknowledgeCommitCallbackEvent(Map.of(tip, acks));
}
});
@@ -262,6 +263,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
}
/**
+ * Add acknowledgements for a topic-partition to the node's in-flight acknowledgements.
*
* @return True if we can add acknowledgements to the share session.
* If we cannot add acknowledgements, they are completed with {@link Errors#INVALID_SHARE_SESSION_EPOCH} exception.
@@ -274,6 +276,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
// Failing the acknowledgements as we cannot have piggybacked acknowledgements in the initial ShareFetchRequest.
log.debug("Cannot send acknowledgements on initial epoch for ShareSession for partition {}", tip);
acknowledgements.complete(Errors.INVALID_SHARE_SESSION_EPOCH.exception());
+ sendShareRenewAcknowledgementsCompleteEvent(Map.of(tip, acknowledgements));
maybeSendShareAcknowledgeCommitCallbackEvent(Map.of(tip, acknowledgements));
return false;
} else {
@@ -389,6 +392,11 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
}
}
+ private void sendShareRenewAcknowledgementsCompleteEvent(Map<TopicIdPartition, Acknowledgements> acknowledgementsMap) {
+ ShareRenewAcknowledgementsCompleteEvent event = new ShareRenewAcknowledgementsCompleteEvent(acknowledgementsMap);
+ backgroundEventHandler.add(event);
+ }
+
/**
*
* @param acknowledgeRequestState Contains the acknowledgements to be sent.
@@ -536,6 +544,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
resultCount.incrementAndGet();
} else {
nodeAcknowledgements.acknowledgements().complete(Errors.NOT_LEADER_OR_FOLLOWER.exception());
+ sendShareRenewAcknowledgementsCompleteEvent(Map.of(tip, nodeAcknowledgements.acknowledgements()));
maybeSendShareAcknowledgeCommitCallbackEvent(Map.of(tip, nodeAcknowledgements.acknowledgements()));
}
}
@@ -612,6 +621,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
}
} else {
nodeAcknowledgements.acknowledgements().complete(Errors.NOT_LEADER_OR_FOLLOWER.exception());
+ sendShareRenewAcknowledgementsCompleteEvent(Map.of(tip, nodeAcknowledgements.acknowledgements()));
maybeSendShareAcknowledgeCommitCallbackEvent(Map.of(tip, nodeAcknowledgements.acknowledgements()));
}
}
@@ -650,6 +660,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
}
} else {
nodeAcks.acknowledgements().complete(Errors.NOT_LEADER_OR_FOLLOWER.exception());
+ sendShareRenewAcknowledgementsCompleteEvent(Map.of(tip, nodeAcks.acknowledgements()));
maybeSendShareAcknowledgeCommitCallbackEvent(Map.of(tip, nodeAcks.acknowledgements()));
}
});
@@ -669,6 +680,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
}
} else {
acks.complete(Errors.NOT_LEADER_OR_FOLLOWER.exception());
+ sendShareRenewAcknowledgementsCompleteEvent(Map.of(tip, acks));
maybeSendShareAcknowledgeCommitCallbackEvent(Map.of(tip, acks));
}
});
@@ -695,7 +707,8 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
new IllegalStateException("Attempt to call close() when there is an existing close request for node : " + node.id()));
} else {
// There can only be one close() happening at a time. So per node, there will be one acknowledge request state.
- acknowledgeRequestStates.get(nodeId).setCloseRequest(new AcknowledgeRequestState(logContext,
+ acknowledgeRequestStates.get(nodeId).setCloseRequest(
+ new AcknowledgeRequestState(logContext,
ShareConsumeRequestManager.class.getSimpleName() + ":3",
deadlineMs,
retryBackoffMs,
@@ -737,7 +750,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
}
private void handleShareFetchSuccess(Node fetchTarget,
- @SuppressWarnings("unused") ShareFetchRequestData requestData,
+ ShareFetchRequestData requestData,
ClientResponse resp) {
try {
log.debug("Completed ShareFetch request from node {} successfully", fetchTarget.id());
@@ -756,13 +769,16 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
if (response.error() == Errors.UNKNOWN_TOPIC_ID) {
metadata.requestUpdate(false);
}
- // Complete any inFlight acknowledgements with the error code from the response.
+ // Complete any in-flight acknowledgements with the error code from the response.
Map<TopicIdPartition, Acknowledgements> nodeAcknowledgementsInFlight = fetchAcknowledgementsInFlight.get(fetchTarget.id());
if (nodeAcknowledgementsInFlight != null) {
nodeAcknowledgementsInFlight.forEach((tip, acks) -> {
acks.complete(Errors.forCode(response.error().code()).exception());
metricsManager.recordFailedAcknowledgements(acks.size());
});
+ if (requestData.isRenewAck()) {
+ sendShareRenewAcknowledgementsCompleteEvent(nodeAcknowledgementsInFlight);
+ }
maybeSendShareAcknowledgeCommitCallbackEvent(nodeAcknowledgementsInFlight);
nodeAcknowledgementsInFlight.clear();
}
@@ -772,12 +788,12 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
final Map<TopicIdPartition, ShareFetchResponseData.PartitionData> responseData = new LinkedHashMap<>();
response.data().responses().forEach(topicResponse ->
- topicResponse.partitions().forEach(partition -> {
- TopicIdPartition tip = lookupTopicId(topicResponse.topicId(), partition.partitionIndex());
- if (tip != null) {
- responseData.put(tip, partition);
- }
- })
+ topicResponse.partitions().forEach(partition -> {
+ TopicIdPartition tip = lookupTopicId(topicResponse.topicId(), partition.partitionIndex());
+ if (tip != null) {
+ responseData.put(tip, partition);
+ }
+ })
);
final Set<TopicPartition> partitions = responseData.keySet().stream().map(TopicIdPartition::topicPartition).collect(Collectors.toSet());
@@ -802,6 +818,9 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
acks.complete(Errors.forCode(partitionData.acknowledgeErrorCode())
.exception(partitionData.acknowledgeErrorMessage()));
Map<TopicIdPartition, Acknowledgements> acksMap = Map.of(tip, acks);
+ if (requestData.isRenewAck()) {
+ sendShareRenewAcknowledgementsCompleteEvent(acksMap);
+ }
maybeSendShareAcknowledgeCommitCallbackEvent(acksMap);
}
}
@@ -839,6 +858,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
if (fetchAcknowledgementsInFlight.get(fetchTarget.id()) != null) {
fetchAcknowledgementsInFlight.remove(fetchTarget.id()).forEach((partition, acknowledgements) -> {
acknowledgements.complete(new InvalidRecordStateException(INVALID_RESPONSE));
+ sendShareRenewAcknowledgementsCompleteEvent(Map.of(partition, acknowledgements));
maybeSendShareAcknowledgeCommitCallbackEvent(Map.of(partition, acknowledgements));
});
}
@@ -886,6 +906,9 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
acks.complete(Errors.UNKNOWN_SERVER_ERROR.exception());
}
Map<TopicIdPartition, Acknowledgements> acksMap = Map.of(tip, acks);
+ if (requestData.isRenewAck()) {
+ sendShareRenewAcknowledgementsCompleteEvent(nodeAcknowledgementsInFlight);
+ }
maybeSendShareAcknowledgeCommitCallbackEvent(acksMap);
}
}
@@ -917,7 +940,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
if (partitionData.errorCode() != Errors.NONE.code()) {
metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getInFlightAcknowledgementsCount(tip));
}
- acknowledgeRequestState.handleAcknowledgeErrorCode(tip, Errors.forCode(partitionData.errorCode()));
+ acknowledgeRequestState.handleAcknowledgeErrorCode(tip, Errors.forCode(partitionData.errorCode()), requestData.isRenewAck());
}));
acknowledgeRequestState.onSuccessfulAttempt(responseCompletionTimeMs);
@@ -944,7 +967,8 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
return;
}
- handlePartitionError(partitionData, partitionsWithUpdatedLeaderInfo, acknowledgeRequestState, partitionError, tip, shouldRetry);
+ handlePartitionError(partitionData, partitionsWithUpdatedLeaderInfo, acknowledgeRequestState,
+ partitionError, tip, shouldRetry, requestData.isRenewAck());
}));
processRetryLogic(acknowledgeRequestState, shouldRetry, responseCompletionTimeMs);
@@ -990,7 +1014,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
}
metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getInFlightAcknowledgementsCount(tip));
- acknowledgeRequestState.handleAcknowledgeErrorCode(tip, Errors.forException(error));
+ acknowledgeRequestState.handleAcknowledgeErrorCode(tip, Errors.forException(error), requestData.isRenewAck());
}));
acknowledgeRequestState.processingComplete();
@@ -1010,7 +1034,8 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
AcknowledgeRequestState acknowledgeRequestState,
Errors partitionError,
TopicIdPartition tip,
- AtomicBoolean shouldRetry) {
+ AtomicBoolean shouldRetry,
+ boolean isRenewAck) {
if (partitionError.exception() != null) {
boolean retry = false;
if (partitionError == Errors.NOT_LEADER_OR_FOLLOWER || partitionError == Errors.FENCED_LEADER_EPOCH || partitionError == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
@@ -1029,10 +1054,10 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
}
} else {
metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getInFlightAcknowledgementsCount(tip));
- acknowledgeRequestState.handleAcknowledgeErrorCode(tip, partitionError);
+ acknowledgeRequestState.handleAcknowledgeErrorCode(tip, partitionError, isRenewAck);
}
} else {
- acknowledgeRequestState.handleAcknowledgeErrorCode(tip, partitionError);
+ acknowledgeRequestState.handleAcknowledgeErrorCode(tip, partitionError, isRenewAck);
}
}
@@ -1058,7 +1083,8 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
log.debug("For {}, received error {}, with leaderIdAndEpoch {} in ShareAcknowledge", tp, partitionError, partitionData.currentLeader());
if (partitionData.currentLeader().leaderId() != -1 && partitionData.currentLeader().leaderEpoch() != -1) {
- partitionsWithUpdatedLeaderInfo.put(tp, new Metadata.LeaderIdAndEpoch(
+ partitionsWithUpdatedLeaderInfo.put(tp,
+ new Metadata.LeaderIdAndEpoch(
Optional.of(partitionData.currentLeader().leaderId()),
Optional.of(partitionData.currentLeader().leaderEpoch())
));
@@ -1186,7 +1212,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
}
Map<TopicIdPartition, Acknowledgements> finalAcknowledgementsToSend = new HashMap<>(
- incompleteAcknowledgements.isEmpty() ? acknowledgementsToSend : incompleteAcknowledgements);
+ incompleteAcknowledgements.isEmpty() ? acknowledgementsToSend : incompleteAcknowledgements);
for (Map.Entry<TopicIdPartition, Acknowledgements> entry : finalAcknowledgementsToSend.entrySet()) {
sessionHandler.addPartitionToFetch(entry.getKey(), entry.getValue());
@@ -1198,7 +1224,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
Node nodeToSend = metadata.fetch().nodeById(nodeId);
if (requestBuilder == null) {
- handleSessionErrorCode(Errors.SHARE_SESSION_NOT_FOUND);
+ handleAcknowledgeShareSessionNotFound();
return null;
} else if (nodeToSend != null) {
nodesWithPendingRequests.add(nodeId);
@@ -1255,8 +1281,8 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
boolean isEmpty() {
return acknowledgementsToSend.isEmpty() &&
- incompleteAcknowledgements.isEmpty() &&
- inFlightAcknowledgements.isEmpty();
+ incompleteAcknowledgements.isEmpty() &&
+ inFlightAcknowledgements.isEmpty();
}
/**
@@ -1274,11 +1300,11 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
* Sets the error code in the acknowledgements and sends the response
* through a background event.
*/
- void handleAcknowledgeErrorCode(TopicIdPartition tip, Errors acknowledgeErrorCode) {
+ void handleAcknowledgeErrorCode(TopicIdPartition tip, Errors acknowledgeErrorCode, boolean isRenewAck) {
Acknowledgements acks = inFlightAcknowledgements.remove(tip);
if (acks != null) {
acks.complete(acknowledgeErrorCode.exception());
- resultHandler.complete(tip, acks, requestType);
+ resultHandler.complete(tip, acks, requestType, isRenewAck);
} else {
log.error("Invalid partition {} received in ShareAcknowledge response", tip);
}
@@ -1292,24 +1318,28 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
Acknowledgements acks = incompleteAcknowledgements.get(tip);
if (acks != null) {
acks.complete(Errors.REQUEST_TIMED_OUT.exception());
- resultHandler.complete(tip, acks, requestType);
+ // We do not know whether this is a renew ack, but handling the error as if it were will ensure
+ // that we do not leave dangling acknowledgements.
+ resultHandler.complete(tip, acks, requestType, true);
}
}
/**
* Set the error code for all remaining acknowledgements in the event
- * of a session error which prevents the remaining acknowledgements from
+ * of a share session not found error which prevents the remaining acknowledgements from
* being sent.
*/
- void handleSessionErrorCode(Errors errorCode) {
+ void handleAcknowledgeShareSessionNotFound() {
Map<TopicIdPartition, Acknowledgements> acknowledgementsMapToClear =
- incompleteAcknowledgements.isEmpty() ? acknowledgementsToSend : incompleteAcknowledgements;
+ incompleteAcknowledgements.isEmpty() ? acknowledgementsToSend : incompleteAcknowledgements;
acknowledgementsMapToClear.forEach((tip, acks) -> {
if (acks != null) {
- acks.complete(errorCode.exception());
+ acks.complete(Errors.SHARE_SESSION_NOT_FOUND.exception());
}
- resultHandler.complete(tip, acks, requestType);
+ // We do not know whether this is a renew ack, but handling the error as if it were will ensure
+ // that we do not leave dangling acknowledgements.
+ resultHandler.complete(tip, acks, requestType, true);
});
acknowledgementsMapToClear.clear();
processingComplete();
@@ -1335,7 +1365,9 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
if (!inFlightAcknowledgements.isEmpty()) {
inFlightAcknowledgements.forEach((partition, acknowledgements) -> {
acknowledgements.complete(exception);
- resultHandler.complete(partition, acknowledgements, requestType);
+ // We do not know whether this is a renew ack, but handling the error as if it were will ensure
+ // that we do not leave dangling acknowledgements.
+ resultHandler.complete(partition, acknowledgements, requestType, true);
});
inFlightAcknowledgements.clear();
}
@@ -1404,9 +1436,12 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
* Handle the result of a ShareAcknowledge request sent to one or more nodes and
* signal the completion when all results are known.
*/
- public void complete(TopicIdPartition partition, Acknowledgements acknowledgements, AcknowledgeRequestType type) {
+ public void complete(TopicIdPartition partition, Acknowledgements acknowledgements, AcknowledgeRequestType type, boolean isRenewAck) {
if (type.equals(AcknowledgeRequestType.COMMIT_ASYNC)) {
if (acknowledgements != null) {
+ if (isRenewAck) {
+ sendShareRenewAcknowledgementsCompleteEvent(Map.of(partition, acknowledgements));
+ }
maybeSendShareAcknowledgeCommitCallbackEvent(Map.of(partition, acknowledgements));
}
} else {
@@ -1414,6 +1449,9 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
result.put(partition, acknowledgements);
}
if (remainingResults != null && remainingResults.decrementAndGet() == 0) {
+ if (isRenewAck) {
+ sendShareRenewAcknowledgementsCompleteEvent(result);
+ }
maybeSendShareAcknowledgeCommitCallbackEvent(result);
future.ifPresent(future -> future.complete(result));
}
@@ -1486,14 +1524,6 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
this.partitionIndex = partitionIndex;
}
- int getPartitionIndex() {
- return partitionIndex;
- }
-
- Uuid getTopicId() {
- return topicId;
- }
-
@Override
public int hashCode() {
return Objects.hash(topicId, partitionIndex);
@@ -1505,7 +1535,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
if (o == null || getClass() != o.getClass()) return false;
IdAndPartition that = (IdAndPartition) o;
return Objects.equals(topicId, that.topicId) &&
- partitionIndex == that.partitionIndex;
+ partitionIndex == that.partitionIndex;
}
}
@@ -1525,8 +1555,4 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
return super.toString().toLowerCase(Locale.ROOT);
}
}
-
- Map<TopicIdPartition, Acknowledgements> getFetchAcknowledgementsToSend(Integer nodeId) {
- return fetchAcknowledgementsToSend.get(nodeId);
- }
}
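Taken together, the renew plumbing added in this file hands completed renew acknowledgements back to the application thread via a new background event. A simplified sketch of the flow for one partition (method names as in this diff):

// Network thread, ShareConsumeRequestManager: the broker has answered
// the acknowledgements for topic-id-partition tip.
acks.complete(null);
sendShareRenewAcknowledgementsCompleteEvent(Map.of(tip, acks));

// Application thread, ShareConsumerImpl (next file): the event processor
// feeds successfully renewed records back into the current fetch:
// case SHARE_RENEW_ACKNOWLEDGEMENTS_COMPLETE ->
//     currentFetch.renew(event.acknowledgementsMap());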
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
index af925b316da..a2274b8e9cd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
@@ -45,6 +45,7 @@ import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCo
import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackRegistrationEvent;
import org.apache.kafka.clients.consumer.internals.events.ShareFetchEvent;
import org.apache.kafka.clients.consumer.internals.events.SharePollEvent;
+import org.apache.kafka.clients.consumer.internals.events.ShareRenewAcknowledgementsCompleteEvent;
import org.apache.kafka.clients.consumer.internals.events.ShareSubscriptionChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.ShareUnsubscribeEvent;
import org.apache.kafka.clients.consumer.internals.events.StopFindCoordinatorOnCloseEvent;
@@ -146,6 +147,10 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
process((ShareAcknowledgementCommitCallbackEvent) event);
break;
+ case SHARE_RENEW_ACKNOWLEDGEMENTS_COMPLETE:
+ process((ShareRenewAcknowledgementsCompleteEvent) event);
+ break;
+
default:
throw new IllegalArgumentException("Background event type " + event.type() + " was not expected");
}
@@ -160,6 +165,10 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
completedAcknowledgements.add(event.acknowledgementsMap());
}
}
+
+ private void process(final ShareRenewAcknowledgementsCompleteEvent event) {
+ currentFetch.renew(event.acknowledgementsMap());
+ }
}
private final ApplicationEventHandler applicationEventHandler;
@@ -576,6 +585,9 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
// If using implicit acknowledgement, acknowledge the previously fetched records
acknowledgeBatchIfImplicitAcknowledgement();
+ // If using explicit acknowledgement, make sure all in-flight records have been acknowledged
+ ensureInFlightAcknowledgedIfExplicitAcknowledgement();
+
kafkaShareConsumerMetrics.recordPollStart(timer.currentTimeMs());
if (subscriptions.hasNoSubscriptionOrUserAssignment()) {
@@ -654,39 +666,55 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
}
private ShareFetch<K, V> collect(Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap) {
- if (currentFetch.isEmpty()) {
+ Map<TopicIdPartition, NodeAcknowledgements> acksToSend = acknowledgementsMap;
+
+ if (currentFetch.isEmpty() && !currentFetch.hasRenewals()) {
final ShareFetch<K, V> fetch = fetchCollector.collect(fetchBuffer);
if (fetch.isEmpty()) {
+ // Check for any acknowledgements which could have come from control records (GAP) and send them.
Map<TopicIdPartition, NodeAcknowledgements> controlRecordAcknowledgements = fetch.takeAcknowledgedRecords();
-
if (!controlRecordAcknowledgements.isEmpty()) {
// Asynchronously commit any waiting acknowledgements from control records.
sendShareAcknowledgeAsyncEvent(controlRecordAcknowledgements);
}
+
// We only send one ShareFetchEvent per poll call.
if (shouldSendShareFetchEvent) {
- // Check for any acknowledgements which could have come from control records (GAP) and include them.
- applicationEventHandler.add(new ShareFetchEvent(acknowledgementsMap));
+ applicationEventHandler.add(new ShareFetchEvent(acksToSend));
shouldSendShareFetchEvent = false;
// Notify the network thread to wake up and start the next round of fetching
applicationEventHandler.wakeupNetworkThread();
+ acksToSend = Map.of();
}
- } else if (!acknowledgementsMap.isEmpty()) {
- // Asynchronously commit any waiting acknowledgements
- sendShareAcknowledgeAsyncEvent(acknowledgementsMap);
}
- return fetch;
- } else {
- if (!acknowledgementsMap.isEmpty()) {
+
+ if (!acksToSend.isEmpty()) {
// Asynchronously commit any waiting acknowledgements
- sendShareAcknowledgeAsyncEvent(acknowledgementsMap);
+ sendShareAcknowledgeAsyncEvent(acksToSend);
}
- if (acknowledgementMode == ShareAcknowledgementMode.EXPLICIT) {
- // We cannot leave unacknowledged records in EXPLICIT acknowledgement mode, so we throw an exception to the application.
- throw new IllegalStateException("All records must be acknowledged in explicit acknowledgement mode.");
+ return fetch;
+ } else if (currentFetch.hasRenewals()) {
+ // First, take any records which have been renewed and move them back into in-flight records.
+ currentFetch.takeRenewedRecords();
+
+ // If some records are in renewing state...
+ if (currentFetch.hasRenewals()) {
+ // We only send one ShareFetchEvent per poll call.
+ if (shouldSendShareFetchEvent) {
+ applicationEventHandler.add(new ShareFetchEvent(acksToSend));
+ shouldSendShareFetchEvent = false;
+ // Notify the network thread to wake up and start the next round of fetching
+ applicationEventHandler.wakeupNetworkThread();
+ acksToSend = Map.of();
+ }
}
- return currentFetch;
}
+
+ if (!acksToSend.isEmpty()) {
+ // Asynchronously commit any waiting acknowledgements
+ sendShareAcknowledgeAsyncEvent(acksToSend);
+ }
+ return currentFetch;
}
private void sendShareAcknowledgeAsyncEvent(Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap) {
@@ -1107,6 +1135,18 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
}
}
+ /**
+ * If the acknowledgement mode is EXPLICIT, ensure that all in-flight records have been acknowledged.
+ */
+ private void ensureInFlightAcknowledgedIfExplicitAcknowledgement() {
+ if (acknowledgementMode == ShareAcknowledgementMode.EXPLICIT) {
+ if (!currentFetch.checkAllInFlightAreAcknowledged()) {
+ // We cannot leave unacknowledged records in EXPLICIT acknowledgement mode, so we throw an exception to the application.
+ throw new IllegalStateException("All records must be acknowledged in explicit acknowledgement mode.");
+ }
+ }
+ }
+
/**
* Returns any ready acknowledgements to be sent to the cluster.
*/
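The new ensureInFlightAcknowledgedIfExplicitAcknowledgement() check tightens the poll() contract in EXPLICIT mode. A sketch of what an application now observes (consumer configured as in the tests at the top of this commit):

ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
    // Every in-flight record must be acknowledged before the next poll();
    // RENEW counts as an acknowledgement and the record comes back in flight.
    consumer.acknowledge(record, AcknowledgeType.RENEW);
}
consumer.poll(Duration.ofSeconds(1)); // OK: renewed records return as in-flight

// Skipping the acknowledge loop instead would make the second poll() throw
// IllegalStateException("All records must be acknowledged in explicit
// acknowledgement mode.").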
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetch.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetch.java
index 406110fe502..6eb3ba7d77d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetch.java
@@ -89,7 +89,9 @@ public class ShareFetch<K, V> {
Map.Entry<TopicIdPartition, ShareInFlightBatch<K, V>> entry = iterator.next();
ShareInFlightBatch<K, V> batch = entry.getValue();
if (batch.isEmpty()) {
- iterator.remove();
+ if (!batch.hasRenewals()) {
+ iterator.remove();
+ }
} else {
numRecords += batch.numRecords();
}
@@ -106,6 +108,29 @@ public class ShareFetch<K, V> {
return numRecords() == 0;
}
+ /**
+ * @return {@code true} if this fetch contains records being renewed
+ */
+ public boolean hasRenewals() {
+ boolean hasRenewals = false;
+ for (Map.Entry<TopicIdPartition, ShareInFlightBatch<K, V>> entry : batches.entrySet()) {
+ if (entry.getValue().hasRenewals()) {
+ hasRenewals = true;
+ break;
+ }
+ }
+ return hasRenewals;
+ }
+
+ /**
+ * Take any renewed records and move them back into in-flight state.
+ */
+ public void takeRenewedRecords() {
+ for (Map.Entry<TopicIdPartition, ShareInFlightBatch<K, V>> entry : batches.entrySet()) {
+ entry.getValue().takeRenewals();
+ }
+ }
+
/**
* Acknowledge a single record in the current batch.
*
@@ -124,7 +149,9 @@ public class ShareFetch<K, V> {
}
/**
- * Acknowledge a single record by its topic, partition and offset in the current batch.
+ * Acknowledge a single record which experienced an exception during its delivery by its topic, partition
+ * and offset in the current batch. This method is specifically for overriding the default acknowledge
+ * type for records whose delivery failed.
*
* @param topic The topic of the record to acknowledge
* @param partition The partition of the record
@@ -156,6 +183,23 @@ public class ShareFetch<K, V> {
batches.forEach((tip, batch) -> batch.acknowledgeAll(type));
}
+ /**
+ * Checks whether all in-flight records have been acknowledged. This is required for explicit
+ * acknowledgement mode.
+ *
+ * @return Whether all in-flight records have been acknowledged
+ */
+ public boolean checkAllInFlightAreAcknowledged() {
+ boolean allInFlightAreAcknowledged = true;
+ for (Map.Entry<TopicIdPartition, ShareInFlightBatch<K, V>> entry : batches.entrySet()) {
+ if (!entry.getValue().checkAllInFlightAreAcknowledged()) {
+ allInFlightAreAcknowledged = false;
+ break;
+ }
+ }
+ return allInFlightAreAcknowledged;
+ }
+
/**
* Removes all acknowledged records from the in-flight records and returns the map of acknowledgements
* to send. If some records were not acknowledged, the in-flight records will not be empty after this
@@ -173,4 +217,21 @@ public class ShareFetch<K, V> {
});
return acknowledgementMap;
}
+
+ /**
+ * Handles completed renew acknowledgements by returning successfully renewed records
+ * to the set of in-flight records.
+ *
+ * @param acknowledgementsMap Map from topic-partition to acknowledgements for
+ * completed renew acknowledgements
+ *
+ * @return The number of records renewed
+ */
+ public int renew(Map<TopicIdPartition, Acknowledgements> acknowledgementsMap) {
+ int recordsRenewed = 0;
+ for (Map.Entry<TopicIdPartition, Acknowledgements> entry : acknowledgementsMap.entrySet()) {
+ recordsRenewed += batches.get(entry.getKey()).renew(entry.getValue());
+ }
+ return recordsRenewed;
+ }
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareInFlightBatch.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareInFlightBatch.java
index 0fa0499aa1f..34051fc4fdb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareInFlightBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareInFlightBatch.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicIdPartition;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -29,43 +30,56 @@ import java.util.TreeSet;
public class ShareInFlightBatch<K, V> {
private final int nodeId;
- final TopicIdPartition partition;
+ private final TopicIdPartition partition;
private final Map<Long, ConsumerRecord<K, V>> inFlightRecords;
+ private Map<Long, ConsumerRecord<K, V>> renewingRecords;
+ private Map<Long, ConsumerRecord<K, V>> renewedRecords;
private final Set<Long> acknowledgedRecords;
private Acknowledgements acknowledgements;
private ShareInFlightBatchException exception;
private boolean hasCachedException = false;
+ private boolean checkForRenewAcknowledgements = false;
public ShareInFlightBatch(int nodeId, TopicIdPartition partition) {
this.nodeId = nodeId;
this.partition = partition;
- inFlightRecords = new TreeMap<>();
- acknowledgedRecords = new TreeSet<>();
- acknowledgements = Acknowledgements.empty();
+ this.inFlightRecords = new TreeMap<>();
+ this.acknowledgedRecords = new TreeSet<>();
+ this.acknowledgements = Acknowledgements.empty();
}
- public void addAcknowledgement(long offset, AcknowledgeType acknowledgeType) {
- acknowledgements.add(offset, acknowledgeType);
+ public void addAcknowledgement(long offset, AcknowledgeType type) {
+ acknowledgements.add(offset, type);
+ if (type == AcknowledgeType.RENEW) {
+ checkForRenewAcknowledgements = true;
+ }
}
public void acknowledge(ConsumerRecord<K, V> record, AcknowledgeType type) {
if (inFlightRecords.get(record.offset()) != null) {
acknowledgements.add(record.offset(), type);
acknowledgedRecords.add(record.offset());
+ if (type == AcknowledgeType.RENEW) {
+ checkForRenewAcknowledgements = true;
+ }
return;
}
throw new IllegalStateException("The record cannot be acknowledged.");
}
- public int acknowledgeAll(AcknowledgeType type) {
- int recordsAcknowledged = 0;
+ public void acknowledgeAll(AcknowledgeType type) {
for (Map.Entry<Long, ConsumerRecord<K, V>> entry : inFlightRecords.entrySet()) {
if (acknowledgements.addIfAbsent(entry.getKey(), type)) {
acknowledgedRecords.add(entry.getKey());
- recordsAcknowledged++;
}
}
- return recordsAcknowledged;
+ if (type == AcknowledgeType.RENEW) {
+ checkForRenewAcknowledgements = true;
+ }
+ }
+
+ public boolean checkAllInFlightAreAcknowledged() {
+ return inFlightRecords.size() == acknowledgedRecords.size();
}
public void addRecord(ConsumerRecord<K, V> record) {
@@ -78,6 +92,9 @@ public class ShareInFlightBatch<K, V> {
public void merge(ShareInFlightBatch<K, V> other) {
inFlightRecords.putAll(other.inFlightRecords);
+ if (other.checkForRenewAcknowledgements) {
+ checkForRenewAcknowledgements = true;
+ }
}
List<ConsumerRecord<K, V>> getInFlightRecords() {
@@ -93,6 +110,21 @@ public class ShareInFlightBatch<K, V> {
}
Acknowledgements takeAcknowledgedRecords() {
+ if (checkForRenewAcknowledgements) {
+ if (renewingRecords == null) {
+ renewingRecords = new HashMap<>();
+ }
+ if (renewedRecords == null) {
+ renewedRecords = new HashMap<>();
+ }
+ Map<Long, AcknowledgeType> ackTypeMap = acknowledgements.getAcknowledgementsTypeMap();
+ acknowledgedRecords.forEach(offset -> {
+ if (ackTypeMap.get(offset) == AcknowledgeType.RENEW) {
+ renewingRecords.put(offset, inFlightRecords.get(offset));
+ }
+ });
+ }
+
// Usually, all records will be acknowledged, so we can just clear the in-flight records leaving
// an empty batch, which will trigger more fetching
if (acknowledgedRecords.size() == inFlightRecords.size()) {
@@ -105,9 +137,47 @@ public class ShareInFlightBatch<K, V> {
Acknowledgements currentAcknowledgements = acknowledgements;
acknowledgements = Acknowledgements.empty();
+ checkForRenewAcknowledgements = false;
return currentAcknowledgements;
}
+ int renew(Acknowledgements acknowledgements) {
+ int recordsRenewed = 0;
+ boolean isCompletedExceptionally = acknowledgements.isCompletedExceptionally();
+ if (acknowledgements.isCompleted()) {
+ Map<Long, AcknowledgeType> ackTypeMap = acknowledgements.getAcknowledgementsTypeMap();
+ for (Map.Entry<Long, AcknowledgeType> ackTypeEntry : ackTypeMap.entrySet()) {
+ long offset = ackTypeEntry.getKey();
+ AcknowledgeType ackType = ackTypeEntry.getValue();
+ ConsumerRecord<K, V> record = renewingRecords.remove(offset);
+ if (ackType == AcknowledgeType.RENEW) {
+ if (record != null && !isCompletedExceptionally) {
+ // The record is moved into renewed state, and will then become in-flight later.
+ renewedRecords.put(offset, record);
+ recordsRenewed++;
+ }
+ }
+ }
+ } else {
+ throw new IllegalStateException("Renewing with uncompleted acknowledgements");
+ }
+ return recordsRenewed;
+ }
+
+ boolean hasRenewals() {
+ if (renewingRecords == null) {
+ return false;
+ }
+ return !renewingRecords.isEmpty() || !renewedRecords.isEmpty();
+ }
+
+ void takeRenewals() {
+ if (renewedRecords != null) {
+ inFlightRecords.putAll(renewedRecords);
+ renewedRecords.clear();
+ }
+ }
+
Acknowledgements getAcknowledgements() {
return acknowledgements;
}
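The new fields give each record a small renewal state machine. A sketch of the happy path through the package-private API above (single record; tip and record are placeholders):

ShareInFlightBatch<String, String> batch = new ShareInFlightBatch<>(0, tip);
batch.addRecord(record);                                 // in-flight
batch.acknowledge(record, AcknowledgeType.RENEW);        // marked for renewal
Acknowledgements acks = batch.takeAcknowledgedRecords(); // parked in renewingRecords
// ... the renew acknowledgements travel to the broker and back ...
acks.complete(null);                                     // renewed successfully
batch.renew(acks);                                       // moved to renewedRecords
batch.takeRenewals();                                    // back in-flight for the next poll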
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandler.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandler.java
index 6dc56ee26f1..348855a341b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandler.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandler.java
@@ -17,6 +17,7 @@
package org.apache.kafka.clients.consumer.internals;
+import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
@@ -169,18 +170,39 @@ public class ShareSessionHandler {
// The replaced topic-partitions need to be removed, and their replacements are already added
removed.addAll(replaced);
+ boolean hasRenewAcknowledgements = false;
Map<TopicIdPartition, List<ShareFetchRequestData.AcknowledgementBatch>> acknowledgementBatches = new HashMap<>();
- nextAcknowledgements.forEach((partition, acknowledgements) -> acknowledgementBatches.put(partition, acknowledgements.getAcknowledgementBatches()
- .stream().map(AcknowledgementBatch::toShareFetchRequest)
- .collect(Collectors.toList())));
+ if (!nextAcknowledgements.isEmpty()) {
+ for (Map.Entry<TopicIdPartition, Acknowledgements> partitionsAcks : nextAcknowledgements.entrySet()) {
+ List<AcknowledgementBatch> partitionAckBatches = partitionsAcks.getValue().getAcknowledgementBatches();
+ for (AcknowledgementBatch ackBatch : partitionAckBatches) {
+ if (ackBatch.acknowledgeTypes().contains(AcknowledgeType.RENEW.id)) {
+ hasRenewAcknowledgements = true;
+ }
+ acknowledgementBatches.computeIfAbsent(partitionsAcks.getKey(), k -> new ArrayList<>()).add(ackBatch.toShareFetchRequest());
+ }
+ }
+ }
nextPartitions = new LinkedHashMap<>();
nextAcknowledgements = new LinkedHashMap<>();
- return ShareFetchRequest.Builder.forConsumer(
+ if (hasRenewAcknowledgements) {
+ // If the request has renew acknowledgements, the ShareFetch is only used to send the acknowledgements
+ // and potentially update the share session. The parameters for wait time, number of bytes and number of
+ // records are all zero.
+ return ShareFetchRequest.Builder.forConsumer(
+ groupId, nextMetadata, 0,
+ 0, 0, 0,
+ 0, shareFetchConfig.shareAcquireMode.id, true,
+ added, removed, acknowledgementBatches);
+ } else {
+ return ShareFetchRequest.Builder.forConsumer(
groupId, nextMetadata, shareFetchConfig.maxWaitMs,
shareFetchConfig.minBytes, shareFetchConfig.maxBytes, shareFetchConfig.maxPollRecords,
- shareFetchConfig.maxPollRecords, shareFetchConfig.shareAcquireMode.id, added, removed, acknowledgementBatches);
+ shareFetchConfig.maxPollRecords, shareFetchConfig.shareAcquireMode.id, false,
+ added, removed, acknowledgementBatches);
+ }
}
public ShareAcknowledgeRequest.Builder newShareAcknowledgeBuilder(String groupId, ShareFetchConfig shareFetchConfig) {
@@ -191,15 +213,23 @@ public class ShareSessionHandler {
return null;
}
+ boolean hasRenewAcknowledgements = false;
Map<TopicIdPartition, List<ShareAcknowledgeRequestData.AcknowledgementBatch>> acknowledgementBatches = new HashMap<>();
- nextAcknowledgements.forEach((partition, acknowledgements) ->
- acknowledgementBatches.put(partition, acknowledgements.getAcknowledgementBatches()
- .stream().map(AcknowledgementBatch::toShareAcknowledgeRequest)
- .collect(Collectors.toList())));
+ if (!nextAcknowledgements.isEmpty()) {
+ for (Map.Entry<TopicIdPartition, Acknowledgements> partitionsAcks : nextAcknowledgements.entrySet()) {
+ List<AcknowledgementBatch> partitionAckBatches = partitionsAcks.getValue().getAcknowledgementBatches();
+ for (AcknowledgementBatch ackBatch : partitionAckBatches) {
+ if (ackBatch.acknowledgeTypes().contains(AcknowledgeType.RENEW.id)) {
+ hasRenewAcknowledgements = true;
+ }
+ acknowledgementBatches.computeIfAbsent(partitionsAcks.getKey(), k -> new ArrayList<>()).add(ackBatch.toShareAcknowledgeRequest());
+ }
+ }
+ }
nextAcknowledgements = new LinkedHashMap<>();
- return ShareAcknowledgeRequest.Builder.forConsumer(groupId, nextMetadata, acknowledgementBatches);
+ return ShareAcknowledgeRequest.Builder.forConsumer(groupId, nextMetadata, hasRenewAcknowledgements, acknowledgementBatches);
}
private String topicIdPartitionsToLogString(Collection<TopicIdPartition> partitions) {
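One design note on the renew-only path above: the zero-valued wait time, byte and record limits make the ShareFetch return immediately without acquiring new records, so lock renewal is never delayed by fetch waiting. Condensed, the decision in newShareFetchBuilder() is:

ShareFetchRequest.Builder builder = hasRenewAcknowledgements
    ? ShareFetchRequest.Builder.forConsumer(groupId, nextMetadata, 0, 0, 0, 0, 0,
          shareFetchConfig.shareAcquireMode.id, true, added, removed, acknowledgementBatches)
    : ShareFetchRequest.Builder.forConsumer(groupId, nextMetadata,
          shareFetchConfig.maxWaitMs, shareFetchConfig.minBytes, shareFetchConfig.maxBytes,
          shareFetchConfig.maxPollRecords, shareFetchConfig.maxPollRecords,
          shareFetchConfig.shareAcquireMode.id, false, added, removed, acknowledgementBatches);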
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
index 6fa737c7278..9f84df9414d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
@@ -29,6 +29,7 @@ public abstract class BackgroundEvent {
ERROR,
CONSUMER_REBALANCE_LISTENER_CALLBACK_NEEDED,
SHARE_ACKNOWLEDGEMENT_COMMIT_CALLBACK,
+ SHARE_RENEW_ACKNOWLEDGEMENTS_COMPLETE,
STREAMS_ON_TASKS_ASSIGNED_CALLBACK_NEEDED,
STREAMS_ON_TASKS_REVOKED_CALLBACK_NEEDED,
STREAMS_ON_ALL_TASKS_LOST_CALLBACK_NEEDED
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareRenewAcknowledgementsCompleteEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareRenewAcknowledgementsCompleteEvent.java
new file mode 100644
index 00000000000..1151ac04a52
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareRenewAcknowledgementsCompleteEvent.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.clients.consumer.internals.Acknowledgements;
+import org.apache.kafka.common.TopicIdPartition;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class ShareRenewAcknowledgementsCompleteEvent extends BackgroundEvent {
+
+ private final Map<TopicIdPartition, Acknowledgements> acknowledgementsMap;
+
+ public ShareRenewAcknowledgementsCompleteEvent(Map<TopicIdPartition, Acknowledgements> acknowledgementsMap) {
+ super(Type.SHARE_RENEW_ACKNOWLEDGEMENTS_COMPLETE);
+ this.acknowledgementsMap = new HashMap<>(acknowledgementsMap);
+ }
+
+ public Map<TopicIdPartition, Acknowledgements> acknowledgementsMap() {
+ return acknowledgementsMap;
+ }
+
+ @Override
+ public String toStringBase() {
+ return super.toStringBase() + ", acknowledgementsMap=" + acknowledgementsMap;
+ }
+}
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ShareAcknowledgeRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ShareAcknowledgeRequest.java
index 43bec9c5fda..1f5bba66b8c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ShareAcknowledgeRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ShareAcknowledgeRequest.java
@@ -38,7 +38,7 @@ public class ShareAcknowledgeRequest extends AbstractRequest {
this.data = data;
}
- public static ShareAcknowledgeRequest.Builder forConsumer(String groupId, ShareRequestMetadata metadata,
+ public static ShareAcknowledgeRequest.Builder forConsumer(String groupId, ShareRequestMetadata metadata, boolean isRenewAck,
Map<TopicIdPartition, List<ShareAcknowledgeRequestData.AcknowledgementBatch>> acknowledgementsMap) {
ShareAcknowledgeRequestData data = new ShareAcknowledgeRequestData();
data.setGroupId(groupId);
@@ -46,6 +46,7 @@ public class ShareAcknowledgeRequest extends AbstractRequest {
data.setMemberId(metadata.memberId().toString());
data.setShareSessionEpoch(metadata.epoch());
}
+ data.setIsRenewAck(isRenewAck);
ShareAcknowledgeRequestData.AcknowledgeTopicCollection ackTopics = new ShareAcknowledgeRequestData.AcknowledgeTopicCollection();
for (Map.Entry<TopicIdPartition, List<ShareAcknowledgeRequestData.AcknowledgementBatch>> acknowledgeEntry : acknowledgementsMap.entrySet()) {
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java
index ec044bca955..53f9373d35f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java
@@ -45,7 +45,8 @@ public class ShareFetchRequest extends AbstractRequest {
public static Builder forConsumer(String groupId, ShareRequestMetadata metadata,
int maxWait, int minBytes, int maxBytes, int maxRecords,
- int batchSize, byte shareAcquireMode, List<TopicIdPartition> send, List<TopicIdPartition> forget,
+ int batchSize, byte shareAcquireMode, boolean isRenewAck,
+ List<TopicIdPartition> send, List<TopicIdPartition> forget,
Map<TopicIdPartition, List<ShareFetchRequestData.AcknowledgementBatch>> acknowledgementsMap) {
ShareFetchRequestData data = new ShareFetchRequestData();
data.setGroupId(groupId);
@@ -63,6 +64,7 @@ public class ShareFetchRequest extends AbstractRequest {
data.setMaxRecords(maxRecords);
data.setBatchSize(batchSize);
data.setShareAcquireMode(shareAcquireMode);
+ data.setIsRenewAck(isRenewAck);
// Build a map of topics to fetch keyed by topic ID, and within each a map of partitions keyed by index
ShareFetchRequestData.FetchTopicCollection fetchTopics = new ShareFetchRequestData.FetchTopicCollection();
@@ -147,7 +149,7 @@ public class ShareFetchRequest extends AbstractRequest {
}
// The v1 only supports ShareAcquireMode.BATCH_OPTIMIZED.
if (data.shareAcquireMode() != ShareAcquireMode.BATCH_OPTIMIZED.id()) {
- throw new UnsupportedVersionException("The v1 ShareFetch does not support ShareAcquireMode.RECORD_LIMIT");
+ throw new UnsupportedVersionException("The v1 ShareFetch only supports ShareAcquireMode.BATCH_OPTIMIZED");
}
}
return new ShareFetchRequest(data, version);
diff --git a/clients/src/main/resources/common/message/ShareFetchRequest.json b/clients/src/main/resources/common/message/ShareFetchRequest.json
index afc08813711..1357858cc41 100644
--- a/clients/src/main/resources/common/message/ShareFetchRequest.json
+++ b/clients/src/main/resources/common/message/ShareFetchRequest.json
@@ -43,7 +43,7 @@
{ "name": "BatchSize", "type": "int32", "versions": "1+",
"about": "The optimal number of records for batches of acquired records
and acknowledgements." },
{ "name": "ShareAcquireMode", "type": "int8", "versions": "2+", "default":
"0", "ignorable": true,
- "about": "The acquire mode to control the fetch behavior: 0 -
batch-optimized, 1 - record-limit" },
+ "about": "The acquire mode to control the fetch behavior -
0:batch-optimized,1:record-limit." },
{ "name": "IsRenewAck", "type": "bool", "versions": "2+", "default":
"false",
"about": "Whether Renew type acknowledgements present in
AcknowledgementBatches." },
{ "name": "Topics", "type": "[]FetchTopic", "versions": "0+",
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AcknowledgementsTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AcknowledgementsTest.java
index b6818ab51b5..60836465003 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AcknowledgementsTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AcknowledgementsTest.java
@@ -17,13 +17,17 @@
package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.consumer.AcknowledgeType;
+import org.apache.kafka.common.KafkaException;
import org.junit.jupiter.api.Test;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class AcknowledgementsTest {
@@ -475,7 +479,6 @@ public class AcknowledgementsTest {
assertEquals(AcknowledgeType.REJECT.id, ackList2.get(2).acknowledgeTypes().get(0));
}
-
@Test
public void testNoncontiguousGaps() {
acks.addGap(2L);
@@ -503,4 +506,24 @@ public class AcknowledgementsTest {
assertEquals(1, ackList2.get(1).acknowledgeTypes().size());
assertEquals(Acknowledgements.ACKNOWLEDGE_TYPE_GAP, ackList2.get(1).acknowledgeTypes().get(0));
}
+
+ @Test
+ public void testCompleteSuccess() {
+ acks.add(0, AcknowledgeType.RENEW);
+ assertFalse(acks.isCompleted());
+
+ acks.complete(null);
+ assertTrue(acks.isCompleted());
+ assertNull(acks.getAcknowledgeException());
+ }
+
+ @Test
+ public void testCompleteException() {
+ acks.add(0, AcknowledgeType.RENEW);
+ assertFalse(acks.isCompleted());
+
+ acks.complete(new KafkaException());
+ assertTrue(acks.isCompleted());
+ assertNotNull(acks.getAcknowledgeException());
+ }
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java
index 95f16966292..a6a4108189c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java
@@ -74,8 +74,8 @@ public class ShareCompletedFetchTest {
int numRecordsPerBatch = 10;
int numRecords = 20; // Records for 10-29, in 2 equal batches
ShareFetchResponseData.PartitionData partitionData = new ShareFetchResponseData.PartitionData()
- .setRecords(newRecords(startingOffset, numRecordsPerBatch, 2))
- .setAcquiredRecords(acquiredRecords(startingOffset, numRecords));
+ .setRecords(newRecords(startingOffset, numRecordsPerBatch, 2))
+ .setAcquiredRecords(acquiredRecords(startingOffset, numRecords));
Deserializers<String, String> deserializers = newStringDeserializers();
@@ -139,8 +139,8 @@ public class ShareCompletedFetchTest {
long startingOffset = 10L;
int numRecords = 10;
ShareFetchResponseData.PartitionData partitionData = new ShareFetchResponseData.PartitionData()
- .setRecords(newRecords(startingOffset, numRecords + 500))
- .setAcquiredRecords(acquiredRecords(startingOffset + 500, numRecords));
+ .setRecords(newRecords(startingOffset, numRecords + 500))
+ .setAcquiredRecords(acquiredRecords(startingOffset + 500, numRecords));
Deserializers<String, String> deserializers = newStringDeserializers();
@@ -167,8 +167,9 @@ public class ShareCompletedFetchTest {
int numRecords = 10;
Records rawRecords = newTransactionalRecords(numRecords);
ShareFetchResponseData.PartitionData partitionData = new ShareFetchResponseData.PartitionData()
- .setRecords(rawRecords)
- .setAcquiredRecords(acquiredRecords(0L, numRecords));
+ .setRecords(rawRecords)
+ .setAcquiredRecords(acquiredRecords(0L, numRecords));
+
ShareCompletedFetch completedFetch = newShareCompletedFetch(partitionData);
try (final Deserializers<String, String> deserializers = newStringDeserializers()) {
ShareInFlightBatch<String, String> batch = completedFetch.fetchRecords(deserializers, 10, true);
@@ -184,8 +185,8 @@ public class ShareCompletedFetchTest {
int startingOffset = 0;
int numRecords = 10;
ShareFetchResponseData.PartitionData partitionData = new
ShareFetchResponseData.PartitionData()
- .setRecords(newRecords(startingOffset, numRecords))
- .setAcquiredRecords(acquiredRecords(0L, 10));
+ .setRecords(newRecords(startingOffset, numRecords))
+ .setAcquiredRecords(acquiredRecords(0L, 10));
try (final Deserializers<String, String> deserializers =
newStringDeserializers()) {
ShareCompletedFetch completedFetch =
newShareCompletedFetch(partitionData);
@@ -200,7 +201,7 @@ public class ShareCompletedFetchTest {
@Test
public void testNoRecordsInFetch() {
ShareFetchResponseData.PartitionData partitionData = new
ShareFetchResponseData.PartitionData()
- .setPartitionIndex(0);
+ .setPartitionIndex(0);
ShareCompletedFetch completedFetch =
newShareCompletedFetch(partitionData);
try (final Deserializers<String, String> deserializers =
newStringDeserializers()) {
@@ -219,7 +220,7 @@ public class ShareCompletedFetchTest {
Compression.NONE,
TimestampType.CREATE_TIME,
0);
- final UUIDSerializer serializer = new UUIDSerializer()) {
+ final UUIDSerializer serializer = new UUIDSerializer()) {
builder.append(new SimpleRecord(serializer.serialize(TOPIC_NAME,
UUID.randomUUID())));
builder.append(0L, "key".getBytes(), "value".getBytes());
Headers headers = new RecordHeaders();
@@ -229,9 +230,9 @@ public class ShareCompletedFetchTest {
Records records = builder.build();
ShareFetchResponseData.PartitionData partitionData = new
ShareFetchResponseData.PartitionData()
- .setPartitionIndex(0)
- .setRecords(records)
- .setAcquiredRecords(acquiredRecords(0L, 4));
+ .setPartitionIndex(0)
+ .setRecords(records)
+ .setAcquiredRecords(acquiredRecords(0L, 4));
try (final Deserializers<UUID, UUID> deserializers =
newUuidDeserializers()) {
ShareCompletedFetch completedFetch =
newShareCompletedFetch(partitionData);
@@ -301,8 +302,8 @@ public class ShareCompletedFetchTest {
List<ShareFetchResponseData.AcquiredRecords> acquiredRecords = new
ArrayList<>(acquiredRecords(0L, 3));
acquiredRecords.addAll(acquiredRecords(6L, 3));
ShareFetchResponseData.PartitionData partitionData = new
ShareFetchResponseData.PartitionData()
- .setRecords(newRecords(startingOffset, numRecords))
- .setAcquiredRecords(acquiredRecords);
+ .setRecords(newRecords(startingOffset, numRecords))
+ .setAcquiredRecords(acquiredRecords);
Deserializers<String, String> deserializers = newStringDeserializers();
@@ -335,8 +336,8 @@ public class ShareCompletedFetchTest {
}
ShareFetchResponseData.PartitionData partitionData = new
ShareFetchResponseData.PartitionData()
- .setRecords(newRecords(startingOffset, numRecords))
- .setAcquiredRecords(acquiredRecords);
+ .setRecords(newRecords(startingOffset, numRecords))
+ .setAcquiredRecords(acquiredRecords);
Deserializers<String, String> deserializers = newStringDeserializers();
@@ -366,17 +367,17 @@ public class ShareCompletedFetchTest {
// Offsets 5-9 will be duplicates
List<ShareFetchResponseData.AcquiredRecords> acquiredRecords = new
ArrayList<>();
acquiredRecords.add(new ShareFetchResponseData.AcquiredRecords()
- .setFirstOffset(0L)
- .setLastOffset(9L)
- .setDeliveryCount((short) 1));
+ .setFirstOffset(0L)
+ .setLastOffset(9L)
+ .setDeliveryCount((short) 1));
acquiredRecords.add(new ShareFetchResponseData.AcquiredRecords()
- .setFirstOffset(5L)
- .setLastOffset(14L)
- .setDeliveryCount((short) 2));
+ .setFirstOffset(5L)
+ .setLastOffset(14L)
+ .setDeliveryCount((short) 2));
ShareFetchResponseData.PartitionData partitionData = new
ShareFetchResponseData.PartitionData()
- .setRecords(newRecords(startingOffset, numRecords))
- .setAcquiredRecords(acquiredRecords);
+ .setRecords(newRecords(startingOffset, numRecords))
+ .setAcquiredRecords(acquiredRecords);
ShareCompletedFetch completedFetch =
newShareCompletedFetch(partitionData);
@@ -392,25 +393,24 @@ public class ShareCompletedFetchTest {
// Verify first occurrence (offset 5 should have deliveryCount=1 from
first range)
ConsumerRecord<String, String> record5 = records.stream()
- .filter(r -> r.offset() == 5L)
- .findFirst()
- .orElse(null);
+ .filter(r -> r.offset() == 5L)
+ .findFirst()
+ .orElse(null);
assertNotNull(record5);
assertEquals(Optional.of((short) 1), record5.deliveryCount());
// Verify offset 10 has deliveryCount=2 from second range
ConsumerRecord<String, String> record10 = records.stream()
- .filter(r -> r.offset() == 10L)
- .findFirst()
- .orElse(null);
+ .filter(r -> r.offset() == 10L)
+ .findFirst()
+ .orElse(null);
assertNotNull(record10);
assertEquals(Optional.of((short) 2), record10.deliveryCount());
// Verify all offsets are unique
Set<Long> offsetSet = new HashSet<>();
for (ConsumerRecord<String, String> record : records) {
- assertTrue(offsetSet.add(record.offset()),
- "Duplicate offset found in results: " + record.offset());
+ assertTrue(offsetSet.add(record.offset()), "Duplicate offset found
in results: " + record.offset());
}
}
@@ -423,13 +423,13 @@ public class ShareCompletedFetchTest {
ShareFetchMetricsAggregator shareFetchMetricsAggregator = new
ShareFetchMetricsAggregator(shareFetchMetricsManager, partitionSet);
return new ShareCompletedFetch(
- logContext,
- BufferSupplier.create(),
- 0,
- TIP,
- partitionData,
- shareFetchMetricsAggregator,
- ApiKeys.SHARE_FETCH.latestVersion());
+ logContext,
+ BufferSupplier.create(),
+ 0,
+ TIP,
+ partitionData,
+ shareFetchMetricsAggregator,
+ ApiKeys.SHARE_FETCH.latestVersion());
}
private static Deserializers<UUID, UUID> newUuidDeserializers() {
@@ -481,9 +481,9 @@ public class ShareCompletedFetchTest {
public static List<ShareFetchResponseData.AcquiredRecords>
acquiredRecords(long firstOffset, int count) {
ShareFetchResponseData.AcquiredRecords acquiredRecords = new
ShareFetchResponseData.AcquiredRecords()
- .setFirstOffset(firstOffset)
- .setLastOffset(firstOffset + count - 1)
- .setDeliveryCount((short) 1);
+ .setFirstOffset(firstOffset)
+ .setLastOffset(firstOffset + count - 1)
+ .setDeliveryCount((short) 1);
return Collections.singletonList(acquiredRecords);
}
@@ -518,11 +518,11 @@ public class ShareCompletedFetchTest {
int offset,
Time time) {
MemoryRecords.writeEndTransactionalMarker(buffer,
- offset,
- time.milliseconds(),
- 0,
- PRODUCER_ID,
- PRODUCER_EPOCH,
- new EndTransactionMarker(ControlRecordType.COMMIT, 0));
+ offset,
+ time.milliseconds(),
+ 0,
+ PRODUCER_ID,
+ PRODUCER_EPOCH,
+ new EndTransactionMarker(ControlRecordType.COMMIT, 0));
}
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
index 11d5adb8a0f..d2abe110ceb 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.clients.consumer.ShareAcquireMode;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackEvent;
+import
org.apache.kafka.clients.consumer.internals.events.ShareRenewAcknowledgementsCompleteEvent;
import
org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.IsolationLevel;
@@ -130,7 +131,7 @@ import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
-@SuppressWarnings({"ClassDataAbstractionCoupling", "ClassFanOutComplexity"})
+@SuppressWarnings({"ClassDataAbstractionCoupling", "ClassFanOutComplexity",
"JavaNCSS"})
public class ShareConsumeRequestManagerTest {
private final String topicName = "test";
private final String topicName2 = "test-2";
@@ -175,6 +176,7 @@ public class ShareConsumeRequestManagerTest {
private List<ShareFetchResponseData.AcquiredRecords> emptyAcquiredRecords;
private ShareFetchMetricsRegistry shareFetchMetricsRegistry;
private List<Map<TopicIdPartition, Acknowledgements>>
completedAcknowledgements;
+ private Set<Long> renewedRecords;
@BeforeEach
public void setup() {
@@ -182,6 +184,7 @@ public class ShareConsumeRequestManagerTest {
acquiredRecords = ShareCompletedFetchTest.acquiredRecords(1L, 3);
emptyAcquiredRecords = new ArrayList<>();
completedAcknowledgements = new LinkedList<>();
+ renewedRecords = new HashSet<>();
}
private void assignFromSubscribed(Set<TopicPartition> partitions) {
@@ -482,16 +485,16 @@ public class ShareConsumeRequestManagerTest {
ShareConsumeRequestManager.ResultHandler resultHandler =
shareConsumeRequestManager.buildResultHandler(null, Optional.empty());
// Passing null acknowledgements should mean we do not send the
background event at all.
- resultHandler.complete(tip0, null,
ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_ASYNC);
+ resultHandler.complete(tip0, null,
ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_ASYNC, false);
assertEquals(0, completedAcknowledgements.size());
// Setting the request type to COMMIT_SYNC should still not send any
background event
// as we have initialized remainingResults to null.
- resultHandler.complete(tip0, acknowledgements,
ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_SYNC);
+ resultHandler.complete(tip0, acknowledgements,
ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_SYNC, false);
assertEquals(0, completedAcknowledgements.size());
// Sending non-null acknowledgements means we do send the background
event
- resultHandler.complete(tip0, acknowledgements,
ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_ASYNC);
+ resultHandler.complete(tip0, acknowledgements,
ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_ASYNC, false);
assertEquals(3, completedAcknowledgements.get(0).get(tip0).size());
}
@@ -511,16 +514,16 @@ public class ShareConsumeRequestManagerTest {
ShareConsumeRequestManager.ResultHandler resultHandler =
shareConsumeRequestManager.buildResultHandler(resultCount, Optional.of(future));
// We only send the background event after all results have been
completed.
- resultHandler.complete(tip0, acknowledgements,
ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_SYNC);
+ resultHandler.complete(tip0, acknowledgements,
ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_SYNC, false);
assertEquals(0, completedAcknowledgements.size());
assertFalse(future.isDone());
- resultHandler.complete(t2ip0, null,
ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_SYNC);
+ resultHandler.complete(t2ip0, null,
ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_SYNC, false);
assertEquals(0, completedAcknowledgements.size());
assertFalse(future.isDone());
// After third response is received, we send the background event.
- resultHandler.complete(tip1, acknowledgements,
ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_SYNC);
+ resultHandler.complete(tip1, acknowledgements,
ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_SYNC, false);
assertEquals(1, completedAcknowledgements.size());
assertEquals(2, completedAcknowledgements.get(0).size());
assertEquals(3, completedAcknowledgements.get(0).get(tip0).size());
@@ -985,6 +988,7 @@ public class ShareConsumeRequestManagerTest {
// Send acknowledgements via ShareFetch
shareConsumeRequestManager.fetch(Map.of(tip0, new
NodeAcknowledgements(0, acknowledgements)));
fetchRecords();
+
// Subscription changes.
subscriptions.subscribeToShareGroup(Collections.singleton(topicName2));
subscriptions.assignFromSubscribed(Collections.singleton(t2p0));
@@ -1024,6 +1028,7 @@ public class ShareConsumeRequestManagerTest {
// Send acknowledgements via ShareFetch
shareConsumeRequestManager.fetch(Map.of(tip0, new
NodeAcknowledgements(0, acknowledgements)));
fetchRecords();
+
// Subscription changes.
subscriptions.assignFromSubscribed(Collections.singletonList(tp1));
@@ -1092,7 +1097,6 @@ public class ShareConsumeRequestManagerTest {
// Change the subscription.
subscriptions.assignFromSubscribed(Collections.singletonList(tp1));
-
// Now we will be sending the request to node1 only as leader for tip1
is node1.
// We do not build the request for tip0 as there are no
acknowledgements to send.
NetworkClientDelegate.PollResult pollResult =
shareConsumeRequestManager.sendFetchesReturnPollResult();
@@ -2539,6 +2543,63 @@ public class ShareConsumeRequestManagerTest {
}
}
+ @Test
+ public void testShareFetchWithRenewAcknowledgement() {
+ buildRequestManager();
+
+ assignFromSubscribed(Collections.singleton(tp0));
+ sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);
+
+ Acknowledgements acknowledgements = getAcknowledgements(1,
+ AcknowledgeType.RENEW, AcknowledgeType.RENEW,
AcknowledgeType.RENEW);
+
+ // Reading records from the share fetch buffer.
+ fetchRecords();
+
+ // Piggyback acknowledgements
+ shareConsumeRequestManager.fetch(Map.of(tip0, new
NodeAcknowledgements(0, acknowledgements)));
+
+ NetworkClientDelegate.PollResult pollResult =
shareConsumeRequestManager.sendFetchesReturnPollResult();
+ assertEquals(1, pollResult.unsentRequests.size());
+ ShareFetchRequest.Builder builder = (ShareFetchRequest.Builder)
pollResult.unsentRequests.get(0).requestBuilder();
+ assertTrue(builder.data().isRenewAck());
+
+ assertFalse(shareConsumeRequestManager.hasCompletedFetches());
+
+ assertEquals(3.0,
+
metrics.metrics().get(metrics.metricInstance(shareFetchMetricsRegistry.acknowledgementSendTotal)).metricValue());
+
+ assertEquals(0, renewedRecords.size());
+
+ client.prepareResponse(fullFetchResponse(tip0, MemoryRecords.EMPTY,
List.of(), Errors.NONE));
+ networkClientDelegate.poll(time.timer(0));
+ assertTrue(shareConsumeRequestManager.hasCompletedFetches());
+
+ assertEquals(3, renewedRecords.size());
+
+ Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>
partitionRecords = fetchRecords();
+ assertTrue(partitionRecords.isEmpty());
+
+ Acknowledgements acknowledgements2 = getAcknowledgements(1,
+ AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT,
AcknowledgeType.ACCEPT);
+ shareConsumeRequestManager.fetch(Map.of(tip0, new
NodeAcknowledgements(0, acknowledgements2)));
+
+ assertEquals(1, sendFetches());
+ assertFalse(shareConsumeRequestManager.hasCompletedFetches());
+
+ client.prepareResponse(fullFetchResponse(tip0, records,
acquiredRecords, Errors.NONE));
+ networkClientDelegate.poll(time.timer(0));
+ assertTrue(shareConsumeRequestManager.hasCompletedFetches());
+
+ partitionRecords = fetchRecords();
+ assertTrue(partitionRecords.containsKey(tp0));
+
+ assertEquals(1, sendFetches());
+ assertFalse(shareConsumeRequestManager.hasCompletedFetches());
+ assertEquals(6.0,
+
metrics.metrics().get(metrics.metricInstance(shareFetchMetricsRegistry.acknowledgementSendTotal)).metricValue());
+ }
+
private ShareFetchResponse fetchResponseWithTopLevelError(TopicIdPartition
tp, Errors error) {
Map<TopicIdPartition, ShareFetchResponseData.PartitionData> partitions
= Map.of(tp,
new ShareFetchResponseData.PartitionData()
@@ -2702,7 +2763,7 @@ public class ShareConsumeRequestManagerTest {
subscriptions,
shareFetchConfig,
deserializers);
- BackgroundEventHandler backgroundEventHandler = new
TestableBackgroundEventHandler(time, completedAcknowledgements);
+ BackgroundEventHandler backgroundEventHandler = new
TestableBackgroundEventHandler(time, completedAcknowledgements, renewedRecords);
shareConsumeRequestManager = spy(new
TestableShareConsumeRequestManager<>(
logContext,
groupId,
@@ -2896,16 +2957,22 @@ public class ShareConsumeRequestManagerTest {
private static class TestableBackgroundEventHandler extends
BackgroundEventHandler {
List<Map<TopicIdPartition, Acknowledgements>>
completedAcknowledgements;
+ Set<Long> renewedRecords;
- public TestableBackgroundEventHandler(Time time,
List<Map<TopicIdPartition, Acknowledgements>> completedAcknowledgements) {
+ public TestableBackgroundEventHandler(Time time,
List<Map<TopicIdPartition, Acknowledgements>> completedAcknowledgements,
Set<Long> renewedRecords) {
super(new LinkedBlockingQueue<>(), time,
mock(AsyncConsumerMetrics.class));
this.completedAcknowledgements = completedAcknowledgements;
+ this.renewedRecords = renewedRecords;
}
public void add(BackgroundEvent event) {
if (event.type() ==
BackgroundEvent.Type.SHARE_ACKNOWLEDGEMENT_COMMIT_CALLBACK) {
ShareAcknowledgementCommitCallbackEvent
shareAcknowledgementCommitCallbackEvent =
(ShareAcknowledgementCommitCallbackEvent) event;
completedAcknowledgements.add(shareAcknowledgementCommitCallbackEvent.acknowledgementsMap());
+ } else if (event.type() ==
BackgroundEvent.Type.SHARE_RENEW_ACKNOWLEDGEMENTS_COMPLETE) {
+ ShareRenewAcknowledgementsCompleteEvent
shareRenewAcknowledgementsCompleteEvent =
(ShareRenewAcknowledgementsCompleteEvent) event;
+
shareRenewAcknowledgementsCompleteEvent.acknowledgementsMap().values().forEach(acks
->
+ acks.getAcknowledgementsTypeMap().forEach((offset,
ackType) -> renewedRecords.add(offset)));
}
}
}
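
testShareFetchWithRenewAcknowledgement above is the request-manager half of the
renew flow. Reduced to the calls it exercises, as a sketch (the scaffolding
around them is the test's own):

    // RENEW acknowledgements piggyback on the next ShareFetch request,
    // which is flagged as carrying renewals.
    shareConsumeRequestManager.fetch(
        Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)));
    NetworkClientDelegate.PollResult pollResult =
        shareConsumeRequestManager.sendFetchesReturnPollResult();
    ShareFetchRequest.Builder builder =
        (ShareFetchRequest.Builder) pollResult.unsentRequests.get(0).requestBuilder();
    assertTrue(builder.data().isRenewAck());

    // Once the response arrives, the renewed offsets reach the application
    // thread via a SHARE_RENEW_ACKNOWLEDGEMENTS_COMPLETE background event.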
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
index 9a6da634c50..6e819a6888a 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients.consumer.internals;
+import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.clients.consumer.AcknowledgementCommitCallback;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -30,6 +31,7 @@ import
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCo
import
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackRegistrationEvent;
import org.apache.kafka.clients.consumer.internals.events.ShareFetchEvent;
import org.apache.kafka.clients.consumer.internals.events.SharePollEvent;
+import
org.apache.kafka.clients.consumer.internals.events.ShareRenewAcknowledgementsCompleteEvent;
import
org.apache.kafka.clients.consumer.internals.events.ShareSubscriptionChangeEvent;
import
org.apache.kafka.clients.consumer.internals.events.ShareUnsubscribeEvent;
import
org.apache.kafka.clients.consumer.internals.events.StopFindCoordinatorOnCloseEvent;
@@ -467,6 +469,91 @@ public class ShareConsumerImplTest {
assertEquals(2, newRecords.count(), "Should have received 2 new
records");
}
+ @Test
+ public void testExplicitModeRenewAndAcknowledgeOnPoll() {
+ // Setup consumer with explicit acknowledgement mode
+ SubscriptionState subscriptions = new SubscriptionState(new
LogContext(), AutoOffsetResetStrategy.NONE);
+ consumer = newConsumer(
+ mock(ShareFetchBuffer.class),
+ subscriptions,
+ "group-id",
+ "client-id",
+ "explicit");
+
+ // Setup test data
+ String topic = "test-topic";
+ int partition = 0;
+ TopicIdPartition tip = new TopicIdPartition(Uuid.randomUuid(),
partition, topic);
+ ShareInFlightBatch<String, String> batch = new ShareInFlightBatch<>(0,
tip);
+ batch.addRecord(new ConsumerRecord<>(topic, partition, 0, "key1",
"value1"));
+ batch.addRecord(new ConsumerRecord<>(topic, partition, 1, "key2",
"value2"));
+
+ // Setup first fetch to return records
+ ShareFetch<String, String> firstFetch = ShareFetch.empty();
+ firstFetch.add(tip, batch);
+ doReturn(firstFetch)
+ .when(fetchCollector)
+ .collect(any(ShareFetchBuffer.class));
+
+ // Setup subscription
+ List<String> topics = Collections.singletonList(topic);
+
completeShareSubscriptionChangeApplicationEventSuccessfully(subscriptions,
topics);
+ consumer.subscribe(topics);
+
+ // First poll should succeed and return records
+ ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));
+ assertEquals(2, records.count(), "Should have received 2 records");
+
+ // Renew the first record and accept the second
+ Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
+ consumer.acknowledge(iterator.next(), AcknowledgeType.RENEW);
+ consumer.acknowledge(iterator.next(), AcknowledgeType.ACCEPT);
+
+ // Second poll should return no records while the renewal is in flight
+ records = consumer.poll(Duration.ofMillis(100));
+ assertEquals(0, records.count(), "Should have received 0 records");
+ assertTrue(firstFetch.hasRenewals());
+
+ Acknowledgements acks = Acknowledgements.empty();
+ acks.add(0, AcknowledgeType.RENEW);
+ acks.complete(null);
+ ShareRenewAcknowledgementsCompleteEvent e = new
ShareRenewAcknowledgementsCompleteEvent(Map.of(tip, acks));
+ backgroundEventQueue.add(e);
+
+ records = consumer.poll(Duration.ofMillis(100));
+ assertEquals(1, records.count(), "Should have received 1 record");
+ assertFalse(firstFetch.hasRenewals());
+ iterator = records.iterator();
+ ConsumerRecord<String, String> renewedRecord = iterator.next();
+ assertEquals(0, renewedRecord.offset());
+ consumer.acknowledge(renewedRecord);
+
+ // Setup next fetch to return no records
+ doReturn(ShareFetch.empty())
+ .when(fetchCollector)
+ .collect(any(ShareFetchBuffer.class));
+
+ // Fourth poll should return no records
+ records = consumer.poll(Duration.ofMillis(100));
+ assertTrue(records.isEmpty());
+
+ // Setup next fetch to return new records
+ ShareFetch<String, String> nextFetch = ShareFetch.empty();
+ ShareInFlightBatch<String, String> newBatch = new
ShareInFlightBatch<>(2, tip);
+ newBatch.addRecord(new ConsumerRecord<>(topic, partition, 2, "key3",
"value3"));
+ newBatch.addRecord(new ConsumerRecord<>(topic, partition, 3, "key4",
"value4"));
+ nextFetch.add(tip, newBatch);
+
+ // Reset mock to return new records
+ doReturn(nextFetch)
+ .when(fetchCollector)
+ .collect(any(ShareFetchBuffer.class));
+
+ // Verify that poll succeeds and returns new records
+ ConsumerRecords<String, String> newRecords =
consumer.poll(Duration.ofMillis(100));
+ assertEquals(2, newRecords.count(), "Should have received 2 new
records");
+ }
+
@Test
public void testCloseWithTopicAuthorizationException() {
SubscriptionState subscriptions = new SubscriptionState(new
LogContext(), AutoOffsetResetStrategy.NONE);
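diff note: the poll/acknowledge sequence in testExplicitModeRenewAndAcknowledgeOnPoll
is the application-facing contract for renewal. Condensed, and assuming the explicit
acknowledgement mode configured in the test:

    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        // RENEW extends the acquisition lock rather than releasing the record.
        consumer.acknowledge(record, AcknowledgeType.RENEW);
    }

    // Renewed records are withheld from poll() until the renewal round-trip
    // completes, after which they are redelivered with their original offsets.
    records = consumer.poll(Duration.ofMillis(100));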
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java
index 0667ebde700..d3291983d6e 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java
@@ -16,7 +16,9 @@
*/
package org.apache.kafka.clients.consumer.internals;
+import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
@@ -49,6 +51,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
@@ -130,6 +133,71 @@ public class ShareFetchCollectorTest {
assertTrue(completedFetch.isConsumed());
}
+ @Test
+ public void testWithRenew() {
+ int recordCount = DEFAULT_MAX_POLL_RECORDS;
+ buildDependencies();
+ subscribeAndAssign(topicAPartition0);
+
+ ShareCompletedFetch completedFetch = completedFetchBuilder
+ .recordCount(recordCount)
+ .build();
+
+ // Validate that the buffer is empty until after we add the fetch data.
+ assertTrue(fetchBuffer.isEmpty());
+ fetchBuffer.add(List.of(completedFetch));
+ assertFalse(fetchBuffer.isEmpty());
+
+ // Validate that the completed fetch isn't initialized just because we
add it to the buffer.
+ assertFalse(completedFetch.isInitialized());
+
+ // Fetch the data and validate that we get all the records we want
back.
+ ShareFetch<String, String> fetch = fetchCollector.collect(fetchBuffer);
+ assertFalse(fetch.isEmpty());
+ assertEquals(recordCount, fetch.numRecords());
+
+ // Collecting the data from the buffer causes the completed fetch to be
+ // initialized.
+ assertTrue(completedFetch.isInitialized());
+
+ // However, even though we've collected the data, it isn't
(completely) consumed yet.
+ assertFalse(completedFetch.isConsumed());
+
+ // The buffer is now considered "empty" because our queue is empty.
+ assertTrue(fetchBuffer.isEmpty());
+ assertNull(fetchBuffer.peek());
+ assertNull(fetchBuffer.poll());
+
+ // However, while the queue is "empty", the next-in-line fetch is
actually still in the buffer.
+ assertNotNull(fetchBuffer.nextInLineFetch());
+
+ assertEquals(DEFAULT_MAX_POLL_RECORDS, fetch.numRecords());
+ ConsumerRecord<String, String> record = new
ConsumerRecord<>(topicAPartition0.topic(), topicAPartition0.partition(), 0, "",
"");
+ fetch.acknowledge(record, AcknowledgeType.RENEW);
+ assertEquals(DEFAULT_MAX_POLL_RECORDS, fetch.numRecords());
+ assertFalse(fetch.hasRenewals());
+
+ Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap =
fetch.takeAcknowledgedRecords();
+ assertTrue(fetch.hasRenewals());
+ assertEquals(DEFAULT_MAX_POLL_RECORDS - 1, fetch.numRecords());
+
+ Acknowledgements acks =
acknowledgementsMap.get(topicAPartition0).acknowledgements();
+ acks.complete(null);
+ fetch.renew(Map.of(topicAPartition0, acks));
+ assertTrue(fetch.hasRenewals());
+ fetch.takeRenewedRecords();
+ assertFalse(fetch.hasRenewals());
+ assertEquals(DEFAULT_MAX_POLL_RECORDS, fetch.numRecords());
+
+ // Now attempt to collect more records from the fetch buffer.
+ fetch = fetchCollector.collect(fetchBuffer);
+ assertEquals(0, fetch.numRecords());
+ assertTrue(fetch.isEmpty());
+
+ // However, once we read *past* the end of the records in the
ShareCompletedFetch, then we will call
+ // drain on it, and it will be considered all consumed.
+ assertTrue(completedFetch.isConsumed());
+ }
+
@ParameterizedTest
@MethodSource("testErrorInInitializeSource")
public void testErrorInInitialize(RuntimeException expectedException) {
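
testWithRenew above fixes the internal lifecycle on ShareFetch. A minimal
restatement, with method names as in the test:

    fetch.acknowledge(record, AcknowledgeType.RENEW);   // mark the record
    Map<TopicIdPartition, NodeAcknowledgements> ackMap =
        fetch.takeAcknowledgedRecords();                // record leaves the fetch,
                                                        // hasRenewals() flips to true
    Acknowledgements acks = ackMap.get(topicAPartition0).acknowledgements();
    acks.complete(null);                                // broker round-trip succeeded
    fetch.renew(Map.of(topicAPartition0, acks));        // hand the result back
    fetch.takeRenewedRecords();                         // record is pollable again,
                                                        // hasRenewals() flips to false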
diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java
b/core/src/main/java/kafka/server/share/SharePartitionManager.java
index 095fe2956b6..3150e8e28a6 100644
--- a/core/src/main/java/kafka/server/share/SharePartitionManager.java
+++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java
@@ -623,8 +623,8 @@ public class SharePartitionManager implements AutoCloseable
{
// Visible for testing.
void processShareFetch(ShareFetch shareFetch) {
- if (shareFetch.topicIdPartitions().isEmpty()) {
- // If there are no partitions to fetch then complete the future
with an empty map.
+ if (shareFetch.topicIdPartitions().isEmpty() ||
shareFetch.maxFetchRecords() == 0 || shareFetch.fetchParams().maxBytes == 0) {
+ // If there are no partitions or no data requested to fetch then
complete the future with an empty map.
shareFetch.maybeComplete(Map.of());
return;
}
diff --git
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index e742dbe6bd3..2b96099afcb 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -725,7 +725,7 @@ class AuthorizerIntegrationTest extends
AbstractAuthorizerIntegrationTest {
val send: Seq[TopicIdPartition] = Seq(
new TopicIdPartition(getTopicIds().getOrElse(tp.topic, Uuid.ZERO_UUID),
new TopicPartition(topic, part)))
val ackMap = new util.HashMap[TopicIdPartition,
util.List[ShareFetchRequestData.AcknowledgementBatch]]
- requests.ShareFetchRequest.Builder.forConsumer(shareGroup, metadata, 100,
0, Int.MaxValue, 500, 500, ShareAcquireMode.BATCH_OPTIMIZED.id(),
+ requests.ShareFetchRequest.Builder.forConsumer(shareGroup, metadata, 100,
0, Int.MaxValue, 500, 500, ShareAcquireMode.BATCH_OPTIMIZED.id(), false,
send.asJava, Seq.empty.asJava, ackMap).build()
}
diff --git
a/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
b/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
index b76a19faa7e..9538e2a425f 100644
---
a/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
+++
b/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
@@ -2473,16 +2473,17 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
maxBytes: Int = Int.MaxValue,
maxRecords: Int = 500,
batchSize: Int = 500,
- shareAcquireMode: ShareAcquireMode =
ShareAcquireMode.BATCH_OPTIMIZED): ShareFetchRequest = {
- ShareFetchRequest.Builder.forConsumer(groupId, metadata, maxWaitMs,
minBytes, maxBytes, maxRecords, batchSize, shareAcquireMode.id, send, forget,
acknowledgementsMap)
+ shareAcquireMode: ShareAcquireMode =
ShareAcquireMode.BATCH_OPTIMIZED,
+ isRenewAck: Boolean = false):
ShareFetchRequest = {
+ ShareFetchRequest.Builder.forConsumer(groupId, metadata, maxWaitMs,
minBytes, maxBytes, maxRecords, batchSize, shareAcquireMode.id, isRenewAck,
send, forget, acknowledgementsMap)
.build()
}
private def createShareAcknowledgeRequest(groupId: String,
metadata: ShareRequestMetadata,
- acknowledgementsMap:
util.Map[TopicIdPartition,
util.List[ShareAcknowledgeRequestData.AcknowledgementBatch]]
- ): ShareAcknowledgeRequest = {
- ShareAcknowledgeRequest.Builder.forConsumer(groupId, metadata,
acknowledgementsMap)
+ acknowledgementsMap:
util.Map[TopicIdPartition,
util.List[ShareAcknowledgeRequestData.AcknowledgementBatch]],
+ isRenewAck: Boolean = false):
ShareAcknowledgeRequest = {
+ ShareAcknowledgeRequest.Builder.forConsumer(groupId, metadata, isRenewAck,
acknowledgementsMap)
.build()
}
}
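
Both request builders now thread the renew flag through. The Java call shape
implied by the Scala helpers above, with the helpers' default argument values
(a sketch only):

    // ShareFetch: isRenewAck follows the acquire mode.
    ShareFetchRequest.Builder.forConsumer(
        groupId, metadata, maxWaitMs, minBytes, maxBytes, maxRecords, batchSize,
        ShareAcquireMode.BATCH_OPTIMIZED.id(), /* isRenewAck */ false,
        send, forget, acknowledgementsMap).build();

    // ShareAcknowledge: isRenewAck now precedes the acknowledgements map.
    ShareAcknowledgeRequest.Builder.forConsumer(
        groupId, metadata, /* isRenewAck */ false, acknowledgementsMap).build();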