Repository: tajo Updated Branches: refs/heads/branch-0.11.1 39f7b4747 -> 080507523
TAJO-1940: Implement HBaseTablespace::getTableVolume() method. Closes #910 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/08050752 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/08050752 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/08050752 Branch: refs/heads/branch-0.11.1 Commit: 08050752369ad0203444e5de1b3604f2b12c8921 Parents: 39f7b47 Author: Hyunsik Choi <[email protected]> Authored: Wed Jan 27 14:18:21 2016 -0800 Committer: Hyunsik Choi <[email protected]> Committed: Thu Jan 28 11:53:20 2016 -0800 ---------------------------------------------------------------------- CHANGES | 3 + .../tajo/engine/query/TestHBaseTable.java | 107 +++++-- .../org/apache/tajo/plan/StorageService.java | 3 +- .../plan/rewrite/TableStatUpdateRewriter.java | 2 +- .../org/apache/tajo/storage/Tablespace.java | 2 +- .../apache/tajo/storage/TablespaceManager.java | 5 +- .../tajo/storage/hbase/HBaseTablespace.java | 300 ++++++++++--------- .../storage/hbase/RegionSizeCalculator.java | 150 ++++++++++ .../org/apache/tajo/storage/FileTablespace.java | 4 +- .../tajo/storage/jdbc/JdbcTablespace.java | 2 +- 10 files changed, 404 insertions(+), 174 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/08050752/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 008b987..62c0504 100644 --- a/CHANGES +++ b/CHANGES @@ -7,6 +7,9 @@ Release 0.11.1 - unreleased IMPROVEMENT + TAJO-1940: Implement HBaseTablespace::getTableVolume() method. + (hyunsik) + TAJO-1991: Tablespace::getVolume should take filter predication. 
(hyunsik) http://git-wip-us.apache.org/repos/asf/tajo/blob/08050752/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java index 7454927..ffa48aa 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java @@ -54,7 +54,6 @@ import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.storage.hbase.*; import org.apache.tajo.util.Bytes; import org.apache.tajo.util.KeyValueSet; -import org.apache.tajo.util.TUtil; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -65,10 +64,7 @@ import java.net.InetAddress; import java.net.URI; import java.sql.ResultSet; import java.text.DecimalFormat; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import static org.junit.Assert.*; @@ -221,7 +217,62 @@ public class TestHBaseTable extends QueryTestCaseBase { } finally { TablespaceManager.addTableSpaceForTest(existing.get()); } + } + private void putData(HTableInterface htable, int rownum) throws IOException { + for (int i = 0; i < rownum; i++) { + Put put = new Put(String.valueOf(i).getBytes()); + put.add("col1".getBytes(), "a".getBytes(), ("a-" + i).getBytes()); + put.add("col1".getBytes(), "b".getBytes(), ("b-" + i).getBytes()); + put.add("col2".getBytes(), "k1".getBytes(), ("k1-" + i).getBytes()); + put.add("col2".getBytes(), "k2".getBytes(), ("k2-" + i).getBytes()); + put.add("col3".getBytes(), "b".getBytes(), ("b-" + i).getBytes()); + htable.put(put); + } + } + + @Test + public void testGetTableVolume() throws Exception { + final String tableName = "external_hbase_table"; + + 
Optional<Tablespace> existing = TablespaceManager.removeTablespaceForTest("cluster1"); + assertTrue(existing.isPresent()); + + try { + HTableDescriptor hTableDesc = new HTableDescriptor(TableName.valueOf(tableName)); + hTableDesc.addFamily(new HColumnDescriptor("col1")); + hTableDesc.addFamily(new HColumnDescriptor("col2")); + hTableDesc.addFamily(new HColumnDescriptor("col3")); + testingCluster.getHBaseUtil().createTable(hTableDesc); + + String sql = String.format( + "CREATE EXTERNAL TABLE external_hbase_mapped_table (rk text, col1 text, col2 text, col3 text) " + + "USING hbase WITH ('table'='external_hbase_table', 'columns'=':key,col1:a,col2:,col3:b') " + + "LOCATION '%s/external_hbase_table'", tableSpaceUri); + executeString(sql).close(); + + assertTableExists("external_hbase_mapped_table"); + + HBaseTablespace tablespace = (HBaseTablespace)existing.get(); + HConnection hconn = ((HBaseTablespace)existing.get()).getConnection(); + + try (HTableInterface htable = hconn.getTable(tableName)) { + htable.setAutoFlushTo(true); + putData(htable, 4000); + } + hconn.close(); + + Thread.sleep(3000); // sleep here for up-to-date region server load. It may not be a problem in real cluster. 
+ + TableDesc createdTable = client.getTableDesc("external_hbase_mapped_table"); + assertNotNull(tablespace); + long volume = tablespace.getTableVolume(createdTable, Optional.<EvalNode>absent()); + assertTrue(volume > 0 || volume == -1); + executeString("DROP TABLE external_hbase_mapped_table PURGE").close(); + + } finally { + TablespaceManager.addTableSpaceForTest(existing.get()); + } } @Test @@ -248,22 +299,14 @@ public class TestHBaseTable extends QueryTestCaseBase { HTableInterface htable = hconn.getTable("external_hbase_table"); try { - for (int i = 0; i < 100; i++) { - Put put = new Put(String.valueOf(i).getBytes()); - put.add("col1".getBytes(), "a".getBytes(), ("a-" + i).getBytes()); - put.add("col1".getBytes(), "b".getBytes(), ("b-" + i).getBytes()); - put.add("col2".getBytes(), "k1".getBytes(), ("k1-" + i).getBytes()); - put.add("col2".getBytes(), "k2".getBytes(), ("k2-" + i).getBytes()); - put.add("col3".getBytes(), "b".getBytes(), ("b-" + i).getBytes()); - htable.put(put); - } - + putData(htable, 100); ResultSet res = executeString("select * from external_hbase_mapped_table where rk > '20'"); assertResultSet(res); cleanupQuery(res); } finally { executeString("DROP TABLE external_hbase_mapped_table PURGE").close(); htable.close(); + hconn.close(); } } finally { TablespaceManager.addTableSpaceForTest(existing.get()); @@ -793,13 +836,13 @@ public class TestHBaseTable extends QueryTestCaseBase { Schema schema = new Schema(); schema.addColumn("id", Type.TEXT); schema.addColumn("name", Type.TEXT); - List<String> datas = new ArrayList<String>(); + List<String> datas = new ArrayList<>(); DecimalFormat df = new DecimalFormat("000"); for (int i = 99; i >= 0; i--) { datas.add(df.format(i) + "|value" + i); } TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table", - schema, tableOptions, datas.toArray(new String[]{}), 2); + schema, tableOptions, datas.toArray(new String[datas.size()]), 2); executeString("insert into hbase_mapped_table " + "select id, 
name from base_table ").close(); @@ -849,12 +892,12 @@ public class TestHBaseTable extends QueryTestCaseBase { Schema schema = new Schema(); schema.addColumn("id", Type.TEXT); schema.addColumn("name", Type.TEXT); - List<String> datas = new ArrayList<String>(); + List<String> datas = new ArrayList<>(); for (int i = 99; i >= 0; i--) { datas.add(i + "|value" + i); } TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table", - schema, tableOptions, datas.toArray(new String[]{}), 2); + schema, tableOptions, datas.toArray(new String[datas.size()]), 2); executeString("insert into hbase_mapped_table " + "select id, name from base_table ").close(); @@ -907,13 +950,13 @@ public class TestHBaseTable extends QueryTestCaseBase { Schema schema = new Schema(); schema.addColumn("id", Type.TEXT); schema.addColumn("name", Type.TEXT); - List<String> datas = new ArrayList<String>(); + List<String> datas = new ArrayList<>(); DecimalFormat df = new DecimalFormat("000"); for (int i = 99; i >= 0; i--) { datas.add(df.format(i) + "|value" + i); } TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table", - schema, tableOptions, datas.toArray(new String[]{}), 2); + schema, tableOptions, datas.toArray(new String[datas.size()]), 2); executeString("insert into hbase_mapped_table " + "select id, name from base_table ").close(); @@ -967,12 +1010,12 @@ public class TestHBaseTable extends QueryTestCaseBase { schema.addColumn("id2", Type.TEXT); schema.addColumn("name", Type.TEXT); DecimalFormat df = new DecimalFormat("000"); - List<String> datas = new ArrayList<String>(); + List<String> datas = new ArrayList<>(); for (int i = 99; i >= 0; i--) { datas.add(df.format(i) + "|" + (i + 100) + "|value" + i); } TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table", - schema, tableOptions, datas.toArray(new String[]{}), 2); + schema, tableOptions, datas.toArray(new String[datas.size()]), 2); executeString("insert into hbase_mapped_table " + "select id1, id2, name from 
base_table ").close(); @@ -1022,12 +1065,12 @@ public class TestHBaseTable extends QueryTestCaseBase { Schema schema = new Schema(); schema.addColumn("id", Type.INT4); schema.addColumn("name", Type.TEXT); - List<String> datas = new ArrayList<String>(); + List<String> datas = new ArrayList<>(); for (int i = 99; i >= 0; i--) { datas.add(i + "|value" + i); } TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table", - schema, tableOptions, datas.toArray(new String[]{}), 2); + schema, tableOptions, datas.toArray(new String[datas.size()]), 2); executeString("insert into hbase_mapped_table " + "select id, name from base_table ").close(); @@ -1080,14 +1123,14 @@ public class TestHBaseTable extends QueryTestCaseBase { schema.addColumn("col2_key", Type.TEXT); schema.addColumn("col2_value", Type.TEXT); schema.addColumn("col3", Type.TEXT); - List<String> datas = new ArrayList<String>(); + List<String> datas = new ArrayList<>(); for (int i = 20; i >= 0; i--) { for (int j = 0; j < 3; j++) { datas.add(i + "|ck-" + j + "|value-" + j + "|col3-" + i); } } TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table", - schema, tableOptions, datas.toArray(new String[]{}), 2); + schema, tableOptions, datas.toArray(new String[datas.size()]), 2); executeString("insert into hbase_mapped_table " + "select rk, col2_key, col2_value, col3 from base_table ").close(); @@ -1167,12 +1210,12 @@ public class TestHBaseTable extends QueryTestCaseBase { Schema schema = new Schema(); schema.addColumn("id", Type.INT4); schema.addColumn("name", Type.TEXT); - List<String> datas = new ArrayList<String>(); + List<String> datas = new ArrayList<>(); for (int i = 99; i >= 0; i--) { datas.add(i + "|value" + i); } TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table", - schema, tableOptions, datas.toArray(new String[]{}), 2); + schema, tableOptions, datas.toArray(new String[datas.size()]), 2); try { executeString("insert into hbase_mapped_table " + @@ -1245,7 +1288,7 @@ public 
class TestHBaseTable extends QueryTestCaseBase { datas.add(df.format(i) + "|value" + i); } TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table", - schema, tableOptions, datas.toArray(new String[]{}), 2); + schema, tableOptions, datas.toArray(new String[datas.size()]), 2); executeString( "CREATE TABLE hbase_mapped_table (rk text, col1 text) TABLESPACE cluster1 " + @@ -1330,7 +1373,7 @@ public class TestHBaseTable extends QueryTestCaseBase { } finally { executeString("DROP TABLE hbase_mapped_table PURGE").close(); - client.unsetSessionVariables(TUtil.newList(HBaseStorageConstants.INSERT_PUT_MODE)); + client.unsetSessionVariables(Arrays.asList(HBaseStorageConstants.INSERT_PUT_MODE)); if (scanner != null) { scanner.close(); @@ -1367,7 +1410,7 @@ public class TestHBaseTable extends QueryTestCaseBase { datas.add(df.format(i) + "|value" + i + "|comment-" + i); } TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table", - schema, tableOptions, datas.toArray(new String[]{}), 2); + schema, tableOptions, datas.toArray(new String[datas.size()]), 2); executeString("insert into location '/tmp/hfile_test' " + "select id, name, comment from base_table ").close(); http://git-wip-us.apache.org/repos/asf/tajo/blob/08050752/tajo-plan/src/main/java/org/apache/tajo/plan/StorageService.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/StorageService.java b/tajo-plan/src/main/java/org/apache/tajo/plan/StorageService.java index f27aeff..3720dca 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/StorageService.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/StorageService.java @@ -19,6 +19,7 @@ package org.apache.tajo.plan; import com.google.common.base.Optional; +import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.exception.UnsupportedException; import org.apache.tajo.plan.expr.EvalNode; @@ -40,5 +41,5 @@ public interface StorageService { */ URI 
getTableURI(@Nullable String spaceName, String databaseName, String tableName); - long getTableVolumn(URI uri, Optional<EvalNode> filter) throws UnsupportedException; + long getTableVolumn(TableDesc table, Optional<EvalNode> filter) throws UnsupportedException; } http://git-wip-us.apache.org/repos/asf/tajo/blob/08050752/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/TableStatUpdateRewriter.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/TableStatUpdateRewriter.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/TableStatUpdateRewriter.java index a1e9a6d..df65482 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/TableStatUpdateRewriter.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/TableStatUpdateRewriter.java @@ -110,7 +110,7 @@ public class TableStatUpdateRewriter implements LogicalPlanRewriteRule { private long getTableVolume(TableDesc table, Optional<EvalNode> filter) { try { if (table.getStats() != null) { - return storage.getTableVolumn(table.getUri(), filter); + return storage.getTableVolumn(table, filter); } } catch (UnsupportedException t) { LOG.warn(table.getName() + " does not support Tablespace::getTableVolume()"); http://git-wip-us.apache.org/repos/asf/tajo/blob/08050752/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java index 01ef9de..8919d87 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java @@ -99,7 +99,7 @@ public abstract class Tablespace { return name + "=" + uri.toString(); } - 
public abstract long getTableVolume(URI uri, Optional<EvalNode> filter) throws UnsupportedException; + public abstract long getTableVolume(TableDesc table, Optional<EvalNode> filter) throws UnsupportedException; /** * if {@link StorageProperty#isArbitraryPathAllowed} is true, http://git-wip-us.apache.org/repos/asf/tajo/blob/08050752/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java index 33cd7a3..798de1d 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java @@ -32,6 +32,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; import org.apache.tajo.TajoConstants; import org.apache.tajo.catalog.MetadataProvider; +import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.exception.TajoRuntimeException; import org.apache.tajo.exception.UndefinedTablespaceException; @@ -439,9 +440,9 @@ public class TablespaceManager implements StorageService { } @Override - public long getTableVolumn(URI tableUri, Optional<EvalNode> filter) + public long getTableVolumn(TableDesc table, Optional<EvalNode> filter) throws UnsupportedException { - return get(tableUri).getTableVolume(tableUri, filter); + return get(table.getUri()).getTableVolume(table, filter); } public static Iterable<Tablespace> getAllTablespaces() { http://git-wip-us.apache.org/repos/asf/tajo/blob/08050752/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java ---------------------------------------------------------------------- diff --git 
a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java index 8aa51e6..d522fe2 100644 --- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java +++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java @@ -19,6 +19,7 @@ package org.apache.tajo.storage.hbase; import com.google.common.base.Optional; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Sets; import net.minidev.json.JSONObject; import org.apache.commons.logging.Log; @@ -34,7 +35,6 @@ import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; -import org.apache.hadoop.hbase.util.RegionSizeCalculator; import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; import org.apache.hadoop.mapreduce.task.JobContextImpl; @@ -74,6 +74,8 @@ public class HBaseTablespace extends Tablespace { new StorageProperty("hbase", false, true, false, false); public static final FormatProperty HFILE_FORMAT_PROPERTIES = new FormatProperty(true, false, true); public static final FormatProperty PUT_MODE_PROPERTIES = new FormatProperty(true, true, false); + public static final List<byte []> EMPTY_START_ROW_KEY = Arrays.asList(new byte [0]); + public static final List<byte []> EMPTY_END_ROW_KEY = Arrays.asList(new byte [0]); private Configuration hbaseConf; @@ -99,8 +101,20 @@ public class HBaseTablespace extends Tablespace { } @Override - public long getTableVolume(URI uri, Optional<EvalNode> filter) throws UnsupportedException { - throw new UnsupportedException(); + public long getTableVolume(TableDesc table, Optional<EvalNode> filter) { + long totalVolume = 0; + try { + for 
(HBaseFragment f : getRawSplits("", table, filter.orNull())) { + if (f.getLength() > 0) { + totalVolume += f.getLength(); + } + } + } catch (TajoException e) { + throw new TajoRuntimeException(e); + } catch (Throwable ioe) { + throw new TajoInternalError(ioe); + } + return totalVolume; } @Override @@ -141,8 +155,8 @@ public class HBaseTablespace extends Tablespace { ColumnMapping columnMapping = new ColumnMapping(schema, tableMeta.getOptions()); int numRowKeys = 0; boolean[] isRowKeyMappings = columnMapping.getIsRowKeyMappings(); - for (int i = 0; i < isRowKeyMappings.length; i++) { - if (isRowKeyMappings[i]) { + for (boolean isRowKeyMapping : isRowKeyMappings) { + if (isRowKeyMapping) { numRowKeys++; } } @@ -180,7 +194,7 @@ public class HBaseTablespace extends Tablespace { tableColumnFamilies.add(eachColumn.getNameAsString()); } - Collection<String> mappingColumnFamilies =columnMapping.getColumnFamilyNames(); + Collection<String> mappingColumnFamilies = columnMapping.getColumnFamilyNames(); if (mappingColumnFamilies.isEmpty()) { throw new MissingTablePropertyException(HBaseStorageConstants.META_COLUMNS_KEY, hbaseTableName); } @@ -272,13 +286,15 @@ public class HBaseTablespace extends Tablespace { // If there is many split keys, Tajo allows to define in the file. 
Path path = new Path(splitRowKeysFile); FileSystem fs = path.getFileSystem(conf); + if (!fs.exists(path)) { throw new MissingTablePropertyException("hbase.split.rowkeys.file=" + path.toString() + " not exists.", hbaseTableName); } - SortedSet<String> splitKeySet = new TreeSet<String>(); + SortedSet<String> splitKeySet = new TreeSet<>(); BufferedReader reader = null; + try { reader = new BufferedReader(new InputStreamReader(fs.open(path))); String line = null; @@ -415,147 +431,166 @@ public class HBaseTablespace extends Tablespace { return new Column[]{indexColumn}; } - @Override - public List<Fragment> getSplits(String inputSourceId, - TableDesc tableDesc, - @Nullable EvalNode filterCondition) - throws IOException, TajoException { - - ColumnMapping columnMapping = new ColumnMapping(tableDesc.getSchema(), tableDesc.getMeta().getOptions()); - - List<IndexPredication> indexPredications = getIndexPredications(columnMapping, tableDesc, filterCondition); - HTable htable = null; + private Pair<List<byte []>, List<byte []>> getSelectedKeyRange( + ColumnMapping columnMap, + List<IndexPredication> predicates) { - try { + final List<byte[]> startRows; + final List<byte[]> stopRows; - htable = new HTable(hbaseConf, tableDesc.getMeta().getOption(HBaseStorageConstants.META_TABLE_KEY)); - RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(htable); + if (predicates != null && !predicates.isEmpty()) { + startRows = new ArrayList<>(); + stopRows = new ArrayList<>(); - org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> keys = htable.getStartEndKeys(); - if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) { - HRegionLocation regLoc = htable.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false); - if (null == regLoc) { - throw new IOException("Expecting at least one region."); + // indexPredications is Disjunctive set + for (IndexPredication pred: predicates) { + if (pred.getStartValue() != null) { + startRows.add(serialize(columnMap, pred, 
pred.getStartValue())); + } else { + startRows.add(HConstants.EMPTY_START_ROW); } - List<Fragment> fragments = new ArrayList<Fragment>(1); - HBaseFragment fragment = new HBaseFragment( - tableDesc.getUri(), - inputSourceId, htable.getName().getNameAsString(), - HConstants.EMPTY_BYTE_ARRAY, - HConstants.EMPTY_BYTE_ARRAY, - regLoc.getHostname()); - long regionSize = sizeCalculator.getRegionSize(regLoc.getRegionInfo().getRegionName()); - if (regionSize == 0) { - fragment.setLength(TajoConstants.UNKNOWN_LENGTH); + if (pred.getStopValue() != null) { + stopRows.add(serialize(columnMap, pred, pred.getStopValue())); } else { - fragment.setLength(regionSize); + stopRows.add(HConstants.EMPTY_START_ROW); } - fragments.add(fragment); - return fragments; } + } else { + startRows = EMPTY_START_ROW_KEY; + stopRows = EMPTY_END_ROW_KEY; + } - List<byte[]> startRows; - List<byte[]> stopRows; - - if (indexPredications != null && !indexPredications.isEmpty()) { - // indexPredications is Disjunctive set - startRows = new ArrayList<byte[]>(); - stopRows = new ArrayList<byte[]>(); - for (IndexPredication indexPredication: indexPredications) { - byte[] startRow; - byte[] stopRow; - if (indexPredication.getStartValue() != null) { - startRow = serialize(columnMapping, indexPredication, indexPredication.getStartValue()); - } else { - startRow = HConstants.EMPTY_START_ROW; - } - if (indexPredication.getStopValue() != null) { - stopRow = serialize(columnMapping, indexPredication, indexPredication.getStopValue()); - } else { - stopRow = HConstants.EMPTY_END_ROW; - } - startRows.add(startRow); - stopRows.add(stopRow); - } - } else { - startRows = TUtil.newList(HConstants.EMPTY_START_ROW); - stopRows = TUtil.newList(HConstants.EMPTY_END_ROW); + return new Pair(startRows, stopRows); + } + + private boolean isEmptyRegion(org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> tableRange) { + return tableRange == null || tableRange.getFirst() == null || tableRange.getFirst().length == 0; + } + + 
private long getRegionSize(RegionSizeCalculator calculator, byte [] regionName) { + long regionSize = calculator.getRegionSize(regionName); + if (regionSize == 0) { + return TajoConstants.UNKNOWN_LENGTH; + } else { + return regionSize; + } + } + + private List<HBaseFragment> createEmptyFragment(TableDesc table, String sourceId, HTable htable, + RegionSizeCalculator sizeCalculator) throws IOException { + HRegionLocation regLoc = htable.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false); + if (null == regLoc) { + throw new IOException("Expecting at least one region."); + } + + HBaseFragment fragment = new HBaseFragment( + table.getUri(), + sourceId, htable.getName().getNameAsString(), + HConstants.EMPTY_BYTE_ARRAY, + HConstants.EMPTY_BYTE_ARRAY, + regLoc.getHostname()); + + fragment.setLength(getRegionSize(sizeCalculator, regLoc.getRegionInfo().getRegionName())); + return ImmutableList.of(fragment); + } + + private Collection<HBaseFragment> convertRangeToFragment( + TableDesc table, String inputSourceId, HTable htable, RegionSizeCalculator sizeCalculator, + org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> tableRange, + Pair<List<byte[]>, List<byte[]>> selectedRange) throws IOException { + + final Map<byte[], HBaseFragment> fragmentMap = new HashMap<>(); + + for (int i = 0; i < tableRange.getFirst().length; i++) { + HRegionLocation location = htable.getRegionLocation(tableRange.getFirst()[i], false); + if (location == null) { + throw new IOException("Can't find the region of the key: " + Bytes.toStringBinary(tableRange.getFirst()[i])); } - // reference: org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(JobContext) - // region startkey -> HBaseFragment - Map<byte[], HBaseFragment> fragmentMap = new HashMap<byte[], HBaseFragment>(); - for (int i = 0; i < keys.getFirst().length; i++) { - HRegionLocation location = htable.getRegionLocation(keys.getFirst()[i], false); - if (null == location) { - throw new IOException("Can't find the region of the 
key: " + Bytes.toStringBinary(keys.getFirst()[i])); - } + final byte[] regionStartKey = tableRange.getFirst()[i]; + final byte[] regionStopKey = tableRange.getSecond()[i]; + + int startRowsSize = selectedRange.getFirst().size(); + for (int j = 0; j < startRowsSize; j++) { + byte[] startRow = selectedRange.getFirst().get(j); + byte[] stopRow = selectedRange.getSecond().get(j); + // determine if the given start an stop key fall into the region + if ((startRow.length == 0 || regionStopKey.length == 0 || Bytes.compareTo(startRow, regionStopKey) < 0) + && (stopRow.length == 0 || Bytes.compareTo(stopRow, regionStartKey) > 0)) { + final byte[] fragmentStart = (startRow.length == 0 || Bytes.compareTo(regionStartKey, startRow) >= 0) ? + regionStartKey : startRow; + + final byte[] fragmentStop = (stopRow.length == 0 || Bytes.compareTo(regionStopKey, stopRow) <= 0) && + regionStopKey.length > 0 ? regionStopKey : stopRow; + + if (fragmentMap.containsKey(regionStartKey)) { + final HBaseFragment prevFragment = fragmentMap.get(regionStartKey); + if (Bytes.compareTo(fragmentStart, prevFragment.getStartRow()) < 0) { + prevFragment.setStartRow(fragmentStart); + } + if (Bytes.compareTo(fragmentStop, prevFragment.getStopRow()) > 0) { + prevFragment.setStopRow(fragmentStop); + } + } else { - byte[] regionStartKey = keys.getFirst()[i]; - byte[] regionStopKey = keys.getSecond()[i]; - - int startRowsSize = startRows.size(); - for (int j = 0; j < startRowsSize; j++) { - byte[] startRow = startRows.get(j); - byte[] stopRow = stopRows.get(j); - // determine if the given start an stop key fall into the region - if ((startRow.length == 0 || regionStopKey.length == 0 || Bytes.compareTo(startRow, regionStopKey) < 0) - && (stopRow.length == 0 || Bytes.compareTo(stopRow, regionStartKey) > 0)) { - byte[] fragmentStart = (startRow.length == 0 || Bytes.compareTo(regionStartKey, startRow) >= 0) ? 
- regionStartKey : startRow; - - byte[] fragmentStop = (stopRow.length == 0 || Bytes.compareTo(regionStopKey, stopRow) <= 0) && - regionStopKey.length > 0 ? regionStopKey : stopRow; - - if (fragmentMap.containsKey(regionStartKey)) { - HBaseFragment prevFragment = fragmentMap.get(regionStartKey); - if (Bytes.compareTo(fragmentStart, prevFragment.getStartRow()) < 0) { - prevFragment.setStartRow(fragmentStart); - } - if (Bytes.compareTo(fragmentStop, prevFragment.getStopRow()) > 0) { - prevFragment.setStopRow(fragmentStop); - } - } else { - byte[] regionName = location.getRegionInfo().getRegionName(); - long regionSize = sizeCalculator.getRegionSize(regionName); - - HBaseFragment fragment = new HBaseFragment(tableDesc.getUri(), - inputSourceId, - htable.getName().getNameAsString(), - fragmentStart, - fragmentStop, - location.getHostname()); - if (regionSize == 0) { - fragment.setLength(TajoConstants.UNKNOWN_LENGTH); - } else { - fragment.setLength(regionSize); - } + final HBaseFragment fragment = new HBaseFragment(table.getUri(), + inputSourceId, + htable.getName().getNameAsString(), + fragmentStart, + fragmentStop, + location.getHostname()); - fragmentMap.put(regionStartKey, fragment); - if (LOG.isDebugEnabled()) { - LOG.debug("getFragments: fragment -> " + i + " -> " + fragment); - } + fragment.setLength(getRegionSize(sizeCalculator, location.getRegionInfo().getRegionName())); + fragmentMap.put(regionStartKey, fragment); + + if (LOG.isDebugEnabled()) { + LOG.debug("getFragments: fragment -> " + i + " -> " + fragment); } } } } + } + + return fragmentMap.values(); + } + + @Override + public List<Fragment> getSplits(String inputSourceId, + TableDesc table, + @Nullable EvalNode filterCondition) throws IOException, TajoException { + return (List<Fragment>) (List) getRawSplits(inputSourceId, table, filterCondition); + } + + private List<HBaseFragment> getRawSplits(String inputSourceId, + TableDesc table, + @Nullable EvalNode filterCondition) throws IOException, 
TajoException { + final ColumnMapping columnMapping = new ColumnMapping(table.getSchema(), table.getMeta().getOptions()); - List<HBaseFragment> fragments = new ArrayList<HBaseFragment>(fragmentMap.values()); + try (final HTable htable = new HTable(hbaseConf, table.getMeta().getOption(HBaseStorageConstants.META_TABLE_KEY))) { + final RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(htable); + final org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> tableRange = htable.getStartEndKeys(); + if (isEmptyRegion(tableRange)) { + return createEmptyFragment(table, inputSourceId, htable, sizeCalculator); + } + + final Pair<List<byte []>, List<byte []>> selectedRange = getSelectedKeyRange( + columnMapping, + getIndexPredications(columnMapping, table, filterCondition)); + + // region startkey -> HBaseFragment + List<HBaseFragment> fragments = new ArrayList<>(convertRangeToFragment(table, inputSourceId, htable, sizeCalculator, tableRange, selectedRange)); Collections.sort(fragments); if (!fragments.isEmpty()) { fragments.get(fragments.size() - 1).setLast(true); } - return (ArrayList<Fragment>) (ArrayList) fragments; - } finally { - if (htable != null) { - htable.close(); - } + + return ImmutableList.copyOf(fragments); } } private byte[] serialize(ColumnMapping columnMapping, - IndexPredication indexPredication, Datum datum) throws IOException { + IndexPredication indexPredication, Datum datum) { if (columnMapping.getIsBinaryColumns()[indexPredication.getColumnId()]) { return HBaseBinarySerializerDeserializer.serialize(indexPredication.getColumn(), datum); } else { @@ -611,7 +646,7 @@ public class HBaseTablespace extends Tablespace { private String username; HConnectionKey(Configuration conf) { - Map<String, String> m = new HashMap<String, String>(); + Map<String, String> m = new HashMap<>(); if (conf != null) { for (String property : CONNECTION_PROPERTIES) { String value = conf.get(property); @@ -689,10 +724,7 @@ public class HBaseTablespace extends Tablespace 
{ @Override public String toString() { - return "HConnectionKey{" + - "properties=" + properties + - ", username='" + username + '\'' + - '}'; + return "HConnectionKey{ properties=" + properties + ", username='" + username + '\'' + '}'; } } @@ -724,7 +756,7 @@ public class HBaseTablespace extends Tablespace { public List<Set<EvalNode>> findIndexablePredicateSet(@Nullable EvalNode qual, Column[] indexableColumns) throws IOException { - List<Set<EvalNode>> indexablePredicateList = new ArrayList<Set<EvalNode>>(); + List<Set<EvalNode>> indexablePredicateList = new ArrayList<>(); // if a query statement has a search condition, try to find indexable predicates if (indexableColumns != null && qual != null) { @@ -874,7 +906,7 @@ public class HBaseTablespace extends Tablespace { new String(new char[]{columnMapping.getRowKeyDelimiter(), Character.MAX_VALUE})); } if (startDatum != null || endDatum != null) { - return new Pair<Datum, Datum>(startDatum, endDatum); + return new Pair<>(startDatum, endDatum); } else { return null; } @@ -944,7 +976,7 @@ public class HBaseTablespace extends Tablespace { if (endKeys.length == 1) { return new TupleRange[]{dataRange}; } - List<TupleRange> tupleRanges = new ArrayList<TupleRange>(endKeys.length); + List<TupleRange> tupleRanges = new ArrayList<>(endKeys.length); TupleComparator comparator = new BaseTupleComparator(inputSchema, sortSpecs); Tuple previousTuple = dataRange.getStart(); @@ -976,12 +1008,12 @@ public class HBaseTablespace extends Tablespace { for (int i = 0; i < sortSpecs.length; i++) { if (columnMapping.getIsBinaryColumns()[sortKeyIndexes[i]]) { endTuple.put(i, - HBaseBinarySerializerDeserializer.deserialize(inputSchema.getColumn(sortKeyIndexes[i]), - rowKeyFields[i])); + HBaseBinarySerializerDeserializer.deserialize(inputSchema.getColumn(sortKeyIndexes[i]), + rowKeyFields[i])); } else { endTuple.put(i, - HBaseTextSerializerDeserializer.deserialize(inputSchema.getColumn(sortKeyIndexes[i]), - rowKeyFields[i])); + 
HBaseTextSerializerDeserializer.deserialize(inputSchema.getColumn(sortKeyIndexes[i]), + rowKeyFields[i])); } } tupleRanges.add(new TupleRange(sortSpecs, previousTuple, endTuple)); http://git-wip-us.apache.org/repos/asf/tajo/blob/08050752/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/RegionSizeCalculator.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/RegionSizeCalculator.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/RegionSizeCalculator.java new file mode 100644 index 0000000..806320d --- /dev/null +++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/RegionSizeCalculator.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage.hbase; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ClusterStatus; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.RegionLoad; +import org.apache.hadoop.hbase.ServerLoad; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.RegionLocator; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * This class is borrowed from Hbase, but it is modified in order to recognize + * the mem store size. + * + * Computes size of each region for given table and given column families. + * The value is used by MapReduce for better scheduling. + * */ +@InterfaceStability.Evolving +@InterfaceAudience.Private +public class RegionSizeCalculator { + + private static final Log LOG = LogFactory.getLog(RegionSizeCalculator.class); + + /** + * Maps each region to its size in bytes. + * */ + private final Map<byte[], Long> sizeMap = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR); + + static final String ENABLE_REGIONSIZECALCULATOR = "hbase.regionsizecalculator.enable"; + + /** + * Computes size of each region for table and given column families. + * + * @deprecated Use {@link #RegionSizeCalculator(RegionLocator, Admin)} instead. 
+ */ + @Deprecated + public RegionSizeCalculator(HTable table) throws IOException { + HBaseAdmin admin = new HBaseAdmin(table.getConfiguration()); + try { + init(table.getRegionLocator(), admin); + } finally { + admin.close(); + } + } + + /** + * Computes size of each region for table and given column families. + * */ + public RegionSizeCalculator(RegionLocator regionLocator, Admin admin) throws IOException { + init(regionLocator, admin); + } + + private void init(RegionLocator regionLocator, Admin admin) + throws IOException { + if (!enabled(admin.getConfiguration())) { + LOG.info("Region size calculation disabled."); + return; + } + + LOG.info("Calculating region sizes for table \"" + regionLocator.getName() + "\"."); + + //get regions for table + List<HRegionLocation> tableRegionInfos = regionLocator.getAllRegionLocations(); + Set<byte[]> tableRegions = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR); + for (HRegionLocation regionInfo : tableRegionInfos) { + tableRegions.add(regionInfo.getRegionInfo().getRegionName()); + } + + ClusterStatus clusterStatus = admin.getClusterStatus(); + Collection<ServerName> servers = clusterStatus.getServers(); + final long megaByte = 1024L * 1024L; + + //iterate all cluster regions, filter regions from our table and compute their size + for (ServerName serverName: servers) { + ServerLoad serverLoad = clusterStatus.getLoad(serverName); + + for (RegionLoad regionLoad: serverLoad.getRegionsLoad().values()) { + byte[] regionId = regionLoad.getName(); + + if (tableRegions.contains(regionId)) { + + long regionSizeBytes = (regionLoad.getStorefileSizeMB() + regionLoad.getMemStoreSizeMB()) * megaByte; + sizeMap.put(regionId, regionSizeBytes); + + if (LOG.isDebugEnabled()) { + LOG.debug("Region " + regionLoad.getNameAsString() + " has size " + regionSizeBytes); + } + } + } + } + LOG.debug("Region sizes calculated"); + } + + boolean enabled(Configuration configuration) { + return configuration.getBoolean(ENABLE_REGIONSIZECALCULATOR, true); + } 
+ + /** + * Returns size of given region in bytes. Returns 0 if region was not found. + * */ + public long getRegionSize(byte[] regionId) { + Long size = sizeMap.get(regionId); + if (size == null) { + LOG.debug("Unknown region:" + Arrays.toString(regionId)); + return 0; + } else { + return size; + } + } + + public Map<byte[], Long> getRegionSizeMap() { + return Collections.unmodifiableMap(sizeMap); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/08050752/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java index dc4502c..61ecab8 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java @@ -127,8 +127,8 @@ public class FileTablespace extends Tablespace { } @Override - public long getTableVolume(URI uri, Optional<EvalNode> filter) throws UnsupportedException { - Path path = new Path(uri); + public long getTableVolume(TableDesc table, Optional<EvalNode> filter) throws UnsupportedException { + Path path = new Path(table.getUri()); ContentSummary summary; try { summary = fs.getContentSummary(path); http://git-wip-us.apache.org/repos/asf/tajo/blob/08050752/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java index da0b5a7..4c62523 100644 --- 
a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java +++ b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java @@ -110,7 +110,7 @@ public abstract class JdbcTablespace extends Tablespace { } @Override - public long getTableVolume(URI uri, Optional<EvalNode> filter) throws UnsupportedException { + public long getTableVolume(TableDesc table, Optional<EvalNode> filter) throws UnsupportedException { throw new UnsupportedException(); }
