This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.5 by this push:
new 5cc614f23f9 HBASE-27093 AsyncNonMetaRegionLocator:put Complete
CompletableFuture outside lock block (#4496)
5cc614f23f9 is described below
commit 5cc614f23f916fb713319099cd4dcbe341d23c71
Author: wangzhi <[email protected]>
AuthorDate: Tue Jun 7 12:17:52 2022 +0800
HBASE-27093 AsyncNonMetaRegionLocator:put Complete CompletableFuture
outside lock block (#4496)
Signed-off-by: Duo Zhang <[email protected]>
(cherry picked from commit 176c43c5ad1aab01eb2d2b05c0cb90132e8d19b1)
---
.../hbase/client/AsyncNonMetaRegionLocator.java | 54 ++++++++++++++++++----
1 file changed, 44 insertions(+), 10 deletions(-)
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
index df6c6b753ed..0009415142c 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
@@ -35,10 +35,12 @@ import static
org.apache.hadoop.hbase.util.Bytes.BYTES_COMPARATOR;
import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -122,6 +124,26 @@ class AsyncNonMetaRegionLocator {
}
}
+ private static final class RegionLocationsFutureResult {
+ private final CompletableFuture<RegionLocations> future;
+ private final RegionLocations result;
+ private final Throwable e;
+
+ public RegionLocationsFutureResult(CompletableFuture<RegionLocations>
future,
+ RegionLocations result, Throwable e) {
+ this.future = future;
+ this.result = result;
+ this.e = e;
+ }
+
+ public void complete() {
+ if (e != null) {
+ future.completeExceptionally(e);
+ }
+ future.complete(result);
+ }
+ }
+
private static final class TableCache {
private final ConcurrentNavigableMap<byte[], RegionLocations> cache =
@@ -148,18 +170,20 @@ class AsyncNonMetaRegionLocator {
return allRequests.keySet().stream().filter(r ->
!isPending(r)).findFirst();
}
- public void clearCompletedRequests(RegionLocations locations) {
+ public List<RegionLocationsFutureResult>
clearCompletedRequests(RegionLocations locations) {
+ List<RegionLocationsFutureResult> futureResultList = new ArrayList<>();
for (Iterator<Map.Entry<LocateRequest,
CompletableFuture<RegionLocations>>> iter =
allRequests.entrySet().iterator(); iter.hasNext();) {
Map.Entry<LocateRequest, CompletableFuture<RegionLocations>> entry =
iter.next();
- if (tryComplete(entry.getKey(), entry.getValue(), locations)) {
+ if (tryComplete(entry.getKey(), entry.getValue(), locations,
futureResultList)) {
iter.remove();
}
}
+ return futureResultList;
}
private boolean tryComplete(LocateRequest req,
CompletableFuture<RegionLocations> future,
- RegionLocations locations) {
+ RegionLocations locations, List<RegionLocationsFutureResult>
futureResultList) {
if (future.isDone()) {
return true;
}
@@ -185,7 +209,7 @@ class AsyncNonMetaRegionLocator {
completed = loc.getRegion().containsRow(req.row);
}
if (completed) {
- future.complete(locations);
+ futureResultList.add(new RegionLocationsFutureResult(future,
locations, null));
return true;
} else {
return false;
@@ -320,32 +344,36 @@ class AsyncNonMetaRegionLocator {
TableCache tableCache = getTableCache(tableName);
if (locs != null) {
RegionLocations addedLocs = addToCache(tableCache, locs);
+ List<RegionLocationsFutureResult> futureResultList = new ArrayList<>();
synchronized (tableCache) {
tableCache.pendingRequests.remove(req);
- tableCache.clearCompletedRequests(addedLocs);
+ futureResultList.addAll(tableCache.clearCompletedRequests(addedLocs));
// Remove a complete locate request in a synchronized block, so the
table cache must have
// quota to send a candidate request.
toSend = tableCache.getCandidate();
toSend.ifPresent(r -> tableCache.send(r));
}
+ futureResultList.forEach(RegionLocationsFutureResult::complete);
toSend.ifPresent(r -> locateInMeta(tableName, r));
} else {
// we meet an error
assert error != null;
+ List<RegionLocationsFutureResult> futureResultList = new ArrayList<>();
synchronized (tableCache) {
tableCache.pendingRequests.remove(req);
// fail the request itself, no matter whether it is a
DoNotRetryIOException, as we have
// already retried several times
- CompletableFuture<?> future = tableCache.allRequests.remove(req);
+ CompletableFuture<RegionLocations> future =
tableCache.allRequests.remove(req);
if (future != null) {
- future.completeExceptionally(error);
+ futureResultList.add(new RegionLocationsFutureResult(future, null,
error));
}
- tableCache.clearCompletedRequests(null);
+ futureResultList.addAll(tableCache.clearCompletedRequests(null));
// Remove a complete locate request in a synchronized block, so the
table cache must have
// quota to send a candidate request.
toSend = tableCache.getCandidate();
toSend.ifPresent(r -> tableCache.send(r));
}
+ futureResultList.forEach(RegionLocationsFutureResult::complete);
toSend.ifPresent(r -> locateInMeta(tableName, r));
}
}
@@ -543,9 +571,11 @@ class AsyncNonMetaRegionLocator {
continue;
}
RegionLocations addedLocs = addToCache(tableCache, locs);
+ List<RegionLocationsFutureResult> futureResultList = new
ArrayList<>();
synchronized (tableCache) {
- tableCache.clearCompletedRequests(addedLocs);
+
futureResultList.addAll(tableCache.clearCompletedRequests(addedLocs));
}
+ futureResultList.forEach(RegionLocationsFutureResult::complete);
}
}
}
@@ -677,12 +707,16 @@ class AsyncNonMetaRegionLocator {
if (tableCache == null) {
return;
}
+ List<RegionLocationsFutureResult> futureResultList = new ArrayList<>();
synchronized (tableCache) {
if (!tableCache.allRequests.isEmpty()) {
IOException error = new IOException("Cache cleared");
- tableCache.allRequests.values().forEach(f ->
f.completeExceptionally(error));
+ tableCache.allRequests.values().forEach(f -> {
+ futureResultList.add(new RegionLocationsFutureResult(f, null,
error));
+ });
}
}
+ futureResultList.forEach(RegionLocationsFutureResult::complete);
conn.getConnectionMetrics()
.ifPresent(metrics ->
metrics.incrMetaCacheNumClearRegion(tableCache.cache.size()));
}