This is an automated email from the ASF dual-hosted git repository. noble pushed a commit to branch branch_9_1 in repository https://gitbox.apache.org/repos/asf/solr.git
commit b006bbc4304294c06eb9318ca1f1c446a87114cf Author: patsonluk <[email protected]> AuthorDate: Thu Oct 20 17:25:18 2022 -0700 SOLR-16478 : Further changes to avoid PRS entry writes from overseer (#1095) --- .../java/org/apache/solr/cloud/ZkController.java | 15 +++++++ .../apache/solr/cloud/overseer/SliceMutator.java | 17 +------- .../cloud/PerReplicaStatesIntegrationTest.java | 50 +++++++++++++++++++++- 3 files changed, 64 insertions(+), 18 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java index 3ca06a17e4e..c5eedaf8220 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java +++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java @@ -1861,6 +1861,21 @@ public class ZkController implements Closeable { } CloudDescriptor cloudDescriptor = cd.getCloudDescriptor(); if (removeCoreFromZk) { + // extra handling for PRS, we need to write the PRS entries from this node directly, + // as overseer does not and should not handle those entries + if (docCollection != null && docCollection.isPerReplicaState() && coreNodeName != null) { + if (log.isDebugEnabled()) { + log.debug( + "Unregistering core with coreNodeName {} of collection {} - deleting the PRS entries from ZK", + coreNodeName, + docCollection.getName()); + } + PerReplicaStates perReplicaStates = + PerReplicaStatesFetcher.fetch( + docCollection.getZNode(), zkClient, docCollection.getPerReplicaStates()); + PerReplicaStatesOps.deleteReplica(coreNodeName, perReplicaStates) + .persist(docCollection.getZNode(), zkClient); + } ZkNodeProps m = new ZkNodeProps( Overseer.QUEUE_OPERATION, diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java index 1f860abf116..50b793beca0 100644 --- a/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java +++ b/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java @@ -31,9 +31,6 @@ import org.apache.solr.cloud.Overseer; import org.apache.solr.cloud.api.collections.Assign; import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.DocCollection; -import org.apache.solr.common.cloud.PerReplicaStates; -import org.apache.solr.common.cloud.PerReplicaStatesFetcher; -import org.apache.solr.common.cloud.PerReplicaStatesOps; import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.RoutingRule; import org.apache.solr.common.cloud.Slice; @@ -111,19 +108,7 @@ public class SliceMutator { coll, slice); - if (collection.isPerReplicaState()) { - PerReplicaStates prs = - PerReplicaStatesFetcher.fetch( - collection.getZNode(), zkClient, collection.getPerReplicaStates()); - return new ZkWriteCommand( - coll, - updateReplica(collection, sl, replica.getName(), replica), - PerReplicaStatesOps.addReplica( - replica.getName(), replica.getState(), replica.isLeader(), prs), - true); - } else { - return new ZkWriteCommand(coll, updateReplica(collection, sl, replica.getName(), replica)); - } + return new ZkWriteCommand(coll, updateReplica(collection, sl, replica.getName(), replica)); } public ZkWriteCommand removeReplica(ClusterState clusterState, ZkNodeProps message) { diff --git a/solr/solrj/src/test/org/apache/solr/common/cloud/PerReplicaStatesIntegrationTest.java b/solr/solrj/src/test/org/apache/solr/common/cloud/PerReplicaStatesIntegrationTest.java index 24f6039225d..f5371b50082 100644 --- a/solr/solrj/src/test/org/apache/solr/common/cloud/PerReplicaStatesIntegrationTest.java +++ b/solr/solrj/src/test/org/apache/solr/common/cloud/PerReplicaStatesIntegrationTest.java @@ -28,6 +28,7 @@ import org.apache.solr.client.solrj.SolrClient; import org.apache.solr.client.solrj.embedded.JettySolrRunner; import org.apache.solr.client.solrj.request.CollectionAdminRequest; import org.apache.solr.client.solrj.request.V2Request; +import org.apache.solr.client.solrj.response.CollectionAdminResponse; import org.apache.solr.client.solrj.response.SolrPingResponse; import org.apache.solr.cloud.MiniSolrCloudCluster; import org.apache.solr.cloud.SolrCloudTestCase; @@ -78,11 +79,22 @@ public class PerReplicaStatesIntegrationTest extends SolrCloudTestCase { // Now let's do an add replica CollectionAdminRequest.addReplicaToShard(testCollection, "shard1") .process(cluster.getSolrClient()); + cluster.waitForActiveCollection(testCollection, 2, 5); prs = PerReplicaStatesFetcher.fetch( DocCollection.getCollectionPath(testCollection), cluster.getZkClient(), null); assertEquals(5, prs.states.size()); + // Test delete replica + Replica leader = c.getReplica((s, replica) -> replica.isLeader()); + CollectionAdminRequest.deleteReplica(testCollection, "shard1", leader.getName()) + .process(cluster.getSolrClient()); + cluster.waitForActiveCollection(testCollection, 2, 4); + prs = + PerReplicaStatesFetcher.fetch( + DocCollection.getCollectionPath(testCollection), cluster.getZkClient(), null); + assertEquals(4, prs.states.size()); + testCollection = "perReplicaState_testv2"; new V2Request.Builder("/collections") .withMethod(POST) @@ -291,12 +303,46 @@ public class PerReplicaStatesIntegrationTest extends SolrCloudTestCase { // Hence 5 * 10 = 70. Take note that +1 for ADD, and +2 for all the UPDATE (remove the old PRS // and add new PRS entry) assertEquals(50, stat.getCversion()); + + CollectionAdminResponse response = + CollectionAdminRequest.addReplicaToShard(PRS_COLL, "shard1") + .process(cluster.getSolrClient()); + cluster.waitForActiveCollection(PRS_COLL, 10, 11); + stat = cluster.getZkClient().exists(DocCollection.getCollectionPath(PRS_COLL), null, true); + // For the new replica: + // +2 for state.json overseer writes, even though there's no longer PRS updates from + // overseer, current code would still do a "TOUCH" on the PRS entry + // +1 for ZkController#preRegister, in ZkController#publish, direct write PRS to down + // +2 for RecoveryStrategy#doRecovery, since this is no longer a new collection, new replica + // will go through recovery, direct write PRS to RECOVERING + // +2 for ZkController#register, in ZkController#publish, direct write PRS to active + assertEquals(57, stat.getCversion()); + + String addedCore = response.getCollectionCoresStatus().entrySet().iterator().next().getKey(); + Replica addedReplica = + cluster + .getZkStateReader() + .getCollection(PRS_COLL) + .getSlice("shard1") + .getReplicas(replica -> addedCore.equals(replica.getCoreName())) + .get(0); + CollectionAdminRequest.deleteReplica(PRS_COLL, "shard1", addedReplica.getName()) + .process(cluster.getSolrClient()); + cluster.waitForActiveCollection(PRS_COLL, 10, 10); + stat = cluster.getZkClient().exists(DocCollection.getCollectionPath(PRS_COLL), null, true); + // For replica deletion + // +1 for ZkController#unregister, which delete the PRS entry from data node + // +2 for state.json overseer writes, even though there's no longer PRS updates from + // overseer, current code would still do a "TOUCH" on the PRS entry + assertEquals(60, stat.getCversion()); + for (JettySolrRunner j : cluster.getJettySolrRunners()) { j.stop(); j.start(true); stat = cluster.getZkClient().exists(DocCollection.getCollectionPath(PRS_COLL), null, true); - // ensure restart does not update the state.json - assertEquals(10, stat.getVersion()); + // ensure restart does not update the state.json, after addReplica/deleteReplica its 10 + 2 + // on state.json version + assertEquals(12, stat.getVersion()); } } finally { cluster.shutdown();
