This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 92672d1df81 KAFKA-17470: CommitRequestManager should record failed request only once even if multiple errors in response (#17109)
92672d1df81 is described below

commit 92672d1df81cd608f9c360260e022c07e41d2ceb
Author: TengYao Chi <[email protected]>
AuthorDate: Tue Sep 10 03:52:32 2024 +0800

    KAFKA-17470: CommitRequestManager should record failed request only once even if multiple errors in response (#17109)
    
    Reviewers: Lianet Magrans <[email protected]>
---
 .../consumer/internals/CommitRequestManager.java   | 16 ++++-
 .../internals/CommitRequestManagerTest.java        | 83 +++++++++++++++++++++-
 2 files changed, 93 insertions(+), 6 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
index e435b3acd5c..35f3cf1d861 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
@@ -616,7 +616,7 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
         return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, requests);
     }
 
-    private class OffsetCommitRequestState extends RetriableRequestState {
+    class OffsetCommitRequestState extends RetriableRequestState {
         private Map<TopicPartition, OffsetAndMetadata> offsets;
         private final String groupId;
         private final Optional<String> groupInstanceId;
@@ -711,6 +711,7 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
             long currentTimeMs = response.receivedTimeMs();
             OffsetCommitResponse commitResponse = (OffsetCommitResponse) 
response.responseBody();
             Set<String> unauthorizedTopics = new HashSet<>();
+            boolean failedRequestRegistered = false;
             for (OffsetCommitResponseData.OffsetCommitResponseTopic topic : 
commitResponse.data().topics()) {
                 for (OffsetCommitResponseData.OffsetCommitResponsePartition 
partition : topic.partitions()) {
                     TopicPartition tp = new TopicPartition(topic.name(), 
partition.partitionIndex());
@@ -723,7 +724,11 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
                         continue;
                     }
 
-                    onFailedAttempt(currentTimeMs);
+                    if (!failedRequestRegistered) {
+                        onFailedAttempt(currentTimeMs);
+                        failedRequestRegistered = true;
+                    }
+
                     if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                         
future.completeExceptionally(GroupAuthorizationException.forGroupId(groupId));
                         return;
@@ -1042,14 +1047,19 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
                     response.partitionDataMap(groupId);
             Map<TopicPartition, OffsetAndMetadata> offsets = new 
HashMap<>(responseData.size());
             Set<TopicPartition> unstableTxnOffsetTopicPartitions = new 
HashSet<>();
+            boolean failedRequestRegistered = false;
             for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> 
entry : responseData.entrySet()) {
                 TopicPartition tp = entry.getKey();
                 OffsetFetchResponse.PartitionData partitionData = 
entry.getValue();
                 if (partitionData.hasError()) {
-                    onFailedAttempt(currentTimeMs);
                     Errors error = partitionData.error;
                     log.debug("Failed to fetch offset for partition {}: {}", 
tp, error.message());
 
+                    if (!failedRequestRegistered) {
+                        onFailedAttempt(currentTimeMs);
+                        failedRequestRegistered = true;
+                    }
+
                     if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
                         future.completeExceptionally(new KafkaException("Topic 
or Partition " + tp + " does not exist"));
                         return;
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
index 3c7347e6d02..f976d69c195 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
@@ -79,6 +79,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
@@ -900,6 +901,33 @@ public class CommitRequestManagerTest {
         assertFutureThrows(commitResult, RetriableCommitFailedException.class);
     }
 
+    @ParameterizedTest
+    @MethodSource("offsetCommitExceptionSupplier")
+    public void testOffsetCommitSingleFailedAttemptPerRequestWhenPartitionErrors(final Errors error) {
+        // A commit response in which every committed partition fails with the same error must be
+        // recorded as a single failed attempt, not one failed attempt per failed partition.
+        CommitRequestManager commitRequestManager = create(true, 100);
+        when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
+
+        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+        offsets.put(new TopicPartition("t1", 0), new OffsetAndMetadata(1));
+        offsets.put(new TopicPartition("t1", 1), new OffsetAndMetadata(2));
+        offsets.put(new TopicPartition("t1", 2), new OffsetAndMetadata(3));
+
+        commitRequestManager.commitSync(offsets, time.milliseconds() + defaultApiTimeoutMs);
+        NetworkClientDelegate.PollResult res = commitRequestManager.poll(time.milliseconds());
+        assertEquals(1, res.unsentRequests.size());
+
+        // Fail every committed partition (t1-0..t1-2) so the response mirrors the request that was sent.
+        res.unsentRequests.get(0).handler().onComplete(mockOffsetCommitResponse("t1", (short) 1, error, offsets.size()));
+        CommitRequestManager.OffsetCommitRequestState commitRequest = commitRequestManager.pendingRequests.unsentOffsetCommits.peek();
+        if (error.exception() instanceof RetriableException) {
+            // Retriable errors keep the request queued for retry, with exactly one attempt recorded.
+            assertNotNull(commitRequest);
+            assertEquals(1, commitRequest.numAttempts, "Only one failed attempt should be registered, even if the response contains multiple partition errors");
+            time.sleep(retryBackoffMs);
+            res = commitRequestManager.poll(time.milliseconds());
+            assertEquals(1, res.unsentRequests.size());
+            res.unsentRequests.get(0).handler().onComplete(mockOffsetCommitResponse("t1", (short) 1, error, offsets.size()));
+            assertEquals(2, commitRequest.numAttempts, "The retried request should register exactly one more failed attempt, even if the response contains multiple partition errors");
+        } else {
+            // Non-retriable errors are expected not to leave the request queued for retry.
+            assertNull(commitRequest);
+        }
+    }
+
     @Test
     public void testEnsureBackoffRetryOnOffsetCommitRequestTimeout() {
         CommitRequestManager commitRequestManager = create(true, 100);
@@ -1205,6 +1233,8 @@ public class CommitRequestManagerTest {
                                final Errors error
     ) {
         futures.forEach(f -> assertFalse(f.isDone()));
+        assertEquals(1, 
commitRequestManager.pendingRequests.unsentOffsetFetches.get(0).numAttempts,
+                "Only one failed attempt should be registered, even if the 
response contains multiple partition errors");
 
         // The manager should backoff before retry
         time.sleep(retryBackoffMs);
@@ -1212,6 +1242,8 @@ public class CommitRequestManagerTest {
         assertEquals(1, poll.unsentRequests.size());
         futures.forEach(f -> assertFalse(f.isDone()));
         mimicResponse(error, poll);
+        assertEquals(2, 
commitRequestManager.pendingRequests.unsentOffsetFetches.get(0).numAttempts,
+                "Only one failed attempt should be registered, even if the 
response contains multiple partition errors");
 
         // Sleep util timeout
         time.sleep(defaultApiTimeoutMs);
@@ -1294,8 +1326,10 @@ public class CommitRequestManagerTest {
         Set<TopicPartition> partitions = new HashSet<>();
         TopicPartition tp1 = new TopicPartition("t1", 2);
         TopicPartition tp2 = new TopicPartition("t2", 3);
+        TopicPartition tp3 = new TopicPartition("t3", 4);
         partitions.add(tp1);
         partitions.add(tp2);
+        partitions.add(tp3);
         long deadlineMs = time.milliseconds() + defaultApiTimeoutMs;
         CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future =
                 commitRequestManager.fetchOffsets(partitions, deadlineMs);
@@ -1307,6 +1341,7 @@ public class CommitRequestManagerTest {
         HashMap<TopicPartition, OffsetFetchResponse.PartitionData> 
topicPartitionData = new HashMap<>();
         topicPartitionData.put(tp1, new 
OffsetFetchResponse.PartitionData(100L, Optional.of(1), "metadata", error));
         topicPartitionData.put(tp2, new 
OffsetFetchResponse.PartitionData(100L, Optional.of(1), "metadata", 
Errors.NONE));
+        topicPartitionData.put(tp3, new 
OffsetFetchResponse.PartitionData(100L, Optional.of(1), "metadata", error));
 
         
res.unsentRequests.get(0).handler().onComplete(buildOffsetFetchClientResponse(
                 res.unsentRequests.get(0),
@@ -1464,13 +1499,14 @@ public class CommitRequestManagerTest {
     }
 
 
-    public ClientResponse mockOffsetCommitResponse(String topic,
+    private ClientResponse mockOffsetCommitResponse(String topic,
                                                    int partition,
                                                    short apiKeyVersion,
                                                    Errors error) {
         return mockOffsetCommitResponse(topic, partition, apiKeyVersion, 
time.milliseconds(), time.milliseconds(), error);
     }
-    public ClientResponse mockOffsetCommitResponse(String topic,
+
+    private ClientResponse mockOffsetCommitResponse(String topic,
                                                    int partition,
                                                    short apiKeyVersion,
                                                    long createdTimeMs,
@@ -1499,7 +1535,40 @@ public class CommitRequestManagerTest {
         );
     }
 
-    public ClientResponse mockOffsetCommitResponseDisconnected(String topic, 
int partition,
+    // Builds an OFFSET_COMMIT ClientResponse, created and received at the current mock time, in which
+    // partitions 0..partitionSize-1 of the given topic all report the given error.
+    private ClientResponse mockOffsetCommitResponse(String topic,
+                                                    short apiKeyVersion,
+                                                    Errors error,
+                                                    int partitionSize) {
+        return mockOffsetCommitResponse(topic, apiKeyVersion, time.milliseconds(), time.milliseconds(), error, partitionSize);
+    }
+
+    // Builds an OFFSET_COMMIT ClientResponse carrying a single topic whose partitions
+    // 0..partitionSize-1 all report the given error, with explicit created/received timestamps.
+    private ClientResponse mockOffsetCommitResponse(String topic,
+                                                    short apiKeyVersion,
+                                                    long createdTimeMs,
+                                                    long receivedTimeMs,
+                                                    Errors error,
+                                                    int partitionSize) {
+        OffsetCommitResponseData responseData = new OffsetCommitResponseData()
+                .setTopics(Collections.singletonList(
+                        new OffsetCommitResponseData.OffsetCommitResponseTopic()
+                                .setName(topic)
+                                .setPartitions(mockOffsetCommitResponseWithPartitionErrors(error, partitionSize))));
+        // The ClientResponse wraps a real OffsetCommitResponse built from responseData, so no Mockito mock is needed.
+        return new ClientResponse(
+                new RequestHeader(ApiKeys.OFFSET_COMMIT, apiKeyVersion, "", 1),
+                null,
+                "-1",
+                createdTimeMs,
+                receivedTimeMs,
+                false,
+                null,
+                null,
+                new OffsetCommitResponse(responseData)
+        );
+    }
+
+    private ClientResponse mockOffsetCommitResponseDisconnected(String topic, 
int partition,
                                                                short 
apiKeyVersion,
                                                                
NetworkClientDelegate.UnsentRequest unsentRequest) {
         OffsetCommitResponseData responseData = new OffsetCommitResponseData()
@@ -1553,4 +1622,12 @@ public class CommitRequestManagerTest {
             name,
             CONSUMER_COORDINATOR_METRICS));
     }
+
+    // Returns partitionSize response partitions, indexed 0..partitionSize-1, each carrying error's code.
+    private List<OffsetCommitResponseData.OffsetCommitResponsePartition> mockOffsetCommitResponseWithPartitionErrors(Errors error, int partitionSize) {
+        List<OffsetCommitResponseData.OffsetCommitResponsePartition> result = new ArrayList<>(partitionSize);
+        int partitionIndex = 0;
+        while (partitionIndex < partitionSize) {
+            OffsetCommitResponseData.OffsetCommitResponsePartition failedPartition =
+                    new OffsetCommitResponseData.OffsetCommitResponsePartition()
+                            .setErrorCode(error.code())
+                            .setPartitionIndex(partitionIndex);
+            result.add(failedPartition);
+            partitionIndex++;
+        }
+        return result;
+    }
 }

Reply via email to