virajjasani commented on a change in pull request #2095:
URL: https://github.com/apache/hbase/pull/2095#discussion_r457175727



##########
File path: 
hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
##########
@@ -652,4 +626,160 @@ static void setCoprocessorError(RpcController controller, 
Throwable error) {
       controller.setFailed(error.toString());
     }
   }
+
+  public static RegionLocations locateRow(NavigableMap<byte[], 
RegionLocations> cache,
+    TableName tableName, byte[] row, int replicaId) {
+    Map.Entry<byte[], RegionLocations> entry = cache.floorEntry(row);
+    if (entry == null) {
+      return null;
+    }
+    RegionLocations locs = entry.getValue();
+    HRegionLocation loc = locs.getRegionLocation(replicaId);
+    if (loc == null) {
+      return null;
+    }
+    byte[] endKey = loc.getRegion().getEndKey();
+    if (isEmptyStopRow(endKey) || Bytes.compareTo(row, endKey) < 0) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Found {} in cache for {}, row='{}', locateType={}, 
replicaId={}", loc, tableName,
+          Bytes.toStringBinary(row), RegionLocateType.CURRENT, replicaId);
+      }
+      return locs;
+    } else {
+      return null;
+    }
+  }
+
+  public static RegionLocations locateRowBefore(NavigableMap<byte[], 
RegionLocations> cache,
+    TableName tableName, byte[] row, int replicaId) {
+    boolean isEmptyStopRow = isEmptyStopRow(row);
+    Map.Entry<byte[], RegionLocations> entry =
+      isEmptyStopRow ? cache.lastEntry() : cache.lowerEntry(row);
+    if (entry == null) {
+      return null;
+    }
+    RegionLocations locs = entry.getValue();
+    HRegionLocation loc = locs.getRegionLocation(replicaId);
+    if (loc == null) {
+      return null;
+    }
+    if (isEmptyStopRow(loc.getRegion().getEndKey()) ||
+      (!isEmptyStopRow && Bytes.compareTo(loc.getRegion().getEndKey(), row) >= 
0)) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Found {} in cache for {}, row='{}', locateType={}, 
replicaId={}", loc, tableName,
+          Bytes.toStringBinary(row), RegionLocateType.BEFORE, replicaId);
+      }
+      return locs;
+    } else {
+      return null;
+    }
+  }
+
+  public static void tryClearMasterStubCache(IOException error,
+    ClientMetaService.Interface currentStub, 
AtomicReference<ClientMetaService.Interface> stub) {
+    if (ClientExceptionsUtil.isConnectionException(error) ||
+      error instanceof ServerNotRunningYetException) {
+      stub.compareAndSet(currentStub, null);
+    }
+  }
+
+  public static <T> CompletableFuture<T> getMasterStub(ConnectionRegistry 
registry,
+    AtomicReference<T> stub, AtomicReference<CompletableFuture<T>> 
stubMakeFuture,
+    RpcClient rpcClient, User user, long rpcTimeout, TimeUnit unit,
+    Function<RpcChannel, T> stubMaker, String type) {
+    return getOrFetch(stub, stubMakeFuture, () -> {
+      CompletableFuture<T> future = new CompletableFuture<>();
+      addListener(registry.getActiveMaster(), (addr, error) -> {

Review comment:
       +1

##########
File path: 
hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
##########
@@ -652,4 +626,160 @@ static void setCoprocessorError(RpcController controller, 
Throwable error) {
       controller.setFailed(error.toString());
     }
   }
+
+  public static RegionLocations locateRow(NavigableMap<byte[], 
RegionLocations> cache,
+    TableName tableName, byte[] row, int replicaId) {
+    Map.Entry<byte[], RegionLocations> entry = cache.floorEntry(row);
+    if (entry == null) {
+      return null;
+    }
+    RegionLocations locs = entry.getValue();
+    HRegionLocation loc = locs.getRegionLocation(replicaId);
+    if (loc == null) {
+      return null;
+    }
+    byte[] endKey = loc.getRegion().getEndKey();
+    if (isEmptyStopRow(endKey) || Bytes.compareTo(row, endKey) < 0) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Found {} in cache for {}, row='{}', locateType={}, 
replicaId={}", loc, tableName,
+          Bytes.toStringBinary(row), RegionLocateType.CURRENT, replicaId);
+      }
+      return locs;
+    } else {
+      return null;
+    }
+  }
+
+  public static RegionLocations locateRowBefore(NavigableMap<byte[], 
RegionLocations> cache,
+    TableName tableName, byte[] row, int replicaId) {
+    boolean isEmptyStopRow = isEmptyStopRow(row);
+    Map.Entry<byte[], RegionLocations> entry =
+      isEmptyStopRow ? cache.lastEntry() : cache.lowerEntry(row);
+    if (entry == null) {
+      return null;
+    }
+    RegionLocations locs = entry.getValue();
+    HRegionLocation loc = locs.getRegionLocation(replicaId);
+    if (loc == null) {
+      return null;
+    }
+    if (isEmptyStopRow(loc.getRegion().getEndKey()) ||
+      (!isEmptyStopRow && Bytes.compareTo(loc.getRegion().getEndKey(), row) >= 
0)) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Found {} in cache for {}, row='{}', locateType={}, 
replicaId={}", loc, tableName,
+          Bytes.toStringBinary(row), RegionLocateType.BEFORE, replicaId);
+      }
+      return locs;
+    } else {
+      return null;
+    }
+  }
+
+  public static void tryClearMasterStubCache(IOException error,
+    ClientMetaService.Interface currentStub, 
AtomicReference<ClientMetaService.Interface> stub) {
+    if (ClientExceptionsUtil.isConnectionException(error) ||
+      error instanceof ServerNotRunningYetException) {
+      stub.compareAndSet(currentStub, null);
+    }
+  }
+
+  public static <T> CompletableFuture<T> getMasterStub(ConnectionRegistry 
registry,
+    AtomicReference<T> stub, AtomicReference<CompletableFuture<T>> 
stubMakeFuture,
+    RpcClient rpcClient, User user, long rpcTimeout, TimeUnit unit,
+    Function<RpcChannel, T> stubMaker, String type) {
+    return getOrFetch(stub, stubMakeFuture, () -> {
+      CompletableFuture<T> future = new CompletableFuture<>();
+      addListener(registry.getActiveMaster(), (addr, error) -> {
+        if (error != null) {
+          future.completeExceptionally(error);
+        } else if (addr == null) {
+          future.completeExceptionally(new MasterNotRunningException(
+            "ZooKeeper available but no active master location found"));
+        } else {
+          LOG.debug("The fetched master address is {}", addr);
+          try {
+            future.complete(stubMaker.apply(
+              rpcClient.createRpcChannel(addr, user, 
toIntNoOverflow(unit.toMillis(rpcTimeout)))));
+          } catch (IOException e) {
+            future.completeExceptionally(e);
+          }
+        }
+
+      });
+      return future;
+    }, type);
+  }
+
+  private static <T> CompletableFuture<T> getOrFetch(AtomicReference<T> 
cacheRef,
+    AtomicReference<CompletableFuture<T>> futureRef, 
+    Supplier<CompletableFuture<T>> fetch, String type) {
+    for (;;) {
+      T cachedValue = cacheRef.get();
+      if (cachedValue != null) {
+        return CompletableFuture.completedFuture(cachedValue);
+      }
+      LOG.trace("{} cache is null, try fetching from registry", type);
+      if (futureRef.compareAndSet(null, new CompletableFuture<>())) {
+        LOG.debug("Start fetching {} from registry", type);
+        CompletableFuture<T> future = futureRef.get();
+        addListener(fetch.get(), (value, error) -> {
+          if (error != null) {
+            LOG.debug("Failed to fetch {} from registry", type, error);
+            futureRef.getAndSet(null).completeExceptionally(error);
+            return;
+          }
+          LOG.debug("The fetched {} is {}", type, value);
+          // Here we update cache before reset future, so it is possible that 
someone can get a
+          // stale value. Consider this:
+          // 1. update cacheRef
+          // 2. someone clears the cache and relocates again
+          // 3. the futureRef is not null so the old future is used.
+          // 4. we clear futureRef and complete the future in it with the 
value being
+          // cleared in step 2.
+          // But we do not think it is a big deal as it rarely happens, and 
even if it happens, the
+          // caller will retry again later, no correctness problems.
+          cacheRef.set(value);
+          futureRef.set(null);
+          future.complete(value);
+        });
+        return future;
+      } else {
+        CompletableFuture<T> future = futureRef.get();
+        if (future != null) {
+          return future;
+        }
+      }
+    }
+  }
+
+  public static CompletableFuture<List<HRegionLocation>> 
getAllMetaRegionLocations(
+    boolean excludeOfflinedSplitParents,
+    CompletableFuture<ClientMetaService.Interface> getStubFuture,
+    AtomicReference<ClientMetaService.Interface> stubRef,
+    RpcControllerFactory rpcControllerFactory, int callTimeoutMs) {

Review comment:
       Sure, that is fine.

##########
File path: 
hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistry.java
##########
@@ -18,21 +18,36 @@
 package org.apache.hadoop.hbase.client;
 
 import java.io.Closeable;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
  * Registry for meta information needed for connection setup to a HBase 
cluster. Implementations
- * hold cluster information such as this cluster's id, location of hbase:meta, 
etc..
- * Internal use only.
+ * hold cluster information such as this cluster's id, location of hbase:meta, 
etc.. Internal use
+ * only.
  */
 @InterfaceAudience.Private
 interface ConnectionRegistry extends Closeable {
 
+  /**
+   * Get location of meta region for the given {@code row}.
+   */
+  CompletableFuture<RegionLocations> locateMeta(byte[] row, RegionLocateType 
locateType);
+
+  /**
+   * Get all meta region locations, including the location of secondary 
regions.
+   * @param excludeOfflinedSplitParents whether to include split parent.
+   */
+  CompletableFuture<List<HRegionLocation>>

Review comment:
       I think it's fine to keep them separate. Logically it might make sense 
to combine them, but given that the RPC service interface also has the same 
separation, we can maintain it here for easy mapping. Yes, it is 1:1 with the 
RPC call, but it still looks simple. Anyway, I don't have a strong preference here.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to