Repository: hbase
Updated Branches:
  refs/heads/master 37b29e909 -> 456057ef9


HBASE-18846 Accommodate the hbase-indexer/lily/SEP consumer deploy-type

Patch to start a standalone RegionServer that registers itself and
optionally stands up Services. Can work w/o a Master in the mix.
Useful for testing. Also can be used by hbase-indexer to put up a
Replication sink that extends public-facing APIs w/o need to extend
internals. See JIRA release note for detail.

This patch adds booleans for whether to start Admin and Client Service.
Other refactoring moves all thread and service start into the one fat
location so we can ask to bypass 'services' if we don't need them.
See JIRA for an example hbase-server.xml that has config to shutdown
WAL, cache, etc.

Adds checks if a service/thread has been setup before going to use it.

Renames the ExecutorService in HRegionServer from service to
executorService.

See JIRA too for example Connection implementation that makes use of
Connection plugin point to receive a replication stream. The default
replication sink catches the incoming replication stream, undoes the
WALEdits and then creates a Table to call a batch with the
edits; up on JIRA, an example Connection plugin (legit, supported)
returns a Table with an overridden batch method wherein we do index
inserts, returning appropriate results to keep the replication engine
ticking over.

Upsides: an unadulterated RegionServer that will keep replication metrics
and even hosts a web UI if wanted. No hacks. Just ordained configs
shutting down unused services. Injection of the indexing function at a
blessed point with no pollution by hbase internals; only public imports.
No use of Private or LimitedPrivate classes.


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/456057ef
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/456057ef
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/456057ef

Branch: refs/heads/master
Commit: 456057ef90f152315a7f244141f3fca4ff748336
Parents: 37b29e9
Author: Michael Stack <[email protected]>
Authored: Tue Sep 26 22:27:58 2017 -0700
Committer: Michael Stack <[email protected]>
Committed: Mon Oct 23 21:16:13 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/master/HMaster.java | 114 +--
 .../hbase/regionserver/HRegionServer.java       | 705 ++++++++++---------
 .../hbase/regionserver/RSRpcServices.java       |  70 +-
 .../regionserver/TestRegionServerNoMaster.java  |   2 +-
 4 files changed, 489 insertions(+), 402 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/456057ef/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index cbb1537..8f2ae6b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -1,4 +1,4 @@
-/**
+/*
  *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -99,7 +99,6 @@ import 
org.apache.hadoop.hbase.master.assignment.AssignmentManager;
 import org.apache.hadoop.hbase.master.assignment.MergeTableRegionsProcedure;
 import org.apache.hadoop.hbase.master.assignment.RegionStates;
 import org.apache.hadoop.hbase.master.assignment.RegionStates.RegionStateNode;
-import org.apache.hadoop.hbase.master.assignment.RegionStates.ServerStateNode;
 import org.apache.hadoop.hbase.master.balancer.BalancerChore;
 import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
 import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore;
@@ -472,66 +471,73 @@ public class HMaster extends HRegionServer implements 
MasterServices {
   public HMaster(final Configuration conf, CoordinatedStateManager csm)
       throws IOException, KeeperException {
     super(conf, csm);
-    this.rsFatals = new MemoryBoundedLogMessageBuffer(
-      conf.getLong("hbase.master.buffer.for.rs.fatals", 1*1024*1024));
+    try {
+      this.rsFatals = new MemoryBoundedLogMessageBuffer(
+          conf.getLong("hbase.master.buffer.for.rs.fatals", 1 * 1024 * 1024));
 
-    LOG.info("hbase.rootdir=" + getRootDir() +
-      ", hbase.cluster.distributed=" + 
this.conf.getBoolean(HConstants.CLUSTER_DISTRIBUTED, false));
+      LOG.info("hbase.rootdir=" + getRootDir() +
+          ", hbase.cluster.distributed=" + 
this.conf.getBoolean(HConstants.CLUSTER_DISTRIBUTED, false));
 
-    // Disable usage of meta replicas in the master
-    this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);
+      // Disable usage of meta replicas in the master
+      this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);
 
-    Replication.decorateMasterConfiguration(this.conf);
+      Replication.decorateMasterConfiguration(this.conf);
 
-    // Hack! Maps DFSClient => Master for logs.  HDFS made this
-    // config param for task trackers, but we can piggyback off of it.
-    if (this.conf.get("mapreduce.task.attempt.id") == null) {
-      this.conf.set("mapreduce.task.attempt.id", "hb_m_" + 
this.serverName.toString());
-    }
+      // Hack! Maps DFSClient => Master for logs.  HDFS made this
+      // config param for task trackers, but we can piggyback off of it.
+      if (this.conf.get("mapreduce.task.attempt.id") == null) {
+        this.conf.set("mapreduce.task.attempt.id", "hb_m_" + 
this.serverName.toString());
+      }
 
-    // should we check the compression codec type at master side, default 
true, HBASE-6370
-    this.masterCheckCompression = 
conf.getBoolean("hbase.master.check.compression", true);
+      // should we check the compression codec type at master side, default 
true, HBASE-6370
+      this.masterCheckCompression = 
conf.getBoolean("hbase.master.check.compression", true);
 
-    // should we check encryption settings at master side, default true
-    this.masterCheckEncryption = 
conf.getBoolean("hbase.master.check.encryption", true);
+      // should we check encryption settings at master side, default true
+      this.masterCheckEncryption = 
conf.getBoolean("hbase.master.check.encryption", true);
 
-    this.metricsMaster = new MetricsMaster(new MetricsMasterWrapperImpl(this));
+      this.metricsMaster = new MetricsMaster(new 
MetricsMasterWrapperImpl(this));
 
-    // preload table descriptor at startup
-    this.preLoadTableDescriptors = 
conf.getBoolean("hbase.master.preload.tabledescriptors", true);
+      // preload table descriptor at startup
+      this.preLoadTableDescriptors = 
conf.getBoolean("hbase.master.preload.tabledescriptors", true);
 
-    this.maxBlancingTime = getMaxBalancingTime();
-    this.maxRitPercent = 
conf.getDouble(HConstants.HBASE_MASTER_BALANCER_MAX_RIT_PERCENT,
-      HConstants.DEFAULT_HBASE_MASTER_BALANCER_MAX_RIT_PERCENT);
+      this.maxBlancingTime = getMaxBalancingTime();
+      this.maxRitPercent = 
conf.getDouble(HConstants.HBASE_MASTER_BALANCER_MAX_RIT_PERCENT,
+          HConstants.DEFAULT_HBASE_MASTER_BALANCER_MAX_RIT_PERCENT);
 
-    // Do we publish the status?
+      // Do we publish the status?
 
-    boolean shouldPublish = conf.getBoolean(HConstants.STATUS_PUBLISHED,
-        HConstants.STATUS_PUBLISHED_DEFAULT);
-    Class<? extends ClusterStatusPublisher.Publisher> publisherClass =
-        conf.getClass(ClusterStatusPublisher.STATUS_PUBLISHER_CLASS,
-            ClusterStatusPublisher.DEFAULT_STATUS_PUBLISHER_CLASS,
-            ClusterStatusPublisher.Publisher.class);
+      boolean shouldPublish = conf.getBoolean(HConstants.STATUS_PUBLISHED,
+          HConstants.STATUS_PUBLISHED_DEFAULT);
+      Class<? extends ClusterStatusPublisher.Publisher> publisherClass =
+          conf.getClass(ClusterStatusPublisher.STATUS_PUBLISHER_CLASS,
+              ClusterStatusPublisher.DEFAULT_STATUS_PUBLISHER_CLASS,
+              ClusterStatusPublisher.Publisher.class);
 
-    if (shouldPublish) {
-      if (publisherClass == null) {
-        LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " +
-            ClusterStatusPublisher.DEFAULT_STATUS_PUBLISHER_CLASS +
-            " is not set - not publishing status");
-      } else {
-        clusterStatusPublisherChore = new ClusterStatusPublisher(this, conf, 
publisherClass);
-        getChoreService().scheduleChore(clusterStatusPublisherChore);
+      if (shouldPublish) {
+        if (publisherClass == null) {
+          LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " +
+              ClusterStatusPublisher.DEFAULT_STATUS_PUBLISHER_CLASS +
+              " is not set - not publishing status");
+        } else {
+          clusterStatusPublisherChore = new ClusterStatusPublisher(this, conf, 
publisherClass);
+          getChoreService().scheduleChore(clusterStatusPublisherChore);
+        }
       }
-    }
 
-    // Some unit tests don't need a cluster, so no zookeeper at all
-    if (!conf.getBoolean("hbase.testing.nocluster", false)) {
-      setInitLatch(new CountDownLatch(1));
-      activeMasterManager = new ActiveMasterManager(zooKeeper, 
this.serverName, this);
-      int infoPort = putUpJettyServer();
-      startActiveMasterManager(infoPort);
-    } else {
-      activeMasterManager = null;
+      // Some unit tests don't need a cluster, so no zookeeper at all
+      if (!conf.getBoolean("hbase.testing.nocluster", false)) {
+        setInitLatch(new CountDownLatch(1));
+        activeMasterManager = new ActiveMasterManager(zooKeeper, 
this.serverName, this);
+        int infoPort = putUpJettyServer();
+        startActiveMasterManager(infoPort);
+      } else {
+        activeMasterManager = null;
+      }
+    } catch (Throwable t) {
+      // Make sure we log the exception. HMaster is often started via 
reflection and the
+      // cause of failed startup is lost.
+      LOG.error("Failed construction of Master", t);
+      throw t;
     }
   }
 
@@ -1100,15 +1106,15 @@ public class HMaster extends HRegionServer implements 
MasterServices {
    */
   private void startServiceThreads() throws IOException{
    // Start the executor service pools
-   this.service.startExecutorService(ExecutorType.MASTER_OPEN_REGION,
+   this.executorService.startExecutorService(ExecutorType.MASTER_OPEN_REGION,
       conf.getInt("hbase.master.executor.openregion.threads", 5));
-   this.service.startExecutorService(ExecutorType.MASTER_CLOSE_REGION,
+   this.executorService.startExecutorService(ExecutorType.MASTER_CLOSE_REGION,
       conf.getInt("hbase.master.executor.closeregion.threads", 5));
-   this.service.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS,
+   
this.executorService.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS,
       conf.getInt("hbase.master.executor.serverops.threads", 5));
-   
this.service.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS,
+   
this.executorService.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS,
       conf.getInt("hbase.master.executor.meta.serverops.threads", 5));
-   this.service.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS,
+   this.executorService.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS,
       conf.getInt("hbase.master.executor.logreplayops.threads", 10));
 
    // We depend on there being only one instance of this executor running
@@ -1116,7 +1122,7 @@ public class HMaster extends HRegionServer implements 
MasterServices {
    // tables.
    // Any time changing this maxThreads to > 1, pls see the comment at
    // AccessController#postCompletedCreateTableAction
-   this.service.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
+   
this.executorService.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 
1);
    startProcedureExecutor();
 
    // Start log cleaner thread

http://git-wip-us.apache.org/repos/asf/hbase/blob/456057ef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 2c0bd03..7ed5360 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1,5 +1,4 @@
 /*
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -234,16 +233,10 @@ public class HRegionServer extends HasThread implements
   /**
    * For testing only!  Set to true to skip notifying region assignment to 
master .
    */
+  @VisibleForTesting
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_SHOULD_BE_FINAL")
   public static boolean TEST_SKIP_REPORTING_TRANSITION = false;
 
-  /*
-   * Strings to be used in forming the exception message for
-   * RegionsAlreadyInTransitionException.
-   */
-  protected static final String OPEN = "OPEN";
-  protected static final String CLOSE = "CLOSE";
-
   //RegionName vs current action in progress
   //true - if open region action in progress
   //false - if close region action in progress
@@ -315,8 +308,8 @@ public class HRegionServer extends HasThread implements
   // Leases
   protected Leases leases;
 
-  // Instance of the hbase executor service.
-  protected ExecutorService service;
+  // Instance of the hbase executor executorService.
+  protected ExecutorService executorService;
 
   // If false, the file system has become unavailable
   protected volatile boolean fsOk;
@@ -380,7 +373,7 @@ public class HRegionServer extends HasThread implements
   /**
    * ChoreService used to schedule tasks that we want to run periodically
    */
-  private final ChoreService choreService;
+  private ChoreService choreService;
 
   /*
    * Check for compactions requests.
@@ -396,7 +389,7 @@ public class HRegionServer extends HasThread implements
 
   // WAL roller. log is protected rather than private to avoid
   // eclipse warning when accessed by inner classes
-  protected final LogRoller walRoller;
+  protected LogRoller walRoller;
 
   // flag set after we're done setting up server threads
   final AtomicBoolean online = new AtomicBoolean(false);
@@ -533,6 +526,15 @@ public class HRegionServer extends HasThread implements
   private final NettyEventLoopGroupConfig eventLoopGroupConfig;
 
   /**
+   * True if this RegionServer is coming up in a cluster where there is no 
Master;
+   * means it needs to just come up and make do without a Master to talk to: 
e.g. in test or
+   * HRegionServer is doing other than its usual duties: e.g. as an 
hollowed-out host whose only
+   * purpose is as a Replication-stream sink; see HBASE-18846 for more.
+   */
+  private final boolean masterless;
+  static final String MASTERLESS_CONFIG_NAME = "hbase.masterless";
+
+  /**
    * Starts a HRegionServer at the default location.
    */
   public HRegionServer(Configuration conf) throws IOException, 
InterruptedException {
@@ -541,145 +543,156 @@ public class HRegionServer extends HasThread implements
 
   /**
    * Starts a HRegionServer at the default location
+   *
    * @param csm implementation of CoordinatedStateManager to be used
    */
+  // Don't start any services or managers in here in the Consructor.
+  // Defer till after we register with the Master as much as possible. See 
#startServices.
   public HRegionServer(Configuration conf, CoordinatedStateManager csm) throws 
IOException {
     super("RegionServer");  // thread name
-    this.startcode = System.currentTimeMillis();
-    this.fsOk = true;
-    this.conf = conf;
-    // initialize netty event loop group at the very beginning as we may use 
it to start rpc server,
-    // rpc client and WAL.
-    this.eventLoopGroupConfig = new NettyEventLoopGroupConfig(conf, 
"RS-EventLoopGroup");
-    NettyRpcClientConfigHelper.setEventLoopConfig(conf, 
eventLoopGroupConfig.group(),
-      eventLoopGroupConfig.clientChannelClass());
-    NettyAsyncFSWALConfigHelper.setEventLoopConfig(conf, 
eventLoopGroupConfig.group(),
-      eventLoopGroupConfig.clientChannelClass());
-    MemorySizeUtil.checkForClusterFreeHeapMemoryLimit(this.conf);
-    HFile.checkHFileVersion(this.conf);
-    checkCodecs(this.conf);
-    this.userProvider = UserProvider.instantiate(conf);
-    FSUtils.setupShortCircuitRead(this.conf);
-
-    Replication.decorateRegionServerConfiguration(this.conf);
-
-    // Disable usage of meta replicas in the regionserver
-    this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);
-    // Config'ed params
-    this.numRetries = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
-        HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
-    this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 
10 * 1000);
-    this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);
-
-    this.sleeper = new Sleeper(this.msgInterval, this);
-
-    boolean isNoncesEnabled = 
conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true);
-    this.nonceManager = isNoncesEnabled ? new ServerNonceManager(this.conf) : 
null;
-
-    this.numRegionsToReport = conf.getInt(
-      "hbase.regionserver.numregionstoreport", 10);
-
-    this.operationTimeout = conf.getInt(
-      HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
-      HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
-
-    this.shortOperationTimeout = conf.getInt(
-      HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
-      HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);
-
-    this.abortRequested = false;
-    this.stopped = false;
-
-    rpcServices = createRpcServices();
-    if (this instanceof HMaster) {
-      useThisHostnameInstead = conf.get(MASTER_HOSTNAME_KEY);
-    } else {
-      useThisHostnameInstead = conf.get(RS_HOSTNAME_KEY);
-      if (conf.getBoolean(RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY, false)) {
-        if (shouldUseThisHostnameInstead()) {
-          String msg = RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " and " + 
RS_HOSTNAME_KEY +
-            " are mutually exclusive. Do not set " + 
RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY +
-            " to true while " + RS_HOSTNAME_KEY + " is used";
-          throw new IOException(msg);
-        } else {
-          useThisHostnameInstead = rpcServices.isa.getHostName();
+    try {
+      this.startcode = System.currentTimeMillis();
+      this.conf = conf;
+      this.fsOk = true;
+      this.masterless = conf.getBoolean(MASTERLESS_CONFIG_NAME, false);
+      this.eventLoopGroupConfig = setupNetty(this.conf);
+      MemorySizeUtil.checkForClusterFreeHeapMemoryLimit(this.conf);
+      HFile.checkHFileVersion(this.conf);
+      checkCodecs(this.conf);
+      this.userProvider = UserProvider.instantiate(conf);
+      FSUtils.setupShortCircuitRead(this.conf);
+      Replication.decorateRegionServerConfiguration(this.conf);
+
+      // Disable usage of meta replicas in the regionserver
+      this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);
+      // Config'ed params
+      this.numRetries = 
this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+          HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
+      this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 
10 * 1000);
+      this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 
1000);
+
+      this.sleeper = new Sleeper(this.msgInterval, this);
+
+      boolean isNoncesEnabled = 
conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true);
+      this.nonceManager = isNoncesEnabled ? new ServerNonceManager(this.conf) 
: null;
+
+      this.numRegionsToReport = 
conf.getInt("hbase.regionserver.numregionstoreport", 10);
+
+      this.operationTimeout = 
conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
+          HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
+
+      this.shortOperationTimeout = 
conf.getInt(HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
+          HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);
+
+      this.abortRequested = false;
+      this.stopped = false;
+
+      rpcServices = createRpcServices();
+      if (this instanceof HMaster) {
+        useThisHostnameInstead = conf.get(MASTER_HOSTNAME_KEY);
+      } else {
+        useThisHostnameInstead = conf.get(RS_HOSTNAME_KEY);
+        if (conf.getBoolean(RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY, false)) 
{
+          if (shouldUseThisHostnameInstead()) {
+            String msg = RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " and " + 
RS_HOSTNAME_KEY +
+                " are mutually exclusive. Do not set " + 
RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY +
+                " to true while " + RS_HOSTNAME_KEY + " is used";
+            throw new IOException(msg);
+          } else {
+            useThisHostnameInstead = rpcServices.isa.getHostName();
+          }
         }
       }
-    }
-    String hostName = shouldUseThisHostnameInstead() ? useThisHostnameInstead :
-      rpcServices.isa.getHostName();
-    serverName = ServerName.valueOf(hostName, rpcServices.isa.getPort(), 
startcode);
-
-    rpcControllerFactory = RpcControllerFactory.instantiate(this.conf);
-    rpcRetryingCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf);
-
-    // login the zookeeper client principal (if using security)
-    ZKUtil.loginClient(this.conf, HConstants.ZK_CLIENT_KEYTAB_FILE,
-      HConstants.ZK_CLIENT_KERBEROS_PRINCIPAL, hostName);
-    // login the server principal (if using secure Hadoop)
-    login(userProvider, hostName);
-    // init superusers and add the server principal (if using security)
-    // or process owner as default super user.
-    Superusers.initialize(conf);
-
-    regionServerAccounting = new RegionServerAccounting(conf);
-    cacheConfig = new CacheConfig(conf);
-    mobCacheConfig = new MobCacheConfig(conf);
-    uncaughtExceptionHandler = new UncaughtExceptionHandler() {
-      @Override
-      public void uncaughtException(Thread t, Throwable e) {
-        abort("Uncaught exception in service thread " + t.getName(), e);
-      }
-    };
+      String hostName = shouldUseThisHostnameInstead() ?
+          this.useThisHostnameInstead : this.rpcServices.isa.getHostName();
+      serverName = ServerName.valueOf(hostName, 
this.rpcServices.isa.getPort(), this.startcode);
+
+      rpcControllerFactory = RpcControllerFactory.instantiate(this.conf);
+      rpcRetryingCallerFactory = 
RpcRetryingCallerFactory.instantiate(this.conf);
+
+      // login the zookeeper client principal (if using security)
+      ZKUtil.loginClient(this.conf, HConstants.ZK_CLIENT_KEYTAB_FILE,
+          HConstants.ZK_CLIENT_KERBEROS_PRINCIPAL, hostName);
+      // login the server principal (if using secure Hadoop)
+      login(userProvider, hostName);
+      // init superusers and add the server principal (if using security)
+      // or process owner as default super user.
+      Superusers.initialize(conf);
+
+      regionServerAccounting = new RegionServerAccounting(conf);
+      cacheConfig = new CacheConfig(conf);
+      mobCacheConfig = new MobCacheConfig(conf);
+      uncaughtExceptionHandler = new UncaughtExceptionHandler() {
+        @Override
+        public void uncaughtException(Thread t, Throwable e) {
+          abort("Uncaught exception in executorService thread " + t.getName(), 
e);
+        }
+      };
 
-    initializeFileSystem();
+      initializeFileSystem();
+      spanReceiverHost = SpanReceiverHost.getInstance(getConfiguration());
 
-    service = new ExecutorService(getServerName().toShortString());
-    spanReceiverHost = SpanReceiverHost.getInstance(getConfiguration());
+      this.configurationManager = new ConfigurationManager();
+      setupWindows(getConfiguration(), getConfigurationManager());
 
-    // Some unit tests don't need a cluster, so no zookeeper at all
-    if (!conf.getBoolean("hbase.testing.nocluster", false)) {
-      // Open connection to zookeeper and set primary watcher
-      zooKeeper = new ZooKeeperWatcher(conf, getProcessName() + ":" +
-        rpcServices.isa.getPort(), this, canCreateBaseZNode());
+      // Some unit tests don't need a cluster, so no zookeeper at all
+      if (!conf.getBoolean("hbase.testing.nocluster", false)) {
+        // Open connection to zookeeper and set primary watcher
+        zooKeeper = new ZooKeeperWatcher(conf, getProcessName() + ":" +
+          rpcServices.isa.getPort(), this, canCreateBaseZNode());
 
-      this.csm = (BaseCoordinatedStateManager) csm;
-      this.csm.initialize(this);
-      this.csm.start();
+        // If no master in cluster, skip trying to track one or look for a 
cluster status.
+        if (!this.masterless) {
+          this.csm = (BaseCoordinatedStateManager) csm;
+          this.csm.initialize(this);
+          this.csm.start();
 
-      masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this);
-      masterAddressTracker.start();
+          masterAddressTracker = new MasterAddressTracker(getZooKeeper(), 
this);
+          masterAddressTracker.start();
 
-      clusterStatusTracker = new ClusterStatusTracker(zooKeeper, this);
-      clusterStatusTracker.start();
+          clusterStatusTracker = new ClusterStatusTracker(zooKeeper, this);
+          clusterStatusTracker.start();
+        }
+      }
+      // This violates 'no starting stuff in Constructor' but Master depends 
on the below chore
+      // and executor being created and takes a different startup route. Lots 
of overlap between HRS
+      // and M (An M IS A HRS now). Need to refactor so less duplication 
between M and its super
+      // Master expects Constructor to put up web servers. Ugh.
+      // class HRS. TODO.
+      this.choreService = new ChoreService(getServerName().toString(), true);
+      this.executorService = new 
ExecutorService(getServerName().toShortString());
+      this.rpcServices.start();
+      putUpWebUI();
+    } catch (Throwable t) {
+      // Make sure we log the exception. HRegionServer is often started via 
reflection and the
+      // cause of failed startup is lost.
+      LOG.error("Failed construction RegionServer", t);
+      throw t;
     }
-    this.configurationManager = new ConfigurationManager();
-
-    rpcServices.start();
-    putUpWebUI();
-    this.walRoller = new LogRoller(this, this);
-    this.choreService = new ChoreService(getServerName().toString(), true);
-    this.flushThroughputController = 
FlushThroughputControllerFactory.create(this, conf);
+  }
 
+  /**
+   * If running on Windows, do windows-specific setup.
+   */
+  private static void setupWindows(final Configuration conf, 
ConfigurationManager cm) {
     if (!SystemUtils.IS_OS_WINDOWS) {
       Signal.handle(new Signal("HUP"), new SignalHandler() {
         @Override
         public void handle(Signal signal) {
-          getConfiguration().reloadConfiguration();
-          configurationManager.notifyAllObservers(getConfiguration());
+          conf.reloadConfiguration();
+          cm.notifyAllObservers(conf);
         }
       });
     }
-    // Create the CompactedFileDischarger chore service. This chore helps to
-    // remove the compacted files
-    // that will no longer be used in reads.
-    // Default is 2 mins. The default value for TTLCleaner is 5 mins so we set 
this to
-    // 2 mins so that compacted files can be archived before the TTLCleaner 
runs
-    int cleanerInterval =
-        conf.getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 
1000);
-    this.compactedFileDischarger =
-        new CompactedHFilesDischarger(cleanerInterval, this, this);
-    choreService.scheduleChore(compactedFileDischarger);
+  }
+
+  private static NettyEventLoopGroupConfig setupNetty(Configuration conf) {
+    // Initialize netty event loop group at start as we may use it for rpc 
server, rpc client & WAL.
+    NettyEventLoopGroupConfig nelgc =
+      new NettyEventLoopGroupConfig(conf, "RS-EventLoopGroup");
+    NettyRpcClientConfigHelper.setEventLoopConfig(conf, nelgc.group(), 
nelgc.clientChannelClass());
+    NettyAsyncFSWALConfigHelper.setEventLoopConfig(conf, nelgc.group(), 
nelgc.clientChannelClass());
+    return nelgc;
   }
 
   private void initializeFileSystem() throws IOException {
@@ -723,7 +736,7 @@ public class HRegionServer extends HasThread implements
       "hbase.regionserver.kerberos.principal", host);
   }
 
-  protected void waitForMasterActive(){
+  protected void waitForMasterActive() {
   }
 
   protected String getProcessName() {
@@ -731,7 +744,7 @@ public class HRegionServer extends HasThread implements
   }
 
   protected boolean canCreateBaseZNode() {
-    return false;
+    return this.masterless;
   }
 
   protected boolean canUpdateTableDescriptor() {
@@ -754,28 +767,28 @@ public class HRegionServer extends HasThread implements
   @Override
   public boolean registerService(com.google.protobuf.Service instance) {
     /*
-     * No stacking of instances is allowed for a single service name
+     * No stacking of instances is allowed for a single executorService name
      */
     com.google.protobuf.Descriptors.ServiceDescriptor serviceDesc =
         instance.getDescriptorForType();
     String serviceName = CoprocessorRpcUtils.getServiceName(serviceDesc);
     if (coprocessorServiceHandlers.containsKey(serviceName)) {
-      LOG.error("Coprocessor service " + serviceName
+      LOG.error("Coprocessor executorService " + serviceName
           + " already registered, rejecting request from " + instance);
       return false;
     }
 
     coprocessorServiceHandlers.put(serviceName, instance);
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Registered regionserver coprocessor service: service=" + 
serviceName);
+      LOG.debug("Registered regionserver coprocessor executorService: 
executorService=" + serviceName);
     }
     return true;
   }
 
   /**
    * Create a 'smarter' Connection, one that is capable of by-passing RPC if 
the request is to
-   * the local server. Safe to use going to local or remote server.
-   * Create this instance in a method can be intercepted and mocked in tests.
+   * the local server; i.e. a short-circuit Connection. Safe to use going to 
local or remote
+   * server. Create this instance in a method can be intercepted and mocked in 
tests.
    * @throws IOException
    */
   @VisibleForTesting
@@ -821,28 +834,19 @@ public class HRegionServer extends HasThread implements
 
   /**
    * All initialization needed before we go register with Master.
+   * Do bare minimum. Do bulk of initializations AFTER we've connected to the 
Master.
+   * In here we just put up the RpcServer, setup Connection, and ZooKeeper.
    *
    * @throws IOException
    * @throws InterruptedException
    */
-  private void preRegistrationInitialization(){
+  private void preRegistrationInitialization() {
     try {
       setupClusterConnection();
-
-      this.secureBulkLoadManager = new SecureBulkLoadManager(this.conf, 
clusterConnection);
-      this.secureBulkLoadManager.start();
-
-      // Health checker thread.
-      if (isHealthCheckerConfigured()) {
-        int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ,
-          HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
-        healthCheckChore = new HealthCheckChore(sleepTime, this, 
getConfiguration());
-      }
-
       initializeZooKeeper();
-      if (!isStopped() && !isAborted()) {
-        initializeThreads();
-      }
+      // Setup RPC client for master communication
+      this.rpcClient = RpcClientFactory.createClient(conf, clusterId, new 
InetSocketAddress(
+          this.rpcServices.isa.getAddress(), 0), 
clusterConnection.getConnectionMetrics());
     } catch (Throwable t) {
       // Call stop if error or process will stick around for ever since server
       // puts up non-daemon threads.
@@ -862,6 +866,9 @@ public class HRegionServer extends HasThread implements
   
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE",
     justification="cluster Id znode read would give us correct response")
   private void initializeZooKeeper() throws IOException, InterruptedException {
+    // Nothing to do in here if no Master in the mix.
+    if (this.masterless) return;
+
     // Create the master address tracker, register with zk, and start it.  Then
     // block until a master is available.  No point in starting up if no master
     // running.
@@ -908,7 +915,7 @@ public class HRegionServer extends HasThread implements
   }
 
   
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="RV_RETURN_VALUE_IGNORED",
-      justification="We don't care about the return")
+    justification="We don't care about the return")
   private void doLatch(final CountDownLatch latch) throws InterruptedException 
{
     if (latch != null) {
       // Result is ignored intentionally but if I remove the below, findbugs 
complains (the
@@ -934,67 +941,11 @@ public class HRegionServer extends HasThread implements
   }
 
   /**
-   * @return False if cluster shutdown in progress
+   * @return True if the cluster is up (always true when running masterless).
    */
   private boolean isClusterUp() {
-    return clusterStatusTracker != null && clusterStatusTracker.isClusterUp();
-  }
-
-  private void initializeThreads() throws IOException {
-    // Cache flushing thread.
-    this.cacheFlusher = new MemStoreFlusher(conf, this);
-
-    // Compaction thread
-    this.compactSplitThread = new CompactSplit(this);
-
-    // Background thread to check for compactions; needed if region has not 
gotten updates
-    // in a while. It will take care of not checking too frequently on 
store-by-store basis.
-    this.compactionChecker = new CompactionChecker(this, 
this.threadWakeFrequency, this);
-    this.periodicFlusher = new 
PeriodicMemStoreFlusher(this.threadWakeFrequency, this);
-    this.leases = new Leases(this.threadWakeFrequency);
-
-    // Create the thread to clean the moved regions list
-    movedRegionsCleaner = MovedRegionsCleaner.create(this);
-
-    if (this.nonceManager != null) {
-      // Create the scheduled chore that cleans up nonces.
-      nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this);
-    }
-
-    // Setup the Quota Manager
-    rsQuotaManager = new RegionServerRpcQuotaManager(this);
-    rsSpaceQuotaManager = new RegionServerSpaceQuotaManager(this);
-
-    if (QuotaUtil.isQuotaEnabled(conf)) {
-      this.fsUtilizationChore = new FileSystemUtilizationChore(this);
-    }
-
-    // Setup RPC client for master communication
-    rpcClient = RpcClientFactory.createClient(conf, clusterId, new 
InetSocketAddress(
-        rpcServices.isa.getAddress(), 0), 
clusterConnection.getConnectionMetrics());
-
-    boolean onlyMetaRefresh = false;
-    int storefileRefreshPeriod = conf.getInt(
-        StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD
-      , StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
-    if (storefileRefreshPeriod == 0) {
-      storefileRefreshPeriod = conf.getInt(
-          StorefileRefresherChore.REGIONSERVER_META_STOREFILE_REFRESH_PERIOD,
-          
StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
-      onlyMetaRefresh = true;
-    }
-    if (storefileRefreshPeriod > 0) {
-      this.storefileRefresher = new 
StorefileRefresherChore(storefileRefreshPeriod,
-          onlyMetaRefresh, this, this);
-    }
-    registerConfigurationObservers();
-  }
-
-  private void registerConfigurationObservers() {
-    // Registering the compactSplitThread object with the ConfigurationManager.
-    configurationManager.registerObserver(this.compactSplitThread);
-    configurationManager.registerObserver(this.rpcServices);
-    configurationManager.registerObserver(this);
+    return this.masterless ||
+        this.clusterStatusTracker != null && 
this.clusterStatusTracker.isClusterUp();
   }
 
   /**
@@ -1019,6 +970,7 @@ public class HRegionServer extends HasThread implements
 
       // Try and register with the Master; tell it we are here.  Break if
       // server is stopped or the clusterup flag is down or hdfs went wacky.
+      // Once registered successfully, go ahead and start up all Services.
       while (keepLooping()) {
         RegionServerStartupResponse w = reportForDuty();
         if (w == null) {
@@ -1030,14 +982,19 @@ public class HRegionServer extends HasThread implements
         }
       }
 
-      if (!isStopped() && isHealthy()){
+      if (!isStopped() && isHealthy()) {
         // start the snapshot handler and other procedure handlers,
         // since the server is ready to run
-        rspmHost.start();
-
+        if (this.rspmHost != null) {
+          this.rspmHost.start();
+        }
         // Start the Quota Manager
-        rsQuotaManager.start(getRpcServer().getScheduler());
-        rsSpaceQuotaManager.start();
+        if (this.rsQuotaManager != null) {
+          rsQuotaManager.start(getRpcServer().getScheduler());
+        }
+        if (this.rsSpaceQuotaManager != null) {
+          this.rsSpaceQuotaManager.start();
+        }
       }
 
       // We registered with the Master.  Go into run mode.
@@ -1176,8 +1133,7 @@ public class HRegionServer extends HasThread implements
 
     if (!this.killed && this.fsOk) {
       waitOnAllRegionsToClose(abortRequested);
-      LOG.info("stopping server " + this.serverName +
-        "; all regions closed.");
+      LOG.info("stopping server " + this.serverName + "; all regions closed.");
     }
 
     //fsOk flag may be changed when closing regions throws exception.
@@ -1223,8 +1179,7 @@ public class HRegionServer extends HasThread implements
     if (this.zooKeeper != null) {
       this.zooKeeper.close();
     }
-    LOG.info("stopping server " + this.serverName +
-      "; zookeeper connection closed.");
+    LOG.info("stopping server " + this.serverName + "; zookeeper connection 
closed.");
 
     LOG.info(Thread.currentThread().getName() + " exiting");
   }
@@ -1267,8 +1222,7 @@ public class HRegionServer extends HasThread implements
     ClusterStatusProtos.ServerLoad sl = buildServerLoad(reportStartTime, 
reportEndTime);
     try {
       RegionServerReportRequest.Builder request = 
RegionServerReportRequest.newBuilder();
-      ServerName sn = ServerName.parseVersionedServerName(
-        this.serverName.getVersionedBytes());
+      ServerName sn = 
ServerName.parseVersionedServerName(this.serverName.getVersionedBytes());
       request.setServer(ProtobufUtil.toServerName(sn));
       request.setLoad(sl);
       rss.regionServerReport(null, request.build());
@@ -1379,8 +1333,7 @@ public class HRegionServer extends HasThread implements
       maxMemory = usage.getMax();
     }
 
-    ClusterStatusProtos.ServerLoad.Builder serverLoad =
-      ClusterStatusProtos.ServerLoad.newBuilder();
+    ClusterStatusProtos.ServerLoad.Builder serverLoad = 
ClusterStatusProtos.ServerLoad.newBuilder();
     serverLoad.setNumberOfRequests((int) 
regionServerWrapper.getRequestsPerSecond());
     serverLoad.setTotalNumberOfRequests((int) 
regionServerWrapper.getTotalRequestCount());
     serverLoad.setUsedHeapMB((int)(usedMemory / 1024 / 1024));
@@ -1414,7 +1367,7 @@ public class HRegionServer extends HasThread implements
       serverLoad.setInfoServerPort(-1);
     }
 
-    // for the replicationLoad purpose. Only need to get from one service
+    // for the replicationLoad purpose. Only need to get from one service
     // either source or sink will get the same info
     ReplicationSourceService rsources = getReplicationSourceService();
 
@@ -1472,12 +1425,12 @@ public class HRegionServer extends HasThread implements
         // iterator of onlineRegions to close all user regions.
         for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
           RegionInfo hri = e.getValue().getRegionInfo();
-          if 
(!this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes())
-              && !closedRegions.contains(hri.getEncodedName())) {
+          if 
(!this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes()) &&
+              !closedRegions.contains(hri.getEncodedName())) {
             closedRegions.add(hri.getEncodedName());
             // Don't update zk with this close transition; pass false.
             closeRegionIgnoreErrors(hri, abort);
-              }
+          }
         }
         // No regions in RIT, we could stop waiting now.
         if (this.regionsInTransitionInRS.isEmpty()) {
@@ -1539,8 +1492,8 @@ public class HRegionServer extends HasThread implements
         // The hostname the master sees us as.
         if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) {
           String hostnameFromMasterPOV = e.getValue();
-          this.serverName = ServerName.valueOf(hostnameFromMasterPOV,
-            rpcServices.isa.getPort(), this.startcode);
+          this.serverName = ServerName.valueOf(hostnameFromMasterPOV, 
rpcServices.isa.getPort(),
+              this.startcode);
           if (shouldUseThisHostnameInstead() &&
               !hostnameFromMasterPOV.equals(useThisHostnameInstead)) {
             String msg = "Master passed us a different hostname to use; was=" +
@@ -1580,13 +1533,13 @@ public class HRegionServer extends HasThread implements
       // hack! Maps DFSClient => RegionServer for logs.  HDFS made this
       // config param for task trackers, but we can piggyback off of it.
       if (this.conf.get("mapreduce.task.attempt.id") == null) {
-        this.conf.set("mapreduce.task.attempt.id", "hb_rs_" +
-          this.serverName.toString());
+        this.conf.set("mapreduce.task.attempt.id", "hb_rs_" + 
this.serverName.toString());
       }
 
       // Save it in a file, this will allow to see if we crash
       ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath());
 
+      // This call sets up an initialized replication and WAL. Later we start 
it up.
       setupWALAndReplication();
       // Init in here rather than in constructor after thread name has been set
       this.metricsRegionServer = new MetricsRegionServer(new 
MetricsRegionServerWrapperImpl(this));
@@ -1595,14 +1548,19 @@ public class HRegionServer extends HasThread implements
       this.pauseMonitor = new JvmPauseMonitor(conf, 
getMetrics().getMetricsSource());
       pauseMonitor.start();
 
-      startServiceThreads();
-      startHeapMemoryManager();
-      // Call it after starting HeapMemoryManager.
-      initializeMemStoreChunkCreator();
-      LOG.info("Serving as " + this.serverName +
-        ", RpcServer on " + rpcServices.isa +
-        ", sessionid=0x" +
-        
Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()));
+      // There is a rare case where we do NOT want services to start. Check 
config.
+      if (getConfiguration().getBoolean("hbase.regionserver.workers", true)) {
+        startServices();
+      }
+      // In here we start up the replication Service. Above we initialized it. 
TODO. Reconcile.
+      // or make sense of it.
+      startReplicationService();
+
+
+      // Log our readiness: server name, RpcServer address, and ZK session id.
+      LOG.info("Serving as " + this.serverName + ", RpcServer on " + 
rpcServices.isa +
+          ", sessionid=0x" +
+          
Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()));
 
       // Wake up anyone waiting for this server to online
       synchronized (online) {
@@ -1627,15 +1585,15 @@ public class HRegionServer extends HasThread implements
       long globalMemStoreSize = pair.getFirst();
       boolean offheap = this.regionServerAccounting.isOffheap();
       // When off heap memstore in use, take full area for chunk pool.
-      float poolSizePercentage = offheap ? 1.0F
-          : conf.getFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, 
MemStoreLAB.POOL_MAX_SIZE_DEFAULT);
+      float poolSizePercentage = offheap? 1.0F:
+          conf.getFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, 
MemStoreLAB.POOL_MAX_SIZE_DEFAULT);
       float initialCountPercentage = 
conf.getFloat(MemStoreLAB.CHUNK_POOL_INITIALSIZE_KEY,
           MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT);
       int chunkSize = conf.getInt(MemStoreLAB.CHUNK_SIZE_KEY, 
MemStoreLAB.CHUNK_SIZE_DEFAULT);
       // init the chunkCreator
       ChunkCreator chunkCreator =
           ChunkCreator.initialize(chunkSize, offheap, globalMemStoreSize, 
poolSizePercentage,
-            initialCountPercentage, this.hMemManager);
+      initialCountPercentage, this.hMemManager);
     }
   }
 
@@ -1652,8 +1610,7 @@ public class HRegionServer extends HasThread implements
     rsInfo.setInfoPort(infoServer != null ? infoServer.getPort() : -1);
     rsInfo.setVersionInfo(ProtobufUtil.getVersionInfo());
     byte[] data = ProtobufUtil.prependPBMagic(rsInfo.build().toByteArray());
-    ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper,
-      getMyEphemeralNodePath(), data);
+    ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper, 
getMyEphemeralNodePath(), data);
   }
 
   private void deleteMyEphemeralNode() throws KeeperException {
@@ -1756,8 +1713,7 @@ public class HRegionServer extends HasThread implements
     // immediately upon region server startup
     private long iteration = 1;
 
-    CompactionChecker(final HRegionServer h, final int sleepTime,
-        final Stoppable stopper) {
+    CompactionChecker(final HRegionServer h, final int sleepTime, final 
Stoppable stopper) {
       super("CompactionChecker", stopper, sleepTime);
       this.instance = h;
       LOG.info(this.getName() + " runs every " + 
StringUtils.formatTime(sleepTime));
@@ -1766,8 +1722,8 @@ public class HRegionServer extends HasThread implements
        * If not set, the compaction will use default priority.
        */
       this.majorCompactPriority = this.instance.conf.
-        getInt("hbase.regionserver.compactionChecker.majorCompactPriority",
-        DEFAULT_PRIORITY);
+          getInt("hbase.regionserver.compactionChecker.majorCompactPriority",
+              DEFAULT_PRIORITY);
     }
 
     @Override
@@ -1793,12 +1749,13 @@ public class HRegionServer extends HasThread implements
               if (majorCompactPriority == DEFAULT_PRIORITY ||
                   majorCompactPriority > hr.getCompactPriority()) {
                 this.instance.compactSplitThread.requestCompaction(hr, s,
-                  getName() + " requests major compaction; use default 
priority", Store.NO_PRIORITY,
-                  CompactionLifeCycleTracker.DUMMY, null);
+                    getName() + " requests major compaction; use default 
priority",
+                    Store.NO_PRIORITY,
+                CompactionLifeCycleTracker.DUMMY, null);
               } else {
                 this.instance.compactSplitThread.requestCompaction(hr, s,
-                  getName() + " requests major compaction; use configured 
priority",
-                  this.majorCompactPriority, CompactionLifeCycleTracker.DUMMY, 
null);
+                    getName() + " requests major compaction; use configured 
priority",
+                    this.majorCompactPriority, 
CompactionLifeCycleTracker.DUMMY, null);
               }
             }
           } catch (IOException e) {
@@ -1844,7 +1801,7 @@ public class HRegionServer extends HasThread implements
 
   /**
    * Report the status of the server. A server is online once all the startup 
is
-   * completed (setting up filesystem, starting service threads, etc.). This
+   * completed (setting up filesystem, starting service threads, etc.). This
    * method is designed mostly to be useful in tests.
    *
    * @return true if online, false if not.
@@ -1856,6 +1813,7 @@ public class HRegionServer extends HasThread implements
   /**
    * Setup WAL log and replication if enabled.
    * Replication setup is done in here because it wants to be hooked up to WAL.
+   *
    * @throws IOException
    */
   private void setupWALAndReplication() throws IOException {
@@ -1867,11 +1825,11 @@ public class HRegionServer extends HasThread implements
     if (LOG.isDebugEnabled()) LOG.debug("logDir=" + logDir);
     if (this.walFs.exists(logDir)) {
       throw new RegionServerRunningException("Region server has already " +
-        "created directory at " + this.serverName.toString());
+          "created directory at " + this.serverName.toString());
     }
 
-    // Instantiate replication manager if replication enabled.  Pass it the
-    // log directories.
+    // Instantiate replication if replication enabled.  Pass it the log 
directories.
+    // In here we create the Replication instances. Later they are initialized 
and started up.
     createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir);
 
     // listeners the wal factory will add to wals it creates.
@@ -1887,8 +1845,9 @@ public class HRegionServer extends HasThread implements
     // We use WALActionsListener to get the newly rolled WALs, so we need to 
get the
     // WALActionsListeners from ReplicationSourceHandler before constructing 
WALFactory. And then
     // ReplicationSourceHandler need to use WALFactory get the length of the 
wal file being written.
-    // So we here we need to construct WALFactory first, and then pass it to 
the initialize method
+    // So here we need to construct WALFactory first, and then pass it to the
+    // initialize method of ReplicationSourceHandler.
+    // TODO: I can't follow replication; it has initialize and then later on 
we start it!
     WALFactory factory = new WALFactory(conf, listeners, 
serverName.toString());
     this.walFactory = factory;
     if (this.replicationSourceHandler != null) {
@@ -1900,6 +1859,25 @@ public class HRegionServer extends HasThread implements
     }
   }
 
+  /**
+   * Start up replication source and sink handlers.
+   * @throws IOException
+   */
+  private void startReplicationService() throws IOException {
+    if (this.replicationSourceHandler == this.replicationSinkHandler &&
+        this.replicationSourceHandler != null) {
+      this.replicationSourceHandler.startReplicationService();
+    } else {
+      if (this.replicationSourceHandler != null) {
+        this.replicationSourceHandler.startReplicationService();
+      }
+      if (this.replicationSinkHandler != null) {
+        this.replicationSinkHandler.startReplicationService();
+      }
+    }
+  }
+
+
   public MetricsRegionServer getRegionServerMetrics() {
     return this.metricsRegionServer;
   }
@@ -1913,6 +1891,8 @@ public class HRegionServer extends HasThread implements
 
   /*
    * Start maintenance Threads, Server, Worker and lease checker threads.
+   * Start all threads we need to run. This is called after we've successfully
+   * registered with the Master.
    * Install an UncaughtExceptionHandler that calls abort of RegionServer if we
    * get an unhandled exception. We cannot set the handler on all threads.
    * Server's internal Listener thread is off limits. For Server, if an OOME, 
it
@@ -1923,35 +1903,62 @@ public class HRegionServer extends HasThread implements
    * keeps its own internal stop mechanism so needs to be stopped by this
    * hosting server. Worker logs the exception and exits.
    */
-  private void startServiceThreads() throws IOException {
+  private void startServices() throws IOException {
+    if (!isStopped() && !isAborted()) {
+      initializeThreads();
+    }
+    this.secureBulkLoadManager = new SecureBulkLoadManager(this.conf, 
clusterConnection);
+    this.secureBulkLoadManager.start();
+
+    // Health checker thread.
+    if (isHealthCheckerConfigured()) {
+      int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ,
+      HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
+      healthCheckChore = new HealthCheckChore(sleepTime, this, 
getConfiguration());
+    }
+
+    this.walRoller = new LogRoller(this, this);
+    this.flushThroughputController = 
FlushThroughputControllerFactory.create(this, conf);
+
+    // Create the CompactedFileDischarger chore service. This chore helps to
+    // remove the compacted files
+    // that will no longer be used in reads.
+    // Default is 2 mins. The default value for TTLCleaner is 5 mins so we set 
this to
+    // 2 mins so that compacted files can be archived before the TTLCleaner 
runs
+    int cleanerInterval =
+    conf.getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000);
+    this.compactedFileDischarger =
+    new CompactedHFilesDischarger(cleanerInterval, this, this);
+    choreService.scheduleChore(compactedFileDischarger);
+
     // Start executor services
-    this.service.startExecutorService(ExecutorType.RS_OPEN_REGION,
-      conf.getInt("hbase.regionserver.executor.openregion.threads", 3));
-    this.service.startExecutorService(ExecutorType.RS_OPEN_META,
-      conf.getInt("hbase.regionserver.executor.openmeta.threads", 1));
-    this.service.startExecutorService(ExecutorType.RS_OPEN_PRIORITY_REGION,
-      conf.getInt("hbase.regionserver.executor.openpriorityregion.threads", 
3));
-    this.service.startExecutorService(ExecutorType.RS_CLOSE_REGION,
-      conf.getInt("hbase.regionserver.executor.closeregion.threads", 3));
-    this.service.startExecutorService(ExecutorType.RS_CLOSE_META,
-      conf.getInt("hbase.regionserver.executor.closemeta.threads", 1));
+    this.executorService.startExecutorService(ExecutorType.RS_OPEN_REGION,
+        conf.getInt("hbase.regionserver.executor.openregion.threads", 3));
+    this.executorService.startExecutorService(ExecutorType.RS_OPEN_META,
+        conf.getInt("hbase.regionserver.executor.openmeta.threads", 1));
+    
this.executorService.startExecutorService(ExecutorType.RS_OPEN_PRIORITY_REGION,
+        conf.getInt("hbase.regionserver.executor.openpriorityregion.threads", 
3));
+    this.executorService.startExecutorService(ExecutorType.RS_CLOSE_REGION,
+        conf.getInt("hbase.regionserver.executor.closeregion.threads", 3));
+    this.executorService.startExecutorService(ExecutorType.RS_CLOSE_META,
+        conf.getInt("hbase.regionserver.executor.closemeta.threads", 1));
     if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, 
false)) {
-      this.service.startExecutorService(ExecutorType.RS_PARALLEL_SEEK,
-        conf.getInt("hbase.storescanner.parallel.seek.threads", 10));
+      this.executorService.startExecutorService(ExecutorType.RS_PARALLEL_SEEK,
+          conf.getInt("hbase.storescanner.parallel.seek.threads", 10));
     }
-    this.service.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, 
conf.getInt(
-       "hbase.regionserver.wal.max.splitters", 
SplitLogWorkerCoordination.DEFAULT_MAX_SPLITTERS));
+    this.executorService.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, 
conf.getInt(
+    "hbase.regionserver.wal.max.splitters", 
SplitLogWorkerCoordination.DEFAULT_MAX_SPLITTERS));
     // Start the threads for compacted files discharger
-    
this.service.startExecutorService(ExecutorType.RS_COMPACTED_FILES_DISCHARGER,
-      
conf.getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT,
 10));
+    
this.executorService.startExecutorService(ExecutorType.RS_COMPACTED_FILES_DISCHARGER,
+        
conf.getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT,
 10));
     if 
(ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) {
-      
this.service.startExecutorService(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS,
-        conf.getInt("hbase.regionserver.region.replica.flusher.threads",
-          conf.getInt("hbase.regionserver.executor.openregion.threads", 3)));
+      
this.executorService.startExecutorService(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS,
+          conf.getInt("hbase.regionserver.region.replica.flusher.threads",
+              conf.getInt("hbase.regionserver.executor.openregion.threads", 
3)));
     }
 
     Threads.setDaemonThreadRunning(this.walRoller.getThread(), getName() + 
".logRoller",
-        uncaughtExceptionHandler);
+    uncaughtExceptionHandler);
     this.cacheFlusher.start(uncaughtExceptionHandler);
 
     if (this.compactionChecker != null) 
choreService.scheduleChore(compactionChecker);
@@ -1965,19 +1972,7 @@ public class HRegionServer extends HasThread implements
     // Leases is not a Thread. Internally it runs a daemon thread. If it gets
     // an unhandled exception, it will just exit.
     Threads.setDaemonThreadRunning(this.leases.getThread(), getName() + 
".leaseChecker",
-      uncaughtExceptionHandler);
-
-    if (this.replicationSourceHandler == this.replicationSinkHandler &&
-        this.replicationSourceHandler != null) {
-      this.replicationSourceHandler.startReplicationService();
-    } else {
-      if (this.replicationSourceHandler != null) {
-        this.replicationSourceHandler.startReplicationService();
-      }
-      if (this.replicationSinkHandler != null) {
-        this.replicationSinkHandler.startReplicationService();
-      }
-    }
+    uncaughtExceptionHandler);
 
     // Create the log splitting worker and start it
     // set a smaller retries to fast fail otherwise splitlogworker could be 
blocked for
@@ -1985,12 +1980,77 @@ public class HRegionServer extends HasThread implements
     // tasks even after current task is preempted after a split task times out.
     Configuration sinkConf = HBaseConfiguration.create(conf);
     sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
-      conf.getInt("hbase.log.replay.retries.number", 8)); // 8 retries take 
about 23 seconds
+        conf.getInt("hbase.log.replay.retries.number", 8)); // 8 retries take 
about 23 seconds
     sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
-      conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 
seconds
+        conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 
seconds
     sinkConf.setInt("hbase.client.serverside.retries.multiplier", 1);
-    this.splitLogWorker = new SplitLogWorker(this, sinkConf, this, this, 
walFactory);
-    splitLogWorker.start();
+    if (this.csm != null) {
+      // SplitLogWorker needs csm. If none, don't start this.
+      this.splitLogWorker = new SplitLogWorker(this, sinkConf, this,
+          this, walFactory);
+      splitLogWorker.start();
+    } else {
+      LOG.warn("SplitLogWorker Service NOT started; CoordinatedStateManager is 
null");
+    }
+
+    // Memstore services.
+    startHeapMemoryManager();
+    // Call it after starting HeapMemoryManager.
+    initializeMemStoreChunkCreator();
+  }
+
+  private void initializeThreads() throws IOException {
+    // Cache flushing thread.
+    this.cacheFlusher = new MemStoreFlusher(conf, this);
+
+    // Compaction thread
+    this.compactSplitThread = new CompactSplit(this);
+
+    // Background thread to check for compactions; needed if region has not 
gotten updates
+    // in a while. It will take care of not checking too frequently on 
store-by-store basis.
+    this.compactionChecker = new CompactionChecker(this, 
this.threadWakeFrequency, this);
+    this.periodicFlusher = new 
PeriodicMemStoreFlusher(this.threadWakeFrequency, this);
+    this.leases = new Leases(this.threadWakeFrequency);
+
+    // Create the thread to clean the moved regions list
+    movedRegionsCleaner = MovedRegionsCleaner.create(this);
+
+    if (this.nonceManager != null) {
+      // Create the scheduled chore that cleans up nonces.
+      nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this);
+    }
+
+    // Setup the Quota Manager
+    rsQuotaManager = new RegionServerRpcQuotaManager(this);
+    rsSpaceQuotaManager = new RegionServerSpaceQuotaManager(this);
+
+    if (QuotaUtil.isQuotaEnabled(conf)) {
+      this.fsUtilizationChore = new FileSystemUtilizationChore(this);
+    }
+
+
+    boolean onlyMetaRefresh = false;
+    int storefileRefreshPeriod = conf.getInt(
+        StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
+        StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
+    if (storefileRefreshPeriod == 0) {
+      storefileRefreshPeriod = conf.getInt(
+          StorefileRefresherChore.REGIONSERVER_META_STOREFILE_REFRESH_PERIOD,
+          
StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
+      onlyMetaRefresh = true;
+    }
+    if (storefileRefreshPeriod > 0) {
+      this.storefileRefresher = new 
StorefileRefresherChore(storefileRefreshPeriod,
+          onlyMetaRefresh, this, this);
+    }
+    registerConfigurationObservers();
+  }
+
+  private void registerConfigurationObservers() {
+    // Registering the compactSplitThread object with the ConfigurationManager.
+    configurationManager.registerObserver(this.compactSplitThread);
+    configurationManager.registerObserver(this.rpcServices);
+    configurationManager.registerObserver(this);
   }
 
   /**
@@ -2058,14 +2118,15 @@ public class HRegionServer extends HasThread implements
       return false;
     }
     // Verify that all threads are alive
-    if (!(leases.isAlive()
-        && cacheFlusher.isAlive() && walRoller.isAlive()
-        && this.compactionChecker.isScheduled()
-        && this.periodicFlusher.isScheduled())) {
+    boolean healthy = (this.leases == null || this.leases.isAlive())
+        && (this.cacheFlusher == null || this.cacheFlusher.isAlive())
+        && (this.walRoller == null || this.walRoller.isAlive())
+        && (this.compactionChecker == null || 
this.compactionChecker.isScheduled())
+        && (this.periodicFlusher == null || 
this.periodicFlusher.isScheduled());
+    if (!healthy) {
       stop("One or more threads are no longer alive -- stop");
-      return false;
     }
-    return true;
+    return healthy;
   }
 
   private static final byte[] UNSPECIFIED_REGION = new byte[]{};
@@ -2088,7 +2149,9 @@ public class HRegionServer extends HasThread implements
       byte[] namespace = regionInfo.getTable().getNamespace();
       wal = walFactory.getWAL(regionInfo.getEncodedNameAsBytes(), namespace);
     }
-    walRoller.addWAL(wal);
+    if (this.walRoller != null) {
+      this.walRoller.addWAL(wal);
+    }
     return wal;
   }
 
@@ -2307,9 +2370,10 @@ public class HRegionServer extends HasThread implements
     // RegionReplicaFlushHandler might reset this.
 
     // submit it to be handled by one of the handlers so that we do not block 
OpenRegionHandler
-    this.service.submit(
-      new RegionReplicaFlushHandler(this, clusterConnection,
-        rpcRetryingCallerFactory, rpcControllerFactory, operationTimeout, 
region));
+    if (this.executorService != null) {
+      this.executorService.submit(new RegionReplicaFlushHandler(this, 
clusterConnection,
+          rpcRetryingCallerFactory, rpcControllerFactory, operationTimeout, 
region));
+    }
   }
 
   @Override
@@ -2432,7 +2496,7 @@ public class HRegionServer extends HasThread implements
     if (this.compactSplitThread != null) {
       this.compactSplitThread.join();
     }
-    if (this.service != null) this.service.shutdown();
+    if (this.executorService != null) this.executorService.shutdown();
     if (this.replicationSourceHandler != null &&
         this.replicationSourceHandler == this.replicationSinkHandler) {
       this.replicationSourceHandler.stopReplicationService();
@@ -2448,7 +2512,7 @@ public class HRegionServer extends HasThread implements
 
   /**
    * @return Return the object that implements the replication
-   * source service.
+   * source service.
    */
   @VisibleForTesting
   public ReplicationSourceService getReplicationSourceService() {
@@ -2457,7 +2521,7 @@ public class HRegionServer extends HasThread implements
 
   /**
    * @return Return the object that implements the replication
-   * sink service.
+   * sink service.
    */
   ReplicationSinkService getReplicationSinkService() {
     return replicationSinkHandler;
@@ -2569,6 +2633,7 @@ public class HRegionServer extends HasThread implements
    * @throws IOException
    */
   private RegionServerStartupResponse reportForDuty() throws IOException {
+    if (this.masterless) return 
RegionServerStartupResponse.getDefaultInstance();
     ServerName masterServerName = createRegionServerStatusStub(true);
     if (masterServerName == null) return null;
     RegionServerStartupResponse result = null;
@@ -2880,7 +2945,7 @@ public class HRegionServer extends HasThread implements
 
   @Override
   public ExecutorService getExecutorService() {
-    return service;
+    return executorService;
   }
 
   @Override
@@ -2898,10 +2963,10 @@ public class HRegionServer extends HasThread implements
   //
 
   /**
-   * Load the replication service objects, if any
+   * Load the replication service objects, if any
    */
-  private static void createNewReplicationInstance(Configuration conf,
-    HRegionServer server, FileSystem walFs, Path walDir, Path oldWALDir) 
throws IOException{
+  private static void createNewReplicationInstance(Configuration conf, 
HRegionServer server,
+      FileSystem walFs, Path walDir, Path oldWALDir) throws IOException {
 
     if ((server instanceof HMaster) && (!LoadBalancer.isTablesOnMaster(conf) ||
         LoadBalancer.isSystemTablesOnlyOnMaster(conf))) {
@@ -2971,7 +3036,7 @@ public class HRegionServer extends HasThread implements
    * @see org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine
    */
   public static void main(String[] args) throws Exception {
-    LOG.info("STARTING service " + HRegionServer.class.getSimpleName());
+    LOG.info("STARTING service " + HRegionServer.class.getSimpleName());
     VersionInfo.logVersion();
     Configuration conf = HBaseConfiguration.create();
     @SuppressWarnings("unchecked")
@@ -3145,7 +3210,7 @@ public class HRegionServer extends HasThread implements
     } else {
       crh = new CloseRegionHandler(this, this, hri, abort, sn);
     }
-    this.service.submit(crh);
+    this.executorService.submit(crh);
     return true;
   }
 
@@ -3395,8 +3460,7 @@ public class HRegionServer extends HasThread implements
 
   // This map will contains all the regions that we closed for a move.
   // We add the time it was moved as we don't want to keep too old information
-  protected Map<String, MovedRegionInfo> movedRegions =
-      new ConcurrentHashMap<>(3000);
+  protected Map<String, MovedRegionInfo> movedRegions = new 
ConcurrentHashMap<>(3000);
 
   // We need a timeout. If not there is a risk of giving a wrong information: 
this would double
   //  the number of network calls instead of reducing them.
@@ -3457,7 +3521,6 @@ public class HRegionServer extends HasThread implements
   /**
    * Creates a Chore thread to clean the moved region cache.
    */
-
   protected final static class MovedRegionsCleaner extends ScheduledChore 
implements Stoppable {
     private HRegionServer regionServer;
     Stoppable stoppable;
@@ -3606,7 +3669,7 @@ public class HRegionServer extends HasThread implements
       String serviceName = call.getServiceName();
       com.google.protobuf.Service service = 
coprocessorServiceHandlers.get(serviceName);
       if (service == null) {
-        throw new UnknownProtocolException(null, "No registered coprocessor 
service found for " +
+        throw new UnknownProtocolException(null, "No registered coprocessor service found for " +
             serviceName);
       }
       com.google.protobuf.Descriptors.ServiceDescriptor serviceDesc =
@@ -3617,7 +3680,7 @@ public class HRegionServer extends HasThread implements
           serviceDesc.findMethodByName(methodName);
       if (methodDesc == null) {
         throw new UnknownProtocolException(service.getClass(), "Unknown method 
" + methodName +
-            " called on service " + serviceName);
+            " called on service " + serviceName);
       }
 
       com.google.protobuf.Message request =

http://git-wip-us.apache.org/repos/asf/hbase/blob/456057ef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 045838a..28f73aa 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -315,6 +315,16 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
   final AtomicBoolean clearCompactionQueues = new AtomicBoolean(false);
 
   /**
+   * Services launched in RSRpcServices. By default they are on but you can 
use the below
+   * booleans to selectively enable/disable either Admin or Client Service 
(Rare is the case
+   * where you would ever turn off one or the other).
+   */
+  public static final String REGIONSERVER_ADMIN_SERVICE_CONFIG =
+      "hbase.regionserver.admin.executorService";
+  public static final String REGIONSERVER_CLIENT_SERVICE_CONFIG =
+      "hbase.regionserver.client.executorService";
+
+  /**
    * An Rpc callback for closing a RegionScanner.
    */
   private static final class RegionScannerCloseCallBack implements RpcCallback 
{
@@ -591,13 +601,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
   /**
    * Mutate a list of rows atomically.
    *
-   * @param region
-   * @param actions
    * @param cellScanner if non-null, the mutation data -- the Cell content.
-   * @param row
-   * @param family
-   * @param qualifier
-   * @param compareOp
    * @param comparator @throws IOException
    */
   private boolean checkAndRowMutate(final HRegion region, final 
List<ClientProtos.Action> actions,
@@ -649,12 +653,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
   /**
    * Execute an append mutation.
    *
-   * @param region
-   * @param m
-   * @param cellScanner
    * @return result to return to client if default operation should be
    * bypassed as indicated by RegionObserver, null otherwise
-   * @throws IOException
    */
   private Result append(final HRegion region, final OperationQuota quota,
       final MutationProto mutation, final CellScanner cellScanner, long 
nonceGroup,
@@ -1426,17 +1426,31 @@ public class RSRpcServices implements 
HBaseRPCErrorHandler,
   }
 
   /**
-   * @return list of blocking services and their security info classes that 
this server supports
+   * By default, put up an Admin and a Client Service.
+   * Set booleans <code>hbase.regionserver.admin.executorService</code> and
+   * <code>hbase.regionserver.client.executorService</code> if you want to 
enable/disable services.
+   * Default is that both are enabled.
+   * @return immutable list of blocking services and the security info classes 
that this server
+   * supports
    */
   protected List<BlockingServiceAndInterface> getServices() {
-    List<BlockingServiceAndInterface> bssi = new ArrayList<>(2);
-    bssi.add(new BlockingServiceAndInterface(
+    boolean admin =
+      getConfiguration().getBoolean(REGIONSERVER_ADMIN_SERVICE_CONFIG, true);
+    boolean client =
+      getConfiguration().getBoolean(REGIONSERVER_CLIENT_SERVICE_CONFIG, true);
+    List<BlockingServiceAndInterface> bssi = new ArrayList<>();
+    if (client) {
+      bssi.add(new BlockingServiceAndInterface(
       ClientService.newReflectiveBlockingService(this),
       ClientService.BlockingInterface.class));
-    bssi.add(new BlockingServiceAndInterface(
+    }
+    if (admin) {
+      bssi.add(new BlockingServiceAndInterface(
       AdminService.newReflectiveBlockingService(this),
       AdminService.BlockingInterface.class));
-    return bssi;
+    }
+    return new org.apache.hadoop.hbase.shaded.com.google.common.collect.
+        
ImmutableList.Builder<BlockingServiceAndInterface>().addAll(bssi).build();
   }
 
   public InetSocketAddress getSocketAddress() {
@@ -1943,20 +1957,24 @@ public class RSRpcServices implements 
HBaseRPCErrorHandler,
           }
           // If there is no action in progress, we can submit a specific 
handler.
           // Need to pass the expected version in the constructor.
-          if (region.isMetaRegion()) {
-            regionServer.service.submit(new OpenMetaHandler(
-              regionServer, regionServer, region, htd, masterSystemTime));
+          if (regionServer.executorService == null) {
+            LOG.info("No executor service; skipping open request");
           } else {
-            if (regionOpenInfo.getFavoredNodesCount() > 0) {
-              
regionServer.updateRegionFavoredNodesMapping(region.getEncodedName(),
-                  regionOpenInfo.getFavoredNodesList());
-            }
-            if (htd.getPriority() >= HConstants.ADMIN_QOS || 
region.getTable().isSystemTable()) {
-              regionServer.service.submit(new OpenPriorityRegionHandler(
-                regionServer, regionServer, region, htd, masterSystemTime));
+            if (region.isMetaRegion()) {
+              regionServer.executorService.submit(new OpenMetaHandler(
+              regionServer, regionServer, region, htd, masterSystemTime));
             } else {
-              regionServer.service.submit(new OpenRegionHandler(
+              if (regionOpenInfo.getFavoredNodesCount() > 0) {
+                
regionServer.updateRegionFavoredNodesMapping(region.getEncodedName(),
+                regionOpenInfo.getFavoredNodesList());
+              }
+              if (htd.getPriority() >= HConstants.ADMIN_QOS || 
region.getTable().isSystemTable()) {
+                regionServer.executorService.submit(new 
OpenPriorityRegionHandler(
+                regionServer, regionServer, region, htd, masterSystemTime));
+              } else {
+                regionServer.executorService.submit(new OpenRegionHandler(
                 regionServer, regionServer, region, htd, masterSystemTime));
+              }
             }
           }
         }

http://git-wip-us.apache.org/repos/asf/hbase/blob/456057ef/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java
index ffda964..f8b9f6e 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java
@@ -263,7 +263,7 @@ public class TestRegionServerNoMaster {
     // Let's start the open handler
     TableDescriptor htd = getRS().tableDescriptors.get(hri.getTable());
 
-    getRS().service.submit(new OpenRegionHandler(getRS(), getRS(), hri, htd, 
-1));
+    getRS().executorService.submit(new OpenRegionHandler(getRS(), getRS(), 
hri, htd, -1));
 
     // The open handler should have removed the region from RIT but kept the 
region closed
     checkRegionIsClosed(HTU, getRS(), hri);

Reply via email to