[
https://issues.apache.org/jira/browse/HUDI-6033?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
voon updated HUDI-6033:
-----------------------
Summary: Fix to DECIMAL(p, s) schema evolution when reading avro log files
when scale is lost (was: Fix DECIMAL/FLOAT to DECIMAL(p, s) schema evolution
when reading avro log files)
> Fix to DECIMAL(p, s) schema evolution when reading avro log files when scale
> is lost
> ------------------------------------------------------------------------------------
>
> Key: HUDI-6033
> URL: https://issues.apache.org/jira/browse/HUDI-6033
> Project: Apache Hudi
> Issue Type: Bug
> Reporter: voon
> Assignee: voon
> Priority: Major
> Labels: pull-request-available
>
> This issue only exists in MOR tables.
>
> When casting DECIMAL/FLOAT to DECIMAL(p, s), if a row's value has more
> decimal places than the target scale (s), the error below is thrown. In other
> words, the error occurs whenever scale is lost in the cast from the source
> type to the destination type.
>
> For example, a float with 3 decimal places (dp), e.g. 3.123, when cast to
> DECIMAL(3, 2) throws the error below when the row/column is read.
>
> {code:java}
> Caused by: org.apache.hudi.exception.HoodieException: Exception when reading log file
> at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternalV1(AbstractHoodieLogRecordReader.java:375)
> at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternal(AbstractHoodieLogRecordReader.java:222)
> at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.performScan(HoodieMergedLogRecordScanner.java:199)
> at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:115)
> at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:74)
> at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner$Builder.build(HoodieMergedLogRecordScanner.java:465)
> at org.apache.hudi.LogFileIterator$.scanLog(Iterators.scala:326)
> at org.apache.hudi.LogFileIterator.<init>(Iterators.scala:92)
> at org.apache.hudi.HoodieMergeOnReadRDD.compute(HoodieMergeOnReadRDD.scala:90)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
> at org.apache.spark.scheduler.Task.run(Task.scala:131)
> at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:750)
> Caused by: java.lang.ArithmeticException: Rounding necessary
> at java.math.BigDecimal.commonNeedIncrement(BigDecimal.java:4179)
> at java.math.BigDecimal.needIncrement(BigDecimal.java:4235)
> at java.math.BigDecimal.divideAndRound(BigDecimal.java:4143)
> at java.math.BigDecimal.setScale(BigDecimal.java:2455)
> at java.math.BigDecimal.setScale(BigDecimal.java:2515)
> at org.apache.hudi.avro.HoodieAvroUtils.rewritePrimaryTypeWithDiffSchemaType(HoodieAvroUtils.java:1032)
> at org.apache.hudi.avro.HoodieAvroUtils.rewritePrimaryType(HoodieAvroUtils.java:954)
> at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchemaInternal(HoodieAvroUtils.java:899)
> at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:834)
> at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchemaInternal(HoodieAvroUtils.java:897)
> at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:834)
> at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchemaInternal(HoodieAvroUtils.java:855)
> at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:834)
> at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:804)
> at org.apache.hudi.common.model.HoodieAvroIndexedRecord.rewriteRecordWithNewSchema(HoodieAvroIndexedRecord.java:123)
> at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.lambda$composeEvolvedSchemaTransformer$5(AbstractHoodieLogRecordReader.java:848)
> at org.apache.hudi.common.util.collection.MappingIterator.next(MappingIterator.java:44)
> at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processDataBlock(AbstractHoodieLogRecordReader.java:634)
> at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processQueuedBlocksForInstant(AbstractHoodieLogRecordReader.java:674)
> at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternalV1(AbstractHoodieLogRecordReader.java:366)
> ... 27 more {code}
> This can be fixed by specifying the RoundingMode {*}HALF_EVEN{*},
> which is what we use internally when performing an unsafe projection.
>
> {*}Reference{*}:
> [https://github.com/apache/hudi/blob/master/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala#L289]
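The failure and the proposed fix can be reproduced with plain java.math.BigDecimal. The following is a minimal sketch, not the HoodieAvroUtils code: the class and method names are hypothetical, and it only assumes that the failing call is a BigDecimal.setScale without a rounding mode, as the stack trace suggests.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

// Hypothetical demo class; not part of Hudi.
final class DecimalScaleLossDemo {

    // setScale(int) behaves like RoundingMode.UNNECESSARY: it throws
    // ArithmeticException when non-zero digits would have to be dropped.
    static BigDecimal rescaleStrict(BigDecimal value, int scale) {
        return value.setScale(scale);
    }

    // Passing an explicit rounding mode lets the rescale succeed when scale is lost.
    static BigDecimal rescaleRounded(BigDecimal value, int scale, RoundingMode mode) {
        return value.setScale(scale, mode);
    }

    public static void main(String[] args) {
        BigDecimal value = new BigDecimal("3.123");
        try {
            rescaleStrict(value, 2);
        } catch (ArithmeticException e) {
            System.out.println("strict rescale failed: " + e.getMessage());
        }
        System.out.println("rounded: " + rescaleRounded(value, 2, RoundingMode.HALF_EVEN));
    }
}
```

The strict variant fails for 3.123 at scale 2, while the rounded variant returns 3.12.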
>
> *NOTE 1:*
> The result of casting a FLOAT to a DECIMAL type differs depending on the
> table type used when reading on Spark.
>
> COW tables will rely on COW's unsafe projection, and hence, Spark's casting.
> MOR tables will rely on MOR's HoodieAvroUtils to perform the
> {*}rewriteWithNewSchema{*}.
>
> Floating point errors are hard to control given that different execution code
> paths are used between COW and MOR, causing a discrepancy in the results.
>
> Hence, for the test to verify this fix, no verification on the correctness of
> results will be performed. As long as the table can be read without issue
> (after performing schema evolution), the fix is deemed to be valid.
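To illustrate why FLOAT results are hard to pin down, the sketch below converts the same float to a decimal along two different routes and gets two different answers. It is only an illustration: the class and method names are hypothetical, and it makes no claim about which conversion Spark or HoodieAvroUtils actually uses.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

// Hypothetical demo class; not part of Hudi or Spark.
final class FloatToDecimalPaths {

    // Route 1: use the float's shortest decimal string ("10.025" for 10.025f).
    static BigDecimal viaFloatString(float f, int scale) {
        return new BigDecimal(Float.toString(f)).setScale(scale, RoundingMode.HALF_UP);
    }

    // Route 2: widen to double first; the widened value is slightly below 10.025.
    static BigDecimal viaWidenedDouble(float f, int scale) {
        return BigDecimal.valueOf((double) f).setScale(scale, RoundingMode.HALF_UP);
    }

    public static void main(String[] args) {
        float f = 10.025f;
        System.out.println("via float string:   " + viaFloatString(f, 2));
        System.out.println("via widened double: " + viaWidenedDouble(f, 2));
    }
}
```

Both routes use the same rounding mode, yet the first yields 10.03 and the second 10.02, because the nearest float to 10.025 is slightly smaller than 10.025.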
>
> *NOTE 2:*
> When performing a DOUBLE to DECIMAL casting, the result for COW and MOR
> tables should be consistent given that fixed scale types are not susceptible
> to the floating point rounding errors as described in {*}NOTE 1{*}.
>
> Since Spark uses *HALF_UP* rounding when a DOUBLE to DECIMAL cast loses
> scale, MOR's HoodieAvroUtils should follow the same rounding behavior.
>
> {code:java}
> -- test HALF_UP rounding (verify that it does not use HALF_EVEN)
> > SELECT CAST(CAST("10.024" AS DOUBLE) AS DECIMAL(4, 2));
> 10.02
> > SELECT CAST(CAST("10.025" AS DOUBLE) AS DECIMAL(4, 2));
> 10.03
> > SELECT CAST(CAST("10.026" AS DOUBLE) AS DECIMAL(4, 2));
> 10.03
> -- test negative HALF_UP rounding (verify that it does not use HALF_EVEN)
> > SELECT CAST(CAST("-10.024" AS DOUBLE) AS DECIMAL(4, 2));
> -10.02
> > SELECT CAST(CAST("-10.025" AS DOUBLE) AS DECIMAL(4, 2));
> -10.03
> > SELECT CAST(CAST("-10.026" AS DOUBLE) AS DECIMAL(4, 2));
> -10.03
> -- test HALF_UP rounding (will return same result as HALF_EVEN)
> > SELECT CAST(CAST("10.034" AS DOUBLE) AS DECIMAL(4, 2));
> 10.03
> > SELECT CAST(CAST("10.035" AS DOUBLE) AS DECIMAL(4, 2));
> 10.04
> > SELECT CAST(CAST("10.036" AS DOUBLE) AS DECIMAL(4, 2));
> 10.04
> -- test negative HALF_UP rounding (will return same result as HALF_EVEN)
> > SELECT CAST(CAST("-10.034" AS DOUBLE) AS DECIMAL(4, 2));
> -10.03
> > SELECT CAST(CAST("-10.035" AS DOUBLE) AS DECIMAL(4, 2));
> -10.04
> > SELECT CAST(CAST("-10.036" AS DOUBLE) AS DECIMAL(4, 2));
> -10.04{code}
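The difference between the two rounding modes on the values above can be checked in plain Java. This is a sketch under one assumption: the double is converted through its decimal string (BigDecimal.valueOf), so 10.025 is treated as exactly 10.025. The class and method names are hypothetical.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

// Hypothetical demo class; not part of Hudi or Spark.
final class RoundingModeComparison {

    // BigDecimal.valueOf(double) goes through Double.toString, so the
    // decimal digits printed for the double are the digits that get rounded.
    static BigDecimal cast(double value, int scale, RoundingMode mode) {
        return BigDecimal.valueOf(value).setScale(scale, mode);
    }

    public static void main(String[] args) {
        double[] inputs = {10.025, -10.025, 10.035, -10.035};
        for (double d : inputs) {
            System.out.println(d
                + " HALF_UP=" + cast(d, 2, RoundingMode.HALF_UP)
                + " HALF_EVEN=" + cast(d, 2, RoundingMode.HALF_EVEN));
        }
    }
}
```

The modes disagree only on ties whose preceding digit is even (10.025 and -10.025); for 10.035 and -10.035 both give the same result.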