http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/StringSerializer.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/StringSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/StringSerializer.java deleted file mode 100644 index db27ca0..0000000 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/StringSerializer.java +++ /dev/null @@ -1,55 +0,0 @@ -package org.apache.kylin.metadata.measure.serializer; - -import java.nio.ByteBuffer; - -import org.apache.kylin.common.util.Bytes; -import org.apache.kylin.common.util.BytesUtil; -import org.apache.kylin.metadata.model.DataType; - -public class StringSerializer extends DataTypeSerializer<String> { - - final DataType type; - final int maxLength; - - public StringSerializer(DataType type) { - this.type = type; - // see serialize(): 2 byte length, rest is String.toBytes() - this.maxLength = 2 + type.getPrecision(); - } - - @Override - public void serialize(String value, ByteBuffer out) { - int start = out.position(); - - BytesUtil.writeUTFString(value, out); - - if (out.position() - start > maxLength) - throw new IllegalArgumentException("'" + value + "' exceeds the expected length for type " + type); - } - - @Override - public String deserialize(ByteBuffer in) { - return BytesUtil.readUTFString(in); - } - - @Override - public int peekLength(ByteBuffer in) { - return BytesUtil.peekByteArrayLength(in); - } - - @Override - public int maxLength() { - return maxLength; - } - - @Override - public int getStorageBytesEstimate() { - return maxLength; - } - - @Override - public String valueOf(byte[] value) { - return Bytes.toString(value); - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializer.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializer.java deleted file mode 100644 index 468d077..0000000 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializer.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.metadata.measure.serializer; - -import org.apache.kylin.common.topn.Counter; -import org.apache.kylin.common.topn.DoubleDeltaSerializer; -import org.apache.kylin.common.topn.TopNCounter; -import org.apache.kylin.common.util.ByteArray; -import org.apache.kylin.common.util.BytesUtil; -import org.apache.kylin.metadata.model.DataType; - -import java.nio.ByteBuffer; -import java.util.Iterator; -import java.util.List; - -/** - * - */ -public class TopNCounterSerializer extends DataTypeSerializer<TopNCounter<ByteArray>> { - - private DoubleDeltaSerializer dds = new DoubleDeltaSerializer(3); - - private int precision; - - public TopNCounterSerializer(DataType dataType) { - this.precision = dataType.getPrecision(); - } - - @Override - public int peekLength(ByteBuffer in) { - int mark = in.position(); - @SuppressWarnings("unused") - int capacity = in.getInt(); - int size = in.getInt(); - int keyLength = in.getInt(); - dds.deserialize(in); - int len = in.position() - mark + keyLength * size; - in.position(mark); - return len; - } - - @Override - public int maxLength() { - return precision * TopNCounter.EXTRA_SPACE_RATE * (4 + 8); - } - - @Override - public int getStorageBytesEstimate() { - return precision * TopNCounter.EXTRA_SPACE_RATE * 8; - } - - @Override - public TopNCounter<ByteArray> valueOf(byte[] value) { - ByteBuffer buffer = ByteBuffer.wrap(value); - int sizeOfId = buffer.getInt(); - int keyEncodedValue = buffer.getInt(); - double counter = buffer.getDouble(); - - ByteArray key = new ByteArray(sizeOfId); - BytesUtil.writeUnsigned(keyEncodedValue, key.array(), 0, sizeOfId); - - TopNCounter<ByteArray> topNCounter = new TopNCounter<ByteArray>(precision * TopNCounter.EXTRA_SPACE_RATE); - topNCounter.offer(key, counter); - return topNCounter; - } - - @Override - public void serialize(TopNCounter<ByteArray> value, ByteBuffer out) { - double[] counters = value.getCounters(); - List<ByteArray> peek = value.peek(1); - int keyLength = peek.size() > 0 ? peek.get(0).length() : 0; - out.putInt(value.getCapacity()); - out.putInt(value.size()); - out.putInt(keyLength); - dds.serialize(counters, out); - Iterator<Counter<ByteArray>> iterator = value.iterator(); - while (iterator.hasNext()) { - out.put(iterator.next().getItem().array()); - } - } - - @Override - public TopNCounter<ByteArray> deserialize(ByteBuffer in) { - int capacity = in.getInt(); - int size = in.getInt(); - int keyLength = in.getInt(); - double[] counters = dds.deserialize(in); - - TopNCounter<ByteArray> counter = new TopNCounter<ByteArray>(capacity); - ByteArray byteArray; - for (int i = 0; i < size; i++) { - byteArray = new ByteArray(keyLength); - in.get(byteArray.array()); - counter.offerToHead(byteArray, counters[i]); - } - - return counter; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataType.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataType.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataType.java index 590ef3d..1333426 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataType.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataType.java @@ -28,7 +28,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.regex.Matcher; import java.util.regex.Pattern; -import org.apache.kylin.metadata.measure.serializer.DataTypeSerializer; +import org.apache.kylin.aggregation.DataTypeSerializer; /** */ http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-metadata/src/test/java/org/apache/kylin/metadata/measure/fixedlen/FixedPointLongCodecTest.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/test/java/org/apache/kylin/metadata/measure/fixedlen/FixedPointLongCodecTest.java b/core-metadata/src/test/java/org/apache/kylin/metadata/measure/fixedlen/FixedPointLongCodecTest.java deleted file mode 100644 index 6ba7115..0000000 --- a/core-metadata/src/test/java/org/apache/kylin/metadata/measure/fixedlen/FixedPointLongCodecTest.java +++ /dev/null @@ -1,44 +0,0 @@ -package org.apache.kylin.metadata.measure.fixedlen; - -import org.apache.kylin.metadata.model.DataType; -import org.junit.Test; - -/** - */ -public class FixedPointLongCodecTest { - - @Test - public void testEncode1() { - FixedPointLongCodec codec = new FixedPointLongCodec(DataType.getInstance("decimal(18,5)")); - long x = codec.getValueIgnoringDecimalPoint("12.12345"); - org.junit.Assert.assertEquals(1212345, x); - } - - @Test - public void testEncode2() { - FixedPointLongCodec codec = new FixedPointLongCodec(DataType.getInstance("decimal(18,5)")); - long x = codec.getValueIgnoringDecimalPoint("12.1234"); - org.junit.Assert.assertEquals(1212340, x); - } - - @Test - public void testEncode3() { - FixedPointLongCodec codec = new FixedPointLongCodec(DataType.getInstance("decimal(18,5)")); - long x = codec.getValueIgnoringDecimalPoint("12.123456"); - org.junit.Assert.assertEquals(1212345, x); - } - - @Test - public void testEncode4() { - FixedPointLongCodec codec = new FixedPointLongCodec(DataType.getInstance("decimal(18,5)")); - long x = codec.getValueIgnoringDecimalPoint("12"); - org.junit.Assert.assertEquals(1200000, x); - } - - @Test - public void testDecode1() { - FixedPointLongCodec codec = new FixedPointLongCodec(DataType.getInstance("decimal(18,5)")); - String x = codec.restoreDecimalPoint(1212345); - org.junit.Assert.assertEquals("12.12345", x); - } -} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-metadata/src/test/java/org/apache/kylin/metadata/measure/serializer/BigDecimalSerializerTest.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/test/java/org/apache/kylin/metadata/measure/serializer/BigDecimalSerializerTest.java b/core-metadata/src/test/java/org/apache/kylin/metadata/measure/serializer/BigDecimalSerializerTest.java deleted file mode 100644 index 682bc24..0000000 --- a/core-metadata/src/test/java/org/apache/kylin/metadata/measure/serializer/BigDecimalSerializerTest.java +++ /dev/null @@ -1,52 +0,0 @@ -package org.apache.kylin.metadata.measure.serializer; - -import static org.junit.Assert.assertEquals; - -import java.math.BigDecimal; -import java.nio.ByteBuffer; - -import org.apache.kylin.metadata.model.DataType; -import org.junit.BeforeClass; -import org.junit.Test; - -/** - */ -public class BigDecimalSerializerTest { - - private static BigDecimalSerializer bigDecimalSerializer; - - @BeforeClass - public static void beforeClass() { - bigDecimalSerializer = new BigDecimalSerializer(DataType.getInstance("decimal")); - } - - @Test - public void testNormal() { - BigDecimal input = new BigDecimal("1234.1234"); - ByteBuffer buffer = ByteBuffer.allocate(256); - buffer.mark(); - bigDecimalSerializer.serialize(input, buffer); - buffer.reset(); - BigDecimal output = bigDecimalSerializer.deserialize(buffer); - assertEquals(input, output); - } - - @Test - public void testScaleOutOfRange() { - BigDecimal input = new BigDecimal("1234.1234567890"); - ByteBuffer buffer = ByteBuffer.allocate(256); - buffer.mark(); - bigDecimalSerializer.serialize(input, buffer); - buffer.reset(); - BigDecimal output = bigDecimalSerializer.deserialize(buffer); - assertEquals(input.setScale(bigDecimalSerializer.type.getScale(), BigDecimal.ROUND_HALF_EVEN), output); - } - - @Test(expected = IllegalArgumentException.class) - public void testOutOfPrecision() { - BigDecimal input = new BigDecimal("66855344214907231736.4924"); - ByteBuffer buffer = ByteBuffer.allocate(256); - bigDecimalSerializer.serialize(input, buffer); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-metadata/src/test/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializerTest.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/test/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializerTest.java b/core-metadata/src/test/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializerTest.java deleted file mode 100644 index 3c88446..0000000 --- a/core-metadata/src/test/java/org/apache/kylin/metadata/measure/serializer/TopNCounterSerializerTest.java +++ /dev/null @@ -1,61 +0,0 @@ -package org.apache.kylin.metadata.measure.serializer; - -import org.apache.kylin.common.topn.TopNCounter; -import org.apache.kylin.common.util.ByteArray; -import org.apache.kylin.common.util.Bytes; -import org.apache.kylin.common.util.BytesUtil; -import org.apache.kylin.metadata.model.DataType; -import org.junit.Assert; -import org.junit.Test; - -import java.nio.ByteBuffer; - -/** - * - */ -public class TopNCounterSerializerTest { - - private static TopNCounterSerializer serializer = new TopNCounterSerializer(DataType.getInstance("topn(10)")); - - @SuppressWarnings("unchecked") - @Test - public void testSerialization() { - TopNCounter<ByteArray> vs = new TopNCounter<ByteArray>(50); - Integer[] stream = { 1, 1, 2, 9, 1, 2, 3, 7, 7, 1, 3, 1, 1 }; - for (Integer i : stream) { - vs.offer(new ByteArray(Bytes.toBytes(i))); - } - - ByteBuffer out = ByteBuffer.allocate(1024); - serializer.serialize(vs, out); - - byte[] copyBytes = new byte[out.position()]; - System.arraycopy(out.array(), 0, copyBytes, 0, out.position()); - - ByteBuffer in = ByteBuffer.wrap(copyBytes); - TopNCounter<ByteArray> vsNew = serializer.deserialize(in); - - Assert.assertEquals(vs.toString(), vsNew.toString()); - - } - - @Test - public void testValueOf() { - - TopNCounter<ByteArray> origin = new TopNCounter<ByteArray>(10); - ByteArray key = new ByteArray(1); - ByteBuffer byteBuffer = key.asBuffer(); - BytesUtil.writeVLong(20l, byteBuffer); - origin.offer(key, 1.0); - - byteBuffer = ByteBuffer.allocate(1024); - byteBuffer.putInt(1); - byteBuffer.putInt(20); - byteBuffer.putDouble(1.0); - TopNCounter<ByteArray> counter = serializer.valueOf(byteBuffer.array()); - - - Assert.assertEquals(origin.toString(), counter.toString()); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java b/core-storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java index 11b03bd..da009df 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java @@ -27,8 +27,8 @@ import net.sf.ehcache.pool.sizeof.annotations.IgnoreSizeOf; import org.apache.kylin.common.hll.HyperLogLogPlusCounter; import org.apache.kylin.common.topn.TopNCounter; import org.apache.kylin.common.util.DateFormat; -import org.apache.kylin.metadata.measure.DoubleMutable; -import org.apache.kylin.metadata.measure.LongMutable; +import org.apache.kylin.common.util.DoubleMutable; +import org.apache.kylin.common.util.LongMutable; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.metadata.tuple.ITuple; http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java index 557e325..3fa7d5c 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java @@ -7,7 +7,9 @@ import java.util.List; import java.util.Map; import com.google.common.collect.Maps; + import org.apache.hadoop.io.Text; +import org.apache.kylin.aggregation.MeasureCodec; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.BytesSplitter; @@ -24,7 +26,6 @@ import org.apache.kylin.dict.Dictionary; import org.apache.kylin.engine.mr.KylinMapper; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import org.apache.kylin.engine.mr.common.BatchConstants; -import org.apache.kylin.metadata.measure.MeasureCodec; import org.apache.kylin.metadata.model.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java index f7b9e9a..0f94dca 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java @@ -23,6 +23,8 @@ import java.nio.ByteBuffer; import java.util.List; import org.apache.hadoop.io.Text; +import org.apache.kylin.aggregation.MeasureAggregators; +import org.apache.kylin.aggregation.MeasureCodec; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.kv.RowConstants; @@ -30,8 +32,6 @@ import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.engine.mr.KylinReducer; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import org.apache.kylin.engine.mr.common.BatchConstants; -import org.apache.kylin.metadata.measure.MeasureAggregators; -import org.apache.kylin.metadata.measure.MeasureCodec; import org.apache.kylin.metadata.model.MeasureDesc; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java index db254f6..ab87b21 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java @@ -3,6 +3,8 @@ package org.apache.kylin.engine.mr.steps; import java.io.IOException; import java.util.List; +import org.apache.kylin.aggregation.MeasureAggregators; +import org.apache.kylin.aggregation.MeasureCodec; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; @@ -14,8 +16,6 @@ import org.apache.kylin.engine.mr.KylinReducer; import org.apache.kylin.engine.mr.MRUtil; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import org.apache.kylin.engine.mr.common.BatchConstants; -import org.apache.kylin.metadata.measure.MeasureAggregators; -import org.apache.kylin.metadata.measure.MeasureCodec; import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.slf4j.Logger; http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java index fc616fa..8d00084 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java @@ -23,6 +23,7 @@ import java.nio.ByteBuffer; import java.util.HashMap; import java.util.List; +import org.apache.kylin.aggregation.MeasureCodec; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.topn.Counter; import org.apache.kylin.common.topn.TopNCounter; @@ -47,7 +48,6 @@ import org.apache.kylin.engine.mr.KylinMapper; import org.apache.kylin.engine.mr.MRUtil; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import org.apache.kylin.engine.mr.common.BatchConstants; -import org.apache.kylin.metadata.measure.MeasureCodec; import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.model.TblColRef; http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java index 6c2679e..d08d2a4 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java @@ -27,6 +27,7 @@ import java.util.regex.Pattern; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.kylin.aggregation.MeasureCodec; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.topn.Counter; import org.apache.kylin.common.topn.TopNCounter; @@ -47,7 +48,6 @@ import org.apache.kylin.dict.DictionaryManager; import org.apache.kylin.engine.mr.KylinMapper; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import org.apache.kylin.engine.mr.common.BatchConstants; -import org.apache.kylin.metadata.measure.MeasureCodec; import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.model.TblColRef; http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java index fac3ff6..5b2d20e 100644 --- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java +++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java @@ -31,13 +31,13 @@ import org.apache.commons.io.FileUtils; import org.apache.hadoop.io.Text; import org.apache.hadoop.mrunit.mapreduce.ReduceDriver; import org.apache.hadoop.mrunit.types.Pair; +import org.apache.kylin.aggregation.MeasureCodec; import org.apache.kylin.common.util.LocalFileMetadataTestCase; +import org.apache.kylin.common.util.LongMutable; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.kv.RowConstants; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.engine.mr.common.BatchConstants; -import org.apache.kylin.metadata.measure.LongMutable; -import org.apache.kylin.metadata.measure.MeasureCodec; import org.junit.After; import org.junit.Before; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java ---------------------------------------------------------------------- diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java index 01d97fd..3d2badd 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java @@ -39,6 +39,8 @@ import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat; import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.util.ToolRunner; +import org.apache.kylin.aggregation.MeasureAggregators; +import org.apache.kylin.aggregation.MeasureCodec; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.hll.HyperLogLogPlusCounter; import org.apache.kylin.common.util.ByteArray; @@ -56,15 +58,11 @@ import org.apache.kylin.cube.model.*; import org.apache.kylin.cube.util.CubingUtils; import org.apache.kylin.dict.*; import org.apache.kylin.dict.Dictionary; -import org.apache.kylin.dict.DictionaryGenerator; -import org.apache.kylin.dict.IterableDictionaryValueEnumerator; import org.apache.kylin.engine.spark.cube.BufferedCuboidWriter; import org.apache.kylin.engine.spark.cube.DefaultTupleConverter; import org.apache.kylin.engine.spark.util.IteratorUtils; import org.apache.kylin.common.util.AbstractApplication; import org.apache.kylin.common.util.OptionsHelper; -import org.apache.kylin.metadata.measure.MeasureAggregators; -import org.apache.kylin.metadata.measure.MeasureCodec; import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.model.TblColRef; @@ -84,9 +82,11 @@ import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.Row; import org.apache.spark.sql.hive.HiveContext; import org.reflections.Reflections; + import scala.Tuple2; import javax.annotation.Nullable; + import java.io.File; import java.io.FileFilter; import java.io.IOException; http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/RawTableRecord.java ---------------------------------------------------------------------- diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/RawTableRecord.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/RawTableRecord.java index c126fdd..11b1897 100644 --- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/RawTableRecord.java +++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/RawTableRecord.java @@ -23,9 +23,9 @@ import java.util.Arrays; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.BytesUtil; +import org.apache.kylin.common.util.LongMutable; import org.apache.kylin.dict.Dictionary; -import org.apache.kylin.metadata.measure.LongMutable; -import org.apache.kylin.metadata.measure.fixedlen.FixedLenMeasureCodec; +import org.apache.kylin.invertedindex.measure.FixedLenMeasureCodec; /** */ http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecord.java ---------------------------------------------------------------------- diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecord.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecord.java index 2521fbf..71d7bae 100644 --- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecord.java +++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecord.java @@ -23,9 +23,9 @@ import java.util.Arrays; import org.apache.commons.lang.ObjectUtils; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.kylin.common.util.DateFormat; +import org.apache.kylin.common.util.LongMutable; import org.apache.kylin.common.util.ShardingHash; import org.apache.kylin.dict.Dictionary; -import org.apache.kylin.metadata.measure.LongMutable; /** * TableRecord extends RawTableRecord by decorating it with a TableRecordInfo. http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java ---------------------------------------------------------------------- diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java index 9627187..27519bc 100644 --- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java +++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java @@ -23,8 +23,8 @@ import java.util.List; import org.apache.kylin.common.util.Array; import org.apache.kylin.dict.Dictionary; import org.apache.kylin.invertedindex.IISegment; +import org.apache.kylin.invertedindex.measure.FixedLenMeasureCodec; import org.apache.kylin.invertedindex.model.IIDesc; -import org.apache.kylin.metadata.measure.fixedlen.FixedLenMeasureCodec; import org.apache.kylin.metadata.model.DataType; import org.apache.kylin.metadata.model.TblColRef; http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfoDigest.java ---------------------------------------------------------------------- diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfoDigest.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfoDigest.java index 2b2c514..bd27e38 100644 --- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfoDigest.java +++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfoDigest.java @@ -23,8 +23,8 @@ import java.util.Arrays; import org.apache.kylin.common.util.BytesSerializer; import org.apache.kylin.common.util.BytesUtil; -import org.apache.kylin.metadata.measure.LongMutable; -import org.apache.kylin.metadata.measure.fixedlen.FixedLenMeasureCodec; +import org.apache.kylin.common.util.LongMutable; +import org.apache.kylin.invertedindex.measure.FixedLenMeasureCodec; import org.apache.kylin.metadata.model.DataType; import com.google.common.base.Objects; http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/invertedindex/src/main/java/org/apache/kylin/invertedindex/measure/FixedHLLCodec.java ---------------------------------------------------------------------- diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/measure/FixedHLLCodec.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/measure/FixedHLLCodec.java new file mode 100644 index 0000000..de35f91 --- /dev/null +++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/measure/FixedHLLCodec.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +package org.apache.kylin.invertedindex.measure; + +import java.nio.ByteBuffer; + +import org.apache.kylin.common.hll.HyperLogLogPlusCounter; +import org.apache.kylin.metadata.model.DataType; + +/** + */ +public class FixedHLLCodec extends FixedLenMeasureCodec<HyperLogLogPlusCounter> { + + private DataType type; + private int presision; + private HyperLogLogPlusCounter current; + + public FixedHLLCodec(DataType type) { + this.type = type; + this.presision = type.getPrecision(); + this.current = new HyperLogLogPlusCounter(this.presision); + } + + @Override + public int getLength() { + return 1 << presision; + } + + @Override + public DataType getDataType() { + return type; + } + + @Override + public HyperLogLogPlusCounter valueOf(String value) { + current.clear(); + if (value == null) + current.add("__nUlL__"); + else + current.add(value.getBytes()); + return current; + } + + @Override + public Object getValue() { + return current; + } + + @Override + public HyperLogLogPlusCounter read(byte[] buf, int offset) { + current.readRegistersArray(ByteBuffer.wrap(buf, offset, buf.length - offset)); + return current; + } + + @Override + public void write(HyperLogLogPlusCounter v, byte[] buf, int offset) { + v.writeRegistersArray(ByteBuffer.wrap(buf, offset, buf.length - offset)); + } + + @Override + public HyperLogLogPlusCounter read(ByteBuffer buffer) { + current.readRegistersArray(buffer); + return current; + } +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/invertedindex/src/main/java/org/apache/kylin/invertedindex/measure/FixedLenMeasureCodec.java ---------------------------------------------------------------------- diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/measure/FixedLenMeasureCodec.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/measure/FixedLenMeasureCodec.java new file mode 100644 index 0000000..35872be --- /dev/null +++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/measure/FixedLenMeasureCodec.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.invertedindex.measure; + +import org.apache.kylin.metadata.model.DataType; + +import java.nio.ByteBuffer; + +abstract public class FixedLenMeasureCodec<T> { + + public static FixedLenMeasureCodec<?> get(DataType type) { + if (type.isHLLC()) { + return new FixedHLLCodec(type); + } else { + return new FixedPointLongCodec(type); + } + } + + abstract public int getLength(); + + abstract public DataType getDataType(); + + abstract public T valueOf(String value); + + abstract public Object getValue(); + + abstract public T read(byte[] buf, int offset); + + abstract public void write(T v, byte[] buf, int offset); + + abstract public T read(ByteBuffer buffer); + +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/invertedindex/src/main/java/org/apache/kylin/invertedindex/measure/FixedPointLongCodec.java ---------------------------------------------------------------------- diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/measure/FixedPointLongCodec.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/measure/FixedPointLongCodec.java new file mode 100644 index 0000000..a5d74f7 --- /dev/null +++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/measure/FixedPointLongCodec.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.invertedindex.measure; + +import java.nio.ByteBuffer; + +import org.apache.kylin.common.util.BytesUtil; +import org.apache.kylin.common.util.LongMutable; +import org.apache.kylin.metadata.model.DataType; + +public class FixedPointLongCodec extends FixedLenMeasureCodec<LongMutable> { + + private static final int SIZE = 8; + // number of digits after decimal point + int scale; + DataType type; + // avoid massive object creation + LongMutable current = new LongMutable(); + + public FixedPointLongCodec(DataType type) { + this.type = type; + this.scale = Math.max(0, type.getScale()); + } + + @Override + public int getLength() { + return SIZE; + } + + @Override + public DataType getDataType() { + return type; + } + + long getValueIgnoringDecimalPoint(String value) { + int index = value.indexOf('.'); + + if (index == 0 || index == value.length() - 1) { + throw new RuntimeException("Bad decimal format: " + value); + } else if (index < 0) { + return Long.valueOf(value) * (int) Math.pow(10, scale); + } else { + StringBuilder sb = new StringBuilder(); + sb.append(value.substring(0, index)); + + //if there are more than scale digits after the decimal point, the tail will be discarded + int end = Math.min(value.length(), index + scale + 1); + sb.append(value.substring(index + 1, end)); + int diff = index + scale + 1 - value.length(); + //if there are less than scale digits after the decimal point, the tail will be compensated + for (int i = 0; i < diff; i++) { + sb.append('0'); + } + return Long.valueOf(sb.toString()); + } + } + + String restoreDecimalPoint(long value) { + if (scale < 0) { + throw new RuntimeException("Bad scale: " + scale + " with value: " + value); + } else if (scale == 0) { + return Long.toString(value); + } else { + return String.format("%." + scale + "f", value / (Math.pow(10, scale))); + } + } + + @Override + public LongMutable valueOf(String value) { + if (value == null) + current.set(0L); + else + current.set(getValueIgnoringDecimalPoint(value)); + return current; + } + + @Override + public String getValue() { + if (scale == 0) + return current.toString(); + else + return restoreDecimalPoint(current.get()); + } + + @Override + public LongMutable read(byte[] buf, int offset) { + current.set(BytesUtil.readLong(buf, offset, SIZE)); + return current; + } + + @Override + public void write(LongMutable v, byte[] buf, int offset) { + BytesUtil.writeLong(v == null ? 0 : v.get(), buf, offset, SIZE); + } + + @Override + public LongMutable read(ByteBuffer buffer) { + current.set(BytesUtil.readLong(buffer, SIZE)); + return current; + } +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java ---------------------------------------------------------------------- diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java index e17133f..4dd1723 100644 --- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java +++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java @@ -32,7 +32,7 @@ import org.apache.kylin.invertedindex.index.ColumnValueContainer; import org.apache.kylin.invertedindex.index.CompressedValueContainer; import org.apache.kylin.invertedindex.index.Slice; import org.apache.kylin.invertedindex.index.TableRecordInfoDigest; -import org.apache.kylin.metadata.measure.fixedlen.FixedLenMeasureCodec; +import org.apache.kylin.invertedindex.measure.FixedLenMeasureCodec; import org.apache.kylin.metadata.model.DataType; import com.google.common.base.Preconditions; http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/invertedindex/src/test/java/org/apache/kylin/invertedindex/IIDescManagerTest.java ---------------------------------------------------------------------- diff --git a/invertedindex/src/test/java/org/apache/kylin/invertedindex/IIDescManagerTest.java b/invertedindex/src/test/java/org/apache/kylin/invertedindex/IIDescManagerTest.java new file mode 100644 index 0000000..8ae3dfd --- /dev/null +++ b/invertedindex/src/test/java/org/apache/kylin/invertedindex/IIDescManagerTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.invertedindex; + +import java.io.IOException; +import java.util.UUID; + +import org.apache.kylin.common.util.LocalFileMetadataTestCase; +import org.apache.kylin.invertedindex.IIDescManager; +import org.apache.kylin.invertedindex.model.IIDesc; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + */ +public class IIDescManagerTest extends LocalFileMetadataTestCase { + + public static final String TEST_II_DESC_NAME = "test_kylin_ii_left_join_desc"; + + @Before + public void setup() { + createTestMetadata(); + } + + @After + public void clean() { + this.cleanupTestMetadata(); + } + + @Test + public void testCRUD() throws IOException { + IIDescManager mgr = IIDescManager.getInstance(getTestConfig()); + + String newDescName = "Copy of " + TEST_II_DESC_NAME; + + try { + IIDesc testRecord = mgr.getIIDesc(newDescName); + if (testRecord != null) + mgr.removeIIDesc(testRecord); + } catch (IOException e) { + // just ensure the old one is removed + } + + Assert.assertNull(mgr.getIIDesc(newDescName)); + IIDesc desc = mgr.getIIDesc(TEST_II_DESC_NAME); + + desc.setName(newDescName); + desc.setLastModified(0); + + mgr.createIIDesc(desc); + + desc = mgr.getIIDesc(newDescName); + + Assert.assertNotNull(desc); + + mgr.updateIIDesc(desc); // this will trigger cache wipe; please ignore the HTTP error in logs. + + mgr.removeIIDesc(desc); + + Assert.assertNull(mgr.getIIDesc(newDescName)); + + } + + @Test + public void testReload() throws IOException { + IIDescManager mgr = IIDescManager.getInstance(getTestConfig()); + + IIDesc desc = mgr.getIIDesc(TEST_II_DESC_NAME); + + // do some modification + desc.setUuid(UUID.randomUUID().toString()); + + IIDesc newDesc = mgr.getIIDesc(TEST_II_DESC_NAME); + + Assert.assertEquals(desc, newDesc); + + // reload the cache + mgr.reloadIIDescLocal(TEST_II_DESC_NAME); + + newDesc = mgr.getIIDesc(TEST_II_DESC_NAME); + + Assert.assertNotEquals(desc, newDesc); + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/invertedindex/src/test/java/org/apache/kylin/invertedindex/IIDescTest.java ---------------------------------------------------------------------- diff --git a/invertedindex/src/test/java/org/apache/kylin/invertedindex/IIDescTest.java b/invertedindex/src/test/java/org/apache/kylin/invertedindex/IIDescTest.java new file mode 100644 index 0000000..aac915d --- /dev/null +++ b/invertedindex/src/test/java/org/apache/kylin/invertedindex/IIDescTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.invertedindex; + +import java.io.IOException; + +import org.apache.kylin.common.util.JsonUtil; +import org.apache.kylin.common.util.LocalFileMetadataTestCase; +import org.apache.kylin.invertedindex.IIDescManager; +import org.apache.kylin.invertedindex.model.IIDesc; +import org.apache.kylin.metadata.model.DataModelDesc; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + */ +public class IIDescTest extends LocalFileMetadataTestCase { + + @Before + public void setup() { + this.createTestMetadata(); + + } + + @After + public void clear() { + this.cleanupTestMetadata(); + } + + @Test + public void testGetIIDesc() { + + IIDesc iiDesc = IIDescManager.getInstance(getTestConfig()).getIIDesc("test_kylin_ii_left_join_desc"); + DataModelDesc model = iiDesc.getModel(); + Assert.assertNotNull(iiDesc); + Assert.assertNotNull(model); + + } + + @Test + public void testSerialization() throws IOException { + IIDesc iiDesc = IIDescManager.getInstance(getTestConfig()).getIIDesc("test_kylin_ii_left_join_desc"); + String str = JsonUtil.writeValueAsIndentString(iiDesc); + System.out.println(str); + IIDesc desc2 = JsonUtil.readValue(str, IIDesc.class); + + Assert.assertEquals(iiDesc, desc2); + } +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/invertedindex/src/test/java/org/apache/kylin/invertedindex/IIInstanceTest.java ---------------------------------------------------------------------- diff --git a/invertedindex/src/test/java/org/apache/kylin/invertedindex/IIInstanceTest.java b/invertedindex/src/test/java/org/apache/kylin/invertedindex/IIInstanceTest.java new file mode 100644 index 0000000..f30a876 --- /dev/null +++ b/invertedindex/src/test/java/org/apache/kylin/invertedindex/IIInstanceTest.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.invertedindex; + +import java.io.IOException; +import java.util.List; + +import org.apache.kylin.common.util.LocalFileMetadataTestCase; +import org.apache.kylin.dict.Dictionary; +import org.apache.kylin.invertedindex.IIDescManager; +import org.apache.kylin.invertedindex.IIInstance; +import org.apache.kylin.invertedindex.IIManager; +import org.apache.kylin.invertedindex.model.IIDesc; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + */ +public class IIInstanceTest extends LocalFileMetadataTestCase { + @Before + public void setup() { + createTestMetadata(); + } + + @After + public void clean() { + cleanupTestMetadata(); + } + + @Test + public void testGetIIsByDesc() throws IOException { + IIManager mgr = IIManager.getInstance(getTestConfig()); + + List<IIInstance> iiInstances = mgr.getIIsByDesc("test_kylin_ii_left_join_desc"); + + Assert.assertTrue(iiInstances.size() > 0); + + } + + @Test + public void testCreateIIInstance() throws IOException { + + IIDesc iiDesc = IIDescManager.getInstance(getTestConfig()).getIIDesc("test_kylin_ii_left_join_desc"); + + IIInstance ii = IIInstance.create("new ii", "default", iiDesc); + + IIManager iiMgr = IIManager.getInstance(getTestConfig()); + + List<IIInstance> allIIList = iiMgr.listAllIIs(); + + iiMgr.createII(ii); + + Assert.assertNotNull(iiMgr.getII("new ii")); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/invertedindex/src/test/java/org/apache/kylin/invertedindex/InvertedIndexLocalTest.java ---------------------------------------------------------------------- diff --git a/invertedindex/src/test/java/org/apache/kylin/invertedindex/InvertedIndexLocalTest.java b/invertedindex/src/test/java/org/apache/kylin/invertedindex/InvertedIndexLocalTest.java new file mode 100644 index 0000000..d72899b --- /dev/null +++ b/invertedindex/src/test/java/org/apache/kylin/invertedindex/InvertedIndexLocalTest.java @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.invertedindex; + +import static org.junit.Assert.assertEquals; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import javax.annotation.Nullable; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.kylin.common.util.BytesUtil; +import org.apache.kylin.common.util.LocalFileMetadataTestCase; +import org.apache.kylin.dict.Dictionary; +import org.apache.kylin.dict.DictionaryGenerator; +import org.apache.kylin.dict.IterableDictionaryValueEnumerator; +import org.apache.kylin.invertedindex.IIInstance; +import org.apache.kylin.invertedindex.IIManager; +import org.apache.kylin.invertedindex.index.CompressedValueContainer; +import org.apache.kylin.invertedindex.index.RawTableRecord; +import org.apache.kylin.invertedindex.index.ShardingSliceBuilder; +import org.apache.kylin.invertedindex.index.Slice; +import org.apache.kylin.invertedindex.index.TableRecord; +import org.apache.kylin.invertedindex.index.TableRecordInfo; +import org.apache.kylin.invertedindex.model.IIDesc; +import org.apache.kylin.invertedindex.model.IIKeyValueCodec; +import org.apache.kylin.invertedindex.model.IIRow; +import org.apache.kylin.metadata.model.TblColRef; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.base.Function; +import com.google.common.collect.Collections2; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Lists; +import com.google.common.collect.SetMultimap; +import com.google.common.collect.Sets; + +public class InvertedIndexLocalTest extends LocalFileMetadataTestCase { + + IIInstance ii; + TableRecordInfo info; + List<String> lines; + private Dictionary<?>[] dictionaryMap; + + @Before + public void setUp() throws Exception { + this.createTestMetadata(); + this.ii = IIManager.getInstance(getTestConfig()).getII("test_kylin_ii_left_join"); + + File file = new File(LOCALMETA_TEST_DATA, "data/flatten_data_for_ii.csv"); + FileInputStream in = new FileInputStream(file); + this.lines = IOUtils.readLines(in, "UTF-8"); + in.close(); + + dictionaryMap = buildDictionary(Lists.transform(lines, new Function<String, List<String>>() { + @Nullable + @Override + public List<String> apply(@Nullable String input) { + return Lists.newArrayList(input.split(",")); + } + }), ii.getDescriptor()); + this.info = new TableRecordInfo(ii.getDescriptor(), dictionaryMap); + } + + @After + public void after() throws Exception { + this.cleanupTestMetadata(); + } + + @Test + public void testCompressedValueContainer() { + // create container + CompressedValueContainer container = new CompressedValueContainer(info.getDigest(), 0, 500); + Dictionary<String> dict = info.dict(0); + + byte[] buf = new byte[dict.getSizeOfId()]; + ImmutableBytesWritable bytes = new ImmutableBytesWritable(buf); + + for (int v = dict.getMinId(); v <= dict.getMaxId(); v++) { + BytesUtil.writeUnsigned(v, buf, 0, dict.getSizeOfId()); + container.append(bytes); + } + BytesUtil.writeUnsigned(Dictionary.NULL_ID[dict.getSizeOfId()], buf, 0, dict.getSizeOfId()); + container.append(bytes); + container.closeForChange(); + + // copy by serialization + ImmutableBytesWritable copy = container.toBytes(); + CompressedValueContainer container2 = new CompressedValueContainer(info.getDigest(), 0, 500); + container2.fromBytes(copy); + + // check the copy + int i = 0; + for (int v = dict.getMinId(); v <= dict.getMaxId(); v++) { + container2.getValueAt(i++, bytes); + int value = BytesUtil.readUnsigned(bytes.get(), bytes.getOffset(), bytes.getLength()); + assertEquals(v, value); + } + container2.getValueAt(i++, bytes); + int value = BytesUtil.readUnsigned(bytes.get(), bytes.getOffset(), bytes.getLength()); + assertEquals(Dictionary.NULL_ID[dict.getSizeOfId()], value); + assertEquals(container, container2); + } + + @Test + public void testCodec() throws IOException { + List<TableRecord> records = loadRecordsSorted(); + System.out.println(records.size() + " records"); + List<Slice> slices = buildTimeSlices(records); + System.out.println(slices.size() + " slices"); + + IIKeyValueCodec codec = new IIKeyValueCodec(info.getDigest()); + List<IIRow> kvs = encodeKVs(codec, slices); + System.out.println(kvs.size() + " KV pairs"); + + List<Slice> slicesCopy = decodeKVs(codec, kvs); + assertEquals(slices.size(), slicesCopy.size()); + for (int i = 0; i < slices.size(); i++) { + assertEquals(slices.get(i), slicesCopy.get(i)); + } + + List<TableRecord> recordsCopy = iterateRecords(slicesCopy); + assertEquals(new HashSet<TableRecord>(records), new HashSet<TableRecord>(recordsCopy)); + dump(recordsCopy); + } + + private Dictionary<?>[] buildDictionary(List<List<String>> table, IIDesc desc) throws IOException{ + SetMultimap<TblColRef, String> valueMap = HashMultimap.create(); + Set<TblColRef> dimensionColumns = Sets.newHashSet(); + for (int i = 0; i < desc.listAllColumns().size(); i++) { + if (!desc.isMetricsCol(i)) { + dimensionColumns.add(desc.listAllColumns().get(i)); + } + } + for (List<String> row : table) { + for (int i = 0; i < row.size(); i++) { + String cell = row.get(i); + valueMap.put(desc.listAllColumns().get(i), cell); + } + } + Dictionary<?>[] result = new Dictionary<?>[desc.listAllColumns().size()]; + for (TblColRef tblColRef : valueMap.keys()) { + result[desc.findColumn(tblColRef)] = DictionaryGenerator.buildDictionaryFromValueEnumerator(tblColRef.getType(), + new IterableDictionaryValueEnumerator(Collections2.transform(valueMap.get(tblColRef), new Function<String, byte[]>() { + @Nullable + @Override + public byte[] apply(String input) { + return input.getBytes(); + } + }))); + } + return result; + } + + private List<TableRecord> loadRecordsSorted() throws IOException { + List<TableRecord> records = Lists.newArrayList(); + for (String line : lines) { + String[] fields = line.split(","); + TableRecord rec = info.createTableRecord(); + for (int col = 0; col < fields.length; col++) { + rec.setValueString(col, fields[col]); + } + records.add(rec); + } + + Collections.sort(records, new Comparator<TableRecord>() { + @Override + public int compare(TableRecord a, TableRecord b) { + long x = a.getTimestamp() - b.getTimestamp(); + if (x > 0) + return 1; + else if (x == 0) + return 0; + else + return -1; + } + }); + + return records; + } + + private List<Slice> buildTimeSlices(List<TableRecord> records) throws IOException { + ShardingSliceBuilder builder = new ShardingSliceBuilder(info); + List<Slice> slices = Lists.newArrayList(); + for (TableRecord rec : records) { + //here assume there less records than slice size for each shard + Slice slice = builder.append(rec); + if (slice != null) { + slice.setLocalDictionaries(dictionaryMap); + slices.add(slice); + } + } + List<Slice> finals = builder.close(); + for (Slice slice : finals) { + slice.setLocalDictionaries(dictionaryMap); + } + slices.addAll(finals); + + Collections.sort(slices); + return slices; + } + + private List<IIRow> encodeKVs(IIKeyValueCodec codec, List<Slice> slices) { + + List<IIRow> kvs = Lists.newArrayList(); + for (Slice slice : slices) { + kvs.addAll(codec.encodeKeyValue(slice)); + } + return kvs; + } + + private List<Slice> decodeKVs(IIKeyValueCodec codec, List<IIRow> kvs) { + List<Slice> slices = Lists.newArrayList(); + for (Slice slice : codec.decodeKeyValue(kvs)) { + slices.add(slice); + } + return slices; + } + + private List<TableRecord> iterateRecords(List<Slice> slices) { + List<TableRecord> records = Lists.newArrayList(); + for (Slice slice : slices) { + for (RawTableRecord rec : slice) { + records.add(new TableRecord((RawTableRecord) rec.clone(), info)); + } + } + return records; + } + + private void dump(Iterable<TableRecord> records) { + for (TableRecord rec : records) { + System.out.println(rec.toString()); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/invertedindex/src/test/java/org/apache/kylin/invertedindex/LZFTest.java ---------------------------------------------------------------------- diff --git a/invertedindex/src/test/java/org/apache/kylin/invertedindex/LZFTest.java b/invertedindex/src/test/java/org/apache/kylin/invertedindex/LZFTest.java new file mode 100644 index 0000000..943e76c --- /dev/null +++ b/invertedindex/src/test/java/org/apache/kylin/invertedindex/LZFTest.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.invertedindex; + +import java.io.IOException; + +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.junit.Test; + +import com.ning.compress.lzf.LZFDecoder; +import com.ning.compress.lzf.LZFEncoder; + +/** + */ +public class LZFTest { + @Test + public void test() throws IOException { + + byte[] raw = new byte[] { 1, 2, 3, 3, 2, 23 }; + byte[] data = LZFEncoder.encode(raw); + + byte[] data2 = new byte[data.length * 2]; + java.lang.System.arraycopy(data, 0, data2, 0, data.length); + ImmutableBytesWritable bytes = new ImmutableBytesWritable(); + bytes.set(data2, 0, data.length); + + try { + byte[] uncompressed = LZFDecoder.decode(bytes.get(), bytes.getOffset(), bytes.getLength()); + } catch (IOException e) { + throw new RuntimeException("LZF decode failure", e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIDescManagerTest.java ---------------------------------------------------------------------- diff --git a/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIDescManagerTest.java b/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIDescManagerTest.java deleted file mode 100644 index e715b4f..0000000 --- a/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIDescManagerTest.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.invertedindex.invertedindex; - -import java.io.IOException; -import java.util.UUID; - -import org.apache.kylin.common.util.LocalFileMetadataTestCase; -import org.apache.kylin.invertedindex.IIDescManager; -import org.apache.kylin.invertedindex.model.IIDesc; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -/** - */ -public class IIDescManagerTest extends LocalFileMetadataTestCase { - - public static final String TEST_II_DESC_NAME = "test_kylin_ii_left_join_desc"; - - @Before - public void setup() { - createTestMetadata(); - } - - @After - public void clean() { - this.cleanupTestMetadata(); - } - - @Test - public void testCRUD() throws IOException { - IIDescManager mgr = IIDescManager.getInstance(getTestConfig()); - - String newDescName = "Copy of " + TEST_II_DESC_NAME; - - try { - IIDesc testRecord = mgr.getIIDesc(newDescName); - if (testRecord != null) - mgr.removeIIDesc(testRecord); - } catch (IOException e) { - // just ensure the old one is removed - } - - Assert.assertNull(mgr.getIIDesc(newDescName)); - IIDesc desc = mgr.getIIDesc(TEST_II_DESC_NAME); - - desc.setName(newDescName); - desc.setLastModified(0); - - mgr.createIIDesc(desc); - - desc = mgr.getIIDesc(newDescName); - - Assert.assertNotNull(desc); - - mgr.updateIIDesc(desc); // this will trigger cache wipe; please ignore the HTTP error in logs. - - mgr.removeIIDesc(desc); - - Assert.assertNull(mgr.getIIDesc(newDescName)); - - } - - @Test - public void testReload() throws IOException { - IIDescManager mgr = IIDescManager.getInstance(getTestConfig()); - - IIDesc desc = mgr.getIIDesc(TEST_II_DESC_NAME); - - // do some modification - desc.setUuid(UUID.randomUUID().toString()); - - IIDesc newDesc = mgr.getIIDesc(TEST_II_DESC_NAME); - - Assert.assertEquals(desc, newDesc); - - // reload the cache - mgr.reloadIIDescLocal(TEST_II_DESC_NAME); - - newDesc = mgr.getIIDesc(TEST_II_DESC_NAME); - - Assert.assertNotEquals(desc, newDesc); - - } - -} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIDescTest.java ---------------------------------------------------------------------- diff --git a/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIDescTest.java b/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIDescTest.java deleted file mode 100644 index 007d36f..0000000 --- a/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIDescTest.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.invertedindex.invertedindex; - -import java.io.IOException; - -import org.apache.kylin.common.util.JsonUtil; -import org.apache.kylin.common.util.LocalFileMetadataTestCase; -import org.apache.kylin.invertedindex.IIDescManager; -import org.apache.kylin.invertedindex.model.IIDesc; -import org.apache.kylin.metadata.model.DataModelDesc; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -/** - */ -public class IIDescTest extends LocalFileMetadataTestCase { - - @Before - public void setup() { - this.createTestMetadata(); - - } - - @After - public void clear() { - this.cleanupTestMetadata(); - } - - @Test - public void testGetIIDesc() { - - IIDesc iiDesc = IIDescManager.getInstance(getTestConfig()).getIIDesc("test_kylin_ii_left_join_desc"); - DataModelDesc model = iiDesc.getModel(); - Assert.assertNotNull(iiDesc); - Assert.assertNotNull(model); - - } - - @Test - public void testSerialization() throws IOException { - IIDesc iiDesc = IIDescManager.getInstance(getTestConfig()).getIIDesc("test_kylin_ii_left_join_desc"); - String str = JsonUtil.writeValueAsIndentString(iiDesc); - System.out.println(str); - IIDesc desc2 = JsonUtil.readValue(str, IIDesc.class); - - Assert.assertEquals(iiDesc, desc2); - } -} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIInstanceTest.java ---------------------------------------------------------------------- diff --git a/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIInstanceTest.java b/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIInstanceTest.java deleted file mode 100644 index 8a0c2ba..0000000 --- a/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIInstanceTest.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.invertedindex.invertedindex; - -import java.io.IOException; -import java.util.List; - -import org.apache.kylin.common.util.LocalFileMetadataTestCase; -import org.apache.kylin.dict.Dictionary; -import org.apache.kylin.invertedindex.IIDescManager; -import org.apache.kylin.invertedindex.IIInstance; -import org.apache.kylin.invertedindex.IIManager; -import org.apache.kylin.invertedindex.model.IIDesc; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -/** - */ -public class IIInstanceTest extends LocalFileMetadataTestCase { - @Before - public void setup() { - createTestMetadata(); - } - - @After - public void clean() { - cleanupTestMetadata(); - } - - @Test - public void testGetIIsByDesc() throws IOException { - IIManager mgr = IIManager.getInstance(getTestConfig()); - - List<IIInstance> iiInstances = mgr.getIIsByDesc("test_kylin_ii_left_join_desc"); - - Assert.assertTrue(iiInstances.size() > 0); - - } - - @Test - public void testCreateIIInstance() throws IOException { - - IIDesc iiDesc = IIDescManager.getInstance(getTestConfig()).getIIDesc("test_kylin_ii_left_join_desc"); - - IIInstance ii = IIInstance.create("new ii", "default", iiDesc); - - IIManager iiMgr = IIManager.getInstance(getTestConfig()); - - List<IIInstance> allIIList = iiMgr.listAllIIs(); - - iiMgr.createII(ii); - - Assert.assertNotNull(iiMgr.getII("new ii")); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/InvertedIndexLocalTest.java ---------------------------------------------------------------------- diff --git a/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/InvertedIndexLocalTest.java b/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/InvertedIndexLocalTest.java deleted file mode 100644 index 6e05759..0000000 --- a/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/InvertedIndexLocalTest.java +++ /dev/null @@ -1,262 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.invertedindex.invertedindex; - -import static org.junit.Assert.assertEquals; - -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -import javax.annotation.Nullable; - -import org.apache.commons.io.IOUtils; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.kylin.common.util.BytesUtil; -import org.apache.kylin.common.util.LocalFileMetadataTestCase; -import org.apache.kylin.dict.Dictionary; -import org.apache.kylin.dict.DictionaryGenerator; -import org.apache.kylin.dict.IterableDictionaryValueEnumerator; -import org.apache.kylin.invertedindex.IIInstance; -import org.apache.kylin.invertedindex.IIManager; -import org.apache.kylin.invertedindex.index.CompressedValueContainer; -import org.apache.kylin.invertedindex.index.RawTableRecord; -import org.apache.kylin.invertedindex.index.ShardingSliceBuilder; -import org.apache.kylin.invertedindex.index.Slice; -import org.apache.kylin.invertedindex.index.TableRecord; -import org.apache.kylin.invertedindex.index.TableRecordInfo; -import org.apache.kylin.invertedindex.model.IIDesc; -import org.apache.kylin.invertedindex.model.IIKeyValueCodec; -import org.apache.kylin.invertedindex.model.IIRow; -import org.apache.kylin.metadata.model.TblColRef; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import com.google.common.base.Function; -import com.google.common.collect.Collections2; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.Lists; -import com.google.common.collect.SetMultimap; -import com.google.common.collect.Sets; - -public class InvertedIndexLocalTest extends LocalFileMetadataTestCase { - - IIInstance ii; - TableRecordInfo info; - List<String> lines; - private Dictionary<?>[] dictionaryMap; - - @Before - public void setUp() throws Exception { - this.createTestMetadata(); - this.ii = IIManager.getInstance(getTestConfig()).getII("test_kylin_ii_left_join"); - - File file = new File(LOCALMETA_TEST_DATA, "data/flatten_data_for_ii.csv"); - FileInputStream in = new FileInputStream(file); - this.lines = IOUtils.readLines(in, "UTF-8"); - in.close(); - - dictionaryMap = buildDictionary(Lists.transform(lines, new Function<String, List<String>>() { - @Nullable - @Override - public List<String> apply(@Nullable String input) { - return Lists.newArrayList(input.split(",")); - } - }), ii.getDescriptor()); - this.info = new TableRecordInfo(ii.getDescriptor(), dictionaryMap); - } - - @After - public void after() throws Exception { - this.cleanupTestMetadata(); - } - - @Test - public void testCompressedValueContainer() { - // create container - CompressedValueContainer container = new CompressedValueContainer(info.getDigest(), 0, 500); - Dictionary<String> dict = info.dict(0); - - byte[] buf = new byte[dict.getSizeOfId()]; - ImmutableBytesWritable bytes = new ImmutableBytesWritable(buf); - - for (int v = dict.getMinId(); v <= dict.getMaxId(); v++) { - BytesUtil.writeUnsigned(v, buf, 0, dict.getSizeOfId()); - container.append(bytes); - } - BytesUtil.writeUnsigned(Dictionary.NULL_ID[dict.getSizeOfId()], buf, 0, dict.getSizeOfId()); - container.append(bytes); - container.closeForChange(); - - // copy by serialization - ImmutableBytesWritable copy = container.toBytes(); - CompressedValueContainer container2 = new CompressedValueContainer(info.getDigest(), 0, 500); - container2.fromBytes(copy); - - // check the copy - int i = 0; - for (int v = dict.getMinId(); v <= dict.getMaxId(); v++) { - container2.getValueAt(i++, bytes); - int value = BytesUtil.readUnsigned(bytes.get(), bytes.getOffset(), bytes.getLength()); - assertEquals(v, value); - } - container2.getValueAt(i++, bytes); - int value = BytesUtil.readUnsigned(bytes.get(), bytes.getOffset(), bytes.getLength()); - assertEquals(Dictionary.NULL_ID[dict.getSizeOfId()], value); - assertEquals(container, container2); - } - - @Test - public void testCodec() throws IOException { - List<TableRecord> records = loadRecordsSorted(); - System.out.println(records.size() + " records"); - List<Slice> slices = buildTimeSlices(records); - System.out.println(slices.size() + " slices"); - - IIKeyValueCodec codec = new IIKeyValueCodec(info.getDigest()); - List<IIRow> kvs = encodeKVs(codec, slices); - System.out.println(kvs.size() + " KV pairs"); - - List<Slice> slicesCopy = decodeKVs(codec, kvs); - assertEquals(slices.size(), slicesCopy.size()); - for (int i = 0; i < slices.size(); i++) { - assertEquals(slices.get(i), slicesCopy.get(i)); - } - - List<TableRecord> recordsCopy = iterateRecords(slicesCopy); - assertEquals(new HashSet<TableRecord>(records), new HashSet<TableRecord>(recordsCopy)); - dump(recordsCopy); - } - - private Dictionary<?>[] buildDictionary(List<List<String>> table, IIDesc desc) throws IOException{ - SetMultimap<TblColRef, String> valueMap = HashMultimap.create(); - Set<TblColRef> dimensionColumns = Sets.newHashSet(); - for (int i = 0; i < desc.listAllColumns().size(); i++) { - if (!desc.isMetricsCol(i)) { - dimensionColumns.add(desc.listAllColumns().get(i)); - } - } - for (List<String> row : table) { - for (int i = 0; i < row.size(); i++) { - String cell = row.get(i); - valueMap.put(desc.listAllColumns().get(i), cell); - } - } - Dictionary<?>[] result = new Dictionary<?>[desc.listAllColumns().size()]; - for (TblColRef tblColRef : valueMap.keys()) { - result[desc.findColumn(tblColRef)] = DictionaryGenerator.buildDictionaryFromValueEnumerator(tblColRef.getType(), - new IterableDictionaryValueEnumerator(Collections2.transform(valueMap.get(tblColRef), new Function<String, byte[]>() { - @Nullable - @Override - public byte[] apply(String input) { - return input.getBytes(); - } - }))); - } - return result; - } - - private List<TableRecord> loadRecordsSorted() throws IOException { - List<TableRecord> records = Lists.newArrayList(); - for (String line : lines) { - String[] fields = line.split(","); - TableRecord rec = info.createTableRecord(); - for (int col = 0; col < fields.length; col++) { - rec.setValueString(col, fields[col]); - } - records.add(rec); - } - - Collections.sort(records, new Comparator<TableRecord>() { - @Override - public int compare(TableRecord a, TableRecord b) { - long x = a.getTimestamp() - b.getTimestamp(); - if (x > 0) - return 1; - else if (x == 0) - return 0; - else - return -1; - } - }); - - return records; - } - - private List<Slice> buildTimeSlices(List<TableRecord> records) throws IOException { - ShardingSliceBuilder builder = new ShardingSliceBuilder(info); - List<Slice> slices = Lists.newArrayList(); - for (TableRecord rec : records) { - //here assume there less records than slice size for each shard - Slice slice = builder.append(rec); - if (slice != null) { - slice.setLocalDictionaries(dictionaryMap); - slices.add(slice); - } - } - List<Slice> finals = builder.close(); - for (Slice slice : finals) { - slice.setLocalDictionaries(dictionaryMap); - } - slices.addAll(finals); - - Collections.sort(slices); - return slices; - } - - private List<IIRow> encodeKVs(IIKeyValueCodec codec, List<Slice> slices) { - - List<IIRow> kvs = Lists.newArrayList(); - for (Slice slice : slices) { - kvs.addAll(codec.encodeKeyValue(slice)); - } - return kvs; - } - - private List<Slice> decodeKVs(IIKeyValueCodec codec, List<IIRow> kvs) { - List<Slice> slices = Lists.newArrayList(); - for (Slice slice : codec.decodeKeyValue(kvs)) { - slices.add(slice); - } - return slices; - } - - private List<TableRecord> iterateRecords(List<Slice> slices) { - List<TableRecord> records = Lists.newArrayList(); - for (Slice slice : slices) { - for (RawTableRecord rec : slice) { - records.add(new TableRecord((RawTableRecord) rec.clone(), info)); - } - } - return records; - } - - private void dump(Iterable<TableRecord> records) { - for (TableRecord rec : records) { - System.out.println(rec.toString()); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/LZFTest.java ---------------------------------------------------------------------- diff --git a/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/LZFTest.java b/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/LZFTest.java deleted file mode 100644 index d39c421..0000000 --- a/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/LZFTest.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.invertedindex.invertedindex; - -import java.io.IOException; - -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.junit.Test; - -import com.ning.compress.lzf.LZFDecoder; -import com.ning.compress.lzf.LZFEncoder; - -/** - */ -public class LZFTest { - @Test - public void test() throws IOException { - - byte[] raw = new byte[] { 1, 2, 3, 3, 2, 23 }; - byte[] data = LZFEncoder.encode(raw); - - byte[] data2 = new byte[data.length * 2]; - java.lang.System.arraycopy(data, 0, data2, 0, data.length); - ImmutableBytesWritable bytes = new ImmutableBytesWritable(); - bytes.set(data2, 0, data.length); - - try { - byte[] uncompressed = LZFDecoder.decode(bytes.get(), bytes.getOffset(), bytes.getLength()); - } catch (IOException e) { - throw new RuntimeException("LZF decode failure", e); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/invertedindex/src/test/java/org/apache/kylin/invertedindex/measure/FixedPointLongCodecTest.java ---------------------------------------------------------------------- diff --git a/invertedindex/src/test/java/org/apache/kylin/invertedindex/measure/FixedPointLongCodecTest.java b/invertedindex/src/test/java/org/apache/kylin/invertedindex/measure/FixedPointLongCodecTest.java new file mode 100644 index 0000000..27a4f71 --- /dev/null +++ b/invertedindex/src/test/java/org/apache/kylin/invertedindex/measure/FixedPointLongCodecTest.java @@ -0,0 +1,45 @@ +package org.apache.kylin.invertedindex.measure; + +import org.apache.kylin.invertedindex.measure.FixedPointLongCodec; +import org.apache.kylin.metadata.model.DataType; +import org.junit.Test; + +/** + */ +public class FixedPointLongCodecTest { + + @Test + public void testEncode1() { + FixedPointLongCodec codec = new FixedPointLongCodec(DataType.getInstance("decimal(18,5)")); + long x = codec.getValueIgnoringDecimalPoint("12.12345"); + org.junit.Assert.assertEquals(1212345, x); + } + + @Test + public void testEncode2() { + FixedPointLongCodec codec = new FixedPointLongCodec(DataType.getInstance("decimal(18,5)")); + long x = codec.getValueIgnoringDecimalPoint("12.1234"); + org.junit.Assert.assertEquals(1212340, x); + } + + @Test + public void testEncode3() { + FixedPointLongCodec codec = new FixedPointLongCodec(DataType.getInstance("decimal(18,5)")); + long x = codec.getValueIgnoringDecimalPoint("12.123456"); + org.junit.Assert.assertEquals(1212345, x); + } + + @Test + public void testEncode4() { + FixedPointLongCodec codec = new FixedPointLongCodec(DataType.getInstance("decimal(18,5)")); + long x = codec.getValueIgnoringDecimalPoint("12"); + org.junit.Assert.assertEquals(1200000, x); + } + + @Test + public void testDecode1() { + FixedPointLongCodec codec = new FixedPointLongCodec(DataType.getInstance("decimal(18,5)")); + String x = codec.restoreDecimalPoint(1212345); + org.junit.Assert.assertEquals("12.12345", x); + } +}