This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.1 by this push:
new e76f273cc70 KAFKA-19485: Added check before sending acknowledgements
on initial epoch. (#20135)(Cherry-pick) (#20670)
e76f273cc70 is described below
commit e76f273cc7049889e5e5d2bb3adbd0a4cc680efa
Author: Shivsundar R <[email protected]>
AuthorDate: Mon Oct 13 08:28:35 2025 -0400
KAFKA-19485: Added check before sending acknowledgements on initial epoch.
(#20135)(Cherry-pick) (#20670)
*What*
https://issues.apache.org/jira/browse/KAFKA-19485
**Bug:**
There is a bug in `ShareConsumeRequestManager` where acknowledgements are
still added to the fetch request on the initial `ShareSession` epoch, even
though the code checks for that case and has already failed them.
This PR fixes that so acknowledgements are only included in the request
when the share session epoch is valid for sending them. It also adds the
same check at another point in the code where we could potentially be
sending such acknowledgements. One such case is when metadata is refreshed
with empty topic IDs after a broker restart, which means leader information
would not be available for the node:
- Consumer subscribed to a partition whose leader was node-0.
- Broker restart happens and node-0 is elected leader again. Broker
starts a new `ShareSession`.
- Background thread sends a fetch request with **non-zero** epoch.
- Broker responds with `SHARE_SESSION_NOT_FOUND`.
- Client updates session epoch to 0 once it receives this error.
- Client updates metadata but receives empty metadata response. (Leader
unavailable)
- Application thread finishes processing the previous fetch and sends
acks to piggyback on the next fetch.
- The next fetch sends the piggybacked acknowledgements for the
previously subscribed partitions, resulting in an error from the broker
("`Acknowledge data present on initial epoch`"). (Currently we attempt
to send even if the leader is unavailable.)
**Fix:** Add a check before sending out acknowledgements if we are on
an initial epoch.
Added unit test covering the above scenario.
Reviewers: Andrew Schofield <[email protected]>
---
.../internals/ShareConsumeRequestManager.java | 47 +++++++++++++++++-----
.../internals/ShareConsumeRequestManagerTest.java | 41 +++++++++++++++++++
2 files changed, 78 insertions(+), 10 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
index b36b778546b..e8a40bd66f3 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
@@ -175,22 +175,25 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
TopicIdPartition tip = new TopicIdPartition(topicId,
partition);
Acknowledgements acknowledgementsToSend = null;
+ boolean canSendAcknowledgements = true;
+
Map<TopicIdPartition, Acknowledgements> nodeAcksFromFetchMap =
fetchAcknowledgementsToSend.get(node.id());
if (nodeAcksFromFetchMap != null) {
acknowledgementsToSend = nodeAcksFromFetchMap.remove(tip);
+
if (acknowledgementsToSend != null) {
- if (handler.isNewSession()) {
- // Failing the acknowledgements as we cannot have
piggybacked acknowledgements in the initial ShareFetchRequest.
-
acknowledgementsToSend.complete(Errors.INVALID_SHARE_SESSION_EPOCH.exception());
-
maybeSendShareAcknowledgeCommitCallbackEvent(Map.of(tip,
acknowledgementsToSend));
- } else {
-
metricsManager.recordAcknowledgementSent(acknowledgementsToSend.size());
-
fetchAcknowledgementsInFlight.computeIfAbsent(node.id(), k -> new
HashMap<>()).put(tip, acknowledgementsToSend);
+ // Check if the share session epoch is valid for
sending acknowledgements.
+ if (!maybeAddAcknowledgements(handler, node, tip,
acknowledgementsToSend)) {
+ canSendAcknowledgements = false;
}
}
}
- handler.addPartitionToFetch(tip, acknowledgementsToSend);
+ if (canSendAcknowledgements) {
+ handler.addPartitionToFetch(tip, acknowledgementsToSend);
+ } else {
+ handler.addPartitionToFetch(tip, null);
+ }
topicNamesMap.putIfAbsent(new IdAndPartition(tip.topicId(),
tip.partition()), tip.topic());
log.debug("Added fetch request for partition {} to node {}",
tip, node.id());
@@ -212,8 +215,10 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
if (nodeAcksFromFetchMap != null) {
nodeAcksFromFetchMap.forEach((tip, acks) -> {
if (!isLeaderKnownToHaveChanged(nodeId, tip)) {
-
metricsManager.recordAcknowledgementSent(acks.size());
-
fetchAcknowledgementsInFlight.computeIfAbsent(node.id(), k -> new
HashMap<>()).put(tip, acks);
+ // Check if the share session epoch is valid
for sending acknowledgements.
+ if (!maybeAddAcknowledgements(sessionHandler,
node, tip, acks)) {
+ return;
+ }
sessionHandler.addPartitionToAcknowledgeOnly(tip, acks);
handlerMap.put(node, sessionHandler);
@@ -256,6 +261,28 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
return new PollResult(requests);
}
+ /**
+ *
+ * @return True if we can add acknowledgements to the share session.
+ * If we cannot add acknowledgements, they are completed with {@link
Errors#INVALID_SHARE_SESSION_EPOCH} exception.
+ */
+ private boolean maybeAddAcknowledgements(ShareSessionHandler handler,
+ Node node,
+ TopicIdPartition tip,
+ Acknowledgements
acknowledgements) {
+ if (handler.isNewSession()) {
+ // Failing the acknowledgements as we cannot have piggybacked
acknowledgements in the initial ShareFetchRequest.
+ log.debug("Cannot send acknowledgements on initial epoch for
ShareSession for partition {}", tip);
+
acknowledgements.complete(Errors.INVALID_SHARE_SESSION_EPOCH.exception());
+ maybeSendShareAcknowledgeCommitCallbackEvent(Map.of(tip,
acknowledgements));
+ return false;
+ } else {
+ metricsManager.recordAcknowledgementSent(acknowledgements.size());
+ fetchAcknowledgementsInFlight.computeIfAbsent(node.id(), k -> new
HashMap<>()).put(tip, acknowledgements);
+ return true;
+ }
+ }
+
public void fetch(Map<TopicIdPartition, NodeAcknowledgements>
acknowledgementsMap,
Map<TopicIdPartition, NodeAcknowledgements>
controlRecordAcknowledgements) {
if (!fetchMoreRecords) {
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
index 5cdde2df6ab..0ce3a524880 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
@@ -1407,8 +1407,49 @@ public class ShareConsumeRequestManagerTest {
shareConsumeRequestManager.fetch(Map.of(tip0, new
NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap());
+ NetworkClientDelegate.PollResult pollResult =
shareConsumeRequestManager.sendFetchesReturnPollResult();
+ assertEquals(1, pollResult.unsentRequests.size());
+ ShareFetchRequest.Builder builder = (ShareFetchRequest.Builder)
pollResult.unsentRequests.get(0).requestBuilder();
+ assertEquals(1, builder.data().topics().size());
+ // We should not add the acknowledgements as part of the request.
+ assertEquals(0,
builder.data().topics().find(tip0.topicId()).partitions().find(0).acknowledgementBatches().size());
+
+ assertEquals(3, completedAcknowledgements.get(0).get(tip0).size());
+ assertEquals(Errors.INVALID_SHARE_SESSION_EPOCH.exception(),
completedAcknowledgements.get(0).get(tip0).getAcknowledgeException());
+ }
+
+ @Test
+ public void
testPiggybackAcknowledgementsOnInitialShareSessionErrorSubscriptionChange() {
+ buildRequestManager();
+
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
+
+ assignFromSubscribed(singleton(tp0));
sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);
+ fetchRecords();
+
+ // Simulate a broker restart, but no leader change, this resets share
session epoch to 0.
+ assertEquals(1, sendFetches());
+ assertFalse(shareConsumeRequestManager.hasCompletedFetches());
+ client.prepareResponse(fetchResponseWithTopLevelError(tip0,
Errors.SHARE_SESSION_NOT_FOUND));
+ networkClientDelegate.poll(time.timer(0));
+
+ // Simulate a metadata update with no topics in the response.
+ client.updateMetadata(
+ RequestTestUtils.metadataUpdateWithIds(1,
Collections.emptyMap(),
+ tp -> validLeaderEpoch, null, false));
+
+ // The acknowledgements for the initial fetch from tip0 are processed
now and sent to the background thread.
+ Acknowledgements acknowledgements = getAcknowledgements(1,
AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
+ shareConsumeRequestManager.fetch(Map.of(tip0, new
NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap());
+
+ assertEquals(0, completedAcknowledgements.size());
+
+ // Next fetch would not include any acknowledgements.
+ NetworkClientDelegate.PollResult pollResult =
shareConsumeRequestManager.sendFetchesReturnPollResult();
+ assertEquals(0, pollResult.unsentRequests.size());
+
+ // We should fail any waiting acknowledgements for tip-0 as it would
have a share session epoch equal to 0.
assertEquals(3, completedAcknowledgements.get(0).get(tip0).size());
assertEquals(Errors.INVALID_SHARE_SESSION_EPOCH.exception(),
completedAcknowledgements.get(0).get(tip0).getAcknowledgeException());
}