This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 693e283802 KAFKA-10199: Add RESUME in state updater (#12387) 693e283802 is described below commit 693e283802590b724ef441d5bf7acb6eeced91c5 Author: Guozhang Wang <wangg...@gmail.com> AuthorDate: Tue Jul 19 09:44:10 2022 -0700 KAFKA-10199: Add RESUME in state updater (#12387) * Need to check enforceRestoreActive / transitToUpdateStandby when resuming a paused task. * Do not expose another getResumedTasks since I think its caller only need the getPausedTasks. Reviewers: Bruno Cadonna <cado...@apache.org> --- .../processor/internals/DefaultStateUpdater.java | 36 ++++- .../streams/processor/internals/StateUpdater.java | 13 ++ .../streams/processor/internals/TaskAndAction.java | 10 +- .../internals/DefaultStateUpdaterTest.java | 158 ++++++++++++++++++++- .../processor/internals/TaskAndActionTest.java | 20 +++ 5 files changed, 229 insertions(+), 8 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java index 08959bee00..7e7ec2a6f7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java @@ -128,6 +128,9 @@ public class DefaultStateUpdater implements StateUpdater { case PAUSE: pauseTask(taskAndAction.getTaskId()); break; + case RESUME: + resumeTask(taskAndAction.getTaskId()); + break; } } } finally { @@ -249,7 +252,7 @@ public class DefaultStateUpdater implements StateUpdater { final Task existingTask = updatingTasks.putIfAbsent(task.id(), task); if (existingTask != null) { throw new IllegalStateException((existingTask.isActive() ? "Active" : "Standby") + " task " + task.id() + " already exist, " + - "should not try to add another " + (task.isActive() ? "Active" : "Standby") + " task with the same id. 
" + BUG_ERROR_MESSAGE); + "should not try to add another " + (task.isActive() ? "active" : "standby") + " task with the same id. " + BUG_ERROR_MESSAGE); } if (task.isActive()) { @@ -304,6 +307,26 @@ public class DefaultStateUpdater implements StateUpdater { } } + private void resumeTask(final TaskId taskId) { + final Task task = pausedTasks.get(taskId); + if (task != null) { + updatingTasks.put(taskId, task); + pausedTasks.remove(taskId); + + if (task.isActive()) { + log.debug("Stateful active task " + task.id() + " was resumed to the updating tasks of the state updater"); + changelogReader.enforceRestoreActive(); + } else { + log.debug("Standby task " + task.id() + " was resumed to the updating tasks of the state updater"); + if (updatingTasks.size() == 1) { + changelogReader.transitToUpdateStandby(); + } + } + } else { + log.debug("Task " + taskId + " was not resumed since it is not paused."); + } + } + private boolean isStateless(final Task task) { return task.changelogPartitions().isEmpty() && task.isActive(); } @@ -451,6 +474,17 @@ public class DefaultStateUpdater implements StateUpdater { } } + @Override + public void resume(final TaskId taskId) { + tasksAndActionsLock.lock(); + try { + tasksAndActions.add(TaskAndAction.createResumeTask(taskId)); + tasksAndActionsCondition.signalAll(); + } finally { + tasksAndActionsLock.unlock(); + } + } + @Override public Set<StreamTask> drainRestoredActiveTasks(final Duration timeout) { final long timeoutMs = timeout.toMillis(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java index 516e47436b..69d521b600 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java @@ -107,6 +107,19 @@ public interface StateUpdater { */ void pause(final TaskId taskId); + /** + * 
Resume restoring a task (active or standby) in the state updater. + * + * This method does not block until the task is resumed. + * + * Restored tasks, removed tasks and failed tasks are not resumed so this action would be a no-op for them. + * Stateless tasks will never be resumed since they are immediately added to the + * restored active tasks. + * + * @param taskId ID of the task to resume + */ + void resume(final TaskId taskId); + /** * Drains the restored active tasks from the state updater. * diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskAndAction.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskAndAction.java index 585374c339..cc93321a29 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskAndAction.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskAndAction.java @@ -25,7 +25,8 @@ public class TaskAndAction { enum Action { ADD, REMOVE, - PAUSE + PAUSE, + RESUME } private final Task task; @@ -53,6 +54,11 @@ public class TaskAndAction { return new TaskAndAction(null, taskId, Action.PAUSE); } + public static TaskAndAction createResumeTask(final TaskId taskId) { + Objects.requireNonNull(taskId, "Task ID of task to resume is null!"); + return new TaskAndAction(null, taskId, Action.RESUME); + } + public Task getTask() { if (action != Action.ADD) { throw new IllegalStateException("Action type " + action + " cannot have a task!"); @@ -61,7 +67,7 @@ public class TaskAndAction { } public TaskId getTaskId() { - if (action != Action.REMOVE && action != Action.PAUSE) { + if (action != Action.REMOVE && action != Action.PAUSE && action != Action.RESUME) { throw new IllegalStateException("Action type " + action + " cannot have a task ID!"); } return taskId; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java index 14b8237fe7..a6543bd620 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java @@ -617,7 +617,7 @@ class DefaultStateUpdaterTest { } @Test - public void shouldIgnorePausingNotExistTasks() throws Exception { + public void shouldNotPausingNonExistTasks() throws Exception { stateUpdater.start(); stateUpdater.pause(TASK_0_0); @@ -723,6 +723,138 @@ class DefaultStateUpdaterTest { verifyPausedTasks(); } + @Test + public void shouldResumeActiveStatefulTask() throws Exception { + final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); + shouldResumeStatefulTask(task); + verify(changelogReader, times(2)).enforceRestoreActive(); + } + + @Test + public void shouldResumeStandbyTask() throws Exception { + final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); + shouldResumeStatefulTask(task); + verify(changelogReader, times(2)).transitToUpdateStandby(); + } + + private void shouldResumeStatefulTask(final Task task) throws Exception { + stateUpdater.start(); + stateUpdater.add(task); + + stateUpdater.pause(task.id()); + + verifyPausedTasks(task); + verifyUpdatingTasks(); + + stateUpdater.resume(task.id()); + + verifyPausedTasks(); + verifyUpdatingTasks(task); + } + + @Test + public void shouldNotResumeNonExistingTasks() throws Exception { + stateUpdater.start(); + stateUpdater.resume(TASK_0_0); + + verifyPausedTasks(); + verifyRestoredActiveTasks(); + verifyRemovedTasks(); + verifyUpdatingTasks(); + verifyExceptionsAndFailedTasks(); + } + + @Test + public void shouldNotResumeActiveStatefulTaskInRestoredActiveTasks() throws Exception { + final StreamTask task = 
createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); + final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_B_0)); + when(changelogReader.completedChangelogs()).thenReturn(Collections.singleton(TOPIC_PARTITION_A_0)); + when(changelogReader.allChangelogsCompleted()).thenReturn(false); + stateUpdater.start(); + stateUpdater.add(task); + stateUpdater.add(controlTask); + + verifyRestoredActiveTasks(task); + + stateUpdater.resume(task.id()); + stateUpdater.resume(controlTask.id()); + + verifyPausedTasks(); + verifyRestoredActiveTasks(task); + verifyUpdatingTasks(controlTask); + verifyExceptionsAndFailedTasks(); + } + + @Test + public void shouldNotResumeActiveStatefulTaskInRemovedTasks() throws Exception { + final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); + shouldNotPauseTaskInRemovedTasks(task); + } + + @Test + public void shouldNotResumeStandbyTaskInRemovedTasks() throws Exception { + final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); + shouldNotResumeTaskInRemovedTasks(task); + } + + private void shouldNotResumeTaskInRemovedTasks(final Task task) throws Exception { + when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet()); + when(changelogReader.allChangelogsCompleted()).thenReturn(false); + stateUpdater.start(); + stateUpdater.add(task); + + verifyUpdatingTasks(task); + verifyExceptionsAndFailedTasks(); + + stateUpdater.remove(task.id()); + + verifyRemovedTasks(task); + verifyUpdatingTasks(); + + stateUpdater.resume(task.id()); + + verifyUpdatingTasks(); + } + + @Test + public void shouldNotResumeActiveStatefulTaskInFailedTasks() throws Exception { + final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); + 
shouldNotPauseTaskInFailedTasks(task); + } + + @Test + public void shouldNotResumeStandbyTaskInFailedTasks() throws Exception { + final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); + shouldNotResumeTaskInFailedTasks(task); + } + + private void shouldNotResumeTaskInFailedTasks(final Task task) throws Exception { + final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_B_0)); + final StreamsException streamsException = new StreamsException("Something happened", task.id()); + when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet()); + when(changelogReader.allChangelogsCompleted()).thenReturn(false); + final Map<TaskId, Task> updatingTasks = mkMap( + mkEntry(task.id(), task), + mkEntry(controlTask.id(), controlTask) + ); + doThrow(streamsException) + .doNothing() + .when(changelogReader).restore(updatingTasks); + stateUpdater.start(); + + stateUpdater.add(task); + stateUpdater.add(controlTask); + final ExceptionAndTasks expectedExceptionAndTasks = new ExceptionAndTasks(mkSet(task), streamsException); + verifyExceptionsAndFailedTasks(expectedExceptionAndTasks); + verifyUpdatingTasks(controlTask); + + stateUpdater.resume(task.id()); + stateUpdater.resume(controlTask.id()); + + verifyExceptionsAndFailedTasks(expectedExceptionAndTasks); + verifyUpdatingTasks(controlTask); + } + @Test public void shouldDrainRemovedTasks() throws Exception { assertTrue(stateUpdater.drainRemovedTasks().isEmpty()); @@ -1146,7 +1278,11 @@ class DefaultStateUpdaterTest { private void verifyRestoredActiveTasks(final StreamTask... tasks) throws Exception { if (tasks.length == 0) { - assertTrue(stateUpdater.getRestoredActiveTasks().isEmpty()); + waitForCondition( + () -> stateUpdater.getRestoredActiveTasks().isEmpty(), + VERIFICATION_TIMEOUT, + "Did not get empty restored active task within the given timeout!" 
+ ); } else { final Set<StreamTask> expectedRestoredTasks = mkSet(tasks); final Set<StreamTask> restoredTasks = new HashSet<>(); @@ -1179,7 +1315,11 @@ class DefaultStateUpdaterTest { private void verifyUpdatingTasks(final Task... tasks) throws Exception { if (tasks.length == 0) { - assertTrue(stateUpdater.getUpdatingTasks().isEmpty()); + waitForCondition( + () -> stateUpdater.getUpdatingTasks().isEmpty(), + VERIFICATION_TIMEOUT, + "Did not get empty updating task within the given timeout!" + ); } else { final Set<Task> expectedUpdatingTasks = mkSet(tasks); final Set<Task> updatingTasks = new HashSet<>(); @@ -1211,7 +1351,11 @@ class DefaultStateUpdaterTest { private void verifyRemovedTasks(final Task... tasks) throws Exception { if (tasks.length == 0) { - assertTrue(stateUpdater.getRemovedTasks().isEmpty()); + waitForCondition( + () -> stateUpdater.getRemovedTasks().isEmpty(), + VERIFICATION_TIMEOUT, + "Did not get empty removed task within the given timeout!" + ); } else { final Set<Task> expectedRemovedTasks = mkSet(tasks); final Set<Task> removedTasks = new HashSet<>(); @@ -1229,7 +1373,11 @@ class DefaultStateUpdaterTest { private void verifyPausedTasks(final Task... tasks) throws Exception { if (tasks.length == 0) { - assertTrue(stateUpdater.getPausedTasks().isEmpty()); + waitForCondition( + () -> stateUpdater.getPausedTasks().isEmpty(), + VERIFICATION_TIMEOUT, + "Did not get empty paused task within the given timeout!" 
+ ); } else { final Set<Task> expectedPausedTasks = mkSet(tasks); final Set<Task> pausedTasks = new HashSet<>(); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskAndActionTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskAndActionTest.java index f994ef75c9..2bc9d05326 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskAndActionTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskAndActionTest.java @@ -22,9 +22,11 @@ import org.junit.jupiter.api.Test; import static org.apache.kafka.streams.processor.internals.TaskAndAction.Action.ADD; import static org.apache.kafka.streams.processor.internals.TaskAndAction.Action.PAUSE; import static org.apache.kafka.streams.processor.internals.TaskAndAction.Action.REMOVE; +import static org.apache.kafka.streams.processor.internals.TaskAndAction.Action.RESUME; import static org.apache.kafka.streams.processor.internals.TaskAndAction.createAddTask; import static org.apache.kafka.streams.processor.internals.TaskAndAction.createPauseTask; import static org.apache.kafka.streams.processor.internals.TaskAndAction.createRemoveTask; +import static org.apache.kafka.streams.processor.internals.TaskAndAction.createResumeTask; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -68,6 +70,18 @@ class TaskAndActionTest { assertEquals("Action type PAUSE cannot have a task!", exception.getMessage()); } + @Test + public void shouldCreateResumeTaskAction() { + final TaskId taskId = new TaskId(0, 0); + + final TaskAndAction pauseTask = createResumeTask(taskId); + + assertEquals(RESUME, pauseTask.getAction()); + assertEquals(taskId, pauseTask.getTaskId()); + final Exception exception = assertThrows(IllegalStateException.class, pauseTask::getTask); + assertEquals("Action type RESUME cannot 
have a task!", exception.getMessage()); + } + @Test public void shouldThrowIfAddTaskActionIsCreatedWithNullTask() { final Exception exception = assertThrows(NullPointerException.class, () -> createAddTask(null)); @@ -85,4 +99,10 @@ class TaskAndActionTest { final Exception exception = assertThrows(NullPointerException.class, () -> createPauseTask(null)); assertTrue(exception.getMessage().contains("Task ID of task to pause is null!")); } + + @Test + public void shouldThrowIfResumeTaskActionIsCreatedWithNullTaskId() { + final Exception exception = assertThrows(NullPointerException.class, () -> createResumeTask(null)); + assertTrue(exception.getMessage().contains("Task ID of task to resume is null!")); + } } \ No newline at end of file