This is an automated email from the ASF dual-hosted git repository.

sunxin pushed a commit to branch HBASE-24666
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/HBASE-24666 by this push:
     new afd8b9b  HBASE-26194 Introduce a ReplicationServerSourceManager to 
simplify HReplicationServer (#3584)
afd8b9b is described below

commit afd8b9bbfa14cddc797978f94a996bd59b23e279
Author: XinSun <[email protected]>
AuthorDate: Tue Aug 17 16:28:18 2021 +0800

    HBASE-26194 Introduce a ReplicationServerSourceManager to simplify 
HReplicationServer (#3584)
    
    Signed-off-by: stack <[email protected]>
---
 .../apache/hadoop/hbase/security/SecurityInfo.java |   7 +
 .../hadoop/hbase/security/SecurityConstants.java   |   4 +
 .../hbase/replication/ReplicationQueueInfo.java    |  15 ++
 .../hbase/replication/HReplicationServer.java      | 122 ++---------
 .../replication/regionserver/MetricsSource.java    |  13 ++
 .../ReplicationServerSourceManager.java            | 234 +++++++++++++++++++++
 .../TestReplicationServerSourceManager.java        | 139 ++++++++++++
 7 files changed, 427 insertions(+), 107 deletions(-)

diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SecurityInfo.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SecurityInfo.java
index f5f6922..4740d9d 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SecurityInfo.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SecurityInfo.java
@@ -27,6 +27,8 @@ import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerProtos;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerStatusProtos;
 
 /**
  * Maps RPC protocol interfaces to required configuration
@@ -51,6 +53,11 @@ public class SecurityInfo {
         new SecurityInfo(SecurityConstants.MASTER_KRB_PRINCIPAL, 
Kind.HBASE_AUTH_TOKEN));
     infos.put(MasterProtos.ClientMetaService.getDescriptor().getName(),
         new SecurityInfo(SecurityConstants.MASTER_KRB_PRINCIPAL, 
Kind.HBASE_AUTH_TOKEN));
+    
infos.put(ReplicationServerStatusProtos.ReplicationServerStatusService.getDescriptor()
+        .getName(),
+      new SecurityInfo(SecurityConstants.MASTER_KRB_PRINCIPAL, 
Kind.HBASE_AUTH_TOKEN));
+    
infos.put(ReplicationServerProtos.ReplicationServerService.getDescriptor().getName(),
+      new SecurityInfo(SecurityConstants.REPLICATION_SERVER_KRB_PRINCIPAL, 
Kind.HBASE_AUTH_TOKEN));
     // NOTE: IF ADDING A NEW SERVICE, BE SURE TO UPDATE HBasePolicyProvider 
ALSO ELSE
     // new Service will not be found when all is Kerberized!!!!
   }
diff --git 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/security/SecurityConstants.java
 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/security/SecurityConstants.java
index 3e387e8..d0e13a3 100644
--- 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/security/SecurityConstants.java
+++ 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/security/SecurityConstants.java
@@ -34,6 +34,10 @@ public final class SecurityConstants {
   public static final String MASTER_KRB_KEYTAB_FILE = 
"hbase.master.keytab.file";
   public static final String REGIONSERVER_KRB_PRINCIPAL = 
"hbase.regionserver.kerberos.principal";
   public static final String REGIONSERVER_KRB_KEYTAB_FILE = 
"hbase.regionserver.keytab.file";
+  public static final String REPLICATION_SERVER_KRB_PRINCIPAL =
+    "hbase.replication.server.kerberos.principal";
+  public static final String REPLICATION_SERVER_KRB_KEYTAB_FILE =
+    "hbase.replication.server.keytab.file";
 
   /**
    * This config is for experts: don't set its value unless you really know 
what you are doing.
diff --git 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java
 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java
index 49a2153..9c1c03a 100644
--- 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java
+++ 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.replication;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -147,4 +148,18 @@ public class ReplicationQueueInfo {
   public boolean isQueueRecovered() {
     return queueRecovered;
   }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o instanceof ReplicationQueueInfo) {
+      ReplicationQueueInfo other = (ReplicationQueueInfo) o;
+      return Objects.equals(this.owner, other.owner) && 
Objects.equals(this.queueId, other.queueId);
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(this.owner, this.queueId);
+  }
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java
index f31a98d..1a9b18c 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java
@@ -20,19 +20,12 @@ package org.apache.hadoop.hbase.replication;
 import java.io.IOException;
 import java.lang.management.MemoryUsage;
 import java.net.InetSocketAddress;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.OptionalLong;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.ChoreService;
-import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
 import org.apache.hadoop.hbase.CoordinatedStateManager;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HConstants;
@@ -49,22 +42,14 @@ import 
org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
 import org.apache.hadoop.hbase.log.HBaseMarkers;
 import org.apache.hadoop.hbase.regionserver.ReplicationService;
 import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
-import 
org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource;
-import 
org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceFactory;
-import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
-import 
org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSource;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
-import 
org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceFactory;
-import 
org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
-import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
-import 
org.apache.hadoop.hbase.replication.replicationserver.RemoteWALFileLengthProvider;
+import 
org.apache.hadoop.hbase.replication.replicationserver.ReplicationServerSourceManager;
 import org.apache.hadoop.hbase.security.SecurityConstants;
 import org.apache.hadoop.hbase.security.Superusers;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.Sleeper;
-import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@@ -89,7 +74,7 @@ import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerStatus
  */
 @InterfaceAudience.Private
 @SuppressWarnings({ "deprecation"})
-public class HReplicationServer extends Thread implements Server, 
ReplicationSourceController  {
+public class HReplicationServer extends Thread implements Server {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(HReplicationServer.class);
 
@@ -150,18 +135,6 @@ public class HReplicationServer extends Thread implements 
Server, ReplicationSou
 
   final ReplicationServerRpcServices rpcServices;
 
-  // Total buffer size on this RegionServer for holding batched edits to be 
shipped.
-  private final long totalBufferLimit;
-  private AtomicLong totalBufferUsed = new AtomicLong();
-
-  private final MetricsReplicationGlobalSourceSource globalMetrics;
-  private final Map<String, MetricsSource> sourceMetrics = new HashMap<>();
-  private final ConcurrentMap<String, ReplicationSourceInterface> sources =
-    new ConcurrentHashMap<>();
-
-  private final ReplicationQueueStorage queueStorage;
-  private final ReplicationPeers replicationPeers;
-
   // Stub to do region server status calls against the master.
   private volatile ReplicationServerStatusService.BlockingInterface rssStub;
 
@@ -170,6 +143,8 @@ public class HReplicationServer extends Thread implements 
Server, ReplicationSou
 
   private ReplicationSinkService replicationSinkService;
 
+  private ReplicationServerSourceManager sourceManager;
+
   public HReplicationServer(final Configuration conf) throws Exception {
     try {
       this.startCode = System.currentTimeMillis();
@@ -198,11 +173,6 @@ public class HReplicationServer extends Thread implements 
Server, ReplicationSou
 
       this.shortOperationTimeout = 
conf.getInt(HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
           HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);
-      this.totalBufferLimit = 
conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
-        HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
-      this.globalMetrics =
-        
CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class)
-          .getGlobalSource();
 
       initializeFileSystem();
       this.choreService = new ChoreService(getName(), true);
@@ -219,10 +189,6 @@ public class HReplicationServer extends Thread implements 
Server, ReplicationSou
         masterAddressTracker = null;
       }
 
-      this.queueStorage = 
ReplicationStorageFactory.getReplicationQueueStorage(zooKeeper, conf);
-      this.replicationPeers =
-        ReplicationFactory.getReplicationPeers(zooKeeper, this.conf);
-      this.replicationPeers.init();
       this.clusterId = ZKClusterId.getUUIDForCluster(zooKeeper);
       this.rpcServices.start(zooKeeper);
       this.choreService = new ChoreService(getName(), true);
@@ -497,7 +463,11 @@ public class HReplicationServer extends Thread implements 
Server, ReplicationSou
   /**
    * Start up replication source and sink handlers.
    */
-  private void startReplicationService() throws IOException {
+  private void startReplicationService() throws IOException, 
ReplicationException {
+    Path oldWalDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
+    this.sourceManager = new ReplicationServerSourceManager(this, walFs, 
walRootDir,
+      oldWalDir, clusterId);
+    this.sourceManager.init();
     if (this.replicationSinkService != null) {
       this.replicationSinkService.startReplicationService();
     }
@@ -510,6 +480,10 @@ public class HReplicationServer extends Thread implements 
Server, ReplicationSou
     return replicationSinkService;
   }
 
+  public ReplicationServerSourceManager getReplicationServerSourceManager() {
+    return this.sourceManager;
+  }
+
   /**
    * Report the status of the server. A server is online once all the startup 
is
    * completed (setting up filesystem, starting executorService threads, 
etc.). This
@@ -682,75 +656,9 @@ public class HReplicationServer extends Thread implements 
Server, ReplicationSou
     return interrupted;
   }
 
-  @Override
-  public long getTotalBufferLimit() {
-    return this.totalBufferLimit;
-  }
-
-  @Override
-  public AtomicLong getTotalBufferUsed() {
-    return this.totalBufferUsed;
-  }
-
-  @Override
-  public MetricsReplicationGlobalSourceSource getGlobalMetrics() {
-    return this.globalMetrics;
-  }
-
-  @Override
-  public void finishRecoveredSource(RecoveredReplicationSource src) {
-    this.sources.remove(src.getQueueId());
-    this.sourceMetrics.remove(src.getQueueId());
-    deleteQueue(src.getReplicationQueueInfo());
-    LOG.info("Finished recovering queue {} with the following stats: {}", 
src.getQueueId(),
-      src.getStats());
-  }
-
   public void startReplicationSource(ServerName owner, String queueId)
     throws IOException, ReplicationException {
-    ReplicationQueueInfo replicationQueueInfo = new 
ReplicationQueueInfo(owner, queueId);
-    String peerId = replicationQueueInfo.getPeerId();
-    this.replicationPeers.addPeer(peerId);
-    Path walDir = new Path(walRootDir, 
AbstractFSWALProvider.getWALDirectoryName(owner.toString()));
-    MetricsSource metrics = new MetricsSource(queueId);
-
-    ReplicationSourceInterface src = ReplicationSourceFactory.create(conf, 
queueId);
-    // init replication source
-    src.init(conf, walFs, walDir, this, queueStorage, 
replicationPeers.getPeer(peerId), this,
-      replicationQueueInfo, clusterId, createWALFileLengthProvider(owner, 
queueId), metrics);
-    queueStorage.getWALsInQueue(owner, queueId)
-      .forEach(walName -> src.enqueueLog(new Path(walDir, walName)));
-    src.startup();
-    sources.put(queueId, src);
-    sourceMetrics.put(queueId, metrics);
-  }
-
-  /**
-   * Delete a complete queue of wals associated with a replication source
-   * @param queueInfo the replication queue to delete
-   */
-  private void deleteQueue(ReplicationQueueInfo queueInfo) {
-    abortWhenFail(() ->
-      this.queueStorage.removeQueue(queueInfo.getOwner(), 
queueInfo.getQueueId()));
-  }
-
-  @FunctionalInterface
-  private interface ReplicationQueueOperation {
-    void exec() throws ReplicationException;
-  }
-
-  private void abortWhenFail(ReplicationQueueOperation op) {
-    try {
-      op.exec();
-    } catch (ReplicationException e) {
-      abort("Failed to operate on replication queue", e);
-    }
-  }
-
-  private WALFileLengthProvider createWALFileLengthProvider(ServerName 
producer, String queueId) {
-    if (ReplicationQueueInfo.isQueueRecovered(queueId)) {
-      return p -> OptionalLong.empty();
-    }
-    return new RemoteWALFileLengthProvider(asyncClusterConnection, producer);
+    LOG.info("Start replication source, owner: {}, queueId: {}", owner, 
queueId);
+    this.sourceManager.startReplicationSource(owner, queueId);
   }
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
index 3ab0806..2135010 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.metrics.BaseSource;
+import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
@@ -66,6 +67,18 @@ public class MetricsSource implements BaseSource {
     singleSourceSourceByTable = new HashMap<>();
   }
 
+
+  /**
+   * Constructor used to register the metrics.
+   * On ReplicationServer, there may be multiple queues with the same queueId, 
so use queueOwner
+   * and queueId to distinguish them.
+   *
+   * @param queueInfo The replication queue this class is monitoring
+   */
+  public MetricsSource(ReplicationQueueInfo queueInfo) {
+    this(queueInfo.getOwner() + "." + queueInfo.getQueueId());
+  }
+
   /**
    * Constructor for injecting custom (or test) MetricsReplicationSourceSources
    * @param id Name of the source this class is monitoring
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/replicationserver/ReplicationServerSourceManager.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/replicationserver/ReplicationServerSourceManager.java
new file mode 100644
index 0000000..683fbcf
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/replicationserver/ReplicationServerSourceManager.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.replicationserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.OptionalLong;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationFactory;
+import org.apache.hadoop.hbase.replication.ReplicationPeers;
+import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
+import org.apache.hadoop.hbase.replication.ReplicationSourceController;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
+import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorage;
+import 
org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource;
+import 
org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceFactory;
+import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
+import 
org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSource;
+import 
org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceFactory;
+import 
org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
+import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class is responsible for managing all the replication sources on 
ReplicationServer.
+ */
+@InterfaceAudience.Private
+public class ReplicationServerSourceManager implements 
ReplicationSourceController {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ReplicationServerSourceManager.class);
+
+  private final Server server;
+
+  // Total buffer size on this ReplicationServer for holding batched edits to 
be shipped.
+  private final long totalBufferLimit;
+  private AtomicLong totalBufferUsed = new AtomicLong();
+
+  private final MetricsReplicationGlobalSourceSource globalMetrics;
+
+  private final ConcurrentMap<ReplicationQueueInfo, 
ReplicationSourceInterface> sources =
+    new ConcurrentHashMap<>();
+
+  private final ZKReplicationQueueStorage zkQueueStorage;
+  private final ReplicationPeers replicationPeers;
+
+  private final Configuration conf;
+
+  private final FileSystem fs;
+  private final Path walRootDir;
+  // Path to the wal archive
+  private final Path oldWalDir;
+
+  private final UUID clusterId;
+
+  public ReplicationServerSourceManager(Server server, FileSystem fs, Path 
walRootDir,
+    Path oldWalDir, UUID clusterId) {
+    this.server = server;
+    this.conf = server.getConfiguration();
+    this.fs = fs;
+    this.walRootDir = walRootDir;
+    this.oldWalDir = oldWalDir;
+    this.clusterId = clusterId;
+    this.zkQueueStorage = (ZKReplicationQueueStorage) ReplicationStorageFactory
+      .getReplicationQueueStorage(server.getZooKeeper(), conf);
+    this.replicationPeers =
+      ReplicationFactory.getReplicationPeers(server.getZooKeeper(), conf);
+    this.totalBufferLimit = 
this.conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
+      HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
+    this.globalMetrics = CompatibilitySingletonFactory.getInstance(
+      MetricsReplicationSourceFactory.class).getGlobalSource();
+  }
+
+
+  public void init() throws IOException, ReplicationException {
+    this.replicationPeers.init();
+  }
+
+  @Override
+  public long getTotalBufferLimit() {
+    return totalBufferLimit;
+  }
+
+  @Override
+  public AtomicLong getTotalBufferUsed() {
+    return totalBufferUsed;
+  }
+
+  @Override
+  public MetricsReplicationGlobalSourceSource getGlobalMetrics() {
+    return this.globalMetrics;
+  }
+
+  @Override
+  public void finishRecoveredSource(RecoveredReplicationSource src) {
+    this.sources.remove(src.getReplicationQueueInfo());
+    deleteQueue(src.getReplicationQueueInfo());
+    LOG.info("Finished recovering queue {} with the following stats: {}", 
src.getQueueId(),
+      src.getStats());
+  }
+
+  /**
+   * Get a list of all sources, including normal sources and recovered sources.
+   * @return list of all sources
+   */
+  public List<ReplicationSourceInterface> getSources() {
+    return new ArrayList<>(this.sources.values());
+  }
+
+  public ReplicationSourceInterface getSource(ReplicationQueueInfo queueInfo) {
+    return this.sources.get(queueInfo);
+  }
+
+  public ReplicationPeers getReplicationPeers() {
+    return this.replicationPeers;
+  }
+
+  public void startReplicationSource(ServerName owner, String queueId) throws 
IOException,
+    ReplicationException {
+    ReplicationQueueInfo replicationQueueInfo = new 
ReplicationQueueInfo(owner, queueId);
+    String peerId = replicationQueueInfo.getPeerId();
+    this.replicationPeers.addPeer(peerId);
+
+    ReplicationSourceInterface src = ReplicationSourceFactory.create(conf, 
queueId);
+
+    ReplicationSourceInterface existSource = 
sources.putIfAbsent(replicationQueueInfo, src);
+    if (existSource != null) {
+      LOG.warn("Duplicate source exists for replication queue: owner={}, 
queueId={}", owner,
+        queueId);
+      return;
+    }
+
+    Path walDir = replicationQueueInfo.isQueueRecovered() ? oldWalDir :
+      new Path(walRootDir, 
AbstractFSWALProvider.getWALDirectoryName(owner.toString()));
+
+    // init replication source
+    src.init(conf, fs, walDir, this, zkQueueStorage, 
replicationPeers.getPeer(peerId),
+      server, replicationQueueInfo, clusterId,
+      createWALFileLengthProvider(replicationQueueInfo), new 
MetricsSource(replicationQueueInfo));
+
+    List<String> waLsInQueue = zkQueueStorage.getWALsInQueue(owner, queueId);
+    waLsInQueue.forEach(walName -> src.enqueueLog(new Path(walDir, walName)));
+
+    src.startup();
+    LOG.info("Start replication source for queue {} from region server {}", 
queueId, owner);
+  }
+
+  public void stopReplicationSource(ServerName owner, String queueId) throws 
IOException,
+    ReplicationException {
+    String terminateMessage = "Replication source was stopped by Master";
+    ReplicationQueueInfo replicationQueueInfo = new 
ReplicationQueueInfo(owner, queueId);
+    ReplicationSourceInterface src = sources.remove(replicationQueueInfo);
+    if (src != null) {
+      // Just terminate the source, do not clean queue.
+      src.terminate(terminateMessage);
+    }
+  }
+
+  public void removePeer(String peerId) {
+    String terminateMessage = "Replication stream was removed by a user";
+
+    List<ReplicationQueueInfo> queuesToDelete = sources.keySet().stream()
+      .filter(queueInfo -> 
queueInfo.getPeerId().equals(peerId)).collect(Collectors.toList());
+
+    for (ReplicationQueueInfo queueInfo : queuesToDelete) {
+      ReplicationSourceInterface source = sources.remove(queueInfo);
+      source.terminate(terminateMessage);
+      deleteQueue(queueInfo);
+    }
+    // Remove HFile Refs
+    abortWhenFail(() -> this.zkQueueStorage.removePeerFromHFileRefs(peerId));
+  }
+
+  /**
+   * Delete a complete queue of wals associated with a replication source
+   * @param queueInfo the replication queue to delete
+   */
+  private void deleteQueue(ReplicationQueueInfo queueInfo) {
+    abortWhenFail(() ->
+      this.zkQueueStorage.removeQueue(queueInfo.getOwner(), 
queueInfo.getQueueId()));
+  }
+
+  private void abortWhenFail(ReplicationQueueOperation op) {
+    try {
+      op.exec();
+    } catch (ReplicationException e) {
+      server.abort("Failed to operate on replication queue", e);
+    }
+  }
+
+  @FunctionalInterface
+  private interface ReplicationQueueOperation {
+    void exec() throws ReplicationException;
+  }
+
+  private WALFileLengthProvider 
createWALFileLengthProvider(ReplicationQueueInfo queueInfo) {
+    if (queueInfo.isQueueRecovered()) {
+      return p -> OptionalLong.empty();
+    }
+    return new RemoteWALFileLengthProvider(server.getAsyncClusterConnection(),
+      queueInfo.getOwner());
+  }
+}
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/replicationserver/TestReplicationServerSourceManager.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/replicationserver/TestReplicationServerSourceManager.java
new file mode 100644
index 0000000..7baf97c
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/replicationserver/TestReplicationServerSourceManager.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.replicationserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.replication.HReplicationServer;
+import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
+import org.apache.hadoop.hbase.replication.TestReplicationBase;
+import 
org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource;
+import 
org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
+import 
org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({ ReplicationTests.class, MediumTests.class})
+public class TestReplicationServerSourceManager extends TestReplicationBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestReplicationServerSourceManager.class);
+
+  private static final Logger LOG =
+    LoggerFactory.getLogger(TestReplicationServerSourceManager.class);
+
+  @Rule
+  public TestName name = new TestName();
+
+  private HReplicationServer replicationServer;
+
+  private ReplicationServerSourceManager manager;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    CONF1.setBoolean(HConstants.REPLICATION_OFFLOAD_ENABLE_KEY, true);
+    TestReplicationBase.setUpBeforeClass();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TestReplicationBase.tearDownAfterClass();
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    replicationServer = new HReplicationServer(UTIL1.getConfiguration());
+    replicationServer.start();
+    replicationServer.waitForServerOnline();
+    manager = replicationServer.getReplicationServerSourceManager();
+    super.setUpBase();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    replicationServer.stop("test");
+    super.tearDownBase();
+  }
+
+  @Test
+  public void testAddSource() throws Exception {
+    ServerName rs =
+      
UTIL1.getHBaseCluster().getLiveRegionServerThreads().get(0).getRegionServer().getServerName();
+    manager.startReplicationSource(rs, PEER_ID2);
+    loadData("aaa", row);
+    UTIL2.waitFor(30000, () -> htable2.getScanner(new Scan()).next() != null);
+  }
+
+  @Test
+  public void testRemovePeerMetricsCleanup() throws Exception {
+    HRegionServer rs =
+      
UTIL1.getHBaseCluster().getLiveRegionServerThreads().get(0).getRegionServer();
+    ReplicationSourceManager rsSourceManager =
+      rs.getReplicationSourceService().getReplicationManager();
+    int rsSourceSizeInitial = rsSourceManager.getSources().size();
+    String peerId = name.getMethodName();
+    ReplicationQueueInfo queueInfo = new 
ReplicationQueueInfo(rs.getServerName(), peerId);
+
+    addPeer(peerId, tableName);
+    UTIL1.waitFor(30000,
+      () -> rsSourceSizeInitial + 1 == rsSourceManager.getSources().size());
+
+    MetricsReplicationGlobalSourceSource globalMetrics = 
manager.getGlobalMetrics();
+    int globalLogQueueSizeInitial = globalMetrics.getSizeOfLogQueue();
+    LOG.debug("globalLogQueueSize: {} before starting source on replication 
server",
+      globalLogQueueSizeInitial);
+    // Start source on replication server
+    manager.startReplicationSource(rs.getServerName(), peerId);
+    ReplicationSourceInterface src = manager.getSource(queueInfo);
+    assertNotNull(src);
+    assertEquals(globalLogQueueSizeInitial + 
src.getSourceMetrics().getSizeOfLogQueue(),
+      globalMetrics.getSizeOfLogQueue());
+
+    // Stopping the peer should reset the global metrics
+    manager.stopReplicationSource(rs.getServerName(), peerId);
+    UTIL1.waitFor(30000, () -> manager.getSources().size() == 0);
+    assertEquals(globalLogQueueSizeInitial, globalMetrics.getSizeOfLogQueue());
+
+    // Adding the same source back again should reset the single source metrics
+    manager.startReplicationSource(rs.getServerName(), peerId);
+    src = manager.getSource(queueInfo);
+    assertNotNull(src);
+    assertEquals(globalLogQueueSizeInitial + 
src.getSourceMetrics().getSizeOfLogQueue(),
+      globalMetrics.getSizeOfLogQueue());
+
+    removePeer(peerId);
+  }
+}
\ No newline at end of file

Reply via email to