This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4150595b0a2 KAFKA-14684: Replace EasyMock/PowerMock with Mockito in 
WorkerSinkTaskThreadedTest (#14505)
4150595b0a2 is described below

commit 4150595b0a2e0f45f2827cebc60bcb6f6558745d
Author: Hector Geraldino <[email protected]>
AuthorDate: Mon Oct 16 05:24:52 2023 -0400

    KAFKA-14684: Replace EasyMock/PowerMock with Mockito in 
WorkerSinkTaskThreadedTest (#14505)
    
    
    Reviewers: Mickael Maison <[email protected]>, Christo Lolov 
<[email protected]>
---
 build.gradle                                       |   2 +-
 .../runtime/WorkerSinkTaskThreadedTest.java        | 697 ++++++++++-----------
 2 files changed, 346 insertions(+), 353 deletions(-)

diff --git a/build.gradle b/build.gradle
index cc2f6d0327b..d8c9857cf46 100644
--- a/build.gradle
+++ b/build.gradle
@@ -420,7 +420,7 @@ subprojects {
       // connect tests
       "**/KafkaConfigBackingStoreTest.*",
       "**/StandaloneHerderTest.*",
-      "**/WorkerSinkTaskTest.*", "**/WorkerSinkTaskThreadedTest.*"
+      "**/WorkerSinkTaskTest.*"
     ])
   }
 
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
index db90897a23c..4b6af05b074 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
@@ -42,20 +42,16 @@ import org.apache.kafka.connect.storage.HeaderConverter;
 import org.apache.kafka.connect.storage.StatusBackingStore;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.apache.kafka.common.utils.MockTime;
-import org.easymock.Capture;
-import org.easymock.CaptureType;
-import org.easymock.EasyMock;
-import org.easymock.IExpectationSetters;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.powermock.api.easymock.PowerMock;
-import org.powermock.api.easymock.annotation.Mock;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-import org.powermock.reflect.Whitebox;
+import org.mockito.AdditionalAnswers;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
 
 import java.io.IOException;
 import java.time.Duration;
@@ -68,13 +64,26 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.function.Function;
 
+import static java.util.Collections.singletonList;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(WorkerSinkTask.class)
-@PowerMockIgnore("javax.management.*")
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyMap;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@SuppressWarnings("unchecked")
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
 public class WorkerSinkTaskThreadedTest {
 
     // These are fixed to keep this code simpler. In this example we assume 
byte[] raw values
@@ -106,30 +115,50 @@ public class WorkerSinkTaskThreadedTest {
         TASK_PROPS.put(SinkConnector.TOPICS_CONFIG, TOPIC);
         TASK_PROPS.put(TaskConfig.TASK_CLASS_CONFIG, 
TestSinkTask.class.getName());
     }
+
     private static final TaskConfig TASK_CONFIG = new TaskConfig(TASK_PROPS);
 
-    private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
-    private TargetState initialState = TargetState.STARTED;
+    private final ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
+    private final TargetState initialState = TargetState.STARTED;
     private Time time;
     private ConnectMetrics metrics;
-    @Mock private SinkTask sinkTask;
-    private Capture<WorkerSinkTaskContext> sinkTaskContext = 
EasyMock.newCapture();
-    private WorkerConfig workerConfig;
+    @Mock
+    private SinkTask sinkTask;
+    private final ArgumentCaptor<WorkerSinkTaskContext> sinkTaskContext = 
ArgumentCaptor.forClass(WorkerSinkTaskContext.class);
     @Mock
     private PluginClassLoader pluginLoader;
-    @Mock private Converter keyConverter;
-    @Mock private Converter valueConverter;
-    @Mock private HeaderConverter headerConverter;
-    @Mock private TransformationChain<SinkRecord> transformationChain;
+    @Mock
+    private Converter keyConverter;
+    @Mock
+    private Converter valueConverter;
+    @Mock
+    private HeaderConverter headerConverter;
+    @Mock
+    private TransformationChain<SinkRecord> transformationChain;
     private WorkerSinkTask workerTask;
-    @Mock private KafkaConsumer<byte[], byte[]> consumer;
-    private Capture<ConsumerRebalanceListener> rebalanceListener = 
EasyMock.newCapture();
-    @Mock private TaskStatus.Listener statusListener;
-    @Mock private StatusBackingStore statusBackingStore;
-    @Mock private ErrorHandlingMetrics errorHandlingMetrics;
+    @Mock
+    private KafkaConsumer<byte[], byte[]> consumer;
+    private final ArgumentCaptor<ConsumerRebalanceListener> rebalanceListener 
= ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
+    @Mock
+    private TaskStatus.Listener statusListener;
+    @Mock
+    private StatusBackingStore statusBackingStore;
+    @Mock
+    private ErrorHandlingMetrics errorHandlingMetrics;
 
     private long recordsReturned;
 
+    private final Function<Long, Map<TopicPartition, OffsetAndMetadata>> 
offsetsToCommitFn = expectedMessages -> {
+        final long finalOffset = FIRST_OFFSET + expectedMessages;
+
+        // All assigned partitions will have offsets committed, but we've only 
processed messages/updated offsets for one
+        final Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new 
HashMap<>();
+        offsetsToCommit.put(TOPIC_PARTITION, new 
OffsetAndMetadata(finalOffset));
+        offsetsToCommit.put(TOPIC_PARTITION2, new 
OffsetAndMetadata(FIRST_OFFSET));
+        offsetsToCommit.put(TOPIC_PARTITION3, new 
OffsetAndMetadata(FIRST_OFFSET));
+
+        return offsetsToCommit;
+    };
 
     @Before
     public void setup() {
@@ -139,15 +168,12 @@ public class WorkerSinkTaskThreadedTest {
         workerProps.put("key.converter", 
"org.apache.kafka.connect.json.JsonConverter");
         workerProps.put("value.converter", 
"org.apache.kafka.connect.json.JsonConverter");
         workerProps.put("offset.storage.file.filename", 
"/tmp/connect.offsets");
-        pluginLoader = PowerMock.createMock(PluginClassLoader.class);
-        workerConfig = new StandaloneConfig(workerProps);
+        WorkerConfig workerConfig = new StandaloneConfig(workerProps);
         workerTask = new WorkerSinkTask(
                 taskId, sinkTask, statusListener, initialState, workerConfig, 
ClusterConfigState.EMPTY, metrics, keyConverter,
-                valueConverter, errorHandlingMetrics, headerConverter,
-                new TransformationChain<>(Collections.emptyList(), 
RetryWithToleranceOperatorTest.NOOP_OPERATOR),
+                valueConverter, errorHandlingMetrics, headerConverter, 
transformationChain,
                 consumer, pluginLoader, time, 
RetryWithToleranceOperatorTest.NOOP_OPERATOR, null, statusBackingStore,
                 Collections::emptyList);
-
         recordsReturned = 0;
     }
 
@@ -157,68 +183,72 @@ public class WorkerSinkTaskThreadedTest {
     }
 
     @Test
-    public void testPollsInBackground() throws Exception {
-        expectInitializeTask();
-        expectTaskGetTopic(true);
-        expectPollInitialAssignment();
-
-        Capture<Collection<SinkRecord>> capturedRecords = expectPolls(1L);
-        expectStopTask();
-
-        PowerMock.replayAll();
+    public void testPollsInBackground() {
+        expectTaskGetTopic();
+        expectInitialAssignment();
+        expectPolls(1L);
 
         workerTask.initialize(TASK_CONFIG);
         workerTask.initializeAndStart();
+        verifyInitializeTask();
 
         // First iteration initializes partition assignment
         workerTask.iteration();
+        verifyInitialAssignment();
 
         // Then we iterate to fetch data
         for (int i = 0; i < 10; i++) {
             workerTask.iteration();
         }
+        verifyTaskGetTopic(10);
+
         workerTask.stop();
         workerTask.close();
+        verifyStopTask();
+
+        ArgumentCaptor<Collection<SinkRecord>> capturedRecords = 
ArgumentCaptor.forClass(Collection.class);
+        verify(sinkTask, times(11)).put(capturedRecords.capture());
 
         // Verify contents match expected values, i.e. that they were 
translated properly. With max
-        // batch size 1 and poll returns 1 message at a time, we should have a 
matching # of batches
-        assertEquals(10, capturedRecords.getValues().size());
+        // batch size 1 and poll returns 1 message at a time, we should have a 
matching # of batches + initial assignment
+        assertEquals(11, capturedRecords.getAllValues().size());
+        // First poll() returned no records as it just triggered a rebalance
+        assertTrue(capturedRecords.getAllValues().get(0).isEmpty());
+
         int offset = 0;
-        for (Collection<SinkRecord> recs : capturedRecords.getValues()) {
+        List<Collection<SinkRecord>> filteredRecords =
+                capturedRecords.getAllValues().subList(1, 
capturedRecords.getAllValues().size() - 1);
+
+        for (Collection<SinkRecord> recs : filteredRecords) {
             assertEquals(1, recs.size());
             for (SinkRecord rec : recs) {
                 SinkRecord referenceSinkRecord
                         = new SinkRecord(TOPIC, PARTITION, KEY_SCHEMA, KEY, 
VALUE_SCHEMA, VALUE, FIRST_OFFSET + offset, TIMESTAMP, TIMESTAMP_TYPE);
                 InternalSinkRecord referenceInternalSinkRecord =
-                    new InternalSinkRecord(new ConsumerRecord<>(TOPIC, 
PARTITION, FIRST_OFFSET + offset, null, null), referenceSinkRecord);
+                        new InternalSinkRecord(new ConsumerRecord<>(TOPIC, 
PARTITION, FIRST_OFFSET + offset, null, null), referenceSinkRecord);
                 assertEquals(referenceInternalSinkRecord, rec);
                 offset++;
             }
         }
-
-        PowerMock.verifyAll();
     }
 
     @Test
-    public void testCommit() throws Exception {
-        expectInitializeTask();
-        expectTaskGetTopic(true);
-        expectPollInitialAssignment();
-        expectConsumerAssignment(INITIAL_ASSIGNMENT).times(2);
-
-        // Make each poll() take the offset commit interval
-        Capture<Collection<SinkRecord>> capturedRecords
-                = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT);
-        expectOffsetCommit(1L, null, null, 0, true);
-        expectStopTask();
-
-        PowerMock.replayAll();
+    public void testCommit() {
+        long expectedMessages = 1L;
+        expectTaskGetTopic();
+        expectInitialAssignment();
+        expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT);
+        expectOffsetCommit(new ExpectOffsetCommitCommand(
+                expectedMessages, null, null, 0, true));
 
         workerTask.initialize(TASK_CONFIG);
         workerTask.initializeAndStart();
+        verifyInitializeTask();
 
         // Initialize partition assignment
         workerTask.iteration();
+        verifyInitialAssignment();
+
         // Fetch one record
         workerTask.iteration();
         // Trigger the commit
@@ -229,77 +259,70 @@ public class WorkerSinkTaskThreadedTest {
         workerTask.stop();
         workerTask.close();
 
-        assertEquals(2, capturedRecords.getValues().size());
+        verifyTaskGetTopic(2);
+        verifyStopTask();
+        ArgumentCaptor<Collection<SinkRecord>> capturedRecords = 
ArgumentCaptor.forClass(Collection.class);
+        verify(sinkTask, times(3)).put(capturedRecords.capture());
 
-        PowerMock.verifyAll();
+        assertEquals(3, capturedRecords.getAllValues().size());
+        verifyOffsetCommit(expectedMessages);
     }
 
     @Test
-    public void testCommitFailure() throws Exception {
-        expectInitializeTask();
-        expectTaskGetTopic(true);
-        expectPollInitialAssignment();
-        expectConsumerAssignment(INITIAL_ASSIGNMENT);
-
-        Capture<Collection<SinkRecord>> capturedRecords = 
expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT);
-        expectOffsetCommit(1L, new RuntimeException(), null, 0, true);
-        // Should rewind to last known good positions, which in this case will 
be the offsets loaded during initialization
-        // for all topic partitions
-        consumer.seek(TOPIC_PARTITION, FIRST_OFFSET);
-        PowerMock.expectLastCall();
-        consumer.seek(TOPIC_PARTITION2, FIRST_OFFSET);
-        PowerMock.expectLastCall();
-        consumer.seek(TOPIC_PARTITION3, FIRST_OFFSET);
-        PowerMock.expectLastCall();
-        expectStopTask();
-
-        PowerMock.replayAll();
+    public void testCommitFailure() {
+        expectTaskGetTopic();
+        expectInitialAssignment();
+        expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT);
+        expectOffsetCommit(new ExpectOffsetCommitCommand(
+                1L, new RuntimeException(), null, 0, true));
 
         workerTask.initialize(TASK_CONFIG);
         workerTask.initializeAndStart();
+        verifyInitializeTask();
 
         // Initialize partition assignment
         workerTask.iteration();
+        verifyInitialAssignment();
+
         // Fetch some data
         workerTask.iteration();
         // Trigger the commit
         workerTask.iteration();
 
         assertEquals(1, workerTask.commitFailures());
-        assertEquals(false, Whitebox.getInternalState(workerTask, 
"committing"));
+        assertFalse(workerTask.isCommitting());
+
         workerTask.stop();
         workerTask.close();
+        verifyStopTask();
+        verifyTaskGetTopic(2);
 
-        PowerMock.verifyAll();
+        // Should rewind to last known good positions, which in this case will 
be the offsets loaded during initialization
+        // for all topic partitions
+        verify(consumer).seek(TOPIC_PARTITION, FIRST_OFFSET);
+        verify(consumer).seek(TOPIC_PARTITION2, FIRST_OFFSET);
+        verify(consumer).seek(TOPIC_PARTITION3, FIRST_OFFSET);
     }
 
     @Test
-    public void testCommitSuccessFollowedByFailure() throws Exception {
+    public void testCommitSuccessFollowedByFailure() {
         // Validate that we rewind to the correct offsets if a task's 
preCommit() method throws an exception
-
-        expectInitializeTask();
-        expectTaskGetTopic(true);
-        expectPollInitialAssignment();
-        expectConsumerAssignment(INITIAL_ASSIGNMENT).times(3);
-        Capture<Collection<SinkRecord>> capturedRecords = 
expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT);
-        expectOffsetCommit(1L, null, null, 0, true);
-        expectOffsetCommit(2L, new RuntimeException(), null, 0, true);
-        // Should rewind to last known committed positions
-        consumer.seek(TOPIC_PARTITION, FIRST_OFFSET + 1);
-        PowerMock.expectLastCall();
-        consumer.seek(TOPIC_PARTITION2, FIRST_OFFSET);
-        PowerMock.expectLastCall();
-        consumer.seek(TOPIC_PARTITION3, FIRST_OFFSET);
-        PowerMock.expectLastCall();
-        expectStopTask();
-
-        PowerMock.replayAll();
+        expectTaskGetTopic();
+        expectInitialAssignment();
+        expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT);
+        expectOffsetCommit(
+                new ExpectOffsetCommitCommand(1L, null, null, 0, true),
+                new ExpectOffsetCommitCommand(2L, new RuntimeException(), 
null, 0, true)
+        );
 
         workerTask.initialize(TASK_CONFIG);
         workerTask.initializeAndStart();
+        verifyInitializeTask();
 
         // Initialize partition assignment
         workerTask.iteration();
+        verifyInitialAssignment();
+
         // Fetch some data
         workerTask.iteration();
         // Trigger first commit,
@@ -307,33 +330,35 @@ public class WorkerSinkTaskThreadedTest {
         // Trigger second (failing) commit
         workerTask.iteration();
 
+        // Should rewind to last known committed positions
+        verify(consumer).seek(TOPIC_PARTITION, FIRST_OFFSET + 1);
+        verify(consumer).seek(TOPIC_PARTITION2, FIRST_OFFSET);
+        verify(consumer).seek(TOPIC_PARTITION3, FIRST_OFFSET);
+
         assertEquals(1, workerTask.commitFailures());
-        assertEquals(false, Whitebox.getInternalState(workerTask, 
"committing"));
+        assertFalse(workerTask.isCommitting());
         workerTask.stop();
         workerTask.close();
-
-        PowerMock.verifyAll();
+        verifyStopTask();
+        verifyTaskGetTopic(3);
     }
 
     @Test
-    public void testCommitConsumerFailure() throws Exception {
-        expectInitializeTask();
-        expectTaskGetTopic(true);
-        expectPollInitialAssignment();
-        expectConsumerAssignment(INITIAL_ASSIGNMENT).times(2);
-
-        Capture<Collection<SinkRecord>> capturedRecords
-                = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT);
-        expectOffsetCommit(1L, null, new Exception(), 0, true);
-        expectStopTask();
-
-        PowerMock.replayAll();
+    public void testCommitConsumerFailure() {
+        expectTaskGetTopic();
+        expectInitialAssignment();
+        expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT);
+        expectOffsetCommit(new ExpectOffsetCommitCommand(
+                1L, null, new Exception(), 0, true));
 
         workerTask.initialize(TASK_CONFIG);
         workerTask.initializeAndStart();
+        verifyInitializeTask();
 
         // Initialize partition assignment
         workerTask.iteration();
+        verifyInitialAssignment();
+
         // Fetch some data
         workerTask.iteration();
         // Trigger commit
@@ -341,33 +366,31 @@ public class WorkerSinkTaskThreadedTest {
 
         // TODO Response to consistent failures?
         assertEquals(1, workerTask.commitFailures());
-        assertEquals(false, Whitebox.getInternalState(workerTask, 
"committing"));
+        assertFalse(workerTask.isCommitting());
+
         workerTask.stop();
         workerTask.close();
-
-        PowerMock.verifyAll();
+        verifyStopTask();
+        verifyTaskGetTopic(2);
     }
 
     @Test
-    public void testCommitTimeout() throws Exception {
-        expectInitializeTask();
-        expectTaskGetTopic(true);
-        expectPollInitialAssignment();
-        expectConsumerAssignment(INITIAL_ASSIGNMENT).times(2);
-
+    public void testCommitTimeout() {
+        expectTaskGetTopic();
+        expectInitialAssignment();
         // Cut down amount of time to pass in each poll so we trigger exactly 
1 offset commit
-        Capture<Collection<SinkRecord>> capturedRecords
-                = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT / 
2);
-        expectOffsetCommit(2L, null, null, 
WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_DEFAULT, false);
-        expectStopTask();
-
-        PowerMock.replayAll();
+        expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT / 2);
+        expectOffsetCommit(new ExpectOffsetCommitCommand(
+                2L, null, null, WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_DEFAULT, 
false));
 
         workerTask.initialize(TASK_CONFIG);
         workerTask.initializeAndStart();
+        verifyInitializeTask();
 
         // Initialize partition assignment
         workerTask.iteration();
+        verifyInitialAssignment();
+
         // Fetch some data
         workerTask.iteration();
         workerTask.iteration();
@@ -378,29 +401,27 @@ public class WorkerSinkTaskThreadedTest {
 
         // TODO Response to consistent failures?
         assertEquals(1, workerTask.commitFailures());
-        assertEquals(false, Whitebox.getInternalState(workerTask, 
"committing"));
+        assertFalse(workerTask.isCommitting());
         workerTask.stop();
         workerTask.close();
-
-        PowerMock.verifyAll();
+        verifyStopTask();
+        verifyTaskGetTopic(4);
     }
 
     @Test
-    public void testAssignmentPauseResume() throws Exception {
+    public void testAssignmentPauseResume() {
         // Just validate that the calls are passed through to the consumer, 
and that where appropriate errors are
         // converted
-        expectInitializeTask();
-        expectTaskGetTopic(true);
-
-        expectPollInitialAssignment();
-        expectOnePoll().andAnswer(() -> {
-            assertEquals(new HashSet<>(Arrays.asList(TOPIC_PARTITION, 
TOPIC_PARTITION2, TOPIC_PARTITION3)),
-                    sinkTaskContext.getValue().assignment());
+        expectTaskGetTopic();
+        expectInitialAssignment();
+        expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT);
+
+        doAnswer(invocation -> {
+            return null; // initial assignment
+        }).doAnswer(invocation -> {
+            assertEquals(new HashSet<>(Arrays.asList(TOPIC_PARTITION, 
TOPIC_PARTITION2, TOPIC_PARTITION3)), sinkTaskContext.getValue().assignment());
             return null;
-        });
-        EasyMock.expect(consumer.assignment()).andReturn(new 
HashSet<>(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2, TOPIC_PARTITION3)));
-
-        expectOnePoll().andAnswer(() -> {
+        }).doAnswer(invocation -> {
             try {
                 sinkTaskContext.getValue().pause(UNASSIGNED_TOPIC_PARTITION);
                 fail("Trying to pause unassigned partition should have thrown 
an Connect exception");
@@ -409,260 +430,237 @@ public class WorkerSinkTaskThreadedTest {
             }
             sinkTaskContext.getValue().pause(TOPIC_PARTITION, 
TOPIC_PARTITION2);
             return null;
-        });
-        consumer.pause(Arrays.asList(UNASSIGNED_TOPIC_PARTITION));
-        PowerMock.expectLastCall().andThrow(new 
IllegalStateException("unassigned topic partition"));
-        consumer.pause(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2));
-        PowerMock.expectLastCall();
-
-        expectOnePoll().andAnswer(() -> {
+        }).doAnswer(invocation -> {
             try {
                 sinkTaskContext.getValue().resume(UNASSIGNED_TOPIC_PARTITION);
                 fail("Trying to resume unassigned partition should have thrown 
an Connect exception");
             } catch (ConnectException e) {
                 // expected
             }
-
             sinkTaskContext.getValue().resume(TOPIC_PARTITION, 
TOPIC_PARTITION2);
             return null;
-        });
-        consumer.resume(Arrays.asList(UNASSIGNED_TOPIC_PARTITION));
-        PowerMock.expectLastCall().andThrow(new 
IllegalStateException("unassigned topic partition"));
-        consumer.resume(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2));
-        PowerMock.expectLastCall();
+        }).when(sinkTask).put(any(Collection.class));
 
-        expectStopTask();
+        doThrow(new IllegalStateException("unassigned topic 
partition")).when(consumer).pause(singletonList(UNASSIGNED_TOPIC_PARTITION));
+        doAnswer(invocation -> 
null).when(consumer).pause(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2));
 
-        PowerMock.replayAll();
+        doThrow(new IllegalStateException("unassigned topic 
partition")).when(consumer).resume(singletonList(UNASSIGNED_TOPIC_PARTITION));
+        doAnswer(invocation -> 
null).when(consumer).resume(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2));
 
         workerTask.initialize(TASK_CONFIG);
         workerTask.initializeAndStart();
+        verifyInitializeTask();
+
         workerTask.iteration();
+        verifyInitialAssignment();
+
         workerTask.iteration();
         workerTask.iteration();
         workerTask.iteration();
         workerTask.stop();
         workerTask.close();
+        verifyStopTask();
+        verifyTaskGetTopic(3);
 
-        PowerMock.verifyAll();
+        verify(consumer, atLeastOnce()).pause(Arrays.asList(TOPIC_PARTITION, 
TOPIC_PARTITION2));
+        verify(consumer, atLeastOnce()).resume(Arrays.asList(TOPIC_PARTITION, 
TOPIC_PARTITION2));
     }
 
     @Test
-    public void testRewind() throws Exception {
-        expectInitializeTask();
-        expectTaskGetTopic(true);
-        expectPollInitialAssignment();
+    public void testRewind() {
+        expectTaskGetTopic();
+        expectInitialAssignment();
+        expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT);
 
         final long startOffset = 40L;
         final Map<TopicPartition, Long> offsets = new HashMap<>();
 
-        expectOnePoll().andAnswer(() -> {
+        doAnswer(invocation -> {
+            return null; // initial assignment
+        }).doAnswer(invocation -> {
             offsets.put(TOPIC_PARTITION, startOffset);
             sinkTaskContext.getValue().offset(offsets);
             return null;
-        });
-
-        consumer.seek(TOPIC_PARTITION, startOffset);
-        EasyMock.expectLastCall();
-
-        expectOnePoll().andAnswer(() -> {
+        }).doAnswer(invocation -> {
             Map<TopicPartition, Long> offsets1 = 
sinkTaskContext.getValue().offsets();
             assertEquals(0, offsets1.size());
             return null;
-        });
-
-        expectStopTask();
-        PowerMock.replayAll();
+        }).when(sinkTask).put(any(Collection.class));
 
         workerTask.initialize(TASK_CONFIG);
         workerTask.initializeAndStart();
+        verifyInitializeTask();
+
         workerTask.iteration();
+        verifyInitialAssignment();
+
         workerTask.iteration();
         workerTask.iteration();
+        verify(consumer).seek(TOPIC_PARTITION, startOffset);
+
         workerTask.stop();
         workerTask.close();
-
-        PowerMock.verifyAll();
+        verifyStopTask();
+        verifyTaskGetTopic(2);
     }
 
     @Test
-    public void testRewindOnRebalanceDuringPoll() throws Exception {
-        expectInitializeTask();
-        expectTaskGetTopic(true);
-        expectPollInitialAssignment();
-        expectConsumerAssignment(INITIAL_ASSIGNMENT).times(2);
+    public void testRewindOnRebalanceDuringPoll() {
+        final long startOffset = 40L;
+
+        expectTaskGetTopic();
+        expectInitialAssignment();
+        expectRebalanceDuringPoll(startOffset);
 
-        expectRebalanceDuringPoll().andAnswer(() -> {
+        doAnswer(invocation -> {
+            return null; // initial assignment
+        }).doAnswer(invocation -> {
             Map<TopicPartition, Long> offsets = 
sinkTaskContext.getValue().offsets();
             assertEquals(0, offsets.size());
             return null;
-        });
 
-        expectStopTask();
-        PowerMock.replayAll();
+        }).when(sinkTask).put(any(Collection.class));
 
         workerTask.initialize(TASK_CONFIG);
         workerTask.initializeAndStart();
+        verifyInitializeTask();
+
         workerTask.iteration();
+        verifyInitialAssignment();
+
         workerTask.iteration();
+        verify(consumer).seek(TOPIC_PARTITION, startOffset);
+
         workerTask.stop();
         workerTask.close();
-
-        PowerMock.verifyAll();
+        verifyStopTask();
+        verifyTaskGetTopic(1);
     }
 
-    private void expectInitializeTask() {
-
-        consumer.subscribe(EasyMock.eq(Arrays.asList(TOPIC)), 
EasyMock.capture(rebalanceListener));
-        PowerMock.expectLastCall();
-
-        sinkTask.initialize(EasyMock.capture(sinkTaskContext));
-        PowerMock.expectLastCall();
-        sinkTask.start(TASK_PROPS);
-        PowerMock.expectLastCall();
+    private void verifyInitializeTask() {
+        verify(consumer).subscribe(eq(singletonList(TOPIC)), 
rebalanceListener.capture());
+        verify(sinkTask).initialize(sinkTaskContext.capture());
+        verify(sinkTask).start(TASK_PROPS);
     }
 
-    private void expectPollInitialAssignment() {
-        expectConsumerAssignment(INITIAL_ASSIGNMENT).times(2);
 
-        sinkTask.open(INITIAL_ASSIGNMENT);
-        EasyMock.expectLastCall();
-
-        
EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(()
 -> {
-            
rebalanceListener.getValue().onPartitionsAssigned(INITIAL_ASSIGNMENT);
-            return ConsumerRecords.empty();
-        });
-        
EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(FIRST_OFFSET);
-        
EasyMock.expect(consumer.position(TOPIC_PARTITION2)).andReturn(FIRST_OFFSET);
-        
EasyMock.expect(consumer.position(TOPIC_PARTITION3)).andReturn(FIRST_OFFSET);
-
-        sinkTask.put(Collections.emptyList());
-        EasyMock.expectLastCall();
+    private void expectInitialAssignment() {
+        when(consumer.assignment()).thenReturn(INITIAL_ASSIGNMENT);
+        INITIAL_ASSIGNMENT.forEach(tp -> 
when(consumer.position(tp)).thenReturn(FIRST_OFFSET));
     }
 
-    private IExpectationSetters<Set<TopicPartition>> 
expectConsumerAssignment(Set<TopicPartition> assignment) {
-        return EasyMock.expect(consumer.assignment()).andReturn(assignment);
+    private void verifyInitialAssignment() {
+        verify(sinkTask).open(INITIAL_ASSIGNMENT);
+        verify(sinkTask).put(Collections.emptyList());
     }
 
-    private void expectStopTask() {
-        sinkTask.stop();
-        PowerMock.expectLastCall();
+    private void verifyStopTask() {
+        verify(sinkTask).stop();
 
         // No offset commit since it happens in the mocked worker thread, but 
the main thread does need to wake up the
         // consumer so it exits quickly
-        consumer.wakeup();
-        PowerMock.expectLastCall();
+        verify(consumer).wakeup();
 
-        consumer.close();
-        PowerMock.expectLastCall();
+        verify(consumer).close();
 
         try {
-            headerConverter.close();
+            verify(headerConverter).close();
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
-        PowerMock.expectLastCall();
     }
 
     // Note that this can only be called once per test currently
-    private Capture<Collection<SinkRecord>> expectPolls(final long 
pollDelayMs) {
+    private void expectPolls(final long pollDelayMs) {
         // Stub out all the consumer stream/iterator responses, which we just 
want to verify occur,
         // but don't care about the exact details here.
-        
EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andStubAnswer(
-            () -> {
-                // "Sleep" so time will progress
-                time.sleep(pollDelayMs);
-                ConsumerRecords<byte[], byte[]> records = new 
ConsumerRecords<>(
-                        Collections.singletonMap(
-                                new TopicPartition(TOPIC, PARTITION),
-                                Arrays.asList(new ConsumerRecord<>(TOPIC, 
PARTITION, FIRST_OFFSET + recordsReturned, TIMESTAMP, TIMESTAMP_TYPE,
-                                    0, 0, RAW_KEY, RAW_VALUE, new 
RecordHeaders(), Optional.empty()))));
-                recordsReturned++;
-                return records;
-            });
-        EasyMock.expect(keyConverter.toConnectData(TOPIC, emptyHeaders(), 
RAW_KEY)).andReturn(new SchemaAndValue(KEY_SCHEMA, KEY)).anyTimes();
-        EasyMock.expect(valueConverter.toConnectData(TOPIC, emptyHeaders(), 
RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE)).anyTimes();
-
-        final Capture<SinkRecord> recordCapture = EasyMock.newCapture();
-        
EasyMock.expect(transformationChain.apply(EasyMock.capture(recordCapture))).andAnswer(
-            recordCapture::getValue).anyTimes();
-
-        Capture<Collection<SinkRecord>> capturedRecords = 
EasyMock.newCapture(CaptureType.ALL);
-        sinkTask.put(EasyMock.capture(capturedRecords));
-        EasyMock.expectLastCall().anyTimes();
-        return capturedRecords;
-    }
-
-    @SuppressWarnings("unchecked")
-    private IExpectationSetters<Object> expectOnePoll() {
-        // Currently the SinkTask's put() method will not be invoked unless we 
provide some data, so instead of
-        // returning empty data, we return one record. The expectation is that 
the data will be ignored by the
-        // response behavior specified using the return value of this method.
-        
EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(
-            () -> {
-                // "Sleep" so time will progress
-                time.sleep(1L);
-                ConsumerRecords<byte[], byte[]> records = new 
ConsumerRecords<>(
-                        Collections.singletonMap(
-                                new TopicPartition(TOPIC, PARTITION),
-                                Arrays.asList(new ConsumerRecord<>(TOPIC, 
PARTITION, FIRST_OFFSET + recordsReturned, TIMESTAMP, TIMESTAMP_TYPE,
-                                    0, 0, RAW_KEY, RAW_VALUE, new 
RecordHeaders(), Optional.empty()))));
-                recordsReturned++;
-                return records;
-            });
-        EasyMock.expect(keyConverter.toConnectData(TOPIC, emptyHeaders(), 
RAW_KEY)).andReturn(new SchemaAndValue(KEY_SCHEMA, KEY));
-        EasyMock.expect(valueConverter.toConnectData(TOPIC, emptyHeaders(), 
RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE));
-        sinkTask.put(EasyMock.anyObject(Collection.class));
-        return EasyMock.expectLastCall();
+        when(consumer.poll(any(Duration.class))).thenAnswer(invocation -> {
+            
rebalanceListener.getValue().onPartitionsAssigned(INITIAL_ASSIGNMENT);
+            return ConsumerRecords.empty();
+        }).thenAnswer((Answer<ConsumerRecords<byte[], byte[]>>) invocation -> {
+            // "Sleep" so time will progress
+            time.sleep(pollDelayMs);
+
+            TopicPartition topicPartition = new TopicPartition(TOPIC, 
PARTITION);
+            ConsumerRecord<byte[], byte[]> consumerRecord = new 
ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, TIMESTAMP, 
TIMESTAMP_TYPE, 0, 0, RAW_KEY, RAW_VALUE, emptyHeaders(), Optional.empty());
+            ConsumerRecords<byte[], byte[]> records = new 
ConsumerRecords<>(Collections.singletonMap(topicPartition, 
singletonList(consumerRecord)));
+            recordsReturned++;
+            return records;
+        });
+        when(keyConverter.toConnectData(TOPIC, emptyHeaders(), 
RAW_KEY)).thenReturn(new SchemaAndValue(KEY_SCHEMA, KEY));
+        when(valueConverter.toConnectData(TOPIC, emptyHeaders(), 
RAW_VALUE)).thenReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE));
+        
when(transformationChain.apply(any(SinkRecord.class))).thenAnswer(AdditionalAnswers.returnsFirstArg());
     }
 
-    @SuppressWarnings("unchecked")
-    private IExpectationSetters<Object> expectRebalanceDuringPoll() {
+    @SuppressWarnings("SameParameterValue")
+    private void expectRebalanceDuringPoll(long startOffset) {
         final List<TopicPartition> partitions = Arrays.asList(TOPIC_PARTITION, 
TOPIC_PARTITION2, TOPIC_PARTITION3);
 
-        final long startOffset = 40L;
         final Map<TopicPartition, Long> offsets = new HashMap<>();
         offsets.put(TOPIC_PARTITION, startOffset);
 
-        
EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(
-            () -> {
-                // "Sleep" so time will progress
-                time.sleep(1L);
-
-                sinkTaskContext.getValue().offset(offsets);
-                rebalanceListener.getValue().onPartitionsAssigned(partitions);
-
-                ConsumerRecords<byte[], byte[]> records = new 
ConsumerRecords<>(
-                        Collections.singletonMap(
-                                new TopicPartition(TOPIC, PARTITION),
-                                Arrays.asList(new ConsumerRecord<>(TOPIC, 
PARTITION, FIRST_OFFSET + recordsReturned, TIMESTAMP, TIMESTAMP_TYPE,
-                                    0, 0, RAW_KEY, RAW_VALUE, new 
RecordHeaders(), Optional.empty())
-                                )));
-                recordsReturned++;
-                return records;
-            });
-
-        
EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(FIRST_OFFSET);
-        
EasyMock.expect(consumer.position(TOPIC_PARTITION2)).andReturn(FIRST_OFFSET);
-        
EasyMock.expect(consumer.position(TOPIC_PARTITION3)).andReturn(FIRST_OFFSET);
-
-        sinkTask.open(partitions);
-        EasyMock.expectLastCall();
-
-        consumer.seek(TOPIC_PARTITION, startOffset);
-        EasyMock.expectLastCall();
-
-        EasyMock.expect(keyConverter.toConnectData(TOPIC, emptyHeaders(), 
RAW_KEY)).andReturn(new SchemaAndValue(KEY_SCHEMA, KEY));
-        EasyMock.expect(valueConverter.toConnectData(TOPIC, emptyHeaders(), 
RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE));
-        sinkTask.put(EasyMock.anyObject(Collection.class));
-        return EasyMock.expectLastCall();
+        when(consumer.poll(any(Duration.class))).thenAnswer(invocation -> {
+            
rebalanceListener.getValue().onPartitionsAssigned(INITIAL_ASSIGNMENT);
+            return ConsumerRecords.empty();
+        }).thenAnswer((Answer<ConsumerRecords<byte[], byte[]>>) invocation -> {
+            // "Sleep" so time will progress
+            time.sleep(1L);
+
+            sinkTaskContext.getValue().offset(offsets);
+            rebalanceListener.getValue().onPartitionsAssigned(partitions);
+
+            TopicPartition topicPartition = new TopicPartition(TOPIC, 
PARTITION);
+            ConsumerRecord<byte[], byte[]> consumerRecord = new 
ConsumerRecord<>(
+                    TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, 
TIMESTAMP, TIMESTAMP_TYPE,
+                    0, 0, RAW_KEY, RAW_VALUE, emptyHeaders(), 
Optional.empty());
+            ConsumerRecords<byte[], byte[]> records =
+                    new 
ConsumerRecords<>(Collections.singletonMap(topicPartition, 
singletonList(consumerRecord)));
+            recordsReturned++;
+            return records;
+        });
+
+        when(keyConverter.toConnectData(TOPIC, emptyHeaders(), 
RAW_KEY)).thenReturn(new SchemaAndValue(KEY_SCHEMA, KEY));
+        when(valueConverter.toConnectData(TOPIC, emptyHeaders(), 
RAW_VALUE)).thenReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE));
     }
 
-    private Capture<OffsetCommitCallback> expectOffsetCommit(final long 
expectedMessages,
-                                                             final 
RuntimeException error,
-                                                             final Exception 
consumerCommitError,
-                                                             final long 
consumerCommitDelayMs,
-                                                             final boolean 
invokeCallback) {
+    private void expectOffsetCommit(ExpectOffsetCommitCommand... commands) {
+        doAnswer(new Answer<Object>() {
+            int index = 0;
+
+            @Override
+            public Object answer(InvocationOnMock invocation) {
+                ExpectOffsetCommitCommand commitCommand = commands[index++];
+                // All assigned partitions will have offsets committed, but 
we've only processed messages/updated offsets for one
+                final Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = 
offsetsToCommitFn.apply(commitCommand.expectedMessages);
+
+                if (commitCommand.error != null) {
+                    throw commitCommand.error;
+                } else {
+                    return offsetsToCommit;
+                }
+            }
+        }).when(sinkTask).preCommit(anyMap());
+
+        doAnswer(new Answer<Object>() {
+            int index = 0;
+
+            @Override
+            public Object answer(InvocationOnMock invocation) {
+                ExpectOffsetCommitCommand commitCommand = commands[index++];
+
+                time.sleep(commitCommand.consumerCommitDelayMs);
+                if (commitCommand.invokeCallback) {
+                    OffsetCommitCallback callback = invocation.getArgument(1);
+                    final Map<TopicPartition, OffsetAndMetadata> 
offsetsToCommit = offsetsToCommitFn.apply(commitCommand.expectedMessages);
+
+                    callback.onComplete(offsetsToCommit, 
commitCommand.consumerCommitError);
+                }
+                return null;
+            }
+        }).when(consumer).commitAsync(anyMap(), 
any(OffsetCommitCallback.class));
+    }
+
+    private void verifyOffsetCommit(final long expectedMessages) {
         final long finalOffset = FIRST_OFFSET + expectedMessages;
 
         // All assigned partitions will have offsets committed, but we've only 
processed messages/updated offsets for one
@@ -670,48 +668,26 @@ public class WorkerSinkTaskThreadedTest {
         offsetsToCommit.put(TOPIC_PARTITION, new 
OffsetAndMetadata(finalOffset));
         offsetsToCommit.put(TOPIC_PARTITION2, new 
OffsetAndMetadata(FIRST_OFFSET));
         offsetsToCommit.put(TOPIC_PARTITION3, new 
OffsetAndMetadata(FIRST_OFFSET));
-        sinkTask.preCommit(offsetsToCommit);
-        IExpectationSetters<Object> expectation = PowerMock.expectLastCall();
-        if (error != null) {
-            expectation.andThrow(error).once();
-            return null;
-        } else {
-            expectation.andReturn(offsetsToCommit);
-        }
 
-        final Capture<OffsetCommitCallback> capturedCallback = 
EasyMock.newCapture();
-        consumer.commitAsync(EasyMock.eq(offsetsToCommit),
-                EasyMock.capture(capturedCallback));
-        PowerMock.expectLastCall().andAnswer(() -> {
-            time.sleep(consumerCommitDelayMs);
-            if (invokeCallback)
-                capturedCallback.getValue().onComplete(offsetsToCommit, 
consumerCommitError);
-            return null;
+        verify(sinkTask).preCommit(offsetsToCommit);
+        verify(consumer).commitAsync(eq(offsetsToCommit), 
any(OffsetCommitCallback.class));
+    }
+
+    private void expectTaskGetTopic() {
+        when(statusBackingStore.getTopic(anyString(), 
anyString())).thenAnswer((Answer<TopicStatus>) invocation -> {
+            String connector = invocation.getArgument(0, String.class);
+            String topic = invocation.getArgument(1, String.class);
+            return new TopicStatus(topic, new ConnectorTaskId(connector, 0), 
Time.SYSTEM.milliseconds());
         });
-        return capturedCallback;
     }
 
-    private void expectTaskGetTopic(boolean anyTimes) {
-        final Capture<String> connectorCapture = EasyMock.newCapture();
-        final Capture<String> topicCapture = EasyMock.newCapture();
-        IExpectationSetters<TopicStatus> expect = 
EasyMock.expect(statusBackingStore.getTopic(
-                EasyMock.capture(connectorCapture),
-                EasyMock.capture(topicCapture)));
-        if (anyTimes) {
-            expect.andStubAnswer(() -> new TopicStatus(
-                    topicCapture.getValue(),
-                    new ConnectorTaskId(connectorCapture.getValue(), 0),
-                    Time.SYSTEM.milliseconds()));
-        } else {
-            expect.andAnswer(() -> new TopicStatus(
-                    topicCapture.getValue(),
-                    new ConnectorTaskId(connectorCapture.getValue(), 0),
-                    Time.SYSTEM.milliseconds()));
-        }
-        if (connectorCapture.hasCaptured() && topicCapture.hasCaptured()) {
-            assertEquals("job", connectorCapture.getValue());
-            assertEquals(TOPIC, topicCapture.getValue());
-        }
+    private void verifyTaskGetTopic(int times) {
+        ArgumentCaptor<String> connectorCapture = 
ArgumentCaptor.forClass(String.class);
+        ArgumentCaptor<String> topicCapture = 
ArgumentCaptor.forClass(String.class);
+        verify(statusBackingStore, 
times(times)).getTopic(connectorCapture.capture(), topicCapture.capture());
+
+        assertEquals("job", connectorCapture.getValue());
+        assertEquals(TOPIC, topicCapture.getValue());
     }
 
     private RecordHeaders emptyHeaders() {
@@ -720,4 +696,21 @@ public class WorkerSinkTaskThreadedTest {
 
     private static abstract class TestSinkTask extends SinkTask {
     }
+
+    @SuppressWarnings("NewClassNamingConvention")
+    private static class ExpectOffsetCommitCommand {
+        final long expectedMessages;
+        final RuntimeException error;
+        final Exception consumerCommitError;
+        final long consumerCommitDelayMs;
+        final boolean invokeCallback;
+
+        private ExpectOffsetCommitCommand(long expectedMessages, 
RuntimeException error, Exception consumerCommitError, long 
consumerCommitDelayMs, boolean invokeCallback) {
+            this.expectedMessages = expectedMessages;
+            this.error = error;
+            this.consumerCommitError = consumerCommitError;
+            this.consumerCommitDelayMs = consumerCommitDelayMs;
+            this.invokeCallback = invokeCallback;
+        }
+    }
 }


Reply via email to