lucasbru commented on code in PR #13228:
URL: https://github.com/apache/kafka/pull/13228#discussion_r1114354933


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -151,9 +202,18 @@ private void resumeTasks() {
             }
         }
 
-        private void restoreTasks() {
+        private void pauseTasks() {
+            for (final Task task : updatingTasks.values()) {

Review Comment:
   Not sure, but is there any performance concern around running this loop in 
every single iteration of `runOnce` ?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -598,6 +685,12 @@ public Set<StandbyTask> getUpdatingStandbyTasks() {
             : Collections.emptySet();
     }
 
+    public Set<StreamTask> getUpdatingActiveTasks() {
+        return stateUpdaterThread != null

Review Comment:
   As I understand it, this function will be called quite frequently to export 
metrics. We only need the size of the collection. It could make sense to avoid 
the allocations here and just implement a `getNumberOfUpdaingActiveTasks` as a 
non-essential but free optimization. Similar for `getPausedStandbyTasks` etc. 
pp.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -399,31 +459,56 @@ private void addToRestoredTasks(final StreamTask task) {
             }
         }
 
-        private void checkAllUpdatingTaskStates(final long now) {
+        private void maybeCheckpointTasks(final long now) {
             final long elapsedMsSinceLastCommit = now - lastCommitMs;
             if (elapsedMsSinceLastCommit > commitIntervalMs) {
                 if (log.isDebugEnabled()) {
                     log.debug("Checking all restoring task states since {}ms 
has elapsed (commit interval is {}ms)",

Review Comment:
   Update the log message as well. This function isn't really checking task 
states anymore



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to