http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fce575bc/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/DefaultTupleConverter.java ---------------------------------------------------------------------- diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/DefaultTupleConverter.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/DefaultTupleConverter.java index 986e45e..6b3a82c 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/DefaultTupleConverter.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/DefaultTupleConverter.java @@ -22,12 +22,11 @@ import java.util.BitSet; import java.util.Map; import org.apache.kylin.common.util.ByteArray; -import org.apache.kylin.common.util.Bytes; -import org.apache.kylin.common.util.BytesUtil; -import org.apache.kylin.common.util.ShardingHash; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.cube.kv.RowConstants; +import org.apache.kylin.cube.kv.RowKeyEncoder; +import org.apache.kylin.cube.kv.RowKeyEncoderProvider; import org.apache.kylin.gridtable.GTRecord; import org.apache.kylin.metadata.model.TblColRef; @@ -42,11 +41,14 @@ public final class DefaultTupleConverter implements TupleConverter { private final CubeSegment segment; private final int measureCount; private final Map<TblColRef, Integer> columnLengthMap; + private RowKeyEncoderProvider rowKeyEncoderProvider; + private byte[] rowKeyBodyBuf = new byte[RowConstants.ROWKEY_BUFFER_SIZE]; public DefaultTupleConverter(CubeSegment segment, Map<TblColRef, Integer> columnLengthMap) { this.segment = segment; this.measureCount = segment.getCubeDesc().getMeasures().size(); this.columnLengthMap = columnLengthMap; + this.rowKeyEncoderProvider = new RowKeyEncoderProvider(this.segment); } private ByteBuffer getValueBuf() { @@ -65,11 +67,8 @@ public final class DefaultTupleConverter implements TupleConverter { @Override public final Tuple2<byte[], byte[]> convert(long cuboidId, GTRecord record) { - int bytesLength = RowConstants.ROWKEY_HEADER_LEN; Cuboid cuboid = Cuboid.findById(segment.getCubeDesc(), cuboidId); - for (TblColRef column : cuboid.getColumns()) { - bytesLength += columnLengthMap.get(column); - } + RowKeyEncoder rowkeyEncoder = rowKeyEncoderProvider.getRowkeyEncoder(cuboid); final int dimensions = BitSet.valueOf(new long[] { cuboidId }).cardinality(); int[] measureColumnsIndex = getMeasureColumnsIndex(); @@ -77,22 +76,15 @@ public final class DefaultTupleConverter implements TupleConverter { measureColumnsIndex[i] = dimensions + i; } - byte[] key = new byte[bytesLength]; - System.arraycopy(Bytes.toBytes(cuboidId), 0, key, 0, RowConstants.ROWKEY_CUBOIDID_LEN); - int header = RowConstants.ROWKEY_HEADER_LEN; - int offSet = header; + int offSet = 0; for (int x = 0; x < dimensions; x++) { final ByteArray byteArray = record.get(x); - System.arraycopy(byteArray.array(), byteArray.offset(), key, offSet, byteArray.length()); + System.arraycopy(byteArray.array(), byteArray.offset(), rowKeyBodyBuf, offSet, byteArray.length()); offSet += byteArray.length(); } - //fill shard - short cuboidShardNum = segment.getCuboidShardNum(cuboidId); - short shardOffset = ShardingHash.getShard(key, header, offSet - header, cuboidShardNum); - short cuboidShardBase = segment.getCuboidBaseShard(cuboidId); - short finalShard = ShardingHash.normalize(cuboidShardBase, shardOffset, segment.getTotalShards()); - BytesUtil.writeShort(finalShard, key, 0, RowConstants.ROWKEY_SHARDID_LEN); + byte[] rowKey = rowkeyEncoder.createBuf(); + rowkeyEncoder.encode(new ByteArray(rowKeyBodyBuf), new ByteArray(rowKey)); ByteBuffer valueBuf = getValueBuf(); valueBuf.clear(); @@ -100,6 +92,6 @@ public final class DefaultTupleConverter implements TupleConverter { byte[] value = new byte[valueBuf.position()]; System.arraycopy(valueBuf.array(), 0, value, 0, valueBuf.position()); - return new Tuple2<>(key, value); + return new Tuple2<>(rowKey, value); } }
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fce575bc/examples/sample_cube/metadata/cube_desc/kylin_sales_cube_desc.json ---------------------------------------------------------------------- diff --git a/examples/sample_cube/metadata/cube_desc/kylin_sales_cube_desc.json b/examples/sample_cube/metadata/cube_desc/kylin_sales_cube_desc.json index 26932b2..9320aaf 100644 --- a/examples/sample_cube/metadata/cube_desc/kylin_sales_cube_desc.json +++ b/examples/sample_cube/metadata/cube_desc/kylin_sales_cube_desc.json @@ -1,166 +1,225 @@ { - "uuid" : "9ac9b7a8-3929-4dff-b59d-2100aadc8dbf", - "name" : "kylin_sales_cube_desc", - "description" : null, - "engine_type": 2, - "dimensions" : [ { - "id" : 0, - "name" : "CAL_DT", - "table" : "DEFAULT.KYLIN_CAL_DT", - "column" : null, - "derived" : [ "WEEK_BEG_DT" ], - "hierarchy" : false - }, { - "id" : 1, - "name" : "CATEGORY", - "table" : "DEFAULT.KYLIN_CATEGORY_GROUPINGS", - "column" : null, - "derived" : [ "USER_DEFINED_FIELD1", "USER_DEFINED_FIELD3", "UPD_DATE", "UPD_USER" ], - "hierarchy" : false - }, { - "id" : 2, - "name" : "CATEGORY_HIERARCHY", - "table" : "DEFAULT.KYLIN_CATEGORY_GROUPINGS", - "column" : [ "META_CATEG_NAME", "CATEG_LVL2_NAME", "CATEG_LVL3_NAME" ], - "derived" : null, - "hierarchy" : true - }, { - "id" : 3, - "name" : "LSTG_FORMAT_NAME", - "table" : "DEFAULT.KYLIN_SALES", - "column" : [ "LSTG_FORMAT_NAME" ], - "derived" : null, - "hierarchy" : false - } ], - "measures" : [ { - "id" : 1, - "name" : "GMV_SUM", - "function" : { - "expression" : "SUM", - "parameter" : { - "type" : "column", - "value" : "PRICE" - }, - "returntype" : "decimal(19,4)" + "uuid": "9ac9b7a8-3929-4dff-b59d-2100aadc8dbf", + "name": "kylin_sales_cube_desc", + "description": null, + "dimensions": [ + { + "id": 0, + "name": "CAL_DT", + "table": "DEFAULT.KYLIN_CAL_DT", + "column": null, + "derived": [ + "WEEK_BEG_DT" + ], + "hierarchy": false + }, + { + "id": 1, + "name": "CATEGORY", + "table": "DEFAULT.KYLIN_CATEGORY_GROUPINGS", + "column": null, + "derived": [ + "USER_DEFINED_FIELD1", + "USER_DEFINED_FIELD3", + "UPD_DATE", + "UPD_USER" + ], + "hierarchy": false + }, + { + "id": 2, + "name": "CATEGORY_HIERARCHY", + "table": "DEFAULT.KYLIN_CATEGORY_GROUPINGS", + "column": [ + "META_CATEG_NAME", + "CATEG_LVL2_NAME", + "CATEG_LVL3_NAME" + ], + "derived": null, + "hierarchy": true }, - "dependent_measure_ref" : null - }, { - "id" : 2, - "name" : "GMV_MIN", - "function" : { - "expression" : "MIN", - "parameter" : { - "type" : "column", - "value" : "PRICE" + { + "id": 3, + "name": "LSTG_FORMAT_NAME", + "table": "DEFAULT.KYLIN_SALES", + "column": [ + "LSTG_FORMAT_NAME" + ], + "derived": null, + "hierarchy": false + } + ], + "measures": [ + { + "id": 1, + "name": "GMV_SUM", + "function": { + "expression": "SUM", + "parameter": { + "type": "column", + "value": "PRICE" + }, + "returntype": "decimal(19,4)" }, - "returntype" : "decimal(19,4)" + "dependent_measure_ref": null }, - "dependent_measure_ref" : null - }, { - "id" : 3, - "name" : "GMV_MAX", - "function" : { - "expression" : "MAX", - "parameter" : { - "type" : "column", - "value" : "PRICE" + { + "id": 2, + "name": "GMV_MIN", + "function": { + "expression": "MIN", + "parameter": { + "type": "column", + "value": "PRICE" + }, + "returntype": "decimal(19,4)" }, - "returntype" : "decimal(19,4)" + "dependent_measure_ref": null }, - "dependent_measure_ref" : null - }, { - "id" : 4, - "name" : "TRANS_CNT", - "function" : { - "expression" : "COUNT", - "parameter" : { - "type" : "constant", - "value" : "1" + { + "id": 3, + "name": "GMV_MAX", + "function": { + "expression": "MAX", + "parameter": { + "type": "column", + "value": "PRICE" + }, + "returntype": "decimal(19,4)" }, - "returntype" : "bigint" + "dependent_measure_ref": null }, - "dependent_measure_ref" : null - }, { - "id" : 5, - "name" : "SELLER_CNT_HLL", - "function" : { - "expression" : "COUNT_DISTINCT", - "parameter" : { - "type" : "column", - "value" : "SELLER_ID" + { + "id": 4, + "name": "TRANS_CNT", + "function": { + "expression": "COUNT", + "parameter": { + "type": "constant", + "value": "1" + }, + "returntype": "bigint" }, - "returntype" : "hllc(10)" + "dependent_measure_ref": null }, - "dependent_measure_ref" : null - }, { - "id" : 6, - "name" : "SELLER_FORMAT_CNT", - "function" : { - "expression" : "COUNT_DISTINCT", - "parameter" : { - "type" : "column", - "value" : "LSTG_FORMAT_NAME" + { + "id": 5, + "name": "SELLER_CNT_HLL", + "function": { + "expression": "COUNT_DISTINCT", + "parameter": { + "type": "column", + "value": "SELLER_ID" + }, + "returntype": "hllc(10)" }, - "returntype" : "hllc(10)" + "dependent_measure_ref": null }, - "dependent_measure_ref" : null - } ], - "rowkey" : { - "rowkey_columns" : [ { - "column" : "part_dt", - "length" : 0, - "dictionary" : "true", - "mandatory" : false - }, { - "column" : "leaf_categ_id", - "length" : 0, - "dictionary" : "true", - "mandatory" : false - }, { - "column" : "meta_categ_name", - "length" : 0, - "dictionary" : "true", - "mandatory" : false - }, { - "column" : "categ_lvl2_name", - "length" : 0, - "dictionary" : "true", - "mandatory" : false - }, { - "column" : "categ_lvl3_name", - "length" : 0, - "dictionary" : "true", - "mandatory" : false - }, { - "column" : "lstg_format_name", - "length" : 12, - "dictionary" : null, - "mandatory" : false - }, { - "column" : "lstg_site_id", - "length" : 0, - "dictionary" : "true", - "mandatory" : false - } ], - "aggregation_groups" : [ [ "part_dt", "lstg_site_id", "leaf_categ_id", "meta_categ_name", "categ_lvl3_name", "categ_lvl2_name", "lstg_format_name" ] ] + { + "id": 6, + "name": "SELLER_FORMAT_CNT", + "function": { + "expression": "COUNT_DISTINCT", + "parameter": { + "type": "column", + "value": "LSTG_FORMAT_NAME" + }, + "returntype": "hllc(10)" + }, + "dependent_measure_ref": null + } + ], + "rowkey": { + "rowkey_columns": [ + { + "column": "part_dt", + "length": 0, + "dictionary": "true", + "mandatory": false + }, + { + "column": "leaf_categ_id", + "length": 0, + "dictionary": "true", + "mandatory": false + }, + { + "column": "meta_categ_name", + "length": 0, + "dictionary": "true", + "mandatory": false + }, + { + "column": "categ_lvl2_name", + "length": 0, + "dictionary": "true", + "mandatory": false + }, + { + "column": "categ_lvl3_name", + "length": 0, + "dictionary": "true", + "mandatory": false + }, + { + "column": "lstg_format_name", + "length": 12, + "dictionary": null, + "mandatory": false + }, + { + "column": "lstg_site_id", + "length": 0, + "dictionary": "true", + "mandatory": false + } + ], + "aggregation_groups": [ + [ + "part_dt", + "lstg_site_id", + "leaf_categ_id", + "meta_categ_name", + "categ_lvl3_name", + "categ_lvl2_name", + "lstg_format_name" + ] + ] }, - "last_modified" : 1426255280419, - "model_name" : "kylin_sales_model", - "null_string" : null, - "hbase_mapping" : { - "column_family" : [ { - "name" : "f1", - "columns" : [ { - "qualifier" : "m", - "measure_refs" : [ "gmv_sum", "gmv_min", "gmv_max", "trans_cnt" ] - } ] - }, { - "name" : "f2", - "columns" : [ { - "qualifier" : "m", - "measure_refs" : [ "seller_cnt_hll", "seller_format_cnt" ] - } ] - } ] + "last_modified": 1426255280419, + "model_name": "kylin_sales_model", + "null_string": null, + "hbase_mapping": { + "column_family": [ + { + "name": "f1", + "columns": [ + { + "qualifier": "m", + "measure_refs": [ + "gmv_sum", + "gmv_min", + "gmv_max", + "trans_cnt" + ] + } + ] + }, + { + "name": "f2", + "columns": [ + { + "qualifier": "m", + "measure_refs": [ + "seller_cnt_hll", + "seller_format_cnt" + ] + } + ] + } + ] }, - "notify_list" : null + "notify_list": null, + "engine_type": 2, + "storage_type": 2 } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fce575bc/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_topn_desc.json ---------------------------------------------------------------------- diff --git a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_topn_desc.json b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_topn_desc.json index 17a3fdc..84cdaf4 100644 --- a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_topn_desc.json +++ b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_topn_desc.json @@ -2,7 +2,6 @@ "uuid": "4334a905-1fc6-4f67-985c-38fa5aeafd92", "name": "test_kylin_cube_topn_desc", "description": null, - "engine_type": 2, "dimensions": [ { "id": 0, @@ -143,5 +142,7 @@ } ] }, - "notify_list": null + "notify_list": null, + "engine_type": 2, + "storage_type": 2 } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fce575bc/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_topn_left_join_desc.json ---------------------------------------------------------------------- diff --git a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_topn_left_join_desc.json b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_topn_left_join_desc.json index 893ebcd..f7e700d 100644 --- a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_topn_left_join_desc.json +++ b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_topn_left_join_desc.json @@ -2,7 +2,6 @@ "uuid": "5445a905-1fc6-4f67-985c-38fa5aeafd92", "name": "test_kylin_cube_topn_left_join_desc", "description": null, - "engine_type": 2, "dimensions": [ { "id": 0, @@ -130,7 +129,8 @@ ] } ] - }, { + }, + { "name": "f2", "columns": [ { @@ -143,5 +143,7 @@ } ] }, - "notify_list": null + "notify_list": null, + "engine_type": 2, + "storage_type": 2 } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fce575bc/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_with_slr_desc.json ---------------------------------------------------------------------- diff --git a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_with_slr_desc.json b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_with_slr_desc.json index 0b6c31a..0b99047 100644 --- a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_with_slr_desc.json +++ b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_with_slr_desc.json @@ -2,7 +2,6 @@ "uuid": "a24ca905-1fc6-4f67-985c-38fa5aeafd92", "name": "test_kylin_cube_with_slr_desc", "description": null, - "engine_type": 2, "dimensions": [ { "id": 0, @@ -237,5 +236,7 @@ } ] }, - "notify_list": null + "notify_list": null, + "engine_type": 2, + "storage_type": 2 } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fce575bc/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_with_slr_left_join_desc.json ---------------------------------------------------------------------- diff --git a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_with_slr_left_join_desc.json b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_with_slr_left_join_desc.json index 1bd1ec5..8e22615 100644 --- a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_with_slr_left_join_desc.json +++ b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_with_slr_left_join_desc.json @@ -2,7 +2,6 @@ "uuid": "bbbba905-1fc6-4f67-985c-38fa5aeafd92", "name": "test_kylin_cube_with_slr_left_join_desc", "description": null, - "engine_type": 2, "dimensions": [ { "id": 0, @@ -237,5 +236,7 @@ } ] }, - "notify_list": null + "notify_list": null, + "engine_type": 2, + "storage_type": 2 } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fce575bc/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_desc.json ---------------------------------------------------------------------- diff --git a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_desc.json b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_desc.json index c54b205..bd979e0 100644 --- a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_desc.json +++ b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_desc.json @@ -2,7 +2,6 @@ "uuid": "9ac9b7a8-3929-4dff-b59d-2100aadc8dbf", "name": "test_kylin_cube_without_slr_desc", "description": null, - "engine_type": 2, "dimensions": [ { "id": 0, @@ -289,5 +288,7 @@ } ] }, - "notify_list": null + "notify_list": null, + "engine_type": 2, + "storage_type": 0 } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fce575bc/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_left_join_desc.json ---------------------------------------------------------------------- diff --git a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_left_join_desc.json b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_left_join_desc.json index b4428cc..08a132e 100644 --- a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_left_join_desc.json +++ b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_left_join_desc.json @@ -2,7 +2,6 @@ "uuid": "9ac9b7a8-3929-4dff-b59d-2100aadc8dbf", "name": "test_kylin_cube_without_slr_left_join_desc", "description": null, - "engine_type": 2, "dimensions": [ { "id": 0, @@ -289,5 +288,7 @@ } ] }, - "notify_list": null + "notify_list": null, + "engine_type": 0, + "storage_type": 0 } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fce575bc/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json ---------------------------------------------------------------------- diff --git a/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json b/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json index ebf656a..c9a6536 100644 --- a/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json +++ b/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json @@ -2,7 +2,6 @@ "uuid": "901ed15e-7769-4c66-b7ae-fbdc971cd192", "name": "test_streaming_table_cube_desc", "description": "", - "engine_type": 2, "dimensions": [ { "id": 1, @@ -140,5 +139,7 @@ } ] }, - "notify_list": [] + "notify_list": [], + "engine_type": 2, + "storage_type": 2 } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fce575bc/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorProjector.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorProjector.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorProjector.java index 9b839c3..c37b2f4 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorProjector.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorProjector.java @@ -41,9 +41,8 @@ public class CoprocessorProjector { RowKeyEncoder rowKeyMaskEncoder = new RowKeyEncoder(cubeSegment, cuboid) { @Override - protected int fillHeader(byte[] bytes) { - Arrays.fill(bytes, 0, this.headerLength, (byte) 0xff); - return this.headerLength; + protected void fillHeader(byte[] bytes) { + Arrays.fill(bytes, 0, this.getHeaderLength(), (byte) 0xff); } @Override http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fce575bc/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorRowType.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorRowType.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorRowType.java index 7ec97c0..35488d1 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorRowType.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorRowType.java @@ -26,7 +26,6 @@ import org.apache.kylin.common.util.BytesSerializer; import org.apache.kylin.common.util.BytesUtil; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.cuboid.Cuboid; -import org.apache.kylin.cube.kv.RowConstants; import org.apache.kylin.cube.kv.RowKeyColumnIO; import org.apache.kylin.invertedindex.index.TableRecordInfo; import org.apache.kylin.metadata.model.ColumnDesc; @@ -47,7 +46,9 @@ public class CoprocessorRowType { for (int i = 0; i < cols.size(); i++) { colSizes[i] = tableRecordInfo.getDigest().length(i); } - return new CoprocessorRowType(cols.toArray(new TblColRef[cols.size()]), colSizes); + + //TODO:check0 + return new CoprocessorRowType(cols.toArray(new TblColRef[cols.size()]), colSizes, 0); } //for observer @@ -59,7 +60,7 @@ public class CoprocessorRowType { for (int i = 0; i < cols.length; i++) { colSizes[i] = colIO.getColumnLength(cols[i]); } - return new CoprocessorRowType(cols, colSizes); + return new CoprocessorRowType(cols, colSizes, seg.getRowKeyPreambleSize()); } public static byte[] serialize(CoprocessorRowType o) { @@ -82,6 +83,7 @@ public class CoprocessorRowType { public void serialize(CoprocessorRowType o, ByteBuffer out) { int n = o.columns.length; BytesUtil.writeVInt(o.columns.length, out); + BytesUtil.writeVInt(o.bodyOffset, out); for (int i = 0; i < n; i++) { BytesUtil.writeAsciiString(o.columns[i].getTable(), out); BytesUtil.writeAsciiString(o.columns[i].getName(), out); @@ -92,6 +94,7 @@ public class CoprocessorRowType { @Override public CoprocessorRowType deserialize(ByteBuffer in) { int n = BytesUtil.readVInt(in); + int bodyOffset = BytesUtil.readVInt(in); TblColRef[] cols = new TblColRef[n]; int[] colSizes = new int[n]; for (int i = 0; i < n; i++) { @@ -108,18 +111,20 @@ public class CoprocessorRowType { int colSize = BytesUtil.readVInt(in); colSizes[i] = colSize; } - return new CoprocessorRowType(cols, colSizes); + return new CoprocessorRowType(cols, colSizes, bodyOffset); } } // ============================================================================ public TblColRef[] columns; + private int bodyOffset; public int[] columnSizes; public int[] columnOffsets; public HashMap<TblColRef, Integer> columnIdxMap; - public CoprocessorRowType(TblColRef[] columns, int[] columnSizes) { + public CoprocessorRowType(TblColRef[] columns, int[] columnSizes, int bodyOffset) { + this.bodyOffset = bodyOffset; this.columns = columns; this.columnSizes = columnSizes; init(); @@ -131,7 +136,7 @@ public class CoprocessorRowType { private void init() { int[] offsets = new int[columns.length]; - int o = RowConstants.ROWKEY_HEADER_LEN; + int o = bodyOffset; for (int i = 0; i < columns.length; i++) { offsets[i] = o; o += columnSizes[i]; http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fce575bc/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java index 034ffac..22f7017 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java @@ -254,7 +254,6 @@ public class CubeSegmentTupleIterator implements ITupleIterator { flushScanCountDelta(); if (logger.isDebugEnabled() && scan != null) { - logger.debug("Scan " + scan.toString()); byte[] metricsBytes = scan.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA); if (metricsBytes != null) { ScanMetrics scanMetrics = ProtobufUtil.toScanMetrics(metricsBytes); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fce575bc/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java index c62308e..f84e4e6 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java @@ -33,9 +33,6 @@ import java.util.Map; import java.util.Set; import java.util.TreeSet; -import com.google.common.collect.Maps; -import com.google.common.collect.Range; -import com.google.common.collect.Sets; import org.apache.hadoop.hbase.client.HConnection; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.BytesUtil; @@ -75,6 +72,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Range; +import com.google.common.collect.Sets; @SuppressWarnings("unused") public class CubeStorageQuery implements ICachableStorageQuery { @@ -482,7 +482,10 @@ public class CubeStorageQuery implements ICachableStorageQuery { dropUnhitSegments(result); logger.info("hbasekeyrange count after dropping unhit :" + result.size()); - result = duplicateRangeByShard(result); + //TODO: should use LazyRowKeyEncoder.getRowKeysDifferentShards like CubeHBaseRPC, not do so because v1 query engine is retiring. not worth changing it + if (cubeDesc.isEnableSharding()) { + result = duplicateRangeByShard(result); + } logger.info("hbasekeyrange count after dropping duplicatebyshard :" + result.size()); return result; http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fce575bc/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java index 6ad30e2..b606d2e 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java @@ -25,9 +25,12 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.zip.DataFormatException; @@ -163,12 +166,13 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { final AtomicInteger totalScannedCount = new AtomicInteger(0); final String toggle = BackdoorToggles.getCoprocessorBehavior() == null ? CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM.toString() : BackdoorToggles.getCoprocessorBehavior(); logger.info("The execution of this query will use " + toggle + " as endpoint's behavior"); + List<Future<?>> futures = Lists.newArrayList(); for (int i = 0; i < rawScans.size(); ++i) { final int shardIndex = i; final RawScan rawScan = rawScans.get(i); - executorService.submit(new Runnable() { + Future<?> future = executorService.submit(new Runnable() { @Override public void run() { final byte[] rawScanBytes = KryoUtils.serialize(rawScan); @@ -177,7 +181,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { for (IntList intList : hbaseColumnsToGTIntList) { builder.addHbaseColumnsToGT(intList); } - + builder.setRowkeyPreambleSize(cubeSeg.getRowKeyPreambleSize()); builder.setBehavior(toggle); Collection<CubeVisitProtos.CubeVisitResponse> results; @@ -211,14 +215,19 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { rowBlocks.addAll(part); } }); + futures.add(future); } executorService.shutdown(); try { - if (!executorService.awaitTermination(1, TimeUnit.HOURS)) { - throw new RuntimeException("Visiting cube by endpoint timeout"); + for (Future<?> future : futures) { + future.get(1, TimeUnit.HOURS); } } catch (InterruptedException e) { throw new RuntimeException("Visiting cube by endpoint gets interrupted"); + } catch (ExecutionException e) { + throw new RuntimeException("Visiting cube throw exception", e); + } catch (TimeoutException e) { + throw new RuntimeException("Visiting cube by endpoint timeout"); } return new EndpointResultsAsGTScanner(fullGTInfo, rowBlocks.iterator(), scanRequest.getColumns(), totalScannedCount.get()); @@ -227,10 +236,11 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { private String getStatsString(CubeVisitProtos.CubeVisitResponse result, int shardIndex) { StringBuilder sb = new StringBuilder(); Stats stats = result.getStats(); - sb.append("Shard " + shardIndex + ": "); + sb.append("Shard " + shardIndex + " on host: " + stats.getHostname()); sb.append("Total scanned row: " + stats.getScannedRowCount() + ". "); sb.append("Total filtered/aggred row: " + stats.getAggregatedRowCount() + ". "); sb.append("Time elapsed in EP: " + (stats.getServiceEndTime() - stats.getServiceStartTime()) + "(ms). "); + sb.append("Server CPU usage: " + stats.getSystemCpuLoad() + ", server physical mem left: " + stats.getFreePhysicalMemorySize() + ", server swap mem left:" + stats.getFreeSwapSpaceSize()); return sb.toString(); } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fce575bc/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java index 1d217ac..412e7602 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java @@ -5,20 +5,22 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; -import org.apache.hadoop.hbase.Cell; +import javax.annotation.Nullable; + import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.FuzzyRowFilter; -import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.Bytes; -import org.apache.kylin.common.util.BytesUtil; import org.apache.kylin.common.util.ImmutableBitSet; import org.apache.kylin.common.util.Pair; -import org.apache.kylin.common.util.ShardingHash; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.cuboid.Cuboid; +import org.apache.kylin.cube.kv.FuzzyKeyEncoder; +import org.apache.kylin.cube.kv.FuzzyMaskEncoder; +import org.apache.kylin.cube.kv.LazyRowKeyEncoder; import org.apache.kylin.cube.kv.RowConstants; +import org.apache.kylin.cube.kv.RowKeyEncoder; import org.apache.kylin.cube.model.HBaseColumnDesc; import org.apache.kylin.cube.model.HBaseColumnFamilyDesc; import org.apache.kylin.cube.model.HBaseMappingDesc; @@ -29,6 +31,7 @@ import org.apache.kylin.gridtable.IGTScanner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -41,11 +44,16 @@ public abstract class CubeHBaseRPC { final protected CubeSegment cubeSeg; final protected Cuboid cuboid; final protected GTInfo fullGTInfo; + final private RowKeyEncoder fuzzyKeyEncoder; + final private RowKeyEncoder fuzzyMaskEncoder; public CubeHBaseRPC(CubeSegment cubeSeg, Cuboid cuboid, GTInfo fullGTInfo) { this.cubeSeg = cubeSeg; this.cuboid = cuboid; this.fullGTInfo = fullGTInfo; + + this.fuzzyKeyEncoder = new FuzzyKeyEncoder(cubeSeg, cuboid); + this.fuzzyMaskEncoder = new FuzzyMaskEncoder(cubeSeg, cuboid); } abstract IGTScanner getGTScanner(GTScanRequest scanRequest) throws IOException; @@ -76,23 +84,34 @@ public abstract class CubeHBaseRPC { final List<Pair<byte[], byte[]>> selectedColumns = makeHBaseColumns(selectedColBlocks); List<RawScan> ret = Lists.newArrayList(); - byte[] start = makeRowKeyToScan(pkStart, RowConstants.ROWKEY_LOWER_BYTE); - byte[] end = makeRowKeyToScan(pkEnd, RowConstants.ROWKEY_UPPER_BYTE); + LazyRowKeyEncoder encoder = new LazyRowKeyEncoder(cubeSeg, cuboid); + byte[] start = encoder.createBuf(); + byte[] end = encoder.createBuf(); + List<byte[]> startKeys; + List<byte[]> endKeys; + + encoder.setBlankByte(RowConstants.ROWKEY_LOWER_BYTE); + encoder.encode(pkStart, pkStart.getInfo().getPrimaryKey(), start); + startKeys = encoder.getRowKeysDifferentShards(start); + + encoder.setBlankByte(RowConstants.ROWKEY_UPPER_BYTE); + encoder.encode(pkEnd, pkEnd.getInfo().getPrimaryKey(), end); + endKeys = encoder.getRowKeysDifferentShards(end); + endKeys = Lists.transform(endKeys, new Function<byte[], byte[]>() { + @Nullable + @Override + public byte[] apply(byte[] input) { + byte[] shardEnd = new byte[input.length + 1];//append extra 0 to the end key to make it inclusive while scanning + System.arraycopy(input, 0, shardEnd, 0, input.length); + return shardEnd; + } + }); + + Preconditions.checkState(startKeys.size() == endKeys.size()); List<Pair<byte[], byte[]>> hbaseFuzzyKeys = translateFuzzyKeys(fuzzyKeys); - short cuboidShardNum = cubeSeg.getCuboidShardNum(cuboid.getId()); - - for (short i = 0; i < cuboidShardNum; ++i) { - short shard = ShardingHash.normalize(cubeSeg.getCuboidBaseShard(cuboid.getId()), i, cubeSeg.getTotalShards()); - - byte[] shardStart = Arrays.copyOf(start, start.length); - byte[] shardEnd = new byte[end.length + 1];//append extra 0 to the end key to make it inclusive while scanning - System.arraycopy(end, 0, shardEnd, 0, end.length); - - BytesUtil.writeShort(shard, shardStart, 0, RowConstants.ROWKEY_SHARDID_LEN); - BytesUtil.writeShort(shard, shardEnd, 0, RowConstants.ROWKEY_SHARDID_LEN); - - ret.add(new RawScan(shardStart, shardEnd, selectedColumns, hbaseFuzzyKeys)); + for (short i = 0; i < startKeys.size(); ++i) { + ret.add(new RawScan(startKeys.get(i), endKeys.get(i), selectedColumns, hbaseFuzzyKeys)); } return ret; @@ -108,30 +127,12 @@ public abstract class CubeHBaseRPC { } List<Pair<byte[], byte[]>> ret = Lists.newArrayList(); - int coreLength = fullGTInfo.getMaxColumnLength(fullGTInfo.getPrimaryKey()); for (GTRecord gtRecordFuzzyKey : fuzzyKeys) { - byte[] hbaseFuzzyKey = new byte[coreLength + RowConstants.ROWKEY_HEADER_LEN]; - byte[] hbaseFuzzyMask = new byte[coreLength + RowConstants.ROWKEY_HEADER_LEN]; - - int pos = 0; - //shard part - Arrays.fill(hbaseFuzzyMask, pos, pos + RowConstants.ROWKEY_SHARDID_LEN, RowConstants.BYTE_ONE);//shard part should better be FIXED, for simplicity we make it non-fixed - pos += RowConstants.ROWKEY_SHARDID_LEN; - - //cuboid part - Arrays.fill(hbaseFuzzyMask, pos, pos + RowConstants.ROWKEY_CUBOIDID_LEN, RowConstants.BYTE_ZERO); - System.arraycopy(cuboid.getBytes(), 0, hbaseFuzzyKey, pos, RowConstants.ROWKEY_CUBOIDID_LEN); - pos += RowConstants.ROWKEY_CUBOIDID_LEN; + byte[] hbaseFuzzyKey = fuzzyKeyEncoder.createBuf(); + byte[] hbaseFuzzyMask = fuzzyMaskEncoder.createBuf(); - //row key core part - ByteArray coreKey = HBaseScan.exportScanKey(gtRecordFuzzyKey, RowConstants.BYTE_ZERO); - System.arraycopy(coreKey.array(), coreKey.offset(), hbaseFuzzyKey, pos, coreKey.length()); - ByteArray coreMask = HBaseScan.exportScanMask(gtRecordFuzzyKey); - System.arraycopy(coreMask.array(), coreMask.offset(), hbaseFuzzyMask, pos, coreMask.length()); - - Preconditions.checkState(coreKey.length() == coreMask.length(), "corekey length not equal coremask length"); - pos += coreKey.length(); - Preconditions.checkState(hbaseFuzzyKey.length == pos, "HBase fuzzy key not completely populated"); + fuzzyKeyEncoder.encode(gtRecordFuzzyKey, gtRecordFuzzyKey.getInfo().getPrimaryKey(), hbaseFuzzyKey); + fuzzyMaskEncoder.encode(gtRecordFuzzyKey, gtRecordFuzzyKey.getInfo().getPrimaryKey(), hbaseFuzzyMask); ret.add(new Pair<byte[], byte[]>(hbaseFuzzyKey, hbaseFuzzyMask)); } @@ -139,21 +140,6 @@ public abstract class CubeHBaseRPC { return ret; } - private byte[] makeRowKeyToScan(GTRecord pkRec, byte fill) { - ByteArray pk = HBaseScan.exportScanKey(pkRec, fill); - - byte[] buf = new byte[pk.length() + RowConstants.ROWKEY_HEADER_LEN]; - Arrays.fill(buf, fill); - - //for scanning/reading, later all possible shard will be applied - - System.arraycopy(cuboid.getBytes(), 0, buf, RowConstants.ROWKEY_SHARDID_LEN, RowConstants.ROWKEY_CUBOIDID_LEN); - if (pk != null && pk.array() != null) { - System.arraycopy(pk.array(), pk.offset(), buf, RowConstants.ROWKEY_HEADER_LEN, pk.length()); - } - return buf; - } - /** * prune untouched hbase columns */ @@ -206,8 +192,6 @@ public abstract class CubeHBaseRPC { return ret; } - - public static void applyHBaseColums(Scan scan, List<Pair<byte[], byte[]>> hbaseColumns) { for (Pair<byte[], byte[]> hbaseColumn : hbaseColumns) { byte[] byteFamily = hbaseColumn.getFirst(); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fce575bc/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java index fa5a844..69b95ca 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java @@ -118,7 +118,7 @@ public class CubeHBaseScanRPC extends CubeHBaseRPC { } }; - IGTStore store = new HBaseReadonlyStore(cellListIterator, scanRequest, rawScans.get(0).hbaseColumns, hbaseColumnsToGT); + IGTStore store = new HBaseReadonlyStore(cellListIterator, scanRequest, rawScans.get(0).hbaseColumns, hbaseColumnsToGT,cubeSeg.getRowKeyPreambleSize()); IGTScanner rawScanner = store.scan(scanRequest); final IGTScanner decorateScanner = scanRequest.decorateScanner(rawScanner); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fce575bc/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java index 7731f19..303c360 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java @@ -43,13 +43,14 @@ public class HBaseReadonlyStore implements IGTStore { private GTInfo info; private List<Pair<byte[], byte[]>> hbaseColumns; private List<List<Integer>> hbaseColumnsToGT; + private int rowkeyPreambleSize; - public HBaseReadonlyStore(CellListIterator cellListIterator, GTScanRequest gtScanRequest, List<Pair<byte[], byte[]>> hbaseColumns, List<List<Integer>> hbaseColumnsToGT) { + public HBaseReadonlyStore(CellListIterator cellListIterator, GTScanRequest gtScanRequest, List<Pair<byte[], byte[]>> hbaseColumns, List<List<Integer>> hbaseColumnsToGT, int rowkeyPreambleSize) { this.cellListIterator = cellListIterator; - this.info = gtScanRequest.getInfo(); this.hbaseColumns = hbaseColumns; this.hbaseColumnsToGT = hbaseColumnsToGT; + this.rowkeyPreambleSize = rowkeyPreambleSize; } @Override @@ -108,7 +109,7 @@ public class HBaseReadonlyStore implements IGTStore { // dimensions, set to primary key, also the 0th column block Cell firstCell = oneRow.get(0); - ByteBuffer buf = byteBuffer(firstCell.getRowArray(), RowConstants.ROWKEY_HEADER_LEN + firstCell.getRowOffset(), firstCell.getRowLength() - RowConstants.ROWKEY_HEADER_LEN); + ByteBuffer buf = byteBuffer(firstCell.getRowArray(), rowkeyPreambleSize + firstCell.getRowOffset(), firstCell.getRowLength() - rowkeyPreambleSize); oneRecord.loadCellBlock(0, buf); // metrics http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fce575bc/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseScan.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseScan.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseScan.java deleted file mode 100644 index 65a963d..0000000 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseScan.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.storage.hbase.cube.v2; - -import java.util.Arrays; - -import org.apache.kylin.common.util.ByteArray; -import org.apache.kylin.cube.kv.RowConstants; -import org.apache.kylin.gridtable.GTInfo; -import org.apache.kylin.gridtable.GTRecord; - -import com.google.common.base.Preconditions; - -public class HBaseScan { - - /** - * every column in scan key is fixed length. for empty values, 0 zero will be populated - */ - public static ByteArray exportScanKey(GTRecord rec, byte fill) { - - Preconditions.checkNotNull(rec); - - GTInfo info = rec.getInfo(); - int len = info.getMaxColumnLength(info.getPrimaryKey()); - ByteArray buf = ByteArray.allocate(len); - int pos = 0; - for (int i = 0; i < info.getPrimaryKey().trueBitCount(); i++) { - int c = info.getPrimaryKey().trueBitAt(i); - int colLength = info.getCodeSystem().maxCodeLength(c); - - if (rec.get(c).array() != null) { - Preconditions.checkArgument(colLength == rec.get(c).length(), "ColLength :" + colLength + " != cols[c].length: " + rec.get(c).length() + ", c is " + c); - System.arraycopy(rec.get(c).array(), rec.get(c).offset(), buf.array(), buf.offset() + pos, rec.get(c).length()); - } else { - Arrays.fill(buf.array(), buf.offset() + pos, buf.offset() + pos + colLength, fill); - } - pos += colLength; - } - buf.setLength(pos); - - return buf; - } - - /** - * every column in scan key is fixed length. for fixed columns, 0 will be populated, for non-fixed columns, 1 will be populated - */ - public static ByteArray exportScanMask(GTRecord rec) { - Preconditions.checkNotNull(rec); - - GTInfo info = rec.getInfo(); - int len = info.getMaxColumnLength(info.getPrimaryKey()); - ByteArray buf = ByteArray.allocate(len); - byte fill; - - int pos = 0; - for (int i = 0; i < info.getPrimaryKey().trueBitCount(); i++) { - int c = info.getPrimaryKey().trueBitAt(i); - int colLength = info.getCodeSystem().maxCodeLength(c); - - if (rec.get(c).array() != null) { - fill = RowConstants.BYTE_ZERO; - } else { - fill = RowConstants.BYTE_ONE; - } - Arrays.fill(buf.array(), buf.offset() + pos, buf.offset() + pos + colLength, fill); - pos += colLength; - } - buf.setLength(pos); - - return buf; - } -} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fce575bc/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java index f474139..3759738 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java @@ -20,6 +20,8 @@ package org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.net.InetAddress; import java.nio.ByteBuffer; import java.util.List; @@ -56,6 +58,7 @@ import com.google.protobuf.HBaseZeroCopyByteString; import com.google.protobuf.RpcCallback; import com.google.protobuf.RpcController; import com.google.protobuf.Service; +import com.sun.management.OperatingSystemMXBean; @SuppressWarnings("unused") //used in hbase endpoint @@ -144,7 +147,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement innerScanner = region.getScanner(scan); InnerScannerAsIterator cellListIterator = new InnerScannerAsIterator(innerScanner); - IGTStore store = new HBaseReadonlyStore(cellListIterator, scanReq, hbaseRawScan.hbaseColumns, hbaseColumnsToGT); + IGTStore store = new HBaseReadonlyStore(cellListIterator, scanReq, hbaseRawScan.hbaseColumns, hbaseColumnsToGT, request.getRowkeyPreambleSize()); IGTScanner rawScanner = store.scan(scanReq); CoprocessorBehavior behavior = CoprocessorBehavior.valueOf(request.getBehavior()); @@ -165,6 +168,12 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement outputStream.write(buffer.array(), buffer.arrayOffset() - buffer.position(), buffer.remaining()); finalRowCount++; } + + OperatingSystemMXBean operatingSystemMXBean = (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean(); + double systemCpuLoad = operatingSystemMXBean.getSystemCpuLoad(); + double freePhysicalMemorySize = operatingSystemMXBean.getFreePhysicalMemorySize(); + double freeSwapSpaceSize = operatingSystemMXBean.getFreeSwapSpaceSize(); + //outputStream.close() is not necessary byte[] allRows = outputStream.toByteArray(); CubeVisitProtos.CubeVisitResponse.Builder responseBuilder = CubeVisitProtos.CubeVisitResponse.newBuilder(); @@ -174,7 +183,12 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement setAggregatedRowCount(finalScanner.getScannedRowCount() - finalRowCount).// setScannedRowCount(finalScanner.getScannedRowCount()).// setServiceStartTime(serviceStartTime).// - setServiceEndTime(System.currentTimeMillis()).build()).// + setServiceEndTime(System.currentTimeMillis()).// + setSystemCpuLoad(systemCpuLoad).// + setFreePhysicalMemorySize(freePhysicalMemorySize).// + setFreeSwapSpaceSize(freeSwapSpaceSize).// + setHostname(InetAddress.getLocalHost().getHostName()).// + build()).// build()); } catch (IOException ioe) {