This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new 923086dba24 KAFKA-19171: Kafka Streams crashes with 
UnsupportedOperationException (#19507)
923086dba24 is described below

commit 923086dba246297c916ee43331459ca2ba6a595f
Author: Matthias J. Sax <matth...@confluent.io>
AuthorDate: Thu May 15 21:37:04 2025 -0700

    KAFKA-19171: Kafka Streams crashes with UnsupportedOperationException 
(#19507)
    
    This PR fixes a regression bug introduced with KAFKA-17203. We need to
    pass in mutable collections into `closeTaskClean(...)`.
    
    Reviewers: Chia-Ping Tsai <chia7...@gmail.com>, Bruno Cadonna 
<br...@confluent.io>, Lucas Brutschy <lbruts...@confluent.io>
---
 .../apache/kafka/streams/processor/internals/TaskManager.java | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 7b360a7606d..8e51c4215ed 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -513,8 +513,14 @@ public class TaskManager {
 
     private void handleTasksPendingInitialization() {
         // All tasks pending initialization are not part of the usual 
bookkeeping
+
+        final Set<Task> tasksToCloseDirty = new HashSet<>();
+
         for (final Task task : tasks.drainPendingTasksToInit()) {
-            closeTaskClean(task, Collections.emptySet(), 
Collections.emptyMap());
+            closeTaskClean(task, tasksToCloseDirty, new HashMap<>());
+        }
+        for (final Task task : tasksToCloseDirty) {
+            closeTaskDirty(task, false);
         }
     }
 
@@ -1245,7 +1251,6 @@ public class TaskManager {
     private void removeLostActiveTasksFromStateUpdaterAndPendingTasksToInit() {
         if (stateUpdater != null) {
             final Map<TaskId, 
CompletableFuture<StateUpdater.RemovedTaskResult>> futures = new 
LinkedHashMap<>();
-            final Map<TaskId, RuntimeException> failedTasksDuringCleanClose = 
new HashMap<>();
             final Set<Task> tasksToCloseClean = new 
HashSet<>(tasks.drainPendingActiveTasksToInit());
             final Set<Task> tasksToCloseDirty = new HashSet<>();
             for (final Task restoringTask : stateUpdater.tasks()) {
@@ -1256,7 +1261,7 @@ public class TaskManager {
 
             addToTasksToClose(futures, tasksToCloseClean, tasksToCloseDirty);
             for (final Task task : tasksToCloseClean) {
-                closeTaskClean(task, tasksToCloseDirty, 
failedTasksDuringCleanClose);
+                closeTaskClean(task, tasksToCloseDirty, new HashMap<>());
             }
             for (final Task task : tasksToCloseDirty) {
                 closeTaskDirty(task, false);

Reply via email to