Repository: hbase
Updated Branches:
refs/heads/branch-1 8cf6adae7 -> 09c7b1e96
refs/heads/branch-1.3 3ff9a458d -> ab1e0dd44
refs/heads/master 86f376862 -> a07892558
HBASE-16095 Add priority to TableDescriptor and priority region open thread pool
Conflicts:
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ab1e0dd4
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ab1e0dd4
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ab1e0dd4
Branch: refs/heads/branch-1.3
Commit: ab1e0dd440ee53e03d0ebfa8f0f0b27d585880a5
Parents: 3ff9a45
Author: Enis Soztutar <[email protected]>
Authored: Wed Jul 13 10:31:55 2016 -0700
Committer: Enis Soztutar <[email protected]>
Committed: Wed Jul 13 15:45:43 2016 -0700
----------------------------------------------------------------------
.../apache/hadoop/hbase/HTableDescriptor.java | 25 +++++-
.../apache/hadoop/hbase/executor/EventType.java | 6 ++
.../hadoop/hbase/executor/ExecutorType.java | 3 +-
.../hadoop/hbase/TestHTableDescriptor.java | 7 ++
.../hadoop/hbase/executor/ExecutorService.java | 27 ++++---
.../hbase/regionserver/HRegionServer.java | 2 +
.../hbase/regionserver/RSRpcServices.java | 10 ++-
.../handler/OpenPriorityRegionHandler.java | 43 ++++++++++
.../hbase/regionserver/TestRegionOpen.java | 83 ++++++++++++++++++++
9 files changed, 192 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/ab1e0dd4/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
----------------------------------------------------------------------
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
index d4c4c2b..a29481d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
@@ -212,6 +212,13 @@ public class HTableDescriptor implements
WritableComparable<HTableDescriptor> {
/** Default durability for HTD is USE_DEFAULT, which defaults to
HBase-global default value */
private static final Durability DEFAULT_DURABLITY = Durability.USE_DEFAULT;
+ public static final String PRIORITY = "PRIORITY";
+ private static final ImmutableBytesWritable PRIORITY_KEY =
+ new ImmutableBytesWritable(Bytes.toBytes(PRIORITY));
+
+ /** Relative priority of the table used for rpc scheduling */
+ private static final int DEFAULT_PRIORITY = HConstants.NORMAL_QOS;
+
/*
* The below are ugly but better than creating them each time till we
* replace booleans being saved as Strings with plain booleans. Need a
@@ -265,6 +272,7 @@ public class HTableDescriptor implements
WritableComparable<HTableDescriptor> {
DEFAULT_VALUES.put(DURABILITY, DEFAULT_DURABLITY.name()); //use the enum name
DEFAULT_VALUES.put(REGION_REPLICATION,
String.valueOf(DEFAULT_REGION_REPLICATION));
DEFAULT_VALUES.put(NORMALIZATION_ENABLED,
String.valueOf(DEFAULT_NORMALIZATION_ENABLED));
+ DEFAULT_VALUES.put(PRIORITY, String.valueOf(DEFAULT_PRIORITY));
for (String s : DEFAULT_VALUES.keySet()) {
RESERVED_KEYWORDS.add(new ImmutableBytesWritable(Bytes.toBytes(s)));
}
@@ -1211,9 +1219,13 @@ public class HTableDescriptor implements
WritableComparable<HTableDescriptor> {
* Returns the configured replicas per region
*/
public int getRegionReplication() {
- byte[] val = getValue(REGION_REPLICATION_KEY);
+ return getIntValue(REGION_REPLICATION_KEY, DEFAULT_REGION_REPLICATION);
+ }
+
+ private int getIntValue(ImmutableBytesWritable key, int defaultVal) {
+ byte[] val = getValue(key);
if (val == null || val.length == 0) {
- return DEFAULT_REGION_REPLICATION;
+ return defaultVal;
}
return Integer.parseInt(Bytes.toString(val));
}
@@ -1253,6 +1265,15 @@ public class HTableDescriptor implements
WritableComparable<HTableDescriptor> {
return this;
}
+ public HTableDescriptor setPriority(int priority) {
+ setValue(PRIORITY_KEY, Integer.toString(priority));
+ return this;
+ }
+
+ public int getPriority() {
+ return getIntValue(PRIORITY_KEY, DEFAULT_PRIORITY);
+ }
+
/**
* Returns all the column family names of the current table. The map of
* HTableDescriptor contains mapping of family name to HColumnDescriptors.
http://git-wip-us.apache.org/repos/asf/hbase/blob/ab1e0dd4/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java
----------------------------------------------------------------------
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java
index a7759c5..9b7751d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java
@@ -137,6 +137,12 @@ public enum EventType {
* Master asking RS to close meta.
*/
M_RS_CLOSE_META (25, ExecutorType.RS_CLOSE_META),
+ /**
+ * Messages originating from Master to RS.<br>
+ * M_RS_OPEN_PRIORITY_REGION<br>
+ * Master asking RS to open a priority region.
+ */
+ M_RS_OPEN_PRIORITY_REGION (26, ExecutorType.RS_OPEN_PRIORITY_REGION),
/**
* Messages originating from Client to Master.<br>
http://git-wip-us.apache.org/repos/asf/hbase/blob/ab1e0dd4/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
----------------------------------------------------------------------
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
index 5a16149..e9b0ad5 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
@@ -47,7 +47,8 @@ public enum ExecutorType {
RS_PARALLEL_SEEK (26),
RS_LOG_REPLAY_OPS (27),
RS_REGION_REPLICA_FLUSH_OPS (28),
- RS_COMPACTED_FILES_DISCHARGER (29);
+ RS_COMPACTED_FILES_DISCHARGER (29),
+ RS_OPEN_PRIORITY_REGION (30);
ExecutorType(int value) {}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ab1e0dd4/hbase-client/src/test/java/org/apache/hadoop/hbase/TestHTableDescriptor.java
----------------------------------------------------------------------
diff --git
a/hbase-client/src/test/java/org/apache/hadoop/hbase/TestHTableDescriptor.java
b/hbase-client/src/test/java/org/apache/hadoop/hbase/TestHTableDescriptor.java
index c09e41b..d126994 100644
---
a/hbase-client/src/test/java/org/apache/hadoop/hbase/TestHTableDescriptor.java
+++
b/hbase-client/src/test/java/org/apache/hadoop/hbase/TestHTableDescriptor.java
@@ -307,4 +307,11 @@ public class TestHTableDescriptor {
hcd.setBlocksize(2000);
htd.addFamily(hcd);
}
+
+ @Test
+ public void testPriority() {
+ HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("table"));
+ htd.setPriority(42);
+ assertEquals(42, htd.getPriority());
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ab1e0dd4/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java
----------------------------------------------------------------------
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java
index 018e173..479184f 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java
@@ -127,6 +127,10 @@ public class ExecutorService {
return executor;
}
+ @VisibleForTesting
+ public ThreadPoolExecutor getExecutorThreadPool(final ExecutorType type) {
+ return getExecutor(type).getThreadPoolExecutor();
+ }
public void startExecutorService(final ExecutorType type, final int
maxThreads) {
String name = type.getExecutorName(this.servername);
@@ -180,7 +184,7 @@ public class ExecutorService {
}
return ret;
}
-
+
/**
* Executor instance.
*/
@@ -225,7 +229,12 @@ public class ExecutorService {
}
this.threadPoolExecutor.execute(event);
}
-
+
+ TrackingThreadPoolExecutor getThreadPoolExecutor() {
+ return threadPoolExecutor;
+ }
+
+ @Override
public String toString() {
return getClass().getSimpleName() + "-" + id + "-" + name;
}
@@ -239,7 +248,7 @@ public class ExecutorService {
}
queuedEvents.add((EventHandler)r);
}
-
+
List<RunningEventStatus> running = Lists.newArrayList();
for (Map.Entry<Thread, Runnable> e :
threadPoolExecutor.getRunningTasks().entrySet()) {
@@ -250,18 +259,18 @@ public class ExecutorService {
}
running.add(new RunningEventStatus(e.getKey(), (EventHandler)r));
}
-
+
return new ExecutorStatus(this, queuedEvents, running);
}
}
-
+
/**
* A subclass of ThreadPoolExecutor that keeps track of the Runnables that
* are executing at any given point in time.
*/
static class TrackingThreadPoolExecutor extends ThreadPoolExecutor {
- private ConcurrentMap<Thread, Runnable> running = Maps.newConcurrentMap();
-
+ private ConcurrentMap<Thread, Runnable> running = Maps.newConcurrentMap();
+
public TrackingThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
@@ -279,7 +288,7 @@ public class ExecutorService {
assert oldPut == null : "inconsistency for thread " + t;
super.beforeExecute(t, r);
}
-
+
/**
* @return a map of the threads currently running tasks
* inside this executor. Each key is an active thread,
@@ -310,7 +319,7 @@ public class ExecutorService {
this.queuedEvents = queuedEvents;
this.running = running;
}
-
+
/**
* Dump a textual representation of the executor's status
* to the given writer.
http://git-wip-us.apache.org/repos/asf/hbase/blob/ab1e0dd4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 0c6ee83..9897b29 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1723,6 +1723,8 @@ public class HRegionServer extends HasThread implements
conf.getInt("hbase.regionserver.executor.openregion.threads", 3));
this.service.startExecutorService(ExecutorType.RS_OPEN_META,
conf.getInt("hbase.regionserver.executor.openmeta.threads", 1));
+ this.service.startExecutorService(ExecutorType.RS_OPEN_PRIORITY_REGION,
+ conf.getInt("hbase.regionserver.executor.openpriorityregion.threads", 3));
this.service.startExecutorService(ExecutorType.RS_CLOSE_REGION,
conf.getInt("hbase.regionserver.executor.closeregion.threads", 3));
this.service.startExecutorService(ExecutorType.RS_CLOSE_META,
http://git-wip-us.apache.org/repos/asf/hbase/blob/ab1e0dd4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 640cd8f..329abb6 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -168,6 +168,7 @@ import
org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
import org.apache.hadoop.hbase.regionserver.Region.Operation;
import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
+import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler;
import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@@ -1653,8 +1654,13 @@ public class RSRpcServices implements
HBaseRPCErrorHandler,
} else {
regionServer.updateRegionFavoredNodesMapping(region.getEncodedName(),
regionOpenInfo.getFavoredNodesList());
- regionServer.service.submit(new OpenRegionHandler(
- regionServer, regionServer, region, htd, masterSystemTime, coordination, ord));
+ if (htd.getPriority() >= HConstants.ADMIN_QOS || region.getTable().isSystemTable()) {
+ regionServer.service.submit(new OpenPriorityRegionHandler(
+ regionServer, regionServer, region, htd, masterSystemTime, coordination, ord));
+ } else {
+ regionServer.service.submit(new OpenRegionHandler(
+ regionServer, regionServer, region, htd, masterSystemTime, coordination, ord));
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ab1e0dd4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenPriorityRegionHandler.java
----------------------------------------------------------------------
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenPriorityRegionHandler.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenPriorityRegionHandler.java
new file mode 100644
index 0000000..7ce2ac0
--- /dev/null
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenPriorityRegionHandler.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver.handler;
+
+import org.apache.hadoop.hbase.coordination.OpenRegionCoordination;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.executor.EventType;
+import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+
+/**
+ * Handles opening of a high priority region on a region server.
+ * <p>
+ * This is executed after receiving an OPEN RPC from the master or client.
+ */
+@InterfaceAudience.Private
+public class OpenPriorityRegionHandler extends OpenRegionHandler {
+ public OpenPriorityRegionHandler(Server server, RegionServerServices rsServices,
+ HRegionInfo regionInfo, HTableDescriptor htd, long masterSystemTime,
+ OpenRegionCoordination coordination, OpenRegionCoordination.OpenRegionDetails ord) {
+ super(server, rsServices, regionInfo, htd, EventType.M_RS_OPEN_PRIORITY_REGION,
+ masterSystemTime, coordination, ord);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ab1e0dd4/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionOpen.java
----------------------------------------------------------------------
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionOpen.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionOpen.java
new file mode 100644
index 0000000..aac872d
--- /dev/null
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionOpen.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.executor.ExecutorType;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({MediumTests.class, RegionServerTests.class})
+public class TestRegionOpen {
+ @SuppressWarnings("unused")
+ private static final Log LOG = LogFactory.getLog(TestRegionOpen.class);
+ private static final int NB_SERVERS = 1;
+
+ private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
+ final TableName tableName = TableName.valueOf(TestRegionOpen.class.getSimpleName());
+
+ @BeforeClass
+ public static void before() throws Exception {
+ HTU.startMiniCluster(NB_SERVERS);
+ }
+
+ @AfterClass
+ public static void afterClass() throws Exception {
+ HTU.shutdownMiniCluster();
+ }
+
+ private static HRegionServer getRS() {
+ return HTU.getHBaseCluster().getLiveRegionServerThreads().get(0).getRegionServer();
+ }
+
+ @Test(timeout = 60000)
+ public void testPriorityRegionIsOpenedWithSeparateThreadPool() throws Exception {
+ ThreadPoolExecutor exec = getRS().getExecutorService()
+ .getExecutorThreadPool(ExecutorType.RS_OPEN_PRIORITY_REGION);
+
+ assertEquals(1, exec.getCompletedTaskCount()); // namespace region
+
+ HTableDescriptor htd = new HTableDescriptor(tableName);
+ htd.setPriority(HConstants.HIGH_QOS);
+ htd.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
+ try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
+ Admin admin = connection.getAdmin()) {
+ admin.createTable(htd);
+ }
+
+ assertEquals(2, exec.getCompletedTaskCount());
+ }
+}