fix double issue
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/434f32dd Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/434f32dd Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/434f32dd Branch: refs/heads/streaming_ingest Commit: 434f32ddbbd56cf59cbb8ca54229ad17451d2491 Parents: 15acd9d Author: QiangCai <[email protected]> Authored: Sat Jun 24 18:38:35 2017 +0800 Committer: QiangCai <[email protected]> Committed: Mon Jun 26 22:23:17 2017 +0800 ---------------------------------------------------------------------- .../page/encoding/DefaultEncodingStrategy.java | 11 +- .../encoding/UpscaleDeltaFloatingCodec.java | 198 ------------------- .../page/encoding/UpscaleFloatingCodec.java | 34 ++-- .../primitiveTypes/DoubleDataTypeTestCase.scala | 91 +++++++++ 4 files changed, 115 insertions(+), 219 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/434f32dd/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingStrategy.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingStrategy.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingStrategy.java index 94e1cea..f8e43fc 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingStrategy.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingStrategy.java @@ -131,15 +131,8 @@ public class DefaultEncodingStrategy extends EncodingStrategy { } else { // double DataType upscaleAdaptiveDataType = fitDataType(Math.pow(10, decimal) * absMaxValue, decimal); - DataType upscaleDiffDataType = - fitDataType(Math.pow(10, decimal) * (maxValue - minValue), decimal); - if (upscaleAdaptiveDataType.getSizeInBytes() <= upscaleDiffDataType.getSizeInBytes()) { - return UpscaleFloatingCodec.newInstance( - srcDataType, upscaleAdaptiveDataType, stats, compressor); - } else { - return UpscaleDeltaFloatingCodec.newInstance( - srcDataType, upscaleDiffDataType, stats, compressor); - } + return UpscaleFloatingCodec.newInstance( + srcDataType, upscaleAdaptiveDataType, stats, compressor); } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/434f32dd/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/UpscaleDeltaFloatingCodec.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/UpscaleDeltaFloatingCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/UpscaleDeltaFloatingCodec.java deleted file mode 100644 index e53346b..0000000 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/UpscaleDeltaFloatingCodec.java +++ /dev/null @@ -1,198 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.core.datastore.page.encoding; - -import java.math.BigDecimal; - -import org.apache.carbondata.core.datastore.compression.Compressor; -import org.apache.carbondata.core.datastore.page.ColumnPage; -import org.apache.carbondata.core.datastore.page.LazyColumnPage; -import org.apache.carbondata.core.datastore.page.PrimitiveCodec; -import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsVO; -import org.apache.carbondata.core.memory.MemoryException; -import org.apache.carbondata.core.metadata.datatype.DataType; - -/** - * Codec for floating point (float, double) data type page. - * This codec will upscale (multiple page value by decimal) to integer value, - * and do type casting to make storage minimum. - */ -public class UpscaleDeltaFloatingCodec extends AdaptiveCompressionCodec { - - private ColumnPage encodedPage; - - private BigDecimal max; - private double factor; - - public static ColumnPageCodec newInstance(DataType srcDataType, DataType targetDataType, - ColumnPageStatsVO stats, Compressor compressor) { - return new UpscaleDeltaFloatingCodec(srcDataType, targetDataType, stats, compressor); - } - - private UpscaleDeltaFloatingCodec(DataType srcDataType, DataType targetDataType, - ColumnPageStatsVO stats, Compressor compressor) { - super(srcDataType, targetDataType, stats, compressor); - this.max = BigDecimal.valueOf((double) stats.getMax()); - this.factor = Math.pow(10, stats.getDecimal()); - } - - @Override - public String getName() { - return "UpscaleDeltaFloatingCodec"; - } - - @Override - public byte[] encode(ColumnPage input) throws MemoryException { - if (srcDataType.equals(targetDataType)) { - return input.compress(compressor); - } else { - encodedPage = ColumnPage.newPage(targetDataType, input.getPageSize()); - input.encode(codec); - byte[] result = encodedPage.compress(compressor); - encodedPage.freeMemory(); - return result; - } - } - - @Override - public ColumnPage decode(byte[] input, int offset, int length) throws MemoryException { - if (srcDataType.equals(targetDataType)) { - return ColumnPage.decompress(compressor, targetDataType, input, offset, length); - } else { - ColumnPage page = ColumnPage.decompress(compressor, targetDataType, input, offset, length); - return LazyColumnPage.newPage(page, codec); - } - } - - // encoded value = (10 power of decimal) * ((max value of page) - (page value)) - private PrimitiveCodec codec = new PrimitiveCodec() { - @Override - public void encode(int rowId, byte value) { - // this codec is for floating point type only - throw new RuntimeException("internal error: " + debugInfo()); - } - - @Override - public void encode(int rowId, short value) { - // this codec is for floating point type only - throw new RuntimeException("internal error: " + debugInfo()); - } - - @Override - public void encode(int rowId, int value) { - // this codec is for floating point type only - throw new RuntimeException("internal error: " + debugInfo()); - } - - @Override - public void encode(int rowId, long value) { - // this codec is for floating point type only - throw new RuntimeException("internal error: " + debugInfo()); - } - - @Override - public void encode(int rowId, float value) { - double diff = max.subtract(BigDecimal.valueOf(value)).doubleValue(); - switch (targetDataType) { - case BYTE: - encodedPage.putByte(rowId, (byte)(Math.round(factor * diff))); - break; - case SHORT: - encodedPage.putShort(rowId, (short)(Math.round(factor * diff))); - break; - case INT: - encodedPage.putInt(rowId, (int)(Math.round(factor * diff))); - break; - case LONG: - encodedPage.putLong(rowId, (long)(Math.round(factor * diff))); - break; - default: - throw new RuntimeException("internal error: " + debugInfo()); - } - } - - @Override - public void encode(int rowId, double value) { - double diff = max.subtract(BigDecimal.valueOf(value)).doubleValue(); - switch (targetDataType) { - case BYTE: - encodedPage.putByte(rowId, (byte)(Math.round(factor * diff))); - break; - case SHORT: - encodedPage.putShort(rowId, (short)(Math.round(factor * diff))); - break; - case INT: - encodedPage.putInt(rowId, (int)(Math.round(factor * diff))); - break; - case LONG: - encodedPage.putLong(rowId, (long)(Math.round(factor * diff))); - break; - default: - throw new RuntimeException("internal error: " + debugInfo()); - } - } - - @Override - public long decodeLong(byte value) { - // this codec is for floating point type only - throw new RuntimeException("internal error: " + debugInfo()); - } - - @Override - public long decodeLong(short value) { - // this codec is for floating point type only - throw new RuntimeException("internal error: " + debugInfo()); - } - - @Override - public long decodeLong(int value) { - // this codec is for floating point type only - throw new RuntimeException("internal error: " + debugInfo()); - } - - @Override - public double decodeDouble(byte value) { - return max.subtract(BigDecimal.valueOf(value / factor)).doubleValue(); - } - - @Override - public double decodeDouble(short value) { - return max.subtract(BigDecimal.valueOf(value / factor)).doubleValue(); - } - - @Override - public double decodeDouble(int value) { - return max.subtract(BigDecimal.valueOf(value / factor)).doubleValue(); - } - - @Override - public double decodeDouble(long value) { - return max.subtract(BigDecimal.valueOf(value / factor)).doubleValue(); - } - - @Override - public double decodeDouble(float value) { - throw new RuntimeException("internal error: " + debugInfo()); - } - - @Override - public double decodeDouble(double value) { - throw new RuntimeException("internal error: " + debugInfo()); - } - }; -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/434f32dd/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/UpscaleFloatingCodec.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/UpscaleFloatingCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/UpscaleFloatingCodec.java index 56c4508..73898af 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/UpscaleFloatingCodec.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/UpscaleFloatingCodec.java @@ -17,6 +17,8 @@ package org.apache.carbondata.core.datastore.page.encoding; +import java.math.BigDecimal; + import org.apache.carbondata.core.datastore.compression.Compressor; import org.apache.carbondata.core.datastore.page.ColumnPage; import org.apache.carbondata.core.datastore.page.LazyColumnPage; @@ -105,16 +107,20 @@ public class UpscaleFloatingCodec extends AdaptiveCompressionCodec { public void encode(int rowId, float value) { switch (targetDataType) { case BYTE: - encodedPage.putByte(rowId, (byte)(Math.round(factor * value))); + encodedPage.putByte(rowId, + BigDecimal.valueOf(value).multiply(BigDecimal.valueOf(factor)).byteValue()); break; case SHORT: - encodedPage.putShort(rowId, (short)(Math.round(factor * value))); + encodedPage.putShort(rowId, + BigDecimal.valueOf(value).multiply(BigDecimal.valueOf(factor)).shortValue()); break; case INT: - encodedPage.putInt(rowId, (int)(Math.round(factor * value))); + encodedPage.putInt(rowId, + BigDecimal.valueOf(value).multiply(BigDecimal.valueOf(factor)).intValue()); break; case LONG: - encodedPage.putLong(rowId, (long)(Math.round(factor * value))); + encodedPage.putLong(rowId, + BigDecimal.valueOf(value).multiply(BigDecimal.valueOf(factor)).longValue()); break; default: throw new RuntimeException("internal error: " + debugInfo()); @@ -125,16 +131,20 @@ public class UpscaleFloatingCodec extends AdaptiveCompressionCodec { public void encode(int rowId, double value) { switch (targetDataType) { case BYTE: - encodedPage.putByte(rowId, (byte)(Math.round(factor * value))); + encodedPage.putByte(rowId, + BigDecimal.valueOf(value).multiply(BigDecimal.valueOf(factor)).byteValue()); break; case SHORT: - encodedPage.putShort(rowId, (short)(Math.round(factor * value))); + encodedPage.putShort(rowId, + BigDecimal.valueOf(value).multiply(BigDecimal.valueOf(factor)).shortValue()); break; case INT: - encodedPage.putInt(rowId, (int)(Math.round(factor * value))); + encodedPage.putInt(rowId, + BigDecimal.valueOf(value).multiply(BigDecimal.valueOf(factor)).intValue()); break; case LONG: - encodedPage.putLong(rowId, (long)(Math.round(factor * value))); + encodedPage.putLong(rowId, + BigDecimal.valueOf(value).multiply(BigDecimal.valueOf(factor)).longValue()); break; case DOUBLE: encodedPage.putDouble(rowId, value); @@ -161,22 +171,22 @@ public class UpscaleFloatingCodec extends AdaptiveCompressionCodec { @Override public double decodeDouble(byte value) { - return value / factor; + return BigDecimal.valueOf(value).divide(BigDecimal.valueOf(factor)).doubleValue(); } @Override public double decodeDouble(short value) { - return value / factor; + return BigDecimal.valueOf(value).divide(BigDecimal.valueOf(factor)).doubleValue(); } @Override public double decodeDouble(int value) { - return value / factor; + return BigDecimal.valueOf(value).divide(BigDecimal.valueOf(factor)).doubleValue(); } @Override public double decodeDouble(long value) { - return value / factor; + return BigDecimal.valueOf(value).divide(BigDecimal.valueOf(factor)).doubleValue(); } @Override http://git-wip-us.apache.org/repos/asf/carbondata/blob/434f32dd/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/primitiveTypes/DoubleDataTypeTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/primitiveTypes/DoubleDataTypeTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/primitiveTypes/DoubleDataTypeTestCase.scala new file mode 100644 index 0000000..e72151d --- /dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/primitiveTypes/DoubleDataTypeTestCase.scala @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.integration.spark.testsuite.primitiveTypes + +import java.util.Random + +import org.apache.spark.sql.{DataFrame, Row, SaveMode} +import org.apache.spark.sql.common.util.QueryTest +import org.apache.spark.sql.types._ +import org.scalatest.BeforeAndAfterAll + +/** + * Test Class for filter query on Double datatypes + */ +class DoubleDataTypeTestCase extends QueryTest with BeforeAndAfterAll { + + lazy val df: DataFrame = generateDataFrame + + private def generateDataFrame(): DataFrame = { + val r = new Random() + val rdd = sqlContext.sparkContext + .parallelize(1 to 10, 2) + .map { x => + Row(x, "London" + (x % 2), x.toDouble / 13, x.toDouble / 11) + } + + val schema = StructType( + Seq( + StructField("id", IntegerType, nullable = false), + StructField("city", StringType, nullable = false), + StructField("m1", DoubleType, nullable = false), + StructField("m2", DoubleType, nullable = false) + ) + ) + + sqlContext.createDataFrame(rdd, schema) + } + + override def beforeAll { + sql("drop table if exists doubleTypeCarbonTable") + sql("drop table if exists doubleTypeHiveTable") + + df.write + .format("carbondata") + .option("tableName", "doubleTypeCarbonTable") + .option("tempCSV", "false") + .option("single_pass", "true") + .option("dictionary_exclude", "city") + .option("table_blocksize", "32") + .mode(SaveMode.Overwrite) + .save() + + df.write + .mode(SaveMode.Overwrite) + .saveAsTable("doubleTypeHiveTable") + + } + + test("detail query") { + checkAnswer(sql("select * from doubleTypeCarbonTable order by id"), + sql("select * from doubleTypeHiveTable order by id")) + + } + +// test("agg query") { +// checkAnswer(sql("select city, sum(m1), avg(m1), count(m1), max(m1), min(m1) from doubleTypeCarbonTable group by city"), +// sql("select city, sum(m1), avg(m1), count(m1), max(m1), min(m1) from doubleTypeHiveTable group by city")) +// +// checkAnswer(sql("select city, sum(m2), avg(m2), count(m2), max(m2), min(m2) from doubleTypeCarbonTable group by city"), +// sql("select city, sum(m2), avg(m2), count(m2), max(m2), min(m2) from doubleTypeHiveTable group by city")) +// } + + override def afterAll { + sql("drop table if exists doubleTypeCarbonTable") + sql("drop table if exists doubleTypeHiveTable") + } +} \ No newline at end of file
