This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push:
new 78dc351005 pre load ondemand tablets for linear scans (#3429)
78dc351005 is described below
commit 78dc3510057bbe8b90f4829b2df8b51058b301f7
Author: Keith Turner <[email protected]>
AuthorDate: Wed Jul 5 16:35:59 2023 -0400
pre load ondemand tablets for linear scans (#3429)
---
.../core/clientImpl/ClientTabletCache.java | 36 +++++++-
.../core/clientImpl/ClientTabletCacheImpl.java | 71 ++++++++++++++--
.../core/clientImpl/RootClientTabletCache.java | 2 +-
.../core/clientImpl/SyncingClientTabletCache.java | 8 +-
.../accumulo/core/clientImpl/ThriftScanner.java | 97 +++++++++++++++++++++-
.../core/clientImpl/TimeoutClientTabletCache.java | 8 +-
.../apache/accumulo/test/functional/SplitIT.java | 6 --
7 files changed, 204 insertions(+), 24 deletions(-)
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientTabletCache.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientTabletCache.java
index fb8c4f7a8c..594e2f85bd 100644
---
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientTabletCache.java
+++
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientTabletCache.java
@@ -75,6 +75,36 @@ public abstract class ClientTabletCache {
REQUIRED, NOT_REQUIRED
}
+ /**
+ * This method allows linear scans to host tablets ahead of time that they
may read in the future.
+ * The goal of this method is to allow a scan to request hosting of tablets
before the
+ * scan actually needs them. Below is an example of how this method could work
with a scan when
+ * {@code minimumHostAhead=4} is passed and avoid the scan having to wait on
tablet hosting.
+ *
+ * <ol>
+ * <li>4*2 tablets are initially hosted (the scan has to wait on this)</li>
+ * <li>The 1st,2nd,3rd, and 4th tablets are read by the scan</li>
+ * <li>The request to read the 5th tablet causes a request to host 4 more
tablets (this would be
+ * the 9th,10th,11th, and 12th tablets)</li>
+ * <li>The 5th,6th,7th, and 8th tablets are read by the scan</li>
+ * <li>While the scan does the read above, the 9th,10th,11th, and 12th
tablets are actually
+ * hosted. This happens concurrently with the scan above.</li>
+ * <li>When the scan goes to read the 9th tablet, hopefully it's already
hosted. Also attempting to
+ * read the 9th tablet will cause a request to host the 13th,14th,15th, and
16th tablets.</li>
+ * </ol>
+ *
+ * In the situation above, the goal is that while we are reading 4 hosted
tablets, the 4 following
+ * tablets are in the process of being hosted.
+ *
+ * @param minimumHostAhead Attempts to keep between minimumHostAhead and
2*minimumHostAhead
+ * tablets following the found tablet hosted.
+ * @param hostAheadRange Only host following tablets that are within this
range.
+ */
+ public abstract CachedTablet findTablet(ClientContext context, Text row,
boolean skipRow,
+ LocationNeed locationNeed, int minimumHostAhead, Range hostAheadRange)
+ throws AccumuloException, AccumuloSecurityException,
TableNotFoundException,
+ InvalidTabletHostingRequestException;
+
/**
* Finds the tablet that contains the given row.
*
@@ -85,9 +115,11 @@ public abstract class ClientTabletCache {
* @return overlapping tablet. If no overlapping tablet exists, returns
null. If location is
* required and the tablet currently has no location ,returns null.
*/
- public abstract CachedTablet findTablet(ClientContext context, Text row,
boolean skipRow,
+ public CachedTablet findTablet(ClientContext context, Text row, boolean
skipRow,
LocationNeed locationNeed) throws AccumuloException,
AccumuloSecurityException,
- TableNotFoundException, InvalidTabletHostingRequestException;
+ TableNotFoundException, InvalidTabletHostingRequestException {
+ return findTablet(context, row, skipRow, locationNeed, 0, null);
+ }
public CachedTablet findTabletWithRetry(ClientContext context, Text row,
boolean skipRow,
LocationNeed locationNeed) throws AccumuloException,
AccumuloSecurityException,
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientTabletCacheImpl.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientTabletCacheImpl.java
index 5e3e8bb440..85271cac69 100644
---
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientTabletCacheImpl.java
+++
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientTabletCacheImpl.java
@@ -67,6 +67,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
@@ -535,8 +536,9 @@ public class ClientTabletCacheImpl extends
ClientTabletCache {
@Override
public CachedTablet findTablet(ClientContext context, Text row, boolean
skipRow,
- LocationNeed locationNeed) throws AccumuloException,
AccumuloSecurityException,
- TableNotFoundException, InvalidTabletHostingRequestException {
+ LocationNeed locationNeed, int minimumHostAhead, Range hostAheadRange)
+ throws AccumuloException, AccumuloSecurityException,
TableNotFoundException,
+ InvalidTabletHostingRequestException {
OpTimer timer = null;
@@ -556,15 +558,74 @@ public class ClientTabletCacheImpl extends
ClientTabletCache {
String.format("%.3f secs", timer.scale(SECONDS)));
}
- if (tl != null && locationNeed == LocationNeed.REQUIRED &&
tl.getTserverLocation().isEmpty()) {
- requestTabletHosting(context, List.of(tl));
- return null;
+ if (tl != null && locationNeed == LocationNeed.REQUIRED) {
+ // Look at the next (minimumHostAhead * 2) tablets and return which ones
need hosting. See the
+ // javadoc in the superclass of this method for more details.
+ Map<KeyExtent,CachedTablet> extentsToHost = findExtentsToHost(context,
minimumHostAhead * 2,
+ hostAheadRange, lcSession, tl, locationNeed);
+
+ if (!extentsToHost.isEmpty()) {
+ if (extentsToHost.containsKey(tl.getExtent()) || extentsToHost.size()
>= minimumHostAhead) {
+ requestTabletHosting(context, extentsToHost.values());
+ }
+ }
+
+ if (tl.getTserverLocation().isEmpty()) {
+ return null;
+ }
}
return tl;
}
+ private Map<KeyExtent,CachedTablet> findExtentsToHost(ClientContext context,
int hostAheadCount,
+ Range hostAheadRange, LockCheckerSession lcSession, CachedTablet
firstTablet,
+ LocationNeed locationNeed) throws AccumuloException,
TableNotFoundException,
+ InvalidTabletHostingRequestException, AccumuloSecurityException {
+
+ // this method is only expected to be called when location need is
required
+ Preconditions.checkArgument(locationNeed == LocationNeed.REQUIRED);
+
+ Map<KeyExtent,CachedTablet> extentsToHost;
+
+ if (hostAheadCount > 0) {
+ extentsToHost = new HashMap<>();
+ if (firstTablet.getTserverLocation().isEmpty()) {
+ extentsToHost.put(firstTablet.getExtent(), firstTablet);
+ }
+
+ KeyExtent extent = firstTablet.getExtent();
+
+ var currTablet = extent;
+
+ for (int i = 0; i < hostAheadCount; i++) {
+ if (currTablet.endRow() == null || !hostAheadRange.contains(new
Key(currTablet.endRow()))) {
+ break;
+ }
+
+ CachedTablet followingTablet =
+ _findTablet(context, currTablet.endRow(), true, false, true,
lcSession, locationNeed);
+
+ if (followingTablet == null) {
+ break;
+ }
+
+ currTablet = followingTablet.getExtent();
+
+ if (followingTablet.getTserverLocation().isEmpty()
+ && !followingTablet.wasHostingRequested()) {
+ extentsToHost.put(followingTablet.getExtent(), followingTablet);
+ }
+ }
+ } else if (firstTablet.getTserverLocation().isEmpty()) {
+ extentsToHost = Map.of(firstTablet.getExtent(), firstTablet);
+ } else {
+ extentsToHost = Map.of();
+ }
+ return extentsToHost;
+ }
+
@Override
public long getTabletHostingRequestCount() {
return tabletHostingRequestCount.get();
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/RootClientTabletCache.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/RootClientTabletCache.java
index 7e35fb259e..f86d90b6f8 100644
---
a/core/src/main/java/org/apache/accumulo/core/clientImpl/RootClientTabletCache.java
+++
b/core/src/main/java/org/apache/accumulo/core/clientImpl/RootClientTabletCache.java
@@ -140,7 +140,7 @@ public class RootClientTabletCache extends
ClientTabletCache {
@Override
public CachedTablet findTablet(ClientContext context, Text row, boolean
skipRow,
- LocationNeed locationNeed) {
+ LocationNeed locationNeed, int hostAheadCount, Range hostAheadRange) {
CachedTablet cachedTablet = getRootTabletLocation(context);
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/SyncingClientTabletCache.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/SyncingClientTabletCache.java
index d8be1c639a..cd6e707bf7 100644
---
a/core/src/main/java/org/apache/accumulo/core/clientImpl/SyncingClientTabletCache.java
+++
b/core/src/main/java/org/apache/accumulo/core/clientImpl/SyncingClientTabletCache.java
@@ -67,9 +67,11 @@ public class SyncingClientTabletCache extends
ClientTabletCache {
@Override
public CachedTablet findTablet(ClientContext context, Text row, boolean
skipRow,
- LocationNeed locationNeed) throws AccumuloException,
AccumuloSecurityException,
- TableNotFoundException, InvalidTabletHostingRequestException {
- return syncLocator().findTablet(context, row, skipRow, locationNeed);
+ LocationNeed locationNeed, int minimumHostAhead, Range hostAheadRange)
+ throws AccumuloException, AccumuloSecurityException,
TableNotFoundException,
+ InvalidTabletHostingRequestException {
+ return syncLocator().findTablet(context, row, skipRow, locationNeed,
minimumHostAhead,
+ hostAheadRange);
}
@Override
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java
index f7b2aeb9f2..46d5be5e89 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java
@@ -253,6 +253,10 @@ public class ThriftScanner {
Duration busyTimeout;
+ int tabletsScanned;
+
+ KeyExtent prevExtent = null;
+
public ScanState(ClientContext context, TableId tableId, Authorizations
authorizations,
Range range, SortedSet<Column> fetchedColumns, int size,
List<IterInfo> serverSideIteratorList,
@@ -302,6 +306,18 @@ public class ThriftScanner {
if (useScanServer) {
scanAttempts = new ScanServerAttemptsImpl();
}
+
+ this.tabletsScanned = 0;
+ }
+
+ long startTimeNanos = 0;
+ long getNextScanAddressTimeNanos = 0;
+
+ public void incrementTabletsScanned(KeyExtent extent) {
+ if (!extent.equals(prevExtent)) {
+ tabletsScanned++;
+ prevExtent = extent;
+ }
}
}
@@ -456,6 +472,63 @@ public class ThriftScanner {
return Optional.of(addr);
}
+ /**
+ * @see ClientTabletCache#findTablet(ClientContext, Text, boolean,
ClientTabletCache.LocationNeed,
+ * int, Range)
+ */
+ private static int computeMinimumHostAhead(ScanState scanState,
+ ClientTabletCache.LocationNeed hostingNeed) {
+ int minimumHostAhead = 0;
+
+ if (hostingNeed == ClientTabletCache.LocationNeed.REQUIRED) {
+ long currTime = System.nanoTime();
+
+ double timeRatio = 0;
+
+ long totalTime = currTime - scanState.startTimeNanos;
+ if (totalTime > 0) {
+ // The following computes (total time spent in this method)/(total
time scanning and time
+ // spent in this method)
+ timeRatio = (double) scanState.getNextScanAddressTimeNanos / (double)
(totalTime);
+ }
+
+ if (timeRatio < 0 || timeRatio > 1) {
+ log.warn("Computed ratio of time spent in getNextScanAddress has
unexpected value : {} ",
+ timeRatio);
+ timeRatio = Math.max(0.0, Math.min(1.0, timeRatio));
+ }
+
+ //
+ // Do not want to host all tablets in the scan range in case not all
data in the range is
+ // read. Need to determine how many tablets to host ahead of time. The
following information
+ // is used to determine how many tablets to host ahead of time.
+ //
+ // 1. The number of tablets this scan has already read. The more tablets
that this scan has
+ // read, the more likely that it will read many more tablets.
+ //
+ // 2. The timeRatio computed above. As timeRatio approaches 1.0 it means
we are spending
most of our time waiting on the next tablet to have an address. When we
are spending most of
+ // our time waiting for a tablet to have an address we want to increase
the number of tablets
+ // we request to host ahead of time so we can hopefully spend less time
waiting on that.
+ //
+
+ if (timeRatio > .9) {
+ minimumHostAhead = 16;
+ } else if (timeRatio > .75) {
+ minimumHostAhead = 8;
+ } else if (timeRatio > .5) {
+ minimumHostAhead = 4;
+ } else if (timeRatio > .25) {
+ minimumHostAhead = 2;
+ } else {
+ minimumHostAhead = 1;
+ }
+
+ minimumHostAhead = Math.min(scanState.tabletsScanned, minimumHostAhead);
+ }
+ return minimumHostAhead;
+ }
+
static ScanAddress getNextScanAddress(ClientContext context, ScanState
scanState, long timeOut,
long startTime, long maxSleepTime)
throws TableNotFoundException, AccumuloSecurityException,
AccumuloServerException,
@@ -470,6 +543,8 @@ public class ThriftScanner {
var hostingNeed = scanState.runOnScanServer ?
ClientTabletCache.LocationNeed.NOT_REQUIRED
: ClientTabletCache.LocationNeed.REQUIRED;
+ int minimumHostAhead = computeMinimumHostAhead(scanState, hostingNeed);
+
while (addr == null) {
long currentTime = System.currentTimeMillis();
if ((currentTime - startTime) / 1000.0 > timeOut) {
@@ -482,7 +557,8 @@ public class ThriftScanner {
try (Scope locateSpan = child1.makeCurrent()) {
loc = ClientTabletCache.getInstance(context,
scanState.tableId).findTablet(context,
- scanState.startRow, scanState.skipStartRow, hostingNeed);
+ scanState.startRow, scanState.skipStartRow, hostingNeed,
minimumHostAhead,
+ scanState.range);
if (loc == null) {
context.requireNotDeleted(scanState.tableId);
@@ -498,6 +574,8 @@ public class ThriftScanner {
lastError = error;
sleepMillis = pause(sleepMillis, maxSleepTime,
scanState.runOnScanServer);
} else {
+ scanState.incrementTabletsScanned(loc.getExtent());
+
// when a tablet splits we do want to continue scanning the low child
// of the split if we are already passed it
Range dataRange = loc.getExtent().toDataRange();
@@ -578,7 +656,20 @@ public class ThriftScanner {
"Failed to retrieve next batch of key values before timeout");
}
- ScanAddress addr = getNextScanAddress(context, scanState, timeOut,
startTime, maxSleepTime);
+ ScanAddress addr;
+ long beginTime = System.nanoTime();
+ try {
+ addr = getNextScanAddress(context, scanState, timeOut, startTime,
maxSleepTime);
+ } finally {
+ // track the initial time that we started tracking the time for
getting the next scan
+ // address
+ if (scanState.startTimeNanos == 0) {
+ scanState.startTimeNanos = beginTime;
+ }
+
+ // track the total amount of time spent getting the next scan address
+ scanState.getNextScanAddressTimeNanos += System.nanoTime() -
beginTime;
+ }
Span child2 = TraceUtil.startSpan(ThriftScanner.class,
"scan::location",
Map.of("tserver", addr.serverAddress));
@@ -838,8 +929,6 @@ public class ThriftScanner {
sr.results.size(), scanState.scanID);
}
} else {
- // log.debug("No more : tab end row =
"+loc.tablet_extent.getEndRow()+" range =
- // "+scanState.range);
if (addr.getExtent().endRow() == null) {
scanState.finished = true;
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TimeoutClientTabletCache.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TimeoutClientTabletCache.java
index 1c8e3c3a07..b9610da386 100644
---
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TimeoutClientTabletCache.java
+++
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TimeoutClientTabletCache.java
@@ -62,10 +62,12 @@ public class TimeoutClientTabletCache extends
SyncingClientTabletCache {
@Override
public CachedTablet findTablet(ClientContext context, Text row, boolean
skipRow,
- LocationNeed locationNeed) throws AccumuloException,
AccumuloSecurityException,
- TableNotFoundException, InvalidTabletHostingRequestException {
+ LocationNeed locationNeed, int minimumHostAhead, Range hostAheadRange)
+ throws AccumuloException, AccumuloSecurityException,
TableNotFoundException,
+ InvalidTabletHostingRequestException {
try {
- CachedTablet ret = super.findTablet(context, row, skipRow, locationNeed);
+ CachedTablet ret =
+ super.findTablet(context, row, skipRow, locationNeed,
minimumHostAhead, hostAheadRange);
if (ret == null) {
failed();
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java
index 1530c9ad34..852d2c2ec3 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java
@@ -50,13 +50,11 @@ import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.admin.CompactionConfig;
import org.apache.accumulo.core.client.admin.InstanceOperations;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
-import org.apache.accumulo.core.client.admin.TabletHostingGoal;
import org.apache.accumulo.core.client.rfile.RFile;
import org.apache.accumulo.core.client.rfile.RFileWriter;
import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.dataImpl.KeyExtent;
@@ -315,10 +313,6 @@ public class SplitIT extends AccumuloClusterHarness {
assertEquals(totalSplits, new
HashSet<>(c.tableOperations().listSplits(tableName)),
"Did not see expected splits");
- // ELASTICITY_TODO the following could be removed after #3309. Currently
scanning an ondemand
- // table with lots of tablets will cause the test to timeout.
- c.tableOperations().setTabletHostingGoal(tableName, new Range(),
TabletHostingGoal.ALWAYS);
-
log.debug("Verifying {} rows ingested into {}", numRows, tableName);
VerifyIngest.verifyIngest(c, params);
}