Repository: hbase Updated Branches: refs/heads/master 7c51d3f2e -> 9e53f2927
http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java index 0fe79d1..199c2c1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java @@ -60,13 +60,13 @@ import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.RegionObserver; -import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.HFileTestUtil; import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-server/src/test/java/org/apache/hadoop/hbase/security/HadoopSecurityEnabledUserProviderForTesting.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/HadoopSecurityEnabledUserProviderForTesting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/HadoopSecurityEnabledUserProviderForTesting.java new file mode 100644 index 0000000..3ebda29 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/HadoopSecurityEnabledUserProviderForTesting.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.security; + +import org.apache.hadoop.hbase.security.UserProvider; + +/** + * A {@link UserProvider} that always says hadoop security is enabled, regardless of the underlying + * configuration. HBase security is <i>not enabled</i> as this is used to determine if SASL is used + * to do the authentication, which requires a Kerberos ticket (which we currently don't have in + * tests). + * <p> + * This should only be used for <b>TESTING</b>. + */ +public class HadoopSecurityEnabledUserProviderForTesting extends UserProvider { + + @Override + public boolean isHBaseSecurityEnabled() { + return false; + } + + @Override + public boolean isHadoopSecurityEnabled() { + return true; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java index 6583366..1e38179 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java @@ -25,6 +25,12 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import com.google.protobuf.BlockingRpcChannel; +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcController; +import com.google.protobuf.Service; +import com.google.protobuf.ServiceException; + import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -93,7 +99,6 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; -import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.locking.LockProcedure; @@ -118,11 +123,9 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.security.Superusers; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.access.Permission.Action; -import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProcedureProtos; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; -import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.SecurityTests; +import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.JVMClusterUtil; import org.apache.log4j.Level; @@ -134,11 +137,9 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TestName; -import com.google.protobuf.BlockingRpcChannel; -import com.google.protobuf.RpcCallback; -import com.google.protobuf.RpcController; -import com.google.protobuf.Service; -import com.google.protobuf.ServiceException; +import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProcedureProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; +import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription; /** * Performs authorization checks for common operations, according to different http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/MapreduceTestingShim.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/MapreduceTestingShim.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/MapreduceTestingShim.java new file mode 100644 index 0000000..3f7d441 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/MapreduceTestingShim.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.tool; + +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MiniMRCluster; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobID; + +/** + * This class provides shims for HBase to interact with the Hadoop 1.0.x and the + * Hadoop 0.23.x series. + * + * NOTE: No testing done against 0.22.x, or 0.21.x. + */ +abstract public class MapreduceTestingShim { + private static MapreduceTestingShim instance; + private static Class[] emptyParam = new Class[] {}; + + static { + try { + // This class exists in hadoop 0.22+ but not in Hadoop 20.x/1.x + Class c = Class + .forName("org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl"); + instance = new MapreduceV2Shim(); + } catch (Exception e) { + instance = new MapreduceV1Shim(); + } + } + + abstract public JobContext newJobContext(Configuration jobConf) + throws IOException; + + abstract public Job newJob(Configuration conf) throws IOException; + + abstract public JobConf obtainJobConf(MiniMRCluster cluster); + + abstract public String obtainMROutputDirProp(); + + public static JobContext createJobContext(Configuration jobConf) + throws IOException { + return instance.newJobContext(jobConf); + } + + public static JobConf getJobConf(MiniMRCluster cluster) { + return instance.obtainJobConf(cluster); + } + + public static Job createJob(Configuration conf) throws IOException { + return instance.newJob(conf); + } + + public static String getMROutputDirProp() { + return instance.obtainMROutputDirProp(); + } + + private static class MapreduceV1Shim extends MapreduceTestingShim { + public JobContext newJobContext(Configuration jobConf) throws IOException { + // Implementing: + // return new JobContext(jobConf, new JobID()); + JobID jobId = new JobID(); + Constructor<JobContext> c; + try { + c = JobContext.class.getConstructor(Configuration.class, JobID.class); + return c.newInstance(jobConf, jobId); + } catch (Exception e) { + throw new IllegalStateException( + "Failed to instantiate new JobContext(jobConf, new JobID())", e); + } + } + + @Override + public Job newJob(Configuration conf) throws IOException { + // Implementing: + // return new Job(conf); + Constructor<Job> c; + try { + c = Job.class.getConstructor(Configuration.class); + return c.newInstance(conf); + } catch (Exception e) { + throw new IllegalStateException( + "Failed to instantiate new Job(conf)", e); + } + } + + public JobConf obtainJobConf(MiniMRCluster cluster) { + if (cluster == null) return null; + try { + Object runner = cluster.getJobTrackerRunner(); + Method meth = runner.getClass().getDeclaredMethod("getJobTracker", emptyParam); + Object tracker = meth.invoke(runner, new Object []{}); + Method m = tracker.getClass().getDeclaredMethod("getConf", emptyParam); + return (JobConf) m.invoke(tracker, new Object []{}); + } catch (NoSuchMethodException nsme) { + return null; + } catch (InvocationTargetException ite) { + return null; + } catch (IllegalAccessException iae) { + return null; + } + } + + @Override + public String obtainMROutputDirProp() { + return "mapred.output.dir"; + } + }; + + private static class MapreduceV2Shim extends MapreduceTestingShim { + public JobContext newJobContext(Configuration jobConf) { + return newJob(jobConf); + } + + @Override + public Job newJob(Configuration jobConf) { + // Implementing: + // return Job.getInstance(jobConf); + try { + Method m = Job.class.getMethod("getInstance", Configuration.class); + return (Job) m.invoke(null, jobConf); // static method, then arg + } catch (Exception e) { + e.printStackTrace(); + throw new IllegalStateException( + "Failed to return from Job.getInstance(jobConf)"); + } + } + + public JobConf obtainJobConf(MiniMRCluster cluster) { + try { + Method meth = MiniMRCluster.class.getMethod("getJobTrackerConf", emptyParam); + return (JobConf) meth.invoke(cluster, new Object []{}); + } catch (NoSuchMethodException nsme) { + return null; + } catch (InvocationTargetException ite) { + return null; + } catch (IllegalAccessException iae) { + return null; + } + } + + @Override + public String obtainMROutputDirProp() { + // This is a copy of o.a.h.mapreduce.lib.output.FileOutputFormat.OUTDIR + // from Hadoop 0.23.x. If we use the source directly we break the hadoop 1.x compile. + return "mapreduce.output.fileoutputformat.outputdir"; + } + }; + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFiles.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFiles.java new file mode 100644 index 0000000..7e4d40e --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFiles.java @@ -0,0 +1,723 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.tool; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.TreeMap; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.NamespaceDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.io.hfile.HFileScanner; +import org.apache.hadoop.hbase.regionserver.BloomType; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.MiscTests; +import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles.LoadQueueItem; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.HFileTestUtil; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +/** + * Test cases for the "load" half of the HFileOutputFormat bulk load functionality. These tests run + * faster than the full MR cluster tests in TestHFileOutputFormat + */ +@Category({ MiscTests.class, LargeTests.class }) +public class TestLoadIncrementalHFiles { + @Rule + public TestName tn = new TestName(); + + private static final byte[] QUALIFIER = Bytes.toBytes("myqual"); + private static final byte[] FAMILY = Bytes.toBytes("myfam"); + private static final String NAMESPACE = "bulkNS"; + + static final String EXPECTED_MSG_FOR_NON_EXISTING_FAMILY = "Unmatched family names found"; + static final int MAX_FILES_PER_REGION_PER_FAMILY = 4; + + private static final byte[][] SPLIT_KEYS = + new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ppp") }; + + static HBaseTestingUtility util = new HBaseTestingUtility(); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, ""); + util.getConfiguration().setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY, + MAX_FILES_PER_REGION_PER_FAMILY); + // change default behavior so that tag values are returned with normal rpcs + util.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY, + KeyValueCodecWithTags.class.getCanonicalName()); + util.startMiniCluster(); + + setupNamespace(); + } + + protected static void setupNamespace() throws Exception { + util.getAdmin().createNamespace(NamespaceDescriptor.create(NAMESPACE).build()); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + util.shutdownMiniCluster(); + } + + @Test(timeout = 120000) + public void testSimpleLoadWithMap() throws Exception { + runTest("testSimpleLoadWithMap", BloomType.NONE, + new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") }, + new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, }, + true); + } + + /** + * Test case that creates some regions and loads HFiles that fit snugly inside those regions + */ + @Test(timeout = 120000) + public void testSimpleLoad() throws Exception { + runTest("testSimpleLoad", BloomType.NONE, + new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") }, + new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, }); + } + + @Test(timeout = 120000) + public void testSimpleLoadWithFileCopy() throws Exception { + String testName = tn.getMethodName(); + final byte[] TABLE_NAME = Bytes.toBytes("mytable_" + testName); + runTest(testName, buildHTD(TableName.valueOf(TABLE_NAME), BloomType.NONE), BloomType.NONE, + false, null, new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") }, + new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, }, + false, true); + } + + /** + * Test case that creates some regions and loads HFiles that cross the boundaries of those regions + */ + @Test(timeout = 120000) + public void testRegionCrossingLoad() throws Exception { + runTest("testRegionCrossingLoad", BloomType.NONE, + new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") }, + new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, }); + } + + /** + * Test loading into a column family that has a ROW bloom filter. + */ + @Test(timeout = 60000) + public void testRegionCrossingRowBloom() throws Exception { + runTest("testRegionCrossingLoadRowBloom", BloomType.ROW, + new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") }, + new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, }); + } + + /** + * Test loading into a column family that has a ROWCOL bloom filter. + */ + @Test(timeout = 120000) + public void testRegionCrossingRowColBloom() throws Exception { + runTest("testRegionCrossingLoadRowColBloom", BloomType.ROWCOL, + new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") }, + new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, }); + } + + /** + * Test case that creates some regions and loads HFiles that have different region boundaries than + * the table pre-split. + */ + @Test(timeout = 120000) + public void testSimpleHFileSplit() throws Exception { + runTest("testHFileSplit", BloomType.NONE, + new byte[][] { Bytes.toBytes("aaa"), Bytes.toBytes("fff"), Bytes.toBytes("jjj"), + Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"), }, + new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("lll") }, + new byte[][] { Bytes.toBytes("mmm"), Bytes.toBytes("zzz") }, }); + } + + /** + * Test case that creates some regions and loads HFiles that cross the boundaries and have + * different region boundaries than the table pre-split. + */ + @Test(timeout = 60000) + public void testRegionCrossingHFileSplit() throws Exception { + testRegionCrossingHFileSplit(BloomType.NONE); + } + + /** + * Test case that creates some regions and loads HFiles that cross the boundaries have a ROW bloom + * filter and a different region boundaries than the table pre-split. + */ + @Test(timeout = 120000) + public void testRegionCrossingHFileSplitRowBloom() throws Exception { + testRegionCrossingHFileSplit(BloomType.ROW); + } + + /** + * Test case that creates some regions and loads HFiles that cross the boundaries have a ROWCOL + * bloom filter and a different region boundaries than the table pre-split. + */ + @Test(timeout = 120000) + public void testRegionCrossingHFileSplitRowColBloom() throws Exception { + testRegionCrossingHFileSplit(BloomType.ROWCOL); + } + + @Test + public void testSplitALot() throws Exception { + runTest("testSplitALot", BloomType.NONE, + new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("bbb"), Bytes.toBytes("ccc"), + Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"), Bytes.toBytes("ggg"), + Bytes.toBytes("hhh"), Bytes.toBytes("iii"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"), + Bytes.toBytes("nnn"), Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"), + Bytes.toBytes("rrr"), Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"), + Bytes.toBytes("vvv"), Bytes.toBytes("zzz"), }, + new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("zzz") }, }); + } + + private void testRegionCrossingHFileSplit(BloomType bloomType) throws Exception { + runTest("testHFileSplit" + bloomType + "Bloom", bloomType, + new byte[][] { Bytes.toBytes("aaa"), Bytes.toBytes("fff"), Bytes.toBytes("jjj"), + Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"), }, + new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") }, + new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, }); + } + + private TableDescriptor buildHTD(TableName tableName, BloomType bloomType) { + return TableDescriptorBuilder.newBuilder(tableName) + .addColumnFamily( + ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setBloomFilterType(bloomType).build()) + .build(); + } + + private void runTest(String testName, BloomType bloomType, byte[][][] hfileRanges) + throws Exception { + runTest(testName, bloomType, null, hfileRanges); + } + + private void runTest(String testName, BloomType bloomType, byte[][][] hfileRanges, boolean useMap) + throws Exception { + runTest(testName, bloomType, null, hfileRanges, useMap); + } + + private void runTest(String testName, BloomType bloomType, byte[][] tableSplitKeys, + byte[][][] hfileRanges) throws Exception { + runTest(testName, bloomType, tableSplitKeys, hfileRanges, false); + } + + private void runTest(String testName, BloomType bloomType, byte[][] tableSplitKeys, + byte[][][] hfileRanges, boolean useMap) throws Exception { + final byte[] TABLE_NAME = Bytes.toBytes("mytable_" + testName); + final boolean preCreateTable = tableSplitKeys != null; + + // Run the test bulkloading the table to the default namespace + final TableName TABLE_WITHOUT_NS = TableName.valueOf(TABLE_NAME); + runTest(testName, TABLE_WITHOUT_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges, + useMap); + + // Run the test bulkloading the table to the specified namespace + final TableName TABLE_WITH_NS = TableName.valueOf(Bytes.toBytes(NAMESPACE), TABLE_NAME); + runTest(testName, TABLE_WITH_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges, + useMap); + } + + private void runTest(String testName, TableName tableName, BloomType bloomType, + boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap) + throws Exception { + TableDescriptor htd = buildHTD(tableName, bloomType); + runTest(testName, htd, bloomType, preCreateTable, tableSplitKeys, hfileRanges, useMap, false); + } + + public static int loadHFiles(String testName, TableDescriptor htd, HBaseTestingUtility util, + byte[] fam, byte[] qual, boolean preCreateTable, byte[][] tableSplitKeys, + byte[][][] hfileRanges, boolean useMap, boolean deleteFile, boolean copyFiles, + int initRowCount, int factor) throws Exception { + Path dir = util.getDataTestDirOnTestFS(testName); + FileSystem fs = util.getTestFileSystem(); + dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory()); + Path familyDir = new Path(dir, Bytes.toString(fam)); + + int hfileIdx = 0; + Map<byte[], List<Path>> map = null; + List<Path> list = null; + if (useMap || copyFiles) { + list = new ArrayList<>(); + } + if (useMap) { + map = new TreeMap<>(Bytes.BYTES_COMPARATOR); + map.put(fam, list); + } + Path last = null; + for (byte[][] range : hfileRanges) { + byte[] from = range[0]; + byte[] to = range[1]; + Path path = new Path(familyDir, "hfile_" + hfileIdx++); + HFileTestUtil.createHFile(util.getConfiguration(), fs, path, fam, qual, from, to, factor); + if (useMap) { + last = path; + list.add(path); + } + } + int expectedRows = hfileIdx * factor; + + TableName tableName = htd.getTableName(); + if (!util.getAdmin().tableExists(tableName) && (preCreateTable || map != null)) { + util.getAdmin().createTable(htd, tableSplitKeys); + } + + Configuration conf = util.getConfiguration(); + if (copyFiles) { + conf.setBoolean(LoadIncrementalHFiles.ALWAYS_COPY_FILES, true); + } + LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf); + String[] args = { dir.toString(), tableName.toString() }; + if (useMap) { + if (deleteFile) { + fs.delete(last, true); + } + Map<LoadQueueItem, ByteBuffer> loaded = loader.run(map, tableName); + if (deleteFile) { + expectedRows -= 1000; + for (LoadQueueItem item : loaded.keySet()) { + if (item.getFilePath().getName().equals(last.getName())) { + fail(last + " should be missing"); + } + } + } + } else { + loader.run(args); + } + + if (copyFiles) { + for (Path p : list) { + assertTrue(p + " should exist", fs.exists(p)); + } + } + + Table table = util.getConnection().getTable(tableName); + try { + assertEquals(initRowCount + expectedRows, util.countRows(table)); + } finally { + table.close(); + } + + return expectedRows; + } + + private void runTest(String testName, TableDescriptor htd, BloomType bloomType, + boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap, + boolean copyFiles) throws Exception { + loadHFiles(testName, htd, util, FAMILY, QUALIFIER, preCreateTable, tableSplitKeys, hfileRanges, + useMap, true, copyFiles, 0, 1000); + + final TableName tableName = htd.getTableName(); + // verify staging folder has been cleaned up + Path stagingBasePath = + new Path(FSUtils.getRootDir(util.getConfiguration()), HConstants.BULKLOAD_STAGING_DIR_NAME); + FileSystem fs = util.getTestFileSystem(); + if (fs.exists(stagingBasePath)) { + FileStatus[] files = fs.listStatus(stagingBasePath); + for (FileStatus file : files) { + assertTrue("Folder=" + file.getPath() + " is not cleaned up.", + file.getPath().getName() != "DONOTERASE"); + } + } + + util.deleteTable(tableName); + } + + /** + * Test that tags survive through a bulk load that needs to split hfiles. This test depends on the + * "hbase.client.rpc.codec" = KeyValueCodecWithTags so that the client can get tags in the + * responses. + */ + @Test(timeout = 60000) + public void testTagsSurviveBulkLoadSplit() throws Exception { + Path dir = util.getDataTestDirOnTestFS(tn.getMethodName()); + FileSystem fs = util.getTestFileSystem(); + dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory()); + Path familyDir = new Path(dir, Bytes.toString(FAMILY)); + // table has these split points + byte[][] tableSplitKeys = new byte[][] { Bytes.toBytes("aaa"), Bytes.toBytes("fff"), + Bytes.toBytes("jjj"), Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"), }; + + // creating an hfile that has values that span the split points. + byte[] from = Bytes.toBytes("ddd"); + byte[] to = Bytes.toBytes("ooo"); + HFileTestUtil.createHFileWithTags(util.getConfiguration(), fs, + new Path(familyDir, tn.getMethodName() + "_hfile"), FAMILY, QUALIFIER, from, to, 1000); + int expectedRows = 1000; + + TableName tableName = TableName.valueOf(tn.getMethodName()); + TableDescriptor htd = buildHTD(tableName, BloomType.NONE); + util.getAdmin().createTable(htd, tableSplitKeys); + + LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration()); + String[] args = { dir.toString(), tableName.toString() }; + loader.run(args); + + Table table = util.getConnection().getTable(tableName); + try { + assertEquals(expectedRows, util.countRows(table)); + HFileTestUtil.verifyTags(table); + } finally { + table.close(); + } + + util.deleteTable(tableName); + } + + /** + * Test loading into a column family that does not exist. + */ + @Test(timeout = 60000) + public void testNonexistentColumnFamilyLoad() throws Exception { + String testName = tn.getMethodName(); + byte[][][] hFileRanges = + new byte[][][] { new byte[][] { Bytes.toBytes("aaa"), Bytes.toBytes("ccc") }, + new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, }; + + byte[] TABLE = Bytes.toBytes("mytable_" + testName); + // set real family name to upper case in purpose to simulate the case that + // family name in HFiles is invalid + TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf(TABLE)) + .addColumnFamily(ColumnFamilyDescriptorBuilder + .of(Bytes.toBytes(new String(FAMILY).toUpperCase(Locale.ROOT)))) + .build(); + + try { + runTest(testName, htd, BloomType.NONE, true, SPLIT_KEYS, hFileRanges, false, false); + assertTrue("Loading into table with non-existent family should have failed", false); + } catch (Exception e) { + assertTrue("IOException expected", e instanceof IOException); + // further check whether the exception message is correct + String errMsg = e.getMessage(); + assertTrue( + "Incorrect exception message, expected message: [" + EXPECTED_MSG_FOR_NON_EXISTING_FAMILY + + "], current message: [" + errMsg + "]", + errMsg.contains(EXPECTED_MSG_FOR_NON_EXISTING_FAMILY)); + } + } + + @Test(timeout = 120000) + public void testNonHfileFolderWithUnmatchedFamilyName() throws Exception { + testNonHfileFolder("testNonHfileFolderWithUnmatchedFamilyName", true); + } + + @Test(timeout = 120000) + public void testNonHfileFolder() throws Exception { + testNonHfileFolder("testNonHfileFolder", false); + } + + /** + * Write a random data file and a non-file in a dir with a valid family name but not part of the + * table families. we should we able to bulkload without getting the unmatched family exception. + * HBASE-13037/HBASE-13227 + */ + private void testNonHfileFolder(String tableName, boolean preCreateTable) throws Exception { + Path dir = util.getDataTestDirOnTestFS(tableName); + FileSystem fs = util.getTestFileSystem(); + dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory()); + + Path familyDir = new Path(dir, Bytes.toString(FAMILY)); + HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_0"), FAMILY, + QUALIFIER, Bytes.toBytes("begin"), Bytes.toBytes("end"), 500); + createRandomDataFile(fs, new Path(familyDir, "012356789"), 16 * 1024); + + final String NON_FAMILY_FOLDER = "_logs"; + Path nonFamilyDir = new Path(dir, NON_FAMILY_FOLDER); + fs.mkdirs(nonFamilyDir); + fs.mkdirs(new Path(nonFamilyDir, "non-file")); + createRandomDataFile(fs, new Path(nonFamilyDir, "012356789"), 16 * 1024); + + Table table = null; + try { + if (preCreateTable) { + table = util.createTable(TableName.valueOf(tableName), FAMILY); + } else { + table = util.getConnection().getTable(TableName.valueOf(tableName)); + } + + final String[] args = { dir.toString(), tableName }; + new LoadIncrementalHFiles(util.getConfiguration()).run(args); + assertEquals(500, util.countRows(table)); + } finally { + if (table != null) { + table.close(); + } + fs.delete(dir, true); + } + } + + private static void createRandomDataFile(FileSystem fs, Path path, int size) throws IOException { + FSDataOutputStream stream = fs.create(path); + try { + byte[] data = new byte[1024]; + for (int i = 0; i < data.length; ++i) { + data[i] = (byte) (i & 0xff); + } + while (size >= data.length) { + stream.write(data, 0, data.length); + size -= data.length; + } + if (size > 0) { + stream.write(data, 0, size); + } + } finally { + stream.close(); + } + } + + @Test(timeout = 120000) + public void testSplitStoreFile() throws IOException { + Path dir = util.getDataTestDirOnTestFS("testSplitHFile"); + FileSystem fs = util.getTestFileSystem(); + Path testIn = new Path(dir, "testhfile"); + ColumnFamilyDescriptor familyDesc = ColumnFamilyDescriptorBuilder.of(FAMILY); + HFileTestUtil.createHFile(util.getConfiguration(), fs, testIn, FAMILY, QUALIFIER, + Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000); + + Path bottomOut = new Path(dir, "bottom.out"); + Path topOut = new Path(dir, "top.out"); + + LoadIncrementalHFiles.splitStoreFile(util.getConfiguration(), testIn, familyDesc, + Bytes.toBytes("ggg"), bottomOut, topOut); + + int rowCount = verifyHFile(bottomOut); + rowCount += verifyHFile(topOut); + assertEquals(1000, rowCount); + } + + @Test + public void testSplitStoreFileWithNoneToNone() throws IOException { + testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.NONE, DataBlockEncoding.NONE); + } + + @Test + public void testSplitStoreFileWithEncodedToEncoded() throws IOException { + testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.DIFF, DataBlockEncoding.DIFF); + } + + @Test + public void testSplitStoreFileWithEncodedToNone() throws IOException { + testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.DIFF, DataBlockEncoding.NONE); + } + + @Test + public void testSplitStoreFileWithNoneToEncoded() throws IOException { + testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.NONE, DataBlockEncoding.DIFF); + } + + private void testSplitStoreFileWithDifferentEncoding(DataBlockEncoding bulkloadEncoding, + DataBlockEncoding cfEncoding) throws IOException { + Path dir = util.getDataTestDirOnTestFS("testSplitHFileWithDifferentEncoding"); + FileSystem fs = util.getTestFileSystem(); + Path testIn = new Path(dir, "testhfile"); + ColumnFamilyDescriptor familyDesc = + ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setDataBlockEncoding(cfEncoding).build(); + HFileTestUtil.createHFileWithDataBlockEncoding(util.getConfiguration(), fs, testIn, + bulkloadEncoding, FAMILY, QUALIFIER, Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000); + + Path bottomOut = new Path(dir, "bottom.out"); + Path topOut = new Path(dir, "top.out"); + + LoadIncrementalHFiles.splitStoreFile(util.getConfiguration(), testIn, familyDesc, + Bytes.toBytes("ggg"), bottomOut, topOut); + + int rowCount = verifyHFile(bottomOut); + rowCount += verifyHFile(topOut); + assertEquals(1000, rowCount); + } + + private int verifyHFile(Path p) throws IOException { + Configuration conf = util.getConfiguration(); + HFile.Reader reader = + HFile.createReader(p.getFileSystem(conf), p, new CacheConfig(conf), true, conf); + reader.loadFileInfo(); + HFileScanner scanner = reader.getScanner(false, false); + scanner.seekTo(); + int count = 0; + do { + count++; + } while (scanner.next()); + assertTrue(count > 0); + reader.close(); + return count; + } + + private void addStartEndKeysForTest(TreeMap<byte[], Integer> map, byte[] first, byte[] last) { + Integer value = map.containsKey(first) ? map.get(first) : 0; + map.put(first, value + 1); + + value = map.containsKey(last) ? map.get(last) : 0; + map.put(last, value - 1); + } + + @Test(timeout = 120000) + public void testInferBoundaries() { + TreeMap<byte[], Integer> map = new TreeMap<>(Bytes.BYTES_COMPARATOR); + + /* + * Toy example c---------i o------p s---------t v------x a------e g-----k m-------------q r----s + * u----w Should be inferred as: a-----------------k m-------------q r--------------t + * u---------x The output should be (m,r,u) + */ + + String first; + String last; + + first = "a"; + last = "e"; + addStartEndKeysForTest(map, first.getBytes(), last.getBytes()); + + first = "r"; + last = "s"; + addStartEndKeysForTest(map, first.getBytes(), last.getBytes()); + + first = "o"; + last = "p"; + addStartEndKeysForTest(map, first.getBytes(), last.getBytes()); + + first = "g"; + last = "k"; + addStartEndKeysForTest(map, first.getBytes(), last.getBytes()); + + first = "v"; + last = "x"; + addStartEndKeysForTest(map, first.getBytes(), last.getBytes()); + + first = "c"; + last = "i"; + addStartEndKeysForTest(map, first.getBytes(), last.getBytes()); + + first = "m"; + last = "q"; + addStartEndKeysForTest(map, first.getBytes(), last.getBytes()); + + first = "s"; + last = "t"; + addStartEndKeysForTest(map, first.getBytes(), last.getBytes()); + + first = "u"; + last = "w"; + addStartEndKeysForTest(map, first.getBytes(), last.getBytes()); + + byte[][] keysArray = LoadIncrementalHFiles.inferBoundaries(map); + byte[][] compare = new byte[3][]; + compare[0] = "m".getBytes(); + compare[1] = "r".getBytes(); + compare[2] = "u".getBytes(); + + assertEquals(keysArray.length, 3); + + for (int row = 0; row < keysArray.length; row++) { + assertArrayEquals(keysArray[row], compare[row]); + } + } + + @Test(timeout = 60000) + public void testLoadTooMayHFiles() throws Exception { + Path dir = util.getDataTestDirOnTestFS("testLoadTooMayHFiles"); + FileSystem fs = util.getTestFileSystem(); + dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory()); + Path familyDir = new Path(dir, Bytes.toString(FAMILY)); + + byte[] from = Bytes.toBytes("begin"); + byte[] to = Bytes.toBytes("end"); + for (int i = 0; i <= MAX_FILES_PER_REGION_PER_FAMILY; i++) { + HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_" + i), + FAMILY, QUALIFIER, from, to, 1000); + } + + LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration()); + String[] args = { dir.toString(), "mytable_testLoadTooMayHFiles" }; + try { + loader.run(args); + fail("Bulk loading too many files should fail"); + } catch (IOException ie) { + assertTrue(ie.getMessage() + .contains("Trying to load more than " + MAX_FILES_PER_REGION_PER_FAMILY + " hfiles")); + } + } + + @Test(expected = TableNotFoundException.class) + public void testWithoutAnExistingTableAndCreateTableSetToNo() throws Exception { + Configuration conf = util.getConfiguration(); + conf.set(LoadIncrementalHFiles.CREATE_TABLE_CONF_KEY, "no"); + LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf); + String[] args = { "directory", "nonExistingTable" }; + loader.run(args); + } + + @Test(timeout = 120000) + public void testTableWithCFNameStartWithUnderScore() throws Exception { + Path dir = util.getDataTestDirOnTestFS("cfNameStartWithUnderScore"); + FileSystem fs = util.getTestFileSystem(); + dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory()); + String family = "_cf"; + Path familyDir = new Path(dir, family); + + byte[] from = Bytes.toBytes("begin"); + byte[] to = Bytes.toBytes("end"); + Configuration conf = util.getConfiguration(); + String tableName = tn.getMethodName(); + Table table = util.createTable(TableName.valueOf(tableName), family); + HFileTestUtil.createHFile(conf, fs, new Path(familyDir, "hfile"), Bytes.toBytes(family), + QUALIFIER, from, to, 1000); + + LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf); + String[] args = { dir.toString(), tableName }; + try { + loader.run(args); + assertEquals(1000, util.countRows(table)); + } finally { + if (null != table) { + table.close(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFilesSplitRecovery.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFilesSplitRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFilesSplitRecovery.java new file mode 100644 index 0000000..414a6cb --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFilesSplitRecovery.java @@ -0,0 +1,628 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.tool; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.Deque; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.IntStream; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.MetaTableAccessor; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableExistsException; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.ClientServiceCallable; +import org.apache.hadoop.hbase.client.ClusterConnection; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.RegionLocator; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad; +import org.apache.hadoop.hbase.shaded.com.google.common.collect.Multimap; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.MiscTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.Pair; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.mockito.Mockito; + +/** + * Test cases for the atomic load error handling of the bulk load functionality. + */ +@Category({ MiscTests.class, LargeTests.class }) +public class TestLoadIncrementalHFilesSplitRecovery { + private static final Log LOG = LogFactory.getLog(TestHRegionServerBulkLoad.class); + + static HBaseTestingUtility util; + // used by secure subclass + static boolean useSecure = false; + + final static int NUM_CFS = 10; + final static byte[] QUAL = Bytes.toBytes("qual"); + final static int ROWCOUNT = 100; + + private final static byte[][] families = new byte[NUM_CFS][]; + + @Rule + public TestName name = new TestName(); + + static { + for (int i = 0; i < NUM_CFS; i++) { + families[i] = Bytes.toBytes(family(i)); + } + } + + static byte[] rowkey(int i) { + return Bytes.toBytes(String.format("row_%08d", i)); + } + + static String family(int i) { + return String.format("family_%04d", i); + } + + static byte[] value(int i) { + return Bytes.toBytes(String.format("%010d", i)); + } + + public static void buildHFiles(FileSystem fs, Path dir, int value) throws IOException { + byte[] val = value(value); + for (int i = 0; i < NUM_CFS; i++) { + Path testIn = new Path(dir, family(i)); + + TestHRegionServerBulkLoad.createHFile(fs, new Path(testIn, "hfile_" + i), + Bytes.toBytes(family(i)), QUAL, val, ROWCOUNT); + } + } + + private TableDescriptor createTableDesc(TableName name, int cfs) { + TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name); + IntStream.range(0, cfs).mapToObj(i -> ColumnFamilyDescriptorBuilder.of(family(i))) + .forEachOrdered(builder::addColumnFamily); + return builder.build(); + } + + /** + * Creates a table with given table name and specified number of column families if the table does + * not already exist. + */ + private void setupTable(final Connection connection, TableName table, int cfs) + throws IOException { + try { + LOG.info("Creating table " + table); + try (Admin admin = connection.getAdmin()) { + admin.createTable(createTableDesc(table, cfs)); + } + } catch (TableExistsException tee) { + LOG.info("Table " + table + " already exists"); + } + } + + /** + * Creates a table with given table name,specified number of column families<br> + * and splitkeys if the table does not already exist. + * @param table + * @param cfs + * @param SPLIT_KEYS + */ + private void setupTableWithSplitkeys(TableName table, int cfs, byte[][] SPLIT_KEYS) + throws IOException { + try { + LOG.info("Creating table " + table); + util.createTable(createTableDesc(table, cfs), SPLIT_KEYS); + } catch (TableExistsException tee) { + LOG.info("Table " + table + " already exists"); + } + } + + private Path buildBulkFiles(TableName table, int value) throws Exception { + Path dir = util.getDataTestDirOnTestFS(table.getNameAsString()); + Path bulk1 = new Path(dir, table.getNameAsString() + value); + FileSystem fs = util.getTestFileSystem(); + buildHFiles(fs, bulk1, value); + return bulk1; + } + + /** + * Populate table with known values. + */ + private void populateTable(final Connection connection, TableName table, int value) + throws Exception { + // create HFiles for different column families + LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()); + Path bulk1 = buildBulkFiles(table, value); + try (Table t = connection.getTable(table); + RegionLocator locator = connection.getRegionLocator(table); + Admin admin = connection.getAdmin()) { + lih.doBulkLoad(bulk1, admin, t, locator); + } + } + + /** + * Split the known table in half. (this is hard coded for this test suite) + */ + private void forceSplit(TableName table) { + try { + // need to call regions server to by synchronous but isn't visible. + HRegionServer hrs = util.getRSForFirstRegionInTable(table); + + for (HRegionInfo hri : ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices())) { + if (hri.getTable().equals(table)) { + util.getAdmin().splitRegionAsync(hri.getRegionName(), rowkey(ROWCOUNT / 2)); + // ProtobufUtil.split(null, hrs.getRSRpcServices(), hri, rowkey(ROWCOUNT / 2)); + } + } + + // verify that split completed. + int regions; + do { + regions = 0; + for (HRegionInfo hri : ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices())) { + if (hri.getTable().equals(table)) { + regions++; + } + } + if (regions != 2) { + LOG.info("Taking some time to complete split..."); + Thread.sleep(250); + } + } while (regions != 2); + } catch (IOException e) { + e.printStackTrace(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + @BeforeClass + public static void setupCluster() throws Exception { + util = new HBaseTestingUtility(); + util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, ""); + util.startMiniCluster(1); + } + + @AfterClass + public static void teardownCluster() throws Exception { + util.shutdownMiniCluster(); + } + + /** + * Checks that all columns have the expected value and that there is the expected number of rows. + * @throws IOException + */ + void assertExpectedTable(TableName table, int count, int value) throws IOException { + List<TableDescriptor> htds = util.getAdmin().listTableDescriptors(table.getNameAsString()); + assertEquals(htds.size(), 1); + try (Table t = util.getConnection().getTable(table); + ResultScanner sr = t.getScanner(new Scan())) { + int i = 0; + for (Result r; (r = sr.next()) != null;) { + r.getNoVersionMap().values().stream().flatMap(m -> m.values().stream()) + .forEach(v -> assertArrayEquals(value(value), v)); + i++; + } + assertEquals(count, i); + } catch (IOException e) { + fail("Failed due to exception"); + } + } + + /** + * Test that shows that exception thrown from the RS side will result in an exception on the + * LIHFile client. + */ + @Test(expected = IOException.class, timeout = 120000) + public void testBulkLoadPhaseFailure() throws Exception { + final TableName table = TableName.valueOf(name.getMethodName()); + final AtomicInteger attmptedCalls = new AtomicInteger(); + final AtomicInteger failedCalls = new AtomicInteger(); + util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); + try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) { + setupTable(connection, table, 10); + LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()) { + @Override + protected List<LoadQueueItem> tryAtomicRegionLoad( + ClientServiceCallable<byte[]> serviceCallable, TableName tableName, final byte[] first, + Collection<LoadQueueItem> lqis) throws IOException { + int i = attmptedCalls.incrementAndGet(); + if (i == 1) { + Connection errConn; + try { + errConn = getMockedConnection(util.getConfiguration()); + serviceCallable = this.buildClientServiceCallable(errConn, table, first, lqis, true); + } catch (Exception e) { + LOG.fatal("mocking cruft, should never happen", e); + throw new RuntimeException("mocking cruft, should never happen"); + } + failedCalls.incrementAndGet(); + return super.tryAtomicRegionLoad(serviceCallable, tableName, first, lqis); + } + + return super.tryAtomicRegionLoad(serviceCallable, tableName, first, lqis); + } + }; + try { + // create HFiles for different column families + Path dir = buildBulkFiles(table, 1); + try (Table t = connection.getTable(table); + RegionLocator locator = connection.getRegionLocator(table); + Admin admin = connection.getAdmin()) { + lih.doBulkLoad(dir, admin, t, locator); + } + } finally { + util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, + HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); + } + fail("doBulkLoad should have thrown an exception"); + } + } + + /** + * Test that shows that exception thrown from the RS side will result in the expected number of + * retries set by ${@link HConstants#HBASE_CLIENT_RETRIES_NUMBER} when + * ${@link LoadIncrementalHFiles#RETRY_ON_IO_EXCEPTION} is set + */ + @Test + public void testRetryOnIOException() throws Exception { + final TableName table = TableName.valueOf(name.getMethodName()); + final AtomicInteger calls = new AtomicInteger(1); + final Connection conn = ConnectionFactory.createConnection(util.getConfiguration()); + util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); + util.getConfiguration().setBoolean(LoadIncrementalHFiles.RETRY_ON_IO_EXCEPTION, true); + final LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()) { + @Override + protected List<LoadQueueItem> tryAtomicRegionLoad( + ClientServiceCallable<byte[]> serverCallable, TableName tableName, final byte[] first, + Collection<LoadQueueItem> lqis) throws IOException { + if (calls.getAndIncrement() < util.getConfiguration().getInt( + HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) - + 1) { + ClientServiceCallable<byte[]> newServerCallable = new ClientServiceCallable<byte[]>(conn, + tableName, first, new RpcControllerFactory(util.getConfiguration()).newController(), + HConstants.PRIORITY_UNSET) { + @Override + public byte[] rpcCall() throws Exception { + throw new IOException("Error calling something on RegionServer"); + } + }; + return super.tryAtomicRegionLoad(newServerCallable, tableName, first, lqis); + } else { + return super.tryAtomicRegionLoad(serverCallable, tableName, first, lqis); + } + } + }; + setupTable(conn, table, 10); + Path dir = buildBulkFiles(table, 1); + lih.doBulkLoad(dir, conn.getAdmin(), conn.getTable(table), conn.getRegionLocator(table)); + util.getConfiguration().setBoolean(LoadIncrementalHFiles.RETRY_ON_IO_EXCEPTION, false); + + } + + private ClusterConnection getMockedConnection(final Configuration conf) + throws IOException, org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException { + ClusterConnection c = Mockito.mock(ClusterConnection.class); + Mockito.when(c.getConfiguration()).thenReturn(conf); + Mockito.doNothing().when(c).close(); + // Make it so we return a particular location when asked. + final HRegionLocation loc = new HRegionLocation(HRegionInfo.FIRST_META_REGIONINFO, + ServerName.valueOf("example.org", 1234, 0)); + Mockito.when( + c.getRegionLocation((TableName) Mockito.any(), (byte[]) Mockito.any(), Mockito.anyBoolean())) + .thenReturn(loc); + Mockito.when(c.locateRegion((TableName) Mockito.any(), (byte[]) Mockito.any())).thenReturn(loc); + ClientProtos.ClientService.BlockingInterface hri = + Mockito.mock(ClientProtos.ClientService.BlockingInterface.class); + Mockito + .when( + hri.bulkLoadHFile((RpcController) Mockito.any(), (BulkLoadHFileRequest) Mockito.any())) + .thenThrow(new ServiceException(new IOException("injecting bulk load error"))); + Mockito.when(c.getClient(Mockito.any(ServerName.class))).thenReturn(hri); + return c; + } + + /** + * This test exercises the path where there is a split after initial validation but before the + * atomic bulk load call. We cannot use presplitting to test this path, so we actually inject a + * split just before the atomic region load. + */ + @Test(timeout = 120000) + public void testSplitWhileBulkLoadPhase() throws Exception { + final TableName table = TableName.valueOf(name.getMethodName()); + try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) { + setupTable(connection, table, 10); + populateTable(connection, table, 1); + assertExpectedTable(table, ROWCOUNT, 1); + + // Now let's cause trouble. This will occur after checks and cause bulk + // files to fail when attempt to atomically import. This is recoverable. + final AtomicInteger attemptedCalls = new AtomicInteger(); + LoadIncrementalHFiles lih2 = new LoadIncrementalHFiles(util.getConfiguration()) { + @Override + protected void bulkLoadPhase(final Table htable, final Connection conn, + ExecutorService pool, Deque<LoadQueueItem> queue, + final Multimap<ByteBuffer, LoadQueueItem> regionGroups, boolean copyFile, + Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException { + int i = attemptedCalls.incrementAndGet(); + if (i == 1) { + // On first attempt force a split. + forceSplit(table); + } + super.bulkLoadPhase(htable, conn, pool, queue, regionGroups, copyFile, item2RegionMap); + } + }; + + // create HFiles for different column families + try (Table t = connection.getTable(table); + RegionLocator locator = connection.getRegionLocator(table); + Admin admin = connection.getAdmin()) { + Path bulk = buildBulkFiles(table, 2); + lih2.doBulkLoad(bulk, admin, t, locator); + } + + // check that data was loaded + // The three expected attempts are 1) failure because need to split, 2) + // load of split top 3) load of split bottom + assertEquals(attemptedCalls.get(), 3); + assertExpectedTable(table, ROWCOUNT, 2); + } + } + + /** + * This test splits a table and attempts to bulk load. The bulk import files should be split + * before atomically importing. + */ + @Test(timeout = 120000) + public void testGroupOrSplitPresplit() throws Exception { + final TableName table = TableName.valueOf(name.getMethodName()); + try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) { + setupTable(connection, table, 10); + populateTable(connection, table, 1); + assertExpectedTable(connection, table, ROWCOUNT, 1); + forceSplit(table); + + final AtomicInteger countedLqis = new AtomicInteger(); + LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()) { + @Override + protected Pair<List<LoadQueueItem>, String> groupOrSplit( + Multimap<ByteBuffer, LoadQueueItem> regionGroups, final LoadQueueItem item, + final Table htable, final Pair<byte[][], byte[][]> startEndKeys) throws IOException { + Pair<List<LoadQueueItem>, String> lqis = + super.groupOrSplit(regionGroups, item, htable, startEndKeys); + if (lqis != null && lqis.getFirst() != null) { + countedLqis.addAndGet(lqis.getFirst().size()); + } + return lqis; + } + }; + + // create HFiles for different column families + Path bulk = buildBulkFiles(table, 2); + try (Table t = connection.getTable(table); + RegionLocator locator = connection.getRegionLocator(table); + Admin admin = connection.getAdmin()) { + lih.doBulkLoad(bulk, admin, t, locator); + } + assertExpectedTable(connection, table, ROWCOUNT, 2); + assertEquals(20, countedLqis.get()); + } + } + + /** + * This test creates a table with many small regions. The bulk load files would be splitted + * multiple times before all of them can be loaded successfully. + */ + @Test(timeout = 120000) + public void testSplitTmpFileCleanUp() throws Exception { + final TableName table = TableName.valueOf(name.getMethodName()); + byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("row_00000010"), + Bytes.toBytes("row_00000020"), Bytes.toBytes("row_00000030"), Bytes.toBytes("row_00000040"), + Bytes.toBytes("row_00000050") }; + try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) { + setupTableWithSplitkeys(table, 10, SPLIT_KEYS); + + LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()); + + // create HFiles + Path bulk = buildBulkFiles(table, 2); + try (Table t = connection.getTable(table); + RegionLocator locator = connection.getRegionLocator(table); + Admin admin = connection.getAdmin()) { + lih.doBulkLoad(bulk, admin, t, locator); + } + // family path + Path tmpPath = new Path(bulk, family(0)); + // TMP_DIR under family path + tmpPath = new Path(tmpPath, LoadIncrementalHFiles.TMP_DIR); + FileSystem fs = bulk.getFileSystem(util.getConfiguration()); + // HFiles have been splitted, there is TMP_DIR + assertTrue(fs.exists(tmpPath)); + // TMP_DIR should have been cleaned-up + assertNull(LoadIncrementalHFiles.TMP_DIR + " should be empty.", + FSUtils.listStatus(fs, tmpPath)); + assertExpectedTable(connection, table, ROWCOUNT, 2); + } + } + + /** + * This simulates an remote exception which should cause LIHF to exit with an exception. + */ + @Test(expected = IOException.class, timeout = 120000) + public void testGroupOrSplitFailure() throws Exception { + final TableName tableName = TableName.valueOf(name.getMethodName()); + try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) { + setupTable(connection, tableName, 10); + + LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()) { + int i = 0; + + @Override + protected Pair<List<LoadQueueItem>, String> groupOrSplit( + Multimap<ByteBuffer, LoadQueueItem> regionGroups, final LoadQueueItem item, + final Table table, final Pair<byte[][], byte[][]> startEndKeys) throws IOException { + i++; + + if (i == 5) { + throw new IOException("failure"); + } + return super.groupOrSplit(regionGroups, item, table, startEndKeys); + } + }; + + // create HFiles for different column families + Path dir = buildBulkFiles(tableName, 1); + try (Table t = connection.getTable(tableName); + RegionLocator locator = connection.getRegionLocator(tableName); + Admin admin = connection.getAdmin()) { + lih.doBulkLoad(dir, admin, t, locator); + } + } + + fail("doBulkLoad should have thrown an exception"); + } + + @Test(timeout = 120000) + public void testGroupOrSplitWhenRegionHoleExistsInMeta() throws Exception { + final TableName tableName = TableName.valueOf(name.getMethodName()); + byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("row_00000100") }; + // Share connection. We were failing to find the table with our new reverse scan because it + // looks for first region, not any region -- that is how it works now. The below removes first + // region in test. Was reliant on the Connection caching having first region. + Connection connection = ConnectionFactory.createConnection(util.getConfiguration()); + Table table = connection.getTable(tableName); + + setupTableWithSplitkeys(tableName, 10, SPLIT_KEYS); + Path dir = buildBulkFiles(tableName, 2); + + final AtomicInteger countedLqis = new AtomicInteger(); + LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration()) { + + @Override + protected Pair<List<LoadQueueItem>, String> groupOrSplit( + Multimap<ByteBuffer, LoadQueueItem> regionGroups, final LoadQueueItem item, + final Table htable, final Pair<byte[][], byte[][]> startEndKeys) throws IOException { + Pair<List<LoadQueueItem>, String> lqis = + super.groupOrSplit(regionGroups, item, htable, startEndKeys); + if (lqis != null && lqis.getFirst() != null) { + countedLqis.addAndGet(lqis.getFirst().size()); + } + return lqis; + } + }; + + // do bulkload when there is no region hole in hbase:meta. + try (Table t = connection.getTable(tableName); + RegionLocator locator = connection.getRegionLocator(tableName); + Admin admin = connection.getAdmin()) { + loader.doBulkLoad(dir, admin, t, locator); + } catch (Exception e) { + LOG.error("exeception=", e); + } + // check if all the data are loaded into the table. + this.assertExpectedTable(tableName, ROWCOUNT, 2); + + dir = buildBulkFiles(tableName, 3); + + // Mess it up by leaving a hole in the hbase:meta + List<HRegionInfo> regionInfos = MetaTableAccessor.getTableRegions(connection, tableName); + for (HRegionInfo regionInfo : regionInfos) { + if (Bytes.equals(regionInfo.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) { + MetaTableAccessor.deleteRegion(connection, regionInfo); + break; + } + } + + try (Table t = connection.getTable(tableName); + RegionLocator locator = connection.getRegionLocator(tableName); + Admin admin = connection.getAdmin()) { + loader.doBulkLoad(dir, admin, t, locator); + } catch (Exception e) { + LOG.error("exception=", e); + assertTrue("IOException expected", e instanceof IOException); + } + + table.close(); + + // Make sure at least the one region that still exists can be found. + regionInfos = MetaTableAccessor.getTableRegions(connection, tableName); + assertTrue(regionInfos.size() >= 1); + + this.assertExpectedTable(connection, tableName, ROWCOUNT, 2); + connection.close(); + } + + /** + * Checks that all columns have the expected value and that there is the expected number of rows. + * @throws IOException + */ + void assertExpectedTable(final Connection connection, TableName table, int count, int value) + throws IOException { + List<TableDescriptor> htds = util.getAdmin().listTableDescriptors(table.getNameAsString()); + assertEquals(htds.size(), 1); + try (Table t = connection.getTable(table); ResultScanner sr = t.getScanner(new Scan())) { + int i = 0; + for (Result r; (r = sr.next()) != null;) { + r.getNoVersionMap().values().stream().flatMap(m -> m.values().stream()) + .forEach(v -> assertArrayEquals(value(value), v)); + i++; + } + assertEquals(count, i); + } catch (IOException e) { + fail("Failed due to exception"); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestSecureLoadIncrementalHFiles.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestSecureLoadIncrementalHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestSecureLoadIncrementalHFiles.java new file mode 100644 index 0000000..3d4f4c6 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestSecureLoadIncrementalHFiles.java @@ -0,0 +1,66 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.tool; + +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags; +import org.apache.hadoop.hbase.security.HadoopSecurityEnabledUserProviderForTesting; +import org.apache.hadoop.hbase.security.UserProvider; +import org.apache.hadoop.hbase.security.access.AccessControlLists; +import org.apache.hadoop.hbase.security.access.SecureTestUtil; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.MiscTests; +import org.junit.BeforeClass; +import org.junit.experimental.categories.Category; + +/** + * Reruns TestLoadIncrementalHFiles using LoadIncrementalHFiles in secure mode. This suite is unable + * to verify the security handoff/turnover as miniCluster is running as system user thus has root + * privileges and delegation tokens don't seem to work on miniDFS. + * <p> + * Thus SecureBulkload can only be completely verified by running integration tests against a secure + * cluster. This suite is still invaluable as it verifies the other mechanisms that need to be + * supported as part of a LoadIncrementalFiles call. + */ +@Category({ MiscTests.class, LargeTests.class }) +public class TestSecureLoadIncrementalHFiles extends TestLoadIncrementalHFiles { + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + // set the always on security provider + UserProvider.setUserProviderForTesting(util.getConfiguration(), + HadoopSecurityEnabledUserProviderForTesting.class); + // setup configuration + SecureTestUtil.enableSecurity(util.getConfiguration()); + util.getConfiguration().setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY, + MAX_FILES_PER_REGION_PER_FAMILY); + // change default behavior so that tag values are returned with normal rpcs + util.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY, + KeyValueCodecWithTags.class.getCanonicalName()); + + util.startMiniCluster(); + + // Wait for the ACL table to become available + util.waitTableEnabled(AccessControlLists.ACL_TABLE_NAME); + + setupNamespace(); + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestSecureLoadIncrementalHFilesSplitRecovery.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestSecureLoadIncrementalHFilesSplitRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestSecureLoadIncrementalHFilesSplitRecovery.java new file mode 100644 index 0000000..58fea9d --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestSecureLoadIncrementalHFilesSplitRecovery.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.tool; + +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.security.HadoopSecurityEnabledUserProviderForTesting; +import org.apache.hadoop.hbase.security.UserProvider; +import org.apache.hadoop.hbase.security.access.AccessControlLists; +import org.apache.hadoop.hbase.security.access.SecureTestUtil; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.MiscTests; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Reruns TestSecureLoadIncrementalHFilesSplitRecovery using LoadIncrementalHFiles in secure mode. + * This suite is unable to verify the security handoff/turnove as miniCluster is running as system + * user thus has root privileges and delegation tokens don't seem to work on miniDFS. + * <p> + * Thus SecureBulkload can only be completely verified by running integration tests against a secure + * cluster. This suite is still invaluable as it verifies the other mechanisms that need to be + * supported as part of a LoadIncrementalFiles call. + */ +@Category({ MiscTests.class, LargeTests.class }) +public class TestSecureLoadIncrementalHFilesSplitRecovery + extends TestLoadIncrementalHFilesSplitRecovery { + + // This "overrides" the parent static method + // make sure they are in sync + @BeforeClass + public static void setupCluster() throws Exception { + util = new HBaseTestingUtility(); + // set the always on security provider + UserProvider.setUserProviderForTesting(util.getConfiguration(), + HadoopSecurityEnabledUserProviderForTesting.class); + // setup configuration + SecureTestUtil.enableSecurity(util.getConfiguration()); + + util.startMiniCluster(); + + // Wait for the ACL table to become available + util.waitTableEnabled(AccessControlLists.ACL_TABLE_NAME); + } + + // Disabling this test as it does not work in secure mode + @Test(timeout = 180000) + @Override + public void testBulkLoadPhaseFailure() { + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-spark-it/src/test/java/org/apache/hadoop/hbase/spark/IntegrationTestSparkBulkLoad.java ---------------------------------------------------------------------- diff --git a/hbase-spark-it/src/test/java/org/apache/hadoop/hbase/spark/IntegrationTestSparkBulkLoad.java b/hbase-spark-it/src/test/java/org/apache/hadoop/hbase/spark/IntegrationTestSparkBulkLoad.java index f45c0b9..33fbb68 100644 --- a/hbase-spark-it/src/test/java/org/apache/hadoop/hbase/spark/IntegrationTestSparkBulkLoad.java +++ b/hbase-spark-it/src/test/java/org/apache/hadoop/hbase/spark/IntegrationTestSparkBulkLoad.java @@ -48,7 +48,7 @@ import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.IntegrationTestBulkLoad; -import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; +import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Pair; http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkLoadExample.java ---------------------------------------------------------------------- diff --git a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkLoadExample.java b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkLoadExample.java index 040546d..2adba32 100644 --- a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkLoadExample.java +++ b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkLoadExample.java @@ -41,7 +41,7 @@ import java.util.List; * path/to/hbase-spark.jar {path/to/output/HFiles} * * This example will output put hfiles in {path/to/output/HFiles}, and user can run - * 'hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles' to load the HFiles into table to verify this example. + * 'hbase org.apache.hadoop.hbase.tool.LoadIncrementalHFiles' to load the HFiles into table to verify this example. */ final public class JavaHBaseBulkLoadExample { private JavaHBaseBulkLoadExample() {} http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java ---------------------------------------------------------------------- diff --git a/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java b/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java index bfacbe8..e383b5e 100644 --- a/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java +++ b/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java @@ -43,7 +43,7 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; +import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles; import org.apache.hadoop.hbase.spark.example.hbasecontext.JavaHBaseBulkDeleteExample; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MiscTests; http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala ---------------------------------------------------------------------- diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala index d2b707e..a427327 100644 --- a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala +++ b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala @@ -20,7 +20,7 @@ package org.apache.hadoop.hbase.spark import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.hbase.client.{Get, ConnectionFactory} import org.apache.hadoop.hbase.io.hfile.{CacheConfig, HFile} -import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles +import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles import org.apache.hadoop.hbase.{HConstants, CellUtil, HBaseTestingUtility, TableName} import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._ http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/src/main/asciidoc/_chapters/ops_mgt.adoc ---------------------------------------------------------------------- diff --git a/src/main/asciidoc/_chapters/ops_mgt.adoc b/src/main/asciidoc/_chapters/ops_mgt.adoc index f96cd6c..6f7f9e0 100644 --- a/src/main/asciidoc/_chapters/ops_mgt.adoc +++ b/src/main/asciidoc/_chapters/ops_mgt.adoc @@ -577,7 +577,7 @@ There are two ways to invoke this utility, with explicit classname and via the d .Explicit Classname ---- -$ bin/hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles <hdfs://storefileoutput> <tablename> +$ bin/hbase org.apache.hadoop.hbase.tool.LoadIncrementalHFiles <hdfs://storefileoutput> <tablename> ---- .Driver