This is an automated email from the ASF dual-hosted git repository. sunxin pushed a commit to branch HBASE-24666 in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/HBASE-24666 by this push: new fd2f3d1 HBASE-26084 Add owner of replication queue for ReplicationQueueInfo (#3477) fd2f3d1 is described below commit fd2f3d1abdcb79b0fc037ff92e65459660c5b502 Author: XinSun <ddu...@gmail.com> AuthorDate: Thu Aug 12 17:14:35 2021 +0800 HBASE-26084 Add owner of replication queue for ReplicationQueueInfo (#3477) Signed-off-by: stack <st...@apple.com> --- .../hbase/replication/ReplicationQueueInfo.java | 24 +++++++++++++++- .../hadoop/hbase/replication/ReplicationUtils.java | 3 +- .../replication/ZKReplicationQueueStorage.java | 2 +- .../master/replication/ReplicationPeerManager.java | 3 +- .../hbase/replication/HReplicationServer.java | 22 +++++++-------- .../regionserver/DumpReplicationQueues.java | 2 +- .../regionserver/RecoveredReplicationSource.java | 7 +++-- .../regionserver/ReplicationSource.java | 32 ++++++++-------------- .../regionserver/ReplicationSourceFactory.java | 3 +- .../regionserver/ReplicationSourceInterface.java | 17 +++++++++--- .../regionserver/ReplicationSourceManager.java | 11 ++++---- .../hadoop/hbase/util/hbck/ReplicationChecker.java | 15 ++++------ .../hbase/replication/ReplicationSourceDummy.java | 20 ++++++-------- .../regionserver/TestReplicationSource.java | 22 +++++++++------ .../regionserver/TestReplicationSourceManager.java | 13 +++++---- .../TestReplicationSourceManagerZkImpl.java | 3 +- 16 files changed, 113 insertions(+), 86 deletions(-) diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java index d39a37e..49a2153 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java @@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory; public class ReplicationQueueInfo { private static final Logger LOG = LoggerFactory.getLogger(ReplicationQueueInfo.class); + private final ServerName owner; private final String peerId; private final String queueId; private boolean queueRecovered; @@ -46,7 +47,8 @@ public class ReplicationQueueInfo { * The passed queueId will be either the id of the peer or the handling story of that queue * in the form of id-servername-* */ - public ReplicationQueueInfo(String queueId) { + public ReplicationQueueInfo(ServerName owner, String queueId) { + this.owner = owner; this.queueId = queueId; String[] parts = queueId.split("-", 2); this.queueRecovered = parts.length != 1; @@ -58,6 +60,22 @@ public class ReplicationQueueInfo { } /** + * A util method to parse the peerId from queueId. + */ + public static String parsePeerId(String queueId) { + String[] parts = queueId.split("-", 2); + return parts.length != 1 ? parts[0] : queueId; + } + + /** + * A util method to check whether a queue is recovered. + */ + public static boolean isQueueRecovered(String queueId) { + String[] parts = queueId.split("-", 2); + return parts.length != 1; + } + + /** * Parse dead server names from queue id. servername can contain "-" such as * "ip-10-46-221-101.ec2.internal", so we need skip some "-" during parsing for the following * cases: 2-ip-10-46-221-101.ec2.internal,52170,1364333181125-<server name>-... @@ -114,6 +132,10 @@ public class ReplicationQueueInfo { return Collections.unmodifiableList(this.deadRegionServers); } + public ServerName getOwner() { + return this.owner; + } + public String getPeerId() { return this.peerId; } diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java index 7bafbc2..7dbfe41a 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java @@ -86,8 +86,7 @@ public final class ReplicationUtils { for (ServerName replicator : queueStorage.getListOfReplicators()) { List<String> queueIds = queueStorage.getAllQueues(replicator); for (String queueId : queueIds) { - ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId); - if (queueInfo.getPeerId().equals(peerId)) { + if (ReplicationQueueInfo.parsePeerId(queueId).equals(peerId)) { queueStorage.removeQueue(replicator, queueId); } } diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java index 08ac142..141e890 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java @@ -205,7 +205,7 @@ public class ZKReplicationQueueStorage extends ZKReplicationStorageBase private void addLastSeqIdsToOps(String queueId, Map<String, Long> lastSeqIds, List<ZKUtilOp> listOfOps) throws KeeperException, ReplicationException { - String peerId = new ReplicationQueueInfo(queueId).getPeerId(); + String peerId = ReplicationQueueInfo.parsePeerId(queueId); for (Entry<String, Long> lastSeqEntry : lastSeqIds.entrySet()) { String path = getSerialReplicationRegionPeerNode(lastSeqEntry.getKey(), peerId); Pair<Long, Integer> p = getLastSequenceIdWithVersion(lastSeqEntry.getKey(), peerId); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java index add5121..c2a21a5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java @@ -103,8 +103,7 @@ public class ReplicationPeerManager { for (ServerName replicator : queueStorage.getListOfReplicators()) { List<String> queueIds = queueStorage.getAllQueues(replicator); for (String queueId : queueIds) { - ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId); - if (queueInfo.getPeerId().equals(peerId)) { + if (ReplicationQueueInfo.parsePeerId(queueId).equals(peerId)) { throw new DoNotRetryIOException("undeleted queue for peerId: " + peerId + ", replicator: " + replicator + ", queueId: " + queueId); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java index 2654565..f31a98d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java @@ -701,25 +701,24 @@ public class HReplicationServer extends Thread implements Server, ReplicationSou public void finishRecoveredSource(RecoveredReplicationSource src) { this.sources.remove(src.getQueueId()); this.sourceMetrics.remove(src.getQueueId()); - deleteQueue(src.getQueueId()); + deleteQueue(src.getReplicationQueueInfo()); LOG.info("Finished recovering queue {} with the following stats: {}", src.getQueueId(), src.getStats()); } - public void startReplicationSource(ServerName producer, String queueId) + public void startReplicationSource(ServerName owner, String queueId) throws IOException, ReplicationException { - ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(queueId); + ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(owner, queueId); String peerId = replicationQueueInfo.getPeerId(); this.replicationPeers.addPeer(peerId); - Path walDir = - new Path(walRootDir, AbstractFSWALProvider.getWALDirectoryName(producer.toString())); + Path walDir = new Path(walRootDir, AbstractFSWALProvider.getWALDirectoryName(owner.toString())); MetricsSource metrics = new MetricsSource(queueId); ReplicationSourceInterface src = ReplicationSourceFactory.create(conf, queueId); // init replication source src.init(conf, walFs, walDir, this, queueStorage, replicationPeers.getPeer(peerId), this, - producer, queueId, clusterId, createWALFileLengthProvider(producer, queueId), metrics); - queueStorage.getWALsInQueue(producer, queueId) + replicationQueueInfo, clusterId, createWALFileLengthProvider(owner, queueId), metrics); + queueStorage.getWALsInQueue(owner, queueId) .forEach(walName -> src.enqueueLog(new Path(walDir, walName))); src.startup(); sources.put(queueId, src); @@ -728,10 +727,11 @@ public class HReplicationServer extends Thread implements Server, ReplicationSou /** * Delete a complete queue of wals associated with a replication source - * @param queueId the id of replication queue to delete + * @param queueInfo the replication queue to delete */ - private void deleteQueue(String queueId) { - abortWhenFail(() -> this.queueStorage.removeQueue(getServerName(), queueId)); + private void deleteQueue(ReplicationQueueInfo queueInfo) { + abortWhenFail(() -> + this.queueStorage.removeQueue(queueInfo.getOwner(), queueInfo.getQueueId())); } @FunctionalInterface @@ -748,7 +748,7 @@ public class HReplicationServer extends Thread implements Server, ReplicationSou } private WALFileLengthProvider createWALFileLengthProvider(ServerName producer, String queueId) { - if (new ReplicationQueueInfo(queueId).isQueueRecovered()) { + if (ReplicationQueueInfo.isQueueRecovered(queueId)) { return p -> OptionalLong.empty(); } return new RemoteWALFileLengthProvider(asyncClusterConnection, producer); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java index 11b0c7c..ef4fde3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java @@ -316,7 +316,7 @@ public class DumpReplicationQueues extends Configured implements Tool { deadRegionServers.add(regionserver.getServerName()); } for (String queueId : queueIds) { - ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId); + ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(regionserver, queueId); List<String> wals = queueStorage.getWALsInQueue(regionserver, queueId); Collections.sort(wals); if (!peerIds.contains(queueInfo.getPeerId())) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java index 147556f..c8200ca 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java @@ -29,6 +29,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.replication.ReplicationPeer; +import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationSourceController; import org.apache.hadoop.hbase.util.CommonFSUtils; @@ -51,11 +52,11 @@ public class RecoveredReplicationSource extends ReplicationSource { @Override public void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceController overallController, ReplicationQueueStorage queueStorage, - ReplicationPeer replicationPeer, Server server, ServerName producer, String queueId, + ReplicationPeer replicationPeer, Server server, ReplicationQueueInfo queueInfo, UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException { - super.init(conf, fs, walDir, overallController, queueStorage, replicationPeer, server, producer, - queueId, clusterId, walFileLengthProvider, metrics); + super.init(conf, fs, walDir, overallController, queueStorage, replicationPeer, server, + queueInfo, clusterId, walFileLengthProvider, metrics); this.actualPeerId = this.replicationQueueInfo.getPeerId(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 0d9ee4b..4e46a79 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -112,8 +112,6 @@ public class ReplicationSource implements ReplicationSourceInterface { private UUID clusterId; // total number of edits we replicated private AtomicLong totalReplicatedEdits = new AtomicLong(0); - // The znode we currently play with - protected String queueId; // Maximum number of retries before taking bold actions private int maxRetriesMultiplier; // Indicates if this particular source is running @@ -190,7 +188,7 @@ public class ReplicationSource implements ReplicationSourceInterface { @Override public void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceController overallController, ReplicationQueueStorage queueStorage, - ReplicationPeer replicationPeer, Server server, ServerName producer, String queueId, + ReplicationPeer replicationPeer, Server server, ReplicationQueueInfo queueInfo, UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException { this.server = server; @@ -212,8 +210,7 @@ public class ReplicationSource implements ReplicationSourceInterface { this.metrics = metrics; this.clusterId = clusterId; - this.queueId = queueId; - this.replicationQueueInfo = new ReplicationQueueInfo(queueId); + this.replicationQueueInfo = queueInfo; // A defaultBandwidth of '0' means no bandwidth; i.e. no throttling. defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0); @@ -229,15 +226,15 @@ public class ReplicationSource implements ReplicationSourceInterface { if (queueStorage instanceof ZKReplicationQueueStorage) { ZKReplicationQueueStorage zkQueueStorage = (ZKReplicationQueueStorage) queueStorage; zkQueueStorage.getZookeeper().registerListener( - new ReplicationQueueListener(this, zkQueueStorage, producer, queueId, walDir)); + new ReplicationQueueListener(this, zkQueueStorage, queueInfo, walDir)); LOG.info("Register a ZKListener to track the WALs from {}'s replication queue, queueId={}", - producer, queueId); + queueInfo.getOwner(), queueInfo.getQueueId()); } else { throw new UnsupportedOperationException( "hbase.replication.offload.enabled=true only support ZKReplicationQueueStorage"); } } - LOG.info("queueId={}, ReplicationSource: {}, currentBandwidth={}", queueId, + LOG.info("queueId={}, ReplicationSource: {}, currentBandwidth={}", getQueueId(), replicationPeer.getId(), this.currentBandwidth); } @@ -346,7 +343,7 @@ public class ReplicationSource implements ReplicationSourceInterface { createNewWALReader(walGroupId, worker.getStartPosition()); Threads.setDaemonThreadRunning( walReader, Thread.currentThread().getName() - + ".replicationSource.wal-reader." + walGroupId + "," + queueId, + + ".replicationSource.wal-reader." + walGroupId + "," + getQueueId(), (t,e) -> this.uncaughtException(t, e, null, this.getPeerId())); worker.setWALReader(walReader); worker.startup((t,e) -> this.uncaughtException(t, e, null, this.getPeerId())); @@ -594,7 +591,7 @@ public class ReplicationSource implements ReplicationSourceInterface { setSourceStartupStatus(true); initThread = new Thread(this::initialize); Threads.setDaemonThreadRunning(initThread, - Thread.currentThread().getName() + ".replicationSource," + this.queueId, + Thread.currentThread().getName() + ".replicationSource," + getQueueId(), (t,e) -> { //if first initialization attempt failed, and abortOnError is false, we will //keep looping in this thread until initialize eventually succeeds, @@ -638,10 +635,10 @@ public class ReplicationSource implements ReplicationSourceInterface { public void terminate(String reason, Exception cause, boolean clearMetrics, boolean join) { if (cause == null) { - LOG.info("{} Closing source {} because: {}", logPeerId(), this.queueId, reason); + LOG.info("{} Closing source {} because: {}", logPeerId(), getQueueId(), reason); } else { LOG.error(String.format("%s Closing source %s because an error occurred: %s", - logPeerId(), this.queueId, reason), cause); + logPeerId(), getQueueId(), reason), cause); } this.sourceRunning = false; if (initThread != null && Thread.currentThread() != initThread) { @@ -698,7 +695,7 @@ public class ReplicationSource implements ReplicationSourceInterface { TimeUnit.MILLISECONDS); } catch (TimeoutException te) { LOG.warn("{} Got exception while waiting for endpoint to shutdown " - + "for replication source : {}", logPeerId(), this.queueId, te); + + "for replication source : {}", logPeerId(), getQueueId(), te); } } } @@ -711,11 +708,6 @@ public class ReplicationSource implements ReplicationSourceInterface { } @Override - public String getQueueId() { - return this.queueId; - } - - @Override public Path getCurrentPath() { // only for testing for (ReplicationSourceShipper worker : workerThreads.values()) { @@ -957,10 +949,10 @@ public class ReplicationSource implements ReplicationSourceInterface { private final Path walDir; public ReplicationQueueListener(ReplicationSource source, - ZKReplicationQueueStorage zkQueueStorage, ServerName producer, String queueId, Path walDir) { + ZKReplicationQueueStorage zkQueueStorage, ReplicationQueueInfo queueInfo, Path walDir) { super(zkQueueStorage.getZookeeper()); this.source = source; - this.queueNode = zkQueueStorage.getQueueNode(producer, queueId); + this.queueNode = zkQueueStorage.getQueueNode(queueInfo.getOwner(), queueInfo.getQueueId()); this.walDir = walDir; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java index 56c8ee4..6254af5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java @@ -36,8 +36,7 @@ public final class ReplicationSourceFactory { private ReplicationSourceFactory() {} public static ReplicationSourceInterface create(Configuration conf, String queueId) { - ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(queueId); - boolean isQueueRecovered = replicationQueueInfo.isQueueRecovered(); + boolean isQueueRecovered = ReplicationQueueInfo.isQueueRecovered(queueId); ReplicationSourceInterface src; try { String defaultReplicationSourceImpl = diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java index 461276e..1122e5e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationPeer; +import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationSourceController; import org.apache.hadoop.hbase.wal.WAL.Entry; @@ -50,15 +51,14 @@ public interface ReplicationSourceInterface { * @param queueStorage the replication queue storage * @param replicationPeer the replication peer * @param server the server which start and run this replication source - * @param producer the name of region server which produce WAL to the replication queue - * @param queueId the id of our replication queue + * @param queueInfo the replication queue * @param clusterId unique UUID for the cluster * @param walFileLengthProvider used to get the WAL length * @param metrics metrics for this replication source */ void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceController overallController, ReplicationQueueStorage queueStorage, - ReplicationPeer replicationPeer, Server server, ServerName producer, String queueId, + ReplicationPeer replicationPeer, Server server, ReplicationQueueInfo queueInfo, UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException; @@ -105,7 +105,16 @@ public interface ReplicationSourceInterface { * * @return queue id */ - String getQueueId(); + default String getQueueId() { + return getReplicationQueueInfo().getQueueId(); + } + + /** + * Get the replication queue info + * + * @return the replication queue info + */ + ReplicationQueueInfo getReplicationQueueInfo(); /** * Get the id that the source is replicating to. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 3dc2d12..c050e1f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -298,14 +298,15 @@ public class ReplicationSourceManager implements ReplicationSourceController { private ReplicationSourceInterface createSource(String queueId, ReplicationPeer replicationPeer) throws IOException { ReplicationSourceInterface src = ReplicationSourceFactory.create(conf, queueId); + ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(server.getServerName(), queueId); // Init the just created replication source. Pass the default walProvider's wal file length // provider. Presumption is we replicate user-space Tables only. For hbase:meta region replica // replication, see #createCatalogReplicationSource(). WALFileLengthProvider walFileLengthProvider = this.walFactory.getWALProvider() != null? this.walFactory.getWALProvider().getWALFileLengthProvider() : p -> OptionalLong.empty(); - src.init(conf, fs, logDir, this, queueStorage, replicationPeer, server, server.getServerName(), - queueId, clusterId, walFileLengthProvider, new MetricsSource(queueId)); + src.init(conf, fs, logDir, this, queueStorage, replicationPeer, server, queueInfo, + clusterId, walFileLengthProvider, new MetricsSource(queueId)); return src; } @@ -587,7 +588,7 @@ public class ReplicationSourceManager implements ReplicationSourceController { // a copy of the replication peer first to decide whether we should start the // RecoveredReplicationSource. If the latest peer is not the old peer, we should also skip to // start the RecoveredReplicationSource, Otherwise the rs will abort (See HBASE-20475). - String peerId = new ReplicationQueueInfo(queue).getPeerId(); + String peerId = ReplicationQueueInfo.parsePeerId(queue); ReplicationPeerImpl oldPeer = replicationPeers.getPeer(peerId); if (oldPeer == null) { LOG.info("Not transferring queue since the replication peer {} for queue {} does not exist", @@ -926,8 +927,8 @@ public class ReplicationSourceManager implements ReplicationSourceController { this.clusterId.toString()); final ReplicationSourceInterface crs = new CatalogReplicationSource(); crs.init(conf, fs, logDir, this, new NoopReplicationQueueStorage(), peer, server, - server.getServerName(), peer.getId(), clusterId, walProvider.getWALFileLengthProvider(), - new MetricsSource(peer.getId())); + new ReplicationQueueInfo(server.getServerName(), peer.getId()), clusterId, + walProvider.getWALFileLengthProvider(), new MetricsSource(peer.getId())); // Add listener on the provider so we can pick up the WAL to replicate on roll. WALActionsListener listener = new WALActionsListener() { @Override public void postLogRoll(Path oldPath, Path newPath) throws IOException { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java index 7203fd1..ba440fc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java @@ -69,13 +69,11 @@ public class ReplicationChecker { Set<String> peerIds = new HashSet<>(peerStorage.listPeerIds()); for (ServerName replicator : queueStorage.getListOfReplicators()) { for (String queueId : queueStorage.getAllQueues(replicator)) { - ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId); - if (!peerIds.contains(queueInfo.getPeerId())) { + String peerId = ReplicationQueueInfo.parsePeerId(queueId); + if (!peerIds.contains(peerId)) { undeletedQueues.computeIfAbsent(replicator, key -> new ArrayList<>()).add(queueId); - LOG.debug( - "Undeleted replication queue for removed peer found: " + - "[removedPeerId={}, replicator={}, queueId={}]", - queueInfo.getPeerId(), replicator, queueId); + LOG.debug("Undeleted replication queue for removed peer found: " + + "[removedPeerId={}, replicator={}, queueId={}]", peerId, replicator, queueId); } } } @@ -99,10 +97,9 @@ public class ReplicationChecker { undeletedQueueIds = getUnDeletedQueues(); undeletedQueueIds.forEach((replicator, queueIds) -> { queueIds.forEach(queueId -> { - ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId); String msg = "Undeleted replication queue for removed peer found: " + - String.format("[removedPeerId=%s, replicator=%s, queueId=%s]", queueInfo.getPeerId(), - replicator, queueId); + String.format("[removedPeerId=%s, replicator=%s, queueId=%s]", + ReplicationQueueInfo.parsePeerId(queueId), replicator, queueId); errorReporter.reportError(HbckErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE, msg); }); }); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java index 8f28dee..6b87047 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java @@ -39,7 +39,7 @@ import org.apache.hadoop.hbase.wal.WAL.Entry; public class ReplicationSourceDummy implements ReplicationSourceInterface { private ReplicationPeer replicationPeer; - private String queueId; + private ReplicationQueueInfo replicationQueueInfo; private Path currentPath; private MetricsSource metrics; private WALFileLengthProvider walFileLengthProvider; @@ -48,10 +48,10 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface { @Override public void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceController overallController, ReplicationQueueStorage queueStorage, - ReplicationPeer replicationPeer, Server server, ServerName producer, String queueId, + ReplicationPeer replicationPeer, Server server, ReplicationQueueInfo queueInfo, UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException { - this.queueId = queueId; + this.replicationQueueInfo = queueInfo; this.metrics = metrics; this.walFileLengthProvider = walFileLengthProvider; this.replicationPeer = replicationPeer; @@ -69,6 +69,11 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface { } @Override + public ReplicationQueueInfo getReplicationQueueInfo() { + return this.replicationQueueInfo; + } + + @Override public ReplicationSourceInterface startup() { startup.set(true); return this; @@ -96,15 +101,8 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface { } @Override - public String getQueueId() { - return queueId; - } - - @Override public String getPeerId() { - String[] parts = queueId.split("-", 2); - return parts.length != 1 ? - parts[0] : queueId; + return this.replicationQueueInfo.getPeerId(); } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java index bd673bc..dde2c3c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java @@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.WALEntryFilter; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -139,7 +140,8 @@ public class TestReplicationSource { String queueId = "qid"; RegionServerServices rss = TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1")); - rs.init(conf, null, null, manager, null, mockPeer, rss, rss.getServerName(), queueId, null, + rs.init(conf, null, null, manager, null, mockPeer, rss, + new ReplicationQueueInfo(rss.getServerName(), queueId), null, p -> OptionalLong.empty(), new MetricsSource(queueId)); try { rs.startup(); @@ -177,7 +179,8 @@ public class TestReplicationSource { String queueId = "qid"; RegionServerServices rss = TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1")); - rs.init(conf, null, null, manager, null, mockPeer, rss, rss.getServerName(), queueId, uuid, + rs.init(conf, null, null, manager, null, mockPeer, rss, + new ReplicationQueueInfo(rss.getServerName(), queueId), uuid, p -> OptionalLong.empty(), new MetricsSource(queueId)); try { rs.startup(); @@ -265,8 +268,8 @@ public class TestReplicationSource { testConf.setInt("replication.source.maxretriesmultiplier", 1); ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong(0)); - source.init(testConf, null, null, manager, null, mockPeer, null, null, "testPeer", null, - p -> OptionalLong.empty(), null); + source.init(testConf, null, null, manager, null, mockPeer, null, + new ReplicationQueueInfo(null, "testPeer"), null, p -> OptionalLong.empty(), null); ExecutorService executor = Executors.newSingleThreadExecutor(); Future<?> future = executor.submit( () -> source.terminate("testing source termination")); @@ -289,8 +292,9 @@ public class TestReplicationSource { ReplicationPeer mockPeer = mock(ReplicationPeer.class); Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); Configuration testConf = HBaseConfiguration.create(); - source.init(testConf, null, null, mockManager, null, mockPeer, null, null, - "testPeer", null, p -> OptionalLong.empty(), mock(MetricsSource.class)); + source.init(testConf, null, null, mockManager, null, mockPeer, null, + new ReplicationQueueInfo(null, "testPeer"), null, p -> OptionalLong.empty(), + mock(MetricsSource.class)); ReplicationSourceWALReader reader = new ReplicationSourceWALReader(null, conf, null, 0, null, source, null); ReplicationSourceShipper shipper = @@ -536,7 +540,8 @@ public class TestReplicationSource { String queueId = "qid"; RegionServerServices rss = TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1")); - rs.init(conf, null, null, manager, null, mockPeer, rss, rss.getServerName(), queueId, null, + rs.init(conf, null, null, manager, null, mockPeer, rss, + new ReplicationQueueInfo(rss.getServerName(), queueId), null, p -> OptionalLong.empty(), new MetricsSource(queueId)); return rss; } @@ -655,7 +660,8 @@ public class TestReplicationSource { TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1")); ReplicationSource source = new ReplicationSource(); - source.init(conf, null, null, manager, null, mockPeer, rss, rss.getServerName(), id, null, + source.init(conf, null, null, manager, null, mockPeer, rss, + new ReplicationQueueInfo(rss.getServerName(), id), null, p -> OptionalLong.empty(), metrics); final Path log1 = new Path(logDir, "log-walgroup-a.8"); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index e6b745e..2b6ab55 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -70,6 +70,7 @@ import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeers; +import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationSourceController; import org.apache.hadoop.hbase.replication.ReplicationSourceDummy; @@ -414,8 +415,8 @@ public abstract class TestReplicationSourceManager { assertEquals(files, manager.getWalsByIdRecoveredQueues().get(id).get(group)); ReplicationSourceInterface source = new ReplicationSource(); source.init(conf, fs, null, manager, manager.getQueueStorage(), rp1.getPeer("1"), - manager.getServer(), manager.getServer().getServerName(), id, null, p -> OptionalLong.empty(), - null); + manager.getServer(), new ReplicationQueueInfo(manager.getServer().getServerName(), id), null, + p -> OptionalLong.empty(), null); source.cleanOldWALs(file2, false); // log1 should be deleted assertEquals(Sets.newHashSet(file2), manager.getWalsByIdRecoveredQueues().get(id).get(group)); @@ -632,7 +633,8 @@ public abstract class TestReplicationSourceManager { ReplicationSourceInterface source = new ReplicationSource(); source.init(conf, fs, null, manager, manager.getQueueStorage(), mockReplicationPeerForSyncReplication(peerId2), manager.getServer(), - manager.getServer().getServerName(), peerId2, null, p -> OptionalLong.empty(), null); + new ReplicationQueueInfo(manager.getServer().getServerName(), peerId2), null, + p -> OptionalLong.empty(), null); source.cleanOldWALs(walName, true); // still there if peer id does not match assertTrue(fs.exists(remoteWAL)); @@ -640,7 +642,8 @@ public abstract class TestReplicationSourceManager { source = new ReplicationSource(); source.init(conf, fs, null, manager, manager.getQueueStorage(), mockReplicationPeerForSyncReplication(slaveId), manager.getServer(), - manager.getServer().getServerName(), slaveId, null, p -> OptionalLong.empty(), null); + new ReplicationQueueInfo(manager.getServer().getServerName(), slaveId), null, + p -> OptionalLong.empty(), null); source.cleanOldWALs(walName, true); assertFalse(fs.exists(remoteWAL)); } finally { @@ -821,7 +824,7 @@ public abstract class TestReplicationSourceManager { @Override public void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceController overallController, ReplicationQueueStorage queueStorage, - ReplicationPeer replicationPeer, Server server, ServerName producer, String queueId, + ReplicationPeer replicationPeer, Server server, ReplicationQueueInfo queueInfo, UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException { throw new IOException("Failing deliberately"); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java index 8e0ab0f..86c141b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java @@ -95,7 +95,8 @@ public class TestReplicationSourceManagerZkImpl extends TestReplicationSourceMan queueStorage.claimQueue(serverName, unclaimed.get(0), s3.getServerName()).getFirst(); queueStorage.removeReplicatorIfQueueIsEmpty(serverName); - ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(queue3); + ReplicationQueueInfo replicationQueueInfo = + new ReplicationQueueInfo(s3.getServerName(), queue3); List<ServerName> result = replicationQueueInfo.getDeadRegionServers(); // verify assertTrue(result.contains(server.getServerName()));