Repository: tajo Updated Branches: refs/heads/master 252c311ea -> 2aefa0dad
TAJO-1990: Refine some parts in HBaseTablespace. Closes #880 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/2aefa0da Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/2aefa0da Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/2aefa0da Branch: refs/heads/master Commit: 2aefa0dada15fbfe1f2c45534a160521f00c0758 Parents: 252c311 Author: Hyunsik Choi <[email protected]> Authored: Sun Dec 13 22:18:57 2015 -0800 Committer: Hyunsik Choi <[email protected]> Committed: Sun Dec 13 22:18:57 2015 -0800 ---------------------------------------------------------------------- CHANGES | 2 + .../tajo/storage/hbase/HBaseTablespace.java | 92 ++++++++++---------- .../tajo/storage/hbase/IndexPredication.java | 31 +++---- 3 files changed, 60 insertions(+), 65 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/2aefa0da/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 9b7842f..c297940 100644 --- a/CHANGES +++ b/CHANGES @@ -8,6 +8,8 @@ Release 0.12.0 - unreleased IMPROVEMENT + TAJO-1990: Refine some parts in HBaseTablespace. (hyunsik) + TAJO-2005: Add TableStatUpdateRewriter. (hyunsik) TAJO-1948: Change GroupbyNode::setAggFunctions and getAggFunctions to set http://git-wip-us.apache.org/repos/asf/tajo/blob/2aefa0da/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java index f06cc67..c1e8a2d 100644 --- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java +++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java @@ -61,6 +61,7 @@ import java.io.IOException; import java.io.InputStreamReader; import java.net.URI; import java.util.*; +import java.util.stream.Collectors; /** * Tablespace for HBase table. @@ -72,6 +73,8 @@ public class HBaseTablespace extends Tablespace { new StorageProperty("hbase", false, true, false, false); public static final FormatProperty HFILE_FORMAT_PROPERTIES = new FormatProperty(true, false, true); public static final FormatProperty PUT_MODE_PROPERTIES = new FormatProperty(true, true, false); + public static final List<byte []> EMPTY_START_ROW_KEY = TUtil.newList(HConstants.EMPTY_START_ROW); + public static final List<byte []> EMPTY_END_ROW_KEY = TUtil.newList(HConstants.EMPTY_END_ROW); private Configuration hbaseConf; @@ -266,6 +269,7 @@ public class HBaseTablespace extends Tablespace { // If there is many split keys, Tajo allows to define in the file. Path path = new Path(splitRowKeysFile); FileSystem fs = path.getFileSystem(conf); + if (!fs.exists(path)) { throw new MissingTablePropertyException("hbase.split.rowkeys.file=" + path.toString() + " not exists.", hbaseTableName); @@ -273,6 +277,7 @@ public class HBaseTablespace extends Tablespace { SortedSet<String> splitKeySet = new TreeSet<>(); BufferedReader reader = null; + try { reader = new BufferedReader(new InputStreamReader(fs.open(path))); String line = null; @@ -438,32 +443,34 @@ public class HBaseTablespace extends Tablespace { return fragments; } - List<byte[]> startRows; - List<byte[]> stopRows; + final List<byte[]> startRows; + final List<byte[]> stopRows; if (indexPredications != null && !indexPredications.isEmpty()) { // indexPredications is Disjunctive set - startRows = new ArrayList<>(); - stopRows = new ArrayList<>(); - for (IndexPredication indexPredication: indexPredications) { - byte[] startRow; - byte[] stopRow; - if (indexPredication.getStartValue() != null) { - startRow = serialize(columnMapping, indexPredication, indexPredication.getStartValue()); - } else { - startRow = HConstants.EMPTY_START_ROW; - } - if (indexPredication.getStopValue() != null) { - stopRow = serialize(columnMapping, indexPredication, indexPredication.getStopValue()); - } else { - stopRow = HConstants.EMPTY_END_ROW; - } - startRows.add(startRow); - stopRows.add(stopRow); - } + startRows = indexPredications.stream() + .map(x -> { + if (x.getStartValue() != null) { + return serialize(columnMapping, x, x.getStartValue()); + } else { + return HConstants.EMPTY_START_ROW; + } + }) + .collect(Collectors.toList()); + + stopRows = indexPredications.stream() + .map(x -> { + if (x.getStopValue() != null) { + return serialize(columnMapping, x, x.getStopValue()); + } else { + return HConstants.EMPTY_START_ROW; + } + }) + .collect(Collectors.toList()); + } else { - startRows = TUtil.newList(HConstants.EMPTY_START_ROW); - stopRows = TUtil.newList(HConstants.EMPTY_END_ROW); + startRows = EMPTY_START_ROW_KEY; + stopRows = EMPTY_END_ROW_KEY; } hAdmin = new HBaseAdmin(hbaseConf); @@ -544,6 +551,7 @@ public class HBaseTablespace extends Tablespace { if (!fragments.isEmpty()) { fragments.get(fragments.size() - 1).setLast(true); } + return (ArrayList<Fragment>) (ArrayList) fragments; } finally { if (htable != null) { @@ -556,7 +564,7 @@ public class HBaseTablespace extends Tablespace { } private byte[] serialize(ColumnMapping columnMapping, - IndexPredication indexPredication, Datum datum) throws IOException { + IndexPredication indexPredication, Datum datum) { if (columnMapping.getIsBinaryColumns()[indexPredication.getColumnId()]) { return HBaseBinarySerializerDeserializer.serialize(indexPredication.getColumn(), datum); } else { @@ -690,10 +698,7 @@ public class HBaseTablespace extends Tablespace { @Override public String toString() { - return "HConnectionKey{" + - "properties=" + properties + - ", username='" + username + '\'' + - '}'; + return "HConnectionKey{ properties=" + properties + ", username='" + username + '\'' + '}'; } } @@ -702,25 +707,22 @@ public class HBaseTablespace extends Tablespace { @Nullable EvalNode filterCondition) throws IOException, MissingTablePropertyException, InvalidTablePropertyException { - List<IndexPredication> indexPredications = new ArrayList<>(); - Column[] indexableColumns = getIndexableColumns(tableDesc); - if (indexableColumns != null && indexableColumns.length == 1) { - // Currently supports only single index column. - List<Set<EvalNode>> indexablePredicateList = findIndexablePredicateSet(filterCondition, indexableColumns); - for (Set<EvalNode> eachEvalSet: indexablePredicateList) { - Pair<Datum, Datum> indexPredicationValues = getIndexablePredicateValue(columnMapping, eachEvalSet); - if (indexPredicationValues != null) { - IndexPredication indexPredication = new IndexPredication(); - indexPredication.setColumn(indexableColumns[0]); - indexPredication.setColumnId(tableDesc.getLogicalSchema().getColumnId(indexableColumns[0].getQualifiedName())); - indexPredication.setStartValue(indexPredicationValues.getFirst()); - indexPredication.setStopValue(indexPredicationValues.getSecond()); - - indexPredications.add(indexPredication); - } - } + final Column[] indexableColumns = getIndexableColumns(tableDesc); + if (indexableColumns == null || indexableColumns.length == 0) { + return Collections.EMPTY_LIST; } - return indexPredications; + + // Currently supports only single index column. + return findIndexablePredicateSet(filterCondition, indexableColumns).stream() + .map(set -> getIndexablePredicateValue(columnMapping, set)) + .filter(value -> value != null) + .map(value -> + new IndexPredication( + indexableColumns[0], + tableDesc.getLogicalSchema().getColumnId(indexableColumns[0].getQualifiedName()), + value.getFirst(), + value.getSecond())) + .collect(Collectors.toList()); } public List<Set<EvalNode>> findIndexablePredicateSet(@Nullable EvalNode qual, http://git-wip-us.apache.org/repos/asf/tajo/blob/2aefa0da/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/IndexPredication.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/IndexPredication.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/IndexPredication.java index 3a58e50..6241a94 100644 --- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/IndexPredication.java +++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/IndexPredication.java @@ -22,40 +22,31 @@ import org.apache.tajo.catalog.Column; import org.apache.tajo.datum.Datum; public class IndexPredication { - private Column column; - private int columnId; - private Datum startValue; - private Datum stopValue; + final private Column column; + final private int columnId; + final private Datum startValue; + final private Datum stopValue; + + public IndexPredication(Column c, int columnId, Datum startValue, Datum stopValue) { + this.column = c; + this.columnId = columnId; + this.startValue = startValue; + this.stopValue = stopValue; + } public Column getColumn() { return column; } - public void setColumn(Column column) { - this.column = column; - } - public int getColumnId() { return columnId; } - public void setColumnId(int columnId) { - this.columnId = columnId; - } - public Datum getStartValue() { return startValue; } - public void setStartValue(Datum startValue) { - this.startValue = startValue; - } - public Datum getStopValue() { return stopValue; } - - public void setStopValue(Datum stopValue) { - this.stopValue = stopValue; - } }
