This is an automated email from the ASF dual-hosted git repository.
manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8637b6a0ffb KAFKA-17711: Minor cleanup changes in
ShareConsumeRequestManager (#17392)
8637b6a0ffb is described below
commit 8637b6a0ffb04d539c7d681753e4c49d5b04e21a
Author: ShivsundarR <[email protected]>
AuthorDate: Mon Oct 7 12:33:48 2024 -0400
KAFKA-17711: Minor cleanup changes in ShareConsumeRequestManager (#17392)
What
Minor cleanup and javadoc changes in ShareConsumeRequestManager.
Reviewers: Andrew Schofield <[email protected]>, Manikumar Reddy
<[email protected]>
---
.../internals/ShareConsumeRequestManager.java | 63 +++++++++++++---------
1 file changed, 39 insertions(+), 24 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
index b7f77027b2c..1ca5f02a7d7 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
@@ -208,6 +208,7 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
fetchMoreRecords = true;
}
+ // The acknowledgements sent via ShareFetch are stored in this map.
acknowledgementsMap.forEach((tip, acks) ->
fetchAcknowledgementsMap.merge(tip, acks, Acknowledgements::merge));
}
@@ -220,38 +221,38 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
*/
private PollResult processAcknowledgements(long currentTimeMs) {
List<UnsentRequest> unsentRequests = new ArrayList<>();
- AtomicBoolean isAsyncDone = new AtomicBoolean();
+ AtomicBoolean isAsyncSent = new AtomicBoolean();
for (Map.Entry<Integer, Tuple<AcknowledgeRequestState>> requestStates
: acknowledgeRequestStates.entrySet()) {
int nodeId = requestStates.getKey();
if (!isNodeFree(nodeId)) {
log.trace("Skipping acknowledge request because previous
request to {} has not been processed, so acks are not sent", nodeId);
} else {
- isAsyncDone.set(false);
- // For commitAsync
- maybeBuildRequest(requestStates.getValue().getAsyncRequest(),
currentTimeMs, true, isAsyncDone).ifPresent(unsentRequests::add);
+ isAsyncSent.set(false);
+ // First, the acknowledgements from commitAsync are sent.
+ maybeBuildRequest(requestStates.getValue().getAsyncRequest(),
currentTimeMs, true, isAsyncSent).ifPresent(unsentRequests::add);
+
// Check to ensure we start processing commitSync/close only
if there are no commitAsync requests left to process.
- if (isAsyncDone.get()) {
+ if (isAsyncSent.get()) {
+ if (!isNodeFree(nodeId)) {
+ log.trace("Skipping acknowledge request because
previous request to {} has not been processed, so acks are not sent", nodeId);
+ continue;
+ }
+
// We try to process the close request only if we have
processed the async and the sync requests for the node.
if (requestStates.getValue().getSyncRequestQueue() ==
null) {
- if (!isNodeFree(nodeId)) {
- log.trace("Skipping acknowledge request because
previous request to {} has not been processed, so acks are not sent", nodeId);
- } else {
- AcknowledgeRequestState closeRequestState =
requestStates.getValue().getCloseRequest();
+ AcknowledgeRequestState closeRequestState =
requestStates.getValue().getCloseRequest();
- maybeBuildRequest(closeRequestState,
currentTimeMs, false, isAsyncDone).ifPresent(unsentRequests::add);
- }
+ maybeBuildRequest(closeRequestState, currentTimeMs,
false, isAsyncSent).ifPresent(unsentRequests::add);
} else {
+ // Processing the acknowledgements from commitSync
for (AcknowledgeRequestState acknowledgeRequestState :
requestStates.getValue().getSyncRequestQueue()) {
- if (!isNodeFree(nodeId)) {
- log.trace("Skipping acknowledge request
because previous request to {} has not been processed, so acks are not sent",
nodeId);
- break;
- }
- maybeBuildRequest(acknowledgeRequestState,
currentTimeMs, false, isAsyncDone).ifPresent(unsentRequests::add);
+ maybeBuildRequest(acknowledgeRequestState,
currentTimeMs, false, isAsyncSent).ifPresent(unsentRequests::add);
}
}
}
}
+
}
PollResult pollResult = null;
@@ -284,11 +285,20 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
}
}
+ /**
+ *
+ * @param acknowledgeRequestState Contains the acknowledgements to be sent.
+ * @param currentTimeMs The current time in ms.
+ * @param onCommitAsync Boolean to denote if the acknowledgements came
from a commitAsync or not.
+ * @param isAsyncSent Boolean to indicate if the async request has been
sent.
+ *
+ * @return Returns the request if it was built.
+ */
private Optional<UnsentRequest> maybeBuildRequest(AcknowledgeRequestState
acknowledgeRequestState,
long currentTimeMs,
boolean onCommitAsync,
- AtomicBoolean
isAsyncDone) {
- boolean asyncDone = true;
+ AtomicBoolean
isAsyncSent) {
+ boolean asyncSent = true;
try {
if (acknowledgeRequestState == null ||
(!acknowledgeRequestState.onClose() && acknowledgeRequestState.isEmpty())) {
return Optional.empty();
@@ -306,13 +316,13 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
if (!acknowledgeRequestState.canSendRequest(currentTimeMs)) {
// We wait for the backoff before we can send this request.
- asyncDone = false;
+ asyncSent = false;
return Optional.empty();
}
UnsentRequest request = acknowledgeRequestState.buildRequest();
if (request == null) {
- asyncDone = false;
+ asyncSent = false;
return Optional.empty();
}
@@ -320,14 +330,15 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
return Optional.of(request);
} finally {
if (onCommitAsync) {
- isAsyncDone.set(asyncDone);
+ isAsyncSent.set(asyncSent);
}
}
}
/**
- * Prunes the empty acknowledgementRequestStates.
- * Returns true if there are still any acknowledgements left to be
processed.
+ * Prunes the empty acknowledgementRequestStates in {@link
#acknowledgeRequestStates}.
+ *
+ * @return Returns true if there are still any acknowledgements left to be
processed.
*/
private boolean checkAndRemoveCompletedAcknowledgements() {
boolean areAnyAcksLeft = false;
@@ -340,10 +351,12 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
acknowledgeRequestStatePair.getValue().setAsyncRequest(null);
areAsyncAcksLeft = false;
}
+
if
(!areRequestStatesInProgress(acknowledgeRequestStatePair.getValue().getSyncRequestQueue()))
{
acknowledgeRequestStatePair.getValue().nullifySyncRequestQueue();
areSyncAcksLeft = false;
}
+
if
(!isRequestStateInProgress(acknowledgeRequestStatePair.getValue().getCloseRequest()))
{
acknowledgeRequestStatePair.getValue().setCloseRequest(null);
}
@@ -879,8 +892,10 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
private final AcknowledgeRequestType requestType;
/**
- * Boolean to indicate if the request has been processed,
+ * Boolean to indicate if the request has been processed.
+ * <p>
* Set to true once we process the response and do not retry the
request.
+ * <p>
* Initialized to false every time we build a request.
*/
private boolean isProcessed;