This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new f7f376f6c16 KAFKA-12639: Exit upon expired timer to prevent tight looping (#13190) f7f376f6c16 is described below commit f7f376f6c162717e60e143b05fbd12ea2f347e3c Author: Philip Nee <p...@confluent.io> AuthorDate: Tue Feb 28 17:36:37 2023 -0800 KAFKA-12639: Exit upon expired timer to prevent tight looping (#13190) In AbstractCoordinator#joinGroupIfNeeded, the joinGroup request is retried without proper backoff once the timer has expired. This is an uncommon scenario that possibly only appears during testing, but I think it makes sense to require the client to drive the join group via poll. Reviewers: Guozhang Wang <wangg...@gmail.com> --- .../consumer/internals/AbstractCoordinator.java | 12 +++++-- .../internals/AbstractCoordinatorTest.java | 39 ++++++++++++++++++++++ .../internals/ConsumerCoordinatorTest.java | 3 +- 3 files changed, 50 insertions(+), 4 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 558ce2b1169..fc01b14ab08 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -501,13 +501,19 @@ public abstract class AbstractCoordinator implements Closeable { } if (exception instanceof UnknownMemberIdException || - exception instanceof IllegalGenerationException || - exception instanceof RebalanceInProgressException || - exception instanceof MemberIdRequiredException) + exception instanceof IllegalGenerationException || + exception instanceof RebalanceInProgressException || + exception instanceof MemberIdRequiredException) continue; else if (!future.isRetriable()) throw exception; + // We need to return upon expired timer, in case if the client.poll returns immediately and the 
time + // has elapsed. + if (timer.isExpired()) { + return false; + } + timer.sleep(rebalanceConfig.retryBackoffMs); } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java index 908dd7e4485..0a2b1e7ef8b 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java @@ -1561,6 +1561,45 @@ public class AbstractCoordinatorTest { } } + @Test + public void testBackoffAndRetryUponRetriableError() { + this.mockTime = new MockTime(); + long currentTimeMs = System.currentTimeMillis(); + this.mockTime.setCurrentTimeMs(System.currentTimeMillis()); + + setupCoordinator(); // note: uses 100ms backoff + mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + coordinator.ensureCoordinatorReady(mockTime.timer(0)); + + // Retriable Exception + mockClient.prepareResponse(joinGroupResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS)); + mockClient.prepareResponse(joinGroupResponse(Errors.NONE)); // Retry w/o error + mockClient.prepareResponse(syncGroupResponse(Errors.NONE)); + coordinator.joinGroupIfNeeded(mockTime.timer(REQUEST_TIMEOUT_MS)); + + assertEquals(100, mockTime.milliseconds() - currentTimeMs, 1); + } + + @Test + public void testReturnUponRetriableErrorAndExpiredTimer() throws InterruptedException { + setupCoordinator(); + mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + coordinator.ensureCoordinatorReady(mockTime.timer(0)); + ExecutorService executor = Executors.newFixedThreadPool(1); + Timer t = mockTime.timer(500); + try { + Future<Boolean> attempt = executor.submit(() -> coordinator.joinGroupIfNeeded(t)); + mockTime.sleep(500); + mockClient.prepareResponse(joinGroupResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS)); + 
assertFalse(attempt.get()); + } catch (Exception e) { + fail(); + } finally { + executor.shutdownNow(); + executor.awaitTermination(1000, TimeUnit.MILLISECONDS); + } + } + private AtomicBoolean prepareFirstHeartbeat() { final AtomicBoolean heartbeatReceived = new AtomicBoolean(false); mockClient.prepareResponse(body -> { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index 36bf0ea6825..0f1256e1689 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -1484,7 +1484,8 @@ public abstract class ConsumerCoordinatorTest { Utils.mkMap(Utils.mkEntry(topic1, 1), Utils.mkEntry(topic2, 1)))); client.respond(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NOT_COORDINATOR)); client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); - coordinator.poll(time.timer(0)); + assertFalse(client.hasInFlightRequests()); + coordinator.poll(time.timer(1)); assertTrue(coordinator.rejoinNeededOrPending()); client.respond(request -> {