This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0cbc5e083a9 KAFKA-17217: Batched acknowledge requests per node in 
ShareConsumeRequestManager (#16727)
0cbc5e083a9 is described below

commit 0cbc5e083a936025a85f127102dc1032f6cf4fd9
Author: ShivsundarR <[email protected]>
AuthorDate: Thu Aug 8 20:43:37 2024 +0530

    KAFKA-17217: Batched acknowledge requests per node in 
ShareConsumeRequestManager (#16727)
    
    In ShareConsumeRequestManager, every commitSync/commitAsync/acknowledgeOnClose call currently creates its own AcknowledgeRequestState. This can be optimised by batching up the acknowledgements destined for the same node until the next poll() is invoked.
    
    This ensures that, between two polls, the acknowledgements are accumulated into one request per node and then sent during poll(), resulting in fewer RPCs.
    
    To achieve this, we store a pair of acknowledge request states for every node. The first value holds the request state for commitAsync(), and the second holds the request state for commitSync() and acknowledgeOnClose(). All the acknowledgements to be sent to a particular node are stored in the corresponding AcknowledgeRequestState, depending on whether the commit that produced them was synchronous or asynchronous.
    
    Reviewers:  Andrew Schofield <[email protected]>,  Manikumar Reddy 
<[email protected]>
---
 .../internals/ShareConsumeRequestManager.java      | 428 +++++++++++++++------
 .../internals/ShareConsumeRequestManagerTest.java  | 149 +++++++
 2 files changed, 465 insertions(+), 112 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
index 7427edbb880..d46fff79d30 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
@@ -50,13 +50,12 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
@@ -84,7 +83,7 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
     private Uuid memberId;
     private boolean fetchMoreRecords = false;
     private final Map<TopicIdPartition, Acknowledgements> 
fetchAcknowledgementsMap;
-    private final Queue<AcknowledgeRequestState> acknowledgeRequestStates;
+    private final Map<Integer, Pair<AcknowledgeRequestState>> 
acknowledgeRequestStates;
     private final long retryBackoffMs;
     private final long retryBackoffMaxMs;
     private boolean closing = false;
@@ -115,7 +114,7 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
         this.retryBackoffMaxMs = retryBackoffMaxMs;
         this.sessionHandlers = new HashMap<>();
         this.nodesWithPendingRequests = new HashSet<>();
-        this.acknowledgeRequestStates = new LinkedList<>();
+        this.acknowledgeRequestStates = new HashMap<>();
         this.fetchAcknowledgementsMap = new HashMap<>();
         this.closeFuture = new CompletableFuture<>();
     }
@@ -215,37 +214,29 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
      */
     private PollResult processAcknowledgements(long currentTimeMs) {
         List<UnsentRequest> unsentRequests = new ArrayList<>();
-        Iterator<AcknowledgeRequestState> iterator = 
acknowledgeRequestStates.iterator();
-        while (iterator.hasNext()) {
-            AcknowledgeRequestState acknowledgeRequestState = iterator.next();
-            if (acknowledgeRequestState.isProcessed()) {
-                iterator.remove();
-            } else if (!acknowledgeRequestState.maybeExpire()) {
-                if 
(nodesWithPendingRequests.contains(acknowledgeRequestState.nodeId)) {
-                    log.trace("Skipping acknowledge request because previous 
request to {} has not been processed", acknowledgeRequestState.nodeId);
-                } else {
-                    if (acknowledgeRequestState.canSendRequest(currentTimeMs)) 
{
-                        acknowledgeRequestState.onSendAttempt(currentTimeMs);
-                        UnsentRequest request = 
acknowledgeRequestState.buildRequest(currentTimeMs);
-                        if (request != null) {
-                            unsentRequests.add(request);
-                        }
-                    }
-                }
+        AtomicBoolean isAsyncDone = new AtomicBoolean();
+        for (Map.Entry<Integer, Pair<AcknowledgeRequestState>> requestStates : 
acknowledgeRequestStates.entrySet()) {
+            int nodeId = requestStates.getKey();
+
+            if (!isNodeFree(nodeId)) {
+                log.trace("Skipping acknowledge request because previous 
request to {} has not been processed, so acks are not sent", nodeId);
             } else {
-                // Fill in TimeoutException
-                for (TopicIdPartition tip : 
acknowledgeRequestState.acknowledgementsMap.keySet()) {
-                    
metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getAcknowledgementsCount(tip));
-                    acknowledgeRequestState.handleAcknowledgeErrorCode(tip, 
Errors.REQUEST_TIMED_OUT);
+                isAsyncDone.set(false);
+                // For commitAsync
+                maybeBuildRequest(requestStates.getValue().getAsyncRequest(), 
currentTimeMs, true, isAsyncDone).ifPresent(unsentRequests::add);
+                // Check to ensure we start processing commitSync/close only 
if there are no commitAsync requests left to process.
+                if (!isNodeFree(nodeId)) {
+                    log.trace("Skipping acknowledge request because previous 
request to {} has not been processed, so acks are not sent", nodeId);
+                } else if (isAsyncDone.get()) {
+                    
maybeBuildRequest(requestStates.getValue().getSyncRequest(), currentTimeMs, 
false, isAsyncDone).ifPresent(unsentRequests::add);
                 }
-                iterator.remove();
             }
         }
 
         PollResult pollResult = null;
         if (!unsentRequests.isEmpty()) {
             pollResult = new PollResult(unsentRequests);
-        } else if (!acknowledgeRequestStates.isEmpty()) {
+        } else if (checkAndRemoveCompletedAcknowledgements()) {
             // Return empty result until all the acknowledgement request 
states are processed
             pollResult = PollResult.EMPTY;
         } else if (closing) {
@@ -259,6 +250,69 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
         return pollResult;
     }
 
+    private boolean isNodeFree(int nodeId) {
+        return !nodesWithPendingRequests.contains(nodeId);
+    }
+
+    private Optional<UnsentRequest> maybeBuildRequest(AcknowledgeRequestState 
acknowledgeRequestState,
+                                                      long currentTimeMs,
+                                                      boolean onCommitAsync,
+                                                      AtomicBoolean 
isAsyncDone) {
+        if (acknowledgeRequestState == null || 
(!acknowledgeRequestState.onClose && acknowledgeRequestState.isEmpty())) {
+            if (onCommitAsync) {
+                isAsyncDone.set(true);
+            }
+            return Optional.empty();
+        } else if (!acknowledgeRequestState.maybeExpire()) {
+            if (acknowledgeRequestState.canSendRequest(currentTimeMs)) {
+                acknowledgeRequestState.onSendAttempt(currentTimeMs);
+                if (onCommitAsync) {
+                    isAsyncDone.set(true);
+                }
+                return 
Optional.of(acknowledgeRequestState.buildRequest(currentTimeMs));
+            } else {
+                // We wait for the backoff before we can send this request.
+                if (onCommitAsync) {
+                    isAsyncDone.set(false);
+                }
+            }
+        } else {
+            // Fill in TimeoutException
+            for (TopicIdPartition tip : 
acknowledgeRequestState.incompleteAcknowledgements.keySet()) {
+                
metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getIncompleteAcknowledgementsCount(tip));
+                acknowledgeRequestState.handleAcknowledgeTimedOut(tip);
+            }
+            acknowledgeRequestState.incompleteAcknowledgements.clear();
+            if (onCommitAsync) {
+                isAsyncDone.set(true);
+            }
+        }
+        return Optional.empty();
+    }
+
+    /**
+     * Prunes the empty acknowledgementRequestStates.
+     * Returns true if there are still some acknowledgements left to be 
processed.
+     */
+    private boolean checkAndRemoveCompletedAcknowledgements() {
+        boolean areAnyAcksLeft = false;
+        Iterator<Map.Entry<Integer, Pair<AcknowledgeRequestState>>> iterator = 
acknowledgeRequestStates.entrySet().iterator();
+        while (iterator.hasNext()) {
+            Map.Entry<Integer, Pair<AcknowledgeRequestState>> 
acknowledgeRequestStatePair = iterator.next();
+            if 
(isRequestStateInProgress(acknowledgeRequestStatePair.getValue().getAsyncRequest())
 || 
isRequestStateInProgress(acknowledgeRequestStatePair.getValue().getSyncRequest()))
 {
+                areAnyAcksLeft = true;
+            } else if (!closing) {
+                iterator.remove();
+            }
+        }
+        if (!acknowledgeRequestStates.isEmpty()) areAnyAcksLeft = true;
+        return areAnyAcksLeft;
+    }
+
+    private boolean isRequestStateInProgress(AcknowledgeRequestState 
acknowledgeRequestState) {
+        return acknowledgeRequestState != null && 
!(acknowledgeRequestState.isEmpty());
+    }
+
     /**
      * Enqueue an AcknowledgeRequestState to be picked up on the next poll
      *
@@ -280,29 +334,42 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
         sessionHandlers.forEach((nodeId, sessionHandler) -> {
             Node node = cluster.nodeById(nodeId);
             if (node != null) {
-                Map<TopicIdPartition, Acknowledgements> 
acknowledgementsMapForNode = new HashMap<>();
-                for (TopicIdPartition tip : 
sessionHandler.sessionPartitions()) {
-                    Acknowledgements acknowledgements = 
acknowledgementsMap.get(tip);
-                    if (acknowledgements != null) {
-                        acknowledgementsMapForNode.put(tip, acknowledgements);
-
-                        
metricsManager.recordAcknowledgementSent(acknowledgements.size());
-                        log.debug("Added sync acknowledge request for 
partition {} to node {}", tip.topicPartition(), node.id());
-                        resultCount.incrementAndGet();
+                acknowledgeRequestStates.putIfAbsent(nodeId, new Pair<>(null, 
null));
+
+                // Ensure there is no commitSync()/close() request already 
present as they are blocking calls
+                // and only one request can be active at a time.
+                if (acknowledgeRequestStates.get(nodeId).getSyncRequest() != 
null && !acknowledgeRequestStates.get(nodeId).getSyncRequest().isEmpty()) {
+                    log.error("Attempt to call commitSync() when there is an 
existing sync request for node {}", node.id());
+                    future.completeExceptionally(
+                            new IllegalStateException("Attempt to call 
commitSync() when there is an existing sync request for node : " + node.id()));
+                } else {
+                    Map<TopicIdPartition, Acknowledgements> 
acknowledgementsMapForNode = new HashMap<>();
+                    for (TopicIdPartition tip : 
sessionHandler.sessionPartitions()) {
+                        Acknowledgements acknowledgements = 
acknowledgementsMap.get(tip);
+                        if (acknowledgements != null) {
+                            acknowledgementsMapForNode.put(tip, 
acknowledgements);
+
+                            
metricsManager.recordAcknowledgementSent(acknowledgements.size());
+                            log.debug("Added sync acknowledge request for 
partition {} to node {}", tip.topicPartition(), node.id());
+                            resultCount.incrementAndGet();
+                        }
                     }
+
+
+                    // There can only be one commitSync()/close() happening at 
a time. So per node, there will be one acknowledge request state representing 
commitSync() and close().
+                    acknowledgeRequestStates.get(nodeId).setSyncRequest(new 
AcknowledgeRequestState(logContext,
+                            ShareConsumeRequestManager.class.getSimpleName() + 
":1",
+                            deadlineMs,
+                            retryBackoffMs,
+                            retryBackoffMaxMs,
+                            sessionHandler,
+                            nodeId,
+                            acknowledgementsMapForNode,
+                            this::handleShareAcknowledgeSuccess,
+                            this::handleShareAcknowledgeFailure,
+                            resultHandler
+                    ));
                 }
-                acknowledgeRequestStates.add(new 
AcknowledgeRequestState(logContext,
-                        ShareConsumeRequestManager.class.getSimpleName() + 
":1",
-                        deadlineMs,
-                        retryBackoffMs,
-                        retryBackoffMaxMs,
-                        sessionHandler,
-                        nodeId,
-                        acknowledgementsMapForNode,
-                        this::handleShareAcknowledgeSuccess,
-                        this::handleShareAcknowledgeFailure,
-                        resultHandler
-                ));
             }
         });
 
@@ -324,6 +391,9 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
             Node node = cluster.nodeById(nodeId);
             if (node != null) {
                 Map<TopicIdPartition, Acknowledgements> 
acknowledgementsMapForNode = new HashMap<>();
+
+                acknowledgeRequestStates.putIfAbsent(nodeId, new Pair<>(null, 
null));
+
                 for (TopicIdPartition tip : 
sessionHandler.sessionPartitions()) {
                     Acknowledgements acknowledgements = 
acknowledgementsMap.get(tip);
                     if (acknowledgements != null) {
@@ -332,22 +402,32 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
                         
metricsManager.recordAcknowledgementSent(acknowledgements.size());
                         log.debug("Added async acknowledge request for 
partition {} to node {}", tip.topicPartition(), node.id());
                         resultCount.incrementAndGet();
+                        AcknowledgeRequestState asyncRequestState = 
acknowledgeRequestStates.get(nodeId).getAsyncRequest();
+                        if (asyncRequestState == null) {
+                            
acknowledgeRequestStates.get(nodeId).setAsyncRequest(new 
AcknowledgeRequestState(logContext,
+                                    
ShareConsumeRequestManager.class.getSimpleName() + ":2",
+                                    Long.MAX_VALUE,
+                                    retryBackoffMs,
+                                    retryBackoffMaxMs,
+                                    sessionHandler,
+                                    nodeId,
+                                    acknowledgementsMapForNode,
+                                    this::handleShareAcknowledgeSuccess,
+                                    this::handleShareAcknowledgeFailure,
+                                    resultHandler
+                            ));
+                        } else {
+                            Acknowledgements prevAcks = 
asyncRequestState.acknowledgementsToSend.putIfAbsent(tip, acknowledgements);
+                            if (prevAcks != null) {
+                                
asyncRequestState.acknowledgementsToSend.get(tip).merge(acknowledgements);
+                            }
+                        }
                     }
                 }
-                acknowledgeRequestStates.add(new 
AcknowledgeRequestState(logContext,
-                        ShareConsumeRequestManager.class.getSimpleName() + 
":2",
-                        Long.MAX_VALUE,
-                        retryBackoffMs,
-                        retryBackoffMaxMs,
-                        sessionHandler,
-                        nodeId,
-                        acknowledgementsMapForNode,
-                        this::handleShareAcknowledgeSuccess,
-                        this::handleShareAcknowledgeFailure,
-                        resultHandler
-                ));
             }
         });
+
+        resultHandler.completeIfEmpty();
     }
 
     /**
@@ -382,22 +462,36 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
                         resultCount.incrementAndGet();
                     }
                 }
-                acknowledgeRequestStates.add(new 
AcknowledgeRequestState(logContext,
-                        ShareConsumeRequestManager.class.getSimpleName() + 
":3",
-                        deadlineMs,
-                        retryBackoffMs,
-                        retryBackoffMaxMs,
-                        sessionHandler,
-                        nodeId,
-                        acknowledgementsMapForNode,
-                        this::handleShareAcknowledgeCloseSuccess,
-                        this::handleShareAcknowledgeCloseFailure,
-                        resultHandler,
-                        true
-                ));
+
+                acknowledgeRequestStates.putIfAbsent(nodeId, new Pair<>(null, 
null));
+
+                // Ensure there is no commitSync()/close() request already 
present as they are blocking calls
+                // and only one request can be active at a time.
+                if (acknowledgeRequestStates.get(nodeId).getSyncRequest() != 
null && !acknowledgeRequestStates.get(nodeId).getSyncRequest().isEmpty()) {
+                    log.error("Attempt to call close() when there is an 
existing sync request for node {}-{}", node.id(), 
acknowledgeRequestStates.get(nodeId).getSyncRequest());
+                    closeFuture.completeExceptionally(
+                            new IllegalStateException("Attempt to call close() 
when there is an existing sync request for node : " + node.id()));
+                } else {
+                    // There can only be one commitSync()/close() happening at 
a time. So per node, there will be one acknowledge request state.
+                    acknowledgeRequestStates.get(nodeId).setSyncRequest(new 
AcknowledgeRequestState(logContext,
+                            ShareConsumeRequestManager.class.getSimpleName() + 
":3",
+                            deadlineMs,
+                            retryBackoffMs,
+                            retryBackoffMaxMs,
+                            sessionHandler,
+                            nodeId,
+                            acknowledgementsMapForNode,
+                            this::handleShareAcknowledgeCloseSuccess,
+                            this::handleShareAcknowledgeCloseFailure,
+                            resultHandler,
+                            true
+                    ));
+
+                }
             }
         });
 
+        resultHandler.completeIfEmpty();
         return closeFuture;
     }
 
@@ -518,33 +612,48 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
 
             if (!handler.handleResponse(response, requestVersion)) {
                 acknowledgeRequestState.onFailedAttempt(currentTimeMs);
-                if (response.error().exception() instanceof RetriableException 
&& !closing) {
+                if (response.error().exception() instanceof RetriableException 
&& !acknowledgeRequestState.onClose) {
                     // We retry the request until the timer expires, unless we 
are closing.
+                    acknowledgeRequestState.retryRequest();
                 } else {
-                    requestData.topics().forEach(topic -> 
topic.partitions().forEach(partition -> {
-                        TopicIdPartition tip = new 
TopicIdPartition(topic.topicId(),
-                                partition.partitionIndex(),
-                                metadata.topicNames().get(topic.topicId()));
-                        
metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getAcknowledgementsCount(tip));
+                    
response.data().responses().forEach(shareAcknowledgeTopicResponse -> 
shareAcknowledgeTopicResponse.partitions().forEach(partitionData -> {
+                        TopicIdPartition tip = new 
TopicIdPartition(shareAcknowledgeTopicResponse.topicId(),
+                                partitionData.partitionIndex(),
+                                
metadata.topicNames().get(shareAcknowledgeTopicResponse.topicId()));
+
                         
acknowledgeRequestState.handleAcknowledgeErrorCode(tip, response.error());
+                        metricsManager.recordLatency(resp.requestLatencyMs());
                     }));
-
-                    acknowledgeRequestState.processingComplete();
                 }
             } else {
-                response.data().responses().forEach(topic -> 
topic.partitions().forEach(partition -> {
-                    TopicIdPartition tip = new 
TopicIdPartition(topic.topicId(),
-                            partition.partitionIndex(),
-                            metadata.topicNames().get(topic.topicId()));
-                    if (partition.errorCode() != Errors.NONE.code()) {
-                        
metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getAcknowledgementsCount(tip));
+                AtomicBoolean shouldRetry = new AtomicBoolean(false);
+                // Check all partition level error codes
+                
response.data().responses().forEach(shareAcknowledgeTopicResponse -> 
shareAcknowledgeTopicResponse.partitions().forEach(partitionData -> {
+                    Errors partitionError = 
Errors.forCode(partitionData.errorCode());
+                    TopicIdPartition tip = new 
TopicIdPartition(shareAcknowledgeTopicResponse.topicId(),
+                            partitionData.partitionIndex(),
+                            
metadata.topicNames().get(shareAcknowledgeTopicResponse.topicId()));
+                    if (partitionError.exception() != null) {
+                        if (partitionError.exception() instanceof 
RetriableException && !acknowledgeRequestState.onClose) {
+                            // Move to incomplete acknowledgements to retry
+                            acknowledgeRequestState.moveToIncompleteAcks(tip);
+                            shouldRetry.set(true);
+                        } else {
+                            
metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getInFlightAcknowledgementsCount(tip));
+                            
acknowledgeRequestState.handleAcknowledgeErrorCode(tip, partitionError);
+                        }
+                    } else {
+                        
acknowledgeRequestState.handleAcknowledgeErrorCode(tip, partitionError);
                     }
-                    acknowledgeRequestState.handleAcknowledgeErrorCode(tip, 
Errors.forCode(partition.errorCode()));
                 }));
 
+                if (shouldRetry.get()) {
+                    acknowledgeRequestState.onFailedAttempt(currentTimeMs);
+                } else {
+                    acknowledgeRequestState.onSuccessfulAttempt(currentTimeMs);
+                }
                 acknowledgeRequestState.processingComplete();
             }
-
             metricsManager.recordLatency(resp.requestLatencyMs());
         } finally {
             log.debug("Removing pending request for node {} - success", 
fetchTarget.id());
@@ -560,12 +669,13 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
         try {
             log.debug("Completed ShareAcknowledge request from node {} 
unsuccessfully {}", fetchTarget.id(), Errors.forException(error));
             acknowledgeRequestState.sessionHandler().handleError(error);
+            acknowledgeRequestState.onFailedAttempt(currentTimeMs);
 
             requestData.topics().forEach(topic -> 
topic.partitions().forEach(partition -> {
                 TopicIdPartition tip = new TopicIdPartition(topic.topicId(),
                         partition.partitionIndex(),
                         metadata.topicNames().get(topic.topicId()));
-                
metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getAcknowledgementsCount(tip));
+                
metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getInFlightAcknowledgementsCount(tip));
                 acknowledgeRequestState.handleAcknowledgeErrorCode(tip, 
Errors.forException(error));
             }));
         } finally {
@@ -588,11 +698,12 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
                         partition.partitionIndex(),
                         metadata.topicNames().get(topic.topicId()));
                 if (partition.errorCode() != Errors.NONE.code()) {
-                    
metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getAcknowledgementsCount(tip));
+                    
metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getInFlightAcknowledgementsCount(tip));
                 }
                 acknowledgeRequestState.handleAcknowledgeErrorCode(tip, 
Errors.forCode(partition.errorCode()));
             }));
 
+            acknowledgeRequestState.onSuccessfulAttempt(currentTimeMs);
             metricsManager.recordLatency(resp.requestLatencyMs());
             acknowledgeRequestState.processingComplete();
         } finally {
@@ -610,12 +721,13 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
         try {
             log.debug("Completed ShareAcknowledge on close request from node 
{} unsuccessfully {}", fetchTarget.id(), Errors.forException(error));
             acknowledgeRequestState.sessionHandler().handleError(error);
+            acknowledgeRequestState.onFailedAttempt(currentTimeMs);
 
             requestData.topics().forEach(topic -> 
topic.partitions().forEach(partition -> {
                 TopicIdPartition tip = new TopicIdPartition(topic.topicId(),
                         partition.partitionIndex(),
                         metadata.topicNames().get(topic.topicId()));
-                
metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getAcknowledgementsCount(tip));
+                
metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getInFlightAcknowledgementsCount(tip));
                 acknowledgeRequestState.handleAcknowledgeErrorCode(tip, 
Errors.forException(error));
             }));
         } finally {
@@ -653,7 +765,7 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
     /**
      * Represents a request to acknowledge delivery that can be retried or 
aborted.
      */
-    class AcknowledgeRequestState extends TimedRequestState {
+    public class AcknowledgeRequestState extends TimedRequestState {
 
         /**
          * The share session handler.
@@ -668,7 +780,17 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
         /**
          * The map of acknowledgements to send
          */
-        private final Map<TopicIdPartition, Acknowledgements> 
acknowledgementsMap;
+        private final Map<TopicIdPartition, Acknowledgements> 
acknowledgementsToSend;
+
+        /**
+         * The map of acknowledgements to be retried in the next attempt.
+         */
+        private final Map<TopicIdPartition, Acknowledgements> 
incompleteAcknowledgements;
+
+        /**
+         * The in-flight acknowledgements
+         */
+        private final Map<TopicIdPartition, Acknowledgements> 
inFlightAcknowledgements;
 
         /**
          * The handler to call on a successful response from ShareAcknowledge.
@@ -680,11 +802,6 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
          */
         private final ResponseHandler<Throwable> errorHandler;
 
-        /**
-         * Whether the request has been processed and will not be retried.
-         */
-        private boolean isProcessed = false;
-
         /**
          * This handles completing a future when all results are known.
          */
@@ -725,11 +842,13 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
             super(logContext, owner, retryBackoffMs, retryBackoffMaxMs, 
deadlineTimer(time, deadlineMs));
             this.sessionHandler = sessionHandler;
             this.nodeId = nodeId;
-            this.acknowledgementsMap = acknowledgementsMap;
             this.successHandler = successHandler;
             this.errorHandler = errorHandler;
+            this.acknowledgementsToSend = acknowledgementsMap;
             this.resultHandler = resultHandler;
             this.onClose = onClose;
+            this.inFlightAcknowledgements = new HashMap<>();
+            this.incompleteAcknowledgements = new HashMap<>();
         }
 
         UnsentRequest buildRequest(long currentTimeMs) {
@@ -738,7 +857,10 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
                 sessionHandler.notifyClose();
             }
 
-            for (Map.Entry<TopicIdPartition, Acknowledgements> entry : 
acknowledgementsMap.entrySet()) {
+            Map<TopicIdPartition, Acknowledgements> 
finalAcknowledgementsToSend = new HashMap<>(
+                    incompleteAcknowledgements.isEmpty() ? 
acknowledgementsToSend : incompleteAcknowledgements);
+
+            for (Map.Entry<TopicIdPartition, Acknowledgements> entry : 
finalAcknowledgementsToSend.entrySet()) {
                 sessionHandler.addPartitionToFetch(entry.getKey(), 
entry.getValue());
             }
 
@@ -749,26 +871,50 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
 
             BiConsumer<ClientResponse, Throwable> responseHandler = (clientResponse, error) -> {
                 if (error != null) {
-                    onFailedAttempt(currentTimeMs);
                     errorHandler.handle(nodeToSend, requestBuilder.data(), this, error, currentTimeMs);
                     processingComplete();
                 } else {
                     successHandler.handle(nodeToSend, requestBuilder.data(), this, clientResponse, currentTimeMs);
+                    if (onClose && !closeFuture.isDone()) {
+                        closeFuture.complete(null);
+                    }
                 }
             };
 
             if (requestBuilder == null) {
-                log.trace("Building ShareAcknowledge request to send to node {} failed", nodeToSend.id());
                 handleSessionErrorCode(Errors.SHARE_SESSION_NOT_FOUND);
                 return null;
             } else {
-                log.trace("Building ShareAcknowledge request to send to node {}", nodeToSend.id());
+                inFlightAcknowledgements.putAll(finalAcknowledgementsToSend);
+                if (incompleteAcknowledgements.isEmpty()) {
+                    acknowledgementsToSend.clear();
+                } else {
+                    incompleteAcknowledgements.clear();
+                }
                 return new UnsentRequest(requestBuilder, Optional.of(nodeToSend)).whenComplete(responseHandler);
             }
         }
 
-        int getAcknowledgementsCount(TopicIdPartition tip) {
-            Acknowledgements acks = acknowledgementsMap.get(tip);
+        int getInFlightAcknowledgementsCount(TopicIdPartition tip) {
+            Acknowledgements acks = inFlightAcknowledgements.get(tip);
+            if (acks == null) {
+                return 0;
+            } else {
+                return acks.size();
+            }
+        }
+
+        int getIncompleteAcknowledgementsCount(TopicIdPartition tip) {
+            Acknowledgements acks = incompleteAcknowledgements.get(tip);
+            if (acks == null) {
+                return 0;
+            } else {
+                return acks.size();
+            }
+        }
+
+        int getAcknowledgementsToSendCount(TopicIdPartition tip) {
+            Acknowledgements acks = acknowledgementsToSend.get(tip);
             if (acks == null) {
                 return 0;
             } else {
@@ -776,31 +922,48 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
             }
         }
 
+        boolean isEmpty() {
+            return acknowledgementsToSend.isEmpty() &&
+                    incompleteAcknowledgements.isEmpty() &&
+                    inFlightAcknowledgements.isEmpty();
+        }
+
         /**
          * Sets the error code in the acknowledgements and sends the response
          * through a background event.
          */
         void handleAcknowledgeErrorCode(TopicIdPartition tip, Errors acknowledgeErrorCode) {
-            Acknowledgements acks = acknowledgementsMap.remove(tip);
+            Acknowledgements acks = inFlightAcknowledgements.get(tip);
             if (acks != null) {
                 acks.setAcknowledgeErrorCode(acknowledgeErrorCode);
             }
             resultHandler.complete(tip, acks);
         }
 
+        /**
+         * Sets the error code for the acknowledgements which were timed out
+         * after some retries.
+         */
+        void handleAcknowledgeTimedOut(TopicIdPartition tip) {
+            Acknowledgements acks = incompleteAcknowledgements.get(tip);
+            if (acks != null) {
+                acks.setAcknowledgeErrorCode(Errors.REQUEST_TIMED_OUT);
+            }
+            resultHandler.complete(tip, acks);
+        }
+
         /**
          * Set the error code for all remaining acknowledgements in the event
          * of a session error which prevents the remains acknowledgements from
          * being sent.
          */
         void handleSessionErrorCode(Errors errorCode) {
-            acknowledgementsMap.forEach((tip, acks) -> {
+            inFlightAcknowledgements.forEach((tip, acks) -> {
                 if (acks != null) {
                     acks.setAcknowledgeErrorCode(errorCode);
                 }
                 resultHandler.complete(tip, acks);
             });
-            acknowledgementsMap.clear();
             processingComplete();
         }
 
@@ -809,17 +972,28 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
         }
 
         void processingComplete() {
-            isProcessed = true;
+            inFlightAcknowledgements.clear();
             resultHandler.completeIfEmpty();
         }
 
-        boolean isProcessed() {
-            return isProcessed;
+        void retryRequest() {
+            incompleteAcknowledgements.putAll(inFlightAcknowledgements);
+            inFlightAcknowledgements.clear();
         }
 
         boolean maybeExpire() {
             return numAttempts > 0 && isExpired();
         }
+
+        public void moveToIncompleteAcks(TopicIdPartition tip) {
+            Acknowledgements acks = inFlightAcknowledgements.remove(tip);
+            if (acks != null) {
+                Acknowledgements existingAcks = incompleteAcknowledgements.putIfAbsent(tip, acks);
+                if (existingAcks != null) {
+                    incompleteAcknowledgements.get(tip).merge(acks);
+                }
+            }
+        }
     }
 
     /**
@@ -876,4 +1050,34 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
             }
         }
     }
+
+    static class Pair<V> {
+        private V asyncRequest;
+        private V syncRequest;
+
+        public Pair(V asyncRequest, V syncRequest) {
+            this.asyncRequest = asyncRequest;
+            this.syncRequest = syncRequest;
+        }
+
+        public void setAsyncRequest(V asyncRequest) {
+            this.asyncRequest = asyncRequest;
+        }
+
+        public void setSyncRequest(V second) {
+            this.syncRequest = second;
+        }
+
+        public V getAsyncRequest() {
+            return asyncRequest;
+        }
+
+        public V getSyncRequest() {
+            return syncRequest;
+        }
+    }
+
+    Pair<AcknowledgeRequestState> requestStates(int nodeId) {
+        return acknowledgeRequestStates.get(nodeId);
+    }
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
index 9f5253e4c67..57f6c168705 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
@@ -69,6 +69,7 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Timer;
+import org.apache.kafka.test.TestUtils;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -417,6 +418,150 @@ public class ShareConsumeRequestManagerTest {
         completedAcknowledgements.clear();
     }
 
+    @Test
+    public void testBatchingAcknowledgeRequestStates() {
+        buildRequestManager();
+
+        assignFromSubscribed(Collections.singleton(tp0));
+
+        // normal fetch
+        assertEquals(1, sendFetches());
+        assertFalse(shareConsumeRequestManager.hasCompletedFetches());
+
+        client.prepareResponse(fullFetchResponse(tip0, buildRecords(1L, 6, 1),
+                ShareCompletedFetchTest.acquiredRecords(1L, 6), Errors.NONE));
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(shareConsumeRequestManager.hasCompletedFetches());
+
+        Acknowledgements acknowledgements = Acknowledgements.empty();
+        acknowledgements.add(1L, AcknowledgeType.ACCEPT);
+        acknowledgements.add(2L, AcknowledgeType.ACCEPT);
+        acknowledgements.add(3L, AcknowledgeType.REJECT);
+
+        shareConsumeRequestManager.commitAsync(Collections.singletonMap(tip0, acknowledgements));
+
+        Acknowledgements acknowledgements2 = Acknowledgements.empty();
+        acknowledgements.add(4L, AcknowledgeType.ACCEPT);
+        acknowledgements.add(5L, AcknowledgeType.ACCEPT);
+        acknowledgements.add(6L, AcknowledgeType.ACCEPT);
+
+        shareConsumeRequestManager.commitAsync(Collections.singletonMap(tip0, acknowledgements2));
+
+        assertEquals(6, shareConsumeRequestManager.requestStates(0).getAsyncRequest().getAcknowledgementsToSendCount(tip0));
+
+        assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
+
+        assertEquals(0, shareConsumeRequestManager.requestStates(0).getAsyncRequest().getAcknowledgementsToSendCount(tip0));
+        assertEquals(6, shareConsumeRequestManager.requestStates(0).getAsyncRequest().getInFlightAcknowledgementsCount(tip0));
+
+        client.prepareResponse(fullAcknowledgeResponse(tip0, Errors.NONE));
+        networkClientDelegate.poll(time.timer(0));
+
+        assertEquals(0, shareConsumeRequestManager.requestStates(0).getAsyncRequest().getAcknowledgementsToSendCount(tip0));
+        assertEquals(0, shareConsumeRequestManager.requestStates(0).getAsyncRequest().getInFlightAcknowledgementsCount(tip0));
+    }
+
+    @Test
+    public void testPendingCommitAsyncBeforeCommitSync() {
+        buildRequestManager();
+
+        assignFromSubscribed(Collections.singleton(tp0));
+
+        // normal fetch
+        assertEquals(1, sendFetches());
+        assertFalse(shareConsumeRequestManager.hasCompletedFetches());
+
+        client.prepareResponse(fullFetchResponse(tip0, buildRecords(1L, 6, 1),
+                ShareCompletedFetchTest.acquiredRecords(1L, 6), Errors.NONE));
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(shareConsumeRequestManager.hasCompletedFetches());
+
+        Acknowledgements acknowledgements = Acknowledgements.empty();
+        acknowledgements.add(1L, AcknowledgeType.ACCEPT);
+        acknowledgements.add(2L, AcknowledgeType.ACCEPT);
+        acknowledgements.add(3L, AcknowledgeType.REJECT);
+
+        shareConsumeRequestManager.commitAsync(Collections.singletonMap(tip0, acknowledgements));
+
+        Acknowledgements acknowledgements2 = Acknowledgements.empty();
+        acknowledgements2.add(4L, AcknowledgeType.ACCEPT);
+        acknowledgements2.add(5L, AcknowledgeType.ACCEPT);
+        acknowledgements2.add(6L, AcknowledgeType.ACCEPT);
+
+        shareConsumeRequestManager.commitSync(Collections.singletonMap(tip0, acknowledgements2), 60000L);
+
+        assertEquals(3, shareConsumeRequestManager.requestStates(0).getAsyncRequest().getAcknowledgementsToSendCount(tip0));
+        assertEquals(3, shareConsumeRequestManager.requestStates(0).getSyncRequest().getAcknowledgementsToSendCount(tip0));
+
+        assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
+
+        assertEquals(3, shareConsumeRequestManager.requestStates(0).getAsyncRequest().getInFlightAcknowledgementsCount(tip0));
+
+        client.prepareResponse(fullAcknowledgeResponse(tip0, Errors.NONE));
+        networkClientDelegate.poll(time.timer(0));
+
+        assertEquals(0, shareConsumeRequestManager.requestStates(0).getAsyncRequest().getInFlightAcknowledgementsCount(tip0));
+        assertEquals(3, shareConsumeRequestManager.requestStates(0).getSyncRequest().getAcknowledgementsToSendCount(tip0));
+
+        assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
+
+        assertEquals(3, shareConsumeRequestManager.requestStates(0).getSyncRequest().getInFlightAcknowledgementsCount(tip0));
+
+        client.prepareResponse(fullAcknowledgeResponse(tip0, Errors.NONE));
+        networkClientDelegate.poll(time.timer(0));
+
+        assertEquals(0, shareConsumeRequestManager.requestStates(0).getSyncRequest().getInFlightAcknowledgementsCount(tip0));
+    }
+
+    @Test
+    public void testRetryAcknowledgements() throws InterruptedException {
+        buildRequestManager();
+
+        assignFromSubscribed(Collections.singleton(tp0));
+
+        // normal fetch
+        assertEquals(1, sendFetches());
+        assertFalse(shareConsumeRequestManager.hasCompletedFetches());
+
+        client.prepareResponse(fullFetchResponse(tip0, buildRecords(1L, 6, 1),
+                ShareCompletedFetchTest.acquiredRecords(1L, 6), Errors.NONE));
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(shareConsumeRequestManager.hasCompletedFetches());
+
+        Acknowledgements acknowledgements = Acknowledgements.empty();
+        acknowledgements.add(1L, AcknowledgeType.ACCEPT);
+        acknowledgements.add(2L, AcknowledgeType.ACCEPT);
+        acknowledgements.add(3L, AcknowledgeType.REJECT);
+        acknowledgements.add(4L, AcknowledgeType.ACCEPT);
+        acknowledgements.add(5L, AcknowledgeType.RELEASE);
+        acknowledgements.add(6L, AcknowledgeType.ACCEPT);
+
+        shareConsumeRequestManager.commitSync(Collections.singletonMap(tip0, acknowledgements), 60000L);
+        assertNull(shareConsumeRequestManager.requestStates(0).getAsyncRequest());
+
+        assertEquals(6, shareConsumeRequestManager.requestStates(0).getSyncRequest().getAcknowledgementsToSendCount(tip0));
+
+        assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
+
+        assertEquals(6, shareConsumeRequestManager.requestStates(0).getSyncRequest().getInFlightAcknowledgementsCount(tip0));
+
+        client.prepareResponse(fullAcknowledgeResponse(tip0, Errors.REQUEST_TIMED_OUT));
+        networkClientDelegate.poll(time.timer(0));
+
+        assertEquals(6, shareConsumeRequestManager.requestStates(0).getSyncRequest().getIncompleteAcknowledgementsCount(tip0));
+        assertEquals(0, shareConsumeRequestManager.requestStates(0).getSyncRequest().getInFlightAcknowledgementsCount(tip0));
+
+        TestUtils.retryOnExceptionWithTimeout(() -> assertEquals(1, shareConsumeRequestManager.sendAcknowledgements()));
+
+        assertEquals(6, shareConsumeRequestManager.requestStates(0).getSyncRequest().getInFlightAcknowledgementsCount(tip0));
+
+        client.prepareResponse(fullAcknowledgeResponse(tip0, Errors.NONE));
+        networkClientDelegate.poll(time.timer(0));
+
+        assertEquals(0, shareConsumeRequestManager.requestStates(0).getSyncRequest().getInFlightAcknowledgementsCount(tip0));
+        assertEquals(0, shareConsumeRequestManager.requestStates(0).getSyncRequest().getIncompleteAcknowledgementsCount(tip0));
+    }
+
     @Test
     public void testMultipleTopicsFetch() {
         buildRequestManager();
@@ -967,6 +1112,10 @@ public class ShareConsumeRequestManagerTest {
             networkClientDelegate.addAll(pollResult.unsentRequests);
             return pollResult.unsentRequests.size();
         }
+
+        public Pair<AcknowledgeRequestState> requestStates(int nodeId) {
+            return super.requestStates(nodeId);
+        }
     }
 
     private class TestableNetworkClientDelegate extends NetworkClientDelegate {


Reply via email to