Hi Yonik, I committed a change yesterday. Could you backport it, too? The commit quoted below broke the forbidden-APIs checks because it used currentTimeMillis instead of RTimer.
Uwe Am 5. Februar 2016 17:14:34 MEZ, schrieb [email protected]: >Repository: lucene-solr >Updated Branches: > refs/heads/branch_5x 482b40f84 -> ff83a4001 > > >SOLR-8586: add index fingerprinting and use it in peersync >(cherry picked from commit 629767b) > > >Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo >Commit: >http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/ff83a400 >Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/ff83a400 >Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/ff83a400 > >Branch: refs/heads/branch_5x >Commit: ff83a400156beb6a8dd2d0845c7f878c28431739 >Parents: 482b40f >Author: yonik <[email protected]> >Authored: Thu Feb 4 14:54:08 2016 -0500 >Committer: yonik <[email protected]> >Committed: Fri Feb 5 11:11:34 2016 -0500 > >---------------------------------------------------------------------- > solr/CHANGES.txt | 4 + > .../org/apache/solr/cloud/SyncStrategy.java | 2 +- > .../handler/component/RealTimeGetComponent.java | 8 + >.../apache/solr/update/IndexFingerprint.java | 232 >+++++++++++++++++++ > .../java/org/apache/solr/update/PeerSync.java | 78 +++++-- > .../java/org/apache/solr/update/UpdateLog.java | 85 +++---- > .../org/apache/solr/update/PeerSyncTest.java | 23 ++ > 7 files changed, 374 insertions(+), 58 deletions(-) >---------------------------------------------------------------------- > > >http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ff83a400/solr/CHANGES.txt >---------------------------------------------------------------------- >diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt >index f2067c6..2b7b46e 100644 >--- a/solr/CHANGES.txt >+++ b/solr/CHANGES.txt >@@ -119,6 +119,10 @@ New Features > * SOLR-8415: Provide command to switch between non/secure mode in ZK > (Mike Drob, Gregory Chanan) > >+* SOLR-8586: added index fingerprint, a hash over all versions >currently in the index. >+ PeerSync now uses this to check if replicas are in sync. 
(yonik) >+ >+ > Bug Fixes > ---------------------- > > >http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ff83a400/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java >---------------------------------------------------------------------- >diff --git a/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java >b/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java >index 7a16598..d811f5c 100644 >--- a/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java >+++ b/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java >@@ -173,7 +173,7 @@ public class SyncStrategy { >// if we can't reach a replica for sync, we still consider the overall >sync a success >// TODO: as an assurance, we should still try and tell the sync nodes >that we couldn't reach > // to recover once more? >- PeerSync peerSync = new PeerSync(core, syncWith, >core.getUpdateHandler().getUpdateLog().getNumRecordsToKeep(), true, >true, peerSyncOnlyWithActive); >+ PeerSync peerSync = new PeerSync(core, syncWith, >core.getUpdateHandler().getUpdateLog().getNumRecordsToKeep(), true, >true, peerSyncOnlyWithActive, false); > return peerSync.sync(); > } > > >http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ff83a400/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java >---------------------------------------------------------------------- >diff --git >a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java >b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java >index 2bbf5a2..14a4185 100644 >--- >a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java >+++ >b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java >@@ -58,6 +58,7 @@ import org.apache.solr.search.ReturnFields; > import org.apache.solr.search.SolrIndexSearcher; > import org.apache.solr.search.SolrReturnFields; > import org.apache.solr.update.DocumentBuilder; >+import 
org.apache.solr.update.IndexFingerprint; > import org.apache.solr.update.PeerSync; > import org.apache.solr.update.UpdateLog; > import org.apache.solr.util.RefCounted; >@@ -536,6 +537,8 @@ public class RealTimeGetComponent extends >SearchComponent > int nVersions = params.getInt("getVersions", -1); > if (nVersions == -1) return; > >+ boolean doFingerprint = params.getBool("fingerprint", false); >+ > String sync = params.get("sync"); > if (sync != null) { > processSync(rb, nVersions, sync); >@@ -548,6 +551,11 @@ public class RealTimeGetComponent extends >SearchComponent >try (UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates()) { > rb.rsp.add("versions", recentUpdates.getVersions(nVersions)); > } >+ >+ if (doFingerprint) { >+ IndexFingerprint fingerprint = >IndexFingerprint.getFingerprint(req.getCore(), Long.MAX_VALUE); >+ rb.rsp.add("fingerprint", fingerprint.toObject()); >+ } > } > > > >http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ff83a400/solr/core/src/java/org/apache/solr/update/IndexFingerprint.java >---------------------------------------------------------------------- >diff --git >a/solr/core/src/java/org/apache/solr/update/IndexFingerprint.java >b/solr/core/src/java/org/apache/solr/update/IndexFingerprint.java >new file mode 100644 >index 0000000..c73b57b >--- /dev/null >+++ b/solr/core/src/java/org/apache/solr/update/IndexFingerprint.java >@@ -0,0 +1,232 @@ >+package org.apache.solr.update; >+ >+/* >+ * Licensed to the Apache Software Foundation (ASF) under one or more >+ * contributor license agreements. See the NOTICE file distributed >with >+ * this work for additional information regarding copyright ownership. >+ * The ASF licenses this file to You under the Apache License, Version >2.0 >+ * (the "License"); you may not use this file except in compliance >with >+ * the License. 
You may obtain a copy of the License at >+ * >+ * http://www.apache.org/licenses/LICENSE-2.0 >+ * >+ * Unless required by applicable law or agreed to in writing, software >+ * distributed under the License is distributed on an "AS IS" BASIS, >+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or >implied. >+ * See the License for the specific language governing permissions and >+ * limitations under the License. >+ */ >+ >+import java.io.IOException; >+import java.lang.invoke.MethodHandles; >+import java.net.ConnectException; >+import java.net.SocketException; >+import java.util.ArrayList; >+import java.util.Collections; >+import java.util.Comparator; >+import java.util.HashSet; >+import java.util.LinkedHashMap; >+import java.util.List; >+import java.util.Map; >+import java.util.Set; >+ >+import org.apache.http.NoHttpResponseException; >+import org.apache.http.client.HttpClient; >+import org.apache.http.conn.ConnectTimeoutException; >+import org.apache.lucene.index.LeafReader; >+import org.apache.lucene.index.LeafReaderContext; >+import org.apache.lucene.queries.function.FunctionValues; >+import org.apache.lucene.queries.function.ValueSource; >+import org.apache.lucene.util.Bits; >+import org.apache.lucene.util.BytesRef; >+import org.apache.solr.client.solrj.SolrServerException; >+import org.apache.solr.cloud.ZkController; >+import org.apache.solr.common.SolrException; >+import org.apache.solr.common.SolrInputDocument; >+import org.apache.solr.common.params.ModifiableSolrParams; >+import org.apache.solr.common.util.Hash; >+import org.apache.solr.common.util.NamedList; >+import org.apache.solr.common.util.StrUtils; >+import org.apache.solr.core.SolrCore; >+import org.apache.solr.handler.component.HttpShardHandlerFactory; >+import org.apache.solr.handler.component.ShardHandler; >+import org.apache.solr.handler.component.ShardHandlerFactory; >+import org.apache.solr.handler.component.ShardRequest; >+import org.apache.solr.handler.component.ShardResponse; 
>+import org.apache.solr.logging.MDCLoggingContext; >+import org.apache.solr.request.LocalSolrQueryRequest; >+import org.apache.solr.request.SolrQueryRequest; >+import org.apache.solr.response.SolrQueryResponse; >+import org.apache.solr.schema.SchemaField; >+import org.apache.solr.search.SolrIndexSearcher; >+import org.apache.solr.update.processor.UpdateRequestProcessor; >+import org.apache.solr.update.processor.UpdateRequestProcessorChain; >+import org.apache.solr.util.RefCounted; >+import org.slf4j.Logger; >+import org.slf4j.LoggerFactory; >+ >+import static >org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase.FROMLEADER; >+import static >org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM; >+ >+/** @lucene.internal */ >+public class IndexFingerprint { >+ private static final Logger log = >LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); >+ >+ private long maxVersionSpecified; >+ private long maxVersionEncountered; >+ private long maxInHash; >+ private long versionsHash; >+ private long numVersions; >+ private long numDocs; >+ private long maxDoc; >+ >+ public long getMaxVersionSpecified() { >+ return maxVersionSpecified; >+ } >+ >+ public long getMaxVersionEncountered() { >+ return maxVersionEncountered; >+ } >+ >+ public long getMaxInHash() { >+ return maxInHash; >+ } >+ >+ public long getVersionsHash() { >+ return versionsHash; >+ } >+ >+ public long getNumVersions() { >+ return numVersions; >+ } >+ >+ public long getNumDocs() { >+ return numDocs; >+ } >+ >+ public long getMaxDoc() { >+ return maxDoc; >+ } >+ >+ /** Opens a new realtime searcher and returns it's fingerprint */ >+ public static IndexFingerprint getFingerprint(SolrCore core, long >maxVersion) throws IOException { >+ core.getUpdateHandler().getUpdateLog().openRealtimeSearcher(); >+ RefCounted<SolrIndexSearcher> newestSearcher = >core.getUpdateHandler().getUpdateLog().uhandler.core.getRealtimeSearcher(); >+ try { >+ return 
getFingerprint(newestSearcher.get(), maxVersion); >+ } finally { >+ if (newestSearcher != null) { >+ newestSearcher.decref(); >+ } >+ } >+ } >+ >+ public static IndexFingerprint getFingerprint(SolrIndexSearcher >searcher, long maxVersion) throws IOException { >+ long start = System.currentTimeMillis(); >+ >+ SchemaField versionField = >VersionInfo.getAndCheckVersionField(searcher.getSchema()); >+ >+ IndexFingerprint f = new IndexFingerprint(); >+ f.maxVersionSpecified = maxVersion; >+ f.maxDoc = searcher.maxDoc(); >+ >+ // TODO: this could be parallelized, or even cached per-segment if >performance becomes an issue >+ ValueSource vs = >versionField.getType().getValueSource(versionField, null); >+ Map funcContext = ValueSource.newContext(searcher); >+ vs.createWeight(funcContext, searcher); >+ for (LeafReaderContext ctx : >searcher.getTopReaderContext().leaves()) { >+ int maxDoc = ctx.reader().maxDoc(); >+ f.numDocs += ctx.reader().numDocs(); >+ Bits liveDocs = ctx.reader().getLiveDocs(); >+ FunctionValues fv = vs.getValues(funcContext, ctx); >+ for (int doc = 0; doc < maxDoc; doc++) { >+ if (liveDocs != null && !liveDocs.get(doc)) continue; >+ long v = fv.longVal(doc); >+ f.maxVersionEncountered = Math.max(v, >f.maxVersionEncountered); >+ if (v <= f.maxVersionSpecified) { >+ f.maxInHash = Math.max(v, f.maxInHash); >+ f.versionsHash += Hash.fmix64(v); >+ f.numVersions++; >+ } >+ } >+ } >+ >+ long end = System.currentTimeMillis(); >+ log.info("IndexFingerprint millis:" + (end-start) + " result:" + >f); >+ >+ return f; >+ } >+ >+ /** returns 0 for equal, negative if f1 is less recent than f2, >positive if more recent */ >+ public static int compare(IndexFingerprint f1, IndexFingerprint f2) >{ >+ int cmp; >+ >+ // NOTE: some way want number of docs in index to take precedence >over highest version (add-only systems for sure) >+ >+ // if we're comparing all of the versions in the index, then go by >the highest encountered. 
>+ if (f1.maxVersionSpecified == Long.MAX_VALUE) { >+ cmp = Long.compare(f1.maxVersionEncountered, >f2.maxVersionEncountered); >+ if (cmp != 0) return cmp; >+ } >+ >+ // Go by the highest version under the requested max. >+ cmp = Long.compare(f1.maxInHash, f2.maxInHash); >+ if (cmp != 0) return cmp; >+ >+ // go by who has the most documents in the index >+ cmp = Long.compare(f1.numVersions, f2.numVersions); >+ if (cmp != 0) return cmp; >+ >+ // both have same number of documents, so go by hash >+ cmp = Long.compare(f1.versionsHash, f2.versionsHash); >+ return cmp; >+ } >+ >+ /** >+ * Create a generic object suitable for serializing with >ResponseWriters >+ */ >+ public Object toObject() { >+ Map<String,Object> map = new LinkedHashMap<>(); >+ map.put("maxVersionSpecified", maxVersionSpecified); >+ map.put("maxVersionEncountered", maxVersionEncountered); >+ map.put("maxInHash", maxInHash); >+ map.put("versionsHash", versionsHash); >+ map.put("numVersions", numVersions); >+ map.put("numDocs", numDocs); >+ map.put("maxDoc", maxDoc); >+ return map; >+ } >+ >+ private static long getLong(Object o, String key, long def) { >+ long v = def; >+ >+ Object oval = null; >+ if (o instanceof Map) { >+ oval = ((Map)o).get(key); >+ } else if (o instanceof NamedList) { >+ oval = ((NamedList)o).get(key); >+ } >+ >+ return oval != null ? 
((Number)oval).longValue() : def; >+ } >+ >+ /** >+ * Create an IndexFingerprint object from a deserialized generic >object (Map or NamedList) >+ */ >+ public static IndexFingerprint fromObject(Object o) { >+ IndexFingerprint f = new IndexFingerprint(); >+ f.maxVersionSpecified = getLong(o, "maxVersionSpecified", >Long.MAX_VALUE); >+ f.maxVersionEncountered = getLong(o, "maxVersionEncountered", -1); >+ f.maxInHash = getLong(o, "maxInHash", -1); >+ f.versionsHash = getLong(o, "versionsHash", -1); >+ f.numVersions = getLong(o, "numVersions", -1); >+ f.numDocs = getLong(o, "numDocs", -1); >+ f.maxDoc = getLong(o, "maxDoc", -1); >+ return f; >+ } >+ >+ @Override >+ public String toString() { >+ return toObject().toString(); >+ } >+} > >http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ff83a400/solr/core/src/java/org/apache/solr/update/PeerSync.java >---------------------------------------------------------------------- >diff --git a/solr/core/src/java/org/apache/solr/update/PeerSync.java >b/solr/core/src/java/org/apache/solr/update/PeerSync.java >index c5ddaf0..dbc0091 100644 >--- a/solr/core/src/java/org/apache/solr/update/PeerSync.java >+++ b/solr/core/src/java/org/apache/solr/update/PeerSync.java >@@ -68,6 +68,7 @@ public class PeerSync { > private UpdateLog ulog; > private HttpShardHandlerFactory shardHandlerFactory; > private ShardHandler shardHandler; >+ private List<SyncShardRequest> requests = new ArrayList<>(); > > private List<Long> startingVersions; > >@@ -76,8 +77,10 @@ public class PeerSync { > private Set<Long> requestedUpdateSet; > private long ourLowThreshold; // 20th percentile > private long ourHighThreshold; // 80th percentile >+ private long ourHighest; // currently just used for >logging/debugging purposes > private final boolean cantReachIsSuccess; > private final boolean getNoVersionsIsSuccess; >+ private final boolean doFingerprint; > private final HttpClient client; > private final boolean onlyIfActive; > private SolrCore core; >@@ -116,6 
+119,8 @@ public class PeerSync { > > private static class SyncShardRequest extends ShardRequest { > List<Long> reportedVersions; >+ IndexFingerprint fingerprint; >+ boolean doFingerprintComparison; > List<Long> requestedUpdates; > Exception updateException; > } >@@ -125,16 +130,17 @@ public class PeerSync { > } > >public PeerSync(SolrCore core, List<String> replicas, int nUpdates, >boolean cantReachIsSuccess, boolean getNoVersionsIsSuccess) { >- this(core, replicas, nUpdates, cantReachIsSuccess, >getNoVersionsIsSuccess, false); >+ this(core, replicas, nUpdates, cantReachIsSuccess, >getNoVersionsIsSuccess, false, true); > } > >- public PeerSync(SolrCore core, List<String> replicas, int nUpdates, >boolean cantReachIsSuccess, boolean getNoVersionsIsSuccess, boolean >onlyIfActive) { >+ public PeerSync(SolrCore core, List<String> replicas, int nUpdates, >boolean cantReachIsSuccess, boolean getNoVersionsIsSuccess, boolean >onlyIfActive, boolean doFingerprint) { > this.core = core; > this.replicas = replicas; > this.nUpdates = nUpdates; > this.maxUpdates = nUpdates; > this.cantReachIsSuccess = cantReachIsSuccess; > this.getNoVersionsIsSuccess = getNoVersionsIsSuccess; >+ this.doFingerprint = doFingerprint; >this.client = >core.getCoreDescriptor().getCoreContainer().getUpdateShardHandler().getHttpClient(); > this.onlyIfActive = onlyIfActive; > >@@ -169,9 +175,8 @@ public class PeerSync { > return "PeerSync: core="+uhandler.core.getName()+ " url="+myURL +" "; > } > >- /** Returns true if peer sync was successful, meaning that this core >may not be considered to have the latest updates >- * when considering the last N updates between it and its peers. >- * A commit is not performed. >+ /** Returns true if peer sync was successful, meaning that this core >may be considered to have the latest updates. >+ * It does not mean that the remote replica is in sync with us. 
> */ > public boolean sync() { > if (ulog == null) { >@@ -210,7 +215,7 @@ public class PeerSync { > > ourLowThreshold = percentile(startingVersions, 0.8f); > ourHighThreshold = percentile(startingVersions, 0.2f); >- >+ > // now make sure that the starting updates overlap our updates > // there shouldn't be reorders, so any overlap will do. > >@@ -231,6 +236,7 @@ public class PeerSync { > } > > ourUpdates = newList; >+ Collections.sort(ourUpdates, absComparator); > } else { > > if (ourUpdates.size() > 0) { >@@ -243,9 +249,10 @@ public class PeerSync { > return false; > } > } >- >+ >+ ourHighest = ourUpdates.get(0); > ourUpdateSet = new HashSet<>(ourUpdates); >- requestedUpdateSet = new HashSet<>(ourUpdates); >+ requestedUpdateSet = new HashSet<>(); > > for (;;) { > ShardResponse srsp = shardHandler.takeCompletedOrError(); >@@ -257,9 +264,18 @@ public class PeerSync { > return false; > } > } >- >- log.info(msg() + "DONE. sync succeeded"); >- return true; >+ >+ // finish up any comparisons with other shards that we deferred >+ boolean success = true; >+ for (SyncShardRequest sreq : requests) { >+ if (sreq.doFingerprintComparison) { >+ success = compareFingerprint(sreq); >+ if (!success) break; >+ } >+ } >+ >+ log.info(msg() + "DONE. sync " + (success ? 
"succeeded" : >"failed")); >+ return success; > } finally { > MDCLoggingContext.clear(); > } >@@ -267,6 +283,7 @@ public class PeerSync { > > private void requestVersions(String replica) { > SyncShardRequest sreq = new SyncShardRequest(); >+ requests.add(sreq); > sreq.purpose = 1; > sreq.shards = new String[]{replica}; > sreq.actualShards = sreq.shards; >@@ -274,6 +291,7 @@ public class PeerSync { > sreq.params.set("qt","/get"); > sreq.params.set("distrib",false); > sreq.params.set("getVersions",nUpdates); >+ sreq.params.set("fingerprint",doFingerprint); > shardHandler.submit(sreq, replica, sreq.params); > } > >@@ -355,7 +373,12 @@ public class PeerSync { > SyncShardRequest sreq = (SyncShardRequest) srsp.getShardRequest(); > sreq.reportedVersions = otherVersions; > >- log.info(msg() + " Received " + otherVersions.size() + " versions >from " + sreq.shards[0] ); >+ Object fingerprint = >srsp.getSolrResponse().getResponse().get("fingerprint"); >+ >+ log.info(msg() + " Received " + otherVersions.size() + " versions >from " + sreq.shards[0] + " fingerprint:" + fingerprint ); >+ if (fingerprint != null) { >+ sreq.fingerprint = IndexFingerprint.fromObject(fingerprint); >+ } > > if (otherVersions.size() == 0) { > return getNoVersionsIsSuccess; >@@ -371,13 +394,14 @@ public class PeerSync { > > long otherHigh = percentile(otherVersions, .2f); > long otherLow = percentile(otherVersions, .8f); >+ long otherHighest = otherVersions.get(0); > > if (ourHighThreshold < otherLow) { > // Small overlap between version windows and ours is older >// This means that we might miss updates if we attempted to use this >method. > // Since there exists just one replica that is so much newer, we must > // fail the sync. >- log.info(msg() + " Our versions are too old. >ourHighThreshold="+ourHighThreshold + " otherLowThreshold="+otherLow); >+ log.info(msg() + " Our versions are too old. 
>ourHighThreshold="+ourHighThreshold + " otherLowThreshold="+otherLow + >" ourHighest=" + ourHighest + " otherHighest=" + otherHighest); > return false; > } > >@@ -385,7 +409,10 @@ public class PeerSync { > // Small overlap between windows and ours is newer. >// Using this list to sync would result in requesting/replaying results >we don't need > // and possibly bringing deleted docs back to life. >- log.info(msg() + " Our versions are newer. >ourLowThreshold="+ourLowThreshold + " otherHigh="+otherHigh); >+ log.info(msg() + " Our versions are newer. >ourLowThreshold="+ourLowThreshold + " otherHigh="+otherHigh+ " >ourHighest=" + ourHighest + " otherHighest=" + otherHighest); >+ >+ // Because our versions are newer, IndexFingerprint with the >remote would not match us. >+ // We return true on our side, but the remote peersync with us >should fail. > return true; > } > >@@ -408,9 +435,15 @@ public class PeerSync { > sreq.requestedUpdates = toRequest; > > if (toRequest.isEmpty()) { >- log.info(msg() + " Our versions are newer. >ourLowThreshold="+ourLowThreshold + " otherHigh="+otherHigh); >+ log.info(msg() + " No additional versions requested. 
>ourLowThreshold="+ourLowThreshold + " otherHigh="+otherHigh+ " >ourHighest=" + ourHighest + " otherHighest=" + otherHighest); > >// we had (or already requested) all the updates referenced by the >replica >+ >+ // If we requested updates from another replica, we can't >compare fingerprints yet with this replica, we need to defer >+ if (doFingerprint) { >+ sreq.doFingerprintComparison = true; >+ } >+ > return true; > } > >@@ -422,6 +455,19 @@ public class PeerSync { > return requestUpdates(srsp, toRequest); > } > >+ private boolean compareFingerprint(SyncShardRequest sreq) { >+ if (sreq.fingerprint == null) return true; >+ try { >+ IndexFingerprint ourFingerprint = >IndexFingerprint.getFingerprint(core, Long.MAX_VALUE); >+ int cmp = IndexFingerprint.compare(ourFingerprint, >sreq.fingerprint); >+ log.info("Fingerprint comparison: " + cmp); >+ return cmp == 0; // currently, we only check for equality... >+ } catch(IOException e){ >+ log.error(msg() + "Error getting index fingerprint", e); >+ return false; >+ } >+ } >+ >private boolean requestUpdates(ShardResponse srsp, List<Long> >toRequest) { > String replica = srsp.getShardRequest().shards[0]; > >@@ -556,7 +602,7 @@ public class PeerSync { > } > } > >- return true; >+ return compareFingerprint(sreq); > } > > > >http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ff83a400/solr/core/src/java/org/apache/solr/update/UpdateLog.java >---------------------------------------------------------------------- >diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java >b/solr/core/src/java/org/apache/solr/update/UpdateLog.java >index 7d7d3ef..7e3fc9f 100644 >--- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java >+++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java >@@ -283,7 +283,7 @@ public class UpdateLog implements >PluginInfoInitialized { > if (debug) { >log.debug("UpdateHandler init: tlogDir=" + tlogDir + ", existing >tlogs=" + Arrays.asList(tlogFiles) + ", next id=" + id); > } >- >+ > 
TransactionLog oldLog = null; > for (String oldLogName : tlogFiles) { > File f = new File(tlogDir, oldLogName); >@@ -334,11 +334,11 @@ public class UpdateLog implements >PluginInfoInitialized { > } > > } >- >+ > public String getLogDir() { > return tlogDir.getAbsolutePath(); > } >- >+ > public List<Long> getStartingVersions() { > return startingVersions; > } >@@ -503,32 +503,35 @@ public class UpdateLog implements >PluginInfoInitialized { > if ((cmd.getFlags() & UpdateCommand.BUFFERING) == 0) { >// given that we just did a delete-by-query, we don't know what >documents were > // affected and hence we must purge our caches. >- if (map != null) map.clear(); >- if (prevMap != null) prevMap.clear(); >- if (prevMap2 != null) prevMap2.clear(); >- >+ openRealtimeSearcher(); > trackDeleteByQuery(cmd.getQuery(), cmd.getVersion()); > >- // oldDeletes.clear(); >- >- // We must cause a new IndexReader to be opened before >anything looks at these caches again >- // so that a cache miss will read fresh data. >- // >- // TODO: FUTURE: open a new searcher lazily for better >throughput with delete-by-query commands >- try { >- RefCounted<SolrIndexSearcher> holder = >uhandler.core.openNewSearcher(true, true); >- holder.decref(); >- } catch (Exception e) { >- SolrException.log(log, "Error opening realtime searcher for >deleteByQuery", e); >+ if (trace) { >+ LogPtr ptr = new LogPtr(pos, cmd.getVersion()); >+ log.trace("TLOG: added deleteByQuery " + cmd.query + " to " >+ tlog + " " + ptr + " map=" + System.identityHashCode(map)); > } >- > } >+ } >+ } > >- LogPtr ptr = new LogPtr(pos, cmd.getVersion()); >- >- if (trace) { >- log.trace("TLOG: added deleteByQuery " + cmd.query + " to " + >tlog + " " + ptr + " map=" + System.identityHashCode(map)); >+ /** Opens a new realtime searcher and clears the id caches. 
>+ * This may also be called when we updates are being buffered (from >PeerSync/IndexFingerprint) >+ */ >+ public void openRealtimeSearcher() { >+ synchronized (this) { >+ // We must cause a new IndexReader to be opened before anything >looks at these caches again >+ // so that a cache miss will read fresh data. >+ try { >+ RefCounted<SolrIndexSearcher> holder = >uhandler.core.openNewSearcher(true, true); >+ holder.decref(); >+ } catch (Exception e) { >+ SolrException.log(log, "Error opening realtime searcher", e); >+ return; > } >+ >+ if (map != null) map.clear(); >+ if (prevMap != null) prevMap.clear(); >+ if (prevMap2 != null) prevMap2.clear(); > } > } > >@@ -620,7 +623,7 @@ public class UpdateLog implements >PluginInfoInitialized { > public boolean hasUncommittedChanges() { > return tlog != null; > } >- >+ > public void preCommit(CommitUpdateCommand cmd) { > synchronized (this) { > if (debug) { >@@ -878,11 +881,11 @@ public class UpdateLog implements >PluginInfoInitialized { > theLog.forceClose(); > } > } >- >+ > public void close(boolean committed) { > close(committed, false); > } >- >+ > public void close(boolean committed, boolean deleteOnClose) { > synchronized (this) { > recoveryExecutor.shutdown(); // no new tasks >@@ -923,7 +926,7 @@ public class UpdateLog implements >PluginInfoInitialized { > this.id = id; > } > } >- >+ > public class RecentUpdates implements Closeable { > > final Deque<TransactionLog> logList; // newest first >@@ -950,17 +953,17 @@ public class UpdateLog implements >PluginInfoInitialized { > > public List<Long> getVersions(int n) { > List<Long> ret = new ArrayList<>(n); >- >+ > for (List<Update> singleList : updateList) { > for (Update ptr : singleList) { > ret.add(ptr.version); > if (--n <= 0) return ret; > } > } >- >+ > return ret; > } >- >+ > public Object lookup(long version) { > Update update = updates.get(version); > if (update == null) return null; >@@ -1004,7 +1007,7 @@ public class UpdateLog implements >PluginInfoInitialized { 
> try { > o = reader.next(); > if (o==null) break; >- >+ > // should currently be a List<Oper,Ver,Doc/Id> > List entry = (List)o; > >@@ -1027,13 +1030,13 @@ public class UpdateLog implements >PluginInfoInitialized { > > updatesForLog.add(update); > updates.put(version, update); >- >+ > if (oper == UpdateLog.DELETE_BY_QUERY) { > deleteByQueryList.add(update); > } else if (oper == UpdateLog.DELETE) { > deleteList.add(new DeleteUpdate(version, (byte[])entry.get(2))); > } >- >+ > break; > > case UpdateLog.COMMIT: >@@ -1063,7 +1066,7 @@ public class UpdateLog implements >PluginInfoInitialized { > } > > } >- >+ > @Override > public void close() { > for (TransactionLog log : logList) { >@@ -1331,10 +1334,10 @@ public class UpdateLog implements >PluginInfoInitialized { >"log replay status {} active={} starting pos={} current pos={} current >size={} % read={}", > translog, activeLog, recoveryInfo.positionOfStart, cpos, csize, > Math.floor(cpos / (double) csize * 100.)); >- >+ > } > } >- >+ > o = null; > o = tlogReader.next(); > if (o == null && activeLog) { >@@ -1513,25 +1516,25 @@ public class UpdateLog implements >PluginInfoInitialized { > } > } > } >- >+ > protected String getTlogDir(SolrCore core, PluginInfo info) { > String dataDir = (String) info.initArgs.get("dir"); >- >+ > String ulogDir = core.getCoreDescriptor().getUlogDir(); > if (ulogDir != null) { > dataDir = ulogDir; > } >- >+ > if (dataDir == null || dataDir.length() == 0) { > dataDir = core.getDataDir(); > } > > return dataDir + "/" + TLOG_NAME; > } >- >+ > /** > * Clears the logs on the file system. Only call before init. 
>- * >+ * > * @param core the SolrCore > * @param ulogPluginInfo the init info for the UpdateHandler > */ > >http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ff83a400/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java >---------------------------------------------------------------------- >diff --git >a/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java >b/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java >index 8083ad0..bcaf846 100644 >--- a/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java >+++ b/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java >@@ -168,6 +168,29 @@ public class PeerSyncTest extends >BaseDistributedSearchTestCase { > > assertSync(client1, numVersions, true, shardsArr[0]); >client0.commit(); client1.commit(); queryAndCompare(params("q", "*:*", >"sort","_version_ desc"), client0, client1); >+ >+ // now lets check fingerprinting causes appropriate fails >+ v = 4000; >+ add(client0, seenLeader, >sdoc("id",Integer.toString((int)v),"_version_",v)); >+ toAdd = numVersions+10; >+ for (int i=0; i<toAdd; i++) { >+ add(client0, seenLeader, >sdoc("id",Integer.toString((int)v+i+1),"_version_",v+i+1)); >+ add(client1, seenLeader, >sdoc("id",Integer.toString((int)v+i+1),"_version_",v+i+1)); >+ } >+ >+ // client0 now has an additional add beyond our window and the >fingerprint should cause this to fail >+ assertSync(client1, numVersions, false, shardsArr[0]); >+ >+ // lets add the missing document and verify that order doesn't >matter >+ add(client1, seenLeader, >sdoc("id",Integer.toString((int)v),"_version_",v)); >+ assertSync(client1, numVersions, true, shardsArr[0]); >+ >+ // lets do some overwrites to ensure that repeated updates and >maxDoc don't matter >+ for (int i=0; i<10; i++) { >+ add(client0, seenLeader, sdoc("id", Integer.toString((int) v + i >+ 1), "_version_", v + i + 1)); >+ } >+ assertSync(client1, numVersions, true, shardsArr[0]); >+ > } > > -- Uwe Schindler H.-H.-Meier-Allee 63, 28213 
Bremen http://www.thetaphi.de
