This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3255a732298 KAFKA-19831: removed the futureless remove task as it's no 
longer used. (#20843)
3255a732298 is described below

commit 3255a73229808a6154c077206fac9ec8d00d34df
Author: Nikita Shupletsov <[email protected]>
AuthorDate: Fri Nov 7 09:40:23 2025 -0800

    KAFKA-19831: removed the futureless remove task as it's no longer used. (#20843)
    
    The usage of the futureless remove task was removed in PR #15896. This
    PR cleans up the old code that's no longer used.
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 .../processor/internals/DefaultStateUpdater.java   | 33 +---------------------
 .../streams/processor/internals/TaskAndAction.java |  7 +----
 2 files changed, 2 insertions(+), 38 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
index a3a44f6f02d..880980a17c5 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
@@ -208,11 +208,7 @@ public class DefaultStateUpdater implements StateUpdater {
                             addTask(taskAndAction.task());
                             break;
                         case REMOVE:
-                            if (taskAndAction.futureForRemove() == null) {
-                                removeTask(taskAndAction.taskId());
-                            } else {
-                                removeTask(taskAndAction.taskId(), 
taskAndAction.futureForRemove());
-                            }
+                            removeTask(taskAndAction.taskId(), 
taskAndAction.futureForRemove());
                             break;
                         default:
                             throw new IllegalStateException("Unknown action 
type " + action);
@@ -607,33 +603,6 @@ public class DefaultStateUpdater implements StateUpdater {
             }
         }
 
-        private void removeTask(final TaskId taskId) {
-            final Task task;
-            if (updatingTasks.containsKey(taskId)) {
-                task = updatingTasks.get(taskId);
-                measureCheckpointLatency(() -> task.maybeCheckpoint(true));
-                final Collection<TopicPartition> changelogPartitions = 
task.changelogPartitions();
-                changelogReader.unregister(changelogPartitions);
-                removedTasks.add(task);
-                updatingTasks.remove(taskId);
-                if (task.isActive()) {
-                    transitToUpdateStandbysIfOnlyStandbysLeft();
-                }
-                log.info((task.isActive() ? "Active" : "Standby")
-                    + " task " + task.id() + " was removed from the updating 
tasks and added to the removed tasks.");
-            } else if (pausedTasks.containsKey(taskId)) {
-                task = pausedTasks.get(taskId);
-                final Collection<TopicPartition> changelogPartitions = 
task.changelogPartitions();
-                changelogReader.unregister(changelogPartitions);
-                removedTasks.add(task);
-                pausedTasks.remove(taskId);
-                log.info((task.isActive() ? "Active" : "Standby")
-                    + " task " + task.id() + " was removed from the paused 
tasks and added to the removed tasks.");
-            } else {
-                log.info("Task " + taskId + " was not removed since it is not 
updating or paused.");
-            }
-        }
-
         private void pauseTask(final Task task) {
             final TaskId taskId = task.id();
             // do not need to unregister changelog partitions for paused tasks
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskAndAction.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskAndAction.java
index b9c07151cfa..ec6c6830bbd 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskAndAction.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskAndAction.java
@@ -55,11 +55,6 @@ public class TaskAndAction {
         return new TaskAndAction(null, taskId, Action.REMOVE, future);
     }
 
-    public static TaskAndAction createRemoveTask(final TaskId taskId) {
-        Objects.requireNonNull(taskId, "Task ID of task to remove is null!");
-        return new TaskAndAction(null, taskId, Action.REMOVE, null);
-    }
-
     public Task task() {
         if (action != Action.ADD) {
             throw new IllegalStateException("Action type " + action + " cannot 
have a task!");
@@ -84,4 +79,4 @@ public class TaskAndAction {
     public Action action() {
         return action;
     }
-}
\ No newline at end of file
+}

Reply via email to