guozhangwang commented on code in PR #12270:
URL: https://github.com/apache/kafka/pull/12270#discussion_r893752714
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -230,23 +233,35 @@ private List<TaskAndAction> getTasksAndActions() {
private void addTask(final Task task) {
if (isStateless(task)) {
- log.debug("Stateless active task " + task.id() + " was added to the state updater");
addTaskToRestoredTasks((StreamTask) task);
+ log.debug("Stateless active task " + task.id() + " was added to the restored tasks of the state updater");
} else {
+ updatingTasks.put(task.id(), task);
if (task.isActive()) {
- updatingTasks.put(task.id(), task);
- log.debug("Stateful active task " + task.id() + " was added to the state updater");
+ log.debug("Stateful active task " + task.id() + " was added to the updating tasks of the state updater");
changelogReader.enforceRestoreActive();
} else {
- updatingTasks.put(task.id(), task);
- log.debug("Standby task " + task.id() + " was added to the state updater");
+ log.debug("Standby task " + task.id() + " was added to the updating tasks of the state updater");
if (updatingTasks.size() == 1) {
changelogReader.transitToUpdateStandby();
}
}
}
}
+ private void removeTask(final TaskId taskId) {
+ final Task task = updatingTasks.remove(taskId);
Review Comment:
Not a comment: one of the bug fixes we want to piggy-back on restoration is to
write a new checkpoint when we stop restoring a task; otherwise, the
restoration progress so far would be lost. The restore callback should also be
triggered (KAFKA-10575).
I will try to add this in a separate PR; it is also a good exercise in how the
state updater could interact with the processor state manager.
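A minimal sketch of the ordering described above: when a task stops restoring, first persist the progress so far, then fire the callback, then hand the task back. Everything here is hypothetical and self-contained: `TaskState`, `RestoreCallback`, `onRestoreStopped` and the in-memory map standing in for checkpoint files are illustrative names, not Kafka Streams APIs, and task IDs and changelog partitions are plain strings.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model; not the Kafka Streams DefaultStateUpdater.
final class RemoveWithCheckpointSketch {

    // Stand-in for a task plus its state manager: restored offset per changelog.
    static final class TaskState {
        final String taskId;
        final Map<String, Long> restoredOffsets = new HashMap<>();

        TaskState(final String taskId) {
            this.taskId = taskId;
        }
    }

    // Hypothetical callback fired when restoration stops before completion.
    interface RestoreCallback {
        void onRestoreStopped(String taskId, Map<String, Long> restoredOffsets);
    }

    private final Map<String, TaskState> updatingTasks = new HashMap<>();
    // Stands in for the checkpoint files written by the state manager.
    private final Map<String, Map<String, Long>> checkpoints = new HashMap<>();
    private final List<TaskState> removedTasks = new ArrayList<>();
    private final RestoreCallback callback;

    RemoveWithCheckpointSketch(final RestoreCallback callback) {
        this.callback = callback;
    }

    void add(final TaskState task) {
        updatingTasks.put(task.taskId, task);
    }

    void remove(final String taskId) {
        final TaskState task = updatingTasks.remove(taskId);
        if (task == null) {
            return; // not updating: nothing to checkpoint
        }
        // 1. Persist progress so far so a later restoration resumes from here.
        checkpoints.put(taskId, new HashMap<>(task.restoredOffsets));
        // 2. Tell the listener that restoration of this task stopped early.
        callback.onRestoreStopped(taskId, Collections.unmodifiableMap(task.restoredOffsets));
        // 3. Hand the task back to the caller.
        removedTasks.add(task);
    }

    Map<String, Long> checkpointFor(final String taskId) {
        return checkpoints.get(taskId);
    }

    List<TaskState> removedTasks() {
        return Collections.unmodifiableList(removedTasks);
    }
}
```

Writing the checkpoint before invoking the callback means a listener that reacts to the stop can rely on the progress already being persisted; the comment above leaves the exact ordering in the real implementation open.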
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -375,52 +384,42 @@ public Set<StreamTask> getRestoredActiveTasks(final Duration timeout) {
}
@Override
- public List<ExceptionAndTasks> getFailedTasksAndExceptions() {
+ public Set<Task> drainRemovedTasks() {
Review Comment:
When we add the logic for recycling a task, which would still be done in the
task manager, we would need two round-trips: first remove the task as
active/standby, then, after recycling it, add the new task as standby/active.
I'm still fleshing out the details here. In case we also need a timeout for
removed tasks, similar to the one for restored active tasks, this data structure
may need to be turned into a lock + condition + queue instead of a blocking
queue.
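For the timeout case, a lock + condition + queue could look roughly like the following. This is an illustration under assumptions, not the PR's code: the class name `RemovedTasksBuffer` is hypothetical and `T` stands in for `Task`. The state updater thread calls `add`, and the stream thread calls `drain(timeout)`, which waits until at least one removed task is available (or the timeout expires) and then takes everything buffered.

```java
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical buffer for removed tasks; T stands in for Task.
final class RemovedTasksBuffer<T> {
    private final Lock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Queue<T> removed = new ArrayDeque<>();

    // State updater thread: publish a task after it has been removed.
    void add(final T task) {
        lock.lock();
        try {
            removed.add(task);
            notEmpty.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Stream thread: wait up to `timeout` for at least one removed task, then take
    // everything buffered. Returns an empty set if the timeout expires first or the
    // thread is interrupted (the interrupt flag is restored).
    Set<T> drain(final Duration timeout) {
        lock.lock();
        try {
            long remainingNanos = timeout.toNanos();
            while (removed.isEmpty()) {
                if (remainingNanos <= 0L) {
                    return new HashSet<>();
                }
                try {
                    remainingNanos = notEmpty.awaitNanos(remainingNanos);
                } catch (final InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return new HashSet<>();
                }
            }
            final Set<T> result = new HashSet<>(removed);
            removed.clear();
            return result;
        } finally {
            lock.unlock();
        }
    }
}
```

Holding a single lock makes "wait, then take everything" one atomic step. A `BlockingQueue` can approximate this with `poll(timeout, unit)` followed by `drainTo`, but then the wait and the drain are two separate operations.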
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -375,52 +384,42 @@ public Set<StreamTask> getRestoredActiveTasks(final Duration timeout) {
}
@Override
- public List<ExceptionAndTasks> getFailedTasksAndExceptions() {
+ public Set<Task> drainRemovedTasks() {
+ final List<Task> result = new ArrayList<>();
+ removedTasks.drainTo(result);
+ return new HashSet<>(result);
+ }
+
+ @Override
+ public List<ExceptionAndTasks> drainExceptionsAndFailedTasks() {
final List<ExceptionAndTasks> result = new ArrayList<>();
- failedTasks.drainTo(result);
+ exceptionsAndFailedTasks.drainTo(result);
return result;
}
- @Override
- public Set<Task> getAllTasks() {
- tasksAndActionsLock.lock();
+ public Set<StandbyTask> getUpdatingStandbyTasks() {
+ return Collections.unmodifiableSet(new HashSet<>(stateUpdaterThread.getUpdatingStandbyTasks()));
+ }
+
+ public Set<Task> getUpdatingTasks() {
+ return Collections.unmodifiableSet(new HashSet<>(stateUpdaterThread.getUpdatingTasks()));
+ }
+
+ public Set<StreamTask> getRestoredActiveTasks() {
restoredActiveTasksLock.lock();
try {
- final Set<Task> allTasks = new HashSet<>();
- allTasks.addAll(tasksAndActions.stream()
- .filter(t -> t.action == Action.ADD)
- .map(t -> t.task)
- .collect(Collectors.toList())
- );
- allTasks.addAll(stateUpdaterThread.getAllUpdatingTasks());
- allTasks.addAll(restoredActiveTasks);
- return Collections.unmodifiableSet(allTasks);
+ return Collections.unmodifiableSet(new HashSet<>(restoredActiveTasks));
} finally {
restoredActiveTasksLock.unlock();
- tasksAndActionsLock.unlock();
}
}
- @Override
- public Set<StandbyTask> getStandbyTasks() {
- tasksAndActionsLock.lock();
- try {
- final Set<StandbyTask> standbyTasks = new HashSet<>();
- standbyTasks.addAll(tasksAndActions.stream()
- .filter(t -> t.action == Action.ADD)
- .filter(t -> !t.task.isActive())
- .map(t -> (StandbyTask) t.task)
- .collect(Collectors.toList())
- );
- standbyTasks.addAll(getUpdatingStandbyTasks());
- return Collections.unmodifiableSet(standbyTasks);
- } finally {
- tasksAndActionsLock.unlock();
- }
+ public List<ExceptionAndTasks> getExceptionsAndFailedTasks() {
Review Comment:
I suspect some of the getters here will not end up being needed in non-test
code, since the interface declares the corresponding `drainXX` functions and
those are what the non-test code would use. If that turns out to be true, let's
group the getters together and add a comment that they are for testing only.
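To make the distinction concrete, here is a sketch contrasting a consuming `drain` with a test-only snapshot getter. `drainRemovedTasks` mirrors the method in the diff above; `getRemovedTasks`, the class name and the "for testing only" grouping are hypothetical, and `T` stands in for `Task`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical, trimmed-down holder; T stands in for Task.
final class DrainVsGetSketch<T> {
    private final LinkedBlockingQueue<T> removedTasks = new LinkedBlockingQueue<>();

    void add(final T task) {
        removedTasks.add(task);
    }

    // Declared in the interface and used by production code:
    // hands the buffered tasks to the caller and empties the buffer.
    Set<T> drainRemovedTasks() {
        final List<T> result = new ArrayList<>();
        removedTasks.drainTo(result);
        return new HashSet<>(result);
    }

    // ---- For testing only ----

    // Read-only snapshot: the buffer is left untouched.
    Set<T> getRemovedTasks() {
        return Collections.unmodifiableSet(new HashSet<>(removedTasks));
    }
}
```

Because the getter copies without removing, a test can assert on the updater's state repeatedly without disturbing what production code will later drain.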
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java:
##########
@@ -16,65 +16,101 @@
*/
package org.apache.kafka.streams.processor.internals;
+import org.apache.kafka.streams.processor.TaskId;
+
import java.time.Duration;
+import java.util.Collections;
import java.util.List;
+import java.util.Objects;
import java.util.Set;
public interface StateUpdater {
class ExceptionAndTasks {
- public final Set<Task> tasks;
- public final RuntimeException exception;
+ private final Set<Task> tasks;
+ private final RuntimeException exception;
public ExceptionAndTasks(final Set<Task> tasks, final RuntimeException exception) {
- this.tasks = tasks;
- this.exception = exception;
+ this.tasks = Objects.requireNonNull(tasks);
+ this.exception = Objects.requireNonNull(exception);
+ }
+
+ public Set<Task> tasks() {
+ return Collections.unmodifiableSet(tasks);
+ }
+
+ public RuntimeException exception() {
+ return exception;
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) return true;
+ if (!(o instanceof ExceptionAndTasks)) return false;
+ final ExceptionAndTasks that = (ExceptionAndTasks) o;
+ return tasks.equals(that.tasks) && exception.equals(that.exception);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(tasks, exception);
}
}
/**
* Adds a task (active or standby) to the state updater.
*
+ * This method does not block until the task is added to the state updater.
+ *
* @param task task to add
*/
void add(final Task task);
/**
- * Removes a task (active or standby) from the state updater.
+ * Removes a task (active or standby) from the state updater and adds the removed task to the removed tasks.
+ *
+ * This method does not block until the removed task is removed from the state updater.
*
- * @param task task ro remove
+ * The task to be removed is not removed from the restored active tasks and the failed tasks.
+ * Stateless tasks will never be added to the removed tasks since they are immediately added to the
+ * restored active tasks.
+ *
+ * @param taskId ID of the task to remove
*/
- void remove(final Task task);
+ void remove(final TaskId taskId);
/**
- * Gets restored active tasks from state restoration/update
+ * Drains the restored active tasks from the state updater.
+ *
+ * The returned active tasks are removed from the state updater.
*
* @param timeout duration how long the calling thread should wait for restored active tasks
*
* @return set of active tasks with up-to-date states
*/
- Set<StreamTask> getRestoredActiveTasks(final Duration timeout);
+ Set<StreamTask> drainRestoredActiveTasks(final Duration timeout);
- /**
- * Gets failed tasks and the corresponding exceptions
- *
- * @return list of failed tasks and the corresponding exceptions
- */
- List<ExceptionAndTasks> getFailedTasksAndExceptions();
/**
- * Get all tasks (active and standby) that are managed by the state updater.
+ * Drains the removed tasks (active and standbys) from the state updater.
+ *
+ * Removed tasks returned by this method are tasks extraordinarily removed from the state updater. These do not
Review Comment:
I like the word "extraordinarily" :)
Jokes aside, I have a slightly different thought about the semantics here,
i.e., whether the drained removed tasks and the restored/failed tasks should be
mutually exclusive or may overlap, mainly in terms of how easily the caller
could handle the overlapping scenarios. I left a comment about this earlier.
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -293,47 +321,165 @@ public void shouldRestoreActiveStatefulTaskThenUpdateStandbyTaskAndAgainRestoreA
stateUpdater.add(task3);
- verifyRestoredActiveTasks(task3);
+ verifyRestoredActiveTasks(task1, task3);
verify(task3).completeRestoration(offsetResetter);
orderVerifier.verify(changelogReader, times(1)).enforceRestoreActive();
orderVerifier.verify(changelogReader, times(1)).transitToUpdateStandby();
}
+ @Test
+ public void shouldRemoveActiveStatefulTask() throws Exception {
+ final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+ shouldRemoveStatefulTask(task);
+ }
+
+ @Test
+ public void shouldRemoveStandbyTask() throws Exception {
+ final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+ shouldRemoveStatefulTask(task);
+ }
+
+ private void shouldRemoveStatefulTask(final Task task) throws Exception {
+ when(changelogReader.completedChangelogs())
+ .thenReturn(Collections.emptySet());
+ when(changelogReader.allChangelogsCompleted())
+ .thenReturn(false);
+ stateUpdater.add(task);
+
+ stateUpdater.remove(TASK_0_0);
+
+ verifyRemovedTasks(task);
+ verifyRestoredActiveTasks();
+ verifyUpdatingTasks();
+ verifyExceptionsAndFailedTasks();
+ verify(changelogReader).unregister(Collections.singletonList(TOPIC_PARTITION_A_0));
+ }
+
+ @Test
+ public void shouldNotRemoveActiveStatefulTaskFromRestoredActiveTasks() throws Exception {
+ final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+ shouldNotRemoveTaskFromRestoredActiveTasks(task);
+ }
+
+ @Test
+ public void shouldNotRemoveStatelessTaskFromRestoredActiveTasks() throws Exception {
+ final StreamTask task = createStatelessTaskInStateRestoring(TASK_0_0);
+ shouldNotRemoveTaskFromRestoredActiveTasks(task);
+ }
+
+ private void shouldNotRemoveTaskFromRestoredActiveTasks(final StreamTask task) throws Exception {
+ final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_B_0));
+ when(changelogReader.completedChangelogs())
+ .thenReturn(Collections.singleton(TOPIC_PARTITION_A_0));
+ when(changelogReader.allChangelogsCompleted())
+ .thenReturn(false);
+ stateUpdater.add(task);
+ stateUpdater.add(controlTask);
+ verifyRestoredActiveTasks(task);
+
+ stateUpdater.remove(task.id());
+ stateUpdater.remove(controlTask.id());
+
+ verifyRemovedTasks(controlTask);
+ verifyRestoredActiveTasks(task);
+ verifyUpdatingTasks();
+ verifyExceptionsAndFailedTasks();
+ }
+
+ @Test
+ public void shouldNotRemoveActiveStatefulTaskFromFailedTasks() throws Exception {
+ final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+ shouldNotRemoveTaskFromFailedTasks(task);
+ }
+
+ @Test
+ public void shouldNotRemoveStandbyTaskFromFailedTasks() throws Exception {
+ final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+ shouldNotRemoveTaskFromFailedTasks(task);
+ }
+
+ private void shouldNotRemoveTaskFromFailedTasks(final Task task) throws Exception {
+ final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_B_0));
+ final StreamsException streamsException = new StreamsException("Something happened", task.id());
+ when(changelogReader.completedChangelogs())
+ .thenReturn(Collections.emptySet());
+ when(changelogReader.allChangelogsCompleted())
+ .thenReturn(false);
+ doNothing()
+ .doThrow(streamsException)
+ .doNothing()
+ .when(changelogReader).restore(anyMap());
+ stateUpdater.add(task);
+ stateUpdater.add(controlTask);
+ final ExceptionAndTasks expectedExceptionAndTasks = new ExceptionAndTasks(mkSet(task), streamsException);
+ verifyExceptionsAndFailedTasks(expectedExceptionAndTasks);
+
+ stateUpdater.remove(task.id());
+ stateUpdater.remove(controlTask.id());
+
+ verifyRemovedTasks(controlTask);
+ verifyExceptionsAndFailedTasks(expectedExceptionAndTasks);
+ verifyUpdatingTasks();
+ verifyRestoredActiveTasks();
+ }
+
+ @Test
+ public void shouldDrainRemovedTasks() throws Exception {
+ assertTrue(stateUpdater.drainRemovedTasks().isEmpty());
+ when(changelogReader.completedChangelogs())
+ .thenReturn(Collections.emptySet());
+ when(changelogReader.allChangelogsCompleted())
+ .thenReturn(false);
+
+ final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_B_0));
+ stateUpdater.add(task1);
+ stateUpdater.remove(task1.id());
+
+ verifyDrainingRemovedTasks(task1);
+
+ final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_1_1, Collections.singletonList(TOPIC_PARTITION_C_0));
+ final StreamTask task3 = createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+ final StreamTask task4 = createActiveStatefulTaskInStateRestoring(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_D_0));
+ stateUpdater.add(task2);
+ stateUpdater.remove(task2.id());
+ stateUpdater.add(task3);
+ stateUpdater.remove(task3.id());
+ stateUpdater.add(task4);
+ stateUpdater.remove(task4.id());
+
+ verifyDrainingRemovedTasks(task2, task3, task4);
+ }
+
@Test
public void shouldAddFailedTasksToQueueWhenRestoreThrowsStreamsExceptionWithoutTask() throws Exception {
final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_B_0));
- final String expectedMessage = "The Streams were crossed!";
- final StreamsException expectedStreamsException = new StreamsException(expectedMessage);
+ final String exceptionMessage = "The Streams were crossed!";
+ final StreamsException streamsException = new StreamsException(exceptionMessage);
final Map<TaskId, Task> updatingTasks = mkMap(
mkEntry(task1.id(), task1),
mkEntry(task2.id(), task2)
);
- doNothing().doThrow(expectedStreamsException).doNothing().when(changelogReader).restore(updatingTasks);
+ doNothing().doThrow(streamsException).doNothing().when(changelogReader).restore(updatingTasks);
stateUpdater.add(task1);
stateUpdater.add(task2);
- final List<ExceptionAndTasks> failedTasks = getFailedTasks(1);
- assertEquals(1, failedTasks.size());
- final ExceptionAndTasks actualFailedTasks = failedTasks.get(0);
- assertEquals(2, actualFailedTasks.tasks.size());
- assertTrue(actualFailedTasks.tasks.containsAll(Arrays.asList(task1, task2)));
- assertTrue(actualFailedTasks.exception instanceof StreamsException);
- final StreamsException actualException = (StreamsException) actualFailedTasks.exception;
- assertFalse(actualException.taskId().isPresent());
- assertEquals(expectedMessage, actualException.getMessage());
- assertTrue(stateUpdater.getAllTasks().isEmpty());
Review Comment:
Yup, agreed.
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -230,23 +233,35 @@ private List<TaskAndAction> getTasksAndActions() {
private void addTask(final Task task) {
if (isStateless(task)) {
- log.debug("Stateless active task " + task.id() + " was added to the state updater");
addTaskToRestoredTasks((StreamTask) task);
+ log.debug("Stateless active task " + task.id() + " was added to the restored tasks of the state updater");
} else {
+ updatingTasks.put(task.id(), task);
if (task.isActive()) {
- updatingTasks.put(task.id(), task);
- log.debug("Stateful active task " + task.id() + " was added to the state updater");
+ log.debug("Stateful active task " + task.id() + " was added to the updating tasks of the state updater");
changelogReader.enforceRestoreActive();
} else {
- updatingTasks.put(task.id(), task);
- log.debug("Standby task " + task.id() + " was added to the state updater");
+ log.debug("Standby task " + task.id() + " was added to the updating tasks of the state updater");
if (updatingTasks.size() == 1) {
changelogReader.transitToUpdateStandby();
}
}
}
}
+ private void removeTask(final TaskId taskId) {
+ final Task task = updatingTasks.remove(taskId);
+ if (task != null) {
+ final Collection<TopicPartition> changelogPartitions = task.changelogPartitions();
+ changelogReader.unregister(changelogPartitions);
+ removedTasks.add(task);
+ log.debug((task.isActive() ? "Active" : "Standby")
+ + " task " + task.id() + " was removed from the updating tasks and added to the removed tasks.");
+ } else {
+ log.debug("Task " + taskId + " was not removed since it is not updating.");
Review Comment:
This is a meta comment: for tasks that have already been restored or have
failed, should we still include them in the removed tasks returned by the
`drain` function?
The reason I'm wondering is that the caller of `updater.remove` would likely
expect to eventually see the task show up in a later `drain` call (again, one
example is the recycle scenario). If we do not add them there, the caller's
logic becomes more complicated, since it would also need to check the
`restored` / `failed` sets from the updater to determine when the task has been
removed completely.
The downside, of course, is that a task can show up in more than one of these
channels, but I feel the caller's logic to "de-dup" such cases would be easier
as long as there is a deterministic order for checking the
removed/completed/failed tasks from the updater.
WDYT?
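A sketch of the caller-side de-dup under the overlapping semantics proposed here, assuming (hypothetically) that the updater always reports a removed task in the removed channel, possibly in addition to the restored or failed channel, and that the caller drains all three channels every round. `RemovalTracker`, `RoundResult` and `pollRound` are illustrative names, and task IDs are strings. The fixed order (removed, then restored, then failed) is what lets a single round drop duplicates.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical caller-side bookkeeping for pending removals; not Kafka Streams code.
final class RemovalTracker {

    // What one polling round yields for the caller.
    static final class RoundResult {
        final Set<String> completedRemovals = new HashSet<>();
        final Set<String> restoredToHandle = new HashSet<>();
        final Set<String> failedToHandle = new HashSet<>();
    }

    private final Set<String> pendingRemovals = new HashSet<>();

    // Called right after updater.remove(taskId).
    void onRemoveRequested(final String taskId) {
        pendingRemovals.add(taskId);
    }

    boolean isPending(final String taskId) {
        return pendingRemovals.contains(taskId);
    }

    // Channels are checked in a fixed order: removed, then restored, then failed.
    RoundResult pollRound(final Set<String> removed,
                          final Set<String> restored,
                          final Set<String> failed) {
        final RoundResult result = new RoundResult();
        // 1. The removed channel is authoritative: a pending task seen here is done.
        for (final String taskId : removed) {
            if (pendingRemovals.remove(taskId)) {
                result.completedRemovals.add(taskId);
            }
        }
        // 2. and 3. A task still pending removal, or removed earlier in this round,
        // is a duplicate and is dropped; anything else is handled normally.
        // Simplification: a dropped failed task loses its exception here.
        for (final String taskId : restored) {
            if (!isDuplicate(taskId, result)) {
                result.restoredToHandle.add(taskId);
            }
        }
        for (final String taskId : failed) {
            if (!isDuplicate(taskId, result)) {
                result.failedToHandle.add(taskId);
            }
        }
        return result;
    }

    private boolean isDuplicate(final String taskId, final RoundResult result) {
        return pendingRemovals.contains(taskId) || result.completedRemovals.contains(taskId);
    }
}
```

With this ordering the caller only needs one rule per channel: completion is decided by the removed channel alone, and the restored and failed channels merely filter out tasks it already knows are going away.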
--
This is an automated message from the Apache Git Service.