cadonna commented on code in PR #12583: URL: https://github.com/apache/kafka/pull/12583#discussion_r962717234
########## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ########## @@ -867,37 +868,47 @@ void runOnce() { } private void initializeAndRestorePhase() { - // only try to initialize the assigned tasks - // if the state is still in PARTITION_ASSIGNED after the poll call + final java.util.function.Consumer<Set<TopicPartition>> offsetResetter = partitions -> resetOffsets(partitions, null); final State stateSnapshot = state; - if (stateSnapshot == State.PARTITIONS_ASSIGNED - || stateSnapshot == State.RUNNING && taskManager.needsInitializationOrRestoration()) { + if (stateUpdaterEnabled) { + checkStateUpdater(); + } else { + // only try to initialize the assigned tasks + // if the state is still in PARTITION_ASSIGNED after the poll call + if (stateSnapshot == State.PARTITIONS_ASSIGNED + || stateSnapshot == State.RUNNING && taskManager.needsInitializationOrRestoration()) { - log.debug("State is {}; initializing tasks if necessary", stateSnapshot); + log.debug("State is {}; initializing tasks if necessary", stateSnapshot); - // transit to restore active is idempotent so we can call it multiple times - changelogReader.enforceRestoreActive(); + if (taskManager.tryToCompleteRestoration(now, offsetResetter)) { + log.info("Restoration took {} ms for all tasks {}", time.milliseconds() - lastPartitionAssignedMs, + taskManager.allTasks().keySet()); + setState(State.RUNNING); + } - if (taskManager.tryToCompleteRestoration(now, partitions -> resetOffsets(partitions, null))) { - changelogReader.transitToUpdateStandby(); - log.info("Restoration took {} ms for all tasks {}", time.milliseconds() - lastPartitionAssignedMs, - taskManager.allTasks().keySet()); - setState(State.RUNNING); + if (log.isDebugEnabled()) { + log.debug("Initialization call done. State is {}", state); + } } if (log.isDebugEnabled()) { - log.debug("Initialization call done. 
State is {}", state); + log.debug("Idempotently invoking restoration logic in state {}", state); Review Comment: I somewhat agree with you, but I did not actually change anything here. I only added the `if`-branch with the call to `checkStateUpdater()` and did not touch the content of the `else`-branch. The diff is really odd. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org