KYLIN-976 very initial
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/1218bbde Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/1218bbde Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/1218bbde Branch: refs/heads/KYLIN-976 Commit: 1218bbde487e973de0391162204d73c76f1a9e81 Parents: 6515b0a Author: Li, Yang <yang...@ebay.com> Authored: Tue Nov 24 13:15:54 2015 +0800 Committer: Li, Yang <yang...@ebay.com> Committed: Fri Nov 27 14:47:05 2015 +0800 ---------------------------------------------------------------------- .../apache/kylin/common/util/DoubleMutable.java | 68 +++++ .../apache/kylin/common/util/LongMutable.java | 70 +++++ .../apache/kylin/aggregation/Aggregation.java | 42 +++ .../kylin/aggregation/DataTypeSerializer.java | 118 +++++++++ .../kylin/aggregation/MeasureAggregator.java | 133 ++++++++++ .../kylin/aggregation/MeasureAggregators.java | 81 ++++++ .../apache/kylin/aggregation/MeasureCodec.java | 78 ++++++ .../basic/BigDecimalMaxAggregator.java | 54 ++++ .../basic/BigDecimalMinAggregator.java | 55 ++++ .../aggregation/basic/BigDecimalSerializer.java | 111 ++++++++ .../basic/BigDecimalSumAggregator.java | 51 ++++ .../aggregation/basic/DateTimeSerializer.java | 65 +++++ .../aggregation/basic/DoubleMaxAggregator.java | 54 ++++ .../aggregation/basic/DoubleMinAggregator.java | 54 ++++ .../aggregation/basic/DoubleSerializer.java | 84 ++++++ .../aggregation/basic/DoubleSumAggregator.java | 51 ++++ .../aggregation/basic/LongMaxAggregator.java | 54 ++++ .../aggregation/basic/LongMinAggregator.java | 54 ++++ .../kylin/aggregation/basic/LongSerializer.java | 91 +++++++ .../aggregation/basic/LongSumAggregator.java | 51 ++++ .../aggregation/basic/StringSerializer.java | 56 ++++ .../kylin/aggregation/hllc/HLLCAggregator.java | 64 +++++ .../kylin/aggregation/hllc/HLLCSerializer.java | 98 +++++++ .../kylin/aggregation/hllc/LDCAggregator.java | 63 +++++ .../kylin/aggregation/topn/TopNAggregator.java | 66 +++++ .../aggregation/topn/TopNCounterSerializer.java | 117 +++++++++ .../kylin/cube/gridtable/CubeCodeSystem.java | 6 +- .../cube/gridtable/TrimmedCubeCodeSystem.java | 6 +- .../cube/inmemcubing/DoggedCubeBuilder.java | 2 +- .../cube/inmemcubing/InMemCubeBuilder.java | 2 +- .../InMemCubeBuilderInputConverter.java | 5 +- .../kylin/gridtable/GTAggregateScanner.java | 2 +- .../kylin/gridtable/GTSampleCodeSystem.java | 4 +- .../apache/kylin/gridtable/IGTCodeSystem.java | 2 +- .../apache/kylin/gridtable/UnitTestSupport.java | 2 +- .../basic/BigDecimalSerializerTest.java | 53 ++++ .../topn/TopNCounterSerializerTest.java | 61 +++++ .../gridtable/AggregationCacheMemSizeTest.java | 14 +- .../kylin/gridtable/DictGridTableTest.java | 2 +- .../kylin/gridtable/SimpleGridTableTest.java | 2 +- .../gridtable/SimpleInvertedIndexTest.java | 5 +- .../metadata/measure/MeasureCodecTest.java | 3 + .../measure/BigDecimalMaxAggregator.java | 52 ---- .../measure/BigDecimalMinAggregator.java | 53 ---- .../measure/BigDecimalSumAggregator.java | 49 ---- .../metadata/measure/DoubleMaxAggregator.java | 51 ---- .../metadata/measure/DoubleMinAggregator.java | 51 ---- .../kylin/metadata/measure/DoubleMutable.java | 68 ----- .../metadata/measure/DoubleSumAggregator.java | 48 ---- .../kylin/metadata/measure/HLLCAggregator.java | 63 ----- .../kylin/metadata/measure/LDCAggregator.java | 60 ----- .../metadata/measure/LongMaxAggregator.java | 51 ---- .../metadata/measure/LongMinAggregator.java | 51 ---- .../kylin/metadata/measure/LongMutable.java | 70 ----- .../metadata/measure/LongSumAggregator.java | 48 ---- .../metadata/measure/MeasureAggregator.java | 121 --------- .../metadata/measure/MeasureAggregators.java | 81 ------ .../kylin/metadata/measure/MeasureCodec.java | 79 ------ .../kylin/metadata/measure/TopNAggregator.java | 66 ----- .../measure/fixedlen/FixedHLLCodec.java | 80 ------ .../measure/fixedlen/FixedLenMeasureCodec.java | 49 ---- .../measure/fixedlen/FixedPointLongCodec.java | 117 --------- .../serializer/BigDecimalSerializer.java | 110 -------- .../measure/serializer/DataTypeSerializer.java | 111 -------- .../measure/serializer/DateTimeSerializer.java | 64 ----- .../measure/serializer/DoubleSerializer.java | 83 ------ .../measure/serializer/HLLCSerializer.java | 97 ------- .../measure/serializer/LongSerializer.java | 90 ------- .../measure/serializer/StringSerializer.java | 55 ---- .../serializer/TopNCounterSerializer.java | 116 -------- .../apache/kylin/metadata/model/DataType.java | 2 +- .../fixedlen/FixedPointLongCodecTest.java | 44 ---- .../serializer/BigDecimalSerializerTest.java | 52 ---- .../serializer/TopNCounterSerializerTest.java | 61 ----- .../org/apache/kylin/storage/tuple/Tuple.java | 4 +- .../engine/mr/steps/BaseCuboidMapperBase.java | 3 +- .../kylin/engine/mr/steps/CuboidReducer.java | 4 +- .../engine/mr/steps/InMemCuboidReducer.java | 4 +- .../mr/steps/MergeCuboidFromStorageMapper.java | 2 +- .../engine/mr/steps/MergeCuboidMapper.java | 2 +- .../kylin/engine/mr/steps/CubeReducerTest.java | 4 +- .../apache/kylin/engine/spark/SparkCubing.java | 8 +- .../invertedindex/index/RawTableRecord.java | 4 +- .../kylin/invertedindex/index/TableRecord.java | 2 +- .../invertedindex/index/TableRecordInfo.java | 2 +- .../index/TableRecordInfoDigest.java | 4 +- .../invertedindex/measure/FixedHLLCodec.java | 80 ++++++ .../measure/FixedLenMeasureCodec.java | 49 ++++ .../measure/FixedPointLongCodec.java | 117 +++++++++ .../invertedindex/model/IIKeyValueCodec.java | 2 +- .../kylin/invertedindex/IIDescManagerTest.java | 104 ++++++++ .../apache/kylin/invertedindex/IIDescTest.java | 67 +++++ .../kylin/invertedindex/IIInstanceTest.java | 74 ++++++ .../invertedindex/InvertedIndexLocalTest.java | 262 +++++++++++++++++++ .../org/apache/kylin/invertedindex/LZFTest.java | 49 ++++ .../invertedindex/IIDescManagerTest.java | 104 -------- .../invertedindex/invertedindex/IIDescTest.java | 67 ----- .../invertedindex/IIInstanceTest.java | 74 ------ .../invertedindex/InvertedIndexLocalTest.java | 262 ------------------- .../invertedindex/invertedindex/LZFTest.java | 49 ---- .../measure/FixedPointLongCodecTest.java | 45 ++++ .../common/coprocessor/AggregationCache.java | 2 +- .../observer/AggregationScanner.java | 2 +- .../observer/ObserverAggregationCache.java | 2 +- .../observer/ObserverAggregators.java | 4 +- .../endpoint/EndpointAggregationCache.java | 2 +- .../endpoint/EndpointAggregators.java | 6 +- .../ii/coprocessor/endpoint/IIEndpoint.java | 3 +- .../storage/hbase/steps/CubeHFileMapper.java | 2 +- .../hbase/steps/HBaseMROutput2Transition.java | 2 +- .../storage/hbase/steps/KeyValueCreator.java | 2 +- .../storage/hbase/steps/RowValueDecoder.java | 6 +- .../observer/AggregateRegionObserverTest.java | 2 +- .../endpoint/EndpointAggregationTest.java | 7 +- .../hbase/steps/CubeHFileMapper2Test.java | 2 +- .../hbase/steps/RowValueDecoderTest.java | 4 +- 116 files changed, 2924 insertions(+), 2819 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-common/src/main/java/org/apache/kylin/common/util/DoubleMutable.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/util/DoubleMutable.java b/core-common/src/main/java/org/apache/kylin/common/util/DoubleMutable.java new file mode 100644 index 0000000..520cd74 --- /dev/null +++ b/core-common/src/main/java/org/apache/kylin/common/util/DoubleMutable.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.common.util; + +import java.io.Serializable; + +@SuppressWarnings("serial") +public class DoubleMutable implements Comparable<DoubleMutable>, Serializable { + + private double v; + + public DoubleMutable() { + this(0); + } + + public DoubleMutable(double v) { + set(v); + } + + public double get() { + return v; + } + + public void set(double v) { + this.v = v; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof DoubleMutable)) { + return false; + } + DoubleMutable other = (DoubleMutable) o; + return this.v == other.v; + } + + @Override + public int hashCode() { + return (int) Double.doubleToLongBits(v); + } + + @Override + public int compareTo(DoubleMutable o) { + return (v < o.v ? -1 : (v == o.v ? 0 : 1)); + } + + @Override + public String toString() { + return Double.toString(v); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-common/src/main/java/org/apache/kylin/common/util/LongMutable.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/util/LongMutable.java b/core-common/src/main/java/org/apache/kylin/common/util/LongMutable.java new file mode 100644 index 0000000..238bb86 --- /dev/null +++ b/core-common/src/main/java/org/apache/kylin/common/util/LongMutable.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.common.util; + +import java.io.Serializable; + +@SuppressWarnings("serial") +public class LongMutable implements Comparable<LongMutable>, Serializable { + + private long v; + + public LongMutable() { + this(0); + } + + public LongMutable(long v) { + set(v); + } + + public long get() { + return v; + } + + public void set(long v) { + this.v = v; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof LongMutable)) { + return false; + } + LongMutable other = (LongMutable) o; + return this.v == other.v; + } + + @Override + public int hashCode() { + return (int) v; + } + + @Override + public int compareTo(LongMutable o) { + long thisValue = this.v; + long thatValue = o.v; + return (thisValue < thatValue ? -1 : (thisValue == thatValue ? 0 : 1)); + } + + @Override + public String toString() { + return Long.toString(v); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/Aggregation.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/Aggregation.java b/core-cube/src/main/java/org/apache/kylin/aggregation/Aggregation.java new file mode 100644 index 0000000..193c5de --- /dev/null +++ b/core-cube/src/main/java/org/apache/kylin/aggregation/Aggregation.java @@ -0,0 +1,42 @@ +package org.apache.kylin.aggregation; + +import java.util.List; + +import org.apache.kylin.dict.Dictionary; +import org.apache.kylin.metadata.model.DataType; +import org.apache.kylin.metadata.model.MeasureDesc; +import org.apache.kylin.metadata.model.TblColRef; + +abstract public class Aggregation { + + /* ============================================================================ + * Define + * ---------------------------------------------------------------------------- */ + + abstract public DataType getAggregationDataType(); + + abstract public DataType getResultDataType(); + + abstract public void validate(MeasureDesc measureDesc) throws IllegalArgumentException; + + /* ============================================================================ + * Build + * ---------------------------------------------------------------------------- */ + + abstract public DataTypeSerializer<?> getSeralizer(); + + abstract public MeasureAggregator<?> getAggregator(); + + abstract public List<TblColRef> getColumnsNeedDictionary(MeasureDesc measureDesc); + + abstract public Object reEncodeDictionary(Object value, List<Dictionary<?>> oldDicts, List<Dictionary<?>> newDicts); + + /* ============================================================================ + * Cube Selection + * ---------------------------------------------------------------------------- */ + + /* ============================================================================ + * Query + * ---------------------------------------------------------------------------- */ + +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/DataTypeSerializer.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/DataTypeSerializer.java b/core-cube/src/main/java/org/apache/kylin/aggregation/DataTypeSerializer.java new file mode 100644 index 0000000..df6833c --- /dev/null +++ b/core-cube/src/main/java/org/apache/kylin/aggregation/DataTypeSerializer.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.aggregation; + +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import org.apache.kylin.aggregation.basic.BigDecimalSerializer; +import org.apache.kylin.aggregation.basic.DateTimeSerializer; +import org.apache.kylin.aggregation.basic.DoubleSerializer; +import org.apache.kylin.aggregation.basic.LongSerializer; +import org.apache.kylin.aggregation.basic.StringSerializer; +import org.apache.kylin.aggregation.hllc.HLLCSerializer; +import org.apache.kylin.aggregation.topn.TopNCounterSerializer; +import org.apache.kylin.common.util.BytesSerializer; +import org.apache.kylin.metadata.model.DataType; + +import com.google.common.collect.Maps; + +/** + * @author yangli9 + * + * Note: the implementations MUST be thread-safe. + * + */ +abstract public class DataTypeSerializer<T> implements BytesSerializer<T> { + + final static Map<String, Class<?>> implementations; + static { + HashMap<String, Class<?>> impl = Maps.newHashMap(); + impl.put("varchar", StringSerializer.class); + impl.put("decimal", BigDecimalSerializer.class); + impl.put("double", DoubleSerializer.class); + impl.put("float", DoubleSerializer.class); + impl.put("bigint", LongSerializer.class); + impl.put("long", LongSerializer.class); + impl.put("integer", LongSerializer.class); + impl.put("int", LongSerializer.class); + impl.put("smallint", LongSerializer.class); + impl.put("date", DateTimeSerializer.class); + impl.put("datetime", DateTimeSerializer.class); + impl.put("timestamp", DateTimeSerializer.class); + implementations = Collections.unmodifiableMap(impl); + + } + + public static DataTypeSerializer<?> create(String dataType) { + return create(DataType.getInstance(dataType)); + } + + public static DataTypeSerializer<?> create(DataType type) { + if (type.isHLLC()) { + return new HLLCSerializer(type); + } + + if (type.isTopN()) { + return new TopNCounterSerializer(type); + } + + Class<?> clz = implementations.get(type.getName()); + if (clz == null) + throw new RuntimeException("No MeasureSerializer for type " + type); + + try { + return (DataTypeSerializer<?>) clz.getConstructor(DataType.class).newInstance(type); + } catch (Exception e) { + throw new RuntimeException(e); // never happen + } + } + + /** peek into buffer and return the length of serialization */ + abstract public int peekLength(ByteBuffer in); + + /** return the max number of bytes to the longest serialization */ + abstract public int maxLength(); + + /** get an estimate of size in bytes of the serialized data */ + abstract public int getStorageBytesEstimate(); + + /** convert from String to obj (string often come as byte[] in mapred) */ + abstract public T valueOf(byte[] value); + + /** convert from String to obj */ + public T valueOf(String value) { + try { + return valueOf(value.getBytes("UTF-8")); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException(e); // never happen + } + } + + /** convert from obj to string */ + public String toString(T value) { + if (value == null) + return "NULL"; + else + return value.toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/MeasureAggregator.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/MeasureAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/MeasureAggregator.java new file mode 100644 index 0000000..9c8945d --- /dev/null +++ b/core-cube/src/main/java/org/apache/kylin/aggregation/MeasureAggregator.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.aggregation; + +import org.apache.kylin.aggregation.basic.BigDecimalMaxAggregator; +import org.apache.kylin.aggregation.basic.BigDecimalMinAggregator; +import org.apache.kylin.aggregation.basic.BigDecimalSumAggregator; +import org.apache.kylin.aggregation.basic.DoubleMaxAggregator; +import org.apache.kylin.aggregation.basic.DoubleMinAggregator; +import org.apache.kylin.aggregation.basic.DoubleSumAggregator; +import org.apache.kylin.aggregation.basic.LongMaxAggregator; +import org.apache.kylin.aggregation.basic.LongMinAggregator; +import org.apache.kylin.aggregation.basic.LongSumAggregator; +import org.apache.kylin.aggregation.hllc.HLLCAggregator; +import org.apache.kylin.aggregation.hllc.LDCAggregator; +import org.apache.kylin.aggregation.topn.TopNAggregator; +import org.apache.kylin.metadata.model.DataType; +import org.apache.kylin.metadata.model.FunctionDesc; + +import java.io.Serializable; + +/** + */ +@SuppressWarnings("serial") +abstract public class MeasureAggregator<V> implements Serializable { + + public static MeasureAggregator<?> create(String funcName, String returnType) { + if (FunctionDesc.FUNC_SUM.equalsIgnoreCase(funcName) || FunctionDesc.FUNC_COUNT.equalsIgnoreCase(funcName)) { + if (isInteger(returnType)) + return new LongSumAggregator(); + else if (isBigDecimal(returnType)) + return new BigDecimalSumAggregator(); + else if (isDouble(returnType)) + return new DoubleSumAggregator(); + } else if (FunctionDesc.FUNC_COUNT_DISTINCT.equalsIgnoreCase(funcName)) { + DataType hllcType = DataType.getInstance(returnType); + if (hllcType.isHLLC()) + return new HLLCAggregator(hllcType.getPrecision()); + else + return new LDCAggregator(); + } else if (FunctionDesc.FUNC_MAX.equalsIgnoreCase(funcName)) { + if (isInteger(returnType)) + return new LongMaxAggregator(); + else if (isBigDecimal(returnType)) + return new BigDecimalMaxAggregator(); + else if (isDouble(returnType)) + return new DoubleMaxAggregator(); + } else if (FunctionDesc.FUNC_MIN.equalsIgnoreCase(funcName)) { + if (isInteger(returnType)) + return new LongMinAggregator(); + else if (isBigDecimal(returnType)) + return new BigDecimalMinAggregator(); + else if (isDouble(returnType)) + return new DoubleMinAggregator(); + } else if (FunctionDesc.FUNC_TOP_N.equalsIgnoreCase(funcName)) { + return new TopNAggregator(); + } + throw new IllegalArgumentException("No aggregator for func '" + funcName + "' and return type '" + returnType + "'"); + } + + public static boolean isBigDecimal(String type) { + return type.startsWith("decimal"); + } + + public static boolean isDouble(String type) { + return "double".equalsIgnoreCase(type) || "float".equalsIgnoreCase(type) || "real".equalsIgnoreCase(type); + } + + public static boolean isInteger(String type) { + return "long".equalsIgnoreCase(type) || "bigint".equalsIgnoreCase(type) || "int".equalsIgnoreCase(type) || "integer".equalsIgnoreCase(type); + } + + public static int guessBigDecimalMemBytes() { + // 116 returned by AggregationCacheMemSizeTest + return 8 // aggregator obj shell + + 8 // ref to BigDecimal + + 8 // BigDecimal obj shell + + 100; // guess of BigDecimal internal + } + + public static int guessDoubleMemBytes() { + // 29 to 44 returned by AggregationCacheMemSizeTest + return 44; + /* + return 8 // aggregator obj shell + + 8 // ref to DoubleWritable + + 8 // DoubleWritable obj shell + + 8; // size of double + */ + } + + public static int guessLongMemBytes() { + // 29 to 44 returned by AggregationCacheMemSizeTest + return 44; + /* + return 8 // aggregator obj shell + + 8 // ref to LongWritable + + 8 // LongWritable obj shell + + 8; // size of long + */ + } + + // ============================================================================ + + @SuppressWarnings("rawtypes") + public void setDependentAggregator(MeasureAggregator agg) { + } + + abstract public void reset(); + + abstract public void aggregate(V value); + + abstract public V getState(); + + // get an estimate of memory consumption UPPER BOUND + abstract public int getMemBytesEstimate(); +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/MeasureAggregators.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/MeasureAggregators.java b/core-cube/src/main/java/org/apache/kylin/aggregation/MeasureAggregators.java new file mode 100644 index 0000000..3aa575b --- /dev/null +++ b/core-cube/src/main/java/org/apache/kylin/aggregation/MeasureAggregators.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.aggregation; + +import java.io.Serializable; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +import org.apache.kylin.metadata.model.FunctionDesc; +import org.apache.kylin.metadata.model.MeasureDesc; + +/** + */ +@SuppressWarnings({ "rawtypes", "unchecked", "serial" }) +public class MeasureAggregators implements Serializable { + + private final MeasureAggregator[] aggs; + private final int descLength; + + public MeasureAggregators(Collection<MeasureDesc> measureDescs) { + this((MeasureDesc[]) measureDescs.toArray(new MeasureDesc[measureDescs.size()])); + } + + public MeasureAggregators(MeasureDesc... measureDescs) { + descLength = measureDescs.length; + aggs = new MeasureAggregator[descLength]; + + Map<String, Integer> measureIndexMap = new HashMap<String, Integer>(); + for (int i = 0; i < descLength; i++) { + FunctionDesc func = measureDescs[i].getFunction(); + aggs[i] = MeasureAggregator.create(func.getExpression(), func.getReturnType()); + measureIndexMap.put(measureDescs[i].getName(), i); + } + // fill back dependent aggregator + for (int i = 0; i < descLength; i++) { + String depMsrRef = measureDescs[i].getDependentMeasureRef(); + if (depMsrRef != null) { + int index = measureIndexMap.get(depMsrRef); + aggs[i].setDependentAggregator(aggs[index]); + } + } + } + + public void reset() { + for (int i = 0; i < aggs.length; i++) { + aggs[i].reset(); + } + } + + public void aggregate(Object[] values) { + assert values.length == descLength; + + for (int i = 0; i < descLength; i++) { + aggs[i].aggregate(values[i]); + } + } + + public void collectStates(Object[] states) { + for (int i = 0; i < descLength; i++) { + states[i] = aggs[i].getState(); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/MeasureCodec.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/MeasureCodec.java b/core-cube/src/main/java/org/apache/kylin/aggregation/MeasureCodec.java new file mode 100644 index 0000000..cbcb3a8 --- /dev/null +++ b/core-cube/src/main/java/org/apache/kylin/aggregation/MeasureCodec.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.aggregation; + +import java.nio.ByteBuffer; +import java.util.Collection; + +import org.apache.kylin.metadata.model.MeasureDesc; + +/** + * @author yangli9 + * + */ +@SuppressWarnings({ "rawtypes", "unchecked" }) +public class MeasureCodec { + + int nMeasures; + DataTypeSerializer[] serializers; + + public MeasureCodec(Collection<MeasureDesc> measureDescs) { + this((MeasureDesc[]) measureDescs.toArray(new MeasureDesc[measureDescs.size()])); + } + + public MeasureCodec(MeasureDesc... measureDescs) { + String[] dataTypes = new String[measureDescs.length]; + for (int i = 0; i < dataTypes.length; i++) { + dataTypes[i] = measureDescs[i].getFunction().getReturnType(); + } + init(dataTypes); + } + + public MeasureCodec(String... dataTypes) { + init(dataTypes); + } + + private void init(String[] dataTypes) { + nMeasures = dataTypes.length; + serializers = new DataTypeSerializer[nMeasures]; + + for (int i = 0; i < nMeasures; i++) { + serializers[i] = DataTypeSerializer.create(dataTypes[i]); + } + } + + public DataTypeSerializer getSerializer(int idx) { + return serializers[idx]; + } + + public void decode(ByteBuffer buf, Object[] result) { + assert result.length == nMeasures; + for (int i = 0; i < nMeasures; i++) { + result[i] = serializers[i].deserialize(buf); + } + } + + public void encode(Object[] values, ByteBuffer out) { + assert values.length == nMeasures; + for (int i = 0; i < nMeasures; i++) { + serializers[i].serialize(values[i], out); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BigDecimalMaxAggregator.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BigDecimalMaxAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BigDecimalMaxAggregator.java new file mode 100644 index 0000000..ca044d0 --- /dev/null +++ b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BigDecimalMaxAggregator.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.aggregation.basic; + +import java.math.BigDecimal; + +import org.apache.kylin.aggregation.MeasureAggregator; + +/** + */ +@SuppressWarnings("serial") +public class BigDecimalMaxAggregator extends MeasureAggregator<BigDecimal> { + + BigDecimal max = null; + + @Override + public void reset() { + max = null; + } + + @Override + public void aggregate(BigDecimal value) { + if (max == null) + max = value; + else if (max.compareTo(value) < 0) + max = value; + } + + @Override + public BigDecimal getState() { + return max; + } + + @Override + public int getMemBytesEstimate() { + return guessBigDecimalMemBytes(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BigDecimalMinAggregator.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BigDecimalMinAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BigDecimalMinAggregator.java new file mode 100644 index 0000000..3c3c85e --- /dev/null +++ b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BigDecimalMinAggregator.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.aggregation.basic; + +import java.math.BigDecimal; + +import org.apache.kylin.aggregation.MeasureAggregator; + +/** + */ +@SuppressWarnings("serial") +public class BigDecimalMinAggregator extends MeasureAggregator<BigDecimal> { + + BigDecimal max = null; + + @Override + public void reset() { + max = null; + } + + @Override + public void aggregate(BigDecimal value) { + if (max == null) + max = value; + else if (max.compareTo(value) > 0) + max = value; + } + + @Override + public BigDecimal getState() { + return max; + } + + @Override + public int getMemBytesEstimate() { + return guessBigDecimalMemBytes(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BigDecimalSerializer.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BigDecimalSerializer.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BigDecimalSerializer.java new file mode 100644 index 0000000..9f7c3cf --- /dev/null +++ b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BigDecimalSerializer.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.aggregation.basic; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.nio.ByteBuffer; + +import org.apache.kylin.aggregation.DataTypeSerializer; +import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.common.util.BytesUtil; +import org.apache.kylin.metadata.model.DataType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author yangli9 + * + */ +public class BigDecimalSerializer extends DataTypeSerializer<BigDecimal> { + + private static final Logger logger = LoggerFactory.getLogger(BigDecimalSerializer.class); + + final DataType type; + final int maxLength; + + int avoidVerbose = 0; + + public BigDecimalSerializer(DataType type) { + this.type = type; + // see serialize(): 1 byte scale, 1 byte length, assume every 2 digits takes 1 byte + this.maxLength = 1 + 1 + (type.getPrecision() + 1) / 2; + } + + @Override + public void serialize(BigDecimal value, ByteBuffer out) { + if (value.scale() > type.getScale()) { + if (avoidVerbose % 10000 == 0) { + logger.warn("value's scale has exceeded the " + type.getScale() + ", cut it off, to ensure encoded value do not exceed maxLength " + maxLength + " times:" + (avoidVerbose++)); + } + value = value.setScale(type.getScale(), BigDecimal.ROUND_HALF_EVEN); + } + byte[] bytes = value.unscaledValue().toByteArray(); + if (bytes.length + 2 > maxLength) { + throw new IllegalArgumentException("'" + value + "' exceeds the expected length for type " + type); + } + + BytesUtil.writeVInt(value.scale(), out); + BytesUtil.writeVInt(bytes.length, out); + out.put(bytes); + } + + @Override + public BigDecimal deserialize(ByteBuffer in) { + int scale = BytesUtil.readVInt(in); + int n = BytesUtil.readVInt(in); + + byte[] bytes = new byte[n]; + in.get(bytes); + + return new BigDecimal(new BigInteger(bytes), scale); + } + + @Override + public int peekLength(ByteBuffer in) { + int mark = in.position(); + + @SuppressWarnings("unused") + int scale = BytesUtil.readVInt(in); + int n = BytesUtil.readVInt(in); + int len = in.position() - mark + n; + + in.position(mark); + return len; + } + + @Override + public int maxLength() { + return maxLength; + } + + @Override + public int getStorageBytesEstimate() { + return 8; + } + + @Override + public BigDecimal valueOf(byte[] value) { + if (value == null) + return new BigDecimal(0); + else + return new BigDecimal(Bytes.toString(value)); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BigDecimalSumAggregator.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BigDecimalSumAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BigDecimalSumAggregator.java new file mode 100644 index 0000000..19aef3c --- /dev/null +++ b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BigDecimalSumAggregator.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.aggregation.basic; + +import java.math.BigDecimal; + +import org.apache.kylin.aggregation.MeasureAggregator; + +/** + */ +@SuppressWarnings("serial") +public class BigDecimalSumAggregator extends MeasureAggregator<BigDecimal> { + + BigDecimal sum = new BigDecimal(0); + + @Override + public void reset() { + sum = new BigDecimal(0); + } + + @Override + public void aggregate(BigDecimal value) { + sum = sum.add(value); + } + + @Override + public BigDecimal getState() { + return sum; + } + + @Override + public int getMemBytesEstimate() { + return guessBigDecimalMemBytes(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DateTimeSerializer.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DateTimeSerializer.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DateTimeSerializer.java new file mode 100644 index 0000000..0bf4aba --- /dev/null +++ b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DateTimeSerializer.java @@ -0,0 +1,65 @@ +package org.apache.kylin.aggregation.basic; + +import java.nio.ByteBuffer; + +import org.apache.kylin.aggregation.DataTypeSerializer; +import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.common.util.DateFormat; +import org.apache.kylin.common.util.LongMutable; +import org.apache.kylin.metadata.model.DataType; + +public class DateTimeSerializer extends DataTypeSerializer<LongMutable> { + + // be thread-safe and avoid repeated obj creation + private ThreadLocal<LongMutable> current = new ThreadLocal<LongMutable>(); + + public DateTimeSerializer(DataType type) { + } + + @Override + public void serialize(LongMutable value, ByteBuffer out) { + out.putLong(value.get()); + } + + private LongMutable current() { + LongMutable l = current.get(); + if (l == null) { + l = new LongMutable(); + current.set(l); + } + return l; + } + + @Override + public LongMutable deserialize(ByteBuffer in) { + LongMutable l = current(); + l.set(in.getLong()); + return l; + } + + @Override + public int peekLength(ByteBuffer in) { + return 8; + } + + @Override + public int maxLength() { + return 8; + } + + @Override + public int getStorageBytesEstimate() { + return 8; + } + + @Override + public LongMutable valueOf(byte[] value) { + LongMutable l = current(); + if (value == null) + l.set(0L); + else + l.set(DateFormat.stringToMillis(Bytes.toString(value))); + return l; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleMaxAggregator.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleMaxAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleMaxAggregator.java new file mode 100644 index 0000000..f09614d --- /dev/null +++ b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleMaxAggregator.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.aggregation.basic; + +import org.apache.kylin.aggregation.MeasureAggregator; +import org.apache.kylin.common.util.DoubleMutable; + +/** + */ +@SuppressWarnings("serial") +public class DoubleMaxAggregator extends MeasureAggregator<DoubleMutable> { + + DoubleMutable max = null; + + @Override + public void reset() { + max = null; + } + + @Override + public void aggregate(DoubleMutable value) { + if (max == null) + max = new DoubleMutable(value.get()); + else if (max.get() < value.get()) + max.set(value.get()); + } + + @Override + public DoubleMutable getState() { + return max; + } + + @Override + public int getMemBytesEstimate() { + return guessDoubleMemBytes(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleMinAggregator.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleMinAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleMinAggregator.java new file mode 100644 index 0000000..b93c15c --- /dev/null +++ b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleMinAggregator.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.aggregation.basic; + +import org.apache.kylin.aggregation.MeasureAggregator; +import org.apache.kylin.common.util.DoubleMutable; + +/** + */ +@SuppressWarnings("serial") +public class DoubleMinAggregator extends MeasureAggregator<DoubleMutable> { + + DoubleMutable min = null; + + @Override + public void reset() { + min = null; + } + + @Override + public void aggregate(DoubleMutable value) { + if (min == null) + min = new DoubleMutable(value.get()); + else if (min.get() > value.get()) + min.set(value.get()); + } + + @Override + public DoubleMutable getState() { + return min; + } + + @Override + public int getMemBytesEstimate() { + return guessDoubleMemBytes(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleSerializer.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleSerializer.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleSerializer.java new file mode 100644 index 0000000..f207054 --- /dev/null +++ b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleSerializer.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.aggregation.basic; + +import java.nio.ByteBuffer; + +import org.apache.kylin.aggregation.DataTypeSerializer; +import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.common.util.DoubleMutable; +import org.apache.kylin.metadata.model.DataType; + +/** + */ +public class DoubleSerializer extends DataTypeSerializer<DoubleMutable> { + + // be thread-safe and avoid repeated obj creation + private ThreadLocal<DoubleMutable> current = new ThreadLocal<DoubleMutable>(); + + public DoubleSerializer(DataType type) { + } + + @Override + public void serialize(DoubleMutable value, ByteBuffer out) { + out.putDouble(value.get()); + } + + private DoubleMutable current() { + DoubleMutable d = current.get(); + if (d == null) { + d = new DoubleMutable(); + current.set(d); + } + return d; + } + + @Override + public DoubleMutable deserialize(ByteBuffer in) { + DoubleMutable d = current(); + d.set(in.getDouble()); + return d; + } + + @Override + public int peekLength(ByteBuffer in) { + return 8; + } + + @Override + public int maxLength() { + return 8; + } + + @Override + public int getStorageBytesEstimate() { + return 8; + } + + @Override + public DoubleMutable valueOf(byte[] value) { + DoubleMutable d = current(); + if (value == null) + d.set(0d); + else + d.set(Double.parseDouble(Bytes.toString(value))); + return d; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleSumAggregator.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleSumAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleSumAggregator.java new file mode 100644 index 0000000..298cec6 --- /dev/null +++ b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleSumAggregator.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.aggregation.basic; + +import org.apache.kylin.aggregation.MeasureAggregator; +import org.apache.kylin.common.util.DoubleMutable; + +/** + */ +@SuppressWarnings("serial") +public class DoubleSumAggregator extends MeasureAggregator<DoubleMutable> { + + DoubleMutable sum = new DoubleMutable(); + + @Override + public void reset() { + sum.set(0.0); + } + + @Override + public void aggregate(DoubleMutable value) { + sum.set(sum.get() + value.get()); + } + + @Override + public DoubleMutable getState() { + return sum; + } + + @Override + public int getMemBytesEstimate() { + return guessDoubleMemBytes(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongMaxAggregator.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongMaxAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongMaxAggregator.java new file mode 100644 index 0000000..71d95f2 --- /dev/null +++ b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongMaxAggregator.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.aggregation.basic; + +import org.apache.kylin.aggregation.MeasureAggregator; +import org.apache.kylin.common.util.LongMutable; + +/** + */ +@SuppressWarnings("serial") +public class LongMaxAggregator extends MeasureAggregator<LongMutable> { + + LongMutable max = null; + + @Override + public void reset() { + max = null; + } + + @Override + public void aggregate(LongMutable value) { + if (max == null) + max = new LongMutable(value.get()); + else if (max.get() < value.get()) + max.set(value.get()); + } + + @Override + public LongMutable getState() { + return max; + } + + @Override + public int getMemBytesEstimate() { + return guessLongMemBytes(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongMinAggregator.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongMinAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongMinAggregator.java new file mode 100644 index 0000000..d1e93f2 --- /dev/null +++ b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongMinAggregator.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.aggregation.basic; + +import org.apache.kylin.aggregation.MeasureAggregator; +import org.apache.kylin.common.util.LongMutable; + +/** + */ +@SuppressWarnings("serial") +public class LongMinAggregator extends MeasureAggregator<LongMutable> { + + LongMutable min = null; + + @Override + public void reset() { + min = null; + } + + @Override + public void aggregate(LongMutable value) { + if (min == null) + min = new LongMutable(value.get()); + else if (min.get() > value.get()) + min.set(value.get()); + } + + @Override + public LongMutable getState() { + return min; + } + + @Override + public int getMemBytesEstimate() { + return guessLongMemBytes(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongSerializer.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongSerializer.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongSerializer.java new file mode 100644 index 0000000..202596d --- /dev/null +++ b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongSerializer.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.aggregation.basic; + +import java.nio.ByteBuffer; + +import org.apache.kylin.aggregation.DataTypeSerializer; +import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.common.util.BytesUtil; +import org.apache.kylin.common.util.LongMutable; +import org.apache.kylin.metadata.model.DataType; + +/** + */ +public class LongSerializer extends DataTypeSerializer<LongMutable> { + + // be thread-safe and avoid repeated obj creation + private ThreadLocal<LongMutable> current = new ThreadLocal<LongMutable>(); + + public LongSerializer(DataType type) { + } + + @Override + public void serialize(LongMutable value, ByteBuffer out) { + BytesUtil.writeVLong(value.get(), out); + } + + private LongMutable current() { + LongMutable l = current.get(); + if (l == null) { + l = new LongMutable(); + current.set(l); + } + return l; + } + + @Override + public LongMutable deserialize(ByteBuffer in) { + LongMutable l = current(); + l.set(BytesUtil.readVLong(in)); + return l; + } + + @Override + public int peekLength(ByteBuffer in) { + int mark = in.position(); + + BytesUtil.readVLong(in); + int len = in.position() - mark; + + in.position(mark); + return len; + } + + @Override + public int maxLength() { + return 9; // vlong: 1 + 8 + } + + @Override + public int getStorageBytesEstimate() { + return 5; + } + + @Override + public LongMutable valueOf(byte[] value) { + LongMutable l = current(); + if (value == null) + l.set(0L); + else + l.set(Long.parseLong(Bytes.toString(value))); + return l; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongSumAggregator.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongSumAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongSumAggregator.java new file mode 100644 index 0000000..c85c83c --- /dev/null +++ b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongSumAggregator.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.aggregation.basic; + +import org.apache.kylin.aggregation.MeasureAggregator; +import org.apache.kylin.common.util.LongMutable; + +/** + */ +@SuppressWarnings("serial") +public class LongSumAggregator extends MeasureAggregator<LongMutable> { + + LongMutable sum = new LongMutable(); + + @Override + public void reset() { + sum.set(0); + } + + @Override + public void aggregate(LongMutable value) { + sum.set(sum.get() + value.get()); + } + + @Override + public LongMutable getState() { + return sum; + } + + @Override + public int getMemBytesEstimate() { + return guessLongMemBytes(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/basic/StringSerializer.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/StringSerializer.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/StringSerializer.java new file mode 100644 index 0000000..e84278d --- /dev/null +++ b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/StringSerializer.java @@ -0,0 +1,56 @@ +package org.apache.kylin.aggregation.basic; + +import java.nio.ByteBuffer; + +import org.apache.kylin.aggregation.DataTypeSerializer; +import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.common.util.BytesUtil; +import org.apache.kylin.metadata.model.DataType; + +public class StringSerializer extends DataTypeSerializer<String> { + + final DataType type; + final int maxLength; + + public StringSerializer(DataType type) { + this.type = type; + // see serialize(): 2 byte length, rest is String.toBytes() + this.maxLength = 2 + type.getPrecision(); + } + + @Override + public void serialize(String value, ByteBuffer out) { + int start = out.position(); + + BytesUtil.writeUTFString(value, out); + + if (out.position() - start > maxLength) + throw new IllegalArgumentException("'" + value + "' exceeds the expected length for type " + type); + } + + @Override + public String deserialize(ByteBuffer in) { + return BytesUtil.readUTFString(in); + } + + @Override + public int peekLength(ByteBuffer in) { + return BytesUtil.peekByteArrayLength(in); + } + + @Override + public int maxLength() { + return maxLength; + } + + @Override + public int getStorageBytesEstimate() { + return maxLength; + } + + @Override + public String valueOf(byte[] value) { + return Bytes.toString(value); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCAggregator.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCAggregator.java new file mode 100644 index 0000000..8f85fe8 --- /dev/null +++ b/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCAggregator.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.aggregation.hllc; + +import org.apache.kylin.aggregation.MeasureAggregator; +import org.apache.kylin.common.hll.HyperLogLogPlusCounter; + +/** + */ +@SuppressWarnings("serial") +public class HLLCAggregator extends MeasureAggregator<HyperLogLogPlusCounter> { + + final int precision; + HyperLogLogPlusCounter sum = null; + + public HLLCAggregator(int precision) { + this.precision = precision; + } + + @Override + public void reset() { + sum = null; + } + + @Override + public void aggregate(HyperLogLogPlusCounter value) { + if (sum == null) + sum = new HyperLogLogPlusCounter(value); + else + sum.merge(value); + } + + @Override + public HyperLogLogPlusCounter getState() { + return sum; + } + + @Override + public int getMemBytesEstimate() { + // 1024 + 60 returned by AggregationCacheMemSizeTest + return 8 // aggregator obj shell + + 4 // precision + + 8 // ref to HLLC + + 8 // HLLC obj shell + + 32 + (1 << precision); // HLLC internal + } + +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCSerializer.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCSerializer.java b/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCSerializer.java new file mode 100644 index 0000000..f7804f4 --- /dev/null +++ b/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCSerializer.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.aggregation.hllc; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.kylin.aggregation.DataTypeSerializer; +import org.apache.kylin.common.hll.HyperLogLogPlusCounter; +import org.apache.kylin.metadata.model.DataType; + +/** + * @author yangli9 + * + */ +public class HLLCSerializer extends DataTypeSerializer<HyperLogLogPlusCounter> { + + // be thread-safe and avoid repeated obj creation + private ThreadLocal<HyperLogLogPlusCounter> current = new ThreadLocal<HyperLogLogPlusCounter>(); + + private int precision; + + public HLLCSerializer(DataType type) { + this.precision = type.getPrecision(); + } + + @Override + public void serialize(HyperLogLogPlusCounter value, ByteBuffer out) { + try { + value.writeRegisters(out); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private HyperLogLogPlusCounter current() { + HyperLogLogPlusCounter hllc = current.get(); + if (hllc == null) { + hllc = new HyperLogLogPlusCounter(precision); + current.set(hllc); + } + return hllc; + } + + @Override + public HyperLogLogPlusCounter deserialize(ByteBuffer in) { + HyperLogLogPlusCounter hllc = current(); + try { + hllc.readRegisters(in); + } catch (IOException e) { + throw new RuntimeException(e); + } + return hllc; + } + + @Override + public int peekLength(ByteBuffer in) { + return current().peekLength(in); + } + + @Override + public int maxLength() { + return current().maxLength(); + } + + @Override + public int getStorageBytesEstimate() { + return current().maxLength(); + } + + @Override + public HyperLogLogPlusCounter valueOf(byte[] value) { + HyperLogLogPlusCounter hllc = current(); + hllc.clear(); + if (value == null) + hllc.add("__nUlL__"); + else + hllc.add(value); + return hllc; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/LDCAggregator.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/LDCAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/LDCAggregator.java new file mode 100644 index 0000000..643bcae --- /dev/null +++ b/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/LDCAggregator.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.aggregation.hllc; + +import org.apache.kylin.aggregation.MeasureAggregator; +import org.apache.kylin.common.util.LongMutable; + +/** + * Long Distinct Count + */ +@SuppressWarnings("serial") +public class LDCAggregator extends MeasureAggregator<LongMutable> { + + private static LongMutable ZERO = new LongMutable(0); + + private HLLCAggregator hllAgg = null; + private LongMutable state = new LongMutable(0); + + @SuppressWarnings("rawtypes") + public void setDependentAggregator(MeasureAggregator agg) { + this.hllAgg = (HLLCAggregator) agg; + } + + @Override + public void reset() { + } + + @Override + public void aggregate(LongMutable value) { + } + + @Override + public LongMutable getState() { + if (hllAgg == null) { + return ZERO; + } else { + state.set(hllAgg.getState().getCountEstimate()); + return state; + } + } + + @Override + public int getMemBytesEstimate() { + return guessLongMemBytes(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNAggregator.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNAggregator.java new file mode 100644 index 0000000..4f6c7ee --- /dev/null +++ b/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNAggregator.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.aggregation.topn; + +import java.util.Map; + +import org.apache.kylin.aggregation.MeasureAggregator; +import org.apache.kylin.common.topn.TopNCounter; +import org.apache.kylin.common.util.ByteArray; + +import com.google.common.collect.Maps; + +/** + * + */ +@SuppressWarnings("serial") +public class TopNAggregator extends MeasureAggregator<TopNCounter<ByteArray>> { + + int capacity = 0; + TopNCounter<ByteArray> sum = null; + Map<ByteArray, Double> sanityCheckMap; + + @Override + public void reset() { + sum = null; + } + + @Override + public void aggregate(TopNCounter<ByteArray> value) { + if (sum == null) { + capacity = value.getCapacity(); + sum = new TopNCounter<>(capacity); + sanityCheckMap = Maps.newHashMap(); + } + sum.merge(value); + } + + @Override + public TopNCounter<ByteArray> getState() { + + //sum.retain(capacity); + return sum; + } + + @Override + public int getMemBytesEstimate() { + return 8 * capacity / 4; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNCounterSerializer.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNCounterSerializer.java b/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNCounterSerializer.java new file mode 100644 index 0000000..8c44f8f --- /dev/null +++ b/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNCounterSerializer.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.aggregation.topn; + +import org.apache.kylin.aggregation.DataTypeSerializer; +import org.apache.kylin.common.topn.Counter; +import org.apache.kylin.common.topn.DoubleDeltaSerializer; +import org.apache.kylin.common.topn.TopNCounter; +import org.apache.kylin.common.util.ByteArray; +import org.apache.kylin.common.util.BytesUtil; +import org.apache.kylin.metadata.model.DataType; + +import java.nio.ByteBuffer; +import java.util.Iterator; +import java.util.List; + +/** + * + */ +public class TopNCounterSerializer extends DataTypeSerializer<TopNCounter<ByteArray>> { + + private DoubleDeltaSerializer dds = new DoubleDeltaSerializer(3); + + private int precision; + + public TopNCounterSerializer(DataType dataType) { + this.precision = dataType.getPrecision(); + } + + @Override + public int peekLength(ByteBuffer in) { + int mark = in.position(); + @SuppressWarnings("unused") + int capacity = in.getInt(); + int size = in.getInt(); + int keyLength = in.getInt(); + dds.deserialize(in); + int len = in.position() - mark + keyLength * size; + in.position(mark); + return len; + } + + @Override + public int maxLength() { + return precision * TopNCounter.EXTRA_SPACE_RATE * (4 + 8); + } + + @Override + public int getStorageBytesEstimate() { + return precision * TopNCounter.EXTRA_SPACE_RATE * 8; + } + + @Override + public TopNCounter<ByteArray> valueOf(byte[] value) { + ByteBuffer buffer = ByteBuffer.wrap(value); + int sizeOfId = buffer.getInt(); + int keyEncodedValue = buffer.getInt(); + double counter = buffer.getDouble(); + + ByteArray key = new ByteArray(sizeOfId); + BytesUtil.writeUnsigned(keyEncodedValue, key.array(), 0, sizeOfId); + + TopNCounter<ByteArray> topNCounter = new TopNCounter<ByteArray>(precision * TopNCounter.EXTRA_SPACE_RATE); + topNCounter.offer(key, counter); + return topNCounter; + } + + @Override + public void serialize(TopNCounter<ByteArray> value, ByteBuffer out) { + double[] counters = value.getCounters(); + List<ByteArray> peek = value.peek(1); + int keyLength = peek.size() > 0 ? peek.get(0).length() : 0; + out.putInt(value.getCapacity()); + out.putInt(value.size()); + out.putInt(keyLength); + dds.serialize(counters, out); + Iterator<Counter<ByteArray>> iterator = value.iterator(); + while (iterator.hasNext()) { + out.put(iterator.next().getItem().array()); + } + } + + @Override + public TopNCounter<ByteArray> deserialize(ByteBuffer in) { + int capacity = in.getInt(); + int size = in.getInt(); + int keyLength = in.getInt(); + double[] counters = dds.deserialize(in); + + TopNCounter<ByteArray> counter = new TopNCounter<ByteArray>(capacity); + ByteArray byteArray; + for (int i = 0; i < size; i++) { + byteArray = new ByteArray(keyLength); + in.get(byteArray.array()); + counter.offerToHead(byteArray, counters[i]); + } + + return counter; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java index 65d639f..b88f9df 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java @@ -6,6 +6,9 @@ import java.util.Arrays; import java.util.Collections; import java.util.Map; +import org.apache.kylin.aggregation.DataTypeSerializer; +import org.apache.kylin.aggregation.MeasureAggregator; +import org.apache.kylin.aggregation.basic.StringSerializer; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.BytesUtil; import org.apache.kylin.common.util.ImmutableBitSet; @@ -15,9 +18,6 @@ import org.apache.kylin.gridtable.DefaultGTComparator; import org.apache.kylin.gridtable.GTInfo; import org.apache.kylin.gridtable.IGTCodeSystem; import org.apache.kylin.gridtable.IGTComparator; -import org.apache.kylin.metadata.measure.MeasureAggregator; -import org.apache.kylin.metadata.measure.serializer.DataTypeSerializer; -import org.apache.kylin.metadata.measure.serializer.StringSerializer; /** * defines how column values will be encoded to/ decoded from GTRecord http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java index e4f32fb..26f1636 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java @@ -23,13 +23,13 @@ package org.apache.kylin.cube.gridtable; import java.nio.ByteBuffer; import java.util.Map; +import org.apache.kylin.aggregation.DataTypeSerializer; +import org.apache.kylin.aggregation.MeasureAggregator; +import org.apache.kylin.aggregation.basic.StringSerializer; import org.apache.kylin.common.util.ImmutableBitSet; import org.apache.kylin.gridtable.GTInfo; import org.apache.kylin.gridtable.IGTCodeSystem; import org.apache.kylin.gridtable.IGTComparator; -import org.apache.kylin.metadata.measure.MeasureAggregator; -import org.apache.kylin.metadata.measure.serializer.DataTypeSerializer; -import org.apache.kylin.metadata.measure.serializer.StringSerializer; @SuppressWarnings({ "rawtypes", "unchecked" }) public class TrimmedCubeCodeSystem implements IGTCodeSystem { http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java index 7fe2122..ce912a3 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java @@ -29,6 +29,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.TimeUnit; +import org.apache.kylin.aggregation.MeasureAggregators; import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.ImmutableBitSet; import org.apache.kylin.common.util.MemoryBudgetController; @@ -37,7 +38,6 @@ import org.apache.kylin.dict.Dictionary; import org.apache.kylin.gridtable.GTRecord; import org.apache.kylin.gridtable.GTScanRequest; import org.apache.kylin.gridtable.IGTScanner; -import org.apache.kylin.metadata.measure.MeasureAggregators; import org.apache.kylin.metadata.model.TblColRef; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java index e9d940a..8c6146b 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java @@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.kylin.common.topn.Counter; import org.apache.kylin.common.topn.TopNCounter; +import org.apache.kylin.common.util.DoubleMutable; import org.apache.kylin.common.util.ImmutableBitSet; import org.apache.kylin.common.util.MemoryBudgetController; import org.apache.kylin.common.util.Pair; @@ -46,7 +47,6 @@ import org.apache.kylin.gridtable.GTRecord; import org.apache.kylin.gridtable.GTScanRequest; import org.apache.kylin.gridtable.GridTable; import org.apache.kylin.gridtable.IGTScanner; -import org.apache.kylin.metadata.measure.DoubleMutable; import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.metadata.model.TblColRef; import org.slf4j.Logger; http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java index 69a9fc9..951c054 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java @@ -18,15 +18,16 @@ package org.apache.kylin.cube.inmemcubing; import com.google.common.base.Preconditions; + +import org.apache.kylin.aggregation.MeasureCodec; import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.common.util.LongMutable; import org.apache.kylin.cube.kv.RowConstants; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc; import org.apache.kylin.dict.Dictionary; import org.apache.kylin.gridtable.GTInfo; import org.apache.kylin.gridtable.GTRecord; -import org.apache.kylin.metadata.measure.LongMutable; -import org.apache.kylin.metadata.measure.MeasureCodec; import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.MeasureDesc; http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java index cd2881e..01696e8 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java @@ -7,10 +7,10 @@ import java.util.Iterator; import java.util.Map.Entry; import java.util.SortedMap; +import org.apache.kylin.aggregation.MeasureAggregator; import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.ImmutableBitSet; import org.apache.kylin.common.util.MemoryBudgetController; -import org.apache.kylin.metadata.measure.MeasureAggregator; import org.slf4j.Logger; import org.slf4j.LoggerFactory;