http://git-wip-us.apache.org/repos/asf/hive/blob/bbb312f3/testutils/ptest2/src/test/resources/HIVE-10761.6.patch ---------------------------------------------------------------------- diff --git a/testutils/ptest2/src/test/resources/HIVE-10761.6.patch b/testutils/ptest2/src/test/resources/HIVE-10761.6.patch new file mode 100644 index 0000000..5b41850 --- /dev/null +++ b/testutils/ptest2/src/test/resources/HIVE-10761.6.patch @@ -0,0 +1,2539 @@ +diff --git a/common/pom.xml b/common/pom.xml +index a615c1e..8d4b1ea 100644 +--- a/common/pom.xml ++++ b/common/pom.xml +@@ -98,6 +98,26 @@ + <artifactId>json</artifactId> + <version>${json.version}</version> + </dependency> ++ <dependency> ++ <groupId>io.dropwizard.metrics</groupId> ++ <artifactId>metrics-core</artifactId> ++ <version>${dropwizard.version}</version> ++ </dependency> ++ <dependency> ++ <groupId>io.dropwizard.metrics</groupId> ++ <artifactId>metrics-jvm</artifactId> ++ <version>${dropwizard.version}</version> ++ </dependency> ++ <dependency> ++ <groupId>io.dropwizard.metrics</groupId> ++ <artifactId>metrics-json</artifactId> ++ <version>${dropwizard.version}</version> ++ </dependency> ++ <dependency> ++ <groupId>com.fasterxml.jackson.core</groupId> ++ <artifactId>jackson-databind</artifactId> ++ <version>${jackson.new.version}</version> ++ </dependency> + </dependencies> + + <profiles> +diff --git a/common/src/java/org/apache/hadoop/hive/common/JvmPauseMonitor.java b/common/src/java/org/apache/hadoop/hive/common/JvmPauseMonitor.java +new file mode 100644 +index 0000000..c3949f2 +--- /dev/null ++++ b/common/src/java/org/apache/hadoop/hive/common/JvmPauseMonitor.java +@@ -0,0 +1,225 @@ ++/** ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++package org.apache.hadoop.hive.common; ++ ++import com.google.common.base.Joiner; ++import com.google.common.base.Preconditions; ++import com.google.common.base.Stopwatch; ++import com.google.common.collect.Lists; ++import com.google.common.collect.Maps; ++import com.google.common.collect.Sets; ++import org.apache.commons.logging.Log; ++import org.apache.commons.logging.LogFactory; ++import org.apache.hadoop.conf.Configuration; ++import org.apache.hadoop.hive.common.metrics.common.MetricsFactory; ++import org.apache.hadoop.util.Daemon; ++ ++import java.lang.management.GarbageCollectorMXBean; ++import java.lang.management.ManagementFactory; ++import java.util.List; ++import java.util.Map; ++import java.util.Set; ++ ++/** ++ * Based on the JvmPauseMonitor from Hadoop. ++ */ ++public class JvmPauseMonitor { ++ private static final Log LOG = LogFactory.getLog( ++ JvmPauseMonitor.class); ++ ++ /** The target sleep time */ ++ private static final long SLEEP_INTERVAL_MS = 500; ++ ++ /** log WARN if we detect a pause longer than this threshold */ ++ private final long warnThresholdMs; ++ private static final String WARN_THRESHOLD_KEY = ++ "jvm.pause.warn-threshold.ms"; ++ private static final long WARN_THRESHOLD_DEFAULT = 10000; ++ ++ /** log INFO if we detect a pause longer than this threshold */ ++ private final long infoThresholdMs; ++ private static final String INFO_THRESHOLD_KEY = ++ "jvm.pause.info-threshold.ms"; ++ private static final long INFO_THRESHOLD_DEFAULT = 1000; ++ ++ private long numGcWarnThresholdExceeded = 0; ++ private long numGcInfoThresholdExceeded = 0; ++ private long totalGcExtraSleepTime = 0; ++ ++ private Thread monitorThread; ++ private volatile boolean shouldRun = true; ++ ++ public JvmPauseMonitor(Configuration conf) { ++ this.warnThresholdMs = conf.getLong(WARN_THRESHOLD_KEY, WARN_THRESHOLD_DEFAULT); ++ this.infoThresholdMs = conf.getLong(INFO_THRESHOLD_KEY, INFO_THRESHOLD_DEFAULT); ++ } ++ ++ public void start() { ++ Preconditions.checkState(monitorThread == null, ++ "JvmPauseMonitor thread is Already started"); ++ monitorThread = new Daemon(new Monitor()); ++ monitorThread.start(); ++ } ++ ++ public void stop() { ++ shouldRun = false; ++ if (isStarted()) { ++ monitorThread.interrupt(); ++ try { ++ monitorThread.join(); ++ } catch (InterruptedException e) { ++ Thread.currentThread().interrupt(); ++ } ++ } ++ } ++ ++ public boolean isStarted() { ++ return monitorThread != null; ++ } ++ ++ public long getNumGcWarnThreadholdExceeded() { ++ return numGcWarnThresholdExceeded; ++ } ++ ++ public long getNumGcInfoThresholdExceeded() { ++ return numGcInfoThresholdExceeded; ++ } ++ ++ public long getTotalGcExtraSleepTime() { ++ return totalGcExtraSleepTime; ++ } ++ ++ private String formatMessage(long extraSleepTime, ++ Map<String, GcTimes> gcTimesAfterSleep, ++ Map<String, GcTimes> gcTimesBeforeSleep) { ++ ++ Set<String> gcBeanNames = Sets.intersection( ++ gcTimesAfterSleep.keySet(), ++ gcTimesBeforeSleep.keySet()); ++ List<String> gcDiffs = Lists.newArrayList(); ++ for (String name : gcBeanNames) { ++ GcTimes diff = gcTimesAfterSleep.get(name).subtract( ++ gcTimesBeforeSleep.get(name)); ++ if (diff.gcCount != 0) { ++ gcDiffs.add("GC pool '" + name + "' had collection(s): " + ++ diff.toString()); ++ } ++ } ++ ++ String ret = "Detected pause in JVM or host machine (eg GC): " + ++ "pause of approximately " + extraSleepTime + "ms\n"; ++ if (gcDiffs.isEmpty()) { ++ ret += "No GCs detected"; ++ } else { ++ ret += Joiner.on("\n").join(gcDiffs); ++ } ++ return ret; ++ } ++ ++ private Map<String, GcTimes> getGcTimes() { ++ Map<String, GcTimes> map = Maps.newHashMap(); ++ List<GarbageCollectorMXBean> gcBeans = ++ ManagementFactory.getGarbageCollectorMXBeans(); ++ for (GarbageCollectorMXBean gcBean : gcBeans) { ++ map.put(gcBean.getName(), new GcTimes(gcBean)); ++ } ++ return map; ++ } ++ ++ private static class GcTimes { ++ private GcTimes(GarbageCollectorMXBean gcBean) { ++ gcCount = gcBean.getCollectionCount(); ++ gcTimeMillis = gcBean.getCollectionTime(); ++ } ++ ++ private GcTimes(long count, long time) { ++ this.gcCount = count; ++ this.gcTimeMillis = time; ++ } ++ ++ private GcTimes subtract(GcTimes other) { ++ return new GcTimes(this.gcCount - other.gcCount, ++ this.gcTimeMillis - other.gcTimeMillis); ++ } ++ ++ @Override ++ public String toString() { ++ return "count=" + gcCount + " time=" + gcTimeMillis + "ms"; ++ } ++ ++ private long gcCount; ++ private long gcTimeMillis; ++ } ++ ++ private class Monitor implements Runnable { ++ @Override ++ public void run() { ++ Stopwatch sw = new Stopwatch(); ++ Map<String, GcTimes> gcTimesBeforeSleep = getGcTimes(); ++ while (shouldRun) { ++ sw.reset().start(); ++ try { ++ Thread.sleep(SLEEP_INTERVAL_MS); ++ } catch (InterruptedException ie) { ++ return; ++ } ++ long extraSleepTime = sw.elapsedMillis() - SLEEP_INTERVAL_MS; ++ Map<String, GcTimes> gcTimesAfterSleep = getGcTimes(); ++ ++ if (extraSleepTime > warnThresholdMs) { ++ ++numGcWarnThresholdExceeded; ++ LOG.warn(formatMessage( ++ extraSleepTime, gcTimesAfterSleep, gcTimesBeforeSleep)); ++ incrementMetricsCounter("jvm.pause.warn-threshold", 1); ++ } else if (extraSleepTime > infoThresholdMs) { ++ ++numGcInfoThresholdExceeded; ++ LOG.info(formatMessage( ++ extraSleepTime, gcTimesAfterSleep, gcTimesBeforeSleep)); ++ incrementMetricsCounter("jvm.pause.info-threshold", 1); ++ } ++ incrementMetricsCounter("jvm.pause.extraSleepTime", extraSleepTime); ++ totalGcExtraSleepTime += extraSleepTime; ++ gcTimesBeforeSleep = gcTimesAfterSleep; ++ } ++ } ++ ++ private void incrementMetricsCounter(String name, long count) { ++ try { ++ MetricsFactory.getMetricsInstance().incrementCounter(name, count); ++ } catch (Exception e) { ++ LOG.warn("Error Reporting JvmPauseMonitor to Metrics system", e); ++ } ++ } ++ } ++ ++ /** ++ * Simple 'main' to facilitate manual testing of the pause monitor. ++ * ++ * This main function just leaks memory into a list. Running this class ++ * with a 1GB heap will very quickly go into "GC hell" and result in ++ * log messages about the GC pauses. ++ */ ++ public static void main(String []args) throws Exception { ++ new JvmPauseMonitor(new Configuration()).start(); ++ List<String> list = Lists.newArrayList(); ++ int i = 0; ++ while (true) { ++ list.add(String.valueOf(i++)); ++ } ++ } ++} +diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/LegacyMetrics.java b/common/src/java/org/apache/hadoop/hive/common/metrics/LegacyMetrics.java +new file mode 100644 +index 0000000..14f7afb +--- /dev/null ++++ b/common/src/java/org/apache/hadoop/hive/common/metrics/LegacyMetrics.java +@@ -0,0 +1,262 @@ ++/** ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++package org.apache.hadoop.hive.common.metrics; ++ ++import org.apache.hadoop.hive.common.metrics.common.Metrics; ++import org.apache.hadoop.hive.conf.HiveConf; ++ ++import java.io.IOException; ++import java.lang.management.ManagementFactory; ++import java.util.HashMap; ++ ++import javax.management.MBeanServer; ++import javax.management.MalformedObjectNameException; ++import javax.management.ObjectName; ++ ++/** ++ * This class may eventually get superseded by org.apache.hadoop.hive.common.metrics2.Metrics. ++ * ++ * Metrics Subsystem - allows exposure of a number of named parameters/counters ++ * via jmx, intended to be used as a static subsystem ++ * ++ * Has a couple of primary ways it can be used: ++ * (i) Using the set and get methods to set and get named parameters ++ * (ii) Using the incrementCounter method to increment and set named ++ * parameters in one go, rather than having to make a get and then a set. ++ * (iii) Using the startScope and endScope methods to start and end ++ * named "scopes" that record the number of times they've been ++ * instantiated and amount of time(in milliseconds) spent inside ++ * the scopes. ++ */ ++public class LegacyMetrics implements Metrics { ++ ++ private LegacyMetrics() { ++ // block ++ } ++ ++ /** ++ * MetricsScope : A class that encapsulates an idea of a metered scope. ++ * Instantiating a named scope and then closing it exposes two counters: ++ * (i) a "number of calls" counter ( <name>.n ), and ++ * (ii) a "number of msecs spent between scope open and close" counter. ( <name>.t) ++ */ ++ public static class MetricsScope { ++ ++ final LegacyMetrics metrics; ++ ++ final String name; ++ final String numCounter; ++ final String timeCounter; ++ final String avgTimeCounter; ++ ++ private boolean isOpen = false; ++ private Long startTime = null; ++ ++ /** ++ * Instantiates a named scope - intended to only be called by Metrics, so locally scoped. ++ * @param name - name of the variable ++ * @throws IOException ++ */ ++ private MetricsScope(String name, LegacyMetrics metrics) throws IOException { ++ this.metrics = metrics; ++ this.name = name; ++ this.numCounter = name + ".n"; ++ this.timeCounter = name + ".t"; ++ this.avgTimeCounter = name + ".avg_t"; ++ open(); ++ } ++ ++ public Long getNumCounter() throws IOException { ++ return (Long) metrics.get(numCounter); ++ } ++ ++ public Long getTimeCounter() throws IOException { ++ return (Long) metrics.get(timeCounter); ++ } ++ ++ /** ++ * Opens scope, and makes note of the time started, increments run counter ++ * @throws IOException ++ * ++ */ ++ public void open() throws IOException { ++ if (!isOpen) { ++ isOpen = true; ++ startTime = System.currentTimeMillis(); ++ } else { ++ throw new IOException("Scope named " + name + " is not closed, cannot be opened."); ++ } ++ } ++ ++ /** ++ * Closes scope, and records the time taken ++ * @throws IOException ++ */ ++ public void close() throws IOException { ++ if (isOpen) { ++ Long endTime = System.currentTimeMillis(); ++ synchronized(metrics) { ++ Long num = metrics.incrementCounter(numCounter); ++ Long time = metrics.incrementCounter(timeCounter, endTime - startTime); ++ if (num != null && time != null) { ++ metrics.set(avgTimeCounter, Double.valueOf(time.doubleValue() / num.doubleValue())); ++ } ++ } ++ } else { ++ throw new IOException("Scope named " + name + " is not open, cannot be closed."); ++ } ++ isOpen = false; ++ } ++ ++ ++ /** ++ * Closes scope if open, and reopens it ++ * @throws IOException ++ */ ++ public void reopen() throws IOException { ++ if(isOpen) { ++ close(); ++ } ++ open(); ++ } ++ ++ } ++ ++ private static final MetricsMBean metrics = new MetricsMBeanImpl(); ++ ++ private static final ObjectName oname; ++ static { ++ try { ++ oname = new ObjectName( ++ "org.apache.hadoop.hive.common.metrics:type=MetricsMBean"); ++ } catch (MalformedObjectNameException mone) { ++ throw new RuntimeException(mone); ++ } ++ } ++ ++ ++ private static final ThreadLocal<HashMap<String, MetricsScope>> threadLocalScopes ++ = new ThreadLocal<HashMap<String,MetricsScope>>() { ++ @Override ++ protected HashMap<String,MetricsScope> initialValue() { ++ return new HashMap<String,MetricsScope>(); ++ } ++ }; ++ ++ private boolean initialized = false; ++ ++ public void init(HiveConf conf) throws Exception { ++ if (!initialized) { ++ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); ++ mbs.registerMBean(metrics, oname); ++ initialized = true; ++ } ++ } ++ ++ public boolean isInitialized() { ++ return initialized; ++ } ++ ++ public Long incrementCounter(String name) throws IOException{ ++ if (!initialized) { ++ return null; ++ } ++ return incrementCounter(name,Long.valueOf(1)); ++ } ++ ++ public Long incrementCounter(String name, long increment) throws IOException{ ++ if (!initialized) { ++ return null; ++ } ++ Long value; ++ synchronized(metrics) { ++ if (!metrics.hasKey(name)) { ++ value = Long.valueOf(increment); ++ set(name, value); ++ } else { ++ value = ((Long)get(name)) + increment; ++ set(name, value); ++ } ++ } ++ return value; ++ } ++ ++ public void set(String name, Object value) throws IOException{ ++ if (!initialized) { ++ return; ++ } ++ metrics.put(name,value); ++ } ++ ++ public Object get(String name) throws IOException{ ++ if (!initialized) { ++ return null; ++ } ++ return metrics.get(name); ++ } ++ ++ public void startScope(String name) throws IOException{ ++ if (!initialized) { ++ return; ++ } ++ if (threadLocalScopes.get().containsKey(name)) { ++ threadLocalScopes.get().get(name).open(); ++ } else { ++ threadLocalScopes.get().put(name, new MetricsScope(name, this)); ++ } ++ } ++ ++ public MetricsScope getScope(String name) throws IOException { ++ if (!initialized) { ++ return null; ++ } ++ if (threadLocalScopes.get().containsKey(name)) { ++ return threadLocalScopes.get().get(name); ++ } else { ++ throw new IOException("No metrics scope named " + name); ++ } ++ } ++ ++ public void endScope(String name) throws IOException{ ++ if (!initialized) { ++ return; ++ } ++ if (threadLocalScopes.get().containsKey(name)) { ++ threadLocalScopes.get().get(name).close(); ++ } ++ } ++ ++ /** ++ * Resets the static context state to initial. ++ * Used primarily for testing purposes. ++ * ++ * Note that threadLocalScopes ThreadLocal is *not* cleared in this call. ++ */ ++ public void deInit() throws Exception { ++ synchronized (metrics) { ++ if (initialized) { ++ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); ++ if (mbs.isRegistered(oname)) { ++ mbs.unregisterMBean(oname); ++ } ++ metrics.clear(); ++ initialized = false; ++ } ++ } ++ } ++} +diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/Metrics.java b/common/src/java/org/apache/hadoop/hive/common/metrics/Metrics.java +deleted file mode 100644 +index 01c9d1d..0000000 +--- a/common/src/java/org/apache/hadoop/hive/common/metrics/Metrics.java ++++ /dev/null +@@ -1,253 +0,0 @@ +-/** +- * Licensed to the Apache Software Foundation (ASF) under one +- * or more contributor license agreements. See the NOTICE file +- * distributed with this work for additional information +- * regarding copyright ownership. The ASF licenses this file +- * to you under the Apache License, Version 2.0 (the +- * "License"); you may not use this file except in compliance +- * with the License. You may obtain a copy of the License at +- * +- * http://www.apache.org/licenses/LICENSE-2.0 +- * +- * Unless required by applicable law or agreed to in writing, software +- * distributed under the License is distributed on an "AS IS" BASIS, +- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +- * See the License for the specific language governing permissions and +- * limitations under the License. +- */ +-package org.apache.hadoop.hive.common.metrics; +- +-import java.io.IOException; +-import java.lang.management.ManagementFactory; +-import java.util.HashMap; +- +-import javax.management.MBeanServer; +-import javax.management.MalformedObjectNameException; +-import javax.management.ObjectName; +- +-/** +- * Metrics Subsystem - allows exposure of a number of named parameters/counters +- * via jmx, intended to be used as a static subsystem +- * +- * Has a couple of primary ways it can be used: +- * (i) Using the set and get methods to set and get named parameters +- * (ii) Using the incrementCounter method to increment and set named +- * parameters in one go, rather than having to make a get and then a set. +- * (iii) Using the startScope and endScope methods to start and end +- * named "scopes" that record the number of times they've been +- * instantiated and amount of time(in milliseconds) spent inside +- * the scopes. +- */ +-public class Metrics { +- +- private Metrics() { +- // block +- } +- +- /** +- * MetricsScope : A class that encapsulates an idea of a metered scope. +- * Instantiating a named scope and then closing it exposes two counters: +- * (i) a "number of calls" counter ( <name>.n ), and +- * (ii) a "number of msecs spent between scope open and close" counter. ( <name>.t) +- */ +- public static class MetricsScope { +- +- final String name; +- final String numCounter; +- final String timeCounter; +- final String avgTimeCounter; +- +- private boolean isOpen = false; +- private Long startTime = null; +- +- /** +- * Instantiates a named scope - intended to only be called by Metrics, so locally scoped. +- * @param name - name of the variable +- * @throws IOException +- */ +- private MetricsScope(String name) throws IOException { +- this.name = name; +- this.numCounter = name + ".n"; +- this.timeCounter = name + ".t"; +- this.avgTimeCounter = name + ".avg_t"; +- open(); +- } +- +- public Long getNumCounter() throws IOException { +- return (Long)Metrics.get(numCounter); +- } +- +- public Long getTimeCounter() throws IOException { +- return (Long)Metrics.get(timeCounter); +- } +- +- /** +- * Opens scope, and makes note of the time started, increments run counter +- * @throws IOException +- * +- */ +- public void open() throws IOException { +- if (!isOpen) { +- isOpen = true; +- startTime = System.currentTimeMillis(); +- } else { +- throw new IOException("Scope named " + name + " is not closed, cannot be opened."); +- } +- } +- +- /** +- * Closes scope, and records the time taken +- * @throws IOException +- */ +- public void close() throws IOException { +- if (isOpen) { +- Long endTime = System.currentTimeMillis(); +- synchronized(metrics) { +- Long num = Metrics.incrementCounter(numCounter); +- Long time = Metrics.incrementCounter(timeCounter, endTime - startTime); +- if (num != null && time != null) { +- Metrics.set(avgTimeCounter, Double.valueOf(time.doubleValue() / num.doubleValue())); +- } +- } +- } else { +- throw new IOException("Scope named " + name + " is not open, cannot be closed."); +- } +- isOpen = false; +- } +- +- +- /** +- * Closes scope if open, and reopens it +- * @throws IOException +- */ +- public void reopen() throws IOException { +- if(isOpen) { +- close(); +- } +- open(); +- } +- +- } +- +- private static final MetricsMBean metrics = new MetricsMBeanImpl(); +- +- private static final ObjectName oname; +- static { +- try { +- oname = new ObjectName( +- "org.apache.hadoop.hive.common.metrics:type=MetricsMBean"); +- } catch (MalformedObjectNameException mone) { +- throw new RuntimeException(mone); +- } +- } +- +- +- private static final ThreadLocal<HashMap<String, MetricsScope>> threadLocalScopes +- = new ThreadLocal<HashMap<String,MetricsScope>>() { +- @Override +- protected HashMap<String,MetricsScope> initialValue() { +- return new HashMap<String,MetricsScope>(); +- } +- }; +- +- private static boolean initialized = false; +- +- public static void init() throws Exception { +- synchronized (metrics) { +- if (!initialized) { +- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); +- mbs.registerMBean(metrics, oname); +- initialized = true; +- } +- } +- } +- +- public static Long incrementCounter(String name) throws IOException{ +- if (!initialized) { +- return null; +- } +- return incrementCounter(name,Long.valueOf(1)); +- } +- +- public static Long incrementCounter(String name, long increment) throws IOException{ +- if (!initialized) { +- return null; +- } +- Long value; +- synchronized(metrics) { +- if (!metrics.hasKey(name)) { +- value = Long.valueOf(increment); +- set(name, value); +- } else { +- value = ((Long)get(name)) + increment; +- set(name, value); +- } +- } +- return value; +- } +- +- public static void set(String name, Object value) throws IOException{ +- if (!initialized) { +- return; +- } +- metrics.put(name,value); +- } +- +- public static Object get(String name) throws IOException{ +- if (!initialized) { +- return null; +- } +- return metrics.get(name); +- } +- +- public static MetricsScope startScope(String name) throws IOException{ +- if (!initialized) { +- return null; +- } +- if (threadLocalScopes.get().containsKey(name)) { +- threadLocalScopes.get().get(name).open(); +- } else { +- threadLocalScopes.get().put(name, new MetricsScope(name)); +- } +- return threadLocalScopes.get().get(name); +- } +- +- public static MetricsScope getScope(String name) throws IOException { +- if (!initialized) { +- return null; +- } +- if (threadLocalScopes.get().containsKey(name)) { +- return threadLocalScopes.get().get(name); +- } else { +- throw new IOException("No metrics scope named " + name); +- } +- } +- +- public static void endScope(String name) throws IOException{ +- if (!initialized) { +- return; +- } +- if (threadLocalScopes.get().containsKey(name)) { +- threadLocalScopes.get().get(name).close(); +- } +- } +- +- /** +- * Resets the static context state to initial. +- * Used primarily for testing purposes. +- * +- * Note that threadLocalScopes ThreadLocal is *not* cleared in this call. +- */ +- static void uninit() throws Exception { +- synchronized (metrics) { +- if (initialized) { +- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); +- if (mbs.isRegistered(oname)) { +- mbs.unregisterMBean(oname); +- } +- metrics.clear(); +- initialized = false; +- } +- } +- } +-} +diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/common/Metrics.java b/common/src/java/org/apache/hadoop/hive/common/metrics/common/Metrics.java +new file mode 100644 +index 0000000..13a5336 +--- /dev/null ++++ b/common/src/java/org/apache/hadoop/hive/common/metrics/common/Metrics.java +@@ -0,0 +1,68 @@ ++/** ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++package org.apache.hadoop.hive.common.metrics.common; ++ ++import java.io.IOException; ++ ++import org.apache.hadoop.hive.conf.HiveConf; ++ ++import java.io.IOException; ++ ++/** ++ * Generic Metics interface. ++ */ ++public interface Metrics { ++ ++ /** ++ * Initialize Metrics system with given Hive configuration. ++ * @param conf ++ */ ++ public void init(HiveConf conf) throws Exception; ++ ++ /** ++ * Deinitializes the Metrics system. ++ */ ++ public void deInit() throws Exception; ++ ++ /** ++ * @param name ++ * @throws IOException ++ */ ++ public void startScope(String name) throws IOException; ++ ++ public void endScope(String name) throws IOException; ++ ++ //Counter-related methods ++ ++ /** ++ * Increments a counter of the given name by 1. ++ * @param name ++ * @return ++ * @throws IOException ++ */ ++ public Long incrementCounter(String name) throws IOException; ++ ++ /** ++ * Increments a counter of the given name by "increment" ++ * @param name ++ * @param increment ++ * @return ++ * @throws IOException ++ */ ++ public Long incrementCounter(String name, long increment) throws IOException; ++} +diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsFactory.java b/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsFactory.java +new file mode 100644 +index 0000000..12a309d +--- /dev/null ++++ b/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsFactory.java +@@ -0,0 +1,48 @@ ++/** ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++package org.apache.hadoop.hive.common.metrics.common; ++ ++import org.apache.hadoop.hive.conf.HiveConf; ++import org.apache.hadoop.util.ReflectionUtils; ++ ++/** ++ * Class that manages a static Metric instance for this process. ++ */ ++public class MetricsFactory { ++ ++ private static Metrics metrics; ++ private static Object initLock = new Object(); ++ ++ public synchronized static void init(HiveConf conf) throws Exception { ++ if (metrics == null) { ++ metrics = (Metrics) ReflectionUtils.newInstance(conf.getClassByName( ++ conf.getVar(HiveConf.ConfVars.HIVE_METRICS_CLASS)), conf); ++ } ++ metrics.init(conf); ++ } ++ ++ public synchronized static Metrics getMetricsInstance() { ++ return metrics; ++ } ++ ++ public synchronized static void deInit() throws Exception { ++ if (metrics != null) { ++ metrics.deInit(); ++ } ++ } ++} +diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java b/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java +new file mode 100644 +index 0000000..e59da99 +--- /dev/null ++++ b/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java +@@ -0,0 +1,366 @@ ++/** ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ ++package org.apache.hadoop.hive.common.metrics.metrics2; ++ ++import com.codahale.metrics.ConsoleReporter; ++import com.codahale.metrics.Counter; ++import com.codahale.metrics.ExponentiallyDecayingReservoir; ++import com.codahale.metrics.JmxReporter; ++import com.codahale.metrics.Metric; ++import com.codahale.metrics.MetricRegistry; ++import com.codahale.metrics.MetricSet; ++import com.codahale.metrics.Timer; ++import com.codahale.metrics.json.MetricsModule; ++import com.codahale.metrics.jvm.BufferPoolMetricSet; ++import com.codahale.metrics.jvm.ClassLoadingGaugeSet; ++import com.codahale.metrics.jvm.GarbageCollectorMetricSet; ++import com.codahale.metrics.jvm.MemoryUsageGaugeSet; ++import com.codahale.metrics.jvm.ThreadStatesGaugeSet; ++import com.fasterxml.jackson.databind.ObjectMapper; ++import com.google.common.annotations.VisibleForTesting; ++import com.google.common.base.Splitter; ++import com.google.common.cache.CacheBuilder; ++import com.google.common.cache.CacheLoader; ++import com.google.common.cache.LoadingCache; ++import com.google.common.collect.Lists; ++import org.apache.commons.logging.Log; ++import org.apache.commons.logging.LogFactory; ++import org.apache.hadoop.fs.FileSystem; ++import org.apache.hadoop.fs.Path; ++import org.apache.hadoop.fs.permission.FsPermission; ++import org.apache.hadoop.hive.conf.HiveConf; ++ ++import java.io.BufferedReader; ++import java.io.BufferedWriter; ++import java.io.Closeable; ++import java.io.IOException; ++import java.io.OutputStreamWriter; ++import java.lang.management.ManagementFactory; ++import java.util.HashMap; ++import java.util.HashSet; ++import java.util.List; ++import java.util.Map; ++import java.util.Set; ++import java.util.TimerTask; ++import java.util.concurrent.ExecutionException; ++import java.util.concurrent.TimeUnit; ++import java.util.concurrent.locks.Lock; ++import java.util.concurrent.locks.ReentrantLock; ++ ++/** ++ * Codahale-backed Metrics implementation. ++ */ ++public class CodahaleMetrics implements org.apache.hadoop.hive.common.metrics.common.Metrics { ++ public static final String API_PREFIX = "api_"; ++ public static final Log LOGGER = LogFactory.getLog(CodahaleMetrics.class); ++ ++ public final MetricRegistry metricRegistry = new MetricRegistry(); ++ private final Lock timersLock = new ReentrantLock(); ++ private final Lock countersLock = new ReentrantLock(); ++ ++ private LoadingCache<String, Timer> timers; ++ private LoadingCache<String, Counter> counters; ++ ++ private boolean initialized = false; ++ private HiveConf conf; ++ private final Set<Closeable> reporters = new HashSet<Closeable>(); ++ ++ private final ThreadLocal<HashMap<String, MetricsScope>> threadLocalScopes ++ = new ThreadLocal<HashMap<String,MetricsScope>>() { ++ @Override ++ protected HashMap<String,MetricsScope> initialValue() { ++ return new HashMap<String,MetricsScope>(); ++ } ++ }; ++ ++ public static class MetricsScope { ++ ++ final String name; ++ final Timer timer; ++ Timer.Context timerContext; ++ CodahaleMetrics metrics; ++ ++ private boolean isOpen = false; ++ ++ /** ++ * Instantiates a named scope - intended to only be called by Metrics, so locally scoped. ++ * @param name - name of the variable ++ * @throws IOException ++ */ ++ private MetricsScope(String name, CodahaleMetrics metrics) throws IOException { ++ this.name = name; ++ this.metrics = metrics; ++ this.timer = metrics.getTimer(name); ++ open(); ++ } ++ ++ /** ++ * Opens scope, and makes note of the time started, increments run counter ++ * @throws IOException ++ * ++ */ ++ public void open() throws IOException { ++ if (!isOpen) { ++ isOpen = true; ++ this.timerContext = timer.time(); ++ } else { ++ throw new IOException("Scope named " + name + " is not closed, cannot be opened."); ++ } ++ } ++ ++ /** ++ * Closes scope, and records the time taken ++ * @throws IOException ++ */ ++ public void close() throws IOException { ++ if (isOpen) { ++ timerContext.close(); ++ ++ } else { ++ throw new IOException("Scope named " + name + " is not open, cannot be closed."); ++ } ++ isOpen = false; ++ } ++ } ++ ++ public synchronized void init(HiveConf conf) throws Exception { ++ if (initialized) { ++ return; ++ } ++ ++ this.conf = conf; ++ //Codahale artifacts are lazily-created. ++ timers = CacheBuilder.newBuilder().build( ++ new CacheLoader<String, com.codahale.metrics.Timer>() { ++ @Override ++ public com.codahale.metrics.Timer load(String key) throws Exception { ++ Timer timer = new Timer(new ExponentiallyDecayingReservoir()); ++ metricRegistry.register(key, timer); ++ return timer; ++ } ++ } ++ ); ++ counters = CacheBuilder.newBuilder().build( ++ new CacheLoader<String, Counter>() { ++ @Override ++ public Counter load(String key) throws Exception { ++ Counter counter = new Counter(); ++ metricRegistry.register(key, counter); ++ return counter; ++ } ++ } ++ ); ++ ++ //register JVM metrics ++ registerAll("gc", new GarbageCollectorMetricSet()); ++ registerAll("buffers", new BufferPoolMetricSet(ManagementFactory.getPlatformMBeanServer())); ++ registerAll("memory", new MemoryUsageGaugeSet()); ++ registerAll("threads", new ThreadStatesGaugeSet()); ++ registerAll("classLoading", new ClassLoadingGaugeSet()); ++ ++ //Metrics reporter ++ Set<MetricsReporting> finalReporterList = new HashSet<MetricsReporting>(); ++ List<String> metricsReporterNames = Lists.newArrayList( ++ Splitter.on(",").trimResults().omitEmptyStrings().split(conf.getVar(HiveConf.ConfVars.HIVE_METRICS_REPORTER))); ++ ++ if(metricsReporterNames != null) { ++ for (String metricsReportingName : metricsReporterNames) { ++ try { ++ MetricsReporting reporter = MetricsReporting.valueOf(metricsReportingName.trim().toUpperCase()); ++ finalReporterList.add(reporter); ++ } catch (IllegalArgumentException e) { ++ LOGGER.warn("Metrics reporter skipped due to invalid configured reporter: " + metricsReportingName); ++ } ++ } ++ } ++ initReporting(finalReporterList); ++ initialized = true; ++ } ++ ++ ++ public synchronized void deInit() throws Exception { ++ if (initialized) { ++ if (reporters != null) { ++ for (Closeable reporter : reporters) { ++ reporter.close(); ++ } ++ } ++ for (Map.Entry<String, Metric> metric : metricRegistry.getMetrics().entrySet()) { ++ metricRegistry.remove(metric.getKey()); ++ } ++ timers.invalidateAll(); ++ counters.invalidateAll(); ++ initialized = false; ++ } ++ } ++ ++ public void startScope(String name) throws IOException { ++ synchronized (this) { ++ if (!initialized) { ++ return; ++ } ++ } ++ name = API_PREFIX + name; ++ if (threadLocalScopes.get().containsKey(name)) { ++ threadLocalScopes.get().get(name).open(); ++ } else { ++ threadLocalScopes.get().put(name, new MetricsScope(name, this)); ++ } ++ } ++ ++ public void endScope(String name) throws IOException{ ++ synchronized (this) { ++ if (!initialized) { ++ return; ++ } ++ } ++ name = API_PREFIX + name; ++ if (threadLocalScopes.get().containsKey(name)) { ++ threadLocalScopes.get().get(name).close(); ++ } ++ } ++ ++ public Long incrementCounter(String name) throws IOException { ++ return incrementCounter(name, 1); ++ } ++ ++ public Long incrementCounter(String name, long increment) throws IOException { ++ String key = name; ++ try { ++ countersLock.lock(); ++ counters.get(key).inc(increment); ++ return counters.get(key).getCount(); ++ } catch(ExecutionException ee) { ++ throw new RuntimeException(ee); ++ } finally { ++ countersLock.unlock(); ++ } ++ } ++ ++ // This method is necessary to synchronize lazy-creation to the timers. ++ private Timer getTimer(String name) throws IOException { ++ String key = name; ++ try { ++ timersLock.lock(); ++ Timer timer = timers.get(key); ++ return timer; ++ } catch (ExecutionException e) { ++ throw new IOException(e); ++ } finally { ++ timersLock.unlock(); ++ } ++ } ++ ++ private void registerAll(String prefix, MetricSet metricSet) { ++ for (Map.Entry<String, Metric> entry : metricSet.getMetrics().entrySet()) { ++ if (entry.getValue() instanceof MetricSet) { ++ registerAll(prefix + "." + entry.getKey(), (MetricSet) entry.getValue()); ++ } else { ++ metricRegistry.register(prefix + "." + entry.getKey(), entry.getValue()); ++ } ++ } ++ } ++ ++ @VisibleForTesting ++ public MetricRegistry getMetricRegistry() { ++ return metricRegistry; ++ } ++ ++ /** ++ * Should be only called once to initialize the reporters ++ */ ++ private void initReporting(Set<MetricsReporting> reportingSet) throws Exception { ++ for (MetricsReporting reporting : reportingSet) { ++ switch(reporting) { ++ case CONSOLE: ++ final ConsoleReporter consoleReporter = ConsoleReporter.forRegistry(metricRegistry) ++ .convertRatesTo(TimeUnit.SECONDS) ++ .convertDurationsTo(TimeUnit.MILLISECONDS) ++ .build(); ++ consoleReporter.start(1, TimeUnit.SECONDS); ++ reporters.add(consoleReporter); ++ break; ++ case JMX: ++ final JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry) ++ .convertRatesTo(TimeUnit.SECONDS) ++ .convertDurationsTo(TimeUnit.MILLISECONDS) ++ .build(); ++ jmxReporter.start(); ++ reporters.add(jmxReporter); ++ break; ++ case JSON_FILE: ++ final JsonFileReporter jsonFileReporter = new JsonFileReporter(); ++ jsonFileReporter.start(); ++ reporters.add(jsonFileReporter); ++ break; ++ } ++ } ++ } ++ ++ class JsonFileReporter implements Closeable { ++ private ObjectMapper jsonMapper = null; ++ private java.util.Timer timer = null; ++ ++ public void start() { ++ this.jsonMapper = new ObjectMapper().registerModule(new MetricsModule(TimeUnit.MILLISECONDS, TimeUnit.MILLISECONDS, false)); ++ this.timer = new java.util.Timer(true); ++ ++ long time = conf.getTimeVar(HiveConf.ConfVars.HIVE_METRICS_JSON_FILE_INTERVAL, TimeUnit.MILLISECONDS); ++ final String pathString = conf.getVar(HiveConf.ConfVars.HIVE_METRICS_JSON_FILE_LOCATION); ++ ++ timer.schedule(new TimerTask() { ++ @Override ++ public void run() { ++ BufferedWriter bw = null; ++ try { ++ String json = jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(metricRegistry); ++ Path tmpPath = new Path(pathString + ".tmp"); ++ FileSystem fs = FileSystem.get(conf); ++ fs.delete(tmpPath, true); ++ bw = new BufferedWriter(new OutputStreamWriter(fs.create(tmpPath, true))); ++ bw.write(json); ++ bw.close(); ++ ++ Path path = new Path(pathString); ++ fs.rename(tmpPath, path); ++ fs.setPermission(path, FsPermission.createImmutable((short) 0644)); ++ } catch (Exception e) { ++ LOGGER.warn("Error writing JSON Metrics to file", e); ++ } finally { ++ try { ++ if (bw != null) { ++ bw.close(); ++ } ++ } catch (IOException e) { ++ //Ignore. ++ } ++ } ++ ++ ++ } ++ }, 0, time); ++ } ++ ++ public void close() { ++ if (timer != null) { ++ this.timer.cancel(); ++ } ++ } ++ } ++} +diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/MetricsReporting.java b/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/MetricsReporting.java +new file mode 100644 +index 0000000..643246f +--- /dev/null ++++ b/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/MetricsReporting.java +@@ -0,0 +1,27 @@ ++/** ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++package org.apache.hadoop.hive.common.metrics.metrics2; ++ ++/** ++ * Reporting options for org.apache.hadoop.hive.common.metrics.metrics2.Metrics. ++ */ ++public enum MetricsReporting { ++ JMX, ++ CONSOLE, ++ JSON_FILE ++} +diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +index 49b8f97..55a79a9 100644 +--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ++++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +@@ -645,6 +645,7 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) { + "Maximum cache full % after which the cache cleaner thread kicks in."), + METASTORE_AGGREGATE_STATS_CACHE_CLEAN_UNTIL("hive.metastore.aggregate.stats.cache.clean.until", (float) 0.8, + "The cleaner thread cleans until cache reaches this % full size."), ++ METASTORE_METRICS("hive.metastore.metrics.enabled", false, "Enable metrics on the metastore."), + + // Parameters for exporting metadata on table drop (requires the use of the) + // org.apache.hadoop.hive.ql.parse.MetaDataExportListener preevent listener +@@ -1688,6 +1689,7 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) { + " EXECUTION: Log completion of tasks\n" + + " PERFORMANCE: Execution + Performance logs \n" + + " VERBOSE: All logs" ), ++ HIVE_SERVER2_METRICS_ENABLED("hive.server2.metrics.enabled", false, "Enable metrics on the HiveServer2."), + // logging configuration + HIVE_LOG4J_FILE("hive.log4j.file", "", + "Hive log4j configuration file.\n" + +@@ -1715,7 +1717,21 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) { + HIVE_AUTOGEN_COLUMNALIAS_PREFIX_INCLUDEFUNCNAME( + "hive.autogen.columnalias.prefix.includefuncname", false, + "Whether to include function name in the column alias auto generated by Hive."), +- ++ HIVE_METRICS_CLASS("hive.service.metrics.class", ++ "org.apache.hadoop.hive.common.metrics.metrics2.CodahaleMetrics", ++ new StringSet( ++ "org.apache.hadoop.hive.common.metrics.metrics2.CodahaleMetrics", ++ "org.apache.hadoop.hive.common.metrics.LegacyMetrics"), ++ "Hive metrics subsystem implementation class."), ++ HIVE_METRICS_REPORTER("hive.service.metrics.reporter", "JSON_FILE, JMX", ++ "Reporter type for metric class org.apache.hadoop.hive.common.metrics.metrics2.CodahaleMetrics, comma separated list of JMX, CONSOLE, JSON_FILE"), ++ HIVE_METRICS_JSON_FILE_LOCATION("hive.service.metrics.file.location", "file:///tmp/my-logging.properties", ++ "For metric class org.apache.hadoop.hive.common.metrics.metrics2.CodahaleMetrics JSON_FILE reporter, the location of JSON metrics file. " + ++ "This file will get overwritten at every interval."), ++ HIVE_METRICS_JSON_FILE_INTERVAL("hive.service.metrics.file.frequency", "5s", ++ new TimeValidator(TimeUnit.MILLISECONDS), ++ "For metric class org.apache.hadoop.hive.common.metrics.metrics2.CodahaleMetrics JSON_FILE reporter, " + ++ "the frequency of updating JSON metrics file."), + HIVE_PERF_LOGGER("hive.exec.perf.logger", "org.apache.hadoop.hive.ql.log.PerfLogger", + "The class responsible for logging client side performance metrics. \n" + + "Must be a subclass of org.apache.hadoop.hive.ql.log.PerfLogger"), +diff --git a/common/src/test/org/apache/hadoop/hive/common/metrics/TestLegacyMetrics.java b/common/src/test/org/apache/hadoop/hive/common/metrics/TestLegacyMetrics.java +new file mode 100644 +index 0000000..c14c7ee +--- /dev/null ++++ b/common/src/test/org/apache/hadoop/hive/common/metrics/TestLegacyMetrics.java +@@ -0,0 +1,295 @@ ++/** ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++package org.apache.hadoop.hive.common.metrics; ++ ++import java.io.IOException; ++import java.lang.management.ManagementFactory; ++import java.util.concurrent.Callable; ++import java.util.concurrent.ExecutorService; ++import java.util.concurrent.Executors; ++import java.util.concurrent.TimeUnit; ++ ++import javax.management.Attribute; ++import javax.management.MBeanAttributeInfo; ++import javax.management.MBeanInfo; ++import javax.management.MBeanOperationInfo; ++import javax.management.MBeanServer; ++import javax.management.ObjectName; ++ ++import org.apache.hadoop.hive.common.metrics.common.MetricsFactory; ++import org.apache.hadoop.hive.common.metrics.LegacyMetrics.MetricsScope; ++import org.apache.hadoop.hive.conf.HiveConf; ++import org.junit.After; ++import org.junit.Before; ++import org.junit.Test; ++import static org.junit.Assert.*; ++ ++public class TestLegacyMetrics { ++ ++ private static final String scopeName = "foo"; ++ private static final long periodMs = 50L; ++ private static LegacyMetrics metrics; ++ ++ @Before ++ public void before() throws Exception { ++ MetricsFactory.deInit(); ++ HiveConf conf = new HiveConf(); ++ conf.setVar(HiveConf.ConfVars.HIVE_METRICS_CLASS, LegacyMetrics.class.getCanonicalName()); ++ MetricsFactory.init(conf); ++ metrics = (LegacyMetrics) MetricsFactory.getMetricsInstance(); ++ } ++ ++ @After ++ public void after() throws Exception { ++ MetricsFactory.deInit(); ++ } ++ ++ @Test ++ public void testMetricsMBean() throws Exception { ++ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); ++ final ObjectName oname = new ObjectName( ++ "org.apache.hadoop.hive.common.metrics:type=MetricsMBean"); ++ MBeanInfo mBeanInfo = mbs.getMBeanInfo(oname); ++ // check implementation class: ++ assertEquals(MetricsMBeanImpl.class.getName(), mBeanInfo.getClassName()); ++ ++ // check reset operation: ++ MBeanOperationInfo[] oops = mBeanInfo.getOperations(); ++ boolean resetFound = false; ++ for (MBeanOperationInfo op : oops) { ++ if ("reset".equals(op.getName())) { ++ resetFound = true; ++ break; ++ } ++ } ++ assertTrue(resetFound); ++ ++ // add metric with a non-null value: ++ Attribute attr = new Attribute("fooMetric", Long.valueOf(-77)); ++ mbs.setAttribute(oname, attr); ++ ++ mBeanInfo = mbs.getMBeanInfo(oname); ++ MBeanAttributeInfo[] attrinuteInfos = mBeanInfo.getAttributes(); ++ assertEquals(1, attrinuteInfos.length); ++ boolean attrFound = false; ++ for (MBeanAttributeInfo info : attrinuteInfos) { ++ if ("fooMetric".equals(info.getName())) { ++ assertEquals("java.lang.Long", info.getType()); ++ assertTrue(info.isReadable()); ++ assertTrue(info.isWritable()); ++ assertFalse(info.isIs()); ++ ++ attrFound = true; ++ break; ++ } ++ } ++ assertTrue(attrFound); ++ ++ // check metric value: ++ Object v = mbs.getAttribute(oname, "fooMetric"); ++ assertEquals(Long.valueOf(-77), v); ++ ++ // reset the bean: ++ Object result = mbs.invoke(oname, "reset", new Object[0], new String[0]); ++ assertNull(result); ++ ++ // the metric value must be zeroed: ++ v = mbs.getAttribute(oname, "fooMetric"); ++ assertEquals(Long.valueOf(0), v); ++ } ++ ++ private <T> void expectIOE(Callable<T> c) throws Exception { ++ try { ++ T t = c.call(); ++ fail("IOE expected but ["+t+"] was returned."); ++ } catch (IOException ioe) { ++ // ok, expected ++ } ++ } ++ ++ @Test ++ public void testScopeSingleThread() throws Exception { ++ metrics.startScope(scopeName); ++ final MetricsScope fooScope = metrics.getScope(scopeName); ++ // the time and number counters become available only after the 1st ++ // scope close: ++ expectIOE(new Callable<Long>() { ++ @Override ++ public Long call() throws Exception { ++ Long num = fooScope.getNumCounter(); ++ return num; ++ } ++ }); ++ expectIOE(new Callable<Long>() { ++ @Override ++ public Long call() throws Exception { ++ Long time = fooScope.getTimeCounter(); ++ return time; ++ } ++ }); ++ // cannot open scope that is already open: ++ expectIOE(new Callable<Void>() { ++ @Override ++ public Void call() throws Exception { ++ fooScope.open(); ++ return null; ++ } ++ }); ++ ++ assertSame(fooScope, metrics.getScope(scopeName)); ++ Thread.sleep(periodMs+ 1); ++ // 1st close: ++ // closing of open scope should be ok: ++ metrics.endScope(scopeName); ++ expectIOE(new Callable<Void>() { ++ @Override ++ public Void call() throws Exception { ++ metrics.endScope(scopeName); // closing of closed scope not allowed ++ return null; ++ } ++ }); ++ ++ assertEquals(Long.valueOf(1), fooScope.getNumCounter()); ++ final long t1 = fooScope.getTimeCounter().longValue(); ++ assertTrue(t1 > periodMs); ++ ++ assertSame(fooScope, metrics.getScope(scopeName)); ++ ++ // opening allowed after closing: ++ metrics.startScope(scopeName); ++ // opening of already open scope not allowed: ++ expectIOE(new Callable<Void>() { ++ @Override ++ public Void call() throws Exception { ++ metrics.startScope(scopeName); ++ return null; ++ } ++ }); ++ ++ assertEquals(Long.valueOf(1), fooScope.getNumCounter()); ++ assertEquals(t1, fooScope.getTimeCounter().longValue()); ++ ++ assertSame(fooScope, metrics.getScope(scopeName)); ++ Thread.sleep(periodMs + 1); ++ // Reopening (close + open) allowed in opened state: ++ fooScope.reopen(); ++ ++ assertEquals(Long.valueOf(2), fooScope.getNumCounter()); ++ assertTrue(fooScope.getTimeCounter().longValue() > 2 * periodMs); ++ ++ Thread.sleep(periodMs + 1); ++ // 3rd close: ++ fooScope.close(); ++ ++ assertEquals(Long.valueOf(3), fooScope.getNumCounter()); ++ assertTrue(fooScope.getTimeCounter().longValue() > 3 * periodMs); ++ Double avgT = (Double) metrics.get("foo.avg_t"); ++ assertTrue(avgT.doubleValue() > periodMs); ++ } ++ ++ @Test ++ public void testScopeConcurrency() throws Exception { ++ metrics.startScope(scopeName); ++ MetricsScope fooScope = metrics.getScope(scopeName); ++ final int threads = 10; ++ ExecutorService executorService = Executors.newFixedThreadPool(threads); ++ for (int i=0; i<threads; i++) { ++ final int n = i; ++ executorService.submit(new Callable<Void>() { ++ @Override ++ public Void call() throws Exception { ++ testScopeImpl(n); ++ return null; ++ } ++ }); ++ } ++ executorService.shutdown(); ++ assertTrue(executorService.awaitTermination(periodMs * 3 * threads, TimeUnit.MILLISECONDS)); ++ ++ fooScope = metrics.getScope(scopeName); ++ assertEquals(Long.valueOf(3 * threads), fooScope.getNumCounter()); ++ assertTrue(fooScope.getTimeCounter().longValue() > 3 * periodMs * threads); ++ Double avgT = (Double) metrics.get("foo.avg_t"); ++ assertTrue(avgT.doubleValue() > periodMs); ++ metrics.endScope(scopeName); ++ } ++ ++ void testScopeImpl(int n) throws Exception { ++ metrics.startScope(scopeName); ++ final MetricsScope fooScope = metrics.getScope(scopeName); ++ // cannot open scope that is already open: ++ expectIOE(new Callable<Void>() { ++ @Override ++ public Void call() throws Exception { ++ fooScope.open(); ++ return null; ++ } ++ }); ++ ++ assertSame(fooScope, metrics.getScope(scopeName)); ++ Thread.sleep(periodMs+ 1); ++ // 1st close: ++ metrics.endScope(scopeName); // closing of open scope should be ok. ++ ++ assertTrue(fooScope.getNumCounter().longValue() >= 1); ++ final long t1 = fooScope.getTimeCounter().longValue(); ++ assertTrue(t1 > periodMs); ++ ++ expectIOE(new Callable<Void>() { ++ @Override ++ public Void call() throws Exception { ++ metrics.endScope(scopeName); // closing of closed scope not allowed ++ return null; ++ } ++ }); ++ ++ assertSame(fooScope, metrics.getScope(scopeName)); ++ ++ // opening allowed after closing: ++ metrics.startScope(scopeName); ++ ++ assertTrue(fooScope.getNumCounter().longValue() >= 1); ++ assertTrue(fooScope.getTimeCounter().longValue() >= t1); ++ ++ // opening of already open scope not allowed: ++ expectIOE(new Callable<Void>() { ++ @Override ++ public Void call() throws Exception { ++ metrics.startScope(scopeName); ++ return null; ++ } ++ }); ++ ++ assertSame(fooScope, metrics.getScope(scopeName)); ++ Thread.sleep(periodMs + 1); ++ // Reopening (close + open) allowed in opened state: ++ fooScope.reopen(); ++ ++ assertTrue(fooScope.getNumCounter().longValue() >= 2); ++ assertTrue(fooScope.getTimeCounter().longValue() > 2 * periodMs); ++ ++ Thread.sleep(periodMs + 1); ++ // 3rd close: ++ fooScope.close(); ++ ++ assertTrue(fooScope.getNumCounter().longValue() >= 3); ++ assertTrue(fooScope.getTimeCounter().longValue() > 3 * periodMs); ++ Double avgT = (Double) metrics.get("foo.avg_t"); ++ assertTrue(avgT.doubleValue() > periodMs); ++ } ++} +diff --git a/common/src/test/org/apache/hadoop/hive/common/metrics/TestMetrics.java b/common/src/test/org/apache/hadoop/hive/common/metrics/TestMetrics.java +deleted file mode 100644 +index e85d3f8..0000000 +--- a/common/src/test/org/apache/hadoop/hive/common/metrics/TestMetrics.java ++++ /dev/null +@@ -1,286 +0,0 @@ +-/** +- * Licensed to the Apache Software Foundation (ASF) under one +- * or more contributor license agreements. See the NOTICE file +- * distributed with this work for additional information +- * regarding copyright ownership. The ASF licenses this file +- * to you under the Apache License, Version 2.0 (the +- * "License"); you may not use this file except in compliance +- * with the License. You may obtain a copy of the License at +- * +- * http://www.apache.org/licenses/LICENSE-2.0 +- * +- * Unless required by applicable law or agreed to in writing, software +- * distributed under the License is distributed on an "AS IS" BASIS, +- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +- * See the License for the specific language governing permissions and +- * limitations under the License. +- */ +-package org.apache.hadoop.hive.common.metrics; +- +-import java.io.IOException; +-import java.lang.management.ManagementFactory; +-import java.util.concurrent.Callable; +-import java.util.concurrent.ExecutorService; +-import java.util.concurrent.Executors; +-import java.util.concurrent.TimeUnit; +- +-import javax.management.Attribute; +-import javax.management.MBeanAttributeInfo; +-import javax.management.MBeanInfo; +-import javax.management.MBeanOperationInfo; +-import javax.management.MBeanServer; +-import javax.management.ObjectName; +- +-import org.apache.hadoop.hive.common.metrics.Metrics.MetricsScope; +-import org.junit.After; +-import org.junit.Before; +-import org.junit.Test; +-import static org.junit.Assert.*; +- +-public class TestMetrics { +- +- private static final String scopeName = "foo"; +- private static final long periodMs = 50L; +- +- @Before +- public void before() throws Exception { +- Metrics.uninit(); +- Metrics.init(); +- } +- +- @After +- public void after() throws Exception { +- Metrics.uninit(); +- } +- +- @Test +- public void testMetricsMBean() throws Exception { +- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); +- final ObjectName oname = new ObjectName( +- "org.apache.hadoop.hive.common.metrics:type=MetricsMBean"); +- MBeanInfo mBeanInfo = mbs.getMBeanInfo(oname); +- // check implementation class: +- assertEquals(MetricsMBeanImpl.class.getName(), mBeanInfo.getClassName()); +- +- // check reset operation: +- MBeanOperationInfo[] oops = mBeanInfo.getOperations(); +- boolean resetFound = false; +- for (MBeanOperationInfo op : oops) { +- if ("reset".equals(op.getName())) { +- resetFound = true; +- break; +- } +- } +- assertTrue(resetFound); +- +- // add metric with a non-null value: +- Attribute attr = new Attribute("fooMetric", Long.valueOf(-77)); +- mbs.setAttribute(oname, attr); +- +- mBeanInfo = mbs.getMBeanInfo(oname); +- MBeanAttributeInfo[] attrinuteInfos = mBeanInfo.getAttributes(); +- assertEquals(1, attrinuteInfos.length); +- boolean attrFound = false; +- for (MBeanAttributeInfo info : attrinuteInfos) { +- if ("fooMetric".equals(info.getName())) { +- assertEquals("java.lang.Long", info.getType()); +- assertTrue(info.isReadable()); +- assertTrue(info.isWritable()); +- assertFalse(info.isIs()); +- +- attrFound = true; +- break; +- } +- } +- assertTrue(attrFound); +- +- // check metric value: +- Object v = mbs.getAttribute(oname, "fooMetric"); +- assertEquals(Long.valueOf(-77), v); +- +- // reset the bean: +- Object result = mbs.invoke(oname, "reset", new Object[0], new String[0]); +- assertNull(result); +- +- // the metric value must be zeroed: +- v = mbs.getAttribute(oname, "fooMetric"); +- assertEquals(Long.valueOf(0), v); +- } +- +- private <T> void expectIOE(Callable<T> c) throws Exception { +- try { +- T t = c.call(); +- fail("IOE expected but ["+t+"] was returned."); +- } catch (IOException ioe) { +- // ok, expected +- } +- } +- +- @Test +- public void testScopeSingleThread() throws Exception { +- final MetricsScope fooScope = Metrics.startScope(scopeName); +- // the time and number counters become available only after the 1st +- // scope close: +- expectIOE(new Callable<Long>() { +- @Override +- public Long call() throws Exception { +- Long num = fooScope.getNumCounter(); +- return num; +- } +- }); +- expectIOE(new Callable<Long>() { +- @Override +- public Long call() throws Exception { +- Long time = fooScope.getTimeCounter(); +- return time; +- } +- }); +- // cannot open scope that is already open: +- expectIOE(new Callable<Void>() { +- @Override +- public Void call() throws Exception { +- fooScope.open(); +- return null; +- } +- }); +- +- assertSame(fooScope, Metrics.getScope(scopeName)); +- Thread.sleep(periodMs+1); +- // 1st close: +- // closing of open scope should be ok: +- Metrics.endScope(scopeName); +- expectIOE(new Callable<Void>() { +- @Override +- public Void call() throws Exception { +- Metrics.endScope(scopeName); // closing of closed scope not allowed +- return null; +- } +- }); +- +- assertEquals(Long.valueOf(1), fooScope.getNumCounter()); +- final long t1 = fooScope.getTimeCounter().longValue(); +- assertTrue(t1 > periodMs); +- +- assertSame(fooScope, Metrics.getScope(scopeName)); +- +- // opening allowed after closing: +- Metrics.startScope(scopeName); +- // opening of already open scope not allowed: +- expectIOE(new Callable<Void>() { +- @Override +- public Void call() throws Exception { +- Metrics.startScope(scopeName); +- return null; +- } +- }); +- +- assertEquals(Long.valueOf(1), fooScope.getNumCounter()); +- assertEquals(t1, fooScope.getTimeCounter().longValue()); +- +- assertSame(fooScope, Metrics.getScope(scopeName)); +- Thread.sleep(periodMs + 1); +- // Reopening (close + open) allowed in opened state: +- fooScope.reopen(); +- +- assertEquals(Long.valueOf(2), fooScope.getNumCounter()); +- assertTrue(fooScope.getTimeCounter().longValue() > 2 * periodMs); +- +- Thread.sleep(periodMs + 1); +- // 3rd close: +- fooScope.close(); +- +- assertEquals(Long.valueOf(3), fooScope.getNumCounter()); +- assertTrue(fooScope.getTimeCounter().longValue() > 3 * periodMs); +- Double avgT = (Double)Metrics.get("foo.avg_t"); +- assertTrue(avgT.doubleValue() > periodMs); +- } +- +- @Test +- public void testScopeConcurrency() throws Exception { +- MetricsScope fooScope = Metrics.startScope(scopeName); +- final int threads = 10; +- ExecutorService executorService = Executors.newFixedThreadPool(threads); +- for (int i=0; i<threads; i++) { +- final int n = i; +- executorService.submit(new Callable<Void>() { +- @Override +- public Void call() throws Exception { +- testScopeImpl(n); +- return null; +- } +- }); +- } +- executorService.shutdown(); +- assertTrue(executorService.awaitTermination(periodMs * 3 * threads, TimeUnit.MILLISECONDS)); +- +- fooScope = Metrics.getScope(scopeName); +- assertEquals(Long.valueOf(3 * threads), fooScope.getNumCounter()); +- assertTrue(fooScope.getTimeCounter().longValue() > 3 * periodMs * threads); +- Double avgT = (Double)Metrics.get("foo.avg_t"); +- assertTrue(avgT.doubleValue() > periodMs); +- Metrics.endScope(scopeName); +- } +- +- void testScopeImpl(int n) throws Exception { +- final MetricsScope fooScope = Metrics.startScope(scopeName); +- // cannot open scope that is already open: +- expectIOE(new Callable<Void>() { +- @Override +- public Void call() throws Exception { +- fooScope.open(); +- return null; +- } +- }); +- +- assertSame(fooScope, Metrics.getScope(scopeName)); +- Thread.sleep(periodMs+1); +- // 1st close: +- Metrics.endScope(scopeName); // closing of open scope should be ok. +- +- assertTrue(fooScope.getNumCounter().longValue() >= 1); +- final long t1 = fooScope.getTimeCounter().longValue(); +- assertTrue(t1 > periodMs); +- +- expectIOE(new Callable<Void>() { +- @Override +- public Void call() throws Exception { +- Metrics.endScope(scopeName); // closing of closed scope not allowed +- return null; +- } +- }); +- +- assertSame(fooScope, Metrics.getScope(scopeName)); +- +- // opening allowed after closing: +- Metrics.startScope(scopeName); +- +- assertTrue(fooScope.getNumCounter().longValue() >= 1); +- assertTrue(fooScope.getTimeCounter().longValue() >= t1); +- +- // opening of already open scope not allowed: +- expectIOE(new Callable<Void>() { +- @Override +- public Void call() throws Exception { +- Metrics.startScope(scopeName); +- return null; +- } +- }); +- +- assertSame(fooScope, Metrics.getScope(scopeName)); +- Thread.sleep(periodMs + 1); +- // Reopening (close + open) allowed in opened state: +- fooScope.reopen(); +- +- assertTrue(fooScope.getNumCounter().longValue() >= 2); +- assertTrue(fooScope.getTimeCounter().longValue() > 2 * periodMs); +- +- Thread.sleep(periodMs + 1); +- // 3rd close: +- fooScope.close(); +- +- assertTrue(fooScope.getNumCounter().longValue() >= 3); +- assertTrue(fooScope.getTimeCounter().longValue() > 3 * periodMs); +- Double avgT = (Double)Metrics.get("foo.avg_t"); +- assertTrue(avgT.doubleValue() > periodMs); +- } +-} +diff --git a/common/src/test/org/apache/hadoop/hive/common/metrics/metrics2/TestCodahaleMetrics.java b/common/src/test/org/apache/hadoop/hive/common/metrics/metrics2/TestCodahaleMetrics.java +new file mode 100644 +index 0000000..8749349 +--- /dev/null ++++ b/common/src/test/org/apache/hadoop/hive/common/metrics/metrics2/TestCodahaleMetrics.java +@@ -0,0 +1,138 @@ ++/** ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++package org.apache.hadoop.hive.common.metrics.metrics2; ++ ++import com.codahale.metrics.Counter; ++import com.codahale.metrics.MetricRegistry; ++import com.codahale.metrics.Timer; ++import com.fasterxml.jackson.databind.JsonNode; ++import com.fasterxml.jackson.databind.ObjectMapper; ++import org.apache.hadoop.hive.common.metrics.common.MetricsFactory; ++import org.apache.hadoop.hive.conf.HiveConf; ++import org.apache.hadoop.hive.shims.ShimLoader; ++import org.junit.After; ++import org.junit.Assert; ++import org.junit.Before; ++import org.junit.Test; ++ ++import java.io.File; ++import java.nio.file.Files; ++import java.nio.file.Paths; ++import java.util.concurrent.Callable; ++import java.util.concurrent.ExecutorService; ++import java.util.concurrent.Executors; ++import java.util.concurrent.TimeUnit; ++ ++import static org.junit.Assert.assertTrue; ++ ++/** ++ * Unit test for new Metrics subsystem. ++ */ ++public class TestCodahaleMetrics { ++ ++ private static File workDir = new File(System.getProperty("test.tmp.dir")); ++ private static File jsonReportFile; ++ public static MetricRegistry metricRegistry; ++ ++ @Before ++ public void before() throws Exception { ++ HiveConf conf = new HiveConf(); ++ ++ jsonReportFile = new File(workDir, "json_reporting"); ++ jsonReportFile.delete(); ++ String defaultFsName = ShimLoader.getHadoopShims().getHadoopConfNames().get("HADOOPFS"); ++ conf.set(defaultFsName, "local"); ++ conf.setVar(HiveConf.ConfVars.HIVE_METRICS_CLASS, CodahaleMetrics.class.getCanonicalName()); ++ conf.setVar(HiveConf.ConfVars.HIVE_METRICS_REPORTER, MetricsReporting.JSON_FILE.name() + "," + MetricsReporting.JMX.name()); ++ conf.setVar(HiveConf.ConfVars.HIVE_METRICS_JSON_FILE_LOCATION, jsonReportFile.toString()); ++ conf.setVar(HiveConf.ConfVars.HIVE_METRICS_JSON_FILE_INTERVAL, "100ms"); ++ ++ MetricsFactory.init(conf); ++ metricRegistry = ((CodahaleMetrics) MetricsFactory.getMetricsInstance()).getMetricRegistry(); ++ } ++ ++ @After ++ public void after() throws Exception { ++ MetricsFactory.deInit(); ++ } ++ ++ @Test ++ public void testScope() throws Exception { ++ int runs = 5; ++ for (int i = 0; i < runs; i++) { ++ MetricsFactory.getMetricsInstance().startScope("method1"); ++ MetricsFactory.getMetricsInstance().endScope("method1"); ++ } ++ ++ Timer timer = metricRegistry.getTimers().get("api_method1"); ++ Assert.assertEquals(5, timer.getCount()); ++ Assert.assertTrue(timer.getMeanRate() > 0); ++ } ++ ++ ++ @Test ++ public void testCount() throws Exception { ++ int runs = 5; ++ for (int i = 0; i < runs; i++) { ++ MetricsFactory.getMetricsInstance().incrementCounter("count1"); ++ } ++ Counter counter = metricRegistry.getCounters().get("count1"); ++ Assert.assertEquals(5L, counter.getCount()); ++ } ++ ++ @Test ++ public void testConcurrency() throws Exception { ++ int threads = 4; ++ ExecutorService executorService = Executors.newFixedThreadPool(threads); ++ for (int i=0; i< threads; i++) { ++ final int n = i; ++ executorService.submit(new Callable<Void>() { ++ @Override ++ public Void call() throws Exception { ++ MetricsFactory.getMetricsInstance().startScope("method2"); ++ MetricsFactory.getMetricsInstance().endScope("method2"); ++ return null; ++ } ++ }); ++ } ++ executorService.shutdown(); ++ assertTrue(executorService.awaitTermination(10000, TimeUnit.MILLISECONDS)); ++ Timer timer = metricRegistry.getTimers().get("api_method2"); ++ Assert.assertEquals(4, timer.getCount()); ++ Assert.assertTrue(timer.getMeanRate() > 0); ++ } ++ ++ @Test ++ public void testFileReporting() throws Exception { ++ int runs = 5; ++ for (int i = 0; i < runs; i++) { ++ MetricsFactory.getMetricsInstance().incrementCounter("count2"); ++ Thread.sleep(100); ++ } ++ ++ Thread.sleep(2000); ++ byte[] jsonData = Files.readAllBytes(Paths.get(jsonReportFile.getAbsolutePath())); ++ ObjectMapper objectMapper = new ObjectMapper(); ++ ++ JsonNode rootNode = objectMapper.readTree(jsonData); ++ JsonNode countersNode = rootNode.path("counters"); ++ JsonNode methodCounterNode = countersNode.path("count2"); ++ JsonNode countNode = methodCounterNode.path("count"); ++ Assert.assertEquals(countNode.asInt(), 5); ++ } ++} +diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMetrics.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMetrics.java +new file mode 100644 +index 0000000..25f34d1 +--- /dev/null ++++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMetrics.java +@@ -0,0 +1,94 @@ ++/** ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++package org.apache.hadoop.hive.metastore; ++ ++import com.fasterxml.jackson.databind.JsonNode; ++import com.fasterxml.jackson.databind.ObjectMapper; ++import junit.framework.TestCase; ++import org.apache.hadoop.hive.cli.CliSessionState; ++import org.apache.hadoop.hive.common.metrics.metrics2.MetricsReporting; ++import org.apache.hadoop.hive.conf.HiveConf; ++import org.apache.hadoop.hive.ql.Driver; ++import org.apache.hadoop.hive.ql.session.SessionState; ++import org.apache.hadoop.hive.shims.ShimLoader; ++import org.junit.After; ++import org.junit.AfterClass; ++import org.junit.Assert; ++import org.junit.Before; ++import org.junit.BeforeClass; ++import org.junit.Test; ++ ++import java.io.File; ++import java.io.IOException; ++import java.nio.file.Files; ++import java.nio.file.Paths; ++ ++/** ++ * Tests Hive Metastore Metrics. ++ */ ++public class TestMetaStoreMetrics { ++ ++ private static File workDir = new File(System.getProperty("test.tmp.dir")); ++ private static File jsonReportFile; ++ ++ private static HiveConf hiveConf; ++ private static Driver driver; ++ ++ ++ @Before ++ public void before() throws Exception { ++ ++ int port = MetaStoreUtils.findFreePort(); ++ ++ jsonReportFile = new File(workDir, "json_reporting"); ++ jsonReportFile.delete(); ++ ++ hiveConf = new HiveConf(TestMetaStoreMetrics.class); ++ hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:" + port); ++ hiveConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3); ++ hiveConf.setBoolVar(HiveConf.ConfVars.METASTORE_METRICS, true); ++ hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false); ++ hiveConf.setVar(HiveConf.ConfVars.HIVE_METRICS_REPORTER, MetricsReporting.JSON_FILE.name() + "," + MetricsReporting.JMX.name()); ++ hiveConf.setVar(HiveConf.ConfVars.HIVE_METRICS_JSON_FILE_LOCATION, jsonReportFile.toString()); ++ hiveConf.setVar(HiveConf.ConfVars.HIVE_METRICS_JSON_FILE_INTERVAL, "100ms"); ++ ++ MetaStoreUtils.startMetaStore(port, ShimLoader.getHadoopThriftAuthBridge(), hiveConf); ++ ++ SessionState.start(new CliSessionState(hiveConf)); ++ driver = new Driver(hiveConf); ++ } ++ ++ @Test ++ public void testMetricsFile() throws Exception { ++ driver.run("show databases"); ++ ++ //give timer thread a chance to print the metrics ++ Thread.sleep(2000); ++ ++ //As the file is being written, try a few times. ++ //This can be replaced by CodahaleMetrics's JsonServlet reporter once it is exposed. ++ byte[] jsonData = Files.readAllBytes(Paths.get(jsonReportFile.getAbsolutePath())); ++ ObjectMapper objectMapper = new ObjectMapper(); ++ ++ JsonNode rootNode = objectMapper.readTree(jsonData); ++ JsonNode countersNode = rootNode.path("timers"); ++ JsonNode methodCounterNode = countersNode.path("api_get_all_databases"); ++ JsonNode countNode = methodCounterNode.path("count"); ++ Assert.assertTrue(countNode.asInt() > 0); ++ } ++} +diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +index d81c856..1688920 100644 +--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java ++++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +@@ -18,39 +18,14 @@ + + package org.apache.hadoop.hive.metastore; + +-import static org.apache.commons.lang.StringUtils.join; +-import static org.apache.hadoop.hive.metastore.MetaStoreUtils.DEFAULT_DATABASE_COMMENT; +-import static org.apache.hadoop.hive.metastore.MetaStoreUtils.DEFAULT_DATABASE_NAME; +-import static org.apache.hadoop.hive.metastore.MetaStoreUtils.validateName; +- +-import java.io.IOException; +-import java.text.DateFormat; +-import java.text.SimpleDateFormat; +-import java.util.AbstractMap; +-import java.util.ArrayList; +-import java.util.Arrays; +-import java.util.Collections; +-import java.util.Formatter; +-import java.util.HashMap; +-import java.util.HashSet; +-import java.util.Iterator; +-import java.util.LinkedHashMap; +-import java.util.LinkedList; +-import java.util.List; +-import java.util.Map; +-import java.util.Map.Entry; +-import java.util.Properties; +-import java.util.Set; +-import java.util.Timer; +-import java.util.concurrent.TimeUnit; +-import java.util.concurrent.atomic.AtomicBoolean; +-import java.util.concurrent.locks.Condition; +-import java.util.concurrent.locks.Lock; +-import java.util.concurrent.locks.ReentrantLock; +-import java.util.regex.Pattern; +- +-import javax.jdo.JDOException; +- ++import com.facebook.fb303.FacebookBase; ++import com.facebook.fb303.fb_status; ++import com.google.common.annotations.VisibleForTesting; ++import com.google.common.base.Splitter; ++import com.google.common.collect.ImmutableList; ++import com.google.common.collect.ImmutableListMultimap; ++import com.google.common.collect.Lists; ++import com.google.common.collect.Multimaps; + import org.apache.commons.cli.OptionBuilder; + import org.apache.commons.logging.Log; + import org.apache.commons.logging.LogFactory; +@@ -58,12 +33,13 @@ + import org.apache.hadoop.fs.FileSystem; + import org.apache.hadoop.fs.Path; + import org.apache.hadoop.hive.common.FileUtils; ++import org.apache.hadoop.hive.common.JvmPauseMonitor; + import org.apache.hadoop.hive.common.LogUtils; + import org.apache.hadoop.hive.common.LogUtils.LogInitializationException; + import org.apache.hadoop.hive.common.classification.InterfaceAudience; + import org.apache.hadoop.hive.common.classification.InterfaceStability; + import org.apache.hadoop.hive.common.cli.CommonCliOptions; +-import org.apache.hadoop.hive.common.metrics.Metrics; ++import org.apache.hadoop.hive.common.metrics.common.MetricsFactory; + import org.apache.hadoop.hive.conf.HiveConf; + import org.apache.hadoop.hive.conf.HiveConf.ConfVars; + import org.apache.hadoop.hive.metastore.api.AbortTxnRequest; +@@ -221,14 +197,35 @@ + import org.apache.thrift.transport.TTransport; + import org.apache.thrift.transport.TTransportFactory; + +-import com.facebook.fb303.FacebookBase; +-import com.facebook.fb303.fb_status; +-import com.google.common.annotations.VisibleForTesting; +-import com.google.common.base.Splitter; +-import com.google.common.collect.ImmutableList; +-import com.google.common.collect.ImmutableListMultimap; +-import com.google.common.collect.Lists; +-import com.google.common.collect.Multimaps; ++import javax.jdo.JDOException; ++import java.io.IOException; ++import java.text.DateFormat; ++import java.text.SimpleDateFormat; ++import java.util.AbstractMap; ++import java.util.ArrayList; ++import java.util.Arrays; ++import java.util.Collections; ++import java.util.Formatter; ++import java.util.HashMap; ++import java.util.HashSet; ++import java.util.Iterator; ++import java.util.LinkedHashMap; ++import java.util.LinkedList; ++import java.util.List; ++import java.util.Map; ++import java.util.Map.Entry; ++import java.util.Properties; ++import java.util.Set; ++import java.util.Timer; ++import java.util.concurrent.TimeUnit; ++import java.util.concurrent.atomic.AtomicBoolean; ++import java.util.concurrent.locks.Condition; ++import java.util.concurrent.locks.Lock; ++import java.util.concurrent.locks.ReentrantLock; ++import java.util.regex.Pattern; ++ ++import static org.apache.commons.lang.StringUtils.join; ++import static org.apache.hadoop.hive.metastore.MetaStoreUtils.*; + + /** + * TODO:pc remove application logic to a separate interface. +@@ -464,9 +461,10 @@ public void init() throws MetaException { + } + } + +- if (hiveConf.getBoolean("hive.metastore.metrics.enabled", false)) { ++ //Start Metrics for Embedded mode ++ if (hiveConf.getBoolVar(ConfVars.METASTORE_METRICS)) { + try { +- Metrics.init(); ++ MetricsFactory.init(hiveConf); + } catch (Exception e) { + // log exception, but ignore inability to start + LOG.error("error in Metrics init: " + e.getClass().getName() + " " +@@ -750,11 +748,13 @@ private String startFunction(String function, String extraLogInfo) { + incrementCounter(function); + logInfo((getIpAddress() == null ? "" : "source:" + getIpAddress() + " ") + + function + extraLogInfo); +- try { +- Metrics.startScope(function); +- } catch (IOException e) { +- LOG.debug("Exception when starting metrics scope" ++ if (hiveConf.getBoolVar(ConfVars.METASTORE_METRICS)) { ++ try { ++ MetricsFactory.getMetricsInstance().startScope(function); ++ } catch (IOException e) { ++ LOG.debug("Exception when starting metrics scope" + + e.getClass().getName() + " " + e.getMessage(), e); ++ } + } + return function; + } +@@ -792,10 +792,12 @@ private void endFunction(String function, boolean successful, Exception e, + } + + private void endFunction(String function, MetaStoreEndFunctionContext context) { +- try { +- Metrics.endScope(function); +- } catch (IOException e) { +- LOG.debug("Exception when closing metrics scope" + e); ++ if (hiveConf.getBoolVar(ConfVars.METASTORE_METRICS)) { ++ try { ++ MetricsFactory.getMetricsInstance().endScope(function); ++ } catch (IOException e) { ++ LOG.debug("Exception when closing metrics scope" + e); ++ } + } + + for (MetaStoreEndFunctionListener listener : endFunctionListeners) { +@@ -819,6 +821,14 @@ public void shutdown() { + threadLocalMS.remove(); + } + } ++ if (hiveConf.getBoolVar(ConfVars.METASTORE_METRICS)) { ++ try { ++ MetricsFactory.deInit(); ++ } catch (Exception e) { ++ LOG.error("error in Metrics deinit: " + e.getClass().getName() + " " ++ + e.getMessage(), e); ++ } ++ } + logInfo("Metastore shutdown complete."); + } + +@@ -5901,6 +5911,16 @@ public void run() { + } + }); + ++ //Start Metrics for Standalone (Remote) Mode ++ if (conf.getBoolVar(ConfVars.METASTORE_METRICS)) { ++ try { ++ MetricsFactory.init(conf); ++ } catch (Exception e) { ++ // log exception, but ignore inability to start ++ LOG.error("error in Metrics init: " + e.getClass().getName() + " " ++ + e.getMessage(), e); ++ } ++ } + + Lock startLock = new ReentrantLock(); + Condition startCondition = startLock.newCondition(); +@@ -6091,7 +6111,13 @@ public void run() { + // Wrap the start of the threads in a catch Throwable loop so that any failures + // don't doom the rest of the metastore. + startLock.lock(); +- ShimLoader.getHadoopShims().startPauseMonitor(conf); ++ try { ++ JvmPauseMonitor pauseMonitor = new JvmPauseMonitor(conf); ++ pauseMonitor.start(); ++ } catch (Throwable t) { ++ LOG.warn("Could not initiate the JvmPauseMonitor thread." + " GCs and Pauses may not be " + ++ "warned upon.", t); ++ } + + try { + // Per the javadocs on Condition, do not depend on the condition alone as a start gate +diff --git a/pom.xml b/pom.xml +index b21d894..35133f2 100644 +--- a/pom.xml ++++ b/pom.xml +@@ -116,6 +116,7 @@ + <commons-pool.version>1.5.4</commons-pool.version> + <commons-dbcp.version>1.4</commons-dbcp.version> + <derby.version>10.11.1.1</derby.version> ++ <dropwizard.version>3.1.0</dropwizard.version> + <guava.version>14.0.1</guava.version> + <groovy.version>2.1.6</groovy.version> + <hadoop-20S.version>1.2.1</hadoop-20S.version> +@@ -128,6 +129,8 @@ + <httpcomponents.core.version>4.4</httpcomponents.core.version> + <ivy.version>2.4.0</ivy.version> + <jackson.version>1.9.2</jackson.version> ++ <!-- jackson 1 and 2 lines can coexist without issue, as they have different artifactIds --> ++ <jackson.new.version>2.4.2</jackson.new.version> + <javaewah.version>0.3.2</javaewah.version> + <javolution.version>5.5.1</javolution.version> + <jdo-api.version>3.0.1</jdo-api.version> +diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java +index 58e8e49..7820ed5 100644 +--- a/service/src/java/org/apache/hive/service/server/HiveServer2.java ++++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java +@@ -42,14 +42,15 @@ + import org.apache.curator.framework.api.CuratorEventType; + import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode; + import org.apache.curator.retry.ExponentialBackoffRetry; ++import org.apache.hadoop.hive.common.JvmPauseMonitor; + import org.apache.hadoop.hive.common.LogUtils; + import org.apache.hadoop.hive.common.LogUtils.LogInitializationException; ++import org.apache.hadoop.hive.common.metrics.common.MetricsFactory; + import org.apache.hadoop.hive.conf.HiveConf; + import org.apache.hadoop.hive.conf.HiveConf.ConfVars; + import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl; + import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager; + import org.apache.hadoop.hive.ql.util.ZooKeeperHiveHelper; +-import org.apache.hadoop.hive.shims.ShimLoader; + import org.apache.hadoop.hive.shims.Util
<TRUNCATED>
