This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new 78dc351005 pre load ondemand tablets for linear scans (#3429) 78dc351005 is described below commit 78dc3510057bbe8b90f4829b2df8b51058b301f7 Author: Keith Turner <ktur...@apache.org> AuthorDate: Wed Jul 5 16:35:59 2023 -0400 pre load ondemand tablets for linear scans (#3429) --- .../core/clientImpl/ClientTabletCache.java | 36 +++++++- .../core/clientImpl/ClientTabletCacheImpl.java | 71 ++++++++++++++-- .../core/clientImpl/RootClientTabletCache.java | 2 +- .../core/clientImpl/SyncingClientTabletCache.java | 8 +- .../accumulo/core/clientImpl/ThriftScanner.java | 97 +++++++++++++++++++++- .../core/clientImpl/TimeoutClientTabletCache.java | 8 +- .../apache/accumulo/test/functional/SplitIT.java | 6 -- 7 files changed, 204 insertions(+), 24 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientTabletCache.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientTabletCache.java index fb8c4f7a8c..594e2f85bd 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientTabletCache.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientTabletCache.java @@ -75,6 +75,36 @@ public abstract class ClientTabletCache { REQUIRED, NOT_REQUIRED } + /** + * This method allows linear scans to host tablets ahead of time that they may read in the future. + * The goal of this method is to allow a scan to request hosting of tablets before the + * scan actually needs them. Below is an example of how this method could work with a scan when + * {@code minimumHostAhead=4} is passed and avoid the scan having to wait on tablet hosting. 
+ * + * <ol> + * <li>4*2 tablets are initially hosted (the scan has to wait on this)</li> + * <li>The 1st,2nd,3rd, and 4th tablets are read by the scan</li> + * <li>The request to read the 5th tablet causes a request to host 4 more tablets (this would be + * the 9th,10th,11th, and 12th tablets)</li> + * <li>The 5th,6th,7th, and 8th tablets are read by the scan</li> + * <li>While the scan does the read above, the 9th,10th,11th, and 12th tablets are actually + * hosted. This happens concurrently with the scan above.</li> + * <li>When the scan goes to read the 9th tablet, hopefully it's already hosted. Also attempting to + * read the 9th tablet will cause a request to host the 13th,14th,15th, and 16th tablets.</li> + * </ol> + * + * In the situation above, the goal is that while we are reading 4 hosted tablets the 4 following + * tablets are in the process of being hosted. + * + * @param minimumHostAhead Attempts to keep between minimumHostAhead and 2*minimumHostAhead + * tablets following the found tablet hosted. + * @param hostAheadRange Only host following tablets that are within this range. + */ + public abstract CachedTablet findTablet(ClientContext context, Text row, boolean skipRow, + LocationNeed locationNeed, int minimumHostAhead, Range hostAheadRange) + throws AccumuloException, AccumuloSecurityException, TableNotFoundException, + InvalidTabletHostingRequestException; + /** * Finds the tablet that contains the given row. * @@ -85,9 +115,11 @@ public abstract class ClientTabletCache { * @return overlapping tablet. If no overlapping tablet exists, returns null. If location is * required and the tablet currently has no location ,returns null. 
*/ - public abstract CachedTablet findTablet(ClientContext context, Text row, boolean skipRow, + public CachedTablet findTablet(ClientContext context, Text row, boolean skipRow, LocationNeed locationNeed) throws AccumuloException, AccumuloSecurityException, - TableNotFoundException, InvalidTabletHostingRequestException; + TableNotFoundException, InvalidTabletHostingRequestException { + return findTablet(context, row, skipRow, locationNeed, 0, null); + } public CachedTablet findTabletWithRetry(ClientContext context, Text row, boolean skipRow, LocationNeed locationNeed) throws AccumuloException, AccumuloSecurityException, diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientTabletCacheImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientTabletCacheImpl.java index 5e3e8bb440..85271cac69 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientTabletCacheImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientTabletCacheImpl.java @@ -67,6 +67,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; @@ -535,8 +536,9 @@ public class ClientTabletCacheImpl extends ClientTabletCache { @Override public CachedTablet findTablet(ClientContext context, Text row, boolean skipRow, - LocationNeed locationNeed) throws AccumuloException, AccumuloSecurityException, - TableNotFoundException, InvalidTabletHostingRequestException { + LocationNeed locationNeed, int minimumHostAhead, Range hostAheadRange) + throws AccumuloException, AccumuloSecurityException, TableNotFoundException, + InvalidTabletHostingRequestException { OpTimer timer = null; @@ -556,15 +558,74 @@ public class ClientTabletCacheImpl extends ClientTabletCache { String.format("%.3f secs", timer.scale(SECONDS))); } - if (tl != null && locationNeed == LocationNeed.REQUIRED && 
tl.getTserverLocation().isEmpty()) { - requestTabletHosting(context, List.of(tl)); - return null; + if (tl != null && locationNeed == LocationNeed.REQUIRED) { + // Look at the next (minimumHostAhead * 2) tablets and return which ones need hosting. See the + // javadoc in the superclass of this method for more details. + Map<KeyExtent,CachedTablet> extentsToHost = findExtentsToHost(context, minimumHostAhead * 2, + hostAheadRange, lcSession, tl, locationNeed); + + if (!extentsToHost.isEmpty()) { + if (extentsToHost.containsKey(tl.getExtent()) || extentsToHost.size() >= minimumHostAhead) { + requestTabletHosting(context, extentsToHost.values()); + } + } + + if (tl.getTserverLocation().isEmpty()) { + return null; + } } return tl; } + private Map<KeyExtent,CachedTablet> findExtentsToHost(ClientContext context, int hostAheadCount, + Range hostAheadRange, LockCheckerSession lcSession, CachedTablet firstTablet, + LocationNeed locationNeed) throws AccumuloException, TableNotFoundException, + InvalidTabletHostingRequestException, AccumuloSecurityException { + + // its only expected that this method is called when location need is required + Preconditions.checkArgument(locationNeed == LocationNeed.REQUIRED); + + Map<KeyExtent,CachedTablet> extentsToHost; + + if (hostAheadCount > 0) { + extentsToHost = new HashMap<>(); + if (firstTablet.getTserverLocation().isEmpty()) { + extentsToHost.put(firstTablet.getExtent(), firstTablet); + } + + KeyExtent extent = firstTablet.getExtent(); + + var currTablet = extent; + + for (int i = 0; i < hostAheadCount; i++) { + if (currTablet.endRow() == null || !hostAheadRange.contains(new Key(currTablet.endRow()))) { + break; + } + + CachedTablet followingTablet = + _findTablet(context, currTablet.endRow(), true, false, true, lcSession, locationNeed); + + if (followingTablet == null) { + break; + } + + currTablet = followingTablet.getExtent(); + + if (followingTablet.getTserverLocation().isEmpty() + && !followingTablet.wasHostingRequested()) { + 
extentsToHost.put(followingTablet.getExtent(), followingTablet); + } + } + } else if (firstTablet.getTserverLocation().isEmpty()) { + extentsToHost = Map.of(firstTablet.getExtent(), firstTablet); + } else { + extentsToHost = Map.of(); + } + return extentsToHost; + } + @Override public long getTabletHostingRequestCount() { return tabletHostingRequestCount.get(); diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/RootClientTabletCache.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/RootClientTabletCache.java index 7e35fb259e..f86d90b6f8 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/RootClientTabletCache.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/RootClientTabletCache.java @@ -140,7 +140,7 @@ public class RootClientTabletCache extends ClientTabletCache { @Override public CachedTablet findTablet(ClientContext context, Text row, boolean skipRow, - LocationNeed locationNeed) { + LocationNeed locationNeed, int hostAheadCount, Range hostAheadRange) { CachedTablet cachedTablet = getRootTabletLocation(context); diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/SyncingClientTabletCache.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/SyncingClientTabletCache.java index d8be1c639a..cd6e707bf7 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/SyncingClientTabletCache.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/SyncingClientTabletCache.java @@ -67,9 +67,11 @@ public class SyncingClientTabletCache extends ClientTabletCache { @Override public CachedTablet findTablet(ClientContext context, Text row, boolean skipRow, - LocationNeed locationNeed) throws AccumuloException, AccumuloSecurityException, - TableNotFoundException, InvalidTabletHostingRequestException { - return syncLocator().findTablet(context, row, skipRow, locationNeed); + LocationNeed locationNeed, int minimumHostAhead, Range hostAheadRange) + throws AccumuloException, 
AccumuloSecurityException, TableNotFoundException, + InvalidTabletHostingRequestException { + return syncLocator().findTablet(context, row, skipRow, locationNeed, minimumHostAhead, + hostAheadRange); } @Override diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java index f7b2aeb9f2..46d5be5e89 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java @@ -253,6 +253,10 @@ public class ThriftScanner { Duration busyTimeout; + int tabletsScanned; + + KeyExtent prevExtent = null; + public ScanState(ClientContext context, TableId tableId, Authorizations authorizations, Range range, SortedSet<Column> fetchedColumns, int size, List<IterInfo> serverSideIteratorList, @@ -302,6 +306,18 @@ public class ThriftScanner { if (useScanServer) { scanAttempts = new ScanServerAttemptsImpl(); } + + this.tabletsScanned = 0; + } + + long startTimeNanos = 0; + long getNextScanAddressTimeNanos = 0; + + public void incrementTabletsScanned(KeyExtent extent) { + if (!extent.equals(prevExtent)) { + tabletsScanned++; + prevExtent = extent; + } } } @@ -456,6 +472,63 @@ public class ThriftScanner { return Optional.of(addr); } + /** + * @see ClientTabletCache#findTablet(ClientContext, Text, boolean, ClientTabletCache.LocationNeed, + * int, Range) + */ + private static int computeMinimumHostAhead(ScanState scanState, + ClientTabletCache.LocationNeed hostingNeed) { + int minimumHostAhead = 0; + + if (hostingNeed == ClientTabletCache.LocationNeed.REQUIRED) { + long currTime = System.nanoTime(); + + double timeRatio = 0; + + long totalTime = currTime - scanState.startTimeNanos; + if (totalTime > 0) { + // The following computes (total time spent in this method)/(total time scanning and time + // spent in this method) + timeRatio = (double) scanState.getNextScanAddressTimeNanos / (double) 
(totalTime); + } + + if (timeRatio < 0 || timeRatio > 1) { + log.warn("Computed ratio of time spent in getNextScanAddress has unexpected value : {} ", + timeRatio); + timeRatio = Math.max(0.0, Math.min(1.0, timeRatio)); + } + + // + // Do not want to host all tablets in the scan range in case not all data in the range is + // read. Need to determine how many tablets to host ahead of time. The following information + // is used to determine how many tablets to host ahead of time. + // + // 1. The number of tablets this scan has already read. The more tablets that this scan has + // read, the more likely that it will read many more tablets. + // + // 2. The timeRatio computed above. As timeRatio approaches 1.0 it means we are spending + // most of our time waiting on the next tablet to have an address. When we are spending most of + // our time waiting for a tablet to have an address we want to increase the number of tablets + // we request to host ahead of time so we can hopefully spend less time waiting on that. + // + + if (timeRatio > .9) { + minimumHostAhead = 16; + } else if (timeRatio > .75) { + minimumHostAhead = 8; + } else if (timeRatio > .5) { + minimumHostAhead = 4; + } else if (timeRatio > .25) { + minimumHostAhead = 2; + } else { + minimumHostAhead = 1; + } + + minimumHostAhead = Math.min(scanState.tabletsScanned, minimumHostAhead); + } + return minimumHostAhead; + } + static ScanAddress getNextScanAddress(ClientContext context, ScanState scanState, long timeOut, long startTime, long maxSleepTime) throws TableNotFoundException, AccumuloSecurityException, AccumuloServerException, @@ -470,6 +543,8 @@ public class ThriftScanner { var hostingNeed = scanState.runOnScanServer ? 
ClientTabletCache.LocationNeed.NOT_REQUIRED : ClientTabletCache.LocationNeed.REQUIRED; + int minimumHostAhead = computeMinimumHostAhead(scanState, hostingNeed); + while (addr == null) { long currentTime = System.currentTimeMillis(); if ((currentTime - startTime) / 1000.0 > timeOut) { @@ -482,7 +557,8 @@ public class ThriftScanner { try (Scope locateSpan = child1.makeCurrent()) { loc = ClientTabletCache.getInstance(context, scanState.tableId).findTablet(context, - scanState.startRow, scanState.skipStartRow, hostingNeed); + scanState.startRow, scanState.skipStartRow, hostingNeed, minimumHostAhead, + scanState.range); if (loc == null) { context.requireNotDeleted(scanState.tableId); @@ -498,6 +574,8 @@ public class ThriftScanner { lastError = error; sleepMillis = pause(sleepMillis, maxSleepTime, scanState.runOnScanServer); } else { + scanState.incrementTabletsScanned(loc.getExtent()); + // when a tablet splits we do want to continue scanning the low child // of the split if we are already passed it Range dataRange = loc.getExtent().toDataRange(); @@ -578,7 +656,20 @@ public class ThriftScanner { "Failed to retrieve next batch of key values before timeout"); } - ScanAddress addr = getNextScanAddress(context, scanState, timeOut, startTime, maxSleepTime); + ScanAddress addr; + long beginTime = System.nanoTime(); + try { + addr = getNextScanAddress(context, scanState, timeOut, startTime, maxSleepTime); + } finally { + // track the initial time that we started tracking the time for getting the next scan + // address + if (scanState.startTimeNanos == 0) { + scanState.startTimeNanos = beginTime; + } + + // track the total amount of time spent getting the next scan address + scanState.getNextScanAddressTimeNanos += System.nanoTime() - beginTime; + } Span child2 = TraceUtil.startSpan(ThriftScanner.class, "scan::location", Map.of("tserver", addr.serverAddress)); @@ -838,8 +929,6 @@ public class ThriftScanner { sr.results.size(), scanState.scanID); } } else { - // log.debug("No 
more : tab end row = "+loc.tablet_extent.getEndRow()+" range = - // "+scanState.range); if (addr.getExtent().endRow() == null) { scanState.finished = true; diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TimeoutClientTabletCache.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TimeoutClientTabletCache.java index 1c8e3c3a07..b9610da386 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TimeoutClientTabletCache.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TimeoutClientTabletCache.java @@ -62,10 +62,12 @@ public class TimeoutClientTabletCache extends SyncingClientTabletCache { @Override public CachedTablet findTablet(ClientContext context, Text row, boolean skipRow, - LocationNeed locationNeed) throws AccumuloException, AccumuloSecurityException, - TableNotFoundException, InvalidTabletHostingRequestException { + LocationNeed locationNeed, int minimumHostAhead, Range hostAheadRange) + throws AccumuloException, AccumuloSecurityException, TableNotFoundException, + InvalidTabletHostingRequestException { try { - CachedTablet ret = super.findTablet(context, row, skipRow, locationNeed); + CachedTablet ret = + super.findTablet(context, row, skipRow, locationNeed, minimumHostAhead, hostAheadRange); if (ret == null) { failed(); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java b/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java index 1530c9ad34..852d2c2ec3 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java @@ -50,13 +50,11 @@ import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.admin.CompactionConfig; import org.apache.accumulo.core.client.admin.InstanceOperations; import org.apache.accumulo.core.client.admin.NewTableConfiguration; -import org.apache.accumulo.core.client.admin.TabletHostingGoal; import 
org.apache.accumulo.core.client.rfile.RFile; import org.apache.accumulo.core.client.rfile.RFileWriter; import org.apache.accumulo.core.conf.ConfigurationTypeHelper; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.KeyExtent; @@ -315,10 +313,6 @@ public class SplitIT extends AccumuloClusterHarness { assertEquals(totalSplits, new HashSet<>(c.tableOperations().listSplits(tableName)), "Did not see expected splits"); - // ELASTICITY_TODO the following could be removed after #3309. Currently scanning an ondemand - // table with lots of tablets will cause the test to timeout. - c.tableOperations().setTabletHostingGoal(tableName, new Range(), TabletHostingGoal.ALWAYS); - log.debug("Verifying {} rows ingested into {}", numRows, tableName); VerifyIngest.verifyIngest(c, params); }