HBASE-20739 Add priority for SCP

Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/4cb70ea9
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/4cb70ea9
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/4cb70ea9

Branch: refs/heads/HBASE-19064
Commit: 4cb70ea9f54c01b38acd5e76f482f8f4350d80bc
Parents: d23a517
Author: zhangduo <zhang...@apache.org>
Authored: Tue Jun 19 16:29:01 2018 +0800
Committer: zhangduo <zhang...@apache.org>
Committed: Wed Jun 20 15:17:07 2018 +0800

----------------------------------------------------------------------
 .../hbase/master/procedure/FairQueue.java       |  6 +-
 .../procedure/MasterProcedureScheduler.java     | 62 ++++++++++++++++----
 .../master/procedure/MasterProcedureUtil.java   |  9 ++-
 .../hbase/master/procedure/SchemaLocking.java   |  4 ++
 .../hbase/master/procedure/ServerQueue.java     |  4 +-
 5 files changed, 66 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/4cb70ea9/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FairQueue.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FairQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FairQueue.java
index ac8e577..59a5ef8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FairQueue.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FairQueue.java
@@ -38,8 +38,10 @@ public class FairQueue<T extends Comparable<T>> {
       return;
     }
     // Find the one which priority is less than us
-    // For now only TableQueue has priority, and there are only a small number of tables which
-    // have higher priority so this will not be an expensive operation.
+    // For now only TableQueue and ServerQueue has priority. For TableQueue there are only a small
+    // number of tables which have higher priority, and for ServerQueue there is only one server
+    // which could carry meta which leads to a higher priority, so this will not be an expensive
+    // operation.
     Queue<T> base = queueHead;
     do {
       if (base.getPriority() < queue.getPriority()) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/4cb70ea9/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
index 8389961..1420986 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
@@ -127,7 +127,8 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
     } else if (isTableProcedure(proc)) {
       doAdd(tableRunQueue, getTableQueue(getTableName(proc)), proc, addFront);
     } else if (isServerProcedure(proc)) {
-      doAdd(serverRunQueue, getServerQueue(getServerName(proc)), proc, addFront);
+      ServerProcedureInterface spi = (ServerProcedureInterface) proc;
+      doAdd(serverRunQueue, getServerQueue(spi.getServerName(), spi), proc, addFront);
     } else if (isPeerProcedure(proc)) {
       doAdd(peerRunQueue, getPeerQueue(getPeerId(proc)), proc, addFront);
     } else {
@@ -316,10 +317,11 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
         return;
       }
     } else if (proc instanceof PeerProcedureInterface) {
-      PeerProcedureInterface iProcPeer = (PeerProcedureInterface) proc;
-      tryCleanupPeerQueue(iProcPeer.getPeerId(), proc);
+      tryCleanupPeerQueue(getPeerId(proc), proc);
+    } else if (proc instanceof ServerProcedureInterface) {
+      tryCleanupServerQueue(getServerName(proc), proc);
     } else {
-      // No cleanup for ServerProcedureInterface types, yet.
+      // No cleanup for other procedure types, yet.
       return;
     }
   }
@@ -366,16 +368,52 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
   // ============================================================================
   //  Server Queue Lookup Helpers
   // ============================================================================
-  private ServerQueue getServerQueue(ServerName serverName) {
+  private ServerQueue getServerQueue(ServerName serverName, ServerProcedureInterface proc) {
     final int index = getBucketIndex(serverBuckets, serverName.hashCode());
     ServerQueue node = AvlTree.get(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR);
-    if (node != null) return node;
-
-    node = new ServerQueue(serverName, locking.getServerLock(serverName));
+    if (node != null) {
+      return node;
+    }
+    int priority;
+    if (proc != null) {
+      priority = MasterProcedureUtil.getServerPriority(proc);
+    } else {
+      LOG.warn("Usually this should not happen as proc can only be null when calling from " +
+        "wait/wake lock, which means at least we should have one procedure in the queue which " +
+        "wants to acquire the lock or just released the lock.");
+      priority = 1;
+    }
+    node = new ServerQueue(serverName, priority, locking.getServerLock(serverName));
     serverBuckets[index] = AvlTree.insert(serverBuckets[index], node);
     return node;
   }
 
+  private void removeServerQueue(ServerName serverName) {
+    int index = getBucketIndex(serverBuckets, serverName.hashCode());
+    serverBuckets[index] =
+      AvlTree.remove(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR);
+    locking.removeServerLock(serverName);
+  }
+
+  private void tryCleanupServerQueue(ServerName serverName, Procedure<?> proc) {
+    schedLock();
+    try {
+      int index = getBucketIndex(serverBuckets, serverName.hashCode());
+      ServerQueue node = AvlTree.get(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR);
+      if (node == null) {
+        return;
+      }
+
+      LockAndQueue lock = locking.getServerLock(serverName);
+      if (node.isEmpty() && lock.tryExclusiveLock(proc)) {
+        removeFromRunQueue(serverRunQueue, node);
+        removeServerQueue(serverName);
+      }
+    } finally {
+      schedUnlock();
+    }
+  }
+
   private static int getBucketIndex(Object[] buckets, int hashCode) {
     return Math.abs(hashCode) % buckets.length;
   }
@@ -809,7 +847,9 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
     try {
       final LockAndQueue lock = locking.getServerLock(serverName);
       if (lock.tryExclusiveLock(procedure)) {
-        removeFromRunQueue(serverRunQueue, getServerQueue(serverName));
+        // We do not need to create a new queue so just pass null, as in tests we may pass
+        // procedures other than ServerProcedureInterface
+        removeFromRunQueue(serverRunQueue, getServerQueue(serverName, null));
         return false;
       }
       waitProcedure(lock, procedure);
@@ -831,7 +871,9 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
     try {
       final LockAndQueue lock = locking.getServerLock(serverName);
       lock.releaseExclusiveLock(procedure);
-      addToRunQueue(serverRunQueue, getServerQueue(serverName));
+      // We do not need to create a new queue so just pass null, as in tests we may pass procedures
+      // other than ServerProcedureInterface
+      addToRunQueue(serverRunQueue, getServerQueue(serverName, null));
       int waitingCount = wakeWaitingProcedures(lock);
       wakePollIfNeeded(waitingCount);
     } finally {

http://git-wip-us.apache.org/repos/asf/hbase/blob/4cb70ea9/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java
index 51e2452..587cc82 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java
@@ -170,11 +170,10 @@ public final class MasterProcedureUtil {
   }
 
   /**
-   * Return the total levels of table priority. Now we have 3 levels, for meta table, other system
-   * tables and user tables. Notice that the actual value of priority should be decreased from this
-   * value down to 1.
+   * Return the priority for the given procedure. For now we only have two priorities, 100 for
+   * server carrying meta, and 1 for others.
    */
-  public static int getTablePriorityLevels() {
-    return 3;
+  public static int getServerPriority(ServerProcedureInterface proc) {
+    return proc.hasMetaTableRegion() ? 100 : 1;
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/4cb70ea9/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java
index 9a5eb7e..69266ee 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java
@@ -95,6 +95,10 @@ class SchemaLocking {
     return getLock(serverLocks, serverName);
   }
 
+  LockAndQueue removeServerLock(ServerName serverName) {
+    return serverLocks.remove(serverName);
+  }
+
   LockAndQueue getPeerLock(String peerId) {
     return getLock(peerLocks, peerId);
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/4cb70ea9/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java
index 5526f3b..3a1b3c4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java
@@ -25,8 +25,8 @@ import org.apache.yetus.audience.InterfaceAudience;
 @InterfaceAudience.Private
 class ServerQueue extends Queue<ServerName> {
 
-  public ServerQueue(ServerName serverName, LockStatus serverLock) {
-    super(serverName, serverLock);
+  public ServerQueue(ServerName serverName, int priority, LockStatus serverLock) {
+    super(serverName, priority, serverLock);
   }
 
   @Override

Reply via email to