Author: stack
Date: Tue Apr 19 18:32:11 2011
New Revision: 1095160

URL: http://svn.apache.org/viewvc?rev=1095160&view=rev
Log:
HBASE-2939 Allow Client-Side Connection Pooling

Modified:
    hbase/trunk/CHANGES.txt
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
    
hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java

Modified: hbase/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1095160&r1=1095159&r2=1095160&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Tue Apr 19 18:32:11 2011
@@ -175,6 +175,7 @@ Release 0.91.0 - Unreleased
                (Erik Onnen)
    HBASE-3757  Upgrade to ZK 3.3.3
    HBASE-3609  Improve the selection of regions to balance; part 2 (Ted Yu)
+   HBASE-2939  Allow Client-Side Connection Pooling (Karthik Sankarachary)
 
   TASKS
    HBASE-3559  Move report of split to master OFF the heartbeat channel

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL: 
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1095160&r1=1095159&r2=1095160&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java Tue Apr 
19 18:32:11 2011
@@ -134,6 +134,12 @@ public final class HConstants {
   /** Parameter name for HBase instance root directory */
   public static final String HBASE_DIR = "hbase.rootdir";
 
+  /** Parameter name for HBase client IPC pool type */
+  public static final String HBASE_CLIENT_IPC_POOL_TYPE = 
"hbase.client.ipc.pool.type";
+
+  /** Parameter name for HBase client IPC pool size */
+  public static final String HBASE_CLIENT_IPC_POOL_SIZE = 
"hbase.client.ipc.pool.size";
+
   /** Used to construct the name of the log directory for a region server
    * Use '.' as a special character to separate the log files from table data */
   public static final String HREGION_LOGDIR_NAME = ".logs";

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
URL: 
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java?rev=1095160&r1=1095159&r2=1095160&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java 
(original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java Tue 
Apr 19 18:32:11 2011
@@ -20,22 +20,6 @@
 
 package org.apache.hadoop.hbase.ipc;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.ipc.VersionedProtocol;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.ReflectionUtils;
-
-import javax.net.SocketFactory;
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
@@ -50,10 +34,31 @@ import java.net.SocketTimeoutException;
 import java.net.UnknownHostException;
 import java.util.Hashtable;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
+import javax.net.SocketFactory;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.PoolMap;
+import org.apache.hadoop.hbase.util.PoolMap.PoolType;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
+
 /** A client for an IPC service.  IPC calls take a single {@link Writable} as a
  * parameter, and return a {@link Writable} as their value.  A service runs on
  * a port and is defined by a parameter class and a value class.
@@ -67,8 +72,7 @@ public class HBaseClient {
 
   private static final Log LOG =
     LogFactory.getLog("org.apache.hadoop.ipc.HBaseClient");
-  protected final Hashtable<ConnectionId, Connection> connections =
-    new Hashtable<ConnectionId, Connection>();
+  protected final Map<ConnectionId, Connection> connections;
 
   protected final Class<? extends Writable> valueClass;   // class of call 
values
   protected int counter;                            // counter for call ids
@@ -689,6 +693,8 @@ public class HBaseClient {
     this.conf = conf;
     this.socketFactory = factory;
     this.clusterId = conf.get(HConstants.CLUSTER_ID, "default");
+    this.connections = new PoolMap<ConnectionId, Connection>(
+        getPoolType(conf), getPoolSize(conf));
   }
 
   /**
@@ -700,6 +706,30 @@ public class HBaseClient {
     this(valueClass, conf, NetUtils.getDefaultSocketFactory(conf));
   }
 
+  /**
+   * Return the pool type named by the
+   * {@link HConstants#HBASE_CLIENT_IPC_POOL_TYPE} setting. The configured string
+   * is handed to {@code PoolType.valueOf} together with
+   * {@link PoolType#RoundRobin} and {@link PoolType#ThreadLocal}; presumably it
+   * is matched loosely against the names of those two types and falls back to
+   * the former when unset or unrecognized (that logic lives in PoolMap and
+   * should be confirmed there).
+   *
+   * @param config configuration to read the pool type from
+   * @return presumably either {@link PoolType#RoundRobin} or
+   *   {@link PoolType#ThreadLocal}
+   */
+  private static PoolType getPoolType(Configuration config) {
+    return PoolType.valueOf(config.get(HConstants.HBASE_CLIENT_IPC_POOL_TYPE),
+        PoolType.RoundRobin, PoolType.ThreadLocal);
+  }
+
+  /**
+   * Return the pool size specified in the configuration under
+   * {@link HConstants#HBASE_CLIENT_IPC_POOL_SIZE}, otherwise default to a pool
+   * size of 1.
+   *
+   * @param config configuration to read the pool size from
+   * @return the configured pool size, or 1 if none is set
+   */
+  private static int getPoolSize(Configuration config) {
+    return config.getInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, 1);
+  }
+
   /** Return the socket factory of this client
    *
    * @return this client's socket factory

Modified: 
hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
URL: 
http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java?rev=1095160&r1=1095159&r2=1095160&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java 
(original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java 
Tue Apr 19 18:32:11 2011
@@ -540,6 +540,33 @@ public class HBaseTestingUtility {
   /**
    * Create a table.
    * @param tableName
+   * @param families column families to add to the table descriptor
+   * @param c Configuration to use for the returned HTable
+   * @param numVersions value passed as the second argument of each family's
+   *   HColumnDescriptor (presumably the maximum number of versions to keep)
+   * @return An HTable instance for the created table.
+   * @throws IOException
+   */
+  public HTable createTable(byte[] tableName, byte[][] families,
+      final Configuration c, int numVersions)
+  throws IOException {
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    for(byte[] family : families) {
+      HColumnDescriptor hcd = new HColumnDescriptor(family, numVersions,
+          HColumnDescriptor.DEFAULT_COMPRESSION,
+          HColumnDescriptor.DEFAULT_IN_MEMORY,
+          HColumnDescriptor.DEFAULT_BLOCKCACHE,
+          HColumnDescriptor.DEFAULT_BLOCKSIZE, HColumnDescriptor.DEFAULT_TTL,
+          HColumnDescriptor.DEFAULT_BLOOMFILTER,
+          HColumnDescriptor.DEFAULT_REPLICATION_SCOPE);
+      desc.addFamily(hcd);
+    }
+    getHBaseAdmin().createTable(desc); // created via this utility's admin, not via c
+    return new HTable(c, tableName); // only the returned handle uses c
+  }
+
+  /**
+   * Create a table.
+   * @param tableName
    * @param family
    * @param numVersions
    * @return An HTable instance for the created table.

Modified: 
hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
URL: 
http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java?rev=1095160&r1=1095159&r2=1095160&view=diff
==============================================================================
--- 
hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
 (original)
+++ 
hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
 Tue Apr 19 18:32:11 2011
@@ -38,6 +38,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 
@@ -4032,6 +4035,116 @@ public class TestFromClientSide {
     queue.put(new Object());
   }
 
+  @Test
+  public void testClientPoolRoundRobin() throws IOException {
+    final byte[] tableName = Bytes.toBytes("testClientPoolRoundRobin");
+
+    int poolSize = 3;
+    int numVersions = poolSize * 2; // number of put/get rounds below: twice the pool size
+    Configuration conf = TEST_UTIL.getConfiguration(); // NOTE(review): mutated below; presumably shared with other tests - confirm
+    conf.set(HConstants.HBASE_CLIENT_IPC_POOL_TYPE, "round-robin");
+    conf.setInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, poolSize);
+
+    HTable table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY },
+        conf, Integer.MAX_VALUE); // families are created with Integer.MAX_VALUE as numVersions
+    table.setAutoFlush(true);
+    Put put = new Put(ROW);
+    put.add(FAMILY, QUALIFIER, VALUE);
+
+    Get get = new Get(ROW);
+    get.addColumn(FAMILY, QUALIFIER);
+    get.setMaxVersions();
+
+    for (int versions = 1; versions <= numVersions; versions++) {
+      table.put(put); // same cell written again each round
+
+      Result result = table.get(get);
+      NavigableMap<Long, byte[]> navigableMap = result.getMap().get(FAMILY)
+          .get(QUALIFIER);
+
+      assertEquals("The number of versions of '" + FAMILY + ":" + QUALIFIER
+          + " did not match " + versions, versions, navigableMap.size()); // expects one more version per round
+      for (Map.Entry<Long, byte[]> entry : navigableMap.entrySet()) {
+        assertTrue("The value at time " + entry.getKey()
+            + " did not match what was put",
+            Bytes.equals(VALUE, entry.getValue()));
+      }
+    }
+  }
+
+  @Test
+  public void testClientPoolThreadLocal() throws IOException {
+    final byte[] tableName = Bytes.toBytes("testClientPoolThreadLocal");
+
+    int poolSize = Integer.MAX_VALUE;
+    int numVersions = 3; // rounds in the first loop and thread count for the executor below
+    Configuration conf = TEST_UTIL.getConfiguration(); // NOTE(review): mutated below; presumably shared with other tests - confirm
+    conf.set(HConstants.HBASE_CLIENT_IPC_POOL_TYPE, "thread-local");
+    conf.setInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, poolSize);
 
+    final HTable table = TEST_UTIL.createTable(tableName,
+        new byte[][] { FAMILY }, conf); // three-argument overload; its definition is not part of this diff
+    table.setAutoFlush(true);
+    final Put put = new Put(ROW);
+    put.add(FAMILY, QUALIFIER, VALUE);
+
+    final Get get = new Get(ROW);
+    get.addColumn(FAMILY, QUALIFIER);
+    get.setMaxVersions();
+
+    for (int versions = 1; versions <= numVersions; versions++) {
+      table.put(put); // same cell written again each round, from the test thread
+
+      Result result = table.get(get);
+      NavigableMap<Long, byte[]> navigableMap = result.getMap().get(FAMILY)
+          .get(QUALIFIER);
+
+      assertEquals("The number of versions of '" + FAMILY + ":" + QUALIFIER
+          + " did not match " + versions, versions, navigableMap.size());
+      for (Map.Entry<Long, byte[]> entry : navigableMap.entrySet()) {
+        assertTrue("The value at time " + entry.getKey()
+            + " did not match what was put",
+            Bytes.equals(VALUE, entry.getValue()));
+      }
+    }
+
+    final Object waitLock = new Object(); // monitor the worker tasks park on after their checks
+
+    ExecutorService executorService = 
Executors.newFixedThreadPool(numVersions);
+    for (int versions = numVersions; versions < numVersions * 2; versions++) {
+      final int versionsCopy = versions; // final copy so the anonymous class can capture it
+      executorService.submit(new Callable<Void>() { // NOTE(review): returned Future is discarded, so failures in call() never reach the test
+        @Override
+        public Void call() {
+          try {
+            table.put(put);
+
+            Result result = table.get(get);
+            NavigableMap<Long, byte[]> navigableMap = result.getMap()
+                .get(FAMILY).get(QUALIFIER);
+
+            assertEquals("The number of versions of '" + FAMILY + ":"
+                + QUALIFIER + " did not match " + versionsCopy, versionsCopy,
+                navigableMap.size());
+            for (Map.Entry<Long, byte[]> entry : navigableMap.entrySet()) {
+              assertTrue("The value at time " + entry.getKey()
+                  + " did not match what was put",
+                  Bytes.equals(VALUE, entry.getValue()));
+            }
+            synchronized (waitLock) {
+              waitLock.wait(); // parks this worker until notifyAll() or an interrupt
+            }
+          } catch (Exception e) { // NOTE(review): swallowed silently, including the InterruptedException caused by shutdownNow()
+          }
+
+          return null;
+        }
+      });
+    }
+    synchronized (waitLock) {
+      waitLock.notifyAll(); // NOTE(review): may run before any worker has reached wait()
+    }
+    executorService.shutdownNow(); // interrupts workers still running or waiting; the test does not await their completion
+  }
 }
 


Reply via email to