On Fri, Feb 5, 2016 at 11:23 AM, Uwe Schindler <[email protected]> wrote: > Hi Yonik, > > I committed a change yesterday. Could you backport this, too? It broke the > forbidden tests because it used currentTimeMillis instead of RTimer.
Ah, I hadn't noticed (I got busy between the commit to trunk yesterday and the backport to 5x today). Will do. -Yonik > Uwe > > Am 5. Februar 2016 17:14:34 MEZ, schrieb [email protected]: >> >> Repository: lucene-solr >> Updated Branches: >> refs/heads/branch_5x 482b40f84 -> ff83a4001 >> >> >> SOLR-8586: add index fingerprinting and use it in peersync >> (cherry picked from commit 629767b) >> >> >> Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo >> Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/ff83a400 >> Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/ff83a400 >> Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/ff83a400 >> >> Branch: refs/heads/branch_5x >> Commit: ff83a400156beb6a8dd2d0845c7f878c28431739 >> Parents: 482b40f >> Author: yonik >> <[email protected]> >> Authored: Thu Feb 4 14:54:08 2016 -0500 >> Committer: yonik <[email protected]> >> Committed: Fri Feb 5 11:11:34 2016 -0500 >> >> ________________________________ >> >> solr/CHANGES.txt | 4 + >> .../org/apache/solr/cloud/SyncStrategy.java | 2 +- >> .../handler/component/RealTimeGetComponent.java | 8 + >> .../apache/solr/update/IndexFingerprint.java | 232 +++++++++++++++++++ >> .../java/org/apache/solr/update/PeerSync.java | 78 +++++-- >> .../java/org/apache/solr/update/UpdateLog.java | 85 +++---- >> .../org/apache/solr/update/PeerSyncTest.java | 23 ++ >> 7 files changed, 374 insertions(+), 58 deletions(-) >> ________________________________ >> >> >> >> >> http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ff83a400/solr/CHANGES.txt >> ________________________________ >> >> diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt >> index >> f2067c6..2b7b46e 100644 >> --- a/solr/CHANGES.txt >> +++ b/solr/CHANGES.txt >> @@ -119,6 +119,10 @@ New Features >> * SOLR-8415: Provide command to switch between non/secure mode in ZK >> (Mike Drob, Gregory Chanan) >> >> +* SOLR-8586: added index fingerprint, a hash over all versions currently >> in the 
index. >> + PeerSync now uses this to check if replicas are in sync. (yonik) >> + >> + >> Bug Fixes >> ---------------------- >> >> >> >> http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ff83a400/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java >> ________________________________ >> >> diff --git a/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java >> b/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java >> index 7a16598..d811f5c 100644 >> --- >> a/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java >> +++ b/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java >> @@ -173,7 +173,7 @@ public class SyncStrategy { >> // if we can't reach a replica for sync, we still consider the >> overall sync a success >> // TODO: as an assurance, we should still try and tell the sync nodes >> that we couldn't reach >> // to recover once more? >> - PeerSync peerSync = new PeerSync(core, syncWith, >> core.getUpdateHandler().getUpdateLog().getNumRecordsToKeep(), true, true, >> peerSyncOnlyWithActive); >> + PeerSync peerSync = new PeerSync(core, syncWith, >> core.getUpdateHandler().getUpdateLog().getNumRecordsToKeep(), true, true, >> peerSyncOnlyWithActive, false); >> return peerSync.sync(); >> } >> >> >> >> http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ff83a400/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java >> ________________________________ >> >> diff --git >> a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java >> b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java >> index 2bbf5a2..14a4185 100644 >> --- >> a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java >> +++ >> b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java >> @@ -58,6 +58,7 @@ import org.apache.solr.search.ReturnFields; >> import org.apache.solr.search.SolrIndexSearcher; >> import org.apache.solr.search.SolrReturnFields; 
>> import org.apache.solr.update.DocumentBuilder; >> +import org.apache.solr.update.IndexFingerprint; >> >> import org.apache.solr.update.PeerSync; >> import org.apache.solr.update.UpdateLog; >> import org.apache.solr.util.RefCounted; >> @@ -536,6 +537,8 @@ public class RealTimeGetComponent extends >> SearchComponent >> int nVersions = params.getInt("getVersions", -1); >> if (nVersions == -1) return; >> >> + boolean doFingerprint = params.getBool("fingerprint", false); >> + >> String sync = params.get("sync"); >> if (sync != null) { >> processSync(rb, nVersions, sync); >> @@ -548,6 +551,11 @@ public class RealTimeGetComponent extends >> SearchComponent >> try (UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates()) >> { >> rb.rsp.add("versions", recentUpdates.getVersions(nVersions)); >> } >> + >> + if (doFingerprint) { >> + IndexFingerprint fingerprint = >> IndexFingerprint.getFingerprint(req.getCore(), Long.MAX_VALUE); >> + rb.rsp.add("fingerprint", fingerprint.toObject()); >> + } >> } >> >> >> >> >> http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ff83a400/solr/core/src/java/org/apache/solr/update/IndexFingerprint.java >> ________________________________ >> >> diff --git >> a/solr/core/src/java/org/apache/solr/update/IndexFingerprint.java >> b/solr/core/src/java/org/apache/solr/update/IndexFingerprint.java >> new file mode 100644 >> index 0000000..c73b57b >> --- /dev/null >> +++ b/solr/core/src/java/org/apache/solr/update/IndexFingerprint.java >> @@ -0,0 +1,232 @@ >> +package org.apache.solr.update; >> + >> +/* >> + * Licensed to the Apache Software Foundation (ASF) under one or more >> + * contributor license agreements. See the NOTICE file distributed with >> + * this work for additional information regarding copyright ownership. >> + * The ASF licenses this file to You under the Apache License, >> Version 2.0 >> + * (the "License"); you may not use this file except in compliance with >> + * the License. 
You may obtain a copy of the License at >> + * >> + * http://www.apache.org/licenses/LICENSE-2.0 >> + * >> + * Unless required by applicable law or agreed to in writing, software >> + * distributed under the License is distributed on an "AS IS" BASIS, >> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or >> implied. >> + * See the License for the specific language governing permissions and >> + * limitations under the License. >> + */ >> + >> +import java.io.IOException; >> +import java.lang.invoke.MethodHandles; >> +import java.net.ConnectException; >> +import java.net.SocketException; >> +import java.util.ArrayList; >> +import java.util.Collections; >> +import >> java.util.Comparator; >> +import java.util.HashSet; >> +import java.util.LinkedHashMap; >> +import java.util.List; >> +import java.util.Map; >> +import java.util.Set; >> + >> +import org.apache.http.NoHttpResponseException; >> +import org.apache.http.client.HttpClient; >> +import org.apache.http.conn.ConnectTimeoutException; >> +import org.apache.lucene.index.LeafReader; >> +import org.apache.lucene.index.LeafReaderContext; >> +import org.apache.lucene.queries.function.FunctionValues; >> +import org.apache.lucene.queries.function.ValueSource; >> +import org.apache.lucene.util.Bits; >> +import org.apache.lucene.util.BytesRef; >> +import org.apache.solr.client.solrj.SolrServerException; >> +import org.apache.solr.cloud.ZkController; >> +import org.apache.solr.common.SolrException; >> +import org.apache.solr.common.SolrInputDocument; >> +import org.apache.solr.common.params.ModifiableSolrParams; >> +import org.apache.solr.common.util.Hash; >> +import org.apache.solr.common.util.NamedList; >> +import org.apache.solr.common.util.StrUtils; >> +import org.apache.solr.core.SolrCore; >> +import org.apache.solr.handler.component.HttpShardHandlerFactory; >> +import org.apache.solr.handler.component.ShardHandler; >> +import org.apache.solr.handler.component.ShardHandlerFactory; >> +import 
org.apache.solr.handler.component.ShardRequest; >> +import org.apache.solr.handler.component.ShardResponse; >> +import org.apache.solr.logging.MDCLoggingContext; >> +import org.apache.solr.request.LocalSolrQueryRequest; >> +import org.apache.solr.request.SolrQueryRequest; >> +import org.apache.solr.response.SolrQueryResponse; >> +import org.apache.solr.schema.SchemaField; >> +import org.apache.solr.search.SolrIndexSearcher; >> +import org.apache.solr.update.processor.UpdateRequestProcessor; >> +import org.apache.solr.update.processor.UpdateRequestProcessorChain; >> +import org.apache.solr.util.RefCounted; >> +import org.slf4j.Logger; >> +import org.slf4j.LoggerFactory; >> + >> +import static >> org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase.FROMLEADER; >> +import static >> org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM; >> + >> +/** @lucene.internal */ >> +public class IndexFingerprint { >> + private static final Logger log = >> LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); >> + >> + private long maxVersionSpecified; >> + private long maxVersionEncountered; >> + private long maxInHash; >> + private long versionsHash; >> + private long numVersions; >> + private long numDocs; >> + private long maxDoc; >> + >> + public long getMaxVersionSpecified() { >> + return maxVersionSpecified; >> + } >> + >> + public long getMaxVersionEncountered() { >> + return maxVersionEncountered; >> + } >> + >> + public long getMaxInHash() { >> + return >> maxInHash; >> + } >> + >> + public long getVersionsHash() { >> + return versionsHash; >> + } >> + >> + public long getNumVersions() { >> + return numVersions; >> + } >> + >> + public long getNumDocs() { >> + return numDocs; >> + } >> + >> + public long getMaxDoc() { >> + return maxDoc; >> + } >> + >> + /** Opens a new realtime searcher and returns it's fingerprint */ >> + public static IndexFingerprint getFingerprint(SolrCore core, long >> maxVersion) 
throws IOException { >> + core.getUpdateHandler().getUpdateLog().openRealtimeSearcher(); >> + RefCounted<SolrIndexSearcher> newestSearcher = >> core.getUpdateHandler().getUpdateLog().uhandler.core.getRealtimeSearcher(); >> + try { >> + return getFingerprint(newestSearcher.get(), maxVersion); >> + } finally { >> + if (newestSearcher != null) { >> + newestSearcher.decref(); >> + } >> + } >> + } >> + >> + public >> static IndexFingerprint getFingerprint(SolrIndexSearcher searcher, long >> maxVersion) throws IOException { >> + long start = System.currentTimeMillis(); >> + >> + SchemaField versionField = >> VersionInfo.getAndCheckVersionField(searcher.getSchema()); >> + >> + IndexFingerprint f = new IndexFingerprint(); >> + f.maxVersionSpecified = maxVersion; >> + f.maxDoc = searcher.maxDoc(); >> + >> + // TODO: this could be parallelized, or even cached per-segment if >> performance becomes an issue >> + ValueSource vs = versionField.getType().getValueSource(versionField, >> null); >> + Map funcContext = ValueSource.newContext(searcher); >> + vs.createWeight(funcContext, searcher); >> + for (LeafReaderContext ctx : searcher.getTopReaderContext().leaves()) >> { >> + int maxDoc = ctx.reader().maxDoc(); >> + f.numDocs += ctx.reader().numDocs(); >> + Bits liveDocs = ctx.reader().getLiveDocs(); >> + FunctionValues fv = >> vs.getValues(funcContext, ctx); >> + for (int doc = 0; doc < maxDoc; doc++) { >> + if (liveDocs != null && !liveDocs.get(doc)) continue; >> + long v = fv.longVal(doc); >> + f.maxVersionEncountered = Math.max(v, f.maxVersionEncountered); >> + if (v <= f.maxVersionSpecified) { >> + f.maxInHash = Math.max(v, f.maxInHash); >> + f.versionsHash += Hash.fmix64(v); >> + f.numVersions++; >> + } >> + } >> + } >> + >> + long end = System.currentTimeMillis(); >> + log.info("IndexFingerprint millis:" + (end-start) + " result:" + f); >> + >> + return f; >> + } >> + >> + /** returns 0 for equal, negative if f1 is less recent than f2, >> positive if more recent */ >> + 
public static int compare(IndexFingerprint f1, IndexFingerprint f2) { >> + int cmp; >> + >> + // NOTE: some way want number of docs in index to take >> precedence over highest version (add-only systems for sure) >> + >> + // if we're comparing all of the versions in the index, then go by >> the highest encountered. >> + if (f1.maxVersionSpecified == Long.MAX_VALUE) { >> + cmp = Long.compare(f1.maxVersionEncountered, >> f2.maxVersionEncountered); >> + if (cmp != 0) return cmp; >> + } >> + >> + // Go by the highest version under the requested max. >> + cmp = Long.compare(f1.maxInHash, f2.maxInHash); >> + if (cmp != 0) return cmp; >> + >> + // go by who has the most documents in the index >> + cmp = Long.compare(f1.numVersions, f2.numVersions); >> + if (cmp != 0) return cmp; >> + >> + // both have same number of documents, so go by hash >> + cmp = Long.compare(f1.versionsHash, f2.versionsHash); >> + return cmp; >> + } >> + >> + /** >> + * Create a generic object suitable for serializing with >> ResponseWriters >> + */ >> + public Object >> toObject() { >> + Map<String,Object> map = new LinkedHashMap<>(); >> + map.put("maxVersionSpecified", maxVersionSpecified); >> + map.put("maxVersionEncountered", maxVersionEncountered); >> + map.put("maxInHash", maxInHash); >> + map.put("versionsHash", versionsHash); >> + map.put("numVersions", numVersions); >> + map.put("numDocs", numDocs); >> + map.put("maxDoc", maxDoc); >> + return map; >> + } >> + >> + private static long getLong(Object o, String key, long def) { >> + long v = def; >> + >> + Object oval = null; >> + if (o instanceof Map) { >> + oval = ((Map)o).get(key); >> + } else if (o instanceof NamedList) { >> + oval = ((NamedList)o).get(key); >> + } >> + >> + return oval != null ? 
((Number)oval).longValue() : def; >> + } >> + >> + /** >> + * Create an IndexFingerprint object from a deserialized generic object >> (Map or NamedList) >> + */ >> + >> public static IndexFingerprint fromObject(Object o) { >> + IndexFingerprint f = new IndexFingerprint(); >> + f.maxVersionSpecified = getLong(o, "maxVersionSpecified", >> Long.MAX_VALUE); >> + f.maxVersionEncountered = getLong(o, "maxVersionEncountered", -1); >> + f.maxInHash = getLong(o, "maxInHash", -1); >> + f.versionsHash = getLong(o, "versionsHash", -1); >> + f.numVersions = getLong(o, "numVersions", -1); >> + f.numDocs = getLong(o, "numDocs", -1); >> + f.maxDoc = getLong(o, "maxDoc", -1); >> + return f; >> + } >> + >> + @Override >> + public String toString() { >> + return toObject().toString(); >> + } >> +} >> >> >> http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ff83a400/solr/core/src/java/org/apache/solr/update/PeerSync.java >> ________________________________ >> >> diff --git >> a/solr/core/src/java/org/apache/solr/update/PeerSync.java >> b/solr/core/src/java/org/apache/solr/update/PeerSync.java >> index c5ddaf0..dbc0091 100644 >> --- a/solr/core/src/java/org/apache/solr/update/PeerSync.java >> +++ b/solr/core/src/java/org/apache/solr/update/PeerSync.java >> @@ -68,6 +68,7 @@ public class PeerSync { >> private UpdateLog ulog; >> private HttpShardHandlerFactory shardHandlerFactory; >> private ShardHandler shardHandler; >> + private List<SyncShardRequest> requests = new ArrayList<>(); >> >> private List<Long> startingVersions; >> >> @@ -76,8 +77,10 @@ public class PeerSync { >> private Set<Long> requestedUpdateSet; >> private long ourLowThreshold; // 20th percentile >> private long ourHighThreshold; // 80th percentile >> + private long ourHighest; // currently just used for logging/debugging >> purposes >> private final boolean cantReachIsSuccess; >> private final >> boolean getNoVersionsIsSuccess; >> + private final boolean doFingerprint; >> private final HttpClient client; >> 
private final boolean onlyIfActive; >> private SolrCore core; >> @@ -116,6 +119,8 @@ public class PeerSync { >> >> private static class SyncShardRequest extends ShardRequest { >> List<Long> reportedVersions; >> + IndexFingerprint fingerprint; >> + boolean doFingerprintComparison; >> List<Long> requestedUpdates; >> Exception updateException; >> } >> @@ -125,16 +130,17 @@ public class PeerSync { >> } >> >> public PeerSync(SolrCore core, List<String> replicas, int nUpdates, >> boolean cantReachIsSuccess, boolean getNoVersionsIsSuccess) { >> - this(core, replicas, nUpdates, cantReachIsSuccess, >> getNoVersionsIsSuccess, false); >> + this(core, replicas, nUpdates, cantReachIsSuccess, >> getNoVersionsIsSuccess, false, true); >> } >> >> - public PeerSync(SolrCore >> core, List<String> replicas, int nUpdates, boolean cantReachIsSuccess, >> boolean getNoVersionsIsSuccess, boolean onlyIfActive) { >> + public PeerSync(SolrCore core, List<String> replicas, int nUpdates, >> boolean cantReachIsSuccess, boolean getNoVersionsIsSuccess, boolean >> onlyIfActive, boolean doFingerprint) { >> this.core = core; >> this.replicas = replicas; >> this.nUpdates = nUpdates; >> this.maxUpdates = nUpdates; >> this.cantReachIsSuccess = cantReachIsSuccess; >> this.getNoVersionsIsSuccess = getNoVersionsIsSuccess; >> + this.doFingerprint = doFingerprint; >> this.client = >> core.getCoreDescriptor().getCoreContainer().getUpdateShardHandler().getHttpClient(); >> this.onlyIfActive = onlyIfActive; >> >> @@ -169,9 +175,8 @@ public class PeerSync { >> return "PeerSync: core="+uhandler.core.getName()+ " url="+myURL +" "; >> } >> >> - /** Returns true if peer sync was successful, >> meaning that this core may not be considered to have the latest updates >> - * when considering the last N updates between it and its peers. >> - * A commit is not performed. >> + /** Returns true if peer sync was successful, meaning that this core >> may be considered to have the latest updates. 
>> + * It does not mean that the remote replica is in sync with us. >> */ >> public boolean sync() { >> if (ulog == null) { >> @@ -210,7 +215,7 @@ public class PeerSync { >> >> ourLowThreshold = percentile(startingVersions, 0.8f); >> ourHighThreshold = percentile(startingVersions, 0.2f); >> - >> + >> // now make sure that the starting updates overlap our updates >> // there shouldn't be reorders, so any overlap will do. >> >> @@ -231,6 +236,7 @@ public class PeerSync { >> } >> >> ourUpdates = newList; >> + >> Collections.sort(ourUpdates, absComparator); >> } else { >> >> if (ourUpdates.size() > 0) { >> @@ -243,9 +249,10 @@ public class PeerSync { >> return false; >> } >> } >> - >> + >> + ourHighest = ourUpdates.get(0); >> ourUpdateSet = new HashSet<>(ourUpdates); >> - requestedUpdateSet = new HashSet<>(ourUpdates); >> + requestedUpdateSet = new HashSet<>(); >> >> for (;;) { >> ShardResponse srsp = shardHandler.takeCompletedOrError(); >> @@ -257,9 +264,18 @@ public class PeerSync { >> return false; >> } >> } >> - >> - log.info(msg() + "DONE. sync succeeded"); >> - return true; >> + >> + // finish up any comparisons with other shards that we deferred >> + boolean success = true; >> + for (SyncShardRequest sreq : requests) >> { >> + if (sreq.doFingerprintComparison) { >> + success = compareFingerprint(sreq); >> + if (!success) break; >> + } >> + } >> + >> + log.info(msg() + "DONE. sync " + (success ? 
"succeeded" : >> "failed")); >> + return success; >> } finally { >> MDCLoggingContext.clear(); >> } >> @@ -267,6 +283,7 @@ public class PeerSync { >> >> private void requestVersions(String replica) { >> SyncShardRequest sreq = new SyncShardRequest(); >> + requests.add(sreq); >> sreq.purpose = 1; >> sreq.shards = new String[]{replica}; >> sreq.actualShards = sreq.shards; >> @@ -274,6 +291,7 @@ public class PeerSync { >> sreq.params.set("qt","/get"); >> sreq.params.set("distrib",false); >> sreq.params.set("getVersions",nUpdates); >> + sreq.params.set("fingerprint",doFingerprint); >> shardHandler.submit(sreq, replica, >> sreq.params); >> } >> >> @@ -355,7 +373,12 @@ public class PeerSync { >> SyncShardRequest sreq = (SyncShardRequest) srsp.getShardRequest(); >> sreq.reportedVersions = otherVersions; >> >> - log.info(msg() + " Received " + otherVersions.size() + " versions >> from " + sreq.shards[0] ); >> + Object fingerprint = >> srsp.getSolrResponse().getResponse().get("fingerprint"); >> + >> + log.info(msg() + " Received " + otherVersions.size() + " versions >> from " + sreq.shards[0] + " fingerprint:" + fingerprint ); >> + if (fingerprint != null) { >> + sreq.fingerprint = IndexFingerprint.fromObject(fingerprint); >> + } >> >> if (otherVersions.size() == 0) { >> return getNoVersionsIsSuccess; >> @@ -371,13 +394,14 @@ public class PeerSync { >> >> long otherHigh = percentile(otherVersions, .2f); >> long otherLow = >> percentile(otherVersions, .8f); >> + long otherHighest = otherVersions.get(0); >> >> if (ourHighThreshold < otherLow) { >> // Small overlap between version windows and ours is older >> // This means that we might miss updates if we attempted to use >> this method. >> // Since there exists just one replica that is so much newer, we >> must >> // fail the sync. >> - log.info(msg() + " Our versions are too old. >> ourHighThreshold="+ourHighThreshold + " otherLowThreshold="+otherLow); >> + log.info(msg() + " Our versions are too old. 
>> ourHighThreshold="+ourHighThreshold + " otherLowThreshold="+otherLow + " >> ourHighest=" + ourHighest + " otherHighest=" + otherHighest); >> return false; >> } >> >> @@ -385,7 +409,10 @@ public class PeerSync { >> // Small overlap between windows and ours is newer. >> // Using this list to sync would >> result in requesting/replaying results we don't need >> // and possibly bringing deleted docs back to life. >> - log.info(msg() + " Our versions are newer. >> ourLowThreshold="+ourLowThreshold + " otherHigh="+otherHigh); >> + log.info(msg() + " Our versions are newer. >> ourLowThreshold="+ourLowThreshold + " otherHigh="+otherHigh+ " ourHighest=" >> + ourHighest + " otherHighest=" + otherHighest); >> + >> + // Because our versions are newer, IndexFingerprint with the remote >> would not match us. >> + // We return true on our side, but the remote peersync with us >> should fail. >> return true; >> } >> >> @@ -408,9 +435,15 @@ public class PeerSync { >> sreq.requestedUpdates = toRequest; >> >> if (toRequest.isEmpty()) { >> - log.info(msg() + " Our versions are newer. >> ourLowThreshold="+ourLowThreshold + " >> otherHigh="+otherHigh); >> + log.info(msg() + " No additional versions requested. 
>> ourLowThreshold="+ourLowThreshold + " otherHigh="+otherHigh+ " ourHighest=" >> + ourHighest + " otherHighest=" + otherHighest); >> >> // we had (or already requested) all the updates referenced by the >> replica >> + >> + // If we requested updates from another replica, we can't compare >> fingerprints yet with this replica, we need to defer >> + if (doFingerprint) { >> + sreq.doFingerprintComparison = true; >> + } >> + >> return true; >> } >> >> @@ -422,6 +455,19 @@ public class PeerSync { >> return requestUpdates(srsp, toRequest); >> } >> >> + private boolean compareFingerprint(SyncShardRequest sreq) { >> + if (sreq.fingerprint == null) return true; >> + try { >> + IndexFingerprint ourFingerprint = >> IndexFingerprint.getFingerprint(core, Long.MAX_VALUE); >> + >> int cmp = IndexFingerprint.compare(ourFingerprint, sreq.fingerprint); >> + log.info("Fingerprint comparison: " + cmp); >> + return cmp == 0; // currently, we only check for equality... >> + } catch(IOException e){ >> + log.error(msg() + "Error getting index fingerprint", e); >> + return false; >> + } >> + } >> + >> private boolean requestUpdates(ShardResponse srsp, List<Long> >> toRequest) { >> String replica = srsp.getShardRequest().shards[0]; >> >> @@ -556,7 +602,7 @@ public class PeerSync { >> } >> } >> >> - return true; >> + return compareFingerprint(sreq); >> } >> >> >> >> >> http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ff83a400/solr/core/src/java/org/apache/solr/update/UpdateLog.java >> ________________________________ >> >> diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java >> b/solr/core/src/java/org/apache/solr/update/UpdateLog.java >> index 7d7d3ef..7e3fc9f 100644 >> --- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java >> +++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java >> @@ -283,7 +283,7 @@ public class UpdateLog implements >> PluginInfoInitialized { >> if (debug) { >> log.debug("UpdateHandler init: tlogDir=" + tlogDir + ", existing >> 
tlogs=" + Arrays.asList(tlogFiles) + ", next id=" + id); >> } >> - >> + >> TransactionLog oldLog = null; >> for (String oldLogName : tlogFiles) { >> File f = new File(tlogDir, oldLogName); >> @@ -334,11 +334,11 @@ public class UpdateLog implements >> PluginInfoInitialized { >> } >> >> } >> - >> + >> public String getLogDir() { >> return tlogDir.getAbsolutePath(); >> } >> - >> + >> public List<Long> >> getStartingVersions() { >> return startingVersions; >> } >> @@ -503,32 +503,35 @@ public class UpdateLog implements >> PluginInfoInitialized { >> if ((cmd.getFlags() & UpdateCommand.BUFFERING) == 0) { >> // given that we just did a delete-by-query, we don't know what >> documents were >> // affected and hence we must purge our caches. >> - if (map != null) map.clear(); >> - if (prevMap != null) prevMap.clear(); >> - if (prevMap2 != null) prevMap2.clear(); >> - >> + openRealtimeSearcher(); >> trackDeleteByQuery(cmd.getQuery(), cmd.getVersion()); >> >> - // oldDeletes.clear(); >> - >> - // We must cause a new IndexReader to be opened before anything >> looks at these caches again >> - // so that a cache miss will read fresh data. 
>> - // >> - // TODO: FUTURE: open a new searcher lazily for better throughput >> with delete-by-query commands >> - >> try { >> - RefCounted<SolrIndexSearcher> holder = >> uhandler.core.openNewSearcher(true, true); >> - holder.decref(); >> - } catch (Exception e) { >> - SolrException.log(log, "Error opening realtime searcher for >> deleteByQuery", e); >> + if (trace) { >> + LogPtr ptr = new LogPtr(pos, cmd.getVersion()); >> + log.trace("TLOG: added deleteByQuery " + cmd.query + " to " + >> tlog + " " + ptr + " map=" + System.identityHashCode(map)); >> } >> - >> } >> + } >> + } >> >> - LogPtr ptr = new LogPtr(pos, cmd.getVersion()); >> - >> - if (trace) { >> - log.trace("TLOG: added deleteByQuery " + cmd.query + " to " + >> tlog + " " + ptr + " map=" + System.identityHashCode(map)); >> + /** Opens a new realtime searcher and clears the id caches. >> + * This may also be called when we updates are being buffered (from >> PeerSync/IndexFingerprint) >> + */ >> + >> public void openRealtimeSearcher() { >> + synchronized (this) { >> + // We must cause a new IndexReader to be opened before anything >> looks at these caches again >> + // so that a cache miss will read fresh data. 
>> + try { >> + RefCounted<SolrIndexSearcher> holder = >> uhandler.core.openNewSearcher(true, true); >> + holder.decref(); >> + } catch (Exception e) { >> + SolrException.log(log, "Error opening realtime searcher", e); >> + return; >> } >> + >> + if (map != null) map.clear(); >> + if (prevMap != null) prevMap.clear(); >> + if (prevMap2 != null) prevMap2.clear(); >> } >> } >> >> @@ -620,7 +623,7 @@ public class UpdateLog implements >> PluginInfoInitialized { >> public boolean hasUncommittedChanges() { >> return tlog != null; >> } >> - >> + >> public void preCommit(CommitUpdateCommand cmd) { >> synchronized (this) { >> >> if (debug) { >> @@ -878,11 +881,11 @@ public class UpdateLog implements >> PluginInfoInitialized { >> theLog.forceClose(); >> } >> } >> - >> + >> public void close(boolean committed) { >> close(committed, false); >> } >> - >> + >> public void close(boolean committed, boolean deleteOnClose) { >> synchronized (this) { >> recoveryExecutor.shutdown(); // no new tasks >> @@ -923,7 +926,7 @@ public class UpdateLog implements >> PluginInfoInitialized { >> this.id = id; >> } >> } >> - >> + >> public class RecentUpdates implements Closeable { >> >> final Deque<TransactionLog> logList; // newest first >> @@ -950,17 +953,17 @@ public class UpdateLog implements >> PluginInfoInitialized { >> >> public List<Long> getVersions(int n) { >> List<Long> ret = new ArrayList<>(n); >> - >> + >> for >> (List<Update> singleList : updateList) { >> for (Update ptr : singleList) { >> ret.add(ptr.version); >> if (--n <= 0) return ret; >> } >> } >> - >> + >> return ret; >> } >> - >> + >> public Object lookup(long version) { >> Update update = updates.get(version); >> if (update == null) return null; >> @@ -1004,7 +1007,7 @@ public class UpdateLog implements >> PluginInfoInitialized { >> try { >> o = reader.next(); >> if (o==null) break; >> - >> + >> // should currently be a List<Oper,Ver,Doc/Id> >> List entry = (List)o; >> >> @@ -1027,13 +1030,13 @@ public class UpdateLog 
implements >> PluginInfoInitialized { >> >> updatesForLog.add(update); >> updates.put(version, update); >> - >> + >> >> if (oper == UpdateLog.DELETE_BY_QUERY) { >> deleteByQueryList.add(update); >> } else if (oper == UpdateLog.DELETE) { >> deleteList.add(new DeleteUpdate(version, >> (byte[])entry.get(2))); >> } >> - >> + >> break; >> >> case UpdateLog.COMMIT: >> @@ -1063,7 +1066,7 @@ public class UpdateLog implements >> PluginInfoInitialized { >> } >> >> } >> - >> + >> @Override >> public void close() { >> for (TransactionLog log : logList) { >> @@ -1331,10 +1334,10 @@ public class UpdateLog implements >> PluginInfoInitialized { >> "log replay status {} active={} starting pos={} >> current pos={} current size={} % read={}", >> translog, activeLog, >> recoveryInfo.positionOfStart, cpos, csize, >> Math.floor(cpos / >> (double) csize * 100.)); >> - >> + >> } >> } >> - >> + >> o = null; >> o = tlogReader.next(); >> if (o == null && activeLog) { >> @@ -1513,25 +1516,25 @@ public class UpdateLog implements >> PluginInfoInitialized { >> } >> } >> } >> - >> + >> protected String getTlogDir(SolrCore core, PluginInfo info) { >> String dataDir = (String) info.initArgs.get("dir"); >> - >> + >> String ulogDir = core.getCoreDescriptor().getUlogDir(); >> if (ulogDir != null) { >> dataDir = ulogDir; >> } >> - >> + >> if (dataDir == null || dataDir.length() == 0) { >> dataDir = core.getDataDir(); >> } >> >> return dataDir + "/" + TLOG_NAME; >> } >> - >> + >> /** >> * Clears the logs on the file system. Only call before init. 
>> - * >> + * >> >> * @param core the SolrCore >> * @param ulogPluginInfo the init info for the UpdateHandler >> */ >> >> >> http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ff83a400/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java >> ________________________________ >> >> diff --git a/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java >> b/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java >> index 8083ad0..bcaf846 100644 >> --- a/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java >> +++ b/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java >> @@ -168,6 +168,29 @@ public class PeerSyncTest extends >> BaseDistributedSearchTestCase { >> >> assertSync(client1, numVersions, true, shardsArr[0]); >> client0.commit(); client1.commit(); queryAndCompare(params("q", >> "*:*", "sort","_version_ desc"), client0, >> client1); >> + >> + // now lets check fingerprinting causes appropriate fails >> + v = 4000; >> + add(client0, seenLeader, >> sdoc("id",Integer.toString((int)v),"_version_",v)); >> + toAdd = numVersions+10; >> + for (int i=0; i<toAdd; i++) { >> + add(client0, seenLeader, >> sdoc("id",Integer.toString((int)v+i+1),"_version_",v+i+1)); >> + add(client1, seenLeader, >> sdoc("id",Integer.toString((int)v+i+1),"_version_",v+i+1)); >> + } >> + >> + // client0 now has an additional add beyond our window and the >> fingerprint should cause this to fail >> + assertSync(client1, numVersions, false, shardsArr[0]); >> + >> + // lets add the missing document and verify that order doesn't matter >> + add(client1, seenLeader, >> sdoc("id",Integer.toString((int)v),"_version_",v)); >> + assertSync(client1, numVersions, true, shardsArr[0]); >> + >> + // lets do some overwrites to ensure that repeated updates and maxDoc >> don't >> matter >> + for (int i=0; i<10; i++) { >> + add(client0, seenLeader, sdoc("id", Integer.toString((int) v + i + >> 1), "_version_", v + i + 1)); >> + } >> + assertSync(client1, numVersions, true, 
shardsArr[0]); >> + >> } >> >> >> > > -- > Uwe Schindler > H.-H.-Meier-Allee 63, 28213 Bremen > http://www.thetaphi.de --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
