guozhangwang commented on code in PR #12279:
URL: https://github.com/apache/kafka/pull/12279#discussion_r900314537
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -290,6 +295,34 @@ private void addTaskToRestoredTasks(final StreamTask task) {
             restoredActiveTasksLock.unlock();
         }
     }
+
+    private void maybeCommitRestoringTasks(final long now) {
+        final long elapsedMsSinceLastCommit = now - lastCommitMs;
+        if (elapsedMsSinceLastCommit > commitIntervalMs) {
+            if (log.isDebugEnabled()) {
+                log.debug("Committing all restoring tasks since {}ms has elapsed (commit interval is {}ms)",
+                    elapsedMsSinceLastCommit, commitIntervalMs);
+            }
+
+            for (final Task task : updatingTasks.values()) {
+                // do not enforce checkpointing during restoration if its position has not advanced much
+                commitTask(task, false);
+            }
+
+            lastCommitMs = now;
+        }
+    }
+
+    private void commitTask(final Task task, final boolean enforceCheckpoint) {
+        // prepare commit should not take any effect except a no-op verification
+        final Map<TopicPartition, OffsetAndMetadata> offsetAndMetadata = task.prepareCommit();

Review Comment:
   I plan to do the function refactoring (mentioned above) in the follow-up PR, and here I would just directly call the checkpoint functions.
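
   To illustrate what "directly call the checkpoint functions" could look like, here is a minimal, self-contained sketch. It is not the code from this PR: `CheckpointableTask`, `RestoringTaskCheckpointSketch`, and `maybeCheckpointRestoringTasks` are hypothetical names chosen for illustration, and the interface only stands in for whatever checkpoint method the real task exposes. The sketch keeps the same interval gate as the hunk above but skips the prepare-commit step.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; none of these types are part of Kafka Streams.
class RestoringTaskCheckpointSketch {

    /** Assumed stand-in for a restoring task that can write a checkpoint. */
    interface CheckpointableTask {
        void maybeCheckpoint(boolean enforceCheckpoint);
    }

    private final List<CheckpointableTask> updatingTasks = new ArrayList<>();
    private final long commitIntervalMs;
    private long lastCommitMs;

    RestoringTaskCheckpointSketch(final long commitIntervalMs, final long startMs) {
        this.commitIntervalMs = commitIntervalMs;
        this.lastCommitMs = startMs;
    }

    void addTask(final CheckpointableTask task) {
        updatingTasks.add(task);
    }

    /**
     * Checkpoints every restoring task once more than commitIntervalMs has
     * elapsed since the last round. Returns true if a round was performed.
     */
    boolean maybeCheckpointRestoringTasks(final long now) {
        final long elapsedMsSinceLastCommit = now - lastCommitMs;
        if (elapsedMsSinceLastCommit <= commitIntervalMs) {
            return false;
        }
        for (final CheckpointableTask task : updatingTasks) {
            // do not enforce checkpointing; the task may skip it if little has changed
            task.maybeCheckpoint(false);
        }
        lastCommitMs = now;
        return true;
    }

    public static void main(final String[] args) {
        final RestoringTaskCheckpointSketch sketch = new RestoringTaskCheckpointSketch(100L, 0L);
        sketch.addTask(enforce -> System.out.println("checkpoint requested, enforce=" + enforce));
        sketch.maybeCheckpointRestoringTasks(50L);   // interval not yet elapsed, nothing happens
        sketch.maybeCheckpointRestoringTasks(150L);  // interval elapsed, task is checkpointed
    }
}
```

   Passing `now` in as a parameter, as the hunk above does, keeps the interval logic testable without a real clock.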