This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
new cae1e17807a KAFKA-19857: Fix CoordinatorExecutorImpl.cancelAll
implementation (#20808)
cae1e17807a is described below
commit cae1e17807addbc82d6bae58acedecbc1f1fcf2a
Author: Sean Quah <[email protected]>
AuthorDate: Fri Nov 14 19:15:44 2025 +0000
KAFKA-19857: Fix CoordinatorExecutorImpl.cancelAll implementation (#20808)
The previous implementation was buggy and always threw an
IllegalStateException when there were scheduled tasks because it called
Iterator.remove() without a preceding call to Iterator.next().
Reviewers: David Jacot <[email protected]>
---
.../common/runtime/CoordinatorExecutorImpl.java | 6 +-
.../runtime/CoordinatorExecutorImplTest.java | 70 ++++++++++++++++++++++
2 files changed, 71 insertions(+), 5 deletions(-)
diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorExecutorImpl.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorExecutorImpl.java
index f9a417b0e86..efdc521cfa7 100644
--- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorExecutorImpl.java
+++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorExecutorImpl.java
@@ -24,7 +24,6 @@ import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;
import java.time.Duration;
-import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
@@ -139,9 +138,6 @@ public class CoordinatorExecutorImpl<S extends CoordinatorShard<U>, U> implement
}
public void cancelAll() {
- Iterator<String> iterator = tasks.keySet().iterator();
- while (iterator.hasNext()) {
- iterator.remove();
- }
+ tasks.clear();
}
}
diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorExecutorImplTest.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorExecutorImplTest.java
index d5ac1be7820..cdd6933d9a5 100644
--- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorExecutorImplTest.java
+++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorExecutorImplTest.java
@@ -23,11 +23,14 @@ import org.apache.kafka.server.util.FutureUtils;
import org.junit.jupiter.api.Test;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -36,6 +39,7 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -314,4 +318,70 @@ public class CoordinatorExecutorImplTest {
assertFalse(operationCalled.get());
assertFalse(executor.isScheduled(TASK_KEY));
}
+
+ @Test
+ public void testCancelAllTasks() {
+ CoordinatorShard<String> coordinatorShard = mock(CoordinatorShard.class);
+ CoordinatorRuntime<CoordinatorShard<String>, String> runtime = mock(CoordinatorRuntime.class);
+ ExecutorService executorService = mock(ExecutorService.class);
+ CoordinatorExecutorImpl<CoordinatorShard<String>, String> executor = new CoordinatorExecutorImpl<>(
+ LOG_CONTEXT,
+ SHARD_PARTITION,
+ runtime,
+ executorService,
+ WRITE_TIMEOUT
+ );
+
+ List<CoordinatorRuntime.CoordinatorWriteOperation<CoordinatorShard<String>, Void, String>> writeOperations = new ArrayList<>();
+ List<CompletableFuture<Void>> writeFutures = new ArrayList<>();
+ when(runtime.scheduleWriteOperation(
+ anyString(),
+ eq(SHARD_PARTITION),
+ eq(WRITE_TIMEOUT),
+ any()
+ )).thenAnswer(args -> {
+ writeOperations.add(args.getArgument(3));
+ CompletableFuture<Void> writeFuture = new CompletableFuture<>();
+ writeFutures.add(writeFuture);
+ return writeFuture;
+ });
+
+ when(executorService.submit(any(Runnable.class))).thenAnswer(args -> {
+ Runnable op = args.getArgument(0);
+ op.run();
+ return CompletableFuture.completedFuture(null);
+ });
+
+ AtomicInteger taskCallCount = new AtomicInteger(0);
+ CoordinatorExecutor.TaskRunnable<String> taskRunnable = () -> {
+ taskCallCount.incrementAndGet();
+ return "Hello!";
+ };
+
+ AtomicInteger operationCallCount = new AtomicInteger(0);
+ CoordinatorExecutor.TaskOperation<String, String> taskOperation = (result, exception) -> {
+ operationCallCount.incrementAndGet();
+ return null;
+ };
+
+ for (int i = 0; i < 2; i++) {
+ executor.schedule(
+ TASK_KEY + i,
+ taskRunnable,
+ taskOperation
+ );
+ }
+
+ executor.cancelAll();
+
+ for (int i = 0; i < writeOperations.size(); i++) {
+ CoordinatorRuntime.CoordinatorWriteOperation<CoordinatorShard<String>, Void, String> writeOperation = writeOperations.get(i);
+ CompletableFuture<Void> writeFuture = writeFutures.get(i);
+ Throwable ex = assertThrows(RejectedExecutionException.class, () -> writeOperation.generateRecordsAndResult(coordinatorShard));
+ writeFuture.completeExceptionally(ex);
+ }
+
+ assertEquals(2, taskCallCount.get());
+ assertEquals(0, operationCallCount.get());
+ }
}