This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push:
new b3d32efc87 Resolves server to use for scan in a single place (#3272)
b3d32efc87 is described below
commit b3d32efc873b6511ab49e15287e99f3218c9954b
Author: Keith Turner <[email protected]>
AuthorDate: Wed Apr 5 16:45:42 2023 -0400
Resolves server to use for scan in a single place (#3272)
The code for resolving which tserver or sserver to use for a scan was
spread out across multiple methods responsible for executing a scan.
Pulled the code to resolve which server to use into a single place in
the code that executes a scan.
Also introduced a new class to represent the server and server type
(sserver or tserver) used to process a scan.
These changes clean up two problems in the code. First, the tablet
server location class was being used to represent a scan server
with a special string placed in the tserver session field. Second,
the decision to use a scan server was made deeper in the scan code
than the error reporting code, which resulted in the need for an odd
instance variable to remember that a scan server was used for error
reporting. Removing these two problems makes the code easier to
modify and maintain.
---
.../accumulo/core/clientImpl/ThriftScanner.java | 419 ++++++++++++---------
1 file changed, 243 insertions(+), 176 deletions(-)
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java
index 663b3748c1..acb2e9b2bd 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java
@@ -30,6 +30,7 @@ import java.util.EnumMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
@@ -80,6 +81,7 @@ import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
import com.google.common.net.HostAndPort;
import io.opentelemetry.api.trace.Span;
@@ -171,6 +173,44 @@ public class ThriftScanner {
throw new AccumuloException("getBatchFromServer: failed");
}
+ enum ServerType {
+ TSERVER, SSERVER
+ }
+
+ static class ScanAddress {
+ final String serverAddress;
+ final ServerType serverType;
+ final TabletLocation tabletInfo;
+
+ public ScanAddress(String serverAddress, ServerType serverType,
TabletLocation tabletInfo) {
+ this.serverAddress = Objects.requireNonNull(serverAddress);
+ this.serverType = Objects.requireNonNull(serverType);
+ this.tabletInfo = Objects.requireNonNull(tabletInfo);
+ }
+
+ public KeyExtent getExtent() {
+ return tabletInfo.getExtent();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ScanAddress that = (ScanAddress) o;
+ return serverAddress.equals(that.serverAddress) && serverType ==
that.serverType
+ && getExtent().equals(that.getExtent());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(serverAddress, serverType, tabletInfo);
+ }
+ }
+
public static class ScanState {
boolean isolated;
@@ -189,7 +229,7 @@ public class ThriftScanner {
Authorizations authorizations;
List<Column> columns;
- TabletLocation prevLoc;
+ ScanAddress prevLoc;
Long scanID;
String classLoaderContext;
@@ -207,10 +247,6 @@ public class ThriftScanner {
Duration busyTimeout;
- TabletLocation getErrorLocation() {
- return prevLoc;
- }
-
public ScanState(ClientContext context, TableId tableId, Authorizations
authorizations,
Range range, SortedSet<Column> fetchedColumns, int size,
List<IterInfo> serverSideIteratorList,
@@ -280,10 +316,169 @@ public class ThriftScanner {
return (long) (Math.min(millis * 2, maxSleep) * (.9 + random.nextDouble()
/ 5));
}
+ private static ScanAddress getScanServerAddress(ClientContext context,
ScanState scanState,
+ TabletLocation loc) {
+ Preconditions.checkArgument(scanState.runOnScanServer);
+
+ ScanAddress addr = null;
+
+ if (scanState.scanID != null && scanState.prevLoc != null
+ && scanState.prevLoc.serverType == ServerType.SSERVER
+ && scanState.prevLoc.getExtent().equals(loc.getExtent())) {
+ // this is the case of continuing a scan on a scan server for the same
tablet, so lets not
+ // call the scan server selector and just go back to the previous scan
server
+ addr = scanState.prevLoc;
+ log.trace(
+ "For tablet {} continuing scan on scan server {} without consulting
scan server selector, using busyTimeout {}",
+ loc.getExtent(), addr.serverAddress, scanState.busyTimeout);
+ } else {
+ var tabletId = new TabletIdImpl(loc.getExtent());
+ // obtain a snapshot once and only expose this snapshot to the plugin
for consistency
+ var attempts = scanState.scanAttempts.snapshot();
+
+ var params = new ScanServerSelector.SelectorParameters() {
+
+ @Override
+ public List<TabletId> getTablets() {
+ return List.of(tabletId);
+ }
+
+ @Override
+ public Collection<? extends ScanServerAttempt> getAttempts(TabletId
tabletId) {
+ return attempts.getOrDefault(tabletId, Set.of());
+ }
+
+ @Override
+ public Map<String,String> getHints() {
+ if (scanState.executionHints == null) {
+ return Map.of();
+ }
+ return scanState.executionHints;
+ }
+ };
+
+ ScanServerSelections actions =
context.getScanServerSelector().selectServers(params);
+
+ Duration delay = null;
+
+ String scanServer = actions.getScanServer(tabletId);
+ if (scanServer != null) {
+ addr = new ScanAddress(scanServer, ServerType.SSERVER, loc);
+ delay = actions.getDelay();
+ scanState.busyTimeout = actions.getBusyTimeout();
+ log.trace("For tablet {} scan server selector chose scan_server:{}
delay:{} busyTimeout:{}",
+ loc.getExtent(), scanServer, delay, scanState.busyTimeout);
+ } else {
+ addr = new ScanAddress(loc.getTserverLocation(), ServerType.TSERVER,
loc);
+ delay = actions.getDelay();
+ scanState.busyTimeout = Duration.ZERO;
+ log.trace("For tablet {} scan server selector chose tablet_server",
loc.getExtent());
+ }
+
+ if (!delay.isZero()) {
+ try {
+ Thread.sleep(delay.toMillis());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ return addr;
+ }
+
+ static ScanAddress getNextScanAddress(ClientContext context, ScanState
scanState, long timeOut,
+ long startTime, long maxSleepTime) throws TableNotFoundException,
AccumuloSecurityException,
+ AccumuloServerException, InterruptedException, ScanTimedOutException {
+
+ String lastError = null;
+ String error = null;
+ long sleepMillis = 100;
+
+ ScanAddress addr = null;
+
+ while (addr == null) {
+ long currentTime = System.currentTimeMillis();
+ if ((currentTime - startTime) / 1000.0 > timeOut) {
+ throw new ScanTimedOutException();
+ }
+
+ TabletLocation loc = null;
+
+ Span child1 = TraceUtil.startSpan(ThriftScanner.class,
"scan::locateTablet");
+ try (Scope locateSpan = child1.makeCurrent()) {
+ loc = TabletLocator.getLocator(context,
scanState.tableId).locateTablet(context,
+ scanState.startRow, scanState.skipStartRow, false);
+
+ if (loc == null) {
+ context.requireNotDeleted(scanState.tableId);
+ context.requireNotOffline(scanState.tableId, null);
+
+ error = "Failed to locate tablet for table : " + scanState.tableId +
" row : "
+ + scanState.startRow;
+ if (!error.equals(lastError)) {
+ log.debug("{}", error);
+ } else if (log.isTraceEnabled()) {
+ log.trace("{}", error);
+ }
+ lastError = error;
+ sleepMillis = pause(sleepMillis, maxSleepTime,
scanState.runOnScanServer);
+ } else {
+ // when a tablet splits we do want to continue scanning the low child
+ // of the split if we are already passed it
+ Range dataRange = loc.getExtent().toDataRange();
+
+ if (scanState.range.getStartKey() != null
+ && dataRange.afterEndKey(scanState.range.getStartKey())) {
+ // go to the next tablet
+ scanState.startRow = loc.getExtent().endRow();
+ scanState.skipStartRow = true;
+ // force another lookup
+ loc = null;
+ } else if (scanState.range.getEndKey() != null
+ && dataRange.beforeStartKey(scanState.range.getEndKey())) {
+ // should not happen
+ throw new RuntimeException("Unexpected tablet, extent : " +
loc.getExtent()
+ + " range : " + scanState.range + " startRow : " +
scanState.startRow);
+ }
+ }
+ } catch (AccumuloServerException e) {
+ TraceUtil.setException(child1, e, true);
+ log.debug("Scan failed, server side exception : {}", e.getMessage());
+ throw e;
+ } catch (AccumuloException e) {
+ error = "exception from tablet loc " + e.getMessage();
+ if (!error.equals(lastError)) {
+ log.debug("{}", error);
+ } else if (log.isTraceEnabled()) {
+ log.trace("{}", error);
+ }
+
+ TraceUtil.setException(child1, e, false);
+
+ lastError = error;
+ sleepMillis = pause(sleepMillis, maxSleepTime,
scanState.runOnScanServer);
+ } finally {
+ child1.end();
+ }
+
+ if (loc != null) {
+ if (scanState.runOnScanServer) {
+ addr = getScanServerAddress(context, scanState, loc);
+ } else {
+ addr = new ScanAddress(loc.getTserverLocation(), ServerType.TSERVER,
loc);
+ }
+ }
+ }
+
+ return addr;
+ }
+
public static List<KeyValue> scan(ClientContext context, ScanState
scanState, long timeOut)
throws ScanTimedOutException, AccumuloException,
AccumuloSecurityException,
TableNotFoundException {
- TabletLocation loc = null;
+
long startTime = System.currentTimeMillis();
String lastError = null;
String error = null;
@@ -305,73 +500,12 @@ public class ThriftScanner {
throw new ScanTimedOutException();
}
- while (loc == null) {
- long currentTime = System.currentTimeMillis();
- if ((currentTime - startTime) / 1000.0 > timeOut) {
- throw new ScanTimedOutException();
- }
-
- Span child1 = TraceUtil.startSpan(ThriftScanner.class,
"scan::locateTablet");
- try (Scope locateSpan = child1.makeCurrent()) {
- loc = TabletLocator.getLocator(context,
scanState.tableId).locateTablet(context,
- scanState.startRow, scanState.skipStartRow, false);
-
- if (loc == null) {
- context.requireNotDeleted(scanState.tableId);
- context.requireNotOffline(scanState.tableId, null);
-
- error = "Failed to locate tablet for table : " +
scanState.tableId + " row : "
- + scanState.startRow;
- if (!error.equals(lastError)) {
- log.debug("{}", error);
- } else if (log.isTraceEnabled()) {
- log.trace("{}", error);
- }
- lastError = error;
- sleepMillis = pause(sleepMillis, maxSleepTime,
scanState.runOnScanServer);
- } else {
- // when a tablet splits we do want to continue scanning the low
child
- // of the split if we are already passed it
- Range dataRange = loc.getExtent().toDataRange();
-
- if (scanState.range.getStartKey() != null
- && dataRange.afterEndKey(scanState.range.getStartKey())) {
- // go to the next tablet
- scanState.startRow = loc.getExtent().endRow();
- scanState.skipStartRow = true;
- loc = null;
- } else if (scanState.range.getEndKey() != null
- && dataRange.beforeStartKey(scanState.range.getEndKey())) {
- // should not happen
- throw new RuntimeException("Unexpected tablet, extent : " +
loc.getExtent()
- + " range : " + scanState.range + " startRow : " +
scanState.startRow);
- }
- }
- } catch (AccumuloServerException e) {
- TraceUtil.setException(child1, e, true);
- log.debug("Scan failed, server side exception : {}",
e.getMessage());
- throw e;
- } catch (AccumuloException e) {
- error = "exception from tablet loc " + e.getMessage();
- if (!error.equals(lastError)) {
- log.debug("{}", error);
- } else if (log.isTraceEnabled()) {
- log.trace("{}", error);
- }
-
- TraceUtil.setException(child1, e, false);
-
- lastError = error;
- sleepMillis = pause(sleepMillis, maxSleepTime,
scanState.runOnScanServer);
- } finally {
- child1.end();
- }
- }
+ ScanAddress addr = getNextScanAddress(context, scanState, timeOut,
startTime, maxSleepTime);
Span child2 = TraceUtil.startSpan(ThriftScanner.class,
"scan::location",
- Map.of("tserver", loc.getTserverLocation()));
+ Map.of("tserver", addr.serverAddress));
try (Scope scanLocation = child2.makeCurrent()) {
- results = scan(loc, scanState, context);
+ results = scan(addr, scanState, context);
} catch (AccumuloSecurityException e) {
context.clearTableListCache();
context.requireNotDeleted(scanState.tableId);
@@ -380,14 +514,14 @@ public class ThriftScanner {
throw e;
} catch (TApplicationException tae) {
TraceUtil.setException(child2, tae, true);
- throw new
AccumuloServerException(scanState.getErrorLocation().getTserverLocation(), tae);
+ throw new AccumuloServerException(addr.serverAddress, tae);
} catch (TSampleNotPresentException tsnpe) {
String message = "Table " +
context.getPrintableTableInfoFromId(scanState.tableId)
+ " does not have sampling configured or built";
TraceUtil.setException(child2, tsnpe, true);
throw new SampleNotPresentException(message, tsnpe);
} catch (NotServingTabletException e) {
- error = "Scan failed, not serving tablet " +
scanState.getErrorLocation();
+ error = "Scan failed, not serving tablet " + addr.serverAddress;
if (!error.equals(lastError)) {
log.debug("{}", error);
} else if (log.isTraceEnabled()) {
@@ -395,8 +529,7 @@ public class ThriftScanner {
}
lastError = error;
- TabletLocator.getLocator(context,
scanState.tableId).invalidateCache(loc.getExtent());
- loc = null;
+ TabletLocator.getLocator(context,
scanState.tableId).invalidateCache(addr.getExtent());
// no need to try the current scan id somewhere else
scanState.scanID = null;
@@ -409,7 +542,7 @@ public class ThriftScanner {
TraceUtil.setException(child2, e, false);
sleepMillis = pause(sleepMillis, maxSleepTime,
scanState.runOnScanServer);
} catch (ScanServerBusyException e) {
- error = "Scan failed, scan server was busy " +
scanState.getErrorLocation();
+ error = "Scan failed, scan server was busy " + addr.serverAddress;
if (!error.equals(lastError)) {
log.debug("{}", error);
} else if (log.isTraceEnabled()) {
@@ -425,8 +558,7 @@ public class ThriftScanner {
TraceUtil.setException(child2, e, false);
scanState.scanID = null;
} catch (NoSuchScanIDException e) {
- error = "Scan failed, no such scan id " + scanState.scanID + " "
- + scanState.getErrorLocation();
+ error = "Scan failed, no such scan id " + scanState.scanID + " " +
addr.serverAddress;
if (!error.equals(lastError)) {
log.debug("{}", error);
} else if (log.isTraceEnabled()) {
@@ -442,7 +574,7 @@ public class ThriftScanner {
TraceUtil.setException(child2, e, false);
scanState.scanID = null;
} catch (TooManyFilesException e) {
- error = "Tablet has too many files " + scanState.getErrorLocation()
+ " retrying...";
+ error = "Tablet has too many files " + addr.serverAddress + "
retrying...";
if (error.equals(lastError)) {
tooManyFilesCount++;
if (tooManyFilesCount == 300) {
@@ -469,17 +601,20 @@ public class ThriftScanner {
TraceUtil.setException(child2, e, false);
sleepMillis = pause(sleepMillis, maxSleepTime,
scanState.runOnScanServer);
} catch (TException e) {
- TabletLocator.getLocator(context,
scanState.tableId).invalidateCache(context,
- loc.getTserverLocation());
+ if (addr.serverType == ServerType.TSERVER) {
+ // only tsever locations are in cache, invalidating a scan server
would not find
+ // anything the cache
+ TabletLocator.getLocator(context,
scanState.tableId).invalidateCache(context,
+ addr.serverAddress);
+ }
error = "Scan failed, thrift error " + e.getClass().getName() + " "
+ e.getMessage()
- + " " + scanState.getErrorLocation();
+ + " " + addr.serverAddress;
if (!error.equals(lastError)) {
log.debug("{}", error);
} else if (log.isTraceEnabled()) {
log.trace("{}", error);
}
lastError = error;
- loc = null;
// do not want to continue using the same scan id, if a timeout
occurred could cause a
// batch to be skipped
@@ -511,99 +646,33 @@ public class ThriftScanner {
}
}
- private static List<KeyValue> scan(TabletLocation loc, ScanState scanState,
ClientContext context)
+ private static List<KeyValue> scan(ScanAddress addr, ScanState scanState,
ClientContext context)
throws AccumuloSecurityException, NotServingTabletException, TException,
NoSuchScanIDException, TooManyFilesException, TSampleNotPresentException
{
if (scanState.finished) {
return null;
}
- if (scanState.runOnScanServer) {
-
- TabletLocation newLoc;
-
- var tabletId = new TabletIdImpl(loc.getExtent());
-
- if (scanState.scanID != null && scanState.prevLoc != null
- && scanState.prevLoc.getTserverSession().equals("scan_server")
- && scanState.prevLoc.getExtent().equals(loc.getExtent())) {
- // this is the case of continuing a scan on a scan server for the same
tablet, so lets not
- // call the scan server selector and just go back to the previous scan
server
- newLoc = scanState.prevLoc;
- log.trace(
- "For tablet {} continuing scan on scan server {} without
consulting scan server selector, using busyTimeout {}",
- loc.getExtent(), newLoc.getTserverLocation(),
scanState.busyTimeout);
- } else {
- // obtain a snapshot once and only expose this snapshot to the plugin
for consistency
- var attempts = scanState.scanAttempts.snapshot();
-
- var params = new ScanServerSelector.SelectorParameters() {
-
- @Override
- public List<TabletId> getTablets() {
- return List.of(tabletId);
- }
-
- @Override
- public Collection<? extends ScanServerAttempt> getAttempts(TabletId
tabletId) {
- return attempts.getOrDefault(tabletId, Set.of());
- }
-
- @Override
- public Map<String,String> getHints() {
- if (scanState.executionHints == null) {
- return Map.of();
- }
- return scanState.executionHints;
- }
- };
-
- ScanServerSelections actions =
context.getScanServerSelector().selectServers(params);
-
- Duration delay = null;
-
- String scanServer = actions.getScanServer(tabletId);
- if (scanServer != null) {
- newLoc = new TabletLocation(loc.getExtent(), scanServer,
"scan_server");
- delay = actions.getDelay();
- scanState.busyTimeout = actions.getBusyTimeout();
- log.trace(
- "For tablet {} scan server selector chose scan_server:{}
delay:{} busyTimeout:{}",
- loc.getExtent(), scanServer, delay, scanState.busyTimeout);
- } else {
- newLoc = loc;
- delay = actions.getDelay();
- scanState.busyTimeout = Duration.ZERO;
- log.trace("For tablet {} scan server selector chose tablet_server",
loc.getExtent());
- }
-
- if (!delay.isZero()) {
- try {
- Thread.sleep(delay.toMillis());
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
- }
- }
- }
-
- var reporter =
scanState.scanAttempts.createReporter(newLoc.getTserverLocation(), tabletId);
-
+ if (addr.serverType == ServerType.SSERVER) {
try {
- return scanRpc(newLoc, scanState, context,
scanState.busyTimeout.toMillis());
+ return scanRpc(addr, scanState, context,
scanState.busyTimeout.toMillis());
} catch (ScanServerBusyException ssbe) {
+ var reporter =
scanState.scanAttempts.createReporter(addr.serverAddress,
+ new TabletIdImpl(addr.getExtent()));
reporter.report(ScanServerAttempt.Result.BUSY);
throw ssbe;
} catch (Exception e) {
+ var reporter =
scanState.scanAttempts.createReporter(addr.serverAddress,
+ new TabletIdImpl(addr.getExtent()));
reporter.report(ScanServerAttempt.Result.ERROR);
throw e;
}
} else {
- return scanRpc(loc, scanState, context, 0L);
+ return scanRpc(addr, scanState, context, 0L);
}
}
- private static List<KeyValue> scanRpc(TabletLocation loc, ScanState
scanState,
+ private static List<KeyValue> scanRpc(ScanAddress addr, ScanState scanState,
ClientContext context, long busyTimeout) throws
AccumuloSecurityException,
NotServingTabletException, TException, NoSuchScanIDException,
TooManyFilesException,
TSampleNotPresentException, ScanServerBusyException {
@@ -612,7 +681,7 @@ public class ThriftScanner {
final TInfo tinfo = TraceUtil.traceInfo();
- final HostAndPort parsedLocation =
HostAndPort.fromString(loc.getTserverLocation());
+ final HostAndPort parsedLocation =
HostAndPort.fromString(addr.serverAddress);
TabletScanClientService.Client client =
ThriftUtil.getClient(ThriftClientTypes.TABLET_SCAN, parsedLocation,
context);
@@ -620,31 +689,29 @@ public class ThriftScanner {
try {
ScanResult sr;
- if (scanState.prevLoc != null && !scanState.prevLoc.equals(loc)) {
+ if (scanState.prevLoc != null && !scanState.prevLoc.equals(addr)) {
scanState.scanID = null;
}
- scanState.prevLoc = loc;
+ scanState.prevLoc = addr;
if (scanState.scanID == null) {
- Thread.currentThread().setName("Starting scan tserver=" +
loc.getTserverLocation()
- + " tableId=" + loc.getExtent().tableId());
+ Thread.currentThread().setName("Starting scan tserver=" +
addr.serverAddress + " tableId="
+ + addr.getExtent().tableId());
if (log.isTraceEnabled()) {
- String msg = "Starting scan tserver=" + loc.getTserverLocation() + "
tablet="
- + loc.getExtent() + " range=" + scanState.range + " ssil="
- + scanState.serverSideIteratorList + " ssio=" +
scanState.serverSideIteratorOptions
- + " context=" + scanState.classLoaderContext;
+ String msg = "Starting scan server=" + addr.serverAddress + "
tablet=" + addr.getExtent()
+ + " range=" + scanState.range + " ssil=" +
scanState.serverSideIteratorList + " ssio="
+ + scanState.serverSideIteratorOptions + " context=" +
scanState.classLoaderContext;
log.trace("tid={} {}", Thread.currentThread().getId(), msg);
timer = new OpTimer().start();
}
- TabletType ttype = TabletType.type(loc.getExtent());
- boolean waitForWrites =
-
!serversWaitedForWrites.get(ttype).contains(loc.getTserverLocation());
+ TabletType ttype = TabletType.type(addr.getExtent());
+ boolean waitForWrites =
!serversWaitedForWrites.get(ttype).contains(addr.serverAddress);
InitialScan is = client.startScan(tinfo, scanState.context.rpcCreds(),
- loc.getExtent().toThrift(), scanState.range.toThrift(),
+ addr.getExtent().toThrift(), scanState.range.toThrift(),
scanState.columns.stream().map(Column::toThrift).collect(Collectors.toList()),
scanState.size, scanState.serverSideIteratorList,
scanState.serverSideIteratorOptions,
scanState.authorizations.getAuthorizationsBB(), waitForWrites,
scanState.isolated,
@@ -652,7 +719,7 @@ public class ThriftScanner {
SamplerConfigurationImpl.toThrift(scanState.samplerConfig),
scanState.batchTimeOut,
scanState.classLoaderContext, scanState.executionHints,
busyTimeout);
if (waitForWrites) {
- serversWaitedForWrites.get(ttype).add(loc.getTserverLocation());
+ serversWaitedForWrites.get(ttype).add(addr.serverAddress);
}
sr = is.result;
@@ -666,7 +733,7 @@ public class ThriftScanner {
} else {
// log.debug("Calling continue scan : "+scanState.range+" loc = "+loc);
String msg =
- "Continuing scan tserver=" + loc.getTserverLocation() + " scanid="
+ scanState.scanID;
+ "Continuing scan tserver=" + addr.serverAddress + " scanid=" +
scanState.scanID;
Thread.currentThread().setName(msg);
if (log.isTraceEnabled()) {
@@ -691,7 +758,7 @@ public class ThriftScanner {
} else {
// log.debug("No more : tab end row =
"+loc.tablet_extent.getEndRow()+" range =
// "+scanState.range);
- if (loc.getExtent().endRow() == null) {
+ if (addr.getExtent().endRow() == null) {
scanState.finished = true;
if (timer != null) {
@@ -702,8 +769,8 @@ public class ThriftScanner {
}
} else if (scanState.range.getEndKey() == null || !scanState.range
- .afterEndKey(new
Key(loc.getExtent().endRow()).followingKey(PartialKey.ROW))) {
- scanState.startRow = loc.getExtent().endRow();
+ .afterEndKey(new
Key(addr.getExtent().endRow()).followingKey(PartialKey.ROW))) {
+ scanState.startRow = addr.getExtent().endRow();
scanState.skipStartRow = true;
if (timer != null) {
@@ -750,7 +817,7 @@ public class ThriftScanner {
TInfo tinfo = TraceUtil.traceInfo();
log.debug("Closing active scan {} {}", scanState.prevLoc,
scanState.scanID);
- HostAndPort parsedLocation =
HostAndPort.fromString(scanState.prevLoc.getTserverLocation());
+ HostAndPort parsedLocation =
HostAndPort.fromString(scanState.prevLoc.serverAddress);
TabletScanClientService.Client client = null;
try {
client =