http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/DecoderBasedFallbackEncoder.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/DecoderBasedFallbackEncoder.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/DecoderBasedFallbackEncoder.java index d85d6cd..9bed89f 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/DecoderBasedFallbackEncoder.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/DecoderBasedFallbackEncoder.java @@ -27,7 +27,6 @@ import org.apache.carbondata.core.datastore.page.encoding.EncodedColumnPage; import org.apache.carbondata.core.keygenerator.KeyGenerator; import org.apache.carbondata.core.keygenerator.factory.KeyGeneratorFactory; import org.apache.carbondata.core.localdictionary.generator.LocalDictionaryGenerator; -import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.format.Encoding; @@ -65,7 +64,8 @@ public class DecoderBasedFallbackEncoder implements Callable<FallbackEncodedColu int[] rlePage; // uncompress the encoded column page - byte[] bytes = CompressorFactory.getInstance().getCompressor() + byte[] bytes = CompressorFactory.getInstance().getCompressor( + encodedColumnPage.getActualPage().getColumnPageEncoderMeta().getCompressorName()) .unCompressByte(encodedColumnPage.getEncodedData().array(), offset, encodedColumnPage.getPageMetadata().data_page_length); @@ -94,15 +94,10 @@ public class DecoderBasedFallbackEncoder implements Callable<FallbackEncodedColu // disable encoding using local dictionary encodedColumnPage.getActualPage().disableLocalDictEncoding(); - // get column spec for existing column page - TableSpec.ColumnSpec columnSpec = encodedColumnPage.getActualPage().getColumnSpec(); - - // get the dataType of column - DataType dataType = encodedColumnPage.getActualPage().getDataType(); - // create a new column page which will have actual data instead of encoded data ColumnPage actualDataColumnPage = - ColumnPage.newPage(columnSpec, dataType, encodedColumnPage.getActualPage().getPageSize()); + ColumnPage.newPage(encodedColumnPage.getActualPage().getColumnPageEncoderMeta(), + encodedColumnPage.getActualPage().getPageSize()); // uncompressed data from encoded column page is dictionary data, get the dictionary data using // keygenerator @@ -120,6 +115,8 @@ public class DecoderBasedFallbackEncoder implements Callable<FallbackEncodedColu .putBytes(rowId++, localDictionaryGenerator.getDictionaryKeyBasedOnValue(keyArray)); } + // get column spec for existing column page + TableSpec.ColumnSpec columnSpec = encodedColumnPage.getActualPage().getColumnSpec(); FallbackEncodedColumnPage fallBackEncodedColumnPage = CarbonUtil.getFallBackEncodedColumnPage(actualDataColumnPage, pageIndex, columnSpec); // here freeing the memory of new column page created as fallback is done and
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java index 255e078..605fe4e 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java @@ -37,7 +37,7 @@ public class LazyColumnPage extends ColumnPage { private ColumnPageValueConverter converter; private LazyColumnPage(ColumnPage columnPage, ColumnPageValueConverter converter) { - super(columnPage.getColumnSpec(), columnPage.getDataType(), columnPage.getPageSize()); + super(columnPage.getColumnPageEncoderMeta(), columnPage.getPageSize()); this.columnPage = columnPage; this.converter = converter; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/LocalDictColumnPage.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/LocalDictColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/LocalDictColumnPage.java index 904d7ef..fced016 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/LocalDictColumnPage.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/LocalDictColumnPage.java @@ -76,14 +76,13 @@ public class LocalDictColumnPage extends ColumnPage { protected LocalDictColumnPage(ColumnPage actualDataColumnPage, ColumnPage encodedColumnpage, LocalDictionaryGenerator localDictionaryGenerator, boolean isComplexTypePrimitive, boolean isDecoderBasedFallBackEnabled) { - super(actualDataColumnPage.getColumnSpec(), actualDataColumnPage.getDataType(), - actualDataColumnPage.getPageSize()); + super(actualDataColumnPage.getColumnPageEncoderMeta(), actualDataColumnPage.getPageSize()); // if threshold is not reached then create page level dictionary // for encoding with local dictionary if (!localDictionaryGenerator.isThresholdReached()) { pageLevelDictionary = new PageLevelDictionary(localDictionaryGenerator, actualDataColumnPage.getColumnSpec().getFieldName(), actualDataColumnPage.getDataType(), - isComplexTypePrimitive); + isComplexTypePrimitive, actualDataColumnPage.getColumnCompressorName()); this.encodedDataColumnPage = encodedColumnpage; this.keyGenerator = KeyGeneratorFactory .getKeyGenerator(new int[] { CarbonCommonConstants.LOCAL_DICTIONARY_MAX + 1 }); http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeDecimalColumnPage.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeDecimalColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeDecimalColumnPage.java index 89ac4a4..d3e945d 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeDecimalColumnPage.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeDecimalColumnPage.java @@ -19,8 +19,7 @@ package org.apache.carbondata.core.datastore.page; import java.math.BigDecimal; -import org.apache.carbondata.core.datastore.TableSpec; -import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta; import org.apache.carbondata.core.util.ByteUtil; /** @@ -36,8 +35,8 @@ public class SafeDecimalColumnPage extends DecimalColumnPage { private byte[] shortIntData; private byte[][] byteArrayData; - SafeDecimalColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize) { - super(columnSpec, dataType, pageSize); + SafeDecimalColumnPage(ColumnPageEncoderMeta columnPageEncoderMeta, int pageSize) { + super(columnPageEncoderMeta, pageSize); byteArrayData = new byte[pageSize][]; } @@ -189,8 +188,8 @@ public class SafeDecimalColumnPage extends DecimalColumnPage { } break; default: - throw new UnsupportedOperationException( - "not support value conversion on " + dataType + " page"); + throw new UnsupportedOperationException("not support value conversion on " + + columnPageEncoderMeta.getStoreDataType() + " page"); } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java index 82f1510..b355220 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java @@ -22,7 +22,7 @@ import java.io.DataOutputStream; import java.io.IOException; import java.math.BigDecimal; -import org.apache.carbondata.core.datastore.TableSpec; +import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.util.ByteUtil; @@ -45,8 +45,8 @@ public class SafeFixLengthColumnPage extends ColumnPage { // total number of entries in array private int arrayElementCount = 0; - SafeFixLengthColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize) { - super(columnSpec, dataType, pageSize); + SafeFixLengthColumnPage(ColumnPageEncoderMeta columnPageEncoderMeta, int pageSize) { + super(columnPageEncoderMeta, pageSize); this.fixedLengthdata = new byte[pageSize][]; } @@ -120,17 +120,20 @@ public class SafeFixLengthColumnPage extends ColumnPage { @Override public void putBytes(int rowId, byte[] bytes, int offset, int length) { - throw new UnsupportedOperationException("invalid data type: " + dataType); + throw new UnsupportedOperationException( + "invalid data type: " + columnPageEncoderMeta.getStoreDataType()); } @Override public void putDecimal(int rowId, BigDecimal decimal) { - throw new UnsupportedOperationException("invalid data type: " + dataType); + throw new UnsupportedOperationException( + "invalid data type: " + columnPageEncoderMeta.getStoreDataType()); } @Override public byte[] getDecimalPage() { - throw new UnsupportedOperationException("invalid data type: " + dataType); + throw new UnsupportedOperationException( + "invalid data type: " + columnPageEncoderMeta.getStoreDataType()); } /** @@ -190,7 +193,8 @@ public class SafeFixLengthColumnPage extends ColumnPage { } @Override public BigDecimal getDecimal(int rowId) { - throw new UnsupportedOperationException("invalid data type: " + dataType); + throw new UnsupportedOperationException( + "invalid data type: " + columnPageEncoderMeta.getStoreDataType()); } @Override @@ -267,7 +271,8 @@ public class SafeFixLengthColumnPage extends ColumnPage { @Override public byte[] getLVFlattenedBytePage() throws IOException { - throw new UnsupportedOperationException("invalid data type: " + dataType); + throw new UnsupportedOperationException( + "invalid data type: " + columnPageEncoderMeta.getStoreDataType()); } @Override @@ -345,7 +350,8 @@ public class SafeFixLengthColumnPage extends ColumnPage { */ @Override public void setByteArrayPage(byte[][] byteArray) { - throw new UnsupportedOperationException("invalid data type: " + dataType); + throw new UnsupportedOperationException( + "invalid data type: " + columnPageEncoderMeta.getStoreDataType()); } @Override @@ -366,33 +372,33 @@ public class SafeFixLengthColumnPage extends ColumnPage { */ @Override public void convertValue(ColumnPageValueConverter codec) { - if (dataType == DataTypes.BYTE) { + if (columnPageEncoderMeta.getStoreDataType() == DataTypes.BYTE) { for (int i = 0; i < arrayElementCount; i++) { codec.encode(i, byteData[i]); } - } else if (dataType == DataTypes.SHORT) { + } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.SHORT) { for (int i = 0; i < arrayElementCount; i++) { codec.encode(i, shortData[i]); } - } else if (dataType == DataTypes.INT) { + } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.INT) { for (int i = 0; i < arrayElementCount; i++) { codec.encode(i, intData[i]); } - } else if (dataType == DataTypes.LONG) { + } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.LONG) { for (int i = 0; i < arrayElementCount; i++) { codec.encode(i, longData[i]); } - } else if (dataType == DataTypes.FLOAT) { + } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.FLOAT) { for (int i = 0; i < arrayElementCount; i++) { codec.encode(i, floatData[i]); } - } else if (dataType == DataTypes.DOUBLE) { + } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.DOUBLE) { for (int i = 0; i < arrayElementCount; i++) { codec.encode(i, doubleData[i]); } } else { - throw new UnsupportedOperationException("not support value conversion on " + - dataType + " page"); + throw new UnsupportedOperationException("not support value conversion on " + + columnPageEncoderMeta.getStoreDataType() + " page"); } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java index 274b8a7..9b47e86 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java @@ -24,16 +24,15 @@ import java.math.BigDecimal; import java.util.ArrayList; import java.util.List; -import org.apache.carbondata.core.datastore.TableSpec; -import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta; public class SafeVarLengthColumnPage extends VarLengthColumnPageBase { // for string and decimal data private List<byte[]> byteArrayData; - SafeVarLengthColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize) { - super(columnSpec, dataType, pageSize); + SafeVarLengthColumnPage(ColumnPageEncoderMeta columnPageEncoderMeta, int pageSize) { + super(columnPageEncoderMeta, pageSize); byteArrayData = new ArrayList<>(); } @@ -54,12 +53,14 @@ public class SafeVarLengthColumnPage extends VarLengthColumnPageBase { } @Override public void putDecimal(int rowId, BigDecimal decimal) { - throw new UnsupportedOperationException("invalid data type: " + dataType); + throw new UnsupportedOperationException( + "invalid data type: " + columnPageEncoderMeta.getStoreDataType()); } @Override public BigDecimal getDecimal(int rowId) { - throw new UnsupportedOperationException("invalid data type: " + dataType); + throw new UnsupportedOperationException( + "invalid data type: " + columnPageEncoderMeta.getStoreDataType()); } @Override http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeDecimalColumnPage.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeDecimalColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeDecimalColumnPage.java index 96aeac2..829fad4 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeDecimalColumnPage.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeDecimalColumnPage.java @@ -19,11 +19,10 @@ package org.apache.carbondata.core.datastore.page; import java.math.BigDecimal; -import org.apache.carbondata.core.datastore.TableSpec; +import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta; import org.apache.carbondata.core.memory.CarbonUnsafe; import org.apache.carbondata.core.memory.MemoryException; import org.apache.carbondata.core.memory.UnsafeMemoryManager; -import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.util.ByteUtil; @@ -32,36 +31,35 @@ import org.apache.carbondata.core.util.ByteUtil; */ public class UnsafeDecimalColumnPage extends DecimalColumnPage { - UnsafeDecimalColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize) + UnsafeDecimalColumnPage(ColumnPageEncoderMeta columnPageEncoderMeta, int pageSize) throws MemoryException { - super(columnSpec, dataType, pageSize); - capacity = (int) (pageSize * DEFAULT_ROW_SIZE * FACTOR); - initMemory(); + this(columnPageEncoderMeta, pageSize, (int) (pageSize * DEFAULT_ROW_SIZE * FACTOR)); } - UnsafeDecimalColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize, - int capacity) throws MemoryException { - super(columnSpec, dataType, pageSize); + UnsafeDecimalColumnPage(ColumnPageEncoderMeta columnPageEncoderMeta, int pageSize, int capacity) + throws MemoryException { + super(columnPageEncoderMeta, pageSize); this.capacity = capacity; initMemory(); } private void initMemory() throws MemoryException { - if (dataType == DataTypes.BYTE || - dataType == DataTypes.SHORT || - dataType == DataTypes.INT || - dataType == DataTypes.LONG) { - int size = pageSize << dataType.getSizeBits(); + if (columnPageEncoderMeta.getStoreDataType() == DataTypes.BYTE || + columnPageEncoderMeta.getStoreDataType() == DataTypes.SHORT || + columnPageEncoderMeta.getStoreDataType() == DataTypes.INT || + columnPageEncoderMeta.getStoreDataType() == DataTypes.LONG) { + int size = pageSize << columnPageEncoderMeta.getStoreDataType().getSizeBits(); memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, size); - } else if (dataType == DataTypes.SHORT_INT) { + } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.SHORT_INT) { int size = pageSize * 3; memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, size); - } else if (DataTypes.isDecimal(dataType)) { + } else if (DataTypes.isDecimal(columnPageEncoderMeta.getStoreDataType())) { memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, (long) (capacity)); - } else if (dataType == DataTypes.BYTE_ARRAY) { + } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.BYTE_ARRAY) { memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, (long) (capacity)); } else { - throw new UnsupportedOperationException("invalid data type: " + dataType); + throw new UnsupportedOperationException( + "invalid data type: " + columnPageEncoderMeta.getStoreDataType()); } baseAddress = memoryBlock.getBaseObject(); baseOffset = memoryBlock.getBaseOffset(); @@ -255,8 +253,8 @@ public class UnsafeDecimalColumnPage extends DecimalColumnPage { } break; default: - throw new UnsupportedOperationException( - "not support value conversion on " + dataType + " page"); + throw new UnsupportedOperationException("not support value conversion on " + + columnPageEncoderMeta.getStoreDataType() + " page"); } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java index f75deb6..8a53840 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java @@ -20,13 +20,12 @@ package org.apache.carbondata.core.datastore.page; import java.io.IOException; import java.math.BigDecimal; -import org.apache.carbondata.core.datastore.TableSpec; import org.apache.carbondata.core.datastore.compression.Compressor; +import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta; import org.apache.carbondata.core.memory.CarbonUnsafe; import org.apache.carbondata.core.memory.MemoryBlock; import org.apache.carbondata.core.memory.MemoryException; import org.apache.carbondata.core.memory.UnsafeMemoryManager; -import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.util.ByteUtil; import org.apache.carbondata.core.util.ThreadLocalTaskInfo; @@ -61,40 +60,41 @@ public class UnsafeFixLengthColumnPage extends ColumnPage { private static final int floatBits = DataTypes.FLOAT.getSizeBits(); private static final int doubleBits = DataTypes.DOUBLE.getSizeBits(); - UnsafeFixLengthColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize) + UnsafeFixLengthColumnPage(ColumnPageEncoderMeta columnPageEncoderMeta, int pageSize) throws MemoryException { - super(columnSpec, dataType, pageSize); - if (dataType == DataTypes.BOOLEAN || - dataType == DataTypes.BYTE || - dataType == DataTypes.SHORT || - dataType == DataTypes.INT || - dataType == DataTypes.LONG || - dataType == DataTypes.FLOAT || - dataType == DataTypes.DOUBLE) { - int size = pageSize << dataType.getSizeBits(); + super(columnPageEncoderMeta, pageSize); + if (columnPageEncoderMeta.getStoreDataType() == DataTypes.BOOLEAN || + columnPageEncoderMeta.getStoreDataType() == DataTypes.BYTE || + columnPageEncoderMeta.getStoreDataType() == DataTypes.SHORT || + columnPageEncoderMeta.getStoreDataType() == DataTypes.INT || + columnPageEncoderMeta.getStoreDataType() == DataTypes.LONG || + columnPageEncoderMeta.getStoreDataType() == DataTypes.FLOAT || + columnPageEncoderMeta.getStoreDataType() == DataTypes.DOUBLE) { + int size = pageSize << columnPageEncoderMeta.getStoreDataType().getSizeBits(); memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, size); baseAddress = memoryBlock.getBaseObject(); baseOffset = memoryBlock.getBaseOffset(); capacity = size; - } else if (dataType == DataTypes.SHORT_INT) { + } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.SHORT_INT) { int size = pageSize * 3; memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, size); baseAddress = memoryBlock.getBaseObject(); baseOffset = memoryBlock.getBaseOffset(); capacity = size; - } else if (DataTypes.isDecimal(dataType) || dataType == DataTypes.STRING) { - throw new UnsupportedOperationException("invalid data type: " + dataType); + } else if (DataTypes.isDecimal(columnPageEncoderMeta.getStoreDataType()) || + columnPageEncoderMeta.getStoreDataType() == DataTypes.STRING) { + throw new UnsupportedOperationException( + "invalid data type: " + columnPageEncoderMeta.getStoreDataType()); } totalLength = 0; } - UnsafeFixLengthColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize, - int eachRowSize) - throws MemoryException { - this(columnSpec, dataType, pageSize); + UnsafeFixLengthColumnPage(ColumnPageEncoderMeta columnPageEncoderMeta, int pageSize, + int eachRowSize) throws MemoryException { + this(columnPageEncoderMeta, pageSize); this.eachRowSize = eachRowSize; totalLength = 0; - if (dataType == DataTypes.BYTE_ARRAY) { + if (columnPageEncoderMeta.getStoreDataType() == DataTypes.BYTE_ARRAY) { memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, (long) pageSize * eachRowSize); baseAddress = memoryBlock.getBaseObject(); @@ -217,11 +217,13 @@ public class UnsafeFixLengthColumnPage extends ColumnPage { @Override public void putBytes(int rowId, byte[] bytes, int offset, int length) { - throw new UnsupportedOperationException("invalid data type: " + dataType); + throw new UnsupportedOperationException( + "invalid data type: " + columnPageEncoderMeta.getStoreDataType()); } @Override public void putDecimal(int rowId, BigDecimal decimal) { - throw new UnsupportedOperationException("invalid data type: " + dataType); + throw new UnsupportedOperationException( + "invalid data type: " + columnPageEncoderMeta.getStoreDataType()); } @Override @@ -272,7 +274,8 @@ public class UnsafeFixLengthColumnPage extends ColumnPage { @Override public BigDecimal getDecimal(int rowId) { - throw new UnsupportedOperationException("invalid data type: " + dataType); + throw new UnsupportedOperationException( + "invalid data type: " + columnPageEncoderMeta.getStoreDataType()); } @Override @@ -288,7 +291,8 @@ public class UnsafeFixLengthColumnPage extends ColumnPage { } @Override public byte[] getDecimalPage() { - throw new UnsupportedOperationException("invalid data type: " + dataType); + throw new UnsupportedOperationException( + "invalid data type: " + columnPageEncoderMeta.getStoreDataType()); } @Override @@ -375,7 +379,8 @@ public class UnsafeFixLengthColumnPage extends ColumnPage { @Override public byte[] getLVFlattenedBytePage() { - throw new UnsupportedOperationException("invalid data type: " + dataType); + throw new UnsupportedOperationException( + "invalid data type: " + columnPageEncoderMeta.getStoreDataType()); } @Override public byte[] getComplexChildrenLVFlattenedBytePage() { @@ -441,7 +446,8 @@ public class UnsafeFixLengthColumnPage extends ColumnPage { @Override public void setByteArrayPage(byte[][] byteArray) { - throw new UnsupportedOperationException("invalid data type: " + dataType); + throw new UnsupportedOperationException( + "invalid data type: " + columnPageEncoderMeta.getStoreDataType()); } public void freeMemory() { @@ -455,68 +461,70 @@ public class UnsafeFixLengthColumnPage extends ColumnPage { @Override public void convertValue(ColumnPageValueConverter codec) { int endLoop = getEndLoop(); - if (dataType == DataTypes.BYTE) { + if (columnPageEncoderMeta.getStoreDataType() == DataTypes.BYTE) { for (long i = 0; i < endLoop; i++) { long offset = i << byteBits; codec.encode((int) i, CarbonUnsafe.getUnsafe().getByte(baseAddress, baseOffset + offset)); } - } else if (dataType == DataTypes.SHORT) { + } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.SHORT) { for (long i = 0; i < endLoop; i++) { long offset = i << shortBits; codec.encode((int) i, CarbonUnsafe.getUnsafe().getShort(baseAddress, baseOffset + offset)); } - } else if (dataType == DataTypes.INT) { + } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.INT) { for (long i = 0; i < endLoop; i++) { long offset = i << intBits; codec.encode((int) i, CarbonUnsafe.getUnsafe().getInt(baseAddress, baseOffset + offset)); } - } else if (dataType == DataTypes.LONG) { + } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.LONG) { for (long i = 0; i < endLoop; i++) { long offset = i << longBits; codec.encode((int) i, CarbonUnsafe.getUnsafe().getLong(baseAddress, baseOffset + offset)); } - } else if (dataType == DataTypes.FLOAT) { + } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.FLOAT) { for (long i = 0; i < endLoop; i++) { long offset = i << floatBits; codec.encode((int) i, CarbonUnsafe.getUnsafe().getFloat(baseAddress, baseOffset + offset)); } - } else if (dataType == DataTypes.DOUBLE) { + } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.DOUBLE) { for (long i = 0; i < endLoop; i++) { long offset = i << doubleBits; codec.encode((int) i, CarbonUnsafe.getUnsafe().getDouble(baseAddress, baseOffset + offset)); } } else { - throw new UnsupportedOperationException("invalid data type: " + dataType); + throw new UnsupportedOperationException( + "invalid data type: " + columnPageEncoderMeta.getStoreDataType()); } } private int getEndLoop() { - if (dataType == DataTypes.BYTE) { + if (columnPageEncoderMeta.getStoreDataType() == DataTypes.BYTE) { return totalLength / ByteUtil.SIZEOF_BYTE; - } else if (dataType == DataTypes.SHORT) { + } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.SHORT) { return totalLength / ByteUtil.SIZEOF_SHORT; - } else if (dataType == DataTypes.SHORT_INT) { + } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.SHORT_INT) { return totalLength / ByteUtil.SIZEOF_SHORT_INT; - } else if (dataType == DataTypes.INT) { + } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.INT) { return totalLength / ByteUtil.SIZEOF_INT; - } else if (dataType == DataTypes.LONG) { + } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.LONG) { return totalLength / ByteUtil.SIZEOF_LONG; - } else if (dataType == DataTypes.FLOAT) { + } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.FLOAT) { return totalLength / DataTypes.FLOAT.getSizeInBytes(); - } else if (dataType == DataTypes.DOUBLE) { + } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.DOUBLE) { return totalLength / DataTypes.DOUBLE.getSizeInBytes(); - } else if (dataType == DataTypes.BYTE_ARRAY) { + } else if (columnPageEncoderMeta.getStoreDataType() == DataTypes.BYTE_ARRAY) { return totalLength / eachRowSize; } else { - throw new UnsupportedOperationException("invalid data type: " + dataType); + throw new UnsupportedOperationException( + "invalid data type: " + columnPageEncoderMeta.getStoreDataType()); } } @Override public byte[] compress(Compressor compressor) throws MemoryException, IOException { - if (UnsafeMemoryManager.isOffHeap()) { + if (UnsafeMemoryManager.isOffHeap() && compressor.supportUnsafe()) { // use raw compression and copy to byte[] int inputSize = totalLength; - int compressedMaxSize = compressor.maxCompressedLength(inputSize); + long compressedMaxSize = compressor.maxCompressedLength(inputSize); MemoryBlock compressed = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, compressedMaxSize); long outSize = compressor.rawCompress(baseOffset, inputSize, compressed.getBaseOffset()); http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java index ae57dcd..4693dba 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java @@ -19,11 +19,10 @@ package org.apache.carbondata.core.datastore.page; import java.math.BigDecimal; -import org.apache.carbondata.core.datastore.TableSpec; +import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta; import org.apache.carbondata.core.memory.CarbonUnsafe; import org.apache.carbondata.core.memory.MemoryException; import org.apache.carbondata.core.memory.UnsafeMemoryManager; -import org.apache.carbondata.core.metadata.datatype.DataType; /** * This extension uses unsafe memory to store page data, for variable length data type (string) @@ -33,9 +32,9 @@ public class UnsafeVarLengthColumnPage extends VarLengthColumnPageBase { /** * create a page */ - UnsafeVarLengthColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize) + UnsafeVarLengthColumnPage(ColumnPageEncoderMeta columnPageEncoderMeta, int pageSize) throws MemoryException { - super(columnSpec, dataType, pageSize); + super(columnPageEncoderMeta, pageSize); capacity = (int) (pageSize * DEFAULT_ROW_SIZE * FACTOR); memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, (long) (capacity)); baseAddress = memoryBlock.getBaseObject(); @@ -85,7 +84,8 @@ public class UnsafeVarLengthColumnPage extends VarLengthColumnPageBase { @Override public BigDecimal getDecimal(int rowId) { - throw new UnsupportedOperationException("invalid data type: " + dataType); + throw new UnsupportedOperationException( + "invalid data type: " + columnPageEncoderMeta.getStoreDataType()); } @Override http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java index 4edd201..7f0b2a6 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java @@ -23,6 +23,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants; import org.apache.carbondata.core.datastore.ColumnType; import org.apache.carbondata.core.datastore.TableSpec; +import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta; import org.apache.carbondata.core.memory.CarbonUnsafe; import org.apache.carbondata.core.memory.MemoryBlock; import org.apache.carbondata.core.memory.MemoryException; @@ -64,13 +65,14 @@ public abstract class VarLengthColumnPageBase extends ColumnPage { // size of the allocated memory, in bytes int capacity; - VarLengthColumnPageBase(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize) { - super(columnSpec, dataType, pageSize); - TableSpec.ColumnSpec spec = TableSpec.ColumnSpec - .newInstance(columnSpec.getFieldName(), DataTypes.INT, ColumnType.MEASURE); + VarLengthColumnPageBase(ColumnPageEncoderMeta columnPageEncoderMeta, int pageSize) { + super(columnPageEncoderMeta, pageSize); + TableSpec.ColumnSpec spec = TableSpec.ColumnSpec.newInstance( + columnPageEncoderMeta.getColumnSpec().getFieldName(), DataTypes.INT, ColumnType.MEASURE); try { - rowOffset = - ColumnPage.newPage(spec, DataTypes.INT, pageSize); + rowOffset = ColumnPage.newPage( + new ColumnPageEncoderMeta(spec, DataTypes.INT, columnPageEncoderMeta.getCompressorName()), + pageSize); } catch (MemoryException e) { throw new RuntimeException(e); } @@ -79,44 +81,51 @@ public abstract class VarLengthColumnPageBase extends ColumnPage { @Override public void setBytePage(byte[] byteData) { - throw new UnsupportedOperationException("invalid data type: " + dataType); + throw new UnsupportedOperationException( + "invalid data type: " + columnPageEncoderMeta.getStoreDataType()); } @Override public void setShortPage(short[] shortData) { - throw new UnsupportedOperationException("invalid data type: " + dataType); + throw new UnsupportedOperationException( + "invalid data type: " + columnPageEncoderMeta.getStoreDataType()); } @Override public void setShortIntPage(byte[] shortIntData) { - throw new UnsupportedOperationException("invalid data type: " + dataType); + throw new UnsupportedOperationException( + "invalid data type: " + columnPageEncoderMeta.getStoreDataType()); } @Override public void setIntPage(int[] intData) { - throw new UnsupportedOperationException("invalid data type: " + dataType); + throw new UnsupportedOperationException( + "invalid data type: " + columnPageEncoderMeta.getStoreDataType()); } @Override public void setLongPage(long[] longData) { - throw new UnsupportedOperationException("invalid data type: " + dataType); + throw new UnsupportedOperationException( + "invalid data type: " + columnPageEncoderMeta.getStoreDataType()); } @Override public void setFloatPage(float[] floatData) { - throw new UnsupportedOperationException("invalid data type: " + dataType); + throw new UnsupportedOperationException( + "invalid data type: " + columnPageEncoderMeta.getStoreDataType()); } @Override public void setDoublePage(double[] doubleData) { - throw new UnsupportedOperationException("invalid data type: " + dataType); + throw new UnsupportedOperationException( + "invalid data type: " + columnPageEncoderMeta.getStoreDataType()); } /** * Create a new column page for decimal page */ - static ColumnPage newDecimalColumnPage(TableSpec.ColumnSpec columnSpec, byte[] lvEncodedBytes) - throws MemoryException { + static ColumnPage newDecimalColumnPage(TableSpec.ColumnSpec columnSpec, byte[] lvEncodedBytes, + String compressorName) throws MemoryException { DecimalConverterFactory.DecimalConverter decimalConverter = DecimalConverterFactory.INSTANCE.getDecimalConverter(columnSpec.getPrecision(), columnSpec.getScale()); @@ -124,10 +133,10 @@ public abstract class VarLengthColumnPageBase extends ColumnPage { if (size < 0) { return getLVBytesColumnPage(columnSpec, lvEncodedBytes, DataTypes.createDecimalType(columnSpec.getPrecision(), columnSpec.getScale()), - CarbonCommonConstants.INT_SIZE_IN_BYTE); + CarbonCommonConstants.INT_SIZE_IN_BYTE, compressorName); } else { // Here the size is always fixed. - return getDecimalColumnPage(columnSpec, lvEncodedBytes, size); + return getDecimalColumnPage(columnSpec, lvEncodedBytes, size, compressorName); } } @@ -135,23 +144,26 @@ public abstract class VarLengthColumnPageBase extends ColumnPage { * Create a new column page based on the LV (Length Value) encoded bytes */ static ColumnPage newLVBytesColumnPage(TableSpec.ColumnSpec columnSpec, byte[] lvEncodedBytes, - int lvLength) throws MemoryException { - return getLVBytesColumnPage(columnSpec, lvEncodedBytes, DataTypes.BYTE_ARRAY, lvLength); + int lvLength, String compressorName) throws MemoryException { + return getLVBytesColumnPage(columnSpec, lvEncodedBytes, DataTypes.BYTE_ARRAY, + lvLength, compressorName); } /** * Create a new column page based on the LV (Length Value) encoded bytes */ static ColumnPage newComplexLVBytesColumnPage(TableSpec.ColumnSpec columnSpec, - byte[] lvEncodedBytes, int lvLength) throws MemoryException { - return getComplexLVBytesColumnPage(columnSpec, lvEncodedBytes, DataTypes.BYTE_ARRAY, lvLength); + byte[] lvEncodedBytes, int lvLength, String compressorName) throws MemoryException { + return getComplexLVBytesColumnPage(columnSpec, lvEncodedBytes, DataTypes.BYTE_ARRAY, + lvLength, compressorName); } private static ColumnPage getDecimalColumnPage(TableSpec.ColumnSpec columnSpec, - byte[] lvEncodedBytes, int size) throws MemoryException { + byte[] lvEncodedBytes, int size, String compressorName) throws MemoryException { TableSpec.ColumnSpec spec = TableSpec.ColumnSpec .newInstance(columnSpec.getFieldName(), DataTypes.INT, ColumnType.MEASURE); - ColumnPage rowOffset = ColumnPage.newPage(spec, DataTypes.INT, + ColumnPage rowOffset = ColumnPage.newPage( + new ColumnPageEncoderMeta(spec, DataTypes.INT, compressorName), CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT); int offset; int rowId = 0; @@ -165,9 +177,13 @@ public abstract class VarLengthColumnPageBase extends ColumnPage { VarLengthColumnPageBase page; if (unsafe) { - page = new UnsafeDecimalColumnPage(columnSpec, columnSpec.getSchemaDataType(), rowId); + page = new UnsafeDecimalColumnPage( + new ColumnPageEncoderMeta(columnSpec, columnSpec.getSchemaDataType(), compressorName), + rowId); } else { - page = new SafeDecimalColumnPage(columnSpec, columnSpec.getSchemaDataType(), rowId); + page = new SafeDecimalColumnPage( + new ColumnPageEncoderMeta(columnSpec, columnSpec.getSchemaDataType(), compressorName), + rowId); } // set total length and rowOffset in page @@ -181,13 +197,14 @@ public abstract class VarLengthColumnPageBase extends ColumnPage { } private static ColumnPage getLVBytesColumnPage(TableSpec.ColumnSpec columnSpec, - byte[] lvEncodedBytes, DataType dataType, int lvLength) + byte[] lvEncodedBytes, DataType dataType, int lvLength, String compressorName) throws MemoryException { // extract length and data, set them to rowOffset and unsafe memory correspondingly int rowId = 0; TableSpec.ColumnSpec spec = TableSpec.ColumnSpec .newInstance(columnSpec.getFieldName(), DataTypes.INT, ColumnType.MEASURE); - ColumnPage rowOffset = ColumnPage.newPage(spec, DataTypes.INT, + ColumnPage rowOffset = ColumnPage.newPage( + new ColumnPageEncoderMeta(spec, DataTypes.INT, compressorName), CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT); int length; int offset; @@ -202,20 +219,19 @@ public abstract class VarLengthColumnPageBase extends ColumnPage { counter++; } rowOffset.putInt(counter, offset); - VarLengthColumnPageBase page = - getVarLengthColumnPage(columnSpec, lvEncodedBytes, dataType, lvLength, rowId, rowOffset, - offset); - return page; + return getVarLengthColumnPage(columnSpec, lvEncodedBytes, dataType, + lvLength, rowId, rowOffset, offset, compressorName); } private static ColumnPage getComplexLVBytesColumnPage(TableSpec.ColumnSpec columnSpec, - byte[] lvEncodedBytes, DataType dataType, int lvLength) + byte[] lvEncodedBytes, DataType dataType, int lvLength, String compressorName) throws MemoryException { // extract length and data, set them to rowOffset and unsafe memory correspondingly int rowId = 0; TableSpec.ColumnSpec spec = TableSpec.ColumnSpec .newInstance(columnSpec.getFieldName(), DataTypes.INT, ColumnType.MEASURE); - ColumnPage rowOffset = ColumnPage.newPage(spec, DataTypes.INT, + ColumnPage rowOffset = ColumnPage.newPage( + new ColumnPageEncoderMeta(spec, DataTypes.INT, compressorName), CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT); int length; int offset; @@ -231,15 +247,13 @@ public abstract class VarLengthColumnPageBase extends ColumnPage { } rowOffset.putInt(counter, offset); - VarLengthColumnPageBase page = - getVarLengthColumnPage(columnSpec, lvEncodedBytes, dataType, lvLength, rowId, rowOffset, - offset); - return page; + return getVarLengthColumnPage(columnSpec, lvEncodedBytes, dataType, + lvLength, rowId, rowOffset, offset, compressorName); } private static VarLengthColumnPageBase getVarLengthColumnPage(TableSpec.ColumnSpec columnSpec, byte[] lvEncodedBytes, DataType dataType, int lvLength, int rowId, ColumnPage rowOffset, - int offset) throws MemoryException { + int offset, String compressorName) throws MemoryException { int lvEncodedOffset; int length; int numRows = rowId; @@ -247,9 +261,12 @@ public abstract class VarLengthColumnPageBase extends ColumnPage { VarLengthColumnPageBase page; int inputDataLength = offset; if (unsafe) { - page = new UnsafeDecimalColumnPage(columnSpec, dataType, numRows, inputDataLength); + page = new UnsafeDecimalColumnPage( + new ColumnPageEncoderMeta(columnSpec, dataType, compressorName), numRows, + inputDataLength); } else { - page = new SafeDecimalColumnPage(columnSpec, dataType, numRows); + page = new SafeDecimalColumnPage( + new ColumnPageEncoderMeta(columnSpec, dataType, compressorName), numRows); } // set total length and rowOffset in page @@ -269,32 +286,38 @@ public abstract class VarLengthColumnPageBase extends ColumnPage { @Override public void putByte(int rowId, byte value) { - throw new UnsupportedOperationException("invalid data type: " + dataType); + throw new UnsupportedOperationException( + "invalid data type: " + columnPageEncoderMeta.getStoreDataType()); } @Override public void putShort(int rowId, short value) { - throw new UnsupportedOperationException("invalid data type: " + dataType); + throw new UnsupportedOperationException( + "invalid data type: " + columnPageEncoderMeta.getStoreDataType()); } @Override public void putShortInt(int rowId, int value) { - throw new UnsupportedOperationException("invalid data type: " + dataType); + throw new UnsupportedOperationException( + "invalid data type: " + columnPageEncoderMeta.getStoreDataType()); } @Override public void putInt(int rowId, int value) { - throw new UnsupportedOperationException("invalid data type: " + dataType); + throw new UnsupportedOperationException( + "invalid data type: " + columnPageEncoderMeta.getStoreDataType()); } @Override public void putLong(int rowId, long value) { - throw new UnsupportedOperationException("invalid data type: " + dataType); + throw new UnsupportedOperationException( + "invalid data type: " + columnPageEncoderMeta.getStoreDataType()); } @Override public void putDouble(int rowId, double value) { - throw new UnsupportedOperationException("invalid data type: " + dataType); + throw new UnsupportedOperationException( + "invalid data type: " + columnPageEncoderMeta.getStoreDataType()); } abstract void putBytesAtRow(int rowId, byte[] bytes); @@ -317,72 +340,86 @@ public abstract class VarLengthColumnPageBase extends ColumnPage { @Override public byte getByte(int rowId) { - throw new UnsupportedOperationException("invalid data type: " + dataType); + throw new UnsupportedOperationException( + "invalid data type: " + columnPageEncoderMeta.getStoreDataType()); } @Override public short getShort(int rowId) { - throw new UnsupportedOperationException("invalid data type: " + dataType); + throw new UnsupportedOperationException( + "invalid data type: " + columnPageEncoderMeta.getStoreDataType()); } @Override public int getShortInt(int rowId) { - throw new UnsupportedOperationException("invalid data type: " + dataType); + throw new UnsupportedOperationException( + "invalid data type: " + columnPageEncoderMeta.getStoreDataType()); } @Override public int getInt(int rowId) { - throw new UnsupportedOperationException("invalid data type: " + dataType); + throw new UnsupportedOperationException( + "invalid data type: " + columnPageEncoderMeta.getStoreDataType()); } @Override public long getLong(int rowId) { - throw new UnsupportedOperationException("invalid data type: " + dataType); + throw new UnsupportedOperationException( + "invalid data type: " + columnPageEncoderMeta.getStoreDataType()); } @Override public float getFloat(int rowId) { - throw new UnsupportedOperationException("invalid data type: " + dataType); + throw new UnsupportedOperationException( + "invalid data type: " + columnPageEncoderMeta.getStoreDataType()); } @Override public double getDouble(int rowId) { - throw new UnsupportedOperationException("invalid data type: " + dataType); + throw new UnsupportedOperationException( + "invalid data type: " + columnPageEncoderMeta.getStoreDataType()); } @Override public byte[] getBytePage() { - throw new UnsupportedOperationException("invalid data type: " + dataType); + throw new UnsupportedOperationException( + "invalid data type: " + columnPageEncoderMeta.getStoreDataType()); } @Override public short[] getShortPage() { - throw new UnsupportedOperationException("invalid data type: " + dataType); + throw new UnsupportedOperationException( + "invalid data type: " + columnPageEncoderMeta.getStoreDataType()); } @Override public byte[] getShortIntPage() { - throw new UnsupportedOperationException("invalid data type: " + dataType); + throw new UnsupportedOperationException( + "invalid data type: " + columnPageEncoderMeta.getStoreDataType()); } @Override public int[] getIntPage() { - throw new UnsupportedOperationException("invalid data type: " + dataType); + throw new UnsupportedOperationException( + "invalid data type: " + columnPageEncoderMeta.getStoreDataType()); } @Override public long[] getLongPage() { - throw new UnsupportedOperationException("invalid data type: " + dataType); + throw new UnsupportedOperationException( + "invalid data type: " + columnPageEncoderMeta.getStoreDataType()); } @Override public float[] getFloatPage() { - throw new UnsupportedOperationException("invalid data type: " + dataType); + throw new UnsupportedOperationException( + "invalid data type: " + columnPageEncoderMeta.getStoreDataType()); } @Override public double[] getDoublePage() { - throw new UnsupportedOperationException("invalid data type: " + dataType); + throw new UnsupportedOperationException( + "invalid data type: " + columnPageEncoderMeta.getStoreDataType()); } @Override @@ -445,7 +482,8 @@ public abstract class VarLengthColumnPageBase extends ColumnPage { @Override public void convertValue(ColumnPageValueConverter codec) { - throw new UnsupportedOperationException("invalid data type: " + dataType); + throw new UnsupportedOperationException( + "invalid data type: " + columnPageEncoderMeta.getStoreDataType()); } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java index b5a63f8..2ed12a0 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java @@ -84,7 +84,8 @@ public abstract class ColumnPageEncoder { } private void fillBasicFields(ColumnPage inputPage, DataChunk2 dataChunk) { - dataChunk.setChunk_meta(CarbonMetadataUtil.getSnappyChunkCompressionMeta()); + dataChunk.setChunk_meta( + CarbonMetadataUtil.getChunkCompressorMeta(inputPage.getColumnCompressorName())); dataChunk.setNumberOfRowsInpage(inputPage.getPageSize()); dataChunk.setRowMajor(false); } @@ -92,7 +93,8 @@ public abstract class ColumnPageEncoder { private void fillNullBitSet(ColumnPage inputPage, DataChunk2 dataChunk) { PresenceMeta presenceMeta = new PresenceMeta(); presenceMeta.setPresent_bit_streamIsSet(true); - Compressor compressor = CompressorFactory.getInstance().getCompressor(); + Compressor compressor = CompressorFactory.getInstance().getCompressor( + inputPage.getColumnCompressorName()); presenceMeta.setPresent_bit_stream( compressor.compressByte(inputPage.getNullBits().toByteArray())); dataChunk.setPresence(presenceMeta); http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java index 4e04186..971cf24 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java @@ -45,14 +45,15 @@ public class ColumnPageEncoderMeta extends ValueEncoderMeta implements Writable // storage data type of this column, it could be different from data type in the column spec private DataType storeDataType; - // compressor name for compressing and decompressing this column - private String compressorName; + // compressor name for compressing and decompressing this column. + // Make it protected for RLEEncoderMeta + protected String compressorName; public ColumnPageEncoderMeta() { } public ColumnPageEncoderMeta(TableSpec.ColumnSpec columnSpec, DataType storeDataType, - SimpleStatsResult stats, String compressorName) { + String compressorName) { if (columnSpec == null) { throw new IllegalArgumentException("columm spec must not be null"); } @@ -66,6 +67,11 @@ public class ColumnPageEncoderMeta extends ValueEncoderMeta implements Writable this.storeDataType = storeDataType; this.compressorName = compressorName; setType(DataType.convertType(storeDataType)); + } + + public ColumnPageEncoderMeta(TableSpec.ColumnSpec columnSpec, DataType storeDataType, + SimpleStatsResult stats, String compressorName) { + this(columnSpec, storeDataType, compressorName); if (stats != null) { setDecimal(stats.getDecimalCount()); setMaxValue(stats.getMax()); http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java index 1cc2ba8..29772d1 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java @@ -20,8 +20,6 @@ package org.apache.carbondata.core.datastore.page.encoding; import java.math.BigDecimal; import org.apache.carbondata.core.datastore.TableSpec; -import org.apache.carbondata.core.datastore.compression.Compressor; -import org.apache.carbondata.core.datastore.compression.CompressorFactory; import org.apache.carbondata.core.datastore.page.ColumnPage; import org.apache.carbondata.core.datastore.page.DecimalColumnPage; import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveDeltaFloatingCodec; @@ -73,39 +71,36 @@ public class DefaultEncodingFactory extends EncodingFactory { private ColumnPageEncoder createEncoderForDimension(TableSpec.DimensionSpec columnSpec, ColumnPage inputPage) { - Compressor compressor = CompressorFactory.getInstance().getCompressor(); switch (columnSpec.getColumnType()) { case GLOBAL_DICTIONARY: case DIRECT_DICTIONARY: case PLAIN_VALUE: return new DirectCompressCodec(inputPage.getDataType()).createEncoder(null); case COMPLEX: - return new ComplexDimensionIndexCodec(false, false, compressor).createEncoder(null); + return new ComplexDimensionIndexCodec(false, false).createEncoder(null); default: - throw new RuntimeException("unsupported dimension type: " + - columnSpec.getColumnType()); + throw new RuntimeException("unsupported dimension type: " + columnSpec.getColumnType()); } } private ColumnPageEncoder createEncoderForDimensionLegacy(TableSpec.DimensionSpec dimensionSpec) { - Compressor compressor = CompressorFactory.getInstance().getCompressor(); switch (dimensionSpec.getColumnType()) { case GLOBAL_DICTIONARY: return new DictDimensionIndexCodec( dimensionSpec.isInSortColumns(), - dimensionSpec.isInSortColumns() && dimensionSpec.isDoInvertedIndex(), - compressor).createEncoder(null); + dimensionSpec.isInSortColumns() && dimensionSpec.isDoInvertedIndex()) + .createEncoder(null); case DIRECT_DICTIONARY: return new DirectDictDimensionIndexCodec( dimensionSpec.isInSortColumns(), - dimensionSpec.isInSortColumns() && dimensionSpec.isDoInvertedIndex(), - compressor).createEncoder(null); + dimensionSpec.isInSortColumns() && dimensionSpec.isDoInvertedIndex()) + .createEncoder(null); case PLAIN_VALUE: return new HighCardDictDimensionIndexCodec( dimensionSpec.isInSortColumns(), dimensionSpec.isInSortColumns() && dimensionSpec.isDoInvertedIndex(), - dimensionSpec.getSchemaDataType() == DataTypes.VARCHAR, - compressor).createEncoder(null); + dimensionSpec.getSchemaDataType() == DataTypes.VARCHAR) + .createEncoder(null); default: throw new RuntimeException("unsupported dimension type: " + dimensionSpec.getColumnType()); http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java index 8bc67c0..d119c8f 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java @@ -64,8 +64,8 @@ public abstract class EncodingFactory { /** * Return new decoder based on encoder metadata read from file */ - public ColumnPageDecoder createDecoder(List<Encoding> encodings, List<ByteBuffer> encoderMetas) - throws IOException { + public ColumnPageDecoder createDecoder(List<Encoding> encodings, List<ByteBuffer> encoderMetas, + String compressor) throws IOException { assert (encodings.size() == 1); assert (encoderMetas.size() == 1); Encoding encoding = encodings.get(0); @@ -111,21 +111,20 @@ public abstract class EncodingFactory { } else { // for backward compatibility ValueEncoderMeta metadata = CarbonUtil.deserializeEncoderMetaV3(encoderMeta); - return createDecoderLegacy(metadata); + return createDecoderLegacy(metadata, compressor); } } /** * Old way of creating decoder, based on algorithm */ - public ColumnPageDecoder createDecoderLegacy(ValueEncoderMeta metadata) { + public ColumnPageDecoder createDecoderLegacy(ValueEncoderMeta metadata, String compressor) { if (null == metadata) { throw new RuntimeException("internal error"); } SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(metadata); TableSpec.ColumnSpec spec = TableSpec.ColumnSpec.newInstanceLegacy("legacy", stats.getDataType(), ColumnType.MEASURE); - String compressor = "snappy"; DataType dataType = DataType.getDataType(metadata.getType()); if (dataType == DataTypes.BYTE || dataType == DataTypes.SHORT || http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java index 0e8d1c0..bb928c2 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java @@ -67,16 +67,19 @@ public class AdaptiveDeltaFloatingCodec extends AdaptiveCodec { @Override public ColumnPageEncoder createEncoder(Map<String, String> parameter) { - final Compressor compressor = CompressorFactory.getInstance().getCompressor(); return new ColumnPageEncoder() { @Override protected byte[] encodeData(ColumnPage input) throws MemoryException, IOException { if (encodedPage != null) { throw new IllegalStateException("already encoded"); } - encodedPage = ColumnPage.newPage(input.getColumnSpec(), targetDataType, + encodedPage = ColumnPage.newPage( + new ColumnPageEncoderMeta(input.getColumnPageEncoderMeta().getColumnSpec(), + targetDataType, input.getColumnPageEncoderMeta().getCompressorName()), input.getPageSize()); input.convertValue(converter); + Compressor compressor = CompressorFactory.getInstance().getCompressor( + input.getColumnCompressorName()); byte[] result = encodedPage.compress(compressor); encodedPage.freeMemory(); return result; @@ -92,7 +95,7 @@ public class AdaptiveDeltaFloatingCodec extends AdaptiveCodec { @Override protected ColumnPageEncoderMeta getEncoderMeta(ColumnPage inputPage) { return new ColumnPageEncoderMeta(inputPage.getColumnSpec(), targetDataType, stats, - compressor.getName()); + inputPage.getColumnCompressorName()); } }; http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java index f20422c..ac9693d 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java @@ -78,16 +78,19 @@ public class AdaptiveDeltaIntegralCodec extends AdaptiveCodec { @Override public ColumnPageEncoder createEncoder(Map<String, String> parameter) { return new ColumnPageEncoder() { - final Compressor compressor = CompressorFactory.getInstance().getCompressor(); @Override protected byte[] encodeData(ColumnPage input) throws MemoryException, IOException { if (encodedPage != null) { throw new IllegalStateException("already encoded"); } - encodedPage = ColumnPage.newPage(input.getColumnSpec(), targetDataType, + encodedPage = ColumnPage.newPage( + new ColumnPageEncoderMeta(input.getColumnPageEncoderMeta().getColumnSpec(), + targetDataType, input.getColumnPageEncoderMeta().getCompressorName()), input.getPageSize()); input.convertValue(converter); + Compressor compressor = CompressorFactory.getInstance().getCompressor( + input.getColumnCompressorName()); byte[] result = encodedPage.compress(compressor); encodedPage.freeMemory(); return result; @@ -96,7 +99,7 @@ public class AdaptiveDeltaIntegralCodec extends AdaptiveCodec { @Override protected ColumnPageEncoderMeta getEncoderMeta(ColumnPage inputPage) { return new ColumnPageEncoderMeta(inputPage.getColumnSpec(), targetDataType, - inputPage.getStatistics(), compressor.getName()); + inputPage.getStatistics(), inputPage.getColumnCompressorName()); } @Override http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java index 6d7697b..028fa71 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java @@ -59,15 +59,18 @@ public class AdaptiveFloatingCodec extends AdaptiveCodec { @Override public ColumnPageEncoder createEncoder(Map<String, String> parameter) { - final Compressor compressor = CompressorFactory.getInstance().getCompressor(); return new ColumnPageEncoder() { @Override protected byte[] encodeData(ColumnPage input) throws MemoryException, IOException { if (encodedPage != null) { throw new IllegalStateException("already encoded"); } - encodedPage = ColumnPage.newPage(input.getColumnSpec(), targetDataType, + encodedPage = ColumnPage.newPage( + new ColumnPageEncoderMeta(input.getColumnPageEncoderMeta().getColumnSpec(), + targetDataType, input.getColumnPageEncoderMeta().getCompressorName()), input.getPageSize()); + Compressor compressor = CompressorFactory.getInstance().getCompressor( + input.getColumnCompressorName()); input.convertValue(converter); byte[] result = encodedPage.compress(compressor); encodedPage.freeMemory(); @@ -84,7 +87,7 @@ public class AdaptiveFloatingCodec extends AdaptiveCodec { @Override protected ColumnPageEncoderMeta getEncoderMeta(ColumnPage inputPage) { return new ColumnPageEncoderMeta(inputPage.getColumnSpec(), targetDataType, stats, - compressor.getName()); + inputPage.getColumnCompressorName()); } }; http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java index cfc26c7..a9cf742 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java @@ -56,15 +56,18 @@ public class AdaptiveIntegralCodec extends AdaptiveCodec { @Override public ColumnPageEncoder createEncoder(Map<String, String> parameter) { - final Compressor compressor = CompressorFactory.getInstance().getCompressor(); return new ColumnPageEncoder() { @Override protected byte[] encodeData(ColumnPage input) throws MemoryException, IOException { if (encodedPage != null) { throw new IllegalStateException("already encoded"); } - encodedPage = ColumnPage.newPage(input.getColumnSpec(), targetDataType, + encodedPage = ColumnPage.newPage( + new ColumnPageEncoderMeta(input.getColumnPageEncoderMeta().getColumnSpec(), + targetDataType, input.getColumnPageEncoderMeta().getCompressorName()), input.getPageSize()); + Compressor compressor = CompressorFactory.getInstance().getCompressor( + input.getColumnCompressorName()); input.convertValue(converter); byte[] result = encodedPage.compress(compressor); encodedPage.freeMemory(); @@ -81,7 +84,7 @@ public class AdaptiveIntegralCodec extends AdaptiveCodec { @Override protected ColumnPageEncoderMeta getEncoderMeta(ColumnPage inputPage) { return new ColumnPageEncoderMeta(inputPage.getColumnSpec(), targetDataType, stats, - compressor.getName()); + inputPage.getColumnCompressorName()); } }; http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java index 7e1e9dd..aa03ec1 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java @@ -22,7 +22,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; -import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.compression.Compressor; import org.apache.carbondata.core.datastore.compression.CompressorFactory; import org.apache.carbondata.core.datastore.page.ColumnPage; @@ -55,69 +54,53 @@ public class DirectCompressCodec implements ColumnPageCodec { @Override public ColumnPageEncoder createEncoder(Map<String, String> parameter) { - // TODO: make compressor configurable in create table - return new DirectCompressor(CarbonCommonConstants.DEFAULT_COMPRESSOR); - } - - @Override - public ColumnPageDecoder createDecoder(ColumnPageEncoderMeta meta) { - return new DirectDecompressor(meta); - } - - private class DirectCompressor extends ColumnPageEncoder { - - private Compressor compressor; - - DirectCompressor(String compressorName) { - this.compressor = CompressorFactory.getInstance().getCompressor(compressorName); - } - - @Override - protected byte[] encodeData(ColumnPage input) throws MemoryException, IOException { - return input.compress(compressor); - } + return new ColumnPageEncoder() { - @Override - protected List<Encoding> getEncodingList() { - List<Encoding> encodings = new ArrayList<>(); - encodings.add(dataType == DataTypes.VARCHAR ? - Encoding.DIRECT_COMPRESS_VARCHAR : - Encoding.DIRECT_COMPRESS); - return encodings; - } + @Override + protected byte[] encodeData(ColumnPage input) throws MemoryException, IOException { + Compressor compressor = CompressorFactory.getInstance().getCompressor( + input.getColumnCompressorName()); + return input.compress(compressor); + } - @Override - protected ColumnPageEncoderMeta getEncoderMeta(ColumnPage inputPage) { - return new ColumnPageEncoderMeta(inputPage.getColumnSpec(), inputPage.getDataType(), - inputPage.getStatistics(), compressor.getName()); - } + @Override + protected List<Encoding> getEncodingList() { + List<Encoding> encodings = new ArrayList<>(); + encodings.add(dataType == DataTypes.VARCHAR ? + Encoding.DIRECT_COMPRESS_VARCHAR : + Encoding.DIRECT_COMPRESS); + return encodings; + } + @Override + protected ColumnPageEncoderMeta getEncoderMeta(ColumnPage inputPage) { + return new ColumnPageEncoderMeta(inputPage.getColumnSpec(), inputPage.getDataType(), + inputPage.getStatistics(), inputPage.getColumnCompressorName()); + } + }; } - private class DirectDecompressor implements ColumnPageDecoder { - - private ColumnPageEncoderMeta meta; - - DirectDecompressor(ColumnPageEncoderMeta meta) { - this.meta = meta; - } - - @Override - public ColumnPage decode(byte[] input, int offset, int length) throws MemoryException { - ColumnPage decodedPage; - if (DataTypes.isDecimal(dataType)) { - decodedPage = ColumnPage.decompressDecimalPage(meta, input, offset, length); - } else { - decodedPage = ColumnPage.decompress(meta, input, offset, length, false); + @Override + public ColumnPageDecoder createDecoder(final ColumnPageEncoderMeta meta) { + return new ColumnPageDecoder() { + + @Override + public ColumnPage decode(byte[] input, int offset, int length) throws MemoryException { + ColumnPage decodedPage; + if (DataTypes.isDecimal(dataType)) { + decodedPage = ColumnPage.decompressDecimalPage(meta, input, offset, length); + } else { + decodedPage = ColumnPage.decompress(meta, input, offset, length, false); + } + return LazyColumnPage.newPage(decodedPage, converter); } - return LazyColumnPage.newPage(decodedPage, converter); - } - @Override public ColumnPage decode(byte[] input, int offset, int length, boolean isLVEncoded) + @Override public ColumnPage decode(byte[] input, int offset, int length, boolean isLVEncoded) throws MemoryException, IOException { - return LazyColumnPage - .newPage(ColumnPage.decompress(meta, input, offset, length, isLVEncoded), converter); - } + return LazyColumnPage.newPage( + ColumnPage.decompress(meta, input, offset, length, isLVEncoded), converter); + } + }; } private ColumnPageValueConverter converter = new ColumnPageValueConverter() { http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/ComplexDimensionIndexCodec.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/ComplexDimensionIndexCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/ComplexDimensionIndexCodec.java index e37b8f6..cc044cc 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/ComplexDimensionIndexCodec.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/ComplexDimensionIndexCodec.java @@ -24,6 +24,7 @@ import java.util.Map; import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForShort; import org.apache.carbondata.core.datastore.columnar.IndexStorage; import org.apache.carbondata.core.datastore.compression.Compressor; +import org.apache.carbondata.core.datastore.compression.CompressorFactory; import org.apache.carbondata.core.datastore.page.ColumnPage; import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder; import org.apache.carbondata.core.util.ByteUtil; @@ -31,9 +32,8 @@ import org.apache.carbondata.format.Encoding; public class ComplexDimensionIndexCodec extends IndexStorageCodec { - public ComplexDimensionIndexCodec(boolean isSort, boolean isInvertedIndex, - Compressor compressor) { - super(isSort, isInvertedIndex, compressor); + public ComplexDimensionIndexCodec(boolean isSort, boolean isInvertedIndex) { + super(isSort, isInvertedIndex); } @Override @@ -49,6 +49,8 @@ public class ComplexDimensionIndexCodec extends IndexStorageCodec { IndexStorage indexStorage = new BlockIndexerStorageForShort(inputPage.getByteArrayPage(), false, false, false); byte[] flattened = ByteUtil.flatten(indexStorage.getDataPage()); + Compressor compressor = CompressorFactory.getInstance().getCompressor( + inputPage.getColumnCompressorName()); byte[] compressed = compressor.compressByte(flattened); super.indexStorage = indexStorage; super.compressedDataPage = compressed; http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DictDimensionIndexCodec.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DictDimensionIndexCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DictDimensionIndexCodec.java index d157654..66f5f1d 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DictDimensionIndexCodec.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DictDimensionIndexCodec.java @@ -25,6 +25,7 @@ import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInv import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForShort; import org.apache.carbondata.core.datastore.columnar.IndexStorage; import org.apache.carbondata.core.datastore.compression.Compressor; +import org.apache.carbondata.core.datastore.compression.CompressorFactory; import org.apache.carbondata.core.datastore.page.ColumnPage; import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder; import org.apache.carbondata.core.util.ByteUtil; @@ -32,8 +33,8 @@ import org.apache.carbondata.format.Encoding; public class DictDimensionIndexCodec extends IndexStorageCodec { - public DictDimensionIndexCodec(boolean isSort, boolean isInvertedIndex, Compressor compressor) { - super(isSort, isInvertedIndex, compressor); + public DictDimensionIndexCodec(boolean isSort, boolean isInvertedIndex) { + super(isSort, isInvertedIndex); } @Override @@ -54,6 +55,8 @@ public class DictDimensionIndexCodec extends IndexStorageCodec { indexStorage = new BlockIndexerStorageForNoInvertedIndexForShort(data, false); } byte[] flattened = ByteUtil.flatten(indexStorage.getDataPage()); + Compressor compressor = CompressorFactory.getInstance().getCompressor( + inputPage.getColumnCompressorName()); super.compressedDataPage = compressor.compressByte(flattened); super.indexStorage = indexStorage; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DirectDictDimensionIndexCodec.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DirectDictDimensionIndexCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DirectDictDimensionIndexCodec.java index 1e5015b..a130cbd 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DirectDictDimensionIndexCodec.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DirectDictDimensionIndexCodec.java @@ -25,6 +25,7 @@ import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInv import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForShort; import org.apache.carbondata.core.datastore.columnar.IndexStorage; import org.apache.carbondata.core.datastore.compression.Compressor; +import org.apache.carbondata.core.datastore.compression.CompressorFactory; import org.apache.carbondata.core.datastore.page.ColumnPage; import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder; import org.apache.carbondata.core.util.ByteUtil; @@ -32,9 +33,8 @@ import org.apache.carbondata.format.Encoding; public class DirectDictDimensionIndexCodec extends IndexStorageCodec { - public DirectDictDimensionIndexCodec(boolean isSort, boolean isInvertedIndex, - Compressor compressor) { - super(isSort, isInvertedIndex, compressor); + public DirectDictDimensionIndexCodec(boolean isSort, boolean isInvertedIndex) { + super(isSort, isInvertedIndex); } @Override @@ -55,6 +55,8 @@ public class DirectDictDimensionIndexCodec extends IndexStorageCodec { indexStorage = new BlockIndexerStorageForNoInvertedIndexForShort(data, false); } byte[] flattened = ByteUtil.flatten(indexStorage.getDataPage()); + Compressor compressor = CompressorFactory.getInstance().getCompressor( + inputPage.getColumnCompressorName()); super.compressedDataPage = compressor.compressByte(flattened); super.indexStorage = indexStorage; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/HighCardDictDimensionIndexCodec.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/HighCardDictDimensionIndexCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/HighCardDictDimensionIndexCodec.java index f9c124f..bce8523 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/HighCardDictDimensionIndexCodec.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/HighCardDictDimensionIndexCodec.java @@ -25,6 +25,7 @@ import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInv import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForShort; import org.apache.carbondata.core.datastore.columnar.IndexStorage; import org.apache.carbondata.core.datastore.compression.Compressor; +import org.apache.carbondata.core.datastore.compression.CompressorFactory; import org.apache.carbondata.core.datastore.page.ColumnPage; import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder; import org.apache.carbondata.core.util.ByteUtil; @@ -37,8 +38,8 @@ public class HighCardDictDimensionIndexCodec extends IndexStorageCodec { private boolean isVarcharType; public HighCardDictDimensionIndexCodec(boolean isSort, boolean isInvertedIndex, - boolean isVarcharType, Compressor compressor) { - super(isSort, isInvertedIndex, compressor); + boolean isVarcharType) { + super(isSort, isInvertedIndex); this.isVarcharType = isVarcharType; } @@ -63,6 +64,8 @@ public class HighCardDictDimensionIndexCodec extends IndexStorageCodec { new BlockIndexerStorageForNoInvertedIndexForShort(data, isDictionary); } byte[] flattened = ByteUtil.flatten(indexStorage.getDataPage()); + Compressor compressor = CompressorFactory.getInstance().getCompressor( + input.getColumnCompressorName()); super.compressedDataPage = compressor.compressByte(flattened); super.indexStorage = indexStorage; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/IndexStorageCodec.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/IndexStorageCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/IndexStorageCodec.java index cb6b387..13a9215 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/IndexStorageCodec.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/IndexStorageCodec.java @@ -17,20 +17,17 @@ package org.apache.carbondata.core.datastore.page.encoding.dimension.legacy; -import org.apache.carbondata.core.datastore.compression.Compressor; import org.apache.carbondata.core.datastore.page.encoding.ColumnPageCodec; import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder; import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta; public abstract class IndexStorageCodec implements ColumnPageCodec { - protected Compressor compressor; protected boolean isSort; protected boolean isInvertedIndex; - IndexStorageCodec(boolean isSort, boolean isInvertedIndex, Compressor compressor) { + IndexStorageCodec(boolean isSort, boolean isInvertedIndex) { this.isSort = isSort; this.isInvertedIndex = isInvertedIndex; - this.compressor = compressor; } @Override http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java index fa03809..e7d4118 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java @@ -66,7 +66,7 @@ public class RLECodec implements ColumnPageCodec { public ColumnPageDecoder createDecoder(ColumnPageEncoderMeta meta) { assert meta instanceof RLEEncoderMeta; RLEEncoderMeta codecMeta = (RLEEncoderMeta) meta; - return new RLEDecoder(meta.getColumnSpec(), codecMeta.getPageSize()); + return new RLEDecoder(meta.getColumnSpec(), codecMeta.getPageSize(), meta.getCompressorName()); } // This codec supports integral type only @@ -151,7 +151,10 @@ public class RLECodec implements ColumnPageCodec { @Override protected ColumnPageEncoderMeta getEncoderMeta(ColumnPage inputPage) { return new RLEEncoderMeta(inputPage.getColumnSpec(), - inputPage.getDataType(), inputPage.getPageSize(), inputPage.getStatistics()); + inputPage.getDataType(), + inputPage.getPageSize(), + inputPage.getStatistics(), + inputPage.getColumnCompressorName()); } private void putValue(Object value) throws IOException { @@ -281,11 +284,13 @@ public class RLECodec implements ColumnPageCodec { private TableSpec.ColumnSpec columnSpec; private int pageSize; + private String compressorName; - private RLEDecoder(TableSpec.ColumnSpec columnSpec, int pageSize) { + private RLEDecoder(TableSpec.ColumnSpec columnSpec, int pageSize, String compressorName) { validateDataType(columnSpec.getSchemaDataType()); this.columnSpec = columnSpec; this.pageSize = pageSize; + this.compressorName = compressorName; } @Override @@ -293,7 +298,8 @@ public class RLECodec implements ColumnPageCodec { throws MemoryException, IOException { DataType dataType = columnSpec.getSchemaDataType(); DataInputStream in = new DataInputStream(new ByteArrayInputStream(input, offset, length)); - ColumnPage resultPage = ColumnPage.newPage(columnSpec, dataType, pageSize); + ColumnPage resultPage = ColumnPage.newPage( + new ColumnPageEncoderMeta(columnSpec, dataType, compressorName), pageSize); if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.BYTE) { decodeBytePage(in, resultPage); } else if (dataType == DataTypes.SHORT) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f08c4ab/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLEEncoderMeta.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLEEncoderMeta.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLEEncoderMeta.java index 8871671..25533f8 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLEEncoderMeta.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLEEncoderMeta.java @@ -39,8 +39,8 @@ public class RLEEncoderMeta extends ColumnPageEncoderMeta implements Writable { } public RLEEncoderMeta(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize, - SimpleStatsResult stats) { - super(columnSpec, dataType, stats, ""); + SimpleStatsResult stats, String compressorName) { + super(columnSpec, dataType, stats, compressorName); this.pageSize = pageSize; }
