Author: jyates
Date: Thu Oct 17 18:13:18 2013
New Revision: 1533187
URL: http://svn.apache.org/r1533187
Log:
HBASE-9749: Custom threadpool for Coprocessor obtained HTables
Modified:
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/CoprocessorEnvironment.java
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestOpenTableInCoprocessor.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
Modified:
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/CoprocessorEnvironment.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/CoprocessorEnvironment.java?rev=1533187&r1=1533186&r2=1533187&view=diff
==============================================================================
---
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/CoprocessorEnvironment.java
(original)
+++
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/CoprocessorEnvironment.java
Thu Oct 17 18:13:18 2013
@@ -16,6 +16,7 @@
package org.apache.hadoop.hbase;
import java.io.IOException;
+import java.util.concurrent.ExecutorService;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
@@ -50,4 +51,11 @@ public interface CoprocessorEnvironment
* @throws IOException
*/
HTableInterface getTable(TableName tableName) throws IOException;
+
+ /**
+ * @return an interface for accessing the given table using the passed
executor to run batch
+ * operations
+ * @throws IOException
+ */
+ public HTableInterface getTable(TableName tableName, ExecutorService
service) throws IOException;
}
Modified:
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1533187&r1=1533186&r2=1533187&view=diff
==============================================================================
---
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
(original)
+++
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
Thu Oct 17 18:13:18 2013
@@ -210,7 +210,7 @@ public class HTable implements HTableInt
this.finishSetup();
}
- private static ThreadPoolExecutor getDefaultExecutor(Configuration conf) {
+ public static ThreadPoolExecutor getDefaultExecutor(Configuration conf) {
int maxThreads = conf.getInt("hbase.htable.threads.max",
Integer.MAX_VALUE);
if (maxThreads == 0) {
maxThreads = 1; // is there a better default?
Modified:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java?rev=1533187&r1=1533186&r2=1533187&view=diff
==============================================================================
---
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
(original)
+++
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
Thu Oct 17 18:13:18 2013
@@ -30,6 +30,7 @@ import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
@@ -376,9 +377,10 @@ public abstract class CoprocessorHost<E
private HTable table;
private HConnection connection;
- public HTableWrapper(TableName tableName, HConnection connection) throws
IOException {
+ public HTableWrapper(TableName tableName, HConnection connection,
ExecutorService pool)
+ throws IOException {
this.tableName = tableName;
- this.table = new HTable(tableName, connection);
+ this.table = new HTable(tableName, connection, pool);
this.connection = connection;
openTables.add(this);
}
@@ -709,7 +711,19 @@ public abstract class CoprocessorHost<E
*/
@Override
public HTableInterface getTable(TableName tableName) throws IOException {
- return new HTableWrapper(tableName,
CoprocessorHConnection.getConnectionForEnvironment(this));
+ return this.getTable(tableName,
HTable.getDefaultExecutor(getConfiguration()));
+ }
+
+ /**
+ * Open a table from within the Coprocessor environment
+ * @param tableName the table name
+ * @return an interface for manipulating the table
+ * @exception java.io.IOException Exception
+ */
+ @Override
+ public HTableInterface getTable(TableName tableName, ExecutorService pool)
throws IOException {
+ return new HTableWrapper(tableName,
CoprocessorHConnection.getConnectionForEnvironment(this),
+ pool);
}
}
Modified:
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestOpenTableInCoprocessor.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestOpenTableInCoprocessor.java?rev=1533187&r1=1533186&r2=1533187&view=diff
==============================================================================
---
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestOpenTableInCoprocessor.java
(original)
+++
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestOpenTableInCoprocessor.java
Thu Oct 17 18:13:18 2013
@@ -22,12 +22,17 @@ import static org.junit.Assert.assertEqu
import static org.junit.Assert.assertTrue;
import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
-import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
@@ -37,7 +42,10 @@ import org.apache.hadoop.hbase.client.Re
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.After;
import org.junit.AfterClass;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -47,53 +55,113 @@ import org.junit.experimental.categories
@Category(MediumTests.class)
public class TestOpenTableInCoprocessor {
- private static final TableName otherTable =
- TableName.valueOf("otherTable");
+ private static final TableName otherTable = TableName.valueOf("otherTable");
+ private static final TableName primaryTable = TableName.valueOf("primary");
private static final byte[] family = new byte[] { 'f' };
- private static boolean completed = false;
-
+ private static boolean[] completed = new boolean[1];
/**
* Custom coprocessor that just copies the write to another table.
*/
public static class SendToOtherTableCoprocessor extends BaseRegionObserver {
@Override
- public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put
put, WALEdit edit,
- final Durability durability) throws IOException {
+ public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e,
final Put put,
+ final WALEdit edit, final Durability durability) throws IOException {
HTableInterface table = e.getEnvironment().getTable(otherTable);
Put p = new Put(new byte[] { 'a' });
p.add(family, null, new byte[] { 'a' });
table.put(put);
table.flushCommits();
- completed = true;
+ completed[0] = true;
+ table.close();
+ }
+
+ }
+
+ private static boolean[] completedWithPool = new boolean[1];
+ /**
+ * Coprocessor that creates an HTable with a pool to write to another table
+ */
+ public static class CustomThreadPoolCoprocessor extends BaseRegionObserver {
+
+ /**
+ * Get a pool that has only ever one thread. A second action added to the
pool (running
+ * concurrently), will cause an exception.
+ * @return
+ */
+ private ExecutorService getPool() {
+ int maxThreads = 1;
+ long keepAliveTime = 60;
+ ThreadPoolExecutor pool =
+ new ThreadPoolExecutor(1, maxThreads, keepAliveTime,
TimeUnit.SECONDS,
+ new SynchronousQueue<Runnable>(),
Threads.newDaemonThreadFactory("hbase-table"));
+ pool.allowCoreThreadTimeOut(true);
+ return pool;
+ }
+
+ @Override
+ public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e,
final Put put,
+ final WALEdit edit, final Durability durability) throws IOException {
+ HTableInterface table = e.getEnvironment().getTable(otherTable,
getPool());
+ Put p = new Put(new byte[] { 'a' });
+ p.add(family, null, new byte[] { 'a' });
+ try {
+ table.batch(Collections.singletonList(put));
+ } catch (InterruptedException e1) {
+ throw new IOException(e1);
+ }
+ completedWithPool[0] = true;
table.close();
}
+ }
+
+ private static HBaseTestingUtility UTIL = new HBaseTestingUtility();
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ UTIL.startMiniCluster();
}
- private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+ @After
+ public void cleanupTestTable() throws Exception {
+ UTIL.getHBaseAdmin().disableTable(primaryTable);
+ UTIL.getHBaseAdmin().deleteTable(primaryTable);
+
+ UTIL.getHBaseAdmin().disableTable(otherTable);
+ UTIL.getHBaseAdmin().deleteTable(otherTable);
+
+ }
@AfterClass
- public static void cleanup() throws Exception {
- UTIL.getHBaseAdmin().close();
+ public static void teardownCluster() throws Exception {
+ UTIL.shutdownMiniCluster();
}
@Test
public void testCoprocessorCanCreateConnectionToRemoteTable() throws
Throwable {
- HTableDescriptor primary = new
HTableDescriptor(TableName.valueOf("primary"));
+ runCoprocessorConnectionToRemoteTable(SendToOtherTableCoprocessor.class,
completed);
+ }
+
+ @Test
+ public void testCoprocessorCanCreateConnectionToRemoteTableWithCustomPool()
throws Throwable {
+ runCoprocessorConnectionToRemoteTable(CustomThreadPoolCoprocessor.class,
completedWithPool);
+ }
+
+ private void runCoprocessorConnectionToRemoteTable(Class<? extends
BaseRegionObserver> clazz,
+ boolean[] completeCheck) throws Throwable {
+ HTableDescriptor primary = new HTableDescriptor(primaryTable);
primary.addFamily(new HColumnDescriptor(family));
// add our coprocessor
- primary.addCoprocessor(SendToOtherTableCoprocessor.class.getName());
+ primary.addCoprocessor(clazz.getName());
HTableDescriptor other = new HTableDescriptor(otherTable);
other.addFamily(new HColumnDescriptor(family));
- UTIL.startMiniCluster();
+
HBaseAdmin admin = UTIL.getHBaseAdmin();
admin.createTable(primary);
admin.createTable(other);
- admin.close();
HTable table = new HTable(UTIL.getConfiguration(), "primary");
Put p = new Put(new byte[] { 'a' });
@@ -103,11 +171,9 @@ public class TestOpenTableInCoprocessor
table.close();
HTable target = new HTable(UTIL.getConfiguration(), otherTable);
- assertTrue("Didn't complete update to target table!", completed);
+ assertTrue("Didn't complete update to target table!", completeCheck[0]);
assertEquals("Didn't find inserted row", 1, getKeyValueCount(target));
target.close();
-
- UTIL.shutdownMiniCluster();
}
/**
Modified:
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java?rev=1533187&r1=1533186&r2=1533187&view=diff
==============================================================================
---
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
(original)
+++
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
Thu Oct 17 18:13:18 2013
@@ -28,6 +28,7 @@ import java.security.PrivilegedException
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -215,6 +216,12 @@ public class TestTokenAuthentication {
@Override
public HTableInterface getTable(TableName tableName) throws IOException
{ return null; }
+
+ @Override
+ public HTableInterface getTable(TableName tableName, ExecutorService
service)
+ throws IOException {
+ return null;
+ }
});
started = true;