Repository: hbase
Updated Branches:
  refs/heads/branch-2 1f7873d30 -> b2afd6c24


HBASE-18598 AsyncNonMetaRegionLocator use FIFO algorithm to get a candidate locate request


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/b2afd6c2
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/b2afd6c2
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/b2afd6c2

Branch: refs/heads/branch-2
Commit: b2afd6c24e3727465881735073ad2df1f7380fde
Parents: 1f7873d
Author: Guanghao Zhang <zg...@apache.org>
Authored: Tue Aug 15 16:15:29 2017 +0800
Committer: Guanghao Zhang <zg...@apache.org>
Committed: Wed Aug 16 13:41:32 2017 +0800

----------------------------------------------------------------------
 .../hbase/client/AsyncNonMetaRegionLocator.java | 119 ++++++++++---------
 .../client/TestAsyncNonMetaRegionLocator.java   |   1 +
 2 files changed, 63 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/b2afd6c2/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
index 31f369c..ab1f0db 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
@@ -29,18 +29,18 @@ import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
 
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.ThreadLocalRandom;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -107,7 +107,7 @@ class AsyncNonMetaRegionLocator {
     public final Set<LocateRequest> pendingRequests = new HashSet<>();
 
     public final Map<LocateRequest, CompletableFuture<HRegionLocation>> 
allRequests =
-        new HashMap<>();
+        new LinkedHashMap<>();
 
     public boolean hasQuota(int max) {
       return pendingRequests.size() < max;
@@ -120,6 +120,49 @@ class AsyncNonMetaRegionLocator {
     public void send(LocateRequest req) {
       pendingRequests.add(req);
     }
+
+    public Optional<LocateRequest> getCandidate() {
+      return allRequests.keySet().stream().filter(r -> 
!isPending(r)).findFirst();
+    }
+
+    public void clearCompletedRequests(Optional<HRegionLocation> location) {
+      for (Iterator<Map.Entry<LocateRequest, 
CompletableFuture<HRegionLocation>>> iter = allRequests
+          .entrySet().iterator(); iter.hasNext();) {
+        Map.Entry<LocateRequest, CompletableFuture<HRegionLocation>> entry = 
iter.next();
+        if (tryComplete(entry.getKey(), entry.getValue(), location)) {
+          iter.remove();
+        }
+      }
+    }
+
+    private boolean tryComplete(LocateRequest req, 
CompletableFuture<HRegionLocation> future,
+        Optional<HRegionLocation> location) {
+      if (future.isDone()) {
+        return true;
+      }
+      if (!location.isPresent()) {
+        return false;
+      }
+      HRegionLocation loc = location.get();
+      boolean completed;
+      if (req.locateType.equals(RegionLocateType.BEFORE)) {
+        // for locating the row before current row, the common case is to find 
the previous region in
+        // reverse scan, so we check the endKey first. In general, the 
condition should be startKey <
+        // req.row and endKey >= req.row. Here we split it to endKey == 
req.row || (endKey > req.row
+        // && startKey < req.row). The two conditions are equal since startKey 
< endKey.
+        int c = Bytes.compareTo(loc.getRegionInfo().getEndKey(), req.row);
+        completed =
+            c == 0 || (c > 0 && 
Bytes.compareTo(loc.getRegionInfo().getStartKey(), req.row) < 0);
+      } else {
+        completed = loc.getRegionInfo().containsRow(req.row);
+      }
+      if (completed) {
+        future.complete(loc);
+        return true;
+      } else {
+        return false;
+      }
+    }
   }
 
   AsyncNonMetaRegionLocator(AsyncConnectionImpl conn) {
@@ -186,48 +229,27 @@ class AsyncNonMetaRegionLocator {
     }
   }
 
-  private boolean tryComplete(LocateRequest req, 
CompletableFuture<HRegionLocation> future,
-      HRegionLocation loc) {
-    if (future.isDone()) {
-      return true;
-    }
-    boolean completed;
-    if (req.locateType.equals(RegionLocateType.BEFORE)) {
-      // for locating the row before current row, the common case is to find 
the previous region in
-      // reverse scan, so we check the endKey first. In general, the condition 
should be startKey <
-      // req.row and endKey >= req.row. Here we split it to endKey == req.row 
|| (endKey > req.row
-      // && startKey < req.row). The two conditions are equal since startKey < 
endKey.
-      int c = Bytes.compareTo(loc.getRegionInfo().getEndKey(), req.row);
-      completed =
-          c == 0 || (c > 0 && 
Bytes.compareTo(loc.getRegionInfo().getStartKey(), req.row) < 0);
-    } else {
-      completed = loc.getRegionInfo().containsRow(req.row);
-    }
-    if (completed) {
-      future.complete(loc);
-      return true;
-    } else {
-      return false;
-    }
-  }
-
   private void complete(TableName tableName, LocateRequest req, 
HRegionLocation loc,
       Throwable error) {
     if (error != null) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Failed to locate region in '" + tableName + "', row='" +
-            Bytes.toStringBinary(req.row) + "', locateType=" + req.locateType,
-          error);
-      }
+      LOG.warn(
+        "Failed to locate region in '" + tableName + "', row='" + 
Bytes.toStringBinary(req.row)
+            + "', locateType=" + req.locateType, error);
     }
-    LocateRequest toSend = null;
+    Optional<LocateRequest> toSend = Optional.empty();
     TableCache tableCache = getTableCache(tableName);
     if (loc != null) {
       if (!addToCache(tableCache, loc)) {
         // someone is ahead of us.
         synchronized (tableCache) {
           tableCache.pendingRequests.remove(req);
+          tableCache.clearCompletedRequests(Optional.empty());
+          // Remove a complete locate request in a synchronized block, so the 
table cache must have
+          // quota to send a candidate request.
+          toSend = tableCache.getCandidate();
+          toSend.ifPresent(r -> tableCache.send(r));
         }
+        toSend.ifPresent(r -> locateInMeta(tableName, r));
         return;
       }
     }
@@ -239,30 +261,13 @@ class AsyncNonMetaRegionLocator {
           future.completeExceptionally(error);
         }
       }
-      if (loc != null) {
-        for (Iterator<Map.Entry<LocateRequest, 
CompletableFuture<HRegionLocation>>> iter =
-            tableCache.allRequests.entrySet().iterator(); iter.hasNext();) {
-          Map.Entry<LocateRequest, CompletableFuture<HRegionLocation>> entry = 
iter.next();
-          if (tryComplete(entry.getKey(), entry.getValue(), loc)) {
-            iter.remove();
-          }
-        }
-      }
-      if (!tableCache.allRequests.isEmpty() &&
-          tableCache.hasQuota(maxConcurrentLocateRequestPerTable)) {
-        LocateRequest[] candidates = tableCache.allRequests.keySet().stream()
-            .filter(r -> 
!tableCache.isPending(r)).toArray(LocateRequest[]::new);
-        if (candidates.length > 0) {
-          // TODO: use a better algorithm to send a request which is more 
likely to fetch a new
-          // location.
-          toSend = 
candidates[ThreadLocalRandom.current().nextInt(candidates.length)];
-          tableCache.send(toSend);
-        }
-      }
-    }
-    if (toSend != null) {
-      locateInMeta(tableName, toSend);
+      tableCache.clearCompletedRequests(Optional.ofNullable(loc));
+      // Remove a complete locate request in a synchronized block, so the 
table cache must have
+      // quota to send a candidate request.
+      toSend = tableCache.getCandidate();
+      toSend.ifPresent(r -> tableCache.send(r));
     }
+    toSend.ifPresent(r -> locateInMeta(tableName, r));
   }
 
   private void onScanComplete(TableName tableName, LocateRequest req, 
List<Result> results,

http://git-wip-us.apache.org/repos/asf/hbase/blob/b2afd6c2/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
index 0bb192b..80ed02e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
@@ -209,6 +209,7 @@ public class TestAsyncNonMetaRegionLocator {
         throw new RuntimeException(e);
       }
     }));
+
     LOCATOR.clearCache(TABLE_NAME);
     byte[][] endKeys = getEndKeys();
     IntStream.range(0, 2).forEach(

Reply via email to