shahrs87 commented on a change in pull request #2970:
URL: https://github.com/apache/hbase/pull/2970#discussion_r580314979



##########
File path: 
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
##########
@@ -189,13 +189,25 @@ public ReplicationSourceManager(final ReplicationQueues 
replicationQueues,
    * @param holdLogInZK if true then the log is retained in ZK
    */
   public synchronized void logPositionAndCleanOldLogs(Path log, String id, 
long position,
-      boolean queueRecovered, boolean holdLogInZK) throws 
ReplicationSourceWithoutPeerException {
+      boolean queueRecovered, boolean holdLogInZK) {
     String fileName = log.getName();
     this.replicationQueues.setLogPosition(id, fileName, position);
     if (holdLogInZK) {
       return;
     }
-    cleanOldLogs(fileName, id, queueRecovered);
+
+    try {
+      cleanOldLogs(fileName, id, queueRecovered);
+    } catch (ReplicationSourceWithoutPeerException rspe) {
+      // This means the source is running and replication peer have been 
removed
+      // We should call the removePeer workflow to terminate the source 
gracefully
+      LOG.warn("Replication peer" + id + "has been removed and source is still 
running", rspe);

Review comment:
       Super nit: add a space between "peer" and `id` in the log message (and another before "has").
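
       To make the nit concrete, here is a minimal sketch of the message with the spacing fixed. The class and method names (`PeerRemovedMessage`, `format`) are hypothetical and exist only for illustration; in the patch the string would stay inline in the `LOG.warn` call.

```java
public final class PeerRemovedMessage {
  private PeerRemovedMessage() {}

  // Hypothetical helper: builds the warning text with a space on each side of the id.
  static String format(String id) {
    return "Replication peer " + id + " has been removed and source is still running";
  }

  public static void main(String[] args) {
    // Prints the message for an example peer id.
    System.out.println(format("1"));
  }
}
```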

##########
File path: 
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
##########
@@ -189,13 +189,25 @@ public ReplicationSourceManager(final ReplicationQueues 
replicationQueues,
    * @param holdLogInZK if true then the log is retained in ZK
    */
   public synchronized void logPositionAndCleanOldLogs(Path log, String id, 
long position,
-      boolean queueRecovered, boolean holdLogInZK) throws 
ReplicationSourceWithoutPeerException {
+      boolean queueRecovered, boolean holdLogInZK) {
     String fileName = log.getName();
     this.replicationQueues.setLogPosition(id, fileName, position);
     if (holdLogInZK) {
       return;
     }
-    cleanOldLogs(fileName, id, queueRecovered);
+
+    try {
+      cleanOldLogs(fileName, id, queueRecovered);
+    } catch (ReplicationSourceWithoutPeerException rspe) {
+      // This means the source is running and replication peer have been 
removed
+      // We should call the removePeer workflow to terminate the source 
gracefully
+      LOG.warn("Replication peer" + id + "has been removed and source is still 
running", rspe);
+      String peerId = id;
+      if (peerId.contains("-")) {
+        peerId = peerId.split("-")[0];
+      }
+      removePeer(peerId);

Review comment:
       Instead of calling `ReplicationSourceManager#removePeer`, could we call 
`ReplicationSourceManager#peerRemoved`? That is the exact workflow the 
`ReplicationTrackerZKImpl#nodeDeleted` watcher would have called.
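
       Whichever method ends up being called, the hunk above first derives the peer id by taking the text before the first `-` in `id`. A minimal standalone sketch of that step follows, assuming (as the hunk does) that the peer id itself contains no `-`. The names `QueueIds` and `peerIdOf` are hypothetical and not part of HBase.

```java
public final class QueueIds {
  private QueueIds() {}

  // Hypothetical helper: returns the text before the first '-',
  // or the whole id when it contains no '-'.
  static String peerIdOf(String queueId) {
    int dash = queueId.indexOf('-');
    return dash < 0 ? queueId : queueId.substring(0, dash);
  }

  public static void main(String[] args) {
    // Example ids are made up for illustration.
    System.out.println(peerIdOf("1"));
    System.out.println(peerIdOf("1-host,16020,1234"));
  }
}
```

       Using `indexOf` and `substring` keeps the no-dash case and the dash case in one expression and avoids indexing into the array returned by `split`.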

##########
File path: 
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
##########
@@ -189,13 +189,25 @@ public ReplicationSourceManager(final ReplicationQueues 
replicationQueues,
    * @param holdLogInZK if true then the log is retained in ZK
    */
   public synchronized void logPositionAndCleanOldLogs(Path log, String id, 
long position,
-      boolean queueRecovered, boolean holdLogInZK) throws 
ReplicationSourceWithoutPeerException {
+      boolean queueRecovered, boolean holdLogInZK) {
     String fileName = log.getName();
     this.replicationQueues.setLogPosition(id, fileName, position);
     if (holdLogInZK) {
       return;
     }
-    cleanOldLogs(fileName, id, queueRecovered);
+
+    try {
+      cleanOldLogs(fileName, id, queueRecovered);
+    } catch (ReplicationSourceWithoutPeerException rspe) {

Review comment:
       Should we move the handling of `ReplicationSourceWithoutPeerException` 
to `ReplicationSourceManager#cleanOldLogs(SortedSet<String> wals, String key, 
String id)`?





