Author: mbautin Date: Fri Feb 17 01:56:08 2012 New Revision: 1245287 URL: http://svn.apache.org/viewvc?rev=1245287&view=rev Log: [HBASE-5186]Add metrics to ThriftServer
Summary: Port from Apache JIRA https://issues.apache.org/jira/browse/Hbase-5186 Test Plan: Past thrift unit tests Reviewers: kannan, dhruba Reviewed By: dhruba CC: hbase@lists, dhruba, davejwatson Differential Revision: https://phabricator.fb.com/D403352 Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/CallQueue.java hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/HbaseHandlerMetricsProxy.java hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftMetrics.java hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/TestCallQueue.java Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/HBaseThreadPoolServer.java hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServer.java Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/CallQueue.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/CallQueue.java?rev=1245287&view=auto ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/CallQueue.java (added) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/CallQueue.java Fri Feb 17 01:56:08 2012 @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.thrift; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * A BlockingQueue reports waiting time in queue and queue length to + * ThriftMetrics. + */ +public class CallQueue implements BlockingQueue<Runnable> { + + private final BlockingQueue<Call> underlyingQueue; + private final ThriftMetrics metrics; + + public CallQueue(BlockingQueue<Call> underlyingQueue, + ThriftMetrics metrics) { + this.underlyingQueue = underlyingQueue; + this.metrics = metrics; + } + + private static long now() { + return System.nanoTime(); + } + + public static class Call implements Runnable { + final long startTime; + final Runnable underlyingRunnable; + + Call(Runnable underlyingRunnable) { + this.underlyingRunnable = underlyingRunnable; + this.startTime = now(); + } + + @Override + public void run() { + underlyingRunnable.run(); + } + + public long timeInQueue() { + return now() - startTime; + } + + @Override + public boolean equals(Object other) { + if (other instanceof Call) { + Call otherCall = (Call)(other); + return this.underlyingRunnable.equals(otherCall.underlyingRunnable); + } else if (other instanceof Runnable) { + return this.underlyingRunnable.equals(other); + } + return false; + } + + @Override + public int hashCode() { + return this.underlyingRunnable.hashCode(); + } + } + + @Override + public Runnable poll() { + Call result = underlyingQueue.poll(); + 
updateMetrics(result); + return result; + } + + private void updateMetrics(Call result) { + if (result == null) { + return; + } + metrics.incTimeInQueue(result.timeInQueue()); + metrics.setCallQueueLen(this.size()); + } + + @Override + public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException { + Call result = underlyingQueue.poll(timeout, unit); + updateMetrics(result); + return result; + } + + @Override + public Runnable remove() { + Call result = underlyingQueue.remove(); + updateMetrics(result); + return result; + } + + @Override + public Runnable take() throws InterruptedException { + Call result = underlyingQueue.take(); + updateMetrics(result); + return result; + } + + @Override + public int drainTo(Collection<? super Runnable> destination) { + return drainTo(destination, Integer.MAX_VALUE); + } + + @Override + public int drainTo(Collection<? super Runnable> destination, + int maxElements) { + if (destination == this) { + throw new IllegalArgumentException( + "A BlockingQueue cannot drain to itself."); + } + List<Call> drained = new ArrayList<Call>(); + underlyingQueue.drainTo(drained, maxElements); + for (Call r : drained) { + updateMetrics(r); + } + destination.addAll(drained); + return drained.size(); + } + + + @Override + public boolean offer(Runnable element) { + return underlyingQueue.offer(new Call(element)); + } + + @Override + public boolean offer(Runnable element, long timeout, TimeUnit unit) + throws InterruptedException { + return underlyingQueue.offer(new Call(element), timeout, unit); + } + @Override + public void put(Runnable element) throws InterruptedException { + underlyingQueue.put(new Call(element)); + } + + @Override + public boolean add(Runnable element) { + return underlyingQueue.add(new Call(element)); + } + + @Override + public boolean addAll(Collection<? extends Runnable> elements) { + int added = 0; + for (Runnable r : elements) { + added += underlyingQueue.add(new Call(r)) ? 
1 : 0; + } + return added != 0; + } + + @Override + public Runnable element() { + return underlyingQueue.element(); + } + + @Override + public Runnable peek() { + return underlyingQueue.peek(); + } + + @Override + public void clear() { + underlyingQueue.clear(); + } + + @Override + public boolean containsAll(Collection<?> elements) { + return underlyingQueue.containsAll(elements); + } + + @Override + public boolean isEmpty() { + return underlyingQueue.isEmpty(); + } + + @Override + public Iterator<Runnable> iterator() { + return new Iterator<Runnable>() { + final Iterator<Call> underlyingIterator = underlyingQueue.iterator(); + @Override + public Runnable next() { + return underlyingIterator.next(); + } + + @Override + public boolean hasNext() { + return underlyingIterator.hasNext(); + } + + @Override + public void remove() { + underlyingIterator.remove(); + } + }; + } + + @Override + public boolean removeAll(Collection<?> elements) { + return underlyingQueue.removeAll(elements); + } + + @Override + public boolean retainAll(Collection<?> elements) { + return underlyingQueue.retainAll(elements); + } + + @Override + public int size() { + return underlyingQueue.size(); + } + + @Override + public Object[] toArray() { + return underlyingQueue.toArray(); + } + + @Override + public <T> T[] toArray(T[] array) { + return underlyingQueue.toArray(array); + } + + @Override + public boolean contains(Object element) { + return underlyingQueue.contains(element); + } + + @Override + public int remainingCapacity() { + return underlyingQueue.remainingCapacity(); + } + + @Override + public boolean remove(Object element) { + return underlyingQueue.remove(element); + } +} Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/HBaseThreadPoolServer.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/HBaseThreadPoolServer.java?rev=1245287&r1=1245286&r2=1245287&view=diff 
============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/HBaseThreadPoolServer.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/HBaseThreadPoolServer.java Fri Feb 17 01:56:08 2012 @@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.thrift.CallQueue.Call; import org.apache.hadoop.hbase.util.Threads; import org.apache.thrift.TException; import org.apache.thrift.TProcessor; @@ -110,16 +111,18 @@ public class HBaseThreadPoolServer exten TServerTransport serverTransport, TTransportFactory transportFactory, TProtocolFactory protocolFactory, - Options options) { + Options options, + ThriftMetrics metrics) { super(new TProcessorFactory(processor), serverTransport, transportFactory, transportFactory, protocolFactory, protocolFactory); BlockingQueue<Runnable> executorQueue; if (options.maxQueuedRequests > 0) { - executorQueue = new LinkedBlockingQueue<Runnable>( - options.maxQueuedRequests); + executorQueue = new CallQueue( + new LinkedBlockingQueue<Call>(options.maxQueuedRequests), metrics); } else { - executorQueue = new SynchronousQueue<Runnable>(); + executorQueue = new CallQueue( + new SynchronousQueue<Call>(), metrics); } ThreadFactoryBuilder tfb = new ThreadFactoryBuilder(); Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/HbaseHandlerMetricsProxy.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/HbaseHandlerMetricsProxy.java?rev=1245287&view=auto ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/HbaseHandlerMetricsProxy.java (added) +++ 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/HbaseHandlerMetricsProxy.java Fri Feb 17 01:56:08 2012 @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.thrift; + +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.thrift.generated.Hbase; + + +/** + * Converts a Hbase.Iface using InvocationHandler so that it reports process + * time of each call to ThriftMetrics. 
+ */ +public class HbaseHandlerMetricsProxy implements InvocationHandler { + + public static final Log LOG = LogFactory.getLog( + HbaseHandlerMetricsProxy.class); + + private final Hbase.Iface handler; + private final ThriftMetrics metrics; + + public static Hbase.Iface newInstance(Hbase.Iface handler, + ThriftMetrics metrics, + Configuration conf) { + return (Hbase.Iface) Proxy.newProxyInstance( + handler.getClass().getClassLoader(), + handler.getClass().getInterfaces(), + new HbaseHandlerMetricsProxy(handler, metrics, conf)); + } + + private HbaseHandlerMetricsProxy( + Hbase.Iface handler, ThriftMetrics metrics, Configuration conf) { + this.handler = handler; + this.metrics = metrics; + } + + @Override + public Object invoke(Object proxy, Method m, Object[] args) + throws Throwable { + Object result; + try { + long start = now(); + result = m.invoke(handler, args); + int processTime = (int) Math.min(now() - start, Integer.MAX_VALUE); // clamp: a bare (int) cast wraps once a call exceeds ~2.1 s of nanoseconds + metrics.incMethodTime(m.getName(), processTime); + } catch (InvocationTargetException e) { + throw e.getTargetException(); + } catch (Exception e) { + throw new RuntimeException( + "unexpected invocation exception: " + e.getMessage(), e); // keep the original exception as the cause + } + return result; + } + + private static long now() { + return System.nanoTime(); + } +} Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftMetrics.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftMetrics.java?rev=1245287&view=auto ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftMetrics.java (added) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftMetrics.java Fri Feb 17 01:56:08 2012 @@ -0,0 +1,131 @@ +/* + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hbase.thrift; + +import java.lang.reflect.Method; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.thrift.generated.Hbase; +import org.apache.hadoop.metrics.MetricsContext; +import org.apache.hadoop.metrics.MetricsRecord; +import org.apache.hadoop.metrics.MetricsUtil; +import org.apache.hadoop.metrics.Updater; +import org.apache.hadoop.metrics.util.MetricsBase; +import org.apache.hadoop.metrics.util.MetricsIntValue; +import org.apache.hadoop.metrics.util.MetricsRegistry; +import org.apache.hadoop.metrics.util.MetricsTimeVaryingInt; +import org.apache.hadoop.metrics.util.MetricsTimeVaryingLong; +import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate; + +/** + * This class is for maintaining the various statistics of thrift server + * and publishing them through the metrics interfaces. 
+ */ +public class ThriftMetrics implements Updater { + public final static Log LOG = LogFactory.getLog(ThriftMetrics.class); + public final static String CONTEXT_NAME = "thriftserver"; + + private final MetricsContext context; + private final MetricsRecord metricsRecord; + private final MetricsRegistry registry = new MetricsRegistry(); + private final long slowResponseTime; + public static final String SLOW_RESPONSE_NANO_SEC = + "hbase.thrift.slow.response.nano.second"; + public static final long DEFAULT_SLOW_RESPONSE_NANO_SEC = 10 * 1000 * 1000; + + private final MetricsIntValue callQueueLen = + new MetricsIntValue("callQueueLen", registry); + private final MetricsTimeVaryingRate timeInQueue = + new MetricsTimeVaryingRate("timeInQueue", registry); + private MetricsTimeVaryingRate thriftCall = + new MetricsTimeVaryingRate("thriftCall", registry); + private MetricsTimeVaryingRate slowThriftCall = + new MetricsTimeVaryingRate("slowThriftCall", registry); + + public ThriftMetrics(int port, Configuration conf) { + slowResponseTime = conf.getLong( + SLOW_RESPONSE_NANO_SEC, DEFAULT_SLOW_RESPONSE_NANO_SEC); + context = MetricsUtil.getContext(CONTEXT_NAME); + metricsRecord = MetricsUtil.createRecord(context, CONTEXT_NAME); + + metricsRecord.setTag("port", port + ""); + + LOG.info("Initializing RPC Metrics with port=" + port); + + context.registerUpdater(this); + + createMetricsForMethods(Hbase.Iface.class); + } + + public void incTimeInQueue(long time) { + timeInQueue.inc(time); + } + + public void setCallQueueLen(int len) { + callQueueLen.set(len); + } + + public void incMethodTime(String name, int time) { + MetricsTimeVaryingRate methodTimeMetrc = getMethodTimeMetrics(name); + if (methodTimeMetrc == null) { + LOG.warn( + "Got incMethodTime() request for method that doesnt exist: " + name); + return; // ignore methods that dont exist. 
+ } + + // inc method specific processTime + methodTimeMetrc.inc(time); + + // inc general processTime + thriftCall.inc(time); + if (time > slowResponseTime) { + slowThriftCall.inc(time); + } + } + + private void createMetricsForMethods(Class<?> iface) { + for (Method m : iface.getDeclaredMethods()) { + if (getMethodTimeMetrics(m.getName()) != null) { continue; } // a metric with this name is already in the registry; do not create it again + LOG.debug("Creating metrics for method:" + m.getName()); + createMethodTimeMetrics(m.getName()); + } + } + + private MetricsTimeVaryingRate getMethodTimeMetrics(String key) { + return (MetricsTimeVaryingRate) registry.get(key); + } + + private MetricsTimeVaryingRate createMethodTimeMetrics(String key) { + return new MetricsTimeVaryingRate(key, this.registry); + } + + /** + * Push the metrics to the monitoring subsystem on doUpdates() call. + */ + public void doUpdates(final MetricsContext context) { + // getMetricsList() and pushMetric() are thread safe methods + for (MetricsBase m : registry.getMetricsList()) { + m.pushMetric(metricsRecord); + } + metricsRecord.update(); + } +} Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java?rev=1245287&r1=1245286&r2=1245287&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java Fri Feb 17 01:56:08 2012 @@ -1049,7 +1049,7 @@ public class ThriftServer { System.exit(exitCode); } - private static final String DEFAULT_LISTEN_PORT = "9090"; + static final String DEFAULT_LISTEN_PORT = "9090"; /* * Start up the Thrift server. 
@@ -1134,7 +1134,9 @@ public class ThriftServer { protocolFactory = new TBinaryProtocol.Factory(); } - HBaseHandler handler = new HBaseHandler(conf); + ThriftMetrics metrics = new ThriftMetrics(listenPort, conf); + Hbase.Iface handler = new HBaseHandler(conf); + handler = HbaseHandlerMetricsProxy.newInstance(handler, metrics, conf); Hbase.Processor processor = new Hbase.Processor(handler); TServer server; @@ -1188,7 +1190,7 @@ public class ThriftServer { + serverOptions.maxQueuedRequests); server = new HBaseThreadPoolServer(processor, serverTransport, - transportFactory, protocolFactory, serverOptions); + transportFactory, protocolFactory, serverOptions, metrics); if (server.getClass() != THREAD_POOL_SERVER_CLASS) { // A sanity check that we instantiated the right thing. Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/TestCallQueue.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/TestCallQueue.java?rev=1245287&view=auto ============================================================================== --- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/TestCallQueue.java (added) +++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/TestCallQueue.java Fri Feb 17 01:56:08 2012 @@ -0,0 +1,134 @@ +/* + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.thrift; + +import static org.junit.Assert.assertEquals; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.thrift.CallQueue.Call; +import org.apache.hadoop.metrics.ContextFactory; +import org.apache.hadoop.metrics.MetricsContext; +import org.apache.hadoop.metrics.MetricsUtil; +import org.apache.hadoop.metrics.spi.NoEmitMetricsContext; +import org.apache.hadoop.metrics.spi.OutputRecord; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +/** + * Unit testing for CallQueue, a part of the + * org.apache.hadoop.hbase.thrift package. 
+ */ +@RunWith(Parameterized.class) +public class TestCallQueue { + + public static final Log LOG = LogFactory.getLog(TestCallQueue.class); + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private int elementsAdded; + private int elementsRemoved; + + @Parameters + public static Collection<Object[]> getParameters() { + Collection<Object[]> parameters = new ArrayList<Object[]>(); + for (int elementsAdded : new int[] {100, 200, 300}) { + for (int elementsRemoved : new int[] {0, 20, 100}) { + parameters.add(new Object[]{new Integer(elementsAdded), + new Integer(elementsRemoved)}); + } + } + return parameters; + } + + public TestCallQueue(int elementsAdded, int elementsRemoved) { + this.elementsAdded = elementsAdded; + this.elementsRemoved = elementsRemoved; + LOG.debug("elementsAdded:" + elementsAdded + + " elementsRemoved:" + elementsRemoved); + } + + @Test(timeout=3000) + public void testPutTake() throws Exception { + ThriftMetrics metrics = createMetrics(); + CallQueue callQueue = new CallQueue( + new LinkedBlockingQueue<Call>(), metrics); + for (int i = 0; i < elementsAdded; ++i) { + callQueue.put(createDummyRunnable()); + } + for (int i = 0; i < elementsRemoved; ++i) { + callQueue.take(); + } + verifyMetrics(metrics, "timeInQueue_num_ops", elementsRemoved); + } + + @Test(timeout=3000) + public void testOfferPoll() throws Exception { + ThriftMetrics metrics = createMetrics(); + CallQueue callQueue = new CallQueue( + new LinkedBlockingQueue<Call>(), metrics); + for (int i = 0; i < elementsAdded; ++i) { + callQueue.offer(createDummyRunnable()); + } + for (int i = 0; i < elementsRemoved; ++i) { + callQueue.poll(); + } + verifyMetrics(metrics, "timeInQueue_num_ops", elementsRemoved); + } + + private static ThriftMetrics createMetrics() throws Exception { + setupMetricsContext(); + Configuration conf = UTIL.getConfiguration(); + return new ThriftMetrics( + Integer.parseInt(ThriftServer.DEFAULT_LISTEN_PORT), conf); + } + + private static 
void setupMetricsContext() throws Exception { + ContextFactory factory = ContextFactory.getFactory(); + factory.setAttribute(ThriftMetrics.CONTEXT_NAME + ".class", + NoEmitMetricsContext.class.getName()); + MetricsUtil.getContext(ThriftMetrics.CONTEXT_NAME) + .createRecord(ThriftMetrics.CONTEXT_NAME).remove(); + } + + private static void verifyMetrics(ThriftMetrics metrics, String name, int expectValue) + throws Exception { + MetricsContext context = MetricsUtil.getContext( + ThriftMetrics.CONTEXT_NAME); + metrics.doUpdates(context); + OutputRecord record = context.getAllRecords().get( + ThriftMetrics.CONTEXT_NAME).iterator().next(); + assertEquals(expectValue, record.getMetric(name).intValue()); + } + + private static Runnable createDummyRunnable() { + return new Runnable() { + @Override + public void run() { + } + }; + } +} Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServer.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServer.java?rev=1245287&r1=1245286&r2=1245287&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServer.java (original) +++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServer.java Fri Feb 17 01:56:08 2012 @@ -19,16 +19,24 @@ */ package org.apache.hadoop.hbase.thrift; +import java.io.IOException; import java.util.ArrayList; import java.util.List; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClusterTestCase; import org.apache.hadoop.hbase.thrift.generated.BatchMutation; import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor; +import org.apache.hadoop.hbase.thrift.generated.Hbase; import org.apache.hadoop.hbase.thrift.generated.Mutation; import org.apache.hadoop.hbase.thrift.generated.TCell; import 
org.apache.hadoop.hbase.thrift.generated.TRowResult; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.metrics.ContextFactory; +import org.apache.hadoop.metrics.MetricsContext; +import org.apache.hadoop.metrics.MetricsUtil; +import org.apache.hadoop.metrics.spi.NoEmitMetricsContext; +import org.apache.hadoop.metrics.spi.OutputRecord; /** * Unit testing for ThriftServer.HBaseHandler, a part of the @@ -59,6 +67,7 @@ public class TestThriftServer extends HB public void testAll() throws Exception { // Run all tests doTestTableCreateDrop(); + doTestThriftMetrics(); doTestTableMutations(); doTestTableTimestampsAndColumns(); doTestTableScanners(); @@ -97,6 +106,54 @@ public class TestThriftServer extends HB } /** + * Tests if the metrics for thrift handler work correctly + */ + public void doTestThriftMetrics() throws Exception { + ThriftMetrics metrics = getMetrics(conf); + Hbase.Iface handler = getHandler(metrics, conf); + handler.createTable(tableAname, getColumnDescriptors()); + handler.disableTable(tableAname); + handler.deleteTable(tableAname); + handler.createTable(tableBname, getColumnDescriptors()); + handler.disableTable(tableBname); + handler.deleteTable(tableBname); + verifyMetrics(metrics, "createTable_num_ops", 2); + verifyMetrics(metrics, "deleteTable_num_ops", 2); + verifyMetrics(metrics, "disableTable_num_ops", 2); + } + + private static Hbase.Iface getHandler(ThriftMetrics metrics, Configuration conf) + throws Exception { + Hbase.Iface handler = + new ThriftServer.HBaseHandler(conf); + return HbaseHandlerMetricsProxy.newInstance(handler, metrics, conf); + } + + private static ThriftMetrics getMetrics(Configuration conf) throws Exception { + setupMetricsContext(); + return new ThriftMetrics( + Integer.parseInt(ThriftServer.DEFAULT_LISTEN_PORT), conf); + } + + private static void setupMetricsContext() throws IOException { + ContextFactory factory = ContextFactory.getFactory(); + factory.setAttribute(ThriftMetrics.CONTEXT_NAME + ".class", 
+ NoEmitMetricsContext.class.getName()); + MetricsUtil.getContext(ThriftMetrics.CONTEXT_NAME) + .createRecord(ThriftMetrics.CONTEXT_NAME).remove(); + } + + private static void verifyMetrics(ThriftMetrics metrics, String name, int expectValue) + throws Exception { + MetricsContext context = MetricsUtil.getContext( + ThriftMetrics.CONTEXT_NAME); + metrics.doUpdates(context); + OutputRecord record = context.getAllRecords().get( + ThriftMetrics.CONTEXT_NAME).iterator().next(); + assertEquals(expectValue, record.getMetric(name).intValue()); + } + + /** * Tests adding a series of Mutations and BatchMutations, including a * delete mutation. Also tests data retrieval, and getting back multiple * versions.
