cadonna commented on code in PR #12743:
URL: https://github.com/apache/kafka/pull/12743#discussion_r994752848
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1481,6 +1481,36 @@ public void
shouldTryToLockValidTaskDirsAtRebalanceStart() throws Exception {
assertThat(taskManager.lockedTaskDirectories(),
is(singleton(taskId01)));
}
+ @Test
+ public void shouldPauseAllTopicsWithoutStateUpdaterOnRebalanceComplete() {
+ final Set<TopicPartition> assigned = mkSet(t1p0, t1p1);
+ expect(consumer.assignment()).andReturn(assigned);
+ consumer.pause(assigned);
+ expectLastCall();
Review Comment:
You do not need to call `expectLastCall()` with EasyMock. It is only needed
when you use it as follows `expectLastCall().times(5)`, i.e., when you need to
apply something on the last call.
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1481,6 +1481,36 @@ public void
shouldTryToLockValidTaskDirsAtRebalanceStart() throws Exception {
assertThat(taskManager.lockedTaskDirectories(),
is(singleton(taskId01)));
}
+ @Test
+ public void shouldPauseAllTopicsWithoutStateUpdaterOnRebalanceComplete() {
+ final Set<TopicPartition> assigned = mkSet(t1p0, t1p1);
+ expect(consumer.assignment()).andReturn(assigned);
+ consumer.pause(assigned);
+ expectLastCall();
+ replay(consumer);
+ taskManager.handleRebalanceComplete();
+ verify(consumer);
+ }
+
+ @Test
+ public void shouldNotPauseReadyTasksWithStateUpdaterOnRebalanceComplete() {
+ taskManager =
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, true);
+ final StreamTask statefulTask0 = statefulTask(taskId00,
taskId00ChangelogPartitions)
+ .inState(State.RESTORING)
Review Comment:
If `statefulTask0` is contained in the `TasksRegistry` (i.e., `TaskManager`),
the returned state should be `RUNNING`. Tasks in state `RESTORING` should only
be contained in the state updater. This does not change anything in the context
of this test, but I think it is a good thing to ensure these invariants also in
test setups.
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -177,6 +177,14 @@ void handleRebalanceComplete() {
// before then the assignment has not been updated yet.
mainConsumer.pause(mainConsumer.assignment());
+ if (stateUpdater != null) {
+ // All tasks that need restoration are now owned by the state
updater.
+ // All tasks that are owned by the task manager are ready and can
be resumed immediately.
+ for (final Task t : tasks.allTasks()) {
+ mainConsumer.resume(t.inputPartitions());
+ }
+ }
Review Comment:
Could we do something like:
```
final Set<TopicPartition> partitionsNotToPause =
tasks.allTasks().stream().flatMap(task ->
task.inputPartitions().stream()).collect(Collectors.toSet());
mainConsumer.pause(mainConsumer.assignment().stream().filter(partition ->
!partitionsNotToPause.contains(partition)).collect(Collectors.toSet()));
```
or
```
final Set<TopicPartition> partitionsNotToPause =
tasks.allTasks().stream().flatMap(task ->
task.inputPartitions().stream()).collect(Collectors.toSet());
final Set<TopicPartition> partitionsToPause = new
HashSet<>(mainConsumer.assignment());
partitionsToPause.removeAll(partitionsNotToPause);
mainConsumer.pause(partitionsToPause);
```
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1481,6 +1481,36 @@ public void
shouldTryToLockValidTaskDirsAtRebalanceStart() throws Exception {
assertThat(taskManager.lockedTaskDirectories(),
is(singleton(taskId01)));
}
+ @Test
+ public void shouldPauseAllTopicsWithoutStateUpdaterOnRebalanceComplete() {
+ final Set<TopicPartition> assigned = mkSet(t1p0, t1p1);
+ expect(consumer.assignment()).andReturn(assigned);
+ consumer.pause(assigned);
+ expectLastCall();
+ replay(consumer);
+ taskManager.handleRebalanceComplete();
+ verify(consumer);
+ }
+
+ @Test
+ public void shouldNotPauseReadyTasksWithStateUpdaterOnRebalanceComplete() {
+ taskManager =
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, true);
+ final StreamTask statefulTask0 = statefulTask(taskId00,
taskId00ChangelogPartitions)
+ .inState(State.RESTORING)
+ .withInputPartitions(taskId00Partitions).build();
+ taskManager.addTask(statefulTask0);
+ final Set<TopicPartition> assigned = mkSet(t1p0, t1p1);
+
+ expect(consumer.assignment()).andReturn(assigned);
+ consumer.pause(assigned);
+ expectLastCall();
+ consumer.resume(mkSet(t1p0));
+ expectLastCall();
+ replay(consumer);
+ taskManager.handleRebalanceComplete();
Review Comment:
nit: Could you add an empty line before and after this line so that we have
a block for setup, a block for the call under test, and a block for
verifications?
The same comment applies to the test above.
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1481,6 +1481,36 @@ public void
shouldTryToLockValidTaskDirsAtRebalanceStart() throws Exception {
assertThat(taskManager.lockedTaskDirectories(),
is(singleton(taskId01)));
}
+ @Test
+ public void shouldPauseAllTopicsWithoutStateUpdaterOnRebalanceComplete() {
+ final Set<TopicPartition> assigned = mkSet(t1p0, t1p1);
+ expect(consumer.assignment()).andReturn(assigned);
+ consumer.pause(assigned);
+ expectLastCall();
+ replay(consumer);
+ taskManager.handleRebalanceComplete();
+ verify(consumer);
+ }
+
+ @Test
+ public void shouldNotPauseReadyTasksWithStateUpdaterOnRebalanceComplete() {
+ taskManager =
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, true);
+ final StreamTask statefulTask0 = statefulTask(taskId00,
taskId00ChangelogPartitions)
+ .inState(State.RESTORING)
+ .withInputPartitions(taskId00Partitions).build();
+ taskManager.addTask(statefulTask0);
+ final Set<TopicPartition> assigned = mkSet(t1p0, t1p1);
+
+ expect(consumer.assignment()).andReturn(assigned);
+ consumer.pause(assigned);
+ expectLastCall();
+ consumer.resume(mkSet(t1p0));
+ expectLastCall();
Review Comment:
See my comment above about `expectLastCall()` not being needed here.
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1481,6 +1481,36 @@ public void
shouldTryToLockValidTaskDirsAtRebalanceStart() throws Exception {
assertThat(taskManager.lockedTaskDirectories(),
is(singleton(taskId01)));
}
+ @Test
+ public void shouldPauseAllTopicsWithoutStateUpdaterOnRebalanceComplete() {
+ final Set<TopicPartition> assigned = mkSet(t1p0, t1p1);
+ expect(consumer.assignment()).andReturn(assigned);
+ consumer.pause(assigned);
+ expectLastCall();
+ replay(consumer);
+ taskManager.handleRebalanceComplete();
+ verify(consumer);
+ }
+
+ @Test
+ public void shouldNotPauseReadyTasksWithStateUpdaterOnRebalanceComplete() {
+ taskManager =
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, true);
+ final StreamTask statefulTask0 = statefulTask(taskId00,
taskId00ChangelogPartitions)
+ .inState(State.RESTORING)
+ .withInputPartitions(taskId00Partitions).build();
+ taskManager.addTask(statefulTask0);
Review Comment:
Here you should use the `TasksRegistry` mock instead, like this:
```
final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(tasks.allTasks()).thenReturn(mkSet(statefulTask0));
```
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1481,6 +1481,36 @@ public void
shouldTryToLockValidTaskDirsAtRebalanceStart() throws Exception {
assertThat(taskManager.lockedTaskDirectories(),
is(singleton(taskId01)));
}
+ @Test
+ public void shouldPauseAllTopicsWithoutStateUpdaterOnRebalanceComplete() {
+ final Set<TopicPartition> assigned = mkSet(t1p0, t1p1);
+ expect(consumer.assignment()).andReturn(assigned);
+ consumer.pause(assigned);
+ expectLastCall();
+ replay(consumer);
+ taskManager.handleRebalanceComplete();
+ verify(consumer);
+ }
+
+ @Test
+ public void shouldNotPauseReadyTasksWithStateUpdaterOnRebalanceComplete() {
+ taskManager =
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, true);
+ final StreamTask statefulTask0 = statefulTask(taskId00,
taskId00ChangelogPartitions)
+ .inState(State.RESTORING)
+ .withInputPartitions(taskId00Partitions).build();
+ taskManager.addTask(statefulTask0);
+ final Set<TopicPartition> assigned = mkSet(t1p0, t1p1);
+
+ expect(consumer.assignment()).andReturn(assigned);
+ consumer.pause(assigned);
+ expectLastCall();
Review Comment:
See my comment above about `expectLastCall()` not being needed here.
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1481,6 +1481,36 @@ public void
shouldTryToLockValidTaskDirsAtRebalanceStart() throws Exception {
assertThat(taskManager.lockedTaskDirectories(),
is(singleton(taskId01)));
}
+ @Test
+ public void shouldPauseAllTopicsWithoutStateUpdaterOnRebalanceComplete() {
+ final Set<TopicPartition> assigned = mkSet(t1p0, t1p1);
+ expect(consumer.assignment()).andReturn(assigned);
+ consumer.pause(assigned);
+ expectLastCall();
+ replay(consumer);
+ taskManager.handleRebalanceComplete();
+ verify(consumer);
+ }
+
+ @Test
+ public void shouldNotPauseReadyTasksWithStateUpdaterOnRebalanceComplete() {
+ taskManager =
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, true);
+ final StreamTask statefulTask0 = statefulTask(taskId00,
taskId00ChangelogPartitions)
+ .inState(State.RESTORING)
+ .withInputPartitions(taskId00Partitions).build();
+ taskManager.addTask(statefulTask0);
+ final Set<TopicPartition> assigned = mkSet(t1p0, t1p1);
+
Review Comment:
nit:
```suggestion
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]