This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 06f7cb3e515 MINOR: Unwrap exceptions in CoordinatorTimerImpl and CoordinatorExecutorImpl (#21458)
06f7cb3e515 is described below

commit 06f7cb3e5156a901620f77d6d5bcad13610a5379
Author: David Jacot <[email protected]>
AuthorDate: Thu Feb 12 20:46:51 2026 +0100

    MINOR: Unwrap exceptions in CoordinatorTimerImpl and CoordinatorExecutorImpl (#21458)
    
    The handlers passed to exceptionally() in CoordinatorTimerImpl and
    CoordinatorExecutorImpl use instanceof checks to determine how to handle
    failures. However, exceptions propagated through CompletableFuture
    chains may be wrapped in CompletionException, causing the instanceof
    checks to fail and the exceptions to be misclassified. This could lead
    to incorrect behavior such as retrying when the coordinator is no
    longer active or logging at the wrong level.
    
    This patch unwraps exceptions using Errors.maybeUnwrapException at the
    top of each exceptionally handler before performing the instanceof
    checks. It also adds parameterized test coverage in
    CoordinatorTimerImplTest to verify correct behavior with both wrapped
    and unwrapped exceptions.
    
    Reviewers: Sean Quah <[email protected]>, Chia-Ping Tsai
     <[email protected]>
---
 .../common/runtime/CoordinatorExecutorImpl.java    |  6 +++++
 .../common/runtime/CoordinatorTimerImpl.java       |  6 +++++
 .../common/runtime/CoordinatorTimerImplTest.java   | 27 ++++++++++++++--------
 3 files changed, 30 insertions(+), 9 deletions(-)

diff --git 
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorExecutorImpl.java
 
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorExecutorImpl.java
index 5300a8b1583..31aeebee30a 100644
--- 
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorExecutorImpl.java
+++ 
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorExecutorImpl.java
@@ -18,6 +18,7 @@ package org.apache.kafka.coordinator.common.runtime;
 
 import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
 import org.apache.kafka.common.errors.NotCoordinatorException;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.utils.LogContext;
 
 import org.slf4j.Logger;
@@ -85,6 +86,11 @@ public class CoordinatorExecutorImpl<U> implements 
CoordinatorExecutor<U> {
                     return operation.onComplete(result.result(), 
result.exception());
                 }
             ).exceptionally(exception -> {
+                // Exceptions may be wrapped in CompletionException when 
propagated
+                // through CompletableFuture chains, so we unwrap them before
+                // checking types with instanceof.
+                exception = Errors.maybeUnwrapException(exception);
+
                 // Remove the task after a failure.
                 tasks.remove(key, task);
 
diff --git 
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorTimerImpl.java
 
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorTimerImpl.java
index a3cdeacf6ad..6b2eaff5ff2 100644
--- 
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorTimerImpl.java
+++ 
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorTimerImpl.java
@@ -18,6 +18,7 @@ package org.apache.kafka.coordinator.common.runtime;
 
 import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
 import org.apache.kafka.common.errors.NotCoordinatorException;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.server.util.timer.Timer;
 import org.apache.kafka.server.util.timer.TimerTask;
@@ -97,6 +98,11 @@ public class CoordinatorTimerImpl<U> implements 
CoordinatorTimer<U> {
                         return operation.generateRecords();
                     }
                 ).exceptionally(ex -> {
+                    // Exceptions may be wrapped in CompletionException when 
propagated
+                    // through CompletableFuture chains, so we unwrap them 
before
+                    // checking types with instanceof.
+                    ex = Errors.maybeUnwrapException(ex);
+
                     // Remove the task after a failure.
                     tasks.remove(key, this);
 
diff --git 
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorTimerImplTest.java
 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorTimerImplTest.java
index 36eb152fef6..07866aa3a04 100644
--- 
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorTimerImplTest.java
+++ 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorTimerImplTest.java
@@ -22,9 +22,12 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.server.util.timer.MockTimer;
 
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -313,15 +316,17 @@ public class CoordinatorTimerImplTest {
         assertEquals(1, callCount.get());
     }
 
-    @Test
-    public void testTimerIgnoredOnNotCoordinatorException() throws 
InterruptedException {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testTimerIgnoredOnNotCoordinatorException(boolean 
wrapException) throws InterruptedException {
         var mockTimer = new MockTimer();
         var callCount = new AtomicInteger(0);
 
         CoordinatorShardScheduler<String> scheduler = (operationName, 
operation) -> {
             operation.generate();
             callCount.incrementAndGet();
-            return CompletableFuture.failedFuture(new 
NotCoordinatorException("Not coordinator"));
+            var ex = new NotCoordinatorException("Not coordinator");
+            return CompletableFuture.failedFuture(wrapException ? new 
CompletionException(ex) : ex);
         };
 
         var timer = new CoordinatorTimerImpl<>(
@@ -354,15 +359,17 @@ public class CoordinatorTimerImplTest {
         assertEquals(1, callCount.get());
     }
 
-    @Test
-    public void testTimerIgnoredOnCoordinatorLoadInProgressException() throws 
InterruptedException {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testTimerIgnoredOnCoordinatorLoadInProgressException(boolean 
wrapException) throws InterruptedException {
         var mockTimer = new MockTimer();
         var callCount = new AtomicInteger(0);
 
         CoordinatorShardScheduler<String> scheduler = (operationName, 
operation) -> {
             operation.generate();
             callCount.incrementAndGet();
-            return CompletableFuture.failedFuture(new 
CoordinatorLoadInProgressException("Loading"));
+            var ex = new CoordinatorLoadInProgressException("Loading");
+            return CompletableFuture.failedFuture(wrapException ? new 
CompletionException(ex) : ex);
         };
 
         var timer = new CoordinatorTimerImpl<>(
@@ -578,8 +585,9 @@ public class CoordinatorTimerImplTest {
         assertEquals(0, timer.size());
     }
 
-    @Test
-    public void testTaskCleanupOnFailedFutureWithoutOperationExecution() 
throws InterruptedException {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testTaskCleanupOnFailedFutureWithoutOperationExecution(boolean 
wrapException) throws InterruptedException {
         var mockTimer = new MockTimer();
         var operationCalled = new AtomicBoolean(false);
 
@@ -588,7 +596,8 @@ public class CoordinatorTimerImplTest {
         // (2) events failing before being executed.
         CoordinatorShardScheduler<String> scheduler = (operationName, 
operation) -> {
             // Don't call operation.generate() - simulates event never being 
executed
-            return CompletableFuture.failedFuture(new 
NotCoordinatorException("Not coordinator"));
+            NotCoordinatorException ex = new NotCoordinatorException("Not 
coordinator");
+            return CompletableFuture.failedFuture(wrapException ? new 
CompletionException(ex) : ex);
         };
 
         var timer = new CoordinatorTimerImpl<>(

Reply via email to