yashmayya commented on code in PR #14153:
URL: https://github.com/apache/kafka/pull/14153#discussion_r1286889309
##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java:
##########
@@ -64,18 +58,23 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
import java.util.function.Supplier;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(KafkaBasedLog.class)
-@PowerMockIgnore("javax.management.*")
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
Review Comment:
We should also remove this test from the list in `build.gradle` (linked below)
so that it isn't skipped on Java 16 and newer:
https://github.com/apache/kafka/blob/8dec3e66163420ee0c2c259eef6e0c0f3185ca17/build.gradle#L413-L423
##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java:
##########
@@ -114,29 +113,29 @@ public class KafkaBasedLogTest {
private static final String TP0_VALUE_NEW = "VAL0_NEW";
private static final String TP1_VALUE_NEW = "VAL1_NEW";
- private Time time = new MockTime();
+ private final Time time = new MockTime();
private KafkaBasedLog<String, String> store;
@Mock
- private Runnable initializer;
+ private Consumer<TopicAdmin> initializer;
@Mock
private KafkaProducer<String, String> producer;
private MockConsumer<String, String> consumer;
@Mock
private TopicAdmin admin;
+ @Mock
+ private Supplier<TopicAdmin> topicAdminSupplier;
- private Map<TopicPartition, List<ConsumerRecord<String, String>>>
consumedRecords = new HashMap<>();
- private Callback<ConsumerRecord<String, String>> consumedCallback =
(error, record) -> {
+ private final Map<TopicPartition, List<ConsumerRecord<String, String>>>
consumedRecords = new HashMap<>();
+ private final Callback<ConsumerRecord<String, String>> consumedCallback =
(error, record) -> {
TopicPartition partition = new TopicPartition(record.topic(),
record.partition());
List<ConsumerRecord<String, String>> records =
consumedRecords.computeIfAbsent(partition, k -> new ArrayList<>());
records.add(record);
};
- @SuppressWarnings("unchecked")
@Before
public void setUp() {
- store = PowerMock.createPartialMock(KafkaBasedLog.class, new
String[]{"createConsumer", "createProducer"},
- TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, consumedCallback, time,
initializer);
+ store = spy(new KafkaBasedLog<>(TOPIC, PRODUCER_PROPS, CONSUMER_PROPS,
topicAdminSupplier, consumedCallback, time, initializer));
Review Comment:
It looks like the only reason we need a partial mock here is to stub in the
mock producer / consumer. Since the `KafkaBasedLog::createProducer` and
`KafkaBasedLog::createConsumer` methods are now `protected` (looks like they
were `private` when this test was originally written), could we just override
those methods to return the mock clients and use a regular instance instead of
using a spy / partial mock? I'd prefer it if we could avoid the use of spies /
partial mocks as far as possible, since they aren't very OOP-y.
##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java:
##########
@@ -160,18 +156,12 @@ public void testStartStop() throws Exception {
assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
store.stop();
-
- assertFalse(Whitebox.<Thread>getInternalState(store,
"thread").isAlive());
Review Comment:
Aren't we losing this test coverage? Can we bump up the visibility of the
`thread` instance variable to `package-private` (with a comment stating that it
is being made visible for testing) and retain this coverage?
##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java:
##########
@@ -486,12 +440,8 @@ public void testReadEndOffsetsUsingAdmin() throws
Exception {
Map<TopicPartition, Long> endOffsets = new HashMap<>();
endOffsets.put(TP0, 0L);
endOffsets.put(TP1, 0L);
- admin.retryEndOffsets(EasyMock.eq(tps), EasyMock.anyObject(),
EasyMock.anyLong());
- PowerMock.expectLastCall().andReturn(endOffsets).times(1);
- admin.endOffsets(EasyMock.eq(tps));
- PowerMock.expectLastCall().andReturn(endOffsets).times(1);
-
- PowerMock.replayAll();
+ when(admin.retryEndOffsets(eq(tps), any(),
anyLong())).thenReturn(endOffsets);
+ when(admin.endOffsets(eq(tps))).thenReturn(endOffsets);
Review Comment:
We're losing existing coverage here, since we no longer ensure that each of
these methods is called exactly once. Can we add an explicit verification for
this at the end of the test?
##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java:
##########
@@ -531,48 +478,28 @@ public void
testReadEndOffsetsUsingAdminThatFailsWithRetriable() throws Exceptio
endOffsets.put(TP0, 0L);
endOffsets.put(TP1, 0L);
// Getting end offsets upon startup should work fine
- admin.retryEndOffsets(EasyMock.eq(tps), EasyMock.anyObject(),
EasyMock.anyLong());
- PowerMock.expectLastCall().andReturn(endOffsets).times(1);
+ when(admin.retryEndOffsets(eq(tps), any(),
anyLong())).thenReturn(endOffsets);
// Getting end offsets using the admin client should fail with leader
not available
- admin.endOffsets(EasyMock.eq(tps));
- PowerMock.expectLastCall().andThrow(new
LeaderNotAvailableException("retry"));
-
- PowerMock.replayAll();
+ when(admin.endOffsets(eq(tps))).thenThrow(new
LeaderNotAvailableException("retry"));
store.start();
assertThrows(LeaderNotAvailableException.class, () ->
store.readEndOffsets(tps, false));
}
- @SuppressWarnings("unchecked")
private void setupWithAdmin() {
Supplier<TopicAdmin> adminSupplier = () -> admin;
java.util.function.Consumer<TopicAdmin> initializer = admin -> { };
- store = PowerMock.createPartialMock(KafkaBasedLog.class, new
String[]{"createConsumer", "createProducer"},
- TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, adminSupplier,
consumedCallback, time, initializer);
- }
-
- private void expectProducerAndConsumerCreate() throws Exception {
- PowerMock.expectPrivate(store, "createProducer")
- .andReturn(producer);
- PowerMock.expectPrivate(store, "createConsumer")
- .andReturn(consumer);
- }
-
- private void expectStart() throws Exception {
- initializer.run();
- EasyMock.expectLastCall().times(1);
-
- expectProducerAndConsumerCreate();
+ store = spy(new KafkaBasedLog<>(TOPIC, PRODUCER_PROPS, CONSUMER_PROPS,
adminSupplier, consumedCallback, time, initializer));
Review Comment:
Do we still need this separate `setupWithAdmin` method now that we're
setting up the `KafkaBasedLog` store with a topic admin supplier even in the
regular `setUp` method?
##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java:
##########
@@ -241,30 +225,20 @@ public void testReloadOnStartWithNoNewRecordsPresent()
throws Exception {
store.stop();
- assertFalse(Whitebox.<Thread>getInternalState(store,
"thread").isAlive());
- assertTrue(consumer.closed());
- PowerMock.verifyAll();
+ verifyStartAndStop();
}
@Test
public void testSendAndReadToEnd() throws Exception {
- expectStart();
+ expectProducerAndConsumerCreate();
TestFuture<RecordMetadata> tp0Future = new TestFuture<>();
ProducerRecord<String, String> tp0Record = new ProducerRecord<>(TOPIC,
TP0_KEY, TP0_VALUE);
- Capture<org.apache.kafka.clients.producer.Callback> callback0 =
EasyMock.newCapture();
- EasyMock.expect(producer.send(EasyMock.eq(tp0Record),
EasyMock.capture(callback0))).andReturn(tp0Future);
+ ArgumentCaptor<org.apache.kafka.clients.producer.Callback> callback0 =
ArgumentCaptor.forClass(org.apache.kafka.clients.producer.Callback.class);
+ when(producer.send(eq(tp0Record),
callback0.capture())).thenReturn(tp0Future);
TestFuture<RecordMetadata> tp1Future = new TestFuture<>();
ProducerRecord<String, String> tp1Record = new ProducerRecord<>(TOPIC,
TP1_KEY, TP1_VALUE);
- Capture<org.apache.kafka.clients.producer.Callback> callback1 =
EasyMock.newCapture();
- EasyMock.expect(producer.send(EasyMock.eq(tp1Record),
EasyMock.capture(callback1))).andReturn(tp1Future);
-
- // Producer flushes when read to log end is called
Review Comment:
Can we retain this clarifying comment (we can add it above the producer
flush verification)?
##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java:
##########
@@ -531,48 +478,28 @@ public void
testReadEndOffsetsUsingAdminThatFailsWithRetriable() throws Exceptio
endOffsets.put(TP0, 0L);
endOffsets.put(TP1, 0L);
// Getting end offsets upon startup should work fine
- admin.retryEndOffsets(EasyMock.eq(tps), EasyMock.anyObject(),
EasyMock.anyLong());
- PowerMock.expectLastCall().andReturn(endOffsets).times(1);
+ when(admin.retryEndOffsets(eq(tps), any(),
anyLong())).thenReturn(endOffsets);
// Getting end offsets using the admin client should fail with leader
not available
- admin.endOffsets(EasyMock.eq(tps));
- PowerMock.expectLastCall().andThrow(new
LeaderNotAvailableException("retry"));
-
- PowerMock.replayAll();
+ when(admin.endOffsets(eq(tps))).thenThrow(new
LeaderNotAvailableException("retry"));
store.start();
assertThrows(LeaderNotAvailableException.class, () ->
store.readEndOffsets(tps, false));
}
- @SuppressWarnings("unchecked")
private void setupWithAdmin() {
Supplier<TopicAdmin> adminSupplier = () -> admin;
java.util.function.Consumer<TopicAdmin> initializer = admin -> { };
- store = PowerMock.createPartialMock(KafkaBasedLog.class, new
String[]{"createConsumer", "createProducer"},
- TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, adminSupplier,
consumedCallback, time, initializer);
- }
-
- private void expectProducerAndConsumerCreate() throws Exception {
- PowerMock.expectPrivate(store, "createProducer")
- .andReturn(producer);
- PowerMock.expectPrivate(store, "createConsumer")
- .andReturn(consumer);
- }
-
- private void expectStart() throws Exception {
- initializer.run();
- EasyMock.expectLastCall().times(1);
-
- expectProducerAndConsumerCreate();
+ store = spy(new KafkaBasedLog<>(TOPIC, PRODUCER_PROPS, CONSUMER_PROPS,
adminSupplier, consumedCallback, time, initializer));
}
- private void expectStop() {
- producer.close();
- PowerMock.expectLastCall();
- // MockConsumer close is checked after test.
+ private void expectProducerAndConsumerCreate() {
+ doReturn(producer).when(store).createProducer();
+ doReturn(consumer).when(store).createConsumer();
}
- private static ByteBuffer buffer(String v) {
- return ByteBuffer.wrap(v.getBytes());
+ private void verifyStartAndStop() {
+ verify(initializer).accept(any());
Review Comment:
Now that we're moving to the newer non-deprecated `KafkaBasedLog`
constructor in this test class, could we enhance this verification to also
verify that the initializer is called with the admin client from the supplier? See:
https://github.com/apache/kafka/blob/8dec3e66163420ee0c2c259eef6e0c0f3185ca17/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java#L232-L240
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]