Can we please add proper javadoc and the @lucene.internal tag when making methods public like this, especially in classes such as IndexWriter?
On Tue, Dec 15, 2020 at 5:04 PM <[email protected]> wrote: > > This is an automated email from the ASF dual-hosted git repository. > > mdrob pushed a commit to branch branch_8x > in repository https://gitbox.apache.org/repos/asf/lucene-solr.git > > > The following commit(s) were added to refs/heads/branch_8x by this push: > new b090971 SOLR-15029 Trigger leader election on index writer tragedy > b090971 is described below > > commit b090971259f57973941d70d13612e22985a09a8d > Author: Mike Drob <[email protected]> > AuthorDate: Fri Dec 4 15:19:49 2020 -0800 > > SOLR-15029 Trigger leader election on index writer tragedy > > SOLR-13027 Use TestInjection so that we always have a Tragic Event > > When we encounter a tragic error in the index writer, we can trigger a > leader election instead of queing up a delete and re-add of the node in > question. This should result in a more graceful transition, and the > previous leader will eventually be put into recovery by a new leader. > > Backport removes additional logging from ShardTerms.save because we do > not have StackWalker in Java 8. 
> --- > .../java/org/apache/lucene/index/IndexWriter.java | 7 +- > .../java/org/apache/solr/cloud/LeaderElector.java | 11 +- > .../org/apache/solr/cloud/RecoveryStrategy.java | 157 +++++++----------- > .../solr/cloud/ShardLeaderElectionContext.java | 8 +- > .../java/org/apache/solr/cloud/ZkController.java | 83 ++++++---- > .../java/org/apache/solr/cloud/ZkShardTerms.java | 76 ++++----- > .../java/org/apache/solr/core/CoreContainer.java | 10 +- > .../apache/solr/handler/RequestHandlerBase.java | 2 + > .../solr/handler/admin/CollectionsHandler.java | 1 + > .../solr/handler/admin/RebalanceLeaders.java | 2 +- > .../org/apache/solr/servlet/ResponseUtils.java | 2 +- > .../java/org/apache/solr/util/TestInjection.java | 39 +++++ > .../apache/solr/cloud/LeaderTragicEventTest.java | 177 > +++++++++------------ > .../test/org/apache/solr/cloud/ShardTermsTest.java | 48 ++++++ > .../org/apache/solr/cloud/ZkShardTermsTest.java | 8 - > .../apache/solr/client/solrj/cloud/ShardTerms.java | 19 ++- > 16 files changed, 338 insertions(+), 312 deletions(-) > > diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java > b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java > index 06ab80e..5e3182a 100644 > --- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java > +++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java > @@ -5172,11 +5172,12 @@ public class IndexWriter implements Closeable, > TwoPhaseCommit, Accountable, > } > > /** > - * This method should be called on a tragic event ie. if a downstream > class of the writer > + * <p>This method should be called on a tragic event ie. if a downstream > class of the writer > * hits an unrecoverable exception. This method does not rethrow the > tragic event exception. 
> - * Note: This method will not close the writer but can be called from any > location without respecting any lock order > + * <p>Note: This method will not close the writer but can be called from > any location without respecting any lock order > + * <p>This method is visible for testing, and is not expected to be called > by client code > */ > - private void onTragicEvent(Throwable tragedy, String location) { > + public void onTragicEvent(Throwable tragedy, String location) { > // This is not supposed to be tragic: IW is supposed to catch this and > // ignore, because it means we asked the merge to abort: > assert tragedy instanceof MergePolicy.MergeAbortedException == false; > diff --git a/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java > b/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java > index f50aa11..e55ce2b 100644 > --- a/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java > +++ b/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java > @@ -18,10 +18,11 @@ package org.apache.solr.cloud; > > import java.io.IOException; > import java.lang.invoke.MethodHandles; > -import java.util.Collections; > +import java.util.Comparator; > import java.util.Iterator; > import java.util.List; > import java.util.Map; > +import java.util.function.Function; > import java.util.regex.Matcher; > import java.util.regex.Pattern; > > @@ -42,7 +43,7 @@ import org.slf4j.LoggerFactory; > > /** > * Leader Election process. This class contains the logic by which a > - * leader is chosen. First call * {@link #setup(ElectionContext)} to ensure > + * leader is chosen. First call {@link #setup(ElectionContext)} to ensure > * the election process is init'd. Next call > * {@link #joinElection(ElectionContext, boolean)} to start the leader > election. 
> * > @@ -166,7 +167,6 @@ public class LeaderElector { > } > } > > - // TODO: get this core param out of here > protected void runIamLeaderProcess(final ElectionContext context, boolean > weAreReplacement) throws KeeperException, > InterruptedException, IOException { > context.runLeaderProcess(weAreReplacement,0); > @@ -378,10 +378,7 @@ public class LeaderElector { > * Sort n string sequence list. > */ > public static void sortSeqs(List<String> seqs) { > - Collections.sort(seqs, (o1, o2) -> { > - int i = getSeq(o1) - getSeq(o2); > - return i == 0 ? o1.compareTo(o2) : i; > - }); > + > seqs.sort(Comparator.comparingInt(LeaderElector::getSeq).thenComparing(Function.identity())); > } > > void retryElection(ElectionContext context, boolean joinAtHead) throws > KeeperException, InterruptedException, IOException { > diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java > b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java > index f70276c..1366970 100644 > --- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java > +++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java > @@ -197,9 +197,8 @@ public class RecoveryStrategy implements Runnable, > Closeable { > log.warn("Stopping recovery for core=[{}] coreNodeName=[{}]", coreName, > coreZkNodeName); > } > > - final private void recoveryFailed(final SolrCore core, > - final ZkController zkController, final String baseUrl, > - final String shardZkNodeName, final CoreDescriptor cd) throws > Exception { > + final private void recoveryFailed(final ZkController zkController, > + final CoreDescriptor cd) throws > Exception { > SolrException.log(log, "Recovery failed - I give up."); > try { > zkController.publish(cd, Replica.State.RECOVERY_FAILED); > @@ -423,63 +422,64 @@ public class RecoveryStrategy implements Runnable, > Closeable { > } > > if (!successfulRecovery) { > - // lets pause for a moment and we need to try again... 
> - // TODO: we don't want to retry for some problems? > - // Or do a fall off retry... > - try { > + if (waitBetweenRecoveries(core.getName())) break; > + } > + } > + // We skip core.seedVersionBuckets(); We don't have a transaction log > + log.info("Finished recovery process, successful=[{}]", > successfulRecovery); > + } > > - if (isClosed()) { > - if (log.isInfoEnabled()) { > - log.info("Recovery for core {} has been closed", > core.getName()); > - } > - break; > - } > + /** > + * @return true if we have reached max attempts or should stop recovering > for some other reason > + */ > + private boolean waitBetweenRecoveries(String coreName) { > + // lets pause for a moment and we need to try again... > + // TODO: we don't want to retry for some problems? > + // Or do a fall off retry... > + try { > + if (isClosed()) { > + log.info("Recovery for core {} has been closed", coreName); > + return true; > + } > > - log.error("Recovery failed - trying again... ({})", retries); > + log.error("Recovery failed - trying again... 
({})", retries); > > - retries++; > - if (retries >= maxRetries) { > - SolrException.log(log, "Recovery failed - max retries exceeded > (" + retries + ")."); > - try { > - recoveryFailed(core, zkController, baseUrl, coreZkNodeName, > this.coreDescriptor); > - } catch (Exception e) { > - SolrException.log(log, "Could not publish that recovery > failed", e); > - } > - break; > - } > + retries++; > + if (retries >= maxRetries) { > + SolrException.log(log, "Recovery failed - max retries exceeded (" + > retries + ")."); > + try { > + recoveryFailed(zkController, this.coreDescriptor); > } catch (Exception e) { > - SolrException.log(log, "An error has occurred during recovery", e); > + SolrException.log(log, "Could not publish that recovery failed", > e); > } > + return true; > + } > + } catch (Exception e) { > + SolrException.log(log, "An error has occurred during recovery", e); > + } > > - try { > - // Wait an exponential interval between retries, start at 5 > seconds and work up to a minute. > - // If we're at attempt >= 4, there's no point computing pow(2, > retries) because the result > - // will always be the minimum of the two (12). Since we sleep at 5 > seconds sub-intervals in > - // order to check if we were closed, 12 is chosen as the maximum > loopCount (5s * 12 = 1m). > - int loopCount = retries < 4 ? (int) Math.min(Math.pow(2, retries), > 12) : 12; > - if (log.isInfoEnabled()) { > - log.info("Wait [{}] seconds before trying to recover again > (attempt={})", > + try { > + // Wait an exponential interval between retries, start at 4 seconds > and work up to a minute. > + // Meanwhile we will check in 2s sub-intervals to see if we've been > closed > + // Maximum loop count is 30 because we never want to wait longer than > a minute (2s * 30 = 1m) > + int loopCount = retries < 5 ? 
(int) Math.pow(2, retries) : 30; > + if (log.isInfoEnabled()) { > + log.info("Wait [{}] seconds before trying to recover again > (attempt={})", > TimeUnit.MILLISECONDS.toSeconds(loopCount * > startingRecoveryDelayMilliSeconds), retries); > - } > - for (int i = 0; i < loopCount; i++) { > - if (isClosed()) { > - if (log.isInfoEnabled()) { > - log.info("Recovery for core {} has been closed", > core.getName()); > - } > - break; // check if someone closed us > - } > - Thread.sleep(startingRecoveryDelayMilliSeconds); > - } > - } catch (InterruptedException e) { > - Thread.currentThread().interrupt(); > - log.warn("Recovery was interrupted.", e); > - close = true; > + } > + for (int i = 0; i < loopCount; i++) { > + if (isClosed()) { > + log.info("Recovery for core {} has been closed", coreName); > + break; // check if someone closed us > } > + Thread.sleep(startingRecoveryDelayMilliSeconds); > } > - > + } catch (InterruptedException e) { > + Thread.currentThread().interrupt(); > + log.warn("Recovery was interrupted.", e); > + close = true; > } > - // We skip core.seedVersionBuckets(); We don't have a transaction log > - log.info("Finished recovery process, successful=[{}]", > successfulRecovery); > + return false; > } > > // TODO: perhaps make this grab a new core each time through the loop to > handle core reloads? 
> @@ -490,8 +490,8 @@ public class RecoveryStrategy implements Runnable, > Closeable { > ulog = core.getUpdateHandler().getUpdateLog(); > if (ulog == null) { > SolrException.log(log, "No UpdateLog found - cannot recover."); > - recoveryFailed(core, zkController, baseUrl, coreZkNodeName, > - this.coreDescriptor); > + recoveryFailed(zkController, > + this.coreDescriptor); > return; > } > > @@ -709,7 +709,7 @@ public class RecoveryStrategy implements Runnable, > Closeable { > } > zkController.publish(this.coreDescriptor, Replica.State.ACTIVE); > } catch (Exception e) { > - log.error("Could not publish as ACTIVE after succesful > recovery", e); > + log.error("Could not publish as ACTIVE after successful > recovery", e); > successfulRecovery = false; > } > > @@ -721,53 +721,8 @@ public class RecoveryStrategy implements Runnable, > Closeable { > } > > if (!successfulRecovery) { > - // lets pause for a moment and we need to try again... > - // TODO: we don't want to retry for some problems? > - // Or do a fall off retry... > - try { > - > - if (isClosed()) { > - log.info("RecoveryStrategy has been closed"); > - break; > - } > - > - log.error("Recovery failed - trying again... ({})", retries); > - > - retries++; > - if (retries >= maxRetries) { > - SolrException.log(log, "Recovery failed - max retries exceeded > (" + retries + ")."); > - try { > - recoveryFailed(core, zkController, baseUrl, coreZkNodeName, > this.coreDescriptor); > - } catch (Exception e) { > - SolrException.log(log, "Could not publish that recovery > failed", e); > - } > - break; > - } > - } catch (Exception e) { > - SolrException.log(log, "An error has occurred during recovery", e); > - } > - > - try { > - // Wait an exponential interval between retries, start at 2 > seconds and work up to a minute. > - // Since we sleep at 2 seconds sub-intervals in > - // order to check if we were closed, 30 is chosen as the maximum > loopCount (2s * 30 = 1m). 
> - double loopCount = Math.min(Math.pow(2, retries - 1), 30); > - log.info("Wait [{}] seconds before trying to recover again > (attempt={})", > - loopCount * startingRecoveryDelayMilliSeconds, retries); > - for (int i = 0; i < loopCount; i++) { > - if (isClosed()) { > - log.info("RecoveryStrategy has been closed"); > - break; // check if someone closed us > - } > - Thread.sleep(startingRecoveryDelayMilliSeconds); > - } > - } catch (InterruptedException e) { > - Thread.currentThread().interrupt(); > - log.warn("Recovery was interrupted.", e); > - close = true; > - } > + if (waitBetweenRecoveries(core.getName())) break; > } > - > } > > // if replay was skipped (possibly to due pulling a full index from the > leader), > @@ -780,6 +735,10 @@ public class RecoveryStrategy implements Runnable, > Closeable { > log.info("Finished recovery process, successful=[{}]", > successfulRecovery); > } > > + /** > + * Make sure we can connect to the shard leader as currently defined in ZK > + * @param ourUrl if the leader url is the same as our url, we will skip > trying to connect > + */ > private final Replica pingLeader(String ourUrl, CoreDescriptor coreDesc, > boolean mayPutReplicaAsDown) > throws Exception { > int numTried = 0; > diff --git > a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java > b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java > index 69ccf75..68b062e 100644 > --- a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java > +++ b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java > @@ -46,7 +46,6 @@ import > org.apache.zookeeper.KeeperException.SessionExpiredException; > import org.slf4j.Logger; > import org.slf4j.LoggerFactory; > > -// add core container and stop passing core around... 
> final class ShardLeaderElectionContext extends > ShardLeaderElectionContextBase { > private static final Logger log = > LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); > > @@ -120,11 +119,10 @@ final class ShardLeaderElectionContext extends > ShardLeaderElectionContextBase { > > zkController.getOverseer().getStateUpdateQueue().offer(Utils.toJSON(m)); > } > > - boolean allReplicasInLine = false; > if (!weAreReplacement) { > - allReplicasInLine = waitForReplicasToComeUp(leaderVoteWait); > + waitForReplicasToComeUp(leaderVoteWait); > } else { > - allReplicasInLine = areAllReplicasParticipating(); > + areAllReplicasParticipating(); > } > > if (isClosed) { > @@ -237,7 +235,6 @@ final class ShardLeaderElectionContext extends > ShardLeaderElectionContextBase { > > } > > - boolean isLeader = true; > if (!isClosed) { > try { > if (replicaType == Replica.Type.TLOG) { > @@ -281,7 +278,6 @@ final class ShardLeaderElectionContext extends > ShardLeaderElectionContextBase { > throw new SolrException(ErrorCode.SERVER_ERROR, > "ZK session expired - cancelling election for " + collection + > " " + shardId); > } catch (Exception e) { > - isLeader = false; > SolrException.log(log, "There was a problem trying to register as > the leader", e); > > try (SolrCore core = cc.getCore(coreName)) { > diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java > b/solr/core/src/java/org/apache/solr/cloud/ZkController.java > index d71adb1..6cc2afe 100644 > --- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java > +++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java > @@ -64,11 +64,33 @@ import org.apache.solr.cloud.overseer.SliceMutator; > import org.apache.solr.common.AlreadyClosedException; > import org.apache.solr.common.SolrException; > import org.apache.solr.common.SolrException.ErrorCode; > -import org.apache.solr.common.cloud.*; > +import org.apache.solr.common.cloud.BeforeReconnect; > +import org.apache.solr.common.cloud.ClusterState; > 
+import org.apache.solr.common.cloud.ConnectionManager; > +import org.apache.solr.common.cloud.DefaultConnectionStrategy; > +import org.apache.solr.common.cloud.DefaultZkACLProvider; > +import org.apache.solr.common.cloud.DefaultZkCredentialsProvider; > +import org.apache.solr.common.cloud.DocCollection; > +import org.apache.solr.common.cloud.DocCollectionWatcher; > +import org.apache.solr.common.cloud.LiveNodesListener; > +import org.apache.solr.common.cloud.NodesSysPropsCacher; > +import org.apache.solr.common.cloud.OnReconnect; > +import org.apache.solr.common.cloud.Replica; > import org.apache.solr.common.cloud.Replica.Type; > +import org.apache.solr.common.cloud.Slice; > +import org.apache.solr.common.cloud.SolrZkClient; > +import org.apache.solr.common.cloud.UrlScheme; > +import org.apache.solr.common.cloud.ZkACLProvider; > +import org.apache.solr.common.cloud.ZkCmdExecutor; > +import org.apache.solr.common.cloud.ZkConfigManager; > +import org.apache.solr.common.cloud.ZkCoreNodeProps; > +import org.apache.solr.common.cloud.ZkCredentialsProvider; > +import org.apache.solr.common.cloud.ZkMaintenanceUtils; > +import org.apache.solr.common.cloud.ZkNodeProps; > +import org.apache.solr.common.cloud.ZkStateReader; > +import org.apache.solr.common.cloud.ZooKeeperException; > import org.apache.solr.common.params.CollectionParams; > import org.apache.solr.common.params.CommonParams; > -import org.apache.solr.common.params.CoreAdminParams; > import org.apache.solr.common.params.SolrParams; > import org.apache.solr.common.util.ExecutorUtil; > import org.apache.solr.common.util.IOUtils; > @@ -109,7 +131,6 @@ import static > org.apache.solr.common.cloud.ZkStateReader.CORE_NODE_NAME_PROP; > import static org.apache.solr.common.cloud.ZkStateReader.ELECTION_NODE_PROP; > import static org.apache.solr.common.cloud.ZkStateReader.NODE_NAME_PROP; > import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP; > -import static 
org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP; > import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP; > > /** > @@ -174,6 +195,11 @@ public class ZkController implements Closeable { > } else if (!coreNodeName.equals(other.coreNodeName)) return false; > return true; > } > + > + @Override > + public String toString() { > + return collection + ':' + coreNodeName; > + } > } > > private final Map<ContextKey, ElectionContext> electionContexts = > Collections.synchronizedMap(new HashMap<>()); > @@ -652,55 +678,46 @@ public class ZkController implements Closeable { > /** > * Best effort to give up the leadership of a shard in a core after > hitting a tragic exception > * @param cd The current core descriptor > - * @param tragicException The tragic exception from the {@code IndexWriter} > */ > - public void giveupLeadership(CoreDescriptor cd, Throwable tragicException) > { > - assert tragicException != null; > + public void giveupLeadership(CoreDescriptor cd) { > assert cd != null; > - DocCollection dc = > getClusterState().getCollectionOrNull(cd.getCollectionName()); > + > + String collection = cd.getCollectionName(); > + if (collection == null) return; > + > + DocCollection dc = getClusterState().getCollectionOrNull(collection); > if (dc == null) return; > > Slice shard = dc.getSlice(cd.getCloudDescriptor().getShardId()); > if (shard == null) return; > > // if this replica is not a leader, it will be put in recovery state by > the leader > - if (shard.getReplica(cd.getCloudDescriptor().getCoreNodeName()) != > shard.getLeader()) return; > + String leader = cd.getCloudDescriptor().getCoreNodeName(); > + if (shard.getReplica(leader) != shard.getLeader()) return; > > + Set<String> liveNodes = getClusterState().getLiveNodes(); > int numActiveReplicas = shard.getReplicas( > rep -> rep.getState() == Replica.State.ACTIVE > && rep.getType() != Type.PULL > - && getClusterState().getLiveNodes().contains(rep.getNodeName()) > + && 
liveNodes.contains(rep.getNodeName()) > ).size(); > > // at least the leader still be able to search, we should give up > leadership if other replicas can take over > if (numActiveReplicas >= 2) { > - String key = cd.getCollectionName() + ":" + > cd.getCloudDescriptor().getCoreNodeName(); > - //TODO better handling the case when delete replica was failed > - if (replicasMetTragicEvent.putIfAbsent(key, tragicException) == null) { > - log.warn("Leader {} met tragic exception, give up its leadership", > key, tragicException); > + ContextKey key = new ContextKey(collection, leader); > + ElectionContext context = electionContexts.get(key); > + if (context instanceof ShardLeaderElectionContextBase) { > + LeaderElector elector = ((ShardLeaderElectionContextBase) > context).getLeaderElector(); > try { > - // by using Overseer to remove and add replica back, we can do the > task in an async/robust manner > - Map<String,Object> props = new HashMap<>(); > - props.put(Overseer.QUEUE_OPERATION, "deletereplica"); > - props.put(COLLECTION_PROP, cd.getCollectionName()); > - props.put(SHARD_ID_PROP, shard.getName()); > - props.put(REPLICA_PROP, cd.getCloudDescriptor().getCoreNodeName()); > - getOverseerCollectionQueue().offer(Utils.toJSON(new > ZkNodeProps(props))); > - > - props.clear(); > - props.put(Overseer.QUEUE_OPERATION, "addreplica"); > - props.put(COLLECTION_PROP, cd.getCollectionName()); > - props.put(SHARD_ID_PROP, shard.getName()); > - props.put(ZkStateReader.REPLICA_TYPE, > cd.getCloudDescriptor().getReplicaType().name().toUpperCase(Locale.ROOT)); > - props.put(CoreAdminParams.NODE, getNodeName()); > - getOverseerCollectionQueue().offer(Utils.toJSON(new > ZkNodeProps(props))); > - } catch (Exception e) { > - // Exceptions are not bubbled up. 
giveupLeadership is best effort, > and is only called in case of some other > - // unrecoverable error happened > - log.error("Met exception on give up leadership for {}", key, e); > - replicasMetTragicEvent.remove(key); > + log.warn("Leader {} met tragic exception, give up its leadership", > key); > + elector.retryElection(context, false); > + } catch (KeeperException | InterruptedException | IOException e) { > SolrZkClient.checkInterrupted(e); > + log.error("Met exception on give up leadership for {}", key, e); > } > + } else { > + // The node is probably already gone > + log.warn("Could not get election context {} to give up leadership", > key); > } > } > } > diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java > b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java > index cc33205..6c38797 100644 > --- a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java > +++ b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java > @@ -25,6 +25,7 @@ import java.util.Set; > import java.util.concurrent.TimeoutException; > import java.util.concurrent.atomic.AtomicBoolean; > import java.util.concurrent.atomic.AtomicReference; > +import java.util.function.Function; > > import org.apache.solr.client.solrj.cloud.ShardTerms; > import org.apache.solr.common.SolrException; > @@ -73,7 +74,7 @@ public class ZkShardTerms implements AutoCloseable{ > private final Set<CoreTermWatcher> listeners = new HashSet<>(); > private final AtomicBoolean isClosed = new AtomicBoolean(false); > > - private AtomicReference<ShardTerms> terms = new AtomicReference<>(); > + private final AtomicReference<ShardTerms> terms = new AtomicReference<>(); > > /** > * Listener of a core for shard's term change events > @@ -106,17 +107,15 @@ public class ZkShardTerms implements AutoCloseable{ > } > > /** > - * Ensure that leader's term is higher than some replica's terms > + * Ensure that terms are higher than some replica's terms. 
If the current > leader is attempting to give up > + * leadership and included in replicasNeedingRecovery, then other replicas > that are in sync will have higher > + * terms, while the leader will stay where it is. > * @param leader coreNodeName of leader > * @param replicasNeedingRecovery set of replicas in which their terms > should be lower than leader's term > */ > public void ensureTermsIsHigher(String leader, Set<String> > replicasNeedingRecovery) { > if (replicasNeedingRecovery.isEmpty()) return; > - > - ShardTerms newTerms; > - while( (newTerms = terms.get().increaseTerms(leader, > replicasNeedingRecovery)) != null) { > - if (forceSaveTerms(newTerms)) return; > - } > + mutate(terms -> terms.increaseTerms(leader, replicasNeedingRecovery)); > } > > public ShardTerms getShardTerms() { > @@ -206,10 +205,7 @@ public class ZkShardTerms implements AutoCloseable{ > * @param coreNodeName of the replica > */ > void registerTerm(String coreNodeName) { > - ShardTerms newTerms; > - while ( (newTerms = terms.get().registerTerm(coreNodeName)) != null) { > - if (forceSaveTerms(newTerms)) break; > - } > + mutate(terms -> terms.registerTerm(coreNodeName)); > } > > /** > @@ -218,37 +214,29 @@ public class ZkShardTerms implements AutoCloseable{ > * @param coreNodeName of the replica > */ > public void setTermEqualsToLeader(String coreNodeName) { > - ShardTerms newTerms; > - while ( (newTerms = terms.get().setTermEqualsToLeader(coreNodeName)) != > null) { > - if (forceSaveTerms(newTerms)) break; > - } > + mutate(terms -> terms.setTermEqualsToLeader(coreNodeName)); > } > > + /** > + * Set a replica's term to 0. If the term does not exist, create it. 
> + * @param coreNodeName of the replica > + */ > public void setTermToZero(String coreNodeName) { > - ShardTerms newTerms; > - while ( (newTerms = terms.get().setTermToZero(coreNodeName)) != null) { > - if (forceSaveTerms(newTerms)) break; > - } > + mutate(terms -> terms.setTermToZero(coreNodeName)); > } > > /** > * Mark {@code coreNodeName} as recovering > */ > public void startRecovering(String coreNodeName) { > - ShardTerms newTerms; > - while ( (newTerms = terms.get().startRecovering(coreNodeName)) != null) { > - if (forceSaveTerms(newTerms)) break; > - } > + mutate(terms -> terms.startRecovering(coreNodeName)); > } > > /** > * Mark {@code coreNodeName} as finished recovering > */ > public void doneRecovering(String coreNodeName) { > - ShardTerms newTerms; > - while ( (newTerms = terms.get().doneRecovering(coreNodeName)) != null) { > - if (forceSaveTerms(newTerms)) break; > - } > + mutate(terms -> terms.doneRecovering(coreNodeName)); > } > > public boolean isRecovering(String name) { > @@ -260,8 +248,17 @@ public class ZkShardTerms implements AutoCloseable{ > * so we must switch from term 0 (registered) to 1 (have some data) > */ > public void ensureHighestTermsAreNotZero() { > + mutate(ShardTerms::ensureHighestTermsAreNotZero); > + } > + > + /** > + * Attempt to apply an action and save the results, retrying as necessary. > + * If action returns null, then we are done and will not make additional > retires. 
> + * @param action The mutation to apply to current shard terms before saving > + */ > + private void mutate(Function<ShardTerms, ShardTerms> action) { > ShardTerms newTerms; > - while ( (newTerms = terms.get().ensureHighestTermsAreNotZero()) != null) > { > + while ((newTerms = action.apply(terms.get())) != null) { > if (forceSaveTerms(newTerms)) break; > } > } > @@ -315,7 +312,8 @@ public class ZkShardTerms implements AutoCloseable{ > refreshTerms(); > } catch (KeeperException.NoNodeException e) { > throw e; > - } catch (Exception e) { > + } catch (RuntimeException | KeeperException | InterruptedException e) { > + SolrZkClient.checkInterrupted(e); > throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error > while saving shard term for collection: " + collection, e); > } > return false; > @@ -336,13 +334,10 @@ public class ZkShardTerms implements AutoCloseable{ > // it's okay if another beats us creating the node > } > > - } catch (InterruptedException e) { > - Thread.interrupted(); > + } catch (KeeperException | InterruptedException e) { > + Throwable cause = SolrZkClient.checkInterrupted(e); > throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, > - "Error creating shard term node in Zookeeper for collection: " + > collection, e); > - } catch (KeeperException e) { > - throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, > - "Error creating shard term node in Zookeeper for collection: " + > collection, e); > + "Error creating shard term node in Zookeeper for collection: " + > collection, cause); > } > } > > @@ -356,11 +351,10 @@ public class ZkShardTerms implements AutoCloseable{ > Stat stat = new Stat(); > byte[] data = zkClient.getData(znodePath, null, stat, true); > newTerms = new ShardTerms((Map<String, Long>) Utils.fromJSON(data), > stat.getVersion()); > - } catch (KeeperException e) { > - Thread.interrupted(); > - throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error > updating shard term for collection: " + 
collection, e); > - } catch (InterruptedException e) { > - throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error > updating shard term for collection: " + collection, e); > + } catch (KeeperException | InterruptedException e) { > + Throwable cause = SolrZkClient.checkInterrupted(e); > + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, > + "Error updating shard term for collection: " + collection, cause); > } > > setNewTerms(newTerms); > @@ -408,7 +402,7 @@ public class ZkShardTerms implements AutoCloseable{ > // exists operation is faster than getData operation > zkClient.exists(znodePath, watcher, true); > } catch (InterruptedException e) { > - Thread.interrupted(); > + Thread.currentThread().interrupt(); > throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error > watching shard term for collection: " + collection, e); > } > } > diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java > b/solr/core/src/java/org/apache/solr/core/CoreContainer.java > index c3fb644..6022f83 100644 > --- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java > +++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java > @@ -2207,7 +2207,15 @@ public class CoreContainer { > } > > if (tragicException != null && isZooKeeperAware()) { > - getZkController().giveupLeadership(solrCore.getCoreDescriptor(), > tragicException); > + getZkController().giveupLeadership(solrCore.getCoreDescriptor()); > + > + try { > + // If the error was something like a full file system disconnect, > this probably won't help > + // But if it is a transient disk failure then it's worth a try > + solrCore.getSolrCoreState().newIndexWriter(solrCore, false); // > should we rollback? 
> + } catch (IOException e) { > + log.warn("Could not roll index writer after tragedy"); > + } > } > > return tragicException != null; > diff --git > a/solr/core/src/java/org/apache/solr/handler/RequestHandlerBase.java > b/solr/core/src/java/org/apache/solr/handler/RequestHandlerBase.java > index c2dbd0e..080a696 100644 > --- a/solr/core/src/java/org/apache/solr/handler/RequestHandlerBase.java > +++ b/solr/core/src/java/org/apache/solr/handler/RequestHandlerBase.java > @@ -46,6 +46,7 @@ import org.apache.solr.request.SolrRequestHandler; > import org.apache.solr.response.SolrQueryResponse; > import org.apache.solr.search.SyntaxError; > import org.apache.solr.util.SolrPluginUtils; > +import org.apache.solr.util.TestInjection; > import org.slf4j.Logger; > import org.slf4j.LoggerFactory; > > @@ -206,6 +207,7 @@ public abstract class RequestHandlerBase implements > SolrRequestHandler, SolrInfo > @SuppressWarnings("resource") > Timer.Context dTimer = distrib ? distribRequestTimes.time() : > localRequestTimes.time(); > try { > + TestInjection.injectLeaderTragedy(req.getCore()); > if (pluginInfo != null && pluginInfo.attributes.containsKey(USEPARAM)) > req.getContext().put(USEPARAM, pluginInfo.attributes.get(USEPARAM)); > SolrPluginUtils.setDefaults(this, req, defaults, appends, invariants); > diff --git > a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java > b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java > index f4fb7e0..0832126 100644 > --- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java > +++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java > @@ -1370,6 +1370,7 @@ public class CollectionsHandler extends > RequestHandlerBase implements Permission > //TODO only increase terms of replicas less out-of-sync > liveReplicas.stream() > .filter(rep -> zkShardTerms.registered(rep.getName())) > + // TODO should this all be done at once instead of increasing > each replica 
individually? > .forEach(rep -> > zkShardTerms.setTermEqualsToLeader(rep.getName())); > } > > diff --git > a/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java > b/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java > index 8f3fdb2..239fbec 100644 > --- a/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java > +++ b/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java > @@ -318,7 +318,7 @@ class RebalanceLeaders { > > // Put the replica in at the head of the queue and send all nodes with the > same sequence number to the back of the list > // There can be "ties", i.e. replicas in the queue with the same sequence > number. Sorting doesn't necessarily sort > - // the one we most care about first. So put the node we _don't care about > at the end of the election queuel > + // the one we most care about first. So put the node we _don't care about > at the end of the election queue_ > > void makeReplicaFirstWatcher(Slice slice, Replica replica) > throws KeeperException, InterruptedException { > diff --git a/solr/core/src/java/org/apache/solr/servlet/ResponseUtils.java > b/solr/core/src/java/org/apache/solr/servlet/ResponseUtils.java > index 8a5f2eb..b7df6fe 100644 > --- a/solr/core/src/java/org/apache/solr/servlet/ResponseUtils.java > +++ b/solr/core/src/java/org/apache/solr/servlet/ResponseUtils.java > @@ -69,7 +69,7 @@ public class ResponseUtils { > if (code == 500 || code < 100) { > StringWriter sw = new StringWriter(); > ex.printStackTrace(new PrintWriter(sw)); > - SolrException.log(log, null, ex); > + SolrException.log(log, ex); > info.add("trace", sw.toString()); > > // non standard codes have undefined results with various servers > diff --git a/solr/core/src/java/org/apache/solr/util/TestInjection.java > b/solr/core/src/java/org/apache/solr/util/TestInjection.java > index 3ae2349..edee292 100644 > --- a/solr/core/src/java/org/apache/solr/util/TestInjection.java > +++ 
b/solr/core/src/java/org/apache/solr/util/TestInjection.java > @@ -16,6 +16,7 @@ > */ > package org.apache.solr.util; > > +import java.io.IOException; > import java.lang.invoke.MethodHandles; > import java.lang.reflect.Method; > import java.util.Collections; > @@ -31,11 +32,13 @@ import java.util.concurrent.atomic.AtomicInteger; > import java.util.regex.Matcher; > import java.util.regex.Pattern; > > +import org.apache.lucene.index.IndexWriter; > import org.apache.solr.common.NonExistentCoreException; > import org.apache.solr.common.SolrException; > import org.apache.solr.common.SolrException.ErrorCode; > import org.apache.solr.common.util.Pair; > import org.apache.solr.core.CoreContainer; > +import org.apache.solr.core.SolrCore; > import org.slf4j.Logger; > import org.slf4j.LoggerFactory; > > @@ -111,6 +114,8 @@ public class TestInjection { > > public volatile static String failUpdateRequests = null; > > + public volatile static String leaderTragedy = null; > + > public volatile static String nonExistentCoreExceptionAfterUnload = null; > > public volatile static String updateLogReplayRandomPause = null; > @@ -171,6 +176,7 @@ public class TestInjection { > nonGracefullClose = null; > failReplicaRequests = null; > failUpdateRequests = null; > + leaderTragedy = null; > nonExistentCoreExceptionAfterUnload = null; > updateLogReplayRandomPause = null; > updateRandomPause = null; > @@ -337,6 +343,39 @@ public class TestInjection { > > return true; > } > + > + public static boolean injectLeaderTragedy(SolrCore core) { > + if (leaderTragedy != null) { > + Random rand = random(); > + if (null == rand) return true; > + > + Pair<Boolean, Integer> pair = parseValue(leaderTragedy); > + boolean enabled = pair.first(); > + int chanceIn100 = pair.second(); > + > + if (! 
core.getCoreDescriptor().getCloudDescriptor().isLeader()) { > + return true; > + } > + > + if (enabled && rand.nextInt(100) >= (100 - chanceIn100)) { > + RefCounted<IndexWriter> writer = null; > + try { > + writer = core.getSolrCoreState().getIndexWriter(null); > + writer.get().onTragicEvent(new Exception("injected tragedy"), > "injection"); > + } catch (IOException e) { > + // Problem getting the writer, but that will likely bubble up later > + return true; > + } finally { > + if (writer != null) { > + writer.decref(); > + } > + } > + > + throw new SolrException(ErrorCode.SERVER_ERROR, "Random tragedy > fail"); > + } > + } > + return true; > + } > > public static boolean injectNonExistentCoreExceptionAfterUnload(String > cname) { > if (nonExistentCoreExceptionAfterUnload != null) { > diff --git > a/solr/core/src/test/org/apache/solr/cloud/LeaderTragicEventTest.java > b/solr/core/src/test/org/apache/solr/cloud/LeaderTragicEventTest.java > index 2be7add..f15ad47 100644 > --- a/solr/core/src/test/org/apache/solr/cloud/LeaderTragicEventTest.java > +++ b/solr/core/src/test/org/apache/solr/cloud/LeaderTragicEventTest.java > @@ -17,145 +17,120 @@ > > package org.apache.solr.cloud; > > -import static org.hamcrest.CoreMatchers.anyOf; > -import static org.hamcrest.CoreMatchers.is; > - > -import java.io.FileNotFoundException; > -import java.io.IOException; > -import java.lang.invoke.MethodHandles; > -import java.nio.file.NoSuchFileException; > -import java.util.ArrayList; > -import java.util.Collections; > -import java.util.List; > import org.apache.lucene.store.AlreadyClosedException; > -import org.apache.lucene.store.MockDirectoryWrapper; > -import org.apache.lucene.util.LuceneTestCase.AwaitsFix; > +import org.apache.solr.client.solrj.SolrClient; > +import org.apache.solr.client.solrj.SolrQuery; > +import org.apache.solr.client.solrj.SolrServerException; > import org.apache.solr.client.solrj.embedded.JettySolrRunner; > +import > 
org.apache.solr.client.solrj.impl.BaseHttpSolrClient.RemoteSolrException; > import org.apache.solr.client.solrj.impl.HttpSolrClient; > -import org.apache.solr.client.solrj.impl.HttpSolrClient.RemoteSolrException; > import org.apache.solr.client.solrj.request.CollectionAdminRequest; > +import org.apache.solr.client.solrj.request.QueryRequest; > import org.apache.solr.client.solrj.request.UpdateRequest; > -import org.apache.solr.common.SolrException; > +import org.apache.solr.client.solrj.response.QueryResponse; > +import org.apache.solr.client.solrj.response.UpdateResponse; > import org.apache.solr.common.cloud.ClusterStateUtil; > import org.apache.solr.common.cloud.DocCollection; > import org.apache.solr.common.cloud.Replica; > import org.apache.solr.common.cloud.Slice; > -import org.apache.solr.core.CoreContainer; > -import org.apache.solr.core.DirectoryFactory; > -import org.apache.solr.core.MockDirectoryFactory; > -import org.apache.solr.core.SolrCore; > -import org.junit.AfterClass; > +import org.apache.solr.util.TestInjection; > +import org.hamcrest.MatcherAssert; > +import org.junit.After; > +import org.junit.Before; > import org.junit.BeforeClass; > import org.junit.Test; > import org.slf4j.Logger; > import org.slf4j.LoggerFactory; > > -@AwaitsFix(bugUrl="https://issues.apache.org/jira/browse/SOLR-13237") > -public class LeaderTragicEventTest extends SolrCloudTestCase { > +import java.io.IOException; > +import java.lang.invoke.MethodHandles; > > +import static org.hamcrest.CoreMatchers.anyOf; > +import static org.hamcrest.CoreMatchers.is; > + > +public class LeaderTragicEventTest extends SolrCloudTestCase { > private static final Logger log = > LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); > > + private String collection; > + > @BeforeClass > public static void setupCluster() throws Exception { > - System.setProperty("solr.mscheduler", > "org.apache.solr.core.MockConcurrentMergeScheduler"); > - > 
System.setProperty(MockDirectoryFactory.SOLR_TESTS_USING_MOCK_DIRECTORY_WRAPPER, > "true"); > - > configureCluster(2) > .addConfig("config", > TEST_PATH().resolve("configsets").resolve("cloud-minimal").resolve("conf")) > .configure(); > } > > - @AfterClass > - public static void cleanup() { > - System.clearProperty("solr.mscheduler"); > - > System.clearProperty(MockDirectoryFactory.SOLR_TESTS_USING_MOCK_DIRECTORY_WRAPPER); > + @Before > + public void setUp() throws Exception { > + super.setUp(); > + collection = getSaferTestName(); > + cluster.getSolrClient().setDefaultCollection(collection); > } > > + @After > + public void tearDown() throws Exception { > + super.tearDown(); > + > CollectionAdminRequest.deleteCollection(collection).process(cluster.getSolrClient()); > + } > > @Test > - public void test() throws Exception { > - final String collection = "collection1"; > - cluster.getSolrClient().setDefaultCollection(collection); > + public void testLeaderFailsOver() throws Exception { > CollectionAdminRequest > .createCollection(collection, "config", 1, 2) > .process(cluster.getSolrClient()); > cluster.waitForActiveCollection(collection, 1, 2); > - try { > - List<String> addedIds = new ArrayList<>(); > - Replica oldLeader = corruptLeader(collection, addedIds); > > - waitForState("Timeout waiting for new replica become leader", > collection, (liveNodes, collectionState) -> { > - Slice slice = collectionState.getSlice("shard1"); > + UpdateResponse updateResponse = new UpdateRequest().add("id", > "1").commit(cluster.getSolrClient(), null); > + assertEquals(0, updateResponse.getStatus()); > > - if (slice.getReplicas().size() != 2) return false; > - if (slice.getLeader() == null) return false; > - if (slice.getLeader().getName().equals(oldLeader.getName())) return > false; > + Replica oldLeader = corruptLeader(collection); > > - return true; > - }); > - > ClusterStateUtil.waitForAllActiveAndLiveReplicas(cluster.getSolrClient().getZkStateReader(), > collection, 120000); > 
- Slice shard = getCollectionState(collection).getSlice("shard1"); > - assertNotSame(shard.getLeader().getNodeName(), > oldLeader.getNodeName()); > - assertEquals(getNonLeader(shard).getNodeName(), > oldLeader.getNodeName()); > + waitForState("Now waiting for new replica to become leader", collection, > (liveNodes, collectionState) -> { > + Slice slice = collectionState.getSlice("shard1"); > > - for (String id : addedIds) { > - assertNotNull(cluster.getSolrClient().getById(collection,id)); > - } > - if (log.isInfoEnabled()) { > - log.info("The test success oldLeader:{} currentState:{}", oldLeader, > getCollectionState(collection)); > - } > + if (slice.getReplicas().size() != 2) return false; > + if (slice.getLeader() == null) return false; > + if (slice.getLeader().getName().equals(oldLeader.getName())) return > false; > > - } finally { > - > CollectionAdminRequest.deleteCollection(collection).process(cluster.getSolrClient()); > + return true; > + }); > + > ClusterStateUtil.waitForAllActiveAndLiveReplicas(cluster.getSolrClient().getZkStateReader(), > collection, 120000); > + Slice shard = getCollectionState(collection).getSlice("shard1"); > + assertNotEquals("Old leader should not be leader again", > oldLeader.getNodeName(), shard.getLeader().getNodeName()); > + assertEquals("Old leader should be a follower", oldLeader.getNodeName(), > getNonLeader(shard).getNodeName()); > + > + // Check that we can continue indexing after this > + updateResponse = new UpdateRequest().add("id", > "2").commit(cluster.getSolrClient(), null); > + assertEquals(0, updateResponse.getStatus()); > + try (SolrClient followerClient = new > HttpSolrClient.Builder(oldLeader.getCoreUrl()).build()) { > + QueryResponse queryResponse = new QueryRequest(new > SolrQuery("*:*")).process(followerClient); > + assertEquals(queryResponse.getResults().toString(), 2, > queryResponse.getResults().getNumFound()); > } > } > > - private Replica corruptLeader(String collection, List<String> addedIds) > throws 
IOException { > - DocCollection dc = getCollectionState(collection); > - Replica oldLeader = dc.getLeader("shard1"); > - log.info("Corrupt leader : {}", oldLeader); > - > - CoreContainer leaderCC = > cluster.getReplicaJetty(oldLeader).getCoreContainer(); > - SolrCore leaderCore = leaderCC.getCores().iterator().next(); > - MockDirectoryWrapper mockDir = (MockDirectoryWrapper) > leaderCore.getDirectoryFactory() > - .get(leaderCore.getIndexDir(), DirectoryFactory.DirContext.DEFAULT, > leaderCore.getSolrConfig().indexConfig.lockType); > - leaderCore.getDirectoryFactory().release(mockDir); > - > - try (HttpSolrClient solrClient = new > HttpSolrClient.Builder(dc.getLeader("shard1").getCoreUrl()).build()) { > - for (int i = 0; i < 100; i++) { > - new UpdateRequest() > - .add("id", i + "") > - .process(solrClient); > - solrClient.commit(); > - addedIds.add(i + ""); > - > - for (String file : mockDir.listAll()) { > - if (file.contains("segments_")) continue; > - if (file.endsWith("si")) continue; > - if (file.endsWith("fnm")) continue; > - if (random().nextBoolean()) continue; > - > - try { > - mockDir.corruptFiles(Collections.singleton(file)); > - } catch (RuntimeException | FileNotFoundException | > NoSuchFileException e) { > - // merges can lead to this exception > - } > - } > - } > - } catch (Exception e) { > - log.info("Corrupt leader ex: ", e); > - > - // solrClient.add/commit would throw RemoteSolrException with error > code 500 or > - // 404(when the leader replica is already deleted by giveupLeadership) > - if (e instanceof RemoteSolrException) { > - SolrException se = (SolrException) e; > - assertThat(se.code(), anyOf(is(500), is(404))); > - } else if (!(e instanceof AlreadyClosedException)) { > - throw new RuntimeException("Unexpected exception", e); > + private Replica corruptLeader(String collection) throws IOException, > SolrServerException { > + try { > + TestInjection.leaderTragedy = "true:100"; > + > + DocCollection dc = getCollectionState(collection); > + 
Replica oldLeader = dc.getLeader("shard1"); > + log.info("Will crash leader : {}", oldLeader); > + > + try (HttpSolrClient solrClient = new > HttpSolrClient.Builder(dc.getLeader("shard1").getCoreUrl()).build()) { > + new UpdateRequest().add("id", "99").commit(solrClient, null); > + fail("Should have injected tragedy"); > + } catch (RemoteSolrException e) { > + // solrClient.add would throw RemoteSolrException with code 500 > + // or 404 if the bad replica has already been deleted > + MatcherAssert.assertThat(e.code(), anyOf(is(500), is(404))); > + } catch (AlreadyClosedException e) { > + // If giving up leadership, might be already closed/closing > } > - //else expected > + > + return oldLeader; > + } finally { > + TestInjection.leaderTragedy = null; > } > - return oldLeader; > } > > private Replica getNonLeader(Slice slice) { > @@ -165,8 +140,6 @@ public class LeaderTragicEventTest extends > SolrCloudTestCase { > > @Test > public void testOtherReplicasAreNotActive() throws Exception { > - final String collection = "collection2"; > - cluster.getSolrClient().setDefaultCollection(collection); > int numReplicas = random().nextInt(2) + 1; > // won't do anything if leader is the only one active replica in the > shard > CollectionAdminRequest > @@ -174,7 +147,6 @@ public class LeaderTragicEventTest extends > SolrCloudTestCase { > .process(cluster.getSolrClient()); > cluster.waitForActiveCollection(collection, 1, numReplicas); > > - try { > JettySolrRunner otherReplicaJetty = null; > if (numReplicas == 2) { > Slice shard = getCollectionState(collection).getSlice("shard1"); > @@ -187,7 +159,7 @@ public class LeaderTragicEventTest extends > SolrCloudTestCase { > waitForState("Timeout waiting for replica get down", collection, > (liveNodes, collectionState) -> > getNonLeader(collectionState.getSlice("shard1")).getState() != > Replica.State.ACTIVE); > } > > - Replica oldLeader = corruptLeader(collection, new ArrayList<>()); > + Replica oldLeader = corruptLeader(collection); > 
> if (otherReplicaJetty != null) { > otherReplicaJetty.start(); > @@ -196,9 +168,6 @@ public class LeaderTragicEventTest extends > SolrCloudTestCase { > > Replica leader = > getCollectionState(collection).getSlice("shard1").getLeader(); > assertEquals(leader.getName(), oldLeader.getName()); > - } finally { > - > CollectionAdminRequest.deleteCollection(collection).process(cluster.getSolrClient()); > - } > } > > > diff --git a/solr/core/src/test/org/apache/solr/cloud/ShardTermsTest.java > b/solr/core/src/test/org/apache/solr/cloud/ShardTermsTest.java > new file mode 100644 > index 0000000..ec20cec > --- /dev/null > +++ b/solr/core/src/test/org/apache/solr/cloud/ShardTermsTest.java > @@ -0,0 +1,48 @@ > +/* > + * Licensed to the Apache Software Foundation (ASF) under one or more > + * contributor license agreements. See the NOTICE file distributed with > + * this work for additional information regarding copyright ownership. > + * The ASF licenses this file to You under the Apache License, Version 2.0 > + * (the "License"); you may not use this file except in compliance with > + * the License. You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, software > + * distributed under the License is distributed on an "AS IS" BASIS, > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > + * See the License for the specific language governing permissions and > + * limitations under the License. 
> + */ > + > +package org.apache.solr.cloud; > + > +import org.apache.solr.SolrTestCase; > +import org.apache.solr.client.solrj.cloud.ShardTerms; > +import org.junit.Test; > + > +import java.util.Collections; > +import java.util.HashMap; > +import java.util.Map; > + > +public class ShardTermsTest extends SolrTestCase { > + @Test > + public void testIncreaseTerms() { > + Map<String, Long> map = new HashMap<>(); > + map.put("leader", 0L); > + ShardTerms terms = new ShardTerms(map, 0); > + terms = terms.increaseTerms("leader", Collections.singleton("replica")); > + assertEquals(1L, terms.getTerm("leader").longValue()); > + > + map.put("leader", 2L); > + map.put("live-replica", 2L); > + map.put("dead-replica", 1L); > + terms = new ShardTerms(map, 0); > + assertNull(terms.increaseTerms("leader", > Collections.singleton("dead-replica"))); > + > + terms = terms.increaseTerms("leader", Collections.singleton("leader")); > + assertEquals(3L, terms.getTerm("live-replica").longValue()); > + assertEquals(2L, terms.getTerm("leader").longValue()); > + assertEquals(1L, terms.getTerm("dead-replica").longValue()); > + } > +} > diff --git a/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java > b/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java > index 56ed8ae7..452c0da 100644 > --- a/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java > +++ b/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java > @@ -32,7 +32,6 @@ import java.util.concurrent.atomic.AtomicInteger; > import java.util.function.Supplier; > > import org.apache.solr.client.solrj.SolrServerException; > -import org.apache.solr.client.solrj.cloud.ShardTerms; > import org.apache.solr.client.solrj.request.CollectionAdminRequest; > import org.apache.solr.common.util.TimeSource; > import org.apache.solr.util.TimeOut; > @@ -265,13 +264,6 @@ public class ZkShardTermsTest extends SolrCloudTestCase { > replicaTerms.close(); > } > > - public void testEnsureTermsIsHigher() { > - Map<String, 
Long> map = new HashMap<>(); > - map.put("leader", 0L); > - ShardTerms terms = new ShardTerms(map, 0); > - terms = terms.increaseTerms("leader", Collections.singleton("replica")); > - assertEquals(1L, terms.getTerm("leader").longValue()); > - } > > public void testSetTermToZero() { > String collection = "setTermToZero"; > diff --git > a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/ShardTerms.java > b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/ShardTerms.java > index 3b2f754..cd6ead0 100644 > --- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/ShardTerms.java > +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/ShardTerms.java > @@ -73,7 +73,6 @@ public class ShardTerms implements MapWriter { > */ > public boolean haveHighestTermValue(String coreNodeName) { > if (values.isEmpty()) return true; > - long maxTerm = Collections.max(values.values()); > return values.getOrDefault(coreNodeName, 0L) == maxTerm; > } > > @@ -92,7 +91,7 @@ public class ShardTerms implements MapWriter { > throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Can not > find leader's term " + leader); > } > > - boolean changed = false; > + boolean saveChanges = false; > boolean foundReplicasInLowerTerms = false; > > HashMap<String, Long> newValues = new HashMap<>(values); > @@ -102,16 +101,16 @@ public class ShardTerms implements MapWriter { > if (replicasNeedingRecovery.contains(key)) foundReplicasInLowerTerms = > true; > if (Objects.equals(entry.getValue(), leaderTerm)) { > if(skipIncreaseTermOf(key, replicasNeedingRecovery)) { > - changed = true; > + saveChanges = true; // if we don't skip anybody, then there's no > reason to increment > } else { > - newValues.put(key, leaderTerm+1); > + entry.setValue(leaderTerm + 1); > } > } > } > > // We should skip the optimization if there are no > replicasNeedingRecovery present in local terms, > // this may indicate that the current value is stale > - if (!changed && foundReplicasInLowerTerms) return 
null; > + if (!saveChanges && foundReplicasInLowerTerms) return null; > return new ShardTerms(newValues, version); > } > > @@ -167,6 +166,12 @@ public class ShardTerms implements MapWriter { > return new ShardTerms(newValues, version); > } > > + /** > + * Return a new {@link ShardTerms} in which the associate term of {@code > coreNodeName} is equal to zero, > + * creating it if it does not previously exist. > + * @param coreNodeName of the replica > + * @return null if the term of {@code coreNodeName} already exists and is > zero > + */ > public ShardTerms setTermToZero(String coreNodeName) { > if (values.getOrDefault(coreNodeName, -1L) == 0) { > return null; > @@ -182,7 +187,6 @@ public class ShardTerms implements MapWriter { > * @return null if term of {@code coreNodeName} is already maximum > */ > public ShardTerms setTermEqualsToLeader(String coreNodeName) { > - long maxTerm = getMaxTerm(); > if (values.get(coreNodeName) == maxTerm) return null; > > HashMap<String, Long> newValues = new HashMap<>(values); > @@ -201,7 +205,6 @@ public class ShardTerms implements MapWriter { > * @return null if {@code coreNodeName} is already marked as doing > recovering > */ > public ShardTerms startRecovering(String coreNodeName) { > - long maxTerm = getMaxTerm(); > if (values.get(coreNodeName) == maxTerm) > return null; > > @@ -246,7 +249,7 @@ public class ShardTerms implements MapWriter { > return version; > } > > - public Map<String , Long> getTerms() { > + public Map<String, Long> getTerms() { > return new HashMap<>(this.values); > } > > --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
