This is an automated email from the ASF dual-hosted git repository. shaofengshi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kylin.git
commit d3adc52e5e6ecf3518d0445e9180557e70b2731f Author: Zhong <nju_y...@apache.org> AuthorDate: Sun May 13 15:45:07 2018 +0800 KYLIN-3362 support dynamic dimension push down --- .../kylin/cube/gridtable/CubeCodeSystem.java | 13 +++-- .../apache/kylin/cube/gridtable/CubeGridTable.java | 4 +- .../gridtable/CuboidToGridTableMappingExt.java | 21 +++++++- .../java/org/apache/kylin/gridtable/GTInfo.java | 17 ++++++ .../metadata/datatype/DynamicDimSerializer.java | 63 ++++++++++++++++++++++ .../kylin/metadata/realization/SQLDigest.java | 9 +++- .../storage/gtrecord/CubeScanRangePlanner.java | 12 ++++- .../kylin/storage/gtrecord/CubeSegmentScanner.java | 17 +++--- .../storage/gtrecord/GTCubeStorageQueryBase.java | 33 ++++++++---- .../gtrecord/GTCubeStorageQueryRequest.java | 16 +++++- .../gtrecord/SequentialCubeTupleIterator.java | 11 ++-- .../apache/kylin/storage/hbase/ITStorageTest.java | 2 + .../kylin/query/relnode/OLAPAggregateRel.java | 37 ++++++++++--- .../apache/kylin/query/relnode/OLAPContext.java | 11 +++- 14 files changed, 227 insertions(+), 39 deletions(-) diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java index 9eae6f3..3577476 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java @@ -34,6 +34,7 @@ import org.apache.kylin.gridtable.IGTCodeSystem; import org.apache.kylin.gridtable.IGTComparator; import org.apache.kylin.measure.MeasureAggregator; import org.apache.kylin.metadata.datatype.DataTypeSerializer; +import org.apache.kylin.metadata.datatype.DynamicDimSerializer; /** * defines how column values will be encoded to/ decoded from GTRecord @@ -68,7 +69,7 @@ public class CubeCodeSystem implements IGTCodeSystem { @Override public void init(GTInfo info) { this.info = info; - + ImmutableBitSet dDims = info.getDynamicDims(); this.serializers = new DataTypeSerializer[info.getColumnCount()]; for (int i = 0; i < serializers.length; i++) { DimensionEncoding dimEnc = i < dimEncs.length ? dimEncs[i] : null; @@ -77,8 +78,14 @@ public class CubeCodeSystem implements IGTCodeSystem { // for dimensions serializers[i] = dimEnc.asDataTypeSerializer(); } else { - // for measures - serializers[i] = DataTypeSerializer.create(info.getColumnType(i)); + DataTypeSerializer dSerializer = DataTypeSerializer.create(info.getColumnType(i)); + if (dDims != null && dDims.get(i)) { + // for dynamic dimensions + dSerializer = new DynamicDimSerializer(dSerializer); + } else { + // for measures + } + serializers[i] = dSerializer; } } } diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeGridTable.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeGridTable.java index 79732e8..2a819bc 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeGridTable.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeGridTable.java @@ -44,7 +44,9 @@ public class CubeGridTable { builder.setColumns(mapping.getDataTypes()); builder.setPrimaryKey(mapping.getPrimaryKey()); builder.enableColumnBlock(mapping.getColumnBlocks()); - + if (mapping instanceof CuboidToGridTableMappingExt) { + builder.enableDynamicDims(((CuboidToGridTableMappingExt) mapping).getDynamicDims()); + } return builder.build(); } } diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMappingExt.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMappingExt.java index fbdd07e..32c4ca0 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMappingExt.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMappingExt.java @@ -35,8 +35,11 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; public class CuboidToGridTableMappingExt extends CuboidToGridTableMapping { + private final List<TblColRef> dynDims; private final List<DynamicFunctionDesc> dynFuncs; + private ImmutableBitSet dynamicDims; + private List<DataType> dynGtDataTypes; private List<ImmutableBitSet> dynGtColBlocks; @@ -44,8 +47,9 @@ public class CuboidToGridTableMappingExt extends CuboidToGridTableMapping { private Map<FunctionDesc, Integer> dynMetrics2gt; - public CuboidToGridTableMappingExt(Cuboid cuboid, List<DynamicFunctionDesc> dynFuncs) { + public CuboidToGridTableMappingExt(Cuboid cuboid, List<TblColRef> dynDims, List<DynamicFunctionDesc> dynFuncs) { super(cuboid); + this.dynDims = dynDims; this.dynFuncs = dynFuncs; init(); } @@ -59,6 +63,15 @@ public class CuboidToGridTableMappingExt extends CuboidToGridTableMapping { int gtColIdx = super.getColumnCount(); BitSet rtColBlock = new BitSet(); + // dynamic dimensions + for (TblColRef rtDim : dynDims) { + dynDim2gt.put(rtDim, gtColIdx); + dynGtDataTypes.add(rtDim.getType()); + rtColBlock.set(gtColIdx); + gtColIdx++; + } + dynamicDims = new ImmutableBitSet(rtColBlock); + // dynamic metrics for (DynamicFunctionDesc rtFunc : dynFuncs) { dynMetrics2gt.put(rtFunc, gtColIdx); @@ -70,9 +83,13 @@ public class CuboidToGridTableMappingExt extends CuboidToGridTableMapping { dynGtColBlocks.add(new ImmutableBitSet(rtColBlock)); } + public ImmutableBitSet getDynamicDims() { + return dynamicDims; + } + @Override public int getColumnCount() { - return super.getColumnCount() + dynMetrics2gt.size(); + return super.getColumnCount() + dynDims.size() + dynFuncs.size(); } @Override diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java index d10d6e7..739adf8 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java @@ -57,6 +57,9 @@ public class GTInfo { int rowBlockSize; // 0: disable row block ImmutableBitSet colBlocksAll; + // not included during serialization, only used for loadColumns + ImmutableBitSet dynamicDims; + // must create from builder private GTInfo() { } @@ -93,6 +96,10 @@ public class GTInfo { return colAll; } + public ImmutableBitSet getDynamicDims() { + return dynamicDims; + } + public boolean isRowBlockEnabled() { return rowBlockSize > 0; } @@ -214,6 +221,10 @@ public class GTInfo { it.remove(); } colBlocks = list.toArray(new ImmutableBitSet[list.size()]); + + // for dynamic dimensions + if (dynamicDims == null) + dynamicDims = ImmutableBitSet.EMPTY; } public static class Builder { @@ -269,6 +280,12 @@ public class GTInfo { return this; } + /** optional */ + public Builder enableDynamicDims(ImmutableBitSet dynamicDims) { + info.dynamicDims = dynamicDims; + return this; + } + public GTInfo build() { info.validate(); return info; diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DynamicDimSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DynamicDimSerializer.java new file mode 100644 index 0000000..a1c42a8 --- /dev/null +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DynamicDimSerializer.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.metadata.datatype; + +import java.nio.ByteBuffer; + +/** + * For dynamic dimensions, the code length must be fixed + */ +public class DynamicDimSerializer<T> extends DataTypeSerializer<T> { + + private final DataTypeSerializer<T> dimDataTypeSerializer; + + public DynamicDimSerializer(DataTypeSerializer<T> dimDataTypeSerializer) { + this.dimDataTypeSerializer = dimDataTypeSerializer; + } + + public void serialize(T value, ByteBuffer out) { + dimDataTypeSerializer.serialize(value, out); + } + + public T deserialize(ByteBuffer in) { + return dimDataTypeSerializer.deserialize(in); + } + + public int peekLength(ByteBuffer in) { + return maxLength(); + } + + public int maxLength() { + return dimDataTypeSerializer.maxLength(); + } + + public int getStorageBytesEstimate() { + return dimDataTypeSerializer.getStorageBytesEstimate(); + } + + /** An optional convenient method that converts a string to this data type (for dimensions) */ + public T valueOf(String str) { + return dimDataTypeSerializer.valueOf(str); + } + + /** Convert from obj to string */ + public String toString(T value) { + return dimDataTypeSerializer.toString(value); + } +} diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigest.java b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigest.java index 45ba95a..0b23e48 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigest.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigest.java @@ -19,8 +19,10 @@ package org.apache.kylin.metadata.realization; import java.util.List; +import java.util.Map; import java.util.Set; +import org.apache.kylin.metadata.expression.TupleExpression; import org.apache.kylin.metadata.filter.TupleFilter; import org.apache.kylin.metadata.model.DynamicFunctionDesc; import org.apache.kylin.metadata.model.FunctionDesc; @@ -57,6 +59,8 @@ public class SQLDigest { public List<TblColRef> groupbyColumns; public Set<TblColRef> subqueryJoinParticipants; + public Map<TblColRef, TupleExpression> dynGroupbyColumns; + // aggregation public Set<TblColRef> metricColumns; public List<FunctionDesc> aggregations; // storage level measure type, on top of which various sql aggr function may apply @@ -80,7 +84,8 @@ public class SQLDigest { public Set<MeasureDesc> involvedMeasure; public SQLDigest(String factTable, Set<TblColRef> allColumns, List<JoinDesc> joinDescs, // model - List<TblColRef> groupbyColumns, Set<TblColRef> subqueryJoinParticipants, // group by + List<TblColRef> groupbyColumns, Set<TblColRef> subqueryJoinParticipants, + Map<TblColRef, TupleExpression> dynGroupByColumns, // group by Set<TblColRef> metricColumns, List<FunctionDesc> aggregations, List<SQLCall> aggrSqlCalls, // aggregation List<DynamicFunctionDesc> dynAggregations, // Set<TblColRef> rtDimensionColumns, Set<TblColRef> rtMetricColumns, // dynamic col related columns @@ -95,6 +100,8 @@ public class SQLDigest { this.groupbyColumns = groupbyColumns; this.subqueryJoinParticipants = subqueryJoinParticipants; + this.dynGroupbyColumns = dynGroupByColumns; + this.metricColumns = metricColumns; this.aggregations = aggregations; this.aggrSqlCalls = aggrSqlCalls; diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java index d0f2ca2..f99c868 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java @@ -49,6 +49,7 @@ import org.apache.kylin.gridtable.GTScanRequest; import org.apache.kylin.gridtable.GTScanRequestBuilder; import org.apache.kylin.gridtable.GTUtil; import org.apache.kylin.gridtable.IGTComparator; +import org.apache.kylin.metadata.expression.TupleExpression; import org.apache.kylin.metadata.filter.TupleFilter; import org.apache.kylin.metadata.model.DynamicFunctionDesc; import org.apache.kylin.metadata.model.FunctionDesc; @@ -75,7 +76,7 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase { protected Cuboid cuboid; public CubeScanRangePlanner(CubeSegment cubeSegment, Cuboid cuboid, TupleFilter filter, Set<TblColRef> dimensions, // - Set<TblColRef> groupByDims, // + Set<TblColRef> groupByDims, List<TblColRef> dynGroupsDims, List<TupleExpression> dynGroupExprs, // Collection<FunctionDesc> metrics, List<DynamicFunctionDesc> dynFuncs, // TupleFilter havingFilter, StorageContext context) { this.context = context; @@ -102,6 +103,7 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase { //replace the constant values in filter to dictionary codes Set<TblColRef> groupByPushDown = Sets.newHashSet(groupByDims); + groupByPushDown.addAll(dynGroupsDims); this.gtFilter = GTUtil.convertFilterColumnsAndConstants(filter, gtInfo, mapping.getDim2gt(), groupByPushDown); this.havingFilter = havingFilter; @@ -112,10 +114,16 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase { // for dynamic cols, which are as appended columns to GTInfo BitSet tmpGtDynCols = new BitSet(); + tmpGtDynCols.or(mapping.makeGridTableColumns(Sets.newHashSet(dynGroupsDims)).mutable()); tmpGtDynCols.or(mapping.makeGridTableColumns(dynFuncs).mutable()); this.gtDynColumns = new ImmutableBitSet(tmpGtDynCols); - this.tupleExpressionList = Lists.newArrayListWithExpectedSize(dynFuncs.size()); + this.tupleExpressionList = Lists.newArrayListWithExpectedSize(dynGroupExprs.size() + dynFuncs.size()); + // for dynamic dimensions + for (TupleExpression rtGroupExpr : dynGroupExprs) { + this.tupleExpressionList + .add(GTUtil.convertFilterColumnsAndConstants(rtGroupExpr, gtInfo, mapping, groupByPushDown)); + } // for dynamic measures Set<FunctionDesc> tmpRtAggrMetrics = Sets.newHashSet(); diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java index d8b245c..95ffa35 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java @@ -31,6 +31,7 @@ import org.apache.kylin.gridtable.GTInfo; import org.apache.kylin.gridtable.GTRecord; import org.apache.kylin.gridtable.GTScanRequest; import org.apache.kylin.gridtable.IGTScanner; +import org.apache.kylin.metadata.expression.TupleExpression; import org.apache.kylin.metadata.filter.ITupleFilterTransformer; import org.apache.kylin.metadata.filter.StringCodeSystem; import org.apache.kylin.metadata.filter.TupleFilter; @@ -53,12 +54,12 @@ public class CubeSegmentScanner implements IGTScanner { final GTScanRequest scanRequest; public CubeSegmentScanner(CubeSegment cubeSeg, Cuboid cuboid, Set<TblColRef> dimensions, // - Set<TblColRef> groups, // + Set<TblColRef> groups, List<TblColRef> dynGroups, List<TupleExpression> dynGroupExprs, // Collection<FunctionDesc> metrics, List<DynamicFunctionDesc> dynFuncs, // TupleFilter originalfilter, TupleFilter havingFilter, StorageContext context) { - + logger.info("Init CubeSegmentScanner for segment {}", cubeSeg.getName()); - + this.cuboid = cuboid; this.cubeSeg = cubeSeg; @@ -74,20 +75,20 @@ public class CubeSegmentScanner implements IGTScanner { CubeScanRangePlanner scanRangePlanner; try { - scanRangePlanner = new CubeScanRangePlanner(cubeSeg, cuboid, filter, dimensions, groups, metrics, dynFuncs, - havingFilter, context); + scanRangePlanner = new CubeScanRangePlanner(cubeSeg, cuboid, filter, dimensions, groups, dynGroups, + dynGroupExprs, metrics, dynFuncs, havingFilter, context); } catch (RuntimeException e) { throw e; } catch (Exception e) { throw new RuntimeException(e); } - + scanRequest = scanRangePlanner.planScanRequest(); - + String gtStorage = ((GTCubeStorageQueryBase) context.getStorageQuery()).getGTStorage(); scanner = new ScannerWorker(cubeSeg, cuboid, scanRequest, gtStorage, context); } - + public boolean isSegmentSkipped() { return scanner.isSegmentSkipped(); } diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java index d2f6c94..434ba6d 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java @@ -43,6 +43,7 @@ import org.apache.kylin.dict.lookup.ILookupTable; import org.apache.kylin.gridtable.StorageLimitLevel; import org.apache.kylin.measure.MeasureType; import org.apache.kylin.measure.bitmap.BitmapMeasureType; +import org.apache.kylin.metadata.expression.TupleExpression; import org.apache.kylin.metadata.filter.CaseTupleFilter; import org.apache.kylin.metadata.filter.ColumnTupleFilter; import org.apache.kylin.metadata.filter.CompareTupleFilter; @@ -94,7 +95,8 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery { continue; } - scanner = new CubeSegmentScanner(cubeSeg, request.getCuboid(), request.getDimensions(), request.getGroups(), // + scanner = new CubeSegmentScanner(cubeSeg, request.getCuboid(), request.getDimensions(), // + request.getGroups(), request.getDynGroups(), request.getDynGroupExprs(), // request.getMetrics(), request.getDynFuncs(), // request.getFilter(), request.getHavingFilter(), request.getContext()); if (!scanner.isSegmentSkipped()) @@ -105,7 +107,7 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery { return ITupleIterator.EMPTY_TUPLE_ITERATOR; return new SequentialCubeTupleIterator(scanners, request.getCuboid(), request.getDimensions(), - request.getGroups(), request.getMetrics(), returnTupleInfo, request.getContext(), sqlDigest); + request.getDynGroups(), request.getGroups(), request.getMetrics(), returnTupleInfo, request.getContext(), sqlDigest); } public GTCubeStorageQueryRequest getStorageQueryRequest(StorageContext context, SQLDigest sqlDigest, @@ -145,13 +147,19 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery { // set cuboid to GridTable mapping; boolean noDynamicCols; - + // dynamic dimensions + List<TblColRef> dynGroups = Lists.newArrayList(sqlDigest.dynGroupbyColumns.keySet()); + noDynamicCols = dynGroups.isEmpty(); + List<TupleExpression> dynGroupExprs = Lists.newArrayListWithExpectedSize(sqlDigest.dynGroupbyColumns.size()); + for (TblColRef dynGroupCol : dynGroups) { + dynGroupExprs.add(sqlDigest.dynGroupbyColumns.get(dynGroupCol)); + } // dynamic measures List<DynamicFunctionDesc> dynFuncs = sqlDigest.dynAggregations; - noDynamicCols = dynFuncs.isEmpty(); + noDynamicCols = noDynamicCols && dynFuncs.isEmpty(); CuboidToGridTableMapping mapping = noDynamicCols ? new CuboidToGridTableMapping(cuboid) - : new CuboidToGridTableMappingExt(cuboid, dynFuncs); + : new CuboidToGridTableMappingExt(cuboid, dynGroups, dynFuncs); context.setMapping(mapping); // set whether to aggr at storage @@ -172,8 +180,8 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery { context.setFilterMask(getQueryFilterMask(filterColumnD)); // set limit push down - enableStorageLimitIfPossible(cuboid, groups, derivedPostAggregation, groupsD, filterD, loosenedColumnD, - sqlDigest.aggregations, context); + enableStorageLimitIfPossible(cuboid, groups, dynGroups, derivedPostAggregation, groupsD, filterD, + loosenedColumnD, sqlDigest.aggregations, context); // set whether to aggregate results from multiple partitions enableStreamAggregateIfBeneficial(cuboid, groupsD, context); // check query deadline @@ -188,8 +196,8 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery { cubeInstance.getName(), cuboid.getId(), groupsD, filterColumnD, context.getFinalPushDownLimit(), context.getStorageLimitLevel(), context.isNeedStorageAggregation()); - return new GTCubeStorageQueryRequest(cuboid, dimensionsD, groupsD, filterColumnD, metrics, dynFuncs, filterD, - havingFilter, context); + return new GTCubeStorageQueryRequest(cuboid, dimensionsD, groupsD, dynGroups, dynGroupExprs, filterColumnD, + metrics, dynFuncs, filterD, havingFilter, context); } protected abstract String getGTStorage(); @@ -416,7 +424,7 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery { } } - private void enableStorageLimitIfPossible(Cuboid cuboid, Collection<TblColRef> groups, + private void enableStorageLimitIfPossible(Cuboid cuboid, Collection<TblColRef> groups, List<TblColRef> dynGroups, Set<TblColRef> derivedPostAggregation, Collection<TblColRef> groupsD, TupleFilter filter, Set<TblColRef> loosenedColumnD, Collection<FunctionDesc> functionDescs, StorageContext context) { @@ -432,6 +440,11 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery { + " with cuboid columns: " + cuboid.getColumns()); } + if (!dynGroups.isEmpty()) { + storageLimitLevel = StorageLimitLevel.NO_LIMIT; + logger.debug("Storage limit push down is impossible because the query has dynamic groupby " + dynGroups); + } + // derived aggregation is bad, unless expanded columns are already in group by if (!groups.containsAll(derivedPostAggregation)) { storageLimitLevel = StorageLimitLevel.NO_LIMIT; diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryRequest.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryRequest.java index c66e813..fdc976e 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryRequest.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryRequest.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Set; import org.apache.kylin.cube.cuboid.Cuboid; +import org.apache.kylin.metadata.expression.TupleExpression; import org.apache.kylin.metadata.filter.TupleFilter; import org.apache.kylin.metadata.model.DynamicFunctionDesc; import org.apache.kylin.metadata.model.FunctionDesc; @@ -36,17 +37,22 @@ public class GTCubeStorageQueryRequest implements Serializable { private Set<TblColRef> groups; private Set<TblColRef> filterCols; private Set<FunctionDesc> metrics; + private List<TblColRef> dynGroups; + private List<TupleExpression> dynGroupExprs; private List<DynamicFunctionDesc> dynFuncs; private TupleFilter filter; private TupleFilter havingFilter; private StorageContext context; - public GTCubeStorageQueryRequest(Cuboid cuboid, Set<TblColRef> dimensions, Set<TblColRef> groups, // + public GTCubeStorageQueryRequest(Cuboid cuboid, Set<TblColRef> dimensions, // + Set<TblColRef> groups, List<TblColRef> dynGroups, List<TupleExpression> dynGroupExprs, // Set<TblColRef> filterCols, Set<FunctionDesc> metrics, List<DynamicFunctionDesc> dynFuncs, // TupleFilter filter, TupleFilter havingFilter, StorageContext context) { this.cuboid = cuboid; this.dimensions = dimensions; this.groups = groups; + this.dynGroups = dynGroups; + this.dynGroupExprs = dynGroupExprs; this.filterCols = filterCols; this.metrics = metrics; this.dynFuncs = dynFuncs; @@ -79,6 +85,14 @@ public class GTCubeStorageQueryRequest implements Serializable { this.groups = groups; } + public List<TblColRef> getDynGroups() { + return dynGroups; + } + + public List<TupleExpression> getDynGroupExprs() { + return dynGroupExprs; + } + public Set<FunctionDesc> getMetrics() { return metrics; } diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java index c067e33..b8dff8b 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Set; import java.util.TreeSet; +import com.google.common.collect.Sets; import org.apache.kylin.common.QueryContextFacade; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.metadata.model.FunctionDesc; @@ -53,14 +54,18 @@ public class SequentialCubeTupleIterator implements ITupleIterator { private int scanCount; private int scanCountDelta; - public SequentialCubeTupleIterator(List<CubeSegmentScanner> scanners, Cuboid cuboid, Set<TblColRef> selectedDimensions, // - Set<TblColRef> groups, Set<FunctionDesc> selectedMetrics, TupleInfo returnTupleInfo, StorageContext context, SQLDigest sqlDigest) { + public SequentialCubeTupleIterator(List<CubeSegmentScanner> scanners, Cuboid cuboid, + Set<TblColRef> selectedDimensions, List<TblColRef> rtGroups, Set<TblColRef> groups, // + Set<FunctionDesc> selectedMetrics, TupleInfo returnTupleInfo, StorageContext context, SQLDigest sqlDigest) { this.context = context; this.scanners = scanners; + Set<TblColRef> selectedDims = Sets.newHashSet(selectedDimensions); + selectedDims.addAll(rtGroups); + segmentCubeTupleIterators = Lists.newArrayList(); for (CubeSegmentScanner scanner : scanners) { - segmentCubeTupleIterators.add(new SegmentCubeTupleIterator(scanner, cuboid, selectedDimensions, selectedMetrics, returnTupleInfo, context)); + segmentCubeTupleIterators.add(new SegmentCubeTupleIterator(scanner, cuboid, selectedDims, selectedMetrics, returnTupleInfo, context)); } if (context.mergeSortPartitionResults() && !sqlDigest.isRawQuery) { diff --git a/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java b/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java index c432c12..61aa560 100644 --- a/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java +++ b/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java @@ -29,6 +29,7 @@ import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.HBaseMetadataTestCase; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.metadata.expression.TupleExpression; import org.apache.kylin.metadata.filter.TupleFilter; import org.apache.kylin.metadata.model.DynamicFunctionDesc; import org.apache.kylin.metadata.model.FunctionDesc; @@ -140,6 +141,7 @@ public class ITStorageTest extends HBaseMetadataTestCase { try { SQLDigest sqlDigest = new SQLDigest("default.test_kylin_fact", /*allCol*/ Collections.<TblColRef> emptySet(), /*join*/ null, // groups, /*subqueryJoinParticipants*/ Sets.<TblColRef> newHashSet(), // + /*dynamicGroupByColumns*/ Collections.<TblColRef, TupleExpression> emptyMap(), // /*metricCol*/ Collections.<TblColRef> emptySet(), aggregations, /*aggrSqlCalls*/ Collections.<SQLCall> emptyList(), // /*dynamicAggregations*/ Collections.<DynamicFunctionDesc> emptyList(), // /*runtimeDimensionColumns*/ Collections.<TblColRef> emptySet(), // diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java index 0eff905..84f7676 100644 --- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java +++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java @@ -266,12 +266,37 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel { this.groups = Lists.newArrayList(); for (int i = getGroupSet().nextSetBit(0); i >= 0; i = getGroupSet().nextSetBit(i + 1)) { TupleExpression tupleExpression = inputColumnRowType.getSourceColumnsByIndex(i); - Set<TblColRef> srcCols = ExpressionColCollector.collectColumns(tupleExpression); - // if no source columns, use target column instead - if (srcCols.isEmpty()) { - srcCols.add(inputColumnRowType.getColumnByIndex(i)); + if (tupleExpression instanceof ColumnTupleExpression) { + this.groups.add(((ColumnTupleExpression) tupleExpression).getColumn()); + } else { + TblColRef groupOutCol = inputColumnRowType.getColumnByIndex(i); + Pair<Set<TblColRef>, Set<TblColRef>> cols = ExpressionColCollector.collectColumnsPair(tupleExpression); + + // push down only available for the innermost aggregation + boolean ifPushDown = !afterAggregate; + + // if measure columns exist, don't do push down + if (!cols.getSecond().isEmpty()) { + ifPushDown = false; + } + + // if existing a dimension which is a derived column, don't do push down + for (TblColRef dimCol : cols.getFirst()) { + if (!this.context.belongToFactTableDims(dimCol)) { + ifPushDown = false; + break; + } + } + + if (ifPushDown) { + this.groups.add(groupOutCol); + this.context.dynGroupBy.put(groupOutCol, tupleExpression); + } else { + this.groups.addAll(cols.getFirst()); + this.groups.addAll(cols.getSecond()); + this.context.dynamicFields.remove(groupOutCol); + } } - this.groups.addAll(srcCols); } } @@ -321,7 +346,7 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel { } else if (aggCall.getAggregation() instanceof SqlCountAggFunction && !aggCall.isDistinct()) { if (tupleExpr instanceof ColumnTupleExpression) { TblColRef srcCol = ((ColumnTupleExpression) tupleExpr).getColumn(); - if (this.context.belongToFactTable(srcCol)) { + if (this.context.belongToFactTableDims(srcCol)) { tupleExpr = getCountColumnExpression(srcCol); TblColRef column = TblColRef.newInnerColumn(tupleExpr.getDigest(), diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java index 20533ad..f3dcd1b 100644 --- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java +++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java @@ -33,6 +33,8 @@ import org.apache.calcite.rel.type.RelDataTypeField; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.DateFormat; import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.metadata.expression.ExpressionColCollector; +import org.apache.kylin.metadata.expression.TupleExpression; import org.apache.kylin.metadata.filter.CompareTupleFilter; import org.apache.kylin.metadata.filter.TupleFilter; import org.apache.kylin.metadata.model.DataModelDesc; @@ -158,6 +160,8 @@ public class OLAPContext { // dynamic columns info, note that the name of TblColRef will be the field name public Map<TblColRef, RelDataType> dynamicFields = new HashMap<>(); + public Map<TblColRef, TupleExpression> dynGroupBy = new HashMap<>(); + // hive query public String sql = ""; @@ -172,6 +176,9 @@ public class OLAPContext { public SQLDigest getSQLDigest() { if (sqlDigest == null) { Set<TblColRef> rtDimColumns = new HashSet<>(); + for (TupleExpression tupleExpr : dynGroupBy.values()) { + rtDimColumns.addAll(ExpressionColCollector.collectColumns(tupleExpr)); + } Set<TblColRef> rtMetricColumns = new HashSet<>(); List<DynamicFunctionDesc> dynFuncs = Lists.newLinkedList(); for (FunctionDesc functionDesc : aggregations) { @@ -183,7 +190,7 @@ public class OLAPContext { } } sqlDigest = new SQLDigest(firstTableScan.getTableName(), allColumns, joins, // model - groupByColumns, subqueryJoinParticipants, // group by + groupByColumns, subqueryJoinParticipants, dynGroupBy, // group by metricsColumns, aggregations, aggrSqlCalls, dynFuncs, // aggregation rtDimColumns, rtMetricColumns, // runtime related columns filterColumns, filter, havingFilter, // filter @@ -211,7 +218,7 @@ public class OLAPContext { return false; } - public boolean belongToFactTable(TblColRef tblColRef) { + public boolean belongToFactTableDims(TblColRef tblColRef) { if (!belongToContextTables(tblColRef)) { return false; } -- To stop receiving notification emails like this one, please contact shaofeng...@apache.org.