Author: jyates
Date: Thu Oct 17 18:10:16 2013
New Revision: 1533185
URL: http://svn.apache.org/r1533185
Log:
HBASE-9749: Custom threadpool for Coprocessor obtained HTables
Modified:
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/CoprocessorEnvironment.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/HTable.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/coprocessor/TestOpenTableInCoprocessor.java
Modified:
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/CoprocessorEnvironment.java
URL:
http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/CoprocessorEnvironment.java?rev=1533185&r1=1533184&r2=1533185&view=diff
==============================================================================
---
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/CoprocessorEnvironment.java
(original)
+++
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/CoprocessorEnvironment.java
Thu Oct 17 18:10:16 2013
@@ -17,6 +17,7 @@
package org.apache.hadoop.hbase;
import java.io.IOException;
+import java.util.concurrent.ExecutorService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HTableInterface;
@@ -49,4 +50,11 @@ public interface CoprocessorEnvironment
* @throws IOException
*/
public HTableInterface getTable(byte[] tableName) throws IOException;
-}
+
+ /**
+ * @return an interface for accessing the given table using the passed
executor to run batch
+ * operations
+ * @throws IOException
+ */
+ public HTableInterface getTable(byte[] tableName, ExecutorService service)
throws IOException;
+}
\ No newline at end of file
Modified:
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL:
http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1533185&r1=1533184&r2=1533185&view=diff
==============================================================================
---
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/HTable.java
(original)
+++
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/HTable.java
Thu Oct 17 18:10:16 2013
@@ -175,7 +175,7 @@ public class HTable implements HTableInt
this.finishSetup();
}
- private static ThreadPoolExecutor getDefaultExecutor(Configuration conf) {
+ public static ThreadPoolExecutor getDefaultExecutor(Configuration conf) {
int maxThreads = conf.getInt("hbase.htable.threads.max",
Integer.MAX_VALUE);
if (maxThreads == 0) {
maxThreads = 1; // is there a better default?
Modified:
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
URL:
http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java?rev=1533185&r1=1533184&r2=1533185&view=diff
==============================================================================
---
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
(original)
+++
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
Thu Oct 17 18:10:16 2013
@@ -31,6 +31,7 @@ import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
+import java.util.concurrent.ExecutorService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -363,9 +364,10 @@ public abstract class CoprocessorHost<E
private HTable table;
private HConnection connection;
- public HTableWrapper(byte[] tableName, HConnection connection) throws
IOException {
+ public HTableWrapper(byte[] tableName, HConnection connection,
ExecutorService executor)
+ throws IOException {
this.tableName = tableName;
- this.table = new HTable(tableName, connection);
+ this.table = new HTable(tableName, connection, executor);
this.connection = connection;
openTables.add(this);
}
@@ -680,7 +682,19 @@ public abstract class CoprocessorHost<E
*/
@Override
public HTableInterface getTable(byte[] tableName) throws IOException {
- return new HTableWrapper(tableName,
CoprocessorHConnection.getConnectionForEnvironment(this));
+ return this.getTable(tableName,
HTable.getDefaultExecutor(getConfiguration()));
+ }
+
+ /**
+ * Open a table from within the Coprocessor environment
+ * @param tableName the table name
+ * @return an interface for manipulating the table
+ * @exception java.io.IOException Exception
+ */
+ @Override
+ public HTableInterface getTable(byte[] tableName, ExecutorService pool)
throws IOException {
+ return new HTableWrapper(tableName,
CoprocessorHConnection.getConnectionForEnvironment(this),
+ pool);
}
}
Modified:
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/coprocessor/TestOpenTableInCoprocessor.java
URL:
http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/coprocessor/TestOpenTableInCoprocessor.java?rev=1533185&r1=1533184&r2=1533185&view=diff
==============================================================================
---
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/coprocessor/TestOpenTableInCoprocessor.java
(original)
+++
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/coprocessor/TestOpenTableInCoprocessor.java
Thu Oct 17 18:10:16 2013
@@ -21,6 +21,11 @@ import static org.junit.Assert.assertEqu
import static org.junit.Assert.assertTrue;
import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -33,11 +38,12 @@ import org.apache.hadoop.hbase.client.Pu
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
-import org.apache.hadoop.hbase.coprocessor.ObserverContext;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -48,9 +54,10 @@ import org.junit.experimental.categories
public class TestOpenTableInCoprocessor {
private static final byte[] otherTable = Bytes.toBytes("otherTable");
+ private static final byte[] primaryTable = Bytes.toBytes("primary");
private static final byte[] family = new byte[] { 'f' };
- private static boolean completed = false;
+ private static boolean [] completed = new boolean[1];
/**
* Custom coprocessor that just copies the write to another table.
@@ -65,28 +72,93 @@ public class TestOpenTableInCoprocessor
p.add(family, null, new byte[] { 'a' });
table.put(put);
table.flushCommits();
- completed = true;
+ completed[0] = true;
table.close();
}
}
+ private static boolean [] completedWithPool = new boolean [1] ;
+
+ public static class CustomThreadPoolCoprocessor extends BaseRegionObserver {
+
+ /**
+ * Get a pool that has only ever one thread. A second action added to the
pool (running
+ * concurrently), will cause an exception.
+ * @return
+ */
+ private ExecutorService getPool() {
+ int maxThreads = 1;
+ long keepAliveTime = 60;
+ ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads,
keepAliveTime,
+ TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
+ Threads.newDaemonThreadFactory("hbase-table"));
+ pool.allowCoreThreadTimeOut(true);
+ return pool;
+ }
+
+ @Override
+ public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put
put, WALEdit edit,
+ boolean writeToWAL) throws IOException {
+ HTableInterface table = e.getEnvironment().getTable(otherTable,
getPool());
+ Put p = new Put(new byte[] { 'a' });
+ p.add(family, null, new byte[] { 'a' });
+ try {
+ table.batch(Collections.singletonList(put));
+ } catch (InterruptedException e1) {
+ throw new IOException(e1);
+ }
+ completedWithPool[0] = true;
+ table.close();
+ }
+ }
+
+ private static HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ UTIL.startMiniCluster();
+ }
+
+ @After
+ public void cleanupTestTable() throws Exception {
+ UTIL.getHBaseAdmin().disableTable(primaryTable);
+ UTIL.getHBaseAdmin().deleteTable(primaryTable);
+
+ UTIL.getHBaseAdmin().disableTable(otherTable);
+ UTIL.getHBaseAdmin().deleteTable(otherTable);
+
+ }
+
+ @AfterClass
+ public static void teardownCluster() throws Exception{
+ UTIL.shutdownMiniCluster();
+ }
+
@Test
public void testCoprocessorCanCreateConnectionToRemoteTable() throws
Throwable {
- HBaseTestingUtility UTIL = new HBaseTestingUtility();
- HTableDescriptor primary = new HTableDescriptor("primary");
+ runCoprocessorConnectionToRemoteTable(SendToOtherTableCoprocessor.class,
completed);
+ }
+
+ @Test
+ public void testCoprocessorCanCreateConnectionToRemoteTableWithCustomPool()
throws Throwable {
+ runCoprocessorConnectionToRemoteTable(CustomThreadPoolCoprocessor.class,
completedWithPool);
+ }
+
+ private void runCoprocessorConnectionToRemoteTable(Class<? extends
BaseRegionObserver> clazz,
+ boolean[] completeCheck) throws Throwable {
+ HTableDescriptor primary = new HTableDescriptor(primaryTable);
primary.addFamily(new HColumnDescriptor(family));
// add our coprocessor
- primary.addCoprocessor(SendToOtherTableCoprocessor.class.getName());
+ primary.addCoprocessor(clazz.getName());
HTableDescriptor other = new HTableDescriptor(otherTable);
other.addFamily(new HColumnDescriptor(family));
- UTIL.startMiniCluster();
+
HBaseAdmin admin = UTIL.getHBaseAdmin();
admin.createTable(primary);
admin.createTable(other);
- admin.close();
HTable table = new HTable(UTIL.getConfiguration(), "primary");
Put p = new Put(new byte[] { 'a' });
@@ -96,11 +168,10 @@ public class TestOpenTableInCoprocessor
table.close();
HTable target = new HTable(UTIL.getConfiguration(), otherTable);
- assertTrue("Didn't complete update to target table!", completed);
+ assertTrue("Didn't complete update to target table!", completeCheck[0]);
assertEquals("Didn't find inserted row", 1, getKeyValueCount(target));
target.close();
- UTIL.shutdownMiniCluster();
}
/**