bharathv commented on a change in pull request #2071:
URL: https://github.com/apache/hbase/pull/2071#discussion_r455457366
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
##########
@@ -337,26 +347,26 @@ public void removeAllQueuesAndHFileRefs(String peerId)
throws ReplicationExcepti
private void checkPeerConfig(ReplicationPeerConfig peerConfig) throws
DoNotRetryIOException {
String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl();
- boolean checkClusterKey = true;
+ ReplicationEndpoint endpoint = null;
if (!StringUtils.isBlank(replicationEndpointImpl)) {
- // try creating a instance
- ReplicationEndpoint endpoint;
try {
+ // try creating a instance
endpoint = Class.forName(replicationEndpointImpl)
.asSubclass(ReplicationEndpoint.class).getDeclaredConstructor().newInstance();
} catch (Throwable e) {
throw new DoNotRetryIOException(
"Can not instantiate configured replication endpoint class=" +
replicationEndpointImpl,
e);
}
- // do not check cluster key if we are not
HBaseInterClusterReplicationEndpoint
- if (!(endpoint instanceof HBaseInterClusterReplicationEndpoint)) {
- checkClusterKey = false;
- }
}
- if (checkClusterKey) {
+ // Default is HBaseInterClusterReplicationEndpoint and only it need to
check cluster key
+ if (endpoint == null || endpoint instanceof
HBaseInterClusterReplicationEndpoint) {
checkClusterKey(peerConfig.getClusterKey());
}
+ // Default is HBaseInterClusterReplicationEndpoint which cannot replicate
to same cluster
+ if (endpoint == null || !endpoint.canReplicateToSameCluster()) {
Review comment:
nit: same question as above..null check on endpoint.
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
##########
@@ -493,12 +499,35 @@ private void
checkConfiguredWALEntryFilters(ReplicationPeerConfig peerConfig)
}
}
- private void checkClusterKey(String clusterKey) throws DoNotRetryIOException
{
+ private void checkClusterKeyForHBaseInterClusterReplicationEndpoint(String
clusterKey)
+ throws DoNotRetryIOException {
try {
ZKConfig.validateClusterKey(clusterKey);
} catch (IOException e) {
throw new DoNotRetryIOException("Invalid cluster key: " + clusterKey, e);
}
+ String peerClusterId = "";
Review comment:
oh sorry my bad, I misread the code.
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
##########
@@ -337,26 +347,26 @@ public void removeAllQueuesAndHFileRefs(String peerId)
throws ReplicationExcepti
private void checkPeerConfig(ReplicationPeerConfig peerConfig) throws
DoNotRetryIOException {
String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl();
- boolean checkClusterKey = true;
+ ReplicationEndpoint endpoint = null;
Review comment:
nit = null is redundant.
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
##########
@@ -501,6 +511,32 @@ private void checkClusterKey(String clusterKey) throws
DoNotRetryIOException {
}
}
+ private void checkClusterId(String clusterKey) throws DoNotRetryIOException {
+ String peerClusterId = "";
+ try {
+ // Create the peer cluster config for get peer cluster id
+ Configuration peerConf = HBaseConfiguration.createClusterConf(conf,
clusterKey);
+ ZKWatcher zkWatcher =
+ new ZKWatcher(peerConf, this + "check-peer-cluster-key", new
Abortable() {
+ @Override public void abort(String why, Throwable e) {
+ }
+
+ @Override public boolean isAborted() {
+ return false;
+ }
+ });
+ peerClusterId = ZKClusterId.readClusterIdZNode(zkWatcher);
+ } catch (IOException | KeeperException e) {
+ throw new DoNotRetryIOException("Can't get peerClusterId for
clusterKey=" + clusterKey, e);
+ }
Review comment:
We want to close() the zkWatcher? Otherwise there is a connection leak?
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
##########
@@ -501,6 +511,32 @@ private void checkClusterKey(String clusterKey) throws
DoNotRetryIOException {
}
}
+ private void checkClusterId(String clusterKey) throws DoNotRetryIOException {
+ String peerClusterId = "";
+ try {
+ // Create the peer cluster config for get peer cluster id
+ Configuration peerConf = HBaseConfiguration.createClusterConf(conf,
clusterKey);
+ ZKWatcher zkWatcher =
+ new ZKWatcher(peerConf, this + "check-peer-cluster-key", new
Abortable() {
+ @Override public void abort(String why, Throwable e) {
+ }
+
+ @Override public boolean isAborted() {
+ return false;
+ }
+ });
+ peerClusterId = ZKClusterId.readClusterIdZNode(zkWatcher);
+ } catch (IOException | KeeperException e) {
+ throw new DoNotRetryIOException("Can't get peerClusterId for
clusterKey=" + clusterKey, e);
+ }
+ // In rare case, zookeeper setting may be messed up. That leads to the
incorrect
+ // peerClusterId value, which is the same as the source clusterId
+ if (clusterId.equals(peerClusterId)) {
Review comment:
Actually, now that I read the code again, why not use
ReplicationEndpoint#getPeerUUID().toString(). That wraps the whole logic?
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
##########
@@ -501,6 +511,32 @@ private void checkClusterKey(String clusterKey) throws
DoNotRetryIOException {
}
}
+ private void checkClusterId(String clusterKey) throws DoNotRetryIOException {
+ String peerClusterId = "";
+ try {
+ // Create the peer cluster config for get peer cluster id
+ Configuration peerConf = HBaseConfiguration.createClusterConf(conf,
clusterKey);
+ ZKWatcher zkWatcher =
+ new ZKWatcher(peerConf, this + "check-peer-cluster-key", new
Abortable() {
Review comment:
Looks like ZKWatcher is null-safe with abortable. Since we are not
actually relying on it, just pass null?
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
##########
@@ -337,25 +347,21 @@ public void removeAllQueuesAndHFileRefs(String peerId)
throws ReplicationExcepti
private void checkPeerConfig(ReplicationPeerConfig peerConfig) throws
DoNotRetryIOException {
String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl();
- boolean checkClusterKey = true;
+ ReplicationEndpoint endpoint = null;
if (!StringUtils.isBlank(replicationEndpointImpl)) {
- // try creating a instance
- ReplicationEndpoint endpoint;
try {
+ // try creating a instance
endpoint = Class.forName(replicationEndpointImpl)
.asSubclass(ReplicationEndpoint.class).getDeclaredConstructor().newInstance();
} catch (Throwable e) {
throw new DoNotRetryIOException(
"Can not instantiate configured replication endpoint class=" +
replicationEndpointImpl,
e);
}
- // do not check cluster key if we are not
HBaseInterClusterReplicationEndpoint
- if (!(endpoint instanceof HBaseInterClusterReplicationEndpoint)) {
- checkClusterKey = false;
- }
}
- if (checkClusterKey) {
- checkClusterKey(peerConfig.getClusterKey());
+ // Default is HBaseInterClusterReplicationEndpoint and only it need to
check cluster key
+ if (endpoint == null || endpoint instanceof
HBaseInterClusterReplicationEndpoint) {
Review comment:
Thanks. Why have endpoint == null check? Ideally it shouldn't be null?
(if we are past the try-catch)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]