gerlowskija commented on code in PR #4069:
URL: https://github.com/apache/solr/pull/4069#discussion_r2741757775
##########
solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java:
##########
@@ -182,8 +182,7 @@ void runLeaderProcess(boolean weAreReplacement) throws KeeperException, Interrup
.getClusterState()
.getCollection(collection)
.getSlice(shardId)
- .getReplicas()
- .size()
+ .getNumLeaderReplicas()
Review Comment:
[+1] Good catch, clear logic error here.
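For context, since the fix hinges on it: PULL replicas are never leader-eligible (the test elsewhere in this PR checks the same `r.getType().leaderEligible` flag), so `getReplicas().size()` over-counts relative to the number of replicas that could ever win an election. A toy sketch of the distinction, using a stand-in enum rather than Solr's actual `Replica`/`Replica.Type` classes:

```java
import java.util.List;

public class ReplicaCountSketch {
  // Stand-in for Solr's replica types; only NRT and TLOG replicas can become leader.
  enum Type {
    NRT(true), TLOG(true), PULL(false);
    final boolean leaderEligible;
    Type(boolean leaderEligible) { this.leaderEligible = leaderEligible; }
  }

  // Counts only the replicas that could ever win a leader election.
  static long numLeaderReplicas(List<Type> replicas) {
    return replicas.stream().filter(t -> t.leaderEligible).count();
  }

  public static void main(String[] args) {
    List<Type> shard = List.of(Type.NRT, Type.TLOG, Type.PULL, Type.PULL);
    System.out.println(shard.size());             // 4: every replica in the shard
    System.out.println(numLeaderReplicas(shard)); // 2: only the leader-eligible ones
  }
}
```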
##########
solr/core/src/test-files/log4j2.xml:
##########
@@ -22,7 +22,7 @@
<PatternLayout>
<Pattern>
%maxLen{%-4r %-5p (%t) [%notEmpty{n:%X{node_name}}%notEmpty{ c:%X{collection}}%notEmpty{ s:%X{shard}}%notEmpty{ r:%X{replica}}%notEmpty{ x:%X{core}}%notEmpty{ t:%X{trace_id}}] %c{1.} %m%notEmpty{
- =>%ex{short}}}{10240}%n
+ =>%ex}}{10240}%n
Review Comment:
[Q] If I understand this correctly, this changes how exceptions get logged during test runs in 'core'?
No problem with changing that, but we have one of these files in each module, and I imagine if we're updating one of them we should update them all?
I know your time is limited for the next few weeks and you've taken a lot of pains to pare this PR down. With your agreement, let's drop this change from the PR and I'll open another PR to make the change across the board for our test files and shepherd it through.
##########
solr/core/src/java/org/apache/solr/cloud/ShardTerms.java:
##########
@@ -124,6 +124,28 @@ private boolean skipIncreaseTermOf(String key, Set<String> replicasNeedingRecove
return replicasNeedingRecovery.contains(key);
}
+ public ShardTerms setHighestTerms(Set<String> highestTermKeys) {
Review Comment:
The whole "term" algorithm makes some pretty strict assumptions about who
can update term values, and on what conditions. From class javadocs on
ZkShardTerms:
```
* <p>Terms can only updated in two strict ways:
*
* <ul>
* <li>A replica sets its term equals to leader's term
* <li>The leader increase its term and some other replicas by 1
* </ul>
```
This method seems to fit under the latter provision, which is good. But could we add Javadocs here indicating that this method should only be called by the current shard leader? Or, if it's actually safe for non-leaders to call in certain situations, add javadocs describing what those situations are and why.
Just trying to defend against the possibility of someone coming back through here in a month or two and thinking: "Hey, this doesn't fit with the documented algorithm at all."
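For anyone skimming later, here's a toy model of those two legal updates (illustrative names only, not Solr's actual `ShardTerms` API, which is immutable and ZK-backed):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Toy model of the two legal term updates quoted from ZkShardTerms' javadoc.
public class TermModel {
  private final Map<String, Long> terms = new HashMap<>();

  TermModel(Set<String> replicas) {
    replicas.forEach(r -> terms.put(r, 0L));
  }

  // Rule 1: a recovering replica sets its term equal to the leader's.
  void setTermEqualsToLeader(String replica, String leader) {
    terms.put(replica, terms.get(leader));
  }

  // Rule 2: the leader bumps its own term (and the terms of replicas it
  // knows to be current) past everyone else, leaving stale replicas behind.
  // Only the current shard leader may do this.
  void increaseTerms(String leader, Set<String> upToDate) {
    long max = terms.values().stream().mapToLong(Long::longValue).max().orElse(0L);
    terms.put(leader, max + 1);
    upToDate.forEach(r -> terms.put(r, max + 1));
  }

  long term(String replica) { return terms.get(replica); }
}
```

In this model, any replica whose term falls below the leader's after `increaseTerms` knows it must recover, which is exactly why only the current leader may perform that bump; `setHighestTerms` would want the same documented restriction.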
##########
solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java:
##########
@@ -120,6 +120,11 @@ public void ensureTermsIsHigher(String leader, Set<String> replicasNeedingRecove
mutate(terms -> terms.increaseTerms(leader, replicasNeedingRecovery));
}
+ public void ensureHighestTerms(Set<String> mostUpToDateCores) {
Review Comment:
Ditto, re: my previous comment on `ShardTerms.setHighestTerms`. We should add some Javadocs to make sure this is only called by leaders, or, if it's actually safe to call elsewhere, describe where and why.
##########
solr/core/src/test/org/apache/solr/cloud/ZkShardTermsRecoveryTest.java:
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud;
+
+import static org.hamcrest.Matchers.in;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.SolrQuery;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.embedded.JettySolrRunner;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class ZkShardTermsRecoveryTest extends SolrCloudTestCase {
+ private static final String COLLECTION = "collection1";
+ private static final int NUM_SHARDS = 2;
+ private static final int NUM_REPLICAS = 5;
+ private static int NUM_DOCS = 0;
+
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ configureCluster(3).addConfig("conf", configset("cloud-minimal")).configure();
+ assertEquals(
+ 0,
+ CollectionAdminRequest.createCollection(COLLECTION, "conf", NUM_SHARDS, NUM_REPLICAS)
+ .process(cluster.getSolrClient())
+ .getStatus());
+ cluster.waitForActiveCollection(COLLECTION, 10, TimeUnit.SECONDS, 2, NUM_SHARDS * NUM_REPLICAS);
+ }
+
+ @Before
+ public void waitForActiveState() {
+ cluster.waitForActiveCollection(COLLECTION, 10, TimeUnit.SECONDS, 2, NUM_SHARDS * NUM_REPLICAS);
+ }
+
+ @Test
+ public void testShardTermsInducedReplication() throws Exception {
+ String shard = "shard2";
+ if (random().nextBoolean()) {
+ // Add uncommitted/committed documents, to test that part of the recovery
+ UpdateRequest up = new UpdateRequest();
+ for (int i = 0; i < 1000; i++) {
+ up.add("id", "id2-" + i);
+ }
+ up.process(cluster.getSolrClient(), COLLECTION);
+ NUM_DOCS += 1000;
+ if (random().nextBoolean()) {
+ cluster.getSolrClient().commit(COLLECTION);
+ }
+ }
+
+ DocCollection docCollection = cluster.getZkStateReader().getCollection(COLLECTION);
+ JettySolrRunner jetty = cluster.getRandomJetty(random());
+
+ Slice shard1 = docCollection.getSlice(shard);
Review Comment:
[0] The variable should probably be named "shard2", given the value of the string it's assigned on L64?
I don't really care about the name, just mentioning it in case it's a bug in your test logic.
##########
solr/core/src/test/org/apache/solr/cloud/ZkShardTermsRecoveryTest.java:
##########
@@ -0,0 +1,202 @@
+ Slice shard1 = docCollection.getSlice(shard);
+ Replica leader = shard1.getLeader();
+ Replica replica = shard1.getReplicas(r -> !r.isLeader()).getFirst();
+ List<Replica> recoveryReplicas =
+ shard1.getReplicas(
+ r -> r.getType().leaderEligible && !(r.equals(leader) || r.equals(replica)));
+
+ ZkShardTerms shardTerms =
+ jetty.getCoreContainer().getZkController().getShardTerms(COLLECTION, shard);
+ // Increase the leader and another replica's shardTerms
+ shardTerms.ensureHighestTerms(Set.of(leader.getName(), replica.getName()));
+
+ waitForState(
+ "Waiting for replicas to go into recovery",
+ COLLECTION,
+ 5,
+ TimeUnit.SECONDS,
+ state -> {
+ Slice shardState = state.getSlice(shard);
+ for (Replica r : recoveryReplicas) {
+ if (shardState.getReplica(r.name).getState() != Replica.State.RECOVERING) {
Review Comment:
[Q] Since "recovering" is a transient state and the doc/index size here is
very small, is it possible that the replica would go into recovery as expected
and `waitForState` would just miss it based on when it polls?
##########
solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java:
##########
@@ -250,7 +222,7 @@ void runLeaderProcess(boolean weAreReplacement) throws KeeperException, Interrup
try {
if (log.isDebugEnabled()) {
log.debug(
- "{} synched {}",
+ "{} synced {}",
Review Comment:
lol
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]