ndimiduk commented on code in PR #5865:
URL: https://github.com/apache/hbase/pull/5865#discussion_r1604779421
##########
hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryURIFactory.java:
##########
@@ -39,4 +39,10 @@ public interface ConnectionRegistryURIFactory {
* {@link ConnectionRegistryFactory}.
*/
String getScheme();
+
+ /**
+ * Validate the given {@code uri}.
+ * @throws IOException if this is not a valid connection registry URI.
+ */
+ void validate(URI uri) throws IOException;
Review Comment:
Yeah, I understand. This enforces try-catch flow control where if/else would
be better. For this kind of thing I really like the `Result`-based APIs that
are catching on in other languages.
Anyway, maybe it should throw something other than `IOException`?
`IllegalStateException`, for example?
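
A `Result`-style shape for this API could look like the sketch below. This is purely illustrative of the comment's suggestion, not part of the patch: the type name and methods are hypothetical, and validation outcomes become values that callers branch on with if/else instead of try-catch.

```java
import java.util.Optional;

// Hypothetical Result-style alternative to a throwing validate(URI).
// The failure reason is carried as a value instead of an exception.
final class ValidationResult {
  private final String error; // null means the URI validated cleanly

  private ValidationResult(String error) {
    this.error = error;
  }

  static ValidationResult ok() {
    return new ValidationResult(null);
  }

  static ValidationResult invalid(String reason) {
    return new ValidationResult(reason);
  }

  boolean isValid() {
    return error == null;
  }

  Optional<String> reason() {
    return Optional.ofNullable(error);
  }
}
```

A caller would then write `ValidationResult r = factory.validate(uri); if (r.isValid()) { ... } else { LOG.warn(r.reason().get()); }`, keeping exceptions for genuinely exceptional conditions.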
##########
hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcConnectionRegistryURIFactory.java:
##########
@@ -19,6 +19,7 @@
import java.io.IOException;
import java.net.URI;
+import org.apache.commons.lang3.StringUtils;
Review Comment:
Thanks.
##########
hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java:
##########
@@ -402,6 +407,57 @@ public void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationExcepti
     queueStorage.removePeerFromHFileRefs(peerId);
   }
+  private void checkClusterKey(String clusterKey, ReplicationEndpoint endpoint)
+    throws DoNotRetryIOException {
+    if (endpoint != null && !(endpoint instanceof HBaseReplicationEndpoint)) {
+      return;
+    }
+    // Endpoints implementing HBaseReplicationEndpoint need to check cluster key
+    URI connectionUri = ConnectionRegistryFactory.tryParseAsConnectionURI(clusterKey);
+    try {
+      if (connectionUri != null) {
+        ConnectionRegistryFactory.validate(connectionUri);
+      } else {
+        ZKConfig.validateClusterKey(clusterKey);
+      }
+    } catch (IOException e) {
+      throw new DoNotRetryIOException("Invalid cluster key: " + clusterKey, e);
+    }
+    if (endpoint != null && endpoint.canReplicateToSameCluster()) {
+      return;
+    }
+    // make sure we do not replicate to same cluster
+    String peerClusterId;
+    try {
+      if (connectionUri != null) {
+        // fetch cluster id through standard admin API
+        try (Connection conn = ConnectionFactory.createConnection(connectionUri, conf);
+          Admin admin = conn.getAdmin()) {
+          peerClusterId =
+            admin.getClusterMetrics(EnumSet.of(ClusterMetrics.Option.CLUSTER_ID)).getClusterId();
Review Comment:
That would be nice, but I guess not urgent.
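
The dispatch under discussion hinges on `tryParseAsConnectionURI` distinguishing scheme-carrying connection URIs from legacy ZooKeeper cluster keys. A rough sketch of that idea, under the assumption that the deciding signal is the presence of a URI scheme (the class, method, and scheme strings here are illustrative, not the HBase implementation):

```java
import java.net.URI;
import java.net.URISyntaxException;

// Illustrative sketch only: classify a peer "cluster key" string as either a
// scheme-carrying connection URI (e.g. "hbase+rpc://...") or a legacy
// ZooKeeper-style cluster key ("host1,host2:2181:/hbase").
public class ConnectionUriSketch {
  public static String classify(String clusterKey) {
    try {
      URI uri = new URI(clusterKey);
      if (uri.getScheme() != null) {
        // a scheme is present, so treat it as a connection URI
        return "uri:" + uri.getScheme();
      }
    } catch (URISyntaxException e) {
      // not parseable as a URI at all; fall through to the legacy path
    }
    return "legacy-cluster-key";
  }

  public static void main(String[] args) {
    System.out.println(classify("hbase+rpc://server1:16020"));
    System.out.println(classify("host1,host2:2181:/hbase"));
  }
}
```

The legacy form never survives as a URI with a scheme (the comma in the host list rules one out), which is what lets the two code paths coexist.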
##########
hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtil.java:
##########
@@ -3216,6 +3216,24 @@ public static <T> String safeGetAsStr(List<T> lst, int i) {
}
}
+ public String getRpcConnnectionURI() throws UnknownHostException {
Review Comment:
okay.
##########
hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReservoirSample.java:
##########
@@ -41,7 +41,7 @@ public class ReservoirSample<T> {
   private int n;
   public ReservoirSample(int k) {
-    Preconditions.checkArgument(k > 0, "negative sampling number(%d) is not allowed");
+    Preconditions.checkArgument(k > 0, "negative sampling number(%s) is not allowed", k);
Review Comment:
Yes I guess so.
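
For context on why the fix matters: Guava's `Preconditions.checkArgument` substitutes only `%s` placeholders into its message template (it does not go through `String.format`), so the original `%d`, with no argument passed, could never render the rejected value. A minimal sketch of that substitution behavior (an illustration, not Guava's actual code):

```java
// Mimics Guava-style message templating: only "%s" is substituted,
// each placeholder consuming the next argument in order.
public class PreconditionsSketch {
  public static String format(String template, Object... args) {
    StringBuilder sb = new StringBuilder();
    int i = 0;
    int argIdx = 0;
    while (i < template.length()) {
      int p = template.indexOf("%s", i);
      if (p == -1 || argIdx >= args.length) {
        // no more placeholders (or no more args): copy the tail verbatim,
        // which is exactly what happens to a "%d" placeholder
        sb.append(template, i, template.length());
        break;
      }
      sb.append(template, i, p).append(args[argIdx++]);
      i = p + 2;
    }
    return sb.toString();
  }

  public static void checkArgument(boolean expression, String template, Object... args) {
    if (!expression) {
      throw new IllegalArgumentException(format(template, args));
    }
  }

  public static void main(String[] args) {
    try {
      // mirrors the fixed call site: the value is now part of the message
      checkArgument(-3 > 0, "negative sampling number(%s) is not allowed", -3);
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage()); // prints "negative sampling number(-3) is not allowed"
    }
  }
}
```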
##########
hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplication.java:
##########
@@ -86,6 +86,14 @@ public class TestVerifyReplication extends TestReplicationBase {
   @Rule
   public TestName name = new TestName();
+  @Override
+  protected String getClusterKey(HBaseTestingUtil util) throws Exception {
+    // TODO: VerifyReplication does not support connection uri yet, so here we need to use cluster
Review Comment:
thanks!
##########
hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java:
##########
@@ -166,8 +175,8 @@ public void onConfigurationChange(Configuration conf) {
     for (ReplicationPeerImpl peer : peerCache.values()) {
       try {
         peer.onConfigurationChange(
-          ReplicationUtils.getPeerClusterConfiguration(peer.getPeerConfig(), conf));
-      } catch (ReplicationException e) {
+          ReplicationPeerConfigUtil.getPeerClusterConfiguration(conf, peer.getPeerConfig()));
+      } catch (IOException e) {
         LOG.warn("failed to reload configuration for peer {}", peer.getId(), e);
Review Comment:
Yes true, the RS should not abort.
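
The pattern the diff lands on (catch per peer, warn, keep going) is what keeps one bad peer configuration from taking the regionserver down. A generic sketch of that resilience pattern, with illustrative stand-in types rather than the HBase classes:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Sketch: apply a configuration change to every peer, catching failures
// per peer so a single bad peer config is logged (collected here) and the
// remaining peers are still reloaded, instead of aborting the process.
public class PerPeerReloadSketch {
  public static List<String> reloadAll(List<String> peerIds, Consumer<String> reload) {
    List<String> failed = new ArrayList<>();
    for (String peerId : peerIds) {
      try {
        reload.accept(peerId);
      } catch (RuntimeException e) {
        // log-and-continue: record the failure, keep serving other peers
        failed.add(peerId);
      }
    }
    return failed;
  }
}
```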
##########
hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java:
##########
@@ -402,6 +407,57 @@ public void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationExcepti
     queueStorage.removePeerFromHFileRefs(peerId);
   }
+  private void checkClusterKey(String clusterKey, ReplicationEndpoint endpoint)
+    throws DoNotRetryIOException {
+    if (endpoint != null && !(endpoint instanceof HBaseReplicationEndpoint)) {
+      return;
+    }
+    // Endpoints implementing HBaseReplicationEndpoint need to check cluster key
+    URI connectionUri = ConnectionRegistryFactory.tryParseAsConnectionURI(clusterKey);
+    try {
+      if (connectionUri != null) {
+        ConnectionRegistryFactory.validate(connectionUri);
+      } else {
+        ZKConfig.validateClusterKey(clusterKey);
+      }
+    } catch (IOException e) {
+      throw new DoNotRetryIOException("Invalid cluster key: " + clusterKey, e);
+    }
+    if (endpoint != null && endpoint.canReplicateToSameCluster()) {
+      return;
+    }
+    // make sure we do not replicate to same cluster
+    String peerClusterId;
+    try {
+      if (connectionUri != null) {
+        // fetch cluster id through standard admin API
+        try (Connection conn = ConnectionFactory.createConnection(connectionUri, conf);
+          Admin admin = conn.getAdmin()) {
+          peerClusterId =
+            admin.getClusterMetrics(EnumSet.of(ClusterMetrics.Option.CLUSTER_ID)).getClusterId();
+        }
+      } else {
+        // Create the peer cluster config for get peer cluster id
+        Configuration peerConf = HBaseConfiguration.createClusterConf(conf, clusterKey);
+        try (ZKWatcher zkWatcher = new ZKWatcher(peerConf, this + "check-peer-cluster-id", null)) {
+          peerClusterId = ZKClusterId.readClusterIdZNode(zkWatcher);
+        }
+      }
+    } catch (IOException | KeeperException e) {
+      // we just want to check whether we will replicate to the same cluster, so if we get an error
+      // while getting the cluster id of the peer cluster, it means we are not connecting to
+      // ourselves, as we are still alive. So here we just log the error and continue
Review Comment:
_nod_
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]