cadonna commented on code in PR #12583:
URL: https://github.com/apache/kafka/pull/12583#discussion_r962717234


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -867,37 +868,47 @@ void runOnce() {
     }
 
     private void initializeAndRestorePhase() {
-        // only try to initialize the assigned tasks
-        // if the state is still in PARTITION_ASSIGNED after the poll call
+        final java.util.function.Consumer<Set<TopicPartition>> offsetResetter 
= partitions -> resetOffsets(partitions, null);
         final State stateSnapshot = state;
-        if (stateSnapshot == State.PARTITIONS_ASSIGNED
-            || stateSnapshot == State.RUNNING && 
taskManager.needsInitializationOrRestoration()) {
+        if (stateUpdaterEnabled) {
+            checkStateUpdater();
+        } else {
+            // only try to initialize the assigned tasks
+            // if the state is still in PARTITION_ASSIGNED after the poll call
+            if (stateSnapshot == State.PARTITIONS_ASSIGNED
+                || stateSnapshot == State.RUNNING && 
taskManager.needsInitializationOrRestoration()) {
 
-            log.debug("State is {}; initializing tasks if necessary", 
stateSnapshot);
+                log.debug("State is {}; initializing tasks if necessary", 
stateSnapshot);
 
-            // transit to restore active is idempotent so we can call it 
multiple times
-            changelogReader.enforceRestoreActive();
+                if (taskManager.tryToCompleteRestoration(now, offsetResetter)) 
{
+                    log.info("Restoration took {} ms for all tasks {}", 
time.milliseconds() - lastPartitionAssignedMs,
+                        taskManager.allTasks().keySet());
+                    setState(State.RUNNING);
+                }
 
-            if (taskManager.tryToCompleteRestoration(now, partitions -> 
resetOffsets(partitions, null))) {
-                changelogReader.transitToUpdateStandby();
-                log.info("Restoration took {} ms for all tasks {}", 
time.milliseconds() - lastPartitionAssignedMs,
-                    taskManager.allTasks().keySet());
-                setState(State.RUNNING);
+                if (log.isDebugEnabled()) {
+                    log.debug("Initialization call done. State is {}", state);
+                }
             }
 
             if (log.isDebugEnabled()) {
-                log.debug("Initialization call done. State is {}", state);
+                log.debug("Idempotently invoking restoration logic in state 
{}", state);

Review Comment:
   I somehow agree with you, but I did actually not change anything here. I 
only added the `if`-branch with the call to `checkStateUpdater()` and did not 
touch the content of the `else`-branch. The diff is really odd.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to