This is an automated email from the ASF dual-hosted git repository.

ctubbsii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit f896c98c2356a52dfa2235d2cc02ae556ab17909
Merge: 6aff988 0a2457b
Author: Christopher Tubbs <ctubb...@apache.org>
AuthorDate: Fri Mar 22 18:45:50 2019 -0400

    Merge branch '1.9'

 .../org/apache/accumulo/core/conf/Property.java    |   4 +
 .../java/org/apache/accumulo/master/Master.java    | 178 +++++++++++----------
 .../master/metrics/MasterMetricsFactory.java       |  69 +++++++-
 .../master/metrics/fate/FateMetricValues.java      | 176 ++++++++++++++++++++
 .../accumulo/master/metrics/fate/FateMetrics.java  | 146 +++++++++++++++++
 .../master/metrics/fate/FateMetricsMBean.java      |  27 ++++
 .../master/metrics/fate/Metrics2FateMetrics.java   | 128 +++++++++++++++
 .../master/metrics/fate/FateMetricValuesTest.java  |  73 +++++++++
 .../resources/hadoop-metrics2-accumulo.properties  |  59 +++++++
 9 files changed, 774 insertions(+), 86 deletions(-)

diff --cc core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 9529b1a,ede2b8a..2d358e2
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@@ -258,13 -322,17 +258,17 @@@ public enum Property 
            + "completely quit. This delay gives it time before log recoveries 
begin."),
    MASTER_LEASE_RECOVERY_WAITING_PERIOD("master.lease.recovery.interval", "5s",
        PropertyType.TIMEDURATION,
 -      "The amount of time to wait after requesting a WAL file to be 
recovered"),
 +      "The amount of time to wait after requesting a write-ahead log to be 
recovered"),
    MASTER_WALOG_CLOSER_IMPLEMETATION("master.walog.closer.implementation",
        "org.apache.accumulo.server.master.recovery.HadoopLogCloser", 
PropertyType.CLASSNAME,
 -      "A class that implements a mechansim to steal write access to a file"),
 +      "A class that implements a mechanism to steal write access to a 
write-ahead log"),
+   MASTER_FATE_METRICS_ENABLED("master.fate.metrics.enabled", "false", 
PropertyType.BOOLEAN,
+       "Enable reporting of FATE metrics in JMX (and logging with Hadoop 
Metrics2"),
+   
MASTER_FATE_METRICS_MIN_UPDATE_INTERVAL("master.fate.metrics.min.update.interval",
 "60s",
+       PropertyType.TIMEDURATION, "Limit calls from metric sinks to zookeeper 
to update interval"),
    MASTER_FATE_THREADPOOL_SIZE("master.fate.threadpool.size", "4", 
PropertyType.COUNT,
 -      "The number of threads used to run FAult-Tolerant Executions. These are 
"
 -          + "primarily table operations like merge."),
 +      "The number of threads used to run fault-tolerant executions (FATE)."
 +          + " These are primarily table operations like merge."),
    MASTER_REPLICATION_SCAN_INTERVAL("master.replication.status.scan.interval", 
"30s",
        PropertyType.TIMEDURATION,
        "Amount of time to sleep before scanning the status section of the "
diff --cc server/master/src/main/java/org/apache/accumulo/master/Master.java
index a22471e,4b3d691..f8622de
--- a/server/master/src/main/java/org/apache/accumulo/master/Master.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java
@@@ -126,9 -122,9 +126,8 @@@ import org.apache.accumulo.server.maste
  import org.apache.accumulo.server.master.state.TabletState;
  import org.apache.accumulo.server.master.state.ZooStore;
  import org.apache.accumulo.server.master.state.ZooTabletStateStore;
- import org.apache.accumulo.server.metrics.Metrics;
 -import org.apache.accumulo.server.metrics.MetricsSystemHelper;
  import org.apache.accumulo.server.replication.ZooKeeperInitialization;
 -import org.apache.accumulo.server.rpc.RpcWrapper;
 +import org.apache.accumulo.server.rpc.HighlyAvailableServiceWrapper;
  import org.apache.accumulo.server.rpc.ServerAddress;
  import org.apache.accumulo.server.rpc.TCredentialsUpdatingWrapper;
  import org.apache.accumulo.server.rpc.TServerUtils;
@@@ -240,21 -234,21 +239,22 @@@ public class Maste
    static final boolean X = true;
    static final boolean O = false;
    // @formatter:off
 -  static final boolean transitionOK[][] = {
 +  static final boolean[][] transitionOK = {
        //                            INITIAL HAVE_LOCK SAFE_MODE NORMAL 
UNLOAD_META UNLOAD_ROOT STOP
 -      /* INITIAL */                 {X,     X,        O,        O,      O,    
     O,          X},
 -      /* HAVE_LOCK */               {O,     X,        X,        X,      O,    
     O,          X},
 -      /* SAFE_MODE */               {O,     O,        X,        X,      X,    
     O,          X},
 -      /* NORMAL */                  {O,     O,        X,        X,      X,    
     O,          X},
 -      /* UNLOAD_METADATA_TABLETS */ {O,     O,        X,        X,      X,    
     X,          X},
 -      /* UNLOAD_ROOT_TABLET */      {O,     O,        O,        X,      X,    
     X,          X},
 -      /* STOP */                    {O,     O,        O,        O,      O,    
     X,          X}};
 +      /* INITIAL */                 {X, X, O, O, O, O, X},
 +      /* HAVE_LOCK */               {O, X, X, X, O, O, X},
 +      /* SAFE_MODE */               {O, O, X, X, X, O, X},
 +      /* NORMAL */                  {O, O, X, X, X, O, X},
 +      /* UNLOAD_METADATA_TABLETS */ {O, O, X, X, X, X, X},
 +      /* UNLOAD_ROOT_TABLET */      {O, O, O, X, X, X, X},
 +      /* STOP */                    {O, O, O, O, O, X, X}};
    //@formatter:on
    synchronized void setMasterState(MasterState newState) {
--    if (state.equals(newState))
++    if (state.equals(newState)) {
        return;
++    }
      if (!transitionOK[state.ordinal()][newState.ordinal()]) {
 -      log.error("Programmer error: master should not transition from " + 
state + " to " + newState);
 +      log.error("Programmer error: master should not transition from {} to 
{}", state, newState);
      }
      MasterState oldState = state;
      state = newState;
@@@ -262,15 -256,15 +262,11 @@@
      if (newState == MasterState.STOP) {
        // Give the server a little time before shutdown so the client
        // thread requesting the stop can return
--      SimpleTimer.getInstance(getConfiguration()).schedule(new Runnable() {
--        @Override
--        public void run() {
--          // This frees the main thread and will cause the master to exit
--          clientService.stop();
--          Master.this.nextEvent.event("stopped event loop");
--        }
--
 -      }, 100l, 1000l);
++      SimpleTimer.getInstance(getConfiguration()).schedule(() -> {
++        // This frees the main thread and will cause the master to exit
++        clientService.stop();
++        Master.this.nextEvent.event("stopped event loop");
 +      }, 100L, 1000L);
      }
  
      if (oldState != newState && (newState == MasterState.HAVE_LOCK)) {
@@@ -314,10 -306,10 +310,11 @@@
          }
        }
  
--      if (location == null)
++      if (location == null) {
          throw new IllegalStateException("Failed to find root tablet");
++      }
  
 -      log.info("Upgrade setting root table location in zookeeper " + 
location);
 +      log.info("Upgrade setting root table location in zookeeper {}", 
location);
        zoo.putPersistentData(dirZPath, location.toString().getBytes(), 
NodeExistsPolicy.FAIL);
      }
    }
@@@ -404,17 -394,19 +401,18 @@@
          }
  
          // create initial namespaces
 -        String namespaces = ZooUtil.getRoot(getInstance()) + 
Constants.ZNAMESPACES;
 +        String namespaces = getZooKeeperRoot() + Constants.ZNAMESPACES;
          zoo.putPersistentData(namespaces, new byte[0], NodeExistsPolicy.SKIP);
 -        for (Pair<String,String> namespace : Iterables.concat(
 -            Collections.singleton(
 -                new Pair<>(Namespaces.ACCUMULO_NAMESPACE, 
Namespaces.ACCUMULO_NAMESPACE_ID)),
 -            Collections.singleton(
 -                new Pair<>(Namespaces.DEFAULT_NAMESPACE, 
Namespaces.DEFAULT_NAMESPACE_ID)))) {
 +        for (Pair<String,NamespaceId> namespace : Iterables.concat(
 +            Collections.singleton(new Pair<>(Namespace.ACCUMULO.name(), 
Namespace.ACCUMULO.id())),
 +            Collections.singleton(new Pair<>(Namespace.DEFAULT.name(), 
Namespace.DEFAULT.id())))) {
            String ns = namespace.getFirst();
 -          String id = namespace.getSecond();
 -          log.debug("Upgrade creating namespace \"" + ns + "\" (ID: " + id + 
")");
 -          if (!Namespaces.exists(getInstance(), id))
 -            
TableManager.prepareNewNamespaceState(getInstance().getInstanceID(), id, ns,
 +          NamespaceId id = namespace.getSecond();
 +          log.debug("Upgrade creating namespace \"{}\" (ID: {})", ns, id);
-           if (!Namespaces.exists(context, id))
++          if (!Namespaces.exists(context, id)) {
 +            TableManager.prepareNewNamespaceState(zoo, getInstanceID(), id, 
ns,
                  NodeExistsPolicy.SKIP);
++          }
          }
  
          // create replication table in zk
@@@ -645,38 -635,32 +643,39 @@@
      return result;
    }
  
 -  public void mustBeOnline(final String tableId) throws 
ThriftTableOperationException {
 -    Tables.clearCache(getInstance());
 -    if (!Tables.getTableState(getInstance(), 
tableId).equals(TableState.ONLINE))
 -      throw new ThriftTableOperationException(tableId, null, 
TableOperation.MERGE,
 +  public void mustBeOnline(final TableId tableId) throws 
ThriftTableOperationException {
 +    Tables.clearCache(context);
-     if (!Tables.getTableState(context, tableId).equals(TableState.ONLINE))
++    if (!Tables.getTableState(context, tableId).equals(TableState.ONLINE)) {
 +      throw new ThriftTableOperationException(tableId.canonical(), null, 
TableOperation.MERGE,
            TableOperationExceptionType.OFFLINE, "table is not online");
++    }
    }
  
 -  public Master(ServerConfigurationFactory config, VolumeManager fs, String 
hostname)
 -      throws IOException {
 -    super(config);
 -    this.serverConfig = config;
 -    this.fs = fs;
 -    this.hostname = hostname;
 +  public ServerContext getContext() {
 +    return context;
 +  }
  
 -    AccumuloConfiguration aconf = serverConfig.getConfiguration();
 +  public TableManager getTableManager() {
 +    return context.getTableManager();
 +  }
  
 -    log.info("Version " + Constants.VERSION);
 -    log.info("Instance " + getInstance().getInstanceID());
 -    timeKeeper = new MasterTime(this);
 +  public Master(ServerContext context) throws IOException {
 +    this.context = context;
 +    this.serverConfig = context.getServerConfFactory();
 +    this.fs = context.getVolumeManager();
 +    this.hostname = context.getHostname();
 +
 +    AccumuloConfiguration aconf = serverConfig.getSystemConfiguration();
  
 +    log.info("Version {}", Constants.VERSION);
 +    log.info("Instance {}", getInstanceID());
 +    timeKeeper = new MasterTime(this);
      ThriftTransportPool.getInstance()
          .setIdleTime(aconf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT));
 -    tserverSet = new LiveTServerSet(this, this);
 -    this.tabletBalancer = 
aconf.instantiateClassProperty(Property.MASTER_TABLET_BALANCER,
 -        TabletBalancer.class, new DefaultLoadBalancer());
 -    this.tabletBalancer.init(serverConfig);
 +    tserverSet = new LiveTServerSet(context, this);
 +    this.tabletBalancer = Property.createInstanceFromPropertyName(aconf,
 +        Property.MASTER_TABLET_BALANCER, TabletBalancer.class, new 
DefaultLoadBalancer());
 +    this.tabletBalancer.init(context);
  
      try {
        AccumuloVFSClassLoader.getContextManager()
@@@ -733,13 -705,14 +732,14 @@@
      return tserverSet.getConnection(server);
    }
  
 -  public MergeInfo getMergeInfo(String tableId) {
 +  public MergeInfo getMergeInfo(TableId tableId) {
      synchronized (mergeLock) {
        try {
 -        String path = ZooUtil.getRoot(getInstance().getInstanceID()) + 
Constants.ZTABLES + "/"
 -            + tableId + "/merge";
 -        if (!ZooReaderWriter.getInstance().exists(path))
 +        String path = getZooKeeperRoot() + Constants.ZTABLES + "/" + tableId 
+ "/merge";
-         if (!context.getZooReaderWriter().exists(path))
++        if (!context.getZooReaderWriter().exists(path)) {
            return new MergeInfo();
 -        byte[] data = ZooReaderWriter.getInstance().getData(path, new Stat());
++        }
 +        byte[] data = context.getZooReaderWriter().getData(path, new Stat());
          DataInputBuffer in = new DataInputBuffer();
          in.reset(data, data.length);
          MergeInfo info = new MergeInfo();
@@@ -799,21 -774,21 +799,23 @@@
    }
  
    MasterGoalState getMasterGoalState() {
--    while (true)
++    while (true) {
        try {
 -        byte[] data = ZooReaderWriter.getInstance()
 -            .getData(ZooUtil.getRoot(getInstance()) + 
Constants.ZMASTER_GOAL_STATE, null);
 +        byte[] data = context.getZooReaderWriter()
 +            .getData(getZooKeeperRoot() + Constants.ZMASTER_GOAL_STATE, null);
          return MasterGoalState.valueOf(new String(data));
        } catch (Exception e) {
 -        log.error("Problem getting real goal state from zookeeper: " + e);
 +        log.error("Problem getting real goal state from zookeeper: ", e);
          sleepUninterruptibly(1, TimeUnit.SECONDS);
        }
++    }
    }
  
    public boolean hasCycled(long time) {
      for (TabletGroupWatcher watcher : watchers) {
--      if (watcher.stats.lastScanFinished() < time)
++      if (watcher.stats.lastScanFinished() < time) {
          return false;
++      }
      }
  
      return true;
@@@ -825,7 -806,7 +827,7 @@@
      }
    }
  
--  static enum TabletGoalState {
++  enum TabletGoalState {
      HOSTED(TUnloadTabletGoal.UNKNOWN),
      UNASSIGNED(TUnloadTabletGoal.UNASSIGNED),
      DELETED(TUnloadTabletGoal.DELETED),
@@@ -850,12 -831,12 +852,14 @@@
        case HAVE_LOCK: // fall-through intended
        case INITIAL: // fall-through intended
        case SAFE_MODE:
--        if (tls.extent.isMeta())
++        if (tls.extent.isMeta()) {
            return TabletGoalState.HOSTED;
++        }
          return TabletGoalState.UNASSIGNED;
        case UNLOAD_METADATA_TABLETS:
--        if (tls.extent.isRootTablet())
++        if (tls.extent.isRootTablet()) {
            return TabletGoalState.HOSTED;
++        }
          return TabletGoalState.UNASSIGNED;
        case UNLOAD_ROOT_TABLET:
          return TabletGoalState.UNASSIGNED;
@@@ -867,9 -848,9 +871,10 @@@
    }
  
    TabletGoalState getTableGoalState(KeyExtent extent) {
 -    TableState tableState = 
TableManager.getInstance().getTableState(extent.getTableId());
 -    if (tableState == null)
 +    TableState tableState = 
context.getTableManager().getTableState(extent.getTableId());
-     if (tableState == null)
++    if (tableState == null) {
        return TabletGoalState.DELETED;
++    }
      switch (tableState) {
        case DELETING:
          return TabletGoalState.DELETED;
@@@ -902,11 -883,11 +907,13 @@@
                return TabletGoalState.HOSTED;
              case WAITING_FOR_CHOPPED:
                if 
(tls.getState(tserverSet.getCurrentServers()).equals(TabletState.HOSTED)) {
--                if (tls.chopped)
++                if (tls.chopped) {
                    return TabletGoalState.UNASSIGNED;
++                }
                } else {
--                if (tls.chopped && tls.walogs.isEmpty())
++                if (tls.chopped && tls.walogs.isEmpty()) {
                    return TabletGoalState.UNASSIGNED;
++                }
                }
  
                return TabletGoalState.HOSTED;
@@@ -1035,30 -1016,30 +1042,33 @@@
                    int count = nonMetaDataTabletsAssignedOrHosted();
                    log.debug(
                        String.format("There are %d non-metadata tablets 
assigned or hosted", count));
--                  if (count == 0 && goodStats())
++                  if (count == 0 && goodStats()) {
                      setMasterState(MasterState.UNLOAD_METADATA_TABLETS);
++                  }
                  }
                    break;
                  case UNLOAD_METADATA_TABLETS: {
                    int count = assignedOrHosted(MetadataTable.ID);
                    log.debug(
                        String.format("There are %d metadata tablets assigned 
or hosted", count));
--                  if (count == 0 && goodStats())
++                  if (count == 0 && goodStats()) {
                      setMasterState(MasterState.UNLOAD_ROOT_TABLET);
++                  }
                  }
                    break;
 -                case UNLOAD_ROOT_TABLET: {
 +                case UNLOAD_ROOT_TABLET:
                    int count = assignedOrHosted(MetadataTable.ID);
                    if (count > 0 && goodStats()) {
                      log.debug(String.format("%d metadata tablets online", 
count));
                      setMasterState(MasterState.UNLOAD_ROOT_TABLET);
                    }
                    int root_count = assignedOrHosted(RootTable.ID);
--                  if (root_count > 0 && goodStats())
++                  if (root_count > 0 && goodStats()) {
                      log.debug("The root tablet is still assigned or hosted");
++                  }
                    if (count + root_count == 0 && goodStats()) {
                      Set<TServerInstance> currentServers = 
tserverSet.getCurrentServers();
 -                    log.debug("stopping " + currentServers.size() + " tablet 
servers");
 +                    log.debug("stopping {} tablet servers", 
currentServers.size());
                      for (TServerInstance server : currentServers) {
                        try {
                          serversToShutdown.add(server);
@@@ -1069,9 -1050,10 +1079,10 @@@
                          tserverSet.remove(server);
                        }
                      }
--                    if (currentServers.size() == 0)
++                    if (currentServers.size() == 0) {
                        setMasterState(MasterState.STOP);
++                    }
                    }
 -                }
                    break;
                  default:
                    break;
@@@ -1134,11 -1117,12 +1145,12 @@@
          }
        }
        if (crazyHoldTime == 1 && someHoldTime == 1 && tserverStatus.size() > 
1) {
 -        log.warn(
 -            "Tablet server " + instance + " exceeded maximum hold time: 
attempting to kill it");
 +        log.warn("Tablet server {} exceeded maximum hold time: attempting to 
kill it", instance);
          try {
            TServerConnection connection = tserverSet.getConnection(instance);
--          if (connection != null)
++          if (connection != null) {
              connection.fastHalt(masterLock);
++          }
          } catch (TException e) {
            log.error("{}", e.getMessage(), e);
          }
@@@ -1189,39 -1173,39 +1201,37 @@@
          // unresponsive tservers.
          sleepUninterruptibly(Math.max(1, rpcTimeout / 120_000), 
TimeUnit.MILLISECONDS);
        }
--      tp.submit(new Runnable() {
--        @Override
--        public void run() {
++      tp.submit(() -> {
++        try {
++          Thread t = Thread.currentThread();
++          String oldName = t.getName();
            try {
--            Thread t = Thread.currentThread();
--            String oldName = t.getName();
--            try {
--              t.setName("Getting status from " + server);
--              TServerConnection connection = tserverSet.getConnection(server);
--              if (connection == null)
--                throw new IOException("No connection to " + server);
--              TabletServerStatus status = connection.getTableMap(false);
--              result.put(server, status);
--            } finally {
--              t.setName(oldName);
++            t.setName("Getting status from " + server);
++            TServerConnection connection1 = tserverSet.getConnection(server);
++            if (connection1 == null) {
++              throw new IOException("No connection to " + server);
              }
--          } catch (Exception ex) {
-             log.error("unable to get tablet server status {} {}", server, 
ex.toString());
-             log.debug("unable to get tablet server status {}", server, ex);
 -            log.error("unable to get tablet server status " + server + " " + 
ex.toString());
 -            log.debug("unable to get tablet server status " + server, ex);
--            if (badServers.get(server).incrementAndGet() > 
MAX_BAD_STATUS_COUNT) {
-               log.warn("attempting to stop {}", server);
 -              log.warn("attempting to stop " + server);
--              try {
--                TServerConnection connection = 
tserverSet.getConnection(server);
--                if (connection != null) {
--                  connection.halt(masterLock);
--                }
--              } catch (TTransportException e) {
--                // ignore: it's probably down
--              } catch (Exception e) {
-                 log.info("error talking to troublesome tablet server", e);
 -                log.info("error talking to troublesome tablet server ", e);
++            TabletServerStatus status = connection1.getTableMap(false);
++            result.put(server, status);
++          } finally {
++            t.setName(oldName);
++          }
++        } catch (Exception ex) {
++          log.error("unable to get tablet server status {} {}", server, 
ex.toString());
++          log.debug("unable to get tablet server status {}", server, ex);
++          if (badServers.get(server).incrementAndGet() > 
MAX_BAD_STATUS_COUNT) {
++            log.warn("attempting to stop {}", server);
++            try {
++              TServerConnection connection2 = 
tserverSet.getConnection(server);
++              if (connection2 != null) {
++                connection2.halt(masterLock);
                }
--              badServers.remove(server);
++            } catch (TTransportException e1) {
++              // ignore: it's probably down
++            } catch (Exception e2) {
++              log.info("error talking to troublesome tablet server", e2);
              }
++            badServers.remove(server);
            }
          }
        });
@@@ -1347,14 -1310,16 +1357,8 @@@
        fate = new Fate<>(this, store);
        fate.startTransactionRunners(threads);
  
--      SimpleTimer.getInstance(getConfiguration()).schedule(new Runnable() {
--
--        @Override
--        public void run() {
--          store.ageOff();
--        }
--      }, 63000, 63000);
 -    } catch (KeeperException e) {
 -      throw new IOException(e);
 -    } catch (InterruptedException e) {
++      SimpleTimer.getInstance(getConfiguration()).schedule(() -> 
store.ageOff(), 63000, 63000);
 +    } catch (KeeperException | InterruptedException e) {
        throw new IOException(e);
      }
  
@@@ -1387,94 -1366,77 +1391,98 @@@
        sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
      }
  
 -    // Start the daemon to scan the replication table and make units of work
 -    replicationWorkDriver = new ReplicationDriver(this);
 -    replicationWorkDriver.start();
 +    // if the replication name is ever set, then start replication services
 +    final AtomicReference<TServer> replServer = new AtomicReference<>();
 +    SimpleTimer.getInstance(getConfiguration()).schedule(() -> {
 +      try {
 +        if (replServer.get() == null) {
 +          if (!getConfiguration().get(Property.REPLICATION_NAME).isEmpty()) {
 +            log.info(Property.REPLICATION_NAME.getKey() + " was set, starting 
repl services.");
 +            replServer.set(setupReplication());
 +          }
 +        }
 +      } catch (UnknownHostException | KeeperException | InterruptedException 
e) {
 +        log.error("Error occurred starting replication services. ", e);
 +      }
 +    }, 0, 5000);
  
 -    // Start the daemon to assign work to tservers to replicate to our peers
 -    try {
 -      replicationWorkAssigner = new WorkDriver(this);
 -    } catch (AccumuloException | AccumuloSecurityException e) {
 -      log.error("Caught exception trying to initialize replication 
WorkDriver", e);
 -      throw new RuntimeException(e);
 +    // The master is fully initialized. Clients are allowed to connect now.
 +    masterInitialized.set(true);
 +
 +    while (clientService.isServing()) {
 +      sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
      }
 -    replicationWorkAssigner.start();
 +    log.info("Shutting down fate.");
 +    fate.shutdown();
 +
 +    log.info("Shutting down timekeeping.");
 +    timeKeeper.shutdown();
 +
 +    final long deadline = System.currentTimeMillis() + MAX_CLEANUP_WAIT_TIME;
 +    statusThread.join(remaining(deadline));
-     if (replicationWorkAssigner != null)
++    if (replicationWorkAssigner != null) {
 +      replicationWorkAssigner.join(remaining(deadline));
-     if (replicationWorkDriver != null)
++    }
++    if (replicationWorkDriver != null) {
 +      replicationWorkDriver.join(remaining(deadline));
++    }
 +    TServerUtils.stopTServer(replServer.get());
 +
 +    // Signal that we want it to stop, and wait for it to do so.
 +    if (authenticationTokenKeyManager != null) {
 +      authenticationTokenKeyManager.gracefulStop();
 +      authenticationTokenKeyManager.join(remaining(deadline));
 +    }
 +
 +    // quit, even if the tablet servers somehow jam up and the watchers
 +    // don't stop
 +    for (TabletGroupWatcher watcher : watchers) {
 +      watcher.join(remaining(deadline));
 +    }
 +    log.info("exiting");
 +  }
  
 +  private TServer setupReplication()
 +      throws UnknownHostException, KeeperException, InterruptedException {
      // Start the replication coordinator which assigns tservers to service 
replication requests
      MasterReplicationCoordinator impl = new 
MasterReplicationCoordinator(this);
 +    ReplicationCoordinator.Iface haReplicationProxy = 
HighlyAvailableServiceWrapper.service(impl,
 +        this);
      // @formatter:off
      ReplicationCoordinator.Processor<ReplicationCoordinator.Iface> 
replicationCoordinatorProcessor =
 -      new ReplicationCoordinator.Processor<>(
 +            new 
ReplicationCoordinator.Processor<>(TraceUtil.wrapService(haReplicationProxy));
      // @formatter:on
 -            RpcWrapper.service(impl,
 -                new 
ReplicationCoordinator.Processor<ReplicationCoordinator.Iface>(impl)));
 -    ServerAddress replAddress = TServerUtils.startServer(this, hostname,
 +    ServerAddress replAddress = TServerUtils.startServer(context, hostname,
          Property.MASTER_REPLICATION_COORDINATOR_PORT, 
replicationCoordinatorProcessor,
          "Master Replication Coordinator", "Replication Coordinator", null,
          Property.MASTER_REPLICATION_COORDINATOR_MINTHREADS,
          Property.MASTER_REPLICATION_COORDINATOR_THREADCHECK, 
Property.GENERAL_MAX_MESSAGE_SIZE);
  
      log.info("Started replication coordinator service at " + 
replAddress.address);
 +    // Start the daemon to scan the replication table and make units of work
 +    replicationWorkDriver = new ReplicationDriver(this);
 +    replicationWorkDriver.start();
 +
 +    // Start the daemon to assign work to tservers to replicate to our peers
 +    replicationWorkAssigner = new WorkDriver(this);
 +    replicationWorkAssigner.start();
  
      // Advertise that port we used so peers don't have to be told what it is
 -    ZooReaderWriter.getInstance().putPersistentData(
 -        ZooUtil.getRoot(getInstance()) + 
Constants.ZMASTER_REPLICATION_COORDINATOR_ADDR,
 +    context.getZooReaderWriter().putPersistentData(
 +        getZooKeeperRoot() + Constants.ZMASTER_REPLICATION_COORDINATOR_ADDR,
          replAddress.address.toString().getBytes(UTF_8), 
NodeExistsPolicy.OVERWRITE);
  
-     // Register replication metrics
+     // Register metrics modules
      MasterMetricsFactory factory = new 
MasterMetricsFactory(getConfiguration(), this);
-     Metrics replicationMetrics = factory.createReplicationMetrics();
-     try {
-       replicationMetrics.register();
-     } catch (Exception e) {
-       log.error("Failed to register replication metrics", e);
+ 
+     int failureCount = factory.register();
+ 
+     if (failureCount > 0) {
+       log.info("Failed to register {} metrics modules", failureCount);
+     } else {
+       log.info("All metrics modules registered");
      }
 -
 -    while (clientService.isServing()) {
 -      sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
 -    }
 -    log.info("Shutting down fate.");
 -    fate.shutdown();
 -
 -    log.info("Shutting down timekeeping.");
 -    timeKeeper.shutdown();
 -
 -    final long deadline = System.currentTimeMillis() + MAX_CLEANUP_WAIT_TIME;
 -    statusThread.join(remaining(deadline));
 -    replicationWorkAssigner.join(remaining(deadline));
 -    replicationWorkDriver.join(remaining(deadline));
 -    replAddress.server.stop();
 -    // Signal that we want it to stop, and wait for it to do so.
 -    if (authenticationTokenKeyManager != null) {
 -      authenticationTokenKeyManager.gracefulStop();
 -      authenticationTokenKeyManager.join(remaining(deadline));
 -    }
 -
 -    // quit, even if the tablet servers somehow jam up and the watchers
 -    // don't stop
 -    for (TabletGroupWatcher watcher : watchers) {
 -      watcher.join(remaining(deadline));
 -    }
 -    log.info("exiting");
 +    return replAddress.server;
    }
  
    private long remaining(long deadline) {
@@@ -1498,12 -1460,12 +1506,7 @@@
      @Override
      public void unableToMonitorLockNode(final Throwable e) {
        // ACCUMULO-3651 Changed level to error and added FATAL to message for 
slf4j compatibility
--      Halt.halt(-1, new Runnable() {
--        @Override
--        public void run() {
--          log.error("FATAL: No longer able to monitor master lock node", e);
--        }
--      });
++      Halt.halt(-1, () -> log.error("FATAL: No longer able to monitor master 
lock node", e));
  
      }
  
@@@ -1596,19 -1569,19 +1599,22 @@@
        Set<TServerInstance> added) {
      // if we have deleted or added tservers, then adjust our dead server list
      if (!deleted.isEmpty() || !added.isEmpty()) {
 -      DeadServerList obit = new DeadServerList(
 -          ZooUtil.getRoot(getInstance()) + Constants.ZDEADTSERVERS);
 +      DeadServerList obit = new DeadServerList(context,
 +          getZooKeeperRoot() + Constants.ZDEADTSERVERS);
        if (added.size() > 0) {
 -        log.info("New servers: " + added);
 -        for (TServerInstance up : added)
 +        log.info("New servers: {}", added);
-         for (TServerInstance up : added)
++        for (TServerInstance up : added) {
            obit.delete(up.hostPort());
++        }
        }
        for (TServerInstance dead : deleted) {
          String cause = "unexpected failure";
--        if (serversToShutdown.contains(dead))
++        if (serversToShutdown.contains(dead)) {
            cause = "clean shutdown"; // maybe an incorrect assumption
--        if (!getMasterGoalState().equals(MasterGoalState.CLEAN_STOP))
++        }
++        if (!getMasterGoalState().equals(MasterGoalState.CLEAN_STOP)) {
            obit.post(dead.hostPort(), cause);
++        }
        }
  
        Set<TServerInstance> unexpected = new HashSet<>(deleted);
@@@ -1682,22 -1655,22 +1688,25 @@@
    public void sessionExpired() {}
  
    @Override
 -  public Set<String> onlineTables() {
 -    Set<String> result = new HashSet<>();
 +  public Set<TableId> onlineTables() {
 +    Set<TableId> result = new HashSet<>();
      if (getMasterState() != MasterState.NORMAL) {
--      if (getMasterState() != MasterState.UNLOAD_METADATA_TABLETS)
++      if (getMasterState() != MasterState.UNLOAD_METADATA_TABLETS) {
          result.add(MetadataTable.ID);
--      if (getMasterState() != MasterState.UNLOAD_ROOT_TABLET)
++      }
++      if (getMasterState() != MasterState.UNLOAD_ROOT_TABLET) {
          result.add(RootTable.ID);
++      }
        return result;
      }
 -    TableManager manager = TableManager.getInstance();
 +    TableManager manager = context.getTableManager();
  
 -    for (String tableId : Tables.getIdToNameMap(getInstance()).keySet()) {
 +    for (TableId tableId : Tables.getIdToNameMap(context).keySet()) {
        TableState state = manager.getTableState(tableId);
        if (state != null) {
--        if (state == TableState.ONLINE)
++        if (state == TableState.ONLINE) {
            result.add(tableId);
++        }
        }
      }
      return result;
@@@ -1788,10 -1760,11 +1797,11 @@@
      result.unassignedTablets = displayUnassigned();
      result.serversShuttingDown = new HashSet<>();
      synchronized (serversToShutdown) {
--      for (TServerInstance server : serversToShutdown)
++      for (TServerInstance server : serversToShutdown) {
          result.serversShuttingDown.add(server.hostPort());
++      }
      }
 -    DeadServerList obit = new DeadServerList(
 -        ZooUtil.getRoot(getInstance()) + Constants.ZDEADTSERVERS);
 +    DeadServerList obit = new DeadServerList(context, getZooKeeperRoot() + 
Constants.ZDEADTSERVERS);
      result.deadTabletServers = obit.getList();
      result.bulkImports = bulkImportStatus.getBulkLoadStatus();
      return result;
diff --cc 
server/master/src/main/java/org/apache/accumulo/master/metrics/MasterMetricsFactory.java
index 978f36c,3b6ad33..7001d82
--- 
a/server/master/src/main/java/org/apache/accumulo/master/metrics/MasterMetricsFactory.java
+++ 
b/server/master/src/main/java/org/apache/accumulo/master/metrics/MasterMetricsFactory.java
@@@ -51,4 -108,13 +108,12 @@@ public class MasterMetricsFactory 
      return new Metrics2ReplicationMetrics(master, metricsSystem);
    }
  
+   private Metrics createFateMetrics() {
 -    String id = master.getInstance().getInstanceID();
+     if (useOldMetrics) {
 -      return new FateMetrics(id, fateMinUpdateInterval);
++      return new FateMetrics(master.getContext(), fateMinUpdateInterval);
+     }
+ 
 -    return new Metrics2FateMetrics(id, metricsSystem, fateMinUpdateInterval);
++    return new Metrics2FateMetrics(master.getContext(), metricsSystem, 
fateMinUpdateInterval);
+   }
+ 
  }
diff --cc 
server/master/src/main/java/org/apache/accumulo/master/metrics/fate/FateMetricValues.java
index 0000000,b776f57..b5bc0c7
mode 000000,100644..100644
--- 
a/server/master/src/main/java/org/apache/accumulo/master/metrics/fate/FateMetricValues.java
+++ 
b/server/master/src/main/java/org/apache/accumulo/master/metrics/fate/FateMetricValues.java
@@@ -1,0 -1,177 +1,176 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.accumulo.master.metrics.fate;
+ 
+ import java.util.List;
+ 
+ import org.apache.accumulo.core.Constants;
 -import org.apache.accumulo.core.zookeeper.ZooUtil;
+ import org.apache.accumulo.fate.AdminUtil;
+ import org.apache.accumulo.fate.ZooStore;
+ import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
 -import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
++import org.apache.accumulo.server.ServerContext;
+ import org.apache.zookeeper.KeeperException;
+ import org.apache.zookeeper.data.Stat;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ /**
+  * Immutable class that holds a snapshot of fate metric values - use builder 
to instantiate
+  * instance.
+  */
+ class FateMetricValues {
+ 
+   private static final Logger log = 
LoggerFactory.getLogger(FateMetrics.class);
+ 
+   private final long updateTime;
+   private final long currentFateOps;
+   private final long zkFateChildOpsTotal;
+   private final long zkConnectionErrors;
+ 
+   private FateMetricValues(final long updateTime, final long currentFateOps,
+       final long zkFateChildOpsTotal, final long zkConnectionErrors) {
+     this.updateTime = updateTime;
+     this.currentFateOps = currentFateOps;
+     this.zkFateChildOpsTotal = zkFateChildOpsTotal;
+     this.zkConnectionErrors = zkConnectionErrors;
+   }
+ 
+   long getCurrentFateOps() {
+     return currentFateOps;
+   }
+ 
+   long getZkFateChildOpsTotal() {
+     return zkFateChildOpsTotal;
+   }
+ 
+   long getZkConnectionErrors() {
+     return zkConnectionErrors;
+   }
+ 
+   /**
+    * Update FateMetricValues, populating with current values and then 
overwriting with new values; this
+    * preserves previous values in case an error or exception prevents the 
values from being
+    * completely populated, this form may be more suitable for metric counters.
+    *
 -   * @param instanceId
 -   *          Accumulo instanceId
++   * @param context
++   *          the server's context
+    * @param currentValues
+    *          the current fate metrics used as default
+    * @return populated metrics values
+    */
 -  static FateMetricValues updateFromZookeeper(final String instanceId,
++  static FateMetricValues updateFromZookeeper(final ServerContext context,
+       final FateMetricValues currentValues) {
 -    return updateFromZookeeper(instanceId, 
FateMetricValues.builder().copy(currentValues));
++    return updateFromZookeeper(context, 
FateMetricValues.builder().copy(currentValues));
+   }
+ 
+   /**
+    * Update the FATE metric values from zookeeper - the builder is expected 
to have the desired
+    * default values (either 0, or the previous value).
+    *
 -   * @param instanceId
 -   *          Accumulo instanceId
++   * @param context
++   *          the server's context
+    * @param builder
+    *          value builder, populated with defaults.
+    * @return an immutable instance of FateMetricValues.
+    */
 -  private static FateMetricValues updateFromZookeeper(final String instanceId,
++  private static FateMetricValues updateFromZookeeper(final ServerContext 
context,
+       final FateMetricValues.Builder builder) {
+ 
+     AdminUtil<String> admin = new AdminUtil<>(false);
+ 
+     try {
+ 
 -      IZooReaderWriter zoo = ZooReaderWriter.getInstance();
 -      ZooStore<String> zs = new ZooStore<>(ZooUtil.getRoot(instanceId) + 
Constants.ZFATE, zoo);
++      IZooReaderWriter zoo = context.getZooReaderWriter();
++      ZooStore<String> zs = new ZooStore<>(context.getZooKeeperRoot() + 
Constants.ZFATE, zoo);
+ 
+       List<AdminUtil.TransactionStatus> currFates = 
admin.getTransactionStatus(zs, null, null);
+       builder.withCurrentFateOps(currFates.size());
+ 
 -      Stat node = zoo.getZooKeeper().exists(ZooUtil.getRoot(instanceId) + 
Constants.ZFATE, false);
++      Stat node = zoo.getZooKeeper().exists(context.getZooKeeperRoot() + 
Constants.ZFATE, false);
+       builder.withZkFateChildOpsTotal(node.getCversion());
+ 
+       if (log.isTraceEnabled()) {
+         log.trace(
+             "ZkNodeStat: {czxid: {}, mzxid: {}, pzxid: {}, ctime: {}, mtime: 
{}, "
+                 + "version: {}, cversion: {}, num children: {}",
+             node.getCzxid(), node.getMzxid(), node.getPzxid(), 
node.getCtime(), node.getMtime(),
+             node.getVersion(), node.getCversion(), node.getNumChildren());
+       }
+     } catch (KeeperException ex) {
+       log.debug("Error connecting to ZooKeeper", ex);
+       builder.incrZkConnectionErrors();
+     } catch (InterruptedException ex) {
+       Thread.currentThread().interrupt();
+     }
+ 
+     return builder.build();
+   }
+ 
+   @Override
+   public String toString() {
+     return "FateMetricValues{" + "updateTime=" + updateTime + ", 
currentFateOps=" + currentFateOps
+         + ", zkFateChildOpsTotal=" + zkFateChildOpsTotal + ", 
zkConnectionErrors="
+         + zkConnectionErrors + '}';
+   }
+ 
+   public static Builder builder() {
+     return new Builder();
+   }
+ 
+   static class Builder {
+ 
+     private long currentFateOps = 0;
+     private long zkFateChildOpsTotal = 0;
+     private long zkConnectionErrors = 0;
+ 
+     Builder copy(final FateMetricValues prevValues) {
+ 
+       if (prevValues == null) {
+         return new Builder();
+       }
+ 
+       return new Builder().withCurrentFateOps(prevValues.getCurrentFateOps())
+           .withZkFateChildOpsTotal(prevValues.getZkFateChildOpsTotal())
+           .withZkConnectionErrors(prevValues.getZkConnectionErrors());
+     }
+ 
+     Builder withCurrentFateOps(final long value) {
+       this.currentFateOps = value;
+       return this;
+     }
+ 
+     Builder withZkFateChildOpsTotal(final long value) {
+       this.zkFateChildOpsTotal = value;
+       return this;
+     }
+ 
+     Builder incrZkConnectionErrors() {
+       this.zkConnectionErrors += 1L;
+       return this;
+     }
+ 
+     Builder withZkConnectionErrors(final long value) {
+       this.zkConnectionErrors = value;
+       return this;
+     }
+ 
+     FateMetricValues build() {
+       return new FateMetricValues(System.currentTimeMillis(), currentFateOps, 
zkFateChildOpsTotal,
+           zkConnectionErrors);
+     }
+   }
+ }
diff --cc 
server/master/src/main/java/org/apache/accumulo/master/metrics/fate/FateMetrics.java
index 0000000,934d803..de7cb8c
mode 000000,100644..100644
--- 
a/server/master/src/main/java/org/apache/accumulo/master/metrics/fate/FateMetrics.java
+++ 
b/server/master/src/main/java/org/apache/accumulo/master/metrics/fate/FateMetrics.java
@@@ -1,0 -1,144 +1,146 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.accumulo.master.metrics.fate;
+ 
+ import java.lang.management.ManagementFactory;
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.atomic.AtomicReference;
+ 
+ import javax.management.MBeanServer;
+ import javax.management.ObjectName;
+ 
++import org.apache.accumulo.server.ServerContext;
+ import org.apache.accumulo.server.metrics.Metrics;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ /**
+  * Basic implementation of fate metrics that publish to JMX when legacy 
metrics enabled. For logging
+  * in addition to JMX, use the hadoop metrics2 implementation. The 
measurement type and values
+  * provided are:
+  * <ul>
+  * <li>gauge - current count of FATE transactions in progress</li>
+  * <li>gauge - last zookeeper id that modified FATE root path to provide 
estimate of fate
+  * transaction liveliness</li>
+  * <li>counter - the number of zookeeper connection errors since process 
started.</li>
+  * </ul>
+  * Implementation notes:
+  * <p>
+  * The fate operation estimate is based on zookeeper Stat structure and the 
property of pzxid. From
+  * the zookeeper developer's guide: pzxid is "The zxid of the change that 
last modified children of
+  * this znode." The pzxid should then change each time a FATE transaction is 
created or deleted -
+  * and the zookeeper id (zxid) is expected to continuously increase because 
the zookeeper id is used
+  * by zookeeper for ordering operations.
+  */
+ public class FateMetrics implements Metrics, FateMetricsMBean {
+ 
+   private static final Logger log = 
LoggerFactory.getLogger(FateMetrics.class);
+ 
+   // limit calls to update fate counters to guard against hammering zookeeper.
+   private static final long DEFAULT_MIN_REFRESH_DELAY = 
TimeUnit.SECONDS.toMillis(10);
+ 
+   private volatile long minimumRefreshDelay;
+ 
+   private final AtomicReference<FateMetricValues> metricValues;
+ 
+   private volatile long lastUpdate = 0;
+ 
 -  private final String instanceId;
++  private final ServerContext context;
+ 
+   private ObjectName objectName = null;
+ 
+   private volatile boolean enabled = false;
+ 
 -  public FateMetrics(final String instanceId, final long minimumRefreshDelay) 
{
++  public FateMetrics(final ServerContext context, final long 
minimumRefreshDelay) {
+ 
 -    this.instanceId = instanceId;
++    this.context = context;
+ 
+     this.minimumRefreshDelay = Math.max(DEFAULT_MIN_REFRESH_DELAY, 
minimumRefreshDelay);
+ 
+     metricValues = new AtomicReference<>(FateMetricValues.builder().build());
+ 
+     try {
+       objectName = new ObjectName(
+           
"accumulo.server.metrics:service=FateMetrics,name=FateMetricsMBean,instance="
+               + Thread.currentThread().getName());
+     } catch (Exception e) {
+       log.error("Exception setting MBean object name", e);
+     }
+   }
+ 
+   @Override
+   public void register() throws Exception {
+     // Register this object with the MBeanServer
+     MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
 -    if (null == objectName)
++    if (null == objectName) {
+       throw new IllegalArgumentException("MBean object name must be set.");
++    }
+     mbs.registerMBean(this, objectName);
+     enabled = true;
+   }
+ 
+   @Override
+   public void add(String name, long time) {
+     throw new UnsupportedOperationException("add() is not implemented");
+   }
+ 
+   /**
+    * Update the metric values from zookeeper after minimumRefreshDelay has 
expired.
+    */
+   public synchronized FateMetricValues snapshot() {
+ 
+     FateMetricValues current = metricValues.get();
+ 
+     long now = System.currentTimeMillis();
+ 
+     if ((lastUpdate + minimumRefreshDelay) > now) {
+       return current;
+     }
+ 
 -    FateMetricValues updates = 
FateMetricValues.updateFromZookeeper(instanceId, current);
++    FateMetricValues updates = FateMetricValues.updateFromZookeeper(context, 
current);
+ 
+     metricValues.set(updates);
+ 
+     lastUpdate = now;
+ 
+     return updates;
+   }
+ 
+   @Override
+   public boolean isEnabled() {
+     return enabled;
+   }
+ 
+   @Override
+   public long getCurrentFateOps() {
+     snapshot();
+     return metricValues.get().getCurrentFateOps();
+   }
+ 
+   @Override
+   public long getZkFateChildOpsTotal() {
+     snapshot();
+     return metricValues.get().getZkFateChildOpsTotal();
+   }
+ 
+   @Override
+   public long getZKConnectionErrorsTotal() {
+     snapshot();
+     return metricValues.get().getZkConnectionErrors();
+   }
+ 
+ }
diff --cc 
server/master/src/main/java/org/apache/accumulo/master/metrics/fate/Metrics2FateMetrics.java
index 0000000,09997ec..2bf80ad
mode 000000,100644..100644
--- 
a/server/master/src/main/java/org/apache/accumulo/master/metrics/fate/Metrics2FateMetrics.java
+++ 
b/server/master/src/main/java/org/apache/accumulo/master/metrics/fate/Metrics2FateMetrics.java
@@@ -1,0 -1,127 +1,128 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.accumulo.master.metrics.fate;
+ 
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.atomic.AtomicReference;
+ 
++import org.apache.accumulo.server.ServerContext;
+ import org.apache.accumulo.server.metrics.Metrics;
+ import org.apache.accumulo.server.metrics.MetricsSystemHelper;
+ import org.apache.hadoop.metrics2.MetricsCollector;
+ import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+ import org.apache.hadoop.metrics2.MetricsSource;
+ import org.apache.hadoop.metrics2.MetricsSystem;
+ import org.apache.hadoop.metrics2.impl.MsInfo;
+ import org.apache.hadoop.metrics2.lib.Interns;
+ import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+ import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ public class Metrics2FateMetrics implements Metrics, MetricsSource {
+ 
+   private static final Logger log = 
LoggerFactory.getLogger(Metrics2FateMetrics.class);
+ 
+   // limit calls to update fate counters to guard against hammering zookeeper.
+   private static final long DEFAULT_MIN_REFRESH_DELAY = 
TimeUnit.SECONDS.toMillis(10);
+ 
+   private volatile long minimumRefreshDelay;
+ 
+   public static final String NAME = MASTER_NAME + ",sub=Fate";
+   public static final String DESCRIPTION = "Fate Metrics";
+   public static final String CONTEXT = "master";
+   public static final String RECORD = "fate";
+   public static final String CUR_FATE_OPS = "currentFateOps";
+   public static final String TOTAL_FATE_OPS = "totalFateOps";
+   public static final String TOTAL_ZK_CONN_ERRORS = "totalZkConnErrors";
+ 
 -  private final String instanceId;
++  private final ServerContext context;
+   private final MetricsSystem metricsSystem;
+   private final MetricsRegistry registry;
+   private final MutableGaugeLong currentFateOps;
+   private final MutableGaugeLong zkChildFateOpsTotal;
+   private final MutableGaugeLong zkConnectionErrorsTotal;
+ 
+   private final AtomicReference<FateMetricValues> metricValues;
+ 
+   private volatile long lastUpdate = 0;
+ 
 -  public Metrics2FateMetrics(final String instanceId, MetricsSystem 
metricsSystem,
++  public Metrics2FateMetrics(final ServerContext context, MetricsSystem 
metricsSystem,
+       final long minimumRefreshDelay) {
+ 
 -    this.instanceId = instanceId;
++    this.context = context;
+ 
+     this.minimumRefreshDelay = Math.max(DEFAULT_MIN_REFRESH_DELAY, 
minimumRefreshDelay);
+ 
 -    metricValues = new 
AtomicReference<>(FateMetricValues.updateFromZookeeper(instanceId, null));
++    metricValues = new 
AtomicReference<>(FateMetricValues.updateFromZookeeper(context, null));
+ 
+     this.metricsSystem = metricsSystem;
+     this.registry = new MetricsRegistry(Interns.info(NAME, DESCRIPTION));
+     this.registry.tag(MsInfo.ProcessName, 
MetricsSystemHelper.getProcessName());
+ 
+     currentFateOps = registry.newGauge(CUR_FATE_OPS, "Current number of FATE 
Ops", 0L);
+     zkChildFateOpsTotal = registry.newGauge(TOTAL_FATE_OPS, "Total FATE Ops", 
0L);
+     zkConnectionErrorsTotal = registry.newGauge(TOTAL_ZK_CONN_ERRORS, "Total 
ZK Connection Errors",
+         0L);
+ 
+   }
+ 
+   @Override
+   public void register() {
+     metricsSystem.register(NAME, DESCRIPTION, this);
+   }
+ 
+   @Override
+   public void add(String name, long time) {
+     throw new UnsupportedOperationException("add() is not implemented");
+   }
+ 
+   @Override
+   public boolean isEnabled() {
+     return true;
+   }
+ 
+   @Override
+   public void getMetrics(MetricsCollector collector, boolean all) {
+ 
+     log.trace("getMetrics called with collector: {}", collector);
+ 
+     FateMetricValues fateMetrics = metricValues.get();
+ 
+     long now = System.currentTimeMillis();
+ 
+     if ((lastUpdate + minimumRefreshDelay) < now) {
+ 
 -      fateMetrics = FateMetricValues.updateFromZookeeper(instanceId, 
fateMetrics);
++      fateMetrics = FateMetricValues.updateFromZookeeper(context, 
fateMetrics);
+ 
+       metricValues.set(fateMetrics);
+ 
+       lastUpdate = now;
+ 
+       // update individual gauges that are reported.
+       currentFateOps.set(fateMetrics.getCurrentFateOps());
+       zkChildFateOpsTotal.set(fateMetrics.getZkFateChildOpsTotal());
+       zkConnectionErrorsTotal.set(fateMetrics.getZkConnectionErrors());
+ 
+     }
+ 
+     // create the metrics record and publish to the registry.
+     MetricsRecordBuilder builder = 
collector.addRecord(RECORD).setContext(CONTEXT);
+     registry.snapshot(builder, all);
+ 
+   }
+ }

Reply via email to