C0urante commented on code in PR #12735:
URL: https://github.com/apache/kafka/pull/12735#discussion_r998501094


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java:
##########
@@ -197,10 +191,11 @@ public void setup() {
         workerProps.put("value.converter", 
"org.apache.kafka.connect.json.JsonConverter");
         workerProps.put("offset.storage.file.filename", 
"/tmp/connect.offsets");
         workerProps.put(TOPIC_CREATION_ENABLE_CONFIG, 
String.valueOf(enableTopicCreation));
-        pluginLoader = PowerMock.createMock(PluginClassLoader.class);
+        pluginLoader = mock(PluginClassLoader.class);

Review Comment:
   This line can be removed; we already instantiate the loader before each test 
since it's annotated with `@Mock`.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java:
##########
@@ -176,17 +174,13 @@ public class ErrorHandlingTaskTest {
     private WorkerErrantRecordReporter workerErrantRecordReporter;
 
     private ErrorHandlingMetrics errorHandlingMetrics;
-
     private boolean enableTopicCreation;
-
+    

Review Comment:
   Can we revert unnecessary whitespace changes like this one?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java:
##########
@@ -197,10 +191,11 @@ public void setup() {
         workerProps.put("value.converter", 
"org.apache.kafka.connect.json.JsonConverter");
         workerProps.put("offset.storage.file.filename", 
"/tmp/connect.offsets");
         workerProps.put(TOPIC_CREATION_ENABLE_CONFIG, 
String.valueOf(enableTopicCreation));
-        pluginLoader = PowerMock.createMock(PluginClassLoader.class);
+        pluginLoader = mock(PluginClassLoader.class);
         workerConfig = new StandaloneConfig(workerProps);
         sourceConfig = new SourceConnectorConfig(plugins, 
sourceConnectorProps(TOPIC), true);
         errorHandlingMetrics = new ErrorHandlingMetrics(taskId, metrics);
+

Review Comment:
   Can we revert unnecessary whitespace changes like this one?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java:
##########
@@ -316,28 +290,29 @@ public void testErrorHandlingInSinkTasks() throws 
Exception {
         retryWithToleranceOperator.reporters(singletonList(reporter));
         createSinkTask(initialState, retryWithToleranceOperator);
 
-        expectInitializeTask();
-        expectTaskGetTopic(true);
 

Review Comment:
   Can remove this empty line :)



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java:
##########
@@ -494,72 +494,27 @@ private void assertErrorHandlingMetricValue(String name, 
double expected) {
         assertEquals(expected, measured, 0.001d);
     }
 
-    private void expectInitializeTask() {
-        consumer.subscribe(EasyMock.eq(singletonList(TOPIC)), 
EasyMock.capture(rebalanceListener));
-        PowerMock.expectLastCall();
-
-        sinkTask.initialize(EasyMock.capture(sinkTaskContext));
-        PowerMock.expectLastCall();
-        sinkTask.start(TASK_PROPS);
-        PowerMock.expectLastCall();
-    }
-
-    private void expectTaskGetTopic(boolean anyTimes) {
-        final Capture<String> connectorCapture = EasyMock.newCapture();
-        final Capture<String> topicCapture = EasyMock.newCapture();
-        IExpectationSetters<TopicStatus> expect = 
EasyMock.expect(statusBackingStore.getTopic(
-                EasyMock.capture(connectorCapture),
-                EasyMock.capture(topicCapture)));
-        if (anyTimes) {
-            expect.andStubAnswer(() -> new TopicStatus(
-                    topicCapture.getValue(),
-                    new ConnectorTaskId(connectorCapture.getValue(), 0),
-                    Time.SYSTEM.milliseconds()));
-        } else {
-            expect.andAnswer(() -> new TopicStatus(
-                    topicCapture.getValue(),
-                    new ConnectorTaskId(connectorCapture.getValue(), 0),
-                    Time.SYSTEM.milliseconds()));
-        }
-        if (connectorCapture.hasCaptured() && topicCapture.hasCaptured()) {
-            assertEquals("job", connectorCapture.getValue());
-            assertEquals(TOPIC, topicCapture.getValue());
-        }
-    }
-
-    private void expectClose() {
-        producer.close(EasyMock.anyObject(Duration.class));
-        EasyMock.expectLastCall();
-
-        admin.close(EasyMock.anyObject(Duration.class));
-        EasyMock.expectLastCall();
+    private void verifyClose() throws IOException {
 
-        offsetReader.close();
-        EasyMock.expectLastCall();
-
-        offsetStore.stop();
-        EasyMock.expectLastCall();
-
-        try {
-            headerConverter.close();
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-        EasyMock.expectLastCall();
+        verify(producer).close(any(Duration.class));

Review Comment:
   Can remove this empty line :)



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java:
##########
@@ -438,30 +423,27 @@ public void 
testErrorHandlingInSourceTasksWthBadConverter() throws Exception {
         Struct struct2 = new Struct(valSchema).put("val", 6789);
         SourceRecord record2 = new SourceRecord(emptyMap(), emptyMap(), TOPIC, 
PARTITION1, valSchema, struct2);
 
-        EasyMock.expect(workerSourceTask.isStopping()).andReturn(false);
-        EasyMock.expect(workerSourceTask.isStopping()).andReturn(false);
-        EasyMock.expect(workerSourceTask.isStopping()).andReturn(true);
-
-        EasyMock.expect(workerSourceTask.commitOffsets()).andReturn(true);
+        when(workerSourceTask.isStopping())
+                .thenReturn(false)
+                .thenReturn(false)
+                .thenReturn(true);
 
-        offsetStore.start();
-        EasyMock.expectLastCall();
-        sourceTask.initialize(EasyMock.anyObject());
-        EasyMock.expectLastCall();
-        sourceTask.start(EasyMock.anyObject());
-        EasyMock.expectLastCall();
+        doReturn(true).when(workerSourceTask).commitOffsets();
 
-        EasyMock.expect(sourceTask.poll()).andReturn(singletonList(record1));
-        EasyMock.expect(sourceTask.poll()).andReturn(singletonList(record2));
+        when(sourceTask.poll())
+                .thenReturn(singletonList(record1))
+                .thenReturn(singletonList(record2));
         expectTopicCreation(TOPIC);
-        EasyMock.expect(producer.send(EasyMock.anyObject(), 
EasyMock.anyObject())).andReturn(null).times(2);
 
-        PowerMock.replayAll();
+        when(producer.send(any(), any()))
+                .thenReturn(null)
+                .thenReturn(null);

Review Comment:
   Same comment as on `testErrorHandlingInSourceTasks` RE not needing to set up 
these expectations: Mockito will automatically return `null` from this method.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java:
##########
@@ -374,30 +350,40 @@ public void testErrorHandlingInSourceTasks() throws 
Exception {
         Struct struct2 = new Struct(valSchema).put("val", 6789);
         SourceRecord record2 = new SourceRecord(emptyMap(), emptyMap(), TOPIC, 
PARTITION1, valSchema, struct2);
 
-        EasyMock.expect(workerSourceTask.isStopping()).andReturn(false);
-        EasyMock.expect(workerSourceTask.isStopping()).andReturn(false);
-        EasyMock.expect(workerSourceTask.isStopping()).andReturn(true);
+        when(workerSourceTask.isStopping())
+                .thenReturn(false)
+                .thenReturn(false)
+                .thenReturn(true);
 
-        EasyMock.expect(workerSourceTask.commitOffsets()).andReturn(true);
+        doReturn(true).when(workerSourceTask).commitOffsets();
 
-        offsetStore.start();
-        EasyMock.expectLastCall();
-        sourceTask.initialize(EasyMock.anyObject());
-        EasyMock.expectLastCall();
-        sourceTask.start(EasyMock.anyObject());
-        EasyMock.expectLastCall();
+        when(sourceTask.poll())
+                .thenReturn(singletonList(record1))
+                .thenReturn(singletonList(record2));
 
-        EasyMock.expect(sourceTask.poll()).andReturn(singletonList(record1));
-        EasyMock.expect(sourceTask.poll()).andReturn(singletonList(record2));
         expectTopicCreation(TOPIC);
-        EasyMock.expect(producer.send(EasyMock.anyObject(), 
EasyMock.anyObject())).andReturn(null).times(2);
 
-        PowerMock.replayAll();
+        when(producer.send(any(), any()))
+                .thenReturn(null)
+                .thenReturn(null);
 
         workerSourceTask.initialize(TASK_CONFIG);
         workerSourceTask.initializeAndStart();
         workerSourceTask.execute();
 
+        verify(workerSourceTask, times(3)).isStopping();
+        verify(workerSourceTask).commitOffsets();
+
+        verify(offsetStore).start();
+
+        verify(sourceTask).initialize(any());
+        verify(sourceTask).start(any());
+
+        verify(sourceTask, times(2)).poll();
+
+        verify(producer, times(2)).send(any(), any());
+        assertEquals(null, producer.send(any()));

Review Comment:
   What's the point of this assertion? Aren't we the ones that control the 
behavior of the producer, since it's a mock?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java:
##########
@@ -494,72 +494,27 @@ private void assertErrorHandlingMetricValue(String name, 
double expected) {
         assertEquals(expected, measured, 0.001d);
     }
 
-    private void expectInitializeTask() {
-        consumer.subscribe(EasyMock.eq(singletonList(TOPIC)), 
EasyMock.capture(rebalanceListener));
-        PowerMock.expectLastCall();
-
-        sinkTask.initialize(EasyMock.capture(sinkTaskContext));
-        PowerMock.expectLastCall();
-        sinkTask.start(TASK_PROPS);
-        PowerMock.expectLastCall();
-    }
-
-    private void expectTaskGetTopic(boolean anyTimes) {
-        final Capture<String> connectorCapture = EasyMock.newCapture();
-        final Capture<String> topicCapture = EasyMock.newCapture();
-        IExpectationSetters<TopicStatus> expect = 
EasyMock.expect(statusBackingStore.getTopic(
-                EasyMock.capture(connectorCapture),
-                EasyMock.capture(topicCapture)));
-        if (anyTimes) {
-            expect.andStubAnswer(() -> new TopicStatus(
-                    topicCapture.getValue(),
-                    new ConnectorTaskId(connectorCapture.getValue(), 0),
-                    Time.SYSTEM.milliseconds()));
-        } else {
-            expect.andAnswer(() -> new TopicStatus(
-                    topicCapture.getValue(),
-                    new ConnectorTaskId(connectorCapture.getValue(), 0),
-                    Time.SYSTEM.milliseconds()));
-        }
-        if (connectorCapture.hasCaptured() && topicCapture.hasCaptured()) {
-            assertEquals("job", connectorCapture.getValue());
-            assertEquals(TOPIC, topicCapture.getValue());
-        }
-    }
-
-    private void expectClose() {
-        producer.close(EasyMock.anyObject(Duration.class));
-        EasyMock.expectLastCall();
-
-        admin.close(EasyMock.anyObject(Duration.class));
-        EasyMock.expectLastCall();
+    private void verifyClose() throws IOException {

Review Comment:
   This method appears to only apply to source tasks; can we rename it to 
something like `verifyCloseSource`?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java:
##########
@@ -482,6 +475,13 @@ private void assertSinkMetricValue(String name, double 
expected) {
         assertEquals(expected, measured, 0.001d);
     }
 
+    private void verifyInitializeTask() {

Review Comment:
   This method appears to only apply to sink tasks; can we rename it to 
something like `verifyInitializeSink`?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java:
##########
@@ -494,72 +494,27 @@ private void assertErrorHandlingMetricValue(String name, 
double expected) {
         assertEquals(expected, measured, 0.001d);
     }
 
-    private void expectInitializeTask() {
-        consumer.subscribe(EasyMock.eq(singletonList(TOPIC)), 
EasyMock.capture(rebalanceListener));
-        PowerMock.expectLastCall();
-
-        sinkTask.initialize(EasyMock.capture(sinkTaskContext));
-        PowerMock.expectLastCall();
-        sinkTask.start(TASK_PROPS);
-        PowerMock.expectLastCall();
-    }
-
-    private void expectTaskGetTopic(boolean anyTimes) {
-        final Capture<String> connectorCapture = EasyMock.newCapture();
-        final Capture<String> topicCapture = EasyMock.newCapture();
-        IExpectationSetters<TopicStatus> expect = 
EasyMock.expect(statusBackingStore.getTopic(
-                EasyMock.capture(connectorCapture),
-                EasyMock.capture(topicCapture)));
-        if (anyTimes) {
-            expect.andStubAnswer(() -> new TopicStatus(
-                    topicCapture.getValue(),
-                    new ConnectorTaskId(connectorCapture.getValue(), 0),
-                    Time.SYSTEM.milliseconds()));
-        } else {
-            expect.andAnswer(() -> new TopicStatus(
-                    topicCapture.getValue(),
-                    new ConnectorTaskId(connectorCapture.getValue(), 0),
-                    Time.SYSTEM.milliseconds()));
-        }
-        if (connectorCapture.hasCaptured() && topicCapture.hasCaptured()) {
-            assertEquals("job", connectorCapture.getValue());
-            assertEquals(TOPIC, topicCapture.getValue());
-        }
-    }
-
-    private void expectClose() {
-        producer.close(EasyMock.anyObject(Duration.class));
-        EasyMock.expectLastCall();
-
-        admin.close(EasyMock.anyObject(Duration.class));
-        EasyMock.expectLastCall();
+    private void verifyClose() throws IOException {
 
-        offsetReader.close();
-        EasyMock.expectLastCall();
-
-        offsetStore.stop();
-        EasyMock.expectLastCall();
-
-        try {
-            headerConverter.close();
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-        EasyMock.expectLastCall();
+        verify(producer).close(any(Duration.class));
+        verify(admin).close(any(Duration.class));
+        verify(offsetReader).close();
+        verify(offsetStore).stop();
+        // headerConverter.close() can throw IOException
+        verify(headerConverter).close();
     }
 
     private void expectTopicCreation(String topic) {
         if (workerConfig.topicCreationEnable()) {

Review Comment:
   This isn't your fault, but this testing method is needlessly complicated and 
covers cases that will never occur.
   
   We can (and should) simplify it:
   ```java
       private void expectTopicCreation(String topic) {
           if (enableTopicCreation) {
               when(admin.describeTopics(topic)).thenReturn(Collections.emptyMap());
               Set<String> created = Collections.singleton(topic);
               Set<String> existing = Collections.emptySet();
               TopicAdmin.TopicCreationResponse response = new TopicAdmin.TopicCreationResponse(created, existing);
               when(admin.createOrFindTopics(any(NewTopic.class))).thenReturn(response);
           }
       }
   ```



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java:
##########
@@ -438,30 +423,27 @@ public void 
testErrorHandlingInSourceTasksWthBadConverter() throws Exception {
         Struct struct2 = new Struct(valSchema).put("val", 6789);
         SourceRecord record2 = new SourceRecord(emptyMap(), emptyMap(), TOPIC, 
PARTITION1, valSchema, struct2);
 
-        EasyMock.expect(workerSourceTask.isStopping()).andReturn(false);
-        EasyMock.expect(workerSourceTask.isStopping()).andReturn(false);
-        EasyMock.expect(workerSourceTask.isStopping()).andReturn(true);
-
-        EasyMock.expect(workerSourceTask.commitOffsets()).andReturn(true);
+        when(workerSourceTask.isStopping())
+                .thenReturn(false)
+                .thenReturn(false)
+                .thenReturn(true);
 
-        offsetStore.start();
-        EasyMock.expectLastCall();
-        sourceTask.initialize(EasyMock.anyObject());
-        EasyMock.expectLastCall();
-        sourceTask.start(EasyMock.anyObject());
-        EasyMock.expectLastCall();
+        doReturn(true).when(workerSourceTask).commitOffsets();
 
-        EasyMock.expect(sourceTask.poll()).andReturn(singletonList(record1));
-        EasyMock.expect(sourceTask.poll()).andReturn(singletonList(record2));
+        when(sourceTask.poll())
+                .thenReturn(singletonList(record1))
+                .thenReturn(singletonList(record2));
         expectTopicCreation(TOPIC);
-        EasyMock.expect(producer.send(EasyMock.anyObject(), 
EasyMock.anyObject())).andReturn(null).times(2);
 
-        PowerMock.replayAll();
+        when(producer.send(any(), any()))
+                .thenReturn(null)
+                .thenReturn(null);
 
         workerSourceTask.initialize(TASK_CONFIG);
         workerSourceTask.initializeAndStart();
         workerSourceTask.execute();
 
+

Review Comment:
   We can remove this empty line :)



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java:
##########
@@ -438,30 +423,27 @@ public void 
testErrorHandlingInSourceTasksWthBadConverter() throws Exception {
         Struct struct2 = new Struct(valSchema).put("val", 6789);
         SourceRecord record2 = new SourceRecord(emptyMap(), emptyMap(), TOPIC, 
PARTITION1, valSchema, struct2);
 
-        EasyMock.expect(workerSourceTask.isStopping()).andReturn(false);
-        EasyMock.expect(workerSourceTask.isStopping()).andReturn(false);
-        EasyMock.expect(workerSourceTask.isStopping()).andReturn(true);
-
-        EasyMock.expect(workerSourceTask.commitOffsets()).andReturn(true);
+        when(workerSourceTask.isStopping())
+                .thenReturn(false)
+                .thenReturn(false)
+                .thenReturn(true);
 
-        offsetStore.start();
-        EasyMock.expectLastCall();
-        sourceTask.initialize(EasyMock.anyObject());
-        EasyMock.expectLastCall();
-        sourceTask.start(EasyMock.anyObject());
-        EasyMock.expectLastCall();
+        doReturn(true).when(workerSourceTask).commitOffsets();
 
-        EasyMock.expect(sourceTask.poll()).andReturn(singletonList(record1));
-        EasyMock.expect(sourceTask.poll()).andReturn(singletonList(record2));
+        when(sourceTask.poll())
+                .thenReturn(singletonList(record1))
+                .thenReturn(singletonList(record2));
         expectTopicCreation(TOPIC);

Review Comment:
   Same thought RE verifying topic creation during the test (if topic creation 
is enabled).



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java:
##########
@@ -177,16 +180,14 @@ public class ErrorHandlingTaskTest {
 
     private ErrorHandlingMetrics errorHandlingMetrics;
 
-    private boolean enableTopicCreation;
+    private boolean enableTopicCreation = false;
 
+    private MockitoSession mockitoSession;
     @ParameterizedTest.Parameters
     public static Collection<Boolean> parameters() {
         return Arrays.asList(false, true);
     }
 
-    public ErrorHandlingTaskTest(boolean enableTopicCreation) {

Review Comment:
   This constructor is used to create parameterized tests (every `@Test` case 
is run twice, once with `enableTopicCreation` set to true, and once with it set 
to false).
   
   To fix this:
   - Annotate the `ErrorHandlingTaskTest` class with 
`@RunWith(Parameterized.class)`, where the `Parameterized` class is 
`org.junit.runners.Parameterized`
   - Add this declaration at the top of the class:
   ```java
       @Rule
       public MockitoRule rule = MockitoJUnit.rule().strictness(Strictness.STRICT_STUBS);
   ```
   - Add the constructor back
   - Change the `parameters()` method to be annotated with 
`@Parameterized.Parameters` instead of `@ParameterizedTest.Parameters`
   



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java:
##########
@@ -494,72 +502,29 @@ private void assertErrorHandlingMetricValue(String name, 
double expected) {
         assertEquals(expected, measured, 0.001d);
     }
 
-    private void expectInitializeTask() {
-        consumer.subscribe(EasyMock.eq(singletonList(TOPIC)), 
EasyMock.capture(rebalanceListener));
-        PowerMock.expectLastCall();
-
-        sinkTask.initialize(EasyMock.capture(sinkTaskContext));
-        PowerMock.expectLastCall();
-        sinkTask.start(TASK_PROPS);
-        PowerMock.expectLastCall();
-    }
-
-    private void expectTaskGetTopic(boolean anyTimes) {

Review Comment:
   I don't see a corresponding `verify` check in the 
`testErrorHandlingInSinkTasks` case. Can we add one?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java:
##########
@@ -374,30 +350,40 @@ public void testErrorHandlingInSourceTasks() throws 
Exception {
         Struct struct2 = new Struct(valSchema).put("val", 6789);
         SourceRecord record2 = new SourceRecord(emptyMap(), emptyMap(), TOPIC, 
PARTITION1, valSchema, struct2);
 
-        EasyMock.expect(workerSourceTask.isStopping()).andReturn(false);
-        EasyMock.expect(workerSourceTask.isStopping()).andReturn(false);
-        EasyMock.expect(workerSourceTask.isStopping()).andReturn(true);
+        when(workerSourceTask.isStopping())
+                .thenReturn(false)
+                .thenReturn(false)
+                .thenReturn(true);
 
-        EasyMock.expect(workerSourceTask.commitOffsets()).andReturn(true);
+        doReturn(true).when(workerSourceTask).commitOffsets();
 
-        offsetStore.start();
-        EasyMock.expectLastCall();
-        sourceTask.initialize(EasyMock.anyObject());
-        EasyMock.expectLastCall();
-        sourceTask.start(EasyMock.anyObject());
-        EasyMock.expectLastCall();
+        when(sourceTask.poll())
+                .thenReturn(singletonList(record1))
+                .thenReturn(singletonList(record2));
 
-        EasyMock.expect(sourceTask.poll()).andReturn(singletonList(record1));
-        EasyMock.expect(sourceTask.poll()).andReturn(singletonList(record2));
         expectTopicCreation(TOPIC);
-        EasyMock.expect(producer.send(EasyMock.anyObject(), 
EasyMock.anyObject())).andReturn(null).times(2);
 
-        PowerMock.replayAll();
+        when(producer.send(any(), any()))
+                .thenReturn(null)
+                .thenReturn(null);

Review Comment:
   We don't have to set up these expectations any more; Mockito will 
automatically return `null` from this method.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java:
##########
@@ -374,30 +350,40 @@ public void testErrorHandlingInSourceTasks() throws 
Exception {
         Struct struct2 = new Struct(valSchema).put("val", 6789);
         SourceRecord record2 = new SourceRecord(emptyMap(), emptyMap(), TOPIC, 
PARTITION1, valSchema, struct2);
 
-        EasyMock.expect(workerSourceTask.isStopping()).andReturn(false);
-        EasyMock.expect(workerSourceTask.isStopping()).andReturn(false);
-        EasyMock.expect(workerSourceTask.isStopping()).andReturn(true);
+        when(workerSourceTask.isStopping())
+                .thenReturn(false)
+                .thenReturn(false)
+                .thenReturn(true);
 
-        EasyMock.expect(workerSourceTask.commitOffsets()).andReturn(true);
+        doReturn(true).when(workerSourceTask).commitOffsets();
 
-        offsetStore.start();
-        EasyMock.expectLastCall();
-        sourceTask.initialize(EasyMock.anyObject());
-        EasyMock.expectLastCall();
-        sourceTask.start(EasyMock.anyObject());
-        EasyMock.expectLastCall();
+        when(sourceTask.poll())
+                .thenReturn(singletonList(record1))
+                .thenReturn(singletonList(record2));
 
-        EasyMock.expect(sourceTask.poll()).andReturn(singletonList(record1));
-        EasyMock.expect(sourceTask.poll()).andReturn(singletonList(record2));
         expectTopicCreation(TOPIC);

Review Comment:
   Can we also add a corresponding `verify` step to make absolutely sure that 
we attempted topic creation during the test (if topic creation was enabled)?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to