clolov commented on code in PR #14505:
URL: https://github.com/apache/kafka/pull/14505#discussion_r1352038137


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java:
##########
@@ -409,309 +430,266 @@ public void testAssignmentPauseResume() throws 
Exception {
             }
             sinkTaskContext.getValue().pause(TOPIC_PARTITION, 
TOPIC_PARTITION2);
             return null;
-        });
-        consumer.pause(Arrays.asList(UNASSIGNED_TOPIC_PARTITION));
-        PowerMock.expectLastCall().andThrow(new 
IllegalStateException("unassigned topic partition"));
-        consumer.pause(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2));
-        PowerMock.expectLastCall();
-
-        expectOnePoll().andAnswer(() -> {
+        }).doAnswer(invocation -> {
             try {
                 sinkTaskContext.getValue().resume(UNASSIGNED_TOPIC_PARTITION);
                 fail("Trying to resume unassigned partition should have thrown 
an Connect exception");
             } catch (ConnectException e) {
                 // expected
             }
-
             sinkTaskContext.getValue().resume(TOPIC_PARTITION, 
TOPIC_PARTITION2);
             return null;
-        });
-        consumer.resume(Arrays.asList(UNASSIGNED_TOPIC_PARTITION));
-        PowerMock.expectLastCall().andThrow(new 
IllegalStateException("unassigned topic partition"));
-        consumer.resume(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2));
-        PowerMock.expectLastCall();
+        }).when(sinkTask).put(any(Collection.class));
 
-        expectStopTask();
+        doThrow(new IllegalStateException("unassigned topic 
partition")).when(consumer).pause(singletonList(UNASSIGNED_TOPIC_PARTITION));
+        doAnswer(invocation -> 
null).when(consumer).pause(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2));
 
-        PowerMock.replayAll();
+        doThrow(new IllegalStateException("unassigned topic 
partition")).when(consumer).resume(singletonList(UNASSIGNED_TOPIC_PARTITION));
+        doAnswer(invocation -> 
null).when(consumer).resume(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2));
 
         workerTask.initialize(TASK_CONFIG);
         workerTask.initializeAndStart();
+        verifyInitializeTask();
+
         workerTask.iteration();
+        verifyInitialAssignment();
+
         workerTask.iteration();
         workerTask.iteration();
         workerTask.iteration();
         workerTask.stop();
         workerTask.close();
+        verifyStopTask();
+        verifyTaskGetTopic(3);
 
-        PowerMock.verifyAll();
+        verify(consumer, atLeastOnce()).pause(Arrays.asList(TOPIC_PARTITION, 
TOPIC_PARTITION2));
+        verify(consumer, atLeastOnce()).resume(Arrays.asList(TOPIC_PARTITION, 
TOPIC_PARTITION2));
     }
 
     @Test
-    public void testRewind() throws Exception {
-        expectInitializeTask();
-        expectTaskGetTopic(true);
-        expectPollInitialAssignment();
+    public void testRewind() {
+        expectTaskGetTopic();
+        expectInitialAssignment();
+        expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT);
 
         final long startOffset = 40L;
         final Map<TopicPartition, Long> offsets = new HashMap<>();
 
-        expectOnePoll().andAnswer(() -> {
+        doAnswer(invocation -> {
+            return null; // initial assignment
+        }).doAnswer(invocation -> {
             offsets.put(TOPIC_PARTITION, startOffset);
             sinkTaskContext.getValue().offset(offsets);
             return null;
-        });
-
-        consumer.seek(TOPIC_PARTITION, startOffset);
-        EasyMock.expectLastCall();
-
-        expectOnePoll().andAnswer(() -> {
+        }).doAnswer(invocation -> {
             Map<TopicPartition, Long> offsets1 = 
sinkTaskContext.getValue().offsets();
             assertEquals(0, offsets1.size());
             return null;
-        });
-
-        expectStopTask();
-        PowerMock.replayAll();
+        }).when(sinkTask).put(any(Collection.class));
 
         workerTask.initialize(TASK_CONFIG);
         workerTask.initializeAndStart();
+        verifyInitializeTask();
+
         workerTask.iteration();
+        verifyInitialAssignment();
+
         workerTask.iteration();
         workerTask.iteration();
+        verify(consumer).seek(TOPIC_PARTITION, startOffset);
+
         workerTask.stop();
         workerTask.close();
-
-        PowerMock.verifyAll();
+        verifyStopTask();
+        verifyTaskGetTopic(2);
     }
 
     @Test
-    public void testRewindOnRebalanceDuringPoll() throws Exception {
-        expectInitializeTask();
-        expectTaskGetTopic(true);
-        expectPollInitialAssignment();
-        expectConsumerAssignment(INITIAL_ASSIGNMENT).times(2);
+    public void testRewindOnRebalanceDuringPoll() {
+        final long startOffset = 40L;
+
+        expectTaskGetTopic();
+        expectInitialAssignment();
+        expectRebalanceDuringPoll(startOffset);
 
-        expectRebalanceDuringPoll().andAnswer(() -> {
+        doAnswer(invocation -> {
+            return null; // initial assignment
+        }).doAnswer(invocation -> {
             Map<TopicPartition, Long> offsets = 
sinkTaskContext.getValue().offsets();
             assertEquals(0, offsets.size());
             return null;
-        });
 
-        expectStopTask();
-        PowerMock.replayAll();
+        }).when(sinkTask).put(any(Collection.class));
 
         workerTask.initialize(TASK_CONFIG);
         workerTask.initializeAndStart();
+        verifyInitializeTask();
+
         workerTask.iteration();
+        verifyInitialAssignment();
+
         workerTask.iteration();
+        verify(consumer).seek(TOPIC_PARTITION, startOffset);
+
         workerTask.stop();
         workerTask.close();
-
-        PowerMock.verifyAll();
+        verifyStopTask();
+        verifyTaskGetTopic(1);
     }
 
-    private void expectInitializeTask() {
-
-        consumer.subscribe(EasyMock.eq(Arrays.asList(TOPIC)), 
EasyMock.capture(rebalanceListener));
-        PowerMock.expectLastCall();
-
-        sinkTask.initialize(EasyMock.capture(sinkTaskContext));
-        PowerMock.expectLastCall();
-        sinkTask.start(TASK_PROPS);
-        PowerMock.expectLastCall();
+    private void verifyInitializeTask() {
+        verify(consumer).subscribe(eq(singletonList(TOPIC)), 
rebalanceListener.capture());
+        verify(sinkTask).initialize(sinkTaskContext.capture());
+        verify(sinkTask).start(TASK_PROPS);
     }
 
-    private void expectPollInitialAssignment() {
-        expectConsumerAssignment(INITIAL_ASSIGNMENT).times(2);
 
-        sinkTask.open(INITIAL_ASSIGNMENT);
-        EasyMock.expectLastCall();
-
-        
EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(()
 -> {
-            
rebalanceListener.getValue().onPartitionsAssigned(INITIAL_ASSIGNMENT);
-            return ConsumerRecords.empty();
-        });
-        
EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(FIRST_OFFSET);
-        
EasyMock.expect(consumer.position(TOPIC_PARTITION2)).andReturn(FIRST_OFFSET);
-        
EasyMock.expect(consumer.position(TOPIC_PARTITION3)).andReturn(FIRST_OFFSET);
-
-        sinkTask.put(Collections.emptyList());
-        EasyMock.expectLastCall();
+    private void expectInitialAssignment() {
+        when(consumer.assignment()).thenReturn(INITIAL_ASSIGNMENT);
+        INITIAL_ASSIGNMENT.forEach(tp -> 
when(consumer.position(tp)).thenReturn(FIRST_OFFSET));
     }
 
-    private IExpectationSetters<Set<TopicPartition>> 
expectConsumerAssignment(Set<TopicPartition> assignment) {
-        return EasyMock.expect(consumer.assignment()).andReturn(assignment);
+    private void verifyInitialAssignment() {
+        verify(sinkTask).open(INITIAL_ASSIGNMENT);
+        verify(sinkTask).put(Collections.emptyList());
     }
 
-    private void expectStopTask() {
-        sinkTask.stop();
-        PowerMock.expectLastCall();
+    private void verifyStopTask() {
+        verify(sinkTask).stop();
 
         // No offset commit since it happens in the mocked worker thread, but 
the main thread does need to wake up the
         // consumer so it exits quickly
-        consumer.wakeup();
-        PowerMock.expectLastCall();
+        verify(consumer).wakeup();
 
-        consumer.close();
-        PowerMock.expectLastCall();
+        verify(consumer).close();
 
         try {
-            headerConverter.close();
+            verify(headerConverter).close();
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
-        PowerMock.expectLastCall();
     }
 
     // Note that this can only be called once per test currently
-    private Capture<Collection<SinkRecord>> expectPolls(final long 
pollDelayMs) {
+    private void expectPolls(final long pollDelayMs) {
         // Stub out all the consumer stream/iterator responses, which we just 
want to verify occur,
         // but don't care about the exact details here.
-        
EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andStubAnswer(
-            () -> {
-                // "Sleep" so time will progress
-                time.sleep(pollDelayMs);
-                ConsumerRecords<byte[], byte[]> records = new 
ConsumerRecords<>(
-                        Collections.singletonMap(
-                                new TopicPartition(TOPIC, PARTITION),
-                                Arrays.asList(new ConsumerRecord<>(TOPIC, 
PARTITION, FIRST_OFFSET + recordsReturned, TIMESTAMP, TIMESTAMP_TYPE,
-                                    0, 0, RAW_KEY, RAW_VALUE, new 
RecordHeaders(), Optional.empty()))));
-                recordsReturned++;
-                return records;
-            });
-        EasyMock.expect(keyConverter.toConnectData(TOPIC, emptyHeaders(), 
RAW_KEY)).andReturn(new SchemaAndValue(KEY_SCHEMA, KEY)).anyTimes();
-        EasyMock.expect(valueConverter.toConnectData(TOPIC, emptyHeaders(), 
RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE)).anyTimes();
-
-        final Capture<SinkRecord> recordCapture = EasyMock.newCapture();
-        
EasyMock.expect(transformationChain.apply(EasyMock.capture(recordCapture))).andAnswer(
-            recordCapture::getValue).anyTimes();
-
-        Capture<Collection<SinkRecord>> capturedRecords = 
EasyMock.newCapture(CaptureType.ALL);
-        sinkTask.put(EasyMock.capture(capturedRecords));
-        EasyMock.expectLastCall().anyTimes();
-        return capturedRecords;
-    }
-
-    @SuppressWarnings("unchecked")
-    private IExpectationSetters<Object> expectOnePoll() {
-        // Currently the SinkTask's put() method will not be invoked unless we 
provide some data, so instead of
-        // returning empty data, we return one record. The expectation is that 
the data will be ignored by the
-        // response behavior specified using the return value of this method.
-        
EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(
-            () -> {
-                // "Sleep" so time will progress
-                time.sleep(1L);
-                ConsumerRecords<byte[], byte[]> records = new 
ConsumerRecords<>(
-                        Collections.singletonMap(
-                                new TopicPartition(TOPIC, PARTITION),
-                                Arrays.asList(new ConsumerRecord<>(TOPIC, 
PARTITION, FIRST_OFFSET + recordsReturned, TIMESTAMP, TIMESTAMP_TYPE,
-                                    0, 0, RAW_KEY, RAW_VALUE, new 
RecordHeaders(), Optional.empty()))));
-                recordsReturned++;
-                return records;
-            });
-        EasyMock.expect(keyConverter.toConnectData(TOPIC, emptyHeaders(), 
RAW_KEY)).andReturn(new SchemaAndValue(KEY_SCHEMA, KEY));
-        EasyMock.expect(valueConverter.toConnectData(TOPIC, emptyHeaders(), 
RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE));
-        sinkTask.put(EasyMock.anyObject(Collection.class));
-        return EasyMock.expectLastCall();
+        when(consumer.poll(any(Duration.class))).thenAnswer(invocation -> {
+            
rebalanceListener.getValue().onPartitionsAssigned(INITIAL_ASSIGNMENT);
+            return ConsumerRecords.empty();
+        }).thenAnswer((Answer<ConsumerRecords<byte[], byte[]>>) invocation -> {
+            // "Sleep" so time will progress
+            time.sleep(pollDelayMs);
+
+            TopicPartition topicPartition = new TopicPartition(TOPIC, 
PARTITION);
+            ConsumerRecord<byte[], byte[]> consumerRecord = new 
ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, TIMESTAMP, 
TIMESTAMP_TYPE, 0, 0, RAW_KEY, RAW_VALUE, emptyHeaders(), Optional.empty());
+            ConsumerRecords<byte[], byte[]> records = new 
ConsumerRecords<>(Collections.singletonMap(topicPartition, 
singletonList(consumerRecord)));
+            recordsReturned++;
+            return records;
+        });
+        when(keyConverter.toConnectData(TOPIC, emptyHeaders(), 
RAW_KEY)).thenReturn(new SchemaAndValue(KEY_SCHEMA, KEY));
+        when(valueConverter.toConnectData(TOPIC, emptyHeaders(), 
RAW_VALUE)).thenReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE));
+        
when(transformationChain.apply(any(SinkRecord.class))).thenAnswer(AdditionalAnswers.returnsFirstArg());
     }
 
-    @SuppressWarnings("unchecked")
-    private IExpectationSetters<Object> expectRebalanceDuringPoll() {
+    //    @SuppressWarnings("unchecked")

Review Comment:
   If this commented-out annotation is unneeded, can you remove it?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java:
##########
@@ -409,309 +430,266 @@ public void testAssignmentPauseResume() throws 
Exception {
             }
             sinkTaskContext.getValue().pause(TOPIC_PARTITION, 
TOPIC_PARTITION2);
             return null;
-        });
-        consumer.pause(Arrays.asList(UNASSIGNED_TOPIC_PARTITION));
-        PowerMock.expectLastCall().andThrow(new 
IllegalStateException("unassigned topic partition"));
-        consumer.pause(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2));
-        PowerMock.expectLastCall();
-
-        expectOnePoll().andAnswer(() -> {
+        }).doAnswer(invocation -> {
             try {
                 sinkTaskContext.getValue().resume(UNASSIGNED_TOPIC_PARTITION);
                 fail("Trying to resume unassigned partition should have thrown 
an Connect exception");
             } catch (ConnectException e) {
                 // expected
             }
-
             sinkTaskContext.getValue().resume(TOPIC_PARTITION, 
TOPIC_PARTITION2);
             return null;
-        });
-        consumer.resume(Arrays.asList(UNASSIGNED_TOPIC_PARTITION));
-        PowerMock.expectLastCall().andThrow(new 
IllegalStateException("unassigned topic partition"));
-        consumer.resume(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2));
-        PowerMock.expectLastCall();
+        }).when(sinkTask).put(any(Collection.class));
 
-        expectStopTask();
+        doThrow(new IllegalStateException("unassigned topic 
partition")).when(consumer).pause(singletonList(UNASSIGNED_TOPIC_PARTITION));
+        doAnswer(invocation -> 
null).when(consumer).pause(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2));
 
-        PowerMock.replayAll();
+        doThrow(new IllegalStateException("unassigned topic 
partition")).when(consumer).resume(singletonList(UNASSIGNED_TOPIC_PARTITION));
+        doAnswer(invocation -> 
null).when(consumer).resume(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2));
 
         workerTask.initialize(TASK_CONFIG);
         workerTask.initializeAndStart();
+        verifyInitializeTask();
+
         workerTask.iteration();
+        verifyInitialAssignment();
+
         workerTask.iteration();
         workerTask.iteration();
         workerTask.iteration();
         workerTask.stop();
         workerTask.close();
+        verifyStopTask();
+        verifyTaskGetTopic(3);
 
-        PowerMock.verifyAll();
+        verify(consumer, atLeastOnce()).pause(Arrays.asList(TOPIC_PARTITION, 
TOPIC_PARTITION2));
+        verify(consumer, atLeastOnce()).resume(Arrays.asList(TOPIC_PARTITION, 
TOPIC_PARTITION2));
     }
 
     @Test
-    public void testRewind() throws Exception {
-        expectInitializeTask();
-        expectTaskGetTopic(true);
-        expectPollInitialAssignment();
+    public void testRewind() {
+        expectTaskGetTopic();
+        expectInitialAssignment();
+        expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT);
 
         final long startOffset = 40L;
         final Map<TopicPartition, Long> offsets = new HashMap<>();
 
-        expectOnePoll().andAnswer(() -> {
+        doAnswer(invocation -> {
+            return null; // initial assignment
+        }).doAnswer(invocation -> {
             offsets.put(TOPIC_PARTITION, startOffset);
             sinkTaskContext.getValue().offset(offsets);
             return null;
-        });
-
-        consumer.seek(TOPIC_PARTITION, startOffset);
-        EasyMock.expectLastCall();
-
-        expectOnePoll().andAnswer(() -> {
+        }).doAnswer(invocation -> {
             Map<TopicPartition, Long> offsets1 = 
sinkTaskContext.getValue().offsets();
             assertEquals(0, offsets1.size());
             return null;
-        });
-
-        expectStopTask();
-        PowerMock.replayAll();
+        }).when(sinkTask).put(any(Collection.class));
 
         workerTask.initialize(TASK_CONFIG);
         workerTask.initializeAndStart();
+        verifyInitializeTask();
+
         workerTask.iteration();
+        verifyInitialAssignment();
+
         workerTask.iteration();
         workerTask.iteration();
+        verify(consumer).seek(TOPIC_PARTITION, startOffset);
+
         workerTask.stop();
         workerTask.close();
-
-        PowerMock.verifyAll();
+        verifyStopTask();
+        verifyTaskGetTopic(2);
     }
 
     @Test
-    public void testRewindOnRebalanceDuringPoll() throws Exception {
-        expectInitializeTask();
-        expectTaskGetTopic(true);
-        expectPollInitialAssignment();
-        expectConsumerAssignment(INITIAL_ASSIGNMENT).times(2);
+    public void testRewindOnRebalanceDuringPoll() {
+        final long startOffset = 40L;
+
+        expectTaskGetTopic();
+        expectInitialAssignment();
+        expectRebalanceDuringPoll(startOffset);
 
-        expectRebalanceDuringPoll().andAnswer(() -> {
+        doAnswer(invocation -> {
+            return null; // initial assignment
+        }).doAnswer(invocation -> {
             Map<TopicPartition, Long> offsets = 
sinkTaskContext.getValue().offsets();
             assertEquals(0, offsets.size());
             return null;
-        });
 
-        expectStopTask();
-        PowerMock.replayAll();
+        }).when(sinkTask).put(any(Collection.class));
 
         workerTask.initialize(TASK_CONFIG);
         workerTask.initializeAndStart();
+        verifyInitializeTask();
+
         workerTask.iteration();
+        verifyInitialAssignment();
+
         workerTask.iteration();
+        verify(consumer).seek(TOPIC_PARTITION, startOffset);
+
         workerTask.stop();
         workerTask.close();
-
-        PowerMock.verifyAll();
+        verifyStopTask();
+        verifyTaskGetTopic(1);
     }
 
-    private void expectInitializeTask() {
-
-        consumer.subscribe(EasyMock.eq(Arrays.asList(TOPIC)), 
EasyMock.capture(rebalanceListener));
-        PowerMock.expectLastCall();
-
-        sinkTask.initialize(EasyMock.capture(sinkTaskContext));
-        PowerMock.expectLastCall();
-        sinkTask.start(TASK_PROPS);
-        PowerMock.expectLastCall();
+    private void verifyInitializeTask() {
+        verify(consumer).subscribe(eq(singletonList(TOPIC)), 
rebalanceListener.capture());
+        verify(sinkTask).initialize(sinkTaskContext.capture());
+        verify(sinkTask).start(TASK_PROPS);
     }
 
-    private void expectPollInitialAssignment() {
-        expectConsumerAssignment(INITIAL_ASSIGNMENT).times(2);
 
-        sinkTask.open(INITIAL_ASSIGNMENT);
-        EasyMock.expectLastCall();
-
-        
EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(()
 -> {
-            
rebalanceListener.getValue().onPartitionsAssigned(INITIAL_ASSIGNMENT);
-            return ConsumerRecords.empty();
-        });
-        
EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(FIRST_OFFSET);
-        
EasyMock.expect(consumer.position(TOPIC_PARTITION2)).andReturn(FIRST_OFFSET);
-        
EasyMock.expect(consumer.position(TOPIC_PARTITION3)).andReturn(FIRST_OFFSET);
-
-        sinkTask.put(Collections.emptyList());
-        EasyMock.expectLastCall();
+    private void expectInitialAssignment() {
+        when(consumer.assignment()).thenReturn(INITIAL_ASSIGNMENT);
+        INITIAL_ASSIGNMENT.forEach(tp -> 
when(consumer.position(tp)).thenReturn(FIRST_OFFSET));
     }
 
-    private IExpectationSetters<Set<TopicPartition>> 
expectConsumerAssignment(Set<TopicPartition> assignment) {
-        return EasyMock.expect(consumer.assignment()).andReturn(assignment);
+    private void verifyInitialAssignment() {
+        verify(sinkTask).open(INITIAL_ASSIGNMENT);
+        verify(sinkTask).put(Collections.emptyList());
     }
 
-    private void expectStopTask() {
-        sinkTask.stop();
-        PowerMock.expectLastCall();
+    private void verifyStopTask() {
+        verify(sinkTask).stop();
 
         // No offset commit since it happens in the mocked worker thread, but 
the main thread does need to wake up the
         // consumer so it exits quickly
-        consumer.wakeup();
-        PowerMock.expectLastCall();
+        verify(consumer).wakeup();
 
-        consumer.close();
-        PowerMock.expectLastCall();
+        verify(consumer).close();
 
         try {
-            headerConverter.close();
+            verify(headerConverter).close();
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
-        PowerMock.expectLastCall();
     }
 
     // Note that this can only be called once per test currently
-    private Capture<Collection<SinkRecord>> expectPolls(final long 
pollDelayMs) {
+    private void expectPolls(final long pollDelayMs) {
         // Stub out all the consumer stream/iterator responses, which we just 
want to verify occur,
         // but don't care about the exact details here.
-        
EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andStubAnswer(
-            () -> {
-                // "Sleep" so time will progress
-                time.sleep(pollDelayMs);
-                ConsumerRecords<byte[], byte[]> records = new 
ConsumerRecords<>(
-                        Collections.singletonMap(
-                                new TopicPartition(TOPIC, PARTITION),
-                                Arrays.asList(new ConsumerRecord<>(TOPIC, 
PARTITION, FIRST_OFFSET + recordsReturned, TIMESTAMP, TIMESTAMP_TYPE,
-                                    0, 0, RAW_KEY, RAW_VALUE, new 
RecordHeaders(), Optional.empty()))));
-                recordsReturned++;
-                return records;
-            });
-        EasyMock.expect(keyConverter.toConnectData(TOPIC, emptyHeaders(), 
RAW_KEY)).andReturn(new SchemaAndValue(KEY_SCHEMA, KEY)).anyTimes();
-        EasyMock.expect(valueConverter.toConnectData(TOPIC, emptyHeaders(), 
RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE)).anyTimes();
-
-        final Capture<SinkRecord> recordCapture = EasyMock.newCapture();
-        
EasyMock.expect(transformationChain.apply(EasyMock.capture(recordCapture))).andAnswer(
-            recordCapture::getValue).anyTimes();
-
-        Capture<Collection<SinkRecord>> capturedRecords = 
EasyMock.newCapture(CaptureType.ALL);
-        sinkTask.put(EasyMock.capture(capturedRecords));
-        EasyMock.expectLastCall().anyTimes();
-        return capturedRecords;
-    }
-
-    @SuppressWarnings("unchecked")
-    private IExpectationSetters<Object> expectOnePoll() {
-        // Currently the SinkTask's put() method will not be invoked unless we 
provide some data, so instead of
-        // returning empty data, we return one record. The expectation is that 
the data will be ignored by the
-        // response behavior specified using the return value of this method.
-        
EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(
-            () -> {
-                // "Sleep" so time will progress
-                time.sleep(1L);
-                ConsumerRecords<byte[], byte[]> records = new 
ConsumerRecords<>(
-                        Collections.singletonMap(
-                                new TopicPartition(TOPIC, PARTITION),
-                                Arrays.asList(new ConsumerRecord<>(TOPIC, 
PARTITION, FIRST_OFFSET + recordsReturned, TIMESTAMP, TIMESTAMP_TYPE,
-                                    0, 0, RAW_KEY, RAW_VALUE, new 
RecordHeaders(), Optional.empty()))));
-                recordsReturned++;
-                return records;
-            });
-        EasyMock.expect(keyConverter.toConnectData(TOPIC, emptyHeaders(), 
RAW_KEY)).andReturn(new SchemaAndValue(KEY_SCHEMA, KEY));
-        EasyMock.expect(valueConverter.toConnectData(TOPIC, emptyHeaders(), 
RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE));
-        sinkTask.put(EasyMock.anyObject(Collection.class));
-        return EasyMock.expectLastCall();
+        when(consumer.poll(any(Duration.class))).thenAnswer(invocation -> {
+            
rebalanceListener.getValue().onPartitionsAssigned(INITIAL_ASSIGNMENT);
+            return ConsumerRecords.empty();
+        }).thenAnswer((Answer<ConsumerRecords<byte[], byte[]>>) invocation -> {
+            // "Sleep" so time will progress
+            time.sleep(pollDelayMs);
+
+            TopicPartition topicPartition = new TopicPartition(TOPIC, 
PARTITION);
+            ConsumerRecord<byte[], byte[]> consumerRecord = new 
ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, TIMESTAMP, 
TIMESTAMP_TYPE, 0, 0, RAW_KEY, RAW_VALUE, emptyHeaders(), Optional.empty());
+            ConsumerRecords<byte[], byte[]> records = new 
ConsumerRecords<>(Collections.singletonMap(topicPartition, 
singletonList(consumerRecord)));
+            recordsReturned++;
+            return records;
+        });
+        when(keyConverter.toConnectData(TOPIC, emptyHeaders(), 
RAW_KEY)).thenReturn(new SchemaAndValue(KEY_SCHEMA, KEY));
+        when(valueConverter.toConnectData(TOPIC, emptyHeaders(), 
RAW_VALUE)).thenReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE));
+        
when(transformationChain.apply(any(SinkRecord.class))).thenAnswer(AdditionalAnswers.returnsFirstArg());
     }
 
-    @SuppressWarnings("unchecked")
-    private IExpectationSetters<Object> expectRebalanceDuringPoll() {
+    //    @SuppressWarnings("unchecked")
+    @SuppressWarnings("SameParameterValue")
+    private void expectRebalanceDuringPoll(long startOffset) {
         final List<TopicPartition> partitions = Arrays.asList(TOPIC_PARTITION, 
TOPIC_PARTITION2, TOPIC_PARTITION3);
 
-        final long startOffset = 40L;
         final Map<TopicPartition, Long> offsets = new HashMap<>();
         offsets.put(TOPIC_PARTITION, startOffset);
 
-        
EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(
-            () -> {
-                // "Sleep" so time will progress
-                time.sleep(1L);
-
-                sinkTaskContext.getValue().offset(offsets);
-                rebalanceListener.getValue().onPartitionsAssigned(partitions);
-
-                ConsumerRecords<byte[], byte[]> records = new 
ConsumerRecords<>(
-                        Collections.singletonMap(
-                                new TopicPartition(TOPIC, PARTITION),
-                                Arrays.asList(new ConsumerRecord<>(TOPIC, 
PARTITION, FIRST_OFFSET + recordsReturned, TIMESTAMP, TIMESTAMP_TYPE,
-                                    0, 0, RAW_KEY, RAW_VALUE, new 
RecordHeaders(), Optional.empty())
-                                )));
-                recordsReturned++;
-                return records;
-            });
-
-        
EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(FIRST_OFFSET);
-        
EasyMock.expect(consumer.position(TOPIC_PARTITION2)).andReturn(FIRST_OFFSET);
-        
EasyMock.expect(consumer.position(TOPIC_PARTITION3)).andReturn(FIRST_OFFSET);
-
-        sinkTask.open(partitions);
-        EasyMock.expectLastCall();
-
-        consumer.seek(TOPIC_PARTITION, startOffset);
-        EasyMock.expectLastCall();
-
-        EasyMock.expect(keyConverter.toConnectData(TOPIC, emptyHeaders(), 
RAW_KEY)).andReturn(new SchemaAndValue(KEY_SCHEMA, KEY));
-        EasyMock.expect(valueConverter.toConnectData(TOPIC, emptyHeaders(), 
RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE));
-        sinkTask.put(EasyMock.anyObject(Collection.class));
-        return EasyMock.expectLastCall();
+        when(consumer.poll(any(Duration.class))).thenAnswer(invocation -> {
+            
rebalanceListener.getValue().onPartitionsAssigned(INITIAL_ASSIGNMENT);
+            return ConsumerRecords.empty();
+        }).thenAnswer((Answer<ConsumerRecords<byte[], byte[]>>) invocation -> {
+            // "Sleep" so time will progress
+            time.sleep(1L);
+
+            sinkTaskContext.getValue().offset(offsets);
+            rebalanceListener.getValue().onPartitionsAssigned(partitions);
+
+            TopicPartition topicPartition = new TopicPartition(TOPIC, 
PARTITION);
+            ConsumerRecord<byte[], byte[]> consumerRecord = new 
ConsumerRecord<>(
+                    TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, 
TIMESTAMP, TIMESTAMP_TYPE,
+                    0, 0, RAW_KEY, RAW_VALUE, emptyHeaders(), 
Optional.empty());
+            ConsumerRecords<byte[], byte[]> records =
+                    new 
ConsumerRecords<>(Collections.singletonMap(topicPartition, 
singletonList(consumerRecord)));
+            recordsReturned++;
+            return records;
+        });
+
+        when(keyConverter.toConnectData(TOPIC, emptyHeaders(), 
RAW_KEY)).thenReturn(new SchemaAndValue(KEY_SCHEMA, KEY));
+        when(valueConverter.toConnectData(TOPIC, emptyHeaders(), 
RAW_VALUE)).thenReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE));
     }
 
-    private Capture<OffsetCommitCallback> expectOffsetCommit(final long 
expectedMessages,
-                                                             final 
RuntimeException error,
-                                                             final Exception 
consumerCommitError,
-                                                             final long 
consumerCommitDelayMs,
-                                                             final boolean 
invokeCallback) {
+    @SuppressWarnings("rawtypes")

Review Comment:
   For my understanding, what in the function is causing this warning?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to