This is an automated email from the ASF dual-hosted git repository.
manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new fd4d58899bc KAFKA-17819 : Handle piggyback acknowledgements when
subscription changes in ShareConsumeRequestManager. (#17537)
fd4d58899bc is described below
commit fd4d58899bcc5c4ac69c5d3c309bc05ed7bf4e20
Author: ShivsundarR <[email protected]>
AuthorDate: Tue Nov 5 02:36:11 2024 -0500
KAFKA-17819 : Handle piggyback acknowledgements when subscription changes
in ShareConsumeRequestManager. (#17537)
Currently in ShareConsumeRequestManager, after we receive a
ShareFetchResponse, if the subscription changes before we acknowledge (via
ShareFetch), then we do not acknowledge the records which are not part of the
updated subscription. Instead we must acknowledge all the records that we had
received, irrespective of the current subscription.
This bug occurs only when we are acknowledging via ShareFetch, where we use
SubscriptionState::fetchablePartitions to obtain the partitions to fetch. In
ShareAcknowledge, as we are getting the partitions from the active share
sessions, even if the subscription changed, the session would remain active.
Reviewers: Andrew Schofield <[email protected]>, Manikumar Reddy
<[email protected]>
---
.../internals/ShareConsumeRequestManager.java | 84 ++++++++++++++++++-
.../kafka/common/requests/ShareFetchRequest.java | 31 ++++---
.../internals/ShareConsumeRequestManagerTest.java | 94 ++++++++++++++++++++++
.../java/kafka/test/api/ShareConsumerTest.java | 40 +++++++++
4 files changed, 233 insertions(+), 16 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
index a92da18c0b5..cfaad3667fa 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
@@ -55,6 +55,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
@@ -93,6 +94,7 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
private boolean closing = false;
private final CompletableFuture<Void> closeFuture;
private boolean isAcknowledgementCommitCallbackRegistered = false;
+ private final Map<IdAndPartition, String> forgottenTopicNames = new
HashMap<>();
ShareConsumeRequestManager(final Time time,
final LogContext logContext,
@@ -142,6 +144,7 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
Map<Node, ShareSessionHandler> handlerMap = new HashMap<>();
Map<String, Uuid> topicIds = metadata.topicIds();
+ Set<TopicIdPartition> fetchedPartitions = new HashSet<>();
for (TopicPartition partition : partitionsToFetch()) {
Optional<Node> leaderOpt =
metadata.currentLeader(partition).leader;
@@ -172,14 +175,55 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
metricsManager.recordAcknowledgementSent(acknowledgementsToSend.size());
}
handler.addPartitionToFetch(tip, acknowledgementsToSend);
+ fetchedPartitions.add(tip);
- log.debug("Added fetch request for partition {} to node {}",
partition, node.id());
+ log.debug("Added fetch request for partition {} to node {}",
tip, node.id());
}
}
+ // Map storing the list of partitions to forget in the upcoming
request.
+ Map<Node, List<TopicIdPartition>> partitionsToForgetMap = new
HashMap<>();
+ Cluster cluster = metadata.fetch();
+ // Iterating over the session handlers to see if there are
acknowledgements to be sent for partitions
+ // which are no longer part of the current subscription.
+ sessionHandlers.forEach((nodeId, sessionHandler) -> {
+ Node node = cluster.nodeById(nodeId);
+ if (node != null) {
+ if (nodesWithPendingRequests.contains(node.id())) {
+ log.trace("Skipping fetch because previous fetch request
to {} has not been processed", node.id());
+ } else {
+ for (TopicIdPartition tip :
sessionHandler.sessionPartitions()) {
+ if (!fetchedPartitions.contains(tip)) {
+ Acknowledgements acknowledgementsToSend =
fetchAcknowledgementsMap.get(tip);
+ if (acknowledgementsToSend != null) {
+
metricsManager.recordAcknowledgementSent(acknowledgementsToSend.size());
+ }
+ sessionHandler.addPartitionToFetch(tip,
acknowledgementsToSend);
+ partitionsToForgetMap.putIfAbsent(node, new
ArrayList<>());
+ partitionsToForgetMap.get(node).add(tip);
+
+ forgottenTopicNames.putIfAbsent(new
IdAndPartition(tip.topicId(), tip.partition()), tip.topic());
+ fetchedPartitions.add(tip);
+ log.debug("Added fetch request for previously
subscribed partition {} to node {}", tip, node.id());
+ }
+ }
+ }
+ }
+ });
+
Map<Node, ShareFetchRequest.Builder> builderMap = new
LinkedHashMap<>();
for (Map.Entry<Node, ShareSessionHandler> entry :
handlerMap.entrySet()) {
- builderMap.put(entry.getKey(),
entry.getValue().newShareFetchBuilder(groupId, fetchConfig));
+ ShareFetchRequest.Builder builder =
entry.getValue().newShareFetchBuilder(groupId, fetchConfig);
+ Node node = entry.getKey();
+
+ if (partitionsToForgetMap.containsKey(node)) {
+ if (builder.data().forgottenTopicsData() == null) {
+ builder.data().setForgottenTopicsData(new ArrayList<>());
+ }
+ builder.updateForgottenData(partitionsToForgetMap.get(node));
+ }
+
+ builderMap.put(node, builder);
}
List<UnsentRequest> requests =
builderMap.entrySet().stream().map(entry -> {
@@ -594,7 +638,9 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
topicResponse.partitions().forEach(partition ->
responseData.put(new
TopicIdPartition(topicResponse.topicId(),
partition.partitionIndex(),
-
metadata.topicNames().get(topicResponse.topicId())), partition)));
+
metadata.topicNames().getOrDefault(topicResponse.topicId(),
+ forgottenTopicNames.remove(new
IdAndPartition(topicResponse.topicId(), partition.partitionIndex())))),
partition))
+ );
final Set<TopicPartition> partitions =
responseData.keySet().stream().map(TopicIdPartition::topicPartition).collect(Collectors.toSet());
final ShareFetchMetricsAggregator shareFetchMetricsAggregator =
new ShareFetchMetricsAggregator(metricsManager, partitions);
@@ -1173,6 +1219,38 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
return acknowledgeRequestStates.get(nodeId);
}
+ static class IdAndPartition {
+ private final Uuid topicId;
+ private final int partitionIndex;
+
+ IdAndPartition(Uuid topicId, int partitionIndex) {
+ this.topicId = topicId;
+ this.partitionIndex = partitionIndex;
+ }
+
+ int getPartitionIndex() {
+ return partitionIndex;
+ }
+
+ Uuid getTopicId() {
+ return topicId;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(topicId, partitionIndex);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ IdAndPartition that = (IdAndPartition) o;
+ return Objects.equals(topicId, that.topicId) &&
+ partitionIndex == that.partitionIndex;
+ }
+ }
+
public enum AcknowledgeRequestType {
COMMIT_ASYNC((byte) 0),
COMMIT_SYNC((byte) 1),
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java
index 36586f74259..7ed14b4bdb1 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java
@@ -109,24 +109,29 @@ public class ShareFetchRequest extends AbstractRequest {
});
}
+ Builder builder = new Builder(data, true);
// And finally, forget the topic-partitions that are no longer in
the session
if (!forget.isEmpty()) {
- Map<Uuid, List<Integer>> forgetMap = new HashMap<>();
- for (TopicIdPartition tip : forget) {
- List<Integer> partList =
forgetMap.computeIfAbsent(tip.topicId(), k -> new ArrayList<>());
- partList.add(tip.partition());
- }
data.setForgottenTopicsData(new ArrayList<>());
- forgetMap.forEach((topicId, partList) -> {
- ShareFetchRequestData.ForgottenTopic forgetTopic = new
ShareFetchRequestData.ForgottenTopic()
- .setTopicId(topicId)
- .setPartitions(new ArrayList<>());
- partList.forEach(index ->
forgetTopic.partitions().add(index));
- data.forgottenTopicsData().add(forgetTopic);
- });
+ builder.updateForgottenData(forget);
}
- return new Builder(data, true);
+ return builder;
+ }
+
+ public void updateForgottenData(List<TopicIdPartition> forget) {
+ Map<Uuid, List<Integer>> forgetMap = new HashMap<>();
+ for (TopicIdPartition tip : forget) {
+ List<Integer> partList =
forgetMap.computeIfAbsent(tip.topicId(), k -> new ArrayList<>());
+ partList.add(tip.partition());
+ }
+ forgetMap.forEach((topicId, partList) -> {
+ ShareFetchRequestData.ForgottenTopic forgetTopic = new
ShareFetchRequestData.ForgottenTopic()
+ .setTopicId(topicId)
+ .setPartitions(new ArrayList<>());
+ partList.forEach(index -> forgetTopic.partitions().add(index));
+ data.forgottenTopicsData().add(forgetTopic);
+ });
}
public ShareFetchRequestData data() {
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
index 3c91c6cdcf9..6af9509d04b 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
@@ -451,6 +451,45 @@ public class ShareConsumeRequestManagerTest {
completedAcknowledgements.clear();
}
+ @Test
+ public void testAcknowledgeOnCloseWithPendingCommitSync() {
+ buildRequestManager();
+ // Enabling the config so that background event is sent when the
acknowledgement response is received.
+
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
+
+ assignFromSubscribed(Collections.singleton(tp0));
+
+ // normal fetch
+ assertEquals(1, sendFetches());
+ assertFalse(shareConsumeRequestManager.hasCompletedFetches());
+
+ client.prepareResponse(fullFetchResponse(tip0, records,
acquiredRecords, Errors.NONE));
+ networkClientDelegate.poll(time.timer(0));
+ assertTrue(shareConsumeRequestManager.hasCompletedFetches());
+
+ Acknowledgements acknowledgements = Acknowledgements.empty();
+ acknowledgements.add(1L, AcknowledgeType.ACCEPT);
+ acknowledgements.add(2L, AcknowledgeType.ACCEPT);
+ acknowledgements.add(3L, AcknowledgeType.REJECT);
+
+ shareConsumeRequestManager.commitSync(Collections.singletonMap(tip0,
acknowledgements),
+ calculateDeadlineMs(time.timer(100)));
+ shareConsumeRequestManager.acknowledgeOnClose(Collections.emptyMap(),
+ calculateDeadlineMs(time.timer(100)));
+
+ assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
+
+ client.prepareResponse(fullAcknowledgeResponse(tip0, Errors.NONE));
+ networkClientDelegate.poll(time.timer(0));
+
+ client.prepareResponse(emptyAcknowledgeResponse());
+ networkClientDelegate.poll(time.timer(0));
+ assertTrue(shareConsumeRequestManager.hasCompletedFetches());
+
+ assertEquals(Collections.singletonMap(tip0, acknowledgements),
completedAcknowledgements.get(0));
+ completedAcknowledgements.clear();
+ }
+
@Test
public void testBatchingAcknowledgeRequestStates() {
buildRequestManager();
@@ -598,6 +637,61 @@ public class ShareConsumeRequestManagerTest {
assertEquals(0,
shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().peek().getIncompleteAcknowledgementsCount(tip0));
}
+ @Test
+ public void testCommitAsyncWithSubscriptionChange() {
+ buildRequestManager();
+
+ assignFromSubscribed(singleton(tp0));
+
+ assertEquals(1, sendFetches());
+ assertFalse(shareConsumeRequestManager.hasCompletedFetches());
+
+ client.prepareResponse(fullFetchResponse(tip0, records,
acquiredRecords, Errors.NONE));
+ networkClientDelegate.poll(time.timer(0));
+ assertTrue(shareConsumeRequestManager.hasCompletedFetches());
+
+ Acknowledgements acknowledgements = Acknowledgements.empty();
+ acknowledgements.add(1L, AcknowledgeType.ACCEPT);
+ acknowledgements.add(2L, AcknowledgeType.ACCEPT);
+ acknowledgements.add(3L, AcknowledgeType.REJECT);
+
+ assignFromSubscribed(singleton(tp1));
+
+ shareConsumeRequestManager.commitAsync(Collections.singletonMap(tip0,
acknowledgements));
+
+ assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
+ }
+
+ @Test
+ public void testShareFetchWithSubscriptionChange() {
+ buildRequestManager();
+
+ assignFromSubscribed(singleton(tp0));
+
+ assertEquals(1, sendFetches());
+ assertFalse(shareConsumeRequestManager.hasCompletedFetches());
+
+ client.prepareResponse(fullFetchResponse(tip0, records,
acquiredRecords, Errors.NONE));
+ networkClientDelegate.poll(time.timer(0));
+ assertTrue(shareConsumeRequestManager.hasCompletedFetches());
+
+ Acknowledgements acknowledgements = Acknowledgements.empty();
+ acknowledgements.add(0L, AcknowledgeType.ACCEPT);
+ acknowledgements.add(1L, AcknowledgeType.RELEASE);
+ acknowledgements.add(2L, AcknowledgeType.ACCEPT);
+
+ // Send acknowledgements via ShareFetch
+ shareConsumeRequestManager.fetch(Collections.singletonMap(tip0,
acknowledgements));
+ fetchRecords();
+ // Subscription changes.
+ assignFromSubscribed(singleton(tp1));
+
+ assertEquals(1, sendFetches());
+ assertFalse(shareConsumeRequestManager.hasCompletedFetches());
+ assertEquals(3.0,
+
metrics.metrics().get(metrics.metricInstance(shareFetchMetricsRegistry.acknowledgementSendTotal)).metricValue());
+ }
+
@Test
public void testRetryAcknowledgementsWithLeaderChange() throws
InterruptedException {
buildRequestManager();
diff --git a/core/src/test/java/kafka/test/api/ShareConsumerTest.java
b/core/src/test/java/kafka/test/api/ShareConsumerTest.java
index da1877b43ab..000cbd3bda8 100644
--- a/core/src/test/java/kafka/test/api/ShareConsumerTest.java
+++ b/core/src/test/java/kafka/test/api/ShareConsumerTest.java
@@ -96,6 +96,7 @@ import static org.junit.jupiter.api.Assertions.fail;
public class ShareConsumerTest {
private KafkaClusterTestKit cluster;
private final TopicPartition tp = new TopicPartition("topic", 0);
+ private final TopicPartition tp2 = new TopicPartition("topic2", 0);
private final TopicPartition warmupTp = new TopicPartition("warmup", 0);
private static final String DEFAULT_STATE_PERSISTER =
"org.apache.kafka.server.share.persister.DefaultStatePersister";
private static final String NO_OP_PERSISTER =
"org.apache.kafka.server.share.persister.NoOpShareStatePersister";
@@ -129,6 +130,7 @@ public class ShareConsumerTest {
cluster.waitForActiveController();
cluster.waitForReadyBrokers();
createTopic("topic");
+ createTopic("topic2");
warmup();
}
@@ -255,6 +257,44 @@ public class ShareConsumerTest {
producer.close();
}
+ @ParameterizedTest(name = "{displayName}.persister={0}")
+ @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
+ public void testAcknowledgementSentOnSubscriptionChange(String persister)
throws ExecutionException, InterruptedException {
+ Map<TopicPartition, Set<Long>> partitionOffsetsMap = new HashMap<>();
+ Map<TopicPartition, Exception> partitionExceptionMap = new HashMap<>();
+
+ ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
+ KafkaProducer<byte[], byte[]> producer = createProducer(new
ByteArraySerializer(), new ByteArraySerializer());
+ producer.send(record);
+ ProducerRecord<byte[], byte[]> record2 = new
ProducerRecord<>(tp2.topic(), tp2.partition(), null, "key".getBytes(),
"value".getBytes());
+ producer.send(record2).get();
+ KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1");
+ shareConsumer.setAcknowledgementCommitCallback(new
TestableAcknowledgeCommitCallback(partitionOffsetsMap, partitionExceptionMap));
+
+ shareConsumer.subscribe(Collections.singleton(tp.topic()));
+
+ ConsumerRecords<byte[], byte[]> records =
shareConsumer.poll(Duration.ofMillis(5000));
+ assertEquals(1, records.count());
+
+ shareConsumer.subscribe(Collections.singletonList(tp2.topic()));
+
+ // Waiting for heartbeat to propagate the subscription change.
+ TestUtils.waitForCondition(() ->
shareConsumer.poll(Duration.ofMillis(2000)).count() == 1,
+ DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume records
from the updated subscription");
+
+ producer.send(record2).get();
+
+ //Starting the 3rd poll to invoke the callback
+ shareConsumer.poll(Duration.ofMillis(500));
+
+ // Verifying if the callback was invoked for the partitions in the old
subscription.
+ assertTrue(partitionExceptionMap.containsKey(tp));
+ assertNull(partitionExceptionMap.get(tp));
+
+ producer.close();
+ shareConsumer.close();
+ }
+
@ParameterizedTest(name = "{displayName}.persister={0}")
@ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
public void
testAcknowledgementCommitCallbackSuccessfulAcknowledgement(String persister) {