guozhangwang commented on code in PR #12659:
URL: https://github.com/apache/kafka/pull/12659#discussion_r980337970


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -500,23 +500,29 @@ public void remove(final TaskId taskId) {
         }
     }
 
-    @Override
-    public void pause(final TaskId taskId) {
+    public void pause(final TopologyMetadata topologyMetadata) {
         tasksAndActionsLock.lock();
         try {
-            tasksAndActions.add(TaskAndAction.createPauseTask(taskId));
-            tasksAndActionsCondition.signalAll();
+            for (final Task task : getUpdatingTasks()) {
+                if (topologyMetadata.isPaused(task.id().topologyName())) {
+                    tasksAndActions.add(TaskAndAction.createPauseTask(task.id()));
+                    tasksAndActionsCondition.signalAll();
+                }
+            }

Review Comment:
   Hi @cadonna, I actually tried exactly this with a concurrent hash map key 
set :) The main issue is that `pausedTopologies` would be modified by the main 
thread while the restore thread checks it on `add`. That creates a race, 
because the main thread runs a sequence of 1) check the paused-topology 
concurrent set, 2) enqueue pause/resume tasks into the queue, 3) update the 
paused-topology concurrent set. With the restore thread reading the set 
concurrently, there are many possible interleavings unless we synchronize the 
whole section covering steps 1/2/3 above. It's definitely doable, but it 
introduces a lot of edge-case handling when getting an action from the queue.
   
   Passing the whole `TopologyMetadata` into the state updater, on the other 
hand, could delay the syncing of paused topologies, since we only check it 
infrequently. But we are updating the paused topologies asynchronously anyway, 
and I feel it's less complex.
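
To make the trade-off above concrete, here is a minimal sketch of the polling approach: the updater owns its task collections and periodically reconciles them against a shared paused-topology view, so no pause/resume actions need to be enqueued or ordered. All class and method names here (`PausedTopologiesView`, `PollingUpdaterSketch`, `syncWithPausedTopologies`) are hypothetical and are not the actual `DefaultStateUpdater` or `TopologyMetadata` API. Tasks are reduced to plain strings for illustration.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the paused-topology state; written by the main thread.
final class PausedTopologiesView {
    private final Set<String> paused = ConcurrentHashMap.newKeySet();

    void pause(final String topologyName) { paused.add(topologyName); }
    void resume(final String topologyName) { paused.remove(topologyName); }
    boolean isPaused(final String topologyName) { return paused.contains(topologyName); }
}

// Hypothetical, heavily simplified updater. Only the updater thread touches the
// two task maps, so the shared view is the single point of cross-thread contact.
final class PollingUpdaterSketch {
    private final PausedTopologiesView view;
    private final Map<String, String> updating = new TreeMap<>(); // taskId -> topology
    private final Map<String, String> paused = new TreeMap<>();   // taskId -> topology

    PollingUpdaterSketch(final PausedTopologiesView view) { this.view = view; }

    void add(final String taskId, final String topologyName) {
        updating.put(taskId, topologyName);
    }

    // Called once per loop iteration (assumption): reconcile both directions.
    void syncWithPausedTopologies() {
        move(updating, paused, true);   // pause tasks whose topology is now paused
        move(paused, updating, false);  // resume tasks whose topology is no longer paused
    }

    private void move(final Map<String, String> from,
                      final Map<String, String> to,
                      final boolean wantPaused) {
        final Iterator<Map.Entry<String, String>> it = from.entrySet().iterator();
        while (it.hasNext()) {
            final Map.Entry<String, String> entry = it.next();
            if (view.isPaused(entry.getValue()) == wantPaused) {
                to.put(entry.getKey(), entry.getValue());
                it.remove(); // safe removal while iterating
            }
        }
    }

    Set<String> updatingTasks() { return new TreeSet<>(updating.keySet()); }
    Set<String> pausedTasks() { return new TreeSet<>(paused.keySet()); }
}
```

In this sketch a pause only takes effect at the next call to `syncWithPausedTopologies()`, which mirrors the delay mentioned above. In exchange, the main thread never has to coordinate a check/enqueue/update sequence with the restore thread.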



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.