This is an automated email from the ASF dual-hosted git repository. lianetm pushed a commit to branch 4.0 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push: new 5f46ff85547 KAFKA-18469;KAFKA-18036: AsyncConsumer should request metadata update if ListOffsetRequest encounters a retriable error (#18475) 5f46ff85547 is described below commit 5f46ff855475c23f8dfba9ec77df4de9dd184ebb Author: 陳昱霖(Yu-Lin Chen) <chenyulin0...@apache.org> AuthorDate: Tue Jan 14 02:03:52 2025 +0800 KAFKA-18469;KAFKA-18036: AsyncConsumer should request metadata update if ListOffsetRequest encounters a retriable error (#18475) Reviewers: Lianet Magrans <lmagr...@confluent.io> --- .../kafka/clients/consumer/internals/OffsetsRequestManager.java | 1 + .../kafka/clients/consumer/internals/OffsetsRequestManagerTest.java | 4 ++++ 2 files changed, 5 insertions(+) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java index 7870caec1ba..4c8d10ad323 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java @@ -578,6 +578,7 @@ public final class OffsetsRequestManager implements RequestManager, ClusterResou listOffsetsRequestState.globalResult.complete(listOffsetResult); } else { requestsToRetry.add(listOffsetsRequestState); + metadata.requestUpdate(false); } } else { log.debug("ListOffsets request failed with error", error); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java index dbfcb6cd46a..2f92740c414 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java @@ -286,6 +286,8 @@ public class OffsetsRequestManagerTest { assertFalse(fetchOffsetsFuture.isDone()); assertEquals(1, requestManager.requestsToRetry()); assertEquals(0, requestManager.requestsToSend()); + // A retriable error should be followed by a metadata update request + verify(metadata).requestUpdate(false); // Cluster metadata update. Failed requests should be retried and succeed mockSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, LEADER_1)); @@ -384,6 +386,8 @@ public class OffsetsRequestManagerTest { assertFalse(fetchOffsetsFuture.isDone()); assertEquals(1, requestManager.requestsToRetry()); assertEquals(0, requestManager.requestsToSend()); + // A retriable error should be followed by a metadata update request + verify(metadata).requestUpdate(false); // Cluster metadata update. Failed requests should be retried mockSuccessfulRequest(partitionLeaders);