This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c8843f06841 KAFKA-16167: Disable wakeups during autocommit on close
(#15445)
c8843f06841 is described below
commit c8843f06841d7f3c94b640ec9dbf69ec4682ec11
Author: Lucas Brutschy <[email protected]>
AuthorDate: Fri Mar 1 11:14:13 2024 +0100
KAFKA-16167: Disable wakeups during autocommit on close (#15445)
When the consumer is closed, we perform a sychronous autocommit. We don't
want to be woken up here, because we are already executing a close operation
under a deadline. This is in line with the behavior of the old consumer.
This fixes PlaintextConsumerTest.testAutoCommitOnCloseAfterWakeup which is
flaky on trunk - because we return immediately from the synchronous commit with
a WakeupException, which causes us to not wait for the commit to finish and
thereby sometimes miss the committed offset when a new consumer is created.
Reviewers: Matthias J. Sax <[email protected]>, Bruno Cadonna
<[email protected]>
---
.../consumer/internals/AsyncKafkaConsumer.java | 8 +++--
.../clients/consumer/internals/WakeupTrigger.java | 11 ++++++
.../consumer/internals/AsyncKafkaConsumerTest.java | 31 +++++++++++++++--
.../consumer/internals/WakeupTriggerTest.java | 39 ++++++++++++++++++++++
4 files changed, 84 insertions(+), 5 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index d810c5f053b..e706898b70a 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -1228,6 +1228,9 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
log.trace("Closing the Kafka consumer");
AtomicReference<Throwable> firstException = new AtomicReference<>();
+ // We are already closing with a timeout, don't allow wake-ups from
here on.
+ wakeupTrigger.disableWakeups();
+
final Timer closeTimer = time.timer(timeout);
clientTelemetryReporter.ifPresent(reporter ->
reporter.initiateClose(timeout.toMillis()));
closeTimer.update();
@@ -1265,7 +1268,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
void prepareShutdown(final Timer timer, final AtomicReference<Throwable>
firstException) {
if (!groupMetadata.isPresent())
return;
- maybeAutoCommitSync(autoCommitEnabled, timer, firstException);
+ maybeAutoCommitSync(autoCommitEnabled, timer);
applicationEventHandler.add(new CommitOnCloseEvent());
completeQuietly(
() -> {
@@ -1277,8 +1280,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
// Visible for testing
void maybeAutoCommitSync(final boolean shouldAutoCommit,
- final Timer timer,
- final AtomicReference<Throwable> firstException) {
+ final Timer timer) {
if (!shouldAutoCommit)
return;
Map<TopicPartition, OffsetAndMetadata> allConsumed =
subscriptions.allConsumed();
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/WakeupTrigger.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/WakeupTrigger.java
index 5a030f6307d..209d5e41bee 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/WakeupTrigger.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/WakeupTrigger.java
@@ -73,6 +73,8 @@ public class WakeupTrigger {
} else if (task instanceof WakeupFuture) {
currentTask.completeExceptionally(new WakeupException());
return null;
+ } else if (task instanceof DisabledWakeups) {
+ return task;
}
// last active state is still active
throw new KafkaException("Last active task is still active");
@@ -88,6 +90,8 @@ public class WakeupTrigger {
} else if (task instanceof WakeupFuture) {
throwWakeupException.set(true);
return null;
+ } else if (task instanceof DisabledWakeups) {
+ return task;
}
// last active state is still active
throw new IllegalStateException("Last active task is still
active");
@@ -97,6 +101,10 @@ public class WakeupTrigger {
}
}
+ public void disableWakeups() {
+ pendingTask.set(new DisabledWakeups());
+ }
+
public void clearTask() {
pendingTask.getAndUpdate(task -> {
if (task == null) {
@@ -131,6 +139,9 @@ public class WakeupTrigger {
interface Wakeupable { }
+ // Set to block wakeups from happening and pending actions to be
registered.
+ static class DisabledWakeups implements Wakeupable { }
+
static class ActiveFuture implements Wakeupable {
private final CompletableFuture<?> future;
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index 2d6899612f0..35e742fbd0a 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -30,6 +30,7 @@ import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;
+import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
import
org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
import
org.apache.kafka.clients.consumer.internals.events.AssignmentChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.AsyncCommitEvent;
@@ -120,6 +121,7 @@ import static
org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -676,7 +678,7 @@ public class AsyncKafkaConsumerTest {
consumer.subscribe(singleton("topic"),
mock(ConsumerRebalanceListener.class));
subscriptions.assignFromSubscribed(singleton(new
TopicPartition("topic", 0)));
subscriptions.seek(new TopicPartition("topic", 0), 100);
- consumer.maybeAutoCommitSync(true, time.timer(100), null);
+ consumer.maybeAutoCommitSync(true, time.timer(100));
verify(applicationEventHandler).add(any(SyncCommitEvent.class));
}
@@ -694,7 +696,7 @@ public class AsyncKafkaConsumerTest {
consumer.subscribe(singleton("topic"),
mock(ConsumerRebalanceListener.class));
subscriptions.assignFromSubscribed(singleton(new
TopicPartition("topic", 0)));
subscriptions.seek(new TopicPartition("topic", 0), 100);
- consumer.maybeAutoCommitSync(false, time.timer(100), null);
+ consumer.maybeAutoCommitSync(false, time.timer(100));
verify(applicationEventHandler,
never()).add(any(SyncCommitEvent.class));
}
@@ -892,6 +894,31 @@ public class AsyncKafkaConsumerTest {
assertNull(consumer.wakeupTrigger().getPendingTask());
}
+ @Test
+ public void testNoWakeupInCloseCommit() {
+ TopicPartition tp = new TopicPartition("topic1", 0);
+ consumer = newConsumer();
+ consumer.assign(Collections.singleton(tp));
+
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+ consumer.seek(tp, 10);
+ consumer.wakeup();
+
+ AtomicReference<SyncCommitEvent> capturedEvent = new
AtomicReference<>();
+ doAnswer(invocation -> {
+ ApplicationEvent event = invocation.getArgument(0);
+ if (event instanceof SyncCommitEvent) {
+ capturedEvent.set((SyncCommitEvent) event);
+ }
+ return null;
+ }).when(applicationEventHandler).add(any());
+
+ consumer.close(Duration.ZERO);
+
+ // A commit was triggered and not completed exceptionally by the wakeup
+ assertNotNull(capturedEvent.get());
+ assertFalse(capturedEvent.get().future().isCompletedExceptionally());
+ }
+
@Test
public void testInterceptorAutoCommitOnClose() {
Properties props = requiredConsumerPropertiesAndGroupId("test-id");
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/WakeupTriggerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/WakeupTriggerTest.java
index 235ec78168d..3e1518814e7 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/WakeupTriggerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/WakeupTriggerTest.java
@@ -28,12 +28,14 @@ import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
@MockitoSettings(strictness = Strictness.STRICT_STUBS)
@@ -133,6 +135,43 @@ public class WakeupTriggerTest {
}
}
+ @Test
+ public void testDisableWakeupWithoutPendingTask() {
+ wakeupTrigger.disableWakeups();
+ wakeupTrigger.wakeup();
+ assertDoesNotThrow(() -> wakeupTrigger.maybeTriggerWakeup());
+ }
+
+ @Test
+ public void testDisableWakeupWithPendingTask() {
+ final CompletableFuture<Void> future = new CompletableFuture<>();
+ wakeupTrigger.disableWakeups();
+ wakeupTrigger.setActiveTask(future);
+ wakeupTrigger.wakeup();
+ assertFalse(future.isCompletedExceptionally());
+ assertDoesNotThrow(() -> wakeupTrigger.maybeTriggerWakeup());
+ }
+
+ @Test
+ public void testDisableWakeupWithFetchAction() {
+ try (final FetchBuffer fetchBuffer = mock(FetchBuffer.class)) {
+ wakeupTrigger.disableWakeups();
+ wakeupTrigger.setFetchAction(fetchBuffer);
+ wakeupTrigger.wakeup();
+ verify(fetchBuffer, never()).wakeup();
+ }
+ }
+
+ @Test
+ public void testDisableWakeupPreservedByClearTask() {
+ final CompletableFuture<Void> future = new CompletableFuture<>();
+ wakeupTrigger.disableWakeups();
+ wakeupTrigger.setActiveTask(future);
+ wakeupTrigger.clearTask();
+ wakeupTrigger.wakeup();
+ assertDoesNotThrow(() -> wakeupTrigger.maybeTriggerWakeup());
+ }
+
private void assertWakeupExceptionIsThrown(final CompletableFuture<?>
future) {
assertTrue(future.isCompletedExceptionally());
try {