This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f7f376f6c16 KAFKA-12639: Exit upon expired timer to prevent tight
looping (#13190)
f7f376f6c16 is described below
commit f7f376f6c162717e60e143b05fbd12ea2f347e3c
Author: Philip Nee <[email protected]>
AuthorDate: Tue Feb 28 17:36:37 2023 -0800
KAFKA-12639: Exit upon expired timer to prevent tight looping (#13190)
In AbstractCoordinator#joinGroupIfNeeded, the JoinGroup request is retried
without proper backoff once the timer has expired, which causes tight looping.
This is an uncommon scenario that may only appear during testing, but it makes
sense to return early and require the client to drive the join group via poll.
Reviewers: Guozhang Wang <[email protected]>
---
.../consumer/internals/AbstractCoordinator.java | 12 +++++--
.../internals/AbstractCoordinatorTest.java | 39 ++++++++++++++++++++++
.../internals/ConsumerCoordinatorTest.java | 3 +-
3 files changed, 50 insertions(+), 4 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 558ce2b1169..fc01b14ab08 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -501,13 +501,19 @@ public abstract class AbstractCoordinator implements
Closeable {
}
if (exception instanceof UnknownMemberIdException ||
- exception instanceof IllegalGenerationException ||
- exception instanceof RebalanceInProgressException ||
- exception instanceof MemberIdRequiredException)
+ exception instanceof IllegalGenerationException ||
+ exception instanceof RebalanceInProgressException ||
+ exception instanceof MemberIdRequiredException)
continue;
else if (!future.isRetriable())
throw exception;
+ // Return once the timer has expired, in case client.poll returns immediately
+ // and the time has already elapsed; otherwise we would retry in a tight loop.
+ if (timer.isExpired()) {
+ return false;
+ }
+
timer.sleep(rebalanceConfig.retryBackoffMs);
}
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index 908dd7e4485..0a2b1e7ef8b 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -1561,6 +1561,45 @@ public class AbstractCoordinatorTest {
}
}
+ @Test
+ public void testBackoffAndRetryUponRetriableError() {
+ this.mockTime = new MockTime();
+ long currentTimeMs = System.currentTimeMillis();
+ this.mockTime.setCurrentTimeMs(System.currentTimeMillis());
+
+ setupCoordinator(); // note: uses 100ms backoff
+ mockClient.prepareResponse(groupCoordinatorResponse(node,
Errors.NONE));
+ coordinator.ensureCoordinatorReady(mockTime.timer(0));
+
+ // Retriable Exception
+
mockClient.prepareResponse(joinGroupResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS));
+ mockClient.prepareResponse(joinGroupResponse(Errors.NONE)); // Retry
w/o error
+ mockClient.prepareResponse(syncGroupResponse(Errors.NONE));
+ coordinator.joinGroupIfNeeded(mockTime.timer(REQUEST_TIMEOUT_MS));
+
+ assertEquals(100, mockTime.milliseconds() - currentTimeMs, 1);
+ }
+
+ @Test
+ public void testReturnUponRetriableErrorAndExpiredTimer() throws
InterruptedException {
+ setupCoordinator();
+ mockClient.prepareResponse(groupCoordinatorResponse(node,
Errors.NONE));
+ coordinator.ensureCoordinatorReady(mockTime.timer(0));
+ ExecutorService executor = Executors.newFixedThreadPool(1);
+ Timer t = mockTime.timer(500);
+ try {
+ Future<Boolean> attempt = executor.submit(() ->
coordinator.joinGroupIfNeeded(t));
+ mockTime.sleep(500);
+
mockClient.prepareResponse(joinGroupResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS));
+ assertFalse(attempt.get());
+ } catch (Exception e) {
+ fail();
+ } finally {
+ executor.shutdownNow();
+ executor.awaitTermination(1000, TimeUnit.MILLISECONDS);
+ }
+ }
+
private AtomicBoolean prepareFirstHeartbeat() {
final AtomicBoolean heartbeatReceived = new AtomicBoolean(false);
mockClient.prepareResponse(body -> {
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 36bf0ea6825..0f1256e1689 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -1484,7 +1484,8 @@ public abstract class ConsumerCoordinatorTest {
Utils.mkMap(Utils.mkEntry(topic1, 1), Utils.mkEntry(topic2,
1))));
client.respond(joinGroupFollowerResponse(1, consumerId, "leader",
Errors.NOT_COORDINATOR));
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
- coordinator.poll(time.timer(0));
+ assertFalse(client.hasInFlightRequests());
+ coordinator.poll(time.timer(1));
assertTrue(coordinator.rejoinNeededOrPending());
client.respond(request -> {