C0urante commented on code in PR #13191:
URL: https://github.com/apache/kafka/pull/13191#discussion_r1097852548


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java:
##########
@@ -129,7 +131,7 @@ public class AbstractWorkerSourceTaskTest {
     @Mock private ConnectorOffsetBackingStore offsetStore;
     @Mock private StatusBackingStore statusBackingStore;
     @Mock private WorkerSourceTaskContext sourceTaskContext;
-    @MockStrict private TaskStatus.Listener statusListener;

Review Comment:
   In order to retain the guarantees we currently have with respect to 
interactions with this mock (previously enforced by `@MockStrict`), can we add 
a call to `verifyNoMoreInteractions(statusListener);` in the `tearDown` method?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java:
##########
@@ -639,93 +814,25 @@ public void 
testTopicCreateSucceedsWhenCreateReturnsNewTopicFound() {
         SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
         SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
 
-        expectPreliminaryCalls();
-        
EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap());
-
-        Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
-        
EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(createdTopic(TOPIC));
-
-        expectSendRecord();
-        expectSendRecord();
-
-        PowerMock.replayAll();
+        when(transformationChain.apply(any(SourceRecord.class)))
+            .thenAnswer((Answer<SourceRecord>) invocation -> 
invocation.getArgument(0));
+        when(headerConverter.fromConnectHeader(anyString(), anyString(), 
eq(Schema.STRING_SCHEMA),
+            anyString()))
+            .thenAnswer((Answer<byte[]>) invocation -> {
+                String headerValue = invocation.getArgument(3, String.class);
+                return headerValue.getBytes(StandardCharsets.UTF_8);
+            });
+        when(keyConverter.fromConnectData(eq(TOPIC), any(Headers.class), 
eq(KEY_SCHEMA), eq(KEY)))
+            .thenReturn(SERIALIZED_KEY);
+        when(valueConverter.fromConnectData(eq(TOPIC), any(Headers.class), 
eq(RECORD_SCHEMA),
+            eq(RECORD)))
+            .thenReturn(SERIALIZED_RECORD);
+        when(admin.describeTopics(TOPIC)).thenReturn(Collections.emptyMap());
+        
when(admin.createOrFindTopics(any(NewTopic.class))).thenReturn(createdTopic(TOPIC));
 
         workerTask.toSend = Arrays.asList(record1, record2);
         workerTask.sendRecords();
-    }
-
-    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord(
-            String topic,
-            boolean anyTimes,
-            Headers headers
-    ) {
-        if (headers != null)
-            expectConvertHeadersAndKeyValue(topic, anyTimes, headers);
-
-        expectApplyTransformationChain(anyTimes);
-
-        Capture<ProducerRecord<byte[], byte[]>> sent = EasyMock.newCapture();
-
-        IExpectationSetters<Future<RecordMetadata>> expect = EasyMock.expect(
-                producer.send(EasyMock.capture(sent), 
EasyMock.capture(producerCallbacks)));
-
-        IAnswer<Future<RecordMetadata>> expectResponse = () -> {
-            synchronized (producerCallbacks) {
-                for (Callback cb : producerCallbacks.getValues()) {
-                    cb.onCompletion(new RecordMetadata(new 
TopicPartition("foo", 0), 0, 0, 0L, 0, 0), null);
-                }
-                producerCallbacks.reset();
-            }
-            return null;
-        };
-
-        if (anyTimes)
-            expect.andStubAnswer(expectResponse);
-        else
-            expect.andAnswer(expectResponse);
-
-        expectTaskGetTopic(anyTimes);
-
-        return sent;
-    }
-
-    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordAnyTimes() 
{
-        return expectSendRecord(TOPIC, true, emptyHeaders());
-    }
-
-    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord() {
-        return expectSendRecord(TOPIC, false, emptyHeaders());
-    }
-
-    private void expectTaskGetTopic(boolean anyTimes) {
-        final Capture<String> connectorCapture = EasyMock.newCapture();
-        final Capture<String> topicCapture = EasyMock.newCapture();
-        IExpectationSetters<TopicStatus> expect = 
EasyMock.expect(statusBackingStore.getTopic(
-                EasyMock.capture(connectorCapture),
-                EasyMock.capture(topicCapture)));
-        if (anyTimes) {
-            expect.andStubAnswer(() -> new TopicStatus(
-                    topicCapture.getValue(),
-                    new ConnectorTaskId(connectorCapture.getValue(), 0),
-                    Time.SYSTEM.milliseconds()));
-        } else {
-            expect.andAnswer(() -> new TopicStatus(
-                    topicCapture.getValue(),
-                    new ConnectorTaskId(connectorCapture.getValue(), 0),
-                    Time.SYSTEM.milliseconds()));
-        }
-        if (connectorCapture.hasCaptured() && topicCapture.hasCaptured()) {
-            assertEquals("job", connectorCapture.getValue());
-            assertEquals(TOPIC, topicCapture.getValue());
-        }
-    }
-
-    private void expectTopicCreation(String topic) {
-        if (config.topicCreationEnable()) {
-            
EasyMock.expect(admin.describeTopics(topic)).andReturn(Collections.emptyMap());
-            Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
-            
EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(createdTopic(topic));
-        }

Review Comment:
   Why remove this method (`expectTopicCreation`)? It seems like there's now a 
lot more duplication in each test case that used to call it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to