This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch crossdc-wip
in repository https://gitbox.apache.org/repos/asf/solr-sandbox.git


The following commit(s) were added to refs/heads/crossdc-wip by this push:
     new 4a469af  Add back in the EmbeddedKafkaCluster. (#7)
4a469af is described below

commit 4a469afb0d4b9bf8c60b1c01ca15c680b845ac16
Author: Mark Robert Miller <[email protected]>
AuthorDate: Sat Apr 30 14:48:08 2022 -0500

    Add back in the EmbeddedKafkaCluster. (#7)
---
 crossdc-consumer/build.gradle                      | 10 ++++
 .../apache/solr/crossdc/KafkaMirroringSink.java    |  8 ++-
 .../org/apache/solr/crossdc/consumer/Consumer.java | 58 ++++++++++++--------
 .../messageprocessor/SolrMessageProcessor.java     |  4 +-
 ...ionTest.java => SimpleSolrIntegrationTest.java} | 34 +++++++-----
 .../solr/crossdc/SolrAndKafkaIntegrationTest.java  | 64 ++++++++++++++++++++++
 .../SolrKafkaTestsIgnoredThreadsFilter.java        | 41 ++++++++++++++
 7 files changed, 180 insertions(+), 39 deletions(-)

diff --git a/crossdc-consumer/build.gradle b/crossdc-consumer/build.gradle
index 9366996..141b060 100644
--- a/crossdc-consumer/build.gradle
+++ b/crossdc-consumer/build.gradle
@@ -44,6 +44,16 @@ dependencies {
     testImplementation group: 'org.apache.solr', name: 'solr-core', version: 
'8.11.1'
     testImplementation group: 'org.apache.solr', name: 'solr-test-framework', 
version: '8.11.1'
     testImplementation 'org.apache.kafka:kafka_2.13:2.8.1'
+    testImplementation 'org.apache.kafka:kafka-streams:2.8.1'
+
+    testImplementation 'org.apache.kafka:kafka_2.13:2.8.1:test'
+    testImplementation ('org.apache.kafka:kafka-clients:2.8.1:test') {
+        //exclude group: 'org.apache.kafka' // or finer grained, if we like
+    }
+    testImplementation ('org.apache.kafka:kafka-streams:2.8.1:test') {
+        //exclude group: 'org.apache.kafka' // or finer grained, if we like
+    }
+    // testImplementation 'org.apache.kafka:kafka-streams-test-utils:2.8.1'
 
 }
 subprojects {
diff --git 
a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/KafkaMirroringSink.java
 
b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/KafkaMirroringSink.java
index 1b6109a..c5beb49 100644
--- 
a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/KafkaMirroringSink.java
+++ 
b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/KafkaMirroringSink.java
@@ -23,10 +23,12 @@ import org.apache.solr.crossdc.common.MirroredSolrRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.Closeable;
+import java.io.IOException;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
-public class KafkaMirroringSink implements RequestMirroringSink {
+public class KafkaMirroringSink implements RequestMirroringSink, Closeable {
     private static final Logger logger = 
LoggerFactory.getLogger(KafkaMirroringSink.class);
 
     private long lastSuccessfulEnqueueNanos;
@@ -89,4 +91,8 @@ public class KafkaMirroringSink implements 
RequestMirroringSink {
                 conf.getSlowSubmitThresholdInMillis(),
                 elapsedTimeMillis);
     }
+
+    @Override public void close() throws IOException {
+        producer.close();
+    }
 }
diff --git 
a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java 
b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
index 3bb62b3..f82eebb 100644
--- 
a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
+++ 
b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
@@ -35,6 +35,7 @@ import org.eclipse.jetty.server.ServerConnector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.time.Duration;
 import java.util.Collections;
@@ -59,12 +60,15 @@ public class Consumer {
 
     public void start(String[] args) {
 
-        // TODO: use args for config
+        boolean enableDataEncryption = 
Boolean.getBoolean("enableDataEncryption");
+        String topicName = System.getProperty("topicName");
+        String zkConnectString = System.getProperty("zkConnectString");
+
         server = new Server();
         ServerConnector connector = new ServerConnector(server);
         connector.setPort(8090);
         server.setConnectors(new Connector[] {connector});
-        crossDcConsumer = getCrossDcConsumer();
+        crossDcConsumer = getCrossDcConsumer(zkConnectString, topicName, 
enableDataEncryption);
 
         // Start consumer thread
         consumerThreadExecutor = Executors.newSingleThreadExecutor();
@@ -75,9 +79,10 @@ public class Consumer {
         Runtime.getRuntime().addShutdownHook(shutdownHook);
     }
 
-    private CrossDcConsumer getCrossDcConsumer() {
-        // nocommit - hardcoded conf
-        KafkaCrossDcConf conf = new KafkaCrossDcConf("test-topic", true, 
"localhost:2181");
+    private CrossDcConsumer getCrossDcConsumer(String zkConnectString, String 
topicName,
+        boolean enableDataEncryption) {
+
+        KafkaCrossDcConf conf = new KafkaCrossDcConf(topicName, 
enableDataEncryption, zkConnectString);
         return new KafkaCrossDcConsumer(conf);
     }
 
@@ -108,7 +113,7 @@ public class Consumer {
      * resubmitting to the queue in case of temporary failures.
      */
     public static class KafkaCrossDcConsumer extends CrossDcConsumer {
-        private static final Logger logger = 
LoggerFactory.getLogger(KafkaCrossDcConsumer.class);
+        private static final Logger log = 
LoggerFactory.getLogger(KafkaCrossDcConsumer.class);
 
         private final KafkaConsumer<String, MirroredSolrRequest> consumer;
         private final KafkaMirroringSink kafkaMirroringSink;
@@ -121,13 +126,13 @@ public class Consumer {
          */
         public KafkaCrossDcConsumer(KafkaCrossDcConf conf) {
             final Properties kafkaConsumerProp = new Properties();
-            logger.info("Creating Kafka consumer with configuration {}", 
kafkaConsumerProp);
+            log.info("Creating Kafka consumer with configuration {}", 
kafkaConsumerProp);
             consumer = createConsumer(kafkaConsumerProp);
 
             // Create producer for resubmitting failed requests
-            logger.info("Creating Kafka resubmit producer");
+            log.info("Creating Kafka resubmit producer");
             this.kafkaMirroringSink = new KafkaMirroringSink(conf);
-            logger.info("Created Kafka resubmit producer");
+            log.info("Created Kafka resubmit producer");
 
         }
 
@@ -144,18 +149,23 @@ public class Consumer {
          */
         @Override
         public void run() {
-            logger.info("About to start Kafka consumer thread ");
+            log.info("About to start Kafka consumer thread...");
             String topic="topic";
 
-            logger.info("Kafka consumer subscribing to topic topic={}", topic);
+            log.info("Kafka consumer subscribing to topic topic={}", topic);
             consumer.subscribe(Collections.singleton(topic));
 
             while (pollAndProcessRequests()) {
                 //no-op within this loop: everything is done in 
pollAndProcessRequests method defined above.
             }
 
-            logger.info("Closed kafka consumer. Exiting now.");
+            log.info("Closed kafka consumer. Exiting now.");
             consumer.close();
+            try {
+                kafkaMirroringSink.close();
+            } catch (IOException e) {
+                log.error("Failed to close kafka mirroring sink", e);
+            }
 
         }
 
@@ -170,7 +180,7 @@ public class Consumer {
                     List<ConsumerRecord<String, MirroredSolrRequest>> 
partitionRecords = records.records(partition);
                     try {
                         for (ConsumerRecord<String, MirroredSolrRequest> 
record : partitionRecords) {
-                            logger.trace("Fetched record from topic={} 
partition={} key={} value={}",
+                            log.trace("Fetched record from topic={} 
partition={} key={} value={}",
                                     record.topic(), record.partition(), 
record.key(), record.value());
                             IQueueHandler.Result result = 
messageProcessor.handleItem(record.value());
                             switch (result.status()) {
@@ -182,7 +192,7 @@ public class Consumer {
                                     break;
                                 case NOT_HANDLED_SHUTDOWN:
                                 case FAILED_RETRY:
-                                    logger.error("Unexpected response while 
processing request. We never expect {}.",
+                                    log.error("Unexpected response while 
processing request. We never expect {}.",
                                             result.status().toString());
                                     break;
                                 default:
@@ -193,28 +203,28 @@ public class Consumer {
 
                         // handleItem sets the thread interrupt, let's exit if 
there has been an interrupt set
                         if(Thread.currentThread().isInterrupted()) {
-                            logger.info("Kafka Consumer thread interrupted, 
shutting down Kafka consumer.");
+                            log.info("Kafka Consumer thread interrupted, 
shutting down Kafka consumer.");
                             return false;
                         }
                     } catch (MirroringException e) {
                         // We don't really know what to do here, so it's wiser 
to just break out.
-                        logger.error("Mirroring exception occured while 
resubmitting to Kafka. We are going to stop the consumer thread now.", e);
+                        log.error("Mirroring exception occured while 
resubmitting to Kafka. We are going to stop the consumer thread now.", e);
                         return false;
                     } catch (WakeupException e) {
-                        logger.info("Caught wakeup exception, shutting down 
KafkaSolrRequestConsumer.");
+                        log.info("Caught wakeup exception, shutting down 
KafkaSolrRequestConsumer.");
                         return false;
                     } catch (Exception e) {
                         // If there is any exception returned by handleItem, 
then reset the offset.
-                        logger.warn("Exception occurred in Kafka consumer 
thread, but we will continue.", e);
+                        log.warn("Exception occurred in Kafka consumer thread, 
but we will continue.", e);
                         resetOffsetForPartition(partition, partitionRecords);
                         break;
                     }
                 }
             } catch (WakeupException e) {
-                logger.info("Caught wakeup exception, shutting down 
KafkaSolrRequestConsumer");
+                log.info("Caught wakeup exception, shutting down 
KafkaSolrRequestConsumer");
                 return false;
             } catch (Exception e) {
-                logger.error("Exception occurred in Kafka consumer thread, but 
we will continue.", e);
+                log.error("Exception occurred in Kafka consumer thread, but we 
will continue.", e);
             }
             return true;
         }
@@ -225,7 +235,9 @@ public class Consumer {
          * @param partitionRecords PartitionRecords for the specified partition
          */
         private void resetOffsetForPartition(TopicPartition partition, 
List<ConsumerRecord<String, MirroredSolrRequest>> partitionRecords) {
-            logger.debug("Resetting offset to: {}", 
partitionRecords.get(0).offset());
+            if (log.isDebugEnabled()) {
+                log.debug("Resetting offset to: {}", 
partitionRecords.get(0).offset());
+            }
             long resetOffset = partitionRecords.get(0).offset();
             consumer.seek(partition, resetOffset);
         }
@@ -239,8 +251,10 @@ public class Consumer {
             long nextOffset = partitionRecords.get(partitionRecords.size() - 
1).offset() + 1;
             consumer.commitSync(Collections.singletonMap(partition, new 
OffsetAndMetadata(nextOffset)));
 
-            logger.trace("Updated offset for topic={} partition={} to 
offset={}",
+            if (log.isTraceEnabled()) {
+                log.trace("Updated offset for topic={} partition={} to 
offset={}",
                     partition.topic(), partition.partition(), nextOffset);
+            }
         }
 
         /**
diff --git 
a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/SolrMessageProcessor.java
 
b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/SolrMessageProcessor.java
index 650fc83..79ac32b 100644
--- 
a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/SolrMessageProcessor.java
+++ 
b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/SolrMessageProcessor.java
@@ -22,6 +22,7 @@ import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.client.solrj.response.SolrResponseBase;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.SolrInputField;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.crossdc.ResubmitBackoffPolicy;
@@ -209,7 +210,8 @@ public class SolrMessageProcessor extends MessageProcessor 
implements IQueueHand
      * Strips fields that are problematic for replication.
      */
     private void sanitizeDocument(SolrInputDocument doc) {
-        logger.info("Removing {}", VERSION_FIELD + " : " + 
doc.getField(VERSION_FIELD).getValue());
+        SolrInputField field = doc.getField(VERSION_FIELD);
+        logger.info("Removing {} value={}", VERSION_FIELD, field == null ? 
"null" : field.getValue());
         doc.remove(VERSION_FIELD);
     }
 
diff --git 
a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/IntegrationTest.java 
b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SimpleSolrIntegrationTest.java
similarity index 77%
rename from 
crossdc-consumer/src/test/java/org/apache/solr/crossdc/IntegrationTest.java
rename to 
crossdc-consumer/src/test/java/org/apache/solr/crossdc/SimpleSolrIntegrationTest.java
index 695ebab..3177469 100644
--- 
a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/IntegrationTest.java
+++ 
b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SimpleSolrIntegrationTest.java
@@ -1,6 +1,7 @@
 package org.apache.solr.crossdc;
 
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.cloud.MiniSolrCloudCluster;
 import org.apache.solr.cloud.SolrCloudTestCase;
@@ -8,25 +9,23 @@ import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.crossdc.common.MirroredSolrRequest;
 import org.apache.solr.crossdc.messageprocessor.SolrMessageProcessor;
 import org.junit.AfterClass;
-import org.junit.Before;
 import org.junit.BeforeClass;
-import org.junit.Test;
-import org.mockito.Mockito;
 
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Map;
 
 import static org.mockito.Mockito.spy;
 
-public class IntegrationTest extends SolrCloudTestCase {
+public class SimpleSolrIntegrationTest extends SolrCloudTestCase {
   static final String VERSION_FIELD = "_version_";
 
+  private static final int NUM_BROKERS = 1;
+
   protected static volatile MiniSolrCloudCluster cluster1;
-  protected static volatile MiniSolrCloudCluster cluster2;
+
   private static SolrMessageProcessor processor;
 
   private static ResubmitBackoffPolicy backoffPolicy = spy(new 
TestMessageProcessor.NoOpResubmitBackoffPolicy());
+  private static CloudSolrClient cloudClient1;
 
   @BeforeClass
   public static void setupIntegrationTest() throws Exception {
@@ -36,12 +35,22 @@ public class IntegrationTest extends SolrCloudTestCase {
             .addConfig("conf", 
getFile("src/resources/configs/cloud-minimal/conf").toPath())
             .configure();
 
-    processor = new SolrMessageProcessor(cluster1.getSolrClient(), 
backoffPolicy);
+    String collection = "collection1";
+    cloudClient1 = cluster1.getSolrClient();
+
+    processor = new SolrMessageProcessor(cloudClient1, backoffPolicy);
+
+    CollectionAdminRequest.Create create =
+        CollectionAdminRequest.createCollection(collection, "conf", 1, 1);
+    cloudClient1.request(create);
+    cluster1.waitForActiveCollection(collection, 1, 1);
+
+    cloudClient1.setDefaultCollection(collection);
   }
 
   @AfterClass
   public static void tearDownIntegrationTest() throws Exception {
-    if (cluster != null) {
+    if (cluster1 != null) {
       cluster1.shutdown();
     }
   }
@@ -67,7 +76,7 @@ public class IntegrationTest extends SolrCloudTestCase {
     request.deleteById("2", 10L);
 
     request.setParam("shouldMirror", "true");
-    // The response is irrelevant, but it will fail because mocked server 
returns null when processing
+
     processor.handleItem(new MirroredSolrRequest(request));
 
     // After processing, check that all version fields are stripped
@@ -83,9 +92,4 @@ public class IntegrationTest extends SolrCloudTestCase {
       }
     }
   }
-
-  @Test
-  public void TestMethod() {
-
-  }
 }
diff --git 
a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
 
b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
new file mode 100644
index 0000000..45f5392
--- /dev/null
+++ 
b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
@@ -0,0 +1,64 @@
+package org.apache.solr.crossdc;
+
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.lucene.util.QuickPatchThreadsFilter;
+import org.apache.solr.SolrIgnoredThreadsFilter;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.cloud.MiniSolrCloudCluster;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.crossdc.common.MirroredSolrRequest;
+import org.apache.solr.crossdc.messageprocessor.SolrMessageProcessor;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import java.util.Map;
+
+import static org.mockito.Mockito.spy;
+
+@ThreadLeakFilters(
+    defaultFilters = true,
+    filters = { SolrIgnoredThreadsFilter.class, QuickPatchThreadsFilter.class, 
SolrKafkaTestsIgnoredThreadsFilter.class})
+public class SolrAndKafkaIntegrationTest extends SolrCloudTestCase {
+  static final String VERSION_FIELD = "_version_";
+
+  private static final int NUM_BROKERS = 1;
+  public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(NUM_BROKERS);
+
+  protected static volatile MiniSolrCloudCluster cluster1;
+  protected static volatile MiniSolrCloudCluster cluster2;
+  private static SolrMessageProcessor processor;
+
+  private static ResubmitBackoffPolicy backoffPolicy = spy(new 
TestMessageProcessor.NoOpResubmitBackoffPolicy());
+
+  @BeforeClass
+  public static void setupIntegrationTest() throws Exception {
+
+    CLUSTER.start();
+
+    cluster1 =
+        new Builder(2, createTempDir())
+            .addConfig("conf", 
getFile("src/resources/configs/cloud-minimal/conf").toPath())
+            .configure();
+
+    processor = new SolrMessageProcessor(cluster1.getSolrClient(), 
backoffPolicy);
+  }
+
+  @AfterClass
+  public static void tearDownIntegrationTest() throws Exception {
+
+    CLUSTER.stop();
+
+    if (cluster1 != null) {
+      cluster1.shutdown();
+    }
+    if (cluster2 != null) {
+      cluster2.shutdown();
+    }
+  }
+
+  public void test() {
+
+  }
+}
diff --git 
a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SolrKafkaTestsIgnoredThreadsFilter.java
 
b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SolrKafkaTestsIgnoredThreadsFilter.java
new file mode 100644
index 0000000..ca6bbb1
--- /dev/null
+++ 
b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SolrKafkaTestsIgnoredThreadsFilter.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.crossdc;
+
+import org.apache.lucene.search.TimeLimitingCollector.TimerThread;
+
+import com.carrotsearch.randomizedtesting.ThreadFilter;
+
+
+/**
+ * This ignores those threads in Solr for which there is no way to
+ * clean up after a suite.
+ */
+public class SolrKafkaTestsIgnoredThreadsFilter implements ThreadFilter {
+  @Override
+  public boolean reject(Thread t) {
+
+    String threadName = t.getName();
+
+    if (threadName.startsWith("metrics-meter-tick-thread")) {
+      return true;
+    }
+    
+
+    return false;
+  }
+}

Reply via email to