KYLIN-2606 Only return counter for precise count_distinct if query is 
exactAggregate


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/d610a6dd
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/d610a6dd
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/d610a6dd

Branch: refs/heads/KYLIN-2606
Commit: d610a6dd50b547c0f1f4d1529aaf626760c66549
Parents: fdacf5d
Author: kangkaisen <kangkai...@163.com>
Authored: Wed Feb 15 19:53:17 2017 +0800
Committer: kangkaisen <kangkai...@live.com>
Committed: Wed May 24 15:36:38 2017 +0800

----------------------------------------------------------------------
 .../kylin/cube/gridtable/CubeCodeSystem.java    |   4 +
 .../org/apache/kylin/gridtable/GTRecord.java    |   8 +
 .../kylin/gridtable/GTSampleCodeSystem.java     |   4 +
 .../apache/kylin/gridtable/IGTCodeSystem.java   |   3 +
 .../measure/bitmap/BitmapCounterFactory.java    |   2 +
 .../kylin/measure/bitmap/BitmapSerializer.java  |  37 ++++-
 .../measure/bitmap/RoaringBitmapCounter.java    |  10 ++
 .../bitmap/RoaringBitmapCounterFactory.java     |   5 +
 .../metadata/datatype/DataTypeSerializer.java   |  13 ++
 .../gtrecord/GTCubeStorageQueryBase.java        |  47 ++++++
 .../hbase/cube/v2/CubeHBaseEndpointRPC.java     |   1 +
 .../storage/hbase/cube/v2/CubeHBaseScanRPC.java |   2 +-
 .../hbase/cube/v2/HBaseReadonlyStore.java       |  33 +++-
 .../coprocessor/endpoint/CubeVisitService.java  |   2 +-
 .../endpoint/generated/CubeVisitProtos.java     | 151 +++++++++++++++----
 .../endpoint/protobuf/CubeVisit.proto           |   1 +
 16 files changed, 286 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/d610a6dd/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java
----------------------------------------------------------------------
diff --git 
a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java 
b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java
index aaa12a7..9eae6f3 100644
--- 
a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java
+++ 
b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java
@@ -177,4 +177,8 @@ public class CubeCodeSystem implements IGTCodeSystem {
         return result;
     }
 
+    @Override
+    public DataTypeSerializer<?> getSerializer(int col) {
+        return serializers[col];
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/d610a6dd/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java 
b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
index 3e62ea7..f65e4b5 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
@@ -296,4 +296,12 @@ public class GTRecord implements Comparable<GTRecord>, 
Cloneable {
         }
     }
 
+    /** change pointers to point to data in given buffer, this
+     *  method allows to defined specific column to load */
+    public void loadColumns(int selectedCol, ByteBuffer buf) {
+        int pos = buf.position();
+        int len = info.codeSystem.codeLength(selectedCol, buf);
+        cols[selectedCol].set(buf.array(), buf.arrayOffset() + pos, len);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/d610a6dd/core-cube/src/main/java/org/apache/kylin/gridtable/GTSampleCodeSystem.java
----------------------------------------------------------------------
diff --git 
a/core-cube/src/main/java/org/apache/kylin/gridtable/GTSampleCodeSystem.java 
b/core-cube/src/main/java/org/apache/kylin/gridtable/GTSampleCodeSystem.java
index 3f3c844..2a5e791 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTSampleCodeSystem.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTSampleCodeSystem.java
@@ -118,4 +118,8 @@ public class GTSampleCodeSystem implements IGTCodeSystem {
         }
     };
 
+    @Override
+    public DataTypeSerializer<?> getSerializer(int col) {
+        return serializers[col];
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/d610a6dd/core-cube/src/main/java/org/apache/kylin/gridtable/IGTCodeSystem.java
----------------------------------------------------------------------
diff --git 
a/core-cube/src/main/java/org/apache/kylin/gridtable/IGTCodeSystem.java 
b/core-cube/src/main/java/org/apache/kylin/gridtable/IGTCodeSystem.java
index 89dfc99..9c8ad6b 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/IGTCodeSystem.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/IGTCodeSystem.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
 import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.dimension.DimensionEncoding;
 import org.apache.kylin.measure.MeasureAggregator;
+import org.apache.kylin.metadata.datatype.DataTypeSerializer;
 
 public interface IGTCodeSystem {
 
@@ -62,4 +63,6 @@ public interface IGTCodeSystem {
     /** Return aggregators for metrics */
     MeasureAggregator<?>[] newMetricsAggregators(ImmutableBitSet columns, 
String[] aggrFunctions);
 
+    /** Return specific DataTypeSerializer */
+    DataTypeSerializer<?> getSerializer(int col);
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/d610a6dd/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapCounterFactory.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapCounterFactory.java
 
b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapCounterFactory.java
index da7748e..39aa1be 100644
--- 
a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapCounterFactory.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapCounterFactory.java
@@ -26,5 +26,7 @@ public interface BitmapCounterFactory {
 
     BitmapCounter newBitmap(int... values);
 
+    BitmapCounter newBitmap(long counter);
+
     BitmapCounter newBitmap(ByteBuffer in) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/d610a6dd/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapSerializer.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapSerializer.java
 
b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapSerializer.java
index c1b260d..e2ec4cc 100644
--- 
a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapSerializer.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapSerializer.java
@@ -28,6 +28,9 @@ public class BitmapSerializer extends 
DataTypeSerializer<BitmapCounter> {
     private static final BitmapCounterFactory factory = 
RoaringBitmapCounterFactory.INSTANCE;
     private static final BitmapCounter DELEGATE = factory.newBitmap();
 
+    private static final int IS_RESULT_FLAG = 1;
+    private static final int RESULT_SIZE = 12;
+
     // called by reflection
     public BitmapSerializer(DataType type) {
     }
@@ -44,8 +47,12 @@ public class BitmapSerializer extends 
DataTypeSerializer<BitmapCounter> {
     @Override
     public BitmapCounter deserialize(ByteBuffer in) {
         try {
-            return factory.newBitmap(in);
-
+            if (peekLength(in) == RESULT_SIZE) {
+                int flag = in.getInt();
+                return factory.newBitmap(in.getLong());
+            } else {
+                return factory.newBitmap(in);
+            }
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
@@ -53,7 +60,12 @@ public class BitmapSerializer extends 
DataTypeSerializer<BitmapCounter> {
 
     @Override
     public int peekLength(ByteBuffer in) {
-        return DELEGATE.peekLength(in);
+        ByteBuffer buffer = in.slice();
+        if (buffer.getInt(0) == IS_RESULT_FLAG) {
+            return RESULT_SIZE;
+        } else {
+            return DELEGATE.peekLength(in);
+        }
     }
 
     @Override
@@ -71,4 +83,23 @@ public class BitmapSerializer extends 
DataTypeSerializer<BitmapCounter> {
         // It's difficult to decide the size before data was ingested, 
comparing with HLLCounter(16) as 64KB, here is assumption
         return 8 * 1024;
     }
+
+    @Override
+    public boolean supportDirectReturnResult() {
+        return true;
+    }
+
+    @Override
+    public ByteBuffer getFinalResult(ByteBuffer in) {
+        ByteBuffer out = ByteBuffer.allocate(RESULT_SIZE);
+        try {
+            BitmapCounter counter = factory.newBitmap(in);
+            out.putInt(IS_RESULT_FLAG);
+            out.putLong(counter.getCount());
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        out.flip();
+        return out;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/d610a6dd/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounter.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounter.java
 
b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounter.java
index eec45f2..9929e24 100644
--- 
a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounter.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounter.java
@@ -35,6 +35,7 @@ import java.util.Iterator;
 public class RoaringBitmapCounter implements BitmapCounter, Serializable {
 
     private ImmutableRoaringBitmap bitmap;
+    private Long counter;
 
     RoaringBitmapCounter() {
         bitmap = new MutableRoaringBitmap();
@@ -44,6 +45,11 @@ public class RoaringBitmapCounter implements BitmapCounter, 
Serializable {
         this.bitmap = bitmap;
     }
 
+    RoaringBitmapCounter(long counter) {
+        this.counter = counter;
+    }
+
+
     private MutableRoaringBitmap getMutableBitmap() {
         if (bitmap instanceof MutableRoaringBitmap) {
             return (MutableRoaringBitmap) bitmap;
@@ -86,6 +92,10 @@ public class RoaringBitmapCounter implements BitmapCounter, 
Serializable {
 
     @Override
     public long getCount() {
+        if (counter != null) {
+            return counter;
+        }
+
         return bitmap.getCardinality();
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/d610a6dd/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounterFactory.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounterFactory.java
 
b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounterFactory.java
index 822afa2..8ab908a 100644
--- 
a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounterFactory.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounterFactory.java
@@ -40,6 +40,11 @@ public class RoaringBitmapCounterFactory implements 
BitmapCounterFactory, Serial
     }
 
     @Override
+    public BitmapCounter newBitmap(long counter) {
+        return new RoaringBitmapCounter(counter);
+    }
+
+    @Override
     public BitmapCounter newBitmap(ByteBuffer in) throws IOException {
         RoaringBitmapCounter counter = new RoaringBitmapCounter();
         counter.readFields(in);

http://git-wip-us.apache.org/repos/asf/kylin/blob/d610a6dd/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java
 
b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java
index a4a35a4..2de38c0 100644
--- 
a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java
@@ -89,6 +89,19 @@ abstract public class DataTypeSerializer<T> implements 
BytesSerializer<T>, java.
         throw new UnsupportedOperationException();
     }
 
+    /** If the query is exactAggregation and has some memory hungry measures,
+     * we could directly return final result to speed up the query.
+     * If the DataTypeSerializer support this,
+     * which should override the getFinalResult method, besides that, the 
deserialize and peekLength method should also support it, like {@link 
org.apache.kylin.measure.bitmap.BitmapSerializer} */
+    public boolean supportDirectReturnResult() {
+        return false;
+    }
+
+    /** An optional method that converts a expensive buffer to lightweight 
buffer containing final result (for memory hungry measures) */
+    public ByteBuffer getFinalResult(ByteBuffer in) {
+        throw new UnsupportedOperationException();
+    }
+
     /** Convert from obj to string */
     public String toString(T value) {
         if (value == null)

http://git-wip-us.apache.org/repos/asf/kylin/blob/d610a6dd/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
----------------------------------------------------------------------
diff --git 
a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
 
b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
index a8c4872..267dedb 100644
--- 
a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
+++ 
b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
@@ -44,6 +44,7 @@ import org.apache.kylin.metadata.filter.TupleFilter;
 import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.PartitionDesc;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.Segments;
 import org.apache.kylin.metadata.model.TblColRef;
@@ -134,6 +135,10 @@ public abstract class GTCubeStorageQueryBase implements 
IStorageQuery {
         Set<TblColRef> singleValuesD = findSingleValueColumns(filter);
         context.setNeedStorageAggregation(isNeedStorageAggregation(cuboid, 
groupsD, singleValuesD));
 
+        // exactAggregation mean: needn't aggregation at storage and query 
engine both.
+        boolean exactAggregation = isExactAggregation(context, cuboid, groups, 
otherDimsD, singleValuesD, derivedPostAggregation);
+        context.setExactAggregation(exactAggregation);
+
         // replace derived columns in filter with host columns; columns on 
loosened condition must be added to group by
         Set<TblColRef> loosenedColumnD = Sets.newHashSet();
         Set<TblColRef> filterColumnD = Sets.newHashSet();
@@ -460,4 +465,46 @@ public abstract class GTCubeStorageQueryBase implements 
IStorageQuery {
         return havingFilter;
     }
 
+    private boolean isExactAggregation(StorageContext context, Cuboid cuboid, 
Collection<TblColRef> groups, Set<TblColRef> othersD, Set<TblColRef> 
singleValuesD, Set<TblColRef> derivedPostAggregation) {
+        boolean exact = true;
+
+        if (context.isNeedStorageAggregation()) {
+            exact = false;
+            logger.info("exactAggregation is false because need storage 
aggregation");
+        }
+
+        if (cuboid.requirePostAggregation()) {
+            exact = false;
+            logger.info("exactAggregation is false because cuboid " + 
cuboid.getInputID() + "=> " + cuboid.getId());
+        }
+
+        // derived aggregation is bad, unless expanded columns are already in 
group by
+        if (groups.containsAll(derivedPostAggregation) == false) {
+            exact = false;
+            logger.info("exactAggregation is false because derived column 
require post aggregation: " + derivedPostAggregation);
+        }
+
+        // other columns (from filter) is bad, unless they are ensured to have 
single value
+        if (singleValuesD.containsAll(othersD) == false) {
+            exact = false;
+            logger.info("exactAggregation is false because some column not on 
group by: " + othersD //
+                    + " (single value column: " + singleValuesD + ")");
+        }
+
+        // for partitioned cube, the partition column must belong to group by 
or has single value
+        PartitionDesc partDesc = 
cuboid.getCubeDesc().getModel().getPartitionDesc();
+        if (partDesc.isPartitioned()) {
+            TblColRef col = partDesc.getPartitionDateColumnRef();
+            if (!groups.contains(col) && !singleValuesD.contains(col)) {
+                exact = false;
+                logger.info("exactAggregation is false because cube is 
partitioned and " + col + " is not on group by");
+            }
+        }
+
+        if (exact) {
+            logger.info("exactAggregation is true, cuboid id is " + 
cuboid.getId());
+        }
+        return exact;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/d610a6dd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index e822ada..af8754d 100644
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -163,6 +163,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
         }
         
builder.setSpillEnabled(cubeSeg.getConfig().getQueryCoprocessorSpillEnabled());
         
builder.setMaxScanBytes(cubeSeg.getConfig().getPartitionMaxScanBytes());
+        builder.setIsExactAggregate(storageContext.isExactAggregation());
 
         for (final Pair<byte[], byte[]> epRange : 
getEPKeyRanges(cuboidBaseShard, shardNum, totalShards)) {
             executorService.submit(new Runnable() {

http://git-wip-us.apache.org/repos/asf/kylin/blob/d610a6dd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
index 951e2ef..a8f4fd8 100644
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
@@ -193,7 +193,7 @@ public class CubeHBaseScanRPC extends CubeHBaseRPC {
             }
         };
 
-        IGTStore store = new HBaseReadonlyStore(cellListIterator, scanRequest, 
rawScans.get(0).hbaseColumns, hbaseColumnsToGT, 
cubeSeg.getRowKeyPreambleSize(), false);
+        IGTStore store = new HBaseReadonlyStore(cellListIterator, scanRequest, 
rawScans.get(0).hbaseColumns, hbaseColumnsToGT, 
cubeSeg.getRowKeyPreambleSize(), false, storageContext.isExactAggregation());
         IGTScanner rawScanner = store.scan(scanRequest);
 
         final IGTScanner decorateScanner = 
scanRequest.decorateScanner(rawScanner);

http://git-wip-us.apache.org/repos/asf/kylin/blob/d610a6dd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java
index 631e8e8..4ec0c9d 100644
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java
@@ -20,6 +20,7 @@ package org.apache.kylin.storage.hbase.cube.v2;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
@@ -44,18 +45,19 @@ public class HBaseReadonlyStore implements IGTStore {
     private List<List<Integer>> hbaseColumnsToGT;
     private int rowkeyPreambleSize;
     private boolean withDelay = false;
-
+    private boolean isExactAggregation;
 
     /**
      * @param withDelay is for test use
      */
-    public HBaseReadonlyStore(CellListIterator cellListIterator, GTScanRequest 
gtScanRequest, List<Pair<byte[], byte[]>> hbaseColumns, List<List<Integer>> 
hbaseColumnsToGT, int rowkeyPreambleSize, boolean withDelay) {
+    public HBaseReadonlyStore(CellListIterator cellListIterator, GTScanRequest 
gtScanRequest, List<Pair<byte[], byte[]>> hbaseColumns, List<List<Integer>> 
hbaseColumnsToGT, int rowkeyPreambleSize, boolean withDelay, boolean 
isExactAggregation) {
         this.cellListIterator = cellListIterator;
         this.info = gtScanRequest.getInfo();
         this.hbaseColumns = hbaseColumns;
         this.hbaseColumnsToGT = hbaseColumnsToGT;
         this.rowkeyPreambleSize = rowkeyPreambleSize;
         this.withDelay = withDelay;
+        this.isExactAggregation = isExactAggregation;
     }
 
     @Override
@@ -132,6 +134,12 @@ public class HBaseReadonlyStore implements IGTStore {
                             buf = byteBuffer(cell.getValueArray(), 
cell.getValueOffset(), cell.getValueLength());
                             oneRecord.loadColumns(hbaseColumnsToGT.get(i), 
buf);
                         }
+
+
+                        if (isExactAggregation && 
getDirectReturnResultColumns().size() > 0) {
+                            trimGTRecord(oneRecord);
+                        }
+
                         return oneRecord;
 
                     }
@@ -145,6 +153,27 @@ public class HBaseReadonlyStore implements IGTStore {
                         return ByteBuffer.wrap(array, offset, length);
                     }
 
+                    private List<Integer> getDirectReturnResultColumns() {
+                        List<Integer> columns = new ArrayList<>();
+                        for (int i = 0; i < info.getColumnCount(); i++) {
+                            if 
(info.getCodeSystem().getSerializer(i).supportDirectReturnResult()) {
+                                columns.add(i);
+                            }
+                        }
+                        return columns;
+                    }
+
+                    private void trimGTRecord(GTRecord record) {
+                        List<Integer> directReturnResultColumns = 
getDirectReturnResultColumns();
+                        for (Integer i : directReturnResultColumns) {
+                            ByteBuffer recordBuffer = record.get(i).asBuffer();
+                            if (recordBuffer!= null) {
+                                ByteBuffer trimmedBuffer = 
info.getCodeSystem().getSerializer(i).getFinalResult(recordBuffer);
+                                record.loadColumns(i, trimmedBuffer);
+                            }
+                        }
+                    }
+
                 };
             }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/d610a6dd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
index cde127e..62dac4c 100644
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
@@ -296,7 +296,7 @@ public class CubeVisitService extends 
CubeVisitProtos.CubeVisitService implement
                     !request.hasMaxScanBytes() ? Long.MAX_VALUE : 
request.getMaxScanBytes(), // for new client
                     scanReq.getTimeout());
 
-            IGTStore store = new HBaseReadonlyStore(cellListIterator, scanReq, 
hbaseRawScans.get(0).hbaseColumns, hbaseColumnsToGT, 
request.getRowkeyPreambleSize(), behavior.delayToggledOn());
+            IGTStore store = new HBaseReadonlyStore(cellListIterator, scanReq, 
hbaseRawScans.get(0).hbaseColumns, hbaseColumnsToGT, 
request.getRowkeyPreambleSize(), behavior.delayToggledOn(), 
request.getIsExactAggregate());
 
             IGTScanner rawScanner = store.scan(scanReq);
             IGTScanner finalScanner = scanReq.decorateScanner(rawScanner, 
behavior.filterToggledOn(), behavior.aggrToggledOn(), false, 
request.getSpillEnabled());

http://git-wip-us.apache.org/repos/asf/kylin/blob/d610a6dd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java
index b9f2771..4c662c9 100644
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java
@@ -123,7 +123,7 @@ public final class CubeVisitProtos {
      * <code>optional int64 maxScanBytes = 8;</code>
      *
      * <pre>
-     * 0 means no limit
+     * must be positive
      * </pre>
      */
     boolean hasMaxScanBytes();
@@ -131,10 +131,20 @@ public final class CubeVisitProtos {
      * <code>optional int64 maxScanBytes = 8;</code>
      *
      * <pre>
-     * 0 means no limit
+     * must be positive
      * </pre>
      */
     long getMaxScanBytes();
+
+    // optional bool isExactAggregate = 9 [default = false];
+    /**
+     * <code>optional bool isExactAggregate = 9 [default = false];</code>
+     */
+    boolean hasIsExactAggregate();
+    /**
+     * <code>optional bool isExactAggregate = 9 [default = false];</code>
+     */
+    boolean getIsExactAggregate();
   }
   /**
    * Protobuf type {@code CubeVisitRequest}
@@ -230,6 +240,11 @@ public final class CubeVisitProtos {
               maxScanBytes_ = input.readInt64();
               break;
             }
+            case 72: {
+              bitField0_ |= 0x00000080;
+              isExactAggregate_ = input.readBool();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -982,7 +997,7 @@ public final class CubeVisitProtos {
      * <code>optional int64 maxScanBytes = 8;</code>
      *
      * <pre>
-     * 0 means no limit
+     * must be positive
      * </pre>
      */
     public boolean hasMaxScanBytes() {
@@ -992,13 +1007,29 @@ public final class CubeVisitProtos {
      * <code>optional int64 maxScanBytes = 8;</code>
      *
      * <pre>
-     * 0 means no limit
+     * must be positive
      * </pre>
      */
     public long getMaxScanBytes() {
       return maxScanBytes_;
     }
 
+    // optional bool isExactAggregate = 9 [default = false];
+    public static final int ISEXACTAGGREGATE_FIELD_NUMBER = 9;
+    private boolean isExactAggregate_;
+    /**
+     * <code>optional bool isExactAggregate = 9 [default = false];</code>
+     */
+    public boolean hasIsExactAggregate() {
+      return ((bitField0_ & 0x00000080) == 0x00000080);
+    }
+    /**
+     * <code>optional bool isExactAggregate = 9 [default = false];</code>
+     */
+    public boolean getIsExactAggregate() {
+      return isExactAggregate_;
+    }
+
     private void initFields() {
       gtScanRequest_ = com.google.protobuf.ByteString.EMPTY;
       hbaseRawScan_ = com.google.protobuf.ByteString.EMPTY;
@@ -1008,6 +1039,7 @@ public final class CubeVisitProtos {
       queryId_ = "";
       spillEnabled_ = true;
       maxScanBytes_ = 0L;
+      isExactAggregate_ = false;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -1061,6 +1093,9 @@ public final class CubeVisitProtos {
       if (((bitField0_ & 0x00000040) == 0x00000040)) {
         output.writeInt64(8, maxScanBytes_);
       }
+      if (((bitField0_ & 0x00000080) == 0x00000080)) {
+        output.writeBool(9, isExactAggregate_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -1102,6 +1137,10 @@ public final class CubeVisitProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeInt64Size(8, maxScanBytes_);
       }
+      if (((bitField0_ & 0x00000080) == 0x00000080)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBoolSize(9, isExactAggregate_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -1162,6 +1201,11 @@ public final class CubeVisitProtos {
         result = result && (getMaxScanBytes()
             == other.getMaxScanBytes());
       }
+      result = result && (hasIsExactAggregate() == 
other.hasIsExactAggregate());
+      if (hasIsExactAggregate()) {
+        result = result && (getIsExactAggregate()
+            == other.getIsExactAggregate());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -1207,6 +1251,10 @@ public final class CubeVisitProtos {
         hash = (37 * hash) + MAXSCANBYTES_FIELD_NUMBER;
         hash = (53 * hash) + hashLong(getMaxScanBytes());
       }
+      if (hasIsExactAggregate()) {
+        hash = (37 * hash) + ISEXACTAGGREGATE_FIELD_NUMBER;
+        hash = (53 * hash) + hashBoolean(getIsExactAggregate());
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -1337,6 +1385,8 @@ public final class CubeVisitProtos {
         bitField0_ = (bitField0_ & ~0x00000040);
         maxScanBytes_ = 0L;
         bitField0_ = (bitField0_ & ~0x00000080);
+        isExactAggregate_ = false;
+        bitField0_ = (bitField0_ & ~0x00000100);
         return this;
       }
 
@@ -1402,6 +1452,10 @@ public final class CubeVisitProtos {
           to_bitField0_ |= 0x00000040;
         }
         result.maxScanBytes_ = maxScanBytes_;
+        if (((from_bitField0_ & 0x00000100) == 0x00000100)) {
+          to_bitField0_ |= 0x00000080;
+        }
+        result.isExactAggregate_ = isExactAggregate_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -1469,6 +1523,9 @@ public final class CubeVisitProtos {
         if (other.hasMaxScanBytes()) {
           setMaxScanBytes(other.getMaxScanBytes());
         }
+        if (other.hasIsExactAggregate()) {
+          setIsExactAggregate(other.getIsExactAggregate());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -2068,7 +2125,7 @@ public final class CubeVisitProtos {
        * <code>optional int64 maxScanBytes = 8;</code>
        *
        * <pre>
-       * 0 means no limit
+       * must be positive
        * </pre>
        */
       public boolean hasMaxScanBytes() {
@@ -2078,7 +2135,7 @@ public final class CubeVisitProtos {
        * <code>optional int64 maxScanBytes = 8;</code>
        *
        * <pre>
-       * 0 means no limit
+       * must be positive
        * </pre>
        */
       public long getMaxScanBytes() {
@@ -2088,7 +2145,7 @@ public final class CubeVisitProtos {
        * <code>optional int64 maxScanBytes = 8;</code>
        *
        * <pre>
-       * 0 means no limit
+       * must be positive
        * </pre>
        */
       public Builder setMaxScanBytes(long value) {
@@ -2101,7 +2158,7 @@ public final class CubeVisitProtos {
        * <code>optional int64 maxScanBytes = 8;</code>
        *
        * <pre>
-       * 0 means no limit
+       * must be positive
        * </pre>
        */
       public Builder clearMaxScanBytes() {
@@ -2111,6 +2168,39 @@ public final class CubeVisitProtos {
         return this;
       }
 
+      // optional bool isExactAggregate = 9 [default = false];
+      private boolean isExactAggregate_ ;
+      /**
+       * <code>optional bool isExactAggregate = 9 [default = false];</code>
+       */
+      public boolean hasIsExactAggregate() {
+        return ((bitField0_ & 0x00000100) == 0x00000100);
+      }
+      /**
+       * <code>optional bool isExactAggregate = 9 [default = false];</code>
+       */
+      public boolean getIsExactAggregate() {
+        return isExactAggregate_;
+      }
+      /**
+       * <code>optional bool isExactAggregate = 9 [default = false];</code>
+       */
+      public Builder setIsExactAggregate(boolean value) {
+        bitField0_ |= 0x00000100;
+        isExactAggregate_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional bool isExactAggregate = 9 [default = false];</code>
+       */
+      public Builder clearIsExactAggregate() {
+        bitField0_ = (bitField0_ & ~0x00000100);
+        isExactAggregate_ = false;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:CubeVisitRequest)
     }
 
@@ -5516,32 +5606,33 @@ public final class CubeVisitProtos {
     java.lang.String[] descriptorData = {
       "\npstorage-hbase/src/main/java/org/apache" +
       "/kylin/storage/hbase/cube/v2/coprocessor" +
-      "/endpoint/protobuf/CubeVisit.proto\"\205\002\n\020C" +
+      "/endpoint/protobuf/CubeVisit.proto\"\246\002\n\020C" +
       "ubeVisitRequest\022\025\n\rgtScanRequest\030\001 \002(\014\022\024" +
       "\n\014hbaseRawScan\030\002 \002(\014\022\032\n\022rowkeyPreambleSi" +
       "ze\030\003 \002(\005\0223\n\020hbaseColumnsToGT\030\004 
\003(\0132\031.Cub" +
       "eVisitRequest.IntList\022\027\n\017kylinProperties" +
       "\030\005 \002(\t\022\017\n\007queryId\030\006 
\001(\t\022\032\n\014spillEnabled\030" +
-      "\007 \001(\010:\004true\022\024\n\014maxScanBytes\030\010 
\001(\003\032\027\n\007Int" +
-      "List\022\014\n\004ints\030\001 
\003(\005\"\253\004\n\021CubeVisitResponse",
-      "\022\026\n\016compressedRows\030\001 \002(\014\022\'\n\005stats\030\002 
\002(\0132" +
-      "\030.CubeVisitResponse.Stats\022/\n\terrorInfo\030\003" +
-      " \001(\0132\034.CubeVisitResponse.ErrorInfo\032\220\002\n\005S" +
-      "tats\022\030\n\020serviceStartTime\030\001 
\001(\003\022\026\n\016servic" +
-      "eEndTime\030\002 \001(\003\022\027\n\017scannedRowCount\030\003 
\001(\003\022" +
-      "\032\n\022aggregatedRowCount\030\004 \001(\003\022\025\n\rsystemCpu" +
-      "Load\030\005 \001(\001\022\036\n\026freePhysicalMemorySize\030\006 
\001" +
-      "(\001\022\031\n\021freeSwapSpaceSize\030\007 
\001(\001\022\020\n\010hostnam" +
-      "e\030\010 \001(\t\022\016\n\006etcMsg\030\t 
\001(\t\022\026\n\016normalComplet" +
-      "e\030\n \001(\005\022\024\n\014scannedBytes\030\013 
\001(\003\032H\n\tErrorIn",
-      "fo\022*\n\004type\030\001 \002(\0162\034.CubeVisitResponse.Err" +
-      "orType\022\017\n\007message\030\002 
\002(\t\"G\n\tErrorType\022\020\n\014" +
-      
"UNKNOWN_TYPE\020\000\022\013\n\007TIMEOUT\020\001\022\033\n\027RESOURCE_" +
-      "LIMIT_EXCEEDED\020\0022F\n\020CubeVisitService\0222\n\t" +
-      "visitCube\022\021.CubeVisitRequest\032\022.CubeVisit" +
-      "ResponseB`\nEorg.apache.kylin.storage.hba" +
-      "se.cube.v2.coprocessor.endpoint.generate" +
-      "dB\017CubeVisitProtosH\001\210\001\001\240\001\001"
+      "\007 \001(\010:\004true\022\024\n\014maxScanBytes\030\010 
\001(\003\022\037\n\020isE" +
+      "xactAggregate\030\t \001(\010:\005false\032\027\n\007IntList\022\014\n",
+      "\004ints\030\001 
\003(\005\"\253\004\n\021CubeVisitResponse\022\026\n\016com" +
+      "pressedRows\030\001 \002(\014\022\'\n\005stats\030\002 
\002(\0132\030.CubeV" +
+      "isitResponse.Stats\022/\n\terrorInfo\030\003 \001(\0132\034." +
+      "CubeVisitResponse.ErrorInfo\032\220\002\n\005Stats\022\030\n" +
+      "\020serviceStartTime\030\001 \001(\003\022\026\n\016serviceEndTim" +
+      "e\030\002 \001(\003\022\027\n\017scannedRowCount\030\003 
\001(\003\022\032\n\022aggr" +
+      "egatedRowCount\030\004 \001(\003\022\025\n\rsystemCpuLoad\030\005 " +
+      "\001(\001\022\036\n\026freePhysicalMemorySize\030\006 
\001(\001\022\031\n\021f" +
+      "reeSwapSpaceSize\030\007 \001(\001\022\020\n\010hostname\030\010 
\001(\t" +
+      "\022\016\n\006etcMsg\030\t \001(\t\022\026\n\016normalComplete\030\n 
\001(\005",
+      "\022\024\n\014scannedBytes\030\013 
\001(\003\032H\n\tErrorInfo\022*\n\004t" +
+      "ype\030\001 \002(\0162\034.CubeVisitResponse.ErrorType\022" +
+      "\017\n\007message\030\002 \002(\t\"G\n\tErrorType\022\020\n\014UNKNOWN" 
+
+      
"_TYPE\020\000\022\013\n\007TIMEOUT\020\001\022\033\n\027RESOURCE_LIMIT_E" +
+      "XCEEDED\020\0022F\n\020CubeVisitService\0222\n\tvisitCu" +
+      "be\022\021.CubeVisitRequest\032\022.CubeVisitRespons" +
+      "eB`\nEorg.apache.kylin.storage.hbase.cube" +
+      ".v2.coprocessor.endpoint.generatedB\017Cube" +
+      "VisitProtosH\001\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner 
assigner =
       new 
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -5553,7 +5644,7 @@ public final class CubeVisitProtos {
           internal_static_CubeVisitRequest_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_CubeVisitRequest_descriptor,
-              new java.lang.String[] { "GtScanRequest", "HbaseRawScan", 
"RowkeyPreambleSize", "HbaseColumnsToGT", "KylinProperties", "QueryId", 
"SpillEnabled", "MaxScanBytes", });
+              new java.lang.String[] { "GtScanRequest", "HbaseRawScan", 
"RowkeyPreambleSize", "HbaseColumnsToGT", "KylinProperties", "QueryId", 
"SpillEnabled", "MaxScanBytes", "IsExactAggregate", });
           internal_static_CubeVisitRequest_IntList_descriptor =
             
internal_static_CubeVisitRequest_descriptor.getNestedTypes().get(0);
           internal_static_CubeVisitRequest_IntList_fieldAccessorTable = new

http://git-wip-us.apache.org/repos/asf/kylin/blob/d610a6dd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto
index aa83595..8ca8756 100644
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto
@@ -38,6 +38,7 @@ message CubeVisitRequest {
     optional string queryId = 6;
     optional bool spillEnabled = 7 [default = true];
     optional int64 maxScanBytes = 8; // must be positive
+    optional bool isExactAggregate = 9 [default = false];
     message IntList {
         repeated int32 ints = 1;
     }

Reply via email to