KYLIN-1832 HyperLogLog performance optimization Signed-off-by: Li Yang <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/f05404d5 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/f05404d5 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/f05404d5 Branch: refs/heads/KYLIN-2277 Commit: f05404d5576b52c70cf26eb1bccde1c27cd3852f Parents: 5303651 Author: xiefan46 <[email protected]> Authored: Fri Dec 9 16:53:04 2016 +0800 Committer: Li Yang <[email protected]> Committed: Wed Dec 14 11:07:42 2016 +0800 ---------------------------------------------------------------------- .../org/apache/kylin/cube/util/CubingUtils.java | 14 +- .../apache/kylin/gridtable/UnitTestSupport.java | 22 +- .../benchmark/GTScannerBenchmark2.java | 4 +- .../gridtable/AggregationCacheMemSizeTest.java | 4 +- .../metadata/measure/MeasureCodecTest.java | 4 +- .../org/apache/kylin/measure/MeasureType.java | 2 +- .../kylin/measure/MeasureTypeFactory.java | 2 +- .../kylin/measure/hllc/DenseRegister.java | 91 +++++ .../kylin/measure/hllc/HLLCAggregator.java | 10 +- .../kylin/measure/hllc/HLLCMeasureType.java | 20 +- .../kylin/measure/hllc/HLLCSerializer.java | 16 +- .../measure/hllc/HLLDistinctCountAggFunc.java | 22 +- .../measure/hllc/HyperLogLogPlusCounter.java | 392 ------------------- .../measure/hllc/HyperLogLogPlusCounterNew.java | 388 ++++++++++++++++++ .../measure/hllc/HyperLogLogPlusCounterOld.java | 392 +++++++++++++++++++ .../org/apache/kylin/measure/hllc/Register.java | 37 ++ .../apache/kylin/measure/hllc/RegisterType.java | 25 ++ .../kylin/measure/hllc/SparseRegister.java | 98 +++++ .../measure/AggregatorMemEstimateTest.java | 4 +- .../measure/hll/HyperLogLogCounterOldTest.java | 265 +++++++++++++ .../measure/hll/HyperLogLogCounterTest.java | 265 ------------- .../measure/hll2/HyperLogLogCounterNewTest.java | 301 ++++++++++++++ .../hll2/NewHyperLogLogBenchmarkTest.java | 288 ++++++++++++++ .../kylin/engine/mr/common/CubeStatsReader.java | 12 +- 
.../kylin/engine/mr/common/CubeStatsWriter.java | 6 +- .../mr/steps/FactDistinctColumnsReducer.java | 8 +- .../mr/steps/FactDistinctHiveColumnsMapper.java | 10 +- .../engine/mr/steps/MergeStatisticsStep.java | 6 +- .../kylin/engine/mr/steps/CubeSamplingTest.java | 8 +- .../steps/FactDistinctColumnsReducerTest.java | 4 +- .../apache/kylin/engine/spark/SparkCubing.java | 28 +- .../cardinality/ColumnCardinalityMapper.java | 12 +- .../cardinality/ColumnCardinalityReducer.java | 12 +- .../ColumnCardinalityReducerTest.java | 4 +- 34 files changed, 2002 insertions(+), 774 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/f05404d5/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java b/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java index 413b907..35139a4 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java @@ -38,7 +38,7 @@ import org.apache.kylin.dict.DictionaryGenerator; import org.apache.kylin.dict.DictionaryInfo; import org.apache.kylin.dict.DictionaryManager; import org.apache.kylin.dict.IterableDictionaryValueEnumerator; -import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; +import org.apache.kylin.measure.hllc.HyperLogLogPlusCounterNew; import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.source.ReadableTable; @@ -59,7 +59,7 @@ public class CubingUtils { private static Logger logger = LoggerFactory.getLogger(CubingUtils.class); - public static Map<Long, HyperLogLogPlusCounter> sampling(CubeDesc cubeDesc, IJoinedFlatTableDesc flatDescIn, Iterable<List<String>> streams) { + public static Map<Long, 
HyperLogLogPlusCounterNew> sampling(CubeDesc cubeDesc, IJoinedFlatTableDesc flatDescIn, Iterable<List<String>> streams) { final CubeJoinedFlatTableEnrich flatDesc = new CubeJoinedFlatTableEnrich(flatDescIn, cubeDesc); final int rowkeyLength = cubeDesc.getRowkey().getRowKeyColumns().length; final List<Long> allCuboidIds = new CuboidScheduler(cubeDesc).getAllCuboidIds(); @@ -84,9 +84,9 @@ public class CubingUtils { return result; } }); - final Map<Long, HyperLogLogPlusCounter> result = Maps.newHashMapWithExpectedSize(allCuboidIds.size()); + final Map<Long, HyperLogLogPlusCounterNew> result = Maps.newHashMapWithExpectedSize(allCuboidIds.size()); for (Long cuboidId : allCuboidIds) { - result.put(cuboidId, new HyperLogLogPlusCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision())); + result.put(cuboidId, new HyperLogLogPlusCounterNew(cubeDesc.getConfig().getCubeStatsHLLPrecision())); Integer[] cuboidBitSet = new Integer[Long.bitCount(cuboidId)]; long mask = Long.highestOneBit(baseCuboidId); @@ -118,9 +118,9 @@ public class CubingUtils { } } - for (Map.Entry<Long, HyperLogLogPlusCounter> longHyperLogLogPlusCounterEntry : result.entrySet()) { - Long cuboidId = longHyperLogLogPlusCounterEntry.getKey(); - HyperLogLogPlusCounter counter = longHyperLogLogPlusCounterEntry.getValue(); + for (Map.Entry<Long, HyperLogLogPlusCounterNew> longHyperLogLogPlusCounterNewEntry : result.entrySet()) { + Long cuboidId = longHyperLogLogPlusCounterNewEntry.getKey(); + HyperLogLogPlusCounterNew counter = longHyperLogLogPlusCounterNewEntry.getValue(); Hasher hc = hf.newHasher(); final Integer[] cuboidBitSet = allCuboidsBitSet.get(cuboidId); for (int position = 0; position < cuboidBitSet.length; position++) { http://git-wip-us.apache.org/repos/asf/kylin/blob/f05404d5/core-cube/src/main/java/org/apache/kylin/gridtable/UnitTestSupport.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/UnitTestSupport.java 
b/core-cube/src/main/java/org/apache/kylin/gridtable/UnitTestSupport.java index 3396fd2..6cbf237 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/UnitTestSupport.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/UnitTestSupport.java @@ -26,7 +26,7 @@ import java.util.List; import org.apache.kylin.common.util.DateFormat; import org.apache.kylin.common.util.ImmutableBitSet; import org.apache.kylin.gridtable.GTInfo.Builder; -import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; +import org.apache.kylin.measure.hllc.HyperLogLogPlusCounterNew; import org.apache.kylin.metadata.datatype.DataType; import org.apache.kylin.metadata.datatype.LongMutable; @@ -106,16 +106,16 @@ public class UnitTestSupport { String d_01_15 = datePlus("2015-01-15", i * 4); String d_01_16 = datePlus("2015-01-16", i * 4); String d_01_17 = datePlus("2015-01-17", i * 4); - result.add(newRec(info, d_01_14, "Yang", "Food", new LongMutable(10), new BigDecimal("10.5"), new HyperLogLogPlusCounter(14))); - result.add(newRec(info, d_01_14, "Luke", "Food", new LongMutable(10), new BigDecimal("10.5"), new HyperLogLogPlusCounter(14))); - result.add(newRec(info, d_01_15, "Xu", "Food", new LongMutable(10), new BigDecimal("10.5"), new HyperLogLogPlusCounter(14))); - result.add(newRec(info, d_01_15, "Dong", "Food", new LongMutable(10), new BigDecimal("10.5"), new HyperLogLogPlusCounter(14))); - result.add(newRec(info, d_01_15, "Jason", "Food", new LongMutable(10), new BigDecimal("10.5"), new HyperLogLogPlusCounter(14))); - result.add(newRec(info, d_01_16, "Mahone", "Food", new LongMutable(10), new BigDecimal("10.5"), new HyperLogLogPlusCounter(14))); - result.add(newRec(info, d_01_16, "Shaofeng", "Food", new LongMutable(10), new BigDecimal("10.5"), new HyperLogLogPlusCounter(14))); - result.add(newRec(info, d_01_16, "Qianhao", "Food", new LongMutable(10), new BigDecimal("10.5"), new HyperLogLogPlusCounter(14))); - result.add(newRec(info, d_01_16, "George", "Food", new 
LongMutable(10), new BigDecimal("10.5"), new HyperLogLogPlusCounter(14))); - result.add(newRec(info, d_01_17, "Kejia", "Food", new LongMutable(10), new BigDecimal("10.5"), new HyperLogLogPlusCounter(14))); + result.add(newRec(info, d_01_14, "Yang", "Food", new LongMutable(10), new BigDecimal("10.5"), new HyperLogLogPlusCounterNew(14))); + result.add(newRec(info, d_01_14, "Luke", "Food", new LongMutable(10), new BigDecimal("10.5"), new HyperLogLogPlusCounterNew(14))); + result.add(newRec(info, d_01_15, "Xu", "Food", new LongMutable(10), new BigDecimal("10.5"), new HyperLogLogPlusCounterNew(14))); + result.add(newRec(info, d_01_15, "Dong", "Food", new LongMutable(10), new BigDecimal("10.5"), new HyperLogLogPlusCounterNew(14))); + result.add(newRec(info, d_01_15, "Jason", "Food", new LongMutable(10), new BigDecimal("10.5"), new HyperLogLogPlusCounterNew(14))); + result.add(newRec(info, d_01_16, "Mahone", "Food", new LongMutable(10), new BigDecimal("10.5"), new HyperLogLogPlusCounterNew(14))); + result.add(newRec(info, d_01_16, "Shaofeng", "Food", new LongMutable(10), new BigDecimal("10.5"), new HyperLogLogPlusCounterNew(14))); + result.add(newRec(info, d_01_16, "Qianhao", "Food", new LongMutable(10), new BigDecimal("10.5"), new HyperLogLogPlusCounterNew(14))); + result.add(newRec(info, d_01_16, "George", "Food", new LongMutable(10), new BigDecimal("10.5"), new HyperLogLogPlusCounterNew(14))); + result.add(newRec(info, d_01_17, "Kejia", "Food", new LongMutable(10), new BigDecimal("10.5"), new HyperLogLogPlusCounterNew(14))); } return result; } http://git-wip-us.apache.org/repos/asf/kylin/blob/f05404d5/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark2.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark2.java b/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark2.java index 40a5e01..f80bd24 100644 
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark2.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark2.java @@ -34,7 +34,7 @@ import org.apache.kylin.gridtable.GTScanRequest; import org.apache.kylin.gridtable.GTScanRequestBuilder; import org.apache.kylin.gridtable.IGTScanner; import org.apache.kylin.gridtable.benchmark.SortedGTRecordGenerator.Randomizer; -import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; +import org.apache.kylin.measure.hllc.HyperLogLogPlusCounterNew; import org.apache.kylin.metadata.datatype.DataType; import org.apache.kylin.metadata.filter.ColumnTupleFilter; import org.apache.kylin.metadata.filter.CompareTupleFilter; @@ -80,7 +80,7 @@ public class GTScannerBenchmark2 { gen.addDimension(100, 4, null); gen.addMeasure(8); gen.addMeasure(8, new Randomizer() { - HyperLogLogPlusCounter hllc = new HyperLogLogPlusCounter(12); + HyperLogLogPlusCounterNew hllc = new HyperLogLogPlusCounterNew(12); @Override public int fillRandom(Random rand, byte[] array, int offset) { http://git-wip-us.apache.org/repos/asf/kylin/blob/f05404d5/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheMemSizeTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheMemSizeTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheMemSizeTest.java index 00c0bd0..66a6b51 100644 --- a/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheMemSizeTest.java +++ b/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheMemSizeTest.java @@ -26,7 +26,7 @@ import org.apache.kylin.measure.basic.LongSumAggregator; import org.apache.kylin.measure.bitmap.BitmapAggregator; import org.apache.kylin.measure.bitmap.BitmapCounter; import org.apache.kylin.measure.hllc.HLLCAggregator; -import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; +import 
org.apache.kylin.measure.hllc.HyperLogLogPlusCounterNew; import org.apache.kylin.metadata.datatype.DoubleMutable; import org.apache.kylin.metadata.datatype.LongMutable; import org.github.jamm.MemoryMeter; @@ -105,7 +105,7 @@ public class AggregationCacheMemSizeTest { private HLLCAggregator createHLLCAggr() { HLLCAggregator hllcAggregator = new HLLCAggregator(14); - hllcAggregator.aggregate(new HyperLogLogPlusCounter(14)); + hllcAggregator.aggregate(new HyperLogLogPlusCounterNew(14)); return hllcAggregator; } http://git-wip-us.apache.org/repos/asf/kylin/blob/f05404d5/core-cube/src/test/java/org/apache/kylin/metadata/measure/MeasureCodecTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/metadata/measure/MeasureCodecTest.java b/core-cube/src/test/java/org/apache/kylin/metadata/measure/MeasureCodecTest.java index 18680ec..cd1aa96 100644 --- a/core-cube/src/test/java/org/apache/kylin/metadata/measure/MeasureCodecTest.java +++ b/core-cube/src/test/java/org/apache/kylin/metadata/measure/MeasureCodecTest.java @@ -26,7 +26,7 @@ import java.nio.ByteBuffer; import org.apache.kylin.common.util.LocalFileMetadataTestCase; import org.apache.kylin.measure.BufferedMeasureCodec; import org.apache.kylin.measure.bitmap.BitmapCounter; -import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; +import org.apache.kylin.measure.hllc.HyperLogLogPlusCounterNew; import org.apache.kylin.metadata.datatype.DoubleMutable; import org.apache.kylin.metadata.datatype.LongMutable; import org.apache.kylin.metadata.model.FunctionDesc; @@ -57,7 +57,7 @@ public class MeasureCodecTest extends LocalFileMetadataTestCase { DoubleMutable d = new DoubleMutable(1.0); LongMutable l = new LongMutable(2); BigDecimal b = new BigDecimal("333.1234"); - HyperLogLogPlusCounter hllc = new HyperLogLogPlusCounter(16); + HyperLogLogPlusCounterNew hllc = new HyperLogLogPlusCounterNew(16); hllc.add("1234567"); hllc.add("abcdefg"); 
BitmapCounter bitmap = new BitmapCounter(); http://git-wip-us.apache.org/repos/asf/kylin/blob/f05404d5/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java index de1b442..031636e 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java @@ -36,7 +36,7 @@ import org.apache.kylin.metadata.tuple.TupleInfo; * MeasureType captures how a kind of aggregation is defined, how it is calculated * during cube build, and how it is involved in query and storage scan. * - * @param <T> the Java type of aggregation data object, e.g. HyperLogLogPlusCounter + * @param <T> the Java type of aggregation data object, e.g. HyperLogLogPlusCounterOld */ abstract public class MeasureType<T> { http://git-wip-us.apache.org/repos/asf/kylin/blob/f05404d5/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java index c5bd482..d94dec9 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java @@ -62,7 +62,7 @@ import com.google.common.collect.Maps; } </pre> * - * @param <T> the Java type of aggregation data object, e.g. HyperLogLogPlusCounter + * @param <T> the Java type of aggregation data object, e.g. 
HyperLogLogPlusCounterOld */ abstract public class MeasureTypeFactory<T> { http://git-wip-us.apache.org/repos/asf/kylin/blob/f05404d5/core-metadata/src/main/java/org/apache/kylin/measure/hllc/DenseRegister.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/DenseRegister.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/DenseRegister.java new file mode 100644 index 0000000..26ee6ab --- /dev/null +++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/DenseRegister.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +package org.apache.kylin.measure.hllc; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Map; + +/** + * Created by xiefan on 16-12-9. 
+ */ +public class DenseRegister implements Register { + private int p; + + private int m; + + private byte[] register; + + public DenseRegister(int p) { + this.m = 1 << p; + this.register = new byte[m]; + } + + public void set(int pos, byte value) { + register[pos] = value; + } + + @Override + public Byte get(int pos) { + return register[pos]; + } + + @Override + public void merge(Register another) { + if (another instanceof DenseRegister) { + DenseRegister dr = (DenseRegister) another; + for (int i = 0; i < register.length; i++) { + if (dr.register[i] > register[i]) + register[i] = dr.register[i]; + } + } else { + SparseRegister sr = (SparseRegister) another; + Collection<Map.Entry<Integer, Byte>> allValue = sr.getAllValue(); + for (Map.Entry<Integer, Byte> entry : allValue) { + if (entry.getValue() > register[entry.getKey()]) + register[entry.getKey()] = entry.getValue(); + } + } + } + + @Override + public void clear() { + byte zero = (byte) 0; + Arrays.fill(register, zero); + } + + @Override + public int getSize() { + int size = 0; + for (int i = 0; i < m; i++) { + if (register[i] > 0) + size++; + } + return size; + } + + @Override + public int getHashCode() { + return Arrays.hashCode(register); + } + + public byte[] getRawRegister() { + return this.register; + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/f05404d5/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCAggregator.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCAggregator.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCAggregator.java index aea2df1..ca73285 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCAggregator.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCAggregator.java @@ -23,10 +23,10 @@ import org.apache.kylin.measure.MeasureAggregator; /** */ @SuppressWarnings("serial") -public class 
HLLCAggregator extends MeasureAggregator<HyperLogLogPlusCounter> { +public class HLLCAggregator extends MeasureAggregator<HyperLogLogPlusCounterNew> { final int precision; - HyperLogLogPlusCounter sum = null; + HyperLogLogPlusCounterNew sum = null; public HLLCAggregator(int precision) { this.precision = precision; @@ -38,15 +38,15 @@ public class HLLCAggregator extends MeasureAggregator<HyperLogLogPlusCounter> { } @Override - public void aggregate(HyperLogLogPlusCounter value) { + public void aggregate(HyperLogLogPlusCounterNew value) { if (sum == null) - sum = new HyperLogLogPlusCounter(value); + sum = new HyperLogLogPlusCounterNew(value); else sum.merge(value); } @Override - public HyperLogLogPlusCounter getState() { + public HyperLogLogPlusCounterNew getState() { return sum; } http://git-wip-us.apache.org/repos/asf/kylin/blob/f05404d5/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java index 0e58dca..481fa4e 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java @@ -33,15 +33,15 @@ import org.apache.kylin.metadata.model.TblColRef; import com.google.common.collect.ImmutableMap; -public class HLLCMeasureType extends MeasureType<HyperLogLogPlusCounter> { +public class HLLCMeasureType extends MeasureType<HyperLogLogPlusCounterNew> { public static final String FUNC_COUNT_DISTINCT = FunctionDesc.FUNC_COUNT_DISTINCT; public static final String DATATYPE_HLLC = "hllc"; - public static class Factory extends MeasureTypeFactory<HyperLogLogPlusCounter> { + public static class Factory extends MeasureTypeFactory<HyperLogLogPlusCounterNew> { @Override - public 
MeasureType<HyperLogLogPlusCounter> createMeasureType(String funcName, DataType dataType) { + public MeasureType<HyperLogLogPlusCounterNew> createMeasureType(String funcName, DataType dataType) { return new HLLCMeasureType(funcName, dataType); } @@ -56,7 +56,7 @@ public class HLLCMeasureType extends MeasureType<HyperLogLogPlusCounter> { } @Override - public Class<? extends DataTypeSerializer<HyperLogLogPlusCounter>> getAggrDataTypeSerializer() { + public Class<? extends DataTypeSerializer<HyperLogLogPlusCounterNew>> getAggrDataTypeSerializer() { return HLLCSerializer.class; } } @@ -91,13 +91,13 @@ public class HLLCMeasureType extends MeasureType<HyperLogLogPlusCounter> { } @Override - public MeasureIngester<HyperLogLogPlusCounter> newIngester() { - return new MeasureIngester<HyperLogLogPlusCounter>() { - HyperLogLogPlusCounter current = new HyperLogLogPlusCounter(dataType.getPrecision()); + public MeasureIngester<HyperLogLogPlusCounterNew> newIngester() { + return new MeasureIngester<HyperLogLogPlusCounterNew>() { + HyperLogLogPlusCounterNew current = new HyperLogLogPlusCounterNew(dataType.getPrecision()); @Override - public HyperLogLogPlusCounter valueOf(String[] values, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) { - HyperLogLogPlusCounter hllc = current; + public HyperLogLogPlusCounterNew valueOf(String[] values, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) { + HyperLogLogPlusCounterNew hllc = current; hllc.clear(); for (String v : values) { if (v != null) @@ -109,7 +109,7 @@ public class HLLCMeasureType extends MeasureType<HyperLogLogPlusCounter> { } @Override - public MeasureAggregator<HyperLogLogPlusCounter> newAggregator() { + public MeasureAggregator<HyperLogLogPlusCounterNew> newAggregator() { return new HLLCAggregator(dataType.getPrecision()); } http://git-wip-us.apache.org/repos/asf/kylin/blob/f05404d5/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java 
---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java index 4d08b6f..1d01abc 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java @@ -28,10 +28,10 @@ import org.apache.kylin.metadata.datatype.DataTypeSerializer; * @author yangli9 * */ -public class HLLCSerializer extends DataTypeSerializer<HyperLogLogPlusCounter> { +public class HLLCSerializer extends DataTypeSerializer<HyperLogLogPlusCounterNew> { // be thread-safe and avoid repeated obj creation - private ThreadLocal<HyperLogLogPlusCounter> current = new ThreadLocal<HyperLogLogPlusCounter>(); + private ThreadLocal<HyperLogLogPlusCounterNew> current = new ThreadLocal<HyperLogLogPlusCounterNew>(); private int precision; @@ -40,7 +40,7 @@ public class HLLCSerializer extends DataTypeSerializer<HyperLogLogPlusCounter> { } @Override - public void serialize(HyperLogLogPlusCounter value, ByteBuffer out) { + public void serialize(HyperLogLogPlusCounterNew value, ByteBuffer out) { try { value.writeRegisters(out); } catch (IOException e) { @@ -48,18 +48,18 @@ public class HLLCSerializer extends DataTypeSerializer<HyperLogLogPlusCounter> { } } - private HyperLogLogPlusCounter current() { - HyperLogLogPlusCounter hllc = current.get(); + private HyperLogLogPlusCounterNew current() { + HyperLogLogPlusCounterNew hllc = current.get(); if (hllc == null) { - hllc = new HyperLogLogPlusCounter(precision); + hllc = new HyperLogLogPlusCounterNew(precision); current.set(hllc); } return hllc; } @Override - public HyperLogLogPlusCounter deserialize(ByteBuffer in) { - HyperLogLogPlusCounter hllc = current(); + public HyperLogLogPlusCounterNew deserialize(ByteBuffer in) { + HyperLogLogPlusCounterNew hllc = current(); try { 
hllc.readRegisters(in); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/kylin/blob/f05404d5/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLDistinctCountAggFunc.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLDistinctCountAggFunc.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLDistinctCountAggFunc.java index 8f2a0fa..a72ad09 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLDistinctCountAggFunc.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLDistinctCountAggFunc.java @@ -31,21 +31,21 @@ public class HLLDistinctCountAggFunc { private static final Logger logger = LoggerFactory.getLogger(HLLDistinctCountAggFunc.class); - public static HyperLogLogPlusCounter init() { + public static HyperLogLogPlusCounterNew init() { return null; } - public static HyperLogLogPlusCounter initAdd(Object v) { + public static HyperLogLogPlusCounterNew initAdd(Object v) { if (v instanceof Long) { // holistic case long l = (Long) v; return new FixedValueHLLCMockup(l); } else { - HyperLogLogPlusCounter c = (HyperLogLogPlusCounter) v; - return new HyperLogLogPlusCounter(c); + HyperLogLogPlusCounterNew c = (HyperLogLogPlusCounterNew) v; + return new HyperLogLogPlusCounterNew(c); } } - public static HyperLogLogPlusCounter add(HyperLogLogPlusCounter counter, Object v) { + public static HyperLogLogPlusCounterNew add(HyperLogLogPlusCounterNew counter, Object v) { if (v instanceof Long) { // holistic case long l = (Long) v; if (counter == null) { @@ -58,9 +58,9 @@ public class HLLDistinctCountAggFunc { return counter; } } else { - HyperLogLogPlusCounter c = (HyperLogLogPlusCounter) v; + HyperLogLogPlusCounterNew c = (HyperLogLogPlusCounterNew) v; if (counter == null) { - return new HyperLogLogPlusCounter(c); + return new HyperLogLogPlusCounterNew(c); } else { counter.merge(c); return counter; @@ -68,16 
+68,16 @@ public class HLLDistinctCountAggFunc { } } - public static HyperLogLogPlusCounter merge(HyperLogLogPlusCounter counter0, Object counter1) { + public static HyperLogLogPlusCounterNew merge(HyperLogLogPlusCounterNew counter0, Object counter1) { return add(counter0, counter1); } - public static long result(HyperLogLogPlusCounter counter) { + public static long result(HyperLogLogPlusCounterNew counter) { return counter == null ? 0L : counter.getCountEstimate(); } @SuppressWarnings("serial") - private static class FixedValueHLLCMockup extends HyperLogLogPlusCounter { + private static class FixedValueHLLCMockup extends HyperLogLogPlusCounterNew { private Long value = null; @@ -107,7 +107,7 @@ public class HLLDistinctCountAggFunc { } @Override - public void merge(HyperLogLogPlusCounter another) { + public void merge(HyperLogLogPlusCounterNew another) { throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/kylin/blob/f05404d5/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HyperLogLogPlusCounter.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HyperLogLogPlusCounter.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HyperLogLogPlusCounter.java deleted file mode 100644 index 00407f9..0000000 --- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HyperLogLogPlusCounter.java +++ /dev/null @@ -1,392 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.measure.hllc; - -import java.io.IOException; -import java.io.Serializable; -import java.nio.ByteBuffer; -import java.nio.charset.Charset; -import java.util.Arrays; - -import org.apache.kylin.common.util.BytesUtil; - -import com.google.common.hash.HashFunction; -import com.google.common.hash.Hashing; - -/** - * About compression, test on HLLC data shows - * - * - LZF compression ratio is around 65%-80%, fast - * - GZIP compression ratio is around 41%-46%, very slow - * - * @author yangli9 - */ -@SuppressWarnings("serial") -public class HyperLogLogPlusCounter implements Serializable, Comparable<HyperLogLogPlusCounter> { - - private final int p; - private final int m; - private final HashFunction hashFunc; - byte[] registers; - int singleBucket; - - public HyperLogLogPlusCounter() { - this(10); - } - - public HyperLogLogPlusCounter(int p) { - this(p, Hashing.murmur3_128()); - } - - public HyperLogLogPlusCounter(HyperLogLogPlusCounter another) { - this(another.p, another.hashFunc); - merge(another); - } - - /** The larger p is, the more storage (2^p bytes), the better accuracy */ - private HyperLogLogPlusCounter(int p, HashFunction hashFunc) { - this.p = p; - this.m = 1 << p;//(int) Math.pow(2, p); - this.hashFunc = hashFunc; - this.registers = new byte[m]; - this.singleBucket = -1; - } - - public void clear() { - byte zero = (byte) 0; - if (singleBucket == -1) { - //nothing - } else if (singleBucket >= 0) { - registers[singleBucket] = 0; - } else { - Arrays.fill(registers, zero); - } - singleBucket = -1; - } - - public void 
add(int value) { - add(hashFunc.hashInt(value).asLong()); - } - - public void add(String value) { - add(hashFunc.hashString(value, Charset.defaultCharset()).asLong()); - } - - public void add(byte[] value) { - add(hashFunc.hashBytes(value).asLong()); - } - - public void add(byte[] value, int offset, int length) { - add(hashFunc.hashBytes(value, offset, length).asLong()); - } - - protected void add(long hash) { - int bucketMask = m - 1; - int bucket = (int) (hash & bucketMask); - int firstOnePos = Long.numberOfLeadingZeros(hash | bucketMask) + 1; - - if (firstOnePos > registers[bucket]) - registers[bucket] = (byte) firstOnePos; - - if (singleBucket == -1) - singleBucket = bucket; - else - singleBucket = Integer.MIN_VALUE; - } - - public void merge(HyperLogLogPlusCounter another) { - assert this.p == another.p; - assert this.hashFunc == another.hashFunc; - - // quick path for single value HLLC - if (another.singleBucket == -1) { - return; - } else if (another.singleBucket >= 0) { - int b = another.singleBucket; - if (registers[b] < another.registers[b]) - registers[b] = another.registers[b]; - } else { - // normal path - for (int i = 0; i < m; i++) { - if (registers[i] < another.registers[i]) - registers[i] = another.registers[i]; - } - } - singleBucket = Integer.MIN_VALUE; - } - - public long getCountEstimate() { - return new HLLCSnapshot(this).getCountEstimate(); - } - - public int getPrecision() { - return this.p; - } - - public double getErrorRate() { - return 1.04 / Math.sqrt(m); - } - - private int size() { - if (singleBucket == -1) { - return 0; - } else if (singleBucket >= 0) { - return 1; - } else { - int size = 0; - for (int i = 0; i < m; i++) { - if (registers[i] > 0) - size++; - } - return size; - } - } - - @Override - public String toString() { - return "" + getCountEstimate(); - } - - // ============================================================================ - - // a memory efficient snapshot of HLL registers which can yield count - // estimate 
later - public static class HLLCSnapshot { - byte p; - double registerSum; - int zeroBuckets; - - public HLLCSnapshot(HyperLogLogPlusCounter hllc) { - p = (byte) hllc.p; - registerSum = 0; - zeroBuckets = 0; - - byte[] registers = hllc.registers; - for (int i = 0; i < hllc.m; i++) { - if (registers[i] == 0) { - registerSum++; - zeroBuckets++; - } else { - registerSum += 1.0 / (1L << registers[i]); - } - } - } - - public long getCountEstimate() { - int m = 1 << p; - double alpha = 0.7213 / (1 + 1.079 / m); - double estimate = alpha * m * m / registerSum; - - // small cardinality adjustment - if (zeroBuckets >= m * 0.07) { // (reference presto's HLL impl) - estimate = m * Math.log(m * 1.0 / zeroBuckets); - } else if (HyperLogLogPlusTable.isBiasCorrection(m, estimate)) { - estimate = HyperLogLogPlusTable.biasCorrection(p, estimate); - } - - return Math.round(estimate); - } - } - - // ============================================================================ - - public void writeRegisters(final ByteBuffer out) throws IOException { - - final int indexLen = getRegisterIndexSize(); - int size = size(); - - // decide output scheme -- map (3*size bytes) or array (2^p bytes) - byte scheme; - if (5 + (indexLen + 1) * size < m) // 5 is max len of vint - scheme = 0; // map - else - scheme = 1; // array - out.put(scheme); - - if (scheme == 0) { // map scheme - BytesUtil.writeVInt(size, out); - if (singleBucket == -1) { - // no non-zero register - } else if (singleBucket >= 0) { - writeUnsigned(singleBucket, indexLen, out); - out.put(registers[singleBucket]); - } else { - for (int i = 0; i < m; i++) { - if (registers[i] > 0) { - writeUnsigned(i, indexLen, out); - out.put(registers[i]); - } - } - } - } else if (scheme == 1) { // array scheme - out.put(registers); - } else - throw new IllegalStateException(); - } - - public void readRegisters(ByteBuffer in) throws IOException { - byte scheme = in.get(); - - if (scheme == 0) { // map scheme - clear(); - int size = 
BytesUtil.readVInt(in); - if (size > m) - throw new IllegalArgumentException("register size (" + size + ") cannot be larger than m (" + m + ")"); - int indexLen = getRegisterIndexSize(); - int key = 0; - for (int i = 0; i < size; i++) { - key = readUnsigned(in, indexLen); - registers[key] = in.get(); - } - - if (size == 0) - singleBucket = -1; - else if (size == 1) - singleBucket = key; - else - singleBucket = Integer.MIN_VALUE; - - } else if (scheme == 1) { // array scheme - in.get(registers); - singleBucket = Integer.MIN_VALUE; - } else - throw new IllegalStateException(); - } - - public int peekLength(ByteBuffer in) { - int mark = in.position(); - int len; - - byte scheme = in.get(); - if (scheme == 0) { // map scheme - int size = BytesUtil.readVInt(in); - int indexLen = getRegisterIndexSize(); - len = in.position() - mark + (indexLen + 1) * size; - } else { - len = in.position() - mark + m; - } - - in.position(mark); - return len; - } - - public int maxLength() { - return 1 + m; - } - - public void writeRegistersArray(final ByteBuffer out) { - out.put(this.registers); - } - - public void readRegistersArray(ByteBuffer in) { - in.get(registers, 0, m); - singleBucket = Integer.MIN_VALUE; - } - - private int getRegisterIndexSize() { - return (p - 1) / 8 + 1; // 2 when p=16, 3 when p=17 - } - - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + ((hashFunc == null) ? 
0 : hashFunc.hashCode()); - result = prime * result + p; - result = prime * result + Arrays.hashCode(registers); - return result; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - HyperLogLogPlusCounter other = (HyperLogLogPlusCounter) obj; - if (hashFunc == null) { - if (other.hashFunc != null) - return false; - } else if (!hashFunc.equals(other.hashFunc)) - return false; - if (p != other.p) - return false; - if (!Arrays.equals(registers, other.registers)) - return false; - return true; - } - - @Override - public int compareTo(HyperLogLogPlusCounter o) { - if (o == null) - return 1; - - long e1 = this.getCountEstimate(); - long e2 = o.getCountEstimate(); - - if (e1 == e2) - return 0; - else if (e1 > e2) - return 1; - else - return -1; - } - - public static void main(String[] args) throws IOException { - dumpErrorRates(); - } - - static void dumpErrorRates() { - for (int p = 10; p <= 18; p++) { - double rate = new HyperLogLogPlusCounter(p).getErrorRate(); - double er = Math.round(rate * 10000) / 100D; - double er2 = Math.round(rate * 2 * 10000) / 100D; - double er3 = Math.round(rate * 3 * 10000) / 100D; - long size = Math.round(Math.pow(2, p)); - System.out.println("HLLC" + p + ",\t" + size + " bytes,\t68% err<" + er + "%" + ",\t95% err<" + er2 + "%" + ",\t99.7% err<" + er3 + "%"); - } - } - - /** - * - * @param num - * @param size - * @param out - */ - public static void writeUnsigned(int num, int size, ByteBuffer out) { - for (int i = 0; i < size; i++) { - out.put((byte) num); - num >>>= 8; - } - } - - public static int readUnsigned(ByteBuffer in, int size) { - int integer = 0; - int mask = 0xff; - int shift = 0; - for (int i = 0; i < size; i++) { - integer |= (in.get() << shift) & mask; - mask = mask << 8; - shift += 8; - } - return integer; - } -} 
http://git-wip-us.apache.org/repos/asf/kylin/blob/f05404d5/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HyperLogLogPlusCounterNew.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HyperLogLogPlusCounterNew.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HyperLogLogPlusCounterNew.java new file mode 100644 index 0000000..d7329f6 --- /dev/null +++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HyperLogLogPlusCounterNew.java @@ -0,0 +1,388 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.kylin.measure.hllc;

import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import org.apache.kylin.common.util.BytesUtil;

import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.Collection;
import java.util.Map;

/**
 * HyperLogLog counter whose registers live behind the {@link Register}
 * abstraction.
 *
 * A counter starts with the register type requested at construction time. A
 * {@link SparseRegister} is replaced by a {@link DenseRegister} as soon as it
 * reports {@link SparseRegister#isOverThreshold()}, or when it has to be merged
 * with a dense operand. The counter never converts back from dense to sparse,
 * except that {@link #readRegisters(ByteBuffer)} re-creates the register from
 * scratch.
 */
@SuppressWarnings("serial")
public class HyperLogLogPlusCounterNew implements Serializable, Comparable<HyperLogLogPlusCounterNew> {

    /** Precision: the number of hash bits used to select a bucket. */
    private int p;

    /** Number of buckets, always {@code 1 << p}. */
    private int m;

    private HashFunction hashFunc = Hashing.murmur3_128();

    private Register register;

    /**
     * Fraction of {@code m} used as the sparse register threshold: a sparse
     * register is created with threshold {@code (int) (overflowFactor * m)}.
     */
    public static double overflowFactor = 0.01;

    /**
     * Creates an empty counter.
     *
     * @param p        precision; the counter uses {@code 1 << p} buckets
     * @param type     {@link RegisterType#SPARSE} starts with a {@link SparseRegister},
     *                 anything else starts with a {@link DenseRegister}
     * @param hashFunc hash function applied by the public {@code add} overloads
     */
    public HyperLogLogPlusCounterNew(int p, RegisterType type, HashFunction hashFunc) {
        this.p = p;
        this.m = 1 << p;
        this.hashFunc = hashFunc;
        if (type == RegisterType.SPARSE) {
            double over = overflowFactor * m;
            this.register = new SparseRegister((int) over);
        } else {
            this.register = new DenseRegister(p);
        }
    }

    /** Creates an empty sparse counter with precision 10 and murmur3_128 hashing. */
    public HyperLogLogPlusCounterNew() {
        this(10, RegisterType.SPARSE, Hashing.murmur3_128());
    }

    /** Creates an empty sparse counter with the given precision and murmur3_128 hashing. */
    public HyperLogLogPlusCounterNew(int p) {
        this(p, RegisterType.SPARSE, Hashing.murmur3_128());
    }

    /** Creates an empty counter of the given register type with murmur3_128 hashing. */
    public HyperLogLogPlusCounterNew(int p, RegisterType type) {
        this(p, type, Hashing.murmur3_128());
    }

    /** Creates an empty sparse counter with the given precision and hash function. */
    public HyperLogLogPlusCounterNew(int p, HashFunction hashFunc) {
        this(p, RegisterType.SPARSE, hashFunc);
    }

    /**
     * Copy constructor: starts from an empty sparse counter with the same
     * precision and hash function, then merges {@code another} into it.
     */
    public HyperLogLogPlusCounterNew(HyperLogLogPlusCounterNew another) {
        this(another.p, another.hashFunc);
        merge(another);
    }

    public void add(int value) {
        add(hashFunc.hashInt(value).asLong());
    }

    // NOTE(review): the string is hashed using the JVM default charset, so the
    // same string may hash differently on JVMs configured with another default.
    // Confirm whether counters are ever merged across such JVMs.
    public void add(String value) {
        add(hashFunc.hashString(value, Charset.defaultCharset()).asLong());
    }

    public void add(byte[] value) {
        add(hashFunc.hashBytes(value).asLong());
    }

    public void add(byte[] value, int offset, int length) {
        add(hashFunc.hashBytes(value, offset, length).asLong());
    }

    /**
     * Records one 64-bit hash.
     *
     * The low {@code p} bits pick the bucket. The stored rank is the number of
     * leading zero bits of the hash, plus one, computed with the bucket bits
     * forced to 1 so that the bucket bits never extend the run of zeros. A
     * bucket only ever keeps the largest rank it has seen.
     */
    protected void add(long hash) {
        int bucketMask = m - 1;
        int bucket = (int) (hash & bucketMask);
        int firstOnePos = Long.numberOfLeadingZeros(hash | bucketMask) + 1;
        Byte b = register.get(bucket); // null: a sparse register has no entry for this bucket yet
        if (b == null || (byte) firstOnePos > b) {
            register.set(bucket, (byte) firstOnePos);
        }
        promoteIfOverThreshold();
    }

    /**
     * Merges {@code another} into this counter, keeping the larger rank per bucket.
     * Both counters are expected to share precision and hash function (checked
     * with {@code assert} only).
     */
    public void merge(HyperLogLogPlusCounterNew another) {
        assert this.p == another.p;
        assert this.hashFunc == another.hashFunc;
        // SparseRegister.merge() only accepts another SparseRegister, so a sparse
        // receiver is widened before it absorbs a dense operand.
        if (register instanceof SparseRegister && another.register instanceof DenseRegister) {
            register = ((SparseRegister) register).toDense(p);
        }
        register.merge(another.register);
        promoteIfOverThreshold();
    }

    /** Replaces a sparse register by its dense equivalent once it is over its threshold. */
    private void promoteIfOverThreshold() {
        if (register instanceof SparseRegister && ((SparseRegister) register).isOverThreshold()) {
            register = ((SparseRegister) register).toDense(p);
        }
    }

    public long getCountEstimate() {
        return new HLLCSnapshot(this).getCountEstimate();
    }

    public int getPrecision() {
        return this.p;
    }

    /** @return {@code 1.04 / sqrt(m)} */
    public double getErrorRate() {
        return 1.04 / Math.sqrt(m);
    }

    @Override
    public String toString() {
        return "" + getCountEstimate();
    }

    // ============================================================================

    /**
     * A memory efficient snapshot of HLL registers which can yield a count
     * estimate later. Only the precision, the sum of {@code 2^-rank} over all
     * buckets and the number of zero buckets are retained.
     */
    public static class HLLCSnapshot {
        byte p;
        double registerSum;
        int zeroBuckets;

        public HLLCSnapshot(HyperLogLogPlusCounterNew hllc) {
            p = (byte) hllc.p;
            registerSum = 0;
            zeroBuckets = 0;
            Register register = hllc.getRegister();
            DenseRegister dr;
            if (register instanceof SparseRegister) {
                // work on a temporary dense copy; the counter itself stays sparse
                dr = ((SparseRegister) register).toDense(p);
            } else {
                dr = (DenseRegister) register;
            }
            byte[] registers = dr.getRawRegister();
            for (int i = 0; i < hllc.m; i++) {
                if (registers[i] == 0) {
                    registerSum++; // 2^-0 == 1
                    zeroBuckets++;
                } else {
                    registerSum += 1.0 / (1L << registers[i]);
                }
            }
        }

        /**
         * @return the rounded cardinality estimate. The raw estimate is
         *         {@code alpha * m * m / registerSum}; it is replaced by
         *         {@code m * ln(m / zeroBuckets)} when at least 7% of the buckets
         *         are zero, otherwise it is passed through
         *         {@link HyperLogLogPlusTable#biasCorrection} when
         *         {@link HyperLogLogPlusTable#isBiasCorrection} asks for it.
         */
        public long getCountEstimate() {
            int m = 1 << p;
            double alpha = 0.7213 / (1 + 1.079 / m);
            double estimate = alpha * m * m / registerSum;

            // small cardinality adjustment
            if (zeroBuckets >= m * 0.07) { // (reference presto's HLL impl)
                estimate = m * Math.log(m * 1.0 / zeroBuckets);
            } else if (HyperLogLogPlusTable.isBiasCorrection(m, estimate)) {
                estimate = HyperLogLogPlusTable.biasCorrection(p, estimate);
            }

            return Math.round(estimate);
        }
    }

    public static void main(String[] args) throws IOException {
        dumpErrorRates();
    }

    /** Prints, for p = 10..18, the register size and the 1/2/3-sigma error rates. */
    static void dumpErrorRates() {
        for (int p = 10; p <= 18; p++) {
            double rate = new HyperLogLogPlusCounterNew(p, RegisterType.SPARSE).getErrorRate();
            double er = Math.round(rate * 10000) / 100D;
            double er2 = Math.round(rate * 2 * 10000) / 100D;
            double er3 = Math.round(rate * 3 * 10000) / 100D;
            long size = Math.round(Math.pow(2, p));
            System.out.println("HLLC" + p + ",\t" + size + " bytes,\t68% err<" + er + "%" + ",\t95% err<" + er2 + "%" + ",\t99.7% err<" + er3 + "%");
        }
    }

    public Register getRegister() {
        return register;
    }

    public void clear() {
        register.clear();
    }

    /** @return SPARSE when the current register is a {@link SparseRegister}, DENSE otherwise */
    public RegisterType getRegisterType() {
        if (register instanceof SparseRegister)
            return RegisterType.SPARSE;
        else
            return RegisterType.DENSE;
    }

    // ============================================================================

    /**
     * Serializes the registers. The first byte is the scheme:
     * 0 = map (vint entry count, then per entry an index of
     * {@link #getRegisterIndexSize()} bytes followed by one rank byte),
     * 1 = array (all {@code m} rank bytes).
     *
     * A sparse register is always written as a map; a dense register is written
     * as a map only when that is smaller than the array.
     */
    public void writeRegisters(final ByteBuffer out) throws IOException {

        final int indexLen = getRegisterIndexSize();
        int size = size();

        // decide output scheme -- map ((indexLen + 1) * size bytes) or array (2^p bytes)
        byte scheme;
        if (register instanceof SparseRegister || 5 + (indexLen + 1) * size < m) { // 5 is max len of vint
            scheme = 0; // map
        } else {
            scheme = 1; // array
        }
        out.put(scheme);
        if (scheme == 0) { // map scheme
            BytesUtil.writeVInt(size, out);
            if (register instanceof SparseRegister) { // sparse register
                Collection<Map.Entry<Integer, Byte>> allValue = ((SparseRegister) register).getAllValue();
                for (Map.Entry<Integer, Byte> entry : allValue) {
                    writeUnsigned(entry.getKey(), indexLen, out);
                    out.put(entry.getValue());
                }
            } else { // dense register
                byte[] registers = ((DenseRegister) register).getRawRegister();
                for (int i = 0; i < m; i++) {
                    if (registers[i] > 0) {
                        writeUnsigned(i, indexLen, out);
                        out.put(registers[i]);
                    }
                }
            }
        } else if (scheme == 1) { // array scheme
            out.put(((DenseRegister) register).getRawRegister());
        } else
            throw new IllegalStateException();
    }

    /**
     * Replaces the content of this counter with the registers read from
     * {@code in} (format described at {@link #writeRegisters(ByteBuffer)}).
     * For the map scheme the new register is dense when the entry count exceeds
     * {@code (int) (overflowFactor * m)} and sparse otherwise.
     *
     * @throws IllegalArgumentException if the map scheme announces more than {@code m} entries
     * @throws IllegalStateException    if the scheme byte is neither 0 nor 1
     */
    public void readRegisters(ByteBuffer in) throws IOException {
        byte scheme = in.get();
        if (scheme == 0) { // map scheme
            clear();
            int size = BytesUtil.readVInt(in);
            if (size > m)
                throw new IllegalArgumentException("register size (" + size + ") cannot be larger than m (" + m + ")");
            double over = overflowFactor * m;
            if (size > (int) over) {
                this.register = new DenseRegister(p);
            } else {
                this.register = new SparseRegister((int) over); // default is sparse
            }
            int indexLen = getRegisterIndexSize();
            int key = 0;
            for (int i = 0; i < size; i++) {
                key = readUnsigned(in, indexLen);
                register.set(key, in.get());
            }
        } else if (scheme == 1) { // array scheme
            this.register = new DenseRegister(p);
            for (int i = 0; i < m; i++) {
                register.set(i, in.get());
            }
        } else
            throw new IllegalStateException();
    }

    /**
     * Computes the serialized length, in bytes, of the counter that starts at
     * the current position of {@code in}. The buffer position is restored
     * before returning.
     */
    public int peekLength(ByteBuffer in) {
        int mark = in.position();
        int len;
        byte scheme = in.get();
        if (scheme == 0) { // map scheme
            int size = BytesUtil.readVInt(in);
            int indexLen = getRegisterIndexSize();
            len = in.position() - mark + (indexLen + 1) * size;
        } else {
            len = in.position() - mark + m;
        }

        in.position(mark);
        return len;
    }

    /** @return {@code 1 + m}: the scheme byte plus the full register array */
    public int maxLength() {
        return 1 + m;
    }

    private int getRegisterIndexSize() {
        return (p - 1) / 8 + 1; // 2 when p=16, 3 when p=17
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((hashFunc == null) ? 0 : hashFunc.hashCode());
        result = prime * result + p;
        result = prime * result + register.getHashCode();
        return result;
    }

    /**
     * Two counters are equal when they share hash function, precision and
     * register type, and their registers hold the same content.
     *
     * The register hash codes are compared first: it is a cheap rejection and
     * it guarantees that equal counters also have equal {@link #hashCode()}.
     * Equal register hashes alone are not accepted as proof of equality,
     * because different register states can collide on the same hash.
     */
    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        HyperLogLogPlusCounterNew other = (HyperLogLogPlusCounterNew) obj;
        if (hashFunc == null) {
            if (other.hashFunc != null)
                return false;
        } else if (!hashFunc.equals(other.hashFunc))
            return false;
        if (p != other.p)
            return false;
        if (this.getRegisterType() != other.getRegisterType())
            return false;
        if (register.getHashCode() != other.register.getHashCode())
            return false;
        return sameRegisterContent(register, other.register);
    }

    /** Compares the actual content of two registers of the same kind. */
    private static boolean sameRegisterContent(Register left, Register right) {
        if (left instanceof SparseRegister && right instanceof SparseRegister) {
            if (left.getSize() != right.getSize())
                return false;
            // same number of entries, so it is enough that every entry on the
            // left has an identical counterpart on the right
            for (Map.Entry<Integer, Byte> entry : ((SparseRegister) left).getAllValue()) {
                if (!entry.getValue().equals(right.get(entry.getKey())))
                    return false;
            }
            return true;
        }
        if (left instanceof DenseRegister && right instanceof DenseRegister) {
            return java.util.Arrays.equals(((DenseRegister) left).getRawRegister(), ((DenseRegister) right).getRawRegister());
        }
        // Register implementation this class does not know how to inspect:
        // keep the previous behaviour (matching hash codes were sufficient).
        return true;
    }

    /** Orders counters by their count estimate; any counter is greater than {@code null}. */
    @Override
    public int compareTo(HyperLogLogPlusCounterNew o) {
        if (o == null)
            return 1;

        return Long.compare(this.getCountEstimate(), o.getCountEstimate());
    }

    /**
     * Writes the {@code size} low-order bytes of {@code num}, least significant
     * byte first.
     *
     * @param num  value to write
     * @param size number of bytes to write
     * @param out  destination buffer
     */
    public static void writeUnsigned(int num, int size, ByteBuffer out) {
        for (int i = 0; i < size; i++) {
            out.put((byte) num);
            num >>>= 8;
        }
    }

    /**
     * Reads {@code size} bytes written by {@link #writeUnsigned(int, int, ByteBuffer)}
     * (least significant byte first) and reassembles the value.
     */
    public static int readUnsigned(ByteBuffer in, int size) {
        int integer = 0;
        int mask = 0xff;
        int shift = 0;
        for (int i = 0; i < size; i++) {
            // the mask drops the sign-extension bits of the byte returned by get()
            integer |= (in.get() << shift) & mask;
            mask = mask << 8;
            shift += 8;
        }
        return integer;
    }

    private int size() {
        return register.getSize();
    }

}
a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HyperLogLogPlusCounterOld.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HyperLogLogPlusCounterOld.java new file mode 100644 index 0000000..cb5533e --- /dev/null +++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HyperLogLogPlusCounterOld.java @@ -0,0 +1,392 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.kylin.measure.hllc;

import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.Arrays;

import org.apache.kylin.common.util.BytesUtil;

import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;

/**
 * HyperLogLog counter backed by a plain byte array of {@code 2^p} registers.
 *
 * The field {@code singleBucket} tracks how many values the counter holds so
 * that the common "zero or one value" cases can skip full-array scans:
 * {@code -1} means no value was added since construction or {@link #clear()},
 * a value {@code >= 0} is the index of the only register that was written, and
 * {@code Integer.MIN_VALUE} means "more than one add/merge happened, scan the array".
 *
 * About compression, test on HLLC data shows
 *
 * - LZF compression ratio is around 65%-80%, fast
 * - GZIP compression ratio is around 41%-46%, very slow
 *
 * @author yangli9
 */
@SuppressWarnings("serial")
public class HyperLogLogPlusCounterOld implements Serializable, Comparable<HyperLogLogPlusCounterOld> {

    // precision: number of hash bits used to select a register
    private final int p;
    // number of registers, always 1 << p
    private final int m;
    private final HashFunction hashFunc;
    // one rank byte per register
    byte[] registers;
    // -1: empty, >= 0: index of the single written register, Integer.MIN_VALUE: many
    int singleBucket;

    /** Creates an empty counter with precision 10. */
    public HyperLogLogPlusCounterOld() {
        this(10);
    }

    /** Creates an empty counter with the given precision and murmur3_128 hashing. */
    public HyperLogLogPlusCounterOld(int p) {
        this(p, Hashing.murmur3_128());
    }

    /** Copy constructor: creates an empty counter like {@code another} and merges it in. */
    public HyperLogLogPlusCounterOld(HyperLogLogPlusCounterOld another) {
        this(another.p, another.hashFunc);
        merge(another);
    }

    /** The larger p is, the more storage (2^p bytes), the better accuracy */
    private HyperLogLogPlusCounterOld(int p, HashFunction hashFunc) {
        this.p = p;
        this.m = 1 << p;//(int) Math.pow(2, p);
        this.hashFunc = hashFunc;
        this.registers = new byte[m];
        this.singleBucket = -1;
    }

    /**
     * Resets the counter to empty. Only the registers that can be non-zero
     * according to {@code singleBucket} are touched.
     */
    public void clear() {
        byte zero = (byte) 0;
        if (singleBucket == -1) {
            // already empty, nothing to reset
        } else if (singleBucket >= 0) {
            // exactly one register was written
            registers[singleBucket] = 0;
        } else {
            Arrays.fill(registers, zero);
        }
        singleBucket = -1;
    }

    public void add(int value) {
        add(hashFunc.hashInt(value).asLong());
    }

    // NOTE(review): the string is hashed using the JVM default charset -- confirm
    // that all JVMs producing counters that get merged share the same default.
    public void add(String value) {
        add(hashFunc.hashString(value, Charset.defaultCharset()).asLong());
    }

    public void add(byte[] value) {
        add(hashFunc.hashBytes(value).asLong());
    }

    public void add(byte[] value, int offset, int length) {
        add(hashFunc.hashBytes(value, offset, length).asLong());
    }

    /**
     * Records one 64-bit hash: the low p bits select the register, the stored
     * rank is the number of leading zero bits (with the register-index bits
     * forced to 1) plus one. A register keeps the largest rank seen.
     */
    protected void add(long hash) {
        int bucketMask = m - 1;
        int bucket = (int) (hash & bucketMask);
        int firstOnePos = Long.numberOfLeadingZeros(hash | bucketMask) + 1;

        if (firstOnePos > registers[bucket])
            registers[bucket] = (byte) firstOnePos;

        // first add since clear(): remember the register; any later add: "many"
        if (singleBucket == -1)
            singleBucket = bucket;
        else
            singleBucket = Integer.MIN_VALUE;
    }

    /**
     * Merges {@code another} into this counter by taking the larger rank per
     * register. Precision and hash function are expected to match (checked with
     * {@code assert} only). After a merge with a non-empty counter this counter
     * is always flagged as holding "many" values.
     */
    public void merge(HyperLogLogPlusCounterOld another) {
        assert this.p == another.p;
        assert this.hashFunc == another.hashFunc;

        // quick path for single value HLLC
        if (another.singleBucket == -1) {
            // the other counter is empty, this counter is left untouched
            return;
        } else if (another.singleBucket >= 0) {
            int b = another.singleBucket;
            if (registers[b] < another.registers[b])
                registers[b] = another.registers[b];
        } else {
            // normal path
            for (int i = 0; i < m; i++) {
                if (registers[i] < another.registers[i])
                    registers[i] = another.registers[i];
            }
        }
        singleBucket = Integer.MIN_VALUE;
    }

    public long getCountEstimate() {
        return new HLLCSnapshot(this).getCountEstimate();
    }

    public int getPrecision() {
        return this.p;
    }

    /** @return {@code 1.04 / sqrt(m)} */
    public double getErrorRate() {
        return 1.04 / Math.sqrt(m);
    }

    // number of entries the map serialization scheme would write:
    // 0 or 1 when singleBucket says so, otherwise the count of non-zero registers
    private int size() {
        if (singleBucket == -1) {
            return 0;
        } else if (singleBucket >= 0) {
            return 1;
        } else {
            int size = 0;
            for (int i = 0; i < m; i++) {
                if (registers[i] > 0)
                    size++;
            }
            return size;
        }
    }

    @Override
    public String toString() {
        return "" + getCountEstimate();
    }

    // ============================================================================

    // a memory efficient snapshot of HLL registers which can yield count
    // estimate later
    public static class HLLCSnapshot {
        byte p;
        // sum of 2^-rank over all registers (a zero register contributes 1)
        double registerSum;
        int zeroBuckets;

        public HLLCSnapshot(HyperLogLogPlusCounterOld hllc) {
            p = (byte) hllc.p;
            registerSum = 0;
            zeroBuckets = 0;

            byte[] registers = hllc.registers;
            for (int i = 0; i < hllc.m; i++) {
                if (registers[i] == 0) {
                    registerSum++;
                    zeroBuckets++;
                } else {
                    registerSum += 1.0 / (1L << registers[i]);
                }
            }
        }

        /**
         * @return the rounded estimate: {@code alpha * m * m / registerSum},
         *         replaced by {@code m * ln(m / zeroBuckets)} when at least 7% of
         *         the registers are zero, otherwise bias-corrected when
         *         {@code HyperLogLogPlusTable.isBiasCorrection} says so.
         */
        public long getCountEstimate() {
            int m = 1 << p;
            double alpha = 0.7213 / (1 + 1.079 / m);
            double estimate = alpha * m * m / registerSum;

            // small cardinality adjustment
            if (zeroBuckets >= m * 0.07) { // (reference presto's HLL impl)
                estimate = m * Math.log(m * 1.0 / zeroBuckets);
            } else if (HyperLogLogPlusTable.isBiasCorrection(m, estimate)) {
                estimate = HyperLogLogPlusTable.biasCorrection(p, estimate);
            }

            return Math.round(estimate);
        }
    }

    // ============================================================================

    /**
     * Serializes the registers. The first byte is the scheme: 0 = map (vint
     * entry count, then per entry a register index of getRegisterIndexSize()
     * bytes followed by one rank byte), 1 = array (all m rank bytes). The map
     * scheme is chosen only when it is smaller than the array.
     */
    public void writeRegisters(final ByteBuffer out) throws IOException {

        final int indexLen = getRegisterIndexSize();
        int size = size();

        // decide output scheme -- map ((indexLen + 1) * size bytes) or array (2^p bytes)
        byte scheme;
        if (5 + (indexLen + 1) * size < m) // 5 is max len of vint
            scheme = 0; // map
        else
            scheme = 1; // array
        out.put(scheme);

        if (scheme == 0) { // map scheme
            BytesUtil.writeVInt(size, out);
            if (singleBucket == -1) {
                // no non-zero register
            } else if (singleBucket >= 0) {
                writeUnsigned(singleBucket, indexLen, out);
                out.put(registers[singleBucket]);
            } else {
                for (int i = 0; i < m; i++) {
                    if (registers[i] > 0) {
                        writeUnsigned(i, indexLen, out);
                        out.put(registers[i]);
                    }
                }
            }
        } else if (scheme == 1) { // array scheme
            out.put(registers);
        } else
            throw new IllegalStateException();
    }

    /**
     * Reads registers written by {@link #writeRegisters(ByteBuffer)}. The map
     * scheme clears the counter first; the array scheme overwrites every
     * register. {@code singleBucket} is re-derived from what was read.
     *
     * @throws IllegalArgumentException if the map scheme announces more than m entries
     * @throws IllegalStateException    if the scheme byte is neither 0 nor 1
     */
    public void readRegisters(ByteBuffer in) throws IOException {
        byte scheme = in.get();

        if (scheme == 0) { // map scheme
            clear();
            int size = BytesUtil.readVInt(in);
            if (size > m)
                throw new IllegalArgumentException("register size (" + size + ") cannot be larger than m (" + m + ")");
            int indexLen = getRegisterIndexSize();
            int key = 0;
            for (int i = 0; i < size; i++) {
                key = readUnsigned(in, indexLen);
                registers[key] = in.get();
            }

            // with exactly one entry, key still holds that entry's register index
            if (size == 0)
                singleBucket = -1;
            else if (size == 1)
                singleBucket = key;
            else
                singleBucket = Integer.MIN_VALUE;

        } else if (scheme == 1) { // array scheme
            in.get(registers);
            singleBucket = Integer.MIN_VALUE;
        } else
            throw new IllegalStateException();
    }

    /**
     * Computes the serialized length, in bytes, of the counter starting at the
     * current position of {@code in}; the buffer position is restored afterwards.
     */
    public int peekLength(ByteBuffer in) {
        int mark = in.position();
        int len;

        byte scheme = in.get();
        if (scheme == 0) { // map scheme
            int size = BytesUtil.readVInt(in);
            int indexLen = getRegisterIndexSize();
            len = in.position() - mark + (indexLen + 1) * size;
        } else {
            len = in.position() - mark + m;
        }

        in.position(mark);
        return len;
    }

    /** @return {@code 1 + m}: the scheme byte plus the full register array */
    public int maxLength() {
        return 1 + m;
    }

    // The raw-array read/write helpers below are disabled in this class.
    /*public void writeRegistersArray(final ByteBuffer out) {
        out.put(this.registers);
    }

    public void readRegistersArray(ByteBuffer in) {
        in.get(registers, 0, m);
        singleBucket = Integer.MIN_VALUE;
    }*/

    // number of bytes used to serialize one register index
    private int getRegisterIndexSize() {
        return (p - 1) / 8 + 1; // 2 when p=16, 3 when p=17
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((hashFunc == null) ? 0 : hashFunc.hashCode());
        result = prime * result + p;
        result = prime * result + Arrays.hashCode(registers);
        return result;
    }

    /** Equal when hash function, precision and all register bytes match. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        HyperLogLogPlusCounterOld other = (HyperLogLogPlusCounterOld) obj;
        if (hashFunc == null) {
            if (other.hashFunc != null)
                return false;
        } else if (!hashFunc.equals(other.hashFunc))
            return false;
        if (p != other.p)
            return false;
        if (!Arrays.equals(registers, other.registers))
            return false;
        return true;
    }

    /** Orders counters by their count estimate; any counter is greater than {@code null}. */
    @Override
    public int compareTo(HyperLogLogPlusCounterOld o) {
        if (o == null)
            return 1;

        long e1 = this.getCountEstimate();
        long e2 = o.getCountEstimate();

        if (e1 == e2)
            return 0;
        else if (e1 > e2)
            return 1;
        else
            return -1;
    }

    public static void main(String[] args) throws IOException {
        dumpErrorRates();
    }

    // prints, for p = 10..18, the register size and the 1/2/3-sigma error rates
    static void dumpErrorRates() {
        for (int p = 10; p <= 18; p++) {
            double rate = new HyperLogLogPlusCounterOld(p).getErrorRate();
            double er = Math.round(rate * 10000) / 100D;
            double er2 = Math.round(rate * 2 * 10000) / 100D;
            double er3 = Math.round(rate * 3 * 10000) / 100D;
            long size = Math.round(Math.pow(2, p));
            System.out.println("HLLC" + p + ",\t" + size + " bytes,\t68% err<" + er + "%" + ",\t95% err<" + er2 + "%" + ",\t99.7% err<" + er3 + "%");
        }
    }

    /**
     * Writes the {@code size} low-order bytes of {@code num}, least significant
     * byte first.
     *
     * @param num  value to write
     * @param size number of bytes to write
     * @param out  destination buffer
     */
    public static void writeUnsigned(int num, int size, ByteBuffer out) {
        for (int i = 0; i < size; i++) {
            out.put((byte) num);
            num >>>= 8;
        }
    }

    /**
     * Reads {@code size} bytes, least significant byte first, and reassembles
     * the value written by {@link #writeUnsigned(int, int, ByteBuffer)}.
     */
    public static int readUnsigned(ByteBuffer in, int size) {
        int integer = 0;
        int mask = 0xff;
        int shift = 0;
        for (int i = 0; i < size; i++) {
            // the mask drops the sign-extension bits of the byte returned by get()
            integer |= (in.get() << shift) & mask;
            mask = mask << 8;
            shift += 8;
        }
        return integer;
    }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/f05404d5/core-metadata/src/main/java/org/apache/kylin/measure/hllc/Register.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/Register.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/Register.java new file mode 100644 index 0000000..79c4bba --- /dev/null +++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/Register.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +package org.apache.kylin.measure.hllc; + +/** + * Created by xiefan on 16-12-9. 
/**
 * Storage for the per-bucket rank bytes of a HyperLogLog counter.
 *
 * NOTE(review): the only implementation visible alongside this interface is
 * SparseRegister; statements about "absent" positions below describe that
 * implementation and should be confirmed for the dense one.
 */
public interface Register {

    // ---- single position access ----

    /**
     * Reads the value stored at {@code pos}.
     *
     * @return the stored value; SparseRegister returns {@code null} for a
     *         position that was never set
     */
    Byte get(int pos);

    /** Stores {@code value} at {@code pos}, replacing whatever was there. */
    void set(int pos, byte value);

    // ---- whole register operations ----

    /** Merges the content of {@code another} into this register. */
    void merge(Register another);

    /** Removes all stored values. */
    void clear();

    // ---- inspection ----

    /** @return the number of entries held; for SparseRegister, the number of stored positions */
    int getSize();

    /** @return a hash of the register content (distinct from {@link Object#hashCode()}) */
    int getHashCode();

}
/**
 * Names the two register representations a HyperLogLog counter can use.
 * The declaration order (SPARSE first, DENSE second) is kept unchanged.
 */
public enum RegisterType {

    /** Only the positions that were written are stored. */
    SPARSE,

    /** Every position is stored. */
    DENSE
}
+ */ +public class SparseRegister implements Register { + + private int overThreshold; + + private Map<Integer, Byte> sparseRegister = new TreeMap<>(); + + public SparseRegister(int overThreshold) { + this.overThreshold = overThreshold; + } + + public DenseRegister toDense(int p) { + DenseRegister dr = new DenseRegister(p); + for (Map.Entry<Integer, Byte> entry : sparseRegister.entrySet()) { + dr.set(entry.getKey(), entry.getValue()); + } + return dr; + } + + @Override + public void set(int pos, byte value) { + sparseRegister.put(pos, value); + } + + @Override + public Byte get(int pos) { + return sparseRegister.get(pos); + } + + @Override + public void merge(Register another) { + assert another instanceof SparseRegister; + SparseRegister sr = (SparseRegister) another; + for (Map.Entry<Integer, Byte> entry : sr.sparseRegister.entrySet()) { + Byte v = sparseRegister.get(entry.getKey()); + if (v == null || entry.getValue() > v) + sparseRegister.put(entry.getKey(), entry.getValue()); + } + } + + @Override + public void clear() { + sparseRegister.clear(); + } + + @Override + public int getSize() { + return sparseRegister.size(); + } + + @Override + public int getHashCode() { + final int prime = 31; + int result = 1; + for (Map.Entry<Integer, Byte> entry : sparseRegister.entrySet()) { + result = prime * result + entry.getKey(); + result = prime * result + entry.getValue(); + } + return result; + } + + public boolean isOverThreshold() { + if (this.sparseRegister.size() > overThreshold) + return true; + return false; + } + + public Collection<Map.Entry<Integer, Byte>> getAllValue() { + return Collections.unmodifiableCollection(sparseRegister.entrySet()); + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/f05404d5/core-metadata/src/test/java/org/apache/kylin/measure/AggregatorMemEstimateTest.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/test/java/org/apache/kylin/measure/AggregatorMemEstimateTest.java 
b/core-metadata/src/test/java/org/apache/kylin/measure/AggregatorMemEstimateTest.java index 3adec73..103e721 100644 --- a/core-metadata/src/test/java/org/apache/kylin/measure/AggregatorMemEstimateTest.java +++ b/core-metadata/src/test/java/org/apache/kylin/measure/AggregatorMemEstimateTest.java @@ -26,7 +26,7 @@ import org.apache.kylin.measure.bitmap.BitmapAggregator; import org.apache.kylin.measure.bitmap.BitmapCounter; import org.apache.kylin.measure.extendedcolumn.ExtendedColumnMeasureType; import org.apache.kylin.measure.hllc.HLLCAggregator; -import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; +import org.apache.kylin.measure.hllc.HyperLogLogPlusCounterNew; import org.apache.kylin.metadata.datatype.DataType; import org.apache.kylin.metadata.datatype.DoubleMutable; import org.apache.kylin.metadata.datatype.LongMutable; @@ -94,7 +94,7 @@ public class AggregatorMemEstimateTest extends LocalFileMetadataTestCase { @Test public void testAggregatorEstimate() { HLLCAggregator hllcAggregator = new HLLCAggregator(14); - hllcAggregator.aggregate(new HyperLogLogPlusCounter(14)); + hllcAggregator.aggregate(new HyperLogLogPlusCounterNew(14)); BitmapAggregator bitmapAggregator = new BitmapAggregator(); BitmapCounter bitmapCounter = new BitmapCounter(); http://git-wip-us.apache.org/repos/asf/kylin/blob/f05404d5/core-metadata/src/test/java/org/apache/kylin/measure/hll/HyperLogLogCounterOldTest.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/test/java/org/apache/kylin/measure/hll/HyperLogLogCounterOldTest.java b/core-metadata/src/test/java/org/apache/kylin/measure/hll/HyperLogLogCounterOldTest.java new file mode 100644 index 0000000..5d17fea --- /dev/null +++ b/core-metadata/src/test/java/org/apache/kylin/measure/hll/HyperLogLogCounterOldTest.java @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure.hll;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.measure.hllc.HyperLogLogPlusCounterOld;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for {@link HyperLogLogPlusCounterOld}: estimate accuracy, merging, and
+ * round trips through writeRegisters/readRegisters.
+ *
+ * @author yangli9
+ */
+public class HyperLogLogCounterOldTest {
+
+    // Scratch buffer reused by every serialization check in this class.
+    ByteBuffer buf = ByteBuffer.allocate(1024 * 1024);
+    // Fixed seeds make the generated test data deterministic.
+    Random rand1 = new Random(1);
+    Random rand2 = new Random(2); // not referenced anywhere in this class
+    Random rand3 = new Random(3);
+    // Merge rounds whose actual error exceeded 1x / 2x / 3x the counter's reported error rate.
+    int errorCount1 = 0;
+    int errorCount2 = 0;
+    int errorCount3 = 0;
+
+    /**
+     * Adds one million random ints, each through a cleared single-value counter
+     * that is then merged into the main counter, and expects the estimate to
+     * exceed 90% of the number of adds.
+     */
+    @Test
+    public void testOneAdd() throws IOException {
+        HyperLogLogPlusCounterOld hllc = new HyperLogLogPlusCounterOld(14);
+        HyperLogLogPlusCounterOld one = new HyperLogLogPlusCounterOld(14);
+        for (int i = 0; i < 1000000; i++) {
+            one.clear();
+            one.add(rand1.nextInt());
+            hllc.merge(one);
+        }
+        assertTrue(hllc.getCountEstimate() > 1000000 * 0.9);
+    }
+
+    /**
+     * After every add, writes the registers and checks that peekLength() reports
+     * exactly the number of bytes written, and that reading them back consumes
+     * the same number of bytes and yields an equal counter.
+     */
+    @Test
+    public void testPeekLength() throws IOException {
+        HyperLogLogPlusCounterOld hllc = new HyperLogLogPlusCounterOld(10);
+        HyperLogLogPlusCounterOld copy = new HyperLogLogPlusCounterOld(10);
+        byte[] value = new byte[10];
+        for (int i = 0; i < 200000; i++) {
+            rand1.nextBytes(value);
+            hllc.add(value);
+
+            buf.clear();
+            hllc.writeRegisters(buf);
+
+            int len = buf.position();
+            buf.position(0);
+            assertEquals(len, hllc.peekLength(buf));
+
+            copy.readRegisters(buf);
+            assertEquals(len, buf.position());
+            assertEquals(hllc, copy);
+        }
+        buf.clear();
+    }
+
+    /** Collects the distinct strings produced by {@code n} calls to generateSampleData(). */
+    private Set<String> generateTestData(int n) {
+        Set<String> testData = new HashSet<String>();
+        for (int i = 0; i < n; i++) {
+            String[] samples = generateSampleData();
+            for (String sample : samples) {
+                testData.add(sample);
+            }
+        }
+        return testData;
+    }
+
+    /**
+     * Returns a non-negative value below {@code bound}.
+     *
+     * The remainder is taken before the absolute value because
+     * Math.abs(Integer.MIN_VALUE) is still negative; for every other value of
+     * nextInt() the result equals Math.abs(nextInt()) % bound, so the seeded
+     * data sequence is unchanged.
+     */
+    private static int nextNonNegative(Random rand, int bound) {
+        return Math.abs(rand.nextInt() % bound);
+    }
+
+    // simulate the visit (=visitor+id)
+    // Each call yields 1 to 9 strings sharing a 19-digit header, each followed by "-" and 10 digits.
+    private String[] generateSampleData() {
+
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < 19; i++) {
+            sb.append(nextNonNegative(rand1, 10));
+        }
+        String header = sb.toString();
+
+        int size = nextNonNegative(rand3, 9) + 1;
+        String[] samples = new String[size];
+        for (int k = 0; k < size; k++) {
+            sb = new StringBuilder(header);
+            sb.append("-");
+            for (int i = 0; i < 10; i++) {
+                sb.append(nextNonNegative(rand3, 10));
+            }
+            samples[k] = sb.toString();
+        }
+
+        return samples;
+    }
+
+    /** Runs count() for n = 10, 100, 1000, 10000 and 100000. */
+    @Test
+    public void countTest() throws IOException {
+        int n = 10;
+        for (int i = 0; i < 5; i++) {
+            count(n);
+            n *= 10;
+        }
+    }
+
+    /**
+     * Counts a generated data set and requires the relative error of the
+     * estimate to stay below three times the counter's reported error rate,
+     * then checks a serialization round trip.
+     */
+    private void count(int n) throws IOException {
+        Set<String> testSet = generateTestData(n);
+
+        HyperLogLogPlusCounterOld hllc = newHLLC();
+        for (String testData : testSet) {
+            hllc.add(Bytes.toBytes(testData));
+        }
+        long estimate = hllc.getCountEstimate();
+        double errorRate = hllc.getErrorRate();
+        double actualError = (double) Math.abs(testSet.size() - estimate) / testSet.size();
+        System.out.println(estimate);
+        System.out.println(testSet.size());
+        System.out.println(errorRate);
+        System.out.println("=" + actualError);
+        Assert.assertTrue(actualError < errorRate * 3.0);
+
+        checkSerialize(hllc);
+    }
+
+    /** Writes the registers, reads them back into the same counter, and expects an unchanged estimate. */
+    private void checkSerialize(HyperLogLogPlusCounterOld hllc) throws IOException {
+        long estimate = hllc.getCountEstimate();
+        buf.clear();
+        hllc.writeRegisters(buf);
+        buf.flip();
+        hllc.readRegisters(buf);
+        Assert.assertEquals(estimate, hllc.getCountEstimate());
+    }
+
+    /**
+     * Runs 100 merge rounds and bounds how many of them may exceed 1x, 2x and
+     * 3x the reported error rate (30%, 5% and 2% of the rounds respectively).
+     */
+    @Test
+    public void mergeTest() throws IOException {
+        double error = 0;
+        int n = 100;
+        for (int i = 0; i < n; i++) {
+            double e = merge(i);
+            error += e;
+        }
+        System.out.println("Total average error is " + error / n);
+
+        System.out.println("  errorRateCount1 is " + errorCount1 + "!");
+        System.out.println("  errorRateCount2 is " + errorCount2 + "!");
+        System.out.println("  errorRateCount3 is " + errorCount3 + "!");
+
+        Assert.assertTrue(errorCount1 <= n * 0.30);
+        Assert.assertTrue(errorCount2 <= n * 0.05);
+        Assert.assertTrue(errorCount3 <= n * 0.02);
+    }
+
+    /**
+     * Fills 20 counters with 100 * (round + 1) sample batches each, merges
+     * serialized copies of them into one counter, and compares its estimate
+     * with the exact distinct count.
+     *
+     * @return the relative error of the merged estimate
+     */
+    private double merge(int round) throws IOException {
+        int ln = 20;
+        int dn = 100 * (round + 1);
+        Set<String> testSet = new HashSet<String>();
+        HyperLogLogPlusCounterOld[] hllcs = new HyperLogLogPlusCounterOld[ln];
+        for (int i = 0; i < ln; i++) {
+            hllcs[i] = newHLLC();
+            for (int k = 0; k < dn; k++) {
+                String[] samples = generateSampleData();
+                for (String data : samples) {
+                    testSet.add(data);
+                    hllcs[i].add(Bytes.toBytes(data));
+                }
+            }
+        }
+        HyperLogLogPlusCounterOld mergeHllc = newHLLC();
+        for (HyperLogLogPlusCounterOld hllc : hllcs) {
+            mergeHllc.merge(serDes(hllc));
+        }
+
+        double errorRate = mergeHllc.getErrorRate();
+        long estimate = mergeHllc.getCountEstimate();
+        double actualError = Math.abs((double) (testSet.size() - estimate) / testSet.size());
+
+        System.out.println(testSet.size() + "-" + estimate + " ~ " + actualError);
+        Assert.assertTrue(actualError < 0.1);
+
+        if (actualError > errorRate) {
+            errorCount1++;
+        }
+        if (actualError > 2 * errorRate) {
+            errorCount2++;
+        }
+        if (actualError > 3 * errorRate) {
+            errorCount3++;
+        }
+
+        return actualError;
+    }
+
+    /** Serializes a counter and deserializes it into a fresh one of the same precision. */
+    private HyperLogLogPlusCounterOld serDes(HyperLogLogPlusCounterOld hllc) throws IOException {
+        buf.clear();
+        hllc.writeRegisters(buf);
+        buf.flip();
+        HyperLogLogPlusCounterOld copy = new HyperLogLogPlusCounterOld(hllc.getPrecision());
+        copy.readRegisters(buf);
+        // expected value first, so a failure message reads the right way round
+        Assert.assertEquals(hllc.getCountEstimate(), copy.getCountEstimate());
+        return copy;
+    }
+
+    /** Times M rounds of merging N counters, with a serialization round trip after each merge. */
+    @Test
+    public void testPerformance() throws IOException {
+        int N = 3; // reduce N HLLC into one
+        int M = 1000; // for M times, use 100000 for real perf test
+
+        HyperLogLogPlusCounterOld[] samples = new HyperLogLogPlusCounterOld[N];
+        for (int i = 0; i < N; i++) {
+            samples[i] = newHLLC();
+            for (String str : generateTestData(10000))
+                samples[i].add(str);
+        }
+
+        System.out.println("Perf test running ... ");
+        long start = System.currentTimeMillis();
+        HyperLogLogPlusCounterOld sum = newHLLC();
+        for (int i = 0; i < M; i++) {
+            sum.clear();
+            for (int j = 0; j < N; j++) {
+                sum.merge(samples[j]);
+                checkSerialize(sum);
+            }
+        }
+        long duration = System.currentTimeMillis() - start;
+        // floating-point division so that sub-second runs are not reported as 0 seconds
+        System.out.println("Perf test result: " + duration / 1000.0 + " seconds");
+    }
+
+    /**
+     * Adding a slice of a larger array must count the same as adding an array
+     * holding just those bytes (the add(a, 1, 3) arguments are presumably
+     * offset and length, selecting { 3, 4, 42 }).
+     */
+    @Test
+    public void testEquivalence() {
+        byte[] a = new byte[] { 0, 3, 4, 42, 2, 2 };
+        byte[] b = new byte[] { 3, 4, 42 };
+        HyperLogLogPlusCounterOld ha = new HyperLogLogPlusCounterOld();
+        HyperLogLogPlusCounterOld hb = new HyperLogLogPlusCounterOld();
+        ha.add(a, 1, 3);
+        hb.add(b);
+
+        // assertEquals reports both values on failure, unlike assertTrue(x == y)
+        assertEquals(ha.getCountEstimate(), hb.getCountEstimate());
+    }
+
+    /** Creates the counter used by the accuracy and merge tests (constructor argument 16). */
+    private HyperLogLogPlusCounterOld newHLLC() {
+        return new HyperLogLogPlusCounterOld(16);
+    }
+}
