This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new 26c4e86 HBASE-25976 Implement a master based ReplicationTracker (#3390)
26c4e86 is described below
commit 26c4e86ebdc42deff7cb22424fc58b22f3614815
Author: Duo Zhang <[email protected]>
AuthorDate: Thu Jun 17 18:24:49 2021 +0800
HBASE-25976 Implement a master based ReplicationTracker (#3390)
Signed-off-by: Bharath Vissapragada <[email protected]>
---
hbase-replication/pom.xml | 5 +
.../replication/MasterReplicationTracker.java | 103 ++++++++
.../hbase/replication/ReplicationFactory.java | 17 +-
.../hbase/replication/ReplicationListener.java | 3 +-
.../hbase/replication/ReplicationTracker.java | 33 ++-
.../hbase/replication/ReplicationTrackerBase.java | 72 ++++++
.../replication/ReplicationTrackerParams.java | 98 ++++++++
...rackerZKImpl.java => ZKReplicationTracker.java} | 102 +++-----
.../replication/ReplicationTrackerTestBase.java | 110 +++++++++
.../replication/TestMasterReplicationTracker.java | 87 +++++++
.../replication/TestZKReplicationPeerStorage.java | 30 ++-
.../replication/TestZKReplicationTracker.java | 84 +++++++
.../regionserver/DumpReplicationQueues.java | 9 +-
.../replication/regionserver/Replication.java | 7 +-
.../regionserver/ReplicationSourceManager.java | 21 +-
.../replication/TestReplicationTrackerZKImpl.java | 270 ---------------------
16 files changed, 685 insertions(+), 366 deletions(-)
diff --git a/hbase-replication/pom.xml b/hbase-replication/pom.xml
index 7295617..60bb16d 100644
--- a/hbase-replication/pom.xml
+++ b/hbase-replication/pom.xml
@@ -121,6 +121,11 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
<scope>test</scope>
diff --git
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/MasterReplicationTracker.java
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/MasterReplicationTracker.java
new file mode 100644
index 0000000..c55a82e
--- /dev/null
+++
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/MasterReplicationTracker.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.hbase.ChoreService;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link ReplicationTracker} implementation which polls the region servers list periodically from
+ * master.
+ */
+@InterfaceAudience.Private
+class MasterReplicationTracker extends ReplicationTrackerBase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MasterReplicationTracker.class);
+
+ static final String REFRESH_INTERVAL_SECONDS =
+ "hbase.replication.tracker.master.refresh.interval.secs";
+
+ // default to refresh every 5 seconds
+ static final int REFRESH_INTERVAL_SECONDS_DEFAULT = 5;
+
+ private final ChoreService choreService;
+
+ private final ScheduledChore chore;
+
+ private final Admin admin;
+
+ private volatile Set<ServerName> regionServers;
+
+ MasterReplicationTracker(ReplicationTrackerParams params) {
+ try {
+ this.admin = params.connection().getAdmin();
+ } catch (IOException e) {
+ // should not happen
+ throw new AssertionError(e);
+ }
+ this.choreService = params.choreService();
+ int refreshIntervalSecs =
+ params.conf().getInt(REFRESH_INTERVAL_SECONDS, REFRESH_INTERVAL_SECONDS_DEFAULT);
+ this.chore = new ScheduledChore(getClass().getSimpleName(), params.stopptable(),
+ refreshIntervalSecs, 0, TimeUnit.SECONDS) {
+
+ @Override
+ protected void chore() {
+ try {
+ refresh();
+ } catch (IOException e) {
+ LOG.warn("failed to refresh region server list for replication", e);
+ }
+ }
+ };
+ }
+
+ private Set<ServerName> reload() throws IOException {
+ return Collections.unmodifiableSet(new HashSet<>(admin.getRegionServers()));
+
+ }
+
+ private void refresh() throws IOException {
+ Set<ServerName> newRegionServers = reload();
+ for (ServerName oldRs : regionServers) {
+ if (!newRegionServers.contains(oldRs)) {
+ notifyListeners(oldRs);
+ }
+ }
+ this.regionServers = newRegionServers;
+ }
+
+ @Override
+ protected Set<ServerName> internalLoadLiveRegionServersAndInitializeListeners()
+ throws IOException {
+ Set<ServerName> newRegionServers = reload();
+ this.regionServers = newRegionServers;
+ choreService.scheduleChore(chore);
+ return newRegionServers;
+ }
+}
diff --git
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
index 2a970ba..b4d33d6 100644
---
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
+++
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
@@ -18,8 +18,7 @@
package org.apache.hadoop.hbase.replication;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
@@ -27,14 +26,20 @@ import org.apache.yetus.audience.InterfaceAudience;
* A factory class for instantiating replication objects that deal with replication state.
*/
@InterfaceAudience.Private
-public class ReplicationFactory {
+public final class ReplicationFactory {
+
+ public static final String REPLICATION_TRACKER_IMPL = "hbase.replication.tracker.impl";
+
+ private ReplicationFactory() {
+ }
public static ReplicationPeers getReplicationPeers(ZKWatcher zk, Configuration conf) {
return new ReplicationPeers(zk, conf);
}
- public static ReplicationTracker getReplicationTracker(ZKWatcher zookeeper, Abortable abortable,
- Stoppable stopper) {
- return new ReplicationTrackerZKImpl(zookeeper, abortable, stopper);
+ public static ReplicationTracker getReplicationTracker(ReplicationTrackerParams params) {
+ Class<? extends ReplicationTracker> clazz = params.conf().getClass(REPLICATION_TRACKER_IMPL,
+ ZKReplicationTracker.class, ReplicationTracker.class);
+ return ReflectionUtils.newInstance(clazz, params);
}
}
diff --git
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java
index f040bf9..5c21e1e 100644
---
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java
+++
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java
@@ -18,6 +18,7 @@
*/
package org.apache.hadoop.hbase.replication;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.yetus.audience.InterfaceAudience;
/**
@@ -33,5 +34,5 @@ public interface ReplicationListener {
* A region server has been removed from the local cluster
* @param regionServer the removed region server
*/
- public void regionServerRemoved(String regionServer);
+ public void regionServerRemoved(ServerName regionServer);
}
diff --git
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTracker.java
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTracker.java
index 9370226..1d59401 100644
---
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTracker.java
+++
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTracker.java
@@ -18,18 +18,20 @@
*/
package org.apache.hadoop.hbase.replication;
-import java.util.List;
-
+import java.io.IOException;
+import java.util.Set;
import org.apache.hadoop.hbase.ServerName;
import org.apache.yetus.audience.InterfaceAudience;
/**
- * This is the interface for a Replication Tracker. A replication tracker provides the facility to
- * subscribe and track events that reflect a change in replication state. These events are used by
- * the ReplicationSourceManager to coordinate replication tasks such as addition/deletion of queues
- * and queue failover. These events are defined in the ReplicationListener interface. If a class
- * would like to listen to replication events it must implement the ReplicationListener interface
- * and register itself with a Replication Tracker.
+ * This is the interface for a Replication Tracker.
+ * <p/>
+ * A replication tracker provides the facility to subscribe and track events that reflect a change
+ * in replication state. These events are used by the ReplicationSourceManager to coordinate
+ * replication tasks such as addition/deletion of queues and queue failover. These events are
+ * defined in the ReplicationListener interface. If a class would like to listen to replication
+ * events it must implement the ReplicationListener interface and register itself with a Replication
+ * Tracker.
*/
@InterfaceAudience.Private
public interface ReplicationTracker {
@@ -40,11 +42,20 @@ public interface ReplicationTracker {
*/
void registerListener(ReplicationListener listener);
+ /**
+ * Remove a replication listener
+ * @param listener the listener to remove
+ */
void removeListener(ReplicationListener listener);
/**
- * Returns a list of other live region servers in the cluster.
- * @return List of region servers.
+ * In this method, you need to load the newest list of region server list and return it, and all
+ * later changes to the region server list must be passed to the listeners.
+ * <p/>
+ * This is very important for us to not miss a region server crash.
+ * <p/>
+ * Notice that this method can only be called once.
+ * @return Set of region servers.
*/
- List<ServerName> getListOfRegionServers();
+ Set<ServerName> loadLiveRegionServersAndInitializeListeners() throws IOException;
}
diff --git
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerBase.java
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerBase.java
new file mode 100644
index 0000000..96a3061
--- /dev/null
+++
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerBase.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base implementation class for replication tracker.
+ */
+@InterfaceAudience.Private
+abstract class ReplicationTrackerBase implements ReplicationTracker {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ReplicationTrackerBase.class);
+
+ // listeners to be notified
+ private final List<ReplicationListener> listeners = new CopyOnWriteArrayList<>();
+
+ private final AtomicBoolean initialized = new AtomicBoolean(false);
+
+ @Override
+ public void registerListener(ReplicationListener listener) {
+ listeners.add(listener);
+ }
+
+ @Override
+ public void removeListener(ReplicationListener listener) {
+ listeners.remove(listener);
+ }
+
+ protected final void notifyListeners(ServerName regionServer) {
+ LOG.info("{} is dead, triggering replicatorRemoved event", regionServer);
+ for (ReplicationListener listener : listeners) {
+ listener.regionServerRemoved(regionServer);
+ }
+ }
+
+ @Override
+ public final Set<ServerName> loadLiveRegionServersAndInitializeListeners() throws IOException {
+ if (!initialized.compareAndSet(false, true)) {
+ throw new IllegalStateException(
+ "loadLiveRegionServersAndInitializeListeners can only be called once");
+ }
+ return internalLoadLiveRegionServersAndInitializeListeners();
+ }
+
+ protected abstract Set<ServerName> internalLoadLiveRegionServersAndInitializeListeners()
+ throws IOException;
+
+}
diff --git
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerParams.java
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerParams.java
new file mode 100644
index 0000000..9aeedcf
--- /dev/null
+++
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerParams.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.ChoreService;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Parameters for constructing a {@link ReplicationTracker}.
+ */
+@InterfaceAudience.Private
+public final class ReplicationTrackerParams {
+
+ private final Configuration conf;
+
+ private final Stoppable stopptable;
+
+ private ZKWatcher zookeeper;
+
+ private Abortable abortable;
+
+ private Connection conn;
+
+ private ChoreService choreService;
+
+ private ReplicationTrackerParams(Configuration conf, Stoppable stopptable) {
+ this.conf = conf;
+ this.stopptable = stopptable;
+ }
+
+ public ReplicationTrackerParams zookeeper(ZKWatcher zookeeper) {
+ this.zookeeper = zookeeper;
+ return this;
+ }
+
+ public ReplicationTrackerParams abortable(Abortable abortable) {
+ this.abortable = abortable;
+ return this;
+ }
+
+ public ReplicationTrackerParams connection(Connection conn) {
+ this.conn = conn;
+ return this;
+ }
+
+ public ReplicationTrackerParams choreService(ChoreService choreService) {
+ this.choreService = choreService;
+ return this;
+ }
+
+ public Configuration conf() {
+ return conf;
+ }
+
+ public Stoppable stopptable() {
+ return stopptable;
+ }
+
+ public ZKWatcher zookeeper() {
+ return zookeeper;
+ }
+
+ public Abortable abortable() {
+ return abortable;
+ }
+
+ public Connection connection() {
+ return conn;
+ }
+
+ public ChoreService choreService() {
+ return choreService;
+ }
+
+ public static ReplicationTrackerParams create(Configuration conf, Stoppable stopptable) {
+ return new ReplicationTrackerParams(conf, stopptable);
+ }
+}
diff --git
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerZKImpl.java
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationTracker.java
similarity index 65%
rename from
hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerZKImpl.java
rename to
hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationTracker.java
index 6fc3c45..b74187a 100644
---
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerZKImpl.java
+++
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationTracker.java
@@ -17,11 +17,11 @@
*/
package org.apache.hadoop.hbase.replication;
-import java.util.ArrayList;
+import java.io.IOException;
+import java.util.HashSet;
import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.Set;
import java.util.stream.Collectors;
-
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
@@ -30,8 +30,8 @@ import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
/**
* This class is a ZooKeeper implementation of the ReplicationTracker interface. This class is
@@ -39,9 +39,7 @@ import org.slf4j.LoggerFactory;
* interface.
*/
@InterfaceAudience.Private
-public class ReplicationTrackerZKImpl implements ReplicationTracker {
-
- private static final Logger LOG = LoggerFactory.getLogger(ReplicationTrackerZKImpl.class);
+class ZKReplicationTracker extends ReplicationTrackerBase {
// Zookeeper
private final ZKWatcher zookeeper;
@@ -49,42 +47,14 @@ public class ReplicationTrackerZKImpl implements
ReplicationTracker {
private final Abortable abortable;
// All about stopping
private final Stoppable stopper;
- // listeners to be notified
- private final List<ReplicationListener> listeners = new CopyOnWriteArrayList<>();
// List of all the other region servers in this cluster
- private final List<ServerName> otherRegionServers = new ArrayList<>();
+ private final Set<ServerName> regionServers = new HashSet<>();
- public ReplicationTrackerZKImpl(ZKWatcher zookeeper, Abortable abortable, Stoppable stopper) {
- this.zookeeper = zookeeper;
- this.abortable = abortable;
- this.stopper = stopper;
+ ZKReplicationTracker(ReplicationTrackerParams params) {
+ this.zookeeper = params.zookeeper();
+ this.abortable = params.abortable();
+ this.stopper = params.stopptable();
this.zookeeper.registerListener(new OtherRegionServerWatcher(this.zookeeper));
- // watch the changes
- refreshOtherRegionServersList(true);
- }
-
- @Override
- public void registerListener(ReplicationListener listener) {
- listeners.add(listener);
- }
-
- @Override
- public void removeListener(ReplicationListener listener) {
- listeners.remove(listener);
- }
-
- /**
- * Return a snapshot of the current region servers.
- */
- @Override
- public List<ServerName> getListOfRegionServers() {
- refreshOtherRegionServersList(false);
-
- List<ServerName> list = null;
- synchronized (otherRegionServers) {
- list = new ArrayList<>(otherRegionServers);
- }
- return list;
}
/**
@@ -106,6 +76,9 @@ public class ReplicationTrackerZKImpl implements
ReplicationTracker {
*/
@Override
public void nodeCreated(String path) {
+ if (stopper.isStopped()) {
+ return;
+ }
refreshListIfRightPath(path);
}
@@ -118,14 +91,10 @@ public class ReplicationTrackerZKImpl implements
ReplicationTracker {
if (stopper.isStopped()) {
return;
}
- boolean cont = refreshListIfRightPath(path);
- if (!cont) {
+ if (!refreshListIfRightPath(path)) {
return;
}
- LOG.info(path + " znode expired, triggering replicatorRemoved event");
- for (ReplicationListener rl : listeners) {
- rl.regionServerRemoved(getZNodeName(path));
- }
+ notifyListeners(ServerName.valueOf(getZNodeName(path)));
}
/**
@@ -144,7 +113,7 @@ public class ReplicationTrackerZKImpl implements
ReplicationTracker {
if (!path.startsWith(this.watcher.getZNodePaths().rsZNode)) {
return false;
}
- return refreshOtherRegionServersList(true);
+ return refreshRegionServerList();
}
}
@@ -154,8 +123,8 @@ public class ReplicationTrackerZKImpl implements
ReplicationTracker {
* @return the id or an empty string if path is invalid
*/
private String getZNodeName(String fullPath) {
- String[] parts = fullPath.split("/");
- return parts.length > 0 ? parts[parts.length - 1] : "";
+ List<String> parts = Splitter.on('/').splitToList(fullPath);
+ return parts.size() > 0 ? parts.get(parts.size() - 1) : "";
}
/**
@@ -164,14 +133,14 @@ public class ReplicationTrackerZKImpl implements
ReplicationTracker {
* @return true if the local list of the other region servers was updated with the ZK data (even
* if it was empty), false if the data was missing in ZK
*/
- private boolean refreshOtherRegionServersList(boolean watch) {
- List<ServerName> newRsList = getRegisteredRegionServers(watch);
+ private boolean refreshRegionServerList() {
+ Set<ServerName> newRsList = getRegisteredRegionServers();
if (newRsList == null) {
return false;
} else {
- synchronized (otherRegionServers) {
- otherRegionServers.clear();
- otherRegionServers.addAll(newRsList);
+ synchronized (regionServers) {
+ regionServers.clear();
+ regionServers.addAll(newRsList);
}
}
return true;
@@ -181,19 +150,26 @@ public class ReplicationTrackerZKImpl implements
ReplicationTracker {
* Get a list of all the other region servers in this cluster and set a watch
* @return a list of server nanes
*/
- private List<ServerName> getRegisteredRegionServers(boolean watch) {
+ private Set<ServerName> getRegisteredRegionServers() {
List<String> result = null;
try {
- if (watch) {
- result = ZKUtil.listChildrenAndWatchThem(this.zookeeper,
- this.zookeeper.getZNodePaths().rsZNode);
- } else {
- result = ZKUtil.listChildrenNoWatch(this.zookeeper, this.zookeeper.getZNodePaths().rsZNode);
- }
+ result =
+ ZKUtil.listChildrenAndWatchThem(this.zookeeper, this.zookeeper.getZNodePaths().rsZNode);
} catch (KeeperException e) {
this.abortable.abort("Get list of registered region servers", e);
}
return result == null ? null :
- result.stream().map(ServerName::parseServerName).collect(Collectors.toList());
+ result.stream().map(ServerName::parseServerName).collect(Collectors.toSet());
+ }
+
+ @Override
+ protected Set<ServerName> internalLoadLiveRegionServersAndInitializeListeners()
+ throws IOException {
+ if (!refreshRegionServerList()) {
+ throw new IOException("failed to refresh region server list");
+ }
+ synchronized (regionServers) {
+ return new HashSet<>(regionServers);
+ }
}
}
diff --git
a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/ReplicationTrackerTestBase.java
b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/ReplicationTrackerTestBase.java
new file mode 100644
index 0000000..270e9f6
--- /dev/null
+++
b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/ReplicationTrackerTestBase.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class for testing {@link ReplicationTracker} and {@link ReplicationListener}.
+ */
+public abstract class ReplicationTrackerTestBase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ReplicationTrackerTestBase.class);
+
+ private ReplicationTracker rt;
+
+ private AtomicInteger rsRemovedCount;
+
+ private volatile ServerName rsRemovedData;
+
+ @Before
+ public void setUp() {
+ ReplicationTrackerParams params = createParams();
+ rt = ReplicationFactory.getReplicationTracker(params);
+ rsRemovedCount = new AtomicInteger(0);
+ rsRemovedData = null;
+ }
+
+ protected abstract ReplicationTrackerParams createParams();
+
+ protected abstract void addServer(ServerName sn) throws Exception;
+
+ protected abstract void removeServer(ServerName sn) throws Exception;
+
+ @Test
+ public void testWatchRegionServers() throws Exception {
+ ServerName sn =
+ ServerName.valueOf("hostname2.example.org,1234," + EnvironmentEdgeManager.currentTime());
+ addServer(sn);
+ rt.registerListener(new DummyReplicationListener());
+ assertEquals(1, rt.loadLiveRegionServersAndInitializeListeners().size());
+ // delete one
+ removeServer(sn);
+ // wait for event
+ Waiter.waitFor(HBaseConfiguration.create(), 15000, () -> rsRemovedCount.get() >= 1);
+ assertEquals(sn, rsRemovedData);
+ }
+
+ private class DummyReplicationListener implements ReplicationListener {
+
+ @Override
+ public void regionServerRemoved(ServerName regionServer) {
+ rsRemovedData = regionServer;
+ rsRemovedCount.getAndIncrement();
+ LOG.debug("Received regionServerRemoved event: " + regionServer);
+ }
+ }
+
+ protected static class WarnOnlyStoppable implements Stoppable {
+
+ @Override
+ public void stop(String why) {
+ LOG.warn("TestReplicationTracker received stop, ignoring. Reason: " + why);
+ }
+
+ @Override
+ public boolean isStopped() {
+ return false;
+ }
+ }
+
+ protected static class WarnOnlyAbortable implements Abortable {
+
+ @Override
+ public void abort(String why, Throwable e) {
+ LOG.warn("TestReplicationTracker received abort, ignoring. Reason: " + why);
+ }
+
+ @Override
+ public boolean isAborted() {
+ return false;
+ }
+ }
+}
diff --git
a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplicationTracker.java
b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplicationTracker.java
new file mode 100644
index 0000000..357908f
--- /dev/null
+++
b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplicationTracker.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ChoreService;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.experimental.categories.Category;
+
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestMasterReplicationTracker extends ReplicationTrackerTestBase {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestMasterReplicationTracker.class);
+
+ private static Configuration CONF;
+
+ private static Connection CONN;
+
+ private static ChoreService CHORE_SERVICE;
+
+ private static List<ServerName> SERVERS = new CopyOnWriteArrayList<>();
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws IOException {
+ CONF = HBaseConfiguration.create();
+ CONF.setClass(ReplicationFactory.REPLICATION_TRACKER_IMPL, MasterReplicationTracker.class,
+ ReplicationTracker.class);
+ Admin admin = mock(Admin.class);
+ when(admin.getRegionServers()).thenReturn(SERVERS);
+ CONN = mock(Connection.class);
+ when(CONN.getAdmin()).thenReturn(admin);
+ CHORE_SERVICE = new ChoreService("TestMasterReplicationTracker");
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() {
+ CHORE_SERVICE.shutdown();
+ }
+
+ @Override
+ protected ReplicationTrackerParams createParams() {
+ return ReplicationTrackerParams.create(CONF, new WarnOnlyStoppable())
+ .abortable(new WarnOnlyAbortable()).connection(CONN).choreService(CHORE_SERVICE);
+ }
+
+ @Override
+ protected void addServer(ServerName sn) throws Exception {
+ SERVERS.add(sn);
+ }
+
+ @Override
+ protected void removeServer(ServerName sn) throws Exception {
+ SERVERS.remove(sn);
+ }
+}
diff --git
a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java
b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java
index 77b71ad..a5a4566 100644
---
a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java
+++
b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java
@@ -19,12 +19,15 @@ package org.apache.hadoop.hbase.replication;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
@@ -40,6 +43,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -184,7 +188,8 @@ public class TestZKReplicationPeerStorage {
}
}
- @Test public void testBaseReplicationPeerConfig() throws
ReplicationException{
+ @Test
+ public void testBaseReplicationPeerConfig() throws ReplicationException {
String customPeerConfigKey = "hbase.xxx.custom_config";
String customPeerConfigValue = "test";
String customPeerConfigUpdatedValue = "testUpdated";
@@ -227,7 +232,8 @@ public class TestZKReplicationPeerStorage {
getConfiguration().get(customPeerConfigSecondKey));
}
- @Test public void testBaseReplicationRemovePeerConfig() throws
ReplicationException {
+ @Test
+ public void testBaseReplicationRemovePeerConfig() throws
ReplicationException {
String customPeerConfigKey = "hbase.xxx.custom_config";
String customPeerConfigValue = "test";
ReplicationPeerConfig existingReplicationPeerConfig = getConfig(1);
@@ -256,7 +262,8 @@ public class TestZKReplicationPeerStorage {
assertNull(replicationPeerConfigRemoved.getConfiguration().get(customPeerConfigKey));
}
- @Test public void testBaseReplicationRemovePeerConfigWithNoExistingConfig()
+ @Test
+ public void testBaseReplicationRemovePeerConfigWithNoExistingConfig()
throws ReplicationException {
String customPeerConfigKey = "hbase.xxx.custom_config";
ReplicationPeerConfig existingReplicationPeerConfig = getConfig(1);
@@ -271,4 +278,21 @@ public class TestZKReplicationPeerStorage {
updateReplicationBasePeerConfigs(conf, existingReplicationPeerConfig);
assertNull(updatedReplicationPeerConfig.getConfiguration().get(customPeerConfigKey));
}
+
+  /**
+   * Adding a second peer under an id that is already registered must fail with a
+   * {@link ReplicationException} whose cause is ZooKeeper's {@code NodeExistsException}.
+   */
+  @Test
+  public void testPeerNameControl() throws Exception {
+    String peerId = "6";
+    String clusterKey = "key";
+    ReplicationPeerConfig peerConfig =
+      ReplicationPeerConfig.newBuilder().setClusterKey(clusterKey).build();
+    STORAGE.addPeer(peerId, peerConfig, true);
+
+    try {
+      STORAGE.addPeer(peerId, peerConfig, true);
+      fail("Adding duplicate peer " + peerId + " should have thrown ReplicationException");
+    } catch (ReplicationException e) {
+      assertThat(e.getCause(), instanceOf(KeeperException.NodeExistsException.class));
+    } finally {
+      // always remove the peer so other tests start from a clean storage
+      STORAGE.removePeer(peerId);
+    }
+  }
}
diff --git
a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationTracker.java
b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationTracker.java
new file mode 100644
index 0000000..4ef42e3
--- /dev/null
+++
b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationTracker.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseZKTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+
+/**
+ * Runs the shared {@link ReplicationTrackerTestBase} scenarios against
+ * {@link ZKReplicationTracker}, announcing region servers by creating and deleting their znodes
+ * under the rs znode of a mini ZooKeeper cluster.
+ */
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestZKReplicationTracker extends ReplicationTrackerTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestZKReplicationTracker.class);
+
+  private static Configuration CONF;
+
+  private static HBaseZKTestingUtility UTIL;
+
+  private static ZKWatcher ZKW;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    UTIL = new HBaseZKTestingUtility();
+    UTIL.startMiniZKCluster();
+    CONF = UTIL.getConfiguration();
+    CONF.setClass(ReplicationFactory.REPLICATION_TRACKER_IMPL, ZKReplicationTracker.class,
+      ReplicationTracker.class);
+    // Use one watcher for both bootstrapping the rs znode and driving the test. A separate
+    // throw-away watcher was never closed and leaked its ZooKeeper connection. Assigning ZKW
+    // first also lets tearDownAfterClass close it if the znode creation fails.
+    ZKW = HBaseZKTestingUtility.getZooKeeperWatcher(UTIL);
+    ZKUtil.createWithParents(ZKW, ZKW.getZNodePaths().rsZNode);
+  }
+
+  @Override
+  protected ReplicationTrackerParams createParams() {
+    return ReplicationTrackerParams.create(CONF, new WarnOnlyStoppable())
+      .abortable(new WarnOnlyAbortable()).zookeeper(ZKW);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    // swallowIOException=true: a failure closing the watcher must not mask the cluster shutdown
+    Closeables.close(ZKW, true);
+    UTIL.shutdownMiniZKCluster();
+  }
+
+  /** Returns the child of the rs znode that represents {@code sn}. */
+  private static String serverZNode(ServerName sn) {
+    return ZNodePaths.joinZNode(ZKW.getZNodePaths().rsZNode, sn.toString());
+  }
+
+  @Override
+  protected void addServer(ServerName sn) throws Exception {
+    ZKUtil.createAndWatch(ZKW, serverZNode(sn), HConstants.EMPTY_BYTE_ARRAY);
+  }
+
+  @Override
+  protected void removeServer(ServerName sn) throws Exception {
+    ZKUtil.deleteNode(ZKW, serverZNode(sn));
+  }
+}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
index 8e1969f..5561384 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
@@ -51,6 +51,7 @@ import
org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
+import org.apache.hadoop.hbase.replication.ReplicationTrackerParams;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
@@ -311,9 +312,11 @@ public class DumpReplicationQueues extends Configured
implements Tool {
StringBuilder sb = new StringBuilder();
queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw,
getConf());
- replicationTracker = ReplicationFactory.getReplicationTracker(zkw, new
WarnOnlyAbortable(),
- new WarnOnlyStoppable());
- Set<ServerName> liveRegionServers = new
HashSet<>(replicationTracker.getListOfRegionServers());
+ replicationTracker = ReplicationFactory
+ .getReplicationTracker(ReplicationTrackerParams.create(getConf(), new
WarnOnlyStoppable())
+ .abortable(new WarnOnlyAbortable()).zookeeper(zkw));
+ Set<ServerName> liveRegionServers =
+ new
HashSet<>(replicationTracker.loadLiveRegionServersAndInitializeListeners());
// Loops each peer on each RS and dumps the queues
List<ServerName> regionservers = queueStorage.getListOfReplicators();
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index 4ca2c56..12f8ed8 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
+import org.apache.hadoop.hbase.replication.ReplicationTrackerParams;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WALFactory;
@@ -110,8 +111,10 @@ public class Replication implements
ReplicationSourceService, ReplicationSinkSer
this.replicationPeers =
ReplicationFactory.getReplicationPeers(server.getZooKeeper(),
this.conf);
this.replicationPeers.init();
- this.replicationTracker =
- ReplicationFactory.getReplicationTracker(server.getZooKeeper(),
this.server, this.server);
+ this.replicationTracker = ReplicationFactory
+
.getReplicationTracker(ReplicationTrackerParams.create(server.getConfiguration(),
server)
+
.abortable(server).zookeeper(server.getZooKeeper()).choreService(server.getChoreService())
+ .connection(server.getConnection()));
} catch (Exception e) {
throw new IOException("Failed replication handler create", e);
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 9926251..68f6f8b 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -41,7 +41,6 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -262,16 +261,24 @@ public class ReplicationSourceManager implements
ReplicationListener {
server.abort("Failed to get all replicators", e);
return;
}
+ Set<ServerName> liveRegionServers;
+ try {
+ // must call this method to load the first snapshot of live region
servers and initialize
+ // listeners
+ liveRegionServers =
replicationTracker.loadLiveRegionServersAndInitializeListeners();
+ } catch (IOException e) {
+ server.abort("Failed load live region server list for replication", e);
+ return;
+ }
+ LOG.info("Current list of replicators: {}, live RSes: {}",
currentReplicators,
+ liveRegionServers);
if (currentReplicators == null || currentReplicators.isEmpty()) {
return;
}
- List<ServerName> otherRegionServers =
replicationTracker.getListOfRegionServers();
- LOG.info(
- "Current list of replicators: " + currentReplicators + " other RSs: " +
otherRegionServers);
// Look if there's anything to process after a restart
for (ServerName rs : currentReplicators) {
- if (!otherRegionServers.contains(rs)) {
+ if (!liveRegionServers.contains(rs)) {
transferQueues(rs);
}
}
@@ -659,8 +666,8 @@ public class ReplicationSourceManager implements
ReplicationListener {
}
@Override
- public void regionServerRemoved(String regionserver) {
- transferQueues(ServerName.valueOf(regionserver));
+ public void regionServerRemoved(ServerName regionserver) {
+ transferQueues(regionserver);
}
/**
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
deleted file mode 100644
index 86743fe..0000000
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
+++ /dev/null
@@ -1,270 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hbase.ChoreService;
-import org.apache.hadoop.hbase.ClusterId;
-import org.apache.hadoop.hbase.CoordinatedStateManager;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.Server;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.client.ClusterConnection;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.testclassification.ReplicationTests;
-import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
-import org.apache.zookeeper.KeeperException;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class tests the ReplicationTrackerZKImpl class and ReplicationListener
interface. One
- * MiniZKCluster is used throughout the entire class. The cluster is
initialized with the creation
- * of the rsZNode. All other znode creation/initialization is handled by the
replication state
- * interfaces (i.e. ReplicationPeers, etc.). Each test case in this class
should ensure that the
- * MiniZKCluster is cleaned and returned to it's initial state (i.e. nothing
but the rsZNode).
- */
-@Category({ReplicationTests.class, MediumTests.class})
-public class TestReplicationTrackerZKImpl {
-
- @ClassRule
- public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestReplicationTrackerZKImpl.class);
-
- private static final Logger LOG =
LoggerFactory.getLogger(TestReplicationTrackerZKImpl.class);
-
- private static Configuration conf;
- private static HBaseTestingUtility utility;
-
- // Each one of the below variables are reinitialized before every test case
- private ZKWatcher zkw;
- private ReplicationPeers rp;
- private ReplicationTracker rt;
- private AtomicInteger rsRemovedCount;
- private String rsRemovedData;
-
- @BeforeClass
- public static void setUpBeforeClass() throws Exception {
- utility = new HBaseTestingUtility();
- utility.startMiniZKCluster();
- conf = utility.getConfiguration();
- ZKWatcher zk = HBaseTestingUtility.getZooKeeperWatcher(utility);
- ZKUtil.createWithParents(zk, zk.getZNodePaths().rsZNode);
- }
-
- @Before
- public void setUp() throws Exception {
- zkw = HBaseTestingUtility.getZooKeeperWatcher(utility);
- String fakeRs1 = ZNodePaths.joinZNode(zkw.getZNodePaths().rsZNode,
- "hostname1.example.org:1234");
- try {
- ZKClusterId.setClusterId(zkw, new ClusterId());
- rp = ReplicationFactory.getReplicationPeers(zkw, conf);
- rp.init();
- rt = ReplicationFactory.getReplicationTracker(zkw, new
DummyServer(fakeRs1),
- new DummyServer(fakeRs1));
- } catch (Exception e) {
- fail("Exception during test setup: " + e);
- }
- rsRemovedCount = new AtomicInteger(0);
- rsRemovedData = "";
- }
-
- @AfterClass
- public static void tearDownAfterClass() throws Exception {
- utility.shutdownMiniZKCluster();
- }
-
- @Test
- public void testGetListOfRegionServers() throws Exception {
- // 0 region servers
- assertEquals(0, rt.getListOfRegionServers().size());
-
- // 1 region server
- ZKUtil.createWithParents(zkw,
ZNodePaths.joinZNode(zkw.getZNodePaths().rsZNode,
- "hostname1.example.org,1234,1611218678009"));
- List<ServerName> rss = rt.getListOfRegionServers();
- assertEquals(rss.toString(), 1, rss.size());
-
- // 2 region servers
- ZKUtil.createWithParents(zkw,
ZNodePaths.joinZNode(zkw.getZNodePaths().rsZNode,
- "hostname2.example.org,1234,1611218678009"));
- rss = rt.getListOfRegionServers();
- assertEquals(rss.toString(), 2, rss.size());
-
- // 1 region server
- ZKUtil.deleteNode(zkw, ZNodePaths.joinZNode(zkw.getZNodePaths().rsZNode,
- "hostname2.example.org,1234,1611218678009"));
- rss = rt.getListOfRegionServers();
- assertEquals(1, rss.size());
-
- // 0 region server
- ZKUtil.deleteNode(zkw, ZNodePaths.joinZNode(zkw.getZNodePaths().rsZNode,
- "hostname1.example.org,1234,1611218678009"));
- rss = rt.getListOfRegionServers();
- assertEquals(rss.toString(), 0, rss.size());
- }
-
- @Test
- public void testRegionServerRemovedEvent() throws Exception {
- ZKUtil.createAndWatch(zkw,
- ZNodePaths.joinZNode(zkw.getZNodePaths().rsZNode,
"hostname2.example.org:1234"),
- HConstants.EMPTY_BYTE_ARRAY);
- rt.registerListener(new DummyReplicationListener());
- // delete one
- ZKUtil.deleteNode(zkw,
- ZNodePaths.joinZNode(zkw.getZNodePaths().rsZNode,
"hostname2.example.org:1234"));
- // wait for event
- while (rsRemovedCount.get() < 1) {
- Thread.sleep(5);
- }
- assertEquals("hostname2.example.org:1234", rsRemovedData);
- }
-
- @Test
- public void testPeerNameControl() throws Exception {
- int exists = 0;
- rp.getPeerStorage().addPeer("6",
-
ReplicationPeerConfig.newBuilder().setClusterKey(utility.getClusterKey()).build(),
true);
-
- try {
- rp.getPeerStorage().addPeer("6",
-
ReplicationPeerConfig.newBuilder().setClusterKey(utility.getClusterKey()).build(),
true);
- } catch (ReplicationException e) {
- if (e.getCause() instanceof KeeperException.NodeExistsException) {
- exists++;
- }
- }
-
- assertEquals(1, exists);
-
- // clean up
- rp.getPeerStorage().removePeer("6");
- }
-
- private class DummyReplicationListener implements ReplicationListener {
-
- @Override
- public void regionServerRemoved(String regionServer) {
- rsRemovedData = regionServer;
- rsRemovedCount.getAndIncrement();
- LOG.debug("Received regionServerRemoved event: " + regionServer);
- }
- }
-
- private class DummyServer implements Server {
- private String serverName;
- private boolean isAborted = false;
- private boolean isStopped = false;
-
- public DummyServer(String serverName) {
- this.serverName = serverName;
- }
-
- @Override
- public Configuration getConfiguration() {
- return conf;
- }
-
- @Override
- public ZKWatcher getZooKeeper() {
- return zkw;
- }
-
- @Override
- public CoordinatedStateManager getCoordinatedStateManager() {
- return null;
- }
-
- @Override
- public ClusterConnection getConnection() {
- return null;
- }
-
- @Override
- public ServerName getServerName() {
- return ServerName.valueOf(this.serverName);
- }
-
- @Override
- public void abort(String why, Throwable e) {
- LOG.info("Aborting " + serverName);
- this.isAborted = true;
- }
-
- @Override
- public boolean isAborted() {
- return this.isAborted;
- }
-
- @Override
- public void stop(String why) {
- this.isStopped = true;
- }
-
- @Override
- public boolean isStopped() {
- return this.isStopped;
- }
-
- @Override
- public ChoreService getChoreService() {
- return null;
- }
-
- @Override
- public ClusterConnection getClusterConnection() {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public FileSystem getFileSystem() {
- return null;
- }
-
- @Override
- public boolean isStopping() {
- return false;
- }
-
- @Override
- public Connection createConnection(Configuration conf) throws IOException {
- return null;
- }
- }
-}