guozhangwang commented on code in PR #13025:
URL: https://github.com/apache/kafka/pull/13025#discussion_r1102201605


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -258,21 +269,27 @@ private List<TaskAndAction> getTasksAndActions() {
         }
 
         private void addTask(final Task task) {
+            final TaskId taskId = task.id();
             if (isStateless(task)) {
                 addToRestoredTasks((StreamTask) task);
-                log.info("Stateless active task " + task.id() + " was added to the restored tasks of the state updater");
+                log.info("Stateless active task " + taskId + " was added to the restored tasks of the state updater");
+            } else if (topologyMetadata.isPaused(taskId.topologyName())) {
+                pausedTasks.put(taskId, task);

Review Comment:
   While working on another PR, I realized that in `checkAllUpdatingTaskStates`
we only try to pause tasks once the commit interval has elapsed. That is, when
we pause a named topology, its tasks may only be paused after a delay, whereas
when we resume it, its tasks are resumed immediately.
   
   So I think we should move the pausing logic out of
`checkAllUpdatingTaskStates` as well, just like the resuming logic, which would
reduce `checkAllUpdatingTaskStates` to just `maybeCheckpointUpdatingTasks`.
WDYT?
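To make the proposal concrete, here is a minimal sketch of a state updater loop in which pausing and resuming are applied on every iteration, while only checkpointing waits for the commit interval. It is a hypothetical, heavily simplified model: `StateUpdaterLoopSketch`, `runOnce`, `checkpointRounds`, and the String task ids are all made up for illustration and are not the `DefaultStateUpdater` API; only `maybeCheckpointUpdatingTasks` is the name suggested above.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the state updater loop. None of these
// names are the real DefaultStateUpdater API; task ids and topology names are Strings.
public class StateUpdaterLoopSketch {
    private final long commitIntervalMs;
    private long lastCheckpointMs;
    private int checkpointRounds = 0;

    // task id -> named topology the task belongs to
    private final Map<String, String> updatingTasks = new HashMap<>();
    private final Map<String, String> pausedTasks = new HashMap<>();
    private final Set<String> pausedTopologies = new HashSet<>();

    public StateUpdaterLoopSketch(final long commitIntervalMs, final long nowMs) {
        this.commitIntervalMs = commitIntervalMs;
        this.lastCheckpointMs = nowMs;
    }

    public void addTask(final String taskId, final String topologyName) {
        updatingTasks.put(taskId, topologyName);
    }

    public void pauseTopology(final String topologyName) {
        pausedTopologies.add(topologyName);
    }

    public void resumeTopology(final String topologyName) {
        pausedTopologies.remove(topologyName);
    }

    // One loop iteration: pausing and resuming are applied every time,
    // and only checkpointing is gated on the commit interval.
    public void runOnce(final long nowMs) {
        movePausedTasks();
        moveResumedTasks();
        maybeCheckpointUpdatingTasks(nowMs);
    }

    private void movePausedTasks() {
        final Iterator<Map.Entry<String, String>> it = updatingTasks.entrySet().iterator();
        while (it.hasNext()) {
            final Map.Entry<String, String> entry = it.next();
            if (pausedTopologies.contains(entry.getValue())) {
                pausedTasks.put(entry.getKey(), entry.getValue());
                it.remove();
            }
        }
    }

    private void moveResumedTasks() {
        final Iterator<Map.Entry<String, String>> it = pausedTasks.entrySet().iterator();
        while (it.hasNext()) {
            final Map.Entry<String, String> entry = it.next();
            if (!pausedTopologies.contains(entry.getValue())) {
                updatingTasks.put(entry.getKey(), entry.getValue());
                it.remove();
            }
        }
    }

    // What remains of checkAllUpdatingTaskStates: checkpoint once per commit interval.
    private void maybeCheckpointUpdatingTasks(final long nowMs) {
        if (nowMs - lastCheckpointMs >= commitIntervalMs) {
            checkpointRounds++; // stands in for writing checkpoints of the updating tasks
            lastCheckpointMs = nowMs;
        }
    }

    public boolean isPaused(final String taskId) {
        return pausedTasks.containsKey(taskId);
    }

    public int checkpointRounds() {
        return checkpointRounds;
    }
}
```

With this shape, pausing a topology takes effect on the next iteration instead of waiting for the commit interval, which makes pausing symmetric with resuming.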



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java:
##########
@@ -190,7 +190,7 @@ public void clearTaskTimeout() {
 
     @Override
     public boolean commitNeeded() {
-        throw new UnsupportedOperationException("This task is read-only");
+        return task.commitNeeded();

Review Comment:
   I think the actual culprit is, as @lucasbru said, that
`StreamThread.maybeCommit` uses `TaskManager.allTasks`, while
`TaskManager.allTasks` is called from many different places. For example, IQ
uses it, and in that case it should return all tasks, whereas for this caller
it should return only the processing tasks when the state updater is enabled,
i.e. only the ones in `TaskRegistry`.
   
   > I am also wondering why Streams also commits restoring tasks.
   
   As for that, the rationale is to also record restoration progress in case
restoration was paused due to another rebalance (though a restoring task's
`prepareCommit` always returns an empty map, so committing a restoring task
reduces to writing its checkpoint). But at some point in the history we lost
the logic that ever sets the `commitNeeded` flag for restoring tasks, so their
commit is never triggered. Hence a new JIRA was opened to fix this, and we
fixed it in the state updater.
   
   I thought about a probably better fix, which involves more changes: 1)
introduce a `TaskManager.allProcessingTasks`, and 2) depending on whether the
state updater is enabled, let `maybeCommit` call either this method or
`allTasks`. At the time, I admit I thought this change was a bit too much, and
since tasks in the state updater should not have any side effects and should
return `false` from `commitNeeded`, this might just be okay.
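A minimal sketch of that alternative, under heavy simplification: only the names `allTasks` and `allProcessingTasks` come from the discussion, while `TaskManagerSketch`, `tasksForMaybeCommit`, and the String task ids are hypothetical. `allTasks` keeps returning every task (e.g. for IQ), and the commit path picks the processing-only view when the state updater is enabled.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

// Hypothetical model of the proposal. Only the names allTasks and
// allProcessingTasks come from the review; tasks are plain String ids here.
public class TaskManagerSketch {
    private final boolean stateUpdaterEnabled;
    private final List<String> processingTasks = new ArrayList<>(); // owned by the stream thread
    private final List<String> updaterTasks = new ArrayList<>();    // owned by the state updater

    public TaskManagerSketch(final boolean stateUpdaterEnabled) {
        this.stateUpdaterEnabled = stateUpdaterEnabled;
    }

    public void addProcessingTask(final String taskId) {
        processingTasks.add(taskId);
    }

    public void addStateUpdaterTask(final String taskId) {
        updaterTasks.add(taskId);
    }

    // Unchanged semantics for callers such as IQ: every task, wherever it lives.
    public Collection<String> allTasks() {
        final List<String> all = new ArrayList<>(processingTasks);
        all.addAll(updaterTasks);
        return Collections.unmodifiableList(all);
    }

    // Proposed: only the tasks the stream thread itself processes.
    public Collection<String> allProcessingTasks() {
        return Collections.unmodifiableList(new ArrayList<>(processingTasks));
    }

    // The tasks maybeCommit would check for commitNeeded.
    public Collection<String> tasksForMaybeCommit() {
        return stateUpdaterEnabled ? allProcessingTasks() : allTasks();
    }
}
```

The design choice is that tasks owned by the state updater are never even asked `commitNeeded`, so `ReadOnlyTask.commitNeeded` can keep throwing instead of delegating.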
   
   But thinking about this a bit more, I think it is indeed worrisome to let
`ReadOnlyTask.commitNeeded` have any potential side effects in case we ever
have bugs.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java:
##########
@@ -190,7 +190,7 @@ public void clearTaskTimeout() {
 
     @Override
     public boolean commitNeeded() {
-        throw new UnsupportedOperationException("This task is read-only");
+        return task.commitNeeded();

Review Comment:
   Ack. That makes sense.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.