// Extracted from commit diff (new file mode 100644, index 0000000..f5d2a20):
//   http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestClassLoading.java
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.coprocessor;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.TestServerCustomProtocol;
import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.ClassLoaderTestHelper;
import org.apache.hadoop.hbase.util.CoprocessorClassLoader;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.RegionLoad;

import java.io.*;
import java.util.*;

import org.junit.*;
import org.junit.experimental.categories.Category;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;

/**
 * Test coprocessors class loading.
 *
 * <p>A single-regionserver mini cluster is started once for the whole class with system
 * coprocessors configured for regions, user regions, the WAL and the master. Individual tests
 * then create tables whose descriptors reference coprocessor jars built on the fly and check
 * that the coprocessors (and their configuration) show up on the table's online regions.
 */
@Category({CoprocessorTests.class, MediumTests.class})
public class TestClassLoading {
  private static final Log LOG = LogFactory.getLog(TestClassLoading.class);
  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  private static MiniDFSCluster cluster;

  static final TableName tableName = TableName.valueOf("TestClassLoading");
  static final String cpName1 = "TestCP1";
  static final String cpName2 = "TestCP2";
  static final String cpName3 = "TestCP3";
  static final String cpName4 = "TestCP4";
  static final String cpName5 = "TestCP5";
  static final String cpName6 = "TestCP6";

  private static Class<?> regionCoprocessor1 = ColumnAggregationEndpoint.class;
  // TODO: Fix the import of this handler. It is coming in from a package that is far away.
  private static Class<?> regionCoprocessor2 = TestServerCustomProtocol.PingHandler.class;
  private static Class<?> regionServerCoprocessor = SampleRegionWALObserver.class;
  private static Class<?> masterCoprocessor = BaseMasterObserver.class;

  private static final String[] regionServerSystemCoprocessors =
      new String[] {
        regionServerCoprocessor.getSimpleName()
      };

  private static final String[] masterRegionServerSystemCoprocessors = new String[] {
      regionCoprocessor1.getSimpleName(), MultiRowMutationEndpoint.class.getSimpleName(),
      regionServerCoprocessor.getSimpleName() };

  /**
   * Configures the system coprocessors and starts a one-regionserver mini cluster.
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    Configuration conf = TEST_UTIL.getConfiguration();

    // regionCoprocessor1 will be loaded on all regionservers, since it is
    // loaded for any tables (user or meta).
    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
        regionCoprocessor1.getName());

    // regionCoprocessor2 will be loaded only on regionservers that serve a
    // user table region. Therefore, if there are no user tables loaded,
    // this coprocessor will not be loaded on any regionserver.
    conf.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
        regionCoprocessor2.getName());

    conf.setStrings(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
        regionServerCoprocessor.getName());
    conf.setStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
        masterCoprocessor.getName());
    TEST_UTIL.startMiniCluster(1);
    cluster = TEST_UTIL.getDFSCluster();
  }

  /**
   * Shuts the mini cluster down once all tests in the class have run.
   */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * Builds a jar in the test data directory containing a trivial coprocessor class.
   *
   * @param className simple name of the generated class, which extends BaseRegionObserver
   * @return the jar file produced by {@link ClassLoaderTestHelper#buildJar}
   */
  static File buildCoprocessorJar(String className) throws Exception {
    String code = "import org.apache.hadoop.hbase.coprocessor.*;" +
        "public class " + className + " extends BaseRegionObserver {}";
    return ClassLoaderTestHelper.buildJar(
        TEST_UTIL.getDataTestDir().toString(), className, code);
  }

  /**
   * Copies a local jar to the root of the given file system, asserts that it arrived and
   * logs the destination.
   *
   * @param fs file system to copy into
   * @param jarFile local jar to copy
   * @return the fully qualified location of the copied jar, as a string
   */
  private static String copyJarToHdfs(FileSystem fs, File jarFile) throws IOException {
    String hdfsRoot = fs.getUri().toString() + Path.SEPARATOR;
    fs.copyFromLocalFile(new Path(jarFile.getPath()), new Path(hdfsRoot));
    String jarFileOnHdfs = hdfsRoot + jarFile.getName();
    assertTrue("Copy jar file to HDFS failed.",
        fs.exists(new Path(jarFileOnHdfs)));
    LOG.info("Copied jar file to HDFS: " + jarFileOnHdfs);
    return jarFileOnHdfs;
  }

  /**
   * Drops the table if it exists, disabling it first when it is still enabled, so that a
   * test can recreate it with a fresh descriptor.
   */
  private static void deleteTableIfExists(Admin admin, TableName name) throws IOException {
    if (!admin.tableExists(name)) {
      return;
    }
    if (admin.isTableEnabled(name)) {
      admin.disableTable(name);
    }
    admin.deleteTable(name);
  }

  /** Returns true when the region's name starts with the given table-name prefix. */
  private static boolean belongsTo(Region region, String tableNamePrefix) {
    return region.getRegionInfo().getRegionNameAsString().startsWith(tableNamePrefix);
  }

  @Test
  // HBASE-3516: Test CP Class loading from HDFS
  public void testClassLoadingFromHDFS() throws Exception {
    FileSystem fs = cluster.getFileSystem();

    File jarFile1 = buildCoprocessorJar(cpName1);
    File jarFile2 = buildCoprocessorJar(cpName2);

    // copy the jars into dfs
    String jarFileOnHDFS1 = copyJarToHdfs(fs, jarFile1);
    Path pathOnHDFS1 = new Path(jarFileOnHDFS1);
    String jarFileOnHDFS2 = copyJarToHdfs(fs, jarFile2);
    Path pathOnHDFS2 = new Path(jarFileOnHDFS2);

    // create a table that references the coprocessors
    HTableDescriptor htd = new HTableDescriptor(tableName);
    htd.addFamily(new HColumnDescriptor("test"));
    // without configuration values
    htd.setValue("COPROCESSOR$1", jarFileOnHDFS1 + "|" + cpName1 +
        "|" + Coprocessor.PRIORITY_USER);
    // with configuration values
    htd.setValue("COPROCESSOR$2", jarFileOnHDFS2 + "|" + cpName2 +
        "|" + Coprocessor.PRIORITY_USER + "|k1=v1,k2=v2,k3=v3");
    Admin admin = TEST_UTIL.getHBaseAdmin();
    deleteTableIfExists(admin, tableName);
    // start from an empty classloader cache so the cache-size assertion below is exact
    CoprocessorClassLoader.clearCache();
    byte[] startKey = {10, 63};
    byte[] endKey = {12, 43};
    admin.createTable(htd, startKey, endKey, 4);
    waitForTable(htd.getTableName());

    // verify that the coprocessors were loaded on every region of the table
    boolean foundTableRegion = false;
    boolean found1 = true, found2 = true, found2_k1 = true, found2_k2 = true, found2_k3 = true;
    Map<Region, Set<ClassLoader>> regionsActiveClassLoaders =
        new HashMap<Region, Set<ClassLoader>>();
    MiniHBaseCluster hbase = TEST_UTIL.getHBaseCluster();
    for (Region region : hbase.getRegionServer(0).getOnlineRegionsLocalContext()) {
      if (!belongsTo(region, tableName.getNameAsString())) {
        continue;
      }
      foundTableRegion = true;
      CoprocessorEnvironment env;
      env = region.getCoprocessorHost().findCoprocessorEnvironment(cpName1);
      found1 = found1 && (env != null);
      env = region.getCoprocessorHost().findCoprocessorEnvironment(cpName2);
      found2 = found2 && (env != null);
      if (env != null) {
        Configuration conf = env.getConfiguration();
        found2_k1 = found2_k1 && (conf.get("k1") != null);
        found2_k2 = found2_k2 && (conf.get("k2") != null);
        found2_k3 = found2_k3 && (conf.get("k3") != null);
      } else {
        found2_k1 = found2_k2 = found2_k3 = false;
      }
      // the cast to the base host type is kept from the original code
      regionsActiveClassLoaders.put(region,
          ((CoprocessorHost<?>) region.getCoprocessorHost()).getExternalClassLoaders());
    }

    assertTrue("No region was found for table " + tableName, foundTableRegion);
    assertTrue("Class " + cpName1 + " was missing on a region", found1);
    assertTrue("Class " + cpName2 + " was missing on a region", found2);
    assertTrue("Configuration key 'k1' was missing on a region", found2_k1);
    assertTrue("Configuration key 'k2' was missing on a region", found2_k2);
    assertTrue("Configuration key 'k3' was missing on a region", found2_k3);
    // check if CP classloaders are cached
    assertNotNull(jarFileOnHDFS1 + " was not cached",
        CoprocessorClassLoader.getIfCached(pathOnHDFS1));
    assertNotNull(jarFileOnHDFS2 + " was not cached",
        CoprocessorClassLoader.getIfCached(pathOnHDFS2));
    // two external jar used, should be one classloader per jar
    assertEquals("The number of cached classloaders should be equal to the number" +
        " of external jar files",
        2, CoprocessorClassLoader.getAllCached().size());
    // check if region active classloaders are shared across all RS regions
    Set<ClassLoader> externalClassLoaders = new HashSet<ClassLoader>(
        CoprocessorClassLoader.getAllCached());
    for (Map.Entry<Region, Set<ClassLoader>> regionCP : regionsActiveClassLoaders.entrySet()) {
      assertTrue("Some CP classloaders for region " + regionCP.getKey() + " are not cached."
          + " ClassLoader Cache:" + externalClassLoaders
          + " Region ClassLoaders:" + regionCP.getValue(),
          externalClassLoaders.containsAll(regionCP.getValue()));
    }
  }

  /** Renders a local file as a Hadoop {@link Path} string built from the file's URI. */
  private String getLocalPath(File file) {
    return new Path(file.toURI()).toString();
  }

  @Test
  // HBASE-3516: Test CP Class loading from local file system
  public void testClassLoadingFromLocalFS() throws Exception {
    File jarFile = buildCoprocessorJar(cpName3);

    // create a table that references the jar
    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(cpName3));
    htd.addFamily(new HColumnDescriptor("test"));
    htd.setValue("COPROCESSOR$1", getLocalPath(jarFile) + "|" + cpName3 + "|" +
        Coprocessor.PRIORITY_USER);
    Admin admin = TEST_UTIL.getHBaseAdmin();
    admin.createTable(htd);
    waitForTable(htd.getTableName());

    // verify that the coprocessor was loaded
    boolean found = false;
    MiniHBaseCluster hbase = TEST_UTIL.getHBaseCluster();
    for (Region region : hbase.getRegionServer(0).getOnlineRegionsLocalContext()) {
      if (belongsTo(region, cpName3)) {
        // accumulate rather than overwrite, so a later region cannot mask an earlier hit
        found = found || (region.getCoprocessorHost().findCoprocessor(cpName3) != null);
      }
    }
    assertTrue("Class " + cpName3 + " was missing on a region", found);
  }

  @Test
  // HBASE-6308: Test CP classloader is the CoprocessorClassLoader
  public void testPrivateClassLoader() throws Exception {
    File jarFile = buildCoprocessorJar(cpName4);

    // create a table that references the jar
    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(cpName4));
    htd.addFamily(new HColumnDescriptor("test"));
    htd.setValue("COPROCESSOR$1", getLocalPath(jarFile) + "|" + cpName4 + "|" +
        Coprocessor.PRIORITY_USER);
    Admin admin = TEST_UTIL.getHBaseAdmin();
    admin.createTable(htd);
    waitForTable(htd.getTableName());

    // verify that the coprocessor was loaded correctly
    boolean found = false;
    MiniHBaseCluster hbase = TEST_UTIL.getHBaseCluster();
    for (Region region : hbase.getRegionServer(0).getOnlineRegionsLocalContext()) {
      if (!belongsTo(region, cpName4)) {
        continue;
      }
      Coprocessor cp = region.getCoprocessorHost().findCoprocessor(cpName4);
      if (cp != null) {
        found = true;
        // JUnit's order is (message, expected, actual)
        assertEquals("Class " + cpName4 + " was not loaded by CoprocessorClassLoader",
            CoprocessorClassLoader.class, cp.getClass().getClassLoader().getClass());
      }
    }
    assertTrue("Class " + cpName4 + " was missing on a region", found);
  }

  @Test
  // HBase-3810: Registering a Coprocessor at HTableDescriptor should be
  // less strict
  public void testHBase3810() throws Exception {
    // allowed value pattern: [path] | class name | [priority] | [key values]

    File jarFile1 = buildCoprocessorJar(cpName1);
    File jarFile2 = buildCoprocessorJar(cpName2);
    File jarFile5 = buildCoprocessorJar(cpName5);
    File jarFile6 = buildCoprocessorJar(cpName6);

    // keys deliberately vary in case, padding and numbering
    String cpKey1 = "COPROCESSOR$1";
    String cpKey2 = " Coprocessor$2 ";
    String cpKey3 = " coprocessor$03 ";

    String cpValue1 = getLocalPath(jarFile1) + "|" + cpName1 + "|" +
        Coprocessor.PRIORITY_USER;
    String cpValue2 = getLocalPath(jarFile2) + " | " + cpName2 + " | ";
    // load from default class loader
    String cpValue3 =
        " | org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver | | k=v ";

    // create a table that references the jar
    HTableDescriptor htd = new HTableDescriptor(tableName);
    htd.addFamily(new HColumnDescriptor("test"));

    // add 3 coprocessors by setting htd attributes directly.
    htd.setValue(cpKey1, cpValue1);
    htd.setValue(cpKey2, cpValue2);
    htd.setValue(cpKey3, cpValue3);

    // add 2 coprocessor by using new htd.addCoprocessor() api
    htd.addCoprocessor(cpName5, new Path(getLocalPath(jarFile5)),
        Coprocessor.PRIORITY_USER, null);
    Map<String, String> kvs = new HashMap<String, String>();
    kvs.put("k1", "v1");
    kvs.put("k2", "v2");
    kvs.put("k3", "v3");
    htd.addCoprocessor(cpName6, new Path(getLocalPath(jarFile6)),
        Coprocessor.PRIORITY_USER, kvs);

    Admin admin = TEST_UTIL.getHBaseAdmin();
    deleteTableIfExists(admin, tableName);
    admin.createTable(htd);
    waitForTable(htd.getTableName());

    // verify that the coprocessor was loaded
    boolean found_2 = false, found_1 = false, found_3 = false,
        found_5 = false, found_6 = false;
    boolean found6_k1 = false, found6_k2 = false, found6_k3 = false,
        found6_k4 = false;

    MiniHBaseCluster hbase = TEST_UTIL.getHBaseCluster();
    for (Region region : hbase.getRegionServer(0).getOnlineRegionsLocalContext()) {
      if (!belongsTo(region, tableName.getNameAsString())) {
        continue;
      }
      found_1 = found_1 ||
          (region.getCoprocessorHost().findCoprocessor(cpName1) != null);
      found_2 = found_2 ||
          (region.getCoprocessorHost().findCoprocessor(cpName2) != null);
      found_3 = found_3 ||
          (region.getCoprocessorHost().findCoprocessor("SimpleRegionObserver")
              != null);
      found_5 = found_5 ||
          (region.getCoprocessorHost().findCoprocessor(cpName5) != null);

      CoprocessorEnvironment env =
          region.getCoprocessorHost().findCoprocessorEnvironment(cpName6);
      if (env != null) {
        found_6 = true;
        Configuration conf = env.getConfiguration();
        found6_k1 = conf.get("k1") != null;
        found6_k2 = conf.get("k2") != null;
        found6_k3 = conf.get("k3") != null;
        // k4 was never put into kvs; reading it makes the assertFalse below meaningful
        found6_k4 = conf.get("k4") != null;
      }
    }

    assertTrue("Class " + cpName1 + " was missing on a region", found_1);
    assertTrue("Class " + cpName2 + " was missing on a region", found_2);
    assertTrue("Class SimpleRegionObserver was missing on a region", found_3);
    assertTrue("Class " + cpName5 + " was missing on a region", found_5);
    assertTrue("Class " + cpName6 + " was missing on a region", found_6);

    assertTrue("Configuration key 'k1' was missing on a region", found6_k1);
    assertTrue("Configuration key 'k2' was missing on a region", found6_k2);
    assertTrue("Configuration key 'k3' was missing on a region", found6_k3);
    assertFalse("Configuration key 'k4' wasn't configured", found6_k4);
  }

  @Test
  public void testClassLoadingFromLibDirInJar() throws Exception {
    loadingClassFromLibDirInJar("/lib/");
  }

  @Test
  public void testClassLoadingFromRelativeLibDirInJar() throws Exception {
    loadingClassFromLibDirInJar("lib/");
  }

  /**
   * Packs two coprocessor jars into an outer jar under the given prefix, copies the outer jar
   * to HDFS, creates a table referencing both coprocessor classes through the outer jar, and
   * asserts that both classes (and the second one's configuration keys) are found on the
   * table's regions.
   *
   * @param libPrefix entry prefix passed to {@link ClassLoaderTestHelper#addJarFilesToJar}
   */
  void loadingClassFromLibDirInJar(String libPrefix) throws Exception {
    FileSystem fs = cluster.getFileSystem();

    File innerJarFile1 = buildCoprocessorJar(cpName1);
    File innerJarFile2 = buildCoprocessorJar(cpName2);
    File outerJarFile = new File(TEST_UTIL.getDataTestDir().toString(), "outer.jar");

    ClassLoaderTestHelper.addJarFilesToJar(
        outerJarFile, libPrefix, innerJarFile1, innerJarFile2);

    // copy the jars into dfs
    String jarFileOnHDFS = copyJarToHdfs(fs, outerJarFile);

    // create a table that references the coprocessors
    HTableDescriptor htd = new HTableDescriptor(tableName);
    htd.addFamily(new HColumnDescriptor("test"));
    // without configuration values
    htd.setValue("COPROCESSOR$1", jarFileOnHDFS + "|" + cpName1 +
        "|" + Coprocessor.PRIORITY_USER);
    // with configuration values
    htd.setValue("COPROCESSOR$2", jarFileOnHDFS + "|" + cpName2 +
        "|" + Coprocessor.PRIORITY_USER + "|k1=v1,k2=v2,k3=v3");
    Admin admin = TEST_UTIL.getHBaseAdmin();
    deleteTableIfExists(admin, tableName);
    admin.createTable(htd);
    waitForTable(htd.getTableName());

    // verify that the coprocessors were loaded
    boolean found1 = false, found2 = false, found2_k1 = false,
        found2_k2 = false, found2_k3 = false;
    MiniHBaseCluster hbase = TEST_UTIL.getHBaseCluster();
    for (Region region : hbase.getRegionServer(0).getOnlineRegionsLocalContext()) {
      if (!belongsTo(region, tableName.getNameAsString())) {
        continue;
      }
      CoprocessorEnvironment env;
      env = region.getCoprocessorHost().findCoprocessorEnvironment(cpName1);
      if (env != null) {
        found1 = true;
      }
      env = region.getCoprocessorHost().findCoprocessorEnvironment(cpName2);
      if (env != null) {
        found2 = true;
        Configuration conf = env.getConfiguration();
        found2_k1 = conf.get("k1") != null;
        found2_k2 = conf.get("k2") != null;
        found2_k3 = conf.get("k3") != null;
      }
    }
    assertTrue("Class " + cpName1 + " was missing on a region", found1);
    assertTrue("Class " + cpName2 + " was missing on a region", found2);
    assertTrue("Configuration key 'k1' was missing on a region", found2_k1);
    assertTrue("Configuration key 'k2' was missing on a region", found2_k2);
    assertTrue("Configuration key 'k3' was missing on a region", found2_k3);
  }

  @Test
  public void testRegionServerCoprocessorsReported() throws Exception {
    // This was a test for HBASE-4070.
    // We are removing coprocessors from region load in HBASE-5258.
    // Therefore, this test now only checks system coprocessors.
    assertAllRegionServers(null);
  }

  /**
   * return the subset of all regionservers
   * (actually returns set of ServerLoads)
   * which host some region in a given table.
   * used by assertAllRegionServers() below to
   * test reporting of loaded coprocessors.
   * @param tableName : given table.
   * @return subset of all servers.
   */
  Map<ServerName, ServerLoad> serversForTable(String tableName) {
    Map<ServerName, ServerLoad> serverLoadHashMap =
        new HashMap<ServerName, ServerLoad>();
    for (Map.Entry<ServerName, ServerLoad> server :
        TEST_UTIL.getMiniHBaseCluster().getMaster().getServerManager().
            getOnlineServers().entrySet()) {
      for (Map.Entry<byte[], RegionLoad> region :
          server.getValue().getRegionsLoad().entrySet()) {
        // NOTE(review): this compares RegionLoad.getNameAsString() to a table name with
        // equals(); other checks in this class match regions by prefix. Confirm whether
        // a prefix match is intended here before relying on a non-null tableName.
        if (region.getValue().getNameAsString().equals(tableName)) {
          // this server hosts a region of tableName: add this server..
          serverLoadHashMap.put(server.getKey(), server.getValue());
          // .. and skip the rest of the regions that it hosts.
          break;
        }
      }
    }
    return serverLoadHashMap;
  }

  /**
   * Asserts that the regionserver-level coprocessors reported by each server match the
   * expected set, retrying up to five times with a one second pause between attempts.
   * The expected set alternates between the two system-coprocessor arrays after every
   * server that is examined.
   *
   * @param tableName restricts the check to servers returned by
   *     {@link #serversForTable(String)}; {@code null} checks all online servers
   */
  void assertAllRegionServers(String tableName) throws InterruptedException {
    Map<ServerName, ServerLoad> servers;
    boolean success = false;
    String[] expectedCoprocessors = regionServerSystemCoprocessors;
    if (tableName == null) {
      // if no tableName specified, use all servers.
      servers = TEST_UTIL.getMiniHBaseCluster().getMaster().getServerManager().getOnlineServers();
    } else {
      servers = serversForTable(tableName);
    }
    for (int i = 0; i < 5; i++) {
      boolean anyFailed = false;
      for (Map.Entry<ServerName, ServerLoad> server : servers.entrySet()) {
        String[] actualCoprocessors = server.getValue().getRsCoprocessors();
        if (!Arrays.equals(actualCoprocessors, expectedCoprocessors)) {
          LOG.debug("failed comparison: actual: " +
              Arrays.toString(actualCoprocessors) +
              " ; expected: " + Arrays.toString(expectedCoprocessors));
          anyFailed = true;
          expectedCoprocessors = switchExpectedCoprocessors(expectedCoprocessors);
          break;
        }
        expectedCoprocessors = switchExpectedCoprocessors(expectedCoprocessors);
      }
      if (!anyFailed) {
        success = true;
        break;
      }
      LOG.debug("retrying after failed comparison: " + i);
      Thread.sleep(1000);
    }
    assertTrue("Reported regionserver coprocessors never matched the expected set", success);
  }

  /**
   * Returns the other one of the two expected system-coprocessor arrays: the master+regionserver
   * set when given the regionserver-only set, and the regionserver-only set otherwise.
   */
  private String[] switchExpectedCoprocessors(String[] expectedCoprocessors) {
    if (Arrays.equals(regionServerSystemCoprocessors, expectedCoprocessors)) {
      return masterRegionServerSystemCoprocessors;
    }
    return regionServerSystemCoprocessors;
  }

  @Test
  public void testMasterCoprocessorsReported() {
    // HBASE 4070: Improve region server metrics to report loaded coprocessors
    // to master: verify that the master is reporting the correct set of
    // loaded coprocessors.
    final String loadedMasterCoprocessorsVerify =
        "[" + masterCoprocessor.getSimpleName() + "]";
    String loadedMasterCoprocessors =
        Arrays.toString(
            TEST_UTIL.getHBaseCluster().getMaster().getMasterCoprocessors());
    assertEquals(loadedMasterCoprocessorsVerify, loadedMasterCoprocessors);
  }

  @Test
  public void testFindCoprocessors() {
    // HBASE 12277:
    CoprocessorHost<?> masterCpHost =
        TEST_UTIL.getHBaseCluster().getMaster().getMasterCoprocessorHost();

    List<MasterObserver> masterObservers = masterCpHost.findCoprocessors(MasterObserver.class);

    assertTrue(masterObservers != null && masterObservers.size() > 0);
    assertEquals(masterCoprocessor.getSimpleName(),
        masterObservers.get(0).getClass().getSimpleName());
  }

  /**
   * Blocks until the table is enabled, then pauses one more second to give the
   * coprocessor hosts time to load their coprocessors.
   */
  private void waitForTable(TableName name) throws InterruptedException, IOException {
    // First wait until all regions are online
    TEST_UTIL.waitTableEnabled(name);
    // Now wait a bit longer for the coprocessor hosts to load the CPs
    Thread.sleep(1000);
  }

}
http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java ---------------------------------------------------------------------- diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java new file mode 100644 index 0000000..7e2577a --- /dev/null +++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java @@ -0,0 +1,349 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.coprocessor; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionLocator; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.coprocessor.Batch; +import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos; +import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; +import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; +import org.apache.hadoop.hbase.ipc.ServerRpcController; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos; +import org.apache.hadoop.hbase.testclassification.CoprocessorTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.ByteStringer; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; + +/** + * 
TestEndpoint: test cases to verify coprocessor Endpoint + */ +@Category({CoprocessorTests.class, MediumTests.class}) +public class TestCoprocessorEndpoint { + private static final Log LOG = LogFactory.getLog(TestCoprocessorEndpoint.class); + + private static final TableName TEST_TABLE = + TableName.valueOf("TestCoprocessorEndpoint"); + private static final byte[] TEST_FAMILY = Bytes.toBytes("TestFamily"); + private static final byte[] TEST_QUALIFIER = Bytes.toBytes("TestQualifier"); + private static byte[] ROW = Bytes.toBytes("testRow"); + + private static final int ROWSIZE = 20; + private static final int rowSeperator1 = 5; + private static final int rowSeperator2 = 12; + private static byte[][] ROWS = makeN(ROW, ROWSIZE); + + private static HBaseTestingUtility util = new HBaseTestingUtility(); + + @BeforeClass + public static void setupBeforeClass() throws Exception { + // set configure to indicate which cp should be loaded + Configuration conf = util.getConfiguration(); + conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 5000); + conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, + org.apache.hadoop.hbase.coprocessor.ColumnAggregationEndpoint.class.getName(), + ProtobufCoprocessorService.class.getName()); + conf.setStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, + ProtobufCoprocessorService.class.getName()); + util.startMiniCluster(2); + Admin admin = util.getHBaseAdmin(); + HTableDescriptor desc = new HTableDescriptor(TEST_TABLE); + desc.addFamily(new HColumnDescriptor(TEST_FAMILY)); + admin.createTable(desc, new byte[][]{ROWS[rowSeperator1], ROWS[rowSeperator2]}); + util.waitUntilAllRegionsAssigned(TEST_TABLE); + + Table table = util.getConnection().getTable(TEST_TABLE); + for (int i = 0; i < ROWSIZE; i++) { + Put put = new Put(ROWS[i]); + put.addColumn(TEST_FAMILY, TEST_QUALIFIER, Bytes.toBytes(i)); + table.put(put); + } + table.close(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + 
util.shutdownMiniCluster(); + } + + private Map<byte [], Long> sum(final Table table, final byte [] family, + final byte [] qualifier, final byte [] start, final byte [] end) + throws ServiceException, Throwable { + return table.coprocessorService(ColumnAggregationProtos.ColumnAggregationService.class, + start, end, + new Batch.Call<ColumnAggregationProtos.ColumnAggregationService, Long>() { + @Override + public Long call(ColumnAggregationProtos.ColumnAggregationService instance) + throws IOException { + CoprocessorRpcUtils.BlockingRpcCallback<ColumnAggregationProtos.SumResponse> rpcCallback = + new CoprocessorRpcUtils.BlockingRpcCallback<ColumnAggregationProtos.SumResponse>(); + ColumnAggregationProtos.SumRequest.Builder builder = + ColumnAggregationProtos.SumRequest.newBuilder(); + builder.setFamily(ByteStringer.wrap(family)); + if (qualifier != null && qualifier.length > 0) { + builder.setQualifier(ByteStringer.wrap(qualifier)); + } + instance.sum(null, builder.build(), rpcCallback); + return rpcCallback.get().getSum(); + } + }); + } + + @Test + public void testAggregation() throws Throwable { + Table table = util.getConnection().getTable(TEST_TABLE); + Map<byte[], Long> results = sum(table, TEST_FAMILY, TEST_QUALIFIER, + ROWS[0], ROWS[ROWS.length-1]); + int sumResult = 0; + int expectedResult = 0; + for (Map.Entry<byte[], Long> e : results.entrySet()) { + LOG.info("Got value "+e.getValue()+" for region "+Bytes.toStringBinary(e.getKey())); + sumResult += e.getValue(); + } + for (int i = 0; i < ROWSIZE; i++) { + expectedResult += i; + } + assertEquals("Invalid result", expectedResult, sumResult); + + results.clear(); + + // scan: for region 2 and region 3 + results = sum(table, TEST_FAMILY, TEST_QUALIFIER, + ROWS[rowSeperator1], ROWS[ROWS.length-1]); + sumResult = 0; + expectedResult = 0; + for (Map.Entry<byte[], Long> e : results.entrySet()) { + LOG.info("Got value "+e.getValue()+" for region "+Bytes.toStringBinary(e.getKey())); + sumResult += e.getValue(); + } 
+ for (int i = rowSeperator1; i < ROWSIZE; i++) { + expectedResult += i; + } + assertEquals("Invalid result", expectedResult, sumResult); + table.close(); + } + + @Test + public void testCoprocessorService() throws Throwable { + Table table = util.getConnection().getTable(TEST_TABLE); + + List<HRegionLocation> regions; + try(RegionLocator rl = util.getConnection().getRegionLocator(TEST_TABLE)) { + regions = rl.getAllRegionLocations(); + } + final TestProtos.EchoRequestProto request = + TestProtos.EchoRequestProto.newBuilder().setMessage("hello").build(); + final Map<byte[], String> results = Collections.synchronizedMap( + new TreeMap<byte[], String>(Bytes.BYTES_COMPARATOR)); + try { + // scan: for all regions + final RpcController controller = new ServerRpcController(); + table.coprocessorService(TestRpcServiceProtos.TestProtobufRpcProto.class, + ROWS[0], ROWS[ROWS.length - 1], + new Batch.Call<TestRpcServiceProtos.TestProtobufRpcProto, TestProtos.EchoResponseProto>() { + public TestProtos.EchoResponseProto call(TestRpcServiceProtos.TestProtobufRpcProto instance) + throws IOException { + LOG.debug("Default response is " + TestProtos.EchoRequestProto.getDefaultInstance()); + CoprocessorRpcUtils.BlockingRpcCallback<TestProtos.EchoResponseProto> callback = + new CoprocessorRpcUtils.BlockingRpcCallback<TestProtos.EchoResponseProto>(); + instance.echo(controller, request, callback); + TestProtos.EchoResponseProto response = callback.get(); + LOG.debug("Batch.Call returning result " + response); + return response; + } + }, + new Batch.Callback<TestProtos.EchoResponseProto>() { + public void update(byte[] region, byte[] row, TestProtos.EchoResponseProto result) { + assertNotNull(result); + assertEquals("hello", result.getMessage()); + results.put(region, result.getMessage()); + } + } + ); + for (Map.Entry<byte[], String> e : results.entrySet()) { + LOG.info("Got value "+e.getValue()+" for region "+Bytes.toStringBinary(e.getKey())); + } + assertEquals(3, results.size()); + 
for (HRegionLocation info : regions) { + LOG.info("Region info is "+info.getRegionInfo().getRegionNameAsString()); + assertTrue(results.containsKey(info.getRegionInfo().getRegionName())); + } + results.clear(); + + // scan: for region 2 and region 3 + table.coprocessorService(TestRpcServiceProtos.TestProtobufRpcProto.class, + ROWS[rowSeperator1], ROWS[ROWS.length - 1], + new Batch.Call<TestRpcServiceProtos.TestProtobufRpcProto, TestProtos.EchoResponseProto>() { + public TestProtos.EchoResponseProto call(TestRpcServiceProtos.TestProtobufRpcProto instance) + throws IOException { + LOG.debug("Default response is " + TestProtos.EchoRequestProto.getDefaultInstance()); + CoprocessorRpcUtils.BlockingRpcCallback<TestProtos.EchoResponseProto> callback = + new CoprocessorRpcUtils.BlockingRpcCallback<TestProtos.EchoResponseProto>(); + instance.echo(controller, request, callback); + TestProtos.EchoResponseProto response = callback.get(); + LOG.debug("Batch.Call returning result " + response); + return response; + } + }, + new Batch.Callback<TestProtos.EchoResponseProto>() { + public void update(byte[] region, byte[] row, TestProtos.EchoResponseProto result) { + assertNotNull(result); + assertEquals("hello", result.getMessage()); + results.put(region, result.getMessage()); + } + } + ); + for (Map.Entry<byte[], String> e : results.entrySet()) { + LOG.info("Got value "+e.getValue()+" for region "+Bytes.toStringBinary(e.getKey())); + } + assertEquals(2, results.size()); + } finally { + table.close(); + } + } + + @Test + public void testCoprocessorServiceNullResponse() throws Throwable { + Table table = util.getConnection().getTable(TEST_TABLE); + List<HRegionLocation> regions; + try(RegionLocator rl = util.getConnection().getRegionLocator(TEST_TABLE)) { + regions = rl.getAllRegionLocations(); + } + + final TestProtos.EchoRequestProto request = + TestProtos.EchoRequestProto.newBuilder().setMessage("hello").build(); + try { + // scan: for all regions + final RpcController controller 
= new ServerRpcController(); + // test that null results are supported + Map<byte[], String> results = + table.coprocessorService(TestRpcServiceProtos.TestProtobufRpcProto.class, + ROWS[0], ROWS[ROWS.length - 1], + new Batch.Call<TestRpcServiceProtos.TestProtobufRpcProto, String>() { + public String call(TestRpcServiceProtos.TestProtobufRpcProto instance) + throws IOException { + CoprocessorRpcUtils.BlockingRpcCallback<TestProtos.EchoResponseProto> callback = + new CoprocessorRpcUtils.BlockingRpcCallback<TestProtos.EchoResponseProto>(); + instance.echo(controller, request, callback); + TestProtos.EchoResponseProto response = callback.get(); + LOG.debug("Batch.Call got result " + response); + return null; + } + } + ); + for (Map.Entry<byte[], String> e : results.entrySet()) { + LOG.info("Got value "+e.getValue()+" for region "+Bytes.toStringBinary(e.getKey())); + } + assertEquals(3, results.size()); + for (HRegionLocation region : regions) { + HRegionInfo info = region.getRegionInfo(); + LOG.info("Region info is "+info.getRegionNameAsString()); + assertTrue(results.containsKey(info.getRegionName())); + assertNull(results.get(info.getRegionName())); + } + } finally { + table.close(); + } + } + + @Test + public void testMasterCoprocessorService() throws Throwable { + Admin admin = util.getHBaseAdmin(); + final TestProtos.EchoRequestProto request = + TestProtos.EchoRequestProto.newBuilder().setMessage("hello").build(); + TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface service = + TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(admin.coprocessorService()); + assertEquals("hello", service.echo(null, request).getMessage()); + } + + @Test + public void testCoprocessorError() throws Exception { + Configuration configuration = new Configuration(util.getConfiguration()); + // Make it not retry forever + configuration.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); + Table table = util.getConnection().getTable(TEST_TABLE); + + try { + 
CoprocessorRpcChannel protocol = table.coprocessorService(ROWS[0]); + + TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface service = + TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(protocol); + + service.error(null, TestProtos.EmptyRequestProto.getDefaultInstance()); + fail("Should have thrown an exception"); + } catch (ServiceException e) { + } finally { + table.close(); + } + } + + @Test + public void testMasterCoprocessorError() throws Throwable { + Admin admin = util.getHBaseAdmin(); + TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface service = + TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(admin.coprocessorService()); + try { + service.error(null, TestProtos.EmptyRequestProto.getDefaultInstance()); + fail("Should have thrown an exception"); + } catch (ServiceException e) { + } + } + + private static byte[][] makeN(byte[] base, int n) { + byte[][] ret = new byte[n][]; + for (int i = 0; i < n; i++) { + ret[i] = Bytes.add(base, Bytes.toBytes(String.format("%02d", i))); + } + return ret; + } + +} + http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorTableEndpoint.java ---------------------------------------------------------------------- diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorTableEndpoint.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorTableEndpoint.java new file mode 100644 index 0000000..4913acf --- /dev/null +++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorTableEndpoint.java @@ -0,0 +1,182 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.coprocessor; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.Map; + +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.coprocessor.Batch; +import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos; +import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; +import org.apache.hadoop.hbase.testclassification.CoprocessorTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.google.protobuf.ByteString; +import com.google.protobuf.ServiceException; + +@Category({CoprocessorTests.class, MediumTests.class}) +public class TestCoprocessorTableEndpoint { + + private static final byte[] TEST_FAMILY = Bytes.toBytes("TestFamily"); + private static final byte[] TEST_QUALIFIER = Bytes.toBytes("TestQualifier"); + private static final byte[] ROW = Bytes.toBytes("testRow"); 
+ private static final int ROWSIZE = 20; + private static final int rowSeperator1 = 5; + private static final int rowSeperator2 = 12; + private static final byte[][] ROWS = makeN(ROW, ROWSIZE); + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + @BeforeClass + public static void setupBeforeClass() throws Exception { + TEST_UTIL.startMiniCluster(2); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testCoprocessorTableEndpoint() throws Throwable { + final TableName tableName = TableName.valueOf("testCoprocessorTableEndpoint"); + + HTableDescriptor desc = new HTableDescriptor(tableName); + desc.addFamily(new HColumnDescriptor(TEST_FAMILY)); + desc.addCoprocessor(org.apache.hadoop.hbase.coprocessor.ColumnAggregationEndpoint.class.getName()); + + createTable(desc); + verifyTable(tableName); + } + + @Test + public void testDynamicCoprocessorTableEndpoint() throws Throwable { + final TableName tableName = TableName.valueOf("testDynamicCoprocessorTableEndpoint"); + + HTableDescriptor desc = new HTableDescriptor(tableName); + desc.addFamily(new HColumnDescriptor(TEST_FAMILY)); + + createTable(desc); + + desc.addCoprocessor(org.apache.hadoop.hbase.coprocessor.ColumnAggregationEndpoint.class.getName()); + updateTable(desc); + + verifyTable(tableName); + } + + private static byte[][] makeN(byte[] base, int n) { + byte[][] ret = new byte[n][]; + for (int i = 0; i < n; i++) { + ret[i] = Bytes.add(base, Bytes.toBytes(String.format("%02d", i))); + } + return ret; + } + + private static Map<byte [], Long> sum(final Table table, final byte [] family, + final byte [] qualifier, final byte [] start, final byte [] end) + throws ServiceException, Throwable { + return table.coprocessorService(ColumnAggregationProtos.ColumnAggregationService.class, + start, end, + new Batch.Call<ColumnAggregationProtos.ColumnAggregationService, Long>() { + @Override + public 
Long call(ColumnAggregationProtos.ColumnAggregationService instance) + throws IOException { + CoprocessorRpcUtils.BlockingRpcCallback<ColumnAggregationProtos.SumResponse> rpcCallback = + new CoprocessorRpcUtils.BlockingRpcCallback<ColumnAggregationProtos.SumResponse>(); + ColumnAggregationProtos.SumRequest.Builder builder = + ColumnAggregationProtos.SumRequest.newBuilder(); + builder.setFamily(ByteString.copyFrom(family)); + if (qualifier != null && qualifier.length > 0) { + builder.setQualifier(ByteString.copyFrom(qualifier)); + } + instance.sum(null, builder.build(), rpcCallback); + return rpcCallback.get().getSum(); + } + }); + } + + private static final void createTable(HTableDescriptor desc) throws Exception { + Admin admin = TEST_UTIL.getHBaseAdmin(); + admin.createTable(desc, new byte[][]{ROWS[rowSeperator1], ROWS[rowSeperator2]}); + TEST_UTIL.waitUntilAllRegionsAssigned(desc.getTableName()); + Table table = TEST_UTIL.getConnection().getTable(desc.getTableName()); + try { + for (int i = 0; i < ROWSIZE; i++) { + Put put = new Put(ROWS[i]); + put.addColumn(TEST_FAMILY, TEST_QUALIFIER, Bytes.toBytes(i)); + table.put(put); + } + } finally { + table.close(); + } + } + + private static void updateTable(HTableDescriptor desc) throws Exception { + Admin admin = TEST_UTIL.getHBaseAdmin(); + admin.disableTable(desc.getTableName()); + admin.modifyTable(desc.getTableName(), desc); + admin.enableTable(desc.getTableName()); + } + + private static final void verifyTable(TableName tableName) throws Throwable { + Table table = TEST_UTIL.getConnection().getTable(tableName); + try { + Map<byte[], Long> results = sum(table, TEST_FAMILY, TEST_QUALIFIER, ROWS[0], + ROWS[ROWS.length-1]); + int sumResult = 0; + int expectedResult = 0; + for (Map.Entry<byte[], Long> e : results.entrySet()) { + sumResult += e.getValue(); + } + for (int i = 0; i < ROWSIZE; i++) { + expectedResult += i; + } + assertEquals("Invalid result", expectedResult, sumResult); + + // scan: for region 2 and 
region 3 + results.clear(); + results = sum(table, TEST_FAMILY, TEST_QUALIFIER, ROWS[rowSeperator1], ROWS[ROWS.length-1]); + sumResult = 0; + expectedResult = 0; + for (Map.Entry<byte[], Long> e : results.entrySet()) { + sumResult += e.getValue(); + } + for (int i = rowSeperator1; i < ROWSIZE; i++) { + expectedResult += i; + } + assertEquals("Invalid result", expectedResult, sumResult); + } finally { + table.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerCoprocessorEndpoint.java ---------------------------------------------------------------------- diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerCoprocessorEndpoint.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerCoprocessorEndpoint.java new file mode 100644 index 0000000..31646f8 --- /dev/null +++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerCoprocessorEndpoint.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.coprocessor; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.FileNotFoundException; +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Coprocessor; +import org.apache.hadoop.hbase.CoprocessorEnvironment; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos; +import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyRequest; +import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyResponse; +import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyService; +import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; +import org.apache.hadoop.hbase.ipc.RemoteWithExtrasException; +import org.apache.hadoop.hbase.ipc.ServerRpcController; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.testclassification.CoprocessorTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcController; +import com.google.protobuf.Service; + +@Category({CoprocessorTests.class, MediumTests.class}) +public class TestRegionServerCoprocessorEndpoint { + public static final FileNotFoundException WHAT_TO_THROW = new FileNotFoundException("/file.txt"); + private static HBaseTestingUtility TEST_UTIL = null; + private static Configuration CONF = null; + private static final String DUMMY_VALUE = "val"; + + @BeforeClass + public static void setupBeforeClass() throws Exception { + TEST_UTIL = new HBaseTestingUtility(); + CONF = 
TEST_UTIL.getConfiguration(); + CONF.setStrings(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY, + DummyRegionServerEndpoint.class.getName()); + TEST_UTIL.startMiniCluster(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + /** + * Calls dummyCall on the region server endpoint of region server 0 and + * checks that the response carries DUMMY_VALUE. + */ + @Test + public void testEndpoint() throws Exception { + final ServerName serverName = TEST_UTIL.getHBaseCluster().getRegionServer(0).getServerName(); + final ServerRpcController controller = new ServerRpcController(); + final CoprocessorRpcUtils.BlockingRpcCallback<DummyRegionServerEndpointProtos.DummyResponse> + rpcCallback = + new CoprocessorRpcUtils.BlockingRpcCallback<DummyRegionServerEndpointProtos.DummyResponse>(); + DummyRegionServerEndpointProtos.DummyService service = + ProtobufUtil.newServiceStub(DummyRegionServerEndpointProtos.DummyService.class, + TEST_UTIL.getHBaseAdmin().coprocessorService(serverName)); + service.dummyCall(controller, + DummyRegionServerEndpointProtos.DummyRequest.getDefaultInstance(), rpcCallback); + // Fetch the response first, then surface any failure recorded on the controller + // before dereferencing it: testEndpointExceptions shows the callback yields null + // when the call fails, which would otherwise hide the real exception behind an NPE. + final DummyRegionServerEndpointProtos.DummyResponse response = rpcCallback.get(); + if (controller.failedOnException()) { + throw controller.getFailedOn(); + } + assertEquals(DUMMY_VALUE, response.getValue()); + } + + @Test + public void testEndpointExceptions() throws Exception { + final ServerName serverName = TEST_UTIL.getHBaseCluster().getRegionServer(0).getServerName(); + final ServerRpcController controller = new ServerRpcController(); + final CoprocessorRpcUtils.BlockingRpcCallback<DummyRegionServerEndpointProtos.DummyResponse> + rpcCallback = + new CoprocessorRpcUtils.BlockingRpcCallback<DummyRegionServerEndpointProtos.DummyResponse>(); + DummyRegionServerEndpointProtos.DummyService service = + ProtobufUtil.newServiceStub(DummyRegionServerEndpointProtos.DummyService.class, + TEST_UTIL.getHBaseAdmin().coprocessorService(serverName)); + service.dummyThrow(controller, + DummyRegionServerEndpointProtos.DummyRequest.getDefaultInstance(), rpcCallback); + assertEquals(null, rpcCallback.get()); +
assertTrue(controller.failedOnException()); + assertEquals(WHAT_TO_THROW.getClass().getName().trim(), + ((RemoteWithExtrasException) controller.getFailedOn().getCause()).getClassName().trim()); + } + + static class DummyRegionServerEndpoint extends DummyService implements Coprocessor, SingletonCoprocessorService { + + @Override + public Service getService() { + return this; + } + + @Override + public void start(CoprocessorEnvironment env) throws IOException { + // TODO Auto-generated method stub + } + + @Override + public void stop(CoprocessorEnvironment env) throws IOException { + // TODO Auto-generated method stub + } + + @Override + public void dummyCall(RpcController controller, DummyRequest request, + RpcCallback<DummyResponse> callback) { + callback.run(DummyResponse.newBuilder().setValue(DUMMY_VALUE).build()); + } + + @Override + public void dummyThrow(RpcController controller, + DummyRequest request, + RpcCallback<DummyResponse> done) { + CoprocessorRpcUtils.setControllerException(controller, WHAT_TO_THROW); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java ---------------------------------------------------------------------- diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java new file mode 100644 index 0000000..7cae0bc --- /dev/null +++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java @@ -0,0 +1,665 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.coprocessor; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.testclassification.CoprocessorTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.ByteStringer; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.IsolationLevel; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.coprocessor.RowProcessorClient; +import 
org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos; +import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.FriendsOfFriendsProcessorRequest; +import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.FriendsOfFriendsProcessorResponse; +import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.IncCounterProcessorRequest; +import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.IncCounterProcessorResponse; +import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.RowSwapProcessorRequest; +import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.RowSwapProcessorResponse; +import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.TimeoutProcessorRequest; +import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.TimeoutProcessorResponse; +import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; +import org.apache.hadoop.hbase.ipc.RpcScheduler; +import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.ProcessRequest; +import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.ProcessResponse; +import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorService; +import org.apache.hadoop.hbase.regionserver.BaseRowProcessor; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.google.protobuf.Message; +import org.apache.commons.logging.Log; +import 
org.apache.commons.logging.LogFactory; + +/** + * Verifies ProcessEndpoint works. + * The tested RowProcessor performs two scans and a read-modify-write. + */ +@Category({CoprocessorTests.class, MediumTests.class}) +public class TestRowProcessorEndpoint { + + private static final Log LOG = LogFactory.getLog(TestRowProcessorEndpoint.class); + + private static final TableName TABLE = TableName.valueOf("testtable"); + private final static byte[] ROW = Bytes.toBytes("testrow"); + private final static byte[] ROW2 = Bytes.toBytes("testrow2"); + private final static byte[] FAM = Bytes.toBytes("friendlist"); + + // Column names + private final static byte[] A = Bytes.toBytes("a"); + private final static byte[] B = Bytes.toBytes("b"); + private final static byte[] C = Bytes.toBytes("c"); + private final static byte[] D = Bytes.toBytes("d"); + private final static byte[] E = Bytes.toBytes("e"); + private final static byte[] F = Bytes.toBytes("f"); + private final static byte[] G = Bytes.toBytes("g"); + private final static byte[] COUNTER = Bytes.toBytes("counter"); + private final static AtomicLong myTimer = new AtomicLong(0); + private final AtomicInteger failures = new AtomicInteger(0); + + private static HBaseTestingUtility util = new HBaseTestingUtility(); + private static volatile int expectedCounter = 0; + private static int rowSize, row2Size; + + private volatile static Table table = null; + private volatile static boolean swapped = false; + private volatile CountDownLatch startSignal; + private volatile CountDownLatch doneSignal; + + @BeforeClass + public static void setupBeforeClass() throws Exception { + Configuration conf = util.getConfiguration(); + conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, + RowProcessorEndpoint.class.getName()); + conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); + conf.setLong("hbase.hregion.row.processor.timeout", 1000L); + conf.setLong(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, 2048); + util.startMiniCluster(); + 
} + + @AfterClass + public static void tearDownAfterClass() throws Exception { + util.shutdownMiniCluster(); + } + + public void prepareTestData() throws Exception { + try { + util.getHBaseAdmin().disableTable(TABLE); + util.getHBaseAdmin().deleteTable(TABLE); + } catch (Exception e) { + // ignore table not found + } + table = util.createTable(TABLE, FAM); + { + Put put = new Put(ROW); + put.addColumn(FAM, A, Bytes.add(B, C)); // B, C are friends of A + put.addColumn(FAM, B, Bytes.add(D, E, F)); // D, E, F are friends of B + put.addColumn(FAM, C, G); // G is a friend of C + table.put(put); + rowSize = put.size(); + } + Put put = new Put(ROW2); + put.addColumn(FAM, D, E); + put.addColumn(FAM, F, G); + table.put(put); + row2Size = put.size(); + } + + @Test + public void testDoubleScan() throws Throwable { + prepareTestData(); + + CoprocessorRpcChannel channel = table.coprocessorService(ROW); + RowProcessorEndpoint.FriendsOfFriendsProcessor processor = + new RowProcessorEndpoint.FriendsOfFriendsProcessor(ROW, A); + RowProcessorService.BlockingInterface service = + RowProcessorService.newBlockingStub(channel); + ProcessRequest request = RowProcessorClient.getRowProcessorPB(processor); + ProcessResponse protoResult = service.process(null, request); + FriendsOfFriendsProcessorResponse response = + FriendsOfFriendsProcessorResponse.parseFrom(protoResult.getRowProcessorResult()); + Set<String> result = new HashSet<String>(); + result.addAll(response.getResultList()); + Set<String> expected = + new HashSet<String>(Arrays.asList(new String[]{"d", "e", "f", "g"})); + Get get = new Get(ROW); + LOG.debug("row keyvalues:" + stringifyKvs(table.get(get).listCells())); + assertEquals(expected, result); + } + + @Test + public void testReadModifyWrite() throws Throwable { + prepareTestData(); + failures.set(0); + int numThreads = 100; + concurrentExec(new IncrementRunner(), numThreads); + Get get = new Get(ROW); + LOG.debug("row keyvalues:" + 
stringifyKvs(table.get(get).listCells())); + int finalCounter = incrementCounter(table); + assertEquals(numThreads + 1, finalCounter); + assertEquals(0, failures.get()); + } + + /** + * Task that performs one counter increment against the shared table. + * A failed increment is recorded in failures instead of being propagated. + */ + class IncrementRunner implements Runnable { + @Override + public void run() { + try { + incrementCounter(table); + } catch (Throwable e) { + // Record the failure: the exception is swallowed here, so without this the + // assertEquals(0, failures.get()) check in the calling test could never trip. + failures.incrementAndGet(); + e.printStackTrace(); + } + } + } + + /** + * Runs an IncrementCounterProcessor on ROW through the row processor service + * of the given table and returns the integer carried in the response. + */ + private int incrementCounter(Table table) throws Throwable { + CoprocessorRpcChannel channel = table.coprocessorService(ROW); + RowProcessorEndpoint.IncrementCounterProcessor processor = + new RowProcessorEndpoint.IncrementCounterProcessor(ROW); + RowProcessorService.BlockingInterface service = + RowProcessorService.newBlockingStub(channel); + ProcessRequest request = RowProcessorClient.getRowProcessorPB(processor); + ProcessResponse protoResult = service.process(null, request); + IncCounterProcessorResponse response = IncCounterProcessorResponse + .parseFrom(protoResult.getRowProcessorResult()); + Integer result = response.getResponse(); + return result; + } + + /** + * Starts numThreads threads that each count down startSignal, wait for it to + * reach zero, and then run the task; returns once every thread has counted + * down doneSignal. A Throwable escaping the task is counted in failures. + */ + private void concurrentExec( + final Runnable task, final int numThreads) throws Throwable { + startSignal = new CountDownLatch(numThreads); + doneSignal = new CountDownLatch(numThreads); + for (int i = 0; i < numThreads; ++i) { + new Thread(new Runnable() { + @Override + public void run() { + try { + startSignal.countDown(); + startSignal.await(); + task.run(); + } catch (Throwable e) { + failures.incrementAndGet(); + e.printStackTrace(); + } + doneSignal.countDown(); + } + }).start(); + } + doneSignal.await(); + } + + @Test + public void testMultipleRows() throws Throwable { + prepareTestData(); + failures.set(0); + int numThreads = 100; + concurrentExec(new SwapRowsRunner(), numThreads); + LOG.debug("row keyvalues:" + + stringifyKvs(table.get(new Get(ROW)).listCells())); + LOG.debug("row2 keyvalues:" + + stringifyKvs(table.get(new Get(ROW2)).listCells())); + assertEquals(rowSize, table.get(new Get(ROW)).listCells().size()); + assertEquals(row2Size,
table.get(new Get(ROW2)).listCells().size()); + assertEquals(0, failures.get()); + } + + /** + * Task that performs one swap of ROW and ROW2 on the shared table. + * A failed swap is recorded in failures instead of being propagated. + */ + class SwapRowsRunner implements Runnable { + @Override + public void run() { + try { + swapRows(table); + } catch (Throwable e) { + // Record the failure: the exception is swallowed here, so without this the + // assertEquals(0, failures.get()) check in the calling test could never trip. + failures.incrementAndGet(); + e.printStackTrace(); + } + } + } + + /** + * Runs a RowSwapProcessor over ROW and ROW2 through the row processor service + * of the given table; the response is discarded. + */ + private void swapRows(Table table) throws Throwable { + CoprocessorRpcChannel channel = table.coprocessorService(ROW); + RowProcessorEndpoint.RowSwapProcessor processor = + new RowProcessorEndpoint.RowSwapProcessor(ROW, ROW2); + RowProcessorService.BlockingInterface service = + RowProcessorService.newBlockingStub(channel); + ProcessRequest request = RowProcessorClient.getRowProcessorPB(processor); + service.process(null, request); + } + + @Test + public void testTimeout() throws Throwable { + prepareTestData(); + CoprocessorRpcChannel channel = table.coprocessorService(ROW); + RowProcessorEndpoint.TimeoutProcessor processor = + new RowProcessorEndpoint.TimeoutProcessor(ROW); + RowProcessorService.BlockingInterface service = + RowProcessorService.newBlockingStub(channel); + ProcessRequest request = RowProcessorClient.getRowProcessorPB(processor); + boolean exceptionCaught = false; + try { + service.process(null, request); + } catch (Exception e) { + exceptionCaught = true; + } + assertTrue(exceptionCaught); + } + + /** + * This class defines the RowProcessors exercised by this test: + * IncrementCounterProcessor, FriendsOfFriendsProcessor, + * RowSwapProcessor and TimeoutProcessor. + * + * We define the RowProcessors as the inner class of the endpoint. + * So they can be loaded with the endpoint on the coprocessor.
+ */ + public static class RowProcessorEndpoint<S extends Message,T extends Message> + extends BaseRowProcessorEndpoint<S,T> implements CoprocessorService { + public static class IncrementCounterProcessor extends + BaseRowProcessor<IncrementCounterProcessorTestProtos.IncCounterProcessorRequest, + IncrementCounterProcessorTestProtos.IncCounterProcessorResponse> { + int counter = 0; + byte[] row = new byte[0]; + + /** + * Empty constructor for Writable + */ + IncrementCounterProcessor() { + } + + IncrementCounterProcessor(byte[] row) { + this.row = row; + } + + @Override + public Collection<byte[]> getRowsToLock() { + return Collections.singleton(row); + } + + @Override + public IncCounterProcessorResponse getResult() { + IncCounterProcessorResponse.Builder i = IncCounterProcessorResponse.newBuilder(); + i.setResponse(counter); + return i.build(); + } + + @Override + public boolean readOnly() { + return false; + } + + @Override + public void process(long now, HRegion region, + List<Mutation> mutations, WALEdit walEdit) throws IOException { + // Scan current counter + List<Cell> kvs = new ArrayList<Cell>(); + Scan scan = new Scan(row, row); + scan.addColumn(FAM, COUNTER); + doScan(region, scan, kvs); + counter = kvs.size() == 0 ? 
0 : + Bytes.toInt(CellUtil.cloneValue(kvs.iterator().next())); + + // Assert that the stored counter matches the expected value + assertEquals(expectedCounter, counter); + + // Increment counter and send it to both memstore and WAL edit + counter += 1; + expectedCounter += 1; + + + Put p = new Put(row); + KeyValue kv = + new KeyValue(row, FAM, COUNTER, now, Bytes.toBytes(counter)); + p.add(kv); + mutations.add(p); + walEdit.add(kv); + + // We can also inject some metadata into the walEdit + KeyValue metaKv = new KeyValue( + row, WALEdit.METAFAMILY, + Bytes.toBytes("I just increment counter"), + Bytes.toBytes(counter)); + walEdit.add(metaKv); + } + /** + * Serializes the counter and row fields into an IncCounterProcessorRequest. + */ + @Override + public IncCounterProcessorRequest getRequestData() throws IOException { + IncCounterProcessorRequest.Builder builder = IncCounterProcessorRequest.newBuilder(); + builder.setCounter(counter); + builder.setRow(ByteStringer.wrap(row)); + return builder.build(); + } + /** + * Restores the row and counter fields from the given request message. + */ + @Override + public void initialize(IncCounterProcessorRequest msg) { + this.row = msg.getRow().toByteArray(); + this.counter = msg.getCounter(); + } + } + /** + * Read-only row processor that looks up the FAM column named by person in one row, + * treats each byte of the returned values as the qualifier of a friend, scans those + * qualifiers in the same row, and collects every byte of the resulting values as a + * one-character string in result. + */ + public static class FriendsOfFriendsProcessor extends + BaseRowProcessor<FriendsOfFriendsProcessorRequest, FriendsOfFriendsProcessorResponse> { + byte[] row = null; + byte[] person = null; + final Set<String> result = new HashSet<String>(); + + /** + * Empty constructor for Writable + */ + FriendsOfFriendsProcessor() { + } + /** + * Creates a processor for the given row and person qualifier. + */ + FriendsOfFriendsProcessor(byte[] row, byte[] person) { + this.row = row; + this.person = person; + } + /** + * Returns the processor row as the only row to lock. + */ + @Override + public Collection<byte[]> getRowsToLock() { + return Collections.singleton(row); + } + /** + * Returns the collected result set wrapped in a FriendsOfFriendsProcessorResponse. + */ + @Override + public FriendsOfFriendsProcessorResponse getResult() { + FriendsOfFriendsProcessorResponse.Builder builder = + FriendsOfFriendsProcessorResponse.newBuilder(); + builder.addAllResult(result); + return builder.build(); + } + /** + * Returns true; process adds nothing to mutations or walEdit. + */ + @Override + public boolean readOnly() { + return true; + } + /** + * Runs the two scans described on the class and refills result from the second one. + */ + @Override + public void process(long now, HRegion region, + List<Mutation> mutations, WALEdit walEdit) 
throws IOException { + List<Cell> kvs = new ArrayList<Cell>(); + { // First scan to get the friends of the person: column FAM:person of the row + Scan scan = new Scan(row, row); + scan.addColumn(FAM, person); + doScan(region, scan, kvs); + } + + // Second scan to get friends of friends: every byte of each friend value is used as a qualifier + Scan scan = new Scan(row, row); + for (Cell kv : kvs) { + byte[] friends = CellUtil.cloneValue(kv); + for (byte f : friends) { + scan.addColumn(FAM, new byte[]{f}); + } + } + doScan(region, scan, kvs); + + // Collect result: every byte of every returned value becomes a one-character string + result.clear(); + for (Cell kv : kvs) { + for (byte b : CellUtil.cloneValue(kv)) { + result.add((char)b + ""); + } + } + } + /** + * Serializes person, row and the current result set into a FriendsOfFriendsProcessorRequest. + */ + @Override + public FriendsOfFriendsProcessorRequest getRequestData() throws IOException { + FriendsOfFriendsProcessorRequest.Builder builder = + FriendsOfFriendsProcessorRequest.newBuilder(); + builder.setPerson(ByteStringer.wrap(person)); + builder.setRow(ByteStringer.wrap(row)); + builder.addAllResult(result); + FriendsOfFriendsProcessorRequest f = builder.build(); + return f; + } + /** + * Restores person, row and the result set from the given request message. + */ + @Override + public void initialize(FriendsOfFriendsProcessorRequest request) + throws IOException { + this.person = request.getPerson().toByteArray(); + this.row = request.getRow().toByteArray(); + result.clear(); + result.addAll(request.getResultList()); + } + } + /** + * Row processor that locks row1 and row2 and, in process, moves every cell of each + * row to the other row. + */ + public static class RowSwapProcessor extends + BaseRowProcessor<RowSwapProcessorRequest, RowSwapProcessorResponse> { + byte[] row1 = new byte[0]; + byte[] row2 = new byte[0]; + + /** + * Empty constructor for Writable + */ + RowSwapProcessor() { + } + /** + * Creates a processor that swaps the two given rows. + */ + RowSwapProcessor(byte[] row1, byte[] row2) { + this.row1 = row1; + this.row2 = row2; + } + /** + * Returns row1 followed by row2 as the rows to lock. + */ + @Override + public Collection<byte[]> getRowsToLock() { + List<byte[]> rows = new ArrayList<byte[]>(); + rows.add(row1); + rows.add(row2); + return rows; + } + /** + * Returns false because process queues Delete and Put mutations. + */ + @Override + public boolean readOnly() { + return false; + } + /** + * Returns the default RowSwapProcessorResponse instance. + */ + @Override + public RowSwapProcessorResponse getResult() { + return RowSwapProcessorResponse.getDefaultInstance(); + } + /** + * Scans both rows, asserts their cell counts against rowSize and row2Size according to + * the swapped flag, flips the flag, then for every cell queues a Delete marker on its + * current row (at the cell timestamp) and a Put of the same family, qualifier and value + * on the other row (at the overridden timestamp). + */ + @Override + public void process(long 
now, HRegion region, + List<Mutation> mutations, WALEdit walEdit) throws IOException { + + // Override the time to avoid a race condition in the unit test caused by an + // inaccurate timer on some machines; the timestamp is taken from the myTimer counter instead + now = myTimer.getAndIncrement(); + + // Scan both rows + List<Cell> kvs1 = new ArrayList<Cell>(); + List<Cell> kvs2 = new ArrayList<Cell>(); + doScan(region, new Scan(row1, row1), kvs1); + doScan(region, new Scan(row2, row2), kvs2); + + // Assert the cell counts; once swapped is set the two expected sizes are exchanged + if (swapped) { + assertEquals(rowSize, kvs2.size()); + assertEquals(row2Size, kvs1.size()); + } else { + assertEquals(rowSize, kvs1.size()); + assertEquals(row2Size, kvs2.size()); + } + swapped = !swapped; + + // Queue the deletes and puts that move every cell to the other row + List<List<Cell>> kvs = new ArrayList<List<Cell>>(); + kvs.add(kvs1); + kvs.add(kvs2); + byte[][] rows = new byte[][]{row1, row2}; + for (int i = 0; i < kvs.size(); ++i) { + for (Cell kv : kvs.get(i)) { + // Delete from the current row and add to the other row + Delete d = new Delete(rows[i]); + KeyValue kvDelete = + new KeyValue(rows[i], CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv), + kv.getTimestamp(), KeyValue.Type.Delete); + d.addDeleteMarker(kvDelete); + Put p = new Put(rows[1 - i]); + KeyValue kvAdd = + new KeyValue(rows[1 - i], CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv), + now, CellUtil.cloneValue(kv)); + p.add(kvAdd); + mutations.add(d); + walEdit.add(kvDelete); + mutations.add(p); + walEdit.add(kvAdd); + } + } + } + /** + * Returns the name of this processor, swap. + */ + @Override + public String getName() { + return "swap"; + } + /** + * Serializes row1 and row2 into a RowSwapProcessorRequest. + */ + @Override + public RowSwapProcessorRequest getRequestData() throws IOException { + RowSwapProcessorRequest.Builder builder = RowSwapProcessorRequest.newBuilder(); + builder.setRow1(ByteStringer.wrap(row1)); + builder.setRow2(ByteStringer.wrap(row2)); + return builder.build(); + } + /** + * Restores row1 and row2 from the given request message. + */ + @Override + public void initialize(RowSwapProcessorRequest msg) { + this.row1 = msg.getRow1().toByteArray(); + this.row2 = msg.getRow2().toByteArray(); + } + } + /** + * Row processor whose process method only sleeps for 100 seconds; it is read-only and + * reports the name timeout. + */ + public static class 
TimeoutProcessor extends + BaseRowProcessor<TimeoutProcessorRequest, TimeoutProcessorResponse> { + + byte[] row = new byte[0]; + + /** + * Empty constructor for Writable + */ + public TimeoutProcessor() { + } + /** + * Creates a processor that works on the given row. + */ + public TimeoutProcessor(byte[] row) { + this.row = row; + } + /** + * Returns the processor row as the only row to lock. + * NOTE(review): the sibling processors annotate this method with Override; here the + * annotation is absent. + */ + public Collection<byte[]> getRowsToLock() { + return Collections.singleton(row); + } + /** + * Returns the default TimeoutProcessorResponse instance. + */ + @Override + public TimeoutProcessorResponse getResult() { + return TimeoutProcessorResponse.getDefaultInstance(); + } + /** + * Sleeps for 100 seconds and does nothing else. Any exception from Thread.sleep, + * including InterruptedException, is rethrown wrapped in an IOException; the interrupt + * flag is not set again. + */ + @Override + public void process(long now, HRegion region, + List<Mutation> mutations, WALEdit walEdit) throws IOException { + try { + // Sleep for a long time so that the call times out + Thread.sleep(100 * 1000L); + } catch (Exception e) { + throw new IOException(e); + } + } + /** + * Returns true; process adds nothing to mutations or walEdit. + */ + @Override + public boolean readOnly() { + return true; + } + /** + * Returns the name of this processor, timeout. + */ + @Override + public String getName() { + return "timeout"; + } + /** + * Serializes the row into a TimeoutProcessorRequest. + */ + @Override + public TimeoutProcessorRequest getRequestData() throws IOException { + TimeoutProcessorRequest.Builder builder = TimeoutProcessorRequest.newBuilder(); + builder.setRow(ByteStringer.wrap(row)); + return builder.build(); + } + /** + * Restores the row from the given request message. + */ + @Override + public void initialize(TimeoutProcessorRequest msg) throws IOException { + this.row = msg.getRow().toByteArray(); + } + } + /** + * Runs the scan against the region with isolation level READ_UNCOMMITTED, clears result + * and fills it with a single scanner.next call. The scanner is closed in all cases. + * Note that the isolation level is set on the Scan object passed in. + * + * @param region region to scan + * @param scan scan to run; its isolation level is overwritten + * @param result list that is cleared and then receives the cells + * @throws IOException propagated from the scanner + */ + public static void doScan( + HRegion region, Scan scan, List<Cell> result) throws IOException { + InternalScanner scanner = null; + try { + scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED); + scanner = region.getScanner(scan); + result.clear(); + scanner.next(result); + } finally { + if (scanner != null) scanner.close(); + } + } + } + /** + * Renders the cells as a bracketed list of qualifier:value entries, each followed by a + * space. The value of the COUNTER qualifier is decoded with Bytes.toInt; every other + * value is printed with Bytes.toStringBinary. A null collection yields empty brackets. + */ + static String stringifyKvs(Collection<Cell> kvs) { + StringBuilder out = new StringBuilder(); + out.append("["); + if (kvs != null) { + for (Cell kv : kvs) { + byte[] col = CellUtil.cloneQualifier(kv); + byte[] val = CellUtil.cloneValue(kv); + if (Bytes.equals(col, COUNTER)) { + out.append(Bytes.toStringBinary(col) + ":" + + Bytes.toInt(val) + " "); + } else { + 
out.append(Bytes.toStringBinary(col) + ":" + + Bytes.toStringBinary(val) + " "); + } + } + } + out.append("]"); + return out.toString(); + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/ipc/TestCoprocessorRpcUtils.java ---------------------------------------------------------------------- diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/ipc/TestCoprocessorRpcUtils.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/ipc/TestCoprocessorRpcUtils.java new file mode 100644 index 0000000..15a2747 --- /dev/null +++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/ipc/TestCoprocessorRpcUtils.java @@ -0,0 +1,45 @@ +/* + + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.ipc; + +import static org.junit.Assert.assertEquals; + +import com.google.protobuf.Descriptors; +import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos; +import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Test; +import org.junit.experimental.categories.Category; +/** + * Unit test for CoprocessorRpcUtils.getServiceName. It checks the name returned for the + * AuthenticationService descriptor against getName and the name returned for the + * DummyService descriptor against getFullName. + */ +@Category(SmallTests.class) +public class TestCoprocessorRpcUtils { + @Test + public void testServiceName() throws Exception { + // verify that we de-namespace built-in HBase rpc services: the short getName() is expected + Descriptors.ServiceDescriptor authService = + AuthenticationProtos.AuthenticationService.getDescriptor(); + assertEquals(authService.getName(), CoprocessorRpcUtils.getServiceName(authService)); + + // non-hbase rpc services should remain fully qualified: getFullName() is expected + Descriptors.ServiceDescriptor dummyService = + DummyRegionServerEndpointProtos.DummyService.getDescriptor(); + assertEquals(dummyService.getFullName(), CoprocessorRpcUtils.getServiceName(dummyService)); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadEndpointClient.java ---------------------------------------------------------------------- diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadEndpointClient.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadEndpointClient.java new file mode 100644 index 0000000..a82900d --- /dev/null +++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadEndpointClient.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; +import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; +import org.apache.hadoop.hbase.ipc.ServerRpcController; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadRequest; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadResponse; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.DelegationToken; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadRequest; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadResponse; +import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos; +import org.apache.hadoop.hbase.util.ByteStringer; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.security.token.Token; + +/** + * Client proxy for SecureBulkLoadProtocol used in conjunction with 
SecureBulkLoadEndpoint + * @deprecated Use for backward compatibility testing only. Will be removed when + * SecureBulkLoadEndpoint is not supported. + */ +@InterfaceAudience.Private +public class SecureBulkLoadEndpointClient { + private Table table; + /** + * Creates a client that issues every call through the given table. + * + * @param table table used to obtain coprocessor channels + */ + public SecureBulkLoadEndpointClient(Table table) { + this.table = table; + } + /** + * Calls prepareBulkLoad on a SecureBulkLoadService stub created on the coprocessor + * channel for HConstants.EMPTY_START_ROW and waits for the response. + * + * @param tableName table name sent in the request + * @return the bulk token carried by the response + * @throws IOException wrapping any Throwable raised during the call, including a + * failure recorded on the controller + */ + public String prepareBulkLoad(final TableName tableName) throws IOException { + try { + CoprocessorRpcChannel channel = table.coprocessorService(HConstants.EMPTY_START_ROW); + SecureBulkLoadProtos.SecureBulkLoadService instance = + ProtobufUtil.newServiceStub(SecureBulkLoadProtos.SecureBulkLoadService.class, channel); + + ServerRpcController controller = new ServerRpcController(); + + CoprocessorRpcUtils.BlockingRpcCallback<PrepareBulkLoadResponse> rpcCallback = + new CoprocessorRpcUtils.BlockingRpcCallback<PrepareBulkLoadResponse>(); + + PrepareBulkLoadRequest request = + PrepareBulkLoadRequest.newBuilder() + .setTableName(ProtobufUtil.toProtoTableName(tableName)).build(); + + instance.prepareBulkLoad(controller, request, rpcCallback); + + PrepareBulkLoadResponse response = rpcCallback.get(); + if (controller.failedOnException()) { + throw controller.getFailedOn(); + } + + return response.getBulkToken(); + } catch (Throwable throwable) { + throw new IOException(throwable); + } + } + /** + * Calls cleanupBulkLoad on a SecureBulkLoadService stub created on the coprocessor + * channel for HConstants.EMPTY_START_ROW. + * + * @param bulkToken token sent in the cleanup request; prepareBulkLoad returns such a token + * @throws IOException wrapping any Throwable raised during the call, including a + * failure recorded on the controller + */ + public void cleanupBulkLoad(final String bulkToken) throws IOException { + try { + CoprocessorRpcChannel channel = table.coprocessorService(HConstants.EMPTY_START_ROW); + SecureBulkLoadProtos.SecureBulkLoadService instance = + ProtobufUtil.newServiceStub(SecureBulkLoadProtos.SecureBulkLoadService.class, channel); + + ServerRpcController controller = new ServerRpcController(); + + CoprocessorRpcUtils.BlockingRpcCallback<CleanupBulkLoadResponse> rpcCallback = + new CoprocessorRpcUtils.BlockingRpcCallback<CleanupBulkLoadResponse>(); + + CleanupBulkLoadRequest request = + CleanupBulkLoadRequest.newBuilder() + .setBulkToken(bulkToken).build(); + + 
instance.cleanupBulkLoad(controller, + request, + rpcCallback); + + if (controller.failedOnException()) { + throw controller.getFailedOn(); + } + } catch (Throwable throwable) { + throw new IOException(throwable); + } + } + /** + * Sends one secureBulkLoadHFiles request through a SecureBulkLoadService stub created on + * the coprocessor channel that the table returns for startRow, and waits for the response. + * + * @param familyPaths pairs of column family bytes and HFile path, copied into the request + * @param userToken token copied into the request as its fsToken; when null an empty + * DelegationToken is sent instead + * @param bulkToken token sent as the bulk token of the request + * @param startRow row key used to obtain the coprocessor channel + * @return the loaded flag carried by the response + * @throws IOException wrapping any Throwable raised during the call, including a + * failure recorded on the controller + */ + public boolean bulkLoadHFiles(final List<Pair<byte[], String>> familyPaths, + final Token<?> userToken, + final String bulkToken, + final byte[] startRow) throws IOException { + // We never want to send a batch of HFiles to all regions, thus cannot call + // HTable#coprocessorService methods that take start and end rowkeys; see HBASE-9639 + try { + CoprocessorRpcChannel channel = table.coprocessorService(startRow); + SecureBulkLoadProtos.SecureBulkLoadService instance = + ProtobufUtil.newServiceStub(SecureBulkLoadProtos.SecureBulkLoadService.class, channel); + + DelegationToken protoDT = + DelegationToken.newBuilder().build(); + if(userToken != null) { + protoDT = + DelegationToken.newBuilder() + .setIdentifier(ByteStringer.wrap(userToken.getIdentifier())) + .setPassword(ByteStringer.wrap(userToken.getPassword())) + .setKind(userToken.getKind().toString()) + .setService(userToken.getService().toString()).build(); + } + + List<ClientProtos.BulkLoadHFileRequest.FamilyPath> protoFamilyPaths = + new ArrayList<ClientProtos.BulkLoadHFileRequest.FamilyPath>(); + for(Pair<byte[], String> el: familyPaths) { + protoFamilyPaths.add(ClientProtos.BulkLoadHFileRequest.FamilyPath.newBuilder() + .setFamily(ByteStringer.wrap(el.getFirst())) + .setPath(el.getSecond()).build()); + } + + SecureBulkLoadProtos.SecureBulkLoadHFilesRequest request = + SecureBulkLoadProtos.SecureBulkLoadHFilesRequest.newBuilder() + .setFsToken(protoDT) + .addAllFamilyPath(protoFamilyPaths) + .setBulkToken(bulkToken).build(); + + ServerRpcController controller = new ServerRpcController(); + CoprocessorRpcUtils.BlockingRpcCallback<SecureBulkLoadProtos.SecureBulkLoadHFilesResponse> + rpcCallback = + new 
CoprocessorRpcUtils.BlockingRpcCallback<SecureBulkLoadProtos.SecureBulkLoadHFilesResponse>(); + instance.secureBulkLoadHFiles(controller, + request, + rpcCallback); + + SecureBulkLoadProtos.SecureBulkLoadHFilesResponse response = rpcCallback.get(); /* the response is fetched before the controller is checked for a recorded failure */ + if (controller.failedOnException()) { + throw controller.getFailedOn(); + } + return response.getLoaded(); + } catch (Throwable throwable) { + throw new IOException(throwable); + } + } + +}
