Repository: carbondata Updated Branches: refs/heads/master 601323db9 -> 379d4f66c
[CARBONDATA-1365] add RLE codec implementation. A new RLE codec for integral types is added. A test suite is added. Another PR is needed to modify the encoding strategy to use this codec. This closes #1240. Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/379d4f66 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/379d4f66 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/379d4f66 Branch: refs/heads/master Commit: 379d4f66c432c4ba942a6738967c4700e6de1ffa Parents: 601323d Author: Jacky Li <[email protected]> Authored: Thu Aug 17 09:57:43 2017 +0800 Committer: Ravindra Pesala <[email protected]> Committed: Thu Aug 17 19:22:10 2017 +0530 ---------------------------------------------------------------------- ...CompressedMeasureChunkFileBasedReaderV3.java | 2 +- .../core/datastore/page/ColumnPage.java | 4 + .../page/encoding/ColumnPageCodec.java | 2 +- .../core/datastore/page/encoding/RLECodec.java | 417 +++++++++++++++++++ .../datastore/page/encoding/RLECodecSuite.java | 147 +++++++ 5 files changed, 570 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/379d4f66/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java index bde9803..a893ab0 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java +++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java @@ -226,7 +226,7 @@ public class CompressedMeasureChunkFileBasedReaderV3 extends AbstractMeasureChun } protected ColumnPage decodeMeasure(MeasureRawColumnChunk measureRawColumnChunk, - DataChunk2 measureColumnChunk, int copyPoint) throws MemoryException { + DataChunk2 measureColumnChunk, int copyPoint) throws MemoryException, IOException { List<ByteBuffer> encoder_meta = measureColumnChunk.getEncoder_meta(); // for measure, it should have only one ValueEncoderMeta assert (encoder_meta.size() > 0); http://git-wip-us.apache.org/repos/asf/carbondata/blob/379d4f66/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java index 3912f45..a0eeb44 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java @@ -131,6 +131,10 @@ public abstract class ColumnPage { } } + public static ColumnPage newPage(DataType dataType, int pageSize) throws MemoryException { + return newPage(dataType, pageSize, 0, 0); + } + /** * Create a new page of dataType and number of row = pageSize */ http://git-wip-us.apache.org/repos/asf/carbondata/blob/379d4f66/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageCodec.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageCodec.java index ac7a79e..62c6cd5 100644 --- 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageCodec.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageCodec.java @@ -52,6 +52,6 @@ public interface ColumnPageCodec { * @param length length of data to decode * @return decoded data */ - ColumnPage decode(byte[] input, int offset, int length) throws MemoryException; + ColumnPage decode(byte[] input, int offset, int length) throws MemoryException, IOException; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/379d4f66/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/RLECodec.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/RLECodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/RLECodec.java new file mode 100644 index 0000000..46c5c58 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/RLECodec.java @@ -0,0 +1,417 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.core.datastore.page.encoding; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.carbondata.core.datastore.page.ColumnPage; +import org.apache.carbondata.core.datastore.page.ComplexColumnPage; +import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult; +import org.apache.carbondata.core.memory.MemoryException; +import org.apache.carbondata.core.metadata.CodecMetaFactory; +import org.apache.carbondata.core.metadata.datatype.DataType; + +/** + * RLE encoding implementation for integral column page. + * This encoding keeps track of repeated-run and non-repeated-run, and makes use + * of the highest bit of the length field to indicate the type of run. + * The length field is encoded as a 16-bit value. (The run count occupies the low 15 bits, so page size must not exceed 32767 rows) + * + * For example: input data {5, 5, 1, 2, 3, 3, 3, 3, 3} will be encoded to + * {0x00, 0x02, 0x05, (repeated-run, 2 values of 5) + * 0x80, 0x03, 0x01, 0x02, 0x03, (non-repeated-run, 3 values: 1, 2, 3) + * 0x00, 0x04, 0x03} (repeated-run, 4 values of 3) + */ +public class RLECodec implements ColumnPageCodec { + + enum RUN_STATE { INIT, START, REPEATED_RUN, NONREPEATED_RUN } + + private DataType dataType; + private int pageSize; + + /** + * New RLECodec + * @param dataType data type of the raw column page before encode + * @param pageSize page size of the raw column page before encode + */ + RLECodec(DataType dataType, int pageSize) { + this.dataType = dataType; + this.pageSize = pageSize; + } + + @Override + public String getName() { + return "RLECodec"; + } + + @Override + public EncodedColumnPage encode(ColumnPage input) throws MemoryException, IOException { + Encoder encoder = new Encoder(); + return encoder.encode(input); + } + + @Override + public EncodedColumnPage[] 
encodeComplexColumn(ComplexColumnPage input) { + throw new UnsupportedOperationException("complex column does not support RLE encoding"); + } + + @Override + public ColumnPage decode(byte[] input, int offset, int length) throws MemoryException, + IOException { + Decoder decoder = new Decoder(dataType, pageSize); + return decoder.decode(input, offset, length); + } + + // This codec supports integral type only + private void validateDataType(DataType dataType) { + switch (dataType) { + case BYTE: + case SHORT: + case INT: + case LONG: + break; + default: + throw new UnsupportedOperationException(dataType + " is not supported for RLE"); + } + } + + private class Encoder { + // While encoding RLE, this class internally work as a state machine + // INIT state is the initial state before any value comes + // START state is the start for each run + // REPEATED_RUN state means it is collecting repeated values (`lastValue`) + // NONREPEATED_RUN state means it is collecting non-repeated values (`nonRepeatValues`) + private RUN_STATE runState; + + // count for each run, either REPEATED_RUN or NONREPEATED_RUN + private short valueCount; + + // collected value for REPEATED_RUN + private Object lastValue; + + // collected value for NONREPEATED_RUN + private List<Object> nonRepeatValues; + + // data type of input page + private DataType dataType; + + // output stream for encoded data + private ByteArrayOutputStream bao; + private DataOutputStream stream; + + private Encoder() { + this.runState = RUN_STATE.INIT; + this.valueCount = 0; + this.nonRepeatValues = new ArrayList<>(); + this.bao = new ByteArrayOutputStream(); + this.stream = new DataOutputStream(bao); + } + + private EncodedColumnPage encode(ColumnPage input) throws MemoryException, IOException { + validateDataType(input.getDataType()); + this.dataType = input.getDataType(); + switch (dataType) { + case BYTE: + byte[] bytePage = input.getBytePage(); + for (int i = 0; i < bytePage.length; i++) { + putValue(bytePage[i]); + 
} + break; + case SHORT: + short[] shortPage = input.getShortPage(); + for (int i = 0; i < shortPage.length; i++) { + putValue(shortPage[i]); + } + break; + case INT: + int[] intPage = input.getIntPage(); + for (int i = 0; i < intPage.length; i++) { + putValue(intPage[i]); + } + break; + case LONG: + long[] longPage = input.getLongPage(); + for (int i = 0; i < longPage.length; i++) { + putValue(longPage[i]); + } + break; + default: + throw new UnsupportedOperationException(input.getDataType() + + " does not support RLE encoding"); + } + byte[] encoded = collectResult(); + SimpleStatsResult stats = (SimpleStatsResult) input.getStatistics(); + return new EncodedMeasurePage( + input.getPageSize(), + encoded, + CodecMetaFactory.createMeta(stats, input.getDataType()), + stats.getNullBits()); + } + + private void putValue(Object value) throws IOException { + if (runState == RUN_STATE.INIT) { + startNewRun(value); + } else { + if (lastValue.equals(value)) { + putRepeatValue(value); + } else { + putNonRepeatValue(value); + } + } + } + + // when last row is reached, write out all collected data + private byte[] collectResult() throws IOException { + switch (runState) { + case REPEATED_RUN: + writeRunLength(valueCount); + writeRunValue(lastValue); + break; + case NONREPEATED_RUN: + writeRunLength(valueCount | 0x8000); + for (int i = 0; i < valueCount; i++) { + writeRunValue(nonRepeatValues.get(i)); + } + break; + default: + assert (runState == RUN_STATE.START); + writeRunLength(1); + writeRunValue(lastValue); + } + return bao.toByteArray(); + } + + private void writeRunLength(int length) throws IOException { + stream.writeShort(length); + } + + private void writeRunValue(Object value) throws IOException { + switch (dataType) { + case BYTE: + stream.writeByte((byte) value); + break; + case SHORT: + stream.writeShort((short) value); + break; + case INT: + stream.writeInt((int) value); + break; + case LONG: + stream.writeLong((long) value); + break; + default: + throw new 
RuntimeException("internal error"); + } + } + + // for each run, call this to initialize the state and clear the collected data + private void startNewRun(Object value) { + runState = RUN_STATE.START; + valueCount = 1; + lastValue = value; + nonRepeatValues.clear(); + nonRepeatValues.add(value); + } + + // non-repeated run ends, put the collected data to result page + private void encodeNonRepeatedRun() throws IOException { + // put the value count (highest bit is 1) and all collected values + writeRunLength(valueCount | 0x8000); + for (int i = 0; i < valueCount; i++) { + writeRunValue(nonRepeatValues.get(i)); + } + } + + // repeated run ends, put repeated value to result page + private void encodeRepeatedRun() throws IOException { + // put the value count (highest bit is 0) and repeated value + writeRunLength(valueCount); + writeRunValue(lastValue); + } + + private void putRepeatValue(Object value) throws IOException { + switch (runState) { + case REPEATED_RUN: + valueCount++; + break; + case NONREPEATED_RUN: + // non-repeated run ends, encode this run + encodeNonRepeatedRun(); + startNewRun(value); + break; + default: + assert (runState == RUN_STATE.START); + // enter repeated run + runState = RUN_STATE.REPEATED_RUN; + valueCount++; + break; + } + } + + private void putNonRepeatValue(Object value) throws IOException { + switch (runState) { + case NONREPEATED_RUN: + // collect the non-repeated value + nonRepeatValues.add(value); + lastValue = value; + valueCount++; + break; + case REPEATED_RUN: + // repeated-run ends, encode this run + encodeRepeatedRun(); + startNewRun(value); + break; + default: + assert (runState == RUN_STATE.START); + // enter non-repeated run + runState = RUN_STATE.NONREPEATED_RUN; + nonRepeatValues.add(value); + lastValue = value; + valueCount++; + break; + } + } + + } + + // It decodes data in one shot. 
It is suitable for scan query + // TODO: add a on-the-fly decoder for filter query with high selectivity + private class Decoder { + + // src data type + private DataType dataType; + private int pageSize; + + private Decoder(DataType dataType, int pageSize) throws MemoryException { + validateDataType(dataType); + this.dataType = dataType; + this.pageSize = pageSize; + } + + private ColumnPage decode(byte[] input, int offset, int length) + throws MemoryException, IOException { + DataInputStream in = new DataInputStream(new ByteArrayInputStream(input, offset, length)); + ColumnPage resultPage = ColumnPage.newPage(dataType, pageSize); + switch (dataType) { + case BYTE: + decodeBytePage(in, resultPage); + break; + case SHORT: + decodeShortPage(in, resultPage); + break; + case INT: + decodeIntPage(in, resultPage); + break; + case LONG: + decodeLongPage(in, resultPage); + break; + } + return resultPage; + } + + private void decodeBytePage(DataInputStream in, ColumnPage decodedPage) + throws IOException { + int rowId = 0; + do { + int runLength = in.readShort(); + int count = runLength & 0x7FFF; + if (runLength < 0) { + // non-repeated run + for (int i = 0; i < count; i++) { + decodedPage.putByte(rowId++, in.readByte()); + } + } else { + // repeated run + byte value = in.readByte(); + for (int i = 0; i < count; i++) { + decodedPage.putByte(rowId++, value); + } + } + } while (in.available() > 0); + } + + private void decodeShortPage(DataInputStream in, ColumnPage decodedPage) + throws IOException { + int rowId = 0; + do { + int runLength = in.readShort(); + int count = runLength & 0x7FFF; + if (runLength < 0) { + // non-repeated run + for (int i = 0; i < count; i++) { + decodedPage.putShort(rowId++, in.readShort()); + } + } else { + // repeated run + short value = in.readShort(); + for (int i = 0; i < count; i++) { + decodedPage.putShort(rowId++, value); + } + } + } while (in.available() > 0); + } + + private void decodeIntPage(DataInputStream in, ColumnPage decodedPage) + 
throws IOException { + int rowId = 0; + do { + int runLength = in.readShort(); + int count = runLength & 0x7FFF; + if (runLength < 0) { + // non-repeated run + for (int i = 0; i < count; i++) { + decodedPage.putInt(rowId++, in.readInt()); + } + } else { + // repeated run + int value = in.readInt(); + for (int i = 0; i < count; i++) { + decodedPage.putInt(rowId++, value); + } + } + } while (in.available() > 0); + } + + private void decodeLongPage(DataInputStream in, ColumnPage decodedPage) + throws IOException { + int rowId = 0; + do { + int runLength = in.readShort(); + int count = runLength & 0x7FFF; + if (runLength < 0) { + // non-repeated run + for (int i = 0; i < count; i++) { + decodedPage.putLong(rowId++, in.readLong()); + } + } else { + // repeated run + long value = in.readLong(); + for (int i = 0; i < count; i++) { + decodedPage.putLong(rowId++, value); + } + } + } while (in.available() > 0); + } + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/379d4f66/core/src/test/java/org/apache/carbondata/core/datastore/page/encoding/RLECodecSuite.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/datastore/page/encoding/RLECodecSuite.java b/core/src/test/java/org/apache/carbondata/core/datastore/page/encoding/RLECodecSuite.java new file mode 100644 index 0000000..251376e --- /dev/null +++ b/core/src/test/java/org/apache/carbondata/core/datastore/page/encoding/RLECodecSuite.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.datastore.page.encoding; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +import org.apache.carbondata.core.datastore.page.ColumnPage; +import org.apache.carbondata.core.datastore.page.statistics.PrimitivePageStatsCollector; +import org.apache.carbondata.core.memory.MemoryException; +import org.apache.carbondata.core.metadata.datatype.DataType; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class RLECodecSuite { + + static class TestData { + private byte[] inputByteData; + private ColumnPage inputBytePage; + private byte[] expectedEncodedByteData; + + TestData(byte[] inputByteData, byte[] expectedEncodedByteData) throws IOException, MemoryException { + this.inputByteData = inputByteData; + inputBytePage = ColumnPage.newPage(DataType.BYTE, inputByteData.length); + inputBytePage.setStatsCollector(PrimitivePageStatsCollector.newInstance(DataType.BYTE, inputByteData.length, 0, 0)); + for (int i = 0; i < inputByteData.length; i++) { + inputBytePage.putData(i, inputByteData[i]); + } + this.expectedEncodedByteData = expectedEncodedByteData; + } + + } + + private static TestData data1; + private static TestData data2; + private static TestData data3; + + @BeforeClass public static void setUp() throws IOException, MemoryException { + setUpData1(); + setUpData2(); + setUpData3(); + } + + private static void setUpData1() throws IOException, MemoryException { + byte[] 
inputData = new byte[]{10, 10, 3, 4, 5, 6, 7, 7, 7, 7}; + ByteArrayOutputStream bao = new ByteArrayOutputStream(); + DataOutputStream stream = new DataOutputStream(bao); + stream.writeShort(2); + stream.writeByte(10); + stream.writeShort(5|0x8000); + stream.writeByte(3); + stream.writeByte(4); + stream.writeByte(5); + stream.writeByte(6); + stream.writeByte(7); + stream.writeShort(3); + stream.writeByte(7); + byte[] expectedEncodedByteData = bao.toByteArray(); + data1 = new TestData( + inputData, + expectedEncodedByteData); + } + + private static void setUpData2() throws IOException, MemoryException { + byte[] inputData = new byte[]{1, 2, 3, 4, 5, 6}; + ByteArrayOutputStream bao = new ByteArrayOutputStream(); + DataOutputStream stream = new DataOutputStream(bao); + stream.writeShort(6|0x8000); + stream.writeByte(1); + stream.writeByte(2); + stream.writeByte(3); + stream.writeByte(4); + stream.writeByte(5); + stream.writeByte(6); + byte[] expectedEncodedByteData = bao.toByteArray(); + data2 = new TestData( + inputData, + expectedEncodedByteData); + } + + private static void setUpData3() throws IOException, MemoryException { + byte[] inputData = new byte[]{10, 10, 10, 10, 10, 10}; + ByteArrayOutputStream bao = new ByteArrayOutputStream(); + DataOutputStream stream = new DataOutputStream(bao); + stream.writeShort(6); + stream.writeByte(10); + byte[] expectedEncodedByteData = bao.toByteArray(); + data3 = new TestData( + inputData, + expectedEncodedByteData); + } + + private void testBytePageEncode(ColumnPage inputPage, byte[] expectedEncodedBytes) + throws IOException, MemoryException { + RLECodec codec = new RLECodec(DataType.BYTE, inputPage.getPageSize()); + EncodedColumnPage out = codec.encode(inputPage); + byte[] encoded = out.getEncodedData(); + assertEquals(expectedEncodedBytes.length, encoded.length); + for (int i = 0; i < encoded.length; i++) { + assertEquals(expectedEncodedBytes[i], encoded[i]); + } + } + + private void testBytePageDecode(byte[] inputBytes, 
byte[] expectedDecodedBytes) throws IOException, MemoryException { + RLECodec codec = new RLECodec(DataType.BYTE, expectedDecodedBytes.length); + ColumnPage page = codec.decode(inputBytes, 0, inputBytes.length); + byte[] decoded = page.getBytePage(); + assertEquals(expectedDecodedBytes.length, decoded.length); + for (int i = 0; i < decoded.length; i++) { + assertEquals(expectedDecodedBytes[i], decoded[i]); + } + } + + @Test public void testBytePageEncode() throws MemoryException, IOException { + testBytePageEncode(data1.inputBytePage, data1.expectedEncodedByteData); + testBytePageEncode(data2.inputBytePage, data2.expectedEncodedByteData); + testBytePageEncode(data3.inputBytePage, data3.expectedEncodedByteData); + } + + @Test public void testBytePageDecode() throws IOException, MemoryException { + testBytePageDecode(data1.expectedEncodedByteData, data1.inputByteData); + testBytePageDecode(data2.expectedEncodedByteData, data2.inputByteData); + testBytePageDecode(data3.expectedEncodedByteData, data3.inputByteData); + } + + @AfterClass public static void tearDown() { + } +}
