cadonna commented on code in PR #12743:
URL: https://github.com/apache/kafka/pull/12743#discussion_r994752848


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1481,6 +1481,36 @@ public void shouldTryToLockValidTaskDirsAtRebalanceStart() throws Exception {
         assertThat(taskManager.lockedTaskDirectories(), is(singleton(taskId01)));
     }
 
+    @Test
+    public void shouldPauseAllTopicsWithoutStateUpdaterOnRebalanceComplete() {
+        final Set<TopicPartition> assigned = mkSet(t1p0, t1p1);
+        expect(consumer.assignment()).andReturn(assigned);
+        consumer.pause(assigned);
+        expectLastCall();

Review Comment:
   You do not need to call `expectLastCall()` here. With EasyMock, it is only 
needed when you want to apply something to the last call, e.g., 
`expectLastCall().times(5)`.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1481,6 +1481,36 @@ public void shouldTryToLockValidTaskDirsAtRebalanceStart() throws Exception {
         assertThat(taskManager.lockedTaskDirectories(), is(singleton(taskId01)));
     }
 
+    @Test
+    public void shouldPauseAllTopicsWithoutStateUpdaterOnRebalanceComplete() {
+        final Set<TopicPartition> assigned = mkSet(t1p0, t1p1);
+        expect(consumer.assignment()).andReturn(assigned);
+        consumer.pause(assigned);
+        expectLastCall();
+        replay(consumer);
+        taskManager.handleRebalanceComplete();
+        verify(consumer);
+    }
+
+    @Test
+    public void shouldNotPauseReadyTasksWithStateUpdaterOnRebalanceComplete() {
+        taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, true);
+        final StreamTask statefulTask0 = statefulTask(taskId00, taskId00ChangelogPartitions)
+            .inState(State.RESTORING)

Review Comment:
   If `statefulTask0` is contained in the `TasksRegistry` (i.e., `TaskManager`), 
the returned state should be `RUNNING`. Tasks in state `RESTORING` should only 
be contained in the state updater. This does not change anything in the context 
of this test, but I think it is good to maintain these invariants in test 
setups as well.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -177,6 +177,14 @@ void handleRebalanceComplete() {
         // before then the assignment has not been updated yet.
         mainConsumer.pause(mainConsumer.assignment());
 
+        if (stateUpdater != null) {
+            // All tasks that need restoration are now owned by the state updater.
+            // All tasks that are owned by the task manager are ready and can be resumed immediately.
+            for (final Task t : tasks.allTasks()) {
+                mainConsumer.resume(t.inputPartitions());
+            }
+        }

Review Comment:
   Could we do something like:
   
   ```
   final Set<TopicPartition> partitionsNotToPause = tasks.allTasks().stream()
       .flatMap(task -> task.inputPartitions().stream())
       .collect(Collectors.toSet());
   mainConsumer.pause(mainConsumer.assignment().stream()
       .filter(partition -> !partitionsNotToPause.contains(partition))
       .collect(Collectors.toSet()));
   ```
   or
   
   ```
   final Set<TopicPartition> partitionsNotToPause = tasks.allTasks().stream()
       .flatMap(task -> task.inputPartitions().stream())
       .collect(Collectors.toSet());
   final Set<TopicPartition> partitionsToPause = new HashSet<>(mainConsumer.assignment());
   partitionsToPause.removeAll(partitionsNotToPause);
   mainConsumer.pause(partitionsToPause);
   ```
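
For illustration only, the set-difference idea behind both variants can be tried out in isolation. The sketch below is not Kafka code: it uses plain strings such as `"t1-0"` as stand-ins for `TopicPartition`, and the class name `PartitionsToPauseSketch` and the method `partitionsToPause` are hypothetical names chosen for this example. It assumes Java 9 or later for `Set.of` and `List.of`.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-alone sketch; strings stand in for TopicPartition.
public class PartitionsToPauseSketch {

    // Returns the assigned partitions that are not input partitions of any ready task.
    static Set<String> partitionsToPause(final Set<String> assignment,
                                         final Collection<Set<String>> readyTaskInputPartitions) {
        // Collect the input partitions of all ready tasks.
        final Set<String> partitionsNotToPause = new HashSet<>();
        for (final Set<String> inputPartitions : readyTaskInputPartitions) {
            partitionsNotToPause.addAll(inputPartitions);
        }
        // Copy the assignment so the caller's set is not modified.
        final Set<String> partitionsToPause = new HashSet<>(assignment);
        partitionsToPause.removeAll(partitionsNotToPause);
        return partitionsToPause;
    }

    public static void main(final String[] args) {
        final Set<String> assignment = Set.of("t1-0", "t1-1");
        final List<Set<String>> ready = List.of(Set.of("t1-0"));
        System.out.println(partitionsToPause(assignment, ready)); // prints [t1-1]
    }
}
```

With this shape, the consumer would receive a single `pause` call for the partitions that still need restoration, instead of a `pause` for everything followed by one `resume` per ready task.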



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1481,6 +1481,36 @@ public void shouldTryToLockValidTaskDirsAtRebalanceStart() throws Exception {
         assertThat(taskManager.lockedTaskDirectories(), is(singleton(taskId01)));
     }
 
+    @Test
+    public void shouldPauseAllTopicsWithoutStateUpdaterOnRebalanceComplete() {
+        final Set<TopicPartition> assigned = mkSet(t1p0, t1p1);
+        expect(consumer.assignment()).andReturn(assigned);
+        consumer.pause(assigned);
+        expectLastCall();
+        replay(consumer);
+        taskManager.handleRebalanceComplete();
+        verify(consumer);
+    }
+
+    @Test
+    public void shouldNotPauseReadyTasksWithStateUpdaterOnRebalanceComplete() {
+        taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, true);
+        final StreamTask statefulTask0 = statefulTask(taskId00, taskId00ChangelogPartitions)
+            .inState(State.RESTORING)
+            .withInputPartitions(taskId00Partitions).build();
+        taskManager.addTask(statefulTask0);
+        final Set<TopicPartition> assigned = mkSet(t1p0, t1p1);
+
+        expect(consumer.assignment()).andReturn(assigned);
+        consumer.pause(assigned);
+        expectLastCall();
+        consumer.resume(mkSet(t1p0));
+        expectLastCall();
+        replay(consumer);
+        taskManager.handleRebalanceComplete();

Review Comment:
   nit: Could you add an empty line before and after this line so that we have 
a block for setup, a block for the call under test, and a block for 
verifications? 
   The same comment applies to the test above.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1481,6 +1481,36 @@ public void shouldTryToLockValidTaskDirsAtRebalanceStart() throws Exception {
         assertThat(taskManager.lockedTaskDirectories(), is(singleton(taskId01)));
     }
 
+    @Test
+    public void shouldPauseAllTopicsWithoutStateUpdaterOnRebalanceComplete() {
+        final Set<TopicPartition> assigned = mkSet(t1p0, t1p1);
+        expect(consumer.assignment()).andReturn(assigned);
+        consumer.pause(assigned);
+        expectLastCall();
+        replay(consumer);
+        taskManager.handleRebalanceComplete();
+        verify(consumer);
+    }
+
+    @Test
+    public void shouldNotPauseReadyTasksWithStateUpdaterOnRebalanceComplete() {
+        taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, true);
+        final StreamTask statefulTask0 = statefulTask(taskId00, taskId00ChangelogPartitions)
+            .inState(State.RESTORING)
+            .withInputPartitions(taskId00Partitions).build();
+        taskManager.addTask(statefulTask0);
+        final Set<TopicPartition> assigned = mkSet(t1p0, t1p1);
+
+        expect(consumer.assignment()).andReturn(assigned);
+        consumer.pause(assigned);
+        expectLastCall();
+        consumer.resume(mkSet(t1p0));
+        expectLastCall();

Review Comment:
   See my comment above about `expectLastCall()`.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1481,6 +1481,36 @@ public void shouldTryToLockValidTaskDirsAtRebalanceStart() throws Exception {
         assertThat(taskManager.lockedTaskDirectories(), is(singleton(taskId01)));
     }
 
+    @Test
+    public void shouldPauseAllTopicsWithoutStateUpdaterOnRebalanceComplete() {
+        final Set<TopicPartition> assigned = mkSet(t1p0, t1p1);
+        expect(consumer.assignment()).andReturn(assigned);
+        consumer.pause(assigned);
+        expectLastCall();
+        replay(consumer);
+        taskManager.handleRebalanceComplete();
+        verify(consumer);
+    }
+
+    @Test
+    public void shouldNotPauseReadyTasksWithStateUpdaterOnRebalanceComplete() {
+        taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, true);
+        final StreamTask statefulTask0 = statefulTask(taskId00, taskId00ChangelogPartitions)
+            .inState(State.RESTORING)
+            .withInputPartitions(taskId00Partitions).build();
+        taskManager.addTask(statefulTask0);

Review Comment:
   Here you should use the `TasksRegistry` mock instead, like this:
   ```
   final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
   final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
   when(tasks.allTasks()).thenReturn(mkSet(statefulTask0));
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1481,6 +1481,36 @@ public void shouldTryToLockValidTaskDirsAtRebalanceStart() throws Exception {
         assertThat(taskManager.lockedTaskDirectories(), is(singleton(taskId01)));
     }
 
+    @Test
+    public void shouldPauseAllTopicsWithoutStateUpdaterOnRebalanceComplete() {
+        final Set<TopicPartition> assigned = mkSet(t1p0, t1p1);
+        expect(consumer.assignment()).andReturn(assigned);
+        consumer.pause(assigned);
+        expectLastCall();
+        replay(consumer);
+        taskManager.handleRebalanceComplete();
+        verify(consumer);
+    }
+
+    @Test
+    public void shouldNotPauseReadyTasksWithStateUpdaterOnRebalanceComplete() {
+        taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, true);
+        final StreamTask statefulTask0 = statefulTask(taskId00, taskId00ChangelogPartitions)
+            .inState(State.RESTORING)
+            .withInputPartitions(taskId00Partitions).build();
+        taskManager.addTask(statefulTask0);
+        final Set<TopicPartition> assigned = mkSet(t1p0, t1p1);
+
+        expect(consumer.assignment()).andReturn(assigned);
+        consumer.pause(assigned);
+        expectLastCall();

Review Comment:
   See my comment above about `expectLastCall()`.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1481,6 +1481,36 @@ public void shouldTryToLockValidTaskDirsAtRebalanceStart() throws Exception {
         assertThat(taskManager.lockedTaskDirectories(), is(singleton(taskId01)));
     }
 
+    @Test
+    public void shouldPauseAllTopicsWithoutStateUpdaterOnRebalanceComplete() {
+        final Set<TopicPartition> assigned = mkSet(t1p0, t1p1);
+        expect(consumer.assignment()).andReturn(assigned);
+        consumer.pause(assigned);
+        expectLastCall();
+        replay(consumer);
+        taskManager.handleRebalanceComplete();
+        verify(consumer);
+    }
+
+    @Test
+    public void shouldNotPauseReadyTasksWithStateUpdaterOnRebalanceComplete() {
+        taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, true);
+        final StreamTask statefulTask0 = statefulTask(taskId00, taskId00ChangelogPartitions)
+            .inState(State.RESTORING)
+            .withInputPartitions(taskId00Partitions).build();
+        taskManager.addTask(statefulTask0);
+        final Set<TopicPartition> assigned = mkSet(t1p0, t1p1);
+

Review Comment:
   nit:
   ```suggestion
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
