This is an automated email from the ASF dual-hosted git repository. ctubbsii pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit f896c98c2356a52dfa2235d2cc02ae556ab17909 Merge: 6aff988 0a2457b Author: Christopher Tubbs <ctubb...@apache.org> AuthorDate: Fri Mar 22 18:45:50 2019 -0400 Merge branch '1.9' .../org/apache/accumulo/core/conf/Property.java | 4 + .../java/org/apache/accumulo/master/Master.java | 178 +++++++++++---------- .../master/metrics/MasterMetricsFactory.java | 69 +++++++- .../master/metrics/fate/FateMetricValues.java | 176 ++++++++++++++++++++ .../accumulo/master/metrics/fate/FateMetrics.java | 146 +++++++++++++++++ .../master/metrics/fate/FateMetricsMBean.java | 27 ++++ .../master/metrics/fate/Metrics2FateMetrics.java | 128 +++++++++++++++ .../master/metrics/fate/FateMetricValuesTest.java | 73 +++++++++ .../resources/hadoop-metrics2-accumulo.properties | 59 +++++++ 9 files changed, 774 insertions(+), 86 deletions(-) diff --cc core/src/main/java/org/apache/accumulo/core/conf/Property.java index 9529b1a,ede2b8a..2d358e2 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@@ -258,13 -322,17 +258,17 @@@ public enum Property + "completely quit. This delay gives it time before log recoveries begin."), MASTER_LEASE_RECOVERY_WAITING_PERIOD("master.lease.recovery.interval", "5s", PropertyType.TIMEDURATION, - "The amount of time to wait after requesting a WAL file to be recovered"), + "The amount of time to wait after requesting a write-ahead log to be recovered"), MASTER_WALOG_CLOSER_IMPLEMETATION("master.walog.closer.implementation", "org.apache.accumulo.server.master.recovery.HadoopLogCloser", PropertyType.CLASSNAME, - "A class that implements a mechansim to steal write access to a file"), + "A class that implements a mechanism to steal write access to a write-ahead log"), + MASTER_FATE_METRICS_ENABLED("master.fate.metrics.enabled", "false", PropertyType.BOOLEAN, + "Enable reporting of FATE metrics in JMX (and logging with Hadoop Metrics2"), + MASTER_FATE_METRICS_MIN_UPDATE_INTERVAL("master.fate.metrics.min.update.interval", "60s", + PropertyType.TIMEDURATION, "Limit calls from metric sinks to zookeeper to update interval"), MASTER_FATE_THREADPOOL_SIZE("master.fate.threadpool.size", "4", PropertyType.COUNT, - "The number of threads used to run FAult-Tolerant Executions. These are " - + "primarily table operations like merge."), + "The number of threads used to run fault-tolerant executions (FATE)." + + " These are primarily table operations like merge."), MASTER_REPLICATION_SCAN_INTERVAL("master.replication.status.scan.interval", "30s", PropertyType.TIMEDURATION, "Amount of time to sleep before scanning the status section of the " diff --cc server/master/src/main/java/org/apache/accumulo/master/Master.java index a22471e,4b3d691..f8622de --- a/server/master/src/main/java/org/apache/accumulo/master/Master.java +++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java @@@ -126,9 -122,9 +126,8 @@@ import org.apache.accumulo.server.maste import org.apache.accumulo.server.master.state.TabletState; import org.apache.accumulo.server.master.state.ZooStore; import org.apache.accumulo.server.master.state.ZooTabletStateStore; - import org.apache.accumulo.server.metrics.Metrics; -import org.apache.accumulo.server.metrics.MetricsSystemHelper; import org.apache.accumulo.server.replication.ZooKeeperInitialization; -import org.apache.accumulo.server.rpc.RpcWrapper; +import org.apache.accumulo.server.rpc.HighlyAvailableServiceWrapper; import org.apache.accumulo.server.rpc.ServerAddress; import org.apache.accumulo.server.rpc.TCredentialsUpdatingWrapper; import org.apache.accumulo.server.rpc.TServerUtils; @@@ -240,21 -234,21 +239,22 @@@ public class Maste static final boolean X = true; static final boolean O = false; // @formatter:off - static final boolean transitionOK[][] = { + static final boolean[][] transitionOK = { // INITIAL HAVE_LOCK SAFE_MODE NORMAL UNLOAD_META UNLOAD_ROOT STOP - /* INITIAL */ {X, X, O, O, O, O, X}, - /* HAVE_LOCK */ {O, X, X, X, O, O, X}, - /* SAFE_MODE */ {O, O, X, X, X, O, X}, - /* NORMAL */ {O, O, X, X, X, O, X}, - /* UNLOAD_METADATA_TABLETS */ {O, O, X, X, X, X, X}, - /* UNLOAD_ROOT_TABLET */ {O, O, O, X, X, X, X}, - /* STOP */ {O, O, O, O, O, X, X}}; + /* INITIAL */ {X, X, O, O, O, O, X}, + /* HAVE_LOCK */ {O, X, X, X, O, O, X}, + /* SAFE_MODE */ {O, O, X, X, X, O, X}, + /* NORMAL */ {O, O, X, X, X, O, X}, + /* UNLOAD_METADATA_TABLETS */ {O, O, X, X, X, X, X}, + /* UNLOAD_ROOT_TABLET */ {O, O, O, X, X, X, X}, + /* STOP */ {O, O, O, O, O, X, X}}; //@formatter:on synchronized void setMasterState(MasterState newState) { -- if (state.equals(newState)) ++ if (state.equals(newState)) { return; ++ } if (!transitionOK[state.ordinal()][newState.ordinal()]) { - log.error("Programmer error: master should not transition from " + state + " to " + newState); + log.error("Programmer error: master should not transition from {} to {}", state, newState); } MasterState oldState = state; state = newState; @@@ -262,15 -256,15 +262,11 @@@ if (newState == MasterState.STOP) { // Give the server a little time before shutdown so the client // thread requesting the stop can return -- SimpleTimer.getInstance(getConfiguration()).schedule(new Runnable() { -- @Override -- public void run() { -- // This frees the main thread and will cause the master to exit -- clientService.stop(); -- Master.this.nextEvent.event("stopped event loop"); -- } -- - }, 100l, 1000l); ++ SimpleTimer.getInstance(getConfiguration()).schedule(() -> { ++ // This frees the main thread and will cause the master to exit ++ clientService.stop(); ++ Master.this.nextEvent.event("stopped event loop"); + }, 100L, 1000L); } if (oldState != newState && (newState == MasterState.HAVE_LOCK)) { @@@ -314,10 -306,10 +310,11 @@@ } } -- if (location == null) ++ if (location == null) { throw new IllegalStateException("Failed to find root tablet"); ++ } - log.info("Upgrade setting root table location in zookeeper " + location); + log.info("Upgrade setting root table location in zookeeper {}", location); zoo.putPersistentData(dirZPath, location.toString().getBytes(), NodeExistsPolicy.FAIL); } } @@@ -404,17 -394,19 +401,18 @@@ } // create initial namespaces - String namespaces = ZooUtil.getRoot(getInstance()) + Constants.ZNAMESPACES; + String namespaces = getZooKeeperRoot() + Constants.ZNAMESPACES; zoo.putPersistentData(namespaces, new byte[0], NodeExistsPolicy.SKIP); - for (Pair<String,String> namespace : Iterables.concat( - Collections.singleton( - new Pair<>(Namespaces.ACCUMULO_NAMESPACE, Namespaces.ACCUMULO_NAMESPACE_ID)), - Collections.singleton( - new Pair<>(Namespaces.DEFAULT_NAMESPACE, Namespaces.DEFAULT_NAMESPACE_ID)))) { + for (Pair<String,NamespaceId> namespace : Iterables.concat( + Collections.singleton(new Pair<>(Namespace.ACCUMULO.name(), Namespace.ACCUMULO.id())), + Collections.singleton(new Pair<>(Namespace.DEFAULT.name(), Namespace.DEFAULT.id())))) { String ns = namespace.getFirst(); - String id = namespace.getSecond(); - log.debug("Upgrade creating namespace \"" + ns + "\" (ID: " + id + ")"); - if (!Namespaces.exists(getInstance(), id)) - TableManager.prepareNewNamespaceState(getInstance().getInstanceID(), id, ns, + NamespaceId id = namespace.getSecond(); + log.debug("Upgrade creating namespace \"{}\" (ID: {})", ns, id); - if (!Namespaces.exists(context, id)) ++ if (!Namespaces.exists(context, id)) { + TableManager.prepareNewNamespaceState(zoo, getInstanceID(), id, ns, NodeExistsPolicy.SKIP); ++ } } // create replication table in zk @@@ -645,38 -635,32 +643,39 @@@ return result; } - public void mustBeOnline(final String tableId) throws ThriftTableOperationException { - Tables.clearCache(getInstance()); - if (!Tables.getTableState(getInstance(), tableId).equals(TableState.ONLINE)) - throw new ThriftTableOperationException(tableId, null, TableOperation.MERGE, + public void mustBeOnline(final TableId tableId) throws ThriftTableOperationException { + Tables.clearCache(context); - if (!Tables.getTableState(context, tableId).equals(TableState.ONLINE)) ++ if (!Tables.getTableState(context, tableId).equals(TableState.ONLINE)) { + throw new ThriftTableOperationException(tableId.canonical(), null, TableOperation.MERGE, TableOperationExceptionType.OFFLINE, "table is not online"); ++ } } - public Master(ServerConfigurationFactory config, VolumeManager fs, String hostname) - throws IOException { - super(config); - this.serverConfig = config; - this.fs = fs; - this.hostname = hostname; + public ServerContext getContext() { + return context; + } - AccumuloConfiguration aconf = serverConfig.getConfiguration(); + public TableManager getTableManager() { + return context.getTableManager(); + } - log.info("Version " + Constants.VERSION); - log.info("Instance " + getInstance().getInstanceID()); - timeKeeper = new MasterTime(this); + public Master(ServerContext context) throws IOException { + this.context = context; + this.serverConfig = context.getServerConfFactory(); + this.fs = context.getVolumeManager(); + this.hostname = context.getHostname(); + + AccumuloConfiguration aconf = serverConfig.getSystemConfiguration(); + log.info("Version {}", Constants.VERSION); + log.info("Instance {}", getInstanceID()); + timeKeeper = new MasterTime(this); ThriftTransportPool.getInstance() .setIdleTime(aconf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT)); - tserverSet = new LiveTServerSet(this, this); - this.tabletBalancer = aconf.instantiateClassProperty(Property.MASTER_TABLET_BALANCER, - TabletBalancer.class, new DefaultLoadBalancer()); - this.tabletBalancer.init(serverConfig); + tserverSet = new LiveTServerSet(context, this); + this.tabletBalancer = Property.createInstanceFromPropertyName(aconf, + Property.MASTER_TABLET_BALANCER, TabletBalancer.class, new DefaultLoadBalancer()); + this.tabletBalancer.init(context); try { AccumuloVFSClassLoader.getContextManager() @@@ -733,13 -705,14 +732,14 @@@ return tserverSet.getConnection(server); } - public MergeInfo getMergeInfo(String tableId) { + public MergeInfo getMergeInfo(TableId tableId) { synchronized (mergeLock) { try { - String path = ZooUtil.getRoot(getInstance().getInstanceID()) + Constants.ZTABLES + "/" - + tableId + "/merge"; - if (!ZooReaderWriter.getInstance().exists(path)) + String path = getZooKeeperRoot() + Constants.ZTABLES + "/" + tableId + "/merge"; - if (!context.getZooReaderWriter().exists(path)) ++ if (!context.getZooReaderWriter().exists(path)) { return new MergeInfo(); - byte[] data = ZooReaderWriter.getInstance().getData(path, new Stat()); ++ } + byte[] data = context.getZooReaderWriter().getData(path, new Stat()); DataInputBuffer in = new DataInputBuffer(); in.reset(data, data.length); MergeInfo info = new MergeInfo(); @@@ -799,21 -774,21 +799,23 @@@ } MasterGoalState getMasterGoalState() { -- while (true) ++ while (true) { try { - byte[] data = ZooReaderWriter.getInstance() - .getData(ZooUtil.getRoot(getInstance()) + Constants.ZMASTER_GOAL_STATE, null); + byte[] data = context.getZooReaderWriter() + .getData(getZooKeeperRoot() + Constants.ZMASTER_GOAL_STATE, null); return MasterGoalState.valueOf(new String(data)); } catch (Exception e) { - log.error("Problem getting real goal state from zookeeper: " + e); + log.error("Problem getting real goal state from zookeeper: ", e); sleepUninterruptibly(1, TimeUnit.SECONDS); } ++ } } public boolean hasCycled(long time) { for (TabletGroupWatcher watcher : watchers) { -- if (watcher.stats.lastScanFinished() < time) ++ if (watcher.stats.lastScanFinished() < time) { return false; ++ } } return true; @@@ -825,7 -806,7 +827,7 @@@ } } -- static enum TabletGoalState { ++ enum TabletGoalState { HOSTED(TUnloadTabletGoal.UNKNOWN), UNASSIGNED(TUnloadTabletGoal.UNASSIGNED), DELETED(TUnloadTabletGoal.DELETED), @@@ -850,12 -831,12 +852,14 @@@ case HAVE_LOCK: // fall-through intended case INITIAL: // fall-through intended case SAFE_MODE: -- if (tls.extent.isMeta()) ++ if (tls.extent.isMeta()) { return TabletGoalState.HOSTED; ++ } return TabletGoalState.UNASSIGNED; case UNLOAD_METADATA_TABLETS: -- if (tls.extent.isRootTablet()) ++ if (tls.extent.isRootTablet()) { return TabletGoalState.HOSTED; ++ } return TabletGoalState.UNASSIGNED; case UNLOAD_ROOT_TABLET: return TabletGoalState.UNASSIGNED; @@@ -867,9 -848,9 +871,10 @@@ } TabletGoalState getTableGoalState(KeyExtent extent) { - TableState tableState = TableManager.getInstance().getTableState(extent.getTableId()); - if (tableState == null) + TableState tableState = context.getTableManager().getTableState(extent.getTableId()); - if (tableState == null) ++ if (tableState == null) { return TabletGoalState.DELETED; ++ } switch (tableState) { case DELETING: return TabletGoalState.DELETED; @@@ -902,11 -883,11 +907,13 @@@ return TabletGoalState.HOSTED; case WAITING_FOR_CHOPPED: if (tls.getState(tserverSet.getCurrentServers()).equals(TabletState.HOSTED)) { -- if (tls.chopped) ++ if (tls.chopped) { return TabletGoalState.UNASSIGNED; ++ } } else { -- if (tls.chopped && tls.walogs.isEmpty()) ++ if (tls.chopped && tls.walogs.isEmpty()) { return TabletGoalState.UNASSIGNED; ++ } } return TabletGoalState.HOSTED; @@@ -1035,30 -1016,30 +1042,33 @@@ int count = nonMetaDataTabletsAssignedOrHosted(); log.debug( String.format("There are %d non-metadata tablets assigned or hosted", count)); -- if (count == 0 && goodStats()) ++ if (count == 0 && goodStats()) { setMasterState(MasterState.UNLOAD_METADATA_TABLETS); ++ } } break; case UNLOAD_METADATA_TABLETS: { int count = assignedOrHosted(MetadataTable.ID); log.debug( String.format("There are %d metadata tablets assigned or hosted", count)); -- if (count == 0 && goodStats()) ++ if (count == 0 && goodStats()) { setMasterState(MasterState.UNLOAD_ROOT_TABLET); ++ } } break; - case UNLOAD_ROOT_TABLET: { + case UNLOAD_ROOT_TABLET: int count = assignedOrHosted(MetadataTable.ID); if (count > 0 && goodStats()) { log.debug(String.format("%d metadata tablets online", count)); setMasterState(MasterState.UNLOAD_ROOT_TABLET); } int root_count = assignedOrHosted(RootTable.ID); -- if (root_count > 0 && goodStats()) ++ if (root_count > 0 && goodStats()) { log.debug("The root tablet is still assigned or hosted"); ++ } if (count + root_count == 0 && goodStats()) { Set<TServerInstance> currentServers = tserverSet.getCurrentServers(); - log.debug("stopping " + currentServers.size() + " tablet servers"); + log.debug("stopping {} tablet servers", currentServers.size()); for (TServerInstance server : currentServers) { try { serversToShutdown.add(server); @@@ -1069,9 -1050,10 +1079,10 @@@ tserverSet.remove(server); } } -- if (currentServers.size() == 0) ++ if (currentServers.size() == 0) { setMasterState(MasterState.STOP); ++ } } - } break; default: break; @@@ -1134,11 -1117,12 +1145,12 @@@ } } if (crazyHoldTime == 1 && someHoldTime == 1 && tserverStatus.size() > 1) { - log.warn( - "Tablet server " + instance + " exceeded maximum hold time: attempting to kill it"); + log.warn("Tablet server {} exceeded maximum hold time: attempting to kill it", instance); try { TServerConnection connection = tserverSet.getConnection(instance); -- if (connection != null) ++ if (connection != null) { connection.fastHalt(masterLock); ++ } } catch (TException e) { log.error("{}", e.getMessage(), e); } @@@ -1189,39 -1173,39 +1201,37 @@@ // unresponsive tservers. sleepUninterruptibly(Math.max(1, rpcTimeout / 120_000), TimeUnit.MILLISECONDS); } -- tp.submit(new Runnable() { -- @Override -- public void run() { ++ tp.submit(() -> { ++ try { ++ Thread t = Thread.currentThread(); ++ String oldName = t.getName(); try { -- Thread t = Thread.currentThread(); -- String oldName = t.getName(); -- try { -- t.setName("Getting status from " + server); -- TServerConnection connection = tserverSet.getConnection(server); -- if (connection == null) -- throw new IOException("No connection to " + server); -- TabletServerStatus status = connection.getTableMap(false); -- result.put(server, status); -- } finally { -- t.setName(oldName); ++ t.setName("Getting status from " + server); ++ TServerConnection connection1 = tserverSet.getConnection(server); ++ if (connection1 == null) { ++ throw new IOException("No connection to " + server); } -- } catch (Exception ex) { - log.error("unable to get tablet server status {} {}", server, ex.toString()); - log.debug("unable to get tablet server status {}", server, ex); - log.error("unable to get tablet server status " + server + " " + ex.toString()); - log.debug("unable to get tablet server status " + server, ex); -- if (badServers.get(server).incrementAndGet() > MAX_BAD_STATUS_COUNT) { - log.warn("attempting to stop {}", server); - log.warn("attempting to stop " + server); -- try { -- TServerConnection connection = tserverSet.getConnection(server); -- if (connection != null) { -- connection.halt(masterLock); -- } -- } catch (TTransportException e) { -- // ignore: it's probably down -- } catch (Exception e) { - log.info("error talking to troublesome tablet server", e); - log.info("error talking to troublesome tablet server ", e); ++ TabletServerStatus status = connection1.getTableMap(false); ++ result.put(server, status); ++ } finally { ++ t.setName(oldName); ++ } ++ } catch (Exception ex) { ++ log.error("unable to get tablet server status {} {}", server, ex.toString()); ++ log.debug("unable to get tablet server status {}", server, ex); ++ if (badServers.get(server).incrementAndGet() > MAX_BAD_STATUS_COUNT) { ++ log.warn("attempting to stop {}", server); ++ try { ++ TServerConnection connection2 = tserverSet.getConnection(server); ++ if (connection2 != null) { ++ connection2.halt(masterLock); } -- badServers.remove(server); ++ } catch (TTransportException e1) { ++ // ignore: it's probably down ++ } catch (Exception e2) { ++ log.info("error talking to troublesome tablet server", e2); } ++ badServers.remove(server); } } }); @@@ -1347,14 -1310,16 +1357,8 @@@ fate = new Fate<>(this, store); fate.startTransactionRunners(threads); -- SimpleTimer.getInstance(getConfiguration()).schedule(new Runnable() { -- -- @Override -- public void run() { -- store.ageOff(); -- } -- }, 63000, 63000); - } catch (KeeperException e) { - throw new IOException(e); - } catch (InterruptedException e) { ++ SimpleTimer.getInstance(getConfiguration()).schedule(() -> store.ageOff(), 63000, 63000); + } catch (KeeperException | InterruptedException e) { throw new IOException(e); } @@@ -1387,94 -1366,77 +1391,98 @@@ sleepUninterruptibly(100, TimeUnit.MILLISECONDS); } - // Start the daemon to scan the replication table and make units of work - replicationWorkDriver = new ReplicationDriver(this); - replicationWorkDriver.start(); + // if the replication name is ever set, then start replication services + final AtomicReference<TServer> replServer = new AtomicReference<>(); + SimpleTimer.getInstance(getConfiguration()).schedule(() -> { + try { + if (replServer.get() == null) { + if (!getConfiguration().get(Property.REPLICATION_NAME).isEmpty()) { + log.info(Property.REPLICATION_NAME.getKey() + " was set, starting repl services."); + replServer.set(setupReplication()); + } + } + } catch (UnknownHostException | KeeperException | InterruptedException e) { + log.error("Error occurred starting replication services. ", e); + } + }, 0, 5000); - // Start the daemon to assign work to tservers to replicate to our peers - try { - replicationWorkAssigner = new WorkDriver(this); - } catch (AccumuloException | AccumuloSecurityException e) { - log.error("Caught exception trying to initialize replication WorkDriver", e); - throw new RuntimeException(e); + // The master is fully initialized. Clients are allowed to connect now. + masterInitialized.set(true); + + while (clientService.isServing()) { + sleepUninterruptibly(500, TimeUnit.MILLISECONDS); } - replicationWorkAssigner.start(); + log.info("Shutting down fate."); + fate.shutdown(); + + log.info("Shutting down timekeeping."); + timeKeeper.shutdown(); + + final long deadline = System.currentTimeMillis() + MAX_CLEANUP_WAIT_TIME; + statusThread.join(remaining(deadline)); - if (replicationWorkAssigner != null) ++ if (replicationWorkAssigner != null) { + replicationWorkAssigner.join(remaining(deadline)); - if (replicationWorkDriver != null) ++ } ++ if (replicationWorkDriver != null) { + replicationWorkDriver.join(remaining(deadline)); ++ } + TServerUtils.stopTServer(replServer.get()); + + // Signal that we want it to stop, and wait for it to do so. + if (authenticationTokenKeyManager != null) { + authenticationTokenKeyManager.gracefulStop(); + authenticationTokenKeyManager.join(remaining(deadline)); + } + + // quit, even if the tablet servers somehow jam up and the watchers + // don't stop + for (TabletGroupWatcher watcher : watchers) { + watcher.join(remaining(deadline)); + } + log.info("exiting"); + } + private TServer setupReplication() + throws UnknownHostException, KeeperException, InterruptedException { // Start the replication coordinator which assigns tservers to service replication requests MasterReplicationCoordinator impl = new MasterReplicationCoordinator(this); + ReplicationCoordinator.Iface haReplicationProxy = HighlyAvailableServiceWrapper.service(impl, + this); // @formatter:off ReplicationCoordinator.Processor<ReplicationCoordinator.Iface> replicationCoordinatorProcessor = - new ReplicationCoordinator.Processor<>( + new ReplicationCoordinator.Processor<>(TraceUtil.wrapService(haReplicationProxy)); // @formatter:on - RpcWrapper.service(impl, - new ReplicationCoordinator.Processor<ReplicationCoordinator.Iface>(impl))); - ServerAddress replAddress = TServerUtils.startServer(this, hostname, + ServerAddress replAddress = TServerUtils.startServer(context, hostname, Property.MASTER_REPLICATION_COORDINATOR_PORT, replicationCoordinatorProcessor, "Master Replication Coordinator", "Replication Coordinator", null, Property.MASTER_REPLICATION_COORDINATOR_MINTHREADS, Property.MASTER_REPLICATION_COORDINATOR_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE); log.info("Started replication coordinator service at " + replAddress.address); + // Start the daemon to scan the replication table and make units of work + replicationWorkDriver = new ReplicationDriver(this); + replicationWorkDriver.start(); + + // Start the daemon to assign work to tservers to replicate to our peers + replicationWorkAssigner = new WorkDriver(this); + replicationWorkAssigner.start(); // Advertise that port we used so peers don't have to be told what it is - ZooReaderWriter.getInstance().putPersistentData( - ZooUtil.getRoot(getInstance()) + Constants.ZMASTER_REPLICATION_COORDINATOR_ADDR, + context.getZooReaderWriter().putPersistentData( + getZooKeeperRoot() + Constants.ZMASTER_REPLICATION_COORDINATOR_ADDR, replAddress.address.toString().getBytes(UTF_8), NodeExistsPolicy.OVERWRITE); - // Register replication metrics + // Register metrics modules MasterMetricsFactory factory = new MasterMetricsFactory(getConfiguration(), this); - Metrics replicationMetrics = factory.createReplicationMetrics(); - try { - replicationMetrics.register(); - } catch (Exception e) { - log.error("Failed to register replication metrics", e); + + int failureCount = factory.register(); + + if (failureCount > 0) { + log.info("Failed to register {} metrics modules", failureCount); + } else { + log.info("All metrics modules registered"); } - - while (clientService.isServing()) { - sleepUninterruptibly(500, TimeUnit.MILLISECONDS); - } - log.info("Shutting down fate."); - fate.shutdown(); - - log.info("Shutting down timekeeping."); - timeKeeper.shutdown(); - - final long deadline = System.currentTimeMillis() + MAX_CLEANUP_WAIT_TIME; - statusThread.join(remaining(deadline)); - replicationWorkAssigner.join(remaining(deadline)); - replicationWorkDriver.join(remaining(deadline)); - replAddress.server.stop(); - // Signal that we want it to stop, and wait for it to do so. - if (authenticationTokenKeyManager != null) { - authenticationTokenKeyManager.gracefulStop(); - authenticationTokenKeyManager.join(remaining(deadline)); - } - - // quit, even if the tablet servers somehow jam up and the watchers - // don't stop - for (TabletGroupWatcher watcher : watchers) { - watcher.join(remaining(deadline)); - } - log.info("exiting"); + return replAddress.server; } private long remaining(long deadline) { @@@ -1498,12 -1460,12 +1506,7 @@@ @Override public void unableToMonitorLockNode(final Throwable e) { // ACCUMULO-3651 Changed level to error and added FATAL to message for slf4j compatibility -- Halt.halt(-1, new Runnable() { -- @Override -- public void run() { -- log.error("FATAL: No longer able to monitor master lock node", e); -- } -- }); ++ Halt.halt(-1, () -> log.error("FATAL: No longer able to monitor master lock node", e)); } @@@ -1596,19 -1569,19 +1599,22 @@@ Set<TServerInstance> added) { // if we have deleted or added tservers, then adjust our dead server list if (!deleted.isEmpty() || !added.isEmpty()) { - DeadServerList obit = new DeadServerList( - ZooUtil.getRoot(getInstance()) + Constants.ZDEADTSERVERS); + DeadServerList obit = new DeadServerList(context, + getZooKeeperRoot() + Constants.ZDEADTSERVERS); if (added.size() > 0) { - log.info("New servers: " + added); - for (TServerInstance up : added) + log.info("New servers: {}", added); - for (TServerInstance up : added) ++ for (TServerInstance up : added) { obit.delete(up.hostPort()); ++ } } for (TServerInstance dead : deleted) { String cause = "unexpected failure"; -- if (serversToShutdown.contains(dead)) ++ if (serversToShutdown.contains(dead)) { cause = "clean shutdown"; // maybe an incorrect assumption -- if (!getMasterGoalState().equals(MasterGoalState.CLEAN_STOP)) ++ } ++ if (!getMasterGoalState().equals(MasterGoalState.CLEAN_STOP)) { obit.post(dead.hostPort(), cause); ++ } } Set<TServerInstance> unexpected = new HashSet<>(deleted); @@@ -1682,22 -1655,22 +1688,25 @@@ public void sessionExpired() {} @Override - public Set<String> onlineTables() { - Set<String> result = new HashSet<>(); + public Set<TableId> onlineTables() { + Set<TableId> result = new HashSet<>(); if (getMasterState() != MasterState.NORMAL) { -- if (getMasterState() != MasterState.UNLOAD_METADATA_TABLETS) ++ if (getMasterState() != MasterState.UNLOAD_METADATA_TABLETS) { result.add(MetadataTable.ID); -- if (getMasterState() != MasterState.UNLOAD_ROOT_TABLET) ++ } ++ if (getMasterState() != MasterState.UNLOAD_ROOT_TABLET) { result.add(RootTable.ID); ++ } return result; } - TableManager manager = TableManager.getInstance(); + TableManager manager = context.getTableManager(); - for (String tableId : Tables.getIdToNameMap(getInstance()).keySet()) { + for (TableId tableId : Tables.getIdToNameMap(context).keySet()) { TableState state = manager.getTableState(tableId); if (state != null) { -- if (state == TableState.ONLINE) ++ if (state == TableState.ONLINE) { result.add(tableId); ++ } } } return result; @@@ -1788,10 -1760,11 +1797,11 @@@ result.unassignedTablets = displayUnassigned(); result.serversShuttingDown = new HashSet<>(); synchronized (serversToShutdown) { -- for (TServerInstance server : serversToShutdown) ++ for (TServerInstance server : serversToShutdown) { result.serversShuttingDown.add(server.hostPort()); ++ } } - DeadServerList obit = new DeadServerList( - ZooUtil.getRoot(getInstance()) + Constants.ZDEADTSERVERS); + DeadServerList obit = new DeadServerList(context, getZooKeeperRoot() + Constants.ZDEADTSERVERS); result.deadTabletServers = obit.getList(); result.bulkImports = bulkImportStatus.getBulkLoadStatus(); return result; diff --cc server/master/src/main/java/org/apache/accumulo/master/metrics/MasterMetricsFactory.java index 978f36c,3b6ad33..7001d82 --- a/server/master/src/main/java/org/apache/accumulo/master/metrics/MasterMetricsFactory.java +++ b/server/master/src/main/java/org/apache/accumulo/master/metrics/MasterMetricsFactory.java @@@ -51,4 -108,13 +108,12 @@@ public class MasterMetricsFactory return new Metrics2ReplicationMetrics(master, metricsSystem); } + private Metrics createFateMetrics() { - String id = master.getInstance().getInstanceID(); + if (useOldMetrics) { - return new FateMetrics(id, fateMinUpdateInterval); ++ return new FateMetrics(master.getContext(), fateMinUpdateInterval); + } + - return new Metrics2FateMetrics(id, metricsSystem, fateMinUpdateInterval); ++ return new Metrics2FateMetrics(master.getContext(), metricsSystem, fateMinUpdateInterval); + } + } diff --cc server/master/src/main/java/org/apache/accumulo/master/metrics/fate/FateMetricValues.java index 0000000,b776f57..b5bc0c7 mode 000000,100644..100644 --- a/server/master/src/main/java/org/apache/accumulo/master/metrics/fate/FateMetricValues.java +++ b/server/master/src/main/java/org/apache/accumulo/master/metrics/fate/FateMetricValues.java @@@ -1,0 -1,177 +1,176 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.accumulo.master.metrics.fate; + + import java.util.List; + + import org.apache.accumulo.core.Constants; -import org.apache.accumulo.core.zookeeper.ZooUtil; + import org.apache.accumulo.fate.AdminUtil; + import org.apache.accumulo.fate.ZooStore; + import org.apache.accumulo.fate.zookeeper.IZooReaderWriter; -import org.apache.accumulo.server.zookeeper.ZooReaderWriter; ++import org.apache.accumulo.server.ServerContext; + import org.apache.zookeeper.KeeperException; + import org.apache.zookeeper.data.Stat; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + + /** + * Immutable class that holds a snapshot of fate metric values - use builder to instantiate + * instance. + */ + class FateMetricValues { + + private static final Logger log = LoggerFactory.getLogger(FateMetrics.class); + + private final long updateTime; + private final long currentFateOps; + private final long zkFateChildOpsTotal; + private final long zkConnectionErrors; + + private FateMetricValues(final long updateTime, final long currentFateOps, + final long zkFateChildOpsTotal, final long zkConnectionErrors) { + this.updateTime = updateTime; + this.currentFateOps = currentFateOps; + this.zkFateChildOpsTotal = zkFateChildOpsTotal; + this.zkConnectionErrors = zkConnectionErrors; + } + + long getCurrentFateOps() { + return currentFateOps; + } + + long getZkFateChildOpsTotal() { + return zkFateChildOpsTotal; + } + + long getZkConnectionErrors() { + return zkConnectionErrors; + } + + /** + * Update FateMetricValues, populating with current values and the overwritting new values, this + * preserves previous values in case an error or exception prevents the values from being + * completely populated, this form may be more suitable for metric counters. + * - * @param instanceId - * Accumulo instanceId ++ * @param context ++ * the server's context + * @param currentValues + * the current fate metrics used as default + * @return populated metrics values + */ - static FateMetricValues updateFromZookeeper(final String instanceId, ++ static FateMetricValues updateFromZookeeper(final ServerContext context, + final FateMetricValues currentValues) { - return updateFromZookeeper(instanceId, FateMetricValues.builder().copy(currentValues)); ++ return updateFromZookeeper(context, FateMetricValues.builder().copy(currentValues)); + } + + /** + * Update the FATE metric values from zookeeepr - the builder is expected to have the desired + * default values (either 0, or the previous value). + * - * @param instanceId - * Accumulo instanceId ++ * @param context ++ * the server's context + * @param builder + * value builder, populated with defaults. + * @return an immutable instance of FateMetricsValues. + */ - private static FateMetricValues updateFromZookeeper(final String instanceId, ++ private static FateMetricValues updateFromZookeeper(final ServerContext context, + final FateMetricValues.Builder builder) { + + AdminUtil<String> admin = new AdminUtil<>(false); + + try { + - IZooReaderWriter zoo = ZooReaderWriter.getInstance(); - ZooStore<String> zs = new ZooStore<>(ZooUtil.getRoot(instanceId) + Constants.ZFATE, zoo); ++ IZooReaderWriter zoo = context.getZooReaderWriter(); ++ ZooStore<String> zs = new ZooStore<>(context.getZooKeeperRoot() + Constants.ZFATE, zoo); + + List<AdminUtil.TransactionStatus> currFates = admin.getTransactionStatus(zs, null, null); + builder.withCurrentFateOps(currFates.size()); + - Stat node = zoo.getZooKeeper().exists(ZooUtil.getRoot(instanceId) + Constants.ZFATE, false); ++ Stat node = zoo.getZooKeeper().exists(context.getZooKeeperRoot() + Constants.ZFATE, false); + builder.withZkFateChildOpsTotal(node.getCversion()); + + if (log.isTraceEnabled()) { + log.trace( + "ZkNodeStat: {czxid: {}, mzxid: {}, pzxid: {}, ctime: {}, mtime: {}, " + + "version: {}, cversion: {}, num children: {}", + node.getCzxid(), node.getMzxid(), node.getPzxid(), node.getCtime(), node.getMtime(), + node.getVersion(), node.getCversion(), node.getNumChildren()); + } + } catch (KeeperException ex) { + log.debug("Error connecting to ZooKeeper", ex); + builder.incrZkConnectionErrors(); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + + return builder.build(); + } + + @Override + public String toString() { + return "FateMetricValues{" + "updateTime=" + updateTime + ", currentFateOps=" + currentFateOps + + ", zkFateChildOpsTotal=" + zkFateChildOpsTotal + ", zkConnectionErrors=" + + zkConnectionErrors + '}'; + } + + public static Builder builder() { + return new Builder(); + } + + static class Builder { + + private long currentFateOps = 0; + private long zkFateChildOpsTotal = 0; + private long zkConnectionErrors = 0; + + Builder copy(final FateMetricValues prevValues) { + + if (prevValues == null) { + return new Builder(); + } + + return new Builder().withCurrentFateOps(prevValues.getCurrentFateOps()) + .withZkFateChildOpsTotal(prevValues.getZkFateChildOpsTotal()) + .withZkConnectionErrors(prevValues.getZkConnectionErrors()); + } + + Builder withCurrentFateOps(final long value) { + this.currentFateOps = value; + return this; + } + + Builder withZkFateChildOpsTotal(final long value) { + this.zkFateChildOpsTotal = value; + return this; + } + + Builder incrZkConnectionErrors() { + this.zkConnectionErrors += 1L; + return this; + } + + Builder withZkConnectionErrors(final long value) { + this.zkConnectionErrors = value; + return this; + } + + FateMetricValues build() { + return new FateMetricValues(System.currentTimeMillis(), currentFateOps, zkFateChildOpsTotal, + zkConnectionErrors); + } + } + } diff --cc server/master/src/main/java/org/apache/accumulo/master/metrics/fate/FateMetrics.java index 0000000,934d803..de7cb8c mode 000000,100644..100644 --- a/server/master/src/main/java/org/apache/accumulo/master/metrics/fate/FateMetrics.java +++ b/server/master/src/main/java/org/apache/accumulo/master/metrics/fate/FateMetrics.java @@@ -1,0 -1,144 +1,146 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.accumulo.master.metrics.fate; + + import java.lang.management.ManagementFactory; + import java.util.concurrent.TimeUnit; + import java.util.concurrent.atomic.AtomicReference; + + import javax.management.MBeanServer; + import javax.management.ObjectName; + ++import org.apache.accumulo.server.ServerContext; + import org.apache.accumulo.server.metrics.Metrics; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + + /** + * Basic implementation of fate metrics that publish to JMX when legacy metrics enabled. For logging + * in addition to JMX, use the hadoop metrics2 implementation. The measurement type and values + * provided are: + * <ul> + * <li>gauge - current count of FATE transactions in progress</li> + * <li>gauge - last zookeeper id that modified FATE root path to provide estimate of fate + * transaction liveliness</li> + * <li>counter - the number of zookeeper connection errors since process started.</li> + * </ul> + * Implementation notes: + * <p> + * The fate operation estimate is based on zookeeper Stat structure and the property of pzxid. From + * the zookeeper developer's guide: pzxid is "The zxid of the change that last modified children of + * this znode." The pzxid should then change each time a FATE transaction is created or deleted - + * and the zookeeper id (zxid) is expected to continuously increase because the zookeeper id is used + * by zookeeper for ordering operations. + */ + public class FateMetrics implements Metrics, FateMetricsMBean { + + private static final Logger log = LoggerFactory.getLogger(FateMetrics.class); + + // limit calls to update fate counters to guard against hammering zookeeper. + private static final long DEFAULT_MIN_REFRESH_DELAY = TimeUnit.SECONDS.toMillis(10); + + private volatile long minimumRefreshDelay; + + private final AtomicReference<FateMetricValues> metricValues; + + private volatile long lastUpdate = 0; + - private final String instanceId; ++ private final ServerContext context; + + private ObjectName objectName = null; + + private volatile boolean enabled = false; + - public FateMetrics(final String instanceId, final long minimumRefreshDelay) { ++ public FateMetrics(final ServerContext context, final long minimumRefreshDelay) { + - this.instanceId = instanceId; ++ this.context = context; + + this.minimumRefreshDelay = Math.max(DEFAULT_MIN_REFRESH_DELAY, minimumRefreshDelay); + + metricValues = new AtomicReference<>(FateMetricValues.builder().build()); + + try { + objectName = new ObjectName( + "accumulo.server.metrics:service=FateMetrics,name=FateMetricsMBean,instance=" + + Thread.currentThread().getName()); + } catch (Exception e) { + log.error("Exception setting MBean object name", e); + } + } + + @Override + public void register() throws Exception { + // Register this object with the MBeanServer + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); - if (null == objectName) ++ if (null == objectName) { + throw new IllegalArgumentException("MBean object name must be set."); ++ } + mbs.registerMBean(this, objectName); + enabled = true; + } + + @Override + public void add(String name, long time) { + throw new UnsupportedOperationException("add() is not implemented"); + } + + /** + * Update the metric values from zookeeper after minimumRefreshDelay has expired. + */ + public synchronized FateMetricValues snapshot() { + + FateMetricValues current = metricValues.get(); + + long now = System.currentTimeMillis(); + + if ((lastUpdate + minimumRefreshDelay) > now) { + return current; + } + - FateMetricValues updates = FateMetricValues.updateFromZookeeper(instanceId, current); ++ FateMetricValues updates = FateMetricValues.updateFromZookeeper(context, current); + + metricValues.set(updates); + + lastUpdate = now; + + return updates; + } + + @Override + public boolean isEnabled() { + return enabled; + } + + @Override + public long getCurrentFateOps() { + snapshot(); + return metricValues.get().getCurrentFateOps(); + } + + @Override + public long getZkFateChildOpsTotal() { + snapshot(); + return metricValues.get().getZkFateChildOpsTotal(); + } + + @Override + public long getZKConnectionErrorsTotal() { + snapshot(); + return metricValues.get().getZkConnectionErrors(); + } + + } diff --cc server/master/src/main/java/org/apache/accumulo/master/metrics/fate/Metrics2FateMetrics.java index 0000000,09997ec..2bf80ad mode 000000,100644..100644 --- a/server/master/src/main/java/org/apache/accumulo/master/metrics/fate/Metrics2FateMetrics.java +++ b/server/master/src/main/java/org/apache/accumulo/master/metrics/fate/Metrics2FateMetrics.java @@@ -1,0 -1,127 +1,128 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.accumulo.master.metrics.fate; + + import java.util.concurrent.TimeUnit; + import java.util.concurrent.atomic.AtomicReference; + ++import org.apache.accumulo.server.ServerContext; + import org.apache.accumulo.server.metrics.Metrics; + import org.apache.accumulo.server.metrics.MetricsSystemHelper; + import org.apache.hadoop.metrics2.MetricsCollector; + import org.apache.hadoop.metrics2.MetricsRecordBuilder; + import org.apache.hadoop.metrics2.MetricsSource; + import org.apache.hadoop.metrics2.MetricsSystem; + import org.apache.hadoop.metrics2.impl.MsInfo; + import org.apache.hadoop.metrics2.lib.Interns; + import org.apache.hadoop.metrics2.lib.MetricsRegistry; + import org.apache.hadoop.metrics2.lib.MutableGaugeLong; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + + public class Metrics2FateMetrics implements Metrics, MetricsSource { + + private static final Logger log = LoggerFactory.getLogger(Metrics2FateMetrics.class); + + // limit calls to update fate counters to guard against hammering zookeeper. + private static final long DEFAULT_MIN_REFRESH_DELAY = TimeUnit.SECONDS.toMillis(10); + + private volatile long minimumRefreshDelay; + + public static final String NAME = MASTER_NAME + ",sub=Fate"; + public static final String DESCRIPTION = "Fate Metrics"; + public static final String CONTEXT = "master"; + public static final String RECORD = "fate"; + public static final String CUR_FATE_OPS = "currentFateOps"; + public static final String TOTAL_FATE_OPS = "totalFateOps"; + public static final String TOTAL_ZK_CONN_ERRORS = "totalZkConnErrors"; + - private final String instanceId; ++ private final ServerContext context; + private final MetricsSystem metricsSystem; + private final MetricsRegistry registry; + private final MutableGaugeLong currentFateOps; + private final MutableGaugeLong zkChildFateOpsTotal; + private final MutableGaugeLong zkConnectionErrorsTotal; + + private final AtomicReference<FateMetricValues> metricValues; + + private volatile long lastUpdate = 0; + - public Metrics2FateMetrics(final String instanceId, MetricsSystem metricsSystem, ++ public Metrics2FateMetrics(final ServerContext context, MetricsSystem metricsSystem, + final long minimumRefreshDelay) { + - this.instanceId = instanceId; ++ this.context = context; + + this.minimumRefreshDelay = Math.max(DEFAULT_MIN_REFRESH_DELAY, minimumRefreshDelay); + - metricValues = new AtomicReference<>(FateMetricValues.updateFromZookeeper(instanceId, null)); ++ metricValues = new AtomicReference<>(FateMetricValues.updateFromZookeeper(context, null)); + + this.metricsSystem = metricsSystem; + this.registry = new MetricsRegistry(Interns.info(NAME, DESCRIPTION)); + this.registry.tag(MsInfo.ProcessName, MetricsSystemHelper.getProcessName()); + + currentFateOps = registry.newGauge(CUR_FATE_OPS, "Current number of FATE Ops", 0L); + zkChildFateOpsTotal = registry.newGauge(TOTAL_FATE_OPS, "Total FATE Ops", 0L); + zkConnectionErrorsTotal = registry.newGauge(TOTAL_ZK_CONN_ERRORS, "Total ZK Connection Errors", + 0L); + + } + + @Override + public void register() { + metricsSystem.register(NAME, DESCRIPTION, this); + } + + @Override + public void add(String name, long time) { + throw new UnsupportedOperationException("add() is not implemented"); + } + + @Override + public boolean isEnabled() { + return true; + } + + @Override + public void getMetrics(MetricsCollector collector, boolean all) { + + log.trace("getMetrics called with collector: {}", collector); + + FateMetricValues fateMetrics = metricValues.get(); + + long now = System.currentTimeMillis(); + + if ((lastUpdate + minimumRefreshDelay) < now) { + - fateMetrics = FateMetricValues.updateFromZookeeper(instanceId, fateMetrics); ++ fateMetrics = FateMetricValues.updateFromZookeeper(context, fateMetrics); + + metricValues.set(fateMetrics); + + lastUpdate = now; + + // update individual gauges that are reported. + currentFateOps.set(fateMetrics.getCurrentFateOps()); + zkChildFateOpsTotal.set(fateMetrics.getZkFateChildOpsTotal()); + zkConnectionErrorsTotal.set(fateMetrics.getZkConnectionErrors()); + + } + + // create the metrics record and publish to the registry. + MetricsRecordBuilder builder = collector.addRecord(RECORD).setContext(CONTEXT); + registry.snapshot(builder, all); + + } + }