divijvaidya commented on code in PR #12735:
URL: https://github.com/apache/kafka/pull/12735#discussion_r996480944
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java:
##########
@@ -164,8 +166,8 @@ public class ErrorHandlingTaskTest {
OffsetStorageWriter offsetWriter;
@Mock
private ConnectorOffsetBackingStore offsetStore;
-
- private Capture<ConsumerRebalanceListener> rebalanceListener =
EasyMock.newCapture();
+ @Captor
+ private ArgumentCaptor<ConsumerRebalanceListener> rebalanceListener;
Review Comment:
I think this is also not being used in the test any where.
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java:
##########
@@ -228,81 +228,61 @@ public void tearDown() {
@Test
public void testSinkTasksCloseErrorReporters() throws Exception {
- ErrorReporter reporter = EasyMock.mock(ErrorReporter.class);
+ ErrorReporter reporter = mock(ErrorReporter.class);
RetryWithToleranceOperator retryWithToleranceOperator = operator();
retryWithToleranceOperator.reporters(singletonList(reporter));
createSinkTask(initialState, retryWithToleranceOperator);
- expectInitializeTask();
- reporter.close();
- EasyMock.expectLastCall();
- sinkTask.stop();
- EasyMock.expectLastCall();
-
- consumer.close();
- EasyMock.expectLastCall();
-
- headerConverter.close();
- EasyMock.expectLastCall();
-
- PowerMock.replayAll();
-
workerSinkTask.initialize(TASK_CONFIG);
workerSinkTask.initializeAndStart();
workerSinkTask.close();
- PowerMock.verifyAll();
+ // verify if invocation happened exactly 1 time
+ verifyInitializeTask();
+ verify(reporter).close();
+ verify(sinkTask).stop();
+ verify(consumer).close();
+ verify(headerConverter).close();
+ verify(sinkTask).start(TASK_PROPS);
Review Comment:
This is already verified in `verifyInitializeTask`
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java:
##########
@@ -316,28 +296,30 @@ public void testErrorHandlingInSinkTasks() throws
Exception {
retryWithToleranceOperator.reporters(singletonList(reporter));
createSinkTask(initialState, retryWithToleranceOperator);
- expectInitializeTask();
- expectTaskGetTopic(true);
// valid json
- ConsumerRecord<byte[], byte[]> record1 = new ConsumerRecord<>(TOPIC,
PARTITION1, FIRST_OFFSET, null, "{\"a\": 10}".getBytes());
+ ConsumerRecord<byte[], byte[]> record1 = new ConsumerRecord<>(
+ TOPIC, PARTITION1, FIRST_OFFSET,
+ null, "{\"a\": 10}".getBytes());
// bad json
- ConsumerRecord<byte[], byte[]> record2 = new ConsumerRecord<>(TOPIC,
PARTITION2, FIRST_OFFSET, null, "{\"a\" 10}".getBytes());
-
-
EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andReturn(records(record1));
-
EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andReturn(records(record2));
+ ConsumerRecord<byte[], byte[]> record2 = new ConsumerRecord<>(
+ TOPIC, PARTITION2, FIRST_OFFSET,
+ null, "{\"a\" 10}".getBytes());
- sinkTask.put(EasyMock.anyObject());
- EasyMock.expectLastCall().times(2);
-
- PowerMock.replayAll();
+ when(consumer.poll(any()))
+ .thenReturn(records(record1))
+ .thenReturn(records(record2));
workerSinkTask.initialize(TASK_CONFIG);
workerSinkTask.initializeAndStart();
workerSinkTask.iteration();
workerSinkTask.iteration();
+ verifyInitializeTask();
+ verify(sinkTask, times(2)).put(any());
+ verify(sinkTask).start(TASK_PROPS);
Review Comment:
This invocation is already verified by `verifyInitializeTask()`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]