Author: liyin Date: Wed Apr 2 20:49:24 2014 New Revision: 1584168 URL: http://svn.apache.org/r1584168 Log: [master] Let async API return client back to pool after the call finishes
Author: fan Summary: Because nifty async client is not thread safe, we cannot multiplex connection :( Test Plan: TestSimpleOperations, TestHeaderSendReceive. Benchmark completes without problem now. Reviewers: adela, manukranthk, gauravm, daviddeng Reviewed By: manukranthk CC: hbase-eng@ Differential Revision: https://phabricator.fb.com/D1228338 Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/HBaseToThriftAdapter.java hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/ThriftClientCacheWithConnectionPooling.java hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/SelfRetryingListenableFuture.java hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/HBaseRPCBenchmarkTool.java Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java?rev=1584168&r1=1584167&r2=1584168&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java Wed Apr 2 20:49:24 2014 @@ -135,9 +135,7 @@ public abstract class ServerCallable<T> ((TableServers)connection).handleThrowable(t, this, couldNotCommunicateWithServer); } - public void readHeader() { - if (server instanceof HBaseToThriftAdapter) { - ((HBaseToThriftAdapter)server).readHeader(); - } + public void postProcess() { + ((HBaseToThriftAdapter)server).postProcess(); } } Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/HBaseToThriftAdapter.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/HBaseToThriftAdapter.java?rev=1584168&r1=1584167&r2=1584168&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/HBaseToThriftAdapter.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/HBaseToThriftAdapter.java Wed Apr 2 20:49:24 2014 @@ -203,10 +203,7 @@ public class HBaseToThriftAdapter implem * Read data that the server has sent to the client * TODO: test how it works with async calls */ - public void readHeader() { - if (clientManager == null) { - return; - } + private void readHeader() { TTransport inputTransport = clientManager.getInputProtocol(connection) .getTransport(); TTransport outputTransport = clientManager.getOutputProtocol(connection) @@ -229,25 +226,19 @@ public class HBaseToThriftAdapter implem } } - private void postProcess() { - try { - if (this.useHeaderProtocol) { - readHeader(); + public void postProcess() { + if (this.clientManager != null && this.connection != null) { + try { + if (this.useHeaderProtocol) { + readHeader(); + } + HBaseThriftRPC.putBackClient(this.connection, this.addr, this.conf, this.clazz); + } catch (Exception e) { + throw new RuntimeException(e); } - HBaseThriftRPC.putBackClient(this.connection, this.addr, this.conf, this.clazz); - this.connection = null; - this.clientManager = null; - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - private void putBackClient() { - try { - HBaseThriftRPC.putBackClient(this.connection, this.addr, this.conf, this.clazz); - } catch (Exception e) { - throw new RuntimeException(e); } + this.connection = null; + this.clientManager = null; } private void handleIOException(Exception e) throws IOException { @@ -357,11 +348,7 @@ public class HBaseToThriftAdapter implem public ListenableFuture<Result> getClosestRowBeforeAsync(byte[] regionName, byte[] row, byte[] family) { preProcess(); - try { - return connection.getClosestRowBeforeAsync(regionName, row, family); - } finally { - putBackClient(); - } + return connection.getClosestRowBeforeAsync(regionName, row, family); } // TODO: we will decide whether to remove it from HRegionInterface in the future @@ -582,11 +569,7 @@ public class HBaseToThriftAdapter implem public ListenableFuture<Result> getAsync(byte[] regionName, Get get) { preProcess(); - try { - return connection.getAsync(regionName, get); - } finally { - putBackClient(); - } + return connection.getAsync(regionName, get); } @Override @@ -657,11 +640,7 @@ public class HBaseToThriftAdapter implem public ListenableFuture<Void> deleteAsync(final byte[] regionName, final Delete delete) { preProcess(); - try { - return connection.deleteAsync(regionName, delete); - } finally { - putBackClient(); - } + return connection.deleteAsync(regionName, delete); } @Override @@ -782,8 +761,6 @@ public class HBaseToThriftAdapter implem return connection.mutateRowAsync(regionName, TRowMutations.Builder.createFromRowMutations(arm)); } catch (IOException e) { return Futures.immediateFailedFuture(e); - } finally { - putBackClient(); } } @@ -903,11 +880,7 @@ public class HBaseToThriftAdapter implem public ListenableFuture<RowLock> lockRowAsync(byte[] regionName, byte[] row) { preProcess(); - try { - return connection.lockRowAsync(regionName, row); - } finally { - putBackClient(); - } + return connection.lockRowAsync(regionName, row); } @Override @@ -929,11 +902,7 @@ public class HBaseToThriftAdapter implem public ListenableFuture<Void> unlockRowAsync(byte[] regionName, long lockId) { preProcess(); - try { - return connection.unlockRowAsync(regionName, lockId); - } finally { - putBackClient(); - } + return connection.unlockRowAsync(regionName, lockId); } @Override Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/ThriftClientCacheWithConnectionPooling.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/ThriftClientCacheWithConnectionPooling.java?rev=1584168&r1=1584167&r2=1584168&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/ThriftClientCacheWithConnectionPooling.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/ThriftClientCacheWithConnectionPooling.java Wed Apr 2 20:49:24 2014 @@ -55,7 +55,7 @@ public class ThriftClientCacheWithConnec public static final String MIN_IDLE = "hbase.client.cachepool.minIdle"; private final ThriftClientManager clientManager; - private final Map<Pair<InetSocketAddress, + private final ConcurrentHashMap<Pair<InetSocketAddress, Class<? extends ThriftClientInterface>>, GenericObjectPool<ThriftClientInterface>> clientPools; @@ -75,12 +75,10 @@ public class ThriftClientCacheWithConnec address, clazz); GenericObjectPool<ThriftClientInterface> clientPool = clientPools.get(key); if (clientPool == null) { - synchronized (clientPools) { - clientPool = clientPools.get(key); - if (clientPool == null) { - clientPool = createGenericObjectPool(address, clazz); - clientPools.put(key, clientPool); - } + clientPool = createGenericObjectPool(address, clazz); + GenericObjectPool<ThriftClientInterface> existing = clientPools.putIfAbsent(key, clientPool); + if (existing != null) { + clientPool = existing; } } return clientPool.borrowObject(); @@ -100,10 +98,10 @@ public class ThriftClientCacheWithConnec ThriftClientObjectFactory factory = new ThriftClientObjectFactory(address, clazz, this.clientManager, this.conf); GenericObjectPool.Config config = new GenericObjectPool.Config(); - long DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS = 1000; + long DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS = 5000; long DEFAULT_MIN_EVICTABLE_IDLE_TIME_MILLIS = 30000; long DEFAULT_WHEN_EXHAUSTED_MAX_WAITTIME = 1000; - int DEFAULT_MAX_ACTIVE = 2000; + int DEFAULT_MAX_ACTIVE = 1000; // Keep some idle connections to prevent asynchronous client from contention on // a single connection to each region server. Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/SelfRetryingListenableFuture.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/SelfRetryingListenableFuture.java?rev=1584168&r1=1584167&r2=1584168&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/SelfRetryingListenableFuture.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/SelfRetryingListenableFuture.java Wed Apr 2 20:49:24 2014 @@ -161,7 +161,7 @@ public class SelfRetryingListenableFutur * @param v Result from server */ private void setSuccess(V v) { - callable.readHeader(); + postProcess(); downstream.set(v); } @@ -171,9 +171,14 @@ public class SelfRetryingListenableFutur * @param t The exception for client */ private void setFailure(Throwable t) { + postProcess(); downstream.setException(t); } + private void postProcess() { + callable.postProcess(); + } + /** * Unwrap exception if it's from server side and handle all scenarios. * Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/HBaseRPCBenchmarkTool.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/HBaseRPCBenchmarkTool.java?rev=1584168&r1=1584167&r2=1584168&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/HBaseRPCBenchmarkTool.java (original) +++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/HBaseRPCBenchmarkTool.java Wed Apr 2 20:49:24 2014 @@ -492,8 +492,7 @@ public class HBaseRPCBenchmarkTool exten setProfilingData(false); } } catch (Exception e) { - LOG.debug("Encountered exception while performing get"); - e.printStackTrace(); + LOG.debug("Encountered exception while performing get", e); break; } long delta = System.nanoTime() - opStartNs; @@ -546,7 +545,7 @@ public class HBaseRPCBenchmarkTool exten } public double getAverageLatency() { - return this.sumLatency.get() / (double)this.totalOps.get(); + return this.sumLatency.get() * (double)this.multigetBatch / (double)this.totalOps.get(); } public double getP95Latency() { @@ -561,9 +560,10 @@ public class HBaseRPCBenchmarkTool exten HBaseRPCBenchmarkTool tool = new HBaseRPCBenchmarkTool(); int ret = tool.doStaticMain(args); System.out.println("Total throughput : " + tool.getThroughput()); - System.out.println("Avg Latency : " + tool.getAverageLatency()); - System.out.println("P99 latency : " + tool.getP99Latency()); - System.out.println("P95 latency : " + tool.getP95Latency()); + System.out.println("Latencies in ms --"); + System.out.println(" Avg Latency : " + tool.getAverageLatency() / 1000000.0); + System.out.println(" P99 latency : " + tool.getP99Latency() / 1000000.0); + System.out.println(" P95 latency : " + tool.getP95Latency() / 1000000.0); System.exit(ret); } }
