This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push:
new 149b65ce24 Improves scan server resolution in batch scanner code.
(#3273)
149b65ce24 is described below
commit 149b65ce242901ed9a579a8d2b0badc53c24ae04
Author: Keith Turner <[email protected]>
AuthorDate: Wed Apr 5 16:46:59 2023 -0400
Improves scan server resolution in batch scanner code. (#3273)
Refactors the batch scanner code to make two improvements. First,
mapping ranges to scan servers or tservers was moved under one method.
Second, the process of mapping ranges to scan servers using the tablet
location cache was improved. The tablet location cache used to organize
data in a way that was useful for immediate scans, and eventual scans
had to reorganize the data. A new method was added to the tablet
location cache that passes the raw data to a consumer that can organize
it in any way. This new method is used for scan servers.
---
.../core/clientImpl/RootTabletLocator.java | 8 +-
.../core/clientImpl/SyncingTabletLocator.java | 8 ++
.../accumulo/core/clientImpl/TabletLocator.java | 23 +++-
.../core/clientImpl/TabletLocatorImpl.java | 32 ++---
.../TabletServerBatchReaderIterator.java | 147 ++++++++++++++-------
.../accumulo/server/client/BulkImporterTest.java | 5 +-
6 files changed, 149 insertions(+), 74 deletions(-)
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/RootTabletLocator.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/RootTabletLocator.java
index a24a236998..f29a27b591 100644
---
a/core/src/main/java/org/apache/accumulo/core/clientImpl/RootTabletLocator.java
+++
b/core/src/main/java/org/apache/accumulo/core/clientImpl/RootTabletLocator.java
@@ -27,6 +27,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.function.BiConsumer;
import org.apache.accumulo.core.Constants;
import
org.apache.accumulo.core.clientImpl.TabletLocatorImpl.TabletServerLockChecker;
@@ -67,14 +68,13 @@ public class RootTabletLocator extends TabletLocator {
}
@Override
- public List<Range> binRanges(ClientContext context, List<Range> ranges,
- Map<String,Map<KeyExtent,List<Range>>> binnedRanges) {
+ public List<Range> locateTablets(ClientContext context, List<Range> ranges,
+ BiConsumer<TabletLocation,Range> rangeConsumer) {
TabletLocation rootTabletLocation = getRootTabletLocation(context);
if (rootTabletLocation != null) {
for (Range range : ranges) {
- TabletLocatorImpl.addRange(binnedRanges,
rootTabletLocation.getTserverLocation(),
- RootTable.EXTENT, range);
+ rangeConsumer.accept(rootTabletLocation, range);
}
return Collections.emptyList();
}
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/SyncingTabletLocator.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/SyncingTabletLocator.java
index df40a21ce1..dc38d18f8a 100644
---
a/core/src/main/java/org/apache/accumulo/core/clientImpl/SyncingTabletLocator.java
+++
b/core/src/main/java/org/apache/accumulo/core/clientImpl/SyncingTabletLocator.java
@@ -21,6 +21,7 @@ package org.apache.accumulo.core.clientImpl;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.function.BiConsumer;
import java.util.function.Supplier;
import org.apache.accumulo.core.client.AccumuloException;
@@ -76,6 +77,13 @@ public class SyncingTabletLocator extends TabletLocator {
syncLocator().binMutations(context, mutations, binnedMutations, failures);
}
+ @Override
+ public List<Range> locateTablets(ClientContext context, List<Range> ranges,
+ BiConsumer<TabletLocation,Range> rangeConsumer)
+ throws AccumuloException, AccumuloSecurityException,
TableNotFoundException {
+ return syncLocator().locateTablets(context, ranges, rangeConsumer);
+ }
+
@Override
public List<Range> binRanges(ClientContext context, List<Range> ranges,
Map<String,Map<KeyExtent,List<Range>>> binnedRanges)
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletLocator.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletLocator.java
index fd30d620b9..a60e63d213 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletLocator.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletLocator.java
@@ -26,6 +26,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.function.BiConsumer;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
@@ -64,10 +65,28 @@ public abstract class TabletLocator {
Map<String,TabletServerMutations<T>> binnedMutations, List<T> failures)
throws AccumuloException, AccumuloSecurityException,
TableNotFoundException;
- public abstract List<Range> binRanges(ClientContext context, List<Range>
ranges,
- Map<String,Map<KeyExtent,List<Range>>> binnedRanges)
+ /**
+ * This method finds what tablets overlap a given set of ranges, passing
each range and its
+ * associated tablet to the range consumer. If a range overlaps multiple
tablets then it can be
+ * passed to the range consumer multiple times.
+ */
+ public abstract List<Range> locateTablets(ClientContext context, List<Range>
ranges,
+ BiConsumer<TabletLocation,Range> rangeConsumer)
throws AccumuloException, AccumuloSecurityException,
TableNotFoundException;
+ /**
+ * The behavior of this method is similar to
+ * {@link #locateTablets(ClientContext, List, BiConsumer)}, except it bins
ranges to the passed in
+ * binnedRanges map instead of passing them to a consumer.
+ *
+ */
+ public List<Range> binRanges(ClientContext context, List<Range> ranges,
+ Map<String,Map<KeyExtent,List<Range>>> binnedRanges)
+ throws AccumuloException, AccumuloSecurityException,
TableNotFoundException {
+ return locateTablets(context, ranges,
+ ((cachedTablet, range) -> TabletLocatorImpl.addRange(binnedRanges,
cachedTablet, range)));
+ }
+
public abstract void invalidateCache(KeyExtent failedExtent);
public abstract void invalidateCache(Collection<KeyExtent> keySet);
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletLocatorImpl.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletLocatorImpl.java
index 69345af8cd..a0b4335ff6 100644
---
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletLocatorImpl.java
+++
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletLocatorImpl.java
@@ -38,6 +38,7 @@ import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.BiConsumer;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
@@ -293,8 +294,8 @@ public class TabletLocatorImpl extends TabletLocator {
return true;
}
- private List<Range> binRanges(ClientContext context, List<Range> ranges,
- Map<String,Map<KeyExtent,List<Range>>> binnedRanges, boolean useCache,
+ private List<Range> locateTablets(ClientContext context, List<Range> ranges,
+ BiConsumer<TabletLocation,Range> rangeConsumer, boolean useCache,
LockCheckerSession lcSession)
throws AccumuloException, AccumuloSecurityException,
TableNotFoundException {
List<Range> failures = new ArrayList<>();
@@ -359,8 +360,7 @@ public class TabletLocatorImpl extends TabletLocator {
// then after that merges and splits happen.
if (isContiguous(tabletLocations)) {
for (TabletLocation tl2 : tabletLocations) {
- TabletLocatorImpl.addRange(binnedRanges, tl2.getTserverLocation(),
tl2.getExtent(),
- range);
+ rangeConsumer.accept(tl2, range);
}
} else {
failures.add(range);
@@ -375,8 +375,8 @@ public class TabletLocatorImpl extends TabletLocator {
}
@Override
- public List<Range> binRanges(ClientContext context, List<Range> ranges,
- Map<String,Map<KeyExtent,List<Range>>> binnedRanges)
+ public List<Range> locateTablets(ClientContext context, List<Range> ranges,
+ BiConsumer<TabletLocation,Range> rangeConsumer)
throws AccumuloException, AccumuloSecurityException,
TableNotFoundException {
/*
@@ -404,7 +404,7 @@ public class TabletLocatorImpl extends TabletLocator {
// sort ranges... therefore try binning ranges using only the cache
// and sort whatever fails and retry
- failures = binRanges(context, ranges, binnedRanges, true, lcSession);
+ failures = locateTablets(context, ranges, rangeConsumer, true,
lcSession);
} finally {
rLock.unlock();
}
@@ -416,7 +416,7 @@ public class TabletLocatorImpl extends TabletLocator {
// try lookups again
wLock.lock();
try {
- failures = binRanges(context, failures, binnedRanges, false,
lcSession);
+ failures = locateTablets(context, failures, rangeConsumer, false,
lcSession);
} finally {
wLock.unlock();
}
@@ -424,9 +424,8 @@ public class TabletLocatorImpl extends TabletLocator {
if (timer != null) {
timer.stop();
- log.trace("tid={} Binned {} ranges for table {} to {} tservers in {}",
- Thread.currentThread().getId(), ranges.size(), tableId,
binnedRanges.size(),
- String.format("%.3f secs", timer.scale(SECONDS)));
+ log.trace("tid={} Binned {} ranges for table {} in {}",
Thread.currentThread().getId(),
+ ranges.size(), tableId, String.format("%.3f secs",
timer.scale(SECONDS)));
}
return failures;
@@ -839,7 +838,8 @@ public class TabletLocatorImpl extends TabletLocator {
Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<>();
- parent.binRanges(context, lookups, binnedRanges);
+ parent.locateTablets(context, lookups,
+ (cachedTablet, range) -> addRange(binnedRanges, cachedTablet,
range));
// randomize server order
ArrayList<String> tabletServers = new ArrayList<>(binnedRanges.keySet());
@@ -861,10 +861,10 @@ public class TabletLocatorImpl extends TabletLocator {
}
}
- protected static void addRange(Map<String,Map<KeyExtent,List<Range>>>
binnedRanges,
- String location, KeyExtent ke, Range range) {
- binnedRanges.computeIfAbsent(location, k -> new HashMap<>())
- .computeIfAbsent(ke, k -> new ArrayList<>()).add(range);
+ static void addRange(Map<String,Map<KeyExtent,List<Range>>> binnedRanges,
TabletLocation ct,
+ Range range) {
+ binnedRanges.computeIfAbsent(ct.getTserverLocation(), k -> new HashMap<>())
+ .computeIfAbsent(ct.getExtent(), k -> new ArrayList<>()).add(range);
}
}
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
index 4dbfbffab8..0c37ac125e 100644
---
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
+++
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
@@ -34,6 +34,7 @@ import java.util.ListIterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
@@ -238,21 +239,36 @@ public class TabletServerBatchReaderIterator implements
Iterator<Entry<Key,Value
Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<>();
- binRanges(locator, ranges, binnedRanges);
+ var ssd = binRanges(locator, ranges, binnedRanges);
- doLookups(binnedRanges, receiver, columns);
+ doLookups(binnedRanges, receiver, columns, ssd);
}
- private void binRanges(TabletLocator tabletLocator, List<Range> ranges,
+ private ScanServerData binRanges(TabletLocator tabletLocator, List<Range>
ranges,
Map<String,Map<KeyExtent,List<Range>>> binnedRanges)
throws AccumuloException, AccumuloSecurityException,
TableNotFoundException {
int lastFailureSize = Integer.MAX_VALUE;
+ ScanServerData ssd;
+
+ long startTime = System.currentTimeMillis();
+
while (true) {
binnedRanges.clear();
- List<Range> failures = tabletLocator.binRanges(context, ranges,
binnedRanges);
+
+ List<Range> failures;
+
+ if (options.getConsistencyLevel().equals(ConsistencyLevel.IMMEDIATE)) {
+ failures = tabletLocator.binRanges(context, ranges, binnedRanges);
+ ssd = new ScanServerData();
+ } else if
(options.getConsistencyLevel().equals(ConsistencyLevel.EVENTUAL)) {
+ ssd = binRangesForScanServers(tabletLocator, ranges, binnedRanges);
+ failures = ssd.failures;
+ } else {
+ throw new IllegalStateException();
+ }
if (failures.isEmpty()) {
break;
@@ -275,6 +291,12 @@ public class TabletServerBatchReaderIterator implements
Iterator<Entry<Key,Value
failures.size());
}
+ if (System.currentTimeMillis() - startTime > retryTimeout) {
+ // TODO exception used for timeout is inconsistent
+ throw new TimedOutException(
+ "Failed to find servers to process scans before timeout was
exceeded.");
+ }
+
try {
Thread.sleep(100);
} catch (InterruptedException e) {
@@ -302,6 +324,8 @@ public class TabletServerBatchReaderIterator implements
Iterator<Entry<Key,Value
binnedRanges.clear();
binnedRanges.putAll(binnedRanges2);
+
+ return ssd;
}
private void processFailures(Map<KeyExtent,List<Range>> failures,
ResultReceiver receiver,
@@ -337,9 +361,10 @@ public class TabletServerBatchReaderIterator implements
Iterator<Entry<Key,Value
// since the first call to binRanges clipped the ranges to within a
tablet, we should not get
// only
// bin to the set of failed tablets
- binRanges(locator, allRanges, binnedRanges);
- doLookups(binnedRanges, receiver, columns);
+ var ssd = binRanges(locator, allRanges, binnedRanges);
+
+ doLookups(binnedRanges, receiver, columns, ssd);
}
private String getTableInfo() {
@@ -494,21 +519,11 @@ public class TabletServerBatchReaderIterator implements
Iterator<Entry<Key,Value
}
private void doLookups(Map<String,Map<KeyExtent,List<Range>>> binnedRanges,
- final ResultReceiver receiver, List<Column> columns) {
+ final ResultReceiver receiver, List<Column> columns, ScanServerData ssd)
{
int maxTabletsPerRequest = Integer.MAX_VALUE;
- long busyTimeout = 0;
- Duration scanServerSelectorDelay = null;
- Map<String,ScanServerAttemptReporter> reporters = Map.of();
-
- if (options.getConsistencyLevel().equals(ConsistencyLevel.EVENTUAL)) {
- var scanServerData = rebinToScanServers(binnedRanges);
- busyTimeout = scanServerData.actions.getBusyTimeout().toMillis();
- reporters = scanServerData.reporters;
- scanServerSelectorDelay = scanServerData.actions.getDelay();
- binnedRanges = scanServerData.binnedRanges;
- } else {
+ if (options.getConsistencyLevel().equals(ConsistencyLevel.IMMEDIATE)) {
// when there are lots of threads and a few tablet servers
// it is good to break request to tablet servers up, the
// following code determines if this is the case
@@ -558,16 +573,16 @@ public class TabletServerBatchReaderIterator implements
Iterator<Entry<Key,Value
final Map<KeyExtent,List<Range>> tabletsRanges =
binnedRanges.get(tsLocation);
if (maxTabletsPerRequest == Integer.MAX_VALUE || tabletsRanges.size() ==
1) {
QueryTask queryTask = new QueryTask(tsLocation, tabletsRanges,
failures, receiver, columns,
- busyTimeout, reporters.getOrDefault(tsLocation, r -> {}),
scanServerSelectorDelay);
+ ssd.getBusyTimeout(), ssd.reporters.getOrDefault(tsLocation, r ->
{}), ssd.getDelay());
queryTasks.add(queryTask);
} else {
HashMap<KeyExtent,List<Range>> tabletSubset = new HashMap<>();
for (Entry<KeyExtent,List<Range>> entry : tabletsRanges.entrySet()) {
tabletSubset.put(entry.getKey(), entry.getValue());
if (tabletSubset.size() >= maxTabletsPerRequest) {
- QueryTask queryTask =
- new QueryTask(tsLocation, tabletSubset, failures, receiver,
columns, busyTimeout,
- reporters.getOrDefault(tsLocation, r -> {}),
scanServerSelectorDelay);
+ QueryTask queryTask = new QueryTask(tsLocation, tabletSubset,
failures, receiver,
+ columns, ssd.getBusyTimeout(),
ssd.reporters.getOrDefault(tsLocation, r -> {}),
+ ssd.getDelay());
queryTasks.add(queryTask);
tabletSubset = new HashMap<>();
}
@@ -575,7 +590,8 @@ public class TabletServerBatchReaderIterator implements
Iterator<Entry<Key,Value
if (!tabletSubset.isEmpty()) {
QueryTask queryTask = new QueryTask(tsLocation, tabletSubset,
failures, receiver, columns,
- busyTimeout, reporters.getOrDefault(tsLocation, r -> {}),
scanServerSelectorDelay);
+ ssd.getBusyTimeout(), ssd.reporters.getOrDefault(tsLocation, r
-> {}),
+ ssd.getDelay());
queryTasks.add(queryTask);
}
}
@@ -591,17 +607,59 @@ public class TabletServerBatchReaderIterator implements
Iterator<Entry<Key,Value
}
private static class ScanServerData {
- Map<String,Map<KeyExtent,List<Range>>> binnedRanges;
- ScanServerSelections actions;
- Map<String,ScanServerAttemptReporter> reporters;
+ final List<Range> failures;
+ final ScanServerSelections actions;
+ final Map<String,ScanServerAttemptReporter> reporters;
+
+ public ScanServerData(List<Range> failures) {
+ this.failures = failures;
+ this.actions = null;
+ this.reporters = Map.of();
+ }
+
+ public ScanServerData(ScanServerSelections actions,
+ Map<String,ScanServerAttemptReporter> reporters) {
+ this.actions = actions;
+ this.reporters = reporters;
+ this.failures = List.of();
+ }
+
+ public ScanServerData() {
+ this.failures = List.of();
+ this.actions = null;
+ this.reporters = Map.of();
+ }
+
+ public long getBusyTimeout() {
+ return actions == null ? 0L : actions.getBusyTimeout().toMillis();
+ }
+
+ public Duration getDelay() {
+ return actions == null ? null : actions.getDelay();
+ }
}
- private ScanServerData
rebinToScanServers(Map<String,Map<KeyExtent,List<Range>>> binnedRanges) {
+ private ScanServerData binRangesForScanServers(TabletLocator tabletLocator,
List<Range> ranges,
+ Map<String,Map<KeyExtent,List<Range>>> binnedRanges)
+ throws AccumuloException, TableNotFoundException,
AccumuloSecurityException {
+
ScanServerSelector ecsm = context.getScanServerSelector();
- List<TabletIdImpl> tabletIds =
- binnedRanges.values().stream().flatMap(extentMap ->
extentMap.keySet().stream())
- .map(TabletIdImpl::new).collect(Collectors.toList());
+ Map<KeyExtent,String> extentToTserverMap = new HashMap<>();
+ Map<KeyExtent,List<Range>> extentToRangesMap = new HashMap<>();
+
+ Set<TabletIdImpl> tabletIds = new HashSet<>();
+
+ List<Range> failures = tabletLocator.locateTablets(context, ranges,
(cachedTablet, range) -> {
+ extentToTserverMap.put(cachedTablet.getExtent(),
cachedTablet.getTserverLocation());
+ extentToRangesMap.computeIfAbsent(cachedTablet.getExtent(), k -> new
ArrayList<>())
+ .add(range);
+ tabletIds.add(new TabletIdImpl(cachedTablet.getExtent()));
+ });
+
+ if (!failures.isEmpty()) {
+ return new ScanServerData(failures);
+ }
// get a snapshot of this once,not each time the plugin request it
var scanAttemptsSnapshot = scanAttempts.snapshot();
@@ -625,18 +683,6 @@ public class TabletServerBatchReaderIterator implements
Iterator<Entry<Key,Value
var actions = ecsm.selectServers(params);
- Map<KeyExtent,String> extentToTserverMap = new HashMap<>();
- Map<KeyExtent,List<Range>> extentToRangesMap = new HashMap<>();
-
- binnedRanges.forEach((server, extentMap) -> {
- extentMap.forEach((extent, ranges) -> {
- extentToTserverMap.put(extent, server);
- extentToRangesMap.put(extent, ranges);
- });
- });
-
- Map<String,Map<KeyExtent,List<Range>>> binnedRanges2 = new HashMap<>();
-
Map<String,ScanServerAttemptReporter> reporters = new HashMap<>();
for (TabletIdImpl tabletId : tabletIds) {
@@ -644,25 +690,26 @@ public class TabletServerBatchReaderIterator implements
Iterator<Entry<Key,Value
String serverToUse = actions.getScanServer(tabletId);
if (serverToUse == null) {
// no scan server was given so use the tablet server
- serverToUse = extentToTserverMap.get(extent);
+ serverToUse = Objects.requireNonNull(extentToTserverMap.get(extent));
log.trace("For tablet {} scan server selector chose tablet_server",
tabletId);
} else {
log.trace("For tablet {} scan server selector chose scan_server:{}",
tabletId, serverToUse);
}
- var rangeMap = binnedRanges2.computeIfAbsent(serverToUse, k -> new
HashMap<>());
- List<Range> ranges = extentToRangesMap.get(extent);
- rangeMap.put(extent, ranges);
+ var rangeMap = binnedRanges.computeIfAbsent(serverToUse, k -> new
HashMap<>());
+ List<Range> extentRanges = extentToRangesMap.get(extent);
+ rangeMap.put(extent, extentRanges);
var server = serverToUse;
reporters.computeIfAbsent(serverToUse, k ->
scanAttempts.createReporter(server, tabletId));
}
- ScanServerData ssd = new ScanServerData();
+ if (!failures.isEmpty()) {
+ return new ScanServerData(failures);
+ }
+
+ ScanServerData ssd = new ScanServerData(actions, reporters);
- ssd.binnedRanges = binnedRanges2;
- ssd.actions = actions;
- ssd.reporters = reporters;
log.trace("Scan server selector chose delay:{} busyTimeout:{}",
actions.getDelay(),
actions.getBusyTimeout());
return ssd;
diff --git
a/server/base/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java
b/server/base/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java
index e37783e826..d72e8331cd 100644
---
a/server/base/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java
+++
b/server/base/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java
@@ -28,6 +28,7 @@ import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
+import java.util.function.BiConsumer;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.clientImpl.TabletLocator;
@@ -83,8 +84,8 @@ public class BulkImporterTest {
}
@Override
- public List<Range> binRanges(ClientContext context, List<Range> ranges,
- Map<String,Map<KeyExtent,List<Range>>> binnedRanges) {
+ public List<Range> locateTablets(ClientContext context, List<Range> ranges,
+ BiConsumer<TabletLocation,Range> rangeConsumer) {
throw new UnsupportedOperationException();
}