Optimizing decimal datatype: optimized BigDecimal storage to use less space
Fixed comments Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/12911629 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/12911629 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/12911629 Branch: refs/heads/metadata Commit: 12911629c84aabf2400fddb74c97a4ec54a533e8 Parents: 403c3d9 Author: ravipesala <ravi.pes...@gmail.com> Authored: Wed Jun 28 16:03:03 2017 +0530 Committer: jackylk <jacky.li...@huawei.com> Committed: Tue Jul 11 15:50:01 2017 +0800 ---------------------------------------------------------------------- .../carbondata/core/datastore/TableSpec.java | 23 +- .../AbstractMeasureChunkReaderV2V3Format.java | 15 +- ...CompressedMeasureChunkFileBasedReaderV1.java | 2 +- .../core/datastore/page/ColumnPage.java | 107 ++++++--- .../core/datastore/page/LazyColumnPage.java | 13 +- .../datastore/page/SafeFixLengthColumnPage.java | 14 +- .../datastore/page/SafeVarLengthColumnPage.java | 11 +- .../page/UnsafeFixLengthColumnPage.java | 13 +- .../page/UnsafeVarLengthColumnPage.java | 19 +- .../datastore/page/VarLengthColumnPageBase.java | 76 +++++- .../page/encoding/AdaptiveIntegerCodec.java | 11 +- .../page/encoding/DefaultEncodingStrategy.java | 8 +- .../page/encoding/DeltaIntegerCodec.java | 11 +- .../page/encoding/DirectCompressCodec.java | 16 +- .../page/encoding/EncodingStrategy.java | 4 +- .../page/statistics/ColumnPageStatsVO.java | 18 +- .../datatype/DecimalConverterFactory.java | 237 +++++++++++++++++++ .../core/util/CarbonMetadataUtil.java | 28 +++ .../newflow/sort/SortStepRowUtil.java | 17 +- .../sort/unsafe/UnsafeCarbonRowPage.java | 2 +- .../holder/UnsafeSortTempFileChunkHolder.java | 3 +- .../CarbonRowDataWriterProcessorStepImpl.java | 14 +- .../sortdata/SortTempFileChunkHolder.java | 3 +- .../carbondata/processing/store/TablePage.java | 14 +- 24 files changed, 557 insertions(+), 122 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/12911629/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java b/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java index 365f1ca..650c2a6 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java @@ -188,18 +188,27 @@ public class TableSpec { // data type of each measure, in schema order private DataType[] types; + private int[] scale; + + private int[] precision; + MeasureSpec(List<CarbonMeasure> measures) { fieldName = new String[measures.size()]; types = new DataType[measures.size()]; + scale = new int[measures.size()]; + precision = new int[measures.size()]; int i = 0; for (CarbonMeasure measure: measures) { - add(i++, measure.getColName(), measure.getDataType()); + add(i++, measure.getColName(), measure.getDataType(), measure.getScale(), + measure.getPrecision()); } } - private void add(int index, String name, DataType type) { + private void add(int index, String name, DataType type, int scale, int precision) { fieldName[index] = name; types[index] = type; + this.scale[index] = scale; + this.precision[index] = precision; } /** @@ -210,6 +219,16 @@ public class TableSpec { return types[index]; } + public int getScale(int index) { + assert (index >= 0 && index < precision.length); + return scale[index]; + } + + public int getPrecision(int index) { + assert (index >= 0 && index < precision.length); + return precision[index]; + } + /** * return number of measures */ http://git-wip-us.apache.org/repos/asf/carbondata/blob/12911629/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReaderV2V3Format.java 
---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReaderV2V3Format.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReaderV2V3Format.java index 2f5af87..c35cefb 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReaderV2V3Format.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReaderV2V3Format.java @@ -17,6 +17,7 @@ package org.apache.carbondata.core.datastore.chunk.reader.measure; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.BitSet; import java.util.List; @@ -132,10 +133,18 @@ public abstract class AbstractMeasureChunkReaderV2V3Format extends AbstractMeasu protected ColumnPage decodeMeasure(MeasureRawColumnChunk measureRawColumnChunk, DataChunk2 measureColumnChunk, int copyPoint) throws MemoryException { // for measure, it should have only one ValueEncoderMeta - assert (measureColumnChunk.getEncoder_meta().size() == 1); - byte[] encodedMeta = measureColumnChunk.getEncoder_meta().get(0).array(); + List<ByteBuffer> encoder_meta = measureColumnChunk.getEncoder_meta(); + assert (encoder_meta.size() > 0); + byte[] encodedMeta = encoder_meta.get(0).array(); ValueEncoderMeta meta = CarbonUtil.deserializeEncoderMetaV3(encodedMeta); - ColumnPageCodec codec = strategy.createCodec(meta); + int scale = -1; + int precision = -1; + if (encoder_meta.size() > 1) { + ByteBuffer decimalInfo = encoder_meta.get(1); + scale = decimalInfo.getInt(); + precision = decimalInfo.getInt(); + } + ColumnPageCodec codec = strategy.createCodec(meta, scale, precision); byte[] rawData = measureRawColumnChunk.getRawData().array(); return codec.decode(rawData, copyPoint, measureColumnChunk.data_page_length); } 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/12911629/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java index 6e59b9f..8f69a7c 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java @@ -98,7 +98,7 @@ public class CompressedMeasureChunkFileBasedReaderV1 extends AbstractMeasureChun DataChunk dataChunk = measureColumnChunks.get(blockIndex); ValueEncoderMeta meta = dataChunk.getValueEncoderMeta().get(0); - ColumnPageCodec codec = strategy.createCodec(meta); + ColumnPageCodec codec = strategy.createCodec(meta, -1, -1); ColumnPage page = codec.decode(measureRawColumnChunk.getRawData().array(), measureRawColumnChunk.getOffSet(), dataChunk.getDataPageLength()); http://git-wip-us.apache.org/repos/asf/carbondata/blob/12911629/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java index 730243c..2c43165 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java @@ -26,8 +26,8 @@ import org.apache.carbondata.core.datastore.compression.Compressor; import 
org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsVO; import org.apache.carbondata.core.memory.MemoryException; import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.metadata.datatype.DecimalConverterFactory; import org.apache.carbondata.core.util.CarbonProperties; -import org.apache.carbondata.core.util.DataTypeUtil; import static org.apache.carbondata.core.metadata.datatype.DataType.BYTE; import static org.apache.carbondata.core.metadata.datatype.DataType.BYTE_ARRAY; @@ -43,6 +43,8 @@ public abstract class ColumnPage { protected final int pageSize; protected final DataType dataType; + protected int scale; + protected int precision; // statistics of this column page private ColumnPageStatsVO stats; @@ -50,15 +52,22 @@ public abstract class ColumnPage { // The index of the rowId whose value is null, will be set to 1 private BitSet nullBitSet; + protected DecimalConverterFactory.DecimalConverter decimalConverter; + protected static final boolean unsafe = Boolean.parseBoolean(CarbonProperties.getInstance() .getProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE_LOADING, CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE_LOADING_DEFAULT)); - protected ColumnPage(DataType dataType, int pageSize) { + protected ColumnPage(DataType dataType, int pageSize, int scale, int precision) { this.dataType = dataType; this.pageSize = pageSize; + this.scale = scale; + this.precision = precision; this.stats = new ColumnPageStatsVO(dataType); this.nullBitSet = new BitSet(pageSize); + if (dataType == DECIMAL) { + decimalConverter = DecimalConverterFactory.INSTANCE.getDecimalConverter(precision, scale); + } } public DataType getDataType() { @@ -73,54 +82,62 @@ public abstract class ColumnPage { return pageSize; } - private static ColumnPage createVarLengthPage(DataType dataType, int pageSize) { + private static ColumnPage createVarLengthPage(DataType dataType, int pageSize, int scale, + int precision) { if (unsafe) { try { - 
return new UnsafeVarLengthColumnPage(dataType, pageSize); + return new UnsafeVarLengthColumnPage(dataType, pageSize, scale, precision); } catch (MemoryException e) { throw new RuntimeException(e); } } else { - return new SafeVarLengthColumnPage(dataType, pageSize); + return new SafeVarLengthColumnPage(dataType, pageSize, scale, precision); } } - private static ColumnPage createFixLengthPage(DataType dataType, int pageSize) { + private static ColumnPage createFixLengthPage(DataType dataType, int pageSize, int scale, + int precision) { if (unsafe) { try { - return new UnsafeFixLengthColumnPage(dataType, pageSize); + return new UnsafeFixLengthColumnPage(dataType, pageSize, scale, precision); } catch (MemoryException e) { throw new RuntimeException(e); } } else { - return new SafeFixLengthColumnPage(dataType, pageSize); + return new SafeFixLengthColumnPage(dataType, pageSize, scale, pageSize); } } - private static ColumnPage createPage(DataType dataType, int pageSize) { + private static ColumnPage createPage(DataType dataType, int pageSize, int scale, int precision) { if (dataType.equals(BYTE_ARRAY) | dataType.equals(DECIMAL)) { - return createVarLengthPage(dataType, pageSize); + return createVarLengthPage(dataType, pageSize, scale, precision); } else { - return createFixLengthPage(dataType, pageSize); + return createFixLengthPage(dataType, pageSize, scale, precision); } } - public static ColumnPage newVarLengthPath(DataType dataType, int pageSize) { + public static ColumnPage newVarLengthPage(DataType dataType, int pageSize) { + return newVarLengthPage(dataType, pageSize, -1, -1); + } + + private static ColumnPage newVarLengthPage(DataType dataType, int pageSize, int scale, + int precision) { if (unsafe) { try { - return new UnsafeVarLengthColumnPage(dataType, pageSize); + return new UnsafeVarLengthColumnPage(dataType, pageSize, scale, precision); } catch (MemoryException e) { throw new RuntimeException(e); } } else { - return new SafeVarLengthColumnPage(dataType, 
pageSize); + return new SafeVarLengthColumnPage(dataType, pageSize, scale, precision); } } /** * Create a new page of dataType and number of row = pageSize */ - public static ColumnPage newPage(DataType dataType, int pageSize) throws MemoryException { + public static ColumnPage newPage(DataType dataType, int pageSize, int scale, int precision) + throws MemoryException { ColumnPage instance; if (unsafe) { switch (dataType) { @@ -131,11 +148,11 @@ public abstract class ColumnPage { case LONG: case FLOAT: case DOUBLE: - instance = new UnsafeFixLengthColumnPage(dataType, pageSize); + instance = new UnsafeFixLengthColumnPage(dataType, pageSize, scale, precision); break; case DECIMAL: case BYTE_ARRAY: - instance = new UnsafeVarLengthColumnPage(dataType, pageSize); + instance = new UnsafeVarLengthColumnPage(dataType, pageSize, scale, precision); break; default: throw new RuntimeException("Unsupported data dataType: " + dataType); @@ -164,7 +181,7 @@ public abstract class ColumnPage { instance = newDoublePage(new double[pageSize]); break; case DECIMAL: - instance = newDecimalPage(new byte[pageSize][]); + instance = newDecimalPage(new byte[pageSize][], scale, precision); break; default: throw new RuntimeException("Unsupported data dataType: " + dataType); @@ -174,55 +191,61 @@ public abstract class ColumnPage { } private static ColumnPage newBytePage(byte[] byteData) { - ColumnPage columnPage = createPage(BYTE, byteData.length); + ColumnPage columnPage = createPage(BYTE, byteData.length, -1, -1); columnPage.setBytePage(byteData); return columnPage; } private static ColumnPage newShortPage(short[] shortData) { - ColumnPage columnPage = createPage(SHORT, shortData.length); + ColumnPage columnPage = createPage(SHORT, shortData.length, -1, -1); columnPage.setShortPage(shortData); return columnPage; } private static ColumnPage newShortIntPage(byte[] shortIntData) { - ColumnPage columnPage = createPage(SHORT_INT, shortIntData.length / 3); + ColumnPage columnPage = 
createPage(SHORT_INT, shortIntData.length / 3, -1, -1); columnPage.setShortIntPage(shortIntData); return columnPage; } private static ColumnPage newIntPage(int[] intData) { - ColumnPage columnPage = createPage(INT, intData.length); + ColumnPage columnPage = createPage(INT, intData.length, -1, -1); columnPage.setIntPage(intData); return columnPage; } private static ColumnPage newLongPage(long[] longData) { - ColumnPage columnPage = createPage(LONG, longData.length); + ColumnPage columnPage = createPage(LONG, longData.length, -1, -1); columnPage.setLongPage(longData); return columnPage; } private static ColumnPage newFloatPage(float[] floatData) { - ColumnPage columnPage = createPage(FLOAT, floatData.length); + ColumnPage columnPage = createPage(FLOAT, floatData.length, -1, -1); columnPage.setFloatPage(floatData); return columnPage; } private static ColumnPage newDoublePage(double[] doubleData) { - ColumnPage columnPage = createPage(DOUBLE, doubleData.length); + ColumnPage columnPage = createPage(DOUBLE, doubleData.length, -1, -1); columnPage.setDoublePage(doubleData); return columnPage; } - private static ColumnPage newDecimalPage(byte[][] byteArray) { - ColumnPage columnPage = createPage(DECIMAL, byteArray.length); + private static ColumnPage newDecimalPage(byte[][] byteArray, int scale, int precision) { + ColumnPage columnPage = createPage(DECIMAL, byteArray.length, scale, precision); columnPage.setByteArrayPage(byteArray); return columnPage; } - private static ColumnPage newDecimalPage(byte[] lvEncodedByteArray) throws MemoryException { - return VarLengthColumnPageBase.newDecimalColumnPage(lvEncodedByteArray); + private static ColumnPage newDecimalPage(byte[] lvEncodedByteArray, int scale, int precision) + throws MemoryException { + return VarLengthColumnPageBase.newDecimalColumnPage(lvEncodedByteArray, scale, precision); + } + + private static ColumnPage newVarLengthPage(byte[] lvEncodedByteArray, int scale, int precision) + throws MemoryException { + return 
VarLengthColumnPageBase.newVarLengthColumnPage(lvEncodedByteArray, scale, precision); } /** @@ -297,6 +320,8 @@ public abstract class ColumnPage { putDouble(rowId, (double) value); break; case DECIMAL: + putDecimal(rowId, (BigDecimal) value); + break; case BYTE_ARRAY: putBytes(rowId, (byte[]) value); break; @@ -337,6 +362,11 @@ public abstract class ColumnPage { public abstract void putBytes(int rowId, byte[] bytes); /** + * Set byte array value at rowId + */ + public abstract void putDecimal(int rowId, BigDecimal decimal); + + /** * Type cast int value to 3 bytes value and set at rowId */ public abstract void putShortInt(int rowId, int value); @@ -346,7 +376,6 @@ public abstract class ColumnPage { */ public abstract void putBytes(int rowId, byte[] bytes, int offset, int length); - private static final byte[] ZERO = DataTypeUtil.bigDecimalToByte(BigDecimal.ZERO); /** * Set null at rowId @@ -370,7 +399,7 @@ public abstract class ColumnPage { putDouble(rowId, 0.0); break; case DECIMAL: - putBytes(rowId, ZERO); + putDecimal(rowId, BigDecimal.ZERO); break; } } @@ -468,6 +497,11 @@ public abstract class ColumnPage { public abstract byte[] getFlattenedBytePage(); /** + * For decimals + */ + public abstract byte[] getDecimalPage(); + + /** * Encode the page data by codec (Visitor) */ public abstract void encode(PrimitiveCodec codec); @@ -492,7 +526,7 @@ public abstract class ColumnPage { case DOUBLE: return compressor.compressDouble(getDoublePage()); case DECIMAL: - return compressor.compressByte(getFlattenedBytePage()); + return compressor.compressByte(getDecimalPage()); case BYTE_ARRAY: return compressor.compressByte(getFlattenedBytePage()); default: @@ -504,7 +538,8 @@ public abstract class ColumnPage { * Decompress data and create a column page using the decompressed data */ public static ColumnPage decompress(Compressor compressor, DataType dataType, - byte[] compressedData, int offset, int length) throws MemoryException { + byte[] compressedData, int offset, int 
length, int scale, int precision) + throws MemoryException { switch (dataType) { case BYTE: byte[] byteData = compressor.unCompressByte(compressedData, offset, length); @@ -528,9 +563,11 @@ public abstract class ColumnPage { double[] doubleData = compressor.unCompressDouble(compressedData, offset, length); return newDoublePage(doubleData); case DECIMAL: - case BYTE_ARRAY: byte[] lvEncodedBytes = compressor.unCompressByte(compressedData, offset, length); - return newDecimalPage(lvEncodedBytes); + return newDecimalPage(lvEncodedBytes, scale, precision); + case BYTE_ARRAY: + byte[] lvVarBytes = compressor.unCompressByte(compressedData, offset, length); + return newVarLengthPage(lvVarBytes, scale, precision); default: throw new UnsupportedOperationException("unsupport uncompress column page: " + dataType); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/12911629/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java index 6ec2e07..b0978d3 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java @@ -32,7 +32,8 @@ public class LazyColumnPage extends ColumnPage { private PrimitiveCodec codec; private LazyColumnPage(ColumnPage columnPage, PrimitiveCodec codec) { - super(columnPage.getDataType(), columnPage.getPageSize()); + super(columnPage.getDataType(), columnPage.getPageSize(), columnPage.scale, + columnPage.precision); this.columnPage = columnPage; this.codec = codec; } @@ -137,6 +138,16 @@ public class LazyColumnPage extends ColumnPage { } @Override + public void putDecimal(int rowId, BigDecimal decimal) { + throw new UnsupportedOperationException("internal error"); + 
} + + @Override + public byte[] getDecimalPage() { + throw new UnsupportedOperationException("internal error"); + } + + @Override public byte[] getFlattenedBytePage() { throw new UnsupportedOperationException("internal error"); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/12911629/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java index cfb1798..9bd85e6 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java @@ -36,8 +36,8 @@ public class SafeFixLengthColumnPage extends ColumnPage { private double[] doubleData; private byte[] shortIntData; - SafeFixLengthColumnPage(DataType dataType, int pageSize) { - super(dataType, pageSize); + SafeFixLengthColumnPage(DataType dataType, int pageSize, int scale, int precision) { + super(dataType, pageSize, scale, precision); } /** @@ -99,6 +99,16 @@ public class SafeFixLengthColumnPage extends ColumnPage { throw new UnsupportedOperationException("invalid data type: " + dataType); } + @Override + public void putDecimal(int rowId, BigDecimal decimal) { + throw new UnsupportedOperationException("invalid data type: " + dataType); + } + + @Override + public byte[] getDecimalPage() { + throw new UnsupportedOperationException("invalid data type: " + dataType); + } + /** * Get byte value at rowId */ http://git-wip-us.apache.org/repos/asf/carbondata/blob/12911629/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java ---------------------------------------------------------------------- diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java index 3a76f55..63291f3 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java @@ -20,15 +20,14 @@ package org.apache.carbondata.core.datastore.page; import java.math.BigDecimal; import org.apache.carbondata.core.metadata.datatype.DataType; -import org.apache.carbondata.core.util.DataTypeUtil; public class SafeVarLengthColumnPage extends VarLengthColumnPageBase { // for string and decimal data private byte[][] byteArrayData; - SafeVarLengthColumnPage(DataType dataType, int pageSize) { - super(dataType, pageSize); + SafeVarLengthColumnPage(DataType dataType, int pageSize, int scale, int precision) { + super(dataType, pageSize, scale, precision); byteArrayData = new byte[pageSize][]; } @@ -47,10 +46,14 @@ public class SafeVarLengthColumnPage extends VarLengthColumnPageBase { System.arraycopy(bytes, offset, byteArrayData[rowId], 0, length); } + @Override public void putDecimal(int rowId, BigDecimal decimal) { + putBytes(rowId, decimalConverter.convert(decimal)); + } + @Override public BigDecimal getDecimal(int rowId) { byte[] bytes = byteArrayData[rowId]; - return DataTypeUtil.byteToBigDecimal(bytes); + return decimalConverter.getDecimal(bytes); } @Override http://git-wip-us.apache.org/repos/asf/carbondata/blob/12911629/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java index 9f71768..2382599 100644 --- 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java @@ -49,8 +49,9 @@ public class UnsafeFixLengthColumnPage extends ColumnPage { private static final int floatBits = DataType.FLOAT.getSizeBits(); private static final int doubleBits = DataType.DOUBLE.getSizeBits(); - UnsafeFixLengthColumnPage(DataType dataType, int pageSize) throws MemoryException { - super(dataType, pageSize); + UnsafeFixLengthColumnPage(DataType dataType, int pageSize, int scale, int precision) + throws MemoryException { + super(dataType, pageSize, scale, precision); switch (dataType) { case BYTE: case SHORT: @@ -124,6 +125,10 @@ public class UnsafeFixLengthColumnPage extends ColumnPage { throw new UnsupportedOperationException("invalid data type: " + dataType); } + @Override public void putDecimal(int rowId, BigDecimal decimal) { + throw new UnsupportedOperationException("invalid data type: " + dataType); + } + @Override public byte getByte(int rowId) { long offset = rowId << byteBits; @@ -175,6 +180,10 @@ public class UnsafeFixLengthColumnPage extends ColumnPage { throw new UnsupportedOperationException("invalid data type: " + dataType); } + @Override public byte[] getDecimalPage() { + throw new UnsupportedOperationException("invalid data type: " + dataType); + } + @Override public byte[] getBytePage() { byte[] data = new byte[getPageSize()]; http://git-wip-us.apache.org/repos/asf/carbondata/blob/12911629/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java index dd6abc5..16cf94c 100644 --- 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java @@ -24,7 +24,6 @@ import org.apache.carbondata.core.memory.MemoryBlock; import org.apache.carbondata.core.memory.MemoryException; import org.apache.carbondata.core.memory.UnsafeMemoryManager; import org.apache.carbondata.core.metadata.datatype.DataType; -import org.apache.carbondata.core.util.DataTypeUtil; // This extension uses unsafe memory to store page data, for variable length data type (string, // decimal) @@ -52,10 +51,11 @@ public class UnsafeVarLengthColumnPage extends VarLengthColumnPageBase { * @param dataType data type * @param pageSize number of row */ - UnsafeVarLengthColumnPage(DataType dataType, int pageSize) throws MemoryException { - super(dataType, pageSize); + UnsafeVarLengthColumnPage(DataType dataType, int pageSize, int scale, int precision) + throws MemoryException { + super(dataType, pageSize, scale, precision); capacity = (int) (pageSize * DEFAULT_ROW_SIZE * FACTOR); - memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry((long)(capacity)); + memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry((long) (capacity)); baseAddress = memoryBlock.getBaseObject(); baseOffset = memoryBlock.getBaseOffset(); } @@ -66,8 +66,9 @@ public class UnsafeVarLengthColumnPage extends VarLengthColumnPageBase { * @param pageSize number of row * @param capacity initial capacity of the page, in bytes */ - UnsafeVarLengthColumnPage(DataType dataType, int pageSize, int capacity) throws MemoryException { - super(dataType, pageSize); + UnsafeVarLengthColumnPage(DataType dataType, int pageSize, int capacity, + int scale, int precision) throws MemoryException { + super(dataType, pageSize, scale, precision); this.capacity = capacity; memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry((long)(capacity)); baseAddress = memoryBlock.getBaseObject(); @@ -117,6 +118,10 @@ 
public class UnsafeVarLengthColumnPage extends VarLengthColumnPageBase { baseAddress, baseOffset + rowOffset[rowId], length); } + @Override public void putDecimal(int rowId, BigDecimal decimal) { + putBytes(rowId, decimalConverter.convert(decimal)); + } + @Override public BigDecimal getDecimal(int rowId) { int length = rowOffset[rowId + 1] - rowOffset[rowId]; @@ -124,7 +129,7 @@ public class UnsafeVarLengthColumnPage extends VarLengthColumnPageBase { CarbonUnsafe.unsafe.copyMemory(baseAddress, baseOffset + rowOffset[rowId], bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, length); - return DataTypeUtil.byteToBigDecimal(bytes); + return decimalConverter.getDecimal(bytes); } @Override http://git-wip-us.apache.org/repos/asf/carbondata/blob/12911629/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java index 801cfb3..46d6787 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java @@ -22,6 +22,7 @@ import java.util.List; import org.apache.carbondata.core.memory.MemoryException; import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.metadata.datatype.DecimalConverterFactory; import org.apache.carbondata.core.util.ByteUtil; import static org.apache.carbondata.core.metadata.datatype.DataType.DECIMAL; @@ -34,8 +35,8 @@ public abstract class VarLengthColumnPageBase extends ColumnPage { // the length of bytes added in the page int totalLength; - VarLengthColumnPageBase(DataType dataType, int pageSize) { - super(dataType, pageSize); + VarLengthColumnPageBase(DataType dataType, int pageSize, int scale, int 
precision) { + super(dataType, pageSize, scale, precision); rowOffset = new int[pageSize + 1]; totalLength = 0; } @@ -83,7 +84,59 @@ public abstract class VarLengthColumnPageBase extends ColumnPage { /** * Create a new column page based on the LV (Length Value) encoded bytes */ - static ColumnPage newDecimalColumnPage(byte[] lvEncodedBytes) throws MemoryException { + static ColumnPage newDecimalColumnPage(byte[] lvEncodedBytes, int scale, int precision) + throws MemoryException { + DecimalConverterFactory.DecimalConverter decimalConverter = + DecimalConverterFactory.INSTANCE.getDecimalConverter(precision, scale); + int size = decimalConverter.getSize(); + if (size < 0) { + return getLVBytesColumnPage(lvEncodedBytes, scale, precision, DataType.DECIMAL); + } else { + // Here the size is always fixed. + return getDecimalColumnPage(lvEncodedBytes, scale, precision, size); + } + } + + /** + * Create a new column page based on the LV (Length Value) encoded bytes + */ + static ColumnPage newVarLengthColumnPage(byte[] lvEncodedBytes, int scale, int precision) + throws MemoryException { + return getLVBytesColumnPage(lvEncodedBytes, scale, precision, DataType.BYTE_ARRAY); + } + + private static ColumnPage getDecimalColumnPage(byte[] lvEncodedBytes, int scale, int precision, + int size) throws MemoryException { + List<Integer> rowOffset = new ArrayList<>(); + int offset; + int rowId = 0; + for (offset = 0; offset < lvEncodedBytes.length; offset += size) { + rowOffset.add(offset); + rowId++; + } + rowOffset.add(offset); + + VarLengthColumnPageBase page; + if (unsafe) { + page = new UnsafeVarLengthColumnPage(DECIMAL, rowId, scale, precision); + } else { + page = new SafeVarLengthColumnPage(DECIMAL, rowId, scale, precision); + } + + // set total length and rowOffset in page + page.totalLength = offset; + page.rowOffset = new int[rowId + 1]; + for (int i = 0; i < rowId + 1; i++) { + page.rowOffset[i] = rowOffset.get(i); + } + for (int i = 0; i < rowId; i++) { + page.putBytes(i, 
lvEncodedBytes, i * size, size); + } + return page; + } + + private static ColumnPage getLVBytesColumnPage(byte[] lvEncodedBytes, int scale, + int precision, DataType dataType) throws MemoryException { // extract length and data, set them to rowOffset and unsafe memory correspondingly int rowId = 0; List<Integer> rowOffset = new ArrayList<>(); @@ -107,9 +160,9 @@ public abstract class VarLengthColumnPageBase extends ColumnPage { VarLengthColumnPageBase page; int inputDataLength = offset; if (unsafe) { - page = new UnsafeVarLengthColumnPage(DECIMAL, numRows, inputDataLength); + page = new UnsafeVarLengthColumnPage(DECIMAL, numRows, inputDataLength, scale, precision); } else { - page = new SafeVarLengthColumnPage(DECIMAL, numRows); + page = new SafeVarLengthColumnPage(dataType, numRows, scale, precision); } // set total length and rowOffset in page @@ -242,6 +295,19 @@ public abstract class VarLengthColumnPageBase extends ColumnPage { throw new UnsupportedOperationException("invalid data type: " + dataType); } + @Override + public byte[] getDecimalPage() { + // output LV encoded byte array + int offset = 0; + byte[] data = new byte[totalLength]; + for (int rowId = 0; rowId < pageSize; rowId++) { + int length = rowOffset[rowId + 1] - rowOffset[rowId]; + copyBytes(rowId, data, offset, length); + offset += length; + } + return data; + } + /** * Copy `length` bytes from data at rowId to dest start from destOffset */ http://git-wip-us.apache.org/repos/asf/carbondata/blob/12911629/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveIntegerCodec.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveIntegerCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveIntegerCodec.java index 3d56f0c..fe15ba7 100644 --- 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveIntegerCodec.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveIntegerCodec.java @@ -52,7 +52,8 @@ class AdaptiveIntegerCodec extends AdaptiveCompressionCodec { @Override public byte[] encode(ColumnPage input) throws MemoryException, IOException { - encodedPage = ColumnPage.newPage(targetDataType, input.getPageSize()); + encodedPage = ColumnPage + .newPage(targetDataType, input.getPageSize(), stats.getScale(), stats.getPrecision()); input.encode(codec); byte[] result = encodedPage.compress(compressor); encodedPage.freeMemory(); @@ -62,9 +63,13 @@ class AdaptiveIntegerCodec extends AdaptiveCompressionCodec { @Override public ColumnPage decode(byte[] input, int offset, int length) throws MemoryException { if (srcDataType.equals(targetDataType)) { - return ColumnPage.decompress(compressor, targetDataType, input, offset, length); + return ColumnPage + .decompress(compressor, targetDataType, input, offset, length, stats.getScale(), + stats.getPrecision()); } else { - ColumnPage page = ColumnPage.decompress(compressor, targetDataType, input, offset, length); + ColumnPage page = ColumnPage + .decompress(compressor, targetDataType, input, offset, length, stats.getScale(), + stats.getPrecision()); return LazyColumnPage.newPage(page, codec); } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/12911629/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingStrategy.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingStrategy.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingStrategy.java index 3818263..659dc2a 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingStrategy.java +++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingStrategy.java @@ -79,7 +79,7 @@ public class DefaultEncodingStrategy extends EncodingStrategy { if (Math.min(adaptiveDataType.getSizeInBytes(), deltaDataType.getSizeInBytes()) == srcDataType.getSizeInBytes()) { // no effect to use adaptive or delta, use compression only - return DirectCompressCodec.newInstance(srcDataType, compressor); + return DirectCompressCodec.newInstance(stats, compressor); } if (adaptiveDataType.getSizeInBytes() <= deltaDataType.getSizeInBytes()) { // choose adaptive encoding @@ -93,17 +93,17 @@ public class DefaultEncodingStrategy extends EncodingStrategy { @Override ColumnPageCodec newCodecForFloatingType(ColumnPageStatsVO stats) { - return DirectCompressCodec.newInstance(stats.getDataType(), compressor); + return DirectCompressCodec.newInstance(stats, compressor); } // for decimal, currently it is a very basic implementation @Override ColumnPageCodec newCodecForDecimalType(ColumnPageStatsVO stats) { - return DirectCompressCodec.newInstance(stats.getDataType(), compressor); + return DirectCompressCodec.newInstance(stats, compressor); } @Override ColumnPageCodec newCodecForByteArrayType(ColumnPageStatsVO stats) { - return DirectCompressCodec.newInstance(stats.getDataType(), compressor); + return DirectCompressCodec.newInstance(stats, compressor); } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/12911629/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DeltaIntegerCodec.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DeltaIntegerCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DeltaIntegerCodec.java index 6cf59a6..a45552a 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DeltaIntegerCodec.java +++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DeltaIntegerCodec.java @@ -67,7 +67,8 @@ public class DeltaIntegerCodec extends AdaptiveCompressionCodec { @Override public byte[] encode(ColumnPage input) throws MemoryException, IOException { - encodedPage = ColumnPage.newPage(targetDataType, input.getPageSize()); + encodedPage = ColumnPage + .newPage(targetDataType, input.getPageSize(), stats.getScale(), stats.getPrecision()); input.encode(codec); byte[] result = encodedPage.compress(compressor); encodedPage.freeMemory(); @@ -77,9 +78,13 @@ public class DeltaIntegerCodec extends AdaptiveCompressionCodec { @Override public ColumnPage decode(byte[] input, int offset, int length) throws MemoryException { if (srcDataType.equals(targetDataType)) { - return ColumnPage.decompress(compressor, targetDataType, input, offset, length); + return ColumnPage + .decompress(compressor, targetDataType, input, offset, length, stats.getScale(), + stats.getPrecision()); } else { - ColumnPage page = ColumnPage.decompress(compressor, targetDataType, input, offset, length); + ColumnPage page = ColumnPage + .decompress(compressor, targetDataType, input, offset, length, stats.getScale(), + stats.getPrecision()); return LazyColumnPage.newPage(page, codec); } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/12911629/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DirectCompressCodec.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DirectCompressCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DirectCompressCodec.java index dcb9b7c..d608fea 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DirectCompressCodec.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DirectCompressCodec.java @@ -21,8 +21,8 @@ import java.io.IOException; import 
org.apache.carbondata.core.datastore.compression.Compressor; import org.apache.carbondata.core.datastore.page.ColumnPage; +import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsVO; import org.apache.carbondata.core.memory.MemoryException; -import org.apache.carbondata.core.metadata.datatype.DataType; /** * This codec directly apply compression on the input data @@ -30,15 +30,15 @@ import org.apache.carbondata.core.metadata.datatype.DataType; public class DirectCompressCodec implements ColumnPageCodec { private Compressor compressor; - private DataType dataType; + private ColumnPageStatsVO stats; - private DirectCompressCodec(DataType dataType, Compressor compressor) { + private DirectCompressCodec(ColumnPageStatsVO stats, Compressor compressor) { this.compressor = compressor; - this.dataType = dataType; + this.stats = stats; } - public static DirectCompressCodec newInstance(DataType dataType, Compressor compressor) { - return new DirectCompressCodec(dataType, compressor); + public static DirectCompressCodec newInstance(ColumnPageStatsVO stats, Compressor compressor) { + return new DirectCompressCodec(stats, compressor); } @Override @@ -53,6 +53,8 @@ public class DirectCompressCodec implements ColumnPageCodec { @Override public ColumnPage decode(byte[] input, int offset, int length) throws MemoryException { - return ColumnPage.decompress(compressor, dataType, input, offset, length); + return ColumnPage + .decompress(compressor, stats.getDataType(), input, offset, length, stats.getScale(), + stats.getPrecision()); } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/12911629/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategy.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategy.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategy.java index 
49fb625..77d3b74 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategy.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategy.java @@ -51,8 +51,8 @@ public abstract class EncodingStrategy { /** * create codec based on the page data type and statistics contained by ValueEncoderMeta */ - public ColumnPageCodec createCodec(ValueEncoderMeta meta) { - ColumnPageStatsVO stats = ColumnPageStatsVO.copyFrom(meta); + public ColumnPageCodec createCodec(ValueEncoderMeta meta, int scale, int precision) { + ColumnPageStatsVO stats = ColumnPageStatsVO.copyFrom(meta, scale, precision); return createCodec(stats); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/12911629/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/ColumnPageStatsVO.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/ColumnPageStatsVO.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/ColumnPageStatsVO.java index 058699a..3629101 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/ColumnPageStatsVO.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/ColumnPageStatsVO.java @@ -40,6 +40,10 @@ public class ColumnPageStatsVO { /** decimal count of the measures */ private int decimal; + private int scale; + + private int precision; + public ColumnPageStatsVO(DataType dataType) { this.dataType = dataType; switch (dataType) { @@ -64,12 +68,14 @@ public class ColumnPageStatsVO { decimal = 0; } - public static ColumnPageStatsVO copyFrom(ValueEncoderMeta meta) { + public static ColumnPageStatsVO copyFrom(ValueEncoderMeta meta, int scale, int precision) { ColumnPageStatsVO instance = new ColumnPageStatsVO(meta.getType()); instance.min = meta.getMinValue(); instance.max = meta.getMaxValue(); instance.decimal 
= meta.getDecimal(); instance.nonExistValue = meta.getUniqueValue(); + instance.scale = scale; + instance.precision = precision; return instance; } @@ -101,7 +107,7 @@ public class ColumnPageStatsVO { nonExistValue = (double) min - 1; break; case DECIMAL: - BigDecimal decimalValue = DataTypeUtil.byteToBigDecimal((byte[]) value); + BigDecimal decimalValue = (BigDecimal) value; decimal = decimalValue.scale(); BigDecimal val = (BigDecimal) min; nonExistValue = (val.subtract(new BigDecimal(1.0))); @@ -215,6 +221,14 @@ public class ColumnPageStatsVO { return dataType; } + public int getScale() { + return scale; + } + + public int getPrecision() { + return precision; + } + @Override public String toString() { return String.format("min: %s, max: %s, decimal: %s ", min, max, decimal); http://git-wip-us.apache.org/repos/asf/carbondata/blob/12911629/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java new file mode 100644 index 0000000..555df1c --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.metadata.datatype; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.util.Arrays; + +import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector; +import org.apache.carbondata.core.util.DataTypeUtil; + +/** + * Decimal converter to keep the data compact. + */ +public final class DecimalConverterFactory { + + public static DecimalConverterFactory INSTANCE = new DecimalConverterFactory(); + + private int[] minBytesForPrecision = minBytesForPrecision(); + + private DecimalConverterFactory() { + + } + + private int computeMinBytesForPrecision(int precision) { + int numBytes = 1; + while (Math.pow(2.0, 8 * numBytes - 1) < Math.pow(10.0, precision)) { + numBytes += 1; + } + return numBytes; + } + + private int[] minBytesForPrecision() { + int[] data = new int[39]; + for (int i = 0; i < data.length; i++) { + data[i] = computeMinBytesForPrecision(i); + } + return data; + } + + public interface DecimalConverter { + + byte[] convert(BigDecimal decimal); + + BigDecimal getDecimal(byte[] bytes); + + void writeToColumnVector(byte[] bytes, CarbonColumnVector vector, int rowId); + + int getSize(); + + } + + public class DecimalIntConverter implements DecimalConverter { + + private ByteBuffer buffer = ByteBuffer.allocate(4); + + private int precision; + private int scale; + + public DecimalIntConverter(int precision, int scale) { + this.precision = precision; + this.scale = scale; + } + + @Override public byte[] convert(BigDecimal decimal) { + long longValue = 
decimal.unscaledValue().longValue(); + buffer.putInt(0, (int) longValue); + return buffer.array().clone(); + } + + @Override public BigDecimal getDecimal(byte[] bytes) { + long unscaled = getUnscaledLong(bytes); + return BigDecimal.valueOf(unscaled, scale); + } + + @Override public void writeToColumnVector(byte[] bytes, CarbonColumnVector vector, int rowId) { + long unscaled = getUnscaledLong(bytes); + vector.putInt(rowId, (int) unscaled); + } + + @Override public int getSize() { + return 4; + } + } + + private long getUnscaledLong(byte[] bytes) { + long unscaled = 0L; + int i = 0; + + while (i < bytes.length) { + unscaled = (unscaled << 8) | (bytes[i] & 0xff); + i += 1; + } + + int bits = 8 * bytes.length; + unscaled = (unscaled << (64 - bits)) >> (64 - bits); + return unscaled; + } + + public class DecimalLongConverter implements DecimalConverter { + + private ByteBuffer buffer = ByteBuffer.allocate(8); + + private int precision; + private int scale; + + public DecimalLongConverter(int precision, int scale) { + this.precision = precision; + this.scale = scale; + } + + @Override public byte[] convert(BigDecimal decimal) { + long longValue = decimal.unscaledValue().longValue(); + buffer.putLong(0, longValue); + return buffer.array().clone(); + } + + @Override public BigDecimal getDecimal(byte[] bytes) { + long unscaled = getUnscaledLong(bytes); + return BigDecimal.valueOf(unscaled, scale); + } + + @Override public void writeToColumnVector(byte[] bytes, CarbonColumnVector vector, int rowId) { + long unscaled = getUnscaledLong(bytes); + vector.putLong(rowId, unscaled); + } + + @Override public int getSize() { + return 8; + } + } + + public class DecimalUnscaledConverter implements DecimalConverter { + + private int precision; + + private int scale; + + private int numBytes; + + private byte[] decimalBuffer = new byte[minBytesForPrecision[38]]; + + public DecimalUnscaledConverter(int precision, int scale) { + this.precision = precision; + this.scale = scale; + 
this.numBytes = minBytesForPrecision[precision]; + } + + @Override public byte[] convert(BigDecimal decimal) { + byte[] bytes = decimal.unscaledValue().toByteArray(); + byte[] fixedLengthBytes = null; + if (bytes.length == numBytes) { + // If the length of the underlying byte array of the unscaled `BigInteger` happens to be + // `numBytes`, just reuse it, so that we don't bother copying it to `decimalBuffer`. + fixedLengthBytes = bytes; + } else { + // Otherwise, the length must be less than `numBytes`. In this case we copy contents of + // the underlying bytes with padding sign bytes to `decimalBuffer` to form the result + // fixed-length byte array. + byte signByte = 0; + if (bytes[0] < 0) { + signByte = (byte) -1; + } else { + signByte = (byte) 0; + } + Arrays.fill(decimalBuffer, 0, numBytes - bytes.length, signByte); + System.arraycopy(bytes, 0, decimalBuffer, numBytes - bytes.length, bytes.length); + fixedLengthBytes = decimalBuffer; + } + byte[] value = new byte[numBytes]; + System.arraycopy(fixedLengthBytes, 0, value, 0, numBytes); + return value; + } + + @Override public BigDecimal getDecimal(byte[] bytes) { + BigInteger bigInteger = new BigInteger(bytes); + return new BigDecimal(bigInteger, scale); + } + + @Override public void writeToColumnVector(byte[] bytes, CarbonColumnVector vector, int rowId) { + vector.putBytes(rowId, bytes); + } + + @Override public int getSize() { + return numBytes; + } + } + + public static class LVBytesDecimalConverter implements DecimalConverter { + + public static LVBytesDecimalConverter INSTANCE = new LVBytesDecimalConverter(); + + @Override public byte[] convert(BigDecimal decimal) { + return DataTypeUtil.bigDecimalToByte(decimal); + } + + @Override public BigDecimal getDecimal(byte[] bytes) { + return DataTypeUtil.byteToBigDecimal(bytes); + } + + @Override public void writeToColumnVector(byte[] bytes, CarbonColumnVector vector, int rowId) { + throw new UnsupportedOperationException("Unsupported in vector reading for legacy 
format"); + } + + @Override public int getSize() { + return -1; + } + } + + public DecimalConverter getDecimalConverter(int precision, int scale) { + if (precision < 0) { + return new LVBytesDecimalConverter(); + } else if (precision <= 9) { + return new DecimalIntConverter(precision, scale); + } else if (precision <= 18) { + return new DecimalLongConverter(precision, scale); + } else { + return new DecimalUnscaledConverter(precision, scale); + } + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/12911629/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java index e89ce12..0a27a74 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java @@ -24,6 +24,7 @@ import java.io.ObjectOutputStream; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Set; import org.apache.carbondata.common.logging.LogService; @@ -431,6 +432,29 @@ public class CarbonMetadataUtil { return false; } + private static ByteBuffer writeInfoIfDecimal(int blockIndex, + SegmentProperties segmentProperties) { + Map<Integer, Integer> blockMapping = segmentProperties.getMeasuresOrdinalToBlockMapping(); + List<CarbonMeasure> measures = segmentProperties.getMeasures(); + CarbonMeasure selectedMeasure = null; + for (CarbonMeasure measure : measures) { + Integer blockId = blockMapping.get(measure.getOrdinal()); + selectedMeasure = measure; + if (blockId == blockIndex) { + break; + } + } + assert (selectedMeasure != null); + if (selectedMeasure.getDataType() == DataType.DECIMAL) { + ByteBuffer buffer = ByteBuffer.allocate(8); + buffer.putInt(selectedMeasure.getScale()); 
+ buffer.putInt(selectedMeasure.getPrecision()); + buffer.flip(); + return buffer; + } + return null; + } + private static byte[] serializeEncoderMeta(ValueEncoderMeta encoderMeta) throws IOException { // TODO : should remove the unnecessary fields. ByteArrayOutputStream aos = new ByteArrayOutputStream(); @@ -788,6 +812,10 @@ public class CarbonMetadataUtil { List<ByteBuffer> encoderMetaList = new ArrayList<ByteBuffer>(); encoderMetaList.add(ByteBuffer.wrap(serializeEncodeMetaUsingByteBuffer( createValueEncoderMeta(nodeHolder.getStats(), index)))); + ByteBuffer decimalMeta = writeInfoIfDecimal(index, segmentProperties); + if (decimalMeta != null) { + encoderMetaList.add(decimalMeta); + } dataChunk.setEncoder_meta(encoderMetaList); dataChunk.min_max .addToMax_values(ByteBuffer.wrap(nodeHolder.getMeasureColumnMaxData()[index])); http://git-wip-us.apache.org/repos/asf/carbondata/blob/12911629/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortStepRowUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortStepRowUtil.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortStepRowUtil.java index 5b0685b..50fb4c5 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortStepRowUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortStepRowUtil.java @@ -17,10 +17,6 @@ package org.apache.carbondata.processing.newflow.sort; -import java.math.BigDecimal; - -import org.apache.carbondata.core.metadata.datatype.DataType; -import org.apache.carbondata.core.util.DataTypeUtil; import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters; import org.apache.carbondata.processing.util.NonDictionaryUtil; @@ -60,21 +56,10 @@ public class SortStepRowUtil { index = 0; - DataType[] measureDataType = parameters.getMeasureDataType(); // read measure 
values for (int i = 0; i < measureCount; i++) { if (needConvertDecimalToByte) { - Object value = data[allCount]; - if (null != value) { - if (measureDataType[i] == DataType.DECIMAL) { - BigDecimal decimal = (BigDecimal) value; - measures[index++] = DataTypeUtil.bigDecimalToByte(decimal); - } else { - measures[index++] = value; - } - } else { - measures[index++] = null; - } + measures[index++] = data[allCount]; } else { measures[index++] = data[allCount]; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/12911629/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java index 2ac138b..55a8693 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java @@ -230,7 +230,7 @@ public class UnsafeCarbonRowPage { CarbonUnsafe.unsafe.copyMemory(baseObject, address + size, bigDecimalInBytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, bigDecimalInBytes.length); size += bigDecimalInBytes.length; - rowToFill[dimensionSize + mesCount] = bigDecimalInBytes; + rowToFill[dimensionSize + mesCount] = DataTypeUtil.byteToBigDecimal(bigDecimalInBytes); break; } } else { http://git-wip-us.apache.org/repos/asf/carbondata/blob/12911629/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java index d69d137..7fb9b6e 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java @@ -35,6 +35,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.DataTypeUtil; import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage; import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException; import org.apache.carbondata.processing.sortandgroupby.sortdata.NewRowComparator; @@ -341,7 +342,7 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder { short aShort = stream.readShort(); byte[] bigDecimalInBytes = new byte[aShort]; stream.readFully(bigDecimalInBytes); - row[dimensionCount + mesCount] = bigDecimalInBytes; + row[dimensionCount + mesCount] = DataTypeUtil.byteToBigDecimal(bigDecimalInBytes); break; } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/12911629/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/CarbonRowDataWriterProcessorStepImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/CarbonRowDataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/CarbonRowDataWriterProcessorStepImpl.java index a83e09e..71e5727 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/CarbonRowDataWriterProcessorStepImpl.java +++ 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/CarbonRowDataWriterProcessorStepImpl.java @@ -18,7 +18,6 @@ package org.apache.carbondata.processing.newflow.steps; import java.io.File; import java.io.IOException; -import java.math.BigDecimal; import java.util.Iterator; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -33,7 +32,6 @@ import org.apache.carbondata.core.keygenerator.KeyGenException; import org.apache.carbondata.core.metadata.CarbonTableIdentifier; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory; -import org.apache.carbondata.core.util.DataTypeUtil; import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep; import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration; import org.apache.carbondata.processing.newflow.DataField; @@ -257,17 +255,7 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces Object[] measures = new Object[outputLength]; for (int i = 0; i < this.measureCount; i++) { - Object value = row.getObject(i + this.dimensionWithComplexCount); - if (null != value) { - if (measureDataType[i] == DataType.DECIMAL) { - BigDecimal val = (BigDecimal) value; - measures[i] = DataTypeUtil.bigDecimalToByte(val); - } else { - measures[i] = value; - } - } else { - measures[i] = null; - } + measures[i] = row.getObject(i + this.dimensionWithComplexCount); } return WriteStepRowUtil.fromColumnCategory(dim, nonDicArray, measures); http://git-wip-us.apache.org/repos/asf/carbondata/blob/12911629/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java 
b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java index 0f7ae1a..ce7b321 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java +++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java @@ -35,6 +35,7 @@ import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer; import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.DataTypeUtil; import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException; import org.apache.carbondata.processing.util.NonDictionaryUtil; @@ -356,7 +357,7 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold int len = stream.readInt(); byte[] buff = new byte[len]; stream.readFully(buff); - measures[index++] = buff; + measures[index++] = DataTypeUtil.byteToBigDecimal(buff); break; } } else { http://git-wip-us.apache.org/repos/asf/carbondata/blob/12911629/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java index f068400..7930763 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java @@ -20,12 +20,12 @@ package org.apache.carbondata.processing.store; import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import java.io.IOException; -import java.math.BigDecimal; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; 
import org.apache.carbondata.core.datastore.GenericDataType; +import org.apache.carbondata.core.datastore.TableSpec; import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException; import org.apache.carbondata.core.datastore.page.ColumnPage; import org.apache.carbondata.core.datastore.page.ComplexColumnPage; @@ -35,7 +35,6 @@ import org.apache.carbondata.core.datastore.row.WriteStepRowUtil; import org.apache.carbondata.core.keygenerator.KeyGenException; import org.apache.carbondata.core.memory.MemoryException; import org.apache.carbondata.core.metadata.datatype.DataType; -import org.apache.carbondata.core.util.DataTypeUtil; import org.apache.spark.sql.types.Decimal; @@ -69,11 +68,11 @@ public class TablePage { int numDictDimension = model.getMDKeyGenerator().getDimCount(); dictDimensionPage = new ColumnPage[numDictDimension]; for (int i = 0; i < dictDimensionPage.length; i++) { - dictDimensionPage[i] = ColumnPage.newVarLengthPath(DataType.BYTE_ARRAY, pageSize); + dictDimensionPage[i] = ColumnPage.newVarLengthPage(DataType.BYTE_ARRAY, pageSize); } noDictDimensionPage = new ColumnPage[model.getNoDictionaryCount()]; for (int i = 0; i < noDictDimensionPage.length; i++) { - noDictDimensionPage[i] = ColumnPage.newVarLengthPath(DataType.BYTE_ARRAY, pageSize); + noDictDimensionPage[i] = ColumnPage.newVarLengthPage(DataType.BYTE_ARRAY, pageSize); } complexDimensionPage = new ComplexColumnPage[model.getComplexColumnCount()]; for (int i = 0; i < complexDimensionPage.length; i++) { @@ -83,8 +82,10 @@ public class TablePage { } measurePage = new ColumnPage[model.getMeasureCount()]; DataType[] dataTypes = model.getMeasureDataType(); + TableSpec.MeasureSpec measureSpec = model.getTableSpec().getMeasureSpec(); for (int i = 0; i < measurePage.length; i++) { - measurePage[i] = ColumnPage.newPage(dataTypes[i], pageSize); + measurePage[i] = ColumnPage + .newPage(dataTypes[i], pageSize, measureSpec.getScale(i), measureSpec.getPrecision(i)); } } @@ -132,8 +133,7 @@ 
public class TablePage { if (measurePage[i].getDataType() == DataType.DECIMAL && model.isCompactionFlow() && value != null) { - BigDecimal bigDecimal = ((Decimal) value).toJavaBigDecimal(); - value = DataTypeUtil.bigDecimalToByte(bigDecimal); + value = ((Decimal) value).toJavaBigDecimal(); } measurePage[i].putData(rowId, value); }