bharathv commented on a change in pull request #2095:
URL: https://github.com/apache/hbase/pull/2095#discussion_r456974920
##########
File path:
hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistry.java
##########
@@ -18,21 +18,36 @@
package org.apache.hadoop.hbase.client;
import java.io.Closeable;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Registry for meta information needed for connection setup to a HBase
cluster. Implementations
- * hold cluster information such as this cluster's id, location of hbase:meta,
etc..
- * Internal use only.
+ * hold cluster information such as this cluster's id, location of hbase:meta,
etc.. Internal use
+ * only.
*/
@InterfaceAudience.Private
interface ConnectionRegistry extends Closeable {
+ /**
+ * Get location of meta region for the given {@code row}.
+ */
+ CompletableFuture<RegionLocations> locateMeta(byte[] row, RegionLocateType
locateType);
+
+ /**
+ * Get all meta region locations, including the location of secondary
regions.
+ * @param excludeOfflinedSplitParents whether to include split parent.
+ */
+ CompletableFuture<List<HRegionLocation>>
Review comment:
Should we consider merging both of the APIs that fetch meta locations
into a single RPC call to keep this interface simple and clean? Right now it
maps 1:1 to the requirements of the RegionLocator API.
##########
File path:
hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java
##########
@@ -59,9 +74,29 @@
private final ZNodePaths znodePaths;
- ZKConnectionRegistry(Configuration conf) {
+ private final AtomicReference<Interface> stub = new AtomicReference<>();
Review comment:
nit: call it cachedStub or something?
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetaLocationCache.java
##########
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import static org.apache.hadoop.hbase.client.ConnectionUtils.locateRow;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.locateRowBefore;
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionLocateType;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import
org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+
+/**
+ * A cache of meta region locations.
+ */
+@InterfaceAudience.Private
+class MetaLocationCache implements Stoppable {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(MetaLocationCache.class);
+
+ @VisibleForTesting
+ static final String SYNC_INTERVAL_SECONDS =
+ "hbase.master.meta-location-cache.sync-interval-seconds";
+
+ // default sync every 1 second.
+ @VisibleForTesting
+ static final int DEFAULT_SYNC_INTERVAL_SECONDS = 1;
+
+ private static final String FETCH_TIMEOUT_MS =
+ "hbase.master.meta-location-cache.fetch-timeout-ms";
+
+ // default timeout 1 second
+ private static final int DEFAULT_FETCH_TIMEOUT_MS = 1000;
+
+ private static final class CacheHolder {
+
+ final NavigableMap<byte[], RegionLocations> cache;
+
+ final List<HRegionLocation> all;
+
+ CacheHolder(List<HRegionLocation> all) {
+ this.all = Collections.unmodifiableList(all);
+ NavigableMap<byte[], SortedSet<HRegionLocation>> startKeyToLocs =
+ new TreeMap<>(Bytes.BYTES_COMPARATOR);
+ for (HRegionLocation loc : all) {
+ if (loc.getRegion().isSplitParent()) {
+ continue;
+ }
+ startKeyToLocs.computeIfAbsent(loc.getRegion().getStartKey(),
+ k -> new TreeSet<>((l1, l2) ->
l1.getRegion().compareTo(l2.getRegion()))).add(loc);
+ }
+ this.cache =
startKeyToLocs.entrySet().stream().collect(Collectors.collectingAndThen(
+ Collectors.toMap(Map.Entry::getKey, e -> new
RegionLocations(e.getValue()), (u, v) -> {
+ throw new IllegalStateException();
+ }, () -> new TreeMap<>(Bytes.BYTES_COMPARATOR)),
Collections::unmodifiableNavigableMap));
+ }
+ }
+
+ private volatile CacheHolder holder;
+
+ private volatile boolean stopped = false;
+
+ MetaLocationCache(MasterServices master) {
+ int syncIntervalSeconds =
+ master.getConfiguration().getInt(SYNC_INTERVAL_SECONDS,
DEFAULT_SYNC_INTERVAL_SECONDS);
+ int fetchTimeoutMs =
+ master.getConfiguration().getInt(FETCH_TIMEOUT_MS,
DEFAULT_FETCH_TIMEOUT_MS);
+ master.getChoreService().scheduleChore(new ScheduledChore(
+ getClass().getSimpleName() + "-Sync-Chore", this, syncIntervalSeconds,
0, TimeUnit.SECONDS) {
+
+ @Override
+ protected void chore() {
+ AsyncClusterConnection conn = master.getAsyncClusterConnection();
Review comment:
nit: conn reference can be cached once
##########
File path:
hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaRegionLocationCache.java
##########
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.MultithreadedTestUtil;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.master.HMaster;
-import org.apache.hadoop.hbase.master.MetaRegionLocationCache;
-import org.apache.hadoop.hbase.master.RegionState;
-import org.apache.hadoop.hbase.testclassification.MasterTests;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.apache.hadoop.hbase.util.JVMClusterUtil;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
-
-@Category({ SmallTests.class, MasterTests.class })
Review comment:
I think you can also remove the MetaRegionLocationCache implementation.
##########
File path:
hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java
##########
@@ -229,6 +264,45 @@ private void
getMetaRegionLocation(CompletableFuture<RegionLocations> future,
});
}
+ private CompletableFuture<Interface> getStub() {
+ return ConnectionUtils.getMasterStub(this, stub, stubMakeFuture,
rpcClient, user,
+ readRpcTimeoutNs, TimeUnit.NANOSECONDS, ClientMetaService::newStub,
"ClientMetaService");
+ }
+
+ @Override
+ public CompletableFuture<RegionLocations> locateMeta(byte[] row,
RegionLocateType locateType) {
+ CompletableFuture<RegionLocations> future = new CompletableFuture<>();
+ addListener(getStub(), (stub, error) -> {
+ if (error != null) {
+ future.completeExceptionally(error);
+ return;
+ }
+ HBaseRpcController controller = rpcControllerFactory.newController();
+ stub.locateMetaRegion(controller,
+ LocateMetaRegionRequest.newBuilder().setRow(ByteString.copyFrom(row))
+
.setLocateType(ProtobufUtil.toProtoRegionLocateType(locateType)).build(),
+ resp -> {
+ if (controller.failed()) {
+ IOException ex = controller.getFailed();
+ ConnectionUtils.tryClearMasterStubCache(ex, stub,
ZKConnectionRegistry.this.stub);
+ future.completeExceptionally(ex);
+ return;
+ }
+ RegionLocations locs = new
RegionLocations(resp.getMetaLocationsList().stream()
+ .map(ProtobufUtil::toRegionLocation).collect(Collectors.toList()));
+ future.complete(locs);
+ });
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<List<HRegionLocation>>
+ getAllMetaRegionLocations(boolean excludeOfflinedSplitParents) {
+ return
ConnectionUtils.getAllMetaRegionLocations(excludeOfflinedSplitParents,
getStub(), stub,
+ rpcControllerFactory, -1);
+ }
+
@Override
public void close() {
zk.close();
Review comment:
rpcClient.close()?
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
##########
@@ -2928,13 +2928,27 @@ public GetActiveMasterResponse
getActiveMaster(RpcController rpcController,
@Override
public GetMetaRegionLocationsResponse getMetaRegionLocations(RpcController
rpcController,
- GetMetaRegionLocationsRequest request) throws ServiceException {
- GetMetaRegionLocationsResponse.Builder response =
GetMetaRegionLocationsResponse.newBuilder();
- Optional<List<HRegionLocation>> metaLocations =
- master.getMetaRegionLocationCache().getMetaRegionLocations();
- metaLocations.ifPresent(hRegionLocations -> hRegionLocations.forEach(
- location ->
response.addMetaLocations(ProtobufUtil.toRegionLocation(location))));
- return response.build();
+ GetMetaRegionLocationsRequest request) throws ServiceException {
+ MetaLocationCache cache = master.getMetaLocationCache();
+ RegionLocations locs;
+ try {
+ if (cache != null) {
+ locs = cache.locateMeta(HConstants.EMPTY_BYTE_ARRAY,
RegionLocateType.CURRENT);
+ } else {
Review comment:
Why do we need the else part? It looks like the cache runs on all
masters?
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetaLocationCache.java
##########
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import static org.apache.hadoop.hbase.client.ConnectionUtils.locateRow;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.locateRowBefore;
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionLocateType;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import
org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+
+/**
+ * A cache of meta region locations.
+ */
+@InterfaceAudience.Private
+class MetaLocationCache implements Stoppable {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(MetaLocationCache.class);
+
+ @VisibleForTesting
+ static final String SYNC_INTERVAL_SECONDS =
+ "hbase.master.meta-location-cache.sync-interval-seconds";
+
+ // default sync every 1 second.
+ @VisibleForTesting
+ static final int DEFAULT_SYNC_INTERVAL_SECONDS = 1;
+
+ private static final String FETCH_TIMEOUT_MS =
+ "hbase.master.meta-location-cache.fetch-timeout-ms";
+
+ // default timeout 1 second
+ private static final int DEFAULT_FETCH_TIMEOUT_MS = 1000;
+
+ private static final class CacheHolder {
+
+ final NavigableMap<byte[], RegionLocations> cache;
+
+ final List<HRegionLocation> all;
+
+ CacheHolder(List<HRegionLocation> all) {
+ this.all = Collections.unmodifiableList(all);
+ NavigableMap<byte[], SortedSet<HRegionLocation>> startKeyToLocs =
+ new TreeMap<>(Bytes.BYTES_COMPARATOR);
+ for (HRegionLocation loc : all) {
+ if (loc.getRegion().isSplitParent()) {
+ continue;
+ }
+ startKeyToLocs.computeIfAbsent(loc.getRegion().getStartKey(),
+ k -> new TreeSet<>((l1, l2) ->
l1.getRegion().compareTo(l2.getRegion()))).add(loc);
+ }
+ this.cache =
startKeyToLocs.entrySet().stream().collect(Collectors.collectingAndThen(
+ Collectors.toMap(Map.Entry::getKey, e -> new
RegionLocations(e.getValue()), (u, v) -> {
+ throw new IllegalStateException();
+ }, () -> new TreeMap<>(Bytes.BYTES_COMPARATOR)),
Collections::unmodifiableNavigableMap));
+ }
+ }
+
+ private volatile CacheHolder holder;
+
+ private volatile boolean stopped = false;
+
+ MetaLocationCache(MasterServices master) {
+ int syncIntervalSeconds =
+ master.getConfiguration().getInt(SYNC_INTERVAL_SECONDS,
DEFAULT_SYNC_INTERVAL_SECONDS);
+ int fetchTimeoutMs =
+ master.getConfiguration().getInt(FETCH_TIMEOUT_MS,
DEFAULT_FETCH_TIMEOUT_MS);
+ master.getChoreService().scheduleChore(new ScheduledChore(
+ getClass().getSimpleName() + "-Sync-Chore", this, syncIntervalSeconds,
0, TimeUnit.SECONDS) {
+
+ @Override
+ protected void chore() {
+ AsyncClusterConnection conn = master.getAsyncClusterConnection();
+ if (conn != null) {
+ addListener(conn.getAllMetaRegionLocations(fetchTimeoutMs), (locs,
error) -> {
+ if (error != null) {
+ LOG.warn("Failed to fetch all meta region locations from active
master", error);
+ return;
+ }
+ CacheHolder ch = new CacheHolder(locs);
+ holder = ch;
Review comment:
I believe that's for thread-safety of holder
##########
File path:
hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
##########
@@ -652,4 +626,160 @@ static void setCoprocessorError(RpcController controller,
Throwable error) {
controller.setFailed(error.toString());
}
}
+
+ public static RegionLocations locateRow(NavigableMap<byte[],
RegionLocations> cache,
+ TableName tableName, byte[] row, int replicaId) {
+ Map.Entry<byte[], RegionLocations> entry = cache.floorEntry(row);
+ if (entry == null) {
+ return null;
+ }
+ RegionLocations locs = entry.getValue();
+ HRegionLocation loc = locs.getRegionLocation(replicaId);
+ if (loc == null) {
+ return null;
+ }
+ byte[] endKey = loc.getRegion().getEndKey();
+ if (isEmptyStopRow(endKey) || Bytes.compareTo(row, endKey) < 0) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Found {} in cache for {}, row='{}', locateType={},
replicaId={}", loc, tableName,
+ Bytes.toStringBinary(row), RegionLocateType.CURRENT, replicaId);
+ }
+ return locs;
+ } else {
+ return null;
+ }
+ }
+
+ public static RegionLocations locateRowBefore(NavigableMap<byte[],
RegionLocations> cache,
+ TableName tableName, byte[] row, int replicaId) {
+ boolean isEmptyStopRow = isEmptyStopRow(row);
+ Map.Entry<byte[], RegionLocations> entry =
+ isEmptyStopRow ? cache.lastEntry() : cache.lowerEntry(row);
+ if (entry == null) {
+ return null;
+ }
+ RegionLocations locs = entry.getValue();
+ HRegionLocation loc = locs.getRegionLocation(replicaId);
+ if (loc == null) {
+ return null;
+ }
+ if (isEmptyStopRow(loc.getRegion().getEndKey()) ||
+ (!isEmptyStopRow && Bytes.compareTo(loc.getRegion().getEndKey(), row) >=
0)) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Found {} in cache for {}, row='{}', locateType={},
replicaId={}", loc, tableName,
+ Bytes.toStringBinary(row), RegionLocateType.BEFORE, replicaId);
+ }
+ return locs;
+ } else {
+ return null;
+ }
+ }
+
+ public static void tryClearMasterStubCache(IOException error,
+ ClientMetaService.Interface currentStub,
AtomicReference<ClientMetaService.Interface> stub) {
+ if (ClientExceptionsUtil.isConnectionException(error) ||
+ error instanceof ServerNotRunningYetException) {
+ stub.compareAndSet(currentStub, null);
+ }
+ }
+
+ public static <T> CompletableFuture<T> getMasterStub(ConnectionRegistry
registry,
+ AtomicReference<T> stub, AtomicReference<CompletableFuture<T>>
stubMakeFuture,
+ RpcClient rpcClient, User user, long rpcTimeout, TimeUnit unit,
+ Function<RpcChannel, T> stubMaker, String type) {
+ return getOrFetch(stub, stubMakeFuture, () -> {
+ CompletableFuture<T> future = new CompletableFuture<>();
+ addListener(registry.getActiveMaster(), (addr, error) -> {
Review comment:
I liked @saintstack's idea in the design doc where we can ask the active
master for the list of available masters and load balance the RPCs. I think
that can be used both here (to randomize the master we are talking to) and in
the master registry to always maintain a fresh list of masters and only use the
initial list of masters as a seed input.
(I can quickly add that feature if everyone is okay with it).
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetaLocationCache.java
##########
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import static org.apache.hadoop.hbase.client.ConnectionUtils.locateRow;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.locateRowBefore;
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionLocateType;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import
org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+
+/**
+ * A cache of meta region locations.
+ */
+@InterfaceAudience.Private
+class MetaLocationCache implements Stoppable {
Review comment:
Do we need any tests to cover the sync / timeout behavior etc. of this cache?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]