This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c8843f06841 KAFKA-16167: Disable wakeups during autocommit on close 
(#15445)
c8843f06841 is described below

commit c8843f06841d7f3c94b640ec9dbf69ec4682ec11
Author: Lucas Brutschy <[email protected]>
AuthorDate: Fri Mar 1 11:14:13 2024 +0100

    KAFKA-16167: Disable wakeups during autocommit on close (#15445)
    
    When the consumer is closed, we perform a synchronous autocommit. We don't 
want to be woken up here, because we are already executing a close operation 
under a deadline. This is in line with the behavior of the old consumer.
    
    This fixes PlaintextConsumerTest.testAutoCommitOnCloseAfterWakeup which is 
flaky on trunk - because we return immediately from the synchronous commit with 
a WakeupException, which causes us to not wait for the commit to finish and 
thereby sometimes miss the committed offset when a new consumer is created.
    
    Reviewers: Matthias J. Sax <[email protected]>, Bruno Cadonna 
<[email protected]>
---
 .../consumer/internals/AsyncKafkaConsumer.java     |  8 +++--
 .../clients/consumer/internals/WakeupTrigger.java  | 11 ++++++
 .../consumer/internals/AsyncKafkaConsumerTest.java | 31 +++++++++++++++--
 .../consumer/internals/WakeupTriggerTest.java      | 39 ++++++++++++++++++++++
 4 files changed, 84 insertions(+), 5 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index d810c5f053b..e706898b70a 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -1228,6 +1228,9 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
         log.trace("Closing the Kafka consumer");
         AtomicReference<Throwable> firstException = new AtomicReference<>();
 
+        // We are already closing with a timeout, don't allow wake-ups from 
here on.
+        wakeupTrigger.disableWakeups();
+
         final Timer closeTimer = time.timer(timeout);
         clientTelemetryReporter.ifPresent(reporter -> 
reporter.initiateClose(timeout.toMillis()));
         closeTimer.update();
@@ -1265,7 +1268,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
     void prepareShutdown(final Timer timer, final AtomicReference<Throwable> 
firstException) {
         if (!groupMetadata.isPresent())
             return;
-        maybeAutoCommitSync(autoCommitEnabled, timer, firstException);
+        maybeAutoCommitSync(autoCommitEnabled, timer);
         applicationEventHandler.add(new CommitOnCloseEvent());
         completeQuietly(
             () -> {
@@ -1277,8 +1280,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
 
     // Visible for testing
     void maybeAutoCommitSync(final boolean shouldAutoCommit,
-                             final Timer timer,
-                             final AtomicReference<Throwable> firstException) {
+                             final Timer timer) {
         if (!shouldAutoCommit)
             return;
         Map<TopicPartition, OffsetAndMetadata> allConsumed = 
subscriptions.allConsumed();
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/WakeupTrigger.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/WakeupTrigger.java
index 5a030f6307d..209d5e41bee 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/WakeupTrigger.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/WakeupTrigger.java
@@ -73,6 +73,8 @@ public class WakeupTrigger {
             } else if (task instanceof WakeupFuture) {
                 currentTask.completeExceptionally(new WakeupException());
                 return null;
+            } else if (task instanceof DisabledWakeups) {
+                return task;
             }
             // last active state is still active
             throw new KafkaException("Last active task is still active");
@@ -88,6 +90,8 @@ public class WakeupTrigger {
             } else if (task instanceof WakeupFuture) {
                 throwWakeupException.set(true);
                 return null;
+            } else if (task instanceof DisabledWakeups) {
+                return task;
             }
             // last active state is still active
             throw new IllegalStateException("Last active task is still 
active");
@@ -97,6 +101,10 @@ public class WakeupTrigger {
         }
     }
 
+    public void disableWakeups() {
+        pendingTask.set(new DisabledWakeups());
+    }
+
     public void clearTask() {
         pendingTask.getAndUpdate(task -> {
             if (task == null) {
@@ -131,6 +139,9 @@ public class WakeupTrigger {
 
     interface Wakeupable { }
 
+    // Set to block wakeups from happening and pending actions to be 
registered.
+    static class DisabledWakeups implements Wakeupable { }
+
     static class ActiveFuture implements Wakeupable {
         private final CompletableFuture<?> future;
 
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index 2d6899612f0..35e742fbd0a 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -30,6 +30,7 @@ import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
 import org.apache.kafka.clients.consumer.RoundRobinAssignor;
+import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
 import 
org.apache.kafka.clients.consumer.internals.events.AssignmentChangeEvent;
 import org.apache.kafka.clients.consumer.internals.events.AsyncCommitEvent;
@@ -120,6 +121,7 @@ import static 
org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -676,7 +678,7 @@ public class AsyncKafkaConsumerTest {
         consumer.subscribe(singleton("topic"), 
mock(ConsumerRebalanceListener.class));
         subscriptions.assignFromSubscribed(singleton(new 
TopicPartition("topic", 0)));
         subscriptions.seek(new TopicPartition("topic", 0), 100);
-        consumer.maybeAutoCommitSync(true, time.timer(100), null);
+        consumer.maybeAutoCommitSync(true, time.timer(100));
         verify(applicationEventHandler).add(any(SyncCommitEvent.class));
     }
 
@@ -694,7 +696,7 @@ public class AsyncKafkaConsumerTest {
         consumer.subscribe(singleton("topic"), 
mock(ConsumerRebalanceListener.class));
         subscriptions.assignFromSubscribed(singleton(new 
TopicPartition("topic", 0)));
         subscriptions.seek(new TopicPartition("topic", 0), 100);
-        consumer.maybeAutoCommitSync(false, time.timer(100), null);
+        consumer.maybeAutoCommitSync(false, time.timer(100));
         verify(applicationEventHandler, 
never()).add(any(SyncCommitEvent.class));
     }
 
@@ -892,6 +894,31 @@ public class AsyncKafkaConsumerTest {
         assertNull(consumer.wakeupTrigger().getPendingTask());
     }
 
+    @Test
+    public void testNoWakeupInCloseCommit() {
+        TopicPartition tp = new TopicPartition("topic1", 0);
+        consumer = newConsumer();
+        consumer.assign(Collections.singleton(tp));
+        
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+        consumer.seek(tp, 10);
+        consumer.wakeup();
+
+        AtomicReference<SyncCommitEvent> capturedEvent = new 
AtomicReference<>();
+        doAnswer(invocation -> {
+            ApplicationEvent event = invocation.getArgument(0);
+            if (event instanceof SyncCommitEvent) {
+                capturedEvent.set((SyncCommitEvent) event);
+            }
+            return null;
+        }).when(applicationEventHandler).add(any());
+
+        consumer.close(Duration.ZERO);
+
+        // A commit was triggered and not completed exceptionally by the wakeup
+        assertNotNull(capturedEvent.get());
+        assertFalse(capturedEvent.get().future().isCompletedExceptionally());
+    }
+
     @Test
     public void testInterceptorAutoCommitOnClose() {
         Properties props = requiredConsumerPropertiesAndGroupId("test-id");
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/WakeupTriggerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/WakeupTriggerTest.java
index 235ec78168d..3e1518814e7 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/WakeupTriggerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/WakeupTriggerTest.java
@@ -28,12 +28,14 @@ import java.util.concurrent.TimeUnit;
 
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 
 @MockitoSettings(strictness = Strictness.STRICT_STUBS)
@@ -133,6 +135,43 @@ public class WakeupTriggerTest {
         }
     }
 
+    @Test
+    public void testDisableWakeupWithoutPendingTask() {
+        wakeupTrigger.disableWakeups();
+        wakeupTrigger.wakeup();
+        assertDoesNotThrow(() -> wakeupTrigger.maybeTriggerWakeup());
+    }
+
+    @Test
+    public void testDisableWakeupWithPendingTask() {
+        final CompletableFuture<Void> future = new CompletableFuture<>();
+        wakeupTrigger.disableWakeups();
+        wakeupTrigger.setActiveTask(future);
+        wakeupTrigger.wakeup();
+        assertFalse(future.isCompletedExceptionally());
+        assertDoesNotThrow(() -> wakeupTrigger.maybeTriggerWakeup());
+    }
+
+    @Test
+    public void testDisableWakeupWithFetchAction() {
+        try (final FetchBuffer fetchBuffer = mock(FetchBuffer.class)) {
+            wakeupTrigger.disableWakeups();
+            wakeupTrigger.setFetchAction(fetchBuffer);
+            wakeupTrigger.wakeup();
+            verify(fetchBuffer, never()).wakeup();
+        }
+    }
+
+    @Test
+    public void testDisableWakeupPreservedByClearTask() {
+        final CompletableFuture<Void> future = new CompletableFuture<>();
+        wakeupTrigger.disableWakeups();
+        wakeupTrigger.setActiveTask(future);
+        wakeupTrigger.clearTask();
+        wakeupTrigger.wakeup();
+        assertDoesNotThrow(() -> wakeupTrigger.maybeTriggerWakeup());
+    }
+
     private void assertWakeupExceptionIsThrown(final CompletableFuture<?> 
future) {
         assertTrue(future.isCompletedExceptionally());
         try {

Reply via email to