Repository: carbondata
Updated Branches:
  refs/heads/master 7b31b9168 -> 8f08c4abb


http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index 26ee65a..54dd0aa 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -179,6 +179,8 @@ public class CarbonFactDataHandlerModel {
 
   private List<Integer> varcharDimIdxInNoDict;
 
+  private String columnCompressor;
+
   /**
    * Create the model using @{@link CarbonDataLoadConfiguration}
    */
@@ -284,6 +286,7 @@ public class CarbonFactDataHandlerModel {
     carbonFactDataHandlerModel.taskExtension = taskExtension;
     carbonFactDataHandlerModel.tableSpec = configuration.getTableSpec();
     carbonFactDataHandlerModel.sortScope = CarbonDataProcessorUtil.getSortScope(configuration);
+    carbonFactDataHandlerModel.columnCompressor = configuration.getColumnCompressor();
 
     if (listener == null) {
       listener = new DataMapWriterListener();
@@ -364,6 +367,7 @@ public class CarbonFactDataHandlerModel {
     carbonFactDataHandlerModel.setCarbonDataDirectoryPath(carbonDataDirectoryPath);
     carbonFactDataHandlerModel.setPrimitiveDimLens(segmentProperties.getDimColumnsCardinality());
     carbonFactDataHandlerModel.setBlockSizeInMB(carbonTable.getBlockSizeInMB());
+    carbonFactDataHandlerModel.setColumnCompressor(loadModel.getColumnCompressor());
 
     carbonFactDataHandlerModel.tableSpec = new TableSpec(carbonTable);
     DataMapWriterListener listener = new DataMapWriterListener();
@@ -700,5 +704,12 @@ public class CarbonFactDataHandlerModel {
     return varcharDimIdxInNoDict;
   }
 
+  public String getColumnCompressor() {
+    return columnCompressor;
+  }
+
+  public void setColumnCompressor(String columnCompressor) {
+    this.columnCompressor = columnCompressor;
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
index c46b2c2..73746d6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
@@ -34,6 +34,7 @@ import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.ComplexColumnPage;
 import org.apache.carbondata.core.datastore.page.EncodedTablePage;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
 import org.apache.carbondata.core.datastore.page.encoding.DefaultEncodingFactory;
 import org.apache.carbondata.core.datastore.page.encoding.EncodedColumnPage;
 import org.apache.carbondata.core.datastore.page.encoding.EncodingFactory;
@@ -85,12 +86,16 @@ public class TablePage {
 
   // used for complex column to deserilize the byte array in input CarbonRow
   private Map<Integer, GenericDataType> complexIndexMap = null;
+  // name of compressor that used to compress column data,
+  // currently all the columns share the same compressor.
+  private String columnCompressor;
 
   TablePage(CarbonFactDataHandlerModel model, int pageSize) throws MemoryException {
     this.model = model;
     this.pageSize = pageSize;
     int numDictDimension = model.getMDKeyGenerator().getDimCount();
     TableSpec tableSpec = model.getTableSpec();
+    this.columnCompressor = model.getColumnCompressor();
 
     dictDimensionPages = new ColumnPage[numDictDimension];
     noDictDimensionPages = new ColumnPage[model.getNoDictionaryCount()];
@@ -102,7 +107,8 @@ public class TablePage {
       ColumnPage page;
       if (ColumnType.GLOBAL_DICTIONARY == columnType
           || ColumnType.DIRECT_DICTIONARY == columnType) {
-        page = ColumnPage.newPage(spec, DataTypes.BYTE_ARRAY, pageSize);
+        page = ColumnPage.newPage(
+            new ColumnPageEncoderMeta(spec, DataTypes.BYTE_ARRAY, columnCompressor), pageSize);
         page.setStatsCollector(KeyPageStatsCollector.newInstance(DataTypes.BYTE_ARRAY));
         dictDimensionPages[tmpNumDictDimIdx++] = page;
       } else {
@@ -113,11 +119,13 @@ public class TablePage {
         if (DataTypes.VARCHAR == spec.getSchemaDataType()) {
           dataType = DataTypes.VARCHAR;
         }
+        ColumnPageEncoderMeta columnPageEncoderMeta =
+            new ColumnPageEncoderMeta(spec, dataType, columnCompressor);
         if (null != localDictionaryGenerator) {
-          page = ColumnPage
-              .newLocalDictPage(spec, dataType, pageSize, localDictionaryGenerator, false);
+          page = ColumnPage.newLocalDictPage(
+              columnPageEncoderMeta, pageSize, localDictionaryGenerator, false);
         } else {
-          page = ColumnPage.newPage(spec, dataType, pageSize);
+          page = ColumnPage.newPage(columnPageEncoderMeta, pageSize);
         }
         if (DataTypes.VARCHAR == dataType) {
           page.setStatsCollector(LVLongStringStatsCollector.newInstance());
@@ -136,15 +144,15 @@ public class TablePage {
     measurePages = new ColumnPage[model.getMeasureCount()];
     DataType[] dataTypes = model.getMeasureDataType();
     for (int i = 0; i < measurePages.length; i++) {
-      TableSpec.MeasureSpec spec = model.getTableSpec().getMeasureSpec(i);
+      ColumnPageEncoderMeta columnPageEncoderMeta = new ColumnPageEncoderMeta(
+          model.getTableSpec().getMeasureSpec(i), dataTypes[i], columnCompressor);
       ColumnPage page;
-      if (DataTypes.isDecimal(spec.getSchemaDataType())) {
-        page = ColumnPage.newDecimalPage(spec, dataTypes[i], pageSize);
+      if (DataTypes.isDecimal(columnPageEncoderMeta.getSchemaDataType())) {
+        page = ColumnPage.newDecimalPage(columnPageEncoderMeta, pageSize);
       } else {
-        page = ColumnPage.newPage(spec, dataTypes[i], pageSize);
+        page = ColumnPage.newPage(columnPageEncoderMeta, pageSize);
       }
-      page.setStatsCollector(
-          PrimitivePageStatsCollector.newInstance(dataTypes[i]));
+      page.setStatsCollector(PrimitivePageStatsCollector.newInstance(dataTypes[i]));
       measurePages[i] = page;
     }
 
@@ -239,8 +247,8 @@ public class TablePage {
       complexDataType.getComplexColumnInfo(complexColumnInfoList);
       complexDimensionPages[index] = new ComplexColumnPage(complexColumnInfoList);
       try {
-        complexDimensionPages[index]
-            .initialize(model.getColumnLocalDictGenMap(), pageSize);
+        complexDimensionPages[index].initialize(
+            model.getColumnLocalDictGenMap(), pageSize, columnCompressor);
       } catch (MemoryException e) {
         throw new RuntimeException(e);
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
index 6325528..f7ce1f2 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
@@ -27,6 +27,8 @@ import java.util.List;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
@@ -35,6 +37,7 @@ import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.reader.CarbonHeaderReader;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.CarbonMetadataUtil;
 import org.apache.carbondata.core.util.CarbonUtil;
@@ -89,6 +92,7 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
   private int measureCount;
   private DataType[] measureDataTypes;
   private StreamBlockletWriter output = null;
+  private String compressorName;
 
   // data write
   private String segmentDir;
@@ -155,25 +159,41 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
     converter = new RowConverterImpl(configuration.getDataFields(), configuration, badRecordLogger);
     configuration.setCardinalityFinder(converter);
     converter.initialize();
-    // initialize encoder
-    nullBitSet = new BitSet(dataFields.length);
-    int rowBufferSize = hadoopConf.getInt(CarbonStreamOutputFormat.CARBON_ENCODER_ROW_BUFFER_SIZE,
-        CarbonStreamOutputFormat.CARBON_ENCODER_ROW_BUFFER_SIZE_DEFAULT);
-    output = new StreamBlockletWriter(maxCacheSize, maxRowNums, rowBufferSize,
-        isNoDictionaryDimensionColumn.length, measureCount,
-        measureDataTypes);
-    // initialize data writer
+
+    // initialize data writer and compressor
     String filePath = segmentDir + File.separator + fileName;
     FileFactory.FileType fileType = FileFactory.getFileType(filePath);
     CarbonFile carbonFile = FileFactory.getCarbonFile(filePath, fileType);
     if (carbonFile.exists()) {
       // if the file is existed, use the append api
       outputStream = FileFactory.getDataOutputStreamUsingAppend(filePath, fileType);
+      // get the compressor from the fileheader. In legacy store,
+      // the compressor name is not set and it use snappy compressor
+      FileHeader header = new CarbonHeaderReader(filePath).readHeader();
+      if (header.isSetCompressor_name()) {
+        compressorName = header.getCompressor_name();
+      } else {
+        compressorName = CompressorFactory.SupportedCompressor.SNAPPY.getName();
+      }
     } else {
       // IF the file is not existed, use the create api
       outputStream = FileFactory.getDataOutputStream(filePath, fileType);
+      compressorName = carbonTable.getTableInfo().getFactTable().getTableProperties().get(
+          CarbonCommonConstants.COMPRESSOR);
+      if (null == compressorName) {
+        compressorName = CompressorFactory.getInstance().getCompressor().getName();
+      }
       writeFileHeader();
     }
+
+    // initialize encoder
+    nullBitSet = new BitSet(dataFields.length);
+    int rowBufferSize = hadoopConf.getInt(CarbonStreamOutputFormat.CARBON_ENCODER_ROW_BUFFER_SIZE,
+        CarbonStreamOutputFormat.CARBON_ENCODER_ROW_BUFFER_SIZE_DEFAULT);
+    output = new StreamBlockletWriter(maxCacheSize, maxRowNums, rowBufferSize,
+        isNoDictionaryDimensionColumn.length, measureCount,
+        measureDataTypes, compressorName);
+
     isFirstRow = false;
   }
 
@@ -295,6 +315,7 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
     fileHeader.setIs_footer_present(false);
     fileHeader.setIs_splitable(true);
     fileHeader.setSync_marker(CarbonStreamOutputFormat.CARBON_SYNC_MARKER);
+    fileHeader.setCompressor_name(compressorName);
     outputStream.write(CarbonUtil.getByteArray(fileHeader));
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletReader.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletReader.java b/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletReader.java
index 5c7ad5e..0467fe4 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletReader.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletReader.java
@@ -41,13 +41,13 @@ public class StreamBlockletReader {
   private final long limitStart;
   private final long limitEnd;
   private boolean isAlreadySync = false;
-  private Compressor compressor = CompressorFactory.getInstance().getCompressor();
+  private Compressor compressor;
   private int rowNums = 0;
   private int rowIndex = 0;
   private boolean isHeaderPresent;
 
   public StreamBlockletReader(byte[] syncMarker, InputStream in, long limit,
-      boolean isHeaderPresent) {
+      boolean isHeaderPresent, String compressorName) {
     this.syncMarker = syncMarker;
     syncLen = syncMarker.length;
     syncBuffer = new byte[syncLen];
@@ -55,6 +55,7 @@ public class StreamBlockletReader {
     limitStart = limit;
     limitEnd = limitStart + syncLen;
     this.isHeaderPresent = isHeaderPresent;
+    this.compressor = CompressorFactory.getInstance().getCompressor(compressorName);
   }
 
   private void ensureCapacity(int capacity) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletWriter.java b/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletWriter.java
index d4322b4..c538451 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletWriter.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletWriter.java
@@ -47,7 +47,7 @@ public class StreamBlockletWriter {
   private int rowSize;
   private int count = 0;
   private int rowIndex = -1;
-  private Compressor compressor = CompressorFactory.getInstance().getCompressor();
+  private Compressor compressor;
 
   private int dimCountWithoutComplex;
   private int measureCount;
@@ -60,7 +60,7 @@ public class StreamBlockletWriter {
   private BlockletMinMaxIndex blockletMinMaxIndex;
 
   StreamBlockletWriter(int maxSize, int maxRowNum, int rowSize, int dimCountWithoutComplex,
-      int measureCount, DataType[] measureDataTypes) {
+      int measureCount, DataType[] measureDataTypes, String compressorName) {
     buffer = new byte[maxSize];
     this.maxSize = maxSize;
     this.maxRowNum = maxRowNum;
@@ -68,6 +68,7 @@ public class StreamBlockletWriter {
     this.dimCountWithoutComplex = dimCountWithoutComplex;
     this.measureCount = measureCount;
     this.measureDataTypes = measureDataTypes;
+    this.compressor = CompressorFactory.getInstance().getCompressor(compressorName);
     initializeStatsCollector();
   }
 

Reply via email to