Repository: kylin Updated Branches: refs/heads/2.0-rc e6b55540a -> cd30e48e3
KYLIN-1233 Spill to disk when AggregationCache need too much memory Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/cd30e48e Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/cd30e48e Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/cd30e48e Branch: refs/heads/2.0-rc Commit: cd30e48e387e4de44428eb508740dff18642a9c4 Parents: e6b5554 Author: lidongsjtu <don...@ebay.com> Authored: Sat Dec 19 22:19:13 2015 +0800 Committer: lidongsjtu <don...@ebay.com> Committed: Sun Dec 20 11:47:42 2015 +0800 ---------------------------------------------------------------------- .../common/hll/HyperLogLogPlusCounter.java | 2 +- .../org/apache/kylin/cube/util/KryoUtils.java | 2 +- .../kylin/gridtable/GTAggregateScanner.java | 306 ++++++++++++++++--- .../apache/kylin/gridtable/UnitTestSupport.java | 44 ++- .../gridtable/AggregationCacheSpillTest.java | 83 +++++ 5 files changed, 384 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/cd30e48e/core-common/src/main/java/org/apache/kylin/common/hll/HyperLogLogPlusCounter.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/hll/HyperLogLogPlusCounter.java b/core-common/src/main/java/org/apache/kylin/common/hll/HyperLogLogPlusCounter.java index ef91509..11ae78f 100644 --- a/core-common/src/main/java/org/apache/kylin/common/hll/HyperLogLogPlusCounter.java +++ b/core-common/src/main/java/org/apache/kylin/common/hll/HyperLogLogPlusCounter.java @@ -42,7 +42,7 @@ public class HyperLogLogPlusCounter implements Serializable, Comparable<HyperLog private final int p; private final int m; - private final HashFunction hashFunc; + private transient final HashFunction hashFunc; byte[] registers; public HyperLogLogPlusCounter() { http://git-wip-us.apache.org/repos/asf/kylin/blob/cd30e48e/core-cube/src/main/java/org/apache/kylin/cube/util/KryoUtils.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/util/KryoUtils.java b/core-cube/src/main/java/org/apache/kylin/cube/util/KryoUtils.java index 9dbe0d2..48f925a 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/util/KryoUtils.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/util/KryoUtils.java @@ -48,7 +48,7 @@ public class KryoUtils { return deserialize(bytes, clazz); } - private static Kryo getKryo() { + public static Kryo getKryo() { if (_Kryo.get() == null) { Kryo kryo = new Kryo(); kryo.setInstantiatorStrategy(new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy())); http://git-wip-us.apache.org/repos/asf/kylin/blob/cd30e48e/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java index 0ed320c..46d51a4 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java @@ -1,20 +1,33 @@ package org.apache.kylin.gridtable; +import java.io.Closeable; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Comparator; import java.util.Iterator; +import java.util.List; import java.util.Map.Entry; +import java.util.PriorityQueue; import java.util.SortedMap; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.ImmutableBitSet; import org.apache.kylin.common.util.MemoryBudgetController; import org.apache.kylin.common.util.MemoryBudgetController.MemoryWaterLevel; +import org.apache.kylin.cube.util.KryoUtils; import org.apache.kylin.metadata.measure.MeasureAggregator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; @SuppressWarnings({ "rawtypes", "unchecked" }) @@ -36,7 +49,7 @@ public class GTAggregateScanner implements IGTScanner { private MemoryWaterLevel memTracker; public GTAggregateScanner(IGTScanner inputScanner, GTScanRequest req, boolean enableMemCheck) { - if (req.hasAggregation() == false) + if (!req.hasAggregation()) throw new IllegalStateException(); this.info = inputScanner.getInfo(); @@ -49,6 +62,30 @@ public class GTAggregateScanner implements IGTScanner { this.enableMemCheck = enableMemCheck; } + public static long estimateSizeOfAggrCache(byte[] keySample, MeasureAggregator<?>[] aggrSample, int size) { + // Aggregation cache is basically a tree map. The tree map entry overhead is + // - 40 according to http://java-performance.info/memory-consumption-of-java-data-types-2/ + // - 41~52 according to AggregationCacheMemSizeTest + return (estimateSizeOf(keySample) + estimateSizeOf(aggrSample) + 64) * size; + } + + public static long estimateSizeOf(MeasureAggregator[] aggrs) { + // size of array, AggregationCacheMemSizeTest reports 4 for [0], 12 for [1], 12 for [2], 20 for [3] etc.. + // Memory alignment to 8 bytes + long est = (aggrs.length + 1) / 2 * 8 + 4 + (4 /* extra */); + for (MeasureAggregator aggr : aggrs) { + if (aggr != null) + est += aggr.getMemBytesEstimate(); + } + return est; + } + + public static long estimateSizeOf(byte[] bytes) { + // AggregationCacheMemSizeTest reports 20 for byte[10] and 20 again for byte[16] + // Memory alignment to 8 bytes + return (bytes.length + 7) / 8 * 8 + 4 + (4 /* extra */); + } + public void trackMemoryLevel(MemoryWaterLevel tracker) { this.memTracker = tracker; } @@ -66,6 +103,7 @@ public class GTAggregateScanner implements IGTScanner { @Override public void close() throws IOException { inputScanner.close(); + aggrCache.close(); } @Override @@ -73,6 +111,8 @@ public class GTAggregateScanner implements IGTScanner { for (GTRecord r : inputScanner) { aggrCache.aggregate(r); } + logger.info("Last spill, current AggregationCache memory estimated size is: " + getEstimateSizeOfAggrCache()); + aggrCache.spillBuffMap(); return aggrCache.iterator(); } @@ -81,35 +121,44 @@ public class GTAggregateScanner implements IGTScanner { return aggrCache.estimatedMemSize(); } - class AggregationCache { - final SortedMap<byte[], MeasureAggregator[]> aggBufMap; + class AggregationCache implements Closeable { + final static double SPILL_THRESHOLD_GB = 0.5; + + final List<Dump> dumps; final int keyLength; final boolean[] compareMask; - public AggregationCache() { - compareMask = createCompareMask(); - keyLength = compareMask.length; - aggBufMap = Maps.newTreeMap(new Comparator<byte[]>() { - @Override - public int compare(byte[] o1, byte[] o2) { - int result = 0; - // profiler shows this check is slow - // Preconditions.checkArgument(keyLength == o1.length && keyLength == o2.length); - for (int i = 0; i < keyLength; ++i) { - if (compareMask[i]) { - int a = (o1[i] & 0xff); - int b = (o2[i] & 0xff); - result = a - b; - if (result == 0) { - continue; - } else { - return result; - } + final Kryo kryo = KryoUtils.getKryo(); + + final Comparator<byte[]> bytesComparator = new Comparator<byte[]>() { + @Override + public int compare(byte[] o1, byte[] o2) { + int result = 0; + // profiler shows this check is slow + // Preconditions.checkArgument(keyLength == o1.length && keyLength == o2.length); + for (int i = 0; i < keyLength; ++i) { + if (compareMask[i]) { + int a = (o1[i] & 0xff); + int b = (o2[i] & 0xff); + result = a - b; + if (result == 0) { + continue; + } else { + return result; } } - return result; } - }); + return result; + } + }; + + SortedMap<byte[], MeasureAggregator[]> aggBufMap; + + public AggregationCache() { + compareMask = createCompareMask(); + keyLength = compareMask.length; + dumps = Lists.newArrayList(); + aggBufMap = createBuffMap(); } private boolean[] createCompareMask() { @@ -133,6 +182,10 @@ public class GTAggregateScanner implements IGTScanner { return mask; } + private SortedMap<byte[], MeasureAggregator[]> createBuffMap() { + return Maps.newTreeMap(bytesComparator); + } + private byte[] createKey(GTRecord record) { byte[] result = new byte[keyLength]; int offset = 0; @@ -152,10 +205,13 @@ public class GTAggregateScanner implements IGTScanner { if (memTracker != null) { memTracker.markHigh(); } - long estimated = estimatedMemSize(); - if (estimated > 10 * MemoryBudgetController.ONE_GB) { - throw new RuntimeException("AggregationCache exceed 10GB, estimated size is: " + estimated); - } + } + + // Here will spill to disk when aggBufMap used too large memory + long estimated = estimatedMemSize(); + if (estimated > SPILL_THRESHOLD_GB * MemoryBudgetController.ONE_GB) { + logger.info("AggregationCache memory estimated size is: " + estimated); + spillBuffMap(); } final byte[] key = createKey(r); @@ -171,6 +227,28 @@ public class GTAggregateScanner implements IGTScanner { } } + private void spillBuffMap() throws RuntimeException { + try { + Dump dump = new Dump(aggBufMap); + dump.flush(); + dumps.add(dump); + aggBufMap = createBuffMap(); + } catch (Exception e) { + throw new RuntimeException("AggregationCache spill failed: " + e.getMessage()); + } + } + + @Override + public void close() throws RuntimeException { + try { + for (Dump dump : dumps) { + dump.terminate(); + } + } catch (Exception e) { + throw new RuntimeException("AggregationCache close failed: " + e.getMessage()); + } + } + private MeasureAggregator[] newAggregators() { return info.codeSystem.newMetricsAggregators(metrics, metricsAggrFuncs); } @@ -185,9 +263,11 @@ public class GTAggregateScanner implements IGTScanner { } public Iterator<GTRecord> iterator() { + final DumpMerger merger = new DumpMerger(dumps); + return new Iterator<GTRecord>() { - final Iterator<Entry<byte[], MeasureAggregator[]>> it = aggBufMap.entrySet().iterator(); + final Iterator<Entry<byte[], MeasureAggregator[]>> it = merger.iterator(); final ByteBuffer metricsBuf = ByteBuffer.allocate(info.getMaxColumnLength(metrics)); final GTRecord secondRecord = new GTRecord(info); @@ -227,29 +307,157 @@ public class GTAggregateScanner implements IGTScanner { } }; } - } - public static long estimateSizeOfAggrCache(byte[] keySample, MeasureAggregator<?>[] aggrSample, int size) { - // Aggregation cache is basically a tree map. The tree map entry overhead is - // - 40 according to http://java-performance.info/memory-consumption-of-java-data-types-2/ - // - 41~52 according to AggregationCacheMemSizeTest - return (estimateSizeOf(keySample) + estimateSizeOf(aggrSample) + 64) * size; - } + class Dump implements Iterable<Entry<byte[], MeasureAggregator[]>> { + File dumpedFile; + Input input; + SortedMap<byte[], MeasureAggregator[]> buffMap; - public static long estimateSizeOf(MeasureAggregator[] aggrs) { - // size of array, AggregationCacheMemSizeTest reports 4 for [0], 12 for [1], 12 for [2], 20 for [3] etc.. - // Memory alignment to 8 bytes - long est = (aggrs.length + 1) / 2 * 8 + 4 + (4 /* extra */); - for (MeasureAggregator aggr : aggrs) { - if (aggr != null) - est += aggr.getMemBytesEstimate(); + public Dump(SortedMap<byte[], MeasureAggregator[]> buffMap) throws IOException { + this.buffMap = buffMap; + } + + @Override + public Iterator<Entry<byte[], MeasureAggregator[]>> iterator() { + try { + if (dumpedFile == null || !dumpedFile.exists()) { + throw new RuntimeException("Dumped file cannot be found at: " + (dumpedFile == null ? "<null>" : dumpedFile.getAbsolutePath())); + } + + input = new Input(new FileInputStream(dumpedFile)); + + final int count = kryo.readObject(input, Integer.class); + return new Iterator<Entry<byte[], MeasureAggregator[]>>() { + int cursorIdx = 0; + + @Override + public boolean hasNext() { + return cursorIdx < count; + } + + @Override + public Entry<byte[], MeasureAggregator[]> next() { + try { + cursorIdx++; + return (ImmutablePair<byte[], MeasureAggregator[]>) kryo.readObject(input, ImmutablePair.class); + } catch (Exception e) { + throw new RuntimeException("Cannot read AggregationCache from dumped file: " + e.getMessage()); + } + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } catch (Exception e) { + throw new RuntimeException("Failed to read dumped file: " + e.getMessage()); + } + } + + public void flush() throws IOException { + if (buffMap != null) { + Output output = null; + try { + dumpedFile = File.createTempFile("KYLIN_AGGR_", ".tmp"); + + logger.info("AggregationCache will dump to file: " + dumpedFile.getAbsolutePath()); + output = new Output(new FileOutputStream(dumpedFile)); + kryo.writeObject(output, buffMap.size()); + for (Entry<byte[], MeasureAggregator[]> entry : buffMap.entrySet()) { + kryo.writeObject(output, new ImmutablePair(entry.getKey(), entry.getValue())); + } + } finally { + buffMap = null; + if (output != null) + output.close(); + } + } + } + + public void terminate() throws IOException { + buffMap = null; + if (input != null) + input.close(); + if (dumpedFile != null && dumpedFile.exists()) + dumpedFile.delete(); + } } - return est; - } - public static long estimateSizeOf(byte[] bytes) { - // AggregationCacheMemSizeTest reports 20 for byte[10] and 20 again for byte[16] - // Memory alignment to 8 bytes - return (bytes.length + 7) / 8 * 8 + 4 + (4 /* extra */); + class DumpMerger implements Iterable<Entry<byte[], MeasureAggregator[]>> { + final PriorityQueue<Entry<byte[], Integer>> minHeap; + final List<Iterator<Entry<byte[], MeasureAggregator[]>>> dumpIterators; + final List<MeasureAggregator[]> dumpCurrentValues; + + public DumpMerger(List<Dump> dumps) { + minHeap = new PriorityQueue<>(dumps.size(), new Comparator<Entry<byte[], Integer>>() { + @Override + public int compare(Entry<byte[], Integer> o1, Entry<byte[], Integer> o2) { + return bytesComparator.compare(o1.getKey(), o2.getKey()); + } + }); + dumpIterators = Lists.newArrayListWithCapacity(dumps.size()); + dumpCurrentValues = Lists.newArrayListWithCapacity(dumps.size()); + + Iterator<Entry<byte[], MeasureAggregator[]>> it; + for (int i = 0; i < dumps.size(); i++) { + it = dumps.get(i).iterator(); + if (it.hasNext()) { + dumpIterators.add(i, it); + Entry<byte[], MeasureAggregator[]> entry = it.next(); + minHeap.offer(new ImmutablePair(entry.getKey(), i)); + dumpCurrentValues.add(i, entry.getValue()); + } else { + dumpIterators.add(i, null); + dumpCurrentValues.add(i, null); + } + } + } + + private void enqueueFromDump(int index) { + if (dumpIterators.get(index) != null && dumpIterators.get(index).hasNext()) { + Entry<byte[], MeasureAggregator[]> entry = dumpIterators.get(index).next(); + minHeap.offer(new ImmutablePair(entry.getKey(), index)); + dumpCurrentValues.set(index, entry.getValue()); + } + } + + @Override + public Iterator<Entry<byte[], MeasureAggregator[]>> iterator() { + return new Iterator<Entry<byte[], MeasureAggregator[]>>() { + @Override + public boolean hasNext() { + return !CollectionUtils.isEmpty(minHeap); + } + + @Override + public Entry<byte[], MeasureAggregator[]> next() { + // Use minimum heap to merge sort the keys, + // also do aggregation for measures with same keys in different dumps + Entry<byte[], Integer> peekEntry = minHeap.poll(); + MeasureAggregator[] mergedAggr = dumpCurrentValues.get(peekEntry.getValue()); + enqueueFromDump(peekEntry.getValue()); + + while (!minHeap.isEmpty() && bytesComparator.compare(peekEntry.getKey(), minHeap.peek().getKey()) == 0) { + Entry<byte[], Integer> newPeek = minHeap.poll(); + + MeasureAggregator[] newPeekAggr = dumpCurrentValues.get(newPeek.getValue()); + for (int i = 0; i < newPeekAggr.length; i++) { + mergedAggr[i].aggregate(newPeekAggr[i].getState()); + } + + enqueueFromDump(newPeek.getValue()); + } + + return new ImmutablePair(peekEntry.getKey(), mergedAggr); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/cd30e48e/core-cube/src/main/java/org/apache/kylin/gridtable/UnitTestSupport.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/UnitTestSupport.java b/core-cube/src/main/java/org/apache/kylin/gridtable/UnitTestSupport.java index 1a9f637..aea627a 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/UnitTestSupport.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/UnitTestSupport.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.BitSet; import java.util.List; +import org.apache.kylin.common.hll.HyperLogLogPlusCounter; import org.apache.kylin.common.util.DateFormat; import org.apache.kylin.common.util.ImmutableBitSet; import org.apache.kylin.gridtable.GTInfo.Builder; @@ -44,6 +45,23 @@ public class UnitTestSupport { return info; } + public static GTInfo hllInfo() { + Builder builder = GTInfo.builder(); + builder.setCodeSystem(new GTSampleCodeSystem()); + builder.setColumns( // + DataType.getInstance("varchar(10)"), // + DataType.getInstance("varchar(10)"), // + DataType.getInstance("varchar(10)"), // + DataType.getInstance("bigint"), // + DataType.getInstance("decimal"), // + DataType.getInstance("hllc14") // + ); + builder.setPrimaryKey(setOf(0)); + builder.setColumnPreferIndex(setOf(0)); + GTInfo info = builder.build(); + return info; + } + private static Builder infoBuilder() { Builder builder = GTInfo.builder(); builder.setCodeSystem(new GTSampleCodeSystem()); @@ -81,15 +99,37 @@ public class UnitTestSupport { return result; } + public static List<GTRecord> mockupHllData(GTInfo info, int nRows) { + List<GTRecord> result = new ArrayList<GTRecord>(nRows); + int round = nRows / 10; + for (int i = 0; i < round; i++) { + String d_01_14 = datePlus("2015-01-14", i * 4); + String d_01_15 = datePlus("2015-01-15", i * 4); + String d_01_16 = datePlus("2015-01-16", i * 4); + String d_01_17 = datePlus("2015-01-17", i * 4); + result.add(newRec(info, d_01_14, "Yang", "Food", new LongMutable(10), new BigDecimal("10.5"), new HyperLogLogPlusCounter(14))); + result.add(newRec(info, d_01_14, "Luke", "Food", new LongMutable(10), new BigDecimal("10.5"), new HyperLogLogPlusCounter(14))); + result.add(newRec(info, d_01_15, "Xu", "Food", new LongMutable(10), new BigDecimal("10.5"), new HyperLogLogPlusCounter(14))); + result.add(newRec(info, d_01_15, "Dong", "Food", new LongMutable(10), new BigDecimal("10.5"), new HyperLogLogPlusCounter(14))); + result.add(newRec(info, d_01_15, "Jason", "Food", new LongMutable(10), new BigDecimal("10.5"), new HyperLogLogPlusCounter(14))); + result.add(newRec(info, d_01_16, "Mahone", "Food", new LongMutable(10), new BigDecimal("10.5"), new HyperLogLogPlusCounter(14))); + result.add(newRec(info, d_01_16, "Shaofeng", "Food", new LongMutable(10), new BigDecimal("10.5"), new HyperLogLogPlusCounter(14))); + result.add(newRec(info, d_01_16, "Qianhao", "Food", new LongMutable(10), new BigDecimal("10.5"), new HyperLogLogPlusCounter(14))); + result.add(newRec(info, d_01_16, "George", "Food", new LongMutable(10), new BigDecimal("10.5"), new HyperLogLogPlusCounter(14))); + result.add(newRec(info, d_01_17, "Kejia", "Food", new LongMutable(10), new BigDecimal("10.5"), new HyperLogLogPlusCounter(14))); + } + return result; + } + private static String datePlus(String date, int plusDays) { long millis = DateFormat.stringToMillis(date); millis += (1000L * 3600L * 24L) * plusDays; return DateFormat.formatToDateStr(millis); } - private static GTRecord newRec(GTInfo info, String date, String name, String category, LongMutable amount, BigDecimal price) { + private static GTRecord newRec(GTInfo info, Object... values) { GTRecord rec = new GTRecord(info); - return rec.setValues(date, name, category, amount, price); + return rec.setValues(values); } private static ImmutableBitSet setOf(int... values) { http://git-wip-us.apache.org/repos/asf/kylin/blob/cd30e48e/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java new file mode 100644 index 0000000..606b591 --- /dev/null +++ b/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.gridtable; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; + +import org.apache.kylin.common.util.ImmutableBitSet; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.google.common.collect.Lists; + +/** + * Created by dongli on 12/16/15. + */ +public class AggregationCacheSpillTest { + + final GTInfo info = UnitTestSupport.hllInfo(); + final List<GTRecord> data = UnitTestSupport.mockupHllData(info, 40000); // converts to about 34 MB data + + @BeforeClass + public static void beforeClass() { + System.setProperty("log4j.configuration", "kylin-log4j.properties"); + } + + @Test + public void testAggregationCacheSpill() { + final List<GTRecord> testData = Lists.newArrayListWithCapacity(data.size() * 2); + testData.addAll(data); + testData.addAll(data); + + IGTScanner inputScanner = new IGTScanner() { + @Override + public GTInfo getInfo() { + return info; + } + + @Override + public int getScannedRowCount() { + throw new UnsupportedOperationException(); + } + + @Override + public void close() throws IOException { + } + + @Override + public Iterator<GTRecord> iterator() { + return testData.iterator(); + } + }; + + GTScanRequest scanRequest = new GTScanRequest(info, null, new ImmutableBitSet(0, 3), new ImmutableBitSet(0, 3), new ImmutableBitSet(3, 6), new String[] { "SUM", "SUM", "COUNT_DISTINCT" }, null, true); + + GTAggregateScanner scanner = new GTAggregateScanner(inputScanner, scanRequest, false); + + int count = 0; + for (GTRecord record : scanner) { + count++; + } + assertEquals(data.size(), count); + } +} \ No newline at end of file