http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategy.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategy.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategy.java index b122615..79c8101 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategy.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategy.java @@ -17,11 +17,32 @@ package org.apache.carbondata.core.datastore.page.encoding; +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; + import org.apache.carbondata.core.datastore.TableSpec; +import org.apache.carbondata.core.datastore.page.ColumnPage; +import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveDeltaIntegralCodec; +import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveDeltaIntegralEncoderMeta; +import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveIntegralCodec; +import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveIntegralEncoderMeta; +import org.apache.carbondata.core.datastore.page.encoding.compress.DirectCompressCodec; +import org.apache.carbondata.core.datastore.page.encoding.compress.DirectCompressorEncoderMeta; +import org.apache.carbondata.core.datastore.page.encoding.rle.RLECodec; +import org.apache.carbondata.core.datastore.page.encoding.rle.RLEEncoderMeta; import org.apache.carbondata.core.datastore.page.statistics.PrimitivePageStatsCollector; import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult; -import org.apache.carbondata.core.metadata.ColumnPageCodecMeta; import 
org.apache.carbondata.core.metadata.ValueEncoderMeta; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.format.Encoding; + +import static org.apache.carbondata.format.Encoding.ADAPTIVE_DELTA_INTEGRAL; +import static org.apache.carbondata.format.Encoding.ADAPTIVE_INTEGRAL; +import static org.apache.carbondata.format.Encoding.DIRECT_COMPRESS; +import static org.apache.carbondata.format.Encoding.RLE_INTEGRAL; /** * Base class for encoding strategy implementation. @@ -29,87 +50,69 @@ import org.apache.carbondata.core.metadata.ValueEncoderMeta; public abstract class EncodingStrategy { /** - * create codec based on the page data type and statistics + * Return new encoder for specified column + */ + public abstract ColumnPageEncoder createEncoder(TableSpec.ColumnSpec columnSpec, + ColumnPage inputPage); + + /** + * Return new decoder based on encoder metadata read from file + */ + public ColumnPageDecoder createDecoder(List<Encoding> encodings, List<ByteBuffer> encoderMetas) + throws IOException { + assert (encodings.size() == 1); + assert (encoderMetas.size() == 1); + Encoding encoding = encodings.get(0); + byte[] encoderMeta = encoderMetas.get(0).array(); + ByteArrayInputStream stream = new ByteArrayInputStream(encoderMeta); + DataInputStream in = new DataInputStream(stream); + if (encoding == DIRECT_COMPRESS) { + DirectCompressorEncoderMeta metadata = new DirectCompressorEncoderMeta(); + metadata.readFields(in); + return new DirectCompressCodec(metadata.getDataType()).createDecoder(metadata); + } else if (encoding == ADAPTIVE_INTEGRAL) { + AdaptiveIntegralEncoderMeta metadata = new AdaptiveIntegralEncoderMeta(); + metadata.readFields(in); + SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(metadata); + return new AdaptiveIntegralCodec(metadata.getDataType(), metadata.getTargetDataType(), + stats).createDecoder(metadata); + } else if (encoding == ADAPTIVE_DELTA_INTEGRAL) { + AdaptiveDeltaIntegralEncoderMeta metadata = new 
AdaptiveDeltaIntegralEncoderMeta(); + metadata.readFields(in); + SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(metadata); + return new AdaptiveDeltaIntegralCodec(metadata.getDataType(), metadata.getTargetDataType(), + stats).createDecoder(metadata); + } else if (encoding == RLE_INTEGRAL) { + RLEEncoderMeta metadata = new RLEEncoderMeta(); + metadata.readFields(in); + return new RLECodec().createDecoder(metadata); + } else { + // for backward compatibility + ValueEncoderMeta metadata = CarbonUtil.deserializeEncoderMetaV3(encoderMeta); + return createDecoderLegacy(metadata); + } + } + + /** + * Old way of creating decoder, based on algorithm */ - public ColumnPageCodec newCodec(SimpleStatsResult stats) { - switch (stats.getDataType()) { + public ColumnPageDecoder createDecoderLegacy(ValueEncoderMeta metadata) { + SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(metadata); + switch (metadata.getType()) { case BYTE: case SHORT: case INT: case LONG: - return newCodecForIntegralType(stats); + return DefaultEncodingStrategy.selectCodecByAlgorithm(stats).createDecoder(null); case FLOAT: case DOUBLE: - return newCodecForFloatingType(stats); case DECIMAL: - return newCodecForDecimalType(stats); case BYTE_ARRAY: // no dictionary dimension - return newCodecForByteArrayType(stats); + return new DirectCompressCodec(stats.getDataType()).createDecoder( + new DirectCompressorEncoderMeta("snappy", stats.getDataType(), stats)); default: throw new RuntimeException("unsupported data type: " + stats.getDataType()); } } - - /** - * create codec based on the page data type and statistics contained by ValueEncoderMeta - */ - public ColumnPageCodec newCodec(ValueEncoderMeta meta) { - if (meta instanceof ColumnPageCodecMeta) { - ColumnPageCodecMeta codecMeta = (ColumnPageCodecMeta) meta; - SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(codecMeta); - switch (codecMeta.getSrcDataType()) { - case BYTE: - case SHORT: - case INT: - case 
LONG: - return newCodecForIntegralType(stats); - case FLOAT: - case DOUBLE: - return newCodecForFloatingType(stats); - case DECIMAL: - return newCodecForDecimalType(stats); - case BYTE_ARRAY: - // no dictionary dimension - return newCodecForByteArrayType(stats); - default: - throw new RuntimeException("unsupported data type: " + stats.getDataType()); - } - } else { - SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(meta); - switch (meta.getType()) { - case BYTE: - case SHORT: - case INT: - case LONG: - return newCodecForIntegralType(stats); - case FLOAT: - case DOUBLE: - return newCodecForFloatingType(stats); - case DECIMAL: - return newCodecForDecimalType(stats); - case BYTE_ARRAY: - // no dictionary dimension - return newCodecForByteArrayType(stats); - default: - throw new RuntimeException("unsupported data type: " + stats.getDataType()); - } - } - } - - // for byte, short, int, long - abstract ColumnPageCodec newCodecForIntegralType(SimpleStatsResult stats); - - // for float, double - abstract ColumnPageCodec newCodecForFloatingType(SimpleStatsResult stats); - - // for decimal - abstract ColumnPageCodec newCodecForDecimalType(SimpleStatsResult stats); - - // for byte array - abstract ColumnPageCodec newCodecForByteArrayType(SimpleStatsResult stats); - - // for dimension column - public abstract ColumnPageCodec newCodec(TableSpec.DimensionSpec dimensionSpec); - }
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategyFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategyFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategyFactory.java new file mode 100644 index 0000000..56527cb --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategyFactory.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.datastore.page.encoding; + +/** + * Factory to create Encoding Strategy. + * Now only a default strategy is supported which will choose encoding based on data type + * and column data stats. 
+ */ +public class EncodingStrategyFactory { + + private static EncodingStrategy defaultStrategy = new DefaultEncodingStrategy(); + + public static EncodingStrategy getStrategy() { + // TODO: make it configurable after added new strategy + return defaultStrategy; + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/HighCardDictDimensionIndexCodec.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/HighCardDictDimensionIndexCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/HighCardDictDimensionIndexCodec.java deleted file mode 100644 index c1620c6..0000000 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/HighCardDictDimensionIndexCodec.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.carbondata.core.datastore.page.encoding; - -import org.apache.carbondata.core.datastore.DimensionType; -import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForInt; -import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInvertedIndexForInt; -import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInvertedIndexForShort; -import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForShort; -import org.apache.carbondata.core.datastore.columnar.IndexStorage; -import org.apache.carbondata.core.datastore.compression.Compressor; -import org.apache.carbondata.core.datastore.page.ColumnPage; -import org.apache.carbondata.core.memory.MemoryException; -import org.apache.carbondata.core.metadata.ColumnarFormatVersion; -import org.apache.carbondata.core.util.ByteUtil; - -public class HighCardDictDimensionIndexCodec extends IndexStorageCodec { - - HighCardDictDimensionIndexCodec(boolean isSort, boolean isInvertedIndex, Compressor compressor) { - super(isSort, isInvertedIndex, compressor); - } - - @Override - public String getName() { - return "HighCardDictDimensionIndexCodec"; - } - - @Override - public EncodedColumnPage encode(ColumnPage input) throws MemoryException { - IndexStorage indexStorage; - byte[][] data = input.getByteArrayPage(); - if (isInvertedIndex) { - if (version == ColumnarFormatVersion.V3) { - indexStorage = new BlockIndexerStorageForShort(data, false, true, isSort); - } else { - indexStorage = new BlockIndexerStorageForInt(data, false, true, isSort); - } - } else { - if (version == ColumnarFormatVersion.V3) { - indexStorage = new BlockIndexerStorageForNoInvertedIndexForShort(data, true); - } else { - indexStorage = new BlockIndexerStorageForNoInvertedIndexForInt(data); - } - } - byte[] flattened = ByteUtil.flatten(indexStorage.getDataPage()); - byte[] compressed = compressor.compressByte(flattened); - return new EncodedDimensionPage(input.getPageSize(), 
compressed, indexStorage, - DimensionType.PLAIN_VALUE); - } - -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/IndexStorageCodec.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/IndexStorageCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/IndexStorageCodec.java deleted file mode 100644 index 3122b15..0000000 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/IndexStorageCodec.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.carbondata.core.datastore.page.encoding; - -import org.apache.carbondata.core.datastore.compression.Compressor; -import org.apache.carbondata.core.datastore.page.ColumnPage; -import org.apache.carbondata.core.datastore.page.ComplexColumnPage; -import org.apache.carbondata.core.memory.MemoryException; -import org.apache.carbondata.core.metadata.ColumnarFormatVersion; -import org.apache.carbondata.core.util.CarbonProperties; - -public abstract class IndexStorageCodec implements ColumnPageCodec { - protected ColumnarFormatVersion version = CarbonProperties.getInstance().getFormatVersion(); - protected Compressor compressor; - protected boolean isSort; - protected boolean isInvertedIndex; - - IndexStorageCodec(boolean isSort, boolean isInvertedIndex, Compressor compressor) { - this.isSort = isSort; - this.isInvertedIndex = isInvertedIndex; - this.compressor = compressor; - } - - @Override - public EncodedColumnPage[] encodeComplexColumn(ComplexColumnPage input) { - throw new UnsupportedOperationException("internal error"); - } - - @Override - public ColumnPage decode(byte[] input, int offset, int length) throws MemoryException { - throw new UnsupportedOperationException("internal error"); - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/RLECodec.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/RLECodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/RLECodec.java deleted file mode 100644 index dda89e0..0000000 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/RLECodec.java +++ /dev/null @@ -1,419 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.core.datastore.page.encoding; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import org.apache.carbondata.core.datastore.page.ColumnPage; -import org.apache.carbondata.core.datastore.page.ComplexColumnPage; -import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult; -import org.apache.carbondata.core.memory.MemoryException; -import org.apache.carbondata.core.metadata.CodecMetaFactory; -import org.apache.carbondata.core.metadata.datatype.DataType; - -/** - * RLE encoding implementation for integral column page. - * This encoding keeps track of repeated-run and non-repeated-run, and make use - * of the highest bit of the length field to indicate the type of run. - * The length field is encoded as 16 bits value. 
(Page size must be less than 65535 rows) - * - * For example: input data {5, 5, 1, 2, 3, 3, 3, 3, 3} will be encoded to - * {0x00, 0x02, 0x05, (repeated-run, 2 values of 5) - * 0x80, 0x03, 0x01, 0x02, 0x03, (non-repeated-run, 3 values: 1, 2, 3) - * 0x00, 0x04, 0x03} (repeated-run, 4 values of 3) - */ -public class RLECodec implements ColumnPageCodec { - - enum RUN_STATE { INIT, START, REPEATED_RUN, NONREPEATED_RUN } - - private DataType dataType; - private int pageSize; - - /** - * New RLECodec - * @param dataType data type of the raw column page before encode - * @param pageSize page size of the raw column page before encode - */ - RLECodec(DataType dataType, int pageSize) { - this.dataType = dataType; - this.pageSize = pageSize; - } - - @Override - public String getName() { - return "RLECodec"; - } - - @Override - public EncodedColumnPage encode(ColumnPage input) throws MemoryException, IOException { - Encoder encoder = new Encoder(); - return encoder.encode(input); - } - - @Override - public EncodedColumnPage[] encodeComplexColumn(ComplexColumnPage input) { - throw new UnsupportedOperationException("complex column does not support RLE encoding"); - } - - @Override - public ColumnPage decode(byte[] input, int offset, int length) throws MemoryException, - IOException { - Decoder decoder = new Decoder(dataType, pageSize); - return decoder.decode(input, offset, length); - } - - // This codec supports integral type only - private void validateDataType(DataType dataType) { - switch (dataType) { - case BYTE: - case SHORT: - case INT: - case LONG: - break; - default: - throw new UnsupportedOperationException(dataType + " is not supported for RLE"); - } - } - - private class Encoder { - // While encoding RLE, this class internally work as a state machine - // INIT state is the initial state before any value comes - // START state is the start for each run - // REPEATED_RUN state means it is collecting repeated values (`lastValue`) - // NONREPEATED_RUN state means it is 
collecting non-repeated values (`nonRepeatValues`) - private RUN_STATE runState; - - // count for each run, either REPEATED_RUN or NONREPEATED_RUN - private short valueCount; - - // collected value for REPEATED_RUN - private Object lastValue; - - // collected value for NONREPEATED_RUN - private List<Object> nonRepeatValues; - - // data type of input page - private DataType dataType; - - // output stream for encoded data - private ByteArrayOutputStream bao; - private DataOutputStream stream; - - private Encoder() { - this.runState = RUN_STATE.INIT; - this.valueCount = 0; - this.nonRepeatValues = new ArrayList<>(); - this.bao = new ByteArrayOutputStream(); - this.stream = new DataOutputStream(bao); - } - - private EncodedColumnPage encode(ColumnPage input) throws MemoryException, IOException { - validateDataType(input.getDataType()); - this.dataType = input.getDataType(); - switch (dataType) { - case BYTE: - byte[] bytePage = input.getBytePage(); - for (int i = 0; i < bytePage.length; i++) { - putValue(bytePage[i]); - } - break; - case SHORT: - short[] shortPage = input.getShortPage(); - for (int i = 0; i < shortPage.length; i++) { - putValue(shortPage[i]); - } - break; - case INT: - int[] intPage = input.getIntPage(); - for (int i = 0; i < intPage.length; i++) { - putValue(intPage[i]); - } - break; - case LONG: - long[] longPage = input.getLongPage(); - for (int i = 0; i < longPage.length; i++) { - putValue(longPage[i]); - } - break; - default: - throw new UnsupportedOperationException(input.getDataType() + - " does not support RLE encoding"); - } - byte[] encoded = collectResult(); - SimpleStatsResult stats = (SimpleStatsResult) input.getStatistics(); - return new EncodedMeasurePage( - input.getPageSize(), - encoded, - CodecMetaFactory.createMeta(stats, input.getDataType()), - stats.getNullBits()); - } - - private void putValue(Object value) throws IOException { - if (runState == RUN_STATE.INIT) { - startNewRun(value); - } else { - if (lastValue.equals(value)) { - 
putRepeatValue(value); - } else { - putNonRepeatValue(value); - } - } - } - - // when last row is reached, write out all collected data - private byte[] collectResult() throws IOException { - switch (runState) { - case REPEATED_RUN: - writeRunLength(valueCount); - writeRunValue(lastValue); - break; - case NONREPEATED_RUN: - writeRunLength(valueCount | 0x8000); - for (int i = 0; i < valueCount; i++) { - writeRunValue(nonRepeatValues.get(i)); - } - break; - default: - assert (runState == RUN_STATE.START); - writeRunLength(1); - writeRunValue(lastValue); - } - return bao.toByteArray(); - } - - private void writeRunLength(int length) throws IOException { - stream.writeShort(length); - } - - private void writeRunValue(Object value) throws IOException { - switch (dataType) { - case BYTE: - stream.writeByte((byte) value); - break; - case SHORT: - stream.writeShort((short) value); - break; - case INT: - stream.writeInt((int) value); - break; - case LONG: - stream.writeLong((long) value); - break; - default: - throw new RuntimeException("internal error"); - } - } - - // for each run, call this to initialize the state and clear the collected data - private void startNewRun(Object value) { - runState = RUN_STATE.START; - valueCount = 1; - lastValue = value; - nonRepeatValues.clear(); - nonRepeatValues.add(value); - } - - // non-repeated run ends, put the collected data to result page - private void encodeNonRepeatedRun() throws IOException { - // put the value count (highest bit is 1) and all collected values - writeRunLength(valueCount | 0x8000); - for (int i = 0; i < valueCount; i++) { - writeRunValue(nonRepeatValues.get(i)); - } - } - - // repeated run ends, put repeated value to result page - private void encodeRepeatedRun() throws IOException { - // put the value count (highest bit is 0) and repeated value - writeRunLength(valueCount); - writeRunValue(lastValue); - } - - private void putRepeatValue(Object value) throws IOException { - switch (runState) { - case 
REPEATED_RUN: - valueCount++; - break; - case NONREPEATED_RUN: - // non-repeated run ends, encode this run - encodeNonRepeatedRun(); - startNewRun(value); - break; - default: - assert (runState == RUN_STATE.START); - // enter repeated run - runState = RUN_STATE.REPEATED_RUN; - valueCount++; - break; - } - } - - private void putNonRepeatValue(Object value) throws IOException { - switch (runState) { - case NONREPEATED_RUN: - // collect the non-repeated value - nonRepeatValues.add(value); - lastValue = value; - valueCount++; - break; - case REPEATED_RUN: - // repeated-run ends, encode this run - encodeRepeatedRun(); - startNewRun(value); - break; - default: - assert (runState == RUN_STATE.START); - // enter non-repeated run - runState = RUN_STATE.NONREPEATED_RUN; - nonRepeatValues.add(value); - lastValue = value; - valueCount++; - break; - } - } - - } - - // It decodes data in one shot. It is suitable for scan query - // TODO: add a on-the-fly decoder for filter query with high selectivity - private class Decoder { - - // src data type - private DataType dataType; - private int pageSize; - - private Decoder(DataType dataType, int pageSize) throws MemoryException { - validateDataType(dataType); - this.dataType = dataType; - this.pageSize = pageSize; - } - - private ColumnPage decode(byte[] input, int offset, int length) - throws MemoryException, IOException { - DataInputStream in = new DataInputStream(new ByteArrayInputStream(input, offset, length)); - ColumnPage resultPage = ColumnPage.newPage(dataType, pageSize); - switch (dataType) { - case BYTE: - decodeBytePage(in, resultPage); - break; - case SHORT: - decodeShortPage(in, resultPage); - break; - case INT: - decodeIntPage(in, resultPage); - break; - case LONG: - decodeLongPage(in, resultPage); - break; - default: - throw new RuntimeException("unsupported datatype:" + dataType); - } - return resultPage; - } - - private void decodeBytePage(DataInputStream in, ColumnPage decodedPage) - throws IOException { - int rowId 
= 0; - do { - int runLength = in.readShort(); - int count = runLength & 0x7FFF; - if (runLength < 0) { - // non-repeated run - for (int i = 0; i < count; i++) { - decodedPage.putByte(rowId++, in.readByte()); - } - } else { - // repeated run - byte value = in.readByte(); - for (int i = 0; i < count; i++) { - decodedPage.putByte(rowId++, value); - } - } - } while (in.available() > 0); - } - - private void decodeShortPage(DataInputStream in, ColumnPage decodedPage) - throws IOException { - int rowId = 0; - do { - int runLength = in.readShort(); - int count = runLength & 0x7FFF; - if (runLength < 0) { - // non-repeated run - for (int i = 0; i < count; i++) { - decodedPage.putShort(rowId++, in.readShort()); - } - } else { - // repeated run - short value = in.readShort(); - for (int i = 0; i < count; i++) { - decodedPage.putShort(rowId++, value); - } - } - } while (in.available() > 0); - } - - private void decodeIntPage(DataInputStream in, ColumnPage decodedPage) - throws IOException { - int rowId = 0; - do { - int runLength = in.readShort(); - int count = runLength & 0x7FFF; - if (runLength < 0) { - // non-repeated run - for (int i = 0; i < count; i++) { - decodedPage.putInt(rowId++, in.readInt()); - } - } else { - // repeated run - int value = in.readInt(); - for (int i = 0; i < count; i++) { - decodedPage.putInt(rowId++, value); - } - } - } while (in.available() > 0); - } - - private void decodeLongPage(DataInputStream in, ColumnPage decodedPage) - throws IOException { - int rowId = 0; - do { - int runLength = in.readShort(); - int count = runLength & 0x7FFF; - if (runLength < 0) { - // non-repeated run - for (int i = 0; i < count; i++) { - decodedPage.putLong(rowId++, in.readLong()); - } - } else { - // repeated run - long value = in.readLong(); - for (int i = 0; i < count; i++) { - decodedPage.putLong(rowId++, value); - } - } - } while (in.available() > 0); - } - } -} 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveCodec.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveCodec.java new file mode 100644 index 0000000..94ca3e6 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveCodec.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.core.datastore.page.encoding.adaptive; + +import org.apache.carbondata.core.datastore.page.ComplexColumnPage; +import org.apache.carbondata.core.datastore.page.encoding.ColumnPageCodec; +import org.apache.carbondata.core.datastore.page.encoding.EncodedColumnPage; +import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult; +import org.apache.carbondata.core.metadata.datatype.DataType; + +/** + * Subclass of this codec depends on statistics of the column page (adaptive) to perform apply + * and decode, it also employs compressor to compress the encoded data + */ +public abstract class AdaptiveCodec implements ColumnPageCodec { + + // TODO: cache and reuse the same encoder since snappy is thread-safe + + // statistics of this page, can be used by subclass + protected final SimpleStatsResult stats; + + // the data type used for storage + protected final DataType targetDataType; + + // the data type specified in schema + protected final DataType srcDataType; + + protected AdaptiveCodec(DataType srcDataType, DataType targetDataType, + SimpleStatsResult stats) { + this.stats = stats; + this.srcDataType = srcDataType; + this.targetDataType = targetDataType; + } + + public EncodedColumnPage[] encodeComplexColumn(ComplexColumnPage input) { + throw new UnsupportedOperationException("internal error"); + } + + @Override + public String toString() { + return String.format("%s[src type: %s, target type: %s, stats(%s)]", + getClass().getName(), srcDataType, targetDataType, stats); + } + + protected String debugInfo() { + return this.toString(); + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java new file mode 100644 index 0000000..9107a6b --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding.adaptive;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.datastore.page.ColumnPageValueConverter;
+import org.apache.carbondata.core.datastore.page.LazyColumnPage;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
+import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.format.Encoding;
+
+/**
+ * Codec for integer (byte, short, int, long) data type page.
+ * This codec will calculate delta of page max value and page value,
+ * and do type casting of the diff to make storage minimum.
+ * <p>
+ * The stored value is {@code max - value}, narrowed to the target data type. Encoding and
+ * decoding both go through {@link DeltaIntegralConverter}, so the two directions share a
+ * single implementation of that formula.
+ */
+public class AdaptiveDeltaIntegralCodec extends AdaptiveCodec {
+
+  // page receiving the narrowed deltas; non-null once this codec has encoded a page
+  private ColumnPage encodedPage;
+
+  /**
+   * @param srcDataType    data type declared in the schema; must be BYTE, SHORT, INT or LONG
+   * @param targetDataType data type the deltas are stored as
+   * @param stats          page statistics; {@code stats.getMax()} is the base of the delta
+   * @throws UnsupportedOperationException if {@code srcDataType} is not an integer type
+   */
+  public AdaptiveDeltaIntegralCodec(DataType srcDataType, DataType targetDataType,
+      SimpleStatsResult stats) {
+    super(srcDataType, targetDataType, stats);
+    switch (srcDataType) {
+      case BYTE:
+      case SHORT:
+      case INT:
+      case LONG:
+        break;
+      default:
+        // this codec is for integer type only
+        throw new UnsupportedOperationException(
+            "unsupported data type for Delta compress: " + srcDataType);
+    }
+  }
+
+  @Override
+  public String getName() {
+    return "DeltaIntegralCodec";
+  }
+
+  @Override
+  public ColumnPageEncoder createEncoder(Map<String, String> parameter) {
+    return new ColumnPageEncoder() {
+      final Compressor compressor = CompressorFactory.getInstance().getCompressor();
+
+      @Override
+      protected byte[] encodeData(ColumnPage input) throws MemoryException, IOException {
+        if (encodedPage != null) {
+          throw new IllegalStateException("already encoded");
+        }
+        encodedPage = ColumnPage.newPage(targetDataType, input.getPageSize());
+        try {
+          // fills encodedPage with (max - value), narrowed to targetDataType
+          ColumnPageValueConverter converter = new DeltaIntegralConverter(
+              encodedPage, targetDataType, srcDataType, stats.getMax());
+          input.convertValue(converter);
+          return encodedPage.compress(compressor);
+        } finally {
+          // release the intermediate page even when conversion or compression fails
+          encodedPage.freeMemory();
+        }
+      }
+
+      @Override
+      protected ColumnPageEncoderMeta getEncoderMeta(ColumnPage inputPage) {
+        return new AdaptiveDeltaIntegralEncoderMeta(
+            compressor.getName(), targetDataType, inputPage.getStatistics());
+      }
+
+      @Override
+      protected List<Encoding> getEncodingList() {
+        List<Encoding> encodings = new ArrayList<>();
+        encodings.add(Encoding.ADAPTIVE_DELTA_INTEGRAL);
+        return encodings;
+      }
+
+    };
+  }
+
+  @Override
+  public ColumnPageDecoder createDecoder(ColumnPageEncoderMeta meta) {
+    AdaptiveDeltaIntegralEncoderMeta codecMeta = (AdaptiveDeltaIntegralEncoderMeta) meta;
+    final Compressor compressor = CompressorFactory.getInstance().getCompressor(
+        codecMeta.getCompressorName());
+    // decode with the storage type recorded in the page metadata, as AdaptiveIntegralCodec does
+    final DataType storedDataType = codecMeta.getTargetDataType();
+    return new ColumnPageDecoder() {
+      @Override
+      public ColumnPage decode(byte[] input, int offset, int length)
+          throws MemoryException, IOException {
+        ColumnPage page =
+            ColumnPage.decompress(compressor, storedDataType, input, offset, length);
+        DeltaIntegralConverter converter = new DeltaIntegralConverter(page, storedDataType,
+            srcDataType, stats.getMax());
+        return LazyColumnPage.newPage(page, converter);
+      }
+    };
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralEncoderMeta.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralEncoderMeta.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralEncoderMeta.java
new file mode 100644
index 0000000..1d20481
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralEncoderMeta.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding.adaptive;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.schema.table.Writable;
+
+/**
+ * Encoder metadata for pages encoded by {@link AdaptiveDeltaIntegralCodec}. It has no fields
+ * of its own; serialization is entirely that of {@link AdaptiveEncoderMeta}.
+ */
+public class AdaptiveDeltaIntegralEncoderMeta extends AdaptiveEncoderMeta implements Writable {
+
+  /** Creates an empty instance, to be populated by {@link #readFields(DataInput)}. */
+  public AdaptiveDeltaIntegralEncoderMeta() {
+  }
+
+  /**
+   * Note: the compressor name comes first here, unlike the constructor of
+   * {@link AdaptiveIntegralEncoderMeta}.
+   */
+  AdaptiveDeltaIntegralEncoderMeta(String compressorName, DataType targetDataType,
+      SimpleStatsResult stats) {
+    super(targetDataType, stats, compressorName);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    // nothing to read beyond the adaptive metadata
+    super.readFields(in);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    // nothing to write beyond the adaptive metadata
+    super.write(out);
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveEncoderMeta.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveEncoderMeta.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveEncoderMeta.java
new file mode 100644
index 0000000..3104dd6
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveEncoderMeta.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding.adaptive;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
+import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.schema.table.Writable;
+
+/**
+ * Metadata shared by AdaptiveIntegralCodec and DeltaIntegralCodec. On top of the
+ * {@link ColumnPageEncoderMeta} fields it records the storage (target) data type and the name
+ * of the compressor applied to the encoded page.
+ * <p>
+ * Serialized after the parent's fields as: one byte holding the target type's ordinal, then
+ * the compressor name via {@link DataOutput#writeUTF(String)}.
+ */
+public class AdaptiveEncoderMeta extends ColumnPageEncoderMeta implements Writable {
+
+  private String compressorName;
+  private DataType targetDataType;
+
+  AdaptiveEncoderMeta() {
+    // fields are populated later by readFields()
+  }
+
+  AdaptiveEncoderMeta(DataType targetDataType, SimpleStatsResult stats,
+      String compressorName) {
+    super(stats.getDataType(), stats);
+    this.compressorName = compressorName;
+    this.targetDataType = targetDataType;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    int typeOrdinal = targetDataType.ordinal();
+    out.writeByte(typeOrdinal);
+    out.writeUTF(compressorName);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    byte typeOrdinal = in.readByte();
+    targetDataType = DataType.valueOf(typeOrdinal);
+    compressorName = in.readUTF();
+  }
+
+  public String getCompressorName() {
+    return compressorName;
+  }
+
+  public DataType getTargetDataType() {
+    return targetDataType;
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java new file mode 100644 index 0000000..ccd92d1 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding.adaptive;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.datastore.page.ColumnPageValueConverter;
+import org.apache.carbondata.core.datastore.page.LazyColumnPage;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
+import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.format.Encoding;
+
+/**
+ * Codec for integer (byte, short, int, long) data type page.
+ * This converter will do type casting on page data to make storage minimum.
+ * <p>
+ * Encoding narrows each value to the target data type and compresses the result; decoding
+ * decompresses the stored page and widens values back through a lazy page.
+ */
+public class AdaptiveIntegralCodec extends AdaptiveCodec {
+
+  // page receiving the narrowed values; non-null once this codec has encoded a page
+  private ColumnPage encodedPage;
+
+  public AdaptiveIntegralCodec(DataType srcDataType, DataType targetDataType,
+      SimpleStatsResult stats) {
+    super(srcDataType, targetDataType, stats);
+  }
+
+  @Override
+  public String getName() {
+    return "AdaptiveIntegralCodec";
+  }
+
+  @Override
+  public ColumnPageEncoder createEncoder(Map<String, String> parameter) {
+    final Compressor compressor = CompressorFactory.getInstance().getCompressor();
+    return new ColumnPageEncoder() {
+      @Override
+      protected byte[] encodeData(ColumnPage input) throws MemoryException, IOException {
+        if (encodedPage != null) {
+          throw new IllegalStateException("already encoded");
+        }
+        encodedPage = ColumnPage.newPage(targetDataType, input.getPageSize());
+        try {
+          // the converter below narrows every input value into encodedPage
+          input.convertValue(converter);
+          return encodedPage.compress(compressor);
+        } finally {
+          // release the intermediate page even when conversion or compression fails
+          encodedPage.freeMemory();
+        }
+      }
+
+      @Override
+      protected List<Encoding> getEncodingList() {
+        List<Encoding> encodings = new ArrayList<Encoding>();
+        encodings.add(Encoding.ADAPTIVE_INTEGRAL);
+        return encodings;
+      }
+
+      @Override
+      protected ColumnPageEncoderMeta getEncoderMeta(ColumnPage inputPage) {
+        return new AdaptiveIntegralEncoderMeta(targetDataType, stats, compressor.getName());
+      }
+
+    };
+  }
+
+  @Override
+  public ColumnPageDecoder createDecoder(ColumnPageEncoderMeta meta) {
+    AdaptiveIntegralEncoderMeta codecMeta = (AdaptiveIntegralEncoderMeta) meta;
+    final Compressor compressor = CompressorFactory.getInstance().getCompressor(
+        codecMeta.getCompressorName());
+    // decode with the storage type recorded in the page metadata
+    final DataType storedDataType = codecMeta.getTargetDataType();
+    return new ColumnPageDecoder() {
+      @Override
+      public ColumnPage decode(byte[] input, int offset, int length)
+          throws MemoryException, IOException {
+        ColumnPage page =
+            ColumnPage.decompress(compressor, storedDataType, input, offset, length);
+        return LazyColumnPage.newPage(page, converter);
+      }
+    };
+  }
+
+  // encoded value = (type cast page value to target data type)
+  private ColumnPageValueConverter converter = new ColumnPageValueConverter() {
+    @Override
+    public void encode(int rowId, byte value) {
+      // no target type is narrower than byte, so a byte page is never encoded by this codec
+      throw new RuntimeException("internal error: " + debugInfo());
+    }
+
+    @Override
+    public void encode(int rowId, short value) {
+      switch (targetDataType) {
+        case BYTE:
+          encodedPage.putByte(rowId, (byte) value);
+          break;
+        default:
+          throw new RuntimeException("internal error: " + debugInfo());
+      }
+    }
+
+    @Override
+    public void encode(int rowId, int value) {
+      switch (targetDataType) {
+        case BYTE:
+          encodedPage.putByte(rowId, (byte) value);
+          break;
+        case SHORT:
+          encodedPage.putShort(rowId, (short) value);
+          break;
+        case SHORT_INT:
+          encodedPage.putShortInt(rowId, value);
+          break;
+        default:
+          throw new RuntimeException("internal error: " + debugInfo());
+      }
+    }
+
+    @Override
+    public void encode(int rowId, long value) {
+      switch (targetDataType) {
+        case BYTE:
+          encodedPage.putByte(rowId, (byte) value);
+          break;
+        case SHORT:
+          encodedPage.putShort(rowId, (short) value);
+          break;
+        case SHORT_INT:
+          encodedPage.putShortInt(rowId, (int) value);
+          break;
+        case INT:
+          encodedPage.putInt(rowId, (int) value);
+          break;
+        default:
+          throw new RuntimeException("internal error: " + debugInfo());
+      }
+    }
+
+    @Override
+    public void encode(int rowId, float value) {
+      switch (targetDataType) {
+        case BYTE:
+          encodedPage.putByte(rowId, (byte) value);
+          break;
+        case SHORT:
+          encodedPage.putShort(rowId, (short) value);
+          break;
+        case SHORT_INT:
+          encodedPage.putShortInt(rowId, (int) value);
+          break;
+        case INT:
+          encodedPage.putInt(rowId, (int) value);
+          break;
+        default:
+          throw new RuntimeException("internal error: " + debugInfo());
+      }
+    }
+
+    @Override
+    public void encode(int rowId, double value) {
+      switch (targetDataType) {
+        case BYTE:
+          encodedPage.putByte(rowId, (byte) value);
+          break;
+        case SHORT:
+          encodedPage.putShort(rowId, (short) value);
+          break;
+        case SHORT_INT:
+          encodedPage.putShortInt(rowId, (int) value);
+          break;
+        case INT:
+          encodedPage.putInt(rowId, (int) value);
+          break;
+        case LONG:
+          encodedPage.putLong(rowId, (long) value);
+          break;
+        default:
+          throw new RuntimeException("internal error: " + debugInfo());
+      }
+    }
+
+    @Override
+    public long decodeLong(byte value) {
+      return value;
+    }
+
+    @Override
+    public long decodeLong(short value) {
+      return value;
+    }
+
+    @Override
+    public long decodeLong(int value) {
+      return value;
+    }
+
+    @Override
+    public double decodeDouble(byte value) {
+      return value;
+    }
+
+    @Override
+    public double decodeDouble(short value) {
+      return value;
+    }
+
+    @Override
+    public double decodeDouble(int value) {
+      return value;
+    }
+
+    @Override
+    public double decodeDouble(long value) {
+      return value;
+    }
+
+    @Override
+    public double decodeDouble(float value) {
+      throw new RuntimeException("internal error: " + debugInfo());
+    }
+
+    @Override
+    public double decodeDouble(double value) {
+      throw new RuntimeException("internal error: " + debugInfo());
+    }
+  };
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralEncoderMeta.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralEncoderMeta.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralEncoderMeta.java
new file mode 100644
index 0000000..3025303
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralEncoderMeta.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding.adaptive;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.schema.table.Writable;
+
+/**
+ * Encoder metadata for pages encoded by {@link AdaptiveIntegralCodec}. It has no fields of
+ * its own; serialization is entirely that of {@link AdaptiveEncoderMeta}.
+ */
+public class AdaptiveIntegralEncoderMeta extends AdaptiveEncoderMeta implements Writable {
+
+  /** Creates an empty instance, to be populated by {@link #readFields(DataInput)}. */
+  public AdaptiveIntegralEncoderMeta() {
+  }
+
+  public AdaptiveIntegralEncoderMeta(DataType targetDataType, SimpleStatsResult stats,
+      String compressorName) {
+    super(targetDataType, stats, compressorName);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    // nothing to read beyond the adaptive metadata
+    super.readFields(in);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    // nothing to write beyond the adaptive metadata
+    super.write(out);
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/DeltaIntegralConverter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/DeltaIntegralConverter.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/DeltaIntegralConverter.java
new file mode 100644
index 0000000..8a2bf6d
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/DeltaIntegralConverter.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding.adaptive;
+
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.datastore.page.ColumnPageValueConverter;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+
+/**
+ * Value converter for delta-integral encoding. The encode methods store
+ * {@code max - value}, narrowed to the target data type, into {@code encodedPage}; the decode
+ * methods return {@code max - stored}.
+ */
+public class DeltaIntegralConverter implements ColumnPageValueConverter {
+  private final DataType targetDataType;
+  private final ColumnPage encodedPage;
+  private final long max;
+
+  /**
+   * @param encodedPage    page the encode methods write into (decode methods do not use it)
+   * @param targetDataType data type of the stored deltas
+   * @param srcDataType    data type of the original values; determines how {@code max} is read
+   * @param max            page maximum, boxed according to {@code srcDataType}
+   * @throws UnsupportedOperationException if {@code srcDataType} is not a numeric primitive
+   */
+  public DeltaIntegralConverter(ColumnPage encodedPage, DataType targetDataType,
+      DataType srcDataType, Object max) {
+    this.targetDataType = targetDataType;
+    this.encodedPage = encodedPage;
+    switch (srcDataType) {
+      case BYTE:
+        this.max = (byte) max;
+        break;
+      case SHORT:
+        this.max = (short) max;
+        break;
+      case INT:
+        this.max = (int) max;
+        break;
+      case LONG:
+        this.max = (long) max;
+        break;
+      case FLOAT:
+      case DOUBLE:
+        // casting an Object straight to long only succeeds for a Long; a floating-point max
+        // is not one, so truncate it through Number instead
+        this.max = ((Number) max).longValue();
+        break;
+      default:
+        // previously max silently stayed 0 here, which would decode wrong values
+        throw new UnsupportedOperationException(
+            "unsupported data type for Delta compress: " + srcDataType);
+    }
+  }
+
+  @Override
+  public void encode(int rowId, byte value) {
+    switch (targetDataType) {
+      case BYTE:
+        encodedPage.putByte(rowId, (byte)(max - value));
+        break;
+      default:
+        throw new RuntimeException("internal error");
+    }
+  }
+
+  @Override
+  public void encode(int rowId, short value) {
+    switch (targetDataType) {
+      case BYTE:
+        encodedPage.putByte(rowId, (byte)(max - value));
+        break;
+      case SHORT:
+        encodedPage.putShort(rowId, (short)(max - value));
+        break;
+      default:
+        throw new RuntimeException("internal error");
+    }
+  }
+
+  @Override
+  public void encode(int rowId, int value) {
+    switch (targetDataType) {
+      case BYTE:
+        encodedPage.putByte(rowId, (byte)(max - value));
+        break;
+      case SHORT:
+        encodedPage.putShort(rowId, (short)(max - value));
+        break;
+      case SHORT_INT:
+        encodedPage.putShortInt(rowId, (int)(max - value));
+        break;
+      case INT:
+        encodedPage.putInt(rowId, (int)(max - value));
+        break;
+      default:
+        throw new RuntimeException("internal error");
+    }
+  }
+
+  @Override
+  public void encode(int rowId, long value) {
+    switch (targetDataType) {
+      case BYTE:
+        encodedPage.putByte(rowId, (byte)(max - value));
+        break;
+      case SHORT:
+        encodedPage.putShort(rowId, (short)(max - value));
+        break;
+      case SHORT_INT:
+        encodedPage.putShortInt(rowId, (int)(max - value));
+        break;
+      case INT:
+        encodedPage.putInt(rowId, (int)(max - value));
+        break;
+      case LONG:
+        encodedPage.putLong(rowId, max - value);
+        break;
+      default:
+        throw new RuntimeException("internal error");
+    }
+  }
+
+  @Override
+  public void encode(int rowId, float value) {
+    switch (targetDataType) {
+      case BYTE:
+        encodedPage.putByte(rowId, (byte)(max - value));
+        break;
+      case SHORT:
+        encodedPage.putShort(rowId, (short)(max - value));
+        break;
+      case SHORT_INT:
+        encodedPage.putShortInt(rowId, (int)(max - value));
+        break;
+      case INT:
+        encodedPage.putInt(rowId, (int)(max - value));
+        break;
+      case LONG:
+        encodedPage.putLong(rowId, (long)(max - value));
+        break;
+      default:
+        throw new RuntimeException("internal error");
+    }
+  }
+
+  @Override
+  public void encode(int rowId, double value) {
+    switch (targetDataType) {
+      case BYTE:
+        encodedPage.putByte(rowId, (byte)(max - value));
+        break;
+      case SHORT:
+        encodedPage.putShort(rowId, (short)(max - value));
+        break;
+      case SHORT_INT:
+        encodedPage.putShortInt(rowId, (int)(max - value));
+        break;
+      case INT:
+        encodedPage.putInt(rowId, (int)(max - value));
+        break;
+      case LONG:
+        encodedPage.putLong(rowId, (long)(max - value));
+        break;
+      default:
+        throw new RuntimeException("internal error");
+    }
+  }
+
+  @Override
+  public long decodeLong(byte value) {
+    return max - value;
+  }
+
+  @Override
+  public long decodeLong(short value) {
+    return max - value;
+  }
+
+  @Override
+  public long decodeLong(int value) {
+    return max - value;
+  }
+
+  @Override
+  public double decodeDouble(byte value) {
+    return max - value;
+  }
+
+  @Override
+  public double decodeDouble(short value) {
+    return max - value;
+  }
+
+  @Override
+  public double decodeDouble(int value) {
+    return max - value;
+  }
+
+  @Override
+  public double decodeDouble(long value) {
+    return max - value;
+  }
+
+  @Override
+  public double decodeDouble(float value) {
+    // this codec is for integer type only
+    throw new RuntimeException("internal error");
+  }
+
+  @Override
+  public double decodeDouble(double value) {
+    // this codec is for integer type only
+    throw new RuntimeException("internal error");
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
new file mode 100644
index 0000000..1fc61ac
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache
Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding.compress;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.datastore.page.ColumnPageValueConverter;
+import org.apache.carbondata.core.datastore.page.LazyColumnPage;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageCodec;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.format.Encoding;
+
+/**
+ * Codec that applies no value transformation: a page is compressed as-is when written and
+ * decompressed as-is when read.
+ */
+public class DirectCompressCodec implements ColumnPageCodec {
+
+  // data type of the pages this codec decodes
+  private final DataType dataType;
+
+  public DirectCompressCodec(DataType dataType) {
+    this.dataType = dataType;
+  }
+
+  @Override
+  public String getName() {
+    return "DirectCompressCodec";
+  }
+
+  @Override
+  public ColumnPageEncoder createEncoder(Map<String, String> parameter) {
+    // TODO: make compressor configurable in create table
+    return new DirectCompressor(CarbonCommonConstants.DEFAULT_COMPRESSOR);
+  }
+
+  @Override
+  public ColumnPageDecoder createDecoder(ColumnPageEncoderMeta meta) {
+    DirectCompressorEncoderMeta codecMeta = (DirectCompressorEncoderMeta) meta;
+    String compressorName = codecMeta.getCompressorName();
+    int scale = codecMeta.getScale();
+    int precision = codecMeta.getPrecision();
+    return new DirectDecompressor(compressorName, scale, precision);
+  }
+
+  /** Encoder that hands the page straight to the compressor. */
+  private class DirectCompressor extends ColumnPageEncoder {
+
+    private final Compressor compressor;
+
+    DirectCompressor(String compressorName) {
+      compressor = CompressorFactory.getInstance().getCompressor(compressorName);
+    }
+
+    @Override
+    protected byte[] encodeData(ColumnPage input) throws MemoryException, IOException {
+      return input.compress(compressor);
+    }
+
+    @Override
+    protected List<Encoding> getEncodingList() {
+      List<Encoding> encodingList = new ArrayList<>();
+      encodingList.add(Encoding.DIRECT_COMPRESS);
+      return encodingList;
+    }
+
+    @Override
+    protected ColumnPageEncoderMeta getEncoderMeta(ColumnPage inputPage) {
+      String compressorName = compressor.getName();
+      return new DirectCompressorEncoderMeta(compressorName, inputPage.getDataType(),
+          inputPage.getStatistics());
+    }
+
+  }
+
+  /** Decoder for pages of {@code dataType}; DECIMAL pages also need scale and precision. */
+  private class DirectDecompressor implements ColumnPageDecoder {
+
+    private final Compressor compressor;
+    private final int scale;
+    private final int precision;
+
+    DirectDecompressor(String compressorName, int scale, int precision) {
+      this.compressor = CompressorFactory.getInstance().getCompressor(compressorName);
+      this.scale = scale;
+      this.precision = precision;
+    }
+
+    @Override
+    public ColumnPage decode(byte[] input, int offset, int length) throws MemoryException {
+      ColumnPage decodedPage = (dataType == DataType.DECIMAL)
+          ? ColumnPage.decompressDecimalPage(compressor, input, offset, length, scale, precision)
+          : ColumnPage.decompress(compressor, dataType, input, offset, length);
+      return LazyColumnPage.newPage(decodedPage, converter);
+    }
+  }
+
+  // Pass-through converter used by the lazy decoded page: values are returned unchanged
+  // (only widened). DirectCompressor compresses pages directly, so encode is never expected.
+  private final ColumnPageValueConverter converter = new ColumnPageValueConverter() {
+
+    @Override
+    public long decodeLong(byte value) {
+      return value;
+    }
+
+    @Override
+    public long decodeLong(short value) {
+      return value;
+    }
+
+    @Override
+    public long decodeLong(int value) {
+      return value;
+    }
+
+    @Override
+    public double decodeDouble(byte value) {
+      return value;
+    }
+
+    @Override
+    public double decodeDouble(short value) {
+      return value;
+    }
+
+    @Override
+    public double decodeDouble(int value) {
+      return value;
+    }
+
+    @Override
+    public double decodeDouble(long value) {
+      return value;
+    }
+
+    @Override
+    public double decodeDouble(float value) {
+      return value;
+    }
+
+    @Override
+    public double decodeDouble(double value) {
+      return value;
+    }
+
+    @Override
+    public void encode(int rowId, byte value) {
+      throw internalError();
+    }
+
+    @Override
+    public void encode(int rowId, short value) {
+      throw internalError();
+    }
+
+    @Override
+    public void encode(int rowId, int value) {
+      throw internalError();
+    }
+
+    @Override
+    public void encode(int rowId, long value) {
+      throw internalError();
+    }
+
+    @Override
+    public void encode(int rowId, float value) {
+      throw internalError();
+    }
+
+    @Override
+    public void encode(int rowId, double value) {
+      throw internalError();
+    }
+
+    private RuntimeException internalError() {
+      return new RuntimeException("internal error");
+    }
+  };
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressorEncoderMeta.java
----------------------------------------------------------------------
diff --git
a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressorEncoderMeta.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressorEncoderMeta.java new file mode 100644 index 0000000..cf19259 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressorEncoderMeta.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/

package org.apache.carbondata.core.datastore.page.encoding.compress;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.schema.table.Writable;

/**
 * Encoder metadata stored with pages encoded as {@code Encoding.DIRECT_COMPRESS}.
 *
 * <p>In addition to the fields serialized by {@link ColumnPageEncoderMeta}, this records the
 * name of the compressor that produced the page. The decoder uses that name to obtain the same
 * compressor from the compressor factory.
 */
public class DirectCompressorEncoderMeta extends ColumnPageEncoderMeta implements Writable {

  /** Name of the compressor that encoded the page; set by a constructor or by readFields. */
  private String compressorName;

  /**
   * Creates an empty instance. Its fields are meant to be filled in by
   * {@link #readFields(DataInput)}.
   */
  public DirectCompressorEncoderMeta() {
  }

  /**
   * @param compressorName name of the compressor that encoded the page
   * @param dataType data type of the encoded page
   * @param stats statistics of the encoded page
   */
  public DirectCompressorEncoderMeta(String compressorName, final DataType dataType,
      SimpleStatsResult stats) {
    super(dataType, stats);
    this.compressorName = compressorName;
  }

  /** Returns the compressor name recorded in this metadata. */
  public String getCompressorName() {
    return this.compressorName;
  }

  /**
   * Writes the base metadata first, then the compressor name.
   *
   * @throws NullPointerException if no compressor name has been set ({@code writeUTF} rejects
   *     null)
   */
  @Override
  public void write(DataOutput out) throws IOException {
    super.write(out);
    out.writeUTF(this.compressorName);
  }

  /** Reads the fields back in the same order that {@link #write(DataOutput)} emits them. */
  @Override
  public void readFields(DataInput in) throws IOException {
    super.readFields(in);
    this.compressorName = in.readUTF();
  }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/ComplexDimensionIndexCodec.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/ComplexDimensionIndexCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/ComplexDimensionIndexCodec.java new file mode 100644 index 0000000..e37b8f6 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/ComplexDimensionIndexCodec.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.carbondata.core.datastore.page.encoding.dimension.legacy;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForShort;
import org.apache.carbondata.core.datastore.columnar.IndexStorage;
import org.apache.carbondata.core.datastore.compression.Compressor;
import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.format.Encoding;

/**
 * Legacy index-storage codec, used (per its name) for complex dimension pages.
 *
 * <p>Its encoder builds a {@link BlockIndexerStorageForShort} from the page's byte-array rows.
 * It then flattens that storage's data page and compresses it.
 */
public class ComplexDimensionIndexCodec extends IndexStorageCodec {

  /**
   * @param isSort forwarded to {@link IndexStorageCodec}; not consulted by this codec's encoder
   * @param isInvertedIndex forwarded to {@link IndexStorageCodec}; not consulted by this codec's
   *     encoder
   * @param compressor forwarded to {@link IndexStorageCodec}
   */
  public ComplexDimensionIndexCodec(boolean isSort, boolean isInvertedIndex,
      Compressor compressor) {
    super(isSort, isInvertedIndex, compressor);
  }

  @Override
  public String getName() {
    return "ComplexDimensionIndexCodec";
  }

  /**
   * Returns a new encoder for one page.
   *
   * @param parameter not used by this codec
   */
  @Override
  public ColumnPageEncoder createEncoder(Map<String, String> parameter) {
    return new IndexStorageEncoder() {
      @Override
      void encodeIndexStorage(ColumnPage inputPage) {
        // All three storage flags are constant false here.
        // NOTE(review): confirm these flag values are intended for complex pages.
        IndexStorage storage =
            new BlockIndexerStorageForShort(inputPage.getByteArrayPage(), false, false, false);
        byte[] compressedPage = compressor.compressByte(ByteUtil.flatten(storage.getDataPage()));
        // Results are published only after compression has returned.
        super.indexStorage = storage;
        super.compressedDataPage = compressedPage;
      }

      @Override
      protected List<Encoding> getEncodingList() {
        // NOTE(review): INVERTED_INDEX is reported unconditionally, independent of isInvertedIndex.
        List<Encoding> result = new ArrayList<>();
        result.add(Encoding.DICTIONARY);
        result.add(Encoding.INVERTED_INDEX);
        return result;
      }
    };
  }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DictDimensionIndexCodec.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DictDimensionIndexCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DictDimensionIndexCodec.java new file mode 100644 index 0000000..d157654 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DictDimensionIndexCodec.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
*/

package org.apache.carbondata.core.datastore.page.encoding.dimension.legacy;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInvertedIndexForShort;
import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForShort;
import org.apache.carbondata.core.datastore.columnar.IndexStorage;
import org.apache.carbondata.core.datastore.compression.Compressor;
import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.format.Encoding;

/**
 * Legacy index-storage codec, used (per its name) for dictionary dimension pages.
 *
 * <p>Its encoder chooses one of two storages built from the page's byte-array rows:
 * {@link BlockIndexerStorageForShort} when {@code isInvertedIndex} is set, otherwise
 * {@link BlockIndexerStorageForNoInvertedIndexForShort}. It then flattens the chosen storage's
 * data page and compresses it.
 */
public class DictDimensionIndexCodec extends IndexStorageCodec {

  /**
   * @param isSort forwarded to {@link IndexStorageCodec}
   * @param isInvertedIndex forwarded to {@link IndexStorageCodec}
   * @param compressor forwarded to {@link IndexStorageCodec}
   */
  public DictDimensionIndexCodec(boolean isSort, boolean isInvertedIndex, Compressor compressor) {
    super(isSort, isInvertedIndex, compressor);
  }

  @Override
  public String getName() {
    return "DictDimensionIndexCodec";
  }

  /**
   * Returns a new encoder for one page.
   *
   * @param parameter not used by this codec
   */
  @Override
  public ColumnPageEncoder createEncoder(Map<String, String> parameter) {
    return new IndexStorageEncoder() {
      @Override
      void encodeIndexStorage(ColumnPage inputPage) {
        byte[][] rows = inputPage.getByteArrayPage();
        // NOTE(review): the (true, false, isSort) flags presumably mean rle / no-dictionary /
        // sort; confirm against the BlockIndexerStorageForShort constructor.
        IndexStorage storage = isInvertedIndex
            ? new BlockIndexerStorageForShort(rows, true, false, isSort)
            : new BlockIndexerStorageForNoInvertedIndexForShort(rows, false);
        byte[] compressedPage = compressor.compressByte(ByteUtil.flatten(storage.getDataPage()));
        super.compressedDataPage = compressedPage;
        super.indexStorage = storage;
      }

      @Override
      protected List<Encoding> getEncodingList() {
        // DICTIONARY and RLE are always reported; INVERTED_INDEX only when it was built.
        List<Encoding> result = new ArrayList<>();
        result.add(Encoding.DICTIONARY);
        result.add(Encoding.RLE);
        if (isInvertedIndex) {
          result.add(Encoding.INVERTED_INDEX);
        }
        return result;
      }
    };
  }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DirectDictDimensionIndexCodec.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DirectDictDimensionIndexCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DirectDictDimensionIndexCodec.java new file mode 100644 index 0000000..1e5015b --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DirectDictDimensionIndexCodec.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/

package org.apache.carbondata.core.datastore.page.encoding.dimension.legacy;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInvertedIndexForShort;
import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForShort;
import org.apache.carbondata.core.datastore.columnar.IndexStorage;
import org.apache.carbondata.core.datastore.compression.Compressor;
import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.format.Encoding;

/**
 * Legacy index-storage codec, used (per its name) for direct-dictionary dimension pages.
 *
 * <p>Its encoder chooses one of two storages built from the page's byte-array rows:
 * {@link BlockIndexerStorageForShort} when {@code isInvertedIndex} is set, otherwise
 * {@link BlockIndexerStorageForNoInvertedIndexForShort}. It then flattens the chosen storage's
 * data page and compresses it.
 */
public class DirectDictDimensionIndexCodec extends IndexStorageCodec {

  /**
   * @param isSort forwarded to {@link IndexStorageCodec}
   * @param isInvertedIndex forwarded to {@link IndexStorageCodec}
   * @param compressor forwarded to {@link IndexStorageCodec}
   */
  public DirectDictDimensionIndexCodec(boolean isSort, boolean isInvertedIndex,
      Compressor compressor) {
    super(isSort, isInvertedIndex, compressor);
  }

  @Override
  public String getName() {
    return "DirectDictDimensionIndexCodec";
  }

  /**
   * Returns a new encoder for one page.
   *
   * @param parameter not used by this codec
   */
  @Override
  public ColumnPageEncoder createEncoder(Map<String, String> parameter) {
    return new IndexStorageEncoder() {
      @Override
      void encodeIndexStorage(ColumnPage inputPage) {
        byte[][] rows = inputPage.getByteArrayPage();
        // NOTE(review): unlike DictDimensionIndexCodec, the first flag here is false, yet
        // getEncodingList still reports RLE. Confirm this is intended.
        IndexStorage storage = isInvertedIndex
            ? new BlockIndexerStorageForShort(rows, false, false, isSort)
            : new BlockIndexerStorageForNoInvertedIndexForShort(rows, false);
        byte[] compressedPage = compressor.compressByte(ByteUtil.flatten(storage.getDataPage()));
        super.compressedDataPage = compressedPage;
        super.indexStorage = storage;
      }

      @Override
      protected List<Encoding> getEncodingList() {
        // DICTIONARY and RLE are always reported; INVERTED_INDEX only when it was built.
        List<Encoding> result = new ArrayList<>();
        result.add(Encoding.DICTIONARY);
        result.add(Encoding.RLE);
        if (isInvertedIndex) {
          result.add(Encoding.INVERTED_INDEX);
        }
        return result;
      }
    };
  }

}
