This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0e40b80c866 KAFKA-18769: Improve leadership changes handling in ShareConsumeRequestManager. (#18851)
0e40b80c866 is described below
commit 0e40b80c86631f10d221e1ce8e43055d91451564
Author: ShivsundarR <[email protected]>
AuthorDate: Wed Feb 12 10:54:01 2025 -0500
    KAFKA-18769: Improve leadership changes handling in ShareConsumeRequestManager. (#18851)
Reviewers: Andrew Schofield <[email protected]>
---
.../internals/ShareConsumeRequestManager.java | 163 ++++++---
.../internals/ShareConsumeRequestManagerTest.java | 371 ++++++++++++++++++++-
2 files changed, 475 insertions(+), 59 deletions(-)
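
In essence, the patch gates every acknowledgement path (piggybacked fetch acknowledgements, commitSync, commitAsync, and close) on a new isLeaderKnownToHaveChanged check, failing acknowledgements locally with NOT_LEADER_OR_FOLLOWER instead of sending them to a node that has lost leadership. A minimal, self-contained sketch of that pattern follows; the types and names here are simplified stand-ins for illustration, not the actual client classes:

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Simplified stand-ins for the real client types; illustration only.
public class LeaderCheckSketch {

    record TopicIdPartition(String topic, int partition) { }

    // Holds per-partition acknowledgements; complete() records the outcome
    // that would be handed to the acknowledgement-commit callback.
    static class Acknowledgements {
        Exception exception;
        void complete(Exception e) {
            this.exception = e;
        }
    }

    // Hypothetical metadata view: the current leader per partition, if known.
    static final Map<TopicIdPartition, Integer> CURRENT_LEADERS = new HashMap<>();

    // Mirrors the shape of the new isLeaderKnownToHaveChanged: true only when
    // the leader is known AND differs from the node the records were fetched from.
    static boolean isLeaderKnownToHaveChanged(int nodeId, TopicIdPartition tip) {
        Optional<Integer> leader = Optional.ofNullable(CURRENT_LEADERS.get(tip));
        // Leader unknown: treat as unchanged (the real code also requests a
        // metadata refresh here), so acknowledgements are still attempted.
        return leader.isPresent() && leader.get() != nodeId;
    }

    // The gate applied on every acknowledgement path before sending.
    static void maybeSend(int fetchedFromNode, TopicIdPartition tip, Acknowledgements acks) {
        if (!isLeaderKnownToHaveChanged(fetchedFromNode, tip)) {
            System.out.println("sending acks for " + tip + " to node " + fetchedFromNode);
        } else {
            // Fail locally instead of sending to a node that lost leadership.
            acks.complete(new IllegalStateException("NOT_LEADER_OR_FOLLOWER"));
            System.out.println("failing acks for " + tip + ": leader changed");
        }
    }

    public static void main(String[] args) {
        TopicIdPartition tp = new TopicIdPartition("topic1", 0);
        CURRENT_LEADERS.put(tp, 1);               // leadership has moved to node 1
        maybeSend(0, tp, new Acknowledgements()); // fetched from node 0 -> failed locally
    }
}

Note the deliberate asymmetry: when the leader is merely unknown, the check returns false and the acknowledgements are still attempted, so a transient metadata gap does not discard delivery state.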
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
index 42087e96eef..07782ed771a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
@@ -191,8 +191,8 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateListener
         // Iterate over the session handlers to see if there are acknowledgements to be sent for partitions
-        // which are no longer part of the current subscription, or whose records were fetched from a
-        // previous leader.
+        // which are no longer part of the current subscription.
+        // We fail acknowledgements for records fetched from a previous leader.
         Cluster cluster = metadata.fetch();
         sessionHandlers.forEach((nodeId, sessionHandler) -> {
             Node node = cluster.nodeById(nodeId);
@@ -203,14 +203,20 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateListener
                 Map<TopicIdPartition, Acknowledgements> nodeAcksFromFetchMap = fetchAcknowledgementsToSend.get(nodeId);
                 if (nodeAcksFromFetchMap != null) {
                     nodeAcksFromFetchMap.forEach((tip, acks) -> {
-                        metricsManager.recordAcknowledgementSent(acks.size());
-                        fetchAcknowledgementsInFlight.computeIfAbsent(node.id(), k -> new HashMap<>()).put(tip, acks);
+                        if (!isLeaderKnownToHaveChanged(nodeId, tip)) {
+                            metricsManager.recordAcknowledgementSent(acks.size());
+                            fetchAcknowledgementsInFlight.computeIfAbsent(node.id(), k -> new HashMap<>()).put(tip, acks);
-                        sessionHandler.addPartitionToAcknowledgeOnly(tip, acks);
-                        handlerMap.put(node, sessionHandler);
+                            sessionHandler.addPartitionToAcknowledgeOnly(tip, acks);
+                            handlerMap.put(node, sessionHandler);
-                        topicNamesMap.putIfAbsent(new IdAndPartition(tip.topicId(), tip.partition()), tip.topic());
-                        log.debug("Added fetch request for previously subscribed partition {} to node {}", tip, nodeId);
+                            topicNamesMap.putIfAbsent(new IdAndPartition(tip.topicId(), tip.partition()), tip.topic());
+                            log.debug("Added fetch request for previously subscribed partition {} to node {}", tip, nodeId);
+                        } else {
+                            log.debug("Leader for the partition is down or has changed, failing Acknowledgements for partition {}", tip);
+                            acks.complete(Errors.NOT_LEADER_OR_FOLLOWER.exception());
+                            maybeSendShareAcknowledgeCommitCallbackEvent(Collections.singletonMap(tip, acks));
+                        }
                     });

                     nodeAcksFromFetchMap.clear();
@@ -475,11 +481,16 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateListener
                 for (TopicIdPartition tip : sessionHandler.sessionPartitions()) {
                     NodeAcknowledgements nodeAcknowledgements = acknowledgementsMap.get(tip);
                     if ((nodeAcknowledgements != null) && (nodeAcknowledgements.nodeId() == node.id())) {
-                        acknowledgementsMapForNode.put(tip, nodeAcknowledgements.acknowledgements());
+                        if (!isLeaderKnownToHaveChanged(node.id(), tip)) {
+                            acknowledgementsMapForNode.put(tip, nodeAcknowledgements.acknowledgements());
-                        metricsManager.recordAcknowledgementSent(nodeAcknowledgements.acknowledgements().size());
-                        log.debug("Added sync acknowledge request for partition {} to node {}", tip.topicPartition(), node.id());
-                        resultCount.incrementAndGet();
+                            metricsManager.recordAcknowledgementSent(nodeAcknowledgements.acknowledgements().size());
+                            log.debug("Added sync acknowledge request for partition {} to node {}", tip.topicPartition(), node.id());
+                            resultCount.incrementAndGet();
+                        } else {
+                            nodeAcknowledgements.acknowledgements().complete(Errors.NOT_LEADER_OR_FOLLOWER.exception());
+                            maybeSendShareAcknowledgeCommitCallbackEvent(Collections.singletonMap(tip, nodeAcknowledgements.acknowledgements()));
+                        }
                     }
                 }
@@ -523,29 +534,34 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateListener
                 for (TopicIdPartition tip : sessionHandler.sessionPartitions()) {
                     NodeAcknowledgements nodeAcknowledgements = acknowledgementsMap.get(tip);
                     if ((nodeAcknowledgements != null) && (nodeAcknowledgements.nodeId() == node.id())) {
-                        Acknowledgements acknowledgements = nodeAcknowledgements.acknowledgements();
-                        acknowledgementsMapForNode.put(tip, acknowledgements);
-
-                        metricsManager.recordAcknowledgementSent(acknowledgements.size());
-                        log.debug("Added async acknowledge request for partition {} to node {}", tip.topicPartition(), node.id());
-                        AcknowledgeRequestState asyncRequestState = acknowledgeRequestStates.get(nodeId).getAsyncRequest();
-                        if (asyncRequestState == null) {
-                            acknowledgeRequestStates.get(nodeId).setAsyncRequest(new AcknowledgeRequestState(logContext,
-                                ShareConsumeRequestManager.class.getSimpleName() + ":2",
-                                Long.MAX_VALUE,
-                                retryBackoffMs,
-                                retryBackoffMaxMs,
-                                sessionHandler,
-                                nodeId,
-                                acknowledgementsMapForNode,
-                                resultHandler,
-                                AcknowledgeRequestType.COMMIT_ASYNC
-                            ));
-                        } else {
-                            Acknowledgements prevAcks = asyncRequestState.acknowledgementsToSend.putIfAbsent(tip, acknowledgements);
-                            if (prevAcks != null) {
-                                asyncRequestState.acknowledgementsToSend.get(tip).merge(acknowledgements);
+                        if (!isLeaderKnownToHaveChanged(node.id(), tip)) {
+                            Acknowledgements acknowledgements = nodeAcknowledgements.acknowledgements();
+                            acknowledgementsMapForNode.put(tip, acknowledgements);
+
+                            metricsManager.recordAcknowledgementSent(acknowledgements.size());
+                            log.debug("Added async acknowledge request for partition {} to node {}", tip.topicPartition(), node.id());
+                            AcknowledgeRequestState asyncRequestState = acknowledgeRequestStates.get(nodeId).getAsyncRequest();
+                            if (asyncRequestState == null) {
+                                acknowledgeRequestStates.get(nodeId).setAsyncRequest(new AcknowledgeRequestState(logContext,
+                                    ShareConsumeRequestManager.class.getSimpleName() + ":2",
+                                    Long.MAX_VALUE,
+                                    retryBackoffMs,
+                                    retryBackoffMaxMs,
+                                    sessionHandler,
+                                    nodeId,
+                                    acknowledgementsMapForNode,
+                                    resultHandler,
+                                    AcknowledgeRequestType.COMMIT_ASYNC
+                                ));
+                            } else {
+                                Acknowledgements prevAcks = asyncRequestState.acknowledgementsToSend.putIfAbsent(tip, acknowledgements);
+                                if (prevAcks != null) {
+                                    asyncRequestState.acknowledgementsToSend.get(tip).merge(acknowledgements);
+                                }
                             }
+                        } else {
+                            nodeAcknowledgements.acknowledgements().complete(Errors.NOT_LEADER_OR_FOLLOWER.exception());
+                            maybeSendShareAcknowledgeCommitCallbackEvent(Collections.singletonMap(tip, nodeAcknowledgements.acknowledgements()));
                         }
                     }
                 }
@@ -572,40 +588,57 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateListener
         final ResultHandler resultHandler = new ResultHandler(resultCount, Optional.empty());

         closing = true;

+        Map<Integer, Map<TopicIdPartition, Acknowledgements>> acknowledgementsMapAllNodes = new HashMap<>();
+
+        acknowledgementsMap.forEach((tip, nodeAcks) -> {
+            if (!isLeaderKnownToHaveChanged(nodeAcks.nodeId(), tip)) {
+                Map<TopicIdPartition, Acknowledgements> acksMap = acknowledgementsMapAllNodes.computeIfAbsent(nodeAcks.nodeId(), k -> new HashMap<>());
+                Acknowledgements prevAcks = acksMap.putIfAbsent(tip, nodeAcks.acknowledgements());
+                if (prevAcks != null) {
+                    acksMap.get(tip).merge(nodeAcks.acknowledgements());
+                }
+            } else {
+                nodeAcks.acknowledgements().complete(Errors.NOT_LEADER_OR_FOLLOWER.exception());
+                maybeSendShareAcknowledgeCommitCallbackEvent(Collections.singletonMap(tip, nodeAcks.acknowledgements()));
+            }
+        });

         sessionHandlers.forEach((nodeId, sessionHandler) -> {
             Node node = cluster.nodeById(nodeId);
             if (node != null) {
-                Map<TopicIdPartition, Acknowledgements> acknowledgementsMapForNode = new HashMap<>();
-
-                acknowledgementsMap.forEach((tip, nodeAcks) -> {
-                    Acknowledgements acknowledgements = Acknowledgements.empty();
-                    Map<TopicIdPartition, Acknowledgements> nodeAcksFromFetchMap = fetchAcknowledgementsToSend.get(nodeId);
-                    if (nodeAcksFromFetchMap != null) {
-                        Acknowledgements acksFromFetchMap = nodeAcksFromFetchMap.remove(tip);
-                        if (acksFromFetchMap != null) {
-                            acknowledgements.merge(acksFromFetchMap);
+                // Add any waiting piggyback acknowledgements for the node.
+                Map<TopicIdPartition, Acknowledgements> fetchAcks = fetchAcknowledgementsToSend.remove(nodeId);
+                if (fetchAcks != null) {
+                    fetchAcks.forEach((tip, acks) -> {
+                        if (!isLeaderKnownToHaveChanged(nodeId, tip)) {
+                            Map<TopicIdPartition, Acknowledgements> acksMap = acknowledgementsMapAllNodes.computeIfAbsent(nodeId, k -> new HashMap<>());
+                            Acknowledgements prevAcks = acksMap.putIfAbsent(tip, acks);
+                            if (prevAcks != null) {
+                                acksMap.get(tip).merge(acks);
+                            }
+                        } else {
+                            acks.complete(Errors.NOT_LEADER_OR_FOLLOWER.exception());
+                            maybeSendShareAcknowledgeCommitCallbackEvent(Collections.singletonMap(tip, acks));
                         }
-                    }
-
-                    if (nodeAcks.nodeId() == node.id()) {
-                        acknowledgements.merge(nodeAcks.acknowledgements());
-                    }
-
-                    if (!acknowledgements.isEmpty()) {
-                        acknowledgementsMapForNode.put(tip, acknowledgements);
+                    });
+                }
+                Map<TopicIdPartition, Acknowledgements> acknowledgementsMapForNode = acknowledgementsMapAllNodes.get(nodeId);
+                if (acknowledgementsMapForNode != null) {
+                    acknowledgementsMapForNode.forEach((tip, acknowledgements) -> {
                         metricsManager.recordAcknowledgementSent(acknowledgements.size());
                         log.debug("Added closing acknowledge request for partition {} to node {}", tip.topicPartition(), node.id());
                         resultCount.incrementAndGet();
-                    }
-                });
+                    });
+                } else {
+                    acknowledgementsMapForNode = new HashMap<>();
+                }

                 acknowledgeRequestStates.putIfAbsent(nodeId, new Tuple<>(null, null, null));

                 // Ensure there is no close() request already present as they are blocking calls
                 // and only one request can be active at a time.
-                if (acknowledgeRequestStates.get(nodeId).getCloseRequest() != null && !acknowledgeRequestStates.get(nodeId).getCloseRequest().isEmpty()) {
+                if (acknowledgeRequestStates.get(nodeId).getCloseRequest() != null && isRequestStateInProgress(acknowledgeRequestStates.get(nodeId).getCloseRequest())) {
                     log.error("Attempt to call close() when there is an existing close request for node {}-{}", node.id(), acknowledgeRequestStates.get(nodeId).getSyncRequestQueue());
                     closeFuture.completeExceptionally(
                         new IllegalStateException("Attempt to call close() when there is an existing close request for node : " + node.id()));
@@ -630,6 +663,28 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateListener
         return closeFuture;
     }

+    /**
+     * The method checks whether the leader for a topicIdPartition has changed.
+     * @param nodeId The previous leader for the partition.
+     * @param topicIdPartition The TopicIdPartition to check.
+     * @return Returns true if leader information is available and the leader has changed.
+     * If the leader information is not available or if the leader has not changed, it returns false.
+     */
+    private boolean isLeaderKnownToHaveChanged(int nodeId, TopicIdPartition topicIdPartition) {
+        Optional<Node> leaderNode = metadata.currentLeader(topicIdPartition.topicPartition()).leader;
+        if (leaderNode.isPresent()) {
+            if (leaderNode.get().id() != nodeId) {
+                log.debug("Node {} is no longer the leader for partition {}, failing acknowledgements", nodeId, topicIdPartition);
+                return true;
+            }
+        } else {
+            log.debug("No leader found for partition {}", topicIdPartition);
+            metadata.requestUpdate(false);
+            return false;
+        }
+        return false;
+    }
+
     private void handleShareFetchSuccess(Node fetchTarget,
                                          @SuppressWarnings("unused") ShareFetchRequestData requestData,
                                          ClientResponse resp) {
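
The reworked close path above also changes how acknowledgements are batched: after the leader check, they are grouped per node, and a second batch for the same partition is merged into the first via putIfAbsent-then-merge rather than overwriting it. A compact, self-contained sketch of that accumulation, again using simplified stand-in types rather than the real client classes:

import java.util.HashMap;
import java.util.Map;

// Sketch of the per-node accumulation used in the new close path;
// types are simplified stand-ins for illustration only.
public class CloseAccumulationSketch {

    record TopicIdPartition(String topic, int partition) { }

    static class Acknowledgements {
        int count;
        Acknowledgements(int count) { this.count = count; }
        void merge(Acknowledgements other) { this.count += other.count; }
    }

    static void accumulate(Map<Integer, Map<TopicIdPartition, Acknowledgements>> byNode,
                           int nodeId, TopicIdPartition tip, Acknowledgements acks) {
        // Group acknowledgements by target node...
        Map<TopicIdPartition, Acknowledgements> acksMap =
                byNode.computeIfAbsent(nodeId, k -> new HashMap<>());
        // ...and merge a duplicate batch for the same partition instead of replacing it.
        Acknowledgements prev = acksMap.putIfAbsent(tip, acks);
        if (prev != null) {
            prev.merge(acks); // same effect as acksMap.get(tip).merge(acks)
        }
    }

    public static void main(String[] args) {
        Map<Integer, Map<TopicIdPartition, Acknowledgements>> byNode = new HashMap<>();
        TopicIdPartition tp = new TopicIdPartition("topic1", 0);

        accumulate(byNode, 0, tp, new Acknowledgements(2)); // first batch inserted
        accumulate(byNode, 0, tp, new Acknowledgements(3)); // second batch merged

        System.out.println(byNode.get(0).get(tp).count);    // prints 5
    }
}

This is why the tests below can queue piggybacked acknowledgements and explicit close-time acknowledgements for the same node without losing either batch.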
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
index ede3f5415fc..999e7bbe0cc 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
@@ -848,18 +848,97 @@ public class ShareConsumeRequestManagerTest {
         acknowledgements.add(2L, AcknowledgeType.ACCEPT);
         acknowledgements.add(3L, AcknowledgeType.REJECT);

-        assignFromSubscribed(singleton(tp1));
+        subscriptions.subscribeToShareGroup(Collections.singleton(topicName2));
+        subscriptions.assignFromSubscribed(Collections.singleton(t2p0));
+
+        client.updateMetadata(
+                RequestTestUtils.metadataUpdateWithIds(1, singletonMap(topicName2, 1),
+                        tp -> validLeaderEpoch, topicIds, false));

         shareConsumeRequestManager.commitAsync(Collections.singletonMap(tip0, new NodeAcknowledgements(0, acknowledgements)));

         assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());

-        client.prepareResponse(fullAcknowledgeResponse(tip1, Errors.NONE));
+        client.prepareResponse(fullAcknowledgeResponse(t2ip0, Errors.NONE));
+        networkClientDelegate.poll(time.timer(0));
+
+        // We should send a fetch to the newly subscribed partition.
+        assertEquals(1, sendFetches());
+
+    }
+
+    @Test
+    public void testCommitSyncWithSubscriptionChange() {
+        buildRequestManager();
+
+        assignFromSubscribed(singleton(tp0));
+
+        assertEquals(1, sendFetches());
+        assertFalse(shareConsumeRequestManager.hasCompletedFetches());
+
+        client.prepareResponse(fullFetchResponse(tip0, records, acquiredRecords, Errors.NONE));
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(shareConsumeRequestManager.hasCompletedFetches());
+
+        Acknowledgements acknowledgements = Acknowledgements.empty();
+        acknowledgements.add(1L, AcknowledgeType.ACCEPT);
+        acknowledgements.add(2L, AcknowledgeType.ACCEPT);
+        acknowledgements.add(3L, AcknowledgeType.REJECT);
+
+        subscriptions.subscribeToShareGroup(Collections.singleton(topicName2));
+        subscriptions.assignFromSubscribed(Collections.singleton(t2p0));
+
+        client.updateMetadata(
+                RequestTestUtils.metadataUpdateWithIds(1, singletonMap(topicName2, 1),
+                        tp -> validLeaderEpoch, topicIds, false));
+
+        shareConsumeRequestManager.commitSync(Collections.singletonMap(tip0, new NodeAcknowledgements(0, acknowledgements)),
+                calculateDeadlineMs(time.timer(100)));
+
+        assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
+
+        client.prepareResponse(fullAcknowledgeResponse(t2ip0, Errors.NONE));
         networkClientDelegate.poll(time.timer(0));

         // We should send a fetch to the newly subscribed partition.
         assertEquals(1, sendFetches());
+    }
+
+    @Test
+    public void testCloseWithSubscriptionChange() {
+        buildRequestManager();
+
+        assignFromSubscribed(singleton(tp0));
+
+        assertEquals(1, sendFetches());
+        assertFalse(shareConsumeRequestManager.hasCompletedFetches());
+
+        client.prepareResponse(fullFetchResponse(tip0, records, acquiredRecords, Errors.NONE));
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(shareConsumeRequestManager.hasCompletedFetches());
+        Acknowledgements acknowledgements = Acknowledgements.empty();
+        acknowledgements.add(1L, AcknowledgeType.ACCEPT);
+        acknowledgements.add(2L, AcknowledgeType.ACCEPT);
+        acknowledgements.add(3L, AcknowledgeType.REJECT);
+
+        subscriptions.subscribeToShareGroup(Collections.singleton(topicName2));
+        subscriptions.assignFromSubscribed(Collections.singleton(t2p0));
+
+        client.updateMetadata(
+                RequestTestUtils.metadataUpdateWithIds(1, singletonMap(topicName2, 1),
+                        tp -> validLeaderEpoch, topicIds, false));
+
+        shareConsumeRequestManager.acknowledgeOnClose(Collections.singletonMap(tip0, new NodeAcknowledgements(0, acknowledgements)),
+                calculateDeadlineMs(time.timer(100)));
+
+        assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
+
+        client.prepareResponse(fullAcknowledgeResponse(t2ip0, Errors.NONE));
+        networkClientDelegate.poll(time.timer(0));
+
+        // As we are closing, we would not send any more fetches.
+        assertEquals(0, sendFetches());
     }

     @Test
@@ -884,7 +963,12 @@ public class ShareConsumeRequestManagerTest {
         shareConsumeRequestManager.fetch(Collections.singletonMap(tip0, new NodeAcknowledgements(0, acknowledgements)));
         fetchRecords();
         // Subscription changes.
-        assignFromSubscribed(singleton(tp1));
+        subscriptions.subscribeToShareGroup(Collections.singleton(topicName2));
+        subscriptions.assignFromSubscribed(Collections.singleton(t2p0));
+
+        client.updateMetadata(
+                RequestTestUtils.metadataUpdateWithIds(1, singletonMap(topicName2, 1),
+                        tp -> validLeaderEpoch, topicIds, false));

         assertEquals(1, sendFetches());
         assertFalse(shareConsumeRequestManager.hasCompletedFetches());
@@ -1738,6 +1822,7 @@ public class ShareConsumeRequestManagerTest {
     @EnumSource(value = Errors.class, names = {"FENCED_LEADER_EPOCH", "NOT_LEADER_OR_FOLLOWER"})
     public void testWhenLeadershipChangeBetweenShareFetchRequests(Errors error) {
         buildRequestManager();
+        shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);

         subscriptions.subscribeToShareGroup(Collections.singleton(topicName));
         Set<TopicPartition> partitions = new HashSet<>();
@@ -1796,9 +1881,11 @@ public class ShareConsumeRequestManagerTest {
         assertNotEquals(startingClusterMetadata, metadata.fetch());

         // Even though the partitions are on the same leader, records were fetched on the previous leader.
-        // A fetch is sent to the previous leader to remove the partition from the share session and get the acknowledge error code.
-        assertEquals(2, sendFetches());
+        // We do not send those acknowledgements to the previous leader; we fail them with NOT_LEADER_OR_FOLLOWER exception.
+        assertEquals(1, sendFetches());
         assertFalse(shareConsumeRequestManager.hasCompletedFetches());
+        assertEquals(acknowledgements, completedAcknowledgements.get(0).get(tip0));
+        assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.exception(), completedAcknowledgements.get(0).get(tip0).getAcknowledgeException());

         partitionData.clear();
         partitionData.put(tip0,
@@ -1835,6 +1922,280 @@ public class ShareConsumeRequestManagerTest {
         assertEquals(1, fetchedRecords.size());
     }

+    @Test
+    void testLeadershipChangeAfterFetchBeforeCommitAsync() {
+        buildRequestManager();
+        shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
+
+        subscriptions.subscribeToShareGroup(Collections.singleton(topicName));
+        Set<TopicPartition> partitions = new HashSet<>();
+        partitions.add(tp0);
+        partitions.add(tp1);
+        subscriptions.assignFromSubscribed(partitions);
+
+        client.updateMetadata(
+                RequestTestUtils.metadataUpdateWithIds(2, singletonMap(topicName, 2),
+                        tp -> validLeaderEpoch, topicIds, false));
+        Node nodeId0 = metadata.fetch().nodeById(0);
+        Node nodeId1 = metadata.fetch().nodeById(1);
+
+        Cluster startingClusterMetadata = metadata.fetch();
+        assertFalse(metadata.updateRequested());
+
+        assertEquals(2, sendFetches());
+        assertFalse(shareConsumeRequestManager.hasCompletedFetches());
+
+        LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> partitionData = new LinkedHashMap<>();
+        partitionData.put(tip0,
+                new ShareFetchResponseData.PartitionData()
+                        .setPartitionIndex(tip0.topicPartition().partition())
+                        .setErrorCode(Errors.NONE.code())
+                        .setRecords(records)
+                        .setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1))
+                        .setAcknowledgeErrorCode(Errors.NONE.code()));
+        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, Collections.emptyList()), nodeId0);
+        partitionData.clear();
+        partitionData.put(tip1,
+                new ShareFetchResponseData.PartitionData()
+                        .setPartitionIndex(tip1.topicPartition().partition())
+                        .setErrorCode(Errors.NONE.code())
+                        .setRecords(records)
+                        .setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 2))
+                        .setAcknowledgeErrorCode(Errors.NONE.code()));
+        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, Collections.emptyList()), nodeId1);
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(shareConsumeRequestManager.hasCompletedFetches());
+
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords = fetchRecords();
+        assertTrue(partitionRecords.containsKey(tp0));
+        assertTrue(partitionRecords.containsKey(tp1));
+
+        List<ConsumerRecord<byte[], byte[]>> fetchedRecords = partitionRecords.get(tp0);
+        assertEquals(1, fetchedRecords.size());
+
+        fetchedRecords = partitionRecords.get(tp1);
+        assertEquals(2, fetchedRecords.size());
+
+        Acknowledgements acknowledgementsTp0 = Acknowledgements.empty();
+        acknowledgementsTp0.add(1L, AcknowledgeType.ACCEPT);
+
+        Acknowledgements acknowledgementsTp1 = Acknowledgements.empty();
+        acknowledgementsTp1.add(1L, AcknowledgeType.ACCEPT);
+        acknowledgementsTp1.add(2L, AcknowledgeType.ACCEPT);
+
+        Map<TopicIdPartition, NodeAcknowledgements> commitAcks = new HashMap<>();
+        commitAcks.put(tip0, new NodeAcknowledgements(0, acknowledgementsTp0));
+        commitAcks.put(tip1, new NodeAcknowledgements(1, acknowledgementsTp1));
+
+        // Move the leadership of tp0 onto node 1
+        HashMap<TopicPartition, Metadata.LeaderIdAndEpoch> partitionLeaders = new HashMap<>();
+        partitionLeaders.put(tp0, new Metadata.LeaderIdAndEpoch(Optional.of(nodeId1.id()), Optional.of(validLeaderEpoch + 1)));
+        metadata.updatePartitionLeadership(partitionLeaders, List.of());
+
+        assertNotEquals(startingClusterMetadata, metadata.fetch());
+
+        // We fail the acknowledgements for records which were received from node0 with NOT_LEADER_OR_FOLLOWER exception.
+        shareConsumeRequestManager.commitAsync(commitAcks);
+        assertEquals(1, completedAcknowledgements.get(0).size());
+        assertEquals(acknowledgementsTp0, completedAcknowledgements.get(0).get(tip0));
+        assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.exception(), completedAcknowledgements.get(0).get(tip0).getAcknowledgeException());
+
+        // We only send acknowledgements for tip1 to node1.
+        assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
+
+        client.prepareResponse(fullAcknowledgeResponse(tip1, Errors.NONE));
+        networkClientDelegate.poll(time.timer(0));
+
+        assertEquals(1, completedAcknowledgements.get(1).size());
+        assertEquals(acknowledgementsTp1, completedAcknowledgements.get(1).get(tip1));
+        assertNull(completedAcknowledgements.get(1).get(tip1).getAcknowledgeException());
+
+    }
+
+    @Test
+    void testLeadershipChangeAfterFetchBeforeCommitSync() {
+        buildRequestManager();
+        shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
+
+        subscriptions.subscribeToShareGroup(Collections.singleton(topicName));
+        Set<TopicPartition> partitions = new HashSet<>();
+        partitions.add(tp0);
+        partitions.add(tp1);
+        subscriptions.assignFromSubscribed(partitions);
+
+        client.updateMetadata(
+                RequestTestUtils.metadataUpdateWithIds(2, singletonMap(topicName, 2),
+                        tp -> validLeaderEpoch, topicIds, false));
+        Node nodeId0 = metadata.fetch().nodeById(0);
+        Node nodeId1 = metadata.fetch().nodeById(1);
+
+        Cluster startingClusterMetadata = metadata.fetch();
+        assertFalse(metadata.updateRequested());
+
+        assertEquals(2, sendFetches());
+        assertFalse(shareConsumeRequestManager.hasCompletedFetches());
+
+        LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> partitionData = new LinkedHashMap<>();
+        partitionData.put(tip0,
+                new ShareFetchResponseData.PartitionData()
+                        .setPartitionIndex(tip0.topicPartition().partition())
+                        .setErrorCode(Errors.NONE.code())
+                        .setRecords(records)
+                        .setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1))
+                        .setAcknowledgeErrorCode(Errors.NONE.code()));
+        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, Collections.emptyList()), nodeId0);
+        partitionData.clear();
+        partitionData.put(tip1,
+                new ShareFetchResponseData.PartitionData()
+                        .setPartitionIndex(tip1.topicPartition().partition())
+                        .setErrorCode(Errors.NONE.code())
+                        .setRecords(records)
+                        .setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 2))
+                        .setAcknowledgeErrorCode(Errors.NONE.code()));
+        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, Collections.emptyList()), nodeId1);
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(shareConsumeRequestManager.hasCompletedFetches());
+
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords = fetchRecords();
+        assertTrue(partitionRecords.containsKey(tp0));
+        assertTrue(partitionRecords.containsKey(tp1));
+
+        List<ConsumerRecord<byte[], byte[]>> fetchedRecords = partitionRecords.get(tp0);
+        assertEquals(1, fetchedRecords.size());
+
+        fetchedRecords = partitionRecords.get(tp1);
+        assertEquals(2, fetchedRecords.size());
+
+        Acknowledgements acknowledgementsTp0 = Acknowledgements.empty();
+        acknowledgementsTp0.add(1L, AcknowledgeType.ACCEPT);
+
+        Acknowledgements acknowledgementsTp1 = Acknowledgements.empty();
+        acknowledgementsTp1.add(1L, AcknowledgeType.ACCEPT);
+        acknowledgementsTp1.add(2L, AcknowledgeType.ACCEPT);
+
+        Map<TopicIdPartition, NodeAcknowledgements> commitAcks = new HashMap<>();
+        commitAcks.put(tip0, new NodeAcknowledgements(0, acknowledgementsTp0));
+        commitAcks.put(tip1, new NodeAcknowledgements(1, acknowledgementsTp1));
+
+        // Move the leadership of tp0 onto node 1
+        HashMap<TopicPartition, Metadata.LeaderIdAndEpoch> partitionLeaders = new HashMap<>();
+        partitionLeaders.put(tp0, new Metadata.LeaderIdAndEpoch(Optional.of(nodeId1.id()), Optional.of(validLeaderEpoch + 1)));
+        metadata.updatePartitionLeadership(partitionLeaders, List.of());
+
+        assertNotEquals(startingClusterMetadata, metadata.fetch());
+
+        // We fail the acknowledgements for records which were received from node0 with NOT_LEADER_OR_FOLLOWER exception.
+        shareConsumeRequestManager.commitSync(commitAcks, calculateDeadlineMs(time.timer(100)));
+
+        // Verify if the callback was invoked with the failed acknowledgements.
+        assertEquals(1, completedAcknowledgements.get(0).size());
+        assertEquals(acknowledgementsTp0, completedAcknowledgements.get(0).get(tip0));
+        assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.exception(), completedAcknowledgements.get(0).get(tip0).getAcknowledgeException());
+
+        // We only send acknowledgements for tip1 to node1.
+        assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
+
+        client.prepareResponse(fullAcknowledgeResponse(tip1, Errors.NONE));
+        networkClientDelegate.poll(time.timer(0));
+
+        assertEquals(1, completedAcknowledgements.get(1).size());
+        assertEquals(acknowledgementsTp1, completedAcknowledgements.get(1).get(tip1));
+        assertNull(completedAcknowledgements.get(1).get(tip1).getAcknowledgeException());
+    }
+
+    @Test
+    void testLeadershipChangeAfterFetchBeforeClose() {
+        buildRequestManager();
+        shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
+
+        subscriptions.subscribeToShareGroup(Collections.singleton(topicName));
+        Set<TopicPartition> partitions = new HashSet<>();
+        partitions.add(tp0);
+        partitions.add(tp1);
+        subscriptions.assignFromSubscribed(partitions);
+
+        client.updateMetadata(
+                RequestTestUtils.metadataUpdateWithIds(2, singletonMap(topicName, 2),
+                        tp -> validLeaderEpoch, topicIds, false));
+        Node nodeId0 = metadata.fetch().nodeById(0);
+        Node nodeId1 = metadata.fetch().nodeById(1);
+
+        Cluster startingClusterMetadata = metadata.fetch();
+        assertFalse(metadata.updateRequested());
+
+        assertEquals(2, sendFetches());
+        assertFalse(shareConsumeRequestManager.hasCompletedFetches());
+
+        LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> partitionData = new LinkedHashMap<>();
+        partitionData.put(tip0,
+                new ShareFetchResponseData.PartitionData()
+                        .setPartitionIndex(tip0.topicPartition().partition())
+                        .setErrorCode(Errors.NONE.code())
+                        .setRecords(records)
+                        .setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1))
+                        .setAcknowledgeErrorCode(Errors.NONE.code()));
+        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, Collections.emptyList()), nodeId0);
+        partitionData.clear();
+        partitionData.put(tip1,
+                new ShareFetchResponseData.PartitionData()
+                        .setPartitionIndex(tip1.topicPartition().partition())
+                        .setErrorCode(Errors.NONE.code())
+                        .setRecords(records)
+                        .setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 2))
+                        .setAcknowledgeErrorCode(Errors.NONE.code()));
+        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, Collections.emptyList()), nodeId1);
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(shareConsumeRequestManager.hasCompletedFetches());
+
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords = fetchRecords();
+        assertTrue(partitionRecords.containsKey(tp0));
+        assertTrue(partitionRecords.containsKey(tp1));
+
+        List<ConsumerRecord<byte[], byte[]>> fetchedRecords = partitionRecords.get(tp0);
+        assertEquals(1, fetchedRecords.size());
+
+        fetchedRecords = partitionRecords.get(tp1);
+        assertEquals(2, fetchedRecords.size());
+
+        Acknowledgements acknowledgementsTp0 = Acknowledgements.empty();
+        acknowledgementsTp0.add(1L, AcknowledgeType.ACCEPT);
+
+        Acknowledgements acknowledgementsTp1 = Acknowledgements.empty();
+        acknowledgementsTp1.add(1L, AcknowledgeType.ACCEPT);
+        acknowledgementsTp1.add(2L, AcknowledgeType.ACCEPT);
+
+        shareConsumeRequestManager.fetch(Collections.singletonMap(tip1, new NodeAcknowledgements(1, acknowledgementsTp1)));
+
+        // Move the leadership of tp0 onto node 1
+        HashMap<TopicPartition, Metadata.LeaderIdAndEpoch> partitionLeaders = new HashMap<>();
+        partitionLeaders.put(tp0, new Metadata.LeaderIdAndEpoch(Optional.of(nodeId1.id()), Optional.of(validLeaderEpoch + 1)));
+        metadata.updatePartitionLeadership(partitionLeaders, List.of());
+
+        assertNotEquals(startingClusterMetadata, metadata.fetch());
+
+        // We fail the acknowledgements for records which were received from node0 with NOT_LEADER_OR_FOLLOWER exception.
+        shareConsumeRequestManager.acknowledgeOnClose(Collections.singletonMap(tip0, new NodeAcknowledgements(0, acknowledgementsTp0)),
+                calculateDeadlineMs(time.timer(100)));
+
+        // Verify if the callback was invoked with the failed acknowledgements.
+        assertEquals(1, completedAcknowledgements.get(0).size());
+        assertEquals(acknowledgementsTp0.getAcknowledgementsTypeMap(), completedAcknowledgements.get(0).get(tip0).getAcknowledgementsTypeMap());
+        assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.exception(), completedAcknowledgements.get(0).get(tip0).getAcknowledgeException());
+        completedAcknowledgements.clear();
+
+        // As we are closing, we still send the request to both the nodes, but with empty acknowledgements to node0, as it is no longer the leader.
+        assertEquals(2, shareConsumeRequestManager.sendAcknowledgements());
+
+        client.prepareResponseFrom(fullAcknowledgeResponse(tip1, Errors.NONE), nodeId1);
+        networkClientDelegate.poll(time.timer(0));
+
+        client.prepareResponseFrom(emptyAcknowledgeResponse(), nodeId0);
+        networkClientDelegate.poll(time.timer(0));
+
+        assertEquals(1, completedAcknowledgements.get(0).size());
+        assertEquals(acknowledgementsTp1, completedAcknowledgements.get(0).get(tip1));
+        assertNull(completedAcknowledgements.get(0).get(tip1).getAcknowledgeException());
+    }
+
     @Test
     void testWhenLeadershipChangedAfterDisconnected() {
         buildRequestManager();