This is an automated email from the ASF dual-hosted git repository.
cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 560ab2cc319 KAFKA-14133: Migrate ActiveTaskCreator mock in
TaskManagerTest to Mockito (#13681)
560ab2cc319 is described below
commit 560ab2cc319899d7ce1181408507061e90752d3e
Author: Mehari Beyene <[email protected]>
AuthorDate: Wed May 31 23:47:31 2023 -0700
KAFKA-14133: Migrate ActiveTaskCreator mock in TaskManagerTest to Mockito
(#13681)
This pull request migrates the ActiveTaskCreator mock in TaskManagerTest
from EasyMock to Mockito.
The change is restricted to a single mock to minimize the scope and make it
easier to review.
Please see two examples that follow the same pattern below:
#13529
#13621
Reviewers: Divij Vaidya <[email protected]>, Manyanda Chitimbo
<[email protected]>, Christo Lolov <[email protected]>, Bruno Cadonna
<[email protected]>
---
.../processor/internals/TaskManagerTest.java | 613 ++++++++++-----------
1 file changed, 276 insertions(+), 337 deletions(-)
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 48d046e1b90..3f0023971a5 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -68,6 +68,7 @@ import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
+import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import java.io.File;
@@ -128,9 +129,11 @@ import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.mock;
+@MockitoSettings(strictness = Strictness.STRICT_STUBS)
@RunWith(EasyMockRunner.class)
public class TaskManagerTest {
@@ -188,7 +191,7 @@ public class TaskManagerTest {
private ChangelogReader changeLogReader;
@Mock(type = MockType.STRICT)
private Consumer<byte[], byte[]> consumer;
- @Mock(type = MockType.STRICT)
+ @org.mockito.Mock
private ActiveTaskCreator activeTaskCreator;
@Mock(type = MockType.NICE)
private StandbyTaskCreator standbyTaskCreator;
@@ -291,16 +294,16 @@ public class TaskManagerTest {
final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(stateUpdater.getTasks()).thenReturn(mkSet(activeTaskToRecycle));
- expect(activeTaskCreator.createTasks(consumer,
Collections.emptyMap())).andReturn(emptySet());
expect(standbyTaskCreator.createTasks(Collections.emptyMap())).andReturn(emptySet());
- replay(activeTaskCreator, standbyTaskCreator);
+ replay(standbyTaskCreator);
taskManager.handleAssignment(
Collections.emptyMap(),
mkMap(mkEntry(activeTaskToRecycle.id(),
activeTaskToRecycle.inputPartitions()))
);
- verify(activeTaskCreator, standbyTaskCreator);
+ verify(standbyTaskCreator);
+ Mockito.verify(activeTaskCreator).createTasks(consumer,
Collections.emptyMap());
Mockito.verify(tasks).addPendingTaskToRecycle(activeTaskToRecycle.id(),
activeTaskToRecycle.inputPartitions());
}
@@ -312,16 +315,16 @@ public class TaskManagerTest {
final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(stateUpdater.getTasks()).thenReturn(mkSet(standbyTaskToRecycle));
- expect(activeTaskCreator.createTasks(consumer,
Collections.emptyMap())).andReturn(emptySet());
expect(standbyTaskCreator.createTasks(Collections.emptyMap())).andReturn(emptySet());
- replay(activeTaskCreator, standbyTaskCreator);
+ replay(standbyTaskCreator);
taskManager.handleAssignment(
mkMap(mkEntry(standbyTaskToRecycle.id(),
standbyTaskToRecycle.inputPartitions())),
Collections.emptyMap()
);
- verify(activeTaskCreator, standbyTaskCreator);
+ verify(standbyTaskCreator);
+ Mockito.verify(activeTaskCreator).createTasks(consumer,
Collections.emptyMap());
Mockito.verify(stateUpdater).remove(standbyTaskToRecycle.id());
Mockito.verify(tasks).addPendingTaskToRecycle(standbyTaskToRecycle.id(),
standbyTaskToRecycle.inputPartitions());
}
@@ -334,13 +337,13 @@ public class TaskManagerTest {
final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(stateUpdater.getTasks()).thenReturn(mkSet(activeTaskToClose));
- expect(activeTaskCreator.createTasks(consumer,
Collections.emptyMap())).andReturn(emptySet());
expect(standbyTaskCreator.createTasks(Collections.emptyMap())).andReturn(emptySet());
- replay(activeTaskCreator, standbyTaskCreator);
+ replay(standbyTaskCreator);
taskManager.handleAssignment(Collections.emptyMap(),
Collections.emptyMap());
- verify(activeTaskCreator, standbyTaskCreator);
+ verify(standbyTaskCreator);
+ Mockito.verify(activeTaskCreator).createTasks(consumer,
Collections.emptyMap());
Mockito.verify(stateUpdater).remove(activeTaskToClose.id());
Mockito.verify(tasks).addPendingTaskToCloseClean(activeTaskToClose.id());
}
@@ -353,13 +356,13 @@ public class TaskManagerTest {
final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(stateUpdater.getTasks()).thenReturn(mkSet(standbyTaskToClose));
- expect(activeTaskCreator.createTasks(consumer,
Collections.emptyMap())).andReturn(emptySet());
expect(standbyTaskCreator.createTasks(Collections.emptyMap())).andReturn(emptySet());
- replay(activeTaskCreator, standbyTaskCreator);
+ replay(standbyTaskCreator);
taskManager.handleAssignment(Collections.emptyMap(),
Collections.emptyMap());
- verify(activeTaskCreator, standbyTaskCreator);
+ verify(standbyTaskCreator);
+ Mockito.verify(activeTaskCreator).createTasks(consumer,
Collections.emptyMap());
Mockito.verify(stateUpdater).remove(standbyTaskToClose.id());
Mockito.verify(tasks).addPendingTaskToCloseClean(standbyTaskToClose.id());
}
@@ -373,16 +376,16 @@ public class TaskManagerTest {
final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(stateUpdater.getTasks()).thenReturn(mkSet(activeTaskToUpdateInputPartitions));
- expect(activeTaskCreator.createTasks(consumer,
Collections.emptyMap())).andReturn(emptySet());
expect(standbyTaskCreator.createTasks(Collections.emptyMap())).andReturn(emptySet());
- replay(activeTaskCreator, standbyTaskCreator);
+ replay(standbyTaskCreator);
taskManager.handleAssignment(
mkMap(mkEntry(activeTaskToUpdateInputPartitions.id(),
newInputPartitions)),
Collections.emptyMap()
);
- verify(activeTaskCreator, standbyTaskCreator);
+ verify(standbyTaskCreator);
+ Mockito.verify(activeTaskCreator).createTasks(consumer,
Collections.emptyMap());
Mockito.verify(stateUpdater).remove(activeTaskToUpdateInputPartitions.id());
Mockito.verify(tasks).addPendingTaskToUpdateInputPartitions(activeTaskToUpdateInputPartitions.id(),
newInputPartitions);
}
@@ -395,16 +398,16 @@ public class TaskManagerTest {
final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(stateUpdater.getTasks()).thenReturn(mkSet(reassignedActiveTask));
- expect(activeTaskCreator.createTasks(consumer,
Collections.emptyMap())).andReturn(emptySet());
expect(standbyTaskCreator.createTasks(Collections.emptyMap())).andReturn(emptySet());
- replay(activeTaskCreator, standbyTaskCreator);
+ replay(standbyTaskCreator);
taskManager.handleAssignment(
mkMap(mkEntry(reassignedActiveTask.id(),
reassignedActiveTask.inputPartitions())),
Collections.emptyMap()
);
- verify(activeTaskCreator, standbyTaskCreator);
+ verify(standbyTaskCreator);
+ Mockito.verify(activeTaskCreator).createTasks(consumer,
Collections.emptyMap());
}
@Test
@@ -415,16 +418,16 @@ public class TaskManagerTest {
final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(stateUpdater.getTasks()).thenReturn(mkSet(reAssignedRevokedActiveTask));
- expect(activeTaskCreator.createTasks(consumer,
Collections.emptyMap())).andReturn(emptySet());
expect(standbyTaskCreator.createTasks(Collections.emptyMap())).andReturn(emptySet());
- replay(activeTaskCreator, standbyTaskCreator);
+ replay(standbyTaskCreator);
taskManager.handleAssignment(
mkMap(mkEntry(reAssignedRevokedActiveTask.id(),
reAssignedRevokedActiveTask.inputPartitions())),
Collections.emptyMap()
);
- verify(activeTaskCreator, standbyTaskCreator);
+ verify(standbyTaskCreator);
+ Mockito.verify(activeTaskCreator).createTasks(consumer,
Collections.emptyMap());
Mockito.verify(tasks).removePendingActiveTaskToSuspend(reAssignedRevokedActiveTask.id());
}
@@ -437,16 +440,16 @@ public class TaskManagerTest {
final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(stateUpdater.getTasks()).thenReturn(mkSet(standbyTaskToUpdateInputPartitions));
- expect(activeTaskCreator.createTasks(consumer,
Collections.emptyMap())).andReturn(emptySet());
expect(standbyTaskCreator.createTasks(Collections.emptyMap())).andReturn(emptySet());
- replay(activeTaskCreator, standbyTaskCreator);
+ replay(standbyTaskCreator);
taskManager.handleAssignment(
Collections.emptyMap(),
mkMap(mkEntry(standbyTaskToUpdateInputPartitions.id(),
newInputPartitions))
);
- verify(activeTaskCreator, standbyTaskCreator);
+ verify(standbyTaskCreator);
+ Mockito.verify(activeTaskCreator).createTasks(consumer,
Collections.emptyMap());
Mockito.verify(stateUpdater,
never()).remove(standbyTaskToUpdateInputPartitions.id());
Mockito.verify(tasks, never())
.addPendingTaskToUpdateInputPartitions(standbyTaskToUpdateInputPartitions.id(),
newInputPartitions);
@@ -460,16 +463,16 @@ public class TaskManagerTest {
final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(stateUpdater.getTasks()).thenReturn(mkSet(reAssignedStandbyTask));
- expect(activeTaskCreator.createTasks(consumer,
Collections.emptyMap())).andReturn(emptySet());
expect(standbyTaskCreator.createTasks(Collections.emptyMap())).andReturn(emptySet());
- replay(activeTaskCreator, standbyTaskCreator);
+ replay(standbyTaskCreator);
taskManager.handleAssignment(
Collections.emptyMap(),
mkMap(mkEntry(reAssignedStandbyTask.id(),
reAssignedStandbyTask.inputPartitions()))
);
- verify(activeTaskCreator, standbyTaskCreator);
+ verify(standbyTaskCreator);
+ Mockito.verify(activeTaskCreator).createTasks(consumer,
Collections.emptyMap());
}
@Test
@@ -483,16 +486,16 @@ public class TaskManagerTest {
final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(stateUpdater.getTasks()).thenReturn(mkSet(activeTaskToClose,
standbyTaskToRecycle));
- expect(activeTaskCreator.createTasks(consumer,
Collections.emptyMap())).andReturn(emptySet());
expect(standbyTaskCreator.createTasks(Collections.emptyMap())).andReturn(emptySet());
- replay(activeTaskCreator, standbyTaskCreator);
+ replay(standbyTaskCreator);
taskManager.handleAssignment(
mkMap(mkEntry(standbyTaskToRecycle.id(),
standbyTaskToRecycle.inputPartitions())),
Collections.emptyMap()
);
- verify(activeTaskCreator, standbyTaskCreator);
+ verify(standbyTaskCreator);
+ Mockito.verify(activeTaskCreator).createTasks(consumer,
Collections.emptyMap());
Mockito.verify(stateUpdater).remove(activeTaskToClose.id());
Mockito.verify(tasks).addPendingTaskToCloseClean(activeTaskToClose.id());
Mockito.verify(stateUpdater).remove(standbyTaskToRecycle.id());
@@ -539,18 +542,15 @@ public class TaskManagerTest {
final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
final Set<Task> createdTasks = mkSet(activeTaskToBeCreated);
- expect(activeTaskCreator.createTasks(consumer, mkMap(
- mkEntry(activeTaskToBeCreated.id(),
activeTaskToBeCreated.inputPartitions())))
- ).andReturn(createdTasks);
+ final Map<TaskId, Set<TopicPartition>> tasksToBeCreated = mkMap(
+ mkEntry(activeTaskToBeCreated.id(),
activeTaskToBeCreated.inputPartitions()));
+ when(activeTaskCreator.createTasks(consumer,
tasksToBeCreated)).thenReturn(createdTasks);
expect(standbyTaskCreator.createTasks(Collections.emptyMap())).andReturn(emptySet());
- replay(activeTaskCreator, standbyTaskCreator);
+ replay(standbyTaskCreator);
- taskManager.handleAssignment(
- mkMap(mkEntry(activeTaskToBeCreated.id(),
activeTaskToBeCreated.inputPartitions())),
- Collections.emptyMap()
- );
+ taskManager.handleAssignment(tasksToBeCreated, Collections.emptyMap());
- verify(activeTaskCreator, standbyTaskCreator);
+ verify(standbyTaskCreator);
Mockito.verify(tasks).addPendingTaskToInit(createdTasks);
}
@@ -562,18 +562,18 @@ public class TaskManagerTest {
final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
final Set<Task> createdTasks = mkSet(standbyTaskToBeCreated);
- expect(activeTaskCreator.createTasks(consumer,
Collections.emptyMap())).andReturn(emptySet());
expect(standbyTaskCreator.createTasks(mkMap(
mkEntry(standbyTaskToBeCreated.id(),
standbyTaskToBeCreated.inputPartitions())))
).andReturn(createdTasks);
- replay(activeTaskCreator, standbyTaskCreator);
+ replay(standbyTaskCreator);
taskManager.handleAssignment(
Collections.emptyMap(),
mkMap(mkEntry(standbyTaskToBeCreated.id(),
standbyTaskToBeCreated.inputPartitions()))
);
- verify(activeTaskCreator, standbyTaskCreator);
+ verify(standbyTaskCreator);
+ Mockito.verify(activeTaskCreator).createTasks(consumer,
Collections.emptyMap());
Mockito.verify(tasks).addPendingTaskToInit(createdTasks);
}
@@ -590,17 +590,17 @@ public class TaskManagerTest {
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
expect(standbyTaskCreator.createStandbyTaskFromActive(activeTaskToRecycle,
activeTaskToRecycle.inputPartitions()))
.andReturn(recycledStandbyTask);
-
activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(activeTaskToRecycle.id());
- expect(activeTaskCreator.createTasks(consumer,
Collections.emptyMap())).andReturn(emptySet());
expect(standbyTaskCreator.createTasks(Collections.emptyMap())).andReturn(emptySet());
- replay(activeTaskCreator, standbyTaskCreator);
+ replay(standbyTaskCreator);
taskManager.handleAssignment(
Collections.emptyMap(),
mkMap(mkEntry(activeTaskToRecycle.id(),
activeTaskToRecycle.inputPartitions()))
);
- verify(activeTaskCreator, standbyTaskCreator);
+ verify(standbyTaskCreator);
+
Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(activeTaskToRecycle.id());
+ Mockito.verify(activeTaskCreator).createTasks(consumer,
Collections.emptyMap());
Mockito.verify(activeTaskToRecycle).prepareCommit();
Mockito.verify(tasks).replaceActiveWithStandby(recycledStandbyTask);
}
@@ -613,7 +613,7 @@ public class TaskManagerTest {
final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
when(tasks.allTasks()).thenReturn(mkSet(standbyTaskToRecycle));
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
- replay(activeTaskCreator, standbyTaskCreator);
+ replay(standbyTaskCreator);
final IllegalStateException illegalStateException = assertThrows(
IllegalStateException.class,
@@ -624,7 +624,8 @@ public class TaskManagerTest {
);
assertEquals(illegalStateException.getMessage(), "Standby tasks should
only be managed by the state updater");
- verify(activeTaskCreator, standbyTaskCreator);
+ verify(standbyTaskCreator);
+ Mockito.verifyNoInteractions(activeTaskCreator);
}
@Test
@@ -635,14 +636,14 @@ public class TaskManagerTest {
final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(tasks.allTasks()).thenReturn(mkSet(activeTaskToClose));
-
activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(activeTaskToClose.id());
- expect(activeTaskCreator.createTasks(consumer,
Collections.emptyMap())).andReturn(emptySet());
expect(standbyTaskCreator.createTasks(Collections.emptyMap())).andReturn(emptySet());
- replay(activeTaskCreator, standbyTaskCreator);
+ replay(standbyTaskCreator);
taskManager.handleAssignment(Collections.emptyMap(),
Collections.emptyMap());
- verify(activeTaskCreator, standbyTaskCreator);
+ verify(standbyTaskCreator);
+ Mockito.verify(activeTaskCreator).createTasks(consumer,
Collections.emptyMap());
+
Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(activeTaskToClose.id());
Mockito.verify(activeTaskToClose).prepareCommit();
Mockito.verify(activeTaskToClose).closeClean();
Mockito.verify(tasks).removeTask(activeTaskToClose);
@@ -656,7 +657,7 @@ public class TaskManagerTest {
final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(tasks.allTasks()).thenReturn(mkSet(standbyTaskToClose));
- replay(activeTaskCreator, standbyTaskCreator);
+ replay(standbyTaskCreator);
final IllegalStateException illegalStateException = assertThrows(
IllegalStateException.class,
@@ -664,7 +665,8 @@ public class TaskManagerTest {
);
assertEquals(illegalStateException.getMessage(), "Standby tasks should
only be managed by the state updater");
- verify(activeTaskCreator, standbyTaskCreator);
+ verify(standbyTaskCreator);
+ Mockito.verifyNoInteractions(activeTaskCreator);
}
@Test
@@ -677,16 +679,16 @@ public class TaskManagerTest {
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(tasks.allTasks()).thenReturn(mkSet(activeTaskToUpdateInputPartitions));
when(tasks.updateActiveTaskInputPartitions(activeTaskToUpdateInputPartitions,
newInputPartitions)).thenReturn(true);
- expect(activeTaskCreator.createTasks(consumer,
Collections.emptyMap())).andReturn(emptySet());
expect(standbyTaskCreator.createTasks(Collections.emptyMap())).andReturn(emptySet());
- replay(activeTaskCreator, standbyTaskCreator);
+ replay(standbyTaskCreator);
taskManager.handleAssignment(
mkMap(mkEntry(activeTaskToUpdateInputPartitions.id(),
newInputPartitions)),
Collections.emptyMap()
);
- verify(activeTaskCreator, standbyTaskCreator);
+ verify(standbyTaskCreator);
+ Mockito.verify(activeTaskCreator).createTasks(consumer,
Collections.emptyMap());
Mockito.verify(activeTaskToUpdateInputPartitions).updateInputPartitions(Mockito.eq(newInputPartitions),
any());
}
@@ -698,16 +700,16 @@ public class TaskManagerTest {
final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(tasks.allTasks()).thenReturn(mkSet(activeTaskToResume));
- expect(activeTaskCreator.createTasks(consumer,
Collections.emptyMap())).andReturn(emptySet());
expect(standbyTaskCreator.createTasks(Collections.emptyMap())).andReturn(emptySet());
- replay(activeTaskCreator, standbyTaskCreator);
+ replay(standbyTaskCreator);
taskManager.handleAssignment(
mkMap(mkEntry(activeTaskToResume.id(),
activeTaskToResume.inputPartitions())),
Collections.emptyMap()
);
- verify(activeTaskCreator, standbyTaskCreator);
+ verify(standbyTaskCreator);
+ Mockito.verify(activeTaskCreator).createTasks(consumer,
Collections.emptyMap());
}
@Test
@@ -718,16 +720,16 @@ public class TaskManagerTest {
final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(tasks.allTasks()).thenReturn(mkSet(activeTaskToResume));
- expect(activeTaskCreator.createTasks(consumer,
Collections.emptyMap())).andReturn(emptySet());
expect(standbyTaskCreator.createTasks(Collections.emptyMap())).andReturn(emptySet());
- replay(activeTaskCreator, standbyTaskCreator);
+ replay(standbyTaskCreator);
taskManager.handleAssignment(
mkMap(mkEntry(activeTaskToResume.id(),
activeTaskToResume.inputPartitions())),
Collections.emptyMap()
);
- verify(activeTaskCreator, standbyTaskCreator);
+ verify(standbyTaskCreator);
+ Mockito.verify(activeTaskCreator).createTasks(consumer,
Collections.emptyMap());
Mockito.verify(activeTaskToResume).resume();
Mockito.verify(stateUpdater).add(activeTaskToResume);
Mockito.verify(tasks).removeTask(activeTaskToResume);
@@ -742,7 +744,7 @@ public class TaskManagerTest {
final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(tasks.allTasks()).thenReturn(mkSet(standbyTaskToUpdateInputPartitions));
- replay(activeTaskCreator, standbyTaskCreator);
+ replay(standbyTaskCreator);
final IllegalStateException illegalStateException = assertThrows(
IllegalStateException.class,
@@ -753,7 +755,8 @@ public class TaskManagerTest {
);
assertEquals(illegalStateException.getMessage(), "Standby tasks should
only be managed by the state updater");
- verify(activeTaskCreator, standbyTaskCreator);
+ verify(standbyTaskCreator);
+ Mockito.verifyNoInteractions(activeTaskCreator);
}
@Test
@@ -767,20 +770,19 @@ public class TaskManagerTest {
final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(tasks.allTasks()).thenReturn(mkSet(activeTaskToClose));
-
activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(activeTaskToClose.id());
- expect(activeTaskCreator.createTasks(
- consumer,
- mkMap(mkEntry(activeTaskToCreate.id(),
activeTaskToCreate.inputPartitions()))
- )).andReturn(emptySet());
expect(standbyTaskCreator.createTasks(Collections.emptyMap())).andReturn(emptySet());
- replay(activeTaskCreator, standbyTaskCreator);
+ replay(standbyTaskCreator);
taskManager.handleAssignment(
mkMap(mkEntry(activeTaskToCreate.id(),
activeTaskToCreate.inputPartitions())),
Collections.emptyMap()
);
- verify(activeTaskCreator, standbyTaskCreator);
+ verify(standbyTaskCreator);
+ Mockito.verify(activeTaskCreator).createTasks(
+ consumer,
+ mkMap(mkEntry(activeTaskToCreate.id(),
activeTaskToCreate.inputPartitions()))
+ );
Mockito.verify(activeTaskToClose).closeClean();
}
@@ -845,17 +847,16 @@ public class TaskManagerTest {
when(tasks.removePendingTaskToRecycle(task00.id())).thenReturn(taskId00Partitions);
when(tasks.removePendingTaskToRecycle(task01.id())).thenReturn(taskId01Partitions);
taskManager =
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
- expect(activeTaskCreator.createActiveTaskFromStandby(eq(task01),
eq(taskId01Partitions), eq(consumer)))
- .andStubReturn(task01Converted);
- activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(anyObject());
- expectLastCall().once();
+ when(activeTaskCreator.createActiveTaskFromStandby(task01,
taskId01Partitions,
+ consumer)).thenReturn(task01Converted);
expect(standbyTaskCreator.createStandbyTaskFromActive(eq(task00),
eq(taskId00Partitions)))
.andStubReturn(task00Converted);
- replay(activeTaskCreator, standbyTaskCreator);
+ replay(standbyTaskCreator);
taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
- verify(activeTaskCreator, standbyTaskCreator);
+ verify(standbyTaskCreator);
+
Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(any());
Mockito.verify(task00).suspend();
Mockito.verify(task01).suspend();
Mockito.verify(task00Converted).initializeIfNeeded();
@@ -879,13 +880,10 @@ public class TaskManagerTest {
when(tasks.removePendingTaskToCloseClean(task00.id())).thenReturn(true);
when(tasks.removePendingTaskToCloseClean(task01.id())).thenReturn(true);
taskManager =
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
- activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(anyObject());
- expectLastCall().once();
- replay(activeTaskCreator);
taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
- verify(activeTaskCreator);
+
Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(any());
Mockito.verify(task00).suspend();
Mockito.verify(task00).closeClean();
Mockito.verify(task01).suspend();
@@ -960,10 +958,8 @@ public class TaskManagerTest {
when(stateUpdater.drainRemovedTasks())
.thenReturn(mkSet(taskToRecycle0, taskToRecycle1, taskToClose,
taskToUpdateInputPartitions));
when(stateUpdater.restoresActiveTasks()).thenReturn(true);
-
expect(activeTaskCreator.createActiveTaskFromStandby(eq(taskToRecycle1),
eq(taskId01Partitions), eq(consumer)))
- .andStubReturn(convertedTask1);
- activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(anyObject());
- expectLastCall().times(2);
+ when(activeTaskCreator.createActiveTaskFromStandby(taskToRecycle1,
taskId01Partitions, consumer))
+ .thenReturn(convertedTask1);
expect(standbyTaskCreator.createStandbyTaskFromActive(eq(taskToRecycle0),
eq(taskId00Partitions)))
.andStubReturn(convertedTask0);
expect(consumer.assignment()).andReturn(emptySet()).anyTimes();
@@ -980,11 +976,12 @@ public class TaskManagerTest {
when(tasks.removePendingTaskToUpdateInputPartitions(taskToUpdateInputPartitions.id())).thenReturn(taskId04Partitions);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
taskManager.setMainConsumer(consumer);
- replay(activeTaskCreator, standbyTaskCreator, consumer);
+ replay(standbyTaskCreator, consumer);
taskManager.checkStateUpdater(time.milliseconds(), noOpResetter -> {
});
- verify(activeTaskCreator, standbyTaskCreator, consumer);
+ verify(standbyTaskCreator, consumer);
+ Mockito.verify(activeTaskCreator,
times(2)).closeAndRemoveTaskProducerIfNeeded(any());
Mockito.verify(convertedTask0).initializeIfNeeded();
Mockito.verify(convertedTask1).initializeIfNeeded();
Mockito.verify(stateUpdater).add(convertedTask0);
@@ -1155,12 +1152,12 @@ public class TaskManagerTest {
final TaskManager taskManager = setUpRecycleRestoredTask(statefulTask);
expect(standbyTaskCreator.createStandbyTaskFromActive(statefulTask,
statefulTask.inputPartitions()))
.andStubReturn(standbyTask);
-
activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(statefulTask.id());
- replay(activeTaskCreator, standbyTaskCreator);
+ replay(standbyTaskCreator);
taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
- verify(activeTaskCreator, standbyTaskCreator);
+ verify(standbyTaskCreator);
+
Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(statefulTask.id());
Mockito.verify(statefulTask).suspend();
Mockito.verify(standbyTask).initializeIfNeeded();
Mockito.verify(stateUpdater).add(standbyTask);
@@ -1226,12 +1223,10 @@ public class TaskManagerTest {
.withInputPartitions(taskId00Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpCloseCleanRestoredTask(statefulTask, tasks);
-
activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(statefulTask.id());
- replay(activeTaskCreator);
taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
- verify(activeTaskCreator);
+
Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(statefulTask.id());
Mockito.verify(statefulTask).suspend();
Mockito.verify(statefulTask).closeClean();
Mockito.verify(statefulTask, never()).closeDirty();
@@ -1246,15 +1241,13 @@ public class TaskManagerTest {
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpCloseCleanRestoredTask(statefulTask, tasks);
doThrow(RuntimeException.class).when(statefulTask).closeClean();
-
activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(statefulTask.id());
- replay(activeTaskCreator);
assertThrows(
RuntimeException.class,
() -> taskManager.checkStateUpdater(time.milliseconds(),
noOpResetter)
);
- verify(activeTaskCreator);
+
Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(statefulTask.id());
Mockito.verify(statefulTask).closeDirty();
Mockito.verify(tasks, never()).removeTask(statefulTask);
}
@@ -1266,16 +1259,15 @@ public class TaskManagerTest {
.withInputPartitions(taskId00Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpCloseCleanRestoredTask(statefulTask, tasks);
-
activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(statefulTask.id());
- expectLastCall().andThrow(new RuntimeException("Something happened"));
- replay(activeTaskCreator);
+ final TaskId taskId = statefulTask.id();
+ doThrow(new RuntimeException("Something happened"))
+
.when(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId);
assertThrows(
RuntimeException.class,
() -> taskManager.checkStateUpdater(time.milliseconds(),
noOpResetter)
);
- verify(activeTaskCreator);
Mockito.verify(statefulTask, never()).closeDirty();
Mockito.verify(tasks, never()).removeTask(statefulTask);
}
@@ -1300,13 +1292,11 @@ public class TaskManagerTest {
when(tasks.removePendingTaskToCloseDirty(statefulTask.id())).thenReturn(true);
when(stateUpdater.drainRestoredActiveTasks(any(Duration.class))).thenReturn(mkSet(statefulTask));
when(stateUpdater.restoresActiveTasks()).thenReturn(true);
-
activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(statefulTask.id());
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
- replay(activeTaskCreator);
taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
- verify(activeTaskCreator);
+
Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(statefulTask.id());
Mockito.verify(statefulTask).prepareCommit();
Mockito.verify(statefulTask).suspend();
Mockito.verify(statefulTask).closeDirty();
@@ -1539,15 +1529,15 @@ public class TaskManagerTest {
mkEntry(taskId03, mkSet(t1p3)),
mkEntry(taskId04, mkSet(t1p4))
);
- expect(activeTaskCreator.createTasks(anyObject(),
eq(activeTasksAssignment))).andStubReturn(emptyList());
expect(standbyTaskCreator.createTasks(eq(standbyTasksAssignment))).andStubReturn(Collections.emptySet());
- replay(activeTaskCreator, standbyTaskCreator);
+ replay(standbyTaskCreator);
taskManager.handleAssignment(activeTasksAssignment,
standbyTasksAssignment);
Mockito.verify(topologyBuilder).addSubscribedTopicsFromAssignment(Mockito.eq(mkSet(t1p1,
t1p2, t2p2)), Mockito.anyString());
Mockito.verify(topologyBuilder,
never()).addSubscribedTopicsFromAssignment(Mockito.eq(mkSet(t1p3, t1p4)),
Mockito.anyString());
- verify(activeTaskCreator, standbyTaskCreator);
+ Mockito.verify(activeTaskCreator).createTasks(any(),
Mockito.eq(activeTasksAssignment));
+ verify(standbyTaskCreator);
}
@Test
@@ -1739,9 +1729,9 @@ public class TaskManagerTest {
taskManager.handleRebalanceStart(singleton("topic"));
final StateMachineTask uninitializedTask = new
StateMachineTask(taskId00, taskId00Partitions, true);
- expect(activeTaskCreator.createTasks(anyObject(),
eq(taskId00Assignment))).andStubReturn(singleton(uninitializedTask));
+ when(activeTaskCreator.createTasks(any(),
Mockito.eq(taskId00Assignment))).thenReturn(singleton(uninitializedTask));
expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
- replay(activeTaskCreator, standbyTaskCreator);
+ replay(standbyTaskCreator);
taskManager.handleAssignment(taskId00Assignment, emptyMap());
assertThat(uninitializedTask.state(), is(State.CREATED));
@@ -1765,9 +1755,9 @@ public class TaskManagerTest {
final StateMachineTask closedTask = new StateMachineTask(taskId00,
taskId00Partitions, true);
taskManager.handleRebalanceStart(singleton("topic"));
- expect(activeTaskCreator.createTasks(anyObject(),
eq(taskId00Assignment))).andStubReturn(singleton(closedTask));
+ when(activeTaskCreator.createTasks(any(),
Mockito.eq(taskId00Assignment))).thenReturn(singleton(closedTask));
expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
- replay(activeTaskCreator, standbyTaskCreator);
+ replay(standbyTaskCreator);
taskManager.handleAssignment(taskId00Assignment, emptyMap());
closedTask.suspend();
@@ -1827,9 +1817,7 @@ public class TaskManagerTest {
// first `handleAssignment`
expectRestoreToBeCompleted(consumer);
- expect(activeTaskCreator.createTasks(anyObject(),
eq(taskId00Assignment))).andStubReturn(singletonList(task00));
- expect(activeTaskCreator.createTasks(anyObject(),
eq(emptyMap()))).andStubReturn(emptyList());
- activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId00);
+ when(activeTaskCreator.createTasks(any(),
Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
expectLastCall();
expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(emptyList());
@@ -1841,7 +1829,7 @@ public class TaskManagerTest {
consumer.commitSync(offsets);
expectLastCall();
- replay(activeTaskCreator, standbyTaskCreator, consumer);
+ replay(standbyTaskCreator, consumer);
taskManager.handleAssignment(taskId00Assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(),
null), is(true));
@@ -1854,6 +1842,7 @@ public class TaskManagerTest {
assertThat(task00.state(), is(Task.State.CLOSED));
assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
+
Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
}
@Test
@@ -1867,13 +1856,11 @@ public class TaskManagerTest {
// first `handleAssignment`
expectRestoreToBeCompleted(consumer);
- expect(activeTaskCreator.createTasks(anyObject(),
eq(taskId00Assignment))).andStubReturn(singletonList(task00));
- expect(activeTaskCreator.createTasks(anyObject(),
eq(emptyMap()))).andStubReturn(emptyList());
- activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId00);
+ when(activeTaskCreator.createTasks(any(),
Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
expectLastCall();
expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(emptyList());
- replay(activeTaskCreator, standbyTaskCreator, consumer);
+ replay(standbyTaskCreator, consumer);
taskManager.handleAssignment(taskId00Assignment, emptyMap());
taskManager.handleRevocation(taskId00Partitions);
@@ -1889,6 +1876,7 @@ public class TaskManagerTest {
is("Encounter unexpected fatal error for task 0_0")
);
assertThat(thrown.getCause().getMessage(), is("KABOOM!"));
+
Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
}
@Test
@@ -1898,7 +1886,7 @@ public class TaskManagerTest {
// `handleAssignment`
expectRestoreToBeCompleted(consumer);
- expect(activeTaskCreator.createTasks(anyObject(),
eq(taskId00Assignment))).andStubReturn(singletonList(task00));
+ when(activeTaskCreator.createTasks(any(),
Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
expect(standbyTaskCreator.createTasks(eq(taskId01Assignment))).andStubReturn(singletonList(task01));
makeTaskFolders(taskId00.toString(), taskId01.toString());
@@ -1912,17 +1900,14 @@ public class TaskManagerTest {
taskManager.handleRebalanceStart(emptySet());
assertThat(taskManager.lockedTaskDirectories(),
Matchers.is(mkSet(taskId00, taskId01)));
- // `handleLostAll`
- activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId00);
- expectLastCall();
-
- replay(activeTaskCreator, standbyTaskCreator, consumer);
+ replay(standbyTaskCreator, consumer);
taskManager.handleAssignment(taskId00Assignment, taskId01Assignment);
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(),
null), is(true));
assertThat(task00.state(), is(Task.State.RUNNING));
assertThat(task01.state(), is(Task.State.RUNNING));
+ // `handleLostAll`
taskManager.handleLostAll();
assertThat(task00.commitPrepared, is(true));
assertThat(task00.state(), is(Task.State.CLOSED));
@@ -1936,20 +1921,16 @@ public class TaskManagerTest {
taskManager.handleRebalanceStart(emptySet());
assertThat(taskManager.lockedTaskDirectories(), is(emptySet()));
+
Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
}
@Test
public void
shouldReInitializeThreadProducerOnHandleLostAllIfEosV2Enabled() {
- activeTaskCreator.reInitializeThreadProducer();
- expectLastCall();
-
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
- replay(activeTaskCreator);
-
taskManager.handleLostAll();
- verify(activeTaskCreator);
+ Mockito.verify(activeTaskCreator).reInitializeThreadProducer();
}
@Test
@@ -1960,16 +1941,15 @@ public class TaskManagerTest {
// `handleAssignment`
expectRestoreToBeCompleted(consumer);
- expect(activeTaskCreator.createTasks(anyObject(),
eq(taskId00Assignment))).andStubReturn(singletonList(task00));
+ when(activeTaskCreator.createTasks(any(),
Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(emptyList());
// `handleAssignment`
consumer.commitSync(offsets);
expectLastCall();
- activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId00);
- expectLastCall().andThrow(new RuntimeException("KABOOM!"));
+ doThrow(new
RuntimeException("KABOOM!")).when(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
- replay(activeTaskCreator, standbyTaskCreator, consumer);
+ replay(standbyTaskCreator, consumer);
taskManager.handleAssignment(taskId00Assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(),
null), is(true));
@@ -2010,10 +1990,10 @@ public class TaskManagerTest {
// `handleAssignment`
expectRestoreToBeCompleted(consumer);
- expect(activeTaskCreator.createTasks(anyObject(),
eq(taskId00Assignment))).andStubReturn(singletonList(task00));
+ when(activeTaskCreator.createTasks(any(),
Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
expect(consumer.assignment()).andReturn(taskId00Partitions);
- replay(activeTaskCreator, standbyTaskCreator, consumer);
+ replay(standbyTaskCreator, consumer);
taskManager.handleAssignment(taskId00Assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(),
tp -> assertThat(tp, is(empty()))), is(true));
@@ -2048,10 +2028,10 @@ public class TaskManagerTest {
};
expectRestoreToBeCompleted(consumer);
- expect(activeTaskCreator.createTasks(anyObject(),
eq(taskId00Assignment))).andStubReturn(singletonList(task00));
+ when(activeTaskCreator.createTasks(any(),
Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
expect(consumer.assignment()).andReturn(taskId00Partitions);
- replay(activeTaskCreator, standbyTaskCreator, consumer);
+ replay(standbyTaskCreator, consumer);
taskManager.handleAssignment(taskId00Assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(),
tp -> assertThat(tp, is(empty()))), is(true));
@@ -2082,8 +2062,8 @@ public class TaskManagerTest {
assignment.putAll(taskId01Assignment);
// `handleAssignment`
- expect(activeTaskCreator.createTasks(anyObject(), eq(assignment)))
- .andStubReturn(asList(corruptedTask, nonCorruptedTask));
+ when(activeTaskCreator.createTasks(any(), Mockito.eq(assignment)))
+ .thenReturn(asList(corruptedTask, nonCorruptedTask));
expect(standbyTaskCreator.createTasks(anyObject()))
.andStubReturn(Collections.emptySet());
expectRestoreToBeCompleted(consumer);
@@ -2091,8 +2071,7 @@ public class TaskManagerTest {
// check that we should not commit empty map either
consumer.commitSync(eq(emptyMap()));
expectLastCall().andStubThrow(new AssertionError("should not invoke
commitSync when offset map is empty"));
-
- replay(activeTaskCreator, standbyTaskCreator, consumer);
+ replay(standbyTaskCreator, consumer);
taskManager.handleAssignment(assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(),
tp -> assertThat(tp, is(empty()))), is(true));
@@ -2125,12 +2104,12 @@ public class TaskManagerTest {
assignment.putAll(taskId01Assignment);
// `handleAssignment`
- expect(activeTaskCreator.createTasks(anyObject(), eq(assignment)))
- .andStubReturn(asList(corruptedTask, nonRunningNonCorruptedTask));
+ when(activeTaskCreator.createTasks(any(), Mockito.eq(assignment)))
+ .thenReturn(asList(corruptedTask, nonRunningNonCorruptedTask));
expect(standbyTaskCreator.createTasks(anyObject()))
.andStubReturn(Collections.emptySet());
expect(consumer.assignment()).andReturn(taskId00Partitions);
- replay(activeTaskCreator, standbyTaskCreator, consumer);
+ replay(standbyTaskCreator, consumer);
taskManager.handleAssignment(assignment, emptyMap());
@@ -2141,7 +2120,6 @@ public class TaskManagerTest {
assertThat(nonRunningNonCorruptedTask.partitionsForOffsetReset,
equalTo(Collections.emptySet()));
assertThat(corruptedTask.partitionsForOffsetReset,
equalTo(taskId00Partitions));
- verify(activeTaskCreator);
assertFalse(nonRunningNonCorruptedTask.commitPrepared);
verify(consumer);
}
@@ -2162,11 +2140,12 @@ public class TaskManagerTest {
// handleAssignment
expect(standbyTaskCreator.createTasks(eq(taskId00Assignment))).andStubReturn(singleton(corruptedStandby));
- expect(activeTaskCreator.createTasks(anyObject(),
eq(taskId01Assignment))).andStubReturn(singleton(runningNonCorruptedActive));
+ when(activeTaskCreator.createTasks(any(),
Mockito.eq(taskId01Assignment)))
+ .thenReturn(singleton(runningNonCorruptedActive));
expectRestoreToBeCompleted(consumer);
- replay(activeTaskCreator, standbyTaskCreator, consumer);
+ replay(standbyTaskCreator, consumer);
taskManager.handleAssignment(taskId01Assignment, taskId00Assignment);
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(),
null), is(true));
@@ -2202,14 +2181,15 @@ public class TaskManagerTest {
final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>();
assignment.putAll(taskId00Assignment);
assignment.putAll(taskId01Assignment);
- expect(activeTaskCreator.createTasks(anyObject(),
eq(assignment))).andStubReturn(asList(corruptedActive, uncorruptedActive));
+ when(activeTaskCreator.createTasks(any(), Mockito.eq(assignment)))
+ .thenReturn(asList(corruptedActive, uncorruptedActive));
expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
expectRestoreToBeCompleted(consumer);
expect(consumer.assignment()).andStubReturn(union(HashSet::new,
taskId00Partitions, taskId01Partitions));
- replay(activeTaskCreator, standbyTaskCreator, consumer,
stateDirectory, stateManager);
+ replay(standbyTaskCreator, consumer, stateDirectory, stateManager);
uncorruptedActive.setCommittableOffsetsAndMetadata(offsets);
@@ -2254,7 +2234,8 @@ public class TaskManagerTest {
final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>();
assignment.putAll(taskId00Assignment);
assignment.putAll(taskId01Assignment);
- expect(activeTaskCreator.createTasks(anyObject(),
eq(assignment))).andStubReturn(asList(corruptedActive, uncorruptedActive));
+ when(activeTaskCreator.createTasks(any(), Mockito.eq(assignment)))
+ .thenReturn(asList(corruptedActive, uncorruptedActive));
expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
expectRestoreToBeCompleted(consumer);
@@ -2264,7 +2245,7 @@ public class TaskManagerTest {
expect(consumer.assignment()).andStubReturn(union(HashSet::new,
taskId00Partitions, taskId01Partitions));
- replay(activeTaskCreator, standbyTaskCreator, consumer);
+ replay(standbyTaskCreator, consumer);
taskManager.handleAssignment(assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(),
null), is(true));
@@ -2301,7 +2282,7 @@ public class TaskManagerTest {
public void
shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitDuringHandleCorruptedWithEOS()
{
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
final StreamsProducer producer = mock(StreamsProducer.class);
- expect(activeTaskCreator.threadProducer()).andStubReturn(producer);
+ when(activeTaskCreator.threadProducer()).thenReturn(producer);
final ProcessorStateManager stateManager =
EasyMock.createMock(ProcessorStateManager.class);
final AtomicBoolean corruptedTaskChangelogMarkedAsCorrupted = new
AtomicBoolean(false);
@@ -2330,7 +2311,8 @@ public class TaskManagerTest {
final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>();
assignment.putAll(taskId00Assignment);
assignment.putAll(taskId01Assignment);
- expect(activeTaskCreator.createTasks(anyObject(),
eq(assignment))).andStubReturn(asList(corruptedActiveTask,
uncorruptedActiveTask));
+ when(activeTaskCreator.createTasks(any(), Mockito.eq(assignment)))
+ .thenReturn(asList(corruptedActiveTask, uncorruptedActiveTask));
expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
expectRestoreToBeCompleted(consumer);
@@ -2342,7 +2324,7 @@ public class TaskManagerTest {
expect(consumer.assignment()).andStubReturn(union(HashSet::new,
taskId00Partitions, taskId01Partitions));
- replay(activeTaskCreator, standbyTaskCreator, consumer, stateManager);
+ replay(standbyTaskCreator, consumer, stateManager);
taskManager.handleAssignment(assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(),
null), is(true));
@@ -2412,15 +2394,15 @@ public class TaskManagerTest {
expectRestoreToBeCompleted(consumer);
- expect(activeTaskCreator.createTasks(anyObject(),
eq(assignmentActive))).andReturn(asList(revokedActiveTask,
unrevokedActiveTaskWithCommitNeeded, unrevokedActiveTaskWithoutCommitNeeded));
+ when(activeTaskCreator.createTasks(any(),
Mockito.eq(assignmentActive)))
+ .thenReturn(asList(revokedActiveTask,
unrevokedActiveTaskWithCommitNeeded, unrevokedActiveTaskWithoutCommitNeeded));
expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
- activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId00);
expectLastCall();
consumer.commitSync(expectedCommittedOffsets);
expectLastCall().andThrow(new TimeoutException());
expect(consumer.assignment()).andStubReturn(union(HashSet::new,
taskId00Partitions, taskId01Partitions, taskId02Partitions));
- replay(activeTaskCreator, standbyTaskCreator, consumer);
+ replay(standbyTaskCreator, consumer);
taskManager.handleAssignment(assignmentActive, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(),
null), is(true));
@@ -2439,7 +2421,7 @@ public class TaskManagerTest {
public void
shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitDuringRevocationWithEOS()
{
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
final StreamsProducer producer = mock(StreamsProducer.class);
- expect(activeTaskCreator.threadProducer()).andStubReturn(producer);
+ when(activeTaskCreator.threadProducer()).thenReturn(producer);
final ProcessorStateManager stateManager =
EasyMock.createMock(ProcessorStateManager.class);
final StateMachineTask revokedActiveTask = new
StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
@@ -2476,10 +2458,9 @@ public class TaskManagerTest {
expectRestoreToBeCompleted(consumer);
- expect(activeTaskCreator.createTasks(anyObject(),
eq(assignmentActive))).andReturn(asList(revokedActiveTask, unrevokedActiveTask,
unrevokedActiveTaskWithoutCommitNeeded));
+ when(activeTaskCreator.createTasks(any(),
Mockito.eq(assignmentActive)))
+ .thenReturn(asList(revokedActiveTask, unrevokedActiveTask,
unrevokedActiveTaskWithoutCommitNeeded));
expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
- activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId00);
- expectLastCall();
final ConsumerGroupMetadata groupMetadata = new
ConsumerGroupMetadata("appId");
expect(consumer.groupMetadata()).andReturn(groupMetadata);
@@ -2488,7 +2469,7 @@ public class TaskManagerTest {
expect(consumer.assignment()).andStubReturn(union(HashSet::new,
taskId00Partitions, taskId01Partitions, taskId02Partitions));
- replay(activeTaskCreator, standbyTaskCreator, consumer, stateManager);
+ replay(standbyTaskCreator, consumer, stateManager);
taskManager.handleAssignment(assignmentActive, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(),
null), is(true));
@@ -2515,11 +2496,10 @@ public class TaskManagerTest {
expectRestoreToBeCompleted(consumer);
expect(standbyTaskCreator.createTasks(eq(taskId00Assignment))).andStubReturn(singletonList(task00));
- expect(activeTaskCreator.createTasks(anyObject(),
anyObject())).andStubReturn(Collections.emptySet());
expect(standbyTaskCreator.createTasks(eq(Collections.emptyMap()))).andStubReturn(Collections.emptySet());
consumer.commitSync(Collections.emptyMap());
expectLastCall();
- replay(activeTaskCreator, standbyTaskCreator, consumer);
+ replay(standbyTaskCreator, consumer);
taskManager.handleAssignment(emptyMap(), taskId00Assignment);
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(),
null), is(true));
@@ -2539,11 +2519,10 @@ public class TaskManagerTest {
expectRestoreToBeCompleted(consumer);
// expect these calls twice (because we're going to
tryToCompleteRestoration twice)
expectRestoreToBeCompleted(consumer);
- expect(activeTaskCreator.createTasks(anyObject(),
eq(taskId00Assignment))).andReturn(singletonList(task00));
- expect(activeTaskCreator.createTasks(anyObject(),
eq(Collections.emptyMap()))).andReturn(Collections.emptySet());
+ when(activeTaskCreator.createTasks(any(),
Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
expect(standbyTaskCreator.createTasks(eq(taskId01Assignment))).andReturn(singletonList(task01)).anyTimes();
expect(standbyTaskCreator.createTasks(eq(Collections.emptyMap()))).andReturn(Collections.emptySet());
- replay(activeTaskCreator, standbyTaskCreator, consumer);
+ replay(standbyTaskCreator, consumer);
taskManager.handleAssignment(taskId00Assignment, taskId01Assignment);
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(),
null), is(true));
@@ -2555,7 +2534,7 @@ public class TaskManagerTest {
assertThat(task00.state(), is(Task.State.RUNNING));
assertThat(task01.state(), is(Task.State.RUNNING));
- verify(activeTaskCreator);
+ Mockito.verify(activeTaskCreator).createTasks(any(),
Mockito.eq(emptyMap()));
}
@Test
@@ -2565,10 +2544,9 @@ public class TaskManagerTest {
expectRestoreToBeCompleted(consumer);
// expect these calls twice (because we're going to
tryToCompleteRestoration twice)
expectRestoreToBeCompleted(consumer);
- expect(activeTaskCreator.createTasks(anyObject(),
eq(taskId00Assignment))).andReturn(singletonList(task00));
- expect(activeTaskCreator.createTasks(anyObject(),
eq(Collections.emptyMap()))).andReturn(Collections.emptySet());
+ when(activeTaskCreator.createTasks(any(),
Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
- replay(activeTaskCreator, standbyTaskCreator, consumer);
+ replay(standbyTaskCreator, consumer);
taskManager.handleAssignment(taskId00Assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(),
null), is(true));
@@ -2580,7 +2558,8 @@ public class TaskManagerTest {
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(),
null), is(true));
assertThat(task00.state(), is(Task.State.RUNNING));
assertEquals(newPartitionsSet, task00.inputPartitions());
- verify(activeTaskCreator, consumer);
+ verify(consumer);
+ Mockito.verify(activeTaskCreator).createTasks(any(),
Mockito.eq(emptyMap()));
}
@Test
@@ -2591,9 +2570,9 @@ public class TaskManagerTest {
expect(consumer.assignment()).andReturn(emptySet());
consumer.resume(eq(emptySet()));
expectLastCall();
- expect(activeTaskCreator.createTasks(anyObject(),
eq(assignment))).andStubReturn(singletonList(task00));
+ when(activeTaskCreator.createTasks(any(),
Mockito.eq(assignment))).thenReturn(singletonList(task00));
expect(standbyTaskCreator.createTasks(eq(emptyMap()))).andStubReturn(emptyList());
- replay(consumer, activeTaskCreator, standbyTaskCreator);
+ replay(consumer, standbyTaskCreator);
taskManager.handleAssignment(assignment, emptyMap());
@@ -2604,7 +2583,6 @@ public class TaskManagerTest {
assertThat(task00.state(), is(Task.State.RUNNING));
assertThat(taskManager.activeTaskMap(),
Matchers.equalTo(singletonMap(taskId00, task00)));
assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
- verify(activeTaskCreator);
Mockito.verify(changeLogReader).enforceRestoreActive();
}
@@ -2632,9 +2610,9 @@ public class TaskManagerTest {
expect(consumer.assignment()).andReturn(emptySet());
consumer.resume(eq(emptySet()));
expectLastCall();
- expect(activeTaskCreator.createTasks(anyObject(),
eq(assignment))).andStubReturn(asList(task00, task01));
+ when(activeTaskCreator.createTasks(any(),
Mockito.eq(assignment))).thenReturn(asList(task00, task01));
expect(standbyTaskCreator.createTasks(eq(emptyMap()))).andStubReturn(emptyList());
- replay(consumer, activeTaskCreator, standbyTaskCreator);
+ replay(consumer, standbyTaskCreator);
taskManager.handleAssignment(assignment, emptyMap());
@@ -2650,7 +2628,6 @@ public class TaskManagerTest {
Matchers.equalTo(mkMap(mkEntry(taskId00, task00),
mkEntry(taskId01, task01)))
);
assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
- verify(activeTaskCreator);
Mockito.verify(changeLogReader).enforceRestoreActive();
}
@@ -2672,9 +2649,9 @@ public class TaskManagerTest {
consumer.resume(eq(emptySet()));
expectLastCall();
expectLastCall();
- expect(activeTaskCreator.createTasks(anyObject(),
eq(assignment))).andStubReturn(singletonList(task00));
+ when(activeTaskCreator.createTasks(any(),
Mockito.eq(assignment))).thenReturn(singletonList(task00));
expect(standbyTaskCreator.createTasks(eq(emptyMap()))).andStubReturn(emptyList());
- replay(consumer, activeTaskCreator, standbyTaskCreator);
+ replay(consumer, standbyTaskCreator);
taskManager.handleAssignment(assignment, emptyMap());
@@ -2688,7 +2665,6 @@ public class TaskManagerTest {
Matchers.equalTo(mkMap(mkEntry(taskId00, task00)))
);
assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
- verify(activeTaskCreator);
Mockito.verify(changeLogReader).enforceRestoreActive();
}
@@ -2699,12 +2675,12 @@ public class TaskManagerTest {
task00.setCommittableOffsetsAndMetadata(offsets);
expectRestoreToBeCompleted(consumer);
- expect(activeTaskCreator.createTasks(anyObject(),
eq(taskId00Assignment))).andReturn(singletonList(task00));
+ when(activeTaskCreator.createTasks(any(),
Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
consumer.commitSync(offsets);
expectLastCall();
- replay(activeTaskCreator, standbyTaskCreator, consumer);
+ replay(standbyTaskCreator, consumer);
taskManager.handleAssignment(taskId00Assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(),
null), is(true));
@@ -2750,11 +2726,10 @@ public class TaskManagerTest {
);
expectRestoreToBeCompleted(consumer);
- expect(activeTaskCreator.createTasks(anyObject(),
eq(assignmentActive)))
- .andReturn(asList(task00, task01, task02));
+ when(activeTaskCreator.createTasks(any(),
Mockito.eq(assignmentActive)))
+ .thenReturn(asList(task00, task01, task02));
- expect(activeTaskCreator.threadProducer()).andReturn(producer);
- activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId00);
+ when(activeTaskCreator.threadProducer()).thenReturn(producer);
expect(standbyTaskCreator.createTasks(eq(assignmentStandby)))
.andReturn(singletonList(task10));
@@ -2772,7 +2747,7 @@ public class TaskManagerTest {
task10.committedOffsets();
EasyMock.expectLastCall();
- replay(activeTaskCreator, standbyTaskCreator, consumer);
+ replay(standbyTaskCreator, consumer);
taskManager.handleAssignment(assignmentActive, assignmentStandby);
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(),
null), is(true));
@@ -2822,16 +2797,14 @@ public class TaskManagerTest {
);
expectRestoreToBeCompleted(consumer);
- expect(activeTaskCreator.createTasks(anyObject(),
eq(assignmentActive)))
- .andReturn(asList(task00, task01, task02));
- activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId00);
- expectLastCall();
+ when(activeTaskCreator.createTasks(any(),
Mockito.eq(assignmentActive)))
+ .thenReturn(asList(task00, task01, task02));
expect(standbyTaskCreator.createTasks(eq(assignmentStandby)))
.andReturn(singletonList(task10));
consumer.commitSync(expectedCommittedOffsets);
expectLastCall();
- replay(activeTaskCreator, standbyTaskCreator, consumer);
+ replay(standbyTaskCreator, consumer);
taskManager.handleAssignment(assignmentActive, assignmentStandby);
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(),
null), is(true));
@@ -2864,12 +2837,11 @@ public class TaskManagerTest {
expectRestoreToBeCompleted(consumer);
- expect(activeTaskCreator.createTasks(anyObject(),
eq(assignmentActive))).andReturn(singleton(task00));
- expect(activeTaskCreator.createTasks(anyObject(),
eq(Collections.emptyMap()))).andReturn(Collections.emptySet());
+ when(activeTaskCreator.createTasks(any(),
Mockito.eq(assignmentActive))).thenReturn(singleton(task00));
expect(standbyTaskCreator.createTasks(eq(assignmentStandby))).andReturn(singletonList(task10));
expect(standbyTaskCreator.createTasks(eq(Collections.emptyMap()))).andReturn(Collections.emptySet());
- replay(activeTaskCreator, standbyTaskCreator, consumer);
+ replay(standbyTaskCreator, consumer);
taskManager.handleAssignment(assignmentActive, assignmentStandby);
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(),
null), is(true));
@@ -2896,12 +2868,11 @@ public class TaskManagerTest {
expectRestoreToBeCompleted(consumer);
- expect(activeTaskCreator.createTasks(anyObject(),
eq(assignmentActive))).andReturn(singleton(task00));
- expect(activeTaskCreator.createTasks(anyObject(),
eq(Collections.emptyMap()))).andReturn(Collections.emptySet());
+ when(activeTaskCreator.createTasks(any(),
Mockito.eq(assignmentActive))).thenReturn(singleton(task00));
expect(standbyTaskCreator.createTasks(eq(assignmentStandby))).andReturn(singletonList(task10));
expect(standbyTaskCreator.createTasks(eq(Collections.emptyMap()))).andReturn(Collections.emptySet());
- replay(activeTaskCreator, standbyTaskCreator, consumer);
+ replay(standbyTaskCreator, consumer);
taskManager.handleAssignment(assignmentActive, assignmentStandby);
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(),
null), is(true));
@@ -2917,12 +2888,9 @@ public class TaskManagerTest {
public void shouldNotCommitCreatedTasksOnRevocationOrClosure() {
final StateMachineTask task00 = new StateMachineTask(taskId00,
taskId00Partitions, true);
- expect(activeTaskCreator.createTasks(anyObject(),
eq(taskId00Assignment))).andReturn(singletonList(task00));
+ when(activeTaskCreator.createTasks(any(),
Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
- activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(eq(taskId00));
- expectLastCall().once();
- expect(activeTaskCreator.createTasks(anyObject(),
eq(Collections.emptyMap()))).andReturn(Collections.emptySet());
- replay(activeTaskCreator, standbyTaskCreator, consumer);
+ replay(standbyTaskCreator, consumer);
taskManager.handleAssignment(taskId00Assignment, emptyMap());
assertThat(task00.state(), is(Task.State.CREATED));
@@ -2932,6 +2900,7 @@ public class TaskManagerTest {
taskManager.handleAssignment(emptyMap(), emptyMap());
assertThat(task00.state(), is(Task.State.CLOSED));
+
Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(eq(taskId00));
}
@Test
@@ -2945,9 +2914,9 @@ public class TaskManagerTest {
};
expectRestoreToBeCompleted(consumer);
- expect(activeTaskCreator.createTasks(anyObject(),
eq(taskId00Assignment))).andReturn(singletonList(task00));
+ when(activeTaskCreator.createTasks(any(),
Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
- replay(activeTaskCreator, standbyTaskCreator, consumer);
+ replay(standbyTaskCreator, consumer);
taskManager.handleAssignment(taskId00Assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(),
null), is(true));
assertThat(task00.state(), is(Task.State.RUNNING));
@@ -3016,14 +2985,10 @@ public class TaskManagerTest {
}
};
- expect(activeTaskCreator.createTasks(anyObject(), eq(assignment)))
- .andStubReturn(asList(task00, task01, task02, task03));
- activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(anyObject());
- expectLastCall().times(4);
- activeTaskCreator.closeThreadProducerIfNeeded();
- expectLastCall();
+ when(activeTaskCreator.createTasks(any(), Mockito.eq(assignment)))
+ .thenReturn(asList(task00, task01, task02, task03));
expect(standbyTaskCreator.createTasks(eq(emptyMap()))).andStubReturn(emptyList());
- replay(activeTaskCreator, standbyTaskCreator);
+ replay(standbyTaskCreator);
taskManager.handleAssignment(assignment, emptyMap());
@@ -3068,8 +3033,9 @@ public class TaskManagerTest {
assertThat(task03.state(), is(Task.State.CLOSED));
assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
+ Mockito.verify(activeTaskCreator,
times(4)).closeAndRemoveTaskProducerIfNeeded(any());
// the active task creator should also get closed (so that it closes
the thread producer if applicable)
- verify(activeTaskCreator);
+ Mockito.verify(activeTaskCreator).closeThreadProducerIfNeeded();
}
@Test
@@ -3087,13 +3053,11 @@ public class TaskManagerTest {
final Map<TopicPartition, OffsetAndMetadata> offsets =
singletonMap(t1p0, new OffsetAndMetadata(0L, null));
task00.setCommittableOffsetsAndMetadata(offsets);
- expect(activeTaskCreator.createTasks(anyObject(),
eq(assignment))).andStubReturn(singletonList(task00));
- activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(eq(taskId00));
- expectLastCall().andThrow(new RuntimeException("whatever"));
- activeTaskCreator.closeThreadProducerIfNeeded();
- expectLastCall();
+ when(activeTaskCreator.createTasks(any(),
Mockito.eq(assignment))).thenReturn(singletonList(task00));
+ doThrow(new RuntimeException("whatever"))
+
.when(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
expect(standbyTaskCreator.createTasks(eq(emptyMap()))).andStubReturn(emptyList());
- replay(activeTaskCreator, standbyTaskCreator);
+ replay(standbyTaskCreator);
taskManager.handleAssignment(assignment, emptyMap());
@@ -3120,8 +3084,9 @@ public class TaskManagerTest {
assertThat(exception.getCause().getMessage(), is("whatever"));
assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
+
Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
// the active task creator should also get closed (so that it closes
the thread producer if applicable)
- verify(activeTaskCreator);
+ Mockito.verify(activeTaskCreator).closeThreadProducerIfNeeded();
}
@Test
@@ -3137,13 +3102,10 @@ public class TaskManagerTest {
}
};
- expect(activeTaskCreator.createTasks(anyObject(),
eq(assignment))).andStubReturn(singletonList(task00));
- activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(eq(taskId00));
- expectLastCall();
- activeTaskCreator.closeThreadProducerIfNeeded();
- expectLastCall().andThrow(new RuntimeException("whatever"));
+ when(activeTaskCreator.createTasks(any(),
Mockito.eq(assignment))).thenReturn(singletonList(task00));
+ doThrow(new
RuntimeException("whatever")).when(activeTaskCreator).closeThreadProducerIfNeeded();
expect(standbyTaskCreator.createTasks(eq(emptyMap()))).andStubReturn(emptyList());
- replay(activeTaskCreator, standbyTaskCreator);
+ replay(standbyTaskCreator);
taskManager.handleAssignment(assignment, emptyMap());
@@ -3171,7 +3133,7 @@ public class TaskManagerTest {
assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
// the active task creator should also get closed (so that it closes the thread producer if applicable)
- verify(activeTaskCreator);
+
Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
}
@Test
@@ -3223,7 +3185,6 @@ public class TaskManagerTest {
taskManager.addTask(task01);
taskManager.addTask(task02);
- replay(activeTaskCreator);
final RuntimeException thrown = assertThrows(RuntimeException.class,
() -> taskManager.handleRevocation(union(HashSet::new,
taskId01Partitions, taskId02Partitions)));
@@ -3233,7 +3194,7 @@ public class TaskManagerTest {
assertThat(task01.state(), is(Task.State.SUSPENDED));
assertThat(task02.state(), is(Task.State.SUSPENDED));
- verify(activeTaskCreator);
+ Mockito.verifyNoInteractions(activeTaskCreator);
}
@Test
@@ -3265,13 +3226,11 @@ public class TaskManagerTest {
}
};
- expect(activeTaskCreator.createTasks(anyObject(),
eq(assignment))).andStubReturn(asList(task00, task01, task02));
- activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(anyObject());
- expectLastCall().andThrow(new RuntimeException("whatever")).times(3);
- activeTaskCreator.closeThreadProducerIfNeeded();
- expectLastCall().andThrow(new RuntimeException("whatever all"));
+ when(activeTaskCreator.createTasks(any(),
Mockito.eq(assignment))).thenReturn(asList(task00, task01, task02));
+ doThrow(new
RuntimeException("whatever")).when(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(Mockito.any());
+ doThrow(new RuntimeException("whatever all")).when(activeTaskCreator).closeThreadProducerIfNeeded();
expect(standbyTaskCreator.createTasks(eq(emptyMap()))).andStubReturn(emptyList());
- replay(activeTaskCreator, standbyTaskCreator);
+ replay(standbyTaskCreator);
taskManager.handleAssignment(assignment, emptyMap());
@@ -3305,8 +3264,9 @@ public class TaskManagerTest {
assertThat(task02.state(), is(Task.State.CLOSED));
assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
+ Mockito.verify(activeTaskCreator,
times(3)).closeAndRemoveTaskProducerIfNeeded(any());
// the active task creator should also get closed (so that it closes the thread producer if applicable)
- verify(activeTaskCreator);
+ Mockito.verify(activeTaskCreator).closeThreadProducerIfNeeded();
}
@Test
@@ -3315,7 +3275,6 @@ public class TaskManagerTest {
final Task task00 = new StateMachineTask(taskId00, taskId00Partitions,
false);
// `handleAssignment`
- expect(activeTaskCreator.createTasks(anyObject(),
anyObject())).andStubReturn(Collections.emptySet());
expect(standbyTaskCreator.createTasks(eq(assignment))).andStubReturn(singletonList(task00));
// `tryToCompleteRestoration`
@@ -3326,10 +3285,8 @@ public class TaskManagerTest {
// `shutdown`
consumer.commitSync(Collections.emptyMap());
expectLastCall();
- activeTaskCreator.closeThreadProducerIfNeeded();
- expectLastCall();
- replay(consumer, activeTaskCreator, standbyTaskCreator);
+ replay(consumer, standbyTaskCreator);
taskManager.handleAssignment(emptyMap(), assignment);
assertThat(task00.state(), is(Task.State.CREATED));
@@ -3344,7 +3301,7 @@ public class TaskManagerTest {
assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
// the active task creator should also get closed (so that it closes the thread producer if applicable)
- verify(activeTaskCreator);
+ Mockito.verify(activeTaskCreator).closeThreadProducerIfNeeded();
}
@Test
@@ -3359,14 +3316,12 @@ public class TaskManagerTest {
new ExceptionAndTasks(mkSet(failedStatefulTask), new
RuntimeException()),
new ExceptionAndTasks(mkSet(failedStandbyTask), new
RuntimeException()))
);
-
activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(failedStatefulTask.id());
- activeTaskCreator.closeThreadProducerIfNeeded();
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
- replay(activeTaskCreator);
taskManager.shutdown(true);
- verify(activeTaskCreator);
+
Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(failedStatefulTask.id());
+ Mockito.verify(activeTaskCreator).closeThreadProducerIfNeeded();
Mockito.verify(stateUpdater).shutdown(Duration.ofMillis(Long.MAX_VALUE));
Mockito.verify(failedStatefulTask).prepareCommit();
Mockito.verify(failedStatefulTask).suspend();
@@ -3384,15 +3339,13 @@ public class TaskManagerTest {
final Set<Task> restoredTasks = restoredActiveTasks.stream().map(t ->
(Task) t).collect(Collectors.toSet());
when(stateUpdater.drainRestoredActiveTasks(Duration.ZERO)).thenReturn(restoredActiveTasks);
when(tasks.activeTasks()).thenReturn(restoredTasks);
-
activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(statefulTask1.id());
-
activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(statefulTask2.id());
- activeTaskCreator.closeThreadProducerIfNeeded();
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
- replay(activeTaskCreator);
taskManager.shutdown(true);
- verify(activeTaskCreator);
+
Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(statefulTask1.id());
+
Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(statefulTask2.id());
+ Mockito.verify(activeTaskCreator).closeThreadProducerIfNeeded();
Mockito.verify(stateUpdater).shutdown(Duration.ofMillis(Long.MAX_VALUE));
Mockito.verify(tasks).addActiveTasks(restoredTasks);
Mockito.verify(statefulTask1).closeClean();
@@ -3409,14 +3362,12 @@ public class TaskManagerTest {
when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(removedStandbyTask,
removedStatefulTask));
when(tasks.activeTasks()).thenReturn(mkSet(removedStatefulTask));
when(tasks.allTasks()).thenReturn(mkSet(removedStatefulTask,
removedStandbyTask));
-
activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(removedStatefulTask.id());
- activeTaskCreator.closeThreadProducerIfNeeded();
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
- replay(activeTaskCreator);
taskManager.shutdown(true);
- verify(activeTaskCreator);
+
Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(removedStatefulTask.id());
+ Mockito.verify(activeTaskCreator).closeThreadProducerIfNeeded();
Mockito.verify(stateUpdater).shutdown(Duration.ofMillis(Long.MAX_VALUE));
Mockito.verify(tasks).addActiveTasks(mkSet(removedStatefulTask));
Mockito.verify(tasks).addStandbyTasks(mkSet(removedStandbyTask));
@@ -3428,10 +3379,10 @@ public class TaskManagerTest {
public void shouldInitializeNewActiveTasks() {
final StateMachineTask task00 = new StateMachineTask(taskId00,
taskId00Partitions, true);
expectRestoreToBeCompleted(consumer);
- expect(activeTaskCreator.createTasks(anyObject(),
eq(taskId00Assignment)))
- .andStubReturn(singletonList(task00));
+ when(activeTaskCreator.createTasks(any(),
Mockito.eq(taskId00Assignment)))
+ .thenReturn(singletonList(task00));
expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
- replay(activeTaskCreator, standbyTaskCreator, consumer);
+ replay(standbyTaskCreator, consumer);
taskManager.handleAssignment(taskId00Assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(),
null), is(true));
@@ -3448,10 +3399,9 @@ public class TaskManagerTest {
final StateMachineTask task01 = new StateMachineTask(taskId01,
taskId01Partitions, false);
expectRestoreToBeCompleted(consumer);
- expect(activeTaskCreator.createTasks(anyObject(),
anyObject())).andStubReturn(Collections.emptySet());
expect(standbyTaskCreator.createTasks(eq(taskId01Assignment))).andStubReturn(singletonList(task01));
- replay(activeTaskCreator, standbyTaskCreator, consumer);
+ replay(standbyTaskCreator, consumer);
taskManager.handleAssignment(emptyMap(), taskId01Assignment);
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(),
null), is(true));
@@ -3484,14 +3434,14 @@ public class TaskManagerTest {
final StateMachineTask task01 = new StateMachineTask(taskId01,
taskId01Partitions, false);
expectRestoreToBeCompleted(consumer);
- expect(activeTaskCreator.createTasks(anyObject(),
eq(taskId00Assignment)))
- .andStubReturn(singletonList(task00));
+ when(activeTaskCreator.createTasks(any(),
Mockito.eq(taskId00Assignment)))
+ .thenReturn(singletonList(task00));
expect(standbyTaskCreator.createTasks(eq(taskId01Assignment)))
.andStubReturn(singletonList(task01));
consumer.commitSync(offsets);
expectLastCall();
- replay(activeTaskCreator, standbyTaskCreator, consumer);
+ replay(standbyTaskCreator, consumer);
taskManager.handleAssignment(taskId00Assignment, taskId01Assignment);
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(),
null), is(true));
@@ -3528,14 +3478,14 @@ public class TaskManagerTest {
);
expectRestoreToBeCompleted(consumer);
- expect(activeTaskCreator.createTasks(anyObject(),
eq(assignmentActive)))
- .andStubReturn(Arrays.asList(task00, task01, task02));
+ when(activeTaskCreator.createTasks(any(),
Mockito.eq(assignmentActive)))
+ .thenReturn(Arrays.asList(task00, task01, task02));
expect(standbyTaskCreator.createTasks(eq(assignmentStandby)))
.andStubReturn(Arrays.asList(task03, task04, task05));
consumer.commitSync(eq(emptyMap()));
- replay(activeTaskCreator, standbyTaskCreator, consumer);
+ replay(standbyTaskCreator, consumer);
taskManager.handleAssignment(assignmentActive, assignmentStandby);
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(),
null), is(true));
@@ -3562,11 +3512,10 @@ public class TaskManagerTest {
final StateMachineTask task00 = new StateMachineTask(taskId00,
taskId00Partitions, false);
expectRestoreToBeCompleted(consumer);
- expect(activeTaskCreator.createTasks(anyObject(),
anyObject())).andStubReturn(Collections.emptySet());
expect(standbyTaskCreator.createTasks(eq(taskId00Assignment))).andStubReturn(singletonList(task00));
expectLastCall();
- replay(activeTaskCreator, standbyTaskCreator, consumer);
+ replay(standbyTaskCreator, consumer);
taskManager.handleAssignment(Collections.emptyMap(),
taskId00Assignment);
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(),
null), is(true));
@@ -3587,12 +3536,12 @@ public class TaskManagerTest {
makeTaskFolders(taskId00.toString(), task01.toString());
expectLockObtainedFor(taskId00, taskId01);
expectRestoreToBeCompleted(consumer);
- expect(activeTaskCreator.createTasks(anyObject(),
eq(taskId00Assignment)))
- .andStubReturn(singletonList(task00));
+ when(activeTaskCreator.createTasks(any(),
Mockito.eq(taskId00Assignment)))
+ .thenReturn(singletonList(task00));
expect(standbyTaskCreator.createTasks(eq(taskId01Assignment)))
.andStubReturn(singletonList(task01));
- replay(activeTaskCreator, standbyTaskCreator, stateDirectory,
consumer);
+ replay(standbyTaskCreator, stateDirectory, consumer);
taskManager.handleAssignment(taskId00Assignment, taskId01Assignment);
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(),
null), is(true));
@@ -3636,9 +3585,8 @@ public class TaskManagerTest {
@Test
public void shouldCommitViaProducerIfEosAlphaEnabled() {
final StreamsProducer producer = EasyMock.mock(StreamsProducer.class);
-
expect(activeTaskCreator.streamsProducerForTask(anyObject(TaskId.class)))
- .andReturn(producer)
- .andReturn(producer);
+ when(activeTaskCreator.streamsProducerForTask(any(TaskId.class)))
+ .thenReturn(producer);
final Map<TopicPartition, OffsetAndMetadata> offsetsT01 =
singletonMap(t1p1, new OffsetAndMetadata(0L, null));
final Map<TopicPartition, OffsetAndMetadata> offsetsT02 =
singletonMap(t1p2, new OffsetAndMetadata(1L, null));
@@ -3654,7 +3602,7 @@ public class TaskManagerTest {
@Test
public void shouldCommitViaProducerIfEosV2Enabled() {
final StreamsProducer producer = EasyMock.mock(StreamsProducer.class);
- expect(activeTaskCreator.threadProducer()).andReturn(producer);
+ when(activeTaskCreator.threadProducer()).thenReturn(producer);
final Map<TopicPartition, OffsetAndMetadata> offsetsT01 =
singletonMap(t1p1, new OffsetAndMetadata(0L, null));
final Map<TopicPartition, OffsetAndMetadata> offsetsT02 =
singletonMap(t1p2, new OffsetAndMetadata(1L, null));
@@ -3685,7 +3633,7 @@ public class TaskManagerTest {
reset(consumer);
expect(consumer.groupMetadata()).andStubReturn(new
ConsumerGroupMetadata("appId"));
- replay(activeTaskCreator, consumer, producer);
+ replay(consumer, producer);
taskManager.commitAll();
@@ -3702,9 +3650,9 @@ public class TaskManagerTest {
};
expectRestoreToBeCompleted(consumer);
- expect(activeTaskCreator.createTasks(anyObject(),
eq(taskId00Assignment))).andStubReturn(singletonList(task00));
+ when(activeTaskCreator.createTasks(any(),
Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
- replay(activeTaskCreator, standbyTaskCreator, consumer);
+ replay(standbyTaskCreator, consumer);
taskManager.handleAssignment(taskId00Assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(),
null), is(true));
@@ -3728,10 +3676,9 @@ public class TaskManagerTest {
};
expectRestoreToBeCompleted(consumer);
- expect(activeTaskCreator.createTasks(anyObject(),
anyObject())).andStubReturn(Collections.emptySet());
expect(standbyTaskCreator.createTasks(eq(taskId01Assignment))).andStubReturn(singletonList(task01));
- replay(activeTaskCreator, standbyTaskCreator, consumer);
+ replay(standbyTaskCreator, consumer);
taskManager.handleAssignment(emptyMap(), taskId01Assignment);
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(),
null), is(true));
@@ -3763,10 +3710,10 @@ public class TaskManagerTest {
};
expectRestoreToBeCompleted(consumer);
- expect(activeTaskCreator.createTasks(anyObject(),
eq(taskId00Assignment))).andStubReturn(singletonList(task00));
+ when(activeTaskCreator.createTasks(any(),
Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
- replay(activeTaskCreator, standbyTaskCreator, consumer);
+ replay(standbyTaskCreator, consumer);
taskManager.handleAssignment(taskId00Assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(),
null), is(true));
@@ -3799,9 +3746,9 @@ public class TaskManagerTest {
};
expectRestoreToBeCompleted(consumer);
- expect(activeTaskCreator.createTasks(anyObject(),
eq(taskId00Assignment))).andStubReturn(singletonList(task00));
+ when(activeTaskCreator.createTasks(any(),
Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
- replay(activeTaskCreator, standbyTaskCreator, consumer);
+ replay(standbyTaskCreator, consumer);
taskManager.handleAssignment(taskId00Assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(),
null), is(true));
@@ -3831,7 +3778,7 @@ public class TaskManagerTest {
futureDeletedRecords.completeExceptionally(new Exception("KABOOM!"));
expect(adminClient.deleteRecords(anyObject())).andReturn(deleteRecordsResult).times(2);
- replay(activeTaskCreator, adminClient, consumer);
+ replay(adminClient, consumer);
taskManager.addTask(task00);
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(),
null), is(true));
@@ -3876,14 +3823,14 @@ public class TaskManagerTest {
);
expectRestoreToBeCompleted(consumer);
- expect(activeTaskCreator.createTasks(anyObject(),
eq(assignmentActive)))
- .andStubReturn(asList(task00, task01, task02, task03));
+ when(activeTaskCreator.createTasks(any(),
Mockito.eq(assignmentActive)))
+ .thenReturn(asList(task00, task01, task02, task03));
expect(standbyTaskCreator.createTasks(eq(assignmentStandby)))
.andStubReturn(singletonList(task04));
consumer.commitSync(expectedCommittedOffsets);
expectLastCall();
- replay(activeTaskCreator, standbyTaskCreator, consumer);
+ replay(standbyTaskCreator, consumer);
taskManager.handleAssignment(assignmentActive, assignmentStandby);
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(),
null), is(true));
@@ -3920,9 +3867,10 @@ public class TaskManagerTest {
assignment.put(taskId01, taskId01Partitions);
expectRestoreToBeCompleted(consumer);
- expect(activeTaskCreator.createTasks(anyObject(),
eq(assignment))).andStubReturn(Arrays.asList(task00, task01));
+ when(activeTaskCreator.createTasks(any(), Mockito.eq(assignment)))
+ .thenReturn(Arrays.asList(task00, task01));
expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
- replay(activeTaskCreator, standbyTaskCreator, consumer);
+ replay(standbyTaskCreator, consumer);
taskManager.handleAssignment(assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(),
null), is(true));
@@ -3955,7 +3903,7 @@ public class TaskManagerTest {
// check that we should be processing at most max num records
assertThat(taskManager.process(3, time), is(6));
- // check that if there's no records proccssible, we would stop early
+ // check that if there's no records processable, we would stop early
assertThat(taskManager.process(3, time), is(5));
assertThat(taskManager.process(3, time), is(0));
}
@@ -4033,9 +3981,9 @@ public class TaskManagerTest {
};
expectRestoreToBeCompleted(consumer);
- expect(activeTaskCreator.createTasks(anyObject(),
eq(taskId00Assignment))).andStubReturn(singletonList(task00));
+ when(activeTaskCreator.createTasks(any(),
Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
- replay(activeTaskCreator, standbyTaskCreator, consumer);
+ replay(standbyTaskCreator, consumer);
taskManager.handleAssignment(taskId00Assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(),
null), is(true));
@@ -4058,10 +4006,10 @@ public class TaskManagerTest {
};
expectRestoreToBeCompleted(consumer);
- expect(activeTaskCreator.createTasks(anyObject(),
eq(taskId00Assignment)))
- .andStubReturn(singletonList(task00));
+ when(activeTaskCreator.createTasks(any(),
Mockito.eq(taskId00Assignment)))
+ .thenReturn(singletonList(task00));
expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
- replay(activeTaskCreator, standbyTaskCreator, consumer);
+ replay(standbyTaskCreator, consumer);
taskManager.handleAssignment(taskId00Assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(),
null), is(true));
@@ -4087,9 +4035,9 @@ public class TaskManagerTest {
};
expectRestoreToBeCompleted(consumer);
- expect(activeTaskCreator.createTasks(anyObject(),
eq(taskId00Assignment))).andStubReturn(singletonList(task00));
+ when(activeTaskCreator.createTasks(any(),
Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
- replay(activeTaskCreator, standbyTaskCreator, consumer);
+ replay(standbyTaskCreator, consumer);
taskManager.handleAssignment(taskId00Assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(),
null), is(true));
@@ -4109,9 +4057,9 @@ public class TaskManagerTest {
};
expectRestoreToBeCompleted(consumer);
- expect(activeTaskCreator.createTasks(anyObject(),
eq(taskId00Assignment))).andStubReturn(singletonList(task00));
+ when(activeTaskCreator.createTasks(any(),
Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
- replay(activeTaskCreator, standbyTaskCreator, consumer);
+ replay(standbyTaskCreator, consumer);
taskManager.handleAssignment(taskId00Assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(),
null), is(true));
@@ -4136,11 +4084,10 @@ public class TaskManagerTest {
};
expectRestoreToBeCompleted(consumer);
- expect(activeTaskCreator.createTasks(anyObject(),
eq(taskId00Assignment)))
- .andStubReturn(singletonList(task00));
+ when(activeTaskCreator.createTasks(any(),
Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
expect(standbyTaskCreator.createTasks(anyObject()))
.andStubReturn(Collections.emptySet());
- replay(activeTaskCreator, standbyTaskCreator, consumer);
+ replay(standbyTaskCreator, consumer);
taskManager.handleAssignment(taskId00Assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(),
null), is(true));
@@ -4160,9 +4107,9 @@ public class TaskManagerTest {
}
};
- expect(activeTaskCreator.createTasks(anyObject(),
eq(taskId00Assignment))).andStubReturn(singletonList(task00));
+ when(activeTaskCreator.createTasks(any(),
Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
- replay(activeTaskCreator, standbyTaskCreator, consumer);
+ replay(standbyTaskCreator, consumer);
taskManager.handleAssignment(taskId00Assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(),
null), is(false));
@@ -4179,12 +4126,12 @@ public class TaskManagerTest {
task00.setCommittableOffsetsAndMetadata(offsets);
expectRestoreToBeCompleted(consumer);
- expect(activeTaskCreator.createTasks(anyObject(),
eq(taskId00Assignment))).andReturn(singletonList(task00));
+ when(activeTaskCreator.createTasks(any(),
Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
consumer.commitSync(offsets);
expectLastCall();
- replay(activeTaskCreator, standbyTaskCreator, consumer);
+ replay(standbyTaskCreator, consumer);
try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(TaskManager.class)) {
LogCaptureAppender.setClassLoggerToDebug(TaskManager.class);
@@ -4311,8 +4258,7 @@ public class TaskManagerTest {
new MockTime());
final Map<MetricName, Metric> dummyProducerMetrics =
singletonMap(testMetricName, testMetric);
-
expect(activeTaskCreator.producerMetrics()).andReturn(dummyProducerMetrics);
- replay(activeTaskCreator);
+
when(activeTaskCreator.producerMetrics()).thenReturn(dummyProducerMetrics);
assertThat(taskManager.producerMetrics(), is(dummyProducerMetrics));
}
@@ -4339,10 +4285,10 @@ public class TaskManagerTest {
allActiveTasks.addAll(restoringTasks);
expect(standbyTaskCreator.createTasks(eq(standbyAssignment))).andStubReturn(standbyTasks);
- expect(activeTaskCreator.createTasks(anyObject(),
eq(allActiveTasksAssignment))).andStubReturn(allActiveTasks);
+ when(activeTaskCreator.createTasks(any(),
Mockito.eq(allActiveTasksAssignment))).thenReturn(allActiveTasks);
expectRestoreToBeCompleted(consumer);
- replay(activeTaskCreator, standbyTaskCreator, consumer);
+ replay(standbyTaskCreator, consumer);
taskManager.handleAssignment(allActiveTasksAssignment,
standbyAssignment);
taskManager.tryToCompleteRestoration(time.milliseconds(), null);
@@ -4448,10 +4394,7 @@ public class TaskManagerTest {
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.EXACTLY_ONCE_ALPHA, tasks, false);
final StreamsProducer producer = mock(StreamsProducer.class);
-
expect(activeTaskCreator.streamsProducerForTask(anyObject(TaskId.class)))
- .andReturn(producer)
- .andReturn(producer)
- .andReturn(producer);
+
when(activeTaskCreator.streamsProducerForTask(any(TaskId.class))).thenReturn(producer);
final Map<TopicPartition, OffsetAndMetadata> offsetsT00 =
singletonMap(t1p0, new OffsetAndMetadata(0L, null));
final Map<TopicPartition, OffsetAndMetadata> offsetsT01 =
singletonMap(t1p1, new OffsetAndMetadata(1L, null));
@@ -4473,7 +4416,7 @@ public class TaskManagerTest {
when(tasks.allTasks()).thenReturn(mkSet(task00, task01, task02));
expect(consumer.groupMetadata()).andStubReturn(null);
- replay(activeTaskCreator, consumer);
+ replay(consumer);
task00.setCommitNeeded();
task01.setCommitNeeded();
@@ -4493,9 +4436,7 @@ public class TaskManagerTest {
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
final StreamsProducer producer = mock(StreamsProducer.class);
- expect(activeTaskCreator.threadProducer())
- .andReturn(producer)
- .andReturn(producer);
+ when(activeTaskCreator.threadProducer()).thenReturn(producer);
final Map<TopicPartition, OffsetAndMetadata> offsetsT00 =
singletonMap(t1p0, new OffsetAndMetadata(0L, null));
final Map<TopicPartition, OffsetAndMetadata> offsetsT01 =
singletonMap(t1p1, new OffsetAndMetadata(1L, null));
@@ -4511,7 +4452,7 @@ public class TaskManagerTest {
final StateMachineTask task02 = new StateMachineTask(taskId02,
taskId02Partitions, true);
expect(consumer.groupMetadata()).andStubReturn(null);
- replay(activeTaskCreator, consumer);
+ replay(consumer);
task00.setCommitNeeded();
task01.setCommitNeeded();
@@ -4582,9 +4523,9 @@ public class TaskManagerTest {
final Map<TaskId, Set<TopicPartition>> assignment = new
HashMap<>(taskId00Assignment);
assignment.putAll(taskId01Assignment);
- expect(activeTaskCreator.createTasks(anyObject(),
eq(assignment))).andReturn(asList(task00, task01));
+ when(activeTaskCreator.createTasks(any(),
Mockito.eq(assignment))).thenReturn(asList(task00, task01));
expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
- replay(activeTaskCreator, standbyTaskCreator, consumer);
+ replay(standbyTaskCreator, consumer);
taskManager.handleAssignment(assignment, Collections.emptyMap());
@@ -4608,21 +4549,20 @@ public class TaskManagerTest {
final StandbyTask standbyTask = EasyMock.mock(StandbyTask.class);
expect(standbyTask.id()).andStubReturn(taskId00);
- expect(activeTaskCreator.createTasks(anyObject(),
eq(taskId00Assignment))).andReturn(singletonList(activeTask));
+ when(activeTaskCreator.createTasks(any(),
Mockito.eq(taskId00Assignment))).thenReturn(singletonList(activeTask));
expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
activeTask.prepareRecycle();
expectLastCall().once();
- activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId00);
- expectLastCall().anyTimes();
expect(standbyTaskCreator.createStandbyTaskFromActive(anyObject(),
eq(taskId00Partitions))).andReturn(standbyTask);
- expect(activeTaskCreator.createTasks(anyObject(),
eq(Collections.emptyMap()))).andReturn(Collections.emptySet());
- replay(activeTask, standbyTask, activeTaskCreator, standbyTaskCreator,
consumer);
+ replay(activeTask, standbyTask, standbyTaskCreator, consumer);
taskManager.handleAssignment(taskId00Assignment,
Collections.emptyMap());
taskManager.handleAssignment(Collections.emptyMap(),
taskId00Assignment);
- verify(activeTaskCreator, standbyTaskCreator);
+ verify(standbyTaskCreator);
+
Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
+ Mockito.verify(activeTaskCreator).createTasks(any(),
Mockito.eq(emptyMap()));
}
@Test
@@ -4635,19 +4575,18 @@ public class TaskManagerTest {
final StreamTask activeTask = mock(StreamTask.class);
when(activeTask.id()).thenReturn(taskId00);
when(activeTask.inputPartitions()).thenReturn(taskId00Partitions);
-
- expect(activeTaskCreator.createTasks(anyObject(),
eq(Collections.emptyMap()))).andReturn(Collections.emptySet());
expect(standbyTaskCreator.createTasks(eq(taskId00Assignment))).andReturn(singletonList(standbyTask));
- expect(activeTaskCreator.createActiveTaskFromStandby(eq(standbyTask),
eq(taskId00Partitions), anyObject())).andReturn(activeTask);
- expect(activeTaskCreator.createTasks(anyObject(),
eq(Collections.emptyMap()))).andReturn(Collections.emptySet());
+
when(activeTaskCreator.createActiveTaskFromStandby(Mockito.eq(standbyTask),
Mockito.eq(taskId00Partitions), any()))
+ .thenReturn(activeTask);
expect(standbyTaskCreator.createTasks(eq(Collections.emptyMap()))).andReturn(Collections.emptySet());
- replay(standbyTaskCreator, activeTaskCreator, consumer);
+ replay(standbyTaskCreator, consumer);
taskManager.handleAssignment(Collections.emptyMap(),
taskId00Assignment);
taskManager.handleAssignment(taskId00Assignment,
Collections.emptyMap());
- verify(standbyTaskCreator, activeTaskCreator);
+ verify(standbyTaskCreator);
+ Mockito.verify(activeTaskCreator, times(2)).createTasks(any(),
Mockito.eq(emptyMap()));
}
@Test