This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 4.0 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
     new 923086dba24 KAFKA-19171: Kafka Streams crashes with UnsupportedOperationException (#19507)
923086dba24 is described below

commit 923086dba246297c916ee43331459ca2ba6a595f
Author: Matthias J. Sax <matth...@confluent.io>
AuthorDate: Thu May 15 21:37:04 2025 -0700

    KAFKA-19171: Kafka Streams crashes with UnsupportedOperationException (#19507)

    This PR fixes a regression bug introduced with KAFKA-17203. We need to
    pass in mutable collections into `closeTaskClean(...)`.

    Reviewers: Chia-Ping Tsai <chia7...@gmail.com>, Bruno Cadonna <br...@confluent.io>, Lucas Brutschy <lbruts...@confluent.io>
---
 .../apache/kafka/streams/processor/internals/TaskManager.java | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 7b360a7606d..8e51c4215ed 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -513,8 +513,14 @@ public class TaskManager {
 
     private void handleTasksPendingInitialization() {
         // All tasks pending initialization are not part of the usual bookkeeping
+
+        final Set<Task> tasksToCloseDirty = new HashSet<>();
+
         for (final Task task : tasks.drainPendingTasksToInit()) {
-            closeTaskClean(task, Collections.emptySet(), Collections.emptyMap());
+            closeTaskClean(task, tasksToCloseDirty, new HashMap<>());
+        }
+        for (final Task task : tasksToCloseDirty) {
+            closeTaskDirty(task, false);
         }
     }
 
@@ -1245,7 +1251,6 @@ public class TaskManager {
     private void removeLostActiveTasksFromStateUpdaterAndPendingTasksToInit() {
         if (stateUpdater != null) {
             final Map<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>> futures = new LinkedHashMap<>();
-            final Map<TaskId, RuntimeException> failedTasksDuringCleanClose = new HashMap<>();
             final Set<Task> tasksToCloseClean = new HashSet<>(tasks.drainPendingActiveTasksToInit());
             final Set<Task> tasksToCloseDirty = new HashSet<>();
             for (final Task restoringTask : stateUpdater.tasks()) {
@@ -1256,7 +1261,7 @@ public class TaskManager {
 
             addToTasksToClose(futures, tasksToCloseClean, tasksToCloseDirty);
             for (final Task task : tasksToCloseClean) {
-                closeTaskClean(task, tasksToCloseDirty, failedTasksDuringCleanClose);
+                closeTaskClean(task, tasksToCloseDirty, new HashMap<>());
             }
             for (final Task task : tasksToCloseDirty) {
                 closeTaskDirty(task, false);