This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 06f7cb3e515 MINOR: Unwrap exceptions in CoordinatorTimerImpl and
CoordinatorExecutorImpl (#21458)
06f7cb3e515 is described below
commit 06f7cb3e5156a901620f77d6d5bcad13610a5379
Author: David Jacot <[email protected]>
AuthorDate: Thu Feb 12 20:46:51 2026 +0100
MINOR: Unwrap exceptions in CoordinatorTimerImpl and
CoordinatorExecutorImpl (#21458)
The exceptionally() handlers in CoordinatorTimerImpl and
CoordinatorExecutorImpl use instanceof checks to determine how to handle
failures. However, exceptions propagated through CompletableFuture
chains may be wrapped in CompletionException, causing the instanceof
checks to fail and the exceptions to be misclassified. This could lead
to incorrect behavior such as retrying when the coordinator is no
longer active or logging at the wrong level.
This patch unwraps exceptions using Errors.maybeUnwrapException at the
top of each exceptionally() handler before performing the instanceof
checks. It also adds parameterized test coverage in
CoordinatorTimerImplTest to verify correct behavior with both wrapped
and unwrapped exceptions.
Reviewers: Sean Quah <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../common/runtime/CoordinatorExecutorImpl.java | 6 +++++
.../common/runtime/CoordinatorTimerImpl.java | 6 +++++
.../common/runtime/CoordinatorTimerImplTest.java | 27 ++++++++++++++--------
3 files changed, 30 insertions(+), 9 deletions(-)
diff --git
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorExecutorImpl.java
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorExecutorImpl.java
index 5300a8b1583..31aeebee30a 100644
---
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorExecutorImpl.java
+++
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorExecutorImpl.java
@@ -18,6 +18,7 @@ package org.apache.kafka.coordinator.common.runtime;
import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
import org.apache.kafka.common.errors.NotCoordinatorException;
+import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;
@@ -85,6 +86,11 @@ public class CoordinatorExecutorImpl<U> implements
CoordinatorExecutor<U> {
return operation.onComplete(result.result(),
result.exception());
}
).exceptionally(exception -> {
+ // Exceptions may be wrapped in CompletionException when propagated
+ // through CompletableFuture chains, so we unwrap them before
+ // checking types with instanceof.
+ exception = Errors.maybeUnwrapException(exception);
+
// Remove the task after a failure.
tasks.remove(key, task);
diff --git
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorTimerImpl.java
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorTimerImpl.java
index a3cdeacf6ad..6b2eaff5ff2 100644
---
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorTimerImpl.java
+++
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorTimerImpl.java
@@ -18,6 +18,7 @@ package org.apache.kafka.coordinator.common.runtime;
import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
import org.apache.kafka.common.errors.NotCoordinatorException;
+import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.server.util.timer.Timer;
import org.apache.kafka.server.util.timer.TimerTask;
@@ -97,6 +98,11 @@ public class CoordinatorTimerImpl<U> implements
CoordinatorTimer<U> {
return operation.generateRecords();
}
).exceptionally(ex -> {
+ // Exceptions may be wrapped in CompletionException when propagated
+ // through CompletableFuture chains, so we unwrap them before
+ // checking types with instanceof.
+ ex = Errors.maybeUnwrapException(ex);
+
// Remove the task after a failure.
tasks.remove(key, this);
diff --git
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorTimerImplTest.java
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorTimerImplTest.java
index 36eb152fef6..07866aa3a04 100644
---
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorTimerImplTest.java
+++
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorTimerImplTest.java
@@ -22,9 +22,12 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.server.util.timer.MockTimer;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -313,15 +316,17 @@ public class CoordinatorTimerImplTest {
assertEquals(1, callCount.get());
}
- @Test
- public void testTimerIgnoredOnNotCoordinatorException() throws
InterruptedException {
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testTimerIgnoredOnNotCoordinatorException(boolean
wrapException) throws InterruptedException {
var mockTimer = new MockTimer();
var callCount = new AtomicInteger(0);
CoordinatorShardScheduler<String> scheduler = (operationName,
operation) -> {
operation.generate();
callCount.incrementAndGet();
- return CompletableFuture.failedFuture(new
NotCoordinatorException("Not coordinator"));
+ var ex = new NotCoordinatorException("Not coordinator");
+ return CompletableFuture.failedFuture(wrapException ? new
CompletionException(ex) : ex);
};
var timer = new CoordinatorTimerImpl<>(
@@ -354,15 +359,17 @@ public class CoordinatorTimerImplTest {
assertEquals(1, callCount.get());
}
- @Test
- public void testTimerIgnoredOnCoordinatorLoadInProgressException() throws
InterruptedException {
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testTimerIgnoredOnCoordinatorLoadInProgressException(boolean
wrapException) throws InterruptedException {
var mockTimer = new MockTimer();
var callCount = new AtomicInteger(0);
CoordinatorShardScheduler<String> scheduler = (operationName,
operation) -> {
operation.generate();
callCount.incrementAndGet();
- return CompletableFuture.failedFuture(new
CoordinatorLoadInProgressException("Loading"));
+ var ex = new CoordinatorLoadInProgressException("Loading");
+ return CompletableFuture.failedFuture(wrapException ? new
CompletionException(ex) : ex);
};
var timer = new CoordinatorTimerImpl<>(
@@ -578,8 +585,9 @@ public class CoordinatorTimerImplTest {
assertEquals(0, timer.size());
}
- @Test
- public void testTaskCleanupOnFailedFutureWithoutOperationExecution()
throws InterruptedException {
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testTaskCleanupOnFailedFutureWithoutOperationExecution(boolean
wrapException) throws InterruptedException {
var mockTimer = new MockTimer();
var operationCalled = new AtomicBoolean(false);
@@ -588,7 +596,8 @@ public class CoordinatorTimerImplTest {
// (2) events failing before being executed.
CoordinatorShardScheduler<String> scheduler = (operationName,
operation) -> {
// Don't call operation.generate() - simulates event never being
executed
- return CompletableFuture.failedFuture(new
NotCoordinatorException("Not coordinator"));
+ NotCoordinatorException ex = new NotCoordinatorException("Not
coordinator");
+ return CompletableFuture.failedFuture(wrapException ? new
CompletionException(ex) : ex);
};
var timer = new CoordinatorTimerImpl<>(