guozhangwang commented on code in PR #12279:
URL: https://github.com/apache/kafka/pull/12279#discussion_r899247014


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -290,6 +295,34 @@ private void addTaskToRestoredTasks(final StreamTask task) {
                 restoredActiveTasksLock.unlock();
             }
         }
+
+        private void maybeCommitRestoringTasks(final long now) {
+            final long elapsedMsSinceLastCommit = now - lastCommitMs;
+            if (elapsedMsSinceLastCommit > commitIntervalMs) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Committing all restoring tasks since {}ms has elapsed (commit interval is {}ms)",
+                        elapsedMsSinceLastCommit, commitIntervalMs);
+                }
+
+                for (final Task task : updatingTasks.values()) {
+                    // do not enforce checkpointing during restoration if its position has not advanced much
+                    commitTask(task, false);
+                }
+
+                lastCommitMs = now;
+            }
+        }
+
+        private void commitTask(final Task task, final boolean enforceCheckpoint) {
+            // prepare commit should not take any effect except a no-op verification
+            final Map<TopicPartition, OffsetAndMetadata> offsetAndMetadata = task.prepareCommit();

Review Comment:
   I was trying to leverage the extra state validation logic inside the
`*Commit()` functions, but I agree that calling checkpointing directly
would be more straightforward. LMK if you feel strongly about this and I can
change it accordingly.
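
   For reference, here is a rough, self-contained sketch of the two options being weighed. It is not the actual `DefaultStateUpdater` code. `RestoringTask`, `RecordingTask`, `PeriodicCheckpointer`, and the `viaCommitPath` flag are hypothetical names made up for illustration. The method names `postCommit` and `maybeCheckpoint` are assumptions about what the commit path and the direct checkpoint call would look like; only `prepareCommit()` appears in the hunk above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class RestorationCheckpointSketch {

    // Hypothetical stand-in for the internal task type; only the calls under discussion.
    interface RestoringTask {
        void prepareCommit();                            // assumed: state validation, otherwise a no-op while restoring
        void postCommit(boolean enforceCheckpoint);      // assumed: state validation, then checkpoint
        void maybeCheckpoint(boolean enforceCheckpoint); // assumed: checkpoint only
    }

    // Test double that records which calls were made, in order.
    static final class RecordingTask implements RestoringTask {
        final List<String> calls = new ArrayList<>();

        @Override public void prepareCommit() { calls.add("prepareCommit"); }
        @Override public void postCommit(boolean enforceCheckpoint) { calls.add("postCommit(" + enforceCheckpoint + ")"); }
        @Override public void maybeCheckpoint(boolean enforceCheckpoint) { calls.add("maybeCheckpoint(" + enforceCheckpoint + ")"); }
    }

    // Mirrors the interval check in maybeCommitRestoringTasks from the diff.
    static final class PeriodicCheckpointer {
        private final long commitIntervalMs;
        private long lastCommitMs;

        PeriodicCheckpointer(long commitIntervalMs, long startMs) {
            this.commitIntervalMs = commitIntervalMs;
            this.lastCommitMs = startMs;
        }

        // Returns true if the interval had elapsed and the tasks were checkpointed.
        boolean maybeCheckpointAll(long now, List<? extends RestoringTask> tasks, boolean viaCommitPath) {
            if (now - lastCommitMs <= commitIntervalMs) {
                return false;
            }
            for (RestoringTask task : tasks) {
                if (viaCommitPath) {
                    // Option A: reuse the commit path to get its state validation.
                    task.prepareCommit();
                    task.postCommit(false);
                } else {
                    // Option B: call checkpointing directly, without enforcing it.
                    task.maybeCheckpoint(false);
                }
            }
            lastCommitMs = now;
            return true;
        }
    }

    public static void main(String[] args) {
        RecordingTask task = new RecordingTask();
        PeriodicCheckpointer checkpointer = new PeriodicCheckpointer(100L, 0L);
        checkpointer.maybeCheckpointAll(150L, Collections.singletonList(task), false);
        System.out.println(task.calls);
    }
}
```

   Option B does less work per interval and makes the intent explicit; Option A only pays off if the validation inside the commit path catches something the direct call would miss.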



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
