This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2880e041294 KAFKA-18779: Validate responses from broker in client for
ShareFetch and ShareAcknowledge RPCs. (#18939)
2880e041294 is described below
commit 2880e041294639c41525767e644e5ba2564ed6f6
Author: Shivsundar R <[email protected]>
AuthorDate: Mon Feb 24 05:27:24 2025 -0500
KAFKA-18779: Validate responses from broker in client for ShareFetch and
ShareAcknowledge RPCs. (#18939)
- Previously, if the response contained extraneous topic partitions, or
was missing some of the requested partitions, the client processed the
response as-is and even populated the acknowledgement commit callback with
these partitions.
- Such invalid responses are now validated in
`ShareConsumeRequestManager`.
- If the response is missing acknowledgements for any requested
partitions, those acknowledgements are failed with `InvalidRecordStateException`
and the acknowledgement commit callbacks are populated accordingly.
- For any extraneous partitions in the response, we log an error and
ignore them.
This PR also refactors parts of `ShareConsumeRequestManager` to make the
code more readable.
Reviewers: Andrew Schofield <[email protected]>
---
.../internals/ShareConsumeRequestManager.java | 252 +++++++++++++--------
.../internals/ShareConsumeRequestManagerTest.java | 100 +++++++-
2 files changed, 254 insertions(+), 98 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
index 9e8add41a1a..375a99fdf51 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
@@ -28,9 +28,11 @@ import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InvalidRecordStateException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.internals.IdempotentCloser;
import org.apache.kafka.common.message.ShareAcknowledgeRequestData;
+import org.apache.kafka.common.message.ShareAcknowledgeResponseData;
import org.apache.kafka.common.message.ShareFetchRequestData;
import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.protocol.Errors;
@@ -96,6 +98,7 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
private final CompletableFuture<Void> closeFuture;
private boolean isAcknowledgementCommitCallbackRegistered = false;
private final Map<IdAndPartition, String> topicNamesMap = new HashMap<>();
+ private static final String INVALID_RESPONSE = "Acknowledgement not
successful due to invalid response from broker";
ShareConsumeRequestManager(final Time time,
final LogContext logContext,
@@ -366,7 +369,7 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
AtomicBoolean
isAsyncSent) {
boolean asyncSent = true;
try {
- if (acknowledgeRequestState == null ||
(!acknowledgeRequestState.onClose() && acknowledgeRequestState.isEmpty())) {
+ if (acknowledgeRequestState == null ||
(!acknowledgeRequestState.isCloseRequest() &&
acknowledgeRequestState.isEmpty())) {
return Optional.empty();
}
@@ -441,7 +444,7 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
private boolean isRequestStateInProgress(AcknowledgeRequestState
acknowledgeRequestState) {
if (acknowledgeRequestState == null) {
return false;
- } else if (acknowledgeRequestState.onClose()) {
+ } else if (acknowledgeRequestState.isCloseRequest()) {
return !acknowledgeRequestState.isProcessed;
} else {
return !(acknowledgeRequestState.isEmpty());
@@ -716,11 +719,12 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
final Map<TopicIdPartition, ShareFetchResponseData.PartitionData>
responseData = new LinkedHashMap<>();
response.data().responses().forEach(topicResponse ->
- topicResponse.partitions().forEach(partition ->
- responseData.put(new
TopicIdPartition(topicResponse.topicId(),
- partition.partitionIndex(),
-
metadata.topicNames().getOrDefault(topicResponse.topicId(),
- topicNamesMap.remove(new
IdAndPartition(topicResponse.topicId(), partition.partitionIndex())))),
partition))
+ topicResponse.partitions().forEach(partition -> {
+ TopicIdPartition tip =
lookupTopicId(topicResponse.topicId(), partition.partitionIndex());
+ if (tip != null) {
+ responseData.put(tip, partition);
+ }
+ })
);
final Set<TopicPartition> partitions =
responseData.keySet().stream().map(TopicIdPartition::topicPartition).collect(Collectors.toSet());
@@ -749,7 +753,7 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
Errors partitionError =
Errors.forCode(partitionData.errorCode());
if (partitionError == Errors.NOT_LEADER_OR_FOLLOWER ||
partitionError == Errors.FENCED_LEADER_EPOCH) {
- log.debug("For {}, received error {}, with
leaderIdAndEpoch {}", tip, partitionError, partitionData.currentLeader());
+ log.debug("For {}, received error {}, with
leaderIdAndEpoch {} in ShareFetch", tip, partitionError,
partitionData.currentLeader());
if (partitionData.currentLeader().leaderId() != -1 &&
partitionData.currentLeader().leaderEpoch() != -1) {
partitionsWithUpdatedLeaderInfo.put(tip.topicPartition(), new
Metadata.LeaderIdAndEpoch(
Optional.of(partitionData.currentLeader().leaderId()),
Optional.of(partitionData.currentLeader().leaderEpoch())));
@@ -771,6 +775,14 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
}
}
+ // Handle any acknowledgements which were not received in the
response for this node.
+ if (fetchAcknowledgementsInFlight.get(fetchTarget.id()) != null) {
+
fetchAcknowledgementsInFlight.remove(fetchTarget.id()).forEach((partition,
acknowledgements) -> {
+ acknowledgements.complete(new
InvalidRecordStateException(INVALID_RESPONSE));
+
maybeSendShareAcknowledgeCommitCallbackEvent(Map.of(partition,
acknowledgements));
+ });
+ }
+
if (!partitionsWithUpdatedLeaderInfo.isEmpty()) {
List<Node> leaderNodes =
response.data().nodeEndpoints().stream()
.map(e -> new Node(e.nodeId(), e.host(), e.port(),
e.rack()))
@@ -797,13 +809,15 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
}
requestData.topics().forEach(topic ->
topic.partitions().forEach(partition -> {
- TopicIdPartition tip = new TopicIdPartition(topic.topicId(),
- partition.partitionIndex(),
- metadata.topicNames().get(topic.topicId()));
+ TopicIdPartition tip = lookupTopicId(topic.topicId(),
partition.partitionIndex());
+ if (tip == null) {
+ return;
+ }
Map<TopicIdPartition, Acknowledgements>
nodeAcknowledgementsInFlight =
fetchAcknowledgementsInFlight.get(fetchTarget.id());
if (nodeAcknowledgementsInFlight != null) {
Acknowledgements acks =
nodeAcknowledgementsInFlight.remove(tip);
+
if (acks != null) {
metricsManager.recordFailedAcknowledgements(acks.size());
if (error instanceof KafkaException) {
@@ -833,20 +847,21 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
Map<TopicPartition, Metadata.LeaderIdAndEpoch>
partitionsWithUpdatedLeaderInfo = new HashMap<>();
- if (acknowledgeRequestState.onClose()) {
- response.data().responses().forEach(topic ->
topic.partitions().forEach(partition -> {
- TopicIdPartition tip = new
TopicIdPartition(topic.topicId(),
- partition.partitionIndex(),
- metadata.topicNames().get(topic.topicId()));
- if (partition.errorCode() != Errors.NONE.code()) {
+ if (acknowledgeRequestState.isCloseRequest()) {
+ response.data().responses().forEach(topicResponse ->
topicResponse.partitions().forEach(partitionData -> {
+ TopicIdPartition tip =
lookupTopicId(topicResponse.topicId(), partitionData.partitionIndex());
+ if (tip == null) {
+ return;
+ }
+
+ if (partitionData.errorCode() != Errors.NONE.code()) {
metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getInFlightAcknowledgementsCount(tip));
}
- acknowledgeRequestState.handleAcknowledgeErrorCode(tip,
Errors.forCode(partition.errorCode()));
+ acknowledgeRequestState.handleAcknowledgeErrorCode(tip,
Errors.forCode(partitionData.errorCode()));
}));
acknowledgeRequestState.onSuccessfulAttempt(responseCompletionTimeMs);
acknowledgeRequestState.processingComplete();
-
} else {
if
(!acknowledgeRequestState.sessionHandler.handleResponse(response,
resp.requestHeader().apiVersion())) {
// Received a response-level error code.
@@ -856,59 +871,23 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
// We retry the request until the timer expires,
unless we are closing.
acknowledgeRequestState.moveAllToIncompleteAcks();
} else {
-
response.data().responses().forEach(shareAcknowledgeTopicResponse ->
shareAcknowledgeTopicResponse.partitions().forEach(partitionData -> {
- TopicIdPartition tip = new
TopicIdPartition(shareAcknowledgeTopicResponse.topicId(),
- partitionData.partitionIndex(),
-
metadata.topicNames().get(shareAcknowledgeTopicResponse.topicId()));
-
-
acknowledgeRequestState.handleAcknowledgeErrorCode(tip, response.error());
- }));
+
acknowledgeRequestState.processPendingInFlightAcknowledgements(response.error().exception());
acknowledgeRequestState.processingComplete();
}
} else {
AtomicBoolean shouldRetry = new AtomicBoolean(false);
// Check all partition level error codes
-
response.data().responses().forEach(shareAcknowledgeTopicResponse ->
shareAcknowledgeTopicResponse.partitions().forEach(partitionData -> {
+ response.data().responses().forEach(topicResponse ->
topicResponse.partitions().forEach(partitionData -> {
Errors partitionError =
Errors.forCode(partitionData.errorCode());
- TopicIdPartition tip = new
TopicIdPartition(shareAcknowledgeTopicResponse.topicId(),
- partitionData.partitionIndex(),
-
metadata.topicNames().get(shareAcknowledgeTopicResponse.topicId()));
- if (partitionError.exception() != null) {
- boolean retry = false;
-
- if (partitionError ==
Errors.NOT_LEADER_OR_FOLLOWER || partitionError == Errors.FENCED_LEADER_EPOCH) {
- // If the leader has changed, there's no point
in retrying the operation because the acquisition locks
- // will have been released.
- TopicPartition tp = new
TopicPartition(metadata.topicNames().get(shareAcknowledgeTopicResponse.topicId()),
partitionData.partitionIndex());
-
- log.debug("For {}, received error {}, with
leaderIdAndEpoch {}", tp, partitionError, partitionData.currentLeader());
- if (partitionData.currentLeader().leaderId()
!= -1 && partitionData.currentLeader().leaderEpoch() != -1) {
- partitionsWithUpdatedLeaderInfo.put(tp,
new Metadata.LeaderIdAndEpoch(
-
Optional.of(partitionData.currentLeader().leaderId()),
Optional.of(partitionData.currentLeader().leaderEpoch())));
- }
- } else if (partitionError.exception() instanceof
RetriableException) {
- retry = true;
- }
-
- if (retry) {
- // Move to incomplete acknowledgements to retry
-
acknowledgeRequestState.moveToIncompleteAcks(tip);
- shouldRetry.set(true);
- } else {
-
metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getInFlightAcknowledgementsCount(tip));
-
acknowledgeRequestState.handleAcknowledgeErrorCode(tip, partitionError);
- }
- } else {
-
acknowledgeRequestState.handleAcknowledgeErrorCode(tip, partitionError);
+ TopicIdPartition tip =
lookupTopicId(topicResponse.topicId(), partitionData.partitionIndex());
+ if (tip == null) {
+ return;
}
+
+ handlePartitionError(partitionData,
partitionsWithUpdatedLeaderInfo, acknowledgeRequestState, partitionError, tip,
shouldRetry);
}));
- if (shouldRetry.get()) {
-
acknowledgeRequestState.onFailedAttempt(responseCompletionTimeMs);
- } else {
-
acknowledgeRequestState.onSuccessfulAttempt(responseCompletionTimeMs);
- acknowledgeRequestState.processingComplete();
- }
+ processRetryLogic(acknowledgeRequestState, shouldRetry,
responseCompletionTimeMs);
}
}
@@ -927,7 +906,7 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
log.debug("Removing pending request for node {} - success",
fetchTarget.id());
nodesWithPendingRequests.remove(fetchTarget.id());
- if (acknowledgeRequestState.onClose()) {
+ if (acknowledgeRequestState.isCloseRequest()) {
log.debug("Removing node from ShareSession {}",
fetchTarget.id());
sessionHandlers.remove(fetchTarget.id());
}
@@ -945,9 +924,11 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
acknowledgeRequestState.onFailedAttempt(responseCompletionTimeMs);
requestData.topics().forEach(topic ->
topic.partitions().forEach(partition -> {
- TopicIdPartition tip = new TopicIdPartition(topic.topicId(),
- partition.partitionIndex(),
- metadata.topicNames().get(topic.topicId()));
+ TopicIdPartition tip = lookupTopicId(topic.topicId(),
partition.partitionIndex());
+ if (tip == null) {
+ return;
+ }
+
metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getInFlightAcknowledgementsCount(tip));
acknowledgeRequestState.handleAcknowledgeErrorCode(tip,
Errors.forException(error));
}));
@@ -957,13 +938,81 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
log.debug("Removing pending request for node {} - failed",
fetchTarget.id());
nodesWithPendingRequests.remove(fetchTarget.id());
- if (acknowledgeRequestState.onClose()) {
+ if (acknowledgeRequestState.isCloseRequest()) {
log.debug("Removing node from ShareSession {}",
fetchTarget.id());
sessionHandlers.remove(fetchTarget.id());
}
}
}
+ private void
handlePartitionError(ShareAcknowledgeResponseData.PartitionData partitionData,
+ Map<TopicPartition,
Metadata.LeaderIdAndEpoch> partitionsWithUpdatedLeaderInfo,
+ AcknowledgeRequestState
acknowledgeRequestState,
+ Errors partitionError,
+ TopicIdPartition tip,
+ AtomicBoolean shouldRetry) {
+ if (partitionError.exception() != null) {
+ boolean retry = false;
+ if (partitionError == Errors.NOT_LEADER_OR_FOLLOWER ||
partitionError == Errors.FENCED_LEADER_EPOCH) {
+ // If the leader has changed, there's no point in retrying the
operation because the acquisition locks
+ // will have been released.
+ updateLeaderInfoMap(partitionData,
partitionsWithUpdatedLeaderInfo, partitionError, tip.topicPartition());
+ } else if (partitionError.exception() instanceof
RetriableException) {
+ retry = true;
+ }
+
+ if (retry) {
+ if (acknowledgeRequestState.moveToIncompleteAcks(tip)) {
+ shouldRetry.set(true);
+ }
+ } else {
+
metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getInFlightAcknowledgementsCount(tip));
+ acknowledgeRequestState.handleAcknowledgeErrorCode(tip,
partitionError);
+ }
+ } else {
+ acknowledgeRequestState.handleAcknowledgeErrorCode(tip,
partitionError);
+ }
+ }
+
+ private void processRetryLogic(AcknowledgeRequestState
acknowledgeRequestState,
+ AtomicBoolean shouldRetry,
+ long responseCompletionTimeMs) {
+ if (shouldRetry.get()) {
+ acknowledgeRequestState.onFailedAttempt(responseCompletionTimeMs);
+
+ // Check for any acknowledgements that did not receive a response.
+ // These acknowledgements are failed with
InvalidRecordStateException.
+ acknowledgeRequestState.processPendingInFlightAcknowledgements(new
InvalidRecordStateException(INVALID_RESPONSE));
+ } else {
+
acknowledgeRequestState.onSuccessfulAttempt(responseCompletionTimeMs);
+ acknowledgeRequestState.processingComplete();
+ }
+ }
+
+ private void
updateLeaderInfoMap(ShareAcknowledgeResponseData.PartitionData partitionData,
+ Map<TopicPartition,
Metadata.LeaderIdAndEpoch> partitionsWithUpdatedLeaderInfo,
+ Errors partitionError,
+ TopicPartition tp) {
+
+ log.debug("For {}, received error {}, with leaderIdAndEpoch {} in
ShareAcknowledge", tp, partitionError, partitionData.currentLeader());
+ if (partitionData.currentLeader().leaderId() != -1 &&
partitionData.currentLeader().leaderEpoch() != -1) {
+ partitionsWithUpdatedLeaderInfo.put(tp, new
Metadata.LeaderIdAndEpoch(
+ Optional.of(partitionData.currentLeader().leaderId()),
+ Optional.of(partitionData.currentLeader().leaderEpoch())
+ ));
+ }
+ }
+
+ private TopicIdPartition lookupTopicId(Uuid topicId, int partitionIndex) {
+ String topicName = metadata.topicNames().getOrDefault(topicId,
+ topicNamesMap.remove(new IdAndPartition(topicId,
partitionIndex)));
+ if (topicName == null) {
+ log.error("Topic name not found in metadata for topicId {} and
partitionIndex {}", topicId, partitionIndex);
+ return null;
+ }
+ return new TopicIdPartition(topicId, partitionIndex, topicName);
+ }
+
private List<TopicPartition> partitionsToFetch() {
return subscriptions.fetchablePartitions(tp -> true);
}
@@ -1061,7 +1110,7 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
UnsentRequest buildRequest() {
// If this is the closing request, close the share session by
setting the final epoch
- if (onClose()) {
+ if (isCloseRequest()) {
sessionHandler.notifyClose();
}
@@ -1144,11 +1193,13 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
* through a background event.
*/
void handleAcknowledgeErrorCode(TopicIdPartition tip, Errors
acknowledgeErrorCode) {
- Acknowledgements acks = inFlightAcknowledgements.get(tip);
+ Acknowledgements acks = inFlightAcknowledgements.remove(tip);
if (acks != null) {
acks.complete(acknowledgeErrorCode.exception());
+ resultHandler.complete(tip, acks, requestType);
+ } else {
+ log.error("Invalid partition {} received in ShareAcknowledge
response", tip);
}
- resultHandler.complete(tip, acks, onCommitAsync());
}
/**
@@ -1159,8 +1210,8 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
Acknowledgements acks = incompleteAcknowledgements.get(tip);
if (acks != null) {
acks.complete(Errors.REQUEST_TIMED_OUT.exception());
+ resultHandler.complete(tip, acks, requestType);
}
- resultHandler.complete(tip, acks, onCommitAsync());
}
/**
@@ -1176,7 +1227,7 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
if (acks != null) {
acks.complete(errorCode.exception());
}
- resultHandler.complete(tip, acks, onCommitAsync());
+ resultHandler.complete(tip, acks, requestType);
});
acknowledgementsMapToClear.clear();
processingComplete();
@@ -1187,11 +1238,26 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
}
void processingComplete() {
- inFlightAcknowledgements.clear();
+ // If there are any pending inFlightAcknowledgements after
processing the response, we fail them with an InvalidRecordStateException.
+ processPendingInFlightAcknowledgements(new
InvalidRecordStateException(INVALID_RESPONSE));
resultHandler.completeIfEmpty();
isProcessed = true;
}
+ /**
+ * Fail any existing in-flight acknowledgements with the given
exception and clear the map.
+ * We also send a background event to update {@link
org.apache.kafka.clients.consumer.AcknowledgementCommitCallback }
+ */
+ private void processPendingInFlightAcknowledgements(KafkaException
exception) {
+ if (!inFlightAcknowledgements.isEmpty()) {
+ inFlightAcknowledgements.forEach((partition, acknowledgements)
-> {
+ acknowledgements.complete(exception);
+ resultHandler.complete(partition, acknowledgements,
requestType);
+ });
+ inFlightAcknowledgements.clear();
+ }
+ }
+
/**
* Moves all the in-flight acknowledgements to incomplete
acknowledgements to retry
* in the next request.
@@ -1208,21 +1274,25 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
/**
* Moves the in-flight acknowledgements for a given partition to
incomplete acknowledgements to retry
* in the next request.
+ *
+ * @param tip The TopicIdPartition for which we move the
acknowledgements.
+ * @return True if the partition was sent in the request.
+ * <p> False if the partition was not part of the request, we log an
error and ignore such partitions. </p>
*/
- public void moveToIncompleteAcks(TopicIdPartition tip) {
+ public boolean moveToIncompleteAcks(TopicIdPartition tip) {
Acknowledgements acks = inFlightAcknowledgements.remove(tip);
if (acks != null) {
incompleteAcknowledgements.put(tip, acks);
+ return true;
+ } else {
+ log.error("Invalid partition {} received in ShareAcknowledge
response", tip);
+ return false;
}
}
- public boolean onClose() {
+ public boolean isCloseRequest() {
return requestType == AcknowledgeRequestType.CLOSE;
}
-
- public boolean onCommitAsync() {
- return requestType == AcknowledgeRequestType.COMMIT_ASYNC;
- }
}
/**
@@ -1251,21 +1321,19 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
* Handle the result of a ShareAcknowledge request sent to one or more
nodes and
* signal the completion when all results are known.
*/
- public void complete(TopicIdPartition partition, Acknowledgements
acknowledgements, boolean isCommitAsync) {
- if (!isCommitAsync && acknowledgements != null) {
- result.put(partition, acknowledgements);
- }
-
- // For commitAsync, we do not wait for other results to complete,
we prepare a background event
- // for every ShareAcknowledgeResponse.
- // For commitAsync, we send out a background event for every
TopicIdPartition, so we use a singletonMap each time.
- if (isCommitAsync) {
+ public void complete(TopicIdPartition partition, Acknowledgements
acknowledgements, AcknowledgeRequestType type) {
+ if (type.equals(AcknowledgeRequestType.COMMIT_ASYNC)) {
if (acknowledgements != null) {
maybeSendShareAcknowledgeCommitCallbackEvent(Map.of(partition,
acknowledgements));
}
- } else if (remainingResults != null &&
remainingResults.decrementAndGet() == 0) {
- maybeSendShareAcknowledgeCommitCallbackEvent(result);
- future.ifPresent(future -> future.complete(result));
+ } else {
+ if (acknowledgements != null) {
+ result.put(partition, acknowledgements);
+ }
+ if (remainingResults != null &&
remainingResults.decrementAndGet() == 0) {
+ maybeSendShareAcknowledgeCommitCallbackEvent(result);
+ future.ifPresent(future -> future.complete(result));
+ }
}
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
index 017491090b0..2ce7c9c5da3 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
@@ -38,6 +38,7 @@ import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.InvalidRecordStateException;
import org.apache.kafka.common.errors.ShareSessionNotFoundException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.UnknownServerException;
@@ -567,16 +568,16 @@ public class ShareConsumeRequestManagerTest {
ShareConsumeRequestManager.ResultHandler resultHandler =
shareConsumeRequestManager.buildResultHandler(null, Optional.empty());
// Passing null acknowledgements should mean we do not send the
background event at all.
- resultHandler.complete(tip0, null, true);
+ resultHandler.complete(tip0, null,
ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_ASYNC);
assertEquals(0, completedAcknowledgements.size());
// Setting isCommitAsync to false should still not send any background
event
// as we have initialized remainingResults to null.
- resultHandler.complete(tip0, acknowledgements, false);
+ resultHandler.complete(tip0, acknowledgements,
ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_SYNC);
assertEquals(0, completedAcknowledgements.size());
// Sending non-null acknowledgements means we do send the background
event
- resultHandler.complete(tip0, acknowledgements, true);
+ resultHandler.complete(tip0, acknowledgements,
ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_ASYNC);
assertEquals(3, completedAcknowledgements.get(0).get(tip0).size());
}
@@ -599,16 +600,16 @@ public class ShareConsumeRequestManagerTest {
ShareConsumeRequestManager.ResultHandler resultHandler =
shareConsumeRequestManager.buildResultHandler(resultCount, Optional.of(future));
// We only send the background event after all results have been
completed.
- resultHandler.complete(tip0, acknowledgements, false);
+ resultHandler.complete(tip0, acknowledgements,
ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_SYNC);
assertEquals(0, completedAcknowledgements.size());
assertFalse(future.isDone());
- resultHandler.complete(t2ip0, null, false);
+ resultHandler.complete(t2ip0, null,
ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_SYNC);
assertEquals(0, completedAcknowledgements.size());
assertFalse(future.isDone());
// After third response is received, we send the background event.
- resultHandler.complete(tip1, acknowledgements, false);
+ resultHandler.complete(tip1, acknowledgements,
ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_SYNC);
assertEquals(1, completedAcknowledgements.size());
assertEquals(2, completedAcknowledgements.get(0).size());
assertEquals(3, completedAcknowledgements.get(0).get(tip0).size());
@@ -1315,6 +1316,87 @@ public class ShareConsumeRequestManagerTest {
assertThrows(NullPointerException.class, (Executable)
shareFetch.records().get(t2p0));
}
+ @Test
+ public void testShareFetchInvalidResponse() {
+ buildRequestManager();
+
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
+
+ subscriptions.subscribeToShareGroup(Collections.singleton(topicName));
+ subscriptions.assignFromSubscribed(Collections.singleton(tp0));
+
+ client.updateMetadata(
+ RequestTestUtils.metadataUpdateWithIds(1, Map.of(topicName, 1),
+ tp -> validLeaderEpoch, topicIds, false));
+
+ assertEquals(1, sendFetches());
+ assertFalse(shareConsumeRequestManager.hasCompletedFetches());
+
+ client.prepareResponse(fullFetchResponse(t2ip0, records,
acquiredRecords, Errors.NONE));
+ networkClientDelegate.poll(time.timer(0));
+ assertFalse(shareConsumeRequestManager.hasCompletedFetches());
+ }
+
+ @Test
+ public void testShareAcknowledgeInvalidResponse() throws
InterruptedException {
+ buildRequestManager();
+
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
+
+ subscriptions.subscribeToShareGroup(Collections.singleton(topicName));
+ subscriptions.assignFromSubscribed(Collections.singleton(tp0));
+
+ client.updateMetadata(
+ RequestTestUtils.metadataUpdateWithIds(1, Map.of(topicName, 1),
+ tp -> validLeaderEpoch, topicIds, false));
+
+ assertEquals(1, sendFetches());
+ assertFalse(shareConsumeRequestManager.hasCompletedFetches());
+
+ client.prepareResponse(fullFetchResponse(tip0, records,
acquiredRecords, Errors.NONE));
+ networkClientDelegate.poll(time.timer(0));
+ assertTrue(shareConsumeRequestManager.hasCompletedFetches());
+
+ fetchRecords();
+
+ Acknowledgements acknowledgements = Acknowledgements.empty();
+ acknowledgements.add(1L, AcknowledgeType.ACCEPT);
+
+ shareConsumeRequestManager.commitAsync(Map.of(tip0, new
NodeAcknowledgements(0, acknowledgements)));
+
+ assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
+
+ // If a top-level error is received, we still retry the
acknowledgements independent of the topic-partitions received in the response.
+ client.prepareResponse(acknowledgeResponseWithTopLevelError(t2ip0,
Errors.LEADER_NOT_AVAILABLE));
+ networkClientDelegate.poll(time.timer(0));
+
+ assertEquals(1,
shareConsumeRequestManager.requestStates(0).getAsyncRequest().getIncompleteAcknowledgementsCount(tip0));
+
+ TestUtils.retryOnExceptionWithTimeout(() -> assertEquals(1,
shareConsumeRequestManager.sendAcknowledgements()));
+
+ client.prepareResponse(fullAcknowledgeResponse(t2ip0, Errors.NONE));
+ networkClientDelegate.poll(time.timer(0));
+
+ // If we do not get the expected partitions in the response, we fail
these acknowledgements with InvalidRecordStateException.
+ assertEquals(InvalidRecordStateException.class,
completedAcknowledgements.get(0).get(tip0).getAcknowledgeException().getClass());
+ completedAcknowledgements.clear();
+
+ // Send remaining acknowledgements through piggybacking on the next
fetch.
+ Acknowledgements acknowledgements1 = Acknowledgements.empty();
+ acknowledgements1.add(2L, AcknowledgeType.ACCEPT);
+ acknowledgements1.add(3L, AcknowledgeType.REJECT);
+
+ shareConsumeRequestManager.fetch(Map.of(tip0, new
NodeAcknowledgements(0, acknowledgements1)));
+
+ assertEquals(1, sendFetches());
+ assertFalse(shareConsumeRequestManager.hasCompletedFetches());
+
+ client.prepareResponse(fullFetchResponse(t2ip0, records,
acquiredRecords, Errors.NONE));
+ networkClientDelegate.poll(time.timer(0));
+ assertFalse(shareConsumeRequestManager.hasCompletedFetches());
+
+ // If we do not get the expected partitions in the response, we fail
these acknowledgements with InvalidRecordStateException.
+ assertEquals(InvalidRecordStateException.class,
completedAcknowledgements.get(0).get(tip0).getAcknowledgeException().getClass());
+ }
+
@Test
public void testCloseShouldBeIdempotent() {
buildRequestManager();
@@ -2378,6 +2460,12 @@ public class ShareConsumeRequestManagerTest {
return ShareAcknowledgeResponse.of(Errors.NONE, 0, new
LinkedHashMap<>(partitions), Collections.emptyList());
}
+ private ShareAcknowledgeResponse
acknowledgeResponseWithTopLevelError(TopicIdPartition tp, Errors error) {
+ Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData>
partitions = Map.of(tp,
+ partitionDataForAcknowledge(tp, Errors.NONE));
+ return ShareAcknowledgeResponse.of(error, 0, new
LinkedHashMap<>(partitions), Collections.emptyList());
+ }
+
private ShareAcknowledgeResponse fullAcknowledgeResponse(TopicIdPartition
tp, Errors error) {
Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData>
partitions = Map.of(tp,
partitionDataForAcknowledge(tp, error));