This is an automated email from the ASF dual-hosted git repository.
anshum pushed a commit to branch crossdc-wip
in repository https://gitbox.apache.org/repos/asf/solr-sandbox.git
The following commit(s) were added to refs/heads/crossdc-wip by this push:
new 9007aa6 Experiment with Producer in crossdc package. (#19)
9007aa6 is described below
commit 9007aa6b8d51384ab5f146ebdce013633bae4660
Author: Mark Robert Miller <[email protected]>
AuthorDate: Wed Jun 1 14:28:02 2022 -0500
Experiment with Producer in crossdc package. (#19)
---
crossdc-commons/build.gradle | 1 +
.../org/apache/solr/update/processor/UpdateHelper.java | 12 +++++++-----
crossdc-consumer/build.gradle | 4 +++-
.../update/processor/KafkaRequestMirroringHandler.java | 3 +--
.../update/processor/MirroringException.java | 2 +-
.../update/processor/MirroringUpdateProcessor.java | 15 +++++++++------
.../processor/MirroringUpdateRequestProcessorFactory.java | 5 ++++-
.../update/processor/RequestMirroringHandler.java | 2 +-
.../resources/configs/cloud-minimal/conf/solrconfig.xml | 2 +-
crossdc-producer/src/test/resources/log4j2.xml | 4 ++--
10 files changed, 30 insertions(+), 20 deletions(-)
diff --git a/crossdc-commons/build.gradle b/crossdc-commons/build.gradle
index 7ac0fd1..9e97f6c 100644
--- a/crossdc-commons/build.gradle
+++ b/crossdc-commons/build.gradle
@@ -29,6 +29,7 @@ repositories {
dependencies {
implementation 'org.apache.solr:solr-solrj:8.11.1'
+ implementation 'org.apache.solr:solr-core:8.11.1'
implementation 'org.apache.kafka:kafka-clients:2.8.1'
implementation 'com.google.guava:guava:14.0'
}
diff --git
a/crossdc-producer/src/main/java/org/apache/solr/update/processor/RequestMirroringHandler.java
b/crossdc-commons/src/main/java/org/apache/solr/update/processor/UpdateHelper.java
similarity index 70%
copy from
crossdc-producer/src/main/java/org/apache/solr/update/processor/RequestMirroringHandler.java
copy to
crossdc-commons/src/main/java/org/apache/solr/update/processor/UpdateHelper.java
index 16ca862..4466dd3 100644
---
a/crossdc-producer/src/main/java/org/apache/solr/update/processor/RequestMirroringHandler.java
+++
b/crossdc-commons/src/main/java/org/apache/solr/update/processor/UpdateHelper.java
@@ -16,10 +16,12 @@
*/
package org.apache.solr.update.processor;
-import org.apache.solr.client.solrj.request.UpdateRequest;
+public class UpdateHelper {
+ public static boolean isLeader(DistributedUpdateProcessor distProc) {
+ return distProc.isLeader();
+ }
-/** Plugin classes must implement this interface to be usable as the handlers
for request mirroring */
-public interface RequestMirroringHandler {
- /** When called, should handle submitting the request to the replica
clusters */
- void mirror(UpdateRequest request) throws Exception;
+ public static UpdateRequestProcessor next(UpdateRequestProcessor proc) {
+ return proc.next;
+ }
}
diff --git a/crossdc-consumer/build.gradle b/crossdc-consumer/build.gradle
index 61de292..79dc7c8 100644
--- a/crossdc-consumer/build.gradle
+++ b/crossdc-consumer/build.gradle
@@ -33,7 +33,9 @@ application {
dependencies {
implementation group: 'org.apache.solr', name: 'solr-solrj', version:
'8.11.1'
- implementation project(':crossdc-commons')
+ implementation (project(':crossdc-commons')) {
+ exclude (group: 'org.apache.solr', module: 'solr-core')
+ }
implementation 'org.slf4j:slf4j-api:1.7.36'
implementation 'org.eclipse.jetty:jetty-http:9.4.41.v20210516'
diff --git
a/crossdc-producer/src/main/java/org/apache/solr/update/processor/KafkaRequestMirroringHandler.java
b/crossdc-producer/src/main/java/org/apache/solr/crossdc/update/processor/KafkaRequestMirroringHandler.java
similarity index 95%
rename from
crossdc-producer/src/main/java/org/apache/solr/update/processor/KafkaRequestMirroringHandler.java
rename to
crossdc-producer/src/main/java/org/apache/solr/crossdc/update/processor/KafkaRequestMirroringHandler.java
index 34bc29b..4913fa3 100644
---
a/crossdc-producer/src/main/java/org/apache/solr/update/processor/KafkaRequestMirroringHandler.java
+++
b/crossdc-producer/src/main/java/org/apache/solr/crossdc/update/processor/KafkaRequestMirroringHandler.java
@@ -14,12 +14,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.solr.update.processor;
+package org.apache.solr.crossdc.update.processor;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.crossdc.common.KafkaMirroringSink;
import org.apache.solr.crossdc.common.MirroringException;
-import org.apache.solr.crossdc.common.KafkaCrossDcConf;
import org.apache.solr.crossdc.common.MirroredSolrRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git
a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringException.java
b/crossdc-producer/src/main/java/org/apache/solr/crossdc/update/processor/MirroringException.java
similarity index 96%
rename from
crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringException.java
rename to
crossdc-producer/src/main/java/org/apache/solr/crossdc/update/processor/MirroringException.java
index 6f00df9..d5e7c68 100644
---
a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringException.java
+++
b/crossdc-producer/src/main/java/org/apache/solr/crossdc/update/processor/MirroringException.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.solr.update.processor;
+package org.apache.solr.crossdc.update.processor;
/**
* Wrapper class for Mirroring exceptions.
diff --git
a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java
b/crossdc-producer/src/main/java/org/apache/solr/crossdc/update/processor/MirroringUpdateProcessor.java
similarity index 91%
rename from
crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java
rename to
crossdc-producer/src/main/java/org/apache/solr/crossdc/update/processor/MirroringUpdateProcessor.java
index e2e09f2..06a3344 100644
---
a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java
+++
b/crossdc-producer/src/main/java/org/apache/solr/crossdc/update/processor/MirroringUpdateProcessor.java
@@ -1,4 +1,4 @@
-package org.apache.solr.update.processor;
+package org.apache.solr.crossdc.update.processor;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.SolrException;
@@ -10,6 +10,9 @@ import org.apache.solr.update.AddUpdateCommand;
import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.update.DeleteUpdateCommand;
import org.apache.solr.update.RollbackUpdateCommand;
+import org.apache.solr.update.processor.DistributedUpdateProcessor;
+import org.apache.solr.update.processor.UpdateHelper;
+import org.apache.solr.update.processor.UpdateRequestProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,7 +60,7 @@ public class MirroringUpdateProcessor extends
UpdateRequestProcessor {
this.requestMirroringHandler = requestMirroringHandler;
// Find the downstream distributed update processor
- for (UpdateRequestProcessor proc = next; proc != null; proc = proc.next) {
+ for (UpdateRequestProcessor proc = next; proc != null; proc =
UpdateHelper.next(proc)) {
if (proc instanceof DistributedUpdateProcessor) {
distProc = (DistributedUpdateProcessor) proc;
break;
@@ -83,11 +86,11 @@ public class MirroringUpdateProcessor extends
UpdateRequestProcessor {
@Override public void processAdd(final AddUpdateCommand cmd) throws
IOException {
if (log.isDebugEnabled())
- log.debug("processAdd isLeader={} cmd={}", distProc.isLeader(), cmd);
+ log.debug("processAdd isLeader={} cmd={}",
UpdateHelper.isLeader(distProc), cmd);
super.processAdd(cmd); // let this throw to prevent mirroring invalid reqs
// submit only from the leader shards so we mirror each doc once
- if (doMirroring && distProc.isLeader()) {
+ if (doMirroring && UpdateHelper.isLeader(distProc)) {
SolrInputDocument doc = cmd.getSolrInputDocument().deepCopy();
doc.removeField(CommonParams.VERSION_FIELD); // strip internal doc
version
createAndOrGetMirrorRequest().add(doc, cmd.commitWithin, cmd.overwrite);
@@ -96,13 +99,13 @@ public class MirroringUpdateProcessor extends
UpdateRequestProcessor {
@Override public void processDelete(final DeleteUpdateCommand cmd) throws
IOException {
if (log.isDebugEnabled())
- log.debug("processDelete doMirroring={} isLeader={} cmd={}",
doMirroring, distProc.isLeader(), cmd);
+ log.debug("processDelete doMirroring={} isLeader={} cmd={}",
doMirroring, UpdateHelper.isLeader(distProc), cmd);
super.processDelete(cmd); // let this throw to prevent mirroring invalid
requests
if (doMirroring) {
if (cmd.isDeleteById()) {
// deleteById requests runs once per leader, so we just submit the
request from the leader shard
- if (distProc.isLeader()) {
+ if (UpdateHelper.isLeader(distProc)) {
createAndOrGetMirrorRequest().deleteById(cmd.getId()); // strip
versions from deletes
}
} else {
diff --git
a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
b/crossdc-producer/src/main/java/org/apache/solr/crossdc/update/processor/MirroringUpdateRequestProcessorFactory.java
similarity index 96%
rename from
crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
rename to
crossdc-producer/src/main/java/org/apache/solr/crossdc/update/processor/MirroringUpdateRequestProcessorFactory.java
index 2d91531..43b836d 100644
---
a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
+++
b/crossdc-producer/src/main/java/org/apache/solr/crossdc/update/processor/MirroringUpdateRequestProcessorFactory.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.solr.update.processor;
+package org.apache.solr.crossdc.update.processor;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.CommonParams;
@@ -27,6 +27,9 @@ import org.apache.solr.crossdc.common.KafkaCrossDcConf;
import org.apache.solr.crossdc.common.KafkaMirroringSink;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
+import
org.apache.solr.update.processor.DocBasedVersionConstraintsProcessorFactory;
+import org.apache.solr.update.processor.UpdateRequestProcessor;
+import org.apache.solr.update.processor.UpdateRequestProcessorFactory;
import org.apache.solr.util.plugin.SolrCoreAware;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git
a/crossdc-producer/src/main/java/org/apache/solr/update/processor/RequestMirroringHandler.java
b/crossdc-producer/src/main/java/org/apache/solr/crossdc/update/processor/RequestMirroringHandler.java
similarity index 95%
rename from
crossdc-producer/src/main/java/org/apache/solr/update/processor/RequestMirroringHandler.java
rename to
crossdc-producer/src/main/java/org/apache/solr/crossdc/update/processor/RequestMirroringHandler.java
index 16ca862..ff55b6d 100644
---
a/crossdc-producer/src/main/java/org/apache/solr/update/processor/RequestMirroringHandler.java
+++
b/crossdc-producer/src/main/java/org/apache/solr/crossdc/update/processor/RequestMirroringHandler.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.solr.update.processor;
+package org.apache.solr.crossdc.update.processor;
import org.apache.solr.client.solrj.request.UpdateRequest;
diff --git
a/crossdc-producer/src/test/resources/configs/cloud-minimal/conf/solrconfig.xml
b/crossdc-producer/src/test/resources/configs/cloud-minimal/conf/solrconfig.xml
index 6e057ab..9090598 100644
---
a/crossdc-producer/src/test/resources/configs/cloud-minimal/conf/solrconfig.xml
+++
b/crossdc-producer/src/test/resources/configs/cloud-minimal/conf/solrconfig.xml
@@ -109,7 +109,7 @@
</query>
<updateRequestProcessorChain name="mirrorUpdateChain" default="true">
- <processor
class="org.apache.solr.update.processor.MirroringUpdateRequestProcessorFactory">
+ <processor
class="org.apache.solr.crossdc.update.processor.MirroringUpdateRequestProcessorFactory">
</processor>
<processor class="solr.LogUpdateProcessorFactory" />
diff --git a/crossdc-producer/src/test/resources/log4j2.xml
b/crossdc-producer/src/test/resources/log4j2.xml
index d3705d4..5f7ec24 100644
--- a/crossdc-producer/src/test/resources/log4j2.xml
+++ b/crossdc-producer/src/test/resources/log4j2.xml
@@ -60,8 +60,8 @@
<Logger name="org.eclipse.jetty" level="INFO"/>
<Logger name="org.apache.solr.crossdc.consumer.KafkaCrossDcConsumer"
level="TRACE"/>
- <Logger name="org.apache.solr.update.processor.MirroringUpdateProcessor"
level="TRACE"/>
- <Logger
name="org.apache.solr.update.processor.KafkaRequestMirroringHandler"
level="TRACE"/>
+ <Logger
name="org.apache.solr.crossdc.update.processor.MirroringUpdateProcessor"
level="TRACE"/>
+ <Logger
name="org.apache.solr.crossdc.update.processor.KafkaRequestMirroringHandler"
level="TRACE"/>
<Logger
name="org.apache.solr.crossdc.messageprocessor.SolrMessageProcessor"
level="TRACE"/>
<Logger name="org.apache.solr.crossdc.common.KafkaMirroringSink"
level="TRACE"/>