This is an automated email from the ASF dual-hosted git repository.
sunxin pushed a commit to branch HBASE-24666
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/HBASE-24666 by this push:
new afd8b9b HBASE-26194 Introduce a ReplicationServerSourceManager to
simplify HReplicationServer (#3584)
afd8b9b is described below
commit afd8b9bbfa14cddc797978f94a996bd59b23e279
Author: XinSun <[email protected]>
AuthorDate: Tue Aug 17 16:28:18 2021 +0800
HBASE-26194 Introduce a ReplicationServerSourceManager to simplify
HReplicationServer (#3584)
Signed-off-by: stack <[email protected]>
---
.../apache/hadoop/hbase/security/SecurityInfo.java | 7 +
.../hadoop/hbase/security/SecurityConstants.java | 4 +
.../hbase/replication/ReplicationQueueInfo.java | 15 ++
.../hbase/replication/HReplicationServer.java | 122 ++---------
.../replication/regionserver/MetricsSource.java | 13 ++
.../ReplicationServerSourceManager.java | 234 +++++++++++++++++++++
.../TestReplicationServerSourceManager.java | 139 ++++++++++++
7 files changed, 427 insertions(+), 107 deletions(-)
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SecurityInfo.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SecurityInfo.java
index f5f6922..4740d9d 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SecurityInfo.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SecurityInfo.java
@@ -27,6 +27,8 @@ import
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerProtos;
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerStatusProtos;
/**
* Maps RPC protocol interfaces to required configuration
@@ -51,6 +53,11 @@ public class SecurityInfo {
new SecurityInfo(SecurityConstants.MASTER_KRB_PRINCIPAL,
Kind.HBASE_AUTH_TOKEN));
infos.put(MasterProtos.ClientMetaService.getDescriptor().getName(),
new SecurityInfo(SecurityConstants.MASTER_KRB_PRINCIPAL,
Kind.HBASE_AUTH_TOKEN));
+
infos.put(ReplicationServerStatusProtos.ReplicationServerStatusService.getDescriptor()
+ .getName(),
+ new SecurityInfo(SecurityConstants.MASTER_KRB_PRINCIPAL,
Kind.HBASE_AUTH_TOKEN));
+
infos.put(ReplicationServerProtos.ReplicationServerService.getDescriptor().getName(),
+ new SecurityInfo(SecurityConstants.REPLICATION_SERVER_KRB_PRINCIPAL,
Kind.HBASE_AUTH_TOKEN));
// NOTE: IF ADDING A NEW SERVICE, BE SURE TO UPDATE HBasePolicyProvider
ALSO ELSE
// new Service will not be found when all is Kerberized!!!!
}
diff --git
a/hbase-common/src/main/java/org/apache/hadoop/hbase/security/SecurityConstants.java
b/hbase-common/src/main/java/org/apache/hadoop/hbase/security/SecurityConstants.java
index 3e387e8..d0e13a3 100644
---
a/hbase-common/src/main/java/org/apache/hadoop/hbase/security/SecurityConstants.java
+++
b/hbase-common/src/main/java/org/apache/hadoop/hbase/security/SecurityConstants.java
@@ -34,6 +34,10 @@ public final class SecurityConstants {
public static final String MASTER_KRB_KEYTAB_FILE =
"hbase.master.keytab.file";
public static final String REGIONSERVER_KRB_PRINCIPAL =
"hbase.regionserver.kerberos.principal";
public static final String REGIONSERVER_KRB_KEYTAB_FILE =
"hbase.regionserver.keytab.file";
+ public static final String REPLICATION_SERVER_KRB_PRINCIPAL =
+ "hbase.replication.server.kerberos.principal";
+ public static final String REPLICATION_SERVER_KRB_KEYTAB_FILE =
+ "hbase.replication.server.keytab.file";
/**
* This config is for experts: don't set its value unless you really know
what you are doing.
diff --git
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java
index 49a2153..9c1c03a 100644
---
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java
+++
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.replication;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Objects;
import org.apache.hadoop.hbase.ServerName;
import org.apache.yetus.audience.InterfaceAudience;
@@ -147,4 +148,18 @@ public class ReplicationQueueInfo {
public boolean isQueueRecovered() {
return queueRecovered;
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof ReplicationQueueInfo) {
+ ReplicationQueueInfo other = (ReplicationQueueInfo) o;
+ return Objects.equals(this.owner, other.owner) &&
Objects.equals(this.queueId, other.queueId);
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(this.owner, this.queueId);
+ }
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java
index f31a98d..1a9b18c 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java
@@ -20,19 +20,12 @@ package org.apache.hadoop.hbase.replication;
import java.io.IOException;
import java.lang.management.MemoryUsage;
import java.net.InetSocketAddress;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.OptionalLong;
import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ChoreService;
-import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
@@ -49,22 +42,14 @@ import
org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.regionserver.ReplicationService;
import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
-import
org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource;
-import
org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceFactory;
-import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
-import
org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSource;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
-import
org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceFactory;
-import
org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
-import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
-import
org.apache.hadoop.hbase.replication.replicationserver.RemoteWALFileLengthProvider;
+import
org.apache.hadoop.hbase.replication.replicationserver.ReplicationServerSourceManager;
import org.apache.hadoop.hbase.security.SecurityConstants;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Sleeper;
-import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@@ -89,7 +74,7 @@ import
org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerStatus
*/
@InterfaceAudience.Private
@SuppressWarnings({ "deprecation"})
-public class HReplicationServer extends Thread implements Server,
ReplicationSourceController {
+public class HReplicationServer extends Thread implements Server {
private static final Logger LOG =
LoggerFactory.getLogger(HReplicationServer.class);
@@ -150,18 +135,6 @@ public class HReplicationServer extends Thread implements
Server, ReplicationSou
final ReplicationServerRpcServices rpcServices;
- // Total buffer size on this RegionServer for holding batched edits to be
shipped.
- private final long totalBufferLimit;
- private AtomicLong totalBufferUsed = new AtomicLong();
-
- private final MetricsReplicationGlobalSourceSource globalMetrics;
- private final Map<String, MetricsSource> sourceMetrics = new HashMap<>();
- private final ConcurrentMap<String, ReplicationSourceInterface> sources =
- new ConcurrentHashMap<>();
-
- private final ReplicationQueueStorage queueStorage;
- private final ReplicationPeers replicationPeers;
-
// Stub to do region server status calls against the master.
private volatile ReplicationServerStatusService.BlockingInterface rssStub;
@@ -170,6 +143,8 @@ public class HReplicationServer extends Thread implements
Server, ReplicationSou
private ReplicationSinkService replicationSinkService;
+ private ReplicationServerSourceManager sourceManager;
+
public HReplicationServer(final Configuration conf) throws Exception {
try {
this.startCode = System.currentTimeMillis();
@@ -198,11 +173,6 @@ public class HReplicationServer extends Thread implements
Server, ReplicationSou
this.shortOperationTimeout =
conf.getInt(HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);
- this.totalBufferLimit =
conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
- HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
- this.globalMetrics =
-
CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class)
- .getGlobalSource();
initializeFileSystem();
this.choreService = new ChoreService(getName(), true);
@@ -219,10 +189,6 @@ public class HReplicationServer extends Thread implements
Server, ReplicationSou
masterAddressTracker = null;
}
- this.queueStorage =
ReplicationStorageFactory.getReplicationQueueStorage(zooKeeper, conf);
- this.replicationPeers =
- ReplicationFactory.getReplicationPeers(zooKeeper, this.conf);
- this.replicationPeers.init();
this.clusterId = ZKClusterId.getUUIDForCluster(zooKeeper);
this.rpcServices.start(zooKeeper);
this.choreService = new ChoreService(getName(), true);
@@ -497,7 +463,11 @@ public class HReplicationServer extends Thread implements
Server, ReplicationSou
/**
* Start up replication source and sink handlers.
*/
- private void startReplicationService() throws IOException {
+ private void startReplicationService() throws IOException,
ReplicationException {
+ Path oldWalDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
+ this.sourceManager = new ReplicationServerSourceManager(this, walFs,
walRootDir,
+ oldWalDir, clusterId);
+ this.sourceManager.init();
if (this.replicationSinkService != null) {
this.replicationSinkService.startReplicationService();
}
@@ -510,6 +480,10 @@ public class HReplicationServer extends Thread implements
Server, ReplicationSou
return replicationSinkService;
}
+ public ReplicationServerSourceManager getReplicationServerSourceManager() {
+ return this.sourceManager;
+ }
+
/**
* Report the status of the server. A server is online once all the startup
is
* completed (setting up filesystem, starting executorService threads,
etc.). This
@@ -682,75 +656,9 @@ public class HReplicationServer extends Thread implements
Server, ReplicationSou
return interrupted;
}
- @Override
- public long getTotalBufferLimit() {
- return this.totalBufferLimit;
- }
-
- @Override
- public AtomicLong getTotalBufferUsed() {
- return this.totalBufferUsed;
- }
-
- @Override
- public MetricsReplicationGlobalSourceSource getGlobalMetrics() {
- return this.globalMetrics;
- }
-
- @Override
- public void finishRecoveredSource(RecoveredReplicationSource src) {
- this.sources.remove(src.getQueueId());
- this.sourceMetrics.remove(src.getQueueId());
- deleteQueue(src.getReplicationQueueInfo());
- LOG.info("Finished recovering queue {} with the following stats: {}",
src.getQueueId(),
- src.getStats());
- }
-
public void startReplicationSource(ServerName owner, String queueId)
throws IOException, ReplicationException {
- ReplicationQueueInfo replicationQueueInfo = new
ReplicationQueueInfo(owner, queueId);
- String peerId = replicationQueueInfo.getPeerId();
- this.replicationPeers.addPeer(peerId);
- Path walDir = new Path(walRootDir,
AbstractFSWALProvider.getWALDirectoryName(owner.toString()));
- MetricsSource metrics = new MetricsSource(queueId);
-
- ReplicationSourceInterface src = ReplicationSourceFactory.create(conf,
queueId);
- // init replication source
- src.init(conf, walFs, walDir, this, queueStorage,
replicationPeers.getPeer(peerId), this,
- replicationQueueInfo, clusterId, createWALFileLengthProvider(owner,
queueId), metrics);
- queueStorage.getWALsInQueue(owner, queueId)
- .forEach(walName -> src.enqueueLog(new Path(walDir, walName)));
- src.startup();
- sources.put(queueId, src);
- sourceMetrics.put(queueId, metrics);
- }
-
- /**
- * Delete a complete queue of wals associated with a replication source
- * @param queueInfo the replication queue to delete
- */
- private void deleteQueue(ReplicationQueueInfo queueInfo) {
- abortWhenFail(() ->
- this.queueStorage.removeQueue(queueInfo.getOwner(),
queueInfo.getQueueId()));
- }
-
- @FunctionalInterface
- private interface ReplicationQueueOperation {
- void exec() throws ReplicationException;
- }
-
- private void abortWhenFail(ReplicationQueueOperation op) {
- try {
- op.exec();
- } catch (ReplicationException e) {
- abort("Failed to operate on replication queue", e);
- }
- }
-
- private WALFileLengthProvider createWALFileLengthProvider(ServerName
producer, String queueId) {
- if (ReplicationQueueInfo.isQueueRecovered(queueId)) {
- return p -> OptionalLong.empty();
- }
- return new RemoteWALFileLengthProvider(asyncClusterConnection, producer);
+ LOG.info("Start replication source, owner: {}, queueId: {}", owner,
queueId);
+ this.sourceManager.startReplicationSource(owner, queueId);
}
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
index 3ab0806..2135010 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
@@ -24,6 +24,7 @@ import java.util.Map;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.metrics.BaseSource;
+import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL.Entry;
@@ -66,6 +67,18 @@ public class MetricsSource implements BaseSource {
singleSourceSourceByTable = new HashMap<>();
}
+
+ /**
+ * Constructor used to register the metrics
+ * On ReplicationServer, there may be multiple queues with the same queueId,
so use queueOwner
+ * and queueId to distinguish them.
+ *
+ * @param queueInfo The replication queue this class is monitoring
+ */
+ public MetricsSource(ReplicationQueueInfo queueInfo) {
+ this(queueInfo.getOwner() + "." + queueInfo.getQueueId());
+ }
+
/**
* Constructor for injecting custom (or test) MetricsReplicationSourceSources
* @param id Name of the source this class is monitoring
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/replicationserver/ReplicationServerSourceManager.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/replicationserver/ReplicationServerSourceManager.java
new file mode 100644
index 0000000..683fbcf
--- /dev/null
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/replicationserver/ReplicationServerSourceManager.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.replicationserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.OptionalLong;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationFactory;
+import org.apache.hadoop.hbase.replication.ReplicationPeers;
+import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
+import org.apache.hadoop.hbase.replication.ReplicationSourceController;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
+import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorage;
+import
org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource;
+import
org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceFactory;
+import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
+import
org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSource;
+import
org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceFactory;
+import
org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
+import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class is responsible to manage all the replication sources on
ReplicationServer.
+ */
+@InterfaceAudience.Private
+public class ReplicationServerSourceManager implements
ReplicationSourceController {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ReplicationServerSourceManager.class);
+
+ private final Server server;
+
+ // Total buffer size on this ReplicationServer for holding batched edits to
be shipped.
+ private final long totalBufferLimit;
+ private AtomicLong totalBufferUsed = new AtomicLong();
+
+ private final MetricsReplicationGlobalSourceSource globalMetrics;
+
+ private final ConcurrentMap<ReplicationQueueInfo,
ReplicationSourceInterface> sources =
+ new ConcurrentHashMap<>();
+
+ private final ZKReplicationQueueStorage zkQueueStorage;
+ private final ReplicationPeers replicationPeers;
+
+ private final Configuration conf;
+
+ private final FileSystem fs;
+ private final Path walRootDir;
+ // Path to the wal archive
+ private final Path oldWalDir;
+
+ private final UUID clusterId;
+
+ public ReplicationServerSourceManager(Server server, FileSystem fs, Path
walRootDir,
+ Path oldWalDir, UUID clusterId) {
+ this.server = server;
+ this.conf = server.getConfiguration();
+ this.fs = fs;
+ this.walRootDir = walRootDir;
+ this.oldWalDir = oldWalDir;
+ this.clusterId = clusterId;
+ this.zkQueueStorage = (ZKReplicationQueueStorage) ReplicationStorageFactory
+ .getReplicationQueueStorage(server.getZooKeeper(), conf);
+ this.replicationPeers =
+ ReplicationFactory.getReplicationPeers(server.getZooKeeper(), conf);
+ this.totalBufferLimit =
this.conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
+ HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
+ this.globalMetrics = CompatibilitySingletonFactory.getInstance(
+ MetricsReplicationSourceFactory.class).getGlobalSource();
+ }
+
+
+ public void init() throws IOException, ReplicationException {
+ this.replicationPeers.init();
+ }
+
+ @Override
+ public long getTotalBufferLimit() {
+ return totalBufferLimit;
+ }
+
+ @Override
+ public AtomicLong getTotalBufferUsed() {
+ return totalBufferUsed;
+ }
+
+ @Override
+ public MetricsReplicationGlobalSourceSource getGlobalMetrics() {
+ return this.globalMetrics;
+ }
+
+ @Override
+ public void finishRecoveredSource(RecoveredReplicationSource src) {
+ this.sources.remove(src.getReplicationQueueInfo());
+ deleteQueue(src.getReplicationQueueInfo());
+ LOG.info("Finished recovering queue {} with the following stats: {}",
src.getQueueId(),
+ src.getStats());
+ }
+
+ /**
+ * Get a list of all sources, including normal sources and recovered sources.
+ * @return list of all sources
+ */
+ public List<ReplicationSourceInterface> getSources() {
+ return new ArrayList<>(this.sources.values());
+ }
+
+ public ReplicationSourceInterface getSource(ReplicationQueueInfo queueInfo) {
+ return this.sources.get(queueInfo);
+ }
+
+ public ReplicationPeers getReplicationPeers() {
+ return this.replicationPeers;
+ }
+
+ public void startReplicationSource(ServerName owner, String queueId) throws
IOException,
+ ReplicationException {
+ ReplicationQueueInfo replicationQueueInfo = new
ReplicationQueueInfo(owner, queueId);
+ String peerId = replicationQueueInfo.getPeerId();
+ this.replicationPeers.addPeer(peerId);
+
+ ReplicationSourceInterface src = ReplicationSourceFactory.create(conf,
queueId);
+
+ ReplicationSourceInterface existSource =
sources.putIfAbsent(replicationQueueInfo, src);
+ if (existSource != null) {
+ LOG.warn("Duplicate source exists for replication queue: owner={},
queueId={}", owner,
+ queueId);
+ return;
+ }
+
+ Path walDir = replicationQueueInfo.isQueueRecovered() ? oldWalDir :
+ new Path(walRootDir,
AbstractFSWALProvider.getWALDirectoryName(owner.toString()));
+
+ // init replication source
+ src.init(conf, fs, walDir, this, zkQueueStorage,
replicationPeers.getPeer(peerId),
+ server, replicationQueueInfo, clusterId,
+ createWALFileLengthProvider(replicationQueueInfo), new
MetricsSource(replicationQueueInfo));
+
+ List<String> waLsInQueue = zkQueueStorage.getWALsInQueue(owner, queueId);
+ waLsInQueue.forEach(walName -> src.enqueueLog(new Path(walDir, walName)));
+
+ src.startup();
+ LOG.info("Start replication source for queue {} from region server {}",
queueId, owner);
+ }
+
+ public void stopReplicationSource(ServerName owner, String queueId) throws
IOException,
+ ReplicationException {
+ String terminateMessage = "Replication source was stopped by Master";
+ ReplicationQueueInfo replicationQueueInfo = new
ReplicationQueueInfo(owner, queueId);
+ ReplicationSourceInterface src = sources.remove(replicationQueueInfo);
+ if (src != null) {
+ // Just terminate the source, do not clean queue.
+ src.terminate(terminateMessage);
+ }
+ }
+
+ public void removePeer(String peerId) {
+ String terminateMessage = "Replication stream was removed by a user";
+
+ List<ReplicationQueueInfo> queuesToDelete = sources.keySet().stream()
+ .filter(queueInfo ->
queueInfo.getPeerId().equals(peerId)).collect(Collectors.toList());
+
+ for (ReplicationQueueInfo queueInfo : queuesToDelete) {
+ ReplicationSourceInterface source = sources.remove(queueInfo);
+ source.terminate(terminateMessage);
+ deleteQueue(queueInfo);
+ }
+ // Remove HFile Refs
+ abortWhenFail(() -> this.zkQueueStorage.removePeerFromHFileRefs(peerId));
+ }
+
+ /**
+ * Delete a complete queue of wals associated with a replication source
+ * @param queueInfo the replication queue to delete
+ */
+ private void deleteQueue(ReplicationQueueInfo queueInfo) {
+ abortWhenFail(() ->
+ this.zkQueueStorage.removeQueue(queueInfo.getOwner(),
queueInfo.getQueueId()));
+ }
+
+ private void abortWhenFail(ReplicationQueueOperation op) {
+ try {
+ op.exec();
+ } catch (ReplicationException e) {
+ server.abort("Failed to operate on replication queue", e);
+ }
+ }
+
+ @FunctionalInterface
+ private interface ReplicationQueueOperation {
+ void exec() throws ReplicationException;
+ }
+
+ private WALFileLengthProvider
createWALFileLengthProvider(ReplicationQueueInfo queueInfo) {
+ if (queueInfo.isQueueRecovered()) {
+ return p -> OptionalLong.empty();
+ }
+ return new RemoteWALFileLengthProvider(server.getAsyncClusterConnection(),
+ queueInfo.getOwner());
+ }
+}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/replicationserver/TestReplicationServerSourceManager.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/replicationserver/TestReplicationServerSourceManager.java
new file mode 100644
index 0000000..7baf97c
--- /dev/null
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/replicationserver/TestReplicationServerSourceManager.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.replicationserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.replication.HReplicationServer;
+import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
+import org.apache.hadoop.hbase.replication.TestReplicationBase;
+import
org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource;
+import
org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
+import
org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({ ReplicationTests.class, MediumTests.class})
+public class TestReplicationServerSourceManager extends TestReplicationBase {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestReplicationServerSourceManager.class);
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestReplicationServerSourceManager.class);
+
+ @Rule
+ public TestName name = new TestName();
+
+ private HReplicationServer replicationServer;
+
+ private ReplicationServerSourceManager manager;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ CONF1.setBoolean(HConstants.REPLICATION_OFFLOAD_ENABLE_KEY, true);
+ TestReplicationBase.setUpBeforeClass();
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TestReplicationBase.tearDownAfterClass();
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ replicationServer = new HReplicationServer(UTIL1.getConfiguration());
+ replicationServer.start();
+ replicationServer.waitForServerOnline();
+ manager = replicationServer.getReplicationServerSourceManager();
+ super.setUpBase();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ replicationServer.stop("test");
+ super.tearDownBase();
+ }
+
+ @Test
+ public void testAddSource() throws Exception {
+ ServerName rs =
+
UTIL1.getHBaseCluster().getLiveRegionServerThreads().get(0).getRegionServer().getServerName();
+ manager.startReplicationSource(rs, PEER_ID2);
+ loadData("aaa", row);
+ UTIL2.waitFor(30000, () -> htable2.getScanner(new Scan()).next() != null);
+ }
+
+ @Test
+ public void testRemovePeerMetricsCleanup() throws Exception {
+ HRegionServer rs =
+
UTIL1.getHBaseCluster().getLiveRegionServerThreads().get(0).getRegionServer();
+ ReplicationSourceManager rsSourceManager =
+ rs.getReplicationSourceService().getReplicationManager();
+ int rsSourceSizeInitial = rsSourceManager.getSources().size();
+ String peerId = name.getMethodName();
+ ReplicationQueueInfo queueInfo = new
ReplicationQueueInfo(rs.getServerName(), peerId);
+
+ addPeer(peerId, tableName);
+ UTIL1.waitFor(30000,
+ () -> rsSourceSizeInitial + 1 == rsSourceManager.getSources().size());
+
+ MetricsReplicationGlobalSourceSource globalMetrics =
manager.getGlobalMetrics();
+ int globalLogQueueSizeInitial = globalMetrics.getSizeOfLogQueue();
+ LOG.debug("globalLogQueueSize: {} before starting source on replication
server",
+ globalLogQueueSizeInitial);
+ // Start source on replication server
+ manager.startReplicationSource(rs.getServerName(), peerId);
+ ReplicationSourceInterface src = manager.getSource(queueInfo);
+ assertNotNull(src);
+ assertEquals(globalLogQueueSizeInitial +
src.getSourceMetrics().getSizeOfLogQueue(),
+ globalMetrics.getSizeOfLogQueue());
+
+ // Stopping the peer should reset the global metrics
+ manager.stopReplicationSource(rs.getServerName(), peerId);
+ UTIL1.waitFor(30000, () -> manager.getSources().size() == 0);
+ assertEquals(globalLogQueueSizeInitial, globalMetrics.getSizeOfLogQueue());
+
+ // Adding the same source back again should reset the single source metrics
+ manager.startReplicationSource(rs.getServerName(), peerId);
+ src = manager.getSource(queueInfo);
+ assertNotNull(src);
+ assertEquals(globalLogQueueSizeInitial +
src.getSourceMetrics().getSizeOfLogQueue(),
+ globalMetrics.getSizeOfLogQueue());
+
+ removePeer(peerId);
+ }
+}
\ No newline at end of file