lucasbru commented on code in PR #14639:
URL: https://github.com/apache/kafka/pull/14639#discussion_r1373051849


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -148,28 +167,35 @@ Queue<OffsetCommitRequestState> unsentOffsetCommitRequests() {
         return pendingRequests.unsentOffsetCommits;
     }
 
+    private List<OffsetFetchRequestState> unsentOffsetFetchRequests() {
+        return pendingRequests.unsentOffsetFetches;
+    }
+
     // Visible for testing
     CompletableFuture<Void> sendAutoCommit(final Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets) {
         log.debug("Enqueuing autocommit offsets: {}", allConsumedOffsets);
-        return this.addOffsetCommitRequest(allConsumedOffsets)
-                .whenComplete((response, throwable) -> {
-                    this.autoCommitState.ifPresent(autoCommitState -> autoCommitState.setInflightCommitStatus(false));
+        return addOffsetCommitRequest(allConsumedOffsets).future().whenComplete((response, throwable) -> {
+            autoCommitState.ifPresent(autoCommitState -> autoCommitState.setInflightCommitStatus(false));
+            if (throwable == null) {
+                log.debug("Completed asynchronous auto-commit of offsets {}", allConsumedOffsets);
+            } else {
+                if (throwable instanceof RetriableCommitFailedException) {

Review Comment:
   nit: could become `else if` and remove one indentation level.
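   
   A minimal sketch of the flattened shape, assuming a hypothetical 
`RetriableFailure` stand-in for `RetriableCommitFailedException` and returning 
strings instead of logging so the branches are easy to compare:

```java
public class ElseIfSketch {
    // Hypothetical stand-in for RetriableCommitFailedException.
    static class RetriableFailure extends RuntimeException {
        RetriableFailure(String message) {
            super(message);
        }
    }

    // Returns a description of the branch taken; the real code would log instead.
    static String describe(Throwable throwable) {
        if (throwable == null) {
            return "completed";
        } else if (throwable instanceof RetriableFailure) {
            // Previously nested inside a separate else block.
            return "retriable: " + throwable.getMessage();
        } else {
            return "failed";
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(null));
        System.out.println(describe(new RetriableFailure("timeout")));
        System.out.println(describe(new IllegalStateException("boom")));
    }
}
```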



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -127,7 +146,7 @@ public void maybeAutoCommit(final Map<TopicPartition, OffsetAndMetadata> offsets
      * Handles {@link org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent}. It creates an
      * {@link OffsetCommitRequestState} and enqueue it to send later.
      */
-    public CompletableFuture<Void> addOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets) {
+    public OffsetCommitRequestState addOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets) {

Review Comment:
   You seem to expose the whole internal `OffsetCommitRequestState` here. It 
took me a minute to track down all the references and realize that you are 
only exposing this for testing. Is that right?
   
   If so, I'd suggest either avoiding exposing internal state for unit testing 
(that's a smell), or at least introducing a separate method that is 
package-private and clearly marked as "visible for testing".
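   
   A rough sketch of the second option, not the actual class. 
`CommitManagerSketch`, `RequestState`, `enqueue` and `unsentCount` are 
hypothetical names used only for illustration: the public method keeps 
returning a future, and the internal state is reachable only through 
package-private accessors.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

public class CommitManagerSketch {
    // Hypothetical stand-in for OffsetCommitRequestState.
    static final class RequestState {
        final CompletableFuture<Void> future = new CompletableFuture<>();
    }

    private final Queue<RequestState> unsent = new ArrayDeque<>();

    // Public API: callers only ever see the future, never the request state.
    public CompletableFuture<Void> addOffsetCommitRequest() {
        return enqueue().future;
    }

    // Visible for testing
    RequestState enqueue() {
        RequestState state = new RequestState();
        unsent.add(state);
        return state;
    }

    // Visible for testing
    int unsentCount() {
        return unsent.size();
    }
}
```

   Tests in the same package can still call `enqueue()` directly, while the 
public signature stays unchanged.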



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -215,16 +244,115 @@ public NetworkClientDelegate.UnsentRequest 
toUnsentRequest() {
                     .setMemberId(generation.memberId)
                     .setGroupInstanceId(groupInstanceId)
                     .setTopics(new ArrayList<>(requestTopicDataMap.values())));
-            return new NetworkClientDelegate.UnsentRequest(
+            NetworkClientDelegate.UnsentRequest resp = new 
NetworkClientDelegate.UnsentRequest(
                 builder,
-                coordinatorRequestManager.coordinator(),
+                coordinatorRequestManager.coordinator());
+            resp.future().whenComplete(
                 (response, throwable) -> {
-                    if (throwable == null) {
-                        future.complete(null);
-                    } else {
-                        future.completeExceptionally(throwable);
+                    try {
+                        if (throwable == null) {
+                            onResponse(response);
+                        } else {
+                            onError(throwable, 
resp.handler().completionTimeMs());
+                        }
+                    } catch (Throwable t) {
+                        log.error("Unexpected error when completing offset 
commit: {}", this, t);
+                        future.completeExceptionally(t);
                     }
                 });
+            return resp;
+        }
+
+        public CompletableFuture<Void> future() {
+            return future;
+        }
+
+        public void onError(final Throwable exception, final long 
currentTimeMs) {
+            if (exception instanceof RetriableException) {
+                handleCoordinatorDisconnect(exception, currentTimeMs);
+                retry(currentTimeMs);
+            }
+        }
+
+        public void onResponse(final ClientResponse response) {
+            long responseTime = response.receivedTimeMs();
+            OffsetCommitResponse commitResponse = (OffsetCommitResponse) 
response.responseBody();
+            Set<String> unauthorizedTopics = new HashSet<>();
+            for (OffsetCommitResponseData.OffsetCommitResponseTopic topic : 
commitResponse.data().topics()) {
+                for (OffsetCommitResponseData.OffsetCommitResponsePartition 
partition : topic.partitions()) {
+                    TopicPartition tp = new TopicPartition(topic.name(), 
partition.partitionIndex());
+                    OffsetAndMetadata offsetAndMetadata = offsets.get(tp);
+                    long offset = offsetAndMetadata.offset();
+                    Errors error = Errors.forCode(partition.errorCode());
+                    if (error == Errors.NONE) {
+                        log.debug("OffsetCommit {} for partition {}", offset, 
tp);
+                        continue;
+                    }
+
+                    if (error.exception() instanceof RetriableException) {
+                        log.warn("OffsetCommit failed on partition {} at 
offset {}: {}", tp, offset,
+                            error.message());
+                    } else {
+                        log.error("OffsetCommit failed on partition {} at 
offset {}: {}", tp, offset, error.message());
+                    }
+
+                    if (!continueHandlePartitionErrors(error, tp, offset, 
unauthorizedTopics, responseTime)) {
+                        return;
+                    }
+                }
+            }
+
+            if (!unauthorizedTopics.isEmpty()) {
+                log.error("OffsetCommit failed due to not authorized to commit 
to topics {}", unauthorizedTopics);
+                future.completeExceptionally(new 
TopicAuthorizationException(unauthorizedTopics));
+            } else {
+                future.complete(null);
+            }
+        }
+
+        private void retry(final long currentTimeMs) {
+            System.out.println("timeout" + currentTimeMs);
+            onFailedAttempt(currentTimeMs);
+            pendingRequests.addOffsetCommitRequest(this);
+        }
+
+        private boolean continueHandlePartitionErrors(Errors error, TopicPartition tp, long offset,

Review Comment:
   The `offset` parameter is not used.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##########
@@ -226,25 +238,115 @@ public void 
testOffsetFetchRequest_EnsureDuplicatedRequestSucceed() {
     }
 
     @ParameterizedTest
-    @MethodSource("exceptionSupplier")
+    @MethodSource("offsetFetchExceptionSupplier")
     public void testOffsetFetchRequest_ErroredRequests(final Errors error, 
final boolean isRetriable) {
         CommitRequestManager commitRequestManger = create(true, 100);
         
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
 
         Set<TopicPartition> partitions = new HashSet<>();
         partitions.add(new TopicPartition("t1", 0));
-        List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> 
futures = sendAndVerifyDuplicatedRequests(
-                commitRequestManger,
-                partitions,
-                5,
-                error);
+        List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> 
futures = sendAndVerifyDuplicatedOffsetFetchRequests(
+            commitRequestManger,
+            partitions,
+            5,
+            error);
         // we only want to make sure to purge the outbound buffer for 
non-retriables, so retriable will be re-queued.
         if (isRetriable)
             testRetriable(commitRequestManger, futures);
         else {
             testNonRetriable(futures);
             assertEmptyPendingRequests(commitRequestManger);
         }
+
+        assertCoordinatorDisconnect(error);
+    }
+
+    @ParameterizedTest
+    @MethodSource("offsetCommitExceptionSupplier")
+    public void testOffsetCommitRequest_ErroredRequests(final Errors error, 
final boolean isRetriable) {
+        CommitRequestManager commitRequestManger = create(true, 100);
+        
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
+
+        Map<TopicPartition, OffsetAndMetadata> offsets = 
Collections.singletonMap(new TopicPartition("topic", 1),
+            new OffsetAndMetadata(0));
+
+        CommitRequestManager.OffsetCommitRequestState requestState = 
sendAndVerifyOffsetCommitRequests(
+            commitRequestManger,
+            offsets,
+            error);
+
+        assertExceptionHandling(commitRequestManger, requestState, error);
+        assertCoordinatorDisconnect(error);
+    }
+
+    @Test
+    public void testEnsureBackoffRetryOnOffsetCommitRequestTimeout() {
+        CommitRequestManager commitRequestManger = create(true, 100);
+        
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
+
+        Map<TopicPartition, OffsetAndMetadata> offsets = 
Collections.singletonMap(new TopicPartition("topic", 1),
+            new OffsetAndMetadata(0));
+
+        commitRequestManger.addOffsetCommitRequest(offsets);
+        NetworkClientDelegate.PollResult res = 
commitRequestManger.poll(time.milliseconds());
+        assertEquals(1, res.unsentRequests.size());
+        res.unsentRequests.get(0).handler().onFailure(time.milliseconds(), new 
TimeoutException());
+
+        assertTrue(commitRequestManger.pendingRequests.hasUnsentRequests());
+        assertEquals(1, 
commitRequestManger.unsentOffsetCommitRequests().size());
+        long retryBackoffMs = 
commitRequestManger.unsentOffsetCommitRequests().peek().remainingBackoffMs(time.milliseconds());
+        assertRetryBackOff(commitRequestManger, retryBackoffMs);
+    }
+
+    private void assertCoordinatorDisconnect(final Errors error) {
+        if (error.exception() instanceof DisconnectException) {
+            verify(coordinatorRequestManager).markCoordinatorUnknown(any(), 
any());
+        }
+    }
+
+    private void assertExceptionHandling(CommitRequestManager 
commitRequestManger, CommitRequestManager.OffsetCommitRequestState 
requestState, Errors errors) {
+        long remainBackoffMs = 
requestState.remainingBackoffMs(time.milliseconds());
+        switch (errors) {
+            case NOT_COORDINATOR:
+            case COORDINATOR_NOT_AVAILABLE:
+            case REQUEST_TIMED_OUT:
+                
verify(coordinatorRequestManager).markCoordinatorUnknown(any(), anyLong());
+                assertPollDoesNotReturn(commitRequestManger, remainBackoffMs);
+                break;
+            case UNKNOWN_TOPIC_OR_PARTITION:
+            case COORDINATOR_LOAD_IN_PROGRESS:
+                assertRetryBackOff(commitRequestManger, remainBackoffMs);
+                break;
+            case GROUP_AUTHORIZATION_FAILED:
+                // failed
+                break;
+            case TOPIC_AUTHORIZATION_FAILED:
+            case OFFSET_METADATA_TOO_LARGE:
+            case INVALID_COMMIT_OFFSET_SIZE:
+                // fail
+                // TODO: maybe we should retry
+                assertPollDoesNotReturn(commitRequestManger, Long.MAX_VALUE);
+                break;
+            case FENCED_INSTANCE_ID:
+                // what should we do

Review Comment:
   Maybe make this a TODO if it's still unclear what to do. But then that 
should go into the main source code, not the test, right? Similar above.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -215,16 +244,115 @@ public NetworkClientDelegate.UnsentRequest 
toUnsentRequest() {
                     .setMemberId(generation.memberId)
                     .setGroupInstanceId(groupInstanceId)
                     .setTopics(new ArrayList<>(requestTopicDataMap.values())));
-            return new NetworkClientDelegate.UnsentRequest(
+            NetworkClientDelegate.UnsentRequest resp = new 
NetworkClientDelegate.UnsentRequest(
                 builder,
-                coordinatorRequestManager.coordinator(),
+                coordinatorRequestManager.coordinator());
+            resp.future().whenComplete(
                 (response, throwable) -> {
-                    if (throwable == null) {
-                        future.complete(null);
-                    } else {
-                        future.completeExceptionally(throwable);
+                    try {
+                        if (throwable == null) {
+                            onResponse(response);
+                        } else {
+                            onError(throwable, 
resp.handler().completionTimeMs());
+                        }
+                    } catch (Throwable t) {
+                        log.error("Unexpected error when completing offset 
commit: {}", this, t);
+                        future.completeExceptionally(t);
                     }
                 });
+            return resp;
+        }
+
+        public CompletableFuture<Void> future() {
+            return future;
+        }
+
+        public void onError(final Throwable exception, final long 
currentTimeMs) {
+            if (exception instanceof RetriableException) {
+                handleCoordinatorDisconnect(exception, currentTimeMs);
+                retry(currentTimeMs);
+            }
+        }
+
+        public void onResponse(final ClientResponse response) {
+            long responseTime = response.receivedTimeMs();
+            OffsetCommitResponse commitResponse = (OffsetCommitResponse) 
response.responseBody();
+            Set<String> unauthorizedTopics = new HashSet<>();
+            for (OffsetCommitResponseData.OffsetCommitResponseTopic topic : 
commitResponse.data().topics()) {
+                for (OffsetCommitResponseData.OffsetCommitResponsePartition 
partition : topic.partitions()) {
+                    TopicPartition tp = new TopicPartition(topic.name(), 
partition.partitionIndex());
+                    OffsetAndMetadata offsetAndMetadata = offsets.get(tp);
+                    long offset = offsetAndMetadata.offset();
+                    Errors error = Errors.forCode(partition.errorCode());
+                    if (error == Errors.NONE) {
+                        log.debug("OffsetCommit {} for partition {}", offset, 
tp);
+                        continue;
+                    }
+
+                    if (error.exception() instanceof RetriableException) {
+                        log.warn("OffsetCommit failed on partition {} at 
offset {}: {}", tp, offset,
+                            error.message());
+                    } else {
+                        log.error("OffsetCommit failed on partition {} at 
offset {}: {}", tp, offset, error.message());
+                    }
+
+                    if (!continueHandlePartitionErrors(error, tp, offset, 
unauthorizedTopics, responseTime)) {
+                        return;
+                    }
+                }
+            }
+
+            if (!unauthorizedTopics.isEmpty()) {
+                log.error("OffsetCommit failed due to not authorized to commit 
to topics {}", unauthorizedTopics);
+                future.completeExceptionally(new 
TopicAuthorizationException(unauthorizedTopics));
+            } else {
+                future.complete(null);
+            }
+        }
+
+        private void retry(final long currentTimeMs) {
+            System.out.println("timeout" + currentTimeMs);

Review Comment:
   I don't think we want to commit printlns?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -215,16 +244,115 @@ public NetworkClientDelegate.UnsentRequest 
toUnsentRequest() {
                     .setMemberId(generation.memberId)
                     .setGroupInstanceId(groupInstanceId)
                     .setTopics(new ArrayList<>(requestTopicDataMap.values())));
-            return new NetworkClientDelegate.UnsentRequest(
+            NetworkClientDelegate.UnsentRequest resp = new 
NetworkClientDelegate.UnsentRequest(
                 builder,
-                coordinatorRequestManager.coordinator(),
+                coordinatorRequestManager.coordinator());
+            resp.future().whenComplete(
                 (response, throwable) -> {
-                    if (throwable == null) {
-                        future.complete(null);
-                    } else {
-                        future.completeExceptionally(throwable);
+                    try {
+                        if (throwable == null) {
+                            onResponse(response);
+                        } else {
+                            onError(throwable, 
resp.handler().completionTimeMs());
+                        }
+                    } catch (Throwable t) {
+                        log.error("Unexpected error when completing offset 
commit: {}", this, t);
+                        future.completeExceptionally(t);
                     }
                 });
+            return resp;
+        }
+
+        public CompletableFuture<Void> future() {
+            return future;
+        }
+
+        public void onError(final Throwable exception, final long 
currentTimeMs) {
+            if (exception instanceof RetriableException) {
+                handleCoordinatorDisconnect(exception, currentTimeMs);
+                retry(currentTimeMs);
+            }
+        }
+
+        public void onResponse(final ClientResponse response) {
+            long responseTime = response.receivedTimeMs();
+            OffsetCommitResponse commitResponse = (OffsetCommitResponse) 
response.responseBody();
+            Set<String> unauthorizedTopics = new HashSet<>();
+            for (OffsetCommitResponseData.OffsetCommitResponseTopic topic : 
commitResponse.data().topics()) {
+                for (OffsetCommitResponseData.OffsetCommitResponsePartition 
partition : topic.partitions()) {
+                    TopicPartition tp = new TopicPartition(topic.name(), 
partition.partitionIndex());
+                    OffsetAndMetadata offsetAndMetadata = offsets.get(tp);
+                    long offset = offsetAndMetadata.offset();
+                    Errors error = Errors.forCode(partition.errorCode());
+                    if (error == Errors.NONE) {
+                        log.debug("OffsetCommit {} for partition {}", offset, 
tp);
+                        continue;
+                    }
+
+                    if (error.exception() instanceof RetriableException) {
+                        log.warn("OffsetCommit failed on partition {} at 
offset {}: {}", tp, offset,
+                            error.message());
+                    } else {
+                        log.error("OffsetCommit failed on partition {} at 
offset {}: {}", tp, offset, error.message());
+                    }
+
+                    if (!continueHandlePartitionErrors(error, tp, offset, 
unauthorizedTopics, responseTime)) {
+                        return;
+                    }
+                }
+            }
+
+            if (!unauthorizedTopics.isEmpty()) {
+                log.error("OffsetCommit failed due to not authorized to commit 
to topics {}", unauthorizedTopics);
+                future.completeExceptionally(new 
TopicAuthorizationException(unauthorizedTopics));
+            } else {
+                future.complete(null);
+            }
+        }
+
+        private void retry(final long currentTimeMs) {
+            System.out.println("timeout" + currentTimeMs);
+            onFailedAttempt(currentTimeMs);
+            pendingRequests.addOffsetCommitRequest(this);
+        }
+
+        private boolean continueHandlePartitionErrors(Errors error, 
TopicPartition tp, long offset,
+                                                      Set<String> 
unauthorizedTopics, long responseTime) {
+            switch (error) {
+                case GROUP_AUTHORIZATION_FAILED:
+                    
future.completeExceptionally(GroupAuthorizationException.forGroupId(groupId));
+                    return false;
+                case TOPIC_AUTHORIZATION_FAILED:
+                    // Collect all unauthorized topics before failing
+                    unauthorizedTopics.add(tp.topic());
+                    return true;
+                case OFFSET_METADATA_TOO_LARGE:
+                case INVALID_COMMIT_OFFSET_SIZE:
+                    future.completeExceptionally(error.exception());
+                    return false;
+                case COORDINATOR_LOAD_IN_PROGRESS:
+                case UNKNOWN_TOPIC_OR_PARTITION:
+                    retry(responseTime);
+                    return false;
+                case COORDINATOR_NOT_AVAILABLE:
+                case NOT_COORDINATOR:
+                case REQUEST_TIMED_OUT:

Review Comment:
   Could you also handle these in an `error.exception() instanceof 
RetriableException` branch, like in `onError`? I haven't checked, but if 
possible that would be clearer.
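   
   A sketch of the idea under stated assumptions: the exception classes below 
are hypothetical stand-ins for the real ones, and whether every error code in 
this switch maps to a retriable exception has not been verified. Any case that 
needs extra handling beyond a plain retry would still need its own branch.

```java
public class RetriableBranchSketch {
    // Hypothetical stand-ins for the real exception hierarchy.
    static class RetriableException extends RuntimeException { }
    static class CoordinatorNotAvailable extends RetriableException { }
    static class MetadataTooLarge extends RuntimeException { }

    enum Action { RETRY, FAIL }

    // One type check replaces a list of case labels, provided every error
    // that should be retried is modelled as a RetriableException.
    static Action classify(RuntimeException error) {
        if (error instanceof RetriableException) {
            return Action.RETRY;
        }
        return Action.FAIL;
    }

    public static void main(String[] args) {
        System.out.println(classify(new CoordinatorNotAvailable()));
        System.out.println(classify(new MetadataTooLarge()));
    }
}
```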



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##########
@@ -382,9 +522,49 @@ private ClientResponse buildOffsetFetchClientResponse(
         return buildOffsetFetchClientResponse(request, topicPartitionData, 
error);
     }
 
+    private ClientResponse buildOffsetCommitClientResponse(final 
OffsetCommitResponse commitResponse,
+                                                           final Errors error) 
{
+        OffsetCommitResponseData data = new OffsetCommitResponseData();
+        OffsetCommitResponse response = new OffsetCommitResponse(data);
+        short apiVersion = 1;
+        return new ClientResponse(
+            new RequestHeader(ApiKeys.OFFSET_COMMIT, apiVersion, "", 1),
+            null,
+            "-1",
+            time.milliseconds(),
+            time.milliseconds(),
+            false,
+            null,
+            null,
+            commitResponse
+        );
+    }
+
+    public ClientResponse mockOffsetCommitResponse(String topic, int 
partition, short apiKeyVersion, Errors error) {

Review Comment:
   Out of interest, why not use Mockito here?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -148,28 +167,35 @@ Queue<OffsetCommitRequestState> 
unsentOffsetCommitRequests() {
         return pendingRequests.unsentOffsetCommits;
     }
 
+    private List<OffsetFetchRequestState> unsentOffsetFetchRequests() {
+        return pendingRequests.unsentOffsetFetches;
+    }
+
     // Visible for testing
     CompletableFuture<Void> sendAutoCommit(final Map<TopicPartition, 
OffsetAndMetadata> allConsumedOffsets) {
         log.debug("Enqueuing autocommit offsets: {}", allConsumedOffsets);
-        return this.addOffsetCommitRequest(allConsumedOffsets)
-                .whenComplete((response, throwable) -> {
-                    this.autoCommitState.ifPresent(autoCommitState -> 
autoCommitState.setInflightCommitStatus(false));
+        return 
addOffsetCommitRequest(allConsumedOffsets).future().whenComplete((response, 
throwable) -> {
+            autoCommitState.ifPresent(autoCommitState -> 
autoCommitState.setInflightCommitStatus(false));
+            if (throwable == null) {
+                log.debug("Completed asynchronous auto-commit of offsets {}", 
allConsumedOffsets);
+            } else {
+                if (throwable instanceof RetriableCommitFailedException) {
+                    log.debug("Asynchronous auto-commit of offsets {} failed 
due to retriable error: {}",
+                        allConsumedOffsets, throwable.getMessage());
+                } else {
+                    log.warn("Asynchronous auto-commit of offsets {} failed", 
allConsumedOffsets, throwable);
+                }
+            }
+        });
+    }
 
-                    if (throwable == null) {
-                        log.debug("Completed asynchronous auto-commit of 
offsets {}", allConsumedOffsets);
-                    }
-                })
-                .exceptionally(t -> {
-                    if (t instanceof RetriableCommitFailedException) {
-                        log.debug("Asynchronous auto-commit of offsets {} 
failed due to retriable error: {}", allConsumedOffsets, t.getMessage());
-                    } else {
-                        log.warn("Asynchronous auto-commit of offsets {} 
failed", allConsumedOffsets, t);
-                    }
-                    return null;
-                });
+    private void handleCoordinatorDisconnect(Throwable exception, long 
currentTimeMs) {
+        if (exception instanceof DisconnectException) {
+            
coordinatorRequestManager.markCoordinatorUnknown(exception.getMessage(), 
currentTimeMs);
+        }
     }
 
-    private class OffsetCommitRequestState {
+    public class OffsetCommitRequestState extends RequestState {

Review Comment:
   This could remain package-private.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -215,16 +244,115 @@ public NetworkClientDelegate.UnsentRequest 
toUnsentRequest() {
                     .setMemberId(generation.memberId)
                     .setGroupInstanceId(groupInstanceId)
                     .setTopics(new ArrayList<>(requestTopicDataMap.values())));
-            return new NetworkClientDelegate.UnsentRequest(
+            NetworkClientDelegate.UnsentRequest resp = new 
NetworkClientDelegate.UnsentRequest(
                 builder,
-                coordinatorRequestManager.coordinator(),
+                coordinatorRequestManager.coordinator());
+            resp.future().whenComplete(
                 (response, throwable) -> {
-                    if (throwable == null) {
-                        future.complete(null);
-                    } else {
-                        future.completeExceptionally(throwable);
+                    try {
+                        if (throwable == null) {
+                            onResponse(response);
+                        } else {
+                            onError(throwable, 
resp.handler().completionTimeMs());
+                        }
+                    } catch (Throwable t) {
+                        log.error("Unexpected error when completing offset 
commit: {}", this, t);
+                        future.completeExceptionally(t);
                     }
                 });
+            return resp;
+        }
+
+        public CompletableFuture<Void> future() {
+            return future;
+        }
+
+        public void onError(final Throwable exception, final long 
currentTimeMs) {
+            if (exception instanceof RetriableException) {
+                handleCoordinatorDisconnect(exception, currentTimeMs);
+                retry(currentTimeMs);
+            }
+        }
+
+        public void onResponse(final ClientResponse response) {
+            long responseTime = response.receivedTimeMs();
+            OffsetCommitResponse commitResponse = (OffsetCommitResponse) 
response.responseBody();
+            Set<String> unauthorizedTopics = new HashSet<>();
+            for (OffsetCommitResponseData.OffsetCommitResponseTopic topic : 
commitResponse.data().topics()) {
+                for (OffsetCommitResponseData.OffsetCommitResponsePartition 
partition : topic.partitions()) {
+                    TopicPartition tp = new TopicPartition(topic.name(), 
partition.partitionIndex());
+                    OffsetAndMetadata offsetAndMetadata = offsets.get(tp);
+                    long offset = offsetAndMetadata.offset();
+                    Errors error = Errors.forCode(partition.errorCode());
+                    if (error == Errors.NONE) {
+                        log.debug("OffsetCommit {} for partition {}", offset, 
tp);
+                        continue;
+                    }
+
+                    if (error.exception() instanceof RetriableException) {
+                        log.warn("OffsetCommit failed on partition {} at offset {}: {}", tp, offset,
+                            error.message());

Review Comment:
   nit: inconsistent code style with the else branch (wrapping)



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -215,16 +244,115 @@ public NetworkClientDelegate.UnsentRequest 
toUnsentRequest() {
                     .setMemberId(generation.memberId)
                     .setGroupInstanceId(groupInstanceId)
                     .setTopics(new ArrayList<>(requestTopicDataMap.values())));
-            return new NetworkClientDelegate.UnsentRequest(
+            NetworkClientDelegate.UnsentRequest resp = new 
NetworkClientDelegate.UnsentRequest(
                 builder,
-                coordinatorRequestManager.coordinator(),
+                coordinatorRequestManager.coordinator());
+            resp.future().whenComplete(
                 (response, throwable) -> {
-                    if (throwable == null) {
-                        future.complete(null);
-                    } else {
-                        future.completeExceptionally(throwable);
+                    try {
+                        if (throwable == null) {
+                            onResponse(response);
+                        } else {
+                            onError(throwable, 
resp.handler().completionTimeMs());
+                        }
+                    } catch (Throwable t) {
+                        log.error("Unexpected error when completing offset 
commit: {}", this, t);
+                        future.completeExceptionally(t);
                     }
                 });
+            return resp;
+        }
+
+        public CompletableFuture<Void> future() {
+            return future;
+        }
+
+        public void onError(final Throwable exception, final long 
currentTimeMs) {
+            if (exception instanceof RetriableException) {
+                handleCoordinatorDisconnect(exception, currentTimeMs);
+                retry(currentTimeMs);
+            }
+        }
+
+        public void onResponse(final ClientResponse response) {
+            long responseTime = response.receivedTimeMs();
+            OffsetCommitResponse commitResponse = (OffsetCommitResponse) 
response.responseBody();
+            Set<String> unauthorizedTopics = new HashSet<>();
+            for (OffsetCommitResponseData.OffsetCommitResponseTopic topic : 
commitResponse.data().topics()) {
+                for (OffsetCommitResponseData.OffsetCommitResponsePartition 
partition : topic.partitions()) {
+                    TopicPartition tp = new TopicPartition(topic.name(), 
partition.partitionIndex());
+                    OffsetAndMetadata offsetAndMetadata = offsets.get(tp);
+                    long offset = offsetAndMetadata.offset();
+                    Errors error = Errors.forCode(partition.errorCode());
+                    if (error == Errors.NONE) {
+                        log.debug("OffsetCommit {} for partition {}", offset, 
tp);
+                        continue;
+                    }
+
+                    if (error.exception() instanceof RetriableException) {
+                        log.warn("OffsetCommit failed on partition {} at 
offset {}: {}", tp, offset,
+                            error.message());
+                    } else {
+                        log.error("OffsetCommit failed on partition {} at 
offset {}: {}", tp, offset, error.message());
+                    }
+
+                    if (!continueHandlePartitionErrors(error, tp, offset, 
unauthorizedTopics, responseTime)) {
+                        return;
+                    }
+                }
+            }
+
+            if (!unauthorizedTopics.isEmpty()) {
+                log.error("OffsetCommit failed due to not authorized to commit 
to topics {}", unauthorizedTopics);
+                future.completeExceptionally(new 
TopicAuthorizationException(unauthorizedTopics));
+            } else {
+                future.complete(null);
+            }
+        }
+
+        private void retry(final long currentTimeMs) {
+            System.out.println("timeout" + currentTimeMs);
+            onFailedAttempt(currentTimeMs);
+            pendingRequests.addOffsetCommitRequest(this);
+        }
+
+        private boolean continueHandlePartitionErrors(Errors error, 
TopicPartition tp, long offset,
+                                                      Set<String> 
unauthorizedTopics, long responseTime) {
+            switch (error) {
+                case GROUP_AUTHORIZATION_FAILED:
+                    
future.completeExceptionally(GroupAuthorizationException.forGroupId(groupId));
+                    return false;
+                case TOPIC_AUTHORIZATION_FAILED:
+                    // Collect all unauthorized topics before failing
+                    unauthorizedTopics.add(tp.topic());
+                    return true;

Review Comment:
   I think the logic around this method would be clearer if you handled the 
`TOPIC_AUTHORIZATION_FAILED` error in `onError` (and kept `unauthorizedTopics` 
there), making it clear that everything that enters this switch will either 
retry or `completeExceptionally` and return. Just an idea, your call.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -215,16 +227,113 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
                     .setMemberId(generation.memberId)
                     .setGroupInstanceId(groupInstanceId)
                     .setTopics(new ArrayList<>(requestTopicDataMap.values())));
-            return new NetworkClientDelegate.UnsentRequest(
+            // TODO: KAFKA-15592
+            NetworkClientDelegate.UnsentRequest resp = new NetworkClientDelegate.UnsentRequest(
                 builder,
-                coordinatorRequestManager.coordinator(),
+                coordinatorRequestManager.coordinator());
+            resp.future().whenComplete(
                 (response, throwable) -> {
-                    if (throwable == null) {
-                        future.complete(null);
-                    } else {
-                        future.completeExceptionally(throwable);
+                    try {
+                        if (throwable == null) {
+                            onResponse(response);
+                        } else {
+                            onError(throwable, resp.handler().completionTimeMs());
+                        }
+                    } catch (Throwable t) {

Review Comment:
   I'd say it's good defensive programming, even if it seems unlikely that 
`onResponse` or `onError` will throw. The exception would otherwise be swallowed.
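   
   To illustrate the swallowing with plain `CompletableFuture` (a standalone 
sketch, not the PR code; `handle` and `resultCompleted` are hypothetical 
names): an exception thrown inside a `whenComplete` callback only reaches the 
future that `whenComplete` returns. If that future is discarded, the 
caller-facing future is never completed unless the callback catches the 
exception itself.

```java
import java.util.concurrent.CompletableFuture;

public class CallbackGuardSketch {
    // Hypothetical handler that fails unexpectedly.
    static void handle(String response) {
        throw new IllegalStateException("unexpected failure while handling " + response);
    }

    // Returns whether the caller-facing future was completed after the callback ran.
    static boolean resultCompleted(boolean guarded) {
        CompletableFuture<String> network = new CompletableFuture<>();
        CompletableFuture<Void> result = new CompletableFuture<>();
        network.whenComplete((response, throwable) -> {
            if (guarded) {
                try {
                    handle(response);
                } catch (Throwable t) {
                    // Propagate the failure to the future the caller is waiting on.
                    result.completeExceptionally(t);
                }
            } else {
                // The exception only reaches the future returned by
                // whenComplete, which is discarded here.
                handle(response);
            }
        });
        network.complete("ok");
        return result.isDone();
    }

    public static void main(String[] args) {
        System.out.println("unguarded completed: " + resultCompleted(false));
        System.out.println("guarded completed: " + resultCompleted(true));
    }
}
```

   Without the guard, `result` stays incomplete and anything waiting on it 
would wait indefinitely.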



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
