This is an automated email from the ASF dual-hosted git repository.
zghao pushed a commit to branch HBASE-24666
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/HBASE-24666 by this push:
new 3640775 HBASE-25071 ReplicationServer support start ReplicationSource
internal (#2452)
3640775 is described below
commit 36407759d67d67a089625c63cb3c3c81b624fe85
Author: Guanghao Zhang <[email protected]>
AuthorDate: Mon Nov 9 11:46:02 2020 +0800
HBASE-25071 ReplicationServer support start ReplicationSource internal
(#2452)
Signed-off-by: XinSun <[email protected]>
---
.../server/replication/ReplicationServer.proto | 14 +-
.../replication/ZKReplicationQueueStorage.java | 4 +-
.../replication/ZKReplicationStorageBase.java | 4 +
.../hadoop/hbase/master/MasterRpcServices.java | 2 +-
.../hadoop/hbase/regionserver/RSRpcServices.java | 6 +-
.../replication/HBaseReplicationEndpoint.java | 14 +-
.../hbase/replication/HReplicationServer.java | 175 ++++++++++++++++++---
.../replication/ReplicationServerRpcServices.java | 15 ++
.../regionserver/RecoveredReplicationSource.java | 9 +-
.../regionserver/ReplicationSource.java | 58 ++++++-
.../regionserver/ReplicationSourceFactory.java | 2 +-
.../regionserver/ReplicationSourceInterface.java | 6 +-
.../regionserver/ReplicationSourceManager.java | 4 +-
.../hbase/replication/ReplicationSourceDummy.java | 5 +-
.../hbase/replication/TestReplicationBase.java | 2 +-
.../replication/TestReplicationFetchServers.java | 43 +++--
...nServer.java => TestReplicationServerSink.java} | 24 +--
.../replication/TestReplicationServerSource.java | 69 ++++++++
.../regionserver/TestReplicationSource.java | 12 +-
.../regionserver/TestReplicationSourceManager.java | 16 +-
20 files changed, 395 insertions(+), 89 deletions(-)
diff --git
a/hbase-protocol-shaded/src/main/protobuf/server/replication/ReplicationServer.proto
b/hbase-protocol-shaded/src/main/protobuf/server/replication/ReplicationServer.proto
index ed334c4..925aed4 100644
---
a/hbase-protocol-shaded/src/main/protobuf/server/replication/ReplicationServer.proto
+++
b/hbase-protocol-shaded/src/main/protobuf/server/replication/ReplicationServer.proto
@@ -24,9 +24,21 @@ option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;
+import "HBase.proto";
import "server/region/Admin.proto";
+message StartReplicationSourceRequest {
+ required ServerName server_name = 1;
+ required string queue_id = 2;
+}
+
+message StartReplicationSourceResponse {
+}
+
service ReplicationServerService {
rpc ReplicateWALEntry(ReplicateWALEntryRequest)
returns(ReplicateWALEntryResponse);
-}
\ No newline at end of file
+
+ rpc StartReplicationSource(StartReplicationSourceRequest)
+ returns(StartReplicationSourceResponse);
+}
diff --git
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
index 6f1f5a3..2b9594e 100644
---
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
+++
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
@@ -80,7 +80,7 @@ import
org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUti
* </pre>
*/
@InterfaceAudience.Private
-class ZKReplicationQueueStorage extends ZKReplicationStorageBase
+public class ZKReplicationQueueStorage extends ZKReplicationStorageBase
implements ReplicationQueueStorage {
private static final Logger LOG =
LoggerFactory.getLogger(ZKReplicationQueueStorage.class);
@@ -123,7 +123,7 @@ class ZKReplicationQueueStorage extends
ZKReplicationStorageBase
return ZNodePaths.joinZNode(queuesZNode, serverName.getServerName());
}
- private String getQueueNode(ServerName serverName, String queueId) {
+ public String getQueueNode(ServerName serverName, String queueId) {
return ZNodePaths.joinZNode(getRsNode(serverName), queueId);
}
diff --git
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java
index 596167f..a239bf8 100644
---
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java
+++
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java
@@ -74,4 +74,8 @@ public abstract class ZKReplicationStorageBase {
throw new RuntimeException(e);
}
}
+
+ public ZKWatcher getZookeeper() {
+ return this.zookeeper;
+ }
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
index 9f229f4..02a897b 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
@@ -3396,7 +3396,7 @@ public class MasterRpcServices extends RSRpcServices
implements
if (master.getMasterCoprocessorHost() != null) {
master.getMasterCoprocessorHost().preListReplicationSinkServers();
}
- builder.addAllServerName(master.listReplicationSinkServers().stream()
+
builder.addAllServerName(master.getReplicationServerManager().getOnlineServersList().stream()
.map(ProtobufUtil::toServerName).collect(Collectors.toList()));
if (master.getMasterCoprocessorHost() != null) {
master.getMasterCoprocessorHost().postListReplicationSinkServers();
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index bc138c4..cd5c82d 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -257,7 +257,6 @@ import
org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuo
import
org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse.TableQuotaSnapshot;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
-import
org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerProtos.ReplicationServerService;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
@@ -271,7 +270,7 @@ import
org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDe
@SuppressWarnings("deprecation")
public class RSRpcServices implements HBaseRPCErrorHandler,
AdminService.BlockingInterface, ClientService.BlockingInterface,
PriorityFunction,
- ConfigurationObserver, ReplicationServerService.BlockingInterface {
+ ConfigurationObserver {
protected static final Logger LOG =
LoggerFactory.getLogger(RSRpcServices.class);
/** RPC scheduler to use for the region server. */
@@ -1491,9 +1490,6 @@ public class RSRpcServices implements
HBaseRPCErrorHandler,
bssi.add(new BlockingServiceAndInterface(
AdminService.newReflectiveBlockingService(this),
AdminService.BlockingInterface.class));
- bssi.add(new BlockingServiceAndInterface(
- ReplicationServerService.newReflectiveBlockingService(this),
- ReplicationServerService.BlockingInterface.class));
}
return new org.apache.hbase.thirdparty.com.google.common.collect.
ImmutableList.Builder<BlockingServiceAndInterface>().addAll(bssi).build();
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
index 6967c6f..69264a4 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
@@ -321,6 +321,10 @@ public abstract class HBaseReplicationEndpoint extends
BaseReplicationEndpoint
if (!useZk ||
ReplicationUtils.isPeerClusterSupportReplicationOffload(conn)) {
useZk = false;
slaveAddresses = fetchSlavesAddresses();
+ if (slaveAddresses.isEmpty()) {
+ LOG.warn("No sinks available at peer. Try fetch sinks by using zk.");
+ useZk = true;
+ }
} else {
useZk = true;
}
@@ -328,13 +332,15 @@ public abstract class HBaseReplicationEndpoint extends
BaseReplicationEndpoint
LOG.warn("Peer {} try to fetch servers by admin failed. Using zk impl.",
ctx.getPeerId(), t);
useZk = true;
}
+
if (useZk) {
slaveAddresses = fetchSlavesAddressesByZK();
}
if (slaveAddresses.isEmpty()) {
- LOG.warn("No sinks available at peer. Will not be able to replicate");
+ LOG.warn("No sinks available at peer. Will not be able to replicate.");
}
+
Collections.shuffle(slaveAddresses, ThreadLocalRandom.current());
int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio);
synchronized (this) {
@@ -368,10 +374,10 @@ public abstract class HBaseReplicationEndpoint extends
BaseReplicationEndpoint
}
private SinkPeer createSinkPeer(ServerName serverName) throws IOException {
- if (ReplicationUtils.isPeerClusterSupportReplicationOffload(conn)) {
- return new ReplicationServerSinkPeer(serverName,
conn.getReplicationServerAdmin(serverName));
- } else {
+ if (fetchServersUseZk) {
return new RegionServerSinkPeer(serverName,
conn.getRegionServerAdmin(serverName));
+ } else {
+ return new ReplicationServerSinkPeer(serverName,
conn.getReplicationServerAdmin(serverName));
}
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java
index 4b53bb7..4c8bb11 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java
@@ -20,10 +20,19 @@ package org.apache.hadoop.hbase.replication;
import java.io.IOException;
import java.lang.management.MemoryUsage;
import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.OptionalLong;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ChoreService;
+import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
@@ -33,18 +42,31 @@ import org.apache.hadoop.hbase.YouAreDeadException;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.regionserver.ReplicationService;
import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
+import
org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource;
+import
org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceFactory;
+import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
+import
org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSource;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
+import
org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceFactory;
+import
org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
+import org.apache.hadoop.hbase.security.SecurityConstants;
+import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.trace.TraceUtil;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Sleeper;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
+import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.ReflectionUtils;
@@ -66,7 +88,7 @@ import
org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerStatus
*/
@InterfaceAudience.Private
@SuppressWarnings({ "deprecation"})
-public class HReplicationServer extends Thread implements Server {
+public class HReplicationServer extends Thread implements Server,
ReplicationSourceController {
private static final Logger LOG =
LoggerFactory.getLogger(HReplicationServer.class);
@@ -76,7 +98,7 @@ public class HReplicationServer extends Thread implements
Server {
/**
* This servers start code.
*/
- protected final long startCode;
+ private final long startCode;
private volatile boolean stopped = false;
@@ -85,7 +107,11 @@ public class HReplicationServer extends Thread implements
Server {
private AtomicBoolean abortRequested;
// flag set after we're done setting up server threads
- final AtomicBoolean online = new AtomicBoolean(false);
+ private final AtomicBoolean online = new AtomicBoolean(false);
+
+ private final int msgInterval;
+ // A sleeper that sleeps for msgInterval.
+ private final Sleeper sleeper;
/**
* The server name the Master sees us as. Its made from the hostname the
@@ -94,18 +120,22 @@ public class HReplicationServer extends Thread implements
Server {
*/
private ServerName serverName;
- protected final Configuration conf;
+ private final Configuration conf;
- private ReplicationSinkService replicationSinkService;
+ // zookeeper connection and watcher
+ private final ZKWatcher zooKeeper;
- final int msgInterval;
- // A sleeper that sleeps for msgInterval.
- protected final Sleeper sleeper;
+ private final UUID clusterId;
private final int shortOperationTimeout;
- // zookeeper connection and watcher
- protected final ZKWatcher zooKeeper;
+ private HFileSystem walFs;
+ private Path walRootDir;
+
+ /**
+ * ChoreService used to schedule tasks that we want to run periodically
+ */
+ private ChoreService choreService;
// master address tracker
private final MasterAddressTracker masterAddressTracker;
@@ -113,11 +143,23 @@ public class HReplicationServer extends Thread implements
Server {
/**
* The asynchronous cluster connection to be shared by services.
*/
- protected AsyncClusterConnection asyncClusterConnection;
+ private AsyncClusterConnection asyncClusterConnection;
private UserProvider userProvider;
- protected final ReplicationServerRpcServices rpcServices;
+ final ReplicationServerRpcServices rpcServices;
+
+ // Total buffer size on this RegionServer for holding batched edits to be
shipped.
+ private final long totalBufferLimit;
+ private AtomicLong totalBufferUsed = new AtomicLong();
+
+ private final MetricsReplicationGlobalSourceSource globalMetrics;
+ private final Map<String, MetricsSource> sourceMetrics = new HashMap<>();
+ private final ConcurrentMap<String, ReplicationSourceInterface> sources =
+ new ConcurrentHashMap<>();
+
+ private final ReplicationQueueStorage queueStorage;
+ private final ReplicationPeers replicationPeers;
// Stub to do region server status calls against the master.
private volatile ReplicationServerStatusService.BlockingInterface rssStub;
@@ -125,12 +167,9 @@ public class HReplicationServer extends Thread implements
Server {
// RPC client. Used to make the stub above that does region server status
checking.
private RpcClient rpcClient;
- /**
- * ChoreService used to schedule tasks that we want to run periodically
- */
- private ChoreService choreService;
+ private ReplicationSinkService replicationSinkService;
- public HReplicationServer(final Configuration conf) throws IOException {
+ public HReplicationServer(final Configuration conf) throws Exception {
TraceUtil.initTracer(conf);
try {
this.startCode = System.currentTimeMillis();
@@ -144,12 +183,29 @@ public class HReplicationServer extends Thread implements
Server {
serverName = ServerName.valueOf(hostName,
this.rpcServices.isa.getPort(), this.startCode);
this.userProvider = UserProvider.instantiate(conf);
+ // login the zookeeper client principal (if using security)
+ ZKUtil.loginClient(this.conf, HConstants.ZK_CLIENT_KEYTAB_FILE,
+ HConstants.ZK_CLIENT_KERBEROS_PRINCIPAL, hostName);
+ // login the server principal (if using secure Hadoop)
+ this.userProvider.login(SecurityConstants.REGIONSERVER_KRB_KEYTAB_FILE,
+ SecurityConstants.REGIONSERVER_KRB_PRINCIPAL, hostName);
+ // init superusers and add the server principal (if using security)
+ // or process owner as default super user.
+ Superusers.initialize(conf);
this.msgInterval = conf.getInt("hbase.replicationserver.msginterval", 3
* 1000);
this.sleeper = new Sleeper(this.msgInterval, this);
this.shortOperationTimeout =
conf.getInt(HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);
+ this.totalBufferLimit =
conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
+ HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
+ this.globalMetrics =
+
CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class)
+ .getGlobalSource();
+
+ initializeFileSystem();
+ this.choreService = new ChoreService(getName(), true);
// Some unit tests don't need a cluster, so no zookeeper at all
if (!conf.getBoolean("hbase.testing.nocluster", false)) {
@@ -162,6 +218,12 @@ public class HReplicationServer extends Thread implements
Server {
zooKeeper = null;
masterAddressTracker = null;
}
+
+ this.queueStorage =
ReplicationStorageFactory.getReplicationQueueStorage(zooKeeper, conf);
+ this.replicationPeers =
+ ReplicationFactory.getReplicationPeers(zooKeeper, this.conf);
+ this.replicationPeers.init();
+ this.clusterId = ZKClusterId.getUUIDForCluster(zooKeeper);
this.rpcServices.start(zooKeeper);
this.choreService = new ChoreService(getName(), true);
} catch (Throwable t) {
@@ -172,6 +234,15 @@ public class HReplicationServer extends Thread implements
Server {
}
}
+ private void initializeFileSystem() throws IOException {
+ // Get fs instance used by this RS. Do we use checksum verification in the
hbase? If hbase
+ // checksum verification enabled, then automatically switch off hdfs
checksum verification.
+ boolean useHBaseChecksum =
conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
+ CommonFSUtils.setFsDefault(this.conf,
CommonFSUtils.getWALRootDir(this.conf));
+ this.walFs = new HFileSystem(this.conf, useHBaseChecksum);
+ this.walRootDir = CommonFSUtils.getWALRootDir(this.conf);
+ }
+
public String getProcessName() {
return REPLICATION_SERVER;
}
@@ -291,6 +362,9 @@ public class HReplicationServer extends Thread implements
Server {
if (this.replicationSinkService != null) {
this.replicationSinkService.stopReplicationService();
}
+ if (this.choreService != null) {
+ this.choreService.shutdown();
+ }
}
@Override
@@ -330,7 +404,7 @@ public class HReplicationServer extends Thread implements
Server {
@Override
public ChoreService getChoreService() {
- return this.choreService;
+ return choreService;
}
@Override
@@ -594,4 +668,69 @@ public class HReplicationServer extends Thread implements
Server {
}
return interrupted;
}
+
+ @Override
+ public long getTotalBufferLimit() {
+ return this.totalBufferLimit;
+ }
+
+ @Override
+ public AtomicLong getTotalBufferUsed() {
+ return this.totalBufferUsed;
+ }
+
+ @Override
+ public MetricsReplicationGlobalSourceSource getGlobalMetrics() {
+ return this.globalMetrics;
+ }
+
+ @Override
+ public void finishRecoveredSource(RecoveredReplicationSource src) {
+ this.sources.remove(src.getQueueId());
+ this.sourceMetrics.remove(src.getQueueId());
+ deleteQueue(src.getQueueId());
+ LOG.info("Finished recovering queue {} with the following stats: {}",
src.getQueueId(),
+ src.getStats());
+ }
+
+ public void startReplicationSource(ServerName producer, String queueId)
+ throws IOException, ReplicationException {
+ ReplicationQueueInfo replicationQueueInfo = new
ReplicationQueueInfo(queueId);
+ String peerId = replicationQueueInfo.getPeerId();
+ this.replicationPeers.addPeer(peerId);
+ Path walDir =
+ new Path(walRootDir,
AbstractFSWALProvider.getWALDirectoryName(producer.toString()));
+ MetricsSource metrics = new MetricsSource(queueId);
+
+ ReplicationSourceInterface src = ReplicationSourceFactory.create(conf,
queueId);
+ // init replication source
+ src.init(conf, walFs, walDir, this, queueStorage,
replicationPeers.getPeer(peerId), this,
+ producer, queueId, clusterId, p -> OptionalLong.empty(), metrics);
+ queueStorage.getWALsInQueue(producer, queueId)
+ .forEach(walName -> src.enqueueLog(new Path(walDir, walName)));
+ src.startup();
+ sources.put(queueId, src);
+ sourceMetrics.put(queueId, metrics);
+ }
+
+ /**
+ * Delete a complete queue of wals associated with a replication source
+ * @param queueId the id of replication queue to delete
+ */
+ private void deleteQueue(String queueId) {
+ abortWhenFail(() -> this.queueStorage.removeQueue(getServerName(),
queueId));
+ }
+
+ @FunctionalInterface
+ private interface ReplicationQueueOperation {
+ void exec() throws ReplicationException;
+ }
+
+ private void abortWhenFail(ReplicationQueueOperation op) {
+ try {
+ op.exec();
+ } catch (ReplicationException e) {
+ abort("Failed to operate on replication queue", e);
+ }
+ }
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationServerRpcServices.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationServerRpcServices.java
index 15d4f8c..b8c3884 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationServerRpcServices.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationServerRpcServices.java
@@ -56,11 +56,14 @@ import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerProtos.ReplicationServerService;
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerProtos.StartReplicationSourceRequest;
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerProtos.StartReplicationSourceResponse;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
@@ -321,4 +324,16 @@ public class ReplicationServerRpcServices implements
HBaseRPCErrorHandler,
throw new ServiceException(ie);
}
}
+
+ @Override
+ public StartReplicationSourceResponse startReplicationSource(RpcController
controller,
+ StartReplicationSourceRequest request) throws ServiceException {
+ try {
+
replicationServer.startReplicationSource(ProtobufUtil.toServerName(request.getServerName()),
+ request.getQueueId());
+ return StartReplicationSourceResponse.newBuilder().build();
+ } catch (Exception e) {
+ throw new ServiceException(e);
+ }
+ }
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
index eece3c0..2ecf908 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
@@ -51,10 +51,11 @@ public class RecoveredReplicationSource extends
ReplicationSource {
@Override
public void init(Configuration conf, FileSystem fs, Path walDir,
ReplicationSourceController overallController, ReplicationQueueStorage
queueStorage,
- ReplicationPeer replicationPeer, Server server, String peerClusterZnode,
UUID clusterId,
- WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws
IOException {
- super.init(conf, fs, walDir, overallController, queueStorage,
replicationPeer, server,
- peerClusterZnode, clusterId, walFileLengthProvider, metrics);
+ ReplicationPeer replicationPeer, Server server, ServerName producer,
String queueId,
+ UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource
metrics)
+ throws IOException {
+ super.init(conf, fs, walDir, overallController, queueStorage,
replicationPeer, server, producer,
+ queueId, clusterId, walFileLengthProvider, metrics);
this.actualPeerId = this.replicationQueueInfo.getPeerId();
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 6fee960..c81cc29 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -63,10 +63,13 @@ import
org.apache.hadoop.hbase.replication.ReplicationSourceController;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
+import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorage;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.zookeeper.ZKListener;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import
org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
@@ -155,6 +158,7 @@ public class ReplicationSource implements
ReplicationSourceInterface {
private int waitOnEndpointSeconds = -1;
private Thread initThread;
+ private Thread fetchWALsThread;
/**
* WALs to replicate.
@@ -192,8 +196,9 @@ public class ReplicationSource implements
ReplicationSourceInterface {
@Override
public void init(Configuration conf, FileSystem fs, Path walDir,
ReplicationSourceController overallController, ReplicationQueueStorage
queueStorage,
- ReplicationPeer replicationPeer, Server server, String queueId, UUID
clusterId,
- WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws
IOException {
+ ReplicationPeer replicationPeer, Server server, ServerName producer,
String queueId,
+ UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource
metrics)
+ throws IOException {
this.server = server;
this.conf = HBaseConfiguration.create(conf);
this.walDir = walDir;
@@ -224,6 +229,19 @@ public class ReplicationSource implements
ReplicationSourceInterface {
this.abortOnError =
this.conf.getBoolean("replication.source.regionserver.abort",
true);
+ if (conf.getBoolean(HConstants.REPLICATION_OFFLOAD_ENABLE_KEY,
+ HConstants.REPLICATION_OFFLOAD_ENABLE_DEFAULT)) {
+ if (queueStorage instanceof ZKReplicationQueueStorage) {
+ ZKReplicationQueueStorage zkQueueStorage = (ZKReplicationQueueStorage)
queueStorage;
+ zkQueueStorage.getZookeeper().registerListener(
+ new ReplicationQueueListener(this, zkQueueStorage, producer,
queueId, walDir));
+ LOG.info("Register a ZKListener to track the WALs from {}'s
replication queue, queueId={}",
+ producer, queueId);
+ } else {
+ throw new UnsupportedOperationException(
+ "hbase.replication.offload.enabled=true only support
ZKReplicationQueueStorage");
+ }
+ }
LOG.info("queueId={}, ReplicationSource: {}, currentBandwidth={}", queueId,
replicationPeer.getId(), this.currentBandwidth);
}
@@ -258,7 +276,9 @@ public class ReplicationSource implements
ReplicationSourceInterface {
tryStartNewShipper(walPrefix, queue);
}
} else {
- queue.put(wal);
+ if (!queue.contains(wal)) {
+ queue.put(wal);
+ }
}
if (LOG.isTraceEnabled()) {
LOG.trace("{} Added wal {} to queue of source {}.", logPeerId(),
walPrefix,
@@ -928,4 +948,36 @@ public class ReplicationSource implements
ReplicationSourceInterface {
server.abort("Failed to operate on replication queue", e);
}
}
+
+ /**
+ * Tracks changes to the WALs in the replication queue.
+ */
+ public static class ReplicationQueueListener extends ZKListener {
+
+ private final ReplicationSource source;
+ private final String queueNode;
+ private final Path walDir;
+
+ public ReplicationQueueListener(ReplicationSource source,
+ ZKReplicationQueueStorage zkQueueStorage, ServerName producer, String
queueId, Path walDir) {
+ super(zkQueueStorage.getZookeeper());
+ this.source = source;
+ this.queueNode = zkQueueStorage.getQueueNode(producer, queueId);
+ this.walDir = walDir;
+ }
+
+ @Override
+ public synchronized void nodeChildrenChanged(String path) {
+ if (path.equals(queueNode)) {
+ LOG.info("Detected change to the WALs in the replication queue {}",
queueNode);
+ try {
+ ZKUtil.listChildrenNoWatch(watcher, queueNode).forEach(walName -> {
+ source.enqueueLog(new Path(walDir, walName));
+ });
+ } catch (KeeperException e) {
+ LOG.warn("Failed to read WALs in the replication queue {}",
queueNode, e);
+ }
+ }
+ }
+ }
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java
index d613049..16a7692 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java
@@ -32,7 +32,7 @@ public class ReplicationSourceFactory {
private static final Logger LOG =
LoggerFactory.getLogger(ReplicationSourceFactory.class);
- static ReplicationSourceInterface create(Configuration conf, String queueId)
{
+ public static ReplicationSourceInterface create(Configuration conf, String
queueId) {
ReplicationQueueInfo replicationQueueInfo = new
ReplicationQueueInfo(queueId);
boolean isQueueRecovered = replicationQueueInfo.isQueueRecovered();
ReplicationSourceInterface src;
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index f3bf8a4..d14062c 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -52,6 +52,7 @@ public interface ReplicationSourceInterface {
* @param queueStorage the replication queue storage
* @param replicationPeer the replication peer
* @param server the server which start and run this replication source
+ * @param producer the name of region server which produce WAL to the
replication queue
* @param queueId the id of our replication queue
* @param clusterId unique UUID for the cluster
* @param walFileLengthProvider used to get the WAL length
@@ -59,8 +60,9 @@ public interface ReplicationSourceInterface {
*/
void init(Configuration conf, FileSystem fs, Path walDir,
ReplicationSourceController overallController, ReplicationQueueStorage
queueStorage,
- ReplicationPeer replicationPeer, Server server, String queueId, UUID
clusterId,
- WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws
IOException;
+ ReplicationPeer replicationPeer, Server server, ServerName producer,
String queueId,
+ UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource
metrics)
+ throws IOException;
/**
* Add a log to the list of logs to replicate
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index de9e21f..cea0fc2 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -360,8 +360,8 @@ public class ReplicationSourceManager implements
ReplicationListener, Replicatio
MetricsSource metrics = new MetricsSource(queueId);
sourceMetrics.put(queueId, metrics);
// init replication source
- src.init(conf, fs, logDir, this, queueStorage, replicationPeer, server,
queueId, clusterId,
- walFileLengthProvider, metrics);
+ src.init(conf, fs, logDir, this, queueStorage, replicationPeer, server,
server.getServerName(),
+ queueId, clusterId, walFileLengthProvider, metrics);
return src;
}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index 66059c7..b4cce10 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -48,8 +48,9 @@ public class ReplicationSourceDummy implements
ReplicationSourceInterface {
@Override
public void init(Configuration conf, FileSystem fs, Path walDir,
ReplicationSourceController overallController, ReplicationQueueStorage
queueStorage,
- ReplicationPeer replicationPeer, Server server, String queueId, UUID
clusterId,
- WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws
IOException {
+ ReplicationPeer replicationPeer, Server server, ServerName producer,
String queueId,
+ UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource
metrics)
+ throws IOException {
this.queueId = queueId;
this.metrics = metrics;
this.walFileLengthProvider = walFileLengthProvider;
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
index be44c6c..671e448 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
@@ -231,7 +231,7 @@ public class TestReplicationBase {
htable2 = UTIL2.getConnection().getTable(tableName);
}
- private static void startClusters() throws Exception {
+ static void startClusters() throws Exception {
UTIL1.startMiniZKCluster();
MiniZooKeeperCluster miniZK = UTIL1.getZkCluster();
LOG.info("Setup first Zk");
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationFetchServers.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationFetchServers.java
index 9ceacee..db4152e 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationFetchServers.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationFetchServers.java
@@ -18,10 +18,10 @@
package org.apache.hadoop.hbase.replication;
import static
org.apache.hadoop.hbase.coprocessor.CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -38,13 +38,14 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-
-import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListReplicationSinkServersRequest;
@@ -53,11 +54,14 @@ import
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterServ
@Category({ ReplicationTests.class, MediumTests.class })
public class TestReplicationFetchServers extends TestReplicationBase {
+ private static final Logger LOG =
LoggerFactory.getLogger(TestReplicationFetchServers.class);
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestReplicationFetchServers.class);
+ private static HReplicationServer replicationServer;
+
private static AtomicBoolean fetchFlag = new AtomicBoolean(false);
public static class MyObserver implements MasterCoprocessor, MasterObserver {
@@ -77,6 +81,17 @@ public class TestReplicationFetchServers extends
TestReplicationBase {
public static void setUpBeforeClass() throws Exception {
CONF2.set(MASTER_COPROCESSOR_CONF_KEY, MyObserver.class.getName());
TestReplicationBase.setUpBeforeClass();
+ replicationServer = new HReplicationServer(CONF2);
+ replicationServer.start();
+ UTIL2.waitFor(60000, () -> replicationServer.isOnline());
+ }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    if (replicationServer != null && !replicationServer.isStopped()) {
+      replicationServer.stop("test");
+    }
+    TestReplicationBase.tearDownAfterClass();
   }
@Before
@@ -85,15 +100,23 @@ public class TestReplicationFetchServers extends
TestReplicationBase {
}
@Test
- public void testMasterListReplicationPeerServers() throws IOException,
ServiceException {
+ public void testMasterListReplicationPeerServers() throws IOException {
AsyncClusterConnection conn = UTIL2.getAsyncConnection();
ServerName master = UTIL2.getAdmin().getMaster();
- MasterService.BlockingInterface masterStub = MasterService.newBlockingStub(
- conn.getRpcClient().createBlockingRpcChannel(master,
User.getCurrent(), 1000));
- ListReplicationSinkServersResponse resp =
masterStub.listReplicationSinkServers(
- null, ListReplicationSinkServersRequest.newBuilder().build());
- List<ServerName> servers =
ProtobufUtil.toServerNameList(resp.getServerNameList());
- assertFalse(servers.isEmpty());
+ // Poll the master until exactly one replication server (the one started in setUpBeforeClass) has reported to it
+ UTIL2.waitFor(60000, () -> {
+ List<ServerName> servers = new ArrayList<>();
+ try {
+ MasterService.BlockingInterface masterStub =
MasterService.newBlockingStub(
+ conn.getRpcClient().createBlockingRpcChannel(master,
User.getCurrent(), 1000));
+ ListReplicationSinkServersResponse resp =
masterStub.listReplicationSinkServers(
+ null, ListReplicationSinkServersRequest.newBuilder().build());
+ servers = ProtobufUtil.toServerNameList(resp.getServerNameList());
+ } catch (Exception e) {
+ LOG.debug("Failed to list replication servers", e);
+ }
+ return servers.size() == 1;
+ });
assertTrue(fetchFlag.get());
}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServer.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServerSink.java
similarity index 89%
rename from
hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServer.java
rename to
hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServerSink.java
index 30660c6..bad1dc1 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServer.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServerSink.java
@@ -43,7 +43,6 @@ import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.ReplicationServerManager;
import
org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint.ReplicationServerSinkPeer;
-import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint.SinkPeer;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -64,13 +63,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Category({ReplicationTests.class, MediumTests.class})
-public class TestReplicationServer {
+public class TestReplicationServerSink {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestReplicationServer.class);
+ HBaseClassTestRule.forClass(TestReplicationServerSink.class);
- private static final Logger LOG =
LoggerFactory.getLogger(TestReplicationServer.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(TestReplicationServerSink.class);
private static final HBaseTestingUtility TEST_UTIL = new
HBaseTestingUtility();
@@ -145,22 +144,7 @@ public class TestReplicationServer {
replicateWALEntryAndVerify(sinkPeer);
}
- /**
- * Requests region server using {@link AsyncReplicationServerAdmin}
- */
- @Test
- public void testReplicateWAL2() throws Exception {
- AsyncClusterConnection conn =
- TEST_UTIL.getHBaseCluster().getMaster().getAsyncClusterConnection();
- ServerName rs =
TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().get(0)
- .getRegionServer().getServerName();
- AsyncReplicationServerAdmin replAdmin = conn.getReplicationServerAdmin(rs);
-
- ReplicationServerSinkPeer sinkPeer = new ReplicationServerSinkPeer(rs,
replAdmin);
- replicateWALEntryAndVerify(sinkPeer);
- }
-
- private void replicateWALEntryAndVerify(SinkPeer sinkPeer) throws Exception {
+ private void replicateWALEntryAndVerify(ReplicationServerSinkPeer sinkPeer)
throws Exception {
Entry[] entries = new Entry[BATCH_SIZE];
for(int i = 0; i < BATCH_SIZE; i++) {
entries[i] = generateEdit(i, TABLENAME, Bytes.toBytes(i));
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServerSource.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServerSource.java
new file mode 100644
index 0000000..843e5b1
--- /dev/null
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServerSource.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Starts the replication source for {@code PEER_ID2} on a standalone {@link HReplicationServer},
+ * with replication offload enabled on the source cluster, and then runs the small batch test.
+ */
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestReplicationServerSource extends TestReplicationBase {
+
+  @ClassRule public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestReplicationServerSource.class);
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationServerSource.class);
+
+  private static HReplicationServer replicationServer;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    UTIL1.getConfiguration().setBoolean(HConstants.REPLICATION_OFFLOAD_ENABLE_KEY, true);
+    TestReplicationBase.setUpBeforeClass();
+    replicationServer = new HReplicationServer(UTIL1.getConfiguration());
+    replicationServer.start();
+    UTIL1.waitFor(60000, () -> replicationServer.isOnline());
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    // setUpBeforeClass may have failed before the replication server was created.
+    if (replicationServer != null) {
+      replicationServer.stop("Tear down after test");
+    }
+    TestReplicationBase.tearDownAfterClass();
+  }
+
+  @Test
+  public void test() throws Exception {
+    try {
+      // The source cluster runs a single region server, so it is the only WAL producer.
+      ServerName producer = UTIL1.getMiniHBaseCluster().getRegionServer(0).getServerName();
+      replicationServer.startReplicationSource(producer, PEER_ID2);
+    } catch (Throwable e) {
+      LOG.warn("Failed to start replication source", e);
+    }
+    runSmallBatchTest();
+  }
+}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
index 43fef88..2a1d3be 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
@@ -131,7 +131,7 @@ public class TestReplicationSource {
String queueId = "qid";
RegionServerServices rss =
TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
- rs.init(conf, null, null, manager, null, mockPeer, rss, queueId, null,
+ rs.init(conf, null, null, manager, null, mockPeer, rss,
rss.getServerName(), queueId, null,
p -> OptionalLong.empty(), new MetricsSource(queueId));
try {
rs.startup();
@@ -169,8 +169,8 @@ public class TestReplicationSource {
String queueId = "qid";
RegionServerServices rss =
TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
- rs.init(conf, null, null, manager, null, mockPeer, rss, queueId,
- uuid, p -> OptionalLong.empty(), new MetricsSource(queueId));
+ rs.init(conf, null, null, manager, null, mockPeer, rss,
rss.getServerName(), queueId, uuid,
+ p -> OptionalLong.empty(), new MetricsSource(queueId));
try {
rs.startup();
TEST_UTIL.waitFor(30000, () -> rs.getWalEntryFilter() != null);
@@ -257,8 +257,8 @@ public class TestReplicationSource {
testConf.setInt("replication.source.maxretriesmultiplier", 1);
ReplicationSourceManager manager =
Mockito.mock(ReplicationSourceManager.class);
Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
- source.init(testConf, null, null, manager, null, mockPeer, null,
"testPeer",
- null, p -> OptionalLong.empty(), null);
+ source.init(testConf, null, null, manager, null, mockPeer, null, null,
"testPeer", null,
+ p -> OptionalLong.empty(), null);
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<?> future = executor.submit(
() -> source.terminate("testing source termination"));
@@ -471,7 +471,7 @@ public class TestReplicationSource {
String queueId = "qid";
RegionServerServices rss =
TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
- rs.init(conf, null, null, manager, null, mockPeer, rss, queueId, null,
+ rs.init(conf, null, null, manager, null, mockPeer, rss,
rss.getServerName(), queueId, null,
p -> OptionalLong.empty(), new MetricsSource(queueId));
return rss;
}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 0e0353f..d0504a5 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -414,7 +414,8 @@ public abstract class TestReplicationSourceManager {
assertEquals(files,
manager.getWalsByIdRecoveredQueues().get(id).get(group));
ReplicationSourceInterface source = new ReplicationSource();
source.init(conf, fs, null, manager, manager.getQueueStorage(),
rp1.getPeer("1"),
- manager.getServer(), id, null, p -> OptionalLong.empty(), null);
+ manager.getServer(), manager.getServer().getServerName(), id, null, p ->
OptionalLong.empty(),
+ null);
source.cleanOldWALs(file2, false);
// log1 should be deleted
assertEquals(Sets.newHashSet(file2),
manager.getWalsByIdRecoveredQueues().get(id).get(group));
@@ -631,16 +632,16 @@ public abstract class TestReplicationSourceManager {
ReplicationSourceInterface source = new ReplicationSource();
source.init(conf, fs, null, manager, manager.getQueueStorage(),
- mockReplicationPeerForSyncReplication(peerId2), manager.getServer(),
peerId2, null,
- p -> OptionalLong.empty(), null);
+ mockReplicationPeerForSyncReplication(peerId2), manager.getServer(),
+ manager.getServer().getServerName(), peerId2, null, p ->
OptionalLong.empty(), null);
source.cleanOldWALs(walName, true);
// still there if peer id does not match
assertTrue(fs.exists(remoteWAL));
source = new ReplicationSource();
source.init(conf, fs, null, manager, manager.getQueueStorage(),
- mockReplicationPeerForSyncReplication(slaveId), manager.getServer(),
slaveId, null,
- p -> OptionalLong.empty(), null);
+ mockReplicationPeerForSyncReplication(slaveId), manager.getServer(),
+ manager.getServer().getServerName(), slaveId, null, p ->
OptionalLong.empty(), null);
source.cleanOldWALs(walName, true);
assertFalse(fs.exists(remoteWAL));
} finally {
@@ -821,8 +822,9 @@ public abstract class TestReplicationSourceManager {
@Override
public void init(Configuration conf, FileSystem fs, Path walDir,
ReplicationSourceController overallController, ReplicationQueueStorage
queueStorage,
- ReplicationPeer replicationPeer, Server server, String queueId, UUID
clusterId,
- WALFileLengthProvider walFileLengthProvider, MetricsSource metrics)
throws IOException{
+ ReplicationPeer replicationPeer, Server server, ServerName producer,
String queueId,
+ UUID clusterId, WALFileLengthProvider walFileLengthProvider,
MetricsSource metrics)
+ throws IOException {
throw new IOException("Failing deliberately");
}
}