This is an automated email from the ASF dual-hosted git repository. schofielaj pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new bf760d4ebed KAFKA-18558: Added check before adding previously subscribed partitions (#18562) bf760d4ebed is described below commit bf760d4ebede1eb286ce0e978132b1d5d236d7ac Author: ShivsundarR <s...@confluent.io> AuthorDate: Thu Jan 16 08:17:48 2025 -0500 KAFKA-18558: Added check before adding previously subscribed partitions (#18562) Reviewers: Andrew Schofield <aschofi...@confluent.io> --- .../internals/ShareConsumeRequestManager.java | 16 +-- .../internals/ShareConsumeRequestManagerTest.java | 138 +++++++++++++++++++++ 2 files changed, 147 insertions(+), 7 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java index a83e971600e..20a022cb6ca 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java @@ -203,15 +203,17 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi if (acknowledgementsToSend != null) { metricsManager.recordAcknowledgementSent(acknowledgementsToSend.size()); fetchAcknowledgementsInFlight.put(tip, acknowledgementsToSend); - } - sessionHandler.addPartitionToFetch(tip, acknowledgementsToSend); - partitionsToForgetMap.putIfAbsent(node, new ArrayList<>()); - partitionsToForgetMap.get(node).add(tip); + sessionHandler.addPartitionToFetch(tip, acknowledgementsToSend); + handlerMap.put(node, sessionHandler); + + partitionsToForgetMap.putIfAbsent(node, new ArrayList<>()); + partitionsToForgetMap.get(node).add(tip); - topicNamesMap.putIfAbsent(new IdAndPartition(tip.topicId(), tip.partition()), tip.topic()); - fetchedPartitions.add(tip); - log.debug("Added fetch request for previously subscribed partition {} to node {}", tip, node.id()); + topicNamesMap.putIfAbsent(new IdAndPartition(tip.topicId(), tip.partition()), tip.topic()); + fetchedPartitions.add(tip); + log.debug("Added fetch request for previously subscribed partition {} to node {}", tip, node.id()); + } } } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java index 220483cf22d..0e74a9768a8 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java @@ -60,6 +60,7 @@ import org.apache.kafka.common.requests.MetadataResponse; import org.apache.kafka.common.requests.RequestHeader; import org.apache.kafka.common.requests.RequestTestUtils; import org.apache.kafka.common.requests.ShareAcknowledgeResponse; +import org.apache.kafka.common.requests.ShareFetchRequest; import org.apache.kafka.common.requests.ShareFetchResponse; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.Deserializer; @@ -849,6 +850,13 @@ public class ShareConsumeRequestManagerTest { shareConsumeRequestManager.commitAsync(Collections.singletonMap(tip0, acknowledgements)); assertEquals(1, shareConsumeRequestManager.sendAcknowledgements()); + + client.prepareResponse(fullAcknowledgeResponse(tip1, Errors.NONE)); + networkClientDelegate.poll(time.timer(0)); + + // We should send a fetch to the newly subscribed partition. + assertEquals(1, sendFetches()); + } @Test @@ -881,6 +889,129 @@ public class ShareConsumeRequestManagerTest { metrics.metrics().get(metrics.metricInstance(shareFetchMetricsRegistry.acknowledgementSendTotal)).metricValue()); } + @Test + public void testShareFetchWithSubscriptionChangeMultipleNodes() { + buildRequestManager(); + + subscriptions.subscribeToShareGroup(Collections.singleton(topicName)); + Set<TopicPartition> partitions = new HashSet<>(); + partitions.add(tp0); + partitions.add(tp1); + subscriptions.assignFromSubscribed(Collections.singletonList(tp0)); + + client.updateMetadata( + RequestTestUtils.metadataUpdateWithIds(2, singletonMap(topicName, 2), + tp -> validLeaderEpoch, topicIds, false)); + Node nodeId0 = metadata.fetch().nodeById(0); + Node nodeId1 = metadata.fetch().nodeById(1); + Node tp0Leader = metadata.fetch().leaderFor(tp0); + Node tp1Leader = metadata.fetch().leaderFor(tp1); + + assertEquals(nodeId0, tp0Leader); + assertEquals(nodeId1, tp1Leader); + + assertEquals(1, sendFetches()); + assertFalse(shareConsumeRequestManager.hasCompletedFetches()); + + client.prepareResponse(fullFetchResponse(tip0, records, emptyAcquiredRecords, Errors.NONE)); + networkClientDelegate.poll(time.timer(0)); + assertTrue(shareConsumeRequestManager.hasCompletedFetches()); + + Acknowledgements acknowledgements = Acknowledgements.empty(); + acknowledgements.add(0L, AcknowledgeType.ACCEPT); + acknowledgements.add(1L, AcknowledgeType.RELEASE); + acknowledgements.add(2L, AcknowledgeType.ACCEPT); + + // Send acknowledgements via ShareFetch + shareConsumeRequestManager.fetch(Collections.singletonMap(tip0, acknowledgements)); + fetchRecords(); + // Subscription changes. + subscriptions.assignFromSubscribed(Collections.singletonList(tp1)); + + NetworkClientDelegate.PollResult pollResult = shareConsumeRequestManager.sendFetchesReturnPollResult(); + assertEquals(2, pollResult.unsentRequests.size()); + + ShareFetchRequest.Builder builder1, builder2; + if (pollResult.unsentRequests.get(0).node().get() == nodeId0) { + builder1 = (ShareFetchRequest.Builder) pollResult.unsentRequests.get(0).requestBuilder(); + builder2 = (ShareFetchRequest.Builder) pollResult.unsentRequests.get(1).requestBuilder(); + assertEquals(nodeId1, pollResult.unsentRequests.get(1).node().get()); + } else { + builder1 = (ShareFetchRequest.Builder) pollResult.unsentRequests.get(1).requestBuilder(); + builder2 = (ShareFetchRequest.Builder) pollResult.unsentRequests.get(0).requestBuilder(); + assertEquals(nodeId0, pollResult.unsentRequests.get(1).node().get()); + assertEquals(nodeId1, pollResult.unsentRequests.get(0).node().get()); + } + + // Verify the builder data for node0. + assertEquals(1, builder1.data().topics().size()); + assertEquals(tip0.topicId(), builder1.data().topics().get(0).topicId()); + assertEquals(1, builder1.data().topics().get(0).partitions().size()); + assertEquals(0, builder1.data().topics().get(0).partitions().get(0).partitionIndex()); + assertEquals(1, builder1.data().topics().get(0).partitions().get(0).acknowledgementBatches().size()); + assertEquals(0L, builder1.data().topics().get(0).partitions().get(0).acknowledgementBatches().get(0).firstOffset()); + assertEquals(2L, builder1.data().topics().get(0).partitions().get(0).acknowledgementBatches().get(0).lastOffset()); + + assertEquals(1, builder1.data().forgottenTopicsData().size()); + assertEquals(tip0.topicId(), builder1.data().forgottenTopicsData().get(0).topicId()); + assertEquals(1, builder1.data().forgottenTopicsData().get(0).partitions().size()); + assertEquals(0, builder1.data().forgottenTopicsData().get(0).partitions().get(0)); + + // Verify the builder data for node1. + assertEquals(1, builder2.data().topics().size()); + assertEquals(tip1.topicId(), builder2.data().topics().get(0).topicId()); + assertEquals(1, builder2.data().topics().get(0).partitions().size()); + assertEquals(1, builder2.data().topics().get(0).partitions().get(0).partitionIndex()); + } + + @Test + public void testShareFetchWithSubscriptionChangeMultipleNodesEmptyAcknowledgements() { + buildRequestManager(); + + subscriptions.subscribeToShareGroup(Collections.singleton(topicName)); + subscriptions.assignFromSubscribed(Collections.singletonList(tp0)); + + client.updateMetadata( + RequestTestUtils.metadataUpdateWithIds(2, singletonMap(topicName, 2), + tp -> validLeaderEpoch, topicIds, false)); + Node nodeId0 = metadata.fetch().nodeById(0); + Node nodeId1 = metadata.fetch().nodeById(1); + Node tp0Leader = metadata.fetch().leaderFor(tp0); + Node tp1Leader = metadata.fetch().leaderFor(tp1); + + assertEquals(nodeId0, tp0Leader); + assertEquals(nodeId1, tp1Leader); + + // Send the first ShareFetch + assertEquals(1, sendFetches()); + assertFalse(shareConsumeRequestManager.hasCompletedFetches()); + + // Prepare an empty response + client.prepareResponse(fullFetchResponse(tip0, records, emptyAcquiredRecords, Errors.NONE)); + networkClientDelegate.poll(time.timer(0)); + assertTrue(shareConsumeRequestManager.hasCompletedFetches()); + + fetchRecords(); + + // Change the subscription. + subscriptions.assignFromSubscribed(Collections.singletonList(tp1)); + + + // Now we will be sending the request to node1 only as leader for tip1 is node1. + // We do not build the request for tip0 as there are no acknowledgements to send. + NetworkClientDelegate.PollResult pollResult = shareConsumeRequestManager.sendFetchesReturnPollResult(); + assertEquals(1, pollResult.unsentRequests.size()); + assertEquals(nodeId1, pollResult.unsentRequests.get(0).node().get()); + + ShareFetchRequest.Builder builder = (ShareFetchRequest.Builder) pollResult.unsentRequests.get(0).requestBuilder(); + + assertEquals(1, builder.data().topics().size()); + assertEquals(tip1.topicId(), builder.data().topics().get(0).topicId()); + assertEquals(1, builder.data().topics().get(0).partitions().size()); + assertEquals(1, builder.data().topics().get(0).partitions().get(0).partitionIndex()); + assertEquals(0, builder.data().forgottenTopicsData().size()); + } + @Test public void testRetryAcknowledgementsWithLeaderChange() { buildRequestManager(); @@ -1809,6 +1940,13 @@ public class ShareConsumeRequestManagerTest { return pollResult.unsentRequests.size(); } + private NetworkClientDelegate.PollResult sendFetchesReturnPollResult() { + fetch(new HashMap<>()); + NetworkClientDelegate.PollResult pollResult = poll(time.milliseconds()); + networkClientDelegate.addAll(pollResult.unsentRequests); + return pollResult; + } + private int sendAcknowledgements() { NetworkClientDelegate.PollResult pollResult = poll(time.milliseconds()); networkClientDelegate.addAll(pollResult.unsentRequests);