This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 92672d1df81 KAFKA-17470: CommitRequestManager should record failed
request only once even if multiple errors in response (#17109)
92672d1df81 is described below
commit 92672d1df81cd608f9c360260e022c07e41d2ceb
Author: TengYao Chi <[email protected]>
AuthorDate: Tue Sep 10 03:52:32 2024 +0800
KAFKA-17470: CommitRequestManager should record failed request only once
even if multiple errors in response (#17109)
Reviewers: Lianet Magrans <[email protected]>
---
.../consumer/internals/CommitRequestManager.java | 16 ++++-
.../internals/CommitRequestManagerTest.java | 83 +++++++++++++++++++++-
2 files changed, 93 insertions(+), 6 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
index e435b3acd5c..35f3cf1d861 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
@@ -616,7 +616,7 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, requests);
}
- private class OffsetCommitRequestState extends RetriableRequestState {
+ class OffsetCommitRequestState extends RetriableRequestState {
private Map<TopicPartition, OffsetAndMetadata> offsets;
private final String groupId;
private final Optional<String> groupInstanceId;
@@ -711,6 +711,7 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
long currentTimeMs = response.receivedTimeMs();
OffsetCommitResponse commitResponse = (OffsetCommitResponse)
response.responseBody();
Set<String> unauthorizedTopics = new HashSet<>();
+ boolean failedRequestRegistered = false;
for (OffsetCommitResponseData.OffsetCommitResponseTopic topic :
commitResponse.data().topics()) {
for (OffsetCommitResponseData.OffsetCommitResponsePartition
partition : topic.partitions()) {
TopicPartition tp = new TopicPartition(topic.name(),
partition.partitionIndex());
@@ -723,7 +724,11 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
continue;
}
- onFailedAttempt(currentTimeMs);
+ if (!failedRequestRegistered) {
+ onFailedAttempt(currentTimeMs);
+ failedRequestRegistered = true;
+ }
+
if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
future.completeExceptionally(GroupAuthorizationException.forGroupId(groupId));
return;
@@ -1042,14 +1047,19 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
response.partitionDataMap(groupId);
Map<TopicPartition, OffsetAndMetadata> offsets = new
HashMap<>(responseData.size());
Set<TopicPartition> unstableTxnOffsetTopicPartitions = new
HashSet<>();
+ boolean failedRequestRegistered = false;
for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData>
entry : responseData.entrySet()) {
TopicPartition tp = entry.getKey();
OffsetFetchResponse.PartitionData partitionData =
entry.getValue();
if (partitionData.hasError()) {
- onFailedAttempt(currentTimeMs);
Errors error = partitionData.error;
log.debug("Failed to fetch offset for partition {}: {}",
tp, error.message());
+ if (!failedRequestRegistered) {
+ onFailedAttempt(currentTimeMs);
+ failedRequestRegistered = true;
+ }
+
if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
future.completeExceptionally(new KafkaException("Topic or Partition " + tp + " does not exist"));
return;
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
index 3c7347e6d02..f976d69c195 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
@@ -79,6 +79,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
@@ -900,6 +901,33 @@ public class CommitRequestManagerTest {
assertFutureThrows(commitResult, RetriableCommitFailedException.class);
}
+ @ParameterizedTest
+ @MethodSource("offsetCommitExceptionSupplier")
+ public void
testOffsetCommitSingleFailedAttemptPerRequestWhenPartitionErrors(final Errors
error) {
+ CommitRequestManager commitRequestManager = create(true, 100);
+
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
+
+ Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+ offsets.put(new TopicPartition("t1", 0), new OffsetAndMetadata(1));
+ offsets.put(new TopicPartition("t1", 1), new OffsetAndMetadata(2));
+ offsets.put(new TopicPartition("t1", 2), new OffsetAndMetadata(3));
+
+ commitRequestManager.commitSync(offsets, time.milliseconds() +
defaultApiTimeoutMs);
+ NetworkClientDelegate.PollResult res =
commitRequestManager.poll(time.milliseconds());
+ assertEquals(1, res.unsentRequests.size());
+
+
res.unsentRequests.get(0).handler().onComplete(mockOffsetCommitResponse("topic",
(short) 1, error, offsets.size()));
+ CommitRequestManager.OffsetCommitRequestState commitRequest =
commitRequestManager.pendingRequests.unsentOffsetCommits.peek();
+ if (error.exception() instanceof RetriableException) {
+ assertNotNull(commitRequest);
+ assertEquals(1, commitRequest.numAttempts, "Only one failed attempt should be registered, even if the response contains multiple partition errors");
+ time.sleep(retryBackoffMs);
+ res = commitRequestManager.poll(time.milliseconds());
+
res.unsentRequests.get(0).handler().onComplete(mockOffsetCommitResponse("topic",
(short) 1, error, offsets.size()));
+ assertEquals(2, commitRequest.numAttempts, "Only one failed attempt should be registered, even if the response contains multiple partition errors");
+ } else assertNull(commitRequest);
+ }
+
@Test
public void testEnsureBackoffRetryOnOffsetCommitRequestTimeout() {
CommitRequestManager commitRequestManager = create(true, 100);
@@ -1205,6 +1233,8 @@ public class CommitRequestManagerTest {
final Errors error
) {
futures.forEach(f -> assertFalse(f.isDone()));
+ assertEquals(1, commitRequestManager.pendingRequests.unsentOffsetFetches.get(0).numAttempts,
+ "Only one failed attempt should be registered, even if the response contains multiple partition errors");
// The manager should backoff before retry
time.sleep(retryBackoffMs);
@@ -1212,6 +1242,8 @@ public class CommitRequestManagerTest {
assertEquals(1, poll.unsentRequests.size());
futures.forEach(f -> assertFalse(f.isDone()));
mimicResponse(error, poll);
+ assertEquals(2, commitRequestManager.pendingRequests.unsentOffsetFetches.get(0).numAttempts,
+ "Only one failed attempt should be registered, even if the response contains multiple partition errors");
// Sleep util timeout
time.sleep(defaultApiTimeoutMs);
@@ -1294,8 +1326,10 @@ public class CommitRequestManagerTest {
Set<TopicPartition> partitions = new HashSet<>();
TopicPartition tp1 = new TopicPartition("t1", 2);
TopicPartition tp2 = new TopicPartition("t2", 3);
+ TopicPartition tp3 = new TopicPartition("t3", 4);
partitions.add(tp1);
partitions.add(tp2);
+ partitions.add(tp3);
long deadlineMs = time.milliseconds() + defaultApiTimeoutMs;
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future =
commitRequestManager.fetchOffsets(partitions, deadlineMs);
@@ -1307,6 +1341,7 @@ public class CommitRequestManagerTest {
HashMap<TopicPartition, OffsetFetchResponse.PartitionData>
topicPartitionData = new HashMap<>();
topicPartitionData.put(tp1, new
OffsetFetchResponse.PartitionData(100L, Optional.of(1), "metadata", error));
topicPartitionData.put(tp2, new
OffsetFetchResponse.PartitionData(100L, Optional.of(1), "metadata",
Errors.NONE));
+ topicPartitionData.put(tp3, new
OffsetFetchResponse.PartitionData(100L, Optional.of(1), "metadata", error));
res.unsentRequests.get(0).handler().onComplete(buildOffsetFetchClientResponse(
res.unsentRequests.get(0),
@@ -1464,13 +1499,14 @@ public class CommitRequestManagerTest {
}
- public ClientResponse mockOffsetCommitResponse(String topic,
+ private ClientResponse mockOffsetCommitResponse(String topic,
int partition,
short apiKeyVersion,
Errors error) {
return mockOffsetCommitResponse(topic, partition, apiKeyVersion,
time.milliseconds(), time.milliseconds(), error);
}
- public ClientResponse mockOffsetCommitResponse(String topic,
+
+ private ClientResponse mockOffsetCommitResponse(String topic,
int partition,
short apiKeyVersion,
long createdTimeMs,
@@ -1499,7 +1535,40 @@ public class CommitRequestManagerTest {
);
}
- public ClientResponse mockOffsetCommitResponseDisconnected(String topic,
int partition,
+ private ClientResponse mockOffsetCommitResponse(String topic,
+ short apiKeyVersion,
+ Errors error,
+ int partitionSize) {
+ return mockOffsetCommitResponse(topic, apiKeyVersion,
time.milliseconds(), time.milliseconds(), error, partitionSize);
+ }
+
+ private ClientResponse mockOffsetCommitResponse(String topic,
+ short apiKeyVersion,
+ long createdTimeMs,
+ long receivedTimeMs,
+ Errors error,
+ int partitionSize) {
+ OffsetCommitResponseData responseData = new OffsetCommitResponseData()
+ .setTopics(Collections.singletonList(
+ new
OffsetCommitResponseData.OffsetCommitResponseTopic()
+ .setName(topic)
+
.setPartitions(mockOffsetCommitResponseWithPartitionErrors(error,
partitionSize))));
+ OffsetCommitResponse response = mock(OffsetCommitResponse.class);
+ when(response.data()).thenReturn(responseData);
+ return new ClientResponse(
+ new RequestHeader(ApiKeys.OFFSET_COMMIT, apiKeyVersion, "", 1),
+ null,
+ "-1",
+ createdTimeMs,
+ receivedTimeMs,
+ false,
+ null,
+ null,
+ new OffsetCommitResponse(responseData)
+ );
+ }
+
+ private ClientResponse mockOffsetCommitResponseDisconnected(String topic,
int partition,
short
apiKeyVersion,
NetworkClientDelegate.UnsentRequest unsentRequest) {
OffsetCommitResponseData responseData = new OffsetCommitResponseData()
@@ -1553,4 +1622,12 @@ public class CommitRequestManagerTest {
name,
CONSUMER_COORDINATOR_METRICS));
}
+
+ private List<OffsetCommitResponseData.OffsetCommitResponsePartition>
mockOffsetCommitResponseWithPartitionErrors(Errors error, int partitionSize) {
+ List<OffsetCommitResponseData.OffsetCommitResponsePartition>
partitions = new ArrayList<>(partitionSize);
+ for (int i = 0; i < partitionSize; i++) {
+ partitions.add(new
OffsetCommitResponseData.OffsetCommitResponsePartition().setErrorCode(error.code()).setPartitionIndex(i));
+ }
+ return partitions;
+ }
}