This is an automated email from the ASF dual-hosted git repository. huaxiangsun pushed a commit to branch branch-2.3 in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.3 by this push: new 66fed7c HBASE-26255 Add an option to use region location from meta table in TableSnapshotInputFormat (#3661) (#3670) 66fed7c is described below commit 66fed7c743c3e247a9eb1ee323e3f7c3a7f609c2 Author: huaxiangsun <huaxiang...@apache.org> AuthorDate: Thu Sep 9 10:42:49 2021 -0700 HBASE-26255 Add an option to use region location from meta table in TableSnapshotInputFormat (#3661) (#3670) Signed-off-by: Anoop Sam John <anoopsamj...@apache.org> --- .../mapreduce/TableSnapshotInputFormatImpl.java | 61 ++++++++++++++++++---- .../mapreduce/TestTableSnapshotInputFormat.java | 29 ++++++++++ 2 files changed, 79 insertions(+), 11 deletions(-) diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java index 5ed9be6..c3f05f4 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java @@ -32,9 +32,13 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HDFSBlocksDistribution; import org.apache.hadoop.hbase.HDFSBlocksDistribution.HostAndWeight; import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.PrivateCellUtil; import org.apache.hadoop.hbase.client.ClientSideRegionScanner; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.IsolationLevel; +import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.TableDescriptor; @@ -102,6 +106,15 @@ public class TableSnapshotInputFormatImpl { public static final boolean SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT = true; /** + * Whether to calculate the Snapshot region location by region location from meta. + * It is much faster than computing block locations for splits. + */ + public static final String SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION = + "hbase.TableSnapshotInputFormat.locality.by.region.location"; + + public static final boolean SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION_DEFAULT = false; + + /** * In some scenario, scan limited rows on each InputSplit for sampling data extraction */ public static final String SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT = @@ -392,17 +405,49 @@ public class TableSnapshotInputFormatImpl { SNAPSHOT_INPUTFORMAT_SCAN_METRICS_ENABLED_DEFAULT); scan.setScanMetricsEnabled(scanMetricsEnabled); + boolean useRegionLoc = conf.getBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION, + SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION_DEFAULT); + + Connection connection = null; + RegionLocator regionLocator = null; + if (localityEnabled && useRegionLoc) { + Configuration newConf = new Configuration(conf); + newConf.setInt("hbase.hconnection.threads.max", 1); + try { + connection = ConnectionFactory.createConnection(newConf); + regionLocator = connection.getRegionLocator(htd.getTableName()); + + /* Get all locations for the table and cache it */ + regionLocator.getAllRegionLocations(); + } finally { + if (connection != null) { + connection.close(); + } + } + } + List<InputSplit> splits = new ArrayList<>(); for (HRegionInfo hri : regionManifests) { // load region descriptor + List<String> hosts = null; + if (localityEnabled) { + if (regionLocator != null) { + /* Get Location from the local cache */ + HRegionLocation + location = regionLocator.getRegionLocation(hri.getStartKey(), false); + + hosts = new ArrayList<>(1); + hosts.add(location.getHostname()); + } else { + hosts = calculateLocationsForInputSplit(conf, htd, hri, tableDir); + } + } if (numSplits > 1) { byte[][] sp = sa.split(hri.getStartKey(), hri.getEndKey(), numSplits, true); for (int i = 0; i < sp.length - 1; i++) { if (PrivateCellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(), sp[i], sp[i + 1])) { - List<String> hosts = - calculateLocationsForInputSplit(conf, htd, hri, tableDir, localityEnabled); Scan boundedScan = new Scan(scan); if (scan.getStartRow().length == 0) { @@ -425,8 +470,7 @@ public class TableSnapshotInputFormatImpl { } else { if (PrivateCellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(), hri.getStartKey(), hri.getEndKey())) { - List<String> hosts = - calculateLocationsForInputSplit(conf, htd, hri, tableDir, localityEnabled); + splits.add(new InputSplit(htd, hri, hosts, scan, restoreDir)); } } @@ -440,14 +484,9 @@ public class TableSnapshotInputFormatImpl { * only when localityEnabled is true. */ private static List<String> calculateLocationsForInputSplit(Configuration conf, - TableDescriptor htd, HRegionInfo hri, Path tableDir, boolean localityEnabled) + TableDescriptor htd, HRegionInfo hri, Path tableDir) throws IOException { - if (localityEnabled) { // care block locality - return getBestLocations(conf, - HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir)); - } else { // do not care block locality - return null; - } + return getBestLocations(conf, HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir)); } /** diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java index b1723bc..0820f3b 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java @@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.mapreduce; import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT; import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY; import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT; +import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION; +import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION_DEFAULT; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -198,6 +200,18 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa } } + @Test + public void testWithMockedMapReduceSingleRegionByRegionLocation() throws Exception { + Configuration conf = UTIL.getConfiguration(); + conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION, true); + try { + testWithMockedMapReduce(UTIL, name.getMethodName() + "Snapshot", 1, 1, 1, + true); + } finally { + conf.unset(SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION); + } + } + @Override public void testRestoreSnapshotDoesNotCreateBackRefLinksInit(TableName tableName, String snapshotName, Path tmpTableDir) throws Exception { @@ -218,6 +232,8 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa Configuration conf = util.getConfiguration(); conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY, setLocalityEnabledTo); + conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION, + SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION_DEFAULT); Job job = new Job(conf); Path tmpTableDir = util.getDataTestDirOnTestFS(snapshotName); Scan scan = new Scan(getStartRow(), getEndRow()); // limit the scan @@ -406,6 +422,9 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa job.getConfiguration().getBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY, SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT); + boolean byRegionLoc = + job.getConfiguration().getBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION, + SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION_DEFAULT); for (int i = 0; i < splits.size(); i++) { // validate input split InputSplit split = splits.get(i); @@ -413,6 +432,16 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa TableSnapshotRegionSplit snapshotRegionSplit = (TableSnapshotRegionSplit) split; if (localityEnabled) { Assert.assertTrue(split.getLocations() != null && split.getLocations().length != 0); + if (byRegionLoc) { + // When it uses region location from meta, the hostname will be "localhost", + // the location from hdfs block location is "127.0.0.1". + Assert.assertEquals(1, split.getLocations().length); + Assert.assertTrue("Not using region location!", + split.getLocations()[0].equals("localhost")); + } else { + Assert.assertTrue("Not using region location!", + split.getLocations()[0].equals("127.0.0.1")); + } } else { Assert.assertTrue(split.getLocations() != null && split.getLocations().length == 0); }