This is an automated email from the ASF dual-hosted git repository.

edcoleman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/master by this push:
     new 8f867bd  Update gc metrics reporting to use hadoop metrics2 (#1381)
8f867bd is described below

commit 8f867bd642c7558f4183de22bea9945d9e6a4d09
Author: EdColeman <d...@etcoleman.com>
AuthorDate: Tue Oct 8 14:58:44 2019 -0400

    Update gc metrics reporting to use hadoop metrics2 (#1381)
    
    * Update gc metrics reporting to use hadoop metrics2
    
    - Publish current gc metrics to hadoop 2 metrics reporting system.
    - Add gc run counter that increments on each gc cycle run
    - Add metric to track time required for gc post op (compact, flush, none)
    
    * address pull request comments
    
    * address review comments
---
 .../org/apache/accumulo/core/conf/Property.java    |   7 +-
 .../apache/accumulo/server/metrics/Metrics.java    |  14 ++
 .../apache/accumulo/gc/SimpleGarbageCollector.java |  21 ++
 .../apache/accumulo/gc/metrics/GcCycleMetrics.java | 121 ++++++++++
 .../org/apache/accumulo/gc/metrics/GcMetrics.java  | 128 +++++++++++
 .../accumulo/gc/metrics/GcMetricsFactory.java      |  60 +++++
 .../accumulo/test/functional/GcMetricsIT.java      | 220 ++++++++++++++++++
 .../accumulo/test/metrics/MetricsFileTailer.java   | 255 +++++++++++++++++++++
 .../test/metrics/MetricsTestSinkProperties.java    |  28 +++
 .../test/metrics/MetricsFileTailerTest.java        | 106 +++++++++
 .../resources/hadoop-metrics2-accumulo.properties  |  11 +-
 11 files changed, 961 insertions(+), 10 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java 
b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 7be7355..8243134 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -260,7 +260,7 @@ public enum Property {
   MASTER_WALOG_CLOSER_IMPLEMETATION("master.walog.closer.implementation",
       "org.apache.accumulo.server.master.recovery.HadoopLogCloser", 
PropertyType.CLASSNAME,
       "A class that implements a mechanism to steal write access to a 
write-ahead log"),
-  MASTER_FATE_METRICS_ENABLED("master.fate.metrics.enabled", "false", 
PropertyType.BOOLEAN,
+  MASTER_FATE_METRICS_ENABLED("master.fate.metrics.enabled", "true", 
PropertyType.BOOLEAN,
       "Enable reporting of FATE metrics in JMX (and logging with Hadoop 
Metrics2"),
   
MASTER_FATE_METRICS_MIN_UPDATE_INTERVAL("master.fate.metrics.min.update.interval",
 "60s",
       PropertyType.TIMEDURATION, "Limit calls from metric sinks to zookeeper 
to update interval"),
@@ -529,8 +529,9 @@ public enum Property {
       "When the gc runs it can make a lot of changes to the metadata, on 
completion, "
           + " to force the changes to be written to disk, the metadata and 
root tables can be flushed"
           + " and possibly compacted. Legal values are: compact - which both 
flushes and compacts the"
-          + " metadata; flush - which flushes only (compactions may be 
triggered if required); or none."
-          + " Since 2.0, the default is flush. Previously the default action 
was a full compaction."),
+          + " metadata; flush - which flushes only (compactions may be 
triggered if required); or none"),
+  GC_METRICS_ENABLED("gc.metrics.enabled", "true", PropertyType.BOOLEAN,
+      "Enable detailed gc metrics reporting with hadoop metrics."),
 
   // properties that are specific to the monitor server behavior
   MONITOR_PREFIX("monitor.", null, PropertyType.PREFIX,
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/metrics/Metrics.java 
b/server/base/src/main/java/org/apache/accumulo/server/metrics/Metrics.java
index eccaaf5..c1ed9ed 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/metrics/Metrics.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/metrics/Metrics.java
@@ -29,6 +29,20 @@ import org.apache.hadoop.metrics2.source.JvmMetricsInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * Accumulo will search for a file named hadoop-metrics2-accumulo.properties on 
the Accumulo
+ * classpath to configure the hadoop metrics2 system. The hadoop metrics 
system publishes to jmx and
+ * can be configured, via a configuration file, to publish to other metric 
collection systems
+ * (files,...)
+ * <p>
+ * A note on naming: The naming for jmx vs the hadoop metrics systems are 
slightly different. Hadoop
+ * metrics records will start with CONTEXT.RECORD, for example, 
accgc.AccGcCycleMetrics. The context
+ * parameter value is also used by the configuration file for sink 
configuration.
+ * <p>
+ * In JMX, the hierarchy is: 
Hadoop..Accumulo..[jmxName]..[processName]..attributes..[name]
+ * <p>
+ * For jvm metrics, the hierarchy is 
Hadoop..Accumulo..JvmMetrics..attributes..[name]
+ */
 public abstract class Metrics implements MetricsSource {
 
   private static String processName = "Unknown";
diff --git 
a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java 
b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index 62d9f62..16d4a78 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@ -71,6 +71,8 @@ import org.apache.accumulo.core.util.ServerServices.Service;
 import org.apache.accumulo.fate.zookeeper.ZooLock;
 import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason;
 import org.apache.accumulo.fate.zookeeper.ZooLock.LockWatcher;
+import org.apache.accumulo.gc.metrics.GcCycleMetrics;
+import org.apache.accumulo.gc.metrics.GcMetricsFactory;
 import org.apache.accumulo.gc.replication.CloseWriteAheadLogReferences;
 import org.apache.accumulo.server.AbstractServer;
 import org.apache.accumulo.server.ServerConstants;
@@ -117,6 +119,8 @@ public class SimpleGarbageCollector extends AbstractServer 
implements Iface {
   private GCStatus status =
       new GCStatus(new GcCycleStats(), new GcCycleStats(), new GcCycleStats(), 
new GcCycleStats());
 
+  private GcCycleMetrics gcCycleMetrics = new GcCycleMetrics();
+
   public static void main(String[] args) throws Exception {
     try (SimpleGarbageCollector gc = new SimpleGarbageCollector(new 
ServerOpts(), args)) {
       gc.runServer();
@@ -128,6 +132,14 @@ public class SimpleGarbageCollector extends AbstractServer 
implements Iface {
 
     final AccumuloConfiguration conf = getConfiguration();
 
+    boolean gcMetricsRegistered = new GcMetricsFactory(conf).register(this);
+
+    if (gcMetricsRegistered) {
+      log.info("gc metrics modules registered with metrics system");
+    } else {
+      log.info("Failed to register gc metrics module");
+    }
+
     final long gcDelay = conf.getTimeInMillis(Property.GC_CYCLE_DELAY);
     final String useFullCompaction = conf.get(Property.GC_USE_FULL_COMPACTION);
 
@@ -506,6 +518,7 @@ public class SimpleGarbageCollector extends AbstractServer 
implements Iface {
 
             status.current.finished = System.currentTimeMillis();
             status.last = status.current;
+            gcCycleMetrics.setLastCollect(status.current);
             status.current = new GcCycleStats();
 
           } catch (Exception e) {
@@ -534,6 +547,7 @@ public class SimpleGarbageCollector extends AbstractServer 
implements Iface {
                 new GarbageCollectWriteAheadLogs(getContext(), fs, 
liveTServerSet, isUsingTrash());
             log.info("Beginning garbage collection of write-ahead logs");
             walogCollector.collect(status);
+            gcCycleMetrics.setLastWalCollect(status.lastLog);
           } catch (Exception e) {
             log.error("{}", e.getMessage(), e);
           }
@@ -563,6 +577,8 @@ public class SimpleGarbageCollector extends AbstractServer 
implements Iface {
 
           final long actionComplete = System.nanoTime();
 
+          gcCycleMetrics.setPostOpDurationNanos(actionComplete - actionStart);
+
           log.info("gc post action {} completed in {} seconds", action, 
String.format("%.2f",
               (TimeUnit.NANOSECONDS.toMillis(actionComplete - actionStart) / 
1000.0)));
 
@@ -571,6 +587,7 @@ public class SimpleGarbageCollector extends AbstractServer 
implements Iface {
         }
       }
       try {
+        gcCycleMetrics.incrementRunCycleCount();
         long gcDelay = 
getConfiguration().getTimeInMillis(Property.GC_CYCLE_DELAY);
         log.debug("Sleeping for {} milliseconds", gcDelay);
         Thread.sleep(gcDelay);
@@ -696,4 +713,8 @@ public class SimpleGarbageCollector extends AbstractServer 
implements Iface {
   public GCStatus getStatus(TInfo info, TCredentials credentials) {
     return status;
   }
+
+  public GcCycleMetrics getGcCycleMetrics() {
+    return gcCycleMetrics;
+  }
 }
diff --git 
a/server/gc/src/main/java/org/apache/accumulo/gc/metrics/GcCycleMetrics.java 
b/server/gc/src/main/java/org/apache/accumulo/gc/metrics/GcCycleMetrics.java
new file mode 100644
index 0000000..4caf8c3
--- /dev/null
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/metrics/GcCycleMetrics.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.gc.metrics;
+
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.accumulo.core.gc.thrift.GcCycleStats;
+
+/**
+ * Wrapper class for GcCycleStats so that underlying thrift code in 
GcCycleStats is not modified.
+ * Provides Thread safe access to the gc cycle stats for metrics reporting.
+ */
+public class GcCycleMetrics {
+
+  private AtomicReference<GcCycleStats> lastCollect = new 
AtomicReference<>(new GcCycleStats());
+  private AtomicReference<GcCycleStats> lastWalCollect = new 
AtomicReference<>(new GcCycleStats());
+
+  private AtomicLong postOpDurationNanos = new AtomicLong(0);
+  private AtomicLong runCycleCount = new AtomicLong(0);
+
+  public GcCycleMetrics() {}
+
+  /**
+   * Get the last gc run statistics.
+   *
+   * @return the statistics for the last gc run.
+   */
+  GcCycleStats getLastCollect() {
+    return lastCollect.get();
+  }
+
+  /**
+   * Set the last gc run statistics. Makes a defensive deep copy so that if 
the gc implementation
+   * modifies the values, the stored copy is not affected.
+   *
+   * @param lastCollect
+   *          the last gc run statistics.
+   */
+  public void setLastCollect(final GcCycleStats lastCollect) {
+    this.lastCollect.set(new GcCycleStats(lastCollect));
+  }
+
+  /**
+   * The statistics from the last wal collection.
+   *
+   * @return the last wal collection statistics.
+   */
+  GcCycleStats getLastWalCollect() {
+    return lastWalCollect.get();
+  }
+
+  /**
+   * Set the last wal collection statistics
+   *
+   * @param lastWalCollect
+   *          last wal statistics
+   */
+  public void setLastWalCollect(final GcCycleStats lastWalCollect) {
+    this.lastWalCollect.set(new GcCycleStats(lastWalCollect));
+  }
+
+  /**
+   * Duration of post operation (compact, flush, none) in nanoseconds.
+   *
+   * @return duration in nanoseconds.
+   */
+  long getPostOpDurationNanos() {
+    return postOpDurationNanos.get();
+  }
+
+  /**
+   * Set the duration of post operation (compact, flush, none) in nanoseconds.
+   *
+   * @param postOpDurationNanos
+   *          the duration, in nanoseconds.
+   */
+  public void setPostOpDurationNanos(long postOpDurationNanos) {
+    this.postOpDurationNanos.set(postOpDurationNanos);
+  }
+
+  /**
+   * The number of gc cycles that have completed since initialization at 
process start.
+   *
+   * @return current run cycle count.
+   */
+  long getRunCycleCount() {
+    return runCycleCount.get();
+  }
+
+  /**
+   * Increment the gc run cycle count by one.
+   */
+  public void incrementRunCycleCount() {
+    this.runCycleCount.incrementAndGet();
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder("GcMetricsValues{");
+    sb.append("lastCollect=").append(lastCollect.get());
+    sb.append(", lastWalCollect=").append(lastWalCollect.get());
+    sb.append(", postOpDuration=").append(postOpDurationNanos.get());
+    sb.append('}');
+    return sb.toString();
+  }
+}
diff --git 
a/server/gc/src/main/java/org/apache/accumulo/gc/metrics/GcMetrics.java 
b/server/gc/src/main/java/org/apache/accumulo/gc/metrics/GcMetrics.java
new file mode 100644
index 0000000..b1b9020
--- /dev/null
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/metrics/GcMetrics.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.gc.metrics;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.gc.thrift.GcCycleStats;
+import org.apache.accumulo.gc.SimpleGarbageCollector;
+import org.apache.accumulo.server.metrics.Metrics;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
+
+/**
+ * Expected to be instantiated with GcMetricsFactory. This will configure both 
jmx and the hadoop
+ * metrics systems. By naming convention, in hadoop metrics2, the records 
will appear as
+ * CONTEXT.RECORD (accgc.AccGcCycleMetrics). The value for context is also 
used by the configuration
+ * file for sink configuration.
+ */
+public class GcMetrics extends Metrics {
+
+  // use common prefix, different than just gc, to prevent confusion with jvm 
gc metrics.
+  public static final String GC_METRIC_PREFIX = "AccGc";
+
+  private static final String jmxName = "GarbageCollector";
+  private static final String description = "Accumulo garbage collection 
metrics";
+  private static final String record = "AccGcCycleMetrics";
+
+  private final SimpleGarbageCollector gc;
+
+  // metrics gauges / counters.
+  private final MutableGaugeLong gcStarted;
+  private final MutableGaugeLong gcFinished;
+  private final MutableGaugeLong gcCandidates;
+  private final MutableGaugeLong gcInUse;
+  private final MutableGaugeLong gcDeleted;
+  private final MutableGaugeLong gcErrors;
+
+  private final MutableGaugeLong walStarted;
+  private final MutableGaugeLong walFinished;
+  private final MutableGaugeLong walCandidates;
+  private final MutableGaugeLong walInUse;
+  private final MutableGaugeLong walDeleted;
+  private final MutableGaugeLong walErrors;
+
+  private final MutableGaugeLong postOpDuration;
+  private final MutableGaugeLong runCycleCount;
+
+  GcMetrics(final SimpleGarbageCollector gc) {
+    super(jmxName + ",sub=" + gc.getClass().getSimpleName(), description, 
"accgc", record);
+    this.gc = gc;
+
+    MetricsRegistry registry = super.getRegistry();
+
+    gcStarted = registry.newGauge(GC_METRIC_PREFIX + "Started",
+        "Timestamp GC file collection cycle started", 0L);
+    gcFinished = registry.newGauge(GC_METRIC_PREFIX + "Finished",
+        "Timestamp GC file collect cycle finished", 0L);
+    gcCandidates = registry.newGauge(GC_METRIC_PREFIX + "Candidates",
+        "Number of files that are candidates for deletion", 0L);
+    gcInUse =
+        registry.newGauge(GC_METRIC_PREFIX + "InUse", "Number of candidate 
files still in use", 0L);
+    gcDeleted =
+        registry.newGauge(GC_METRIC_PREFIX + "Deleted", "Number of candidate 
files deleted", 0L);
+    gcErrors =
+        registry.newGauge(GC_METRIC_PREFIX + "Errors", "Number of candidate 
deletion errors", 0L);
+
+    walStarted = registry.newGauge(GC_METRIC_PREFIX + "WalStarted",
+        "Timestamp GC WAL collection started", 0L);
+    walFinished = registry.newGauge(GC_METRIC_PREFIX + "WalFinished",
+        "Timestamp GC WAL collection finished", 0L);
+    walCandidates = registry.newGauge(GC_METRIC_PREFIX + "WalCandidates",
+        "Number of files that are candidates for deletion", 0L);
+    walInUse = registry.newGauge(GC_METRIC_PREFIX + "WalInUse",
+        "Number of wal file candidates that are still in use", 0L);
+    walDeleted = registry.newGauge(GC_METRIC_PREFIX + "WalDeleted",
+        "Number of candidate wal files deleted", 0L);
+    walErrors = registry.newGauge(GC_METRIC_PREFIX + "WalErrors",
+        "Number candidate wal file deletion errors", 0L);
+
+    postOpDuration = registry.newGauge(GC_METRIC_PREFIX + "PostOpDuration",
+        "GC metadata table post operation duration in milliseconds", 0L);
+
+    runCycleCount = registry.newGauge(GC_METRIC_PREFIX + "RunCycleCount",
+        "gauge incremented each gc cycle run, rest on process start", 0L);
+
+  }
+
+  @Override
+  protected void prepareMetrics() {
+
+    GcCycleMetrics values = gc.getGcCycleMetrics();
+
+    GcCycleStats lastFileCollect = values.getLastCollect();
+
+    gcStarted.set(lastFileCollect.getStarted());
+    gcFinished.set(lastFileCollect.getFinished());
+    gcCandidates.set(lastFileCollect.getCandidates());
+    gcInUse.set(lastFileCollect.getInUse());
+    gcDeleted.set(lastFileCollect.getDeleted());
+    gcErrors.set(lastFileCollect.getErrors());
+
+    GcCycleStats lastWalCollect = values.getLastWalCollect();
+
+    walStarted.set(lastWalCollect.getStarted());
+    walFinished.set(lastWalCollect.getFinished());
+    walCandidates.set(lastWalCollect.getCandidates());
+    walInUse.set(lastWalCollect.getInUse());
+    walDeleted.set(lastWalCollect.getDeleted());
+    walErrors.set(lastWalCollect.getErrors());
+
+    
postOpDuration.set(TimeUnit.NANOSECONDS.toMillis(values.getPostOpDurationNanos()));
+    runCycleCount.set(values.getRunCycleCount());
+  }
+}
diff --git 
a/server/gc/src/main/java/org/apache/accumulo/gc/metrics/GcMetricsFactory.java 
b/server/gc/src/main/java/org/apache/accumulo/gc/metrics/GcMetricsFactory.java
new file mode 100644
index 0000000..15a8ad3
--- /dev/null
+++ 
b/server/gc/src/main/java/org/apache/accumulo/gc/metrics/GcMetricsFactory.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.gc.metrics;
+
+import static java.util.Objects.requireNonNull;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.gc.SimpleGarbageCollector;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GcMetricsFactory {
+
+  private final static Logger log = 
LoggerFactory.getLogger(GcMetricsFactory.class);
+
+  private boolean enableMetrics;
+
+  public GcMetricsFactory(AccumuloConfiguration conf) {
+    requireNonNull(conf, "AccumuloConfiguration must not be null");
+    enableMetrics = conf.getBoolean(Property.GC_METRICS_ENABLED);
+  }
+
+  public boolean register(SimpleGarbageCollector gc) {
+
+    if (!enableMetrics) {
+      log.info("Accumulo gc metrics are disabled.  To enable, set {} in 
configuration",
+          Property.GC_METRICS_ENABLED);
+      return false;
+    }
+
+    try {
+
+      MetricsSystem metricsSystem = gc.getMetricsSystem();
+
+      new GcMetrics(gc).register(metricsSystem);
+
+      return true;
+
+    } catch (Exception ex) {
+      log.error("Failed to register accumulo gc metrics", ex);
+      return false;
+    }
+  }
+}
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/GcMetricsIT.java 
b/test/src/main/java/org/apache/accumulo/test/functional/GcMetricsIT.java
new file mode 100644
index 0000000..eab1956
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/GcMetricsIT.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.gc.metrics.GcMetrics;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.test.metrics.MetricsFileTailer;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Functional test that uses a hadoop metrics 2 file sink to read published 
metrics for
+ * verification.
+ */
+public class GcMetricsIT extends AccumuloClusterHarness {
+
+  private static final Logger log = LoggerFactory.getLogger(GcMetricsIT.class);
+
+  private AccumuloClient accumuloClient;
+
+  private static final int NUM_TAIL_ATTEMPTS = 20;
+  private static final long TAIL_DELAY = 5_000;
+
+  private static final String[] EXPECTED_METRIC_KEYS = new String[] 
{"AccGcCandidates",
+      "AccGcDeleted", "AccGcErrors", "AccGcFinished", "AccGcInUse", 
"AccGcPostOpDuration",
+      "AccGcRunCycleCount", "AccGcStarted", "AccGcWalCandidates", 
"AccGcWalDeleted",
+      "AccGcWalErrors", "AccGcWalFinished", "AccGcWalInUse", 
"AccGcWalStarted"};
+
+  @Before
+  public void setup() {
+    accumuloClient = Accumulo.newClient().from(getClientProps()).build();
+  }
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 4 * 60;
+  }
+
+  @Test
+  public void gcMetricsPublished() {
+
+    log.trace("Client started, properties:{}", accumuloClient.properties());
+
+    MetricsFileTailer gcTail = new MetricsFileTailer("accumulo.sink.file-gc");
+    Thread t1 = new Thread(gcTail);
+    t1.start();
+
+    try {
+
+      long testStart = System.currentTimeMillis();
+
+      LineUpdate firstUpdate = waitForUpdate(-1, gcTail);
+
+      Map<String,Long> firstSeenMap = parseLine(firstUpdate.getLine());
+
+      log.trace("L:{}", firstUpdate.getLine());
+      log.trace("M:{}", firstSeenMap);
+
+      assertTrue(lookForExpectedKeys(firstSeenMap));
+      sanity(testStart, firstSeenMap);
+
+      LineUpdate nextUpdate = waitForUpdate(firstUpdate.getLastUpdate(), 
gcTail);
+
+      Map<String,Long> updateSeenMap = parseLine(nextUpdate.getLine());
+
+      log.debug("Line received:{}", nextUpdate.getLine());
+      log.trace("Mapped values:{}", updateSeenMap);
+
+      assertTrue(lookForExpectedKeys(updateSeenMap));
+      sanity(testStart, updateSeenMap);
+
+      validate(firstSeenMap, updateSeenMap);
+
+    } catch (Exception ex) {
+      log.debug("reads", ex);
+    }
+  }
+
+  /**
+   * Validate metrics for consistency within a run cycle.
+   *
+   * @param values
+   *          map of values from one run cycle.
+   */
+  private void sanity(final long testStart, final Map<String,Long> values) {
+
+    long start = values.get("AccGcStarted");
+    long finished = values.get("AccGcFinished");
+    assertTrue(start >= testStart);
+    assertTrue(finished >= start);
+
+    start = values.get("AccGcWalStarted");
+    finished = values.get("AccGcWalFinished");
+    assertTrue(start >= testStart);
+    assertTrue(finished >= start);
+
+  }
+
+  /**
+   * A series of sanity checks for the metrics between different update 
cycles, some values should
+   * be at least different, and some of the checks can include ordering.
+   *
+   * @param firstSeen
+   *          map of first metric update
+   * @param nextSeen
+   *          map of a later metric update.
+   */
+  private void validate(Map<String,Long> firstSeen, Map<String,Long> nextSeen) 
{
+    assertTrue(nextSeen.get("AccGcStarted") > firstSeen.get("AccGcStarted"));
+    assertTrue(nextSeen.get("AccGcFinished") > 
firstSeen.get("AccGcWalStarted"));
+    assertTrue(nextSeen.get("AccGcRunCycleCount") > 
firstSeen.get("AccGcRunCycleCount"));
+  }
+
+  /**
+   * The hadoop metrics file sink publishes records as a line with comma 
separated key=value pairs.
+   * This method parses the line and extracts the key, value pair from metrics 
that start with AccGc
+   * and returns them in a sorted map.
+   *
+   * @param line
+   *          a line from the metrics system file sink.
+   * @return a map of the metrics that start with AccGc
+   */
+  private Map<String,Long> parseLine(final String line) {
+
+    if (line == null) {
+      return Collections.emptyMap();
+    }
+
+    Map<String,Long> m = new TreeMap<>();
+
+    String[] csvTokens = line.split(",");
+
+    for (String token : csvTokens) {
+      token = token.trim();
+      if (token.startsWith(GcMetrics.GC_METRIC_PREFIX)) {
+        String[] parts = token.split("=");
+        m.put(parts[0], Long.parseLong(parts[1]));
+      }
+    }
+    return m;
+  }
+
+  private static class LineUpdate {
+    private final long lastUpdate;
+    private final String line;
+
+    LineUpdate(long lastUpdate, String line) {
+      this.lastUpdate = lastUpdate;
+      this.line = line;
+    }
+
+    long getLastUpdate() {
+      return lastUpdate;
+    }
+
+    String getLine() {
+      return line;
+    }
+  }
+
+  private LineUpdate waitForUpdate(final long prevUpdate, final 
MetricsFileTailer tail) {
+
+    for (int count = 0; count < NUM_TAIL_ATTEMPTS; count++) {
+
+      String line = tail.getLast();
+      long currUpdate = tail.getLastUpdate();
+
+      if (line != null && (currUpdate != prevUpdate)) {
+        return new LineUpdate(tail.getLastUpdate(), line);
+      }
+
+      try {
+        Thread.sleep(TAIL_DELAY);
+      } catch (InterruptedException ex) {
+        Thread.currentThread().interrupt();
+        throw new IllegalStateException(ex);
+      }
+    }
+    // not found - throw exception.
+    throw new IllegalStateException(
+        String.format("File source update not received after %d tries in %d 
sec", NUM_TAIL_ATTEMPTS,
+            TimeUnit.MILLISECONDS.toSeconds(TAIL_DELAY * NUM_TAIL_ATTEMPTS)));
+  }
+
+  private boolean lookForExpectedKeys(final Map<String,Long> received) {
+
+    for (String e : EXPECTED_METRIC_KEYS) {
+      if (!received.containsKey(e)) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+}
diff --git 
a/test/src/main/java/org/apache/accumulo/test/metrics/MetricsFileTailer.java 
b/test/src/main/java/org/apache/accumulo/test/metrics/MetricsFileTailer.java
new file mode 100644
index 0000000..7236e06
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/metrics/MetricsFileTailer.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.metrics;
+
+import java.io.File;
+import java.io.RandomAccessFile;
+import java.net.URL;
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.commons.configuration2.Configuration;
+import org.apache.commons.configuration2.FileBasedConfiguration;
+import org.apache.commons.configuration2.PropertiesConfiguration;
+import org.apache.commons.configuration2.builder.FileBasedConfigurationBuilder;
+import org.apache.commons.configuration2.builder.fluent.Parameters;
+import org.apache.commons.configuration2.ex.ConfigurationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class allows testing of the publishing to the hadoop metrics system by processing a file for
+ * metric records (written as a line.) The file should be configured using the hadoop metrics
+ * properties as a file based sink with the prefix that is provided on instantiation of the
+ * instance.
+ *
+ * This class will simulate tail-ing a file and is intended to be run in a separate thread. When the
+ * underlying file has data written, the value returned by getLastUpdate will change, and the last
+ * line can be retrieved with getLast().
+ */
+public class MetricsFileTailer implements Runnable, AutoCloseable {
+
+  private static final Logger log = 
LoggerFactory.getLogger(MetricsFileTailer.class);
+
+  private static final int BUFFER_SIZE = 4;
+
+  private final String metricsPrefix;
+
+  private Lock lock = new ReentrantLock();
+  private AtomicBoolean running = new AtomicBoolean(Boolean.TRUE);
+
+  private AtomicLong lastUpdate = new AtomicLong(0);
+  private long startTime = System.nanoTime();
+
+  private int lineCounter = 0;
+  private String[] lineBuffer = new String[BUFFER_SIZE];
+
+  private final String metricsFilename;
+
+  /**
+   * Create an instance that will tail a metrics file. The filename / path is 
determined by the
+   * hadoop-metrics-accumulo.properties sink configuration for the metrics 
prefix that is provided.
+   *
+   * @param metricsPrefix
+   *          the prefix in the metrics configuration.
+   */
+  public MetricsFileTailer(final String metricsPrefix) {
+
+    this.metricsPrefix = metricsPrefix;
+
+    Configuration sub = loadMetricsConfig();
+
+    // dump received configuration keys received.
+    if (log.isTraceEnabled()) {
+      Iterator<String> keys = sub.getKeys();
+      while (keys.hasNext()) {
+        log.trace("configuration key:{}", keys.next());
+      }
+    }
+
+    if (sub.containsKey("filename")) {
+      metricsFilename = sub.getString("filename");
+    } else {
+      metricsFilename = "";
+    }
+
+  }
+
+  /**
+   * Create an instance by specifying a file directly instead of using the 
metrics configuration -
+   * mainly for testing.
+   *
+   * @param metricsPrefix
+   *          generally can be ignored.
+   * @param filename
+   *          the path / file to be monitored.
+   */
+  MetricsFileTailer(final String metricsPrefix, final String filename) {
+    this.metricsPrefix = metricsPrefix;
+    metricsFilename = filename;
+  }
+
+  /**
+   * Look for the accumulo metrics configuration file on the classpath and 
return the subset for the
+   * http sink.
+   *
+   * @return a configuration with http sink properties.
+   */
+  private Configuration loadMetricsConfig() {
+    try {
+
+      final URL propUrl =
+          
getClass().getClassLoader().getResource(MetricsTestSinkProperties.METRICS_PROP_FILENAME);
+
+      if (propUrl == null) {
+        throw new IllegalStateException(
+            "Could not find " + 
MetricsTestSinkProperties.METRICS_PROP_FILENAME + " on classpath");
+      }
+
+      String filename = propUrl.getFile();
+
+      Parameters params = new Parameters();
+      // Read data from this file
+      File propertiesFile = new File(filename);
+
+      FileBasedConfigurationBuilder<FileBasedConfiguration> builder =
+          new 
FileBasedConfigurationBuilder<FileBasedConfiguration>(PropertiesConfiguration.class)
+              .configure(params.fileBased().setFile(propertiesFile));
+
+      Configuration config = builder.getConfiguration();
+
+      final Configuration sub = config.subset(metricsPrefix);
+
+      if (log.isTraceEnabled()) {
+        log.trace("Config {}", config);
+        Iterator<String> iterator = sub.getKeys();
+        while (iterator.hasNext()) {
+          String key = iterator.next();
+          log.trace("'{}\'=\'{}\'", key, sub.getProperty(key));
+        }
+      }
+
+      return sub;
+
+    } catch (ConfigurationException ex) {
+      throw new IllegalStateException(
+          String.format("Could not find configuration file \'%s\' on 
classpath",
+              MetricsTestSinkProperties.METRICS_PROP_FILENAME));
+    }
+  }
+
+  /**
+   * Creates a marker value that changes each time a new line is detected. 
Clients can use this to
+   * determine if a call to getLast() will return a new value.
+   *
+   * @return a marker value set when a line is available.
+   */
+  public long getLastUpdate() {
+    return lastUpdate.get();
+  }
+
+  /**
+   * Get the last line seen in the file.
+   *
+   * @return the last line from the file.
+   */
+  public String getLast() {
+    lock.lock();
+    try {
+
+      int last = (lineCounter % BUFFER_SIZE) - 1;
+      if (last < 0) {
+        last = BUFFER_SIZE - 1;
+      }
+      return lineBuffer[last];
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * A loop that polls for changes and when the file changes, put the last 
line in a buffer that can
+   * be retrieved by clients using getLast().
+   */
+  @Override
+  public void run() {
+
+    long filePos = 0;
+
+    File f = new File(metricsFilename);
+
+    while (running.get()) {
+
+      try {
+        Thread.sleep(5_000);
+      } catch (InterruptedException ex) {
+        running.set(Boolean.FALSE);
+        Thread.currentThread().interrupt();
+        return;
+      }
+
+      long len = f.length();
+
+      try {
+
+        // file truncated? reset position
+        if (len < filePos) {
+          filePos = 0;
+          lock.lock();
+          try {
+            for (int i = 0; i < BUFFER_SIZE; i++) {
+              lineBuffer[i] = "";
+            }
+            lineCounter = 0;
+          } finally {
+            lock.unlock();
+          }
+        }
+
+        if (len > filePos) {
+          // File must have had something added to it!
+          RandomAccessFile raf = new RandomAccessFile(f, "r");
+          raf.seek(filePos);
+          String line;
+          lock.lock();
+          try {
+            while ((line = raf.readLine()) != null) {
+              lineBuffer[lineCounter++ % BUFFER_SIZE] = line;
+            }
+
+            lastUpdate.set(System.nanoTime() - startTime);
+
+          } finally {
+            lock.unlock();
+          }
+          filePos = raf.getFilePointer();
+          raf.close();
+        }
+      } catch (Exception ex) {
+        log.info("Error processing metrics file {}", metricsFilename, ex);
+      }
+    }
+  }
+
+  @Override
+  public void close() {
+    running.set(Boolean.FALSE);
+  }
+}
diff --git 
a/test/src/main/java/org/apache/accumulo/test/metrics/MetricsTestSinkProperties.java
 
b/test/src/main/java/org/apache/accumulo/test/metrics/MetricsTestSinkProperties.java
new file mode 100644
index 0000000..5317755
--- /dev/null
+++ 
b/test/src/main/java/org/apache/accumulo/test/metrics/MetricsTestSinkProperties.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.metrics;
+
+/**
+ * common properties used with metrics configuration.
+ */
+public class MetricsTestSinkProperties {
+
+  public static final String METRICS_PROP_FILENAME = 
"hadoop-metrics2-accumulo.properties";
+  public static final String ACC_GC_SINK_PREFIX = "accumulo.sink.file-gc";
+  public static final String ACC_MASTER_SINK_PREFIX = 
"accumulo.sink.file-master";
+
+}
diff --git 
a/test/src/test/java/org/apache/accumulo/test/metrics/MetricsFileTailerTest.java
 
b/test/src/test/java/org/apache/accumulo/test/metrics/MetricsFileTailerTest.java
new file mode 100644
index 0000000..59359a2
--- /dev/null
+++ 
b/test/src/test/java/org/apache/accumulo/test/metrics/MetricsFileTailerTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.metrics;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+
+import org.junit.AfterClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Exercises MetricsFileTailer against a plain file: lines are appended to the file and the test
+ * polls the tailer until it reports the last line written.
+ */
+public class MetricsFileTailerTest {
+
+  private static final Logger log = LoggerFactory.getLogger(MetricsFileTailerTest.class);
+
+  private static final String TEST_OUTFILE_NAME = "/tmp/testfile.txt";
+  private static final String SUCCESS = "success";
+
+  // how many times the tailer is checked for an update, and the pause between checks.
+  private static final int MAX_POLL_ATTEMPTS = 5;
+  private static final long POLL_DELAY_MILLIS = 5_000;
+
+  /**
+   * Remove the test file, if present. Failure to delete is logged and otherwise ignored.
+   */
+  @AfterClass
+  public static void cleanup() {
+    try {
+      Files.deleteIfExists(FileSystems.getDefault().getPath(TEST_OUTFILE_NAME));
+    } catch (IOException ex) {
+      log.trace("Failed to clean-up test file {}", TEST_OUTFILE_NAME, ex);
+    }
+  }
+
+  /**
+   * Create a file tailer and then write some lines and validate the tailer returns the last line.
+   */
+  @Test
+  public void fileUpdates() {
+
+    boolean passed = false;
+
+    // try-with-resources tells the tailer to stop even if writing to the file or polling fails.
+    try (MetricsFileTailer tailer = new MetricsFileTailer("foo", TEST_OUTFILE_NAME)) {
+
+      Thread t = new Thread(tailer);
+      t.start();
+
+      long lastUpdate = tailer.getLastUpdate();
+
+      writeToFile();
+
+      int count = 0;
+      while (count++ < MAX_POLL_ATTEMPTS) {
+        long currUpdate = tailer.getLastUpdate();
+        if (lastUpdate != currUpdate) {
+          lastUpdate = currUpdate;
+          String last = tailer.getLast();
+          log.trace("{} - {}", currUpdate, last);
+          // equals tolerates a null line, compareTo would throw.
+          if (SUCCESS.equals(last)) {
+            passed = true;
+            break;
+          }
+        } else {
+          log.trace("no change");
+        }
+        try {
+          Thread.sleep(POLL_DELAY_MILLIS);
+        } catch (InterruptedException ex) {
+          // restore the flag and stop polling - further sleeps would fail immediately.
+          Thread.currentThread().interrupt();
+          break;
+        }
+      }
+    }
+
+    assertTrue(passed);
+  }
+
+  /**
+   * Simulate write record(s) to the file. Lines are appended to any existing content.
+   *
+   * @throws IllegalStateException
+   *           if the file cannot be written.
+   */
+  private void writeToFile() {
+    try (FileWriter writer = new FileWriter(TEST_OUTFILE_NAME, true);
+        PrintWriter printWriter = new PrintWriter(writer)) {
+      printWriter.println("foo");
+      // needs to be last line for test to pass
+      printWriter.println(SUCCESS);
+      printWriter.flush();
+    } catch (IOException ex) {
+      throw new IllegalStateException("failed to write data to test file", ex);
+    }
+  }
+}
diff --git a/test/src/test/resources/hadoop-metrics2-accumulo.properties 
b/test/src/test/resources/hadoop-metrics2-accumulo.properties
index e2eb761..e869144 100644
--- a/test/src/test/resources/hadoop-metrics2-accumulo.properties
+++ b/test/src/test/resources/hadoop-metrics2-accumulo.properties
@@ -31,13 +31,10 @@
 accumulo.sink.file-all.class=org.apache.hadoop.metrics2.sink.FileSink
 accumulo.sink.file-all.filename=./target/it.all.metrics
 
-accumulo.sink.test-sink.class=org.apache.accumulo.test.functional.util.Metrics2TestSink
-accumulo.sink.test-sink.context=master
-accumulo.sink.test-sink.filename=test.metrics
-accumulo.sink.test-sink.period=7
-
-# accumulo.sink.test-sink.context=*
-# accumulo.sink.test-sink.period=5
+accumulo.sink.file-gc.class=org.apache.hadoop.metrics2.sink.FileSink
+accumulo.sink.file-gc.context=accgc
+accumulo.sink.file-gc.filename=./target/accgc.metrics
+accumulo.sink.file-gc.period=5
 
 # File sink for tserver metrics
 # accumulo.sink.file-tserver.class=org.apache.hadoop.metrics2.sink.FileSink

Reply via email to