use raw compression
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/eadfea78 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/eadfea78 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/eadfea78 Branch: refs/heads/streaming_ingest Commit: eadfea789b0fd63c4adcd4f7f335530a98dfbb78 Parents: a459dea Author: jackylk <[email protected]> Authored: Tue Jun 27 16:54:54 2017 +0800 Committer: QiangCai <[email protected]> Committed: Tue Jun 27 23:56:05 2017 +0800 ---------------------------------------------------------------------- .../core/datastore/compression/Compressor.java | 5 + .../datastore/compression/SnappyCompressor.java | 10 + .../core/datastore/page/ColumnPage.java | 3 +- .../page/UnsafeFixLengthColumnPage.java | 20 +- .../page/encoding/AdaptiveCompressionCodec.java | 4 +- .../page/encoding/AdaptiveIntegerCodec.java | 18 +- .../page/encoding/ColumnPageCodec.java | 4 +- .../page/encoding/CompressionCodec.java | 57 ------ .../page/encoding/DefaultEncodingStrategy.java | 58 +----- .../page/encoding/DeltaIntegerCodec.java | 18 +- .../page/encoding/DirectCompressCodec.java | 58 ++++++ .../page/encoding/UpscaleFloatingCodec.java | 202 ------------------- .../core/memory/UnsafeMemoryManager.java | 9 +- .../store/CarbonFactDataHandlerColumnar.java | 3 +- .../processing/store/TablePageEncoder.java | 10 +- 15 files changed, 137 insertions(+), 342 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/eadfea78/core/src/main/java/org/apache/carbondata/core/datastore/compression/Compressor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/Compressor.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/Compressor.java index 8da7c8b..2bc8678 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/Compressor.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/Compressor.java @@ -17,6 +17,8 @@ package org.apache.carbondata.core.datastore.compression; +import java.io.IOException; + public interface Compressor { byte[] compressByte(byte[] unCompInput); @@ -55,4 +57,7 @@ public interface Compressor { double[] unCompressDouble(byte[] compInput, int offset, int length); + long rawCompress(long inputAddress, int inputSize, long outputAddress) throws IOException; + + int maxCompressedLength(int inputSize); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/eadfea78/core/src/main/java/org/apache/carbondata/core/datastore/compression/SnappyCompressor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/SnappyCompressor.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/SnappyCompressor.java index f255339..f8a2f4f 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/SnappyCompressor.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/SnappyCompressor.java @@ -217,4 +217,14 @@ public class SnappyCompressor implements Compressor { } return null; } + + @Override + public long rawCompress(long inputAddress, int inputSize, long outputAddress) throws IOException { + return snappyNative.rawCompress(inputAddress, inputSize, outputAddress); + } + + @Override + public int maxCompressedLength(int inputSize) { + return snappyNative.maxCompressedLength(inputSize); + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/eadfea78/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java index 155b4ee..730243c 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java @@ -17,6 +17,7 @@ package org.apache.carbondata.core.datastore.page; +import java.io.IOException; import java.math.BigDecimal; import java.util.BitSet; @@ -474,7 +475,7 @@ public abstract class ColumnPage { /** * Compress page data using specified compressor */ - public byte[] compress(Compressor compressor) { + public byte[] compress(Compressor compressor) throws MemoryException, IOException { switch (dataType) { case BYTE: return compressor.compressByte(getBytePage()); http://git-wip-us.apache.org/repos/asf/carbondata/blob/eadfea78/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java index 6bd6d31..9f71768 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java @@ -17,6 +17,7 @@ package org.apache.carbondata.core.datastore.page; +import java.io.IOException; import java.math.BigDecimal; import org.apache.carbondata.core.datastore.compression.Compressor; @@ -354,9 +355,22 @@ public class UnsafeFixLengthColumnPage extends ColumnPage { } @Override - public byte[] compress(Compressor compressor) { - // TODO: use zero-copy raw compression - return super.compress(compressor); + public byte[] compress(Compressor compressor) throws MemoryException, IOException { + if (UnsafeMemoryManager.isOffHeap()) { + // use raw compression and copy to byte[] + int inputSize = pageSize << dataType.getSizeBits(); + int compressedMaxSize = compressor.maxCompressedLength(inputSize); + MemoryBlock compressed = UnsafeMemoryManager.allocateMemoryWithRetry(compressedMaxSize); + long outSize = compressor.rawCompress(baseOffset, inputSize, compressed.getBaseOffset()); + assert outSize < Integer.MAX_VALUE; + byte[] output = new byte[(int) outSize]; + CarbonUnsafe.unsafe.copyMemory(compressed.getBaseObject(), compressed.getBaseOffset(), output, + CarbonUnsafe.BYTE_ARRAY_OFFSET, outSize); + UnsafeMemoryManager.INSTANCE.freeMemory(compressed); + return output; + } else { + return super.compress(compressor); + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/eadfea78/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveCompressionCodec.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveCompressionCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveCompressionCodec.java index 6127583..2e8eff2 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveCompressionCodec.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveCompressionCodec.java @@ -17,6 +17,8 @@ package org.apache.carbondata.core.datastore.page.encoding; +import java.io.IOException; + import org.apache.carbondata.core.datastore.compression.Compressor; import org.apache.carbondata.core.datastore.page.ColumnPage; import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsVO; @@ -53,7 +55,7 @@ public abstract class AdaptiveCompressionCodec implements ColumnPageCodec { public abstract String getName(); - public abstract byte[] encode(ColumnPage input) throws MemoryException; + public abstract byte[] encode(ColumnPage input) throws MemoryException, IOException; public abstract ColumnPage decode(byte[] input, int offset, int length) throws MemoryException; http://git-wip-us.apache.org/repos/asf/carbondata/blob/eadfea78/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveIntegerCodec.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveIntegerCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveIntegerCodec.java index a12ce00..3d56f0c 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveIntegerCodec.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveIntegerCodec.java @@ -17,6 +17,8 @@ package org.apache.carbondata.core.datastore.page.encoding; +import java.io.IOException; + import org.apache.carbondata.core.datastore.compression.Compressor; import org.apache.carbondata.core.datastore.page.ColumnPage; import org.apache.carbondata.core.datastore.page.LazyColumnPage; @@ -49,16 +51,12 @@ class AdaptiveIntegerCodec extends AdaptiveCompressionCodec { } @Override - public byte[] encode(ColumnPage input) throws MemoryException { - if (srcDataType.equals(targetDataType)) { - return input.compress(compressor); - } else { - encodedPage = ColumnPage.newPage(targetDataType, input.getPageSize()); - input.encode(codec); - byte[] result = encodedPage.compress(compressor); - encodedPage.freeMemory(); - return result; - } + public byte[] encode(ColumnPage input) throws MemoryException, IOException { + encodedPage = ColumnPage.newPage(targetDataType, input.getPageSize()); + input.encode(codec); + byte[] result = encodedPage.compress(compressor); + encodedPage.freeMemory(); + return result; } @Override http://git-wip-us.apache.org/repos/asf/carbondata/blob/eadfea78/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageCodec.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageCodec.java index afba173..36d5989 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageCodec.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageCodec.java @@ -17,6 +17,8 @@ package org.apache.carbondata.core.datastore.page.encoding; +import java.io.IOException; + import org.apache.carbondata.core.datastore.page.ColumnPage; import org.apache.carbondata.core.memory.MemoryException; @@ -36,7 +38,7 @@ public interface ColumnPageCodec { * @param input column page to apply * @return encoded data */ - byte[] encode(ColumnPage input) throws MemoryException; + byte[] encode(ColumnPage input) throws MemoryException, IOException; /** * decode byte array from offset to a column page http://git-wip-us.apache.org/repos/asf/carbondata/blob/eadfea78/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/CompressionCodec.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/CompressionCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/CompressionCodec.java deleted file mode 100644 index 722ba21..0000000 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/CompressionCodec.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.core.datastore.page.encoding; - -import org.apache.carbondata.core.datastore.compression.Compressor; -import org.apache.carbondata.core.datastore.page.ColumnPage; -import org.apache.carbondata.core.memory.MemoryException; -import org.apache.carbondata.core.metadata.datatype.DataType; - -/** - * Codec for variable length data type (decimal, string). - * This codec will flatten the variable length data before applying compression. - */ -public class CompressionCodec implements ColumnPageCodec { - - private Compressor compressor; - private DataType dataType; - - private CompressionCodec(DataType dataType, Compressor compressor) { - this.compressor = compressor; - this.dataType = dataType; - } - - public static CompressionCodec newInstance(DataType dataType, Compressor compressor) { - return new CompressionCodec(dataType, compressor); - } - - @Override - public String getName() { - return "CompressionCodec"; - } - - @Override - public byte[] encode(ColumnPage input) { - return input.compress(compressor); - } - - @Override - public ColumnPage decode(byte[] input, int offset, int length) throws MemoryException { - return ColumnPage.decompress(compressor, dataType, input, offset, length); - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/eadfea78/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingStrategy.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingStrategy.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingStrategy.java index f8e43fc..3818263 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingStrategy.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingStrategy.java @@ -61,29 +61,11 @@ public class DefaultEncodingStrategy extends EncodingStrategy { } } - // fit the input double value into minimum data type - private DataType fitDataType(double value, int decimal) { - DataType dataType = DataType.DOUBLE; - if (decimal == 0) { - if (value <= Byte.MAX_VALUE && value >= Byte.MIN_VALUE) { - dataType = DataType.BYTE; - } else if (value <= Short.MAX_VALUE && value >= Short.MIN_VALUE) { - dataType = DataType.SHORT; - } else if (value <= THREE_BYTES_MAX && value >= THREE_BYTES_MIN) { - return DataType.SHORT_INT; - } else if (value <= Integer.MAX_VALUE && value >= Integer.MIN_VALUE) { - dataType = DataType.INT; - } else if (value <= Long.MAX_VALUE && value >= Long.MIN_VALUE) { - dataType = DataType.LONG; - } - } - return dataType; - } - // choose between adaptive encoder or delta adaptive encoder, based on whose target data type // size is smaller @Override ColumnPageCodec newCodecForIntegerType(ColumnPageStatsVO stats) { + DataType srcDataType = stats.getDataType(); DataType adaptiveDataType = fitDataType((long)stats.getMax(), (long)stats.getMin()); DataType deltaDataType; @@ -94,6 +76,11 @@ public class DefaultEncodingStrategy extends EncodingStrategy { } else { deltaDataType = fitDataType((long) stats.getMax() - (long) stats.getMin()); } + if (Math.min(adaptiveDataType.getSizeInBytes(), deltaDataType.getSizeInBytes()) == + srcDataType.getSizeInBytes()) { + // no effect to use adaptive or delta, use compression only + return DirectCompressCodec.newInstance(srcDataType, compressor); + } if (adaptiveDataType.getSizeInBytes() <= deltaDataType.getSizeInBytes()) { // choose adaptive encoding return AdaptiveIntegerCodec.newInstance( @@ -104,46 +91,19 @@ public class DefaultEncodingStrategy extends EncodingStrategy { } } - // choose between upscale adaptive encoder or upscale delta adaptive encoder, - // based on whose target data type size is smaller @Override ColumnPageCodec newCodecForFloatingType(ColumnPageStatsVO stats) { - DataType srcDataType = stats.getDataType(); - double maxValue = (double) stats.getMax(); - double minValue = (double) stats.getMin(); - int decimal = stats.getDecimal(); - - //Here we should use the Max abs as max to getDatatype, let's say -1 and -10000000, -1 is max, - //but we can't use -1 to getDatatype, we should use -10000000. - double absMaxValue = Math.abs(maxValue) >= Math.abs(minValue) ? maxValue : minValue; - - if (decimal == 0) { - // short, int, long - DataType adaptiveDataType = fitDataType(absMaxValue, decimal); - DataType deltaDataType = fitDataType(maxValue - minValue, decimal); - if (adaptiveDataType.getSizeInBytes() <= deltaDataType.getSizeInBytes()) { - // choose adaptive encoding - return AdaptiveIntegerCodec.newInstance(srcDataType, adaptiveDataType, stats, compressor); - } else { - // choose delta adaptive encoding - return DeltaIntegerCodec.newInstance(srcDataType, deltaDataType, stats, compressor); - } - } else { - // double - DataType upscaleAdaptiveDataType = fitDataType(Math.pow(10, decimal) * absMaxValue, decimal); - return UpscaleFloatingCodec.newInstance( - srcDataType, upscaleAdaptiveDataType, stats, compressor); - } + return DirectCompressCodec.newInstance(stats.getDataType(), compressor); } // for decimal, currently it is a very basic implementation @Override ColumnPageCodec newCodecForDecimalType(ColumnPageStatsVO stats) { - return CompressionCodec.newInstance(stats.getDataType(), compressor); + return DirectCompressCodec.newInstance(stats.getDataType(), compressor); } @Override ColumnPageCodec newCodecForByteArrayType(ColumnPageStatsVO stats) { - return CompressionCodec.newInstance(stats.getDataType(), compressor); + return DirectCompressCodec.newInstance(stats.getDataType(), compressor); } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/eadfea78/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DeltaIntegerCodec.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DeltaIntegerCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DeltaIntegerCodec.java index 2036df5..b77f7a2 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DeltaIntegerCodec.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DeltaIntegerCodec.java @@ -17,6 +17,8 @@ package org.apache.carbondata.core.datastore.page.encoding; +import java.io.IOException; + import org.apache.carbondata.core.datastore.compression.Compressor; import org.apache.carbondata.core.datastore.page.ColumnPage; import org.apache.carbondata.core.datastore.page.LazyColumnPage; @@ -64,16 +66,12 @@ public class DeltaIntegerCodec extends AdaptiveCompressionCodec { } @Override - public byte[] encode(ColumnPage input) throws MemoryException { - if (srcDataType.equals(targetDataType)) { - return input.compress(compressor); - } else { - encodedPage = ColumnPage.newPage(targetDataType, input.getPageSize()); - input.encode(codec); - byte[] result = encodedPage.compress(compressor); - encodedPage.freeMemory(); - return result; - } + public byte[] encode(ColumnPage input) throws MemoryException, IOException { + encodedPage = ColumnPage.newPage(targetDataType, input.getPageSize()); + input.encode(codec); + byte[] result = encodedPage.compress(compressor); + encodedPage.freeMemory(); + return result; } @Override http://git-wip-us.apache.org/repos/asf/carbondata/blob/eadfea78/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DirectCompressCodec.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DirectCompressCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DirectCompressCodec.java new file mode 100644 index 0000000..dcb9b7c --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DirectCompressCodec.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.datastore.page.encoding; + +import java.io.IOException; + +import org.apache.carbondata.core.datastore.compression.Compressor; +import org.apache.carbondata.core.datastore.page.ColumnPage; +import org.apache.carbondata.core.memory.MemoryException; +import org.apache.carbondata.core.metadata.datatype.DataType; + +/** + * This codec directly apply compression on the input data + */ +public class DirectCompressCodec implements ColumnPageCodec { + + private Compressor compressor; + private DataType dataType; + + private DirectCompressCodec(DataType dataType, Compressor compressor) { + this.compressor = compressor; + this.dataType = dataType; + } + + public static DirectCompressCodec newInstance(DataType dataType, Compressor compressor) { + return new DirectCompressCodec(dataType, compressor); + } + + @Override + public String getName() { + return "DirectCompressCodec"; + } + + @Override + public byte[] encode(ColumnPage input) throws IOException, MemoryException { + return input.compress(compressor); + } + + @Override + public ColumnPage decode(byte[] input, int offset, int length) throws MemoryException { + return ColumnPage.decompress(compressor, dataType, input, offset, length); + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/eadfea78/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/UpscaleFloatingCodec.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/UpscaleFloatingCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/UpscaleFloatingCodec.java deleted file mode 100644 index 73898af..0000000 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/UpscaleFloatingCodec.java +++ /dev/null @@ -1,202 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.core.datastore.page.encoding; - -import java.math.BigDecimal; - -import org.apache.carbondata.core.datastore.compression.Compressor; -import org.apache.carbondata.core.datastore.page.ColumnPage; -import org.apache.carbondata.core.datastore.page.LazyColumnPage; -import org.apache.carbondata.core.datastore.page.PrimitiveCodec; -import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsVO; -import org.apache.carbondata.core.memory.MemoryException; -import org.apache.carbondata.core.metadata.datatype.DataType; - -/** - * Codec for floating point (float, double) data type page. - * This codec will upscale the diff from page max value to integer value, - * and do type casting to make storage minimum. - */ -public class UpscaleFloatingCodec extends AdaptiveCompressionCodec { - - private ColumnPage encodedPage; - private double factor; - - public static ColumnPageCodec newInstance(DataType srcDataType, DataType targetDataType, - ColumnPageStatsVO stats, Compressor compressor) { - return new UpscaleFloatingCodec(srcDataType, targetDataType, stats, compressor); - } - - private UpscaleFloatingCodec(DataType srcDataType, DataType targetDataType, - ColumnPageStatsVO stats, Compressor compressor) { - super(srcDataType, targetDataType, stats, compressor); - this.factor = Math.pow(10, stats.getDecimal()); - } - - @Override - public String getName() { - return "UpscaleFloatingCodec"; - } - - @Override - public byte[] encode(ColumnPage input) throws MemoryException { - if (targetDataType.equals(srcDataType)) { - return input.compress(compressor); - } else { - encodedPage = ColumnPage.newPage(targetDataType, input.getPageSize()); - input.encode(codec); - byte[] result = encodedPage.compress(compressor); - encodedPage.freeMemory(); - return result; - } - } - - - @Override - public ColumnPage decode(byte[] input, int offset, int length) throws MemoryException { - if (srcDataType.equals(targetDataType)) { - return ColumnPage.decompress(compressor, targetDataType, input, offset, length); - } else { - ColumnPage page = ColumnPage.decompress(compressor, targetDataType, input, offset, length); - return LazyColumnPage.newPage(page, codec); - } - } - - // encoded value = (10 power of decimal) * (page value) - private PrimitiveCodec codec = new PrimitiveCodec() { - @Override - public void encode(int rowId, byte value) { - // this codec is for floating point type only - throw new RuntimeException("internal error: " + debugInfo()); - } - - @Override - public void encode(int rowId, short value) { - // this codec is for floating point type only - throw new RuntimeException("internal error: " + debugInfo()); - } - - @Override - public void encode(int rowId, int value) { - // this codec is for floating point type only - throw new RuntimeException("internal error: " + debugInfo()); - } - - @Override - public void encode(int rowId, long value) { - // this codec is for floating point type only - throw new RuntimeException("internal error: " + debugInfo()); - } - - @Override - public void encode(int rowId, float value) { - switch (targetDataType) { - case BYTE: - encodedPage.putByte(rowId, - BigDecimal.valueOf(value).multiply(BigDecimal.valueOf(factor)).byteValue()); - break; - case SHORT: - encodedPage.putShort(rowId, - BigDecimal.valueOf(value).multiply(BigDecimal.valueOf(factor)).shortValue()); - break; - case INT: - encodedPage.putInt(rowId, - BigDecimal.valueOf(value).multiply(BigDecimal.valueOf(factor)).intValue()); - break; - case LONG: - encodedPage.putLong(rowId, - BigDecimal.valueOf(value).multiply(BigDecimal.valueOf(factor)).longValue()); - break; - default: - throw new RuntimeException("internal error: " + debugInfo()); - } - } - - @Override - public void encode(int rowId, double value) { - switch (targetDataType) { - case BYTE: - encodedPage.putByte(rowId, - BigDecimal.valueOf(value).multiply(BigDecimal.valueOf(factor)).byteValue()); - break; - case SHORT: - encodedPage.putShort(rowId, - BigDecimal.valueOf(value).multiply(BigDecimal.valueOf(factor)).shortValue()); - break; - case INT: - encodedPage.putInt(rowId, - BigDecimal.valueOf(value).multiply(BigDecimal.valueOf(factor)).intValue()); - break; - case LONG: - encodedPage.putLong(rowId, - BigDecimal.valueOf(value).multiply(BigDecimal.valueOf(factor)).longValue()); - break; - case DOUBLE: - encodedPage.putDouble(rowId, value); - break; - default: - throw new RuntimeException("internal error: " + debugInfo()); - } - } - - @Override - public long decodeLong(byte value) { - throw new RuntimeException("internal error: " + debugInfo()); - } - - @Override - public long decodeLong(short value) { - throw new RuntimeException("internal error: " + debugInfo()); - } - - @Override - public long decodeLong(int value) { - throw new RuntimeException("internal error: " + debugInfo()); - } - - @Override - public double decodeDouble(byte value) { - return BigDecimal.valueOf(value).divide(BigDecimal.valueOf(factor)).doubleValue(); - } - - @Override - public double decodeDouble(short value) { - return BigDecimal.valueOf(value).divide(BigDecimal.valueOf(factor)).doubleValue(); - } - - @Override - public double decodeDouble(int value) { - return BigDecimal.valueOf(value).divide(BigDecimal.valueOf(factor)).doubleValue(); - } - - @Override - public double decodeDouble(long value) { - return BigDecimal.valueOf(value).divide(BigDecimal.valueOf(factor)).doubleValue(); - } - - @Override - public double decodeDouble(float value) { - throw new RuntimeException("internal error: " + debugInfo()); - } - - @Override - public double decodeDouble(double value) { - throw new RuntimeException("internal error: " + debugInfo()); - } - }; -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/eadfea78/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java index 90cbe75..28e63a9 100644 --- a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java +++ b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java @@ -33,6 +33,9 @@ public class UnsafeMemoryManager { private static final LogService LOGGER = LogServiceFactory.getLogService(UnsafeMemoryManager.class.getName()); + private static boolean offHeap = Boolean.parseBoolean(CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, + CarbonCommonConstants.ENABLE_OFFHEAP_SORT_DEFAULT)); static { long size; try { @@ -50,9 +53,6 @@ public class UnsafeMemoryManager { + "so setting default value to " + size); } - boolean offHeap = Boolean.parseBoolean(CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, - CarbonCommonConstants.ENABLE_OFFHEAP_SORT_DEFAULT)); long takenSize = size * 1024 * 1024; MemoryAllocator allocator; if (offHeap) { @@ -159,4 +159,7 @@ public class UnsafeMemoryManager { return baseBlock; } + public static boolean isOffHeap() { + return offHeap; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/eadfea78/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java index 300ff0c..01e3ab6 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java @@ -18,6 +18,7 @@ package org.apache.carbondata.processing.store; import java.io.File; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; @@ -336,7 +337,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { * generate the NodeHolder from the input rows (one page in case of V3 format) */ private NodeHolder processDataRows(List<CarbonRow> dataRows) - throws CarbonDataWriterException, KeyGenException, MemoryException { + throws CarbonDataWriterException, KeyGenException, MemoryException, IOException { if (dataRows.size() == 0) { return new NodeHolder(); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/eadfea78/processing/src/main/java/org/apache/carbondata/processing/store/TablePageEncoder.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/TablePageEncoder.java b/processing/src/main/java/org/apache/carbondata/processing/store/TablePageEncoder.java index 608f578..8547845 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/TablePageEncoder.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/TablePageEncoder.java @@ -17,6 +17,7 @@ package org.apache.carbondata.processing.store; +import java.io.IOException; import java.util.Iterator; import org.apache.carbondata.core.datastore.TableSpec; @@ -39,7 +40,7 @@ import org.apache.carbondata.core.util.ByteUtil; import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; -public class TablePageEncoder { +class TablePageEncoder { private ColumnarFormatVersion version; @@ -49,14 +50,15 @@ public class TablePageEncoder { private static final EncodingStrategy encodingStrategy = new DefaultEncodingStrategy(); - public TablePageEncoder(CarbonFactDataHandlerModel model) { + TablePageEncoder(CarbonFactDataHandlerModel model) { this.version = CarbonProperties.getInstance().getFormatVersion(); this.model = model; this.isUseInvertedIndex = model.getIsUseInvertedIndex(); } // function to apply all columns in one table page - public EncodedData encode(TablePage tablePage) throws KeyGenException, MemoryException { + EncodedData encode(TablePage tablePage) + throws KeyGenException, MemoryException, IOException { EncodedData encodedData = new EncodedData(); encodeAndCompressDimensions(tablePage, encodedData); encodeAndCompressMeasures(tablePage, encodedData); @@ -65,7 +67,7 @@ public class TablePageEncoder { // apply measure and set encodedData in `encodedData` private void encodeAndCompressMeasures(TablePage tablePage, EncodedData encodedData) - throws MemoryException { + throws MemoryException, IOException { ColumnPage[] measurePage = tablePage.getMeasurePage(); byte[][] encodedMeasures = new byte[measurePage.length][]; for (int i = 0; i < measurePage.length; i++) {
