Author: tedyu
Date: Thu Feb 2 17:30:13 2012
New Revision: 1239737
URL: http://svn.apache.org/viewvc?rev=1239737&view=rev
Log:
HBASE-5186 Add metrics to ThriftServer (Scott Chen)
Added:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/CallQueue.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/HbaseHandlerMetricsProxy.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/ThriftMetrics.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/thrift/TestCallQueue.java
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/TBoundedThreadPoolServer.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServer.java
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java?rev=1239737&r1=1239736&r2=1239737&view=diff
==============================================================================
---
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java
(original)
+++
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java
Thu Feb 2 17:30:13 2012
@@ -50,7 +50,6 @@ import org.apache.hadoop.hbase.thrift.ge
public class HRegionThriftServer extends Thread {
public static final Log LOG = LogFactory.getLog(HRegionThriftServer.class);
- public static final int DEFAULT_LISTEN_PORT = 9090;
private final HRegionServer rs;
private final ThriftServerRunner serverRunner;
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/CallQueue.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/CallQueue.java?rev=1239737&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/CallQueue.java
(added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/CallQueue.java Thu
Feb 2 17:30:13 2012
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.thrift;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A BlockingQueue that reports the time each call waits in the queue, and the
+ * queue length, to ThriftMetrics.
+ */
+public class CallQueue implements BlockingQueue<Runnable> {
+  private static final Log LOG = LogFactory.getLog(CallQueue.class);
+
+  /** Queue that actually stores the wrapped calls. */
+  private final BlockingQueue<Call> underlyingQueue;
+  /** Receives the time-in-queue and queue-length measurements. */
+  private final ThriftMetrics metrics;
+
+  /**
+   * @param underlyingQueue queue used to store the wrapped calls
+   * @param metrics metrics object updated whenever a call is dequeued
+   */
+  public CallQueue(BlockingQueue<Call> underlyingQueue,
+                   ThriftMetrics metrics) {
+    this.underlyingQueue = underlyingQueue;
+    this.metrics = metrics;
+  }
+
+  /** @return the current System.nanoTime() reading */
+  private static long now() {
+    return System.nanoTime();
+  }
+
+  /**
+   * Runnable wrapper that records its creation time so that the time spent
+   * waiting in the queue can be measured when it is dequeued.
+   */
+  public static class Call implements Runnable {
+    final long startTime;
+    final Runnable underlyingRunnable;
+
+    Call(Runnable underlyingRunnable) {
+      this.underlyingRunnable = underlyingRunnable;
+      this.startTime = now();
+    }
+
+    @Override
+    public void run() {
+      underlyingRunnable.run();
+    }
+
+    /** @return nanoseconds elapsed since this Call was constructed */
+    public long timeInQueue() {
+      return now() - startTime;
+    }
+
+    /**
+     * Compares by the wrapped Runnable. A Call equals another Call that wraps
+     * an equal Runnable, and it also equals a bare Runnable that is equal to
+     * the wrapped one. The second case only holds in this direction unless
+     * the bare Runnable's own equals() recognises Call.
+     */
+    @Override
+    public boolean equals(Object other) {
+      if (other instanceof Call) {
+        Call otherCall = (Call)(other);
+        return this.underlyingRunnable.equals(otherCall.underlyingRunnable);
+      } else if (other instanceof Runnable) {
+        return this.underlyingRunnable.equals(other);
+      }
+      return false;
+    }
+
+    @Override
+    public int hashCode() {
+      return this.underlyingRunnable.hashCode();
+    }
+  }
+
+  /**
+   * Wraps a bare Runnable in a Call so that equality-based lookups in the
+   * underlying queue end up in Call.equals() regardless of which side the
+   * queue invokes equals() on. Calls, nulls and non-Runnables pass through.
+   */
+  private static Object wrapIfBareRunnable(Object element) {
+    if (element instanceof Runnable && !(element instanceof Call)) {
+      return new Call((Runnable) element);
+    }
+    return element;
+  }
+
+  /** Records queue time and current queue length for a dequeued call. */
+  private void updateMetrics(Call result) {
+    if (result == null) {
+      // Nothing was dequeued (e.g. poll() on an empty queue).
+      return;
+    }
+    metrics.incTimeInQueue(result.timeInQueue());
+    metrics.setCallQueueLen(this.size());
+  }
+
+  // ---- Removal operations: each reports metrics for what it dequeues ----
+
+  @Override
+  public Runnable poll() {
+    Call result = underlyingQueue.poll();
+    updateMetrics(result);
+    return result;
+  }
+
+  @Override
+  public Runnable poll(long timeout, TimeUnit unit)
+      throws InterruptedException {
+    Call result = underlyingQueue.poll(timeout, unit);
+    updateMetrics(result);
+    return result;
+  }
+
+  @Override
+  public Runnable remove() {
+    Call result = underlyingQueue.remove();
+    updateMetrics(result);
+    return result;
+  }
+
+  @Override
+  public Runnable take() throws InterruptedException {
+    Call result = underlyingQueue.take();
+    updateMetrics(result);
+    return result;
+  }
+
+  @Override
+  public int drainTo(Collection<? super Runnable> destination) {
+    return drainTo(destination, Integer.MAX_VALUE);
+  }
+
+  /**
+   * Drains up to maxElements calls into destination, reporting metrics for
+   * each drained call.
+   *
+   * @throws IllegalArgumentException if destination is this queue
+   */
+  @Override
+  public int drainTo(Collection<? super Runnable> destination,
+                     int maxElements) {
+    if (destination == this) {
+      throw new IllegalArgumentException(
+          "A BlockingQueue cannot drain to itself.");
+    }
+    // Drain into a typed list first so each Call can be measured.
+    List<Call> drained = new ArrayList<Call>();
+    underlyingQueue.drainTo(drained, maxElements);
+    for (Call r : drained) {
+      updateMetrics(r);
+    }
+    destination.addAll(drained);
+    int sz = drained.size();
+    LOG.info("Elements drained: " + sz);
+    return sz;
+  }
+
+  // ---- Insertion operations: each wraps the Runnable in a new Call ----
+
+  @Override
+  public boolean offer(Runnable element) {
+    return underlyingQueue.offer(new Call(element));
+  }
+
+  @Override
+  public boolean offer(Runnable element, long timeout, TimeUnit unit)
+      throws InterruptedException {
+    return underlyingQueue.offer(new Call(element), timeout, unit);
+  }
+
+  @Override
+  public void put(Runnable element) throws InterruptedException {
+    underlyingQueue.put(new Call(element));
+  }
+
+  @Override
+  public boolean add(Runnable element) {
+    return underlyingQueue.add(new Call(element));
+  }
+
+  /** @return true if at least one element was added */
+  @Override
+  public boolean addAll(Collection<? extends Runnable> elements) {
+    int added = 0;
+    for (Runnable r : elements) {
+      added += underlyingQueue.add(new Call(r)) ? 1 : 0;
+    }
+    return added != 0;
+  }
+
+  // ---- Inspection and bulk operations: plain delegation ----
+
+  @Override
+  public Runnable element() {
+    return underlyingQueue.element();
+  }
+
+  @Override
+  public Runnable peek() {
+    return underlyingQueue.peek();
+  }
+
+  @Override
+  public void clear() {
+    underlyingQueue.clear();
+  }
+
+  // NOTE(review): the bulk methods below pass their arguments through
+  // unwrapped, so bare Runnables may not match the queued Call wrappers,
+  // depending on how the underlying queue invokes equals().
+  @Override
+  public boolean containsAll(Collection<?> elements) {
+    return underlyingQueue.containsAll(elements);
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return underlyingQueue.isEmpty();
+  }
+
+  /** Iterates over the queued Call wrappers, exposed as Runnables. */
+  @Override
+  public Iterator<Runnable> iterator() {
+    return new Iterator<Runnable>() {
+      final Iterator<Call> underlyingIterator = underlyingQueue.iterator();
+      @Override
+      public Runnable next() {
+        return underlyingIterator.next();
+      }
+
+      @Override
+      public boolean hasNext() {
+        return underlyingIterator.hasNext();
+      }
+
+      @Override
+      public void remove() {
+        underlyingIterator.remove();
+      }
+    };
+  }
+
+  @Override
+  public boolean removeAll(Collection<?> elements) {
+    return underlyingQueue.removeAll(elements);
+  }
+
+  @Override
+  public boolean retainAll(Collection<?> elements) {
+    return underlyingQueue.retainAll(elements);
+  }
+
+  @Override
+  public int size() {
+    return underlyingQueue.size();
+  }
+
+  @Override
+  public Object[] toArray() {
+    return underlyingQueue.toArray();
+  }
+
+  @Override
+  public <T> T[] toArray(T[] array) {
+    return underlyingQueue.toArray(array);
+  }
+
+  /**
+   * Reports whether the given element is queued. A bare Runnable is wrapped
+   * first so that it is compared against the queued Calls by its wrapped
+   * Runnable rather than by its own equals().
+   */
+  @Override
+  public boolean contains(Object element) {
+    return underlyingQueue.contains(wrapIfBareRunnable(element));
+  }
+
+  @Override
+  public int remainingCapacity() {
+    return underlyingQueue.remainingCapacity();
+  }
+
+  /**
+   * Removes one occurrence of the given element. A bare Runnable is wrapped
+   * first so that it matches the queued Call that wraps it. No metrics are
+   * reported for elements removed this way.
+   */
+  @Override
+  public boolean remove(Object element) {
+    return underlyingQueue.remove(wrapIfBareRunnable(element));
+  }
+}
Added:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/HbaseHandlerMetricsProxy.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/HbaseHandlerMetricsProxy.java?rev=1239737&view=auto
==============================================================================
---
hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/HbaseHandlerMetricsProxy.java
(added)
+++
hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/HbaseHandlerMetricsProxy.java
Thu Feb 2 17:30:13 2012
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.thrift;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.thrift.generated.Hbase;
+
+
+/**
+ * Converts a Hbase.Iface using InvocationHandler so that it reports process
+ * time of each call to ThriftMetrics.
+ */
+public class HbaseHandlerMetricsProxy implements InvocationHandler {
+
+  public static final Log LOG = LogFactory.getLog(
+      HbaseHandlerMetricsProxy.class);
+
+  /** The real handler every call is forwarded to. */
+  private final Hbase.Iface handler;
+  /** Receives the per-method processing times. */
+  private final ThriftMetrics metrics;
+
+  /**
+   * Wraps a handler in a dynamic proxy that times every Hbase.Iface call.
+   *
+   * @param handler the handler to delegate to
+   * @param metrics metrics object that receives the processing times
+   * @param conf configuration; currently not used by the proxy
+   * @return a proxy implementing Hbase.Iface
+   */
+  public static Hbase.Iface newInstance(Hbase.Iface handler,
+                                        ThriftMetrics metrics,
+                                        Configuration conf) {
+    // Proxy Hbase.Iface explicitly. handler.getClass().getInterfaces() only
+    // lists interfaces declared directly on the runtime class, so a subclass
+    // of a handler would yield a proxy that cannot be cast to Hbase.Iface.
+    return (Hbase.Iface) Proxy.newProxyInstance(
+        handler.getClass().getClassLoader(),
+        new Class<?>[] {Hbase.Iface.class},
+        new HbaseHandlerMetricsProxy(handler, metrics, conf));
+  }
+
+  private HbaseHandlerMetricsProxy(
+      Hbase.Iface handler, ThriftMetrics metrics, Configuration conf) {
+    this.handler = handler;
+    this.metrics = metrics;
+  }
+
+  /**
+   * Forwards the call to the real handler and reports its duration in
+   * nanoseconds. Calls that throw are not timed.
+   *
+   * @throws Throwable whatever the underlying handler method threw
+   */
+  @Override
+  public Object invoke(Object proxy, Method m, Object[] args)
+      throws Throwable {
+    Object result;
+    try {
+      long start = now();
+      result = m.invoke(handler, args);
+      long elapsedNanos = now() - start;
+      // incMethodTime() takes an int; clamp instead of letting durations
+      // longer than ~2.1s wrap around to a negative or bogus value.
+      int processTime = (int) Math.min(elapsedNanos, Integer.MAX_VALUE);
+      metrics.incMethodTime(m.getName(), processTime);
+    } catch (InvocationTargetException e) {
+      // Unwrap so callers see the handler's own exception.
+      throw e.getTargetException();
+    } catch (Exception e) {
+      // Keep the original exception as the cause so its stack trace survives.
+      throw new RuntimeException(
+          "unexpected invocation exception: " + e.getMessage(), e);
+    }
+    return result;
+  }
+
+  private static long now() {
+    return System.nanoTime();
+  }
+}
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/TBoundedThreadPoolServer.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/TBoundedThreadPoolServer.java?rev=1239737&r1=1239736&r2=1239737&view=diff
==============================================================================
---
hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/TBoundedThreadPoolServer.java
(original)
+++
hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/TBoundedThreadPoolServer.java
Thu Feb 2 17:30:13 2012
@@ -18,7 +18,6 @@
package org.apache.hadoop.hbase.thrift;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
@@ -29,6 +28,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.thrift.CallQueue.Call;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
@@ -103,6 +103,8 @@ public class TBoundedThreadPoolServer ex
private static final Log LOG = LogFactory.getLog(
TBoundedThreadPoolServer.class.getName());
+ private final CallQueue callQueue;
+
public static class Args extends TThreadPoolServer.Args {
int maxQueuedRequests;
int threadKeepAliveTimeSec;
@@ -135,15 +137,14 @@ public class TBoundedThreadPoolServer ex
private Args serverOptions;
- public TBoundedThreadPoolServer(Args options) {
+ public TBoundedThreadPoolServer(Args options, ThriftMetrics metrics) {
super(options);
- BlockingQueue<Runnable> executorQueue;
if (options.maxQueuedRequests > 0) {
- executorQueue = new LinkedBlockingQueue<Runnable>(
- options.maxQueuedRequests);
+ this.callQueue = new CallQueue(
+ new LinkedBlockingQueue<Call>(options.maxQueuedRequests), metrics);
} else {
- executorQueue = new SynchronousQueue<Runnable>();
+ this.callQueue = new CallQueue(new SynchronousQueue<Call>(), metrics);
}
ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
@@ -151,11 +152,18 @@ public class TBoundedThreadPoolServer ex
tfb.setNameFormat("thrift-worker-%d");
executorService =
new ThreadPoolExecutor(options.minWorkerThreads,
- options.maxWorkerThreads, options.threadKeepAliveTimeSec,
TimeUnit.SECONDS,
- executorQueue, tfb.build());
+ options.maxWorkerThreads, options.threadKeepAliveTimeSec,
+ TimeUnit.SECONDS, this.callQueue, tfb.build());
serverOptions = options;
}
+  /**
+   * Exposes the metrics-reporting queue that was built in the constructor
+   * and handed to this server's worker thread pool.
+   *
+   * @return the server's call queue
+   */
+  public CallQueue getCallQueue() {
+    return callQueue;
+  }
+
public void serve() {
try {
serverTransport_.listen();
Added:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/ThriftMetrics.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/ThriftMetrics.java?rev=1239737&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/ThriftMetrics.java
(added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/ThriftMetrics.java
Thu Feb 2 17:30:13 2012
@@ -0,0 +1,131 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hbase.thrift;
+
+import java.lang.reflect.Method;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.thrift.generated.Hbase;
+import org.apache.hadoop.metrics.MetricsContext;
+import org.apache.hadoop.metrics.MetricsRecord;
+import org.apache.hadoop.metrics.MetricsUtil;
+import org.apache.hadoop.metrics.Updater;
+import org.apache.hadoop.metrics.util.MetricsBase;
+import org.apache.hadoop.metrics.util.MetricsIntValue;
+import org.apache.hadoop.metrics.util.MetricsRegistry;
+import org.apache.hadoop.metrics.util.MetricsTimeVaryingInt;
+import org.apache.hadoop.metrics.util.MetricsTimeVaryingLong;
+import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
+
+/**
+ * This class is for maintaining the various statistics of thrift server
+ * and publishing them through the metrics interfaces.
+ */
+public class ThriftMetrics implements Updater {
+  public final static Log LOG = LogFactory.getLog(ThriftMetrics.class);
+  public final static String CONTEXT_NAME = "thriftserver";
+
+  private final MetricsContext context;
+  private final MetricsRecord metricsRecord;
+  private final MetricsRegistry registry = new MetricsRegistry();
+  /** Calls taking longer than this many nanoseconds count as slow. */
+  private final long slowResponseTime;
+  public static final String SLOW_RESPONSE_NANO_SEC =
+      "hbase.thrift.slow.response.nano.second";
+  /** Default slow-call threshold: 10 * 1000 * 1000 ns, i.e. 10 ms. */
+  public static final long DEFAULT_SLOW_RESPONSE_NANO_SEC = 10 * 1000 * 1000;
+
+  private final MetricsIntValue callQueueLen =
+      new MetricsIntValue("callQueueLen", registry);
+  private final MetricsTimeVaryingRate timeInQueue =
+      new MetricsTimeVaryingRate("timeInQueue", registry);
+  private final MetricsTimeVaryingRate thriftCall =
+      new MetricsTimeVaryingRate("thriftCall", registry);
+  private final MetricsTimeVaryingRate slowThriftCall =
+      new MetricsTimeVaryingRate("slowThriftCall", registry);
+
+  /**
+   * Creates the metrics record, registers this object as an updater and
+   * creates one rate metric per method declared on Hbase.Iface.
+   *
+   * @param port port number recorded as the "port" tag of the record
+   * @param conf configuration the slow-call threshold is read from
+   */
+  public ThriftMetrics(int port, Configuration conf) {
+    slowResponseTime = conf.getLong(
+        SLOW_RESPONSE_NANO_SEC, DEFAULT_SLOW_RESPONSE_NANO_SEC);
+    context = MetricsUtil.getContext(CONTEXT_NAME);
+    metricsRecord = MetricsUtil.createRecord(context, CONTEXT_NAME);
+
+    metricsRecord.setTag("port", port + "");
+
+    LOG.info("Initializing RPC Metrics with port=" + port);
+
+    context.registerUpdater(this);
+
+    createMetricsForMethods(Hbase.Iface.class);
+  }
+
+  /** Adds one sample to the time-in-queue rate. */
+  public void incTimeInQueue(long time) {
+    timeInQueue.inc(time);
+  }
+
+  /** Sets the current call queue length gauge. */
+  public void setCallQueueLen(int len) {
+    callQueueLen.set(len);
+  }
+
+  /**
+   * Records the processing time of one call, both under the method's own
+   * metric and under the general "thriftCall" metric. Calls slower than the
+   * configured threshold are additionally recorded under "slowThriftCall".
+   * Unknown method names are logged and ignored.
+   *
+   * @param name name of the invoked method
+   * @param time processing time of the call
+   */
+  public void incMethodTime(String name, int time) {
+    MetricsTimeVaryingRate methodTimeMetrc = getMethodTimeMetrics(name);
+    if (methodTimeMetrc == null) {
+      LOG.warn(
+          "Got incMethodTime() request for method that doesnt exist: " + name);
+      return; // ignore methods that don't exist.
+    }
+
+    // inc method specific processTime
+    methodTimeMetrc.inc(time);
+
+    // inc general processTime
+    thriftCall.inc(time);
+    if (time > slowResponseTime) {
+      slowThriftCall.inc(time);
+    }
+  }
+
+  /**
+   * Creates a rate metric for each method declared on the given interface.
+   * A metric is only created when none is registered under that method name
+   * yet, so a name that occurs more than once is registered a single time.
+   */
+  private void createMetricsForMethods(Class<?> iface) {
+    for (Method m : iface.getDeclaredMethods()) {
+      if (getMethodTimeMetrics(m.getName()) == null) {
+        LOG.debug("Creating metrics for method:" + m.getName());
+        createMethodTimeMetrics(m.getName());
+      }
+    }
+  }
+
+  /** @return the metric registered under key, or null if there is none */
+  private MetricsTimeVaryingRate getMethodTimeMetrics(String key) {
+    return (MetricsTimeVaryingRate) registry.get(key);
+  }
+
+  /** Creates a rate metric named key and registers it in the registry. */
+  private MetricsTimeVaryingRate createMethodTimeMetrics(String key) {
+    return new MetricsTimeVaryingRate(key, this.registry);
+  }
+
+  /**
+   * Push the metrics to the monitoring subsystem on doUpdates() call.
+   */
+  public void doUpdates(final MetricsContext context) {
+    // getMetricsList() and pushMetric() are thread safe methods
+    for (MetricsBase m : registry.getMetricsList()) {
+      m.pushMetric(metricsRecord);
+    }
+    metricsRecord.update();
+  }
+}
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java?rev=1239737&r1=1239736&r2=1239737&view=diff
==============================================================================
---
hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
(original)
+++
hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
Thu Feb 2 17:30:13 2012
@@ -31,6 +31,11 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
@@ -57,12 +62,13 @@ import org.apache.hadoop.hbase.filter.Pa
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.WhileMatchFilter;
import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.thrift.CallQueue.Call;
import org.apache.hadoop.hbase.thrift.generated.AlreadyExists;
import org.apache.hadoop.hbase.thrift.generated.BatchMutation;
import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor;
import org.apache.hadoop.hbase.thrift.generated.Hbase;
-import org.apache.hadoop.hbase.thrift.generated.IOError;
import org.apache.hadoop.hbase.thrift.generated.IllegalArgument;
+import org.apache.hadoop.hbase.thrift.generated.IOError;
import org.apache.hadoop.hbase.thrift.generated.Mutation;
import org.apache.hadoop.hbase.thrift.generated.TCell;
import org.apache.hadoop.hbase.thrift.generated.TRegionInfo;
@@ -89,6 +95,7 @@ import org.apache.thrift.transport.TServ
import org.apache.thrift.transport.TTransportFactory;
import com.google.common.base.Joiner;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* ThriftServerRunner - this class starts up a Thrift server which implements
@@ -107,11 +114,14 @@ public class ThriftServerRunner implemen
static final String PORT_CONF_KEY = "hbase.regionserver.thrift.port";
private static final String DEFAULT_BIND_ADDR = "0.0.0.0";
- private static final int DEFAULT_LISTEN_PORT = 9090;
+ public static final int DEFAULT_LISTEN_PORT = 9090;
+ private final int listenPort;
private Configuration conf;
volatile TServer tserver;
- private final HBaseHandler handler;
+ private final Hbase.Iface handler;
+ private final ThriftMetrics metrics;
+ private CallQueue callQueue;
/** An enum of server implementation selections */
enum ImplType {
@@ -214,7 +224,10 @@ public class ThriftServerRunner implemen
public ThriftServerRunner(Configuration conf, HBaseHandler handler) {
this.conf = HBaseConfiguration.create(conf);
- this.handler = handler;
+ this.listenPort = conf.getInt(PORT_CONF_KEY, DEFAULT_LISTEN_PORT);
+ this.metrics = new ThriftMetrics(listenPort, conf);
+ this.handler = HbaseHandlerMetricsProxy.newInstance(handler, metrics,
conf);
+
}
/*
@@ -243,9 +256,6 @@ public class ThriftServerRunner implemen
* Setting up the thrift TServer
*/
private void setupServer() throws Exception {
- // Get port to bind to
- int listenPort = conf.getInt(PORT_CONF_KEY, DEFAULT_LISTEN_PORT);
-
// Construct correct ProtocolFactory
TProtocolFactory protocolFactory;
if (conf.getBoolean(COMPACT_CONF_KEY, false)) {
@@ -293,14 +303,22 @@ public class ThriftServerRunner implemen
tserver = new TNonblockingServer(serverArgs);
} else if (implType == ImplType.HS_HA) {
THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
- serverArgs.processor(processor)
+ this.callQueue = new CallQueue(new LinkedBlockingQueue<Call>(),
metrics);
+ ExecutorService executorService = createExecutor(
+ this.callQueue, serverArgs.getWorkerThreads());
+ serverArgs.executorService(executorService)
+ .processor(processor)
.transportFactory(transportFactory)
.protocolFactory(protocolFactory);
tserver = new THsHaServer(serverArgs);
} else { // THREADED_SELECTOR
TThreadedSelectorServer.Args serverArgs =
new HThreadedSelectorServerArgs(serverTransport, conf);
- serverArgs.processor(processor)
+ this.callQueue = new CallQueue(new LinkedBlockingQueue<Call>(),
metrics);
+ ExecutorService executorService = createExecutor(
+ this.callQueue, serverArgs.getWorkerThreads());
+ serverArgs.executorService(executorService)
+ .processor(processor)
.transportFactory(transportFactory)
.protocolFactory(protocolFactory);
tserver = new TThreadedSelectorServer(serverArgs);
@@ -322,7 +340,10 @@ public class ThriftServerRunner implemen
LOG.info("starting " + ImplType.THREAD_POOL.simpleClassName() + " on "
+ listenAddress + ":" + Integer.toString(listenPort)
+ "; " + serverArgs);
- tserver = new TBoundedThreadPoolServer(serverArgs);
+ TBoundedThreadPoolServer tserver =
+ new TBoundedThreadPoolServer(serverArgs, metrics);
+ this.callQueue = tserver.getCallQueue();
+ this.tserver = tserver;
} else {
throw new AssertionError("Unsupported Thrift server implementation: " +
implType.simpleClassName());
@@ -336,7 +357,6 @@ public class ThriftServerRunner implemen
}
// login the server principal (if using secure Hadoop)
- Configuration conf = handler.conf;
if (User.isSecurityEnabled() && User.isHBaseSecurityEnabled(conf)) {
String machineName = Strings.domainNamePointerToHostName(
DNS.getDefaultHost(conf.get("hbase.thrift.dns.interface", "default"),
@@ -346,12 +366,21 @@ public class ThriftServerRunner implemen
}
}
+  /**
+   * Builds the worker pool used by the non-blocking server implementations:
+   * a pool whose core and maximum size are both workerThreads, fed from the
+   * given queue and running daemon threads named "thrift-worker-N".
+   *
+   * @param callQueue queue the pool takes its tasks from
+   * @param workerThreads number of worker threads in the pool
+   * @return the configured executor
+   */
+  ExecutorService createExecutor(BlockingQueue<Runnable> callQueue,
+                                 int workerThreads) {
+    ThreadFactoryBuilder workerThreadFactory = new ThreadFactoryBuilder()
+        .setDaemon(true)
+        .setNameFormat("thrift-worker-%d");
+    return new ThreadPoolExecutor(
+        workerThreads,                     // core pool size
+        workerThreads,                     // maximum pool size
+        Long.MAX_VALUE, TimeUnit.SECONDS,  // keep-alive time
+        callQueue,
+        workerThreadFactory.build());
+  }
+
private InetAddress getBindAddress(Configuration conf)
throws UnknownHostException {
String bindAddressStr = conf.get(BIND_CONF_KEY, DEFAULT_BIND_ADDR);
return InetAddress.getByName(bindAddressStr);
}
-
+
/**
* The HBaseHandler is a glue object that connects Thrift RPC calls to the
* HBase client API primarily defined in the HBaseAdmin and HTable objects.
@@ -456,8 +485,7 @@ public class ThriftServerRunner implemen
this(HBaseConfiguration.create());
}
- protected HBaseHandler(final Configuration c)
- throws IOException {
+ protected HBaseHandler(final Configuration c) throws IOException {
this.conf = c;
admin = new HBaseAdmin(conf);
scannerMap = new HashMap<Integer, ResultScanner>();
Added:
hbase/trunk/src/test/java/org/apache/hadoop/hbase/thrift/TestCallQueue.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/thrift/TestCallQueue.java?rev=1239737&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/thrift/TestCallQueue.java
(added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/thrift/TestCallQueue.java
Thu Feb 2 17:30:13 2012
@@ -0,0 +1,142 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.thrift;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.thrift.CallQueue.Call;
+import org.apache.hadoop.metrics.ContextFactory;
+import org.apache.hadoop.metrics.MetricsContext;
+import org.apache.hadoop.metrics.MetricsUtil;
+import org.apache.hadoop.metrics.spi.NoEmitMetricsContext;
+import org.apache.hadoop.metrics.spi.OutputRecord;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.junit.Test;
+
+/**
+ * Unit testing for CallQueue, a part of the
+ * org.apache.hadoop.hbase.thrift package.
+ */
+@Category(SmallTests.class)
+@RunWith(Parameterized.class)
+public class TestCallQueue {
+
+  public static final Log LOG = LogFactory.getLog(TestCallQueue.class);
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  /** Number of runnables enqueued by each test. */
+  private final int elementsAdded;
+  /** Number of runnables dequeued again by each test. */
+  private final int elementsRemoved;
+
+  /**
+   * @return every combination of {100, 200, 300} added elements with
+   *         {0, 20, 100} removed elements
+   */
+  @Parameters
+  public static Collection<Object[]> getParameters() {
+    Collection<Object[]> parameters = new ArrayList<Object[]>();
+    for (int elementsAdded : new int[] {100, 200, 300}) {
+      for (int elementsRemoved : new int[] {0, 20, 100}) {
+        // Autoboxing replaces the deprecated new Integer(int) constructor.
+        parameters.add(new Object[]{elementsAdded, elementsRemoved});
+      }
+    }
+    return parameters;
+  }
+
+  public TestCallQueue(int elementsAdded, int elementsRemoved) {
+    this.elementsAdded = elementsAdded;
+    this.elementsRemoved = elementsRemoved;
+    LOG.debug("elementsAdded:" + elementsAdded +
+              " elementsRemoved:" + elementsRemoved);
+  }
+
+  /** put()/take() must record one time-in-queue sample per taken element. */
+  @Test(timeout=3000)
+  public void testPutTake() throws Exception {
+    ThriftMetrics metrics = createMetrics();
+    CallQueue callQueue = new CallQueue(
+        new LinkedBlockingQueue<Call>(), metrics);
+    for (int i = 0; i < elementsAdded; ++i) {
+      callQueue.put(createDummyRunnable());
+    }
+    for (int i = 0; i < elementsRemoved; ++i) {
+      callQueue.take();
+    }
+    verifyMetrics(metrics, "timeInQueue_num_ops", elementsRemoved);
+  }
+
+  /** offer()/poll() must record one time-in-queue sample per polled element. */
+  @Test(timeout=3000)
+  public void testOfferPoll() throws Exception {
+    ThriftMetrics metrics = createMetrics();
+    CallQueue callQueue = new CallQueue(
+        new LinkedBlockingQueue<Call>(), metrics);
+    for (int i = 0; i < elementsAdded; ++i) {
+      callQueue.offer(createDummyRunnable());
+    }
+    for (int i = 0; i < elementsRemoved; ++i) {
+      callQueue.poll();
+    }
+    verifyMetrics(metrics, "timeInQueue_num_ops", elementsRemoved);
+  }
+
+  /** Creates a ThriftMetrics instance after resetting the metrics context. */
+  private static ThriftMetrics createMetrics() throws Exception {
+    setupMetricsContext();
+    Configuration conf = UTIL.getConfiguration();
+    return new ThriftMetrics(ThriftServerRunner.DEFAULT_LISTEN_PORT, conf);
+  }
+
+  /**
+   * Configures the thrift metrics context to use NoEmitMetricsContext and
+   * removes the existing record for the context name.
+   */
+  private static void setupMetricsContext() throws Exception {
+    ContextFactory factory = ContextFactory.getFactory();
+    factory.setAttribute(ThriftMetrics.CONTEXT_NAME + ".class",
+        NoEmitMetricsContext.class.getName());
+    MetricsUtil.getContext(ThriftMetrics.CONTEXT_NAME)
+               .createRecord(ThriftMetrics.CONTEXT_NAME).remove();
+  }
+
+  /**
+   * Pushes the metrics to the context and asserts that the named metric of
+   * the first thrift record has the expected integer value.
+   */
+  private static void verifyMetrics(ThriftMetrics metrics, String name,
+      int expectValue) throws Exception {
+    MetricsContext context = MetricsUtil.getContext(
+        ThriftMetrics.CONTEXT_NAME);
+    metrics.doUpdates(context);
+    OutputRecord record = context.getAllRecords().get(
+        ThriftMetrics.CONTEXT_NAME).iterator().next();
+    assertEquals(expectValue, record.getMetric(name).intValue());
+  }
+
+  /** @return a new Runnable whose run() does nothing */
+  private static Runnable createDummyRunnable() {
+    return new Runnable() {
+      @Override
+      public void run() {
+      }
+    };
+  }
+
+  @org.junit.Rule
+  public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
+    new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
+}
+
Modified:
hbase/trunk/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServer.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServer.java?rev=1239737&r1=1239736&r2=1239737&view=diff
==============================================================================
---
hbase/trunk/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServer.java
(original)
+++
hbase/trunk/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServer.java
Thu Feb 2 17:30:13 2012
@@ -19,14 +19,20 @@
*/
package org.apache.hadoop.hbase.thrift;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
+import java.util.Collection;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.MediumTests;
+import org.junit.experimental.categories.Category;
+import org.junit.Test;
+import org.junit.BeforeClass;
import org.apache.hadoop.hbase.thrift.generated.BatchMutation;
import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor;
import org.apache.hadoop.hbase.thrift.generated.Hbase;
@@ -34,10 +40,15 @@ import org.apache.hadoop.hbase.thrift.ge
import org.apache.hadoop.hbase.thrift.generated.TCell;
import org.apache.hadoop.hbase.thrift.generated.TRowResult;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.metrics.ContextFactory;
+import org.apache.hadoop.metrics.MetricsContext;
+import org.apache.hadoop.metrics.MetricsUtil;
+import org.apache.hadoop.metrics.spi.NoEmitMetricsContext;
+import org.apache.hadoop.metrics.spi.OutputRecord;
import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.conf.Configuration;
/**
* Unit testing for ThriftServerRunner.HBaseHandler, a part of the
@@ -82,9 +93,11 @@ public class TestThriftServer {
*
* @throws Exception
*/
+ @Test
public void testAll() throws Exception {
// Run all tests
doTestTableCreateDrop();
+ doTestThriftMetrics();
doTestTableMutations();
doTestTableTimestampsAndColumns();
doTestTableScanners();
@@ -98,7 +111,6 @@ public class TestThriftServer {
*
* @throws Exception
*/
- @Test
public void doTestTableCreateDrop() throws Exception {
ThriftServerRunner.HBaseHandler handler =
new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration());
@@ -106,6 +118,50 @@ public class TestThriftServer {
dropTestTables(handler);
}
+ /**
+ * Checks that calls made through the metrics-wrapped Thrift handler are
+ * counted: after creating and dropping the test tables, the create, delete
+ * and disable operation counters must each read 2.
+ *
+ * @throws Exception if table setup/teardown or metric verification fails
+ */
+ public void doTestThriftMetrics() throws Exception {
+ Configuration configuration = UTIL.getConfiguration();
+ ThriftMetrics thriftMetrics = getMetrics(configuration);
+ Hbase.Iface meteredHandler = getHandler(thriftMetrics, configuration);
+ createTestTables(meteredHandler);
+ dropTestTables(meteredHandler);
+ String[] countedOps = {
+ "createTable_num_ops",
+ "deleteTable_num_ops",
+ "disableTable_num_ops",
+ };
+ for (String metricName : countedOps) {
+ verifyMetrics(thriftMetrics, metricName, 2);
+ }
+ }
+
+ /**
+ * Creates a new {@code ThriftServerRunner.HBaseHandler} for {@code conf}
+ * and returns the result of passing it through
+ * {@code HbaseHandlerMetricsProxy.newInstance}.
+ * NOTE(review): presumably the proxy records per-call metrics into
+ * {@code metrics} -- confirm against HbaseHandlerMetricsProxy.
+ *
+ * @param metrics metrics object handed to the proxy
+ * @param conf configuration used for both the handler and the proxy
+ * @return the handler as returned by the metrics proxy factory
+ * @throws Exception if the handler cannot be created
+ */
+ private static Hbase.Iface getHandler(ThriftMetrics metrics, Configuration conf)
+ throws Exception {
+ Hbase.Iface handler =
+ new ThriftServerRunner.HBaseHandler(conf);
+ return HbaseHandlerMetricsProxy.newInstance(handler, metrics, conf);
+ }
+
+ /**
+ * Prepares the Thrift metrics context via {@code setupMetricsContext()} and
+ * then builds a {@link ThriftMetrics} for the default Thrift listen port.
+ *
+ * @param conf configuration passed to the {@link ThriftMetrics} constructor
+ * @return a newly constructed metrics object
+ * @throws Exception if the metrics context cannot be set up
+ */
+ private static ThriftMetrics getMetrics(Configuration conf) throws Exception {
+ setupMetricsContext();
+ return new ThriftMetrics(ThriftServerRunner.DEFAULT_LISTEN_PORT, conf);
+ }
+
+ /**
+ * Registers {@link NoEmitMetricsContext} as the implementation class of the
+ * Thrift metrics context, then creates a record named after that context
+ * and immediately removes it.
+ *
+ * @throws IOException declared by the original signature
+ */
+ private static void setupMetricsContext() throws IOException {
+ ContextFactory.getFactory().setAttribute(
+ ThriftMetrics.CONTEXT_NAME + ".class",
+ NoEmitMetricsContext.class.getName());
+ MetricsContext thriftContext =
+ MetricsUtil.getContext(ThriftMetrics.CONTEXT_NAME);
+ thriftContext.createRecord(ThriftMetrics.CONTEXT_NAME).remove();
+ }
+
+ /**
+ * Pushes the current values of {@code metrics} into the Thrift metrics
+ * context and asserts that the named metric, read from the first record
+ * published under {@link ThriftMetrics#CONTEXT_NAME}, has the expected
+ * integer value.
+ *
+ * @param metrics metrics object whose {@code doUpdates} is invoked
+ * @param name name of the metric to look up in the record
+ * @param expectValue value the metric must equal
+ * @throws Exception if the metrics context cannot be obtained or read
+ */
+ private static void verifyMetrics(ThriftMetrics metrics, String name, int expectValue)
+ throws Exception {
+ MetricsContext context = MetricsUtil.getContext(
+ ThriftMetrics.CONTEXT_NAME);
+ metrics.doUpdates(context);
+ OutputRecord record = context.getAllRecords().get(
+ ThriftMetrics.CONTEXT_NAME).iterator().next();
+ // Name the metric in the failure message: this helper is shared by
+ // several checks and a bare "expected X but was Y" does not say which.
+ assertEquals("Unexpected value for metric " + name,
+ expectValue, record.getMetric(name).intValue());
+ }
+
public static void createTestTables(Hbase.Iface handler) throws Exception {
// Create/enable/disable/delete tables, ensure methods act correctly
assertEquals(handler.getTableNames().size(), 0);