divijvaidya commented on code in PR #13873:
URL: https://github.com/apache/kafka/pull/13873#discussion_r1234973245
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1046,15 +1036,13 @@ public void shouldTransitRestoredTaskToRunning() {
.withInputPartitions(taskId00Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTransitionToRunningOfRestoredTask(task, tasks);
- consumer.resume(task.inputPartitions());
- replay(consumer);
taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
Mockito.verify(task).completeRestoration(noOpResetter);
Mockito.verify(task).clearTaskTimeout();
Mockito.verify(tasks).addTask(task);
- verify(consumer);
+ Mockito.verify(consumer).resume(task.inputPartitions());
Review Comment:
Could you please add `verifyNoMoreInteractions` for the consumer mock here? Asking
because it looks like there should be more than one invocation of
consumer.resume() in this test, but we are only verifying one.
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -4521,11 +4385,18 @@ public void shouldListNotPausedTasks() {
assertEquals(taskManager.notPausedTasks().size(), 0);
}
- private static void expectRestoreToBeCompleted(final Consumer<byte[],
byte[]> consumer) {
+ private static void expectAssignmentToBeCalled(final Consumer<byte[],
byte[]> consumer) {
final Set<TopicPartition> assignment = singleton(new
TopicPartition("assignment", 0));
- expect(consumer.assignment()).andReturn(assignment);
- consumer.resume(assignment);
- expectLastCall();
+ when(consumer.assignment()).thenReturn(assignment);
+ }
+
+ private static void verifyResumeWasCalled(final Consumer<byte[], byte[]>
consumer) {
+ final Set<TopicPartition> assignment = singleton(new
TopicPartition("assignment", 0));
+ Mockito.verify(consumer, atLeastOnce()).resume(assignment);
+ }
+
+ private static void verifyResumeWasCalledWith(final Consumer<byte[],
byte[]> consumer, Set<TopicPartition> assignment) {
Review Comment:
s/verifyResumeWasCalledWith/verifyResumeWasCalledWithAssignment/
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -4521,11 +4385,18 @@ public void shouldListNotPausedTasks() {
assertEquals(taskManager.notPausedTasks().size(), 0);
}
- private static void expectRestoreToBeCompleted(final Consumer<byte[],
byte[]> consumer) {
+ private static void expectAssignmentToBeCalled(final Consumer<byte[],
byte[]> consumer) {
final Set<TopicPartition> assignment = singleton(new
TopicPartition("assignment", 0));
- expect(consumer.assignment()).andReturn(assignment);
- consumer.resume(assignment);
- expectLastCall();
+ when(consumer.assignment()).thenReturn(assignment);
+ }
+
+ private static void verifyResumeWasCalled(final Consumer<byte[], byte[]>
consumer) {
+ final Set<TopicPartition> assignment = singleton(new
TopicPartition("assignment", 0));
+ Mockito.verify(consumer, atLeastOnce()).resume(assignment);
Review Comment:
Why are we checking for atLeastOnce and not the exact number of times? Isn't this
relaxing the constraints compared to what we were doing with EasyMock?
(Same for verifyResumeWasCalledWith.)
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -918,9 +915,6 @@ public void
shouldHandleMultipleRemovedTasksFromStateUpdater() {
.thenReturn(convertedTask1);
when(standbyTaskCreator.createStandbyTaskFromActive(taskToRecycle0,
taskId00Partitions))
.thenReturn(convertedTask0);
- expect(consumer.assignment()).andReturn(emptySet()).anyTimes();
Review Comment:
Was this an unnecessary stub? Asking because I didn't see a Mockito stub for
consumer.assignment().
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -3219,17 +3128,6 @@ public void shouldCloseStandbyTasksOnShutdown() {
// `handleAssignment`
when(standbyTaskCreator.createTasks(assignment)).thenReturn(singletonList(task00));
- // `tryToCompleteRestoration`
- expect(consumer.assignment()).andReturn(emptySet());
- consumer.resume(eq(emptySet()));
- expectLastCall();
-
- // `shutdown`
- consumer.commitSync(Collections.emptyMap());
Review Comment:
Should we verify this `consumer.commitSync(Collections.emptyMap())` call with `Mockito.verify`?
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2688,20 +2614,8 @@ public void
shouldCommitAllActiveTasksThatNeedCommittingOnHandleRevocationWithEo
.thenReturn(singletonList(task10));
final ConsumerGroupMetadata groupMetadata = new
ConsumerGroupMetadata("appId");
- expect(consumer.groupMetadata()).andReturn(groupMetadata);
- producer.commitTransaction(expectedCommittedOffsets, groupMetadata);
- expectLastCall();
-
- task00.committedOffsets();
- EasyMock.expectLastCall();
- task01.committedOffsets();
- EasyMock.expectLastCall();
- task02.committedOffsets();
- EasyMock.expectLastCall();
- task10.committedOffsets();
- EasyMock.expectLastCall();
Review Comment:
Should we verify these calls with `Mockito.verify`?
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -4050,12 +3928,8 @@ public void shouldHaveRemainingPartitionsUncleared() {
final Map<TopicPartition, OffsetAndMetadata> offsets =
singletonMap(t1p0, new OffsetAndMetadata(0L, null));
task00.setCommittableOffsetsAndMetadata(offsets);
- expectRestoreToBeCompleted(consumer);
+ expectAssignmentToBeCalled(consumer);
when(activeTaskCreator.createTasks(any(),
Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
- consumer.commitSync(offsets);
Review Comment:
Should we verify this `consumer.commitSync(offsets)` call with `Mockito.verify`?
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -3418,16 +3311,12 @@ public void shouldCommitProvidedTasksIfNeeded() {
mkEntry(taskId05, taskId05Partitions)
);
- expectRestoreToBeCompleted(consumer);
+ expectAssignmentToBeCalled(consumer);
when(activeTaskCreator.createTasks(any(),
Mockito.eq(assignmentActive)))
.thenReturn(Arrays.asList(task00, task01, task02));
when(standbyTaskCreator.createTasks(assignmentStandby))
.thenReturn(Arrays.asList(task03, task04, task05));
- consumer.commitSync(eq(emptyMap()));
Review Comment:
Should we verify this `consumer.commitSync(emptyMap())` call with `Mockito.verify`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]