Author: stack
Date: Tue Apr 19 18:32:11 2011
New Revision: 1095160
URL: http://svn.apache.org/viewvc?rev=1095160&view=rev
Log:
HBASE-2939 Allow Client-Side Connection Pooling
Modified:
hbase/trunk/CHANGES.txt
hbase/trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
Modified: hbase/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1095160&r1=1095159&r2=1095160&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Tue Apr 19 18:32:11 2011
@@ -175,6 +175,7 @@ Release 0.91.0 - Unreleased
(Erik Onnen)
HBASE-3757 Upgrade to ZK 3.3.3
HBASE-3609 Improve the selection of regions to balance; part 2 (Ted Yu)
+ HBASE-2939 Allow Client-Side Connection Pooling (Karthik Sankarachary)
TASKS
HBASE-3559 Move report of split to master OFF the heartbeat channel
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1095160&r1=1095159&r2=1095160&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java Tue Apr
19 18:32:11 2011
@@ -134,6 +134,12 @@ public final class HConstants {
/** Parameter name for HBase instance root directory */
public static final String HBASE_DIR = "hbase.rootdir";
+ /**
+ * Parameter name for HBase client IPC pool type. HBaseClient reads this
+ * parameter to choose the pool type of its connection pool.
+ */
+ public static final String HBASE_CLIENT_IPC_POOL_TYPE =
"hbase.client.ipc.pool.type";
+
+ /**
+ * Parameter name for HBase client IPC pool size. HBaseClient reads this
+ * parameter to size its connection pool and uses 1 when it is not set.
+ */
+ public static final String HBASE_CLIENT_IPC_POOL_SIZE =
"hbase.client.ipc.pool.size";
+
/** Used to construct the name of the log directory for a region server
* Use '.' as a special character to separate the log files from table data
*/
public static final String HREGION_LOGDIR_NAME = ".logs";
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java?rev=1095160&r1=1095159&r2=1095160&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
(original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java Tue
Apr 19 18:32:11 2011
@@ -20,22 +20,6 @@
package org.apache.hadoop.hbase.ipc;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.ipc.VersionedProtocol;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.ReflectionUtils;
-
-import javax.net.SocketFactory;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
@@ -50,10 +34,31 @@ import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.util.Hashtable;
import java.util.Iterator;
+import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import javax.net.SocketFactory;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.PoolMap;
+import org.apache.hadoop.hbase.util.PoolMap.PoolType;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
+
/** A client for an IPC service. IPC calls take a single {@link Writable} as a
* parameter, and return a {@link Writable} as their value. A service runs on
* a port and is defined by a parameter class and a value class.
@@ -67,8 +72,7 @@ public class HBaseClient {
private static final Log LOG =
LogFactory.getLog("org.apache.hadoop.ipc.HBaseClient");
- protected final Hashtable<ConnectionId, Connection> connections =
- new Hashtable<ConnectionId, Connection>();
+ protected final Map<ConnectionId, Connection> connections;
protected final Class<? extends Writable> valueClass; // class of call
values
protected int counter; // counter for call ids
@@ -689,6 +693,8 @@ public class HBaseClient {
this.conf = conf;
this.socketFactory = factory;
this.clusterId = conf.get(HConstants.CLUSTER_ID, "default");
+ this.connections = new PoolMap<ConnectionId, Connection>(
+ getPoolType(conf), getPoolSize(conf));
}
/**
@@ -700,6 +706,30 @@ public class HBaseClient {
this(valueClass, conf, NetUtils.getDefaultSocketFactory(conf));
}
+ /**
+ * Return the pool type specified in the configuration, if it roughly equals
either
+ * the name of {@link PoolType#Reusable} or {@link PoolType#ThreadLocal},
otherwise
+ * default to the former type.
+ *
+ * @param config configuration
+ * @return either a {@link PoolType#Reusable} or {@link PoolType#ThreadLocal}
+ */
+ private static PoolType getPoolType(Configuration config) {
+ return PoolType.valueOf(config.get(HConstants.HBASE_CLIENT_IPC_POOL_TYPE),
+ PoolType.RoundRobin, PoolType.ThreadLocal);
+ }
+
+ /**
+ * Return the pool size specified in the configuration, otherwise the
maximum allowable
+ * size (which for all intents and purposes represents an unbounded pool).
+ *
+ * @param config
+ * @return the maximum pool size
+ */
+ private static int getPoolSize(Configuration config) {
+ return config.getInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, 1);
+ }
+
/** Return the socket factory of this client
*
* @return this client's socket factory
Modified:
hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java?rev=1095160&r1=1095159&r2=1095160&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
(original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
Tue Apr 19 18:32:11 2011
@@ -540,6 +540,33 @@ public class HBaseTestingUtility {
/**
* Create a table.
* @param tableName
+ * @param families column families; one HColumnDescriptor is added per entry
+ * @param c Configuration used to open the returned HTable
+ * @param numVersions version count passed to every HColumnDescriptor
+ * @return An HTable instance for the created table.
+ * @throws IOException if thrown by any of the underlying HBase calls
+ */
+ public HTable createTable(byte[] tableName, byte[][] families,
+     final Configuration c, int numVersions)
+ throws IOException {
+   final HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
+   for (int i = 0; i < families.length; i++) {
+     // Everything except the version count uses the HColumnDescriptor defaults.
+     tableDescriptor.addFamily(new HColumnDescriptor(families[i], numVersions,
+         HColumnDescriptor.DEFAULT_COMPRESSION,
+         HColumnDescriptor.DEFAULT_IN_MEMORY,
+         HColumnDescriptor.DEFAULT_BLOCKCACHE,
+         HColumnDescriptor.DEFAULT_BLOCKSIZE, HColumnDescriptor.DEFAULT_TTL,
+         HColumnDescriptor.DEFAULT_BLOOMFILTER,
+         HColumnDescriptor.DEFAULT_REPLICATION_SCOPE));
+   }
+   // The table is created through this utility's admin; only the returned
+   // handle is opened with the caller-supplied configuration.
+   getHBaseAdmin().createTable(tableDescriptor);
+   return new HTable(c, tableName);
+ }
+
+ /**
+ * Create a table.
+ * @param tableName
* @param family
* @param numVersions
* @return An HTable instance for the created table.
Modified:
hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java?rev=1095160&r1=1095159&r2=1095160&view=diff
==============================================================================
---
hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
(original)
+++
hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
Tue Apr 19 18:32:11 2011
@@ -38,6 +38,9 @@ import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
@@ -4032,6 +4035,116 @@ public class TestFromClientSide {
queue.put(new Object());
}
+ /**
+ * Writes and reads back several versions of one cell through a table that
+ * was opened with the client IPC pool type set to "round-robin" and a pool
+ * size of 3. Twice as many versions as the pool size are written, and after
+ * every write the number of stored versions and each stored value are
+ * checked.
+ *
+ * NOTE(review): the pool settings are written to the Configuration returned
+ * by TEST_UTIL.getConfiguration(); if that instance is shared they stay in
+ * effect for later tests. Confirm whether that is intended.
+ *
+ * @throws IOException if thrown by any of the HBase client calls
+ */
+ @Test
+ public void testClientPoolRoundRobin() throws IOException {
+   final byte[] tableName = Bytes.toBytes("testClientPoolRoundRobin");
+
+   int poolSize = 3;
+   int numVersions = poolSize * 2;
+   Configuration conf = TEST_UTIL.getConfiguration();
+   conf.set(HConstants.HBASE_CLIENT_IPC_POOL_TYPE, "round-robin");
+   conf.setInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, poolSize);
+
+   HTable table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY },
+       conf, Integer.MAX_VALUE);
+   table.setAutoFlush(true);
+
+   // Each put gets its own explicit timestamp. Without one, two puts that
+   // land in the same millisecond share a timestamp, the second replaces the
+   // first, and the version count below comes up short.
+   final long ts = System.currentTimeMillis();
+   // Render the column name as text; concatenating a byte[] prints "[B@...".
+   final String column =
+       Bytes.toString(FAMILY) + ":" + Bytes.toString(QUALIFIER);
+
+   Get get = new Get(ROW);
+   get.addColumn(FAMILY, QUALIFIER);
+   get.setMaxVersions();
+
+   for (int versions = 1; versions <= numVersions; versions++) {
+     Put put = new Put(ROW);
+     put.add(FAMILY, QUALIFIER, ts + versions, VALUE);
+     table.put(put);
+
+     Result result = table.get(get);
+     NavigableMap<Long, byte[]> navigableMap = result.getMap().get(FAMILY)
+         .get(QUALIFIER);
+
+     assertEquals("The number of versions of '" + column
+         + "' did not match " + versions, versions, navigableMap.size());
+     for (Map.Entry<Long, byte[]> entry : navigableMap.entrySet()) {
+       assertTrue("The value at time " + entry.getKey()
+           + " did not match what was put",
+           Bytes.equals(VALUE, entry.getValue()));
+     }
+   }
+ }
+
+ /**
+ * Writes and reads back versions of one cell through tables opened with the
+ * client IPC pool type set to "thread-local". The first three versions are
+ * written from the calling thread; three more are written by three tasks
+ * on a fixed pool of three threads.
+ *
+ * Every task result is collected and checked, so an assertion failure or
+ * exception inside a task fails the test instead of being dropped. Tasks
+ * run concurrently, so each one can only rely on seeing its own write plus
+ * the three earlier ones; the version count is therefore checked against a
+ * range rather than an exact value.
+ *
+ * NOTE(review): the pool settings are written to the Configuration returned
+ * by TEST_UTIL.getConfiguration(); if that instance is shared they stay in
+ * effect for later tests. Confirm whether that is intended.
+ *
+ * @throws Exception if an HBase client call fails, or if a task fails (the
+ *           failure then arrives wrapped in an ExecutionException)
+ */
+ @Test
+ public void testClientPoolThreadLocal() throws Exception {
+   final byte[] tableName = Bytes.toBytes("testClientPoolThreadLocal");
+
+   int poolSize = Integer.MAX_VALUE;
+   final int numVersions = 3;
+   final Configuration conf = TEST_UTIL.getConfiguration();
+   conf.set(HConstants.HBASE_CLIENT_IPC_POOL_TYPE, "thread-local");
+   conf.setInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, poolSize);
+
+   // Ask for every version to be kept so the counts checked below do not
+   // depend on whatever version limit the shorter createTable overload uses.
+   final HTable table = TEST_UTIL.createTable(tableName,
+       new byte[][] { FAMILY }, conf, Integer.MAX_VALUE);
+   table.setAutoFlush(true);
+
+   // Each put gets its own explicit timestamp. Without one, two puts that
+   // land in the same millisecond share a timestamp and collapse into one
+   // version.
+   final long ts = System.currentTimeMillis();
+   // Render the column name as text; concatenating a byte[] prints "[B@...".
+   final String column =
+       Bytes.toString(FAMILY) + ":" + Bytes.toString(QUALIFIER);
+
+   final Get get = new Get(ROW);
+   get.addColumn(FAMILY, QUALIFIER);
+   get.setMaxVersions();
+
+   for (int versions = 1; versions <= numVersions; versions++) {
+     Put put = new Put(ROW);
+     put.add(FAMILY, QUALIFIER, ts + versions, VALUE);
+     table.put(put);
+
+     Result result = table.get(get);
+     NavigableMap<Long, byte[]> navigableMap = result.getMap().get(FAMILY)
+         .get(QUALIFIER);
+
+     assertEquals("The number of versions of '" + column
+         + "' did not match " + versions, versions, navigableMap.size());
+     for (Map.Entry<Long, byte[]> entry : navigableMap.entrySet()) {
+       assertTrue("The value at time " + entry.getKey()
+           + " did not match what was put",
+           Bytes.equals(VALUE, entry.getValue()));
+     }
+   }
+
+   // A fixed pool starts a new thread for each of its first numVersions
+   // submissions, so every task below runs on a thread of its own.
+   ExecutorService executorService =
+       Executors.newFixedThreadPool(numVersions);
+   List<java.util.concurrent.Future<Void>> outcomes =
+       new java.util.ArrayList<java.util.concurrent.Future<Void>>(numVersions);
+   try {
+     for (int versions = numVersions; versions < numVersions * 2; versions++) {
+       final int versionsCopy = versions;
+       outcomes.add(executorService.submit(new Callable<Void>() {
+         @Override
+         public Void call() throws Exception {
+           // Each task opens its own handle; HTable is documented as not
+           // safe for concurrent updates.
+           HTable workerTable = new HTable(conf, tableName);
+           try {
+             workerTable.setAutoFlush(true);
+             Put put = new Put(ROW);
+             // ts + 1 .. ts + numVersions are already taken by the loop above.
+             put.add(FAMILY, QUALIFIER, ts + versionsCopy + 1, VALUE);
+             workerTable.put(put);
+
+             Result result = workerTable.get(get);
+             NavigableMap<Long, byte[]> navigableMap = result.getMap()
+                 .get(FAMILY).get(QUALIFIER);
+
+             int seen = navigableMap.size();
+             assertTrue("The number of versions of '" + column + "' was "
+                 + seen + ", expected more than " + numVersions
+                 + " and at most " + (numVersions * 2),
+                 seen > numVersions && seen <= numVersions * 2);
+             for (Map.Entry<Long, byte[]> entry : navigableMap.entrySet()) {
+               assertTrue("The value at time " + entry.getKey()
+                   + " did not match what was put",
+                   Bytes.equals(VALUE, entry.getValue()));
+             }
+           } finally {
+             workerTable.close();
+           }
+           return null;
+         }
+       }));
+     }
+     // get() blocks until the task is done and rethrows whatever it threw,
+     // including AssertionError, wrapped in an ExecutionException.
+     for (java.util.concurrent.Future<Void> outcome : outcomes) {
+       outcome.get();
+     }
+   } finally {
+     executorService.shutdownNow();
+   }
+ }
}