Repository: carbondata Updated Branches: refs/heads/master 7b31b9168 -> 8f08c4abb
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java index 26ee65a..54dd0aa 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java @@ -179,6 +179,8 @@ public class CarbonFactDataHandlerModel { private List<Integer> varcharDimIdxInNoDict; + private String columnCompressor; + /** * Create the model using @{@link CarbonDataLoadConfiguration} */ @@ -284,6 +286,7 @@ public class CarbonFactDataHandlerModel { carbonFactDataHandlerModel.taskExtension = taskExtension; carbonFactDataHandlerModel.tableSpec = configuration.getTableSpec(); carbonFactDataHandlerModel.sortScope = CarbonDataProcessorUtil.getSortScope(configuration); + carbonFactDataHandlerModel.columnCompressor = configuration.getColumnCompressor(); if (listener == null) { listener = new DataMapWriterListener(); @@ -364,6 +367,7 @@ public class CarbonFactDataHandlerModel { carbonFactDataHandlerModel.setCarbonDataDirectoryPath(carbonDataDirectoryPath); carbonFactDataHandlerModel.setPrimitiveDimLens(segmentProperties.getDimColumnsCardinality()); carbonFactDataHandlerModel.setBlockSizeInMB(carbonTable.getBlockSizeInMB()); + carbonFactDataHandlerModel.setColumnCompressor(loadModel.getColumnCompressor()); carbonFactDataHandlerModel.tableSpec = new TableSpec(carbonTable); DataMapWriterListener listener = new DataMapWriterListener(); @@ -700,5 +704,12 @@ public class CarbonFactDataHandlerModel { return varcharDimIdxInNoDict; } + public String getColumnCompressor() { + return 
columnCompressor; + } + + public void setColumnCompressor(String columnCompressor) { + this.columnCompressor = columnCompressor; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java index c46b2c2..73746d6 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java @@ -34,6 +34,7 @@ import org.apache.carbondata.core.datastore.page.ColumnPage; import org.apache.carbondata.core.datastore.page.ComplexColumnPage; import org.apache.carbondata.core.datastore.page.EncodedTablePage; import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder; +import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta; import org.apache.carbondata.core.datastore.page.encoding.DefaultEncodingFactory; import org.apache.carbondata.core.datastore.page.encoding.EncodedColumnPage; import org.apache.carbondata.core.datastore.page.encoding.EncodingFactory; @@ -85,12 +86,16 @@ public class TablePage { // used for complex column to deserilize the byte array in input CarbonRow private Map<Integer, GenericDataType> complexIndexMap = null; + // name of compressor that used to compress column data, + // currently all the columns share the same compressor. 
+ private String columnCompressor; TablePage(CarbonFactDataHandlerModel model, int pageSize) throws MemoryException { this.model = model; this.pageSize = pageSize; int numDictDimension = model.getMDKeyGenerator().getDimCount(); TableSpec tableSpec = model.getTableSpec(); + this.columnCompressor = model.getColumnCompressor(); dictDimensionPages = new ColumnPage[numDictDimension]; noDictDimensionPages = new ColumnPage[model.getNoDictionaryCount()]; @@ -102,7 +107,8 @@ public class TablePage { ColumnPage page; if (ColumnType.GLOBAL_DICTIONARY == columnType || ColumnType.DIRECT_DICTIONARY == columnType) { - page = ColumnPage.newPage(spec, DataTypes.BYTE_ARRAY, pageSize); + page = ColumnPage.newPage( + new ColumnPageEncoderMeta(spec, DataTypes.BYTE_ARRAY, columnCompressor), pageSize); page.setStatsCollector(KeyPageStatsCollector.newInstance(DataTypes.BYTE_ARRAY)); dictDimensionPages[tmpNumDictDimIdx++] = page; } else { @@ -113,11 +119,13 @@ public class TablePage { if (DataTypes.VARCHAR == spec.getSchemaDataType()) { dataType = DataTypes.VARCHAR; } + ColumnPageEncoderMeta columnPageEncoderMeta = + new ColumnPageEncoderMeta(spec, dataType, columnCompressor); if (null != localDictionaryGenerator) { - page = ColumnPage - .newLocalDictPage(spec, dataType, pageSize, localDictionaryGenerator, false); + page = ColumnPage.newLocalDictPage( + columnPageEncoderMeta, pageSize, localDictionaryGenerator, false); } else { - page = ColumnPage.newPage(spec, dataType, pageSize); + page = ColumnPage.newPage(columnPageEncoderMeta, pageSize); } if (DataTypes.VARCHAR == dataType) { page.setStatsCollector(LVLongStringStatsCollector.newInstance()); @@ -136,15 +144,15 @@ public class TablePage { measurePages = new ColumnPage[model.getMeasureCount()]; DataType[] dataTypes = model.getMeasureDataType(); for (int i = 0; i < measurePages.length; i++) { - TableSpec.MeasureSpec spec = model.getTableSpec().getMeasureSpec(i); + ColumnPageEncoderMeta columnPageEncoderMeta = new ColumnPageEncoderMeta( + 
model.getTableSpec().getMeasureSpec(i), dataTypes[i], columnCompressor); ColumnPage page; - if (DataTypes.isDecimal(spec.getSchemaDataType())) { - page = ColumnPage.newDecimalPage(spec, dataTypes[i], pageSize); + if (DataTypes.isDecimal(columnPageEncoderMeta.getSchemaDataType())) { + page = ColumnPage.newDecimalPage(columnPageEncoderMeta, pageSize); } else { - page = ColumnPage.newPage(spec, dataTypes[i], pageSize); + page = ColumnPage.newPage(columnPageEncoderMeta, pageSize); } - page.setStatsCollector( - PrimitivePageStatsCollector.newInstance(dataTypes[i])); + page.setStatsCollector(PrimitivePageStatsCollector.newInstance(dataTypes[i])); measurePages[i] = page; } @@ -239,8 +247,8 @@ public class TablePage { complexDataType.getComplexColumnInfo(complexColumnInfoList); complexDimensionPages[index] = new ComplexColumnPage(complexColumnInfoList); try { - complexDimensionPages[index] - .initialize(model.getColumnLocalDictGenMap(), pageSize); + complexDimensionPages[index].initialize( + model.getColumnLocalDictGenMap(), pageSize, columnCompressor); } catch (MemoryException e) { throw new RuntimeException(e); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java index 6325528..f7ce1f2 100644 --- a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java +++ b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java @@ -27,6 +27,8 @@ import java.util.List; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import 
org.apache.carbondata.core.datastore.compression.CompressorFactory; import org.apache.carbondata.core.datastore.filesystem.CarbonFile; import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.datastore.row.CarbonRow; @@ -35,6 +37,7 @@ import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; +import org.apache.carbondata.core.reader.CarbonHeaderReader; import org.apache.carbondata.core.util.ByteUtil; import org.apache.carbondata.core.util.CarbonMetadataUtil; import org.apache.carbondata.core.util.CarbonUtil; @@ -89,6 +92,7 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> { private int measureCount; private DataType[] measureDataTypes; private StreamBlockletWriter output = null; + private String compressorName; // data write private String segmentDir; @@ -155,25 +159,41 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> { converter = new RowConverterImpl(configuration.getDataFields(), configuration, badRecordLogger); configuration.setCardinalityFinder(converter); converter.initialize(); - // initialize encoder - nullBitSet = new BitSet(dataFields.length); - int rowBufferSize = hadoopConf.getInt(CarbonStreamOutputFormat.CARBON_ENCODER_ROW_BUFFER_SIZE, - CarbonStreamOutputFormat.CARBON_ENCODER_ROW_BUFFER_SIZE_DEFAULT); - output = new StreamBlockletWriter(maxCacheSize, maxRowNums, rowBufferSize, - isNoDictionaryDimensionColumn.length, measureCount, - measureDataTypes); - // initialize data writer + + // initialize data writer and compressor String filePath = segmentDir + File.separator + fileName; FileFactory.FileType fileType = FileFactory.getFileType(filePath); CarbonFile carbonFile = FileFactory.getCarbonFile(filePath, fileType); if (carbonFile.exists()) { // if the 
file exists, use the append API outputStream = FileFactory.getDataOutputStreamUsingAppend(filePath, fileType); + // get the compressor from the fileheader. In legacy store, + // the compressor name is not set and it uses the snappy compressor + FileHeader header = new CarbonHeaderReader(filePath).readHeader(); + if (header.isSetCompressor_name()) { + compressorName = header.getCompressor_name(); + } else { + compressorName = CompressorFactory.SupportedCompressor.SNAPPY.getName(); + } } else { // If the file does not exist, use the create API outputStream = FileFactory.getDataOutputStream(filePath, fileType); + compressorName = carbonTable.getTableInfo().getFactTable().getTableProperties().get( + CarbonCommonConstants.COMPRESSOR); + if (null == compressorName) { + compressorName = CompressorFactory.getInstance().getCompressor().getName(); + } writeFileHeader(); } + + // initialize encoder + nullBitSet = new BitSet(dataFields.length); + int rowBufferSize = hadoopConf.getInt(CarbonStreamOutputFormat.CARBON_ENCODER_ROW_BUFFER_SIZE, + CarbonStreamOutputFormat.CARBON_ENCODER_ROW_BUFFER_SIZE_DEFAULT); + output = new StreamBlockletWriter(maxCacheSize, maxRowNums, rowBufferSize, + isNoDictionaryDimensionColumn.length, measureCount, + measureDataTypes, compressorName); + isFirstRow = false; } @@ -295,6 +315,7 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> { fileHeader.setIs_footer_present(false); fileHeader.setIs_splitable(true); fileHeader.setSync_marker(CarbonStreamOutputFormat.CARBON_SYNC_MARKER); + fileHeader.setCompressor_name(compressorName); outputStream.write(CarbonUtil.getByteArray(fileHeader)); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletReader.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletReader.java 
b/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletReader.java index 5c7ad5e..0467fe4 100644 --- a/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletReader.java +++ b/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletReader.java @@ -41,13 +41,13 @@ public class StreamBlockletReader { private final long limitStart; private final long limitEnd; private boolean isAlreadySync = false; - private Compressor compressor = CompressorFactory.getInstance().getCompressor(); + private Compressor compressor; private int rowNums = 0; private int rowIndex = 0; private boolean isHeaderPresent; public StreamBlockletReader(byte[] syncMarker, InputStream in, long limit, - boolean isHeaderPresent) { + boolean isHeaderPresent, String compressorName) { this.syncMarker = syncMarker; syncLen = syncMarker.length; syncBuffer = new byte[syncLen]; @@ -55,6 +55,7 @@ public class StreamBlockletReader { limitStart = limit; limitEnd = limitStart + syncLen; this.isHeaderPresent = isHeaderPresent; + this.compressor = CompressorFactory.getInstance().getCompressor(compressorName); } private void ensureCapacity(int capacity) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletWriter.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletWriter.java b/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletWriter.java index d4322b4..c538451 100644 --- a/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletWriter.java +++ b/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletWriter.java @@ -47,7 +47,7 @@ public class StreamBlockletWriter { private int rowSize; private int count = 0; private int rowIndex = -1; - private Compressor compressor = CompressorFactory.getInstance().getCompressor(); + private Compressor 
compressor; private int dimCountWithoutComplex; private int measureCount; @@ -60,7 +60,7 @@ public class StreamBlockletWriter { private BlockletMinMaxIndex blockletMinMaxIndex; StreamBlockletWriter(int maxSize, int maxRowNum, int rowSize, int dimCountWithoutComplex, - int measureCount, DataType[] measureDataTypes) { + int measureCount, DataType[] measureDataTypes, String compressorName) { buffer = new byte[maxSize]; this.maxSize = maxSize; this.maxRowNum = maxRowNum; @@ -68,6 +68,7 @@ public class StreamBlockletWriter { this.dimCountWithoutComplex = dimCountWithoutComplex; this.measureCount = measureCount; this.measureDataTypes = measureDataTypes; + this.compressor = CompressorFactory.getInstance().getCompressor(compressorName); initializeStatsCollector(); }
