Repository: kylin
Updated Branches:
  refs/heads/2.0-rc 19676e6ea -> 7b53cad50


KYLIN-1233 1) drop unnecessary dependency; 2) configurable spill threshold; 3) optimize for all-in-mem case


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/7b53cad5
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/7b53cad5
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/7b53cad5

Branch: refs/heads/2.0-rc
Commit: 7b53cad50f2105e481f0b9f96651d5a8b453e35e
Parents: 19676e6
Author: Yang Li <[email protected]>
Authored: Sat Dec 26 19:09:02 2015 +0800
Committer: Yang Li <[email protected]>
Committed: Sat Dec 26 19:09:02 2015 +0800

----------------------------------------------------------------------
 .../java/org/apache/kylin/common/util/Pair.java |  18 +-
 .../cube/inmemcubing/InMemCubeBuilder.java      |   2 +-
 .../kylin/gridtable/GTAggregateScanner.java     | 181 +++++++++++--------
 .../apache/kylin/gridtable/GTScanRequest.java   |  17 +-
 .../gridtable/AggregationCacheSpillTest.java    |   9 +-
 .../coprocessor/endpoint/CubeVisitService.java  |  10 +-
 6 files changed, 150 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/7b53cad5/core-common/src/main/java/org/apache/kylin/common/util/Pair.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/Pair.java b/core-common/src/main/java/org/apache/kylin/common/util/Pair.java
index 9efdf3b..b54e517 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/Pair.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/Pair.java
@@ -66,6 +66,10 @@ public class Pair<T1, T2> implements Serializable {
     public void setFirst(T1 a) {
         this.first = a;
     }
+    
+    public void setKey(T1 a) {
+        setFirst(a);
+    }
 
     /**
      * Replace the second element of the pair.
@@ -75,6 +79,10 @@ public class Pair<T1, T2> implements Serializable {
         this.second = b;
     }
 
+    public void setValue(T2 b) {
+        setSecond(b);
+    }
+
     /**
      * Return the first element stored in the pair.
      * @return T1
@@ -83,6 +91,10 @@ public class Pair<T1, T2> implements Serializable {
         return first;
     }
 
+    public T1 getKey() {
+        return getFirst();
+    }
+    
     /**
      * Return the second element stored in the pair.
      * @return T2
@@ -90,13 +102,17 @@ public class Pair<T1, T2> implements Serializable {
     public T2 getSecond() {
         return second;
     }
+    
+    public T2 getValue() {
+        return getSecond();
+    }
 
     private static boolean equals(Object x, Object y) {
         return (x == null && y == null) || (x != null && x.equals(y));
     }
 
+    @SuppressWarnings("rawtypes")
     @Override
-    @SuppressWarnings("unchecked")
     public boolean equals(Object other) {
         return other instanceof Pair && equals(first, ((Pair) other).first) && 
equals(second, ((Pair) other).second);
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/7b53cad5/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
index 73f89c1..97b2d5e 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
@@ -333,7 +333,7 @@ public class InMemCubeBuilder extends 
AbstractInMemCubeBuilder {
 
         Pair<ImmutableBitSet, ImmutableBitSet> dimensionMetricsBitSet = 
InMemCubeBuilderUtils.getDimensionAndMetricColumnBitSet(baseCuboidId, 
measureCount);
         GTScanRequest req = new GTScanRequest(baseCuboid.getInfo(), null, 
dimensionMetricsBitSet.getFirst(), dimensionMetricsBitSet.getSecond(), 
metricsAggrFuncs, null);
-        GTAggregateScanner aggregationScanner = new 
GTAggregateScanner(baseInput, req, true);
+        GTAggregateScanner aggregationScanner = new 
GTAggregateScanner(baseInput, req);
         aggregationScanner.trackMemoryLevel(baseCuboidMemTracker);
 
         int count = 0;

http://git-wip-us.apache.org/repos/asf/kylin/blob/7b53cad5/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
index 46d51a4..0dd9e61 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
@@ -13,12 +13,11 @@ import java.util.Map.Entry;
 import java.util.PriorityQueue;
 import java.util.SortedMap;
 
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.common.util.MemoryBudgetController;
 import org.apache.kylin.common.util.MemoryBudgetController.MemoryWaterLevel;
+import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.util.KryoUtils;
 import org.apache.kylin.metadata.measure.MeasureAggregator;
 import org.slf4j.Logger;
@@ -33,7 +32,6 @@ import com.google.common.collect.Maps;
 @SuppressWarnings({ "rawtypes", "unchecked" })
 public class GTAggregateScanner implements IGTScanner {
 
-    @SuppressWarnings("unused")
     private static final Logger logger = 
LoggerFactory.getLogger(GTAggregateScanner.class);
 
     final GTInfo info;
@@ -43,12 +41,12 @@ public class GTAggregateScanner implements IGTScanner {
     final String[] metricsAggrFuncs;
     final IGTScanner inputScanner;
     final AggregationCache aggrCache;
-    final boolean enableMemCheck;
+    final long spillThreshold;
 
     private int aggregatedRowCount = 0;
     private MemoryWaterLevel memTracker;
 
-    public GTAggregateScanner(IGTScanner inputScanner, GTScanRequest req, 
boolean enableMemCheck) {
+    public GTAggregateScanner(IGTScanner inputScanner, GTScanRequest req) {
         if (!req.hasAggregation())
             throw new IllegalStateException();
 
@@ -59,7 +57,7 @@ public class GTAggregateScanner implements IGTScanner {
         this.metricsAggrFuncs = req.getAggrMetricsFuncs();
         this.inputScanner = inputScanner;
         this.aggrCache = new AggregationCache();
-        this.enableMemCheck = enableMemCheck;
+        this.spillThreshold = (long) (req.getAggrCacheGB() * 
MemoryBudgetController.ONE_GB);
     }
 
     public static long estimateSizeOfAggrCache(byte[] keySample, 
MeasureAggregator<?>[] aggrSample, int size) {
@@ -111,19 +109,19 @@ public class GTAggregateScanner implements IGTScanner {
         for (GTRecord r : inputScanner) {
             aggrCache.aggregate(r);
         }
-        logger.info("Last spill, current AggregationCache memory estimated 
size is: " + getEstimateSizeOfAggrCache());
-        aggrCache.spillBuffMap();
         return aggrCache.iterator();
     }
 
+    public int getNumOfSpills() {
+        return aggrCache.dumps.size();
+    }
+
     /** return the estimate memory size of aggregation cache */
     public long getEstimateSizeOfAggrCache() {
         return aggrCache.estimatedMemSize();
     }
 
     class AggregationCache implements Closeable {
-        final static double SPILL_THRESHOLD_GB = 0.5;
-
         final List<Dump> dumps;
         final int keyLength;
         final boolean[] compareMask;
@@ -201,17 +199,16 @@ public class GTAggregateScanner implements IGTScanner {
         }
 
         void aggregate(GTRecord r) {
-            if (enableMemCheck && (++aggregatedRowCount % 1000 == 0)) {
+            if (++aggregatedRowCount % 1000 == 0) {
                 if (memTracker != null) {
                     memTracker.markHigh();
                 }
-            }
-
-            // Here will spill to disk when aggBufMap used too large memory
-            long estimated = estimatedMemSize();
-            if (estimated > SPILL_THRESHOLD_GB * 
MemoryBudgetController.ONE_GB) {
-                logger.info("AggregationCache memory estimated size is: " + 
estimated);
-                spillBuffMap();
+                if (spillThreshold > 0) {
+                    // spill to disk when aggBufMap used too large memory
+                    if (estimatedMemSize() > spillThreshold) {
+                        spillBuffMap();
+                    }
+                }
             }
 
             final byte[] key = createKey(r);
@@ -228,6 +225,9 @@ public class GTAggregateScanner implements IGTScanner {
         }
 
         private void spillBuffMap() throws RuntimeException {
+            if (aggBufMap.isEmpty())
+                return;
+            
             try {
                 Dump dump = new Dump(aggBufMap);
                 dump.flush();
@@ -263,52 +263,83 @@ public class GTAggregateScanner implements IGTScanner {
         }
 
         public Iterator<GTRecord> iterator() {
-            final DumpMerger merger = new DumpMerger(dumps);
+            // the all-in-mem case
+            if (dumps.isEmpty()) {
+                return new Iterator<GTRecord>() {
+                    final Iterator<Entry<byte[], MeasureAggregator[]>> it = 
aggBufMap.entrySet().iterator();
+                    final ReturningRecord returningRecord = new 
ReturningRecord();
 
-            return new Iterator<GTRecord>() {
+                    @Override
+                    public boolean hasNext() {
+                        return it.hasNext();
+                    }
 
-                final Iterator<Entry<byte[], MeasureAggregator[]>> it = 
merger.iterator();
+                    @Override
+                    public GTRecord next() {
+                        Entry<byte[], MeasureAggregator[]> entry = it.next();
+                        returningRecord.load(entry.getKey(), entry.getValue());
+                        return returningRecord.record;
+                    }
 
-                final ByteBuffer metricsBuf = 
ByteBuffer.allocate(info.getMaxColumnLength(metrics));
-                final GTRecord secondRecord = new GTRecord(info);
+                    @Override
+                    public void remove() {
+                        throw new UnsupportedOperationException();
+                    }
+                };
+            }
+            // the spill case
+            else {
+                logger.info("Last spill, current AggregationCache memory 
estimated size is: " + getEstimateSizeOfAggrCache());
+                this.spillBuffMap();
 
-                @Override
-                public boolean hasNext() {
-                    return it.hasNext();
-                }
+                return new Iterator<GTRecord>() {
+                    final DumpMerger merger = new DumpMerger(dumps);
+                    final Iterator<Pair<byte[], MeasureAggregator[]>> it = 
merger.iterator();
+                    final ReturningRecord returningRecord = new 
ReturningRecord();
 
-                @Override
-                public GTRecord next() {
-                    Entry<byte[], MeasureAggregator[]> entry = it.next();
-                    create(entry.getKey(), entry.getValue());
-                    return secondRecord;
-                }
+                    @Override
+                    public boolean hasNext() {
+                        return it.hasNext();
+                    }
 
-                private void create(byte[] key, MeasureAggregator[] value) {
-                    int offset = 0;
-                    for (int i = 0; i < dimensions.trueBitCount(); i++) {
-                        int c = dimensions.trueBitAt(i);
-                        final int columnLength = 
info.codeSystem.maxCodeLength(c);
-                        secondRecord.set(c, new ByteArray(key, offset, 
columnLength));
-                        offset += columnLength;
+                    @Override
+                    public GTRecord next() {
+                        Pair<byte[], MeasureAggregator[]> entry = it.next();
+                        returningRecord.load(entry.getKey(), entry.getValue());
+                        return returningRecord.record;
                     }
-                    metricsBuf.clear();
-                    for (int i = 0; i < value.length; i++) {
-                        int col = metrics.trueBitAt(i);
-                        int pos = metricsBuf.position();
-                        info.codeSystem.encodeColumnValue(col, 
value[i].getState(), metricsBuf);
-                        secondRecord.cols[col].set(metricsBuf.array(), pos, 
metricsBuf.position() - pos);
+
+                    @Override
+                    public void remove() {
+                        throw new UnsupportedOperationException();
                     }
-                }
+                };
+            }
+        }
 
-                @Override
-                public void remove() {
-                    throw new UnsupportedOperationException();
+        class ReturningRecord {
+            final GTRecord record = new GTRecord(info);
+            final ByteBuffer metricsBuf = 
ByteBuffer.allocate(info.getMaxColumnLength(metrics));
+
+            void load(byte[] key, MeasureAggregator[] value) {
+                int offset = 0;
+                for (int i = 0; i < dimensions.trueBitCount(); i++) {
+                    int c = dimensions.trueBitAt(i);
+                    final int columnLength = info.codeSystem.maxCodeLength(c);
+                    record.cols[c].set(key, offset, columnLength);
+                    offset += columnLength;
+                }
+                metricsBuf.clear();
+                for (int i = 0; i < value.length; i++) {
+                    int col = metrics.trueBitAt(i);
+                    int pos = metricsBuf.position();
+                    info.codeSystem.encodeColumnValue(col, 
value[i].getState(), metricsBuf);
+                    record.cols[col].set(metricsBuf.array(), pos, 
metricsBuf.position() - pos);
                 }
-            };
+            }
         }
 
-        class Dump implements Iterable<Entry<byte[], MeasureAggregator[]>> {
+        class Dump implements Iterable<Pair<byte[], MeasureAggregator[]>> {
             File dumpedFile;
             Input input;
             SortedMap<byte[], MeasureAggregator[]> buffMap;
@@ -318,7 +349,7 @@ public class GTAggregateScanner implements IGTScanner {
             }
 
             @Override
-            public Iterator<Entry<byte[], MeasureAggregator[]>> iterator() {
+            public Iterator<Pair<byte[], MeasureAggregator[]>> iterator() {
                 try {
                     if (dumpedFile == null || !dumpedFile.exists()) {
                         throw new RuntimeException("Dumped file cannot be 
found at: " + (dumpedFile == null ? "<null>" : dumpedFile.getAbsolutePath()));
@@ -327,7 +358,7 @@ public class GTAggregateScanner implements IGTScanner {
                     input = new Input(new FileInputStream(dumpedFile));
 
                     final int count = kryo.readObject(input, Integer.class);
-                    return new Iterator<Entry<byte[], MeasureAggregator[]>>() {
+                    return new Iterator<Pair<byte[], MeasureAggregator[]>>() {
                         int cursorIdx = 0;
 
                         @Override
@@ -336,10 +367,10 @@ public class GTAggregateScanner implements IGTScanner {
                         }
 
                         @Override
-                        public Entry<byte[], MeasureAggregator[]> next() {
+                        public Pair<byte[], MeasureAggregator[]> next() {
                             try {
                                 cursorIdx++;
-                                return (ImmutablePair<byte[], 
MeasureAggregator[]>) kryo.readObject(input, ImmutablePair.class);
+                                return (Pair<byte[], MeasureAggregator[]>) 
kryo.readObject(input, Pair.class);
                             } catch (Exception e) {
                                 throw new RuntimeException("Cannot read 
AggregationCache from dumped file: " + e.getMessage());
                             }
@@ -365,7 +396,7 @@ public class GTAggregateScanner implements IGTScanner {
                         output = new Output(new FileOutputStream(dumpedFile));
                         kryo.writeObject(output, buffMap.size());
                         for (Entry<byte[], MeasureAggregator[]> entry : 
buffMap.entrySet()) {
-                            kryo.writeObject(output, new 
ImmutablePair(entry.getKey(), entry.getValue()));
+                            kryo.writeObject(output, new Pair(entry.getKey(), 
entry.getValue()));
                         }
                     } finally {
                         buffMap = null;
@@ -384,28 +415,28 @@ public class GTAggregateScanner implements IGTScanner {
             }
         }
 
-        class DumpMerger implements Iterable<Entry<byte[], 
MeasureAggregator[]>> {
-            final PriorityQueue<Entry<byte[], Integer>> minHeap;
-            final List<Iterator<Entry<byte[], MeasureAggregator[]>>> 
dumpIterators;
+        class DumpMerger implements Iterable<Pair<byte[], 
MeasureAggregator[]>> {
+            final PriorityQueue<Pair<byte[], Integer>> minHeap;
+            final List<Iterator<Pair<byte[], MeasureAggregator[]>>> 
dumpIterators;
             final List<MeasureAggregator[]> dumpCurrentValues;
 
             public DumpMerger(List<Dump> dumps) {
-                minHeap = new PriorityQueue<>(dumps.size(), new 
Comparator<Entry<byte[], Integer>>() {
+                minHeap = new PriorityQueue<>(dumps.size(), new 
Comparator<Pair<byte[], Integer>>() {
                     @Override
-                    public int compare(Entry<byte[], Integer> o1, 
Entry<byte[], Integer> o2) {
-                        return bytesComparator.compare(o1.getKey(), 
o2.getKey());
+                    public int compare(Pair<byte[], Integer> o1, Pair<byte[], 
Integer> o2) {
+                        return bytesComparator.compare(o1.getFirst(), 
o2.getFirst());
                     }
                 });
                 dumpIterators = Lists.newArrayListWithCapacity(dumps.size());
                 dumpCurrentValues = 
Lists.newArrayListWithCapacity(dumps.size());
 
-                Iterator<Entry<byte[], MeasureAggregator[]>> it;
+                Iterator<Pair<byte[], MeasureAggregator[]>> it;
                 for (int i = 0; i < dumps.size(); i++) {
                     it = dumps.get(i).iterator();
                     if (it.hasNext()) {
                         dumpIterators.add(i, it);
-                        Entry<byte[], MeasureAggregator[]> entry = it.next();
-                        minHeap.offer(new ImmutablePair(entry.getKey(), i));
+                        Pair<byte[], MeasureAggregator[]> entry = it.next();
+                        minHeap.offer(new Pair(entry.getKey(), i));
                         dumpCurrentValues.add(i, entry.getValue());
                     } else {
                         dumpIterators.add(i, null);
@@ -416,30 +447,30 @@ public class GTAggregateScanner implements IGTScanner {
 
             private void enqueueFromDump(int index) {
                 if (dumpIterators.get(index) != null && 
dumpIterators.get(index).hasNext()) {
-                    Entry<byte[], MeasureAggregator[]> entry = 
dumpIterators.get(index).next();
-                    minHeap.offer(new ImmutablePair(entry.getKey(), index));
+                    Pair<byte[], MeasureAggregator[]> entry = 
dumpIterators.get(index).next();
+                    minHeap.offer(new Pair(entry.getKey(), index));
                     dumpCurrentValues.set(index, entry.getValue());
                 }
             }
 
             @Override
-            public Iterator<Entry<byte[], MeasureAggregator[]>> iterator() {
-                return new Iterator<Entry<byte[], MeasureAggregator[]>>() {
+            public Iterator<Pair<byte[], MeasureAggregator[]>> iterator() {
+                return new Iterator<Pair<byte[], MeasureAggregator[]>>() {
                     @Override
                     public boolean hasNext() {
-                        return !CollectionUtils.isEmpty(minHeap);
+                        return !minHeap.isEmpty();
                     }
 
                     @Override
-                    public Entry<byte[], MeasureAggregator[]> next() {
+                    public Pair<byte[], MeasureAggregator[]> next() {
                         // Use minimum heap to merge sort the keys,
                         // also do aggregation for measures with same keys in 
different dumps
-                        Entry<byte[], Integer> peekEntry = minHeap.poll();
+                        Pair<byte[], Integer> peekEntry = minHeap.poll();
                         MeasureAggregator[] mergedAggr = 
dumpCurrentValues.get(peekEntry.getValue());
                         enqueueFromDump(peekEntry.getValue());
 
                         while (!minHeap.isEmpty() && 
bytesComparator.compare(peekEntry.getKey(), minHeap.peek().getKey()) == 0) {
-                            Entry<byte[], Integer> newPeek = minHeap.poll();
+                            Pair<byte[], Integer> newPeek = minHeap.poll();
 
                             MeasureAggregator[] newPeekAggr = 
dumpCurrentValues.get(newPeek.getValue());
                             for (int i = 0; i < newPeekAggr.length; i++) {
@@ -449,7 +480,7 @@ public class GTAggregateScanner implements IGTScanner {
                             enqueueFromDump(newPeek.getValue());
                         }
 
-                        return new ImmutablePair(peekEntry.getKey(), 
mergedAggr);
+                        return new Pair(peekEntry.getKey(), mergedAggr);
                     }
 
                     @Override

http://git-wip-us.apache.org/repos/asf/kylin/blob/7b53cad5/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
index 2c284c9..4d26bbe 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
@@ -29,6 +29,7 @@ public class GTScanRequest {
 
     // hint to storage behavior
     private boolean allowPreAggregation = true;
+    private double aggrCacheGB = 0; // no limit
 
     public GTScanRequest(GTInfo info) {
         this(info, null, null, null);
@@ -118,7 +119,7 @@ public class GTScanRequest {
     }
 
     public IGTScanner decorateScanner(IGTScanner scanner) throws IOException {
-        return decorateScanner(scanner, true, true, false);//by default do not 
check mem
+        return decorateScanner(scanner, true, true);//by default do not check 
mem
     }
 
     /**
@@ -127,7 +128,7 @@ public class GTScanRequest {
      * 
      * Refer to CoprocessorBehavior for explanation
      */
-    public IGTScanner decorateScanner(IGTScanner scanner, boolean doFilter, 
boolean doAggr, boolean doMemCheck) throws IOException {
+    public IGTScanner decorateScanner(IGTScanner scanner, boolean doFilter, 
boolean doAggr) throws IOException {
         IGTScanner result = scanner;
         if (!doFilter) { //Skip reading this section if you're not profiling! 
             lookAndForget(result);
@@ -144,13 +145,13 @@ public class GTScanRequest {
             }
 
             if (this.allowPreAggregation && this.hasAggregation()) {
-                result = new GTAggregateScanner(result, this, doMemCheck);
+                result = new GTAggregateScanner(result, this);
             }
             return result;
         }
     }
 
-    //touch every byte of the cell so that the cost of scanning will be trully 
reflected
+    //touch every byte of the cell so that the cost of scanning will be truly 
reflected
     private void lookAndForget(IGTScanner scanner) {
         byte meaninglessByte = 0;
         for (GTRecord gtRecord : scanner) {
@@ -214,6 +215,14 @@ public class GTScanRequest {
     public String[] getAggrMetricsFuncs() {
         return aggrMetricsFuncs;
     }
+    
+    public double getAggrCacheGB() {
+        return aggrCacheGB;
+    }
+    
+    public void setAggrCacheGB(double gb) {
+        this.aggrCacheGB = gb;
+    }
 
     @Override
     public String toString() {

http://git-wip-us.apache.org/repos/asf/kylin/blob/7b53cad5/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java
index 606b591..e6d0258 100644
--- a/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java
@@ -44,7 +44,7 @@ public class AggregationCacheSpillTest {
     }
 
     @Test
-    public void testAggregationCacheSpill() {
+    public void testAggregationCacheSpill() throws IOException {
         final List<GTRecord> testData = 
Lists.newArrayListWithCapacity(data.size() * 2);
         testData.addAll(data);
         testData.addAll(data);
@@ -71,13 +71,16 @@ public class AggregationCacheSpillTest {
         };
 
         GTScanRequest scanRequest = new GTScanRequest(info, null, new 
ImmutableBitSet(0, 3), new ImmutableBitSet(0, 3), new ImmutableBitSet(3, 6), 
new String[] { "SUM", "SUM", "COUNT_DISTINCT" }, null, true);
+        scanRequest.setAggrCacheGB(0.5); // 500 MB
 
-        GTAggregateScanner scanner = new GTAggregateScanner(inputScanner, 
scanRequest, false);
+        GTAggregateScanner scanner = new GTAggregateScanner(inputScanner, 
scanRequest);
 
         int count = 0;
         for (GTRecord record : scanner) {
-            count++;
+            if (record != null)
+                count++;
         }
         assertEquals(data.size(), count);
+        scanner.close();
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/7b53cad5/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
index 3759738..6feed33 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
@@ -147,14 +147,18 @@ public class CubeVisitService extends 
CubeVisitProtos.CubeVisitService implement
             innerScanner = region.getScanner(scan);
             InnerScannerAsIterator cellListIterator = new 
InnerScannerAsIterator(innerScanner);
 
+            CoprocessorBehavior behavior = 
CoprocessorBehavior.valueOf(request.getBehavior());
+            if (behavior.ordinal() >= 
CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM.ordinal()) {
+                if (scanReq.getAggrCacheGB() <= 0)
+                    scanReq.setAggrCacheGB(10); // 10 GB threshold, inherit 
from v1.0
+            }
+
             IGTStore store = new HBaseReadonlyStore(cellListIterator, scanReq, 
hbaseRawScan.hbaseColumns, hbaseColumnsToGT, request.getRowkeyPreambleSize());
             IGTScanner rawScanner = store.scan(scanReq);
 
-            CoprocessorBehavior behavior = 
CoprocessorBehavior.valueOf(request.getBehavior());
             IGTScanner finalScanner = scanReq.decorateScanner(rawScanner,//
                     behavior.ordinal() >= 
CoprocessorBehavior.SCAN_FILTER.ordinal(),//
-                    behavior.ordinal() >= 
CoprocessorBehavior.SCAN_FILTER_AGGR.ordinal(),//
-                    behavior.ordinal() >= 
CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM.ordinal());//
+                    behavior.ordinal() >= 
CoprocessorBehavior.SCAN_FILTER_AGGR.ordinal());
 
             ByteBuffer buffer = 
ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
 

Reply via email to