This is an automated email from the ASF dual-hosted git repository.
anshum pushed a commit to branch crossdc-wip
in repository https://gitbox.apache.org/repos/asf/solr-sandbox.git
The following commit(s) were added to refs/heads/crossdc-wip by this push:
new 4c60874 Work around access limitations (what you can do here seems to
vary by Java version) (#24)
4c60874 is described below
commit 4c60874ae09afbe9a5930cff0ea65d09869d1d2d
Author: Mark Robert Miller <[email protected]>
AuthorDate: Thu Jun 16 11:52:47 2022 -0500
Work around access limitations (what you can do here seems to vary by Java
version) (#24)
---
.../update/processor/MirroringUpdateProcessor.java | 70 ++++++++++++++++------
1 file changed, 52 insertions(+), 18 deletions(-)
diff --git
a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java
b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java
index e2e09f2..cdec627 100644
---
a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java
+++
b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java
@@ -1,11 +1,15 @@
package org.apache.solr.update.processor;
import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.cloud.CloudDescriptor;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.*;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.ShardParams;
import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.update.AddUpdateCommand;
import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.update.DeleteUpdateCommand;
@@ -39,7 +43,7 @@ public class MirroringUpdateProcessor extends
UpdateRequestProcessor {
/**
* The distributed processor downstream from us so we can establish if we're
running on a leader shard
*/
- private DistributedUpdateProcessor distProc;
+ //private DistributedUpdateProcessor distProc;
/**
* Distribution phase of the incoming requests
@@ -57,17 +61,7 @@ public class MirroringUpdateProcessor extends
UpdateRequestProcessor {
this.requestMirroringHandler = requestMirroringHandler;
// Find the downstream distributed update processor
- for (UpdateRequestProcessor proc = next; proc != null; proc = proc.next) {
- if (proc instanceof DistributedUpdateProcessor) {
- distProc = (DistributedUpdateProcessor) proc;
- break;
- }
- }
- if (distProc == null) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
- "DistributedUpdateProcessor must follow "
- + MirroringUpdateProcessor.class.getSimpleName());
- }
+
}
private UpdateRequest createAndOrGetMirrorRequest() {
@@ -82,29 +76,36 @@ public class MirroringUpdateProcessor extends
UpdateRequestProcessor {
}
@Override public void processAdd(final AddUpdateCommand cmd) throws
IOException {
- if (log.isDebugEnabled())
- log.debug("processAdd isLeader={} cmd={}", distProc.isLeader(), cmd);
+
super.processAdd(cmd); // let this throw to prevent mirroring invalid reqs
// submit only from the leader shards so we mirror each doc once
- if (doMirroring && distProc.isLeader()) {
+ boolean isLeader = isLeader(cmd.getReq(), cmd.getIndexedIdStr(), null,
cmd.getSolrInputDocument());
+ if (doMirroring && isLeader) {
SolrInputDocument doc = cmd.getSolrInputDocument().deepCopy();
doc.removeField(CommonParams.VERSION_FIELD); // strip internal doc
version
createAndOrGetMirrorRequest().add(doc, cmd.commitWithin, cmd.overwrite);
}
+
+ if (log.isDebugEnabled())
+ log.debug("processAdd isLeader={} cmd={}", isLeader, cmd);
}
@Override public void processDelete(final DeleteUpdateCommand cmd) throws
IOException {
- if (log.isDebugEnabled())
- log.debug("processDelete doMirroring={} isLeader={} cmd={}",
doMirroring, distProc.isLeader(), cmd);
super.processDelete(cmd); // let this throw to prevent mirroring invalid
requests
if (doMirroring) {
+ boolean isLeader = false;
if (cmd.isDeleteById()) {
+ DeleteUpdateCommand dcmd = (DeleteUpdateCommand)cmd;
// deleteById requests runs once per leader, so we just submit the
request from the leader shard
- if (distProc.isLeader()) {
+ isLeader = isLeader(cmd.getReq(), dcmd.getId(), null !=
cmd.getRoute() ? cmd.getRoute() : cmd.getReq().getParams().get(
+ ShardParams._ROUTE_), null);
+ if (isLeader) {
createAndOrGetMirrorRequest().deleteById(cmd.getId()); // strip
versions from deletes
}
+ if (log.isDebugEnabled())
+ log.debug("processDelete doMirroring={} isLeader={} cmd={}",
doMirroring, isLeader, cmd);
} else {
// DBQs are sent to each shard leader, so we mirror from the original
node to only mirror once
// In general there's no way to guarantee that these run identically
on the mirror since there are no
@@ -113,8 +114,41 @@ public class MirroringUpdateProcessor extends
UpdateRequestProcessor {
if (distribPhase == DistributedUpdateProcessor.DistribPhase.NONE) {
createAndOrGetMirrorRequest().deleteByQuery(cmd.query);
}
+ if (log.isDebugEnabled())
+ log.debug("processDelete doMirroring={} cmd={}", doMirroring, cmd);
}
+
+ }
+ }
+
+ private boolean isLeader(SolrQueryRequest req, String id, String route,
SolrInputDocument doc) {
+ CloudDescriptor cloudDesc =
+ req.getCore().getCoreDescriptor().getCloudDescriptor();
+ String collection = cloudDesc.getCollectionName();
+ ClusterState clusterState =
+ req.getCore().getCoreContainer().getZkController().getClusterState();
+ DocCollection coll = clusterState.getCollection(collection);
+ Slice slice = coll.getRouter().getTargetSlice(id, doc, route,
req.getParams(), coll);
+
+ if (slice == null) {
+ // No slice found. Most strict routers will have already thrown an
exception, so a null return is
+ // a signal to use the slice of this core.
+ // TODO: what if this core is not in the targeted collection?
+ String shardId = cloudDesc.getShardId();
+ slice = coll.getSlice(shardId);
+ if (slice == null) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No shard
" + shardId + " in " + coll);
+ }
+ }
+ String shardId = slice.getName();
+ Replica leaderReplica = null;
+ try {
+ leaderReplica =
req.getCore().getCoreContainer().getZkController().getZkStateReader().getLeaderRetry(collection,
shardId);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "",
e);
}
+ return leaderReplica.getName().equals(cloudDesc.getCoreNodeName());
}
@Override public void processRollback(final RollbackUpdateCommand cmd)
throws IOException {