This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2880e041294 KAFKA-18779: Validate responses from broker in client for 
ShareFetch and ShareAcknowledge RPCs. (#18939)
2880e041294 is described below

commit 2880e041294639c41525767e644e5ba2564ed6f6
Author: Shivsundar R <[email protected]>
AuthorDate: Mon Feb 24 05:27:24 2025 -0500

    KAFKA-18779: Validate responses from broker in client for ShareFetch and 
ShareAcknowledge RPCs. (#18939)
    
    - Currently if we received extraneous topic partitions in the response
    or if the response was missing some partitions requested, we were
    processing the response as it came and even populated the callback with
    these partitions.
    
    - These invalid responses should be parsed at the
    `ShareConsumeRequestManager`.
    
    - If the response missed any acknowledgements for partitions that were
    requested, then we fail the request with `InvalidRecordStateException`
    and populate the callbacks.
    
    - For any extraneous partitions in the response, we log an error and
    ignore them.
    
    Some refactors are also done in this PR in ShareConsumeRequestManager to
    make the code more readable.
    
    Reviewers: Andrew Schofield <[email protected]>
---
 .../internals/ShareConsumeRequestManager.java      | 252 +++++++++++++--------
 .../internals/ShareConsumeRequestManagerTest.java  | 100 +++++++-
 2 files changed, 254 insertions(+), 98 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
index 9e8add41a1a..375a99fdf51 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
@@ -28,9 +28,11 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InvalidRecordStateException;
 import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.internals.IdempotentCloser;
 import org.apache.kafka.common.message.ShareAcknowledgeRequestData;
+import org.apache.kafka.common.message.ShareAcknowledgeResponseData;
 import org.apache.kafka.common.message.ShareFetchRequestData;
 import org.apache.kafka.common.message.ShareFetchResponseData;
 import org.apache.kafka.common.protocol.Errors;
@@ -96,6 +98,7 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
     private final CompletableFuture<Void> closeFuture;
     private boolean isAcknowledgementCommitCallbackRegistered = false;
     private final Map<IdAndPartition, String> topicNamesMap = new HashMap<>();
+    private static final String INVALID_RESPONSE = "Acknowledgement not 
successful due to invalid response from broker";
 
     ShareConsumeRequestManager(final Time time,
                                final LogContext logContext,
@@ -366,7 +369,7 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
                                                       AtomicBoolean 
isAsyncSent) {
         boolean asyncSent = true;
         try {
-            if (acknowledgeRequestState == null || 
(!acknowledgeRequestState.onClose() && acknowledgeRequestState.isEmpty())) {
+            if (acknowledgeRequestState == null || 
(!acknowledgeRequestState.isCloseRequest() && 
acknowledgeRequestState.isEmpty())) {
                 return Optional.empty();
             }
 
@@ -441,7 +444,7 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
     private boolean isRequestStateInProgress(AcknowledgeRequestState 
acknowledgeRequestState) {
         if (acknowledgeRequestState == null) {
             return false;
-        } else if (acknowledgeRequestState.onClose()) {
+        } else if (acknowledgeRequestState.isCloseRequest()) {
             return !acknowledgeRequestState.isProcessed;
         } else {
             return !(acknowledgeRequestState.isEmpty());
@@ -716,11 +719,12 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
             final Map<TopicIdPartition, ShareFetchResponseData.PartitionData> 
responseData = new LinkedHashMap<>();
 
             response.data().responses().forEach(topicResponse ->
-                    topicResponse.partitions().forEach(partition ->
-                            responseData.put(new 
TopicIdPartition(topicResponse.topicId(),
-                                    partition.partitionIndex(),
-                                    
metadata.topicNames().getOrDefault(topicResponse.topicId(),
-                                            topicNamesMap.remove(new 
IdAndPartition(topicResponse.topicId(), partition.partitionIndex())))), 
partition))
+                    topicResponse.partitions().forEach(partition -> {
+                        TopicIdPartition tip = 
lookupTopicId(topicResponse.topicId(), partition.partitionIndex());
+                        if (tip != null) {
+                            responseData.put(tip, partition);
+                        }
+                    })
             );
 
             final Set<TopicPartition> partitions = 
responseData.keySet().stream().map(TopicIdPartition::topicPartition).collect(Collectors.toSet());
@@ -749,7 +753,7 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
 
                 Errors partitionError = 
Errors.forCode(partitionData.errorCode());
                 if (partitionError == Errors.NOT_LEADER_OR_FOLLOWER || 
partitionError == Errors.FENCED_LEADER_EPOCH) {
-                    log.debug("For {}, received error {}, with 
leaderIdAndEpoch {}", tip, partitionError, partitionData.currentLeader());
+                    log.debug("For {}, received error {}, with 
leaderIdAndEpoch {} in ShareFetch", tip, partitionError, 
partitionData.currentLeader());
                     if (partitionData.currentLeader().leaderId() != -1 && 
partitionData.currentLeader().leaderEpoch() != -1) {
                         
partitionsWithUpdatedLeaderInfo.put(tip.topicPartition(), new 
Metadata.LeaderIdAndEpoch(
                             
Optional.of(partitionData.currentLeader().leaderId()), 
Optional.of(partitionData.currentLeader().leaderEpoch())));
@@ -771,6 +775,14 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
                 }
             }
 
+            // Handle any acknowledgements which were not received in the 
response for this node.
+            if (fetchAcknowledgementsInFlight.get(fetchTarget.id()) != null) {
+                
fetchAcknowledgementsInFlight.remove(fetchTarget.id()).forEach((partition, 
acknowledgements) -> {
+                    acknowledgements.complete(new 
InvalidRecordStateException(INVALID_RESPONSE));
+                    
maybeSendShareAcknowledgeCommitCallbackEvent(Map.of(partition, 
acknowledgements));
+                });
+            }
+
             if (!partitionsWithUpdatedLeaderInfo.isEmpty()) {
                 List<Node> leaderNodes = 
response.data().nodeEndpoints().stream()
                     .map(e -> new Node(e.nodeId(), e.host(), e.port(), 
e.rack()))
@@ -797,13 +809,15 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
             }
 
             requestData.topics().forEach(topic -> 
topic.partitions().forEach(partition -> {
-                TopicIdPartition tip = new TopicIdPartition(topic.topicId(),
-                        partition.partitionIndex(),
-                        metadata.topicNames().get(topic.topicId()));
+                TopicIdPartition tip = lookupTopicId(topic.topicId(), 
partition.partitionIndex());
+                if (tip == null) {
+                    return;
+                }
 
                 Map<TopicIdPartition, Acknowledgements> 
nodeAcknowledgementsInFlight = 
fetchAcknowledgementsInFlight.get(fetchTarget.id());
                 if (nodeAcknowledgementsInFlight != null) {
                     Acknowledgements acks = 
nodeAcknowledgementsInFlight.remove(tip);
+
                     if (acks != null) {
                         
metricsManager.recordFailedAcknowledgements(acks.size());
                         if (error instanceof KafkaException) {
@@ -833,20 +847,21 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
 
             Map<TopicPartition, Metadata.LeaderIdAndEpoch> 
partitionsWithUpdatedLeaderInfo = new HashMap<>();
 
-            if (acknowledgeRequestState.onClose()) {
-                response.data().responses().forEach(topic -> 
topic.partitions().forEach(partition -> {
-                    TopicIdPartition tip = new 
TopicIdPartition(topic.topicId(),
-                            partition.partitionIndex(),
-                            metadata.topicNames().get(topic.topicId()));
-                    if (partition.errorCode() != Errors.NONE.code()) {
+            if (acknowledgeRequestState.isCloseRequest()) {
+                response.data().responses().forEach(topicResponse -> 
topicResponse.partitions().forEach(partitionData -> {
+                    TopicIdPartition tip = 
lookupTopicId(topicResponse.topicId(), partitionData.partitionIndex());
+                    if (tip == null) {
+                        return;
+                    }
+
+                    if (partitionData.errorCode() != Errors.NONE.code()) {
                         
metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getInFlightAcknowledgementsCount(tip));
                     }
-                    acknowledgeRequestState.handleAcknowledgeErrorCode(tip, 
Errors.forCode(partition.errorCode()));
+                    acknowledgeRequestState.handleAcknowledgeErrorCode(tip, 
Errors.forCode(partitionData.errorCode()));
                 }));
 
                 
acknowledgeRequestState.onSuccessfulAttempt(responseCompletionTimeMs);
                 acknowledgeRequestState.processingComplete();
-
             } else {
                 if 
(!acknowledgeRequestState.sessionHandler.handleResponse(response, 
resp.requestHeader().apiVersion())) {
                     // Received a response-level error code.
@@ -856,59 +871,23 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
                         // We retry the request until the timer expires, 
unless we are closing.
                         acknowledgeRequestState.moveAllToIncompleteAcks();
                     } else {
-                        
response.data().responses().forEach(shareAcknowledgeTopicResponse -> 
shareAcknowledgeTopicResponse.partitions().forEach(partitionData -> {
-                            TopicIdPartition tip = new 
TopicIdPartition(shareAcknowledgeTopicResponse.topicId(),
-                                    partitionData.partitionIndex(),
-                                    
metadata.topicNames().get(shareAcknowledgeTopicResponse.topicId()));
-
-                            
acknowledgeRequestState.handleAcknowledgeErrorCode(tip, response.error());
-                        }));
+                        
acknowledgeRequestState.processPendingInFlightAcknowledgements(response.error().exception());
                         acknowledgeRequestState.processingComplete();
                     }
                 } else {
                     AtomicBoolean shouldRetry = new AtomicBoolean(false);
                     // Check all partition level error codes
-                    
response.data().responses().forEach(shareAcknowledgeTopicResponse -> 
shareAcknowledgeTopicResponse.partitions().forEach(partitionData -> {
+                    response.data().responses().forEach(topicResponse -> 
topicResponse.partitions().forEach(partitionData -> {
                         Errors partitionError = 
Errors.forCode(partitionData.errorCode());
-                        TopicIdPartition tip = new 
TopicIdPartition(shareAcknowledgeTopicResponse.topicId(),
-                                partitionData.partitionIndex(),
-                                
metadata.topicNames().get(shareAcknowledgeTopicResponse.topicId()));
-                        if (partitionError.exception() != null) {
-                            boolean retry = false;
-
-                            if (partitionError == 
Errors.NOT_LEADER_OR_FOLLOWER || partitionError == Errors.FENCED_LEADER_EPOCH) {
-                                // If the leader has changed, there's no point 
in retrying the operation because the acquisition locks
-                                // will have been released.
-                                TopicPartition tp = new 
TopicPartition(metadata.topicNames().get(shareAcknowledgeTopicResponse.topicId()),
 partitionData.partitionIndex());
-
-                                log.debug("For {}, received error {}, with 
leaderIdAndEpoch {}", tp, partitionError, partitionData.currentLeader());
-                                if (partitionData.currentLeader().leaderId() 
!= -1 && partitionData.currentLeader().leaderEpoch() != -1) {
-                                    partitionsWithUpdatedLeaderInfo.put(tp, 
new Metadata.LeaderIdAndEpoch(
-                                        
Optional.of(partitionData.currentLeader().leaderId()), 
Optional.of(partitionData.currentLeader().leaderEpoch())));
-                                }
-                            } else if (partitionError.exception() instanceof 
RetriableException) {
-                                retry = true;
-                            }
-
-                            if (retry) {
-                                // Move to incomplete acknowledgements to retry
-                                
acknowledgeRequestState.moveToIncompleteAcks(tip);
-                                shouldRetry.set(true);
-                            } else {
-                                
metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getInFlightAcknowledgementsCount(tip));
-                                
acknowledgeRequestState.handleAcknowledgeErrorCode(tip, partitionError);
-                            }
-                        } else {
-                            
acknowledgeRequestState.handleAcknowledgeErrorCode(tip, partitionError);
+                        TopicIdPartition tip = 
lookupTopicId(topicResponse.topicId(), partitionData.partitionIndex());
+                        if (tip == null) {
+                            return;
                         }
+
+                        handlePartitionError(partitionData, 
partitionsWithUpdatedLeaderInfo, acknowledgeRequestState, partitionError, tip, 
shouldRetry);
                     }));
 
-                    if (shouldRetry.get()) {
-                        
acknowledgeRequestState.onFailedAttempt(responseCompletionTimeMs);
-                    } else {
-                        
acknowledgeRequestState.onSuccessfulAttempt(responseCompletionTimeMs);
-                        acknowledgeRequestState.processingComplete();
-                    }
+                    processRetryLogic(acknowledgeRequestState, shouldRetry, 
responseCompletionTimeMs);
                 }
             }
 
@@ -927,7 +906,7 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
             log.debug("Removing pending request for node {} - success", 
fetchTarget.id());
             nodesWithPendingRequests.remove(fetchTarget.id());
 
-            if (acknowledgeRequestState.onClose()) {
+            if (acknowledgeRequestState.isCloseRequest()) {
                 log.debug("Removing node from ShareSession {}", 
fetchTarget.id());
                 sessionHandlers.remove(fetchTarget.id());
             }
@@ -945,9 +924,11 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
             acknowledgeRequestState.onFailedAttempt(responseCompletionTimeMs);
 
             requestData.topics().forEach(topic -> 
topic.partitions().forEach(partition -> {
-                TopicIdPartition tip = new TopicIdPartition(topic.topicId(),
-                        partition.partitionIndex(),
-                        metadata.topicNames().get(topic.topicId()));
+                TopicIdPartition tip = lookupTopicId(topic.topicId(), 
partition.partitionIndex());
+                if (tip == null) {
+                    return;
+                }
+
                 
metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getInFlightAcknowledgementsCount(tip));
                 acknowledgeRequestState.handleAcknowledgeErrorCode(tip, 
Errors.forException(error));
             }));
@@ -957,13 +938,81 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
             log.debug("Removing pending request for node {} - failed", 
fetchTarget.id());
             nodesWithPendingRequests.remove(fetchTarget.id());
 
-            if (acknowledgeRequestState.onClose()) {
+            if (acknowledgeRequestState.isCloseRequest()) {
                 log.debug("Removing node from ShareSession {}", 
fetchTarget.id());
                 sessionHandlers.remove(fetchTarget.id());
             }
         }
     }
 
+    private void 
handlePartitionError(ShareAcknowledgeResponseData.PartitionData partitionData,
+                                      Map<TopicPartition, 
Metadata.LeaderIdAndEpoch> partitionsWithUpdatedLeaderInfo,
+                                      AcknowledgeRequestState 
acknowledgeRequestState,
+                                      Errors partitionError,
+                                      TopicIdPartition tip,
+                                      AtomicBoolean shouldRetry) {
+        if (partitionError.exception() != null) {
+            boolean retry = false;
+            if (partitionError == Errors.NOT_LEADER_OR_FOLLOWER || 
partitionError == Errors.FENCED_LEADER_EPOCH) {
+                // If the leader has changed, there's no point in retrying the 
operation because the acquisition locks
+                // will have been released.
+                updateLeaderInfoMap(partitionData, 
partitionsWithUpdatedLeaderInfo, partitionError, tip.topicPartition());
+            } else if (partitionError.exception() instanceof 
RetriableException) {
+                retry = true;
+            }
+
+            if (retry) {
+                if (acknowledgeRequestState.moveToIncompleteAcks(tip)) {
+                    shouldRetry.set(true);
+                }
+            } else {
+                
metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getInFlightAcknowledgementsCount(tip));
+                acknowledgeRequestState.handleAcknowledgeErrorCode(tip, 
partitionError);
+            }
+        } else {
+            acknowledgeRequestState.handleAcknowledgeErrorCode(tip, 
partitionError);
+        }
+    }
+
+    private void processRetryLogic(AcknowledgeRequestState 
acknowledgeRequestState,
+                                   AtomicBoolean shouldRetry,
+                                   long responseCompletionTimeMs) {
+        if (shouldRetry.get()) {
+            acknowledgeRequestState.onFailedAttempt(responseCompletionTimeMs);
+
+            // Check for any acknowledgements that did not receive a response.
+            // These acknowledgements are failed with 
InvalidRecordStateException.
+            acknowledgeRequestState.processPendingInFlightAcknowledgements(new 
InvalidRecordStateException(INVALID_RESPONSE));
+        } else {
+            
acknowledgeRequestState.onSuccessfulAttempt(responseCompletionTimeMs);
+            acknowledgeRequestState.processingComplete();
+        }
+    }
+
+    private void 
updateLeaderInfoMap(ShareAcknowledgeResponseData.PartitionData partitionData,
+                                  Map<TopicPartition, 
Metadata.LeaderIdAndEpoch> partitionsWithUpdatedLeaderInfo,
+                                  Errors partitionError,
+                                  TopicPartition tp) {
+
+        log.debug("For {}, received error {}, with leaderIdAndEpoch {} in 
ShareAcknowledge", tp, partitionError, partitionData.currentLeader());
+        if (partitionData.currentLeader().leaderId() != -1 && 
partitionData.currentLeader().leaderEpoch() != -1) {
+            partitionsWithUpdatedLeaderInfo.put(tp, new 
Metadata.LeaderIdAndEpoch(
+                    Optional.of(partitionData.currentLeader().leaderId()),
+                    Optional.of(partitionData.currentLeader().leaderEpoch())
+            ));
+        }
+    }
+
+    private TopicIdPartition lookupTopicId(Uuid topicId, int partitionIndex) {
+        String topicName = metadata.topicNames().getOrDefault(topicId,
+                topicNamesMap.remove(new IdAndPartition(topicId, 
partitionIndex)));
+        if (topicName == null) {
+            log.error("Topic name not found in metadata for topicId {} and 
partitionIndex {}", topicId, partitionIndex);
+            return null;
+        }
+        return new TopicIdPartition(topicId, partitionIndex, topicName);
+    }
+
     private List<TopicPartition> partitionsToFetch() {
         return subscriptions.fetchablePartitions(tp -> true);
     }
@@ -1061,7 +1110,7 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
 
         UnsentRequest buildRequest() {
             // If this is the closing request, close the share session by 
setting the final epoch
-            if (onClose()) {
+            if (isCloseRequest()) {
                 sessionHandler.notifyClose();
             }
 
@@ -1144,11 +1193,13 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
          * through a background event.
          */
         void handleAcknowledgeErrorCode(TopicIdPartition tip, Errors 
acknowledgeErrorCode) {
-            Acknowledgements acks = inFlightAcknowledgements.get(tip);
+            Acknowledgements acks = inFlightAcknowledgements.remove(tip);
             if (acks != null) {
                 acks.complete(acknowledgeErrorCode.exception());
+                resultHandler.complete(tip, acks, requestType);
+            } else {
+                log.error("Invalid partition {} received in ShareAcknowledge 
response", tip);
             }
-            resultHandler.complete(tip, acks, onCommitAsync());
         }
 
         /**
@@ -1159,8 +1210,8 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
             Acknowledgements acks = incompleteAcknowledgements.get(tip);
             if (acks != null) {
                 acks.complete(Errors.REQUEST_TIMED_OUT.exception());
+                resultHandler.complete(tip, acks, requestType);
             }
-            resultHandler.complete(tip, acks, onCommitAsync());
         }
 
         /**
@@ -1176,7 +1227,7 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
                 if (acks != null) {
                     acks.complete(errorCode.exception());
                 }
-                resultHandler.complete(tip, acks, onCommitAsync());
+                resultHandler.complete(tip, acks, requestType);
             });
             acknowledgementsMapToClear.clear();
             processingComplete();
@@ -1187,11 +1238,26 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
         }
 
         void processingComplete() {
-            inFlightAcknowledgements.clear();
+            // If there are any pending inFlightAcknowledgements after 
processing the response, we fail them with an InvalidRecordStateException.
+            processPendingInFlightAcknowledgements(new 
InvalidRecordStateException(INVALID_RESPONSE));
             resultHandler.completeIfEmpty();
             isProcessed = true;
         }
 
+        /**
+         * Fail any existing in-flight acknowledgements with the given 
exception and clear the map.
+         * We also send a background event to update {@link 
org.apache.kafka.clients.consumer.AcknowledgementCommitCallback }
+         */
+        private void processPendingInFlightAcknowledgements(KafkaException 
exception) {
+            if (!inFlightAcknowledgements.isEmpty()) {
+                inFlightAcknowledgements.forEach((partition, acknowledgements) 
-> {
+                    acknowledgements.complete(exception);
+                    resultHandler.complete(partition, acknowledgements, 
requestType);
+                });
+                inFlightAcknowledgements.clear();
+            }
+        }
+
         /**
          * Moves all the in-flight acknowledgements to incomplete 
acknowledgements to retry
          * in the next request.
@@ -1208,21 +1274,25 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
         /**
          * Moves the in-flight acknowledgements for a given partition to 
incomplete acknowledgements to retry
          * in the next request.
+         *
+         * @param tip The TopicIdPartition for which we move the 
acknowledgements.
+         * @return True if the partition was sent in the request.
+         * <p> False if the partition was not part of the request, we log an 
error and ignore such partitions. </p>
          */
-        public void moveToIncompleteAcks(TopicIdPartition tip) {
+        public boolean moveToIncompleteAcks(TopicIdPartition tip) {
             Acknowledgements acks = inFlightAcknowledgements.remove(tip);
             if (acks != null) {
                 incompleteAcknowledgements.put(tip, acks);
+                return true;
+            } else {
+                log.error("Invalid partition {} received in ShareAcknowledge 
response", tip);
+                return false;
             }
         }
 
-        public boolean onClose() {
+        public boolean isCloseRequest() {
             return requestType == AcknowledgeRequestType.CLOSE;
         }
-
-        public boolean onCommitAsync() {
-            return requestType == AcknowledgeRequestType.COMMIT_ASYNC;
-        }
     }
 
     /**
@@ -1251,21 +1321,19 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
          * Handle the result of a ShareAcknowledge request sent to one or more 
nodes and
          * signal the completion when all results are known.
          */
-        public void complete(TopicIdPartition partition, Acknowledgements 
acknowledgements, boolean isCommitAsync) {
-            if (!isCommitAsync && acknowledgements != null) {
-                result.put(partition, acknowledgements);
-            }
-
-            // For commitAsync, we do not wait for other results to complete, 
we prepare a background event
-            // for every ShareAcknowledgeResponse.
-            // For commitAsync, we send out a background event for every 
TopicIdPartition, so we use a singletonMap each time.
-            if (isCommitAsync) {
+        public void complete(TopicIdPartition partition, Acknowledgements 
acknowledgements, AcknowledgeRequestType type) {
+            if (type.equals(AcknowledgeRequestType.COMMIT_ASYNC)) {
                 if (acknowledgements != null) {
                     
maybeSendShareAcknowledgeCommitCallbackEvent(Map.of(partition, 
acknowledgements));
                 }
-            } else if (remainingResults != null && 
remainingResults.decrementAndGet() == 0) {
-                maybeSendShareAcknowledgeCommitCallbackEvent(result);
-                future.ifPresent(future -> future.complete(result));
+            } else {
+                if (acknowledgements != null) {
+                    result.put(partition, acknowledgements);
+                }
+                if (remainingResults != null && 
remainingResults.decrementAndGet() == 0) {
+                    maybeSendShareAcknowledgeCommitCallbackEvent(result);
+                    future.ifPresent(future -> future.complete(result));
+                }
             }
         }
 
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
index 017491090b0..2ce7c9c5da3 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
@@ -38,6 +38,7 @@ import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.compress.Compression;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.InvalidRecordStateException;
 import org.apache.kafka.common.errors.ShareSessionNotFoundException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.errors.UnknownServerException;
@@ -567,16 +568,16 @@ public class ShareConsumeRequestManagerTest {
         ShareConsumeRequestManager.ResultHandler resultHandler = 
shareConsumeRequestManager.buildResultHandler(null, Optional.empty());
 
         // Passing null acknowledgements should mean we do not send the 
background event at all.
-        resultHandler.complete(tip0, null, true);
+        resultHandler.complete(tip0, null, 
ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_ASYNC);
         assertEquals(0, completedAcknowledgements.size());
 
         // Setting isCommitAsync to false should still not send any background 
event
         // as we have initialized remainingResults to null.
-        resultHandler.complete(tip0, acknowledgements, false);
+        resultHandler.complete(tip0, acknowledgements, 
ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_SYNC);
         assertEquals(0, completedAcknowledgements.size());
 
         // Sending non-null acknowledgements means we do send the background 
event
-        resultHandler.complete(tip0, acknowledgements, true);
+        resultHandler.complete(tip0, acknowledgements, 
ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_ASYNC);
         assertEquals(3, completedAcknowledgements.get(0).get(tip0).size());
     }
 
@@ -599,16 +600,16 @@ public class ShareConsumeRequestManagerTest {
         ShareConsumeRequestManager.ResultHandler resultHandler = 
shareConsumeRequestManager.buildResultHandler(resultCount, Optional.of(future));
 
         // We only send the background event after all results have been 
completed.
-        resultHandler.complete(tip0, acknowledgements, false);
+        resultHandler.complete(tip0, acknowledgements, 
ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_SYNC);
         assertEquals(0, completedAcknowledgements.size());
         assertFalse(future.isDone());
 
-        resultHandler.complete(t2ip0, null, false);
+        resultHandler.complete(t2ip0, null, 
ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_SYNC);
         assertEquals(0, completedAcknowledgements.size());
         assertFalse(future.isDone());
 
         // After third response is received, we send the background event.
-        resultHandler.complete(tip1, acknowledgements, false);
+        resultHandler.complete(tip1, acknowledgements, 
ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_SYNC);
         assertEquals(1, completedAcknowledgements.size());
         assertEquals(2, completedAcknowledgements.get(0).size());
         assertEquals(3, completedAcknowledgements.get(0).get(tip0).size());
@@ -1315,6 +1316,87 @@ public class ShareConsumeRequestManagerTest {
         assertThrows(NullPointerException.class, (Executable) 
shareFetch.records().get(t2p0));
     }
 
+    @Test
+    public void testShareFetchInvalidResponse() {
+        buildRequestManager();
+        
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
+
+        subscriptions.subscribeToShareGroup(Collections.singleton(topicName));
+        subscriptions.assignFromSubscribed(Collections.singleton(tp0));
+
+        client.updateMetadata(
+                RequestTestUtils.metadataUpdateWithIds(1, Map.of(topicName, 1),
+                        tp -> validLeaderEpoch, topicIds, false));
+
+        assertEquals(1, sendFetches());
+        assertFalse(shareConsumeRequestManager.hasCompletedFetches());
+
+        client.prepareResponse(fullFetchResponse(t2ip0, records, 
acquiredRecords, Errors.NONE));
+        networkClientDelegate.poll(time.timer(0));
+        assertFalse(shareConsumeRequestManager.hasCompletedFetches());
+    }
+
+    @Test
+    public void testShareAcknowledgeInvalidResponse() throws 
InterruptedException {
+        buildRequestManager();
+        
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
+
+        subscriptions.subscribeToShareGroup(Collections.singleton(topicName));
+        subscriptions.assignFromSubscribed(Collections.singleton(tp0));
+
+        client.updateMetadata(
+                RequestTestUtils.metadataUpdateWithIds(1, Map.of(topicName, 1),
+                        tp -> validLeaderEpoch, topicIds, false));
+
+        assertEquals(1, sendFetches());
+        assertFalse(shareConsumeRequestManager.hasCompletedFetches());
+
+        client.prepareResponse(fullFetchResponse(tip0, records, 
acquiredRecords, Errors.NONE));
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(shareConsumeRequestManager.hasCompletedFetches());
+
+        fetchRecords();
+
+        Acknowledgements acknowledgements = Acknowledgements.empty();
+        acknowledgements.add(1L, AcknowledgeType.ACCEPT);
+
+        shareConsumeRequestManager.commitAsync(Map.of(tip0, new 
NodeAcknowledgements(0, acknowledgements)));
+
+        assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
+
+        // If a top-level error is received, we still retry the 
acknowledgements independent of the topic-partitions received in the response.
+        client.prepareResponse(acknowledgeResponseWithTopLevelError(t2ip0, 
Errors.LEADER_NOT_AVAILABLE));
+        networkClientDelegate.poll(time.timer(0));
+
+        assertEquals(1, 
shareConsumeRequestManager.requestStates(0).getAsyncRequest().getIncompleteAcknowledgementsCount(tip0));
+
+        TestUtils.retryOnExceptionWithTimeout(() -> assertEquals(1, 
shareConsumeRequestManager.sendAcknowledgements()));
+
+        client.prepareResponse(fullAcknowledgeResponse(t2ip0, Errors.NONE));
+        networkClientDelegate.poll(time.timer(0));
+
+        // If we do not get the expected partitions in the response, we fail 
these acknowledgements with InvalidRecordStateException.
+        assertEquals(InvalidRecordStateException.class, 
completedAcknowledgements.get(0).get(tip0).getAcknowledgeException().getClass());
+        completedAcknowledgements.clear();
+
+        // Send remaining acknowledgements through piggybacking on the next 
fetch.
+        Acknowledgements acknowledgements1 = Acknowledgements.empty();
+        acknowledgements1.add(2L, AcknowledgeType.ACCEPT);
+        acknowledgements1.add(3L, AcknowledgeType.REJECT);
+
+        shareConsumeRequestManager.fetch(Map.of(tip0, new 
NodeAcknowledgements(0, acknowledgements1)));
+
+        assertEquals(1, sendFetches());
+        assertFalse(shareConsumeRequestManager.hasCompletedFetches());
+
+        client.prepareResponse(fullFetchResponse(t2ip0, records, 
acquiredRecords, Errors.NONE));
+        networkClientDelegate.poll(time.timer(0));
+        assertFalse(shareConsumeRequestManager.hasCompletedFetches());
+
+        // If we do not get the expected partitions in the response, we fail 
these acknowledgements with InvalidRecordStateException.
+        assertEquals(InvalidRecordStateException.class, 
completedAcknowledgements.get(0).get(tip0).getAcknowledgeException().getClass());
+    }
+
     @Test
     public void testCloseShouldBeIdempotent() {
         buildRequestManager();
@@ -2378,6 +2460,12 @@ public class ShareConsumeRequestManagerTest {
         return ShareAcknowledgeResponse.of(Errors.NONE, 0, new 
LinkedHashMap<>(partitions), Collections.emptyList());
     }
 
+    private ShareAcknowledgeResponse 
acknowledgeResponseWithTopLevelError(TopicIdPartition tp, Errors error) {
+        Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData> 
partitions = Map.of(tp,
+                partitionDataForAcknowledge(tp, Errors.NONE));
+        return ShareAcknowledgeResponse.of(error, 0, new 
LinkedHashMap<>(partitions), Collections.emptyList());
+    }
+
     private ShareAcknowledgeResponse fullAcknowledgeResponse(TopicIdPartition 
tp, Errors error) {
         Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData> 
partitions = Map.of(tp,
                 partitionDataForAcknowledge(tp, error));


Reply via email to