This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new 748e369 HBASE-26029 It is not reliable to use nodeDeleted event to
track region server's death (#3430)
748e369 is described below
commit 748e369629bca0f68cc259d5269aa2adfe62ff22
Author: Duo Zhang <[email protected]>
AuthorDate: Wed Jun 30 08:44:19 2021 +0800
HBASE-26029 It is not reliable to use nodeDeleted event to track region
server's death (#3430)
Signed-off-by: Xin Sun <[email protected]>
---
.../src/main/protobuf/MasterProcedure.proto | 25 +-
.../replication/MasterReplicationTracker.java | 103 -------
.../hbase/replication/ReplicationFactory.java | 7 -
.../hbase/replication/ReplicationTracker.java | 61 ----
.../hbase/replication/ReplicationTrackerBase.java | 72 -----
.../replication/ReplicationTrackerParams.java | 98 -------
.../hbase/replication/ZKReplicationTracker.java | 175 ------------
.../replication/ReplicationTrackerTestBase.java | 110 --------
.../replication/TestMasterReplicationTracker.java | 87 ------
.../replication/TestZKReplicationTracker.java | 84 ------
.../apache/hadoop/hbase/executor/EventType.java | 9 +-
.../apache/hadoop/hbase/executor/ExecutorType.java | 3 +-
.../master/procedure/ServerCrashProcedure.java | 7 +-
.../master/procedure/ServerProcedureInterface.java | 16 +-
.../hadoop/hbase/master/procedure/ServerQueue.java | 2 +
.../ClaimReplicationQueueRemoteProcedure.java | 127 +++++++++
.../ClaimReplicationQueuesProcedure.java | 147 ++++++++++
.../BaseRSProcedureCallable.java} | 49 ++--
.../hadoop/hbase/regionserver/HRegionServer.java | 4 +
.../hbase/regionserver/SplitWALCallable.java | 31 +--
...ble.java => ClaimReplicationQueueCallable.java} | 47 ++--
.../regionserver/DumpReplicationQueues.java | 28 +-
.../regionserver/PeerProcedureHandler.java | 5 +-
.../regionserver/PeerProcedureHandlerImpl.java | 7 +
.../regionserver/RefreshPeerCallable.java | 29 +-
.../replication/regionserver/Replication.java | 12 +-
.../regionserver/ReplicationSourceManager.java | 307 +++++++--------------
.../regionserver/ReplicationSyncUp.java | 35 ++-
.../SwitchRpcThrottleRemoteCallable.java | 27 +-
.../replication/TestClaimReplicationQueue.java | 165 +++++++++++
.../hbase/replication/TestReplicationBase.java | 78 +++---
.../regionserver/TestReplicationSourceManager.java | 8 +-
32 files changed, 771 insertions(+), 1194 deletions(-)
diff --git a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
index e060077..9511ee4 100644
--- a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
@@ -312,8 +312,9 @@ enum ServerCrashState {
SERVER_CRASH_WAIT_ON_ASSIGN = 9;
SERVER_CRASH_SPLIT_META_LOGS = 10;
SERVER_CRASH_ASSIGN_META = 11;
- SERVER_CRASH_DELETE_SPLIT_META_WALS_DIR=12;
- SERVER_CRASH_DELETE_SPLIT_WALS_DIR=13;
+ SERVER_CRASH_DELETE_SPLIT_META_WALS_DIR = 12;
+ SERVER_CRASH_DELETE_SPLIT_WALS_DIR = 13;
+ SERVER_CRASH_CLAIM_REPLICATION_QUEUES = 14;
SERVER_CRASH_HANDLE_RIT2 = 20[deprecated=true];
SERVER_CRASH_FINISH = 100;
}
@@ -561,3 +562,23 @@ enum SplitWALState {
DISPATCH_WAL_TO_WORKER = 2;
RELEASE_SPLIT_WORKER = 3;
}
+
+message ClaimReplicationQueuesStateData {
+ required ServerName crashed_server = 1;
+}
+
+message ClaimReplicationQueueRemoteStateData {
+ required ServerName crashed_server = 1;
+ required string queue = 2;
+ required ServerName target_server = 3;
+}
+
+message ClaimReplicationQueueRemoteParameter {
+ required ServerName crashed_server = 1;
+ required string queue = 2;
+}
+
+enum ClaimReplicationQueuesState {
+ CLAIM_REPLICATION_QUEUES_DISPATCH = 1;
+ CLAIM_REPLICATION_QUEUES_FINISH = 2;
+}
diff --git
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/MasterReplicationTracker.java
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/MasterReplicationTracker.java
deleted file mode 100644
index c55a82e..0000000
---
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/MasterReplicationTracker.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import org.apache.hadoop.hbase.ChoreService;
-import org.apache.hadoop.hbase.ScheduledChore;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A {@link ReplicationTracker} implementation which polls the region servers
list periodically from
- * master.
- */
[email protected]
-class MasterReplicationTracker extends ReplicationTrackerBase {
-
- private static final Logger LOG =
LoggerFactory.getLogger(MasterReplicationTracker.class);
-
- static final String REFRESH_INTERVAL_SECONDS =
- "hbase.replication.tracker.master.refresh.interval.secs";
-
- // default to refresh every 5 seconds
- static final int REFRESH_INTERVAL_SECONDS_DEFAULT = 5;
-
- private final ChoreService choreService;
-
- private final ScheduledChore chore;
-
- private final Admin admin;
-
- private volatile Set<ServerName> regionServers;
-
- MasterReplicationTracker(ReplicationTrackerParams params) {
- try {
- this.admin = params.connection().getAdmin();
- } catch (IOException e) {
- // should not happen
- throw new AssertionError(e);
- }
- this.choreService = params.choreService();
- int refreshIntervalSecs =
- params.conf().getInt(REFRESH_INTERVAL_SECONDS,
REFRESH_INTERVAL_SECONDS_DEFAULT);
- this.chore = new ScheduledChore(getClass().getSimpleName(),
params.stopptable(),
- refreshIntervalSecs, 0, TimeUnit.SECONDS) {
-
- @Override
- protected void chore() {
- try {
- refresh();
- } catch (IOException e) {
- LOG.warn("failed to refresh region server list for replication", e);
- }
- }
- };
- }
-
- private Set<ServerName> reload() throws IOException {
- return Collections.unmodifiableSet(new
HashSet<>(admin.getRegionServers()));
-
- }
-
- private void refresh() throws IOException {
- Set<ServerName> newRegionServers = reload();
- for (ServerName oldRs : regionServers) {
- if (!newRegionServers.contains(oldRs)) {
- notifyListeners(oldRs);
- }
- }
- this.regionServers = newRegionServers;
- }
-
- @Override
- protected Set<ServerName>
internalLoadLiveRegionServersAndInitializeListeners()
- throws IOException {
- Set<ServerName> newRegionServers = reload();
- this.regionServers = newRegionServers;
- choreService.scheduleChore(chore);
- return newRegionServers;
- }
-}
diff --git
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
index b4d33d6..8342160 100644
---
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
+++
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.hbase.replication;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
@@ -36,10 +35,4 @@ public final class ReplicationFactory {
public static ReplicationPeers getReplicationPeers(ZKWatcher zk,
Configuration conf) {
return new ReplicationPeers(zk, conf);
}
-
- public static ReplicationTracker
getReplicationTracker(ReplicationTrackerParams params) {
- Class<? extends ReplicationTracker> clazz =
params.conf().getClass(REPLICATION_TRACKER_IMPL,
- ZKReplicationTracker.class, ReplicationTracker.class);
- return ReflectionUtils.newInstance(clazz, params);
- }
}
diff --git
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTracker.java
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTracker.java
deleted file mode 100644
index 1d59401..0000000
---
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTracker.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import java.io.IOException;
-import java.util.Set;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.yetus.audience.InterfaceAudience;
-
-/**
- * This is the interface for a Replication Tracker.
- * <p/>
- * A replication tracker provides the facility to subscribe and track events
that reflect a change
- * in replication state. These events are used by the ReplicationSourceManager
to coordinate
- * replication tasks such as addition/deletion of queues and queue failover.
These events are
- * defined in the ReplicationListener interface. If a class would like to
listen to replication
- * events it must implement the ReplicationListener interface and register
itself with a Replication
- * Tracker.
- */
[email protected]
-public interface ReplicationTracker {
-
- /**
- * Register a replication listener to receive replication events.
- * @param listener
- */
- void registerListener(ReplicationListener listener);
-
- /**
- * Remove a replication listener
- * @param listener the listener to remove
- */
- void removeListener(ReplicationListener listener);
-
- /**
- * In this method, you need to load the newest list of region server list
and return it, and all
- * later changes to the region server list must be passed to the listeners.
- * <p/>
- * This is very important for us to not miss a region server crash.
- * <p/>
- * Notice that this method can only be called once.
- * @return Set of region servers.
- */
- Set<ServerName> loadLiveRegionServersAndInitializeListeners() throws
IOException;
-}
diff --git
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerBase.java
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerBase.java
deleted file mode 100644
index 96a3061..0000000
---
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerBase.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Base implementation class for replication tracker.
- */
[email protected]
-abstract class ReplicationTrackerBase implements ReplicationTracker {
-
- private static final Logger LOG =
LoggerFactory.getLogger(ReplicationTrackerBase.class);
-
- // listeners to be notified
- private final List<ReplicationListener> listeners = new
CopyOnWriteArrayList<>();
-
- private final AtomicBoolean initialized = new AtomicBoolean(false);
-
- @Override
- public void registerListener(ReplicationListener listener) {
- listeners.add(listener);
- }
-
- @Override
- public void removeListener(ReplicationListener listener) {
- listeners.remove(listener);
- }
-
- protected final void notifyListeners(ServerName regionServer) {
- LOG.info("{} is dead, triggering replicatorRemoved event", regionServer);
- for (ReplicationListener listener : listeners) {
- listener.regionServerRemoved(regionServer);
- }
- }
-
- @Override
- public final Set<ServerName> loadLiveRegionServersAndInitializeListeners()
throws IOException {
- if (!initialized.compareAndSet(false, true)) {
- throw new IllegalStateException(
- "loadLiveRegionServersAndInitializeListeners can only be called once");
- }
- return internalLoadLiveRegionServersAndInitializeListeners();
- }
-
- protected abstract Set<ServerName>
internalLoadLiveRegionServersAndInitializeListeners()
- throws IOException;
-
-}
diff --git
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerParams.java
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerParams.java
deleted file mode 100644
index 9aeedcf..0000000
---
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerParams.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.ChoreService;
-import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import org.apache.yetus.audience.InterfaceAudience;
-
-/**
- * Parameters for constructing a {@link ReplicationTracker}.
- */
[email protected]
-public final class ReplicationTrackerParams {
-
- private final Configuration conf;
-
- private final Stoppable stopptable;
-
- private ZKWatcher zookeeper;
-
- private Abortable abortable;
-
- private Connection conn;
-
- private ChoreService choreService;
-
- private ReplicationTrackerParams(Configuration conf, Stoppable stopptable) {
- this.conf = conf;
- this.stopptable = stopptable;
- }
-
- public ReplicationTrackerParams zookeeper(ZKWatcher zookeeper) {
- this.zookeeper = zookeeper;
- return this;
- }
-
- public ReplicationTrackerParams abortable(Abortable abortable) {
- this.abortable = abortable;
- return this;
- }
-
- public ReplicationTrackerParams connection(Connection conn) {
- this.conn = conn;
- return this;
- }
-
- public ReplicationTrackerParams choreService(ChoreService choreService) {
- this.choreService = choreService;
- return this;
- }
-
- public Configuration conf() {
- return conf;
- }
-
- public Stoppable stopptable() {
- return stopptable;
- }
-
- public ZKWatcher zookeeper() {
- return zookeeper;
- }
-
- public Abortable abortable() {
- return abortable;
- }
-
- public Connection connection() {
- return conn;
- }
-
- public ChoreService choreService() {
- return choreService;
- }
-
- public static ReplicationTrackerParams create(Configuration conf, Stoppable
stopptable) {
- return new ReplicationTrackerParams(conf, stopptable);
- }
-}
diff --git
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationTracker.java
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationTracker.java
deleted file mode 100644
index b74187a..0000000
---
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationTracker.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.zookeeper.ZKListener;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.zookeeper.KeeperException;
-
-import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
-
-/**
- * This class is a ZooKeeper implementation of the ReplicationTracker
interface. This class is
- * responsible for handling replication events that are defined in the
ReplicationListener
- * interface.
- */
[email protected]
-class ZKReplicationTracker extends ReplicationTrackerBase {
-
- // Zookeeper
- private final ZKWatcher zookeeper;
- // Server to abort.
- private final Abortable abortable;
- // All about stopping
- private final Stoppable stopper;
- // List of all the other region servers in this cluster
- private final Set<ServerName> regionServers = new HashSet<>();
-
- ZKReplicationTracker(ReplicationTrackerParams params) {
- this.zookeeper = params.zookeeper();
- this.abortable = params.abortable();
- this.stopper = params.stopptable();
- this.zookeeper.registerListener(new
OtherRegionServerWatcher(this.zookeeper));
- }
-
- /**
- * Watcher used to be notified of the other region server's death in the
local cluster. It
- * initiates the process to transfer the queues if it is able to grab the
lock.
- */
- public class OtherRegionServerWatcher extends ZKListener {
-
- /**
- * Construct a ZooKeeper event listener.
- */
- public OtherRegionServerWatcher(ZKWatcher watcher) {
- super(watcher);
- }
-
- /**
- * Called when a new node has been created.
- * @param path full path of the new node
- */
- @Override
- public void nodeCreated(String path) {
- if (stopper.isStopped()) {
- return;
- }
- refreshListIfRightPath(path);
- }
-
- /**
- * Called when a node has been deleted
- * @param path full path of the deleted node
- */
- @Override
- public void nodeDeleted(String path) {
- if (stopper.isStopped()) {
- return;
- }
- if (!refreshListIfRightPath(path)) {
- return;
- }
- notifyListeners(ServerName.valueOf(getZNodeName(path)));
- }
-
- /**
- * Called when an existing node has a child node added or removed.
- * @param path full path of the node whose children have changed
- */
- @Override
- public void nodeChildrenChanged(String path) {
- if (stopper.isStopped()) {
- return;
- }
- refreshListIfRightPath(path);
- }
-
- private boolean refreshListIfRightPath(String path) {
- if (!path.startsWith(this.watcher.getZNodePaths().rsZNode)) {
- return false;
- }
- return refreshRegionServerList();
- }
- }
-
- /**
- * Extracts the znode name of a peer cluster from a ZK path
- * @param fullPath Path to extract the id from
- * @return the id or an empty string if path is invalid
- */
- private String getZNodeName(String fullPath) {
- List<String> parts = Splitter.on('/').splitToList(fullPath);
- return parts.size() > 0 ? parts.get(parts.size() - 1) : "";
- }
-
- /**
- * Reads the list of region servers from ZK and atomically clears our local
view of it and
- * replaces it with the updated list.
- * @return true if the local list of the other region servers was updated
with the ZK data (even
- * if it was empty), false if the data was missing in ZK
- */
- private boolean refreshRegionServerList() {
- Set<ServerName> newRsList = getRegisteredRegionServers();
- if (newRsList == null) {
- return false;
- } else {
- synchronized (regionServers) {
- regionServers.clear();
- regionServers.addAll(newRsList);
- }
- }
- return true;
- }
-
- /**
- * Get a list of all the other region servers in this cluster and set a watch
- * @return a list of server nanes
- */
- private Set<ServerName> getRegisteredRegionServers() {
- List<String> result = null;
- try {
- result =
- ZKUtil.listChildrenAndWatchThem(this.zookeeper,
this.zookeeper.getZNodePaths().rsZNode);
- } catch (KeeperException e) {
- this.abortable.abort("Get list of registered region servers", e);
- }
- return result == null ? null :
-
result.stream().map(ServerName::parseServerName).collect(Collectors.toSet());
- }
-
- @Override
- protected Set<ServerName>
internalLoadLiveRegionServersAndInitializeListeners()
- throws IOException {
- if (!refreshRegionServerList()) {
- throw new IOException("failed to refresh region server list");
- }
- synchronized (regionServers) {
- return new HashSet<>(regionServers);
- }
- }
-}
diff --git
a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/ReplicationTrackerTestBase.java
b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/ReplicationTrackerTestBase.java
deleted file mode 100644
index 270e9f6..0000000
---
a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/ReplicationTrackerTestBase.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.Waiter;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Base class for testing {@link ReplicationTracker} and {@link
ReplicationListener}.
- */
-public abstract class ReplicationTrackerTestBase {
-
- private static final Logger LOG =
LoggerFactory.getLogger(ReplicationTrackerTestBase.class);
-
- private ReplicationTracker rt;
-
- private AtomicInteger rsRemovedCount;
-
- private volatile ServerName rsRemovedData;
-
- @Before
- public void setUp() {
- ReplicationTrackerParams params = createParams();
- rt = ReplicationFactory.getReplicationTracker(params);
- rsRemovedCount = new AtomicInteger(0);
- rsRemovedData = null;
- }
-
- protected abstract ReplicationTrackerParams createParams();
-
- protected abstract void addServer(ServerName sn) throws Exception;
-
- protected abstract void removeServer(ServerName sn) throws Exception;
-
- @Test
- public void testWatchRegionServers() throws Exception {
- ServerName sn =
- ServerName.valueOf("hostname2.example.org,1234," +
EnvironmentEdgeManager.currentTime());
- addServer(sn);
- rt.registerListener(new DummyReplicationListener());
- assertEquals(1, rt.loadLiveRegionServersAndInitializeListeners().size());
- // delete one
- removeServer(sn);
- // wait for event
- Waiter.waitFor(HBaseConfiguration.create(), 15000, () ->
rsRemovedCount.get() >= 1);
- assertEquals(sn, rsRemovedData);
- }
-
- private class DummyReplicationListener implements ReplicationListener {
-
- @Override
- public void regionServerRemoved(ServerName regionServer) {
- rsRemovedData = regionServer;
- rsRemovedCount.getAndIncrement();
- LOG.debug("Received regionServerRemoved event: " + regionServer);
- }
- }
-
- protected static class WarnOnlyStoppable implements Stoppable {
-
- @Override
- public void stop(String why) {
- LOG.warn("TestReplicationTracker received stop, ignoring. Reason: " +
why);
- }
-
- @Override
- public boolean isStopped() {
- return false;
- }
- }
-
- protected static class WarnOnlyAbortable implements Abortable {
-
- @Override
- public void abort(String why, Throwable e) {
- LOG.warn("TestReplicationTracker received abort, ignoring. Reason: " +
why);
- }
-
- @Override
- public boolean isAborted() {
- return false;
- }
- }
-}
diff --git
a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplicationTracker.java
b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplicationTracker.java
deleted file mode 100644
index 357908f..0000000
---
a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplicationTracker.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.ChoreService;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.testclassification.ReplicationTests;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.experimental.categories.Category;
-
-@Category({ ReplicationTests.class, MediumTests.class })
-public class TestMasterReplicationTracker extends ReplicationTrackerTestBase {
-
- @ClassRule
- public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestMasterReplicationTracker.class);
-
- private static Configuration CONF;
-
- private static Connection CONN;
-
- private static ChoreService CHORE_SERVICE;
-
- private static List<ServerName> SERVERS = new CopyOnWriteArrayList<>();
-
- @BeforeClass
- public static void setUpBeforeClass() throws IOException {
- CONF = HBaseConfiguration.create();
- CONF.setClass(ReplicationFactory.REPLICATION_TRACKER_IMPL,
MasterReplicationTracker.class,
- ReplicationTracker.class);
- Admin admin = mock(Admin.class);
- when(admin.getRegionServers()).thenReturn(SERVERS);
- CONN = mock(Connection.class);
- when(CONN.getAdmin()).thenReturn(admin);
- CHORE_SERVICE = new ChoreService("TestMasterReplicationTracker");
- }
-
- @AfterClass
- public static void tearDownAfterClass() {
- CHORE_SERVICE.shutdown();
- }
-
- @Override
- protected ReplicationTrackerParams createParams() {
- return ReplicationTrackerParams.create(CONF, new WarnOnlyStoppable())
- .abortable(new
WarnOnlyAbortable()).connection(CONN).choreService(CHORE_SERVICE);
- }
-
- @Override
- protected void addServer(ServerName sn) throws Exception {
- SERVERS.add(sn);
- }
-
- @Override
- protected void removeServer(ServerName sn) throws Exception {
- SERVERS.remove(sn);
- }
-}
diff --git
a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationTracker.java
b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationTracker.java
deleted file mode 100644
index 4ef42e3..0000000
---
a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationTracker.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseZKTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.testclassification.ReplicationTests;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.experimental.categories.Category;
-
-import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
-
-@Category({ ReplicationTests.class, MediumTests.class })
-public class TestZKReplicationTracker extends ReplicationTrackerTestBase {
-
- @ClassRule
- public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestZKReplicationTracker.class);
-
- private static Configuration CONF;
-
- private static HBaseZKTestingUtility UTIL;
-
- private static ZKWatcher ZKW;
-
- @BeforeClass
- public static void setUpBeforeClass() throws Exception {
- UTIL = new HBaseZKTestingUtility();
- UTIL.startMiniZKCluster();
- CONF = UTIL.getConfiguration();
- CONF.setClass(ReplicationFactory.REPLICATION_TRACKER_IMPL,
ZKReplicationTracker.class,
- ReplicationTracker.class);
- ZKWatcher zk = HBaseZKTestingUtility.getZooKeeperWatcher(UTIL);
- ZKUtil.createWithParents(zk, zk.getZNodePaths().rsZNode);
- ZKW = HBaseZKTestingUtility.getZooKeeperWatcher(UTIL);
- }
-
- @Override
- protected ReplicationTrackerParams createParams() {
- return ReplicationTrackerParams.create(CONF, new WarnOnlyStoppable())
- .abortable(new WarnOnlyAbortable()).zookeeper(ZKW);
- }
-
- @AfterClass
- public static void tearDownAfterClass() throws Exception {
- Closeables.close(ZKW, true);
- UTIL.shutdownMiniZKCluster();
- }
-
- @Override
- protected void addServer(ServerName sn) throws Exception {
- ZKUtil.createAndWatch(ZKW,
ZNodePaths.joinZNode(ZKW.getZNodePaths().rsZNode, sn.toString()),
- HConstants.EMPTY_BYTE_ARRAY);
- }
-
- @Override
- protected void removeServer(ServerName sn) throws Exception {
- ZKUtil.deleteNode(ZKW, ZNodePaths.joinZNode(ZKW.getZNodePaths().rsZNode,
sn.toString()));
- }
-}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java
index 32993f5..3d1fa39 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java
@@ -287,7 +287,14 @@ public enum EventType {
*
* RS_REFRESH_PEER
*/
- RS_REFRESH_PEER (84, ExecutorType.RS_REFRESH_PEER);
+ RS_REFRESH_PEER(84, ExecutorType.RS_REFRESH_PEER),
+
+ /**
+ * RS claim replication queue.<br>
+ *
+ * RS_CLAIM_REPLICATION_QUEUE
+ */
+ RS_CLAIM_REPLICATION_QUEUE(86, ExecutorType.RS_CLAIM_REPLICATION_QUEUE);
private final int code;
private final ExecutorType executor;
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
index ca82d71..ea83122 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
@@ -51,7 +51,8 @@ public enum ExecutorType {
RS_OPEN_PRIORITY_REGION (30),
RS_REFRESH_PEER(31),
RS_SWITCH_RPC_THROTTLE(33),
- RS_IN_MEMORY_COMPACTION(34);
+ RS_IN_MEMORY_COMPACTION(34),
+ RS_CLAIM_REPLICATION_QUEUE(35);
ExecutorType(int value) {
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
index c3e3f53..462a389 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.master.SplitWALManager;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure;
+import
org.apache.hadoop.hbase.master.replication.ClaimReplicationQueuesProcedure;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.procedure2.Procedure;
@@ -235,11 +236,15 @@ public class ServerCrashProcedure
}
assignRegions(env, regionsOnCrashedServer);
}
- setNextState(ServerCrashState.SERVER_CRASH_FINISH);
+ setNextState(ServerCrashState.SERVER_CRASH_CLAIM_REPLICATION_QUEUES);
break;
case SERVER_CRASH_HANDLE_RIT2:
// Noop. Left in place because we used to call handleRIT here for a
second time
// but no longer necessary since HBASE-20634.
+ setNextState(ServerCrashState.SERVER_CRASH_CLAIM_REPLICATION_QUEUES);
+ break;
+ case SERVER_CRASH_CLAIM_REPLICATION_QUEUES:
+ addChildProcedure(new ClaimReplicationQueuesProcedure(serverName));
setNextState(ServerCrashState.SERVER_CRASH_FINISH);
break;
case SERVER_CRASH_FINISH:
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java
index 8162269..a7abfdc 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java
@@ -27,7 +27,9 @@ import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
public interface ServerProcedureInterface {
public enum ServerOperationType {
- CRASH_HANDLER, SWITCH_RPC_THROTTLE,
+ CRASH_HANDLER,
+ SWITCH_RPC_THROTTLE,
+
/**
* help find a available region server as worker and release worker after
task done invoke
* SPLIT_WAL_REMOTE operation to send real WAL splitting request to worker
manage the split wal
@@ -38,7 +40,17 @@ public interface ServerProcedureInterface {
/**
* send the split WAL request to region server and handle the response
*/
- SPLIT_WAL_REMOTE
+ SPLIT_WAL_REMOTE,
+
+ /**
+ * Get all the replication queues of a crash server and assign them to
other region servers
+ */
+ CLAIM_REPLICATION_QUEUES,
+
+ /**
+ * send the claim replication queue request to region server to actually
assign it
+ */
+ CLAIM_REPLICATION_QUEUE_REMOTE
}
/**
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java
index 1659ab5..726ee14 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java
@@ -38,6 +38,8 @@ class ServerQueue extends Queue<ServerName> {
case SWITCH_RPC_THROTTLE:
case SPLIT_WAL:
case SPLIT_WAL_REMOTE:
+ case CLAIM_REPLICATION_QUEUES:
+ case CLAIM_REPLICATION_QUEUE_REMOTE:
return false;
default:
break;
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ClaimReplicationQueueRemoteProcedure.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ClaimReplicationQueueRemoteProcedure.java
new file mode 100644
index 0000000..c8c5704
--- /dev/null
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ClaimReplicationQueueRemoteProcedure.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.replication;
+
+import java.io.IOException;
+import java.util.Optional;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import
org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.ServerOperation;
+import org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface;
+import org.apache.hadoop.hbase.master.procedure.ServerRemoteProcedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
+import
org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
+import
org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
+import
org.apache.hadoop.hbase.replication.regionserver.ClaimReplicationQueueCallable;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ClaimReplicationQueueRemoteParameter;
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ClaimReplicationQueueRemoteStateData;
+
[email protected]
+public class ClaimReplicationQueueRemoteProcedure extends ServerRemoteProcedure
+ implements ServerProcedureInterface, RemoteProcedure<MasterProcedureEnv,
ServerName> {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ClaimReplicationQueueRemoteProcedure.class);
+
+ private ServerName crashedServer;
+
+ private String queue;
+
+ public ClaimReplicationQueueRemoteProcedure() {
+ }
+
+ public ClaimReplicationQueueRemoteProcedure(ServerName crashedServer, String
queue,
+ ServerName targetServer) {
+ this.crashedServer = crashedServer;
+ this.queue = queue;
+ this.targetServer = targetServer;
+ }
+
+ @Override
+ public Optional<RemoteOperation> remoteCallBuild(MasterProcedureEnv env,
ServerName remote) {
+ assert targetServer.equals(remote);
+ return Optional.of(new ServerOperation(this, getProcId(),
ClaimReplicationQueueCallable.class,
+ ClaimReplicationQueueRemoteParameter.newBuilder()
+
.setCrashedServer(ProtobufUtil.toServerName(crashedServer)).setQueue(queue).build()
+ .toByteArray()));
+ }
+
+ @Override
+ public ServerName getServerName() {
+ // return crashed server here, as we are going to recover its replication
queues so we should
+ // use its scheduler queue instead of the one for the target server.
+ return crashedServer;
+ }
+
+ @Override
+ public boolean hasMetaTableRegion() {
+ return false;
+ }
+
+ @Override
+ public ServerOperationType getServerOperationType() {
+ return ServerOperationType.CLAIM_REPLICATION_QUEUE_REMOTE;
+ }
+
+ @Override
+ protected void complete(MasterProcedureEnv env, Throwable error) {
+ if (error != null) {
+ LOG.warn("Failed to claim replication queue {} of crashed server {} on
server {}", queue,
+ crashedServer, targetServer, error);
+ this.succ = false;
+ } else {
+ this.succ = true;
+ }
+ }
+
+ @Override
+ protected void rollback(MasterProcedureEnv env) throws IOException,
InterruptedException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ protected boolean abort(MasterProcedureEnv env) {
+ return false;
+ }
+
+ @Override
+ protected boolean waitInitialized(MasterProcedureEnv env) {
+ return env.waitInitialized(this);
+ }
+
+ @Override
+ protected void serializeStateData(ProcedureStateSerializer serializer)
throws IOException {
+ serializer.serialize(ClaimReplicationQueueRemoteStateData.newBuilder()
+
.setCrashedServer(ProtobufUtil.toServerName(crashedServer)).setQueue(queue)
+ .setTargetServer(ProtobufUtil.toServerName(targetServer)).build());
+ }
+
+ @Override
+ protected void deserializeStateData(ProcedureStateSerializer serializer)
throws IOException {
+ ClaimReplicationQueueRemoteStateData data =
+ serializer.deserialize(ClaimReplicationQueueRemoteStateData.class);
+ crashedServer = ProtobufUtil.toServerName(data.getCrashedServer());
+ queue = data.getQueue();
+ targetServer = ProtobufUtil.toServerName(data.getTargetServer());
+ }
+}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ClaimReplicationQueuesProcedure.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ClaimReplicationQueuesProcedure.java
new file mode 100644
index 0000000..5a35c3f
--- /dev/null
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ClaimReplicationQueuesProcedure.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.replication;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
+import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
+import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.util.RetryCounter;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ClaimReplicationQueuesStateData;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
+
+/**
+ * Used to assign the replication queues of a dead server to other region
servers.
+ */
[email protected]
+public class ClaimReplicationQueuesProcedure extends
Procedure<MasterProcedureEnv>
+ implements ServerProcedureInterface {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ClaimReplicationQueuesProcedure.class);
+
+ private ServerName crashedServer;
+
+ private RetryCounter retryCounter;
+
+ public ClaimReplicationQueuesProcedure() {
+ }
+
+ public ClaimReplicationQueuesProcedure(ServerName crashedServer) {
+ this.crashedServer = crashedServer;
+ }
+
+ @Override
+ public ServerName getServerName() {
+ return crashedServer;
+ }
+
+ @Override
+ public boolean hasMetaTableRegion() {
+ return false;
+ }
+
+ @Override
+ public ServerOperationType getServerOperationType() {
+ return ServerOperationType.CLAIM_REPLICATION_QUEUES;
+ }
+
+ @Override
+ protected Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
+ throws ProcedureYieldException, ProcedureSuspendedException,
InterruptedException {
+ ReplicationQueueStorage storage =
env.getReplicationPeerManager().getQueueStorage();
+ try {
+ List<String> queues = storage.getAllQueues(crashedServer);
+ if (queues.isEmpty()) {
+ LOG.debug("Finish claiming replication queues for {}", crashedServer);
+ storage.removeReplicatorIfQueueIsEmpty(crashedServer);
+ // we are done
+ return null;
+ }
+ LOG.debug("There are {} replication queues need to be claimed for {}",
queues.size(),
+ crashedServer);
+ List<ServerName> targetServers =
+ env.getMasterServices().getServerManager().getOnlineServersList();
+ if (targetServers.isEmpty()) {
+ throw new ReplicationException("no region server available");
+ }
+ Collections.shuffle(targetServers);
+ ClaimReplicationQueueRemoteProcedure[] procs =
+ new ClaimReplicationQueueRemoteProcedure[Math.min(queues.size(),
targetServers.size())];
+ for (int i = 0; i < procs.length; i++) {
+ procs[i] = new ClaimReplicationQueueRemoteProcedure(crashedServer,
queues.get(i),
+ targetServers.get(i));
+ }
+ return procs;
+ } catch (ReplicationException e) {
+ if (retryCounter == null) {
+ retryCounter =
ProcedureUtil.createRetryCounter(env.getMasterConfiguration());
+ }
+ long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
+ LOG.warn("Failed to claim replication queues for {}, suspend {}secs",
crashedServer,
+ backoff / 1000, e);
+ setTimeout(Math.toIntExact(backoff));
+ setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
+ skipPersistence();
+ throw new ProcedureSuspendedException();
+ }
+ }
+
+ @Override
+ protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
+ setState(ProcedureProtos.ProcedureState.RUNNABLE);
+ env.getProcedureScheduler().addFront(this);
+ return false;
+ }
+
+ @Override
+ protected void rollback(MasterProcedureEnv env) throws IOException,
InterruptedException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ protected boolean abort(MasterProcedureEnv env) {
+ return false;
+ }
+
+ @Override
+ protected void serializeStateData(ProcedureStateSerializer serializer)
throws IOException {
+ serializer.serialize(ClaimReplicationQueuesStateData.newBuilder()
+ .setCrashedServer(ProtobufUtil.toServerName(crashedServer)).build());
+ }
+
+ @Override
+ protected void deserializeStateData(ProcedureStateSerializer serializer)
throws IOException {
+ ClaimReplicationQueuesStateData data =
+ serializer.deserialize(ClaimReplicationQueuesStateData.class);
+ crashedServer = ProtobufUtil.toServerName(data.getCrashedServer());
+ }
+}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandler.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/BaseRSProcedureCallable.java
similarity index 55%
copy from
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandler.java
copy to
hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/BaseRSProcedureCallable.java
index 65da9af..76b749e 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandler.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/BaseRSProcedureCallable.java
@@ -15,27 +15,38 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.hadoop.hbase.procedure2;
-package org.apache.hadoop.hbase.replication.regionserver;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.yetus.audience.InterfaceAudience;
-/**
- * A handler for modifying replication peer in peer procedures.
- */
@InterfaceAudience.Private
-public interface PeerProcedureHandler {
-
- public void addPeer(String peerId) throws ReplicationException, IOException;
-
- public void removePeer(String peerId) throws ReplicationException,
IOException;
-
- public void disablePeer(String peerId) throws ReplicationException,
IOException;
-
- public void enablePeer(String peerId) throws ReplicationException,
IOException;
-
- public void updatePeerConfig(String peerId) throws ReplicationException,
IOException;
+public abstract class BaseRSProcedureCallable implements RSProcedureCallable {
+
+ protected HRegionServer rs;
+
+ private Exception initError;
+
+ @Override
+ public final Void call() throws Exception {
+ if (initError != null) {
+ throw initError;
+ }
+ doCall();
+ return null;
+ }
+
+ @Override
+ public final void init(byte[] parameter, HRegionServer rs) {
+ this.rs = rs;
+ try {
+ initParameter(parameter);
+ } catch (Exception e) {
+ initError = e;
+ }
+ }
+
+ protected abstract void doCall() throws Exception;
+
+ protected abstract void initParameter(byte[] parameter) throws Exception;
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 6a78c9d..fb8e042 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -2092,6 +2092,10 @@ public class HRegionServer extends Thread implements
conf.getInt("hbase.regionserver.executor.switch.rpc.throttle.threads",
1);
executorService.startExecutorService(executorService.new
ExecutorConfig().setExecutorType(
ExecutorType.RS_SWITCH_RPC_THROTTLE).setCorePoolSize(switchRpcThrottleThreads));
+ final int claimReplicationQueueThreads =
+
conf.getInt("hbase.regionserver.executor.claim.replication.queue.threads", 1);
+ executorService.startExecutorService(executorService.new
ExecutorConfig().setExecutorType(
+
ExecutorType.RS_CLAIM_REPLICATION_QUEUE).setCorePoolSize(claimReplicationQueueThreads));
Threads
.setDaemonThreadRunning(this.walRoller, getName() + ".logRoller",
uncaughtExceptionHandler);
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitWALCallable.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitWALCallable.java
index 03f2061..319ecca 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitWALCallable.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitWALCallable.java
@@ -19,12 +19,12 @@ package org.apache.hadoop.hbase.regionserver;
import java.util.concurrent.locks.Lock;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.executor.EventType;
-import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
+import org.apache.hadoop.hbase.procedure2.BaseRSProcedureCallable;
import org.apache.hadoop.hbase.util.KeyLocker;
import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
import
org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
+
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
/**
@@ -43,27 +43,18 @@ import
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
* we switch to procedure-based WAL splitting.
*/
@InterfaceAudience.Private
-public class SplitWALCallable implements RSProcedureCallable {
- private static final Logger LOG =
LoggerFactory.getLogger(SplitWALCallable.class);
+public class SplitWALCallable extends BaseRSProcedureCallable {
private String walPath;
- private Exception initError;
- private HRegionServer rs;
private final KeyLocker<String> splitWALLocks = new KeyLocker<>();
private volatile Lock splitWALLock = null;
@Override
- public void init(byte[] parameter, HRegionServer rs) {
- try {
- this.rs = rs;
- MasterProcedureProtos.SplitWALParameter param =
- MasterProcedureProtos.SplitWALParameter.parseFrom(parameter);
- this.walPath = param.getWalPath();
- } catch (InvalidProtocolBufferException e) {
- LOG.error("Parse proto buffer of split WAL request failed ", e);
- initError = e;
- }
+ protected void initParameter(byte[] parameter) throws
InvalidProtocolBufferException {
+ MasterProcedureProtos.SplitWALParameter param =
+ MasterProcedureProtos.SplitWALParameter.parseFrom(parameter);
+ this.walPath = param.getWalPath();
}
@Override
@@ -90,10 +81,7 @@ public class SplitWALCallable implements RSProcedureCallable
{
}
@Override
- public Void call() throws Exception {
- if (initError != null) {
- throw initError;
- }
+ protected void doCall() throws Exception {
//grab a lock
splitWALLock = splitWALLocks.acquireLock(walPath);
try {
@@ -110,7 +98,6 @@ public class SplitWALCallable implements RSProcedureCallable
{
} finally {
splitWALLock.unlock();
}
- return null;
}
public String getWalPath() {
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SwitchRpcThrottleRemoteCallable.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ClaimReplicationQueueCallable.java
similarity index 55%
copy from
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SwitchRpcThrottleRemoteCallable.java
copy to
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ClaimReplicationQueueCallable.java
index b2e698f..ddae731 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SwitchRpcThrottleRemoteCallable.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ClaimReplicationQueueCallable.java
@@ -17,46 +17,39 @@
*/
package org.apache.hadoop.hbase.replication.regionserver;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.executor.EventType;
-import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.procedure2.BaseRSProcedureCallable;
import org.apache.yetus.audience.InterfaceAudience;
import
org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
-import
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SwitchRpcThrottleRemoteStateData;
-/**
- * The callable executed at RS side to switch rpc throttle state. <br/>
- */
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ClaimReplicationQueueRemoteParameter;
+
@InterfaceAudience.Private
-public class SwitchRpcThrottleRemoteCallable implements RSProcedureCallable {
- private HRegionServer rs;
- private boolean rpcThrottleEnabled;
- private Exception initError;
+public class ClaimReplicationQueueCallable extends BaseRSProcedureCallable {
+
+ private ServerName crashedServer;
+
+ private String queue;
@Override
- public Void call() throws Exception {
- if (initError != null) {
- throw initError;
- }
- rs.getRegionServerRpcQuotaManager().switchRpcThrottle(rpcThrottleEnabled);
- return null;
+ public EventType getEventType() {
+ return EventType.RS_CLAIM_REPLICATION_QUEUE;
}
@Override
- public void init(byte[] parameter, HRegionServer rs) {
- this.rs = rs;
- try {
- SwitchRpcThrottleRemoteStateData param =
- SwitchRpcThrottleRemoteStateData.parseFrom(parameter);
- rpcThrottleEnabled = param.getRpcThrottleEnabled();
- } catch (InvalidProtocolBufferException e) {
- initError = e;
- }
+ protected void doCall() throws Exception {
+ PeerProcedureHandler handler =
rs.getReplicationSourceService().getPeerProcedureHandler();
+ handler.claimReplicationQueue(crashedServer, queue);
}
@Override
- public EventType getEventType() {
- return EventType.M_RS_SWITCH_RPC_THROTTLE;
+ protected void initParameter(byte[] parameter) throws
InvalidProtocolBufferException {
+ ClaimReplicationQueueRemoteParameter param =
+ ClaimReplicationQueueRemoteParameter.parseFrom(parameter);
+ crashedServer = ProtobufUtil.toServerName(param.getCrashedServer());
+ queue = param.getQueue();
}
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
index 5561384..cc0482d 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
@@ -22,7 +22,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -36,7 +35,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
@@ -44,14 +42,11 @@ import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.replication.TableCFs;
import org.apache.hadoop.hbase.io.WALLink;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
-import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
-import org.apache.hadoop.hbase.replication.ReplicationTracker;
-import org.apache.hadoop.hbase.replication.ReplicationTrackerParams;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
@@ -305,18 +300,13 @@ public class DumpReplicationQueues extends Configured
implements Tool {
return sb.toString();
}
- public String dumpQueues(ZKWatcher zkw, Set<String> peerIds,
- boolean hdfs) throws Exception {
+ public String dumpQueues(ZKWatcher zkw, Set<String> peerIds, boolean hdfs)
throws Exception {
ReplicationQueueStorage queueStorage;
- ReplicationTracker replicationTracker;
StringBuilder sb = new StringBuilder();
queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw,
getConf());
- replicationTracker = ReplicationFactory
- .getReplicationTracker(ReplicationTrackerParams.create(getConf(), new
WarnOnlyStoppable())
- .abortable(new WarnOnlyAbortable()).zookeeper(zkw));
- Set<ServerName> liveRegionServers =
- new
HashSet<>(replicationTracker.loadLiveRegionServersAndInitializeListeners());
+ Set<ServerName> liveRegionServers = ZKUtil.listChildrenNoWatch(zkw,
zkw.getZNodePaths().rsZNode)
+ .stream().map(ServerName::parseServerName).collect(Collectors.toSet());
// Loops each peer on each RS and dumps the queues
List<ServerName> regionservers = queueStorage.getListOfReplicators();
@@ -419,16 +409,4 @@ public class DumpReplicationQueues extends Configured
implements Tool {
return false;
}
}
-
- private static class WarnOnlyStoppable implements Stoppable {
- @Override
- public void stop(String why) {
- LOG.warn("DumpReplicationQueue received stop, ignoring. Reason: " +
why);
- }
-
- @Override
- public boolean isStopped() {
- return false;
- }
- }
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandler.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandler.java
index 65da9af..a96e860 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandler.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandler.java
@@ -19,7 +19,7 @@
package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
-
+import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.yetus.audience.InterfaceAudience;
@@ -38,4 +38,7 @@ public interface PeerProcedureHandler {
public void enablePeer(String peerId) throws ReplicationException,
IOException;
public void updatePeerConfig(String peerId) throws ReplicationException,
IOException;
+
+ void claimReplicationQueue(ServerName crashedServer, String queue)
+ throws ReplicationException, IOException;
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
index 78c1977..f12a654 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
import java.util.concurrent.locks.Lock;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
@@ -132,4 +133,10 @@ public class PeerProcedureHandlerImpl implements
PeerProcedureHandler {
peerLock.unlock();
}
}
+
+ @Override
+ public void claimReplicationQueue(ServerName crashedServer, String queue)
+ throws ReplicationException, IOException {
+ replicationSourceManager.claimQueue(crashedServer, queue);
+ }
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshPeerCallable.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshPeerCallable.java
index 91fee44..2beb9f5 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshPeerCallable.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshPeerCallable.java
@@ -18,8 +18,7 @@
package org.apache.hadoop.hbase.replication.regionserver;
import org.apache.hadoop.hbase.executor.EventType;
-import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.procedure2.BaseRSProcedureCallable;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,24 +32,16 @@ import
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.R
* The callable executed at RS side to refresh the peer config/state. <br/>
*/
@InterfaceAudience.Private
-public class RefreshPeerCallable implements RSProcedureCallable {
+public class RefreshPeerCallable extends BaseRSProcedureCallable {
private static final Logger LOG =
LoggerFactory.getLogger(RefreshPeerCallable.class);
- private HRegionServer rs;
-
private String peerId;
private PeerModificationType type;
- private Exception initError;
-
@Override
- public Void call() throws Exception {
- if (initError != null) {
- throw initError;
- }
-
+ protected void doCall() throws Exception {
LOG.info("Received a peer change event, peerId=" + peerId + ", type=" +
type);
PeerProcedureHandler handler =
rs.getReplicationSourceService().getPeerProcedureHandler();
switch (type) {
@@ -72,19 +63,13 @@ public class RefreshPeerCallable implements
RSProcedureCallable {
default:
throw new IllegalArgumentException("Unknown peer modification type: "
+ type);
}
- return null;
}
@Override
- public void init(byte[] parameter, HRegionServer rs) {
- this.rs = rs;
- try {
- RefreshPeerParameter param = RefreshPeerParameter.parseFrom(parameter);
- this.peerId = param.getPeerId();
- this.type = param.getType();
- } catch (InvalidProtocolBufferException e) {
- initError = e;
- }
+ protected void initParameter(byte[] parameter) throws
InvalidProtocolBufferException {
+ RefreshPeerParameter param = RefreshPeerParameter.parseFrom(parameter);
+ this.peerId = param.getPeerId();
+ this.type = param.getType();
}
@Override
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index 12f8ed8..842955e 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -38,8 +38,6 @@ import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
-import org.apache.hadoop.hbase.replication.ReplicationTracker;
-import org.apache.hadoop.hbase.replication.ReplicationTrackerParams;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WALFactory;
@@ -64,7 +62,6 @@ public class Replication implements ReplicationSourceService,
ReplicationSinkSer
private ReplicationSourceManager replicationManager;
private ReplicationQueueStorage queueStorage;
private ReplicationPeers replicationPeers;
- private ReplicationTracker replicationTracker;
private Configuration conf;
private ReplicationSink replicationSink;
// Hosting server
@@ -111,10 +108,6 @@ public class Replication implements
ReplicationSourceService, ReplicationSinkSer
this.replicationPeers =
ReplicationFactory.getReplicationPeers(server.getZooKeeper(),
this.conf);
this.replicationPeers.init();
- this.replicationTracker = ReplicationFactory
-
.getReplicationTracker(ReplicationTrackerParams.create(server.getConfiguration(),
server)
-
.abortable(server).zookeeper(server.getZooKeeper()).choreService(server.getChoreService())
- .connection(server.getConnection()));
} catch (Exception e) {
throw new IOException("Failed replication handler create", e);
}
@@ -126,9 +119,8 @@ public class Replication implements
ReplicationSourceService, ReplicationSinkSer
}
this.globalMetricsSource = CompatibilitySingletonFactory
.getInstance(MetricsReplicationSourceFactory.class).getGlobalSource();
- this.replicationManager = new ReplicationSourceManager(queueStorage,
replicationPeers,
- replicationTracker, conf, this.server, fs, logDir, oldLogDir,
clusterId, walFactory,
- globalMetricsSource);
+ this.replicationManager = new ReplicationSourceManager(queueStorage,
replicationPeers, conf,
+ this.server, fs, logDir, oldLogDir, clusterId, walFactory,
globalMetricsSource);
// Get the user-space WAL provider
WALProvider walProvider = walFactory != null? walFactory.getWALProvider():
null;
if (walProvider != null) {
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 68f6f8b..fe83168 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -33,9 +33,7 @@ import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -44,7 +42,6 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
@@ -53,14 +50,12 @@ import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.replication.ReplicationException;
-import org.apache.hadoop.hbase.replication.ReplicationListener;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
-import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
@@ -103,24 +98,30 @@ import
org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFacto
* {@link #preLogRoll(Path)}.</li>
* <li>No need synchronized on {@link #walsByIdRecoveredQueues}. There are
three methods which
* modify it, {@link #removePeer(String)} ,
- * {@link #cleanOldLogs(NavigableSet, String, boolean, String)} and
- * {@link ReplicationSourceManager.NodeFailoverWorker#run()}.
- * {@link #cleanOldLogs(NavigableSet, String, boolean, String)} is called by
+ * {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} and
+ * {@link ReplicationSourceManager#claimQueue(ServerName, String)}.
+ * {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} is called by
* {@link ReplicationSourceInterface}. {@link #removePeer(String)} will
terminate the
* {@link ReplicationSourceInterface} firstly, then remove the wals from
- * {@link #walsByIdRecoveredQueues}. And {@link
ReplicationSourceManager.NodeFailoverWorker#run()}
- * will add the wals to {@link #walsByIdRecoveredQueues} firstly, then start
up a
- * {@link ReplicationSourceInterface}. So there is no race here. For
- * {@link ReplicationSourceManager.NodeFailoverWorker#run()} and {@link
#removePeer(String)}, there
- * is already synchronized on {@link #oldsources}. So no need synchronized on
- * {@link #walsByIdRecoveredQueues}.</li>
+ * {@link #walsByIdRecoveredQueues}. And
+ * {@link ReplicationSourceManager#claimQueue(ServerName, String)} will add
the wals to
+ * {@link #walsByIdRecoveredQueues} firstly, then start up a {@link
ReplicationSourceInterface}. So
+ * there is no race here. For {@link
ReplicationSourceManager#claimQueue(ServerName, String)} and
+ * {@link #removePeer(String)}, there is already synchronized on {@link
#oldsources}. So no need
+ * synchronized on {@link #walsByIdRecoveredQueues}.</li>
* <li>Need synchronized on {@link #latestPaths} to avoid the new open source
miss new log.</li>
* <li>Need synchronized on {@link #oldsources} to avoid adding recovered
source for the
* to-be-removed peer.</li>
* </ul>
*/
@InterfaceAudience.Private
-public class ReplicationSourceManager implements ReplicationListener {
+public class ReplicationSourceManager {
private static final Logger LOG =
LoggerFactory.getLogger(ReplicationSourceManager.class);
// all the sources that read this RS's logs and every peer only has one
replication source
private final ConcurrentMap<String, ReplicationSourceInterface> sources;
@@ -135,7 +136,6 @@ public class ReplicationSourceManager implements
ReplicationListener {
*/
private final ReplicationQueueStorage queueStorage;
- private final ReplicationTracker replicationTracker;
private final ReplicationPeers replicationPeers;
// UUID for this cluster
private final UUID clusterId;
@@ -193,7 +193,7 @@ public class ReplicationSourceManager implements
ReplicationListener {
* @param oldLogDir the directory where old logs are archived
*/
public ReplicationSourceManager(ReplicationQueueStorage queueStorage,
- ReplicationPeers replicationPeers, ReplicationTracker
replicationTracker, Configuration conf,
+ ReplicationPeers replicationPeers, Configuration conf,
Server server, FileSystem fs, Path logDir, Path oldLogDir, UUID
clusterId,
WALFactory walFactory,
MetricsReplicationGlobalSourceSource globalMetrics) throws IOException {
@@ -202,7 +202,6 @@ public class ReplicationSourceManager implements
ReplicationListener {
this.sources = new ConcurrentHashMap<>();
this.queueStorage = queueStorage;
this.replicationPeers = replicationPeers;
- this.replicationTracker = replicationTracker;
this.server = server;
this.walsById = new ConcurrentHashMap<>();
this.walsByIdRecoveredQueues = new ConcurrentHashMap<>();
@@ -215,7 +214,6 @@ public class ReplicationSourceManager implements
ReplicationListener {
// seconds
this.clusterId = clusterId;
this.walFactory = walFactory;
- this.replicationTracker.registerListener(this);
// It's preferable to failover 1 RS at a time, but with good zk servers
// more could be processed at the same time.
int nbWorkers = conf.getInt("replication.executor.workers", 1);
@@ -236,12 +234,9 @@ public class ReplicationSourceManager implements
ReplicationListener {
}
/**
- * Adds a normal source per registered peer cluster and tries to process all
old region server wal
- * queues
- * <p>
- * The returned future is for adoptAbandonedQueues task.
+ * Adds a normal source per registered peer cluster.
*/
- Future<?> init() throws IOException {
+ void init() throws IOException {
for (String id : this.replicationPeers.getAllPeerIds()) {
addSource(id);
if (replicationForBulkLoadDataEnabled) {
@@ -250,38 +245,6 @@ public class ReplicationSourceManager implements
ReplicationListener {
throwIOExceptionWhenFail(() ->
this.queueStorage.addPeerToHFileRefs(id));
}
}
- return this.executor.submit(this::adoptAbandonedQueues);
- }
-
- private void adoptAbandonedQueues() {
- List<ServerName> currentReplicators = null;
- try {
- currentReplicators = queueStorage.getListOfReplicators();
- } catch (ReplicationException e) {
- server.abort("Failed to get all replicators", e);
- return;
- }
- Set<ServerName> liveRegionServers;
- try {
- // must call this method to load the first snapshot of live region
servers and initialize
- // listeners
- liveRegionServers =
replicationTracker.loadLiveRegionServersAndInitializeListeners();
- } catch (IOException e) {
- server.abort("Failed load live region server list for replication", e);
- return;
- }
- LOG.info("Current list of replicators: {}, live RSes: {}",
currentReplicators,
- liveRegionServers);
- if (currentReplicators == null || currentReplicators.isEmpty()) {
- return;
- }
-
- // Look if there's anything to process after a restart
- for (ServerName rs : currentReplicators) {
- if (!liveRegionServers.contains(rs)) {
- transferQueues(rs);
- }
- }
}
/**
@@ -665,169 +628,101 @@ public class ReplicationSourceManager implements
ReplicationListener {
}
}
- @Override
- public void regionServerRemoved(ServerName regionserver) {
- transferQueues(regionserver);
- }
-
- /**
- * Transfer all the queues of the specified to this region server. First it
tries to grab a lock
- * and if it works it will move the old queues and finally will delete the
old queues.
- * <p>
- * It creates one old source for any type of source of the old rs.
- */
- private void transferQueues(ServerName deadRS) {
- if (server.getServerName().equals(deadRS)) {
- // it's just us, give up
+ void claimQueue(ServerName deadRS, String queue) {
+ // Wait a bit before transferring the queues, we may be shutting down.
+ // This sleep may not be enough in some cases.
+ try {
+ Thread.sleep(sleepBeforeFailover +
+ (long) (ThreadLocalRandom.current().nextFloat() *
sleepBeforeFailover));
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted while waiting before transferring a queue.");
+ Thread.currentThread().interrupt();
+ }
+ // We try to lock that rs' queue directory
+ if (server.isStopped()) {
+ LOG.info("Not transferring queue since we are shutting down");
return;
}
- NodeFailoverWorker transfer = new NodeFailoverWorker(deadRS);
+ // After claiming the queues from the dead region server, we will skip starting the
+ // RecoveredReplicationSource if the peer has been removed, but it is possible that a
+ // peer with peerId = 2 is removed and a peer with peerId = 2 is added again during
+ // failover. So we need to get a copy of the replication peer first to decide whether we
+ // should start the RecoveredReplicationSource. If the latest peer is not the old peer, we
+ // should also skip starting the RecoveredReplicationSource; otherwise the rs will abort
+ // (see HBASE-20475).
+ String peerId = new ReplicationQueueInfo(queue).getPeerId();
+ ReplicationPeerImpl oldPeer = replicationPeers.getPeer(peerId);
+ if (oldPeer == null) {
+ LOG.info("Not transferring queue since the replication peer {} for queue
{} does not exist",
+ peerId, queue);
+ return;
+ }
+ Pair<String, SortedSet<String>> claimedQueue;
try {
- this.executor.execute(transfer);
- } catch (RejectedExecutionException ex) {
-
CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class)
- .getGlobalSource().incrFailedRecoveryQueue();
- LOG.info("Cancelling the transfer of " + deadRS + " because of " +
ex.getMessage());
+ claimedQueue = queueStorage.claimQueue(deadRS, queue,
server.getServerName());
+ } catch (ReplicationException e) {
+ LOG.error(
+ "ReplicationException: cannot claim dead region ({})'s " +
+ "replication queue. Znode : ({})" +
+ " Possible solution: check if znode size exceeds jute.maxBuffer
value. " +
+ " If so, increase it for both client and server side.",
+ deadRS, queueStorage.getRsNode(deadRS), e);
+ server.abort("Failed to claim queue from dead regionserver.", e);
+ return;
}
- }
-
- /**
- * Class responsible to setup new ReplicationSources to take care of the
queues from dead region
- * servers.
- */
- class NodeFailoverWorker extends Thread {
-
- private final ServerName deadRS;
- // After claim the queues from dead region server, the NodeFailoverWorker
will skip to start
- // the RecoveredReplicationSource if the peer has been removed. but
there's possible that
- // remove a peer with peerId = 2 and add a peer with peerId = 2 again
during the
- // NodeFailoverWorker. So we need a deep copied <peerId, peer> map to
decide whether we
- // should start the RecoveredReplicationSource. If the latest peer is not
the old peer when
- // NodeFailoverWorker begin, we should skip to start the
RecoveredReplicationSource, Otherwise
- // the rs will abort (See HBASE-20475).
- private final Map<String, ReplicationPeerImpl> peersSnapshot;
-
- public NodeFailoverWorker(ServerName deadRS) {
- super("Failover-for-" + deadRS);
- this.deadRS = deadRS;
- peersSnapshot = new HashMap<>(replicationPeers.getPeerCache());
+ if (claimedQueue.getSecond().isEmpty()) {
+ return;
}
-
- private boolean isOldPeer(String peerId, ReplicationPeerImpl newPeerRef) {
- ReplicationPeerImpl oldPeerRef = peersSnapshot.get(peerId);
- return oldPeerRef != null && oldPeerRef == newPeerRef;
+ String queueId = claimedQueue.getFirst();
+ Set<String> walsSet = claimedQueue.getSecond();
+ ReplicationPeerImpl peer = replicationPeers.getPeer(peerId);
+ if (peer == null || peer != oldPeer) {
+ LOG.warn("Skipping failover for peer {} of node {}, peer is null",
peerId, deadRS);
+ abortWhenFail(() -> queueStorage.removeQueue(server.getServerName(),
queueId));
+ return;
+ }
+ if (server instanceof ReplicationSyncUp.DummyServer &&
+ peer.getPeerState().equals(PeerState.DISABLED)) {
+ LOG.warn(
+ "Peer {} is disabled. ReplicationSyncUp tool will skip " +
"replicating data to this peer.",
+ peerId);
+ return;
}
- @Override
- public void run() {
- // Wait a bit before transferring the queues, we may be shutting down.
- // This sleep may not be enough in some cases.
- try {
- Thread.sleep(sleepBeforeFailover +
- (long) (ThreadLocalRandom.current().nextFloat() *
sleepBeforeFailover));
- } catch (InterruptedException e) {
- LOG.warn("Interrupted while waiting before transferring a queue.");
- Thread.currentThread().interrupt();
- }
- // We try to lock that rs' queue directory
- if (server.isStopped()) {
- LOG.info("Not transferring queue since we are shutting down");
+ ReplicationSourceInterface src;
+ try {
+ src = createSource(queueId, peer);
+ } catch (IOException e) {
+ LOG.error("Can not create replication source for peer {} and queue {}",
peerId, queueId, e);
+ server.abort("Failed to create replication source after claiming
queue.", e);
+ return;
+ }
+ // synchronized on oldsources to avoid adding recovered source for the
to-be-removed peer
+ synchronized (oldsources) {
+ peer = replicationPeers.getPeer(src.getPeerId());
+ if (peer == null || peer != oldPeer) {
+ src.terminate("Recovered queue doesn't belong to any current peer");
+ deleteQueue(queueId);
return;
}
- Map<String, Set<String>> newQueues = new HashMap<>();
- try {
- List<String> queues = queueStorage.getAllQueues(deadRS);
- while (!queues.isEmpty()) {
- Pair<String, SortedSet<String>> peer =
queueStorage.claimQueue(deadRS,
- queues.get(ThreadLocalRandom.current().nextInt(queues.size())),
server.getServerName());
- long sleep = sleepBeforeFailover / 2;
- if (!peer.getSecond().isEmpty()) {
- newQueues.put(peer.getFirst(), peer.getSecond());
- sleep = sleepBeforeFailover;
- }
- try {
- Thread.sleep(sleep);
- } catch (InterruptedException e) {
- LOG.warn("Interrupted while waiting before transferring a queue.");
- Thread.currentThread().interrupt();
- }
- queues = queueStorage.getAllQueues(deadRS);
- }
- if (queues.isEmpty()) {
- queueStorage.removeReplicatorIfQueueIsEmpty(deadRS);
+ // track sources in walsByIdRecoveredQueues
+ Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
+ walsByIdRecoveredQueues.put(queueId, walsByGroup);
+ for (String wal : walsSet) {
+ String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
+ NavigableSet<String> wals = walsByGroup.get(walPrefix);
+ if (wals == null) {
+ wals = new TreeSet<>();
+ walsByGroup.put(walPrefix, wals);
}
- } catch (ReplicationException e) {
- LOG.error(String.format("ReplicationException: cannot claim dead
region (%s)'s " +
- "replication queue. Znode : (%s)" +
- " Possible solution: check if znode size exceeds jute.maxBuffer
value. " +
- " If so, increase it for both client and server side.",
- deadRS, queueStorage.getRsNode(deadRS)), e);
- server.abort("Failed to claim queue from dead regionserver.", e);
- return;
+ wals.add(wal);
}
- // Copying over the failed queue is completed.
- if (newQueues.isEmpty()) {
- // We either didn't get the lock or the failed region server didn't
have any outstanding
- // WALs to replicate, so we are done.
- return;
- }
-
- for (Map.Entry<String, Set<String>> entry : newQueues.entrySet()) {
- String queueId = entry.getKey();
- Set<String> walsSet = entry.getValue();
- try {
- // there is not an actual peer defined corresponding to peerId for
the failover.
- ReplicationQueueInfo replicationQueueInfo = new
ReplicationQueueInfo(queueId);
- String actualPeerId = replicationQueueInfo.getPeerId();
-
- ReplicationPeerImpl peer = replicationPeers.getPeer(actualPeerId);
- if (peer == null || !isOldPeer(actualPeerId, peer)) {
- LOG.warn("Skipping failover for peer {} of node {}, peer is null",
actualPeerId,
- deadRS);
- abortWhenFail(() ->
queueStorage.removeQueue(server.getServerName(), queueId));
- continue;
- }
- if (server instanceof ReplicationSyncUp.DummyServer
- && peer.getPeerState().equals(PeerState.DISABLED)) {
- LOG.warn("Peer {} is disabled. ReplicationSyncUp tool will skip "
- + "replicating data to this peer.",
- actualPeerId);
- continue;
- }
- // track sources in walsByIdRecoveredQueues
- Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
- walsByIdRecoveredQueues.put(queueId, walsByGroup);
- for (String wal : walsSet) {
- String walPrefix =
AbstractFSWALProvider.getWALPrefixFromWALName(wal);
- NavigableSet<String> wals = walsByGroup.get(walPrefix);
- if (wals == null) {
- wals = new TreeSet<>();
- walsByGroup.put(walPrefix, wals);
- }
- wals.add(wal);
- }
-
- ReplicationSourceInterface src = createSource(queueId, peer);
- // synchronized on oldsources to avoid adding recovered source for
the to-be-removed peer
- synchronized (oldsources) {
- peer = replicationPeers.getPeer(src.getPeerId());
- if (peer == null || !isOldPeer(src.getPeerId(), peer)) {
- src.terminate("Recovered queue doesn't belong to any current
peer");
- removeRecoveredSource(src);
- continue;
- }
- oldsources.add(src);
- LOG.info("Added recovered source {}", src.getQueueId());
- for (String wal : walsSet) {
- src.enqueueLog(new Path(oldLogDir, wal));
- }
- src.startup();
- }
- } catch (IOException e) {
- // TODO manage it
- LOG.error("Failed creating a source", e);
- }
+ oldsources.add(src);
+ LOG.info("Added source for recovered queue {}", src.getQueueId());
+ for (String wal : walsSet) {
+ LOG.trace("Enqueueing log from recovered queue for source: " +
src.getQueueId());
+ src.enqueueLog(new Path(oldLogDir, wal));
}
+ src.startup();
}
}
@@ -1054,4 +949,8 @@ public class ReplicationSourceManager implements
ReplicationListener {
}
return crs.startup();
}
+
+ ReplicationQueueStorage getQueueStorage() {
+ return queueStorage;
+ }
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
index b6386cc..a2ce6e6 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
@@ -18,6 +18,10 @@
package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
@@ -31,18 +35,21 @@ import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.zookeeper.KeeperException;
/**
* In a scenario of Replication based Disaster/Recovery, when hbase
Master-Cluster crashes, this
* tool is used to sync-up the delta from Master to Slave using the info from
ZooKeeper. The tool
- * will run on Master-Cluser, and assume ZK, Filesystem and NetWork still
available after hbase
+ * will run on Master-Cluster, and assume ZK, Filesystem and NetWork still
available after hbase
* crashes
*
* <pre>
@@ -62,6 +69,29 @@ public class ReplicationSyncUp extends Configured implements
Tool {
System.exit(ret);
}
+ private Set<ServerName> getLiveRegionServers(ZKWatcher zkw) throws
KeeperException {
+ List<String> rsZNodes = ZKUtil.listChildrenNoWatch(zkw,
zkw.getZNodePaths().rsZNode);
+ return rsZNodes == null ? Collections.emptySet() :
+
rsZNodes.stream().map(ServerName::parseServerName).collect(Collectors.toSet());
+ }
+
+ // When using this tool, usually the source cluster is unhealthy, so we
should try to claim the
+ // replication queues for the dead region servers first and then replicate
the data out.
+ private void claimReplicationQueues(ZKWatcher zkw, ReplicationSourceManager
mgr)
+ throws ReplicationException, KeeperException {
+ List<ServerName> replicators =
mgr.getQueueStorage().getListOfReplicators();
+ Set<ServerName> liveRegionServers = getLiveRegionServers(zkw);
+ for (ServerName sn : replicators) {
+ if (!liveRegionServers.contains(sn)) {
+ List<String> replicationQueues =
mgr.getQueueStorage().getAllQueues(sn);
+ System.out.println(sn + " is dead, claim its replication queues: " +
replicationQueues);
+ for (String queue : replicationQueues) {
+ mgr.claimQueue(sn, queue);
+ }
+ }
+ }
+ }
+
@Override
public int run(String[] args) throws Exception {
Abortable abortable = new Abortable() {
@@ -88,7 +118,8 @@ public class ReplicationSyncUp extends Configured implements
Tool {
replication.initialize(new DummyServer(zkw), fs, logDir, oldLogDir,
new WALFactory(conf, "test", null));
ReplicationSourceManager manager = replication.getReplicationManager();
- manager.init().get();
+ manager.init();
+ claimReplicationQueues(zkw, manager);
while (manager.activeFailoverTaskCount() > 0) {
Thread.sleep(SLEEP_TIME);
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SwitchRpcThrottleRemoteCallable.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SwitchRpcThrottleRemoteCallable.java
index b2e698f..c78fe40 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SwitchRpcThrottleRemoteCallable.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SwitchRpcThrottleRemoteCallable.java
@@ -18,41 +18,30 @@
package org.apache.hadoop.hbase.replication.regionserver;
import org.apache.hadoop.hbase.executor.EventType;
-import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.procedure2.BaseRSProcedureCallable;
import org.apache.yetus.audience.InterfaceAudience;
import
org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
+
import
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SwitchRpcThrottleRemoteStateData;
/**
* The callable executed at RS side to switch rpc throttle state. <br/>
*/
@InterfaceAudience.Private
-public class SwitchRpcThrottleRemoteCallable implements RSProcedureCallable {
- private HRegionServer rs;
+public class SwitchRpcThrottleRemoteCallable extends BaseRSProcedureCallable {
+
private boolean rpcThrottleEnabled;
- private Exception initError;
@Override
- public Void call() throws Exception {
- if (initError != null) {
- throw initError;
- }
+ protected void doCall() throws Exception {
rs.getRegionServerRpcQuotaManager().switchRpcThrottle(rpcThrottleEnabled);
- return null;
}
@Override
- public void init(byte[] parameter, HRegionServer rs) {
- this.rs = rs;
- try {
- SwitchRpcThrottleRemoteStateData param =
- SwitchRpcThrottleRemoteStateData.parseFrom(parameter);
- rpcThrottleEnabled = param.getRpcThrottleEnabled();
- } catch (InvalidProtocolBufferException e) {
- initError = e;
- }
+ protected void initParameter(byte[] parameter) throws InvalidProtocolBufferException {
+ SwitchRpcThrottleRemoteStateData param = SwitchRpcThrottleRemoteStateData.parseFrom(parameter);
+ rpcThrottleEnabled = param.getRpcThrottleEnabled();
}
@Override
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestClaimReplicationQueue.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestClaimReplicationQueue.java
new file mode 100644
index 0000000..6c86feb
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestClaimReplicationQueue.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.ServerManager;
+import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
+import org.apache.hadoop.hbase.master.replication.ClaimReplicationQueuesProcedure;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
+
+/**
+ * In HBASE-26029, we reimplement the claim queue operation with proc-v2 and make it a step in SCP,
+ * this is a UT to make sure the {@link ClaimReplicationQueuesProcedure} works correctly.
+ */
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestClaimReplicationQueue extends TestReplicationBase {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestClaimReplicationQueue.class);
+
+ private static final TableName tableName3 = TableName.valueOf("test3");
+
+ private static final String PEER_ID3 = "3";
+
+ private static Table table3;
+
+ private static Table table4;
+
+ private static volatile boolean EMPTY = false;
+
+ public static final class ServerManagerForTest extends ServerManager {
+
+ public ServerManagerForTest(MasterServices master) {
+ super(master);
+ }
+
+ @Override
+ public List<ServerName> getOnlineServersList() {
+ // return no region server to make the procedure hang
+ if (EMPTY) {
+ for (StackTraceElement e : Thread.currentThread().getStackTrace()) {
+ if (e.getClassName().equals(ClaimReplicationQueuesProcedure.class.getName())) {
+ return Collections.emptyList();
+ }
+ }
+ }
+ return super.getOnlineServersList();
+ }
+ }
+
+ public static final class HMasterForTest extends HMaster {
+
+ public HMasterForTest(Configuration conf) throws IOException {
+ super(conf);
+ }
+
+ @Override
+ protected ServerManager createServerManager(MasterServices master) throws IOException {
+ setupClusterConnection();
+ return new ServerManagerForTest(master);
+ }
+ }
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ CONF1.setClass(HConstants.MASTER_IMPL, HMasterForTest.class, HMaster.class);
+ TestReplicationBase.setUpBeforeClass();
+ createTable(tableName3);
+ table3 = connection1.getTable(tableName3);
+ table4 = connection2.getTable(tableName3);
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ Closeables.close(table3, true);
+ Closeables.close(table4, true);
+ TestReplicationBase.tearDownAfterClass();
+ }
+
+ @Override
+ public void setUpBase() throws Exception {
+ super.setUpBase();
+ // set up two replication peers and only 1 rs to test claim replication queue with multiple
+ // round
+ addPeer(PEER_ID3, tableName3);
+ }
+
+ @Override
+ public void tearDownBase() throws Exception {
+ super.tearDownBase();
+ removePeer(PEER_ID3);
+ }
+
+ @Test
+ public void testClaim() throws Exception {
+ // disable the peers
+ hbaseAdmin.disableReplicationPeer(PEER_ID2);
+ hbaseAdmin.disableReplicationPeer(PEER_ID3);
+
+ // put some data
+ int count1 = UTIL1.loadTable(htable1, famName);
+ int count2 = UTIL1.loadTable(table3, famName);
+
+ EMPTY = true;
+ UTIL1.getMiniHBaseCluster().stopRegionServer(0).join();
+ UTIL1.getMiniHBaseCluster().startRegionServer();
+
+ // since there is no active region server to get the replication queue, the procedure should be
+ // in WAITING_TIMEOUT state for most time to retry
+ HMaster master = UTIL1.getMiniHBaseCluster().getMaster();
+ UTIL1.waitFor(30000,
+ () -> master.getProcedures().stream()
+ .filter(p -> p instanceof ClaimReplicationQueuesProcedure)
+ .anyMatch(p -> p.getState() == ProcedureState.WAITING_TIMEOUT));
+
+ hbaseAdmin.enableReplicationPeer(PEER_ID2);
+ hbaseAdmin.enableReplicationPeer(PEER_ID3);
+
+ EMPTY = false;
+ // wait until the SCP finished, ClaimReplicationQueuesProcedure is a sub procedure of SCP
+ UTIL1.waitFor(30000, () -> master.getProcedures().stream()
+ .filter(p -> p instanceof ServerCrashProcedure).allMatch(Procedure::isSuccess));
+
+ // we should get all the data in the target cluster
+ waitForReplication(htable2, count1, NB_RETRIES);
+ waitForReplication(table4, count2, NB_RETRIES);
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
index 6d40997..cb90eba 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.replication;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -54,6 +55,7 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
@@ -65,8 +67,8 @@ import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
*/
public class TestReplicationBase {
private static final Logger LOG = LoggerFactory.getLogger(TestReplicationBase.class);
- private static Connection connection1;
- private static Connection connection2;
+ protected static Connection connection1;
+ protected static Connection connection2;
protected static Configuration CONF_WITH_LOCALFS;
protected static ReplicationAdmin admin;
@@ -141,19 +143,22 @@ public class TestReplicationBase {
waitForReplication(htable2, expectedRows, retries);
}
- protected static void waitForReplication(Table htable2, int expectedRows, int retries)
- throws IOException, InterruptedException {
+ protected static void waitForReplication(Table table, int expectedRows, int retries)
+ throws IOException, InterruptedException {
Scan scan;
for (int i = 0; i < retries; i++) {
scan = new Scan();
- if (i== retries -1) {
+ if (i == retries - 1) {
fail("Waited too much time for normal batch replication");
}
- ResultScanner scanner = htable2.getScanner(scan);
- Result[] res = scanner.next(expectedRows);
- scanner.close();
- if (res.length != expectedRows) {
- LOG.info("Only got " + res.length + " rows");
+ int count = 0;
+ try (ResultScanner scanner = table.getScanner(scan)) {
+ while (scanner.next() != null) {
+ count++;
+ }
+ }
+ if (count != expectedRows) {
+ LOG.info("Only got " + count + " rows");
Thread.sleep(SLEEP_TIME);
} else {
break;
@@ -229,6 +234,18 @@ public class TestReplicationBase {
htable2 = UTIL2.getConnection().getTable(tableName);
}
+ protected static void createTable(TableName tableName)
+ throws IOException {
+ TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName)
+ .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100)
+ .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
+ .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
+ UTIL1.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
+ UTIL2.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
+ UTIL1.waitUntilAllRegionsAssigned(tableName);
+ UTIL2.waitUntilAllRegionsAssigned(tableName);
+ }
+
private static void startClusters() throws Exception {
UTIL1.startMiniZKCluster();
MiniZooKeeperCluster miniZK = UTIL1.getZkCluster();
@@ -248,22 +265,9 @@ public class TestReplicationBase {
admin = new ReplicationAdmin(CONF1);
hbaseAdmin = connection1.getAdmin();
- TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName)
- .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100)
- .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
- .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
-
- try (
- Admin admin1 = connection1.getAdmin();
- Admin admin2 = connection2.getAdmin()) {
- admin1.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
- admin2.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
- UTIL1.waitUntilAllRegionsAssigned(tableName);
- htable1 = connection1.getTable(tableName);
- UTIL2.waitUntilAllRegionsAssigned(tableName);
- htable2 = connection2.getTable(tableName);
- }
-
+ createTable(tableName);
+ htable1 = connection1.getTable(tableName);
+ htable2 = connection2.getTable(tableName);
}
@BeforeClass
@@ -276,21 +280,29 @@ public class TestReplicationBase {
return hbaseAdmin.listReplicationPeers().stream().anyMatch(p -> peerId.equals(p.getPeerId()));
}
- @Before
- public void setUpBase() throws Exception {
- if (!peerExist(PEER_ID2)) {
+ protected final void addPeer(String peerId, TableName tableName) throws Exception {
+ if (!peerExist(peerId)) {
ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder()
.setClusterKey(UTIL2.getClusterKey()).setSerial(isSerialPeer()).setReplicationEndpointImpl(
ReplicationEndpointTest.class.getName());
- hbaseAdmin.addReplicationPeer(PEER_ID2, builder.build());
+ hbaseAdmin.addReplicationPeer(peerId, builder.build());
+ }
+ }
+
+ @Before
+ public void setUpBase() throws Exception {
+ addPeer(PEER_ID2, tableName);
+ }
+
+ protected final void removePeer(String peerId) throws Exception {
+ if (peerExist(peerId)) {
+ hbaseAdmin.removeReplicationPeer(peerId);
}
}
@After
public void tearDownBase() throws Exception {
- if (peerExist(PEER_ID2)) {
- hbaseAdmin.removeReplicationPeer(PEER_ID2);
- }
+ removePeer(PEER_ID2);
}
protected static void runSimplePutDeleteTest() throws IOException, InterruptedException {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 68926d5..8aad2b1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -72,7 +72,6 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ZKReplicationPeerStorage;
-import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -393,9 +392,7 @@ public abstract class TestReplicationSourceManager {
ReplicationPeers rp1 =
ReplicationFactory.getReplicationPeers(s1.getZooKeeper(),
s1.getConfiguration());
rp1.init();
- NodeFailoverWorker w1 =
- manager.new NodeFailoverWorker(server.getServerName());
- w1.run();
+ manager.claimQueue(server.getServerName(), "1");
assertEquals(1, manager.getWalsByIdRecoveredQueues().size());
String id = "1-" + server.getServerName().getServerName();
assertEquals(files, manager.getWalsByIdRecoveredQueues().get(id).get(group));
@@ -415,8 +412,7 @@ public abstract class TestReplicationSourceManager {
rq.addWAL(server.getServerName(), "2", group + ".log1");
rq.addWAL(server.getServerName(), "2", group + ".log2");
- NodeFailoverWorker w1 = manager.new NodeFailoverWorker(server.getServerName());
- w1.run();
+ manager.claimQueue(server.getServerName(), "2");
// The log of the unknown peer should be removed from zk
for (String peer : manager.getAllQueues()) {