On Fri, Feb 5, 2016 at 11:23 AM, Uwe Schindler <[email protected]> wrote:
> Hi Yonik,
>
> I committed a change yesterday. Could you backport this, too? It broke the
> forbidden-apis checks because it used currentTimeMillis instead of RTimer.

Ah, I hadn't noticed (I got busy between the commit to trunk yesterday
and the backport to 5x today).
Will do.

-Yonik

> Uwe
>
> Am 5. Februar 2016 17:14:34 MEZ, schrieb [email protected]:
>>
>> Repository: lucene-solr
>> Updated Branches:
>>   refs/heads/branch_5x 482b40f84 -> ff83a4001
>>
>>
>> SOLR-8586: add index fingerprinting and use it in peersync
>> (cherry picked from commit 629767b)
>>
>>
>> Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
>> Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/ff83a400
>> Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/ff83a400
>> Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/ff83a400
>>
>> Branch: refs/heads/branch_5x
>> Commit: ff83a400156beb6a8dd2d0845c7f878c28431739
>> Parents: 482b40f
>> Author: yonik
>> <[email protected]>
>> Authored: Thu Feb 4 14:54:08 2016 -0500
>> Committer: yonik <[email protected]>
>> Committed: Fri Feb 5 11:11:34 2016 -0500
>>
>> ________________________________
>>
>>  solr/CHANGES.txt                                |   4 +
>>  .../org/apache/solr/cloud/SyncStrategy.java     |   2 +-
>>  .../handler/component/RealTimeGetComponent.java |   8 +
>>  .../apache/solr/update/IndexFingerprint.java    | 232 +++++++++++++++++++
>>  .../java/org/apache/solr/update/PeerSync.java   |  78 +++++--
>>  .../java/org/apache/solr/update/UpdateLog.java  |  85 +++----
>>  .../org/apache/solr/update/PeerSyncTest.java    |  23 ++
>>  7 files changed, 374 insertions(+), 58 deletions(-)
>> ________________________________
>>
>>
>>
>>
>> http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ff83a400/solr/CHANGES.txt
>> ________________________________
>>
>> diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
>> index
>> f2067c6..2b7b46e 100644
>> --- a/solr/CHANGES.txt
>> +++ b/solr/CHANGES.txt
>> @@ -119,6 +119,10 @@ New Features
>>  * SOLR-8415: Provide command to switch between non/secure mode in ZK
>>    (Mike Drob, Gregory Chanan)
>>
>> +* SOLR-8586: added index fingerprint, a hash over all versions currently
>> in the index.
>> +  PeerSync now uses this to check if replicas are in sync. (yonik)
>> +
>> +
>>  Bug Fixes
>>  ----------------------
>>
>>
>>
>> http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ff83a400/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
>> ________________________________
>>
>> diff --git a/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
>> b/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
>> index 7a16598..d811f5c 100644
>> ---
>> a/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
>> +++ b/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
>> @@ -173,7 +173,7 @@ public class SyncStrategy {
>>      // if we can't reach a replica for sync, we still consider the
>> overall sync a success
>>      // TODO: as an assurance, we should still try and tell the sync nodes
>> that we couldn't reach
>>      // to recover once more?
>> -    PeerSync peerSync = new PeerSync(core, syncWith,
>> core.getUpdateHandler().getUpdateLog().getNumRecordsToKeep(), true, true,
>> peerSyncOnlyWithActive);
>> +    PeerSync peerSync = new PeerSync(core, syncWith,
>> core.getUpdateHandler().getUpdateLog().getNumRecordsToKeep(), true, true,
>> peerSyncOnlyWithActive, false);
>>      return peerSync.sync();
>>    }
>>
>>
>>
>> http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ff83a400/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
>> ________________________________
>>
>> diff --git
>> a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
>> b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
>> index 2bbf5a2..14a4185 100644
>> ---
>> a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
>> +++
>> b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
>> @@ -58,6 +58,7 @@ import org.apache.solr.search.ReturnFields;
>>  import org.apache.solr.search.SolrIndexSearcher;
>>  import org.apache.solr.search.SolrReturnFields;
>>  import org.apache.solr.update.DocumentBuilder;
>> +import org.apache.solr.update.IndexFingerprint;
>>
>> import org.apache.solr.update.PeerSync;
>>  import org.apache.solr.update.UpdateLog;
>>  import org.apache.solr.util.RefCounted;
>> @@ -536,6 +537,8 @@ public class RealTimeGetComponent extends
>> SearchComponent
>>      int nVersions = params.getInt("getVersions", -1);
>>      if (nVersions == -1) return;
>>
>> +    boolean doFingerprint = params.getBool("fingerprint", false);
>> +
>>      String sync = params.get("sync");
>>      if (sync != null) {
>>        processSync(rb, nVersions, sync);
>> @@ -548,6 +551,11 @@ public class RealTimeGetComponent extends
>> SearchComponent
>>      try (UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates())
>> {
>>        rb.rsp.add("versions", recentUpdates.getVersions(nVersions));
>>      }
>> +
>> +    if (doFingerprint) {
>> +      IndexFingerprint fingerprint =
>> IndexFingerprint.getFingerprint(req.getCore(), Long.MAX_VALUE);
>> +      rb.rsp.add("fingerprint", fingerprint.toObject());
>> +    }
>>    }
>>
>>
>>
>>
>> http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ff83a400/solr/core/src/java/org/apache/solr/update/IndexFingerprint.java
>> ________________________________
>>
>> diff --git
>> a/solr/core/src/java/org/apache/solr/update/IndexFingerprint.java
>> b/solr/core/src/java/org/apache/solr/update/IndexFingerprint.java
>> new file mode 100644
>> index 0000000..c73b57b
>> --- /dev/null
>> +++ b/solr/core/src/java/org/apache/solr/update/IndexFingerprint.java
>> @@ -0,0 +1,232 @@
>> +package org.apache.solr.update;
>> +
>> +/*
>> + * Licensed to the Apache Software Foundation (ASF) under one or more
>> + * contributor license agreements.  See the NOTICE file distributed with
>> + * this work for additional information regarding copyright ownership.
>> + * The ASF licenses this file to You under the Apache License,
>> Version 2.0
>> + * (the "License"); you may not use this file except in compliance with
>> + * the License.  You may obtain a copy of the License at
>> + *
>> + *     http://www.apache.org/licenses/LICENSE-2.0
>> + *
>> + * Unless required by applicable law or agreed to in writing, software
>> + * distributed under the License is distributed on an "AS IS" BASIS,
>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
>> implied.
>> + * See the License for the specific language governing permissions and
>> + * limitations under the License.
>> + */
>> +
>> +import java.io.IOException;
>> +import java.lang.invoke.MethodHandles;
>> +import java.net.ConnectException;
>> +import java.net.SocketException;
>> +import java.util.ArrayList;
>> +import java.util.Collections;
>> +import
>> java.util.Comparator;
>> +import java.util.HashSet;
>> +import java.util.LinkedHashMap;
>> +import java.util.List;
>> +import java.util.Map;
>> +import java.util.Set;
>> +
>> +import org.apache.http.NoHttpResponseException;
>> +import org.apache.http.client.HttpClient;
>> +import org.apache.http.conn.ConnectTimeoutException;
>> +import org.apache.lucene.index.LeafReader;
>> +import org.apache.lucene.index.LeafReaderContext;
>> +import org.apache.lucene.queries.function.FunctionValues;
>> +import org.apache.lucene.queries.function.ValueSource;
>> +import org.apache.lucene.util.Bits;
>> +import org.apache.lucene.util.BytesRef;
>> +import org.apache.solr.client.solrj.SolrServerException;
>> +import org.apache.solr.cloud.ZkController;
>> +import org.apache.solr.common.SolrException;
>> +import org.apache.solr.common.SolrInputDocument;
>> +import org.apache.solr.common.params.ModifiableSolrParams;
>> +import org.apache.solr.common.util.Hash;
>> +import org.apache.solr.common.util.NamedList;
>> +import org.apache.solr.common.util.StrUtils;
>> +import org.apache.solr.core.SolrCore;
>> +import org.apache.solr.handler.component.HttpShardHandlerFactory;
>> +import org.apache.solr.handler.component.ShardHandler;
>> +import org.apache.solr.handler.component.ShardHandlerFactory;
>> +import org.apache.solr.handler.component.ShardRequest;
>> +import org.apache.solr.handler.component.ShardResponse;
>> +import org.apache.solr.logging.MDCLoggingContext;
>> +import org.apache.solr.request.LocalSolrQueryRequest;
>> +import org.apache.solr.request.SolrQueryRequest;
>> +import org.apache.solr.response.SolrQueryResponse;
>> +import org.apache.solr.schema.SchemaField;
>> +import org.apache.solr.search.SolrIndexSearcher;
>> +import org.apache.solr.update.processor.UpdateRequestProcessor;
>> +import org.apache.solr.update.processor.UpdateRequestProcessorChain;
>> +import org.apache.solr.util.RefCounted;
>> +import org.slf4j.Logger;
>> +import org.slf4j.LoggerFactory;
>> +
>> +import static
>> org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase.FROMLEADER;
>> +import static
>> org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
>> +
>> +/** @lucene.internal */
>> +public class IndexFingerprint {
>> +  private static final Logger log =
>> LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
>> +
>> +  private long maxVersionSpecified;
>> +  private long maxVersionEncountered;
>> +  private long maxInHash;
>> +  private long versionsHash;
>> +  private long numVersions;
>> +  private long numDocs;
>> +  private long maxDoc;
>> +
>> +  public long getMaxVersionSpecified() {
>> +    return maxVersionSpecified;
>> +  }
>> +
>> +  public long getMaxVersionEncountered() {
>> +    return maxVersionEncountered;
>> +  }
>> +
>> +  public long getMaxInHash() {
>> +    return
>> maxInHash;
>> +  }
>> +
>> +  public long getVersionsHash() {
>> +    return versionsHash;
>> +  }
>> +
>> +  public long getNumVersions() {
>> +    return numVersions;
>> +  }
>> +
>> +  public long getNumDocs() {
>> +    return numDocs;
>> +  }
>> +
>> +  public long getMaxDoc() {
>> +    return maxDoc;
>> +  }
>> +
>> +  /** Opens a new realtime searcher and returns its fingerprint */
>> +  public static IndexFingerprint getFingerprint(SolrCore core, long
>> maxVersion) throws IOException {
>> +    core.getUpdateHandler().getUpdateLog().openRealtimeSearcher();
>> +    RefCounted<SolrIndexSearcher> newestSearcher =
>> core.getUpdateHandler().getUpdateLog().uhandler.core.getRealtimeSearcher();
>> +    try {
>> +      return getFingerprint(newestSearcher.get(), maxVersion);
>> +    } finally {
>> +      if (newestSearcher != null) {
>> +        newestSearcher.decref();
>> +      }
>> +    }
>> +  }
>> +
>> +  public
>> static IndexFingerprint getFingerprint(SolrIndexSearcher searcher, long
>> maxVersion) throws IOException {
>> +    long start = System.currentTimeMillis();
>> +
>> +    SchemaField versionField =
>> VersionInfo.getAndCheckVersionField(searcher.getSchema());
>> +
>> +    IndexFingerprint f = new IndexFingerprint();
>> +    f.maxVersionSpecified = maxVersion;
>> +    f.maxDoc = searcher.maxDoc();
>> +
>> +    // TODO: this could be parallelized, or even cached per-segment if
>> performance becomes an issue
>> +    ValueSource vs = versionField.getType().getValueSource(versionField,
>> null);
>> +    Map funcContext = ValueSource.newContext(searcher);
>> +    vs.createWeight(funcContext, searcher);
>> +    for (LeafReaderContext ctx : searcher.getTopReaderContext().leaves())
>> {
>> +      int maxDoc = ctx.reader().maxDoc();
>> +      f.numDocs += ctx.reader().numDocs();
>> +      Bits liveDocs = ctx.reader().getLiveDocs();
>> +      FunctionValues fv =
>> vs.getValues(funcContext, ctx);
>> +      for (int doc = 0; doc < maxDoc; doc++) {
>> +        if (liveDocs != null && !liveDocs.get(doc)) continue;
>> +        long v = fv.longVal(doc);
>> +        f.maxVersionEncountered = Math.max(v, f.maxVersionEncountered);
>> +        if (v <= f.maxVersionSpecified) {
>> +          f.maxInHash = Math.max(v, f.maxInHash);
>> +          f.versionsHash += Hash.fmix64(v);
>> +          f.numVersions++;
>> +        }
>> +      }
>> +    }
>> +
>> +    long end = System.currentTimeMillis();
>> +    log.info("IndexFingerprint millis:" + (end-start) + " result:" + f);
>> +
>> +    return f;
>> +  }
>> +
>> +  /** returns 0 for equal, negative if f1 is less recent than f2,
>> positive if more recent */
>> +  public static int compare(IndexFingerprint f1, IndexFingerprint f2) {
>> +    int cmp;
>> +
>> +    // NOTE: some may want number of docs in index to take
>> precedence over highest version (add-only systems for sure)
>> +
>> +    // if we're comparing all of the versions in the index, then go by
>> the highest encountered.
>> +    if (f1.maxVersionSpecified == Long.MAX_VALUE) {
>> +      cmp = Long.compare(f1.maxVersionEncountered,
>> f2.maxVersionEncountered);
>> +      if (cmp != 0) return cmp;
>> +    }
>> +
>> +    // Go by the highest version under the requested max.
>> +    cmp = Long.compare(f1.maxInHash, f2.maxInHash);
>> +    if (cmp != 0) return cmp;
>> +
>> +    // go by who has the most documents in the index
>> +    cmp = Long.compare(f1.numVersions, f2.numVersions);
>> +    if (cmp != 0) return cmp;
>> +
>> +    // both have same number of documents, so go by hash
>> +    cmp = Long.compare(f1.versionsHash, f2.versionsHash);
>> +    return cmp;
>> +  }
>> +
>> +  /**
>> +   * Create a generic object suitable for serializing with
>> ResponseWriters
>> +   */
>> +  public Object
>> toObject() {
>> +    Map<String,Object> map = new LinkedHashMap<>();
>> +    map.put("maxVersionSpecified", maxVersionSpecified);
>> +    map.put("maxVersionEncountered", maxVersionEncountered);
>> +    map.put("maxInHash", maxInHash);
>> +    map.put("versionsHash", versionsHash);
>> +    map.put("numVersions", numVersions);
>> +    map.put("numDocs", numDocs);
>> +    map.put("maxDoc", maxDoc);
>> +    return map;
>> +  }
>> +
>> +  private static long getLong(Object o, String key, long def) {
>> +    long v = def;
>> +
>> +    Object oval = null;
>> +    if (o instanceof Map) {
>> +      oval = ((Map)o).get(key);
>> +    } else if (o instanceof NamedList) {
>> +      oval = ((NamedList)o).get(key);
>> +    }
>> +
>> +    return oval != null ? ((Number)oval).longValue() : def;
>> +  }
>> +
>> +  /**
>> +   * Create an IndexFingerprint object from a deserialized generic object
>> (Map or NamedList)
>> +   */
>> +
>> public static IndexFingerprint fromObject(Object o) {
>> +    IndexFingerprint f = new IndexFingerprint();
>> +    f.maxVersionSpecified = getLong(o, "maxVersionSpecified",
>> Long.MAX_VALUE);
>> +    f.maxVersionEncountered = getLong(o, "maxVersionEncountered", -1);
>> +    f.maxInHash = getLong(o, "maxInHash", -1);
>> +    f.versionsHash = getLong(o, "versionsHash", -1);
>> +    f.numVersions = getLong(o, "numVersions", -1);
>> +    f.numDocs = getLong(o, "numDocs", -1);
>> +    f.maxDoc = getLong(o, "maxDoc", -1);
>> +    return f;
>> +  }
>> +
>> +  @Override
>> +  public String toString() {
>> +    return toObject().toString();
>> +  }
>> +}
>>
>>
>> http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ff83a400/solr/core/src/java/org/apache/solr/update/PeerSync.java
>> ________________________________
>>
>> diff --git
>> a/solr/core/src/java/org/apache/solr/update/PeerSync.java
>> b/solr/core/src/java/org/apache/solr/update/PeerSync.java
>> index c5ddaf0..dbc0091 100644
>> --- a/solr/core/src/java/org/apache/solr/update/PeerSync.java
>> +++ b/solr/core/src/java/org/apache/solr/update/PeerSync.java
>> @@ -68,6 +68,7 @@ public class PeerSync  {
>>    private UpdateLog ulog;
>>    private HttpShardHandlerFactory shardHandlerFactory;
>>    private ShardHandler shardHandler;
>> +  private List<SyncShardRequest> requests = new ArrayList<>();
>>
>>    private List<Long> startingVersions;
>>
>> @@ -76,8 +77,10 @@ public class PeerSync  {
>>    private Set<Long> requestedUpdateSet;
>>    private long ourLowThreshold;  // 20th percentile
>>    private long ourHighThreshold; // 80th percentile
>> +  private long ourHighest;  // currently just used for logging/debugging
>> purposes
>>    private final boolean cantReachIsSuccess;
>>    private final
>> boolean getNoVersionsIsSuccess;
>> +  private final boolean doFingerprint;
>>    private final HttpClient client;
>>    private final boolean onlyIfActive;
>>    private SolrCore core;
>> @@ -116,6 +119,8 @@ public class PeerSync  {
>>
>>    private static class SyncShardRequest extends ShardRequest {
>>      List<Long> reportedVersions;
>> +    IndexFingerprint fingerprint;
>> +    boolean doFingerprintComparison;
>>      List<Long> requestedUpdates;
>>      Exception updateException;
>>    }
>> @@ -125,16 +130,17 @@ public class PeerSync  {
>>    }
>>
>>    public PeerSync(SolrCore core, List<String> replicas, int nUpdates,
>> boolean cantReachIsSuccess, boolean getNoVersionsIsSuccess) {
>> -    this(core, replicas, nUpdates, cantReachIsSuccess,
>> getNoVersionsIsSuccess, false);
>> +    this(core, replicas, nUpdates, cantReachIsSuccess,
>> getNoVersionsIsSuccess, false, true);
>>    }
>>
>> -  public PeerSync(SolrCore
>> core, List<String> replicas, int nUpdates, boolean cantReachIsSuccess,
>> boolean getNoVersionsIsSuccess, boolean onlyIfActive) {
>> +  public PeerSync(SolrCore core, List<String> replicas, int nUpdates,
>> boolean cantReachIsSuccess, boolean getNoVersionsIsSuccess, boolean
>> onlyIfActive, boolean doFingerprint) {
>>      this.core = core;
>>      this.replicas = replicas;
>>      this.nUpdates = nUpdates;
>>      this.maxUpdates = nUpdates;
>>      this.cantReachIsSuccess = cantReachIsSuccess;
>>      this.getNoVersionsIsSuccess = getNoVersionsIsSuccess;
>> +    this.doFingerprint = doFingerprint;
>>      this.client =
>> core.getCoreDescriptor().getCoreContainer().getUpdateShardHandler().getHttpClient();
>>      this.onlyIfActive = onlyIfActive;
>>
>> @@ -169,9 +175,8 @@ public class PeerSync  {
>>      return "PeerSync: core="+uhandler.core.getName()+ " url="+myURL +" ";
>>    }
>>
>> -  /** Returns true if peer sync was successful,
>> meaning that this core may not be considered to have the latest updates
>> -   *  when considering the last N updates between it and its peers.
>> -   *  A commit is not performed.
>> +  /** Returns true if peer sync was successful, meaning that this core
>> may be considered to have the latest updates.
>> +   * It does not mean that the remote replica is in sync with us.
>>     */
>>    public boolean sync() {
>>      if (ulog == null) {
>> @@ -210,7 +215,7 @@ public class PeerSync  {
>>
>>          ourLowThreshold = percentile(startingVersions, 0.8f);
>>          ourHighThreshold = percentile(startingVersions, 0.2f);
>> -
>> +
>>          // now make sure that the starting updates overlap our updates
>>          // there shouldn't be reorders, so any overlap will do.
>>
>> @@ -231,6 +236,7 @@ public class PeerSync  {
>>          }
>>
>>          ourUpdates = newList;
>> +
>> Collections.sort(ourUpdates, absComparator);
>>        } else {
>>
>>          if (ourUpdates.size() > 0) {
>> @@ -243,9 +249,10 @@ public class PeerSync  {
>>            return false;
>>          }
>>        }
>> -
>> +
>> +      ourHighest = ourUpdates.get(0);
>>        ourUpdateSet = new HashSet<>(ourUpdates);
>> -      requestedUpdateSet = new HashSet<>(ourUpdates);
>> +      requestedUpdateSet = new HashSet<>();
>>
>>        for (;;) {
>>          ShardResponse srsp = shardHandler.takeCompletedOrError();
>> @@ -257,9 +264,18 @@ public class PeerSync  {
>>            return false;
>>          }
>>        }
>> -
>> -      log.info(msg() + "DONE. sync succeeded");
>> -      return true;
>> +
>> +      // finish up any comparisons with other shards that we deferred
>> +      boolean success = true;
>> +      for (SyncShardRequest sreq : requests)
>> {
>> +        if (sreq.doFingerprintComparison) {
>> +          success = compareFingerprint(sreq);
>> +          if (!success) break;
>> +        }
>> +      }
>> +
>> +      log.info(msg() + "DONE. sync " + (success ? "succeeded" :
>> "failed"));
>> +      return success;
>>      } finally {
>>        MDCLoggingContext.clear();
>>      }
>> @@ -267,6 +283,7 @@ public class PeerSync  {
>>
>>    private void requestVersions(String replica) {
>>      SyncShardRequest sreq = new SyncShardRequest();
>> +    requests.add(sreq);
>>      sreq.purpose = 1;
>>      sreq.shards = new String[]{replica};
>>      sreq.actualShards = sreq.shards;
>> @@ -274,6 +291,7 @@ public class PeerSync  {
>>      sreq.params.set("qt","/get");
>>      sreq.params.set("distrib",false);
>>      sreq.params.set("getVersions",nUpdates);
>> +    sreq.params.set("fingerprint",doFingerprint);
>>      shardHandler.submit(sreq, replica,
>> sreq.params);
>>    }
>>
>> @@ -355,7 +373,12 @@ public class PeerSync  {
>>      SyncShardRequest sreq = (SyncShardRequest) srsp.getShardRequest();
>>      sreq.reportedVersions =  otherVersions;
>>
>> -    log.info(msg() + " Received " + otherVersions.size() + " versions
>> from " + sreq.shards[0] );
>> +    Object fingerprint =
>> srsp.getSolrResponse().getResponse().get("fingerprint");
>> +
>> +    log.info(msg() + " Received " + otherVersions.size() + " versions
>> from " + sreq.shards[0] + " fingerprint:" + fingerprint );
>> +    if (fingerprint != null) {
>> +      sreq.fingerprint = IndexFingerprint.fromObject(fingerprint);
>> +    }
>>
>>      if (otherVersions.size() == 0) {
>>        return getNoVersionsIsSuccess;
>> @@ -371,13 +394,14 @@ public class PeerSync  {
>>
>>      long otherHigh = percentile(otherVersions, .2f);
>>      long otherLow =
>> percentile(otherVersions, .8f);
>> +    long otherHighest = otherVersions.get(0);
>>
>>      if (ourHighThreshold < otherLow) {
>>        // Small overlap between version windows and ours is older
>>        // This means that we might miss updates if we attempted to use
>> this method.
>>        // Since there exists just one replica that is so much newer, we
>> must
>>        // fail the sync.
>> -      log.info(msg() + " Our versions are too old.
>> ourHighThreshold="+ourHighThreshold + " otherLowThreshold="+otherLow);
>> +      log.info(msg() + " Our versions are too old.
>> ourHighThreshold="+ourHighThreshold + " otherLowThreshold="+otherLow + "
>> ourHighest=" + ourHighest + " otherHighest=" + otherHighest);
>>        return false;
>>      }
>>
>> @@ -385,7 +409,10 @@ public class PeerSync  {
>>        // Small overlap between windows and ours is newer.
>>        // Using this list to sync would
>> result in requesting/replaying results we don't need
>>        // and possibly bringing deleted docs back to life.
>> -      log.info(msg() + " Our versions are newer.
>> ourLowThreshold="+ourLowThreshold + " otherHigh="+otherHigh);
>> +      log.info(msg() + " Our versions are newer.
>> ourLowThreshold="+ourLowThreshold + " otherHigh="+otherHigh+ " ourHighest="
>> + ourHighest + " otherHighest=" + otherHighest);
>> +
>> +      // Because our versions are newer, IndexFingerprint with the remote
>> would not match us.
>> +      // We return true on our side, but the remote peersync with us
>> should fail.
>>        return true;
>>      }
>>
>> @@ -408,9 +435,15 @@ public class PeerSync  {
>>      sreq.requestedUpdates = toRequest;
>>
>>      if (toRequest.isEmpty()) {
>> -      log.info(msg() + " Our versions are newer.
>> ourLowThreshold="+ourLowThreshold + "
>> otherHigh="+otherHigh);
>> +      log.info(msg() + " No additional versions requested.
>> ourLowThreshold="+ourLowThreshold + " otherHigh="+otherHigh+ " ourHighest="
>> + ourHighest + " otherHighest=" + otherHighest);
>>
>>        // we had (or already requested) all the updates referenced by the
>> replica
>> +
>> +      // If we requested updates from another replica, we can't compare
>> fingerprints yet with this replica, we need to defer
>> +      if (doFingerprint) {
>> +        sreq.doFingerprintComparison = true;
>> +      }
>> +
>>        return true;
>>      }
>>
>> @@ -422,6 +455,19 @@ public class PeerSync  {
>>      return requestUpdates(srsp, toRequest);
>>    }
>>
>> +  private boolean compareFingerprint(SyncShardRequest sreq) {
>> +    if (sreq.fingerprint == null) return true;
>> +    try {
>> +      IndexFingerprint ourFingerprint =
>> IndexFingerprint.getFingerprint(core, Long.MAX_VALUE);
>> +
>> int cmp = IndexFingerprint.compare(ourFingerprint, sreq.fingerprint);
>> +      log.info("Fingerprint comparison: " + cmp);
>> +      return cmp == 0;  // currently, we only check for equality...
>> +    } catch(IOException e){
>> +      log.error(msg() + "Error getting index fingerprint", e);
>> +      return false;
>> +    }
>> +  }
>> +
>>    private boolean requestUpdates(ShardResponse srsp, List<Long>
>> toRequest) {
>>      String replica = srsp.getShardRequest().shards[0];
>>
>> @@ -556,7 +602,7 @@ public class PeerSync  {
>>        }
>>      }
>>
>> -    return true;
>> +    return compareFingerprint(sreq);
>>    }
>>
>>
>>
>>
>> http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ff83a400/solr/core/src/java/org/apache/solr/update/UpdateLog.java
>> ________________________________
>>
>> diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java
>> b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
>> index 7d7d3ef..7e3fc9f 100644
>> --- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java
>> +++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
>> @@ -283,7 +283,7 @@ public class UpdateLog implements
>> PluginInfoInitialized {
>>      if (debug) {
>>        log.debug("UpdateHandler init: tlogDir=" + tlogDir + ", existing
>> tlogs=" + Arrays.asList(tlogFiles) + ", next id=" + id);
>>      }
>> -
>> +
>>      TransactionLog oldLog = null;
>>      for (String oldLogName : tlogFiles) {
>>        File f = new File(tlogDir, oldLogName);
>> @@ -334,11 +334,11 @@ public class UpdateLog implements
>> PluginInfoInitialized {
>>      }
>>
>>    }
>> -
>> +
>>    public String getLogDir() {
>>      return tlogDir.getAbsolutePath();
>>    }
>> -
>> +
>>    public List<Long>
>> getStartingVersions() {
>>      return startingVersions;
>>    }
>> @@ -503,32 +503,35 @@ public class UpdateLog implements
>> PluginInfoInitialized {
>>        if ((cmd.getFlags() & UpdateCommand.BUFFERING) == 0) {
>>          // given that we just did a delete-by-query, we don't know what
>> documents were
>>          // affected and hence we must purge our caches.
>> -        if (map != null) map.clear();
>> -        if (prevMap != null) prevMap.clear();
>> -        if (prevMap2 != null) prevMap2.clear();
>> -
>> +        openRealtimeSearcher();
>>          trackDeleteByQuery(cmd.getQuery(), cmd.getVersion());
>>
>> -        // oldDeletes.clear();
>> -
>> -        // We must cause a new IndexReader to be opened before anything
>> looks at these caches again
>> -        // so that a cache miss will read fresh data.
>> -        //
>> -        // TODO: FUTURE: open a new searcher lazily for better throughput
>> with delete-by-query commands
>> -
>>    try {
>> -          RefCounted<SolrIndexSearcher> holder =
>> uhandler.core.openNewSearcher(true, true);
>> -          holder.decref();
>> -        } catch (Exception e) {
>> -          SolrException.log(log, "Error opening realtime searcher for
>> deleteByQuery", e);
>> +        if (trace) {
>> +          LogPtr ptr = new LogPtr(pos, cmd.getVersion());
>> +          log.trace("TLOG: added deleteByQuery " + cmd.query + " to " +
>> tlog + " " + ptr + " map=" + System.identityHashCode(map));
>>          }
>> -
>>        }
>> +    }
>> +  }
>>
>> -      LogPtr ptr = new LogPtr(pos, cmd.getVersion());
>> -
>> -      if (trace) {
>> -        log.trace("TLOG: added deleteByQuery " + cmd.query + " to " +
>> tlog + " " + ptr + " map=" + System.identityHashCode(map));
>> +  /** Opens a new realtime searcher and clears the id caches.
>> +   * This may also be called when updates are being buffered (from
>> PeerSync/IndexFingerprint)
>> +   */
>> +
>> public void openRealtimeSearcher() {
>> +    synchronized (this) {
>> +      // We must cause a new IndexReader to be opened before anything
>> looks at these caches again
>> +      // so that a cache miss will read fresh data.
>> +      try {
>> +        RefCounted<SolrIndexSearcher> holder =
>> uhandler.core.openNewSearcher(true, true);
>> +        holder.decref();
>> +      } catch (Exception e) {
>> +        SolrException.log(log, "Error opening realtime searcher", e);
>> +        return;
>>        }
>> +
>> +      if (map != null) map.clear();
>> +      if (prevMap != null) prevMap.clear();
>> +      if (prevMap2 != null) prevMap2.clear();
>>      }
>>    }
>>
>> @@ -620,7 +623,7 @@ public class UpdateLog implements
>> PluginInfoInitialized {
>>    public boolean hasUncommittedChanges() {
>>      return tlog != null;
>>    }
>> -
>> +
>>    public void preCommit(CommitUpdateCommand cmd) {
>>      synchronized (this) {
>>
>>   if (debug) {
>> @@ -878,11 +881,11 @@ public class UpdateLog implements
>> PluginInfoInitialized {
>>        theLog.forceClose();
>>      }
>>    }
>> -
>> +
>>    public void close(boolean committed) {
>>      close(committed, false);
>>    }
>> -
>> +
>>    public void close(boolean committed, boolean deleteOnClose) {
>>      synchronized (this) {
>>        recoveryExecutor.shutdown(); // no new tasks
>> @@ -923,7 +926,7 @@ public class UpdateLog implements
>> PluginInfoInitialized {
>>        this.id = id;
>>      }
>>    }
>> -
>> +
>>    public class RecentUpdates implements Closeable {
>>
>>      final Deque<TransactionLog> logList;    // newest first
>> @@ -950,17 +953,17 @@ public class UpdateLog implements
>> PluginInfoInitialized {
>>
>>      public List<Long> getVersions(int n) {
>>        List<Long> ret = new ArrayList<>(n);
>> -
>> +
>>        for
>> (List<Update> singleList : updateList) {
>>          for (Update ptr : singleList) {
>>            ret.add(ptr.version);
>>            if (--n <= 0) return ret;
>>          }
>>        }
>> -
>> +
>>        return ret;
>>      }
>> -
>> +
>>      public Object lookup(long version) {
>>        Update update = updates.get(version);
>>        if (update == null) return null;
>> @@ -1004,7 +1007,7 @@ public class UpdateLog implements
>> PluginInfoInitialized {
>>              try {
>>                o = reader.next();
>>                if (o==null) break;
>> -
>> +
>>                // should currently be a List<Oper,Ver,Doc/Id>
>>                List entry = (List)o;
>>
>> @@ -1027,13 +1030,13 @@ public class UpdateLog implements
>> PluginInfoInitialized {
>>
>>                    updatesForLog.add(update);
>>                    updates.put(version, update);
>> -
>> +
>>
>>                  if (oper == UpdateLog.DELETE_BY_QUERY) {
>>                      deleteByQueryList.add(update);
>>                    } else if (oper == UpdateLog.DELETE) {
>>                      deleteList.add(new DeleteUpdate(version,
>> (byte[])entry.get(2)));
>>                    }
>> -
>> +
>>                    break;
>>
>>                  case UpdateLog.COMMIT:
>> @@ -1063,7 +1066,7 @@ public class UpdateLog implements
>> PluginInfoInitialized {
>>        }
>>
>>      }
>> -
>> +
>>      @Override
>>      public void close() {
>>        for (TransactionLog log : logList) {
>> @@ -1331,10 +1334,10 @@ public class UpdateLog implements
>> PluginInfoInitialized {
>>                          "log replay status {} active={} starting pos={}
>> current pos={} current size={} % read={}",
>>                          translog, activeLog,
>> recoveryInfo.positionOfStart, cpos, csize,
>>                          Math.floor(cpos /
>> (double) csize * 100.));
>> -
>> +
>>                }
>>              }
>> -
>> +
>>              o = null;
>>              o = tlogReader.next();
>>              if (o == null && activeLog) {
>> @@ -1513,25 +1516,25 @@ public class UpdateLog implements
>> PluginInfoInitialized {
>>        }
>>      }
>>    }
>> -
>> +
>>    protected String getTlogDir(SolrCore core, PluginInfo info) {
>>      String dataDir = (String) info.initArgs.get("dir");
>> -
>> +
>>      String ulogDir = core.getCoreDescriptor().getUlogDir();
>>      if (ulogDir != null) {
>>        dataDir = ulogDir;
>>      }
>> -
>> +
>>      if (dataDir == null || dataDir.length() == 0) {
>>        dataDir = core.getDataDir();
>>      }
>>
>>      return dataDir + "/" + TLOG_NAME;
>>    }
>> -
>> +
>>    /**
>>     * Clears the logs on the file system. Only call before init.
>> -   *
>> +   *
>>
>>  * @param core the SolrCore
>>     * @param ulogPluginInfo the init info for the UpdateHandler
>>     */
>>
>>
>> http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ff83a400/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java
>> ________________________________
>>
>> diff --git a/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java
>> b/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java
>> index 8083ad0..bcaf846 100644
>> --- a/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java
>> +++ b/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java
>> @@ -168,6 +168,29 @@ public class PeerSyncTest extends
>> BaseDistributedSearchTestCase {
>>
>>      assertSync(client1, numVersions, true, shardsArr[0]);
>>      client0.commit(); client1.commit(); queryAndCompare(params("q",
>> "*:*", "sort","_version_ desc"), client0,
>> client1);
>> +
>> +    // now let's check that fingerprinting causes the appropriate failures
>> +    v = 4000;
>> +    add(client0, seenLeader,
>> sdoc("id",Integer.toString((int)v),"_version_",v));
>> +    toAdd = numVersions+10;
>> +    for (int i=0; i<toAdd; i++) {
>> +      add(client0, seenLeader,
>> sdoc("id",Integer.toString((int)v+i+1),"_version_",v+i+1));
>> +      add(client1, seenLeader,
>> sdoc("id",Integer.toString((int)v+i+1),"_version_",v+i+1));
>> +    }
>> +
>> +    // client0 now has an additional add beyond our window and the
>> fingerprint should cause this to fail
>> +    assertSync(client1, numVersions, false, shardsArr[0]);
>> +
>> +    // let's add the missing document and verify that order doesn't matter
>> +    add(client1, seenLeader,
>> sdoc("id",Integer.toString((int)v),"_version_",v));
>> +    assertSync(client1, numVersions, true, shardsArr[0]);
>> +
>> +    // let's do some overwrites to ensure that repeated updates and maxDoc
>> don't
>> matter
>> +    for (int i=0; i<10; i++) {
>> +      add(client0, seenLeader, sdoc("id", Integer.toString((int) v + i +
>> 1), "_version_", v + i + 1));
>> +    }
>> +    assertSync(client1, numVersions, true, shardsArr[0]);
>> +
>>    }
>>
>>
>>
>
> --
> Uwe Schindler
> H.-H.-Meier-Allee 63, 28213 Bremen
> http://www.thetaphi.de

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to