Repository: kylin Updated Branches: refs/heads/2.x-staging 11d5dc90c -> cd5076497
KYLIN-976 Some measures only aggregates in BaseCuboid Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/cd507649 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/cd507649 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/cd507649 Branch: refs/heads/2.x-staging Commit: cd50764974b401b7a282af9d22513a5710c10704 Parents: 11d5dc9 Author: lidongsjtu <[email protected]> Authored: Sun Dec 27 23:56:16 2015 +0800 Committer: lidongsjtu <[email protected]> Committed: Sun Dec 27 23:59:00 2015 +0800 ---------------------------------------------------------------------- .../cube/inmemcubing/InMemCubeBuilder.java | 25 ++++++++++++++++++-- .../kylin/gridtable/GTAggregateScanner.java | 20 ++++++++++++---- .../kylin/measure/MeasureAggregators.java | 10 ++++++++ .../org/apache/kylin/measure/MeasureType.java | 5 ++++ .../kylin/engine/mr/common/BatchConstants.java | 1 + .../apache/kylin/engine/mr/steps/CuboidJob.java | 1 + .../kylin/engine/mr/steps/CuboidReducer.java | 19 +++++++++++++-- .../storage/hbase/cube/v1/CubeStorageQuery.java | 9 +++++-- .../storage/hbase/cube/v2/CubeStorageQuery.java | 9 +++++-- 9 files changed, 87 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/cd507649/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java index 4bad818..a1940a4 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java @@ -46,6 +46,8 @@ import org.apache.kylin.gridtable.GTRecord; import org.apache.kylin.gridtable.GTScanRequest; import org.apache.kylin.gridtable.GridTable; import org.apache.kylin.gridtable.IGTScanner; +import org.apache.kylin.measure.MeasureType; +import org.apache.kylin.measure.MeasureTypeFactory; import org.apache.kylin.metadata.datatype.DoubleMutable; import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.metadata.model.TblColRef; @@ -395,12 +397,31 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder { return scanAndAggregateGridTable(parent.table, parent.cuboidId, cuboidId, allNeededColumns.getFirst(), allNeededColumns.getSecond()); } + private GTAggregateScanner prepareGTAggregationScanner(GridTable gridTable, long parentId, long cuboidId, ImmutableBitSet aggregationColumns, ImmutableBitSet measureColumns) throws IOException { + GTInfo info = gridTable.getInfo(); + GTScanRequest req = new GTScanRequest(info, null, aggregationColumns, measureColumns, metricsAggrFuncs, null); + GTAggregateScanner scanner = (GTAggregateScanner) gridTable.scan(req); + + // for child cuboid, some measures don't need aggregation. + if (parentId != cuboidId) { + boolean[] aggrMask = new boolean[measureDescs.length]; + for (int i = 0; i < measureDescs.length; i++) { + aggrMask[i] = !measureDescs[i].getFunction().getMeasureType().onlyAggrInBaseCuboid(); + + if (!aggrMask[i]) { + logger.info(measureDescs[i].toString() + " doesn't need aggregation."); + } + } + scanner.setAggrMask(aggrMask); + } + + return scanner; + } private CuboidResult scanAndAggregateGridTable(GridTable gridTable, long parentId, long cuboidId, ImmutableBitSet aggregationColumns, ImmutableBitSet measureColumns) throws IOException { long startTime = System.currentTimeMillis(); logger.info("Calculating cuboid " + cuboidId); - GTScanRequest req = new GTScanRequest(gridTable.getInfo(), null, aggregationColumns, measureColumns, metricsAggrFuncs, null); - GTAggregateScanner scanner = (GTAggregateScanner) gridTable.scan(req); + GTAggregateScanner scanner = prepareGTAggregationScanner(gridTable, parentId, cuboidId, aggregationColumns, measureColumns); GridTable newGridTable = newGridTableByCuboidID(cuboidId); GTBuilder builder = newGridTable.rebuild(); http://git-wip-us.apache.org/repos/asf/kylin/blob/cd507649/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java index a760b92..f3afaba 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java @@ -6,6 +6,7 @@ import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.Comparator; import java.util.Iterator; import java.util.List; @@ -45,6 +46,7 @@ public class GTAggregateScanner implements IGTScanner { private int aggregatedRowCount = 0; private MemoryWaterLevel memTracker; + private boolean[] aggrMask; public GTAggregateScanner(IGTScanner inputScanner, GTScanRequest req) { if (!req.hasAggregation()) @@ -58,6 +60,9 @@ public class GTAggregateScanner implements IGTScanner { this.inputScanner = inputScanner; this.aggrCache = new AggregationCache(); this.spillThreshold = (long) (req.getAggrCacheGB() * MemoryBudgetController.ONE_GB); + this.aggrMask = new boolean[metricsAggrFuncs.length]; + + Arrays.fill(aggrMask, true); } public static long estimateSizeOfAggrCache(byte[] keySample, MeasureAggregator<?>[] aggrSample, int size) { @@ -116,6 +121,10 @@ public class GTAggregateScanner implements IGTScanner { return aggrCache.dumps.size(); } + public void setAggrMask(boolean[] aggrMask) { + this.aggrMask = aggrMask; + } + /** return the estimate memory size of aggregation cache */ public long getEstimateSizeOfAggrCache() { return aggrCache.estimatedMemSize(); @@ -218,9 +227,11 @@ public class GTAggregateScanner implements IGTScanner { aggBufMap.put(key, aggrs); } for (int i = 0; i < aggrs.length; i++) { - int col = metrics.trueBitAt(i); - Object metrics = info.codeSystem.decodeColumnValue(col, r.cols[col].asBuffer()); - aggrs[i].aggregate(metrics); + if (aggrMask[i]) { + int col = metrics.trueBitAt(i); + Object metrics = info.codeSystem.decodeColumnValue(col, r.cols[col].asBuffer()); + aggrs[i].aggregate(metrics); + } } } @@ -475,7 +486,8 @@ public class GTAggregateScanner implements IGTScanner { MeasureAggregator[] newPeekAggr = dumpCurrentValues.get(newPeek.getValue()); for (int i = 0; i < newPeekAggr.length; i++) { - mergedAggr[i].aggregate(newPeekAggr[i].getState()); + if (aggrMask[i]) + mergedAggr[i].aggregate(newPeekAggr[i].getState()); } enqueueFromDump(newPeek.getValue()); http://git-wip-us.apache.org/repos/asf/kylin/blob/cd507649/core-metadata/src/main/java/org/apache/kylin/measure/MeasureAggregators.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureAggregators.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureAggregators.java index b3edbc3..eb8a20b 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureAggregators.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureAggregators.java @@ -72,6 +72,16 @@ public class MeasureAggregators implements Serializable { } } + public void aggregate(Object[] values, boolean[] aggrMask) { + assert values.length == descLength; + assert aggrMask.length == descLength; + + for (int i = 0; i < descLength; i++) { + if (aggrMask[i]) + aggs[i].aggregate(values[i]); + } + } + public void collectStates(Object[] states) { for (int i = 0; i < descLength; i++) { states[i] = aggs[i].getState(); http://git-wip-us.apache.org/repos/asf/kylin/blob/cd507649/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java index e9d97c6..26cac81 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java @@ -55,6 +55,11 @@ abstract public class MeasureType<T> { public boolean isMemoryHungry() { return false; } + + /** Return true if this MeasureType only aggregate values in base cuboid, and output initial value in child cuboid. */ + public boolean onlyAggrInBaseCuboid() { + return false; + } /* ============================================================================ * Build http://git-wip-us.apache.org/repos/asf/kylin/blob/cd507649/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java index b305741..400a3aa 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java @@ -24,6 +24,7 @@ public interface BatchConstants { String CFG_CUBE_NAME = "cube.name"; String CFG_CUBE_SEGMENT_NAME = "cube.segment.name"; + String CFG_CUBE_CUBOID_LEVEL = "cube.cuboid.level"; String CFG_II_NAME = "ii.name"; String CFG_II_SEGMENT_NAME = "ii.segment.name"; http://git-wip-us.apache.org/repos/asf/kylin/blob/cd507649/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java index d24772c..3a1ce99 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java @@ -132,6 +132,7 @@ public class CuboidJob extends AbstractHadoopJob { // set job configuration job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName); job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName); + job.getConfiguration().setInt(BatchConstants.CFG_CUBE_CUBOID_LEVEL, nCuboidLevel); // add metadata to distributed cache attachKylinPropsAndMetadata(cube, job.getConfiguration()); http://git-wip-us.apache.org/repos/asf/kylin/blob/cd507649/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java index 381b07c..4dbb53e 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java @@ -52,6 +52,8 @@ public class CuboidReducer extends KylinReducer<Text, Text, Text, Text> { private MeasureAggregators aggs; private int counter; + private int cuboidLevel; + private boolean[] needAggr; private Object[] input; private Object[] result; @@ -63,6 +65,9 @@ public class CuboidReducer extends KylinReducer<Text, Text, Text, Text> { super.bindCurrentConfiguration(context.getConfiguration()); cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase(); + // only used in Build job, not in Merge job + cuboidLevel = context.getConfiguration().getInt(BatchConstants.CFG_CUBE_CUBOID_LEVEL, 0); + KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(); cubeDesc = CubeManager.getInstance(config).getCube(cubeName).getDescriptor(); @@ -73,16 +78,26 @@ public class CuboidReducer extends KylinReducer<Text, Text, Text, Text> { input = new Object[measuresDescs.size()]; result = new Object[measuresDescs.size()]; + needAggr = new boolean[measuresDescs.size()]; + + if (cuboidLevel > 0) { + for (int i = 0; i < measuresDescs.size(); i++) { + needAggr[i] = !measuresDescs.get(i).getFunction().getMeasureType().onlyAggrInBaseCuboid(); + } + } } @Override public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { - aggs.reset(); for (Text value : values) { codec.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), input); - aggs.aggregate(input); + if (cuboidLevel > 0) { + aggs.aggregate(input, needAggr); + } else { + aggs.aggregate(input); + } } aggs.collectStates(result); http://git-wip-us.apache.org/repos/asf/kylin/blob/cd507649/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java index b340be0..32dfc78 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java @@ -123,7 +123,7 @@ public class CubeStorageQuery implements ICachableStorageQuery { Set<TblColRef> dimensionsD = Sets.newHashSet(); dimensionsD.addAll(groupsD); dimensionsD.addAll(othersD); - Cuboid cuboid = identifyCuboid(dimensionsD); + Cuboid cuboid = identifyCuboid(dimensionsD, metrics); context.setCuboid(cuboid); // isExactAggregation? meaning: tuples returned from storage requires no further aggregation in query engine @@ -188,7 +188,12 @@ public class CubeStorageQuery implements ICachableStorageQuery { } } - private Cuboid identifyCuboid(Set<TblColRef> dimensions) { + private Cuboid identifyCuboid(Set<TblColRef> dimensions, Collection<FunctionDesc> metrics) { + for (FunctionDesc metric : metrics) { + if (metric.getMeasureType().onlyAggrInBaseCuboid()) + return Cuboid.getBaseCuboid(cubeDesc); + } + long cuboidID = 0; for (TblColRef column : dimensions) { int index = cubeDesc.getRowkey().getColumnBitIndex(column); http://git-wip-us.apache.org/repos/asf/kylin/blob/cd507649/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java index 3a231b5..5a14d40 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java @@ -78,7 +78,7 @@ public class CubeStorageQuery implements ICachableStorageQuery { Set<TblColRef> dimensionsD = new LinkedHashSet<TblColRef>(); dimensionsD.addAll(groupsD); dimensionsD.addAll(filterDimsD); - Cuboid cuboid = identifyCuboid(dimensionsD); + Cuboid cuboid = identifyCuboid(dimensionsD, metrics); context.setCuboid(cuboid); // isExactAggregation? meaning: tuples returned from storage requires no further aggregation in query engine @@ -155,7 +155,12 @@ public class CubeStorageQuery implements ICachableStorageQuery { return expanded; } - private Cuboid identifyCuboid(Set<TblColRef> dimensions) { + private Cuboid identifyCuboid(Set<TblColRef> dimensions, Collection<FunctionDesc> metrics) { + for (FunctionDesc metric : metrics) { + if (metric.getMeasureType().onlyAggrInBaseCuboid()) + return Cuboid.getBaseCuboid(cubeDesc); + } + long cuboidID = 0; for (TblColRef column : dimensions) { int index = cubeDesc.getRowkey().getColumnBitIndex(column);
