This is an automated email from the ASF dual-hosted git repository. noble pushed a commit to branch jira/solr15996_1 in repository https://gitbox.apache.org/repos/asf/solr.git
commit 1fa699d139a7459c1e43b0eee7a2c52beafefca1 Author: Noble Paul <[email protected]> AuthorDate: Tue Feb 22 15:17:13 2022 +1100 refactored to avoid race conditions --- .../src/java/org/apache/solr/cloud/Overseer.java | 15 ++- .../solr/cloud/overseer/CollectionMutator.java | 117 ++++++++++++++++----- .../solr/common/cloud/PerReplicaStatesOps.java | 41 ++++---- 3 files changed, 124 insertions(+), 49 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java index 1cf6ab8..8fca355 100644 --- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java +++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java @@ -383,6 +383,9 @@ public class Overseer implements SolrCloseable { zkWriteCommands = processMessage(clusterState, message, operation, zkStateWriter); if (zkWriteCommands instanceof CommandWithClusterState) { clusterState = ((CommandWithClusterState) zkWriteCommands).clusterState; + if(zkWriteCommands.isEmpty()) { + zkWriteCommands = null; + } } stats.success(operation); @@ -454,7 +457,9 @@ public class Overseer implements SolrCloseable { CommandWithClusterState(ZkWriteCommand cmd, ClusterState state) { super(1); - super.add(cmd); + if(cmd != null) { + super.add(cmd); + } this.clusterState = state; } } @@ -491,7 +496,13 @@ public class Overseer implements SolrCloseable { } catch (Exception e) { throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unable to write updates"); } - return new CommandWithClusterState(new CollectionMutator(getSolrCloudManager()).modifyCollection(clusterState,message), clusterState); + CollectionMutator collectionMutator = new CollectionMutator(getSolrCloudManager()); + if(collectionMutator.isModifyPrs(message)) { + return new CommandWithClusterState(null, collectionMutator.doModifyPrs(message, clusterState)); + } else { + return new CommandWithClusterState(collectionMutator.modifyCollection(clusterState, message), clusterState); + } + default: throw new RuntimeException("unknown operation:" + operation + " contents:" + message.getProperties()); diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java index 9c84e8b..2cdd3bc 100644 --- a/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java +++ b/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java @@ -20,15 +20,22 @@ import java.lang.invoke.MethodHandles; import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; import org.apache.solr.client.solrj.cloud.DistribStateManager; import org.apache.solr.client.solrj.cloud.SolrCloudManager; import org.apache.solr.client.solrj.request.CollectionAdminRequest; +import org.apache.solr.common.SolrException; import org.apache.solr.common.cloud.*; +import org.apache.solr.common.util.Utils; +import org.apache.zookeeper.Op; +import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static java.util.Collections.singletonMap; import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP; import static org.apache.solr.common.cloud.ZkStateReader.CONFIGNAME_PROP; import static org.apache.solr.common.cloud.ZkStateReader.NRT_REPLICAS; @@ -41,6 +48,7 @@ public class CollectionMutator { protected final SolrCloudManager cloudManager; protected final DistribStateManager stateManager; protected final SolrZkClient zkClient; + ClusterState modifiedClusterState; public CollectionMutator(SolrCloudManager cloudManager) { this.cloudManager = cloudManager; @@ -55,8 +63,7 @@ public class CollectionMutator { DocCollection collection = clusterState.getCollection(collectionName); Slice slice = collection.getSlice(shardId); if (slice == null) { - @SuppressWarnings({"unchecked"}) - Map<String, Replica> replicas = Collections.EMPTY_MAP; + Map<String, Replica> replicas = Collections.emptyMap(); Map<String, Object> sliceProps = new HashMap<>(); String shardRange = message.getStr(ZkStateReader.SHARD_RANGE_PROP); String shardState = message.getStr(ZkStateReader.SHARD_STATE_PROP); @@ -101,26 +108,9 @@ public class CollectionMutator { public ZkWriteCommand modifyCollection(final ClusterState clusterState, ZkNodeProps message) { if (!checkCollectionKeyExistence(message)) return ZkStateWriter.NO_OP; DocCollection coll = clusterState.getCollection(message.getStr(COLLECTION_PROP)); - Map<String, Object> props = coll.shallowCopy(); boolean hasAnyOps = false; - PerReplicaStatesOps replicaOps = null; + Map<String, Object> props = coll.shallowCopy(); for (String prop : CollectionAdminRequest.MODIFIABLE_COLLECTION_PROPERTIES) { - if (prop.equals(DocCollection.PER_REPLICA_STATE)) { - String val = message.getStr(DocCollection.PER_REPLICA_STATE); - if (val == null) continue; - boolean enable = Boolean.parseBoolean(val); - if (enable == coll.isPerReplicaState()) { - //already enabled - log.error("trying to set perReplicaState to {} from {}", val, coll.isPerReplicaState()); - continue; - } - PerReplicaStates prs = PerReplicaStates.fetch(coll.getZNode(), zkClient, null); - replicaOps = PerReplicaStatesOps.modifyCollection(coll, enable, prs); - if(!enable) { - coll = updateReplicasFromPrs(coll, prs); - } - } - if (message.containsKey(prop)) { hasAnyOps = true; if (message.get(prop) == null) { @@ -157,11 +147,85 @@ public class CollectionMutator { assert !props.containsKey(COLL_CONF); DocCollection collection = new DocCollection(coll.getName(), coll.getSlicesMap(), props, coll.getRouter(), coll.getZNodeVersion()); - if (replicaOps == null){ - return new ZkWriteCommand(coll.getName(), collection); - } else { - return new ZkWriteCommand(coll.getName(), collection, replicaOps, true); + return new ZkWriteCommand(coll.getName(), collection); + + } + + /** + * Check whether a PRS modify is requested + */ + public boolean isModifyPrs(ZkNodeProps message) { + boolean hasPrs = false; + boolean hasOther = false; + for (String prop : CollectionAdminRequest.MODIFIABLE_COLLECTION_PROPERTIES) { + String val = message.getStr(prop); + if (val != null) { + if (DocCollection.PER_REPLICA_STATE.equals(prop)) { + hasPrs = true; + } else { + hasOther = true; + } + } } + if (hasOther && hasPrs) { + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, + "Cannot modify collection with perReplicaState and other properties in same command"); + } + return hasPrs; + + } + + public ClusterState doModifyPrs(ZkNodeProps message, ClusterState clusterState) { + String prsVal = message.getStr(DocCollection.PER_REPLICA_STATE); + boolean enable = Boolean.parseBoolean(prsVal); + final DocCollection coll = clusterState.getCollection(message.getStr(COLLECTION_PROP)); + if(coll.isPerReplicaState() == enable) { + //idempotent call + return clusterState; + } + Map<String, Object> props = coll.shallowCopy(); + props.put(DocCollection.PER_REPLICA_STATE, enable); + String name = coll.getName(); + final AtomicReference<DocCollection> result = new AtomicReference<>(new DocCollection(name, coll.getSlicesMap(), props, coll.getRouter(), coll.getZNodeVersion())); + PerReplicaStates prsStates = PerReplicaStates.fetch(coll.getZNode(), zkClient, null); + PerReplicaStatesOps prsOps = enable? + enablePrsOps(result, prsStates) : + disablePrsOps(result); + + try { + prsOps.init(prsStates); + prsOps.persist(coll.getZNode(), zkClient); + Stat stat = zkClient.exists(coll.getZNode(), null, true); + result.set(new DocCollection(name, result.get().getSlicesMap(), result.get().getProperties(), result.get().getRouter(), stat.getVersion())); + clusterState.copyWith(name, result.get()); + } catch (Exception e) { + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unable to MODIFY perResplicaStates on Collection : "+ name); + } + + return null; + } + + private static PerReplicaStatesOps disablePrsOps( AtomicReference<DocCollection> coll) { + return new PerReplicaStatesOps(PerReplicaStatesOps::disablePrs) { + @Override + protected List<Op> getZkOps(List<PerReplicaStates.Operation> operations, String znode, SolrZkClient zkClient) { + coll.set(updateReplicasFromPrs(coll.get(),PerReplicaStates.fetch(coll.get().getZNode(), zkClient, null))); + List<Op> zkOps = super.getZkOps(operations, znode, zkClient); + zkOps.add(Op.setData(znode, Utils.toJSON(singletonMap(coll.get().getName(), coll.get())), coll.get().getZNodeVersion())); + return zkOps; + } + }; + } + + private static PerReplicaStatesOps enablePrsOps(AtomicReference<DocCollection> coll, PerReplicaStates prsStates) { + return new PerReplicaStatesOps(prs -> PerReplicaStatesOps.enablePrs(coll.get(), prsStates)) { + @Override + protected List<Op> getZkOps(List<PerReplicaStates.Operation> operations, String znode, SolrZkClient zkClient) { + List<Op> ops = super.getZkOps(operations, znode, zkClient); + ops.add(Op.setData(znode, Utils.toJSON(singletonMap(coll.get().getName(), coll.get())), coll.get().getZNodeVersion())); + return ops; + } + }; } /** @@ -196,6 +260,7 @@ public class CollectionMutator { return coll; } + public static DocCollection updateSlice(String collectionName, DocCollection collection, Slice slice) { Map<String, Slice> slices = new LinkedHashMap<>(collection.getSlicesMap()); // make a shallow copy slices.put(slice.getName(), slice); @@ -214,4 +279,8 @@ public class CollectionMutator { } return true; } + + static class NoOpCommand extends RuntimeException { + + } } diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStatesOps.java b/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStatesOps.java index eabec492..2c85318 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStatesOps.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStatesOps.java @@ -44,7 +44,7 @@ public class PerReplicaStatesOps { List<PerReplicaStates.Operation> ops; final Function<PerReplicaStates, List<PerReplicaStates.Operation>> fun; - PerReplicaStatesOps(Function<PerReplicaStates, List<PerReplicaStates.Operation>> fun) { + public PerReplicaStatesOps(Function<PerReplicaStates, List<PerReplicaStates.Operation>> fun) { this.fun = fun; } @@ -57,6 +57,18 @@ public class PerReplicaStatesOps { log.debug("Per-replica state being persisted for : '{}', ops: {}", znode, operations); } + List<Op> ops = getZkOps(operations, znode, zkClient); + try { + zkClient.multi(ops, true); + } catch (KeeperException e) { + log.error("Multi-op exception: {}", zkClient.getChildren(znode, null, true)); + throw e; + } + + } + + + protected List<Op> getZkOps(List<PerReplicaStates.Operation> operations, String znode, SolrZkClient zkClient) { List<Op> ops = new ArrayList<>(operations.size()); for (PerReplicaStates.Operation op : operations) { // the state of the replica is being updated @@ -65,13 +77,7 @@ public class PerReplicaStatesOps { Op.create(path, null, zkClient.getZkACLProvider().getACLsToAdd(path), CreateMode.PERSISTENT) : Op.delete(path, -1)); } - try { - zkClient.multi(ops, true); - } catch (KeeperException e) { - log.error("Multi-op exception: {}", zkClient.getChildren(znode, null, true)); - throw e; - } - + return ops; } /** @@ -130,19 +136,8 @@ public class PerReplicaStatesOps { }).init(rs); } - /** - * Switch a collection from/to perReplicaState=true - */ - public static PerReplicaStatesOps modifyCollection(DocCollection coll, boolean enable, PerReplicaStates rs) { - return new PerReplicaStatesOps(prs -> enable ? - enable(coll,prs) : - disable(prs)) - .init(rs); - - } - - private static List<PerReplicaStates.Operation> enable(DocCollection coll, PerReplicaStates prs) { - log.info("ENABLING_PRS "); + public static List<PerReplicaStates.Operation> enablePrs(DocCollection coll, PerReplicaStates prs) { + log.info("Enabling PRS for {} ", coll.getName()); List<PerReplicaStates.Operation> result = new ArrayList<>(); coll.forEachReplica((s, r) -> { PerReplicaStates.State st = prs.get(r.getName()); @@ -158,7 +153,7 @@ public class PerReplicaStatesOps { return result; } - private static List<PerReplicaStates.Operation> disable(PerReplicaStates prs) { + public static List<PerReplicaStates.Operation> disablePrs(PerReplicaStates prs) { List<PerReplicaStates.Operation> result = new ArrayList<>(); prs.states.forEachEntry((s, state) -> result.add(new PerReplicaStates.Operation(PerReplicaStates.Operation.Type.DELETE, state))); return result; @@ -272,7 +267,7 @@ public class PerReplicaStatesOps { return result; } - PerReplicaStatesOps init(PerReplicaStates rs) { + public PerReplicaStatesOps init(PerReplicaStates rs) { if (rs == null) return null; get(rs); return this;
