C0urante commented on code in PR #12781:
URL: https://github.com/apache/kafka/pull/12781#discussion_r1011902286


##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java:
##########
@@ -147,10 +168,6 @@ public void setUp() {
 
     @Test
     public void testStartStop() throws Exception {
-        expectStart();
-        expectStop();
-
-        PowerMock.replayAll();
 

Review Comment:
   Nit: can we remove all empty lines at the beginnings and ends of methods?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java:
##########
@@ -547,32 +531,16 @@ public void testReadEndOffsetsUsingAdminThatFailsWithRetriable() throws Exceptio
     private void setupWithAdmin() {
         Supplier<TopicAdmin> adminSupplier = () -> admin;
         java.util.function.Consumer<TopicAdmin> initializer = admin -> { };
-        store = PowerMock.createPartialMock(KafkaBasedLog.class, new String[]{"createConsumer", "createProducer"},
-                TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, adminSupplier, consumedCallback, time, initializer);
-    }
-
-    private void expectProducerAndConsumerCreate() throws Exception {
-        PowerMock.expectPrivate(store, "createProducer")
-                 .andReturn(producer);
-        PowerMock.expectPrivate(store, "createConsumer")
-                 .andReturn(consumer);
+        store = new MockedKafkaBasedLog(TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, adminSupplier, consumedCallback, time, initializer);
     }
 
-    private void expectStart() throws Exception {
-        initializer.run();
-        EasyMock.expectLastCall().times(1);
-
-        expectProducerAndConsumerCreate();
+    private void expectStart() {
+        verify(initializer).accept(any());
     }
 
     private void expectStop() {
-        producer.close();
-        PowerMock.expectLastCall();
+        verify(producer).close();
         // MockConsumer close is checked after test.

Review Comment:
   I think we can just fold this logic into the method here since it's used 
consistently throughout each test case that invokes this method:
   ```
           assertFalse(store.thread.isAlive());
           assertTrue(consumer.closed());
   ```



##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java:
##########
@@ -547,32 +540,22 @@ public void testReadEndOffsetsUsingAdminThatFailsWithRetriable() throws Exceptio
     private void setupWithAdmin() {
         Supplier<TopicAdmin> adminSupplier = () -> admin;
         java.util.function.Consumer<TopicAdmin> initializer = admin -> { };
-        store = PowerMock.createPartialMock(KafkaBasedLog.class, new String[]{"createConsumer", "createProducer"},
-                TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, adminSupplier, consumedCallback, time, initializer);
+        store = new MockedKafkaBasedLog(TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, adminSupplier, consumedCallback, time, initializer);
     }
 
-    private void expectProducerAndConsumerCreate() throws Exception {
-        PowerMock.expectPrivate(store, "createProducer")
-                 .andReturn(producer);
-        PowerMock.expectPrivate(store, "createConsumer")
-                 .andReturn(consumer);
+    final Field storeThreadField = FieldUtils.getField(MockedKafkaBasedLog.class, "thread", true);
+    private Thread getStorePrivateThread() throws IllegalAccessException {
+        return (Thread) storeThreadField.get(store);
     }
 
-    private void expectStart() throws Exception {
+    private void expectStart() {
         initializer.run();
-        EasyMock.expectLastCall().times(1);
-
-        expectProducerAndConsumerCreate();
+        verify(initializer, times(1)).run();
     }
 
     private void expectStop() {
-        producer.close();
-        PowerMock.expectLastCall();
+        doNothing().when(producer).close();
         // MockConsumer close is checked after test.
     }
 
-    private static ByteBuffer buffer(String v) {

Review Comment:
   Thanks for the cleanup 🎉



##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java:
##########
@@ -147,10 +168,6 @@ public void setUp() {
 
     @Test
     public void testStartStop() throws Exception {
-        expectStart();

Review Comment:
   Don't we lose coverage here? The existing test verifies that the log invokes 
its `createProducer` and `createConsumer` methods exactly once on startup. We 
should keep those guarantees (especially since those methods have become 
pluggable over time).
   
   One way to do this is to add tracking logic to the `MockedKafkaBasedLog` 
class for how many times its `createProducer` and `createConsumer` methods are 
invoked, and then verify that the number of invocations matches an expected 
value in the `expectStart` method.
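   
   A minimal sketch of that tracking logic, written against a hypothetical 
   stand-in base class rather than the real `KafkaBasedLog` so that it runs on 
   its own. The names `BaseLog`, `CountingLog`, and the counter accessors are 
   assumptions made for illustration; in the actual test the counters would 
   live in `MockedKafkaBasedLog`:
   ```java
   import java.util.concurrent.atomic.AtomicInteger;

   // Hypothetical stand-in for KafkaBasedLog: start() is assumed to create
   // one producer and one consumer through overridable factory methods.
   abstract class BaseLog {
       private Object producer;
       private Object consumer;

       public void start() {
           producer = createProducer();
           consumer = createConsumer();
       }

       protected abstract Object createProducer();

       protected abstract Object createConsumer();
   }

   // Test subclass that counts how often each factory method is invoked.
   class CountingLog extends BaseLog {
       private final AtomicInteger producerCreations = new AtomicInteger();
       private final AtomicInteger consumerCreations = new AtomicInteger();

       @Override
       protected Object createProducer() {
           producerCreations.incrementAndGet();
           return new Object();
       }

       @Override
       protected Object createConsumer() {
           consumerCreations.incrementAndGet();
           return new Object();
       }

       int getProducerCreations() {
           return producerCreations.get();
       }

       int getConsumerCreations() {
           return consumerCreations.get();
       }

       // Fails unless each factory method has been invoked exactly once.
       void verifyStart() {
           if (producerCreations.get() != 1) {
               throw new AssertionError("createProducer invoked " + producerCreations.get() + " times");
           }
           if (consumerCreations.get() != 1) {
               throw new AssertionError("createConsumer invoked " + consumerCreations.get() + " times");
           }
       }
   }
   ```
   Atomic counters are used here on the assumption that the factory methods 
   may be invoked from a thread other than the one running the assertions.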



##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java:
##########
@@ -471,51 +468,42 @@ public void testProducerError() throws Exception {
 
         store.stop();
 
-        assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive());
+        assertFalse(store.thread.isAlive());
         assertTrue(consumer.closed());
-        PowerMock.verifyAll();
+        expectStop();
     }
 
     @Test
-    public void testReadEndOffsetsUsingAdmin() throws Exception {
+    public void testReadEndOffsetsUsingAdmin() {
         // Create a log that uses the admin supplier
         setupWithAdmin();
-        expectProducerAndConsumerCreate();
 
         Set<TopicPartition> tps = new HashSet<>(Arrays.asList(TP0, TP1));
         Map<TopicPartition, Long> endOffsets = new HashMap<>();
         endOffsets.put(TP0, 0L);
         endOffsets.put(TP1, 0L);
-        admin.retryEndOffsets(EasyMock.eq(tps), EasyMock.anyObject(), EasyMock.anyLong());
-        PowerMock.expectLastCall().andReturn(endOffsets).times(1);
-        admin.endOffsets(EasyMock.eq(tps));
-        PowerMock.expectLastCall().andReturn(endOffsets).times(1);
-
-        PowerMock.replayAll();
+        when(admin.retryEndOffsets(eq(tps), any(), anyLong())).thenReturn(endOffsets);
+        when(admin.endOffsets(eq(tps))).thenReturn(endOffsets);

Review Comment:
   I think we technically lose coverage here in that we no longer verify that 
these methods are invoked once and only once. But that should be fine; those 
guarantees aren't crucial.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java:
##########
@@ -547,32 +531,16 @@ public void testReadEndOffsetsUsingAdminThatFailsWithRetriable() throws Exceptio
     private void setupWithAdmin() {
         Supplier<TopicAdmin> adminSupplier = () -> admin;
         java.util.function.Consumer<TopicAdmin> initializer = admin -> { };
-        store = PowerMock.createPartialMock(KafkaBasedLog.class, new String[]{"createConsumer", "createProducer"},
-                TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, adminSupplier, consumedCallback, time, initializer);
-    }
-
-    private void expectProducerAndConsumerCreate() throws Exception {
-        PowerMock.expectPrivate(store, "createProducer")
-                 .andReturn(producer);
-        PowerMock.expectPrivate(store, "createConsumer")
-                 .andReturn(consumer);
+        store = new MockedKafkaBasedLog(TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, adminSupplier, consumedCallback, time, initializer);
     }
 
-    private void expectStart() throws Exception {
-        initializer.run();
-        EasyMock.expectLastCall().times(1);
-
-        expectProducerAndConsumerCreate();
+    private void expectStart() {
+        verify(initializer).accept(any());
     }
 
     private void expectStop() {

Review Comment:
   Same thought: this now verifies rather than establishes expectations, so we 
should name it `verifyStop` instead.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java:
##########
@@ -234,43 +242,42 @@ public void testReloadOnStartWithNoNewRecordsPresent() throws Exception {
         });
 
         store.start();
+        expectStart();
 
         assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
         assertEquals(7L, consumer.position(TP0));
         assertEquals(7L, consumer.position(TP1));
 
         store.stop();
+        expectStop();
 
-        assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive());
+        assertFalse(store.thread.isAlive());
         assertTrue(consumer.closed());
-        PowerMock.verifyAll();
     }
 
     @Test
     public void testSendAndReadToEnd() throws Exception {
-        expectStart();
         TestFuture<RecordMetadata> tp0Future = new TestFuture<>();
         ProducerRecord<String, String> tp0Record = new ProducerRecord<>(TOPIC, TP0_KEY, TP0_VALUE);
-        Capture<org.apache.kafka.clients.producer.Callback> callback0 = EasyMock.newCapture();
-        EasyMock.expect(producer.send(EasyMock.eq(tp0Record), EasyMock.capture(callback0))).andReturn(tp0Future);
+        ArgumentCaptor<org.apache.kafka.clients.producer.Callback> callback0 =
+            ArgumentCaptor.forClass(org.apache.kafka.clients.producer.Callback.class);
+        when(producer.send(eq(tp0Record), callback0.capture())).thenReturn(tp0Future);
         TestFuture<RecordMetadata> tp1Future = new TestFuture<>();
         ProducerRecord<String, String> tp1Record = new ProducerRecord<>(TOPIC, TP1_KEY, TP1_VALUE);
-        Capture<org.apache.kafka.clients.producer.Callback> callback1 = EasyMock.newCapture();
-        EasyMock.expect(producer.send(EasyMock.eq(tp1Record), EasyMock.capture(callback1))).andReturn(tp1Future);
+        ArgumentCaptor<org.apache.kafka.clients.producer.Callback> callback1 =
+            ArgumentCaptor.forClass(org.apache.kafka.clients.producer.Callback.class);
+        when(producer.send(eq(tp1Record), callback1.capture())).thenReturn(tp1Future);
 
         // Producer flushes when read to log end is called
-        producer.flush();
-        PowerMock.expectLastCall();
-
-        expectStop();
-
-        PowerMock.replayAll();
+        doNothing().when(producer).flush();

Review Comment:
   This isn't necessary anymore; we don't have to establish no-op expectations 
for void methods.
   
   It does technically provide some guarantees that `producer::flush` has been 
invoked by the end of the test since we'd see an `UnnecessaryStubbingException` 
otherwise, but it'd be better to be more explicit about that by adding 
`verify(producer).flush()` sometime after the call to `store::readToEnd`. We 
probably don't want to put it _directly_ after that call since it's made inside 
the mock consumer, but it should be fine to add it after the line with 
`readEndFutureCallback::get`.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java:
##########
@@ -547,32 +531,16 @@ public void testReadEndOffsetsUsingAdminThatFailsWithRetriable() throws Exceptio
     private void setupWithAdmin() {
         Supplier<TopicAdmin> adminSupplier = () -> admin;
         java.util.function.Consumer<TopicAdmin> initializer = admin -> { };
-        store = PowerMock.createPartialMock(KafkaBasedLog.class, new String[]{"createConsumer", "createProducer"},
-                TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, adminSupplier, consumedCallback, time, initializer);
-    }
-
-    private void expectProducerAndConsumerCreate() throws Exception {
-        PowerMock.expectPrivate(store, "createProducer")
-                 .andReturn(producer);
-        PowerMock.expectPrivate(store, "createConsumer")
-                 .andReturn(consumer);
+        store = new MockedKafkaBasedLog(TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, adminSupplier, consumedCallback, time, initializer);
     }
 
-    private void expectStart() throws Exception {
-        initializer.run();
-        EasyMock.expectLastCall().times(1);
-
-        expectProducerAndConsumerCreate();
+    private void expectStart() {

Review Comment:
   This no longer establishes expectations, but verifies them. Should probably 
rename to `verifyStart`.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java:
##########
@@ -370,34 +373,31 @@ public void testPollConsumerError() throws Exception {
             consumer.schedulePollTask(finishedLatch::countDown);
         });
         store.start();
+        expectStart();
         assertTrue(finishedLatch.await(10000, TimeUnit.MILLISECONDS));
         assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
         assertEquals(1L, consumer.position(TP0));
 
         store.stop();
+        expectStop();
 
-        assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive());
+        assertFalse(store.thread.isAlive());
         assertTrue(consumer.closed());
-        PowerMock.verifyAll();
     }
 
     @Test
     public void testGetOffsetsConsumerErrorOnReadToEnd() throws Exception {
-        expectStart();
-
         // Producer flushes when read to log end is called
         producer.flush();
-        PowerMock.expectLastCall();
-
-        expectStop();
+        doNothing().when(producer).flush();

Review Comment:
   Same comment as for the other `doNothing().when(producer).flush()` stub; we 
should replace this with an explicit `verify(producer).flush()` step.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java:
##########
@@ -147,10 +168,6 @@ public void setUp() {
 
     @Test
     public void testStartStop() throws Exception {
-        expectStart();
-        expectStop();

Review Comment:
   Don't we also lose coverage here? The current tests ensure that the producer 
is closed; we should retain those guarantees.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java:
##########
@@ -115,10 +115,32 @@ public class KafkaBasedLogTest {
     private static final String TP1_VALUE_NEW = "VAL1_NEW";
 
     private Time time = new MockTime();
-    private KafkaBasedLog<String, String> store;
+
+    private class MockedKafkaBasedLog extends KafkaBasedLog<String, String> {
+        public MockedKafkaBasedLog(String topic,
+                                   Map<String, Object> producerConfigs,
+                                   Map<String, Object> consumerConfigs,
+                                   Supplier<TopicAdmin> topicAdminSupplier,
+                                   Callback<ConsumerRecord<String, String>> consumedCallback,
+                                   Time time,
+                                   Consumer<TopicAdmin> initializer) {
+            super(topic, producerConfigs, consumerConfigs, topicAdminSupplier, consumedCallback, time, initializer);
+        }
+
+        @Override
+        protected KafkaProducer<String, String> createProducer() {
+            return producer;
+        }
+
+        @Override
+        protected MockConsumer<String, String> createConsumer() {
+            return consumer;
+        }
+    }
+    private MockedKafkaBasedLog store;
 
     @Mock
-    private Runnable initializer;
+    private Consumer<TopicAdmin> initializer;

Review Comment:
   The existing tests use a different (now-deprecated) constructor that accepts 
a `Runnable` initializer: 
https://github.com/apache/kafka/blob/dc18dd921c1e6f94f19212f45a4f851b1894dbdb/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java#L125-L132
   
   It should be fine to use the new constructor since it's trivially verifiable 
that the logic in this PR replicates the logic in the older constructor and we 
don't have to worry about a loss of coverage.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

