Repository: kylin
Updated Branches:
  refs/heads/2.x-staging 11d5dc90c -> cd5076497


KYLIN-976 Some measures only aggregate in the base cuboid


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/cd507649
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/cd507649
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/cd507649

Branch: refs/heads/2.x-staging
Commit: cd50764974b401b7a282af9d22513a5710c10704
Parents: 11d5dc9
Author: lidongsjtu <[email protected]>
Authored: Sun Dec 27 23:56:16 2015 +0800
Committer: lidongsjtu <[email protected]>
Committed: Sun Dec 27 23:59:00 2015 +0800

----------------------------------------------------------------------
 .../cube/inmemcubing/InMemCubeBuilder.java      | 25 ++++++++++++++++++--
 .../kylin/gridtable/GTAggregateScanner.java     | 20 ++++++++++++----
 .../kylin/measure/MeasureAggregators.java       | 10 ++++++++
 .../org/apache/kylin/measure/MeasureType.java   |  5 ++++
 .../kylin/engine/mr/common/BatchConstants.java  |  1 +
 .../apache/kylin/engine/mr/steps/CuboidJob.java |  1 +
 .../kylin/engine/mr/steps/CuboidReducer.java    | 19 +++++++++++++--
 .../storage/hbase/cube/v1/CubeStorageQuery.java |  9 +++++--
 .../storage/hbase/cube/v2/CubeStorageQuery.java |  9 +++++--
 9 files changed, 87 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/cd507649/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
index 4bad818..a1940a4 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
@@ -46,6 +46,8 @@ import org.apache.kylin.gridtable.GTRecord;
 import org.apache.kylin.gridtable.GTScanRequest;
 import org.apache.kylin.gridtable.GridTable;
 import org.apache.kylin.gridtable.IGTScanner;
+import org.apache.kylin.measure.MeasureType;
+import org.apache.kylin.measure.MeasureTypeFactory;
 import org.apache.kylin.metadata.datatype.DoubleMutable;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;
@@ -395,12 +397,31 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
         return scanAndAggregateGridTable(parent.table, parent.cuboidId, 
cuboidId, allNeededColumns.getFirst(), allNeededColumns.getSecond());
     }
 
+    /**
+     * Creates an aggregating scanner over {@code gridTable} (the data of cuboid {@code parentId})
+     * that rolls rows up into cuboid {@code cuboidId}.
+     *
+     * When building a child cuboid ({@code parentId != cuboidId}), every measure whose
+     * MeasureType reports {@code onlyAggrInBaseCuboid()} is masked out, so the scanner does
+     * not aggregate values for it.
+     */
+    private GTAggregateScanner prepareGTAggregationScanner(GridTable gridTable, long parentId, long cuboidId, ImmutableBitSet aggregationColumns, ImmutableBitSet measureColumns) throws IOException {
+        GTInfo info = gridTable.getInfo();
+        GTScanRequest req = new GTScanRequest(info, null, aggregationColumns, measureColumns, metricsAggrFuncs, null);
+        GTAggregateScanner scanner = (GTAggregateScanner) gridTable.scan(req);
+
+        // aggregating a cuboid from itself: every measure is aggregated, no mask needed
+        if (parentId == cuboidId) {
+            return scanner;
+        }
+
+        // child cuboid: skip measures that only aggregate in the base cuboid
+        boolean[] aggrMask = new boolean[measureDescs.length];
+        StringBuilder skipped = new StringBuilder();
+        for (int i = 0; i < measureDescs.length; i++) {
+            aggrMask[i] = !measureDescs[i].getFunction().getMeasureType().onlyAggrInBaseCuboid();
+            if (!aggrMask[i]) {
+                if (skipped.length() > 0) {
+                    skipped.append(", ");
+                }
+                skipped.append(measureDescs[i]);
+            }
+        }
+        // one summary line per cuboid rather than one line per skipped measure per cuboid
+        if (skipped.length() > 0) {
+            logger.info("Cuboid " + cuboidId + ": measures not aggregated in child cuboid: " + skipped);
+        }
+        scanner.setAggrMask(aggrMask);
+        return scanner;
+    }
+
     private CuboidResult scanAndAggregateGridTable(GridTable gridTable, long 
parentId, long cuboidId, ImmutableBitSet aggregationColumns, ImmutableBitSet 
measureColumns) throws IOException {
         long startTime = System.currentTimeMillis();
         logger.info("Calculating cuboid " + cuboidId);
 
-        GTScanRequest req = new GTScanRequest(gridTable.getInfo(), null, 
aggregationColumns, measureColumns, metricsAggrFuncs, null);
-        GTAggregateScanner scanner = (GTAggregateScanner) gridTable.scan(req);
+        GTAggregateScanner scanner = prepareGTAggregationScanner(gridTable, 
parentId, cuboidId, aggregationColumns, measureColumns);
         GridTable newGridTable = newGridTableByCuboidID(cuboidId);
         GTBuilder builder = newGridTable.rebuild();
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/cd507649/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
index a760b92..f3afaba 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
@@ -6,6 +6,7 @@ import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
@@ -45,6 +46,7 @@ public class GTAggregateScanner implements IGTScanner {
 
     private int aggregatedRowCount = 0;
     private MemoryWaterLevel memTracker;
+    private boolean[] aggrMask;
 
     public GTAggregateScanner(IGTScanner inputScanner, GTScanRequest req) {
         if (!req.hasAggregation())
@@ -58,6 +60,9 @@ public class GTAggregateScanner implements IGTScanner {
         this.inputScanner = inputScanner;
         this.aggrCache = new AggregationCache();
         this.spillThreshold = (long) (req.getAggrCacheGB() * 
MemoryBudgetController.ONE_GB);
+        this.aggrMask = new boolean[metricsAggrFuncs.length];
+
+        Arrays.fill(aggrMask, true);
     }
 
     public static long estimateSizeOfAggrCache(byte[] keySample, 
MeasureAggregator<?>[] aggrSample, int size) {
@@ -116,6 +121,10 @@ public class GTAggregateScanner implements IGTScanner {
         return aggrCache.dumps.size();
     }
 
+    public void setAggrMask(boolean[] aggrMask) {
+        this.aggrMask = aggrMask;
+    }
+
     /** return the estimate memory size of aggregation cache */
     public long getEstimateSizeOfAggrCache() {
         return aggrCache.estimatedMemSize();
@@ -218,9 +227,11 @@ public class GTAggregateScanner implements IGTScanner {
                 aggBufMap.put(key, aggrs);
             }
             for (int i = 0; i < aggrs.length; i++) {
-                int col = metrics.trueBitAt(i);
-                Object metrics = info.codeSystem.decodeColumnValue(col, 
r.cols[col].asBuffer());
-                aggrs[i].aggregate(metrics);
+                if (aggrMask[i]) {
+                    int col = metrics.trueBitAt(i);
+                    Object metrics = info.codeSystem.decodeColumnValue(col, 
r.cols[col].asBuffer());
+                    aggrs[i].aggregate(metrics);
+                }
             }
         }
 
@@ -475,7 +486,8 @@ public class GTAggregateScanner implements IGTScanner {
 
                             MeasureAggregator[] newPeekAggr = 
dumpCurrentValues.get(newPeek.getValue());
                             for (int i = 0; i < newPeekAggr.length; i++) {
-                                
mergedAggr[i].aggregate(newPeekAggr[i].getState());
+                                if (aggrMask[i])
+                                    
mergedAggr[i].aggregate(newPeekAggr[i].getState());
                             }
 
                             enqueueFromDump(newPeek.getValue());

http://git-wip-us.apache.org/repos/asf/kylin/blob/cd507649/core-metadata/src/main/java/org/apache/kylin/measure/MeasureAggregators.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureAggregators.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureAggregators.java
index b3edbc3..eb8a20b 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureAggregators.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureAggregators.java
@@ -72,6 +72,16 @@ public class MeasureAggregators implements Serializable {
         }
     }
 
+    /**
+     * Feeds one row of measure values into the aggregators, skipping every measure whose mask
+     * entry is false; skipped aggregators keep their current state.
+     *
+     * @param values one value per measure
+     * @param aggrMask one flag per measure; true means "aggregate this measure"
+     */
+    public void aggregate(Object[] values, boolean[] aggrMask) {
+        assert values.length == descLength;
+        assert aggrMask.length == descLength;
+
+        for (int m = 0; m < descLength; m++) {
+            if (!aggrMask[m]) {
+                continue;
+            }
+            aggs[m].aggregate(values[m]);
+        }
+    }
+
     public void collectStates(Object[] states) {
         for (int i = 0; i < descLength; i++) {
             states[i] = aggs[i].getState();

http://git-wip-us.apache.org/repos/asf/kylin/blob/cd507649/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
index e9d97c6..26cac81 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
@@ -55,6 +55,11 @@ abstract public class MeasureType<T> {
     public boolean isMemoryHungry() {
         return false;
     }
+
+    /**
+     * Returns true if this MeasureType aggregates values only in the base cuboid and outputs its
+     * initial value in child cuboids. The default implementation returns false.
+     */
+    public boolean onlyAggrInBaseCuboid() {
+        return false;
+    }
     
     /* 
============================================================================
      * Build

http://git-wip-us.apache.org/repos/asf/kylin/blob/cd507649/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
index b305741..400a3aa 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
@@ -24,6 +24,7 @@ public interface BatchConstants {
 
     String CFG_CUBE_NAME = "cube.name";
     String CFG_CUBE_SEGMENT_NAME = "cube.segment.name";
+    String CFG_CUBE_CUBOID_LEVEL = "cube.cuboid.level";
 
     String CFG_II_NAME = "ii.name";
     String CFG_II_SEGMENT_NAME = "ii.segment.name";

http://git-wip-us.apache.org/repos/asf/kylin/blob/cd507649/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
index d24772c..3a1ce99 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
@@ -132,6 +132,7 @@ public class CuboidJob extends AbstractHadoopJob {
             // set job configuration
             job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
             job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, 
segmentName);
+            
job.getConfiguration().setInt(BatchConstants.CFG_CUBE_CUBOID_LEVEL, 
nCuboidLevel);
             // add metadata to distributed cache
             attachKylinPropsAndMetadata(cube, job.getConfiguration());
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/cd507649/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
index 381b07c..4dbb53e 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
@@ -52,6 +52,8 @@ public class CuboidReducer extends KylinReducer<Text, Text, Text, Text> {
     private MeasureAggregators aggs;
 
     private int counter;
+    private int cuboidLevel;
+    private boolean[] needAggr;
     private Object[] input;
     private Object[] result;
 
@@ -63,6 +65,9 @@ public class CuboidReducer extends KylinReducer<Text, Text, Text, Text> {
         super.bindCurrentConfiguration(context.getConfiguration());
         cubeName = 
context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
 
+        // only used in Build job, not in Merge job
+        cuboidLevel = 
context.getConfiguration().getInt(BatchConstants.CFG_CUBE_CUBOID_LEVEL, 0);
+
         KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
 
         cubeDesc = 
CubeManager.getInstance(config).getCube(cubeName).getDescriptor();
@@ -73,16 +78,26 @@ public class CuboidReducer extends KylinReducer<Text, Text, Text, Text> {
 
         input = new Object[measuresDescs.size()];
         result = new Object[measuresDescs.size()];
+        needAggr = new boolean[measuresDescs.size()];
+
+        if (cuboidLevel > 0) {
+            for (int i = 0; i < measuresDescs.size(); i++) {
+                needAggr[i] = 
!measuresDescs.get(i).getFunction().getMeasureType().onlyAggrInBaseCuboid();
+            }
+        }
     }
 
     @Override
     public void reduce(Text key, Iterable<Text> values, Context context) 
throws IOException, InterruptedException {
-
         aggs.reset();
 
         for (Text value : values) {
             codec.decode(ByteBuffer.wrap(value.getBytes(), 0, 
value.getLength()), input);
-            aggs.aggregate(input);
+            if (cuboidLevel > 0) {
+                aggs.aggregate(input, needAggr);
+            } else {
+                aggs.aggregate(input);
+            }
         }
         aggs.collectStates(result);
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/cd507649/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
index b340be0..32dfc78 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
@@ -123,7 +123,7 @@ public class CubeStorageQuery implements ICachableStorageQuery {
         Set<TblColRef> dimensionsD = Sets.newHashSet();
         dimensionsD.addAll(groupsD);
         dimensionsD.addAll(othersD);
-        Cuboid cuboid = identifyCuboid(dimensionsD);
+        Cuboid cuboid = identifyCuboid(dimensionsD, metrics);
         context.setCuboid(cuboid);
 
         // isExactAggregation? meaning: tuples returned from storage requires 
no further aggregation in query engine
@@ -188,7 +188,12 @@ public class CubeStorageQuery implements ICachableStorageQuery {
         }
     }
 
-    private Cuboid identifyCuboid(Set<TblColRef> dimensions) {
+    private Cuboid identifyCuboid(Set<TblColRef> dimensions, 
Collection<FunctionDesc> metrics) {
+        for (FunctionDesc metric : metrics) {
+            if (metric.getMeasureType().onlyAggrInBaseCuboid())
+                return Cuboid.getBaseCuboid(cubeDesc);
+        }
+
         long cuboidID = 0;
         for (TblColRef column : dimensions) {
             int index = cubeDesc.getRowkey().getColumnBitIndex(column);

http://git-wip-us.apache.org/repos/asf/kylin/blob/cd507649/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
index 3a231b5..5a14d40 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
@@ -78,7 +78,7 @@ public class CubeStorageQuery implements ICachableStorageQuery {
         Set<TblColRef> dimensionsD = new LinkedHashSet<TblColRef>();
         dimensionsD.addAll(groupsD);
         dimensionsD.addAll(filterDimsD);
-        Cuboid cuboid = identifyCuboid(dimensionsD);
+        Cuboid cuboid = identifyCuboid(dimensionsD, metrics);
         context.setCuboid(cuboid);
 
         // isExactAggregation? meaning: tuples returned from storage requires 
no further aggregation in query engine
@@ -155,7 +155,12 @@ public class CubeStorageQuery implements ICachableStorageQuery {
         return expanded;
     }
 
-    private Cuboid identifyCuboid(Set<TblColRef> dimensions) {
+    private Cuboid identifyCuboid(Set<TblColRef> dimensions, 
Collection<FunctionDesc> metrics) {
+        for (FunctionDesc metric : metrics) {
+            if (metric.getMeasureType().onlyAggrInBaseCuboid())
+                return Cuboid.getBaseCuboid(cubeDesc);
+        }
+
         long cuboidID = 0;
         for (TblColRef column : dimensions) {
             int index = cubeDesc.getRowkey().getColumnBitIndex(column);

Reply via email to