http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/localdictionary/PageLevelDictionary.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/localdictionary/PageLevelDictionary.java b/core/src/main/java/org/apache/carbondata/core/localdictionary/PageLevelDictionary.java
index 419fd9e..d6c2352 100644
--- a/core/src/main/java/org/apache/carbondata/core/localdictionary/PageLevelDictionary.java
+++ b/core/src/main/java/org/apache/carbondata/core/localdictionary/PageLevelDictionary.java
@@ -26,6 +26,7 @@ import org.apache.carbondata.core.datastore.TableSpec;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
 import org.apache.carbondata.core.datastore.page.encoding.compress.DirectCompressCodec;
 import org.apache.carbondata.core.datastore.page.statistics.DummyStatsCollector;
 import org.apache.carbondata.core.localdictionary.exception.DictionaryThresholdReachedException;
@@ -57,14 +58,17 @@ public class PageLevelDictionary {
   private DataType dataType;
 
   private boolean isComplexTypePrimitive;
+  // compressor to be used for the dictionary. It is the same as the column compressor.
+  private String columnCompressor;
 
   public PageLevelDictionary(LocalDictionaryGenerator localDictionaryGenerator, String columnName,
-      DataType dataType, boolean isComplexTypePrimitive) {
+      DataType dataType, boolean isComplexTypePrimitive, String columnCompressor) {
     this.localDictionaryGenerator = localDictionaryGenerator;
     this.usedDictionaryValues = new BitSet();
     this.columnName = columnName;
     this.dataType = dataType;
     this.isComplexTypePrimitive = isComplexTypePrimitive;
+    this.columnCompressor = columnCompressor;
   }
 
   /**
@@ -111,8 +115,9 @@ public class PageLevelDictionary {
     }
     TableSpec.ColumnSpec spec =
        TableSpec.ColumnSpec.newInstance(columnName, DataTypes.BYTE_ARRAY, columnType);
-    ColumnPage dictionaryColumnPage =
-        ColumnPage.newPage(spec, DataTypes.BYTE_ARRAY, usedDictionaryValues.cardinality());
+    ColumnPage dictionaryColumnPage = ColumnPage.newPage(
+        new ColumnPageEncoderMeta(spec, DataTypes.BYTE_ARRAY, columnCompressor),
+        usedDictionaryValues.cardinality());
     // TODO support data type specific stats collector for numeric data types
     dictionaryColumnPage.setStatsCollector(new DummyStatsCollector());
     int rowId = 0;
@@ -139,8 +144,9 @@ public class PageLevelDictionary {
     // get encoded dictionary values
    LocalDictionaryChunk localDictionaryChunk = encoder.encodeDictionary(dictionaryColumnPage);
     // set compressed dictionary values
-    localDictionaryChunk.setDictionary_values(CompressorFactory.getInstance().getCompressor()
-        .compressByte(usedDictionaryValues.toByteArray()));
+    localDictionaryChunk.setDictionary_values(
+        CompressorFactory.getInstance().getCompressor(columnCompressor).compressByte(
+            usedDictionaryValues.toByteArray()));
     // free the dictionary page memory
     dictionaryColumnPage.freeMemory();
     return localDictionaryChunk;
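
For context, a minimal sketch of how the compressor name now threads through the
local-dictionary write path. It relies only on the constructor and the
CompressorFactory.getCompressor(String) overload shown in this patch; the generator
setup mirrors TestPageLevelDictionary below, and the "zstd" literal is illustrative:

    import org.apache.carbondata.core.localdictionary.PageLevelDictionary;
    import org.apache.carbondata.core.localdictionary.generator.ColumnLocalDictionaryGenerator;
    import org.apache.carbondata.core.localdictionary.generator.LocalDictionaryGenerator;
    import org.apache.carbondata.core.metadata.datatype.DataTypes;

    // threshold 1000, two-byte dictionary keys, as in the tests below
    LocalDictionaryGenerator generator = new ColumnLocalDictionaryGenerator(1000, 2);
    // the column compressor is now an explicit constructor argument, so the flushed
    // dictionary page and its used-values bitset use the same compressor as the column
    PageLevelDictionary dict =
        new PageLevelDictionary(generator, "column1", DataTypes.STRING, false, "zstd");
    dict.getDictionaryValue("value1".getBytes());  // populate, then flush as above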

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java
index 4dc1fbc..5a19073 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java
@@ -79,6 +79,7 @@ public class DataType implements Serializable {
 
   public static char convertType(DataType dataType) {
     if (dataType == DataTypes.BYTE ||
+        dataType == DataTypes.BOOLEAN ||
         dataType == DataTypes.SHORT ||
         dataType == DataTypes.SHORT_INT ||
         dataType == DataTypes.INT ||
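
With BOOLEAN added here, boolean columns take the same fixed-length conversion branch
as BYTE/SHORT/INT. A small illustration (the concrete char code returned is whatever
that existing branch already produces):

    char code = DataType.convertType(DataTypes.BOOLEAN);  // now handled like the other numeric types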

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
index 2285284..7cc2b09 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
@@ -35,7 +35,6 @@ import org.apache.carbondata.core.cache.dictionary.Dictionary;
 import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.compression.Compressor;
-import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.keygenerator.KeyGenerator;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
@@ -717,16 +716,13 @@ public class QueryUtil {
    * Below method will be used to convert the thrift presence meta to wrapper
    * presence meta
    *
-   * @param presentMetadataThrift
    * @return wrapper presence meta
    */
   public static BitSet getNullBitSet(
-      org.apache.carbondata.format.PresenceMeta presentMetadataThrift) {
-    Compressor compressor = CompressorFactory.getInstance().getCompressor();
+      org.apache.carbondata.format.PresenceMeta presentMetadataThrift, Compressor compressor) {
     final byte[] present_bit_stream = presentMetadataThrift.getPresent_bit_stream();
     if (null != present_bit_stream) {
-      return BitSet
-          .valueOf(compressor.unCompressByte(present_bit_stream));
+      return BitSet.valueOf(compressor.unCompressByte(present_bit_stream));
     } else {
       return new BitSet(1);
     }
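
Callers must now supply the compressor explicitly instead of relying on the
process-wide default. A hedged sketch of the new call-site pattern, pairing this
signature with getCompressorNameFromChunkMeta() from the CarbonMetadataUtil change
below (the helper method itself is hypothetical):

    static java.util.BitSet readNullBits(
        org.apache.carbondata.format.ChunkCompressionMeta chunkMeta,
        org.apache.carbondata.format.PresenceMeta presenceMeta) {
      // resolve the per-chunk compressor recorded at write time
      String name = CarbonMetadataUtil.getCompressorNameFromChunkMeta(chunkMeta);
      Compressor compressor = CompressorFactory.getInstance().getCompressor(name);
      return QueryUtil.getNullBitSet(presenceMeta, compressor);
    }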

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
index f14610c..1832cf5 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
@@ -357,8 +357,8 @@ public class BlockletDataMapUtil {
       columnSchema.write(dataOutput);
     }
     byte[] byteArray = stream.toByteArray();
-    // Compress with snappy to reduce the size of schema
-    return CompressorFactory.getInstance().getCompressor().compressByte(byteArray);
+    // Compress to reduce the size of schema
+    return CompressorFactory.SupportedCompressor.SNAPPY.getCompressor().compressByte(byteArray);
   }
 
   /**
@@ -369,7 +369,8 @@ public class BlockletDataMapUtil {
    */
   public static List<ColumnSchema> readColumnSchema(byte[] schemaArray) throws IOException {
     // uncompress it.
-    schemaArray = CompressorFactory.getInstance().getCompressor().unCompressByte(schemaArray);
+    schemaArray = CompressorFactory.SupportedCompressor.SNAPPY.getCompressor().unCompressByte(
+        schemaArray);
     ByteArrayInputStream schemaStream = new ByteArrayInputStream(schemaArray);
     DataInput schemaInput = new DataInputStream(schemaStream);
     List<ColumnSchema> columnSchemas = new ArrayList<>();
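
Note that the serialized schema for the blocklet datamap stays pinned to snappy
rather than following carbon.column.compressor, so the write and read sides of this
internal cache always agree. A round-trip sketch (the input bytes are placeholders):

    byte[] raw = new byte[] {1, 2, 3};  // stands in for the serialized column schemas
    byte[] packed = CompressorFactory.SupportedCompressor.SNAPPY.getCompressor().compressByte(raw);
    byte[] unpacked = CompressorFactory.SupportedCompressor.SNAPPY.getCompressor().unCompressByte(packed);
    assert java.util.Arrays.equals(raw, unpacked);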

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java b/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
index 3473aca..4efd5ae 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
@@ -39,6 +39,8 @@ public final class ByteUtil {
 
   public static final int SIZEOF_LONG = 8;
 
+  public static final int SIZEOF_FLOAT = 4;
+
   public static final int SIZEOF_DOUBLE = 8;
 
   public static final String UTF8_CSN = StandardCharsets.UTF_8.name();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
index 571a247..4be4f78 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
@@ -23,6 +23,7 @@ import java.util.List;
 
 import org.apache.carbondata.core.datastore.blocklet.BlockletEncodedColumnPage;
 import org.apache.carbondata.core.datastore.blocklet.EncodedBlocklet;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.page.encoding.EncodedColumnPage;
 import org.apache.carbondata.core.datastore.page.statistics.TablePageStatistics;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
@@ -249,17 +250,36 @@ public class CarbonMetadataUtil {
   }
 
   /**
-   * Right now it is set to default values. We may use this in future
+   * set the compressor.
+   * before 1.5.0, we set an enum 'compression_codec';
+   * after 1.5.0, we use string 'compressor_name' instead
    */
-  public static ChunkCompressionMeta getSnappyChunkCompressionMeta() {
+  public static ChunkCompressionMeta getChunkCompressorMeta(String compressorName) {
     ChunkCompressionMeta chunkCompressionMeta = new ChunkCompressionMeta();
-    chunkCompressionMeta.setCompression_codec(CompressionCodec.SNAPPY);
+    // we will not use this field any longer and will use compressor_name instead,
+    // but in thrift definition, this field is required so we cannot set it to null, otherwise
+    // it will cause deserialization error in runtime (required field cannot be null).
+    chunkCompressionMeta.setCompression_codec(CompressionCodec.DEPRECATED);
+    chunkCompressionMeta.setCompressor_name(compressorName);
     chunkCompressionMeta.setTotal_compressed_size(0);
     chunkCompressionMeta.setTotal_uncompressed_size(0);
     return chunkCompressionMeta;
   }
 
   /**
+   * get the compressor name from chunk meta
+   * before 1.5.0, we only support snappy and do not have compressor_name field;
+   * after 1.5.0, we directly get the compressor from the compressor_name field
+   */
+  public static String getCompressorNameFromChunkMeta(ChunkCompressionMeta chunkCompressionMeta) {
+    if (chunkCompressionMeta.isSetCompressor_name()) {
+      return chunkCompressionMeta.getCompressor_name();
+    } else {
+      // this is for legacy store before 1.5.0
+      return CompressorFactory.SupportedCompressor.SNAPPY.getName();
+    }
+  }
+  /**
    * Below method will be used to get the index header
    *
    * @param columnCardinality cardinality of each column
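
A sketch of the resulting write/read symmetry, using only the two methods added in
this hunk (the "zstd" literal is illustrative):

    // write path: record the compressor name; the deprecated enum stays populated
    // only because the thrift field is 'required'
    ChunkCompressionMeta meta = CarbonMetadataUtil.getChunkCompressorMeta("zstd");
    // read path: new files return "zstd"; files written before 1.5.0 carry no
    // compressor_name, so snappy is assumed
    String name = CarbonMetadataUtil.getCompressorNameFromChunkMeta(meta);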

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/test/java/org/apache/carbondata/core/datastore/page/encoding/RLECodecTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/datastore/page/encoding/RLECodecTest.java b/core/src/test/java/org/apache/carbondata/core/datastore/page/encoding/RLECodecTest.java
index 8360e02..acdfcf3 100644
--- a/core/src/test/java/org/apache/carbondata/core/datastore/page/encoding/RLECodecTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/datastore/page/encoding/RLECodecTest.java
@@ -45,8 +45,10 @@ public class RLECodecTest {
    TestData(byte[] inputByteData, byte[] expectedEncodedByteData) throws IOException, MemoryException {
      this.inputByteData = inputByteData;
      inputBytePage = ColumnPage.newPage(
-          TableSpec.ColumnSpec.newInstance("test", DataTypes.BYTE, ColumnType.MEASURE),
-          DataTypes.BYTE, inputByteData.length);
+          new ColumnPageEncoderMeta(
+              TableSpec.ColumnSpec.newInstance("test", DataTypes.BYTE, ColumnType.MEASURE),
+              DataTypes.BYTE, "snappy"),
+          inputByteData.length);
      inputBytePage.setStatsCollector(PrimitivePageStatsCollector.newInstance(DataTypes.BYTE));
       for (int i = 0; i < inputByteData.length; i++) {
         inputBytePage.putData(i, inputByteData[i]);
@@ -131,7 +133,7 @@ public class RLECodecTest {
     RLECodec codec = new RLECodec();
     RLEEncoderMeta meta = new RLEEncoderMeta(
        TableSpec.ColumnSpec.newInstance("test", DataTypes.BYTE, ColumnType.MEASURE),
-        DataTypes.BYTE, expectedDecodedBytes.length, null);
+        DataTypes.BYTE, expectedDecodedBytes.length, null, "snappy");
     ColumnPageDecoder decoder = codec.createDecoder(meta);
     ColumnPage page = decoder.decode(inputBytes, 0, inputBytes.length);
     byte[] decoded = page.getBytePage();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/test/java/org/apache/carbondata/core/localdictionary/TestPageLevelDictionary.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/localdictionary/TestPageLevelDictionary.java b/core/src/test/java/org/apache/carbondata/core/localdictionary/TestPageLevelDictionary.java
index 3337a7d..93c770b 100644
--- a/core/src/test/java/org/apache/carbondata/core/localdictionary/TestPageLevelDictionary.java
+++ b/core/src/test/java/org/apache/carbondata/core/localdictionary/TestPageLevelDictionary.java
@@ -40,12 +40,14 @@ import org.junit.Assert;
 import org.junit.Test;
 
 public class TestPageLevelDictionary {
+  private String compressorName = CompressorFactory.getInstance().getCompressor(
+      CarbonCommonConstants.DEFAULT_COMPRESSOR).getName();
 
  @Test public void testPageLevelDictionaryGenerateDataIsGenertingProperDictionaryValues() {
    LocalDictionaryGenerator generator = new ColumnLocalDictionaryGenerator(1000, 2);
    String columnName = "column1";
    PageLevelDictionary pageLevelDictionary = new PageLevelDictionary(generator, columnName,
-        DataTypes.STRING, false);
+        DataTypes.STRING, false, compressorName);
    try {
      for (int i = 1; i <= 1000; i++) {
        Assert.assertTrue((i + 1) == pageLevelDictionary.getDictionaryValue(("" + i).getBytes()));
@@ -59,7 +61,8 @@ public class TestPageLevelDictionary {
   @Test public void testPageLevelDictionaryContainsOnlyUsedDictionaryValues() {
    LocalDictionaryGenerator generator = new ColumnLocalDictionaryGenerator(1000, 2);
    String columnName = "column1";
-    PageLevelDictionary pageLevelDictionary1 = new PageLevelDictionary(generator, columnName, DataTypes.STRING, false);
+    PageLevelDictionary pageLevelDictionary1 = new PageLevelDictionary(
+        generator, columnName, DataTypes.STRING, false, compressorName);
     byte[][] validateData = new byte[500][];
     try {
       for (int i = 1; i <= 500; i++) {
@@ -74,7 +77,8 @@ public class TestPageLevelDictionary {
     } catch (DictionaryThresholdReachedException e) {
       Assert.assertTrue(false);
     }
-    PageLevelDictionary pageLevelDictionary2 = new PageLevelDictionary(generator, columnName, DataTypes.STRING, false);
+    PageLevelDictionary pageLevelDictionary2 = new PageLevelDictionary(
+        generator, columnName, DataTypes.STRING, false, compressorName);
     try {
       for (int i = 1; i <= 500; i++) {
         byte[] data = ("vikas" + i).getBytes();
@@ -94,7 +98,8 @@ public class TestPageLevelDictionary {
       EncodingFactory encodingFactory = DefaultEncodingFactory.getInstance();
      List<ByteBuffer> encoderMetas =
          localDictionaryChunkForBlocklet.getDictionary_meta().getEncoder_meta();
-      ColumnPageDecoder decoder = encodingFactory.createDecoder(encodings, encoderMetas);
+      ColumnPageDecoder decoder = encodingFactory.createDecoder(
+          encodings, encoderMetas, compressorName);
      ColumnPage decode = decoder.decode(localDictionaryChunkForBlocklet.getDictionary_data(), 0,
           localDictionaryChunkForBlocklet.getDictionary_data().length);
       for (int i = 0; i < 500; i++) {
@@ -111,7 +116,8 @@ public class TestPageLevelDictionary {
  public void testPageLevelDictionaryContainsOnlyUsedDictionaryValuesWhenMultiplePagesUseSameDictionary() {
    LocalDictionaryGenerator generator = new ColumnLocalDictionaryGenerator(1000, 2);
    String columnName = "column1";
-    PageLevelDictionary pageLevelDictionary1 = new PageLevelDictionary(generator, columnName, DataTypes.STRING, false);
+    PageLevelDictionary pageLevelDictionary1 = new PageLevelDictionary(
+        generator, columnName, DataTypes.STRING, false, compressorName);
     byte[][] validateData = new byte[10][];
     int index = 0;
     try {
@@ -128,7 +134,8 @@ public class TestPageLevelDictionary {
     } catch (DictionaryThresholdReachedException e) {
       Assert.assertTrue(false);
     }
-    PageLevelDictionary pageLevelDictionary2 = new PageLevelDictionary(generator, columnName, DataTypes.STRING, false);
+    PageLevelDictionary pageLevelDictionary2 = new PageLevelDictionary(
+        generator, columnName, DataTypes.STRING, false, compressorName);
     try {
       for (int i = 1; i <= 5; i++) {
         byte[] data = ("vikas" + i).getBytes();
@@ -174,10 +181,11 @@ public class TestPageLevelDictionary {
       EncodingFactory encodingFactory = DefaultEncodingFactory.getInstance();
      List<ByteBuffer> encoderMetas =
          localDictionaryChunkForBlocklet.getDictionary_meta().getEncoder_meta();
-      ColumnPageDecoder decoder = encodingFactory.createDecoder(encodings, encoderMetas);
+      ColumnPageDecoder decoder = encodingFactory.createDecoder(
+          encodings, encoderMetas, compressorName);
      ColumnPage decode = decoder.decode(localDictionaryChunkForBlocklet.getDictionary_data(), 0,
          localDictionaryChunkForBlocklet.getDictionary_data().length);
-      BitSet bitSet = BitSet.valueOf(CompressorFactory.getInstance().getCompressor()
+      BitSet bitSet = BitSet.valueOf(CompressorFactory.getInstance().getCompressor(compressorName)
          .unCompressByte(localDictionaryChunkForBlocklet.getDictionary_values()));
       Assert.assertTrue(bitSet.cardinality()==validateData.length);
       for(int i =0; i<validateData.length;i++) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/docs/configuration-parameters.md
----------------------------------------------------------------------
diff --git a/docs/configuration-parameters.md b/docs/configuration-parameters.md
index c8c74f2..c6b0fcb 100644
--- a/docs/configuration-parameters.md
+++ b/docs/configuration-parameters.md
@@ -7,7 +7,7 @@
     the License.  You may obtain a copy of the License at
 
       http://www.apache.org/licenses/LICENSE-2.0
-    
+
     Unless required by applicable law or agreed to in writing, software 
     distributed under the License is distributed on an "AS IS" BASIS, 
     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -92,6 +92,7 @@ This section provides the details of all the configurations required for the Car
 | carbon.load.directWriteHdfs.enabled | false | During data load all the carbondata files are written to local disk and finally copied to the target location in HDFS.Enabling this parameter will make carbondata files to be written directly onto target HDFS location bypassing the local disk.**NOTE:** Writing directly to HDFS saves local disk IO(once for writing the files and again for copying to HDFS) thereby improving the performance.But the drawback is when data loading fails or the application crashes, unwanted carbondata files will remain in the target HDFS location until it is cleared during next data load or by running *CLEAN FILES* DDL command |
 | carbon.options.serialization.null.format | \N | Based on the business scenarios, some columns might need to be loaded with null values.As null value cannot be written in csv files, some special characters might be adopted to specify null values.This configuration can be used to specify the null values format in the data being loaded. |
 | carbon.sort.storage.inmemory.size.inmb | 512 | CarbonData writes every ***carbon.sort.size*** number of records to intermediate temp files during data loading to ensure memory footprint is within limits.When ***enable.unsafe.sort*** configuration is enabled, instead of using ***carbon.sort.size*** which is based on rows count, size occupied in memory is used to determine when to flush data pages to intermediate temp files.This configuration determines the memory to be used for storing data pages in memory.**NOTE:** Configuring a higher value ensures more data is maintained in memory and hence increases data loading performance due to reduced or no IO.Based on the memory availability in the nodes of the cluster, configure the values accordingly. |
+| carbon.column.compressor | snappy | CarbonData will compress the column values using the compressor specified by this configuration. Currently CarbonData supports 'snappy' and 'zstd' compressors. |
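
For example, the default can be set system-wide in carbon.properties:

    carbon.column.compressor=zstd

or per table (assuming the table property uses the same key, as the new
TestLoadDataWithCompression suite in this patch does):

    CREATE TABLE t1 (col1 STRING) STORED BY 'carbondata'
    TBLPROPERTIES ('carbon.column.compressor'='zstd')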
 
 ## Compaction Configuration
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/format/src/main/thrift/carbondata.thrift
----------------------------------------------------------------------
diff --git a/format/src/main/thrift/carbondata.thrift b/format/src/main/thrift/carbondata.thrift
index a495b6d..2423ffa 100644
--- a/format/src/main/thrift/carbondata.thrift
+++ b/format/src/main/thrift/carbondata.thrift
@@ -65,10 +65,12 @@ enum SortState{
 }
 
 /**
- * Compressions supported by CarbonData.
+ * Compressions for column page supported by CarbonData.
  */
 enum CompressionCodec{
     SNAPPY = 0;
+    //** We will not use this CompressionCodec any longer since 1.5.0, but because it is required in some structure, we cannot get rid of it. So here I add another deprecated enum to alert the people who want to use it **//
+    DEPRECATED = 1;
 }
 
 /**
@@ -82,6 +84,8 @@ struct ChunkCompressionMeta{
     2: required i64 total_uncompressed_size;
    /** Total byte size of all compressed pages in this column chunk (including the headers) **/
     3: required i64 total_compressed_size;
+    /** compressor name for chunk, this is introduced in 1.5.0 to make compression for the final store more extensible. Readers should first check this compressor_name; if it is not set, the chunk was written before 1.5.0 and snappy is used **/
+    4: optional string compressor_name;
 }
 
 /**
@@ -212,6 +216,7 @@ struct FileHeader{
       4: optional i64 time_stamp; // Timestamp to compare column schema against master schema
        5: optional bool is_splitable; // Whether file is splitable or not
        6: optional binary sync_marker; // 16 bytes sync marker
+       7: optional string compressor_name;
 }
 
 /**
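
Streaming files have no blocklet-level ChunkCompressionMeta, so their compressor
travels in the new optional FileHeader field 7. A hedged writer-side sketch using the
thrift-generated setter (the columnCompressor variable is assumed to come from the
load model):

    org.apache.carbondata.format.FileHeader header = new org.apache.carbondata.format.FileHeader();
    header.setCompressor_name(columnCompressor);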

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
index 2d4f370..28817e9 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
@@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.metadata.datatype.StructField;
 import org.apache.carbondata.core.metadata.datatype.StructType;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
@@ -299,6 +300,12 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje
     model.setTableName(CarbonTableOutputFormat.getTableName(conf));
     model.setCarbonTransactionalTable(true);
     CarbonTable carbonTable = getCarbonTable(conf);
+    String columnCompressor = carbonTable.getTableInfo().getFactTable().getTableProperties().get(
+        CarbonCommonConstants.COMPRESSOR);
+    if (null == columnCompressor) {
+      columnCompressor = CompressorFactory.getInstance().getCompressor().getName();
+    }
+    model.setColumnCompressor(columnCompressor);
     model.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable));
     model.setTablePath(getTablePath(conf));
     setFileHeader(conf, model);
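
The same table-property-then-system-default lookup recurs below in StoreCreator,
CarbonDataStoreCreator, StreamSinkFactory and CarbonDataRDDFactory. A hypothetical
helper capturing the shared logic (not part of the patch):

    static String resolveColumnCompressor(CarbonTable table) {
      String name = table.getTableInfo().getFactTable().getTableProperties()
          .get(CarbonCommonConstants.COMPRESSOR);
      return name != null ? name : CompressorFactory.getInstance().getCompressor().getName();
    }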

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java
index 935c52d..7cd241a 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java
@@ -43,6 +43,7 @@ import org.apache.carbondata.core.cache.dictionary.Dictionary;
 import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.DataMapStoreManager;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.fileoperations.AtomicFileOperationFactory;
 import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
@@ -134,6 +135,12 @@ public class StoreCreator {
       AbsoluteTableIdentifier absoluteTableIdentifier) {
     CarbonDataLoadSchema schema = new CarbonDataLoadSchema(table);
     CarbonLoadModel loadModel = new CarbonLoadModel();
+    String columnCompressor = table.getTableInfo().getFactTable().getTableProperties().get(
+        CarbonCommonConstants.COMPRESSOR);
+    if (columnCompressor == null) {
+      columnCompressor = CompressorFactory.getInstance().getCompressor().getName();
+    }
+    loadModel.setColumnCompressor(columnCompressor);
+    loadModel.setCarbonDataLoadSchema(schema);
     loadModel.setDatabaseName(absoluteTableIdentifier.getCarbonTableIdentifier().getDatabaseName());
     loadModel.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
----------------------------------------------------------------------
diff --git a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
index 895d6a5..4b973a1 100644
--- a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
+++ b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
@@ -38,6 +38,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumnUniqueIdentifier, ReverseDictionary}
 import org.apache.carbondata.core.cache.{Cache, CacheProvider, CacheType}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.compression.CompressorFactory
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.fileoperations.{AtomicFileOperationFactory, AtomicFileOperations, FileWriteOperation}
 import org.apache.carbondata.core.metadata.converter.{SchemaConverter, ThriftWrapperSchemaConverterImpl}
@@ -83,6 +84,11 @@ object CarbonDataStoreCreator {
       writeDictionary(dataFilePath, table, absoluteTableIdentifier)
       val schema: CarbonDataLoadSchema = new CarbonDataLoadSchema(table)
       val loadModel: CarbonLoadModel = new CarbonLoadModel()
+      import scala.collection.JavaConverters._
+      val columnCompressor = table.getTableInfo.getFactTable.getTableProperties.asScala
+        .getOrElse(CarbonCommonConstants.COMPRESSOR,
+          CompressorFactory.getInstance().getCompressor().getName())
+      loadModel.setColumnCompressor(columnCompressor)
       loadModel.setCarbonDataLoadSchema(schema)
       loadModel.setDatabaseName(
         absoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithCompression.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithCompression.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithCompression.scala
new file mode 100644
index 0000000..628a0dc
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithCompression.scala
@@ -0,0 +1,411 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.integration.spark.testsuite.dataload
+
+import java.io.File
+import java.text.SimpleDateFormat
+import java.util.concurrent.{ExecutorService, Executors, Future}
+import java.util.Calendar
+
+import scala.util.Random
+
+import org.apache.commons.lang3.{RandomStringUtils, StringUtils}
+import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
+import org.apache.spark.sql.{CarbonEnv, Row, SaveMode}
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.exception.InvalidConfigurationException
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.streaming.parser.CarbonStreamParser
+
+case class Rcd(booleanField: Boolean, shortField: Short, intField: Int, bigintField: Long,
+    doubleField: Double, stringField: String, timestampField: String, decimalField: Double,
+    dateField: String, charField: String, floatField: Float, stringDictField: String,
+    stringSortField: String, stringLocalDictField: String, longStringField: String)
+
+class TestLoadDataWithCompression extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll {
+  private val tableName = "load_test_with_compressor"
+  private var executorService: ExecutorService = _
+  private val csvDataDir = s"$integrationPath/spark2/target/csv_load_compression"
+
+  override protected def beforeAll(): Unit = {
+    executorService = Executors.newFixedThreadPool(3)
+    CarbonUtil.deleteFoldersAndFilesSilent(FileFactory.getCarbonFile(csvDataDir))
+    sql(s"DROP TABLE IF EXISTS $tableName")
+  }
+
+  override protected def afterAll(): Unit = {
+    executorService.shutdown()
+    CarbonUtil.deleteFoldersAndFilesSilent(FileFactory.getCarbonFile(csvDataDir))
+    try {
+      sql(s"DROP TABLE IF EXISTS $tableName")
+    } catch {
+      case _: Exception =>
+    }
+  }
+
+  override protected def afterEach(): Unit = {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
+      CarbonCommonConstants.ENABLE_OFFHEAP_SORT_DEFAULT)
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR,
+      CarbonCommonConstants.DEFAULT_COMPRESSOR)
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.BLOCKLET_SIZE,
+      CarbonCommonConstants.BLOCKLET_SIZE_DEFAULT_VAL)
+
+    try {
+      sql(s"DROP TABLE IF EXISTS $tableName")
+    } catch {
+      case _: Exception =>
+    }
+  }
+
+  private def createTable(streaming: Boolean = false, columnCompressor: String = ""): Unit = {
+    sql(s"DROP TABLE IF EXISTS $tableName")
+    sql(
+      s"""
+         | CREATE TABLE $tableName(
+         |    booleanField boolean,
+         |    shortField smallint,
+         |    intField int,
+         |    bigintField bigint,
+         |    doubleField double,
+         |    stringField string,
+         |    timestampField timestamp,
+         |    decimalField decimal(18,2),
+         |    dateField date,
+         |    charField string,
+         |    floatField float,
+         |    stringDictField string,
+         |    stringSortField string,
+         |    stringLocalDictField string,
+         |    longStringField string
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES(
+         |  ${if (StringUtils.isBlank(columnCompressor)) "" else s"'${CarbonCommonConstants.COMPRESSOR}'='$columnCompressor',"}
+         |  ${if (streaming) "" else s"'LONG_STRING_COLUMNS'='longStringField',"}
+         |  'SORT_COLUMNS'='stringSortField',
+         |  'DICTIONARY_INCLUDE'='stringDictField',
+         |  'local_dictionary_enable'='true',
+         |  'local_dictionary_threshold'='10000',
+         |  'local_dictionary_include'='stringLocalDictField' ${if (streaming) s", 'STREAMING'='true'" else ""})
+       """.stripMargin)
+  }
+
+  private def loadData(): Unit = {
+    sql(
+      s"""
+         | INSERT INTO TABLE $tableName VALUES
+         |  (true,1,11,101,41.4,'string1','2015/4/23 12:01:01',12.34,'2015/4/23','aaa',1.5,'dict1','sort1','local_dict1','longstring1'),
+         | (false,2,12,102,42.4,'string2','2015/5/23 12:01:03',23.45,'2015/5/23','bbb',2.5,'dict2','sort2','local_dict2','longstring2'),
+         |  (true,3,13,163,43.4,'string3','2015/7/26 12:01:06',34.56,'2015/7/26','ccc',3.5,'dict3','sort3','local_dict3','longstring3'),
+         | (NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL)
+       """.stripMargin)
+    sql(
+      s"""
+         | INSERT INTO TABLE $tableName VALUES
+         |  (true,${Short.MaxValue - 2},${Int.MinValue + 2},${Long.MaxValue - 2},${Double.MinValue + 2},'string1','2015/4/23 12:01:01',${Double.MinValue + 2},'2015/4/23','aaa',${Float.MaxValue - 2},'dict1','sort1','local_dict1','longstring1'),
+         | (false,2,12,102,42.4,'string2','2015/5/23 12:01:03',23.45,'2015/5/23','bbb',2.5,'dict2','sort2','local_dict2','longstring2'),
+         |  (true,${Short.MinValue + 2},${Int.MaxValue - 2},${Long.MinValue + 2},${Double.MaxValue - 2},'string3','2015/7/26 12:01:06',${Double.MinValue + 2},'2015/7/26','ccc',${Float.MinValue + 2},'dict3','sort3','local_dict3','longstring3'),
+         | (NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL)
+       """.stripMargin)
+  }
+
+  test("test data loading with snappy compressor and offheap") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "snappy")
+    createTable()
+    loadData()
+    checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(8)))
+  }
+
+  test("test data loading with zstd compressor and offheap") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "zstd")
+    createTable()
+    loadData()
+    checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(8)))
+  }
+
+  test("test data loading with zstd compressor and onheap") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "false")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "zstd")
+    createTable()
+    loadData()
+    checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(8)))
+  }
+
+  test("test current zstd compressor on legacy store with snappy") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "snappy")
+    createTable()
+    loadData()
+
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "zstd")
+    loadData()
+    checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(16)))
+  }
+
+  test("test current snappy compressor on legacy store with zstd") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "zstd")
+    createTable()
+    loadData()
+
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "snappy")
+    loadData()
+    checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(16)))
+  }
+
+  test("test compaction with different compressor for each load") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "zstd")
+    createTable()
+    loadData()
+
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "false")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "snappy")
+    loadData()
+
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "zstd")
+    loadData()
+
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "snappy")
+    loadData()
+
+    // there are 8 loads
+    checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(4 * 8)))
+    assert(sql(s"SHOW SEGMENTS FOR TABLE $tableName").count() == 8)
+    sql(s"ALTER TABLE $tableName COMPACT 'major'")
+    sql(s"CLEAN FILES FOR TABLE $tableName")
+    // after compaction and clean, there should be one segment
+    checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(4 * 8)))
+    assert(sql(s"SHOW SEGMENTS FOR TABLE $tableName").count() == 1)
+  }
+
+  test("test data loading with unsupported compressor and onheap") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "false")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "fake")
+    createTable()
+    val exception = intercept[UnsupportedOperationException] {
+      loadData()
+    }
+    assert(exception.getMessage.contains("Invalid compressor type provided"))
+  }
+
+  test("test compaction with unsupported compressor") {
+    createTable()
+    loadData()
+    loadData()
+
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "fake")
+    val exception = intercept[UnsupportedOperationException] {
+      sql(s"ALTER TABLE $tableName COMPACT 'major'")
+    }
+    assert(exception.getMessage.contains("Invalid compressor type provided"))
+  }
+
+  private def generateAllDataTypeDF(lineNum: Int) = {
+    val tsFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
+    val dateFormat = new SimpleDateFormat("yyyy-MM-dd")
+    val calendar = Calendar.getInstance()
+    import sqlContext.implicits._
+    sqlContext.sparkContext.parallelize(1 to lineNum)
+      .map { p =>
+        calendar.add(Calendar.HOUR, p)
+        Rcd(Random.nextBoolean(), (Random.nextInt() % Short.MaxValue).toShort, Random.nextInt(), Random.nextLong(),
+          Random.nextDouble(), Random.nextString(6), tsFormat.format(calendar.getTime), 0.01 * p,
+          dateFormat.format(calendar.getTime), s"$p", Random.nextFloat(), s"stringDict$p",
+          s"stringSort$p", s"stringLocalDict$p", RandomStringUtils.randomAlphabetic(33000))
+      }
+      .toDF()
+      .cache()
+  }
+
+  test("test data loading & compaction with more pages and change the compressor during loading") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.BLOCKLET_SIZE, "2000")
+    val lineNum = 5000
+    val df = generateAllDataTypeDF(lineNum)
+
+    def loadDataAsync(): Future[_] = {
+      executorService.submit(new Runnable {
+        override def run(): Unit = {
+          df.write
+            .format("carbondata")
+            .option("tableName", tableName)
+            .mode(SaveMode.Append)
+            .save()
+        }
+      })
+    }
+
+    createTable()
+
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "snappy")
+    var future = loadDataAsync()
+    // change the compressor randomly during the loading
+    while (!future.isDone) {
+      CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, if (Random.nextBoolean()) "snappy" else "zstd")
+    }
+
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "zstd")
+    future = loadDataAsync()
+    while (!future.isDone) {
+      CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, if (Random.nextBoolean()) "snappy" else "zstd")
+    }
+
+    checkAnswer(sql(s"SELECT COUNT(*) FROM $tableName"), Seq(Row(lineNum * 2)))
+    checkAnswer(sql(s"SELECT stringDictField, stringSortField FROM $tableName WHERE stringDictField='stringDict1'"), Seq(Row("stringDict1", "stringSort1"), Row("stringDict1", "stringSort1")))
+
+    def compactAsync(): Future[_] = {
+      executorService.submit(new Runnable {
+        override def run(): Unit = {
+          sql(s"ALTER TABLE $tableName COMPACT 'MAJOR'")
+        }
+      })
+    }
+
+    // change the compressor randomly during compaction
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "zstd")
+    future = compactAsync()
+    while (!future.isDone) {
+      CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, if (Random.nextBoolean()) "snappy" else "zstd")
+    }
+
+    checkAnswer(sql(s"SELECT COUNT(*) FROM $tableName"), Seq(Row(lineNum * 2)))
+    checkAnswer(sql(s"SELECT stringDictField, stringSortField FROM $tableName WHERE stringDictField='stringDict1'"), Seq(Row("stringDict1", "stringSort1"), Row("stringDict1", "stringSort1")))
+  }
+
+  test("test creating table with specified compressor") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
+    // the system configuration for compressor is snappy
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "snappy")
+    // create table with zstd as compressor
+    createTable(columnCompressor = "zstd")
+    loadData()
+    checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(8)))
+    val carbonTable = CarbonEnv.getCarbonTable(Option("default"), tableName)(sqlContext.sparkSession)
+    val tableColumnCompressor = carbonTable.getTableInfo.getFactTable.getTableProperties.get(CarbonCommonConstants.COMPRESSOR)
+    assert("zstd".equalsIgnoreCase(tableColumnCompressor))
+  }
+
+  test("test creating table with unsupported compressor") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
+    // the system configuration for compressor is snappy
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "snappy")
+    // create table with unsupported compressor
+    val exception = intercept[InvalidConfigurationException] {
+      createTable (columnCompressor = "fakecompressor")
+    }
+    assert(exception.getMessage.contains("fakecompressor compressor is not supported"))
+  }
+
+  private def generateAllDataTypeFiles(lineNum: Int, csvDir: String,
+      saveMode: SaveMode = SaveMode.Overwrite): Unit = {
+    val tsFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
+    val dateFormat = new SimpleDateFormat("yyyy-MM-dd")
+    val calendar = Calendar.getInstance()
+    import sqlContext.implicits._
+    sqlContext.sparkContext.parallelize(1 to lineNum)
+      .map { p =>
+        calendar.add(Calendar.HOUR, p)
+        Rcd(Random.nextBoolean(), (Random.nextInt() % Short.MaxValue / 2).toShort, Random.nextInt(), Random.nextLong(),
+          Random.nextDouble(), RandomStringUtils.randomAlphabetic(6), tsFormat.format(calendar.getTime), 0.01 * p,
+          dateFormat.format(calendar.getTime), s"$p", Random.nextFloat(), s"stringDict$p",
+          s"stringSort$p", s"stringLocalDict$p", RandomStringUtils.randomAlphabetic(3))
+      }
+      .toDF()
+      .write
+      .option("header", "false")
+      .mode(saveMode)
+      .csv(csvDir)
+  }
+
+  test("test streaming ingestion with different compressor for each mini-batch") {
+    createTable(streaming = true)
+    val carbonTable = CarbonEnv.getCarbonTable(Some("default"), tableName)(sqlContext.sparkSession)
+    val lineNum = 10
+    val dataLocation = new File(csvDataDir).getCanonicalPath
+
+    def doStreamingIngestionThread(): Thread = {
+      new Thread() {
+        override def run(): Unit = {
+          var streamingQuery: StreamingQuery = null
+          try {
+            streamingQuery = sqlContext.sparkSession.readStream
+              .text(dataLocation)
+              .writeStream
+              .format("carbondata")
+              .trigger(ProcessingTime(s"1 seconds"))
+              .option("checkpointLocation", CarbonTablePath.getStreamingCheckpointDir(carbonTable.getTablePath))
+              .option("dbName", "default")
+              .option("tableName", tableName)
+              .option(CarbonStreamParser.CARBON_STREAM_PARSER, CarbonStreamParser.CARBON_STREAM_PARSER_CSV)
+              .start()
+            streamingQuery.awaitTermination()
+          } catch {
+            case ex: Exception => LOGGER.error(ex)
+          } finally {
+            streamingQuery.stop()
+          }
+        }
+      }
+    }
+
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "zstd")
+    generateAllDataTypeFiles(lineNum, dataLocation)
+    val thread = doStreamingIngestionThread()
+    thread.start()
+    Thread.sleep(10 * 1000)
+
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "snappy")
+    generateAllDataTypeFiles(lineNum, dataLocation, SaveMode.Append)
+    Thread.sleep(10 * 1000)
+
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "zstd")
+    generateAllDataTypeFiles(lineNum, dataLocation, SaveMode.Append)
+    Thread.sleep(10 * 1000)
+
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "snappy")
+    generateAllDataTypeFiles(lineNum, dataLocation, SaveMode.Append)
+    Thread.sleep(40 * 1000)
+    thread.interrupt()
+    checkAnswer(sql(s"SELECT COUNT(*) FROM $tableName"), Seq(Row(lineNum * 4)))
+    checkAnswer(sql(s"SELECT stringDictField, stringSortField FROM $tableName WHERE stringDictField='stringDict1'"),
+      Seq(Row("stringDict1", "stringSort1"), Row("stringDict1", "stringSort1"), Row("stringDict1", "stringSort1"), Row("stringDict1", "stringSort1")))
+
+    sql(s"alter table $tableName compact 'streaming'")
+
+    checkAnswer(sql(s"SELECT COUNT(*) FROM $tableName"), Seq(Row(lineNum * 4)))
+    checkAnswer(sql(s"SELECT stringDictField, stringSortField FROM $tableName WHERE stringDictField='stringDict1'"),
+      Seq(Row("stringDict1", "stringSort1"), Row("stringDict1", "stringSort1"), Row("stringDict1", "stringSort1"), Row("stringDict1", "stringSort1")))
+    try {
+      sql(s"DROP TABLE IF EXISTS $tableName")
+    } catch {
+      case _: Exception =>
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
index a03a5eb..643471c 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
@@ -51,7 +51,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.datastore.page.encoding.DefaultEncodingFactory
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion
 import org.apache.carbondata.core.metadata.datatype.DataTypes
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataFileFooterConverterV3}
+import org.apache.carbondata.core.util.{CarbonMetadataUtil, CarbonProperties, CarbonUtil, DataFileFooterConverterV3}
 import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException
 import org.apache.carbondata.sdk.file._
 
@@ -2604,16 +2604,18 @@ object testUtil{
       data: Array[String]): Boolean = {
     val local_dictionary = rawColumnPage.getDataChunkV3.local_dictionary
     if (null != local_dictionary) {
+      val compressorName = CarbonMetadataUtil.getCompressorNameFromChunkMeta(
+        rawColumnPage.getDataChunkV3.getData_chunk_list.get(0).getChunk_meta)
       val encodings = local_dictionary.getDictionary_meta.encoders
       val encoderMetas = local_dictionary.getDictionary_meta.getEncoder_meta
       val encodingFactory = DefaultEncodingFactory.getInstance
-      val decoder = encodingFactory.createDecoder(encodings, encoderMetas)
+      val decoder = encodingFactory.createDecoder(encodings, encoderMetas, compressorName)
       val dictionaryPage = decoder
         .decode(local_dictionary.getDictionary_data, 0, local_dictionary.getDictionary_data.length)
       val dictionaryMap = new
           util.HashMap[DictionaryByteArrayWrapper, Integer]
       val usedDictionaryValues = util.BitSet
-        .valueOf(CompressorFactory.getInstance.getCompressor
+        .valueOf(CompressorFactory.getInstance.getCompressor(compressorName)
           .unCompressByte(local_dictionary.getDictionary_values))
       var index = 0
       var i = usedDictionaryValues.nextSetBit(0)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/localdictionary/LocalDictionarySupportLoadTableTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/localdictionary/LocalDictionarySupportLoadTableTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/localdictionary/LocalDictionarySupportLoadTableTest.scala
index 59586c0..e88d8a9 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/localdictionary/LocalDictionarySupportLoadTableTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/localdictionary/LocalDictionarySupportLoadTableTest.scala
@@ -35,7 +35,7 @@ import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFi
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.datastore.page.encoding.DefaultEncodingFactory
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion
-import org.apache.carbondata.core.util.{CarbonProperties, DataFileFooterConverterV3}
+import org.apache.carbondata.core.util.{CarbonMetadataUtil, CarbonProperties, DataFileFooterConverterV3}
 
 class LocalDictionarySupportLoadTableTest extends QueryTest with BeforeAndAfterAll {
 
@@ -277,16 +277,18 @@ class LocalDictionarySupportLoadTableTest extends QueryTest with BeforeAndAfterA
       data: Array[String]): Boolean = {
     val local_dictionary = rawColumnPage.getDataChunkV3.local_dictionary
     if (null != local_dictionary) {
+      val compressorName = CarbonMetadataUtil.getCompressorNameFromChunkMeta(
+        rawColumnPage.getDataChunkV3.getData_chunk_list.get(0).getChunk_meta)
       val encodings = local_dictionary.getDictionary_meta.encoders
       val encoderMetas = local_dictionary.getDictionary_meta.getEncoder_meta
       val encodingFactory = DefaultEncodingFactory.getInstance
-      val decoder = encodingFactory.createDecoder(encodings, encoderMetas)
+      val decoder = encodingFactory.createDecoder(encodings, encoderMetas, compressorName)
       val dictionaryPage = decoder
         .decode(local_dictionary.getDictionary_data, 0, local_dictionary.getDictionary_data.length)
       val dictionaryMap = new
           util.HashMap[DictionaryByteArrayWrapper, Integer]
       val usedDictionaryValues = util.BitSet
-        .valueOf(CompressorFactory.getInstance.getCompressor
+        .valueOf(CompressorFactory.getInstance.getCompressor(compressorName)
           .unCompressByte(local_dictionary.getDictionary_values))
       var index = 0
       var i = usedDictionaryValues.nextSetBit(0)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
index cc8a28e..b382693 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.execution.streaming.{CarbonAppendableStreamSink, Sin
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.compression.CompressorFactory
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.dictionary.server.{DictionaryServer, NonSecureDictionaryServer}
 import org.apache.carbondata.core.dictionary.service.NonSecureDictionaryServiceProvider
@@ -271,6 +272,10 @@ object StreamSinkFactory {
       getConf.get("spark.driver.host")
     carbonLoadModel.setDictionaryServerHost(sparkDriverHost)
     carbonLoadModel.setDictionaryServerPort(dictionaryServerPort.toInt)
+    val columnCompressor = 
carbonTable.getTableInfo.getFactTable.getTableProperties.asScala
+      .getOrElse(CarbonCommonConstants.COMPRESSOR,
+        CompressorFactory.getInstance().getCompressor.getName)
+    carbonLoadModel.setColumnCompressor(columnCompressor)
     carbonLoadModel
   }
 }
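
This table-property lookup with a fall-back to the factory default recurs
throughout the commit (CarbonDataRDDFactory, the alter/compaction/load
commands, SparkCarbonTableFormat and the test cases below). A sketch of the
shared pattern as a single helper (hypothetical; it only rearranges calls
the patch already makes):

    import scala.collection.JavaConverters._
    // Resolve the compressor for a load: prefer the table-level property
    // (CarbonCommonConstants.COMPRESSOR), otherwise fall back to the
    // system-wide default compressor.
    def resolveColumnCompressor(table: CarbonTable): String =
      table.getTableInfo.getFactTable.getTableProperties.asScala
        .getOrElse(CarbonCommonConstants.COMPRESSOR,
          CompressorFactory.getInstance().getCompressor.getName)

Hoisting the lookup into such a helper would keep the call sites from
drifting apart if the resolution rule ever changes.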

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
 
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 57887a7..6350b50 100644
--- 
a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -51,6 +51,7 @@ import 
org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment}
 import org.apache.carbondata.core.datamap.status.DataMapStatusManager
 import org.apache.carbondata.core.datastore.block.{Distributable, 
TableBlockInfo}
+import org.apache.carbondata.core.datastore.compression.CompressorFactory
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.dictionary.server.DictionaryServer
@@ -288,6 +289,10 @@ object CarbonDataRDDFactory {
     loadModel.readAndSetLoadMetadataDetails()
     val loadStartTime = CarbonUpdateUtil.readCurrentTime()
     loadModel.setFactTimeStamp(loadStartTime)
+    val columnCompressor = 
table.getTableInfo.getFactTable.getTableProperties.asScala
+      .getOrElse(CarbonCommonConstants.COMPRESSOR,
+        CompressorFactory.getInstance().getCompressor.getName)
+    loadModel.setColumnCompressor(columnCompressor)
     loadModel
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/integration/spark2/src/main/scala/org/apache/carbondata/stream/CarbonStreamRecordReader.java
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/carbondata/stream/CarbonStreamRecordReader.java
 
b/integration/spark2/src/main/scala/org/apache/carbondata/stream/CarbonStreamRecordReader.java
index 747b064..6d69eb5 100644
--- 
a/integration/spark2/src/main/scala/org/apache/carbondata/stream/CarbonStreamRecordReader.java
+++ 
b/integration/spark2/src/main/scala/org/apache/carbondata/stream/CarbonStreamRecordReader.java
@@ -19,7 +19,6 @@ package org.apache.carbondata.stream;
 
 import java.io.IOException;
 import java.math.BigDecimal;
-import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.util.BitSet;
 import java.util.HashMap;
@@ -33,6 +32,7 @@ import org.apache.carbondata.core.cache.dictionary.Dictionary;
 import 
org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import 
org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
 import 
org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
 import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
@@ -64,7 +64,6 @@ import org.apache.carbondata.hadoop.InputMetricsStats;
 import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
 import org.apache.carbondata.streaming.CarbonStreamInputFormat;
-import org.apache.carbondata.streaming.CarbonStreamUtils;
 import org.apache.carbondata.streaming.StreamBlockletReader;
 
 import org.apache.hadoop.conf.Configuration;
@@ -110,6 +109,7 @@ public class CarbonStreamRecordReader extends 
RecordReader<Void, Object> {
   private CacheProvider cacheProvider;
   private Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache;
   private GenericQueryType[] queryTypes;
+  private String compressorName;
 
   // vectorized reader
   private StructType outputSchema;
@@ -262,6 +262,12 @@ public class CarbonStreamRecordReader extends 
RecordReader<Void, Object> {
   private byte[] getSyncMarker(String filePath) throws IOException {
     CarbonHeaderReader headerReader = new CarbonHeaderReader(filePath);
     FileHeader header = headerReader.readHeader();
+    // legacy store does not have this field in the header, so fall back to snappy
+    if (header.isSetCompressor_name()) {
+      compressorName = header.getCompressor_name();
+    } else {
+      compressorName = CompressorFactory.SupportedCompressor.SNAPPY.getName();
+    }
     return header.getSync_marker();
   }
 
@@ -285,7 +291,7 @@ public class CarbonStreamRecordReader extends 
RecordReader<Void, Object> {
     FSDataInputStream fileIn = fs.open(file, bufferSize);
     fileIn.seek(fileSplit.getStart());
     input = new StreamBlockletReader(syncMarker, fileIn, fileSplit.getLength(),
-        fileSplit.getStart() == 0);
+        fileSplit.getStart() == 0, compressorName);
 
     cacheProvider = CacheProvider.getInstance();
     cache = cacheProvider.createCache(CacheType.FORWARD_DICTIONARY);
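
The snappy fall-back in getSyncMarker() keeps the reader compatible with
streaming files written before the compressor name existed in the file
header. The writer-side counterpart is simply to stamp the name into the
header; a sketch, assuming the thrift-generated setter that pairs with the
isSetCompressor_name()/getCompressor_name() calls used above:

    import org.apache.carbondata.format.FileHeader
    // Sketch (assumed, by thrift getter/setter convention): record the
    // compressor in the stream file header so readers can recover it.
    val header = new FileHeader()
    header.setCompressor_name(compressorName)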

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index e0b0547..a13dfdc 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -34,6 +34,7 @@ import org.apache.spark.util.AlterTableUtil
 import 
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.compression.CompressorFactory
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
@@ -159,6 +160,10 @@ case class CarbonAlterTableCompactionCommand(
       carbonLoadModel.setCarbonTransactionalTable(table.isTransactionalTable)
       carbonLoadModel.setDatabaseName(table.getDatabaseName)
       carbonLoadModel.setTablePath(table.getTablePath)
+      val columnCompressor = 
table.getTableInfo.getFactTable.getTableProperties.asScala
+        .getOrElse(CarbonCommonConstants.COMPRESSOR,
+          CompressorFactory.getInstance().getCompressor.getName)
+      carbonLoadModel.setColumnCompressor(columnCompressor)
 
       var storeLocation = System.getProperty("java.io.tmpdir")
       storeLocation = storeLocation + "/carbonstore/" + System.nanoTime()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 63da404..f7a5f42 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -51,6 +51,7 @@ import org.apache.carbondata.common.logging.{LogService, 
LogServiceFactory}
 import org.apache.carbondata.converter.SparkDataTypeConverterImpl
 import org.apache.carbondata.core.constants.{CarbonCommonConstants, 
CarbonLoadOptionConstants}
 import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.datastore.compression.CompressorFactory
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.dictionary.server.{DictionaryServer, 
NonSecureDictionaryServer}
 import 
org.apache.carbondata.core.dictionary.service.NonSecureDictionaryServiceProvider
@@ -206,6 +207,10 @@ case class CarbonLoadDataCommand(
       carbonLoadModel.setAggLoadRequest(
         internalOptions.getOrElse(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL, 
"false").toBoolean)
       
carbonLoadModel.setSegmentId(internalOptions.getOrElse("mergedSegmentName", ""))
+      val columnCompressor = 
table.getTableInfo.getFactTable.getTableProperties.asScala
+        .getOrElse(CarbonCommonConstants.COMPRESSOR,
+          CompressorFactory.getInstance().getCompressor.getName)
+      carbonLoadModel.setColumnCompressor(columnCompressor)
 
       val javaPartition = mutable.Map[String, String]()
       partition.foreach { case (k, v) =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
index 807c925..6c8b0b0 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.optimizer.CarbonFilters
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.compression.CompressorFactory
 import org.apache.carbondata.core.indexstore.PartitionSpec
 import org.apache.carbondata.core.metadata.SegmentFileStore
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
@@ -123,6 +124,10 @@ case class CarbonAlterTableAddHivePartitionCommand(
             "Schema of index files located in location is not matching with 
current table schema")
         }
         val loadModel = new CarbonLoadModel
+        val columnCompressor = 
table.getTableInfo.getFactTable.getTableProperties.asScala
+          .getOrElse(CarbonCommonConstants.COMPRESSOR,
+            CompressorFactory.getInstance().getCompressor.getName)
+        loadModel.setColumnCompressor(columnCompressor)
         loadModel.setCarbonTransactionalTable(true)
         loadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(table))
         // Create new entry in tablestatus file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
index cd26fe8..b76a485 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
@@ -32,6 +32,7 @@ import org.apache.carbondata.common.logging.{LogService, 
LogServiceFactory}
 import org.apache.carbondata.core.cache.CacheProvider
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment}
+import org.apache.carbondata.core.datastore.compression.CompressorFactory
 import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, 
CarbonMetadata}
 import 
org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
@@ -145,6 +146,10 @@ case class CarbonAlterTableDropPartitionCommand(
       carbonLoadModel.setTablePath(table.getTablePath)
       val loadStartTime = CarbonUpdateUtil.readCurrentTime
       carbonLoadModel.setFactTimeStamp(loadStartTime)
+      val columnCompressor = 
table.getTableInfo.getFactTable.getTableProperties.asScala
+        .getOrElse(CarbonCommonConstants.COMPRESSOR,
+          CompressorFactory.getInstance().getCompressor.getName)
+      carbonLoadModel.setColumnCompressor(columnCompressor)
       alterTableDropPartition(
         sparkSession.sqlContext,
         model.partitionId,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
index f4b6de0..753abaf 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
@@ -33,8 +33,8 @@ import org.apache.carbondata.common.logging.{LogService, 
LogServiceFactory}
 import org.apache.carbondata.core.cache.CacheProvider
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.datastore.compression.CompressorFactory
 import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, 
CarbonMetadata}
 import 
org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.schema.PartitionInfo
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType
@@ -142,8 +142,12 @@ case class CarbonAlterTableSplitPartitionCommand(
         LockUsage.ALTER_PARTITION_LOCK)
       locks = AlterTableUtil.validateTableAndAcquireLock(dbName, tableName,
         locksToBeAcquired)(sparkSession)
-      val carbonLoadModel = new CarbonLoadModel()
       val table = CarbonEnv.getCarbonTable(Some(dbName), 
tableName)(sparkSession)
+      val carbonLoadModel = new CarbonLoadModel()
+      val columnCompressor = 
table.getTableInfo.getFactTable.getTableProperties.asScala
+        .getOrElse(CarbonCommonConstants.COMPRESSOR,
+          CompressorFactory.getInstance().getCompressor.getName)
+      carbonLoadModel.setColumnCompressor(columnCompressor)
       val tablePath = table.getTablePath
       val dataLoadSchema = new CarbonDataLoadSchema(table)
       carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
index 1beda11..42ea0bd 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.command.table
 
 import scala.collection.JavaConverters._
 
+import org.apache.commons.lang3.StringUtils
 import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, _}
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ID_KEY
@@ -26,6 +27,7 @@ import org.apache.spark.sql.execution.command.MetadataCommand
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.compression.CompressorFactory
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.exception.InvalidConfigurationException
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
@@ -99,6 +101,18 @@ case class CarbonCreateTableCommand(
         throwMetadataException(dbName, tableName, "Table should have at least 
one column.")
       }
 
+      // Add validation for the column compressor when creating the table
+      val columnCompressor = tableInfo.getFactTable.getTableProperties.get(
+        CarbonCommonConstants.COMPRESSOR)
+      try {
+        if (null != columnCompressor) {
+          CompressorFactory.getInstance().getCompressor(columnCompressor)
+        }
+      } catch {
+        case ex: UnsupportedOperationException =>
+          throw new InvalidConfigurationException(ex.getMessage)
+      }
+
       val operationContext = new OperationContext
       val createTablePreExecutionEvent: CreateTablePreExecutionEvent =
         CreateTablePreExecutionEvent(sparkSession, tableIdentifier, 
Some(tableInfo))
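
With this check in place, an unsupported compressor is rejected at CREATE
TABLE time instead of surfacing later during a load. A usage sketch in the
test-suite style, assuming CarbonCommonConstants.COMPRESSOR resolves to the
'carbon.column.compressor' table property and that 'zstd' is among the
supported compressors:

    // accepted: a compressor known to CompressorFactory
    sql("CREATE TABLE t1 (id INT, name STRING) STORED BY 'carbondata' " +
      "TBLPROPERTIES('carbon.column.compressor'='zstd')")
    // rejected with InvalidConfigurationException: unknown compressor
    sql("CREATE TABLE t2 (id INT, name STRING) STORED BY 'carbondata' " +
      "TBLPROPERTIES('carbon.column.compressor'='fake')")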

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
index 6716707..b605a1d 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
@@ -39,6 +39,7 @@ import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.types._
 
 import org.apache.carbondata.core.constants.{CarbonCommonConstants, 
CarbonLoadOptionConstants}
+import org.apache.carbondata.core.datastore.compression.CompressorFactory
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.indexstore
 import org.apache.carbondata.core.metadata.SegmentFileStore
@@ -87,6 +88,11 @@ with Serializable {
     val table = CarbonEnv.getCarbonTable(
       TableIdentifier(options("tableName"), 
options.get("dbName")))(sparkSession)
     val model = new CarbonLoadModel
+    val columnCompressor = 
table.getTableInfo.getFactTable.getTableProperties.asScala
+      .getOrElse(CarbonCommonConstants.COMPRESSOR,
+        CompressorFactory.getInstance().getCompressor.getName)
+    model.setColumnCompressor(columnCompressor)
+
     val carbonProperty = CarbonProperties.getInstance()
     val optionsFinal = LoadOption.fillOptionWithDefaultValue(options.asJava)
     val tableProperties = table.getTableInfo.getFactTable.getTableProperties

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
 
b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
index 3298009..08c149b 100644
--- 
a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
+++ 
b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
@@ -22,6 +22,7 @@ import org.apache.spark.sql.{CarbonEnv, SparkSession}
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.compression.CompressorFactory
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
@@ -70,6 +71,11 @@ class AllDictionaryTestCase extends Spark2QueryTest with 
BeforeAndAfterAll {
     if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) {
       FileFactory.mkdirs(metadataDirectoryPath, fileType)
     }
+    import scala.collection.JavaConverters._
+    val columnCompressor = 
table.getTableInfo.getFactTable.getTableProperties.asScala
+      .getOrElse(CarbonCommonConstants.COMPRESSOR,
+        CompressorFactory.getInstance().getCompressor.getName)
+    carbonLoadModel.setColumnCompressor(columnCompressor)
     carbonLoadModel
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
 
b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
index d98229a..060afca 100644
--- 
a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
+++ 
b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
@@ -25,6 +25,7 @@ import org.scalatest.BeforeAndAfterAll
 
 import 
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.compression.CompressorFactory
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
@@ -185,6 +186,11 @@ class ExternalColumnDictionaryTestCase extends 
Spark2QueryTest with BeforeAndAft
     if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) {
       FileFactory.mkdirs(metadataDirectoryPath, fileType)
     }
+    import scala.collection.JavaConverters._
+    val columnCompressor = 
table.getTableInfo.getFactTable.getTableProperties.asScala
+      .getOrElse(CarbonCommonConstants.COMPRESSOR,
+        CompressorFactory.getInstance().getCompressor.getName)
+    carbonLoadModel.setColumnCompressor(columnCompressor)
     carbonLoadModel
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala
 
b/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala
index 7ef86a5..a49d5bb 100644
--- 
a/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala
+++ 
b/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala
@@ -42,10 +42,10 @@ class CarbonGetTableDetailCommandTestCase extends QueryTest 
with BeforeAndAfterA
 
     assertResult(2)(result.length)
     assertResult("table_info1")(result(0).getString(0))
-    // 2096 is the size of carbon table
-    assertResult(2147)(result(0).getLong(1))
+    // 2187 is the size of the carbon table. Note that since 1.5.0, the 
compressor name is additionally stored in the metadata
+    assertResult(2187)(result(0).getLong(1))
     assertResult("table_info2")(result(1).getString(0))
-    assertResult(2147)(result(1).getLong(1))
+    assertResult(2187)(result(1).getLong(1))
   }
 
   override def afterAll: Unit = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
index 46ad32f..4d85296 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
@@ -121,6 +121,11 @@ public class CarbonDataLoadConfiguration {
 
   private String parentTablePath;
 
+  /**
+   * name of compressor to be used to compress column page
+   */
+  private String columnCompressor;
+
   public CarbonDataLoadConfiguration() {
   }
 
@@ -408,4 +413,11 @@ public class CarbonDataLoadConfiguration {
     return complexNonDictionaryColumnCount;
   }
 
+  public String getColumnCompressor() {
+    return columnCompressor;
+  }
+
+  public void setColumnCompressor(String columnCompressor) {
+    this.columnCompressor = columnCompressor;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
index 225da26..f89bc2f 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
@@ -314,6 +314,7 @@ public final class DataLoadProcessBuilder {
     if (loadModel.getSdkWriterCores() > 0) {
       configuration.setWritingCoresCount(loadModel.getSdkWriterCores());
     }
+    configuration.setColumnCompressor(loadModel.getColumnCompressor());
     return configuration;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
index 97e329d..e15fb5d 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
@@ -229,6 +229,11 @@ public class CarbonLoadModel implements Serializable {
 
   private List<String> mergedSegmentIds;
 
+  /**
+   * compressor used to compress column page
+   */
+  private String columnCompressor;
+
   public boolean isAggLoadRequest() {
     return isAggLoadRequest;
   }
@@ -473,6 +478,7 @@ public class CarbonLoadModel implements Serializable {
     copy.loadMinSize = loadMinSize;
     copy.parentTablePath = parentTablePath;
     copy.sdkWriterCores = sdkWriterCores;
+    copy.columnCompressor = columnCompressor;
     return copy;
   }
 
@@ -529,6 +535,7 @@ public class CarbonLoadModel implements Serializable {
     copyObj.loadMinSize = loadMinSize;
     copyObj.parentTablePath = parentTablePath;
     copyObj.sdkWriterCores = sdkWriterCores;
+    copyObj.columnCompressor = columnCompressor;
     return copyObj;
   }
 
@@ -921,4 +928,12 @@ public class CarbonLoadModel implements Serializable {
   public void setSdkWriterCores(short sdkWriterCores) {
     this.sdkWriterCores = sdkWriterCores;
   }
+
+  public String getColumnCompressor() {
+    return columnCompressor;
+  }
+
+  public void setColumnCompressor(String columnCompressor) {
+    this.columnCompressor = columnCompressor;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
index 2ebcb29..bcc904c 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
@@ -29,7 +29,10 @@ import org.apache.carbondata.common.Strings;
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.constants.LoggerAction;
 import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.util.CarbonProperties;
@@ -48,7 +51,8 @@ import org.apache.hadoop.conf.Configuration;
  */
 @InterfaceAudience.Internal
 public class CarbonLoadModelBuilder {
-
+  private static final LogService LOGGER = LogServiceFactory.getLogService(
+      CarbonLoadModelBuilder.class.getName());
   private CarbonTable table;
 
   public CarbonLoadModelBuilder(CarbonTable table) {
@@ -104,6 +108,7 @@ public class CarbonLoadModelBuilder {
     } catch (NumberFormatException e) {
       throw new InvalidLoadOptionException(e.getMessage());
     }
+    validateAndSetColumnCompressor(model);
     return model;
   }
 
@@ -280,6 +285,8 @@ public class CarbonLoadModelBuilder {
     
carbonLoadModel.setSortColumnsBoundsStr(optionsFinal.get("sort_column_bounds"));
     carbonLoadModel.setLoadMinSize(
         optionsFinal.get(CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB));
+
+    validateAndSetColumnCompressor(carbonLoadModel);
   }
 
   private int validateMaxColumns(String[] csvHeaders, String maxColumns)
@@ -369,6 +376,23 @@ public class CarbonLoadModelBuilder {
     }
   }
 
+  private void validateAndSetColumnCompressor(CarbonLoadModel carbonLoadModel)
+      throws InvalidLoadOptionException {
+    try {
+      String columnCompressor = carbonLoadModel.getColumnCompressor();
+      if (StringUtils.isBlank(columnCompressor)) {
+        columnCompressor = CarbonProperties.getInstance().getProperty(
+            CarbonCommonConstants.COMPRESSOR, 
CarbonCommonConstants.DEFAULT_COMPRESSOR);
+      }
+      // check and load compressor
+      CompressorFactory.getInstance().getCompressor(columnCompressor);
+      carbonLoadModel.setColumnCompressor(columnCompressor);
+    } catch (Exception e) {
+      LOGGER.error(e);
+      throw new InvalidLoadOptionException("Failed to load the compressor");
+    }
+  }
+
   /**
    * check whether using default value or not
    */
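
validateAndSetColumnCompressor() fixes the resolution order for a load: a
compressor already set on the model (from the table property, as in the
callers above) wins; a blank value falls back to the carbon property and
then to CarbonCommonConstants.DEFAULT_COMPRESSOR; anything the factory
cannot load fails option validation. A sketch of steering that fall-back at
runtime (assuming the property key is 'carbon.column.compressor' and 'zstd'
is supported):

    // Sketch: set the process-wide default that the builder falls back to
    // when the load model carries no table-level compressor.
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.COMPRESSOR, "zstd")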

Reply via email to