[CARBONDATA-2760] Reduce memory footprint and store size for local dictionary encoded columns
Problem: Local dictionary encoded pages use UnsafeVarLengthColumnPage, which internally maintains the offset of each value in another column page; because of this, the memory footprint is high. For complex primitive string data type column pages, the page is converted to LV format while compressing even when it is encoded with dictionary values; because of this, the store size is high. Solution: Use UnsafeFixLengthColumnPage (SafeFixLengthColumnPage on the safe path) for local dictionary encoded columns. There is no need to convert to LV during query when a local dictionary is present, so the page is decoded into a fixed-length column page instead. This closes #2529 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/714487c4 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/714487c4 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/714487c4 Branch: refs/heads/branch-1.4 Commit: 714487c425cacdfe46977912c2d544ec909c061b Parents: 78a7371 Author: kumarvishal09 <[email protected]> Authored: Thu Jul 19 15:52:34 2018 +0530 Committer: ravipesala <[email protected]> Committed: Tue Jul 31 00:10:41 2018 +0530 ---------------------------------------------------------------------- .../core/constants/CarbonCommonConstants.java | 2 + ...mpressedDimensionChunkFileBasedReaderV3.java | 11 +++-- .../core/datastore/page/ColumnPage.java | 15 ++++-- .../datastore/page/SafeFixLengthColumnPage.java | 51 +++++++++++++------- .../page/UnsafeFixLengthColumnPage.java | 28 +++++++++-- .../page/encoding/ColumnPageDecoder.java | 2 + .../adaptive/AdaptiveDeltaFloatingCodec.java | 7 ++- .../adaptive/AdaptiveDeltaIntegralCodec.java | 7 ++- .../adaptive/AdaptiveFloatingCodec.java | 7 ++- .../adaptive/AdaptiveIntegralCodec.java | 7 ++- .../encoding/compress/DirectCompressCodec.java | 8 ++- .../datastore/page/encoding/rle/RLECodec.java | 5 ++ .../localdictionary/PageLevelDictionary.java | 2 +- .../executer/IncludeFilterExecuterImpl.java | 5 +- 14 files changed, 120 insertions(+), 37 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/714487c4/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java index e7e074d..8bf22c9 100644 --- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java @@ -1878,6 +1878,8 @@ public final class CarbonCommonConstants { public static final String CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT = "true"; + public static final short LOCAL_DICT_ENCODED_BYTEARRAY_SIZE = 3; + private CarbonCommonConstants() { } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/714487c4/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java index 32f84f7..ef2c189 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; +import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.FileReader; import 
org.apache.carbondata.core.datastore.chunk.DimensionColumnPage; import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk; @@ -209,12 +210,13 @@ public class CompressedDimensionChunkFileBasedReaderV3 extends AbstractChunkRead } private ColumnPage decodeDimensionByMeta(DataChunk2 pageMetadata, - ByteBuffer pageData, int offset) + ByteBuffer pageData, int offset, boolean isLocalDictEncodedPage) throws IOException, MemoryException { List<Encoding> encodings = pageMetadata.getEncoders(); List<ByteBuffer> encoderMetas = pageMetadata.getEncoder_meta(); ColumnPageDecoder decoder = encodingFactory.createDecoder(encodings, encoderMetas); - return decoder.decode(pageData.array(), offset, pageMetadata.data_page_length); + return decoder + .decode(pageData.array(), offset, pageMetadata.data_page_length, isLocalDictEncodedPage); } private boolean isEncodedWithMeta(DataChunk2 pageMetadata) { @@ -238,7 +240,8 @@ public class CompressedDimensionChunkFileBasedReaderV3 extends AbstractChunkRead ByteBuffer pageData, DataChunk2 pageMetadata, int offset) throws IOException, MemoryException { if (isEncodedWithMeta(pageMetadata)) { - ColumnPage decodedPage = decodeDimensionByMeta(pageMetadata, pageData, offset); + ColumnPage decodedPage = decodeDimensionByMeta(pageMetadata, pageData, offset, + null != rawColumnPage.getLocalDictionary()); decodedPage.setNullBits(QueryUtil.getNullBitSet(pageMetadata.presence)); return new ColumnPageWrapper(decodedPage, rawColumnPage.getLocalDictionary(), isEncodedWithAdaptiveMeta(pageMetadata)); @@ -289,7 +292,7 @@ public class CompressedDimensionChunkFileBasedReaderV3 extends AbstractChunkRead dataPage = UnBlockIndexer.uncompressData(dataPage, rlePage, null == rawColumnPage.getLocalDictionary() ? 
eachColumnValueSize[rawColumnPage.getColumnIndex()] : - 3); + CarbonCommonConstants.LOCAL_DICT_ENCODED_BYTEARRAY_SIZE); } DimensionColumnPage columnDataChunk = null; http://git-wip-us.apache.org/repos/asf/carbondata/blob/714487c4/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java index 75e47de..1330aa3 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java @@ -141,7 +141,7 @@ public abstract class ColumnPage { throw new RuntimeException(e); } } else { - return new SafeFixLengthColumnPage(columnSpec, dataType, pageSize, eachValueSize); + return new SafeFixLengthColumnPage(columnSpec, dataType, pageSize); } } @@ -167,11 +167,12 @@ public abstract class ColumnPage { boolean isComplexTypePrimitive) throws MemoryException { if (unsafe) { return new LocalDictColumnPage(new UnsafeVarLengthColumnPage(columnSpec, dataType, pageSize), - new UnsafeVarLengthColumnPage(columnSpec, DataTypes.BYTE_ARRAY, pageSize), + new UnsafeFixLengthColumnPage(columnSpec, DataTypes.BYTE_ARRAY, pageSize, + CarbonCommonConstants.LOCAL_DICT_ENCODED_BYTEARRAY_SIZE), localDictionaryGenerator, isComplexTypePrimitive); } else { return new LocalDictColumnPage(new SafeVarLengthColumnPage(columnSpec, dataType, pageSize), - new SafeVarLengthColumnPage(columnSpec, DataTypes.BYTE_ARRAY, pageSize), + new SafeFixLengthColumnPage(columnSpec, DataTypes.BYTE_ARRAY, pageSize), localDictionaryGenerator, isComplexTypePrimitive); } } @@ -718,7 +719,7 @@ public abstract class ColumnPage { * except for decimal page */ public static ColumnPage decompress(ColumnPageEncoderMeta meta, byte[] compressedData, - int offset, int length) + int 
offset, int length, boolean isLVEncoded) throws MemoryException { Compressor compressor = CompressorFactory.getInstance().getCompressor(meta.getCompressorName()); TableSpec.ColumnSpec columnSpec = meta.getColumnSpec(); @@ -744,12 +745,16 @@ public abstract class ColumnPage { } else if (storeDataType == DataTypes.DOUBLE) { double[] doubleData = compressor.unCompressDouble(compressedData, offset, length); return newDoublePage(columnSpec, doubleData); - } else if (storeDataType == DataTypes.BYTE_ARRAY && ( + } else if (!isLVEncoded && storeDataType == DataTypes.BYTE_ARRAY && ( columnSpec.getColumnType() == ColumnType.COMPLEX_PRIMITIVE || columnSpec.getColumnType() == ColumnType.PLAIN_VALUE)) { byte[] lvVarBytes = compressor.unCompressByte(compressedData, offset, length); return newComplexLVBytesPage(columnSpec, lvVarBytes, CarbonCommonConstants.SHORT_SIZE_IN_BYTE); + } else if (isLVEncoded && storeDataType == DataTypes.BYTE_ARRAY && + columnSpec.getColumnType() == ColumnType.COMPLEX_PRIMITIVE) { + byte[] lvVarBytes = compressor.unCompressByte(compressedData, offset, length); + return newFixedByteArrayPage(columnSpec, lvVarBytes, 3); } else if (storeDataType == DataTypes.BYTE_ARRAY && columnSpec.getColumnType() == ColumnType.COMPLEX_STRUCT) { byte[] lvVarBytes = compressor.unCompressByte(compressedData, offset, length); http://git-wip-us.apache.org/repos/asf/carbondata/blob/714487c4/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java index 74bb3fe..82f1510 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java @@ -17,6 +17,8 @@ 
package org.apache.carbondata.core.datastore.page; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; import java.io.IOException; import java.math.BigDecimal; @@ -48,11 +50,6 @@ public class SafeFixLengthColumnPage extends ColumnPage { this.fixedLengthdata = new byte[pageSize][]; } - SafeFixLengthColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize, - int eachRowSize) { - super(columnSpec, dataType, pageSize); - this.fixedLengthdata = new byte[pageSize][]; - } /** * Set byte value at rowId */ @@ -108,7 +105,9 @@ public class SafeFixLengthColumnPage extends ColumnPage { */ @Override public void putBytes(int rowId, byte[] bytes) { + ensureArraySize(rowId, DataTypes.BYTE_ARRAY); this.fixedLengthdata[rowId] = bytes; + arrayElementCount++; } @Override @@ -258,9 +257,12 @@ public class SafeFixLengthColumnPage extends ColumnPage { /** * Get string page */ - @Override - public byte[][] getByteArrayPage() { - throw new UnsupportedOperationException("invalid data type: " + dataType); + @Override public byte[][] getByteArrayPage() { + byte[][] data = new byte[arrayElementCount][]; + for (int i = 0; i < arrayElementCount; i++) { + data[i] = fixedLengthdata[i]; + } + return data; } @Override @@ -269,8 +271,13 @@ public class SafeFixLengthColumnPage extends ColumnPage { } @Override - public byte[] getComplexChildrenLVFlattenedBytePage() { - throw new UnsupportedOperationException("internal error"); + public byte[] getComplexChildrenLVFlattenedBytePage() throws IOException { + ByteArrayOutputStream stream = new ByteArrayOutputStream(); + DataOutputStream out = new DataOutputStream(stream); + for (int i = 0; i < arrayElementCount; i++) { + out.write(fixedLengthdata[i]); + } + return stream.toByteArray(); } @Override @@ -350,6 +357,7 @@ public class SafeFixLengthColumnPage extends ColumnPage { floatData = null; doubleData = null; shortIntData = null; + fixedLengthdata = null; } /** @@ -391,46 +399,55 @@ public class 
SafeFixLengthColumnPage extends ColumnPage { private void ensureArraySize(int requestSize, DataType dataType) { if (dataType == DataTypes.BYTE) { if (requestSize >= byteData.length) { - byte[] newArray = new byte[arrayElementCount + 16]; + byte[] newArray = new byte[arrayElementCount * 2]; System.arraycopy(byteData, 0, newArray, 0, arrayElementCount); byteData = newArray; } } else if (dataType == DataTypes.SHORT) { if (requestSize >= shortData.length) { - short[] newArray = new short[arrayElementCount + 16]; + short[] newArray = new short[arrayElementCount * 2]; System.arraycopy(shortData, 0, newArray, 0, arrayElementCount); shortData = newArray; } } else if (dataType == DataTypes.SHORT_INT) { if (requestSize >= shortIntData.length / 3) { - byte[] newArray = new byte[(arrayElementCount * 3) + (16 * 3)]; + byte[] newArray = new byte[arrayElementCount * 6]; System.arraycopy(shortIntData, 0, newArray, 0, arrayElementCount * 3); shortIntData = newArray; } } else if (dataType == DataTypes.INT) { if (requestSize >= intData.length) { - int[] newArray = new int[arrayElementCount + 16]; + int[] newArray = new int[arrayElementCount * 2]; System.arraycopy(intData, 0, newArray, 0, arrayElementCount); intData = newArray; } } else if (dataType == DataTypes.LONG) { if (requestSize >= longData.length) { - long[] newArray = new long[arrayElementCount + 16]; + long[] newArray = new long[arrayElementCount * 2]; System.arraycopy(longData, 0, newArray, 0, arrayElementCount); longData = newArray; } } else if (dataType == DataTypes.FLOAT) { if (requestSize >= floatData.length) { - float[] newArray = new float[arrayElementCount + 16]; + float[] newArray = new float[arrayElementCount * 2]; System.arraycopy(floatData, 0, newArray, 0, arrayElementCount); floatData = newArray; } } else if (dataType == DataTypes.DOUBLE) { if (requestSize >= doubleData.length) { - double[] newArray = new double[arrayElementCount + 16]; + double[] newArray = new double[arrayElementCount * 2]; 
System.arraycopy(doubleData, 0, newArray, 0, arrayElementCount); doubleData = newArray; } + } else if (dataType == DataTypes.BYTE_ARRAY) { + if (requestSize >= fixedLengthdata.length) { + byte[][] newArray = new byte[arrayElementCount * 2][]; + int index = 0; + for (byte[] data : fixedLengthdata) { + newArray[index++] = data; + } + fixedLengthdata = newArray; + } } else { throw new UnsupportedOperationException( "not support value conversion on " + dataType + " page"); http://git-wip-us.apache.org/repos/asf/carbondata/blob/714487c4/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java index c5805d5..bcb74c0 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java @@ -201,12 +201,18 @@ public class UnsafeFixLengthColumnPage extends ColumnPage { @Override public void putBytes(int rowId, byte[] bytes) { + try { + ensureMemory(eachRowSize); + } catch (MemoryException e) { + throw new RuntimeException(e); + } // copy the data to memory long offset = (long)rowId * eachRowSize; CarbonUnsafe.getUnsafe() .copyMemory(bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, memoryBlock.getBaseObject(), baseOffset + offset, bytes.length); updatePageSize(rowId); + totalLength += eachRowSize; } @Override @@ -355,16 +361,28 @@ public class UnsafeFixLengthColumnPage extends ColumnPage { @Override public byte[][] getByteArrayPage() { - throw new UnsupportedOperationException("invalid data type: " + dataType); + byte[][] data = new byte[getEndLoop()][eachRowSize]; + long offset = baseOffset; + for (int i = 0; i < data.length; i++) { + //copy the row from 
memory block based on offset + // offset position will be index * each column value length + CarbonUnsafe.getUnsafe().copyMemory(memoryBlock.getBaseObject(), offset, data[i], + CarbonUnsafe.BYTE_ARRAY_OFFSET, eachRowSize); + offset += eachRowSize; + } + return data; } @Override public byte[] getLVFlattenedBytePage() { throw new UnsupportedOperationException("invalid data type: " + dataType); } - @Override - public byte[] getComplexChildrenLVFlattenedBytePage() throws IOException { - throw new UnsupportedOperationException("invalid data type: " + dataType); + + @Override public byte[] getComplexChildrenLVFlattenedBytePage() { + byte[] data = new byte[totalLength]; + CarbonUnsafe.getUnsafe() + .copyMemory(baseAddress, baseOffset, data, CarbonUnsafe.BYTE_ARRAY_OFFSET, totalLength); + return data; } @Override @@ -485,6 +503,8 @@ public class UnsafeFixLengthColumnPage extends ColumnPage { return totalLength / DataTypes.FLOAT.getSizeInBytes(); } else if (dataType == DataTypes.DOUBLE) { return totalLength / DataTypes.DOUBLE.getSizeInBytes(); + } else if (dataType == DataTypes.BYTE_ARRAY) { + return totalLength / eachRowSize; } else { throw new UnsupportedOperationException("invalid data type: " + dataType); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/714487c4/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageDecoder.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageDecoder.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageDecoder.java index 4cdd819..4e491c5 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageDecoder.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageDecoder.java @@ -29,4 +29,6 @@ public interface ColumnPageDecoder { */ ColumnPage decode(byte[] input, int offset, int length) throws 
MemoryException, IOException; + ColumnPage decode(byte[] input, int offset, int length, boolean isLVEncoded) + throws MemoryException, IOException; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/714487c4/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java index 421a17a..0e8d1c0 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java @@ -104,9 +104,14 @@ public class AdaptiveDeltaFloatingCodec extends AdaptiveCodec { @Override public ColumnPage decode(byte[] input, int offset, int length) throws MemoryException, IOException { - ColumnPage page = ColumnPage.decompress(meta, input, offset, length); + ColumnPage page = ColumnPage.decompress(meta, input, offset, length, false); return LazyColumnPage.newPage(page, converter); } + + @Override public ColumnPage decode(byte[] input, int offset, int length, boolean isLVEncoded) + throws MemoryException, IOException { + return decode(input, offset, length); + } }; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/714487c4/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java index e98397d..f20422c 100644 --- 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java @@ -117,10 +117,15 @@ public class AdaptiveDeltaIntegralCodec extends AdaptiveCodec { if (DataTypes.isDecimal(meta.getSchemaDataType())) { page = ColumnPage.decompressDecimalPage(meta, input, offset, length); } else { - page = ColumnPage.decompress(meta, input, offset, length); + page = ColumnPage.decompress(meta, input, offset, length, false); } return LazyColumnPage.newPage(page, converter); } + + @Override public ColumnPage decode(byte[] input, int offset, int length, boolean isLVEncoded) + throws MemoryException, IOException { + return decode(input, offset, length); + } }; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/714487c4/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java index 7ac908f..6d7697b 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java @@ -96,9 +96,14 @@ public class AdaptiveFloatingCodec extends AdaptiveCodec { @Override public ColumnPage decode(byte[] input, int offset, int length) throws MemoryException, IOException { - ColumnPage page = ColumnPage.decompress(meta, input, offset, length); + ColumnPage page = ColumnPage.decompress(meta, input, offset, length, false); return LazyColumnPage.newPage(page, converter); } + + @Override public ColumnPage decode(byte[] input, int offset, int 
length, boolean isLVEncoded) + throws MemoryException, IOException { + return decode(input, offset, length); + } }; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/714487c4/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java index b65296d..cfc26c7 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java @@ -97,10 +97,15 @@ public class AdaptiveIntegralCodec extends AdaptiveCodec { if (DataTypes.isDecimal(meta.getSchemaDataType())) { page = ColumnPage.decompressDecimalPage(meta, input, offset, length); } else { - page = ColumnPage.decompress(meta, input, offset, length); + page = ColumnPage.decompress(meta, input, offset, length, false); } return LazyColumnPage.newPage(page, converter); } + + @Override public ColumnPage decode(byte[] input, int offset, int length, boolean isLVEncoded) + throws MemoryException, IOException { + return decode(input, offset, length); + } }; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/714487c4/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java index 4c1bc49..7e1e9dd 100644 --- 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java @@ -108,10 +108,16 @@ public class DirectCompressCodec implements ColumnPageCodec { if (DataTypes.isDecimal(dataType)) { decodedPage = ColumnPage.decompressDecimalPage(meta, input, offset, length); } else { - decodedPage = ColumnPage.decompress(meta, input, offset, length); + decodedPage = ColumnPage.decompress(meta, input, offset, length, false); } return LazyColumnPage.newPage(decodedPage, converter); } + + @Override public ColumnPage decode(byte[] input, int offset, int length, boolean isLVEncoded) + throws MemoryException, IOException { + return LazyColumnPage + .newPage(ColumnPage.decompress(meta, input, offset, length, isLVEncoded), converter); + } } private ColumnPageValueConverter converter = new ColumnPageValueConverter() { http://git-wip-us.apache.org/repos/asf/carbondata/blob/714487c4/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java index 7007084..fa03809 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java @@ -308,6 +308,11 @@ public class RLECodec implements ColumnPageCodec { return resultPage; } + @Override public ColumnPage decode(byte[] input, int offset, int length, boolean isLVEncoded) + throws MemoryException, IOException { + return decode(input, offset, length); + } + private void decodeBytePage(DataInputStream in, ColumnPage decodedPage) throws IOException { int rowId = 0; 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/714487c4/core/src/main/java/org/apache/carbondata/core/localdictionary/PageLevelDictionary.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/localdictionary/PageLevelDictionary.java b/core/src/main/java/org/apache/carbondata/core/localdictionary/PageLevelDictionary.java index 10a4e6e..419fd9e 100644 --- a/core/src/main/java/org/apache/carbondata/core/localdictionary/PageLevelDictionary.java +++ b/core/src/main/java/org/apache/carbondata/core/localdictionary/PageLevelDictionary.java @@ -56,7 +56,7 @@ public class PageLevelDictionary { private DataType dataType; - private boolean isComplexTypePrimitive; + private boolean isComplexTypePrimitive; public PageLevelDictionary(LocalDictionaryGenerator localDictionaryGenerator, String columnName, DataType dataType, boolean isComplexTypePrimitive) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/714487c4/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java index 91f46be..974830d 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java @@ -106,15 +106,18 @@ public class IncludeFilterExecuterImpl implements FilterExecuter { DimensionRawColumnChunk dimensionRawColumnChunk = rawBlockletColumnChunks.getDimensionRawColumnChunks()[chunkIndex]; BitSetGroup bitSetGroup = new BitSetGroup(dimensionRawColumnChunk.getPagesCount()); + filterValues = dimColumnExecuterInfo.getFilterKeys(); + boolean isDecoded = false; 
for (int i = 0; i < dimensionRawColumnChunk.getPagesCount(); i++) { if (dimensionRawColumnChunk.getMaxValues() != null) { if (isScanRequired(dimensionRawColumnChunk.getMaxValues()[i], dimensionRawColumnChunk.getMinValues()[i], dimColumnExecuterInfo.getFilterKeys())) { DimensionColumnPage dimensionColumnPage = dimensionRawColumnChunk.decodeColumnPage(i); - if (null == filterValues) { + if (!isDecoded) { filterValues = FilterUtil .getEncodedFilterValues(dimensionRawColumnChunk.getLocalDictionary(), dimColumnExecuterInfo.getFilterKeys()); + isDecoded = true; } BitSet bitSet = getFilteredIndexes(dimensionColumnPage, dimensionRawColumnChunk.getRowCount()[i], useBitsetPipeLine,
