This is an automated email from the ASF dual-hosted git repository.

edcoleman pushed a commit to branch 1.9
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/1.9 by this push:
     new d24cdb5  Improve master metrics reported (#1536)
d24cdb5 is described below

commit d24cdb5059299bb4b61ea0917e94d01a2679f6c1
Author: EdColeman <d...@etcoleman.com>
AuthorDate: Fri Mar 6 15:16:32 2020 -0500

    Improve master metrics reported (#1536)
    
    Adds FATE transaction state and op types reported via hadoop metrics2
    Improves metrics tests
    Add IT to verify that enabling legacy metrics does not crash master
    Added ZooReaderWriter getInstance() for testing.
---
 .../accumulo/server/zookeeper/ZooReaderWriter.java |  23 ++
 server/master/pom.xml                              |   4 +
 .../master/metrics/MasterMetricsFactory.java       |  14 +-
 .../master/metrics/ReplicationMetricsMBean.java    |   6 +-
 .../master/metrics/fate/FateHadoop2Metrics.java    | 210 ++++++++++++
 ...{FateMetrics.java => FateLegacyJMXMetrics.java} |  31 +-
 ...csMBean.java => FateLegacyJMXMetricsMBean.java} |   8 +-
 .../master/metrics/fate/FateMetricSnapshot.java    | 250 +++++++++++++++
 .../master/metrics/fate/FateMetricValues.java      | 177 ----------
 .../master/metrics/fate/Metrics2FateMetrics.java   | 127 --------
 ...ValuesTest.java => FateMetricSnapshotTest.java} |  12 +-
 .../master/metrics/fate/FateMetricsTest.java       | 327 +++++++++++++++++++
 .../master/metrics/fate/InMemTestCollector.java    | 146 +++++++++
 .../metrics/fate/ZooKeeperTestingServer.java       | 146 +++++++++
 .../src/test/resources/conf/accumulo-site.xml      | 116 +++++++
 .../src/test/resources/conf/generic_logger.xml     |  83 +++++
 .../src/test/resources/conf/monitor_logger.xml     |  64 ++++
 .../resources/hadoop-metrics2-accumulo.properties  |  56 ++++
 .../accumulo/test/functional/GcMetricsIT.java      |  18 +-
 .../accumulo/test/functional/LegacyMetricsIT.java  | 108 +++++++
 .../accumulo/test/functional/MasterMetricsIT.java  | 355 +++++++++++++++++++++
 .../accumulo/test/metrics/MetricsFileTailer.java   |  97 ++++++
 22 files changed, 2040 insertions(+), 338 deletions(-)

diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooReaderWriter.java
 
b/server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooReaderWriter.java
index 2a9b908..3fe85f6 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooReaderWriter.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooReaderWriter.java
@@ -22,6 +22,8 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.conf.SiteConfiguration;
 
+import com.google.common.annotations.VisibleForTesting;
+
 public class ZooReaderWriter extends 
org.apache.accumulo.fate.zookeeper.ZooReaderWriter {
   private static final String SCHEME = "digest";
   private static final String USER = "accumulo";
@@ -41,4 +43,25 @@ public class ZooReaderWriter extends 
org.apache.accumulo.fate.zookeeper.ZooReade
     return instance;
   }
 
+  /**
+   * This method exposes the zookeeper connection parameters when 
instantiating a cached singleton
+   * ZooReaderWriter instance for later use in testing. The instance will only 
be created once,
+   * otherwise the previously cached instance is returned.
+   *
+   * @param zooConnString
+   *          a zookeeper connection string of host:port pair of server(s)
+   * @param timeInMillis
+   *          the zooKeeper connection timeout
+   * @param secret
+   *          the Accumulo instance secret
+   * @return a ZooReaderWriter instance either created or previously cached.
+   */
+  @VisibleForTesting
+  public static synchronized ZooReaderWriter getInstance(String zooConnString, 
int timeInMillis,
+      String secret) {
+    if (instance == null) {
+      instance = new ZooReaderWriter(zooConnString, timeInMillis, secret);
+    }
+    return instance;
+  }
 }
diff --git a/server/master/pom.xml b/server/master/pom.xml
index 2dcb71d..107f126 100644
--- a/server/master/pom.xml
+++ b/server/master/pom.xml
@@ -61,6 +61,10 @@
       <artifactId>accumulo-server-base</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-test</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.apache.htrace</groupId>
       <artifactId>htrace-core</artifactId>
     </dependency>
diff --git 
a/server/master/src/main/java/org/apache/accumulo/master/metrics/MasterMetricsFactory.java
 
b/server/master/src/main/java/org/apache/accumulo/master/metrics/MasterMetricsFactory.java
index 3b6ad33..ce7a97b 100644
--- 
a/server/master/src/main/java/org/apache/accumulo/master/metrics/MasterMetricsFactory.java
+++ 
b/server/master/src/main/java/org/apache/accumulo/master/metrics/MasterMetricsFactory.java
@@ -21,8 +21,8 @@ import static java.util.Objects.requireNonNull;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.master.Master;
-import org.apache.accumulo.master.metrics.fate.FateMetrics;
-import org.apache.accumulo.master.metrics.fate.Metrics2FateMetrics;
+import org.apache.accumulo.master.metrics.fate.FateHadoop2Metrics;
+import org.apache.accumulo.master.metrics.fate.FateLegacyJMXMetrics;
 import org.apache.accumulo.server.metrics.Metrics;
 import org.apache.accumulo.server.metrics.MetricsSystemHelper;
 import org.apache.hadoop.metrics2.MetricsSystem;
@@ -109,12 +109,18 @@ public class MasterMetricsFactory {
   }
 
   private Metrics createFateMetrics() {
+
     String id = master.getInstance().getInstanceID();
+
     if (useOldMetrics) {
-      return new FateMetrics(id, fateMinUpdateInterval);
+      if (enableFateMetrics) {
+        log.warn(
+            "Enhanced FATE metrics (Transaction and OpType counts) unavailable 
when using legacy metrics.");
+      }
+      return new FateLegacyJMXMetrics(id, fateMinUpdateInterval);
     }
 
-    return new Metrics2FateMetrics(id, metricsSystem, fateMinUpdateInterval);
+    return new FateHadoop2Metrics(id, metricsSystem, fateMinUpdateInterval);
   }
 
 }
diff --git 
a/server/master/src/main/java/org/apache/accumulo/master/metrics/ReplicationMetricsMBean.java
 
b/server/master/src/main/java/org/apache/accumulo/master/metrics/ReplicationMetricsMBean.java
index a29e5b4..675321a 100644
--- 
a/server/master/src/main/java/org/apache/accumulo/master/metrics/ReplicationMetricsMBean.java
+++ 
b/server/master/src/main/java/org/apache/accumulo/master/metrics/ReplicationMetricsMBean.java
@@ -27,7 +27,7 @@ public interface ReplicationMetricsMBean {
    *
    * @return The number of files pending replication across all targets
    */
-  public int getNumFilesPendingReplication();
+  int getNumFilesPendingReplication();
 
   /**
    * The total number of threads available to replicate data to peers. Each 
TabletServer has a
@@ -36,7 +36,7 @@ public interface ReplicationMetricsMBean {
    *
    * @return The number of threads available to replicate data across the 
instance
    */
-  public int getMaxReplicationThreads();
+  int getMaxReplicationThreads();
 
   /**
    * Peers are systems which data can be replicated to. This is the number of 
peers that are
@@ -45,6 +45,6 @@ public interface ReplicationMetricsMBean {
    *
    * @return The number of peers/targets which are defined for data to be 
replicated to.
    */
-  public int getNumConfiguredPeers();
+  int getNumConfiguredPeers();
 
 }
diff --git 
a/server/master/src/main/java/org/apache/accumulo/master/metrics/fate/FateHadoop2Metrics.java
 
b/server/master/src/main/java/org/apache/accumulo/master/metrics/fate/FateHadoop2Metrics.java
new file mode 100644
index 0000000..3eccc0e
--- /dev/null
+++ 
b/server/master/src/main/java/org/apache/accumulo/master/metrics/fate/FateHadoop2Metrics.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.metrics.fate;
+
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.accumulo.server.metrics.Metrics;
+import org.apache.accumulo.server.metrics.MetricsSystemHelper;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.impl.MsInfo;
+import org.apache.hadoop.metrics2.lib.Interns;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FateHadoop2Metrics implements Metrics, MetricsSource {
+
+  private static final Logger log = 
LoggerFactory.getLogger(FateHadoop2Metrics.class);
+
+  // limit calls to update fate counters to guard against hammering zookeeper.
+  private static final long DEFAULT_MIN_REFRESH_DELAY = 
TimeUnit.SECONDS.toMillis(10);
+
+  private volatile long minimumRefreshDelay;
+
+  // metrics tag / labels
+  public static final String NAME = MASTER_NAME + ",sub=Fate";
+  public static final String DESCRIPTION = "Fate Metrics";
+  public static final String CONTEXT = "master";
+  public static final String RECORD = "fate";
+
+  // metric value names
+  public static final String CUR_FATE_OPS = "currentFateOps";
+  public static final String TOTAL_FATE_OPS = "totalFateOps";
+  public static final String TOTAL_ZK_CONN_ERRORS = "totalZkConnErrors";
+  private static final String FATE_TX_STATE_METRIC_PREFIX = "FateTxState_";
+  private static final String FATE_OP_TYPE_METRIC_PREFIX = "FateTxOpType_";
+
+  // metric values
+  private final MutableGaugeLong currentFateOps;
+  private final MutableGaugeLong zkChildFateOpsTotal;
+  private final MutableGaugeLong zkConnectionErrorsTotal;
+  private final Map<String,MutableGaugeLong> fateTypeCounts = new TreeMap<>();
+  private final Map<String,MutableGaugeLong> fateOpCounts = new TreeMap<>();
+
+  private FateMetricSnapshot metricSnapshot;
+
+  private final String instanceId;
+  private final MetricsSystem metricsSystem;
+  private final MetricsRegistry registry;
+
+  private final Lock metricsValuesLock = new ReentrantLock();
+  private volatile long lastUpdate = 0;
+
+  private final ZooReaderWriter zoo;
+
+  public FateHadoop2Metrics(final String instanceId, final MetricsSystem 
metricsSystem,
+      final long minimumRefreshDelay) {
+
+    this.instanceId = instanceId;
+
+    zoo = ZooReaderWriter.getInstance();
+
+    this.minimumRefreshDelay = Math.max(DEFAULT_MIN_REFRESH_DELAY, 
minimumRefreshDelay);
+
+    this.metricsSystem = metricsSystem;
+    this.registry = new MetricsRegistry(Interns.info(NAME, DESCRIPTION));
+    this.registry.tag(MsInfo.ProcessName, 
MetricsSystemHelper.getProcessName());
+
+    currentFateOps = registry.newGauge(CUR_FATE_OPS, "Current number of FATE 
Ops", 0L);
+    zkChildFateOpsTotal = registry.newGauge(TOTAL_FATE_OPS, "Total FATE Ops", 
0L);
+    zkConnectionErrorsTotal =
+        registry.newGauge(TOTAL_ZK_CONN_ERRORS, "Total ZK Connection Errors", 
0L);
+
+  }
+
+  /**
+   * For testing only: allow refresh delay to be set to any value, overriding 
the enforced minimum.
+   *
+   * @param minimumRefreshDelay
+   *          set new min refresh value, in seconds.
+   */
+  void overrideRefresh(final long minimumRefreshDelay) {
+    long delay = Math.max(0, minimumRefreshDelay);
+    this.minimumRefreshDelay = TimeUnit.SECONDS.toMillis(delay);
+  }
+
+  @Override
+  public void register() {
+    metricsSystem.register(NAME, DESCRIPTION, this);
+  }
+
+  @Override
+  public void add(String name, long time) {
+    throw new UnsupportedOperationException("add() is not implemented");
+  }
+
+  @Override
+  public boolean isEnabled() {
+    return true;
+  }
+
+  protected void prepareMetrics() {
+
+    metricsValuesLock.lock();
+    try {
+
+      long now = System.currentTimeMillis();
+
+      if ((lastUpdate + minimumRefreshDelay) < now) {
+
+        log.trace("Update fate metrics, lastUpdate: {}, now {}", lastUpdate, 
now);
+
+        metricSnapshot = FateMetricSnapshot.getFromZooKeeper(instanceId, zoo);
+        lastUpdate = now;
+
+        recordValues();
+      }
+    } finally {
+      metricsValuesLock.unlock();
+    }
+  }
+
+  /**
+   * Update the metrics gauges from the measured values. This method assumes 
that concurrent access
+   * is controlled externally to this method using the metricsValuesLock, and 
that the lock has been
+   * acquired before calling this method.
+   */
+  private void recordValues() {
+
+    // update individual gauges that are reported.
+    currentFateOps.set(metricSnapshot.getCurrentFateOps());
+    zkChildFateOpsTotal.set(metricSnapshot.getZkFateChildOpsTotal());
+    zkConnectionErrorsTotal.set(metricSnapshot.getZkConnectionErrors());
+
+    // the number of FATE Tx states (NEW, IN_PROGRESS...) is fixed - the 
underlying
+    // getTxStateCounters call will return a current valid count for each 
possible state.
+    Map<String,Long> states = metricSnapshot.getTxStateCounters();
+
+    states.forEach((key, value) -> {
+      fateTypeCounts.computeIfAbsent(key,
+          v -> registry.newGauge(metricNameHelper(FATE_TX_STATE_METRIC_PREFIX, 
key),
+              "By transaction state count for " + key, value))
+          .set(value);
+    });
+
+    // the op types are dynamic and the metric gauges are generated when first 
seen. After
+    // that the values need to be cleared and then set to any new values present. This 
is so
+    // that the metrics system will report "known" values once seen. In 
operation, the
+    // number of types will be a fairly small set and should populate with 
normal operations.
+
+    // clear current values.
+    fateOpCounts.forEach((key, value) -> value.set(0));
+
+    // update new counts, create new gauge if first time seen.
+    Map<String,Long> opTypes = metricSnapshot.getOpTypeCounters();
+
+    log.trace("OP Counts Before: prev {}, updates {}", fateOpCounts, opTypes);
+
+    opTypes.forEach((key, value) -> {
+      fateOpCounts.computeIfAbsent(key,
+          guage -> 
registry.newGauge(metricNameHelper(FATE_OP_TYPE_METRIC_PREFIX, key),
+              "By transaction op type count for " + key, value))
+          .set(value);
+    });
+
+    log.trace("OP Counts After: prev {}, updates {}", fateOpCounts, opTypes);
+
+  }
+
+  @Override
+  public void getMetrics(MetricsCollector collector, boolean all) {
+
+    prepareMetrics();
+
+    recordValues();
+
+    // create the metrics record and publish to the registry.
+    MetricsRecordBuilder builder = 
collector.addRecord(RECORD).setContext(CONTEXT);
+    registry.snapshot(builder, all);
+
+  }
+
+  private String metricNameHelper(final String prefix, final String name) {
+    return prefix + name;
+  }
+
+}
diff --git 
a/server/master/src/main/java/org/apache/accumulo/master/metrics/fate/FateMetrics.java
 
b/server/master/src/main/java/org/apache/accumulo/master/metrics/fate/FateLegacyJMXMetrics.java
similarity index 84%
rename from 
server/master/src/main/java/org/apache/accumulo/master/metrics/fate/FateMetrics.java
rename to 
server/master/src/main/java/org/apache/accumulo/master/metrics/fate/FateLegacyJMXMetrics.java
index 934d803..00f908d 100644
--- 
a/server/master/src/main/java/org/apache/accumulo/master/metrics/fate/FateMetrics.java
+++ 
b/server/master/src/main/java/org/apache/accumulo/master/metrics/fate/FateLegacyJMXMetrics.java
@@ -24,6 +24,7 @@ import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
 import org.apache.accumulo.server.metrics.Metrics;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,16 +46,16 @@ import org.slf4j.LoggerFactory;
  * and the zookeeper id (zxid) is expected to continuously increase because 
the zookeeper id is used
  * by zookeeper for ordering operations.
  */
-public class FateMetrics implements Metrics, FateMetricsMBean {
+public class FateLegacyJMXMetrics implements Metrics, 
FateLegacyJMXMetricsMBean {
 
-  private static final Logger log = LoggerFactory.getLogger(FateMetrics.class);
+  private static final Logger log = 
LoggerFactory.getLogger(FateLegacyJMXMetrics.class);
 
   // limit calls to update fate counters to guard against hammering zookeeper.
   private static final long DEFAULT_MIN_REFRESH_DELAY = 
TimeUnit.SECONDS.toMillis(10);
 
-  private volatile long minimumRefreshDelay;
+  private final long minimumRefreshDelay;
 
-  private final AtomicReference<FateMetricValues> metricValues;
+  private final AtomicReference<FateMetricSnapshot> metricValues;
 
   private volatile long lastUpdate = 0;
 
@@ -64,21 +65,26 @@ public class FateMetrics implements Metrics, 
FateMetricsMBean {
 
   private volatile boolean enabled = false;
 
-  public FateMetrics(final String instanceId, final long minimumRefreshDelay) {
+  private final ZooReaderWriter zoo;
+
+  public FateLegacyJMXMetrics(final String instanceId, final long 
minimumRefreshDelay) {
 
     this.instanceId = instanceId;
 
+    zoo = ZooReaderWriter.getInstance();
+
     this.minimumRefreshDelay = Math.max(DEFAULT_MIN_REFRESH_DELAY, 
minimumRefreshDelay);
 
-    metricValues = new AtomicReference<>(FateMetricValues.builder().build());
+    metricValues = new AtomicReference<>(FateMetricSnapshot.builder().build());
 
     try {
       objectName = new ObjectName(
-          
"accumulo.server.metrics:service=FateMetrics,name=FateMetricsMBean,instance="
+          
"accumulo.server.metrics:service=FateLegacyJMXMetrics,name=FateLegacyJMXMetricsMBean,instance="
               + Thread.currentThread().getName());
     } catch (Exception e) {
       log.error("Exception setting MBean object name", e);
     }
+
   }
 
   @Override
@@ -99,23 +105,18 @@ public class FateMetrics implements Metrics, 
FateMetricsMBean {
   /**
    * Update the metric values from zookeeper after minimumRefreshDelay has 
expired.
    */
-  public synchronized FateMetricValues snapshot() {
-
-    FateMetricValues current = metricValues.get();
+  public synchronized void snapshot() {
 
     long now = System.currentTimeMillis();
-
     if ((lastUpdate + minimumRefreshDelay) > now) {
-      return current;
+      return;
     }
 
-    FateMetricValues updates = 
FateMetricValues.updateFromZookeeper(instanceId, current);
+    FateMetricSnapshot updates = 
FateMetricSnapshot.getFromZooKeeper(instanceId, zoo);
 
     metricValues.set(updates);
 
     lastUpdate = now;
-
-    return updates;
   }
 
   @Override
diff --git 
a/server/master/src/main/java/org/apache/accumulo/master/metrics/fate/FateMetricsMBean.java
 
b/server/master/src/main/java/org/apache/accumulo/master/metrics/fate/FateLegacyJMXMetricsMBean.java
similarity index 84%
rename from 
server/master/src/main/java/org/apache/accumulo/master/metrics/fate/FateMetricsMBean.java
rename to 
server/master/src/main/java/org/apache/accumulo/master/metrics/fate/FateLegacyJMXMetricsMBean.java
index bb0d14b..c619cc6 100644
--- 
a/server/master/src/main/java/org/apache/accumulo/master/metrics/fate/FateMetricsMBean.java
+++ 
b/server/master/src/main/java/org/apache/accumulo/master/metrics/fate/FateLegacyJMXMetricsMBean.java
@@ -16,12 +16,12 @@
  */
 package org.apache.accumulo.master.metrics.fate;
 
-public interface FateMetricsMBean {
+public interface FateLegacyJMXMetricsMBean {
 
-  public long getCurrentFateOps();
+  long getCurrentFateOps();
 
-  public long getZkFateChildOpsTotal();
+  long getZkFateChildOpsTotal();
 
-  public long getZKConnectionErrorsTotal();
+  long getZKConnectionErrorsTotal();
 
 }
diff --git 
a/server/master/src/main/java/org/apache/accumulo/master/metrics/fate/FateMetricSnapshot.java
 
b/server/master/src/main/java/org/apache/accumulo/master/metrics/fate/FateMetricSnapshot.java
new file mode 100644
index 0000000..0e612ce
--- /dev/null
+++ 
b/server/master/src/main/java/org/apache/accumulo/master/metrics/fate/FateMetricSnapshot.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.metrics.fate;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.StringJoiner;
+import java.util.TreeMap;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.AdminUtil;
+import org.apache.accumulo.fate.ReadOnlyTStore;
+import org.apache.accumulo.fate.ZooStore;
+import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Immutable class that holds a snapshot of fate metric values - use builder 
to instantiate
+ * instance.
+ */
+class FateMetricSnapshot {
+
+  private static final Logger log = 
LoggerFactory.getLogger(FateMetricSnapshot.class);
+
+  private final long updateTime;
+  private final long currentFateOps;
+  private final long zkFateChildOpsTotal;
+  private final long zkConnectionErrors;
+
+  private final Map<String,Long> txStateCounters;
+  private final Map<String,Long> opTypeCounters;
+
+  private FateMetricSnapshot(final long updateTime, final long currentFateOps,
+      final long zkFateChildOpsTotal, final long zkConnectionErrors,
+      final Map<String,Long> txStateCounters, final Map<String,Long> 
opTypeCounters) {
+    this.updateTime = updateTime;
+    this.currentFateOps = currentFateOps;
+    this.zkFateChildOpsTotal = zkFateChildOpsTotal;
+    this.zkConnectionErrors = zkConnectionErrors;
+    this.txStateCounters = txStateCounters;
+    this.opTypeCounters = opTypeCounters;
+  }
+
+  /**
+   * Builds a snapshot of the current FATE metrics by reading the transaction 
store in ZooKeeper:
+   * the number of current transactions, the child version of the FATE node, 
and counts by
+   * transaction state and by op type for IN_PROGRESS transactions.
+   *
+   * @return a snapshot of the FATE metric values read from ZooKeeper.
+   */
+  public static FateMetricSnapshot getFromZooKeeper(final String instanceId,
+      final ZooReaderWriter zoo) {
+
+    Builder builder = FateMetricSnapshot.builder();
+    AdminUtil<String> admin = new AdminUtil<>(false);
+
+    try {
+
+      ZooStore<String> zs = new ZooStore<>(ZooUtil.getRoot(instanceId) + 
Constants.ZFATE, zoo);
+
+      List<AdminUtil.TransactionStatus> currFates = 
admin.getTransactionStatus(zs, null, null);
+      builder.withCurrentFateOps(currFates.size());
+
+      Stat node = zoo.getZooKeeper().exists(ZooUtil.getRoot(instanceId) + 
Constants.ZFATE, false);
+      builder.withZkFateChildOpsTotal(node.getCversion());
+
+      if (log.isTraceEnabled()) {
+        log.trace(
+            "ZkNodeStat: {czxid: {}, mzxid: {}, pzxid: {}, ctime: {}, mtime: 
{}, "
+                + "version: {}, cversion: {}, num children: {}",
+            node.getCzxid(), node.getMzxid(), node.getPzxid(), 
node.getCtime(), node.getMtime(),
+            node.getVersion(), node.getCversion(), node.getNumChildren());
+      }
+
+      // states are enumerated - create new map with counts initialized to 0.
+      Map<String,Long> states = new TreeMap<>();
+      for (ReadOnlyTStore.TStatus t : ReadOnlyTStore.TStatus.values()) {
+        states.put(t.name(), 0L);
+      }
+
+      // op types are dynamic, no count initialization needed - clearing prev 
values will
+      // need to be handled by the caller - this is just the counts for 
current op types.
+      Map<String,Long> opTypeCounters = new TreeMap<>();
+
+      for (AdminUtil.TransactionStatus tx : currFates) {
+
+        String stateName = tx.getStatus().name();
+
+        // incr count for state
+        states.merge(stateName, 1L, Long::sum);
+
+        // incr count for op type for in_progress transactions.
+        if (ReadOnlyTStore.TStatus.IN_PROGRESS.equals(tx.getStatus())) {
+          String opType = tx.getDebug();
+          if (opType == null || opType.isEmpty()) {
+            opType = "UNKNOWN";
+          }
+          opTypeCounters.merge(opType, 1L, Long::sum);
+        }
+      }
+
+      builder.withTxStateCounters(states);
+      builder.withOpTypeCounters(opTypeCounters);
+
+    } catch (KeeperException ex) {
+      log.debug("Error connecting to ZooKeeper", ex);
+      builder.incrZkConnectionErrors();
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+
+    return builder.build();
+  }
+
+  long getCurrentFateOps() {
+    return currentFateOps;
+  }
+
+  long getZkFateChildOpsTotal() {
+    return zkFateChildOpsTotal;
+  }
+
+  long getZkConnectionErrors() {
+    return zkConnectionErrors;
+  }
+
+  /**
+   * Provides counters for transaction states (NEW, IN_PROGRESS, FAILED,...).
+   *
+   * @return a map of transaction status counters.
+   */
+  Map<String,Long> getTxStateCounters() {
+    return txStateCounters;
+  }
+
+  /**
+   * The FATE transaction stores the transaction type as a debug string in the 
transaction zknode.
+   * This method returns a map of counters of the current occurrences of each 
operation type that is
+   * IN_PROGRESS.
+   *
+   * @return a map of operation type counters.
+   */
+  Map<String,Long> getOpTypeCounters() {
+    return opTypeCounters;
+  }
+
+  @Override
+  public String toString() {
+    return new StringJoiner(", ", FateMetricSnapshot.class.getSimpleName() + 
"[", "]")
+        .add("updateTime=" + updateTime).add("currentFateOps=" + 
currentFateOps)
+        .add("zkFateChildOpsTotal=" + zkFateChildOpsTotal)
+        .add("zkConnectionErrors=" + 
zkConnectionErrors).add("txStateCounters=" + txStateCounters)
+        .add("opTypeCounters=" + opTypeCounters).toString();
+  }
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  static class Builder {
+
+    private long currentFateOps = 0;
+    private long zkFateChildOpsTotal = 0;
+    private long zkConnectionErrors = 0;
+
+    private final Map<String,Long> txStateCounters;
+    private Map<String,Long> opTypeCounters;
+
+    Builder() {
+
+      // states are enumerated - create new map with counts initialized to 0.
+      txStateCounters = new TreeMap<>();
+      for (ReadOnlyTStore.TStatus t : ReadOnlyTStore.TStatus.values()) {
+        txStateCounters.put(t.name(), 0L);
+      }
+
+      opTypeCounters = Collections.emptyMap();
+    }
+
+    Builder copy(final FateMetricSnapshot v) {
+
+      // if null, return default, empty snapshot
+      if (Objects.isNull(v)) {
+        return this;
+      }
+
+      withCurrentFateOps(v.getCurrentFateOps());
+      withZkFateChildOpsTotal(v.getZkFateChildOpsTotal());
+      withZkConnectionErrors(v.getZkConnectionErrors());
+      withTxStateCounters(v.getTxStateCounters());
+      withOpTypeCounters(v.getOpTypeCounters());
+      return this;
+    }
+
+    Builder withCurrentFateOps(final long value) {
+      this.currentFateOps = value;
+      return this;
+    }
+
+    Builder withZkFateChildOpsTotal(final long value) {
+      this.zkFateChildOpsTotal = value;
+      return this;
+    }
+
+    Builder incrZkConnectionErrors() {
+      this.zkConnectionErrors += 1L;
+      return this;
+    }
+
+    Builder withZkConnectionErrors(final long value) {
+      this.zkConnectionErrors = value;
+      return this;
+    }
+
+    Builder withTxStateCounters(final Map<String,Long> txStateCounters) {
+      this.txStateCounters.putAll(txStateCounters);
+      return this;
+    }
+
+    Builder withOpTypeCounters(final Map<String,Long> opTypeCounters) {
+      this.opTypeCounters = new TreeMap<>(opTypeCounters);
+      return this;
+    }
+
+    FateMetricSnapshot build() {
+      return new FateMetricSnapshot(System.currentTimeMillis(), 
currentFateOps, zkFateChildOpsTotal,
+          zkConnectionErrors, txStateCounters, opTypeCounters);
+    }
+  }
+}
diff --git 
a/server/master/src/main/java/org/apache/accumulo/master/metrics/fate/FateMetricValues.java
 
b/server/master/src/main/java/org/apache/accumulo/master/metrics/fate/FateMetricValues.java
deleted file mode 100644
index b776f57..0000000
--- 
a/server/master/src/main/java/org/apache/accumulo/master/metrics/fate/FateMetricValues.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.master.metrics.fate;
-
-import java.util.List;
-
-import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.zookeeper.ZooUtil;
-import org.apache.accumulo.fate.AdminUtil;
-import org.apache.accumulo.fate.ZooStore;
-import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
-import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Immutable class that holds a snapshot of fate metric values - use builder 
to instantiate
- * instance.
- */
-class FateMetricValues {
-
-  private static final Logger log = LoggerFactory.getLogger(FateMetrics.class);
-
-  private final long updateTime;
-  private final long currentFateOps;
-  private final long zkFateChildOpsTotal;
-  private final long zkConnectionErrors;
-
-  private FateMetricValues(final long updateTime, final long currentFateOps,
-      final long zkFateChildOpsTotal, final long zkConnectionErrors) {
-    this.updateTime = updateTime;
-    this.currentFateOps = currentFateOps;
-    this.zkFateChildOpsTotal = zkFateChildOpsTotal;
-    this.zkConnectionErrors = zkConnectionErrors;
-  }
-
-  long getCurrentFateOps() {
-    return currentFateOps;
-  }
-
-  long getZkFateChildOpsTotal() {
-    return zkFateChildOpsTotal;
-  }
-
-  long getZkConnectionErrors() {
-    return zkConnectionErrors;
-  }
-
-  /**
-   * Update FateMetricValues, populating with current values and the 
overwritting new values, this
-   * preserves previous values in case an error or exception prevents the 
values from being
-   * completely populated, this form may be more suitable for metric counters.
-   *
-   * @param instanceId
-   *          Accumulo instanceId
-   * @param currentValues
-   *          the current fate metrics used as default
-   * @return populated metrics values
-   */
-  static FateMetricValues updateFromZookeeper(final String instanceId,
-      final FateMetricValues currentValues) {
-    return updateFromZookeeper(instanceId, 
FateMetricValues.builder().copy(currentValues));
-  }
-
-  /**
-   * Update the FATE metric values from zookeeepr - the builder is expected to 
have the desired
-   * default values (either 0, or the previous value).
-   *
-   * @param instanceId
-   *          Accumulo instanceId
-   * @param builder
-   *          value builder, populated with defaults.
-   * @return an immutable instance of FateMetricsValues.
-   */
-  private static FateMetricValues updateFromZookeeper(final String instanceId,
-      final FateMetricValues.Builder builder) {
-
-    AdminUtil<String> admin = new AdminUtil<>(false);
-
-    try {
-
-      IZooReaderWriter zoo = ZooReaderWriter.getInstance();
-      ZooStore<String> zs = new ZooStore<>(ZooUtil.getRoot(instanceId) + 
Constants.ZFATE, zoo);
-
-      List<AdminUtil.TransactionStatus> currFates = 
admin.getTransactionStatus(zs, null, null);
-      builder.withCurrentFateOps(currFates.size());
-
-      Stat node = zoo.getZooKeeper().exists(ZooUtil.getRoot(instanceId) + 
Constants.ZFATE, false);
-      builder.withZkFateChildOpsTotal(node.getCversion());
-
-      if (log.isTraceEnabled()) {
-        log.trace(
-            "ZkNodeStat: {czxid: {}, mzxid: {}, pzxid: {}, ctime: {}, mtime: 
{}, "
-                + "version: {}, cversion: {}, num children: {}",
-            node.getCzxid(), node.getMzxid(), node.getPzxid(), 
node.getCtime(), node.getMtime(),
-            node.getVersion(), node.getCversion(), node.getNumChildren());
-      }
-    } catch (KeeperException ex) {
-      log.debug("Error connecting to ZooKeeper", ex);
-      builder.incrZkConnectionErrors();
-    } catch (InterruptedException ex) {
-      Thread.currentThread().interrupt();
-    }
-
-    return builder.build();
-  }
-
-  @Override
-  public String toString() {
-    return "FateMetricValues{" + "updateTime=" + updateTime + ", 
currentFateOps=" + currentFateOps
-        + ", zkFateChildOpsTotal=" + zkFateChildOpsTotal + ", 
zkConnectionErrors="
-        + zkConnectionErrors + '}';
-  }
-
-  public static Builder builder() {
-    return new Builder();
-  }
-
-  static class Builder {
-
-    private long currentFateOps = 0;
-    private long zkFateChildOpsTotal = 0;
-    private long zkConnectionErrors = 0;
-
-    Builder copy(final FateMetricValues prevValues) {
-
-      if (prevValues == null) {
-        return new Builder();
-      }
-
-      return new Builder().withCurrentFateOps(prevValues.getCurrentFateOps())
-          .withZkFateChildOpsTotal(prevValues.getZkFateChildOpsTotal())
-          .withZkConnectionErrors(prevValues.getZkConnectionErrors());
-    }
-
-    Builder withCurrentFateOps(final long value) {
-      this.currentFateOps = value;
-      return this;
-    }
-
-    Builder withZkFateChildOpsTotal(final long value) {
-      this.zkFateChildOpsTotal = value;
-      return this;
-    }
-
-    Builder incrZkConnectionErrors() {
-      this.zkConnectionErrors += 1L;
-      return this;
-    }
-
-    Builder withZkConnectionErrors(final long value) {
-      this.zkConnectionErrors = value;
-      return this;
-    }
-
-    FateMetricValues build() {
-      return new FateMetricValues(System.currentTimeMillis(), currentFateOps, 
zkFateChildOpsTotal,
-          zkConnectionErrors);
-    }
-  }
-}
diff --git 
a/server/master/src/main/java/org/apache/accumulo/master/metrics/fate/Metrics2FateMetrics.java
 
b/server/master/src/main/java/org/apache/accumulo/master/metrics/fate/Metrics2FateMetrics.java
deleted file mode 100644
index 3bbe94e..0000000
--- 
a/server/master/src/main/java/org/apache/accumulo/master/metrics/fate/Metrics2FateMetrics.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.master.metrics.fate;
-
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.accumulo.server.metrics.Metrics;
-import org.apache.accumulo.server.metrics.MetricsSystemHelper;
-import org.apache.hadoop.metrics2.MetricsCollector;
-import org.apache.hadoop.metrics2.MetricsRecordBuilder;
-import org.apache.hadoop.metrics2.MetricsSource;
-import org.apache.hadoop.metrics2.MetricsSystem;
-import org.apache.hadoop.metrics2.impl.MsInfo;
-import org.apache.hadoop.metrics2.lib.Interns;
-import org.apache.hadoop.metrics2.lib.MetricsRegistry;
-import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class Metrics2FateMetrics implements Metrics, MetricsSource {
-
-  private static final Logger log = 
LoggerFactory.getLogger(Metrics2FateMetrics.class);
-
-  // limit calls to update fate counters to guard against hammering zookeeper.
-  private static final long DEFAULT_MIN_REFRESH_DELAY = 
TimeUnit.SECONDS.toMillis(10);
-
-  private volatile long minimumRefreshDelay;
-
-  public static final String NAME = MASTER_NAME + ",sub=Fate";
-  public static final String DESCRIPTION = "Fate Metrics";
-  public static final String CONTEXT = "master";
-  public static final String RECORD = "fate";
-  public static final String CUR_FATE_OPS = "currentFateOps";
-  public static final String TOTAL_FATE_OPS = "totalFateOps";
-  public static final String TOTAL_ZK_CONN_ERRORS = "totalZkConnErrors";
-
-  private final String instanceId;
-  private final MetricsSystem metricsSystem;
-  private final MetricsRegistry registry;
-  private final MutableGaugeLong currentFateOps;
-  private final MutableGaugeLong zkChildFateOpsTotal;
-  private final MutableGaugeLong zkConnectionErrorsTotal;
-
-  private final AtomicReference<FateMetricValues> metricValues;
-
-  private volatile long lastUpdate = 0;
-
-  public Metrics2FateMetrics(final String instanceId, MetricsSystem 
metricsSystem,
-      final long minimumRefreshDelay) {
-
-    this.instanceId = instanceId;
-
-    this.minimumRefreshDelay = Math.max(DEFAULT_MIN_REFRESH_DELAY, 
minimumRefreshDelay);
-
-    metricValues = new 
AtomicReference<>(FateMetricValues.updateFromZookeeper(instanceId, null));
-
-    this.metricsSystem = metricsSystem;
-    this.registry = new MetricsRegistry(Interns.info(NAME, DESCRIPTION));
-    this.registry.tag(MsInfo.ProcessName, 
MetricsSystemHelper.getProcessName());
-
-    currentFateOps = registry.newGauge(CUR_FATE_OPS, "Current number of FATE 
Ops", 0L);
-    zkChildFateOpsTotal = registry.newGauge(TOTAL_FATE_OPS, "Total FATE Ops", 
0L);
-    zkConnectionErrorsTotal =
-        registry.newGauge(TOTAL_ZK_CONN_ERRORS, "Total ZK Connection Errors", 
0L);
-
-  }
-
-  @Override
-  public void register() {
-    metricsSystem.register(NAME, DESCRIPTION, this);
-  }
-
-  @Override
-  public void add(String name, long time) {
-    throw new UnsupportedOperationException("add() is not implemented");
-  }
-
-  @Override
-  public boolean isEnabled() {
-    return true;
-  }
-
-  @Override
-  public void getMetrics(MetricsCollector collector, boolean all) {
-
-    log.trace("getMetrics called with collector: {}", collector);
-
-    FateMetricValues fateMetrics = metricValues.get();
-
-    long now = System.currentTimeMillis();
-
-    if ((lastUpdate + minimumRefreshDelay) < now) {
-
-      fateMetrics = FateMetricValues.updateFromZookeeper(instanceId, 
fateMetrics);
-
-      metricValues.set(fateMetrics);
-
-      lastUpdate = now;
-
-      // update individual gauges that are reported.
-      currentFateOps.set(fateMetrics.getCurrentFateOps());
-      zkChildFateOpsTotal.set(fateMetrics.getZkFateChildOpsTotal());
-      zkConnectionErrorsTotal.set(fateMetrics.getZkConnectionErrors());
-
-    }
-
-    // create the metrics record and publish to the registry.
-    MetricsRecordBuilder builder = 
collector.addRecord(RECORD).setContext(CONTEXT);
-    registry.snapshot(builder, all);
-
-  }
-}
diff --git 
a/server/master/src/test/java/org/apache/accumulo/master/metrics/fate/FateMetricValuesTest.java
 
b/server/master/src/test/java/org/apache/accumulo/master/metrics/fate/FateMetricSnapshotTest.java
similarity index 86%
rename from 
server/master/src/test/java/org/apache/accumulo/master/metrics/fate/FateMetricValuesTest.java
rename to 
server/master/src/test/java/org/apache/accumulo/master/metrics/fate/FateMetricSnapshotTest.java
index c5cdae4..6c27cca 100644
--- 
a/server/master/src/test/java/org/apache/accumulo/master/metrics/fate/FateMetricValuesTest.java
+++ 
b/server/master/src/test/java/org/apache/accumulo/master/metrics/fate/FateMetricSnapshotTest.java
@@ -20,12 +20,12 @@ import static org.junit.Assert.assertEquals;
 
 import org.junit.Test;
 
-public class FateMetricValuesTest {
+public class FateMetricSnapshotTest {
 
   @Test
   public void defaultValueTest() {
 
-    FateMetricValues v = FateMetricValues.builder().build();
+    FateMetricSnapshot v = FateMetricSnapshot.builder().build();
 
     assertEquals(0, v.getCurrentFateOps());
     assertEquals(0, v.getZkFateChildOpsTotal());
@@ -35,18 +35,18 @@ public class FateMetricValuesTest {
   @Test
   public void valueTest() {
 
-    FateMetricValues.Builder builder = FateMetricValues.builder();
+    FateMetricSnapshot.Builder builder = FateMetricSnapshot.builder();
 
-    FateMetricValues v =
+    FateMetricSnapshot v =
         
builder.withCurrentFateOps(1).withZkFateChildOpsTotal(2).withZkConnectionErrors(3).build();
 
     assertEquals(1, v.getCurrentFateOps());
     assertEquals(2, v.getZkFateChildOpsTotal());
     assertEquals(3, v.getZkConnectionErrors());
 
-    FateMetricValues.Builder builder2 = builder.copy(v);
+    FateMetricSnapshot.Builder builder2 = builder.copy(v);
 
-    FateMetricValues v2 = builder2.withCurrentFateOps(11).build();
+    FateMetricSnapshot v2 = builder2.withCurrentFateOps(11).build();
 
     assertEquals(11, v2.getCurrentFateOps());
     assertEquals(2, v2.getZkFateChildOpsTotal());
diff --git 
a/server/master/src/test/java/org/apache/accumulo/master/metrics/fate/FateMetricsTest.java
 
b/server/master/src/test/java/org/apache/accumulo/master/metrics/fate/FateMetricsTest.java
new file mode 100644
index 0000000..2275d04
--- /dev/null
+++ 
b/server/master/src/test/java/org/apache/accumulo/master/metrics/fate/FateMetricsTest.java
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.master.metrics.fate;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.fate.ReadOnlyTStore;
+import org.apache.accumulo.fate.Repo;
+import org.apache.accumulo.fate.ZooStore;
+import org.apache.accumulo.master.Master;
+import org.apache.accumulo.master.tableOps.MasterRepo;
+import org.apache.accumulo.server.conf.ServerConfigurationFactory;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+import org.easymock.EasyMock;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test FATE metrics using stubs and in-memory version of supporting infrastructure - a test
+ * zookeeper server is used and the FATE repos are stubs, but this should represent the metrics
+ * collection execution without needing to stand up a mini cluster to exercise these execution paths.
+ */
+public class FateMetricsTest {
+
+  private static final Logger log = 
LoggerFactory.getLogger(FateMetricsTest.class);
+  public static final String INSTANCE_ID = "1234";
+  public static final String MOCK_ZK_ROOT = "/accumulo/" + INSTANCE_ID;
+  public static final String A_FAKE_SECRET = "aPasswd";
+
+  private static ZooKeeperTestingServer szk = null;
+
+  private ZooStore<Master> zooStore = null;
+  private ZooKeeper zookeeper = null;
+
+  private final MetricsSystem ms = DefaultMetricsSystem.initialize("Accumulo");
+
+  private static ZooReaderWriter zooReaderWriter;
+
+  @BeforeClass
+  public static void setupZk() {
+    // using default zookeeper port - we don't have a full configuration
+    szk = new ZooKeeperTestingServer();
+    szk.initPaths(MOCK_ZK_ROOT);
+
+    // populate ZooReaderWriter cache
+    zooReaderWriter = ZooReaderWriter.getInstance(szk.getConn(), 30_000, 
A_FAKE_SECRET);
+  }
+
+  @AfterClass
+  public static void shutdownZK() throws Exception {
+    szk.close();
+  }
+
+  Master master;
+
+  /**
+   * Instantiate a test zookeeper and set up mocks for Master and Context. The test zookeeper is used
+   * to create a ZooReaderWriter. The zookeeper used in tests needs to be the one from the
+   * zooReaderWriter, not the test server, because the zooReaderWriter sets up ACLs.
+   *
+   * @throws Exception
+   *           any exception is a test failure.
+   */
+  @Before
+  public void init() throws Exception {
+
+    zookeeper = ZooReaderWriter.getInstance().getZooKeeper();
+
+    clear(MOCK_ZK_ROOT);
+
+    zooStore = new ZooStore<>(MOCK_ZK_ROOT + Constants.ZFATE, 
ZooReaderWriter.getInstance());
+
+    master = EasyMock.createMock(Master.class);
+    ServerConfigurationFactory cf = 
EasyMock.createMock(ServerConfigurationFactory.class);
+
+    Instance instance = EasyMock.createMock(Instance.class);
+    EasyMock.expect(cf.getInstance()).andReturn(instance).anyTimes();
+
+    EasyMock.expect(instance.getZooKeepers()).andReturn(szk.getConn());
+    EasyMock.expect(instance.getZooKeepersSessionTimeOut()).andReturn(10_000);
+
+    AccumuloConfiguration accumuloConfiguration = 
EasyMock.createMock(AccumuloConfiguration.class);
+
+    
EasyMock.expect(accumuloConfiguration.get(Property.INSTANCE_SECRET)).andReturn(A_FAKE_SECRET);
+
+    EasyMock.replay(master, cf, instance, accumuloConfiguration);
+  }
+
+  @After
+  public void cleanup() throws Exception {
+    clear(MOCK_ZK_ROOT);
+  }
+
+  /**
+   * Validate that the expected metrics values are present in the metrics 
collector output
+   */
+  @Test
+  public void noFates() {
+
+    FateHadoop2Metrics metrics = new FateHadoop2Metrics(INSTANCE_ID, ms, 10);
+    metrics.overrideRefresh(0);
+
+    InMemTestCollector collector = new InMemTestCollector();
+
+    metrics.getMetrics(collector, true);
+
+    log.debug("Collector: {}", collector);
+
+    assertTrue(collector.contains("currentFateOps"));
+    assertTrue(collector.contains("totalFateOps"));
+    assertTrue(collector.contains("totalZkConnErrors"));
+
+    // Transaction STATES - defined by TStatus.
+    assertTrue(collector.contains("FateTxState_NEW"));
+    assertTrue(collector.contains("FateTxState_IN_PROGRESS"));
+    assertTrue(collector.contains("FateTxState_FAILED_IN_PROGRESS"));
+    assertTrue(collector.contains("FateTxState_FAILED"));
+    assertTrue(collector.contains("FateTxState_SUCCESSFUL"));
+    assertTrue(collector.contains("FateTxState_UNKNOWN"));
+
+    // metrics derived from operation types when seen - none should have been seen.
+    assertFalse(collector.contains("FateTxOpType_FakeOp"));
+
+    assertEquals(0L, collector.getValue("FateTxState_IN_PROGRESS"));
+    assertEquals(0L, collector.getValue("currentFateOps"));
+
+    EasyMock.verify(master);
+
+  }
+
+  /**
+   * Seed a fake FATE transaction that has a reserved transaction id. This sets a "new" status, but the
+   * repo and debug props are not yet set. Verify that the metric collection handles partial
+   * transaction states.
+   */
+  @Test
+  public void fateNewStatus() {
+
+    long tx1Id = zooStore.create();
+    log.debug("ZooStore tx1 id {}", tx1Id);
+
+    FateHadoop2Metrics metrics = new FateHadoop2Metrics(INSTANCE_ID, ms, 10);
+    metrics.overrideRefresh(0);
+
+    InMemTestCollector collector = new InMemTestCollector();
+
+    metrics.getMetrics(collector, true);
+
+    log.debug("Collector: {}", collector);
+
+    assertTrue(collector.contains("FateTxState_NEW"));
+    assertEquals(1L, collector.getValue("FateTxState_NEW"));
+    assertEquals(1L, collector.getValue("currentFateOps"));
+
+    assertTrue(collector.contains("FateTxState_IN_PROGRESS"));
+    assertEquals(0L, collector.getValue("FateTxState_IN_PROGRESS"));
+
+    EasyMock.verify(master);
+
+  }
+
+  /**
+   * Seeds the zoo store with a "fake" repo operation with a step, and sets the prop_debug field.
+   * This emulates the actions performed with {@link org.apache.accumulo.fate.Fate} for what is
+   * expected in zookeeper / the zoo store for an IN_PROGRESS transaction.
+   *
+   * @throws Exception
+   *           any exception is a test failure.
+   */
+  @Test
+  public void oneInProgress() throws Exception {
+
+    long tx1Id = seedTransaction();
+
+    log.debug("FATE tx: {}", prettyStat(
+        zookeeper.exists(MOCK_ZK_ROOT + "/fate/" + String.format("tx_%016x", 
tx1Id), false)));
+
+    FateHadoop2Metrics metrics = new FateHadoop2Metrics(INSTANCE_ID, ms, 10);
+    metrics.overrideRefresh(0);
+
+    InMemTestCollector collector = new InMemTestCollector();
+
+    metrics.getMetrics(collector, true);
+
+    log.debug("Collector: {}", collector);
+
+    assertTrue(collector.contains("FateTxState_IN_PROGRESS"));
+    assertEquals(1L, collector.getValue("FateTxState_IN_PROGRESS"));
+    assertEquals(1L, collector.getValue("FateTxOpType_FakeOp"));
+
+    EasyMock.verify(master);
+  }
+
+  private long seedTransaction() throws Exception {
+
+    long txId = zooStore.create();
+    zooStore.reserve(txId);
+
+    zooStore.setStatus(txId, ReadOnlyTStore.TStatus.IN_PROGRESS);
+
+    Repo<Master> repo = new FakeOp();
+    zooStore.push(txId, repo);
+
+    Repo<Master> step = new FakeOpStep1();
+    zooStore.push(txId, step);
+
+    zooStore.setProperty(txId, "debug", repo.getDescription());
+
+    zooStore.unreserve(txId, 50);
+
+    return txId;
+  }
+
+  /**
+   * builds on the "in progress" transaction - when a transaction completes, 
the op type metric
+   * should not reflect the previous operation that was "in progress".
+   *
+   * @throws Exception
+   *           any exception is a test failure.
+   */
+  @Test
+  public void typeClears() throws Exception {
+    long txId = seedTransaction();
+
+    zooStore.reserve(txId);
+    zooStore.setStatus(txId, ReadOnlyTStore.TStatus.SUCCESSFUL);
+    zooStore.unreserve(txId, 50);
+
+    FateHadoop2Metrics metrics = new FateHadoop2Metrics(INSTANCE_ID, ms, 10);
+    metrics.overrideRefresh(0);
+
+    InMemTestCollector collector = new InMemTestCollector();
+
+    metrics.getMetrics(collector, true);
+
+    assertEquals(0L, collector.getValue("FateTxState_IN_PROGRESS"));
+    assertEquals(1L, collector.getValue("FateTxState_SUCCESSFUL"));
+    assertNull(collector.getValue("FateTxOpType_FakeOp"));
+
+  }
+
+  private static class FakeOp extends MasterRepo {
+    private static final long serialVersionUID = -1L;
+
+    @Override
+    public Repo<Master> call(long tid, Master environment) {
+      return null;
+    }
+  }
+
+  private static class FakeOpStep1 extends MasterRepo {
+    private static final long serialVersionUID = -1L;
+
+    @Override
+    public Repo<Master> call(long tid, Master environment) {
+      return null;
+    }
+  }
+
+  String prettyStat(final Stat stat) {
+
+    if (stat == null) {
+      return "{Stat:[null]}";
+    }
+
+    return "{Stat:[" + "czxid:" + stat.getCzxid() + ", mzxid:" + 
stat.getMzxid() + ", ctime: "
+        + stat.getCtime() + ", mtime: " + stat.getMtime() + ", version: " + 
stat.getVersion()
+        + ", cversion: " + stat.getCversion() + ", aversion: " + 
stat.getAversion()
+        + ", eph owner: " + stat.getEphemeralOwner() + ", dataLength: " + 
stat.getDataLength()
+        + ", numChildren: " + stat.getNumChildren() + ", pzxid: " + 
stat.getPzxid() + "}";
+
+  }
+
+  private void clear(String path) throws Exception {
+
+    log.debug("clean up - search: {}", path);
+
+    List<String> children = zookeeper.getChildren(path, false);
+
+    if (children.isEmpty()) {
+      log.debug("clean up - delete path: {}", path);
+
+      if (!path.equals(MOCK_ZK_ROOT)) {
+        zookeeper.delete(path, -1);
+      }
+      return;
+    }
+
+    for (String cp : children) {
+      clear(path + "/" + cp);
+    }
+  }
+}
diff --git 
a/server/master/src/test/java/org/apache/accumulo/master/metrics/fate/InMemTestCollector.java
 
b/server/master/src/test/java/org/apache/accumulo/master/metrics/fate/InMemTestCollector.java
new file mode 100644
index 0000000..3e2a0b4
--- /dev/null
+++ 
b/server/master/src/test/java/org/apache/accumulo/master/metrics/fate/InMemTestCollector.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.master.metrics.fate;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.metrics2.AbstractMetric;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.MetricsTag;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class InMemTestCollector implements MetricsCollector {
+
+  private static final Logger log = 
LoggerFactory.getLogger(InMemTestCollector.class);
+
+  final private Map<String,Number> measurements = new HashMap<>();
+
+  final MetricsRecordBuilder builder = new FakeMetricsBuilder(measurements);
+
+  @Override
+  public MetricsRecordBuilder addRecord(String s) {
+    log.debug("RecordBuilder: Adding string: {}", s);
+    return builder;
+  }
+
+  @Override
+  public MetricsRecordBuilder addRecord(MetricsInfo metricsInfo) {
+    log.debug("FC: Add info {}", metricsInfo);
+    return builder;
+  }
+
+  boolean contains(final String name) {
+    return measurements.containsKey(name);
+  }
+
+  Number getValue(final String name) {
+    return measurements.get(name);
+  }
+
+  @Override
+  public String toString() {
+    return "InMemTestCollector{" + "measurements=" + measurements + '}';
+  }
+
+  private static class FakeMetricsBuilder extends MetricsRecordBuilder {
+
+    final Map<String,Number> measurements;
+
+    FakeMetricsBuilder(Map<String,Number> measurements) {
+      this.measurements = measurements;
+    }
+
+    @Override
+    public MetricsRecordBuilder tag(MetricsInfo metricsInfo, String s) {
+      log.debug("Add tag {}: {}", metricsInfo, s);
+      return this;
+    }
+
+    @Override
+    public MetricsRecordBuilder add(MetricsTag metricsTag) {
+      log.debug("Add tag {}", metricsTag);
+      return this;
+    }
+
+    @Override
+    public MetricsRecordBuilder add(AbstractMetric abstractMetric) {
+      log.debug("Add abstractMetric: {}", abstractMetric);
+      return this;
+    }
+
+    @Override
+    public MetricsRecordBuilder setContext(String s) {
+      log.debug("Set context: {}", s);
+      return this;
+    }
+
+    @Override
+    public MetricsRecordBuilder addCounter(MetricsInfo metricsInfo, int i) {
+      log.debug("add int counter: {}: {}", metricsInfo, i);
+      return this;
+    }
+
+    @Override
+    public MetricsRecordBuilder addCounter(MetricsInfo metricsInfo, long l) {
+      log.debug("add long counter: {}: {}", metricsInfo, l);
+      measurements.put(metricsInfo.name(), l);
+      return this;
+    }
+
+    @Override
+    public MetricsRecordBuilder addGauge(MetricsInfo metricsInfo, int i) {
+      log.debug("add int gauge: {}: {}", metricsInfo, i);
+      return this;
+    }
+
+    @Override
+    public MetricsRecordBuilder addGauge(MetricsInfo metricsInfo, long l) {
+      log.debug("add long gauge: {}: {}", metricsInfo.name(), l);
+      measurements.put(metricsInfo.name(), l);
+      return this;
+    }
+
+    @Override
+    public MetricsRecordBuilder addGauge(MetricsInfo metricsInfo, float v) {
+      log.debug("add float gauge: {}: {}", metricsInfo, v);
+      return this;
+    }
+
+    @Override
+    public MetricsRecordBuilder addGauge(MetricsInfo metricsInfo, double v) {
+      log.debug("add double gauge: {}: {}", metricsInfo, v);
+      return this;
+    }
+
+    @Override
+    public MetricsCollector endRecord() {
+      log.debug("end record called");
+      return super.endRecord();
+    }
+
+    @Override
+    public MetricsCollector parent() {
+      return null;
+    }
+  }
+}
diff --git 
a/server/master/src/test/java/org/apache/accumulo/master/metrics/fate/ZooKeeperTestingServer.java
 
b/server/master/src/test/java/org/apache/accumulo/master/metrics/fate/ZooKeeperTestingServer.java
new file mode 100644
index 0000000..c6e3746
--- /dev/null
+++ 
b/server/master/src/test/java/org/apache/accumulo/master/metrics/fate/ZooKeeperTestingServer.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.master.metrics.fate;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.security.SecureRandom;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.curator.test.TestingServer;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Uses Apache Curator to create a running zookeeper server for internal 
tests. The zookeeper port
+ * is randomly assigned in case multiple instances are created by concurrent 
tests.
+ */
+public class ZooKeeperTestingServer {
+
+  private static final Logger log = 
LoggerFactory.getLogger(ZooKeeperTestingServer.class);
+
+  private TestingServer zkServer;
+  private final ZooKeeper zoo;
+
+  private static final Random rand = new SecureRandom();
+
+  /**
+   * Instantiate a running zookeeper server - this call will block until the 
server is ready for
+   * client connections. It will try three times, with a 5 second pause to 
connect.
+   */
+  public ZooKeeperTestingServer() {
+    this(getPort());
+  }
+
+  private ZooKeeperTestingServer(int port) {
+
+    try {
+
+      Path tmpDir = Files.createTempDirectory("zk_test");
+
+      CountDownLatch connectionLatch = new CountDownLatch(1);
+
+      // using a random port. The test server allows for auto port
+      // generation, but not with specifying the tmp dir path too.
+      // so, generate our own.
+      boolean started = false;
+      int retry = 0;
+      while (!started && retry++ < 3) {
+
+        try {
+
+          zkServer = new TestingServer(port, tmpDir.toFile());
+          zkServer.start();
+
+          started = true;
+        } catch (Exception ex) {
+          log.trace("zookeeper test server start failed attempt {}", retry);
+        }
+      }
+
+      log.info("zookeeper connection string:'{}'", 
zkServer.getConnectString());
+
+      zoo = new ZooKeeper(zkServer.getConnectString(), 5_000, watchedEvent -> {
+        if (watchedEvent.getState() == 
Watcher.Event.KeeperState.SyncConnected) {
+          connectionLatch.countDown();
+        }
+      });
+
+      connectionLatch.await();
+
+    } catch (Exception ex) {
+      throw new IllegalStateException("Failed to start testing zookeeper", ex);
+    }
+
+  }
+
+  /**
+   * Returns an random integer between 50_000 and 65_000 (typical ephemeral 
port range for linux is
+   * listed as 49,152 to 65,535
+   *
+   * @return a random port with the linux ephemeral port range.
+   */
+  private static int getPort() {
+    final int minPort = 50_000;
+    final int maxPort = 65_000;
+    return rand.nextInt((maxPort - minPort) + 1) + minPort;
+  }
+
+  public ZooKeeper getZooKeeper() {
+    return zoo;
+  }
+
+  public String getConn() {
+    return zkServer.getConnectString();
+  }
+
+  public void initPaths(String s) {
+    try {
+
+      String[] paths = s.split("/");
+
+      String slash = "/";
+      String path = "";
+
+      for (String p : paths) {
+        if (p.length() > 0) {
+          path = path + slash + p;
+          log.debug("building default paths, creating node {}", path);
+          zoo.create(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT);
+        }
+      }
+
+    } catch (Exception ex) {
+      throw new IllegalStateException("Failed to create accumulo initial 
paths: " + s, ex);
+    }
+  }
+
+  public void close() throws IOException {
+    if (zkServer != null) {
+      zkServer.stop();
+    }
+  }
+
+}
diff --git a/server/master/src/test/resources/conf/accumulo-site.xml 
b/server/master/src/test/resources/conf/accumulo-site.xml
new file mode 100644
index 0000000..23b47bf
--- /dev/null
+++ b/server/master/src/test/resources/conf/accumulo-site.xml
@@ -0,0 +1,116 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<configuration>
+       <!--
+  Put your site-specific accumulo configurations here.
+
+  The available configuration values along with their defaults
+  are documented in docs/config.html
+
+  Unless you are simply testing at your workstation, you will most 
+  definitely need to change the three entries below.
+       -->
+
+  <property>
+    <name>instance.zookeeper.host</name>
+    <value>localhost:2181</value>
+    <description>comma separated list of zookeeper servers</description>
+  </property>
+
+  <property>
+    <name>instance.secret</name>
+    <value>DEFAULT</value>
+    <description>A secret unique to a given instance that all servers must 
know in order to communicate with one another.
+      Change it before initialization. To
+      change it later use ./bin/accumulo 
org.apache.accumulo.server.util.ChangeSecret [oldpasswd] [newpasswd],
+      and then update this file.
+    </description>
+  </property>
+
+  <property>
+    <name>tserver.memory.maps.max</name>
+    <value>80M</value>
+  </property>
+
+  <property>
+    <name>tserver.cache.data.size</name>
+    <value>7M</value>
+  </property>
+
+  <property>
+    <name>tserver.cache.index.size</name>
+    <value>20M</value>
+  </property>
+
+  <property>
+    <name>trace.password</name>
+      <!-- 
+        change this to the root user's password, and/or change the user below 
+       -->
+    <value>secret</value>
+  </property>
+
+  <property>
+    <name>trace.user</name>
+    <value>root</value>
+  </property>
+
+  <property>
+    <name>tserver.sort.buffer.size</name>
+    <value>50M</value>
+  </property>
+
+  <property>
+    <name>tserver.walog.max.size</name>
+    <value>100M</value>
+  </property>
+
+  <property>
+    <name>general.classpaths</name>
+    <!--
+       Add the following for hadoop-2.0
+       $HADOOP_PREFIX/share/hadoop/common/.*.jar,
+       $HADOOP_PREFIX/share/hadoop/common/lib/.*.jar,
+       $HADOOP_PREFIX/share/hadoop/hdfs/.*.jar,
+       $HADOOP_PREFIX/share/hadoop/mapreduce/.*.jar,
+       $HADOOP_PREFIX/share/hadoop/yarn/.*.jar,
+    -->
+    <value>
+      $ACCUMULO_HOME/server/target/classes/,
+      $ACCUMULO_HOME/lib/accumulo-server.jar,
+      $ACCUMULO_HOME/core/target/classes/,
+      $ACCUMULO_HOME/lib/accumulo-core.jar,
+      $ACCUMULO_HOME/start/target/classes/,
+      $ACCUMULO_HOME/lib/accumulo-start.jar,
+      $ACCUMULO_HOME/fate/target/classes/,
+      $ACCUMULO_HOME/lib/accumulo-fate.jar,
+      $ACCUMULO_HOME/proxy/target/classes/,
+      $ACCUMULO_HOME/lib/accumulo-proxy.jar,
+      $ACCUMULO_HOME/lib/[^.].*.jar,
+      $ZOOKEEPER_HOME/zookeeper[^.].*.jar,
+      $HADOOP_CONF_DIR,
+      $HADOOP_PREFIX/[^.].*.jar,
+      $HADOOP_PREFIX/lib/[^.].*.jar,
+    </value>
+    <description>Classpaths that accumulo checks for updates and class files.
+      When using the Security Manager, please remove the ".../target/classes/" 
values.
+    </description>
+  </property>
+</configuration>
diff --git a/server/master/src/test/resources/conf/generic_logger.xml 
b/server/master/src/test/resources/conf/generic_logger.xml
new file mode 100644
index 0000000..db79efe
--- /dev/null
+++ b/server/master/src/test/resources/conf/generic_logger.xml
@@ -0,0 +1,83 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+
+  <!-- Write out everything at the DEBUG level to the debug log -->
+  <appender name="A2" class="org.apache.log4j.RollingFileAppender">
+     <param name="File"           
value="${org.apache.accumulo.core.dir.log}/${org.apache.accumulo.core.application}_${org.apache.accumulo.core.ip.localhost.hostname}.debug.log"/>
+     <param name="MaxFileSize"    value="1000MB"/>
+     <param name="MaxBackupIndex" value="10"/>
+     <param name="Threshold"      value="DEBUG"/>
+     <layout class="org.apache.log4j.PatternLayout">
+       <param name="ConversionPattern" value="%d{ISO8601} [%-8c{2}] %-5p: 
%m%n"/>
+     </layout>
+  </appender>
+
+  <!--  Write out INFO and higher to the regular log -->
+  <appender name="A3" class="org.apache.log4j.RollingFileAppender">
+     <param name="File"           
value="${org.apache.accumulo.core.dir.log}/${org.apache.accumulo.core.application}_${org.apache.accumulo.core.ip.localhost.hostname}.log"/>
+     <param name="MaxFileSize"    value="1000MB"/>
+     <param name="MaxBackupIndex" value="10"/>
+     <param name="Threshold"      value="INFO"/>
+     <layout class="org.apache.log4j.PatternLayout">
+       <param name="ConversionPattern" value="%d{ISO8601} [%-8c{2}] %-5p: 
%m%n"/>
+     </layout>
+  </appender>
+
+  <!-- Send all logging data to a centralized logger -->
+  <appender name="N1" class="org.apache.log4j.net.SocketAppender">
+     <param name="remoteHost"     
value="${org.apache.accumulo.core.host.log}"/>
+     <param name="port"           
value="${org.apache.accumulo.core.host.log.port}"/>
+     <param name="application"    
value="${org.apache.accumulo.core.application}:${org.apache.accumulo.core.ip.localhost.hostname}"/>
+     <param name="Threshold"      value="WARN"/>
+  </appender>
+
+  <!--  If the centralized logger is down, buffer the log events, but drop 
them if it stays down -->
+  <appender name="ASYNC" class="org.apache.log4j.AsyncAppender">
+     <appender-ref ref="N1" />
+  </appender>
+
+  <!-- Log accumulo events to the debug, normal and remote logs. -->
+  <logger name="org.apache.accumulo" additivity="false">
+     <level value="DEBUG"/>
+     <appender-ref ref="A2" />
+     <appender-ref ref="A3" />
+     <appender-ref ref="ASYNC" />
+  </logger>
+
+  <logger name="org.apache.accumulo.core.file.rfile.bcfile">
+     <level value="INFO"/>
+  </logger>
+
+  <logger name="org.mortbay.log">
+     <level value="WARN"/>
+  </logger>
+
+  <logger name="org.apache.zookeeper">
+     <level value="ERROR"/>
+  </logger>
+
+  <!-- Log non-accumulo events to the debug and normal logs. -->
+  <root>
+     <level value="INFO"/>
+     <appender-ref ref="A2" />
+     <appender-ref ref="A3" />
+  </root>
+
+</log4j:configuration>
diff --git a/server/master/src/test/resources/conf/monitor_logger.xml 
b/server/master/src/test/resources/conf/monitor_logger.xml
new file mode 100644
index 0000000..91a7671
--- /dev/null
+++ b/server/master/src/test/resources/conf/monitor_logger.xml
@@ -0,0 +1,64 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+
+  <!-- Write out everything at the DEBUG level to the debug log -->
+  <appender name="A2" class="org.apache.log4j.RollingFileAppender">
+     <param name="File"           
value="${org.apache.accumulo.core.dir.log}/${org.apache.accumulo.core.application}_${org.apache.accumulo.core.ip.localhost.hostname}.debug.log"/>
+     <param name="MaxFileSize"    value="100MB"/>
+     <param name="MaxBackupIndex" value="10"/>
+     <param name="Threshold"      value="DEBUG"/>
+     <layout class="org.apache.log4j.PatternLayout">
+       <param name="ConversionPattern" value="%d{ISO8601} [%-8c{2}] %-5p: 
%X{application} %m%n"/>
+     </layout>
+  </appender>
+
+  <!--  Write out INFO and higher to the regular log -->
+  <appender name="A3" class="org.apache.log4j.RollingFileAppender">
+     <param name="File"           
value="${org.apache.accumulo.core.dir.log}/${org.apache.accumulo.core.application}_${org.apache.accumulo.core.ip.localhost.hostname}.log"/>
+     <param name="MaxFileSize"    value="100MB"/>
+     <param name="MaxBackupIndex" value="10"/>
+     <param name="Threshold"      value="INFO"/>
+     <layout class="org.apache.log4j.PatternLayout">
+       <param name="ConversionPattern" value="%d{ISO8601} [%-8c{2}] %-5p: 
%X{application} %m%n"/>
+     </layout>
+  </appender>
+
+  <!-- Keep the last few log messages for display to the user -->
+  <appender name="GUI" class="org.apache.accumulo.server.monitor.LogService">
+     <param name="keep"           value="40"/>
+     <param name="Threshold"      value="WARN"/>
+  </appender>
+
+  <!-- Log accumulo messages to debug, normal and GUI -->
+  <logger name="org.apache.accumulo" additivity="false">
+     <level value="DEBUG"/>
+     <appender-ref ref="A2" />
+     <appender-ref ref="A3" />
+     <appender-ref ref="GUI" />
+  </logger>
+
+  <!-- Log non-accumulo messages to debug, normal logs. -->
+  <root>
+     <level value="INFO"/>
+     <appender-ref ref="A2" />
+     <appender-ref ref="A3" />
+  </root>
+
+</log4j:configuration>
diff --git 
a/server/master/src/test/resources/hadoop-metrics2-accumulo.properties 
b/server/master/src/test/resources/hadoop-metrics2-accumulo.properties
new file mode 100644
index 0000000..e869144
--- /dev/null
+++ b/server/master/src/test/resources/hadoop-metrics2-accumulo.properties
@@ -0,0 +1,56 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Poll collectors every five seconds
+*.period=5
+
+#
+# MetricSink configuration
+#
+# Uncomment a sink (and configuration) to enable it. By default,
+# no sinks are enabled.
+#
+
+#
+# Configure file logging
+#
+
+# File sink for all metrics
+accumulo.sink.file-all.class=org.apache.hadoop.metrics2.sink.FileSink
+accumulo.sink.file-all.filename=./target/it.all.metrics
+
+accumulo.sink.file-gc.class=org.apache.hadoop.metrics2.sink.FileSink
+accumulo.sink.file-gc.context=accgc
+accumulo.sink.file-gc.filename=./target/accgc.metrics
+accumulo.sink.file-gc.period=5
+
+# File sink for tserver metrics
+# accumulo.sink.file-tserver.class=org.apache.hadoop.metrics2.sink.FileSink
+# accumulo.sink.file-tserver.context=tserver
+# accumulo.sink.file-tserver.filename=tserver.metrics
+
+# File sink for master metrics
+accumulo.sink.file-master.class=org.apache.hadoop.metrics2.sink.FileSink
+accumulo.sink.file-master.context=master
+accumulo.sink.file-master.filename=./target/master.metrics
+accumulo.sink.file-master.period=5
+
+accumulo.jmx.master.context=master
+
+# File sink for thrift server metrics
+# accumulo.sink.file-thrift.class=org.apache.hadoop.metrics2.sink.FileSink
+# accumulo.sink.file-thrift.context=thrift
+# accumulo.sink.file-thrift.filename=thrift.metrics
+
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/GcMetricsIT.java 
b/test/src/main/java/org/apache/accumulo/test/functional/GcMetricsIT.java
index 9bde363..04100a1 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/GcMetricsIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/GcMetricsIT.java
@@ -26,7 +26,9 @@ import java.util.concurrent.TimeUnit;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.gc.metrics.GcMetrics;
 import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
 import org.apache.accumulo.test.metrics.MetricsFileTailer;
+import org.apache.hadoop.conf.Configuration;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,6 +50,12 @@ public class GcMetricsIT extends AccumuloClusterHarness {
       "AccGcWalErrors", "AccGcWalFinished", "AccGcWalInUse", 
"AccGcWalStarted"};
 
   @Override
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration 
hadoopCoreSite) {
+    // gcMetricsPublished() returns early unless gc metrics are enabled and legacy metrics are
+    // disabled, so set both explicitly.
+    cfg.setProperty(Property.GENERAL_LEGACY_METRICS, "false");
+    cfg.setProperty(Property.GC_METRICS_ENABLED, "true");
+  }
+
+  @Override
   protected int defaultTimeoutSeconds() {
     return 4 * 60;
   }
@@ -55,8 +63,14 @@ public class GcMetricsIT extends AccumuloClusterHarness {
   @Test
   public void gcMetricsPublished() {
 
-    if 
(!cluster.getSiteConfiguration().getBoolean(Property.GC_METRICS_ENABLED)) {
-      log.info("gc metrics are disabled - set GC_METRICS_ENABLED to run test");
+    boolean gcMetricsEnabled =
+        cluster.getSiteConfiguration().getBoolean(Property.GC_METRICS_ENABLED);
+    boolean useLegacyMetrics =
+        
cluster.getSiteConfiguration().getBoolean(Property.GENERAL_LEGACY_METRICS);
+
+    if (!gcMetricsEnabled || useLegacyMetrics) {
+      log.info("gc metrics are disabled with GC_METRICS_ENABLED={}, 
GENERAL_LEGACY_METRICS={}",
+          gcMetricsEnabled, useLegacyMetrics);
       return;
     }
 
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/LegacyMetricsIT.java 
b/test/src/main/java/org/apache/accumulo/test/functional/LegacyMetricsIT.java
new file mode 100644
index 0000000..961c859
--- /dev/null
+++ 
b/test/src/main/java/org/apache/accumulo/test/functional/LegacyMetricsIT.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Basic functional test to verify enabling legacy metrics does not kill 
master.
+ */
+public class LegacyMetricsIT extends AccumuloClusterHarness {
+
+  private static final Logger log = LoggerFactory.getLogger(LegacyMetricsIT.class);
+
+  /**
+   * Enables legacy metrics for the mini cluster used by this test.
+   */
+  @Override
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    cfg.setProperty(Property.GENERAL_LEGACY_METRICS, "true");
+  }
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 4 * 60;
+  }
+
+  /**
+   * Verifies that the master stays usable with legacy metrics enabled: creates a few tables,
+   * waits for balance, reads the system configuration and then deletes the tables. No metric
+   * values are inspected.
+   */
+  @Test
+  public void useMaster() {
+
+    boolean legacyMetricsEnabled =
+        cluster.getSiteConfiguration().getBoolean(Property.GENERAL_LEGACY_METRICS);
+
+    assertTrue(legacyMetricsEnabled);
+
+    List<String> tables = new ArrayList<>();
+
+    // number of tables created during the test.
+    int tableCount = 4;
+    for (int i = 0; i < tableCount; i++) {
+      String uniqueName = getUniqueNames(1)[0] + "_" + i;
+      tables.add(uniqueName);
+      try {
+        getConnector().tableOperations().create(uniqueName);
+      } catch (AccumuloException | AccumuloSecurityException | TableExistsException ex) {
+        log.debug("Failed to create table: {}", uniqueName, ex);
+        fail("failed to create table: " + uniqueName);
+      }
+    }
+
+    // use calls that should need the master
+
+    try {
+
+      getConnector().instanceOperations().waitForBalance();
+
+      Map<String,String> configs = getConnector().instanceOperations().getSystemConfiguration();
+      assertFalse("master config should not be empty", configs.isEmpty());
+
+    } catch (AccumuloException | AccumuloSecurityException ex) {
+      // fail() only carries a message - log the cause so the failure can be diagnosed.
+      log.debug("Could not get config from master", ex);
+      fail("Could not get config from master");
+    }
+
+    // clean-up: delete the tables created above.
+    for (String t : tables) {
+      try {
+        log.debug("Delete test table: {}", t);
+        getConnector().tableOperations().delete(t);
+      } catch (AccumuloSecurityException | TableNotFoundException | AccumuloException ex) {
+        log.debug("Exception thrown deleting table during test clean-up", ex);
+      }
+    }
+  }
+}
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/MasterMetricsIT.java 
b/test/src/main/java/org/apache/accumulo/test/functional/MasterMetricsIT.java
new file mode 100644
index 0000000..367d0e0
--- /dev/null
+++ 
b/test/src/main/java/org/apache/accumulo/test/functional/MasterMetricsIT.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.metrics.MetricsFileTailer;
+import org.apache.accumulo.test.util.SlowOps;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Functional test that uses a hadoop metrics 2 file sink to read published 
metrics for
+ * verification.
+ */
+public class MasterMetricsIT extends AccumuloClusterHarness {
+
+  private static final Logger log = 
LoggerFactory.getLogger(MasterMetricsIT.class);
+
+  private static final int NUM_TAIL_ATTEMPTS = 20;
+  private static final long TAIL_DELAY = 5_000;
+
+  // number of tables / concurrent compactions used during testing.
+  private final int tableCount = 4;
+
+  private long maxWait;
+
+  private static final Set<String> REQUIRED_METRIC_KEYS =
+      new HashSet<>(Arrays.asList("currentFateOps", "totalFateOps", 
"totalZkConnErrors",
+          "FateTxState_NEW", "FateTxState_IN_PROGRESS", 
"FateTxState_FAILED_IN_PROGRESS",
+          "FateTxState_FAILED", "FateTxState_SUCCESSFUL", 
"FateTxState_UNKNOWN"));
+
+  private static final Set<String> OPTIONAL_METRIC_KEYS =
+      new HashSet<>(Collections.singletonList("FateTxOpType_CompactRange"));
+
+  private MetricsFileTailer metricsTail = null;
+
+  @Override
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration 
hadoopCoreSite) {
+    // disable legacy metrics and enable master fate metrics with a 5s minimum update interval.
+    cfg.setProperty(Property.GENERAL_LEGACY_METRICS, "false");
+    cfg.setProperty(Property.MASTER_FATE_METRICS_ENABLED, "true");
+    cfg.setProperty(Property.MASTER_FATE_METRICS_MIN_UPDATE_INTERVAL, "5s");
+  }
+
+  /**
+   * Unless the test is disabled, sets maxWait to half of the default timeout (60_000 when no
+   * positive timeout is defined) and starts a MetricsFileTailer for the master file sink on a
+   * new thread.
+   */
+  @Before
+  public void setup() {
+    if (!testDisabled()) {
+      final int timeoutSeconds = defaultTimeoutSeconds();
+      if (timeoutSeconds <= 0) {
+        maxWait = 60_000;
+      } else {
+        maxWait = (timeoutSeconds * 1000) / 2;
+      }
+
+      metricsTail = new MetricsFileTailer("accumulo.sink.file-master");
+      Thread tailerThread = new Thread(metricsTail);
+      tailerThread.start();
+    }
+  }
+
+  /**
+   * Closes the metrics file tailer when one was created.
+   */
+  @After
+  public void cleanup() {
+    if (metricsTail == null) {
+      return;
+    }
+    metricsTail.close();
+  }
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    // four minutes, in seconds; setup() derives maxWait from half of this value.
+    return 4 * 60;
+  }
+
+  /**
+   * Validates that the expected metrics are published - this excludes the dynamic metrics
+   * derived from operation types. Two consecutive updates are read from the metrics tailer; each
+   * is checked for the expected keys and for internal consistency, then both are passed to
+   * validate.
+   */
+  @Test
+  public void metricsPublished() {
+
+    if (testDisabled()) {
+      log.info("Skipping test - master metrics not enabled.");
+      return;
+    }
+
+    // throw away first update - could be from previous test (possible with cluster
+    // restarts in each test)
+    MetricsFileTailer.LineUpdate firstUpdate =
+        metricsTail.waitForUpdate(-1, NUM_TAIL_ATTEMPTS, TAIL_DELAY);
+
+    // firstUpdate is replaced with the update that follows the discarded one.
+    firstUpdate =
+        metricsTail.waitForUpdate(firstUpdate.getLastUpdate(), 
NUM_TAIL_ATTEMPTS, TAIL_DELAY);
+
+    Map<String,Long> firstSeenMap = parseLine(firstUpdate.getLine());
+
+    log.debug("Line received: {}", firstUpdate.getLine());
+    log.info("Expected metrics count: {}", REQUIRED_METRIC_KEYS.size());
+    log.info("Received metrics count: {},  values:{}", firstSeenMap.size(), 
firstSeenMap);
+
+    assertTrue(lookForExpectedKeys(firstSeenMap));
+    sanity(firstSeenMap);
+
+    // wait for the next update after the one just checked.
+    MetricsFileTailer.LineUpdate nextUpdate =
+        metricsTail.waitForUpdate(firstUpdate.getLastUpdate(), 
NUM_TAIL_ATTEMPTS, TAIL_DELAY);
+
+    Map<String,Long> updateSeenMap = parseLine(nextUpdate.getLine());
+
+    log.debug("Line received:{}", nextUpdate.getLine());
+    log.trace("Mapped values:{}", updateSeenMap);
+
+    assertTrue(lookForExpectedKeys(updateSeenMap));
+    sanity(updateSeenMap);
+
+    validate(firstSeenMap, updateSeenMap);
+  }
+
+  /**
+   * Run a few compactions - this should trigger a dynamic op type to be included in the
+   * metrics. One compact task is started per table; once the metrics report enough current fate
+   * operations, the in-progress and compact-range counts are checked, then the compactions are
+   * cancelled and the tables deleted.
+   */
+  @Test
+  public void compactionMetrics() {
+
+    if (testDisabled()) {
+      log.info("Skipping test - MASTER_FATE_METRICS_ENABLED is not enabled");
+      return;
+    }
+
+    MetricsFileTailer.LineUpdate firstUpdate =
+        metricsTail.waitForUpdate(-1, NUM_TAIL_ATTEMPTS, TAIL_DELAY);
+
+    List<SlowOps> tables = new ArrayList<>();
+
+    // start a compact task on each of tableCount uniquely named tables.
+    for (int i = 0; i < tableCount; i++) {
+      String uniqueName = getUniqueNames(1)[0] + "_" + i;
+      SlowOps gen = new SlowOps(getConnector(), uniqueName, maxWait, 
tableCount);
+      tables.add(gen);
+      gen.startCompactTask();
+    }
+
+    // check file tailer here....
+    MetricsFileTailer.LineUpdate nextUpdate =
+        metricsTail.waitForUpdate(firstUpdate.getLastUpdate(), 
NUM_TAIL_ATTEMPTS, TAIL_DELAY);
+
+    log.info("Received metrics {}", nextUpdate);
+
+    // an empty result means the required number of fate operations was never reported.
+    Map<String,String> results = blockForRequiredTables();
+
+    assertFalse(results.isEmpty());
+    log.info("IN_PROGRESS: {}", results.get("FateTxState_IN_PROGRESS"));
+
+    assertTrue(Long.parseLong(results.get("FateTxState_IN_PROGRESS")) >= 
tableCount);
+    assertTrue(Long.parseLong(results.get("FateTxOpType_CompactRange")) >= 
tableCount);
+
+    // OPTIONAL_METRIC_KEYS holds only FateTxOpType_CompactRange, so this repeats the check above.
+    for (String k : OPTIONAL_METRIC_KEYS) {
+      assertTrue(results.containsKey(k));
+      assertTrue(Long.parseLong(results.get(k)) >= tableCount);
+    }
+
+    // clean-up cancel running compactions
+    for (SlowOps t : tables) {
+      try {
+        getConnector().tableOperations().cancelCompaction(t.getTableName());
+        // block if compaction still running
+        boolean cancelled = t.blockWhileCompactionRunning();
+        if (!cancelled) {
+          log.info("Failed to cancel compaction during multiple compaction 
test clean-up for {}",
+              t.getTableName());
+        }
+      } catch (AccumuloSecurityException | TableNotFoundException | 
AccumuloException ex) {
+        log.debug("Exception thrown during multiple table test clean-up", ex);
+      }
+    }
+
+    for (SlowOps t : tables) {
+      try {
+        log.debug("delete table {}", t.getTableName());
+        getConnector().tableOperations().delete(t.getTableName());
+      } catch (AccumuloSecurityException | AccumuloException | 
TableNotFoundException e) {
+        // clean-up is best effort - exceptions while deleting a table are ignored.
+      }
+    }
+    // wait for one more metrics update after compactions cancelled.
+    MetricsFileTailer.LineUpdate update =
+        metricsTail.waitForUpdate(0L, NUM_TAIL_ATTEMPTS, TAIL_DELAY);
+
+    metricsTail.waitForUpdate(update.getLastUpdate(), NUM_TAIL_ATTEMPTS, 
TAIL_DELAY);
+
+    // the values seen after clean-up are only logged; nothing further is asserted.
+    results = metricsTail.parseLine("");
+
+    log.info("Received metrics {}", results);
+
+  }
+
+  /**
+   * Poll the metrics file tailer until the reported {@code currentFateOps} is at least
+   * {@code tableCount}. Makes up to 20 attempts, sleeping 10 seconds after each unsuccessful one.
+   *
+   * @return the parsed metrics of the update that satisfied the condition, or an empty map if the
+   *         condition was not met within the attempts or the thread was interrupted.
+   */
+  private Map<String,String> blockForRequiredTables() {
+
+    MetricsFileTailer.LineUpdate update =
+        metricsTail.waitForUpdate(0L, NUM_TAIL_ATTEMPTS, TAIL_DELAY);
+
+    for (int i = 0; i < 20; i++) {
+
+      update = metricsTail.waitForUpdate(update.getLastUpdate(), NUM_TAIL_ATTEMPTS, TAIL_DELAY);
+
+      log.info("Received metrics update {}", update);
+
+      Map<String,String> results = metricsTail.parseLine("");
+
+      // the last line may not contain the fate counter - treat a missing value as "not yet"
+      // instead of letting Long.parseLong(null) throw a NumberFormatException.
+      String currentFateOps = results == null ? null : results.get("currentFateOps");
+
+      if (currentFateOps != null && Long.parseLong(currentFateOps) >= tableCount) {
+        log.info("Found required number of fate operations");
+        return results;
+      }
+      try {
+        Thread.sleep(10_000);
+      } catch (InterruptedException iex) {
+        Thread.currentThread().interrupt();
+        return Collections.emptyMap();
+      }
+
+    }
+    return Collections.emptyMap();
+  }
+
+  /**
+   * Check that the metrics reported within a single run cycle are consistent with each other: the
+   * current op count cannot exceed the total, and the sum of the per-state counts
+   * ({@code FateTxState_*}) must be at least the current op count.
+   *
+   * @param values
+   *          map of values from one run cycle.
+   */
+  private void sanity(final Map<String,Long> values) {
+
+    long currentOps = values.get("currentFateOps");
+    long totalOps = values.get("totalFateOps");
+
+    assertTrue(currentOps <= totalOps);
+
+    long stateSum = 0;
+    for (Map.Entry<String,Long> entry : values.entrySet()) {
+      if (entry.getKey().startsWith("FateTxState_")) {
+        stateSum += entry.getValue();
+      }
+    }
+
+    assertTrue(stateSum >= currentOps);
+  }
+
+  /**
+   * Compare the metrics from two different update cycles. The only check made is that the total
+   * number of fate ops reported by the later update is not less than the earlier value.
+   *
+   * @param firstSeen
+   *          map of first metric update
+   * @param nextSeen
+   *          map of a later metric update.
+   */
+  private void validate(Map<String,Long> firstSeen, Map<String,Long> nextSeen) {
+    Long before = firstSeen.get("totalFateOps");
+    Long after = nextSeen.get("totalFateOps");
+
+    log.debug("Total fate ops.  Before:{}, Update:{}", before, after);
+
+    // total fate ops should not decrease.
+    assertTrue(before <= after);
+  }
+
+  /**
+   * The hadoop metrics file sink publishes records as a line with comma separated key=value
+   * pairs. This method parses the line, extracts the key, value pairs for the metrics named in
+   * REQUIRED_METRIC_KEYS and returns them in a sorted map. Tokens that are not in key=value form
+   * are ignored.
+   *
+   * @param line
+   *          a line from the metrics system file sink.
+   * @return a map of the metrics that match REQUIRED_METRIC_KEYS, empty if line is null.
+   * @throws NumberFormatException
+   *           if the value of a required metric is not a valid long.
+   */
+  private Map<String,Long> parseLine(final String line) {
+
+    if (line == null) {
+      return Collections.emptyMap();
+    }
+
+    Map<String,Long> m = new TreeMap<>();
+
+    for (String token : line.split(",")) {
+      // a limit of 2 splits on the first '=' only, the rest of the token is the value.
+      String[] parts = token.trim().split("=", 2);
+      // a token without '=' has no value - skip it rather than fail indexing parts[1].
+      if (parts.length == 2 && REQUIRED_METRIC_KEYS.contains(parts[0])) {
+        m.put(parts[0], Long.parseLong(parts[1].trim()));
+      }
+    }
+    return m;
+  }
+
+  /**
+   * Check that every key in REQUIRED_METRIC_KEYS is present in the received metrics. The first
+   * missing key is logged.
+   *
+   * @param received
+   *          map of parsed metrics.
+   * @return true if all required keys are present, false otherwise.
+   */
+  private boolean lookForExpectedKeys(final Map<String,Long> received) {
+
+    for (String required : REQUIRED_METRIC_KEYS) {
+      if (received.containsKey(required)) {
+        continue;
+      }
+      log.info("Couldn't find {}", required);
+      return false;
+    }
+
+    return true;
+  }
+
+  /**
+   * FATE metrics are only valid when MASTER_FATE_METRICS_ENABLED=true and
+   * GENERAL_LEGACY_METRICS=false. Reads both properties from the cluster site configuration and
+   * logs their values when the combination means the test cannot run.
+   *
+   * @return true if the test should be skipped, false if the test can run.
+   */
+  private boolean testDisabled() {
+
+    boolean fateMetricsEnabled =
+        cluster.getSiteConfiguration().getBoolean(Property.MASTER_FATE_METRICS_ENABLED);
+
+    boolean useLegacyMetrics =
+        cluster.getSiteConfiguration().getBoolean(Property.GENERAL_LEGACY_METRICS);
+
+    boolean canRun = fateMetricsEnabled && !useLegacyMetrics;
+    if (canRun) {
+      return false;
+    }
+
+    log.info("master fate metrics are disabled - MASTER_FATE_METRICS_ENABLED={}, "
+        + "GENERAL_LEGACY_METRICS={}", fateMetricsEnabled, useLegacyMetrics);
+
+    return true;
+  }
+
+}
diff --git 
a/test/src/main/java/org/apache/accumulo/test/metrics/MetricsFileTailer.java 
b/test/src/main/java/org/apache/accumulo/test/metrics/MetricsFileTailer.java
index ea829f4..1929f19 100644
--- a/test/src/main/java/org/apache/accumulo/test/metrics/MetricsFileTailer.java
+++ b/test/src/main/java/org/apache/accumulo/test/metrics/MetricsFileTailer.java
@@ -19,7 +19,11 @@ package org.apache.accumulo.test.metrics;
 import java.io.File;
 import java.io.RandomAccessFile;
 import java.net.URL;
+import java.util.Collections;
 import java.util.Iterator;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
@@ -178,6 +182,49 @@ public class MetricsFileTailer implements Runnable, 
AutoCloseable {
   }
 
   /**
+   * The hadoop metrics file sink publishes records as a line with comma separated key=value
+   * pairs. This method parses the last line read by the tailer and returns the key, value pairs
+   * of the tokens that start with an optional prefix in a sorted map. If the prefix is null or
+   * empty, all keys are accepted. Tokens that are not in key=value form are ignored.
+   *
+   * @param prefix
+   *          optional filter - include metrics that start with provided value.
+   * @return a map of the metrics that start with the prefix, empty if no line is available.
+   */
+  public Map<String,String> parseLine(final String prefix) {
+
+    String line = getLast();
+
+    if (line == null) {
+      return Collections.emptyMap();
+    }
+
+    Map<String,String> m = new TreeMap<>();
+
+    for (String token : line.split(",")) {
+      token = token.trim();
+      if (!filter(prefix, token)) {
+        continue;
+      }
+      // split on the first '=' only so that a value that contains '=' is kept intact.
+      int separator = token.indexOf('=');
+      if (separator < 1) {
+        // not a key=value token (for example an empty line) - nothing to record.
+        continue;
+      }
+      m.put(token.substring(0, separator), token.substring(separator + 1));
+    }
+    return m;
+  }
+
+  /**
+   * Decide if a candidate token is accepted by the prefix filter.
+   *
+   * @param prefix
+   *          the required prefix, a null or empty prefix accepts every non-null candidate.
+   * @param candidate
+   *          the token to test, a null candidate is never accepted.
+   * @return true if the candidate is accepted.
+   */
+  private boolean filter(final String prefix, final String candidate) {
+    boolean acceptAll = prefix == null || prefix.isEmpty();
+    return candidate != null && (acceptAll || candidate.startsWith(prefix));
+  }
+
+  /**
    * A loop that polls for changes and when the file changes, put the last 
line in a buffer that can
    * be retrieved by clients using getLast().
    */
@@ -245,4 +292,54 @@ public class MetricsFileTailer implements Runnable, 
AutoCloseable {
   public void close() {
     running.set(Boolean.FALSE);
   }
+
+  // utilities to block, waiting for update - call from process thread
+
+  /**
+   * Immutable holder that pairs an update marker with a line of text, returned by
+   * waitForUpdate.
+   */
+  public static class LineUpdate {
+    private final long lastUpdate;
+    private final String line;
+
+    public LineUpdate(long lastUpdate, String line) {
+      this.line = line;
+      this.lastUpdate = lastUpdate;
+    }
+
+    /**
+     * @return the update marker supplied at construction.
+     */
+    public long getLastUpdate() {
+      return this.lastUpdate;
+    }
+
+    /**
+     * @return the line supplied at construction, may be null.
+     */
+    public String getLine() {
+      return this.line;
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder("LineUpdate{");
+      sb.append("lastUpdate=").append(lastUpdate);
+      sb.append(", line='").append(line).append('\'');
+      sb.append('}');
+      return sb.toString();
+    }
+  }
+
+  /**
+   * Block the calling thread until a line is available and the update marker differs from
+   * {@code prevUpdate}. The tailer is polled up to {@code maxAttempts} times with a sleep of
+   * {@code delay} milliseconds after each unsuccessful poll.
+   *
+   * @param prevUpdate
+   *          the update marker already seen by the caller.
+   * @param maxAttempts
+   *          the maximum number of polls before giving up.
+   * @param delay
+   *          the time, in milliseconds, to sleep between polls.
+   * @return the line and the update marker that was compared against {@code prevUpdate}.
+   * @throws IllegalStateException
+   *           if no update is seen within maxAttempts or the thread is interrupted while
+   *           sleeping.
+   */
+  public LineUpdate waitForUpdate(final long prevUpdate, final int maxAttempts, final long delay) {
+
+    for (int count = 0; count < maxAttempts; count++) {
+
+      String line = getLast();
+      long currUpdate = getLastUpdate();
+
+      if (line != null && (currUpdate != prevUpdate)) {
+        // return the marker that was compared - reading getLastUpdate() a second time could see
+        // a newer value and the caller would then skip the update when it passes it back in.
+        return new LineUpdate(currUpdate, line);
+      }
+
+      try {
+        Thread.sleep(delay);
+      } catch (InterruptedException ex) {
+        Thread.currentThread().interrupt();
+        throw new IllegalStateException(ex);
+      }
+    }
+    // not found - throw exception.
+    throw new IllegalStateException(
+        String.format("File source update not received after %d tries in %d sec", maxAttempts,
+            TimeUnit.MILLISECONDS.toSeconds(delay * maxAttempts)));
+  }
+
 }

Reply via email to