Repository: hbase Updated Branches: refs/heads/master 5051ab4e7 -> 70f330dc8
http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java new file mode 100644 index 0000000..d55adef --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread; +import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ClusterConnection; +import org.apache.hadoop.hbase.client.RegionServerCallable; +import org.apache.hadoop.hbase.client.RpcRetryingCaller; +import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; +import org.apache.hadoop.hbase.protobuf.RequestConverter; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import com.google.common.collect.Lists; + +/** + * Tests bulk loading of HFiles with old non-secure client for backward compatibility. Will be + * removed when old non-secure client for backward compatibility is not supported. + */ +@RunWith(Parameterized.class) +@Category({RegionServerTests.class, LargeTests.class}) +public class TestHRegionServerBulkLoadWithOldClient extends TestHRegionServerBulkLoad { + public TestHRegionServerBulkLoadWithOldClient(int duration) { + super(duration); + } + + private static final Log LOG = LogFactory.getLog(TestHRegionServerBulkLoadWithOldClient.class); + + public static class AtomicHFileLoader extends RepeatingTestThread { + final AtomicLong numBulkLoads = new AtomicLong(); + final AtomicLong numCompactions = new AtomicLong(); + private TableName tableName; + + public AtomicHFileLoader(TableName tableName, TestContext ctx, + byte targetFamilies[][]) throws IOException { + super(ctx); + this.tableName = tableName; + } + + public void doAnAction() throws Exception { + long iteration = numBulkLoads.getAndIncrement(); + Path dir = UTIL.getDataTestDirOnTestFS(String.format("bulkLoad_%08d", + iteration)); + + // create HFiles for different column families + FileSystem fs = UTIL.getTestFileSystem(); + byte[] val = Bytes.toBytes(String.format("%010d", iteration)); + final List<Pair<byte[], String>> famPaths = new ArrayList<Pair<byte[], String>>( + NUM_CFS); + for (int i = 0; i < NUM_CFS; i++) { + Path hfile = new Path(dir, family(i)); + byte[] fam = Bytes.toBytes(family(i)); + createHFile(fs, hfile, fam, QUAL, val, 1000); + famPaths.add(new Pair<>(fam, hfile.toString())); + } + + // bulk load HFiles + final ClusterConnection conn = (ClusterConnection) UTIL.getAdmin().getConnection(); + RegionServerCallable<Void> callable = + new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) { + @Override + public Void call(int callTimeout) throws Exception { + LOG.info("Non-secure old client"); + byte[] regionName = getLocation().getRegionInfo().getRegionName(); + BulkLoadHFileRequest request = + RequestConverter + .buildBulkLoadHFileRequest(famPaths, regionName, true, null, null); + getStub().bulkLoadHFile(null, request); + return null; + } + }; + RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(conf); + RpcRetryingCaller<Void> caller = factory.<Void> newCaller(); + caller.callWithRetries(callable, Integer.MAX_VALUE); + + // Periodically do compaction to reduce the number of open file handles. + if (numBulkLoads.get() % 5 == 0) { + // 5 * 50 = 250 open file handles! + callable = new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) { + @Override + public Void call(int callTimeout) throws Exception { + LOG.debug("compacting " + getLocation() + " for row " + + Bytes.toStringBinary(getRow())); + AdminProtos.AdminService.BlockingInterface server = + conn.getAdmin(getLocation().getServerName()); + CompactRegionRequest request = + RequestConverter.buildCompactRegionRequest( + getLocation().getRegionInfo().getRegionName(), true, null); + server.compactRegion(null, request); + numCompactions.incrementAndGet(); + return null; + } + }; + caller.callWithRetries(callable, Integer.MAX_VALUE); + } + } + } + + void runAtomicBulkloadTest(TableName tableName, int millisToRun, int numScanners) + throws Exception { + setupTable(tableName, 10); + + TestContext ctx = new TestContext(UTIL.getConfiguration()); + + AtomicHFileLoader loader = new AtomicHFileLoader(tableName, ctx, null); + ctx.addThread(loader); + + List<AtomicScanReader> scanners = Lists.newArrayList(); + for (int i = 0; i < numScanners; i++) { + AtomicScanReader scanner = new AtomicScanReader(tableName, ctx, families); + scanners.add(scanner); + ctx.addThread(scanner); + } + + ctx.startThreads(); + ctx.waitFor(millisToRun); + ctx.stop(); + + LOG.info("Loaders:"); + LOG.info(" loaded " + loader.numBulkLoads.get()); + LOG.info(" compations " + loader.numCompactions.get()); + + LOG.info("Scanners:"); + for (AtomicScanReader scanner : scanners) { + LOG.info(" scanned " + scanner.numScans.get()); + LOG.info(" verified " + scanner.numRowsScanned.get() + " rows"); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java new file mode 100644 index 0000000..6de6261 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java @@ -0,0 +1,177 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread; +import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ClusterConnection; +import org.apache.hadoop.hbase.client.RegionServerCallable; +import org.apache.hadoop.hbase.client.RpcRetryingCaller; +import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; +import org.apache.hadoop.hbase.client.SecureBulkLoadClient; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.protobuf.RequestConverter; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.junit.BeforeClass; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import com.google.common.collect.Lists; + +/** + * Tests bulk loading of HFiles with old secure Endpoint client for backward compatibility. Will be + * removed when old non-secure client for backward compatibility is not supported. + */ +@RunWith(Parameterized.class) +@Category({RegionServerTests.class, LargeTests.class}) +public class TestHRegionServerBulkLoadWithOldSecureEndpoint extends TestHRegionServerBulkLoad { + public TestHRegionServerBulkLoadWithOldSecureEndpoint(int duration) { + super(duration); + } + + private static final Log LOG = LogFactory.getLog(TestHRegionServerBulkLoadWithOldSecureEndpoint.class); + + @BeforeClass + public static void setUpBeforeClass() throws IOException { + conf.setInt("hbase.rpc.timeout", 10 * 1000); + conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, + "org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint"); + } + + public static class AtomicHFileLoader extends RepeatingTestThread { + final AtomicLong numBulkLoads = new AtomicLong(); + final AtomicLong numCompactions = new AtomicLong(); + private TableName tableName; + + public AtomicHFileLoader(TableName tableName, TestContext ctx, + byte targetFamilies[][]) throws IOException { + super(ctx); + this.tableName = tableName; + } + + public void doAnAction() throws Exception { + long iteration = numBulkLoads.getAndIncrement(); + Path dir = UTIL.getDataTestDirOnTestFS(String.format("bulkLoad_%08d", + iteration)); + + // create HFiles for different column families + FileSystem fs = UTIL.getTestFileSystem(); + byte[] val = Bytes.toBytes(String.format("%010d", iteration)); + final List<Pair<byte[], String>> famPaths = new ArrayList<Pair<byte[], String>>( + NUM_CFS); + for (int i = 0; i < NUM_CFS; i++) { + Path hfile = new Path(dir, family(i)); + byte[] fam = Bytes.toBytes(family(i)); + createHFile(fs, hfile, fam, QUAL, val, 1000); + famPaths.add(new Pair<>(fam, hfile.toString())); + } + + // bulk load HFiles + final ClusterConnection conn = (ClusterConnection) UTIL.getAdmin().getConnection(); + Table table = conn.getTable(tableName); + final String bulkToken = new SecureBulkLoadEndpointClient(table).prepareBulkLoad(tableName); + RegionServerCallable<Void> callable = + new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) { + @Override + public Void call(int callTimeout) throws Exception { + LOG.debug("Going to connect to server " + getLocation() + " for row " + + Bytes.toStringBinary(getRow())); + try (Table table = conn.getTable(getTableName())) { + boolean loaded = + new SecureBulkLoadEndpointClient(table).bulkLoadHFiles(famPaths, null, + bulkToken, getLocation().getRegionInfo().getStartKey()); + } + return null; + } + }; + RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(conf); + RpcRetryingCaller<Void> caller = factory.<Void> newCaller(); + caller.callWithRetries(callable, Integer.MAX_VALUE); + + // Periodically do compaction to reduce the number of open file handles. + if (numBulkLoads.get() % 5 == 0) { + // 5 * 50 = 250 open file handles! + callable = new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) { + @Override + public Void call(int callTimeout) throws Exception { + LOG.debug("compacting " + getLocation() + " for row " + + Bytes.toStringBinary(getRow())); + AdminProtos.AdminService.BlockingInterface server = + conn.getAdmin(getLocation().getServerName()); + CompactRegionRequest request = + RequestConverter.buildCompactRegionRequest( + getLocation().getRegionInfo().getRegionName(), true, null); + server.compactRegion(null, request); + numCompactions.incrementAndGet(); + return null; + } + }; + caller.callWithRetries(callable, Integer.MAX_VALUE); + } + } + } + + void runAtomicBulkloadTest(TableName tableName, int millisToRun, int numScanners) + throws Exception { + setupTable(tableName, 10); + + TestContext ctx = new TestContext(UTIL.getConfiguration()); + + AtomicHFileLoader loader = new AtomicHFileLoader(tableName, ctx, null); + ctx.addThread(loader); + + List<AtomicScanReader> scanners = Lists.newArrayList(); + for (int i = 0; i < numScanners; i++) { + AtomicScanReader scanner = new AtomicScanReader(tableName, ctx, families); + scanners.add(scanner); + ctx.addThread(scanner); + } + + ctx.startThreads(); + ctx.waitFor(millisToRun); + ctx.stop(); + + LOG.info("Loaders:"); + LOG.info(" loaded " + loader.numBulkLoads.get()); + LOG.info(" compations " + loader.numCompactions.get()); + + LOG.info("Scanners:"); + for (AtomicScanReader scanner : scanners) { + LOG.info(" scanned " + scanner.numScans.get()); + LOG.info(" verified " + scanner.numRowsScanned.get() + " rows"); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityRpc.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityRpc.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityRpc.java index edad059..0e60877 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityRpc.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityRpc.java @@ -24,7 +24,6 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; -import org.apache.hadoop.hbase.security.Superusers; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; @@ -32,6 +31,7 @@ import org.apache.hadoop.hbase.util.ByteStringer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CoordinatedStateManagerFactory; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.CoordinatedStateManager; @@ -61,6 +61,8 @@ public class TestPriorityRpc { public void setup() { Configuration conf = HBaseConfiguration.create(); conf.setBoolean("hbase.testing.nocluster", true); // No need to do ZK + final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(conf); + TEST_UTIL.getDataTestDir(this.getClass().getName()); CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf); regionServer = HRegionServer.constructRegionServer(HRegionServer.class, conf, cp); priority = regionServer.rpcServices.getPriority(); http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java index f0e7ac9..274fe37 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java @@ -99,8 +99,7 @@ public class SecureTestUtil { conf.set("hadoop.security.authentication", "simple"); conf.set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, AccessController.class.getName() + "," + MasterSyncObserver.class.getName()); - conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, AccessController.class.getName() + - "," + SecureBulkLoadEndpoint.class.getName()); + conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, AccessController.class.getName()); conf.set(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY, AccessController.class.getName()); // Need HFile V3 for tags for security features conf.setInt(HFile.FORMAT_VERSION_KEY, 3);