Hi Yonik,

I committed a change yesterday. Could you backport this, too? It broke the
forbidden-apis checks because it used currentTimeMillis instead of RTimer.

Uwe

Am 5. Februar 2016 17:14:34 MEZ, schrieb [email protected]:
>Repository: lucene-solr
>Updated Branches:
>  refs/heads/branch_5x 482b40f84 -> ff83a4001
>
>
>SOLR-8586: add index fingerprinting and use it in peersync
>(cherry picked from commit 629767b)
>
>
>Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
>Commit:
>http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/ff83a400
>Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/ff83a400
>Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/ff83a400
>
>Branch: refs/heads/branch_5x
>Commit: ff83a400156beb6a8dd2d0845c7f878c28431739
>Parents: 482b40f
>Author: yonik <[email protected]>
>Authored: Thu Feb 4 14:54:08 2016 -0500
>Committer: yonik <[email protected]>
>Committed: Fri Feb 5 11:11:34 2016 -0500
>
>----------------------------------------------------------------------
> solr/CHANGES.txt                                |   4 +
> .../org/apache/solr/cloud/SyncStrategy.java     |   2 +-
> .../handler/component/RealTimeGetComponent.java |   8 +
>.../apache/solr/update/IndexFingerprint.java    | 232
>+++++++++++++++++++
> .../java/org/apache/solr/update/PeerSync.java   |  78 +++++--
> .../java/org/apache/solr/update/UpdateLog.java  |  85 +++----
> .../org/apache/solr/update/PeerSyncTest.java    |  23 ++
> 7 files changed, 374 insertions(+), 58 deletions(-)
>----------------------------------------------------------------------
>
>
>http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ff83a400/solr/CHANGES.txt
>----------------------------------------------------------------------
>diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
>index f2067c6..2b7b46e 100644
>--- a/solr/CHANGES.txt
>+++ b/solr/CHANGES.txt
>@@ -119,6 +119,10 @@ New Features
> * SOLR-8415: Provide command to switch between non/secure mode in ZK
>   (Mike Drob, Gregory Chanan)
> 
>+* SOLR-8586: added index fingerprint, a hash over all versions
>currently in the index.
>+  PeerSync now uses this to check if replicas are in sync. (yonik)
>+
>+
> Bug Fixes
> ----------------------
> 
>
>http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ff83a400/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
>----------------------------------------------------------------------
>diff --git a/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
>b/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
>index 7a16598..d811f5c 100644
>--- a/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
>+++ b/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
>@@ -173,7 +173,7 @@ public class SyncStrategy {
>// if we can't reach a replica for sync, we still consider the overall
>sync a success
>// TODO: as an assurance, we should still try and tell the sync nodes
>that we couldn't reach
>     // to recover once more?
>-    PeerSync peerSync = new PeerSync(core, syncWith,
>core.getUpdateHandler().getUpdateLog().getNumRecordsToKeep(), true,
>true, peerSyncOnlyWithActive);
>+    PeerSync peerSync = new PeerSync(core, syncWith,
>core.getUpdateHandler().getUpdateLog().getNumRecordsToKeep(), true,
>true, peerSyncOnlyWithActive, false);
>     return peerSync.sync();
>   }
>   
>
>http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ff83a400/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
>----------------------------------------------------------------------
>diff --git
>a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
>b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
>index 2bbf5a2..14a4185 100644
>---
>a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
>+++
>b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
>@@ -58,6 +58,7 @@ import org.apache.solr.search.ReturnFields;
> import org.apache.solr.search.SolrIndexSearcher;
> import org.apache.solr.search.SolrReturnFields;
> import org.apache.solr.update.DocumentBuilder;
>+import org.apache.solr.update.IndexFingerprint;
> import org.apache.solr.update.PeerSync;
> import org.apache.solr.update.UpdateLog;
> import org.apache.solr.util.RefCounted;
>@@ -536,6 +537,8 @@ public class RealTimeGetComponent extends
>SearchComponent
>     int nVersions = params.getInt("getVersions", -1);
>     if (nVersions == -1) return;
> 
>+    boolean doFingerprint = params.getBool("fingerprint", false);
>+
>     String sync = params.get("sync");
>     if (sync != null) {
>       processSync(rb, nVersions, sync);
>@@ -548,6 +551,11 @@ public class RealTimeGetComponent extends
>SearchComponent
>try (UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates()) {
>       rb.rsp.add("versions", recentUpdates.getVersions(nVersions));
>     }
>+
>+    if (doFingerprint) {
>+      IndexFingerprint fingerprint =
>IndexFingerprint.getFingerprint(req.getCore(), Long.MAX_VALUE);
>+      rb.rsp.add("fingerprint", fingerprint.toObject());
>+    }
>   }
> 
>   
>
>http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ff83a400/solr/core/src/java/org/apache/solr/update/IndexFingerprint.java
>----------------------------------------------------------------------
>diff --git
>a/solr/core/src/java/org/apache/solr/update/IndexFingerprint.java
>b/solr/core/src/java/org/apache/solr/update/IndexFingerprint.java
>new file mode 100644
>index 0000000..c73b57b
>--- /dev/null
>+++ b/solr/core/src/java/org/apache/solr/update/IndexFingerprint.java
>@@ -0,0 +1,232 @@
>+package org.apache.solr.update;
>+
>+/*
>+ * Licensed to the Apache Software Foundation (ASF) under one or more
>+ * contributor license agreements.  See the NOTICE file distributed
>with
>+ * this work for additional information regarding copyright ownership.
>+ * The ASF licenses this file to You under the Apache License, Version
>2.0
>+ * (the "License"); you may not use this file except in compliance
>with
>+ * the License.  You may obtain a copy of the License at
>+ *
>+ *     http://www.apache.org/licenses/LICENSE-2.0
>+ *
>+ * Unless required by applicable law or agreed to in writing, software
>+ * distributed under the License is distributed on an "AS IS" BASIS,
>+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
>implied.
>+ * See the License for the specific language governing permissions and
>+ * limitations under the License.
>+ */
>+
>+import java.io.IOException;
>+import java.lang.invoke.MethodHandles;
>+import java.net.ConnectException;
>+import java.net.SocketException;
>+import java.util.ArrayList;
>+import java.util.Collections;
>+import java.util.Comparator;
>+import java.util.HashSet;
>+import java.util.LinkedHashMap;
>+import java.util.List;
>+import java.util.Map;
>+import java.util.Set;
>+
>+import org.apache.http.NoHttpResponseException;
>+import org.apache.http.client.HttpClient;
>+import org.apache.http.conn.ConnectTimeoutException;
>+import org.apache.lucene.index.LeafReader;
>+import org.apache.lucene.index.LeafReaderContext;
>+import org.apache.lucene.queries.function.FunctionValues;
>+import org.apache.lucene.queries.function.ValueSource;
>+import org.apache.lucene.util.Bits;
>+import org.apache.lucene.util.BytesRef;
>+import org.apache.solr.client.solrj.SolrServerException;
>+import org.apache.solr.cloud.ZkController;
>+import org.apache.solr.common.SolrException;
>+import org.apache.solr.common.SolrInputDocument;
>+import org.apache.solr.common.params.ModifiableSolrParams;
>+import org.apache.solr.common.util.Hash;
>+import org.apache.solr.common.util.NamedList;
>+import org.apache.solr.common.util.StrUtils;
>+import org.apache.solr.core.SolrCore;
>+import org.apache.solr.handler.component.HttpShardHandlerFactory;
>+import org.apache.solr.handler.component.ShardHandler;
>+import org.apache.solr.handler.component.ShardHandlerFactory;
>+import org.apache.solr.handler.component.ShardRequest;
>+import org.apache.solr.handler.component.ShardResponse;
>+import org.apache.solr.logging.MDCLoggingContext;
>+import org.apache.solr.request.LocalSolrQueryRequest;
>+import org.apache.solr.request.SolrQueryRequest;
>+import org.apache.solr.response.SolrQueryResponse;
>+import org.apache.solr.schema.SchemaField;
>+import org.apache.solr.search.SolrIndexSearcher;
>+import org.apache.solr.update.processor.UpdateRequestProcessor;
>+import org.apache.solr.update.processor.UpdateRequestProcessorChain;
>+import org.apache.solr.util.RefCounted;
>+import org.slf4j.Logger;
>+import org.slf4j.LoggerFactory;
>+
>+import static
>org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase.FROMLEADER;
>+import static
>org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
>+
>+/** @lucene.internal */
>+public class IndexFingerprint {
>+  private static final Logger log =
>LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
>+
>+  private long maxVersionSpecified;
>+  private long maxVersionEncountered;
>+  private long maxInHash;
>+  private long versionsHash;
>+  private long numVersions;
>+  private long numDocs;
>+  private long maxDoc;
>+
>+  public long getMaxVersionSpecified() {
>+    return maxVersionSpecified;
>+  }
>+
>+  public long getMaxVersionEncountered() {
>+    return maxVersionEncountered;
>+  }
>+
>+  public long getMaxInHash() {
>+    return maxInHash;
>+  }
>+
>+  public long getVersionsHash() {
>+    return versionsHash;
>+  }
>+
>+  public long getNumVersions() {
>+    return numVersions;
>+  }
>+
>+  public long getNumDocs() {
>+    return numDocs;
>+  }
>+
>+  public long getMaxDoc() {
>+    return maxDoc;
>+  }
>+
>+  /** Opens a new realtime searcher and returns it's fingerprint */
>+  public static IndexFingerprint getFingerprint(SolrCore core, long
>maxVersion) throws IOException {
>+    core.getUpdateHandler().getUpdateLog().openRealtimeSearcher();
>+    RefCounted<SolrIndexSearcher> newestSearcher =
>core.getUpdateHandler().getUpdateLog().uhandler.core.getRealtimeSearcher();
>+    try {
>+      return getFingerprint(newestSearcher.get(), maxVersion);
>+    } finally {
>+      if (newestSearcher != null) {
>+        newestSearcher.decref();
>+      }
>+    }
>+  }
>+
>+  public static IndexFingerprint getFingerprint(SolrIndexSearcher
>searcher, long maxVersion) throws IOException {
>+    long start = System.currentTimeMillis();
>+
>+    SchemaField versionField =
>VersionInfo.getAndCheckVersionField(searcher.getSchema());
>+
>+    IndexFingerprint f = new IndexFingerprint();
>+    f.maxVersionSpecified = maxVersion;
>+    f.maxDoc = searcher.maxDoc();
>+
>+    // TODO: this could be parallelized, or even cached per-segment if
>performance becomes an issue
>+    ValueSource vs =
>versionField.getType().getValueSource(versionField, null);
>+    Map funcContext = ValueSource.newContext(searcher);
>+    vs.createWeight(funcContext, searcher);
>+    for (LeafReaderContext ctx :
>searcher.getTopReaderContext().leaves()) {
>+      int maxDoc = ctx.reader().maxDoc();
>+      f.numDocs += ctx.reader().numDocs();
>+      Bits liveDocs = ctx.reader().getLiveDocs();
>+      FunctionValues fv = vs.getValues(funcContext, ctx);
>+      for (int doc = 0; doc < maxDoc; doc++) {
>+        if (liveDocs != null && !liveDocs.get(doc)) continue;
>+        long v = fv.longVal(doc);
>+        f.maxVersionEncountered = Math.max(v,
>f.maxVersionEncountered);
>+        if (v <= f.maxVersionSpecified) {
>+          f.maxInHash = Math.max(v, f.maxInHash);
>+          f.versionsHash += Hash.fmix64(v);
>+          f.numVersions++;
>+        }
>+      }
>+    }
>+
>+    long end = System.currentTimeMillis();
>+    log.info("IndexFingerprint millis:" + (end-start) + " result:" +
>f);
>+
>+    return f;
>+  }
>+
>+  /** returns 0 for equal, negative if f1 is less recent than f2,
>positive if more recent */
>+  public static int compare(IndexFingerprint f1, IndexFingerprint f2)
>{
>+    int cmp;
>+
>+    // NOTE: some way want number of docs in index to take precedence
>over highest version (add-only systems for sure)
>+
>+    // if we're comparing all of the versions in the index, then go by
>the highest encountered.
>+    if (f1.maxVersionSpecified == Long.MAX_VALUE) {
>+      cmp = Long.compare(f1.maxVersionEncountered,
>f2.maxVersionEncountered);
>+      if (cmp != 0) return cmp;
>+    }
>+
>+    // Go by the highest version under the requested max.
>+    cmp = Long.compare(f1.maxInHash, f2.maxInHash);
>+    if (cmp != 0) return cmp;
>+
>+    // go by who has the most documents in the index
>+    cmp = Long.compare(f1.numVersions, f2.numVersions);
>+    if (cmp != 0) return cmp;
>+
>+    // both have same number of documents, so go by hash
>+    cmp = Long.compare(f1.versionsHash, f2.versionsHash);
>+    return cmp;
>+  }
>+
>+  /**
>+   * Create a generic object suitable for serializing with
>ResponseWriters
>+   */
>+  public Object toObject() {
>+    Map<String,Object> map = new LinkedHashMap<>();
>+    map.put("maxVersionSpecified", maxVersionSpecified);
>+    map.put("maxVersionEncountered", maxVersionEncountered);
>+    map.put("maxInHash", maxInHash);
>+    map.put("versionsHash", versionsHash);
>+    map.put("numVersions", numVersions);
>+    map.put("numDocs", numDocs);
>+    map.put("maxDoc", maxDoc);
>+    return map;
>+  }
>+
>+  private static long getLong(Object o, String key, long def) {
>+    long v = def;
>+
>+    Object oval = null;
>+    if (o instanceof Map) {
>+      oval = ((Map)o).get(key);
>+    } else if (o instanceof NamedList) {
>+      oval = ((NamedList)o).get(key);
>+    }
>+
>+    return oval != null ? ((Number)oval).longValue() : def;
>+  }
>+
>+  /**
>+   * Create an IndexFingerprint object from a deserialized generic
>object (Map or NamedList)
>+   */
>+  public static IndexFingerprint fromObject(Object o) {
>+    IndexFingerprint f = new IndexFingerprint();
>+    f.maxVersionSpecified = getLong(o, "maxVersionSpecified",
>Long.MAX_VALUE);
>+    f.maxVersionEncountered = getLong(o, "maxVersionEncountered", -1);
>+    f.maxInHash = getLong(o, "maxInHash", -1);
>+    f.versionsHash = getLong(o, "versionsHash", -1);
>+    f.numVersions = getLong(o, "numVersions", -1);
>+    f.numDocs = getLong(o, "numDocs", -1);
>+    f.maxDoc = getLong(o, "maxDoc", -1);
>+    return f;
>+  }
>+
>+  @Override
>+  public String toString() {
>+    return toObject().toString();
>+  }
>+}
>
>http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ff83a400/solr/core/src/java/org/apache/solr/update/PeerSync.java
>----------------------------------------------------------------------
>diff --git a/solr/core/src/java/org/apache/solr/update/PeerSync.java
>b/solr/core/src/java/org/apache/solr/update/PeerSync.java
>index c5ddaf0..dbc0091 100644
>--- a/solr/core/src/java/org/apache/solr/update/PeerSync.java
>+++ b/solr/core/src/java/org/apache/solr/update/PeerSync.java
>@@ -68,6 +68,7 @@ public class PeerSync  {
>   private UpdateLog ulog;
>   private HttpShardHandlerFactory shardHandlerFactory;
>   private ShardHandler shardHandler;
>+  private List<SyncShardRequest> requests = new ArrayList<>();
> 
>   private List<Long> startingVersions;
> 
>@@ -76,8 +77,10 @@ public class PeerSync  {
>   private Set<Long> requestedUpdateSet;
>   private long ourLowThreshold;  // 20th percentile
>   private long ourHighThreshold; // 80th percentile
>+  private long ourHighest;  // currently just used for
>logging/debugging purposes
>   private final boolean cantReachIsSuccess;
>   private final boolean getNoVersionsIsSuccess;
>+  private final boolean doFingerprint;
>   private final HttpClient client;
>   private final boolean onlyIfActive;
>   private SolrCore core;
>@@ -116,6 +119,8 @@ public class PeerSync  {
> 
>   private static class SyncShardRequest extends ShardRequest {
>     List<Long> reportedVersions;
>+    IndexFingerprint fingerprint;
>+    boolean doFingerprintComparison;
>     List<Long> requestedUpdates;
>     Exception updateException;
>   }
>@@ -125,16 +130,17 @@ public class PeerSync  {
>   }
>   
>public PeerSync(SolrCore core, List<String> replicas, int nUpdates,
>boolean cantReachIsSuccess, boolean getNoVersionsIsSuccess) {
>-    this(core, replicas, nUpdates, cantReachIsSuccess,
>getNoVersionsIsSuccess, false);
>+    this(core, replicas, nUpdates, cantReachIsSuccess,
>getNoVersionsIsSuccess, false, true);
>   }
>   
>-  public PeerSync(SolrCore core, List<String> replicas, int nUpdates,
>boolean cantReachIsSuccess, boolean getNoVersionsIsSuccess, boolean
>onlyIfActive) {
>+  public PeerSync(SolrCore core, List<String> replicas, int nUpdates,
>boolean cantReachIsSuccess, boolean getNoVersionsIsSuccess, boolean
>onlyIfActive, boolean doFingerprint) {
>     this.core = core;
>     this.replicas = replicas;
>     this.nUpdates = nUpdates;
>     this.maxUpdates = nUpdates;
>     this.cantReachIsSuccess = cantReachIsSuccess;
>     this.getNoVersionsIsSuccess = getNoVersionsIsSuccess;
>+    this.doFingerprint = doFingerprint;
>this.client =
>core.getCoreDescriptor().getCoreContainer().getUpdateShardHandler().getHttpClient();
>     this.onlyIfActive = onlyIfActive;
>     
>@@ -169,9 +175,8 @@ public class PeerSync  {
>  return "PeerSync: core="+uhandler.core.getName()+ " url="+myURL +" ";
>   }
> 
>-  /** Returns true if peer sync was successful, meaning that this core
>may not be considered to have the latest updates
>-   *  when considering the last N updates between it and its peers.
>-   *  A commit is not performed.
>+  /** Returns true if peer sync was successful, meaning that this core
>may be considered to have the latest updates.
>+   * It does not mean that the remote replica is in sync with us.
>    */
>   public boolean sync() {
>     if (ulog == null) {
>@@ -210,7 +215,7 @@ public class PeerSync  {
>         
>         ourLowThreshold = percentile(startingVersions, 0.8f);
>         ourHighThreshold = percentile(startingVersions, 0.2f);
>-        
>+
>         // now make sure that the starting updates overlap our updates
>         // there shouldn't be reorders, so any overlap will do.
>         
>@@ -231,6 +236,7 @@ public class PeerSync  {
>         }
>         
>         ourUpdates = newList;
>+        Collections.sort(ourUpdates, absComparator);
>       } else {
>         
>         if (ourUpdates.size() > 0) {
>@@ -243,9 +249,10 @@ public class PeerSync  {
>           return false;
>         }
>       }
>-      
>+
>+      ourHighest = ourUpdates.get(0);
>       ourUpdateSet = new HashSet<>(ourUpdates);
>-      requestedUpdateSet = new HashSet<>(ourUpdates);
>+      requestedUpdateSet = new HashSet<>();
>       
>       for (;;) {
>         ShardResponse srsp = shardHandler.takeCompletedOrError();
>@@ -257,9 +264,18 @@ public class PeerSync  {
>           return false;
>         }
>       }
>-      
>-      log.info(msg() + "DONE. sync succeeded");
>-      return true;
>+
>+      // finish up any comparisons with other shards that we deferred
>+      boolean success = true;
>+      for (SyncShardRequest sreq : requests) {
>+        if (sreq.doFingerprintComparison) {
>+          success = compareFingerprint(sreq);
>+          if (!success) break;
>+        }
>+      }
>+
>+      log.info(msg() + "DONE. sync " + (success ? "succeeded" :
>"failed"));
>+      return success;
>     } finally {
>       MDCLoggingContext.clear();
>     }
>@@ -267,6 +283,7 @@ public class PeerSync  {
>   
>   private void requestVersions(String replica) {
>     SyncShardRequest sreq = new SyncShardRequest();
>+    requests.add(sreq);
>     sreq.purpose = 1;
>     sreq.shards = new String[]{replica};
>     sreq.actualShards = sreq.shards;
>@@ -274,6 +291,7 @@ public class PeerSync  {
>     sreq.params.set("qt","/get");
>     sreq.params.set("distrib",false);
>     sreq.params.set("getVersions",nUpdates);
>+    sreq.params.set("fingerprint",doFingerprint);
>     shardHandler.submit(sreq, replica, sreq.params);
>   }
> 
>@@ -355,7 +373,12 @@ public class PeerSync  {
>     SyncShardRequest sreq = (SyncShardRequest) srsp.getShardRequest();
>     sreq.reportedVersions =  otherVersions;
> 
>-    log.info(msg() + " Received " + otherVersions.size() + " versions
>from " + sreq.shards[0] );
>+    Object fingerprint =
>srsp.getSolrResponse().getResponse().get("fingerprint");
>+
>+    log.info(msg() + " Received " + otherVersions.size() + " versions
>from " + sreq.shards[0] + " fingerprint:" + fingerprint );
>+    if (fingerprint != null) {
>+      sreq.fingerprint = IndexFingerprint.fromObject(fingerprint);
>+    }
> 
>     if (otherVersions.size() == 0) {
>       return getNoVersionsIsSuccess; 
>@@ -371,13 +394,14 @@ public class PeerSync  {
>     
>     long otherHigh = percentile(otherVersions, .2f);
>     long otherLow = percentile(otherVersions, .8f);
>+    long otherHighest = otherVersions.get(0);
> 
>     if (ourHighThreshold < otherLow) {
>       // Small overlap between version windows and ours is older
>// This means that we might miss updates if we attempted to use this
>method.
>  // Since there exists just one replica that is so much newer, we must
>       // fail the sync.
>-      log.info(msg() + " Our versions are too old.
>ourHighThreshold="+ourHighThreshold + " otherLowThreshold="+otherLow);
>+      log.info(msg() + " Our versions are too old.
>ourHighThreshold="+ourHighThreshold + " otherLowThreshold="+otherLow +
>" ourHighest=" + ourHighest + " otherHighest=" + otherHighest);
>       return false;
>     }
> 
>@@ -385,7 +409,10 @@ public class PeerSync  {
>       // Small overlap between windows and ours is newer.
>// Using this list to sync would result in requesting/replaying results
>we don't need
>       // and possibly bringing deleted docs back to life.
>-      log.info(msg() + " Our versions are newer.
>ourLowThreshold="+ourLowThreshold + " otherHigh="+otherHigh);
>+      log.info(msg() + " Our versions are newer.
>ourLowThreshold="+ourLowThreshold + " otherHigh="+otherHigh+ "
>ourHighest=" + ourHighest + " otherHighest=" + otherHighest);
>+
>+      // Because our versions are newer, IndexFingerprint with the
>remote would not match us.
>+      // We return true on our side, but the remote peersync with us
>should fail.
>       return true;
>     }
>     
>@@ -408,9 +435,15 @@ public class PeerSync  {
>     sreq.requestedUpdates = toRequest;
>     
>     if (toRequest.isEmpty()) {
>-      log.info(msg() + " Our versions are newer.
>ourLowThreshold="+ourLowThreshold + " otherHigh="+otherHigh);
>+      log.info(msg() + " No additional versions requested.
>ourLowThreshold="+ourLowThreshold + " otherHigh="+otherHigh+ "
>ourHighest=" + ourHighest + " otherHighest=" + otherHighest);
> 
>// we had (or already requested) all the updates referenced by the
>replica
>+
>+      // If we requested updates from another replica, we can't
>compare fingerprints yet with this replica, we need to defer
>+      if (doFingerprint) {
>+        sreq.doFingerprintComparison = true;
>+      }
>+
>       return true;
>     }
>     
>@@ -422,6 +455,19 @@ public class PeerSync  {
>     return requestUpdates(srsp, toRequest);
>   }
> 
>+  private boolean compareFingerprint(SyncShardRequest sreq) {
>+    if (sreq.fingerprint == null) return true;
>+    try {
>+      IndexFingerprint ourFingerprint =
>IndexFingerprint.getFingerprint(core, Long.MAX_VALUE);
>+      int cmp = IndexFingerprint.compare(ourFingerprint,
>sreq.fingerprint);
>+      log.info("Fingerprint comparison: " + cmp);
>+      return cmp == 0;  // currently, we only check for equality...
>+    } catch(IOException e){
>+      log.error(msg() + "Error getting index fingerprint", e);
>+      return false;
>+    }
>+  }
>+
>private boolean requestUpdates(ShardResponse srsp, List<Long>
>toRequest) {
>     String replica = srsp.getShardRequest().shards[0];
> 
>@@ -556,7 +602,7 @@ public class PeerSync  {
>       }
>     }
> 
>-    return true;
>+    return compareFingerprint(sreq);
>   }
> 
> 
>
>http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ff83a400/solr/core/src/java/org/apache/solr/update/UpdateLog.java
>----------------------------------------------------------------------
>diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java
>b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
>index 7d7d3ef..7e3fc9f 100644
>--- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java
>+++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
>@@ -283,7 +283,7 @@ public class UpdateLog implements
>PluginInfoInitialized {
>     if (debug) {
>log.debug("UpdateHandler init: tlogDir=" + tlogDir + ", existing
>tlogs=" + Arrays.asList(tlogFiles) + ", next id=" + id);
>     }
>-    
>+
>     TransactionLog oldLog = null;
>     for (String oldLogName : tlogFiles) {
>       File f = new File(tlogDir, oldLogName);
>@@ -334,11 +334,11 @@ public class UpdateLog implements
>PluginInfoInitialized {
>     }
> 
>   }
>-  
>+
>   public String getLogDir() {
>     return tlogDir.getAbsolutePath();
>   }
>-  
>+
>   public List<Long> getStartingVersions() {
>     return startingVersions;
>   }
>@@ -503,32 +503,35 @@ public class UpdateLog implements
>PluginInfoInitialized {
>       if ((cmd.getFlags() & UpdateCommand.BUFFERING) == 0) {
>// given that we just did a delete-by-query, we don't know what
>documents were
>         // affected and hence we must purge our caches.
>-        if (map != null) map.clear();
>-        if (prevMap != null) prevMap.clear();
>-        if (prevMap2 != null) prevMap2.clear();
>-
>+        openRealtimeSearcher();
>         trackDeleteByQuery(cmd.getQuery(), cmd.getVersion());
> 
>-        // oldDeletes.clear();
>-
>-        // We must cause a new IndexReader to be opened before
>anything looks at these caches again
>-        // so that a cache miss will read fresh data.
>-        //
>-        // TODO: FUTURE: open a new searcher lazily for better
>throughput with delete-by-query commands
>-        try {
>-          RefCounted<SolrIndexSearcher> holder =
>uhandler.core.openNewSearcher(true, true);
>-          holder.decref();
>-        } catch (Exception e) {
>-          SolrException.log(log, "Error opening realtime searcher for
>deleteByQuery", e);
>+        if (trace) {
>+          LogPtr ptr = new LogPtr(pos, cmd.getVersion());
>+          log.trace("TLOG: added deleteByQuery " + cmd.query + " to "
>+ tlog + " " + ptr + " map=" + System.identityHashCode(map));
>         }
>-
>       }
>+    }
>+  }
> 
>-      LogPtr ptr = new LogPtr(pos, cmd.getVersion());
>-
>-      if (trace) {
>-        log.trace("TLOG: added deleteByQuery " + cmd.query + " to " +
>tlog + " " + ptr + " map=" + System.identityHashCode(map));
>+  /** Opens a new realtime searcher and clears the id caches.
>+   * This may also be called when we updates are being buffered (from
>PeerSync/IndexFingerprint)
>+   */
>+  public void openRealtimeSearcher() {
>+    synchronized (this) {
>+      // We must cause a new IndexReader to be opened before anything
>looks at these caches again
>+      // so that a cache miss will read fresh data.
>+      try {
>+        RefCounted<SolrIndexSearcher> holder =
>uhandler.core.openNewSearcher(true, true);
>+        holder.decref();
>+      } catch (Exception e) {
>+        SolrException.log(log, "Error opening realtime searcher", e);
>+        return;
>       }
>+
>+      if (map != null) map.clear();
>+      if (prevMap != null) prevMap.clear();
>+      if (prevMap2 != null) prevMap2.clear();
>     }
>   }
> 
>@@ -620,7 +623,7 @@ public class UpdateLog implements
>PluginInfoInitialized {
>   public boolean hasUncommittedChanges() {
>     return tlog != null;
>   }
>-  
>+
>   public void preCommit(CommitUpdateCommand cmd) {
>     synchronized (this) {
>       if (debug) {
>@@ -878,11 +881,11 @@ public class UpdateLog implements
>PluginInfoInitialized {
>       theLog.forceClose();
>     }
>   }
>-  
>+
>   public void close(boolean committed) {
>     close(committed, false);
>   }
>-  
>+
>   public void close(boolean committed, boolean deleteOnClose) {
>     synchronized (this) {
>       recoveryExecutor.shutdown(); // no new tasks
>@@ -923,7 +926,7 @@ public class UpdateLog implements
>PluginInfoInitialized {
>       this.id = id;
>     }
>   }
>-  
>+
>   public class RecentUpdates implements Closeable {
> 
>     final Deque<TransactionLog> logList;    // newest first
>@@ -950,17 +953,17 @@ public class UpdateLog implements
>PluginInfoInitialized {
> 
>     public List<Long> getVersions(int n) {
>       List<Long> ret = new ArrayList<>(n);
>-      
>+
>       for (List<Update> singleList : updateList) {
>         for (Update ptr : singleList) {
>           ret.add(ptr.version);
>           if (--n <= 0) return ret;
>         }
>       }
>-      
>+
>       return ret;
>     }
>-    
>+
>     public Object lookup(long version) {
>       Update update = updates.get(version);
>       if (update == null) return null;
>@@ -1004,7 +1007,7 @@ public class UpdateLog implements
>PluginInfoInitialized {
>             try {
>               o = reader.next();
>               if (o==null) break;
>-              
>+
>               // should currently be a List<Oper,Ver,Doc/Id>
>               List entry = (List)o;
> 
>@@ -1027,13 +1030,13 @@ public class UpdateLog implements
>PluginInfoInitialized {
> 
>                   updatesForLog.add(update);
>                   updates.put(version, update);
>-                  
>+
>                   if (oper == UpdateLog.DELETE_BY_QUERY) {
>                     deleteByQueryList.add(update);
>                   } else if (oper == UpdateLog.DELETE) {
>       deleteList.add(new DeleteUpdate(version, (byte[])entry.get(2)));
>                   }
>-                  
>+
>                   break;
> 
>                 case UpdateLog.COMMIT:
>@@ -1063,7 +1066,7 @@ public class UpdateLog implements
>PluginInfoInitialized {
>       }
> 
>     }
>-    
>+
>     @Override
>     public void close() {
>       for (TransactionLog log : logList) {
>@@ -1331,10 +1334,10 @@ public class UpdateLog implements
>PluginInfoInitialized {
>"log replay status {} active={} starting pos={} current pos={} current
>size={} % read={}",
>        translog, activeLog, recoveryInfo.positionOfStart, cpos, csize,
>                         Math.floor(cpos / (double) csize * 100.));
>-                
>+
>               }
>             }
>-            
>+
>             o = null;
>             o = tlogReader.next();
>             if (o == null && activeLog) {
>@@ -1513,25 +1516,25 @@ public class UpdateLog implements
>PluginInfoInitialized {
>       }
>     }
>   }
>-  
>+
>   protected String getTlogDir(SolrCore core, PluginInfo info) {
>     String dataDir = (String) info.initArgs.get("dir");
>-    
>+
>     String ulogDir = core.getCoreDescriptor().getUlogDir();
>     if (ulogDir != null) {
>       dataDir = ulogDir;
>     }
>-    
>+
>     if (dataDir == null || dataDir.length() == 0) {
>       dataDir = core.getDataDir();
>     }
> 
>     return dataDir + "/" + TLOG_NAME;
>   }
>-  
>+
>   /**
>    * Clears the logs on the file system. Only call before init.
>-   * 
>+   *
>    * @param core the SolrCore
>    * @param ulogPluginInfo the init info for the UpdateHandler
>    */
>
>http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ff83a400/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java
>----------------------------------------------------------------------
>diff --git
>a/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java
>b/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java
>index 8083ad0..bcaf846 100644
>--- a/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java
>+++ b/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java
>@@ -168,6 +168,29 @@ public class PeerSyncTest extends
>BaseDistributedSearchTestCase {
> 
>     assertSync(client1, numVersions, true, shardsArr[0]);
>client0.commit(); client1.commit(); queryAndCompare(params("q", "*:*",
>"sort","_version_ desc"), client0, client1);
>+
>+    // now lets check fingerprinting causes appropriate fails
>+    v = 4000;
>+    add(client0, seenLeader,
>sdoc("id",Integer.toString((int)v),"_version_",v));
>+    toAdd = numVersions+10;
>+    for (int i=0; i<toAdd; i++) {
>+      add(client0, seenLeader,
>sdoc("id",Integer.toString((int)v+i+1),"_version_",v+i+1));
>+      add(client1, seenLeader,
>sdoc("id",Integer.toString((int)v+i+1),"_version_",v+i+1));
>+    }
>+
>+    // client0 now has an additional add beyond our window and the
>fingerprint should cause this to fail
>+    assertSync(client1, numVersions, false, shardsArr[0]);
>+
>+    // lets add the missing document and verify that order doesn't
>matter
>+    add(client1, seenLeader,
>sdoc("id",Integer.toString((int)v),"_version_",v));
>+    assertSync(client1, numVersions, true, shardsArr[0]);
>+
>+    // lets do some overwrites to ensure that repeated updates and
>maxDoc don't matter
>+    for (int i=0; i<10; i++) {
>+      add(client0, seenLeader, sdoc("id", Integer.toString((int) v + i
>+ 1), "_version_", v + i + 1));
>+    }
>+    assertSync(client1, numVersions, true, shardsArr[0]);
>+
>   }
> 
> 

--
Uwe Schindler
H.-H.-Meier-Allee 63, 28213 Bremen
http://www.thetaphi.de

Reply via email to