This is an automated email from the ASF dual-hosted git repository.
sunxin pushed a commit to branch HBASE-24666
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/HBASE-24666 by this push:
new fd2f3d1 HBASE-26084 Add owner of replication queue for
ReplicationQueueInfo (#3477)
fd2f3d1 is described below
commit fd2f3d1abdcb79b0fc037ff92e65459660c5b502
Author: XinSun <[email protected]>
AuthorDate: Thu Aug 12 17:14:35 2021 +0800
HBASE-26084 Add owner of replication queue for ReplicationQueueInfo (#3477)
Signed-off-by: stack <[email protected]>
---
.../hbase/replication/ReplicationQueueInfo.java | 24 +++++++++++++++-
.../hadoop/hbase/replication/ReplicationUtils.java | 3 +-
.../replication/ZKReplicationQueueStorage.java | 2 +-
.../master/replication/ReplicationPeerManager.java | 3 +-
.../hbase/replication/HReplicationServer.java | 22 +++++++--------
.../regionserver/DumpReplicationQueues.java | 2 +-
.../regionserver/RecoveredReplicationSource.java | 7 +++--
.../regionserver/ReplicationSource.java | 32 ++++++++--------------
.../regionserver/ReplicationSourceFactory.java | 3 +-
.../regionserver/ReplicationSourceInterface.java | 17 +++++++++---
.../regionserver/ReplicationSourceManager.java | 11 ++++----
.../hadoop/hbase/util/hbck/ReplicationChecker.java | 15 ++++------
.../hbase/replication/ReplicationSourceDummy.java | 20 ++++++--------
.../regionserver/TestReplicationSource.java | 22 +++++++++------
.../regionserver/TestReplicationSourceManager.java | 13 +++++----
.../TestReplicationSourceManagerZkImpl.java | 3 +-
16 files changed, 113 insertions(+), 86 deletions(-)
diff --git
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java
index d39a37e..49a2153 100644
---
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java
+++
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java
@@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory;
public class ReplicationQueueInfo {
private static final Logger LOG =
LoggerFactory.getLogger(ReplicationQueueInfo.class);
+ private final ServerName owner;
private final String peerId;
private final String queueId;
private boolean queueRecovered;
@@ -46,7 +47,8 @@ public class ReplicationQueueInfo {
* The passed queueId will be either the id of the peer or the handling
story of that queue
* in the form of id-servername-*
*/
- public ReplicationQueueInfo(String queueId) {
+ public ReplicationQueueInfo(ServerName owner, String queueId) {
+ this.owner = owner;
this.queueId = queueId;
String[] parts = queueId.split("-", 2);
this.queueRecovered = parts.length != 1;
@@ -58,6 +60,22 @@ public class ReplicationQueueInfo {
}
/**
+ * A util method to parse the peerId from queueId.
+ */
+ public static String parsePeerId(String queueId) {
+ String[] parts = queueId.split("-", 2);
+ return parts.length != 1 ? parts[0] : queueId;
+ }
+
+ /**
+ * A util method to check whether a queue is recovered.
+ */
+ public static boolean isQueueRecovered(String queueId) {
+ String[] parts = queueId.split("-", 2);
+ return parts.length != 1;
+ }
+
+ /**
* Parse dead server names from queue id. servername can contain "-" such as
* "ip-10-46-221-101.ec2.internal", so we need skip some "-" during parsing
for the following
* cases: 2-ip-10-46-221-101.ec2.internal,52170,1364333181125-<server
name>-...
@@ -114,6 +132,10 @@ public class ReplicationQueueInfo {
return Collections.unmodifiableList(this.deadRegionServers);
}
+ public ServerName getOwner() {
+ return this.owner;
+ }
+
public String getPeerId() {
return this.peerId;
}
diff --git
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
index 7bafbc2..7dbfe41a 100644
---
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
+++
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
@@ -86,8 +86,7 @@ public final class ReplicationUtils {
for (ServerName replicator : queueStorage.getListOfReplicators()) {
List<String> queueIds = queueStorage.getAllQueues(replicator);
for (String queueId : queueIds) {
- ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
- if (queueInfo.getPeerId().equals(peerId)) {
+ if (ReplicationQueueInfo.parsePeerId(queueId).equals(peerId)) {
queueStorage.removeQueue(replicator, queueId);
}
}
diff --git
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
index 08ac142..141e890 100644
---
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
+++
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
@@ -205,7 +205,7 @@ public class ZKReplicationQueueStorage extends
ZKReplicationStorageBase
private void addLastSeqIdsToOps(String queueId, Map<String, Long> lastSeqIds,
List<ZKUtilOp> listOfOps) throws KeeperException, ReplicationException {
- String peerId = new ReplicationQueueInfo(queueId).getPeerId();
+ String peerId = ReplicationQueueInfo.parsePeerId(queueId);
for (Entry<String, Long> lastSeqEntry : lastSeqIds.entrySet()) {
String path = getSerialReplicationRegionPeerNode(lastSeqEntry.getKey(),
peerId);
Pair<Long, Integer> p =
getLastSequenceIdWithVersion(lastSeqEntry.getKey(), peerId);
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
index add5121..c2a21a5 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
@@ -103,8 +103,7 @@ public class ReplicationPeerManager {
for (ServerName replicator : queueStorage.getListOfReplicators()) {
List<String> queueIds = queueStorage.getAllQueues(replicator);
for (String queueId : queueIds) {
- ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
- if (queueInfo.getPeerId().equals(peerId)) {
+ if (ReplicationQueueInfo.parsePeerId(queueId).equals(peerId)) {
throw new DoNotRetryIOException("undeleted queue for peerId: " +
peerId +
", replicator: " + replicator + ", queueId: " + queueId);
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java
index 2654565..f31a98d 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java
@@ -701,25 +701,24 @@ public class HReplicationServer extends Thread implements
Server, ReplicationSou
public void finishRecoveredSource(RecoveredReplicationSource src) {
this.sources.remove(src.getQueueId());
this.sourceMetrics.remove(src.getQueueId());
- deleteQueue(src.getQueueId());
+ deleteQueue(src.getReplicationQueueInfo());
LOG.info("Finished recovering queue {} with the following stats: {}",
src.getQueueId(),
src.getStats());
}
- public void startReplicationSource(ServerName producer, String queueId)
+ public void startReplicationSource(ServerName owner, String queueId)
throws IOException, ReplicationException {
- ReplicationQueueInfo replicationQueueInfo = new
ReplicationQueueInfo(queueId);
+ ReplicationQueueInfo replicationQueueInfo = new
ReplicationQueueInfo(owner, queueId);
String peerId = replicationQueueInfo.getPeerId();
this.replicationPeers.addPeer(peerId);
- Path walDir =
- new Path(walRootDir,
AbstractFSWALProvider.getWALDirectoryName(producer.toString()));
+ Path walDir = new Path(walRootDir,
AbstractFSWALProvider.getWALDirectoryName(owner.toString()));
MetricsSource metrics = new MetricsSource(queueId);
ReplicationSourceInterface src = ReplicationSourceFactory.create(conf,
queueId);
// init replication source
src.init(conf, walFs, walDir, this, queueStorage,
replicationPeers.getPeer(peerId), this,
- producer, queueId, clusterId, createWALFileLengthProvider(producer,
queueId), metrics);
- queueStorage.getWALsInQueue(producer, queueId)
+ replicationQueueInfo, clusterId, createWALFileLengthProvider(owner,
queueId), metrics);
+ queueStorage.getWALsInQueue(owner, queueId)
.forEach(walName -> src.enqueueLog(new Path(walDir, walName)));
src.startup();
sources.put(queueId, src);
@@ -728,10 +727,11 @@ public class HReplicationServer extends Thread implements
Server, ReplicationSou
/**
* Delete a complete queue of wals associated with a replication source
- * @param queueId the id of replication queue to delete
+ * @param queueInfo the replication queue to delete
*/
- private void deleteQueue(String queueId) {
- abortWhenFail(() -> this.queueStorage.removeQueue(getServerName(),
queueId));
+ private void deleteQueue(ReplicationQueueInfo queueInfo) {
+ abortWhenFail(() ->
+ this.queueStorage.removeQueue(queueInfo.getOwner(),
queueInfo.getQueueId()));
}
@FunctionalInterface
@@ -748,7 +748,7 @@ public class HReplicationServer extends Thread implements
Server, ReplicationSou
}
private WALFileLengthProvider createWALFileLengthProvider(ServerName
producer, String queueId) {
- if (new ReplicationQueueInfo(queueId).isQueueRecovered()) {
+ if (ReplicationQueueInfo.isQueueRecovered(queueId)) {
return p -> OptionalLong.empty();
}
return new RemoteWALFileLengthProvider(asyncClusterConnection, producer);
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
index 11b0c7c..ef4fde3 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
@@ -316,7 +316,7 @@ public class DumpReplicationQueues extends Configured
implements Tool {
deadRegionServers.add(regionserver.getServerName());
}
for (String queueId : queueIds) {
- ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
+ ReplicationQueueInfo queueInfo = new
ReplicationQueueInfo(regionserver, queueId);
List<String> wals = queueStorage.getWALsInQueue(regionserver, queueId);
Collections.sort(wals);
if (!peerIds.contains(queueInfo.getPeerId())) {
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
index 147556f..c8200ca 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
+import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationSourceController;
import org.apache.hadoop.hbase.util.CommonFSUtils;
@@ -51,11 +52,11 @@ public class RecoveredReplicationSource extends
ReplicationSource {
@Override
public void init(Configuration conf, FileSystem fs, Path walDir,
ReplicationSourceController overallController, ReplicationQueueStorage
queueStorage,
- ReplicationPeer replicationPeer, Server server, ServerName producer,
String queueId,
+ ReplicationPeer replicationPeer, Server server, ReplicationQueueInfo
queueInfo,
UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource
metrics)
throws IOException {
- super.init(conf, fs, walDir, overallController, queueStorage,
replicationPeer, server, producer,
- queueId, clusterId, walFileLengthProvider, metrics);
+ super.init(conf, fs, walDir, overallController, queueStorage,
replicationPeer, server,
+ queueInfo, clusterId, walFileLengthProvider, metrics);
this.actualPeerId = this.replicationQueueInfo.getPeerId();
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 0d9ee4b..4e46a79 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -112,8 +112,6 @@ public class ReplicationSource implements
ReplicationSourceInterface {
private UUID clusterId;
// total number of edits we replicated
private AtomicLong totalReplicatedEdits = new AtomicLong(0);
- // The znode we currently play with
- protected String queueId;
// Maximum number of retries before taking bold actions
private int maxRetriesMultiplier;
// Indicates if this particular source is running
@@ -190,7 +188,7 @@ public class ReplicationSource implements
ReplicationSourceInterface {
@Override
public void init(Configuration conf, FileSystem fs, Path walDir,
ReplicationSourceController overallController, ReplicationQueueStorage
queueStorage,
- ReplicationPeer replicationPeer, Server server, ServerName producer,
String queueId,
+ ReplicationPeer replicationPeer, Server server, ReplicationQueueInfo
queueInfo,
UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource
metrics)
throws IOException {
this.server = server;
@@ -212,8 +210,7 @@ public class ReplicationSource implements
ReplicationSourceInterface {
this.metrics = metrics;
this.clusterId = clusterId;
- this.queueId = queueId;
- this.replicationQueueInfo = new ReplicationQueueInfo(queueId);
+ this.replicationQueueInfo = queueInfo;
// A defaultBandwidth of '0' means no bandwidth; i.e. no throttling.
defaultBandwidth =
this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
@@ -229,15 +226,15 @@ public class ReplicationSource implements
ReplicationSourceInterface {
if (queueStorage instanceof ZKReplicationQueueStorage) {
ZKReplicationQueueStorage zkQueueStorage = (ZKReplicationQueueStorage)
queueStorage;
zkQueueStorage.getZookeeper().registerListener(
- new ReplicationQueueListener(this, zkQueueStorage, producer,
queueId, walDir));
+ new ReplicationQueueListener(this, zkQueueStorage, queueInfo,
walDir));
LOG.info("Register a ZKListener to track the WALs from {}'s
replication queue, queueId={}",
- producer, queueId);
+ queueInfo.getOwner(), queueInfo.getQueueId());
} else {
throw new UnsupportedOperationException(
"hbase.replication.offload.enabled=true only support
ZKReplicationQueueStorage");
}
}
- LOG.info("queueId={}, ReplicationSource: {}, currentBandwidth={}", queueId,
+ LOG.info("queueId={}, ReplicationSource: {}, currentBandwidth={}",
getQueueId(),
replicationPeer.getId(), this.currentBandwidth);
}
@@ -346,7 +343,7 @@ public class ReplicationSource implements
ReplicationSourceInterface {
createNewWALReader(walGroupId, worker.getStartPosition());
Threads.setDaemonThreadRunning(
walReader, Thread.currentThread().getName()
- + ".replicationSource.wal-reader." + walGroupId + "," + queueId,
+ + ".replicationSource.wal-reader." + walGroupId + "," +
getQueueId(),
(t,e) -> this.uncaughtException(t, e, null, this.getPeerId()));
worker.setWALReader(walReader);
worker.startup((t,e) -> this.uncaughtException(t, e, null,
this.getPeerId()));
@@ -594,7 +591,7 @@ public class ReplicationSource implements
ReplicationSourceInterface {
setSourceStartupStatus(true);
initThread = new Thread(this::initialize);
Threads.setDaemonThreadRunning(initThread,
- Thread.currentThread().getName() + ".replicationSource," + this.queueId,
+ Thread.currentThread().getName() + ".replicationSource," + getQueueId(),
(t,e) -> {
//if first initialization attempt failed, and abortOnError is false,
we will
//keep looping in this thread until initialize eventually succeeds,
@@ -638,10 +635,10 @@ public class ReplicationSource implements
ReplicationSourceInterface {
public void terminate(String reason, Exception cause, boolean clearMetrics,
boolean join) {
if (cause == null) {
- LOG.info("{} Closing source {} because: {}", logPeerId(), this.queueId,
reason);
+ LOG.info("{} Closing source {} because: {}", logPeerId(), getQueueId(),
reason);
} else {
LOG.error(String.format("%s Closing source %s because an error occurred:
%s",
- logPeerId(), this.queueId, reason), cause);
+ logPeerId(), getQueueId(), reason), cause);
}
this.sourceRunning = false;
if (initThread != null && Thread.currentThread() != initThread) {
@@ -698,7 +695,7 @@ public class ReplicationSource implements
ReplicationSourceInterface {
TimeUnit.MILLISECONDS);
} catch (TimeoutException te) {
LOG.warn("{} Got exception while waiting for endpoint to shutdown "
- + "for replication source : {}", logPeerId(), this.queueId, te);
+ + "for replication source : {}", logPeerId(), getQueueId(), te);
}
}
}
@@ -711,11 +708,6 @@ public class ReplicationSource implements
ReplicationSourceInterface {
}
@Override
- public String getQueueId() {
- return this.queueId;
- }
-
- @Override
public Path getCurrentPath() {
// only for testing
for (ReplicationSourceShipper worker : workerThreads.values()) {
@@ -957,10 +949,10 @@ public class ReplicationSource implements
ReplicationSourceInterface {
private final Path walDir;
public ReplicationQueueListener(ReplicationSource source,
- ZKReplicationQueueStorage zkQueueStorage, ServerName producer, String
queueId, Path walDir) {
+ ZKReplicationQueueStorage zkQueueStorage, ReplicationQueueInfo
queueInfo, Path walDir) {
super(zkQueueStorage.getZookeeper());
this.source = source;
- this.queueNode = zkQueueStorage.getQueueNode(producer, queueId);
+ this.queueNode = zkQueueStorage.getQueueNode(queueInfo.getOwner(),
queueInfo.getQueueId());
this.walDir = walDir;
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java
index 56c8ee4..6254af5 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java
@@ -36,8 +36,7 @@ public final class ReplicationSourceFactory {
private ReplicationSourceFactory() {}
public static ReplicationSourceInterface create(Configuration conf, String
queueId) {
- ReplicationQueueInfo replicationQueueInfo = new
ReplicationQueueInfo(queueId);
- boolean isQueueRecovered = replicationQueueInfo.isQueueRecovered();
+ boolean isQueueRecovered = ReplicationQueueInfo.isQueueRecovered(queueId);
ReplicationSourceInterface src;
try {
String defaultReplicationSourceImpl =
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index 461276e..1122e5e 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
+import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationSourceController;
import org.apache.hadoop.hbase.wal.WAL.Entry;
@@ -50,15 +51,14 @@ public interface ReplicationSourceInterface {
* @param queueStorage the replication queue storage
* @param replicationPeer the replication peer
* @param server the server which start and run this replication source
- * @param producer the name of region server which produce WAL to the
replication queue
- * @param queueId the id of our replication queue
+ * @param queueInfo the replication queue
* @param clusterId unique UUID for the cluster
* @param walFileLengthProvider used to get the WAL length
* @param metrics metrics for this replication source
*/
void init(Configuration conf, FileSystem fs, Path walDir,
ReplicationSourceController overallController, ReplicationQueueStorage
queueStorage,
- ReplicationPeer replicationPeer, Server server, ServerName producer,
String queueId,
+ ReplicationPeer replicationPeer, Server server, ReplicationQueueInfo
queueInfo,
UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource
metrics)
throws IOException;
@@ -105,7 +105,16 @@ public interface ReplicationSourceInterface {
*
* @return queue id
*/
- String getQueueId();
+ default String getQueueId() {
+ return getReplicationQueueInfo().getQueueId();
+ }
+
+ /**
+ * Get the replication queue info
+ *
+ * @return the replication queue info
+ */
+ ReplicationQueueInfo getReplicationQueueInfo();
/**
* Get the id that the source is replicating to.
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 3dc2d12..c050e1f 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -298,14 +298,15 @@ public class ReplicationSourceManager implements
ReplicationSourceController {
private ReplicationSourceInterface createSource(String queueId,
ReplicationPeer replicationPeer)
throws IOException {
ReplicationSourceInterface src = ReplicationSourceFactory.create(conf,
queueId);
+ ReplicationQueueInfo queueInfo = new
ReplicationQueueInfo(server.getServerName(), queueId);
// Init the just created replication source. Pass the default
walProvider's wal file length
// provider. Presumption is we replicate user-space Tables only. For
hbase:meta region replica
// replication, see #createCatalogReplicationSource().
WALFileLengthProvider walFileLengthProvider =
this.walFactory.getWALProvider() != null?
this.walFactory.getWALProvider().getWALFileLengthProvider() : p ->
OptionalLong.empty();
- src.init(conf, fs, logDir, this, queueStorage, replicationPeer, server,
server.getServerName(),
- queueId, clusterId, walFileLengthProvider, new MetricsSource(queueId));
+ src.init(conf, fs, logDir, this, queueStorage, replicationPeer, server,
queueInfo,
+ clusterId, walFileLengthProvider, new MetricsSource(queueId));
return src;
}
@@ -587,7 +588,7 @@ public class ReplicationSourceManager implements
ReplicationSourceController {
// a copy of the replication peer first to decide whether we should start
the
// RecoveredReplicationSource. If the latest peer is not the old peer, we
should also skip to
// start the RecoveredReplicationSource, Otherwise the rs will abort (See
HBASE-20475).
- String peerId = new ReplicationQueueInfo(queue).getPeerId();
+ String peerId = ReplicationQueueInfo.parsePeerId(queue);
ReplicationPeerImpl oldPeer = replicationPeers.getPeer(peerId);
if (oldPeer == null) {
LOG.info("Not transferring queue since the replication peer {} for queue
{} does not exist",
@@ -926,8 +927,8 @@ public class ReplicationSourceManager implements
ReplicationSourceController {
this.clusterId.toString());
final ReplicationSourceInterface crs = new CatalogReplicationSource();
crs.init(conf, fs, logDir, this, new NoopReplicationQueueStorage(), peer,
server,
- server.getServerName(), peer.getId(), clusterId,
walProvider.getWALFileLengthProvider(),
- new MetricsSource(peer.getId()));
+ new ReplicationQueueInfo(server.getServerName(), peer.getId()),
clusterId,
+ walProvider.getWALFileLengthProvider(), new MetricsSource(peer.getId()));
// Add listener on the provider so we can pick up the WAL to replicate on
roll.
WALActionsListener listener = new WALActionsListener() {
@Override public void postLogRoll(Path oldPath, Path newPath) throws
IOException {
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
index 7203fd1..ba440fc 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
@@ -69,13 +69,11 @@ public class ReplicationChecker {
Set<String> peerIds = new HashSet<>(peerStorage.listPeerIds());
for (ServerName replicator : queueStorage.getListOfReplicators()) {
for (String queueId : queueStorage.getAllQueues(replicator)) {
- ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
- if (!peerIds.contains(queueInfo.getPeerId())) {
+ String peerId = ReplicationQueueInfo.parsePeerId(queueId);
+ if (!peerIds.contains(peerId)) {
undeletedQueues.computeIfAbsent(replicator, key -> new
ArrayList<>()).add(queueId);
- LOG.debug(
- "Undeleted replication queue for removed peer found: " +
- "[removedPeerId={}, replicator={}, queueId={}]",
- queueInfo.getPeerId(), replicator, queueId);
+ LOG.debug("Undeleted replication queue for removed peer found: " +
+ "[removedPeerId={}, replicator={}, queueId={}]", peerId,
replicator, queueId);
}
}
}
@@ -99,10 +97,9 @@ public class ReplicationChecker {
undeletedQueueIds = getUnDeletedQueues();
undeletedQueueIds.forEach((replicator, queueIds) -> {
queueIds.forEach(queueId -> {
- ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
String msg = "Undeleted replication queue for removed peer found: " +
- String.format("[removedPeerId=%s, replicator=%s, queueId=%s]",
queueInfo.getPeerId(),
- replicator, queueId);
+ String.format("[removedPeerId=%s, replicator=%s, queueId=%s]",
+ ReplicationQueueInfo.parsePeerId(queueId), replicator, queueId);
errorReporter.reportError(HbckErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE,
msg);
});
});
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index 8f28dee..6b87047 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -39,7 +39,7 @@ import org.apache.hadoop.hbase.wal.WAL.Entry;
public class ReplicationSourceDummy implements ReplicationSourceInterface {
private ReplicationPeer replicationPeer;
- private String queueId;
+ private ReplicationQueueInfo replicationQueueInfo;
private Path currentPath;
private MetricsSource metrics;
private WALFileLengthProvider walFileLengthProvider;
@@ -48,10 +48,10 @@ public class ReplicationSourceDummy implements
ReplicationSourceInterface {
@Override
public void init(Configuration conf, FileSystem fs, Path walDir,
ReplicationSourceController overallController, ReplicationQueueStorage
queueStorage,
- ReplicationPeer replicationPeer, Server server, ServerName producer,
String queueId,
+ ReplicationPeer replicationPeer, Server server, ReplicationQueueInfo
queueInfo,
UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource
metrics)
throws IOException {
- this.queueId = queueId;
+ this.replicationQueueInfo = queueInfo;
this.metrics = metrics;
this.walFileLengthProvider = walFileLengthProvider;
this.replicationPeer = replicationPeer;
@@ -69,6 +69,11 @@ public class ReplicationSourceDummy implements
ReplicationSourceInterface {
}
@Override
+ public ReplicationQueueInfo getReplicationQueueInfo() {
+ return this.replicationQueueInfo;
+ }
+
+ @Override
public ReplicationSourceInterface startup() {
startup.set(true);
return this;
@@ -96,15 +101,8 @@ public class ReplicationSourceDummy implements
ReplicationSourceInterface {
}
@Override
- public String getQueueId() {
- return queueId;
- }
-
- @Override
public String getPeerId() {
- String[] parts = queueId.split("-", 2);
- return parts.length != 1 ?
- parts[0] : queueId;
+ return this.replicationQueueInfo.getPeerId();
}
@Override
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
index bd673bc..dde2c3c 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
@@ -56,6 +56,7 @@ import
org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -139,7 +140,8 @@ public class TestReplicationSource {
String queueId = "qid";
RegionServerServices rss =
TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
- rs.init(conf, null, null, manager, null, mockPeer, rss,
rss.getServerName(), queueId, null,
+ rs.init(conf, null, null, manager, null, mockPeer, rss,
+ new ReplicationQueueInfo(rss.getServerName(), queueId), null,
p -> OptionalLong.empty(), new MetricsSource(queueId));
try {
rs.startup();
@@ -177,7 +179,8 @@ public class TestReplicationSource {
String queueId = "qid";
RegionServerServices rss =
TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
- rs.init(conf, null, null, manager, null, mockPeer, rss,
rss.getServerName(), queueId, uuid,
+ rs.init(conf, null, null, manager, null, mockPeer, rss,
+ new ReplicationQueueInfo(rss.getServerName(), queueId), uuid,
p -> OptionalLong.empty(), new MetricsSource(queueId));
try {
rs.startup();
@@ -265,8 +268,8 @@ public class TestReplicationSource {
testConf.setInt("replication.source.maxretriesmultiplier", 1);
ReplicationSourceManager manager =
Mockito.mock(ReplicationSourceManager.class);
Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
- source.init(testConf, null, null, manager, null, mockPeer, null, null,
"testPeer", null,
- p -> OptionalLong.empty(), null);
+ source.init(testConf, null, null, manager, null, mockPeer, null,
+ new ReplicationQueueInfo(null, "testPeer"), null, p ->
OptionalLong.empty(), null);
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<?> future = executor.submit(
() -> source.terminate("testing source termination"));
@@ -289,8 +292,9 @@ public class TestReplicationSource {
ReplicationPeer mockPeer = mock(ReplicationPeer.class);
Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
Configuration testConf = HBaseConfiguration.create();
- source.init(testConf, null, null, mockManager, null, mockPeer, null, null,
- "testPeer", null, p -> OptionalLong.empty(), mock(MetricsSource.class));
+ source.init(testConf, null, null, mockManager, null, mockPeer, null,
+ new ReplicationQueueInfo(null, "testPeer"), null, p ->
OptionalLong.empty(),
+ mock(MetricsSource.class));
ReplicationSourceWALReader reader = new ReplicationSourceWALReader(null,
conf, null, 0, null, source, null);
ReplicationSourceShipper shipper =
@@ -536,7 +540,8 @@ public class TestReplicationSource {
String queueId = "qid";
RegionServerServices rss =
TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
- rs.init(conf, null, null, manager, null, mockPeer, rss, rss.getServerName(), queueId, null,
+ rs.init(conf, null, null, manager, null, mockPeer, rss,
+ new ReplicationQueueInfo(rss.getServerName(), queueId), null,
p -> OptionalLong.empty(), new MetricsSource(queueId));
return rss;
}
@@ -655,7 +660,8 @@ public class TestReplicationSource {
TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
ReplicationSource source = new ReplicationSource();
- source.init(conf, null, null, manager, null, mockPeer, rss, rss.getServerName(), id, null,
+ source.init(conf, null, null, manager, null, mockPeer, rss,
+ new ReplicationQueueInfo(rss.getServerName(), id), null,
p -> OptionalLong.empty(), metrics);
final Path log1 = new Path(logDir, "log-walgroup-a.8");
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index e6b745e..2b6ab55 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -70,6 +70,7 @@ import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
+import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationSourceController;
import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
@@ -414,8 +415,8 @@ public abstract class TestReplicationSourceManager {
assertEquals(files, manager.getWalsByIdRecoveredQueues().get(id).get(group));
ReplicationSourceInterface source = new ReplicationSource();
source.init(conf, fs, null, manager, manager.getQueueStorage(), rp1.getPeer("1"),
- manager.getServer(), manager.getServer().getServerName(), id, null, p -> OptionalLong.empty(),
- null);
+ manager.getServer(), new ReplicationQueueInfo(manager.getServer().getServerName(), id), null,
+ p -> OptionalLong.empty(), null);
source.cleanOldWALs(file2, false);
// log1 should be deleted
assertEquals(Sets.newHashSet(file2), manager.getWalsByIdRecoveredQueues().get(id).get(group));
@@ -632,7 +633,8 @@ public abstract class TestReplicationSourceManager {
ReplicationSourceInterface source = new ReplicationSource();
source.init(conf, fs, null, manager, manager.getQueueStorage(),
mockReplicationPeerForSyncReplication(peerId2), manager.getServer(),
- manager.getServer().getServerName(), peerId2, null, p -> OptionalLong.empty(), null);
+ new ReplicationQueueInfo(manager.getServer().getServerName(), peerId2), null,
+ p -> OptionalLong.empty(), null);
source.cleanOldWALs(walName, true);
// still there if peer id does not match
assertTrue(fs.exists(remoteWAL));
@@ -640,7 +642,8 @@ public abstract class TestReplicationSourceManager {
source = new ReplicationSource();
source.init(conf, fs, null, manager, manager.getQueueStorage(),
mockReplicationPeerForSyncReplication(slaveId), manager.getServer(),
- manager.getServer().getServerName(), slaveId, null, p -> OptionalLong.empty(), null);
+ new ReplicationQueueInfo(manager.getServer().getServerName(), slaveId), null,
+ p -> OptionalLong.empty(), null);
source.cleanOldWALs(walName, true);
assertFalse(fs.exists(remoteWAL));
} finally {
@@ -821,7 +824,7 @@ public abstract class TestReplicationSourceManager {
@Override
public void init(Configuration conf, FileSystem fs, Path walDir,
ReplicationSourceController overallController, ReplicationQueueStorage queueStorage,
- ReplicationPeer replicationPeer, Server server, ServerName producer, String queueId,
+ ReplicationPeer replicationPeer, Server server, ReplicationQueueInfo queueInfo,
UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics)
throws IOException {
throw new IOException("Failing deliberately");
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java
index 8e0ab0f..86c141b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java
@@ -95,7 +95,8 @@ public class TestReplicationSourceManagerZkImpl extends TestReplicationSourceMan
queueStorage.claimQueue(serverName, unclaimed.get(0),
s3.getServerName()).getFirst();
queueStorage.removeReplicatorIfQueueIsEmpty(serverName);
- ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(queue3);
+ ReplicationQueueInfo replicationQueueInfo =
+ new ReplicationQueueInfo(s3.getServerName(), queue3);
List<ServerName> result = replicationQueueInfo.getDeadRegionServers();
// verify
assertTrue(result.contains(server.getServerName()));