HBASE-14614 Procedure v2 - Core Assignment Manager (Matteo Bertozzi)
Includes four patches from Matteo's repository plus follow-up fixes to get it
all to pass, fix findbugs, etc. The patches are applied in one go because
applying each independently puts hbase in a non-working state.
1. HBASE-14616 Procedure v2 - Replace the old AM with the new AM
This comes from Matteo's repo here:
https://github.com/matteobertozzi/hbase/commit/689227fcbfe8e6588433dbcdabf4526e3d478b2e
Patch replaces the old AM with the new one under subpackage master.assignment.
Mostly just updates classes to use the new AM -- import changes -- rather than
the old. It also removes the old AM and its supporting classes.
See below for more detail.
2. HBASE-14614 Procedure v2 - Core Assignment Manager (Matteo Bertozzi)
Adds running of remote procedure. Adds batching of remote calls.
Adds support for assign/unassign in procedures. Adds version info
reporting in rpc. Adds start of an AMv2.
3. and 4. are fixes around merge and split.
This work mostly comes from:
https://github.com/matteobertozzi/hbase/commit/3622cba4e331d2fc7bfc1932abb4c9cbf5802efa
Reporting of remote RS version is from here:
https://github.com/matteobertozzi/hbase/commit/ddb4df3964e8298c88c0210e83493aa91ac0942d.patch
And remote dispatch of procedures is from:
https://github.com/matteobertozzi/hbase/commit/186b9e7c4dae61a79509a6c3aad7f80ec61345e5
The split merge patches from here are also melded in:
https://github.com/matteobertozzi/hbase/commit/9a3a95a2c2974842a4849d1ad867e70764e7f707
and
https://github.com/matteobertozzi/hbase/commit/d6289307a02a777299f65238238a2a8af3253067
Adds testing util for new AM and new sets of tests.
Details:
M hbase-client/src/main/java/org/apache/hadoop/hbase/ClusterStatus.java
Takes a List of RegionStates on construction rather than a Set.
NOTE! This is a change in a public class.
M hbase-client/src/main/java/org/apache/hadoop/hbase/master/RegionState.java
Purge old overlapping states: PENDING_OPEN, PENDING_CLOSE, etc.
A hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java
Dispatches remote procedures every 150ms or every 32 items -- whichever
happens first (both configurable). Runs a timeout thread.
Carries notion of a remote procedure and of a buffer full of these.
"hbase.procedure.remote.dispatcher.threadpool.size" with default = 128
"hbase.procedure.remote.dispatcher.delay.msec" with default = 150ms
"hbase.procedure.remote.dispatcher.max.queue.size" with default = 32
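The time-or-size batching trigger described above can be sketched as follows. This is an illustrative model only -- the class and method names are invented, not the actual RemoteProcedureDispatcher API; only the two config defaults (32 items, 150ms) come from the patch:

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the dispatcher's flush rule: ship the buffered remote calls
// once 32 items have accumulated or 150ms have elapsed, whichever is first.
public class DispatchBuffer<T> {
  static final int MAX_QUEUE_SIZE = 32; // hbase.procedure.remote.dispatcher.max.queue.size
  static final long DELAY_MSEC = 150;   // hbase.procedure.remote.dispatcher.delay.msec

  private final List<T> buffer = new ArrayList<>();
  private long oldestMs = -1;

  /** Adds an item; returns the batch to dispatch if a trigger fired, else null. */
  public List<T> add(T item, long nowMs) {
    if (buffer.isEmpty()) {
      oldestMs = nowMs; // start the delay clock on the first buffered item
    }
    buffer.add(item);
    return shouldDispatch(nowMs) ? drain() : null;
  }

  boolean shouldDispatch(long nowMs) {
    return buffer.size() >= MAX_QUEUE_SIZE || (nowMs - oldestMs) >= DELAY_MSEC;
  }

  private List<T> drain() {
    List<T> batch = new ArrayList<>(buffer);
    buffer.clear();
    return batch;
  }
}
```

In the real dispatcher the time trigger is driven by the timeout thread mentioned above rather than being checked on add; the rule is the same.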
M hbase-protocol-shaded/src/main/protobuf/Admin.proto
Add execute procedures call ExecuteProcedures.
M hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
Add assign and unassign state support for procedures.
M hbase-server/src/main/java/org/apache/hadoop/hbase/client/VersionInfoUtil.java
Adds getting the RS version out of RPC.
Examples: 1.3.4 is 0x0103004, 2.1.0 is 0x0201000.
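The two examples decompose consistently if major.minor.patch is packed into bit fields of an int. A sketch of that packing -- the field widths here are inferred from the examples above, not read from VersionInfoUtil itself:

```java
// Packs major.minor.patch into one int: major above bit 20, minor in
// bits 12-19, patch in the low 12 bits. Field layout inferred from the
// commit's two examples, not from the actual VersionInfoUtil code.
public final class VersionNumber {
  static int encode(int major, int minor, int patch) {
    return (major << 20) | (minor << 12) | patch;
  }
}
```

With this layout 1.3.4 becomes 0x100000 | 0x3000 | 0x4 = 0x0103004 and 2.1.0 becomes 0x200000 | 0x1000 = 0x0201000, matching the examples.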
M hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
Remove periodic metrics chore. This is done over in new AM now.
Replace AM with the new.
M hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMetaBootstrap.java
Have AMv2 handle assigning meta.
M hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
Extract version number of the server making rpc.
A hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignProcedure.java
Adds the new assign procedure. Runs assign via Procedure Dispatch.
There can only be one RegionTransitionProcedure per region running at a
time, since each procedure takes a lock on the region.
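The one-procedure-per-region invariant can be pictured as an exclusive lock keyed by region. A toy sketch, where ConcurrentHashMap stands in for the real procedure lock machinery (all names here are illustrative):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Toy model of the invariant: only one RegionTransitionProcedure may hold
// a given region at a time. putIfAbsent is atomic, so a second procedure
// trying to lock the same region fails until the owner releases it.
public class RegionLocks {
  private final ConcurrentMap<String, Long> owners = new ConcurrentHashMap<>();

  /** Tries to take the region lock for procedure procId; true on success. */
  public boolean tryLock(String regionName, long procId) {
    return owners.putIfAbsent(regionName, procId) == null;
  }

  public void unlock(String regionName, long procId) {
    owners.remove(regionName, procId); // removes only if procId is the owner
  }
}
```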
D hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignCallable.java
D hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
D hbase-server/src/main/java/org/apache/hadoop/hbase/master/BulkAssigner.java
D hbase-server/src/main/java/org/apache/hadoop/hbase/master/BulkReOpen.java
D hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java
D hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java
D hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
D hbase-server/src/main/java/org/apache/hadoop/hbase/master/UnAssignCallable.java
A hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
A procedure-based AM (AMv2).
TODO
- handle region migration
- handle meta assignment first
- handle sys table assignment first (e.g. acl, namespace)
- handle table priorities
"hbase.assignment.bootstrap.thread.pool.size"; default is 16
"hbase.assignment.dispatch.wait.msec"; default is 150
"hbase.assignment.dispatch.wait.queue.max.size"; default is 100
"hbase.assignment.rit.chore.interval.msec"; default is 5000
"hbase.assignment.maximum.attempts"; default is 10
A hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MoveRegionProcedure.java
Procedure that runs subprocedures to unassign and then assign to the new
location.
A hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java
Manage store of region state (in hbase:meta by default).
A hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStates.java
In-memory state of all regions. Used by AMv2.
A hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java
Base RIT procedure for Assign and Unassign.
A hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/UnassignProcedure.java
Unassign procedure.
A hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java
Runs region assignment in a manner that pays attention to the target server
version.
Adds "hbase.regionserver.rpc.startup.waittime"; default is 60 seconds.
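The startup wait can be imagined as a bounded retry loop against a regionserver that is still coming up. A hypothetical sketch -- every name besides the config key is invented, and the real dispatcher sleeps/backs off between attempts rather than counting simulated time:

```java
// Bounded wait for a just-(re)started regionserver to accept procedure RPCs:
// retry the call until it succeeds or the startup deadline passes.
public class StartupWait {
  static final long WAITTIME_MS = 60_000; // hbase.regionserver.rpc.startup.waittime

  interface Rpc { boolean attempt(); }

  /** Returns true if the rpc succeeded before the startup deadline. */
  static boolean callWithStartupWait(Rpc rpc, long nowMs) {
    long deadline = nowMs + WAITTIME_MS;
    for (long t = nowMs; t < deadline; t += 100) { // 100ms simulated retry pause
      if (rpc.attempt()) {
        return true;
      }
    }
    return false; // server never came up within the wait time
  }
}
```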
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/8faab93a
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/8faab93a
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/8faab93a
Branch: refs/heads/HBASE-14614
Commit: 8faab93a9c5a25ea141d6d750aa31776ef078f66
Parents: f1c1f25
Author: Michael Stack <[email protected]>
Authored: Wed Mar 22 09:31:14 2017 -0700
Committer: Michael Stack <[email protected]>
Committed: Thu Mar 23 08:42:10 2017 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hbase/ClusterStatus.java | 8 +-
.../apache/hadoop/hbase/MetaTableAccessor.java | 2 +-
.../hbase/client/ConnectionImplementation.java | 12 +
.../client/ShortCircuitMasterConnection.java | 13 +-
.../hbase/ipc/ServerTooBusyException.java | 7 +-
.../apache/hadoop/hbase/master/RegionState.java | 24 +-
.../hadoop/hbase/protobuf/ProtobufUtil.java | 2 +-
.../hbase/shaded/protobuf/ProtobufUtil.java | 125 +-
.../hbase/shaded/protobuf/RequestConverter.java | 16 +-
.../shaded/protobuf/ResponseConverter.java | 13 -
.../org/apache/hadoop/hbase/ChoreService.java | 2 +-
.../org/apache/hadoop/hbase/HConstants.java | 2 +-
.../java/org/apache/hadoop/hbase/TableName.java | 7 +-
.../hadoop/hbase/zookeeper/TestZKConfig.java | 1 +
.../master/MetricsAssignmentManagerSource.java | 23 +-
.../MetricsAssignmentManagerSourceImpl.java | 38 +-
.../procedure2/AbstractProcedureScheduler.java | 4 +-
.../hadoop/hbase/procedure2/LockAndQueue.java | 29 +-
.../hadoop/hbase/procedure2/Procedure.java | 6 +-
.../hadoop/hbase/procedure2/ProcedureEvent.java | 6 +-
.../hbase/procedure2/ProcedureExecutor.java | 18 +-
.../hbase/procedure2/ProcedureScheduler.java | 4 +-
.../procedure2/RemoteProcedureDispatcher.java | 367 +
.../hbase/procedure2/StateMachineProcedure.java | 3 +
.../hbase/procedure2/util/DelayedUtil.java | 58 +-
.../procedure2/ProcedureTestingUtility.java | 12 +-
.../hbase/procedure2/util/TestDelayedUtil.java | 2 +-
.../shaded/protobuf/generated/AdminProtos.java | 17559 ++++++++++-------
.../generated/MasterProcedureProtos.java | 7446 +++++--
.../shaded/protobuf/generated/MasterProtos.java | 6937 +++++--
.../generated/RegionServerStatusProtos.java | 1633 +-
.../src/main/protobuf/Admin.proto | 47 +-
.../src/main/protobuf/Master.proto | 36 +
.../src/main/protobuf/MasterProcedure.proto | 95 +-
.../src/main/protobuf/RegionServerStatus.proto | 27 -
.../hbase/rsgroup/RSGroupAdminServer.java | 12 +-
.../hbase/rsgroup/RSGroupBasedLoadBalancer.java | 2 +-
.../balancer/TestRSGroupBasedLoadBalancer.java | 2 +-
.../master/AssignmentManagerStatusTmpl.jamon | 51 +-
.../hbase/tmpl/master/MasterStatusTmpl.jamon | 2 +-
.../hadoop/hbase/client/VersionInfoUtil.java | 81 +-
.../hbase/ipc/BalancedQueueRpcExecutor.java | 3 -
.../ipc/FastPathBalancedQueueRpcExecutor.java | 2 +-
.../hadoop/hbase/ipc/SimpleRpcScheduler.java | 10 +-
.../hadoop/hbase/ipc/SimpleRpcServer.java | 4 +-
.../hadoop/hbase/master/AssignCallable.java | 49 -
.../hadoop/hbase/master/AssignmentManager.java | 3053 ---
.../hadoop/hbase/master/BulkAssigner.java | 122 -
.../apache/hadoop/hbase/master/BulkReOpen.java | 136 -
.../hadoop/hbase/master/CatalogJanitor.java | 6 +-
.../hbase/master/GeneralBulkAssigner.java | 213 -
.../org/apache/hadoop/hbase/master/HMaster.java | 165 +-
.../hadoop/hbase/master/LoadBalancer.java | 4 +-
.../hbase/master/MasterCoprocessorHost.java | 22 +
.../hadoop/hbase/master/MasterDumpServlet.java | 8 +-
.../hbase/master/MasterMetaBootstrap.java | 43 +-
.../hadoop/hbase/master/MasterRpcServices.java | 259 +-
.../hadoop/hbase/master/MasterServices.java | 21 +
.../hadoop/hbase/master/MasterWalManager.java | 11 +-
.../hbase/master/MetricsAssignmentManager.java | 39 +-
.../hadoop/hbase/master/RegionStateStore.java | 268 -
.../hadoop/hbase/master/RegionStates.java | 1170 --
.../hadoop/hbase/master/ServerManager.java | 75 +-
.../hbase/master/TableNamespaceManager.java | 5 +-
.../hadoop/hbase/master/UnAssignCallable.java | 47 -
.../master/assignment/AssignProcedure.java | 270 +
.../master/assignment/AssignmentManager.java | 1660 ++
.../assignment/MergeTableRegionsProcedure.java | 717 +
.../master/assignment/MoveRegionProcedure.java | 147 +
.../master/assignment/RegionStateStore.java | 327 +
.../hbase/master/assignment/RegionStates.java | 864 +
.../assignment/RegionTransitionProcedure.java | 315 +
.../assignment/SplitTableRegionProcedure.java | 731 +
.../master/assignment/UnassignProcedure.java | 216 +
.../hbase/master/balancer/BaseLoadBalancer.java | 2 +-
.../master/balancer/RegionLocationFinder.java | 14 +-
.../master/balancer/SimpleLoadBalancer.java | 9 +-
.../master/balancer/StochasticLoadBalancer.java | 17 +-
.../hbase/master/locking/LockProcedure.java | 36 +-
.../AbstractStateMachineTableProcedure.java | 2 +-
.../procedure/AddColumnFamilyProcedure.java | 31 +-
.../procedure/CloneSnapshotProcedure.java | 4 +-
.../procedure/CreateNamespaceProcedure.java | 1 -
.../master/procedure/CreateTableProcedure.java | 41 +-
.../procedure/DeleteColumnFamilyProcedure.java | 31 +-
.../master/procedure/DeleteTableProcedure.java | 10 +-
.../master/procedure/DisableTableProcedure.java | 154 +-
.../DispatchMergingRegionsProcedure.java | 584 +
.../master/procedure/EnableTableProcedure.java | 172 +-
.../procedure/MasterDDLOperationHelper.java | 93 +-
.../procedure/MasterProcedureConstants.java | 2 +-
.../master/procedure/MasterProcedureEnv.java | 28 +-
.../procedure/MasterProcedureScheduler.java | 95 +-
.../procedure/MergeTableRegionsProcedure.java | 909 -
.../procedure/ModifyColumnFamilyProcedure.java | 30 +-
.../master/procedure/ModifyTableProcedure.java | 30 +-
.../master/procedure/ProcedureSyncWait.java | 144 +-
.../master/procedure/RSProcedureDispatcher.java | 542 +
.../procedure/RestoreSnapshotProcedure.java | 27 +-
.../master/procedure/ServerCrashProcedure.java | 492 +-
.../procedure/SplitTableRegionProcedure.java | 785 -
.../procedure/TruncateTableProcedure.java | 6 +-
.../hadoop/hbase/quotas/MasterQuotaManager.java | 18 +-
.../hadoop/hbase/regionserver/CompactSplit.java | 736 +
.../hbase/regionserver/CompactSplitThread.java | 722 -
.../hbase/regionserver/HRegionServer.java | 99 +-
.../hbase/regionserver/RSRpcServices.java | 102 +-
.../hbase/regionserver/RegionMergeRequest.java | 109 +
.../regionserver/RegionServerServices.java | 10 -
.../hadoop/hbase/regionserver/SplitRequest.java | 85 +-
.../org/apache/hadoop/hbase/util/HBaseFsck.java | 2 +-
.../hadoop/hbase/util/ModifyRegionUtils.java | 24 +-
.../apache/hadoop/hbase/wal/WALSplitter.java | 5 +-
.../hadoop/hbase/HBaseTestingUtility.java | 7 +-
.../hadoop/hbase/MockRegionServerServices.java | 10 -
.../hadoop/hbase/TestRegionRebalancing.java | 13 +-
.../hbase/TestStochasticBalancerJmxMetrics.java | 2 +-
.../apache/hadoop/hbase/client/TestAdmin1.java | 20 +-
.../apache/hadoop/hbase/client/TestAdmin2.java | 4 +-
.../hadoop/hbase/client/TestEnableTable.java | 34 +-
.../org/apache/hadoop/hbase/client/TestHCM.java | 119 +-
.../hbase/client/TestMetaWithReplicas.java | 4 +-
.../client/TestScannersFromClientSide.java | 10 +-
.../hbase/client/TestServerBusyException.java | 234 +
.../hbase/client/TestTableFavoredNodes.java | 7 +-
.../coprocessor/TestIncrementTimeRange.java | 5 +-
.../hbase/io/encoding/TestChangingEncoding.java | 8 +-
.../hbase/ipc/TestSimpleRpcScheduler.java | 13 +-
.../hbase/master/MockNoopMasterServices.java | 14 +-
.../hadoop/hbase/master/MockRegionServer.java | 37 +-
.../hbase/master/TestAssignmentListener.java | 1 +
.../master/TestAssignmentManagerOnCluster.java | 1402 --
.../hadoop/hbase/master/TestCatalogJanitor.java | 1 +
.../master/TestDistributedLogSplitting.java | 1 +
.../apache/hadoop/hbase/master/TestMaster.java | 1 +
.../master/TestMasterBalanceThrottling.java | 9 +-
.../hadoop/hbase/master/TestMasterFailover.java | 19 +-
.../hbase/master/TestMasterStatusServlet.java | 5 +-
.../hbase/master/TestMetaShutdownHandler.java | 1 +
.../hadoop/hbase/master/TestRegionState.java | 17 +-
.../hadoop/hbase/master/TestRegionStates.java | 144 -
.../hadoop/hbase/master/TestRestartCluster.java | 1 +
.../assignment/AssignmentTestingUtil.java | 125 +
.../master/assignment/MockMasterServices.java | 201 +
.../assignment/TestAssignmentManager.java | 567 +
.../assignment/TestAssignmentOnRSCrash.java | 185 +
.../TestMergeTableRegionsProcedure.java | 239 +
.../master/assignment/TestRegionStates.java | 226 +
.../TestSplitTableRegionProcedure.java | 427 +
.../MasterProcedureTestingUtility.java | 67 +-
.../procedure/TestAddColumnFamilyProcedure.java | 34 +-
.../procedure/TestCloneSnapshotProcedure.java | 10 +-
.../procedure/TestCreateNamespaceProcedure.java | 4 +-
.../procedure/TestCreateTableProcedure.java | 46 +-
.../TestDeleteColumnFamilyProcedure.java | 31 +-
.../procedure/TestDeleteNamespaceProcedure.java | 4 +-
.../procedure/TestDeleteTableProcedure.java | 21 +-
.../procedure/TestDisableTableProcedure.java | 24 +-
.../procedure/TestEnableTableProcedure.java | 24 +-
.../TestMasterFailoverWithProcedures.java | 23 +-
.../procedure/TestMasterProcedureEvents.java | 2 +-
.../TestModifyColumnFamilyProcedure.java | 9 +-
.../procedure/TestModifyNamespaceProcedure.java | 4 +-
.../procedure/TestModifyTableProcedure.java | 18 +-
.../master/procedure/TestProcedureAdmin.java | 13 +-
.../procedure/TestRestoreSnapshotProcedure.java | 13 +-
.../procedure/TestServerCrashProcedure.java | 110 +-
.../TestSplitTableRegionProcedure.java | 420 -
.../procedure/TestTableDDLProcedureBase.java | 7 +-
.../procedure/TestTruncateTableProcedure.java | 11 +-
.../hbase/namespace/TestNamespaceAuditor.java | 4 +-
.../procedure/SimpleMasterProcedureManager.java | 2 +-
.../regionserver/TestCompactSplitThread.java | 24 +-
.../hbase/regionserver/TestCompaction.java | 10 +-
.../regionserver/TestHRegionFileSystem.java | 6 +-
.../TestRegionMergeTransactionOnCluster.java | 14 +-
.../TestSplitTransactionOnCluster.java | 76 +-
.../regionserver/wal/TestAsyncLogRolling.java | 9 +
.../hbase/regionserver/wal/TestLogRolling.java | 5 +
.../wal/TestSecureAsyncWALReplay.java | 5 +
.../hbase/regionserver/wal/TestWALReplay.java | 5 +
.../hadoop/hbase/util/BaseTestHBaseFsck.java | 4 +-
.../hadoop/hbase/util/TestHBaseFsckMOB.java | 2 +-
.../hadoop/hbase/util/TestHBaseFsckOneRS.java | 75 +-
.../hbase/util/TestHBaseFsckReplicas.java | 2 +-
.../hadoop/hbase/util/TestHBaseFsckTwoRS.java | 21 +-
.../src/test/resources/log4j.properties | 1 +
187 files changed, 32709 insertions(+), 24483 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-client/src/main/java/org/apache/hadoop/hbase/ClusterStatus.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ClusterStatus.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ClusterStatus.java
index f00016d..a7a26a6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ClusterStatus.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ClusterStatus.java
@@ -24,8 +24,8 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
-import java.util.Set;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
@@ -81,7 +81,7 @@ public class ClusterStatus extends VersionedWritable {
private Collection<ServerName> deadServers;
private ServerName master;
private Collection<ServerName> backupMasters;
- private Set<RegionState> intransition;
+ private List<RegionState> intransition;
private String clusterId;
private String[] masterCoprocessors;
private Boolean balancerOn;
@@ -91,7 +91,7 @@ public class ClusterStatus extends VersionedWritable {
final Collection<ServerName> deadServers,
final ServerName master,
final Collection<ServerName> backupMasters,
- final Set<RegionState> rit,
+ final List<RegionState> rit,
final String[] masterCoprocessors,
final Boolean balancerOn) {
this.hbaseVersion = hbaseVersion;
@@ -262,7 +262,7 @@ public class ClusterStatus extends VersionedWritable {
}
@InterfaceAudience.Private
- public Set<RegionState> getRegionsInTransition() {
+ public List<RegionState> getRegionsInTransition() {
return this.intransition;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
index ee8d5fd..15bc132 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
@@ -2049,7 +2049,7 @@ public class MetaTableAccessor {
+ Bytes.toStringBinary(HConstants.MERGEB_QUALIFIER));
}
- private static Put addRegionInfo(final Put p, final HRegionInfo hri)
+ public static Put addRegionInfo(final Put p, final HRegionInfo hri)
throws IOException {
p.addImmutable(getCatalogFamily(), HConstants.REGIONINFO_QUALIFIER,
hri.toByteArray());
http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
index adf1496..135946f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
@@ -1316,6 +1316,12 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
return stub.mergeTableRegions(controller, request);
}
+  public MasterProtos.DispatchMergingRegionsResponse dispatchMergingRegions(
+      RpcController controller, MasterProtos.DispatchMergingRegionsRequest request)
+      throws ServiceException {
+    return stub.dispatchMergingRegions(controller, request);
+  }
+
@Override
public MasterProtos.AssignRegionResponse assignRegion(RpcController
controller,
MasterProtos.AssignRegionRequest request) throws ServiceException {
@@ -1335,6 +1341,12 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
}
@Override
+  public MasterProtos.SplitTableRegionResponse splitRegion(RpcController controller,
+      MasterProtos.SplitTableRegionRequest request) throws ServiceException {
+ return stub.splitRegion(controller, request);
+ }
+
+ @Override
public MasterProtos.DeleteTableResponse deleteTable(RpcController controller,
MasterProtos.DeleteTableRequest request) throws ServiceException {
return stub.deleteTable(controller, request);
http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java
index d70c76f..3469782 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java
@@ -480,4 +480,15 @@ public class ShortCircuitMasterConnection implements MasterKeepAliveConnection {
return stub.listReplicationPeers(controller, request);
}
-}
+ @Override
+  public SplitTableRegionResponse splitRegion(RpcController controller, SplitTableRegionRequest request)
+ throws ServiceException {
+ return stub.splitRegion(controller, request);
+ }
+
+ @Override
+  public DispatchMergingRegionsResponse dispatchMergingRegions(RpcController controller,
+ DispatchMergingRegionsRequest request) throws ServiceException {
+ return stub.dispatchMergingRegions(controller, request);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ServerTooBusyException.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ServerTooBusyException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ServerTooBusyException.java
index c6ba030..0dd8e64 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ServerTooBusyException.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ServerTooBusyException.java
@@ -25,14 +25,13 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
/**
- * Throw this in rpc call if there are too many pending requests for one region server
+ * Throw this in RPC call if there are too many pending requests for one region server
*/
+@SuppressWarnings("serial")
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class ServerTooBusyException extends DoNotRetryIOException {
-
public ServerTooBusyException(InetSocketAddress address, long count) {
- super("There are " + count + " concurrent rpc requests for " + address);
+ super("Busy Server! " + count + " concurrent RPCs against " + address);
}
-
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-client/src/main/java/org/apache/hadoop/hbase/master/RegionState.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/master/RegionState.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/master/RegionState.java
index a930732..7116763 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/master/RegionState.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/master/RegionState.java
@@ -36,10 +36,8 @@ public class RegionState {
@InterfaceStability.Evolving
public enum State {
OFFLINE, // region is in an offline state
- PENDING_OPEN, // same as OPENING, to be removed
OPENING, // server has begun to open but not yet done
OPEN, // server opened region and updated meta
- PENDING_CLOSE, // same as CLOSING, to be removed
CLOSING, // server has begun to close but not yet done
CLOSED, // server closed region and updated meta
SPLITTING, // server started split of a region
@@ -64,18 +62,12 @@ public class RegionState {
case OFFLINE:
rs = ClusterStatusProtos.RegionState.State.OFFLINE;
break;
- case PENDING_OPEN:
- rs = ClusterStatusProtos.RegionState.State.PENDING_OPEN;
- break;
case OPENING:
rs = ClusterStatusProtos.RegionState.State.OPENING;
break;
case OPEN:
rs = ClusterStatusProtos.RegionState.State.OPEN;
break;
- case PENDING_CLOSE:
- rs = ClusterStatusProtos.RegionState.State.PENDING_CLOSE;
- break;
case CLOSING:
rs = ClusterStatusProtos.RegionState.State.CLOSING;
break;
@@ -124,8 +116,6 @@ public class RegionState {
state = OFFLINE;
break;
case PENDING_OPEN:
- state = PENDING_OPEN;
- break;
case OPENING:
state = OPENING;
break;
@@ -133,8 +123,6 @@ public class RegionState {
state = OPEN;
break;
case PENDING_CLOSE:
- state = PENDING_CLOSE;
- break;
case CLOSING:
state = CLOSING;
break;
@@ -166,7 +154,7 @@ public class RegionState {
state = MERGING_NEW;
break;
default:
- throw new IllegalStateException("");
+ throw new IllegalStateException("Unhandled state " + protoState);
}
return state;
}
@@ -231,22 +219,16 @@ public class RegionState {
this.ritDuration += (this.stamp - previousStamp);
}
- /**
- * PENDING_CLOSE (to be removed) is the same as CLOSING
- */
public boolean isClosing() {
- return state == State.PENDING_CLOSE || state == State.CLOSING;
+ return state == State.CLOSING;
}
public boolean isClosed() {
return state == State.CLOSED;
}
- /**
- * PENDING_OPEN (to be removed) is the same as OPENING
- */
public boolean isOpening() {
- return state == State.PENDING_OPEN || state == State.OPENING;
+ return state == State.OPENING;
}
public boolean isOpened() {
http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
index 4f68447..fcf2c34 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
@@ -1803,7 +1803,7 @@ public final class ProtobufUtil {
* has a serialized {@link ServerName} in it.
* @return Returns null if <code>data</code> is null else converts passed data
* to a ServerName instance.
- * @throws DeserializationException
+ * @throws DeserializationException
*/
public static ServerName toServerName(final byte [] data) throws DeserializationException {
if (data == null || data.length <= 0) return null;
http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index f44979c..5cec10d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -20,19 +20,19 @@ package org.apache.hadoop.hbase.shaded.protobuf;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.io.InterruptedIOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
+import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableSet;
-import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
@@ -83,11 +83,13 @@ import org.apache.hadoop.hbase.io.LimitInputStream;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.MergeRegionsRequest;
import org.apache.hadoop.hbase.quotas.QuotaScope;
import org.apache.hadoop.hbase.quotas.QuotaType;
import org.apache.hadoop.hbase.quotas.ThrottleType;
import org.apache.hadoop.hbase.replication.ReplicationLoadSink;
import org.apache.hadoop.hbase.replication.ReplicationLoadSource;
+import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.visibility.Authorizations;
import org.apache.hadoop.hbase.security.visibility.CellVisibility;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString;
@@ -101,8 +103,6 @@ import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.TextFormat;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionForSplitOrMergeRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionForSplitOrMergeResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
@@ -149,7 +149,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
@@ -166,6 +165,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.DynamicClassLoader;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.hbase.util.Methods;
import org.apache.hadoop.hbase.util.VersionInfo;
@@ -1814,33 +1814,6 @@ public final class ProtobufUtil {
}
/**
- * A helper to close a region for split or merge
- * using admin protocol.
- *
- * @param controller RPC controller
- * @param admin Admin service
- * @param server the RS that hosts the target region
- * @param regionInfo the target region info
- * @return true if the region is closed
- * @throws IOException
- */
- public static boolean closeRegionForSplitOrMerge(
- final RpcController controller,
- final AdminService.BlockingInterface admin,
- final ServerName server,
- final HRegionInfo... regionInfo) throws IOException {
- CloseRegionForSplitOrMergeRequest closeRegionForRequest =
- ProtobufUtil.buildCloseRegionForSplitOrMergeRequest(server, regionInfo);
- try {
- CloseRegionForSplitOrMergeResponse response =
- admin.closeRegionForSplitOrMerge(controller, closeRegionForRequest);
- return ResponseConverter.isClosed(response);
- } catch (ServiceException se) {
- throw getRemoteException(se);
- }
- }
-
- /**
* A helper to warmup a region given a region name
* using admin protocol
*
@@ -1992,6 +1965,46 @@ public final class ProtobufUtil {
}
}
+ /**
+ * A helper to merge regions using admin protocol. Send request to
+ * regionserver.
+ * @param admin
+ * @param region_a
+ * @param region_b
+ * @param forcible true if we should do a compulsory merge; otherwise we will
+ * only merge two adjacent regions
+ * @param user effective user
+ * @throws IOException
+ */
+ public static void mergeRegions(final RpcController controller,
+ final AdminService.BlockingInterface admin,
+ final HRegionInfo region_a, final HRegionInfo region_b,
+ final boolean forcible, final User user) throws IOException {
+ final MergeRegionsRequest request = ProtobufUtil.buildMergeRegionsRequest(
+ region_a.getRegionName(), region_b.getRegionName(), forcible);
+ if (user != null) {
+ try {
+ user.runAs(new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ admin.mergeRegions(controller, request);
+ return null;
+ }
+ });
+ } catch (InterruptedException ie) {
+ InterruptedIOException iioe = new InterruptedIOException();
+ iioe.initCause(ie);
+ throw iioe;
+ }
+ } else {
+ try {
+ admin.mergeRegions(controller, request);
+ } catch (ServiceException se) {
+ throw ProtobufUtil.getRemoteException(se);
+ }
+ }
+ }
+
// End helpers for Admin
/*
@@ -3003,8 +3016,8 @@ public final class ProtobufUtil {
backupMasters.add(ProtobufUtil.toServerName(sn));
}
- Set<RegionState> rit = null;
- rit = new HashSet<>(proto.getRegionsInTransitionList().size());
+ List<RegionState> rit =
+ new ArrayList<>(proto.getRegionsInTransitionList().size());
for (RegionInTransition region : proto.getRegionsInTransitionList()) {
RegionState value = RegionState.convert(region.getRegionState());
rit.add(value);
@@ -3163,26 +3176,6 @@ public final class ProtobufUtil {
}
/**
- * Create a CloseRegionForSplitOrMergeRequest for given regions
- *
- * @param server the RS server that hosts the region
- * @param regionsToClose the info of the regions to close
- * @return a CloseRegionForSplitRequest
- */
- public static CloseRegionForSplitOrMergeRequest buildCloseRegionForSplitOrMergeRequest(
- final ServerName server,
- final HRegionInfo... regionsToClose) {
- CloseRegionForSplitOrMergeRequest.Builder builder =
- CloseRegionForSplitOrMergeRequest.newBuilder();
- for(int i = 0; i < regionsToClose.length; i++) {
- RegionSpecifier regionToClose = RequestConverter.buildRegionSpecifier(
- RegionSpecifierType.REGION_NAME, regionsToClose[i].getRegionName());
- builder.addRegion(regionToClose);
- }
- return builder.build();
- }
-
- /**
* Create a CloseRegionRequest for a given encoded region name
*
* @param encodedRegionName the name of the region to close
@@ -3220,6 +3213,28 @@ public final class ProtobufUtil {
return builder.build();
}
+ /**
+ * Create a MergeRegionsRequest for the given regions
+ * @param regionA name of region a
+ * @param regionB name of region b
+ * @param forcible true if it is a compulsory merge
+ * @return a MergeRegionsRequest
+ */
+ public static MergeRegionsRequest buildMergeRegionsRequest(
+ final byte[] regionA, final byte[] regionB, final boolean forcible) {
+ MergeRegionsRequest.Builder builder = MergeRegionsRequest.newBuilder();
+ RegionSpecifier regionASpecifier = RequestConverter.buildRegionSpecifier(
+ RegionSpecifierType.REGION_NAME, regionA);
+ RegionSpecifier regionBSpecifier = RequestConverter.buildRegionSpecifier(
+ RegionSpecifierType.REGION_NAME, regionB);
+ builder.setRegionA(regionASpecifier);
+ builder.setRegionB(regionBSpecifier);
+ builder.setForcible(forcible);
+ // send the master's wall clock time as well, so that the RS can refer to it
+ builder.setMasterSystemTime(EnvironmentEdgeManager.currentTime());
+ return builder.build();
+ }
+
/**
* Get a ServerName from the passed in data bytes.
* @param data Data with a serialized server name in it; can handle the old style
@@ -3263,4 +3278,4 @@ public final class ProtobufUtil {
int port = Addressing.parsePort(str);
return ServerName.valueOf(hostname, port, -1L);
}
-}
\ No newline at end of file
+}
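For context, the new mergeRegions helper wraps the RPC in a PrivilegedExceptionAction when an effective user is supplied, and translates InterruptedException into InterruptedIOException. Below is a self-contained sketch of that pattern only; it is not part of the patch, and the `Rpc` interface and names are hypothetical stand-ins for the blocking AdminService call:

```java
import java.io.InterruptedIOException;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.atomic.AtomicInteger;

public class RunAsSketch {
  // Hypothetical stand-in for the blocking AdminService RPC call.
  interface Rpc {
    void call() throws Exception;
  }

  // Mirrors the mergeRegions shape: wrap the RPC in a
  // PrivilegedExceptionAction (run via user.runAs() in the real code) and
  // translate InterruptedException into InterruptedIOException so the
  // caller keeps an IOException-based contract.
  static void invoke(final Rpc rpc) throws Exception {
    final PrivilegedExceptionAction<Void> action = new PrivilegedExceptionAction<Void>() {
      @Override
      public Void run() throws Exception {
        rpc.call();
        return null;
      }
    };
    try {
      action.run(); // user.runAs(action) in the real helper
    } catch (InterruptedException ie) {
      InterruptedIOException iioe = new InterruptedIOException();
      iioe.initCause(ie);
      throw iioe;
    }
  }

  public static boolean demo() throws Exception {
    final AtomicInteger calls = new AtomicInteger();
    invoke(new Rpc() {
      @Override
      public void call() {
        calls.incrementAndGet();
      }
    });
    return calls.get() == 1; // the wrapped RPC ran exactly once
  }
}
```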
http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
index a513d66..7b50c3f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
@@ -117,7 +117,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOr
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.SplitTableRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest;
@@ -1113,19 +1112,6 @@ public final class RequestConverter {
return builder.build();
}
- public static SplitTableRegionRequest buildSplitTableRegionRequest(
- final HRegionInfo regionInfo,
- final byte[] splitPoint,
- final long nonceGroup,
- final long nonce) {
- SplitTableRegionRequest.Builder builder = SplitTableRegionRequest.newBuilder();
- builder.setRegionInfo(HRegionInfo.convert(regionInfo));
- builder.setSplitRow(UnsafeByteOperations.unsafeWrap(splitPoint));
- builder.setNonceGroup(nonceGroup);
- builder.setNonce(nonce);
- return builder.build();
- }
-
/**
* Create a protocol buffer AssignRegionRequest
*
@@ -1508,7 +1494,7 @@ public final class RequestConverter {
/**
* Create a RegionOpenInfo based on given region info and version of offline node
*/
- private static RegionOpenInfo buildRegionOpenInfo(
+ public static RegionOpenInfo buildRegionOpenInfo(
final HRegionInfo region,
final List<ServerName> favoredNodes, Boolean openForReplay) {
RegionOpenInfo.Builder builder = RegionOpenInfo.newBuilder();
http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java
index ecadbbc..c489628 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.SingleResponse;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionForSplitOrMergeResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse;
@@ -254,18 +253,6 @@ public final class ResponseConverter {
}
/**
- * Check if the region is closed from a CloseRegionForSplitResponse
- *
- * @param proto the CloseRegionForSplitResponse
- * @return the region close state
- */
- public static boolean isClosed
- (final CloseRegionForSplitOrMergeResponse proto) {
- if (proto == null || !proto.hasClosed()) return false;
- return proto.getClosed();
- }
-
- /**
* A utility to build a GetServerInfoResponse.
*
* @param serverName
http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-common/src/main/java/org/apache/hadoop/hbase/ChoreService.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/ChoreService.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/ChoreService.java
index d4ec48e..19363d0 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/ChoreService.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/ChoreService.java
@@ -248,7 +248,7 @@ public class ChoreService implements ChoreServicer {
*/
static class ChoreServiceThreadFactory implements ThreadFactory {
private final String threadPrefix;
- private final static String THREAD_NAME_SUFFIX = "_ChoreService_";
+ private final static String THREAD_NAME_SUFFIX = "_Chore_";
private AtomicInteger threadNumber = new AtomicInteger(1);
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 609e9a5..3789f71 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -146,7 +146,7 @@ public final class HConstants {
public static final int DEFAULT_HBASE_BALANCER_PERIOD = 300000;
/** The name of the ensemble table */
- public static final String ENSEMBLE_TABLE_NAME = "hbase:ensemble";
+ public static final TableName ENSEMBLE_TABLE_NAME = TableName.valueOf("hbase:ensemble");
/** Config for pluggable region normalizer */
public static final String HBASE_MASTER_NORMALIZER_CLASS =
http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-common/src/main/java/org/apache/hadoop/hbase/TableName.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/TableName.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/TableName.java
index 9b9755b..cba03c0 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/TableName.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/TableName.java
@@ -89,7 +89,12 @@ public final class TableName implements Comparable<TableName> {
public static final String OLD_META_STR = ".META.";
public static final String OLD_ROOT_STR = "-ROOT-";
-
+ /**
+ * @return True if <code>tn</code> is the hbase:meta table name.
+ */
+ public static boolean isMetaTableName(final TableName tn) {
+ return tn.equals(TableName.META_TABLE_NAME);
+ }
/**
* TableName for old -ROOT- table. It is used to read/process old WALs which have
http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-common/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKConfig.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKConfig.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKConfig.java
index 216fe0c..8536ce2 100644
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKConfig.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKConfig.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/master/MetricsAssignmentManagerSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/master/MetricsAssignmentManagerSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/master/MetricsAssignmentManagerSource.java
index f6c9cb8..7e1f836 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/master/MetricsAssignmentManagerSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/master/MetricsAssignmentManagerSource.java
@@ -46,12 +46,10 @@ public interface MetricsAssignmentManagerSource extends BaseSource {
String RIT_COUNT_OVER_THRESHOLD_NAME = "ritCountOverThreshold";
String RIT_OLDEST_AGE_NAME = "ritOldestAge";
String RIT_DURATION_NAME = "ritDuration";
- String ASSIGN_TIME_NAME = "assign";
- String BULK_ASSIGN_TIME_NAME = "bulkAssign";
-
- void updateAssignmentTime(long time);
- void updateBulkAssignTime(long time);
+ String OPERATION_COUNT_NAME = "operationCount";
+ String ASSIGN_TIME_NAME = "assign";
+ String UNASSIGN_TIME_NAME = "unassign";
/**
* Set the number of regions in transition.
@@ -75,4 +73,19 @@ public interface MetricsAssignmentManagerSource extends BaseSource {
void setRITOldestAge(long age);
void updateRitDuration(long duration);
+
+ /**
+ * Increment the count of assignment operations (assign/unassign).
+ */
+ void incrementOperationCounter();
+
+ /**
+ * Add the time taken to perform the last assign operation
+ */
+ void updateAssignTime(long time);
+
+ /**
+ * Add the time taken to perform the last unassign operation
+ */
+ void updateUnassignTime(long time);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/master/MetricsAssignmentManagerSourceImpl.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/master/MetricsAssignmentManagerSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/master/MetricsAssignmentManagerSourceImpl.java
index ab504f5..722358d 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/master/MetricsAssignmentManagerSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/master/MetricsAssignmentManagerSourceImpl.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.master;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.metrics.BaseSourceImpl;
import org.apache.hadoop.metrics2.MetricHistogram;
+import org.apache.hadoop.metrics2.lib.MutableFastCounter;
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
@InterfaceAudience.Private
@@ -32,8 +33,10 @@ public class MetricsAssignmentManagerSourceImpl
private MutableGaugeLong ritCountOverThresholdGauge;
private MutableGaugeLong ritOldestAgeGauge;
private MetricHistogram ritDurationHisto;
+
+ private MutableFastCounter operationCounter;
private MetricHistogram assignTimeHisto;
- private MetricHistogram bulkAssignTimeHisto;
+ private MetricHistogram unassignTimeHisto;
public MetricsAssignmentManagerSourceImpl() {
this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT,
METRICS_JMX_CONTEXT);
@@ -49,31 +52,40 @@ public class MetricsAssignmentManagerSourceImpl
ritGauge = metricsRegistry.newGauge(RIT_COUNT_NAME, "", 0l);
ritCountOverThresholdGauge =
metricsRegistry.newGauge(RIT_COUNT_OVER_THRESHOLD_NAME, "", 0l);
ritOldestAgeGauge = metricsRegistry.newGauge(RIT_OLDEST_AGE_NAME, "", 0l);
- assignTimeHisto = metricsRegistry.newTimeHistogram(ASSIGN_TIME_NAME);
- bulkAssignTimeHisto = metricsRegistry.newTimeHistogram(BULK_ASSIGN_TIME_NAME);
+ operationCounter = metricsRegistry.getCounter(OPERATION_COUNT_NAME, 0l);
ritDurationHisto = metricsRegistry.newTimeHistogram(RIT_DURATION_NAME);
+ assignTimeHisto = metricsRegistry.newTimeHistogram(ASSIGN_TIME_NAME);
+ unassignTimeHisto = metricsRegistry.newTimeHistogram(UNASSIGN_TIME_NAME);
}
@Override
- public void updateAssignmentTime(long time) {
- assignTimeHisto.add(time);
+ public void setRIT(final int ritCount) {
+ ritGauge.set(ritCount);
}
@Override
- public void updateBulkAssignTime(long time) {
- bulkAssignTimeHisto.add(time);
+ public void setRITCountOverThreshold(final int ritCount) {
+ ritCountOverThresholdGauge.set(ritCount);
}
- public void setRIT(int ritCount) {
- ritGauge.set(ritCount);
+ @Override
+ public void setRITOldestAge(final long ritCount) {
+ ritOldestAgeGauge.set(ritCount);
}
- public void setRITCountOverThreshold(int ritCount) {
- ritCountOverThresholdGauge.set(ritCount);
+ @Override
+ public void incrementOperationCounter() {
+ operationCounter.incr();
}
- public void setRITOldestAge(long ritCount) {
- ritOldestAgeGauge.set(ritCount);
+ @Override
+ public void updateAssignTime(final long time) {
+ assignTimeHisto.add(time);
+ }
+
+ @Override
+ public void updateUnassignTime(final long time) {
+ unassignTimeHisto.add(time);
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java
index 646bc1f..fc80c9c 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java
@@ -25,13 +25,11 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
+import com.google.common.annotations.VisibleForTesting;
@InterfaceAudience.Private
[email protected]
public abstract class AbstractProcedureScheduler implements ProcedureScheduler {
private static final Log LOG = LogFactory.getLog(AbstractProcedureScheduler.class);
-
private final ReentrantLock schedLock = new ReentrantLock();
private final Condition schedWaitCond = schedLock.newCondition();
private boolean running = false;
http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockAndQueue.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockAndQueue.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockAndQueue.java
index 19ba28c..e11c23c 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockAndQueue.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockAndQueue.java
@@ -19,24 +19,25 @@
package org.apache.hadoop.hbase.procedure2;
/**
- * Locking for mutual exclusion between procedures. Only by procedure framework internally.
+ * Locking for mutual exclusion between procedures. Used only by procedure framework internally.
* {@link LockAndQueue} has two purposes:
* <ol>
- * <li>Acquire/release exclusive/shared locks</li>
- * <li>Maintain a list of procedures waiting for this lock<br>
- * To do so, {@link LockAndQueue} extends {@link ProcedureDeque} class. Using inheritance over
- * composition for this need is unusual, but the choice is motivated by million regions
- * assignment case as it will reduce memory footprint and number of objects to be GCed.
+ * <li>Acquire/release exclusive/shared locks.</li>
+ * <li>Maintains a list of procedures waiting on this lock.
+ * {@link LockAndQueue} extends {@link ProcedureDeque} class. Blocked Procedures are added
+ * to our super Deque. Using inheritance over composition to keep the Deque of waiting
+ * Procedures is unusual, but we do it this way because in certain cases, there will be
+ * millions of regions. This layout uses less memory.
* </ol>
*
- * NOT thread-safe. Needs external concurrency control. For eg. Uses in MasterProcedureScheduler are
+ * <p>NOT thread-safe. Needs external concurrency control: e.g. uses in MasterProcedureScheduler are
* guarded by schedLock().
* <br>
* There is no need of 'volatile' keyword for member variables because of memory synchronization
* guarantees of locks (see 'Memory Synchronization',
* http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/locks/Lock.html)
* <br>
- * We do not implement Lock interface because we need exclusive + shared locking, and also
+ * We do not implement Lock interface because we need exclusive and shared locking, and also
* because try-lock functions require procedure id.
* <br>
* We do not use ReentrantReadWriteLock directly because of its high memory overhead.
@@ -104,6 +105,9 @@ public class LockAndQueue extends ProcedureDeque implements LockStatus {
return true;
}
+ /**
+ * @return True if we released a lock.
+ */
public boolean releaseExclusiveLock(final Procedure proc) {
if (isLockOwner(proc.getProcId())) {
exclusiveLockProcIdOwner = Long.MIN_VALUE;
@@ -111,4 +115,11 @@ public class LockAndQueue extends ProcedureDeque implements LockStatus {
}
return false;
}
-}
+
+ @Override
+ public String toString() {
+ return "exclusiveLockOwner=" + (hasExclusiveLock()? getExclusiveLockProcIdOwner(): "NONE") +
+ ", sharedLockCount=" + getSharedLockCount() +
+ ", waitingProcCount=" + size();
+ }
+}
\ No newline at end of file
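The inheritance-over-composition layout that the LockAndQueue class comment describes can be sketched in isolation. This is a minimal, hypothetical stand-in (ArrayDeque in place of ProcedureDeque, and only the exclusive/shared bookkeeping), not the patch's implementation:

```java
import java.util.ArrayDeque;

// Minimal stand-in for the LockAndQueue layout: the wait queue IS the deque
// (inheritance over composition), so each region needs one object instead of
// a lock object plus a separate queue object.
public class LockQueueSketch extends ArrayDeque<Long> {
  private static final long serialVersionUID = 1L;
  private long exclusiveLockProcIdOwner = Long.MIN_VALUE;
  private int sharedLockCount = 0;

  public boolean tryExclusiveLock(final long procId) {
    if (exclusiveLockProcIdOwner != Long.MIN_VALUE || sharedLockCount > 0) return false;
    exclusiveLockProcIdOwner = procId;
    return true;
  }

  public boolean trySharedLock() {
    if (exclusiveLockProcIdOwner != Long.MIN_VALUE) return false;
    sharedLockCount++;
    return true;
  }

  /** @return True if we released the lock. */
  public boolean releaseExclusiveLock(final long procId) {
    if (exclusiveLockProcIdOwner == procId) {
      exclusiveLockProcIdOwner = Long.MIN_VALUE;
      return true;
    }
    return false;
  }

  @Override
  public String toString() {
    // Same shape as the toString() added by the patch; size() is the
    // inherited deque of waiting procedure ids.
    return "exclusiveLockOwner=" + (exclusiveLockProcIdOwner != Long.MIN_VALUE
        ? exclusiveLockProcIdOwner : "NONE")
        + ", sharedLockCount=" + sharedLockCount
        + ", waitingProcCount=" + size();
  }
}
```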
http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
index fee5250..2841697 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
@@ -253,9 +253,8 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
*/
protected StringBuilder toStringSimpleSB() {
final StringBuilder sb = new StringBuilder();
- toStringClassDetails(sb);
- sb.append(", procId=");
+ sb.append("procId=");
sb.append(getProcId());
if (hasParent()) {
@@ -275,6 +274,9 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
sb.append(", failed=" + getException());
}
+ sb.append(", ");
+ toStringClassDetails(sb);
+
return sb;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEvent.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEvent.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEvent.java
index cb90ac0..43cce3a 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEvent.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEvent.java
@@ -19,7 +19,6 @@
package org.apache.hadoop.hbase.procedure2;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
/**
* Basic ProcedureEvent that contains an "object", which can be a description or a reference to the
@@ -50,6 +49,7 @@ public class ProcedureEvent<T> {
@Override
public String toString() {
- return getClass().getSimpleName() + "(" + object + ")";
+ return getClass().getSimpleName() + " for " + object + ", ready=" + isReady() +
+ ", suspended procedures count=" + getSuspendedProcedures().size();
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
index 0856aa2..3145e83 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
@@ -360,8 +360,7 @@ public class ProcedureExecutor<TEnvironment> {
assert !(proc.isFinished() && !proc.hasParent()) : "unexpected completed proc=" + proc;
if (debugEnabled) {
- LOG.debug(String.format("Loading state=%s isFailed=%s: %s",
- proc.getState(), proc.hasException(), proc));
+ LOG.debug(String.format("Loading %s", proc));
}
Long rootProcId = getRootProcedureId(proc);
@@ -483,7 +482,7 @@ public class ProcedureExecutor<TEnvironment> {
// We have numThreads executor + one timer thread used for timing out
// procedures and triggering periodic procedures.
this.corePoolSize = numThreads;
- LOG.info("Starting executor threads=" + corePoolSize);
+ LOG.info("Starting executor worker threads=" + corePoolSize);
// Create the Thread Group for the executors
threadGroup = new ThreadGroup("ProcedureExecutor");
@@ -522,7 +521,9 @@ public class ProcedureExecutor<TEnvironment> {
store.getClass().getSimpleName(), StringUtils.humanTimeDiff(et - st)));
// Start the executors. Here we must have the lastProcId set.
- LOG.debug("Start workers " + workerThreads.size());
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Start workers " + workerThreads.size());
+ }
timeoutExecutor.start();
for (WorkerThread worker: workerThreads) {
worker.start();
@@ -1147,8 +1148,7 @@ public class ProcedureExecutor<TEnvironment> {
if (proc.isSuccess()) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Completed in " +
- StringUtils.humanTimeDiff(proc.elapsedTime()) + ": " + proc);
+ LOG.debug("Completed " + proc + " in " + StringUtils.humanTimeDiff(proc.elapsedTime()));
}
// Finalize the procedure state
if (proc.getProcId() == rootProcId) {
@@ -1342,7 +1342,7 @@ public class ProcedureExecutor<TEnvironment> {
return;
} catch (Throwable e) {
// Catch NullPointerExceptions or similar errors...
- String msg = "CODE-BUG: Uncatched runtime exception for procedure: " + procedure;
+ String msg = "CODE-BUG: Uncaught runtime exception: " + procedure;
LOG.error(msg, e);
procedure.setFailure(new RemoteProcedureException(msg, e));
}
@@ -1674,7 +1674,7 @@ public class ProcedureExecutor<TEnvironment> {
// if the procedure is in a waiting state again, put it back in the queue
procedure.updateTimestamp();
if (procedure.isWaiting()) {
- delayed.setTimeoutTimestamp(procedure.getTimeoutTimestamp());
+ delayed.setTimeout(procedure.getTimeoutTimestamp());
queue.add(delayed);
}
} else {
@@ -1752,7 +1752,7 @@ public class ProcedureExecutor<TEnvironment> {
}
@Override
- public long getTimeoutTimestamp() {
+ public long getTimeout() {
return timeout;
}
}
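The hunks above rename the delayed wrapper's accessor from getTimeoutTimestamp() to getTimeout(): the timeout executor keeps procedures in a DelayQueue keyed by an absolute deadline. A self-contained sketch of that java.util.concurrent.Delayed pattern (not the patch's class; names here are hypothetical):

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Each queued item carries an absolute deadline (cf. getTimeout() in the
// diff above); DelayQueue hands items back only once that deadline passes.
public class DelayedSketch implements Delayed {
  private final long timeout; // absolute deadline in millis

  public DelayedSketch(final long timeout) {
    this.timeout = timeout;
  }

  public long getTimeout() {
    return timeout;
  }

  @Override
  public long getDelay(final TimeUnit unit) {
    // Remaining time until the deadline; <= 0 means the item is due.
    return unit.convert(timeout - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
  }

  @Override
  public int compareTo(final Delayed other) {
    return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
  }

  public static boolean demo() throws InterruptedException {
    DelayQueue<DelayedSketch> queue = new DelayQueue<>();
    queue.add(new DelayedSketch(System.currentTimeMillis() + 20));
    DelayedSketch due = queue.take(); // blocks until the deadline passes
    return due.getDelay(TimeUnit.MILLISECONDS) <= 0;
  }
}
```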
http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java
index 16ff781..617532b 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java
@@ -23,13 +23,11 @@ import com.google.common.annotations.VisibleForTesting;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
/**
* Keep track of the runnable procedures
*/
@InterfaceAudience.Private
[email protected]
public interface ProcedureScheduler {
/**
* Start the scheduler
@@ -93,7 +91,7 @@ public interface ProcedureScheduler {
Procedure poll(long timeout, TimeUnit unit);
/**
- * Mark the event has not ready.
+ * Mark the event as not ready.
* procedures calling waitEvent() will be suspended.
* @param event the event to mark as suspended/not ready
*/
http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java
new file mode 100644
index 0000000..0e33887
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java
@@ -0,0 +1,367 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2;
+
+import java.io.IOException;
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.procedure2.util.DelayedUtil;
+import org.apache.hadoop.hbase.procedure2.util.DelayedUtil.DelayedContainerWithTimestamp;
+import org.apache.hadoop.hbase.procedure2.util.DelayedUtil.DelayedWithTimeout;
+import org.apache.hadoop.hbase.procedure2.util.StringUtils;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Threads;
+
+import com.google.common.collect.ArrayListMultimap;
+
+/**
+ * A procedure dispatcher that aggregates and sends after elapsed time or after we hit
+ * count threshold. Creates its own threadpool to run RPCs with timeout.
+ * <ul>
+ * <li>Each server queue has a dispatch buffer</li>
+ * <li>Once the dispatch buffer reaches a threshold size/time we send</li>
+ * </ul>
+ * <p>Call {@link #start()} and then {@link #submitTask(Callable)}. When done,
+ * call {@link #stop()}.
+ */
[email protected]
+public abstract class RemoteProcedureDispatcher<TEnv, TRemote extends
Comparable<TRemote>> {
+ private static final Log LOG =
LogFactory.getLog(RemoteProcedureDispatcher.class);
+
+ public static final String THREAD_POOL_SIZE_CONF_KEY =
+ "hbase.procedure.remote.dispatcher.threadpool.size";
+ private static final int DEFAULT_THREAD_POOL_SIZE = 128;
+
+ public static final String DISPATCH_DELAY_CONF_KEY =
+ "hbase.procedure.remote.dispatcher.delay.msec";
+ private static final int DEFAULT_DISPATCH_DELAY = 150;
+
+ public static final String DISPATCH_MAX_QUEUE_SIZE_CONF_KEY =
+ "hbase.procedure.remote.dispatcher.max.queue.size";
+ private static final int DEFAULT_MAX_QUEUE_SIZE = 32;
+
+ private final AtomicBoolean running = new AtomicBoolean(false);
+ private final ConcurrentHashMap<TRemote, BufferNode> nodeMap =
+ new ConcurrentHashMap<TRemote, BufferNode>();
+
+ private final int operationDelay;
+ private final int queueMaxSize;
+ private final int corePoolSize;
+
+ private TimeoutExecutorThread timeoutExecutor;
+ private ThreadPoolExecutor threadPool;
+
+ protected RemoteProcedureDispatcher(Configuration conf) {
+ this.corePoolSize = conf.getInt(THREAD_POOL_SIZE_CONF_KEY, DEFAULT_THREAD_POOL_SIZE);
+ this.operationDelay = conf.getInt(DISPATCH_DELAY_CONF_KEY, DEFAULT_DISPATCH_DELAY);
+ this.queueMaxSize = conf.getInt(DISPATCH_MAX_QUEUE_SIZE_CONF_KEY, DEFAULT_MAX_QUEUE_SIZE);
+ }
+
+ public boolean start() {
+ if (running.getAndSet(true)) {
+ LOG.warn("Already running");
+ return false;
+ }
+
+ LOG.info("Starting procedure remote dispatcher; threads=" + this.corePoolSize +
+ ", queueMaxSize=" + this.queueMaxSize + ", operationDelay=" + this.operationDelay);
+
+ // Create the timeout executor
+ timeoutExecutor = new TimeoutExecutorThread();
+ timeoutExecutor.start();
+
+ // Create the thread pool that will execute RPCs
+ threadPool = Threads.getBoundedCachedThreadPool(corePoolSize, 60L, TimeUnit.SECONDS,
+ Threads.newDaemonThreadFactory("ProcedureRemoteDispatcher", getUncaughtExceptionHandler()));
+ return true;
+ }
+
+ public boolean stop() {
+ if (!running.getAndSet(false)) {
+ return false;
+ }
+
+ LOG.info("Stopping procedure remote dispatcher");
+
+ // send stop signals
+ timeoutExecutor.sendStopSignal();
+ threadPool.shutdownNow();
+ return true;
+ }
+
+ public void join() {
+ assert !running.get() : "expected not running";
+
+ // wait the timeout executor
+ timeoutExecutor.awaitTermination();
+ timeoutExecutor = null;
+
+ // wait for the thread pool to terminate
+ threadPool.shutdownNow();
+ try {
+ while (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
+ LOG.warn("Waiting for thread-pool to terminate");
+ }
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted while waiting for thread-pool termination", e);
+ }
+ }
+
+ protected UncaughtExceptionHandler getUncaughtExceptionHandler() {
+ return new UncaughtExceptionHandler() {
+ @Override
+ public void uncaughtException(Thread t, Throwable e) {
+ LOG.warn("Failed to execute remote procedures " + t.getName(), e);
+ }
+ };
+ }
+
+ // ============================================================================================
+ // Node Helpers
+ // ============================================================================================
+ /**
+ * Add a node that will be able to execute remote procedures
+ * @param key the node identifier
+ */
+ public void addNode(final TRemote key) {
+ final BufferNode newNode = new BufferNode(key);
+ nodeMap.putIfAbsent(key, newNode);
+ }
+
+ /**
+ * Get the remote node that will execute remote procedures
+ * @param key the node identifier
+ */
+ public RemoteNode getNode(final TRemote key) {
+ assert key != null : "found null key for node";
+ return nodeMap.get(key);
+ }
+
+ /**
+ * Remove a remote node
+ * @param key the node identifier
+ */
+ public boolean removeNode(final TRemote key) {
+ final BufferNode node = nodeMap.remove(key);
+ if (node == null) return false;
+ node.abortOperationsInQueue();
+ return true;
+ }
+
+ // ============================================================================================
+ // Task Helpers
+ // ============================================================================================
+ protected Future<Void> submitTask(Callable<Void> task) {
+ return threadPool.submit(task);
+ }
+
+ protected Future<Void> submitTask(Callable<Void> task, long delay, TimeUnit unit) {
+ final FutureTask<Void> futureTask = new FutureTask<>(task);
+ timeoutExecutor.add(new DelayedTask(futureTask, delay, unit));
+ return futureTask;
+ }
+
+ protected abstract void remoteDispatch(TRemote key, Set<RemoteProcedure> operations);
+ protected abstract void abortPendingOperations(TRemote key, Set<RemoteProcedure> operations);
+
+ /**
+ * Data structure with reference to remote operation.
+ */
+ public static abstract class RemoteOperation {
+ private final RemoteProcedure remoteProcedure;
+
+ protected RemoteOperation(final RemoteProcedure remoteProcedure) {
+ this.remoteProcedure = remoteProcedure;
+ }
+
+ public RemoteProcedure getRemoteProcedure() {
+ return remoteProcedure;
+ }
+ }
+
+ /**
+ * Remote procedure reference.
+ * @param <TEnv>
+ * @param <TRemote>
+ */
+ public interface RemoteProcedure<TEnv, TRemote> {
+ RemoteOperation remoteCallBuild(TEnv env, TRemote remote);
+ void remoteCallCompleted(TEnv env, TRemote remote, RemoteOperation response);
+ void remoteCallFailed(TEnv env, TRemote remote, IOException exception);
+ }
+
+ /**
+ * Account of what procedures are running on remote node.
+ * @param <TEnv>
+ * @param <TRemote>
+ */
+ public interface RemoteNode<TEnv, TRemote> {
+ TRemote getKey();
+ void add(RemoteProcedure<TEnv, TRemote> operation);
+ void dispatch();
+ }
+
+ protected ArrayListMultimap<Class<?>, RemoteOperation> buildAndGroupRequestByType(final TEnv env,
+ final TRemote remote, final Set<RemoteProcedure> operations) {
+ final ArrayListMultimap<Class<?>, RemoteOperation> requestByType = ArrayListMultimap.create();
+ for (RemoteProcedure proc: operations) {
+ RemoteOperation operation = proc.remoteCallBuild(env, remote);
+ requestByType.put(operation.getClass(), operation);
+ }
+ return requestByType;
+ }
+
+ protected <T extends RemoteOperation> List<T> fetchType(
+ final ArrayListMultimap<Class<?>, RemoteOperation> requestByType, final Class<T> type) {
+ return (List<T>)requestByType.removeAll(type);
+ }
+
+ // ============================================================================================
+ // Timeout Helpers
+ // ============================================================================================
+ private final class TimeoutExecutorThread extends Thread {
+ private final DelayQueue<DelayedWithTimeout> queue = new DelayQueue<DelayedWithTimeout>();
+
+ public TimeoutExecutorThread() {
+ super("ProcedureDispatcherTimeoutThread");
+ }
+
+ @Override
+ public void run() {
+ while (running.get()) {
+ final DelayedWithTimeout task = DelayedUtil.takeWithoutInterrupt(queue);
+ if (task == null || task == DelayedUtil.DELAYED_POISON) {
+ // the executor may be shutting down, and the task is just the shutdown request
+ continue;
+ }
+
+ if (task instanceof DelayedTask) {
+ threadPool.execute(((DelayedTask)task).getObject());
+ } else {
+ ((BufferNode)task).dispatch();
+ }
+ }
+ }
+
+ public void add(final DelayedWithTimeout delayed) {
+ queue.add(delayed);
+ }
+
+ public void remove(final DelayedWithTimeout delayed) {
+ queue.remove(delayed);
+ }
+
+ public void sendStopSignal() {
+ queue.add(DelayedUtil.DELAYED_POISON);
+ }
+
+ public void awaitTermination() {
+ try {
+ final long startTime = EnvironmentEdgeManager.currentTime();
+ for (int i = 0; isAlive(); ++i) {
+ sendStopSignal();
+ join(250);
+ if (i > 0 && (i % 8) == 0) {
+ LOG.warn("Waiting termination of thread " + getName() + ", " +
+ StringUtils.humanTimeDiff(EnvironmentEdgeManager.currentTime() - startTime));
+ }
+ }
+ } catch (InterruptedException e) {
+ LOG.warn(getName() + " join wait got interrupted", e);
+ }
+ }
+ }
+
+ // ============================================================================================
+ // Internals Helpers
+ // ============================================================================================
+
+ /**
+ * Node that contains a set of RemoteProcedures
+ */
+ protected final class BufferNode extends DelayedContainerWithTimestamp<TRemote>
+ implements RemoteNode<TEnv, TRemote> {
+ private Set<RemoteProcedure> operations;
+
+ protected BufferNode(final TRemote key) {
+ super(key, 0);
+ }
+
+ public TRemote getKey() {
+ return getObject();
+ }
+
+ public synchronized void add(final RemoteProcedure operation) {
+ if (this.operations == null) {
+ this.operations = new HashSet<>();
+ setTimeout(EnvironmentEdgeManager.currentTime() + operationDelay);
+ timeoutExecutor.add(this);
+ }
+ this.operations.add(operation);
+ if (this.operations.size() > queueMaxSize) {
+ timeoutExecutor.remove(this);
+ dispatch();
+ }
+ }
+
+ public synchronized void dispatch() {
+ if (operations != null) {
+ remoteDispatch(getKey(), operations);
+ this.operations = null;
+ }
+ }
+
+ public synchronized void abortOperationsInQueue() {
+ if (operations != null) {
+ abortPendingOperations(getKey(), operations);
+ this.operations = null;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return super.toString() + ", operations=" + this.operations;
+ }
+ }
+
+ /**
+ * Delayed object that holds a FutureTask; used to submit something later to the thread-pool.
+ */
+ private static final class DelayedTask extends DelayedContainerWithTimestamp<FutureTask<Void>> {
+ public DelayedTask(final FutureTask<Void> task, final long delay, final TimeUnit unit) {
+ super(task, EnvironmentEdgeManager.currentTime() + unit.toMillis(delay));
+ }
+ };
+}
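The dispatcher above batches per-server work: the first operation added to a BufferNode arms a flush deadline (operationDelay), and exceeding queueMaxSize forces an immediate dispatch. The following is a minimal standalone sketch of that batching policy in plain Java, not HBase code; `BatchBuffer` and its method names are hypothetical, and the size check is simplified to a `>=` threshold.

```java
import java.util.ArrayList;
import java.util.List;

// Standalone sketch of the buffer-then-dispatch idea behind BufferNode:
// the first add arms a flush deadline; reaching the size threshold flushes now.
class BatchBuffer<T> {
  private final int maxSize;
  private final long delayMillis;
  private List<T> items = new ArrayList<>();
  private long deadlineMillis = Long.MAX_VALUE;

  BatchBuffer(int maxSize, long delayMillis) {
    this.maxSize = maxSize;
    this.delayMillis = delayMillis;
  }

  /** @return true if the size threshold was reached and the batch should flush now. */
  synchronized boolean add(T item, long nowMillis) {
    if (items.isEmpty()) {
      deadlineMillis = nowMillis + delayMillis; // arm the timer on the first buffered item
    }
    items.add(item);
    return items.size() >= maxSize;
  }

  /** @return true if the delay elapsed while items are still buffered. */
  synchronized boolean isDue(long nowMillis) {
    return !items.isEmpty() && nowMillis >= deadlineMillis;
  }

  /** Hand back the buffered batch and reset, loosely mirroring BufferNode.dispatch(). */
  synchronized List<T> drain() {
    List<T> out = items;
    items = new ArrayList<>();
    deadlineMillis = Long.MAX_VALUE;
    return out;
  }
}
```

In the real dispatcher the deadline is serviced by the TimeoutExecutorThread's DelayQueue rather than by polling `isDue`.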
http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java
index 5c3a4c7..ea2a41f 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java
@@ -108,6 +108,9 @@ public abstract class StateMachineProcedure<TEnvironment, TState>
if (aborted.get() && isRollbackSupported(getCurrentState())) {
setAbortFailure(getClass().getSimpleName(), "abort requested");
} else {
+ if (aborted.get()) {
+ LOG.warn("ignoring abort request " + state);
+ }
setNextState(getStateId(state));
}
}
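The hunk above changes abort handling in StateMachineProcedure: an abort request is honored only when the current state supports rollback; otherwise it is logged and the state machine keeps advancing. A hedged sketch of that decision, with hypothetical names:

```java
// Illustrative sketch (not HBase code) of the abort decision in the hunk above.
class AbortPolicy {
  static String onStateTransition(boolean abortRequested, boolean rollbackSupported) {
    if (abortRequested && rollbackSupported) {
      return "ABORT";         // setAbortFailure path: roll the procedure back
    }
    if (abortRequested) {
      return "IGNORED_ABORT"; // warn and keep driving to the next state
    }
    return "CONTINUE";        // normal transition
  }
}
```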
http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/DelayedUtil.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/DelayedUtil.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/DelayedUtil.java
index ea34c49..3719019 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/DelayedUtil.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/DelayedUtil.java
@@ -32,13 +32,19 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
public final class DelayedUtil {
private DelayedUtil() { }
+ /**
+ * A Delayed that also exposes its timeout.
+ */
public interface DelayedWithTimeout extends Delayed {
- long getTimeoutTimestamp();
+ long getTimeout();
}
+ /**
+ * POISON implementation; used to mark special state: e.g. shutdown.
+ */
+ public static final DelayedWithTimeout DELAYED_POISON = new DelayedWithTimeout() {
@Override
- public long getTimeoutTimestamp() {
+ public long getTimeout() {
return 0;
}
@@ -49,7 +55,7 @@ public final class DelayedUtil {
@Override
public int compareTo(final Delayed o) {
- return Long.compare(0, DelayedUtil.getTimeoutTimestamp(o));
+ return Long.compare(0, DelayedUtil.getTimeout(o));
}
@Override
@@ -59,10 +65,13 @@ public final class DelayedUtil {
@Override
public String toString() {
- return getClass().getSimpleName() + "(POISON)";
+ return "POISON";
}
};
+ /**
+ * @return null (if an interrupt) or an instance of E; resets interrupt on calling thread.
+ */
 public static <E extends Delayed> E takeWithoutInterrupt(final DelayQueue<E> queue) {
try {
return queue.take();
@@ -72,33 +81,42 @@ public final class DelayedUtil {
}
}
- public static long getRemainingTime(final TimeUnit resultUnit, final long timeoutTime) {
+ /**
+ * @return Time remaining, expressed in resultUnit.
+ */
+ public static long getRemainingTime(final TimeUnit resultUnit, final long timeout) {
final long currentTime = EnvironmentEdgeManager.currentTime();
- if (currentTime >= timeoutTime) {
+ if (currentTime >= timeout) {
return 0;
}
- return resultUnit.convert(timeoutTime - currentTime, TimeUnit.MILLISECONDS);
+ return resultUnit.convert(timeout - currentTime, TimeUnit.MILLISECONDS);
}
public static int compareDelayed(final Delayed o1, final Delayed o2) {
- return Long.compare(getTimeoutTimestamp(o1), getTimeoutTimestamp(o2));
+ return Long.compare(getTimeout(o1), getTimeout(o2));
}
- private static long getTimeoutTimestamp(final Delayed o) {
+ private static long getTimeout(final Delayed o) {
 assert o instanceof DelayedWithTimeout : "expected DelayedWithTimeout instance, got " + o;
- return ((DelayedWithTimeout)o).getTimeoutTimestamp();
+ return ((DelayedWithTimeout)o).getTimeout();
}
public static abstract class DelayedObject implements DelayedWithTimeout {
@Override
public long getDelay(final TimeUnit unit) {
- return DelayedUtil.getRemainingTime(unit, getTimeoutTimestamp());
+ return DelayedUtil.getRemainingTime(unit, getTimeout());
}
@Override
public int compareTo(final Delayed other) {
return DelayedUtil.compareDelayed(this, other);
}
+
+ @Override
+ public String toString() {
+ long timeout = getTimeout();
+ return "timeout=" + timeout + ", delay=" + getDelay(TimeUnit.MILLISECONDS);
+ }
}
public static abstract class DelayedContainer<T> extends DelayedObject {
@@ -126,25 +144,25 @@ public final class DelayedUtil {
@Override
public String toString() {
- return getClass().getSimpleName() + "(" + getObject() + ")";
+ return "containedObject=" + getObject() + ", " + super.toString();
}
}
 public static class DelayedContainerWithTimestamp<T> extends DelayedContainer<T> {
- private long timeoutTimestamp;
+ private long timeout;
- public DelayedContainerWithTimestamp(final T object, final long timeoutTimestamp) {
+ public DelayedContainerWithTimestamp(final T object, final long timeout) {
super(object);
- setTimeoutTimestamp(timeoutTimestamp);
+ setTimeout(timeout);
}
@Override
- public long getTimeoutTimestamp() {
- return timeoutTimestamp;
+ public long getTimeout() {
+ return timeout;
}
- public void setTimeoutTimestamp(final long timeoutTimestamp) {
- this.timeoutTimestamp = timeoutTimestamp;
+ public void setTimeout(final long timeout) {
+ this.timeout = timeout;
}
}
-}
+}
\ No newline at end of file
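The renamed accessors above feed getRemainingTime, which computes a clamped, unit-converted delay: a deadline already in the past yields 0 rather than a negative delay. A standalone sketch of that computation (`RemainingTime` is an illustrative name, not HBase code):

```java
import java.util.concurrent.TimeUnit;

// Sketch of the clamped remaining-time computation in DelayedUtil.getRemainingTime.
class RemainingTime {
  static long remaining(TimeUnit resultUnit, long deadlineMillis, long nowMillis) {
    if (nowMillis >= deadlineMillis) {
      return 0; // deadline passed: report no time left instead of a negative value
    }
    // convert the millisecond difference into the caller's requested unit
    return resultUnit.convert(deadlineMillis - nowMillis, TimeUnit.MILLISECONDS);
  }
}
```

This clamping is what lets DelayQueue treat expired entries as immediately available via `getDelay`.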
http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
index 226666f..0240465 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
@@ -18,12 +18,16 @@
package org.apache.hadoop.hbase.procedure2;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.util.concurrent.Callable;
import java.util.ArrayList;
import java.util.Set;
+import java.util.concurrent.Callable;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -35,18 +39,14 @@ import org.apache.hadoop.hbase.ProcedureInfo;
import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.io.util.StreamUtils;
+import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
 import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
-import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
import org.apache.hadoop.hbase.util.NonceKey;
import org.apache.hadoop.hbase.util.Threads;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
public class ProcedureTestingUtility {
 private static final Log LOG = LogFactory.getLog(ProcedureTestingUtility.class);
http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/util/TestDelayedUtil.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/util/TestDelayedUtil.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/util/TestDelayedUtil.java
index a2cd70f..019b456 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/util/TestDelayedUtil.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/util/TestDelayedUtil.java
@@ -80,7 +80,7 @@ public class TestDelayedUtil {
}
@Override
- public long getTimeoutTimestamp() {
+ public long getTimeout() {
return 0;
}
}