This is an automated email from the ASF dual-hosted git repository.

sunxin pushed a commit to branch HBASE-24666
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/HBASE-24666 by this push:
     new fd2f3d1  HBASE-26084 Add owner of replication queue for 
ReplicationQueueInfo (#3477)
fd2f3d1 is described below

commit fd2f3d1abdcb79b0fc037ff92e65459660c5b502
Author: XinSun <ddu...@gmail.com>
AuthorDate: Thu Aug 12 17:14:35 2021 +0800

    HBASE-26084 Add owner of replication queue for ReplicationQueueInfo (#3477)
    
    Signed-off-by: stack <st...@apple.com>
---
 .../hbase/replication/ReplicationQueueInfo.java    | 24 +++++++++++++++-
 .../hadoop/hbase/replication/ReplicationUtils.java |  3 +-
 .../replication/ZKReplicationQueueStorage.java     |  2 +-
 .../master/replication/ReplicationPeerManager.java |  3 +-
 .../hbase/replication/HReplicationServer.java      | 22 +++++++--------
 .../regionserver/DumpReplicationQueues.java        |  2 +-
 .../regionserver/RecoveredReplicationSource.java   |  7 +++--
 .../regionserver/ReplicationSource.java            | 32 ++++++++--------------
 .../regionserver/ReplicationSourceFactory.java     |  3 +-
 .../regionserver/ReplicationSourceInterface.java   | 17 +++++++++---
 .../regionserver/ReplicationSourceManager.java     | 11 ++++----
 .../hadoop/hbase/util/hbck/ReplicationChecker.java | 15 ++++------
 .../hbase/replication/ReplicationSourceDummy.java  | 20 ++++++--------
 .../regionserver/TestReplicationSource.java        | 22 +++++++++------
 .../regionserver/TestReplicationSourceManager.java | 13 +++++----
 .../TestReplicationSourceManagerZkImpl.java        |  3 +-
 16 files changed, 113 insertions(+), 86 deletions(-)

diff --git 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java
 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java
index d39a37e..49a2153 100644
--- 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java
+++ 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java
@@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory;
 public class ReplicationQueueInfo {
   private static final Logger LOG = 
LoggerFactory.getLogger(ReplicationQueueInfo.class);
 
+  private final ServerName owner;
   private final String peerId;
   private final String queueId;
   private boolean queueRecovered;
@@ -46,7 +47,8 @@ public class ReplicationQueueInfo {
    * The passed queueId will be either the id of the peer or the handling 
story of that queue
    * in the form of id-servername-*
    */
-  public ReplicationQueueInfo(String queueId) {
+  public ReplicationQueueInfo(ServerName owner, String queueId) {
+    this.owner = owner;
     this.queueId = queueId;
     String[] parts = queueId.split("-", 2);
     this.queueRecovered = parts.length != 1;
@@ -58,6 +60,22 @@ public class ReplicationQueueInfo {
   }
 
   /**
+   * A util method to parse the peerId from queueId.
+   */
+  public static String parsePeerId(String queueId) {
+    String[] parts = queueId.split("-", 2);
+    return parts.length != 1 ? parts[0] : queueId;
+  }
+
+  /**
+   * A util method to check whether a queue is recovered.
+   */
+  public static boolean isQueueRecovered(String queueId) {
+    String[] parts = queueId.split("-", 2);
+    return parts.length != 1;
+  }
+
+  /**
    * Parse dead server names from queue id. servername can contain "-" such as
    * "ip-10-46-221-101.ec2.internal", so we need skip some "-" during parsing 
for the following
    * cases: 2-ip-10-46-221-101.ec2.internal,52170,1364333181125-&lt;server 
name>-...
@@ -114,6 +132,10 @@ public class ReplicationQueueInfo {
     return Collections.unmodifiableList(this.deadRegionServers);
   }
 
+  public ServerName getOwner() {
+    return this.owner;
+  }
+
   public String getPeerId() {
     return this.peerId;
   }
diff --git 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
index 7bafbc2..7dbfe41a 100644
--- 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
+++ 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
@@ -86,8 +86,7 @@ public final class ReplicationUtils {
     for (ServerName replicator : queueStorage.getListOfReplicators()) {
       List<String> queueIds = queueStorage.getAllQueues(replicator);
       for (String queueId : queueIds) {
-        ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
-        if (queueInfo.getPeerId().equals(peerId)) {
+        if (ReplicationQueueInfo.parsePeerId(queueId).equals(peerId)) {
           queueStorage.removeQueue(replicator, queueId);
         }
       }
diff --git 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
index 08ac142..141e890 100644
--- 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
+++ 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
@@ -205,7 +205,7 @@ public class ZKReplicationQueueStorage extends 
ZKReplicationStorageBase
 
   private void addLastSeqIdsToOps(String queueId, Map<String, Long> lastSeqIds,
       List<ZKUtilOp> listOfOps) throws KeeperException, ReplicationException {
-    String peerId = new ReplicationQueueInfo(queueId).getPeerId();
+    String peerId = ReplicationQueueInfo.parsePeerId(queueId);
     for (Entry<String, Long> lastSeqEntry : lastSeqIds.entrySet()) {
       String path = getSerialReplicationRegionPeerNode(lastSeqEntry.getKey(), 
peerId);
       Pair<Long, Integer> p = 
getLastSequenceIdWithVersion(lastSeqEntry.getKey(), peerId);
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
index add5121..c2a21a5 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
@@ -103,8 +103,7 @@ public class ReplicationPeerManager {
     for (ServerName replicator : queueStorage.getListOfReplicators()) {
       List<String> queueIds = queueStorage.getAllQueues(replicator);
       for (String queueId : queueIds) {
-        ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
-        if (queueInfo.getPeerId().equals(peerId)) {
+        if (ReplicationQueueInfo.parsePeerId(queueId).equals(peerId)) {
           throw new DoNotRetryIOException("undeleted queue for peerId: " + 
peerId +
             ", replicator: " + replicator + ", queueId: " + queueId);
         }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java
index 2654565..f31a98d 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java
@@ -701,25 +701,24 @@ public class HReplicationServer extends Thread implements 
Server, ReplicationSou
   public void finishRecoveredSource(RecoveredReplicationSource src) {
     this.sources.remove(src.getQueueId());
     this.sourceMetrics.remove(src.getQueueId());
-    deleteQueue(src.getQueueId());
+    deleteQueue(src.getReplicationQueueInfo());
     LOG.info("Finished recovering queue {} with the following stats: {}", 
src.getQueueId(),
       src.getStats());
   }
 
-  public void startReplicationSource(ServerName producer, String queueId)
+  public void startReplicationSource(ServerName owner, String queueId)
     throws IOException, ReplicationException {
-    ReplicationQueueInfo replicationQueueInfo = new 
ReplicationQueueInfo(queueId);
+    ReplicationQueueInfo replicationQueueInfo = new 
ReplicationQueueInfo(owner, queueId);
     String peerId = replicationQueueInfo.getPeerId();
     this.replicationPeers.addPeer(peerId);
-    Path walDir =
-      new Path(walRootDir, 
AbstractFSWALProvider.getWALDirectoryName(producer.toString()));
+    Path walDir = new Path(walRootDir, 
AbstractFSWALProvider.getWALDirectoryName(owner.toString()));
     MetricsSource metrics = new MetricsSource(queueId);
 
     ReplicationSourceInterface src = ReplicationSourceFactory.create(conf, 
queueId);
     // init replication source
     src.init(conf, walFs, walDir, this, queueStorage, 
replicationPeers.getPeer(peerId), this,
-      producer, queueId, clusterId, createWALFileLengthProvider(producer, 
queueId), metrics);
-    queueStorage.getWALsInQueue(producer, queueId)
+      replicationQueueInfo, clusterId, createWALFileLengthProvider(owner, 
queueId), metrics);
+    queueStorage.getWALsInQueue(owner, queueId)
       .forEach(walName -> src.enqueueLog(new Path(walDir, walName)));
     src.startup();
     sources.put(queueId, src);
@@ -728,10 +727,11 @@ public class HReplicationServer extends Thread implements 
Server, ReplicationSou
 
   /**
    * Delete a complete queue of wals associated with a replication source
-   * @param queueId the id of replication queue to delete
+   * @param queueInfo the replication queue to delete
    */
-  private void deleteQueue(String queueId) {
-    abortWhenFail(() -> this.queueStorage.removeQueue(getServerName(), 
queueId));
+  private void deleteQueue(ReplicationQueueInfo queueInfo) {
+    abortWhenFail(() ->
+      this.queueStorage.removeQueue(queueInfo.getOwner(), 
queueInfo.getQueueId()));
   }
 
   @FunctionalInterface
@@ -748,7 +748,7 @@ public class HReplicationServer extends Thread implements 
Server, ReplicationSou
   }
 
   private WALFileLengthProvider createWALFileLengthProvider(ServerName 
producer, String queueId) {
-    if (new ReplicationQueueInfo(queueId).isQueueRecovered()) {
+    if (ReplicationQueueInfo.isQueueRecovered(queueId)) {
       return p -> OptionalLong.empty();
     }
     return new RemoteWALFileLengthProvider(asyncClusterConnection, producer);
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
index 11b0c7c..ef4fde3 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
@@ -316,7 +316,7 @@ public class DumpReplicationQueues extends Configured 
implements Tool {
         deadRegionServers.add(regionserver.getServerName());
       }
       for (String queueId : queueIds) {
-        ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
+        ReplicationQueueInfo queueInfo = new 
ReplicationQueueInfo(regionserver, queueId);
         List<String> wals = queueStorage.getWALsInQueue(regionserver, queueId);
         Collections.sort(wals);
         if (!peerIds.contains(queueInfo.getPeerId())) {
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
index 147556f..c8200ca 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.replication.ReplicationPeer;
+import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.ReplicationSourceController;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
@@ -51,11 +52,11 @@ public class RecoveredReplicationSource extends 
ReplicationSource {
   @Override
   public void init(Configuration conf, FileSystem fs, Path walDir,
     ReplicationSourceController overallController, ReplicationQueueStorage 
queueStorage,
-    ReplicationPeer replicationPeer, Server server, ServerName producer, 
String queueId,
+    ReplicationPeer replicationPeer, Server server, ReplicationQueueInfo 
queueInfo,
     UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource 
metrics)
     throws IOException {
-    super.init(conf, fs, walDir, overallController, queueStorage, 
replicationPeer, server, producer,
-      queueId, clusterId, walFileLengthProvider, metrics);
+    super.init(conf, fs, walDir, overallController, queueStorage, 
replicationPeer, server,
+      queueInfo, clusterId, walFileLengthProvider, metrics);
     this.actualPeerId = this.replicationQueueInfo.getPeerId();
   }
 
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 0d9ee4b..4e46a79 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -112,8 +112,6 @@ public class ReplicationSource implements 
ReplicationSourceInterface {
   private UUID clusterId;
   // total number of edits we replicated
   private AtomicLong totalReplicatedEdits = new AtomicLong(0);
-  // The znode we currently play with
-  protected String queueId;
   // Maximum number of retries before taking bold actions
   private int maxRetriesMultiplier;
   // Indicates if this particular source is running
@@ -190,7 +188,7 @@ public class ReplicationSource implements 
ReplicationSourceInterface {
   @Override
   public void init(Configuration conf, FileSystem fs, Path walDir,
     ReplicationSourceController overallController, ReplicationQueueStorage 
queueStorage,
-    ReplicationPeer replicationPeer, Server server, ServerName producer, 
String queueId,
+    ReplicationPeer replicationPeer, Server server, ReplicationQueueInfo 
queueInfo,
     UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource 
metrics)
     throws IOException {
     this.server = server;
@@ -212,8 +210,7 @@ public class ReplicationSource implements 
ReplicationSourceInterface {
     this.metrics = metrics;
     this.clusterId = clusterId;
 
-    this.queueId = queueId;
-    this.replicationQueueInfo = new ReplicationQueueInfo(queueId);
+    this.replicationQueueInfo = queueInfo;
 
     // A defaultBandwidth of '0' means no bandwidth; i.e. no throttling.
     defaultBandwidth = 
this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
@@ -229,15 +226,15 @@ public class ReplicationSource implements 
ReplicationSourceInterface {
       if (queueStorage instanceof ZKReplicationQueueStorage) {
         ZKReplicationQueueStorage zkQueueStorage = (ZKReplicationQueueStorage) 
queueStorage;
         zkQueueStorage.getZookeeper().registerListener(
-          new ReplicationQueueListener(this, zkQueueStorage, producer, 
queueId, walDir));
+          new ReplicationQueueListener(this, zkQueueStorage, queueInfo, 
walDir));
         LOG.info("Register a ZKListener to track the WALs from {}'s 
replication queue, queueId={}",
-          producer, queueId);
+          queueInfo.getOwner(), queueInfo.getQueueId());
       } else {
         throw new UnsupportedOperationException(
           "hbase.replication.offload.enabled=true only support 
ZKReplicationQueueStorage");
       }
     }
-    LOG.info("queueId={}, ReplicationSource: {}, currentBandwidth={}", queueId,
+    LOG.info("queueId={}, ReplicationSource: {}, currentBandwidth={}", 
getQueueId(),
       replicationPeer.getId(), this.currentBandwidth);
   }
 
@@ -346,7 +343,7 @@ public class ReplicationSource implements 
ReplicationSourceInterface {
             createNewWALReader(walGroupId, worker.getStartPosition());
         Threads.setDaemonThreadRunning(
             walReader, Thread.currentThread().getName()
-            + ".replicationSource.wal-reader." + walGroupId + "," + queueId,
+            + ".replicationSource.wal-reader." + walGroupId + "," + 
getQueueId(),
           (t,e) -> this.uncaughtException(t, e, null, this.getPeerId()));
         worker.setWALReader(walReader);
         worker.startup((t,e) -> this.uncaughtException(t, e, null, 
this.getPeerId()));
@@ -594,7 +591,7 @@ public class ReplicationSource implements 
ReplicationSourceInterface {
     setSourceStartupStatus(true);
     initThread = new Thread(this::initialize);
     Threads.setDaemonThreadRunning(initThread,
-      Thread.currentThread().getName() + ".replicationSource," + this.queueId,
+      Thread.currentThread().getName() + ".replicationSource," + getQueueId(),
       (t,e) -> {
         //if first initialization attempt failed, and abortOnError is false, 
we will
         //keep looping in this thread until initialize eventually succeeds,
@@ -638,10 +635,10 @@ public class ReplicationSource implements 
ReplicationSourceInterface {
   public void terminate(String reason, Exception cause, boolean clearMetrics,
       boolean join) {
     if (cause == null) {
-      LOG.info("{} Closing source {} because: {}", logPeerId(), this.queueId, 
reason);
+      LOG.info("{} Closing source {} because: {}", logPeerId(), getQueueId(), 
reason);
     } else {
       LOG.error(String.format("%s Closing source %s because an error occurred: 
%s",
-        logPeerId(), this.queueId, reason), cause);
+        logPeerId(), getQueueId(), reason), cause);
     }
     this.sourceRunning = false;
     if (initThread != null && Thread.currentThread() != initThread) {
@@ -698,7 +695,7 @@ public class ReplicationSource implements 
ReplicationSourceInterface {
             TimeUnit.MILLISECONDS);
         } catch (TimeoutException te) {
           LOG.warn("{} Got exception while waiting for endpoint to shutdown "
-            + "for replication source : {}", logPeerId(), this.queueId, te);
+            + "for replication source : {}", logPeerId(), getQueueId(), te);
         }
       }
     }
@@ -711,11 +708,6 @@ public class ReplicationSource implements 
ReplicationSourceInterface {
   }
 
   @Override
-  public String getQueueId() {
-    return this.queueId;
-  }
-
-  @Override
   public Path getCurrentPath() {
     // only for testing
     for (ReplicationSourceShipper worker : workerThreads.values()) {
@@ -957,10 +949,10 @@ public class ReplicationSource implements 
ReplicationSourceInterface {
     private final Path walDir;
 
     public ReplicationQueueListener(ReplicationSource source,
-      ZKReplicationQueueStorage zkQueueStorage, ServerName producer, String 
queueId, Path walDir) {
+      ZKReplicationQueueStorage zkQueueStorage, ReplicationQueueInfo 
queueInfo, Path walDir) {
       super(zkQueueStorage.getZookeeper());
       this.source = source;
-      this.queueNode = zkQueueStorage.getQueueNode(producer, queueId);
+      this.queueNode = zkQueueStorage.getQueueNode(queueInfo.getOwner(), 
queueInfo.getQueueId());
       this.walDir = walDir;
     }
 
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java
index 56c8ee4..6254af5 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java
@@ -36,8 +36,7 @@ public final class ReplicationSourceFactory {
   private ReplicationSourceFactory() {}
 
   public static ReplicationSourceInterface create(Configuration conf, String 
queueId) {
-    ReplicationQueueInfo replicationQueueInfo = new 
ReplicationQueueInfo(queueId);
-    boolean isQueueRecovered = replicationQueueInfo.isQueueRecovered();
+    boolean isQueueRecovered = ReplicationQueueInfo.isQueueRecovered(queueId);
     ReplicationSourceInterface src;
     try {
       String defaultReplicationSourceImpl =
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index 461276e..1122e5e 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationPeer;
+import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.ReplicationSourceController;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
@@ -50,15 +51,14 @@ public interface ReplicationSourceInterface {
    * @param queueStorage the replication queue storage
    * @param replicationPeer the replication peer
    * @param server the server which start and run this replication source
-   * @param producer the name of region server which produce WAL to the 
replication queue
-   * @param queueId the id of our replication queue
+   * @param queueInfo the replication queue
    * @param clusterId unique UUID for the cluster
    * @param walFileLengthProvider used to get the WAL length
    * @param metrics metrics for this replication source
    */
   void init(Configuration conf, FileSystem fs, Path walDir,
     ReplicationSourceController overallController, ReplicationQueueStorage 
queueStorage,
-    ReplicationPeer replicationPeer, Server server, ServerName producer, 
String queueId,
+    ReplicationPeer replicationPeer, Server server, ReplicationQueueInfo 
queueInfo,
     UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource 
metrics)
     throws IOException;
 
@@ -105,7 +105,16 @@ public interface ReplicationSourceInterface {
    *
    * @return queue id
    */
-  String getQueueId();
+  default String getQueueId() {
+    return getReplicationQueueInfo().getQueueId();
+  }
+
+  /**
+   * Get the replication queue info
+   *
+   * @return the replication queue info
+   */
+  ReplicationQueueInfo getReplicationQueueInfo();
 
   /**
    * Get the id that the source is replicating to.
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 3dc2d12..c050e1f 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -298,14 +298,15 @@ public class ReplicationSourceManager implements 
ReplicationSourceController {
   private ReplicationSourceInterface createSource(String queueId, 
ReplicationPeer replicationPeer)
       throws IOException {
     ReplicationSourceInterface src = ReplicationSourceFactory.create(conf, 
queueId);
+    ReplicationQueueInfo queueInfo = new 
ReplicationQueueInfo(server.getServerName(), queueId);
     // Init the just created replication source. Pass the default 
walProvider's wal file length
     // provider. Presumption is we replicate user-space Tables only. For 
hbase:meta region replica
     // replication, see #createCatalogReplicationSource().
     WALFileLengthProvider walFileLengthProvider =
       this.walFactory.getWALProvider() != null?
         this.walFactory.getWALProvider().getWALFileLengthProvider() : p -> 
OptionalLong.empty();
-    src.init(conf, fs, logDir, this, queueStorage, replicationPeer, server, 
server.getServerName(),
-      queueId, clusterId, walFileLengthProvider, new MetricsSource(queueId));
+    src.init(conf, fs, logDir, this, queueStorage, replicationPeer, server, 
queueInfo,
+      clusterId, walFileLengthProvider, new MetricsSource(queueId));
     return src;
   }
 
@@ -587,7 +588,7 @@ public class ReplicationSourceManager implements 
ReplicationSourceController {
     // a copy of the replication peer first to decide whether we should start 
the
     // RecoveredReplicationSource. If the latest peer is not the old peer, we 
should also skip to
     // start the RecoveredReplicationSource, Otherwise the rs will abort (See 
HBASE-20475).
-    String peerId = new ReplicationQueueInfo(queue).getPeerId();
+    String peerId = ReplicationQueueInfo.parsePeerId(queue);
     ReplicationPeerImpl oldPeer = replicationPeers.getPeer(peerId);
     if (oldPeer == null) {
       LOG.info("Not transferring queue since the replication peer {} for queue 
{} does not exist",
@@ -926,8 +927,8 @@ public class ReplicationSourceManager implements 
ReplicationSourceController {
       this.clusterId.toString());
     final ReplicationSourceInterface crs = new CatalogReplicationSource();
     crs.init(conf, fs, logDir, this, new NoopReplicationQueueStorage(), peer, 
server,
-      server.getServerName(), peer.getId(), clusterId, 
walProvider.getWALFileLengthProvider(),
-      new MetricsSource(peer.getId()));
+      new ReplicationQueueInfo(server.getServerName(), peer.getId()), 
clusterId,
+      walProvider.getWALFileLengthProvider(), new MetricsSource(peer.getId()));
     // Add listener on the provider so we can pick up the WAL to replicate on 
roll.
     WALActionsListener listener = new WALActionsListener() {
       @Override public void postLogRoll(Path oldPath, Path newPath) throws 
IOException {
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
index 7203fd1..ba440fc 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
@@ -69,13 +69,11 @@ public class ReplicationChecker {
     Set<String> peerIds = new HashSet<>(peerStorage.listPeerIds());
     for (ServerName replicator : queueStorage.getListOfReplicators()) {
       for (String queueId : queueStorage.getAllQueues(replicator)) {
-        ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
-        if (!peerIds.contains(queueInfo.getPeerId())) {
+        String peerId = ReplicationQueueInfo.parsePeerId(queueId);
+        if (!peerIds.contains(peerId)) {
           undeletedQueues.computeIfAbsent(replicator, key -> new 
ArrayList<>()).add(queueId);
-          LOG.debug(
-            "Undeleted replication queue for removed peer found: " +
-              "[removedPeerId={}, replicator={}, queueId={}]",
-            queueInfo.getPeerId(), replicator, queueId);
+          LOG.debug("Undeleted replication queue for removed peer found: " +
+              "[removedPeerId={}, replicator={}, queueId={}]", peerId, 
replicator, queueId);
         }
       }
     }
@@ -99,10 +97,9 @@ public class ReplicationChecker {
     undeletedQueueIds = getUnDeletedQueues();
     undeletedQueueIds.forEach((replicator, queueIds) -> {
       queueIds.forEach(queueId -> {
-        ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
         String msg = "Undeleted replication queue for removed peer found: " +
-          String.format("[removedPeerId=%s, replicator=%s, queueId=%s]", 
queueInfo.getPeerId(),
-            replicator, queueId);
+          String.format("[removedPeerId=%s, replicator=%s, queueId=%s]",
+            ReplicationQueueInfo.parsePeerId(queueId), replicator, queueId);
         
errorReporter.reportError(HbckErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE,
 msg);
       });
     });
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index 8f28dee..6b87047 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -39,7 +39,7 @@ import org.apache.hadoop.hbase.wal.WAL.Entry;
 public class ReplicationSourceDummy implements ReplicationSourceInterface {
 
   private ReplicationPeer replicationPeer;
-  private String queueId;
+  private ReplicationQueueInfo replicationQueueInfo;
   private Path currentPath;
   private MetricsSource metrics;
   private WALFileLengthProvider walFileLengthProvider;
@@ -48,10 +48,10 @@ public class ReplicationSourceDummy implements 
ReplicationSourceInterface {
   @Override
   public void init(Configuration conf, FileSystem fs, Path walDir,
     ReplicationSourceController overallController, ReplicationQueueStorage 
queueStorage,
-    ReplicationPeer replicationPeer, Server server, ServerName producer, 
String queueId,
+    ReplicationPeer replicationPeer, Server server, ReplicationQueueInfo 
queueInfo,
     UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource 
metrics)
     throws IOException {
-    this.queueId = queueId;
+    this.replicationQueueInfo = queueInfo;
     this.metrics = metrics;
     this.walFileLengthProvider = walFileLengthProvider;
     this.replicationPeer = replicationPeer;
@@ -69,6 +69,11 @@ public class ReplicationSourceDummy implements 
ReplicationSourceInterface {
   }
 
   @Override
+  public ReplicationQueueInfo getReplicationQueueInfo() {
+    return this.replicationQueueInfo;
+  }
+
+  @Override
   public ReplicationSourceInterface startup() {
     startup.set(true);
     return this;
@@ -96,15 +101,8 @@ public class ReplicationSourceDummy implements 
ReplicationSourceInterface {
   }
 
   @Override
-  public String getQueueId() {
-    return queueId;
-  }
-
-  @Override
   public String getPeerId() {
-    String[] parts = queueId.split("-", 2);
-    return parts.length != 1 ?
-        parts[0] : queueId;
+    return this.replicationQueueInfo.getPeerId();
   }
 
   @Override
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
index bd673bc..dde2c3c 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
@@ -56,6 +56,7 @@ import 
org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -139,7 +140,8 @@ public class TestReplicationSource {
     String queueId = "qid";
     RegionServerServices rss =
       
TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
-    rs.init(conf, null, null, manager, null, mockPeer, rss, 
rss.getServerName(), queueId, null,
+    rs.init(conf, null, null, manager, null, mockPeer, rss,
+      new ReplicationQueueInfo(rss.getServerName(), queueId), null,
       p -> OptionalLong.empty(), new MetricsSource(queueId));
     try {
       rs.startup();
@@ -177,7 +179,8 @@ public class TestReplicationSource {
     String queueId = "qid";
     RegionServerServices rss =
       
TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
-    rs.init(conf, null, null, manager, null, mockPeer, rss, 
rss.getServerName(), queueId, uuid,
+    rs.init(conf, null, null, manager, null, mockPeer, rss,
+      new ReplicationQueueInfo(rss.getServerName(), queueId), uuid,
       p -> OptionalLong.empty(), new MetricsSource(queueId));
     try {
       rs.startup();
@@ -265,8 +268,8 @@ public class TestReplicationSource {
       testConf.setInt("replication.source.maxretriesmultiplier", 1);
       ReplicationSourceManager manager = 
Mockito.mock(ReplicationSourceManager.class);
       Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
-      source.init(testConf, null, null, manager, null, mockPeer, null, null, 
"testPeer", null,
-        p -> OptionalLong.empty(), null);
+      source.init(testConf, null, null, manager, null, mockPeer, null,
+        new ReplicationQueueInfo(null, "testPeer"), null, p -> 
OptionalLong.empty(), null);
       ExecutorService executor = Executors.newSingleThreadExecutor();
       Future<?> future = executor.submit(
         () -> source.terminate("testing source termination"));
@@ -289,8 +292,9 @@ public class TestReplicationSource {
     ReplicationPeer mockPeer = mock(ReplicationPeer.class);
     Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
     Configuration testConf = HBaseConfiguration.create();
-    source.init(testConf, null, null, mockManager, null, mockPeer, null, null,
-      "testPeer", null, p -> OptionalLong.empty(), mock(MetricsSource.class));
+    source.init(testConf, null, null, mockManager, null, mockPeer, null,
+      new ReplicationQueueInfo(null, "testPeer"), null, p -> 
OptionalLong.empty(),
+      mock(MetricsSource.class));
     ReplicationSourceWALReader reader = new ReplicationSourceWALReader(null,
       conf, null, 0, null, source, null);
     ReplicationSourceShipper shipper =
@@ -536,7 +540,8 @@ public class TestReplicationSource {
     String queueId = "qid";
     RegionServerServices rss =
       
TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
-    rs.init(conf, null, null, manager, null, mockPeer, rss, 
rss.getServerName(), queueId, null,
+    rs.init(conf, null, null, manager, null, mockPeer, rss,
+      new ReplicationQueueInfo(rss.getServerName(), queueId), null,
       p -> OptionalLong.empty(), new MetricsSource(queueId));
     return rss;
   }
@@ -655,7 +660,8 @@ public class TestReplicationSource {
         
TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
 
       ReplicationSource source = new ReplicationSource();
-      source.init(conf, null, null, manager, null, mockPeer, rss, 
rss.getServerName(), id, null,
+      source.init(conf, null, null, manager, null, mockPeer, rss,
+        new ReplicationQueueInfo(rss.getServerName(), id), null,
         p -> OptionalLong.empty(), metrics);
 
       final Path log1 = new Path(logDir, "log-walgroup-a.8");
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index e6b745e..2b6ab55 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -70,6 +70,7 @@ import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
+import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.ReplicationSourceController;
 import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
@@ -414,8 +415,8 @@ public abstract class TestReplicationSourceManager {
     assertEquals(files, 
manager.getWalsByIdRecoveredQueues().get(id).get(group));
     ReplicationSourceInterface source = new ReplicationSource();
     source.init(conf, fs, null, manager, manager.getQueueStorage(), 
rp1.getPeer("1"),
-      manager.getServer(), manager.getServer().getServerName(), id, null, p -> 
OptionalLong.empty(),
-      null);
+      manager.getServer(), new 
ReplicationQueueInfo(manager.getServer().getServerName(), id), null,
+      p -> OptionalLong.empty(), null);
     source.cleanOldWALs(file2, false);
     // log1 should be deleted
     assertEquals(Sets.newHashSet(file2), 
manager.getWalsByIdRecoveredQueues().get(id).get(group));
@@ -632,7 +633,8 @@ public abstract class TestReplicationSourceManager {
       ReplicationSourceInterface source = new ReplicationSource();
       source.init(conf, fs, null, manager, manager.getQueueStorage(),
         mockReplicationPeerForSyncReplication(peerId2), manager.getServer(),
-        manager.getServer().getServerName(), peerId2, null, p -> 
OptionalLong.empty(), null);
+        new ReplicationQueueInfo(manager.getServer().getServerName(), 
peerId2), null,
+        p -> OptionalLong.empty(), null);
       source.cleanOldWALs(walName, true);
       // still there if peer id does not match
       assertTrue(fs.exists(remoteWAL));
@@ -640,7 +642,8 @@ public abstract class TestReplicationSourceManager {
       source = new ReplicationSource();
       source.init(conf, fs, null, manager, manager.getQueueStorage(),
         mockReplicationPeerForSyncReplication(slaveId), manager.getServer(),
-        manager.getServer().getServerName(), slaveId, null, p -> 
OptionalLong.empty(), null);
+        new ReplicationQueueInfo(manager.getServer().getServerName(), 
slaveId), null,
+        p -> OptionalLong.empty(), null);
       source.cleanOldWALs(walName, true);
       assertFalse(fs.exists(remoteWAL));
     } finally {
@@ -821,7 +824,7 @@ public abstract class TestReplicationSourceManager {
     @Override
     public void init(Configuration conf, FileSystem fs, Path walDir,
       ReplicationSourceController overallController, ReplicationQueueStorage 
queueStorage,
-      ReplicationPeer replicationPeer, Server server, ServerName producer, 
String queueId,
+      ReplicationPeer replicationPeer, Server server, ReplicationQueueInfo 
queueInfo,
       UUID clusterId, WALFileLengthProvider walFileLengthProvider, 
MetricsSource metrics)
       throws IOException {
       throw new IOException("Failing deliberately");
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java
index 8e0ab0f..86c141b 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java
@@ -95,7 +95,8 @@ public class TestReplicationSourceManagerZkImpl extends 
TestReplicationSourceMan
         queueStorage.claimQueue(serverName, unclaimed.get(0), 
s3.getServerName()).getFirst();
     queueStorage.removeReplicatorIfQueueIsEmpty(serverName);
 
-    ReplicationQueueInfo replicationQueueInfo = new 
ReplicationQueueInfo(queue3);
+    ReplicationQueueInfo replicationQueueInfo =
+      new ReplicationQueueInfo(s3.getServerName(), queue3);
     List<ServerName> result = replicationQueueInfo.getDeadRegionServers();
     // verify
     assertTrue(result.contains(server.getServerName()));

Reply via email to