guozhangwang commented on code in PR #12659:
URL: https://github.com/apache/kafka/pull/12659#discussion_r978255189
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -500,23 +500,29 @@ public void remove(final TaskId taskId) {
}
}
- @Override
- public void pause(final TaskId taskId) {
+ public void pause(final TopologyMetadata topologyMetadata) {
tasksAndActionsLock.lock();
try {
- tasksAndActions.add(TaskAndAction.createPauseTask(taskId));
- tasksAndActionsCondition.signalAll();
+ for (final Task task : getUpdatingTasks()) {
+ if (topologyMetadata.isPaused(task.id().topologyName())) {
+
tasksAndActions.add(TaskAndAction.createPauseTask(task.id()));
+ tasksAndActionsCondition.signalAll();
+ }
+ }
Review Comment:
Hi @cadonna I tried out this idea but it turns out more complicated than
expected. The main issue is that if we store the paused topology outside the
restore thread, and only let the caller thread to modify it, the restore thread
still need to access it while handling add-task; and vice versa if we store the
paused topologies inside the restore thread and let that thread to modify it
upon handling pause/resume-task, the caller thread still need to access it to
determine if a new action needs to be enqueued. In either case we'd construct
two communication channels between the two.
In the end I chose a different route, which is directly pass in the
`topologyMetadata` to StateUpdater, and then periodically check that topology's
paused metadata to pause / resume tasks. As a result we would also remove
pause/resume-task actions as a queue item. Right now I just piggy-backed it
with checkpointing but I'm okay with doing it separately, periodically. After
some pondering I feel this is actually simpler than passing pause/resume
actions via channels.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]