This is an automated email from the ASF dual-hosted git repository.

noble pushed a commit to branch jira/solr15996_1
in repository https://gitbox.apache.org/repos/asf/solr.git

commit 1fa699d139a7459c1e43b0eee7a2c52beafefca1
Author: Noble Paul <[email protected]>
AuthorDate: Tue Feb 22 15:17:13 2022 +1100

    Refactor perReplicaState enable/disable out of CollectionMutator.modifyCollection to avoid race conditions
---
 .../src/java/org/apache/solr/cloud/Overseer.java   |  15 ++-
 .../solr/cloud/overseer/CollectionMutator.java     | 117 ++++++++++++++++-----
 .../solr/common/cloud/PerReplicaStatesOps.java     |  41 ++++----
 3 files changed, 124 insertions(+), 49 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java 
b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index 1cf6ab8..8fca355 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -383,6 +383,9 @@ public class Overseer implements SolrCloseable {
         zkWriteCommands = processMessage(clusterState, message, operation, 
zkStateWriter);
         if (zkWriteCommands instanceof CommandWithClusterState) {
           clusterState = ((CommandWithClusterState) 
zkWriteCommands).clusterState;
+          if(zkWriteCommands.isEmpty()) {
+            zkWriteCommands = null;
+          }
         }
 
         stats.success(operation);
@@ -454,7 +457,9 @@ public class Overseer implements SolrCloseable {
 
       CommandWithClusterState(ZkWriteCommand cmd, ClusterState state) {
         super(1);
-        super.add(cmd);
+        if(cmd != null) { // null cmd = nothing left to write, e.g. a PRS toggle already persisted by doModifyPrs
+          super.add(cmd);
+        }
         this.clusterState = state;
       }
     }
@@ -491,7 +496,13 @@ public class Overseer implements SolrCloseable {
             }  catch (Exception e) {
               throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, 
"Unable to write updates");
             }
-            return new CommandWithClusterState(new 
CollectionMutator(getSolrCloudManager()).modifyCollection(clusterState,message),
 clusterState);
+            CollectionMutator collectionMutator = new 
CollectionMutator(getSolrCloudManager());
+            if(collectionMutator.isModifyPrs(message)) {
+               return new CommandWithClusterState(null, 
collectionMutator.doModifyPrs(message, clusterState));
+            } else {
+              return new 
CommandWithClusterState(collectionMutator.modifyCollection(clusterState, 
message), clusterState);
+            }
+
           default:
             throw new RuntimeException("unknown operation:" + operation
                 + " contents:" + message.getProperties());
diff --git 
a/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java 
b/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java
index 9c84e8b..2cdd3bc 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java
@@ -20,15 +20,22 @@ import java.lang.invoke.MethodHandles;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.solr.client.solrj.cloud.DistribStateManager;
 import org.apache.solr.client.solrj.cloud.SolrCloudManager;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.*;
+import org.apache.solr.common.util.Utils;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static java.util.Collections.singletonMap;
 import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.CONFIGNAME_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.NRT_REPLICAS;
@@ -41,6 +48,7 @@ public class CollectionMutator {
   protected final SolrCloudManager cloudManager;
   protected final DistribStateManager stateManager;
   protected final SolrZkClient zkClient;
+  ClusterState modifiedClusterState; // NOTE(review): never read or written in this patch; remove if unused elsewhere
 
   public CollectionMutator(SolrCloudManager cloudManager) {
     this.cloudManager = cloudManager;
@@ -55,8 +63,7 @@ public class CollectionMutator {
     DocCollection collection = clusterState.getCollection(collectionName);
     Slice slice = collection.getSlice(shardId);
     if (slice == null) {
-      @SuppressWarnings({"unchecked"})
-      Map<String, Replica> replicas = Collections.EMPTY_MAP;
+      Map<String, Replica> replicas = Collections.emptyMap();
       Map<String, Object> sliceProps = new HashMap<>();
       String shardRange = message.getStr(ZkStateReader.SHARD_RANGE_PROP);
       String shardState = message.getStr(ZkStateReader.SHARD_STATE_PROP);
@@ -101,26 +108,9 @@ public class CollectionMutator {
   public ZkWriteCommand modifyCollection(final ClusterState clusterState, 
ZkNodeProps message) {
     if (!checkCollectionKeyExistence(message)) return ZkStateWriter.NO_OP;
     DocCollection coll = 
clusterState.getCollection(message.getStr(COLLECTION_PROP));
-    Map<String, Object> props = coll.shallowCopy();
     boolean hasAnyOps = false;
-    PerReplicaStatesOps replicaOps = null;
+    Map<String, Object> props = coll.shallowCopy();
     for (String prop : 
CollectionAdminRequest.MODIFIABLE_COLLECTION_PROPERTIES) {
-      if (prop.equals(DocCollection.PER_REPLICA_STATE)) {
-        String val = message.getStr(DocCollection.PER_REPLICA_STATE);
-        if (val == null) continue;
-        boolean enable = Boolean.parseBoolean(val);
-        if (enable == coll.isPerReplicaState()) {
-          //already enabled
-          log.error("trying to set perReplicaState to {} from {}", val, 
coll.isPerReplicaState());
-          continue;
-        }
-        PerReplicaStates prs = PerReplicaStates.fetch(coll.getZNode(), 
zkClient, null);
-        replicaOps = PerReplicaStatesOps.modifyCollection(coll, enable, prs);
-        if(!enable) {
-          coll = updateReplicasFromPrs(coll, prs);
-        }
-      }
-
       if (message.containsKey(prop)) {
         hasAnyOps = true;
         if (message.get(prop) == null)  {
@@ -157,11 +147,85 @@ public class CollectionMutator {
     assert !props.containsKey(COLL_CONF);
 
     DocCollection collection = new DocCollection(coll.getName(), 
coll.getSlicesMap(), props, coll.getRouter(), coll.getZNodeVersion());
-    if (replicaOps == null){
-      return new ZkWriteCommand(coll.getName(), collection);
-    } else {
-      return new ZkWriteCommand(coll.getName(), collection, replicaOps, true);
+    return new ZkWriteCommand(coll.getName(), collection);
+
+  }
+
+  /**
+   * Returns true if the message sets perReplicaState; throws BAD_REQUEST if other modifiable properties are also set.
+   */
+  public boolean isModifyPrs(ZkNodeProps message) {
+    boolean hasPrs = false;
+    boolean hasOther = false;
+    for (String prop : 
CollectionAdminRequest.MODIFIABLE_COLLECTION_PROPERTIES) {
+      String val = message.getStr(prop);
+      if (val != null) {
+        if (DocCollection.PER_REPLICA_STATE.equals(prop)) {
+          hasPrs = true;
+        } else {
+          hasOther = true;
+        }
+      }
     }
+    if (hasOther && hasPrs) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+              "Cannot modify collection with perReplicaState and other 
properties in same command");
+    }
+    return hasPrs;
+
+  }
+  /** Persists a perReplicaState toggle straight to ZK and returns the resulting ClusterState (the input state if nothing changes). */
+  public ClusterState doModifyPrs(ZkNodeProps message, ClusterState 
clusterState) {
+    String prsVal = message.getStr(DocCollection.PER_REPLICA_STATE);
+    boolean enable  = Boolean.parseBoolean(prsVal);
+    final DocCollection coll = 
clusterState.getCollection(message.getStr(COLLECTION_PROP));
+    if(coll.isPerReplicaState() == enable) {
+      //idempotent call: the collection is already in the requested mode
+      return clusterState;
+    }
+    Map<String, Object> props = coll.shallowCopy();
+    props.put(DocCollection.PER_REPLICA_STATE, enable);
+    String name = coll.getName();
+    final AtomicReference<DocCollection> result = new AtomicReference<>(new 
DocCollection(name, coll.getSlicesMap(), props, coll.getRouter(), 
coll.getZNodeVersion()));
+    PerReplicaStates prsStates = PerReplicaStates.fetch(coll.getZNode(), 
zkClient, null);
+    PerReplicaStatesOps prsOps = enable?
+            enablePrsOps(result, prsStates) :
+            disablePrsOps(result);
+    // persist() runs the per-replica ops plus the collection znode rewrite (added in getZkOps) in one zkClient.multi()
+    try {
+      prsOps.init(prsStates);
+      prsOps.persist(coll.getZNode(), zkClient);
+      Stat stat = zkClient.exists(coll.getZNode(), null, true); // pick up the znode version produced by the write
+      result.set(new DocCollection(name, result.get().getSlicesMap(), 
result.get().getProperties(), result.get().getRouter(), stat.getVersion()));
+      clusterState = clusterState.copyWith(name, result.get()); // keep the returned ClusterState instead of discarding it
+    } catch (Exception e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unable to 
MODIFY perReplicaStates on Collection : " + name, e);
+    }
+    // the Overseer adopts this value as its ClusterState, so it must never be null
+    return clusterState;
+  }
+  /** PRS ops for disabling: deletes every per-replica znode and rewrites the collection znode with replicas updated via updateReplicasFromPrs. */
+  private static PerReplicaStatesOps disablePrsOps( 
AtomicReference<DocCollection> coll) {
+    return new PerReplicaStatesOps(PerReplicaStatesOps::disablePrs) {
+      @Override
+      protected List<Op> getZkOps(List<PerReplicaStates.Operation> operations, 
String znode, SolrZkClient zkClient) {
+        
coll.set(updateReplicasFromPrs(coll.get(),PerReplicaStates.fetch(coll.get().getZNode(),
 zkClient, null)));
+        List<Op> zkOps = super.getZkOps(operations, znode, zkClient); // DELETE ops produced by disablePrs
+        zkOps.add(Op.setData(znode, 
Utils.toJSON(singletonMap(coll.get().getName(), coll.get())), 
coll.get().getZNodeVersion()));
+        return zkOps; // NOTE(review): setData version comes from coll.get(); confirm updateReplicasFromPrs keeps the original znode version
+      }
+    };
+  }
+  /** PRS ops for enabling: per-replica ops from enablePrs (using the caller-fetched prsStates) plus a rewrite of the collection znode. */
+  private static PerReplicaStatesOps 
enablePrsOps(AtomicReference<DocCollection> coll, PerReplicaStates prsStates) {
+    return new PerReplicaStatesOps(prs -> 
PerReplicaStatesOps.enablePrs(coll.get(), prsStates)) {
+      @Override
+      protected List<Op> getZkOps(List<PerReplicaStates.Operation> operations, 
String znode, SolrZkClient zkClient) {
+        List<Op> ops = super.getZkOps(operations, znode, zkClient); // note: the lambda above ignores its prs argument and uses prsStates
+        ops.add(Op.setData(znode, 
Utils.toJSON(singletonMap(coll.get().getName(), coll.get())), 
coll.get().getZNodeVersion()));
+        return ops;
+      }
+    };
   }
 
   /**
@@ -196,6 +260,7 @@ public class CollectionMutator {
     return coll;
   }
 
+
   public static DocCollection updateSlice(String collectionName, DocCollection 
collection, Slice slice) {
     Map<String, Slice> slices = new 
LinkedHashMap<>(collection.getSlicesMap()); // make a shallow copy
     slices.put(slice.getName(), slice);
@@ -214,4 +279,8 @@ public class CollectionMutator {
     }
     return true;
   }
+
+  static class NoOpCommand extends RuntimeException {
+    // NOTE(review): not thrown or caught anywhere in this patch; remove unless a follow-up uses it
+  }
 }
diff --git 
a/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStatesOps.java 
b/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStatesOps.java
index eabec492..2c85318 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStatesOps.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStatesOps.java
@@ -44,7 +44,7 @@ public class PerReplicaStatesOps {
     List<PerReplicaStates.Operation> ops;
     final Function<PerReplicaStates, List<PerReplicaStates.Operation>> fun;
 
-    PerReplicaStatesOps(Function<PerReplicaStates, 
List<PerReplicaStates.Operation>> fun) {
+    public PerReplicaStatesOps(Function<PerReplicaStates, 
List<PerReplicaStates.Operation>> fun) {
         this.fun = fun;
     }
 
@@ -57,6 +57,18 @@ public class PerReplicaStatesOps {
             log.debug("Per-replica state being persisted for : '{}', ops: {}", 
znode, operations);
         }
 
+        List<Op> ops = getZkOps(operations, znode, zkClient);
+        try {
+            zkClient.multi(ops, true);
+        } catch (KeeperException e) {
+          log.error("Multi-op exception: {}", zkClient.getChildren(znode, 
null, true));
+          throw e;
+        }
+
+    }
+
+
+    protected List<Op> getZkOps(List<PerReplicaStates.Operation> operations, 
String znode, SolrZkClient zkClient) {
         List<Op> ops = new ArrayList<>(operations.size());
         for (PerReplicaStates.Operation op : operations) {
             // the state of the replica is being updated
@@ -65,13 +77,7 @@ public class PerReplicaStatesOps {
                     Op.create(path, null, 
zkClient.getZkACLProvider().getACLsToAdd(path), CreateMode.PERSISTENT) :
                     Op.delete(path, -1));
         }
-        try {
-            zkClient.multi(ops, true);
-        } catch (KeeperException e) {
-          log.error("Multi-op exception: {}", zkClient.getChildren(znode, 
null, true));
-          throw e;
-        }
-
+        return ops;
     }
 
     /**
@@ -130,19 +136,8 @@ public class PerReplicaStatesOps {
         }).init(rs);
     }
 
-    /**
-     * Switch a collection from/to perReplicaState=true
-     */
-    public static PerReplicaStatesOps modifyCollection(DocCollection coll, 
boolean enable, PerReplicaStates rs) {
-        return new PerReplicaStatesOps(prs -> enable ?
-                enable(coll,prs) :
-                disable(prs))
-                .init(rs);
-
-    }
-
-    private static List<PerReplicaStates.Operation> enable(DocCollection coll, 
PerReplicaStates prs) {
-        log.info("ENABLING_PRS ");
+    public static List<PerReplicaStates.Operation> enablePrs(DocCollection 
coll, PerReplicaStates prs) {
+        log.info("Enabling PRS for {} ", coll.getName());
         List<PerReplicaStates.Operation> result = new ArrayList<>();
         coll.forEachReplica((s, r) -> {
             PerReplicaStates.State st = prs.get(r.getName());
@@ -158,7 +153,7 @@ public class PerReplicaStatesOps {
         return result;
     }
 
-    private static List<PerReplicaStates.Operation> disable(PerReplicaStates 
prs) {
+    public static List<PerReplicaStates.Operation> disablePrs(PerReplicaStates 
prs) {
         List<PerReplicaStates.Operation> result = new ArrayList<>();
         prs.states.forEachEntry((s, state) -> result.add(new 
PerReplicaStates.Operation(PerReplicaStates.Operation.Type.DELETE, state)));
         return result;
@@ -272,7 +267,7 @@ public class PerReplicaStatesOps {
         return result;
     }
 
-    PerReplicaStatesOps init(PerReplicaStates rs) {
+    public PerReplicaStatesOps init(PerReplicaStates rs) {
         if (rs == null) return null;
         get(rs);
         return this;

Reply via email to