C0urante commented on code in PR #13191: URL: https://github.com/apache/kafka/pull/13191#discussion_r1107510674
########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java: ########## @@ -235,115 +236,100 @@ public void testMetricsGroup() { public void testSendRecordsConvertsData() { createWorkerTask(); - List<SourceRecord> records = new ArrayList<>(); // Can just use the same record for key and value - records.add(new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD)); - - Capture<ProducerRecord<byte[], byte[]>> sent = expectSendRecordAnyTimes(); + List<SourceRecord> records = Collections.singletonList( + new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD) + ); + expectSendRecord(emptyHeaders()); expectTopicCreation(TOPIC); - PowerMock.replayAll(); - workerTask.toSend = records; workerTask.sendRecords(); + + ArgumentCaptor<ProducerRecord<byte[], byte[]>> sent = verifySendRecord(); + assertEquals(SERIALIZED_KEY, sent.getValue().key()); assertEquals(SERIALIZED_RECORD, sent.getValue().value()); - PowerMock.verifyAll(); + verifyTaskGetTopic(); } @Test public void testSendRecordsPropagatesTimestamp() { final Long timestamp = System.currentTimeMillis(); - createWorkerTask(); - List<SourceRecord> records = Collections.singletonList( - new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD, timestamp) - ); - - Capture<ProducerRecord<byte[], byte[]>> sent = expectSendRecordAnyTimes(); - + expectSendRecord(emptyHeaders()); expectTopicCreation(TOPIC); - PowerMock.replayAll(); - - workerTask.toSend = records; + workerTask.toSend = Collections.singletonList( + new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD, timestamp) + ); workerTask.sendRecords(); + + ArgumentCaptor<ProducerRecord<byte[], byte[]>> sent = verifySendRecord(); assertEquals(timestamp, sent.getValue().timestamp()); - PowerMock.verifyAll(); + verifyTaskGetTopic(); } @Test public void testSendRecordsCorruptTimestamp() { final 
Long timestamp = -3L; createWorkerTask(); - List<SourceRecord> records = Collections.singletonList( + expectSendRecord(emptyHeaders()); + expectTopicCreation(TOPIC); Review Comment: Why is this added? We're testing a scenario where the task fails on an invalid record timestamp, it should never get to the point of attempting to create a topic. ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java: ########## @@ -639,144 +644,112 @@ public void testTopicCreateSucceedsWhenCreateReturnsNewTopicFound() { SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); - expectPreliminaryCalls(); - EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap()); + expectSendRecord(emptyHeaders()); - Capture<NewTopic> newTopicCapture = EasyMock.newCapture(); - EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(createdTopic(TOPIC)); - - expectSendRecord(); - expectSendRecord(); - - PowerMock.replayAll(); + when(admin.describeTopics(TOPIC)).thenReturn(Collections.emptyMap()); + when(admin.createOrFindTopics(any(NewTopic.class))).thenReturn(createdTopic(TOPIC)); workerTask.toSend = Arrays.asList(record1, record2); workerTask.sendRecords(); + + ArgumentCaptor<ProducerRecord<byte[], byte[]>> sent = verifySendRecord(2); + + List<ProducerRecord<byte[], byte[]>> capturedValues = sent.getAllValues(); + assertEquals(2, capturedValues.size()); } - private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord( - String topic, - boolean anyTimes, - Headers headers - ) { + private void expectSendRecord(Headers headers) { if (headers != null) - expectConvertHeadersAndKeyValue(topic, anyTimes, headers); + expectConvertHeadersAndKeyValue(headers); - expectApplyTransformationChain(anyTimes); + expectApplyTransformationChain(); - 
Capture<ProducerRecord<byte[], byte[]>> sent = EasyMock.newCapture(); - - IExpectationSetters<Future<RecordMetadata>> expect = EasyMock.expect( - producer.send(EasyMock.capture(sent), EasyMock.capture(producerCallbacks))); + expectTaskGetTopic(); + } - IAnswer<Future<RecordMetadata>> expectResponse = () -> { - synchronized (producerCallbacks) { - for (Callback cb : producerCallbacks.getValues()) { - cb.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 0, 0, 0L, 0, 0), null); - } - producerCallbacks.reset(); - } - return null; - }; + private ArgumentCaptor<ProducerRecord<byte[], byte[]>> verifySendRecord() { + return verifySendRecord(1); + } - if (anyTimes) - expect.andStubAnswer(expectResponse); - else - expect.andAnswer(expectResponse); + private ArgumentCaptor<ProducerRecord<byte[], byte[]>> verifySendRecord(int times) { + ArgumentCaptor<ProducerRecord<byte[], byte[]>> sent = ArgumentCaptor.forClass(ProducerRecord.class); + ArgumentCaptor<Callback> producerCallbacks = ArgumentCaptor.forClass(Callback.class); + verify(producer, times(times)).send(sent.capture(), producerCallbacks.capture()); - expectTaskGetTopic(anyTimes); + for (Callback cb : producerCallbacks.getAllValues()) { + cb.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 0, 0, 0L, 0, 0), + null); + } return sent; } - private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordAnyTimes() { - return expectSendRecord(TOPIC, true, emptyHeaders()); + private void expectTaskGetTopic() { + when(statusBackingStore.getTopic(anyString(), anyString())).thenAnswer((Answer<TopicStatus>) invocation -> { + String connector = invocation.getArgument(0, String.class); + String topic = invocation.getArgument(1, String.class); + return new TopicStatus(topic, new ConnectorTaskId(connector, 0), Time.SYSTEM.milliseconds()); + }); } - private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord() { - return expectSendRecord(TOPIC, false, emptyHeaders()); - } + private void verifyTaskGetTopic() 
{ + ArgumentCaptor<String> connectorCapture = ArgumentCaptor.forClass(String.class); + ArgumentCaptor<String> topicCapture = ArgumentCaptor.forClass(String.class); + ArgumentCaptor<NewTopic> newTopicCapture = ArgumentCaptor.forClass(NewTopic.class); + verify(statusBackingStore).getTopic(connectorCapture.capture(), topicCapture.capture()); - private void expectTaskGetTopic(boolean anyTimes) { - final Capture<String> connectorCapture = EasyMock.newCapture(); - final Capture<String> topicCapture = EasyMock.newCapture(); - IExpectationSetters<TopicStatus> expect = EasyMock.expect(statusBackingStore.getTopic( - EasyMock.capture(connectorCapture), - EasyMock.capture(topicCapture))); - if (anyTimes) { - expect.andStubAnswer(() -> new TopicStatus( - topicCapture.getValue(), - new ConnectorTaskId(connectorCapture.getValue(), 0), - Time.SYSTEM.milliseconds())); - } else { - expect.andAnswer(() -> new TopicStatus( - topicCapture.getValue(), - new ConnectorTaskId(connectorCapture.getValue(), 0), - Time.SYSTEM.milliseconds())); - } - if (connectorCapture.hasCaptured() && topicCapture.hasCaptured()) { - assertEquals("job", connectorCapture.getValue()); - assertEquals(TOPIC, topicCapture.getValue()); - } + assertEquals("job", connectorCapture.getValue()); + assertEquals(TOPIC, topicCapture.getValue()); + + verify(admin).createOrFindTopics(newTopicCapture.capture()); + assertEquals(TOPIC, newTopicCapture.getValue().name()); Review Comment: The old `expectTaskGetTopic` method is related to recording topic usage in the status backing store, not topic creation. This part should be pulled out into a separate method since the two are not necessarily related to each other. 
########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java: ########## @@ -453,19 +444,22 @@ public void testSendRecordsTopicCreateRetries() { SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); - // First call to describe the topic times out expectPreliminaryCalls(); - EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap()); - Capture<NewTopic> newTopicCapture = EasyMock.newCapture(); - EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))) - .andThrow(new RetriableException(new TimeoutException("timeout"))); - - // Second round - expectTopicCreation(TOPIC); - expectSendRecord(); - expectSendRecord(); - PowerMock.replayAll(); + when(admin.describeTopics(TOPIC)).thenReturn(Collections.emptyMap()); + when(admin.createOrFindTopics(any(NewTopic.class))) + .thenAnswer(new Answer<TopicAdmin.TopicCreationResponse>() { + boolean firstCall = true; + + @Override + public TopicAdmin.TopicCreationResponse answer(InvocationOnMock invocation) { + if (firstCall) { + firstCall = false; + throw new RetriableException(new TimeoutException("timeout")); + } + return createdTopic(TOPIC); + } + }); Review Comment: There's an easier way to set up consecutive expectations in Mockito: ```suggestion when(admin.createOrFindTopics(any(NewTopic.class))) // First call to create the topic times out .thenThrow(new RetriableException(new TimeoutException("timeout"))) // Next attempt succeeds .thenReturn(createdTopic(TOPIC)); ``` ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java: ########## @@ -453,19 +444,22 @@ public void testSendRecordsTopicCreateRetries() { SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); SourceRecord 
record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); - // First call to describe the topic times out expectPreliminaryCalls(); - EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap()); - Capture<NewTopic> newTopicCapture = EasyMock.newCapture(); - EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))) - .andThrow(new RetriableException(new TimeoutException("timeout"))); - - // Second round - expectTopicCreation(TOPIC); - expectSendRecord(); - expectSendRecord(); - PowerMock.replayAll(); + when(admin.describeTopics(TOPIC)).thenReturn(Collections.emptyMap()); + when(admin.createOrFindTopics(any(NewTopic.class))) + .thenAnswer(new Answer<TopicAdmin.TopicCreationResponse>() { + boolean firstCall = true; + + @Override + public TopicAdmin.TopicCreationResponse answer(InvocationOnMock invocation) { + if (firstCall) { + firstCall = false; + throw new RetriableException(new TimeoutException("timeout")); + } + return createdTopic(TOPIC); + } + }); workerTask.toSend = Arrays.asList(record1, record2); Review Comment: Two notes that I can't leave directly inline due to GitHub UI limitations: 1. After line 466 (`assertEquals(Arrays.asList(record1, record2), workerTask.toSend);`), we should add a single call to `verifyTopicCreation()` 2. 
After line 470 (`assertNull(workerTask.toSend);`), we should add a follow-up call to `verifyTopicCreation(times(2));` This assumes that `verifyTopicCreation` is broken up into two overloaded variants: ```java private void verifyTopicCreation() { verifyTopicCreation(times(1)); } private void verifyTopicCreation(VerificationMode mode) { ArgumentCaptor<NewTopic> newTopicCapture = ArgumentCaptor.forClass(NewTopic.class); verify(admin, mode).createOrFindTopics(newTopicCapture.capture()); assertEquals(TOPIC, newTopicCapture.getValue().name()); } ``` ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java: ########## @@ -639,144 +644,112 @@ public void testTopicCreateSucceedsWhenCreateReturnsNewTopicFound() { SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); - expectPreliminaryCalls(); - EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap()); + expectSendRecord(emptyHeaders()); - Capture<NewTopic> newTopicCapture = EasyMock.newCapture(); - EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(createdTopic(TOPIC)); - - expectSendRecord(); - expectSendRecord(); - - PowerMock.replayAll(); + when(admin.describeTopics(TOPIC)).thenReturn(Collections.emptyMap()); + when(admin.createOrFindTopics(any(NewTopic.class))).thenReturn(createdTopic(TOPIC)); workerTask.toSend = Arrays.asList(record1, record2); workerTask.sendRecords(); + + ArgumentCaptor<ProducerRecord<byte[], byte[]>> sent = verifySendRecord(2); + + List<ProducerRecord<byte[], byte[]>> capturedValues = sent.getAllValues(); + assertEquals(2, capturedValues.size()); } - private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord( - String topic, - boolean anyTimes, - Headers headers - ) { + private void expectSendRecord(Headers headers) { if 
(headers != null) - expectConvertHeadersAndKeyValue(topic, anyTimes, headers); + expectConvertHeadersAndKeyValue(headers); - expectApplyTransformationChain(anyTimes); + expectApplyTransformationChain(); - Capture<ProducerRecord<byte[], byte[]>> sent = EasyMock.newCapture(); - - IExpectationSetters<Future<RecordMetadata>> expect = EasyMock.expect( - producer.send(EasyMock.capture(sent), EasyMock.capture(producerCallbacks))); + expectTaskGetTopic(); + } - IAnswer<Future<RecordMetadata>> expectResponse = () -> { - synchronized (producerCallbacks) { - for (Callback cb : producerCallbacks.getValues()) { - cb.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 0, 0, 0L, 0, 0), null); - } - producerCallbacks.reset(); - } - return null; - }; + private ArgumentCaptor<ProducerRecord<byte[], byte[]>> verifySendRecord() { + return verifySendRecord(1); + } - if (anyTimes) - expect.andStubAnswer(expectResponse); - else - expect.andAnswer(expectResponse); + private ArgumentCaptor<ProducerRecord<byte[], byte[]>> verifySendRecord(int times) { + ArgumentCaptor<ProducerRecord<byte[], byte[]>> sent = ArgumentCaptor.forClass(ProducerRecord.class); + ArgumentCaptor<Callback> producerCallbacks = ArgumentCaptor.forClass(Callback.class); + verify(producer, times(times)).send(sent.capture(), producerCallbacks.capture()); - expectTaskGetTopic(anyTimes); + for (Callback cb : producerCallbacks.getAllValues()) { + cb.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 0, 0, 0L, 0, 0), + null); + } return sent; } - private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordAnyTimes() { - return expectSendRecord(TOPIC, true, emptyHeaders()); + private void expectTaskGetTopic() { + when(statusBackingStore.getTopic(anyString(), anyString())).thenAnswer((Answer<TopicStatus>) invocation -> { + String connector = invocation.getArgument(0, String.class); + String topic = invocation.getArgument(1, String.class); + return new TopicStatus(topic, new ConnectorTaskId(connector, 0), 
Time.SYSTEM.milliseconds()); + }); } - private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord() { - return expectSendRecord(TOPIC, false, emptyHeaders()); - } + private void verifyTaskGetTopic() { + ArgumentCaptor<String> connectorCapture = ArgumentCaptor.forClass(String.class); + ArgumentCaptor<String> topicCapture = ArgumentCaptor.forClass(String.class); + ArgumentCaptor<NewTopic> newTopicCapture = ArgumentCaptor.forClass(NewTopic.class); + verify(statusBackingStore).getTopic(connectorCapture.capture(), topicCapture.capture()); - private void expectTaskGetTopic(boolean anyTimes) { - final Capture<String> connectorCapture = EasyMock.newCapture(); - final Capture<String> topicCapture = EasyMock.newCapture(); - IExpectationSetters<TopicStatus> expect = EasyMock.expect(statusBackingStore.getTopic( - EasyMock.capture(connectorCapture), - EasyMock.capture(topicCapture))); - if (anyTimes) { - expect.andStubAnswer(() -> new TopicStatus( - topicCapture.getValue(), - new ConnectorTaskId(connectorCapture.getValue(), 0), - Time.SYSTEM.milliseconds())); - } else { - expect.andAnswer(() -> new TopicStatus( - topicCapture.getValue(), - new ConnectorTaskId(connectorCapture.getValue(), 0), - Time.SYSTEM.milliseconds())); - } - if (connectorCapture.hasCaptured() && topicCapture.hasCaptured()) { - assertEquals("job", connectorCapture.getValue()); - assertEquals(TOPIC, topicCapture.getValue()); - } + assertEquals("job", connectorCapture.getValue()); + assertEquals(TOPIC, topicCapture.getValue()); + + verify(admin).createOrFindTopics(newTopicCapture.capture()); + assertEquals(TOPIC, newTopicCapture.getValue().name()); } + @SuppressWarnings("SameParameterValue") private void expectTopicCreation(String topic) { - if (config.topicCreationEnable()) { - EasyMock.expect(admin.describeTopics(topic)).andReturn(Collections.emptyMap()); - Capture<NewTopic> newTopicCapture = EasyMock.newCapture(); - 
EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(createdTopic(topic)); - } + when(admin.describeTopics(topic)).thenReturn(Collections.emptyMap()); + when(admin.createOrFindTopics(any(NewTopic.class))).thenReturn(createdTopic(topic)); } + @SuppressWarnings("SameParameterValue") private TopicAdmin.TopicCreationResponse createdTopic(String topic) { Set<String> created = Collections.singleton(topic); Set<String> existing = Collections.emptySet(); return new TopicAdmin.TopicCreationResponse(created, existing); } + @SuppressWarnings("SameParameterValue") private TopicAdmin.TopicCreationResponse foundTopic(String topic) { Set<String> created = Collections.emptySet(); Set<String> existing = Collections.singleton(topic); return new TopicAdmin.TopicCreationResponse(created, existing); } private void expectPreliminaryCalls() { - expectPreliminaryCalls(TOPIC); - } + expectConvertHeadersAndKeyValue(emptyHeaders()); + expectApplyTransformationChain(); + } + + private void expectConvertHeadersAndKeyValue(Headers headers) { Review Comment: This assumes that we'll always handle records whose topic is `TOPIC`, but that assumption doesn't hold in the `testSendRecordsTopicDescribeRetriesMidway` and `testSendRecordsTopicCreateRetriesMidway` cases, where we try to send records with the topic `OTHER_TOPIC`. Even though the tests are passing right now, it's important that we add expectations for converting records for the other topic since the strict stubbing we've configured Mockito with will fail the test if those expectations are not utilized, which effectively increases the coverage in these tests. 
########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java: ########## @@ -485,32 +479,45 @@ public void testSendRecordsTopicDescribeRetriesMidway() { SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); SourceRecord record3 = new SourceRecord(PARTITION, OFFSET, OTHER_TOPIC, 3, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); - // First round - expectPreliminaryCalls(OTHER_TOPIC); - expectTopicCreation(TOPIC); - expectSendRecord(); - expectSendRecord(); - - // First call to describe the topic times out - EasyMock.expect(admin.describeTopics(OTHER_TOPIC)) - .andThrow(new RetriableException(new TimeoutException("timeout"))); + expectPreliminaryCalls(); - // Second round - expectTopicCreation(OTHER_TOPIC); - expectSendRecord(OTHER_TOPIC, false, emptyHeaders()); + when(admin.describeTopics(anyString())).thenAnswer(new Answer<Map<String, TopicDescription>>() { + int counter = 0; - PowerMock.replayAll(); + @Override + public Map<String, TopicDescription> answer(InvocationOnMock invocation) { + counter++; + if (counter == 2) { + throw new RetriableException(new TimeoutException("timeout")); + } - // Try to send 3, make first pass, second fail. 
Should save last two + return Collections.emptyMap(); + } + }); Review Comment: This can be simplified: ```java expectTopicCreation(TOPIC); when(admin.describeTopics(eq(OTHER_TOPIC))) .thenThrow(new RetriableException(new TimeoutException("timeout"))) .thenReturn(Collections.emptyMap()); ``` ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java: ########## @@ -425,17 +412,21 @@ public void testSendRecordsTopicDescribeRetries() { SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); expectPreliminaryCalls(); - // First round - call to describe the topic times out - EasyMock.expect(admin.describeTopics(TOPIC)) - .andThrow(new RetriableException(new TimeoutException("timeout"))); - - // Second round - calls to describe and create succeed expectTopicCreation(TOPIC); Review Comment: We shouldn't be establishing this expectation before invoking `workerTask::sendRecords` the first time, since we don't actually expect topic creation to take place then. 
I think this could be better structured by replacing everything from line 415 (`expectTopicCreation(TOPIC)`) through the end of the test case with this:
```java
// First round - call to describe the topic times out
when(admin.describeTopics(TOPIC)).thenThrow(new RetriableException(new TimeoutException("timeout")));

workerTask.toSend = Arrays.asList(record1, record2);
workerTask.sendRecords();
assertEquals(Arrays.asList(record1, record2), workerTask.toSend);
verifyNoMoreInteractions(admin);
reset(admin);

// Second round - calls to describe and create succeed
expectTopicCreation(TOPIC);
workerTask.sendRecords();
assertNull(workerTask.toSend);
verifyTopicCreation();
```
(assuming a `verifyTopicCreation()` method is added) ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java: ########## @@ -404,17 +391,17 @@ public void testTopicCreateWhenTopicExists() { SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); expectPreliminaryCalls(); + TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(0, null, Collections.emptyList(), Collections.emptyList()); TopicDescription topicDesc = new TopicDescription(TOPIC, false, Collections.singletonList(topicPartitionInfo)); - EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.singletonMap(TOPIC, topicDesc)); + when(admin.describeTopics(TOPIC)).thenReturn(Collections.singletonMap(TOPIC, topicDesc)); - expectSendRecord(); - expectSendRecord(); - - PowerMock.replayAll(); + expectSendRecord(emptyHeaders()); workerTask.toSend = Arrays.asList(record1, record2); workerTask.sendRecords(); + + verifySendRecord(2); Review Comment: We need to make sure that, under these circumstances, we never tried to create a topic.
One way to accomplish this is to verify that we never used the `TopicAdmin` for anything except the call to `describeTopics`:
```suggestion
        verifySendRecord(2);
        // Make sure we didn't try to create the topic after finding out it already existed
        verifyNoMoreInteractions(admin);
```
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org