Repository: kylin Updated Branches: refs/heads/2.0-rc 19676e6ea -> 7b53cad50
KYLIN-1233 1) drop unnecessary dependency; 2) configurable spill threshold; 3) optimize for all-in-mem case Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/7b53cad5 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/7b53cad5 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/7b53cad5 Branch: refs/heads/2.0-rc Commit: 7b53cad50f2105e481f0b9f96651d5a8b453e35e Parents: 19676e6 Author: Yang Li <[email protected]> Authored: Sat Dec 26 19:09:02 2015 +0800 Committer: Yang Li <[email protected]> Committed: Sat Dec 26 19:09:02 2015 +0800 ---------------------------------------------------------------------- .../java/org/apache/kylin/common/util/Pair.java | 18 +- .../cube/inmemcubing/InMemCubeBuilder.java | 2 +- .../kylin/gridtable/GTAggregateScanner.java | 181 +++++++++++-------- .../apache/kylin/gridtable/GTScanRequest.java | 17 +- .../gridtable/AggregationCacheSpillTest.java | 9 +- .../coprocessor/endpoint/CubeVisitService.java | 10 +- 6 files changed, 150 insertions(+), 87 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/7b53cad5/core-common/src/main/java/org/apache/kylin/common/util/Pair.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/util/Pair.java b/core-common/src/main/java/org/apache/kylin/common/util/Pair.java index 9efdf3b..b54e517 100644 --- a/core-common/src/main/java/org/apache/kylin/common/util/Pair.java +++ b/core-common/src/main/java/org/apache/kylin/common/util/Pair.java @@ -66,6 +66,10 @@ public class Pair<T1, T2> implements Serializable { public void setFirst(T1 a) { this.first = a; } + + public void setKey(T1 a) { + setFirst(a); + } /** * Replace the second element of the pair. @@ -75,6 +79,10 @@ public class Pair<T1, T2> implements Serializable { this.second = b; } + public void setValue(T2 b) { + setSecond(b); + } + /** * Return the first element stored in the pair. * @return T1 @@ -83,6 +91,10 @@ public class Pair<T1, T2> implements Serializable { return first; } + public T1 getKey() { + return getFirst(); + } + /** * Return the second element stored in the pair. * @return T2 @@ -90,13 +102,17 @@ public class Pair<T1, T2> implements Serializable { public T2 getSecond() { return second; } + + public T2 getValue() { + return getSecond(); + } private static boolean equals(Object x, Object y) { return (x == null && y == null) || (x != null && x.equals(y)); } + @SuppressWarnings("rawtypes") @Override - @SuppressWarnings("unchecked") public boolean equals(Object other) { return other instanceof Pair && equals(first, ((Pair) other).first) && equals(second, ((Pair) other).second); } http://git-wip-us.apache.org/repos/asf/kylin/blob/7b53cad5/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java index 73f89c1..97b2d5e 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java @@ -333,7 +333,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder { Pair<ImmutableBitSet, ImmutableBitSet> dimensionMetricsBitSet = InMemCubeBuilderUtils.getDimensionAndMetricColumnBitSet(baseCuboidId, measureCount); GTScanRequest req = new GTScanRequest(baseCuboid.getInfo(), null, dimensionMetricsBitSet.getFirst(), dimensionMetricsBitSet.getSecond(), metricsAggrFuncs, null); - GTAggregateScanner aggregationScanner = new GTAggregateScanner(baseInput, req, true); + GTAggregateScanner aggregationScanner = new GTAggregateScanner(baseInput, req); aggregationScanner.trackMemoryLevel(baseCuboidMemTracker); int count = 0; http://git-wip-us.apache.org/repos/asf/kylin/blob/7b53cad5/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java index 46d51a4..0dd9e61 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java @@ -13,12 +13,11 @@ import java.util.Map.Entry; import java.util.PriorityQueue; import java.util.SortedMap; -import org.apache.commons.collections4.CollectionUtils; -import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.ImmutableBitSet; import org.apache.kylin.common.util.MemoryBudgetController; import org.apache.kylin.common.util.MemoryBudgetController.MemoryWaterLevel; +import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.util.KryoUtils; import org.apache.kylin.metadata.measure.MeasureAggregator; import org.slf4j.Logger; @@ -33,7 +32,6 @@ import com.google.common.collect.Maps; @SuppressWarnings({ "rawtypes", "unchecked" }) public class GTAggregateScanner implements IGTScanner { - @SuppressWarnings("unused") private static final Logger logger = LoggerFactory.getLogger(GTAggregateScanner.class); final GTInfo info; @@ -43,12 +41,12 @@ public class GTAggregateScanner implements IGTScanner { final String[] metricsAggrFuncs; final IGTScanner inputScanner; final AggregationCache aggrCache; - final boolean enableMemCheck; + final long spillThreshold; private int aggregatedRowCount = 0; private MemoryWaterLevel memTracker; - public GTAggregateScanner(IGTScanner inputScanner, GTScanRequest req, boolean enableMemCheck) { + public GTAggregateScanner(IGTScanner inputScanner, GTScanRequest req) { if (!req.hasAggregation()) throw new IllegalStateException(); @@ -59,7 +57,7 @@ public class GTAggregateScanner implements IGTScanner { this.metricsAggrFuncs = req.getAggrMetricsFuncs(); this.inputScanner = inputScanner; this.aggrCache = new AggregationCache(); - this.enableMemCheck = enableMemCheck; + this.spillThreshold = (long) (req.getAggrCacheGB() * MemoryBudgetController.ONE_GB); } public static long estimateSizeOfAggrCache(byte[] keySample, MeasureAggregator<?>[] aggrSample, int size) { @@ -111,19 +109,19 @@ public class GTAggregateScanner implements IGTScanner { for (GTRecord r : inputScanner) { aggrCache.aggregate(r); } - logger.info("Last spill, current AggregationCache memory estimated size is: " + getEstimateSizeOfAggrCache()); - aggrCache.spillBuffMap(); return aggrCache.iterator(); } + public int getNumOfSpills() { + return aggrCache.dumps.size(); + } + /** return the estimate memory size of aggregation cache */ public long getEstimateSizeOfAggrCache() { return aggrCache.estimatedMemSize(); } class AggregationCache implements Closeable { - final static double SPILL_THRESHOLD_GB = 0.5; - final List<Dump> dumps; final int keyLength; final boolean[] compareMask; @@ -201,17 +199,16 @@ public class GTAggregateScanner implements IGTScanner { } void aggregate(GTRecord r) { - if (enableMemCheck && (++aggregatedRowCount % 1000 == 0)) { + if (++aggregatedRowCount % 1000 == 0) { if (memTracker != null) { memTracker.markHigh(); } - } - - // Here will spill to disk when aggBufMap used too large memory - long estimated = estimatedMemSize(); - if (estimated > SPILL_THRESHOLD_GB * MemoryBudgetController.ONE_GB) { - logger.info("AggregationCache memory estimated size is: " + estimated); - spillBuffMap(); + if (spillThreshold > 0) { + // spill to disk when aggBufMap used too large memory + if (estimatedMemSize() > spillThreshold) { + spillBuffMap(); + } + } } final byte[] key = createKey(r); @@ -228,6 +225,9 @@ public class GTAggregateScanner implements IGTScanner { } private void spillBuffMap() throws RuntimeException { + if (aggBufMap.isEmpty()) + return; + try { Dump dump = new Dump(aggBufMap); dump.flush(); @@ -263,52 +263,83 @@ public class GTAggregateScanner implements IGTScanner { } public Iterator<GTRecord> iterator() { - final DumpMerger merger = new DumpMerger(dumps); + // the all-in-mem case + if (dumps.isEmpty()) { + return new Iterator<GTRecord>() { + final Iterator<Entry<byte[], MeasureAggregator[]>> it = aggBufMap.entrySet().iterator(); + final ReturningRecord returningRecord = new ReturningRecord(); - return new Iterator<GTRecord>() { + @Override + public boolean hasNext() { + return it.hasNext(); + } - final Iterator<Entry<byte[], MeasureAggregator[]>> it = merger.iterator(); + @Override + public GTRecord next() { + Entry<byte[], MeasureAggregator[]> entry = it.next(); + returningRecord.load(entry.getKey(), entry.getValue()); + return returningRecord.record; + } - final ByteBuffer metricsBuf = ByteBuffer.allocate(info.getMaxColumnLength(metrics)); - final GTRecord secondRecord = new GTRecord(info); + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + // the spill case + else { + logger.info("Last spill, current AggregationCache memory estimated size is: " + getEstimateSizeOfAggrCache()); + this.spillBuffMap(); - @Override - public boolean hasNext() { - return it.hasNext(); - } + return new Iterator<GTRecord>() { + final DumpMerger merger = new DumpMerger(dumps); + final Iterator<Pair<byte[], MeasureAggregator[]>> it = merger.iterator(); + final ReturningRecord returningRecord = new ReturningRecord(); - @Override - public GTRecord next() { - Entry<byte[], MeasureAggregator[]> entry = it.next(); - create(entry.getKey(), entry.getValue()); - return secondRecord; - } + @Override + public boolean hasNext() { + return it.hasNext(); + } - private void create(byte[] key, MeasureAggregator[] value) { - int offset = 0; - for (int i = 0; i < dimensions.trueBitCount(); i++) { - int c = dimensions.trueBitAt(i); - final int columnLength = info.codeSystem.maxCodeLength(c); - secondRecord.set(c, new ByteArray(key, offset, columnLength)); - offset += columnLength; + @Override + public GTRecord next() { + Pair<byte[], MeasureAggregator[]> entry = it.next(); + returningRecord.load(entry.getKey(), entry.getValue()); + return returningRecord.record; } - metricsBuf.clear(); - for (int i = 0; i < value.length; i++) { - int col = metrics.trueBitAt(i); - int pos = metricsBuf.position(); - info.codeSystem.encodeColumnValue(col, value[i].getState(), metricsBuf); - secondRecord.cols[col].set(metricsBuf.array(), pos, metricsBuf.position() - pos); + + @Override + public void remove() { + throw new UnsupportedOperationException(); } - } + }; + } + } - @Override - public void remove() { - throw new UnsupportedOperationException(); + class ReturningRecord { + final GTRecord record = new GTRecord(info); + final ByteBuffer metricsBuf = ByteBuffer.allocate(info.getMaxColumnLength(metrics)); + + void load(byte[] key, MeasureAggregator[] value) { + int offset = 0; + for (int i = 0; i < dimensions.trueBitCount(); i++) { + int c = dimensions.trueBitAt(i); + final int columnLength = info.codeSystem.maxCodeLength(c); + record.cols[c].set(key, offset, columnLength); + offset += columnLength; + } + metricsBuf.clear(); + for (int i = 0; i < value.length; i++) { + int col = metrics.trueBitAt(i); + int pos = metricsBuf.position(); + info.codeSystem.encodeColumnValue(col, value[i].getState(), metricsBuf); + record.cols[col].set(metricsBuf.array(), pos, metricsBuf.position() - pos); } - }; + } } - class Dump implements Iterable<Entry<byte[], MeasureAggregator[]>> { + class Dump implements Iterable<Pair<byte[], MeasureAggregator[]>> { File dumpedFile; Input input; SortedMap<byte[], MeasureAggregator[]> buffMap; @@ -318,7 +349,7 @@ public class GTAggregateScanner implements IGTScanner { } @Override - public Iterator<Entry<byte[], MeasureAggregator[]>> iterator() { + public Iterator<Pair<byte[], MeasureAggregator[]>> iterator() { try { if (dumpedFile == null || !dumpedFile.exists()) { throw new RuntimeException("Dumped file cannot be found at: " + (dumpedFile == null ? "<null>" : dumpedFile.getAbsolutePath())); @@ -327,7 +358,7 @@ public class GTAggregateScanner implements IGTScanner { input = new Input(new FileInputStream(dumpedFile)); final int count = kryo.readObject(input, Integer.class); - return new Iterator<Entry<byte[], MeasureAggregator[]>>() { + return new Iterator<Pair<byte[], MeasureAggregator[]>>() { int cursorIdx = 0; @Override @@ -336,10 +367,10 @@ public class GTAggregateScanner implements IGTScanner { } @Override - public Entry<byte[], MeasureAggregator[]> next() { + public Pair<byte[], MeasureAggregator[]> next() { try { cursorIdx++; - return (ImmutablePair<byte[], MeasureAggregator[]>) kryo.readObject(input, ImmutablePair.class); + return (Pair<byte[], MeasureAggregator[]>) kryo.readObject(input, Pair.class); } catch (Exception e) { throw new RuntimeException("Cannot read AggregationCache from dumped file: " + e.getMessage()); } @@ -365,7 +396,7 @@ public class GTAggregateScanner implements IGTScanner { output = new Output(new FileOutputStream(dumpedFile)); kryo.writeObject(output, buffMap.size()); for (Entry<byte[], MeasureAggregator[]> entry : buffMap.entrySet()) { - kryo.writeObject(output, new ImmutablePair(entry.getKey(), entry.getValue())); + kryo.writeObject(output, new Pair(entry.getKey(), entry.getValue())); } } finally { buffMap = null; @@ -384,28 +415,28 @@ public class GTAggregateScanner implements IGTScanner { } } - class DumpMerger implements Iterable<Entry<byte[], MeasureAggregator[]>> { - final PriorityQueue<Entry<byte[], Integer>> minHeap; - final List<Iterator<Entry<byte[], MeasureAggregator[]>>> dumpIterators; + class DumpMerger implements Iterable<Pair<byte[], MeasureAggregator[]>> { + final PriorityQueue<Pair<byte[], Integer>> minHeap; + final List<Iterator<Pair<byte[], MeasureAggregator[]>>> dumpIterators; final List<MeasureAggregator[]> dumpCurrentValues; public DumpMerger(List<Dump> dumps) { - minHeap = new PriorityQueue<>(dumps.size(), new Comparator<Entry<byte[], Integer>>() { + minHeap = new PriorityQueue<>(dumps.size(), new Comparator<Pair<byte[], Integer>>() { @Override - public int compare(Entry<byte[], Integer> o1, Entry<byte[], Integer> o2) { - return bytesComparator.compare(o1.getKey(), o2.getKey()); + public int compare(Pair<byte[], Integer> o1, Pair<byte[], Integer> o2) { + return bytesComparator.compare(o1.getFirst(), o2.getFirst()); } }); dumpIterators = Lists.newArrayListWithCapacity(dumps.size()); dumpCurrentValues = Lists.newArrayListWithCapacity(dumps.size()); - Iterator<Entry<byte[], MeasureAggregator[]>> it; + Iterator<Pair<byte[], MeasureAggregator[]>> it; for (int i = 0; i < dumps.size(); i++) { it = dumps.get(i).iterator(); if (it.hasNext()) { dumpIterators.add(i, it); - Entry<byte[], MeasureAggregator[]> entry = it.next(); - minHeap.offer(new ImmutablePair(entry.getKey(), i)); + Pair<byte[], MeasureAggregator[]> entry = it.next(); + minHeap.offer(new Pair(entry.getKey(), i)); dumpCurrentValues.add(i, entry.getValue()); } else { dumpIterators.add(i, null); @@ -416,30 +447,30 @@ public class GTAggregateScanner implements IGTScanner { private void enqueueFromDump(int index) { if (dumpIterators.get(index) != null && dumpIterators.get(index).hasNext()) { - Entry<byte[], MeasureAggregator[]> entry = dumpIterators.get(index).next(); - minHeap.offer(new ImmutablePair(entry.getKey(), index)); + Pair<byte[], MeasureAggregator[]> entry = dumpIterators.get(index).next(); + minHeap.offer(new Pair(entry.getKey(), index)); dumpCurrentValues.set(index, entry.getValue()); } } @Override - public Iterator<Entry<byte[], MeasureAggregator[]>> iterator() { - return new Iterator<Entry<byte[], MeasureAggregator[]>>() { + public Iterator<Pair<byte[], MeasureAggregator[]>> iterator() { + return new Iterator<Pair<byte[], MeasureAggregator[]>>() { @Override public boolean hasNext() { - return !CollectionUtils.isEmpty(minHeap); + return !minHeap.isEmpty(); } @Override - public Entry<byte[], MeasureAggregator[]> next() { + public Pair<byte[], MeasureAggregator[]> next() { // Use minimum heap to merge sort the keys, // also do aggregation for measures with same keys in different dumps - Entry<byte[], Integer> peekEntry = minHeap.poll(); + Pair<byte[], Integer> peekEntry = minHeap.poll(); MeasureAggregator[] mergedAggr = dumpCurrentValues.get(peekEntry.getValue()); enqueueFromDump(peekEntry.getValue()); while (!minHeap.isEmpty() && bytesComparator.compare(peekEntry.getKey(), minHeap.peek().getKey()) == 0) { - Entry<byte[], Integer> newPeek = minHeap.poll(); + Pair<byte[], Integer> newPeek = minHeap.poll(); MeasureAggregator[] newPeekAggr = dumpCurrentValues.get(newPeek.getValue()); for (int i = 0; i < newPeekAggr.length; i++) { @@ -449,7 +480,7 @@ public class GTAggregateScanner implements IGTScanner { enqueueFromDump(newPeek.getValue()); } - return new ImmutablePair(peekEntry.getKey(), mergedAggr); + return new Pair(peekEntry.getKey(), mergedAggr); } @Override http://git-wip-us.apache.org/repos/asf/kylin/blob/7b53cad5/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java index 2c284c9..4d26bbe 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java @@ -29,6 +29,7 @@ public class GTScanRequest { // hint to storage behavior private boolean allowPreAggregation = true; + private double aggrCacheGB = 0; // no limit public GTScanRequest(GTInfo info) { this(info, null, null, null); @@ -118,7 +119,7 @@ public class GTScanRequest { } public IGTScanner decorateScanner(IGTScanner scanner) throws IOException { - return decorateScanner(scanner, true, true, false);//by default do not check mem + return decorateScanner(scanner, true, true);//by default do not check mem } /** @@ -127,7 +128,7 @@ public class GTScanRequest { * * Refer to CoprocessorBehavior for explanation */ - public IGTScanner decorateScanner(IGTScanner scanner, boolean doFilter, boolean doAggr, boolean doMemCheck) throws IOException { + public IGTScanner decorateScanner(IGTScanner scanner, boolean doFilter, boolean doAggr) throws IOException { IGTScanner result = scanner; if (!doFilter) { //Skip reading this section if you're not profiling! lookAndForget(result); @@ -144,13 +145,13 @@ public class GTScanRequest { } if (this.allowPreAggregation && this.hasAggregation()) { - result = new GTAggregateScanner(result, this, doMemCheck); + result = new GTAggregateScanner(result, this); } return result; } } - //touch every byte of the cell so that the cost of scanning will be trully reflected + //touch every byte of the cell so that the cost of scanning will be truly reflected private void lookAndForget(IGTScanner scanner) { byte meaninglessByte = 0; for (GTRecord gtRecord : scanner) { @@ -214,6 +215,14 @@ public class GTScanRequest { public String[] getAggrMetricsFuncs() { return aggrMetricsFuncs; } + + public double getAggrCacheGB() { + return aggrCacheGB; + } + + public void setAggrCacheGB(double gb) { + this.aggrCacheGB = gb; + } @Override public String toString() { http://git-wip-us.apache.org/repos/asf/kylin/blob/7b53cad5/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java index 606b591..e6d0258 100644 --- a/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java +++ b/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java @@ -44,7 +44,7 @@ public class AggregationCacheSpillTest { } @Test - public void testAggregationCacheSpill() { + public void testAggregationCacheSpill() throws IOException { final List<GTRecord> testData = Lists.newArrayListWithCapacity(data.size() * 2); testData.addAll(data); testData.addAll(data); @@ -71,13 +71,16 @@ public class AggregationCacheSpillTest { }; GTScanRequest scanRequest = new GTScanRequest(info, null, new ImmutableBitSet(0, 3), new ImmutableBitSet(0, 3), new ImmutableBitSet(3, 6), new String[] { "SUM", "SUM", "COUNT_DISTINCT" }, null, true); + scanRequest.setAggrCacheGB(0.5); // 500 MB - GTAggregateScanner scanner = new GTAggregateScanner(inputScanner, scanRequest, false); + GTAggregateScanner scanner = new GTAggregateScanner(inputScanner, scanRequest); int count = 0; for (GTRecord record : scanner) { - count++; + if (record != null) + count++; } assertEquals(data.size(), count); + scanner.close(); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/7b53cad5/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java index 3759738..6feed33 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java @@ -147,14 +147,18 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement innerScanner = region.getScanner(scan); InnerScannerAsIterator cellListIterator = new InnerScannerAsIterator(innerScanner); + CoprocessorBehavior behavior = CoprocessorBehavior.valueOf(request.getBehavior()); + if (behavior.ordinal() >= CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM.ordinal()) { + if (scanReq.getAggrCacheGB() <= 0) + scanReq.setAggrCacheGB(10); // 10 GB threshold, inherit from v1.0 + } + IGTStore store = new HBaseReadonlyStore(cellListIterator, scanReq, hbaseRawScan.hbaseColumns, hbaseColumnsToGT, request.getRowkeyPreambleSize()); IGTScanner rawScanner = store.scan(scanReq); - CoprocessorBehavior behavior = CoprocessorBehavior.valueOf(request.getBehavior()); IGTScanner finalScanner = scanReq.decorateScanner(rawScanner,// behavior.ordinal() >= CoprocessorBehavior.SCAN_FILTER.ordinal(),// - behavior.ordinal() >= CoprocessorBehavior.SCAN_FILTER_AGGR.ordinal(),// - behavior.ordinal() >= CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM.ordinal());// + behavior.ordinal() >= CoprocessorBehavior.SCAN_FILTER_AGGR.ordinal()); ByteBuffer buffer = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
