http://git-wip-us.apache.org/repos/asf/carbondata/blob/fecafde8/core/src/main/java/org/apache/carbondata/core/datastore/page/compression/Compression.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/compression/Compression.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/compression/Compression.java deleted file mode 100644 index c954a33..0000000 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/compression/Compression.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.core.datastore.page.compression; - -public interface Compression { - byte[] compress(byte[] input); - byte[] decompress(byte[] input); -}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fecafde8/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveCompressionCodec.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveCompressionCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveCompressionCodec.java index 2e8eff2..6b3a365 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveCompressionCodec.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveCompressionCodec.java @@ -21,7 +21,7 @@ import java.io.IOException; import org.apache.carbondata.core.datastore.compression.Compressor; import org.apache.carbondata.core.datastore.page.ColumnPage; -import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsVO; +import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult; import org.apache.carbondata.core.memory.MemoryException; import org.apache.carbondata.core.metadata.datatype.DataType; @@ -37,7 +37,7 @@ public abstract class AdaptiveCompressionCodec implements ColumnPageCodec { protected final Compressor compressor; // statistics of this page, can be used by subclass - protected final ColumnPageStatsVO stats; + protected final SimpleStatsResult stats; // the data type used for storage protected final DataType targetDataType; @@ -46,7 +46,7 @@ public abstract class AdaptiveCompressionCodec implements ColumnPageCodec { protected final DataType srcDataType; protected AdaptiveCompressionCodec(DataType srcDataType, DataType targetDataType, - ColumnPageStatsVO stats, Compressor compressor) { + SimpleStatsResult stats, Compressor compressor) { this.stats = stats; this.srcDataType = srcDataType; this.targetDataType = targetDataType; @@ -55,7 +55,7 @@ public abstract class AdaptiveCompressionCodec implements ColumnPageCodec { 
public abstract String getName(); - public abstract byte[] encode(ColumnPage input) throws MemoryException, IOException; + public abstract EncodedColumnPage encode(ColumnPage input) throws MemoryException, IOException; public abstract ColumnPage decode(byte[] input, int offset, int length) throws MemoryException; http://git-wip-us.apache.org/repos/asf/carbondata/blob/fecafde8/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveIntegerCodec.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveIntegerCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveIntegerCodec.java deleted file mode 100644 index 3d56f0c..0000000 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveIntegerCodec.java +++ /dev/null @@ -1,219 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.carbondata.core.datastore.page.encoding; - -import java.io.IOException; - -import org.apache.carbondata.core.datastore.compression.Compressor; -import org.apache.carbondata.core.datastore.page.ColumnPage; -import org.apache.carbondata.core.datastore.page.LazyColumnPage; -import org.apache.carbondata.core.datastore.page.PrimitiveCodec; -import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsVO; -import org.apache.carbondata.core.memory.MemoryException; -import org.apache.carbondata.core.metadata.datatype.DataType; - -/** - * Codec for integer (byte, short, int, long) data type page. - * This codec will do type casting on page data to make storage minimum. - */ -class AdaptiveIntegerCodec extends AdaptiveCompressionCodec { - - private ColumnPage encodedPage; - - public static ColumnPageCodec newInstance(DataType srcDataType, DataType targetDataType, - ColumnPageStatsVO stats, Compressor compressor) { - return new AdaptiveIntegerCodec(srcDataType, targetDataType, stats, compressor); - } - - private AdaptiveIntegerCodec(DataType srcDataType, DataType targetDataType, - ColumnPageStatsVO stats, Compressor compressor) { - super(srcDataType, targetDataType, stats, compressor); - } - - @Override - public String getName() { - return "AdaptiveIntegerCodec"; - } - - @Override - public byte[] encode(ColumnPage input) throws MemoryException, IOException { - encodedPage = ColumnPage.newPage(targetDataType, input.getPageSize()); - input.encode(codec); - byte[] result = encodedPage.compress(compressor); - encodedPage.freeMemory(); - return result; - } - - @Override - public ColumnPage decode(byte[] input, int offset, int length) throws MemoryException { - if (srcDataType.equals(targetDataType)) { - return ColumnPage.decompress(compressor, targetDataType, input, offset, length); - } else { - ColumnPage page = ColumnPage.decompress(compressor, targetDataType, input, offset, length); - return LazyColumnPage.newPage(page, codec); - } - } - 
- // encoded value = (type cast page value to target data type) - private PrimitiveCodec codec = new PrimitiveCodec() { - @Override - public void encode(int rowId, byte value) { - switch (targetDataType) { - default: - throw new RuntimeException("internal error: " + debugInfo()); - } - } - - @Override - public void encode(int rowId, short value) { - switch (targetDataType) { - case BYTE: - encodedPage.putByte(rowId, (byte) value); - break; - default: - throw new RuntimeException("internal error: " + debugInfo()); - } - } - - @Override - public void encode(int rowId, int value) { - switch (targetDataType) { - case BYTE: - encodedPage.putByte(rowId, (byte) value); - break; - case SHORT: - encodedPage.putShort(rowId, (short) value); - break; - case SHORT_INT: - encodedPage.putShortInt(rowId, value); - break; - default: - throw new RuntimeException("internal error: " + debugInfo()); - } - } - - @Override - public void encode(int rowId, long value) { - switch (targetDataType) { - case BYTE: - encodedPage.putByte(rowId, (byte) value); - break; - case SHORT: - encodedPage.putShort(rowId, (short) value); - break; - case SHORT_INT: - encodedPage.putShortInt(rowId, (int) value); - break; - case INT: - encodedPage.putInt(rowId, (int) value); - break; - default: - throw new RuntimeException("internal error: " + debugInfo()); - } - } - - @Override - public void encode(int rowId, float value) { - switch (targetDataType) { - case BYTE: - encodedPage.putByte(rowId, (byte) value); - break; - case SHORT: - encodedPage.putShort(rowId, (short) value); - break; - case SHORT_INT: - encodedPage.putShortInt(rowId, (int) value); - break; - case INT: - encodedPage.putInt(rowId, (int) value); - break; - default: - throw new RuntimeException("internal error: " + debugInfo()); - } - } - - @Override - public void encode(int rowId, double value) { - switch (targetDataType) { - case BYTE: - encodedPage.putByte(rowId, (byte) value); - break; - case SHORT: - encodedPage.putShort(rowId, (short) 
value); - break; - case SHORT_INT: - encodedPage.putShortInt(rowId, (int) value); - break; - case INT: - encodedPage.putInt(rowId, (int) value); - break; - case LONG: - encodedPage.putLong(rowId, (long) value); - break; - default: - throw new RuntimeException("internal error: " + debugInfo()); - } - } - - @Override - public long decodeLong(byte value) { - return value; - } - - @Override - public long decodeLong(short value) { - return value; - } - - @Override - public long decodeLong(int value) { - return value; - } - - @Override - public double decodeDouble(byte value) { - return value; - } - - @Override - public double decodeDouble(short value) { - return value; - } - - @Override - public double decodeDouble(int value) { - return value; - } - - @Override - public double decodeDouble(long value) { - return value; - } - - @Override - public double decodeDouble(float value) { - throw new RuntimeException("internal error: " + debugInfo()); - } - - @Override - public double decodeDouble(double value) { - throw new RuntimeException("internal error: " + debugInfo()); - } - }; - -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/fecafde8/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveIntegralCodec.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveIntegralCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveIntegralCodec.java new file mode 100644 index 0000000..096c51a --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveIntegralCodec.java @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.datastore.page.encoding; + +import java.io.IOException; + +import org.apache.carbondata.core.datastore.compression.Compressor; +import org.apache.carbondata.core.datastore.page.ColumnPage; +import org.apache.carbondata.core.datastore.page.LazyColumnPage; +import org.apache.carbondata.core.datastore.page.PrimitiveCodec; +import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult; +import org.apache.carbondata.core.memory.MemoryException; +import org.apache.carbondata.core.metadata.CodecMetaFactory; +import org.apache.carbondata.core.metadata.datatype.DataType; + +/** + * Codec for integer (byte, short, int, long) data type page. + * This codec will do type casting on page data to make storage minimum. 
+ */ +class AdaptiveIntegralCodec extends AdaptiveCompressionCodec { + + private ColumnPage encodedPage; + + public static ColumnPageCodec newInstance(DataType srcDataType, DataType targetDataType, + SimpleStatsResult stats, Compressor compressor) { + return new AdaptiveIntegralCodec(srcDataType, targetDataType, stats, compressor); + } + + private AdaptiveIntegralCodec(DataType srcDataType, DataType targetDataType, + SimpleStatsResult stats, Compressor compressor) { + super(srcDataType, targetDataType, stats, compressor); + } + + @Override + public String getName() { + return "AdaptiveIntegralCodec"; + } + + @Override + public EncodedColumnPage encode(ColumnPage input) throws MemoryException, IOException { + encodedPage = ColumnPage.newPage(targetDataType, input.getPageSize()); + input.encode(codec); + byte[] result = encodedPage.compress(compressor); + encodedPage.freeMemory(); + return new EncodedMeasurePage(input.getPageSize(), result, + CodecMetaFactory.createMeta(stats, targetDataType), + ((SimpleStatsResult)input.getStatistics()).getNullBits()); + } + + @Override + public ColumnPage decode(byte[] input, int offset, int length) throws MemoryException { + ColumnPage page = ColumnPage.decompress(compressor, targetDataType, input, offset, length); + return LazyColumnPage.newPage(page, codec); + } + + // encoded value = (type cast page value to target data type) + private PrimitiveCodec codec = new PrimitiveCodec() { + @Override + public void encode(int rowId, byte value) { + switch (targetDataType) { + default: + throw new RuntimeException("internal error: " + debugInfo()); + } + } + + @Override + public void encode(int rowId, short value) { + switch (targetDataType) { + case BYTE: + encodedPage.putByte(rowId, (byte) value); + break; + default: + throw new RuntimeException("internal error: " + debugInfo()); + } + } + + @Override + public void encode(int rowId, int value) { + switch (targetDataType) { + case BYTE: + encodedPage.putByte(rowId, (byte) value); + 
break; + case SHORT: + encodedPage.putShort(rowId, (short) value); + break; + case SHORT_INT: + encodedPage.putShortInt(rowId, value); + break; + default: + throw new RuntimeException("internal error: " + debugInfo()); + } + } + + @Override + public void encode(int rowId, long value) { + switch (targetDataType) { + case BYTE: + encodedPage.putByte(rowId, (byte) value); + break; + case SHORT: + encodedPage.putShort(rowId, (short) value); + break; + case SHORT_INT: + encodedPage.putShortInt(rowId, (int) value); + break; + case INT: + encodedPage.putInt(rowId, (int) value); + break; + default: + throw new RuntimeException("internal error: " + debugInfo()); + } + } + + @Override + public void encode(int rowId, float value) { + switch (targetDataType) { + case BYTE: + encodedPage.putByte(rowId, (byte) value); + break; + case SHORT: + encodedPage.putShort(rowId, (short) value); + break; + case SHORT_INT: + encodedPage.putShortInt(rowId, (int) value); + break; + case INT: + encodedPage.putInt(rowId, (int) value); + break; + default: + throw new RuntimeException("internal error: " + debugInfo()); + } + } + + @Override + public void encode(int rowId, double value) { + switch (targetDataType) { + case BYTE: + encodedPage.putByte(rowId, (byte) value); + break; + case SHORT: + encodedPage.putShort(rowId, (short) value); + break; + case SHORT_INT: + encodedPage.putShortInt(rowId, (int) value); + break; + case INT: + encodedPage.putInt(rowId, (int) value); + break; + case LONG: + encodedPage.putLong(rowId, (long) value); + break; + default: + throw new RuntimeException("internal error: " + debugInfo()); + } + } + + @Override + public long decodeLong(byte value) { + return value; + } + + @Override + public long decodeLong(short value) { + return value; + } + + @Override + public long decodeLong(int value) { + return value; + } + + @Override + public double decodeDouble(byte value) { + return value; + } + + @Override + public double decodeDouble(short value) { + return value; + } 
+ + @Override + public double decodeDouble(int value) { + return value; + } + + @Override + public double decodeDouble(long value) { + return value; + } + + @Override + public double decodeDouble(float value) { + throw new RuntimeException("internal error: " + debugInfo()); + } + + @Override + public double decodeDouble(double value) { + throw new RuntimeException("internal error: " + debugInfo()); + } + }; + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/fecafde8/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageCodec.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageCodec.java index 36d5989..a77bf69 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageCodec.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageCodec.java @@ -34,11 +34,9 @@ public interface ColumnPageCodec { String getName(); /** - * apply a column page and output encoded byte array - * @param input column page to apply - * @return encoded data + * encode a column page and return the encoded data */ - byte[] encode(ColumnPage input) throws MemoryException, IOException; + EncodedColumnPage encode(ColumnPage input) throws MemoryException, IOException; /** * decode byte array from offset to a column page @@ -48,4 +46,5 @@ public interface ColumnPageCodec { * @return decoded data */ ColumnPage decode(byte[] input, int offset, int length) throws MemoryException; + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/fecafde8/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingStrategy.java ---------------------------------------------------------------------- diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingStrategy.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingStrategy.java index 3818263..d2d3a44 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingStrategy.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingStrategy.java @@ -19,7 +19,7 @@ package org.apache.carbondata.core.datastore.page.encoding; import org.apache.carbondata.core.datastore.compression.Compressor; import org.apache.carbondata.core.datastore.compression.CompressorFactory; -import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsVO; +import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult; import org.apache.carbondata.core.metadata.datatype.DataType; /** @@ -32,29 +32,64 @@ public class DefaultEncodingStrategy extends EncodingStrategy { private static final int THREE_BYTES_MAX = (int) Math.pow(2, 23) - 1; private static final int THREE_BYTES_MIN = - THREE_BYTES_MAX - 1; - // fit the long input value into minimum data type - private static DataType fitDataType(long value) { - if (value <= Byte.MAX_VALUE && value >= Byte.MIN_VALUE) { + private DataType fitLongMinMax(long max, long min) { + if (max <= Byte.MAX_VALUE && min >= Byte.MIN_VALUE) { return DataType.BYTE; - } else if (value <= Short.MAX_VALUE && value >= Short.MIN_VALUE) { + } else if (max <= Short.MAX_VALUE && min >= Short.MIN_VALUE) { return DataType.SHORT; - } else if (value <= THREE_BYTES_MAX && value >= THREE_BYTES_MIN) { + } else if (max <= THREE_BYTES_MAX && min >= THREE_BYTES_MIN) { return DataType.SHORT_INT; - } else if (value <= Integer.MAX_VALUE && value >= Integer.MIN_VALUE) { + } else if (max <= Integer.MAX_VALUE && min >= Integer.MIN_VALUE) { return DataType.INT; } else { return DataType.LONG; } } - private DataType fitDataType(long max, long min) { - if (max <= 
Byte.MAX_VALUE && min >= Byte.MIN_VALUE) { + private DataType fitMinMax(DataType dataType, Object max, Object min) { + switch (dataType) { + case BYTE: + return fitLongMinMax((byte) max, (byte) min); + case SHORT: + return fitLongMinMax((short) max, (short) min); + case INT: + return fitLongMinMax((int) max, (int) min); + case LONG: + return fitLongMinMax((long) max, (long) min); + case DOUBLE: + return DataType.DOUBLE; + default: + throw new RuntimeException("internal error: " + dataType); + } + } + + // fit the long input value into minimum data type + private DataType fitDelta(DataType dataType, Object max, Object min) { + // use long data type to calculate delta to avoid overflow + long value; + switch (dataType) { + case BYTE: + value = (long)(byte) max - (long)(byte) min; + break; + case SHORT: + value = (long)(short) max - (long)(short) min; + break; + case INT: + value = (long)(int) max - (long)(int) min; + break; + case LONG: + // TODO: add overflow detection and return delta type + return DataType.LONG; + default: + throw new RuntimeException("internal error: " + dataType); + } + if (value <= Byte.MAX_VALUE && value >= Byte.MIN_VALUE) { return DataType.BYTE; - } else if (max <= Short.MAX_VALUE && min >= Short.MIN_VALUE) { + } else if (value <= Short.MAX_VALUE && value >= Short.MIN_VALUE) { return DataType.SHORT; - } else if (max <= THREE_BYTES_MAX && min >= THREE_BYTES_MIN) { + } else if (value <= THREE_BYTES_MAX && value >= THREE_BYTES_MIN) { return DataType.SHORT_INT; - } else if (max <= Integer.MAX_VALUE && min >= Integer.MIN_VALUE) { + } else if (value <= Integer.MAX_VALUE && value >= Integer.MIN_VALUE) { return DataType.INT; } else { return DataType.LONG; @@ -63,10 +98,9 @@ public class DefaultEncodingStrategy extends EncodingStrategy { // choose between adaptive encoder or delta adaptive encoder, based on whose target data type // size is smaller - @Override - ColumnPageCodec newCodecForIntegerType(ColumnPageStatsVO stats) { + @Override 
ColumnPageCodec newCodecForIntegralType(SimpleStatsResult stats) { DataType srcDataType = stats.getDataType(); - DataType adaptiveDataType = fitDataType((long)stats.getMax(), (long)stats.getMin()); + DataType adaptiveDataType = fitMinMax(stats.getDataType(), stats.getMax(), stats.getMin()); DataType deltaDataType; // TODO: this handling is for data compatibility, change to Override check when implementing @@ -74,36 +108,33 @@ public class DefaultEncodingStrategy extends EncodingStrategy { if (adaptiveDataType == DataType.LONG) { deltaDataType = DataType.LONG; } else { - deltaDataType = fitDataType((long) stats.getMax() - (long) stats.getMin()); + deltaDataType = fitDelta(stats.getDataType(), stats.getMax(), stats.getMin()); } if (Math.min(adaptiveDataType.getSizeInBytes(), deltaDataType.getSizeInBytes()) == srcDataType.getSizeInBytes()) { // no effect to use adaptive or delta, use compression only - return DirectCompressCodec.newInstance(srcDataType, compressor); + return DirectCompressCodec.newInstance(stats, compressor); } if (adaptiveDataType.getSizeInBytes() <= deltaDataType.getSizeInBytes()) { // choose adaptive encoding - return AdaptiveIntegerCodec.newInstance( + return AdaptiveIntegralCodec.newInstance( stats.getDataType(), adaptiveDataType, stats, compressor); } else { // choose delta adaptive encoding - return DeltaIntegerCodec.newInstance(stats.getDataType(), deltaDataType, stats, compressor); + return DeltaIntegralCodec.newInstance(stats.getDataType(), deltaDataType, stats, compressor); } } - @Override - ColumnPageCodec newCodecForFloatingType(ColumnPageStatsVO stats) { - return DirectCompressCodec.newInstance(stats.getDataType(), compressor); + @Override ColumnPageCodec newCodecForFloatingType(SimpleStatsResult stats) { + return DirectCompressCodec.newInstance(stats, compressor); } // for decimal, currently it is a very basic implementation - @Override - ColumnPageCodec newCodecForDecimalType(ColumnPageStatsVO stats) { - return 
DirectCompressCodec.newInstance(stats.getDataType(), compressor); + @Override ColumnPageCodec newCodecForDecimalType(SimpleStatsResult stats) { + return DirectCompressCodec.newInstance(stats, compressor); } - @Override - ColumnPageCodec newCodecForByteArrayType(ColumnPageStatsVO stats) { - return DirectCompressCodec.newInstance(stats.getDataType(), compressor); + @Override ColumnPageCodec newCodecForByteArrayType(SimpleStatsResult stats) { + return DirectCompressCodec.newInstance(stats, compressor); } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/fecafde8/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DeltaIntegerCodec.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DeltaIntegerCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DeltaIntegerCodec.java deleted file mode 100644 index 6cf59a6..0000000 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DeltaIntegerCodec.java +++ /dev/null @@ -1,250 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.carbondata.core.datastore.page.encoding; - -import java.io.IOException; - -import org.apache.carbondata.core.datastore.compression.Compressor; -import org.apache.carbondata.core.datastore.page.ColumnPage; -import org.apache.carbondata.core.datastore.page.LazyColumnPage; -import org.apache.carbondata.core.datastore.page.PrimitiveCodec; -import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsVO; -import org.apache.carbondata.core.memory.MemoryException; -import org.apache.carbondata.core.metadata.datatype.DataType; - -/** - * Codec for integer (byte, short, int, long) data type page. - * This codec will calculate delta of page max value and page value, - * and do type casting of the diff to make storage minimum. - */ -public class DeltaIntegerCodec extends AdaptiveCompressionCodec { - - private ColumnPage encodedPage; - - private long max; - - public static DeltaIntegerCodec newInstance(DataType srcDataType, DataType targetDataType, - ColumnPageStatsVO stats, Compressor compressor) { - return new DeltaIntegerCodec(srcDataType, targetDataType, stats, compressor); - } - - private DeltaIntegerCodec(DataType srcDataType, DataType targetDataType, - ColumnPageStatsVO stats, Compressor compressor) { - super(srcDataType, targetDataType, stats, compressor); - switch (srcDataType) { - case BYTE: - case SHORT: - case INT: - case LONG: - max = (long) stats.getMax(); - break; - case FLOAT: - case DOUBLE: - max = (long)((double) stats.getMax()); - break; - } - } - - @Override - public String getName() { - return "DeltaIntegerCodec"; - } - - @Override - public byte[] encode(ColumnPage input) throws MemoryException, IOException { - encodedPage = ColumnPage.newPage(targetDataType, input.getPageSize()); - input.encode(codec); - byte[] result = encodedPage.compress(compressor); - encodedPage.freeMemory(); - return result; - } - - @Override - public ColumnPage decode(byte[] input, int offset, int length) throws MemoryException { - if 
(srcDataType.equals(targetDataType)) { - return ColumnPage.decompress(compressor, targetDataType, input, offset, length); - } else { - ColumnPage page = ColumnPage.decompress(compressor, targetDataType, input, offset, length); - return LazyColumnPage.newPage(page, codec); - } - } - - // encoded value = (max value of page) - (page value) - private PrimitiveCodec codec = new PrimitiveCodec() { - @Override - public void encode(int rowId, byte value) { - switch (targetDataType) { - case BYTE: - encodedPage.putByte(rowId, (byte)(max - value)); - break; - default: - throw new RuntimeException("internal error: " + debugInfo()); - } - } - - @Override - public void encode(int rowId, short value) { - switch (targetDataType) { - case BYTE: - encodedPage.putByte(rowId, (byte)(max - value)); - break; - case SHORT: - encodedPage.putShort(rowId, (short)(max - value)); - break; - default: - throw new RuntimeException("internal error: " + debugInfo()); - } - } - - @Override - public void encode(int rowId, int value) { - switch (targetDataType) { - case BYTE: - encodedPage.putByte(rowId, (byte)(max - value)); - break; - case SHORT: - encodedPage.putShort(rowId, (short)(max - value)); - break; - case SHORT_INT: - encodedPage.putShortInt(rowId, (int)(max - value)); - break; - case INT: - encodedPage.putInt(rowId, (int)(max - value)); - break; - default: - throw new RuntimeException("internal error: " + debugInfo()); - } - } - - @Override - public void encode(int rowId, long value) { - switch (targetDataType) { - case BYTE: - encodedPage.putByte(rowId, (byte)(max - value)); - break; - case SHORT: - encodedPage.putShort(rowId, (short)(max - value)); - break; - case SHORT_INT: - encodedPage.putShortInt(rowId, (int)(max - value)); - break; - case INT: - encodedPage.putInt(rowId, (int)(max - value)); - break; - case LONG: - encodedPage.putLong(rowId, max - value); - break; - default: - throw new RuntimeException("internal error: " + debugInfo()); - } - } - - @Override - public void 
encode(int rowId, float value) { - switch (targetDataType) { - case BYTE: - encodedPage.putByte(rowId, (byte)(max - value)); - break; - case SHORT: - encodedPage.putShort(rowId, (short)(max - value)); - break; - case SHORT_INT: - encodedPage.putShortInt(rowId, (int)(max - value)); - break; - case INT: - encodedPage.putInt(rowId, (int)(max - value)); - break; - case LONG: - encodedPage.putLong(rowId, (long)(max - value)); - break; - default: - throw new RuntimeException("internal error: " + debugInfo()); - } - } - - @Override - public void encode(int rowId, double value) { - switch (targetDataType) { - case BYTE: - encodedPage.putByte(rowId, (byte)(max - value)); - break; - case SHORT: - encodedPage.putShort(rowId, (short)(max - value)); - break; - case SHORT_INT: - encodedPage.putShortInt(rowId, (int)(max - value)); - break; - case INT: - encodedPage.putInt(rowId, (int)(max - value)); - break; - case LONG: - encodedPage.putLong(rowId, (long)(max - value)); - break; - default: - throw new RuntimeException("internal error: " + debugInfo()); - } - } - - @Override - public long decodeLong(byte value) { - return max - value; - } - - @Override - public long decodeLong(short value) { - return max - value; - } - - @Override - public long decodeLong(int value) { - return max - value; - } - - @Override - public double decodeDouble(byte value) { - return max - value; - } - - @Override - public double decodeDouble(short value) { - return max - value; - } - - @Override - public double decodeDouble(int value) { - return max - value; - } - - @Override - public double decodeDouble(long value) { - return max - value; - } - - @Override - public double decodeDouble(float value) { - // this codec is for integer type only - throw new RuntimeException("internal error: " + debugInfo()); - } - - @Override - public double decodeDouble(double value) { - // this codec is for integer type only - throw new RuntimeException("internal error: " + debugInfo()); - } - }; -} 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fecafde8/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DeltaIntegralCodec.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DeltaIntegralCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DeltaIntegralCodec.java new file mode 100644 index 0000000..ad50bfc --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DeltaIntegralCodec.java @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.core.datastore.page.encoding; + +import java.io.IOException; + +import org.apache.carbondata.core.datastore.compression.Compressor; +import org.apache.carbondata.core.datastore.page.ColumnPage; +import org.apache.carbondata.core.datastore.page.LazyColumnPage; +import org.apache.carbondata.core.datastore.page.PrimitiveCodec; +import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult; +import org.apache.carbondata.core.memory.MemoryException; +import org.apache.carbondata.core.metadata.CodecMetaFactory; +import org.apache.carbondata.core.metadata.datatype.DataType; + +/** + * Codec for integer (byte, short, int, long) data type page. + * This codec will calculate delta of page max value and page value, + * and do type casting of the diff to make storage minimum. + */ +public class DeltaIntegralCodec extends AdaptiveCompressionCodec { + + private ColumnPage encodedPage; + + private long max; + + public static DeltaIntegralCodec newInstance(DataType srcDataType, DataType targetDataType, + SimpleStatsResult stats, Compressor compressor) { + return new DeltaIntegralCodec(srcDataType, targetDataType, stats, compressor); + } + + private DeltaIntegralCodec(DataType srcDataType, DataType targetDataType, + SimpleStatsResult stats, Compressor compressor) { + super(srcDataType, targetDataType, stats, compressor); + switch (srcDataType) { + case BYTE: + max = (byte) stats.getMax(); + break; + case SHORT: + max = (short) stats.getMax(); + break; + case INT: + max = (int) stats.getMax(); + break; + case LONG: + max = (long) stats.getMax(); + break; + case FLOAT: + case DOUBLE: + max = (long)((double) stats.getMax()); + break; + } + } + + @Override + public String getName() { + return "DeltaIntegralCodec"; + } + + @Override + public EncodedColumnPage encode(ColumnPage input) throws MemoryException, IOException { + encodedPage = ColumnPage.newPage(targetDataType, input.getPageSize()); + input.encode(codec); + byte[] result = 
encodedPage.compress(compressor); + encodedPage.freeMemory(); + return new EncodedMeasurePage(input.getPageSize(), + result, + CodecMetaFactory.createMeta(stats, targetDataType), + ((SimpleStatsResult)input.getStatistics()).getNullBits()); + } + + @Override + public ColumnPage decode(byte[] input, int offset, int length) throws MemoryException { + ColumnPage page = ColumnPage.decompress(compressor, targetDataType, input, offset, length); + return LazyColumnPage.newPage(page, codec); + } + + // encoded value = (max value of page) - (page value) + private PrimitiveCodec codec = new PrimitiveCodec() { + @Override + public void encode(int rowId, byte value) { + switch (targetDataType) { + case BYTE: + encodedPage.putByte(rowId, (byte)(max - value)); + break; + default: + throw new RuntimeException("internal error: " + debugInfo()); + } + } + + @Override + public void encode(int rowId, short value) { + switch (targetDataType) { + case BYTE: + encodedPage.putByte(rowId, (byte)(max - value)); + break; + case SHORT: + encodedPage.putShort(rowId, (short)(max - value)); + break; + default: + throw new RuntimeException("internal error: " + debugInfo()); + } + } + + @Override + public void encode(int rowId, int value) { + switch (targetDataType) { + case BYTE: + encodedPage.putByte(rowId, (byte)(max - value)); + break; + case SHORT: + encodedPage.putShort(rowId, (short)(max - value)); + break; + case SHORT_INT: + encodedPage.putShortInt(rowId, (int)(max - value)); + break; + case INT: + encodedPage.putInt(rowId, (int)(max - value)); + break; + default: + throw new RuntimeException("internal error: " + debugInfo()); + } + } + + @Override + public void encode(int rowId, long value) { + switch (targetDataType) { + case BYTE: + encodedPage.putByte(rowId, (byte)(max - value)); + break; + case SHORT: + encodedPage.putShort(rowId, (short)(max - value)); + break; + case SHORT_INT: + encodedPage.putShortInt(rowId, (int)(max - value)); + break; + case INT: + encodedPage.putInt(rowId, 
(int)(max - value)); + break; + case LONG: + encodedPage.putLong(rowId, max - value); + break; + default: + throw new RuntimeException("internal error: " + debugInfo()); + } + } + + @Override + public void encode(int rowId, float value) { + switch (targetDataType) { + case BYTE: + encodedPage.putByte(rowId, (byte)(max - value)); + break; + case SHORT: + encodedPage.putShort(rowId, (short)(max - value)); + break; + case SHORT_INT: + encodedPage.putShortInt(rowId, (int)(max - value)); + break; + case INT: + encodedPage.putInt(rowId, (int)(max - value)); + break; + case LONG: + encodedPage.putLong(rowId, (long)(max - value)); + break; + default: + throw new RuntimeException("internal error: " + debugInfo()); + } + } + + @Override + public void encode(int rowId, double value) { + switch (targetDataType) { + case BYTE: + encodedPage.putByte(rowId, (byte)(max - value)); + break; + case SHORT: + encodedPage.putShort(rowId, (short)(max - value)); + break; + case SHORT_INT: + encodedPage.putShortInt(rowId, (int)(max - value)); + break; + case INT: + encodedPage.putInt(rowId, (int)(max - value)); + break; + case LONG: + encodedPage.putLong(rowId, (long)(max - value)); + break; + default: + throw new RuntimeException("internal error: " + debugInfo()); + } + } + + @Override + public long decodeLong(byte value) { + return max - value; + } + + @Override + public long decodeLong(short value) { + return max - value; + } + + @Override + public long decodeLong(int value) { + return max - value; + } + + @Override + public double decodeDouble(byte value) { + return max - value; + } + + @Override + public double decodeDouble(short value) { + return max - value; + } + + @Override + public double decodeDouble(int value) { + return max - value; + } + + @Override + public double decodeDouble(long value) { + return max - value; + } + + @Override + public double decodeDouble(float value) { + // this codec is for integer type only + throw new RuntimeException("internal error: " + 
debugInfo()); + } + + @Override + public double decodeDouble(double value) { + // this codec is for integer type only + throw new RuntimeException("internal error: " + debugInfo()); + } + }; +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/fecafde8/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DirectCompressCodec.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DirectCompressCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DirectCompressCodec.java index dcb9b7c..5da0197 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DirectCompressCodec.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DirectCompressCodec.java @@ -21,8 +21,11 @@ import java.io.IOException; import org.apache.carbondata.core.datastore.compression.Compressor; import org.apache.carbondata.core.datastore.page.ColumnPage; +import org.apache.carbondata.core.datastore.page.LazyColumnPage; +import org.apache.carbondata.core.datastore.page.PrimitiveCodec; +import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult; import org.apache.carbondata.core.memory.MemoryException; -import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.metadata.CodecMetaFactory; /** * This codec directly apply compression on the input data @@ -30,15 +33,15 @@ import org.apache.carbondata.core.metadata.datatype.DataType; public class DirectCompressCodec implements ColumnPageCodec { private Compressor compressor; - private DataType dataType; + private SimpleStatsResult stats; - private DirectCompressCodec(DataType dataType, Compressor compressor) { + private DirectCompressCodec(SimpleStatsResult stats, Compressor compressor) { this.compressor = compressor; - this.dataType = dataType; + this.stats = stats; } - public static 
DirectCompressCodec newInstance(DataType dataType, Compressor compressor) { - return new DirectCompressCodec(dataType, compressor); + public static DirectCompressCodec newInstance(SimpleStatsResult stats, Compressor compressor) { + return new DirectCompressCodec(stats, compressor); } @Override @@ -47,12 +50,87 @@ public class DirectCompressCodec implements ColumnPageCodec { } @Override - public byte[] encode(ColumnPage input) throws IOException, MemoryException { - return input.compress(compressor); + public EncodedColumnPage encode(ColumnPage input) throws IOException, MemoryException { + byte[] result = input.compress(compressor); + return new EncodedMeasurePage(input.getPageSize(), result, + CodecMetaFactory.createMeta(stats, stats.getDataType()), + ((SimpleStatsResult)input.getStatistics()).getNullBits()); } @Override public ColumnPage decode(byte[] input, int offset, int length) throws MemoryException { - return ColumnPage.decompress(compressor, dataType, input, offset, length); + ColumnPage page = ColumnPage.decompress(compressor, stats.getDataType(), input, offset, length); + return LazyColumnPage.newPage(page, codec); } + + private PrimitiveCodec codec = new PrimitiveCodec() { + @Override + public void encode(int rowId, byte value) { + } + + @Override + public void encode(int rowId, short value) { + } + + @Override + public void encode(int rowId, int value) { + } + + @Override + public void encode(int rowId, long value) { + } + + @Override + public void encode(int rowId, float value) { + } + + @Override + public void encode(int rowId, double value) { + } + + @Override + public long decodeLong(byte value) { + return value; + } + + @Override + public long decodeLong(short value) { + return value; + } + + @Override + public long decodeLong(int value) { + return value; + } + + @Override + public double decodeDouble(byte value) { + return value; + } + + @Override + public double decodeDouble(short value) { + return value; + } + + @Override + public double 
decodeDouble(int value) { + return value; + } + + @Override + public double decodeDouble(long value) { + return value; + } + + @Override + public double decodeDouble(float value) { + return value; + } + + @Override + public double decodeDouble(double value) { + return value; + } + }; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/fecafde8/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodedColumnPage.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodedColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodedColumnPage.java new file mode 100644 index 0000000..1630e06 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodedColumnPage.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.datastore.page.encoding; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.BitSet; + +import org.apache.carbondata.format.DataChunk2; + +/** + * A column page after encoding and compression.
+ */ +public abstract class EncodedColumnPage { + + // number of rows in this page + protected int pageSize; + + // encoded and compressed column page data + protected byte[] encodedData; + + protected BitSet nullBitSet; + + // metadata of this page + protected DataChunk2 dataChunk2; + + EncodedColumnPage(int pageSize, byte[] encodedData) { + this.pageSize = pageSize; + this.encodedData = encodedData; + } + + public abstract DataChunk2 buildDataChunk2() throws IOException; + + /** + * return the encoded and compressed data page + */ + public byte[] getEncodedData() { + return encodedData; + } + + /** + * return the size of the serialized page in bytes + */ + public int getSerializedSize() { + return encodedData.length; + } + + public ByteBuffer serialize() { + return ByteBuffer.wrap(encodedData); + } + + public DataChunk2 getDataChunk2() { + return dataChunk2; + } + + public void setNullBitSet(BitSet nullBitSet) { + this.nullBitSet = nullBitSet; + } + + public BitSet getNullBitSet() { + return nullBitSet; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/fecafde8/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodedData.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodedData.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodedData.java deleted file mode 100644 index 0d1b2e4..0000000 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodedData.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.core.datastore.page.encoding; - -import org.apache.carbondata.core.datastore.columnar.IndexStorage; - -// result result of all columns -public class EncodedData { - // dimension data that include rowid (index) - public IndexStorage[] indexStorages; - - // encoded and compressed dimension data - public byte[][] dimensions; - - // encoded and compressed measure data - public byte[][] measures; -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/fecafde8/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodedDimensionPage.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodedDimensionPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodedDimensionPage.java new file mode 100644 index 0000000..30d58cf --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodedDimensionPage.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.datastore.page.encoding; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastore.DimensionType; +import org.apache.carbondata.core.datastore.columnar.IndexStorage; +import org.apache.carbondata.core.datastore.page.statistics.TablePageStatistics; +import org.apache.carbondata.core.util.CarbonMetadataUtil; +import org.apache.carbondata.format.BlockletMinMaxIndex; +import org.apache.carbondata.format.DataChunk2; +import org.apache.carbondata.format.Encoding; +import org.apache.carbondata.format.SortState; + +/** + * Encoded dimension page that include data and inverted index + */ +public class EncodedDimensionPage extends EncodedColumnPage { + private IndexStorage indexStorage; + private DimensionType dimensionType; + + public EncodedDimensionPage(int pageSize, byte[] encodedData, IndexStorage indexStorage, + DimensionType dimensionType) { + super(pageSize, encodedData); + this.indexStorage = indexStorage; + this.dimensionType = dimensionType; + this.dataChunk2 = buildDataChunk2(); + } + + private int getTotalRowIdPageLengthInBytes() { + return CarbonCommonConstants.INT_SIZE_IN_BYTE + + indexStorage.getRowIdPageLengthInBytes() + indexStorage.getRowIdRlePageLengthInBytes(); + } + + @Override + public int 
getSerializedSize() { + int size = encodedData.length; + if (indexStorage.getRowIdPageLengthInBytes() > 0) { + size += getTotalRowIdPageLengthInBytes(); + } + if (indexStorage.getDataRlePageLengthInBytes() > 0) { + size += indexStorage.getDataRlePageLengthInBytes(); + } + return size; + } + + @Override + public ByteBuffer serialize() { + ByteBuffer buffer = ByteBuffer.allocate(getSerializedSize()); + buffer.put(encodedData); + if (indexStorage.getRowIdPageLengthInBytes() > 0) { + buffer.putInt(indexStorage.getRowIdPageLengthInBytes()); + short[] rowIdPage = (short[])indexStorage.getRowIdPage(); + for (short rowId : rowIdPage) { + buffer.putShort(rowId); + } + if (indexStorage.getRowIdRlePageLengthInBytes() > 0) { + short[] rowIdRlePage = (short[])indexStorage.getRowIdRlePage(); + for (short rowIdRle : rowIdRlePage) { + buffer.putShort(rowIdRle); + } + } + } + if (indexStorage.getDataRlePageLengthInBytes() > 0) { + short[] dataRlePage = (short[])indexStorage.getDataRlePage(); + for (short dataRle : dataRlePage) { + buffer.putShort(dataRle); + } + } + buffer.flip(); + return buffer; + } + + @Override + public DataChunk2 buildDataChunk2() { + DataChunk2 dataChunk = new DataChunk2(); + dataChunk.min_max = new BlockletMinMaxIndex(); + dataChunk.setChunk_meta(CarbonMetadataUtil.getSnappyChunkCompressionMeta()); + dataChunk.setNumberOfRowsInpage(pageSize); + List<Encoding> encodings = new ArrayList<Encoding>(); + dataChunk.setData_page_length(encodedData.length); + if (dimensionType == DimensionType.GLOBAL_DICTIONARY || + dimensionType == DimensionType.DIRECT_DICTIONARY || + dimensionType == DimensionType.COMPLEX) { + encodings.add(Encoding.DICTIONARY); + } + if (dimensionType == DimensionType.DIRECT_DICTIONARY) { + encodings.add(Encoding.DIRECT_DICTIONARY); + } + if (indexStorage.getDataRlePageLengthInBytes() > 0 || + dimensionType == DimensionType.GLOBAL_DICTIONARY) { + dataChunk.setRle_page_length(indexStorage.getDataRlePageLengthInBytes()); + 
encodings.add(Encoding.RLE); + } + SortState sort = (indexStorage.getRowIdPageLengthInBytes() > 0) ? + SortState.SORT_EXPLICIT : SortState.SORT_NATIVE; + dataChunk.setSort_state(sort); + if (indexStorage.getRowIdPageLengthInBytes() > 0) { + dataChunk.setRowid_page_length(getTotalRowIdPageLengthInBytes()); + encodings.add(Encoding.INVERTED_INDEX); + } + if (dimensionType == DimensionType.PLAIN_VALUE) { + dataChunk.min_max.addToMax_values(ByteBuffer.wrap( + TablePageStatistics.updateMinMaxForNoDictionary(indexStorage.getMax()))); + dataChunk.min_max.addToMin_values(ByteBuffer.wrap( + TablePageStatistics.updateMinMaxForNoDictionary(indexStorage.getMin()))); + } else { + dataChunk.min_max.addToMax_values(ByteBuffer.wrap(indexStorage.getMax())); + dataChunk.min_max.addToMin_values(ByteBuffer.wrap(indexStorage.getMin())); + } + dataChunk.setEncoders(encodings); + return dataChunk; + } + + public IndexStorage getIndexStorage() { + return indexStorage; + } + + public DimensionType getDimensionType() { + return dimensionType; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/fecafde8/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodedMeasurePage.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodedMeasurePage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodedMeasurePage.java new file mode 100644 index 0000000..0ef48c6 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodedMeasurePage.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.datastore.page.encoding; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.BitSet; +import java.util.List; + +import org.apache.carbondata.core.datastore.compression.Compressor; +import org.apache.carbondata.core.datastore.compression.CompressorFactory; +import org.apache.carbondata.core.metadata.ColumnPageCodecMeta; +import org.apache.carbondata.core.metadata.ValueEncoderMeta; +import org.apache.carbondata.core.util.CarbonMetadataUtil; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.format.BlockletMinMaxIndex; +import org.apache.carbondata.format.DataChunk2; +import org.apache.carbondata.format.Encoding; +import org.apache.carbondata.format.PresenceMeta; + +/** + * Encoded measure page that include data and statistics + */ +public class EncodedMeasurePage extends EncodedColumnPage { + + private ValueEncoderMeta metaData; + + public EncodedMeasurePage(int pageSize, byte[] encodedData, ValueEncoderMeta metaData, + BitSet nullBitSet) throws IOException { + super(pageSize, encodedData); + this.metaData = metaData; + this.nullBitSet = nullBitSet; + this.dataChunk2 = buildDataChunk2(); + } + + @Override + public DataChunk2 buildDataChunk2() throws IOException { + DataChunk2 dataChunk = new DataChunk2(); + dataChunk.min_max = new BlockletMinMaxIndex(); + 
dataChunk.setChunk_meta(CarbonMetadataUtil.getSnappyChunkCompressionMeta()); + dataChunk.setNumberOfRowsInpage(pageSize); + dataChunk.setData_page_length(encodedData.length); + dataChunk.setRowMajor(false); + // TODO : Change as per this encoders. + List<Encoding> encodings = new ArrayList<Encoding>(); + encodings.add(Encoding.DELTA); + dataChunk.setEncoders(encodings); + PresenceMeta presenceMeta = new PresenceMeta(); + presenceMeta.setPresent_bit_streamIsSet(true); + Compressor compressor = CompressorFactory.getInstance().getCompressor(); + presenceMeta.setPresent_bit_stream(compressor.compressByte(nullBitSet.toByteArray())); + dataChunk.setPresence(presenceMeta); + List<ByteBuffer> encoderMetaList = new ArrayList<ByteBuffer>(); + if (metaData instanceof ColumnPageCodecMeta) { + ColumnPageCodecMeta meta = (ColumnPageCodecMeta) metaData; + encoderMetaList.add(ByteBuffer.wrap(meta.serialize())); + dataChunk.min_max.addToMax_values(ByteBuffer.wrap(meta.getMaxAsBytes())); + dataChunk.min_max.addToMin_values(ByteBuffer.wrap(meta.getMinAsBytes())); + } else { + encoderMetaList.add(ByteBuffer.wrap(CarbonUtil.serializeEncodeMetaUsingByteBuffer(metaData))); + dataChunk.min_max.addToMax_values(ByteBuffer.wrap(CarbonUtil.getMaxValueAsBytes(metaData))); + dataChunk.min_max.addToMin_values(ByteBuffer.wrap(CarbonUtil.getMinValueAsBytes(metaData))); + } + dataChunk.setEncoder_meta(encoderMetaList); + return dataChunk; + } + + public ValueEncoderMeta getMetaData() { + return metaData; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/fecafde8/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategy.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategy.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategy.java index 49fb625..17f7ee3 100644 --- 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategy.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategy.java @@ -17,7 +17,9 @@ package org.apache.carbondata.core.datastore.page.encoding; -import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsVO; +import org.apache.carbondata.core.datastore.page.statistics.PrimitivePageStatsCollector; +import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult; +import org.apache.carbondata.core.metadata.ColumnPageCodecMeta; import org.apache.carbondata.core.metadata.ValueEncoderMeta; /** @@ -28,13 +30,13 @@ public abstract class EncodingStrategy { /** * create codec based on the page data type and statistics */ - public ColumnPageCodec createCodec(ColumnPageStatsVO stats) { + public ColumnPageCodec createCodec(SimpleStatsResult stats) { switch (stats.getDataType()) { case BYTE: case SHORT: case INT: case LONG: - return newCodecForIntegerType(stats); + return newCodecForIntegralType(stats); case FLOAT: case DOUBLE: return newCodecForFloatingType(stats); @@ -52,20 +54,58 @@ public abstract class EncodingStrategy { * create codec based on the page data type and statistics contained by ValueEncoderMeta */ public ColumnPageCodec createCodec(ValueEncoderMeta meta) { - ColumnPageStatsVO stats = ColumnPageStatsVO.copyFrom(meta); - return createCodec(stats); + if (meta instanceof ColumnPageCodecMeta) { + ColumnPageCodecMeta codecMeta = (ColumnPageCodecMeta) meta; + SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(codecMeta); + switch (codecMeta.getSrcDataType()) { + case BYTE: + case SHORT: + case INT: + case LONG: + return newCodecForIntegralType(stats); + case FLOAT: + case DOUBLE: + return newCodecForFloatingType(stats); + case DECIMAL: + return newCodecForDecimalType(stats); + case BYTE_ARRAY: + // no dictionary dimension + return newCodecForByteArrayType(stats); + default: + throw new 
RuntimeException("unsupported data type: " + stats.getDataType()); + } + } else { + SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(meta); + switch (meta.getType()) { + case BYTE: + case SHORT: + case INT: + case LONG: + return newCodecForIntegralType(stats); + case FLOAT: + case DOUBLE: + return newCodecForFloatingType(stats); + case DECIMAL: + return newCodecForDecimalType(stats); + case BYTE_ARRAY: + // no dictionary dimension + return newCodecForByteArrayType(stats); + default: + throw new RuntimeException("unsupported data type: " + stats.getDataType()); + } + } } // for byte, short, int, long - abstract ColumnPageCodec newCodecForIntegerType(ColumnPageStatsVO stats); + abstract ColumnPageCodec newCodecForIntegralType(SimpleStatsResult stats); // for float, double - abstract ColumnPageCodec newCodecForFloatingType(ColumnPageStatsVO stats); + abstract ColumnPageCodec newCodecForFloatingType(SimpleStatsResult stats); // for decimal - abstract ColumnPageCodec newCodecForDecimalType(ColumnPageStatsVO stats); + abstract ColumnPageCodec newCodecForDecimalType(SimpleStatsResult stats); // for byte array - abstract ColumnPageCodec newCodecForByteArrayType(ColumnPageStatsVO stats); + abstract ColumnPageCodec newCodecForByteArrayType(SimpleStatsResult stats); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/fecafde8/core/src/main/java/org/apache/carbondata/core/datastore/page/key/TablePageKey.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/key/TablePageKey.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/key/TablePageKey.java new file mode 100644 index 0000000..a0ae06b --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/key/TablePageKey.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.datastore.page.key; + +import java.nio.ByteBuffer; + +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastore.block.SegmentProperties; +import org.apache.carbondata.core.datastore.row.CarbonRow; +import org.apache.carbondata.core.datastore.row.WriteStepRowUtil; +import org.apache.carbondata.core.keygenerator.KeyGenException; +import org.apache.carbondata.core.keygenerator.KeyGenerator; +import org.apache.carbondata.core.util.NonDictionaryUtil; + +public class TablePageKey { + private int pageSize; + + private byte[][] currentNoDictionaryKey; + + // MDK start key + private byte[] startKey; + + // MDK end key + private byte[] endKey; + + // startkey for no dictionary columns + private byte[][] noDictStartKey; + + // endkey for no dictionary columns + private byte[][] noDictEndKey; + + // startkey for no dictionary columns after packing into one column + private byte[] packedNoDictStartKey; + + // endkey for no dictionary columns after packing into one column + private byte[] packedNoDictEndKey; + + private KeyGenerator mdkGenerator; + private SegmentProperties segmentProperties; + private boolean hasNoDictionary; + + public TablePageKey(int pageSize, KeyGenerator mdkGenerator, SegmentProperties
segmentProperties, + boolean hasNoDictionary) { + this.pageSize = pageSize; + this.mdkGenerator = mdkGenerator; + this.segmentProperties = segmentProperties; + this.hasNoDictionary = hasNoDictionary; + } + + /** update all keys based on the input row */ + public void update(int rowId, CarbonRow row, byte[] mdk) throws KeyGenException { + if (hasNoDictionary) { + currentNoDictionaryKey = WriteStepRowUtil.getNoDictAndComplexDimension(row); + } + if (rowId == 0) { + startKey = mdk; + noDictStartKey = currentNoDictionaryKey; + } + endKey = mdk; + noDictEndKey = currentNoDictionaryKey; + if (rowId == pageSize - 1) { + finalizeKeys(); + } + } + + public Object getKey() { + return this; + } + + /** update all keys if SORT_COLUMNS option is used when creating table */ + private void finalizeKeys() { + // If SORT_COLUMNS is used, may need to update start/end keys since the they may + // contains dictionary columns that are not in SORT_COLUMNS, which need to be removed from + // start/end key + int numberOfDictSortColumns = segmentProperties.getNumberOfDictSortColumns(); + if (numberOfDictSortColumns > 0) { + // if SORT_COLUMNS contain dictionary columns + int[] keySize = segmentProperties.getFixedLengthKeySplitter().getBlockKeySize(); + if (keySize.length > numberOfDictSortColumns) { + // if there are some dictionary columns that are not in SORT_COLUMNS, it will come to here + int newMdkLength = 0; + for (int i = 0; i < numberOfDictSortColumns; i++) { + newMdkLength += keySize[i]; + } + byte[] newStartKeyOfSortKey = new byte[newMdkLength]; + byte[] newEndKeyOfSortKey = new byte[newMdkLength]; + System.arraycopy(startKey, 0, newStartKeyOfSortKey, 0, newMdkLength); + System.arraycopy(endKey, 0, newEndKeyOfSortKey, 0, newMdkLength); + startKey = newStartKeyOfSortKey; + endKey = newEndKeyOfSortKey; + } + } else { + startKey = new byte[0]; + endKey = new byte[0]; + } + + // Do the same update for noDictionary start/end Key + int numberOfNoDictSortColumns = 
segmentProperties.getNumberOfNoDictSortColumns(); + if (numberOfNoDictSortColumns > 0) { + // if sort_columns contain no-dictionary columns + if (noDictStartKey.length > numberOfNoDictSortColumns) { + byte[][] newNoDictionaryStartKey = new byte[numberOfNoDictSortColumns][]; + byte[][] newNoDictionaryEndKey = new byte[numberOfNoDictSortColumns][]; + System.arraycopy( + noDictStartKey, 0, newNoDictionaryStartKey, 0, numberOfNoDictSortColumns); + System.arraycopy( + noDictEndKey, 0, newNoDictionaryEndKey, 0, numberOfNoDictSortColumns); + noDictStartKey = newNoDictionaryStartKey; + noDictEndKey = newNoDictionaryEndKey; + } + packedNoDictStartKey = + NonDictionaryUtil.packByteBufferIntoSingleByteArray(noDictStartKey); + packedNoDictEndKey = + NonDictionaryUtil.packByteBufferIntoSingleByteArray(noDictEndKey); + } else { + noDictStartKey = new byte[0][]; + noDictEndKey = new byte[0][]; + packedNoDictStartKey = new byte[0]; + packedNoDictEndKey = new byte[0]; + } + } + + public byte[] getStartKey() { + return startKey; + } + + public byte[] getEndKey() { + return endKey; + } + + public byte[] getNoDictStartKey() { + return packedNoDictStartKey; + } + + public byte[] getNoDictEndKey() { + return packedNoDictEndKey; + } + + public int getPageSize() { + return pageSize; + } + + public byte[] serializeStartKey() { + byte[] updatedNoDictionaryStartKey = updateNoDictionaryStartAndEndKey(getNoDictStartKey()); + ByteBuffer buffer = ByteBuffer.allocate( + CarbonCommonConstants.INT_SIZE_IN_BYTE + CarbonCommonConstants.INT_SIZE_IN_BYTE + + startKey.length + updatedNoDictionaryStartKey.length); + buffer.putInt(startKey.length); + buffer.putInt(updatedNoDictionaryStartKey.length); + buffer.put(startKey); + buffer.put(updatedNoDictionaryStartKey); + buffer.rewind(); + return buffer.array(); + } + + public byte[] serializeEndKey() { + byte[] updatedNoDictionaryEndKey = updateNoDictionaryStartAndEndKey(getNoDictEndKey()); + ByteBuffer buffer = ByteBuffer.allocate( + 
CarbonCommonConstants.INT_SIZE_IN_BYTE + CarbonCommonConstants.INT_SIZE_IN_BYTE + + endKey.length + updatedNoDictionaryEndKey.length); + buffer.putInt(endKey.length); + buffer.putInt(updatedNoDictionaryEndKey.length); + buffer.put(endKey); + buffer.put(updatedNoDictionaryEndKey); + buffer.rewind(); + return buffer.array(); + } + + /** + * Below method will be used to update the no dictionary start and end key + * + * @param key key to be updated + * @return return no dictionary key + */ + public byte[] updateNoDictionaryStartAndEndKey(byte[] key) { + if (key.length == 0) { + return key; + } + // add key to byte buffer remove the length part of the data + ByteBuffer buffer = ByteBuffer.wrap(key, 2, key.length - 2); + // create a output buffer without length + ByteBuffer output = ByteBuffer.allocate(key.length - 2); + short numberOfByteToStorLength = 2; + // as length part is removed, so each no dictionary value index + // needs to be reshuffled by 2 bytes + int NumberOfNoDictSortColumns = segmentProperties.getNumberOfNoDictSortColumns(); + for (int i = 0; i < NumberOfNoDictSortColumns; i++) { + output.putShort((short) (buffer.getShort() - numberOfByteToStorLength)); + } + // copy the data part + while (buffer.hasRemaining()) { + output.put(buffer.get()); + } + output.rewind(); + return output.array(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/fecafde8/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/ColumnPageStatsCollector.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/ColumnPageStatsCollector.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/ColumnPageStatsCollector.java new file mode 100644 index 0000000..5439a29 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/ColumnPageStatsCollector.java @@ -0,0 +1,33 @@ +/* 
+ * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.datastore.page.statistics; + +public interface ColumnPageStatsCollector { + void updateNull(int rowId); + void update(byte value); + void update(short value); + void update(int value); + void update(long value); + void update(double value); + void update(byte[] value); + + /** + * return the collected statistics + */ + Object getPageStats(); +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/fecafde8/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/ColumnPageStatsVO.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/ColumnPageStatsVO.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/ColumnPageStatsVO.java deleted file mode 100644 index 058699a..0000000 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/ColumnPageStatsVO.java +++ /dev/null @@ -1,222 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.core.datastore.page.statistics; - -import java.math.BigDecimal; -import java.nio.ByteBuffer; - -import org.apache.carbondata.core.metadata.ValueEncoderMeta; -import org.apache.carbondata.core.metadata.datatype.DataType; -import org.apache.carbondata.core.util.DataTypeUtil; - -/** statics for one column page */ -public class ColumnPageStatsVO { - private DataType dataType; - - /** min and max value of the measures */ - private Object min, max; - - /** - * the unique value is the non-exist value in the row, - * and will be used as storage key for null values of measures - */ - private Object nonExistValue; - - /** decimal count of the measures */ - private int decimal; - - public ColumnPageStatsVO(DataType dataType) { - this.dataType = dataType; - switch (dataType) { - case SHORT: - case INT: - case LONG: - max = Long.MIN_VALUE; - min = Long.MAX_VALUE; - nonExistValue = Long.MIN_VALUE; - break; - case DOUBLE: - max = Double.MIN_VALUE; - min = Double.MAX_VALUE; - nonExistValue = Double.MIN_VALUE; - break; - case DECIMAL: - max = new BigDecimal(Double.MIN_VALUE); - min = new BigDecimal(Double.MAX_VALUE); - nonExistValue = new BigDecimal(Double.MIN_VALUE); - break; - } - decimal = 0; - } - - public static ColumnPageStatsVO copyFrom(ValueEncoderMeta 
meta) { - ColumnPageStatsVO instance = new ColumnPageStatsVO(meta.getType()); - instance.min = meta.getMinValue(); - instance.max = meta.getMaxValue(); - instance.decimal = meta.getDecimal(); - instance.nonExistValue = meta.getUniqueValue(); - return instance; - } - - /** - * update the statistics for the input row - */ - public void update(Object value) { - switch (dataType) { - case SHORT: - max = ((long) max > ((Short) value).longValue()) ? max : ((Short) value).longValue(); - min = ((long) min < ((Short) value).longValue()) ? min : ((Short) value).longValue(); - nonExistValue = (long) min - 1; - break; - case INT: - max = ((long) max > ((Integer) value).longValue()) ? max : ((Integer) value).longValue(); - min = ((long) min < ((Integer) value).longValue()) ? min : ((Integer) value).longValue(); - nonExistValue = (long) min - 1; - break; - case LONG: - max = ((long) max > (long) value) ? max : value; - min = ((long) min < (long) value) ? min : value; - nonExistValue = (long) min - 1; - break; - case DOUBLE: - max = ((double) max > (double) value) ? max : value; - min = ((double) min < (double) value) ? min : value; - int num = Math.abs(getDecimalCount((double) value)); - decimal = decimal > num ? decimal : num; - nonExistValue = (double) min - 1; - break; - case DECIMAL: - BigDecimal decimalValue = DataTypeUtil.byteToBigDecimal((byte[]) value); - decimal = decimalValue.scale(); - BigDecimal val = (BigDecimal) min; - nonExistValue = (val.subtract(new BigDecimal(1.0))); - break; - case ARRAY: - case STRUCT: - // for complex type column, writer is not going to use stats, so, do nothing - } - } - - public void updateNull() { - switch (dataType) { - case SHORT: - max = ((long) max > 0) ? max : 0L; - min = ((long) min < 0) ? min : 0L; - nonExistValue = (long) min - 1; - break; - case INT: - max = ((long) max > 0) ? max : 0L; - min = ((long) min < 0) ? min : 0L; - nonExistValue = (long) min - 1; - break; - case LONG: - max = ((long) max > 0) ? 
max : 0L; - min = ((long) min < 0) ? min : 0L; - nonExistValue = (long) min - 1; - break; - case DOUBLE: - max = ((double) max > 0d) ? max : 0d; - min = ((double) min < 0d) ? min : 0d; - int num = getDecimalCount(0d); - decimal = decimal > num ? decimal : num; - nonExistValue = (double) min - 1; - break; - case DECIMAL: - BigDecimal decimalValue = BigDecimal.ZERO; - decimal = decimalValue.scale(); - BigDecimal val = (BigDecimal) min; - nonExistValue = (val.subtract(new BigDecimal(1.0))); - break; - case ARRAY: - case STRUCT: - // for complex type column, writer is not going to use stats, so, do nothing - } - } - - /** - * return no of digit after decimal - */ - private int getDecimalCount(double value) { - return BigDecimal.valueOf(value).scale(); - } - - /** - * return min value as byte array - */ - public byte[] minBytes() { - return getValueAsBytes(getMin()); - } - - /** - * return max value as byte array - */ - public byte[] maxBytes() { - return getValueAsBytes(getMax()); - } - - /** - * convert value to byte array - */ - private byte[] getValueAsBytes(Object value) { - ByteBuffer b; - switch (dataType) { - case BYTE: - case SHORT: - case INT: - case LONG: - b = ByteBuffer.allocate(8); - b.putLong((Long) value); - b.flip(); - return b.array(); - case DOUBLE: - b = ByteBuffer.allocate(8); - b.putDouble((Double) value); - b.flip(); - return b.array(); - case DECIMAL: - return DataTypeUtil.bigDecimalToByte((BigDecimal) value); - default: - throw new IllegalArgumentException("Invalid data type: " + dataType); - } - } - - public Object getMin() { - return min; - } - - public Object getMax() { - return max; - } - - public Object nonExistValue() { - return nonExistValue; - } - - public int getDecimal() { - return decimal; - } - - public DataType getDataType() { - return dataType; - } - - @Override - public String toString() { - return String.format("min: %s, max: %s, decimal: %s ", min, max, decimal); - } -} \ No newline at end of file
