This is an automated email from the ASF dual-hosted git repository.
ab pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr-sandbox.git
The following commit(s) were added to refs/heads/main by this push:
new ebf735f KafkaCrossDcConsumerTest fix (#80)
ebf735f is described below
commit ebf735f69676e773a5c86dfe5fdca79b637fd7d0
Author: Marcin Górski <[email protected]>
AuthorDate: Mon Oct 30 10:14:39 2023 +0100
KafkaCrossDcConsumerTest fix (#80)
---
.../crossdc/consumer/KafkaCrossDcConsumerTest.java | 76 ++++++++++------------
1 file changed, 33 insertions(+), 43 deletions(-)
diff --git
a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumerTest.java
b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumerTest.java
index 709472c..ab9e3d0 100644
---
a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumerTest.java
+++
b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumerTest.java
@@ -114,10 +114,24 @@ public class KafkaCrossDcConsumerTest {
*/
@Test
public void kafkaCrossDcConsumerCreationWithConfigurationAndStartLatch() {
- KafkaCrossDcConf conf = testCrossDCConf();
CountDownLatch startLatch = new CountDownLatch(1);
+ KafkaConsumer<String, MirroredSolrRequest> mockConsumer =
mock(KafkaConsumer.class);
+ KafkaCrossDcConsumer kafkaCrossDcConsumer = spy(new
KafkaCrossDcConsumer(conf, startLatch) {
+ @Override
+ public KafkaConsumer<String, MirroredSolrRequest>
createKafkaConsumer(Properties properties) {
+ return mockConsumer;
+ }
+
+ @Override
+ public SolrMessageProcessor createSolrMessageProcessor() {
+ return messageProcessorMock;
+ }
- KafkaCrossDcConsumer kafkaCrossDcConsumer = new
KafkaCrossDcConsumer(conf, startLatch);
+ @Override
+ protected KafkaMirroringSink
createKafkaMirroringSink(KafkaCrossDcConf conf) {
+ return kafkaMirroringSinkMock;
+ }
+ });
assertNotNull(kafkaCrossDcConsumer);
assertEquals(1, startLatch.getCount());
@@ -162,8 +176,9 @@ public class KafkaCrossDcConsumerTest {
@Test
public void testHandleFailedResubmit() throws Exception {
// Set up the KafkaCrossDcConsumer
- KafkaCrossDcConf testConf = testCrossDCConf();
- KafkaCrossDcConsumer consumer = spy(new KafkaCrossDcConsumer(testConf,
new CountDownLatch(0)));
+ KafkaConsumer<String, MirroredSolrRequest> mockConsumer =
mock(KafkaConsumer.class);
+ KafkaCrossDcConsumer consumer = createCrossDcConsumerSpy(mockConsumer);
+
doNothing().when(consumer).sendBatch(any(UpdateRequest.class),
any(ConsumerRecord.class), any(PartitionManager.WorkUnit.class));
// Set up the SolrMessageProcessor mock
@@ -188,29 +203,16 @@ public class KafkaCrossDcConsumerTest {
@Test
public void testCreateKafkaCrossDcConsumer() {
- KafkaCrossDcConsumer consumer = new KafkaCrossDcConsumer(conf, new
CountDownLatch(1));
+ KafkaConsumer<String, MirroredSolrRequest> mockConsumer =
mock(KafkaConsumer.class);
+ KafkaCrossDcConsumer consumer = createCrossDcConsumerSpy(mockConsumer);
+
assertNotNull(consumer);
}
@Test
public void testHandleValidMirroredSolrRequest() {
KafkaConsumer<String, MirroredSolrRequest> mockConsumer =
mock(KafkaConsumer.class);
- KafkaCrossDcConsumer spyConsumer = spy(new KafkaCrossDcConsumer(conf,
new CountDownLatch(1)) {
- @Override
- public KafkaConsumer<String, MirroredSolrRequest>
createKafkaConsumer(Properties properties) {
- return mockConsumer;
- }
-
- @Override
- public SolrMessageProcessor createSolrMessageProcessor() {
- return messageProcessorMock;
- }
-
- @Override
- protected KafkaMirroringSink
createKafkaMirroringSink(KafkaCrossDcConf conf) {
- return kafkaMirroringSinkMock;
- }
- });
+ KafkaCrossDcConsumer spyConsumer =
createCrossDcConsumerSpy(mockConsumer);
SolrInputDocument doc = new SolrInputDocument();
doc.addField("id", "1");
@@ -277,22 +279,7 @@ public class KafkaCrossDcConsumerTest {
@Test
public void testHandleWakeupException() {
KafkaConsumer<String, MirroredSolrRequest> mockConsumer =
mock(KafkaConsumer.class);
- KafkaCrossDcConsumer spyConsumer = spy(new KafkaCrossDcConsumer(conf,
new CountDownLatch(1)) {
- @Override
- public KafkaConsumer<String, MirroredSolrRequest>
createKafkaConsumer(Properties properties) {
- return mockConsumer;
- }
-
- @Override
- public SolrMessageProcessor createSolrMessageProcessor() {
- return messageProcessorMock;
- }
-
- @Override
- protected KafkaMirroringSink
createKafkaMirroringSink(KafkaCrossDcConf conf) {
- return kafkaMirroringSinkMock;
- }
- });
+ KafkaCrossDcConsumer spyConsumer =
createCrossDcConsumerSpy(mockConsumer);
when(mockConsumer.poll(any())).thenThrow(new WakeupException());
@@ -324,7 +311,15 @@ public class KafkaCrossDcConsumerTest {
@Test
public void testShutdown() {
KafkaConsumer<String, MirroredSolrRequest> mockConsumer =
mock(KafkaConsumer.class);
- KafkaCrossDcConsumer spyConsumer = spy(new KafkaCrossDcConsumer(conf,
new CountDownLatch(1)) {
+ KafkaCrossDcConsumer spyConsumer =
createCrossDcConsumerSpy(mockConsumer);
+
+ spyConsumer.shutdown();
+
+ verify(mockConsumer, times(1)).wakeup();
+ }
+
+ private KafkaCrossDcConsumer
createCrossDcConsumerSpy(KafkaConsumer<String, MirroredSolrRequest>
mockConsumer) {
+ return spy(new KafkaCrossDcConsumer(conf, new CountDownLatch(1)) {
@Override
public KafkaConsumer<String, MirroredSolrRequest>
createKafkaConsumer(Properties properties) {
return mockConsumer;
@@ -340,10 +335,5 @@ public class KafkaCrossDcConsumerTest {
return kafkaMirroringSinkMock;
}
});
-
-
- spyConsumer.shutdown();
-
- verify(mockConsumer, times(1)).wakeup();
}
}