Repository: hbase Updated Branches: refs/heads/branch-1.4 b93a80596 -> 846753c18
HBASE-19245 MultiTableInputFormatBase#getSplits creates a Connection per Table We make only one Connection instead of a Connection per table. (The change is just moving one line, but it involves right-shifting the body of the function.) Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/846753c1 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/846753c1 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/846753c1 Branch: refs/heads/branch-1.4 Commit: 846753c185cb6771b7d64cbdae82cbc38e213c94 Parents: b93a805 Author: Michael Stack <st...@apache.org> Authored: Mon Nov 13 11:42:10 2017 -0800 Committer: Michael Stack <st...@apache.org> Committed: Tue Nov 14 21:51:25 2017 -0800 ---------------------------------------------------------------------- .../mapreduce/MultiTableInputFormatBase.java | 108 ++++++++++--------- 1 file changed, 55 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/846753c1/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java index 4931c3f..e91d20a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java @@ -181,60 +181,62 @@ public abstract class MultiTableInputFormatBase extends List<InputSplit> splits = new ArrayList<InputSplit>(); Iterator iter = tableMaps.entrySet().iterator(); - while (iter.hasNext()) { - Map.Entry<TableName, List<Scan>> entry = (Map.Entry<TableName, List<Scan>>) iter.next(); - TableName tableName = entry.getKey(); - 
List<Scan> scanList = entry.getValue(); - - try (Connection conn = ConnectionFactory.createConnection(context.getConfiguration()); - Table table = conn.getTable(tableName); - RegionLocator regionLocator = conn.getRegionLocator(tableName)) { - RegionSizeCalculator sizeCalculator = new RegionSizeCalculator( - regionLocator, conn.getAdmin()); - Pair<byte[][], byte[][]> keys = regionLocator.getStartEndKeys(); - for (Scan scan : scanList) { - if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) { - throw new IOException("Expecting at least one region for table : " - + tableName.getNameAsString()); - } - int count = 0; - - byte[] startRow = scan.getStartRow(); - byte[] stopRow = scan.getStopRow(); - - for (int i = 0; i < keys.getFirst().length; i++) { - if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) { - continue; + // Make a single Connection to the Cluster and use it across all tables. + try (Connection conn = ConnectionFactory.createConnection(context.getConfiguration())) { + while (iter.hasNext()) { + Map.Entry<TableName, List<Scan>> entry = (Map.Entry<TableName, List<Scan>>) iter.next(); + TableName tableName = entry.getKey(); + List<Scan> scanList = entry.getValue(); + try (Table table = conn.getTable(tableName); + RegionLocator regionLocator = conn.getRegionLocator(tableName)) { + RegionSizeCalculator sizeCalculator = new RegionSizeCalculator( + regionLocator, conn.getAdmin()); + Pair<byte[][], byte[][]> keys = regionLocator.getStartEndKeys(); + for (Scan scan : scanList) { + if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) { + throw new IOException("Expecting at least one region for table : " + + tableName.getNameAsString()); } - - if ((startRow.length == 0 || keys.getSecond()[i].length == 0 || - Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) && - (stopRow.length == 0 || Bytes.compareTo(stopRow, - keys.getFirst()[i]) > 0)) { - byte[] splitStart = startRow.length == 0 || - 
Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ? - keys.getFirst()[i] : startRow; - byte[] splitStop = (stopRow.length == 0 || - Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) && - keys.getSecond()[i].length > 0 ? - keys.getSecond()[i] : stopRow; - - HRegionLocation hregionLocation = regionLocator.getRegionLocation( - keys.getFirst()[i], false); - String regionHostname = hregionLocation.getHostname(); - HRegionInfo regionInfo = hregionLocation.getRegionInfo(); - String encodedRegionName = regionInfo.getEncodedName(); - long regionSize = sizeCalculator.getRegionSize( - regionInfo.getRegionName()); - - TableSplit split = new TableSplit(table.getName(), - scan, splitStart, splitStop, regionHostname, - encodedRegionName, regionSize); - - splits.add(split); - - if (LOG.isDebugEnabled()) - LOG.debug("getSplits: split -> " + (count++) + " -> " + split); + int count = 0; + + byte[] startRow = scan.getStartRow(); + byte[] stopRow = scan.getStopRow(); + + for (int i = 0; i < keys.getFirst().length; i++) { + if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) { + continue; + } + + if ((startRow.length == 0 || keys.getSecond()[i].length == 0 || + Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) && + (stopRow.length == 0 || Bytes.compareTo(stopRow, + keys.getFirst()[i]) > 0)) { + byte[] splitStart = startRow.length == 0 || + Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ? + keys.getFirst()[i] : startRow; + byte[] splitStop = (stopRow.length == 0 || + Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) && + keys.getSecond()[i].length > 0 ? 
+ keys.getSecond()[i] : stopRow; + + HRegionLocation hregionLocation = regionLocator.getRegionLocation( + keys.getFirst()[i], false); + String regionHostname = hregionLocation.getHostname(); + HRegionInfo regionInfo = hregionLocation.getRegionInfo(); + String encodedRegionName = regionInfo.getEncodedName(); + long regionSize = sizeCalculator.getRegionSize( + regionInfo.getRegionName()); + + TableSplit split = new TableSplit(table.getName(), + scan, splitStart, splitStop, regionHostname, + encodedRegionName, regionSize); + + splits.add(split); + + if (LOG.isDebugEnabled()) { + LOG.debug("getSplits: split -> " + (count++) + " -> " + split); + } + } } } }