Thanks all, I have now added the changes entry.

On Wed, Mar 4, 2015 at 10:23 PM, Chris Hostetter <hossman_luc...@fucit.org>
wrote:

>
> : The change had no functional impact, hence left it alone.
> :
> : But happy to follow whatever is the existing practice. Should I have one
> : for every change?
>
> anything non trivial should be noted in CHANGES.txt - the "Other
> Changes" section is good fit for internal refacotrings that don't fix any
> bugs, but also don't add any features.
>
>
> :
> : On Wed, Mar 4, 2015 at 8:29 PM, Alan Woodward <a...@flax.co.uk> wrote:
> :
> : > Hi Ram, I think you missed a CHANGES.txt entry on this one?
> : >
> : > Alan Woodward
> : > www.flax.co.uk
> : >
> : >
> : > On 4 Mar 2015, at 19:45, andyetitmo...@apache.org wrote:
> : >
> : > Author: andyetitmoves
> : > Date: Wed Mar  4 19:45:09 2015
> : > New Revision: 1664126
> : >
> : > URL: http://svn.apache.org/r1664126
> : > Log:
> : > SOLR-6804: Untangle SnapPuller and ReplicationHandler
> : >
> : > This closes #110
> : >
> : > Added:
> : >
> : >
> lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
> : >      - copied, changed from r1663969,
> : >
> lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/SnapPuller.java
> : > Removed:
> : >
> : >
> lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/SnapPuller.java
> : > Modified:
> : >
> lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/SolrCore.java
> : >
> : >
> lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
> : >
> : >
> lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/SnapShooter.java
> : >    lucene/dev/trunk/solr/core/src/test-files/log4j.properties
> : >
> : >
> lucene/dev/trunk/solr/core/src/test/org/apache/solr/core/TestArbitraryIndexDir.java
> : >
> : >
> lucene/dev/trunk/solr/core/src/test/org/apache/solr/handler/TestReplicationHandler.java
> : >
> : >
> lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/core/MockDirectoryFactory.java
> : >
> : > Modified:
> : > lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/SolrCore.java
> : > URL:
> : >
> http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/SolrCore.java?rev=1664126&r1=1664125&r2=1664126&view=diff
> : >
> : >
> ==============================================================================
> : > ---
> lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/SolrCore.java
> : > (original)
> : > +++
> lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/SolrCore.java
> : > Wed Mar  4 19:45:09 2015
> : > @@ -83,9 +83,9 @@ import org.apache.solr.common.util.IOUti
> : > import org.apache.solr.common.util.NamedList;
> : > import org.apache.solr.common.util.SimpleOrderedMap;
> : > import org.apache.solr.core.DirectoryFactory.DirContext;
> : > +import org.apache.solr.handler.IndexFetcher;
> : > import org.apache.solr.handler.ReplicationHandler;
> : > import org.apache.solr.handler.RequestHandlerBase;
> : > -import org.apache.solr.handler.SnapPuller;
> : > import org.apache.solr.handler.admin.ShowFileRequestHandler;
> : > import org.apache.solr.handler.component.DebugComponent;
> : > import org.apache.solr.handler.component.ExpandComponent;
> : > @@ -291,7 +291,7 @@ public final class SolrCore implements S
> : >       dir = getDirectoryFactory().get(getDataDir(),
> DirContext.META_DATA,
> : > getSolrConfig().indexConfig.lockType);
> : >       IndexInput input;
> : >       try {
> : > -        input = dir.openInput(SnapPuller.INDEX_PROPERTIES,
> : > IOContext.DEFAULT);
> : > +        input = dir.openInput(IndexFetcher.INDEX_PROPERTIES,
> : > IOContext.DEFAULT);
> : >       } catch (FileNotFoundException | NoSuchFileException e) {
> : >         input = null;
> : >       }
> : > @@ -307,7 +307,7 @@ public final class SolrCore implements S
> : >           }
> : >
> : >         } catch (Exception e) {
> : > -          log.error("Unable to load " + SnapPuller.INDEX_PROPERTIES,
> e);
> : > +          log.error("Unable to load " +
> IndexFetcher.INDEX_PROPERTIES, e);
> : >         } finally {
> : >           IOUtils.closeQuietly(is);
> : >         }
> : >
> : > Copied:
> : >
> lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
> : > (from r1663969,
> : >
> lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/SnapPuller.java)
> : > URL:
> : >
> http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java?p2=lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java&p1=lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/SnapPuller.java&r1=1663969&r2=1664126&rev=1664126&view=diff
> : >
> : >
> ==============================================================================
> : > ---
> : >
> lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/SnapPuller.java
> : > (original)
> : > +++
> : >
> lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
> : > Wed Mar  4 19:45:09 2015
> : > @@ -67,11 +67,7 @@ import java.util.concurrent.ExecutionExc
> : > import java.util.concurrent.ExecutorService;
> : > import java.util.concurrent.Executors;
> : > import java.util.concurrent.Future;
> : > -import java.util.concurrent.ScheduledExecutorService;
> : > import java.util.concurrent.TimeUnit;
> : > -import java.util.concurrent.atomic.AtomicBoolean;
> : > -import java.util.regex.Matcher;
> : > -import java.util.regex.Pattern;
> : > import java.util.zip.Adler32;
> : > import java.util.zip.Checksum;
> : > import java.util.zip.InflaterInputStream;
> : > @@ -94,7 +90,6 @@ import org.apache.solr.common.SolrExcept
> : > import org.apache.solr.common.SolrException.ErrorCode;
> : > import org.apache.solr.common.params.CommonParams;
> : > import org.apache.solr.common.params.ModifiableSolrParams;
> : > -import org.apache.solr.common.util.ExecutorUtil;
> : > import org.apache.solr.common.util.FastInputStream;
> : > import org.apache.solr.common.util.NamedList;
> : > import org.apache.solr.core.DirectoryFactory;
> : > @@ -121,24 +116,16 @@ import org.slf4j.LoggerFactory;
> : >  *
> : >  * @since solr 1.4
> : >  */
> : > -public class SnapPuller {
> : > +public class IndexFetcher {
> : >   private static final int _100K = 100000;
> : >
> : >   public static final String INDEX_PROPERTIES = "index.properties";
> : >
> : > -  private static final Logger LOG =
> : > LoggerFactory.getLogger(SnapPuller.class.getName());
> : > +  private static final Logger LOG =
> : > LoggerFactory.getLogger(IndexFetcher.class.getName());
> : >
> : >   private final String masterUrl;
> : >
> : > -  private final ReplicationHandler replicationHandler;
> : > -
> : > -  private final Integer pollInterval;
> : > -
> : > -  private String pollIntervalStr;
> : > -
> : > -  private ScheduledExecutorService executorService;
> : > -
> : > -  private volatile long executorStartTime;
> : > +  final ReplicationHandler replicationHandler;
> : >
> : >   private volatile long replicationStartTime;
> : >
> : > @@ -166,11 +153,6 @@ public class SnapPuller {
> : >
> : >   private boolean useExternal = false;
> : >
> : > -  /**
> : > -   * Disable the timer task for polling
> : > -   */
> : > -  private AtomicBoolean pollDisabled = new AtomicBoolean(false);
> : > -
> : >   private final HttpClient myHttpClient;
> : >
> : >   private static HttpClient createHttpClient(SolrCore core, String
> : > connTimeout, String readTimeout, String httpBasicAuthUser, String
> : > httpBasicAuthPassword, boolean useCompression) {
> : > @@ -184,7 +166,7 @@ public class SnapPuller {
> : >     return HttpClientUtil.createClient(httpClientParams,
> : >
> core.getCoreDescriptor().getCoreContainer().getUpdateShardHandler().getConnectionManager());
> : >   }
> : >
> : > -  public SnapPuller(final NamedList initArgs, final ReplicationHandler
> : > handler, final SolrCore sc) {
> : > +  public IndexFetcher(final NamedList initArgs, final
> ReplicationHandler
> : > handler, final SolrCore sc) {
> : >     solrCore = sc;
> : >     String masterUrl = (String) initArgs.get(MASTER_URL);
> : >     if (masterUrl == null)
> : > @@ -197,8 +179,6 @@ public class SnapPuller {
> : >     this.masterUrl = masterUrl;
> : >
> : >     this.replicationHandler = handler;
> : > -    pollIntervalStr = (String) initArgs.get(POLL_INTERVAL);
> : > -    pollInterval = readInterval(pollIntervalStr);
> : >     String compress = (String) initArgs.get(COMPRESSION);
> : >     useInternal = INTERNAL.equals(compress);
> : >     useExternal = EXTERNAL.equals(compress);
> : > @@ -207,35 +187,6 @@ public class SnapPuller {
> : >     String httpBasicAuthUser = (String)
> : > initArgs.get(HttpClientUtil.PROP_BASIC_AUTH_USER);
> : >     String httpBasicAuthPassword = (String)
> : > initArgs.get(HttpClientUtil.PROP_BASIC_AUTH_PASS);
> : >     myHttpClient = createHttpClient(solrCore, connTimeout, readTimeout,
> : > httpBasicAuthUser, httpBasicAuthPassword, useExternal);
> : > -    if (pollInterval != null && pollInterval > 0) {
> : > -      startExecutorService();
> : > -    } else {
> : > -      LOG.info(" No value set for 'pollInterval'. Timer Task not
> : > started.");
> : > -    }
> : > -  }
> : > -
> : > -  private void startExecutorService() {
> : > -    Runnable task = new Runnable() {
> : > -      @Override
> : > -      public void run() {
> : > -        if (pollDisabled.get()) {
> : > -          LOG.info("Poll disabled");
> : > -          return;
> : > -        }
> : > -        try {
> : > -          LOG.debug("Polling for index modifications");
> : > -          executorStartTime = System.currentTimeMillis();
> : > -          replicationHandler.doFetch(null, false);
> : > -        } catch (Exception e) {
> : > -          LOG.error("Exception in fetching index", e);
> : > -        }
> : > -      }
> : > -    };
> : > -    executorService = Executors.newSingleThreadScheduledExecutor(
> : > -        new DefaultSolrThreadFactory("snapPuller"));
> : > -    long initialDelay = pollInterval - (System.currentTimeMillis() %
> : > pollInterval);
> : > -    executorService.scheduleAtFixedRate(task, initialDelay,
> pollInterval,
> : > TimeUnit.MILLISECONDS);
> : > -    LOG.info("Poll Scheduled at an interval of " + pollInterval +
> "ms");
> : >   }
> : >
> : >   /**
> : > @@ -427,13 +378,13 @@ public class SnapPuller {
> : >               Thread.sleep(1000);
> : >               c++;
> : >               if (c >= 30)  {
> : > -                LOG.warn("SnapPuller unable to cleanup unused lucene
> : > index files so we must do a full copy instead");
> : > +                LOG.warn("IndexFetcher unable to cleanup unused lucene
> : > index files so we must do a full copy instead");
> : >                 isFullCopyNeeded = true;
> : >                 break;
> : >               }
> : >             }
> : >             if (c > 0)  {
> : > -              LOG.info("SnapPuller slept for " + (c * 1000) + "ms for
> : > unused lucene index files to be delete-able");
> : > +              LOG.info("IndexFetcher slept for " + (c * 1000) + "ms
> for
> : > unused lucene index files to be delete-able");
> : >             }
> : >           } finally {
> : >             writer.decref();
> : > @@ -634,7 +585,7 @@ public class SnapPuller {
> : >         props.setProperty(TIMES_CONFIG_REPLICATED,
> : > String.valueOf(confFilesCount));
> : >       }
> : >
> : > -      props.setProperty(LAST_CYCLE_BYTES_DOWNLOADED,
> : > String.valueOf(getTotalBytesDownloaded(this)));
> : > +      props.setProperty(LAST_CYCLE_BYTES_DOWNLOADED,
> : > String.valueOf(getTotalBytesDownloaded()));
> : >       if (!successfulInstall) {
> : >         int numFailures = 1;
> : >         if (props.containsKey(TIMES_FAILED)) {
> : > @@ -663,20 +614,20 @@ public class SnapPuller {
> : >     }
> : >   }
> : >
> : > -  static long getTotalBytesDownloaded(SnapPuller snappuller) {
> : > +  long getTotalBytesDownloaded() {
> : >     long bytesDownloaded = 0;
> : >     //get size from list of files to download
> : > -    for (Map<String, Object> file : snappuller.getFilesDownloaded()) {
> : > +    for (Map<String, Object> file : getFilesDownloaded()) {
> : >       bytesDownloaded += (Long) file.get(SIZE);
> : >     }
> : >
> : >     //get size from list of conf files to download
> : > -    for (Map<String, Object> file :
> snappuller.getConfFilesDownloaded()) {
> : > +    for (Map<String, Object> file : getConfFilesDownloaded()) {
> : >       bytesDownloaded += (Long) file.get(SIZE);
> : >     }
> : >
> : >     //get size from current file being downloaded
> : > -    Map<String, Object> currentFile = snappuller.getCurrentFile();
> : > +    Map<String, Object> currentFile = getCurrentFile();
> : >     if (currentFile != null) {
> : >       if (currentFile.containsKey("bytesDownloaded")) {
> : >         bytesDownloaded += (Long) currentFile.get("bytesDownloaded");
> : > @@ -1053,33 +1004,33 @@ public class SnapPuller {
> : >     Directory dir = null;
> : >     try {
> : >       dir = solrCore.getDirectoryFactory().get(solrCore.getDataDir(),
> : > DirContext.META_DATA, solrCore.getSolrConfig().indexConfig.lockType);
> : > -      if (slowFileExists(dir, SnapPuller.INDEX_PROPERTIES)){
> : > -        final IndexInput input =
> : > dir.openInput(SnapPuller.INDEX_PROPERTIES,
> : > DirectoryFactory.IOCONTEXT_NO_CACHE);
> : > +      if (slowFileExists(dir, IndexFetcher.INDEX_PROPERTIES)){
> : > +        final IndexInput input =
> : > dir.openInput(IndexFetcher.INDEX_PROPERTIES,
> : > DirectoryFactory.IOCONTEXT_NO_CACHE);
> : >
> : >         final InputStream is = new PropertiesInputStream(input);
> : >         try {
> : >           p.load(new InputStreamReader(is, StandardCharsets.UTF_8));
> : >         } catch (Exception e) {
> : > -          LOG.error("Unable to load " + SnapPuller.INDEX_PROPERTIES,
> e);
> : > +          LOG.error("Unable to load " +
> IndexFetcher.INDEX_PROPERTIES, e);
> : >         } finally {
> : >           IOUtils.closeQuietly(is);
> : >         }
> : >       }
> : >       try {
> : > -        dir.deleteFile(SnapPuller.INDEX_PROPERTIES);
> : > +        dir.deleteFile(IndexFetcher.INDEX_PROPERTIES);
> : >       } catch (IOException e) {
> : >         // no problem
> : >       }
> : > -      final IndexOutput out =
> : > dir.createOutput(SnapPuller.INDEX_PROPERTIES,
> : > DirectoryFactory.IOCONTEXT_NO_CACHE);
> : > +      final IndexOutput out =
> : > dir.createOutput(IndexFetcher.INDEX_PROPERTIES,
> : > DirectoryFactory.IOCONTEXT_NO_CACHE);
> : >       p.put("index", tmpIdxDirName);
> : >       Writer os = null;
> : >       try {
> : >         os = new OutputStreamWriter(new PropertiesOutputStream(out),
> : > StandardCharsets.UTF_8);
> : > -        p.store(os, SnapPuller.INDEX_PROPERTIES);
> : > +        p.store(os, IndexFetcher.INDEX_PROPERTIES);
> : >         dir.sync(Collections.singleton(INDEX_PROPERTIES));
> : >       } catch (Exception e) {
> : >         throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
> : > -            "Unable to write " + SnapPuller.INDEX_PROPERTIES, e);
> : > +            "Unable to write " + IndexFetcher.INDEX_PROPERTIES, e);
> : >       } finally {
> : >         IOUtils.closeQuietly(os);
> : >       }
> : > @@ -1161,25 +1112,9 @@ public class SnapPuller {
> : >   }
> : >
> : >   /**
> : > -   * Disable periodic polling
> : > -   */
> : > -  void disablePoll() {
> : > -    pollDisabled.set(true);
> : > -    LOG.info("inside disable poll, value of pollDisabled = " +
> : > pollDisabled);
> : > -  }
> : > -
> : > -  /**
> : > -   * Enable periodic polling
> : > -   */
> : > -  void enablePoll() {
> : > -    pollDisabled.set(false);
> : > -    LOG.info("inside enable poll, value of pollDisabled = " +
> : > pollDisabled);
> : > -  }
> : > -
> : > -  /**
> : > -   * Stops the ongoing pull
> : > +   * Stops the ongoing fetch
> : >    */
> : > -  void abortPull() {
> : > +  void abortFetch() {
> : >     stop = true;
> : >   }
> : >
> : > @@ -1187,6 +1122,13 @@ public class SnapPuller {
> : >     return replicationStartTime;
> : >   }
> : >
> : > +  long getReplicationTimeElapsed() {
> : > +    long timeElapsed = 0;
> : > +    if (getReplicationStartTime() > 0)
> : > +      timeElapsed =
> TimeUnit.SECONDS.convert(System.currentTimeMillis() -
> : > getReplicationStartTime(), TimeUnit.MILLISECONDS);
> : > +    return timeElapsed;
> : > +  }
> : > +
> : >   List<Map<String, Object>> getConfFilesToDownload() {
> : >     //make a copy first because it can be null later
> : >     List<Map<String, Object>> tmp = confFilesToDownload;
> : > @@ -1224,17 +1166,6 @@ public class SnapPuller {
> : >     return tmp;
> : >   }
> : >
> : > -  boolean isPollingDisabled() {
> : > -    return pollDisabled.get();
> : > -  }
> : > -
> : > -  Long getNextScheduledExecTime() {
> : > -    Long nextTime = null;
> : > -    if (executorStartTime > 0)
> : > -      nextTime = executorStartTime + pollInterval;
> : > -    return nextTime;
> : > -  }
> : > -
> : >   private static class ReplicationHandlerException extends
> : > InterruptedException {
> : >     public ReplicationHandlerException(String message) {
> : >       super(message);
> : > @@ -1586,55 +1517,14 @@ public class SnapPuller {
> : >     }
> : >   }
> : >
> : > -  static Integer readInterval(String interval) {
> : > -    if (interval == null)
> : > -      return null;
> : > -    int result = 0;
> : > -    Matcher m = INTERVAL_PATTERN.matcher(interval.trim());
> : > -    if (m.find()) {
> : > -      String hr = m.group(1);
> : > -      String min = m.group(2);
> : > -      String sec = m.group(3);
> : > -      result = 0;
> : > -      try {
> : > -        if (sec != null && sec.length() > 0)
> : > -          result += Integer.parseInt(sec);
> : > -        if (min != null && min.length() > 0)
> : > -          result += (60 * Integer.parseInt(min));
> : > -        if (hr != null && hr.length() > 0)
> : > -          result += (60 * 60 * Integer.parseInt(hr));
> : > -        result *= 1000;
> : > -      } catch (NumberFormatException e) {
> : > -        throw new SolrException(ErrorCode.SERVER_ERROR,
> INTERVAL_ERR_MSG);
> : > -      }
> : > -    } else {
> : > -      throw new SolrException(ErrorCode.SERVER_ERROR,
> INTERVAL_ERR_MSG);
> : > -    }
> : > -
> : > -    return result;
> : > -  }
> : > -
> : >   public void destroy() {
> : > -    try {
> : > -      if (executorService != null) executorService.shutdown();
> : > -    } finally {
> : > -      try {
> : > -        abortPull();
> : > -      } finally {
> : > -        if (executorService != null) ExecutorUtil
> : > -            .shutdownNowAndAwaitTermination(executorService);
> : > -      }
> : > -    }
> : > +    abortFetch();
> : >   }
> : >
> : >   String getMasterUrl() {
> : >     return masterUrl;
> : >   }
> : >
> : > -  String getPollInterval() {
> : > -    return pollIntervalStr;
> : > -  }
> : > -
> : >   private static final int MAX_RETRIES = 5;
> : >
> : >   private static final int NO_CONTENT = 1;
> : > @@ -1643,12 +1533,6 @@ public class SnapPuller {
> : >
> : >   public static final String REPLICATION_PROPERTIES =
> : > "replication.properties";
> : >
> : > -  public static final String POLL_INTERVAL = "pollInterval";
> : > -
> : > -  public static final String INTERVAL_ERR_MSG = "The " +
> POLL_INTERVAL +
> : > " must be in this format 'HH:mm:ss'";
> : > -
> : > -  private static final Pattern INTERVAL_PATTERN =
> Pattern.compile("(\\d*?
> : > ):(\\d*?):(\\d*)");
> : > -
> : >   static final String INDEX_REPLICATED_AT = "indexReplicatedAt";
> : >
> : >   static final String TIMES_INDEX_REPLICATED = "timesIndexReplicated";
> : >
> : > Modified:
> : >
> lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
> : > URL:
> : >
> http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java?rev=1664126&r1=1664125&r2=1664126&view=diff
> : >
> : >
> ==============================================================================
> : > ---
> : >
> lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
> : > (original)
> : > +++
> : >
> lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
> : > Wed Mar  4 19:45:09 2015
> : > @@ -36,9 +36,13 @@ import java.util.HashMap;
> : > import java.util.List;
> : > import java.util.Map;
> : > import java.util.Properties;
> : > +import java.util.concurrent.Executors;
> : > +import java.util.concurrent.ScheduledExecutorService;
> : > import java.util.concurrent.TimeUnit;
> : > import java.util.concurrent.atomic.AtomicBoolean;
> : > import java.util.concurrent.locks.ReentrantLock;
> : > +import java.util.regex.Matcher;
> : > +import java.util.regex.Pattern;
> : > import java.util.zip.Adler32;
> : > import java.util.zip.Checksum;
> : > import java.util.zip.DeflaterOutputStream;
> : > @@ -60,6 +64,7 @@ import org.apache.solr.common.SolrExcept
> : > import org.apache.solr.common.params.CommonParams;
> : > import org.apache.solr.common.params.ModifiableSolrParams;
> : > import org.apache.solr.common.params.SolrParams;
> : > +import org.apache.solr.common.util.ExecutorUtil;
> : > import org.apache.solr.common.util.FastOutputStream;
> : > import org.apache.solr.common.util.NamedList;
> : > import org.apache.solr.common.util.SimpleOrderedMap;
> : > @@ -75,6 +80,7 @@ import org.apache.solr.request.SolrQuery
> : > import org.apache.solr.response.SolrQueryResponse;
> : > import org.apache.solr.search.SolrIndexSearcher;
> : > import org.apache.solr.update.SolrIndexWriter;
> : > +import org.apache.solr.util.DefaultSolrThreadFactory;
> : > import org.apache.solr.util.NumberUtils;
> : > import org.apache.solr.util.PropertiesInputStream;
> : > import org.apache.solr.util.RefCounted;
> : > @@ -90,8 +96,8 @@ import org.slf4j.LoggerFactory;
> : >  * file (command=filecontent&amp;file=&lt;FILE_NAME&gt;) You can
> : > optionally specify an offset and length to get that
> : >  * chunk of the file. You can request a configuration file by using
> "cf"
> : > parameter instead of the "file" parameter.</li>
> : >  * <li>Get status/statistics (command=details)</li> </ol> <p>When
> running
> : > on the slave, it provides the following
> : > - * commands <ol> <li>Perform a snap pull now (command=snappull)</li>
> : > <li>Get status/statistics (command=details)</li>
> : > - * <li>Abort a snap pull (command=abort)</li> <li>Enable/Disable
> polling
> : > the master for new versions (command=enablepoll
> : > + * commands <ol> <li>Perform an index fetch now
> (command=snappull)</li>
> : > <li>Get status/statistics (command=details)</li>
> : > + * <li>Abort an index fetch (command=abort)</li> <li>Enable/Disable
> : > polling the master for new versions (command=enablepoll
> : >  * or command=disablepoll)</li> </ol>
> : >  *
> : >  *
> : > @@ -134,9 +140,9 @@ public class ReplicationHandler extends
> : >     }
> : >   }
> : >
> : > -  private SnapPuller snapPuller;
> : > +  private IndexFetcher pollingIndexFetcher;
> : >
> : > -  private ReentrantLock snapPullLock = new ReentrantLock();
> : > +  private ReentrantLock indexFetchLock = new ReentrantLock();
> : >
> : >   private String includeConfFiles;
> : >
> : > @@ -151,14 +157,18 @@ public class ReplicationHandler extends
> : >   private boolean replicateOnCommit = false;
> : >
> : >   private boolean replicateOnStart = false;
> : > -
> : > +
> : > +  private ScheduledExecutorService executorService;
> : > +
> : > +  private volatile long executorStartTime;
> : > +
> : >   private int numberBackupsToKeep = 0; //zero: do not delete old
> backups
> : >
> : >   private int numTimesReplicated = 0;
> : >
> : >   private final Map<String, FileInfo> confFileInfoCache = new
> HashMap<>();
> : >
> : > -  private Integer reserveCommitDuration =
> : > SnapPuller.readInterval("00:00:10");
> : > +  private Integer reserveCommitDuration = readInterval("00:00:10");
> : >
> : >   volatile IndexCommit indexCommitPoint;
> : >
> : > @@ -166,6 +176,19 @@ public class ReplicationHandler extends
> : >
> : >   private AtomicBoolean replicationEnabled = new AtomicBoolean(true);
> : >
> : > +  private Integer pollInterval;
> : > +
> : > +  private String pollIntervalStr;
> : > +
> : > +  /**
> : > +   * Disable the timer task for polling
> : > +   */
> : > +  private AtomicBoolean pollDisabled = new AtomicBoolean(false);
> : > +
> : > +  String getPollInterval() {
> : > +    return pollIntervalStr;
> : > +  }
> : > +
> : >   @Override
> : >   public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse
> : > rsp) throws Exception {
> : >     rsp.setHttpCaching(false);
> : > @@ -221,38 +244,38 @@ public class ReplicationHandler extends
> : >         return;
> : >       }
> : >       final SolrParams paramsCopy = new
> ModifiableSolrParams(solrParams);
> : > -      Thread puller = new Thread("explicit-fetchindex-cmd") {
> : > +      Thread fetchThread = new Thread("explicit-fetchindex-cmd") {
> : >         @Override
> : >         public void run() {
> : >           doFetch(paramsCopy, false);
> : >         }
> : >       };
> : > -      puller.setDaemon(false);
> : > -      puller.start();
> : > +      fetchThread.setDaemon(false);
> : > +      fetchThread.start();
> : >       if (solrParams.getBool(WAIT, false)) {
> : > -        puller.join();
> : > +        fetchThread.join();
> : >       }
> : >       rsp.add(STATUS, OK_STATUS);
> : >     } else if (command.equalsIgnoreCase(CMD_DISABLE_POLL)) {
> : > -      if (snapPuller != null){
> : > -        snapPuller.disablePoll();
> : > +      if (pollingIndexFetcher != null){
> : > +        disablePoll();
> : >         rsp.add(STATUS, OK_STATUS);
> : >       } else {
> : >         rsp.add(STATUS, ERR_STATUS);
> : >         rsp.add("message","No slave configured");
> : >       }
> : >     } else if (command.equalsIgnoreCase(CMD_ENABLE_POLL)) {
> : > -      if (snapPuller != null){
> : > -        snapPuller.enablePoll();
> : > +      if (pollingIndexFetcher != null){
> : > +        enablePoll();
> : >         rsp.add(STATUS, OK_STATUS);
> : >       }else {
> : >         rsp.add(STATUS,ERR_STATUS);
> : >         rsp.add("message","No slave configured");
> : >       }
> : >     } else if (command.equalsIgnoreCase(CMD_ABORT_FETCH)) {
> : > -      SnapPuller temp = tempSnapPuller;
> : > -      if (temp != null){
> : > -        temp.abortPull();
> : > +      IndexFetcher fetcher = currentIndexFetcher;
> : > +      if (fetcher != null){
> : > +        fetcher.abortFetch();
> : >         rsp.add(STATUS, OK_STATUS);
> : >       } else {
> : >         rsp.add(STATUS,ERR_STATUS);
> : > @@ -321,38 +344,35 @@ public class ReplicationHandler extends
> : >     return null;
> : >   }
> : >
> : > -  private volatile SnapPuller tempSnapPuller;
> : > +  private volatile IndexFetcher currentIndexFetcher;
> : >
> : >   public boolean doFetch(SolrParams solrParams, boolean
> forceReplication) {
> : >     String masterUrl = solrParams == null ? null :
> : > solrParams.get(MASTER_URL);
> : > -    if (!snapPullLock.tryLock())
> : > +    if (!indexFetchLock.tryLock())
> : >       return false;
> : >     try {
> : >       if (masterUrl != null) {
> : > -        if (tempSnapPuller != null && tempSnapPuller != snapPuller) {
> : > -          tempSnapPuller.destroy();
> : > +        if (currentIndexFetcher != null && currentIndexFetcher !=
> : > pollingIndexFetcher) {
> : > +          currentIndexFetcher.destroy();
> : >         }
> : > -
> : > -        NamedList<Object> nl = solrParams.toNamedList();
> : > -        nl.remove(SnapPuller.POLL_INTERVAL);
> : > -        tempSnapPuller = new SnapPuller(nl, this, core);
> : > +        currentIndexFetcher = new
> IndexFetcher(solrParams.toNamedList(),
> : > this, core);
> : >       } else {
> : > -        tempSnapPuller = snapPuller;
> : > +        currentIndexFetcher = pollingIndexFetcher;
> : >       }
> : > -      return tempSnapPuller.fetchLatestIndex(core, forceReplication);
> : > +      return currentIndexFetcher.fetchLatestIndex(core,
> forceReplication);
> : >     } catch (Exception e) {
> : > -      SolrException.log(LOG, "SnapPull failed ", e);
> : > +      SolrException.log(LOG, "Index fetch failed ", e);
> : >     } finally {
> : > -      if (snapPuller != null) {
> : > -        tempSnapPuller = snapPuller;
> : > +      if (pollingIndexFetcher != null) {
> : > +        currentIndexFetcher = pollingIndexFetcher;
> : >       }
> : > -      snapPullLock.unlock();
> : > +      indexFetchLock.unlock();
> : >     }
> : >     return false;
> : >   }
> : >
> : >   boolean isReplicating() {
> : > -    return snapPullLock.isLocked();
> : > +    return indexFetchLock.isLocked();
> : >   }
> : >
> : >   private void doSnapShoot(SolrParams params, SolrQueryResponse rsp,
> : > @@ -390,10 +410,10 @@ public class ReplicationHandler extends
> : >
> : >   /**
> : >    * This method adds an Object of FileStream to the response . The
> : > FileStream implements a custom protocol which is
> : > -   * understood by SnapPuller.FileFetcher
> : > +   * understood by IndexFetcher.FileFetcher
> : >    *
> : > -   * @see org.apache.solr.handler.SnapPuller.LocalFsFileFetcher
> : > -   * @see org.apache.solr.handler.SnapPuller.DirectoryFileFetcher
> : > +   * @see IndexFetcher.LocalFsFileFetcher
> : > +   * @see IndexFetcher.DirectoryFileFetcher
> : >    */
> : >   private void getFileStream(SolrParams solrParams, SolrQueryResponse
> rsp)
> : > {
> : >     ModifiableSolrParams rawParams = new
> ModifiableSolrParams(solrParams);
> : > @@ -538,18 +558,28 @@ public class ReplicationHandler extends
> : >   }
> : >
> : >   void disablePoll() {
> : > -    if (isSlave)
> : > -      snapPuller.disablePoll();
> : > +    if (isSlave) {
> : > +      pollDisabled.set(true);
> : > +      LOG.info("inside disable poll, value of pollDisabled = " +
> : > pollDisabled);
> : > +    }
> : >   }
> : >
> : >   void enablePoll() {
> : > -    if (isSlave)
> : > -      snapPuller.enablePoll();
> : > +    if (isSlave) {
> : > +      pollDisabled.set(false);
> : > +      LOG.info("inside enable poll, value of pollDisabled = " +
> : > pollDisabled);
> : > +    }
> : >   }
> : >
> : >   boolean isPollingDisabled() {
> : > -    if (snapPuller == null) return true;
> : > -    return snapPuller.isPollingDisabled();
> : > +    return pollDisabled.get();
> : > +  }
> : > +
> : > +  Long getNextScheduledExecTime() {
> : > +    Long nextTime = null;
> : > +    if (executorStartTime > 0)
> : > +      nextTime = executorStartTime + pollInterval;
> : > +    return nextTime;
> : >   }
> : >
> : >   int getTimesReplicatedSinceStartup() {
> : > @@ -611,31 +641,31 @@ public class ReplicationHandler extends
> : >       list.add("isMaster", String.valueOf(isMaster));
> : >       list.add("isSlave", String.valueOf(isSlave));
> : >
> : > -      SnapPuller snapPuller = tempSnapPuller;
> : > -      if (snapPuller != null) {
> : > -        list.add(MASTER_URL, snapPuller.getMasterUrl());
> : > -        if (snapPuller.getPollInterval() != null) {
> : > -          list.add(SnapPuller.POLL_INTERVAL,
> : > snapPuller.getPollInterval());
> : > +      IndexFetcher fetcher = currentIndexFetcher;
> : > +      if (fetcher != null) {
> : > +        list.add(MASTER_URL, fetcher.getMasterUrl());
> : > +        if (getPollInterval() != null) {
> : > +          list.add(POLL_INTERVAL, getPollInterval());
> : >         }
> : >         list.add("isPollingDisabled",
> String.valueOf(isPollingDisabled()));
> : >         list.add("isReplicating", String.valueOf(isReplicating()));
> : > -        long elapsed = getTimeElapsed(snapPuller);
> : > -        long val = SnapPuller.getTotalBytesDownloaded(snapPuller);
> : > +        long elapsed = fetcher.getReplicationTimeElapsed();
> : > +        long val = fetcher.getTotalBytesDownloaded();
> : >         if (elapsed > 0) {
> : >           list.add("timeElapsed", elapsed);
> : >           list.add("bytesDownloaded", val);
> : >           list.add("downloadSpeed", val / elapsed);
> : >         }
> : >         Properties props = loadReplicationProperties();
> : > -        addVal(list, SnapPuller.PREVIOUS_CYCLE_TIME_TAKEN, props,
> : > Long.class);
> : > -        addVal(list, SnapPuller.INDEX_REPLICATED_AT, props,
> Date.class);
> : > -        addVal(list, SnapPuller.CONF_FILES_REPLICATED_AT, props,
> : > Date.class);
> : > -        addVal(list, SnapPuller.REPLICATION_FAILED_AT, props,
> Date.class);
> : > -        addVal(list, SnapPuller.TIMES_FAILED, props, Integer.class);
> : > -        addVal(list, SnapPuller.TIMES_INDEX_REPLICATED, props,
> : > Integer.class);
> : > -        addVal(list, SnapPuller.LAST_CYCLE_BYTES_DOWNLOADED, props,
> : > Long.class);
> : > -        addVal(list, SnapPuller.TIMES_CONFIG_REPLICATED, props,
> : > Integer.class);
> : > -        addVal(list, SnapPuller.CONF_FILES_REPLICATED, props,
> : > String.class);
> : > +        addVal(list, IndexFetcher.PREVIOUS_CYCLE_TIME_TAKEN, props,
> : > Long.class);
> : > +        addVal(list, IndexFetcher.INDEX_REPLICATED_AT, props,
> Date.class);
> : > +        addVal(list, IndexFetcher.CONF_FILES_REPLICATED_AT, props,
> : > Date.class);
> : > +        addVal(list, IndexFetcher.REPLICATION_FAILED_AT, props,
> : > Date.class);
> : > +        addVal(list, IndexFetcher.TIMES_FAILED, props, Integer.class);
> : > +        addVal(list, IndexFetcher.TIMES_INDEX_REPLICATED, props,
> : > Integer.class);
> : > +        addVal(list, IndexFetcher.LAST_CYCLE_BYTES_DOWNLOADED, props,
> : > Long.class);
> : > +        addVal(list, IndexFetcher.TIMES_CONFIG_REPLICATED, props,
> : > Integer.class);
> : > +        addVal(list, IndexFetcher.CONF_FILES_REPLICATED, props,
> : > String.class);
> : >       }
> : >       if (isMaster) {
> : >         if (includeConfFiles != null) list.add("confFilesToReplicate",
> : > includeConfFiles);
> : > @@ -677,12 +707,12 @@ public class ReplicationHandler extends
> : >       master.add("replicableGeneration", repCommitInfo.generation);
> : >     }
> : >
> : > -    SnapPuller snapPuller = tempSnapPuller;
> : > -    if (snapPuller != null) {
> : > +    IndexFetcher fetcher = currentIndexFetcher;
> : > +    if (fetcher != null) {
> : >       Properties props = loadReplicationProperties();
> : >       if (showSlaveDetails) {
> : >         try {
> : > -          NamedList nl = snapPuller.getDetails();
> : > +          NamedList nl = fetcher.getDetails();
> : >           slave.add("masterDetails", nl.get(CMD_DETAILS));
> : >         } catch (Exception e) {
> : >           LOG.warn(
> : > @@ -691,26 +721,26 @@ public class ReplicationHandler extends
> : >           slave.add(ERR_STATUS, "invalid_master");
> : >         }
> : >       }
> : > -      slave.add(MASTER_URL, snapPuller.getMasterUrl());
> : > -      if (snapPuller.getPollInterval() != null) {
> : > -        slave.add(SnapPuller.POLL_INTERVAL,
> snapPuller.getPollInterval());
> : > +      slave.add(MASTER_URL, fetcher.getMasterUrl());
> : > +      if (getPollInterval() != null) {
> : > +        slave.add(POLL_INTERVAL, getPollInterval());
> : >       }
> : > -      if (snapPuller.getNextScheduledExecTime() != null &&
> : > !isPollingDisabled()) {
> : > -        slave.add(NEXT_EXECUTION_AT, new
> : > Date(snapPuller.getNextScheduledExecTime()).toString());
> : > +      if (getNextScheduledExecTime() != null && !isPollingDisabled())
> {
> : > +        slave.add(NEXT_EXECUTION_AT, new
> : > Date(getNextScheduledExecTime()).toString());
> : >       } else if (isPollingDisabled()) {
> : >         slave.add(NEXT_EXECUTION_AT, "Polling disabled");
> : >       }
> : > -      addVal(slave, SnapPuller.INDEX_REPLICATED_AT, props,
> Date.class);
> : > -      addVal(slave, SnapPuller.INDEX_REPLICATED_AT_LIST, props,
> : > List.class);
> : > -      addVal(slave, SnapPuller.REPLICATION_FAILED_AT_LIST, props,
> : > List.class);
> : > -      addVal(slave, SnapPuller.TIMES_INDEX_REPLICATED, props,
> : > Integer.class);
> : > -      addVal(slave, SnapPuller.CONF_FILES_REPLICATED, props,
> : > Integer.class);
> : > -      addVal(slave, SnapPuller.TIMES_CONFIG_REPLICATED, props,
> : > Integer.class);
> : > -      addVal(slave, SnapPuller.CONF_FILES_REPLICATED_AT, props,
> : > Integer.class);
> : > -      addVal(slave, SnapPuller.LAST_CYCLE_BYTES_DOWNLOADED, props,
> : > Long.class);
> : > -      addVal(slave, SnapPuller.TIMES_FAILED, props, Integer.class);
> : > -      addVal(slave, SnapPuller.REPLICATION_FAILED_AT, props,
> Date.class);
> : > -      addVal(slave, SnapPuller.PREVIOUS_CYCLE_TIME_TAKEN, props,
> : > Long.class);
> : > +      addVal(slave, IndexFetcher.INDEX_REPLICATED_AT, props,
> Date.class);
> : > +      addVal(slave, IndexFetcher.INDEX_REPLICATED_AT_LIST, props,
> : > List.class);
> : > +      addVal(slave, IndexFetcher.REPLICATION_FAILED_AT_LIST, props,
> : > List.class);
> : > +      addVal(slave, IndexFetcher.TIMES_INDEX_REPLICATED, props,
> : > Integer.class);
> : > +      addVal(slave, IndexFetcher.CONF_FILES_REPLICATED, props,
> : > Integer.class);
> : > +      addVal(slave, IndexFetcher.TIMES_CONFIG_REPLICATED, props,
> : > Integer.class);
> : > +      addVal(slave, IndexFetcher.CONF_FILES_REPLICATED_AT, props,
> : > Integer.class);
> : > +      addVal(slave, IndexFetcher.LAST_CYCLE_BYTES_DOWNLOADED, props,
> : > Long.class);
> : > +      addVal(slave, IndexFetcher.TIMES_FAILED, props, Integer.class);
> : > +      addVal(slave, IndexFetcher.REPLICATION_FAILED_AT, props,
> : > Date.class);
> : > +      addVal(slave, IndexFetcher.PREVIOUS_CYCLE_TIME_TAKEN, props,
> : > Long.class);
> : >
> : >       slave.add("currentDate", new Date().toString());
> : >       slave.add("isPollingDisabled",
> String.valueOf(isPollingDisabled()));
> : > @@ -720,13 +750,13 @@ public class ReplicationHandler extends
> : >         try {
> : >           long bytesToDownload = 0;
> : >           List<String> filesToDownload = new ArrayList<>();
> : > -          for (Map<String, Object> file :
> : > snapPuller.getFilesToDownload()) {
> : > +          for (Map<String, Object> file :
> fetcher.getFilesToDownload()) {
> : >             filesToDownload.add((String) file.get(NAME));
> : >             bytesToDownload += (Long) file.get(SIZE);
> : >           }
> : >
> : >           //get list of conf files to download
> : > -          for (Map<String, Object> file :
> : > snapPuller.getConfFilesToDownload()) {
> : > +          for (Map<String, Object> file :
> : > fetcher.getConfFilesToDownload()) {
> : >             filesToDownload.add((String) file.get(NAME));
> : >             bytesToDownload += (Long) file.get(SIZE);
> : >           }
> : > @@ -737,18 +767,18 @@ public class ReplicationHandler extends
> : >
> : >           long bytesDownloaded = 0;
> : >           List<String> filesDownloaded = new ArrayList<>();
> : > -          for (Map<String, Object> file :
> : > snapPuller.getFilesDownloaded()) {
> : > +          for (Map<String, Object> file :
> fetcher.getFilesDownloaded()) {
> : >             filesDownloaded.add((String) file.get(NAME));
> : >             bytesDownloaded += (Long) file.get(SIZE);
> : >           }
> : >
> : >           //get list of conf files downloaded
> : > -          for (Map<String, Object> file :
> : > snapPuller.getConfFilesDownloaded()) {
> : > +          for (Map<String, Object> file :
> : > fetcher.getConfFilesDownloaded()) {
> : >             filesDownloaded.add((String) file.get(NAME));
> : >             bytesDownloaded += (Long) file.get(SIZE);
> : >           }
> : >
> : > -          Map<String, Object> currentFile =
> snapPuller.getCurrentFile();
> : > +          Map<String, Object> currentFile = fetcher.getCurrentFile();
> : >           String currFile = null;
> : >           long currFileSize = 0, currFileSizeDownloaded = 0;
> : >           float percentDownloaded = 0;
> : > @@ -767,10 +797,10 @@ public class ReplicationHandler extends
> : >
> : >           long estimatedTimeRemaining = 0;
> : >
> : > -          if (snapPuller.getReplicationStartTime() > 0) {
> : > -            slave.add("replicationStartTime", new
> : > Date(snapPuller.getReplicationStartTime()).toString());
> : > +          if (fetcher.getReplicationStartTime() > 0) {
> : > +            slave.add("replicationStartTime", new
> : > Date(fetcher.getReplicationStartTime()).toString());
> : >           }
> : > -          long elapsed = getTimeElapsed(snapPuller);
> : > +          long elapsed = fetcher.getReplicationTimeElapsed();
> : >           slave.add("timeElapsed", String.valueOf(elapsed) + "s");
> : >
> : >           if (bytesDownloaded > 0)
> : > @@ -840,13 +870,6 @@ public class ReplicationHandler extends
> : >     return replicateAfter;
> : >   }
> : >
> : > -  private long getTimeElapsed(SnapPuller snapPuller) {
> : > -    long timeElapsed = 0;
> : > -    if (snapPuller.getReplicationStartTime() > 0)
> : > -      timeElapsed =
> TimeUnit.SECONDS.convert(System.currentTimeMillis() -
> : > snapPuller.getReplicationStartTime(), TimeUnit.MILLISECONDS);
> : > -    return timeElapsed;
> : > -  }
> : > -
> : >   Properties loadReplicationProperties() {
> : >     Directory dir = null;
> : >     try {
> : > @@ -856,7 +879,7 @@ public class ReplicationHandler extends
> : >         IndexInput input;
> : >         try {
> : >           input = dir.openInput(
> : > -            SnapPuller.REPLICATION_PROPERTIES, IOContext.DEFAULT);
> : > +            IndexFetcher.REPLICATION_PROPERTIES, IOContext.DEFAULT);
> : >         } catch (FileNotFoundException | NoSuchFileException e) {
> : >           return new Properties();
> : >         }
> : > @@ -887,6 +910,37 @@ public class ReplicationHandler extends
> : > //    }
> : > //  }
> : >
> : > +  private void setupPolling(String intervalStr) {
> : > +    pollIntervalStr = intervalStr;
> : > +    pollInterval = readInterval(pollIntervalStr);
> : > +    if (pollInterval == null || pollInterval <= 0) {
> : > +      LOG.info(" No value set for 'pollInterval'. Timer Task not
> : > started.");
> : > +      return;
> : > +    }
> : > +
> : > +    Runnable task = new Runnable() {
> : > +      @Override
> : > +      public void run() {
> : > +        if (pollDisabled.get()) {
> : > +          LOG.info("Poll disabled");
> : > +          return;
> : > +        }
> : > +        try {
> : > +          LOG.debug("Polling for index modifications");
> : > +          executorStartTime = System.currentTimeMillis();
> : > +          doFetch(null, false);
> : > +        } catch (Exception e) {
> : > +          LOG.error("Exception in fetching index", e);
> : > +        }
> : > +      }
> : > +    };
> : > +    executorService = Executors.newSingleThreadScheduledExecutor(
> : > +        new DefaultSolrThreadFactory("indexFetcher"));
> : > +    long initialDelay = pollInterval - (System.currentTimeMillis() %
> : > pollInterval);
> : > +    executorService.scheduleAtFixedRate(task, initialDelay,
> pollInterval,
> : > TimeUnit.MILLISECONDS);
> : > +    LOG.info("Poll Scheduled at an interval of " + pollInterval +
> "ms");
> : > +  }
> : > +
> : >   @Override
> : >   @SuppressWarnings("unchecked")
> : >   public void inform(SolrCore core) {
> : > @@ -901,7 +955,8 @@ public class ReplicationHandler extends
> : >     NamedList slave = (NamedList) initArgs.get("slave");
> : >     boolean enableSlave = isEnabled( slave );
> : >     if (enableSlave) {
> : > -      tempSnapPuller = snapPuller = new SnapPuller(slave, this, core);
> : > +      currentIndexFetcher = pollingIndexFetcher = new
> IndexFetcher(slave,
> : > this, core);
> : > +      setupPolling((String) slave.get(POLL_INTERVAL));
> : >       isSlave = true;
> : >     }
> : >     NamedList master = (NamedList) initArgs.get("master");
> : > @@ -1005,7 +1060,7 @@ public class ReplicationHandler extends
> : >       }
> : >       String reserve = (String) master.get(RESERVE);
> : >       if (reserve != null && !reserve.trim().equals("")) {
> : > -        reserveCommitDuration = SnapPuller.readInterval(reserve);
> : > +        reserveCommitDuration = readInterval(reserve);
> : >       }
> : >       LOG.info("Commits will be reserved for  " +
> reserveCommitDuration);
> : >       isMaster = true;
> : > @@ -1029,11 +1084,20 @@ public class ReplicationHandler extends
> : >     core.addCloseHook(new CloseHook() {
> : >       @Override
> : >       public void preClose(SolrCore core) {
> : > -        if (snapPuller != null) {
> : > -          snapPuller.destroy();
> : > +        try {
> : > +          if (executorService != null) executorService.shutdown();
> : > +        } finally {
> : > +          try {
> : > +            if (pollingIndexFetcher != null) {
> : > +              pollingIndexFetcher.destroy();
> : > +            }
> : > +          } finally {
> : > +            if (executorService != null) ExecutorUtil
> : > +                .shutdownNowAndAwaitTermination(executorService);
> : > +          }
> : >         }
> : > -        if (tempSnapPuller != null && tempSnapPuller != snapPuller) {
> : > -          tempSnapPuller.destroy();
> : > +        if (currentIndexFetcher != null && currentIndexFetcher !=
> : > pollingIndexFetcher) {
> : > +          currentIndexFetcher.destroy();
> : >         }
> : >       }
> : >
> : > @@ -1307,8 +1371,40 @@ public class ReplicationHandler extends
> : >         releaseCommitPointAndExtendReserve();
> : >       }
> : >     }
> : > -  }
> : > -
> : > +  }
> : > +
> : > +  static Integer readInterval(String interval) {
> : > +    if (interval == null)
> : > +      return null;
> : > +    int result = 0;
> : > +    if (interval != null) {
> : > +      Matcher m = INTERVAL_PATTERN.matcher(interval.trim());
> : > +      if (m.find()) {
> : > +        String hr = m.group(1);
> : > +        String min = m.group(2);
> : > +        String sec = m.group(3);
> : > +        result = 0;
> : > +        try {
> : > +          if (sec != null && sec.length() > 0)
> : > +            result += Integer.parseInt(sec);
> : > +          if (min != null && min.length() > 0)
> : > +            result += (60 * Integer.parseInt(min));
> : > +          if (hr != null && hr.length() > 0)
> : > +            result += (60 * 60 * Integer.parseInt(hr));
> : > +          result *= 1000;
> : > +        } catch (NumberFormatException e) {
> : > +          throw new
> SolrException(SolrException.ErrorCode.SERVER_ERROR,
> : > +              INTERVAL_ERR_MSG);
> : > +        }
> : > +      } else {
> : > +        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
> : > +            INTERVAL_ERR_MSG);
> : > +      }
> : > +
> : > +    }
> : > +    return result;
> : > +  }
> : > +
> : >   public static final String MASTER_URL = "masterUrl";
> : >
> : >   public static final String STATUS = "status";
> : > @@ -1369,6 +1465,12 @@ public class ReplicationHandler extends
> : >
> : >   public static final String FILE_STREAM = "filestream";
> : >
> : > +  public static final String POLL_INTERVAL = "pollInterval";
> : > +
> : > +  public static final String INTERVAL_ERR_MSG = "The " +
> POLL_INTERVAL +
> : > " must be in this format 'HH:mm:ss'";
> : > +
> : > +  private static final Pattern INTERVAL_PATTERN =
> Pattern.compile("(\\d*?
> : > ):(\\d*?):(\\d*)");
> : > +
> : >   public static final int PACKET_SZ = 1024 * 1024; // 1MB
> : >
> : >   public static final String RESERVE = "commitReserveDuration";
> : >
> : > Modified:
> : >
> lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/SnapShooter.java
> : > URL:
> : >
> http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/SnapShooter.java?rev=1664126&r1=1664125&r2=1664126&view=diff
> : >
> : >
> ==============================================================================
> : > ---
> : >
> lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/SnapShooter.java
> : > (original)
> : > +++
> : >
> lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/SnapShooter.java
> : > Wed Mar  4 19:45:09 2015
> : > @@ -144,7 +144,7 @@ public class SnapShooter {
> : >       details.add("snapshotName", snapshotName);
> : >       LOG.info("Done creating backup snapshot: " + (snapshotName ==
> null ?
> : > "<not named>" : snapshotName));
> : >     } catch (Exception e) {
> : > -      SnapPuller.delTree(snapShotDir);
> : > +      IndexFetcher.delTree(snapShotDir);
> : >       LOG.error("Exception while creating snapshot", e);
> : >       details.add("snapShootException", e.getMessage());
> : >     } finally {
> : > @@ -170,7 +170,7 @@ public class SnapShooter {
> : >     int i=1;
> : >     for (OldBackupDirectory dir : dirs) {
> : >       if (i++ > numberToKeep) {
> : > -        SnapPuller.delTree(dir.dir);
> : > +        IndexFetcher.delTree(dir.dir);
> : >       }
> : >     }
> : >   }
> : > @@ -181,7 +181,7 @@ public class SnapShooter {
> : >     NamedList<Object> details = new NamedList<>();
> : >     boolean isSuccess;
> : >     File f = new File(snapDir, "snapshot." + snapshotName);
> : > -    isSuccess = SnapPuller.delTree(f);
> : > +    isSuccess = IndexFetcher.delTree(f);
> : >
> : >     if(isSuccess) {
> : >       details.add("status", "success");
> : >
> : > Modified: lucene/dev/trunk/solr/core/src/test-files/log4j.properties
> : > URL:
> : >
> http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test-files/log4j.properties?rev=1664126&r1=1664125&r2=1664126&view=diff
> : >
> : >
> ==============================================================================
> : > --- lucene/dev/trunk/solr/core/src/test-files/log4j.properties
> (original)
> : > +++ lucene/dev/trunk/solr/core/src/test-files/log4j.properties Wed
> Mar  4
> : > 19:45:09 2015
> : > @@ -25,7 +25,7 @@ log4j.logger.org.apache.solr.hadoop=INFO
> : > #log4j.logger.org.apache.solr.cloud.ChaosMonkey=DEBUG
> : > #log4j.logger.org.apache.solr.update.TransactionLog=DEBUG
> : > #log4j.logger.org.apache.solr.handler.ReplicationHandler=DEBUG
> : > -#log4j.logger.org.apache.solr.handler.SnapPuller=DEBUG
> : > +#log4j.logger.org.apache.solr.handler.IndexFetcher=DEBUG
> : >
> : > #log4j.logger.org.apache.solr.common.cloud.ClusterStateUtil=DEBUG
> : >
> #log4j.logger.org.apache.solr.cloud.OverseerAutoReplicaFailoverThread=DEBUG
> : > \ No newline at end of file
> : >
> : > Modified:
> : >
> lucene/dev/trunk/solr/core/src/test/org/apache/solr/core/TestArbitraryIndexDir.java
> : > URL:
> : >
> http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/core/TestArbitraryIndexDir.java?rev=1664126&r1=1664125&r2=1664126&view=diff
> : >
> : >
> ==============================================================================
> : > ---
> : >
> lucene/dev/trunk/solr/core/src/test/org/apache/solr/core/TestArbitraryIndexDir.java
> : > (original)
> : > +++
> : >
> lucene/dev/trunk/solr/core/src/test/org/apache/solr/core/TestArbitraryIndexDir.java
> : > Wed Mar  4 19:45:09 2015
> : > @@ -36,7 +36,7 @@ import org.apache.lucene.store.Directory
> : > import org.apache.lucene.util.IOUtils;
> : > import org.apache.solr.common.SolrException;
> : > import org.apache.solr.common.params.CommonParams;
> : > -import org.apache.solr.handler.SnapPuller;
> : > +import org.apache.solr.handler.IndexFetcher;
> : > import org.apache.solr.util.AbstractSolrTestCase;
> : > import org.apache.solr.util.TestHarness;
> : > import org.junit.AfterClass;
> : > @@ -93,7 +93,7 @@ public class TestArbitraryIndexDir exten
> : >     assertU(adoc("id", String.valueOf(1),
> : >         "name", "name"+String.valueOf(1)));
> : >     //create a new index dir and index.properties file
> : > -    File idxprops = new File(h.getCore().getDataDir() +
> : > SnapPuller.INDEX_PROPERTIES);
> : > +    File idxprops = new File(h.getCore().getDataDir() +
> : > IndexFetcher.INDEX_PROPERTIES);
> : >     Properties p = new Properties();
> : >     File newDir = new File(h.getCore().getDataDir() + "index_temp");
> : >     newDir.mkdirs();
> : > @@ -104,7 +104,7 @@ public class TestArbitraryIndexDir exten
> : >       p.store(os, "index properties");
> : >     } catch (Exception e) {
> : >       throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
> : > -          "Unable to write " + SnapPuller.INDEX_PROPERTIES, e);
> : > +          "Unable to write " + IndexFetcher.INDEX_PROPERTIES, e);
> : >     } finally {
> : >       IOUtils.closeWhileHandlingException(os);
> : >     }
> : >
> : > Modified:
> : >
> lucene/dev/trunk/solr/core/src/test/org/apache/solr/handler/TestReplicationHandler.java
> : > URL:
> : >
> http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/handler/TestReplicationHandler.java?rev=1664126&r1=1664125&r2=1664126&view=diff
> : >
> : >
> ==============================================================================
> : > ---
> : >
> lucene/dev/trunk/solr/core/src/test/org/apache/solr/handler/TestReplicationHandler.java
> : > (original)
> : > +++
> : >
> lucene/dev/trunk/solr/core/src/test/org/apache/solr/handler/TestReplicationHandler.java
> : > Wed Mar  4 19:45:09 2015
> : > @@ -172,17 +172,13 @@ public class TestReplicationHandler exte
> : >   }
> : >
> : >   NamedList query(String query, SolrClient s) throws
> SolrServerException,
> : > IOException {
> : > -    NamedList res = new SimpleOrderedMap();
> : >     ModifiableSolrParams params = new ModifiableSolrParams();
> : >
> : >     params.add("q", query);
> : >     params.add("sort","id desc");
> : >
> : >     QueryResponse qres = s.query(params);
> : > -
> : > -    res = qres.getResponse();
> : > -
> : > -    return res;
> : > +    return qres.getResponse();
> : >   }
> : >
> : >   /** will sleep up to 30 seconds, looking for expectedDocCount */
> : > @@ -304,7 +300,7 @@ public class TestReplicationHandler exte
> : >       assertNotNull("slave has slave section",
> : >                     details.get("slave"));
> : >       // SOLR-2677: assert not false negatives
> : > -      Object timesFailed =
> : > ((NamedList)details.get("slave")).get(SnapPuller.TIMES_FAILED);
> : > +      Object timesFailed =
> : > ((NamedList)details.get("slave")).get(IndexFetcher.TIMES_FAILED);
> : >       assertEquals("slave has fetch error count",
> : >                    null, timesFailed);
> : >
> : > @@ -513,7 +509,7 @@ public class TestReplicationHandler exte
> : >     slaveClient.close();
> : >     slaveClient = createNewSolrClient(slaveJetty.getLocalPort());
> : >
> : > -    //add a doc with new field and commit on master to trigger
> snappull
> : > from slave.
> : > +    //add a doc with new field and commit on master to trigger index
> : > fetch from slave.
> : >     index(masterClient, "id", "2000", "name", "name = " + 2000,
> "newname",
> : > "newname = " + 2000);
> : >     masterClient.commit();
> : >
> : > @@ -581,7 +577,7 @@ public class TestReplicationHandler exte
> : >   }
> : >
> : >   @Test
> : > -  public void doTestSnapPullWithMasterUrl() throws Exception {
> : > +  public void doTestIndexFetchWithMasterUrl() throws Exception {
> : >     //change solrconfig on slave
> : >     //this has no entry for pollinginterval
> : >     slave.copyConfigFile(CONF_DIR + "solrconfig-slave1.xml",
> : > "solrconfig.xml");
> : > @@ -608,7 +604,7 @@ public class TestReplicationHandler exte
> : >     SolrDocumentList masterQueryResult = (SolrDocumentList)
> : > masterQueryRsp.get("response");
> : >     assertEquals(nDocs, masterQueryResult.getNumFound());
> : >
> : > -    // snappull
> : > +    // index fetch
> : >     String masterUrl = buildUrl(slaveJetty.getLocalPort()) + "/" +
> : > DEFAULT_TEST_CORENAME + "/replication?command=fetchindex&masterUrl=";
> : >     masterUrl += buildUrl(masterJetty.getLocalPort()) + "/" +
> : > DEFAULT_TEST_CORENAME + "/replication";
> : >     URL url = new URL(masterUrl);
> : > @@ -623,7 +619,7 @@ public class TestReplicationHandler exte
> : >     String cmp =
> BaseDistributedSearchTestCase.compare(masterQueryResult,
> : > slaveQueryResult, 0, null);
> : >     assertEquals(null, cmp);
> : >
> : > -    // snappull from the slave to the master
> : > +    // index fetch from the slave to the master
> : >
> : >     for (int i = nDocs; i < nDocs + 3; i++)
> : >       index(slaveClient, "id", i, "name", "name = " + i);
> : > @@ -765,7 +761,7 @@ public class TestReplicationHandler exte
> : >             .get("response");
> : >         assertEquals(totalDocs, masterQueryResult.getNumFound());
> : >
> : > -        // snappull
> : > +        // index fetch
> : >         Date slaveCoreStart = watchCoreStartAt(slaveClient, 30*1000,
> null);
> : >         pullFromMasterToSlave();
> : >         if (confCoreReload) {
> : > @@ -1219,7 +1215,7 @@ public class TestReplicationHandler exte
> : >     // record collection1's start time on slave
> : >     final Date slaveStartTime = watchCoreStartAt(slaveClient, 30*1000,
> : > null);
> : >
> : > -    //add a doc with new field and commit on master to trigger
> snappull
> : > from slave.
> : > +    //add a doc with new field and commit on master to trigger index
> : > fetch from slave.
> : >     index(masterClient, "id", "2000", "name", "name = " + 2000,
> "newname",
> : > "n2000");
> : >     masterClient.commit();
> : >     rQuery(1, "newname:n2000", masterClient);  // sanity check
> : >
> : > Modified:
> : >
> lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/core/MockDirectoryFactory.java
> : > URL:
> : >
> http://svn.apache.org/viewvc/lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/core/MockDirectoryFactory.java?rev=1664126&r1=1664125&r2=1664126&view=diff
> : >
> : >
> ==============================================================================
> : > ---
> : >
> lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/core/MockDirectoryFactory.java
> : > (original)
> : > +++
> : >
> lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/core/MockDirectoryFactory.java
> : > Wed Mar  4 19:45:09 2015
> : > @@ -69,7 +69,7 @@ public class MockDirectoryFactory extend
> : >       // already been created.
> : >       mockDirWrapper.setPreventDoubleWrite(false);
> : >
> : > -      // snappuller & co don't seem ready for this:
> : > +      // IndexFetcher & co don't seem ready for this:
> : >       mockDirWrapper.setEnableVirusScanner(false);
> : >
> : >       if (allowReadingFilesStillOpenForWrite) {
> : >
> : >
> : >
> : >
> :
> :
> : --
> : Not sent from my iPhone or my Blackberry or anyone else's
> :
>
> -Hoss
> http://www.lucidworks.com/
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: dev-unsubscr...@lucene.apache.org
> For additional commands, e-mail: dev-h...@lucene.apache.org
>
>


-- 
Not sent from my iPhone or my Blackberry or anyone else's

Reply via email to