cadonna commented on code in PR #13025:
URL: https://github.com/apache/kafka/pull/13025#discussion_r1068096858


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -411,6 +422,7 @@ private void checkAllUpdatingTaskStates(final long now) {
     private final Condition restoredActiveTasksCondition = 
restoredActiveTasksLock.newCondition();
     private final BlockingQueue<ExceptionAndTasks> exceptionsAndFailedTasks = 
new LinkedBlockingQueue<>();
     private final BlockingQueue<Task> removedTasks = new 
LinkedBlockingQueue<>();
+    private final AtomicBoolean needsResumeCheck = new AtomicBoolean(false);

Review Comment:
   nit: I find the name a bit confusing. What do you think about 
`isTopologyResumed` or 'topologyResumed'?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -258,21 +269,27 @@ private List<TaskAndAction> getTasksAndActions() {
         }
 
         private void addTask(final Task task) {
+            final TaskId taskId = task.id();
             if (isStateless(task)) {
                 addToRestoredTasks((StreamTask) task);
-                log.info("Stateless active task " + task.id() + " was added to 
the restored tasks of the state updater");
+                log.info("Stateless active task " + taskId + " was added to 
the restored tasks of the state updater");
+            } else if (topologyMetadata.isPaused(taskId.topologyName())) {
+                pausedTasks.put(taskId, task);

Review Comment:
   To be consistent, we should also verify here if the task already exists 
similar to the `else`-branch.  



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1534,7 +1540,13 @@ Set<TaskId> standbyTaskIds() {
     Map<TaskId, Task> allTasks() {
         // not bothering with an unmodifiable map, since the tasks themselves 
are mutable, but
         // if any outside code modifies the map or the tasks, it would be a 
severe transgression.
-        return tasks.allTasksPerId();
+        if (stateUpdater != null) {
+            final Map<TaskId, Task> ret = 
stateUpdater.getTasks().stream().collect(Collectors.toMap(Task::id, x -> x));
+            ret.putAll(tasks.allTasksPerId());
+            return ret;
+        } else {
+            return tasks.allTasksPerId();
+        }

Review Comment:
   I could not find a test for this change. Could you add one?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to