This is an automated email from the ASF dual-hosted git repository.

bharathv pushed a commit to branch branch-1
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-1 by this push:
     new a0dd384  HBASE-25741: Deadlock during peer cleanup with NoNodeException (#3204)
a0dd384 is described below

commit a0dd384691b571bf56e82f295e330810545ae090
Author: Sandeep Pal <[email protected]>
AuthorDate: Wed May 5 17:21:25 2021 -0700

    HBASE-25741: Deadlock during peer cleanup with NoNodeException (#3204)
    
    The deadlock was introduced by the commit for HBASE-25583. The fix is to issue the peer cleanup asynchronously.
    
    Signed-off-by: Bharath Vissapragada <[email protected]>
---
 .../regionserver/ReplicationSourceManager.java     | 19 +++++++++++++++++-
 .../hbase/replication/TestReplicationSource.java   | 23 +++++++++++++++++-----
 ...tReplicationSourceWithoutReplicationZnodes.java | 19 +++++++++++++-----
 3 files changed, 50 insertions(+), 11 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index b0e32f8..2d6c29a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -241,7 +241,7 @@ public class ReplicationSourceManager implements ReplicationListener {
       if (peerId.contains("-")) {
         peerId = peerId.split("-")[0];
       }
-      peerRemoved(peerId);
+      schedulePeerRemoval(peerId);
     }
     walSet.clear();
   }
@@ -653,6 +653,23 @@ public class ReplicationSourceManager implements ReplicationListener {
     transferQueues(regionserver);
   }
 
+  /**
+   * Runs the peer removal in a separate thread. We want this when the peer removal is
+   * called from the ReplicationSource shipper thread on encountering a NoNodeException,
+   * because peerRemoved terminates the source, which might leave the replication source
+   * in an orphaned state.
+   * See HBASE-25741.
+   * @param peerId peer ID to be removed.
+   */
+  private void schedulePeerRemoval(final String peerId) {
+    LOG.info(String.format("Scheduling an async peer removal for peer %s", 
peerId));
+    this.executor.submit(new Runnable() {
+      @Override public void run() {
+        peerRemoved(peerId);
+      }
+    });
+  }
+
   @Override
   public void peerRemoved(String peerId) {
     removePeer(peerId);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
index ca09aa8..65b501d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
@@ -181,7 +181,7 @@ public class TestReplicationSource {
         TEST_UTIL.getConfiguration());
     for(int i = 0; i < 3; i++) {
       byte[] b = Bytes.toBytes(Integer.toString(i));
-      KeyValue kv = new KeyValue(b,b,b);
+      KeyValue kv = new KeyValue(b, b, b);
       WALEdit edit = new WALEdit();
       edit.add(kv);
       WALKey key = new WALKey(b, TableName.valueOf(b), 0, 0,
@@ -256,11 +256,10 @@ public class TestReplicationSource {
   private void appendEntries(WALProvider.Writer writer, int numEntries) throws 
IOException {
     for (int i = 0; i < numEntries; i++) {
       byte[] b = Bytes.toBytes(Integer.toString(i));
-      KeyValue kv = new KeyValue(b,b,b);
+      KeyValue kv = new KeyValue(b, b, b);
       WALEdit edit = new WALEdit();
       edit.add(kv);
-      WALKey key = new WALKey(b, TableName.valueOf(b), 0, 0,
-              HConstants.DEFAULT_CLUSTER_ID);
+      WALKey key = new WALKey(b, TableName.valueOf(b), 0, 0, 
HConstants.DEFAULT_CLUSTER_ID);
       NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], 
Integer>(Bytes.BYTES_COMPARATOR);
       scopes.put(b, HConstants.REPLICATION_SCOPE_GLOBAL);
       key.setScopes(scopes);
@@ -565,7 +564,13 @@ public class TestReplicationSource {
     };
 
     final ReplicationSource source = 
mocks.createReplicationSourceAndManagerWithMocks(endpoint);
-    source.run();
+    source.startup();
+    // source thread should be active
+    Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+      @Override public boolean evaluate() {
+        return source.isAlive();
+      }
+    });
     source.enqueueLog(log1);
 
     // Wait for source to replicate
@@ -588,6 +593,14 @@ public class TestReplicationSource {
         return !source.isSourceActive();
       }
     });
+
+    // And the source thread be terminated
+    Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+      @Override public boolean evaluate() {
+        return !source.isAlive();
+      }
+    });
+    assertTrue("Source should be removed", 
mocks.manager.getSources().isEmpty());
   }
 
   @Test
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceWithoutReplicationZnodes.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceWithoutReplicationZnodes.java
index c823548..832c28a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceWithoutReplicationZnodes.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceWithoutReplicationZnodes.java
@@ -22,6 +22,7 @@ import java.net.URLEncoder;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@@ -78,19 +79,27 @@ public class TestReplicationSourceWithoutReplicationZnodes
       wal.sync(txid);
 
       wal.rollWriter();
-      ZKUtil.deleteNodeRecursively(zkw, "/hbase/replication/peers/1");
-      ZKUtil.deleteNodeRecursively(zkw, "/hbase/replication/rs/" + 
server.getServerName() + "/1");
-
+      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+        @Override public boolean evaluate() {
+          return !manager.getSources().isEmpty();
+        }
+      });
       Assert.assertEquals("There should be exactly one source",
         1, manager.getSources().size());
       Assert.assertEquals("Replication source is not correct",
         ReplicationSourceDummyWithNoTermination.class,
         manager.getSources().get(0).getClass());
+      // delete the znodes for peer
+      ZKUtil.deleteNodeRecursively(zkw, "/hbase/replication/peers/1");
+      ZKUtil.deleteNodeRecursively(zkw, "/hbase/replication/rs/" + 
server.getServerName() + "/1");
       manager
         
.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(), "1", 
0, false,
           false);
-      Assert.assertTrue("Replication source should be terminated and removed",
-        manager.getSources().isEmpty());
+      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+        @Override public boolean evaluate() {
+          return manager.getSources().isEmpty();
+        }
+      });
     } finally {
       conf.set("replication.replicationsource.implementation", 
replicationSourceImplName);
     }

Reply via email to