lucasbru commented on code in PR #13025:
URL: https://github.com/apache/kafka/pull/13025#discussion_r1097298086


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1534,7 +1540,13 @@ Set<TaskId> standbyTaskIds() {
     Map<TaskId, Task> allTasks() {
         // not bothering with an unmodifiable map, since the tasks themselves are mutable, but
         // if any outside code modifies the map or the tasks, it would be a severe transgression.
-        return tasks.allTasksPerId();
+        if (stateUpdater != null) {
+            final Map<TaskId, Task> ret = stateUpdater.getTasks().stream().collect(Collectors.toMap(Task::id, x -> x));
+            ret.putAll(tasks.allTasksPerId());
+            return ret;
+        } else {
+            return tasks.allTasksPerId();
+        }

Review Comment:
   Done



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1534,7 +1540,13 @@ Set<TaskId> standbyTaskIds() {
     Map<TaskId, Task> allTasks() {
         // not bothering with an unmodifiable map, since the tasks themselves are mutable, but
         // if any outside code modifies the map or the tasks, it would be a severe transgression.
-        return tasks.allTasksPerId();
+        if (stateUpdater != null) {
+            final Map<TaskId, Task> ret = stateUpdater.getTasks().stream().collect(Collectors.toMap(Task::id, x -> x));
+            ret.putAll(tasks.allTasksPerId());

Review Comment:
   Yes, once you merge the other one we can rebase this one.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java:
##########
@@ -190,7 +190,7 @@ public void clearTaskTimeout() {
 
     @Override
     public boolean commitNeeded() {
-        throw new UnsupportedOperationException("This task is read-only");
+        return task.commitNeeded();

Review Comment:
   `StreamThread.maybeCommit` iterates over `TaskManager.allTasks`, which used to
exclude tasks owned by the state updater but now includes them. However,
`commitNeeded` should be `false` for all of those tasks, right? I was
scratching my head here a bit, because `StreamThread.maybeCommit` processed
`RESTORING` tasks in the old code path, but not in the state updater code path.
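
To make the point above concrete, here is a minimal sketch of why delegating `commitNeeded` is harmless for a commit scan over the merged task view. It is not the Kafka Streams code: `SketchTask`, `SimpleTask`, `ReadOnlySketchTask`, and `CommitScan` are hypothetical, simplified stand-ins, and task ids are plain strings. The assumption is that a task still restoring in the state updater has no uncommitted progress, so the delegated query returns `false`, while mutating calls on the read-only wrapper are still rejected.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for the Task interface.
interface SketchTask {
    String id();
    boolean commitNeeded();
    void markCommitted(); // a mutating operation
}

// A plain task that tracks whether it has uncommitted progress.
final class SimpleTask implements SketchTask {
    private final String id;
    private boolean dirty;

    SimpleTask(final String id, final boolean dirty) {
        this.id = id;
        this.dirty = dirty;
    }

    @Override public String id() { return id; }
    @Override public boolean commitNeeded() { return dirty; }
    @Override public void markCommitted() { dirty = false; }
}

// Read-only view: queries delegate to the wrapped task, mutations are rejected.
final class ReadOnlySketchTask implements SketchTask {
    private final SketchTask task;

    ReadOnlySketchTask(final SketchTask task) { this.task = task; }

    @Override public String id() { return task.id(); }
    @Override public boolean commitNeeded() { return task.commitNeeded(); }
    @Override public void markCommitted() {
        throw new UnsupportedOperationException("This task is read-only");
    }
}

final class CommitScan {
    // Counts tasks reporting a pending commit across the merged view.
    // Entries from ownedTasks overwrite state updater entries with the same id.
    static int countCommitNeeded(final Map<String, SketchTask> ownedTasks,
                                 final Map<String, SketchTask> stateUpdaterTasks) {
        final Map<String, SketchTask> all = new HashMap<>(stateUpdaterTasks);
        all.putAll(ownedTasks);
        int count = 0;
        for (final SketchTask t : all.values()) {
            if (t.commitNeeded()) {
                count++;
            }
        }
        return count;
    }
}
```

With the wrapper throwing on `commitNeeded`, the same scan would fail as soon as it reached a task held by the state updater; with delegation it simply skips tasks that have nothing to commit.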



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -258,21 +269,27 @@ private List<TaskAndAction> getTasksAndActions() {
         }
 
         private void addTask(final Task task) {
+            final TaskId taskId = task.id();
             if (isStateless(task)) {
                 addToRestoredTasks((StreamTask) task);
-                log.info("Stateless active task " + task.id() + " was added to the restored tasks of the state updater");
+                log.info("Stateless active task " + taskId + " was added to the restored tasks of the state updater");
+            } else if (topologyMetadata.isPaused(taskId.topologyName())) {
+                pausedTasks.put(taskId, task);

Review Comment:
   w.r.t. Bruno's comment: done



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -258,21 +269,27 @@ private List<TaskAndAction> getTasksAndActions() {
         }
 
         private void addTask(final Task task) {
+            final TaskId taskId = task.id();
             if (isStateless(task)) {
                 addToRestoredTasks((StreamTask) task);
-                log.info("Stateless active task " + task.id() + " was added to the restored tasks of the state updater");
+                log.info("Stateless active task " + taskId + " was added to the restored tasks of the state updater");
+            } else if (topologyMetadata.isPaused(taskId.topologyName())) {
+                pausedTasks.put(taskId, task);

Review Comment:
   The `PauseResumeIntegrationTest` tests precisely this (that _no_ progress
is made when a task is added in a paused state). I also wasn't sure how
important this property is for KSQL, but I suppose it is important enough that
Jim wrote a unit test for it. I also think it makes sense to have the property;
making a little progress on a paused task is unintuitive, IMHO.
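
As a rough illustration of the property, the routing decision in `addTask` can be sketched as below. This is a simplified model, not the `DefaultStateUpdater` implementation: `AddTaskRoutingSketch`, `Bucket`, and `pauseTopology` are hypothetical names, and tasks are reduced to a string id, a topology name, and a stateless flag. The point is only that a task belonging to a paused topology goes straight into the paused set and never enters the updating set, so no restoration progress can happen on it.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of where a newly added task ends up.
final class AddTaskRoutingSketch {
    enum Bucket { RESTORED, PAUSED, UPDATING }

    private final Set<String> pausedTopologies = new HashSet<>();
    private final Map<String, Bucket> placement = new HashMap<>();

    void pauseTopology(final String topologyName) {
        pausedTopologies.add(topologyName);
    }

    // Mirrors the order of checks in the diff: stateless first, then paused,
    // and only otherwise does the task start updating (restoring).
    Bucket addTask(final String taskId, final String topologyName, final boolean stateless) {
        final Bucket bucket;
        if (stateless) {
            bucket = Bucket.RESTORED;
        } else if (pausedTopologies.contains(topologyName)) {
            bucket = Bucket.PAUSED;
        } else {
            bucket = Bucket.UPDATING;
        }
        placement.put(taskId, bucket);
        return bucket;
    }

    Bucket bucketOf(final String taskId) {
        return placement.get(taskId);
    }
}
```

Checking the paused state at add time, rather than on the next loop iteration, is what rules out the "little bit of progress" case.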



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -411,6 +422,7 @@ private void checkAllUpdatingTaskStates(final long now) {
     private final Condition restoredActiveTasksCondition = restoredActiveTasksLock.newCondition();
     private final BlockingQueue<ExceptionAndTasks> exceptionsAndFailedTasks = new LinkedBlockingQueue<>();
     private final BlockingQueue<Task> removedTasks = new LinkedBlockingQueue<>();
+    private final AtomicBoolean needsResumeCheck = new AtomicBoolean(false);

Review Comment:
   Done



