HBASE-17356 Add replica get support
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/db66e6cc Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/db66e6cc Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/db66e6cc Branch: refs/heads/HBASE-21512 Commit: db66e6cc9e1c6ea027631388aba688cb623b7d0a Parents: e4b6b4a Author: zhangduo <zhang...@apache.org> Authored: Tue Jan 1 21:59:37 2019 +0800 Committer: zhangduo <zhang...@apache.org> Committed: Thu Jan 3 08:38:20 2019 +0800 ---------------------------------------------------------------------- .../apache/hadoop/hbase/RegionLocations.java | 30 +- .../client/AsyncBatchRpcRetryingCaller.java | 114 +- .../client/AsyncConnectionConfiguration.java | 12 + .../hbase/client/AsyncConnectionImpl.java | 1 - .../hbase/client/AsyncMetaRegionLocator.java | 125 +- .../hbase/client/AsyncNonMetaRegionLocator.java | 291 +-- .../hadoop/hbase/client/AsyncRegionLocator.java | 129 +- .../hbase/client/AsyncRegionLocatorHelper.java | 147 ++ .../hbase/client/AsyncRpcRetryingCaller.java | 15 +- .../client/AsyncRpcRetryingCallerFactory.java | 55 +- .../AsyncSingleRequestRpcRetryingCaller.java | 71 +- .../hbase/client/AsyncTableRegionLocator.java | 28 +- .../client/AsyncTableRegionLocatorImpl.java | 6 +- .../hbase/client/ConnectionConfiguration.java | 5 +- .../hadoop/hbase/client/RawAsyncHBaseAdmin.java | 2033 +++++++++--------- .../hadoop/hbase/client/RawAsyncTableImpl.java | 208 +- .../apache/hadoop/hbase/util/FutureUtils.java | 60 + .../hbase/client/RegionReplicaTestHelper.java | 161 ++ .../client/TestAsyncMetaRegionLocator.java | 55 +- .../client/TestAsyncNonMetaRegionLocator.java | 126 +- ...syncNonMetaRegionLocatorConcurrenyLimit.java | 20 +- ...TestAsyncSingleRequestRpcRetryingCaller.java | 56 +- .../client/TestAsyncTableLocatePrefetch.java | 4 +- .../client/TestAsyncTableRegionReplicasGet.java | 204 ++ .../hbase/client/TestZKAsyncRegistry.java | 44 +- 25 files changed, 2366 insertions(+), 1634 
deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLocations.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLocations.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLocations.java index fd6f3c7..f98bf03 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLocations.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLocations.java @@ -56,8 +56,8 @@ public class RegionLocations { int index = 0; for (HRegionLocation loc : locations) { if (loc != null) { - if (loc.getRegionInfo().getReplicaId() >= maxReplicaId) { - maxReplicaId = loc.getRegionInfo().getReplicaId(); + if (loc.getRegion().getReplicaId() >= maxReplicaId) { + maxReplicaId = loc.getRegion().getReplicaId(); maxReplicaIdIndex = index; } } @@ -72,7 +72,7 @@ public class RegionLocations { this.locations = new HRegionLocation[maxReplicaId + 1]; for (HRegionLocation loc : locations) { if (loc != null) { - this.locations[loc.getRegionInfo().getReplicaId()] = loc; + this.locations[loc.getRegion().getReplicaId()] = loc; } } } @@ -146,7 +146,7 @@ public class RegionLocations { public RegionLocations remove(HRegionLocation location) { if (location == null) return this; if (location.getRegion() == null) return this; - int replicaId = location.getRegionInfo().getReplicaId(); + int replicaId = location.getRegion().getReplicaId(); if (replicaId >= locations.length) return this; // check whether something to remove. HRL.compareTo() compares ONLY the @@ -203,14 +203,14 @@ public class RegionLocations { // in case of region replication going down, we might have a leak here. 
int max = other.locations.length; - HRegionInfo regionInfo = null; + RegionInfo regionInfo = null; for (int i = 0; i < max; i++) { HRegionLocation thisLoc = this.getRegionLocation(i); HRegionLocation otherLoc = other.getRegionLocation(i); - if (regionInfo == null && otherLoc != null && otherLoc.getRegionInfo() != null) { + if (regionInfo == null && otherLoc != null && otherLoc.getRegion() != null) { // regionInfo is the first non-null HRI from other RegionLocations. We use it to ensure that // all replica region infos belong to the same region with same region id. - regionInfo = otherLoc.getRegionInfo(); + regionInfo = otherLoc.getRegion(); } HRegionLocation selectedLoc = selectRegionLocation(thisLoc, @@ -232,7 +232,7 @@ public class RegionLocations { for (int i=0; i < newLocations.length; i++) { if (newLocations[i] != null) { if (!RegionReplicaUtil.isReplicasForSameRegion(regionInfo, - newLocations[i].getRegionInfo())) { + newLocations[i].getRegion())) { newLocations[i] = null; } } @@ -273,9 +273,9 @@ public class RegionLocations { boolean checkForEquals, boolean force) { assert location != null; - int replicaId = location.getRegionInfo().getReplicaId(); + int replicaId = location.getRegion().getReplicaId(); - HRegionLocation oldLoc = getRegionLocation(location.getRegionInfo().getReplicaId()); + HRegionLocation oldLoc = getRegionLocation(location.getRegion().getReplicaId()); HRegionLocation selectedLoc = selectRegionLocation(oldLoc, location, checkForEquals, force); @@ -288,8 +288,8 @@ public class RegionLocations { // ensure that all replicas share the same start code. 
Otherwise delete them for (int i=0; i < newLocations.length; i++) { if (newLocations[i] != null) { - if (!RegionReplicaUtil.isReplicasForSameRegion(location.getRegionInfo(), - newLocations[i].getRegionInfo())) { + if (!RegionReplicaUtil.isReplicasForSameRegion(location.getRegion(), + newLocations[i].getRegion())) { newLocations[i] = null; } } @@ -317,8 +317,8 @@ public class RegionLocations { public HRegionLocation getRegionLocationByRegionName(byte[] regionName) { for (HRegionLocation loc : locations) { if (loc != null) { - if (Bytes.equals(loc.getRegionInfo().getRegionName(), regionName) - || Bytes.equals(loc.getRegionInfo().getEncodedNameAsBytes(), regionName)) { + if (Bytes.equals(loc.getRegion().getRegionName(), regionName) + || Bytes.equals(loc.getRegion().getEncodedNameAsBytes(), regionName)) { return loc; } } @@ -331,7 +331,7 @@ public class RegionLocations { } public HRegionLocation getDefaultRegionLocation() { - return locations[HRegionInfo.DEFAULT_REPLICA_ID]; + return locations[RegionInfo.DEFAULT_REPLICA_ID]; } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java index 51b89a9..e268b2e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java @@ -23,8 +23,7 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime; import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController; import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException; import static 
org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent; - -import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; +import static org.apache.hadoop.hbase.util.FutureUtils.addListener; import java.io.IOException; import java.util.ArrayList; @@ -43,24 +42,26 @@ import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; - import org.apache.hadoop.hbase.CellScannable; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; -import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.client.MultiResponse.RegionResult; import org.apache.hadoop.hbase.client.RetriesExhaustedException.ThrowableWithExtraContext; import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; + import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; /** * Retry caller for batch. 
@@ -121,10 +122,10 @@ class AsyncBatchRpcRetryingCaller<T> { private static final class ServerRequest { public final ConcurrentMap<byte[], RegionRequest> actionsByRegion = - new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR); + new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR); public void addAction(HRegionLocation loc, Action action) { - computeIfAbsent(actionsByRegion, loc.getRegionInfo().getRegionName(), + computeIfAbsent(actionsByRegion, loc.getRegion().getRegionName(), () -> new RegionRequest(loc)).actions.add(action); } } @@ -173,11 +174,10 @@ class AsyncBatchRpcRetryingCaller<T> { Throwable error, ServerName serverName) { if (tries > startLogErrorsCnt) { String regions = - regionsSupplier.get().map(r -> "'" + r.loc.getRegionInfo().getRegionNameAsString() + "'") - .collect(Collectors.joining(",", "[", "]")); - LOG.warn("Process batch for " + regions + " in " + tableName + " from " + serverName - + " failed, tries=" + tries, - error); + regionsSupplier.get().map(r -> "'" + r.loc.getRegion().getRegionNameAsString() + "'") + .collect(Collectors.joining(",", "[", "]")); + LOG.warn("Process batch for " + regions + " in " + tableName + " from " + serverName + + " failed, tries=" + tries, error); } } @@ -191,7 +191,7 @@ class AsyncBatchRpcRetryingCaller<T> { errors = action2Errors.computeIfAbsent(action, k -> new ArrayList<>()); } errors.add(new ThrowableWithExtraContext(error, EnvironmentEdgeManager.currentTime(), - getExtraContextForError(serverName))); + getExtraContextForError(serverName))); } private void addError(Iterable<Action> actions, Throwable error, ServerName serverName) { @@ -204,7 +204,7 @@ class AsyncBatchRpcRetryingCaller<T> { return; } ThrowableWithExtraContext errorWithCtx = - new ThrowableWithExtraContext(error, currentTime, extras); + new ThrowableWithExtraContext(error, currentTime, extras); List<ThrowableWithExtraContext> errors = removeErrors(action); if (errors == null) { errors = Collections.singletonList(errorWithCtx); @@ -227,7 +227,7 
@@ class AsyncBatchRpcRetryingCaller<T> { return; } future.completeExceptionally(new RetriesExhaustedException(tries, - Optional.ofNullable(removeErrors(action)).orElse(Collections.emptyList()))); + Optional.ofNullable(removeErrors(action)).orElse(Collections.emptyList()))); }); } @@ -242,9 +242,9 @@ class AsyncBatchRpcRetryingCaller<T> { // multiRequestBuilder will be populated with region actions. // rowMutationsIndexMap will be non-empty after the call if there is RowMutations in the // action list. - RequestConverter.buildNoDataRegionActions(entry.getKey(), - entry.getValue().actions, cells, multiRequestBuilder, regionActionBuilder, actionBuilder, - mutationBuilder, nonceGroup, rowMutationsIndexMap); + RequestConverter.buildNoDataRegionActions(entry.getKey(), entry.getValue().actions, cells, + multiRequestBuilder, regionActionBuilder, actionBuilder, mutationBuilder, nonceGroup, + rowMutationsIndexMap); } return multiRequestBuilder.build(); } @@ -254,15 +254,15 @@ class AsyncBatchRpcRetryingCaller<T> { RegionResult regionResult, List<Action> failedActions, Throwable regionException) { Object result = regionResult.result.getOrDefault(action.getOriginalIndex(), regionException); if (result == null) { - LOG.error("Server " + serverName + " sent us neither result nor exception for row '" - + Bytes.toStringBinary(action.getAction().getRow()) + "' of " - + regionReq.loc.getRegionInfo().getRegionNameAsString()); + LOG.error("Server " + serverName + " sent us neither result nor exception for row '" + + Bytes.toStringBinary(action.getAction().getRow()) + "' of " + + regionReq.loc.getRegion().getRegionNameAsString()); addError(action, new RuntimeException("Invalid response"), serverName); failedActions.add(action); } else if (result instanceof Throwable) { Throwable error = translateException((Throwable) result); logException(tries, () -> Stream.of(regionReq), error, serverName); - conn.getLocator().updateCachedLocation(regionReq.loc, error); + 
conn.getLocator().updateCachedLocationOnError(regionReq.loc, error); if (error instanceof DoNotRetryIOException || tries >= maxAttempts) { failOne(action, tries, error, EnvironmentEdgeManager.currentTime(), getExtraContextForError(serverName)); @@ -281,20 +281,19 @@ class AsyncBatchRpcRetryingCaller<T> { RegionResult regionResult = resp.getResults().get(rn); Throwable regionException = resp.getException(rn); if (regionResult != null) { - regionReq.actions.forEach( - action -> onComplete(action, regionReq, tries, serverName, regionResult, failedActions, - regionException)); + regionReq.actions.forEach(action -> onComplete(action, regionReq, tries, serverName, + regionResult, failedActions, regionException)); } else { Throwable error; if (regionException == null) { - LOG.error( - "Server sent us neither results nor exceptions for " + Bytes.toStringBinary(rn)); + LOG + .error("Server sent us neither results nor exceptions for " + Bytes.toStringBinary(rn)); error = new RuntimeException("Invalid response"); } else { error = translateException(regionException); } logException(tries, () -> Stream.of(regionReq), error, serverName); - conn.getLocator().updateCachedLocation(regionReq.loc, error); + conn.getLocator().updateCachedLocationOnError(regionReq.loc, error); if (error instanceof DoNotRetryIOException || tries >= maxAttempts) { failAll(regionReq.actions.stream(), tries, error, serverName); return; @@ -314,8 +313,7 @@ class AsyncBatchRpcRetryingCaller<T> { remainingNs = remainingTimeNs(); if (remainingNs <= 0) { failAll(actionsByServer.values().stream().flatMap(m -> m.actionsByRegion.values().stream()) - .flatMap(r -> r.actions.stream()), - tries); + .flatMap(r -> r.actions.stream()), tries); return; } } else { @@ -366,15 +364,15 @@ class AsyncBatchRpcRetryingCaller<T> { ServerName serverName) { Throwable error = translateException(t); logException(tries, () -> actionsByRegion.values().stream(), error, serverName); - actionsByRegion - .forEach((rn, regionReq) -> 
conn.getLocator().updateCachedLocation(regionReq.loc, error)); + actionsByRegion.forEach( + (rn, regionReq) -> conn.getLocator().updateCachedLocationOnError(regionReq.loc, error)); if (error instanceof DoNotRetryIOException || tries >= maxAttempts) { failAll(actionsByRegion.values().stream().flatMap(r -> r.actions.stream()), tries, error, serverName); return; } List<Action> copiedActions = actionsByRegion.values().stream().flatMap(r -> r.actions.stream()) - .collect(Collectors.toList()); + .collect(Collectors.toList()); addError(copiedActions, error, serverName); tryResubmit(copiedActions.stream(), tries); } @@ -407,30 +405,30 @@ class AsyncBatchRpcRetryingCaller<T> { } ConcurrentMap<ServerName, ServerRequest> actionsByServer = new ConcurrentHashMap<>(); ConcurrentLinkedQueue<Action> locateFailed = new ConcurrentLinkedQueue<>(); - CompletableFuture.allOf(actions - .map(action -> conn.getLocator().getRegionLocation(tableName, action.getAction().getRow(), - RegionLocateType.CURRENT, locateTimeoutNs).whenComplete((loc, error) -> { - if (error != null) { - error = translateException(error); - if (error instanceof DoNotRetryIOException) { - failOne(action, tries, error, EnvironmentEdgeManager.currentTime(), ""); - return; - } - addError(action, error, null); - locateFailed.add(action); - } else { - computeIfAbsent(actionsByServer, loc.getServerName(), ServerRequest::new) - .addAction(loc, action); + addListener(CompletableFuture.allOf(actions + .map(action -> conn.getLocator().getRegionLocation(tableName, action.getAction().getRow(), + RegionLocateType.CURRENT, locateTimeoutNs).whenComplete((loc, error) -> { + if (error != null) { + error = translateException(error); + if (error instanceof DoNotRetryIOException) { + failOne(action, tries, error, EnvironmentEdgeManager.currentTime(), ""); + return; } - })) - .toArray(CompletableFuture[]::new)).whenComplete((v, r) -> { - if (!actionsByServer.isEmpty()) { - send(actionsByServer, tries); - } - if (!locateFailed.isEmpty()) { 
- tryResubmit(locateFailed.stream(), tries); + addError(action, error, null); + locateFailed.add(action); + } else { + computeIfAbsent(actionsByServer, loc.getServerName(), ServerRequest::new).addAction(loc, + action); } - }); + })) + .toArray(CompletableFuture[]::new)), (v, r) -> { + if (!actionsByServer.isEmpty()) { + send(actionsByServer, tries); + } + if (!locateFailed.isEmpty()) { + tryResubmit(locateFailed.stream(), tries); + } + }); } public List<CompletableFuture<T>> call() { http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java index 915e9dd..fa051a5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java @@ -39,6 +39,8 @@ import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY; import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY; import static org.apache.hadoop.hbase.client.AsyncProcess.DEFAULT_START_LOG_ERRORS_AFTER_COUNT; import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY; +import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_CALL_TIMEOUT_MICROSECOND; +import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT; import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS; import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT; import static 
org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_SIZE_DEFAULT; @@ -94,6 +96,10 @@ class AsyncConnectionConfiguration { private final long writeBufferPeriodicFlushTimeoutNs; + // this is for supporting region replica get, if the primary does not finish within this + // timeout, we will send request to secondaries. + private final long primaryCallTimeoutNs; + @SuppressWarnings("deprecation") AsyncConnectionConfiguration(Configuration conf) { this.metaOperationTimeoutNs = TimeUnit.MILLISECONDS.toNanos( @@ -124,6 +130,8 @@ class AsyncConnectionConfiguration { this.writeBufferPeriodicFlushTimeoutNs = TimeUnit.MILLISECONDS.toNanos(conf.getLong(WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS, WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT)); + this.primaryCallTimeoutNs = TimeUnit.MICROSECONDS.toNanos( + conf.getLong(PRIMARY_CALL_TIMEOUT_MICROSECOND, PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT)); } long getMetaOperationTimeoutNs() { @@ -181,4 +189,8 @@ class AsyncConnectionConfiguration { long getWriteBufferPeriodicFlushTimeoutNs() { return writeBufferPeriodicFlushTimeoutNs; } + + long getPrimaryCallTimeoutNs() { + return primaryCallTimeoutNs; + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java index 078395b..361d5b2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java @@ -152,7 +152,6 @@ class AsyncConnectionImpl implements AsyncConnection { } // we will override this method for testing retry caller, so do not remove this method.
- @VisibleForTesting AsyncRegionLocator getLocator() { return locator; } http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java index 06b5b57..9fef15d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java @@ -17,11 +17,16 @@ */ package org.apache.hadoop.hbase.client; -import static org.apache.hadoop.hbase.client.AsyncRegionLocator.*; +import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.canUpdateOnError; +import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.createRegionLocations; +import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.isGood; +import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.removeRegionLocation; +import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.replaceRegionLocation; + import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicReference; - import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.RegionLocations; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,43 +41,43 @@ class AsyncMetaRegionLocator { private final AsyncRegistry registry; - private final AtomicReference<HRegionLocation> metaRegionLocation = new AtomicReference<>(); + private final AtomicReference<RegionLocations> metaRegionLocations = new AtomicReference<>(); - private final AtomicReference<CompletableFuture<HRegionLocation>> metaRelocateFuture = - new AtomicReference<>(); + private final 
AtomicReference<CompletableFuture<RegionLocations>> metaRelocateFuture = + new AtomicReference<>(); AsyncMetaRegionLocator(AsyncRegistry registry) { this.registry = registry; } - CompletableFuture<HRegionLocation> getRegionLocation(boolean reload) { + /** + * Get the region locations for meta region. If the location for the given replica is not + * available in the cached locations, then fetch from the HBase cluster. + * <p/> + * The <code>replicaId</code> parameter is important. If the region replication config for meta + * region is changed, then the cached region locations may not have the locations for new + * replicas. If we do not check the location for the given replica, we will always return the + * cached region locations and cause an infinite loop. + */ + CompletableFuture<RegionLocations> getRegionLocations(int replicaId, boolean reload) { for (;;) { if (!reload) { - HRegionLocation metaRegionLocation = this.metaRegionLocation.get(); - if (metaRegionLocation != null) { - return CompletableFuture.completedFuture(metaRegionLocation); + RegionLocations locs = this.metaRegionLocations.get(); + if (isGood(locs, replicaId)) { + return CompletableFuture.completedFuture(locs); } } - if (LOG.isTraceEnabled()) { - LOG.trace("Meta region location cache is null, try fetching from registry."); - } + LOG.trace("Meta region location cache is null, try fetching from registry."); if (metaRelocateFuture.compareAndSet(null, new CompletableFuture<>())) { - if (LOG.isDebugEnabled()) { - LOG.debug("Start fetching meta region location from registry."); - } - CompletableFuture<HRegionLocation> future = metaRelocateFuture.get(); + LOG.debug("Start fetching meta region location from registry."); + CompletableFuture<RegionLocations> future = metaRelocateFuture.get(); registry.getMetaRegionLocation().whenComplete((locs, error) -> { if (error != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Failed to fetch meta region location from registry", error); - } + LOG.debug("Failed to 
fetch meta region location from registry", error); metaRelocateFuture.getAndSet(null).completeExceptionally(error); return; } - HRegionLocation loc = locs.getDefaultRegionLocation(); - if (LOG.isDebugEnabled()) { - LOG.debug("The fetched meta region location is " + loc); - } + LOG.debug("The fetched meta region location is {}", locs); // Here we update cache before reset future, so it is possible that someone can get a // stale value. Consider this: // 1. update cache @@ -82,12 +87,12 @@ class AsyncMetaRegionLocator { // cleared in step 2. // But we do not think it is a big deal as it rarely happens, and even if it happens, the // caller will retry again later, no correctness problems. - this.metaRegionLocation.set(loc); + this.metaRegionLocations.set(locs); metaRelocateFuture.set(null); - future.complete(loc); + future.complete(locs); }); } else { - CompletableFuture<HRegionLocation> future = metaRelocateFuture.get(); + CompletableFuture<RegionLocations> future = metaRelocateFuture.get(); if (future != null) { return future; } @@ -95,30 +100,56 @@ class AsyncMetaRegionLocator { } } - void updateCachedLocation(HRegionLocation loc, Throwable exception) { - AsyncRegionLocator.updateCachedLocation(loc, exception, l -> metaRegionLocation.get(), - newLoc -> { - for (;;) { - HRegionLocation oldLoc = metaRegionLocation.get(); - if (oldLoc != null && (oldLoc.getSeqNum() > newLoc.getSeqNum() || - oldLoc.getServerName().equals(newLoc.getServerName()))) { - return; - } - if (metaRegionLocation.compareAndSet(oldLoc, newLoc)) { - return; - } - } - }, l -> { - for (;;) { - HRegionLocation oldLoc = metaRegionLocation.get(); - if (!canUpdate(l, oldLoc) || metaRegionLocation.compareAndSet(oldLoc, null)) { - return; - } + private HRegionLocation getCacheLocation(HRegionLocation loc) { + RegionLocations locs = metaRegionLocations.get(); + return locs != null ?
locs.getRegionLocation(loc.getRegion().getReplicaId()) : null; + } + + private void addLocationToCache(HRegionLocation loc) { + for (;;) { + int replicaId = loc.getRegion().getReplicaId(); + RegionLocations oldLocs = metaRegionLocations.get(); + if (oldLocs == null) { + RegionLocations newLocs = createRegionLocations(loc); + if (metaRegionLocations.compareAndSet(null, newLocs)) { + return; } - }); + } + HRegionLocation oldLoc = oldLocs.getRegionLocation(replicaId); + if (oldLoc != null && (oldLoc.getSeqNum() > loc.getSeqNum() || + oldLoc.getServerName().equals(loc.getServerName()))) { + return; + } + RegionLocations newLocs = replaceRegionLocation(oldLocs, loc); + if (metaRegionLocations.compareAndSet(oldLocs, newLocs)) { + return; + } + } + } + + private void removeLocationFromCache(HRegionLocation loc) { + for (;;) { + RegionLocations oldLocs = metaRegionLocations.get(); + if (oldLocs == null) { + return; + } + HRegionLocation oldLoc = oldLocs.getRegionLocation(loc.getRegion().getReplicaId()); + if (!canUpdateOnError(loc, oldLoc)) { + return; + } + RegionLocations newLocs = removeRegionLocation(oldLocs, loc.getRegion().getReplicaId()); + if (metaRegionLocations.compareAndSet(oldLocs, newLocs)) { + return; + } + } + } + + void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) { + AsyncRegionLocatorHelper.updateCachedLocationOnError(loc, exception, this::getCacheLocation, + this::addLocationToCache, this::removeLocationFromCache); } void clearCache() { - metaRegionLocation.set(null); + metaRegionLocations.set(null); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java index 
7e3d56c..1fcfbb0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java @@ -20,6 +20,11 @@ package org.apache.hadoop.hbase.client; import static org.apache.hadoop.hbase.HConstants.NINES; import static org.apache.hadoop.hbase.HConstants.ZEROES; import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME; +import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.canUpdateOnError; +import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.createRegionLocations; +import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.isGood; +import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.mergeRegionLocations; +import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.removeRegionLocation; import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter; import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow; import static org.apache.hadoop.hbase.client.RegionInfo.createRegionName; @@ -39,7 +44,9 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; +import org.apache.commons.lang3.ObjectUtils; import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.MetaTableAccessor; @@ -53,6 +60,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.hbase.thirdparty.com.google.common.base.Objects; /** * The asynchronous locator for regions other than meta. 
@@ -83,9 +91,9 @@ class AsyncNonMetaRegionLocator { private static final class LocateRequest { - public final byte[] row; + private final byte[] row; - public final RegionLocateType locateType; + private final RegionLocateType locateType; public LocateRequest(byte[] row, RegionLocateType locateType) { this.row = row; @@ -109,12 +117,12 @@ class AsyncNonMetaRegionLocator { private static final class TableCache { - public final ConcurrentNavigableMap<byte[], HRegionLocation> cache = + private final ConcurrentNavigableMap<byte[], RegionLocations> cache = new ConcurrentSkipListMap<>(BYTES_COMPARATOR); - public final Set<LocateRequest> pendingRequests = new HashSet<>(); + private final Set<LocateRequest> pendingRequests = new HashSet<>(); - public final Map<LocateRequest, CompletableFuture<HRegionLocation>> allRequests = + private final Map<LocateRequest, CompletableFuture<RegionLocations>> allRequests = new LinkedHashMap<>(); public boolean hasQuota(int max) { @@ -133,25 +141,29 @@ class AsyncNonMetaRegionLocator { return allRequests.keySet().stream().filter(r -> !isPending(r)).findFirst(); } - public void clearCompletedRequests(Optional<HRegionLocation> location) { - for (Iterator<Map.Entry<LocateRequest, CompletableFuture<HRegionLocation>>> iter = + public void clearCompletedRequests(Optional<RegionLocations> locations) { + for (Iterator<Map.Entry<LocateRequest, CompletableFuture<RegionLocations>>> iter = allRequests.entrySet().iterator(); iter.hasNext();) { - Map.Entry<LocateRequest, CompletableFuture<HRegionLocation>> entry = iter.next(); - if (tryComplete(entry.getKey(), entry.getValue(), location)) { + Map.Entry<LocateRequest, CompletableFuture<RegionLocations>> entry = iter.next(); + if (tryComplete(entry.getKey(), entry.getValue(), locations)) { iter.remove(); } } } - private boolean tryComplete(LocateRequest req, CompletableFuture<HRegionLocation> future, - Optional<HRegionLocation> location) { + private boolean tryComplete(LocateRequest req, 
CompletableFuture<RegionLocations> future, + Optional<RegionLocations> locations) { if (future.isDone()) { return true; } - if (!location.isPresent()) { + if (!locations.isPresent()) { return false; } - HRegionLocation loc = location.get(); + RegionLocations locs = locations.get(); + HRegionLocation loc = ObjectUtils.firstNonNull(locs.getRegionLocations()); + // we should at least have one location available, otherwise the request should fail and + // should not arrive here + assert loc != null; boolean completed; if (req.locateType.equals(RegionLocateType.BEFORE)) { // for locating the row before current row, the common case is to find the previous region @@ -166,7 +178,7 @@ class AsyncNonMetaRegionLocator { completed = loc.getRegion().containsRow(req.row); } if (completed) { - future.complete(loc); + future.complete(locs); return true; } else { return false; @@ -186,59 +198,59 @@ class AsyncNonMetaRegionLocator { return computeIfAbsent(cache, tableName, TableCache::new); } - private void removeFromCache(HRegionLocation loc) { - TableCache tableCache = cache.get(loc.getRegion().getTable()); - if (tableCache == null) { - return; + private boolean isEqual(RegionLocations locs1, RegionLocations locs2) { + HRegionLocation[] locArr1 = locs1.getRegionLocations(); + HRegionLocation[] locArr2 = locs2.getRegionLocations(); + if (locArr1.length != locArr2.length) { + return false; } - tableCache.cache.computeIfPresent(loc.getRegion().getStartKey(), (k, oldLoc) -> { - if (oldLoc.getSeqNum() > loc.getSeqNum() || - !oldLoc.getServerName().equals(loc.getServerName())) { - return oldLoc; + for (int i = 0; i < locArr1.length; i++) { + // do not need to compare region info + HRegionLocation loc1 = locArr1[i]; + HRegionLocation loc2 = locArr2[i]; + if (loc1 == null) { + if (loc2 != null) { + return false; + } + } else { + if (loc2 == null) { + return false; + } + if (loc1.getSeqNum() != loc2.getSeqNum()) { + return false; + } + if (!Objects.equal(loc1.getServerName(),
loc2.getServerName())) { + return false; + } } - return null; - }); + } + return true; } // return whether we add this loc to cache - private boolean addToCache(TableCache tableCache, HRegionLocation loc) { - if (LOG.isTraceEnabled()) { - LOG.trace("Try adding " + loc + " to cache"); - } - byte[] startKey = loc.getRegion().getStartKey(); - HRegionLocation oldLoc = tableCache.cache.putIfAbsent(startKey, loc); - if (oldLoc == null) { - return true; - } - if (oldLoc.getSeqNum() > loc.getSeqNum() || - oldLoc.getServerName().equals(loc.getServerName())) { - if (LOG.isTraceEnabled()) { - LOG.trace("Will not add " + loc + " to cache because the old value " + oldLoc + - " is newer than us or has the same server name"); - } - return false; - } - return loc == tableCache.cache.compute(startKey, (k, oldValue) -> { - if (oldValue == null || oldValue.getSeqNum() <= loc.getSeqNum()) { - return loc; + private boolean addToCache(TableCache tableCache, RegionLocations locs) { + LOG.trace("Try adding {} to cache", locs); + byte[] startKey = locs.getDefaultRegionLocation().getRegion().getStartKey(); + for (;;) { + RegionLocations oldLocs = tableCache.cache.putIfAbsent(startKey, locs); + if (oldLocs == null) { + return true; } - if (LOG.isTraceEnabled()) { - LOG.trace("Will not add " + loc + " to cache because the old value " + oldValue + + RegionLocations mergedLocs = mergeRegionLocations(locs, oldLocs); + if (isEqual(mergedLocs, oldLocs)) { + // the merged one is the same with the old one, give up + LOG.trace("Will not add {} to cache because the old value {} " + " is newer than us or has the same server name." 
+ - " Maybe it is updated before we replace it"); + " Maybe it is updated before we replace it", locs, oldLocs); + return false; } - return oldValue; - }); - } - - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD", - justification = "Called by lambda expression") - private void addToCache(HRegionLocation loc) { - addToCache(getTableCache(loc.getRegion().getTable()), loc); - LOG.trace("Try adding {} to cache", loc); + if (tableCache.cache.replace(startKey, oldLocs, mergedLocs)) { + return true; + } + } } - private void complete(TableName tableName, LocateRequest req, HRegionLocation loc, + private void complete(TableName tableName, LocateRequest req, RegionLocations locs, Throwable error) { if (error != null) { LOG.warn("Failed to locate region in '" + tableName + "', row='" + @@ -246,8 +258,8 @@ class AsyncNonMetaRegionLocator { } Optional<LocateRequest> toSend = Optional.empty(); TableCache tableCache = getTableCache(tableName); - if (loc != null) { - if (!addToCache(tableCache, loc)) { + if (locs != null) { + if (!addToCache(tableCache, locs)) { // someone is ahead of us. synchronized (tableCache) { tableCache.pendingRequests.remove(req); @@ -269,7 +281,7 @@ class AsyncNonMetaRegionLocator { future.completeExceptionally(error); } } - tableCache.clearCompletedRequests(Optional.ofNullable(loc)); + tableCache.clearCompletedRequests(Optional.ofNullable(locs)); // Remove a complete locate request in a synchronized block, so the table cache must have // quota to send a candidate request. toSend = tableCache.getCandidate(); @@ -286,9 +298,11 @@ class AsyncNonMetaRegionLocator { Bytes.toStringBinary(req.row), req.locateType, locs); } + // the default region location should always be present when fetching from meta, otherwise + // let's fail the request.
if (locs == null || locs.getDefaultRegionLocation() == null) { complete(tableName, req, null, - new IOException(String.format("No location found for '%s', row='%s', locateType=%s", + new HBaseIOException(String.format("No location found for '%s', row='%s', locateType=%s", tableName, Bytes.toStringBinary(req.row), req.locateType))); return true; } @@ -296,58 +310,60 @@ class AsyncNonMetaRegionLocator { RegionInfo info = loc.getRegion(); if (info == null) { complete(tableName, req, null, - new IOException(String.format("HRegionInfo is null for '%s', row='%s', locateType=%s", + new HBaseIOException(String.format("HRegionInfo is null for '%s', row='%s', locateType=%s", tableName, Bytes.toStringBinary(req.row), req.locateType))); return true; } if (info.isSplitParent()) { return false; } - if (loc.getServerName() == null) { - complete(tableName, req, null, - new IOException( - String.format("No server address listed for region '%s', row='%s', locateType=%s", - info.getRegionNameAsString(), Bytes.toStringBinary(req.row), req.locateType))); - return true; - } - complete(tableName, req, loc, null); + complete(tableName, req, locs, null); return true; } - private HRegionLocation locateRowInCache(TableCache tableCache, TableName tableName, byte[] row) { - Map.Entry<byte[], HRegionLocation> entry = tableCache.cache.floorEntry(row); + private RegionLocations locateRowInCache(TableCache tableCache, TableName tableName, byte[] row, + int replicaId) { + Map.Entry<byte[], RegionLocations> entry = tableCache.cache.floorEntry(row); if (entry == null) { return null; } - HRegionLocation loc = entry.getValue(); + RegionLocations locs = entry.getValue(); + HRegionLocation loc = locs.getRegionLocation(replicaId); + if (loc == null) { + return null; + } byte[] endKey = loc.getRegion().getEndKey(); if (isEmptyStopRow(endKey) || Bytes.compareTo(row, endKey) < 0) { if (LOG.isTraceEnabled()) { - LOG.trace("Found " + loc + " in cache for '" + tableName + "', row='" + - 
Bytes.toStringBinary(row) + "', locateType=" + RegionLocateType.CURRENT); + LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName, + Bytes.toStringBinary(row), RegionLocateType.CURRENT, replicaId); } - return loc; + return locs; } else { return null; } } - private HRegionLocation locateRowBeforeInCache(TableCache tableCache, TableName tableName, - byte[] row) { + private RegionLocations locateRowBeforeInCache(TableCache tableCache, TableName tableName, + byte[] row, int replicaId) { boolean isEmptyStopRow = isEmptyStopRow(row); - Map.Entry<byte[], HRegionLocation> entry = - isEmptyStopRow ? tableCache.cache.lastEntry() : tableCache.cache.lowerEntry(row); + Map.Entry<byte[], RegionLocations> entry = + isEmptyStopRow ? tableCache.cache.lastEntry() : tableCache.cache.lowerEntry(row); if (entry == null) { return null; } - HRegionLocation loc = entry.getValue(); + RegionLocations locs = entry.getValue(); + HRegionLocation loc = locs.getRegionLocation(replicaId); + if (loc == null) { + return null; + } if (isEmptyStopRow(loc.getRegion().getEndKey()) || (!isEmptyStopRow && Bytes.compareTo(loc.getRegion().getEndKey(), row) >= 0)) { if (LOG.isTraceEnabled()) { - LOG.trace("Found " + loc + " in cache for '" + tableName + "', row='" + - Bytes.toStringBinary(row) + "', locateType=" + RegionLocateType.BEFORE); + LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName, + Bytes.toStringBinary(row), RegionLocateType.BEFORE, replicaId); } - return loc; + return locs; } else { return null; } @@ -390,8 +406,8 @@ class AsyncNonMetaRegionLocator { if (tableNotFound) { complete(tableName, req, null, new TableNotFoundException(tableName)); } else if (!completeNormally) { - complete(tableName, req, null, new IOException( - "Unable to find region for " + Bytes.toStringBinary(req.row) + " in " + tableName)); + complete(tableName, req, null, new IOException("Unable to find region for '" + + 
Bytes.toStringBinary(req.row) + "' in " + tableName)); } } @@ -423,13 +439,12 @@ class AsyncNonMetaRegionLocator { continue; } RegionInfo info = loc.getRegion(); - if (info == null || info.isOffline() || info.isSplitParent() || - loc.getServerName() == null) { + if (info == null || info.isOffline() || info.isSplitParent()) { continue; } - if (addToCache(tableCache, loc)) { + if (addToCache(tableCache, locs)) { synchronized (tableCache) { - tableCache.clearCompletedRequests(Optional.of(loc)); + tableCache.clearCompletedRequests(Optional.of(locs)); } } } @@ -438,36 +453,36 @@ class AsyncNonMetaRegionLocator { }); } - private HRegionLocation locateInCache(TableCache tableCache, TableName tableName, byte[] row, - RegionLocateType locateType) { + private RegionLocations locateInCache(TableCache tableCache, TableName tableName, byte[] row, + int replicaId, RegionLocateType locateType) { return locateType.equals(RegionLocateType.BEFORE) - ? locateRowBeforeInCache(tableCache, tableName, row) - : locateRowInCache(tableCache, tableName, row); + ? locateRowBeforeInCache(tableCache, tableName, row, replicaId) + : locateRowInCache(tableCache, tableName, row, replicaId); } // locateToPrevious is true means we will use the start key of a region to locate the region // placed before it. Used for reverse scan. See the comment of // AsyncRegionLocator.getPreviousRegionLocation. 
- private CompletableFuture<HRegionLocation> getRegionLocationInternal(TableName tableName, - byte[] row, RegionLocateType locateType, boolean reload) { + private CompletableFuture<RegionLocations> getRegionLocationsInternal(TableName tableName, + byte[] row, int replicaId, RegionLocateType locateType, boolean reload) { // AFTER should be convert to CURRENT before calling this method assert !locateType.equals(RegionLocateType.AFTER); TableCache tableCache = getTableCache(tableName); if (!reload) { - HRegionLocation loc = locateInCache(tableCache, tableName, row, locateType); - if (loc != null) { - return CompletableFuture.completedFuture(loc); + RegionLocations locs = locateInCache(tableCache, tableName, row, replicaId, locateType); + if (isGood(locs, replicaId)) { + return CompletableFuture.completedFuture(locs); } } - CompletableFuture<HRegionLocation> future; + CompletableFuture<RegionLocations> future; LocateRequest req; boolean sendRequest = false; synchronized (tableCache) { // check again if (!reload) { - HRegionLocation loc = locateInCache(tableCache, tableName, row, locateType); - if (loc != null) { - return CompletableFuture.completedFuture(loc); + RegionLocations locs = locateInCache(tableCache, tableName, row, replicaId, locateType); + if (isGood(locs, replicaId)) { + return CompletableFuture.completedFuture(locs); } } req = new LocateRequest(row, locateType); @@ -487,28 +502,58 @@ class AsyncNonMetaRegionLocator { return future; } - CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row, - RegionLocateType locateType, boolean reload) { - if (locateType.equals(RegionLocateType.BEFORE)) { - return getRegionLocationInternal(tableName, row, locateType, reload); - } else { - // as we know the exact row after us, so we can just create the new row, and use the same - // algorithm to locate it. 
- if (locateType.equals(RegionLocateType.AFTER)) { - row = createClosestRowAfter(row); - } - return getRegionLocationInternal(tableName, row, RegionLocateType.CURRENT, reload); + CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row, + int replicaId, RegionLocateType locateType, boolean reload) { + // as we know the exact row after us, so we can just create the new row, and use the same + // algorithm to locate it. + if (locateType.equals(RegionLocateType.AFTER)) { + row = createClosestRowAfter(row); + locateType = RegionLocateType.CURRENT; } + return getRegionLocationsInternal(tableName, row, replicaId, locateType, reload); } - void updateCachedLocation(HRegionLocation loc, Throwable exception) { - AsyncRegionLocator.updateCachedLocation(loc, exception, l -> { - TableCache tableCache = cache.get(l.getRegion().getTable()); - if (tableCache == null) { - return null; + private void removeLocationFromCache(HRegionLocation loc) { + TableCache tableCache = cache.get(loc.getRegion().getTable()); + if (tableCache == null) { + return; + } + byte[] startKey = loc.getRegion().getStartKey(); + for (;;) { + RegionLocations oldLocs = tableCache.cache.get(startKey); + HRegionLocation oldLoc = oldLocs.getRegionLocation(loc.getRegion().getReplicaId()); + if (!canUpdateOnError(loc, oldLoc)) { + return; } - return tableCache.cache.get(l.getRegion().getStartKey()); - }, this::addToCache, this::removeFromCache); + RegionLocations newLocs = removeRegionLocation(oldLocs, loc.getRegion().getReplicaId()); + if (newLocs == null) { + if (tableCache.cache.remove(startKey, oldLocs)) { + return; + } + } else { + if (tableCache.cache.replace(startKey, oldLocs, newLocs)) { + return; + } + } + } + } + + private void addLocationToCache(HRegionLocation loc) { + addToCache(getTableCache(loc.getRegion().getTable()), createRegionLocations(loc)); + } + + private HRegionLocation getCachedLocation(HRegionLocation loc) { + TableCache tableCache = 
cache.get(loc.getRegion().getTable()); + if (tableCache == null) { + return null; + } + RegionLocations locs = tableCache.cache.get(loc.getRegion().getStartKey()); + return locs != null ? locs.getRegionLocation(loc.getRegion().getReplicaId()) : null; + } + + void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) { + AsyncRegionLocatorHelper.updateCachedLocationOnError(loc, exception, this::getCachedLocation, + this::addLocationToCache, this::removeLocationFromCache); } void clearCache(TableName tableName) { @@ -526,11 +571,11 @@ class AsyncNonMetaRegionLocator { // only used for testing whether we have cached the location for a region. @VisibleForTesting - HRegionLocation getRegionLocationInCache(TableName tableName, byte[] row) { + RegionLocations getRegionLocationInCache(TableName tableName, byte[] row) { TableCache tableCache = cache.get(tableName); if (tableCache == null) { return null; } - return locateRowInCache(tableCache, tableName, row); + return locateRowInCache(tableCache, tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java index 56228ab..d624974 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java @@ -18,26 +18,24 @@ package org.apache.hadoop.hbase.client; import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME; -import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.findException; -import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.isMetaClearingException; - -import 
org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; -import org.apache.hbase.thirdparty.io.netty.util.Timeout; +import static org.apache.hadoop.hbase.util.FutureUtils.addListener; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import java.util.function.Consumer; -import java.util.function.Function; import java.util.function.Supplier; - +import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.RegionException; +import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.exceptions.TimeoutIOException; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.hbase.exceptions.RegionMovedException; -import org.apache.hadoop.hbase.exceptions.TimeoutIOException; -import org.apache.hadoop.hbase.util.Bytes; + +import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; +import org.apache.hbase.thirdparty.io.netty.util.Timeout; /** * The asynchronous region locator. 
@@ -59,8 +57,8 @@ class AsyncRegionLocator { this.retryTimer = retryTimer; } - private CompletableFuture<HRegionLocation> withTimeout(CompletableFuture<HRegionLocation> future, - long timeoutNs, Supplier<String> timeoutMsg) { + private <T> CompletableFuture<T> withTimeout(CompletableFuture<T> future, long timeoutNs, + Supplier<String> timeoutMsg) { if (future.isDone() || timeoutNs <= 0) { return future; } @@ -78,74 +76,75 @@ class AsyncRegionLocator { }); } - CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row, + private boolean isMeta(TableName tableName) { + return TableName.isMetaTableName(tableName); + } + + CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row, RegionLocateType type, boolean reload, long timeoutNs) { + CompletableFuture<RegionLocations> future = isMeta(tableName) + ? metaRegionLocator.getRegionLocations(RegionReplicaUtil.DEFAULT_REPLICA_ID, reload) + : nonMetaRegionLocator.getRegionLocations(tableName, row, + RegionReplicaUtil.DEFAULT_REPLICA_ID, type, reload); + return withTimeout(future, timeoutNs, + () -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs) + + "ms) waiting for region locations for " + tableName + ", row='" + + Bytes.toStringBinary(row) + "'"); + } + + CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row, + int replicaId, RegionLocateType type, boolean reload, long timeoutNs) { // meta region can not be split right now so we always call the same method. // Change it later if the meta table can have more than one regions. - CompletableFuture<HRegionLocation> future = - tableName.equals(META_TABLE_NAME) ? metaRegionLocator.getRegionLocation(reload) - : nonMetaRegionLocator.getRegionLocation(tableName, row, type, reload); + CompletableFuture<HRegionLocation> future = new CompletableFuture<>(); + CompletableFuture<RegionLocations> locsFuture = + isMeta(tableName) ? 
metaRegionLocator.getRegionLocations(replicaId, reload) + : nonMetaRegionLocator.getRegionLocations(tableName, row, replicaId, type, reload); + addListener(locsFuture, (locs, error) -> { + if (error != null) { + future.completeExceptionally(error); + return; + } + HRegionLocation loc = locs.getRegionLocation(replicaId); + if (loc == null) { + future + .completeExceptionally(new RegionException("No location for " + tableName + ", row='" + + Bytes.toStringBinary(row) + "', locateType=" + type + ", replicaId=" + replicaId)); + } else if (loc.getServerName() == null) { + future.completeExceptionally(new HBaseIOException("No server address listed for region '" + + loc.getRegion().getRegionNameAsString() + "', row='" + Bytes.toStringBinary(row) + + "', locateType=" + type + ", replicaId=" + replicaId)); + } else { + future.complete(loc); + } + }); return withTimeout(future, timeoutNs, () -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs) + - "ms) waiting for region location for " + tableName + ", row='" + - Bytes.toStringBinary(row) + "'"); + "ms) waiting for region location for " + tableName + ", row='" + Bytes.toStringBinary(row) + + "', replicaId=" + replicaId); } CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row, - RegionLocateType type, long timeoutNs) { - return getRegionLocation(tableName, row, type, false, timeoutNs); + int replicaId, RegionLocateType type, long timeoutNs) { + return getRegionLocation(tableName, row, replicaId, type, false, timeoutNs); } - static boolean canUpdate(HRegionLocation loc, HRegionLocation oldLoc) { - // Do not need to update if no such location, or the location is newer, or the location is not - // same with us - return oldLoc != null && oldLoc.getSeqNum() <= loc.getSeqNum() && - oldLoc.getServerName().equals(loc.getServerName()); + CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row, + RegionLocateType type, boolean reload, long timeoutNs) { + return
getRegionLocation(tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID, type, reload, + timeoutNs); } - static void updateCachedLocation(HRegionLocation loc, Throwable exception, - Function<HRegionLocation, HRegionLocation> cachedLocationSupplier, - Consumer<HRegionLocation> addToCache, Consumer<HRegionLocation> removeFromCache) { - HRegionLocation oldLoc = cachedLocationSupplier.apply(loc); - if (LOG.isDebugEnabled()) { - LOG.debug("Try updating " + loc + ", the old value is " + oldLoc, exception); - } - if (!canUpdate(loc, oldLoc)) { - return; - } - Throwable cause = findException(exception); - if (LOG.isDebugEnabled()) { - LOG.debug("The actual exception when updating " + loc, cause); - } - if (cause == null || !isMetaClearingException(cause)) { - if (LOG.isDebugEnabled()) { - LOG.debug( - "Will not update " + loc + " because the exception is null or not the one we care about"); - } - return; - } - if (cause instanceof RegionMovedException) { - RegionMovedException rme = (RegionMovedException) cause; - HRegionLocation newLoc = - new HRegionLocation(loc.getRegionInfo(), rme.getServerName(), rme.getLocationSeqNum()); - if (LOG.isDebugEnabled()) { - LOG.debug( - "Try updating " + loc + " with the new location " + newLoc + " constructed by " + rme); - } - addToCache.accept(newLoc); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Try removing " + loc + " from cache"); - } - removeFromCache.accept(loc); - } + CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row, + RegionLocateType type, long timeoutNs) { + return getRegionLocation(tableName, row, type, false, timeoutNs); } - void updateCachedLocation(HRegionLocation loc, Throwable exception) { + void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) { if (loc.getRegion().isMetaRegion()) { - metaRegionLocator.updateCachedLocation(loc, exception); + metaRegionLocator.updateCachedLocationOnError(loc, exception); } else { - 
nonMetaRegionLocator.updateCachedLocation(loc, exception); + nonMetaRegionLocator.updateCachedLocationOnError(loc, exception); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java new file mode 100644 index 0000000..5c9c091 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.findException; +import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.isMetaClearingException; + +import java.util.Arrays; +import java.util.function.Consumer; +import java.util.function.Function; +import org.apache.commons.lang3.ObjectUtils; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.RegionLocations; +import org.apache.hadoop.hbase.exceptions.RegionMovedException; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Helper class for asynchronous region locator. + */ +@InterfaceAudience.Private +final class AsyncRegionLocatorHelper { + + private static final Logger LOG = LoggerFactory.getLogger(AsyncRegionLocatorHelper.class); + + private AsyncRegionLocatorHelper() { + } + + static boolean canUpdateOnError(HRegionLocation loc, HRegionLocation oldLoc) { + // Do not need to update if no such location, or the location is newer, or the location is not + // the same with us + return oldLoc != null && oldLoc.getSeqNum() <= loc.getSeqNum() && + oldLoc.getServerName().equals(loc.getServerName()); + } + + static void updateCachedLocationOnError(HRegionLocation loc, Throwable exception, + Function<HRegionLocation, HRegionLocation> cachedLocationSupplier, + Consumer<HRegionLocation> addToCache, Consumer<HRegionLocation> removeFromCache) { + HRegionLocation oldLoc = cachedLocationSupplier.apply(loc); + LOG.debug("Try updating {} , the old value is {}", loc, oldLoc, exception); + if (!canUpdateOnError(loc, oldLoc)) { + return; + } + Throwable cause = findException(exception); + LOG.debug("The actual exception when updating {}", loc, cause); + if (cause == null || !isMetaClearingException(cause)) { + LOG.debug("Will not update {} because the exception is null or not the one we care about", + loc); + return; + } + if (cause instanceof 
RegionMovedException) { + RegionMovedException rme = (RegionMovedException) cause; + HRegionLocation newLoc = + new HRegionLocation(loc.getRegion(), rme.getServerName(), rme.getLocationSeqNum()); + LOG.debug("Try updating {} with the new location {} constructed by {}", loc, newLoc, rme); + addToCache.accept(newLoc); + } else { + LOG.debug("Try removing {} from cache", loc); + removeFromCache.accept(loc); + } + } + + static RegionLocations createRegionLocations(HRegionLocation loc) { + int replicaId = loc.getRegion().getReplicaId(); + HRegionLocation[] locs = new HRegionLocation[replicaId + 1]; + locs[replicaId] = loc; + return new RegionLocations(locs); + } + + /** + * Create a new {@link RegionLocations} based on the given {@code oldLocs}, and replace the + * location for the given {@code replicaId} with the given {@code loc}. + * <p/> + * All the {@link RegionLocations} in async locator related class are immutable because we want to + * access them concurrently, so here we need to create a new one, instead of calling + * {@link RegionLocations#updateLocation(HRegionLocation, boolean, boolean)}. + */ + static RegionLocations replaceRegionLocation(RegionLocations oldLocs, HRegionLocation loc) { + int replicaId = loc.getRegion().getReplicaId(); + HRegionLocation[] locs = oldLocs.getRegionLocations(); + locs = Arrays.copyOf(locs, Math.max(replicaId + 1, locs.length)); + locs[replicaId] = loc; + return new RegionLocations(locs); + } + + /** + * Create a new {@link RegionLocations} based on the given {@code oldLocs}, and remove the + * location for the given {@code replicaId}. + * <p/> + * All the {@link RegionLocations} in async locator related class are immutable because we want to + * access them concurrently, so here we need to create a new one, instead of calling + * {@link RegionLocations#remove(int)}. 
+ */ + static RegionLocations removeRegionLocation(RegionLocations oldLocs, int replicaId) { + HRegionLocation[] locs = oldLocs.getRegionLocations(); + if (locs.length < replicaId + 1) { + // Here we do not modify the oldLocs so it is safe to return it. + return oldLocs; + } + locs = Arrays.copyOf(locs, locs.length); + locs[replicaId] = null; + if (ObjectUtils.firstNonNull(locs) != null) { + return new RegionLocations(locs); + } else { + // if all the locations are null, just return null + return null; + } + } + + /** + * Create a new {@link RegionLocations} which is the merging result for the given two + * {@link RegionLocations}. + * <p/> + * All the {@link RegionLocations} in async locator related class are immutable because we want to + * access them concurrently, so here we need to create a new one, instead of calling + * {@link RegionLocations#mergeLocations(RegionLocations)} directly. + */ + static RegionLocations mergeRegionLocations(RegionLocations newLocs, RegionLocations oldLocs) { + RegionLocations locs = new RegionLocations(newLocs.getRegionLocations()); + locs.mergeLocations(oldLocs); + return locs; + } + + static boolean isGood(RegionLocations locs, int replicaId) { + if (locs == null) { + return false; + } + HRegionLocation loc = locs.getRegionLocation(replicaId); + return loc != null && loc.getServerName() != null; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java index d30012f..e03049a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java @@ -88,15 +88,15 @@ 
public abstract class AsyncRpcRetryingCaller<T> { return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs); } - protected long remainingTimeNs() { + protected final long remainingTimeNs() { return operationTimeoutNs - (System.nanoTime() - startNs); } - protected void completeExceptionally() { + protected final void completeExceptionally() { future.completeExceptionally(new RetriesExhaustedException(tries - 1, exceptions)); } - protected void resetCallTimeout() { + protected final void resetCallTimeout() { long callTimeoutNs; if (operationTimeoutNs > 0) { callTimeoutNs = remainingTimeNs(); @@ -111,8 +111,15 @@ public abstract class AsyncRpcRetryingCaller<T> { resetController(controller, callTimeoutNs); } - protected void onError(Throwable error, Supplier<String> errMsg, + protected final void onError(Throwable error, Supplier<String> errMsg, Consumer<Throwable> updateCachedLocation) { + if (future.isDone()) { + // Give up if the future is already done; this is possible if user has already canceled the + // future. And for timeline consistent read, we will also cancel some requests if we have + // already got one of the responses. 
+ LOG.debug("The future is already done, canceled={}, give up retrying", future.isCancelled()); + return; + } error = translateException(error); if (error instanceof DoNotRetryIOException) { future.completeExceptionally(error); http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java index f80b4e5..a660e74 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java @@ -17,22 +17,22 @@ */ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts; import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkArgument; import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull; -import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts; - -import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; - import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; + import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; @@ -75,6 +75,8 @@ class AsyncRpcRetryingCallerFactory { private RegionLocateType locateType = RegionLocateType.CURRENT; + private int replicaId = RegionReplicaUtil.DEFAULT_REPLICA_ID; + public SingleRequestCallerBuilder<T> table(TableName tableName) { this.tableName = tableName; return this; @@ -121,11 +123,17 @@ class AsyncRpcRetryingCallerFactory { return this; } + public SingleRequestCallerBuilder<T> replicaId(int replicaId) { + this.replicaId = replicaId; + return this; + } + public AsyncSingleRequestRpcRetryingCaller<T> build() { + checkArgument(replicaId >= 0, "invalid replica id %s", replicaId); return new AsyncSingleRequestRpcRetryingCaller<>(retryTimer, conn, - checkNotNull(tableName, "tableName is null"), checkNotNull(row, "row is null"), - checkNotNull(locateType, "locateType is null"), checkNotNull(callable, "action is null"), - pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); + checkNotNull(tableName, "tableName is null"), checkNotNull(row, "row is null"), replicaId, + checkNotNull(locateType, "locateType is null"), checkNotNull(callable, "action is null"), + pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); } /** @@ -241,11 +249,11 @@ class AsyncRpcRetryingCallerFactory { public AsyncScanSingleRegionRpcRetryingCaller build() { checkArgument(scannerId != null, "invalid scannerId %d", scannerId); return new AsyncScanSingleRegionRpcRetryingCaller(retryTimer, conn, - checkNotNull(scan, "scan is null"), scanMetrics, scannerId, - checkNotNull(resultCache, "resultCache is null"), - checkNotNull(consumer, "consumer is null"), checkNotNull(stub, "stub is null"), - checkNotNull(loc, "location is null"), isRegionServerRemote, scannerLeaseTimeoutPeriodNs, - pauseNs, maxAttempts, scanTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); + checkNotNull(scan, "scan is null"), scanMetrics, scannerId, + checkNotNull(resultCache, "resultCache is 
null"), + checkNotNull(consumer, "consumer is null"), checkNotNull(stub, "stub is null"), + checkNotNull(loc, "location is null"), isRegionServerRemote, scannerLeaseTimeoutPeriodNs, + pauseNs, maxAttempts, scanTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); } /** @@ -311,7 +319,7 @@ class AsyncRpcRetryingCallerFactory { public <T> AsyncBatchRpcRetryingCaller<T> build() { return new AsyncBatchRpcRetryingCaller<>(retryTimer, conn, tableName, actions, pauseNs, - maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); + maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); } public <T> List<CompletableFuture<T>> call() { @@ -363,8 +371,8 @@ class AsyncRpcRetryingCallerFactory { public AsyncMasterRequestRpcRetryingCaller<T> build() { return new AsyncMasterRequestRpcRetryingCaller<T>(retryTimer, conn, - checkNotNull(callable, "action is null"), pauseNs, maxAttempts, operationTimeoutNs, - rpcTimeoutNs, startLogErrorsCnt); + checkNotNull(callable, "action is null"), pauseNs, maxAttempts, operationTimeoutNs, + rpcTimeoutNs, startLogErrorsCnt); } /** @@ -390,7 +398,8 @@ class AsyncRpcRetryingCallerFactory { private ServerName serverName; - public AdminRequestCallerBuilder<T> action(AsyncAdminRequestRetryingCaller.Callable<T> callable) { + public AdminRequestCallerBuilder<T> action( + AsyncAdminRequestRetryingCaller.Callable<T> callable) { this.callable = callable; return this; } @@ -420,15 +429,15 @@ class AsyncRpcRetryingCallerFactory { return this; } - public AdminRequestCallerBuilder<T> serverName(ServerName serverName){ + public AdminRequestCallerBuilder<T> serverName(ServerName serverName) { this.serverName = serverName; return this; } public AsyncAdminRequestRetryingCaller<T> build() { return new AsyncAdminRequestRetryingCaller<T>(retryTimer, conn, pauseNs, maxAttempts, - operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, checkNotNull(serverName, - "serverName is null"), checkNotNull(callable, "action is null")); + operationTimeoutNs, 
rpcTimeoutNs, startLogErrorsCnt, + checkNotNull(serverName, "serverName is null"), checkNotNull(callable, "action is null")); } public CompletableFuture<T> call() { @@ -436,7 +445,7 @@ class AsyncRpcRetryingCallerFactory { } } - public <T> AdminRequestCallerBuilder<T> adminRequest(){ + public <T> AdminRequestCallerBuilder<T> adminRequest() { return new AdminRequestCallerBuilder<>(); } @@ -488,8 +497,8 @@ class AsyncRpcRetryingCallerFactory { public AsyncServerRequestRpcRetryingCaller<T> build() { return new AsyncServerRequestRpcRetryingCaller<T>(retryTimer, conn, pauseNs, maxAttempts, - operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, checkNotNull(serverName, - "serverName is null"), checkNotNull(callable, "action is null")); + operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, + checkNotNull(serverName, "serverName is null"), checkNotNull(callable, "action is null")); } public CompletableFuture<T> call() { http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java index 56c82fb..1a52e5c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java @@ -17,17 +17,19 @@ */ package org.apache.hadoop.hbase.client; -import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; +import static org.apache.hadoop.hbase.util.FutureUtils.addListener; import java.io.IOException; import java.util.concurrent.CompletableFuture; - import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.TableName; -import 
org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.ipc.HBaseRpcController; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; /** * Retry caller for a single request, such as get, put, delete, etc. @@ -45,18 +47,21 @@ class AsyncSingleRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCaller<T> { private final byte[] row; + private final int replicaId; + private final RegionLocateType locateType; private final Callable<T> callable; public AsyncSingleRequestRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn, - TableName tableName, byte[] row, RegionLocateType locateType, Callable<T> callable, - long pauseNs, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, - int startLogErrorsCnt) { + TableName tableName, byte[] row, int replicaId, RegionLocateType locateType, + Callable<T> callable, long pauseNs, int maxAttempts, long operationTimeoutNs, + long rpcTimeoutNs, int startLogErrorsCnt) { super(retryTimer, conn, pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, - startLogErrorsCnt); + startLogErrorsCnt); this.tableName = tableName; this.row = row; + this.replicaId = replicaId; this.locateType = locateType; this.callable = callable; } @@ -67,23 +72,22 @@ class AsyncSingleRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCaller<T> { stub = conn.getRegionServerStub(loc.getServerName()); } catch (IOException e) { onError(e, - () -> "Get async stub to " + loc.getServerName() + " for '" + Bytes.toStringBinary(row) - + "' in " + loc.getRegion().getEncodedName() + " of " + tableName + " failed", - err -> conn.getLocator().updateCachedLocation(loc, err)); + () -> "Get async stub to " + loc.getServerName() + " for '" + 
Bytes.toStringBinary(row) + + "' in " + loc.getRegion().getEncodedName() + " of " + tableName + " failed", + err -> conn.getLocator().updateCachedLocationOnError(loc, err)); return; } resetCallTimeout(); - callable.call(controller, loc, stub).whenComplete( - (result, error) -> { - if (error != null) { - onError(error, - () -> "Call to " + loc.getServerName() + " for '" + Bytes.toStringBinary(row) + "' in " - + loc.getRegion().getEncodedName() + " of " + tableName + " failed", - err -> conn.getLocator().updateCachedLocation(loc, err)); - return; - } - future.complete(result); - }); + callable.call(controller, loc, stub).whenComplete((result, error) -> { + if (error != null) { + onError(error, + () -> "Call to " + loc.getServerName() + " for '" + Bytes.toStringBinary(row) + "' in " + + loc.getRegion().getEncodedName() + " of " + tableName + " failed", + err -> conn.getLocator().updateCachedLocationOnError(loc, err)); + return; + } + future.complete(result); + }); } @Override @@ -98,18 +102,17 @@ class AsyncSingleRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCaller<T> { } else { locateTimeoutNs = -1L; } - conn.getLocator() - .getRegionLocation(tableName, row, locateType, locateTimeoutNs) - .whenComplete( - (loc, error) -> { - if (error != null) { - onError(error, () -> "Locate '" + Bytes.toStringBinary(row) + "' in " + tableName - + " failed", err -> { - }); - return; - } - call(loc); - }); + addListener( + conn.getLocator().getRegionLocation(tableName, row, replicaId, locateType, locateTimeoutNs), + (loc, error) -> { + if (error != null) { + onError(error, + () -> "Locate '" + Bytes.toStringBinary(row) + "' in " + tableName + " failed", err -> { + }); + return; + } + call(loc); + }); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java ---------------------------------------------------------------------- diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java index dbfcef5..3bda38e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hbase.client; import java.util.concurrent.CompletableFuture; - import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.TableName; import org.apache.yetus.audience.InterfaceAudience; @@ -55,5 +54,30 @@ public interface AsyncTableRegionLocator { * @param row Row to find. * @param reload true to reload information or false to use cached information */ - CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, boolean reload); + default CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, boolean reload) { + return getRegionLocation(row, RegionReplicaUtil.DEFAULT_REPLICA_ID, reload); + } + + /** + * Finds the region with the given <code>replicaId</code> on which the given row is being served. + * <p> + * Returns the location of the region with the given <code>replicaId</code> to which the row + * belongs. + * @param row Row to find. + * @param replicaId the replica id of the region + */ + default CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, int replicaId) { + return getRegionLocation(row, replicaId, false); + } + + /** + * Finds the region with the given <code>replicaId</code> on which the given row is being served. + * <p> + * Returns the location of the region with the given <code>replicaId</code> to which the row + * belongs. + * @param row Row to find. 
+ * @param replicaId the replica id of the region + * @param reload true to reload information or false to use cached information + */ + CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, int replicaId, boolean reload); } http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java index 7d199df..465a411 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java @@ -44,7 +44,9 @@ class AsyncTableRegionLocatorImpl implements AsyncTableRegionLocator { } @Override - public CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, boolean reload) { - return locator.getRegionLocation(tableName, row, RegionLocateType.CURRENT, reload, -1L); + public CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, int replicaId, + boolean reload) { + return locator.getRegionLocation(tableName, row, replicaId, RegionLocateType.CURRENT, reload, + -1L); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java index d996004..55c62e7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java +++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java @@ -38,6 +38,9 @@ public class ConnectionConfiguration { public static final long WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS_DEFAULT = 1000L; // 1 second public static final String MAX_KEYVALUE_SIZE_KEY = "hbase.client.keyvalue.maxsize"; public static final int MAX_KEYVALUE_SIZE_DEFAULT = 10485760; + public static final String PRIMARY_CALL_TIMEOUT_MICROSECOND = + "hbase.client.primaryCallTimeout.get"; + public static final int PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT = 10000; // 10ms private final long writeBufferSize; private final long writeBufferPeriodicFlushTimeoutMs; @@ -86,7 +89,7 @@ public class ConnectionConfiguration { HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE); this.primaryCallTimeoutMicroSecond = - conf.getInt("hbase.client.primaryCallTimeout.get", 10000); // 10ms + conf.getInt(PRIMARY_CALL_TIMEOUT_MICROSECOND, PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT); this.replicaCallTimeoutMicroSecondScan = conf.getInt("hbase.client.replicaCallTimeout.scan", 1000000); // 1000 ms