cadonna commented on code in PR #12439:
URL: https://github.com/apache/kafka/pull/12439#discussion_r931497377


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java:
##########
@@ -233,44 +234,7 @@ public void closeCleanAndRecycleState() {
         closeTaskSensor.record();
         transitionTo(State.CLOSED);
 
-        log.info("Closed clean and recycled state");
-    }
-
-    /**
-     * Create an active task from this standby task without closing and re-initializing the state stores.
-     * The task should have been in suspended state when calling this function
-     *
-     * TODO: we should be able to not need the input partitions as input param in future but always reuse
-     *       the task's input partitions when we have fixed partitions -> tasks mapping
-     */
-    public StreamTask recycle(final Time time,
-                              final ThreadCache cache,
-                              final RecordCollector recordCollector,
-                              final Set<TopicPartition> inputPartitions,
-                              final Consumer<byte[], byte[]> mainConsumer) {
-        if (!inputPartitions.equals(this.inputPartitions)) {
-            log.warn("Detected unmatched input partitions for task {} when recycling it from active to standby", id);
-        }
-
-        stateMgr.transitionTaskType(TaskType.ACTIVE);
-
-        log.debug("Recycling standby task {} to active", id);
-
-        return new StreamTask(
-            id,
-            inputPartitions,
-            topology,
-            mainConsumer,
-            config,
-            streamsMetrics,
-            stateDirectory,
-            cache,
-            time,
-            stateMgr,
-            recordCollector,
-            processorContext,
-            logContext
-        );
+        log.info("Closed and recycled state, and converted type to active");

Review Comment:
   ```suggestion
           log.info("Closed and recycled state");
   ```
   Conversion is now done in `createActiveTaskFromStandby()`.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##########
@@ -2187,7 +2187,7 @@ public void shouldUnregisterMetricsInCloseCleanAndRecycleState() {
 
         task.suspend();
         assertThat(getTaskMetrics(), not(empty()));
-        task.recycleAndConvert();
+        task.prepareRecycle();

Review Comment:
   The following comment is not related to this PR, but I want to mention it 
for awareness. In `StreamTaskTest` and `StandbyTaskTest` we have some gaps in 
testing `prepareRecycle()`. Most notably, we do not verify the state change to 
`CLOSED` and the recording of the `closeTaskSensor`. Verifying this behavior 
is important for safe refactorings.
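
To illustrate the two checks mentioned above, here is a minimal, self-contained sketch. `SketchTask`, its recording counter, and the `check` helper are hypothetical stand-ins and not the Kafka Streams classes or test utilities. A real test in `StreamTaskTest` or `StandbyTaskTest` would assert on the task's actual state and on the real `closeTaskSensor` instead.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class PrepareRecycleSketch {

    // Hypothetical, reduced state enum; the real Task.State has more states.
    enum State { SUSPENDED, CLOSED }

    // Hypothetical stand-in for the task under test.
    static class SketchTask {
        private State state = State.SUSPENDED;
        private final AtomicInteger closeTaskSensorRecordings = new AtomicInteger();

        void prepareRecycle() {
            // Stands in for closeTaskSensor.record()
            closeTaskSensorRecordings.incrementAndGet();
            // Stands in for transitionTo(State.CLOSED)
            state = State.CLOSED;
        }

        State state() {
            return state;
        }

        int closeTaskSensorRecordings() {
            return closeTaskSensorRecordings.get();
        }
    }

    // Small assertion helper so the sketch does not depend on a test framework.
    static void check(final boolean condition, final String message) {
        if (!condition) {
            throw new AssertionError(message);
        }
    }

    public static void main(final String[] args) {
        final SketchTask task = new SketchTask();

        task.prepareRecycle();

        // The two verifications that are currently missing according to the comment above.
        check(task.state() == State.CLOSED, "task should be CLOSED after prepareRecycle()");
        check(task.closeTaskSensorRecordings() == 1, "closeTaskSensor should be recorded exactly once");
    }
}
```

With both assertions in place, a refactoring that drops the state transition or the sensor recording would fail the test instead of passing silently.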



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -528,55 +530,61 @@ private void convertStandbyToActive(final StandbyTask standbyTask,
     boolean tryToCompleteRestoration(final long now, final java.util.function.Consumer<Set<TopicPartition>> offsetResetter) {
         boolean allRunning = true;
 
-        final List<Task> activeTasks = new LinkedList<>();
-        for (final Task task : tasks.allTasks()) {
-            try {
-                if (task.initializeIfNeeded() && stateUpdater != null) {
-                    stateUpdater.add(task);
+        if (stateUpdater == null) {
+            final List<Task> activeTasks = new LinkedList<>();
+            for (final Task task : tasks.allTasks()) {
+                try {
+                    task.initializeIfNeeded();
+                    task.clearTaskTimeout();
+                } catch (final LockException lockException) {
+                    // it is possible that if there are multiple threads within the instance that one thread
+                    // trying to grab the task from the other, while the other has not released the lock since
+                    // it did not participate in the rebalance. In this case we can just retry in the next iteration
+                    log.debug("Could not initialize task {} since: {}; will retry", task.id(), lockException.getMessage());
+                    allRunning = false;
+                } catch (final TimeoutException timeoutException) {
+                    task.maybeInitTaskTimeoutOrThrow(now, timeoutException);
+                    allRunning = false;
                 }
-                task.clearTaskTimeout();
-            } catch (final LockException lockException) {
-                // it is possible that if there are multiple threads within the instance that one thread
-                // trying to grab the task from the other, while the other has not released the lock since
-                // it did not participate in the rebalance. In this case we can just retry in the next iteration
-                log.debug("Could not initialize task {} since: {}; will retry", task.id(), lockException.getMessage());
-                allRunning = false;
-            } catch (final TimeoutException timeoutException) {
-                task.maybeInitTaskTimeoutOrThrow(now, timeoutException);
-                allRunning = false;
-            }
 
-            if (task.isActive()) {
-                activeTasks.add(task);
+                if (task.isActive()) {
+                    activeTasks.add(task);
+                }
             }
-        }
-
-        if (allRunning && !activeTasks.isEmpty()) {
-
-            final Set<TopicPartition> restored = changelogReader.completedChangelogs();
-
-            for (final Task task : activeTasks) {
-                if (restored.containsAll(task.changelogPartitions())) {
-                    try {
-                        task.completeRestoration(offsetResetter);
-                        task.clearTaskTimeout();
-                    } catch (final TimeoutException timeoutException) {
-                        task.maybeInitTaskTimeoutOrThrow(now, timeoutException);
-                        log.debug(
-                            String.format(
-                                "Could not complete restoration for %s due to the following exception; will retry",
-                                task.id()),
-                            timeoutException
-                        );
 
+            if (allRunning && !activeTasks.isEmpty()) {
+
+                final Set<TopicPartition> restored = changelogReader.completedChangelogs();
+
+                for (final Task task : activeTasks) {
+                    if (restored.containsAll(task.changelogPartitions())) {
+                        try {
+                            task.completeRestoration(offsetResetter);
+                            task.clearTaskTimeout();
+                        } catch (final TimeoutException timeoutException) {
+                            task.maybeInitTaskTimeoutOrThrow(now, timeoutException);
+                            log.debug(
+                                String.format(
+                                    "Could not complete restoration for %s due to the following exception; will retry",
+                                    task.id()),
+                                timeoutException
+                            );
+
+                            allRunning = false;
+                        }
+                    } else {
+                        // we found a restoring task that isn't done restoring, which is evidence that
+                        // not all tasks are running
                         allRunning = false;
                     }
-                } else {
-                    // we found a restoring task that isn't done restoring, which is evidence that
-                    // not all tasks are running
-                    allRunning = false;
                 }
             }
+        } else {
+            for (final Task task : tasks.drainPendingTaskToRestore()) {
+                stateUpdater.add(task);

Review Comment:
   We need to remember to initialize the tasks before we add them to the state 
updater.
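
The ordering could be sketched as follows. This is a simplified, self-contained illustration: `SketchTask`, `SketchStateUpdater`, and `handOver` are hypothetical stand-ins, not the Kafka Streams classes, and the assumption that the updater rejects uninitialized tasks is made only to make the ordering visible. In the real code `initializeIfNeeded()` can throw `LockException` or `TimeoutException` (as the removed loop above shows), so the actual change would also need that retry handling, which the sketch omits.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class InitializeBeforeAddSketch {

    // Hypothetical, heavily reduced stand-in for a task.
    static class SketchTask {
        private boolean initialized = false;

        void initializeIfNeeded() {
            initialized = true;
        }

        boolean isInitialized() {
            return initialized;
        }
    }

    // Hypothetical stand-in for the state updater.
    // Assumption for illustration only: it rejects tasks that are not initialized.
    static class SketchStateUpdater {
        private final List<SketchTask> added = new ArrayList<>();

        void add(final SketchTask task) {
            if (!task.isInitialized()) {
                throw new IllegalStateException("Task must be initialized before it is added");
            }
            added.add(task);
        }

        int size() {
            return added.size();
        }
    }

    // Initialize each pending task first, then hand it over to the state updater.
    static void handOver(final List<SketchTask> pendingTasks, final SketchStateUpdater stateUpdater) {
        for (final SketchTask task : pendingTasks) {
            task.initializeIfNeeded();
            stateUpdater.add(task);
        }
    }

    public static void main(final String[] args) {
        final SketchStateUpdater stateUpdater = new SketchStateUpdater();
        final List<SketchTask> pendingTasks = Arrays.asList(new SketchTask(), new SketchTask());

        handOver(pendingTasks, stateUpdater);

        System.out.println("tasks handed over: " + stateUpdater.size());
    }
}
```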



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -480,6 +507,19 @@ private void closeAndRecycleTasks(final Map<Task, Set<TopicPartition>> tasksToRe
         }
     }
 
+    private void convertActiveToStandby(final StreamTask activeTask,
+                                        final Set<TopicPartition> partitions) {
+        final StandbyTask standbyTask = standbyTaskCreator.createStandbyTaskFromActive(activeTask, partitions);
+        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(activeTask.id());

Review Comment:
   Out of curiosity, why do we manage the task producer in the active task 
creator instead of the task itself?



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java:
##########
@@ -51,69 +39,16 @@ public class TasksTest {
     private final static TaskId TASK_1_0 = new TaskId(1, 0);
 
     private final LogContext logContext = new LogContext();
-    private final ActiveTaskCreator activeTaskCreator = mock(ActiveTaskCreator.class);
-    private final StandbyTaskCreator standbyTaskCreator = mock(StandbyTaskCreator.class);
-    private final StateUpdater stateUpdater = mock(StateUpdater.class);
-
-    private Consumer<byte[], byte[]> mainConsumer = null;
-
-    @Test
-    public void shouldCreateTasksWithStateUpdater() {
-        final Tasks tasks = new Tasks(logContext, activeTaskCreator, standbyTaskCreator, stateUpdater);
-        tasks.setMainConsumer(mainConsumer);
-        final StreamTask statefulTask = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).build();
-        final StandbyTask standbyTask = standbyTask(TASK_0_1, mkSet(TOPIC_PARTITION_A_1)).build();
-        final StreamTask statelessTask = statelessTask(TASK_1_0).build();
-        final Map<TaskId, Set<TopicPartition>> activeTasks = mkMap(
-            mkEntry(statefulTask.id(), statefulTask.changelogPartitions()),
-            mkEntry(statelessTask.id(), statelessTask.changelogPartitions())
-        );
-        final Map<TaskId, Set<TopicPartition>> standbyTasks =
-            mkMap(mkEntry(standbyTask.id(), standbyTask.changelogPartitions()));
-        when(activeTaskCreator.createTasks(mainConsumer, activeTasks)).thenReturn(Arrays.asList(statefulTask, statelessTask));
-        when(standbyTaskCreator.createTasks(standbyTasks)).thenReturn(Collections.singletonList(standbyTask));
-
-        tasks.createTasks(activeTasks, standbyTasks);
-
-        final Exception exceptionForStatefulTaskOnTask = assertThrows(IllegalStateException.class, () -> tasks.task(statefulTask.id()));
-        assertEquals("Task unknown: " + statefulTask.id(), exceptionForStatefulTaskOnTask.getMessage());
-        assertFalse(tasks.activeTasks().contains(statefulTask));
-        assertFalse(tasks.allTasks().contains(statefulTask));
-        final Exception exceptionForStatefulTaskOnTasks = assertThrows(IllegalStateException.class, () -> tasks.tasks(mkSet(statefulTask.id())));
-        assertEquals("Task unknown: " + statefulTask.id(), exceptionForStatefulTaskOnTasks.getMessage());
-        final Exception exceptionForStatelessTaskOnTask = assertThrows(IllegalStateException.class, () -> tasks.task(statelessTask.id()));
-        assertEquals("Task unknown: " + statelessTask.id(), exceptionForStatelessTaskOnTask.getMessage());
-        assertFalse(tasks.activeTasks().contains(statelessTask));
-        assertFalse(tasks.allTasks().contains(statelessTask));
-        final Exception exceptionForStatelessTaskOnTasks = assertThrows(IllegalStateException.class, () -> tasks.tasks(mkSet(statelessTask.id())));
-        assertEquals("Task unknown: " + statelessTask.id(), exceptionForStatelessTaskOnTasks.getMessage());
-        final Exception exceptionForStandbyTaskOnTask = assertThrows(IllegalStateException.class, () -> tasks.task(standbyTask.id()));
-        assertEquals("Task unknown: " + standbyTask.id(), exceptionForStandbyTaskOnTask.getMessage());
-        assertFalse(tasks.allTasks().contains(standbyTask));
-        final Exception exceptionForStandByTaskOnTasks = assertThrows(IllegalStateException.class, () -> tasks.tasks(mkSet(standbyTask.id())));
-        assertEquals("Task unknown: " + standbyTask.id(), exceptionForStandByTaskOnTasks.getMessage());
-        verify(activeTaskCreator).createTasks(mainConsumer, activeTasks);
-        verify(standbyTaskCreator).createTasks(standbyTasks);
-        verify(stateUpdater).add(statefulTask);
-    }
 
     @Test
-    public void shouldCreateTasksWithoutStateUpdater() {
-        final Tasks tasks = new Tasks(logContext, activeTaskCreator, standbyTaskCreator, null);
-        tasks.setMainConsumer(mainConsumer);
+    public void shouldCreateTasks() {

Review Comment:
   The names of the tests here and in `StandbyTaskTest` and `StreamTaskTest` no 
longer reflect the tests' content or the renamed methods.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -566,49 +566,9 @@ public void closeCleanAndRecycleState() {
         }
 
         closeTaskSensor.record();
-
         transitionTo(State.CLOSED);
 
-        log.info("Closed clean and recycled state");
-    }
-
-    /**
-     * Create a standby task from this active task without closing and re-initializing the state stores.
-     * The task should have been in suspended state when calling this function
-     *
-     * TODO: we should be able to not need the input partitions as input param in future but always reuse
-     *       the task's input partitions when we have fixed partitions -> tasks mapping
-     */
-    public StandbyTask recycle(final Set<TopicPartition> inputPartitions) {
-        if (state() != Task.State.CLOSED) {
-            throw new IllegalStateException("Attempted to convert an active task that's not closed: " + id);
-        }
-
-        if (!inputPartitions.equals(this.inputPartitions)) {
-            log.warn("Detected unmatched input partitions for task {} when recycling it from active to standby", id);
-        }
-
-        stateMgr.transitionTaskType(TaskType.STANDBY);
-
-        final ThreadCache dummyCache = new ThreadCache(
-            new LogContext(String.format("stream-thread [%s] ", Thread.currentThread().getName())),
-            0,
-            streamsMetrics
-        );
-
-        log.debug("Recycling active task {} to standby", id);
-
-        return new StandbyTask(
-            id,
-            inputPartitions,
-            topology,
-            config,
-            streamsMetrics,
-            stateMgr,
-            stateDirectory,
-            dummyCache,
-            processorContext
-        );
+        log.info("Closed and recycled state, and converted type to standby");

Review Comment:
   ```suggestion
           log.info("Closed and recycled state");
   ```
   Conversion is now done in `createStandbyTaskFromActive()`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
