This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 693e283802 KAFKA-10199: Add RESUME in state updater (#12387)
693e283802 is described below

commit 693e283802590b724ef441d5bf7acb6eeced91c5
Author: Guozhang Wang <wangg...@gmail.com>
AuthorDate: Tue Jul 19 09:44:10 2022 -0700

    KAFKA-10199: Add RESUME in state updater (#12387)
    
    * Need to check enforceRestoreActive / transitToUpdateStandby when resuming 
a paused task.
    * Do not expose another getResumedTasks since I think its caller only needs 
the getPausedTasks.
    
    Reviewers: Bruno Cadonna <cado...@apache.org>
---
 .../processor/internals/DefaultStateUpdater.java   |  36 ++++-
 .../streams/processor/internals/StateUpdater.java  |  13 ++
 .../streams/processor/internals/TaskAndAction.java |  10 +-
 .../internals/DefaultStateUpdaterTest.java         | 158 ++++++++++++++++++++-
 .../processor/internals/TaskAndActionTest.java     |  20 +++
 5 files changed, 229 insertions(+), 8 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
index 08959bee00..7e7ec2a6f7 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
@@ -128,6 +128,9 @@ public class DefaultStateUpdater implements StateUpdater {
                         case PAUSE:
                             pauseTask(taskAndAction.getTaskId());
                             break;
+                        case RESUME:
+                            resumeTask(taskAndAction.getTaskId());
+                            break;
                     }
                 }
             } finally {
@@ -249,7 +252,7 @@ public class DefaultStateUpdater implements StateUpdater {
                 final Task existingTask = updatingTasks.putIfAbsent(task.id(), 
task);
                 if (existingTask != null) {
                     throw new IllegalStateException((existingTask.isActive() ? 
"Active" : "Standby") + " task " + task.id() + " already exist, " +
-                        "should not try to add another " + (task.isActive() ? 
"Active" : "Standby") + " task with the same id. " + BUG_ERROR_MESSAGE);
+                        "should not try to add another " + (task.isActive() ? 
"active" : "standby") + " task with the same id. " + BUG_ERROR_MESSAGE);
                 }
 
                 if (task.isActive()) {
@@ -304,6 +307,26 @@ public class DefaultStateUpdater implements StateUpdater {
             }
         }
 
+        private void resumeTask(final TaskId taskId) { // moves a paused task back into the updating tasks; only logs if the task is not paused
+            final Task task = pausedTasks.get(taskId);
+            if (task != null) {
+                updatingTasks.put(taskId, task); // register as updating first, then drop from the paused tasks
+                pausedTasks.remove(taskId);
+
+                if (task.isActive()) {
+                    log.debug("Stateful active task " + task.id() + " was 
resumed to the updating tasks of the state updater");
+                    changelogReader.enforceRestoreActive(); // NOTE(review): presumably switches the changelog reader to restoring active tasks -- confirm against ChangelogReader
+                } else {
+                    log.debug("Standby task " + task.id() + " was resumed to 
the updating tasks of the state updater");
+                    if (updatingTasks.size() == 1) { // true only when the standby task just resumed is the sole updating task
+                        changelogReader.transitToUpdateStandby();
+                    }
+                }
+            } else {
+                log.debug("Task " + taskId + " was not resumed since it is not 
paused.");
+            }
+        }
+
         private boolean isStateless(final Task task) {
             return task.changelogPartitions().isEmpty() && task.isActive();
         }
@@ -451,6 +474,17 @@ public class DefaultStateUpdater implements StateUpdater {
         }
     }
 
+    @Override
+    public void resume(final TaskId taskId) { // only enqueues a RESUME action; the task itself is not touched in this method
+        tasksAndActionsLock.lock();
+        try {
+            tasksAndActions.add(TaskAndAction.createResumeTask(taskId));
+            tasksAndActionsCondition.signalAll(); // wakes all waiters on the condition; NOTE(review): presumably the state updater thread -- confirm
+        } finally {
+            tasksAndActionsLock.unlock(); // released even if enqueueing throws
+        }
+    }
+
     @Override
     public Set<StreamTask> drainRestoredActiveTasks(final Duration timeout) {
         final long timeoutMs = timeout.toMillis();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java
index 516e47436b..69d521b600 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java
@@ -107,6 +107,19 @@ public interface StateUpdater {
      */
     void pause(final TaskId taskId);
 
+    /**
+     * Resume restoring a task (active or standby) in the state updater.
+     *
+     * This method does not block until the task is resumed.
+     *
+     * Restored tasks, removed tasks and failed tasks are not resumed so this action would be a no-op for them.
+     * Stateless tasks will never be resumed since they are immediately added to the
+     * restored active tasks.
+     *
+     * @param taskId ID of the task to resume
+     */
+    void resume(final TaskId taskId);
+
     /**
      * Drains the restored active tasks from the state updater.
      *
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskAndAction.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskAndAction.java
index 585374c339..cc93321a29 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskAndAction.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskAndAction.java
@@ -25,7 +25,8 @@ public class TaskAndAction {
     enum Action {
         ADD,
         REMOVE,
-        PAUSE
+        PAUSE,
+        RESUME
     }
 
     private final Task task;
@@ -53,6 +54,11 @@ public class TaskAndAction {
         return new TaskAndAction(null, taskId, Action.PAUSE);
     }
 
+    public static TaskAndAction createResumeTask(final TaskId taskId) { // factory for a RESUME action identified by task ID only
+        Objects.requireNonNull(taskId, "Task ID of task to resume is null!"); // a null ID fails fast with a NullPointerException carrying this message
+        return new TaskAndAction(null, taskId, Action.RESUME); // no Task is passed (first argument is null)
+    }
+
     public Task getTask() {
         if (action != Action.ADD) {
             throw new IllegalStateException("Action type " + action + " cannot 
have a task!");
@@ -61,7 +67,7 @@ public class TaskAndAction {
     }
 
     public TaskId getTaskId() {
-        if (action != Action.REMOVE && action != Action.PAUSE) {
+        if (action != Action.REMOVE && action != Action.PAUSE && action != 
Action.RESUME) {
             throw new IllegalStateException("Action type " + action + " cannot 
have a task ID!");
         }
         return taskId;
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
index 14b8237fe7..a6543bd620 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
@@ -617,7 +617,7 @@ class DefaultStateUpdaterTest {
     }
 
     @Test
-    public void shouldIgnorePausingNotExistTasks() throws Exception {
+    public void shouldNotPausingNonExistTasks() throws Exception {
         stateUpdater.start();
         stateUpdater.pause(TASK_0_0);
 
@@ -723,6 +723,138 @@ class DefaultStateUpdaterTest {
         verifyPausedTasks();
     }
 
+    @Test
+    public void shouldResumeActiveStatefulTask() throws Exception {
+        final StreamTask task = 
createActiveStatefulTaskInStateRestoring(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_A_0));
+        shouldResumeStatefulTask(task);
+        verify(changelogReader, times(2)).enforceRestoreActive();
+    }
+
+    @Test
+    public void shouldResumeStandbyTask() throws Exception {
+        final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_A_0));
+        shouldResumeStatefulTask(task);
+        verify(changelogReader, times(2)).transitToUpdateStandby();
+    }
+
+    private void shouldResumeStatefulTask(final Task task) throws Exception {
+        stateUpdater.start();
+        stateUpdater.add(task);
+
+        stateUpdater.pause(task.id());
+
+        verifyPausedTasks(task);
+        verifyUpdatingTasks();
+
+        stateUpdater.resume(task.id());
+
+        verifyPausedTasks();
+        verifyUpdatingTasks(task);
+    }
+
+    @Test
+    public void shouldNotResumeNonExistingTasks() throws Exception {
+        stateUpdater.start();
+        stateUpdater.resume(TASK_0_0);
+
+        verifyPausedTasks();
+        verifyRestoredActiveTasks();
+        verifyRemovedTasks();
+        verifyUpdatingTasks();
+        verifyExceptionsAndFailedTasks();
+    }
+
+    @Test
+    public void shouldNotResumeActiveStatefulTaskInRestoredActiveTasks() 
throws Exception {
+        final StreamTask task = 
createActiveStatefulTaskInStateRestoring(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_A_0));
+        final StreamTask controlTask = 
createActiveStatefulTaskInStateRestoring(TASK_1_0, 
Collections.singletonList(TOPIC_PARTITION_B_0));
+        
when(changelogReader.completedChangelogs()).thenReturn(Collections.singleton(TOPIC_PARTITION_A_0));
+        when(changelogReader.allChangelogsCompleted()).thenReturn(false);
+        stateUpdater.start();
+        stateUpdater.add(task);
+        stateUpdater.add(controlTask);
+
+        verifyRestoredActiveTasks(task);
+
+        stateUpdater.resume(task.id());
+        stateUpdater.resume(controlTask.id());
+
+        verifyPausedTasks();
+        verifyRestoredActiveTasks(task);
+        verifyUpdatingTasks(controlTask);
+        verifyExceptionsAndFailedTasks();
+    }
+
+    @Test
+    public void shouldNotResumeActiveStatefulTaskInRemovedTasks() throws Exception {
+        final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+        shouldNotResumeTaskInRemovedTasks(task); // resume helper, not the pause one: checks that resume() leaves an already removed task out of the updating tasks
+    }
+
+    @Test
+    public void shouldNotResumeStandbyTaskInRemovedTasks() throws Exception {
+        final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_A_0));
+        shouldNotResumeTaskInRemovedTasks(task);
+    }
+
+    private void shouldNotResumeTaskInRemovedTasks(final Task task) throws 
Exception {
+        
when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
+        when(changelogReader.allChangelogsCompleted()).thenReturn(false);
+        stateUpdater.start();
+        stateUpdater.add(task);
+
+        verifyUpdatingTasks(task);
+        verifyExceptionsAndFailedTasks();
+
+        stateUpdater.remove(task.id());
+
+        verifyRemovedTasks(task);
+        verifyUpdatingTasks();
+
+        stateUpdater.resume(task.id());
+
+        verifyUpdatingTasks();
+    }
+
+    @Test
+    public void shouldNotResumeActiveStatefulTaskInFailedTasks() throws Exception {
+        final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+        shouldNotResumeTaskInFailedTasks(task); // resume helper, not the pause one: checks that resume() does not move a failed task back to the updating tasks
+    }
+
+    @Test
+    public void shouldNotResumeStandbyTaskInFailedTasks() throws Exception {
+        final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_A_0));
+        shouldNotResumeTaskInFailedTasks(task);
+    }
+
+    private void shouldNotResumeTaskInFailedTasks(final Task task) throws 
Exception {
+        final StreamTask controlTask = 
createActiveStatefulTaskInStateRestoring(TASK_1_0, 
Collections.singletonList(TOPIC_PARTITION_B_0));
+        final StreamsException streamsException = new 
StreamsException("Something happened", task.id());
+        
when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
+        when(changelogReader.allChangelogsCompleted()).thenReturn(false);
+        final Map<TaskId, Task> updatingTasks = mkMap(
+                mkEntry(task.id(), task),
+                mkEntry(controlTask.id(), controlTask)
+        );
+        doThrow(streamsException)
+                .doNothing()
+                .when(changelogReader).restore(updatingTasks);
+        stateUpdater.start();
+
+        stateUpdater.add(task);
+        stateUpdater.add(controlTask);
+        final ExceptionAndTasks expectedExceptionAndTasks = new 
ExceptionAndTasks(mkSet(task), streamsException);
+        verifyExceptionsAndFailedTasks(expectedExceptionAndTasks);
+        verifyUpdatingTasks(controlTask);
+
+        stateUpdater.resume(task.id());
+        stateUpdater.resume(controlTask.id());
+
+        verifyExceptionsAndFailedTasks(expectedExceptionAndTasks);
+        verifyUpdatingTasks(controlTask);
+    }
+
     @Test
     public void shouldDrainRemovedTasks() throws Exception {
         assertTrue(stateUpdater.drainRemovedTasks().isEmpty());
@@ -1146,7 +1278,11 @@ class DefaultStateUpdaterTest {
 
     private void verifyRestoredActiveTasks(final StreamTask... tasks) throws 
Exception {
         if (tasks.length == 0) {
-            assertTrue(stateUpdater.getRestoredActiveTasks().isEmpty());
+            waitForCondition(
+                () -> stateUpdater.getRestoredActiveTasks().isEmpty(),
+                VERIFICATION_TIMEOUT,
+                "Did not get empty restored active task within the given 
timeout!"
+            );
         } else {
             final Set<StreamTask> expectedRestoredTasks = mkSet(tasks);
             final Set<StreamTask> restoredTasks = new HashSet<>();
@@ -1179,7 +1315,11 @@ class DefaultStateUpdaterTest {
 
     private void verifyUpdatingTasks(final Task... tasks) throws Exception {
         if (tasks.length == 0) {
-            assertTrue(stateUpdater.getUpdatingTasks().isEmpty());
+            waitForCondition(
+                () -> stateUpdater.getUpdatingTasks().isEmpty(),
+                VERIFICATION_TIMEOUT,
+                "Did not get empty updating task within the given timeout!"
+            );
         } else {
             final Set<Task> expectedUpdatingTasks = mkSet(tasks);
             final Set<Task> updatingTasks = new HashSet<>();
@@ -1211,7 +1351,11 @@ class DefaultStateUpdaterTest {
 
     private void verifyRemovedTasks(final Task... tasks) throws Exception {
         if (tasks.length == 0) {
-            assertTrue(stateUpdater.getRemovedTasks().isEmpty());
+            waitForCondition(
+                () -> stateUpdater.getRemovedTasks().isEmpty(),
+                VERIFICATION_TIMEOUT,
+                "Did not get empty removed task within the given timeout!"
+            );
         } else {
             final Set<Task> expectedRemovedTasks = mkSet(tasks);
             final Set<Task> removedTasks = new HashSet<>();
@@ -1229,7 +1373,11 @@ class DefaultStateUpdaterTest {
 
     private void verifyPausedTasks(final Task... tasks) throws Exception {
         if (tasks.length == 0) {
-            assertTrue(stateUpdater.getPausedTasks().isEmpty());
+            waitForCondition(
+                () -> stateUpdater.getPausedTasks().isEmpty(),
+                VERIFICATION_TIMEOUT,
+                "Did not get empty paused task within the given timeout!"
+            );
         } else {
             final Set<Task> expectedPausedTasks = mkSet(tasks);
             final Set<Task> pausedTasks = new HashSet<>();
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskAndActionTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskAndActionTest.java
index f994ef75c9..2bc9d05326 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskAndActionTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskAndActionTest.java
@@ -22,9 +22,11 @@ import org.junit.jupiter.api.Test;
 import static 
org.apache.kafka.streams.processor.internals.TaskAndAction.Action.ADD;
 import static 
org.apache.kafka.streams.processor.internals.TaskAndAction.Action.PAUSE;
 import static 
org.apache.kafka.streams.processor.internals.TaskAndAction.Action.REMOVE;
+import static 
org.apache.kafka.streams.processor.internals.TaskAndAction.Action.RESUME;
 import static 
org.apache.kafka.streams.processor.internals.TaskAndAction.createAddTask;
 import static 
org.apache.kafka.streams.processor.internals.TaskAndAction.createPauseTask;
 import static 
org.apache.kafka.streams.processor.internals.TaskAndAction.createRemoveTask;
+import static 
org.apache.kafka.streams.processor.internals.TaskAndAction.createResumeTask;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -68,6 +70,18 @@ class TaskAndActionTest {
         assertEquals("Action type PAUSE cannot have a task!", 
exception.getMessage());
     }
 
+    @Test
+    public void shouldCreateResumeTaskAction() { // a RESUME action exposes its action type and task ID, but never a task
+        final TaskId taskId = new TaskId(0, 0);
+
+        final TaskAndAction resumeTask = createResumeTask(taskId);
+
+        assertEquals(RESUME, resumeTask.getAction());
+        assertEquals(taskId, resumeTask.getTaskId());
+        final Exception exception = assertThrows(IllegalStateException.class, resumeTask::getTask);
+        assertEquals("Action type RESUME cannot have a task!", exception.getMessage());
+    }
+
     @Test
     public void shouldThrowIfAddTaskActionIsCreatedWithNullTask() {
         final Exception exception = assertThrows(NullPointerException.class, 
() -> createAddTask(null));
@@ -85,4 +99,10 @@ class TaskAndActionTest {
         final Exception exception = assertThrows(NullPointerException.class, 
() -> createPauseTask(null));
         assertTrue(exception.getMessage().contains("Task ID of task to pause 
is null!"));
     }
+
+    @Test
+    public void shouldThrowIfResumeTaskActionIsCreatedWithNullTaskId() {
+        final Exception exception = assertThrows(NullPointerException.class, 
() -> createResumeTask(null));
+        assertTrue(exception.getMessage().contains("Task ID of task to resume 
is null!"));
+    }
 }
\ No newline at end of file

Reply via email to