C0urante commented on code in PR #13191:
URL: https://github.com/apache/kafka/pull/13191#discussion_r1107510674


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java:
##########
@@ -235,115 +236,100 @@ public void testMetricsGroup() {
     public void testSendRecordsConvertsData() {
         createWorkerTask();
 
-        List<SourceRecord> records = new ArrayList<>();
         // Can just use the same record for key and value
-        records.add(new SourceRecord(PARTITION, OFFSET, "topic", null, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD));
-
-        Capture<ProducerRecord<byte[], byte[]>> sent = 
expectSendRecordAnyTimes();
+        List<SourceRecord> records = Collections.singletonList(
+            new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, 
KEY, RECORD_SCHEMA, RECORD)
+        );
 
+        expectSendRecord(emptyHeaders());
         expectTopicCreation(TOPIC);
 
-        PowerMock.replayAll();
-
         workerTask.toSend = records;
         workerTask.sendRecords();
+
+        ArgumentCaptor<ProducerRecord<byte[], byte[]>> sent = 
verifySendRecord();
+
         assertEquals(SERIALIZED_KEY, sent.getValue().key());
         assertEquals(SERIALIZED_RECORD, sent.getValue().value());
 
-        PowerMock.verifyAll();
+        verifyTaskGetTopic();
     }
 
     @Test
     public void testSendRecordsPropagatesTimestamp() {
         final Long timestamp = System.currentTimeMillis();
-
         createWorkerTask();
 
-        List<SourceRecord> records = Collections.singletonList(
-                new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, 
KEY, RECORD_SCHEMA, RECORD, timestamp)
-        );
-
-        Capture<ProducerRecord<byte[], byte[]>> sent = 
expectSendRecordAnyTimes();
-
+        expectSendRecord(emptyHeaders());
         expectTopicCreation(TOPIC);
 
-        PowerMock.replayAll();
-
-        workerTask.toSend = records;
+        workerTask.toSend = Collections.singletonList(
+                new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, 
KEY, RECORD_SCHEMA, RECORD, timestamp)
+        );
         workerTask.sendRecords();
+
+        ArgumentCaptor<ProducerRecord<byte[], byte[]>> sent = 
verifySendRecord();
         assertEquals(timestamp, sent.getValue().timestamp());
 
-        PowerMock.verifyAll();
+        verifyTaskGetTopic();
     }
 
     @Test
     public void testSendRecordsCorruptTimestamp() {
         final Long timestamp = -3L;
         createWorkerTask();
 
-        List<SourceRecord> records = Collections.singletonList(
+        expectSendRecord(emptyHeaders());
+        expectTopicCreation(TOPIC);

Review Comment:
   Why is this added? We're testing a scenario where the task fails on an 
invalid record timestamp; it should never get to the point of attempting to 
create a topic.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java:
##########
@@ -639,144 +644,112 @@ public void 
testTopicCreateSucceedsWhenCreateReturnsNewTopicFound() {
         SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
         SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
 
-        expectPreliminaryCalls();
-        
EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap());
+        expectSendRecord(emptyHeaders());
 
-        Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
-        
EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(createdTopic(TOPIC));
-
-        expectSendRecord();
-        expectSendRecord();
-
-        PowerMock.replayAll();
+        when(admin.describeTopics(TOPIC)).thenReturn(Collections.emptyMap());
+        
when(admin.createOrFindTopics(any(NewTopic.class))).thenReturn(createdTopic(TOPIC));
 
         workerTask.toSend = Arrays.asList(record1, record2);
         workerTask.sendRecords();
+
+        ArgumentCaptor<ProducerRecord<byte[], byte[]>> sent = 
verifySendRecord(2);
+
+        List<ProducerRecord<byte[], byte[]>> capturedValues = 
sent.getAllValues();
+        assertEquals(2, capturedValues.size());
     }
 
-    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord(
-            String topic,
-            boolean anyTimes,
-            Headers headers
-    ) {
+    private void expectSendRecord(Headers headers) {
         if (headers != null)
-            expectConvertHeadersAndKeyValue(topic, anyTimes, headers);
+            expectConvertHeadersAndKeyValue(headers);
 
-        expectApplyTransformationChain(anyTimes);
+        expectApplyTransformationChain();
 
-        Capture<ProducerRecord<byte[], byte[]>> sent = EasyMock.newCapture();
-
-        IExpectationSetters<Future<RecordMetadata>> expect = EasyMock.expect(
-                producer.send(EasyMock.capture(sent), 
EasyMock.capture(producerCallbacks)));
+        expectTaskGetTopic();
+    }
 
-        IAnswer<Future<RecordMetadata>> expectResponse = () -> {
-            synchronized (producerCallbacks) {
-                for (Callback cb : producerCallbacks.getValues()) {
-                    cb.onCompletion(new RecordMetadata(new 
TopicPartition("foo", 0), 0, 0, 0L, 0, 0), null);
-                }
-                producerCallbacks.reset();
-            }
-            return null;
-        };
+    private ArgumentCaptor<ProducerRecord<byte[], byte[]>> verifySendRecord() {
+        return verifySendRecord(1);
+    }
 
-        if (anyTimes)
-            expect.andStubAnswer(expectResponse);
-        else
-            expect.andAnswer(expectResponse);
+    private ArgumentCaptor<ProducerRecord<byte[], byte[]>> 
verifySendRecord(int times) {
+        ArgumentCaptor<ProducerRecord<byte[], byte[]>> sent = 
ArgumentCaptor.forClass(ProducerRecord.class);
+        ArgumentCaptor<Callback> producerCallbacks = 
ArgumentCaptor.forClass(Callback.class);
+        verify(producer, times(times)).send(sent.capture(), 
producerCallbacks.capture());
 
-        expectTaskGetTopic(anyTimes);
+        for (Callback cb : producerCallbacks.getAllValues()) {
+            cb.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 
0, 0, 0L, 0, 0),
+                    null);
+        }
 
         return sent;
     }
 
-    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordAnyTimes() 
{
-        return expectSendRecord(TOPIC, true, emptyHeaders());
+    private void expectTaskGetTopic() {
+        when(statusBackingStore.getTopic(anyString(), 
anyString())).thenAnswer((Answer<TopicStatus>) invocation -> {
+            String connector = invocation.getArgument(0, String.class);
+            String topic = invocation.getArgument(1, String.class);
+            return new TopicStatus(topic, new ConnectorTaskId(connector, 0), 
Time.SYSTEM.milliseconds());
+        });
     }
 
-    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord() {
-        return expectSendRecord(TOPIC, false, emptyHeaders());
-    }
+    private void verifyTaskGetTopic() {
+        ArgumentCaptor<String> connectorCapture = 
ArgumentCaptor.forClass(String.class);
+        ArgumentCaptor<String> topicCapture = 
ArgumentCaptor.forClass(String.class);
+        ArgumentCaptor<NewTopic> newTopicCapture = 
ArgumentCaptor.forClass(NewTopic.class);
+        verify(statusBackingStore).getTopic(connectorCapture.capture(), 
topicCapture.capture());
 
-    private void expectTaskGetTopic(boolean anyTimes) {
-        final Capture<String> connectorCapture = EasyMock.newCapture();
-        final Capture<String> topicCapture = EasyMock.newCapture();
-        IExpectationSetters<TopicStatus> expect = 
EasyMock.expect(statusBackingStore.getTopic(
-                EasyMock.capture(connectorCapture),
-                EasyMock.capture(topicCapture)));
-        if (anyTimes) {
-            expect.andStubAnswer(() -> new TopicStatus(
-                    topicCapture.getValue(),
-                    new ConnectorTaskId(connectorCapture.getValue(), 0),
-                    Time.SYSTEM.milliseconds()));
-        } else {
-            expect.andAnswer(() -> new TopicStatus(
-                    topicCapture.getValue(),
-                    new ConnectorTaskId(connectorCapture.getValue(), 0),
-                    Time.SYSTEM.milliseconds()));
-        }
-        if (connectorCapture.hasCaptured() && topicCapture.hasCaptured()) {
-            assertEquals("job", connectorCapture.getValue());
-            assertEquals(TOPIC, topicCapture.getValue());
-        }
+        assertEquals("job", connectorCapture.getValue());
+        assertEquals(TOPIC, topicCapture.getValue());
+
+        verify(admin).createOrFindTopics(newTopicCapture.capture());
+        assertEquals(TOPIC, newTopicCapture.getValue().name());

Review Comment:
   The old `expectTaskGetTopic` method is related to recording topic usage in 
the status backing store, not topic creation.
   
   The topic creation verification (the `createOrFindTopics` check) should be 
pulled out into a separate method since the two are not necessarily related to 
each other.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java:
##########
@@ -453,19 +444,22 @@ public void testSendRecordsTopicCreateRetries() {
         SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
         SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
 
-        // First call to describe the topic times out
         expectPreliminaryCalls();
-        
EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap());
-        Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
-        
EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture)))
-                .andThrow(new RetriableException(new 
TimeoutException("timeout")));
-
-        // Second round
-        expectTopicCreation(TOPIC);
-        expectSendRecord();
-        expectSendRecord();
 
-        PowerMock.replayAll();
+        when(admin.describeTopics(TOPIC)).thenReturn(Collections.emptyMap());
+        when(admin.createOrFindTopics(any(NewTopic.class)))
+            .thenAnswer(new Answer<TopicAdmin.TopicCreationResponse>() {
+                boolean firstCall = true;
+
+                @Override
+                public TopicAdmin.TopicCreationResponse 
answer(InvocationOnMock invocation) {
+                    if (firstCall) {
+                        firstCall = false;
+                        throw new RetriableException(new 
TimeoutException("timeout"));
+                    }
+                    return createdTopic(TOPIC);
+                }
+            });

Review Comment:
   There's an easier way to set up consecutive expectations in Mockito:
   ```suggestion
           when(admin.createOrFindTopics(any(NewTopic.class)))
                   // First call to create the topic times out
                   .thenThrow(new RetriableException(new 
TimeoutException("timeout")))
                   // Next attempt succeeds
                   .thenReturn(createdTopic(TOPIC));
   ```



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java:
##########
@@ -453,19 +444,22 @@ public void testSendRecordsTopicCreateRetries() {
         SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
         SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
 
-        // First call to describe the topic times out
         expectPreliminaryCalls();
-        
EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap());
-        Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
-        
EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture)))
-                .andThrow(new RetriableException(new 
TimeoutException("timeout")));
-
-        // Second round
-        expectTopicCreation(TOPIC);
-        expectSendRecord();
-        expectSendRecord();
 
-        PowerMock.replayAll();
+        when(admin.describeTopics(TOPIC)).thenReturn(Collections.emptyMap());
+        when(admin.createOrFindTopics(any(NewTopic.class)))
+            .thenAnswer(new Answer<TopicAdmin.TopicCreationResponse>() {
+                boolean firstCall = true;
+
+                @Override
+                public TopicAdmin.TopicCreationResponse 
answer(InvocationOnMock invocation) {
+                    if (firstCall) {
+                        firstCall = false;
+                        throw new RetriableException(new 
TimeoutException("timeout"));
+                    }
+                    return createdTopic(TOPIC);
+                }
+            });
 
         workerTask.toSend = Arrays.asList(record1, record2);

Review Comment:
   Two notes that I can't leave directly inline due to GitHub UI limitations:
   
   1. After line 466 (`assertEquals(Arrays.asList(record1, record2), 
workerTask.toSend);`), we should add a single call to `verifyTopicCreation()`
   2. After line 470 (`assertNull(workerTask.toSend);`), we should add a 
follow-up call to `verifyTopicCreation(times(2));`
   
   This assumes that `verifyTopicCreation` is broken up into two overloaded 
variants:
   ```java
       private void verifyTopicCreation() {
           verifyTopicCreation(times(1));
       }
   
       private void verifyTopicCreation(VerificationMode mode) {
           ArgumentCaptor<NewTopic> newTopicCapture = 
ArgumentCaptor.forClass(NewTopic.class);
           verify(admin, mode).createOrFindTopics(newTopicCapture.capture());
           assertEquals(TOPIC, newTopicCapture.getValue().name());
       }
   ```



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java:
##########
@@ -639,144 +644,112 @@ public void 
testTopicCreateSucceedsWhenCreateReturnsNewTopicFound() {
         SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
         SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
 
-        expectPreliminaryCalls();
-        
EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap());
+        expectSendRecord(emptyHeaders());
 
-        Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
-        
EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(createdTopic(TOPIC));
-
-        expectSendRecord();
-        expectSendRecord();
-
-        PowerMock.replayAll();
+        when(admin.describeTopics(TOPIC)).thenReturn(Collections.emptyMap());
+        
when(admin.createOrFindTopics(any(NewTopic.class))).thenReturn(createdTopic(TOPIC));
 
         workerTask.toSend = Arrays.asList(record1, record2);
         workerTask.sendRecords();
+
+        ArgumentCaptor<ProducerRecord<byte[], byte[]>> sent = 
verifySendRecord(2);
+
+        List<ProducerRecord<byte[], byte[]>> capturedValues = 
sent.getAllValues();
+        assertEquals(2, capturedValues.size());
     }
 
-    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord(
-            String topic,
-            boolean anyTimes,
-            Headers headers
-    ) {
+    private void expectSendRecord(Headers headers) {
         if (headers != null)
-            expectConvertHeadersAndKeyValue(topic, anyTimes, headers);
+            expectConvertHeadersAndKeyValue(headers);
 
-        expectApplyTransformationChain(anyTimes);
+        expectApplyTransformationChain();
 
-        Capture<ProducerRecord<byte[], byte[]>> sent = EasyMock.newCapture();
-
-        IExpectationSetters<Future<RecordMetadata>> expect = EasyMock.expect(
-                producer.send(EasyMock.capture(sent), 
EasyMock.capture(producerCallbacks)));
+        expectTaskGetTopic();
+    }
 
-        IAnswer<Future<RecordMetadata>> expectResponse = () -> {
-            synchronized (producerCallbacks) {
-                for (Callback cb : producerCallbacks.getValues()) {
-                    cb.onCompletion(new RecordMetadata(new 
TopicPartition("foo", 0), 0, 0, 0L, 0, 0), null);
-                }
-                producerCallbacks.reset();
-            }
-            return null;
-        };
+    private ArgumentCaptor<ProducerRecord<byte[], byte[]>> verifySendRecord() {
+        return verifySendRecord(1);
+    }
 
-        if (anyTimes)
-            expect.andStubAnswer(expectResponse);
-        else
-            expect.andAnswer(expectResponse);
+    private ArgumentCaptor<ProducerRecord<byte[], byte[]>> 
verifySendRecord(int times) {
+        ArgumentCaptor<ProducerRecord<byte[], byte[]>> sent = 
ArgumentCaptor.forClass(ProducerRecord.class);
+        ArgumentCaptor<Callback> producerCallbacks = 
ArgumentCaptor.forClass(Callback.class);
+        verify(producer, times(times)).send(sent.capture(), 
producerCallbacks.capture());
 
-        expectTaskGetTopic(anyTimes);
+        for (Callback cb : producerCallbacks.getAllValues()) {
+            cb.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 
0, 0, 0L, 0, 0),
+                    null);
+        }
 
         return sent;
     }
 
-    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordAnyTimes() 
{
-        return expectSendRecord(TOPIC, true, emptyHeaders());
+    private void expectTaskGetTopic() {
+        when(statusBackingStore.getTopic(anyString(), 
anyString())).thenAnswer((Answer<TopicStatus>) invocation -> {
+            String connector = invocation.getArgument(0, String.class);
+            String topic = invocation.getArgument(1, String.class);
+            return new TopicStatus(topic, new ConnectorTaskId(connector, 0), 
Time.SYSTEM.milliseconds());
+        });
     }
 
-    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord() {
-        return expectSendRecord(TOPIC, false, emptyHeaders());
-    }
+    private void verifyTaskGetTopic() {
+        ArgumentCaptor<String> connectorCapture = 
ArgumentCaptor.forClass(String.class);
+        ArgumentCaptor<String> topicCapture = 
ArgumentCaptor.forClass(String.class);
+        ArgumentCaptor<NewTopic> newTopicCapture = 
ArgumentCaptor.forClass(NewTopic.class);
+        verify(statusBackingStore).getTopic(connectorCapture.capture(), 
topicCapture.capture());
 
-    private void expectTaskGetTopic(boolean anyTimes) {
-        final Capture<String> connectorCapture = EasyMock.newCapture();
-        final Capture<String> topicCapture = EasyMock.newCapture();
-        IExpectationSetters<TopicStatus> expect = 
EasyMock.expect(statusBackingStore.getTopic(
-                EasyMock.capture(connectorCapture),
-                EasyMock.capture(topicCapture)));
-        if (anyTimes) {
-            expect.andStubAnswer(() -> new TopicStatus(
-                    topicCapture.getValue(),
-                    new ConnectorTaskId(connectorCapture.getValue(), 0),
-                    Time.SYSTEM.milliseconds()));
-        } else {
-            expect.andAnswer(() -> new TopicStatus(
-                    topicCapture.getValue(),
-                    new ConnectorTaskId(connectorCapture.getValue(), 0),
-                    Time.SYSTEM.milliseconds()));
-        }
-        if (connectorCapture.hasCaptured() && topicCapture.hasCaptured()) {
-            assertEquals("job", connectorCapture.getValue());
-            assertEquals(TOPIC, topicCapture.getValue());
-        }
+        assertEquals("job", connectorCapture.getValue());
+        assertEquals(TOPIC, topicCapture.getValue());
+
+        verify(admin).createOrFindTopics(newTopicCapture.capture());
+        assertEquals(TOPIC, newTopicCapture.getValue().name());
     }
 
+    @SuppressWarnings("SameParameterValue")
     private void expectTopicCreation(String topic) {
-        if (config.topicCreationEnable()) {
-            
EasyMock.expect(admin.describeTopics(topic)).andReturn(Collections.emptyMap());
-            Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
-            
EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(createdTopic(topic));
-        }
+        when(admin.describeTopics(topic)).thenReturn(Collections.emptyMap());
+        
when(admin.createOrFindTopics(any(NewTopic.class))).thenReturn(createdTopic(topic));
     }
 
+    @SuppressWarnings("SameParameterValue")
     private TopicAdmin.TopicCreationResponse createdTopic(String topic) {
         Set<String> created = Collections.singleton(topic);
         Set<String> existing = Collections.emptySet();
         return new TopicAdmin.TopicCreationResponse(created, existing);
     }
 
+    @SuppressWarnings("SameParameterValue")
     private TopicAdmin.TopicCreationResponse foundTopic(String topic) {
         Set<String> created = Collections.emptySet();
         Set<String> existing = Collections.singleton(topic);
         return new TopicAdmin.TopicCreationResponse(created, existing);
     }
 
     private void expectPreliminaryCalls() {
-        expectPreliminaryCalls(TOPIC);
-    }
+        expectConvertHeadersAndKeyValue(emptyHeaders());
+        expectApplyTransformationChain();
+    }
+
+    private void expectConvertHeadersAndKeyValue(Headers headers) {

Review Comment:
   This assumes that we'll always handle records whose topic is `TOPIC`, but 
that assumption doesn't hold in the `testSendRecordsTopicDescribeRetriesMidway` 
and `testSendRecordsTopicCreateRetriesMidway` cases, where we try to send 
records with the topic `OTHER_TOPIC`.
   
   Even though the tests are passing right now, it's important that we add 
expectations for converting records for the other topic since the strict 
stubbing we've configured Mockito with will fail the test if those expectations 
are not utilized, which effectively increases the coverage in these tests.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java:
##########
@@ -485,32 +479,45 @@ public void testSendRecordsTopicDescribeRetriesMidway() {
         SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
         SourceRecord record3 = new SourceRecord(PARTITION, OFFSET, 
OTHER_TOPIC, 3, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
 
-        // First round
-        expectPreliminaryCalls(OTHER_TOPIC);
-        expectTopicCreation(TOPIC);
-        expectSendRecord();
-        expectSendRecord();
-
-        // First call to describe the topic times out
-        EasyMock.expect(admin.describeTopics(OTHER_TOPIC))
-                .andThrow(new RetriableException(new 
TimeoutException("timeout")));
+        expectPreliminaryCalls();
 
-        // Second round
-        expectTopicCreation(OTHER_TOPIC);
-        expectSendRecord(OTHER_TOPIC, false, emptyHeaders());
+        when(admin.describeTopics(anyString())).thenAnswer(new 
Answer<Map<String, TopicDescription>>() {
+            int counter = 0;
 
-        PowerMock.replayAll();
+            @Override
+            public Map<String, TopicDescription> answer(InvocationOnMock 
invocation) {
+                counter++;
+                if (counter == 2) {
+                    throw new RetriableException(new 
TimeoutException("timeout"));
+                }
 
-        // Try to send 3, make first pass, second fail. Should save last two
+                return Collections.emptyMap();
+            }
+        });

Review Comment:
   This can be simplified:
   
   ```java
           expectTopicCreation(TOPIC);
           when(admin.describeTopics(eq(OTHER_TOPIC)))
                   .thenThrow(new RetriableException(new 
TimeoutException("timeout")))
                   .thenReturn(Collections.emptyMap());
   ```



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java:
##########
@@ -425,17 +412,21 @@ public void testSendRecordsTopicDescribeRetries() {
         SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
 
         expectPreliminaryCalls();
-        // First round - call to describe the topic times out
-        EasyMock.expect(admin.describeTopics(TOPIC))
-                .andThrow(new RetriableException(new 
TimeoutException("timeout")));
-
-        // Second round - calls to describe and create succeed
         expectTopicCreation(TOPIC);

Review Comment:
   We shouldn't be establishing this expectation before invoking 
`workerTask::sendRecords` the first time, since we don't actually expect topic 
creation to take place then.
   
   I think this could be better structured by replacing everything from line 
415 (`expectTopicCreation(TOPIC)`) through the end of the test case with this:
   
   ```java
           // First round - call to describe the topic times out
           when(admin.describeTopics(TOPIC)).thenThrow(new 
RetriableException(new TimeoutException("timeout")));
   
           workerTask.toSend = Arrays.asList(record1, record2);
           workerTask.sendRecords();
           assertEquals(Arrays.asList(record1, record2), workerTask.toSend);
           verifyNoMoreInteractions(admin);
   
           reset(admin);
           // Second round - calls to describe and create succeed
           expectTopicCreation(TOPIC);
   
           workerTask.sendRecords();
           assertNull(workerTask.toSend);
           verifyTopicCreation();
   ```
   
   (assuming a `verifyTopicCreation()` method is added)



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java:
##########
@@ -404,17 +391,17 @@ public void testTopicCreateWhenTopicExists() {
         SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
 
         expectPreliminaryCalls();
+
         TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(0, 
null, Collections.emptyList(), Collections.emptyList());
         TopicDescription topicDesc = new TopicDescription(TOPIC, false, 
Collections.singletonList(topicPartitionInfo));
-        
EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.singletonMap(TOPIC,
 topicDesc));
+        
when(admin.describeTopics(TOPIC)).thenReturn(Collections.singletonMap(TOPIC, 
topicDesc));
 
-        expectSendRecord();
-        expectSendRecord();
-
-        PowerMock.replayAll();
+        expectSendRecord(emptyHeaders());
 
         workerTask.toSend = Arrays.asList(record1, record2);
         workerTask.sendRecords();
+
+        verifySendRecord(2);

Review Comment:
   We need to make sure that, under these circumstances, we never tried to 
create a topic. One way to accomplish this is to verify that we never used the 
`TopicAdmin` for anything except the call to `describeTopics`:
   ```suggestion
           verifySendRecord(2);
           // Make sure we didn't try to create the topic after finding out it 
already existed
           verifyNoMoreInteractions(admin);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to