http://git-wip-us.apache.org/repos/asf/hive/blob/bbb312f3/testutils/ptest2/src/test/resources/HIVE-10761.6.patch
----------------------------------------------------------------------
diff --git a/testutils/ptest2/src/test/resources/HIVE-10761.6.patch b/testutils/ptest2/src/test/resources/HIVE-10761.6.patch
new file mode 100644
index 0000000..5b41850
--- /dev/null
+++ b/testutils/ptest2/src/test/resources/HIVE-10761.6.patch
@@ -0,0 +1,2539 @@
+diff --git a/common/pom.xml b/common/pom.xml
+index a615c1e..8d4b1ea 100644
+--- a/common/pom.xml
++++ b/common/pom.xml
+@@ -98,6 +98,26 @@
+       <artifactId>json</artifactId>
+       <version>${json.version}</version>
+     </dependency>
++    <dependency>
++      <groupId>io.dropwizard.metrics</groupId>
++      <artifactId>metrics-core</artifactId>
++      <version>${dropwizard.version}</version>
++    </dependency>
++    <dependency>
++      <groupId>io.dropwizard.metrics</groupId>
++      <artifactId>metrics-jvm</artifactId>
++      <version>${dropwizard.version}</version>
++    </dependency>
++    <dependency>
++      <groupId>io.dropwizard.metrics</groupId>
++      <artifactId>metrics-json</artifactId>
++      <version>${dropwizard.version}</version>
++    </dependency>
++    <dependency>
++      <groupId>com.fasterxml.jackson.core</groupId>
++      <artifactId>jackson-databind</artifactId>
++      <version>${jackson.new.version}</version>
++    </dependency>
+   </dependencies>
+ 
+   <profiles>
+diff --git a/common/src/java/org/apache/hadoop/hive/common/JvmPauseMonitor.java b/common/src/java/org/apache/hadoop/hive/common/JvmPauseMonitor.java
+new file mode 100644
+index 0000000..c3949f2
+--- /dev/null
++++ b/common/src/java/org/apache/hadoop/hive/common/JvmPauseMonitor.java
+@@ -0,0 +1,225 @@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.hadoop.hive.common;
++
++import com.google.common.base.Joiner;
++import com.google.common.base.Preconditions;
++import com.google.common.base.Stopwatch;
++import com.google.common.collect.Lists;
++import com.google.common.collect.Maps;
++import com.google.common.collect.Sets;
++import org.apache.commons.logging.Log;
++import org.apache.commons.logging.LogFactory;
++import org.apache.hadoop.conf.Configuration;
++import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
++import org.apache.hadoop.util.Daemon;
++
++import java.lang.management.GarbageCollectorMXBean;
++import java.lang.management.ManagementFactory;
++import java.util.List;
++import java.util.Map;
++import java.util.Set;
++
++/**
++ * Based on the JvmPauseMonitor from Hadoop.
++ */
++public class JvmPauseMonitor {
++  private static final Log LOG = LogFactory.getLog(
++    JvmPauseMonitor.class);
++
++  /** The target sleep time */
++  private static final long SLEEP_INTERVAL_MS = 500;
++
++  /** log WARN if we detect a pause longer than this threshold */
++  private final long warnThresholdMs;
++  private static final String WARN_THRESHOLD_KEY =
++    "jvm.pause.warn-threshold.ms";
++  private static final long WARN_THRESHOLD_DEFAULT = 10000;
++
++  /** log INFO if we detect a pause longer than this threshold */
++  private final long infoThresholdMs;
++  private static final String INFO_THRESHOLD_KEY =
++    "jvm.pause.info-threshold.ms";
++  private static final long INFO_THRESHOLD_DEFAULT = 1000;
++
++  private long numGcWarnThresholdExceeded = 0;
++  private long numGcInfoThresholdExceeded = 0;
++  private long totalGcExtraSleepTime = 0;
++
++  private Thread monitorThread;
++  private volatile boolean shouldRun = true;
++
++  public JvmPauseMonitor(Configuration conf) {
++    this.warnThresholdMs = conf.getLong(WARN_THRESHOLD_KEY, WARN_THRESHOLD_DEFAULT);
++    this.infoThresholdMs = conf.getLong(INFO_THRESHOLD_KEY, INFO_THRESHOLD_DEFAULT);
++  }
++
++  public void start() {
++    Preconditions.checkState(monitorThread == null,
++      "JvmPauseMonitor thread is Already started");
++    monitorThread = new Daemon(new Monitor());
++    monitorThread.start();
++  }
++
++  public void stop() {
++    shouldRun = false;
++    if (isStarted()) {
++      monitorThread.interrupt();
++      try {
++        monitorThread.join();
++      } catch (InterruptedException e) {
++        Thread.currentThread().interrupt();
++      }
++    }
++  }
++
++  public boolean isStarted() {
++    return monitorThread != null;
++  }
++
++  public long getNumGcWarnThresholdExceeded() {
++    return numGcWarnThresholdExceeded;
++  }
++
++  public long getNumGcInfoThresholdExceeded() {
++    return numGcInfoThresholdExceeded;
++  }
++
++  public long getTotalGcExtraSleepTime() {
++    return totalGcExtraSleepTime;
++  }
++
++  private String formatMessage(long extraSleepTime,
++    Map<String, GcTimes> gcTimesAfterSleep,
++    Map<String, GcTimes> gcTimesBeforeSleep) {
++
++    Set<String> gcBeanNames = Sets.intersection(
++      gcTimesAfterSleep.keySet(),
++      gcTimesBeforeSleep.keySet());
++    List<String> gcDiffs = Lists.newArrayList();
++    for (String name : gcBeanNames) {
++      GcTimes diff = gcTimesAfterSleep.get(name).subtract(
++        gcTimesBeforeSleep.get(name));
++      if (diff.gcCount != 0) {
++        gcDiffs.add("GC pool '" + name + "' had collection(s): " +
++          diff.toString());
++      }
++    }
++
++    String ret = "Detected pause in JVM or host machine (eg GC): " +
++      "pause of approximately " + extraSleepTime + "ms\n";
++    if (gcDiffs.isEmpty()) {
++      ret += "No GCs detected";
++    } else {
++      ret += Joiner.on("\n").join(gcDiffs);
++    }
++    return ret;
++  }
++
++  private Map<String, GcTimes> getGcTimes() {
++    Map<String, GcTimes> map = Maps.newHashMap();
++    List<GarbageCollectorMXBean> gcBeans =
++      ManagementFactory.getGarbageCollectorMXBeans();
++    for (GarbageCollectorMXBean gcBean : gcBeans) {
++      map.put(gcBean.getName(), new GcTimes(gcBean));
++    }
++    return map;
++  }
++
++  private static class GcTimes {
++    private GcTimes(GarbageCollectorMXBean gcBean) {
++      gcCount = gcBean.getCollectionCount();
++      gcTimeMillis = gcBean.getCollectionTime();
++    }
++
++    private GcTimes(long count, long time) {
++      this.gcCount = count;
++      this.gcTimeMillis = time;
++    }
++
++    private GcTimes subtract(GcTimes other) {
++      return new GcTimes(this.gcCount - other.gcCount,
++        this.gcTimeMillis - other.gcTimeMillis);
++    }
++
++    @Override
++    public String toString() {
++      return "count=" + gcCount + " time=" + gcTimeMillis + "ms";
++    }
++
++    private long gcCount;
++    private long gcTimeMillis;
++  }
++
++  private class Monitor implements Runnable {
++    @Override
++    public void run() {
++      Stopwatch sw = new Stopwatch();
++      Map<String, GcTimes> gcTimesBeforeSleep = getGcTimes();
++      while (shouldRun) {
++        sw.reset().start();
++        try {
++          Thread.sleep(SLEEP_INTERVAL_MS);
++        } catch (InterruptedException ie) {
++          return;
++        }
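++        // Any sleep beyond the requested interval approximates a
++        // stop-the-world pause in the JVM or on the host (e.g. GC).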
++        long extraSleepTime = sw.elapsedMillis() - SLEEP_INTERVAL_MS;
++        Map<String, GcTimes> gcTimesAfterSleep = getGcTimes();
++
++        if (extraSleepTime > warnThresholdMs) {
++          ++numGcWarnThresholdExceeded;
++          LOG.warn(formatMessage(
++            extraSleepTime, gcTimesAfterSleep, gcTimesBeforeSleep));
++          incrementMetricsCounter("jvm.pause.warn-threshold", 1);
++        } else if (extraSleepTime > infoThresholdMs) {
++          ++numGcInfoThresholdExceeded;
++          LOG.info(formatMessage(
++            extraSleepTime, gcTimesAfterSleep, gcTimesBeforeSleep));
++          incrementMetricsCounter("jvm.pause.info-threshold", 1);
++        }
++        incrementMetricsCounter("jvm.pause.extraSleepTime", extraSleepTime);
++        totalGcExtraSleepTime += extraSleepTime;
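++        // Carry the post-sleep GC snapshot forward as the baseline for the next iteration.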
++        gcTimesBeforeSleep = gcTimesAfterSleep;
++      }
++    }
++
++    private void incrementMetricsCounter(String name, long count) {
++      try {
++        MetricsFactory.getMetricsInstance().incrementCounter(name, count);
++      } catch (Exception e) {
++        LOG.warn("Error Reporting JvmPauseMonitor to Metrics system", e);
++      }
++    }
++  }
++
++  /**
++   * Simple 'main' to facilitate manual testing of the pause monitor.
++   *
++   * This main function just leaks memory into a list. Running this class
++   * with a 1GB heap will very quickly go into "GC hell" and result in
++   * log messages about the GC pauses.
++   */
++  public static void main(String []args) throws Exception {
++    new JvmPauseMonitor(new Configuration()).start();
++    List<String> list = Lists.newArrayList();
++    int i = 0;
++    while (true) {
++      list.add(String.valueOf(i++));
++    }
++  }
++}
+diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/LegacyMetrics.java b/common/src/java/org/apache/hadoop/hive/common/metrics/LegacyMetrics.java
+new file mode 100644
+index 0000000..14f7afb
+--- /dev/null
++++ b/common/src/java/org/apache/hadoop/hive/common/metrics/LegacyMetrics.java
+@@ -0,0 +1,262 @@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.hadoop.hive.common.metrics;
++
++import org.apache.hadoop.hive.common.metrics.common.Metrics;
++import org.apache.hadoop.hive.conf.HiveConf;
++
++import java.io.IOException;
++import java.lang.management.ManagementFactory;
++import java.util.HashMap;
++
++import javax.management.MBeanServer;
++import javax.management.MalformedObjectNameException;
++import javax.management.ObjectName;
++
++/**
++ * This class may eventually get superseded by org.apache.hadoop.hive.common.metrics2.Metrics.
++ *
++ * Metrics Subsystem  - allows exposure of a number of named parameters/counters
++ *                      via jmx, intended to be used as a static subsystem
++ *
++ *                      Has three primary ways it can be used:
++ *                      (i) Using the set and get methods to set and get named parameters
++ *                      (ii) Using the incrementCounter method to increment and set named
++ *                      parameters in one go, rather than having to make a get and then a set.
++ *                      (iii) Using the startScope and endScope methods to start and end
++ *                      named "scopes" that record the number of times they've been
++ *                      instantiated and amount of time (in milliseconds) spent inside
++ *                      the scopes.
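++ *
++ * Usage sketch (the scope name "foo" is illustrative, as in the tests):
++ *   metrics.startScope("foo");  // opens the scope and starts its timer
++ *   // ... the work being measured ...
++ *   metrics.endScope("foo");    // closes it, updating foo.n, foo.t and foo.avg_t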
++ */
++public class LegacyMetrics implements Metrics {
++
++  private LegacyMetrics() {
++    // block
++  }
++
++  /**
++   * MetricsScope : A class that encapsulates an idea of a metered scope.
++   * Instantiating a named scope and then closing it exposes two counters:
++   *   (i) a "number of calls" counter ( &lt;name&gt;.n ), and
++   *  (ii) a "number of msecs spent between scope open and close" counter. ( 
&lt;name&gt;.t)
++   */
++  public static class MetricsScope {
++
++    final LegacyMetrics metrics;
++
++    final String name;
++    final String numCounter;
++    final String timeCounter;
++    final String avgTimeCounter;
++
++    private boolean isOpen = false;
++    private Long startTime = null;
++
++    /**
++     * Instantiates a named scope - intended to only be called by Metrics, so locally scoped.
++     * @param name - name of the variable
++     * @throws IOException
++     */
++    private MetricsScope(String name, LegacyMetrics metrics) throws IOException {
++      this.metrics = metrics;
++      this.name = name;
++      this.numCounter = name + ".n";
++      this.timeCounter = name + ".t";
++      this.avgTimeCounter = name + ".avg_t";
++      open();
++    }
++
++    public Long getNumCounter() throws IOException {
++      return (Long) metrics.get(numCounter);
++    }
++
++    public Long getTimeCounter() throws IOException {
++      return (Long) metrics.get(timeCounter);
++    }
++
++    /**
++     * Opens scope, and makes note of the time started, increments run counter
++     * @throws IOException
++     *
++     */
++    public void open() throws IOException {
++      if (!isOpen) {
++        isOpen = true;
++        startTime = System.currentTimeMillis();
++      } else {
++        throw new IOException("Scope named " + name + " is not closed, cannot be opened.");
++      }
++    }
++
++    /**
++     * Closes scope, and records the time taken
++     * @throws IOException
++     */
++    public void close() throws IOException {
++      if (isOpen) {
++        Long endTime = System.currentTimeMillis();
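++        // Update the call count and total time atomically, then refresh the
++        // derived average (avg_t = total time / number of calls).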
++        synchronized(metrics) {
++          Long num = metrics.incrementCounter(numCounter);
++          Long time = metrics.incrementCounter(timeCounter, endTime - startTime);
++          if (num != null && time != null) {
++            metrics.set(avgTimeCounter, Double.valueOf(time.doubleValue() / num.doubleValue()));
++          }
++        }
++      } else {
++        throw new IOException("Scope named " + name + " is not open, cannot be closed.");
++      }
++      isOpen = false;
++    }
++
++
++    /**
++     * Closes scope if open, and reopens it
++     * @throws IOException
++     */
++    public void reopen() throws IOException {
++      if(isOpen) {
++        close();
++      }
++      open();
++    }
++
++  }
++
++  private static final MetricsMBean metrics = new MetricsMBeanImpl();
++
++  private static final ObjectName oname;
++  static {
++    try {
++      oname = new ObjectName(
++          "org.apache.hadoop.hive.common.metrics:type=MetricsMBean");
++    } catch (MalformedObjectNameException mone) {
++      throw new RuntimeException(mone);
++    }
++  }
++
++
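++  // Scopes are tracked per thread: each thread opens and closes its own
++  // instance of a named scope, while the underlying counters are shared.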
++  private static final ThreadLocal<HashMap<String, MetricsScope>> threadLocalScopes
++    = new ThreadLocal<HashMap<String,MetricsScope>>() {
++    @Override
++    protected HashMap<String,MetricsScope> initialValue() {
++      return new HashMap<String,MetricsScope>();
++    }
++  };
++
++  private boolean initialized = false;
++
++  public void init(HiveConf conf) throws Exception {
++    if (!initialized) {
++      MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
++      mbs.registerMBean(metrics, oname);
++      initialized = true;
++    }
++  }
++
++  public boolean isInitialized() {
++    return initialized;
++  }
++
++  public Long incrementCounter(String name) throws IOException{
++    if (!initialized) {
++      return null;
++    }
++    return incrementCounter(name,Long.valueOf(1));
++  }
++
++  public Long incrementCounter(String name, long increment) throws IOException{
++    if (!initialized) {
++      return null;
++    }
++    Long value;
++    synchronized(metrics) {
++      if (!metrics.hasKey(name)) {
++        value = Long.valueOf(increment);
++        set(name, value);
++      } else {
++        value = ((Long)get(name)) + increment;
++        set(name, value);
++      }
++    }
++    return value;
++  }
++
++  public void set(String name, Object value) throws IOException{
++    if (!initialized) {
++      return;
++    }
++    metrics.put(name,value);
++  }
++
++  public Object get(String name) throws IOException{
++    if (!initialized) {
++      return null;
++    }
++    return metrics.get(name);
++  }
++
++  public void startScope(String name) throws IOException{
++    if (!initialized) {
++      return;
++    }
++    if (threadLocalScopes.get().containsKey(name)) {
++      threadLocalScopes.get().get(name).open();
++    } else {
++      threadLocalScopes.get().put(name, new MetricsScope(name, this));
++    }
++  }
++
++  public MetricsScope getScope(String name) throws IOException {
++    if (!initialized) {
++      return null;
++    }
++    if (threadLocalScopes.get().containsKey(name)) {
++      return threadLocalScopes.get().get(name);
++    } else {
++      throw new IOException("No metrics scope named " + name);
++    }
++  }
++
++  public void endScope(String name) throws IOException{
++    if (!initialized) {
++      return;
++    }
++    if (threadLocalScopes.get().containsKey(name)) {
++      threadLocalScopes.get().get(name).close();
++    }
++  }
++
++  /**
++   * Resets the static context state to initial.
++   * Used primarily for testing purposes.
++   *
++   * Note that threadLocalScopes ThreadLocal is *not* cleared in this call.
++   */
++  public void deInit() throws Exception {
++    synchronized (metrics) {
++      if (initialized) {
++        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
++        if (mbs.isRegistered(oname)) {
++          mbs.unregisterMBean(oname);
++        }
++        metrics.clear();
++        initialized = false;
++      }
++    }
++  }
++}
+diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/Metrics.java b/common/src/java/org/apache/hadoop/hive/common/metrics/Metrics.java
+deleted file mode 100644
+index 01c9d1d..0000000
+--- a/common/src/java/org/apache/hadoop/hive/common/metrics/Metrics.java
++++ /dev/null
+@@ -1,253 +0,0 @@
+-/**
+- * Licensed to the Apache Software Foundation (ASF) under one
+- * or more contributor license agreements.  See the NOTICE file
+- * distributed with this work for additional information
+- * regarding copyright ownership.  The ASF licenses this file
+- * to you under the Apache License, Version 2.0 (the
+- * "License"); you may not use this file except in compliance
+- * with the License.  You may obtain a copy of the License at
+- *
+- *     http://www.apache.org/licenses/LICENSE-2.0
+- *
+- * Unless required by applicable law or agreed to in writing, software
+- * distributed under the License is distributed on an "AS IS" BASIS,
+- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+- * See the License for the specific language governing permissions and
+- * limitations under the License.
+- */
+-package org.apache.hadoop.hive.common.metrics;
+-
+-import java.io.IOException;
+-import java.lang.management.ManagementFactory;
+-import java.util.HashMap;
+-
+-import javax.management.MBeanServer;
+-import javax.management.MalformedObjectNameException;
+-import javax.management.ObjectName;
+-
+-/**
+- * Metrics Subsystem  - allows exposure of a number of named parameters/counters
+- *                      via jmx, intended to be used as a static subsystem
+- *
+- *                      Has a couple of primary ways it can be used:
+- *                      (i) Using the set and get methods to set and get named parameters
+- *                      (ii) Using the incrementCounter method to increment and set named
+- *                      parameters in one go, rather than having to make a get and then a set.
+- *                      (iii) Using the startScope and endScope methods to start and end
+- *                      named "scopes" that record the number of times they've been
+- *                      instantiated and amount of time(in milliseconds) spent inside
+- *                      the scopes.
+- */
+-public class Metrics {
+-
+-  private Metrics() {
+-    // block
+-  }
+-  
+-  /**
+-   * MetricsScope : A class that encapsulates an idea of a metered scope.
+-   * Instantiating a named scope and then closing it exposes two counters:
+-   *   (i) a "number of calls" counter ( &lt;name&gt;.n ), and
+- *  (ii) a "number of msecs spent between scope open and close" counter. ( &lt;name&gt;.t)
+-   */
+-  public static class MetricsScope {
+-
+-    final String name;
+-    final String numCounter;
+-    final String timeCounter;
+-    final String avgTimeCounter;
+-    
+-    private boolean isOpen = false;
+-    private Long startTime = null;
+-
+-    /**
+-     * Instantiates a named scope - intended to only be called by Metrics, so locally scoped.
+-     * @param name - name of the variable
+-     * @throws IOException
+-     */
+-    private MetricsScope(String name) throws IOException {
+-      this.name = name;
+-      this.numCounter = name + ".n";
+-      this.timeCounter = name + ".t";
+-      this.avgTimeCounter = name + ".avg_t";
+-      open();
+-    }
+-
+-    public Long getNumCounter() throws IOException {
+-      return (Long)Metrics.get(numCounter);
+-    }
+-
+-    public Long getTimeCounter() throws IOException {
+-      return (Long)Metrics.get(timeCounter);
+-    }
+-
+-    /**
+-     * Opens scope, and makes note of the time started, increments run counter
+-     * @throws IOException
+-     *
+-     */
+-    public void open() throws IOException {
+-      if (!isOpen) {
+-        isOpen = true;
+-        startTime = System.currentTimeMillis();
+-      } else {
+-        throw new IOException("Scope named " + name + " is not closed, cannot be opened.");
+-      }
+-    }
+-
+-    /**
+-     * Closes scope, and records the time taken
+-     * @throws IOException
+-     */
+-    public void close() throws IOException {
+-      if (isOpen) {
+-        Long endTime = System.currentTimeMillis();
+-        synchronized(metrics) {
+-          Long num = Metrics.incrementCounter(numCounter);
+-          Long time = Metrics.incrementCounter(timeCounter, endTime - startTime);
+-          if (num != null && time != null) {
+-            Metrics.set(avgTimeCounter, Double.valueOf(time.doubleValue() / num.doubleValue()));
+-          }
+-        }
+-      } else {
+-        throw new IOException("Scope named " + name + " is not open, cannot be closed.");
+-      }
+-      isOpen = false;
+-    }
+-
+-
+-    /**
+-     * Closes scope if open, and reopens it
+-     * @throws IOException
+-     */
+-    public void reopen() throws IOException {
+-      if(isOpen) {
+-        close();
+-      }
+-      open();
+-    }
+-
+-  }
+-
+-  private static final MetricsMBean metrics = new MetricsMBeanImpl();
+-
+-  private static final ObjectName oname;
+-  static {
+-    try {
+-      oname = new ObjectName(
+-          "org.apache.hadoop.hive.common.metrics:type=MetricsMBean");      
+-    } catch (MalformedObjectNameException mone) {
+-      throw new RuntimeException(mone);
+-    }
+-  }
+-  
+-  
+- private static final ThreadLocal<HashMap<String, MetricsScope>> threadLocalScopes
+-    = new ThreadLocal<HashMap<String,MetricsScope>>() {
+-    @Override
+-    protected HashMap<String,MetricsScope> initialValue() {
+-      return new HashMap<String,MetricsScope>();
+-    }
+-  };
+-
+-  private static boolean initialized = false;
+-
+-  public static void init() throws Exception {
+-    synchronized (metrics) {
+-      if (!initialized) {
+-        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+-        mbs.registerMBean(metrics, oname);
+-        initialized = true;
+-      }
+-    }
+-  }
+-
+-  public static Long incrementCounter(String name) throws IOException{
+-    if (!initialized) {
+-      return null;
+-    }
+-    return incrementCounter(name,Long.valueOf(1));
+-  }
+-
+- public static Long incrementCounter(String name, long increment) throws IOException{
+-    if (!initialized) {
+-      return null;
+-    }
+-    Long value;
+-    synchronized(metrics) {
+-      if (!metrics.hasKey(name)) {
+-        value = Long.valueOf(increment);
+-        set(name, value);
+-      } else {
+-        value = ((Long)get(name)) + increment;
+-        set(name, value);
+-      }
+-    }
+-    return value;
+-  }
+-
+-  public static void set(String name, Object value) throws IOException{
+-    if (!initialized) {
+-      return;
+-    }
+-    metrics.put(name,value);
+-  }
+-
+-  public static Object get(String name) throws IOException{
+-    if (!initialized) {
+-      return null;
+-    }
+-    return metrics.get(name);
+-  }
+-
+-  public static MetricsScope startScope(String name) throws IOException{
+-    if (!initialized) {
+-      return null;
+-    }
+-    if (threadLocalScopes.get().containsKey(name)) {
+-      threadLocalScopes.get().get(name).open();
+-    } else {
+-      threadLocalScopes.get().put(name, new MetricsScope(name));
+-    }
+-    return threadLocalScopes.get().get(name);
+-  }
+-
+-  public static MetricsScope getScope(String name) throws IOException {
+-    if (!initialized) {
+-      return null;
+-    }
+-    if (threadLocalScopes.get().containsKey(name)) {
+-      return threadLocalScopes.get().get(name);
+-    } else {
+-      throw new IOException("No metrics scope named " + name);
+-    }
+-  }
+-
+-  public static void endScope(String name) throws IOException{
+-    if (!initialized) {
+-      return;
+-    }
+-    if (threadLocalScopes.get().containsKey(name)) {
+-      threadLocalScopes.get().get(name).close();
+-    }
+-  }
+-
+-  /**
+-   * Resets the static context state to initial.
+-   * Used primarily for testing purposes.
+-   * 
+-   * Note that threadLocalScopes ThreadLocal is *not* cleared in this call.
+-   */
+-  static void uninit() throws Exception {
+-    synchronized (metrics) {
+-      if (initialized) {
+-        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+-        if (mbs.isRegistered(oname)) {
+-          mbs.unregisterMBean(oname);
+-        }
+-        metrics.clear();
+-        initialized = false;
+-      }
+-    }
+-  }
+-}
+diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/common/Metrics.java b/common/src/java/org/apache/hadoop/hive/common/metrics/common/Metrics.java
+new file mode 100644
+index 0000000..13a5336
+--- /dev/null
++++ b/common/src/java/org/apache/hadoop/hive/common/metrics/common/Metrics.java
+@@ -0,0 +1,68 @@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.hadoop.hive.common.metrics.common;
++
++import java.io.IOException;
++
++import org.apache.hadoop.hive.conf.HiveConf;
++
++/**
++ * Generic Metrics interface.
++ */
++public interface Metrics {
++
++  /**
++   * Initialize Metrics system with given Hive configuration.
++   * @param conf
++   */
++  public void init(HiveConf conf) throws Exception;
++
++  /**
++   * Deinitializes the Metrics system.
++   */
++  public void deInit() throws Exception;
++
++  /**
++   * Opens a named timing scope on the current thread.
++   * @param name
++   * @throws IOException
++   */
++  public void startScope(String name) throws IOException;
++
++  public void endScope(String name) throws IOException;
++
++  //Counter-related methods
++
++  /**
++   * Increments a counter of the given name by 1.
++   * @param name
++   * @return the updated counter value
++   * @throws IOException
++   */
++  public Long incrementCounter(String name) throws IOException;
++
++  /**
++   * Increments a counter of the given name by "increment".
++   * @param name
++   * @param increment
++   * @return the updated counter value
++   * @throws IOException
++   */
++  public Long incrementCounter(String name, long increment) throws IOException;
++}
+diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsFactory.java b/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsFactory.java
+new file mode 100644
+index 0000000..12a309d
+--- /dev/null
++++ b/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsFactory.java
+@@ -0,0 +1,48 @@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.hadoop.hive.common.metrics.common;
++
++import org.apache.hadoop.hive.conf.HiveConf;
++import org.apache.hadoop.util.ReflectionUtils;
++
++/**
++ * Class that manages a static Metrics instance for this process.
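++ *
++ * Usage sketch (the counter name is illustrative):
++ *   MetricsFactory.init(conf);
++ *   MetricsFactory.getMetricsInstance().incrementCounter("open_connections");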
++ */
++public class MetricsFactory {
++
++  private static Metrics metrics;
++  private static Object initLock = new Object();
++
++  public synchronized static void init(HiveConf conf) throws Exception {
++    if (metrics == null) {
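++      // The implementation class (hive.service.metrics.class) is loaded
++      // reflectively, keeping CodahaleMetrics and LegacyMetrics pluggable.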
++      metrics = (Metrics) ReflectionUtils.newInstance(conf.getClassByName(
++        conf.getVar(HiveConf.ConfVars.HIVE_METRICS_CLASS)), conf);
++    }
++    metrics.init(conf);
++  }
++
++  public synchronized static Metrics getMetricsInstance() {
++    return metrics;
++  }
++
++  public synchronized static void deInit() throws Exception {
++    if (metrics != null) {
++      metrics.deInit();
++    }
++  }
++}
+diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java b/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java
+new file mode 100644
+index 0000000..e59da99
+--- /dev/null
++++ b/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java
+@@ -0,0 +1,366 @@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.hadoop.hive.common.metrics.metrics2;
++
++import com.codahale.metrics.ConsoleReporter;
++import com.codahale.metrics.Counter;
++import com.codahale.metrics.ExponentiallyDecayingReservoir;
++import com.codahale.metrics.JmxReporter;
++import com.codahale.metrics.Metric;
++import com.codahale.metrics.MetricRegistry;
++import com.codahale.metrics.MetricSet;
++import com.codahale.metrics.Timer;
++import com.codahale.metrics.json.MetricsModule;
++import com.codahale.metrics.jvm.BufferPoolMetricSet;
++import com.codahale.metrics.jvm.ClassLoadingGaugeSet;
++import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
++import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
++import com.codahale.metrics.jvm.ThreadStatesGaugeSet;
++import com.fasterxml.jackson.databind.ObjectMapper;
++import com.google.common.annotations.VisibleForTesting;
++import com.google.common.base.Splitter;
++import com.google.common.cache.CacheBuilder;
++import com.google.common.cache.CacheLoader;
++import com.google.common.cache.LoadingCache;
++import com.google.common.collect.Lists;
++import org.apache.commons.logging.Log;
++import org.apache.commons.logging.LogFactory;
++import org.apache.hadoop.fs.FileSystem;
++import org.apache.hadoop.fs.Path;
++import org.apache.hadoop.fs.permission.FsPermission;
++import org.apache.hadoop.hive.conf.HiveConf;
++
++import java.io.BufferedReader;
++import java.io.BufferedWriter;
++import java.io.Closeable;
++import java.io.IOException;
++import java.io.OutputStreamWriter;
++import java.lang.management.ManagementFactory;
++import java.util.HashMap;
++import java.util.HashSet;
++import java.util.List;
++import java.util.Map;
++import java.util.Set;
++import java.util.TimerTask;
++import java.util.concurrent.ExecutionException;
++import java.util.concurrent.TimeUnit;
++import java.util.concurrent.locks.Lock;
++import java.util.concurrent.locks.ReentrantLock;
++
++/**
++ * Codahale-backed Metrics implementation.
++ */
++public class CodahaleMetrics implements org.apache.hadoop.hive.common.metrics.common.Metrics {
++  public static final String API_PREFIX = "api_";
++  public static final Log LOGGER = LogFactory.getLog(CodahaleMetrics.class);
++
++  public final MetricRegistry metricRegistry = new MetricRegistry();
++  private final Lock timersLock = new ReentrantLock();
++  private final Lock countersLock = new ReentrantLock();
++
++  private LoadingCache<String, Timer> timers;
++  private LoadingCache<String, Counter> counters;
++
++  private boolean initialized = false;
++  private HiveConf conf;
++  private final Set<Closeable> reporters = new HashSet<Closeable>();
++
++  private final ThreadLocal<HashMap<String, MetricsScope>> threadLocalScopes
++    = new ThreadLocal<HashMap<String,MetricsScope>>() {
++    @Override
++    protected HashMap<String,MetricsScope> initialValue() {
++      return new HashMap<String,MetricsScope>();
++    }
++  };
++
++  public static class MetricsScope {
++
++    final String name;
++    final Timer timer;
++    Timer.Context timerContext;
++    CodahaleMetrics metrics;
++
++    private boolean isOpen = false;
++
++    /**
++     * Instantiates a named scope - intended to only be called by Metrics, so locally scoped.
++     * @param name - name of the variable
++     * @throws IOException
++     */
++    private MetricsScope(String name, CodahaleMetrics metrics) throws IOException {
++      this.name = name;
++      this.metrics = metrics;
++      this.timer = metrics.getTimer(name);
++      open();
++    }
++
++    /**
++     * Opens scope, and makes note of the time started, increments run counter
++     * @throws IOException
++     *
++     */
++    public void open() throws IOException {
++      if (!isOpen) {
++        isOpen = true;
++        this.timerContext = timer.time();
++      } else {
++        throw new IOException("Scope named " + name + " is not closed, cannot be opened.");
++      }
++    }
++
++    /**
++     * Closes scope, and records the time taken
++     * @throws IOException
++     */
++    public void close() throws IOException {
++      if (isOpen) {
++        timerContext.close();
++
++      } else {
++        throw new IOException("Scope named " + name + " is not open, cannot be closed.");
++      }
++      isOpen = false;
++    }
++  }
++
++  public synchronized void init(HiveConf conf) throws Exception {
++    if (initialized) {
++      return;
++    }
++
++    this.conf = conf;
++    //Codahale artifacts are lazily-created.
++    timers = CacheBuilder.newBuilder().build(
++      new CacheLoader<String, com.codahale.metrics.Timer>() {
++        @Override
++        public com.codahale.metrics.Timer load(String key) throws Exception {
++          Timer timer = new Timer(new ExponentiallyDecayingReservoir());
++          metricRegistry.register(key, timer);
++          return timer;
++        }
++      }
++    );
++    counters = CacheBuilder.newBuilder().build(
++      new CacheLoader<String, Counter>() {
++        @Override
++        public Counter load(String key) throws Exception {
++          Counter counter = new Counter();
++          metricRegistry.register(key, counter);
++          return counter;
++        }
++      }
++    );
++
++    //register JVM metrics
++    registerAll("gc", new GarbageCollectorMetricSet());
++    registerAll("buffers", new 
BufferPoolMetricSet(ManagementFactory.getPlatformMBeanServer()));
++    registerAll("memory", new MemoryUsageGaugeSet());
++    registerAll("threads", new ThreadStatesGaugeSet());
++    registerAll("classLoading", new ClassLoadingGaugeSet());
++
++    //Metrics reporter
++    Set<MetricsReporting> finalReporterList = new HashSet<MetricsReporting>();
++    List<String> metricsReporterNames = Lists.newArrayList(
++      Splitter.on(",").trimResults().omitEmptyStrings().split(conf.getVar(HiveConf.ConfVars.HIVE_METRICS_REPORTER)));
++
++    if(metricsReporterNames != null) {
++      for (String metricsReportingName : metricsReporterNames) {
++        try {
++          MetricsReporting reporter = MetricsReporting.valueOf(metricsReportingName.trim().toUpperCase());
++          finalReporterList.add(reporter);
++        } catch (IllegalArgumentException e) {
++          LOGGER.warn("Metrics reporter skipped due to invalid configured reporter: " + metricsReportingName);
++        }
++      }
++    }
++    initReporting(finalReporterList);
++    initialized = true;
++  }
++
++
++  public synchronized void deInit() throws Exception {
++    if (initialized) {
++      if (reporters != null) {
++        for (Closeable reporter : reporters) {
++          reporter.close();
++        }
++      }
++      for (Map.Entry<String, Metric> metric : metricRegistry.getMetrics().entrySet()) {
++        metricRegistry.remove(metric.getKey());
++      }
++      timers.invalidateAll();
++      counters.invalidateAll();
++      initialized = false;
++    }
++  }
++
++  public void startScope(String name) throws IOException {
++    synchronized (this) {
++      if (!initialized) {
++        return;
++      }
++    }
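++    // Scope-based timers are namespaced with the "api_" prefix in the registry.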
++    name = API_PREFIX + name;
++    if (threadLocalScopes.get().containsKey(name)) {
++      threadLocalScopes.get().get(name).open();
++    } else {
++      threadLocalScopes.get().put(name, new MetricsScope(name, this));
++    }
++  }
++
++  public void endScope(String name) throws IOException{
++    synchronized (this) {
++      if (!initialized) {
++        return;
++      }
++    }
++    name = API_PREFIX + name;
++    if (threadLocalScopes.get().containsKey(name)) {
++      threadLocalScopes.get().get(name).close();
++    }
++  }
++
++  public Long incrementCounter(String name) throws IOException {
++    return incrementCounter(name, 1);
++  }
++
++  public Long incrementCounter(String name, long increment) throws IOException {
++    String key = name;
++    try {
++      countersLock.lock();
++      counters.get(key).inc(increment);
++      return counters.get(key).getCount();
++    } catch(ExecutionException ee) {
++      throw new RuntimeException(ee);
++    } finally {
++      countersLock.unlock();
++    }
++  }
++
++  // This method is necessary to synchronize lazy-creation to the timers.
++  private Timer getTimer(String name) throws IOException {
++    String key = name;
++    try {
++      timersLock.lock();
++      Timer timer = timers.get(key);
++      return timer;
++    } catch (ExecutionException e) {
++      throw new IOException(e);
++    } finally {
++      timersLock.unlock();
++    }
++  }
++
++  private void registerAll(String prefix, MetricSet metricSet) {
++    for (Map.Entry<String, Metric> entry : metricSet.getMetrics().entrySet()) {
++      if (entry.getValue() instanceof MetricSet) {
++        registerAll(prefix + "." + entry.getKey(), (MetricSet) entry.getValue());
++      } else {
++        metricRegistry.register(prefix + "." + entry.getKey(), entry.getValue());
++      }
++    }
++  }
++
++  @VisibleForTesting
++  public MetricRegistry getMetricRegistry() {
++    return metricRegistry;
++  }
++
++  /**
++   * Should be only called once to initialize the reporters
++   */
++  private void initReporting(Set<MetricsReporting> reportingSet) throws Exception {
++    for (MetricsReporting reporting : reportingSet) {
++      switch(reporting) {
++        case CONSOLE:
++          final ConsoleReporter consoleReporter = ConsoleReporter.forRegistry(metricRegistry)
++            .convertRatesTo(TimeUnit.SECONDS)
++            .convertDurationsTo(TimeUnit.MILLISECONDS)
++            .build();
++          consoleReporter.start(1, TimeUnit.SECONDS);
++          reporters.add(consoleReporter);
++          break;
++        case JMX:
++          final JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry)
++            .convertRatesTo(TimeUnit.SECONDS)
++            .convertDurationsTo(TimeUnit.MILLISECONDS)
++            .build();
++          jmxReporter.start();
++          reporters.add(jmxReporter);
++          break;
++        case JSON_FILE:
++          final JsonFileReporter jsonFileReporter = new JsonFileReporter();
++          jsonFileReporter.start();
++          reporters.add(jsonFileReporter);
++          break;
++      }
++    }
++  }
++
++  class JsonFileReporter implements Closeable {
++    private ObjectMapper jsonMapper = null;
++    private java.util.Timer timer = null;
++
++    public void start() {
++      this.jsonMapper = new ObjectMapper().registerModule(new MetricsModule(TimeUnit.MILLISECONDS, TimeUnit.MILLISECONDS, false));
++      this.timer = new java.util.Timer(true);
++
++      long time = conf.getTimeVar(HiveConf.ConfVars.HIVE_METRICS_JSON_FILE_INTERVAL, TimeUnit.MILLISECONDS);
++      final String pathString = conf.getVar(HiveConf.ConfVars.HIVE_METRICS_JSON_FILE_LOCATION);
++
++      timer.schedule(new TimerTask() {
++        @Override
++        public void run() {
++          BufferedWriter bw = null;
++          try {
++            String json = jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(metricRegistry);
++            Path tmpPath = new Path(pathString + ".tmp");
++            FileSystem fs = FileSystem.get(conf);
++            fs.delete(tmpPath, true);
++            bw = new BufferedWriter(new OutputStreamWriter(fs.create(tmpPath, true)));
++            bw.write(json);
++            bw.close();
++
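++            // Rename the finished temp file into place so readers never see
++            // a partially written metrics snapshot.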
++            Path path = new Path(pathString);
++            fs.rename(tmpPath, path);
++            fs.setPermission(path, FsPermission.createImmutable((short) 0644));
++          } catch (Exception e) {
++            LOGGER.warn("Error writing JSON Metrics to file", e);
++          } finally {
++            try {
++              if (bw != null) {
++                bw.close();
++              }
++            } catch (IOException e) {
++              //Ignore.
++            }
++          }
++
++
++        }
++      }, 0, time);
++    }
++
++    public void close() {
++      if (timer != null) {
++        this.timer.cancel();
++      }
++    }
++  }
++}
+diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/MetricsReporting.java b/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/MetricsReporting.java
+new file mode 100644
+index 0000000..643246f
+--- /dev/null
++++ b/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/MetricsReporting.java
+@@ -0,0 +1,27 @@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.hadoop.hive.common.metrics.metrics2;
++
++/**
++ * Reporting options for org.apache.hadoop.hive.common.metrics.metrics2.Metrics.
++ */
++public enum MetricsReporting {
++  JMX,
++  CONSOLE,
++  JSON_FILE
++}
+diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+index 49b8f97..55a79a9 100644
+--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
++++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+@@ -645,6 +645,7 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) {
+         "Maximum cache full % after which the cache cleaner thread kicks in."),
+     METASTORE_AGGREGATE_STATS_CACHE_CLEAN_UNTIL("hive.metastore.aggregate.stats.cache.clean.until", (float) 0.8,
+         "The cleaner thread cleans until cache reaches this % full size."),
++    METASTORE_METRICS("hive.metastore.metrics.enabled", false, "Enable metrics on the metastore."),
+ 
+     // Parameters for exporting metadata on table drop (requires the use of the)
+     // org.apache.hadoop.hive.ql.parse.MetaDataExportListener preevent listener
+@@ -1688,6 +1689,7 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) {
+         "  EXECUTION: Log completion of tasks\n" +
+         "  PERFORMANCE: Execution + Performance logs \n" +
+         "  VERBOSE: All logs" ),
++    HIVE_SERVER2_METRICS_ENABLED("hive.server2.metrics.enabled", false, "Enable metrics on the HiveServer2."),
+     // logging configuration
+     HIVE_LOG4J_FILE("hive.log4j.file", "",
+         "Hive log4j configuration file.\n" +
+@@ -1715,7 +1717,21 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) {
+     HIVE_AUTOGEN_COLUMNALIAS_PREFIX_INCLUDEFUNCNAME(
+         "hive.autogen.columnalias.prefix.includefuncname", false,
+         "Whether to include function name in the column alias auto generated 
by Hive."),
+-
++    HIVE_METRICS_CLASS("hive.service.metrics.class",
++        "org.apache.hadoop.hive.common.metrics.metrics2.CodahaleMetrics",
++        new StringSet(
++            "org.apache.hadoop.hive.common.metrics.metrics2.CodahaleMetrics",
++            "org.apache.hadoop.hive.common.metrics.LegacyMetrics"),
++        "Hive metrics subsystem implementation class."),
++    HIVE_METRICS_REPORTER("hive.service.metrics.reporter", "JSON_FILE, JMX",
++        "Reporter type for metric class 
org.apache.hadoop.hive.common.metrics.metrics2.CodahaleMetrics, comma separated 
list of JMX, CONSOLE, JSON_FILE"),
++    HIVE_METRICS_JSON_FILE_LOCATION("hive.service.metrics.file.location", 
"file:///tmp/my-logging.properties",
++        "For metric class 
org.apache.hadoop.hive.common.metrics.metrics2.CodahaleMetrics JSON_FILE 
reporter, the location of JSON metrics file.  " +
++        "This file will get overwritten at every interval."),
++    HIVE_METRICS_JSON_FILE_INTERVAL("hive.service.metrics.file.frequency", 
"5s",
++        new TimeValidator(TimeUnit.MILLISECONDS),
++        "For metric class 
org.apache.hadoop.hive.common.metrics.metrics2.CodahaleMetrics JSON_FILE 
reporter, " +
++        "the frequency of updating JSON metrics file."),
+     HIVE_PERF_LOGGER("hive.exec.perf.logger", 
"org.apache.hadoop.hive.ql.log.PerfLogger",
+         "The class responsible for logging client side performance metrics. 
\n" +
+         "Must be a subclass of org.apache.hadoop.hive.ql.log.PerfLogger"),
+diff --git a/common/src/test/org/apache/hadoop/hive/common/metrics/TestLegacyMetrics.java b/common/src/test/org/apache/hadoop/hive/common/metrics/TestLegacyMetrics.java
+new file mode 100644
+index 0000000..c14c7ee
+--- /dev/null
++++ b/common/src/test/org/apache/hadoop/hive/common/metrics/TestLegacyMetrics.java
+@@ -0,0 +1,295 @@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.hadoop.hive.common.metrics;
++
++import java.io.IOException;
++import java.lang.management.ManagementFactory;
++import java.util.concurrent.Callable;
++import java.util.concurrent.ExecutorService;
++import java.util.concurrent.Executors;
++import java.util.concurrent.TimeUnit;
++
++import javax.management.Attribute;
++import javax.management.MBeanAttributeInfo;
++import javax.management.MBeanInfo;
++import javax.management.MBeanOperationInfo;
++import javax.management.MBeanServer;
++import javax.management.ObjectName;
++
++import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
++import org.apache.hadoop.hive.common.metrics.LegacyMetrics.MetricsScope;
++import org.apache.hadoop.hive.conf.HiveConf;
++import org.junit.After;
++import org.junit.Before;
++import org.junit.Test;
++import static org.junit.Assert.*;
++
++public class TestLegacyMetrics {
++
++  private static final String scopeName = "foo";
++  private static final long periodMs = 50L;
++  private static LegacyMetrics metrics;
++
++  @Before
++  public void before() throws Exception {
++    MetricsFactory.deInit();
++    HiveConf conf = new HiveConf();
++    conf.setVar(HiveConf.ConfVars.HIVE_METRICS_CLASS, LegacyMetrics.class.getCanonicalName());
++    MetricsFactory.init(conf);
++    metrics = (LegacyMetrics) MetricsFactory.getMetricsInstance();
++  }
++
++  @After
++  public void after() throws Exception {
++    MetricsFactory.deInit();
++  }
++
++  @Test
++  public void testMetricsMBean() throws Exception {
++    MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
++    final ObjectName oname = new ObjectName(
++        "org.apache.hadoop.hive.common.metrics:type=MetricsMBean");
++    MBeanInfo mBeanInfo = mbs.getMBeanInfo(oname);
++    // check implementation class:
++    assertEquals(MetricsMBeanImpl.class.getName(), mBeanInfo.getClassName());
++
++    // check reset operation:
++    MBeanOperationInfo[] oops = mBeanInfo.getOperations();
++    boolean resetFound = false;
++    for (MBeanOperationInfo op : oops) {
++      if ("reset".equals(op.getName())) {
++        resetFound = true;
++        break;
++      }
++    }
++    assertTrue(resetFound);
++
++    // add metric with a non-null value:
++    Attribute attr = new Attribute("fooMetric", Long.valueOf(-77));
++    mbs.setAttribute(oname, attr);
++
++    mBeanInfo = mbs.getMBeanInfo(oname);
++    MBeanAttributeInfo[] attributeInfos = mBeanInfo.getAttributes();
++    assertEquals(1, attributeInfos.length);
++    boolean attrFound = false;
++    for (MBeanAttributeInfo info : attributeInfos) {
++      if ("fooMetric".equals(info.getName())) {
++        assertEquals("java.lang.Long", info.getType());
++        assertTrue(info.isReadable());
++        assertTrue(info.isWritable());
++        assertFalse(info.isIs());
++
++        attrFound = true;
++        break;
++      }
++    }
++    assertTrue(attrFound);
++
++    // check metric value:
++    Object v = mbs.getAttribute(oname, "fooMetric");
++    assertEquals(Long.valueOf(-77), v);
++
++    // reset the bean:
++    Object result = mbs.invoke(oname, "reset", new Object[0], new String[0]);
++    assertNull(result);
++
++    // the metric value must be zeroed:
++    v = mbs.getAttribute(oname, "fooMetric");
++    assertEquals(Long.valueOf(0), v);
++  }
++
++  private <T> void expectIOE(Callable<T> c) throws Exception {
++    try {
++      T t = c.call();
++      fail("IOE expected but ["+t+"] was returned.");
++    } catch (IOException ioe) {
++      // ok, expected
++    }
++  }
++
++  @Test
++  public void testScopeSingleThread() throws Exception {
++    metrics.startScope(scopeName);
++    final MetricsScope fooScope = metrics.getScope(scopeName);
++    // the time and number counters become available only after the 1st
++    // scope close:
++    expectIOE(new Callable<Long>() {
++      @Override
++      public Long call() throws Exception {
++        Long num = fooScope.getNumCounter();
++        return num;
++      }
++    });
++    expectIOE(new Callable<Long>() {
++      @Override
++      public Long call() throws Exception {
++        Long time = fooScope.getTimeCounter();
++        return time;
++      }
++    });
++    // cannot open scope that is already open:
++    expectIOE(new Callable<Void>() {
++      @Override
++      public Void call() throws Exception {
++        fooScope.open();
++        return null;
++      }
++    });
++
++    assertSame(fooScope, metrics.getScope(scopeName));
++    Thread.sleep(periodMs + 1);
++    // 1st close:
++    // closing of open scope should be ok:
++    metrics.endScope(scopeName);
++    expectIOE(new Callable<Void>() {
++      @Override
++      public Void call() throws Exception {
++        metrics.endScope(scopeName); // closing of closed scope not allowed
++        return null;
++      }
++    });
++
++    assertEquals(Long.valueOf(1), fooScope.getNumCounter());
++    final long t1 = fooScope.getTimeCounter().longValue();
++    assertTrue(t1 > periodMs);
++
++    assertSame(fooScope, metrics.getScope(scopeName));
++
++   // opening allowed after closing:
++    metrics.startScope(scopeName);
++    // opening of already open scope not allowed:
++    expectIOE(new Callable<Void>() {
++      @Override
++      public Void call() throws Exception {
++        metrics.startScope(scopeName);
++        return null;
++      }
++    });
++
++    assertEquals(Long.valueOf(1), fooScope.getNumCounter());
++    assertEquals(t1, fooScope.getTimeCounter().longValue());
++
++    assertSame(fooScope, metrics.getScope(scopeName));
++    Thread.sleep(periodMs + 1);
++    // Reopening (close + open) allowed in opened state:
++    fooScope.reopen();
++
++    assertEquals(Long.valueOf(2), fooScope.getNumCounter());
++    assertTrue(fooScope.getTimeCounter().longValue() > 2 * periodMs);
++
++    Thread.sleep(periodMs + 1);
++    // 3rd close:
++    fooScope.close();
++
++    assertEquals(Long.valueOf(3), fooScope.getNumCounter());
++    assertTrue(fooScope.getTimeCounter().longValue() > 3 * periodMs);
++    Double avgT = (Double) metrics.get("foo.avg_t");
++    assertTrue(avgT.doubleValue() > periodMs);
++  }
++
++  @Test
++  public void testScopeConcurrency() throws Exception {
++    metrics.startScope(scopeName);
++    MetricsScope fooScope = metrics.getScope(scopeName);
++    final int threads = 10;
++    ExecutorService executorService = Executors.newFixedThreadPool(threads);
++    for (int i=0; i<threads; i++) {
++      final int n = i;
++      executorService.submit(new Callable<Void>() {
++        @Override
++        public Void call() throws Exception {
++          testScopeImpl(n);
++          return null;
++        }
++      });
++    }
++    executorService.shutdown();
++    assertTrue(executorService.awaitTermination(periodMs * 3 * threads, TimeUnit.MILLISECONDS));
++
++    fooScope = metrics.getScope(scopeName);
++    assertEquals(Long.valueOf(3 * threads), fooScope.getNumCounter());
++    assertTrue(fooScope.getTimeCounter().longValue() > 3 * periodMs * threads);
++    Double avgT = (Double) metrics.get("foo.avg_t");
++    assertTrue(avgT.doubleValue() > periodMs);
++    metrics.endScope(scopeName);
++  }
++
++  void testScopeImpl(int n) throws Exception {
++    metrics.startScope(scopeName);
++    final MetricsScope fooScope = metrics.getScope(scopeName);
++    // cannot open scope that is already open:
++    expectIOE(new Callable<Void>() {
++      @Override
++      public Void call() throws Exception {
++        fooScope.open();
++        return null;
++      }
++    });
++
++    assertSame(fooScope, metrics.getScope(scopeName));
++    Thread.sleep(periodMs + 1);
++    // 1st close:
++    metrics.endScope(scopeName); // closing of open scope should be ok.
++
++    assertTrue(fooScope.getNumCounter().longValue() >= 1);
++    final long t1 = fooScope.getTimeCounter().longValue();
++    assertTrue(t1 > periodMs);
++
++    expectIOE(new Callable<Void>() {
++      @Override
++      public Void call() throws Exception {
++        metrics.endScope(scopeName); // closing of closed scope not allowed
++        return null;
++      }
++    });
++
++    assertSame(fooScope, metrics.getScope(scopeName));
++
++    // opening allowed after closing:
++    metrics.startScope(scopeName);
++
++    assertTrue(fooScope.getNumCounter().longValue() >= 1);
++    assertTrue(fooScope.getTimeCounter().longValue() >= t1);
++
++    // opening of already open scope not allowed:
++    expectIOE(new Callable<Void>() {
++      @Override
++      public Void call() throws Exception {
++        metrics.startScope(scopeName);
++        return null;
++      }
++    });
++
++    assertSame(fooScope, metrics.getScope(scopeName));
++    Thread.sleep(periodMs + 1);
++    // Reopening (close + open) allowed in opened state:
++    fooScope.reopen();
++
++    assertTrue(fooScope.getNumCounter().longValue() >= 2);
++    assertTrue(fooScope.getTimeCounter().longValue() > 2 * periodMs);
++
++    Thread.sleep(periodMs + 1);
++    // 3rd close:
++    fooScope.close();
++
++    assertTrue(fooScope.getNumCounter().longValue() >= 3);
++    assertTrue(fooScope.getTimeCounter().longValue() > 3 * periodMs);
++    Double avgT = (Double) metrics.get("foo.avg_t");
++    assertTrue(avgT.doubleValue() > periodMs);
++  }
++}
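
The lifecycle contract exercised by the two scope tests above can be summarized in a short, self-contained sketch. The TimedScope class below is hypothetical and only illustrates the rules the tests assert: opening an already-open scope or closing an already-closed one throws IOException, each close bumps the invocation counter and accumulates elapsed wall time (so after three close cycles numCounter is 3 and timeCounter exceeds 3 * periodMs), and reopen() is a close immediately followed by an open. (The counter getters in the real API additionally throw IOException until the first close.)

    import java.io.IOException;

    /** Hypothetical illustration of the scope rules asserted above; not part of the patch. */
    public class TimedScope {
      private boolean open = false;
      private long numCounter = 0;   // completed open/close cycles
      private long timeCounter = 0;  // accumulated elapsed milliseconds
      private long openedAt;

      public void open() throws IOException {
        if (open) {
          throw new IOException("scope is already open");
        }
        open = true;
        openedAt = System.currentTimeMillis();
      }

      public void close() throws IOException {
        if (!open) {
          throw new IOException("scope is already closed");
        }
        open = false;
        numCounter++;
        timeCounter += System.currentTimeMillis() - openedAt;
      }

      /** Legal only while open: finishes the current interval and starts a new one. */
      public void reopen() throws IOException {
        close();
        open();
      }

      public long getNumCounter() { return numCounter; }
      public long getTimeCounter() { return timeCounter; }
    }
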
+diff --git a/common/src/test/org/apache/hadoop/hive/common/metrics/TestMetrics.java b/common/src/test/org/apache/hadoop/hive/common/metrics/TestMetrics.java
+deleted file mode 100644
+index e85d3f8..0000000
+--- a/common/src/test/org/apache/hadoop/hive/common/metrics/TestMetrics.java
++++ /dev/null
+@@ -1,286 +0,0 @@
+-/**
+- * Licensed to the Apache Software Foundation (ASF) under one
+- * or more contributor license agreements.  See the NOTICE file
+- * distributed with this work for additional information
+- * regarding copyright ownership.  The ASF licenses this file
+- * to you under the Apache License, Version 2.0 (the
+- * "License"); you may not use this file except in compliance
+- * with the License.  You may obtain a copy of the License at
+- *
+- *     http://www.apache.org/licenses/LICENSE-2.0
+- *
+- * Unless required by applicable law or agreed to in writing, software
+- * distributed under the License is distributed on an "AS IS" BASIS,
+- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+- * See the License for the specific language governing permissions and
+- * limitations under the License.
+- */
+-package org.apache.hadoop.hive.common.metrics;
+-
+-import java.io.IOException;
+-import java.lang.management.ManagementFactory;
+-import java.util.concurrent.Callable;
+-import java.util.concurrent.ExecutorService;
+-import java.util.concurrent.Executors;
+-import java.util.concurrent.TimeUnit;
+-
+-import javax.management.Attribute;
+-import javax.management.MBeanAttributeInfo;
+-import javax.management.MBeanInfo;
+-import javax.management.MBeanOperationInfo;
+-import javax.management.MBeanServer;
+-import javax.management.ObjectName;
+-
+-import org.apache.hadoop.hive.common.metrics.Metrics.MetricsScope;
+-import org.junit.After;
+-import org.junit.Before;
+-import org.junit.Test;
+-import static org.junit.Assert.*;
+-
+-public class TestMetrics {
+-
+-  private static final String scopeName = "foo";
+-  private static final long periodMs = 50L;
+-
+-  @Before
+-  public void before() throws Exception {
+-    Metrics.uninit();
+-    Metrics.init();
+-  }
+-  
+-  @After
+-  public void after() throws Exception {
+-    Metrics.uninit();
+-  }
+-  
+-  @Test
+-  public void testMetricsMBean() throws Exception {
+-    MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+-    final ObjectName oname = new ObjectName(
+-        "org.apache.hadoop.hive.common.metrics:type=MetricsMBean");
+-    MBeanInfo mBeanInfo = mbs.getMBeanInfo(oname);
+-    // check implementation class:
+-    assertEquals(MetricsMBeanImpl.class.getName(), mBeanInfo.getClassName());
+-
+-    // check reset operation:
+-    MBeanOperationInfo[] oops = mBeanInfo.getOperations();
+-    boolean resetFound = false;
+-    for (MBeanOperationInfo op : oops) {
+-      if ("reset".equals(op.getName())) {
+-        resetFound = true;
+-        break;
+-      }
+-    }
+-    assertTrue(resetFound);
+-
+-    // add metric with a non-null value:
+-    Attribute attr = new Attribute("fooMetric", Long.valueOf(-77));
+-    mbs.setAttribute(oname, attr);
+-
+-    mBeanInfo = mbs.getMBeanInfo(oname);
+-    MBeanAttributeInfo[] attrinuteInfos = mBeanInfo.getAttributes();
+-    assertEquals(1, attrinuteInfos.length);
+-    boolean attrFound = false;
+-    for (MBeanAttributeInfo info : attrinuteInfos) {
+-      if ("fooMetric".equals(info.getName())) {
+-        assertEquals("java.lang.Long", info.getType());
+-        assertTrue(info.isReadable());
+-        assertTrue(info.isWritable());
+-        assertFalse(info.isIs());
+-
+-        attrFound = true;
+-        break;
+-      }
+-    }
+-    assertTrue(attrFound);
+-
+-    // check metric value:
+-    Object v = mbs.getAttribute(oname, "fooMetric");
+-    assertEquals(Long.valueOf(-77), v);
+-
+-    // reset the bean:
+-    Object result = mbs.invoke(oname, "reset", new Object[0], new String[0]);
+-    assertNull(result);
+-
+-    // the metric value must be zeroed:
+-    v = mbs.getAttribute(oname, "fooMetric");
+-    assertEquals(Long.valueOf(0), v);
+-  }
+-  
+-  private <T> void expectIOE(Callable<T> c) throws Exception {
+-    try {
+-      T t = c.call();
+-      fail("IOE expected but ["+t+"] was returned.");
+-    } catch (IOException ioe) {
+-      // ok, expected
+-    } 
+-  }
+-
+-  @Test
+-  public void testScopeSingleThread() throws Exception {
+-    final MetricsScope fooScope = Metrics.startScope(scopeName);
+-    // the time and number counters become available only after the 1st 
+-    // scope close:
+-    expectIOE(new Callable<Long>() {
+-      @Override
+-      public Long call() throws Exception {
+-        Long num = fooScope.getNumCounter();
+-        return num;
+-      }
+-    });
+-    expectIOE(new Callable<Long>() {
+-      @Override
+-      public Long call() throws Exception {
+-        Long time = fooScope.getTimeCounter();
+-        return time;
+-      }
+-    });
+-    // cannot open scope that is already open:
+-    expectIOE(new Callable<Void>() {
+-      @Override
+-      public Void call() throws Exception {
+-        fooScope.open();
+-        return null;
+-      }
+-    });
+-    
+-    assertSame(fooScope, Metrics.getScope(scopeName));
+-    Thread.sleep(periodMs+1);
+-    // 1st close:
+-    // closing of open scope should be ok:
+-    Metrics.endScope(scopeName); 
+-    expectIOE(new Callable<Void>() {
+-      @Override
+-      public Void call() throws Exception {
+-        Metrics.endScope(scopeName); // closing of closed scope not allowed
+-        return null;
+-      }
+-    });
+-    
+-    assertEquals(Long.valueOf(1), fooScope.getNumCounter());
+-    final long t1 = fooScope.getTimeCounter().longValue(); 
+-    assertTrue(t1 > periodMs);
+-    
+-    assertSame(fooScope, Metrics.getScope(scopeName));
+-    
+-   // opening allowed after closing:
+-    Metrics.startScope(scopeName);
+-    // opening of already open scope not allowed:
+-    expectIOE(new Callable<Void>() {
+-      @Override
+-      public Void call() throws Exception {
+-        Metrics.startScope(scopeName); 
+-        return null;
+-      }
+-    });
+-    
+-    assertEquals(Long.valueOf(1), fooScope.getNumCounter());
+-    assertEquals(t1, fooScope.getTimeCounter().longValue());
+-    
+-    assertSame(fooScope, Metrics.getScope(scopeName));
+-    Thread.sleep(periodMs + 1);
+-    // Reopening (close + open) allowed in opened state: 
+-    fooScope.reopen();
+-
+-    assertEquals(Long.valueOf(2), fooScope.getNumCounter());
+-    assertTrue(fooScope.getTimeCounter().longValue() > 2 * periodMs);
+-    
+-    Thread.sleep(periodMs + 1);
+-    // 3rd close:
+-    fooScope.close();
+-    
+-    assertEquals(Long.valueOf(3), fooScope.getNumCounter());
+-    assertTrue(fooScope.getTimeCounter().longValue() > 3 * periodMs);
+-    Double avgT = (Double)Metrics.get("foo.avg_t");
+-    assertTrue(avgT.doubleValue() > periodMs);
+-  }
+-  
+-  @Test
+-  public void testScopeConcurrency() throws Exception {
+-    MetricsScope fooScope = Metrics.startScope(scopeName);
+-    final int threads = 10;
+-    ExecutorService executorService = Executors.newFixedThreadPool(threads);
+-    for (int i=0; i<threads; i++) {
+-      final int n = i;
+-      executorService.submit(new Callable<Void>() {
+-        @Override
+-        public Void call() throws Exception {
+-          testScopeImpl(n);
+-          return null; 
+-        }
+-      });
+-    }
+-    executorService.shutdown();
+-    assertTrue(executorService.awaitTermination(periodMs * 3 * threads, TimeUnit.MILLISECONDS));
+-
+-    fooScope = Metrics.getScope(scopeName);
+-    assertEquals(Long.valueOf(3 * threads), fooScope.getNumCounter());
+-    assertTrue(fooScope.getTimeCounter().longValue() > 3 * periodMs * threads);
+-    Double avgT = (Double)Metrics.get("foo.avg_t");
+-    assertTrue(avgT.doubleValue() > periodMs);
+-    Metrics.endScope(scopeName);
+-  }
+-  
+-  void testScopeImpl(int n) throws Exception {
+-    final MetricsScope fooScope = Metrics.startScope(scopeName);
+-    // cannot open scope that is already open:
+-    expectIOE(new Callable<Void>() {
+-      @Override
+-      public Void call() throws Exception {
+-        fooScope.open();
+-        return null;
+-      }
+-    });
+-    
+-    assertSame(fooScope, Metrics.getScope(scopeName));
+-    Thread.sleep(periodMs+1);
+-    // 1st close:
+-    Metrics.endScope(scopeName); // closing of open scope should be ok.
+-    
+-    assertTrue(fooScope.getNumCounter().longValue() >= 1);
+-    final long t1 = fooScope.getTimeCounter().longValue(); 
+-    assertTrue(t1 > periodMs);
+-    
+-    expectIOE(new Callable<Void>() {
+-      @Override
+-      public Void call() throws Exception {
+-        Metrics.endScope(scopeName); // closing of closed scope not allowed
+-        return null;
+-      }
+-    });
+-    
+-    assertSame(fooScope, Metrics.getScope(scopeName));
+-    
+-   // opening allowed after closing:
+-    Metrics.startScope(scopeName);
+-    
+-    assertTrue(fooScope.getNumCounter().longValue() >= 1);
+-    assertTrue(fooScope.getTimeCounter().longValue() >= t1);
+-    
+-   // opening of already open scope not allowed:
+-    expectIOE(new Callable<Void>() {
+-      @Override
+-      public Void call() throws Exception {
+-        Metrics.startScope(scopeName); 
+-        return null;
+-      }
+-    });
+-    
+-    assertSame(fooScope, Metrics.getScope(scopeName));
+-    Thread.sleep(periodMs + 1);
+-    // Reopening (close + open) allowed in opened state: 
+-    fooScope.reopen();
+-
+-    assertTrue(fooScope.getNumCounter().longValue() >= 2);
+-    assertTrue(fooScope.getTimeCounter().longValue() > 2 * periodMs);
+-    
+-    Thread.sleep(periodMs + 1);
+-    // 3rd close:
+-    fooScope.close();
+-    
+-    assertTrue(fooScope.getNumCounter().longValue() >= 3);
+-    assertTrue(fooScope.getTimeCounter().longValue() > 3 * periodMs);
+-    Double avgT = (Double)Metrics.get("foo.avg_t");
+-    assertTrue(avgT.doubleValue() > periodMs);
+-  }
+-}
+diff --git a/common/src/test/org/apache/hadoop/hive/common/metrics/metrics2/TestCodahaleMetrics.java b/common/src/test/org/apache/hadoop/hive/common/metrics/metrics2/TestCodahaleMetrics.java
+new file mode 100644
+index 0000000..8749349
+--- /dev/null
++++ b/common/src/test/org/apache/hadoop/hive/common/metrics/metrics2/TestCodahaleMetrics.java
+@@ -0,0 +1,138 @@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.hadoop.hive.common.metrics.metrics2;
++
++import com.codahale.metrics.Counter;
++import com.codahale.metrics.MetricRegistry;
++import com.codahale.metrics.Timer;
++import com.fasterxml.jackson.databind.JsonNode;
++import com.fasterxml.jackson.databind.ObjectMapper;
++import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
++import org.apache.hadoop.hive.conf.HiveConf;
++import org.apache.hadoop.hive.shims.ShimLoader;
++import org.junit.After;
++import org.junit.Assert;
++import org.junit.Before;
++import org.junit.Test;
++
++import java.io.File;
++import java.nio.file.Files;
++import java.nio.file.Paths;
++import java.util.concurrent.Callable;
++import java.util.concurrent.ExecutorService;
++import java.util.concurrent.Executors;
++import java.util.concurrent.TimeUnit;
++
++import static org.junit.Assert.assertTrue;
++
++/**
++ * Unit test for new Metrics subsystem.
++ */
++public class TestCodahaleMetrics {
++
++  private static File workDir = new File(System.getProperty("test.tmp.dir"));
++  private static File jsonReportFile;
++  public static MetricRegistry metricRegistry;
++
++  @Before
++  public void before() throws Exception {
++    HiveConf conf = new HiveConf();
++
++    jsonReportFile = new File(workDir, "json_reporting");
++    jsonReportFile.delete();
++    String defaultFsName = ShimLoader.getHadoopShims().getHadoopConfNames().get("HADOOPFS");
++    conf.set(defaultFsName, "local");
++    conf.setVar(HiveConf.ConfVars.HIVE_METRICS_CLASS, CodahaleMetrics.class.getCanonicalName());
++    conf.setVar(HiveConf.ConfVars.HIVE_METRICS_REPORTER, MetricsReporting.JSON_FILE.name() + "," + MetricsReporting.JMX.name());
++    conf.setVar(HiveConf.ConfVars.HIVE_METRICS_JSON_FILE_LOCATION, jsonReportFile.toString());
++    conf.setVar(HiveConf.ConfVars.HIVE_METRICS_JSON_FILE_INTERVAL, "100ms");
++
++    MetricsFactory.init(conf);
++    metricRegistry = ((CodahaleMetrics) MetricsFactory.getMetricsInstance()).getMetricRegistry();
++  }
++
++  @After
++  public void after() throws Exception {
++    MetricsFactory.deInit();
++  }
++
++  @Test
++  public void testScope() throws Exception {
++    int runs = 5;
++    for (int i = 0; i < runs; i++) {
++      MetricsFactory.getMetricsInstance().startScope("method1");
++      MetricsFactory.getMetricsInstance().endScope("method1");
++    }
++
++    Timer timer = metricRegistry.getTimers().get("api_method1");
++    Assert.assertEquals(5, timer.getCount());
++    Assert.assertTrue(timer.getMeanRate() > 0);
++  }
++
++
++  @Test
++  public void testCount() throws Exception {
++    int runs = 5;
++    for (int i = 0; i < runs; i++) {
++      MetricsFactory.getMetricsInstance().incrementCounter("count1");
++    }
++    Counter counter = metricRegistry.getCounters().get("count1");
++    Assert.assertEquals(5L, counter.getCount());
++  }
++
++  @Test
++  public void testConcurrency() throws Exception {
++    int threads = 4;
++    ExecutorService executorService = Executors.newFixedThreadPool(threads);
++    for (int i=0; i< threads; i++) {
++      final int n = i;
++      executorService.submit(new Callable<Void>() {
++        @Override
++        public Void call() throws Exception {
++          MetricsFactory.getMetricsInstance().startScope("method2");
++          MetricsFactory.getMetricsInstance().endScope("method2");
++          return null;
++        }
++      });
++    }
++    executorService.shutdown();
++    assertTrue(executorService.awaitTermination(10000, TimeUnit.MILLISECONDS));
++    Timer timer = metricRegistry.getTimers().get("api_method2");
++    Assert.assertEquals(4, timer.getCount());
++    Assert.assertTrue(timer.getMeanRate() > 0);
++  }
++
++  @Test
++  public void testFileReporting() throws Exception {
++    int runs = 5;
++    for (int i = 0; i < runs; i++) {
++      MetricsFactory.getMetricsInstance().incrementCounter("count2");
++      Thread.sleep(100);
++    }
++
++    Thread.sleep(2000);
++    byte[] jsonData = Files.readAllBytes(Paths.get(jsonReportFile.getAbsolutePath()));
++    ObjectMapper objectMapper = new ObjectMapper();
++
++    JsonNode rootNode = objectMapper.readTree(jsonData);
++    JsonNode countersNode = rootNode.path("counters");
++    JsonNode methodCounterNode = countersNode.path("count2");
++    JsonNode countNode = methodCounterNode.path("count");
++    Assert.assertEquals(countNode.asInt(), 5);
++  }
++}
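
The testScope assertion works because the scope's Dropwizard Timer is registered under "api_" plus the scope name. The standalone sketch below (plain metrics-core, no Hive classes) shows the Timer calls a scope implementation like this would plausibly delegate to; it is an assumption about the delegation, not a copy of CodahaleMetrics internals.

    import com.codahale.metrics.MetricRegistry;
    import com.codahale.metrics.Timer;

    public class TimerSketch {
      public static void main(String[] args) {
        MetricRegistry registry = new MetricRegistry();
        // Same naming scheme the test looks up: "api_" + scope name.
        Timer timer = registry.timer("api_method1");

        Timer.Context ctx = timer.time(); // roughly what startScope("method1") would do
        // ... the timed work runs here ...
        ctx.stop();                       // roughly what endScope("method1") would do

        System.out.println(timer.getCount());    // 1 completed timing
        System.out.println(timer.getMeanRate()); // > 0 once a timing has completed
      }
    }
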
+diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMetrics.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMetrics.java
+new file mode 100644
+index 0000000..25f34d1
+--- /dev/null
++++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMetrics.java
+@@ -0,0 +1,94 @@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.hadoop.hive.metastore;
++
++import com.fasterxml.jackson.databind.JsonNode;
++import com.fasterxml.jackson.databind.ObjectMapper;
++import junit.framework.TestCase;
++import org.apache.hadoop.hive.cli.CliSessionState;
++import org.apache.hadoop.hive.common.metrics.metrics2.MetricsReporting;
++import org.apache.hadoop.hive.conf.HiveConf;
++import org.apache.hadoop.hive.ql.Driver;
++import org.apache.hadoop.hive.ql.session.SessionState;
++import org.apache.hadoop.hive.shims.ShimLoader;
++import org.junit.After;
++import org.junit.AfterClass;
++import org.junit.Assert;
++import org.junit.Before;
++import org.junit.BeforeClass;
++import org.junit.Test;
++
++import java.io.File;
++import java.io.IOException;
++import java.nio.file.Files;
++import java.nio.file.Paths;
++
++/**
++ * Tests Hive Metastore Metrics.
++ */
++public class TestMetaStoreMetrics {
++
++  private static File workDir = new File(System.getProperty("test.tmp.dir"));
++  private static File jsonReportFile;
++
++  private static HiveConf hiveConf;
++  private static Driver driver;
++
++
++  @Before
++  public void before() throws Exception {
++
++    int port = MetaStoreUtils.findFreePort();
++
++    jsonReportFile = new File(workDir, "json_reporting");
++    jsonReportFile.delete();
++
++    hiveConf = new HiveConf(TestMetaStoreMetrics.class);
++    hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:" + port);
++    hiveConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3);
++    hiveConf.setBoolVar(HiveConf.ConfVars.METASTORE_METRICS, true);
++    hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
++    hiveConf.setVar(HiveConf.ConfVars.HIVE_METRICS_REPORTER, MetricsReporting.JSON_FILE.name() + "," + MetricsReporting.JMX.name());
++    hiveConf.setVar(HiveConf.ConfVars.HIVE_METRICS_JSON_FILE_LOCATION, jsonReportFile.toString());
++    hiveConf.setVar(HiveConf.ConfVars.HIVE_METRICS_JSON_FILE_INTERVAL, "100ms");
++
++    MetaStoreUtils.startMetaStore(port, ShimLoader.getHadoopThriftAuthBridge(), hiveConf);
++
++    SessionState.start(new CliSessionState(hiveConf));
++    driver = new Driver(hiveConf);
++  }
++
++  @Test
++  public void testMetricsFile() throws Exception {
++    driver.run("show databases");
++
++    //give timer thread a chance to print the metrics
++    Thread.sleep(2000);
++
++    //As the file is being written, try a few times.
++    //This can be replaced by CodahaleMetrics's JsonServlet reporter once it is exposed.
++    byte[] jsonData = Files.readAllBytes(Paths.get(jsonReportFile.getAbsolutePath()));
++    ObjectMapper objectMapper = new ObjectMapper();
++
++    JsonNode rootNode = objectMapper.readTree(jsonData);
++    JsonNode countersNode = rootNode.path("timers");
++    JsonNode methodCounterNode = countersNode.path("api_get_all_databases");
++    JsonNode countNode = methodCounterNode.path("count");
++    Assert.assertTrue(countNode.asInt() > 0);
++  }
++}
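
The in-code comment above notes that the JSON file is rewritten on the reporter's own schedule, so a single read can race the writer. A retry loop like the hedged sketch below is one way to make such a check robust; the class and method names are illustrative, not part of the patch.

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;

    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;

    public class JsonReportPoller {
      /** Polls the reporter's JSON file until the named timer shows activity. */
      static int readTimerCount(Path report, String timerName) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        for (int attempt = 0; attempt < 10; attempt++) {
          if (Files.exists(report)) {
            JsonNode root = mapper.readTree(Files.readAllBytes(report));
            int count = root.path("timers").path(timerName).path("count").asInt();
            if (count > 0) {
              return count;
            }
          }
          Thread.sleep(200); // wait out at least one reporter interval (100ms above)
        }
        return 0; // never observed; the caller's assertion will fail
      }

      public static void main(String[] args) throws Exception {
        System.out.println(readTimerCount(Paths.get("json_reporting"), "api_get_all_databases"));
      }
    }
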
+diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+index d81c856..1688920 100644
+--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
++++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+@@ -18,39 +18,14 @@
+ 
+ package org.apache.hadoop.hive.metastore;
+ 
+-import static org.apache.commons.lang.StringUtils.join;
+-import static org.apache.hadoop.hive.metastore.MetaStoreUtils.DEFAULT_DATABASE_COMMENT;
+-import static org.apache.hadoop.hive.metastore.MetaStoreUtils.DEFAULT_DATABASE_NAME;
+-import static org.apache.hadoop.hive.metastore.MetaStoreUtils.validateName;
+-
+-import java.io.IOException;
+-import java.text.DateFormat;
+-import java.text.SimpleDateFormat;
+-import java.util.AbstractMap;
+-import java.util.ArrayList;
+-import java.util.Arrays;
+-import java.util.Collections;
+-import java.util.Formatter;
+-import java.util.HashMap;
+-import java.util.HashSet;
+-import java.util.Iterator;
+-import java.util.LinkedHashMap;
+-import java.util.LinkedList;
+-import java.util.List;
+-import java.util.Map;
+-import java.util.Map.Entry;
+-import java.util.Properties;
+-import java.util.Set;
+-import java.util.Timer;
+-import java.util.concurrent.TimeUnit;
+-import java.util.concurrent.atomic.AtomicBoolean;
+-import java.util.concurrent.locks.Condition;
+-import java.util.concurrent.locks.Lock;
+-import java.util.concurrent.locks.ReentrantLock;
+-import java.util.regex.Pattern;
+-
+-import javax.jdo.JDOException;
+-
++import com.facebook.fb303.FacebookBase;
++import com.facebook.fb303.fb_status;
++import com.google.common.annotations.VisibleForTesting;
++import com.google.common.base.Splitter;
++import com.google.common.collect.ImmutableList;
++import com.google.common.collect.ImmutableListMultimap;
++import com.google.common.collect.Lists;
++import com.google.common.collect.Multimaps;
+ import org.apache.commons.cli.OptionBuilder;
+ import org.apache.commons.logging.Log;
+ import org.apache.commons.logging.LogFactory;
+@@ -58,12 +33,13 @@
+ import org.apache.hadoop.fs.FileSystem;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.hive.common.FileUtils;
++import org.apache.hadoop.hive.common.JvmPauseMonitor;
+ import org.apache.hadoop.hive.common.LogUtils;
+ import org.apache.hadoop.hive.common.LogUtils.LogInitializationException;
+ import org.apache.hadoop.hive.common.classification.InterfaceAudience;
+ import org.apache.hadoop.hive.common.classification.InterfaceStability;
+ import org.apache.hadoop.hive.common.cli.CommonCliOptions;
+-import org.apache.hadoop.hive.common.metrics.Metrics;
++import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
+ import org.apache.hadoop.hive.conf.HiveConf;
+ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+ import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
+@@ -221,14 +197,35 @@
+ import org.apache.thrift.transport.TTransport;
+ import org.apache.thrift.transport.TTransportFactory;
+ 
+-import com.facebook.fb303.FacebookBase;
+-import com.facebook.fb303.fb_status;
+-import com.google.common.annotations.VisibleForTesting;
+-import com.google.common.base.Splitter;
+-import com.google.common.collect.ImmutableList;
+-import com.google.common.collect.ImmutableListMultimap;
+-import com.google.common.collect.Lists;
+-import com.google.common.collect.Multimaps;
++import javax.jdo.JDOException;
++import java.io.IOException;
++import java.text.DateFormat;
++import java.text.SimpleDateFormat;
++import java.util.AbstractMap;
++import java.util.ArrayList;
++import java.util.Arrays;
++import java.util.Collections;
++import java.util.Formatter;
++import java.util.HashMap;
++import java.util.HashSet;
++import java.util.Iterator;
++import java.util.LinkedHashMap;
++import java.util.LinkedList;
++import java.util.List;
++import java.util.Map;
++import java.util.Map.Entry;
++import java.util.Properties;
++import java.util.Set;
++import java.util.Timer;
++import java.util.concurrent.TimeUnit;
++import java.util.concurrent.atomic.AtomicBoolean;
++import java.util.concurrent.locks.Condition;
++import java.util.concurrent.locks.Lock;
++import java.util.concurrent.locks.ReentrantLock;
++import java.util.regex.Pattern;
++
++import static org.apache.commons.lang.StringUtils.join;
++import static org.apache.hadoop.hive.metastore.MetaStoreUtils.*;
+ 
+ /**
+  * TODO:pc remove application logic to a separate interface.
+@@ -464,9 +461,10 @@ public void init() throws MetaException {
+         }
+       }
+ 
+-      if (hiveConf.getBoolean("hive.metastore.metrics.enabled", false)) {
++      //Start Metrics for Embedded mode
++      if (hiveConf.getBoolVar(ConfVars.METASTORE_METRICS)) {
+         try {
+-          Metrics.init();
++          MetricsFactory.init(hiveConf);
+         } catch (Exception e) {
+           // log exception, but ignore inability to start
+           LOG.error("error in Metrics init: " + e.getClass().getName() + " "
+@@ -750,11 +748,13 @@ private String startFunction(String function, String extraLogInfo) {
+       incrementCounter(function);
+      logInfo((getIpAddress() == null ? "" : "source:" + getIpAddress() + " ") +
+           function + extraLogInfo);
+-      try {
+-        Metrics.startScope(function);
+-      } catch (IOException e) {
+-        LOG.debug("Exception when starting metrics scope"
++      if (hiveConf.getBoolVar(ConfVars.METASTORE_METRICS)) {
++        try {
++          MetricsFactory.getMetricsInstance().startScope(function);
++        } catch (IOException e) {
++          LOG.debug("Exception when starting metrics scope"
+             + e.getClass().getName() + " " + e.getMessage(), e);
++        }
+       }
+       return function;
+     }
+@@ -792,10 +792,12 @@ private void endFunction(String function, boolean successful, Exception e,
+     }
+ 
+     private void endFunction(String function, MetaStoreEndFunctionContext context) {
+-      try {
+-        Metrics.endScope(function);
+-      } catch (IOException e) {
+-        LOG.debug("Exception when closing metrics scope" + e);
++      if (hiveConf.getBoolVar(ConfVars.METASTORE_METRICS)) {
++        try {
++          MetricsFactory.getMetricsInstance().endScope(function);
++        } catch (IOException e) {
++          LOG.debug("Exception when closing metrics scope" + e);
++        }
+       }
+ 
+       for (MetaStoreEndFunctionListener listener : endFunctionListeners) {
+@@ -819,6 +821,14 @@ public void shutdown() {
+           threadLocalMS.remove();
+         }
+       }
++      if (hiveConf.getBoolVar(ConfVars.METASTORE_METRICS)) {
++        try {
++          MetricsFactory.deInit();
++        } catch (Exception e) {
++          LOG.error("error in Metrics deinit: " + e.getClass().getName() + " "
++            + e.getMessage(), e);
++        }
++      }
+       logInfo("Metastore shutdown complete.");
+     }
+ 
+@@ -5901,6 +5911,16 @@ public void run() {
+         }
+       });
+ 
++      //Start Metrics for Standalone (Remote) Mode
++      if (conf.getBoolVar(ConfVars.METASTORE_METRICS)) {
++        try {
++          MetricsFactory.init(conf);
++        } catch (Exception e) {
++          // log exception, but ignore inability to start
++          LOG.error("error in Metrics init: " + e.getClass().getName() + " "
++            + e.getMessage(), e);
++        }
++      }
+ 
+       Lock startLock = new ReentrantLock();
+       Condition startCondition = startLock.newCondition();
+@@ -6091,7 +6111,13 @@ public void run() {
+         // Wrap the start of the threads in a catch Throwable loop so that any failures
+         // don't doom the rest of the metastore.
+         startLock.lock();
+-        ShimLoader.getHadoopShims().startPauseMonitor(conf);
++        try {
++          JvmPauseMonitor pauseMonitor = new JvmPauseMonitor(conf);
++          pauseMonitor.start();
++        } catch (Throwable t) {
++          LOG.warn("Could not initiate the JvmPauseMonitor thread." + " GCs 
and Pauses may not be " +
++            "warned upon.", t);
++        }
+ 
+         try {
+           // Per the javadocs on Condition, do not depend on the condition alone as a start gate
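
The hunk above replaces the shim-based pause monitor with a direct JvmPauseMonitor, wrapped so a monitoring failure can never take down the metastore itself. Below is a minimal sketch of the same start-and-shield pattern, assuming only the constructor shown in the patch (it accepts a Hadoop Configuration; HiveConf extends it):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.common.JvmPauseMonitor;

    public class PauseMonitorStartup {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        try {
          // Constructor and start() exactly as used in the hunk above.
          JvmPauseMonitor pauseMonitor = new JvmPauseMonitor(conf);
          pauseMonitor.start();
        } catch (Throwable t) {
          // Monitoring is best-effort: log and continue rather than abort startup.
          System.err.println("Could not start JvmPauseMonitor: " + t);
        }
        // ... the service continues its own startup here ...
      }
    }
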
+diff --git a/pom.xml b/pom.xml
+index b21d894..35133f2 100644
+--- a/pom.xml
++++ b/pom.xml
+@@ -116,6 +116,7 @@
+     <commons-pool.version>1.5.4</commons-pool.version>
+     <commons-dbcp.version>1.4</commons-dbcp.version>
+     <derby.version>10.11.1.1</derby.version>
++    <dropwizard.version>3.1.0</dropwizard.version>
+     <guava.version>14.0.1</guava.version>
+     <groovy.version>2.1.6</groovy.version>
+     <hadoop-20S.version>1.2.1</hadoop-20S.version>
+@@ -128,6 +129,8 @@
+     <httpcomponents.core.version>4.4</httpcomponents.core.version>
+     <ivy.version>2.4.0</ivy.version>
+     <jackson.version>1.9.2</jackson.version>
++    <!-- jackson 1 and 2 lines can coexist without issue, as they have 
different artifactIds -->
++    <jackson.new.version>2.4.2</jackson.new.version>
+     <javaewah.version>0.3.2</javaewah.version>
+     <javolution.version>5.5.1</javolution.version>
+     <jdo-api.version>3.0.1</jdo-api.version>
+diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java
+index 58e8e49..7820ed5 100644
+--- a/service/src/java/org/apache/hive/service/server/HiveServer2.java
++++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java
+@@ -42,14 +42,15 @@
+ import org.apache.curator.framework.api.CuratorEventType;
+ import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode;
+ import org.apache.curator.retry.ExponentialBackoffRetry;
++import org.apache.hadoop.hive.common.JvmPauseMonitor;
+ import org.apache.hadoop.hive.common.LogUtils;
+ import org.apache.hadoop.hive.common.LogUtils.LogInitializationException;
++import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
+ import org.apache.hadoop.hive.conf.HiveConf;
+ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+ import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl;
+ import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager;
+ import org.apache.hadoop.hive.ql.util.ZooKeeperHiveHelper;
+-import org.apache.hadoop.hive.shims.ShimLoader;
+ import org.apache.hadoop.hive.shims.Util

<TRUNCATED>
