This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.5 by this push:
     new 3204c467dda HBASE-29380 Two concurrent remove peer requests may hang (#7077) (#7093)
3204c467dda is described below

commit 3204c467ddaa5e2c89ec637b005a31280671a75e
Author: Duo Zhang <[email protected]>
AuthorDate: Thu Jun 12 19:50:10 2025 +0800

    HBASE-29380 Two concurrent remove peer requests may hang (#7077) (#7093)
    
    (cherry picked from commit 7cc2f54a5d9c684f9c0019553373118513742ff1)
    
    Signed-off-by: Nihal Jain <[email protected]>
    Signed-off-by: Nick Dimiduk <[email protected]>
    (cherry picked from commit 368386d697981eca5c165b18daaf560b2e19c6f5)
---
 .../master/procedure/MasterProcedureScheduler.java | 124 +++++++-------
 .../master/procedure/TestProcedureWaitAndWake.java | 184 +++++++++++++++++++++
 2 files changed, 240 insertions(+), 68 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
index 6f1338db944..2e7c0edc357 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
@@ -17,12 +17,14 @@
  */
 package org.apache.hadoop.hbase.master.procedure;
 
+import com.google.errorprone.annotations.RestrictedApi;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import org.apache.commons.lang3.builder.ToStringBuilder;
@@ -418,7 +420,9 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
   // ============================================================================
   private TableQueue getTableQueue(TableName tableName) {
     TableQueue node = AvlTree.get(tableMap, tableName, TABLE_QUEUE_KEY_COMPARATOR);
-    if (node != null) return node;
+    if (node != null) {
+      return node;
+    }
 
     node = new TableQueue(tableName, MasterProcedureUtil.getTablePriority(tableName),
       locking.getTableLock(tableName), locking.getNamespaceLock(tableName.getNamespaceAsString()));
@@ -431,6 +435,21 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
     locking.removeTableLock(tableName);
   }
 
+  /**
+   * Tries to remove the queue and the table-lock of the specified table. If there are new
+   * operations pending (e.g. a new create), the remove will not be performed.
+   * @param table     the name of the table that should be marked as deleted
+   * @param procedure the procedure that is removing the table
+   * @return true if deletion succeeded, false otherwise meaning that there are other new operations
+   *         pending for that table (e.g. a new create).
+   */
+  @RestrictedApi(explanation = "Should only be called in tests", link = "",
+      allowedOnPath = ".*/(MasterProcedureScheduler.java|src/test/.*)")
+  boolean markTableAsDeleted(final TableName table, final Procedure<?> procedure) {
+    return tryCleanupQueue(table, procedure, () -> tableMap, TABLE_QUEUE_KEY_COMPARATOR,
+      locking::getTableLock, tableRunQueue, this::removeTableQueue);
+  }
+
   private static boolean isTableProcedure(Procedure<?> proc) {
     return proc instanceof TableProcedureInterface;
   }
@@ -467,23 +486,10 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
   }
 
   private void tryCleanupServerQueue(ServerName serverName, Procedure<?> proc) {
-    schedLock();
-    try {
-      int index = getBucketIndex(serverBuckets, serverName.hashCode());
-      ServerQueue node = AvlTree.get(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR);
-      if (node == null) {
-        return;
-      }
-
-      LockAndQueue lock = locking.getServerLock(serverName);
-      if (node.isEmpty() && lock.tryExclusiveLock(proc)) {
-        removeFromRunQueue(serverRunQueue, node,
-          () -> "clean up server queue after " + proc + " completed");
-        removeServerQueue(serverName);
-      }
-    } finally {
-      schedUnlock();
-    }
+    // serverBuckets
+    tryCleanupQueue(serverName, proc,
+      () -> serverBuckets[getBucketIndex(serverBuckets, serverName.hashCode())],
+      SERVER_QUEUE_KEY_COMPARATOR, locking::getServerLock, serverRunQueue, this::removeServerQueue);
   }
 
   private static int getBucketIndex(Object[] buckets, int hashCode) {
@@ -516,23 +522,9 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
     locking.removePeerLock(peerId);
   }
 
-  private void tryCleanupPeerQueue(String peerId, Procedure procedure) {
-    schedLock();
-    try {
-      PeerQueue queue = AvlTree.get(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR);
-      if (queue == null) {
-        return;
-      }
-
-      final LockAndQueue lock = locking.getPeerLock(peerId);
-      if (queue.isEmpty() && lock.tryExclusiveLock(procedure)) {
-        removeFromRunQueue(peerRunQueue, queue,
-          () -> "clean up peer queue after " + procedure + " completed");
-        removePeerQueue(peerId);
-      }
-    } finally {
-      schedUnlock();
-    }
+  private void tryCleanupPeerQueue(String peerId, Procedure<?> procedure) {
+    tryCleanupQueue(peerId, procedure, () -> peerMap, PEER_QUEUE_KEY_COMPARATOR,
+      locking::getPeerLock, peerRunQueue, this::removePeerQueue);
   }
 
   private static boolean isPeerProcedure(Procedure<?> proc) {
@@ -560,6 +552,35 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
     return proc instanceof MetaProcedureInterface;
   }
 
+  private <T extends Comparable<T>, TNode extends Queue<T>> boolean tryCleanupQueue(T id,
+    Procedure<?> proc, Supplier<TNode> getMap, AvlKeyComparator<TNode> comparator,
+    Function<T, LockAndQueue> getLock, FairQueue<T> runQueue, Consumer<T> removeQueue) {
+    schedLock();
+    try {
+      Queue<T> queue = AvlTree.get(getMap.get(), id, comparator);
+      if (queue == null) {
+        return true;
+      }
+
+      LockAndQueue lock = getLock.apply(id);
+      if (queue.isEmpty() && lock.isWaitingQueueEmpty() && !lock.isLocked()) {
+        // 1. the queue is empty
+        // 2. no procedure is in the lock's waiting queue
+        // 3. no other one holds the lock. It is possible that someone else holds the lock, usually
+        // our parent procedures
+        // If we can meet all the above conditions, it is safe for us to remove the queue
+        removeFromRunQueue(runQueue, queue, () -> "clean up queue after " + proc + " completed");
+        removeQueue.accept(id);
+        return true;
+      } else {
+        return false;
+      }
+
+    } finally {
+      schedUnlock();
+    }
+  }
+
   // ============================================================================
   // Table Locking Helpers
   // ============================================================================
@@ -699,39 +720,6 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
     }
   }
 
-  /**
-   * Tries to remove the queue and the table-lock of the specified table. If there are new
-   * operations pending (e.g. a new create), the remove will not be performed.
-   * @param table     the name of the table that should be marked as deleted
-   * @param procedure the procedure that is removing the table
-   * @return true if deletion succeeded, false otherwise meaning that there are other new operations
-   *         pending for that table (e.g. a new create).
-   */
-  boolean markTableAsDeleted(final TableName table, final Procedure<?> procedure) {
-    schedLock();
-    try {
-      final TableQueue queue = getTableQueue(table);
-      final LockAndQueue tableLock = locking.getTableLock(table);
-      if (queue == null) {
-        return true;
-      }
-
-      if (queue.isEmpty() && tableLock.tryExclusiveLock(procedure)) {
-        // remove the table from the run-queue and the map
-        if (AvlIterableList.isLinked(queue)) {
-          tableRunQueue.remove(queue);
-        }
-        removeTableQueue(table);
-      } else {
-        // TODO: If there are no create, we can drop all the other ops
-        return false;
-      }
-    } finally {
-      schedUnlock();
-    }
-    return true;
-  }
-
   // ============================================================================
   // Region Locking Helpers
   // ============================================================================
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestProcedureWaitAndWake.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestProcedureWaitAndWake.java
new file mode 100644
index 00000000000..cb647b98652
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestProcedureWaitAndWake.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.procedure;
+
+import java.io.IOException;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.master.replication.AbstractPeerProcedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
+import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
+import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Testcase for HBASE-29380
+ */
+@Category({ MasterTests.class, LargeTests.class })
+public class TestProcedureWaitAndWake {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestProcedureWaitAndWake.class);
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  public static final class MyPeerProcedure extends AbstractPeerProcedure<Integer> {
+
+    private final CyclicBarrier barrier;
+
+    private boolean passedBarrier;
+
+    public MyPeerProcedure() {
+      this(null);
+    }
+
+    public MyPeerProcedure(CyclicBarrier barrier) {
+      super("1");
+      this.barrier = barrier;
+    }
+
+    @Override
+    public PeerOperationType getPeerOperationType() {
+      return PeerOperationType.REMOVE;
+    }
+
+    @Override
+    protected LockState acquireLock(MasterProcedureEnv env) {
+      // make sure we have two procedure arrive here at the same time, so one of them will enter the
+      // lock wait state
+      if (!passedBarrier) {
+        try {
+          barrier.await();
+        } catch (InterruptedException | BrokenBarrierException e) {
+          throw new RuntimeException(e);
+        }
+        passedBarrier = true;
+      }
+      return super.acquireLock(env);
+    }
+
+    @Override
+    protected Flow executeFromState(MasterProcedureEnv env, Integer state)
+      throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
+      if (state.intValue() == 0) {
+        setNextState(1);
+        addChildProcedure(new MySubPeerProcedure());
+        return Flow.HAS_MORE_STATE;
+      } else {
+        Thread.sleep(200);
+        return Flow.NO_MORE_STATE;
+      }
+    }
+
+    @Override
+    protected Integer getState(int stateId) {
+      return Integer.valueOf(stateId);
+    }
+
+    @Override
+    protected int getStateId(Integer state) {
+      return state.intValue();
+    }
+
+    @Override
+    protected Integer getInitialState() {
+      return 0;
+    }
+
+    @Override
+    protected void rollbackState(MasterProcedureEnv env, Integer state)
+      throws IOException, InterruptedException {
+      throw new UnsupportedOperationException();
+    }
+
+    public static final class MySubPeerProcedure
+      extends StateMachineProcedure<MasterProcedureEnv, Integer> implements PeerProcedureInterface {
+
+      @Override
+      public PeerOperationType getPeerOperationType() {
+        return PeerOperationType.REFRESH;
+      }
+
+      @Override
+      protected Flow executeFromState(MasterProcedureEnv env, Integer state)
+        throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
+        return Flow.NO_MORE_STATE;
+      }
+
+      @Override
+      protected Integer getState(int stateId) {
+        return Integer.valueOf(stateId);
+      }
+
+      @Override
+      protected int getStateId(Integer state) {
+        return state.intValue();
+      }
+
+      @Override
+      protected Integer getInitialState() {
+        return 0;
+      }
+
+      @Override
+      public String getPeerId() {
+        return "1";
+      }
+
+      @Override
+      protected void rollbackState(MasterProcedureEnv env, Integer state)
+        throws IOException, InterruptedException {
+        throw new UnsupportedOperationException();
+      }
+    }
+  }
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    UTIL.getConfiguration().setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 8);
+    UTIL.startMiniCluster(3);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testPeerProcedure() {
+    ProcedureExecutor<MasterProcedureEnv> procExec =
+      UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
+    CyclicBarrier barrier = new CyclicBarrier(2);
+    MyPeerProcedure p1 = new MyPeerProcedure(barrier);
+    MyPeerProcedure p2 = new MyPeerProcedure(barrier);
+    long id1 = procExec.submitProcedure(p1);
+    long id2 = procExec.submitProcedure(p2);
+    UTIL.waitFor(10000, () -> procExec.isFinished(id1));
+    UTIL.waitFor(10000, () -> procExec.isFinished(id2));
+  }
+}

Reply via email to