http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/localdictionary/PageLevelDictionary.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/localdictionary/PageLevelDictionary.java b/core/src/main/java/org/apache/carbondata/core/localdictionary/PageLevelDictionary.java index 419fd9e..d6c2352 100644 --- a/core/src/main/java/org/apache/carbondata/core/localdictionary/PageLevelDictionary.java +++ b/core/src/main/java/org/apache/carbondata/core/localdictionary/PageLevelDictionary.java @@ -26,6 +26,7 @@ import org.apache.carbondata.core.datastore.TableSpec; import org.apache.carbondata.core.datastore.compression.CompressorFactory; import org.apache.carbondata.core.datastore.page.ColumnPage; import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder; +import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta; import org.apache.carbondata.core.datastore.page.encoding.compress.DirectCompressCodec; import org.apache.carbondata.core.datastore.page.statistics.DummyStatsCollector; import org.apache.carbondata.core.localdictionary.exception.DictionaryThresholdReachedException; @@ -57,14 +58,17 @@ public class PageLevelDictionary { private DataType dataType; private boolean isComplexTypePrimitive; + // compressor to be used for the dictionary. The compressor is the same as column compressor. 
+ private String columnCompressor; public PageLevelDictionary(LocalDictionaryGenerator localDictionaryGenerator, String columnName, - DataType dataType, boolean isComplexTypePrimitive) { + DataType dataType, boolean isComplexTypePrimitive, String columnCompressor) { this.localDictionaryGenerator = localDictionaryGenerator; this.usedDictionaryValues = new BitSet(); this.columnName = columnName; this.dataType = dataType; this.isComplexTypePrimitive = isComplexTypePrimitive; + this.columnCompressor = columnCompressor; } /** @@ -111,8 +115,9 @@ public class PageLevelDictionary { } TableSpec.ColumnSpec spec = TableSpec.ColumnSpec.newInstance(columnName, DataTypes.BYTE_ARRAY, columnType); - ColumnPage dictionaryColumnPage = - ColumnPage.newPage(spec, DataTypes.BYTE_ARRAY, usedDictionaryValues.cardinality()); + ColumnPage dictionaryColumnPage = ColumnPage.newPage( + new ColumnPageEncoderMeta(spec, DataTypes.BYTE_ARRAY, columnCompressor), + usedDictionaryValues.cardinality()); // TODO support data type specific stats collector for numeric data types dictionaryColumnPage.setStatsCollector(new DummyStatsCollector()); int rowId = 0; @@ -139,8 +144,9 @@ public class PageLevelDictionary { // get encoded dictionary values LocalDictionaryChunk localDictionaryChunk = encoder.encodeDictionary(dictionaryColumnPage); // set compressed dictionary values - localDictionaryChunk.setDictionary_values(CompressorFactory.getInstance().getCompressor() - .compressByte(usedDictionaryValues.toByteArray())); + localDictionaryChunk.setDictionary_values( + CompressorFactory.getInstance().getCompressor(columnCompressor).compressByte( + usedDictionaryValues.toByteArray())); // free the dictionary page memory dictionaryColumnPage.freeMemory(); return localDictionaryChunk;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java index 4dc1fbc..5a19073 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java @@ -79,6 +79,7 @@ public class DataType implements Serializable { public static char convertType(DataType dataType) { if (dataType == DataTypes.BYTE || + dataType == DataTypes.BOOLEAN || dataType == DataTypes.SHORT || dataType == DataTypes.SHORT_INT || dataType == DataTypes.INT || http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java index 2285284..7cc2b09 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java @@ -35,7 +35,6 @@ import org.apache.carbondata.core.cache.dictionary.Dictionary; import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.compression.Compressor; -import org.apache.carbondata.core.datastore.compression.CompressorFactory; import org.apache.carbondata.core.keygenerator.KeyGenException; import org.apache.carbondata.core.keygenerator.KeyGenerator; import 
org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; @@ -717,16 +716,13 @@ public class QueryUtil { * Below method will be used to convert the thrift presence meta to wrapper * presence meta * - * @param presentMetadataThrift * @return wrapper presence meta */ public static BitSet getNullBitSet( - org.apache.carbondata.format.PresenceMeta presentMetadataThrift) { - Compressor compressor = CompressorFactory.getInstance().getCompressor(); + org.apache.carbondata.format.PresenceMeta presentMetadataThrift, Compressor compressor) { final byte[] present_bit_stream = presentMetadataThrift.getPresent_bit_stream(); if (null != present_bit_stream) { - return BitSet - .valueOf(compressor.unCompressByte(present_bit_stream)); + return BitSet.valueOf(compressor.unCompressByte(present_bit_stream)); } else { return new BitSet(1); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java index f14610c..1832cf5 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java @@ -357,8 +357,8 @@ public class BlockletDataMapUtil { columnSchema.write(dataOutput); } byte[] byteArray = stream.toByteArray(); - // Compress with snappy to reduce the size of schema - return CompressorFactory.getInstance().getCompressor().compressByte(byteArray); + // Compress to reduce the size of schema + return CompressorFactory.SupportedCompressor.SNAPPY.getCompressor().compressByte(byteArray); } /** @@ -369,7 +369,8 @@ public class BlockletDataMapUtil { */ public static List<ColumnSchema> readColumnSchema(byte[] schemaArray) throws IOException { // uncompress it. 
- schemaArray = CompressorFactory.getInstance().getCompressor().unCompressByte(schemaArray); + schemaArray = CompressorFactory.SupportedCompressor.SNAPPY.getCompressor().unCompressByte( + schemaArray); ByteArrayInputStream schemaStream = new ByteArrayInputStream(schemaArray); DataInput schemaInput = new DataInputStream(schemaStream); List<ColumnSchema> columnSchemas = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java b/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java index 3473aca..4efd5ae 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java @@ -39,6 +39,8 @@ public final class ByteUtil { public static final int SIZEOF_LONG = 8; + public static final int SIZEOF_FLOAT = 4; + public static final int SIZEOF_DOUBLE = 8; public static final String UTF8_CSN = StandardCharsets.UTF_8.name(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java index 571a247..4be4f78 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java @@ -23,6 +23,7 @@ import java.util.List; import org.apache.carbondata.core.datastore.blocklet.BlockletEncodedColumnPage; import org.apache.carbondata.core.datastore.blocklet.EncodedBlocklet; +import org.apache.carbondata.core.datastore.compression.CompressorFactory; import 
org.apache.carbondata.core.datastore.page.encoding.EncodedColumnPage; import org.apache.carbondata.core.datastore.page.statistics.TablePageStatistics; import org.apache.carbondata.core.metadata.ColumnarFormatVersion; @@ -249,17 +250,36 @@ public class CarbonMetadataUtil { } /** - * Right now it is set to default values. We may use this in future + * set the compressor. + * before 1.5.0, we set an enum 'compression_codec'; + * since 1.5.0, we use string 'compressor_name' instead */ - public static ChunkCompressionMeta getSnappyChunkCompressionMeta() { + public static ChunkCompressionMeta getChunkCompressorMeta(String compressorName) { ChunkCompressionMeta chunkCompressionMeta = new ChunkCompressionMeta(); - chunkCompressionMeta.setCompression_codec(CompressionCodec.SNAPPY); + // we will not use this field any longer and will use compressor_name instead, + // but in thrift definition, this field is required so we cannot set it to null, otherwise + // it will cause deserialization error in runtime (required field cannot be null). 
+ chunkCompressionMeta.setCompression_codec(CompressionCodec.DEPRECATED); + chunkCompressionMeta.setCompressor_name(compressorName); chunkCompressionMeta.setTotal_compressed_size(0); chunkCompressionMeta.setTotal_uncompressed_size(0); return chunkCompressionMeta; } /** + * get the compressor name from chunk meta + * before 1.5.0, we only support snappy and do not have compressor_name field; + * after 1.5.0, we directly get the compressor from the compressor_name field + */ + public static String getCompressorNameFromChunkMeta(ChunkCompressionMeta chunkCompressionMeta) { + if (chunkCompressionMeta.isSetCompressor_name()) { + return chunkCompressionMeta.getCompressor_name(); + } else { + // this is for legacy store before 1.5.0 + return CompressorFactory.SupportedCompressor.SNAPPY.getName(); + } + } + /** * Below method will be used to get the index header * * @param columnCardinality cardinality of each column http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/test/java/org/apache/carbondata/core/datastore/page/encoding/RLECodecTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/datastore/page/encoding/RLECodecTest.java b/core/src/test/java/org/apache/carbondata/core/datastore/page/encoding/RLECodecTest.java index 8360e02..acdfcf3 100644 --- a/core/src/test/java/org/apache/carbondata/core/datastore/page/encoding/RLECodecTest.java +++ b/core/src/test/java/org/apache/carbondata/core/datastore/page/encoding/RLECodecTest.java @@ -45,8 +45,10 @@ public class RLECodecTest { TestData(byte[] inputByteData, byte[] expectedEncodedByteData) throws IOException, MemoryException { this.inputByteData = inputByteData; inputBytePage = ColumnPage.newPage( - TableSpec.ColumnSpec.newInstance("test", DataTypes.BYTE, ColumnType.MEASURE), - DataTypes.BYTE, inputByteData.length); + new ColumnPageEncoderMeta( + TableSpec.ColumnSpec.newInstance("test", DataTypes.BYTE, 
ColumnType.MEASURE), + DataTypes.BYTE, "snappy"), + inputByteData.length); inputBytePage.setStatsCollector(PrimitivePageStatsCollector.newInstance(DataTypes.BYTE)); for (int i = 0; i < inputByteData.length; i++) { inputBytePage.putData(i, inputByteData[i]); @@ -131,7 +133,7 @@ public class RLECodecTest { RLECodec codec = new RLECodec(); RLEEncoderMeta meta = new RLEEncoderMeta( TableSpec.ColumnSpec.newInstance("test", DataTypes.BYTE, ColumnType.MEASURE), - DataTypes.BYTE, expectedDecodedBytes.length, null); + DataTypes.BYTE, expectedDecodedBytes.length, null, "snappy"); ColumnPageDecoder decoder = codec.createDecoder(meta); ColumnPage page = decoder.decode(inputBytes, 0, inputBytes.length); byte[] decoded = page.getBytePage(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/test/java/org/apache/carbondata/core/localdictionary/TestPageLevelDictionary.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/localdictionary/TestPageLevelDictionary.java b/core/src/test/java/org/apache/carbondata/core/localdictionary/TestPageLevelDictionary.java index 3337a7d..93c770b 100644 --- a/core/src/test/java/org/apache/carbondata/core/localdictionary/TestPageLevelDictionary.java +++ b/core/src/test/java/org/apache/carbondata/core/localdictionary/TestPageLevelDictionary.java @@ -40,12 +40,14 @@ import org.junit.Assert; import org.junit.Test; public class TestPageLevelDictionary { + private String compressorName = CompressorFactory.getInstance().getCompressor( + CarbonCommonConstants.DEFAULT_COMPRESSOR).getName(); @Test public void testPageLevelDictionaryGenerateDataIsGenertingProperDictionaryValues() { LocalDictionaryGenerator generator = new ColumnLocalDictionaryGenerator(1000, 2); String columnName = "column1"; PageLevelDictionary pageLevelDictionary = new PageLevelDictionary(generator, columnName, - DataTypes.STRING, false); + DataTypes.STRING, false, compressorName); try { 
for (int i = 1; i <= 1000; i++) { Assert.assertTrue((i + 1) == pageLevelDictionary.getDictionaryValue(("" + i).getBytes())); @@ -59,7 +61,8 @@ public class TestPageLevelDictionary { @Test public void testPageLevelDictionaryContainsOnlyUsedDictionaryValues() { LocalDictionaryGenerator generator = new ColumnLocalDictionaryGenerator(1000, 2); String columnName = "column1"; - PageLevelDictionary pageLevelDictionary1 = new PageLevelDictionary(generator, columnName, DataTypes.STRING, false); + PageLevelDictionary pageLevelDictionary1 = new PageLevelDictionary( + generator, columnName, DataTypes.STRING, false, compressorName); byte[][] validateData = new byte[500][]; try { for (int i = 1; i <= 500; i++) { @@ -74,7 +77,8 @@ public class TestPageLevelDictionary { } catch (DictionaryThresholdReachedException e) { Assert.assertTrue(false); } - PageLevelDictionary pageLevelDictionary2 = new PageLevelDictionary(generator, columnName, DataTypes.STRING, false); + PageLevelDictionary pageLevelDictionary2 = new PageLevelDictionary( + generator, columnName, DataTypes.STRING, false, compressorName); try { for (int i = 1; i <= 500; i++) { byte[] data = ("vikas" + i).getBytes(); @@ -94,7 +98,8 @@ public class TestPageLevelDictionary { EncodingFactory encodingFactory = DefaultEncodingFactory.getInstance(); List<ByteBuffer> encoderMetas = localDictionaryChunkForBlocklet.getDictionary_meta().getEncoder_meta(); - ColumnPageDecoder decoder = encodingFactory.createDecoder(encodings, encoderMetas); + ColumnPageDecoder decoder = encodingFactory.createDecoder( + encodings, encoderMetas, compressorName); ColumnPage decode = decoder.decode(localDictionaryChunkForBlocklet.getDictionary_data(), 0, localDictionaryChunkForBlocklet.getDictionary_data().length); for (int i = 0; i < 500; i++) { @@ -111,7 +116,8 @@ public class TestPageLevelDictionary { public void testPageLevelDictionaryContainsOnlyUsedDictionaryValuesWhenMultiplePagesUseSameDictionary() { LocalDictionaryGenerator generator = new 
ColumnLocalDictionaryGenerator(1000, 2); String columnName = "column1"; - PageLevelDictionary pageLevelDictionary1 = new PageLevelDictionary(generator, columnName, DataTypes.STRING, false); + PageLevelDictionary pageLevelDictionary1 = new PageLevelDictionary( + generator, columnName, DataTypes.STRING, false, compressorName); byte[][] validateData = new byte[10][]; int index = 0; try { @@ -128,7 +134,8 @@ public class TestPageLevelDictionary { } catch (DictionaryThresholdReachedException e) { Assert.assertTrue(false); } - PageLevelDictionary pageLevelDictionary2 = new PageLevelDictionary(generator, columnName, DataTypes.STRING, false); + PageLevelDictionary pageLevelDictionary2 = new PageLevelDictionary( + generator, columnName, DataTypes.STRING, false, compressorName); try { for (int i = 1; i <= 5; i++) { byte[] data = ("vikas" + i).getBytes(); @@ -174,10 +181,11 @@ public class TestPageLevelDictionary { EncodingFactory encodingFactory = DefaultEncodingFactory.getInstance(); List<ByteBuffer> encoderMetas = localDictionaryChunkForBlocklet.getDictionary_meta().getEncoder_meta(); - ColumnPageDecoder decoder = encodingFactory.createDecoder(encodings, encoderMetas); + ColumnPageDecoder decoder = encodingFactory.createDecoder( + encodings, encoderMetas, compressorName); ColumnPage decode = decoder.decode(localDictionaryChunkForBlocklet.getDictionary_data(), 0, localDictionaryChunkForBlocklet.getDictionary_data().length); - BitSet bitSet = BitSet.valueOf(CompressorFactory.getInstance().getCompressor() + BitSet bitSet = BitSet.valueOf(CompressorFactory.getInstance().getCompressor(compressorName) .unCompressByte(localDictionaryChunkForBlocklet.getDictionary_values())); Assert.assertTrue(bitSet.cardinality()==validateData.length); for(int i =0; i<validateData.length;i++) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/docs/configuration-parameters.md ---------------------------------------------------------------------- diff --git 
a/docs/configuration-parameters.md b/docs/configuration-parameters.md index c8c74f2..c6b0fcb 100644 --- a/docs/configuration-parameters.md +++ b/docs/configuration-parameters.md @@ -7,7 +7,7 @@ the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 - + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -92,6 +92,7 @@ This section provides the details of all the configurations required for the Car | carbon.load.directWriteHdfs.enabled | false | During data load all the carbondata files are written to local disk and finally copied to the target location in HDFS.Enabling this parameter will make carrbondata files to be written directly onto target HDFS location bypassing the local disk.**NOTE:** Writing directly to HDFS saves local disk IO(once for writing the files and again for copying to HDFS) there by improving the performance.But the drawback is when data loading fails or the application crashes, unwanted carbondata files will remain in the target HDFS location until it is cleared during next data load or by running *CLEAN FILES* DDL command | | carbon.options.serialization.null.format | \N | Based on the business scenarios, some columns might need to be loaded with null values.As null value cannot be written in csv files, some special characters might be adopted to specify null values.This configuration can be used to specify the null values format in the data being loaded. 
| | carbon.sort.storage.inmemory.size.inmb | 512 | CarbonData writes every ***carbon.sort.size*** number of records to intermediate temp files during data loading to ensure memory footprint is within limits.When ***enable.unsafe.sort*** configuration is enabled, instead of using ***carbon.sort.size*** which is based on rows count, size occupied in memory is used to determine when to flush data pages to intermediate temp files.This configuration determines the memory to be used for storing data pages in memory.**NOTE:** Configuring a higher values ensures more data is maintained in memory and hence increases data loading performance due to reduced or no IO.Based on the memory availability in the nodes of the cluster, configure the values accordingly. | +| carbon.column.compressor | snappy | CarbonData will compress the column values using the compressor specified by this configuration. Currently CarbonData supports 'snappy' and 'zstd' compressors. | | ## Compaction Configuration http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/format/src/main/thrift/carbondata.thrift ---------------------------------------------------------------------- diff --git a/format/src/main/thrift/carbondata.thrift b/format/src/main/thrift/carbondata.thrift index a495b6d..2423ffa 100644 --- a/format/src/main/thrift/carbondata.thrift +++ b/format/src/main/thrift/carbondata.thrift @@ -65,10 +65,12 @@ enum SortState{ } /** - * Compressions supported by CarbonData. + * Compressions for column page supported by CarbonData. */ enum CompressionCodec{ SNAPPY = 0; + //** We will not use this CompressionCodec any longer since 1.5.0, but because it is required in some structure, we cannot get rid of it. 
So here I add another deprecated enum to alert the people who want to use it **// + DEPRECATED = 1; } /** @@ -82,6 +84,8 @@ struct ChunkCompressionMeta{ 2: required i64 total_uncompressed_size; /** Total byte size of all compressed pages in this column chunk (including the headers) **/ 3: required i64 total_compressed_size; + /** compressor name for chunk, this is introduced in 1.5.0 to make compression for final store more extensible. Readers use this compressor_name when it is set; if it is not set (legacy store written before 1.5.0), snappy is assumed **/ + 4: optional string compressor_name; } /** @@ -212,6 +216,7 @@ struct FileHeader{ 4: optional i64 time_stamp; // Timestamp to compare column schema against master schema 5: optional bool is_splitable; // Whether file is splitable or not 6: optional binary sync_marker; // 16 bytes sync marker + 7: optional string compressor_name; } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java index 2d4f370..28817e9 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java @@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.constants.CarbonLoadOptionConstants; +import org.apache.carbondata.core.datastore.compression.CompressorFactory; import org.apache.carbondata.core.metadata.datatype.StructField; import org.apache.carbondata.core.metadata.datatype.StructType; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; @@ -299,6 +300,12 @@ public class 
CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje model.setTableName(CarbonTableOutputFormat.getTableName(conf)); model.setCarbonTransactionalTable(true); CarbonTable carbonTable = getCarbonTable(conf); + String columnCompressor = carbonTable.getTableInfo().getFactTable().getTableProperties().get( + CarbonCommonConstants.COMPRESSOR); + if (null == columnCompressor) { + columnCompressor = CompressorFactory.getInstance().getCompressor().getName(); + } + model.setColumnCompressor(columnCompressor); model.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable)); model.setTablePath(getTablePath(conf)); setFileHeader(conf, model); http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java index 935c52d..7cd241a 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java @@ -43,6 +43,7 @@ import org.apache.carbondata.core.cache.dictionary.Dictionary; import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datamap.DataMapStoreManager; +import org.apache.carbondata.core.datastore.compression.CompressorFactory; import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.fileoperations.AtomicFileOperationFactory; import org.apache.carbondata.core.fileoperations.AtomicFileOperations; @@ -134,6 +135,12 @@ public class StoreCreator { AbsoluteTableIdentifier absoluteTableIdentifier) { CarbonDataLoadSchema schema = new CarbonDataLoadSchema(table); CarbonLoadModel 
loadModel = new CarbonLoadModel(); + String columnCompressor = table.getTableInfo().getFactTable().getTableProperties().get( + CarbonCommonConstants.COMPRESSOR); + if (columnCompressor == null) { + columnCompressor = CompressorFactory.getInstance().getCompressor().getName(); + } + loadModel.setColumnCompressor(columnCompressor); loadModel.setCarbonDataLoadSchema(schema); loadModel.setDatabaseName(absoluteTableIdentifier.getCarbonTableIdentifier().getDatabaseName()); loadModel.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName()); http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala ---------------------------------------------------------------------- diff --git a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala index 895d6a5..4b973a1 100644 --- a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala +++ b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala @@ -38,6 +38,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumnUniqueIdentifier, ReverseDictionary} import org.apache.carbondata.core.cache.{Cache, CacheProvider, CacheType} import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.compression.CompressorFactory import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.fileoperations.{AtomicFileOperationFactory, AtomicFileOperations, FileWriteOperation} import org.apache.carbondata.core.metadata.converter.{SchemaConverter, ThriftWrapperSchemaConverterImpl} @@ -83,6 +84,11 @@ object CarbonDataStoreCreator { 
writeDictionary(dataFilePath, table, absoluteTableIdentifier) val schema: CarbonDataLoadSchema = new CarbonDataLoadSchema(table) val loadModel: CarbonLoadModel = new CarbonLoadModel() + import scala.collection.JavaConverters._ + val columnCompressor = table.getTableInfo.getFactTable.getTableProperties.asScala + .getOrElse(CarbonCommonConstants.COMPRESSOR, + CompressorFactory.getInstance().getCompressor().getName()) + loadModel.setColumnCompressor(columnCompressor) loadModel.setCarbonDataLoadSchema(schema) loadModel.setDatabaseName( absoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName) http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithCompression.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithCompression.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithCompression.scala new file mode 100644 index 0000000..628a0dc --- /dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithCompression.scala @@ -0,0 +1,411 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.carbondata.integration.spark.testsuite.dataload

import java.io.File
import java.text.SimpleDateFormat
import java.util.concurrent.{ExecutorService, Executors, Future}
import java.util.Calendar

import scala.util.Random

import org.apache.commons.lang3.{RandomStringUtils, StringUtils}
import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
import org.apache.spark.sql.{CarbonEnv, Row, SaveMode}
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.exception.InvalidConfigurationException
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.streaming.parser.CarbonStreamParser

/**
 * One generated record; the fields mirror, in order, the columns of the table
 * created by `TestLoadDataWithCompression.createTable`.
 */
case class Rcd(booleanField: Boolean, shortField: Short, intField: Int, bigintField: Long,
    doubleField: Double, stringField: String, timestampField: String, decimalField: Double,
    dateField: String, charField: String, floatField: Float, stringDictField: String,
    stringSortField: String, stringLocalDictField: String, longStringField: String)

/**
 * Tests for loading, compacting and streaming into a carbon table while the column
 * compressor ("snappy" / "zstd") is configured through `CarbonProperties` or through
 * the table property `CarbonCommonConstants.COMPRESSOR`.
 */
class TestLoadDataWithCompression extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll {
  private val tableName = "load_test_with_compressor"
  // pool used to run loads/compactions in the background while the test thread
  // keeps changing the compressor property
  private var executorService: ExecutorService = _
  // directory the streaming test writes its CSV input into
  private val csvDataDir = s"$integrationPath/spark2/target/csv_load_compression"

  /** Creates the thread pool and removes leftovers (CSV directory, table) of earlier runs. */
  override protected def beforeAll(): Unit = {
    executorService = Executors.newFixedThreadPool(3)
    CarbonUtil.deleteFoldersAndFilesSilent(FileFactory.getCarbonFile(csvDataDir))
    sql(s"DROP TABLE IF EXISTS $tableName")
  }

  /** Shuts the thread pool down and removes the CSV directory and the table. */
  override protected def afterAll(): Unit = {
    executorService.shutdown()
    CarbonUtil.deleteFoldersAndFilesSilent(FileFactory.getCarbonFile(csvDataDir))
    try {
      sql(s"DROP TABLE IF EXISTS $tableName")
    } catch {
      // best-effort cleanup: a failing drop must not fail the suite
      case _: Exception =>
    }
  }

  /**
   * Restores the off-heap sort, compressor and blocklet size properties to their
   * defaults and drops the test table, so that tests do not influence each other.
   */
  override protected def afterEach(): Unit = {
    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
      CarbonCommonConstants.ENABLE_OFFHEAP_SORT_DEFAULT)
    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR,
      CarbonCommonConstants.DEFAULT_COMPRESSOR)
    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.BLOCKLET_SIZE,
      CarbonCommonConstants.BLOCKLET_SIZE_DEFAULT_VAL)

    try {
      sql(s"DROP TABLE IF EXISTS $tableName")
    } catch {
      // best-effort cleanup: a failing drop must not fail the test
      case _: Exception =>
    }
  }

  /**
   * (Re)creates the test table.
   *
   * @param streaming        when true the table gets 'STREAMING'='true' and no
   *                         'LONG_STRING_COLUMNS' property
   * @param columnCompressor when not blank it is set as the table level compressor property
   */
  private def createTable(streaming: Boolean = false, columnCompressor: String = ""): Unit = {
    sql(s"DROP TABLE IF EXISTS $tableName")
    sql(
      s"""
         | CREATE TABLE $tableName(
         |    booleanField boolean,
         |    shortField smallint,
         |    intField int,
         |    bigintField bigint,
         |    doubleField double,
         |    stringField string,
         |    timestampField timestamp,
         |    decimalField decimal(18,2),
         |    dateField date,
         |    charField string,
         |    floatField float,
         |    stringDictField string,
         |    stringSortField string,
         |    stringLocalDictField string,
         |    longStringField string
         | )
         | STORED BY 'carbondata'
         | TBLPROPERTIES(
         |  ${if (StringUtils.isBlank(columnCompressor)) "" else s"'${CarbonCommonConstants.COMPRESSOR}'='$columnCompressor',"}
         |  ${if (streaming) "" else s"'LONG_STRING_COLUMNS'='longStringField',"}
         |  'SORT_COLUMNS'='stringSortField',
         |  'DICTIONARY_INCLUDE'='stringDictField',
         |  'local_dictionary_enable'='true',
         |  'local_dictionary_threshold'='10000',
         |  'local_dictionary_include'='stringLocalDictField' ${if (streaming) s", 'STREAMING'='true'" else ""})
       """.stripMargin)
  }

  /**
   * Runs two INSERT statements of 4 rows each (ordinary values, boundary values and an
   * all-NULL row), i.e. every call adds 8 rows to the table.
   */
  private def loadData(): Unit = {
    sql(
      s"""
         | INSERT INTO TABLE $tableName VALUES
         |  (true,1,11,101,41.4,'string1','2015/4/23 12:01:01',12.34,'2015/4/23','aaa',1.5,'dict1','sort1','local_dict1','longstring1'),
         |  (false,2,12,102,42.4,'string2','2015/5/23 12:01:03',23.45,'2015/5/23','bbb',2.5,'dict2','sort2','local_dict2','longstring2'),
         |  (true,3,13,163,43.4,'string3','2015/7/26 12:01:06',34.56,'2015/7/26','ccc',3.5,'dict3','sort3','local_dict3','longstring3'),
         |  (NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL)
       """.stripMargin)
    sql(
      s"""
         | INSERT INTO TABLE $tableName VALUES
         |  (true,${Short.MaxValue - 2},${Int.MinValue + 2},${Long.MaxValue - 2},${Double.MinValue + 2},'string1','2015/4/23 12:01:01',${Double.MinValue + 2},'2015/4/23','aaa',${Float.MaxValue - 2},'dict1','sort1','local_dict1','longstring1'),
         |  (false,2,12,102,42.4,'string2','2015/5/23 12:01:03',23.45,'2015/5/23','bbb',2.5,'dict2','sort2','local_dict2','longstring2'),
         |  (true,${Short.MinValue + 2},${Int.MaxValue - 2},${Long.MinValue + 2},${Double.MaxValue - 2},'string3','2015/7/26 12:01:06',${Double.MinValue + 2},'2015/7/26','ccc',${Float.MinValue + 2},'dict3','sort3','local_dict3','longstring3'),
         |  (NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL)
       """.stripMargin)
  }

  test("test data loading with snappy compressor and offheap") {
    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "snappy")
    createTable()
    loadData()
    checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(8)))
  }

  test("test data loading with zstd compressor and offheap") {
    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "zstd")
    createTable()
    loadData()
    checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(8)))
  }

  test("test data loading with zstd compressor and onheap") {
    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "false")
    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "zstd")
    createTable()
    loadData()
    checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(8)))
  }

  test("test current zstd compressor on legacy store with snappy") {
    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "snappy")
    createTable()
    loadData()

    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "zstd")
    loadData()
    checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(16)))
  }

  test("test current snappy compressor on legacy store with zstd") {
    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "zstd")
    createTable()
    loadData()

    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "snappy")
    loadData()
    checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(16)))
  }

  test("test compaction with different compressor for each load") {
    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "zstd")
    createTable()
    loadData()

    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "false")
    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "snappy")
    loadData()

    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "zstd")
    loadData()

    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "snappy")
    loadData()

    // there are 8 loads
    checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(4 * 8)))
    assert(sql(s"SHOW SEGMENTS FOR TABLE $tableName").count() == 8)
    sql(s"ALTER TABLE $tableName COMPACT 'major'")
    sql(s"CLEAN FILES FOR TABLE $tableName")
    // after compaction and clean, there should be one segment
    checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(4 * 8)))
    assert(sql(s"SHOW SEGMENTS FOR TABLE $tableName").count() == 1)
  }

  test("test data loading with unsupported compressor and onheap") {
    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "false")
    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "fake")
    createTable()
    val exception = intercept[UnsupportedOperationException] {
      loadData()
    }
    assert(exception.getMessage.contains("Invalid compressor type provided"))
  }

  test("test compaction with unsupported compressor") {
    createTable()
    loadData()
    loadData()

    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "fake")
    val exception = intercept[UnsupportedOperationException] {
      sql(s"ALTER TABLE $tableName COMPACT 'major'")
    }
    assert(exception.getMessage.contains("Invalid compressor type provided"))
  }

  /**
   * Builds a cached DataFrame of `lineNum` [[Rcd]] rows. The dictionary, sort and local
   * dictionary columns are derived from the row number, the long string column holds
   * 33000 random alphabetic characters, the remaining columns are random or time based.
   */
  private def generateAllDataTypeDF(lineNum: Int) = {
    val tsFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    val dateFormat = new SimpleDateFormat("yyyy-MM-dd")
    val calendar = Calendar.getInstance()
    import sqlContext.implicits._
    sqlContext.sparkContext.parallelize(1 to lineNum)
      .map { p =>
        calendar.add(Calendar.HOUR, p)
        Rcd(Random.nextBoolean(), (Random.nextInt() % Short.MaxValue).toShort, Random.nextInt(), Random.nextLong(),
          Random.nextDouble(), Random.nextString(6), tsFormat.format(calendar.getTime), 0.01 * p,
          dateFormat.format(calendar.getTime), s"$p", Random.nextFloat(), s"stringDict$p",
          s"stringSort$p", s"stringLocalDict$p", RandomStringUtils.randomAlphabetic(33000))
      }
      .toDF()
      .cache()
  }

  test("test data loading & compaction with more pages and change the compressor during loading") {
    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.BLOCKLET_SIZE, "2000")
    val lineNum = 5000
    val df = generateAllDataTypeDF(lineNum)

    // appends the generated DataFrame to the table on a pool thread
    def loadDataAsync(): Future[_] = {
      executorService.submit(new Runnable {
        override def run(): Unit = {
          df.write
            .format("carbondata")
            .option("tableName", tableName)
            .mode(SaveMode.Append)
            .save()
        }
      })
    }

    createTable()

    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "snappy")
    var future = loadDataAsync()
    // change the compressor randomly during the loading
    while (!future.isDone) {
      CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, if (Random.nextBoolean()) "snappy" else "zstd")
    }

    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "zstd")
    future = loadDataAsync()
    while (!future.isDone) {
      CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, if (Random.nextBoolean()) "snappy" else "zstd")
    }

    checkAnswer(sql(s"SELECT COUNT(*) FROM $tableName"), Seq(Row(lineNum * 2)))
    checkAnswer(sql(s"SELECT stringDictField, stringSortField FROM $tableName WHERE stringDictField='stringDict1'"), Seq(Row("stringDict1", "stringSort1"), Row("stringDict1", "stringSort1")))

    // runs a major compaction on a pool thread
    def compactAsync(): Future[_] = {
      executorService.submit(new Runnable {
        override def run(): Unit = {
          sql(s"ALTER TABLE $tableName COMPACT 'MAJOR'")
        }
      })
    }

    // change the compressor randomly during compaction
    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "zstd")
    future = compactAsync()
    while (!future.isDone) {
      CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, if (Random.nextBoolean()) "snappy" else "zstd")
    }

    checkAnswer(sql(s"SELECT COUNT(*) FROM $tableName"), Seq(Row(lineNum * 2)))
    checkAnswer(sql(s"SELECT stringDictField, stringSortField FROM $tableName WHERE stringDictField='stringDict1'"), Seq(Row("stringDict1", "stringSort1"), Row("stringDict1", "stringSort1")))
  }

  test("test creating table with specified compressor") {
    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
    // the system configuration for compressor is snappy
    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "snappy")
    // create table with zstd as compressor
    createTable(columnCompressor = "zstd")
    loadData()
    checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(8)))
    val carbonTable = CarbonEnv.getCarbonTable(Option("default"), tableName)(sqlContext.sparkSession)
    val tableColumnCompressor = carbonTable.getTableInfo.getFactTable.getTableProperties.get(CarbonCommonConstants.COMPRESSOR)
    assert("zstd".equalsIgnoreCase(tableColumnCompressor))
  }

  test("test creating table with unsupported compressor") {
    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
    // the system configuration for compressor is snappy
    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "snappy")
    // create table with unsupported compressor
    val exception = intercept[InvalidConfigurationException] {
      createTable(columnCompressor = "fakecompressor")
    }
    assert(exception.getMessage.contains("fakecompressor compressor is not supported"))
  }

  /**
   * Writes `lineNum` generated [[Rcd]] rows as header-less CSV files into `csvDir`.
   *
   * @param saveMode how existing content of `csvDir` is treated (overwritten by default)
   */
  private def generateAllDataTypeFiles(lineNum: Int, csvDir: String,
      saveMode: SaveMode = SaveMode.Overwrite): Unit = {
    val tsFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    val dateFormat = new SimpleDateFormat("yyyy-MM-dd")
    val calendar = Calendar.getInstance()
    import sqlContext.implicits._
    sqlContext.sparkContext.parallelize(1 to lineNum)
      .map { p =>
        calendar.add(Calendar.HOUR, p)
        Rcd(Random.nextBoolean(), (Random.nextInt() % Short.MaxValue / 2).toShort, Random.nextInt(), Random.nextLong(),
          Random.nextDouble(), RandomStringUtils.randomAlphabetic(6), tsFormat.format(calendar.getTime), 0.01 * p,
          dateFormat.format(calendar.getTime), s"$p", Random.nextFloat(), s"stringDict$p",
          s"stringSort$p", s"stringLocalDict$p", RandomStringUtils.randomAlphabetic(3))
      }
      .toDF()
      .write
      .option("header", "false")
      .mode(saveMode)
      .csv(csvDir)
  }

  test("test streaming ingestion with different compressor for each mini-batch") {
    createTable(streaming = true)
    val carbonTable = CarbonEnv.getCarbonTable(Some("default"), tableName)(sqlContext.sparkSession)
    val lineNum = 10
    val dataLocation = new File(csvDataDir).getCanonicalPath

    // builds (but does not start) a thread that streams the CSV files of dataLocation
    // into the table until it is interrupted
    def doStreamingIngestionThread(): Thread = {
      new Thread() {
        override def run(): Unit = {
          var streamingQuery: StreamingQuery = null
          try {
            // assign to the outer variable: declaring a new `val` here would shadow it
            // and leave the reference used in `finally` null
            streamingQuery = sqlContext.sparkSession.readStream
              .text(dataLocation)
              .writeStream
              .format("carbondata")
              .trigger(ProcessingTime(s"1 seconds"))
              .option("checkpointLocation", CarbonTablePath.getStreamingCheckpointDir(carbonTable.getTablePath))
              .option("dbName", "default")
              .option("tableName", tableName)
              .option(CarbonStreamParser.CARBON_STREAM_PARSER, CarbonStreamParser.CARBON_STREAM_PARSER_CSV)
              .start()
            streamingQuery.awaitTermination()
          } catch {
            case ex: Exception => LOGGER.error(ex)
          } finally {
            // the query is still null when start() itself failed
            if (null != streamingQuery) {
              streamingQuery.stop()
            }
          }
        }
      }
    }

    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "zstd")
    generateAllDataTypeFiles(lineNum, dataLocation)
    val thread = doStreamingIngestionThread()
    thread.start()
    Thread.sleep(10 * 1000)

    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "snappy")
    generateAllDataTypeFiles(lineNum, dataLocation, SaveMode.Append)
    Thread.sleep(10 * 1000)

    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "zstd")
    generateAllDataTypeFiles(lineNum, dataLocation, SaveMode.Append)
    Thread.sleep(10 * 1000)

    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "snappy")
    generateAllDataTypeFiles(lineNum, dataLocation, SaveMode.Append)
    Thread.sleep(40 * 1000)
    thread.interrupt()
    checkAnswer(sql(s"SELECT COUNT(*) FROM $tableName"), Seq(Row(lineNum * 4)))
    checkAnswer(sql(s"SELECT stringDictField, stringSortField FROM $tableName WHERE stringDictField='stringDict1'"),
      Seq(Row("stringDict1", "stringSort1"), Row("stringDict1", "stringSort1"), Row("stringDict1", "stringSort1"), Row("stringDict1", "stringSort1")))

    sql(s"alter table $tableName compact 'streaming'")

    checkAnswer(sql(s"SELECT COUNT(*) FROM $tableName"), Seq(Row(lineNum * 4)))
    checkAnswer(sql(s"SELECT stringDictField, stringSortField FROM $tableName WHERE stringDictField='stringDict1'"),
      Seq(Row("stringDict1", "stringSort1"), Row("stringDict1", "stringSort1"), Row("stringDict1", "stringSort1"), Row("stringDict1", "stringSort1")))
    try {
      sql(s"DROP TABLE IF EXISTS $tableName")
    } catch {
      // best-effort cleanup: a failing drop must not fail the test
      case _: Exception =>
    }
  }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala ---------------------------------------------------------------------- diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala index a03a5eb..643471c 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala @@ -51,7 +51,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.datastore.page.encoding.DefaultEncodingFactory import org.apache.carbondata.core.metadata.ColumnarFormatVersion import org.apache.carbondata.core.metadata.datatype.DataTypes -import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataFileFooterConverterV3} +import org.apache.carbondata.core.util.{CarbonMetadataUtil, CarbonProperties, CarbonUtil, DataFileFooterConverterV3} import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException import org.apache.carbondata.sdk.file._ @@ -2604,16 +2604,18 @@ object testUtil{ data: Array[String]): Boolean = { val local_dictionary = rawColumnPage.getDataChunkV3.local_dictionary if (null != local_dictionary) { + val compressorName = CarbonMetadataUtil.getCompressorNameFromChunkMeta( + rawColumnPage.getDataChunkV3.getData_chunk_list.get(0).getChunk_meta) val encodings = local_dictionary.getDictionary_meta.encoders val encoderMetas = local_dictionary.getDictionary_meta.getEncoder_meta val encodingFactory = DefaultEncodingFactory.getInstance - val decoder = encodingFactory.createDecoder(encodings, encoderMetas) + val decoder = encodingFactory.createDecoder(encodings, encoderMetas, compressorName) val dictionaryPage = decoder .decode(local_dictionary.getDictionary_data, 0, local_dictionary.getDictionary_data.length) val 
dictionaryMap = new util.HashMap[DictionaryByteArrayWrapper, Integer] val usedDictionaryValues = util.BitSet - .valueOf(CompressorFactory.getInstance.getCompressor + .valueOf(CompressorFactory.getInstance.getCompressor(compressorName) .unCompressByte(local_dictionary.getDictionary_values)) var index = 0 var i = usedDictionaryValues.nextSetBit(0) http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/localdictionary/LocalDictionarySupportLoadTableTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/localdictionary/LocalDictionarySupportLoadTableTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/localdictionary/LocalDictionarySupportLoadTableTest.scala index 59586c0..e88d8a9 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/localdictionary/LocalDictionarySupportLoadTableTest.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/localdictionary/LocalDictionarySupportLoadTableTest.scala @@ -35,7 +35,7 @@ import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFi import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.datastore.page.encoding.DefaultEncodingFactory import org.apache.carbondata.core.metadata.ColumnarFormatVersion -import org.apache.carbondata.core.util.{CarbonProperties, DataFileFooterConverterV3} +import org.apache.carbondata.core.util.{CarbonMetadataUtil, CarbonProperties, DataFileFooterConverterV3} class LocalDictionarySupportLoadTableTest extends QueryTest with BeforeAndAfterAll { @@ -277,16 +277,18 @@ class LocalDictionarySupportLoadTableTest extends QueryTest with BeforeAndAfterA data: Array[String]): Boolean = { val local_dictionary = 
rawColumnPage.getDataChunkV3.local_dictionary if (null != local_dictionary) { + val compressorName = CarbonMetadataUtil.getCompressorNameFromChunkMeta( + rawColumnPage.getDataChunkV3.getData_chunk_list.get(0).getChunk_meta) val encodings = local_dictionary.getDictionary_meta.encoders val encoderMetas = local_dictionary.getDictionary_meta.getEncoder_meta val encodingFactory = DefaultEncodingFactory.getInstance - val decoder = encodingFactory.createDecoder(encodings, encoderMetas) + val decoder = encodingFactory.createDecoder(encodings, encoderMetas, compressorName) val dictionaryPage = decoder .decode(local_dictionary.getDictionary_data, 0, local_dictionary.getDictionary_data.length) val dictionaryMap = new util.HashMap[DictionaryByteArrayWrapper, Integer] val usedDictionaryValues = util.BitSet - .valueOf(CompressorFactory.getInstance.getCompressor + .valueOf(CompressorFactory.getInstance.getCompressor(compressorName) .unCompressByte(local_dictionary.getDictionary_values)) var index = 0 var i = usedDictionaryValues.nextSetBit(0) http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala index cc8a28e..b382693 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala @@ -30,6 +30,7 @@ import org.apache.spark.sql.execution.streaming.{CarbonAppendableStreamSink, Sin import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants +import 
org.apache.carbondata.core.datastore.compression.CompressorFactory import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.dictionary.server.{DictionaryServer, NonSecureDictionaryServer} import org.apache.carbondata.core.dictionary.service.NonSecureDictionaryServiceProvider @@ -271,6 +272,10 @@ object StreamSinkFactory { getConf.get("spark.driver.host") carbonLoadModel.setDictionaryServerHost(sparkDriverHost) carbonLoadModel.setDictionaryServerPort(dictionaryServerPort.toInt) + val columnCompressor = carbonTable.getTableInfo.getFactTable.getTableProperties.asScala + .getOrElse(CarbonCommonConstants.COMPRESSOR, + CompressorFactory.getInstance().getCompressor.getName) + carbonLoadModel.setColumnCompressor(columnCompressor) carbonLoadModel } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala index 57887a7..6350b50 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala @@ -51,6 +51,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment} import org.apache.carbondata.core.datamap.status.DataMapStatusManager import org.apache.carbondata.core.datastore.block.{Distributable, TableBlockInfo} +import org.apache.carbondata.core.datastore.compression.CompressorFactory import org.apache.carbondata.core.datastore.filesystem.CarbonFile import org.apache.carbondata.core.datastore.impl.FileFactory import 
org.apache.carbondata.core.dictionary.server.DictionaryServer @@ -288,6 +289,10 @@ object CarbonDataRDDFactory { loadModel.readAndSetLoadMetadataDetails() val loadStartTime = CarbonUpdateUtil.readCurrentTime() loadModel.setFactTimeStamp(loadStartTime) + val columnCompressor = table.getTableInfo.getFactTable.getTableProperties.asScala + .getOrElse(CarbonCommonConstants.COMPRESSOR, + CompressorFactory.getInstance().getCompressor.getName) + loadModel.setColumnCompressor(columnCompressor) loadModel } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/integration/spark2/src/main/scala/org/apache/carbondata/stream/CarbonStreamRecordReader.java ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/stream/CarbonStreamRecordReader.java b/integration/spark2/src/main/scala/org/apache/carbondata/stream/CarbonStreamRecordReader.java index 747b064..6d69eb5 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/stream/CarbonStreamRecordReader.java +++ b/integration/spark2/src/main/scala/org/apache/carbondata/stream/CarbonStreamRecordReader.java @@ -19,7 +19,6 @@ package org.apache.carbondata.stream; import java.io.IOException; import java.math.BigDecimal; -import java.math.BigInteger; import java.nio.ByteBuffer; import java.util.BitSet; import java.util.HashMap; @@ -33,6 +32,7 @@ import org.apache.carbondata.core.cache.dictionary.Dictionary; import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.block.SegmentProperties; +import org.apache.carbondata.core.datastore.compression.CompressorFactory; import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator; import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory; import 
org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex; @@ -64,7 +64,6 @@ import org.apache.carbondata.hadoop.InputMetricsStats; import org.apache.carbondata.hadoop.api.CarbonTableInputFormat; import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; import org.apache.carbondata.streaming.CarbonStreamInputFormat; -import org.apache.carbondata.streaming.CarbonStreamUtils; import org.apache.carbondata.streaming.StreamBlockletReader; import org.apache.hadoop.conf.Configuration; @@ -110,6 +109,7 @@ public class CarbonStreamRecordReader extends RecordReader<Void, Object> { private CacheProvider cacheProvider; private Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache; private GenericQueryType[] queryTypes; + private String compressorName; // vectorized reader private StructType outputSchema; @@ -262,6 +262,12 @@ public class CarbonStreamRecordReader extends RecordReader<Void, Object> { private byte[] getSyncMarker(String filePath) throws IOException { CarbonHeaderReader headerReader = new CarbonHeaderReader(filePath); FileHeader header = headerReader.readHeader(); + // legacy store does not have this member + if (header.isSetCompressor_name()) { + compressorName = header.getCompressor_name(); + } else { + compressorName = CompressorFactory.SupportedCompressor.SNAPPY.getName(); + } return header.getSync_marker(); } @@ -285,7 +291,7 @@ public class CarbonStreamRecordReader extends RecordReader<Void, Object> { FSDataInputStream fileIn = fs.open(file, bufferSize); fileIn.seek(fileSplit.getStart()); input = new StreamBlockletReader(syncMarker, fileIn, fileSplit.getLength(), - fileSplit.getStart() == 0); + fileSplit.getStart() == 0, compressorName); cacheProvider = CacheProvider.getInstance(); cache = cacheProvider.createCache(CacheType.FORWARD_DICTIONARY); http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala 
---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala index e0b0547..a13dfdc 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala @@ -34,6 +34,7 @@ import org.apache.spark.util.AlterTableUtil import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.compression.CompressorFactory import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.exception.ConcurrentOperationException import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage} @@ -159,6 +160,10 @@ case class CarbonAlterTableCompactionCommand( carbonLoadModel.setCarbonTransactionalTable(table.isTransactionalTable) carbonLoadModel.setDatabaseName(table.getDatabaseName) carbonLoadModel.setTablePath(table.getTablePath) + val columnCompressor = table.getTableInfo.getFactTable.getTableProperties.asScala + .getOrElse(CarbonCommonConstants.COMPRESSOR, + CompressorFactory.getInstance().getCompressor.getName) + carbonLoadModel.setColumnCompressor(columnCompressor) var storeLocation = System.getProperty("java.io.tmpdir") storeLocation = storeLocation + "/carbonstore/" + System.nanoTime() http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala 
---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala index 63da404..f7a5f42 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala @@ -51,6 +51,7 @@ import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} import org.apache.carbondata.converter.SparkDataTypeConverterImpl import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants} import org.apache.carbondata.core.datamap.DataMapStoreManager +import org.apache.carbondata.core.datastore.compression.CompressorFactory import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.dictionary.server.{DictionaryServer, NonSecureDictionaryServer} import org.apache.carbondata.core.dictionary.service.NonSecureDictionaryServiceProvider @@ -206,6 +207,10 @@ case class CarbonLoadDataCommand( carbonLoadModel.setAggLoadRequest( internalOptions.getOrElse(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL, "false").toBoolean) carbonLoadModel.setSegmentId(internalOptions.getOrElse("mergedSegmentName", "")) + val columnCompressor = table.getTableInfo.getFactTable.getTableProperties.asScala + .getOrElse(CarbonCommonConstants.COMPRESSOR, + CompressorFactory.getInstance().getCompressor.getName) + carbonLoadModel.setColumnCompressor(columnCompressor) val javaPartition = mutable.Map[String, String]() partition.foreach { case (k, v) => http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala 
---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala index 807c925..6c8b0b0 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.optimizer.CarbonFilters import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.compression.CompressorFactory import org.apache.carbondata.core.indexstore.PartitionSpec import org.apache.carbondata.core.metadata.SegmentFileStore import org.apache.carbondata.core.metadata.schema.table.CarbonTable @@ -123,6 +124,10 @@ case class CarbonAlterTableAddHivePartitionCommand( "Schema of index files located in location is not matching with current table schema") } val loadModel = new CarbonLoadModel + val columnCompressor = table.getTableInfo.getFactTable.getTableProperties.asScala + .getOrElse(CarbonCommonConstants.COMPRESSOR, + CompressorFactory.getInstance().getCompressor.getName) + loadModel.setColumnCompressor(columnCompressor) loadModel.setCarbonTransactionalTable(true) loadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(table)) // Create new entry in tablestatus file http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala ---------------------------------------------------------------------- diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala index cd26fe8..b76a485 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala @@ -32,6 +32,7 @@ import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} import org.apache.carbondata.core.cache.CacheProvider import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment} +import org.apache.carbondata.core.datastore.compression.CompressorFactory import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage} import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata} import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl @@ -145,6 +146,10 @@ case class CarbonAlterTableDropPartitionCommand( carbonLoadModel.setTablePath(table.getTablePath) val loadStartTime = CarbonUpdateUtil.readCurrentTime carbonLoadModel.setFactTimeStamp(loadStartTime) + val columnCompressor = table.getTableInfo.getFactTable.getTableProperties.asScala + .getOrElse(CarbonCommonConstants.COMPRESSOR, + CompressorFactory.getInstance().getCompressor.getName) + carbonLoadModel.setColumnCompressor(columnCompressor) alterTableDropPartition( sparkSession.sqlContext, model.partitionId, http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala ---------------------------------------------------------------------- diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala index f4b6de0..753abaf 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala @@ -33,8 +33,8 @@ import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} import org.apache.carbondata.core.cache.CacheProvider import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datamap.DataMapStoreManager +import org.apache.carbondata.core.datastore.compression.CompressorFactory import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage} -import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata} import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl import org.apache.carbondata.core.metadata.schema.PartitionInfo import org.apache.carbondata.core.metadata.schema.partition.PartitionType @@ -142,8 +142,12 @@ case class CarbonAlterTableSplitPartitionCommand( LockUsage.ALTER_PARTITION_LOCK) locks = AlterTableUtil.validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession) - val carbonLoadModel = new CarbonLoadModel() val table = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession) + val carbonLoadModel = new CarbonLoadModel() + val columnCompressor = table.getTableInfo.getFactTable.getTableProperties.asScala + .getOrElse(CarbonCommonConstants.COMPRESSOR, + CompressorFactory.getInstance().getCompressor.getName) + carbonLoadModel.setColumnCompressor(columnCompressor) val tablePath = table.getTablePath val dataLoadSchema = new CarbonDataLoadSchema(table) 
carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema) http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala index 1beda11..42ea0bd 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.command.table import scala.collection.JavaConverters._ +import org.apache.commons.lang3.StringUtils import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, _} import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ID_KEY @@ -26,6 +27,7 @@ import org.apache.spark.sql.execution.command.MetadataCommand import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.compression.CompressorFactory import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.exception.InvalidConfigurationException import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier @@ -99,6 +101,18 @@ case class CarbonCreateTableCommand( throwMetadataException(dbName, tableName, "Table should have at least one column.") } + // Add validation for the column compressor when creating the table + val columnCompressor = tableInfo.getFactTable.getTableProperties.get( + CarbonCommonConstants.COMPRESSOR) + try { + if (null != columnCompressor) { 
+ CompressorFactory.getInstance().getCompressor(columnCompressor) + } + } catch { + case ex : UnsupportedOperationException => + throw new InvalidConfigurationException(ex.getMessage) + } + val operationContext = new OperationContext val createTablePreExecutionEvent: CreateTablePreExecutionEvent = CreateTablePreExecutionEvent(sparkSession, tableIdentifier, Some(tableInfo)) http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala index 6716707..b605a1d 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala @@ -39,6 +39,7 @@ import org.apache.spark.sql.sources.DataSourceRegister import org.apache.spark.sql.types._ import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants} +import org.apache.carbondata.core.datastore.compression.CompressorFactory import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.indexstore import org.apache.carbondata.core.metadata.SegmentFileStore @@ -87,6 +88,11 @@ with Serializable { val table = CarbonEnv.getCarbonTable( TableIdentifier(options("tableName"), options.get("dbName")))(sparkSession) val model = new CarbonLoadModel + val columnCompressor = table.getTableInfo.getFactTable.getTableProperties.asScala + .getOrElse(CarbonCommonConstants.COMPRESSOR, + CompressorFactory.getInstance().getCompressor.getName) + model.setColumnCompressor(columnCompressor) + val carbonProperty = 
CarbonProperties.getInstance() val optionsFinal = LoadOption.fillOptionWithDefaultValue(options.asJava) val tableProperties = table.getTableInfo.getFactTable.getTableProperties http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala index 3298009..08c149b 100644 --- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala +++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala @@ -22,6 +22,7 @@ import org.apache.spark.sql.{CarbonEnv, SparkSession} import org.scalatest.BeforeAndAfterAll import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.compression.CompressorFactory import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.core.util.path.CarbonTablePath @@ -70,6 +71,11 @@ class AllDictionaryTestCase extends Spark2QueryTest with BeforeAndAfterAll { if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) { FileFactory.mkdirs(metadataDirectoryPath, fileType) } + import scala.collection.JavaConverters._ + val columnCompressor = table.getTableInfo.getFactTable.getTableProperties.asScala + .getOrElse(CarbonCommonConstants.COMPRESSOR, + CompressorFactory.getInstance().getCompressor.getName) + carbonLoadModel.setColumnCompressor(columnCompressor) carbonLoadModel } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala 
---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala index d98229a..060afca 100644 --- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala +++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala @@ -25,6 +25,7 @@ import org.scalatest.BeforeAndAfterAll import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.compression.CompressorFactory import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.core.util.path.CarbonTablePath @@ -185,6 +186,11 @@ class ExternalColumnDictionaryTestCase extends Spark2QueryTest with BeforeAndAft if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) { FileFactory.mkdirs(metadataDirectoryPath, fileType) } + import scala.collection.JavaConverters._ + val columnCompressor = table.getTableInfo.getFactTable.getTableProperties.asScala + .getOrElse(CarbonCommonConstants.COMPRESSOR, + CompressorFactory.getInstance().getCompressor.getName) + carbonLoadModel.setColumnCompressor(columnCompressor) carbonLoadModel } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala index 7ef86a5..a49d5bb 
100644 --- a/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala @@ -42,10 +42,10 @@ class CarbonGetTableDetailCommandTestCase extends QueryTest with BeforeAndAfterA assertResult(2)(result.length) assertResult("table_info1")(result(0).getString(0)) - // 2096 is the size of carbon table - assertResult(2147)(result(0).getLong(1)) + // 2187 is the size of the carbon table. Note that since 1.5.0, we add an additional compressor name to the metadata + assertResult(2187)(result(0).getLong(1)) assertResult("table_info2")(result(1).getString(0)) - assertResult(2147)(result(1).getLong(1)) + assertResult(2187)(result(1).getLong(1)) } override def afterAll: Unit = { http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java index 46ad32f..4d85296 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java @@ -121,6 +121,11 @@ public class CarbonDataLoadConfiguration { private String parentTablePath; + /** + * name of compressor to be used to compress column page + */ + private String columnCompressor; + public CarbonDataLoadConfiguration() { } @@ -408,4 +413,11 @@ public class CarbonDataLoadConfiguration { return complexNonDictionaryColumnCount; } + public String getColumnCompressor() { + return columnCompressor; + } + + public void setColumnCompressor(String columnCompressor) { + this.columnCompressor = 
columnCompressor; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java index 225da26..f89bc2f 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java @@ -314,6 +314,7 @@ public final class DataLoadProcessBuilder { if (loadModel.getSdkWriterCores() > 0) { configuration.setWritingCoresCount(loadModel.getSdkWriterCores()); } + configuration.setColumnCompressor(loadModel.getColumnCompressor()); return configuration; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java index 97e329d..e15fb5d 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java @@ -229,6 +229,11 @@ public class CarbonLoadModel implements Serializable { private List<String> mergedSegmentIds; + /** + * compressor used to compress column page + */ + private String columnCompressor; + public boolean isAggLoadRequest() { return isAggLoadRequest; } @@ -473,6 +478,7 @@ public class CarbonLoadModel implements Serializable { copy.loadMinSize = loadMinSize; 
copy.parentTablePath = parentTablePath; copy.sdkWriterCores = sdkWriterCores; + copy.columnCompressor = columnCompressor; return copy; } @@ -529,6 +535,7 @@ public class CarbonLoadModel implements Serializable { copyObj.loadMinSize = loadMinSize; copyObj.parentTablePath = parentTablePath; copyObj.sdkWriterCores = sdkWriterCores; + copyObj.columnCompressor = columnCompressor; return copyObj; } @@ -921,4 +928,12 @@ public class CarbonLoadModel implements Serializable { public void setSdkWriterCores(short sdkWriterCores) { this.sdkWriterCores = sdkWriterCores; } + + public String getColumnCompressor() { + return columnCompressor; + } + + public void setColumnCompressor(String columnCompressor) { + this.columnCompressor = columnCompressor; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java index 2ebcb29..bcc904c 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java @@ -29,7 +29,10 @@ import org.apache.carbondata.common.Strings; import org.apache.carbondata.common.annotations.InterfaceAudience; import org.apache.carbondata.common.constants.LoggerAction; import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException; +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastore.compression.CompressorFactory; import 
org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; import org.apache.carbondata.core.util.CarbonProperties; @@ -48,7 +51,8 @@ import org.apache.hadoop.conf.Configuration; */ @InterfaceAudience.Internal public class CarbonLoadModelBuilder { - + private static final LogService LOGGER = LogServiceFactory.getLogService( + CarbonLoadModelBuilder.class.getName()); private CarbonTable table; public CarbonLoadModelBuilder(CarbonTable table) { @@ -104,6 +108,7 @@ public class CarbonLoadModelBuilder { } catch (NumberFormatException e) { throw new InvalidLoadOptionException(e.getMessage()); } + validateAndSetColumnCompressor(model); return model; } @@ -280,6 +285,8 @@ public class CarbonLoadModelBuilder { carbonLoadModel.setSortColumnsBoundsStr(optionsFinal.get("sort_column_bounds")); carbonLoadModel.setLoadMinSize( optionsFinal.get(CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB)); + + validateAndSetColumnCompressor(carbonLoadModel); } private int validateMaxColumns(String[] csvHeaders, String maxColumns) @@ -369,6 +376,23 @@ public class CarbonLoadModelBuilder { } } + private void validateAndSetColumnCompressor(CarbonLoadModel carbonLoadModel) + throws InvalidLoadOptionException { + try { + String columnCompressor = carbonLoadModel.getColumnCompressor(); + if (StringUtils.isBlank(columnCompressor)) { + columnCompressor = CarbonProperties.getInstance().getProperty( + CarbonCommonConstants.COMPRESSOR, CarbonCommonConstants.DEFAULT_COMPRESSOR); + } + // check and load compressor + CompressorFactory.getInstance().getCompressor(columnCompressor); + carbonLoadModel.setColumnCompressor(columnCompressor); + } catch (Exception e) { + LOGGER.error(e); + throw new InvalidLoadOptionException("Failed to load the compressor"); + } + } + /** * check whether using default value or not */
