This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 309e0f986e KAFKA-10199: Add PAUSE in state updater (#12386)
309e0f986e is described below

commit 309e0f986e97be966c797f7729eb1e94ef5400a9
Author: Guozhang Wang <wangg...@gmail.com>
AuthorDate: Mon Jul 18 16:42:48 2022 -0700

    KAFKA-10199: Add PAUSE in state updater (#12386)
    
    * Add pause action to task-updater.
    * When removing a task, also check the paused tasks in addition to the 
updating tasks.
    * Also, I realized we did not check whether tasks with the same id are 
added, so I added that check in this PR as well.
    
    Reviewers: Bruno Cadonna <cado...@apache.org>
---
 .../processor/internals/DefaultStateUpdater.java   |  62 ++++-
 .../streams/processor/internals/StateUpdater.java  |  13 +
 .../processor/internals/StoreChangelogReader.java  |   2 +-
 .../streams/processor/internals/TaskAndAction.java |  10 +-
 .../internals/DefaultStateUpdaterTest.java         | 282 ++++++++++++++++++++-
 .../processor/internals/TaskAndActionTest.java     |  20 ++
 6 files changed, 379 insertions(+), 10 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
index 22fd48a4ab..08959bee00 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
@@ -83,7 +83,7 @@ public class DefaultStateUpdater implements StateUpdater {
         }
 
         public boolean onlyStandbyTasksLeft() {
-            return !updatingTasks.isEmpty() && 
updatingTasks.values().stream().allMatch(t -> !t.isActive());
+            return !updatingTasks.isEmpty() && 
updatingTasks.values().stream().noneMatch(Task::isActive);
         }
 
         @Override
@@ -125,6 +125,9 @@ public class DefaultStateUpdater implements StateUpdater {
                         case REMOVE:
                             removeTask(taskAndAction.getTaskId());
                             break;
+                        case PAUSE:
+                            pauseTask(taskAndAction.getTaskId());
+                            break;
                     }
                 }
             } finally {
@@ -243,7 +246,12 @@ public class DefaultStateUpdater implements StateUpdater {
                 addToRestoredTasks((StreamTask) task);
                 log.debug("Stateless active task " + task.id() + " was added 
to the restored tasks of the state updater");
             } else {
-                updatingTasks.put(task.id(), task);
+                final Task existingTask = updatingTasks.putIfAbsent(task.id(), 
task);
+                if (existingTask != null) {
+                    throw new IllegalStateException((existingTask.isActive() ? 
"Active" : "Standby") + " task " + task.id() + " already exist, " +
+                        "should not try to add another " + (task.isActive() ? 
"Active" : "Standby") + " task with the same id. " + BUG_ERROR_MESSAGE);
+                }
+
                 if (task.isActive()) {
                     log.debug("Stateful active task " + task.id() + " was 
added to the updating tasks of the state updater");
                     changelogReader.enforceRestoreActive();
@@ -257,8 +265,9 @@ public class DefaultStateUpdater implements StateUpdater {
         }
 
         private void removeTask(final TaskId taskId) {
-            final Task task = updatingTasks.get(taskId);
-            if (task != null) {
+            final Task task;
+            if (updatingTasks.containsKey(taskId)) {
+                task = updatingTasks.get(taskId);
                 task.maybeCheckpoint(true);
                 final Collection<TopicPartition> changelogPartitions = 
task.changelogPartitions();
                 changelogReader.unregister(changelogPartitions);
@@ -267,8 +276,31 @@ public class DefaultStateUpdater implements StateUpdater {
                 transitToUpdateStandbysIfOnlyStandbysLeft();
                 log.debug((task.isActive() ? "Active" : "Standby")
                     + " task " + task.id() + " was removed from the updating 
tasks and added to the removed tasks.");
+            } else if (pausedTasks.containsKey(taskId)) {
+                task = pausedTasks.get(taskId);
+                final Collection<TopicPartition> changelogPartitions = 
task.changelogPartitions();
+                changelogReader.unregister(changelogPartitions);
+                removedTasks.add(task);
+                pausedTasks.remove(taskId);
+                log.debug((task.isActive() ? "Active" : "Standby")
+                    + " task " + task.id() + " was removed from the paused 
tasks and added to the removed tasks.");
+            } else {
+                log.debug("Task " + taskId + " was not removed since it is not 
updating or paused.");
+            }
+        }
+
+        private void pauseTask(final TaskId taskId) {
+            final Task task = updatingTasks.get(taskId);
+            if (task != null) {
+                // do not need to unregister changelog partitions for paused 
tasks
+                task.maybeCheckpoint(true);
+                pausedTasks.put(taskId, task);
+                updatingTasks.remove(taskId);
+                transitToUpdateStandbysIfOnlyStandbysLeft();
+                log.debug((task.isActive() ? "Active" : "Standby")
+                    + " task " + task.id() + " was paused from the updating 
tasks and added to the paused tasks.");
             } else {
-                log.debug("Task " + taskId + " was not removed since it is not 
updating.");
+                log.debug("Task " + taskId + " was not paused since it is not 
updating.");
             }
         }
 
@@ -333,6 +365,7 @@ public class DefaultStateUpdater implements StateUpdater {
     private final Condition restoredActiveTasksCondition = 
restoredActiveTasksLock.newCondition();
     private final BlockingQueue<ExceptionAndTasks> exceptionsAndFailedTasks = 
new LinkedBlockingQueue<>();
     private final BlockingQueue<Task> removedTasks = new 
LinkedBlockingQueue<>();
+    private final Map<TaskId, Task> pausedTasks = new ConcurrentHashMap<>();
 
     private final long commitIntervalMs;
     private long lastCommitMs;
@@ -407,6 +440,17 @@ public class DefaultStateUpdater implements StateUpdater {
         }
     }
 
+    @Override
+    public void pause(final TaskId taskId) {
+        tasksAndActionsLock.lock();
+        try {
+            tasksAndActions.add(TaskAndAction.createPauseTask(taskId));
+            tasksAndActionsCondition.signalAll();
+        } finally {
+            tasksAndActionsLock.unlock();
+        }
+    }
+
     @Override
     public Set<StreamTask> drainRestoredActiveTasks(final Duration timeout) {
         final long timeoutMs = timeout.toMillis();
@@ -478,6 +522,10 @@ public class DefaultStateUpdater implements StateUpdater {
         return Collections.unmodifiableSet(new HashSet<>(removedTasks));
     }
 
+    public Set<Task> getPausedTasks() {
+        return Collections.unmodifiableSet(new 
HashSet<>(pausedTasks.values()));
+    }
+
     @Override
     public Set<Task> getTasks() {
         return executeWithQueuesLocked(() -> 
getStreamOfTasks().collect(Collectors.toSet()));
@@ -520,6 +568,8 @@ public class DefaultStateUpdater implements StateUpdater {
                         restoredActiveTasks.stream(),
                         Stream.concat(
                             
exceptionsAndFailedTasks.stream().flatMap(exceptionAndTasks -> 
exceptionAndTasks.getTasks().stream()),
-                            removedTasks.stream()))));
+                            Stream.concat(
+                                getPausedTasks().stream(),
+                                removedTasks.stream())))));
     }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java
index 1b229bc818..516e47436b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java
@@ -94,6 +94,19 @@ public interface StateUpdater {
      */
     void remove(final TaskId taskId);
 
+    /**
+     * Pause a task (active or standby) from restoring in the state updater.
+     *
+     * This method does not block until the task is paused.
+     *
+     * Restored tasks, removed tasks and failed tasks are not paused, so this 
action would be a no-op for them.
+     * Stateless tasks will never be paused since they are immediately added 
to the
+     * restored active tasks.
+     *
+     * @param taskId ID of the task to pause
+     */
+    void pause(final TaskId taskId);
+
     /**
      * Drains the restored active tasks from the state updater.
      *
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 5240534ce7..f8926e70bb 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -481,7 +481,7 @@ public class StoreChangelogReader implements 
ChangelogReader {
     }
 
     private void pauseResumePartitions(final Map<TaskId, Task> tasks,
-        final Set<TopicPartition> restoringChangelogs) {
+                                       final Set<TopicPartition> 
restoringChangelogs) {
         if (state == ChangelogReaderState.ACTIVE_RESTORING) {
             updatePartitionsByType(tasks, restoringChangelogs, 
TaskType.ACTIVE);
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskAndAction.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskAndAction.java
index 4c4316a864..585374c339 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskAndAction.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskAndAction.java
@@ -24,7 +24,8 @@ public class TaskAndAction {
 
     enum Action {
         ADD,
-        REMOVE
+        REMOVE,
+        PAUSE
     }
 
     private final Task task;
@@ -47,6 +48,11 @@ public class TaskAndAction {
         return new TaskAndAction(null, taskId, Action.REMOVE);
     }
 
+    public static TaskAndAction createPauseTask(final TaskId taskId) {
+        Objects.requireNonNull(taskId, "Task ID of task to pause is null!");
+        return new TaskAndAction(null, taskId, Action.PAUSE);
+    }
+
     public Task getTask() {
         if (action != Action.ADD) {
             throw new IllegalStateException("Action type " + action + " cannot 
have a task!");
@@ -55,7 +61,7 @@ public class TaskAndAction {
     }
 
     public TaskId getTaskId() {
-        if (action != Action.REMOVE) {
+        if (action != Action.REMOVE && action != Action.PAUSE) {
             throw new IllegalStateException("Action type " + action + " cannot 
have a task ID!");
         }
         return taskId;
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
index 465ae4a1c5..14b8237fe7 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
@@ -83,7 +83,6 @@ class DefaultStateUpdaterTest {
     private final Time time = new MockTime(1L);
     private final StreamsConfig config = new StreamsConfig(configProps());
     private final ChangelogReader changelogReader = 
mock(ChangelogReader.class);
-    private final java.util.function.Consumer<Set<TopicPartition>> 
offsetResetter = topicPartitions -> { };
     private final DefaultStateUpdater stateUpdater = new 
DefaultStateUpdater(config, changelogReader, time);
 
     @AfterEach
@@ -152,6 +151,42 @@ class DefaultStateUpdaterTest {
         }
     }
 
+    @Test
+    public void shouldThrowIfAddingActiveTasksWithSameId() throws Exception {
+        final StreamTask task1 = 
createActiveStatefulTaskInStateRestoring(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_A_0));
+        final StreamTask task2 = 
createActiveStatefulTaskInStateRestoring(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_A_0));
+        shouldThrowIfAddingTasksWithSameId(task1, task2);
+    }
+
+    @Test
+    public void shouldThrowIfAddingStandbyTasksWithSameId() throws Exception {
+        final StandbyTask task1 = createStandbyTaskInStateRunning(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_B_0));
+        final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_B_0));
+        shouldThrowIfAddingTasksWithSameId(task1, task2);
+    }
+
+    @Test
+    public void shouldThrowIfAddingActiveAndStandbyTaskWithSameId() throws 
Exception {
+        final StreamTask task1 = 
createActiveStatefulTaskInStateRestoring(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_A_0));
+        final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_B_0));
+        shouldThrowIfAddingTasksWithSameId(task1, task2);
+    }
+
+    @Test
+    public void shouldThrowIfAddingStandbyAndActiveTaskWithSameId() throws 
Exception {
+        final StreamTask task1 = 
createActiveStatefulTaskInStateRestoring(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_A_0));
+        final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_B_0));
+        shouldThrowIfAddingTasksWithSameId(task2, task1);
+    }
+
+    private void shouldThrowIfAddingTasksWithSameId(final Task task1, final 
Task task2) throws Exception {
+        stateUpdater.start();
+        stateUpdater.add(task1);
+        stateUpdater.add(task2);
+
+        verifyFailedTasks(IllegalStateException.class, task1);
+    }
+
     @Test
     public void shouldImmediatelyAddSingleStatelessTaskToRestoredTasks() 
throws Exception {
         final StreamTask task1 = createStatelessTaskInStateRestoring(TASK_0_0);
@@ -177,6 +212,7 @@ class DefaultStateUpdaterTest {
         verifyUpdatingTasks();
         verifyExceptionsAndFailedTasks();
         verifyRemovedTasks();
+        verifyPausedTasks();
     }
 
     @Test
@@ -200,6 +236,7 @@ class DefaultStateUpdaterTest {
         verifyUpdatingTasks();
         verifyExceptionsAndFailedTasks();
         verifyRemovedTasks();
+        verifyPausedTasks();
         verify(changelogReader, times(1)).enforceRestoreActive();
         verify(changelogReader, atLeast(3)).restore(anyMap());
         verify(changelogReader, never()).transitToUpdateStandby();
@@ -231,6 +268,7 @@ class DefaultStateUpdaterTest {
         verifyUpdatingTasks();
         verifyExceptionsAndFailedTasks();
         verifyRemovedTasks();
+        verifyPausedTasks();
         verify(changelogReader, times(3)).enforceRestoreActive();
         verify(changelogReader, atLeast(4)).restore(anyMap());
         verify(changelogReader, never()).transitToUpdateStandby();
@@ -286,6 +324,7 @@ class DefaultStateUpdaterTest {
         verifyRestoredActiveTasks();
         verifyExceptionsAndFailedTasks();
         verifyRemovedTasks();
+        verifyPausedTasks();
         verify(changelogReader, times(1)).transitToUpdateStandby();
         verify(changelogReader, 
timeout(VERIFICATION_TIMEOUT).atLeast(1)).restore(anyMap());
         verify(changelogReader, never()).enforceRestoreActive();
@@ -314,6 +353,7 @@ class DefaultStateUpdaterTest {
         verifyUpdatingStandbyTasks(task4, task3);
         verifyExceptionsAndFailedTasks();
         verifyRemovedTasks();
+        verifyPausedTasks();
         verify(changelogReader, atLeast(3)).restore(anyMap());
         final InOrder orderVerifier = inOrder(changelogReader, task1, task2);
         orderVerifier.verify(changelogReader, times(2)).enforceRestoreActive();
@@ -424,10 +464,37 @@ class DefaultStateUpdaterTest {
         verifyCheckpointTasks(true, task);
         verifyRestoredActiveTasks();
         verifyUpdatingTasks();
+        verifyPausedTasks();
         verifyExceptionsAndFailedTasks();
         verify(changelogReader).unregister(task.changelogPartitions());
     }
 
+    @Test
+    public void shouldRemovePausedTask() throws Exception {
+        final StreamTask task1 = 
createActiveStatefulTaskInStateRestoring(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_A_0));
+        final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_1, 
Collections.singletonList(TOPIC_PARTITION_B_0));
+
+        stateUpdater.start();
+        stateUpdater.add(task1);
+        stateUpdater.add(task2);
+
+        stateUpdater.pause(task1.id());
+        stateUpdater.pause(task2.id());
+
+        verifyPausedTasks(task1, task2);
+        verifyRemovedTasks();
+        verifyUpdatingTasks();
+
+        stateUpdater.remove(task1.id());
+        stateUpdater.remove(task2.id());
+
+        verifyRemovedTasks(task1, task2);
+        verifyPausedTasks();
+        verifyCheckpointTasks(true, task1, task2);
+        verifyUpdatingTasks();
+        verifyExceptionsAndFailedTasks();
+    }
+
     @Test
     public void shouldNotRemoveActiveStatefulTaskFromRestoredActiveTasks() 
throws Exception {
         final StreamTask task = 
createActiveStatefulTaskInStateRestoring(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_A_0));
@@ -455,6 +522,7 @@ class DefaultStateUpdaterTest {
         verifyRemovedTasks(controlTask);
         verifyRestoredActiveTasks(task);
         verifyUpdatingTasks();
+        verifyPausedTasks();
         verifyExceptionsAndFailedTasks();
     }
 
@@ -493,11 +561,168 @@ class DefaultStateUpdaterTest {
         stateUpdater.remove(controlTask.id());
 
         verifyRemovedTasks(controlTask);
+        verifyPausedTasks();
         verifyExceptionsAndFailedTasks(expectedExceptionAndTasks);
         verifyUpdatingTasks();
         verifyRestoredActiveTasks();
     }
 
+    @Test
+    public void shouldPauseActiveStatefulTask() throws Exception {
+        final StreamTask task = 
createActiveStatefulTaskInStateRestoring(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_A_0));
+        shouldPauseStatefulTask(task);
+        verify(changelogReader, never()).transitToUpdateStandby();
+    }
+
+    @Test
+    public void shouldPauseStandbyTask() throws Exception {
+        final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_A_0));
+        shouldPauseStatefulTask(task);
+        verify(changelogReader, times(1)).transitToUpdateStandby();
+    }
+
+    @Test
+    public void shouldPauseActiveTaskAndTransitToUpdateStandby() throws 
Exception {
+        final StreamTask task1 = 
createActiveStatefulTaskInStateRestoring(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_A_0));
+        final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_1, 
Collections.singletonList(TOPIC_PARTITION_B_0));
+
+        stateUpdater.start();
+        stateUpdater.add(task1);
+        stateUpdater.add(task2);
+
+        stateUpdater.pause(task1.id());
+
+        verifyPausedTasks(task1);
+        verifyCheckpointTasks(true, task1);
+        verifyRestoredActiveTasks();
+        verifyRemovedTasks();
+        verifyUpdatingTasks(task2);
+        verifyExceptionsAndFailedTasks();
+        verify(changelogReader, times(1)).enforceRestoreActive();
+        verify(changelogReader, times(1)).transitToUpdateStandby();
+    }
+
+    private void shouldPauseStatefulTask(final Task task) throws Exception {
+        stateUpdater.start();
+        stateUpdater.add(task);
+
+        stateUpdater.pause(task.id());
+
+        verifyPausedTasks(task);
+        verifyCheckpointTasks(true, task);
+        verifyRestoredActiveTasks();
+        verifyRemovedTasks();
+        verifyUpdatingTasks();
+        verifyExceptionsAndFailedTasks();
+    }
+
+    @Test
+    public void shouldIgnorePausingNotExistTasks() throws Exception {
+        stateUpdater.start();
+        stateUpdater.pause(TASK_0_0);
+
+        verifyPausedTasks();
+        verifyRestoredActiveTasks();
+        verifyRemovedTasks();
+        verifyUpdatingTasks();
+        verifyExceptionsAndFailedTasks();
+    }
+
+    @Test
+    public void shouldNotPauseActiveStatefulTaskInRestoredActiveTasks() throws 
Exception {
+        final StreamTask task = 
createActiveStatefulTaskInStateRestoring(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_A_0));
+        final StreamTask controlTask = 
createActiveStatefulTaskInStateRestoring(TASK_1_0, 
Collections.singletonList(TOPIC_PARTITION_B_0));
+        
when(changelogReader.completedChangelogs()).thenReturn(Collections.singleton(TOPIC_PARTITION_A_0));
+        when(changelogReader.allChangelogsCompleted()).thenReturn(false);
+        stateUpdater.start();
+        stateUpdater.add(task);
+        stateUpdater.add(controlTask);
+        verifyRestoredActiveTasks(task);
+
+        stateUpdater.pause(task.id());
+        stateUpdater.pause(controlTask.id());
+
+        verifyPausedTasks(controlTask);
+        verifyRestoredActiveTasks(task);
+        verifyUpdatingTasks();
+        verifyExceptionsAndFailedTasks();
+    }
+
+    @Test
+    public void shouldNotPauseActiveStatefulTaskInFailedTasks() throws 
Exception {
+        final StreamTask task = 
createActiveStatefulTaskInStateRestoring(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_A_0));
+        shouldNotPauseTaskInFailedTasks(task);
+    }
+
+    @Test
+    public void shouldNotPauseStandbyTaskInFailedTasks() throws Exception {
+        final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_A_0));
+        shouldNotPauseTaskInFailedTasks(task);
+    }
+
+    private void shouldNotPauseTaskInFailedTasks(final Task task) throws 
Exception {
+        final StreamTask controlTask = 
createActiveStatefulTaskInStateRestoring(TASK_1_0, 
Collections.singletonList(TOPIC_PARTITION_B_0));
+        final StreamsException streamsException = new 
StreamsException("Something happened", task.id());
+        
when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
+        when(changelogReader.allChangelogsCompleted()).thenReturn(false);
+        final Map<TaskId, Task> updatingTasks = mkMap(
+            mkEntry(task.id(), task),
+            mkEntry(controlTask.id(), controlTask)
+        );
+        doThrow(streamsException)
+            .doNothing()
+            .when(changelogReader).restore(updatingTasks);
+        stateUpdater.start();
+
+        stateUpdater.add(task);
+        stateUpdater.add(controlTask);
+        final ExceptionAndTasks expectedExceptionAndTasks = new 
ExceptionAndTasks(mkSet(task), streamsException);
+        verifyExceptionsAndFailedTasks(expectedExceptionAndTasks);
+
+        stateUpdater.pause(task.id());
+        stateUpdater.pause(controlTask.id());
+
+        verifyPausedTasks(controlTask);
+        verifyExceptionsAndFailedTasks(expectedExceptionAndTasks);
+        verifyUpdatingTasks();
+        verifyRestoredActiveTasks();
+    }
+
+    @Test
+    public void shouldNotPauseActiveStatefulTaskInRemovedTasks() throws 
Exception {
+        final StreamTask task = 
createActiveStatefulTaskInStateRestoring(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_A_0));
+        shouldNotPauseTaskInRemovedTasks(task);
+    }
+
+    @Test
+    public void shouldNotPauseStandbyTaskInRemovedTasks() throws Exception {
+        final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_A_0));
+        shouldNotPauseTaskInRemovedTasks(task);
+    }
+
+    private void shouldNotPauseTaskInRemovedTasks(final Task task) throws 
Exception {
+        
when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
+        when(changelogReader.allChangelogsCompleted()).thenReturn(false);
+        stateUpdater.start();
+        stateUpdater.add(task);
+
+        stateUpdater.remove(task.id());
+
+        verifyRemovedTasks(task);
+        verifyCheckpointTasks(true, task);
+        verifyRestoredActiveTasks();
+        verifyUpdatingTasks();
+        verifyPausedTasks();
+        verifyExceptionsAndFailedTasks();
+        verify(changelogReader).unregister(task.changelogPartitions());
+
+        stateUpdater.pause(task.id());
+
+        verifyRemovedTasks(task);
+        verifyUpdatingTasks();
+        verifyPausedTasks();
+    }
+
     @Test
     public void shouldDrainRemovedTasks() throws Exception {
         assertTrue(stateUpdater.drainRemovedTasks().isEmpty());
@@ -543,6 +768,7 @@ class DefaultStateUpdaterTest {
         final ExceptionAndTasks expectedExceptionAndTasks = new 
ExceptionAndTasks(mkSet(task1, task2), streamsException);
         verifyExceptionsAndFailedTasks(expectedExceptionAndTasks);
         verifyRemovedTasks();
+        verifyPausedTasks();
         verifyUpdatingTasks();
         verifyRestoredActiveTasks();
     }
@@ -582,6 +808,7 @@ class DefaultStateUpdaterTest {
         verifyUpdatingTasks(task2);
         verifyRestoredActiveTasks();
         verifyRemovedTasks();
+        verifyPausedTasks();
     }
 
     @Test
@@ -630,6 +857,7 @@ class DefaultStateUpdaterTest {
         verifyUpdatingTasks();
         verifyRestoredActiveTasks();
         verifyRemovedTasks();
+        verifyPausedTasks();
     }
 
     @Test
@@ -882,6 +1110,22 @@ class DefaultStateUpdaterTest {
         verifyGetTasks(mkSet(), mkSet());
     }
 
+    @Test
+    public void shouldGetTasksFromPausedTasks() throws Exception {
+        final StreamTask activeTask = 
createActiveStatefulTaskInStateRestoring(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_A_0));
+        final StandbyTask standbyTask = 
createStandbyTaskInStateRunning(TASK_0_1, 
Collections.singletonList(TOPIC_PARTITION_A_0));
+        stateUpdater.start();
+        stateUpdater.add(activeTask);
+        stateUpdater.add(standbyTask);
+
+        stateUpdater.pause(activeTask.id());
+        stateUpdater.pause(standbyTask.id());
+
+        verifyPausedTasks(activeTask, standbyTask);
+
+        verifyGetTasks(mkSet(activeTask), mkSet(standbyTask));
+    }
+
     private void verifyGetTasks(final Set<StreamTask> expectedActiveTasks,
                                 final Set<StandbyTask> expectedStandbyTasks) {
         final Set<Task> tasks = stateUpdater.getTasks();
@@ -983,6 +1227,24 @@ class DefaultStateUpdaterTest {
         }
     }
 
+    private void verifyPausedTasks(final Task... tasks) throws Exception {
+        if (tasks.length == 0) {
+            assertTrue(stateUpdater.getPausedTasks().isEmpty());
+        } else {
+            final Set<Task> expectedPausedTasks = mkSet(tasks);
+            final Set<Task> pausedTasks = new HashSet<>();
+            waitForCondition(
+                () -> {
+                    pausedTasks.addAll(stateUpdater.getPausedTasks());
+                    return pausedTasks.containsAll(expectedPausedTasks)
+                        && pausedTasks.size() == expectedPausedTasks.size();
+                },
+                VERIFICATION_TIMEOUT,
+                "Did not get all paused task within the given timeout!"
+            );
+        }
+    }
+
     private void verifyDrainingRemovedTasks(final Task... tasks) throws 
Exception {
         final Set<Task> expectedRemovedTasks = mkSet(tasks);
         final Set<Task> removedTasks = new HashSet<>();
@@ -1012,6 +1274,24 @@ class DefaultStateUpdaterTest {
         );
     }
 
+    private void verifyFailedTasks(final Class<? extends RuntimeException> 
clazz, final Task... tasks) throws Exception {
+        final List<Task> expectedFailedTasks = Arrays.asList(tasks);
+        final Set<Task> failedTasks = new HashSet<>();
+        waitForCondition(
+                () -> {
+                    for (final ExceptionAndTasks exceptionsAndTasks : 
stateUpdater.getExceptionsAndFailedTasks()) {
+                        if (clazz.isInstance(exceptionsAndTasks.exception())) {
+                            failedTasks.addAll(exceptionsAndTasks.getTasks());
+                        }
+                    }
+                    return failedTasks.containsAll(expectedFailedTasks)
+                            && failedTasks.size() == 
expectedFailedTasks.size();
+                },
+                VERIFICATION_TIMEOUT,
+                "Did not get all exceptions and failed tasks within the given 
timeout!"
+        );
+    }
+
     private void verifyDrainingExceptionsAndFailedTasks(final 
ExceptionAndTasks... exceptionsAndTasks) throws Exception {
         final List<ExceptionAndTasks> expectedExceptionAndTasks = 
Arrays.asList(exceptionsAndTasks);
         final List<ExceptionAndTasks> failedTasks = new ArrayList<>();
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskAndActionTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskAndActionTest.java
index 39b927ee09..f994ef75c9 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskAndActionTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskAndActionTest.java
@@ -20,8 +20,10 @@ import org.apache.kafka.streams.processor.TaskId;
 import org.junit.jupiter.api.Test;
 
 import static 
org.apache.kafka.streams.processor.internals.TaskAndAction.Action.ADD;
+import static 
org.apache.kafka.streams.processor.internals.TaskAndAction.Action.PAUSE;
 import static 
org.apache.kafka.streams.processor.internals.TaskAndAction.Action.REMOVE;
 import static 
org.apache.kafka.streams.processor.internals.TaskAndAction.createAddTask;
+import static 
org.apache.kafka.streams.processor.internals.TaskAndAction.createPauseTask;
 import static 
org.apache.kafka.streams.processor.internals.TaskAndAction.createRemoveTask;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -54,6 +56,18 @@ class TaskAndActionTest {
         assertEquals("Action type REMOVE cannot have a task!", 
exception.getMessage());
     }
 
+    @Test
+    public void shouldCreatePauseTaskAction() {
+        final TaskId taskId = new TaskId(0, 0);
+
+        final TaskAndAction pauseTask = createPauseTask(taskId);
+
+        assertEquals(PAUSE, pauseTask.getAction());
+        assertEquals(taskId, pauseTask.getTaskId());
+        final Exception exception = assertThrows(IllegalStateException.class, 
pauseTask::getTask);
+        assertEquals("Action type PAUSE cannot have a task!", 
exception.getMessage());
+    }
+
     @Test
     public void shouldThrowIfAddTaskActionIsCreatedWithNullTask() {
         final Exception exception = assertThrows(NullPointerException.class, 
() -> createAddTask(null));
@@ -65,4 +79,10 @@ class TaskAndActionTest {
         final Exception exception = assertThrows(NullPointerException.class, 
() -> createRemoveTask(null));
         assertTrue(exception.getMessage().contains("Task ID of task to remove 
is null!"));
     }
+
+    @Test
+    public void shouldThrowIfPauseTaskActionIsCreatedWithNullTaskId() {
+        final Exception exception = assertThrows(NullPointerException.class, 
() -> createPauseTask(null));
+        assertTrue(exception.getMessage().contains("Task ID of task to pause 
is null!"));
+    }
 }
\ No newline at end of file

Reply via email to