guozhangwang commented on code in PR #12439:
URL: https://github.com/apache/kafka/pull/12439#discussion_r932692349


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -528,55 +530,61 @@ private void convertStandbyToActive(final StandbyTask 
standbyTask,
     boolean tryToCompleteRestoration(final long now, final 
java.util.function.Consumer<Set<TopicPartition>> offsetResetter) {
         boolean allRunning = true;
 
-        final List<Task> activeTasks = new LinkedList<>();
-        for (final Task task : tasks.allTasks()) {
-            try {
-                if (task.initializeIfNeeded() && stateUpdater != null) {
-                    stateUpdater.add(task);
+        if (stateUpdater == null) {
+            final List<Task> activeTasks = new LinkedList<>();
+            for (final Task task : tasks.allTasks()) {
+                try {
+                    task.initializeIfNeeded();
+                    task.clearTaskTimeout();
+                } catch (final LockException lockException) {
+                    // it is possible that if there are multiple threads 
within the instance that one thread
+                    // trying to grab the task from the other, while the other 
has not released the lock since
+                    // it did not participate in the rebalance. In this case 
we can just retry in the next iteration
+                    log.debug("Could not initialize task {} since: {}; will 
retry", task.id(), lockException.getMessage());
+                    allRunning = false;
+                } catch (final TimeoutException timeoutException) {
+                    task.maybeInitTaskTimeoutOrThrow(now, timeoutException);
+                    allRunning = false;
                 }
-                task.clearTaskTimeout();
-            } catch (final LockException lockException) {
-                // it is possible that if there are multiple threads within 
the instance that one thread
-                // trying to grab the task from the other, while the other has 
not released the lock since
-                // it did not participate in the rebalance. In this case we 
can just retry in the next iteration
-                log.debug("Could not initialize task {} since: {}; will 
retry", task.id(), lockException.getMessage());
-                allRunning = false;
-            } catch (final TimeoutException timeoutException) {
-                task.maybeInitTaskTimeoutOrThrow(now, timeoutException);
-                allRunning = false;
-            }
 
-            if (task.isActive()) {
-                activeTasks.add(task);
+                if (task.isActive()) {
+                    activeTasks.add(task);
+                }
             }
-        }
-
-        if (allRunning && !activeTasks.isEmpty()) {
-
-            final Set<TopicPartition> restored = 
changelogReader.completedChangelogs();
-
-            for (final Task task : activeTasks) {
-                if (restored.containsAll(task.changelogPartitions())) {
-                    try {
-                        task.completeRestoration(offsetResetter);
-                        task.clearTaskTimeout();
-                    } catch (final TimeoutException timeoutException) {
-                        task.maybeInitTaskTimeoutOrThrow(now, 
timeoutException);
-                        log.debug(
-                            String.format(
-                                "Could not complete restoration for %s due to 
the following exception; will retry",
-                                task.id()),
-                            timeoutException
-                        );
 
+            if (allRunning && !activeTasks.isEmpty()) {
+
+                final Set<TopicPartition> restored = 
changelogReader.completedChangelogs();
+
+                for (final Task task : activeTasks) {
+                    if (restored.containsAll(task.changelogPartitions())) {
+                        try {
+                            task.completeRestoration(offsetResetter);
+                            task.clearTaskTimeout();
+                        } catch (final TimeoutException timeoutException) {
+                            task.maybeInitTaskTimeoutOrThrow(now, 
timeoutException);
+                            log.debug(
+                                String.format(
+                                    "Could not complete restoration for %s due 
to the following exception; will retry",
+                                    task.id()),
+                                timeoutException
+                            );
+
+                            allRunning = false;
+                        }
+                    } else {
+                        // we found a restoring task that isn't done 
restoring, which is evidence that
+                        // not all tasks are running
                         allRunning = false;
                     }
-                } else {
-                    // we found a restoring task that isn't done restoring, 
which is evidence that
-                    // not all tasks are running
-                    allRunning = false;
                 }
             }
+        } else {
+            for (final Task task : tasks.drainPendingTaskToRestore()) {
+                stateUpdater.add(task);

Review Comment:
   Ah yes!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to