virajjasani commented on a change in pull request #2095:
URL: https://github.com/apache/hbase/pull/2095#discussion_r457098506



##########
File path: 
hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
##########
@@ -652,4 +626,160 @@ static void setCoprocessorError(RpcController controller, 
Throwable error) {
       controller.setFailed(error.toString());
     }
   }
+
+  public static RegionLocations locateRow(NavigableMap<byte[], 
RegionLocations> cache,
+    TableName tableName, byte[] row, int replicaId) {
+    Map.Entry<byte[], RegionLocations> entry = cache.floorEntry(row);
+    if (entry == null) {
+      return null;
+    }
+    RegionLocations locs = entry.getValue();
+    HRegionLocation loc = locs.getRegionLocation(replicaId);
+    if (loc == null) {
+      return null;
+    }
+    byte[] endKey = loc.getRegion().getEndKey();
+    if (isEmptyStopRow(endKey) || Bytes.compareTo(row, endKey) < 0) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Found {} in cache for {}, row='{}', locateType={}, 
replicaId={}", loc, tableName,
+          Bytes.toStringBinary(row), RegionLocateType.CURRENT, replicaId);
+      }
+      return locs;
+    } else {
+      return null;
+    }
+  }
+
+  public static RegionLocations locateRowBefore(NavigableMap<byte[], 
RegionLocations> cache,
+    TableName tableName, byte[] row, int replicaId) {
+    boolean isEmptyStopRow = isEmptyStopRow(row);
+    Map.Entry<byte[], RegionLocations> entry =
+      isEmptyStopRow ? cache.lastEntry() : cache.lowerEntry(row);
+    if (entry == null) {
+      return null;
+    }
+    RegionLocations locs = entry.getValue();
+    HRegionLocation loc = locs.getRegionLocation(replicaId);
+    if (loc == null) {
+      return null;
+    }
+    if (isEmptyStopRow(loc.getRegion().getEndKey()) ||
+      (!isEmptyStopRow && Bytes.compareTo(loc.getRegion().getEndKey(), row) >= 
0)) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Found {} in cache for {}, row='{}', locateType={}, 
replicaId={}", loc, tableName,
+          Bytes.toStringBinary(row), RegionLocateType.BEFORE, replicaId);
+      }
+      return locs;
+    } else {
+      return null;
+    }
+  }
+
+  public static void tryClearMasterStubCache(IOException error,
+    ClientMetaService.Interface currentStub, 
AtomicReference<ClientMetaService.Interface> stub) {
+    if (ClientExceptionsUtil.isConnectionException(error) ||
+      error instanceof ServerNotRunningYetException) {
+      stub.compareAndSet(currentStub, null);
+    }
+  }
+
+  public static <T> CompletableFuture<T> getMasterStub(ConnectionRegistry 
registry,
+    AtomicReference<T> stub, AtomicReference<CompletableFuture<T>> 
stubMakeFuture,
+    RpcClient rpcClient, User user, long rpcTimeout, TimeUnit unit,
+    Function<RpcChannel, T> stubMaker, String type) {
+    return getOrFetch(stub, stubMakeFuture, () -> {
+      CompletableFuture<T> future = new CompletableFuture<>();
+      addListener(registry.getActiveMaster(), (addr, error) -> {
+        if (error != null) {
+          future.completeExceptionally(error);
+        } else if (addr == null) {
+          future.completeExceptionally(new MasterNotRunningException(
+            "ZooKeeper available but no active master location found"));
+        } else {
+          LOG.debug("The fetched master address is {}", addr);
+          try {
+            future.complete(stubMaker.apply(
+              rpcClient.createRpcChannel(addr, user, 
toIntNoOverflow(unit.toMillis(rpcTimeout)))));
+          } catch (IOException e) {
+            future.completeExceptionally(e);
+          }
+        }
+
+      });
+      return future;
+    }, type);
+  }
+
+  private static <T> CompletableFuture<T> getOrFetch(AtomicReference<T> 
cacheRef,
+    AtomicReference<CompletableFuture<T>> futureRef, 
+    Supplier<CompletableFuture<T>> fetch, String type) {
+    for (;;) {
+      T cachedValue = cacheRef.get();
+      if (cachedValue != null) {
+        return CompletableFuture.completedFuture(cachedValue);
+      }
+      LOG.trace("{} cache is null, try fetching from registry", type);
+      if (futureRef.compareAndSet(null, new CompletableFuture<>())) {
+        LOG.debug("Start fetching {} from registry", type);
+        CompletableFuture<T> future = futureRef.get();
+        addListener(fetch.get(), (value, error) -> {
+          if (error != null) {
+            LOG.debug("Failed to fetch {} from registry", type, error);
+            futureRef.getAndSet(null).completeExceptionally(error);

Review comment:
       With futureRef.compareAndSet(null, val), we are good here.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to