cadonna commented on code in PR #12439:
URL: https://github.com/apache/kafka/pull/12439#discussion_r931497377
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java:
##########
@@ -233,44 +234,7 @@ public void closeCleanAndRecycleState() {
         closeTaskSensor.record();
         transitionTo(State.CLOSED);
-        log.info("Closed clean and recycled state");
-    }
-
-    /**
-     * Create an active task from this standby task without closing and re-initializing the state stores.
-     * The task should have been in suspended state when calling this function
-     *
-     * TODO: we should be able to not need the input partitions as input param in future but always reuse
-     *       the task's input partitions when we have fixed partitions -> tasks mapping
-     */
-    public StreamTask recycle(final Time time,
-                              final ThreadCache cache,
-                              final RecordCollector recordCollector,
-                              final Set<TopicPartition> inputPartitions,
-                              final Consumer<byte[], byte[]> mainConsumer) {
-        if (!inputPartitions.equals(this.inputPartitions)) {
-            log.warn("Detected unmatched input partitions for task {} when recycling it from active to standby", id);
-        }
-
-        stateMgr.transitionTaskType(TaskType.ACTIVE);
-
-        log.debug("Recycling standby task {} to active", id);
-
-        return new StreamTask(
-            id,
-            inputPartitions,
-            topology,
-            mainConsumer,
-            config,
-            streamsMetrics,
-            stateDirectory,
-            cache,
-            time,
-            stateMgr,
-            recordCollector,
-            processorContext,
-            logContext
-        );
+        log.info("Closed and recycled state, and converted type to active");

Review Comment:
   ```suggestion
           log.info("Closed and recycled state");
   ```
   Conversion is now done in `createActiveTaskFromStandby()`.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##########
@@ -2187,7 +2187,7 @@ public void shouldUnregisterMetricsInCloseCleanAndRecycleState() {
         task.suspend();
         assertThat(getTaskMetrics(), not(empty()));
-        task.recycleAndConvert();
+        task.prepareRecycle();

Review Comment:
   The following comment is not related to this PR, but I want to mention it nevertheless for awareness. In `StreamTaskTest` and `StandbyTaskTest` we have some gaps in testing `prepareRecycle()`. Most notably, we do not verify the transition to state `CLOSED` or the recording of the `closeTaskSensor`. However, verifying this behavior is important for refactorings.
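   To make the gap concrete, a rough sketch of the kind of check that is missing is shown below. The `createSuspendedStatefulTask()` helper is a placeholder for however these test classes already build a task, and the sensor verification is only outlined, since the exact metric name and tags are version-specific.

   ```java
   import static org.junit.Assert.assertEquals;

   import org.apache.kafka.streams.processor.internals.StreamTask;
   import org.apache.kafka.streams.processor.internals.Task;
   import org.junit.Test;

   public class PrepareRecycleGapSketchTest {

       @Test
       public void shouldTransitionToClosedOnPrepareRecycle() {
           // Placeholder: build and suspend the task the same way StreamTaskTest does today.
           final StreamTask task = createSuspendedStatefulTask();

           task.prepareRecycle();

           // Missing check 1: prepareRecycle() should leave the task in CLOSED state.
           assertEquals(Task.State.CLOSED, task.state());

           // Missing check 2 (outline only): verify that the close-task sensor was recorded,
           // e.g. by looking up the corresponding metric in the registered Streams metrics.
       }

       private StreamTask createSuspendedStatefulTask() {
           // Hypothetical helper; the real test would reuse the existing task-creation fixture.
           throw new UnsupportedOperationException("fixture-specific");
       }
   }
   ```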
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -528,55 +530,61 @@ private void convertStandbyToActive(final StandbyTask standbyTask,
     boolean tryToCompleteRestoration(final long now,
                                      final java.util.function.Consumer<Set<TopicPartition>> offsetResetter) {
         boolean allRunning = true;
-        final List<Task> activeTasks = new LinkedList<>();
-        for (final Task task : tasks.allTasks()) {
-            try {
-                if (task.initializeIfNeeded() && stateUpdater != null) {
-                    stateUpdater.add(task);
+        if (stateUpdater == null) {
+            final List<Task> activeTasks = new LinkedList<>();
+            for (final Task task : tasks.allTasks()) {
+                try {
+                    task.initializeIfNeeded();
+                    task.clearTaskTimeout();
+                } catch (final LockException lockException) {
+                    // it is possible that if there are multiple threads within the instance that one thread
+                    // trying to grab the task from the other, while the other has not released the lock since
+                    // it did not participate in the rebalance. In this case we can just retry in the next iteration
+                    log.debug("Could not initialize task {} since: {}; will retry", task.id(), lockException.getMessage());
+                    allRunning = false;
+                } catch (final TimeoutException timeoutException) {
+                    task.maybeInitTaskTimeoutOrThrow(now, timeoutException);
+                    allRunning = false;
                 }
-                task.clearTaskTimeout();
-            } catch (final LockException lockException) {
-                // it is possible that if there are multiple threads within the instance that one thread
-                // trying to grab the task from the other, while the other has not released the lock since
-                // it did not participate in the rebalance. In this case we can just retry in the next iteration
-                log.debug("Could not initialize task {} since: {}; will retry", task.id(), lockException.getMessage());
-                allRunning = false;
-            } catch (final TimeoutException timeoutException) {
-                task.maybeInitTaskTimeoutOrThrow(now, timeoutException);
-                allRunning = false;
-            }
-            if (task.isActive()) {
-                activeTasks.add(task);
+                if (task.isActive()) {
+                    activeTasks.add(task);
+                }
             }
-        }
-
-        if (allRunning && !activeTasks.isEmpty()) {
-
-            final Set<TopicPartition> restored = changelogReader.completedChangelogs();
-
-            for (final Task task : activeTasks) {
-                if (restored.containsAll(task.changelogPartitions())) {
-                    try {
-                        task.completeRestoration(offsetResetter);
-                        task.clearTaskTimeout();
-                    } catch (final TimeoutException timeoutException) {
-                        task.maybeInitTaskTimeoutOrThrow(now, timeoutException);
-                        log.debug(
-                            String.format(
-                                "Could not complete restoration for %s due to the following exception; will retry",
-                                task.id()),
-                            timeoutException
-                        );
+            if (allRunning && !activeTasks.isEmpty()) {
+
+                final Set<TopicPartition> restored = changelogReader.completedChangelogs();
+
+                for (final Task task : activeTasks) {
+                    if (restored.containsAll(task.changelogPartitions())) {
+                        try {
+                            task.completeRestoration(offsetResetter);
+                            task.clearTaskTimeout();
+                        } catch (final TimeoutException timeoutException) {
+                            task.maybeInitTaskTimeoutOrThrow(now, timeoutException);
+                            log.debug(
+                                String.format(
+                                    "Could not complete restoration for %s due to the following exception; will retry",
+                                    task.id()),
+                                timeoutException
+                            );
+
+                            allRunning = false;
+                        }
+                    } else {
+                        // we found a restoring task that isn't done restoring, which is evidence that
+                        // not all tasks are running
                         allRunning = false;
                     }
-                } else {
-                    // we found a restoring task that isn't done restoring, which is evidence that
-                    // not all tasks are running
-                    allRunning = false;
                 }
             }
+        } else {
+            for (final Task task : tasks.drainPendingTaskToRestore()) {
+                stateUpdater.add(task);

Review Comment:
   We need to remember to initialize the tasks before we add them to the state updater.
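   As a rough illustration, the `else` branch could look something like the fragment below; this is a sketch only, and the `LockException`/`TimeoutException` handling from the `stateUpdater == null` branch would still need to be applied here as well.

   ```java
           } else {
               for (final Task task : tasks.drainPendingTaskToRestore()) {
                   // Initialize before handing the task over, mirroring the non-state-updater path.
                   task.initializeIfNeeded();
                   stateUpdater.add(task);
               }
           }
   ```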
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -480,6 +507,19 @@ private void closeAndRecycleTasks(final Map<Task, Set<TopicPartition>> tasksToRe
         }
     }
+    private void convertActiveToStandby(final StreamTask activeTask,
+                                        final Set<TopicPartition> partitions) {
+        final StandbyTask standbyTask = standbyTaskCreator.createStandbyTaskFromActive(activeTask, partitions);
+        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(activeTask.id());

Review Comment:
   Out of curiosity, why do we manage the task producer in the active task creator instead of the task itself?



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java:
##########
@@ -51,69 +39,16 @@ public class TasksTest {
     private final static TaskId TASK_1_0 = new TaskId(1, 0);
     private final LogContext logContext = new LogContext();
-    private final ActiveTaskCreator activeTaskCreator = mock(ActiveTaskCreator.class);
-    private final StandbyTaskCreator standbyTaskCreator = mock(StandbyTaskCreator.class);
-    private final StateUpdater stateUpdater = mock(StateUpdater.class);
-
-    private Consumer<byte[], byte[]> mainConsumer = null;
-
-    @Test
-    public void shouldCreateTasksWithStateUpdater() {
-        final Tasks tasks = new Tasks(logContext, activeTaskCreator, standbyTaskCreator, stateUpdater);
-        tasks.setMainConsumer(mainConsumer);
-        final StreamTask statefulTask = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).build();
-        final StandbyTask standbyTask = standbyTask(TASK_0_1, mkSet(TOPIC_PARTITION_A_1)).build();
-        final StreamTask statelessTask = statelessTask(TASK_1_0).build();
-        final Map<TaskId, Set<TopicPartition>> activeTasks = mkMap(
-            mkEntry(statefulTask.id(), statefulTask.changelogPartitions()),
-            mkEntry(statelessTask.id(), statelessTask.changelogPartitions())
-        );
-        final Map<TaskId, Set<TopicPartition>> standbyTasks =
-            mkMap(mkEntry(standbyTask.id(), standbyTask.changelogPartitions()));
-        when(activeTaskCreator.createTasks(mainConsumer, activeTasks)).thenReturn(Arrays.asList(statefulTask, statelessTask));
-        when(standbyTaskCreator.createTasks(standbyTasks)).thenReturn(Collections.singletonList(standbyTask));
-
-        tasks.createTasks(activeTasks, standbyTasks);
-
-        final Exception exceptionForStatefulTaskOnTask = assertThrows(IllegalStateException.class, () -> tasks.task(statefulTask.id()));
-        assertEquals("Task unknown: " + statefulTask.id(), exceptionForStatefulTaskOnTask.getMessage());
-        assertFalse(tasks.activeTasks().contains(statefulTask));
-        assertFalse(tasks.allTasks().contains(statefulTask));
-        final Exception exceptionForStatefulTaskOnTasks = assertThrows(IllegalStateException.class, () -> tasks.tasks(mkSet(statefulTask.id())));
-        assertEquals("Task unknown: " + statefulTask.id(), exceptionForStatefulTaskOnTasks.getMessage());
-        final Exception exceptionForStatelessTaskOnTask = assertThrows(IllegalStateException.class, () -> tasks.task(statelessTask.id()));
-        assertEquals("Task unknown: " + statelessTask.id(), exceptionForStatelessTaskOnTask.getMessage());
-        assertFalse(tasks.activeTasks().contains(statelessTask));
-        assertFalse(tasks.allTasks().contains(statelessTask));
-        final Exception exceptionForStatelessTaskOnTasks = assertThrows(IllegalStateException.class, () -> tasks.tasks(mkSet(statelessTask.id())));
-        assertEquals("Task unknown: " + statelessTask.id(), exceptionForStatelessTaskOnTasks.getMessage());
-        final Exception exceptionForStandbyTaskOnTask = assertThrows(IllegalStateException.class, () -> tasks.task(standbyTask.id()));
-        assertEquals("Task unknown: " + standbyTask.id(), exceptionForStandbyTaskOnTask.getMessage());
-        assertFalse(tasks.allTasks().contains(standbyTask));
-        final Exception exceptionForStandByTaskOnTasks = assertThrows(IllegalStateException.class, () -> tasks.tasks(mkSet(standbyTask.id())));
-        assertEquals("Task unknown: " + standbyTask.id(), exceptionForStandByTaskOnTasks.getMessage());
-        verify(activeTaskCreator).createTasks(mainConsumer, activeTasks);
-        verify(standbyTaskCreator).createTasks(standbyTasks);
-        verify(stateUpdater).add(statefulTask);
-    }
     @Test
-    public void shouldCreateTasksWithoutStateUpdater() {
-        final Tasks tasks = new Tasks(logContext, activeTaskCreator, standbyTaskCreator, null);
-        tasks.setMainConsumer(mainConsumer);
+    public void shouldCreateTasks() {

Review Comment:
   The names of the tests here and in `StandbyTaskTest` and `StreamTaskTest` no longer reflect the content/renamed methods.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -566,49 +566,9 @@ public void closeCleanAndRecycleState() {
         }
         closeTaskSensor.record();
-        transitionTo(State.CLOSED);
-        log.info("Closed clean and recycled state");
-    }
-
-    /**
-     * Create a standby task from this active task without closing and re-initializing the state stores.
-     * The task should have been in suspended state when calling this function
-     *
-     * TODO: we should be able to not need the input partitions as input param in future but always reuse
-     *       the task's input partitions when we have fixed partitions -> tasks mapping
-     */
-    public StandbyTask recycle(final Set<TopicPartition> inputPartitions) {
-        if (state() != Task.State.CLOSED) {
-            throw new IllegalStateException("Attempted to convert an active task that's not closed: " + id);
-        }
-
-        if (!inputPartitions.equals(this.inputPartitions)) {
-            log.warn("Detected unmatched input partitions for task {} when recycling it from active to standby", id);
-        }
-
-        stateMgr.transitionTaskType(TaskType.STANDBY);
-
-        final ThreadCache dummyCache = new ThreadCache(
-            new LogContext(String.format("stream-thread [%s] ", Thread.currentThread().getName())),
-            0,
-            streamsMetrics
-        );
-
-        log.debug("Recycling active task {} to standby", id);
-
-        return new StandbyTask(
-            id,
-            inputPartitions,
-            topology,
-            config,
-            streamsMetrics,
-            stateMgr,
-            stateDirectory,
-            dummyCache,
-            processorContext
-        );
+        log.info("Closed and recycled state, and converted type to standby");

Review Comment:
   ```suggestion
           log.info("Closed and recycled state");
   ```
   Conversion is now done in `createStandbyTaskFromActive()`.