Repository: carbondata Updated Branches: refs/heads/master b41da4f07 -> 4f7487dec
[CARBONDATA-1419] Support adaptive encoding for Double data type. Add a new encoding for Double data type, AdaptiveFloatingCodec: it multiplies the column value by Math.pow(10, decimalCount) and casts the result from double to a smaller target data type such as byte, short or int. This closes #1295 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/4f7487de Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/4f7487de Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/4f7487de Branch: refs/heads/master Commit: 4f7487decfa46627fda935e2772285e4986b69f1 Parents: b41da4f Author: Jacky Li <[email protected]> Authored: Tue Aug 29 17:31:20 2017 +0800 Committer: Ravindra Pesala <[email protected]> Committed: Wed Aug 30 14:06:06 2017 +0530 ---------------------------------------------------------------------- .../core/datastore/page/ColumnPage.java | 2 +- .../page/encoding/ColumnPageEncoderMeta.java | 2 +- .../page/encoding/DefaultEncodingStrategy.java | 39 ++- .../page/encoding/EncodingStrategy.java | 12 +- .../adaptive/AdaptiveDeltaIntegralCodec.java | 8 +- .../adaptive/AdaptiveFloatingCodec.java | 243 +++++++++++++++++++ .../adaptive/AdaptiveFloatingEncoderMeta.java | 47 ++++ .../adaptive/AdaptiveIntegralEncoderMeta.java | 2 +- .../adaptive/DeltaIntegralConverter.java | 213 ---------------- .../page/statistics/KeyPageStatsCollector.java | 2 +- .../page/statistics/LVStringStatsCollector.java | 2 +- .../statistics/PrimitivePageStatsCollector.java | 32 ++- .../page/statistics/SimpleStatsResult.java | 2 +- .../apache/carbondata/core/util/CarbonUtil.java | 2 +- examples/spark2/src/main/resources/data.csv | 20 +- format/src/main/thrift/schema.thrift | 1 + .../complexType/TestCreateTableWithDouble.scala | 4 +- 17 files changed, 390 insertions(+), 243 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4f7487de/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java index 2a78363..2e7bb3a 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java @@ -83,7 +83,7 @@ public abstract class ColumnPage { return new byte[0]; } - @Override public int getDecimalPoint() { + @Override public int getDecimalCount() { return 0; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/4f7487de/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java index d30117e..cea35f0 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java @@ -63,7 +63,7 @@ public class ColumnPageEncoderMeta extends ValueEncoderMeta implements Writable setType(convertType(dataType)); if (stats != null) { assert (stats.getDataType() == dataType); - setDecimal(stats.getDecimalPoint()); + setDecimal(stats.getDecimalCount()); setMaxValue(stats.getMax()); setMinValue(stats.getMin()); this.scale = stats.getScale(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/4f7487de/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingStrategy.java 
---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingStrategy.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingStrategy.java index 8f36de7..b024888 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingStrategy.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingStrategy.java @@ -22,6 +22,7 @@ import org.apache.carbondata.core.datastore.compression.Compressor; import org.apache.carbondata.core.datastore.compression.CompressorFactory; import org.apache.carbondata.core.datastore.page.ColumnPage; import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveDeltaIntegralCodec; +import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveFloatingCodec; import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveIntegralCodec; import org.apache.carbondata.core.datastore.page.encoding.compress.DirectCompressCodec; import org.apache.carbondata.core.datastore.page.encoding.dimension.legacy.ComplexDimensionIndexCodec; @@ -103,9 +104,10 @@ public class DefaultEncodingStrategy extends EncodingStrategy { case SHORT: case INT: case LONG: - return selectCodecByAlgorithm(stats).createEncoder(null); + return selectCodecByAlgorithmForIntegral(stats).createEncoder(null); case FLOAT: case DOUBLE: + return selectCodecByAlgorithmForFloating(stats).createEncoder(null); case DECIMAL: case BYTE_ARRAY: return new DirectCompressCodec(columnPage.getDataType()).createEncoder(null); @@ -139,7 +141,7 @@ public class DefaultEncodingStrategy extends EncodingStrategy { case LONG: return fitLongMinMax((long) max, (long) min); case DOUBLE: - return DataType.DOUBLE; + return fitLongMinMax((long) (double) max, (long) (double) min); default: throw new RuntimeException("internal error: " + dataType); } @@ -162,6 +164,8 
@@ public class DefaultEncodingStrategy extends EncodingStrategy { case LONG: // TODO: add overflow detection and return delta type return DataType.LONG; + case DOUBLE: + return DataType.LONG; default: throw new RuntimeException("internal error: " + dataType); } @@ -182,7 +186,7 @@ public class DefaultEncodingStrategy extends EncodingStrategy { * choose between adaptive encoder or delta adaptive encoder, based on whose target data type * size is smaller */ - static ColumnPageCodec selectCodecByAlgorithm(SimpleStatsResult stats) { + static ColumnPageCodec selectCodecByAlgorithmForIntegral(SimpleStatsResult stats) { DataType srcDataType = stats.getDataType(); DataType adaptiveDataType = fitMinMax(stats.getDataType(), stats.getMax(), stats.getMin()); DataType deltaDataType; @@ -206,4 +210,33 @@ public class DefaultEncodingStrategy extends EncodingStrategy { } } + // choose between upscale adaptive encoder or upscale delta adaptive encoder, + // based on whose target data type size is smaller + static ColumnPageCodec selectCodecByAlgorithmForFloating(SimpleStatsResult stats) { + DataType srcDataType = stats.getDataType(); + double maxValue = (double) stats.getMax(); + double minValue = (double) stats.getMin(); + int decimalCount = stats.getDecimalCount(); + + //Here we should use the Max abs as max to getDatatype, let's say -1 and -10000000, -1 is max, + //but we can't use -1 to getDatatype, we should use -10000000. 
+ double absMaxValue = Math.max(Math.abs(maxValue), Math.abs(minValue)); + + if (decimalCount == 0) { + // short, int, long + return selectCodecByAlgorithmForIntegral(stats); + } else if (decimalCount < 0) { + return new DirectCompressCodec(DataType.DOUBLE); + } else { + // double + long max = (long) (Math.pow(10, decimalCount) * absMaxValue); + DataType adaptiveDataType = fitLongMinMax(max, 0); + if (adaptiveDataType.getSizeInBytes() < DataType.DOUBLE.getSizeInBytes()) { + return new AdaptiveFloatingCodec(srcDataType, adaptiveDataType, stats); + } else { + return new DirectCompressCodec(DataType.DOUBLE); + } + } + } + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/4f7487de/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategy.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategy.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategy.java index 79c8101..d0f646b 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategy.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategy.java @@ -27,6 +27,8 @@ import org.apache.carbondata.core.datastore.TableSpec; import org.apache.carbondata.core.datastore.page.ColumnPage; import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveDeltaIntegralCodec; import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveDeltaIntegralEncoderMeta; +import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveFloatingCodec; +import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveFloatingEncoderMeta; import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveIntegralCodec; import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveIntegralEncoderMeta; import 
org.apache.carbondata.core.datastore.page.encoding.compress.DirectCompressCodec; @@ -40,6 +42,7 @@ import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.format.Encoding; import static org.apache.carbondata.format.Encoding.ADAPTIVE_DELTA_INTEGRAL; +import static org.apache.carbondata.format.Encoding.ADAPTIVE_FLOATING; import static org.apache.carbondata.format.Encoding.ADAPTIVE_INTEGRAL; import static org.apache.carbondata.format.Encoding.DIRECT_COMPRESS; import static org.apache.carbondata.format.Encoding.RLE_INTEGRAL; @@ -86,6 +89,12 @@ public abstract class EncodingStrategy { RLEEncoderMeta metadata = new RLEEncoderMeta(); metadata.readFields(in); return new RLECodec().createDecoder(metadata); + } else if (encoding == ADAPTIVE_FLOATING) { + AdaptiveFloatingEncoderMeta metadata = new AdaptiveFloatingEncoderMeta(); + metadata.readFields(in); + SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(metadata); + return new AdaptiveFloatingCodec(metadata.getDataType(), metadata.getTargetDataType(), + stats).createDecoder(metadata); } else { // for backward compatibility ValueEncoderMeta metadata = CarbonUtil.deserializeEncoderMetaV3(encoderMeta); @@ -103,9 +112,10 @@ public abstract class EncodingStrategy { case SHORT: case INT: case LONG: - return DefaultEncodingStrategy.selectCodecByAlgorithm(stats).createDecoder(null); + return DefaultEncodingStrategy.selectCodecByAlgorithmForIntegral(stats).createDecoder(null); case FLOAT: case DOUBLE: + return DefaultEncodingStrategy.selectCodecByAlgorithmForFloating(stats).createDecoder(null); case DECIMAL: case BYTE_ARRAY: // no dictionary dimension http://git-wip-us.apache.org/repos/asf/carbondata/blob/4f7487de/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java ---------------------------------------------------------------------- diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java index 9107a6b..128c66b 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java @@ -36,7 +36,8 @@ import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.format.Encoding; /** - * Codec for integer (byte, short, int, long) data type page. + * Codec for integer (byte, short, int, long) data type and floating data type (in case of + * scale is 0). * This codec will calculate delta of page max value and page value, * and do type casting of the diff to make storage minimum. */ @@ -61,6 +62,9 @@ public class AdaptiveDeltaIntegralCodec extends AdaptiveCodec { case LONG: this.max = (long) stats.getMax(); break; + case DOUBLE: + this.max = (long) (double) stats.getMax(); + break; default: // this codec is for integer type only throw new UnsupportedOperationException( @@ -116,8 +120,6 @@ public class AdaptiveDeltaIntegralCodec extends AdaptiveCodec { public ColumnPage decode(byte[] input, int offset, int length) throws MemoryException, IOException { ColumnPage page = ColumnPage.decompress(compressor, targetDataType, input, offset, length); - DeltaIntegralConverter converter = new DeltaIntegralConverter(page, targetDataType, - srcDataType, stats.getMax()); return LazyColumnPage.newPage(page, converter); } }; http://git-wip-us.apache.org/repos/asf/carbondata/blob/4f7487de/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java ---------------------------------------------------------------------- diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java new file mode 100644 index 0000000..9b03049 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.core.datastore.page.encoding.adaptive; + +import java.io.IOException; +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.carbondata.core.datastore.compression.Compressor; +import org.apache.carbondata.core.datastore.compression.CompressorFactory; +import org.apache.carbondata.core.datastore.page.ColumnPage; +import org.apache.carbondata.core.datastore.page.ColumnPageValueConverter; +import org.apache.carbondata.core.datastore.page.LazyColumnPage; +import org.apache.carbondata.core.datastore.page.encoding.ColumnPageCodec; +import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder; +import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder; +import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta; +import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult; +import org.apache.carbondata.core.memory.MemoryException; +import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.format.Encoding; + +/** + * Codec for floating point (float, double) data type page. + * This codec will upscale the diff from page max value to integer value, + * and do type casting to make storage minimum. 
+ */ +public class AdaptiveFloatingCodec extends AdaptiveCodec { + + private ColumnPage encodedPage; + private BigDecimal factor; + + public static ColumnPageCodec newInstance(DataType srcDataType, DataType targetDataType, + SimpleStatsResult stats) { + return new AdaptiveFloatingCodec(srcDataType, targetDataType, stats); + } + + public AdaptiveFloatingCodec(DataType srcDataType, DataType targetDataType, + SimpleStatsResult stats) { + super(srcDataType, targetDataType, stats); + this.factor = BigDecimal.valueOf(Math.pow(10, stats.getDecimalCount())); + } + + @Override + public String getName() { + return "AdaptiveFloatingCodec"; + } + + @Override + public ColumnPageEncoder createEncoder(Map<String, String> parameter) { + final Compressor compressor = CompressorFactory.getInstance().getCompressor(); + return new ColumnPageEncoder() { + @Override + protected byte[] encodeData(ColumnPage input) throws MemoryException, IOException { + if (encodedPage != null) { + throw new IllegalStateException("already encoded"); + } + encodedPage = ColumnPage.newPage(targetDataType, input.getPageSize()); + input.convertValue(converter); + byte[] result = encodedPage.compress(compressor); + encodedPage.freeMemory(); + return result; + } + + @Override + protected List<Encoding> getEncodingList() { + List<Encoding> encodings = new ArrayList<Encoding>(); + encodings.add(Encoding.ADAPTIVE_FLOATING); + return encodings; + } + + @Override + protected ColumnPageEncoderMeta getEncoderMeta(ColumnPage inputPage) { + return new AdaptiveFloatingEncoderMeta(targetDataType, stats, compressor.getName()); + } + + }; + } + + @Override + public ColumnPageDecoder createDecoder(ColumnPageEncoderMeta meta) { + AdaptiveFloatingEncoderMeta codecMeta = (AdaptiveFloatingEncoderMeta) meta; + final Compressor compressor = CompressorFactory.getInstance().getCompressor( + codecMeta.getCompressorName()); + final DataType targetDataType = codecMeta.getTargetDataType(); + return new ColumnPageDecoder() { + @Override 
+ public ColumnPage decode(byte[] input, int offset, int length) + throws MemoryException, IOException { + ColumnPage page = ColumnPage.decompress(compressor, targetDataType, input, offset, length); + return LazyColumnPage.newPage(page, converter); + } + }; + } + + // encoded value = (10 power of decimal) * (page value) + private ColumnPageValueConverter converter = new ColumnPageValueConverter() { + @Override + public void encode(int rowId, byte value) { + // this codec is for floating point type only + throw new RuntimeException("internal error: " + debugInfo()); + } + + @Override + public void encode(int rowId, short value) { + // this codec is for floating point type only + throw new RuntimeException("internal error: " + debugInfo()); + } + + @Override + public void encode(int rowId, int value) { + // this codec is for floating point type only + throw new RuntimeException("internal error: " + debugInfo()); + } + + @Override + public void encode(int rowId, long value) { + // this codec is for floating point type only + throw new RuntimeException("internal error: " + debugInfo()); + } + + @Override + public void encode(int rowId, float value) { + switch (targetDataType) { + case BYTE: + encodedPage.putByte(rowId, + BigDecimal.valueOf(value).multiply(factor).byteValue()); + break; + case SHORT: + encodedPage.putShort(rowId, + BigDecimal.valueOf(value).multiply(factor).shortValue()); + break; + case SHORT_INT: + encodedPage.putShortInt(rowId, + BigDecimal.valueOf(value).multiply(factor).intValue()); + break; + case INT: + encodedPage.putInt(rowId, + BigDecimal.valueOf(value).multiply(factor).intValue()); + break; + case LONG: + encodedPage.putLong(rowId, + BigDecimal.valueOf(value).multiply(factor).longValue()); + break; + default: + throw new RuntimeException("internal error: " + debugInfo()); + } + } + + @Override + public void encode(int rowId, double value) { + switch (targetDataType) { + case BYTE: + encodedPage.putByte(rowId, + 
BigDecimal.valueOf(value).multiply(factor).byteValue()); + break; + case SHORT: + encodedPage.putShort(rowId, + BigDecimal.valueOf(value).multiply(factor).shortValue()); + break; + case SHORT_INT: + encodedPage.putShortInt(rowId, + BigDecimal.valueOf(value).multiply(factor).intValue()); + break; + case INT: + encodedPage.putInt(rowId, + BigDecimal.valueOf(value).multiply(factor).intValue()); + break; + case LONG: + encodedPage.putLong(rowId, + BigDecimal.valueOf(value).multiply(factor).longValue()); + break; + case DOUBLE: + encodedPage.putDouble(rowId, value); + break; + default: + throw new RuntimeException("internal error: " + debugInfo()); + } + } + + @Override + public long decodeLong(byte value) { + throw new RuntimeException("internal error: " + debugInfo()); + } + + @Override + public long decodeLong(short value) { + throw new RuntimeException("internal error: " + debugInfo()); + } + + @Override + public long decodeLong(int value) { + throw new RuntimeException("internal error: " + debugInfo()); + } + + @Override + public double decodeDouble(byte value) { + return BigDecimal.valueOf(value).divide(factor).doubleValue(); + } + + @Override + public double decodeDouble(short value) { + return BigDecimal.valueOf(value).divide(factor).doubleValue(); + } + + @Override + public double decodeDouble(int value) { + return BigDecimal.valueOf(value).divide(factor).doubleValue(); + } + + @Override + public double decodeDouble(long value) { + return BigDecimal.valueOf(value).divide(factor).doubleValue(); + } + + @Override + public double decodeDouble(float value) { + throw new RuntimeException("internal error: " + debugInfo()); + } + + @Override + public double decodeDouble(double value) { + throw new RuntimeException("internal error: " + debugInfo()); + } + }; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/4f7487de/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingEncoderMeta.java 
---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingEncoderMeta.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingEncoderMeta.java new file mode 100644 index 0000000..95a9011 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingEncoderMeta.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.core.datastore.page.encoding.adaptive; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult; +import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.metadata.schema.table.Writable; + +public class AdaptiveFloatingEncoderMeta extends AdaptiveEncoderMeta implements Writable { + + public AdaptiveFloatingEncoderMeta() { + } + + AdaptiveFloatingEncoderMeta(DataType targetDataType, SimpleStatsResult stats, + String compressorName) { + super(targetDataType, stats, compressorName); + } + + @Override + public void write(DataOutput out) throws IOException { + super.write(out); + } + + @Override + public void readFields(DataInput in) throws IOException { + super.readFields(in); + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/4f7487de/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralEncoderMeta.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralEncoderMeta.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralEncoderMeta.java index 3025303..2c89cee 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralEncoderMeta.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralEncoderMeta.java @@ -30,7 +30,7 @@ public class AdaptiveIntegralEncoderMeta extends AdaptiveEncoderMeta implements public AdaptiveIntegralEncoderMeta() { } - public AdaptiveIntegralEncoderMeta(DataType targetDataType, SimpleStatsResult stats, + AdaptiveIntegralEncoderMeta(DataType targetDataType, SimpleStatsResult stats, String compressorName) { super(targetDataType, stats, 
compressorName); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/4f7487de/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/DeltaIntegralConverter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/DeltaIntegralConverter.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/DeltaIntegralConverter.java deleted file mode 100644 index 8a2bf6d..0000000 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/DeltaIntegralConverter.java +++ /dev/null @@ -1,213 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.carbondata.core.datastore.page.encoding.adaptive; - -import org.apache.carbondata.core.datastore.page.ColumnPage; -import org.apache.carbondata.core.datastore.page.ColumnPageValueConverter; -import org.apache.carbondata.core.metadata.datatype.DataType; - -public class DeltaIntegralConverter implements ColumnPageValueConverter { - private DataType targetDataType; - private ColumnPage encodedPage; - private long max; - - public DeltaIntegralConverter(ColumnPage encodedPage, DataType targetDataType, - DataType srcDataType, Object max) { - this.targetDataType = targetDataType; - this.encodedPage = encodedPage; - switch (srcDataType) { - case BYTE: - this.max = (byte) max; - break; - case SHORT: - this.max = (short) max; - break; - case INT: - this.max = (int) max; - break; - case LONG: - this.max = (long) max; - break; - case FLOAT: - case DOUBLE: - this.max = (long)(max); - break; - } - } - - @Override - public void encode(int rowId, byte value) { - switch (targetDataType) { - case BYTE: - encodedPage.putByte(rowId, (byte)(max - value)); - break; - default: - throw new RuntimeException("internal error"); - } - } - - @Override - public void encode(int rowId, short value) { - switch (targetDataType) { - case BYTE: - encodedPage.putByte(rowId, (byte)(max - value)); - break; - case SHORT: - encodedPage.putShort(rowId, (short)(max - value)); - break; - default: - throw new RuntimeException("internal error"); - } - } - - @Override - public void encode(int rowId, int value) { - switch (targetDataType) { - case BYTE: - encodedPage.putByte(rowId, (byte)(max - value)); - break; - case SHORT: - encodedPage.putShort(rowId, (short)(max - value)); - break; - case SHORT_INT: - encodedPage.putShortInt(rowId, (int)(max - value)); - break; - case INT: - encodedPage.putInt(rowId, (int)(max - value)); - break; - default: - throw new RuntimeException("internal error"); - } - } - - @Override - public void encode(int rowId, long value) { - switch (targetDataType) 
{ - case BYTE: - encodedPage.putByte(rowId, (byte)(max - value)); - break; - case SHORT: - encodedPage.putShort(rowId, (short)(max - value)); - break; - case SHORT_INT: - encodedPage.putShortInt(rowId, (int)(max - value)); - break; - case INT: - encodedPage.putInt(rowId, (int)(max - value)); - break; - case LONG: - encodedPage.putLong(rowId, max - value); - break; - default: - throw new RuntimeException("internal error"); - } - } - - @Override - public void encode(int rowId, float value) { - switch (targetDataType) { - case BYTE: - encodedPage.putByte(rowId, (byte)(max - value)); - break; - case SHORT: - encodedPage.putShort(rowId, (short)(max - value)); - break; - case SHORT_INT: - encodedPage.putShortInt(rowId, (int)(max - value)); - break; - case INT: - encodedPage.putInt(rowId, (int)(max - value)); - break; - case LONG: - encodedPage.putLong(rowId, (long)(max - value)); - break; - default: - throw new RuntimeException("internal error"); - } - } - - @Override - public void encode(int rowId, double value) { - switch (targetDataType) { - case BYTE: - encodedPage.putByte(rowId, (byte)(max - value)); - break; - case SHORT: - encodedPage.putShort(rowId, (short)(max - value)); - break; - case SHORT_INT: - encodedPage.putShortInt(rowId, (int)(max - value)); - break; - case INT: - encodedPage.putInt(rowId, (int)(max - value)); - break; - case LONG: - encodedPage.putLong(rowId, (long)(max - value)); - break; - default: - throw new RuntimeException("internal error"); - } - } - - @Override - public long decodeLong(byte value) { - return max - value; - } - - @Override - public long decodeLong(short value) { - return max - value; - } - - @Override - public long decodeLong(int value) { - return max - value; - } - - @Override - public double decodeDouble(byte value) { - return max - value; - } - - @Override - public double decodeDouble(short value) { - return max - value; - } - - @Override - public double decodeDouble(int value) { - return max - value; - } - - @Override - 
public double decodeDouble(long value) { - return max - value; - } - - @Override - public double decodeDouble(float value) { - // this codec is for integer type only - throw new RuntimeException("internal error"); - } - - @Override - public double decodeDouble(double value) { - // this codec is for integer type only - throw new RuntimeException("internal error"); - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/4f7487de/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/KeyPageStatsCollector.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/KeyPageStatsCollector.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/KeyPageStatsCollector.java index a13351b..be47966 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/KeyPageStatsCollector.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/KeyPageStatsCollector.java @@ -98,7 +98,7 @@ public class KeyPageStatsCollector implements ColumnPageStatsCollector { return max; } - @Override public int getDecimalPoint() { + @Override public int getDecimalCount() { return 0; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/4f7487de/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/LVStringStatsCollector.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/LVStringStatsCollector.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/LVStringStatsCollector.java index 62b18c6..98a757f 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/LVStringStatsCollector.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/LVStringStatsCollector.java @@ -110,7 +110,7 @@ public class 
LVStringStatsCollector implements ColumnPageStatsCollector { return max; } - @Override public int getDecimalPoint() { + @Override public int getDecimalCount() { return 0; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/4f7487de/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java index 6c25d4e..4fb891f 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java @@ -34,7 +34,7 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si private BigDecimal minDecimal, maxDecimal; private int scale, precision; - // scale of the double value + // scale of the double value, apply adaptive encoding if this is positive private int decimal; private boolean isFirst = true; @@ -150,8 +150,8 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si maxLong = Long.MIN_VALUE; break; case DOUBLE: - minDouble = Double.MAX_VALUE; - maxDouble = Double.MIN_VALUE; + minDouble = Double.POSITIVE_INFINITY; + maxDouble = Double.NEGATIVE_INFINITY; decimal = 0; break; case DECIMAL: @@ -241,6 +241,20 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si } } + /** + * Return the number of digits after the decimal point + * TODO: this operation is costly, optimize for performance + */ + private int getDecimalCount(double value) { + String strValue = BigDecimal.valueOf(Math.abs(value)).toPlainString(); + int integerPlaces = strValue.indexOf('.'); + int decimalPlaces = 0; + if (-1 != integerPlaces) { +
decimalPlaces = strValue.length() - integerPlaces - 1; + } + return decimalPlaces; + } + @Override public void update(double value) { if (minDouble > value) { @@ -249,6 +263,16 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si if (maxDouble < value) { maxDouble = value; } + if (decimal >= 0) { + int decimalCount = getDecimalCount(value); + if (decimalCount > 5) { + // If the decimal count is too big, we do not do adaptive encoding. + // So set decimal to a negative value + decimal = -1; + } else if (decimalCount > decimal) { + this.decimal = decimalCount; + } + } } @Override @@ -328,7 +352,7 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si } @Override - public int getDecimalPoint() { + public int getDecimalCount() { return decimal; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/4f7487de/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/SimpleStatsResult.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/SimpleStatsResult.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/SimpleStatsResult.java index 60516fa..65cd40f 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/SimpleStatsResult.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/SimpleStatsResult.java @@ -25,7 +25,7 @@ public interface SimpleStatsResult { Object getMax(); - int getDecimalPoint(); + int getDecimalCount(); DataType getDataType(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/4f7487de/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java index 76d5dc7..67c1d81
100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java @@ -1462,7 +1462,7 @@ public final class CarbonUtil { valueEncoderMeta.setUniqueValue(buffer.getLong()); break; default: - throw new IllegalArgumentException("invalid measure type"); + throw new IllegalArgumentException("invalid measure type: " + measureType); } valueEncoderMeta.setDecimal(buffer.getInt()); valueEncoderMeta.setDataTypeSelected(buffer.get()); http://git-wip-us.apache.org/repos/asf/carbondata/blob/4f7487de/examples/spark2/src/main/resources/data.csv ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/resources/data.csv b/examples/spark2/src/main/resources/data.csv index b7adf55..3061ec7 100644 --- a/examples/spark2/src/main/resources/data.csv +++ b/examples/spark2/src/main/resources/data.csv @@ -1,10 +1,10 @@ -1,10,100,48.4,spark,2015/4/23 12:01:01,1.23,2015/4/23,aaa,2.5,'foo'#'bar'#'world' -5,17,140,43.4,spark,2015/7/27 12:01:02,3.45,2015/7/27,bbb,2.5,'foo'#'bar'#'world' -1,11,100,44.4,flink,2015/5/23 12:01:03,23.23,2015/5/23,ccc,2.5,'foo'#'bar'#'world' -1,10,150,43.4,spark,2015/7/24 12:01:04,254.12,2015/7/24,ddd,2.5,'foo'#'bar'#'world' -1,10,100,47.4,spark,2015/7/23 12:01:05,876.14,2015/7/23,eeee,3.5,'foo'#'bar'#'world' -3,14,160,43.4,hive,2015/7/26 12:01:06,3454.32,2015/7/26,ff,2.5,'foo'#'bar'#'world' -2,10,100,43.4,impala,2015/7/23 12:01:07,456.98,2015/7/23,ggg,2.5,'foo'#'bar'#'world' -1,10,100,43.4,spark,2015/5/23 12:01:08,32.53,2015/5/23,hhh,2.5,'foo'#'bar'#'world' -4,16,130,42.4,impala,2015/7/23 12:01:09,67.23,2015/7/23,iii,2.5,'foo'#'bar'#'world' -1,10,100,43.4,spark,2015/7/23 12:01:10,832.23,2015/7/23,jjj,2.5,'foo'#'bar'#'world' +1,10,1100,48.4,spark,2015/4/23 12:01:01,1.23,2015/4/23,aaa,2.5,'foo'#'bar'#'world' +5,17,1140,43.4,spark,2015/7/27 12:01:02,3.45,2015/7/27,bbb,2.5,'foo'#'bar'#'world' +1,11,1100,44.4,flink,2015/5/23 
12:01:03,23.23,2015/5/23,ccc,2.5,'foo'#'bar'#'world' +1,10,1150,43.4,spark,2015/7/24 12:01:04,254.12,2015/7/24,ddd,2.5,'foo'#'bar'#'world' +1,10,1100,47.4,spark,2015/7/23 12:01:05,876.14,2015/7/23,eeee,3.5,'foo'#'bar'#'world' +3,14,1160,43.4,hive,2015/7/26 12:01:06,3454.32,2015/7/26,ff,2.5,'foo'#'bar'#'world' +2,10,1100,43.4,impala,2015/7/23 12:01:07,456.98,2015/7/23,ggg,2.5,'foo'#'bar'#'world' +1,10,1100,43.4,spark,2015/5/23 12:01:08,32.53,2015/5/23,hhh,2.5,'foo'#'bar'#'world' +4,16,1130,42.4,impala,2015/7/23 12:01:09,67.23,2015/7/23,iii,2.5,'foo'#'bar'#'world' +1,10,1100,43.4,spark,2015/7/23 12:01:10,832.23,2015/7/23,jjj,2.5,'foo'#'bar'#'world' http://git-wip-us.apache.org/repos/asf/carbondata/blob/4f7487de/format/src/main/thrift/schema.thrift ---------------------------------------------------------------------- diff --git a/format/src/main/thrift/schema.thrift b/format/src/main/thrift/schema.thrift index 638e523..de1a5aa 100644 --- a/format/src/main/thrift/schema.thrift +++ b/format/src/main/thrift/schema.thrift @@ -52,6 +52,7 @@ enum Encoding{ ADAPTIVE_DELTA_INTEGRAL = 8; // Identifies that a column is encoded using AdaptiveDeltaIntegralCodec RLE_INTEGRAL = 9; // Identifies that a column is encoded using RLECodec DIRECT_STRING = 10; // Stores string value and string length separately in page data + ADAPTIVE_FLOATING = 11; // Identifies that a column is encoded using AdaptiveFloatingCodec } enum PartitionType{ http://git-wip-us.apache.org/repos/asf/carbondata/blob/4f7487de/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestCreateTableWithDouble.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestCreateTableWithDouble.scala 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestCreateTableWithDouble.scala index fc9d497..2bda616 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestCreateTableWithDouble.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestCreateTableWithDouble.scala @@ -54,8 +54,8 @@ class TestCreateTableWithDouble extends QueryTest with BeforeAndAfterAll { case e : Throwable => fail(e) } // assert that load and query is successful - assertResult(countNum)(Array(Row(3))) - assertResult(doubleField)(Array(Row(1.5), Row(2.0), Row(3.0))) + assertResult(Array(Row(3)))(countNum) + assertResult(Array(Row(1.5), Row(2.0), Row(3.0)))(doubleField) } test("test creating carbon table with double as dimension") {
