wcarlson5 commented on code in PR #12554:
URL: https://github.com/apache/kafka/pull/12554#discussion_r955423851


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java:
##########
@@ -183,6 +185,21 @@ public int hashCode() {
      */
     Set<StreamTask> getActiveTasks();
 
+    /**
+     * Returns if the state updater restores active tasks.
+     *
+     * The state updater restores active tasks if at least one active task was 
added with the {@link StateUpdater#add(Task)}

Review Comment:
   Is this if it it _currently_ restoring an active task? It read like if it is 
able to do so. I guess I am not sure what this method is for.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1038,7 +1104,7 @@ private long sumOfChangelogOffsets(final TaskId id, final 
Map<TopicPartition, Lo
         return offsetSum;
     }
 
-    private void closeTaskDirty(final Task task) {
+    private void closeTaskDirty(final Task task, final boolean 
removeFromTasksRegistry) {

Review Comment:
   + 1 I agree that it would be best to separate this logic out. I think it 
would help with maintaining and readability.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to