This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new fd4d58899bc KAFKA-17819 : Handle piggyback acknowledgements when 
subscription changes in ShareConsumeRequestManager. (#17537)
fd4d58899bc is described below

commit fd4d58899bcc5c4ac69c5d3c309bc05ed7bf4e20
Author: ShivsundarR <[email protected]>
AuthorDate: Tue Nov 5 02:36:11 2024 -0500

    KAFKA-17819 : Handle piggyback acknowledgements when subscription changes 
in ShareConsumeRequestManager. (#17537)
    
    Currently in ShareConsumeRequestManager, after we receive a 
ShareFetchResponse, if the subscription changes before we acknowledge (via 
ShareFetch), then we do not acknowledge the records that are no longer part of 
the updated subscription. Instead, we must acknowledge all the records that we 
had received, irrespective of the current subscription.
    This bug occurs only when we acknowledge via ShareFetch, where we use 
SubscriptionState::fetchablePartitions to obtain the partitions to fetch. In 
ShareAcknowledge, since we obtain the partitions from the active share 
sessions, the session remains active even if the subscription has changed.
    
    
    Reviewers:  Andrew Schofield <[email protected]>,  Manikumar Reddy 
<[email protected]>
---
 .../internals/ShareConsumeRequestManager.java      | 84 ++++++++++++++++++-
 .../kafka/common/requests/ShareFetchRequest.java   | 31 ++++---
 .../internals/ShareConsumeRequestManagerTest.java  | 94 ++++++++++++++++++++++
 .../java/kafka/test/api/ShareConsumerTest.java     | 40 +++++++++
 4 files changed, 233 insertions(+), 16 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
index a92da18c0b5..cfaad3667fa 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
@@ -55,6 +55,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Queue;
 import java.util.Set;
@@ -93,6 +94,7 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
     private boolean closing = false;
     private final CompletableFuture<Void> closeFuture;
     private boolean isAcknowledgementCommitCallbackRegistered = false;
+    private final Map<IdAndPartition, String> forgottenTopicNames = new 
HashMap<>();
 
     ShareConsumeRequestManager(final Time time,
                                final LogContext logContext,
@@ -142,6 +144,7 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
 
         Map<Node, ShareSessionHandler> handlerMap = new HashMap<>();
         Map<String, Uuid> topicIds = metadata.topicIds();
+        Set<TopicIdPartition> fetchedPartitions = new HashSet<>();
         for (TopicPartition partition : partitionsToFetch()) {
             Optional<Node> leaderOpt = 
metadata.currentLeader(partition).leader;
 
@@ -172,14 +175,55 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
                     
metricsManager.recordAcknowledgementSent(acknowledgementsToSend.size());
                 }
                 handler.addPartitionToFetch(tip, acknowledgementsToSend);
+                fetchedPartitions.add(tip);
 
-                log.debug("Added fetch request for partition {} to node {}", 
partition, node.id());
+                log.debug("Added fetch request for partition {} to node {}", 
tip, node.id());
             }
         }
 
+        // Map storing the list of partitions to forget in the upcoming 
request.
+        Map<Node, List<TopicIdPartition>> partitionsToForgetMap = new 
HashMap<>();
+        Cluster cluster = metadata.fetch();
+        // Iterating over the session handlers to see if there are 
acknowledgements to be sent for partitions
+        // which are no longer part of the current subscription.
+        sessionHandlers.forEach((nodeId, sessionHandler) -> {
+            Node node = cluster.nodeById(nodeId);
+            if (node != null) {
+                if (nodesWithPendingRequests.contains(node.id())) {
+                    log.trace("Skipping fetch because previous fetch request 
to {} has not been processed", node.id());
+                } else {
+                    for (TopicIdPartition tip : 
sessionHandler.sessionPartitions()) {
+                        if (!fetchedPartitions.contains(tip)) {
+                            Acknowledgements acknowledgementsToSend = 
fetchAcknowledgementsMap.get(tip);
+                            if (acknowledgementsToSend != null) {
+                                
metricsManager.recordAcknowledgementSent(acknowledgementsToSend.size());
+                            }
+                            sessionHandler.addPartitionToFetch(tip, 
acknowledgementsToSend);
+                            partitionsToForgetMap.putIfAbsent(node, new 
ArrayList<>());
+                            partitionsToForgetMap.get(node).add(tip);
+
+                            forgottenTopicNames.putIfAbsent(new 
IdAndPartition(tip.topicId(), tip.partition()), tip.topic());
+                            fetchedPartitions.add(tip);
+                            log.debug("Added fetch request for previously 
subscribed partition {} to node {}", tip, node.id());
+                        }
+                    }
+                }
+            }
+        });
+
         Map<Node, ShareFetchRequest.Builder> builderMap = new 
LinkedHashMap<>();
         for (Map.Entry<Node, ShareSessionHandler> entry : 
handlerMap.entrySet()) {
-            builderMap.put(entry.getKey(), 
entry.getValue().newShareFetchBuilder(groupId, fetchConfig));
+            ShareFetchRequest.Builder builder = 
entry.getValue().newShareFetchBuilder(groupId, fetchConfig);
+            Node node = entry.getKey();
+
+            if (partitionsToForgetMap.containsKey(node)) {
+                if (builder.data().forgottenTopicsData() == null) {
+                    builder.data().setForgottenTopicsData(new ArrayList<>());
+                }
+                builder.updateForgottenData(partitionsToForgetMap.get(node));
+            }
+
+            builderMap.put(node, builder);
         }
 
         List<UnsentRequest> requests = 
builderMap.entrySet().stream().map(entry -> {
@@ -594,7 +638,9 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
                     topicResponse.partitions().forEach(partition ->
                             responseData.put(new 
TopicIdPartition(topicResponse.topicId(),
                                     partition.partitionIndex(),
-                                    
metadata.topicNames().get(topicResponse.topicId())), partition)));
+                                    
metadata.topicNames().getOrDefault(topicResponse.topicId(),
+                                            forgottenTopicNames.remove(new 
IdAndPartition(topicResponse.topicId(), partition.partitionIndex())))), 
partition))
+            );
 
             final Set<TopicPartition> partitions = 
responseData.keySet().stream().map(TopicIdPartition::topicPartition).collect(Collectors.toSet());
             final ShareFetchMetricsAggregator shareFetchMetricsAggregator = 
new ShareFetchMetricsAggregator(metricsManager, partitions);
@@ -1173,6 +1219,38 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
         return acknowledgeRequestStates.get(nodeId);
     }
 
+    static class IdAndPartition {
+        private final Uuid topicId;
+        private final int partitionIndex;
+
+        IdAndPartition(Uuid topicId, int partitionIndex) {
+            this.topicId = topicId;
+            this.partitionIndex = partitionIndex;
+        }
+
+        int getPartitionIndex() {
+            return partitionIndex;
+        }
+
+        Uuid getTopicId() {
+            return topicId;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(topicId, partitionIndex);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            IdAndPartition that = (IdAndPartition) o;
+            return Objects.equals(topicId, that.topicId) &&
+                    partitionIndex == that.partitionIndex;
+        }
+    }
+
     public enum AcknowledgeRequestType {
         COMMIT_ASYNC((byte) 0),
         COMMIT_SYNC((byte) 1),
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java 
b/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java
index 36586f74259..7ed14b4bdb1 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java
@@ -109,24 +109,29 @@ public class ShareFetchRequest extends AbstractRequest {
                 });
             }
 
+            Builder builder = new Builder(data, true);
             // And finally, forget the topic-partitions that are no longer in 
the session
             if (!forget.isEmpty()) {
-                Map<Uuid, List<Integer>> forgetMap = new HashMap<>();
-                for (TopicIdPartition tip : forget) {
-                    List<Integer> partList = 
forgetMap.computeIfAbsent(tip.topicId(), k -> new ArrayList<>());
-                    partList.add(tip.partition());
-                }
                 data.setForgottenTopicsData(new ArrayList<>());
-                forgetMap.forEach((topicId, partList) -> {
-                    ShareFetchRequestData.ForgottenTopic forgetTopic = new 
ShareFetchRequestData.ForgottenTopic()
-                            .setTopicId(topicId)
-                            .setPartitions(new ArrayList<>());
-                    partList.forEach(index -> 
forgetTopic.partitions().add(index));
-                    data.forgottenTopicsData().add(forgetTopic);
-                });
+                builder.updateForgottenData(forget);
             }
 
-            return new Builder(data, true);
+            return builder;
+        }
+
+        public void updateForgottenData(List<TopicIdPartition> forget) {
+            Map<Uuid, List<Integer>> forgetMap = new HashMap<>();
+            for (TopicIdPartition tip : forget) {
+                List<Integer> partList = 
forgetMap.computeIfAbsent(tip.topicId(), k -> new ArrayList<>());
+                partList.add(tip.partition());
+            }
+            forgetMap.forEach((topicId, partList) -> {
+                ShareFetchRequestData.ForgottenTopic forgetTopic = new 
ShareFetchRequestData.ForgottenTopic()
+                        .setTopicId(topicId)
+                        .setPartitions(new ArrayList<>());
+                partList.forEach(index -> forgetTopic.partitions().add(index));
+                data.forgottenTopicsData().add(forgetTopic);
+            });
         }
 
         public ShareFetchRequestData data() {
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
index 3c91c6cdcf9..6af9509d04b 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
@@ -451,6 +451,45 @@ public class ShareConsumeRequestManagerTest {
         completedAcknowledgements.clear();
     }
 
+    @Test
+    public void testAcknowledgeOnCloseWithPendingCommitSync() {
+        buildRequestManager();
+        // Enabling the config so that background event is sent when the 
acknowledgement response is received.
+        
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
+
+        assignFromSubscribed(Collections.singleton(tp0));
+
+        // normal fetch
+        assertEquals(1, sendFetches());
+        assertFalse(shareConsumeRequestManager.hasCompletedFetches());
+
+        client.prepareResponse(fullFetchResponse(tip0, records, 
acquiredRecords, Errors.NONE));
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(shareConsumeRequestManager.hasCompletedFetches());
+
+        Acknowledgements acknowledgements = Acknowledgements.empty();
+        acknowledgements.add(1L, AcknowledgeType.ACCEPT);
+        acknowledgements.add(2L, AcknowledgeType.ACCEPT);
+        acknowledgements.add(3L, AcknowledgeType.REJECT);
+
+        shareConsumeRequestManager.commitSync(Collections.singletonMap(tip0, 
acknowledgements),
+                calculateDeadlineMs(time.timer(100)));
+        shareConsumeRequestManager.acknowledgeOnClose(Collections.emptyMap(),
+                calculateDeadlineMs(time.timer(100)));
+
+        assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
+
+        client.prepareResponse(fullAcknowledgeResponse(tip0, Errors.NONE));
+        networkClientDelegate.poll(time.timer(0));
+
+        client.prepareResponse(emptyAcknowledgeResponse());
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(shareConsumeRequestManager.hasCompletedFetches());
+
+        assertEquals(Collections.singletonMap(tip0, acknowledgements), 
completedAcknowledgements.get(0));
+        completedAcknowledgements.clear();
+    }
+
     @Test
     public void testBatchingAcknowledgeRequestStates() {
         buildRequestManager();
@@ -598,6 +637,61 @@ public class ShareConsumeRequestManagerTest {
         assertEquals(0, 
shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().peek().getIncompleteAcknowledgementsCount(tip0));
     }
 
+    @Test
+    public void testCommitAsyncWithSubscriptionChange() {
+        buildRequestManager();
+
+        assignFromSubscribed(singleton(tp0));
+
+        assertEquals(1, sendFetches());
+        assertFalse(shareConsumeRequestManager.hasCompletedFetches());
+
+        client.prepareResponse(fullFetchResponse(tip0, records, 
acquiredRecords, Errors.NONE));
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(shareConsumeRequestManager.hasCompletedFetches());
+
+        Acknowledgements acknowledgements = Acknowledgements.empty();
+        acknowledgements.add(1L, AcknowledgeType.ACCEPT);
+        acknowledgements.add(2L, AcknowledgeType.ACCEPT);
+        acknowledgements.add(3L, AcknowledgeType.REJECT);
+
+        assignFromSubscribed(singleton(tp1));
+
+        shareConsumeRequestManager.commitAsync(Collections.singletonMap(tip0, 
acknowledgements));
+
+        assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
+    }
+
+    @Test
+    public void testShareFetchWithSubscriptionChange() {
+        buildRequestManager();
+
+        assignFromSubscribed(singleton(tp0));
+
+        assertEquals(1, sendFetches());
+        assertFalse(shareConsumeRequestManager.hasCompletedFetches());
+
+        client.prepareResponse(fullFetchResponse(tip0, records, 
acquiredRecords, Errors.NONE));
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(shareConsumeRequestManager.hasCompletedFetches());
+
+        Acknowledgements acknowledgements = Acknowledgements.empty();
+        acknowledgements.add(0L, AcknowledgeType.ACCEPT);
+        acknowledgements.add(1L, AcknowledgeType.RELEASE);
+        acknowledgements.add(2L, AcknowledgeType.ACCEPT);
+
+        // Send acknowledgements via ShareFetch
+        shareConsumeRequestManager.fetch(Collections.singletonMap(tip0, 
acknowledgements));
+        fetchRecords();
+        // Subscription changes.
+        assignFromSubscribed(singleton(tp1));
+
+        assertEquals(1, sendFetches());
+        assertFalse(shareConsumeRequestManager.hasCompletedFetches());
+        assertEquals(3.0,
+                
metrics.metrics().get(metrics.metricInstance(shareFetchMetricsRegistry.acknowledgementSendTotal)).metricValue());
+    }
+
     @Test
     public void testRetryAcknowledgementsWithLeaderChange() throws 
InterruptedException {
         buildRequestManager();
diff --git a/core/src/test/java/kafka/test/api/ShareConsumerTest.java 
b/core/src/test/java/kafka/test/api/ShareConsumerTest.java
index da1877b43ab..000cbd3bda8 100644
--- a/core/src/test/java/kafka/test/api/ShareConsumerTest.java
+++ b/core/src/test/java/kafka/test/api/ShareConsumerTest.java
@@ -96,6 +96,7 @@ import static org.junit.jupiter.api.Assertions.fail;
 public class ShareConsumerTest {
     private KafkaClusterTestKit cluster;
     private final TopicPartition tp = new TopicPartition("topic", 0);
+    private final TopicPartition tp2 = new TopicPartition("topic2", 0);
     private final TopicPartition warmupTp = new TopicPartition("warmup", 0);
     private static final String DEFAULT_STATE_PERSISTER = 
"org.apache.kafka.server.share.persister.DefaultStatePersister";
     private static final String NO_OP_PERSISTER = 
"org.apache.kafka.server.share.persister.NoOpShareStatePersister";
@@ -129,6 +130,7 @@ public class ShareConsumerTest {
         cluster.waitForActiveController();
         cluster.waitForReadyBrokers();
         createTopic("topic");
+        createTopic("topic2");
         warmup();
     }
 
@@ -255,6 +257,44 @@ public class ShareConsumerTest {
         producer.close();
     }
 
+    @ParameterizedTest(name = "{displayName}.persister={0}")
+    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
+    public void testAcknowledgementSentOnSubscriptionChange(String persister) 
throws ExecutionException, InterruptedException {
+        Map<TopicPartition, Set<Long>> partitionOffsetsMap = new HashMap<>();
+        Map<TopicPartition, Exception> partitionExceptionMap = new HashMap<>();
+
+        ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
+        KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
+        producer.send(record);
+        ProducerRecord<byte[], byte[]> record2 = new 
ProducerRecord<>(tp2.topic(), tp2.partition(), null, "key".getBytes(), 
"value".getBytes());
+        producer.send(record2).get();
+        KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
+        shareConsumer.setAcknowledgementCommitCallback(new 
TestableAcknowledgeCommitCallback(partitionOffsetsMap, partitionExceptionMap));
+
+        shareConsumer.subscribe(Collections.singleton(tp.topic()));
+
+        ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(5000));
+        assertEquals(1, records.count());
+
+        shareConsumer.subscribe(Collections.singletonList(tp2.topic()));
+
+        // Waiting for heartbeat to propagate the subscription change.
+        TestUtils.waitForCondition(() -> 
shareConsumer.poll(Duration.ofMillis(2000)).count() == 1,
+                DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume records 
from the updated subscription");
+
+        producer.send(record2).get();
+
+        //Starting the 3rd poll to invoke the callback
+        shareConsumer.poll(Duration.ofMillis(500));
+
+        // Verifying if the callback was invoked for the partitions in the old 
subscription.
+        assertTrue(partitionExceptionMap.containsKey(tp));
+        assertNull(partitionExceptionMap.get(tp));
+
+        producer.close();
+        shareConsumer.close();
+    }
+
     @ParameterizedTest(name = "{displayName}.persister={0}")
     @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
     public void 
testAcknowledgementCommitCallbackSuccessfulAcknowledgement(String persister) {

Reply via email to