This is an automated email from the ASF dual-hosted git repository.

anshum pushed a commit to branch crossdc-wip
in repository https://gitbox.apache.org/repos/asf/solr-sandbox.git


The following commit(s) were added to refs/heads/crossdc-wip by this push:
     new 9007aa6  Experiment with Producer in crossdc package. (#19)
9007aa6 is described below

commit 9007aa6b8d51384ab5f146ebdce013633bae4660
Author: Mark Robert Miller <[email protected]>
AuthorDate: Wed Jun 1 14:28:02 2022 -0500

    Experiment with Producer in crossdc package. (#19)
---
 crossdc-commons/build.gradle                              |  1 +
 .../org/apache/solr/update/processor/UpdateHelper.java    | 12 +++++++-----
 crossdc-consumer/build.gradle                             |  4 +++-
 .../update/processor/KafkaRequestMirroringHandler.java    |  3 +--
 .../update/processor/MirroringException.java              |  2 +-
 .../update/processor/MirroringUpdateProcessor.java        | 15 +++++++++------
 .../processor/MirroringUpdateRequestProcessorFactory.java |  5 ++++-
 .../update/processor/RequestMirroringHandler.java         |  2 +-
 .../resources/configs/cloud-minimal/conf/solrconfig.xml   |  2 +-
 crossdc-producer/src/test/resources/log4j2.xml            |  4 ++--
 10 files changed, 30 insertions(+), 20 deletions(-)

diff --git a/crossdc-commons/build.gradle b/crossdc-commons/build.gradle
index 7ac0fd1..9e97f6c 100644
--- a/crossdc-commons/build.gradle
+++ b/crossdc-commons/build.gradle
@@ -29,6 +29,7 @@ repositories {
 
 dependencies {
     implementation 'org.apache.solr:solr-solrj:8.11.1'
+    implementation 'org.apache.solr:solr-core:8.11.1'
     implementation 'org.apache.kafka:kafka-clients:2.8.1'
     implementation 'com.google.guava:guava:14.0'
 }
diff --git a/crossdc-producer/src/main/java/org/apache/solr/update/processor/RequestMirroringHandler.java b/crossdc-commons/src/main/java/org/apache/solr/update/processor/UpdateHelper.java
similarity index 70%
copy from crossdc-producer/src/main/java/org/apache/solr/update/processor/RequestMirroringHandler.java
copy to crossdc-commons/src/main/java/org/apache/solr/update/processor/UpdateHelper.java
index 16ca862..4466dd3 100644
--- a/crossdc-producer/src/main/java/org/apache/solr/update/processor/RequestMirroringHandler.java
+++ b/crossdc-commons/src/main/java/org/apache/solr/update/processor/UpdateHelper.java
@@ -16,10 +16,12 @@
  */
 package org.apache.solr.update.processor;
 
-import org.apache.solr.client.solrj.request.UpdateRequest;
+public class UpdateHelper {
+  public static boolean isLeader(DistributedUpdateProcessor distProc) {
+    return distProc.isLeader();
+  }
 
-/** Plugin classes must implement this interface to be usable as the handlers for request mirroring */
-public interface RequestMirroringHandler {
-    /** When called, should handle submitting the request to the replica clusters  */
-    void mirror(UpdateRequest request) throws Exception;
+  public static UpdateRequestProcessor next(UpdateRequestProcessor proc) {
+    return proc.next;
+  }
 }
diff --git a/crossdc-consumer/build.gradle b/crossdc-consumer/build.gradle
index 61de292..79dc7c8 100644
--- a/crossdc-consumer/build.gradle
+++ b/crossdc-consumer/build.gradle
@@ -33,7 +33,9 @@ application {
 
 dependencies {
     implementation group: 'org.apache.solr', name: 'solr-solrj', version: '8.11.1'
-    implementation project(':crossdc-commons')
+    implementation (project(':crossdc-commons')) {
+        exclude (group: 'org.apache.solr', module: 'solr-core')
+    }
 
     implementation 'org.slf4j:slf4j-api:1.7.36'
     implementation 'org.eclipse.jetty:jetty-http:9.4.41.v20210516'
diff --git a/crossdc-producer/src/main/java/org/apache/solr/update/processor/KafkaRequestMirroringHandler.java b/crossdc-producer/src/main/java/org/apache/solr/crossdc/update/processor/KafkaRequestMirroringHandler.java
similarity index 95%
rename from crossdc-producer/src/main/java/org/apache/solr/update/processor/KafkaRequestMirroringHandler.java
rename to crossdc-producer/src/main/java/org/apache/solr/crossdc/update/processor/KafkaRequestMirroringHandler.java
index 34bc29b..4913fa3 100644
--- a/crossdc-producer/src/main/java/org/apache/solr/update/processor/KafkaRequestMirroringHandler.java
+++ b/crossdc-producer/src/main/java/org/apache/solr/crossdc/update/processor/KafkaRequestMirroringHandler.java
@@ -14,12 +14,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.solr.update.processor;
+package org.apache.solr.crossdc.update.processor;
 
 import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.crossdc.common.KafkaMirroringSink;
 import org.apache.solr.crossdc.common.MirroringException;
-import org.apache.solr.crossdc.common.KafkaCrossDcConf;
 import org.apache.solr.crossdc.common.MirroredSolrRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringException.java b/crossdc-producer/src/main/java/org/apache/solr/crossdc/update/processor/MirroringException.java
similarity index 96%
rename from crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringException.java
rename to crossdc-producer/src/main/java/org/apache/solr/crossdc/update/processor/MirroringException.java
index 6f00df9..d5e7c68 100644
--- a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringException.java
+++ b/crossdc-producer/src/main/java/org/apache/solr/crossdc/update/processor/MirroringException.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.solr.update.processor;
+package org.apache.solr.crossdc.update.processor;
 
 /**
  * Wrapper class for Mirroring exceptions.
diff --git a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java b/crossdc-producer/src/main/java/org/apache/solr/crossdc/update/processor/MirroringUpdateProcessor.java
similarity index 91%
rename from crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java
rename to crossdc-producer/src/main/java/org/apache/solr/crossdc/update/processor/MirroringUpdateProcessor.java
index e2e09f2..06a3344 100644
--- a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java
+++ b/crossdc-producer/src/main/java/org/apache/solr/crossdc/update/processor/MirroringUpdateProcessor.java
@@ -1,4 +1,4 @@
-package org.apache.solr.update.processor;
+package org.apache.solr.crossdc.update.processor;
 
 import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.common.SolrException;
@@ -10,6 +10,9 @@ import org.apache.solr.update.AddUpdateCommand;
 import org.apache.solr.update.CommitUpdateCommand;
 import org.apache.solr.update.DeleteUpdateCommand;
 import org.apache.solr.update.RollbackUpdateCommand;
+import org.apache.solr.update.processor.DistributedUpdateProcessor;
+import org.apache.solr.update.processor.UpdateHelper;
+import org.apache.solr.update.processor.UpdateRequestProcessor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,7 +60,7 @@ public class MirroringUpdateProcessor extends UpdateRequestProcessor {
     this.requestMirroringHandler = requestMirroringHandler;
 
     // Find the downstream distributed update processor
-    for (UpdateRequestProcessor proc = next; proc != null; proc = proc.next) {
+    for (UpdateRequestProcessor proc = next; proc != null; proc = UpdateHelper.next(proc)) {
       if (proc instanceof DistributedUpdateProcessor) {
         distProc = (DistributedUpdateProcessor) proc;
         break;
@@ -83,11 +86,11 @@ public class MirroringUpdateProcessor extends UpdateRequestProcessor {
 
   @Override public void processAdd(final AddUpdateCommand cmd) throws IOException {
     if (log.isDebugEnabled())
-      log.debug("processAdd isLeader={} cmd={}", distProc.isLeader(), cmd);
+      log.debug("processAdd isLeader={} cmd={}", UpdateHelper.isLeader(distProc), cmd);
     super.processAdd(cmd); // let this throw to prevent mirroring invalid reqs
 
     // submit only from the leader shards so we mirror each doc once
-    if (doMirroring && distProc.isLeader()) {
+    if (doMirroring && UpdateHelper.isLeader(distProc)) {
       SolrInputDocument doc = cmd.getSolrInputDocument().deepCopy();
       doc.removeField(CommonParams.VERSION_FIELD); // strip internal doc version
       createAndOrGetMirrorRequest().add(doc, cmd.commitWithin, cmd.overwrite);
@@ -96,13 +99,13 @@ public class MirroringUpdateProcessor extends UpdateRequestProcessor {
 
   @Override public void processDelete(final DeleteUpdateCommand cmd) throws IOException {
     if (log.isDebugEnabled())
-      log.debug("processDelete doMirroring={} isLeader={} cmd={}", doMirroring, distProc.isLeader(), cmd);
+      log.debug("processDelete doMirroring={} isLeader={} cmd={}", doMirroring, UpdateHelper.isLeader(distProc), cmd);
     super.processDelete(cmd); // let this throw to prevent mirroring invalid requests
 
     if (doMirroring) {
       if (cmd.isDeleteById()) {
         // deleteById requests runs once per leader, so we just submit the request from the leader shard
-        if (distProc.isLeader()) {
+        if (UpdateHelper.isLeader(distProc)) {
           createAndOrGetMirrorRequest().deleteById(cmd.getId()); // strip versions from deletes
         }
       } else {
diff --git a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java b/crossdc-producer/src/main/java/org/apache/solr/crossdc/update/processor/MirroringUpdateRequestProcessorFactory.java
similarity index 96%
rename from crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
rename to crossdc-producer/src/main/java/org/apache/solr/crossdc/update/processor/MirroringUpdateRequestProcessorFactory.java
index 2d91531..43b836d 100644
--- a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
+++ b/crossdc-producer/src/main/java/org/apache/solr/crossdc/update/processor/MirroringUpdateRequestProcessorFactory.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.solr.update.processor;
+package org.apache.solr.crossdc.update.processor;
 
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.params.CommonParams;
@@ -27,6 +27,9 @@ import org.apache.solr.crossdc.common.KafkaCrossDcConf;
 import org.apache.solr.crossdc.common.KafkaMirroringSink;
 import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.update.processor.DocBasedVersionConstraintsProcessorFactory;
+import org.apache.solr.update.processor.UpdateRequestProcessor;
+import org.apache.solr.update.processor.UpdateRequestProcessorFactory;
 import org.apache.solr.util.plugin.SolrCoreAware;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/crossdc-producer/src/main/java/org/apache/solr/update/processor/RequestMirroringHandler.java b/crossdc-producer/src/main/java/org/apache/solr/crossdc/update/processor/RequestMirroringHandler.java
similarity index 95%
rename from crossdc-producer/src/main/java/org/apache/solr/update/processor/RequestMirroringHandler.java
rename to crossdc-producer/src/main/java/org/apache/solr/crossdc/update/processor/RequestMirroringHandler.java
index 16ca862..ff55b6d 100644
--- a/crossdc-producer/src/main/java/org/apache/solr/update/processor/RequestMirroringHandler.java
+++ b/crossdc-producer/src/main/java/org/apache/solr/crossdc/update/processor/RequestMirroringHandler.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.solr.update.processor;
+package org.apache.solr.crossdc.update.processor;
 
 import org.apache.solr.client.solrj.request.UpdateRequest;
 
diff --git a/crossdc-producer/src/test/resources/configs/cloud-minimal/conf/solrconfig.xml b/crossdc-producer/src/test/resources/configs/cloud-minimal/conf/solrconfig.xml
index 6e057ab..9090598 100644
--- a/crossdc-producer/src/test/resources/configs/cloud-minimal/conf/solrconfig.xml
+++ b/crossdc-producer/src/test/resources/configs/cloud-minimal/conf/solrconfig.xml
@@ -109,7 +109,7 @@
   </query>
 
   <updateRequestProcessorChain  name="mirrorUpdateChain" default="true">
-    <processor class="org.apache.solr.update.processor.MirroringUpdateRequestProcessorFactory">
+    <processor class="org.apache.solr.crossdc.update.processor.MirroringUpdateRequestProcessorFactory">
 
     </processor>
     <processor class="solr.LogUpdateProcessorFactory" />
diff --git a/crossdc-producer/src/test/resources/log4j2.xml b/crossdc-producer/src/test/resources/log4j2.xml
index d3705d4..5f7ec24 100644
--- a/crossdc-producer/src/test/resources/log4j2.xml
+++ b/crossdc-producer/src/test/resources/log4j2.xml
@@ -60,8 +60,8 @@
     <Logger name="org.eclipse.jetty" level="INFO"/>
 
     <Logger name="org.apache.solr.crossdc.consumer.KafkaCrossDcConsumer" level="TRACE"/>
-    <Logger name="org.apache.solr.update.processor.MirroringUpdateProcessor" level="TRACE"/>
-    <Logger name="org.apache.solr.update.processor.KafkaRequestMirroringHandler" level="TRACE"/>
+    <Logger name="org.apache.solr.crossdc.update.processor.MirroringUpdateProcessor" level="TRACE"/>
+    <Logger name="org.apache.solr.crossdc.update.processor.KafkaRequestMirroringHandler" level="TRACE"/>
     <Logger name="org.apache.solr.crossdc.messageprocessor.SolrMessageProcessor" level="TRACE"/>
     <Logger name="org.apache.solr.crossdc.common.KafkaMirroringSink" level="TRACE"/>
 

Reply via email to