C0urante commented on code in PR #12781: URL: https://github.com/apache/kafka/pull/12781#discussion_r1011902286
########## connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java: ########## @@ -147,10 +168,6 @@ public void setUp() { @Test public void testStartStop() throws Exception { - expectStart(); - expectStop(); - - PowerMock.replayAll(); Review Comment: Nit: can we remove all empty lines at the beginning and ends of methods? ########## connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java: ########## @@ -547,32 +531,16 @@ public void testReadEndOffsetsUsingAdminThatFailsWithRetriable() throws Exceptio private void setupWithAdmin() { Supplier<TopicAdmin> adminSupplier = () -> admin; java.util.function.Consumer<TopicAdmin> initializer = admin -> { }; - store = PowerMock.createPartialMock(KafkaBasedLog.class, new String[]{"createConsumer", "createProducer"}, - TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, adminSupplier, consumedCallback, time, initializer); - } - - private void expectProducerAndConsumerCreate() throws Exception { - PowerMock.expectPrivate(store, "createProducer") - .andReturn(producer); - PowerMock.expectPrivate(store, "createConsumer") - .andReturn(consumer); + store = new MockedKafkaBasedLog(TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, adminSupplier, consumedCallback, time, initializer); } - private void expectStart() throws Exception { - initializer.run(); - EasyMock.expectLastCall().times(1); - - expectProducerAndConsumerCreate(); + private void expectStart() { + verify(initializer).accept(any()); } private void expectStop() { - producer.close(); - PowerMock.expectLastCall(); + verify(producer).close(); // MockConsumer close is checked after test. Review Comment: I think we can just fold this logic into the method here since it's used consistently throughout each test case that invokes this method: ``` assertFalse(store.thread.isAlive()); assertTrue(consumer.closed()); ``` ########## connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java: ########## @@ -547,32 +540,22 @@ public void testReadEndOffsetsUsingAdminThatFailsWithRetriable() throws Exceptio private void setupWithAdmin() { Supplier<TopicAdmin> adminSupplier = () -> admin; java.util.function.Consumer<TopicAdmin> initializer = admin -> { }; - store = PowerMock.createPartialMock(KafkaBasedLog.class, new String[]{"createConsumer", "createProducer"}, - TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, adminSupplier, consumedCallback, time, initializer); + store = new MockedKafkaBasedLog(TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, adminSupplier, consumedCallback, time, initializer); } - private void expectProducerAndConsumerCreate() throws Exception { - PowerMock.expectPrivate(store, "createProducer") - .andReturn(producer); - PowerMock.expectPrivate(store, "createConsumer") - .andReturn(consumer); + final Field storeThreadField = FieldUtils.getField(MockedKafkaBasedLog.class, "thread", true); + private Thread getStorePrivateThread() throws IllegalAccessException { + return (Thread) storeThreadField.get(store); } - private void expectStart() throws Exception { + private void expectStart() { initializer.run(); - EasyMock.expectLastCall().times(1); - - expectProducerAndConsumerCreate(); + verify(initializer, times(1)).run(); } private void expectStop() { - producer.close(); - PowerMock.expectLastCall(); + doNothing().when(producer).close(); // MockConsumer close is checked after test. } - private static ByteBuffer buffer(String v) { Review Comment: Thanks for the cleanup 🎉 ########## connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java: ########## @@ -147,10 +168,6 @@ public void setUp() { @Test public void testStartStop() throws Exception { - expectStart(); Review Comment: Don't we lose coverage here? The existing test verifies that the log invokes its `createProducer` and `createConsumer` methods exactly once on startup. We should keep those guarantees (especially since those methods have become pluggable over time). One way to do this is to add tracking logic to the `MockedKafkaBasedLog` class for how many times its `createProducer` and `createConsumer` methods are invoked, and then verify that the number of invocations matches an expected value in the `expectStart` method. ########## connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java: ########## @@ -471,51 +468,42 @@ public void testProducerError() throws Exception { store.stop(); - assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive()); + assertFalse(store.thread.isAlive()); assertTrue(consumer.closed()); - PowerMock.verifyAll(); + expectStop(); } @Test - public void testReadEndOffsetsUsingAdmin() throws Exception { + public void testReadEndOffsetsUsingAdmin() { // Create a log that uses the admin supplier setupWithAdmin(); - expectProducerAndConsumerCreate(); Set<TopicPartition> tps = new HashSet<>(Arrays.asList(TP0, TP1)); Map<TopicPartition, Long> endOffsets = new HashMap<>(); endOffsets.put(TP0, 0L); endOffsets.put(TP1, 0L); - admin.retryEndOffsets(EasyMock.eq(tps), EasyMock.anyObject(), EasyMock.anyLong()); - PowerMock.expectLastCall().andReturn(endOffsets).times(1); - admin.endOffsets(EasyMock.eq(tps)); - PowerMock.expectLastCall().andReturn(endOffsets).times(1); - - PowerMock.replayAll(); + when(admin.retryEndOffsets(eq(tps), any(), anyLong())).thenReturn(endOffsets); + when(admin.endOffsets(eq(tps))).thenReturn(endOffsets); Review Comment: I think we technically lose coverage here in that we no longer verify that these methods are invoked once and only once. But that should be fine; those guarantees aren't crucial. ########## connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java: ########## @@ -547,32 +531,16 @@ public void testReadEndOffsetsUsingAdminThatFailsWithRetriable() throws Exceptio private void setupWithAdmin() { Supplier<TopicAdmin> adminSupplier = () -> admin; java.util.function.Consumer<TopicAdmin> initializer = admin -> { }; - store = PowerMock.createPartialMock(KafkaBasedLog.class, new String[]{"createConsumer", "createProducer"}, - TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, adminSupplier, consumedCallback, time, initializer); - } - - private void expectProducerAndConsumerCreate() throws Exception { - PowerMock.expectPrivate(store, "createProducer") - .andReturn(producer); - PowerMock.expectPrivate(store, "createConsumer") - .andReturn(consumer); + store = new MockedKafkaBasedLog(TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, adminSupplier, consumedCallback, time, initializer); } - private void expectStart() throws Exception { - initializer.run(); - EasyMock.expectLastCall().times(1); - - expectProducerAndConsumerCreate(); + private void expectStart() { + verify(initializer).accept(any()); } private void expectStop() { Review Comment: Same thought: should name this `verifyStop` instead. ########## connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java: ########## @@ -234,43 +242,42 @@ public void testReloadOnStartWithNoNewRecordsPresent() throws Exception { }); store.start(); + expectStart(); assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment()); assertEquals(7L, consumer.position(TP0)); assertEquals(7L, consumer.position(TP1)); store.stop(); + expectStop(); - assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive()); + assertFalse(store.thread.isAlive()); assertTrue(consumer.closed()); - PowerMock.verifyAll(); } @Test public void testSendAndReadToEnd() throws Exception { - expectStart(); TestFuture<RecordMetadata> tp0Future = new TestFuture<>(); ProducerRecord<String, String> tp0Record = new ProducerRecord<>(TOPIC, TP0_KEY, TP0_VALUE); - Capture<org.apache.kafka.clients.producer.Callback> callback0 = EasyMock.newCapture(); - EasyMock.expect(producer.send(EasyMock.eq(tp0Record), EasyMock.capture(callback0))).andReturn(tp0Future); + ArgumentCaptor<org.apache.kafka.clients.producer.Callback> callback0 = + ArgumentCaptor.forClass(org.apache.kafka.clients.producer.Callback.class); + when(producer.send(eq(tp0Record), callback0.capture())).thenReturn(tp0Future); TestFuture<RecordMetadata> tp1Future = new TestFuture<>(); ProducerRecord<String, String> tp1Record = new ProducerRecord<>(TOPIC, TP1_KEY, TP1_VALUE); - Capture<org.apache.kafka.clients.producer.Callback> callback1 = EasyMock.newCapture(); - EasyMock.expect(producer.send(EasyMock.eq(tp1Record), EasyMock.capture(callback1))).andReturn(tp1Future); + ArgumentCaptor<org.apache.kafka.clients.producer.Callback> callback1 = + ArgumentCaptor.forClass(org.apache.kafka.clients.producer.Callback.class); + when(producer.send(eq(tp1Record), callback1.capture())).thenReturn(tp1Future); // Producer flushes when read to log end is called - producer.flush(); - PowerMock.expectLastCall(); - - expectStop(); - - PowerMock.replayAll(); + doNothing().when(producer).flush(); Review Comment: This isn't necessary anymore; we don't have to establish no-op expectations for void methods. It does technically provide some guarantees that `producer::flush` has been invoked by the end of the test since we'd see an `UnnecessaryStubbingException` otherwise, but it'd be better to be more explicit about that by adding `verify(producer).flush()` sometime after the call to `store::readToEnd`. We probably don't want to put it _directly_ after that call since it's made inside the mock consumer, but it should be fine to add it after the line with `readEndFutureCallback::get`. ########## connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java: ########## @@ -547,32 +531,16 @@ public void testReadEndOffsetsUsingAdminThatFailsWithRetriable() throws Exceptio private void setupWithAdmin() { Supplier<TopicAdmin> adminSupplier = () -> admin; java.util.function.Consumer<TopicAdmin> initializer = admin -> { }; - store = PowerMock.createPartialMock(KafkaBasedLog.class, new String[]{"createConsumer", "createProducer"}, - TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, adminSupplier, consumedCallback, time, initializer); - } - - private void expectProducerAndConsumerCreate() throws Exception { - PowerMock.expectPrivate(store, "createProducer") - .andReturn(producer); - PowerMock.expectPrivate(store, "createConsumer") - .andReturn(consumer); + store = new MockedKafkaBasedLog(TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, adminSupplier, consumedCallback, time, initializer); } - private void expectStart() throws Exception { - initializer.run(); - EasyMock.expectLastCall().times(1); - - expectProducerAndConsumerCreate(); + private void expectStart() { Review Comment: This no longer establishes expectations, but verifies them. Should probably rename to `verifyStart`. ########## connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java: ########## @@ -370,34 +373,31 @@ public void testPollConsumerError() throws Exception { consumer.schedulePollTask(finishedLatch::countDown); }); store.start(); + expectStart(); assertTrue(finishedLatch.await(10000, TimeUnit.MILLISECONDS)); assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment()); assertEquals(1L, consumer.position(TP0)); store.stop(); + expectStop(); - assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive()); + assertFalse(store.thread.isAlive()); assertTrue(consumer.closed()); - PowerMock.verifyAll(); } @Test public void testGetOffsetsConsumerErrorOnReadToEnd() throws Exception { - expectStart(); - // Producer flushes when read to log end is called producer.flush(); - PowerMock.expectLastCall(); - - expectStop(); + doNothing().when(producer).flush(); Review Comment: Same comment; we should replace this with an explicit verify step. ########## connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java: ########## @@ -147,10 +168,6 @@ public void setUp() { @Test public void testStartStop() throws Exception { - expectStart(); - expectStop(); Review Comment: Don't we also lose coverage here? The current tests ensure that the producer is closed; we should retain those guarantees. ########## connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java: ########## @@ -115,10 +115,32 @@ public class KafkaBasedLogTest { private static final String TP1_VALUE_NEW = "VAL1_NEW"; private Time time = new MockTime(); - private KafkaBasedLog<String, String> store; + + private class MockedKafkaBasedLog extends KafkaBasedLog<String, String> { + public MockedKafkaBasedLog(String topic, + Map<String, Object> producerConfigs, + Map<String, Object> consumerConfigs, + Supplier<TopicAdmin> topicAdminSupplier, + Callback<ConsumerRecord<String, String>> consumedCallback, + Time time, + Consumer<TopicAdmin> initializer) { + super(topic, producerConfigs, consumerConfigs, topicAdminSupplier, consumedCallback, time, initializer); + } + + @Override + protected KafkaProducer<String, String> createProducer() { + return producer; + } + + @Override + protected MockConsumer<String, String> createConsumer() { + return consumer; + } + } + private MockedKafkaBasedLog store; @Mock - private Runnable initializer; + private Consumer<TopicAdmin> initializer; Review Comment: The existing tests use a different (now-deprecated) constructor that accepts a `Runnable` initializer: https://github.com/apache/kafka/blob/dc18dd921c1e6f94f19212f45a4f851b1894dbdb/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java#L125-L132 It should be fine to use the new constructor since it's trivially verifiable that the logic in this PR replicates the logic in the older constructor and we don't have to worry about a loss of coverage. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org