This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new 78dc351005 pre load ondemand tablets for linear scans (#3429)
78dc351005 is described below

commit 78dc3510057bbe8b90f4829b2df8b51058b301f7
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Wed Jul 5 16:35:59 2023 -0400

    pre load ondemand tablets for linear scans (#3429)
---
 .../core/clientImpl/ClientTabletCache.java         | 36 +++++++-
 .../core/clientImpl/ClientTabletCacheImpl.java     | 71 ++++++++++++++--
 .../core/clientImpl/RootClientTabletCache.java     |  2 +-
 .../core/clientImpl/SyncingClientTabletCache.java  |  8 +-
 .../accumulo/core/clientImpl/ThriftScanner.java    | 97 +++++++++++++++++++++-
 .../core/clientImpl/TimeoutClientTabletCache.java  |  8 +-
 .../apache/accumulo/test/functional/SplitIT.java   |  6 --
 7 files changed, 204 insertions(+), 24 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientTabletCache.java 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientTabletCache.java
index fb8c4f7a8c..594e2f85bd 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientTabletCache.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientTabletCache.java
@@ -75,6 +75,36 @@ public abstract class ClientTabletCache {
     REQUIRED, NOT_REQUIRED
   }
 
+  /**
+   * This method allows linear scans to host tablets ahead of time that they 
may read in the future.
+   * The goal of this method is to allow hosting of tablets to be requested 
for a scan before the
+   * scan actually needs it. Below is an example of how this method could work 
with a scan when
+   * {@code minimumHostAhead=4} is passed and avoid the scan having to wait on 
tablet hosting.
+   *
+   * <ol>
+   * <li>4*2 tablets are initially hosted (the scan has to wait on this)</li>
+   * <li>The 1st,2nd,3rd, and 4th tablets are read by the scan</li>
+   * <li>The request to read the 5th tablet causes a request to host 4 more 
tablets (this would be
+   * the 9th,10th,11th, and 12th tablets)</li>
+   * <li>The 5th,6th,7th, and 8th tablets are read by the scan</li>
+   * <li>While the scan does the read above, the 9th,10th,11th, and 12th 
tablets are actually
+   * hosted. This happens concurrently with the scan above.</li>
+   * <li>When the scan goes to read the 9th tablet, hopefully it's already 
hosted. Also attempting to
+   * read the 9th tablet will cause a request to host the 13th,14th,15th, and 
16th tablets.</li>
+   * </ol>
+   *
+   * In the situation above, the goal is that while we are reading 4 hosted 
tablets the 4 following
+   * tablets are in the process of being hosted.
+   *
+   * @param minimumHostAhead Attempts to keep between minimumHostAhead and 
2*minimumHostAhead
+   *        tablets following the found tablet hosted.
+   * @param hostAheadRange Only host following tablets that are within this 
range.
+   */
+  public abstract CachedTablet findTablet(ClientContext context, Text row, 
boolean skipRow,
+      LocationNeed locationNeed, int minimumHostAhead, Range hostAheadRange)
+      throws AccumuloException, AccumuloSecurityException, 
TableNotFoundException,
+      InvalidTabletHostingRequestException;
+
   /**
    * Finds the tablet that contains the given row.
    *
@@ -85,9 +115,11 @@ public abstract class ClientTabletCache {
    * @return overlapping tablet. If no overlapping tablet exists, returns 
null. If location is
    *         required and the tablet currently has no location ,returns null.
    */
-  public abstract CachedTablet findTablet(ClientContext context, Text row, 
boolean skipRow,
+  public CachedTablet findTablet(ClientContext context, Text row, boolean 
skipRow,
       LocationNeed locationNeed) throws AccumuloException, 
AccumuloSecurityException,
-      TableNotFoundException, InvalidTabletHostingRequestException;
+      TableNotFoundException, InvalidTabletHostingRequestException {
+    return findTablet(context, row, skipRow, locationNeed, 0, null);
+  }
 
   public CachedTablet findTabletWithRetry(ClientContext context, Text row, 
boolean skipRow,
       LocationNeed locationNeed) throws AccumuloException, 
AccumuloSecurityException,
diff --git 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientTabletCacheImpl.java
 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientTabletCacheImpl.java
index 5e3e8bb440..85271cac69 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientTabletCacheImpl.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientTabletCacheImpl.java
@@ -67,6 +67,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 
@@ -535,8 +536,9 @@ public class ClientTabletCacheImpl extends 
ClientTabletCache {
 
   @Override
   public CachedTablet findTablet(ClientContext context, Text row, boolean 
skipRow,
-      LocationNeed locationNeed) throws AccumuloException, 
AccumuloSecurityException,
-      TableNotFoundException, InvalidTabletHostingRequestException {
+      LocationNeed locationNeed, int minimumHostAhead, Range hostAheadRange)
+      throws AccumuloException, AccumuloSecurityException, 
TableNotFoundException,
+      InvalidTabletHostingRequestException {
 
     OpTimer timer = null;
 
@@ -556,15 +558,74 @@ public class ClientTabletCacheImpl extends 
ClientTabletCache {
           String.format("%.3f secs", timer.scale(SECONDS)));
     }
 
-    if (tl != null && locationNeed == LocationNeed.REQUIRED && 
tl.getTserverLocation().isEmpty()) {
-      requestTabletHosting(context, List.of(tl));
-      return null;
+    if (tl != null && locationNeed == LocationNeed.REQUIRED) {
+      // Look at the next (minimumHostAhead * 2) tablets and return which ones 
need hosting. See the
+      // javadoc in the superclass of this method for more details.
+      Map<KeyExtent,CachedTablet> extentsToHost = findExtentsToHost(context, 
minimumHostAhead * 2,
+          hostAheadRange, lcSession, tl, locationNeed);
+
+      if (!extentsToHost.isEmpty()) {
+        if (extentsToHost.containsKey(tl.getExtent()) || extentsToHost.size() 
>= minimumHostAhead) {
+          requestTabletHosting(context, extentsToHost.values());
+        }
+      }
+
+      if (tl.getTserverLocation().isEmpty()) {
+        return null;
+      }
     }
 
     return tl;
 
   }
 
+  private Map<KeyExtent,CachedTablet> findExtentsToHost(ClientContext context, 
int hostAheadCount,
+      Range hostAheadRange, LockCheckerSession lcSession, CachedTablet 
firstTablet,
+      LocationNeed locationNeed) throws AccumuloException, 
TableNotFoundException,
+      InvalidTabletHostingRequestException, AccumuloSecurityException {
+
+    // it's only expected that this method is called when location need is 
required
+    Preconditions.checkArgument(locationNeed == LocationNeed.REQUIRED);
+
+    Map<KeyExtent,CachedTablet> extentsToHost;
+
+    if (hostAheadCount > 0) {
+      extentsToHost = new HashMap<>();
+      if (firstTablet.getTserverLocation().isEmpty()) {
+        extentsToHost.put(firstTablet.getExtent(), firstTablet);
+      }
+
+      KeyExtent extent = firstTablet.getExtent();
+
+      var currTablet = extent;
+
+      for (int i = 0; i < hostAheadCount; i++) {
+        if (currTablet.endRow() == null || !hostAheadRange.contains(new 
Key(currTablet.endRow()))) {
+          break;
+        }
+
+        CachedTablet followingTablet =
+            _findTablet(context, currTablet.endRow(), true, false, true, 
lcSession, locationNeed);
+
+        if (followingTablet == null) {
+          break;
+        }
+
+        currTablet = followingTablet.getExtent();
+
+        if (followingTablet.getTserverLocation().isEmpty()
+            && !followingTablet.wasHostingRequested()) {
+          extentsToHost.put(followingTablet.getExtent(), followingTablet);
+        }
+      }
+    } else if (firstTablet.getTserverLocation().isEmpty()) {
+      extentsToHost = Map.of(firstTablet.getExtent(), firstTablet);
+    } else {
+      extentsToHost = Map.of();
+    }
+    return extentsToHost;
+  }
+
   @Override
   public long getTabletHostingRequestCount() {
     return tabletHostingRequestCount.get();
diff --git 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/RootClientTabletCache.java
 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/RootClientTabletCache.java
index 7e35fb259e..f86d90b6f8 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/RootClientTabletCache.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/RootClientTabletCache.java
@@ -140,7 +140,7 @@ public class RootClientTabletCache extends 
ClientTabletCache {
 
   @Override
   public CachedTablet findTablet(ClientContext context, Text row, boolean 
skipRow,
-      LocationNeed locationNeed) {
+      LocationNeed locationNeed, int hostAheadCount, Range hostAheadRange) {
 
     CachedTablet cachedTablet = getRootTabletLocation(context);
 
diff --git 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/SyncingClientTabletCache.java
 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/SyncingClientTabletCache.java
index d8be1c639a..cd6e707bf7 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/SyncingClientTabletCache.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/SyncingClientTabletCache.java
@@ -67,9 +67,11 @@ public class SyncingClientTabletCache extends 
ClientTabletCache {
 
   @Override
   public CachedTablet findTablet(ClientContext context, Text row, boolean 
skipRow,
-      LocationNeed locationNeed) throws AccumuloException, 
AccumuloSecurityException,
-      TableNotFoundException, InvalidTabletHostingRequestException {
-    return syncLocator().findTablet(context, row, skipRow, locationNeed);
+      LocationNeed locationNeed, int minimumHostAhead, Range hostAheadRange)
+      throws AccumuloException, AccumuloSecurityException, 
TableNotFoundException,
+      InvalidTabletHostingRequestException {
+    return syncLocator().findTablet(context, row, skipRow, locationNeed, 
minimumHostAhead,
+        hostAheadRange);
   }
 
   @Override
diff --git 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java
index f7b2aeb9f2..46d5be5e89 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java
@@ -253,6 +253,10 @@ public class ThriftScanner {
 
     Duration busyTimeout;
 
+    int tabletsScanned;
+
+    KeyExtent prevExtent = null;
+
     public ScanState(ClientContext context, TableId tableId, Authorizations 
authorizations,
         Range range, SortedSet<Column> fetchedColumns, int size,
         List<IterInfo> serverSideIteratorList,
@@ -302,6 +306,18 @@ public class ThriftScanner {
       if (useScanServer) {
         scanAttempts = new ScanServerAttemptsImpl();
       }
+
+      this.tabletsScanned = 0;
+    }
+
+    long startTimeNanos = 0;
+    long getNextScanAddressTimeNanos = 0;
+
+    public void incrementTabletsScanned(KeyExtent extent) {
+      if (!extent.equals(prevExtent)) {
+        tabletsScanned++;
+        prevExtent = extent;
+      }
     }
   }
 
@@ -456,6 +472,63 @@ public class ThriftScanner {
     return Optional.of(addr);
   }
 
+  /**
+   * @see ClientTabletCache#findTablet(ClientContext, Text, boolean, 
ClientTabletCache.LocationNeed,
+   *      int, Range)
+   */
+  private static int computeMinimumHostAhead(ScanState scanState,
+      ClientTabletCache.LocationNeed hostingNeed) {
+    int minimumHostAhead = 0;
+
+    if (hostingNeed == ClientTabletCache.LocationNeed.REQUIRED) {
+      long currTime = System.nanoTime();
+
+      double timeRatio = 0;
+
+      long totalTime = currTime - scanState.startTimeNanos;
+      if (totalTime > 0) {
+        // The following computes (total time spent in this method)/(total 
time scanning and time
+        // spent in this method)
+        timeRatio = (double) scanState.getNextScanAddressTimeNanos / (double) 
(totalTime);
+      }
+
+      if (timeRatio < 0 || timeRatio > 1) {
+        log.warn("Computed ratio of time spent in getNextScanAddress has 
unexpected value : {} ",
+            timeRatio);
+        timeRatio = Math.max(0.0, Math.min(1.0, timeRatio));
+      }
+
+      //
+      // Do not want to host all tablets in the scan range in case not all 
data in the range is
+      // read. Need to determine how many tablets to host ahead of time. The 
following information
+      // is used to determine how many tablets to host ahead of time.
+      //
+      // 1. The number of tablets this scan has already read. The more tablets 
that this scan has
+      // read, the more likely that it will read many more tablets.
+      //
+      // 2. The timeRatio computed above. As timeRatio approaches 1.0 it means 
we are spending
+      // most of our time waiting on the next tablet to have an address. When 
we are spending most of
+      // our time waiting for a tablet to have an address we want to increase 
the number of tablets
+      // we request to host ahead of time so we can hopefully spend less time 
waiting on that.
+      //
+
+      if (timeRatio > .9) {
+        minimumHostAhead = 16;
+      } else if (timeRatio > .75) {
+        minimumHostAhead = 8;
+      } else if (timeRatio > .5) {
+        minimumHostAhead = 4;
+      } else if (timeRatio > .25) {
+        minimumHostAhead = 2;
+      } else {
+        minimumHostAhead = 1;
+      }
+
+      minimumHostAhead = Math.min(scanState.tabletsScanned, minimumHostAhead);
+    }
+    return minimumHostAhead;
+  }
+
   static ScanAddress getNextScanAddress(ClientContext context, ScanState 
scanState, long timeOut,
       long startTime, long maxSleepTime)
       throws TableNotFoundException, AccumuloSecurityException, 
AccumuloServerException,
@@ -470,6 +543,8 @@ public class ThriftScanner {
     var hostingNeed = scanState.runOnScanServer ? 
ClientTabletCache.LocationNeed.NOT_REQUIRED
         : ClientTabletCache.LocationNeed.REQUIRED;
 
+    int minimumHostAhead = computeMinimumHostAhead(scanState, hostingNeed);
+
     while (addr == null) {
       long currentTime = System.currentTimeMillis();
       if ((currentTime - startTime) / 1000.0 > timeOut) {
@@ -482,7 +557,8 @@ public class ThriftScanner {
       try (Scope locateSpan = child1.makeCurrent()) {
 
         loc = ClientTabletCache.getInstance(context, 
scanState.tableId).findTablet(context,
-            scanState.startRow, scanState.skipStartRow, hostingNeed);
+            scanState.startRow, scanState.skipStartRow, hostingNeed, 
minimumHostAhead,
+            scanState.range);
 
         if (loc == null) {
           context.requireNotDeleted(scanState.tableId);
@@ -498,6 +574,8 @@ public class ThriftScanner {
           lastError = error;
           sleepMillis = pause(sleepMillis, maxSleepTime, 
scanState.runOnScanServer);
         } else {
+          scanState.incrementTabletsScanned(loc.getExtent());
+
           // when a tablet splits we do want to continue scanning the low child
           // of the split if we are already passed it
           Range dataRange = loc.getExtent().toDataRange();
@@ -578,7 +656,20 @@ public class ThriftScanner {
               "Failed to retrieve next batch of key values before timeout");
         }
 
-        ScanAddress addr = getNextScanAddress(context, scanState, timeOut, 
startTime, maxSleepTime);
+        ScanAddress addr;
+        long beginTime = System.nanoTime();
+        try {
+          addr = getNextScanAddress(context, scanState, timeOut, startTime, 
maxSleepTime);
+        } finally {
+          // track the initial time that we started tracking the time for 
getting the next scan
+          // address
+          if (scanState.startTimeNanos == 0) {
+            scanState.startTimeNanos = beginTime;
+          }
+
+          // track the total amount of time spent getting the next scan address
+          scanState.getNextScanAddressTimeNanos += System.nanoTime() - 
beginTime;
+        }
 
         Span child2 = TraceUtil.startSpan(ThriftScanner.class, 
"scan::location",
             Map.of("tserver", addr.serverAddress));
@@ -838,8 +929,6 @@ public class ThriftScanner {
               sr.results.size(), scanState.scanID);
         }
       } else {
-        // log.debug("No more : tab end row = 
"+loc.tablet_extent.getEndRow()+" range =
-        // "+scanState.range);
         if (addr.getExtent().endRow() == null) {
           scanState.finished = true;
 
diff --git 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TimeoutClientTabletCache.java
 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TimeoutClientTabletCache.java
index 1c8e3c3a07..b9610da386 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TimeoutClientTabletCache.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TimeoutClientTabletCache.java
@@ -62,10 +62,12 @@ public class TimeoutClientTabletCache extends 
SyncingClientTabletCache {
 
   @Override
   public CachedTablet findTablet(ClientContext context, Text row, boolean 
skipRow,
-      LocationNeed locationNeed) throws AccumuloException, 
AccumuloSecurityException,
-      TableNotFoundException, InvalidTabletHostingRequestException {
+      LocationNeed locationNeed, int minimumHostAhead, Range hostAheadRange)
+      throws AccumuloException, AccumuloSecurityException, 
TableNotFoundException,
+      InvalidTabletHostingRequestException {
     try {
-      CachedTablet ret = super.findTablet(context, row, skipRow, locationNeed);
+      CachedTablet ret =
+          super.findTablet(context, row, skipRow, locationNeed, 
minimumHostAhead, hostAheadRange);
 
       if (ret == null) {
         failed();
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java 
b/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java
index 1530c9ad34..852d2c2ec3 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java
@@ -50,13 +50,11 @@ import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.admin.CompactionConfig;
 import org.apache.accumulo.core.client.admin.InstanceOperations;
 import org.apache.accumulo.core.client.admin.NewTableConfiguration;
-import org.apache.accumulo.core.client.admin.TabletHostingGoal;
 import org.apache.accumulo.core.client.rfile.RFile;
 import org.apache.accumulo.core.client.rfile.RFileWriter;
 import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
@@ -315,10 +313,6 @@ public class SplitIT extends AccumuloClusterHarness {
       assertEquals(totalSplits, new 
HashSet<>(c.tableOperations().listSplits(tableName)),
           "Did not see expected splits");
 
-      // ELASTICITY_TODO the following could be removed after #3309. Currently 
scanning an ondemand
-      // table with lots of tablets will cause the test to timeout.
-      c.tableOperations().setTabletHostingGoal(tableName, new Range(), 
TabletHostingGoal.ALWAYS);
-
       log.debug("Verifying {} rows ingested into {}", numRows, tableName);
       VerifyIngest.verifyIngest(c, params);
     }

Reply via email to