This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0e40b80c866 KAFKA-18769: Improve leadership changes handling in ShareConsumeRequestManager. (#18851)
0e40b80c866 is described below

commit 0e40b80c86631f10d221e1ce8e43055d91451564
Author: ShivsundarR <[email protected]>
AuthorDate: Wed Feb 12 10:54:01 2025 -0500

    KAFKA-18769: Improve leadership changes handling in ShareConsumeRequestManager. (#18851)
    
    Reviewers: Andrew Schofield <[email protected]>
---
 .../internals/ShareConsumeRequestManager.java      | 163 ++++++---
 .../internals/ShareConsumeRequestManagerTest.java  | 371 ++++++++++++++++++++-
 2 files changed, 475 insertions(+), 59 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
index 42087e96eef..07782ed771a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
@@ -191,8 +191,8 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
 
 
         // Iterate over the session handlers to see if there are 
acknowledgements to be sent for partitions
-        // which are no longer part of the current subscription, or whose 
records were fetched from a
-        // previous leader.
+        // which are no longer part of the current subscription.
+        // We fail acknowledgements for records fetched from a previous leader.
         Cluster cluster = metadata.fetch();
         sessionHandlers.forEach((nodeId, sessionHandler) -> {
             Node node = cluster.nodeById(nodeId);
@@ -203,14 +203,20 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
                     Map<TopicIdPartition, Acknowledgements> 
nodeAcksFromFetchMap = fetchAcknowledgementsToSend.get(nodeId);
                     if (nodeAcksFromFetchMap != null) {
                         nodeAcksFromFetchMap.forEach((tip, acks) -> {
-                            
metricsManager.recordAcknowledgementSent(acks.size());
-                            
fetchAcknowledgementsInFlight.computeIfAbsent(node.id(), k -> new 
HashMap<>()).put(tip, acks);
+                            if (!isLeaderKnownToHaveChanged(nodeId, tip)) {
+                                
metricsManager.recordAcknowledgementSent(acks.size());
+                                
fetchAcknowledgementsInFlight.computeIfAbsent(node.id(), k -> new 
HashMap<>()).put(tip, acks);
 
-                            sessionHandler.addPartitionToAcknowledgeOnly(tip, 
acks);
-                            handlerMap.put(node, sessionHandler);
+                                
sessionHandler.addPartitionToAcknowledgeOnly(tip, acks);
+                                handlerMap.put(node, sessionHandler);
 
-                            topicNamesMap.putIfAbsent(new 
IdAndPartition(tip.topicId(), tip.partition()), tip.topic());
-                            log.debug("Added fetch request for previously 
subscribed partition {} to node {}", tip, nodeId);
+                                topicNamesMap.putIfAbsent(new 
IdAndPartition(tip.topicId(), tip.partition()), tip.topic());
+                                log.debug("Added fetch request for previously 
subscribed partition {} to node {}", tip, nodeId);
+                            } else {
+                                log.debug("Leader for the partition is down or 
has changed, failing Acknowledgements for partition {}", tip);
+                                
acks.complete(Errors.NOT_LEADER_OR_FOLLOWER.exception());
+                                
maybeSendShareAcknowledgeCommitCallbackEvent(Collections.singletonMap(tip, 
acks));
+                            }
                         });
 
                         nodeAcksFromFetchMap.clear();
@@ -475,11 +481,16 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
                 for (TopicIdPartition tip : 
sessionHandler.sessionPartitions()) {
                     NodeAcknowledgements nodeAcknowledgements = 
acknowledgementsMap.get(tip);
                     if ((nodeAcknowledgements != null) && 
(nodeAcknowledgements.nodeId() == node.id())) {
-                        acknowledgementsMapForNode.put(tip, 
nodeAcknowledgements.acknowledgements());
+                        if (!isLeaderKnownToHaveChanged(node.id(), tip)) {
+                            acknowledgementsMapForNode.put(tip, 
nodeAcknowledgements.acknowledgements());
 
-                        
metricsManager.recordAcknowledgementSent(nodeAcknowledgements.acknowledgements().size());
-                        log.debug("Added sync acknowledge request for 
partition {} to node {}", tip.topicPartition(), node.id());
-                        resultCount.incrementAndGet();
+                            
metricsManager.recordAcknowledgementSent(nodeAcknowledgements.acknowledgements().size());
+                            log.debug("Added sync acknowledge request for 
partition {} to node {}", tip.topicPartition(), node.id());
+                            resultCount.incrementAndGet();
+                        } else {
+                            
nodeAcknowledgements.acknowledgements().complete(Errors.NOT_LEADER_OR_FOLLOWER.exception());
+                            
maybeSendShareAcknowledgeCommitCallbackEvent(Collections.singletonMap(tip, 
nodeAcknowledgements.acknowledgements()));
+                        }
                     }
                 }
 
@@ -523,29 +534,34 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
                 for (TopicIdPartition tip : 
sessionHandler.sessionPartitions()) {
                     NodeAcknowledgements nodeAcknowledgements = 
acknowledgementsMap.get(tip);
                     if ((nodeAcknowledgements != null) && 
(nodeAcknowledgements.nodeId() == node.id())) {
-                        Acknowledgements acknowledgements = 
nodeAcknowledgements.acknowledgements();
-                        acknowledgementsMapForNode.put(tip, acknowledgements);
-
-                        
metricsManager.recordAcknowledgementSent(acknowledgements.size());
-                        log.debug("Added async acknowledge request for 
partition {} to node {}", tip.topicPartition(), node.id());
-                        AcknowledgeRequestState asyncRequestState = 
acknowledgeRequestStates.get(nodeId).getAsyncRequest();
-                        if (asyncRequestState == null) {
-                            
acknowledgeRequestStates.get(nodeId).setAsyncRequest(new 
AcknowledgeRequestState(logContext,
-                                    
ShareConsumeRequestManager.class.getSimpleName() + ":2",
-                                    Long.MAX_VALUE,
-                                    retryBackoffMs,
-                                    retryBackoffMaxMs,
-                                    sessionHandler,
-                                    nodeId,
-                                    acknowledgementsMapForNode,
-                                    resultHandler,
-                                    AcknowledgeRequestType.COMMIT_ASYNC
-                            ));
-                        } else {
-                            Acknowledgements prevAcks = 
asyncRequestState.acknowledgementsToSend.putIfAbsent(tip, acknowledgements);
-                            if (prevAcks != null) {
-                                
asyncRequestState.acknowledgementsToSend.get(tip).merge(acknowledgements);
+                        if (!isLeaderKnownToHaveChanged(node.id(), tip)) {
+                            Acknowledgements acknowledgements = 
nodeAcknowledgements.acknowledgements();
+                            acknowledgementsMapForNode.put(tip, 
acknowledgements);
+
+                            
metricsManager.recordAcknowledgementSent(acknowledgements.size());
+                            log.debug("Added async acknowledge request for 
partition {} to node {}", tip.topicPartition(), node.id());
+                            AcknowledgeRequestState asyncRequestState = 
acknowledgeRequestStates.get(nodeId).getAsyncRequest();
+                            if (asyncRequestState == null) {
+                                
acknowledgeRequestStates.get(nodeId).setAsyncRequest(new 
AcknowledgeRequestState(logContext,
+                                        
ShareConsumeRequestManager.class.getSimpleName() + ":2",
+                                        Long.MAX_VALUE,
+                                        retryBackoffMs,
+                                        retryBackoffMaxMs,
+                                        sessionHandler,
+                                        nodeId,
+                                        acknowledgementsMapForNode,
+                                        resultHandler,
+                                        AcknowledgeRequestType.COMMIT_ASYNC
+                                ));
+                            } else {
+                                Acknowledgements prevAcks = 
asyncRequestState.acknowledgementsToSend.putIfAbsent(tip, acknowledgements);
+                                if (prevAcks != null) {
+                                    
asyncRequestState.acknowledgementsToSend.get(tip).merge(acknowledgements);
+                                }
                             }
+                        } else {
+                            
nodeAcknowledgements.acknowledgements().complete(Errors.NOT_LEADER_OR_FOLLOWER.exception());
+                            
maybeSendShareAcknowledgeCommitCallbackEvent(Collections.singletonMap(tip, 
nodeAcknowledgements.acknowledgements()));
                         }
                     }
                 }
@@ -572,40 +588,57 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
         final ResultHandler resultHandler = new ResultHandler(resultCount, 
Optional.empty());
 
         closing = true;
+        Map<Integer, Map<TopicIdPartition, Acknowledgements>> 
acknowledgementsMapAllNodes = new HashMap<>();
+
+        acknowledgementsMap.forEach((tip, nodeAcks) -> {
+            if (!isLeaderKnownToHaveChanged(nodeAcks.nodeId(), tip)) {
+                Map<TopicIdPartition, Acknowledgements> acksMap = 
acknowledgementsMapAllNodes.computeIfAbsent(nodeAcks.nodeId(), k -> new 
HashMap<>());
+                Acknowledgements prevAcks = acksMap.putIfAbsent(tip, 
nodeAcks.acknowledgements());
+                if (prevAcks != null) {
+                    acksMap.get(tip).merge(nodeAcks.acknowledgements());
+                }
+            } else {
+                
nodeAcks.acknowledgements().complete(Errors.NOT_LEADER_OR_FOLLOWER.exception());
+                
maybeSendShareAcknowledgeCommitCallbackEvent(Collections.singletonMap(tip, 
nodeAcks.acknowledgements()));
+            }
+        });
 
         sessionHandlers.forEach((nodeId, sessionHandler) -> {
             Node node = cluster.nodeById(nodeId);
             if (node != null) {
-                Map<TopicIdPartition, Acknowledgements> 
acknowledgementsMapForNode = new HashMap<>();
-
-                acknowledgementsMap.forEach((tip, nodeAcks) -> {
-                    Acknowledgements acknowledgements = 
Acknowledgements.empty();
-                    Map<TopicIdPartition, Acknowledgements> 
nodeAcksFromFetchMap = fetchAcknowledgementsToSend.get(nodeId);
-                    if (nodeAcksFromFetchMap != null) {
-                        Acknowledgements acksFromFetchMap = 
nodeAcksFromFetchMap.remove(tip);
-                        if (acksFromFetchMap != null) {
-                            acknowledgements.merge(acksFromFetchMap);
+                //Add any waiting piggyback acknowledgements for the node.
+                Map<TopicIdPartition, Acknowledgements> fetchAcks = 
fetchAcknowledgementsToSend.remove(nodeId);
+                if (fetchAcks != null) {
+                    fetchAcks.forEach((tip, acks) -> {
+                        if (!isLeaderKnownToHaveChanged(nodeId, tip)) {
+                            Map<TopicIdPartition, Acknowledgements> acksMap = 
acknowledgementsMapAllNodes.computeIfAbsent(nodeId, k -> new HashMap<>());
+                            Acknowledgements prevAcks = 
acksMap.putIfAbsent(tip, acks);
+                            if (prevAcks != null) {
+                                acksMap.get(tip).merge(acks);
+                            }
+                        } else {
+                            
acks.complete(Errors.NOT_LEADER_OR_FOLLOWER.exception());
+                            
maybeSendShareAcknowledgeCommitCallbackEvent(Collections.singletonMap(tip, 
acks));
                         }
-                    }
-
-                    if (nodeAcks.nodeId() == node.id()) {
-                        acknowledgements.merge(nodeAcks.acknowledgements());
-                    }
-
-                    if (!acknowledgements.isEmpty()) {
-                        acknowledgementsMapForNode.put(tip, acknowledgements);
+                    });
+                }
 
+                Map<TopicIdPartition, Acknowledgements> 
acknowledgementsMapForNode = acknowledgementsMapAllNodes.get(nodeId);
+                if (acknowledgementsMapForNode != null) {
+                    acknowledgementsMapForNode.forEach((tip, acknowledgements) 
-> {
                         
metricsManager.recordAcknowledgementSent(acknowledgements.size());
                         log.debug("Added closing acknowledge request for 
partition {} to node {}", tip.topicPartition(), node.id());
                         resultCount.incrementAndGet();
-                    }
-                });
+                    });
+                } else {
+                    acknowledgementsMapForNode = new HashMap<>();
+                }
 
                 acknowledgeRequestStates.putIfAbsent(nodeId, new Tuple<>(null, 
null, null));
 
                 // Ensure there is no close() request already present as they 
are blocking calls
                 // and only one request can be active at a time.
-                if (acknowledgeRequestStates.get(nodeId).getCloseRequest() != 
null && !acknowledgeRequestStates.get(nodeId).getCloseRequest().isEmpty()) {
+                if (acknowledgeRequestStates.get(nodeId).getCloseRequest() != 
null && 
isRequestStateInProgress(acknowledgeRequestStates.get(nodeId).getCloseRequest()))
 {
                     log.error("Attempt to call close() when there is an 
existing close request for node {}-{}", node.id(), 
acknowledgeRequestStates.get(nodeId).getSyncRequestQueue());
                     closeFuture.completeExceptionally(
                             new IllegalStateException("Attempt to call close() 
when there is an existing close request for node : " + node.id()));
@@ -630,6 +663,28 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
         return closeFuture;
     }
 
+    /**
+     * The method checks whether the leader for a topicIdPartition has changed.
+     * @param nodeId The previous leader for the partition.
+     * @param topicIdPartition The TopicIdPartition to check.
+     * @return Returns true if leader information is available and leader has 
changed.
+     * If the leader information is not available or if the leader has not 
changed, it returns false.
+     */
+    private boolean isLeaderKnownToHaveChanged(int nodeId, TopicIdPartition 
topicIdPartition) {
+        Optional<Node> leaderNode = 
metadata.currentLeader(topicIdPartition.topicPartition()).leader;
+        if (leaderNode.isPresent()) {
+            if (leaderNode.get().id() != nodeId) {
+                log.debug("Node {} is no longer the leader for partition {}, 
failing acknowledgements", nodeId, topicIdPartition);
+                return true;
+            }
+        } else {
+            log.debug("No leader found for partition {}", topicIdPartition);
+            metadata.requestUpdate(false);
+            return false;
+        }
+        return false;
+    }
+
     private void handleShareFetchSuccess(Node fetchTarget,
                                          @SuppressWarnings("unused") 
ShareFetchRequestData requestData,
                                          ClientResponse resp) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
index ede3f5415fc..999e7bbe0cc 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
@@ -848,18 +848,97 @@ public class ShareConsumeRequestManagerTest {
         acknowledgements.add(2L, AcknowledgeType.ACCEPT);
         acknowledgements.add(3L, AcknowledgeType.REJECT);
 
-        assignFromSubscribed(singleton(tp1));
+        subscriptions.subscribeToShareGroup(Collections.singleton(topicName2));
+        subscriptions.assignFromSubscribed(Collections.singleton(t2p0));
+
+        client.updateMetadata(
+                RequestTestUtils.metadataUpdateWithIds(1, 
singletonMap(topicName2, 1),
+                        tp -> validLeaderEpoch, topicIds, false));
 
         shareConsumeRequestManager.commitAsync(Collections.singletonMap(tip0, 
new NodeAcknowledgements(0, acknowledgements)));
 
         assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
 
-        client.prepareResponse(fullAcknowledgeResponse(tip1, Errors.NONE));
+        client.prepareResponse(fullAcknowledgeResponse(t2ip0, Errors.NONE));
+        networkClientDelegate.poll(time.timer(0));
+
+        // We should send a fetch to the newly subscribed partition.
+        assertEquals(1, sendFetches());
+
+    }
+
+    @Test
+    public void testCommitSyncWithSubscriptionChange() {
+        buildRequestManager();
+
+        assignFromSubscribed(singleton(tp0));
+
+        assertEquals(1, sendFetches());
+        assertFalse(shareConsumeRequestManager.hasCompletedFetches());
+
+        client.prepareResponse(fullFetchResponse(tip0, records, 
acquiredRecords, Errors.NONE));
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(shareConsumeRequestManager.hasCompletedFetches());
+
+        Acknowledgements acknowledgements = Acknowledgements.empty();
+        acknowledgements.add(1L, AcknowledgeType.ACCEPT);
+        acknowledgements.add(2L, AcknowledgeType.ACCEPT);
+        acknowledgements.add(3L, AcknowledgeType.REJECT);
+
+        subscriptions.subscribeToShareGroup(Collections.singleton(topicName2));
+        subscriptions.assignFromSubscribed(Collections.singleton(t2p0));
+
+        client.updateMetadata(
+                RequestTestUtils.metadataUpdateWithIds(1, 
singletonMap(topicName2, 1),
+                        tp -> validLeaderEpoch, topicIds, false));
+
+        shareConsumeRequestManager.commitSync(Collections.singletonMap(tip0, 
new NodeAcknowledgements(0, acknowledgements)),
+                calculateDeadlineMs(time.timer(100)));
+
+        assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
+
+        client.prepareResponse(fullAcknowledgeResponse(t2ip0, Errors.NONE));
         networkClientDelegate.poll(time.timer(0));
 
         // We should send a fetch to the newly subscribed partition.
         assertEquals(1, sendFetches());
+    }
+
+    @Test
+    public void testCloseWithSubscriptionChange() {
+        buildRequestManager();
+
+        assignFromSubscribed(singleton(tp0));
+
+        assertEquals(1, sendFetches());
+        assertFalse(shareConsumeRequestManager.hasCompletedFetches());
+
+        client.prepareResponse(fullFetchResponse(tip0, records, 
acquiredRecords, Errors.NONE));
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(shareConsumeRequestManager.hasCompletedFetches());
 
+        Acknowledgements acknowledgements = Acknowledgements.empty();
+        acknowledgements.add(1L, AcknowledgeType.ACCEPT);
+        acknowledgements.add(2L, AcknowledgeType.ACCEPT);
+        acknowledgements.add(3L, AcknowledgeType.REJECT);
+
+        subscriptions.subscribeToShareGroup(Collections.singleton(topicName2));
+        subscriptions.assignFromSubscribed(Collections.singleton(t2p0));
+
+        client.updateMetadata(
+                RequestTestUtils.metadataUpdateWithIds(1, 
singletonMap(topicName2, 1),
+                        tp -> validLeaderEpoch, topicIds, false));
+
+        
shareConsumeRequestManager.acknowledgeOnClose(Collections.singletonMap(tip0, 
new NodeAcknowledgements(0, acknowledgements)),
+                calculateDeadlineMs(time.timer(100)));
+
+        assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
+
+        client.prepareResponse(fullAcknowledgeResponse(t2ip0, Errors.NONE));
+        networkClientDelegate.poll(time.timer(0));
+
+        // As we are closing, we would not send any more fetches.
+        assertEquals(0, sendFetches());
     }
 
     @Test
@@ -884,7 +963,12 @@ public class ShareConsumeRequestManagerTest {
         shareConsumeRequestManager.fetch(Collections.singletonMap(tip0, new 
NodeAcknowledgements(0, acknowledgements)));
         fetchRecords();
         // Subscription changes.
-        assignFromSubscribed(singleton(tp1));
+        subscriptions.subscribeToShareGroup(Collections.singleton(topicName2));
+        subscriptions.assignFromSubscribed(Collections.singleton(t2p0));
+
+        client.updateMetadata(
+                RequestTestUtils.metadataUpdateWithIds(1, 
singletonMap(topicName2, 1),
+                        tp -> validLeaderEpoch, topicIds, false));
 
         assertEquals(1, sendFetches());
         assertFalse(shareConsumeRequestManager.hasCompletedFetches());
@@ -1738,6 +1822,7 @@ public class ShareConsumeRequestManagerTest {
     @EnumSource(value = Errors.class, names = {"FENCED_LEADER_EPOCH", 
"NOT_LEADER_OR_FOLLOWER"})
     public void testWhenLeadershipChangeBetweenShareFetchRequests(Errors 
error) {
         buildRequestManager();
+        
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
 
         subscriptions.subscribeToShareGroup(Collections.singleton(topicName));
         Set<TopicPartition> partitions = new HashSet<>();
@@ -1796,9 +1881,11 @@ public class ShareConsumeRequestManagerTest {
         assertNotEquals(startingClusterMetadata, metadata.fetch());
 
         // Even though the partitions are on the same leader, records were 
fetched on the previous leader.
-        // A fetch is sent to the previous leader to remove the partition from 
the share session and get the acknowledge error code.
-        assertEquals(2, sendFetches());
+        // We do not send those acknowledgements to the previous leader, we 
fail them with NOT_LEADER_OR_FOLLOWER exception.
+        assertEquals(1, sendFetches());
         assertFalse(shareConsumeRequestManager.hasCompletedFetches());
+        assertEquals(acknowledgements, 
completedAcknowledgements.get(0).get(tip0));
+        assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.exception(), 
completedAcknowledgements.get(0).get(tip0).getAcknowledgeException());
 
         partitionData.clear();
         partitionData.put(tip0,
@@ -1835,6 +1922,280 @@ public class ShareConsumeRequestManagerTest {
         assertEquals(1, fetchedRecords.size());
     }
 
+    @Test
+    void testLeadershipChangeAfterFetchBeforeCommitAsync() {
+        buildRequestManager();
+        
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
+
+        subscriptions.subscribeToShareGroup(Collections.singleton(topicName));
+        Set<TopicPartition> partitions = new HashSet<>();
+        partitions.add(tp0);
+        partitions.add(tp1);
+        subscriptions.assignFromSubscribed(partitions);
+
+        client.updateMetadata(
+                RequestTestUtils.metadataUpdateWithIds(2, 
singletonMap(topicName, 2),
+                        tp -> validLeaderEpoch, topicIds, false));
+        Node nodeId0 = metadata.fetch().nodeById(0);
+        Node nodeId1 = metadata.fetch().nodeById(1);
+
+        Cluster startingClusterMetadata = metadata.fetch();
+        assertFalse(metadata.updateRequested());
+
+        assertEquals(2, sendFetches());
+        assertFalse(shareConsumeRequestManager.hasCompletedFetches());
+
+        LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> 
partitionData = new LinkedHashMap<>();
+        partitionData.put(tip0,
+                new ShareFetchResponseData.PartitionData()
+                        .setPartitionIndex(tip0.topicPartition().partition())
+                        .setErrorCode(Errors.NONE.code())
+                        .setRecords(records)
+                        
.setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1))
+                        .setAcknowledgeErrorCode(Errors.NONE.code()));
+        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, Collections.emptyList()), nodeId0);
+        partitionData.clear();
+        partitionData.put(tip1,
+                new ShareFetchResponseData.PartitionData()
+                        .setPartitionIndex(tip1.topicPartition().partition())
+                        .setErrorCode(Errors.NONE.code())
+                        .setRecords(records)
+                        
.setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 2))
+                        .setAcknowledgeErrorCode(Errors.NONE.code()));
+        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, Collections.emptyList()), nodeId1);
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(shareConsumeRequestManager.hasCompletedFetches());
+
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> 
partitionRecords = fetchRecords();
+        assertTrue(partitionRecords.containsKey(tp0));
+        assertTrue(partitionRecords.containsKey(tp1));
+
+        List<ConsumerRecord<byte[], byte[]>> fetchedRecords = 
partitionRecords.get(tp0);
+        assertEquals(1, fetchedRecords.size());
+
+        fetchedRecords = partitionRecords.get(tp1);
+        assertEquals(2, fetchedRecords.size());
+
+        Acknowledgements acknowledgementsTp0 = Acknowledgements.empty();
+        acknowledgementsTp0.add(1L, AcknowledgeType.ACCEPT);
+
+        Acknowledgements acknowledgementsTp1 = Acknowledgements.empty();
+        acknowledgementsTp1.add(1L, AcknowledgeType.ACCEPT);
+        acknowledgementsTp1.add(2L, AcknowledgeType.ACCEPT);
+
+        Map<TopicIdPartition, NodeAcknowledgements> commitAcks = new 
HashMap<>();
+        commitAcks.put(tip0, new NodeAcknowledgements(0, acknowledgementsTp0));
+        commitAcks.put(tip1, new NodeAcknowledgements(1, acknowledgementsTp1));
+
+        // Move the leadership of tp0 onto node 1
+        HashMap<TopicPartition, Metadata.LeaderIdAndEpoch> partitionLeaders = 
new HashMap<>();
+        partitionLeaders.put(tp0, new 
Metadata.LeaderIdAndEpoch(Optional.of(nodeId1.id()), 
Optional.of(validLeaderEpoch + 1)));
+        metadata.updatePartitionLeadership(partitionLeaders, List.of());
+
+        assertNotEquals(startingClusterMetadata, metadata.fetch());
+
+        // We fail the acknowledgements for records which were received from 
node0 with NOT_LEADER_OR_FOLLOWER exception.
+        shareConsumeRequestManager.commitAsync(commitAcks);
+        assertEquals(1, completedAcknowledgements.get(0).size());
+        assertEquals(acknowledgementsTp0, 
completedAcknowledgements.get(0).get(tip0));
+        assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.exception(), 
completedAcknowledgements.get(0).get(tip0).getAcknowledgeException());
+
+        // We only send acknowledgements for tip1 to node1.
+        assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
+
+        client.prepareResponse(fullAcknowledgeResponse(tip1, Errors.NONE));
+        networkClientDelegate.poll(time.timer(0));
+
+        assertEquals(1, completedAcknowledgements.get(1).size());
+        assertEquals(acknowledgementsTp1, 
completedAcknowledgements.get(1).get(tip1));
+        
assertNull(completedAcknowledgements.get(1).get(tip1).getAcknowledgeException());
+
+    }
+
+    @Test
+    void testLeadershipChangeAfterFetchBeforeCommitSync() {
+        buildRequestManager();
+        
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
+
+        subscriptions.subscribeToShareGroup(Collections.singleton(topicName));
+        Set<TopicPartition> partitions = new HashSet<>();
+        partitions.add(tp0);
+        partitions.add(tp1);
+        subscriptions.assignFromSubscribed(partitions);
+
+        client.updateMetadata(
+                RequestTestUtils.metadataUpdateWithIds(2, 
singletonMap(topicName, 2),
+                        tp -> validLeaderEpoch, topicIds, false));
+        Node nodeId0 = metadata.fetch().nodeById(0);
+        Node nodeId1 = metadata.fetch().nodeById(1);
+
+        Cluster startingClusterMetadata = metadata.fetch();
+        assertFalse(metadata.updateRequested());
+
+        assertEquals(2, sendFetches());
+        assertFalse(shareConsumeRequestManager.hasCompletedFetches());
+
+        LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> 
partitionData = new LinkedHashMap<>();
+        partitionData.put(tip0,
+                new ShareFetchResponseData.PartitionData()
+                        .setPartitionIndex(tip0.topicPartition().partition())
+                        .setErrorCode(Errors.NONE.code())
+                        .setRecords(records)
+                        
.setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1))
+                        .setAcknowledgeErrorCode(Errors.NONE.code()));
+        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, Collections.emptyList()), nodeId0);
+        partitionData.clear();
+        partitionData.put(tip1,
+                new ShareFetchResponseData.PartitionData()
+                        .setPartitionIndex(tip1.topicPartition().partition())
+                        .setErrorCode(Errors.NONE.code())
+                        .setRecords(records)
+                        
.setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 2))
+                        .setAcknowledgeErrorCode(Errors.NONE.code()));
+        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, Collections.emptyList()), nodeId1);
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(shareConsumeRequestManager.hasCompletedFetches());
+
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> 
partitionRecords = fetchRecords();
+        assertTrue(partitionRecords.containsKey(tp0));
+        assertTrue(partitionRecords.containsKey(tp1));
+
+        List<ConsumerRecord<byte[], byte[]>> fetchedRecords = 
partitionRecords.get(tp0);
+        assertEquals(1, fetchedRecords.size());
+
+        fetchedRecords = partitionRecords.get(tp1);
+        assertEquals(2, fetchedRecords.size());
+
+        Acknowledgements acknowledgementsTp0 = Acknowledgements.empty();
+        acknowledgementsTp0.add(1L, AcknowledgeType.ACCEPT);
+
+        Acknowledgements acknowledgementsTp1 = Acknowledgements.empty();
+        acknowledgementsTp1.add(1L, AcknowledgeType.ACCEPT);
+        acknowledgementsTp1.add(2L, AcknowledgeType.ACCEPT);
+
+        Map<TopicIdPartition, NodeAcknowledgements> commitAcks = new 
HashMap<>();
+        commitAcks.put(tip0, new NodeAcknowledgements(0, acknowledgementsTp0));
+        commitAcks.put(tip1, new NodeAcknowledgements(1, acknowledgementsTp1));
+
+        // Move the leadership of tp0 onto node 1
+        HashMap<TopicPartition, Metadata.LeaderIdAndEpoch> partitionLeaders = 
new HashMap<>();
+        partitionLeaders.put(tp0, new 
Metadata.LeaderIdAndEpoch(Optional.of(nodeId1.id()), 
Optional.of(validLeaderEpoch + 1)));
+        metadata.updatePartitionLeadership(partitionLeaders, List.of());
+
+        assertNotEquals(startingClusterMetadata, metadata.fetch());
+
+        // We fail the acknowledgements for records which were received from 
node0 with NOT_LEADER_OR_FOLLOWER exception.
+        shareConsumeRequestManager.commitSync(commitAcks, 
calculateDeadlineMs(time.timer(100)));
+
+        // Verify if the callback was invoked with the failed acknowledgements.
+        assertEquals(1, completedAcknowledgements.get(0).size());
+        assertEquals(acknowledgementsTp0, 
completedAcknowledgements.get(0).get(tip0));
+        assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.exception(), 
completedAcknowledgements.get(0).get(tip0).getAcknowledgeException());
+
+        // We only send acknowledgements for tip1 to node1.
+        assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
+
+        client.prepareResponse(fullAcknowledgeResponse(tip1, Errors.NONE));
+        networkClientDelegate.poll(time.timer(0));
+
+        assertEquals(1, completedAcknowledgements.get(1).size());
+        assertEquals(acknowledgementsTp1, 
completedAcknowledgements.get(1).get(tip1));
+        
assertNull(completedAcknowledgements.get(1).get(tip1).getAcknowledgeException());
+    }
+
+    @Test // Scenario: leadership of tp0 moves from node 0 to node 1 after the fetch but before acknowledgeOnClose.
+    void testLeadershipChangeAfterFetchBeforeClose() { // Expectation: tp0's acknowledgements fail with NOT_LEADER_OR_FOLLOWER, tp1's complete without error.
+        buildRequestManager();
+        
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true); // NOTE(review): presumably needed for completedAcknowledgements to be populated - confirm
+
+        subscriptions.subscribeToShareGroup(Collections.singleton(topicName));
+        Set<TopicPartition> partitions = new HashSet<>();
+        partitions.add(tp0);
+        partitions.add(tp1);
+        subscriptions.assignFromSubscribed(partitions);
+
+        client.updateMetadata(
+                RequestTestUtils.metadataUpdateWithIds(2, 
singletonMap(topicName, 2),
+                        tp -> validLeaderEpoch, topicIds, false)); // presumably 2 nodes and 2 partitions of topicName, all at validLeaderEpoch - confirm against RequestTestUtils
+        Node nodeId0 = metadata.fetch().nodeById(0);
+        Node nodeId1 = metadata.fetch().nodeById(1);
+
+        Cluster startingClusterMetadata = metadata.fetch(); // snapshot compared against the metadata after the leadership move
+        assertFalse(metadata.updateRequested());
+
+        assertEquals(2, sendFetches()); // presumably one request per leader node; responses are prepared below for nodeId0 and nodeId1
+        assertFalse(shareConsumeRequestManager.hasCompletedFetches());
+
+        LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> 
partitionData = new LinkedHashMap<>();
+        partitionData.put(tip0,
+                new ShareFetchResponseData.PartitionData()
+                        .setPartitionIndex(tip0.topicPartition().partition())
+                        .setErrorCode(Errors.NONE.code())
+                        .setRecords(records)
+                        
.setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1))
+                        .setAcknowledgeErrorCode(Errors.NONE.code()));
+        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, Collections.emptyList()), nodeId0);
+        partitionData.clear(); // the same map is reused to build the response prepared for nodeId1
+        partitionData.put(tip1,
+                new ShareFetchResponseData.PartitionData()
+                        .setPartitionIndex(tip1.topicPartition().partition())
+                        .setErrorCode(Errors.NONE.code())
+                        .setRecords(records)
+                        
.setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 2))
+                        .setAcknowledgeErrorCode(Errors.NONE.code()));
+        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, Collections.emptyList()), nodeId1);
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(shareConsumeRequestManager.hasCompletedFetches());
+
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> 
partitionRecords = fetchRecords();
+        assertTrue(partitionRecords.containsKey(tp0));
+        assertTrue(partitionRecords.containsKey(tp1));
+
+        List<ConsumerRecord<byte[], byte[]>> fetchedRecords = 
partitionRecords.get(tp0);
+        assertEquals(1, fetchedRecords.size());
+
+        fetchedRecords = partitionRecords.get(tp1);
+        assertEquals(2, fetchedRecords.size());
+
+        Acknowledgements acknowledgementsTp0 = Acknowledgements.empty();
+        acknowledgementsTp0.add(1L, AcknowledgeType.ACCEPT);
+
+        Acknowledgements acknowledgementsTp1 = Acknowledgements.empty();
+        acknowledgementsTp1.add(1L, AcknowledgeType.ACCEPT);
+        acknowledgementsTp1.add(2L, AcknowledgeType.ACCEPT);
+
+        shareConsumeRequestManager.fetch(Collections.singletonMap(tip1, new 
NodeAcknowledgements(1, acknowledgementsTp1))); // tp1's acknowledgements are handed over via fetch(), tagged with node id 1
+
+        // Move the leadership of tp0 onto node 1, with the leader epoch bumped by one
+        HashMap<TopicPartition, Metadata.LeaderIdAndEpoch> partitionLeaders = 
new HashMap<>();
+        partitionLeaders.put(tp0, new 
Metadata.LeaderIdAndEpoch(Optional.of(nodeId1.id()), 
Optional.of(validLeaderEpoch + 1)));
+        metadata.updatePartitionLeadership(partitionLeaders, List.of());
+
+        assertNotEquals(startingClusterMetadata, metadata.fetch()); // the leadership change must be visible in the fetched cluster metadata
+
+        // acknowledgeOnClose is expected to fail the acknowledgements for the tp0 records fetched from 
node0 with NOT_LEADER_OR_FOLLOWER exception.
+        
shareConsumeRequestManager.acknowledgeOnClose(Collections.singletonMap(tip0, 
new NodeAcknowledgements(0, acknowledgementsTp0)), 
calculateDeadlineMs(time.timer(100)));
+
+        // Verify that the callback was invoked with the failed acknowledgements (checked before any acknowledge request is sent).
+        assertEquals(1, completedAcknowledgements.get(0).size());
+        assertEquals(acknowledgementsTp0.getAcknowledgementsTypeMap(), 
completedAcknowledgements.get(0).get(tip0).getAcknowledgementsTypeMap());
+        assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.exception(), 
completedAcknowledgements.get(0).get(tip0).getAcknowledgeException());
+        completedAcknowledgements.clear(); // reset so that index 0 below refers to the next callback invocation
+
+        // As we are closing, we still send the request to both nodes, but 
with empty acknowledgements to node0, as it is no longer the leader.
+        assertEquals(2, shareConsumeRequestManager.sendAcknowledgements());
+
+        client.prepareResponseFrom(fullAcknowledgeResponse(tip1, Errors.NONE), 
nodeId1);
+        networkClientDelegate.poll(time.timer(0));
+
+        client.prepareResponseFrom(emptyAcknowledgeResponse(), nodeId0);
+        networkClientDelegate.poll(time.timer(0));
+
+        assertEquals(1, completedAcknowledgements.get(0).size()); // only tip1 is expected in this callback invocation
+        assertEquals(acknowledgementsTp1, 
completedAcknowledgements.get(0).get(tip1));
+        
assertNull(completedAcknowledgements.get(0).get(tip1).getAcknowledgeException());
+    }
+
+
     @Test
     void testWhenLeadershipChangedAfterDisconnected() {
         buildRequestManager();


Reply via email to