Author: cdouglas
Date: Wed Dec 3 17:16:07 2008
New Revision: 723181
URL: http://svn.apache.org/viewvc?rev=723181&view=rev
Log:
HADOOP-4758. Add a splitter for metrics contexts to support more than one type
of collector.
Added:
hadoop/core/trunk/src/core/org/apache/hadoop/metrics/spi/CompositeContext.java
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/core/org/apache/hadoop/metrics/ContextFactory.java
hadoop/core/trunk/src/core/org/apache/hadoop/metrics/MetricsContext.java
hadoop/core/trunk/src/core/org/apache/hadoop/metrics/MetricsUtil.java
hadoop/core/trunk/src/core/org/apache/hadoop/metrics/spi/AbstractMetricsContext.java
Modified: hadoop/core/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=723181&r1=723180&r2=723181&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed Dec 3 17:16:07 2008
@@ -158,6 +158,9 @@
HADOOP-4708. Add support for dfsadmin commands in TestCLI. (Boris Shkolnik
via cdouglas)
+ HADOOP-4758. Add a splitter for metrics contexts to support more than one
+ type of collector. (cdouglas)
+
OPTIMIZATIONS
HADOOP-3293. Fixes FileInputFormat to do provide locations for splits
Modified:
hadoop/core/trunk/src/core/org/apache/hadoop/metrics/ContextFactory.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/metrics/ContextFactory.java?rev=723181&r1=723180&r2=723181&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/metrics/ContextFactory.java
(original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/metrics/ContextFactory.java
Wed Dec 3 17:16:07 2008
@@ -45,8 +45,8 @@
private static ContextFactory theFactory = null;
private Map<String,Object> attributeMap = new HashMap<String,Object>();
- private Map<String,AbstractMetricsContext> contextMap =
- new HashMap<String,AbstractMetricsContext>();
+ private Map<String,MetricsContext> contextMap =
+ new HashMap<String,MetricsContext>();
// Used only when contexts, or the ContextFactory itself, cannot be
// created.
@@ -119,23 +119,29 @@
* @param contextName the name of the context
* @return the named MetricsContext
*/
- public synchronized MetricsContext getContext(String contextName)
- throws IOException, ClassNotFoundException, InstantiationException, IllegalAccessException
- {
- AbstractMetricsContext metricsContext = contextMap.get(contextName);
+ public synchronized MetricsContext getContext(String refName, String contextName)
+ throws IOException, ClassNotFoundException,
+ InstantiationException, IllegalAccessException {
+ // refName selects both the cache entry and the refName + CONTEXT_CLASS_SUFFIX
+ // attribute that names the implementation class; contextName is only the
+ // name handed to the new context's init().
+ MetricsContext metricsContext = contextMap.get(refName);
if (metricsContext == null) {
- String classNameAttribute = contextName + CONTEXT_CLASS_SUFFIX;
+ String classNameAttribute = refName + CONTEXT_CLASS_SUFFIX;
String className = (String) getAttribute(classNameAttribute);
if (className == null) {
className = DEFAULT_CONTEXT_CLASSNAME;
}
Class contextClass = Class.forName(className);
- metricsContext = (AbstractMetricsContext) contextClass.newInstance();
+ metricsContext = (MetricsContext) contextClass.newInstance();
metricsContext.init(contextName, this);
- contextMap.put(contextName, metricsContext);
+ // Store under the same key used for the lookup above. Storing under
+ // contextName would make the entry unreachable whenever refName differs,
+ // and could shadow the context actually registered as contextName.
+ contextMap.put(refName, metricsContext);
}
return metricsContext;
}
+ /**
+ * Returns the named MetricsContext, using the same string as both the
+ * reference name and the context name of the two-argument overload.
+ *
+ * @param contextName the name of the context
+ * @return the named MetricsContext
+ */
+ public synchronized MetricsContext getContext(String contextName)
+ throws IOException, ClassNotFoundException, InstantiationException,
+ IllegalAccessException {
+ return getContext(contextName, contextName);
+ }
/**
* Returns a "null" context - one which does nothing.
Modified:
hadoop/core/trunk/src/core/org/apache/hadoop/metrics/MetricsContext.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/metrics/MetricsContext.java?rev=723181&r1=723180&r2=723181&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/metrics/MetricsContext.java
(original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/metrics/MetricsContext.java
Wed Dec 3 17:16:07 2008
@@ -31,7 +31,14 @@
* Default period in seconds at which data is sent to the metrics system.
*/
public static final int DEFAULT_PERIOD = 5;
-
+
+ /**
+ * Initialize this context. ContextFactory.getContext calls this right
+ * after instantiating the context class, before the context is returned.
+ * @param contextName The name assigned to this context
+ * @param factory The ContextFactory that created this context
+ */
+ public void init(String contextName, ContextFactory factory);
+
/**
* Returns the context name.
*
Modified: hadoop/core/trunk/src/core/org/apache/hadoop/metrics/MetricsUtil.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/metrics/MetricsUtil.java?rev=723181&r1=723180&r2=723181&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/metrics/MetricsUtil.java
(original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/metrics/MetricsUtil.java Wed
Dec 3 17:16:07 2008
@@ -41,15 +41,20 @@
*/
private MetricsUtil() {}
+ public static MetricsContext getContext(String contextName) { // single-name convenience overload
+ return getContext(contextName, contextName); // the name doubles as the reference name
+ }
+
/**
* Utility method to return the named context.
* If the desired context cannot be created for any reason, the exception
* is logged, and a null context is returned.
*/
- public static MetricsContext getContext(String contextName) {
+ public static MetricsContext getContext(String refName, String contextName) {
MetricsContext metricsContext;
try {
- metricsContext = ContextFactory.getFactory().getContext(contextName);
+ metricsContext =
+ ContextFactory.getFactory().getContext(refName, contextName);
if (!metricsContext.isMonitoring()) {
metricsContext.startMonitoring();
}
Modified:
hadoop/core/trunk/src/core/org/apache/hadoop/metrics/spi/AbstractMetricsContext.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/metrics/spi/AbstractMetricsContext.java?rev=723181&r1=723180&r2=723181&view=diff
==============================================================================
---
hadoop/core/trunk/src/core/org/apache/hadoop/metrics/spi/AbstractMetricsContext.java
(original)
+++
hadoop/core/trunk/src/core/org/apache/hadoop/metrics/spi/AbstractMetricsContext.java
Wed Dec 3 17:16:07 2008
@@ -208,7 +208,7 @@
* @param recordName the name of the record
* @return newly created instance of MetricsRecordImpl or subclass
*/
- protected MetricsRecordImpl newRecord(String recordName) {
+ protected MetricsRecord newRecord(String recordName) { // return type widened to the MetricsRecord interface
return new MetricsRecordImpl(recordName, this);
}
Added:
hadoop/core/trunk/src/core/org/apache/hadoop/metrics/spi/CompositeContext.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/metrics/spi/CompositeContext.java?rev=723181&view=auto
==============================================================================
---
hadoop/core/trunk/src/core/org/apache/hadoop/metrics/spi/CompositeContext.java
(added)
+++
hadoop/core/trunk/src/core/org/apache/hadoop/metrics/spi/CompositeContext.java
Wed Dec 3 17:16:07 2008
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics.spi;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.metrics.ContextFactory;
+import org.apache.hadoop.metrics.MetricsContext;
+import org.apache.hadoop.metrics.MetricsException;
+import org.apache.hadoop.metrics.MetricsRecord;
+import org.apache.hadoop.metrics.MetricsUtil;
+import org.apache.hadoop.metrics.Updater;
+
+/**
+ * A metrics context that forwards every operation to a list of sub-contexts.
+ *
+ * The number of sub-contexts is read from the "arity" attribute during
+ * {@link #init(String, ContextFactory)}. Sub-context <i>i</i> is obtained from
+ * MetricsUtil.getContext with the reference name
+ * <code>&lt;contextName&gt;.sub&lt;i&gt;</code> and this context's own name.
+ * Records created here are dynamic proxies that replay each call on one
+ * record per sub-context.
+ */
+public class CompositeContext extends AbstractMetricsContext {
+
+  private static final Log LOG = LogFactory.getLog(CompositeContext.class);
+  /** Attribute holding the number of sub-contexts. */
+  private static final String ARITY_LABEL = "arity";
+  /** Format of a sub-context reference name: context name, then index. */
+  private static final String SUB_FMT = "%s.sub%d";
+  private final ArrayList<MetricsContext> subctxt =
+    new ArrayList<MetricsContext>();
+
+  public CompositeContext() {
+  }
+
+  /**
+   * Initializes this context and looks up its sub-contexts.
+   * If the arity attribute is missing or is not an integer, the error is
+   * logged and the composite is left with no sub-contexts.
+   *
+   * @param contextName the name given to this context
+   * @param factory the factory creating this context
+   */
+  @Override
+  public void init(String contextName, ContextFactory factory) {
+    super.init(contextName, factory);
+    int nKids;
+    try {
+      String sKids = getAttribute(ARITY_LABEL);
+      // parseInt rejects null and malformed input with NumberFormatException
+      nKids = Integer.parseInt(sKids);
+    } catch (Exception e) {
+      LOG.error("Unable to initialize composite metric " + contextName +
+                ": could not init arity", e);
+      return;
+    }
+    for (int i = 0; i < nKids; ++i) {
+      MetricsContext ctxt = MetricsUtil.getContext(
+          String.format(SUB_FMT, contextName, i), contextName);
+      if (null != ctxt) {
+        subctxt.add(ctxt);
+      }
+    }
+  }
+
+  /**
+   * Creates a record whose calls are forwarded to a record of the same name
+   * in every sub-context.
+   *
+   * @param recordName the name of the record
+   * @return a proxy implementing MetricsRecord
+   */
+  @Override
+  public MetricsRecord newRecord(String recordName) {
+    return (MetricsRecord) Proxy.newProxyInstance(
+        MetricsRecord.class.getClassLoader(),
+        new Class[] { MetricsRecord.class },
+        new MetricsRecordDelegator(recordName, subctxt));
+  }
+
+  /**
+   * Forwards the record to every sub-context. A failure in one sub-context
+   * is logged and does not prevent delivery to the remaining ones.
+   */
+  @Override
+  protected void emitRecord(String contextName, String recordName,
+      OutputRecord outRec) throws IOException {
+    for (MetricsContext ctxt : subctxt) {
+      try {
+        // Validate before delegating so a sub-context never sees null arguments.
+        if (contextName == null || recordName == null || outRec == null) {
+          throw new IOException(contextName + ":" + recordName + ":" + outRec);
+        }
+        ((AbstractMetricsContext)ctxt).emitRecord(
+          contextName, recordName, outRec);
+      } catch (IOException e) {
+        LOG.warn("emitRecord failed: " + ctxt.getContextName(), e);
+      }
+    }
+  }
+
+  /**
+   * Flushes every sub-context; an IOException from one is logged and the
+   * remaining sub-contexts are still flushed.
+   */
+  @Override
+  protected void flush() throws IOException {
+    for (MetricsContext ctxt : subctxt) {
+      try {
+        ((AbstractMetricsContext)ctxt).flush();
+      } catch (IOException e) {
+        LOG.warn("flush failed: " + ctxt.getContextName(), e);
+      }
+    }
+  }
+
+  /**
+   * Starts monitoring on every sub-context; an IOException from one is
+   * logged and the remaining sub-contexts are still started.
+   */
+  @Override
+  public void startMonitoring() throws IOException {
+    for (MetricsContext ctxt : subctxt) {
+      try {
+        ctxt.startMonitoring();
+      } catch (IOException e) {
+        LOG.warn("startMonitoring failed: " + ctxt.getContextName(), e);
+      }
+    }
+  }
+
+  /** Stops monitoring on every sub-context. */
+  @Override
+  public void stopMonitoring() {
+    for (MetricsContext ctxt : subctxt) {
+      ctxt.stopMonitoring();
+    }
+  }
+
+  /**
+   * Return true if all subcontexts are monitoring.
+   * Note that this is also true when there are no sub-contexts.
+   */
+  @Override
+  public boolean isMonitoring() {
+    boolean ret = true;
+    for (MetricsContext ctxt : subctxt) {
+      ret &= ctxt.isMonitoring();
+    }
+    return ret;
+  }
+
+  /** Closes every sub-context. */
+  @Override
+  public void close() {
+    for (MetricsContext ctxt : subctxt) {
+      ctxt.close();
+    }
+  }
+
+  /** Registers the updater with every sub-context. */
+  @Override
+  public void registerUpdater(Updater updater) {
+    for (MetricsContext ctxt : subctxt) {
+      ctxt.registerUpdater(updater);
+    }
+  }
+
+  /** Unregisters the updater from every sub-context. */
+  @Override
+  public void unregisterUpdater(Updater updater) {
+    for (MetricsContext ctxt : subctxt) {
+      ctxt.unregisterUpdater(updater);
+    }
+  }
+
+  /**
+   * Invocation handler behind the MetricsRecord proxies: answers
+   * getRecordName() itself and replays every other MetricsRecord call on
+   * one record per sub-context.
+   */
+  private static class MetricsRecordDelegator implements InvocationHandler {
+    private static final Method GET_RECORD_NAME = initMethod();
+    private static Method initMethod() {
+      try {
+        return MetricsRecord.class.getMethod("getRecordName", new Class[0]);
+      } catch (Exception e) {
+        throw new RuntimeException("Internal error", e);
+      }
+    }
+
+    private final String recordName;
+    private final ArrayList<MetricsRecord> subrecs;
+
+    MetricsRecordDelegator(String recordName, ArrayList<MetricsContext> ctxts) {
+      this.recordName = recordName;
+      this.subrecs = new ArrayList<MetricsRecord>(ctxts.size());
+      for (MetricsContext ctxt : ctxts) {
+        subrecs.add(ctxt.createRecord(recordName));
+      }
+    }
+
+    public Object invoke(Object p, Method m, Object[] args) throws Throwable {
+      if (GET_RECORD_NAME.equals(m)) {
+        return recordName;
+      }
+      // hashCode/equals/toString called on a proxy are routed here too.
+      // Answer them locally: returning null for a primitive return type
+      // would make the proxy throw NullPointerException.
+      if (Object.class.equals(m.getDeclaringClass())) {
+        return invokeObjectMethod(p, m, args);
+      }
+      assert Void.TYPE.equals(m.getReturnType());
+      for (MetricsRecord rec : subrecs) {
+        try {
+          m.invoke(rec, args);
+        } catch (java.lang.reflect.InvocationTargetException e) {
+          // Rethrow what the sub-record actually threw; otherwise the proxy
+          // would wrap it in UndeclaredThrowableException.
+          Throwable cause = e.getCause();
+          throw (cause != null) ? cause : e;
+        }
+      }
+      return null;
+    }
+
+    /** Identity-based equals/hashCode and a descriptive toString for the proxy. */
+    private Object invokeObjectMethod(Object p, Method m, Object[] args) {
+      String name = m.getName();
+      if ("equals".equals(name)) {
+        return Boolean.valueOf(p == args[0]);
+      }
+      if ("hashCode".equals(name)) {
+        return Integer.valueOf(System.identityHashCode(p));
+      }
+      return "MetricsRecordDelegator(" + recordName + ")";
+    }
+  }
+
+}