This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch crossdc-wip
in repository https://gitbox.apache.org/repos/asf/solr-sandbox.git
The following commit(s) were added to refs/heads/crossdc-wip by this push:
new 4a469af Add back in the EmbeddedKafkaCluster. (#7)
4a469af is described below
commit 4a469afb0d4b9bf8c60b1c01ca15c680b845ac16
Author: Mark Robert Miller <[email protected]>
AuthorDate: Sat Apr 30 14:48:08 2022 -0500
Add back in the EmbeddedKafkaCluster. (#7)
---
crossdc-consumer/build.gradle | 10 ++++
.../apache/solr/crossdc/KafkaMirroringSink.java | 8 ++-
.../org/apache/solr/crossdc/consumer/Consumer.java | 58 ++++++++++++--------
.../messageprocessor/SolrMessageProcessor.java | 4 +-
...ionTest.java => SimpleSolrIntegrationTest.java} | 34 +++++++-----
.../solr/crossdc/SolrAndKafkaIntegrationTest.java | 64 ++++++++++++++++++++++
.../SolrKafkaTestsIgnoredThreadsFilter.java | 41 ++++++++++++++
7 files changed, 180 insertions(+), 39 deletions(-)
diff --git a/crossdc-consumer/build.gradle b/crossdc-consumer/build.gradle
index 9366996..141b060 100644
--- a/crossdc-consumer/build.gradle
+++ b/crossdc-consumer/build.gradle
@@ -44,6 +44,16 @@ dependencies {
testImplementation group: 'org.apache.solr', name: 'solr-core', version:
'8.11.1'
testImplementation group: 'org.apache.solr', name: 'solr-test-framework',
version: '8.11.1'
testImplementation 'org.apache.kafka:kafka_2.13:2.8.1'
+ testImplementation 'org.apache.kafka:kafka-streams:2.8.1'
+
+ testImplementation 'org.apache.kafka:kafka_2.13:2.8.1:test'
+ testImplementation ('org.apache.kafka:kafka-clients:2.8.1:test') {
+ //exclude group: 'org.apache.kafka' // or finer grained, if we like
+ }
+ testImplementation ('org.apache.kafka:kafka-streams:2.8.1:test') {
+ //exclude group: 'org.apache.kafka' // or finer grained, if we like
+ }
+ // testImplementation 'org.apache.kafka:kafka-streams-test-utils:2.8.1'
}
subprojects {
diff --git
a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/KafkaMirroringSink.java
b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/KafkaMirroringSink.java
index 1b6109a..c5beb49 100644
---
a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/KafkaMirroringSink.java
+++
b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/KafkaMirroringSink.java
@@ -23,10 +23,12 @@ import org.apache.solr.crossdc.common.MirroredSolrRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.Closeable;
+import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
-public class KafkaMirroringSink implements RequestMirroringSink {
+public class KafkaMirroringSink implements RequestMirroringSink, Closeable {
private static final Logger logger =
LoggerFactory.getLogger(KafkaMirroringSink.class);
private long lastSuccessfulEnqueueNanos;
@@ -89,4 +91,8 @@ public class KafkaMirroringSink implements
RequestMirroringSink {
conf.getSlowSubmitThresholdInMillis(),
elapsedTimeMillis);
}
+
+ @Override public void close() throws IOException {
+ producer.close();
+ }
}
diff --git
a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
index 3bb62b3..f82eebb 100644
---
a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
+++
b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
@@ -35,6 +35,7 @@ import org.eclipse.jetty.server.ServerConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.time.Duration;
import java.util.Collections;
@@ -59,12 +60,15 @@ public class Consumer {
public void start(String[] args) {
- // TODO: use args for config
+ boolean enableDataEncryption =
Boolean.getBoolean("enableDataEncryption");
+ String topicName = System.getProperty("topicName");
+ String zkConnectString = System.getProperty("zkConnectString");
+
server = new Server();
ServerConnector connector = new ServerConnector(server);
connector.setPort(8090);
server.setConnectors(new Connector[] {connector});
- crossDcConsumer = getCrossDcConsumer();
+ crossDcConsumer = getCrossDcConsumer(zkConnectString, topicName,
enableDataEncryption);
// Start consumer thread
consumerThreadExecutor = Executors.newSingleThreadExecutor();
@@ -75,9 +79,10 @@ public class Consumer {
Runtime.getRuntime().addShutdownHook(shutdownHook);
}
- private CrossDcConsumer getCrossDcConsumer() {
- // nocommit - hardcoded conf
- KafkaCrossDcConf conf = new KafkaCrossDcConf("test-topic", true,
"localhost:2181");
+ private CrossDcConsumer getCrossDcConsumer(String zkConnectString, String
topicName,
+ boolean enableDataEncryption) {
+
+ KafkaCrossDcConf conf = new KafkaCrossDcConf(topicName,
enableDataEncryption, zkConnectString);
return new KafkaCrossDcConsumer(conf);
}
@@ -108,7 +113,7 @@ public class Consumer {
* resubmitting to the queue in case of temporary failures.
*/
public static class KafkaCrossDcConsumer extends CrossDcConsumer {
- private static final Logger logger =
LoggerFactory.getLogger(KafkaCrossDcConsumer.class);
+ private static final Logger log =
LoggerFactory.getLogger(KafkaCrossDcConsumer.class);
private final KafkaConsumer<String, MirroredSolrRequest> consumer;
private final KafkaMirroringSink kafkaMirroringSink;
@@ -121,13 +126,13 @@ public class Consumer {
*/
public KafkaCrossDcConsumer(KafkaCrossDcConf conf) {
final Properties kafkaConsumerProp = new Properties();
- logger.info("Creating Kafka consumer with configuration {}",
kafkaConsumerProp);
+ log.info("Creating Kafka consumer with configuration {}",
kafkaConsumerProp);
consumer = createConsumer(kafkaConsumerProp);
// Create producer for resubmitting failed requests
- logger.info("Creating Kafka resubmit producer");
+ log.info("Creating Kafka resubmit producer");
this.kafkaMirroringSink = new KafkaMirroringSink(conf);
- logger.info("Created Kafka resubmit producer");
+ log.info("Created Kafka resubmit producer");
}
@@ -144,18 +149,23 @@ public class Consumer {
*/
@Override
public void run() {
- logger.info("About to start Kafka consumer thread ");
+ log.info("About to start Kafka consumer thread...");
String topic="topic";
- logger.info("Kafka consumer subscribing to topic topic={}", topic);
+ log.info("Kafka consumer subscribing to topic topic={}", topic);
consumer.subscribe(Collections.singleton(topic));
while (pollAndProcessRequests()) {
//no-op within this loop: everything is done in
pollAndProcessRequests method defined above.
}
- logger.info("Closed kafka consumer. Exiting now.");
+ log.info("Closed kafka consumer. Exiting now.");
consumer.close();
+ try {
+ kafkaMirroringSink.close();
+ } catch (IOException e) {
+ log.error("Failed to close kafka mirroring sink", e);
+ }
}
@@ -170,7 +180,7 @@ public class Consumer {
List<ConsumerRecord<String, MirroredSolrRequest>>
partitionRecords = records.records(partition);
try {
for (ConsumerRecord<String, MirroredSolrRequest>
record : partitionRecords) {
- logger.trace("Fetched record from topic={}
partition={} key={} value={}",
+ log.trace("Fetched record from topic={}
partition={} key={} value={}",
record.topic(), record.partition(),
record.key(), record.value());
IQueueHandler.Result result =
messageProcessor.handleItem(record.value());
switch (result.status()) {
@@ -182,7 +192,7 @@ public class Consumer {
break;
case NOT_HANDLED_SHUTDOWN:
case FAILED_RETRY:
- logger.error("Unexpected response while
processing request. We never expect {}.",
+ log.error("Unexpected response while
processing request. We never expect {}.",
result.status().toString());
break;
default:
@@ -193,28 +203,28 @@ public class Consumer {
// handleItem sets the thread interrupt, let's exit if
there has been an interrupt set
if(Thread.currentThread().isInterrupted()) {
- logger.info("Kafka Consumer thread interrupted,
shutting down Kafka consumer.");
+ log.info("Kafka Consumer thread interrupted,
shutting down Kafka consumer.");
return false;
}
} catch (MirroringException e) {
// We don't really know what to do here, so it's wiser
to just break out.
- logger.error("Mirroring exception occurred while
resubmitting to Kafka. We are going to stop the consumer thread now.", e);
+ log.error("Mirroring exception occurred while
resubmitting to Kafka. We are going to stop the consumer thread now.", e);
return false;
} catch (WakeupException e) {
- logger.info("Caught wakeup exception, shutting down
KafkaSolrRequestConsumer.");
+ log.info("Caught wakeup exception, shutting down
KafkaSolrRequestConsumer.");
return false;
} catch (Exception e) {
// If there is any exception returned by handleItem,
then reset the offset.
- logger.warn("Exception occurred in Kafka consumer
thread, but we will continue.", e);
+ log.warn("Exception occurred in Kafka consumer thread,
but we will continue.", e);
resetOffsetForPartition(partition, partitionRecords);
break;
}
}
} catch (WakeupException e) {
- logger.info("Caught wakeup exception, shutting down
KafkaSolrRequestConsumer");
+ log.info("Caught wakeup exception, shutting down
KafkaSolrRequestConsumer");
return false;
} catch (Exception e) {
- logger.error("Exception occurred in Kafka consumer thread, but
we will continue.", e);
+ log.error("Exception occurred in Kafka consumer thread, but we
will continue.", e);
}
return true;
}
@@ -225,7 +235,9 @@ public class Consumer {
* @param partitionRecords PartitionRecords for the specified partition
*/
private void resetOffsetForPartition(TopicPartition partition,
List<ConsumerRecord<String, MirroredSolrRequest>> partitionRecords) {
- logger.debug("Resetting offset to: {}",
partitionRecords.get(0).offset());
+ if (log.isDebugEnabled()) {
+ log.debug("Resetting offset to: {}",
partitionRecords.get(0).offset());
+ }
long resetOffset = partitionRecords.get(0).offset();
consumer.seek(partition, resetOffset);
}
@@ -239,8 +251,10 @@ public class Consumer {
long nextOffset = partitionRecords.get(partitionRecords.size() -
1).offset() + 1;
consumer.commitSync(Collections.singletonMap(partition, new
OffsetAndMetadata(nextOffset)));
- logger.trace("Updated offset for topic={} partition={} to
offset={}",
+ if (log.isTraceEnabled()) {
+ log.trace("Updated offset for topic={} partition={} to
offset={}",
partition.topic(), partition.partition(), nextOffset);
+ }
}
/**
diff --git
a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/SolrMessageProcessor.java
b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/SolrMessageProcessor.java
index 650fc83..79ac32b 100644
---
a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/SolrMessageProcessor.java
+++
b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/SolrMessageProcessor.java
@@ -22,6 +22,7 @@ import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.SolrResponseBase;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.SolrInputField;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.crossdc.ResubmitBackoffPolicy;
@@ -209,7 +210,8 @@ public class SolrMessageProcessor extends MessageProcessor
implements IQueueHand
* Strips fields that are problematic for replication.
*/
private void sanitizeDocument(SolrInputDocument doc) {
- logger.info("Removing {}", VERSION_FIELD + " : " +
doc.getField(VERSION_FIELD).getValue());
+ SolrInputField field = doc.getField(VERSION_FIELD);
+ logger.info("Removing {} value={}", VERSION_FIELD, field == null ?
"null" : field.getValue());
doc.remove(VERSION_FIELD);
}
diff --git
a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/IntegrationTest.java
b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SimpleSolrIntegrationTest.java
similarity index 77%
rename from
crossdc-consumer/src/test/java/org/apache/solr/crossdc/IntegrationTest.java
rename to
crossdc-consumer/src/test/java/org/apache/solr/crossdc/SimpleSolrIntegrationTest.java
index 695ebab..3177469 100644
---
a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/IntegrationTest.java
+++
b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SimpleSolrIntegrationTest.java
@@ -1,6 +1,7 @@
package org.apache.solr.crossdc;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.cloud.MiniSolrCloudCluster;
import org.apache.solr.cloud.SolrCloudTestCase;
@@ -8,25 +9,23 @@ import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.crossdc.common.MirroredSolrRequest;
import org.apache.solr.crossdc.messageprocessor.SolrMessageProcessor;
import org.junit.AfterClass;
-import org.junit.Before;
import org.junit.BeforeClass;
-import org.junit.Test;
-import org.mockito.Mockito;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Map;
import static org.mockito.Mockito.spy;
-public class IntegrationTest extends SolrCloudTestCase {
+public class SimpleSolrIntegrationTest extends SolrCloudTestCase {
static final String VERSION_FIELD = "_version_";
+ private static final int NUM_BROKERS = 1;
+
protected static volatile MiniSolrCloudCluster cluster1;
- protected static volatile MiniSolrCloudCluster cluster2;
+
private static SolrMessageProcessor processor;
private static ResubmitBackoffPolicy backoffPolicy = spy(new
TestMessageProcessor.NoOpResubmitBackoffPolicy());
+ private static CloudSolrClient cloudClient1;
@BeforeClass
public static void setupIntegrationTest() throws Exception {
@@ -36,12 +35,22 @@ public class IntegrationTest extends SolrCloudTestCase {
.addConfig("conf",
getFile("src/resources/configs/cloud-minimal/conf").toPath())
.configure();
- processor = new SolrMessageProcessor(cluster1.getSolrClient(),
backoffPolicy);
+ String collection = "collection1";
+ cloudClient1 = cluster1.getSolrClient();
+
+ processor = new SolrMessageProcessor(cloudClient1, backoffPolicy);
+
+ CollectionAdminRequest.Create create =
+ CollectionAdminRequest.createCollection(collection, "conf", 1, 1);
+ cloudClient1.request(create);
+ cluster1.waitForActiveCollection(collection, 1, 1);
+
+ cloudClient1.setDefaultCollection(collection);
}
@AfterClass
public static void tearDownIntegrationTest() throws Exception {
- if (cluster != null) {
+ if (cluster1 != null) {
cluster1.shutdown();
}
}
@@ -67,7 +76,7 @@ public class IntegrationTest extends SolrCloudTestCase {
request.deleteById("2", 10L);
request.setParam("shouldMirror", "true");
- // The response is irrelevant, but it will fail because mocked server
returns null when processing
+
processor.handleItem(new MirroredSolrRequest(request));
// After processing, check that all version fields are stripped
@@ -83,9 +92,4 @@ public class IntegrationTest extends SolrCloudTestCase {
}
}
}
-
- @Test
- public void TestMethod() {
-
- }
}
diff --git
a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
new file mode 100644
index 0000000..45f5392
--- /dev/null
+++
b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
@@ -0,0 +1,64 @@
+package org.apache.solr.crossdc;
+
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.lucene.util.QuickPatchThreadsFilter;
+import org.apache.solr.SolrIgnoredThreadsFilter;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.cloud.MiniSolrCloudCluster;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.crossdc.common.MirroredSolrRequest;
+import org.apache.solr.crossdc.messageprocessor.SolrMessageProcessor;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import java.util.Map;
+
+import static org.mockito.Mockito.spy;
+
+@ThreadLeakFilters(
+ defaultFilters = true,
+ filters = { SolrIgnoredThreadsFilter.class, QuickPatchThreadsFilter.class,
SolrKafkaTestsIgnoredThreadsFilter.class})
+public class SolrAndKafkaIntegrationTest extends SolrCloudTestCase {
+ static final String VERSION_FIELD = "_version_";
+
+ private static final int NUM_BROKERS = 1;
+ public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(NUM_BROKERS);
+
+ protected static volatile MiniSolrCloudCluster cluster1;
+ protected static volatile MiniSolrCloudCluster cluster2;
+ private static SolrMessageProcessor processor;
+
+ private static ResubmitBackoffPolicy backoffPolicy = spy(new
TestMessageProcessor.NoOpResubmitBackoffPolicy());
+
+ @BeforeClass
+ public static void setupIntegrationTest() throws Exception {
+
+ CLUSTER.start();
+
+ cluster1 =
+ new Builder(2, createTempDir())
+ .addConfig("conf",
getFile("src/resources/configs/cloud-minimal/conf").toPath())
+ .configure();
+
+ processor = new SolrMessageProcessor(cluster1.getSolrClient(),
backoffPolicy);
+ }
+
+ @AfterClass
+ public static void tearDownIntegrationTest() throws Exception {
+
+ CLUSTER.stop();
+
+ if (cluster1 != null) {
+ cluster1.shutdown();
+ }
+ if (cluster2 != null) {
+ cluster2.shutdown();
+ }
+ }
+
+ public void test() {
+
+ }
+}
diff --git
a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SolrKafkaTestsIgnoredThreadsFilter.java
b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SolrKafkaTestsIgnoredThreadsFilter.java
new file mode 100644
index 0000000..ca6bbb1
--- /dev/null
+++
b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SolrKafkaTestsIgnoredThreadsFilter.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.crossdc;
+
+import org.apache.lucene.search.TimeLimitingCollector.TimerThread;
+
+import com.carrotsearch.randomizedtesting.ThreadFilter;
+
+
+/**
+ * This ignores those threads in Solr for which there is no way to
+ * clean up after a suite.
+ */
+public class SolrKafkaTestsIgnoredThreadsFilter implements ThreadFilter {
+ @Override
+ public boolean reject(Thread t) {
+
+ String threadName = t.getName();
+
+ if (threadName.startsWith("metrics-meter-tick-thread")) {
+ return true;
+ }
+
+
+ return false;
+ }
+}