guozhangwang commented on code in PR #12659:
URL: https://github.com/apache/kafka/pull/12659#discussion_r978255189


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -500,23 +500,29 @@ public void remove(final TaskId taskId) {
         }
     }
 
-    @Override
-    public void pause(final TaskId taskId) {
+    public void pause(final TopologyMetadata topologyMetadata) {
         tasksAndActionsLock.lock();
         try {
-            tasksAndActions.add(TaskAndAction.createPauseTask(taskId));
-            tasksAndActionsCondition.signalAll();
+            for (final Task task : getUpdatingTasks()) {
+                if (topologyMetadata.isPaused(task.id().topologyName())) {
+                    tasksAndActions.add(TaskAndAction.createPauseTask(task.id()));
+                    tasksAndActionsCondition.signalAll();
+                }
+            }

Review Comment:
   Hi @cadonna, I tried out this idea, but it turned out to be more complicated
than expected. The main issue is this: if we store the paused topologies outside
the restore thread and only let the caller thread modify them, the restore
thread still needs to read them while handling add-task; conversely, if we store
the paused topologies inside the restore thread and let that thread modify them
when handling pause/resume-task, the caller thread still needs to read them to
determine whether a new action needs to be enqueued. Either way, we'd end up
with two communication channels between the two threads.
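To make the "two communication channels" concrete, here is a minimal sketch of the first option (the caller thread owns the set of paused topologies). All names (`TwoChannelUpdater`, `handleAddTask`, `handlePauseActions`) are hypothetical and not part of Kafka Streams. Task ids and topology names are plain strings, resume is omitted, and the interleaving of the two threads is simulated by calling the methods in sequence.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch of the rejected design: the caller thread owns the
// paused-topology set, yet the restore thread still needs two channels.
final class TwoChannelUpdater {
    // Channel 1: written by the caller thread, read by the restore thread on add-task.
    private final Set<String> pausedTopologies = ConcurrentHashMap.newKeySet();
    // Channel 2: pause actions the caller enqueues for tasks that are already updating.
    private final Queue<String> pauseActions = new ConcurrentLinkedQueue<>();

    // Restore-thread-only state: task id -> topology name.
    final Map<String, String> updatingTasks = new HashMap<>();
    final Map<String, String> pausedTasks = new HashMap<>();

    // Caller thread: must feed BOTH channels.
    void pause(final String topologyName) {
        pausedTopologies.add(topologyName); // so later add-task handling sees it
        pauseActions.add(topologyName);     // so already-updating tasks get paused
    }

    // Restore thread: handling add-task has to consult channel 1.
    void handleAddTask(final String taskId, final String topologyName) {
        if (pausedTopologies.contains(topologyName)) {
            pausedTasks.put(taskId, topologyName);
        } else {
            updatingTasks.put(taskId, topologyName);
        }
    }

    // Restore thread: draining the queued pause actions consumes channel 2.
    void handlePauseActions() {
        String topologyName;
        while ((topologyName = pauseActions.poll()) != null) {
            final String toPause = topologyName;
            updatingTasks.entrySet().removeIf(entry -> {
                if (entry.getValue().equals(toPause)) {
                    pausedTasks.put(entry.getKey(), toPause);
                    return true;
                }
                return false;
            });
        }
    }
}
```

Even in this reduced form, the restore thread has to read the shared set on add-task and also drain the action queue. Moving the set into the restore thread only flips which side reads what; both channels remain.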
   
   In the end I chose a different route: pass the `topologyMetadata` directly
to the StateUpdater, and have it periodically check the paused state of each
task's topology to pause / resume tasks. As a result, we can also remove the
pause/resume-task actions from the action queue. Right now I just piggy-backed
this check on checkpointing, but I'm okay with running it separately on its own
interval. After some pondering, I feel this is actually simpler than passing
pause/resume actions via channels.
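A rough sketch of the polling approach, for illustration only: `PausedTopologies` is a hypothetical stand-in for `TopologyMetadata` that exposes only the `isPaused(topologyName)` check the diff above calls, and `SimpleTask`, `PollingUpdater`, and `checkPausedOrResumed` are likewise hypothetical names. The stand-in uses a concurrent set because the caller thread writes it while the updater thread reads it. Whether the real `TopologyMetadata` is safe for this kind of cross-thread read is an assumption here.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for TopologyMetadata: only isPaused(topologyName) matters here.
// Concurrent set because the caller thread writes it and the updater thread reads it.
final class PausedTopologies {
    private final Set<String> paused = ConcurrentHashMap.newKeySet();

    void pause(final String topologyName) { paused.add(topologyName); }
    void resume(final String topologyName) { paused.remove(topologyName); }
    boolean isPaused(final String topologyName) { return paused.contains(topologyName); }
}

// Hypothetical minimal task: an id plus the name of the topology it belongs to.
final class SimpleTask {
    final String id;
    final String topologyName;

    SimpleTask(final String id, final String topologyName) {
        this.id = id;
        this.topologyName = topologyName;
    }
}

// Updater-side state, touched only by the updater thread, so plain HashMaps suffice.
final class PollingUpdater {
    private final PausedTopologies topologyMetadata;
    final Map<String, SimpleTask> updatingTasks = new HashMap<>();
    final Map<String, SimpleTask> pausedTasks = new HashMap<>();

    PollingUpdater(final PausedTopologies topologyMetadata) {
        this.topologyMetadata = topologyMetadata;
    }

    // Handling add-task reads the shared metadata directly; no separate channel needed.
    void addTask(final SimpleTask task) {
        if (topologyMetadata.isPaused(task.topologyName)) {
            pausedTasks.put(task.id, task);
        } else {
            updatingTasks.put(task.id, task);
        }
    }

    // Called periodically from the updater loop, e.g. alongside checkpointing.
    void checkPausedOrResumed() {
        moveWhere(updatingTasks, pausedTasks, true);  // pause tasks of paused topologies
        moveWhere(pausedTasks, updatingTasks, false); // resume tasks of resumed topologies
    }

    // Moves every task whose topology's paused state equals `paused` from `from` to `to`.
    private void moveWhere(final Map<String, SimpleTask> from,
                           final Map<String, SimpleTask> to,
                           final boolean paused) {
        final Iterator<SimpleTask> it = from.values().iterator();
        while (it.hasNext()) {
            final SimpleTask task = it.next();
            if (topologyMetadata.isPaused(task.topologyName) == paused) {
                to.put(task.id, task);
                it.remove();
            }
        }
    }
}
```

One consequence of polling: a pause only takes effect at the next check, so tasks may keep restoring briefly after the caller pauses their topology. That latency is what the choice between piggy-backing on checkpointing and a dedicated interval trades off.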



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.