This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e4bef376efc MINOR: Ensure CoordinatorRuntime does not throw 
synchronous exceptions when returning futures (#21437)
e4bef376efc is described below

commit e4bef376efc9b99f2843d027b3650078ef67ad52
Author: David Jacot <[email protected]>
AuthorDate: Thu Feb 12 06:37:12 2026 +0100

    MINOR: Ensure CoordinatorRuntime does not throw synchronous exceptions when 
returning futures (#21437)
    
    Methods returning CompletableFuture should not throw synchronous
    exceptions. Callers expect all errors to be delivered through the
    future, not as synchronous exceptions.
    
    This change wraps the method bodies of scheduleWriteOperation,
    scheduleReadOperation, scheduleTransactionalWriteOperation, and
    scheduleTransactionCompletion in try-catch blocks to ensure all
    exceptions are captured in failed futures.
    
    For scheduleWriteAllOperation and scheduleReadAllOperation, the
    throwIfNotRunning() call is removed since they delegate to the
    single-operation methods which now handle errors properly.
    
    Reviewers: Sean Quah <[email protected]>, Chia-Ping Tsai
     <[email protected]>
---
 .../common/runtime/CoordinatorRuntime.java         | 136 +++++++--------
 .../common/runtime/CoordinatorRuntimeTest.java     | 182 +++++++++++++++++++++
 2 files changed, 251 insertions(+), 67 deletions(-)

diff --git 
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
 
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
index c09ae65e4e4..cf55ffbbd29 100644
--- 
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
+++ 
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
@@ -482,32 +482,20 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
             this.timer = new CoordinatorTimerImpl<>(
                 logContext,
                 CoordinatorRuntime.this.timer,
-                (operationName, operation) -> {
-                    try {
-                        return scheduleWriteOperation(
-                            operationName,
-                            tp,
-                            coordinator -> operation.generate()
-                        );
-                    } catch (Throwable t) {
-                        return CompletableFuture.failedFuture(t);
-                    }
-                }
+                (operationName, operation) -> scheduleWriteOperation(
+                    operationName,
+                    tp,
+                    coordinator -> operation.generate()
+                )
             );
             this.executor = new CoordinatorExecutorImpl<>(
                 logContext,
                 executorService,
-                (operationName, operation) -> {
-                    try {
-                        return scheduleWriteOperation(
-                            operationName,
-                            tp,
-                            coordinator -> operation.generate()
-                        );
-                    } catch (Throwable t) {
-                        return CompletableFuture.failedFuture(t);
-                    }
-                }
+                (operationName, operation) -> scheduleWriteOperation(
+                    operationName,
+                    tp,
+                    coordinator -> operation.generate()
+                )
             );
             this.bufferSupplier = new BufferSupplier.GrowableBufferSupplier();
             this.cachedBufferSize = new AtomicLong(0);
@@ -2072,11 +2060,15 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
         TopicPartition tp,
         CoordinatorWriteOperation<S, T, U> op
     ) {
-        throwIfNotRunning();
-        log.debug("Scheduled execution of write operation {}.", name);
-        CoordinatorWriteEvent<T> event = new CoordinatorWriteEvent<>(name, tp, 
writeTimeout, op);
-        enqueueLast(event);
-        return event.future;
+        try {
+            throwIfNotRunning();
+            log.debug("Scheduled execution of write operation {}.", name);
+            CoordinatorWriteEvent<T> event = new CoordinatorWriteEvent<>(name, 
tp, writeTimeout, op);
+            enqueueLast(event);
+            return event.future;
+        } catch (Throwable t) {
+            return CompletableFuture.failedFuture(t);
+        }
     }
 
     /**
@@ -2094,7 +2086,6 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
         String name,
         CoordinatorWriteOperation<S, T, U> op
     ) {
-        throwIfNotRunning();
         log.debug("Scheduled execution of write all operation {}.", name);
         return coordinators
             .keySet()
@@ -2128,28 +2119,32 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
         CoordinatorWriteOperation<S, T, U> op,
         int apiVersion
     ) {
-        throwIfNotRunning();
-        log.debug("Scheduled execution of transactional write operation {}.", 
name);
-        return partitionWriter.maybeStartTransactionVerification(
-            tp,
-            transactionalId,
-            producerId,
-            producerEpoch,
-            apiVersion
-        ).thenCompose(verificationGuard -> {
-            CoordinatorWriteEvent<T> event = new CoordinatorWriteEvent<>(
-                name,
+        try {
+            throwIfNotRunning();
+            log.debug("Scheduled execution of transactional write operation 
{}.", name);
+            return partitionWriter.maybeStartTransactionVerification(
                 tp,
                 transactionalId,
                 producerId,
                 producerEpoch,
-                verificationGuard,
-                writeTimeout,
-                op
-            );
-            enqueueLast(event);
-            return event.future;
-        });
+                apiVersion
+            ).thenCompose(verificationGuard -> {
+                CoordinatorWriteEvent<T> event = new CoordinatorWriteEvent<>(
+                    name,
+                    tp,
+                    transactionalId,
+                    producerId,
+                    producerEpoch,
+                    verificationGuard,
+                    writeTimeout,
+                    op
+                );
+                enqueueLast(event);
+                return event.future;
+            });
+        } catch (Throwable t) {
+            return CompletableFuture.failedFuture(t);
+        }
     }
 
     /**
@@ -2175,21 +2170,25 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
         TransactionResult result,
         short transactionVersion
     ) {
-        throwIfNotRunning();
-        log.debug("Scheduled execution of transaction completion for {} with 
producer id={}, producer epoch={}, " +
-            "coordinator epoch={}, transaction version={} and transaction 
result={}.", tp, producerId, producerEpoch, coordinatorEpoch, 
transactionVersion, result);
-        CoordinatorCompleteTransactionEvent event = new 
CoordinatorCompleteTransactionEvent(
-            name,
-            tp,
-            producerId,
-            producerEpoch,
-            coordinatorEpoch,
-            result,
-            transactionVersion,
-            writeTimeout
-        );
-        enqueueLast(event);
-        return event.future;
+        try {
+            throwIfNotRunning();
+            log.debug("Scheduled execution of transaction completion for {} 
with producer id={}, producer epoch={}, " +
+                "coordinator epoch={}, transaction version={} and transaction 
result={}.", tp, producerId, producerEpoch, coordinatorEpoch, 
transactionVersion, result);
+            CoordinatorCompleteTransactionEvent event = new 
CoordinatorCompleteTransactionEvent(
+                name,
+                tp,
+                producerId,
+                producerEpoch,
+                coordinatorEpoch,
+                result,
+                transactionVersion,
+                writeTimeout
+            );
+            enqueueLast(event);
+            return event.future;
+        } catch (Throwable t) {
+            return CompletableFuture.failedFuture(t);
+        }
     }
 
     /**
@@ -2209,11 +2208,15 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
         TopicPartition tp,
         CoordinatorReadOperation<S, T> op
     ) {
-        throwIfNotRunning();
-        log.debug("Scheduled execution of read operation {}.", name);
-        CoordinatorReadEvent<T> event = new CoordinatorReadEvent<>(name, tp, 
op);
-        enqueueLast(event);
-        return event.future;
+        try {
+            throwIfNotRunning();
+            log.debug("Scheduled execution of read operation {}.", name);
+            CoordinatorReadEvent<T> event = new CoordinatorReadEvent<>(name, 
tp, op);
+            enqueueLast(event);
+            return event.future;
+        } catch (Throwable t) {
+            return CompletableFuture.failedFuture(t);
+        }
     }
 
     /**
@@ -2231,7 +2234,6 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
         String name,
         CoordinatorReadOperation<S, T> op
     ) {
-        throwIfNotRunning();
         log.debug("Scheduled execution of read all operation {}.", name);
         return coordinators
             .keySet()
diff --git 
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
index cac48518f59..21faf09ca2b 100644
--- 
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
+++ 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
@@ -1986,6 +1986,188 @@ public class CoordinatorRuntimeTest {
         verify(executorService).shutdown();
     }
 
+    @Test
+    public void testScheduleWriteOpWhenClosed() throws Exception {
+        MockTimer timer = new MockTimer();
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withWriteTimeout(DEFAULT_WRITE_TIMEOUT)
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(new MockPartitionWriter())
+                .withCoordinatorShardBuilderSupplier(new 
MockCoordinatorShardBuilderSupplier())
+                
.withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+                .withSerializer(new StringSerializer())
+                .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
+                .build();
+
+        // Close the runtime.
+        runtime.close();
+
+        // Scheduling a write after close returns a failed future instead of 
throwing.
+        CompletableFuture<String> future = 
runtime.scheduleWriteOperation("write", TP,
+            state -> new CoordinatorResult<>(List.of(), "response"));
+        assertFutureThrows(NotCoordinatorException.class, future);
+    }
+
+    @Test
+    public void testScheduleReadOpWhenClosed() throws Exception {
+        MockTimer timer = new MockTimer();
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withWriteTimeout(DEFAULT_WRITE_TIMEOUT)
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(new MockPartitionWriter())
+                .withCoordinatorShardBuilderSupplier(new 
MockCoordinatorShardBuilderSupplier())
+                
.withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+                .withSerializer(new StringSerializer())
+                .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
+                .build();
+
+        // Close the runtime.
+        runtime.close();
+
+        // Scheduling a read after close returns a failed future instead of 
throwing.
+        CompletableFuture<String> future = 
runtime.scheduleReadOperation("read", TP,
+            (state, offset) -> "response");
+        assertFutureThrows(NotCoordinatorException.class, future);
+    }
+
+    @Test
+    public void testScheduleTransactionalWriteOpWhenClosed() throws Exception {
+        MockTimer timer = new MockTimer();
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withWriteTimeout(DEFAULT_WRITE_TIMEOUT)
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(new MockPartitionWriter())
+                .withCoordinatorShardBuilderSupplier(new 
MockCoordinatorShardBuilderSupplier())
+                
.withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+                .withSerializer(new StringSerializer())
+                .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
+                .build();
+
+        // Close the runtime.
+        runtime.close();
+
+        // Scheduling a transactional write after close returns a failed 
future instead of throwing.
+        CompletableFuture<String> future = 
runtime.scheduleTransactionalWriteOperation(
+            "txn-write",
+            TP,
+            "transactional-id",
+            100L,
+            (short) 50,
+            state -> new CoordinatorResult<>(List.of(), "response"),
+            TXN_OFFSET_COMMIT_LATEST_VERSION
+        );
+        assertFutureThrows(NotCoordinatorException.class, future);
+    }
+
+    @Test
+    public void testScheduleTransactionCompletionWhenClosed() throws Exception 
{
+        MockTimer timer = new MockTimer();
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withWriteTimeout(DEFAULT_WRITE_TIMEOUT)
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(new MockPartitionWriter())
+                .withCoordinatorShardBuilderSupplier(new 
MockCoordinatorShardBuilderSupplier())
+                
.withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+                .withSerializer(new StringSerializer())
+                .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
+                .build();
+
+        // Close the runtime.
+        runtime.close();
+
+        // Scheduling a transaction completion after close returns a failed 
future instead of throwing.
+        CompletableFuture<Void> future = runtime.scheduleTransactionCompletion(
+            "txn-completion",
+            TP,
+            100L,
+            (short) 50,
+            10,
+            TransactionResult.COMMIT,
+            (short) 1
+        );
+        assertFutureThrows(NotCoordinatorException.class, future);
+    }
+
+    @Test
+    public void testScheduleWriteAllOpWhenClosed() throws Exception {
+        MockTimer timer = new MockTimer();
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withWriteTimeout(DEFAULT_WRITE_TIMEOUT)
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(new MockPartitionWriter())
+                .withCoordinatorShardBuilderSupplier(new 
MockCoordinatorShardBuilderSupplier())
+                
.withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+                .withSerializer(new StringSerializer())
+                .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
+                .build();
+
+        // Close the runtime.
+        runtime.close();
+
+        // Scheduling a write all operation after close returns an empty list 
instead of throwing.
+        List<CompletableFuture<String>> futures = 
runtime.scheduleWriteAllOperation("write-all",
+            state -> new CoordinatorResult<>(List.of(), "response"));
+        assertEquals(List.of(), futures);
+    }
+
+    @Test
+    public void testScheduleReadAllOpWhenClosed() throws Exception {
+        MockTimer timer = new MockTimer();
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withWriteTimeout(DEFAULT_WRITE_TIMEOUT)
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(new MockPartitionWriter())
+                .withCoordinatorShardBuilderSupplier(new 
MockCoordinatorShardBuilderSupplier())
+                
.withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+                .withSerializer(new StringSerializer())
+                .withExecutorService(mock(ExecutorService.class))
+                .withCachedBufferMaxBytesSupplier(() -> 
CACHED_BUFFER_MAX_BYTES)
+                .build();
+
+        // Close the runtime.
+        runtime.close();
+
+        // Scheduling a read all operation after close returns an empty list 
instead of throwing.
+        List<CompletableFuture<String>> futures = 
runtime.scheduleReadAllOperation("read-all",
+            (state, offset) -> "response");
+        assertEquals(List.of(), futures);
+    }
+
     @Test
     public void testOnMetadataUpdate() {
         TopicPartition tp0 = new TopicPartition("__consumer_offsets", 0);

Reply via email to