This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.5 by this push:
new 3204c467dda HBASE-29380 Two concurrent remove peer requests may hang (#7077) (#7093)
3204c467dda is described below
commit 3204c467ddaa5e2c89ec637b005a31280671a75e
Author: Duo Zhang <[email protected]>
AuthorDate: Thu Jun 12 19:50:10 2025 +0800
HBASE-29380 Two concurrent remove peer requests may hang (#7077) (#7093)
(cherry picked from commit 7cc2f54a5d9c684f9c0019553373118513742ff1)
Signed-off-by: Nihal Jain <[email protected]>
Signed-off-by: Nick Dimiduk <[email protected]>
(cherry picked from commit 368386d697981eca5c165b18daaf560b2e19c6f5)
---
.../master/procedure/MasterProcedureScheduler.java | 124 +++++++-------
.../master/procedure/TestProcedureWaitAndWake.java | 184 +++++++++++++++++++++
2 files changed, 240 insertions(+), 68 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
index 6f1338db944..2e7c0edc357 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
@@ -17,12 +17,14 @@
  */
 package org.apache.hadoop.hbase.master.procedure;
 
+import com.google.errorprone.annotations.RestrictedApi;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import org.apache.commons.lang3.builder.ToStringBuilder;
@@ -418,7 +420,9 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
   // ============================================================================
   private TableQueue getTableQueue(TableName tableName) {
     TableQueue node = AvlTree.get(tableMap, tableName, TABLE_QUEUE_KEY_COMPARATOR);
-    if (node != null) return node;
+    if (node != null) {
+      return node;
+    }
 
     node = new TableQueue(tableName, MasterProcedureUtil.getTablePriority(tableName),
       locking.getTableLock(tableName), locking.getNamespaceLock(tableName.getNamespaceAsString()));
@@ -431,6 +435,21 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
     locking.removeTableLock(tableName);
   }
 
+  /**
+   * Tries to remove the queue and the table-lock of the specified table. If there are new
+   * operations pending (e.g. a new create), the remove will not be performed.
+   * @param table     the name of the table that should be marked as deleted
+   * @param procedure the procedure that is removing the table
+   * @return true if deletion succeeded, false otherwise meaning that there are other new operations
+   *         pending for that table (e.g. a new create).
+   */
+  @RestrictedApi(explanation = "Should only be called in tests", link = "",
+    allowedOnPath = ".*/(MasterProcedureScheduler.java|src/test/.*)")
+  boolean markTableAsDeleted(final TableName table, final Procedure<?> procedure) {
+    return tryCleanupQueue(table, procedure, () -> tableMap, TABLE_QUEUE_KEY_COMPARATOR,
+      locking::getTableLock, tableRunQueue, this::removeTableQueue);
+  }
+
   private static boolean isTableProcedure(Procedure<?> proc) {
     return proc instanceof TableProcedureInterface;
   }
@@ -467,23 +486,10 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
   }
 
   private void tryCleanupServerQueue(ServerName serverName, Procedure<?> proc) {
-    schedLock();
-    try {
-      int index = getBucketIndex(serverBuckets, serverName.hashCode());
-      ServerQueue node = AvlTree.get(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR);
-      if (node == null) {
-        return;
-      }
-
-      LockAndQueue lock = locking.getServerLock(serverName);
-      if (node.isEmpty() && lock.tryExclusiveLock(proc)) {
-        removeFromRunQueue(serverRunQueue, node,
-          () -> "clean up server queue after " + proc + " completed");
-        removeServerQueue(serverName);
-      }
-    } finally {
-      schedUnlock();
-    }
+    // serverBuckets
+    tryCleanupQueue(serverName, proc,
+      () -> serverBuckets[getBucketIndex(serverBuckets, serverName.hashCode())],
+      SERVER_QUEUE_KEY_COMPARATOR, locking::getServerLock, serverRunQueue, this::removeServerQueue);
   }
 
   private static int getBucketIndex(Object[] buckets, int hashCode) {
@@ -516,23 +522,9 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
     locking.removePeerLock(peerId);
   }
 
-  private void tryCleanupPeerQueue(String peerId, Procedure procedure) {
-    schedLock();
-    try {
-      PeerQueue queue = AvlTree.get(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR);
-      if (queue == null) {
-        return;
-      }
-
-      final LockAndQueue lock = locking.getPeerLock(peerId);
-      if (queue.isEmpty() && lock.tryExclusiveLock(procedure)) {
-        removeFromRunQueue(peerRunQueue, queue,
-          () -> "clean up peer queue after " + procedure + " completed");
-        removePeerQueue(peerId);
-      }
-    } finally {
-      schedUnlock();
-    }
+  private void tryCleanupPeerQueue(String peerId, Procedure<?> procedure) {
+    tryCleanupQueue(peerId, procedure, () -> peerMap, PEER_QUEUE_KEY_COMPARATOR,
+      locking::getPeerLock, peerRunQueue, this::removePeerQueue);
   }
 
   private static boolean isPeerProcedure(Procedure<?> proc) {
@@ -560,6 +552,35 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
     return proc instanceof MetaProcedureInterface;
   }
 
+  private <T extends Comparable<T>, TNode extends Queue<T>> boolean tryCleanupQueue(T id,
+    Procedure<?> proc, Supplier<TNode> getMap, AvlKeyComparator<TNode> comparator,
+    Function<T, LockAndQueue> getLock, FairQueue<T> runQueue, Consumer<T> removeQueue) {
+    schedLock();
+    try {
+      Queue<T> queue = AvlTree.get(getMap.get(), id, comparator);
+      if (queue == null) {
+        return true;
+      }
+
+      LockAndQueue lock = getLock.apply(id);
+      if (queue.isEmpty() && lock.isWaitingQueueEmpty() && !lock.isLocked()) {
+        // 1. the queue is empty
+        // 2. no procedure is in the lock's waiting queue
+        // 3. no other one holds the lock. It is possible that someone else holds the lock, usually
+        // our parent procedures
+        // If we can meet all the above conditions, it is safe for us to remove the queue
+        removeFromRunQueue(runQueue, queue, () -> "clean up queue after " + proc + " completed");
+        removeQueue.accept(id);
+        return true;
+      } else {
+        return false;
+      }
+
+    } finally {
+      schedUnlock();
+    }
+  }
+
   // ============================================================================
   // Table Locking Helpers
   // ============================================================================
@@ -699,39 +720,6 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
     }
   }
 
-  /**
-   * Tries to remove the queue and the table-lock of the specified table. If there are new
-   * operations pending (e.g. a new create), the remove will not be performed.
-   * @param table     the name of the table that should be marked as deleted
-   * @param procedure the procedure that is removing the table
-   * @return true if deletion succeeded, false otherwise meaning that there are other new operations
-   *         pending for that table (e.g. a new create).
-   */
-  boolean markTableAsDeleted(final TableName table, final Procedure<?> procedure) {
-    schedLock();
-    try {
-      final TableQueue queue = getTableQueue(table);
-      final LockAndQueue tableLock = locking.getTableLock(table);
-      if (queue == null) {
-        return true;
-      }
-
-      if (queue.isEmpty() && tableLock.tryExclusiveLock(procedure)) {
-        // remove the table from the run-queue and the map
-        if (AvlIterableList.isLinked(queue)) {
-          tableRunQueue.remove(queue);
-        }
-        removeTableQueue(table);
-      } else {
-        // TODO: If there are no create, we can drop all the other ops
-        return false;
-      }
-    } finally {
-      schedUnlock();
-    }
-    return true;
-  }
-
   // ============================================================================
   // Region Locking Helpers
   // ============================================================================
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestProcedureWaitAndWake.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestProcedureWaitAndWake.java
new file mode 100644
index 00000000000..cb647b98652
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestProcedureWaitAndWake.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.procedure;
+
+import java.io.IOException;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.master.replication.AbstractPeerProcedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
+import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
+import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Testcase for HBASE-29380
+ */
+@Category({ MasterTests.class, LargeTests.class })
+public class TestProcedureWaitAndWake {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestProcedureWaitAndWake.class);
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  public static final class MyPeerProcedure extends AbstractPeerProcedure<Integer> {
+
+    private final CyclicBarrier barrier;
+
+    private boolean passedBarrier;
+
+    public MyPeerProcedure() {
+      this(null);
+    }
+
+    public MyPeerProcedure(CyclicBarrier barrier) {
+      super("1");
+      this.barrier = barrier;
+    }
+
+    @Override
+    public PeerOperationType getPeerOperationType() {
+      return PeerOperationType.REMOVE;
+    }
+
+    @Override
+    protected LockState acquireLock(MasterProcedureEnv env) {
+      // make sure we have two procedure arrive here at the same time, so one of them will enter the
+      // lock wait state
+      if (!passedBarrier) {
+        try {
+          barrier.await();
+        } catch (InterruptedException | BrokenBarrierException e) {
+          throw new RuntimeException(e);
+        }
+        passedBarrier = true;
+      }
+      return super.acquireLock(env);
+    }
+
+    @Override
+    protected Flow executeFromState(MasterProcedureEnv env, Integer state)
+      throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
+      if (state.intValue() == 0) {
+        setNextState(1);
+        addChildProcedure(new MySubPeerProcedure());
+        return Flow.HAS_MORE_STATE;
+      } else {
+        Thread.sleep(200);
+        return Flow.NO_MORE_STATE;
+      }
+    }
+
+    @Override
+    protected Integer getState(int stateId) {
+      return Integer.valueOf(stateId);
+    }
+
+    @Override
+    protected int getStateId(Integer state) {
+      return state.intValue();
+    }
+
+    @Override
+    protected Integer getInitialState() {
+      return 0;
+    }
+
+    @Override
+    protected void rollbackState(MasterProcedureEnv env, Integer state)
+      throws IOException, InterruptedException {
+      throw new UnsupportedOperationException();
+    }
+
+    public static final class MySubPeerProcedure
+      extends StateMachineProcedure<MasterProcedureEnv, Integer> implements PeerProcedureInterface {
+
+      @Override
+      public PeerOperationType getPeerOperationType() {
+        return PeerOperationType.REFRESH;
+      }
+
+      @Override
+      protected Flow executeFromState(MasterProcedureEnv env, Integer state)
+        throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
+        return Flow.NO_MORE_STATE;
+      }
+
+      @Override
+      protected Integer getState(int stateId) {
+        return Integer.valueOf(stateId);
+      }
+
+      @Override
+      protected int getStateId(Integer state) {
+        return state.intValue();
+      }
+
+      @Override
+      protected Integer getInitialState() {
+        return 0;
+      }
+
+      @Override
+      public String getPeerId() {
+        return "1";
+      }
+
+      @Override
+      protected void rollbackState(MasterProcedureEnv env, Integer state)
+        throws IOException, InterruptedException {
+        throw new UnsupportedOperationException();
+      }
+    }
+  }
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    UTIL.getConfiguration().setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 8);
+    UTIL.startMiniCluster(3);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testPeerProcedure() {
+    ProcedureExecutor<MasterProcedureEnv> procExec =
+      UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
+    CyclicBarrier barrier = new CyclicBarrier(2);
+    MyPeerProcedure p1 = new MyPeerProcedure(barrier);
+    MyPeerProcedure p2 = new MyPeerProcedure(barrier);
+    long id1 = procExec.submitProcedure(p1);
+    long id2 = procExec.submitProcedure(p2);
+    UTIL.waitFor(10000, () -> procExec.isFinished(id1));
+    UTIL.waitFor(10000, () -> procExec.isFinished(id2));
+  }
+}