This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d9462c277b8 KAFKA-19857: Fix CoordinatorExecutorImpl.cancelAll implementation (#20808)
d9462c277b8 is described below

commit d9462c277b885e54b9ff7d8ef31aff09ff270c5e
Author: Sean Quah <[email protected]>
AuthorDate: Fri Nov 14 19:15:44 2025 +0000

    KAFKA-19857: Fix CoordinatorExecutorImpl.cancelAll implementation (#20808)
    
    The previous implementation was buggy and always threw an
    IllegalStateException when there were scheduled tasks because it called
    Iterator.remove() without a preceding call to Iterator.next().
    
    Reviewers: David Jacot <[email protected]>
---
 .../common/runtime/CoordinatorExecutorImpl.java    |  6 +-
 .../runtime/CoordinatorExecutorImplTest.java       | 69 ++++++++++++++++++++++
 2 files changed, 70 insertions(+), 5 deletions(-)

diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorExecutorImpl.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorExecutorImpl.java
index 986bcd1c5b0..28d1c4a0394 100644
--- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorExecutorImpl.java
+++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorExecutorImpl.java
@@ -24,7 +24,6 @@ import org.apache.kafka.common.utils.LogContext;
 import org.slf4j.Logger;
 
 import java.time.Duration;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
@@ -128,9 +127,6 @@ public class CoordinatorExecutorImpl<S extends CoordinatorShard<U>, U> implement
     }
 
     public void cancelAll() {
-        Iterator<String> iterator = tasks.keySet().iterator();
-        while (iterator.hasNext()) {
-            iterator.remove();
-        }
+        tasks.clear();
     }
 }
diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorExecutorImplTest.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorExecutorImplTest.java
index 4f5e917f179..b2a82a6d707 100644
--- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorExecutorImplTest.java
+++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorExecutorImplTest.java
@@ -23,11 +23,13 @@ import org.apache.kafka.server.util.FutureUtils;
 import org.junit.jupiter.api.Test;
 
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -36,6 +38,7 @@ import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -314,4 +317,70 @@ public class CoordinatorExecutorImplTest {
         assertFalse(operationCalled.get());
         assertFalse(executor.isScheduled(TASK_KEY));
     }
+
+    @Test
+    public void testCancelAllTasks() {
+        CoordinatorShard<String> coordinatorShard = mock(CoordinatorShard.class);
+        CoordinatorRuntime<CoordinatorShard<String>, String> runtime = mock(CoordinatorRuntime.class);
+        ExecutorService executorService = mock(ExecutorService.class);
+        CoordinatorExecutorImpl<CoordinatorShard<String>, String> executor = new CoordinatorExecutorImpl<>(
+            LOG_CONTEXT,
+            SHARD_PARTITION,
+            runtime,
+            executorService,
+            WRITE_TIMEOUT
+        );
+
+        List<CoordinatorRuntime.CoordinatorWriteOperation<CoordinatorShard<String>, Void, String>> writeOperations = new ArrayList<>();
+        List<CompletableFuture<Void>> writeFutures = new ArrayList<>();
+        when(runtime.scheduleWriteOperation(
+            anyString(),
+            eq(SHARD_PARTITION),
+            eq(WRITE_TIMEOUT),
+            any()
+        )).thenAnswer(args -> {
+            writeOperations.add(args.getArgument(3));
+            CompletableFuture<Void> writeFuture = new CompletableFuture<>();
+            writeFutures.add(writeFuture);
+            return writeFuture;
+        });
+
+        when(executorService.submit(any(Runnable.class))).thenAnswer(args -> {
+            Runnable op = args.getArgument(0);
+            op.run();
+            return CompletableFuture.completedFuture(null);
+        });
+
+        AtomicInteger taskCallCount = new AtomicInteger(0);
+        CoordinatorExecutor.TaskRunnable<String> taskRunnable = () -> {
+            taskCallCount.incrementAndGet();
+            return "Hello!";
+        };
+
+        AtomicInteger operationCallCount = new AtomicInteger(0);
+        CoordinatorExecutor.TaskOperation<String, String> taskOperation = (result, exception) -> {
+            operationCallCount.incrementAndGet();
+            return null;
+        };
+
+        for (int i = 0; i < 2; i++) {
+            executor.schedule(
+                TASK_KEY + i,
+                taskRunnable,
+                taskOperation
+            );
+        }
+
+        executor.cancelAll();
+
+        for (int i = 0; i < writeOperations.size(); i++) {
+            CoordinatorRuntime.CoordinatorWriteOperation<CoordinatorShard<String>, Void, String> writeOperation = writeOperations.get(i);
+            CompletableFuture<Void> writeFuture = writeFutures.get(i);
+            Throwable ex = assertThrows(RejectedExecutionException.class, () -> writeOperation.generateRecordsAndResult(coordinatorShard));
+            writeFuture.completeExceptionally(ex);
+        }
+
+        assertEquals(2, taskCallCount.get());
+        assertEquals(0, operationCallCount.get());
+    }
 }

Reply via email to