This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
     new 2321d965b79 HBASE-29383 Some tests in TestHFileOutputFormat2 fails in ShutdownHook.suppressHdfsShutdownHook (#7109)
2321d965b79 is described below
commit 2321d965b790dadac38b9b30183ef922a9f15d80
Author: Duo Zhang <[email protected]>
AuthorDate: Wed Jul 2 22:15:36 2025 +0800
    HBASE-29383 Some tests in TestHFileOutputFormat2 fails in ShutdownHook.suppressHdfsShutdownHook (#7109)
    Split TestHFileOutputFormat2 to make it run faster and prevent the tests from affecting each other
Signed-off-by: Peng Lu <[email protected]>
---
.../mapreduce/HFileOutputFormat2TestBase.java | 281 +++++++++
.../hbase/mapreduce/MRIncrementalLoadTestBase.java | 252 ++++++++
.../hbase/mapreduce/TestConfigurePartitioner.java | 121 ++++
.../hbase/mapreduce/TestHFileOutputFormat2.java | 655 ++-------------------
.../hbase/mapreduce/TestMRIncrementalLoad.java | 55 ++
.../TestMRIncrementalLoadWithLocality.java | 51 ++
6 files changed, 823 insertions(+), 592 deletions(-)
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2TestBase.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2TestBase.java
new file mode 100644
index 00000000000..ac9810a8825
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2TestBase.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Stream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.TestHRegionFileSystem;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+public abstract class HFileOutputFormat2TestBase {
+
+ protected static final int ROWSPERSPLIT = 1024;
+ protected static final int DEFAULT_VALUE_LENGTH = 1000;
+
+ public static final byte[] FAMILY_NAME = TestHRegionFileSystem.FAMILY_NAME;
+  protected static final byte[][] FAMILIES =
+    { Bytes.add(FAMILY_NAME, Bytes.toBytes("-A")), Bytes.add(FAMILY_NAME, Bytes.toBytes("-B")) };
+  protected static final TableName[] TABLE_NAMES = Stream
+    .of("TestTable", "TestTable2", "TestTable3").map(TableName::valueOf).toArray(TableName[]::new);
+
+ protected static HBaseTestingUtil UTIL = new HBaseTestingUtil();
+
+ /**
+ * Simple mapper that makes KeyValue output.
+ */
+ protected static class RandomKVGeneratingMapper
+ extends Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Cell> {
+
+ private int keyLength;
+ protected static final int KEYLEN_DEFAULT = 10;
+ protected static final String KEYLEN_CONF = "randomkv.key.length";
+
+ private int valLength;
+ private static final int VALLEN_DEFAULT = 10;
+ private static final String VALLEN_CONF = "randomkv.val.length";
+ private static final byte[] QUALIFIER = Bytes.toBytes("data");
+ private boolean multiTableMapper = false;
+ private TableName[] tables = null;
+
+ @Override
+    protected void setup(Context context) throws IOException, InterruptedException {
+ super.setup(context);
+
+ Configuration conf = context.getConfiguration();
+ keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
+ valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
+      multiTableMapper =
+        conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
+ if (multiTableMapper) {
+ tables = TABLE_NAMES;
+ } else {
+ tables = new TableName[] { TABLE_NAMES[0] };
+ }
+ }
+
+ @Override
+    protected void map(NullWritable n1, NullWritable n2,
+      Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Cell>.Context context)
+ throws java.io.IOException, InterruptedException {
+
+ byte keyBytes[] = new byte[keyLength];
+ byte valBytes[] = new byte[valLength];
+
+ int taskId = context.getTaskAttemptID().getTaskID().getId();
+      assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";
+ byte[] key;
+ for (int j = 0; j < tables.length; ++j) {
+ for (int i = 0; i < ROWSPERSPLIT; i++) {
+ Bytes.random(keyBytes);
+ // Ensure that unique tasks generate unique keys
+ keyBytes[keyLength - 1] = (byte) (taskId & 0xFF);
+ Bytes.random(valBytes);
+ key = keyBytes;
+ if (multiTableMapper) {
+            key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes);
+ }
+
+ for (byte[] family : FAMILIES) {
+ Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes);
+ context.write(new ImmutableBytesWritable(key), kv);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Simple mapper that makes Put output.
+ */
+ protected static class RandomPutGeneratingMapper
+ extends Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Put> {
+
+ private int keyLength;
+ protected static final int KEYLEN_DEFAULT = 10;
+ protected static final String KEYLEN_CONF = "randomkv.key.length";
+
+ private int valLength;
+ protected static final int VALLEN_DEFAULT = 10;
+ protected static final String VALLEN_CONF = "randomkv.val.length";
+ protected static final byte[] QUALIFIER = Bytes.toBytes("data");
+ private boolean multiTableMapper = false;
+ private TableName[] tables = null;
+
+ @Override
+    protected void setup(Context context) throws IOException, InterruptedException {
+ super.setup(context);
+
+ Configuration conf = context.getConfiguration();
+ keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
+ valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
+      multiTableMapper =
+        conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
+ if (multiTableMapper) {
+ tables = TABLE_NAMES;
+ } else {
+ tables = new TableName[] { TABLE_NAMES[0] };
+ }
+ }
+
+ @Override
+    protected void map(NullWritable n1, NullWritable n2,
+      Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Put>.Context context)
+ throws java.io.IOException, InterruptedException {
+
+ byte keyBytes[] = new byte[keyLength];
+ byte valBytes[] = new byte[valLength];
+
+ int taskId = context.getTaskAttemptID().getTaskID().getId();
+      assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";
+
+ byte[] key;
+ for (int j = 0; j < tables.length; ++j) {
+ for (int i = 0; i < ROWSPERSPLIT; i++) {
+ Bytes.random(keyBytes);
+ // Ensure that unique tasks generate unique keys
+ keyBytes[keyLength - 1] = (byte) (taskId & 0xFF);
+ Bytes.random(valBytes);
+ key = keyBytes;
+ if (multiTableMapper) {
+            key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes);
+ }
+
+ for (byte[] family : FAMILIES) {
+ Put p = new Put(keyBytes);
+ p.addColumn(family, QUALIFIER, valBytes);
+ // set TTL to very low so that the scan does not return any value
+ p.setTTL(1l);
+ context.write(new ImmutableBytesWritable(key), p);
+ }
+ }
+ }
+ }
+ }
+
+  protected static void setupRandomGeneratorMapper(Job job, boolean putSortReducer) {
+ if (putSortReducer) {
+ job.setInputFormatClass(NMapInputFormat.class);
+ job.setMapperClass(RandomPutGeneratingMapper.class);
+ job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+ job.setMapOutputValueClass(Put.class);
+ } else {
+ job.setInputFormatClass(NMapInputFormat.class);
+ job.setMapperClass(RandomKVGeneratingMapper.class);
+ job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+ job.setMapOutputValueClass(KeyValue.class);
+ }
+ }
+
+ protected static byte[][] generateRandomStartKeys(int numKeys) {
+ Random random = ThreadLocalRandom.current();
+ byte[][] ret = new byte[numKeys][];
+ // first region start key is always empty
+ ret[0] = HConstants.EMPTY_BYTE_ARRAY;
+ for (int i = 1; i < numKeys; i++) {
+ ret[i] = generateData(random, DEFAULT_VALUE_LENGTH);
+ }
+ return ret;
+ }
+
+ /**
+   * This method takes some time and is done inline uploading data. For example, doing the mapfile
+ * test, generation of the key and value consumes about 30% of CPU time.
+ * @return Generated random value to insert into a table cell.
+ */
+ protected static byte[] generateData(final Random r, int length) {
+ byte[] b = new byte[length];
+ int i;
+
+ for (i = 0; i < (length - 8); i += 8) {
+ b[i] = (byte) (65 + r.nextInt(26));
+ b[i + 1] = b[i];
+ b[i + 2] = b[i];
+ b[i + 3] = b[i];
+ b[i + 4] = b[i];
+ b[i + 5] = b[i];
+ b[i + 6] = b[i];
+ b[i + 7] = b[i];
+ }
+
+ byte a = (byte) (65 + r.nextInt(26));
+ for (; i < length; i++) {
+ b[i] = a;
+ }
+ return b;
+ }
+
+ protected static byte[][] generateRandomSplitKeys(int numKeys) {
+ Random random = ThreadLocalRandom.current();
+ byte[][] ret = new byte[numKeys][];
+ for (int i = 0; i < numKeys; i++) {
+ ret[i] = generateData(random, DEFAULT_VALUE_LENGTH);
+ }
+ return ret;
+ }
+
+  protected static void runIncrementalPELoad(Configuration conf,
+    List<HFileOutputFormat2.TableInfo> tableInfo, Path outDir, boolean putSortReducer)
+ throws IOException, InterruptedException, ClassNotFoundException {
+ Job job = Job.getInstance(conf, "testLocalMRIncrementalLoad");
+    job.setWorkingDirectory(UTIL.getDataTestDirOnTestFS("runIncrementalPELoad"));
+    job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
+      MutationSerialization.class.getName(), ResultSerialization.class.getName(),
+      CellSerialization.class.getName());
+ setupRandomGeneratorMapper(job, putSortReducer);
+ if (tableInfo.size() > 1) {
+ MultiTableHFileOutputFormat.configureIncrementalLoad(job, tableInfo);
+ int sum = 0;
+ for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) {
+        sum += tableInfoSingle.getRegionLocator().getAllRegionLocations().size();
+ }
+ assertEquals(sum, job.getNumReduceTasks());
+ } else {
+ RegionLocator regionLocator = tableInfo.get(0).getRegionLocator();
+      HFileOutputFormat2.configureIncrementalLoad(job, tableInfo.get(0).getTableDescriptor(),
+        regionLocator);
+      assertEquals(regionLocator.getAllRegionLocations().size(), job.getNumReduceTasks());
+ }
+
+ FileOutputFormat.setOutputPath(job, outDir);
+
+ assertFalse(UTIL.getTestFileSystem().exists(outDir));
+
+ assertTrue(job.waitForCompletion(true));
+ }
+}
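A minimal sketch of how a concrete test can drive the helpers above end to end; the class and test names are illustrative and not part of this commit, and a running mini cluster (e.g. via UTIL.startMiniCluster()) is assumed:

package org.apache.hadoop.hbase.mapreduce;

import java.util.Collections;
import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.junit.Test;

public class ExampleIncrementalLoadTest extends HFileOutputFormat2TestBase {

  @Test
  public void exampleSingleTableLoad() throws Exception {
    TableName tn = TABLE_NAMES[0];
    // Pre-split the table so runIncrementalPELoad sees several regions and
    // therefore schedules several reducers.
    try (Table table = UTIL.createTable(tn, FAMILIES, generateRandomSplitKeys(4));
      RegionLocator locator = UTIL.getConnection().getRegionLocator(tn)) {
      List<HFileOutputFormat2.TableInfo> infos = Collections
        .singletonList(new HFileOutputFormat2.TableInfo(table.getDescriptor(), locator));
      // The output directory must not exist yet; runIncrementalPELoad asserts that.
      Path out = UTIL.getDataTestDirOnTestFS("exampleSingleTableLoad");
      runIncrementalPELoad(UTIL.getConfiguration(), infos, out, false);
      // Atomically move the generated HFiles into the live table.
      BulkLoadHFiles.create(UTIL.getConfiguration()).bulkLoad(tn, out);
    }
  }
}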
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/MRIncrementalLoadTestBase.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/MRIncrementalLoadTestBase.java
new file mode 100644
index 00000000000..ad2f841c19d
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/MRIncrementalLoadTestBase.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HDFSBlocksDistribution;
+import org.apache.hadoop.hbase.StartTestingClusterOption;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runners.Parameterized.Parameter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MRIncrementalLoadTestBase extends HFileOutputFormat2TestBase {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MRIncrementalLoadTestBase.class);
+
+ private static boolean SHOULD_KEEP_LOCALITY;
+
+ private static String[] HOSTNAMES;
+
+ @Parameter(0)
+ public boolean shouldChangeRegions;
+
+ @Parameter(1)
+ public boolean putSortReducer;
+
+ @Parameter(2)
+ public List<String> tableStr;
+
+ private Map<String, Table> allTables;
+
+ private List<HFileOutputFormat2.TableInfo> tableInfo;
+
+ private Path testDir;
+
+  protected static void setupCluster(boolean shouldKeepLocality) throws Exception {
+ SHOULD_KEEP_LOCALITY = shouldKeepLocality;
+ Configuration conf = UTIL.getConfiguration();
+    conf.setBoolean(MultiTableHFileOutputFormat.LOCALITY_SENSITIVE_CONF_KEY, shouldKeepLocality);
+    // We should change host count higher than hdfs replica count when MiniHBaseCluster supports
+ // explicit hostnames parameter just like MiniDFSCluster does.
+ int hostCount = shouldKeepLocality ? 3 : 1;
+
+ HOSTNAMES = new String[hostCount];
+ for (int i = 0; i < hostCount; ++i) {
+ HOSTNAMES[i] = "datanode_" + i;
+ }
+ StartTestingClusterOption option = StartTestingClusterOption.builder()
+ .numRegionServers(hostCount).dataNodeHosts(HOSTNAMES).build();
+ UTIL.getConfiguration().unset(HConstants.TEMPORARY_FS_DIRECTORY_KEY);
+ UTIL.startMiniCluster(option);
+
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws IOException {
+ UTIL.shutdownMiniCluster();
+ }
+
+ @Before
+ public void setUp() throws IOException {
+ int regionNum = SHOULD_KEEP_LOCALITY ? 20 : 5;
+ allTables = new HashMap<>(tableStr.size());
+ tableInfo = new ArrayList<>(tableStr.size());
+ for (String tableStrSingle : tableStr) {
+ byte[][] splitKeys = generateRandomSplitKeys(regionNum - 1);
+ TableName tableName = TableName.valueOf(tableStrSingle);
+ Table table = UTIL.createTable(tableName, FAMILIES, splitKeys);
+
+ RegionLocator r = UTIL.getConnection().getRegionLocator(tableName);
+      assertEquals("Should start with empty table", 0, HBaseTestingUtil.countRows(table));
+ int numRegions = r.getStartKeys().length;
+      assertEquals("Should make " + regionNum + " regions", numRegions, regionNum);
+
+ allTables.put(tableStrSingle, table);
+      tableInfo.add(new HFileOutputFormat2.TableInfo(table.getDescriptor(), r));
+ }
+ testDir = UTIL.getDataTestDirOnTestFS(tableStr.get(0));
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) {
+ tableInfoSingle.getRegionLocator().close();
+ }
+ tableInfo.clear();
+ allTables.clear();
+ for (String tableStrSingle : tableStr) {
+ UTIL.deleteTable(TableName.valueOf(tableStrSingle));
+ }
+ }
+
+ @Test
+ public void doIncrementalLoadTest() throws Exception {
+ boolean writeMultipleTables = tableStr.size() > 1;
+ // Generate the bulk load files
+    runIncrementalPELoad(UTIL.getConfiguration(), tableInfo, testDir, putSortReducer);
+ if (writeMultipleTables) {
+ testDir = new Path(testDir, "default");
+ }
+
+ for (Table tableSingle : allTables.values()) {
+ // This doesn't write into the table, just makes files
+ assertEquals("HFOF should not touch actual table", 0,
+ HBaseTestingUtil.countRows(tableSingle));
+ }
+ int numTableDirs = 0;
+    FileStatus[] fss = testDir.getFileSystem(UTIL.getConfiguration()).listStatus(testDir);
+ for (FileStatus tf : fss) {
+ Path tablePath = testDir;
+ if (writeMultipleTables) {
+ if (allTables.containsKey(tf.getPath().getName())) {
+ ++numTableDirs;
+ tablePath = tf.getPath();
+ } else {
+ continue;
+ }
+ }
+
+ // Make sure that a directory was created for every CF
+ int dir = 0;
+      fss = tablePath.getFileSystem(UTIL.getConfiguration()).listStatus(tablePath);
+ for (FileStatus f : fss) {
+ for (byte[] family : FAMILIES) {
+ if (Bytes.toString(family).equals(f.getPath().getName())) {
+ ++dir;
+ }
+ }
+ }
+ assertEquals("Column family not found in FS.", FAMILIES.length, dir);
+ }
+ if (writeMultipleTables) {
+      assertEquals("Dir for all input tables not created", numTableDirs, allTables.size());
+ }
+
+ Admin admin = UTIL.getAdmin();
+
+ // handle the split case
+ if (shouldChangeRegions) {
+ Table chosenTable = allTables.values().iterator().next();
+ // Choose a semi-random table if multiple tables are available
+      LOG.info("Changing regions in table " + chosenTable.getName().getNameAsString());
+ admin.disableTable(chosenTable.getName());
+ UTIL.waitUntilNoRegionsInTransition();
+
+ UTIL.deleteTable(chosenTable.getName());
+ byte[][] newSplitKeys = generateRandomSplitKeys(14);
+ UTIL.createTable(chosenTable.getName(), FAMILIES, newSplitKeys);
+ UTIL.waitTableAvailable(chosenTable.getName());
+ }
+
+ // Perform the actual load
+ for (HFileOutputFormat2.TableInfo singleTableInfo : tableInfo) {
+ Path tableDir = testDir;
+      String tableNameStr = singleTableInfo.getTableDescriptor().getTableName().getNameAsString();
+      LOG.info("Running BulkLoadHFiles on table " + tableNameStr);
+ if (writeMultipleTables) {
+ tableDir = new Path(testDir, tableNameStr);
+ }
+ Table currentTable = allTables.get(tableNameStr);
+ TableName currentTableName = currentTable.getName();
+      BulkLoadHFiles.create(UTIL.getConfiguration()).bulkLoad(currentTableName, tableDir);
+
+ // Ensure data shows up
+ int expectedRows = 0;
+ if (putSortReducer) {
+ // no rows should be extracted
+        assertEquals("BulkLoadHFiles should put expected data in table", expectedRows,
+          HBaseTestingUtil.countRows(currentTable));
+ } else {
+        expectedRows = NMapInputFormat.getNumMapTasks(UTIL.getConfiguration()) * ROWSPERSPLIT;
+        assertEquals("BulkLoadHFiles should put expected data in table", expectedRows,
+          HBaseTestingUtil.countRows(currentTable));
+ Scan scan = new Scan();
+ ResultScanner results = currentTable.getScanner(scan);
+ for (Result res : results) {
+ assertEquals(FAMILIES.length, res.rawCells().length);
+ Cell first = res.rawCells()[0];
+ for (Cell kv : res.rawCells()) {
+ assertTrue(CellUtil.matchingRows(first, kv));
+          assertTrue(Bytes.equals(CellUtil.cloneValue(first), CellUtil.cloneValue(kv)));
+ }
+ }
+ results.close();
+ }
+ String tableDigestBefore = UTIL.checksumRows(currentTable);
+ // Check region locality
+ HDFSBlocksDistribution hbd = new HDFSBlocksDistribution();
+      for (HRegion region : UTIL.getHBaseCluster().getRegions(currentTableName)) {
+ hbd.add(region.getHDFSBlocksDistribution());
+ }
+ for (String hostname : HOSTNAMES) {
+ float locality = hbd.getBlockLocalityIndex(hostname);
+ LOG.info("locality of [" + hostname + "]: " + locality);
+ assertEquals(100, (int) (locality * 100));
+ }
+
+ // Cause regions to reopen
+ admin.disableTable(currentTableName);
+ while (!admin.isTableDisabled(currentTableName)) {
+ Thread.sleep(200);
+ LOG.info("Waiting for table to disable");
+ }
+ admin.enableTable(currentTableName);
+ UTIL.waitTableAvailable(currentTableName);
+      assertEquals("Data should remain after reopening of regions", tableDigestBefore,
+        UTIL.checksumRows(currentTable));
+ }
+ }
+}
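MRIncrementalLoadTestBase declares the three @Parameter fields but no @Parameters provider, so the parameter matrix has to come from the concrete subclasses (TestMRIncrementalLoad and TestMRIncrementalLoadWithLocality in the diffstat, whose bodies are not shown in this mail). A hedged sketch of what such a subclass plausibly looks like; the class name and the parameter values below are illustrative, not the commit's actual ones:

package org.apache.hadoop.hbase.mapreduce;

import java.util.Arrays;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

@RunWith(Parameterized.class)
@Category({ VerySlowMapReduceTests.class, LargeTests.class })
public class ExampleMRIncrementalLoadTest extends MRIncrementalLoadTestBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(ExampleMRIncrementalLoadTest.class);

  // Each Object[] feeds the three @Parameter fields of the base class:
  // { shouldChangeRegions, putSortReducer, tableStr }.
  @Parameters(name = "{index}: changeRegions={0}, putSortReducer={1}, tables={2}")
  public static Iterable<Object[]> data() {
    return Arrays.asList(new Object[] { false, false, Arrays.asList("TestTable") },
      new Object[] { true, false, Arrays.asList("TestTable") },
      new Object[] { false, true, Arrays.asList("TestTable", "TestTable2") });
  }

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    // false = no locality checks; a locality-sensitive subclass would pass true.
    setupCluster(false);
  }
}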
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestConfigurePartitioner.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestConfigurePartitioner.java
new file mode 100644
index 00000000000..49c08a463ab
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestConfigurePartitioner.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.testclassification.MapReduceTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({ MapReduceTests.class, MediumTests.class })
+public class TestConfigurePartitioner {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestConfigurePartitioner.class);
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestConfigurePartitioner.class);
+
+ private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
+
+ @Before
+ public void setUp() throws Exception {
+ UTIL.startMiniDFSCluster(1);
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ UTIL.shutdownMiniDFSCluster();
+ }
+
+ @Test
+ public void testConfigurePartitioner() throws Exception {
+ Configuration conf = UTIL.getConfiguration();
+ // Create a user who is not the current user
+ String fooUserName = "foo1234";
+ String fooGroupName = "group1";
+ UserGroupInformation ugi =
+      UserGroupInformation.createUserForTesting(fooUserName, new String[] { fooGroupName });
+ // Get user's home directory
+ Path fooHomeDirectory = ugi.doAs(new PrivilegedAction<Path>() {
+ @Override
+ public Path run() {
+ try (FileSystem fs = FileSystem.get(conf)) {
+ return fs.makeQualified(fs.getHomeDirectory());
+ } catch (IOException ioe) {
+ LOG.error("Failed to get foo's home directory", ioe);
+ }
+ return null;
+ }
+ });
+ // create the home directory and chown
+ FileSystem fs = FileSystem.get(conf);
+ fs.mkdirs(fooHomeDirectory);
+ fs.setOwner(fooHomeDirectory, fooUserName, fooGroupName);
+
+ Job job = Mockito.mock(Job.class);
+ Mockito.doReturn(conf).when(job).getConfiguration();
+ ImmutableBytesWritable writable = new ImmutableBytesWritable();
+    List<ImmutableBytesWritable> splitPoints = new ArrayList<ImmutableBytesWritable>();
+ splitPoints.add(writable);
+
+ ugi.doAs(new PrivilegedAction<Void>() {
+ @Override
+ public Void run() {
+ try {
+ HFileOutputFormat2.configurePartitioner(job, splitPoints, false);
+ } catch (IOException ioe) {
+ LOG.error("Failed to configure partitioner", ioe);
+ }
+ return null;
+ }
+ });
+ // verify that the job uses TotalOrderPartitioner
+ verify(job).setPartitionerClass(TotalOrderPartitioner.class);
+ // verify that TotalOrderPartitioner.setPartitionFile() is called.
+    String partitionPathString = conf.get("mapreduce.totalorderpartitioner.path");
+ assertNotNull(partitionPathString);
+    // Make sure the partition file is in foo1234's home directory, and that
+ // the file exists.
+ assertTrue(partitionPathString.startsWith(fooHomeDirectory.toString()));
+ assertTrue(fs.exists(new Path(partitionPathString)));
+ }
+}
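For context on the final assertions: configurePartitioner must place the split-point file somewhere the submitting user can read, because each reduce task later resolves it through the same configuration key. A minimal sketch of that consumer side, assuming only the public Hadoop TotalOrderPartitioner accessors:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;

final class PartitionFileLookup {
  // Resolves the partition file the way TotalOrderPartitioner does at task
  // start: getPartitionFile() reads "mapreduce.totalorderpartitioner.path",
  // the key asserted in the test above.
  static Path resolve(Configuration conf) {
    return new Path(TotalOrderPartitioner.getPartitionFile(conf));
  }
}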
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
index fb7dde1cc69..37096e408a7 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
@@ -24,27 +24,21 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import static org.mockito.Mockito.verify;
import java.io.IOException;
import java.lang.reflect.Field;
-import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -60,7 +54,6 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HadoopShims;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
@@ -82,9 +75,6 @@ import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.Hbck;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
@@ -100,7 +90,6 @@ import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
-import org.apache.hadoop.hbase.regionserver.TestHRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.LargeTests;
@@ -115,15 +104,10 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
-import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
@@ -138,171 +122,14 @@ import org.slf4j.LoggerFactory;
* values.
*/
@Category({ VerySlowMapReduceTests.class, LargeTests.class })
-public class TestHFileOutputFormat2 {
+public class TestHFileOutputFormat2 extends HFileOutputFormat2TestBase {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestHFileOutputFormat2.class);
- private final static int ROWSPERSPLIT = 1024;
- private static final int DEFAULT_VALUE_LENGTH = 1000;
-
- public static final byte[] FAMILY_NAME = TestHRegionFileSystem.FAMILY_NAME;
-  private static final byte[][] FAMILIES =
-    { Bytes.add(FAMILY_NAME, Bytes.toBytes("-A")), Bytes.add(FAMILY_NAME, Bytes.toBytes("-B")) };
-  private static final TableName[] TABLE_NAMES = Stream.of("TestTable", "TestTable2", "TestTable3")
-    .map(TableName::valueOf).toArray(TableName[]::new);
-
- private HBaseTestingUtil util = new HBaseTestingUtil();
-
private static final Logger LOG = LoggerFactory.getLogger(TestHFileOutputFormat2.class);
- /**
- * Simple mapper that makes KeyValue output.
- */
- static class RandomKVGeneratingMapper
- extends Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Cell> {
-
- private int keyLength;
- private static final int KEYLEN_DEFAULT = 10;
- private static final String KEYLEN_CONF = "randomkv.key.length";
-
- private int valLength;
- private static final int VALLEN_DEFAULT = 10;
- private static final String VALLEN_CONF = "randomkv.val.length";
- private static final byte[] QUALIFIER = Bytes.toBytes("data");
- private boolean multiTableMapper = false;
- private TableName[] tables = null;
-
- @Override
-    protected void setup(Context context) throws IOException, InterruptedException {
- super.setup(context);
-
- Configuration conf = context.getConfiguration();
- keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
- valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
-      multiTableMapper =
-        conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
- if (multiTableMapper) {
- tables = TABLE_NAMES;
- } else {
- tables = new TableName[] { TABLE_NAMES[0] };
- }
- }
-
- @Override
-    protected void map(NullWritable n1, NullWritable n2,
-      Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Cell>.Context context)
- throws java.io.IOException, InterruptedException {
-
- byte keyBytes[] = new byte[keyLength];
- byte valBytes[] = new byte[valLength];
-
- int taskId = context.getTaskAttemptID().getTaskID().getId();
- assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!";
- byte[] key;
- for (int j = 0; j < tables.length; ++j) {
- for (int i = 0; i < ROWSPERSPLIT; i++) {
- Bytes.random(keyBytes);
- // Ensure that unique tasks generate unique keys
- keyBytes[keyLength - 1] = (byte) (taskId & 0xFF);
- Bytes.random(valBytes);
- key = keyBytes;
- if (multiTableMapper) {
-            key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes);
- }
-
- for (byte[] family : TestHFileOutputFormat2.FAMILIES) {
- Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes);
- context.write(new ImmutableBytesWritable(key), kv);
- }
- }
- }
- }
- }
-
- /**
- * Simple mapper that makes Put output.
- */
- static class RandomPutGeneratingMapper
- extends Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Put> {
-
- private int keyLength;
- private static final int KEYLEN_DEFAULT = 10;
- private static final String KEYLEN_CONF = "randomkv.key.length";
-
- private int valLength;
- private static final int VALLEN_DEFAULT = 10;
- private static final String VALLEN_CONF = "randomkv.val.length";
- private static final byte[] QUALIFIER = Bytes.toBytes("data");
- private boolean multiTableMapper = false;
- private TableName[] tables = null;
-
- @Override
-    protected void setup(Context context) throws IOException, InterruptedException {
- super.setup(context);
-
- Configuration conf = context.getConfiguration();
- keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
- valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
-      multiTableMapper =
-        conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
- if (multiTableMapper) {
- tables = TABLE_NAMES;
- } else {
- tables = new TableName[] { TABLE_NAMES[0] };
- }
- }
-
- @Override
-    protected void map(NullWritable n1, NullWritable n2,
-      Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Put>.Context context)
- throws java.io.IOException, InterruptedException {
-
- byte keyBytes[] = new byte[keyLength];
- byte valBytes[] = new byte[valLength];
-
- int taskId = context.getTaskAttemptID().getTaskID().getId();
- assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!";
-
- byte[] key;
- for (int j = 0; j < tables.length; ++j) {
- for (int i = 0; i < ROWSPERSPLIT; i++) {
- Bytes.random(keyBytes);
- // Ensure that unique tasks generate unique keys
- keyBytes[keyLength - 1] = (byte) (taskId & 0xFF);
- Bytes.random(valBytes);
- key = keyBytes;
- if (multiTableMapper) {
-            key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes);
- }
-
- for (byte[] family : TestHFileOutputFormat2.FAMILIES) {
- Put p = new Put(keyBytes);
- p.addColumn(family, QUALIFIER, valBytes);
- // set TTL to very low so that the scan does not return any value
- p.setTTL(1l);
- context.write(new ImmutableBytesWritable(key), p);
- }
- }
- }
- }
- }
-
- private void setupRandomGeneratorMapper(Job job, boolean putSortReducer) {
- if (putSortReducer) {
- job.setInputFormatClass(NMapInputFormat.class);
- job.setMapperClass(RandomPutGeneratingMapper.class);
- job.setMapOutputKeyClass(ImmutableBytesWritable.class);
- job.setMapOutputValueClass(Put.class);
- } else {
- job.setInputFormatClass(NMapInputFormat.class);
- job.setMapperClass(RandomKVGeneratingMapper.class);
- job.setMapOutputKeyClass(ImmutableBytesWritable.class);
- job.setMapOutputValueClass(KeyValue.class);
- }
- }
-
/**
* Test that {@link HFileOutputFormat2} RecordWriter amends timestamps if passed a keyvalue whose
* timestamp is {@link HConstants#LATEST_TIMESTAMP}.
@@ -311,10 +138,10 @@ public class TestHFileOutputFormat2 {
@Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
@Test
public void test_LATEST_TIMESTAMP_isReplaced() throws Exception {
- Configuration conf = new Configuration(this.util.getConfiguration());
+ Configuration conf = new Configuration(this.UTIL.getConfiguration());
RecordWriter<ImmutableBytesWritable, Cell> writer = null;
TaskAttemptContext context = null;
- Path dir = util.getDataTestDir("test_LATEST_TIMESTAMP_isReplaced");
+ Path dir = UTIL.getDataTestDir("test_LATEST_TIMESTAMP_isReplaced");
try {
Job job = new Job(conf);
FileOutputFormat.setOutputPath(job, dir);
@@ -361,10 +188,10 @@ public class TestHFileOutputFormat2 {
@Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
@Test
public void test_TIMERANGE() throws Exception {
- Configuration conf = new Configuration(this.util.getConfiguration());
+ Configuration conf = new Configuration(this.UTIL.getConfiguration());
RecordWriter<ImmutableBytesWritable, Cell> writer = null;
TaskAttemptContext context = null;
- Path dir = util.getDataTestDir("test_TIMERANGE_present");
+ Path dir = UTIL.getDataTestDir("test_TIMERANGE_present");
LOG.info("Timerange dir writing to dir: " + dir);
try {
// build a record writer using HFileOutputFormat2
@@ -425,8 +252,8 @@ public class TestHFileOutputFormat2 {
@Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
@Test
public void testWritingPEData() throws Exception {
- Configuration conf = util.getConfiguration();
- Path testDir = util.getDataTestDirOnTestFS("testWritingPEData");
+ Configuration conf = UTIL.getConfiguration();
+ Path testDir = UTIL.getDataTestDirOnTestFS("testWritingPEData");
FileSystem fs = testDir.getFileSystem(conf);
// Set down this value or we OOME in eclipse.
@@ -487,12 +314,12 @@ public class TestHFileOutputFormat2 {
*/
@Test
public void test_WritingTagData() throws Exception {
- Configuration conf = new Configuration(this.util.getConfiguration());
+ Configuration conf = new Configuration(this.UTIL.getConfiguration());
final String HFILE_FORMAT_VERSION_CONF_KEY = "hfile.format.version";
conf.setInt(HFILE_FORMAT_VERSION_CONF_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS);
RecordWriter<ImmutableBytesWritable, Cell> writer = null;
TaskAttemptContext context = null;
- Path dir = util.getDataTestDir("WritingTagData");
+ Path dir = UTIL.getDataTestDir("WritingTagData");
try {
conf.set(HFileOutputFormat2.OUTPUT_TABLE_NAME_CONF_KEY, TABLE_NAMES[0].getNameAsString());
// turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs
@@ -534,11 +361,11 @@ public class TestHFileOutputFormat2 {
@Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
@Test
public void testJobConfiguration() throws Exception {
- Configuration conf = new Configuration(this.util.getConfiguration());
+ Configuration conf = new Configuration(this.UTIL.getConfiguration());
conf.set(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
- util.getDataTestDir("testJobConfiguration").toString());
+ UTIL.getDataTestDir("testJobConfiguration").toString());
Job job = new Job(conf);
- job.setWorkingDirectory(util.getDataTestDir("testJobConfiguration"));
+ job.setWorkingDirectory(UTIL.getDataTestDir("testJobConfiguration"));
Table table = Mockito.mock(Table.class);
RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
setupMockStartKeys(regionLocator);
@@ -547,304 +374,6 @@ public class TestHFileOutputFormat2 {
assertEquals(job.getNumReduceTasks(), 4);
}
- private byte[][] generateRandomStartKeys(int numKeys) {
- Random random = ThreadLocalRandom.current();
- byte[][] ret = new byte[numKeys][];
- // first region start key is always empty
- ret[0] = HConstants.EMPTY_BYTE_ARRAY;
- for (int i = 1; i < numKeys; i++) {
- ret[i] = generateData(random, DEFAULT_VALUE_LENGTH);
- }
- return ret;
- }
-
- /*
-   * This method takes some time and is done inline uploading data. For example, doing the mapfile
- * test, generation of the key and value consumes about 30% of CPU time.
- * @return Generated random value to insert into a table cell.
- */
- public static byte[] generateData(final Random r, int length) {
- byte[] b = new byte[length];
- int i;
-
- for (i = 0; i < (length - 8); i += 8) {
- b[i] = (byte) (65 + r.nextInt(26));
- b[i + 1] = b[i];
- b[i + 2] = b[i];
- b[i + 3] = b[i];
- b[i + 4] = b[i];
- b[i + 5] = b[i];
- b[i + 6] = b[i];
- b[i + 7] = b[i];
- }
-
- byte a = (byte) (65 + r.nextInt(26));
- for (; i < length; i++) {
- b[i] = a;
- }
- return b;
- }
-
- private byte[][] generateRandomSplitKeys(int numKeys) {
- Random random = ThreadLocalRandom.current();
- byte[][] ret = new byte[numKeys][];
- for (int i = 0; i < numKeys; i++) {
- ret[i] = generateData(random, DEFAULT_VALUE_LENGTH);
- }
- return ret;
- }
-
- @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
- @Test
- public void testMRIncrementalLoad() throws Exception {
- LOG.info("\nStarting test testMRIncrementalLoad\n");
- doIncrementalLoadTest(false, false, false, "testMRIncrementalLoad");
- }
-
- @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
- @Test
- public void testMRIncrementalLoadWithSplit() throws Exception {
- LOG.info("\nStarting test testMRIncrementalLoadWithSplit\n");
-    doIncrementalLoadTest(true, false, false, "testMRIncrementalLoadWithSplit");
- }
-
- /**
-   * Test for HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY = true This test could only check the
-   * correctness of original logic if LOCALITY_SENSITIVE_CONF_KEY is set to true. Because
-   * MiniHBaseCluster always run with single hostname (and different ports), it's not possible to
-   * check the region locality by comparing region locations and DN hostnames. When MiniHBaseCluster
-   * supports explicit hostnames parameter (just like MiniDFSCluster does), we could test region
- * locality features more easily.
- */
- @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
- @Test
- public void testMRIncrementalLoadWithLocality() throws Exception {
- LOG.info("\nStarting test testMRIncrementalLoadWithLocality\n");
-    doIncrementalLoadTest(false, true, false, "testMRIncrementalLoadWithLocality1");
-    doIncrementalLoadTest(true, true, false, "testMRIncrementalLoadWithLocality2");
- }
-
- // @Ignore("Wahtevs")
- @Test
- public void testMRIncrementalLoadWithPutSortReducer() throws Exception {
- LOG.info("\nStarting test testMRIncrementalLoadWithPutSortReducer\n");
-    doIncrementalLoadTest(false, false, true, "testMRIncrementalLoadWithPutSortReducer");
- }
-
-  private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality,
- boolean putSortReducer, String tableStr) throws Exception {
-    doIncrementalLoadTest(shouldChangeRegions, shouldKeepLocality, putSortReducer,
- Arrays.asList(tableStr));
- }
-
- @Test
- public void testMultiMRIncrementalLoadWithPutSortReducer() throws Exception {
- LOG.info("\nStarting test testMultiMRIncrementalLoadWithPutSortReducer\n");
- doIncrementalLoadTest(false, false, true,
-      Arrays.stream(TABLE_NAMES).map(TableName::getNameAsString).collect(Collectors.toList()));
- }
-
-  private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality,
- boolean putSortReducer, List<String> tableStr) throws Exception {
- util = new HBaseTestingUtil();
- Configuration conf = util.getConfiguration();
-    conf.setBoolean(MultiTableHFileOutputFormat.LOCALITY_SENSITIVE_CONF_KEY, shouldKeepLocality);
- int hostCount = 1;
- int regionNum = 5;
- if (shouldKeepLocality) {
-      // We should change host count higher than hdfs replica count when MiniHBaseCluster supports
- // explicit hostnames parameter just like MiniDFSCluster does.
- hostCount = 3;
- regionNum = 20;
- }
-
- String[] hostnames = new String[hostCount];
- for (int i = 0; i < hostCount; ++i) {
- hostnames[i] = "datanode_" + i;
- }
- StartTestingClusterOption option = StartTestingClusterOption.builder()
- .numRegionServers(hostCount).dataNodeHosts(hostnames).build();
- util.startMiniCluster(option);
-
- Map<String, Table> allTables = new HashMap<>(tableStr.size());
-    List<HFileOutputFormat2.TableInfo> tableInfo = new ArrayList<>(tableStr.size());
- boolean writeMultipleTables = tableStr.size() > 1;
- for (String tableStrSingle : tableStr) {
- byte[][] splitKeys = generateRandomSplitKeys(regionNum - 1);
- TableName tableName = TableName.valueOf(tableStrSingle);
- Table table = util.createTable(tableName, FAMILIES, splitKeys);
-
- RegionLocator r = util.getConnection().getRegionLocator(tableName);
- assertEquals("Should start with empty table", 0, util.countRows(table));
- int numRegions = r.getStartKeys().length;
-      assertEquals("Should make " + regionNum + " regions", numRegions, regionNum);
-
- allTables.put(tableStrSingle, table);
-      tableInfo.add(new HFileOutputFormat2.TableInfo(table.getDescriptor(), r));
- }
- Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");
- // Generate the bulk load files
- runIncrementalPELoad(conf, tableInfo, testDir, putSortReducer);
- if (writeMultipleTables) {
- testDir = new Path(testDir, "default");
- }
-
- for (Table tableSingle : allTables.values()) {
- // This doesn't write into the table, just makes files
-      assertEquals("HFOF should not touch actual table", 0, util.countRows(tableSingle));
- }
- int numTableDirs = 0;
- FileStatus[] fss = testDir.getFileSystem(conf).listStatus(testDir);
- for (FileStatus tf : fss) {
- Path tablePath = testDir;
- if (writeMultipleTables) {
- if (allTables.containsKey(tf.getPath().getName())) {
- ++numTableDirs;
- tablePath = tf.getPath();
- } else {
- continue;
- }
- }
-
- // Make sure that a directory was created for every CF
- int dir = 0;
- fss = tablePath.getFileSystem(conf).listStatus(tablePath);
- for (FileStatus f : fss) {
- for (byte[] family : FAMILIES) {
- if (Bytes.toString(family).equals(f.getPath().getName())) {
- ++dir;
- }
- }
- }
- assertEquals("Column family not found in FS.", FAMILIES.length, dir);
- }
- if (writeMultipleTables) {
-      assertEquals("Dir for all input tables not created", numTableDirs, allTables.size());
- }
-
- Admin admin = util.getConnection().getAdmin();
- try {
- // handle the split case
- if (shouldChangeRegions) {
- Table chosenTable = allTables.values().iterator().next();
- // Choose a semi-random table if multiple tables are available
-        LOG.info("Changing regions in table " + chosenTable.getName().getNameAsString());
- admin.disableTable(chosenTable.getName());
- util.waitUntilNoRegionsInTransition();
-
- util.deleteTable(chosenTable.getName());
- byte[][] newSplitKeys = generateRandomSplitKeys(14);
-        Table table = util.createTable(chosenTable.getName(), FAMILIES, newSplitKeys);
-
- while (
-          util.getConnection().getRegionLocator(chosenTable.getName()).getAllRegionLocations()
- .size() != 15 || !admin.isTableAvailable(table.getName())
- ) {
- Thread.sleep(200);
- LOG.info("Waiting for new region assignment to happen");
- }
- }
-
- // Perform the actual load
- for (HFileOutputFormat2.TableInfo singleTableInfo : tableInfo) {
- Path tableDir = testDir;
-        String tableNameStr = singleTableInfo.getTableDescriptor().getTableName().getNameAsString();
- LOG.info("Running BulkLoadHFiles on table" + tableNameStr);
- if (writeMultipleTables) {
- tableDir = new Path(testDir, tableNameStr);
- }
- Table currentTable = allTables.get(tableNameStr);
- TableName currentTableName = currentTable.getName();
- BulkLoadHFiles.create(conf).bulkLoad(currentTableName, tableDir);
-
- // Ensure data shows up
- int expectedRows = 0;
- if (putSortReducer) {
- // no rows should be extracted
-          assertEquals("BulkLoadHFiles should put expected data in table", expectedRows,
- util.countRows(currentTable));
- } else {
- expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
-          assertEquals("BulkLoadHFiles should put expected data in table", expectedRows,
- util.countRows(currentTable));
- Scan scan = new Scan();
- ResultScanner results = currentTable.getScanner(scan);
- for (Result res : results) {
- assertEquals(FAMILIES.length, res.rawCells().length);
- Cell first = res.rawCells()[0];
- for (Cell kv : res.rawCells()) {
- assertTrue(CellUtil.matchingRows(first, kv));
-            assertTrue(Bytes.equals(CellUtil.cloneValue(first), CellUtil.cloneValue(kv)));
- }
- }
- results.close();
- }
- String tableDigestBefore = util.checksumRows(currentTable);
- // Check region locality
- HDFSBlocksDistribution hbd = new HDFSBlocksDistribution();
-        for (HRegion region : util.getHBaseCluster().getRegions(currentTableName)) {
- hbd.add(region.getHDFSBlocksDistribution());
- }
- for (String hostname : hostnames) {
- float locality = hbd.getBlockLocalityIndex(hostname);
- LOG.info("locality of [" + hostname + "]: " + locality);
- assertEquals(100, (int) (locality * 100));
- }
-
- // Cause regions to reopen
- admin.disableTable(currentTableName);
- while (!admin.isTableDisabled(currentTableName)) {
- Thread.sleep(200);
- LOG.info("Waiting for table to disable");
- }
- admin.enableTable(currentTableName);
- util.waitTableAvailable(currentTableName);
-        assertEquals("Data should remain after reopening of regions", tableDigestBefore,
- util.checksumRows(currentTable));
- }
- } finally {
- for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) {
- tableInfoSingle.getRegionLocator().close();
- }
- for (Entry<String, Table> singleTable : allTables.entrySet()) {
- singleTable.getValue().close();
- util.deleteTable(singleTable.getValue().getName());
- }
- testDir.getFileSystem(conf).delete(testDir, true);
- util.shutdownMiniCluster();
- }
- }
-
- private void runIncrementalPELoad(Configuration conf,
-    List<HFileOutputFormat2.TableInfo> tableInfo, Path outDir, boolean putSortReducer)
- throws IOException, InterruptedException, ClassNotFoundException {
- Job job = new Job(conf, "testLocalMRIncrementalLoad");
-    job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad"));
-    job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
-      MutationSerialization.class.getName(), ResultSerialization.class.getName(),
-      CellSerialization.class.getName());
- setupRandomGeneratorMapper(job, putSortReducer);
- if (tableInfo.size() > 1) {
- MultiTableHFileOutputFormat.configureIncrementalLoad(job, tableInfo);
- int sum = 0;
- for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) {
-        sum += tableInfoSingle.getRegionLocator().getAllRegionLocations().size();
- }
- assertEquals(sum, job.getNumReduceTasks());
- } else {
- RegionLocator regionLocator = tableInfo.get(0).getRegionLocator();
-      HFileOutputFormat2.configureIncrementalLoad(job, tableInfo.get(0).getTableDescriptor(),
-        regionLocator);
-      assertEquals(regionLocator.getAllRegionLocations().size(), job.getNumReduceTasks());
- }
-
- FileOutputFormat.setOutputPath(job, outDir);
-
- assertFalse(util.getTestFileSystem().exists(outDir));
-
- assertTrue(job.waitForCompletion(true));
- }
-
/**
* Test for {@link HFileOutputFormat2#createFamilyCompressionMap(Configuration)}. Tests that the
* family compression map is correctly serialized into and deserialized from configuration
@@ -853,7 +382,7 @@ public class TestHFileOutputFormat2 {
@Test
public void testSerializeDeserializeFamilyCompressionMap() throws IOException {
for (int numCfs = 0; numCfs <= 3; numCfs++) {
- Configuration conf = new Configuration(this.util.getConfiguration());
+ Configuration conf = new Configuration(this.UTIL.getConfiguration());
Map<String, Compression.Algorithm> familyToCompression =
getMockColumnFamiliesForCompression(numCfs);
Table table = Mockito.mock(Table.class);
@@ -919,7 +448,7 @@ public class TestHFileOutputFormat2 {
@Test
public void testSerializeDeserializeFamilyBloomTypeMap() throws IOException {
for (int numCfs = 0; numCfs <= 2; numCfs++) {
- Configuration conf = new Configuration(this.util.getConfiguration());
+ Configuration conf = new Configuration(this.UTIL.getConfiguration());
Map<String, BloomType> familyToBloomType = getMockColumnFamiliesForBloomType(numCfs);
Table table = Mockito.mock(Table.class);
setupMockColumnFamiliesForBloomType(table, familyToBloomType);
@@ -980,7 +509,7 @@ public class TestHFileOutputFormat2 {
@Test
public void testSerializeDeserializeFamilyBlockSizeMap() throws IOException {
for (int numCfs = 0; numCfs <= 3; numCfs++) {
- Configuration conf = new Configuration(this.util.getConfiguration());
+ Configuration conf = new Configuration(this.UTIL.getConfiguration());
Map<String, Integer> familyToBlockSize = getMockColumnFamiliesForBlockSize(numCfs);
Table table = Mockito.mock(Table.class);
setupMockColumnFamiliesForBlockSize(table, familyToBlockSize);
@@ -1045,7 +574,7 @@ public class TestHFileOutputFormat2 {
@Test
public void testSerializeDeserializeFamilyDataBlockEncodingMap() throws IOException {
for (int numCfs = 0; numCfs <= 3; numCfs++) {
- Configuration conf = new Configuration(this.util.getConfiguration());
+ Configuration conf = new Configuration(this.UTIL.getConfiguration());
Map<String, DataBlockEncoding> familyToDataBlockEncoding =
getMockColumnFamiliesForDataBlockEncoding(numCfs);
Table table = Mockito.mock(Table.class);
@@ -1124,10 +653,10 @@ public class TestHFileOutputFormat2 {
@Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
@Test
public void testColumnFamilySettings() throws Exception {
- Configuration conf = new Configuration(this.util.getConfiguration());
+ Configuration conf = new Configuration(this.UTIL.getConfiguration());
RecordWriter<ImmutableBytesWritable, Cell> writer = null;
TaskAttemptContext context = null;
- Path dir = util.getDataTestDir("testColumnFamilySettings");
+ Path dir = UTIL.getDataTestDir("testColumnFamilySettings");
// Setup table descriptor
Table table = Mockito.mock(Table.class);
@@ -1153,7 +682,7 @@ public class TestHFileOutputFormat2 {
conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);
Job job = new Job(conf, "testLocalMRIncrementalLoad");
-    job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilySettings"));
+    job.setWorkingDirectory(UTIL.getDataTestDirOnTestFS("testColumnFamilySettings"));
setupRandomGeneratorMapper(job, false);
HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
FileOutputFormat.setOutputPath(job, dir);
@@ -1231,16 +760,16 @@ public class TestHFileOutputFormat2 {
@Ignore("Flakey: See HBASE-9051")
@Test
public void testExcludeAllFromMinorCompaction() throws Exception {
- Configuration conf = util.getConfiguration();
+ Configuration conf = UTIL.getConfiguration();
conf.setInt("hbase.hstore.compaction.min", 2);
generateRandomStartKeys(5);
- util.startMiniCluster();
+ UTIL.startMiniCluster();
try (Connection conn = ConnectionFactory.createConnection(); Admin admin = conn.getAdmin();
- Table table = util.createTable(TABLE_NAMES[0], FAMILIES);
+ Table table = UTIL.createTable(TABLE_NAMES[0], FAMILIES);
RegionLocator locator = conn.getRegionLocator(TABLE_NAMES[0])) {
- final FileSystem fs = util.getDFSCluster().getFileSystem();
- assertEquals("Should start with empty table", 0, util.countRows(table));
+ final FileSystem fs = UTIL.getDFSCluster().getFileSystem();
+ assertEquals("Should start with empty table", 0, UTIL.countRows(table));
// deep inspection: get the StoreFile dir
final Path storePath =
@@ -1253,7 +782,7 @@ public class TestHFileOutputFormat2 {
conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", true);
for (int i = 0; i < 2; i++) {
-      Path testDir = util.getDataTestDirOnTestFS("testExcludeAllFromMinorCompaction_" + i);
+      Path testDir = UTIL.getDataTestDirOnTestFS("testExcludeAllFromMinorCompaction_" + i);
runIncrementalPELoad(conf,
Arrays.asList(new HFileOutputFormat2.TableInfo(table.getDescriptor(),
conn.getRegionLocator(TABLE_NAMES[0]))),
@@ -1265,7 +794,7 @@ public class TestHFileOutputFormat2 {
// Ensure data shows up
int expectedRows = 2 * NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
assertEquals("BulkLoadHFiles should put expected data in table", expectedRows,
- util.countRows(table));
+ UTIL.countRows(table));
// should have a second StoreFile now
assertEquals(2, fs.listStatus(storePath).length);
@@ -1276,7 +805,7 @@ public class TestHFileOutputFormat2 {
quickPoll(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
-        List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]);
+        List<HRegion> regions = UTIL.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]);
for (HRegion region : regions) {
for (HStore store : region.getStores()) {
store.closeAndArchiveCompactedFiles();
@@ -1295,7 +824,7 @@ public class TestHFileOutputFormat2 {
quickPoll(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
-        List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]);
+        List<HRegion> regions = UTIL.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]);
for (HRegion region : regions) {
for (HStore store : region.getStores()) {
store.closeAndArchiveCompactedFiles();
@@ -1306,24 +835,24 @@ public class TestHFileOutputFormat2 {
}, 5000);
} finally {
- util.shutdownMiniCluster();
+ UTIL.shutdownMiniCluster();
}
}
@Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
@Test
public void testExcludeMinorCompaction() throws Exception {
- Configuration conf = util.getConfiguration();
+ Configuration conf = UTIL.getConfiguration();
conf.setInt("hbase.hstore.compaction.min", 2);
generateRandomStartKeys(5);
- util.startMiniCluster();
+ UTIL.startMiniCluster();
try (Connection conn = ConnectionFactory.createConnection(conf);
Admin admin = conn.getAdmin()) {
- Path testDir = util.getDataTestDirOnTestFS("testExcludeMinorCompaction");
- final FileSystem fs = util.getDFSCluster().getFileSystem();
- Table table = util.createTable(TABLE_NAMES[0], FAMILIES);
- assertEquals("Should start with empty table", 0, util.countRows(table));
+ Path testDir = UTIL.getDataTestDirOnTestFS("testExcludeMinorCompaction");
+ final FileSystem fs = UTIL.getDFSCluster().getFileSystem();
+ Table table = UTIL.createTable(TABLE_NAMES[0], FAMILIES);
+ assertEquals("Should start with empty table", 0, UTIL.countRows(table));
// deep inspection: get the StoreFile dir
final Path storePath =
@@ -1337,7 +866,7 @@ public class TestHFileOutputFormat2 {
p.addColumn(FAMILIES[0], Bytes.toBytes("1"), Bytes.toBytes("1"));
table.put(p);
admin.flush(TABLE_NAMES[0]);
- assertEquals(1, util.countRows(table));
+ assertEquals(1, UTIL.countRows(table));
quickPoll(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
@@ -1359,7 +888,7 @@ public class TestHFileOutputFormat2 {
// Ensure data shows up
int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
assertEquals("BulkLoadHFiles should put expected data in table",
expectedRows + 1,
- util.countRows(table));
+ UTIL.countRows(table));
// should have a second StoreFile now
assertEquals(2, fs.listStatus(storePath).length);
@@ -1388,7 +917,7 @@ public class TestHFileOutputFormat2 {
}, 5000);
} finally {
- util.shutdownMiniCluster();
+ UTIL.shutdownMiniCluster();
}
}
@@ -1410,11 +939,11 @@ public class TestHFileOutputFormat2 {
public void manualTest(String args[]) throws Exception {
Configuration conf = HBaseConfiguration.create();
- util = new HBaseTestingUtil(conf);
+ UTIL = new HBaseTestingUtil(conf);
if ("newtable".equals(args[0])) {
TableName tname = TableName.valueOf(args[1]);
byte[][] splitKeys = generateRandomSplitKeys(4);
- Table table = util.createTable(tname, FAMILIES, splitKeys);
+ Table table = UTIL.createTable(tname, FAMILIES, splitKeys);
} else if ("incremental".equals(args[0])) {
TableName tname = TableName.valueOf(args[1]);
try (Connection c = ConnectionFactory.createConnection(conf); Admin admin = c.getAdmin();
@@ -1432,18 +961,18 @@ public class TestHFileOutputFormat2 {
@Test
public void testBlockStoragePolicy() throws Exception {
- util = new HBaseTestingUtil();
- Configuration conf = util.getConfiguration();
+ UTIL = new HBaseTestingUtil();
+ Configuration conf = UTIL.getConfiguration();
conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY, "ALL_SSD");
conf.set(
HFileOutputFormat2.STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes
.toString(HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[0])),
"ONE_SSD");
- Path cf1Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[0]));
- Path cf2Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[1]));
- util.startMiniDFSCluster(3);
- FileSystem fs = util.getDFSCluster().getFileSystem();
+ Path cf1Dir = new Path(UTIL.getDataTestDir(), Bytes.toString(FAMILIES[0]));
+ Path cf2Dir = new Path(UTIL.getDataTestDir(), Bytes.toString(FAMILIES[1]));
+ UTIL.startMiniDFSCluster(3);
+ FileSystem fs = UTIL.getDFSCluster().getFileSystem();
try {
fs.mkdirs(cf1Dir);
fs.mkdirs(cf2Dir);
@@ -1472,7 +1001,7 @@ public class TestHFileOutputFormat2 {
} finally {
fs.delete(cf1Dir, true);
fs.delete(cf2Dir, true);
- util.shutdownMiniDFSCluster();
+ UTIL.shutdownMiniDFSCluster();
}
}
@@ -1515,70 +1044,12 @@ public class TestHFileOutputFormat2 {
return null;
}
- @Test
- public void testConfigurePartitioner() throws Exception {
- util.startMiniDFSCluster(1);
- try {
- Configuration conf = util.getConfiguration();
- // Create a user who is not the current user
- String fooUserName = "foo1234";
- String fooGroupName = "group1";
- UserGroupInformation ugi =
- UserGroupInformation.createUserForTesting(fooUserName, new String[] { fooGroupName });
- // Get user's home directory
- Path fooHomeDirectory = ugi.doAs(new PrivilegedAction<Path>() {
- @Override
- public Path run() {
- try (FileSystem fs = FileSystem.get(conf)) {
- return fs.makeQualified(fs.getHomeDirectory());
- } catch (IOException ioe) {
- LOG.error("Failed to get foo's home directory", ioe);
- }
- return null;
- }
- });
- // create the home directory and chown
- FileSystem fs = FileSystem.get(conf);
- fs.mkdirs(fooHomeDirectory);
- fs.setOwner(fooHomeDirectory, fooUserName, fooGroupName);
-
- Job job = Mockito.mock(Job.class);
- Mockito.doReturn(conf).when(job).getConfiguration();
- ImmutableBytesWritable writable = new ImmutableBytesWritable();
- List<ImmutableBytesWritable> splitPoints = new ArrayList<ImmutableBytesWritable>();
- splitPoints.add(writable);
-
- ugi.doAs(new PrivilegedAction<Void>() {
- @Override
- public Void run() {
- try {
- HFileOutputFormat2.configurePartitioner(job, splitPoints, false);
- } catch (IOException ioe) {
- LOG.error("Failed to configure partitioner", ioe);
- }
- return null;
- }
- });
- // verify that the job uses TotalOrderPartitioner
- verify(job).setPartitionerClass(TotalOrderPartitioner.class);
- // verify that TotalOrderPartitioner.setPartitionFile() is called.
- String partitionPathString = conf.get("mapreduce.totalorderpartitioner.path");
- Assert.assertNotNull(partitionPathString);
- // Make sure the partition file is in foo1234's home directory, and that
- // the file exists.
- Assert.assertTrue(partitionPathString.startsWith(fooHomeDirectory.toString()));
- Assert.assertTrue(fs.exists(new Path(partitionPathString)));
- } finally {
- util.shutdownMiniDFSCluster();
- }
- }
-
@Test
public void TestConfigureCompression() throws Exception {
- Configuration conf = new Configuration(this.util.getConfiguration());
+ Configuration conf = new Configuration(this.UTIL.getConfiguration());
RecordWriter<ImmutableBytesWritable, Cell> writer = null;
TaskAttemptContext context = null;
- Path dir = util.getDataTestDir("TestConfigureCompression");
+ Path dir = UTIL.getDataTestDir("TestConfigureCompression");
String hfileoutputformatCompression = "gz";
try {
@@ -1619,8 +1090,8 @@ public class TestHFileOutputFormat2 {
@Test
public void testMRIncrementalLoadWithLocalityMultiCluster() throws Exception {
// Start cluster A
- util = new HBaseTestingUtil();
- Configuration confA = util.getConfiguration();
+ UTIL = new HBaseTestingUtil();
+ Configuration confA = UTIL.getConfiguration();
int hostCount = 3;
int regionNum = 20;
String[] hostnames = new String[hostCount];
@@ -1629,27 +1100,27 @@ public class TestHFileOutputFormat2 {
}
StartTestingClusterOption option = StartTestingClusterOption.builder()
.numRegionServers(hostCount).dataNodeHosts(hostnames).build();
- util.startMiniCluster(option);
+ UTIL.startMiniCluster(option);
// Start cluster B
- HBaseTestingUtil utilB = new HBaseTestingUtil();
- Configuration confB = utilB.getConfiguration();
- utilB.startMiniCluster(option);
+ HBaseTestingUtil UTILB = new HBaseTestingUtil();
+ Configuration confB = UTILB.getConfiguration();
+ UTILB.startMiniCluster(option);
- Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");
+ Path testDir = UTIL.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");
byte[][] splitKeys = generateRandomSplitKeys(regionNum - 1);
TableName tableName = TableName.valueOf("table");
// Create table in cluster B
- try (Table table = utilB.createTable(tableName, FAMILIES, splitKeys);
- RegionLocator r = utilB.getConnection().getRegionLocator(tableName)) {
+ try (Table table = UTILB.createTable(tableName, FAMILIES, splitKeys);
+ RegionLocator r = UTILB.getConnection().getRegionLocator(tableName)) {
// Generate the bulk load files
// Job has zookeeper configuration for cluster A
// Assume reading from cluster A by TableInputFormat and creating hfiles to cluster B
Job job = new Job(confA, "testLocalMRIncrementalLoad");
Configuration jobConf = job.getConfiguration();
final UUID key = ConfigurationCaptorConnection.configureConnectionImpl(jobConf);
- job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad"));
+ job.setWorkingDirectory(UTIL.getDataTestDirOnTestFS("runIncrementalPELoad"));
setupRandomGeneratorMapper(job, false);
HFileOutputFormat2.configureIncrementalLoad(job, table, r);
@@ -1667,7 +1138,7 @@ public class TestHFileOutputFormat2 {
FileOutputFormat.setOutputPath(job, testDir);
- assertFalse(util.getTestFileSystem().exists(testDir));
+ assertFalse(UTIL.getTestFileSystem().exists(testDir));
assertTrue(job.waitForCompletion(true));
@@ -1686,10 +1157,10 @@ public class TestHFileOutputFormat2 {
assertEquals(bSpecificConfigValue, config.get(bSpecificConfigKey));
}
} finally {
- utilB.deleteTable(tableName);
+ UTILB.deleteTable(tableName);
testDir.getFileSystem(confA).delete(testDir, true);
- util.shutdownMiniCluster();
- utilB.shutdownMiniCluster();
+ UTIL.shutdownMiniCluster();
+ UTILB.shutdownMiniCluster();
}
}
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMRIncrementalLoad.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMRIncrementalLoad.java
new file mode 100644
index 00000000000..1e7cb0e4103
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMRIncrementalLoad.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MapReduceTests;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+@Category({ MapReduceTests.class, LargeTests.class })
+public class TestMRIncrementalLoad extends MRIncrementalLoadTestBase {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestMRIncrementalLoad.class);
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ setupCluster(false);
+ }
+
+ @Parameters(name = "{index}: shouldChangeRegions={0}, putSortReducer={1}," +
" tableStr={2}")
+ public static List<Object[]> params() {
+ return Arrays.asList(new Object[] { false, false,
Arrays.asList("testMRIncrementalLoad") },
+ new Object[] { true, false,
Arrays.asList("testMRIncrementalLoadWithSplit") },
+ new Object[] { false, true,
Arrays.asList("testMRIncrementalLoadWithPutSortReducer") },
+ new Object[] { false, true,
+
Arrays.stream(TABLE_NAMES).map(TableName::getNameAsString).collect(Collectors.toList())
});
+ }
+}
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMRIncrementalLoadWithLocality.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMRIncrementalLoadWithLocality.java
new file mode 100644
index 00000000000..e27273b1510
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMRIncrementalLoadWithLocality.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MapReduceTests;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+@Category({ MapReduceTests.class, LargeTests.class })
+public class TestMRIncrementalLoadWithLocality extends MRIncrementalLoadTestBase {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestMRIncrementalLoadWithLocality.class);
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ setupCluster(true);
+ }
+
+ @Parameters(name = "{index}: shouldChangeRegions={0}, putSortReducer={1}," + " tableStr={2}")
+ public static List<Object[]> params() {
+ return Arrays.asList(
+ new Object[] { false, false, Arrays.asList("testMRIncrementalLoadWithLocality1") },
+ new Object[] { true, false, Arrays.asList("testMRIncrementalLoadWithLocality2") });
+ }
+}