This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.2 by this push:
     new e72db09894c KAFKA-14208; Do not raise wakeup in consumer during 
asynchronous offset commits (#12626)
e72db09894c is described below

commit e72db09894cde8ab67704353b850d712fa626e3d
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Tue Sep 13 00:43:09 2022 -0700

    KAFKA-14208; Do not raise wakeup in consumer during asynchronous offset 
commits (#12626)
    
    Asynchronous offset commits may throw an unexpected WakeupException 
following #11631 and #12244. This patch fixes the problem by passing through a 
flag to ensureCoordinatorReady to indicate whether wakeups should be disabled. 
This is used to disable wakeups in the context of asynchronous offset commits. 
All other uses leave wakeups enabled.
    
    Note: this patch builds on top of #12611.
    
    Co-Authored-By: Guozhang Wang wangg...@gmail.com
    
    Reviewers: Luke Chen <show...@gmail.com>
---
 .../consumer/internals/AbstractCoordinator.java       | 19 ++++++++++++++++---
 .../consumer/internals/ConsumerCoordinator.java       | 13 ++++++++-----
 .../consumer/internals/ConsumerNetworkClient.java     | 17 ++++++++++++++++-
 .../consumer/internals/AbstractCoordinatorTest.java   | 15 +++++++++++++++
 4 files changed, 55 insertions(+), 9 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 2b705f347a9..c59629f4ca7 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -231,14 +231,27 @@ public abstract class AbstractCoordinator implements 
Closeable {
     protected void onLeavePrepare() {}
 
     /**
-     * Visible for testing.
-     *
      * Ensure that the coordinator is ready to receive requests.
      *
      * @param timer Timer bounding how long this method can block
      * @return true If coordinator discovery and initial connection succeeded, 
false otherwise
      */
     protected synchronized boolean ensureCoordinatorReady(final Timer timer) {
+        return ensureCoordinatorReady(timer, false);
+    }
+
+    /**
+     * Ensure that the coordinator is ready to receive requests. This will 
return
+     * immediately without blocking. It is intended to be called in an 
asynchronous
+     * context when wakeups are not expected.
+     *
+     * @return true If coordinator discovery and initial connection succeeded, 
false otherwise
+     */
+    protected synchronized boolean ensureCoordinatorReadyAsync() {
+        return ensureCoordinatorReady(time.timer(0), true);
+    }
+
+    private synchronized boolean ensureCoordinatorReady(final Timer timer, 
boolean disableWakeup) {
         if (!coordinatorUnknown())
             return true;
 
@@ -249,7 +262,7 @@ public abstract class AbstractCoordinator implements 
Closeable {
                 throw fatalException;
             }
             final RequestFuture<Void> future = lookupCoordinator();
-            client.poll(future, timer);
+            client.poll(future, timer, disableWakeup);
 
             if (!future.isDone()) {
                 // ran out of time
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index a614504704f..4b5d0aac413 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
-import java.time.Duration;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import org.apache.kafka.clients.GroupRebalanceConfig;
@@ -489,10 +488,14 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
         }
     }
 
-    private boolean coordinatorUnknownAndUnready(Timer timer) {
+    private boolean coordinatorUnknownAndUnreadySync(Timer timer) {
         return coordinatorUnknown() && !ensureCoordinatorReady(timer);
     }
 
+    private boolean coordinatorUnknownAndUnreadyAsync() {
+        return coordinatorUnknown() && !ensureCoordinatorReadyAsync();
+    }
+
     /**
      * Poll for coordinator events. This ensures that the coordinator is known 
and that the consumer
      * has joined the group (if it is using group management). This also 
handles periodic offset commits
@@ -518,7 +521,7 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
             // Always update the heartbeat last poll time so that the 
heartbeat thread does not leave the
             // group proactively due to application inactivity even if (say) 
the coordinator cannot be found.
             pollHeartbeat(timer.currentTimeMs());
-            if (coordinatorUnknownAndUnready(timer)) {
+            if (coordinatorUnknownAndUnreadySync(timer)) {
                 return false;
             }
 
@@ -1069,7 +1072,7 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
         if (offsets.isEmpty()) {
             // No need to check coordinator if offsets is empty since commit 
of empty offsets is completed locally.
             future = doCommitOffsetsAsync(offsets, callback);
-        } else if (!coordinatorUnknownAndUnready(time.timer(Duration.ZERO))) {
+        } else if (!coordinatorUnknownAndUnreadyAsync()) {
             // we need to make sure coordinator is ready before committing, 
since
             // this is for async committing we do not try to block, but just 
try once to
             // clear the previous discover-coordinator future, resend, or get 
responses;
@@ -1158,7 +1161,7 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
             return true;
 
         do {
-            if (coordinatorUnknownAndUnready(timer)) {
+            if (coordinatorUnknownAndUnreadySync(timer)) {
                 return false;
             }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index 4b9112016e9..6646dc6c893 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -211,8 +211,23 @@ public class ConsumerNetworkClient implements Closeable {
      * @throws InterruptException if the calling thread is interrupted
      */
     public boolean poll(RequestFuture<?> future, Timer timer) {
+        return poll(future, timer, false);
+    }
+
+    /**
+     * Block until the provided request future request has finished or the 
timeout has expired.
+     *
+     * @param future The request future to wait for
+     * @param timer Timer bounding how long this method can block
+     * @param disableWakeup true if we should not check for wakeups, false 
otherwise
+     *
+     * @return true if the future is done, false otherwise
+     * @throws WakeupException if {@link #wakeup()} is called from another 
thread and `disableWakeup` is false
+     * @throws InterruptException if the calling thread is interrupted
+     */
+    public boolean poll(RequestFuture<?> future, Timer timer, boolean 
disableWakeup) {
         do {
-            poll(timer, future);
+            poll(timer, future, disableWakeup);
         } while (!future.isDone() && timer.notExpired());
         return future.isDone();
     }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index 69dd893e12f..0745b99749f 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -271,6 +271,21 @@ public class AbstractCoordinatorTest {
         assertTrue(endTime - initialTime >= RETRY_BACKOFF_MS);
     }
 
+    @Test
+    public void testWakeupFromEnsureCoordinatorReady() {
+        setupCoordinator();
+
+        consumerClient.wakeup();
+
+        // No wakeup should occur from the async variation.
+        coordinator.ensureCoordinatorReadyAsync();
+
+        // But should wakeup in sync variation even if timer is 0.
+        assertThrows(WakeupException.class, () -> {
+            coordinator.ensureCoordinatorReady(mockTime.timer(0));
+        });
+    }
+
     @Test
     public void testTimeoutAndRetryJoinGroupIfNeeded() throws Exception {
         setupCoordinator();

Reply via email to