guozhangwang commented on code in PR #12439:
URL: https://github.com/apache/kafka/pull/12439#discussion_r932696263


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java:
##########
@@ -51,69 +39,16 @@ public class TasksTest {
     private final static TaskId TASK_1_0 = new TaskId(1, 0);
 
     private final LogContext logContext = new LogContext();
-    private final ActiveTaskCreator activeTaskCreator = mock(ActiveTaskCreator.class);
-    private final StandbyTaskCreator standbyTaskCreator = mock(StandbyTaskCreator.class);
-    private final StateUpdater stateUpdater = mock(StateUpdater.class);
-
-    private Consumer<byte[], byte[]> mainConsumer = null;
-
-    @Test
-    public void shouldCreateTasksWithStateUpdater() {
-        final Tasks tasks = new Tasks(logContext, activeTaskCreator, standbyTaskCreator, stateUpdater);
-        tasks.setMainConsumer(mainConsumer);
-        final StreamTask statefulTask = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).build();
-        final StandbyTask standbyTask = standbyTask(TASK_0_1, mkSet(TOPIC_PARTITION_A_1)).build();
-        final StreamTask statelessTask = statelessTask(TASK_1_0).build();
-        final Map<TaskId, Set<TopicPartition>> activeTasks = mkMap(
-            mkEntry(statefulTask.id(), statefulTask.changelogPartitions()),
-            mkEntry(statelessTask.id(), statelessTask.changelogPartitions())
-        );
-        final Map<TaskId, Set<TopicPartition>> standbyTasks =
-            mkMap(mkEntry(standbyTask.id(), standbyTask.changelogPartitions()));
-        when(activeTaskCreator.createTasks(mainConsumer, activeTasks)).thenReturn(Arrays.asList(statefulTask, statelessTask));
-        when(standbyTaskCreator.createTasks(standbyTasks)).thenReturn(Collections.singletonList(standbyTask));
-
-        tasks.createTasks(activeTasks, standbyTasks);
-
-        final Exception exceptionForStatefulTaskOnTask = assertThrows(IllegalStateException.class, () -> tasks.task(statefulTask.id()));
-        assertEquals("Task unknown: " + statefulTask.id(), exceptionForStatefulTaskOnTask.getMessage());
-        assertFalse(tasks.activeTasks().contains(statefulTask));
-        assertFalse(tasks.allTasks().contains(statefulTask));
-        final Exception exceptionForStatefulTaskOnTasks = assertThrows(IllegalStateException.class, () -> tasks.tasks(mkSet(statefulTask.id())));
-        assertEquals("Task unknown: " + statefulTask.id(), exceptionForStatefulTaskOnTasks.getMessage());
-        final Exception exceptionForStatelessTaskOnTask = assertThrows(IllegalStateException.class, () -> tasks.task(statelessTask.id()));
-        assertEquals("Task unknown: " + statelessTask.id(), exceptionForStatelessTaskOnTask.getMessage());
-        assertFalse(tasks.activeTasks().contains(statelessTask));
-        assertFalse(tasks.allTasks().contains(statelessTask));
-        final Exception exceptionForStatelessTaskOnTasks = assertThrows(IllegalStateException.class, () -> tasks.tasks(mkSet(statelessTask.id())));
-        assertEquals("Task unknown: " + statelessTask.id(), exceptionForStatelessTaskOnTasks.getMessage());
-        final Exception exceptionForStandbyTaskOnTask = assertThrows(IllegalStateException.class, () -> tasks.task(standbyTask.id()));
-        assertEquals("Task unknown: " + standbyTask.id(), exceptionForStandbyTaskOnTask.getMessage());
-        assertFalse(tasks.allTasks().contains(standbyTask));
-        final Exception exceptionForStandByTaskOnTasks = assertThrows(IllegalStateException.class, () -> tasks.tasks(mkSet(standbyTask.id())));
-        assertEquals("Task unknown: " + standbyTask.id(), exceptionForStandByTaskOnTasks.getMessage());
-        verify(activeTaskCreator).createTasks(mainConsumer, activeTasks);
-        verify(standbyTaskCreator).createTasks(standbyTasks);
-        verify(stateUpdater).add(statefulTask);
-    }
 
     @Test
-    public void shouldCreateTasksWithoutStateUpdater() {
-        final Tasks tasks = new Tasks(logContext, activeTaskCreator, standbyTaskCreator, null);
-        tasks.setMainConsumer(mainConsumer);
+    public void shouldCreateTasks() {

Review Comment:
   ack.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##########
@@ -2187,7 +2187,7 @@ public void shouldUnregisterMetricsInCloseCleanAndRecycleState() {
 
         task.suspend();
         assertThat(getTaskMetrics(), not(empty()));
-        task.recycleAndConvert();
+        task.prepareRecycle();

Review Comment:
   ack.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to