Repository: hbase Updated Branches: refs/heads/master 8fb9a91d4 -> 992e5717d
http://git-wip-us.apache.org/repos/asf/hbase/blob/992e5717/hbase-protocol-shaded/src/main/protobuf/Master.proto ---------------------------------------------------------------------- diff --git a/hbase-protocol-shaded/src/main/protobuf/Master.proto b/hbase-protocol-shaded/src/main/protobuf/Master.proto index f4e7da6..0a000ee 100644 --- a/hbase-protocol-shaded/src/main/protobuf/Master.proto +++ b/hbase-protocol-shaded/src/main/protobuf/Master.proto @@ -568,6 +568,27 @@ message SecurityCapabilitiesResponse { repeated Capability capabilities = 1; } +message ListDrainingRegionServersRequest { +} + +message ListDrainingRegionServersResponse { + repeated ServerName server_name = 1; +} + +message DrainRegionServersRequest { + repeated ServerName server_name = 1; +} + +message DrainRegionServersResponse { +} + +message RemoveDrainFromRegionServersRequest { + repeated ServerName server_name = 1; +} + +message RemoveDrainFromRegionServersResponse { +} + service MasterService { /** Used by the client to get the number of regions that have received the updated schema */ rpc GetSchemaAlterStatus(GetSchemaAlterStatusRequest) @@ -863,4 +884,16 @@ service MasterService { /** Disable a replication peer */ rpc DisableReplicationPeer(DisableReplicationPeerRequest) returns(DisableReplicationPeerResponse); + + /** Returns a list of ServerNames marked as draining. */ + rpc listDrainingRegionServers(ListDrainingRegionServersRequest) + returns(ListDrainingRegionServersResponse); + + /** Mark a list of ServerNames as draining. */ + rpc drainRegionServers(DrainRegionServersRequest) + returns(DrainRegionServersResponse); + + /** Unmark a list of ServerNames marked as draining. */ + rpc removeDrainFromRegionServers(RemoveDrainFromRegionServersRequest) + returns(RemoveDrainFromRegionServersResponse); } http://git-wip-us.apache.org/repos/asf/hbase/blob/992e5717/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 6b135d9..613c5c1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -3196,4 +3196,54 @@ public class HMaster extends HRegionServer implements MasterServices { cpHost.postDisableReplicationPeer(peerId); } } + + @Override + public void drainRegionServer(final ServerName server) { + String parentZnode = getZooKeeper().znodePaths.drainingZNode; + try { + String node = ZKUtil.joinZNode(parentZnode, server.getServerName()); + ZKUtil.createAndFailSilent(getZooKeeper(), node); + } catch (KeeperException ke) { + LOG.warn(this.zooKeeper.prefix("Unable to add drain for '" + server.getServerName() + "'."), + ke); + } + } + + @Override + public List<ServerName> listDrainingRegionServers() { + String parentZnode = getZooKeeper().znodePaths.drainingZNode; + List<ServerName> serverNames = new ArrayList<ServerName>(); + List<String> serverStrs = null; + try { + serverStrs = ZKUtil.listChildrenNoWatch(getZooKeeper(), parentZnode); + } catch (KeeperException ke) { + LOG.warn(this.zooKeeper.prefix("Unable to list draining servers."), ke); + } + // No nodes is empty draining list or ZK connectivity issues. + if (serverStrs == null) { + return serverNames; + } + + // Skip invalid ServerNames in result + for (String serverStr : serverStrs) { + try { + serverNames.add(ServerName.parseServerName(serverStr)); + } catch (IllegalArgumentException iae) { + LOG.warn("Unable to cast '" + serverStr + "' to ServerName.", iae); + } + } + return serverNames; + } + + @Override + public void removeDrainFromRegionServer(ServerName server) { + String parentZnode = getZooKeeper().znodePaths.drainingZNode; + String node = ZKUtil.joinZNode(parentZnode, server.getServerName()); + try { + ZKUtil.deleteNodeFailSilent(getZooKeeper(), node); + } catch (KeeperException ke) { + LOG.warn( + this.zooKeeper.prefix("Unable to remove drain for '" + server.getServerName() + "'."), ke); + } + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/992e5717/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java index 8ee72c6..76da838 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java @@ -1693,4 +1693,55 @@ public class MasterRpcServices extends RSRpcServices throw new ServiceException(e); } } + + @Override + public ListDrainingRegionServersResponse listDrainingRegionServers(RpcController controller, + ListDrainingRegionServersRequest request) throws ServiceException { + ListDrainingRegionServersResponse.Builder response = + ListDrainingRegionServersResponse.newBuilder(); + try { + master.checkInitialized(); + List<ServerName> servers = master.listDrainingRegionServers(); + for (ServerName server : servers) { + response.addServerName(ProtobufUtil.toServerName(server)); + } + } catch (IOException io) { + throw new ServiceException(io); + } + + return response.build(); + } + + @Override + public DrainRegionServersResponse drainRegionServers(RpcController controller, + DrainRegionServersRequest request) throws ServiceException { + DrainRegionServersResponse.Builder response = DrainRegionServersResponse.newBuilder(); + try { + master.checkInitialized(); + for (HBaseProtos.ServerName pbServer : request.getServerNameList()) { + master.drainRegionServer(ProtobufUtil.toServerName(pbServer)); + } + } catch (IOException io) { + throw new ServiceException(io); + } + + return response.build(); + } + + @Override + public RemoveDrainFromRegionServersResponse removeDrainFromRegionServers(RpcController controller, + RemoveDrainFromRegionServersRequest request) throws ServiceException { + RemoveDrainFromRegionServersResponse.Builder response = + RemoveDrainFromRegionServersResponse.newBuilder(); + try { + master.checkInitialized(); + for (HBaseProtos.ServerName pbServer : request.getServerNameList()) { + master.removeDrainFromRegionServer(ProtobufUtil.toServerName(pbServer)); + } + } catch (IOException io) { + throw new ServiceException(io); + } + + return response.build(); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/992e5717/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java index a7395bb..869e7ac 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.ProcedureInfo; import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableDescriptors; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotDisabledException; @@ -443,4 +444,23 @@ public interface MasterServices extends Server { * @param peerId a short name that identifies the peer */ void disableReplicationPeer(String peerId) throws ReplicationException, IOException; + + /** + * Mark a region server as draining to prevent additional regions from getting assigned to it. + * @param server Region servers to drain. + */ + void drainRegionServer(final ServerName server); + + /** + * List region servers marked as draining to not get additional regions assigned to them. + * @return List of draining servers. + */ + List<ServerName> listDrainingRegionServers(); + + /** + * Remove drain from a region server to allow additional regions assignments. + * @param server Region server to remove drain from. + */ + void removeDrainFromRegionServer(final ServerName server); + } http://git-wip-us.apache.org/repos/asf/hbase/blob/992e5717/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java index b1cf1d2..62fde74 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java @@ -24,8 +24,14 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; @@ -758,4 +764,75 @@ public class TestAdmin2 { ProcedureInfo[] procList = admin.listProcedures(); assertTrue(procList.length >= 0); } + + /* + * Test that invalid draining server names (invalid start code) don't get added to drain list. + */ + @Test(timeout = 10000, expected = IllegalArgumentException.class) + public void testCheckDrainServerName() throws Exception { + List<ServerName> servers = new ArrayList<ServerName>(); + servers.add(ServerName.parseServerName("127.0.0.1:123")); + admin.drainRegionServers(servers); + } + + /* + * This test drains all regions so cannot be run in parallel with other tests. + */ + @Test(timeout = 30000) + public void testDrainRegionServers() throws Exception { + List<ServerName> drainingServers = admin.listDrainingRegionServers(); + assertTrue(drainingServers.isEmpty()); + + // Drain all region servers. + Collection<ServerName> clusterServers = admin.getClusterStatus().getServers(); + drainingServers = new ArrayList<ServerName>(); + for (ServerName server : clusterServers) { + drainingServers.add(server); + } + admin.drainRegionServers(drainingServers); + + // Check that drain lists all region servers. + drainingServers = admin.listDrainingRegionServers(); + assertEquals(clusterServers.size(), drainingServers.size()); + for (ServerName server : clusterServers) { + assertTrue(drainingServers.contains(server)); + } + + // Try for 20 seconds to create table (new region). Will not complete because all RSs draining. + TableName hTable = TableName.valueOf("testDrainRegionServer"); + final HTableDescriptor htd = new HTableDescriptor(hTable); + htd.addFamily(new HColumnDescriptor("cf")); + + final Runnable createTable = new Thread() { + @Override + public void run() { + try { + admin.createTable(htd); + } catch (IOException ioe) { + assertTrue(false); // Should not get IOException. + } + } + }; + + final ExecutorService executor = Executors.newSingleThreadExecutor(); + final java.util.concurrent.Future<?> future = executor.submit(createTable); + executor.shutdown(); + try { + future.get(20, TimeUnit.SECONDS); + } catch (TimeoutException ie) { + assertTrue(true); // Expecting timeout to happen. + } + + // Kill executor if still processing. + if (!executor.isTerminated()) { + executor.shutdownNow(); + assertTrue(true); + } + + // Remove drain list. + admin.removeDrainFromRegionServers(drainingServers); + drainingServers = admin.listDrainingRegionServers(); + assertTrue(drainingServers.isEmpty()); + + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/992e5717/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java index 4e85d29..2a5be12 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java @@ -399,4 +399,19 @@ public class MockNoopMasterServices implements MasterServices, Server { @Override public void disableReplicationPeer(String peerId) throws ReplicationException, IOException { } + + @Override + public void drainRegionServer(ServerName server) { + return; + } + + @Override + public List<ServerName> listDrainingRegionServers() { + return null; + } + + @Override + public void removeDrainFromRegionServer(ServerName servers) { + return; + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/992e5717/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperACL.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperACL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperACL.java index 7326327..485c1f5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperACL.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperACL.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue; import java.io.File; import java.io.FileWriter; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import javax.security.auth.login.AppConfigurationEntry; @@ -319,5 +320,25 @@ public class TestZooKeeperACL { } } + @Test(timeout = 10000) + public void testAdminDrainAllowedOnSecureZK() throws Exception { + if (!secureZKAvailable) { + return; + } + List<ServerName> drainingServers = new ArrayList<ServerName>(); + drainingServers.add(ServerName.parseServerName("ZZZ,123,123")); + + // If unable to connect to secure ZK cluster then this operation would fail. + TEST_UTIL.getAdmin().drainRegionServers(drainingServers); + + drainingServers = TEST_UTIL.getAdmin().listDrainingRegionServers(); + assertEquals(1, drainingServers.size()); + assertEquals(ServerName.parseServerName("ZZZ,123,123"), drainingServers.get(0)); + + TEST_UTIL.getAdmin().removeDrainFromRegionServers(drainingServers); + drainingServers = TEST_UTIL.getAdmin().listDrainingRegionServers(); + assertEquals(0, drainingServers.size()); + } + }