This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3255a732298 KAFKA-19831: removed the futureless remove task as it's no
longer used. (#20843)
3255a732298 is described below
commit 3255a73229808a6154c077206fac9ec8d00d34df
Author: Nikita Shupletsov <[email protected]>
AuthorDate: Fri Nov 7 09:40:23 2025 -0800
KAFKA-19831: removed the futureless remove task as it's no longer used.
(#20843)
The usage of the futureless remove task was removed in PR #15896. This
PR cleans up the old code that's no longer used.
Reviewers: Matthias J. Sax <[email protected]>
---
.../processor/internals/DefaultStateUpdater.java | 33 +---------------------
.../streams/processor/internals/TaskAndAction.java | 7 +----
2 files changed, 2 insertions(+), 38 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
index a3a44f6f02d..880980a17c5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
@@ -208,11 +208,7 @@ public class DefaultStateUpdater implements StateUpdater {
addTask(taskAndAction.task());
break;
case REMOVE:
- if (taskAndAction.futureForRemove() == null) {
- removeTask(taskAndAction.taskId());
- } else {
- removeTask(taskAndAction.taskId(),
taskAndAction.futureForRemove());
- }
+ removeTask(taskAndAction.taskId(),
taskAndAction.futureForRemove());
break;
default:
throw new IllegalStateException("Unknown action
type " + action);
@@ -607,33 +603,6 @@ public class DefaultStateUpdater implements StateUpdater {
}
}
- private void removeTask(final TaskId taskId) {
- final Task task;
- if (updatingTasks.containsKey(taskId)) {
- task = updatingTasks.get(taskId);
- measureCheckpointLatency(() -> task.maybeCheckpoint(true));
- final Collection<TopicPartition> changelogPartitions =
task.changelogPartitions();
- changelogReader.unregister(changelogPartitions);
- removedTasks.add(task);
- updatingTasks.remove(taskId);
- if (task.isActive()) {
- transitToUpdateStandbysIfOnlyStandbysLeft();
- }
- log.info((task.isActive() ? "Active" : "Standby")
- + " task " + task.id() + " was removed from the updating
tasks and added to the removed tasks.");
- } else if (pausedTasks.containsKey(taskId)) {
- task = pausedTasks.get(taskId);
- final Collection<TopicPartition> changelogPartitions =
task.changelogPartitions();
- changelogReader.unregister(changelogPartitions);
- removedTasks.add(task);
- pausedTasks.remove(taskId);
- log.info((task.isActive() ? "Active" : "Standby")
- + " task " + task.id() + " was removed from the paused
tasks and added to the removed tasks.");
- } else {
- log.info("Task " + taskId + " was not removed since it is not
updating or paused.");
- }
- }
-
private void pauseTask(final Task task) {
final TaskId taskId = task.id();
// do not need to unregister changelog partitions for paused tasks
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskAndAction.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskAndAction.java
index b9c07151cfa..ec6c6830bbd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskAndAction.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskAndAction.java
@@ -55,11 +55,6 @@ public class TaskAndAction {
return new TaskAndAction(null, taskId, Action.REMOVE, future);
}
- public static TaskAndAction createRemoveTask(final TaskId taskId) {
- Objects.requireNonNull(taskId, "Task ID of task to remove is null!");
- return new TaskAndAction(null, taskId, Action.REMOVE, null);
- }
-
public Task task() {
if (action != Action.ADD) {
throw new IllegalStateException("Action type " + action + " cannot
have a task!");
@@ -84,4 +79,4 @@ public class TaskAndAction {
public Action action() {
return action;
}
-}
\ No newline at end of file
+}