This is an automated email from the ASF dual-hosted git repository.

zghao pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 210f22edd32d8ce50a871a4f190d67e3e03af1a6
Author: niuyulin <nyl...@163.com>
AuthorDate: Sun May 24 20:55:48 2020 -0500

    HBASE-24387 TableSnapshotInputFormatImpl support row limit on each InputSplit (#1731)

    Signed-off-by: Guanghao Zhang <zg...@apache.org>
---
 .../mapreduce/TableSnapshotInputFormatImpl.java    | 23 +++++++---
 .../mapreduce/TestTableSnapshotInputFormat.java    | 51 ++++++++++++++++++++++
 2 files changed, 69 insertions(+), 5 deletions(-)

diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
index 9758f15..28b832e 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
@@ -102,6 +102,12 @@ public class TableSnapshotInputFormatImpl {
   public static final boolean SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT = true;
 
   /**
+   * In some scenario, scan limited rows on each InputSplit for sampling data extraction
+   */
+  public static final String SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT =
+      "hbase.TableSnapshotInputFormat.row.limit.per.inputsplit";
+
+  /**
    * Implementation class for InputSplit logic common between mapred and mapreduce.
    */
   public static class InputSplit implements Writable {
@@ -213,6 +219,8 @@ public class TableSnapshotInputFormatImpl {
     private Result result = null;
     private ImmutableBytesWritable row = null;
     private ClientSideRegionScanner scanner;
+    private int numOfCompleteRows = 0;
+    private int rowLimitPerSplit;
 
     public ClientSideRegionScanner getScanner() {
       return scanner;
@@ -221,6 +229,7 @@ public class TableSnapshotInputFormatImpl {
     public void initialize(InputSplit split, Configuration conf) throws IOException {
       this.scan = TableMapReduceUtil.convertStringToScan(split.getScan());
       this.split = split;
+      this.rowLimitPerSplit = conf.getInt(SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT, 0);
       TableDescriptor htd = split.htd;
       HRegionInfo hri = this.split.getRegionInfo();
       FileSystem fs = CommonFSUtils.getCurrentFileSystem(conf);
@@ -244,6 +253,9 @@ public class TableSnapshotInputFormatImpl {
         return false;
       }
 
+      if (rowLimitPerSplit > 0 && ++this.numOfCompleteRows > rowLimitPerSplit) {
+        return false;
+      }
       if (this.row == null) {
         this.row = new ImmutableBytesWritable();
       }
@@ -296,10 +308,11 @@ public class TableSnapshotInputFormatImpl {
     return getSplits(scan, manifest, regionInfos, restoreDir, conf, splitAlgo, numSplits);
   }
 
-  public static RegionSplitter.SplitAlgorithm getSplitAlgo(Configuration conf) throws IOException{
+  public static RegionSplitter.SplitAlgorithm getSplitAlgo(Configuration conf) throws IOException {
     String splitAlgoClassName = conf.get(SPLIT_ALGO);
-    if (splitAlgoClassName == null)
+    if (splitAlgoClassName == null) {
       return null;
+    }
     try {
       return Class.forName(splitAlgoClassName).asSubclass(RegionSplitter.SplitAlgorithm.class)
           .getDeclaredConstructor().newInstance();
@@ -511,9 +524,9 @@ public class TableSnapshotInputFormatImpl {
    * Configures the job to use TableSnapshotInputFormat to read from a snapshot.
    * @param conf the job to configure
    * @param snapshotName the name of the snapshot to read from
-   * @param restoreDir a temporary directory to restore the snapshot into. Current user should
-   *          have write permissions to this directory, and this should not be a subdirectory of rootdir.
-   *          After the job is finished, restoreDir can be deleted.
+   * @param restoreDir a temporary directory to restore the snapshot into. Current user should have
+   *          write permissions to this directory, and this should not be a subdirectory of rootdir.
+   *          After the job is finished, restoreDir can be deleted.
    * @param numSplitsPerRegion how many input splits to generate per one region
    * @param splitAlgo SplitAlgorithm to be used when generating InputSplits
    * @throws IOException if an error occurs
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
index 5f187c6..d98340f 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.mapreduce;
 
 import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT;
 import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY;
+import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -305,6 +306,56 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
   }
 
   @Test
+  public void testScanLimit() throws Exception {
+    setupCluster();
+    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final String snapshotName = tableName + "Snapshot";
+    Table table = null;
+    try {
+      UTIL.getConfiguration().setInt(SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT, 10);
+      if (UTIL.getAdmin().tableExists(tableName)) {
+        UTIL.deleteTable(tableName);
+      }
+
+      UTIL.createTable(tableName, FAMILIES, new byte[][] { bbb, yyy });
+
+      Admin admin = UTIL.getAdmin();
+
+      int regionNum = admin.getRegions(tableName).size();
+      // put some stuff in the table
+      table = UTIL.getConnection().getTable(tableName);
+      UTIL.loadTable(table, FAMILIES);
+
+      Path rootDir = CommonFSUtils.getRootDir(UTIL.getConfiguration());
+      FileSystem fs = rootDir.getFileSystem(UTIL.getConfiguration());
+
+      SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName, Arrays.asList(FAMILIES),
+        null, snapshotName, rootDir, fs, true);
+
+      Job job = new Job(UTIL.getConfiguration());
+      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
+      Scan scan = new Scan();
+      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
+        TestTableSnapshotInputFormat.class);
+
+      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
+        RowCounter.RowCounterMapper.class, NullWritable.class, NullWritable.class, job, true,
+        tmpTableDir);
+      Assert.assertTrue(job.waitForCompletion(true));
+      Assert.assertEquals(10 * regionNum,
+        job.getCounters().findCounter(RowCounter.RowCounterMapper.Counters.ROWS).getValue());
+    } finally {
+      if (table != null) {
+        table.close();
+      }
+      UTIL.getConfiguration().unset(SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT);
+      UTIL.getAdmin().deleteSnapshot(snapshotName);
+      UTIL.deleteTable(tableName);
+      tearDownCluster();
+    }
+  }
+
+  @Test
   public void testNoDuplicateResultsWhenSplitting() throws Exception {
     setupCluster();
     TableName tableName = TableName.valueOf("testNoDuplicateResultsWhenSplitting");
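
A note on usage: the new property is read once per RecordReader in initialize() and caps how many rows each InputSplit's scanner returns, so the total number of sampled rows scales with the number of splits (regions x splits per region), as the test's 10 * regionNum assertion shows. Below is a minimal sketch of a client job that sets the limit before submission; the snapshot name "mySnapshot", the restore path, and the no-op SampleMapper are illustrative placeholders, not part of this commit.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;

public class SnapshotSampleJob {

  // Placeholder mapper: each mapper sees at most the configured number of
  // rows from its InputSplit once the limit is set.
  public static class SampleMapper extends TableMapper<NullWritable, NullWritable> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context) {
      // no-op; a real sampling job would emit or aggregate here
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Cap each InputSplit at 100 rows; 0 (the default) disables the limit.
    conf.setInt(TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT, 100);

    Job job = Job.getInstance(conf, "snapshot-sample");
    job.setJarByClass(SnapshotSampleJob.class);

    // "mySnapshot" must already exist; the restore dir must be writable by the
    // current user and must not be a subdirectory of the HBase root dir.
    TableMapReduceUtil.initTableSnapshotMapperJob("mySnapshot", new Scan(),
      SampleMapper.class, NullWritable.class, NullWritable.class, job, true,
      new Path("/tmp/snapshot-restore"));
    job.setNumReduceTasks(0);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}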