This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new bf760d4ebed KAFKA-18558: Added check before adding previously subscribed partitions (#18562)
bf760d4ebed is described below

commit bf760d4ebede1eb286ce0e978132b1d5d236d7ac
Author: ShivsundarR <s...@confluent.io>
AuthorDate: Thu Jan 16 08:17:48 2025 -0500

    KAFKA-18558: Added check before adding previously subscribed partitions (#18562)
    
    Reviewers: Andrew Schofield <aschofi...@confluent.io>
---
 .../internals/ShareConsumeRequestManager.java      |  16 +--
 .../internals/ShareConsumeRequestManagerTest.java  | 138 +++++++++++++++++++++
 2 files changed, 147 insertions(+), 7 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
index a83e971600e..20a022cb6ca 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
@@ -203,15 +203,17 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
                             if (acknowledgementsToSend != null) {
                                 metricsManager.recordAcknowledgementSent(acknowledgementsToSend.size());
                                 fetchAcknowledgementsInFlight.put(tip, acknowledgementsToSend);
-                            }
 
-                            sessionHandler.addPartitionToFetch(tip, acknowledgementsToSend);
-                            partitionsToForgetMap.putIfAbsent(node, new ArrayList<>());
-                            partitionsToForgetMap.get(node).add(tip);
+                                sessionHandler.addPartitionToFetch(tip, acknowledgementsToSend);
+                                handlerMap.put(node, sessionHandler);
+
+                                partitionsToForgetMap.putIfAbsent(node, new ArrayList<>());
+                                partitionsToForgetMap.get(node).add(tip);
 
-                            topicNamesMap.putIfAbsent(new IdAndPartition(tip.topicId(), tip.partition()), tip.topic());
-                            fetchedPartitions.add(tip);
-                            log.debug("Added fetch request for previously subscribed partition {} to node {}", tip, node.id());
+                                topicNamesMap.putIfAbsent(new IdAndPartition(tip.topicId(), tip.partition()), tip.topic());
+                                fetchedPartitions.add(tip);
+                                log.debug("Added fetch request for previously subscribed partition {} to node {}", tip, node.id());
+                            }
                         }
                     }
                 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
index 220483cf22d..0e74a9768a8 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
@@ -60,6 +60,7 @@ import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.requests.RequestHeader;
 import org.apache.kafka.common.requests.RequestTestUtils;
 import org.apache.kafka.common.requests.ShareAcknowledgeResponse;
+import org.apache.kafka.common.requests.ShareFetchRequest;
 import org.apache.kafka.common.requests.ShareFetchResponse;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.Deserializer;
@@ -849,6 +850,13 @@ public class ShareConsumeRequestManagerTest {
         shareConsumeRequestManager.commitAsync(Collections.singletonMap(tip0, acknowledgements));
 
         assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
+
+        client.prepareResponse(fullAcknowledgeResponse(tip1, Errors.NONE));
+        networkClientDelegate.poll(time.timer(0));
+
+        // We should send a fetch to the newly subscribed partition.
+        assertEquals(1, sendFetches());
+
     }
 
     @Test
@@ -881,6 +889,129 @@ public class ShareConsumeRequestManagerTest {
                 
metrics.metrics().get(metrics.metricInstance(shareFetchMetricsRegistry.acknowledgementSendTotal)).metricValue());
     }
 
+    @Test
+    public void testShareFetchWithSubscriptionChangeMultipleNodes() {
+        buildRequestManager();
+
+        subscriptions.subscribeToShareGroup(Collections.singleton(topicName));
+        Set<TopicPartition> partitions = new HashSet<>();
+        partitions.add(tp0);
+        partitions.add(tp1);
+        subscriptions.assignFromSubscribed(Collections.singletonList(tp0));
+
+        client.updateMetadata(
+                RequestTestUtils.metadataUpdateWithIds(2, singletonMap(topicName, 2),
+                        tp -> validLeaderEpoch, topicIds, false));
+        Node nodeId0 = metadata.fetch().nodeById(0);
+        Node nodeId1 = metadata.fetch().nodeById(1);
+        Node tp0Leader = metadata.fetch().leaderFor(tp0);
+        Node tp1Leader = metadata.fetch().leaderFor(tp1);
+
+        assertEquals(nodeId0, tp0Leader);
+        assertEquals(nodeId1, tp1Leader);
+
+        assertEquals(1, sendFetches());
+        assertFalse(shareConsumeRequestManager.hasCompletedFetches());
+
+        client.prepareResponse(fullFetchResponse(tip0, records, emptyAcquiredRecords, Errors.NONE));
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(shareConsumeRequestManager.hasCompletedFetches());
+
+        Acknowledgements acknowledgements = Acknowledgements.empty();
+        acknowledgements.add(0L, AcknowledgeType.ACCEPT);
+        acknowledgements.add(1L, AcknowledgeType.RELEASE);
+        acknowledgements.add(2L, AcknowledgeType.ACCEPT);
+
+        // Send acknowledgements via ShareFetch
+        shareConsumeRequestManager.fetch(Collections.singletonMap(tip0, acknowledgements));
+        fetchRecords();
+        // Subscription changes.
+        subscriptions.assignFromSubscribed(Collections.singletonList(tp1));
+
+        NetworkClientDelegate.PollResult pollResult = shareConsumeRequestManager.sendFetchesReturnPollResult();
+        assertEquals(2, pollResult.unsentRequests.size());
+
+        ShareFetchRequest.Builder builder1, builder2;
+        if (pollResult.unsentRequests.get(0).node().get() == nodeId0) {
+            builder1 = (ShareFetchRequest.Builder) pollResult.unsentRequests.get(0).requestBuilder();
+            builder2 = (ShareFetchRequest.Builder) pollResult.unsentRequests.get(1).requestBuilder();
+            assertEquals(nodeId1, pollResult.unsentRequests.get(1).node().get());
+        } else {
+            builder1 = (ShareFetchRequest.Builder) pollResult.unsentRequests.get(1).requestBuilder();
+            builder2 = (ShareFetchRequest.Builder) pollResult.unsentRequests.get(0).requestBuilder();
+            assertEquals(nodeId0, pollResult.unsentRequests.get(1).node().get());
+            assertEquals(nodeId1, pollResult.unsentRequests.get(0).node().get());
+        }
+
+        // Verify the builder data for node0.
+        assertEquals(1, builder1.data().topics().size());
+        assertEquals(tip0.topicId(), builder1.data().topics().get(0).topicId());
+        assertEquals(1, builder1.data().topics().get(0).partitions().size());
+        assertEquals(0, builder1.data().topics().get(0).partitions().get(0).partitionIndex());
+        assertEquals(1, builder1.data().topics().get(0).partitions().get(0).acknowledgementBatches().size());
+        assertEquals(0L, builder1.data().topics().get(0).partitions().get(0).acknowledgementBatches().get(0).firstOffset());
+        assertEquals(2L, builder1.data().topics().get(0).partitions().get(0).acknowledgementBatches().get(0).lastOffset());
+
+        assertEquals(1, builder1.data().forgottenTopicsData().size());
+        assertEquals(tip0.topicId(), builder1.data().forgottenTopicsData().get(0).topicId());
+        assertEquals(1, builder1.data().forgottenTopicsData().get(0).partitions().size());
+        assertEquals(0, builder1.data().forgottenTopicsData().get(0).partitions().get(0));
+
+        // Verify the builder data for node1.
+        assertEquals(1, builder2.data().topics().size());
+        assertEquals(tip1.topicId(), builder2.data().topics().get(0).topicId());
+        assertEquals(1, builder2.data().topics().get(0).partitions().size());
+        assertEquals(1, builder2.data().topics().get(0).partitions().get(0).partitionIndex());
+    }
+
+    @Test
+    public void testShareFetchWithSubscriptionChangeMultipleNodesEmptyAcknowledgements() {
+        buildRequestManager();
+
+        subscriptions.subscribeToShareGroup(Collections.singleton(topicName));
+        subscriptions.assignFromSubscribed(Collections.singletonList(tp0));
+
+        client.updateMetadata(
+                RequestTestUtils.metadataUpdateWithIds(2, singletonMap(topicName, 2),
+                        tp -> validLeaderEpoch, topicIds, false));
+        Node nodeId0 = metadata.fetch().nodeById(0);
+        Node nodeId1 = metadata.fetch().nodeById(1);
+        Node tp0Leader = metadata.fetch().leaderFor(tp0);
+        Node tp1Leader = metadata.fetch().leaderFor(tp1);
+
+        assertEquals(nodeId0, tp0Leader);
+        assertEquals(nodeId1, tp1Leader);
+
+        // Send the first ShareFetch
+        assertEquals(1, sendFetches());
+        assertFalse(shareConsumeRequestManager.hasCompletedFetches());
+
+        // Prepare an empty response
+        client.prepareResponse(fullFetchResponse(tip0, records, emptyAcquiredRecords, Errors.NONE));
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(shareConsumeRequestManager.hasCompletedFetches());
+
+        fetchRecords();
+
+        // Change the subscription.
+        subscriptions.assignFromSubscribed(Collections.singletonList(tp1));
+
+
+        // Now we will be sending the request to node1 only as leader for tip1 is node1.
+        // We do not build the request for tip0 as there are no acknowledgements to send.
+        NetworkClientDelegate.PollResult pollResult = shareConsumeRequestManager.sendFetchesReturnPollResult();
+        assertEquals(1, pollResult.unsentRequests.size());
+        assertEquals(nodeId1, pollResult.unsentRequests.get(0).node().get());
+
+        ShareFetchRequest.Builder builder = (ShareFetchRequest.Builder) pollResult.unsentRequests.get(0).requestBuilder();
+
+        assertEquals(1, builder.data().topics().size());
+        assertEquals(tip1.topicId(), builder.data().topics().get(0).topicId());
+        assertEquals(1, builder.data().topics().get(0).partitions().size());
+        assertEquals(1, builder.data().topics().get(0).partitions().get(0).partitionIndex());
+        assertEquals(0, builder.data().forgottenTopicsData().size());
+    }
+
     @Test
     public void testRetryAcknowledgementsWithLeaderChange() {
         buildRequestManager();
@@ -1809,6 +1940,13 @@ public class ShareConsumeRequestManagerTest {
             return pollResult.unsentRequests.size();
         }
 
+        private NetworkClientDelegate.PollResult sendFetchesReturnPollResult() {
+            fetch(new HashMap<>());
+            NetworkClientDelegate.PollResult pollResult = poll(time.milliseconds());
+            networkClientDelegate.addAll(pollResult.unsentRequests);
+            return pollResult;
+        }
+
         private int sendAcknowledgements() {
             NetworkClientDelegate.PollResult pollResult = poll(time.milliseconds());
             networkClientDelegate.addAll(pollResult.unsentRequests);

Reply via email to