This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new fdece9c358f MINOR: Keep pendingTask as WakeupFuture if currentTask is
completed already. (#21586)
fdece9c358f is described below
commit fdece9c358f3c60a83f086d8adac9749d0e45fba
Author: Nikita Shupletsov <[email protected]>
AuthorDate: Mon Mar 9 15:42:22 2026 -0700
MINOR: Keep pendingTask as WakeupFuture if currentTask is completed
already. (#21586)
System tests that use VerifiableConsumer are flaky because
VerifiableConsumer isn't shutting down on request in certain situations.
There can be a race condition in the commitSync method, as the future
that we set as the active task to the wakeupTrigger can be already
completed by the time we are setting it. Which leads to the wakeup
request never being fulfilled. Added a check if the task we are
receiving in setActiveTask was triggered when we complete it
exceptionally.
Also added additional logging when a shutdown is requested to make
debugging easier.
Reviewers: Kirk True <[email protected]>, Bill Bejeck
<[email protected]>
---
.../clients/consumer/internals/WakeupTrigger.java | 8 ++++--
.../consumer/internals/WakeupTriggerTest.java | 33 ++++++++++++++++++++++
tests/kafkatest/services/verifiable_consumer.py | 2 ++
.../org/apache/kafka/tools/VerifiableConsumer.java | 9 ++++++
4 files changed, 50 insertions(+), 2 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/WakeupTrigger.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/WakeupTrigger.java
index 7893cf29f23..b7d8c001798 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/WakeupTrigger.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/WakeupTrigger.java
@@ -78,8 +78,12 @@ public class WakeupTrigger {
if (task == null) {
return new ActiveFuture(currentTask);
} else if (task instanceof WakeupFuture) {
- currentTask.completeExceptionally(new WakeupException());
- return null;
+ boolean wasTriggered = currentTask.completeExceptionally(new
WakeupException());
+
+ // If the Future was *already* completed when we invoke
completeExceptionally, the WakeupException
+ // will be ignored. If it was already completed, we then need
to return a new WakeupFuture so that the
+ // next call to setActiveTask will throw the WakeupException.
+ return wasTriggered ? null : new WakeupFuture();
} else if (task instanceof DisabledWakeups) {
return task;
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/WakeupTriggerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/WakeupTriggerTest.java
index 518f1cc6978..ad3a5071480 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/WakeupTriggerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/WakeupTriggerTest.java
@@ -219,6 +219,39 @@ public class WakeupTriggerTest {
assertThrows(WakeupException.class, () ->
wakeupTrigger.maybeTriggerWakeup());
}
+ @Test
+ public void
testExceptionTriggeredWhenTaskAsynchronouslyCompletedBeforeSet() {
+ final CompletableFuture<Void> task = new CompletableFuture<>();
+ task.complete(null);
+ wakeupTrigger.wakeup();
+ wakeupTrigger.setActiveTask(task);
+ assertNotNull(wakeupTrigger.getPendingTask());
+ assertInstanceOf(WakeupTrigger.WakeupFuture.class,
wakeupTrigger.getPendingTask());
+ assertThrows(WakeupException.class, () ->
wakeupTrigger.maybeTriggerWakeup());
+ }
+
+ @Test
+ public void testExceptionTriggeredWhenTaskAsynchronouslyFailedBeforeSet() {
+ final CompletableFuture<Void> task = new CompletableFuture<>();
+ task.completeExceptionally(new RuntimeException("Simulated error"));
+ wakeupTrigger.wakeup();
+ wakeupTrigger.setActiveTask(task);
+ assertNotNull(wakeupTrigger.getPendingTask());
+ assertInstanceOf(WakeupTrigger.WakeupFuture.class,
wakeupTrigger.getPendingTask());
+ assertThrows(WakeupException.class, () ->
wakeupTrigger.maybeTriggerWakeup());
+ }
+
+ @Test
+ public void
testExceptionTriggeredWhenTaskAsynchronouslyCancelledBeforeSet() {
+ final CompletableFuture<Void> task = new CompletableFuture<>();
+ task.cancel(true);
+ wakeupTrigger.wakeup();
+ wakeupTrigger.setActiveTask(task);
+ assertNotNull(wakeupTrigger.getPendingTask());
+ assertInstanceOf(WakeupTrigger.WakeupFuture.class,
wakeupTrigger.getPendingTask());
+ assertThrows(WakeupException.class, () ->
wakeupTrigger.maybeTriggerWakeup());
+ }
+
private void assertWakeupExceptionIsThrown(final CompletableFuture<?>
future) {
assertTrue(future.isCompletedExceptionally());
assertInstanceOf(WakeupException.class,
diff --git a/tests/kafkatest/services/verifiable_consumer.py
b/tests/kafkatest/services/verifiable_consumer.py
index dd05b693638..7ae3ee3c9dc 100644
--- a/tests/kafkatest/services/verifiable_consumer.py
+++ b/tests/kafkatest/services/verifiable_consumer.py
@@ -352,6 +352,8 @@ class VerifiableConsumer(KafkaPathResolverMixin,
VerifiableClientMixin, Backgrou
handler.handle_partitions_revoked(event, node,
self.logger)
elif name == "partitions_assigned":
handler.handle_partitions_assigned(event, node,
self.logger)
+ elif name == "shutdown_requested":
+ self.logger.debug("Shutdown has been requested")
else:
self.logger.debug("%s: ignoring unknown event: %s" %
(str(node.account), event))
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
index d1e51a6ae78..25c82c713b1 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
@@ -259,6 +259,7 @@ public class VerifiableConsumer implements Closeable,
OffsetCommitCallback, Cons
public void close() {
boolean interrupted = false;
try {
+ printJson(new ShutdownRequested());
consumer.wakeup();
while (true) {
try {
@@ -295,6 +296,14 @@ public class VerifiableConsumer implements Closeable,
OffsetCommitCallback, Cons
}
}
+ private static class ShutdownRequested extends ConsumerEvent {
+
+ @Override
+ public String name() {
+ return "shutdown_requested";
+ }
+ }
+
private static class ShutdownComplete extends ConsumerEvent {
@Override