philipnee commented on code in PR #14639:
URL: https://github.com/apache/kafka/pull/14639#discussion_r1373585188
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -215,16 +244,115 @@ public NetworkClientDelegate.UnsentRequest
toUnsentRequest() {
.setMemberId(generation.memberId)
.setGroupInstanceId(groupInstanceId)
.setTopics(new ArrayList<>(requestTopicDataMap.values())));
- return new NetworkClientDelegate.UnsentRequest(
+ NetworkClientDelegate.UnsentRequest resp = new
NetworkClientDelegate.UnsentRequest(
builder,
- coordinatorRequestManager.coordinator(),
+ coordinatorRequestManager.coordinator());
+ resp.future().whenComplete(
(response, throwable) -> {
- if (throwable == null) {
- future.complete(null);
- } else {
- future.completeExceptionally(throwable);
+ try {
+ if (throwable == null) {
+ onResponse(response);
+ } else {
+ onError(throwable,
resp.handler().completionTimeMs());
+ }
+ } catch (Throwable t) {
+ log.error("Unexpected error when completing offset
commit: {}", this, t);
+ future.completeExceptionally(t);
}
});
+ return resp;
+ }
+
+ public CompletableFuture<Void> future() {
+ return future;
+ }
+
+ public void onError(final Throwable exception, final long
currentTimeMs) {
+ if (exception instanceof RetriableException) {
+ handleCoordinatorDisconnect(exception, currentTimeMs);
+ retry(currentTimeMs);
+ }
+ }
+
+ public void onResponse(final ClientResponse response) {
+ long responseTime = response.receivedTimeMs();
+ OffsetCommitResponse commitResponse = (OffsetCommitResponse)
response.responseBody();
+ Set<String> unauthorizedTopics = new HashSet<>();
+ for (OffsetCommitResponseData.OffsetCommitResponseTopic topic :
commitResponse.data().topics()) {
+ for (OffsetCommitResponseData.OffsetCommitResponsePartition
partition : topic.partitions()) {
+ TopicPartition tp = new TopicPartition(topic.name(),
partition.partitionIndex());
+ OffsetAndMetadata offsetAndMetadata = offsets.get(tp);
+ long offset = offsetAndMetadata.offset();
+ Errors error = Errors.forCode(partition.errorCode());
+ if (error == Errors.NONE) {
+ log.debug("OffsetCommit {} for partition {}", offset,
tp);
+ continue;
+ }
+
+ if (error.exception() instanceof RetriableException) {
+ log.warn("OffsetCommit failed on partition {} at
offset {}: {}", tp, offset,
+ error.message());
+ } else {
+ log.error("OffsetCommit failed on partition {} at
offset {}: {}", tp, offset, error.message());
+ }
+
+ if (!continueHandlePartitionErrors(error, tp, offset,
unauthorizedTopics, responseTime)) {
+ return;
+ }
+ }
+ }
+
+ if (!unauthorizedTopics.isEmpty()) {
+ log.error("OffsetCommit failed due to not authorized to commit
to topics {}", unauthorizedTopics);
+ future.completeExceptionally(new
TopicAuthorizationException(unauthorizedTopics));
+ } else {
+ future.complete(null);
+ }
+ }
+
+ private void retry(final long currentTimeMs) {
+ System.out.println("timeout" + currentTimeMs);
+ onFailedAttempt(currentTimeMs);
+ pendingRequests.addOffsetCommitRequest(this);
+ }
+
+ private boolean continueHandlePartitionErrors(Errors error,
TopicPartition tp, long offset,
+ Set<String>
unauthorizedTopics, long responseTime) {
+ switch (error) {
+ case GROUP_AUTHORIZATION_FAILED:
+
future.completeExceptionally(GroupAuthorizationException.forGroupId(groupId));
+ return false;
+ case TOPIC_AUTHORIZATION_FAILED:
+ // Collect all unauthorized topics before failing
+ unauthorizedTopics.add(tp.topic());
+ return true;
+ case OFFSET_METADATA_TOO_LARGE:
+ case INVALID_COMMIT_OFFSET_SIZE:
+ future.completeExceptionally(error.exception());
+ return false;
+ case COORDINATOR_LOAD_IN_PROGRESS:
+ case UNKNOWN_TOPIC_OR_PARTITION:
+ retry(responseTime);
+ return false;
+ case COORDINATOR_NOT_AVAILABLE:
+ case NOT_COORDINATOR:
+ case REQUEST_TIMED_OUT:
Review Comment:
I wanted to keep all error handling in one place and explicitly specify the
action for these errors, because sometimes, when debugging, we want to know
what the consumer does when encountering a specific error. What do you think?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]