C0urante commented on code in PR #13383:
URL: https://github.com/apache/kafka/pull/13383#discussion_r1143902556


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java:
##########
@@ -315,33 +305,28 @@ public void testPause() throws Exception {
 
         taskFuture.get();
 
-        PowerMock.verifyAll();
+        verifyCleanStartup();
+        verifyTaskGetTopic(10);

Review Comment:
   Should this use `count.get()` instead of `10`, since it's possible that the 
task may have been polled extra times between when line 293 
(`assertTrue(awaitLatch(pollLatch));`) completed and when the task finished 
pausing?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java:
##########
@@ -832,166 +746,117 @@ private CountDownLatch expectPolls(int minimum, final 
AtomicInteger count) throw
         // Note that we stub these to allow any number of calls because the 
thread will continue to
         // run. The count passed in + latch returned just makes sure we get 
*at least* that number of
         // calls
-        EasyMock.expect(sourceTask.poll())
-                .andStubAnswer(() -> {
-                    count.incrementAndGet();
-                    latch.countDown();
-                    Thread.sleep(10);
-                    return RECORDS;
-                });
+        doAnswer((Answer<List<SourceRecord>>) invocation -> {
+            count.incrementAndGet();
+            latch.countDown();
+            Thread.sleep(10);
+            return RECORDS;
+        }).when(sourceTask).poll();
+
         // Fallout of the poll() call
-        expectSendRecordAnyTimes();
+        expectSendRecord();
         return latch;
     }
 
     private CountDownLatch expectPolls(int count) throws InterruptedException {
         return expectPolls(count, new AtomicInteger());
     }
 
-    @SuppressWarnings("unchecked")
-    private void expectSendRecordSyncFailure(Throwable error) {
-        expectConvertHeadersAndKeyValue(false);
-        expectApplyTransformationChain(false);
-
-        EasyMock.expect(
-            producer.send(EasyMock.anyObject(ProducerRecord.class),
-                
EasyMock.anyObject(org.apache.kafka.clients.producer.Callback.class)))
-            .andThrow(error);
+    private void expectSendRecord() throws InterruptedException {
+        expectSendRecordTaskCommitRecordSucceed();
     }
 
-    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordAnyTimes() 
throws InterruptedException {
-        return expectSendRecordTaskCommitRecordSucceed(true);
+    //
+    private void expectSendRecordProducerCallbackFail() throws 
InterruptedException {
+        expectSendRecord(TOPIC, false, false, true, emptyHeaders());
     }
 
-    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordOnce() 
throws InterruptedException {
-        return expectSendRecordTaskCommitRecordSucceed(false);
+    private void expectSendRecordTaskCommitRecordSucceed() throws 
InterruptedException {
+        expectSendRecord(TOPIC, true, true, true, emptyHeaders());
     }
 
-    private Capture<ProducerRecord<byte[], byte[]>> 
expectSendRecordProducerCallbackFail() throws InterruptedException {
-        return expectSendRecord(TOPIC, false, false, false, true, 
emptyHeaders());
-    }
-
-    private Capture<ProducerRecord<byte[], byte[]>> 
expectSendRecordTaskCommitRecordSucceed(boolean anyTimes) throws 
InterruptedException {
-        return expectSendRecord(TOPIC, anyTimes, true, true, true, 
emptyHeaders());
-    }
-
-    private Capture<ProducerRecord<byte[], byte[]>> 
expectSendRecordTaskCommitRecordFail(boolean anyTimes) throws 
InterruptedException {
-        return expectSendRecord(TOPIC, anyTimes, true, false, true, 
emptyHeaders());
-    }
-
-    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord(
-        String topic,
-        boolean anyTimes,
-        boolean sendSuccess,
-        boolean commitSuccess,
-        boolean isMockedConverters,
-        Headers headers
+    private void expectSendRecord(
+            String topic,
+            boolean sendSuccess,
+            boolean commitSuccess,
+            boolean isMockedConverters,
+            Headers headers
     ) throws InterruptedException {
         if (isMockedConverters) {
-            expectConvertHeadersAndKeyValue(topic, anyTimes, headers);
+            expectConvertHeadersAndKeyValue(topic, headers);
         }
 
-        expectApplyTransformationChain(anyTimes);
-
-        Capture<ProducerRecord<byte[], byte[]>> sent = EasyMock.newCapture();
-
-        // 1. Converted data passed to the producer, which will need callbacks 
invoked for flush to work
-        IExpectationSetters<Future<RecordMetadata>> expect = EasyMock.expect(
-            producer.send(EasyMock.capture(sent),
-                EasyMock.capture(producerCallbacks)));
-        IAnswer<Future<RecordMetadata>> expectResponse = () -> {
-            synchronized (producerCallbacks) {
-                for (org.apache.kafka.clients.producer.Callback cb : 
producerCallbacks.getValues()) {
-                    if (sendSuccess) {
-                        cb.onCompletion(new RecordMetadata(new 
TopicPartition("foo", 0), 0, 0,
-                            0L, 0, 0), null);
-                    } else {
-                        cb.onCompletion(null, new 
TopicAuthorizationException("foo"));
-                    }
-                }
-                producerCallbacks.reset();
-            }
-            return sendFuture;
-        };
-        if (anyTimes)
-            expect.andStubAnswer(expectResponse);
-        else
-            expect.andAnswer(expectResponse);
+        expectApplyTransformationChain();
 
         if (sendSuccess) {
             // 2. As a result of a successful producer send callback, we'll 
notify the source task of the record commit
-            expectTaskCommitRecordWithOffset(anyTimes, commitSuccess);
-            expectTaskGetTopic(anyTimes);
+            expectTaskCommitRecordWithOffset(commitSuccess);
+            expectTaskGetTopic();
         }
 
-        return sent;
+        doAnswer(producerSendAnswer(sendSuccess))
+                .when(producer).send(any(ProducerRecord.class), 
any(org.apache.kafka.clients.producer.Callback.class));
     }
 
-    private void expectConvertHeadersAndKeyValue(boolean anyTimes) {
-        expectConvertHeadersAndKeyValue(TOPIC, anyTimes, emptyHeaders());
+    private Answer<Future<RecordMetadata>> producerSendAnswer(boolean 
sendSuccess) {
+        return invocation -> {
+            org.apache.kafka.clients.producer.Callback cb = 
invocation.getArgument(1);
+            if (sendSuccess) {
+                cb.onCompletion(new RecordMetadata(new TopicPartition("foo", 
0), 0, 0, 0L, 0, 0),
+                        null);
+            } else {
+                cb.onCompletion(null, new TopicAuthorizationException("foo"));
+            }
+
+            return null;
+        };
     }
 
-    private void expectConvertHeadersAndKeyValue(String topic, boolean 
anyTimes, Headers headers) {
-        for (Header header : headers) {
-            IExpectationSetters<byte[]> convertHeaderExpect = 
EasyMock.expect(headerConverter.fromConnectHeader(topic, header.key(), 
Schema.STRING_SCHEMA, new String(header.value())));
-            if (anyTimes)
-                convertHeaderExpect.andStubReturn(header.value());
-            else
-                convertHeaderExpect.andReturn(header.value());
+    private void expectConvertHeadersAndKeyValue(String topic, Headers 
headers) {
+        if (headers.iterator().hasNext()) {
+            when(headerConverter.fromConnectHeader(anyString(), anyString(), 
eq(Schema.STRING_SCHEMA),
+                    anyString()))
+                    .thenAnswer((Answer<byte[]>) invocation -> {
+                        String headerValue = invocation.getArgument(3, 
String.class);
+                        return headerValue.getBytes(StandardCharsets.UTF_8);
+                    });
         }
-        IExpectationSetters<byte[]> convertKeyExpect = 
EasyMock.expect(keyConverter.fromConnectData(topic, headers, KEY_SCHEMA, KEY));
-        if (anyTimes)
-            convertKeyExpect.andStubReturn(SERIALIZED_KEY);
-        else
-            convertKeyExpect.andReturn(SERIALIZED_KEY);
-        IExpectationSetters<byte[]> convertValueExpect = 
EasyMock.expect(valueConverter.fromConnectData(topic, headers, RECORD_SCHEMA, 
RECORD));
-        if (anyTimes)
-            convertValueExpect.andStubReturn(SERIALIZED_RECORD);
-        else
-            convertValueExpect.andReturn(SERIALIZED_RECORD);
+
+        when(keyConverter.fromConnectData(eq(topic), any(Headers.class), 
eq(KEY_SCHEMA), eq(KEY)))
+                .thenReturn(SERIALIZED_KEY);
+        when(valueConverter.fromConnectData(eq(topic), any(Headers.class), 
eq(RECORD_SCHEMA),
+                eq(RECORD)))
+                .thenReturn(SERIALIZED_RECORD);
     }
 
-    private void expectApplyTransformationChain(boolean anyTimes) {
-        final Capture<SourceRecord> recordCapture = EasyMock.newCapture();
-        IExpectationSetters<SourceRecord> convertKeyExpect = 
EasyMock.expect(transformationChain.apply(EasyMock.capture(recordCapture)));
-        if (anyTimes)
-            convertKeyExpect.andStubAnswer(recordCapture::getValue);
-        else
-            convertKeyExpect.andAnswer(recordCapture::getValue);
+    private void expectApplyTransformationChain() {
+        when(transformationChain.apply(any(SourceRecord.class)))
+                .thenAnswer(AdditionalAnswers.returnsFirstArg());
     }
 
-    private void expectTaskCommitRecordWithOffset(boolean anyTimes, boolean 
succeed) throws InterruptedException {
-        sourceTask.commitRecord(EasyMock.anyObject(SourceRecord.class), 
EasyMock.anyObject(RecordMetadata.class));
-        IExpectationSetters<Void> expect = EasyMock.expectLastCall();
+    //
+    private void expectTaskCommitRecordWithOffset(boolean succeed) throws 
InterruptedException {
         if (!succeed) {
-            expect = expect.andThrow(new RuntimeException("Error committing 
record in source task"));
-        }
-        if (anyTimes) {
-            expect.anyTimes();
+            doThrow(new RuntimeException("Error committing record in source 
task"))
+                    .when(sourceTask).commitRecord(any(SourceRecord.class), 
any(RecordMetadata.class));
         }
     }
 
-    private void expectTaskGetTopic(boolean anyTimes) {
-        final Capture<String> connectorCapture = EasyMock.newCapture();
-        final Capture<String> topicCapture = EasyMock.newCapture();
-        IExpectationSetters<TopicStatus> expect = 
EasyMock.expect(statusBackingStore.getTopic(
-                EasyMock.capture(connectorCapture),
-                EasyMock.capture(topicCapture)));
-        if (anyTimes) {
-            expect.andStubAnswer(() -> new TopicStatus(
-                    topicCapture.getValue(),
-                    new ConnectorTaskId(connectorCapture.getValue(), 0),
-                    Time.SYSTEM.milliseconds()));
-        } else {
-            expect.andAnswer(() -> new TopicStatus(
-                    topicCapture.getValue(),
-                    new ConnectorTaskId(connectorCapture.getValue(), 0),
-                    Time.SYSTEM.milliseconds()));
-        }
-        if (connectorCapture.hasCaptured() && topicCapture.hasCaptured()) {
-            assertEquals("job", connectorCapture.getValue());
-            assertEquals(TOPIC, topicCapture.getValue());
-        }
+    private void expectTaskGetTopic() {
+        when(statusBackingStore.getTopic(anyString(), 
anyString())).thenAnswer((Answer<TopicStatus>) invocation -> {
+            String connector = invocation.getArgument(0, String.class);
+            String topic = invocation.getArgument(1, String.class);
+            return new TopicStatus(topic, new ConnectorTaskId(connector, 0), 
Time.SYSTEM.milliseconds());
+        });
+    }
+
+    private void verifyTaskGetTopic(int times) {
+        ArgumentCaptor<String> connectorCapture = 
ArgumentCaptor.forClass(String.class);
+        ArgumentCaptor<String> topicCapture = 
ArgumentCaptor.forClass(String.class);
+        verify(statusBackingStore, 
times(times)).getTopic(connectorCapture.capture(), topicCapture.capture());
+
+        assertEquals("job", connectorCapture.getValue());
+        assertEquals(TOPIC, topicCapture.getValue());
     }
 
     private boolean awaitLatch(CountDownLatch latch) {

Review Comment:
   Out of scope, so implement (or don't) at your discretion, but this method 
(and invocations of it) can be replaced with 
[ConcurrencyUtils::awaitLatch](https://github.com/apache/kafka/blob/1d8f79964e9578f3769fd13c2535988bfc803696/connect/runtime/src/test/java/org/apache/kafka/connect/test/util/ConcurrencyUtils.java#L42-L50).



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java:
##########
@@ -315,33 +305,28 @@ public void testPause() throws Exception {
 
         taskFuture.get();
 
-        PowerMock.verifyAll();
+        verifyCleanStartup();
+        verifyTaskGetTopic(10);
+        verifyOffsetFlush(true);
+        verifyTopicCreation(TOPIC);
+        verify(statusListener).onPause(taskId);
+        verify(statusListener).onShutdown(taskId);
+        verify(sourceTask).stop();
+        verify(offsetWriter).offset(PARTITION, OFFSET);
+        verifyClose();
     }
 
     @Test
     public void testPollsInBackground() throws Exception {
         createWorkerTask();
 
-        expectCleanStartup();
-
         final CountDownLatch pollLatch = expectPolls(10);
         // In this test, we don't flush, so nothing goes any further than the 
offset writer
 
         expectTopicCreation(TOPIC);
+        expectOffsetFlush(true, true);

Review Comment:
   Nit: this can be simplified, right?
   ```suggestion
           expectOffsetFlush(true);
   ```



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java:
##########
@@ -315,33 +305,28 @@ public void testPause() throws Exception {
 
         taskFuture.get();
 
-        PowerMock.verifyAll();
+        verifyCleanStartup();
+        verifyTaskGetTopic(10);
+        verifyOffsetFlush(true);
+        verifyTopicCreation(TOPIC);
+        verify(statusListener).onPause(taskId);
+        verify(statusListener).onShutdown(taskId);
+        verify(sourceTask).stop();
+        verify(offsetWriter).offset(PARTITION, OFFSET);
+        verifyClose();
     }
 
     @Test
     public void testPollsInBackground() throws Exception {
         createWorkerTask();
 
-        expectCleanStartup();
-
         final CountDownLatch pollLatch = expectPolls(10);
         // In this test, we don't flush, so nothing goes any further than the 
offset writer

Review Comment:
   Is this still accurate? Looks like we do end up performing an offset flush 
in this test.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java:
##########
@@ -391,37 +369,26 @@ public void testFailureInPoll() throws Exception {
         taskFuture.get();
         assertPollMetrics(0);
 
-        PowerMock.verifyAll();
+        verifyCleanStartup();
+        verifyOffsetFlush(true);
+        verify(statusListener).onFailure(taskId, exception);
+        verify(sourceTask).stop();

Review Comment:
   Looks like we might be missing coverage for this call (originally verified 
in `expectEmptyOffsetFlush`):
   ```suggestion
           verify(sourceTask).commit();
           verify(sourceTask).stop();
   ```



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java:
##########
@@ -1003,31 +868,35 @@ private boolean awaitLatch(CountDownLatch latch) {
         return false;
     }
 
-    @SuppressWarnings("unchecked")
     private void expectOffsetFlush(boolean succeed) throws Exception {
-        EasyMock.expect(offsetWriter.beginFlush(EasyMock.anyLong(), 
EasyMock.anyObject())).andReturn(true);
-        Future<Void> flushFuture = PowerMock.createMock(Future.class);
-        
EasyMock.expect(offsetWriter.doFlush(EasyMock.anyObject(Callback.class))).andReturn(flushFuture);
+        expectOffsetFlush(succeed, true);
+    }
+
+    @SuppressWarnings("unchecked")
+    private void expectOffsetFlush(boolean succeed, boolean offsetFlush, 
Boolean... remainingVals) throws Exception {
+        when(offsetWriter.beginFlush(anyLong(), 
any(TimeUnit.class))).thenReturn(offsetFlush, remainingVals);
+
+        if (offsetFlush) {

Review Comment:
   Isn't this technically incorrect? Shouldn't we set up expectations for 
`offsetWriter::doFlush` if any of `remainingVals` is `true` as well?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java:
##########
@@ -474,129 +442,107 @@ public void testFailureInPollAfterStop() throws 
Exception {
         taskFuture.get();
         assertPollMetrics(0);
 
-        PowerMock.verifyAll();
+        verifyCleanStartup();
+        verify(statusListener).onShutdown(taskId);
+        verify(sourceTask).stop();
+        verifyOffsetFlush(true);
+        verifyClose();
     }
 
     @Test
     public void testPollReturnsNoRecords() throws Exception {
         // Test that the task handles an empty list of records
         createWorkerTask();
 
-        expectCleanStartup();
-
         // We'll wait for some data, then trigger a flush
-        final CountDownLatch pollLatch = expectEmptyPolls(1, new 
AtomicInteger());
-        expectEmptyOffsetFlush();
-
-        sourceTask.stop();
-        EasyMock.expectLastCall();
-        expectEmptyOffsetFlush();
-
-        statusListener.onShutdown(taskId);
-        EasyMock.expectLastCall();
-
-        expectClose();
-
-        PowerMock.replayAll();
+        final CountDownLatch pollLatch = expectEmptyPolls(new AtomicInteger());
+        expectOffsetFlush(true, false);
 
         workerTask.initialize(TASK_CONFIG);
         Future<?> taskFuture = executor.submit(workerTask);
 
         assertTrue(awaitLatch(pollLatch));
         assertTrue(workerTask.commitOffsets());
+        verify(offsetWriter).beginFlush(anyLong(), any(TimeUnit.class));
+        reset(offsetWriter);
+
         workerTask.stop();
         assertTrue(workerTask.awaitStop(1000));
+        verify(offsetWriter).beginFlush(anyLong(), any(TimeUnit.class));
+        verifyNoMoreInteractions(offsetWriter);
 
         taskFuture.get();
         assertPollMetrics(0);
 
-        PowerMock.verifyAll();
+        verifyCleanStartup();
+        verify(sourceTask).stop();
+        verify(statusListener).onShutdown(taskId);
+        verifyClose();
     }
 
     @Test
     public void testCommit() throws Exception {
         // Test that the task commits properly when prompted
         createWorkerTask();
 
-        expectCleanStartup();
-
         // We'll wait for some data, then trigger a flush
         final CountDownLatch pollLatch = expectPolls(1);
-        expectOffsetFlush(true);
-
-        offsetWriter.offset(PARTITION, OFFSET);

Review Comment:
   Are we losing coverage for this invocation in this test?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java:
##########
@@ -433,35 +400,36 @@ public void testFailureInPollAfterCancel() throws 
Exception {
         taskFuture.get();
         assertPollMetrics(0);
 
-        PowerMock.verifyAll();
+        verifyCleanStartup();
+        verify(producer).close(Duration.ZERO);
+        verify(producer).close(Duration.ofSeconds(30));
+        verify(sourceTask).stop();
+        verify(admin).close(any(Duration.class));
+        verify(transformationChain).close();
+        verify(offsetReader, times(2)).close();

Review Comment:
   Since `OffsetStorageReaderImpl::close` is idempotent, it doesn't hurt that 
we invoke it twice, but it's definitely not necessary. We just have to make 
sure it's invoked at least once:
   ```suggestion
           verify(offsetReader, atLeast(1)).close();
   ```



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java:
##########
@@ -474,129 +442,107 @@ public void testFailureInPollAfterStop() throws 
Exception {
         taskFuture.get();
         assertPollMetrics(0);
 
-        PowerMock.verifyAll();
+        verifyCleanStartup();
+        verify(statusListener).onShutdown(taskId);
+        verify(sourceTask).stop();
+        verifyOffsetFlush(true);
+        verifyClose();
     }
 
     @Test
     public void testPollReturnsNoRecords() throws Exception {
         // Test that the task handles an empty list of records
         createWorkerTask();
 
-        expectCleanStartup();
-
         // We'll wait for some data, then trigger a flush
-        final CountDownLatch pollLatch = expectEmptyPolls(1, new 
AtomicInteger());
-        expectEmptyOffsetFlush();
-
-        sourceTask.stop();
-        EasyMock.expectLastCall();
-        expectEmptyOffsetFlush();
-
-        statusListener.onShutdown(taskId);
-        EasyMock.expectLastCall();
-
-        expectClose();
-
-        PowerMock.replayAll();
+        final CountDownLatch pollLatch = expectEmptyPolls(new AtomicInteger());
+        expectOffsetFlush(true, false);
 
         workerTask.initialize(TASK_CONFIG);
         Future<?> taskFuture = executor.submit(workerTask);
 
         assertTrue(awaitLatch(pollLatch));
         assertTrue(workerTask.commitOffsets());
+        verify(offsetWriter).beginFlush(anyLong(), any(TimeUnit.class));
+        reset(offsetWriter);

Review Comment:
   I try to avoid `reset` when I can. Can we remove this line and change the 
next assertion for `offsetWriter::beginFlush` to use `times(2)`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to