[CARBONDATA-2851][CARBONDATA-2852] Support zstd as column compressor in final store
1. Add a zstd compressor for compressing column data.
2. Add zstd support in thrift.
3. Since zstd does not support zero-copy while compressing, offheap will not take effect for zstd.
4. Support lazy loading of compressors.
5. Support the new compressors on the legacy store: in the query procedure we need to decompress the column pages. Previously the compressor was obtained from a system property; now that multiple compressors are supported, the compressor information is read from the metadata in the data files.
6. Determine the column compressor before data loading: the column compressor is fixed before a data load or compaction starts, so that all pages use the same compressor even if the configured compressor is modified concurrently during loading.
7. Optimize the parameters for column pages: pass a ColumnPageEncoderMeta instead of its individual members.

Support column compressor in table properties: the column compressor can be specified in the table properties while creating a table, as sketched in the example below. Store the compressor name in the metadata instead of an enum, which makes it more extensible.
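A minimal usage sketch of the load-time selection described above. Assumptions not spelled out in this excerpt: CarbonCommonConstants.COMPRESSOR resolves to the key "carbon.column.compressor", and presumably the same key is accepted in TBLPROPERTIES.

  import org.apache.carbondata.core.datastore.compression.Compressor;
  import org.apache.carbondata.core.datastore.compression.CompressorFactory;
  import org.apache.carbondata.core.util.CarbonProperties;

  public class CompressorSelectionSketch {
    public static void main(String[] args) {
      // Select the column compressor for subsequent data loads. The property is
      // read only when a load/compaction starts; queries instead resolve the
      // compressor from the metadata in the data files (see the reader changes
      // in the diff below).
      CarbonProperties.getInstance().addProperty("carbon.column.compressor", "zstd");

      // The factory instantiates the configured compressor lazily, by name.
      Compressor compressor = CompressorFactory.getInstance().getCompressor();
      System.out.println(compressor.getName());   // prints: zstd
    }
  }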
This closes #2628

Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/8f08c4ab
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/8f08c4ab
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/8f08c4ab

Branch: refs/heads/master
Commit: 8f08c4abb5a0dc1abd4513613bcb75559ab51761
Parents: 7b31b91
Author: xuchuanyin <[email protected]>
Authored: Tue Sep 11 14:20:12 2018 +0800
Committer: Jacky Li <[email protected]>
Committed: Wed Sep 12 17:54:36 2018 +0800

----------------------------------------------------------------------
 .../chunk/impl/DimensionRawColumnChunk.java | 20 +-
 .../reader/dimension/AbstractChunkReader.java | 3 +-
 ...mpressedDimensionChunkFileBasedReaderV1.java | 5 +-
 ...mpressedDimensionChunkFileBasedReaderV2.java | 5 +-
 ...mpressedDimensionChunkFileBasedReaderV3.java | 14 +-
 .../measure/AbstractMeasureChunkReader.java | 2 +
 ...CompressedMeasureChunkFileBasedReaderV1.java | 4 +-
 ...CompressedMeasureChunkFileBasedReaderV2.java | 7 +-
 ...CompressedMeasureChunkFileBasedReaderV3.java | 12 +-
 ...essedMsrChunkFileBasedPageLevelReaderV3.java | 7 +-
 .../core/datastore/compression/Compressor.java | 13 +-
 .../compression/CompressorFactory.java | 75 +++-
 .../datastore/compression/SnappyCompressor.java | 16 +-
 .../datastore/compression/ZstdCompressor.java | 170 ++++++++
 .../core/datastore/page/ColumnPage.java | 280 +++++++------
 .../core/datastore/page/ComplexColumnPage.java | 16 +-
 .../core/datastore/page/DecimalColumnPage.java | 61 +--
 .../page/DecoderBasedFallbackEncoder.java | 15 +-
 .../core/datastore/page/LazyColumnPage.java | 2 +-
 .../datastore/page/LocalDictColumnPage.java | 5 +-
 .../datastore/page/SafeDecimalColumnPage.java | 11 +-
 .../datastore/page/SafeFixLengthColumnPage.java | 40 +-
 .../datastore/page/SafeVarLengthColumnPage.java | 13 +-
 .../datastore/page/UnsafeDecimalColumnPage.java | 38 +-
 .../page/UnsafeFixLengthColumnPage.java | 96 +++--
 .../page/UnsafeVarLengthColumnPage.java | 10 +-
 .../datastore/page/VarLengthColumnPageBase.java | 160 +++++---
 .../page/encoding/ColumnPageEncoder.java | 6 +-
 .../page/encoding/ColumnPageEncoderMeta.java | 12 +-
 .../page/encoding/DefaultEncodingFactory.java | 21 +-
 .../page/encoding/EncodingFactory.java | 9 +-
 .../adaptive/AdaptiveDeltaFloatingCodec.java | 9 +-
 .../adaptive/AdaptiveDeltaIntegralCodec.java | 9 +-
 .../adaptive/AdaptiveFloatingCodec.java | 9 +-
 .../adaptive/AdaptiveIntegralCodec.java | 9 +-
 .../encoding/compress/DirectCompressCodec.java | 95 ++---
 .../legacy/ComplexDimensionIndexCodec.java | 8 +-
 .../legacy/DictDimensionIndexCodec.java | 7 +-
 .../legacy/DirectDictDimensionIndexCodec.java | 8 +-
 .../legacy/HighCardDictDimensionIndexCodec.java | 7 +-
 .../dimension/legacy/IndexStorageCodec.java | 5 +-
 .../datastore/page/encoding/rle/RLECodec.java | 14 +-
 .../page/encoding/rle/RLEEncoderMeta.java | 4 +-
 .../localdictionary/PageLevelDictionary.java | 16 +-
 .../core/metadata/datatype/DataType.java | 1 +
 .../core/scan/executor/util/QueryUtil.java | 8 +-
 .../core/util/BlockletDataMapUtil.java | 7 +-
 .../apache/carbondata/core/util/ByteUtil.java | 2 +
 .../core/util/CarbonMetadataUtil.java | 26 +-
 .../datastore/page/encoding/RLECodecTest.java | 8 +-
 .../TestPageLevelDictionary.java | 24 +-
 docs/configuration-parameters.md | 3 +-
 format/src/main/thrift/carbondata.thrift | 7 +-
 .../hadoop/api/CarbonTableOutputFormat.java | 7 +
 .../hadoop/testutil/StoreCreator.java | 7 +
 .../presto/util/CarbonDataStoreCreator.scala | 6 +
 .../dataload/TestLoadDataWithCompression.scala | 411 +++++++++++++++++++
 .../TestNonTransactionalCarbonTable.scala | 8 +-
 .../LocalDictionarySupportLoadTableTest.scala | 8 +-
 .../streaming/StreamSinkFactory.scala | 5 +
 .../spark/rdd/CarbonDataRDDFactory.scala | 5 +
 .../stream/CarbonStreamRecordReader.java | 12 +-
 .../CarbonAlterTableCompactionCommand.scala | 5 +
 .../management/CarbonLoadDataCommand.scala | 5 +
 ...arbonAlterTableAddHivePartitionCommand.scala | 5 +
 .../CarbonAlterTableDropPartitionCommand.scala | 5 +
 .../CarbonAlterTableSplitPartitionCommand.scala | 8 +-
 .../table/CarbonCreateTableCommand.scala | 14 +
 .../datasources/SparkCarbonTableFormat.scala | 6 +
 .../spark/util/AllDictionaryTestCase.scala | 6 +
 .../util/ExternalColumnDictionaryTestCase.scala | 6 +
 .../CarbonGetTableDetailComandTestCase.scala | 6 +-
 .../loading/CarbonDataLoadConfiguration.java | 12 +
 .../loading/DataLoadProcessBuilder.java | 1 +
 .../loading/model/CarbonLoadModel.java | 15 +
 .../loading/model/CarbonLoadModelBuilder.java | 26 +-
 .../store/CarbonFactDataHandlerModel.java | 11 +
 .../carbondata/processing/store/TablePage.java | 32 +-
 .../streaming/CarbonStreamRecordWriter.java | 37 +-
 .../streaming/StreamBlockletReader.java | 5 +-
 .../streaming/StreamBlockletWriter.java | 5 +-
 81 files changed, 1570 insertions(+), 543 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java index d645e08..8791cea 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java @@ -26,6 +26,7 @@ import org.apache.carbondata.core.datastore.FileReader; import org.apache.carbondata.core.datastore.chunk.AbstractRawColumnChunk; import org.apache.carbondata.core.datastore.chunk.DimensionColumnPage; import org.apache.carbondata.core.datastore.chunk.reader.DimensionColumnChunkReader; +import org.apache.carbondata.core.datastore.compression.Compressor; import
org.apache.carbondata.core.datastore.compression.CompressorFactory; import org.apache.carbondata.core.datastore.page.ColumnPage; import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder; @@ -33,6 +34,7 @@ import org.apache.carbondata.core.datastore.page.encoding.DefaultEncodingFactory import org.apache.carbondata.core.memory.MemoryException; import org.apache.carbondata.core.scan.result.vector.CarbonDictionary; import org.apache.carbondata.core.scan.result.vector.impl.CarbonDictionaryImpl; +import org.apache.carbondata.core.util.CarbonMetadataUtil; import org.apache.carbondata.format.Encoding; import org.apache.carbondata.format.LocalDictionaryChunk; @@ -144,7 +146,11 @@ public class DimensionRawColumnChunk extends AbstractRawColumnChunk { if (null != getDataChunkV3() && null != getDataChunkV3().local_dictionary && null == localDictionary) { try { - localDictionary = getDictionary(getDataChunkV3().local_dictionary); + String compressorName = CarbonMetadataUtil.getCompressorNameFromChunkMeta( + getDataChunkV3().data_chunk_list.get(0).chunk_meta); + + Compressor compressor = CompressorFactory.getInstance().getCompressor(compressorName); + localDictionary = getDictionary(getDataChunkV3().local_dictionary, compressor); } catch (IOException | MemoryException e) { throw new RuntimeException(e); } @@ -160,17 +166,17 @@ public class DimensionRawColumnChunk extends AbstractRawColumnChunk { * @throws IOException * @throws MemoryException */ - private CarbonDictionary getDictionary(LocalDictionaryChunk localDictionaryChunk) - throws IOException, MemoryException { + private CarbonDictionary getDictionary(LocalDictionaryChunk localDictionaryChunk, + Compressor compressor) throws IOException, MemoryException { if (null != localDictionaryChunk) { List<Encoding> encodings = localDictionaryChunk.getDictionary_meta().getEncoders(); List<ByteBuffer> encoderMetas = localDictionaryChunk.getDictionary_meta().getEncoder_meta(); - ColumnPageDecoder decoder = - DefaultEncodingFactory.getInstance().createDecoder(encodings, encoderMetas); + ColumnPageDecoder decoder = DefaultEncodingFactory.getInstance().createDecoder( + encodings, encoderMetas, compressor.getName()); ColumnPage decode = decoder.decode(localDictionaryChunk.getDictionary_data(), 0, localDictionaryChunk.getDictionary_data().length); - BitSet usedDictionary = BitSet.valueOf(CompressorFactory.getInstance().getCompressor() - .unCompressByte(localDictionaryChunk.getDictionary_values())); + BitSet usedDictionary = BitSet.valueOf(compressor.unCompressByte( + localDictionaryChunk.getDictionary_values())); int length = usedDictionary.length(); int index = 0; byte[][] dictionary = new byte[length][]; http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/AbstractChunkReader.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/AbstractChunkReader.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/AbstractChunkReader.java index 28e8741..b08f9ed 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/AbstractChunkReader.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/AbstractChunkReader.java @@ -19,7 +19,6 @@ package org.apache.carbondata.core.datastore.chunk.reader.dimension; import 
org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.chunk.reader.DimensionColumnChunkReader; import org.apache.carbondata.core.datastore.compression.Compressor; -import org.apache.carbondata.core.datastore.compression.CompressorFactory; import org.apache.carbondata.core.keygenerator.mdkey.NumberCompressor; import org.apache.carbondata.core.util.CarbonProperties; @@ -32,7 +31,7 @@ public abstract class AbstractChunkReader implements DimensionColumnChunkReader /** * compressor will be used to uncompress the data */ - protected static final Compressor COMPRESSOR = CompressorFactory.getInstance().getCompressor(); + protected Compressor compressor; /** * size of the each column value http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v1/CompressedDimensionChunkFileBasedReaderV1.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v1/CompressedDimensionChunkFileBasedReaderV1.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v1/CompressedDimensionChunkFileBasedReaderV1.java index 8256b7e..3df7efb 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v1/CompressedDimensionChunkFileBasedReaderV1.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v1/CompressedDimensionChunkFileBasedReaderV1.java @@ -28,6 +28,7 @@ import org.apache.carbondata.core.datastore.chunk.impl.VariableLengthDimensionCo import org.apache.carbondata.core.datastore.chunk.reader.dimension.AbstractChunkReader; import org.apache.carbondata.core.datastore.chunk.store.DimensionChunkStoreFactory; import org.apache.carbondata.core.datastore.columnar.UnBlockIndexer; +import org.apache.carbondata.core.datastore.compression.CompressorFactory; import org.apache.carbondata.core.metadata.blocklet.BlockletInfo; import org.apache.carbondata.core.metadata.blocklet.datachunk.DataChunk; import org.apache.carbondata.core.metadata.encoder.Encoding; @@ -55,6 +56,8 @@ public class CompressedDimensionChunkFileBasedReaderV1 extends AbstractChunkRead final int[] eachColumnValueSize, final String filePath) { super(eachColumnValueSize, filePath, blockletInfo.getNumberOfRows()); this.dimensionColumnChunk = blockletInfo.getDimensionColumnChunk(); + // for v1 store, the compressor is snappy + this.compressor = CompressorFactory.SupportedCompressor.SNAPPY.getCompressor(); } /** @@ -108,7 +111,7 @@ public class CompressedDimensionChunkFileBasedReaderV1 extends AbstractChunkRead FileReader fileReader = dimensionRawColumnChunk.getFileReader(); ByteBuffer rawData = dimensionRawColumnChunk.getRawData(); - dataPage = COMPRESSOR.unCompressByte(rawData.array(), (int) dimensionRawColumnChunk.getOffSet(), + dataPage = compressor.unCompressByte(rawData.array(), (int) dimensionRawColumnChunk.getOffSet(), dimensionRawColumnChunk.getLength()); // if row id block is present then read the row id chunk and uncompress it http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java ---------------------------------------------------------------------- diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java index a44d710..7d00fa4 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java @@ -27,6 +27,7 @@ import org.apache.carbondata.core.datastore.chunk.impl.VariableLengthDimensionCo import org.apache.carbondata.core.datastore.chunk.reader.dimension.AbstractChunkReaderV2V3Format; import org.apache.carbondata.core.datastore.chunk.store.DimensionChunkStoreFactory; import org.apache.carbondata.core.datastore.columnar.UnBlockIndexer; +import org.apache.carbondata.core.datastore.compression.CompressorFactory; import org.apache.carbondata.core.metadata.blocklet.BlockletInfo; import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.format.DataChunk2; @@ -47,6 +48,8 @@ public class CompressedDimensionChunkFileBasedReaderV2 extends AbstractChunkRead public CompressedDimensionChunkFileBasedReaderV2(final BlockletInfo blockletInfo, final int[] eachColumnValueSize, final String filePath) { super(blockletInfo, eachColumnValueSize, filePath); + // for v2 store, the compressor is snappy + this.compressor = CompressorFactory.SupportedCompressor.SNAPPY.getCompressor(); } /** @@ -143,7 +146,7 @@ public class CompressedDimensionChunkFileBasedReaderV2 extends AbstractChunkRead } // first read the data and uncompressed it - dataPage = COMPRESSOR + dataPage = compressor .unCompressByte(rawData.array(), copySourcePoint, dimensionColumnChunk.data_page_length); copySourcePoint += dimensionColumnChunk.data_page_length; // if row id block is present then read the row id chunk and uncompress it http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java index 8a2b74e..dc0f171 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java @@ -30,6 +30,7 @@ import org.apache.carbondata.core.datastore.chunk.reader.dimension.AbstractChunk import org.apache.carbondata.core.datastore.chunk.store.ColumnPageWrapper; import org.apache.carbondata.core.datastore.chunk.store.DimensionChunkStoreFactory; import org.apache.carbondata.core.datastore.columnar.UnBlockIndexer; +import org.apache.carbondata.core.datastore.compression.CompressorFactory; import org.apache.carbondata.core.datastore.page.ColumnPage; import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder; import org.apache.carbondata.core.datastore.page.encoding.DefaultEncodingFactory; @@ -37,6 +38,7 @@ import org.apache.carbondata.core.datastore.page.encoding.EncodingFactory; import 
org.apache.carbondata.core.memory.MemoryException; import org.apache.carbondata.core.metadata.blocklet.BlockletInfo; import org.apache.carbondata.core.scan.executor.util.QueryUtil; +import org.apache.carbondata.core.util.CarbonMetadataUtil; import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.format.DataChunk2; import org.apache.carbondata.format.DataChunk3; @@ -200,6 +202,9 @@ public class CompressedDimensionChunkFileBasedReaderV3 extends AbstractChunkRead // get the data buffer ByteBuffer rawData = rawColumnPage.getRawData(); DataChunk2 pageMetadata = dataChunk3.getData_chunk_list().get(pageNumber); + String compressorName = CarbonMetadataUtil.getCompressorNameFromChunkMeta( + pageMetadata.getChunk_meta()); + this.compressor = CompressorFactory.getInstance().getCompressor(compressorName); // calculating the start point of data // as buffer can contain multiple column data, start point will be datachunkoffset + // data chunk length + page offset @@ -214,7 +219,10 @@ public class CompressedDimensionChunkFileBasedReaderV3 extends AbstractChunkRead throws IOException, MemoryException { List<Encoding> encodings = pageMetadata.getEncoders(); List<ByteBuffer> encoderMetas = pageMetadata.getEncoder_meta(); - ColumnPageDecoder decoder = encodingFactory.createDecoder(encodings, encoderMetas); + String compressorName = CarbonMetadataUtil.getCompressorNameFromChunkMeta( + pageMetadata.getChunk_meta()); + ColumnPageDecoder decoder = encodingFactory.createDecoder(encodings, encoderMetas, + compressorName); return decoder .decode(pageData.array(), offset, pageMetadata.data_page_length, isLocalDictEncodedPage); } @@ -242,7 +250,7 @@ public class CompressedDimensionChunkFileBasedReaderV3 extends AbstractChunkRead if (isEncodedWithMeta(pageMetadata)) { ColumnPage decodedPage = decodeDimensionByMeta(pageMetadata, pageData, offset, null != rawColumnPage.getLocalDictionary()); - decodedPage.setNullBits(QueryUtil.getNullBitSet(pageMetadata.presence)); + decodedPage.setNullBits(QueryUtil.getNullBitSet(pageMetadata.presence, this.compressor)); return new ColumnPageWrapper(decodedPage, rawColumnPage.getLocalDictionary(), isEncodedWithAdaptiveMeta(pageMetadata)); } else { @@ -273,7 +281,7 @@ public class CompressedDimensionChunkFileBasedReaderV3 extends AbstractChunkRead int[] rlePage; int[] invertedIndexes = new int[0]; int[] invertedIndexesReverse = new int[0]; - dataPage = COMPRESSOR.unCompressByte(pageData.array(), offset, pageMetadata.data_page_length); + dataPage = compressor.unCompressByte(pageData.array(), offset, pageMetadata.data_page_length); offset += pageMetadata.data_page_length; // if row id block is present then read the row id chunk and uncompress it if (CarbonUtil.hasEncoding(pageMetadata.encoders, Encoding.INVERTED_INDEX)) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReader.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReader.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReader.java index d781cea..6774fcb 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReader.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReader.java @@ -17,6 +17,7 @@ 
package org.apache.carbondata.core.datastore.chunk.reader.measure; import org.apache.carbondata.core.datastore.chunk.reader.MeasureColumnChunkReader; +import org.apache.carbondata.core.datastore.compression.Compressor; import org.apache.carbondata.core.datastore.page.encoding.DefaultEncodingFactory; import org.apache.carbondata.core.datastore.page.encoding.EncodingFactory; @@ -24,6 +25,7 @@ import org.apache.carbondata.core.datastore.page.encoding.EncodingFactory; * Measure block reader abstract class */ public abstract class AbstractMeasureChunkReader implements MeasureColumnChunkReader { + protected Compressor compressor; protected EncodingFactory encodingFactory = DefaultEncodingFactory.getInstance(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java index f0c1b75..e1bcdc0 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java @@ -23,6 +23,7 @@ import java.util.List; import org.apache.carbondata.core.datastore.FileReader; import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk; import org.apache.carbondata.core.datastore.chunk.reader.measure.AbstractMeasureChunkReader; +import org.apache.carbondata.core.datastore.compression.CompressorFactory; import org.apache.carbondata.core.datastore.page.ColumnPage; import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder; import org.apache.carbondata.core.memory.MemoryException; @@ -96,7 +97,8 @@ public class CompressedMeasureChunkFileBasedReaderV1 extends AbstractMeasureChun int blockIndex = measureRawColumnChunk.getColumnIndex(); DataChunk dataChunk = measureColumnChunks.get(blockIndex); ValueEncoderMeta meta = dataChunk.getValueEncoderMeta().get(0); - ColumnPageDecoder codec = encodingFactory.createDecoderLegacy(meta); + ColumnPageDecoder codec = encodingFactory.createDecoderLegacy(meta, + CompressorFactory.SupportedCompressor.SNAPPY.getName()); ColumnPage decodedPage = codec.decode(measureRawColumnChunk.getRawData().array(), (int) measureRawColumnChunk.getOffSet(), dataChunk.getDataPageLength()); decodedPage.setNullBits(dataChunk.getNullValueIndexForColumn()); http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java index 9864ab8..86083cd 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java +++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java @@ -23,6 +23,7 @@ import java.util.List; import org.apache.carbondata.core.datastore.FileReader; import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk; import org.apache.carbondata.core.datastore.chunk.reader.measure.AbstractMeasureChunkReaderV2V3Format; +import org.apache.carbondata.core.datastore.compression.CompressorFactory; import org.apache.carbondata.core.datastore.page.ColumnPage; import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder; import org.apache.carbondata.core.memory.MemoryException; @@ -46,6 +47,7 @@ public class CompressedMeasureChunkFileBasedReaderV2 extends AbstractMeasureChun public CompressedMeasureChunkFileBasedReaderV2(final BlockletInfo blockletInfo, final String filePath) { super(blockletInfo, filePath); + this.compressor = CompressorFactory.SupportedCompressor.SNAPPY.getCompressor(); } @Override @@ -126,7 +128,7 @@ public class CompressedMeasureChunkFileBasedReaderV2 extends AbstractMeasureChun copyPoint += measureColumnChunkLength.get(blockIndex); ColumnPage page = decodeMeasure(measureRawColumnChunk, measureColumnChunk, copyPoint); - page.setNullBits(QueryUtil.getNullBitSet(measureColumnChunk.presence)); + page.setNullBits(QueryUtil.getNullBitSet(measureColumnChunk.presence, this.compressor)); return page; } @@ -137,7 +139,8 @@ public class CompressedMeasureChunkFileBasedReaderV2 extends AbstractMeasureChun byte[] encodedMeta = encoder_meta.get(0).array(); ValueEncoderMeta meta = CarbonUtil.deserializeEncoderMetaV2(encodedMeta); - ColumnPageDecoder codec = encodingFactory.createDecoderLegacy(meta); + ColumnPageDecoder codec = encodingFactory.createDecoderLegacy(meta, + CompressorFactory.SupportedCompressor.SNAPPY.getName()); byte[] rawData = measureRawColumnChunk.getRawData().array(); return codec.decode(rawData, copyPoint, measureColumnChunk.data_page_length); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java index e389ac6..240771a 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java @@ -23,11 +23,13 @@ import java.util.List; import org.apache.carbondata.core.datastore.FileReader; import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk; import org.apache.carbondata.core.datastore.chunk.reader.measure.AbstractMeasureChunkReaderV2V3Format; +import org.apache.carbondata.core.datastore.compression.CompressorFactory; import org.apache.carbondata.core.datastore.page.ColumnPage; import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder; import org.apache.carbondata.core.memory.MemoryException; import org.apache.carbondata.core.metadata.blocklet.BlockletInfo; import org.apache.carbondata.core.scan.executor.util.QueryUtil; +import org.apache.carbondata.core.util.CarbonMetadataUtil; 
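// A condensed sketch (not part of the diff) of the per-page decode flow shared
// by the V3 readers in this commit. All names come from the diff; dataChunk3,
// pageNumber and encodingFactory are assumed to be in scope as in the readers.
DataChunk2 pageMetadata = dataChunk3.getData_chunk_list().get(pageNumber);
String compressorName =
    CarbonMetadataUtil.getCompressorNameFromChunkMeta(pageMetadata.getChunk_meta());
// The compressor is resolved per page from the file metadata rather than from
// the system property, so files written with different compressors can be
// queried side by side.
Compressor compressor = CompressorFactory.getInstance().getCompressor(compressorName);
ColumnPageDecoder decoder = encodingFactory.createDecoder(
    pageMetadata.getEncoders(), pageMetadata.getEncoder_meta(), compressorName);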
import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.format.DataChunk2; import org.apache.carbondata.format.DataChunk3; @@ -192,6 +194,9 @@ public class CompressedMeasureChunkFileBasedReaderV3 extends AbstractMeasureChun DataChunk3 dataChunk3 = rawColumnChunk.getDataChunkV3(); // data chunk of page DataChunk2 pageMetadata = dataChunk3.getData_chunk_list().get(pageNumber); + String compressorName = CarbonMetadataUtil.getCompressorNameFromChunkMeta( + pageMetadata.getChunk_meta()); + this.compressor = CompressorFactory.getInstance().getCompressor(compressorName); // calculating the start point of data // as buffer can contain multiple column data, start point will be datachunkoffset + // data chunk length + page offset @@ -199,7 +204,7 @@ public class CompressedMeasureChunkFileBasedReaderV3 extends AbstractMeasureChun measureColumnChunkLength.get(rawColumnChunk.getColumnIndex()) + dataChunk3.getPage_offset().get(pageNumber); ColumnPage decodedPage = decodeMeasure(pageMetadata, rawColumnChunk.getRawData(), offset); - decodedPage.setNullBits(QueryUtil.getNullBitSet(pageMetadata.presence)); + decodedPage.setNullBits(QueryUtil.getNullBitSet(pageMetadata.presence, this.compressor)); return decodedPage; } @@ -210,7 +215,10 @@ public class CompressedMeasureChunkFileBasedReaderV3 extends AbstractMeasureChun throws MemoryException, IOException { List<Encoding> encodings = pageMetadata.getEncoders(); List<ByteBuffer> encoderMetas = pageMetadata.getEncoder_meta(); - ColumnPageDecoder codec = encodingFactory.createDecoder(encodings, encoderMetas); + String compressorName = CarbonMetadataUtil.getCompressorNameFromChunkMeta( + pageMetadata.getChunk_meta()); + ColumnPageDecoder codec = encodingFactory.createDecoder(encodings, encoderMetas, + compressorName); return codec.decode(pageData.array(), offset, pageMetadata.data_page_length); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMsrChunkFileBasedPageLevelReaderV3.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMsrChunkFileBasedPageLevelReaderV3.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMsrChunkFileBasedPageLevelReaderV3.java index 052f745..924a206 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMsrChunkFileBasedPageLevelReaderV3.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMsrChunkFileBasedPageLevelReaderV3.java @@ -22,10 +22,12 @@ import java.nio.ByteBuffer; import org.apache.carbondata.core.datastore.FileReader; import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk; +import org.apache.carbondata.core.datastore.compression.CompressorFactory; import org.apache.carbondata.core.datastore.page.ColumnPage; import org.apache.carbondata.core.memory.MemoryException; import org.apache.carbondata.core.metadata.blocklet.BlockletInfo; import org.apache.carbondata.core.scan.executor.util.QueryUtil; +import org.apache.carbondata.core.util.CarbonMetadataUtil; import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.format.DataChunk2; import org.apache.carbondata.format.DataChunk3; @@ -138,6 +140,9 @@ public class CompressedMsrChunkFileBasedPageLevelReaderV3 DataChunk3 dataChunk3 = 
rawColumnPage.getDataChunkV3(); // data chunk of page DataChunk2 pageMetadata = dataChunk3.getData_chunk_list().get(pageNumber); + String compressorName = CarbonMetadataUtil.getCompressorNameFromChunkMeta( + pageMetadata.getChunk_meta()); + this.compressor = CompressorFactory.getInstance().getCompressor(compressorName); // calculating the start point of data // as buffer can contain multiple column data, start point will be datachunkoffset + // data chunk length + page offset @@ -147,7 +152,7 @@ public class CompressedMsrChunkFileBasedPageLevelReaderV3 .readByteBuffer(filePath, offset, pageMetadata.data_page_length); ColumnPage decodedPage = decodeMeasure(pageMetadata, buffer, 0); - decodedPage.setNullBits(QueryUtil.getNullBitSet(pageMetadata.presence)); + decodedPage.setNullBits(QueryUtil.getNullBitSet(pageMetadata.presence, this.compressor)); return decodedPage; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/compression/Compressor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/Compressor.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/Compressor.java index 5c2a5fb..282e12c 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/Compressor.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/Compressor.java @@ -33,7 +33,7 @@ public interface Compressor { byte[] compressShort(short[] unCompInput); - short[] unCompressShort(byte[] compInput, int offset, int lenght); + short[] unCompressShort(byte[] compInput, int offset, int length); byte[] compressInt(int[] unCompInput); @@ -55,5 +55,14 @@ public interface Compressor { long rawUncompress(byte[] input, byte[] output) throws IOException; - int maxCompressedLength(int inputSize); + long maxCompressedLength(long inputSize); + + /** + * Whether this compressor support zero-copy during compression. + * Zero-copy means that the compressor support receiving memory address (pointer) + * and returning result in memory address (pointer). + * Currently not all java version of the compressors support this feature. 
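// An illustrative sketch of how a caller can branch on supportUnsafe()
// (hypothetical in-scope variables: compressor, inputAddress, inputSize,
// outputAddress, inputBytes; rawCompress may throw IOException). Snappy
// returns true and compresses directly between memory addresses; the current
// java binding of zstd returns false, so zstd pages stay on heap and offheap
// does not take effect for them.
long compressedSize;
if (compressor.supportUnsafe()) {
  // zero-copy path: operate directly on off-heap addresses
  compressedSize = compressor.rawCompress(inputAddress, inputSize, outputAddress);
} else {
  // on-heap fallback: compress a byte[] copy of the page data
  byte[] compressed = compressor.compressByte(inputBytes);
  compressedSize = compressed.length;
}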
+ * @return true if it supports, otherwise return false + */ + boolean supportUnsafe(); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/compression/CompressorFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/CompressorFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/CompressorFactory.java index 18f6252..40459b1 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/CompressorFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/CompressorFactory.java @@ -17,25 +17,53 @@ package org.apache.carbondata.core.datastore.compression; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.util.CarbonProperties; public class CompressorFactory { - private static final CompressorFactory COMPRESSOR_FACTORY = new CompressorFactory(); - private final Compressor snappyCompressor; + private final Map<String, SupportedCompressor> compressors = new HashMap<>(); + + public enum SupportedCompressor { + SNAPPY("snappy", SnappyCompressor.class), + ZSTD("zstd", ZstdCompressor.class); + + private String name; + private Class<Compressor> compressorClass; + private transient Compressor compressor; + + SupportedCompressor(String name, Class compressorCls) { + this.name = name; + this.compressorClass = compressorCls; + } + + public String getName() { + return name; + } + + /** + * we will load the compressor only if it is needed + */ + public Compressor getCompressor() { + if (this.compressor == null) { + try { + this.compressor = compressorClass.newInstance(); + } catch (InstantiationException | IllegalAccessException e) { + throw new RuntimeException("Exception occurs while getting compressor for " + name); + } + } + return this.compressor; + } + } private CompressorFactory() { - String compressorType = CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.COMPRESSOR, CarbonCommonConstants.DEFAULT_COMPRESSOR); - switch (compressorType) { - case "snappy": - snappyCompressor = new SnappyCompressor(); - break; - default: - throw new RuntimeException( - "Invalid compressor type provided! Please provide valid compressor type"); + for (SupportedCompressor supportedCompressor : SupportedCompressor.values()) { + compressors.put(supportedCompressor.getName(), supportedCompressor); } } @@ -43,16 +71,29 @@ public class CompressorFactory { return COMPRESSOR_FACTORY; } + /** + * get the default compressor. + * This method can only be called in data load procedure to compress column page. + * In query procedure, we should read the compressor information from the metadata + * in datafiles when we want to decompress the content. + */ public Compressor getCompressor() { - return getCompressor(CarbonCommonConstants.DEFAULT_COMPRESSOR); + String compressorType = CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.COMPRESSOR, CarbonCommonConstants.DEFAULT_COMPRESSOR); + if (!compressors.containsKey(compressorType)) { + throw new UnsupportedOperationException( + "Invalid compressor type provided! 
Currently we only support " + + Arrays.toString(SupportedCompressor.values())); + } + return getCompressor(compressorType); } public Compressor getCompressor(String name) { - if (name.equalsIgnoreCase("snappy")) { - return snappyCompressor; - } else { - throw new UnsupportedOperationException(name + " compressor is not supported"); + if (compressors.containsKey(name.toLowerCase())) { + return compressors.get(name.toLowerCase()).getCompressor(); } + throw new UnsupportedOperationException( + name + " compressor is not supported, currently we only support " + + Arrays.toString(SupportedCompressor.values())); } - } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/compression/SnappyCompressor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/SnappyCompressor.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/SnappyCompressor.java index bd740b2..15f912a 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/SnappyCompressor.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/SnappyCompressor.java @@ -34,7 +34,7 @@ public class SnappyCompressor implements Compressor { // snappy estimate max compressed length as 32 + source_len + source_len/6 public static final int MAX_BYTE_TO_COMPRESS = (int)((Integer.MAX_VALUE - 32) / 7.0 * 6); - private final SnappyNative snappyNative; + private final transient SnappyNative snappyNative; public SnappyCompressor() { Snappy snappy = new Snappy(); @@ -107,9 +107,9 @@ public class SnappyCompressor implements Compressor { } } - @Override public short[] unCompressShort(byte[] compInput, int offset, int lenght) { + @Override public short[] unCompressShort(byte[] compInput, int offset, int length) { try { - return Snappy.uncompressShortArray(compInput, offset, lenght); + return Snappy.uncompressShortArray(compInput, offset, length); } catch (IOException e) { LOGGER.error(e, e.getMessage()); throw new RuntimeException(e); @@ -196,12 +196,18 @@ public class SnappyCompressor implements Compressor { return snappyNative.rawCompress(inputAddress, inputSize, outputAddress); } + @Override public long rawUncompress(byte[] input, byte[] output) throws IOException { return snappyNative.rawUncompress(input, 0, input.length, output, 0); } @Override - public int maxCompressedLength(int inputSize) { - return snappyNative.maxCompressedLength(inputSize); + public long maxCompressedLength(long inputSize) { + return snappyNative.maxCompressedLength((int) inputSize); + } + + @Override + public boolean supportUnsafe() { + return true; } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/compression/ZstdCompressor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/ZstdCompressor.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/ZstdCompressor.java new file mode 100644 index 0000000..914c3e7 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/ZstdCompressor.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.datastore.compression; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.DoubleBuffer; +import java.nio.FloatBuffer; +import java.nio.IntBuffer; +import java.nio.LongBuffer; +import java.nio.ShortBuffer; + +import org.apache.carbondata.core.util.ByteUtil; + +import com.github.luben.zstd.Zstd; + +public class ZstdCompressor implements Compressor { + private static final int COMPRESS_LEVEL = 3; + + public ZstdCompressor() { + } + + @Override + public String getName() { + return "zstd"; + } + + @Override + public byte[] compressByte(byte[] unCompInput) { + return Zstd.compress(unCompInput, COMPRESS_LEVEL); + } + + @Override + public byte[] compressByte(byte[] unCompInput, int byteSize) { + return Zstd.compress(unCompInput, COMPRESS_LEVEL); + } + + @Override + public byte[] unCompressByte(byte[] compInput) { + long decompressedSize = Zstd.decompressedSize(compInput); + return Zstd.decompress(compInput, (int) decompressedSize); + } + + @Override + public byte[] unCompressByte(byte[] compInput, int offset, int length) { + // todo: how to avoid memory copy + byte[] dstBytes = new byte[length]; + System.arraycopy(compInput, offset, dstBytes, 0, length); + return unCompressByte(dstBytes); + } + + @Override + public byte[] compressShort(short[] unCompInput) { + ByteBuffer unCompBuffer = ByteBuffer.allocate(unCompInput.length * ByteUtil.SIZEOF_SHORT); + unCompBuffer.asShortBuffer().put(unCompInput); + return compressByte(unCompBuffer.array()); + } + + @Override + public short[] unCompressShort(byte[] compInput, int offset, int length) { + byte[] unCompArray = unCompressByte(compInput, offset, length); + ShortBuffer unCompBuffer = ByteBuffer.wrap(unCompArray).asShortBuffer(); + short[] shorts = new short[unCompArray.length / ByteUtil.SIZEOF_SHORT]; + unCompBuffer.get(shorts); + return shorts; + } + + @Override + public byte[] compressInt(int[] unCompInput) { + ByteBuffer unCompBuffer = ByteBuffer.allocate(unCompInput.length * ByteUtil.SIZEOF_INT); + unCompBuffer.asIntBuffer().put(unCompInput); + return compressByte(unCompBuffer.array()); + } + + @Override + public int[] unCompressInt(byte[] compInput, int offset, int length) { + byte[] unCompArray = unCompressByte(compInput, offset, length); + IntBuffer unCompBuffer = ByteBuffer.wrap(unCompArray).asIntBuffer(); + int[] ints = new int[unCompArray.length / ByteUtil.SIZEOF_INT]; + unCompBuffer.get(ints); + return ints; + } + + @Override + public byte[] compressLong(long[] unCompInput) { + ByteBuffer unCompBuffer = ByteBuffer.allocate(unCompInput.length * ByteUtil.SIZEOF_LONG); + unCompBuffer.asLongBuffer().put(unCompInput); + return compressByte(unCompBuffer.array()); + } + + @Override + public long[] unCompressLong(byte[] compInput, int offset, int length) { + byte[] unCompArray = unCompressByte(compInput, offset, length); + LongBuffer unCompBuffer = ByteBuffer.wrap(unCompArray).asLongBuffer(); + long[] longs 
= new long[unCompArray.length / ByteUtil.SIZEOF_LONG]; + unCompBuffer.get(longs); + return longs; + } + + @Override + public byte[] compressFloat(float[] unCompInput) { + ByteBuffer unCompBuffer = ByteBuffer.allocate(unCompInput.length * ByteUtil.SIZEOF_FLOAT); + unCompBuffer.asFloatBuffer().put(unCompInput); + return compressByte(unCompBuffer.array()); + } + + @Override + public float[] unCompressFloat(byte[] compInput, int offset, int length) { + byte[] unCompArray = unCompressByte(compInput, offset, length); + FloatBuffer unCompBuffer = ByteBuffer.wrap(unCompArray).asFloatBuffer(); + float[] floats = new float[unCompArray.length / ByteUtil.SIZEOF_FLOAT]; + unCompBuffer.get(floats); + return floats; + } + + @Override + public byte[] compressDouble(double[] unCompInput) { + ByteBuffer unCompBuffer = ByteBuffer.allocate(unCompInput.length * ByteUtil.SIZEOF_DOUBLE); + unCompBuffer.asDoubleBuffer().put(unCompInput); + return compressByte(unCompBuffer.array()); + } + + @Override + public double[] unCompressDouble(byte[] compInput, int offset, int length) { + byte[] unCompArray = unCompressByte(compInput, offset, length); + DoubleBuffer unCompBuffer = ByteBuffer.wrap(unCompArray).asDoubleBuffer(); + double[] doubles = new double[unCompArray.length / ByteUtil.SIZEOF_DOUBLE]; + unCompBuffer.get(doubles); + return doubles; + } + + @Override + public long rawCompress(long inputAddress, int inputSize, long outputAddress) throws IOException { + throw new RuntimeException("Not implemented rawCompress for zstd yet"); + } + + @Override + public long rawUncompress(byte[] input, byte[] output) throws IOException { + return Zstd.decompress(output, input); + } + + @Override + public long maxCompressedLength(long inputSize) { + return Zstd.compressBound(inputSize); + } + + /** + * currently java version of zstd does not support this feature. + * It may support it in upcoming release 1.3.5-3, then we can optimize this accordingly. + */ + @Override + public boolean supportUnsafe() { + return false; + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java index 5b42735..796083d 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java @@ -51,11 +51,7 @@ public abstract class ColumnPage { // number of row in this page protected int pageSize; - // data type of the page storage - protected final DataType dataType; - - // specification of this column - private final TableSpec.ColumnSpec columnSpec; + protected ColumnPageEncoderMeta columnPageEncoderMeta; // The index of the rowId whose value is null, will be set to 1 protected BitSet nullBitSet; @@ -70,15 +66,14 @@ public abstract class ColumnPage { /** * Create a new column page with input data type and page size. 
*/ - protected ColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize) { - this.columnSpec = columnSpec; - this.dataType = dataType; + protected ColumnPage(ColumnPageEncoderMeta columnPageEncoderMeta, int pageSize) { this.pageSize = pageSize; this.nullBitSet = new BitSet(pageSize); + this.columnPageEncoderMeta = columnPageEncoderMeta; } public DataType getDataType() { - return dataType; + return columnPageEncoderMeta.getStoreDataType(); } public SimpleStatsResult getStatistics() { @@ -93,102 +88,112 @@ public abstract class ColumnPage { this.statsCollector = statsCollector; } - private static ColumnPage createDecimalPage(TableSpec.ColumnSpec columnSpec, DataType dataType, + private static ColumnPage createDecimalPage(ColumnPageEncoderMeta columnPageEncoderMeta, int pageSize) { if (unsafe) { try { - return new UnsafeDecimalColumnPage(columnSpec, dataType, pageSize); + return new UnsafeDecimalColumnPage(columnPageEncoderMeta, pageSize); } catch (MemoryException e) { throw new RuntimeException(e); } } else { - return new SafeDecimalColumnPage(columnSpec, dataType, pageSize); + return new SafeDecimalColumnPage(columnPageEncoderMeta, pageSize); } } - private static ColumnPage createVarLengthPage(TableSpec.ColumnSpec columnSpec, DataType dataType, + private static ColumnPage createVarLengthPage(ColumnPageEncoderMeta columnPageEncoderMeta, int pageSize) { if (unsafe) { try { - return new UnsafeVarLengthColumnPage(columnSpec, dataType, pageSize); + return new UnsafeVarLengthColumnPage(columnPageEncoderMeta, pageSize); } catch (MemoryException e) { throw new RuntimeException(e); } } else { - return new SafeVarLengthColumnPage(columnSpec, dataType, pageSize); + return new SafeVarLengthColumnPage(columnPageEncoderMeta, pageSize); } } - private static ColumnPage createFixLengthPage(TableSpec.ColumnSpec columnSpec, DataType dataType, - int pageSize) { + private static ColumnPage createFixLengthPage( + ColumnPageEncoderMeta columnPageEncoderMeta, int pageSize) { if (unsafe) { try { - return new UnsafeFixLengthColumnPage(columnSpec, dataType, pageSize); + return new UnsafeFixLengthColumnPage(columnPageEncoderMeta, pageSize); } catch (MemoryException e) { throw new RuntimeException(e); } } else { - return new SafeFixLengthColumnPage(columnSpec, dataType, pageSize); + return new SafeFixLengthColumnPage(columnPageEncoderMeta, pageSize); } } - private static ColumnPage createFixLengthByteArrayPage(TableSpec.ColumnSpec columnSpec, - DataType dataType, int pageSize, int eachValueSize) { + private static ColumnPage createFixLengthByteArrayPage( + ColumnPageEncoderMeta columnPageEncoderMeta, int pageSize, int eachValueSize) { if (unsafe) { try { - return new UnsafeFixLengthColumnPage(columnSpec, dataType, pageSize, eachValueSize); + return new UnsafeFixLengthColumnPage(columnPageEncoderMeta, pageSize, eachValueSize); } catch (MemoryException e) { throw new RuntimeException(e); } } else { - return new SafeFixLengthColumnPage(columnSpec, dataType, pageSize); + return new SafeFixLengthColumnPage(columnPageEncoderMeta, pageSize); } } - private static ColumnPage createPage(TableSpec.ColumnSpec columnSpec, DataType dataType, - int pageSize) { - if (DataTypes.isDecimal(dataType)) { - return createDecimalPage(columnSpec, dataType, pageSize); - } else if (dataType.equals(BYTE_ARRAY)) { - return createVarLengthPage(columnSpec, dataType, pageSize); + private static ColumnPage createPage(ColumnPageEncoderMeta columnPageEncoderMeta, int pageSize) { + if 
(DataTypes.isDecimal(columnPageEncoderMeta.getStoreDataType())) { + return createDecimalPage(columnPageEncoderMeta, pageSize); + } else if (columnPageEncoderMeta.getStoreDataType().equals(BYTE_ARRAY)) { + return createVarLengthPage(columnPageEncoderMeta, pageSize); } else { - return createFixLengthPage(columnSpec, dataType, pageSize); + return createFixLengthPage(columnPageEncoderMeta, pageSize); } } - public static ColumnPage newDecimalPage(TableSpec.ColumnSpec columnSpec, DataType dataType, - int pageSize) - throws MemoryException { - return newPage(columnSpec, dataType, pageSize); + public static ColumnPage newDecimalPage(ColumnPageEncoderMeta columnPageEncoderMeta, + int pageSize) throws MemoryException { + return newPage(columnPageEncoderMeta, pageSize); } - public static ColumnPage newLocalDictPage(TableSpec.ColumnSpec columnSpec, DataType dataType, + public static ColumnPage newLocalDictPage(ColumnPageEncoderMeta columnPageEncoderMeta, int pageSize, LocalDictionaryGenerator localDictionaryGenerator, boolean isComplexTypePrimitive) throws MemoryException { boolean isDecoderBasedFallBackEnabled = Boolean.parseBoolean(CarbonProperties.getInstance() .getProperty(CarbonCommonConstants.LOCAL_DICTIONARY_DECODER_BASED_FALLBACK, CarbonCommonConstants.LOCAL_DICTIONARY_DECODER_BASED_FALLBACK_DEFAULT)); + ColumnPage actualPage; + ColumnPage encodedPage; if (unsafe) { - return new LocalDictColumnPage(new UnsafeVarLengthColumnPage(columnSpec, dataType, pageSize), - new UnsafeFixLengthColumnPage(columnSpec, DataTypes.BYTE_ARRAY, pageSize, - CarbonCommonConstants.LOCAL_DICT_ENCODED_BYTEARRAY_SIZE), localDictionaryGenerator, - isComplexTypePrimitive, isDecoderBasedFallBackEnabled); + actualPage = new UnsafeVarLengthColumnPage(columnPageEncoderMeta, pageSize); + encodedPage = new UnsafeFixLengthColumnPage( + new ColumnPageEncoderMeta(columnPageEncoderMeta.getColumnSpec(), DataTypes.BYTE_ARRAY, + columnPageEncoderMeta.getCompressorName()), + pageSize, + CarbonCommonConstants.LOCAL_DICT_ENCODED_BYTEARRAY_SIZE); } else { - return new LocalDictColumnPage(new SafeVarLengthColumnPage(columnSpec, dataType, pageSize), - new SafeFixLengthColumnPage(columnSpec, DataTypes.BYTE_ARRAY, pageSize), - localDictionaryGenerator, isComplexTypePrimitive, isDecoderBasedFallBackEnabled); + actualPage = new SafeVarLengthColumnPage(columnPageEncoderMeta, pageSize); + encodedPage = new SafeFixLengthColumnPage( + new ColumnPageEncoderMeta(columnPageEncoderMeta.getColumnSpec(), DataTypes.BYTE_ARRAY, + columnPageEncoderMeta.getCompressorName()), + pageSize); } + return new LocalDictColumnPage(actualPage, encodedPage, localDictionaryGenerator, + isComplexTypePrimitive, isDecoderBasedFallBackEnabled); } /** * Create a new page of dataType and number of row = pageSize */ - public static ColumnPage newPage(TableSpec.ColumnSpec columnSpec, DataType dataType, - int pageSize) throws MemoryException { + public static ColumnPage newPage(ColumnPageEncoderMeta columnPageEncoderMeta, int pageSize) + throws MemoryException { ColumnPage instance; + DataType dataType = columnPageEncoderMeta.getStoreDataType(); + TableSpec.ColumnSpec columnSpec = columnPageEncoderMeta.getColumnSpec(); + String compressorName = columnPageEncoderMeta.getCompressorName(); if (unsafe) { if (dataType == DataTypes.BOOLEAN) { - instance = new UnsafeFixLengthColumnPage(columnSpec, BYTE, pageSize); + instance = new UnsafeFixLengthColumnPage( + new ColumnPageEncoderMeta(columnSpec, BYTE, compressorName), pageSize); } else if (dataType == DataTypes.BYTE || dataType == 
DataTypes.SHORT || dataType == DataTypes.SHORT_INT ||
@@ -196,39 +201,43 @@ public abstract class ColumnPage {
           dataType == DataTypes.LONG || dataType == DataTypes.FLOAT ||
           dataType == DataTypes.DOUBLE) {
-        instance = new UnsafeFixLengthColumnPage(columnSpec, dataType, pageSize);
+        instance = new UnsafeFixLengthColumnPage(
+            new ColumnPageEncoderMeta(columnSpec, dataType, compressorName), pageSize);
       } else if (dataType == DataTypes.TIMESTAMP) {
-        instance = new UnsafeFixLengthColumnPage(columnSpec, DataTypes.LONG, pageSize);
+        instance = new UnsafeFixLengthColumnPage(
+            new ColumnPageEncoderMeta(columnSpec, DataTypes.LONG, compressorName), pageSize);
       } else if (DataTypes.isDecimal(dataType)) {
-        instance = new UnsafeDecimalColumnPage(columnSpec, dataType, pageSize);
+        instance = new UnsafeDecimalColumnPage(
+            new ColumnPageEncoderMeta(columnSpec, dataType, compressorName), pageSize);
       } else if (dataType == DataTypes.STRING || dataType == DataTypes.BYTE_ARRAY
           || dataType == DataTypes.VARCHAR) {
-        instance = new UnsafeVarLengthColumnPage(columnSpec, dataType, pageSize);
+        instance = new UnsafeVarLengthColumnPage(
+            new ColumnPageEncoderMeta(columnSpec, dataType, compressorName), pageSize);
       } else {
         throw new RuntimeException("Unsupported data type: " + dataType);
       }
     } else {
       if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.BYTE) {
-        instance = newBytePage(columnSpec, new byte[pageSize]);
+        instance = newBytePage(columnSpec, new byte[pageSize], compressorName);
       } else if (dataType == DataTypes.SHORT) {
-        instance = newShortPage(columnSpec, new short[pageSize]);
+        instance = newShortPage(columnSpec, new short[pageSize], compressorName);
       } else if (dataType == DataTypes.SHORT_INT) {
-        instance = newShortIntPage(columnSpec, new byte[pageSize * 3]);
+        instance = newShortIntPage(columnSpec, new byte[pageSize * 3], compressorName);
       } else if (dataType == DataTypes.INT) {
-        instance = newIntPage(columnSpec, new int[pageSize]);
+        instance = newIntPage(columnSpec, new int[pageSize], compressorName);
       } else if (dataType == DataTypes.LONG || dataType == DataTypes.TIMESTAMP) {
-        instance = newLongPage(columnSpec, new long[pageSize]);
+        instance = newLongPage(columnSpec, new long[pageSize], compressorName);
       } else if (dataType == DataTypes.FLOAT) {
-        instance = newFloatPage(columnSpec, new float[pageSize]);
+        instance = newFloatPage(columnSpec, new float[pageSize], compressorName);
       } else if (dataType == DataTypes.DOUBLE) {
-        instance = newDoublePage(columnSpec, new double[pageSize]);
+        instance = newDoublePage(columnSpec, new double[pageSize], compressorName);
       } else if (DataTypes.isDecimal(dataType)) {
-        instance = newDecimalPage(columnSpec, new byte[pageSize][]);
+        instance = newDecimalPage(columnSpec, new byte[pageSize][], compressorName);
       } else if (dataType == DataTypes.STRING || dataType == DataTypes.BYTE_ARRAY
          || dataType == DataTypes.VARCHAR) {
-        instance = new SafeVarLengthColumnPage(columnSpec, dataType, pageSize);
+        instance = new SafeVarLengthColumnPage(columnPageEncoderMeta, pageSize);
       } else {
         throw new RuntimeException("Unsupported data type: " + dataType);
       }
@@ -236,83 +245,103 @@ public abstract class ColumnPage {
     return instance;
   }
 
-  public static ColumnPage wrapByteArrayPage(TableSpec.ColumnSpec columnSpec, byte[][] byteArray) {
-    ColumnPage columnPage = createPage(columnSpec, BYTE_ARRAY, byteArray.length);
+  public static ColumnPage wrapByteArrayPage(TableSpec.ColumnSpec columnSpec, byte[][] byteArray,
+      String compressorName) {
+    ColumnPage columnPage = createPage(
+        new ColumnPageEncoderMeta(columnSpec, BYTE_ARRAY, compressorName), byteArray.length);
     columnPage.setByteArrayPage(byteArray);
     return columnPage;
   }
 
-  private static ColumnPage newBytePage(TableSpec.ColumnSpec columnSpec, byte[] byteData) {
-    ColumnPage columnPage = createPage(columnSpec, BYTE, byteData.length);
+  private static ColumnPage newBytePage(TableSpec.ColumnSpec columnSpec, byte[] byteData,
+      String compressorName) {
+    ColumnPage columnPage = createPage(
+        new ColumnPageEncoderMeta(columnSpec, BYTE, compressorName), byteData.length);
     columnPage.setBytePage(byteData);
     return columnPage;
   }
 
-  private static ColumnPage newShortPage(TableSpec.ColumnSpec columnSpec, short[] shortData) {
-    ColumnPage columnPage = createPage(columnSpec, SHORT, shortData.length);
+  private static ColumnPage newShortPage(TableSpec.ColumnSpec columnSpec, short[] shortData,
+      String compressorName) {
+    ColumnPage columnPage = createPage(
+        new ColumnPageEncoderMeta(columnSpec, SHORT, compressorName), shortData.length);
     columnPage.setShortPage(shortData);
     return columnPage;
   }
 
-  private static ColumnPage newShortIntPage(TableSpec.ColumnSpec columnSpec, byte[] shortIntData) {
-    ColumnPage columnPage = createPage(columnSpec, SHORT_INT, shortIntData.length / 3);
+  private static ColumnPage newShortIntPage(TableSpec.ColumnSpec columnSpec, byte[] shortIntData,
+      String compressorName) {
+    ColumnPage columnPage = createPage(
+        new ColumnPageEncoderMeta(columnSpec, SHORT_INT, compressorName), shortIntData.length / 3);
     columnPage.setShortIntPage(shortIntData);
     return columnPage;
   }
 
-  private static ColumnPage newIntPage(TableSpec.ColumnSpec columnSpec, int[] intData) {
-    ColumnPage columnPage = createPage(columnSpec, INT, intData.length);
+  private static ColumnPage newIntPage(TableSpec.ColumnSpec columnSpec, int[] intData,
+      String compressorName) {
+    ColumnPage columnPage = createPage(
+        new ColumnPageEncoderMeta(columnSpec, INT, compressorName), intData.length);
     columnPage.setIntPage(intData);
     return columnPage;
   }
 
-  private static ColumnPage newLongPage(TableSpec.ColumnSpec columnSpec, long[] longData) {
-    ColumnPage columnPage = createPage(columnSpec, LONG, longData.length);
+  private static ColumnPage newLongPage(TableSpec.ColumnSpec columnSpec, long[] longData,
+      String compressorName) {
+    ColumnPage columnPage = createPage(
+        new ColumnPageEncoderMeta(columnSpec, LONG, compressorName), longData.length);
     columnPage.setLongPage(longData);
     return columnPage;
   }
 
-  private static ColumnPage newFloatPage(TableSpec.ColumnSpec columnSpec, float[] floatData) {
-    ColumnPage columnPage = createPage(columnSpec, FLOAT, floatData.length);
+  private static ColumnPage newFloatPage(TableSpec.ColumnSpec columnSpec, float[] floatData,
+      String compressorName) {
+    ColumnPage columnPage = createPage(
+        new ColumnPageEncoderMeta(columnSpec, FLOAT, compressorName), floatData.length);
     columnPage.setFloatPage(floatData);
     return columnPage;
   }
 
-  private static ColumnPage newDoublePage(TableSpec.ColumnSpec columnSpec, double[] doubleData) {
-    ColumnPage columnPage = createPage(columnSpec, DOUBLE, doubleData.length);
+  private static ColumnPage newDoublePage(TableSpec.ColumnSpec columnSpec, double[] doubleData,
+      String compressorName) {
+    ColumnPage columnPage = createPage(
+        new ColumnPageEncoderMeta(columnSpec, DOUBLE, compressorName), doubleData.length);
     columnPage.setDoublePage(doubleData);
     return columnPage;
   }
 
-  private static ColumnPage newDecimalPage(TableSpec.ColumnSpec columnSpec, byte[][] byteArray) {
-    ColumnPage columnPage =
-        createPage(columnSpec, columnSpec.getSchemaDataType(), byteArray.length);
+  private static ColumnPage newDecimalPage(TableSpec.ColumnSpec columnSpec, byte[][] byteArray,
+      String compressorName) {
+    ColumnPage columnPage = createPage(
+        new ColumnPageEncoderMeta(columnSpec, columnSpec.getSchemaDataType(), compressorName),
+        byteArray.length);
     columnPage.setByteArrayPage(byteArray);
     return columnPage;
   }
 
   private static ColumnPage newDecimalPage(TableSpec.ColumnSpec columnSpec,
-      byte[] lvEncodedByteArray) throws MemoryException {
-    return VarLengthColumnPageBase.newDecimalColumnPage(columnSpec, lvEncodedByteArray);
+      byte[] lvEncodedByteArray, String compressorName) throws MemoryException {
+    return VarLengthColumnPageBase.newDecimalColumnPage(
+        columnSpec, lvEncodedByteArray, compressorName);
   }
 
   private static ColumnPage newLVBytesPage(TableSpec.ColumnSpec columnSpec,
-      byte[] lvEncodedByteArray, int lvLength) throws MemoryException {
-    return VarLengthColumnPageBase.newLVBytesColumnPage(columnSpec, lvEncodedByteArray, lvLength);
+      byte[] lvEncodedByteArray, int lvLength, String compressorName) throws MemoryException {
+    return VarLengthColumnPageBase.newLVBytesColumnPage(
+        columnSpec, lvEncodedByteArray, lvLength, compressorName);
   }
 
   private static ColumnPage newComplexLVBytesPage(TableSpec.ColumnSpec columnSpec,
-      byte[] lvEncodedByteArray, int lvLength) throws MemoryException {
-    return VarLengthColumnPageBase
-        .newComplexLVBytesColumnPage(columnSpec, lvEncodedByteArray, lvLength);
+      byte[] lvEncodedByteArray, int lvLength, String compressorName) throws MemoryException {
+    return VarLengthColumnPageBase.newComplexLVBytesColumnPage(
+        columnSpec, lvEncodedByteArray, lvLength, compressorName);
  }
 
   private static ColumnPage newFixedByteArrayPage(TableSpec.ColumnSpec columnSpec,
-      byte[] lvEncodedByteArray, int eachValueSize) throws MemoryException {
+      byte[] lvEncodedByteArray, int eachValueSize, String compressorName) throws MemoryException {
     int pageSize = lvEncodedByteArray.length / eachValueSize;
-    ColumnPage fixLengthByteArrayPage =
-        createFixLengthByteArrayPage(columnSpec, columnSpec.getSchemaDataType(), pageSize,
-            eachValueSize);
+    ColumnPage fixLengthByteArrayPage = createFixLengthByteArrayPage(
+        new ColumnPageEncoderMeta(columnSpec, columnSpec.getSchemaDataType(), compressorName),
+        pageSize, eachValueSize);
     byte[] data = null;
     int offset = 0;
     for (int i = 0; i < pageSize; i++) {
@@ -379,8 +408,9 @@ public abstract class ColumnPage {
       nullBitSet.set(rowId);
       return;
     }
+    DataType dataType = columnPageEncoderMeta.getStoreDataType();
     if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.BYTE) {
-      if (columnSpec.getSchemaDataType() == DataTypes.BOOLEAN) {
+      if (columnPageEncoderMeta.getColumnSpec().getSchemaDataType() == DataTypes.BOOLEAN) {
         value = BooleanConvert.boolean2Byte((Boolean) value);
       }
       putByte(rowId, (byte) value);
@@ -419,9 +449,10 @@ public abstract class ColumnPage {
     if (nullBitSet.get(rowId)) {
       return getNull(rowId);
     }
+    DataType dataType = columnPageEncoderMeta.getStoreDataType();
     if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.BYTE) {
       byte value = getByte(rowId);
-      if (columnSpec.getSchemaDataType() == DataTypes.BOOLEAN) {
+      if (columnPageEncoderMeta.getColumnSpec().getSchemaDataType() == DataTypes.BOOLEAN) {
         return BooleanConvert.byte2Boolean(value);
       }
       return value;
@@ -501,6 +532,7 @@ public abstract class ColumnPage {
    * Set null at rowId
    */
   protected void putNull(int rowId) {
+    DataType dataType = columnPageEncoderMeta.getStoreDataType();
     if (dataType == DataTypes.BOOLEAN) {
       putBoolean(rowId, false);
     } else if (dataType == DataTypes.BYTE) {
@@ -525,11 +557,12 @@
    */
   private Object getNull(int rowId) {
     Object result;
+    DataType dataType = columnPageEncoderMeta.getStoreDataType();
     if (dataType == DataTypes.BOOLEAN) {
       result = getBoolean(rowId);
     } else if (dataType == DataTypes.BYTE) {
       result = getByte(rowId);
-      if (columnSpec.getSchemaDataType() == DataTypes.BOOLEAN) {
+      if (columnPageEncoderMeta.getColumnSpec().getSchemaDataType() == DataTypes.BOOLEAN) {
         result = BooleanConvert.byte2Boolean((byte)result);
       }
     } else if (dataType == DataTypes.SHORT) {
@@ -679,10 +712,12 @@ public abstract class ColumnPage {
   public PageLevelDictionary getPageDictionary() {
     throw new UnsupportedOperationException("Operation Not Supported");
   }
+
   /**
    * Compress page data using specified compressor
    */
   public byte[] compress(Compressor compressor) throws MemoryException, IOException {
+    DataType dataType = columnPageEncoderMeta.getStoreDataType();
     if (dataType == DataTypes.BOOLEAN) {
       return compressor.compressByte(getBooleanPage());
     } else if (dataType == DataTypes.BYTE) {
@@ -702,13 +737,13 @@
     } else if (DataTypes.isDecimal(dataType)) {
       return compressor.compressByte(getDecimalPage());
     } else if (dataType == DataTypes.BYTE_ARRAY
-        && columnSpec.getColumnType() == ColumnType.COMPLEX_PRIMITIVE) {
+        && columnPageEncoderMeta.getColumnSpec().getColumnType() == ColumnType.COMPLEX_PRIMITIVE) {
       return compressor.compressByte(getComplexChildrenLVFlattenedBytePage());
-    } else if (dataType == DataTypes.BYTE_ARRAY && (
-        columnSpec.getColumnType() == ColumnType.COMPLEX_STRUCT
-        || columnSpec.getColumnType() == ColumnType.COMPLEX_ARRAY
-        || columnSpec.getColumnType() == ColumnType.PLAIN_LONG_VALUE
-        || columnSpec.getColumnType() == ColumnType.PLAIN_VALUE)) {
+    } else if (dataType == DataTypes.BYTE_ARRAY
+        && (columnPageEncoderMeta.getColumnSpec().getColumnType() == ColumnType.COMPLEX_STRUCT
+        || columnPageEncoderMeta.getColumnSpec().getColumnType() == ColumnType.COMPLEX_ARRAY
+        || columnPageEncoderMeta.getColumnSpec().getColumnType() == ColumnType.PLAIN_LONG_VALUE
+        || columnPageEncoderMeta.getColumnSpec().getColumnType() == ColumnType.PLAIN_VALUE)) {
       return compressor.compressByte(getComplexParentFlattenedBytePage());
     } else if (dataType == DataTypes.BYTE_ARRAY) {
       return compressor.compressByte(getLVFlattenedBytePage());
@@ -729,51 +764,54 @@ public abstract class ColumnPage {
     DataType storeDataType = meta.getStoreDataType();
     if (storeDataType == DataTypes.BOOLEAN || storeDataType == DataTypes.BYTE) {
       byte[] byteData = compressor.unCompressByte(compressedData, offset, length);
-      return newBytePage(columnSpec, byteData);
+      return newBytePage(columnSpec, byteData, meta.getCompressorName());
     } else if (storeDataType == DataTypes.SHORT) {
       short[] shortData = compressor.unCompressShort(compressedData, offset, length);
-      return newShortPage(columnSpec, shortData);
+      return newShortPage(columnSpec, shortData, meta.getCompressorName());
     } else if (storeDataType == DataTypes.SHORT_INT) {
       byte[] shortIntData = compressor.unCompressByte(compressedData, offset, length);
-      return newShortIntPage(columnSpec, shortIntData);
+      return newShortIntPage(columnSpec, shortIntData, meta.getCompressorName());
     } else if (storeDataType == DataTypes.INT) {
       int[] intData = compressor.unCompressInt(compressedData, offset, length);
-      return newIntPage(columnSpec, intData);
+      return newIntPage(columnSpec, intData, meta.getCompressorName());
     } else if (storeDataType == DataTypes.LONG) {
       long[] longData = compressor.unCompressLong(compressedData, offset, length);
-      return newLongPage(columnSpec, longData);
+      return newLongPage(columnSpec, longData, meta.getCompressorName());
     } else if (storeDataType == DataTypes.FLOAT) {
       float[] floatData = compressor.unCompressFloat(compressedData, offset, length);
-      return newFloatPage(columnSpec, floatData);
+      return newFloatPage(columnSpec, floatData, meta.getCompressorName());
     } else if (storeDataType == DataTypes.DOUBLE) {
       double[] doubleData = compressor.unCompressDouble(compressedData, offset, length);
-      return newDoublePage(columnSpec, doubleData);
+      return newDoublePage(columnSpec, doubleData, meta.getCompressorName());
     } else if (!isLVEncoded && storeDataType == DataTypes.BYTE_ARRAY && (
         columnSpec.getColumnType() == ColumnType.COMPLEX_PRIMITIVE
         || columnSpec.getColumnType() == ColumnType.PLAIN_VALUE)) {
       byte[] lvVarBytes = compressor.unCompressByte(compressedData, offset, length);
       return newComplexLVBytesPage(columnSpec, lvVarBytes,
-          CarbonCommonConstants.SHORT_SIZE_IN_BYTE);
+          CarbonCommonConstants.SHORT_SIZE_IN_BYTE, meta.getCompressorName());
     } else if (isLVEncoded && storeDataType == DataTypes.BYTE_ARRAY &&
         columnSpec.getColumnType() == ColumnType.COMPLEX_PRIMITIVE) {
       byte[] lvVarBytes = compressor.unCompressByte(compressedData, offset, length);
-      return newFixedByteArrayPage(columnSpec, lvVarBytes, 3);
+      return newFixedByteArrayPage(columnSpec, lvVarBytes, 3, meta.getCompressorName());
     } else if (storeDataType == DataTypes.BYTE_ARRAY
         && columnSpec.getColumnType() == ColumnType.COMPLEX_STRUCT) {
       byte[] lvVarBytes = compressor.unCompressByte(compressedData, offset, length);
       return newFixedByteArrayPage(columnSpec, lvVarBytes,
-          CarbonCommonConstants.SHORT_SIZE_IN_BYTE);
+          CarbonCommonConstants.SHORT_SIZE_IN_BYTE, meta.getCompressorName());
     } else if (storeDataType == DataTypes.BYTE_ARRAY
         && columnSpec.getColumnType() == ColumnType.COMPLEX_ARRAY) {
       byte[] lvVarBytes = compressor.unCompressByte(compressedData, offset, length);
-      return newFixedByteArrayPage(columnSpec, lvVarBytes, CarbonCommonConstants.LONG_SIZE_IN_BYTE);
+      return newFixedByteArrayPage(columnSpec, lvVarBytes,
+          CarbonCommonConstants.LONG_SIZE_IN_BYTE, meta.getCompressorName());
     } else if (storeDataType == DataTypes.BYTE_ARRAY
        && columnSpec.getColumnType() == ColumnType.PLAIN_LONG_VALUE) {
       byte[] lvVarBytes = compressor.unCompressByte(compressedData, offset, length);
-      return newLVBytesPage(columnSpec, lvVarBytes, CarbonCommonConstants.INT_SIZE_IN_BYTE);
+      return newLVBytesPage(columnSpec, lvVarBytes,
+          CarbonCommonConstants.INT_SIZE_IN_BYTE, meta.getCompressorName());
     } else if (storeDataType == DataTypes.BYTE_ARRAY) {
       byte[] lvVarBytes = compressor.unCompressByte(compressedData, offset, length);
-      return newLVBytesPage(columnSpec, lvVarBytes, CarbonCommonConstants.INT_SIZE_IN_BYTE);
+      return newLVBytesPage(columnSpec, lvVarBytes,
+          CarbonCommonConstants.INT_SIZE_IN_BYTE, meta.getCompressorName());
     } else {
       throw new UnsupportedOperationException(
          "unsupported data type to decompress column page: " + meta.getStoreDataType());
     }
@@ -791,32 +829,32 @@ public abstract class ColumnPage {
     DataType storeDataType = meta.getStoreDataType();
     if (storeDataType == DataTypes.BYTE) {
       byte[] byteData = compressor.unCompressByte(compressedData, offset, length);
-      decimalPage = createDecimalPage(columnSpec, meta.getStoreDataType(), byteData.length);
+      decimalPage = createDecimalPage(meta, byteData.length);
       decimalPage.setBytePage(byteData);
       return decimalPage;
     } else if (storeDataType == DataTypes.SHORT) {
       short[] shortData = compressor.unCompressShort(compressedData, offset, length);
-      decimalPage = createDecimalPage(columnSpec, meta.getStoreDataType(), shortData.length);
+      decimalPage = createDecimalPage(meta, shortData.length);
       decimalPage.setShortPage(shortData);
       return decimalPage;
     } else if (storeDataType == DataTypes.SHORT_INT) {
       byte[] shortIntData = compressor.unCompressByte(compressedData, offset, length);
-      decimalPage = createDecimalPage(columnSpec, meta.getStoreDataType(), shortIntData.length);
+      decimalPage = createDecimalPage(meta, shortIntData.length);
       decimalPage.setShortIntPage(shortIntData);
       return decimalPage;
     } else if (storeDataType == DataTypes.INT) {
       int[] intData = compressor.unCompressInt(compressedData, offset, length);
-      decimalPage = createDecimalPage(columnSpec, meta.getStoreDataType(), intData.length);
+      decimalPage = createDecimalPage(meta, intData.length);
       decimalPage.setIntPage(intData);
       return decimalPage;
     } else if (storeDataType == DataTypes.LONG) {
       long[] longData = compressor.unCompressLong(compressedData, offset, length);
-      decimalPage = createDecimalPage(columnSpec, meta.getStoreDataType(), longData.length);
+      decimalPage = createDecimalPage(meta, longData.length);
       decimalPage.setLongPage(longData);
       return decimalPage;
     } else {
       byte[] lvEncodedBytes = compressor.unCompressByte(compressedData, offset, length);
-      return newDecimalPage(columnSpec, lvEncodedBytes);
+      return newDecimalPage(columnSpec, lvEncodedBytes, meta.getCompressorName());
     }
   }
 
@@ -829,7 +867,7 @@ public abstract class ColumnPage {
   }
 
   public TableSpec.ColumnSpec getColumnSpec() {
-    return columnSpec;
+    return columnPageEncoderMeta.getColumnSpec();
   }
 
   public boolean isLocalDictGeneratedPage() {
@@ -847,4 +885,12 @@ public abstract class ColumnPage {
   public int getActualRowCount() {
     throw new UnsupportedOperationException("Operation not supported");
   }
+
+  public String getColumnCompressorName() {
+    return columnPageEncoderMeta.getCompressorName();
+  }
+
+  public ColumnPageEncoderMeta getColumnPageEncoderMeta() {
+    return columnPageEncoderMeta;
+  }
 }
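Taken together, the ColumnPage changes above swap the old (columnSpec, dataType) pair for a single ColumnPageEncoderMeta that also carries the compressor name, so every page knows which codec wrote it and decompress() can rebuild pages with that same compressor. A minimal caller-side sketch of the new construction path (illustration only, not code from this patch; imports are elided, and it assumes ColumnSpec.newInstance and a CompressorFactory#getCompressor(String) lookup as introduced elsewhere in this change set):

    // hypothetical sketch: page construction now binds the compressor up front
    TableSpec.ColumnSpec spec =
        TableSpec.ColumnSpec.newInstance("c1", DataTypes.INT, ColumnType.MEASURE);
    ColumnPage page = ColumnPage.newPage(
        new ColumnPageEncoderMeta(spec, DataTypes.INT, "zstd"), 32000);
    // compression uses the name stored in the page's own meta, so writer
    // and reader cannot disagree on the codec
    byte[] compressed = page.compress(
        CompressorFactory.getInstance().getCompressor(page.getColumnCompressorName()));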
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java
index a7f94e2..921ae50 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java
@@ -21,6 +21,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.carbondata.core.datastore.TableSpec;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
 import org.apache.carbondata.core.datastore.page.statistics.DummyStatsCollector;
 import org.apache.carbondata.core.datastore.page.statistics.PrimitivePageStatsCollector;
 import org.apache.carbondata.core.datastore.row.ComplexColumnInfo;
@@ -71,8 +72,8 @@ public class ComplexColumnPage {
    * @throws MemoryException
    * if memory is not sufficient
    */
-  public void initialize(Map<String, LocalDictionaryGenerator> columnToDictMap, int pageSize)
-      throws MemoryException {
+  public void initialize(Map<String, LocalDictionaryGenerator> columnToDictMap, int pageSize,
+      String columnCompressor) throws MemoryException {
     DataType dataType;
     for (int i = 0; i < this.columnPages.length; i++) {
       LocalDictionaryGenerator localDictionaryGenerator =
@@ -83,15 +84,18 @@
         if (isColumnPageBasedOnDataType(i)) {
           // no dictionary primitive types need adaptive encoding,
           // hence store as actual value instead of byte array
-          this.columnPages[i] = ColumnPage.newPage(spec, dataType, pageSize);
+          this.columnPages[i] = ColumnPage.newPage(
+              new ColumnPageEncoderMeta(spec, dataType, columnCompressor), pageSize);
           this.columnPages[i].setStatsCollector(PrimitivePageStatsCollector.newInstance(dataType));
         } else {
-          this.columnPages[i] = ColumnPage.newPage(spec, DataTypes.BYTE_ARRAY, pageSize);
+          this.columnPages[i] = ColumnPage.newPage(
+              new ColumnPageEncoderMeta(spec, DataTypes.BYTE_ARRAY, columnCompressor), pageSize);
           this.columnPages[i].setStatsCollector(new DummyStatsCollector());
         }
       } else {
-        this.columnPages[i] = ColumnPage
-            .newLocalDictPage(spec, DataTypes.BYTE_ARRAY, pageSize, localDictionaryGenerator, true);
+        this.columnPages[i] = ColumnPage.newLocalDictPage(
+            new ColumnPageEncoderMeta(spec, DataTypes.BYTE_ARRAY, columnCompressor), pageSize,
+            localDictionaryGenerator, true);
         this.columnPages[i].setStatsCollector(new DummyStatsCollector());
       }
     }
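ComplexColumnPage.initialize() now receives the column compressor explicitly instead of each child page resolving it on its own, so all child pages of a complex column are created with the one codec fixed before the load began. A rough caller sketch (hypothetical; the surrounding objects and the getCompressor().getName() resolution are assumptions, imports elided):

    // resolve the compressor once per load, then push the same name down to
    // every child page of the complex column
    String columnCompressor = CompressorFactory.getInstance().getCompressor().getName();
    Map<String, LocalDictionaryGenerator> dictMap = new HashMap<>();
    complexColumnPage.initialize(dictMap, pageSize, columnCompressor);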
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/DecimalColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/DecimalColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/DecimalColumnPage.java
index 368a289..e63614f 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/DecimalColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/DecimalColumnPage.java
@@ -19,8 +19,7 @@ package org.apache.carbondata.core.datastore.page;
 
 import java.math.BigDecimal;
 
-import org.apache.carbondata.core.datastore.TableSpec;
-import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.datatype.DecimalConverterFactory;
 
@@ -34,10 +33,11 @@ public abstract class DecimalColumnPage extends VarLengthColumnPageBase {
    */
   DecimalConverterFactory.DecimalConverter decimalConverter;
 
-  DecimalColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize) {
-    super(columnSpec, dataType, pageSize);
-    decimalConverter = DecimalConverterFactory.INSTANCE
-        .getDecimalConverter(columnSpec.getPrecision(), columnSpec.getScale());
+  DecimalColumnPage(ColumnPageEncoderMeta columnPageEncoderMeta, int pageSize) {
+    super(columnPageEncoderMeta, pageSize);
+    decimalConverter = DecimalConverterFactory.INSTANCE.getDecimalConverter(
+        columnPageEncoderMeta.getColumnSpec().getPrecision(),
+        columnPageEncoderMeta.getColumnSpec().getScale());
   }
 
   public DecimalConverterFactory.DecimalConverter getDecimalConverter() {
@@ -46,67 +46,80 @@ public abstract class DecimalColumnPage extends VarLengthColumnPageBase {
   @Override
   public byte[] getBytePage() {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public short[] getShortPage() {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public byte[] getShortIntPage() {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public int[] getIntPage() {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public long[] getLongPage() {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public float[] getFloatPage() {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public double[] getDoublePage() {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public byte[][] getByteArrayPage() {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public float getFloat(int rowId) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public double getDouble(int rowId) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public void putDouble(int rowId, double value) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public void setFloatPage(float[] floatData) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   @Override
   public void setDoublePage(double[] doubleData) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
   // used for building datamap in loading process
@@ -127,15 +140,15 @@ public abstract class DecimalColumnPage extends VarLengthColumnPageBase {
 
   private BigDecimal getDecimalFromDecompressData(int rowId) {
     long value;
-    if (dataType == DataTypes.BYTE) {
+    if (columnPageEncoderMeta.getStoreDataType() == DataTypes.BYTE) {
       value = getByte(rowId);
-    } else if (dataType == DataTypes.SHORT) {
+    } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.SHORT) {
       value = getShort(rowId);
-    } else if (dataType == DataTypes.SHORT_INT) {
+    } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.SHORT_INT) {
       value = getShortInt(rowId);
-    } else if (dataType == DataTypes.INT) {
+    } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.INT) {
       value = getInt(rowId);
-    } else if (dataType == DataTypes.LONG) {
+    } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.LONG) {
       value = getLong(rowId);
     } else {
       return decimalConverter.getDecimal(getBytes(rowId));
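A note on the DecimalColumnPage hunks above: the store data type consulted in getDecimalFromDecompressData is the downcast integral type the decimal's unscaled value was written as (byte/short/short_int/int/long), now read from the page's ColumnPageEncoderMeta rather than a field of its own, while the scale comes from the ColumnSpec. Conceptually, the reconstruction delegated to the DecimalConverter is just (illustration only; the real logic lives in DecimalConverterFactory):

    // a decimal stored as a downcast integral value is rebuilt from its
    // unscaled value plus the column's scale
    long unscaledValue = 12345L;   // as read back via getByte/getShort/getInt/getLong
    int scale = 2;                 // from the column spec
    java.math.BigDecimal restored = java.math.BigDecimal.valueOf(unscaledValue, scale);
    // restored is 123.45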
