Removed unnecessary array copy and bitset checking

Optimized code

Added table_blocksize option.

Removed unnecessary plan from optimized plan.

Fixed test

Fixed comment

Rebased

Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/8f59a326
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/8f59a326
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/8f59a326

Branch: refs/heads/branch-1.1
Commit: 8f59a326ea4028ff7987e98826b00926df804ea7
Parents: 00535f4
Author: ravipesala <[email protected]>
Authored: Sun Mar 5 20:32:35 2017 +0530
Committer: jackylk <[email protected]>
Committed: Tue Apr 11 21:01:53 2017 +0800

----------------------------------------------------------------------
 .../carbondata/core/datastore/FileHolder.java   |   4 +-
 .../impl/VariableLengthDimensionDataChunk.java  |  16 +-
 ...mpressedDimensionChunkFileBasedReaderV1.java |  14 +-
 ...mpressedDimensionChunkFileBasedReaderV2.java |  19 +--
 ...mpressedDimensionChunkFileBasedReaderV3.java |  16 +-
 ...CompressedMeasureChunkFileBasedReaderV1.java |  16 +-
 ...CompressedMeasureChunkFileBasedReaderV2.java |  19 +--
 ...CompressedMeasureChunkFileBasedReaderV3.java |  34 ++--
 .../chunk/store/DimensionDataChunkStore.java    |   9 ++
 .../SafeAbsractDimensionDataChunkStore.java     |   4 +
 ...feVariableLengthDimensionDataChunkStore.java |  36 ++++-
 .../UnsafeAbstractDimensionDataChunkStore.java  |   4 +
 ...afeVariableLengthDimesionDataChunkStore.java |  12 ++
 .../core/datastore/impl/DFSFileHolderImpl.java  |   8 +-
 .../core/datastore/impl/FileHolderImpl.java     |   8 +-
 .../DictionaryBasedVectorResultCollector.java   |   1 -
 .../executor/impl/AbstractQueryExecutor.java    |   2 +-
 .../core/scan/executor/util/QueryUtil.java      |   2 +-
 .../vector/MeasureDataVectorProcessor.java      | 158 +++++++++++++------
 .../apache/carbondata/core/util/ByteUtil.java   |  29 ++++
 .../apache/carbondata/core/util/CarbonUtil.java |  12 +-
 .../carbondata/examples/CompareTest.scala       |   4 +
 .../apache/carbondata/spark/CarbonOption.scala  |   2 +
 .../VectorizedCarbonRecordReader.java           |   2 +-
 .../spark/sql/CarbonCatalystOperators.scala     |   2 +-
 .../spark/sql/CarbonDataFrameWriter.scala       |  13 +-
 .../spark/sql/CarbonDictionaryDecoder.scala     |  68 ++++----
 .../execution/CarbonLateDecodeStrategy.scala    |   5 +-
 28 files changed, 328 insertions(+), 191 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8f59a326/core/src/main/java/org/apache/carbondata/core/datastore/FileHolder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/FileHolder.java b/core/src/main/java/org/apache/carbondata/core/datastore/FileHolder.java
index b1eb1ee..1b972bc 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/FileHolder.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/FileHolder.java
@@ -27,12 +27,12 @@ public interface FileHolder {
    * and length(number of bytes) need to read
    *
    * @param filePath fully qualified file path
-   * @param byteBuffer
    * @param offset reading start position,
    * @param length number of bytes to be read
+   * @return ByteBuffer
    * @throws IOException
    */
-  void readByteBuffer(String filePath, ByteBuffer byteBuffer, long offset, int length)
+  ByteBuffer readByteBuffer(String filePath, long offset, int length)
       throws IOException;

   /**
    * This method will be used to read the byte array from file based on offset
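The FileHolder change above is the backbone of this patch: rather than filling a caller-supplied direct ByteBuffer, readByteBuffer now allocates (or wraps) a heap buffer itself and hands it back. A minimal sketch of the new contract, using a hypothetical MiniFileHolder stand-in rather than the real interface:

import java.io.IOException;
import java.nio.ByteBuffer;

// Hypothetical reduction of the changed contract: the holder owns buffer
// allocation and returns the filled buffer, so call sites no longer size
// and pass in a direct ByteBuffer of their own.
interface MiniFileHolder {
  ByteBuffer readByteBuffer(String filePath, long offset, int length) throws IOException;
}

Returning a heap-backed buffer is what allows the readers below to call buffer.array() and decompress in place.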
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8f59a326/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/VariableLengthDimensionDataChunk.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/VariableLengthDimensionDataChunk.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/VariableLengthDimensionDataChunk.java
index eac06f4..6c47bf5 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/VariableLengthDimensionDataChunk.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/VariableLengthDimensionDataChunk.java
@@ -16,8 +16,6 @@
  */
 package org.apache.carbondata.core.datastore.chunk.impl;

-import java.util.Arrays;
-
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.chunk.store.DimensionChunkStoreFactory;
 import org.apache.carbondata.core.datastore.chunk.store.DimensionChunkStoreFactory.DimensionStoreType;
@@ -109,14 +107,9 @@ public class VariableLengthDimensionDataChunk extends AbstractDimensionDataChunk
     int vectorOffset = columnVectorInfo.vectorOffset;
     int len = offset + columnVectorInfo.size;
     for (int i = offset; i < len; i++) {
-      byte[] value = dataChunkStore.getRow(i);
       // Considering only String case now as we support only
       // string in no dictionary case at present.
-      if (value == null || Arrays.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, value)) {
-        vector.putNull(vectorOffset++);
-      } else {
-        vector.putBytes(vectorOffset++, value);
-      }
+      dataChunkStore.fillRow(i, vector, vectorOffset++);
     }
     return column + 1;
   }
@@ -138,14 +131,9 @@ public class VariableLengthDimensionDataChunk extends AbstractDimensionDataChunk
     int vectorOffset = columnVectorInfo.vectorOffset;
     int len = offset + columnVectorInfo.size;
     for (int i = offset; i < len; i++) {
-      byte[] value = dataChunkStore.getRow(rowMapping[i]);
       // Considering only String case now as we support only
       // string in no dictionary case at present.
-      if (value == null || Arrays.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, value)) {
-        vector.putNull(vectorOffset++);
-      } else {
-        vector.putBytes(vectorOffset++, value);
-      }
+      dataChunkStore.fillRow(rowMapping[i], vector, vectorOffset++);
     }
     return column + 1;
   }


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8f59a326/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v1/CompressedDimensionChunkFileBasedReaderV1.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v1/CompressedDimensionChunkFileBasedReaderV1.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v1/CompressedDimensionChunkFileBasedReaderV1.java
index 00e6351..83e0c74 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v1/CompressedDimensionChunkFileBasedReaderV1.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v1/CompressedDimensionChunkFileBasedReaderV1.java
@@ -85,12 +85,10 @@ public class CompressedDimensionChunkFileBasedReaderV1 extends AbstractChunkRead
   @Override public DimensionRawColumnChunk readRawDimensionChunk(FileHolder fileReader,
       int blockletIndex) throws IOException {
     DataChunk dataChunk = dimensionColumnChunk.get(blockletIndex);
-    ByteBuffer buffer =
-        ByteBuffer.allocateDirect(dataChunk.getDataPageLength());
+    ByteBuffer buffer = null;
     synchronized (fileReader) {
-      fileReader.readByteBuffer(filePath, buffer,
-          dataChunk.getDataPageOffset(),
-          dataChunk.getDataPageLength());
+      buffer = fileReader
+          .readByteBuffer(filePath, dataChunk.getDataPageOffset(), dataChunk.getDataPageLength());
     }
     DimensionRawColumnChunk rawColumnChunk = new DimensionRawColumnChunk(blockletIndex, buffer, 0,
         dataChunk.getDataPageLength(), this);
@@ -110,10 +108,8 @@ public class CompressedDimensionChunkFileBasedReaderV1 extends AbstractChunkRead
     FileHolder fileReader = dimensionRawColumnChunk.getFileReader();

     ByteBuffer rawData = dimensionRawColumnChunk.getRawData();
-    rawData.position(dimensionRawColumnChunk.getOffSet());
-    byte[] data = new byte[dimensionRawColumnChunk.getLength()];
-    rawData.get(data);
-    dataPage = COMPRESSOR.unCompressByte(data);
+    dataPage = COMPRESSOR.unCompressByte(rawData.array(), dimensionRawColumnChunk.getOffSet(),
+        dimensionRawColumnChunk.getLength());

     // if row id block is present then read the row id chunk and uncompress it
     DataChunk dataChunk = dimensionColumnChunk.get(blockIndex);
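Both loops above now delegate the null check and byte copy to the chunk store, and the V1 reader decompresses straight out of the raw buffer. The common pattern in these readers is copy-free access to the heap buffer's backing array; an illustrative helper (not from the patch) showing the same idea:

import java.nio.ByteBuffer;

// Illustrative only: touch 'length' bytes starting at 'offset' without the
// intermediate position()/get(byte[]) copy the old code performed.
final class CopyFreeRead {
  static int checksum(ByteBuffer rawData, int offset, int length) {
    byte[] backing = rawData.array(); // heap buffer; array() throws for direct buffers
    int sum = 0;
    for (int i = 0; i < length; i++) {
      sum += backing[offset + i];
    }
    return sum;
  }
}

This is also why the allocation moved off ByteBuffer.allocateDirect: direct buffers have no accessible backing array.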
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8f59a326/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java
index b2201cd..bd8de36 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java
@@ -66,9 +66,10 @@ public class CompressedDimensionChunkFileBasedReaderV2 extends AbstractChunkRead
       long currentDimensionOffset = dimensionChunksOffset.get(blockletIndex);
       length = (int) (dimensionChunksOffset.get(blockletIndex + 1) - currentDimensionOffset);
     }
-    ByteBuffer buffer = ByteBuffer.allocateDirect(length);
+    ByteBuffer buffer = null;
     synchronized (fileReader) {
-      fileReader.readByteBuffer(filePath, buffer, dimensionChunksOffset.get(blockletIndex), length);
+      buffer =
+          fileReader.readByteBuffer(filePath, dimensionChunksOffset.get(blockletIndex), length);
     }
     DimensionRawColumnChunk rawColumnChunk =
         new DimensionRawColumnChunk(blockletIndex, buffer, 0, length, this);
@@ -92,10 +93,9 @@ public class CompressedDimensionChunkFileBasedReaderV2 extends AbstractChunkRead
   protected DimensionRawColumnChunk[] readRawDimensionChunksInGroup(FileHolder fileReader,
       int startColumnBlockletIndex, int endColumnBlockletIndex) throws IOException {
     long currentDimensionOffset = dimensionChunksOffset.get(startColumnBlockletIndex);
-    ByteBuffer buffer = ByteBuffer.allocateDirect(
-        (int) (dimensionChunksOffset.get(endColumnBlockletIndex + 1) - currentDimensionOffset));
+    ByteBuffer buffer = null;
     synchronized (fileReader) {
-      fileReader.readByteBuffer(filePath, buffer, currentDimensionOffset,
+      buffer = fileReader.readByteBuffer(filePath, currentDimensionOffset,
           (int) (dimensionChunksOffset.get(endColumnBlockletIndex + 1) - currentDimensionOffset));
     }
     DimensionRawColumnChunk[] dataChunks =
@@ -132,8 +132,7 @@ public class CompressedDimensionChunkFileBasedReaderV2 extends AbstractChunkRead
         dimensionColumnChunk.data_page_length + dimensionColumnChunk.rle_page_length +
         dimensionColumnChunk.rowid_page_length;
     synchronized (dimensionRawColumnChunk.getFileReader()) {
-      rawData = ByteBuffer.allocateDirect(totalDimensionDataLength);
-      dimensionRawColumnChunk.getFileReader().readByteBuffer(filePath, rawData,
+      rawData = dimensionRawColumnChunk.getFileReader().readByteBuffer(filePath,
           dimensionChunksOffset.get(blockIndex) + dimensionChunksLength.get(blockIndex),
           totalDimensionDataLength);
     }
@@ -143,11 +142,9 @@ public class CompressedDimensionChunkFileBasedReaderV2 extends AbstractChunkRead
       copySourcePoint += dimensionChunksLength.get(blockIndex);
     }

-    byte[] data = new byte[dimensionColumnChunk.data_page_length];
-    rawData.position(copySourcePoint);
-    rawData.get(data);
     // first read the data and uncompressed it
-    dataPage = COMPRESSOR.unCompressByte(data, 0, dimensionColumnChunk.data_page_length);
+    dataPage = COMPRESSOR
+        .unCompressByte(rawData.array(), copySourcePoint, dimensionColumnChunk.data_page_length);
     copySourcePoint += dimensionColumnChunk.data_page_length;
     // if row id block is present then read the row id chunk and uncompress it
     if (hasEncoding(dimensionColumnChunk.encoders, Encoding.INVERTED_INDEX)) {


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8f59a326/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
index 8bea132..9a14a85 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
@@ -88,11 +88,10 @@ public class CompressedDimensionChunkFileBasedReaderV3 extends AbstractChunkRead
     } else {
       length = (int) (dimensionChunksOffset.get(blockletColumnIndex + 1) - currentDimensionOffset);
     }
-    // allocate the buffer
-    ByteBuffer buffer = ByteBuffer.allocateDirect(length);
+    ByteBuffer buffer = null;
     // read the data from carbon data file
     synchronized (fileReader) {
-      fileReader.readByteBuffer(filePath, buffer, currentDimensionOffset, length);
+      buffer = fileReader.readByteBuffer(filePath, currentDimensionOffset, length);
     }
     // get the data chunk which will have all the details about the data pages
     DataChunk3 dataChunk = CarbonUtil.readDataChunk3(buffer, 0, length);
@@ -148,11 +147,10 @@ public class CompressedDimensionChunkFileBasedReaderV3 extends AbstractChunkRead
     // column we can subtract the offset of start column offset with
     // end column+1 offset and get the total length.
     long currentDimensionOffset = dimensionChunksOffset.get(startBlockletColumnIndex);
-    ByteBuffer buffer = ByteBuffer.allocateDirect(
-        (int) (dimensionChunksOffset.get(endBlockletColumnIndex + 1) - currentDimensionOffset));
+    ByteBuffer buffer = null;
     // read the data from carbon data file
     synchronized (fileReader) {
-      fileReader.readByteBuffer(filePath, buffer, currentDimensionOffset,
+      buffer = fileReader.readByteBuffer(filePath, currentDimensionOffset,
           (int) (dimensionChunksOffset.get(endBlockletColumnIndex + 1) - currentDimensionOffset));
     }
     // create raw chunk for each dimension column
@@ -218,11 +216,9 @@ public class CompressedDimensionChunkFileBasedReaderV3 extends AbstractChunkRead
     // data chunk length + page offset
     int copySourcePoint = dimensionRawColumnChunk.getOffSet() + dimensionChunksLength
        .get(dimensionRawColumnChunk.getBlockletId()) + dataChunk3.getPage_offset().get(pageNumber);
-    byte[] data = new byte[dimensionColumnChunk.data_page_length];
-    rawData.position(copySourcePoint);
-    rawData.get(data);
     // first read the data and uncompressed it
-    dataPage = COMPRESSOR.unCompressByte(data, 0, dimensionColumnChunk.data_page_length);
+    dataPage = COMPRESSOR
+        .unCompressByte(rawData.array(), copySourcePoint, dimensionColumnChunk.data_page_length);
     copySourcePoint += dimensionColumnChunk.data_page_length;
     // if row id block is present then read the row id chunk and uncompress it
     if (hasEncoding(dimensionColumnChunk.encoders, Encoding.INVERTED_INDEX)) {
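All the V2/V3 readers size a chunk the same way: a column chunk's byte length is the distance between adjacent entries in the offset list, with the last chunk running to the section's end offset. A small self-contained sketch of that arithmetic (names are hypothetical):

import java.util.Arrays;
import java.util.List;

final class ChunkLengths {
  // Length of chunk i: distance to the next chunk's offset, or to the
  // section end offset for the last chunk.
  static int chunkLength(List<Long> chunkOffsets, long sectionEnd, int i) {
    long start = chunkOffsets.get(i);
    long end = (i == chunkOffsets.size() - 1) ? sectionEnd : chunkOffsets.get(i + 1);
    return (int) (end - start);
  }

  public static void main(String[] args) {
    List<Long> offsets = Arrays.asList(0L, 100L, 250L);
    // chunk 1 is 150 bytes; the last chunk runs to the end offset (300)
    System.out.println(chunkLength(offsets, 300L, 1) + " " + chunkLength(offsets, 300L, 2));
  }
}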
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8f59a326/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java
index 107c430..64e9b45 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java
@@ -82,11 +82,8 @@ public class CompressedMeasureChunkFileBasedReaderV1 extends AbstractMeasureChun
   @Override public MeasureRawColumnChunk readRawMeasureChunk(FileHolder fileReader, int blockIndex)
       throws IOException {
     DataChunk dataChunk = measureColumnChunks.get(blockIndex);
-    ByteBuffer buffer =
-        ByteBuffer.allocateDirect(dataChunk.getDataPageLength());
-    fileReader
-        .readByteBuffer(filePath, buffer, dataChunk.getDataPageOffset(),
-            dataChunk.getDataPageLength());
+    ByteBuffer buffer = fileReader
+        .readByteBuffer(filePath, dataChunk.getDataPageOffset(), dataChunk.getDataPageLength());
     MeasureRawColumnChunk rawColumnChunk =
         new MeasureRawColumnChunk(blockIndex, buffer, 0, dataChunk.getDataPageLength(), this);
     rawColumnChunk.setFileReader(fileReader);
@@ -104,15 +101,12 @@ public class CompressedMeasureChunkFileBasedReaderV1 extends AbstractMeasureChun
     ReaderCompressModel compressModel = ValueCompressionUtil.getReaderCompressModel(meta);

     ValueCompressionHolder values = compressModel.getValueCompressionHolder();
-    byte[] dataPage = new byte[measureRawColumnChunk.getLength()];
     ByteBuffer rawData = measureRawColumnChunk.getRawData();
-    rawData.position(measureRawColumnChunk.getOffSet());
-    rawData.get(dataPage);

     // unCompress data
-    values.uncompress(compressModel.getConvertedDataType(), dataPage, 0,
-        dataChunk.getDataPageLength(), compressModel.getMantissa(),
-        compressModel.getMaxValue(), numberOfRows);
+    values.uncompress(compressModel.getConvertedDataType(), rawData.array(),
+        measureRawColumnChunk.getOffSet(), dataChunk.getDataPageLength(),
+        compressModel.getMantissa(), compressModel.getMaxValue(), numberOfRows);

     CarbonReadDataHolder measureDataHolder = new CarbonReadDataHolder(values);


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8f59a326/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
index 7b6acee..3ed1292 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
@@ -58,10 +58,10 @@ public class CompressedMeasureChunkFileBasedReaderV2 extends AbstractMeasureChun
       long currentMeasureOffset = measureColumnChunkOffsets.get(blockIndex);
       dataLength = (int) (measureColumnChunkOffsets.get(blockIndex + 1) - currentMeasureOffset);
     }
-    ByteBuffer buffer = ByteBuffer.allocateDirect(dataLength);
+    ByteBuffer buffer = null;
     synchronized (fileReader) {
-      fileReader
-          .readByteBuffer(filePath, buffer, measureColumnChunkOffsets.get(blockIndex), dataLength);
+      buffer = fileReader
+          .readByteBuffer(filePath, measureColumnChunkOffsets.get(blockIndex), dataLength);
     }
     MeasureRawColumnChunk rawColumnChunk =
         new MeasureRawColumnChunk(blockIndex, buffer, 0, dataLength, this);
@@ -85,10 +85,9 @@ public class CompressedMeasureChunkFileBasedReaderV2 extends AbstractMeasureChun
   protected MeasureRawColumnChunk[] readRawMeasureChunksInGroup(FileHolder fileReader,
       int startColumnBlockletIndex, int endColumnBlockletIndex) throws IOException {
     long currentMeasureOffset = measureColumnChunkOffsets.get(startColumnBlockletIndex);
-    ByteBuffer buffer = ByteBuffer.allocateDirect(
-        (int) (measureColumnChunkOffsets.get(endColumnBlockletIndex + 1) - currentMeasureOffset));
+    ByteBuffer buffer = null;
     synchronized (fileReader) {
-      fileReader.readByteBuffer(filePath, buffer, currentMeasureOffset,
+      buffer = fileReader.readByteBuffer(filePath, currentMeasureOffset,
           (int) (measureColumnChunkOffsets.get(endColumnBlockletIndex + 1) - currentMeasureOffset));
     }
     MeasureRawColumnChunk[] dataChunks =
@@ -121,8 +120,7 @@ public class CompressedMeasureChunkFileBasedReaderV2 extends AbstractMeasureChun
       measureColumnChunk =
           CarbonUtil.readDataChunk(rawData, copyPoint, measureColumnChunkLength.get(blockIndex));
       synchronized (measureRawColumnChunk.getFileReader()) {
-        rawData = ByteBuffer.allocateDirect(measureColumnChunk.data_page_length);
-        measureRawColumnChunk.getFileReader().readByteBuffer(filePath, rawData,
+        rawData = measureRawColumnChunk.getFileReader().readByteBuffer(filePath,
             measureColumnChunkOffsets.get(blockIndex) + measureColumnChunkLength.get(blockIndex),
             measureColumnChunk.data_page_length);
       }
@@ -139,11 +137,8 @@ public class CompressedMeasureChunkFileBasedReaderV2 extends AbstractMeasureChun
     WriterCompressModel compressionModel = CarbonUtil.getValueCompressionModel(valueEncodeMeta);

     ValueCompressionHolder values = compressionModel.getValueCompressionHolder()[0];
-    byte[] data = new byte[measureColumnChunk.data_page_length];
-    rawData.position(copyPoint);
-    rawData.get(data);

     // uncompress
-    values.uncompress(compressionModel.getConvertedDataType()[0], data, 0,
+    values.uncompress(compressionModel.getConvertedDataType()[0], rawData.array(), copyPoint,
         measureColumnChunk.data_page_length, compressionModel.getMantissa()[0],
         compressionModel.getMaxValue()[0], numberOfRows);
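Every read above stays inside synchronized (fileReader) because the underlying reader seeks and then reads: a compound operation on a shared file position. A reduced illustration of why the lock is kept even though the buffer is no longer shared (RandomAccessFile stands in for the cached channel; names hypothetical):

import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;

final class SharedReader {
  private final RandomAccessFile file;

  SharedReader(RandomAccessFile file) {
    this.file = file;
  }

  // seek + readFully mutate and depend on the shared file position, so two
  // threads interleaving here would read each other's byte ranges.
  ByteBuffer read(long offset, int length) throws IOException {
    byte[] out = new byte[length];
    synchronized (this) {
      file.seek(offset);
      file.readFully(out);
    }
    return ByteBuffer.wrap(out);
  }
}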
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8f59a326/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
index 570c0c8..36839fd 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
@@ -72,33 +72,33 @@ public class CompressedMeasureChunkFileBasedReaderV3 extends AbstractMeasureChun
    * @param blockIndex blocklet index of the column in carbon data file
    * @return measure raw chunk
    */
-  @Override public MeasureRawColumnChunk readRawMeasureChunk(FileHolder fileReader, int blockIndex)
-      throws IOException {
+  @Override public MeasureRawColumnChunk readRawMeasureChunk(FileHolder fileReader,
+      int blockletColumnIndex) throws IOException {
     int dataLength = 0;
     // to calculate the length of the data to be read
     // column other than last column we can subtract the offset of current column with
     // next column and get the total length.
     // but for last column we need to use lastDimensionOffset which is the end position
     // of the last dimension, we can subtract current dimension offset from lastDimesionOffset
-    if (measureColumnChunkOffsets.size() - 1 == blockIndex) {
-      dataLength = (int) (measureOffsets - measureColumnChunkOffsets.get(blockIndex));
+    if (measureColumnChunkOffsets.size() - 1 == blockletColumnIndex) {
+      dataLength = (int) (measureOffsets - measureColumnChunkOffsets.get(blockletColumnIndex));
     } else {
-      dataLength = (int) (measureColumnChunkOffsets.get(blockIndex + 1) - measureColumnChunkOffsets
-          .get(blockIndex));
+      dataLength =
+          (int) (measureColumnChunkOffsets.get(blockletColumnIndex + 1) - measureColumnChunkOffsets
+              .get(blockletColumnIndex));
     }
-    // allocate the buffer
-    ByteBuffer buffer = ByteBuffer.allocateDirect(dataLength);
+    ByteBuffer buffer = null;
     // read the data from carbon data file
     synchronized (fileReader) {
-      fileReader
-          .readByteBuffer(filePath, buffer, measureColumnChunkOffsets.get(blockIndex), dataLength);
+      buffer = fileReader
+          .readByteBuffer(filePath, measureColumnChunkOffsets.get(blockletColumnIndex), dataLength);
     }
     // get the data chunk which will have all the details about the data pages
     DataChunk3 dataChunk =
-        CarbonUtil.readDataChunk3(buffer, 0, measureColumnChunkLength.get(blockIndex));
+        CarbonUtil.readDataChunk3(buffer, 0, measureColumnChunkLength.get(blockletColumnIndex));
     // creating a raw chunks instance and filling all the details
     MeasureRawColumnChunk rawColumnChunk =
-        new MeasureRawColumnChunk(blockIndex, buffer, 0, dataLength, this);
+        new MeasureRawColumnChunk(blockletColumnIndex, buffer, 0, dataLength, this);
     int numberOfPages = dataChunk.getPage_length().size();
     byte[][] maxValueOfEachPage = new byte[numberOfPages][];
     byte[][] minValueOfEachPage = new byte[numberOfPages][];
@@ -148,11 +148,10 @@ public class CompressedMeasureChunkFileBasedReaderV3 extends AbstractMeasureChun
     // column we can subtract the offset of start column offset with
     // end column+1 offset and get the total length.
     long currentMeasureOffset = measureColumnChunkOffsets.get(startColumnBlockletIndex);
-    ByteBuffer buffer = ByteBuffer.allocateDirect(
-        (int) (measureColumnChunkOffsets.get(endColumnBlockletIndex + 1) - currentMeasureOffset));
+    ByteBuffer buffer = null;
     // read the data from carbon data file
     synchronized (fileReader) {
-      fileReader.readByteBuffer(filePath, buffer, currentMeasureOffset,
+      buffer = fileReader.readByteBuffer(filePath, currentMeasureOffset,
           (int) (measureColumnChunkOffsets.get(endColumnBlockletIndex + 1) - currentMeasureOffset));
     }
     // create raw chunk for each measure column
@@ -224,11 +223,8 @@ public class CompressedMeasureChunkFileBasedReaderV3 extends AbstractMeasureChun
     WriterCompressModel compressionModel = CarbonUtil.getValueCompressionModel(valueEncodeMeta);

     ValueCompressionHolder values = compressionModel.getValueCompressionHolder()[0];
     // uncompress
-    byte[] data = new byte[measureColumnChunk.data_page_length];
     ByteBuffer rawData = measureRawColumnChunk.getRawData();
-    rawData.position(copyPoint);
-    rawData.get(data);
-    values.uncompress(compressionModel.getConvertedDataType()[0], data, 0,
+    values.uncompress(compressionModel.getConvertedDataType()[0], rawData.array(), copyPoint,
         measureColumnChunk.data_page_length, compressionModel.getMantissa()[0],
         compressionModel.getMaxValue()[0], measureRawColumnChunk.getRowCount()[pageNumber]);
     CarbonReadDataHolder measureDataHolder = new CarbonReadDataHolder(values);
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8f59a326/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/DimensionDataChunkStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/DimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/DimensionDataChunkStore.java
index a3f7ac3..5301945 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/DimensionDataChunkStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/DimensionDataChunkStore.java
@@ -17,6 +17,8 @@

 package org.apache.carbondata.core.datastore.chunk.store;

+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+
 /**
  * Interface responsibility is to store dimension data in memory.
  * storage can be on heap or offheap.
@@ -42,6 +44,13 @@ public interface DimensionDataChunkStore {
   byte[] getRow(int rowId);

   /**
+   * Below method will be used to fill the row to vector
+   * based on row id passed
+   *
+   */
+  void fillRow(int rowId, CarbonColumnVector vector, int vectorRow);
+
+  /**
    * Below method will be used to fill the row values to buffer array
    *
    * @param rowId row id of the data to be filled


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8f59a326/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeAbsractDimensionDataChunkStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeAbsractDimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeAbsractDimensionDataChunkStore.java
index cb10d3e..cb4c1a7 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeAbsractDimensionDataChunkStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeAbsractDimensionDataChunkStore.java
@@ -18,6 +18,7 @@
 package org.apache.carbondata.core.datastore.chunk.store.impl.safe;

 import org.apache.carbondata.core.datastore.chunk.store.DimensionDataChunkStore;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;

 /**
  * Responsibility is to store dimension data
@@ -120,4 +121,7 @@ public abstract class SafeAbsractDimensionDataChunkStore implements DimensionDat
     throw new UnsupportedOperationException("Operation not supported");
   }

+  @Override public void fillRow(int rowId, CarbonColumnVector vector, int vectorRow) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
 }


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8f59a326/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java
index 28cd4c1..bb0edea 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java
@@ -20,6 +20,7 @@ package org.apache.carbondata.core.datastore.chunk.store.impl.safe;
 import java.nio.ByteBuffer;

 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
 import org.apache.carbondata.core.util.ByteUtil;

 /**
@@ -73,16 +74,14 @@ public class SafeVariableLengthDimensionDataChunkStore extends SafeAbsractDimens
     // which we have to skip
     dataOffsets[0] = CarbonCommonConstants.SHORT_SIZE_IN_BYTE;
     // creating a byte buffer which will wrap the length of the row
-    ByteBuffer buffer = ByteBuffer.allocate(CarbonCommonConstants.SHORT_SIZE_IN_BYTE);
+    ByteBuffer buffer = ByteBuffer.wrap(data);
     for (int i = 1; i < numberOfRows; i++) {
-      buffer.put(data, startOffset, CarbonCommonConstants.SHORT_SIZE_IN_BYTE);
-      buffer.flip();
+      buffer.position(startOffset);
       // so current row position will be
       // previous row length + 2 bytes used for storing previous row data
       startOffset += buffer.getShort() + CarbonCommonConstants.SHORT_SIZE_IN_BYTE;
       // as same byte buffer is used to avoid creating many byte buffer for each row
       // we need to clear the byte buffer
-      buffer.clear();
       dataOffsets[i] = startOffset + CarbonCommonConstants.SHORT_SIZE_IN_BYTE;
     }
   }
@@ -113,6 +112,35 @@ public class SafeVariableLengthDimensionDataChunkStore extends SafeAbsractDimens
     return currentRowData;
   }

+  @Override public void fillRow(int rowId, CarbonColumnVector vector, int vectorRow) {
+    // if column was explicitly sorted we need to get the rowid based inverted index reverse
+    if (isExplictSorted) {
+      rowId = invertedIndexReverse[rowId];
+    }
+    // now to get the row from memory block we need to do following thing
+    // 1. first get the current offset
+    // 2. if it's not a last row- get the next row offset
+    // Subtract the current row offset + 2 bytes(to skip the data length) with next row offset
+    // else subtract the current row offset with complete data
+    // length get the offset of set of data
+    int currentDataOffset = dataOffsets[rowId];
+    short length = 0;
+    // calculating the length of data
+    if (rowId < numberOfRows - 1) {
+      length = (short) (dataOffsets[rowId + 1] - (currentDataOffset
+          + CarbonCommonConstants.SHORT_SIZE_IN_BYTE));
+    } else {
+      // for last record
+      length = (short) (this.data.length - currentDataOffset);
+    }
+    if (ByteUtil.UnsafeComparer.INSTANCE.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, 0,
+        CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length, data, currentDataOffset, length)) {
+      vector.putNull(vectorRow);
+    } else {
+      vector.putBytes(vectorRow, currentDataOffset, length, data);
+    }
+  }
+
   @Override public int compareTo(int index, byte[] compareValue) {
     // now to get the row from memory block we need to do following thing
     // 1. first get the current offset
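fillRow is the heart of the "removed unnecessary array copy" line in the commit message: instead of materializing a byte[] per row via getRow and comparing it with Arrays.equals, the store computes the row's offset and length from its offset table, compares the bytes in place against the null marker, and copies straight into the vector. A reduced model of the safe store's layout and check (NULL_MARKER plays the role of MEMBER_DEFAULT_VAL_ARRAY; all names here are hypothetical):

import java.nio.charset.StandardCharsets;

final class MiniVariableStore {
  static final byte[] NULL_MARKER = "@NU#LL$!".getBytes(StandardCharsets.UTF_8);

  private final byte[] data;        // concatenated [2-byte length][payload] rows
  private final int[] dataOffsets;  // start of each row's payload within 'data'

  MiniVariableStore(byte[] data, int[] dataOffsets) {
    this.data = data;
    this.dataOffsets = dataOffsets;
  }

  // Row i's payload ends where row i+1's 2-byte length prefix begins.
  int rowLength(int i) {
    int end = (i < dataOffsets.length - 1) ? dataOffsets[i + 1] - 2 : data.length;
    return end - dataOffsets[i];
  }

  // In-place comparison against the marker; no per-row byte[] allocation.
  boolean isNullRow(int i) {
    int off = dataOffsets[i];
    int len = rowLength(i);
    if (len != NULL_MARKER.length) {
      return false;
    }
    for (int j = 0; j < len; j++) {
      if (data[off + j] != NULL_MARKER[j]) {
        return false;
      }
    }
    return true;
  }
}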
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8f59a326/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeAbstractDimensionDataChunkStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeAbstractDimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeAbstractDimensionDataChunkStore.java
index 51ccc58..3752fb6 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeAbstractDimensionDataChunkStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeAbstractDimensionDataChunkStore.java
@@ -22,6 +22,7 @@ import org.apache.carbondata.core.datastore.chunk.store.DimensionDataChunkStore;
 import org.apache.carbondata.core.memory.CarbonUnsafe;
 import org.apache.carbondata.core.memory.MemoryAllocatorFactory;
 import org.apache.carbondata.core.memory.MemoryBlock;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;

 /**
  * Responsibility is to store dimension data in memory. storage can be on heap
@@ -169,4 +170,7 @@ public abstract class UnsafeAbstractDimensionDataChunkStore implements Dimension
     throw new UnsupportedOperationException("Operation not supported");
   }

+  @Override public void fillRow(int rowId, CarbonColumnVector vector, int vectorRow) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
 }


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8f59a326/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeVariableLengthDimesionDataChunkStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeVariableLengthDimesionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeVariableLengthDimesionDataChunkStore.java
index 82ec205..f5222fe 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeVariableLengthDimesionDataChunkStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeVariableLengthDimesionDataChunkStore.java
@@ -21,6 +21,8 @@ import java.nio.ByteBuffer;

 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.memory.CarbonUnsafe;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.util.ByteUtil;

 /**
  * Below class is responsible to store variable length dimension data chunk in
@@ -156,6 +158,16 @@ public class UnsafeVariableLengthDimesionDataChunkStore
     return data;
   }

+  @Override public void fillRow(int rowId, CarbonColumnVector vector, int vectorRow) {
+    byte[] value = getRow(rowId);
+    if (ByteUtil.UnsafeComparer.INSTANCE
+        .equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, value)) {
+      vector.putNull(vectorRow);
+    } else {
+      vector.putBytes(vectorRow, value);
+    }
+  }
+
   /**
    * to compare the two byte array
    *


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8f59a326/core/src/main/java/org/apache/carbondata/core/datastore/impl/DFSFileHolderImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/DFSFileHolderImpl.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/DFSFileHolderImpl.java
index dcd74c5..d14cff7 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/DFSFileHolderImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/DFSFileHolderImpl.java
@@ -131,11 +131,11 @@ public class DFSFileHolderImpl implements FileHolder {
     return fileChannel.readInt();
   }

-  @Override
-  public void readByteBuffer(String filePath, ByteBuffer byteBuffer,
-      long offset, int length) throws IOException {
+  @Override public ByteBuffer readByteBuffer(String filePath, long offset, int length)
+      throws IOException {
     byte[] readByteArray = readByteArray(filePath, offset, length);
-    byteBuffer.put(readByteArray);
+    ByteBuffer byteBuffer = ByteBuffer.wrap(readByteArray);
     byteBuffer.rewind();
+    return byteBuffer;
   }
 }


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8f59a326/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileHolderImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileHolderImpl.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileHolderImpl.java
index d78c28e..36b48f5 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileHolderImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileHolderImpl.java
@@ -192,13 +192,15 @@ public class FileHolderImpl implements FileHolder {
     ByteBuffer byteBffer = read(fileChannel, CarbonCommonConstants.LONG_SIZE_IN_BYTE, offset);
     return byteBffer.getLong();
   }
-  @Override
-  public void readByteBuffer(String filePath, ByteBuffer byteBuffer,
-      long offset, int length) throws IOException {
+
+  @Override public ByteBuffer readByteBuffer(String filePath, long offset, int length)
+      throws IOException {
+    ByteBuffer byteBuffer = ByteBuffer.allocate(length);
     FileChannel fileChannel = updateCache(filePath);
     fileChannel.position(offset);
     fileChannel.read(byteBuffer);
     byteBuffer.rewind();
+    return byteBuffer;
   }
 }


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8f59a326/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
index 082874d..af617be 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
@@ -110,7 +110,6 @@ public class DictionaryBasedVectorResultCollector extends AbstractScannedResultC
     noDictionaryInfo = noDictInfoList.toArray(new ColumnVectorInfo[noDictInfoList.size()]);
     complexInfo = complexList.toArray(new ColumnVectorInfo[complexList.size()]);
     Arrays.sort(dictionaryInfo);
-    Arrays.sort(noDictionaryInfo);
     Arrays.sort(complexInfo);
   }


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8f59a326/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index 1ddb3e6..2a5c342 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -345,7 +345,7 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
     blockExecutionInfo
         .setFixedLengthKeySize(getKeySize(currentBlockQueryDimensions, segmentProperties));
     Set<Integer> dictionaryColumnBlockIndex = new HashSet<Integer>();
-    Set<Integer> noDictionaryColumnBlockIndex = new HashSet<Integer>();
+    List<Integer> noDictionaryColumnBlockIndex = new ArrayList<Integer>();
     // get the block index to be read from file for query dimension
     // for both dictionary columns and no dictionary columns
     QueryUtil.fillQueryDimensionsBlockIndexes(currentBlockQueryDimensions,
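The collector stops sorting noDictionaryInfo, and the executor correspondingly switches the no-dictionary block indexes from a HashSet to a List, presumably so the indexes keep the order in which the query dimensions were added now that nothing re-sorts them. A toy demonstration of the ordering difference the container choice makes:

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class OrderDemo {
  public static void main(String[] args) {
    // HashSet iteration order is unrelated to insertion order; a List keeps it.
    Set<Integer> asSet = new HashSet<>(Arrays.asList(30, 1, 12));
    List<Integer> asList = Arrays.asList(30, 1, 12);
    System.out.println(asSet + " vs " + asList);
  }
}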
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8f59a326/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
index 130b4fb..b12cfb0 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
@@ -607,7 +607,7 @@ public class QueryUtil {
    */
   public static void fillQueryDimensionsBlockIndexes(List<QueryDimension> queryDimensions,
       Map<Integer, Integer> columnOrdinalToBlockIndexMapping,
-      Set<Integer> dictionaryDimensionBlockIndex, Set<Integer> noDictionaryDimensionBlockIndex) {
+      Set<Integer> dictionaryDimensionBlockIndex, List<Integer> noDictionaryDimensionBlockIndex) {
     for (QueryDimension queryDimension : queryDimensions) {
       if (CarbonUtil.hasEncoding(queryDimension.getDimension().getEncoder(), Encoding.DICTIONARY)
           && queryDimension.getDimension().numberOfChild() == 0) {


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8f59a326/core/src/main/java/org/apache/carbondata/core/scan/result/vector/MeasureDataVectorProcessor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/MeasureDataVectorProcessor.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/MeasureDataVectorProcessor.java
index 6f238c9..3c65198 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/MeasureDataVectorProcessor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/MeasureDataVectorProcessor.java
@@ -43,14 +43,22 @@ public class MeasureDataVectorProcessor {
     int vectorOffset = info.vectorOffset;
     CarbonColumnVector vector = info.vector;
     BitSet nullBitSet = dataChunk.getNullValueIndexHolder().getBitSet();
-    for (int i = offset; i < len; i++) {
-      if (nullBitSet.get(i)) {
-        vector.putNull(vectorOffset);
-      } else {
+    if (nullBitSet.isEmpty()) {
+      for (int i = offset; i < len; i++) {
         vector.putInt(vectorOffset,
             (int)dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(i));
+        vectorOffset++;
+      }
+    } else {
+      for (int i = offset; i < len; i++) {
+        if (nullBitSet.get(i)) {
+          vector.putNull(vectorOffset);
+        } else {
+          vector.putInt(vectorOffset,
+              (int)dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(i));
+        }
+        vectorOffset++;
       }
-      vectorOffset++;
     }
   }
@@ -62,15 +70,24 @@ public class MeasureDataVectorProcessor {
     int vectorOffset = info.vectorOffset;
     CarbonColumnVector vector = info.vector;
     BitSet nullBitSet = dataChunk.getNullValueIndexHolder().getBitSet();
-    for (int i = offset; i < len; i++) {
-      int currentRow = rowMapping[i];
-      if (nullBitSet.get(currentRow)) {
-        vector.putNull(vectorOffset);
-      } else {
+    if (nullBitSet.isEmpty()) {
+      for (int i = offset; i < len; i++) {
+        int currentRow = rowMapping[i];
         vector.putInt(vectorOffset,
             (int)dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(currentRow));
+        vectorOffset++;
+      }
+    } else {
+      for (int i = offset; i < len; i++) {
+        int currentRow = rowMapping[i];
+        if (nullBitSet.get(currentRow)) {
+          vector.putNull(vectorOffset);
+        } else {
+          vector.putInt(vectorOffset,
+              (int)dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(currentRow));
+        }
+        vectorOffset++;
       }
-      vectorOffset++;
     }
   }
 }
@@ -84,14 +101,22 @@ public class MeasureDataVectorProcessor {
     int vectorOffset = info.vectorOffset;
     CarbonColumnVector vector = info.vector;
     BitSet nullBitSet = dataChunk.getNullValueIndexHolder().getBitSet();
-    for (int i = offset; i < len; i++) {
-      if (nullBitSet.get(i)) {
-        vector.putNull(vectorOffset);
-      } else {
+    if (nullBitSet.isEmpty()) {
+      for (int i = offset; i < len; i++) {
         vector.putShort(vectorOffset,
             (short) dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(i));
+        vectorOffset++;
+      }
+    } else {
+      for (int i = offset; i < len; i++) {
+        if (nullBitSet.get(i)) {
+          vector.putNull(vectorOffset);
+        } else {
+          vector.putShort(vectorOffset,
+              (short) dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(i));
+        }
+        vectorOffset++;
       }
-      vectorOffset++;
     }
   }
@@ -103,15 +128,24 @@ public class MeasureDataVectorProcessor {
     int vectorOffset = info.vectorOffset;
     CarbonColumnVector vector = info.vector;
     BitSet nullBitSet = dataChunk.getNullValueIndexHolder().getBitSet();
-    for (int i = offset; i < len; i++) {
-      int currentRow = rowMapping[i];
-      if (nullBitSet.get(currentRow)) {
-        vector.putNull(vectorOffset);
-      } else {
+    if (nullBitSet.isEmpty()) {
+      for (int i = offset; i < len; i++) {
+        int currentRow = rowMapping[i];
         vector.putShort(vectorOffset,
             (short) dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(currentRow));
+        vectorOffset++;
+      }
+    } else {
+      for (int i = offset; i < len; i++) {
+        int currentRow = rowMapping[i];
+        if (nullBitSet.get(currentRow)) {
+          vector.putNull(vectorOffset);
+        } else {
+          vector.putShort(vectorOffset,
+              (short) dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(currentRow));
+        }
+        vectorOffset++;
       }
-      vectorOffset++;
     }
   }
 }
@@ -125,14 +159,22 @@ public class MeasureDataVectorProcessor {
     int vectorOffset = info.vectorOffset;
     CarbonColumnVector vector = info.vector;
     BitSet nullBitSet = dataChunk.getNullValueIndexHolder().getBitSet();
-    for (int i = offset; i < len; i++) {
-      if (nullBitSet.get(i)) {
-        vector.putNull(vectorOffset);
-      } else {
+    if (nullBitSet.isEmpty()) {
+      for (int i = offset; i < len; i++) {
         vector.putLong(vectorOffset,
             dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(i));
+        vectorOffset++;
+      }
+    } else {
+      for (int i = offset; i < len; i++) {
+        if (nullBitSet.get(i)) {
+          vector.putNull(vectorOffset);
+        } else {
+          vector.putLong(vectorOffset,
+              dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(i));
+        }
+        vectorOffset++;
       }
-      vectorOffset++;
     }
   }
@@ -144,15 +186,24 @@ public class MeasureDataVectorProcessor {
     int vectorOffset = info.vectorOffset;
     CarbonColumnVector vector = info.vector;
     BitSet nullBitSet = dataChunk.getNullValueIndexHolder().getBitSet();
-    for (int i = offset; i < len; i++) {
-      int currentRow = rowMapping[i];
-      if (nullBitSet.get(currentRow)) {
-        vector.putNull(vectorOffset);
-      } else {
+    if (nullBitSet.isEmpty()) {
+      for (int i = offset; i < len; i++) {
+        int currentRow = rowMapping[i];
        vector.putLong(vectorOffset,
             dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(currentRow));
+        vectorOffset++;
+      }
+    } else {
+      for (int i = offset; i < len; i++) {
+        int currentRow = rowMapping[i];
+        if (nullBitSet.get(currentRow)) {
+          vector.putNull(vectorOffset);
+        } else {
+          vector.putLong(vectorOffset,
+              dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(currentRow));
+        }
+        vectorOffset++;
       }
-      vectorOffset++;
     }
   }
 }
@@ -200,7 +251,7 @@ public class MeasureDataVectorProcessor {
       } else {
         BigDecimal decimal =
             dataChunk.getMeasureDataHolder().getReadableBigDecimalValueByIndex(currentRow);
-        Decimal toDecimal = org.apache.spark.sql.types.Decimal.apply(decimal);
+        Decimal toDecimal = Decimal.apply(decimal);
         vector.putDecimal(vectorOffset, toDecimal, precision);
       }
       vectorOffset++;
@@ -217,14 +268,22 @@ public class MeasureDataVectorProcessor {
     int vectorOffset = info.vectorOffset;
     CarbonColumnVector vector = info.vector;
     BitSet nullBitSet = dataChunk.getNullValueIndexHolder().getBitSet();
-    for (int i = offset; i < len; i++) {
-      if (nullBitSet.get(i)) {
-        vector.putNull(vectorOffset);
-      } else {
+    if (nullBitSet.isEmpty()) {
+      for (int i = offset; i < len; i++) {
         vector.putDouble(vectorOffset,
             dataChunk.getMeasureDataHolder().getReadableDoubleValueByIndex(i));
+        vectorOffset++;
+      }
+    } else {
+      for (int i = offset; i < len; i++) {
+        if (nullBitSet.get(i)) {
+          vector.putNull(vectorOffset);
+        } else {
+          vector.putDouble(vectorOffset,
+              dataChunk.getMeasureDataHolder().getReadableDoubleValueByIndex(i));
+        }
+        vectorOffset++;
       }
-      vectorOffset++;
     }
   }
@@ -236,15 +295,24 @@ public class MeasureDataVectorProcessor {
     int vectorOffset = info.vectorOffset;
     CarbonColumnVector vector = info.vector;
     BitSet nullBitSet = dataChunk.getNullValueIndexHolder().getBitSet();
-    for (int i = offset; i < len; i++) {
-      int currentRow = rowMapping[i];
-      if (nullBitSet.get(currentRow)) {
-        vector.putNull(vectorOffset);
-      } else {
+    if (nullBitSet.isEmpty()) {
+      for (int i = offset; i < len; i++) {
+        int currentRow = rowMapping[i];
         vector.putDouble(vectorOffset,
             dataChunk.getMeasureDataHolder().getReadableDoubleValueByIndex(currentRow));
+        vectorOffset++;
+      }
+    } else {
+      for (int i = offset; i < len; i++) {
+        int currentRow = rowMapping[i];
+        if (nullBitSet.get(currentRow)) {
+          vector.putNull(vectorOffset);
+        } else {
+          vector.putDouble(vectorOffset,
+              dataChunk.getMeasureDataHolder().getReadableDoubleValueByIndex(currentRow));
+        }
+        vectorOffset++;
       }
-      vectorOffset++;
     }
   }
 }


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8f59a326/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java b/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
index 9b2c2ed..8f83f3d 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
@@ -339,6 +339,35 @@ public final class ByteUtil {
       return true;
     }

+    public boolean equals(byte[] buffer1, int offset1, int length1, byte[] buffer2, int offset2,
+        int length2) {
+      if (length1 != length2) {
+        return false;
+      }
+      int len = length1 / 8;
+      long currentOffset = CarbonUnsafe.BYTE_ARRAY_OFFSET;
+      for (int i = 0; i < len; i++) {
+        long lw = CarbonUnsafe.unsafe.getLong(buffer1, currentOffset + offset1);
+        long rw = CarbonUnsafe.unsafe.getLong(buffer2, currentOffset + offset2);
+        if (lw != rw) {
+          return false;
+        }
+        currentOffset += 8;
+      }
+      len = buffer1.length % 8;
+      if (len > 0) {
+        for (int i = 0; i < len; i += 1) {
+          long lw = CarbonUnsafe.unsafe.getByte(buffer1, currentOffset + offset1);
+          long rw = CarbonUnsafe.unsafe.getByte(buffer2, currentOffset + offset2);
+          if (lw != rw) {
+            return false;
+          }
+          currentOffset += 1;
+        }
+      }
+      return true;
+    }
+
     /**
      * Comparing the 2 byte buffers. This is used in case of data load sorting step.
     *
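The MeasureDataVectorProcessor rewrite is the "bitset checking" optimization from the commit message: the per-row nullBitSet.get(i) lookup is hoisted out of the hot loop, so pages with no nulls (the common case) take a branch-free path. The shape of the transformation, reduced to plain arrays (Long.MIN_VALUE stands in for putNull; this is an illustration, not the project code):

import java.util.BitSet;

final class FillDemo {
  static void fill(long[] src, long[] dst, BitSet nulls) {
    if (nulls.isEmpty()) {
      // fast path: no per-row BitSet lookup at all
      for (int i = 0; i < src.length; i++) {
        dst[i] = src[i];
      }
    } else {
      for (int i = 0; i < src.length; i++) {
        dst[i] = nulls.get(i) ? Long.MIN_VALUE : src[i];
      }
    }
  }
}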
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8f59a326/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index e10bd02..7d57fb8 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -1351,26 +1351,22 @@ public final class CarbonUtil {

   public static DataChunk3 readDataChunk3(ByteBuffer dataChunkBuffer, int offset, int length)
       throws IOException {
-    byte[] data = new byte[length];
-    dataChunkBuffer.position(offset);
-    dataChunkBuffer.get(data);
+    byte[] data = dataChunkBuffer.array();
     return (DataChunk3) read(data, new ThriftReader.TBaseCreator() {
       @Override public TBase create() {
         return new DataChunk3();
       }
-    }, 0, length);
+    }, offset, length);
   }

   public static DataChunk2 readDataChunk(ByteBuffer dataChunkBuffer, int offset, int length)
       throws IOException {
-    byte[] data = new byte[length];
-    dataChunkBuffer.position(offset);
-    dataChunkBuffer.get(data);
+    byte[] data = dataChunkBuffer.array();
     return (DataChunk2) read(data, new ThriftReader.TBaseCreator() {
       @Override public TBase create() {
         return new DataChunk2();
       }
-    }, 0, length);
+    }, offset, length);
   }

   /**


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8f59a326/examples/spark2/src/main/scala/org/apache/carbondata/examples/CompareTest.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CompareTest.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CompareTest.scala
index 82bd02a..41a7850 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CompareTest.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CompareTest.scala
@@ -266,6 +266,7 @@ object CompareTest {
         .option("tempCSV", "false")
         .option("single_pass", "true")
         .option("dictionary_exclude", "id") // id is high cardinality column
+        .option("table_blocksize", "32")
         .mode(SaveMode.Overwrite)
         .save()
   }
@@ -306,6 +307,8 @@ object CompareTest {
     // do GC and sleep for some time before running next table
     System.gc()
     Thread.sleep(1000)
+    System.gc()
+    Thread.sleep(1000)
     val carbonResult: Array[(Double, Int)] = runQueries(spark, carbonTableName("3"))
     // check result by comparing output from parquet and carbon
     parquetResult.zipWithIndex.foreach { case (result, index) =>
@@ -334,6 +337,7 @@ object CompareTest {
     CarbonProperties.getInstance()
         .addProperty("carbon.enable.vector.reader", "true")
         .addProperty("enable.unsafe.sort", "true")
+        .addProperty("carbon.blockletgroup.size.in.mb", "32")
     import org.apache.spark.sql.CarbonSession._
     val rootPath = new File(this.getClass.getResource("/").getPath
         + "../../../..").getCanonicalPath


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8f59a326/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
index c29c1a2..6ad10f3 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
@@ -50,6 +50,8 @@ class CarbonOption(options: Map[String, String]) {

   def dictionaryExclude: Option[String] = options.get("dictionary_exclude")

+  def tableBlockSize: Option[String] = options.get("table_blocksize")
+
   def bucketNumber: Int = options.getOrElse("bucketnumber", "0").toInt

   def bucketColumns: String = options.getOrElse("bucketcolumns", "")


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8f59a326/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
index 9db5ace..ffff956 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
@@ -73,7 +73,7 @@ class VectorizedCarbonRecordReader extends RecordReader<Void, Object> {
   /**
    * The default config on whether columnarBatch should be offheap.
    */
-  private static final MemoryMode DEFAULT_MEMORY_MODE = MemoryMode.ON_HEAP;
+  private static final MemoryMode DEFAULT_MEMORY_MODE = MemoryMode.OFF_HEAP;

   private QueryModel queryModel;


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8f59a326/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
index 9b1533e..5917369 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.{UnaryNode, _}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.hive.{HiveContext, HiveSessionCatalog}
+import org.apache.spark.sql.hive.HiveSessionCatalog
 import org.apache.spark.sql.optimizer.CarbonDecoderRelation
 import org.apache.spark.sql.types.{StringType, TimestampType}
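The new table_blocksize write option above threads through CarbonOption into the generated CREATE TABLE statement. A minimal model of how such an option is surfaced, mirroring CarbonOption's Option-returning accessor style (class and field names here are hypothetical):

import java.util.Map;
import java.util.Optional;

final class MiniCarbonOption {
  private final Map<String, String> options;

  MiniCarbonOption(Map<String, String> options) {
    this.options = options;
  }

  // Mirrors CarbonOption.tableBlockSize: absent means "fall back to default".
  Optional<String> tableBlockSize() {
    return Optional.ofNullable(options.get("table_blocksize"));
  }
}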
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8f59a326/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
index e6efeaa..9ad9504 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
@@ -162,7 +162,7 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
     val carbonSchema = schema.map { field => s"${ field.name } ${
       convertToCarbonType(field.dataType) }" }
-    val property = new StringBuilder
+    var property = new StringBuilder
     property.append(
       if (options.dictionaryInclude.isDefined) {
         s"'DICTIONARY_INCLUDE' = '${options.dictionaryInclude.get}' ,"
@@ -171,11 +171,20 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
       }
     ).append(
       if (options.dictionaryExclude.isDefined) {
-        s"'DICTIONARY_EXCLUDE' = '${options.dictionaryExclude.get}'"
+        s"'DICTIONARY_EXCLUDE' = '${options.dictionaryExclude.get}' ,"
+      } else {
+        ""
+      }
+    ).append(
+      if (options.tableBlockSize.isDefined) {
+        s"'table_blocksize' = '${options.tableBlockSize.get}'"
       } else {
         ""
       }
     )
+    if (property.nonEmpty && property.charAt(property.length-1) == ',') {
+      property = property.replace(property.length - 1, property.length, "")
+    }

     s"""
        | CREATE TABLE IF NOT EXISTS ${options.dbName}.${options.tableName}
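[Editor's note] The writer change above appends each optional property with a trailing " ," and strips the last comma afterwards with StringBuilder.replace. A self-contained sketch of the same joining logic under the same three options; collecting the defined values and joining with mkString gives the identical TBLPROPERTIES fragment without the trailing-comma cleanup (this is an alternative formulation, not the committed code):

    // Sketch: keep only the options that are defined, then join them.
    def tableProperties(dictionaryInclude: Option[String],
        dictionaryExclude: Option[String],
        tableBlockSize: Option[String]): String =
      Seq(
        dictionaryInclude.map(v => s"'DICTIONARY_INCLUDE' = '$v'"),
        dictionaryExclude.map(v => s"'DICTIONARY_EXCLUDE' = '$v'"),
        tableBlockSize.map(v => s"'table_blocksize' = '$v'")
      ).flatten.mkString(" , ")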
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8f59a326/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index d450b69..d47ff1a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -57,28 +57,8 @@ case class CarbonDictionaryDecoder(
     child.outputPartitioning
   }

-  val getDictionaryColumnIds: Array[(String, ColumnIdentifier, CarbonDimension)] = {
-    child.output.map { attribute =>
-      val attr = aliasMap.getOrElse(attribute, attribute)
-      val relation = relations.find(p => p.contains(attr))
-      if (relation.isDefined && CarbonDictionaryDecoder.canBeDecoded(attr, profile)) {
-        val carbonTable = relation.get.carbonRelation.carbonRelation.metaData.carbonTable
-        val carbonDimension =
-          carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name)
-        if (carbonDimension != null &&
-            carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
-            !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
-            !carbonDimension.isComplex) {
-          (carbonTable.getFactTableName, carbonDimension.getColumnIdentifier,
-            carbonDimension)
-        } else {
-          (null, null, null)
-        }
-      } else {
-        (null, null, null)
-      }
-    }.toArray
-  }
+  val getDictionaryColumnIds: Array[(String, ColumnIdentifier, CarbonDimension)] =
+    CarbonDictionaryDecoder.getDictionaryColumnMapping(child.output, relations, profile, aliasMap)

   override def doExecute(): RDD[InternalRow] = {
     attachTree(this, "execute") {
@@ -88,7 +68,7 @@ case class CarbonDictionaryDecoder(
         (carbonTable.getFactTableName, carbonTable.getAbsoluteTableIdentifier)
       }.toMap

-      if (isRequiredToDecode) {
+      if (CarbonDictionaryDecoder.isRequiredToDecode(getDictionaryColumnIds)) {
         val dataTypes = child.output.map { attr => attr.dataType }
         child.execute().mapPartitions { iter =>
           val cacheProvider: CacheProvider = CacheProvider.getInstance
@@ -142,7 +122,7 @@ case class CarbonDictionaryDecoder(
       (carbonTable.getFactTableName, carbonTable.getAbsoluteTableIdentifier)
     }.toMap

-    if (isRequiredToDecode) {
+    if (CarbonDictionaryDecoder.isRequiredToDecode(getDictionaryColumnIds)) {
       val cacheProvider: CacheProvider = CacheProvider.getInstance
       val forwardDictionaryCache: Cache[DictionaryColumnUniqueIdentifier, Dictionary] =
         cacheProvider.createCache(CacheType.FORWARD_DICTIONARY, storePath)
@@ -255,13 +235,6 @@ case class CarbonDictionaryDecoder(
       child.asInstanceOf[CodegenSupport].produce(ctx, this)
   }

-  private def isRequiredToDecode = {
-    getDictionaryColumnIds.find(p => p._1 != null) match {
-      case Some(value) => true
-      case _ => false
-    }
-  }
-
   private def getDictionary(atiMap: Map[String, AbsoluteTableIdentifier],
       cache: Cache[DictionaryColumnUniqueIdentifier, Dictionary]) = {
     val dicts: Seq[Dictionary] = getDictionaryColumnIds.map { f =>
@@ -422,6 +395,39 @@ object CarbonDictionaryDecoder {
           .toDataType(s"array<${ relation.getArrayChildren(carbonDimension.getColName) }>")
     }
   }
+
+  def getDictionaryColumnMapping(output: Seq[Attribute],
+      relations: Seq[CarbonDecoderRelation],
+      profile: CarbonProfile,
+      aliasMap: CarbonAliasDecoderRelation): Array[(String, ColumnIdentifier, CarbonDimension)] = {
+    output.map { attribute =>
+      val attr = aliasMap.getOrElse(attribute, attribute)
+      val relation = relations.find(p => p.contains(attr))
+      if (relation.isDefined && CarbonDictionaryDecoder.canBeDecoded(attr, profile)) {
+        val carbonTable = relation.get.carbonRelation.carbonRelation.metaData.carbonTable
+        val carbonDimension =
+          carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name)
+        if (carbonDimension != null &&
+            carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
+            !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
+            !carbonDimension.isComplex) {
+          (carbonTable.getFactTableName, carbonDimension.getColumnIdentifier,
+            carbonDimension)
+        } else {
+          (null, null, null)
+        }
+      } else {
+        (null, null, null)
+      }
+    }.toArray
+  }
+
+  def isRequiredToDecode(colIdents: Array[(String, ColumnIdentifier, CarbonDimension)]): Boolean = {
+    colIdents.find(p => p._1 != null) match {
+      case Some(value) => true
+      case _ => false
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8f59a326/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
index ed5d362..976759a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
@@ -61,7 +61,10 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
             a.map(_.name).toArray, f), needDecoder)) :: Nil
       case CarbonDictionaryCatalystDecoder(relations, profile, aliasMap, _, child) =>
-        if (profile.isInstanceOf[IncludeProfile] && profile.isEmpty) {
+        if ((profile.isInstanceOf[IncludeProfile] && profile.isEmpty) ||
+            !CarbonDictionaryDecoder.
+              isRequiredToDecode(CarbonDictionaryDecoder.
+                getDictionaryColumnMapping(child.output, relations, profile, aliasMap))) {
          planLater(child) :: Nil
        } else {
          CarbonDictionaryDecoder(relations,
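[Editor's note] Net effect of the two changes above: the column mapping and the isRequiredToDecode check move to the CarbonDictionaryDecoder companion object, so CarbonLateDecodeStrategy can drop the decode node from the optimized plan whenever no output column actually needs dictionary decoding. A stripped-down sketch of the check; the tuple shape follows the diff, the generic parameters stand in for the elided Carbon types, and `exists` is a simplification of the committed find/match:

    // Sketch: decoding is needed only if some column maps to a
    // dictionary-encoded dimension; non-decodable columns carry null
    // in the first tuple slot.
    def isRequiredToDecode[A, B](colIdents: Array[(String, A, B)]): Boolean =
      colIdents.exists(_._1 != null)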
