eribeiro commented on a change in pull request #864: SOLR-13101 : Shared 
storage support in SolrCloud
URL: https://github.com/apache/lucene-solr/pull/864#discussion_r323982385
 
 

 ##########
 File path: 
solr/core/src/java/org/apache/solr/store/blob/process/CorePullTask.java
 ##########
 @@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.store.blob.process;
+
+import java.io.File;
+import java.lang.invoke.MethodHandles;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.solr.cloud.ZkController;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.CoreDescriptor;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.store.blob.client.BlobCoreMetadata;
+import org.apache.solr.store.blob.client.CoreStorageClient;
+import org.apache.solr.store.blob.metadata.CorePushPull;
+import org.apache.solr.store.blob.metadata.ServerSideMetadata;
+import org.apache.solr.store.blob.metadata.SharedStoreResolutionUtil;
+import 
org.apache.solr.store.blob.metadata.SharedStoreResolutionUtil.SharedMetadataResolutionResult;
+import org.apache.solr.store.blob.process.CorePullerFeeder.PullCoreInfo;
+import org.apache.solr.store.blob.provider.BlobStorageProvider;
+import org.apache.solr.store.blob.util.BlobStoreUtils;
+import org.apache.solr.store.blob.util.DeduplicatingList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * Code for pulling updates on a specific core from the Blob store. See 
{@link CorePushTask} for the push version of this.
+ */
+public class CorePullTask implements DeduplicatingList.Deduplicatable<String> {
+  private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /**
+   * Minimum delay between two pull retries for a given core. Setting this 
higher than the push retry to reduce noise
+   * we get from a flood of queries for a stale core
+   * 
+   * TODO: make configurable
+   */
+  private static final long MIN_RETRY_DELAY_MS = 20000;
+
+  /** Cores currently being pulled and timestamp of pull start (to identify 
stuck ones in logs) */
+  private static final HashMap<String, Long> pullsInFlight = Maps.newHashMap();
+
+  /** Cores unknown locally that got created as part of the pull process but 
for which no data has been pulled yet
+   * from Blob store. If we ignore this transitory state, these cores can be 
accessed locally and simply look empty.
+   * We'd rather treat threads attempting to access such cores like threads 
attempting to access an unknown core and
+   * do a pull (or more likely wait for an ongoing pull to finish).<p>
+   *
+   * When this lock has to be taken as well as {@link #pullsInFlight}, then 
{@link #pullsInFlight} has to be taken first.
+   * Reading this set implies acquiring the monitor of the set (as if 
@GuardedBy("itself")), but writing to the set
+   * additionally implies holding the {@link #pullsInFlight}. This guarantees 
that while {@link #pullsInFlight}
+   * is held, no element in the set is changing.
+   */
+  private static final Set<String> coresCreatedNotPulledYet = 
Sets.newHashSet();
+
+  private final CoreContainer coreContainer;
+  private final PullCoreInfo pullCoreInfo;
+  private final long queuedTimeMs;
+  private int attempts;
+  private long lastAttemptTimestamp;
+  private final PullCoreCallback callback;
+
+  CorePullTask(CoreContainer coreContainer, PullCoreInfo pullCoreInfo, 
PullCoreCallback callback) {
+    this(coreContainer, pullCoreInfo, System.currentTimeMillis(), 0, 0L, 
callback);
+  }
+
+  private CorePullTask(CoreContainer coreContainer, PullCoreInfo pullCoreInfo, 
long queuedTimeMs, int attempts,
+      long lastAttemptTimestamp, PullCoreCallback callback) {
+    this.coreContainer = coreContainer;
+    this.pullCoreInfo = pullCoreInfo;
+    this.queuedTimeMs = queuedTimeMs;
+    this.attempts = attempts;
+    this.lastAttemptTimestamp = lastAttemptTimestamp;
+    this.callback = callback;
+  }
+
+  /**
+   * Returns a _hint_ that the given core might be locally empty because it is 
awaiting pull from Blob store.
+   * This is just a hint because as soon as the lock is released when the 
method returns, the status of the core could change.
+   */
+  public static boolean isEmptyCoreAwaitingPull(String corename) {
+    synchronized (coresCreatedNotPulledYet) {
+      return coresCreatedNotPulledYet.contains(corename);
+    }
+  }
+
+  /**
+   * Needed for the {@link CorePullTask} to be used in a {@link 
DeduplicatingList}.
+   */
+  @Override
+  public String getDedupeKey() {
+    return this.pullCoreInfo.getSharedStoreName();
+  }
+
+  /**
+   * Needed for the {@link CorePullTask} to be used in a {@link 
DeduplicatingList}.
+   */
+  static class PullTaskMerger implements DeduplicatingList.Merger<String, 
CorePullTask> {
+    /**
+     * Given two tasks (that have not yet started executing!) that target the 
same core (and would basically do the
+     * same things were they both executed), returns a merged task that can 
replace both and that retains the oldest
+     * enqueue time and the smallest number of attempts, so we don't "lose" 
retries because of the merge yet we
+     * correctly report that tasks might have been waiting for execution for a 
long while.
+     * 
+     * @return a merged {@link CorePullTask} that can replace the two tasks 
passed as parameters.
+     */
+    @Override
+    public CorePullTask merge(CorePullTask task1, CorePullTask task2) {
+      // The asserts below are not guaranteed by construction but we know 
that's the case
+      assert task1.coreContainer == task2.coreContainer;
+      assert task1.callback == task2.callback;
+
+      int mergedAttempts;
+      long mergedLatAttemptsTimestamp;
+
+      // Synchronizing on the tasks separately to not risk deadlock (even 
though in practice there's only one
+      // concurrent
+      // call to this method anyway since it's called from 
DeduplicatingList.addDeduplicated() and we synchronize
+      // on the
+      // list there).
+      synchronized (task1) {
+        mergedAttempts = task1.attempts;
+        mergedLatAttemptsTimestamp = task1.lastAttemptTimestamp;
+      }
+
+      synchronized (task2) {
+        // We allow more opportunities to try as the core is changed again by 
Solr...
+        mergedAttempts = Math.min(mergedAttempts, task2.attempts);
+        // ...and base the delay computation on the time of last attempt
+        mergedLatAttemptsTimestamp = Math.max(mergedLatAttemptsTimestamp, 
task2.lastAttemptTimestamp);
+      }
+
+      PullCoreInfo mergedPullCoreInfo = 
CorePullerFeeder.PullCoreInfoMerger.mergePullCoreInfos(task1.pullCoreInfo, 
task2.pullCoreInfo);
+      
+      // Try to set the callback for the deduplicated task to TASK_MERGED
+      try {
+        task2.callback.finishedPull(task2, null, CoreSyncStatus.TASK_MERGED, 
"CorePullTask merged with duplicate task in queue.");
+      } catch (Exception e) {
+        // Do nothing - tried to set callback for deduplicated task; if an 
error is thrown, ignore
+      }
+      
+      // We merge the tasks.
+      return new CorePullTask(task1.coreContainer, mergedPullCoreInfo,
+          Math.min(task1.queuedTimeMs, task2.queuedTimeMs), mergedAttempts, 
mergedLatAttemptsTimestamp,
+          task1.callback);
+    }
+  }
+
+  public synchronized void setAttempts(int attempts) {
+    this.attempts = attempts;
+  }
+
+  public synchronized int getAttempts() {
+    return this.attempts;
+  }
+
+  synchronized void setLastAttemptTimestamp(long lastAttemptTimestamp) {
+    this.lastAttemptTimestamp = lastAttemptTimestamp;
+  }
+
+  /**
+   * This method is only used in this class for now because the "reenqueue 
with delay" implementation is imperfect.
+   * Longer term, such a reenqueue should be handled outside this class.
+   */
+  synchronized long getLastAttemptTimestamp() {
+    return this.lastAttemptTimestamp;
+  }
+
+  public PullCoreInfo getPullCoreInfo() {
+    return pullCoreInfo;
+  }
+
+  /**
+   * Pulls the local core updates from the Blob store then calls the task 
callback to notify the
+   * {@link CorePullerFeeder} of success or failure of the operation, giving an 
indication of the reason so that the periodic
+   * puller can decide whether to retry.
+   */
+  void pullCoreFromBlob() throws InterruptedException {
+    BlobCoreMetadata blobMetadata = null;
+    if (coreContainer.isShutDown()) {
+      this.callback.finishedPull(this, blobMetadata, 
CoreSyncStatus.SHUTTING_DOWN, null);
+      // TODO could throw InterruptedException here or interrupt ourselves if 
we wanted to signal to
+      // CorePullerThread to stop everything.
+      return;
+    }
+
+    synchronized (pullsInFlight) {
+      Long pullInFlightTimestamp = 
pullsInFlight.get(pullCoreInfo.getSharedStoreName());
+      if (pullInFlightTimestamp != null) {
+        // Another pull is in progress, we'll retry later.
+        // Note we can't just cancel this pull, because the other pull might 
be working on a previous commit
+        // point.
+        long prevPullMs = System.currentTimeMillis() - pullInFlightTimestamp;
+        this.callback.finishedPull(this, blobMetadata, 
CoreSyncStatus.CONCURRENT_SYNC,
+            "Skipping core pull for " + pullCoreInfo.getSharedStoreName()
+            + " because another thread is currently pulling it (started " + 
prevPullMs
+            + " ms ago).");
+        return;
+      } else {
+        pullsInFlight.put(pullCoreInfo.getSharedStoreName(), 
System.currentTimeMillis());
+      }
+    }
+
+    // Copying the non final variables so we're clean wrt the Java memory 
model and values do not change as we go
+    // (even though we know that no other thread can be working on this 
CorePullTask when we handle it here).
+    final int attemptsCopy = getAttempts();
+    final long lastAttemptTimestampCopy = getLastAttemptTimestamp();
+
+    if (attemptsCopy != 0) {
+      long now = System.currentTimeMillis();
+      if (now - lastAttemptTimestampCopy < MIN_RETRY_DELAY_MS) {
+        Thread.sleep(MIN_RETRY_DELAY_MS - now + lastAttemptTimestampCopy);
+      }
+    }
+
+    CoreSyncStatus syncStatus = CoreSyncStatus.FAILURE;
+    // Auxiliary information related to pull outcome. It can be metadata 
resolver message which can be null or exception detail in case of failure 
+    String message = null;
+    try {
+      // Do the sequence of actions required to pull a core from the Blob 
store.
+      BlobStorageProvider blobProvider = 
coreContainer.getSharedStoreManager().getBlobStorageProvider(); 
+      CoreStorageClient blobClient = blobProvider.getClient();
+
+      // Get blob metadata
+      String blobCoreMetadataName = 
BlobStoreUtils.buildBlobStoreMetadataName(pullCoreInfo.getLastReadMetadataSuffix());
+      blobMetadata = 
blobClient.pullCoreMetadata(pullCoreInfo.getSharedStoreName(), 
blobCoreMetadataName);
+      
+      // Handle callback
+      if (blobMetadata == null) {
+        syncStatus = CoreSyncStatus.BLOB_MISSING;
+        this.callback.finishedPull(this, blobMetadata, syncStatus, null);
+        return;
+      } else if (blobMetadata.getIsDeleted()) {
+        syncStatus = CoreSyncStatus.BLOB_DELETED_FOR_PULL;
+        this.callback.finishedPull(this, blobMetadata, syncStatus, "deleted 
flag is set on core in Blob store. Not pulling.");
+        return;
+      } else if (blobMetadata.getIsCorrupt()) {
+        // TODO this might mean we have no local core at this stage. If that's 
the case, we may need to do something about it so that Core App does not 
immediately reindex into a new core...
+        //      likely changes needed here for W-5388477 Blob store corruption 
repair
+        syncStatus = CoreSyncStatus.BLOB_CORRUPT;
+        this.callback.finishedPull(this, blobMetadata, syncStatus, "corrupt 
flag is set on core in Blob store. Not pulling.");
+        return;
+      }
+
+      // TODO unknown core pulls in context of solr cloud
+      if (!coreExists(pullCoreInfo.getCoreName())) {
+        if (pullCoreInfo.shouldCreateCoreIfAbsent()) {
+          // We set the core as created awaiting pull before creating it, 
otherwise it's too late.
+          // If we get to this point, we're setting the "created not pulled 
yet" status of the core here (only place
+          // in the code where this happens) and we're clearing it in the 
finally below.
+          // We're not leaking entries in coresCreatedNotPulledYet that might 
stay there forever...
+          synchronized (pullsInFlight) {
+            synchronized (coresCreatedNotPulledYet) {
+              coresCreatedNotPulledYet.add(pullCoreInfo.getSharedStoreName());
+            }
+          }
+          createCore(pullCoreInfo);
+        } else {
+          syncStatus = CoreSyncStatus.LOCAL_MISSING_FOR_PULL;
+          this.callback.finishedPull(this, blobMetadata, syncStatus, null);
+          return;
+        }
+      }
+
+      // Get local metadata + resolve with blob metadata
+      ServerSideMetadata serverMetadata = new 
ServerSideMetadata(pullCoreInfo.getCoreName(), coreContainer);
+      SharedMetadataResolutionResult resolutionResult = 
SharedStoreResolutionUtil.resolveMetadata(
+          serverMetadata, blobMetadata);
+      
+      // If there is nothing to pull, we should report SUCCESS_EQUIVALENT and 
do nothing.
+      // If we call pullUpdateFromBlob with an empty list of files to pull, 
we'll see an NPE down the line.
+      // TODO: might be better to handle this error in 
CorePushPull.pullUpdateFromBlob
+      if (resolutionResult.getFilesToPull().size() > 0) {
+        BlobDeleteManager deleteManager = 
coreContainer.getSharedStoreManager().getBlobDeleteManager();
+        CorePushPull cp = new CorePushPull(blobClient, deleteManager, 
pullCoreInfo, resolutionResult, serverMetadata, blobMetadata);
+        cp.pullUpdateFromBlob(/* waitForSearcher */ true);
+        syncStatus = CoreSyncStatus.SUCCESS;
+      } else {
+        syncStatus = CoreSyncStatus.SUCCESS_EQUIVALENT;
+      }
+
+      // The following call can fail if blob is corrupt (in non trivial ways, 
trivial ways are identified by other cases)
+      // pull was successful
+      if(isEmptyCoreAwaitingPull(pullCoreInfo.getCoreName())){
+        // the javadoc for pulledBlob suggests that it is only meant to be 
called if we pulled from scratch
+        // therefore only limiting this call when we created the local core 
for this pull ourselves
+        // 
BlobTransientLog.get().getCorruptCoreTracker().pulledBlob(pullCoreInfo.coreName,
 blobMetadata);
+      }
+    } catch (InterruptedException e) {
+      throw e;
+    } catch (Exception e) {
+      syncStatus = CoreSyncStatus.FAILURE;
+      message = Throwables.getStackTraceAsString(e);
+      log.warn("Failed (attempt=" + attemptsCopy + ") to pull core " + 
pullCoreInfo.getSharedStoreName(), e);
+    } finally {
+      // Remove ourselves from the in flight set before calling the callback 
method (just in case it takes
+      // forever)
+      synchronized (pullsInFlight) {
+        // No matter how the pull ends (success or any kind of error), we 
don't want to consider the core as awaiting pull,
+        // since it doesn't anymore (code is inline here rather than in a 
method or in notifyEndOfPull() to make
+        // it clear how coresCreatedNotPulledYet is managed).
+        synchronized (coresCreatedNotPulledYet) {
+          // TODO: Can we move this business of core creation and deletion 
outside of this task so that
+          //       we may not sub-optimally repeatedly create/delete core in 
case of reattempt of a transient pull error?
+          //       or get whether a reattempt will be made or not, and if 
there is a guaranteed reattempt then do not delete it
+          if 
(coresCreatedNotPulledYet.remove(pullCoreInfo.getSharedStoreName())) {
+            if (!syncStatus.isSuccess()) {
+              // If we created the core and we could not pull successfully 
then we should cleanup after ourselves by deleting it
+              // otherwise queries can incorrectly return 0 results from that 
core.
+              if(coreExists(pullCoreInfo.getCoreName())) {
+                try {
+                  // try to delete core within 3 minutes. In future when we 
time box our pull task then we 
+                  // need to make sure this value is within that bound. 
+                  // CoreDeleter.deleteCoreByName(coreContainer, 
pullCoreInfo.coreName, 3, TimeUnit.MINUTES);
+                  // TODO: need to migrate deleter
+                } catch (Exception ex) {
+                  // TODO: should we gack?
+                  //       can we do anything more here since we are unable to 
delete and we are leaving an empty core behind
+                  //       when we should not. Should we keep the core in 
coresCreatedNotPulledYet and try few more times
+                  //       but at some point we would have to let it go
+                  //       So may be, few more attempts here and then gack
+                  log.warn("CorePullTask successfully created local core but 
failed to pull it" +
+                      " and now is unable to delete that local core " + 
pullCoreInfo.getCoreName(), ex);
+                }
+              }
+            }
+          }
+        }
+        pullsInFlight.remove(pullCoreInfo.getSharedStoreName());
+      }
+    }
+    this.callback.finishedPull(this, blobMetadata, syncStatus, message);
+  }
+
+  /**
+   * Returns true if the given core exists.
+   */
+  private boolean coreExists(String coreName) {
+
+    SolrCore core = null;
+    File coreIndexDir = new File(coreContainer.getCoreRootDirectory() + "/" + 
coreName);
+    if (coreIndexDir.exists()) {
+      core = coreContainer.getCore(coreName);
+    }
+
+    log.info("Core " + coreName + " expected in dir " + 
coreIndexDir.getAbsolutePath() + " exists=" + coreIndexDir.exists()
+    + " and location.instanceDirectory.getAbsolutePath()=" + 
coreIndexDir.getAbsolutePath());
+
+    if (core != null) {
+      // Core exists.
+      core.close();
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  /**
+   * TODO - code part of unknown core pulls that will likely change in the 
Solr Cloud world!
+   * 
+   * Creates a local (empty) core. This is required before we can fill this 
core with data pulled from Blob.
+   */
+  private void createCore(PullCoreInfo pci) throws Exception {
+
+    log.info("About to create local core " + pci.getCoreName());
+
+    // The location here might be different from the location used in 
coreExists() above. This is ok, if the core
+    // did not exist and we're creating it, it's ok to create on another drive 
(and hopefully the HDD/SSD core placement
+    // code will go away with the move to Blob based Storage of cores).
+    //CoreMetadata.CoreLocation location = 
CoreMetadataProvider.get(coreContainer).getCoreLocation(coreName);
+
+    //create(String coreName, Path instancePath, Map<String, String> 
parameters, boolean newCollection) {
+    Map<String, String> params = new HashMap<>();
+    params.put(CoreDescriptor.CORE_CONFIGSET, "coreset");
+    params.put(CoreDescriptor.CORE_TRANSIENT, "false"); // transient not 
supported in cloud mode
+    params.put(CoreDescriptor.CORE_LOADONSTARTUP, "true"); // recommended true 
in cloud mode
+
+    ZkController controller = coreContainer.getZkController();
+    DocCollection collection = controller.getZkStateReader().
+        getClusterState().getCollection(pci.getCollectionName());
+    Collection<Replica> replicas = collection.getReplicas();
+    Replica replica = null;
 
 Review comment:
   Lines 404-410 could be replaced by:
   ```
   Optional<Replica> replica = replicas.stream().filter( r -> 
r.getCoreName().equals(pci.getCoreName()) ).findFirst();
   
   if (replica.isEmpty()) {
      throw new Exception("Replica " + pci.getCoreName() + " for collection " +
             pci.getCollectionName() + " does not exist in ZK");
   }
   ```
   Then the lines below need to replace calls like `replica.getName()` with 
`replica.get().getName()`

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@lucene.apache.org
For additional commands, e-mail: dev-h...@lucene.apache.org

Reply via email to