gerlowskija commented on code in PR #4069:
URL: https://github.com/apache/solr/pull/4069#discussion_r2741757775


##########
solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java:
##########
@@ -182,8 +182,7 @@ void runLeaderProcess(boolean weAreReplacement) throws KeeperException, Interrup
                 .getClusterState()
                 .getCollection(collection)
                 .getSlice(shardId)
-                .getReplicas()
-                .size()
+                .getNumLeaderReplicas()

Review Comment:
   [+1] Good catch, clear logic error here.
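
   To make the distinction concrete, here is a minimal, self-contained sketch rather than Solr code. `ReplicaType` and `Rep` are hypothetical stand-ins, and the sketch assumes `getNumLeaderReplicas()` counts only leader-eligible replicas (my reading of the name, not something visible in this hunk). The `leaderEligible` flag mirrors the one used in the test further down in this PR.

```java
import java.util.List;

public class LeaderReplicaCount {
  // Hypothetical stand-in for Solr's replica types; only the flag matters here.
  enum ReplicaType {
    NRT(true),
    TLOG(true),
    PULL(false);

    final boolean leaderEligible;

    ReplicaType(boolean leaderEligible) {
      this.leaderEligible = leaderEligible;
    }
  }

  // Hypothetical replica: a name plus its type.
  record Rep(String name, ReplicaType type) {}

  // What the old code effectively did: count every replica in the shard.
  static int countAll(List<Rep> replicas) {
    return replicas.size();
  }

  // What the new code is assumed to do: count only replicas that can become leader.
  static int countLeaderEligible(List<Rep> replicas) {
    return (int) replicas.stream().filter(r -> r.type().leaderEligible).count();
  }

  public static void main(String[] args) {
    List<Rep> replicas =
        List.of(
            new Rep("core_n1", ReplicaType.NRT),
            new Rep("core_t2", ReplicaType.TLOG),
            new Rep("core_p3", ReplicaType.PULL));
    System.out.println("all=" + countAll(replicas));
    System.out.println("leaderEligible=" + countLeaderEligible(replicas));
  }
}
```

   With one PULL replica in the shard the two counts differ by one, which is the kind of off-by-N that matters when the count decides whether the only leader candidate is present.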



##########
solr/core/src/test-files/log4j2.xml:
##########
@@ -22,7 +22,7 @@
       <PatternLayout>
         <Pattern>
           %maxLen{%-4r %-5p (%t) [%notEmpty{n:%X{node_name}}%notEmpty{ c:%X{collection}}%notEmpty{ s:%X{shard}}%notEmpty{ r:%X{replica}}%notEmpty{ x:%X{core}}%notEmpty{ t:%X{trace_id}}] %c{1.} %m%notEmpty{
-          =>%ex{short}}}{10240}%n
+          =>%ex}}{10240}%n

Review Comment:
   [Q] If I understand this correctly, this changes how test runs in 'core' log exceptions?

   No problem with changing that, but we have one of these files in each module, and I imagine that if we're updating one of them we should update them all?

   I know your time is limited for the next few weeks and you've taken a lot of pains to pare this PR down.  With your agreement, let's drop this change from the PR; I'll open another PR to make the change across the board for our test files and shepherd it through.



##########
solr/core/src/java/org/apache/solr/cloud/ShardTerms.java:
##########
@@ -124,6 +124,28 @@ private boolean skipIncreaseTermOf(String key, Set<String> replicasNeedingRecove
     return replicasNeedingRecovery.contains(key);
   }
 
+  public ShardTerms setHighestTerms(Set<String> highestTermKeys) {

Review Comment:
   The whole "term" algorithm makes some pretty strict assumptions about who can update term values, and under what conditions.  From the class Javadocs on ZkShardTerms:
   
   ```
    * <p>Terms can only updated in two strict ways:
    *
    * <ul>
    *   <li>A replica sets its term equals to leader's term
    *   <li>The leader increase its term and some other replicas by 1
    * </ul>
   ```
   
   This method seems to fit under the latter provision, which is good.  But could we add Javadocs here to indicate that this method should only be called by current shard leaders?  Or, if this is safe for non-leaders to call in certain situations, add Javadocs describing what those situations are and why.

   Just trying to defend against the possibility of someone coming back through here in a month or two and thinking: "Hey, this doesn't fit with the documented algorithm at all."
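
   As a rough illustration of the kind of Javadoc I mean, here is a self-contained sketch. `TermsSketch` is a hypothetical stand-in for `ShardTerms`, and the method body is only an assumption about the intended semantics (the named cores end up one term above the current maximum); the PR's actual implementation may differ. The Javadoc wording is the point, not the body.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class TermsSketch {
  // Immutable snapshot of core name -> term (hypothetical simplification).
  private final Map<String, Long> values;

  public TermsSketch(Map<String, Long> values) {
    this.values = Map.copyOf(values);
  }

  /**
   * Raises the given cores to a term strictly higher than every other core in the shard, so
   * that all remaining cores are considered out of date.
   *
   * <p>Assumed contract (to be confirmed by the PR author): this must only be called on behalf
   * of the current shard leader, and the leader's own core must be in {@code highestTermKeys}.
   * This keeps the change consistent with the documented rule that only the leader increases
   * its own term together with the terms of some other replicas.
   *
   * @param highestTermKeys core names that should hold the highest term
   * @return a new snapshot; this instance is left unchanged
   */
  public TermsSketch setHighestTerms(Set<String> highestTermKeys) {
    long max = values.values().stream().mapToLong(Long::longValue).max().orElse(0L);
    Map<String, Long> updated = new HashMap<>(values);
    for (String key : highestTermKeys) {
      // Only touch cores that are already registered in this snapshot.
      if (updated.containsKey(key)) {
        updated.put(key, max + 1);
      }
    }
    return new TermsSketch(updated);
  }

  /** Returns the term for the core, or -1 if the core is not registered. */
  public long getTerm(String key) {
    return values.getOrDefault(key, -1L);
  }
}
```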



##########
solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java:
##########
@@ -120,6 +120,11 @@ public void ensureTermsIsHigher(String leader, Set<String> replicasNeedingRecove
     mutate(terms -> terms.increaseTerms(leader, replicasNeedingRecovery));
   }
 
+  public void ensureHighestTerms(Set<String> mostUpToDateCores) {

Review Comment:
   Ditto, re: my previous comment on `ShardTerms.setHighestTerms`.  We should add some Javadocs to make sure this is only called by leaders, or, if it's actually safe to call elsewhere, describe where and why.



##########
solr/core/src/test/org/apache/solr/cloud/ZkShardTermsRecoveryTest.java:
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud;
+
+import static org.hamcrest.Matchers.in;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.SolrQuery;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.embedded.JettySolrRunner;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class ZkShardTermsRecoveryTest extends SolrCloudTestCase {
+  private static final String COLLECTION = "collection1";
+  private static final int NUM_SHARDS = 2;
+  private static final int NUM_REPLICAS = 5;
+  private static int NUM_DOCS = 0;
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    configureCluster(3).addConfig("conf", configset("cloud-minimal")).configure();
+    assertEquals(
+        0,
+        CollectionAdminRequest.createCollection(COLLECTION, "conf", NUM_SHARDS, NUM_REPLICAS)
+            .process(cluster.getSolrClient())
+            .getStatus());
+    cluster.waitForActiveCollection(COLLECTION, 10, TimeUnit.SECONDS, 2, NUM_SHARDS * NUM_REPLICAS);
+  }
+
+  @Before
+  public void waitForActiveState() {
+    cluster.waitForActiveCollection(COLLECTION, 10, TimeUnit.SECONDS, 2, NUM_SHARDS * NUM_REPLICAS);
+  }
+
+  @Test
+  public void testShardTermsInducedReplication() throws Exception {
+    String shard = "shard2";
+    if (random().nextBoolean()) {
+      // Add uncommitted/committed documents, to test that part of the recovery
+      UpdateRequest up = new UpdateRequest();
+      for (int i = 0; i < 1000; i++) {
+        up.add("id", "id2-" + i);
+      }
+      up.process(cluster.getSolrClient(), COLLECTION);
+      NUM_DOCS += 1000;
+      if (random().nextBoolean()) {
+        cluster.getSolrClient().commit(COLLECTION);
+      }
+    }
+
+    DocCollection docCollection = cluster.getZkStateReader().getCollection(COLLECTION);
+    JettySolrRunner jetty = cluster.getRandomJetty(random());
+
+    Slice shard1 = docCollection.getSlice(shard);

Review Comment:
   [0] The variable should probably be named "shard2", given the value of the string variable set on L64?

   I don't really care about the name; just mentioning it in case it's a bug in your test logic.



##########
solr/core/src/test/org/apache/solr/cloud/ZkShardTermsRecoveryTest.java:
##########
@@ -0,0 +1,202 @@
+    Replica leader = shard1.getLeader();
+    Replica replica = shard1.getReplicas(r -> !r.isLeader()).getFirst();
+    List<Replica> recoveryReplicas =
+        shard1.getReplicas(
+            r -> r.getType().leaderEligible && !(r.equals(leader) || r.equals(replica)));
+
+    ZkShardTerms shardTerms =
+        jetty.getCoreContainer().getZkController().getShardTerms(COLLECTION, shard);
+    // Increase the leader and another replica's shardTerms
+    shardTerms.ensureHighestTerms(Set.of(leader.getName(), replica.getName()));
+
+    waitForState(
+        "Waiting for replicas to go into recovery",
+        COLLECTION,
+        5,
+        TimeUnit.SECONDS,
+        state -> {
+          Slice shardState = state.getSlice(shard);
+          for (Replica r : recoveryReplicas) {
+            if (shardState.getReplica(r.name).getState() != Replica.State.RECOVERING) {

Review Comment:
   [Q] Since "recovering" is a transient state and the doc/index size here is very small, is it possible that a replica would go into recovery as expected and finish before `waitForState` ever observes it, depending on when the predicate is evaluated?
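
   One way to make the check less timing-sensitive is a predicate that remembers what it has already seen. The sketch below is self-contained and deliberately avoids Solr types: a cluster-state snapshot is modeled as a hypothetical `Map<String, String>` of replica name to state name, and `allSeenRecovering` is a made-up helper, not an existing API. How it would be wired into `waitForState` is left open.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;

public class SeenRecoveringLatch {
  /**
   * Returns a stateful predicate that becomes true once every expected replica has been
   * observed in the RECOVERING state in at least one snapshot. The replicas do not all have
   * to be RECOVERING in the same snapshot.
   */
  static Predicate<Map<String, String>> allSeenRecovering(Set<String> expected) {
    // Concurrent set, in case snapshots are delivered from more than one thread.
    Set<String> seen = ConcurrentHashMap.newKeySet();
    return snapshot -> {
      for (String name : expected) {
        if ("RECOVERING".equals(snapshot.get(name))) {
          seen.add(name);
        }
      }
      return seen.containsAll(expected);
    };
  }

  public static void main(String[] args) {
    Predicate<Map<String, String>> p = allSeenRecovering(Set.of("r1", "r2"));
    // r1 recovers first, r2 has not started yet.
    System.out.println(p.test(Map.of("r1", "RECOVERING", "r2", "ACTIVE")));
    // r1 has already finished, r2 is recovering now.
    System.out.println(p.test(Map.of("r1", "ACTIVE", "r2", "RECOVERING")));
  }
}
```

   This only helps when each replica is caught in recovery by at least one evaluation; if a recovery can start and finish between two evaluations, asserting on an outcome of the recovery instead would be more robust.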



##########
solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java:
##########
@@ -250,7 +222,7 @@ void runLeaderProcess(boolean weAreReplacement) throws KeeperException, Interrup
             try {
               if (log.isDebugEnabled()) {
                 log.debug(
-                    "{} synched {}",
+                    "{} synced {}",

Review Comment:
   lol


