Repository: tajo Updated Branches: refs/heads/master 3cf7f24bc -> 73a43d8b7
TAJO-1940: Implement HBaseTablespace::getTableVolume() method. Closes #910 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/73a43d8b Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/73a43d8b Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/73a43d8b Branch: refs/heads/master Commit: 73a43d8b7bdba7c3963efd9df87f6e3d3a703cfd Parents: 3cf7f24 Author: Hyunsik Choi <[email protected]> Authored: Wed Jan 27 14:18:21 2016 -0800 Committer: Hyunsik Choi <[email protected]> Committed: Wed Jan 27 14:18:21 2016 -0800 ---------------------------------------------------------------------- CHANGES | 2 + .../tajo/engine/query/TestHBaseTable.java | 65 ++++- .../org/apache/tajo/plan/StorageService.java | 3 +- .../plan/rewrite/TableStatUpdateRewriter.java | 2 +- .../org/apache/tajo/storage/Tablespace.java | 2 +- .../apache/tajo/storage/TablespaceManager.java | 5 +- .../tajo/storage/hbase/HBaseTablespace.java | 282 +++++++++++-------- .../storage/hbase/RegionSizeCalculator.java | 150 ++++++++++ .../org/apache/tajo/storage/FileTablespace.java | 4 +- .../tajo/storage/jdbc/JdbcTablespace.java | 2 +- 10 files changed, 377 insertions(+), 140 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/73a43d8b/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 6bc9635..47beafb 100644 --- a/CHANGES +++ b/CHANGES @@ -8,6 +8,8 @@ Release 0.12.0 - unreleased IMPROVEMENT + TAJO-1940: Implement HBaseTablespace::getTableVolume() method. (hyunsik) + TAJO-2061: Add description for EXPLAIN statement. (jaehwa) TAJO-2060: Upgrade geoip-api-java library. (Byunghwa Yun via jinho) http://git-wip-us.apache.org/repos/asf/tajo/blob/73a43d8b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java index 97feb65..d4712dc 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java @@ -210,7 +210,62 @@ public class TestHBaseTable extends QueryTestCaseBase { } finally { TablespaceManager.addTableSpaceForTest(existing.get()); } + } + private void putData(HTableInterface htable, int rownum) throws IOException { + for (int i = 0; i < rownum; i++) { + Put put = new Put(String.valueOf(i).getBytes()); + put.add("col1".getBytes(), "a".getBytes(), ("a-" + i).getBytes()); + put.add("col1".getBytes(), "b".getBytes(), ("b-" + i).getBytes()); + put.add("col2".getBytes(), "k1".getBytes(), ("k1-" + i).getBytes()); + put.add("col2".getBytes(), "k2".getBytes(), ("k2-" + i).getBytes()); + put.add("col3".getBytes(), "b".getBytes(), ("b-" + i).getBytes()); + htable.put(put); + } + } + + @Test + public void testGetTableVolume() throws Exception { + final String tableName = "external_hbase_table"; + + Optional<Tablespace> existing = TablespaceManager.removeTablespaceForTest("cluster1"); + assertTrue(existing.isPresent()); + + try { + HTableDescriptor hTableDesc = new HTableDescriptor(TableName.valueOf(tableName)); + hTableDesc.addFamily(new HColumnDescriptor("col1")); + hTableDesc.addFamily(new HColumnDescriptor("col2")); + hTableDesc.addFamily(new HColumnDescriptor("col3")); + testingCluster.getHBaseUtil().createTable(hTableDesc); + + String sql = String.format( + "CREATE EXTERNAL TABLE external_hbase_mapped_table (rk text, col1 text, col2 text, col3 text) " + + "USING hbase WITH ('table'='external_hbase_table', 'columns'=':key,col1:a,col2:,col3:b') " + + "LOCATION '%s/external_hbase_table'", tableSpaceUri); + executeString(sql).close(); + + assertTableExists("external_hbase_mapped_table"); + + HBaseTablespace tablespace = (HBaseTablespace)existing.get(); + HConnection hconn = ((HBaseTablespace)existing.get()).getConnection(); + + try (HTableInterface htable = hconn.getTable(tableName)) { + htable.setAutoFlushTo(true); + putData(htable, 4000); + } + hconn.close(); + + Thread.sleep(3000); // sleep here for up-to-date region server load. It may not be a problem in real cluster. + + TableDesc createdTable = client.getTableDesc("external_hbase_mapped_table"); + assertNotNull(tablespace); + long volume = tablespace.getTableVolume(createdTable, Optional.empty()); + assertTrue(volume > 0 || volume == -1); + executeString("DROP TABLE external_hbase_mapped_table PURGE").close(); + + } finally { + TablespaceManager.addTableSpaceForTest(existing.get()); + } } @Test @@ -236,15 +291,7 @@ public class TestHBaseTable extends QueryTestCaseBase { HConnection hconn = ((HBaseTablespace)existing.get()).getConnection(); try (HTableInterface htable = hconn.getTable("external_hbase_table")) { - for (int i = 0; i < 100; i++) { - Put put = new Put(String.valueOf(i).getBytes()); - put.add("col1".getBytes(), "a".getBytes(), ("a-" + i).getBytes()); - put.add("col1".getBytes(), "b".getBytes(), ("b-" + i).getBytes()); - put.add("col2".getBytes(), "k1".getBytes(), ("k1-" + i).getBytes()); - put.add("col2".getBytes(), "k2".getBytes(), ("k2-" + i).getBytes()); - put.add("col3".getBytes(), "b".getBytes(), ("b-" + i).getBytes()); - htable.put(put); - } + putData(htable, 100); ResultSet res = executeString("select * from external_hbase_mapped_table where rk > '20'"); assertResultSet(res); http://git-wip-us.apache.org/repos/asf/tajo/blob/73a43d8b/tajo-plan/src/main/java/org/apache/tajo/plan/StorageService.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/StorageService.java b/tajo-plan/src/main/java/org/apache/tajo/plan/StorageService.java index 10d11f0..cbb7387 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/StorageService.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/StorageService.java @@ -18,6 +18,7 @@ package org.apache.tajo.plan; +import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.exception.UnsupportedException; import org.apache.tajo.plan.expr.EvalNode; @@ -40,5 +41,5 @@ public interface StorageService { */ URI getTableURI(@Nullable String spaceName, String databaseName, String tableName); - long getTableVolumn(URI uri, Optional<EvalNode> filter) throws UnsupportedException; + long getTableVolumn(TableDesc table, Optional<EvalNode> filter) throws UnsupportedException; } http://git-wip-us.apache.org/repos/asf/tajo/blob/73a43d8b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/TableStatUpdateRewriter.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/TableStatUpdateRewriter.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/TableStatUpdateRewriter.java index 3683f60..238980b 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/TableStatUpdateRewriter.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/TableStatUpdateRewriter.java @@ -110,7 +110,7 @@ public class TableStatUpdateRewriter implements LogicalPlanRewriteRule { private long getTableVolume(TableDesc table, Optional<EvalNode> filter) { try { if (table.getStats() != null) { - return storage.getTableVolumn(table.getUri(), filter); + return storage.getTableVolumn(table, filter); } } catch (UnsupportedException t) { LOG.warn(table.getName() + " does not support Tablespace::getTableVolume()"); http://git-wip-us.apache.org/repos/asf/tajo/blob/73a43d8b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java index 6c97754..00e6d75 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java @@ -99,7 +99,7 @@ public abstract class Tablespace { return name + "=" + uri.toString(); } - public abstract long getTableVolume(URI uri, Optional<EvalNode> filter) throws UnsupportedException; + public abstract long getTableVolume(TableDesc table, Optional<EvalNode> filter) throws UnsupportedException; /** * if {@link StorageProperty#isArbitraryPathAllowed} is true, http://git-wip-us.apache.org/repos/asf/tajo/blob/73a43d8b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java index 12e283f..88410bb 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java @@ -31,6 +31,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; import org.apache.tajo.TajoConstants; import org.apache.tajo.catalog.MetadataProvider; +import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.exception.TajoRuntimeException; import org.apache.tajo.exception.UndefinedTablespaceException; @@ -435,9 +436,9 @@ public class TablespaceManager implements StorageService { } @Override - public long getTableVolumn(URI tableUri, Optional<EvalNode> filter) + public long getTableVolumn(TableDesc table, Optional<EvalNode> filter) throws UnsupportedException { - return get(tableUri).getTableVolume(tableUri, filter); + return get(table.getUri()).getTableVolume(table, filter); } public static Iterable<Tablespace> getAllTablespaces() { http://git-wip-us.apache.org/repos/asf/tajo/blob/73a43d8b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java index 1b81531..132ceff 100644 --- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java +++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java @@ -18,6 +18,7 @@ package org.apache.tajo.storage.hbase; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Sets; import net.minidev.json.JSONObject; import org.apache.commons.logging.Log; @@ -33,7 +34,6 @@ import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; -import org.apache.hadoop.hbase.util.RegionSizeCalculator; import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; import org.apache.hadoop.mapreduce.task.JobContextImpl; @@ -54,7 +54,10 @@ import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRuleContext; import org.apache.tajo.plan.verifier.SyntaxErrorUtil; import org.apache.tajo.storage.*; import org.apache.tajo.storage.fragment.Fragment; -import org.apache.tajo.util.*; +import org.apache.tajo.util.Bytes; +import org.apache.tajo.util.BytesUtils; +import org.apache.tajo.util.KeyValueSet; +import org.apache.tajo.util.Pair; import javax.annotation.Nullable; import java.io.BufferedReader; @@ -101,8 +104,19 @@ public class HBaseTablespace extends Tablespace { } @Override - public long getTableVolume(URI uri, Optional<EvalNode> filter) throws UnsupportedException { - throw new UnsupportedException(); + public long getTableVolume(TableDesc table, Optional<EvalNode> filter) { + long totalVolume; + try { + totalVolume = getRawSplits("", table, filter.orElse(null)).stream() + .map(f -> f.getLength()) + .filter(size -> size > 0) // eliminate unknown sizes (-1) + .reduce(0L, Long::sum); + } catch (TajoException e) { + throw new TajoRuntimeException(e); + } catch (Throwable ioe) { + throw new TajoInternalError(ioe); + } + return totalVolume; } @Override @@ -143,8 +157,8 @@ public class HBaseTablespace extends Tablespace { ColumnMapping columnMapping = new ColumnMapping(schema, tableMeta.getPropertySet()); int numRowKeys = 0; boolean[] isRowKeyMappings = columnMapping.getIsRowKeyMappings(); - for (boolean isRowKeyMapping : isRowKeyMappings) { - if (isRowKeyMapping) { + for (int i = 0; i < isRowKeyMappings.length; i++) { + if (isRowKeyMappings[i]) { numRowKeys++; } } @@ -412,143 +426,165 @@ public class HBaseTablespace extends Tablespace { return new Column[]{indexColumn}; } - @Override - public List<Fragment> getSplits(String inputSourceId, - TableDesc tableDesc, - @Nullable EvalNode filterCondition) - throws IOException, TajoException { + private Pair<List<byte []>, List<byte []>> getSelectedKeyRange( + ColumnMapping columnMap, + List<IndexPredication> predicates) { - ColumnMapping columnMapping = new ColumnMapping(tableDesc.getSchema(), tableDesc.getMeta().getPropertySet()); + final List<byte[]> startRows; + final List<byte[]> stopRows; - List<IndexPredication> indexPredications = getIndexPredications(columnMapping, tableDesc, filterCondition); - HTable htable = null; + if (predicates != null && !predicates.isEmpty()) { + // indexPredications is Disjunctive set + startRows = predicates.stream() + .map(x -> { + if (x.getStartValue() != null) { + return serialize(columnMap, x, x.getStartValue()); + } else { + return HConstants.EMPTY_START_ROW; + } + }) + .collect(Collectors.toList()); - try { - htable = new HTable(hbaseConf, tableDesc.getMeta().getProperty(HBaseStorageConstants.META_TABLE_KEY)); - RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(htable); - - org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> keys = htable.getStartEndKeys(); - if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) { - HRegionLocation regLoc = htable.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false); - if (null == regLoc) { - throw new IOException("Expecting at least one region."); - } - List<Fragment> fragments = new ArrayList<>(1); - HBaseFragment fragment = new HBaseFragment( - tableDesc.getUri(), - inputSourceId, htable.getName().getNameAsString(), - HConstants.EMPTY_BYTE_ARRAY, - HConstants.EMPTY_BYTE_ARRAY, - regLoc.getHostname()); - long regionSize = sizeCalculator.getRegionSize(regLoc.getRegionInfo().getRegionName()); - if (regionSize == 0) { - fragment.setLength(TajoConstants.UNKNOWN_LENGTH); - } else { - fragment.setLength(regionSize); - } - fragments.add(fragment); - return fragments; - } + stopRows = predicates.stream() + .map(x -> { + if (x.getStopValue() != null) { + return serialize(columnMap, x, x.getStopValue()); + } else { + return HConstants.EMPTY_START_ROW; + } + }) + .collect(Collectors.toList()); - final List<byte[]> startRows; - final List<byte[]> stopRows; + } else { + startRows = EMPTY_START_ROW_KEY; + stopRows = EMPTY_END_ROW_KEY; + } - if (indexPredications != null && !indexPredications.isEmpty()) { - // indexPredications is Disjunctive set - startRows = indexPredications.stream() - .map(x -> { - if (x.getStartValue() != null) { - return serialize(columnMapping, x, x.getStartValue()); - } else { - return HConstants.EMPTY_START_ROW; - } - }) - .collect(Collectors.toList()); - - stopRows = indexPredications.stream() - .map(x -> { - if (x.getStopValue() != null) { - return serialize(columnMapping, x, x.getStopValue()); - } else { - return HConstants.EMPTY_START_ROW; - } - }) - .collect(Collectors.toList()); + return new Pair(startRows, stopRows); + } - } else { - startRows = EMPTY_START_ROW_KEY; - stopRows = EMPTY_END_ROW_KEY; + private boolean isEmptyRegion(org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> tableRange) { + return tableRange == null || tableRange.getFirst() == null || tableRange.getFirst().length == 0; + } + + private long getRegionSize(RegionSizeCalculator calculator, byte [] regionName) { + long regionSize = calculator.getRegionSize(regionName); + if (regionSize == 0) { + return TajoConstants.UNKNOWN_LENGTH; + } else { + return regionSize; + } + } + + private List<HBaseFragment> createEmptyFragment(TableDesc table, String sourceId, HTable htable, + RegionSizeCalculator sizeCalculator) throws IOException { + HRegionLocation regLoc = htable.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false); + if (null == regLoc) { + throw new IOException("Expecting at least one region."); + } + + HBaseFragment fragment = new HBaseFragment( + table.getUri(), + sourceId, htable.getName().getNameAsString(), + HConstants.EMPTY_BYTE_ARRAY, + HConstants.EMPTY_BYTE_ARRAY, + regLoc.getHostname()); + + fragment.setLength(getRegionSize(sizeCalculator, regLoc.getRegionInfo().getRegionName())); + return ImmutableList.of(fragment); + } + + private Collection<HBaseFragment> convertRangeToFragment( + TableDesc table, String inputSourceId, HTable htable, RegionSizeCalculator sizeCalculator, + org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> tableRange, + Pair<List<byte[]>, List<byte[]>> selectedRange) throws IOException { + + final Map<byte[], HBaseFragment> fragmentMap = new HashMap<>(); + + for (int i = 0; i < tableRange.getFirst().length; i++) { + HRegionLocation location = htable.getRegionLocation(tableRange.getFirst()[i], false); + if (location == null) { + throw new IOException("Can't find the region of the key: " + Bytes.toStringBinary(tableRange.getFirst()[i])); } - // reference: org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(JobContext) - // region startkey -> HBaseFragment - Map<byte[], HBaseFragment> fragmentMap = new HashMap<>(); - for (int i = 0; i < keys.getFirst().length; i++) { - HRegionLocation location = htable.getRegionLocation(keys.getFirst()[i], false); - if (null == location) { - throw new IOException("Can't find the region of the key: " + Bytes.toStringBinary(keys.getFirst()[i])); - } + final byte[] regionStartKey = tableRange.getFirst()[i]; + final byte[] regionStopKey = tableRange.getSecond()[i]; + + int startRowsSize = selectedRange.getFirst().size(); + for (int j = 0; j < startRowsSize; j++) { + byte[] startRow = selectedRange.getFirst().get(j); + byte[] stopRow = selectedRange.getSecond().get(j); + // determine if the given start an stop key fall into the region + if ((startRow.length == 0 || regionStopKey.length == 0 || Bytes.compareTo(startRow, regionStopKey) < 0) + && (stopRow.length == 0 || Bytes.compareTo(stopRow, regionStartKey) > 0)) { + final byte[] fragmentStart = (startRow.length == 0 || Bytes.compareTo(regionStartKey, startRow) >= 0) ? + regionStartKey : startRow; + + final byte[] fragmentStop = (stopRow.length == 0 || Bytes.compareTo(regionStopKey, stopRow) <= 0) && + regionStopKey.length > 0 ? regionStopKey : stopRow; + + if (fragmentMap.containsKey(regionStartKey)) { + final HBaseFragment prevFragment = fragmentMap.get(regionStartKey); + if (Bytes.compareTo(fragmentStart, prevFragment.getStartRow()) < 0) { + prevFragment.setStartRow(fragmentStart); + } + if (Bytes.compareTo(fragmentStop, prevFragment.getStopRow()) > 0) { + prevFragment.setStopRow(fragmentStop); + } + } else { - byte[] regionStartKey = keys.getFirst()[i]; - byte[] regionStopKey = keys.getSecond()[i]; - - int startRowsSize = startRows.size(); - for (int j = 0; j < startRowsSize; j++) { - byte[] startRow = startRows.get(j); - byte[] stopRow = stopRows.get(j); - // determine if the given start an stop key fall into the region - if ((startRow.length == 0 || regionStopKey.length == 0 || Bytes.compareTo(startRow, regionStopKey) < 0) - && (stopRow.length == 0 || Bytes.compareTo(stopRow, regionStartKey) > 0)) { - byte[] fragmentStart = (startRow.length == 0 || Bytes.compareTo(regionStartKey, startRow) >= 0) ? - regionStartKey : startRow; - - byte[] fragmentStop = (stopRow.length == 0 || Bytes.compareTo(regionStopKey, stopRow) <= 0) && - regionStopKey.length > 0 ? regionStopKey : stopRow; - - if (fragmentMap.containsKey(regionStartKey)) { - HBaseFragment prevFragment = fragmentMap.get(regionStartKey); - if (Bytes.compareTo(fragmentStart, prevFragment.getStartRow()) < 0) { - prevFragment.setStartRow(fragmentStart); - } - if (Bytes.compareTo(fragmentStop, prevFragment.getStopRow()) > 0) { - prevFragment.setStopRow(fragmentStop); - } - } else { - byte[] regionName = location.getRegionInfo().getRegionName(); - long regionSize = sizeCalculator.getRegionSize(regionName); - - HBaseFragment fragment = new HBaseFragment(tableDesc.getUri(), - inputSourceId, - htable.getName().getNameAsString(), - fragmentStart, - fragmentStop, - location.getHostname()); - if (regionSize == 0) { - fragment.setLength(TajoConstants.UNKNOWN_LENGTH); - } else { - fragment.setLength(regionSize); - } + final HBaseFragment fragment = new HBaseFragment(table.getUri(), + inputSourceId, + htable.getName().getNameAsString(), + fragmentStart, + fragmentStop, + location.getHostname()); - fragmentMap.put(regionStartKey, fragment); - if (LOG.isDebugEnabled()) { - LOG.debug("getFragments: fragment -> " + i + " -> " + fragment); - } + fragment.setLength(getRegionSize(sizeCalculator, location.getRegionInfo().getRegionName())); + fragmentMap.put(regionStartKey, fragment); + + if (LOG.isDebugEnabled()) { + LOG.debug("getFragments: fragment -> " + i + " -> " + fragment); } } } } + } + + return fragmentMap.values(); + } - List<HBaseFragment> fragments = new ArrayList<>(fragmentMap.values()); + @Override + public List<Fragment> getSplits(String inputSourceId, + TableDesc table, + @Nullable EvalNode filterCondition) throws IOException, TajoException { + return (List<Fragment>) (List) getRawSplits(inputSourceId, table, filterCondition); + } + + private List<HBaseFragment> getRawSplits(String inputSourceId, + TableDesc table, + @Nullable EvalNode filterCondition) throws IOException, TajoException { + final ColumnMapping columnMapping = new ColumnMapping(table.getSchema(), table.getMeta().getPropertySet()); + + try (final HTable htable = new HTable(hbaseConf, table.getMeta().getProperty(HBaseStorageConstants.META_TABLE_KEY))) { + final RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(htable); + final org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> tableRange = htable.getStartEndKeys(); + if (isEmptyRegion(tableRange)) { + return createEmptyFragment(table, inputSourceId, htable, sizeCalculator); + } + + final Pair<List<byte []>, List<byte []>> selectedRange = getSelectedKeyRange( + columnMapping, + getIndexPredications(columnMapping, table, filterCondition)); + + // region startkey -> HBaseFragment + List<HBaseFragment> fragments = new ArrayList<>(convertRangeToFragment(table, inputSourceId, htable, sizeCalculator, tableRange, selectedRange)); Collections.sort(fragments); if (!fragments.isEmpty()) { fragments.get(fragments.size() - 1).setLast(true); } - return (ArrayList<Fragment>) (ArrayList) fragments; - } finally { - if (htable != null) { - htable.close(); - } + return ImmutableList.copyOf(fragments); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/73a43d8b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/RegionSizeCalculator.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/RegionSizeCalculator.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/RegionSizeCalculator.java new file mode 100644 index 0000000..806320d --- /dev/null +++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/RegionSizeCalculator.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.hbase; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ClusterStatus; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.RegionLoad; +import org.apache.hadoop.hbase.ServerLoad; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.RegionLocator; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * This class is borrowed from Hbase, but it is modified in order to recognize + * the mem store size. + * + * Computes size of each region for given table and given column families. + * The value is used by MapReduce for better scheduling. + * */ [email protected] [email protected] +public class RegionSizeCalculator { + + private static final Log LOG = LogFactory.getLog(RegionSizeCalculator.class); + + /** + * Maps each region to its size in bytes. + * */ + private final Map<byte[], Long> sizeMap = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR); + + static final String ENABLE_REGIONSIZECALCULATOR = "hbase.regionsizecalculator.enable"; + + /** + * Computes size of each region for table and given column families. + * + * @deprecated Use {@link #RegionSizeCalculator(RegionLocator, Admin)} instead. + */ + @Deprecated + public RegionSizeCalculator(HTable table) throws IOException { + HBaseAdmin admin = new HBaseAdmin(table.getConfiguration()); + try { + init(table.getRegionLocator(), admin); + } finally { + admin.close(); + } + } + + /** + * Computes size of each region for table and given column families. + * */ + public RegionSizeCalculator(RegionLocator regionLocator, Admin admin) throws IOException { + init(regionLocator, admin); + } + + private void init(RegionLocator regionLocator, Admin admin) + throws IOException { + if (!enabled(admin.getConfiguration())) { + LOG.info("Region size calculation disabled."); + return; + } + + LOG.info("Calculating region sizes for table \"" + regionLocator.getName() + "\"."); + + //get regions for table + List<HRegionLocation> tableRegionInfos = regionLocator.getAllRegionLocations(); + Set<byte[]> tableRegions = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR); + for (HRegionLocation regionInfo : tableRegionInfos) { + tableRegions.add(regionInfo.getRegionInfo().getRegionName()); + } + + ClusterStatus clusterStatus = admin.getClusterStatus(); + Collection<ServerName> servers = clusterStatus.getServers(); + final long megaByte = 1024L * 1024L; + + //iterate all cluster regions, filter regions from our table and compute their size + for (ServerName serverName: servers) { + ServerLoad serverLoad = clusterStatus.getLoad(serverName); + + for (RegionLoad regionLoad: serverLoad.getRegionsLoad().values()) { + byte[] regionId = regionLoad.getName(); + + if (tableRegions.contains(regionId)) { + + long regionSizeBytes = (regionLoad.getStorefileSizeMB() + regionLoad.getMemStoreSizeMB()) * megaByte; + sizeMap.put(regionId, regionSizeBytes); + + if (LOG.isDebugEnabled()) { + LOG.debug("Region " + regionLoad.getNameAsString() + " has size " + regionSizeBytes); + } + } + } + } + LOG.debug("Region sizes calculated"); + } + + boolean enabled(Configuration configuration) { + return configuration.getBoolean(ENABLE_REGIONSIZECALCULATOR, true); + } + + /** + * Returns size of given region in bytes. Returns 0 if region was not found. + * */ + public long getRegionSize(byte[] regionId) { + Long size = sizeMap.get(regionId); + if (size == null) { + LOG.debug("Unknown region:" + Arrays.toString(regionId)); + return 0; + } else { + return size; + } + } + + public Map<byte[], Long> getRegionSizeMap() { + return Collections.unmodifiableMap(sizeMap); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/73a43d8b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java index e50d587..35504af 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java @@ -125,8 +125,8 @@ public class FileTablespace extends Tablespace { } @Override - public long getTableVolume(URI uri, Optional<EvalNode> filter) throws UnsupportedException { - Path path = new Path(uri); + public long getTableVolume(TableDesc table, Optional<EvalNode> filter) throws UnsupportedException { + Path path = new Path(table.getUri()); ContentSummary summary; try { summary = fs.getContentSummary(path); http://git-wip-us.apache.org/repos/asf/tajo/blob/73a43d8b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java index 1f7f299..fa6cf48 100644 --- a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java +++ b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java @@ -110,7 +110,7 @@ public abstract class JdbcTablespace extends Tablespace { } @Override - public long getTableVolume(URI uri, Optional<EvalNode> filter) throws UnsupportedException { + public long getTableVolume(TableDesc table, Optional<EvalNode> filter) throws UnsupportedException { throw new UnsupportedException(); }
