This is an automated email from the ASF dual-hosted git repository.

anshum pushed a commit to branch crossdc-wip
in repository https://gitbox.apache.org/repos/asf/solr-sandbox.git


The following commit(s) were added to refs/heads/crossdc-wip by this push:
     new 4c60874  Work around access limitations (what you can do here seems to vary by Java version) (#24)
4c60874 is described below

commit 4c60874ae09afbe9a5930cff0ea65d09869d1d2d
Author: Mark Robert Miller <[email protected]>
AuthorDate: Thu Jun 16 11:52:47 2022 -0500

    Work around access limitations (what you can do here seems to vary by Java version) (#24)
---
 .../update/processor/MirroringUpdateProcessor.java | 70 ++++++++++++++++------
 1 file changed, 52 insertions(+), 18 deletions(-)

diff --git a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java
index e2e09f2..cdec627 100644
--- a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java
+++ b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java
@@ -1,11 +1,15 @@
 package org.apache.solr.update.processor;
 
 import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.cloud.CloudDescriptor;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.*;
 import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.ShardParams;
 import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.update.AddUpdateCommand;
 import org.apache.solr.update.CommitUpdateCommand;
 import org.apache.solr.update.DeleteUpdateCommand;
@@ -39,7 +43,7 @@ public class MirroringUpdateProcessor extends UpdateRequestProcessor {
   /**
    * The distributed processor downstream from us so we can establish if we're running on a leader shard
    */
-  private DistributedUpdateProcessor distProc;
+  //private DistributedUpdateProcessor distProc;
 
   /**
    * Distribution phase of the incoming requests
@@ -57,17 +61,7 @@ public class MirroringUpdateProcessor extends UpdateRequestProcessor {
     this.requestMirroringHandler = requestMirroringHandler;
 
     // Find the downstream distributed update processor
-    for (UpdateRequestProcessor proc = next; proc != null; proc = proc.next) {
-      if (proc instanceof DistributedUpdateProcessor) {
-        distProc = (DistributedUpdateProcessor) proc;
-        break;
-      }
-    }
-    if (distProc == null) {
-      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
-          "DistributedUpdateProcessor must follow "
-              + MirroringUpdateProcessor.class.getSimpleName());
-    }
+
   }
 
   private UpdateRequest createAndOrGetMirrorRequest() {
@@ -82,29 +76,36 @@ public class MirroringUpdateProcessor extends UpdateRequestProcessor {
   }
 
   @Override public void processAdd(final AddUpdateCommand cmd) throws IOException {
-    if (log.isDebugEnabled())
-      log.debug("processAdd isLeader={} cmd={}", distProc.isLeader(), cmd);
+
     super.processAdd(cmd); // let this throw to prevent mirroring invalid reqs
 
     // submit only from the leader shards so we mirror each doc once
-    if (doMirroring && distProc.isLeader()) {
+    boolean isLeader = isLeader(cmd.getReq(),  cmd.getIndexedIdStr(), null, cmd.getSolrInputDocument());
+    if (doMirroring && isLeader) {
       SolrInputDocument doc = cmd.getSolrInputDocument().deepCopy();
       doc.removeField(CommonParams.VERSION_FIELD); // strip internal doc version
       createAndOrGetMirrorRequest().add(doc, cmd.commitWithin, cmd.overwrite);
     }
+
+    if (log.isDebugEnabled())
+      log.debug("processAdd isLeader={} cmd={}", isLeader, cmd);
   }
 
   @Override public void processDelete(final DeleteUpdateCommand cmd) throws IOException {
-    if (log.isDebugEnabled())
-      log.debug("processDelete doMirroring={} isLeader={} cmd={}", doMirroring, distProc.isLeader(), cmd);
     super.processDelete(cmd); // let this throw to prevent mirroring invalid requests
 
     if (doMirroring) {
+      boolean isLeader = false;
       if (cmd.isDeleteById()) {
+        DeleteUpdateCommand dcmd = (DeleteUpdateCommand)cmd;
         // deleteById requests runs once per leader, so we just submit the request from the leader shard
-        if (distProc.isLeader()) {
+        isLeader = isLeader(cmd.getReq(),  dcmd.getId(), null != cmd.getRoute() ? cmd.getRoute() : cmd.getReq().getParams().get(
+            ShardParams._ROUTE_), null);
+        if (isLeader) {
           createAndOrGetMirrorRequest().deleteById(cmd.getId()); // strip versions from deletes
         }
+        if (log.isDebugEnabled())
+          log.debug("processDelete doMirroring={} isLeader={} cmd={}", doMirroring, isLeader, cmd);
       } else {
         // DBQs are sent to each shard leader, so we mirror from the original node to only mirror once
         // In general there's no way to guarantee that these run identically on the mirror since there are no
@@ -113,8 +114,41 @@ public class MirroringUpdateProcessor extends UpdateRequestProcessor {
         if (distribPhase == DistributedUpdateProcessor.DistribPhase.NONE) {
           createAndOrGetMirrorRequest().deleteByQuery(cmd.query);
         }
+        if (log.isDebugEnabled())
+          log.debug("processDelete doMirroring={} cmd={}", doMirroring, cmd);
       }
+
+    }
+  }
+
+  private boolean isLeader(SolrQueryRequest req, String id, String route, SolrInputDocument doc) {
+    CloudDescriptor cloudDesc =
+        req.getCore().getCoreDescriptor().getCloudDescriptor();
+    String collection = cloudDesc.getCollectionName();
+    ClusterState clusterState =
+        req.getCore().getCoreContainer().getZkController().getClusterState();
+    DocCollection coll = clusterState.getCollection(collection);
+    Slice slice = coll.getRouter().getTargetSlice(id, doc, route, req.getParams(), coll);
+
+    if (slice == null) {
+      // No slice found.  Most strict routers will have already thrown an exception, so a null return is
+      // a signal to use the slice of this core.
+      // TODO: what if this core is not in the targeted collection?
+      String shardId = cloudDesc.getShardId();
+      slice = coll.getSlice(shardId);
+      if (slice == null) {
+        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No shard " + shardId + " in " + coll);
+      }
+    }
+    String shardId = slice.getName();
+    Replica leaderReplica = null;
+    try {
+      leaderReplica = req.getCore().getCoreContainer().getZkController().getZkStateReader().getLeaderRetry(collection, shardId);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
     }
+    return leaderReplica.getName().equals(cloudDesc.getCoreNodeName());
   }
 
   @Override public void processRollback(final RollbackUpdateCommand cmd) throws IOException {

Reply via email to