This is an automated email from the ASF dual-hosted git repository. showuon pushed a commit to branch 3.2 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.2 by this push: new e72db09894c KAFKA-14208; Do not raise wakeup in consumer during asynchronous offset commits (#12626) e72db09894c is described below commit e72db09894cde8ab67704353b850d712fa626e3d Author: Jason Gustafson <ja...@confluent.io> AuthorDate: Tue Sep 13 00:43:09 2022 -0700 KAFKA-14208; Do not raise wakeup in consumer during asynchronous offset commits (#12626) Asynchronous offset commits may throw an unexpected WakeupException following #11631 and #12244. This patch fixes the problem by passing through a flag to ensureCoordinatorReady to indicate whether wakeups should be disabled. This is used to disable wakeups in the context of asynchronous offset commits. All other uses leave wakeups enabled. Note: this patch builds on top of #12611. Co-Authored-By: Guozhang Wang wangg...@gmail.com Reviewers: Luke Chen <show...@gmail.com> --- .../consumer/internals/AbstractCoordinator.java | 19 ++++++++++++++++--- .../consumer/internals/ConsumerCoordinator.java | 13 ++++++++----- .../consumer/internals/ConsumerNetworkClient.java | 17 ++++++++++++++++- .../consumer/internals/AbstractCoordinatorTest.java | 15 +++++++++++++++ 4 files changed, 55 insertions(+), 9 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 2b705f347a9..c59629f4ca7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -231,14 +231,27 @@ public abstract class AbstractCoordinator implements Closeable { protected void onLeavePrepare() {} /** - * Visible for testing. - * * Ensure that the coordinator is ready to receive requests. * * @param timer Timer bounding how long this method can block * @return true If coordinator discovery and initial connection succeeded, false otherwise */ protected synchronized boolean ensureCoordinatorReady(final Timer timer) { + return ensureCoordinatorReady(timer, false); + } + + /** + * Ensure that the coordinator is ready to receive requests. This will return + * immediately without blocking. It is intended to be called in an asynchronous + * context when wakeups are not expected. + * + * @return true If coordinator discovery and initial connection succeeded, false otherwise + */ + protected synchronized boolean ensureCoordinatorReadyAsync() { + return ensureCoordinatorReady(time.timer(0), true); + } + + private synchronized boolean ensureCoordinatorReady(final Timer timer, boolean disableWakeup) { if (!coordinatorUnknown()) return true; @@ -249,7 +262,7 @@ public abstract class AbstractCoordinator implements Closeable { throw fatalException; } final RequestFuture<Void> future = lookupCoordinator(); - client.poll(future, timer); + client.poll(future, timer, disableWakeup); if (!future.isDone()) { // ran out of time diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index a614504704f..4b5d0aac413 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.clients.consumer.internals; -import java.time.Duration; import java.util.SortedSet; import java.util.TreeSet; import org.apache.kafka.clients.GroupRebalanceConfig; @@ -489,10 +488,14 @@ public final class ConsumerCoordinator extends AbstractCoordinator { } } - private boolean coordinatorUnknownAndUnready(Timer timer) { + private boolean coordinatorUnknownAndUnreadySync(Timer timer) { return coordinatorUnknown() && !ensureCoordinatorReady(timer); } + private boolean coordinatorUnknownAndUnreadyAsync() { + return coordinatorUnknown() && !ensureCoordinatorReadyAsync(); + } + /** * Poll for coordinator events. This ensures that the coordinator is known and that the consumer * has joined the group (if it is using group management). This also handles periodic offset commits @@ -518,7 +521,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { // Always update the heartbeat last poll time so that the heartbeat thread does not leave the // group proactively due to application inactivity even if (say) the coordinator cannot be found. pollHeartbeat(timer.currentTimeMs()); - if (coordinatorUnknownAndUnready(timer)) { + if (coordinatorUnknownAndUnreadySync(timer)) { return false; } @@ -1069,7 +1072,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { if (offsets.isEmpty()) { // No need to check coordinator if offsets is empty since commit of empty offsets is completed locally. future = doCommitOffsetsAsync(offsets, callback); - } else if (!coordinatorUnknownAndUnready(time.timer(Duration.ZERO))) { + } else if (!coordinatorUnknownAndUnreadyAsync()) { // we need to make sure coordinator is ready before committing, since // this is for async committing we do not try to block, but just try once to // clear the previous discover-coordinator future, resend, or get responses; @@ -1158,7 +1161,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { return true; do { - if (coordinatorUnknownAndUnready(timer)) { + if (coordinatorUnknownAndUnreadySync(timer)) { return false; } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java index 4b9112016e9..6646dc6c893 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java @@ -211,8 +211,23 @@ public class ConsumerNetworkClient implements Closeable { * @throws InterruptException if the calling thread is interrupted */ public boolean poll(RequestFuture<?> future, Timer timer) { + return poll(future, timer, false); + } + + /** + * Block until the provided request future request has finished or the timeout has expired. + * + * @param future The request future to wait for + * @param timer Timer bounding how long this method can block + * @param disableWakeup true if we should not check for wakeups, false otherwise + * + * @return true if the future is done, false otherwise + * @throws WakeupException if {@link #wakeup()} is called from another thread and `disableWakeup` is false + * @throws InterruptException if the calling thread is interrupted + */ + public boolean poll(RequestFuture<?> future, Timer timer, boolean disableWakeup) { do { - poll(timer, future); + poll(timer, future, disableWakeup); } while (!future.isDone() && timer.notExpired()); return future.isDone(); } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java index 69dd893e12f..0745b99749f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java @@ -271,6 +271,21 @@ public class AbstractCoordinatorTest { assertTrue(endTime - initialTime >= RETRY_BACKOFF_MS); } + @Test + public void testWakeupFromEnsureCoordinatorReady() { + setupCoordinator(); + + consumerClient.wakeup(); + + // No wakeup should occur from the async variation. + coordinator.ensureCoordinatorReadyAsync(); + + // But should wakeup in sync variation even if timer is 0. + assertThrows(WakeupException.class, () -> { + coordinator.ensureCoordinatorReady(mockTime.timer(0)); + }); + } + @Test public void testTimeoutAndRetryJoinGroupIfNeeded() throws Exception { setupCoordinator();