[
https://issues.apache.org/jira/browse/HUDI-6033?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
voon updated HUDI-6033:
-----------------------
Description:
This issue only exists in MOR tables.
When casting a FLOAT to DECIMAL(p, s), if a row's value has more decimal
places than the provided scale (s), the error below is thrown.
For example, a float with 3 decimal places (dp), e.g. 3.123, when cast to
DECIMAL(3, 2), will throw the error below when the row/column is read.
{code:java}
Caused by: org.apache.hudi.exception.HoodieException: Exception when reading log file
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternalV1(AbstractHoodieLogRecordReader.java:375)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternal(AbstractHoodieLogRecordReader.java:222)
    at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.performScan(HoodieMergedLogRecordScanner.java:199)
    at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:115)
    at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:74)
    at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner$Builder.build(HoodieMergedLogRecordScanner.java:465)
    at org.apache.hudi.LogFileIterator$.scanLog(Iterators.scala:326)
    at org.apache.hudi.LogFileIterator.<init>(Iterators.scala:92)
    at org.apache.hudi.HoodieMergeOnReadRDD.compute(HoodieMergeOnReadRDD.scala:90)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ArithmeticException: Rounding necessary
    at java.math.BigDecimal.commonNeedIncrement(BigDecimal.java:4179)
    at java.math.BigDecimal.needIncrement(BigDecimal.java:4235)
    at java.math.BigDecimal.divideAndRound(BigDecimal.java:4143)
    at java.math.BigDecimal.setScale(BigDecimal.java:2455)
    at java.math.BigDecimal.setScale(BigDecimal.java:2515)
    at org.apache.hudi.avro.HoodieAvroUtils.rewritePrimaryTypeWithDiffSchemaType(HoodieAvroUtils.java:1032)
    at org.apache.hudi.avro.HoodieAvroUtils.rewritePrimaryType(HoodieAvroUtils.java:954)
    at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchemaInternal(HoodieAvroUtils.java:899)
    at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:834)
    at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchemaInternal(HoodieAvroUtils.java:897)
    at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:834)
    at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchemaInternal(HoodieAvroUtils.java:855)
    at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:834)
    at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:804)
    at org.apache.hudi.common.model.HoodieAvroIndexedRecord.rewriteRecordWithNewSchema(HoodieAvroIndexedRecord.java:123)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.lambda$composeEvolvedSchemaTransformer$5(AbstractHoodieLogRecordReader.java:848)
    at org.apache.hudi.common.util.collection.MappingIterator.next(MappingIterator.java:44)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processDataBlock(AbstractHoodieLogRecordReader.java:634)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processQueuedBlocksForInstant(AbstractHoodieLogRecordReader.java:674)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternalV1(AbstractHoodieLogRecordReader.java:366)
    ... 27 more
{code}
This can be fixed by specifying the RoundingMode {*}HALF_EVEN{*}, which is
what we use internally when performing an unsafe projection.
{*}Reference{*}:
[https://github.com/apache/hudi/blob/master/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala#L289]
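The failure mode and the proposed fix can be reproduced with plain java.math.BigDecimal, independent of Hudi. The sketch below is only an illustration: the class and helper names (FloatToDecimalRounding, rescaleStrict, rescaleHalfEven) are hypothetical and are not part of HoodieAvroUtils. It relies on the fact that setScale(int) without a rounding mode behaves like RoundingMode.UNNECESSARY and throws when digits would be lost.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

public class FloatToDecimalRounding {

    // Hypothetical helper: rescale without a rounding mode.
    // Throws ArithmeticException("Rounding necessary") if digits would be dropped.
    static BigDecimal rescaleStrict(BigDecimal value, int scale) {
        return value.setScale(scale);
    }

    // Hypothetical helper: rescale with an explicit HALF_EVEN rounding mode,
    // so values with more decimal places than the scale are rounded instead.
    static BigDecimal rescaleHalfEven(BigDecimal value, int scale) {
        return value.setScale(scale, RoundingMode.HALF_EVEN);
    }

    public static void main(String[] args) {
        // 3.123 has three decimal places; the target type is DECIMAL(3, 2).
        BigDecimal value = new BigDecimal("3.123");

        try {
            rescaleStrict(value, 2);
        } catch (ArithmeticException e) {
            System.out.println("strict: " + e.getMessage());
        }

        System.out.println("half-even: " + rescaleHalfEven(value, 2));
    }
}
```

Running main prints "strict: Rounding necessary" followed by "half-even: 3.12", which matches the innermost exception in the stack trace above and the behaviour expected after the fix.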
{*}NOTE:{*}
The results of casting a FLOAT to DECIMAL type differ depending on the
table type used when reading on Spark:
COW tables will rely on COW's unsafe projection, and hence, Spark's casting.
MOR tables will rely on MOR's HoodieAvroUtils to perform the
{*}rewriteRecordWithNewSchema{*}.
Floating point errors are hard to control given that different execution code
paths are used between COW and MOR, causing a discrepancy in the results.
Hence, for the test to verify this fix, no verification on the correctness of
results will be performed. As long as the table can be read without issue
(after performing schema evolution), the fix is deemed to be valid.
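To illustrate why such discrepancies are hard to control, the sketch below converts the same float to a decimal along two different routes. It is a generic Java illustration only and makes no claim about which route Spark or HoodieAvroUtils actually takes; the class and method names are hypothetical.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

public class FloatConversionPaths {

    // Route A (assumed for illustration): use the exact binary value of the float.
    // 10.025f is stored as roughly 10.0249996185, slightly below 10.025.
    static BigDecimal viaBinaryExpansion(float f, int scale, RoundingMode mode) {
        return new BigDecimal((double) f).setScale(scale, mode);
    }

    // Route B (assumed for illustration): start from the decimal text of the value.
    static BigDecimal viaDecimalText(String text, int scale, RoundingMode mode) {
        return new BigDecimal(text).setScale(scale, mode);
    }

    public static void main(String[] args) {
        // Under HALF_UP the two routes disagree on the second decimal place.
        System.out.println(viaBinaryExpansion(10.025f, 2, RoundingMode.HALF_UP));
        System.out.println(viaDecimalText("10.025", 2, RoundingMode.HALF_UP));

        // Under HALF_EVEN both routes happen to agree for this value.
        System.out.println(viaBinaryExpansion(10.025f, 2, RoundingMode.HALF_EVEN));
        System.out.println(viaDecimalText("10.025", 2, RoundingMode.HALF_EVEN));
    }
}
```

The first pair prints 10.02 and 10.03, the second pair prints 10.02 twice. The point is that the rounded result depends on how the float reaches the decimal type as well as on the rounding mode, which is why the test for this fix checks only that the table is readable.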
was:
This can be fixed by specifying the RoundingMode {*}XXX{*}, which is
what we use internally when performing an unsafe projection.
Reference:
[https://github.com/apache/hudi/blob/master/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala#L289]
To verify whether *HALF_EVEN* should be used, one can run the test below with
Spark SQL:
{code:java}
test("Test float to decimal schema evolution COW") {
  withTempDir { tmp =>
    // Create a COW table (type = 'cow') to observe the result of Spark's casting.
    val tableName = generateTableName
    spark.sql(
      s"""
         |create table $tableName (
         |  id int,
         |  name string,
         |  price float,
         |  ts long
         |) using hudi
         | location '${tmp.getCanonicalPath}'
         | tblproperties (
         |  primaryKey ='id',
         |  type = 'cow',
         |  preCombineField = 'ts'
         | )
       """.stripMargin)
    // 10.025 has three decimal places, one more than the target scale of 2.
    spark.sql(s"insert into $tableName values(1, 'a1', 10.025, 1000)")
    // Enable schema-on-read so that the column type can be changed.
    spark.sql("SET hoodie.schema.on.read.enable=true")
    spark.sql(s"ALTER TABLE $tableName ALTER COLUMN price TYPE DECIMAL(4, 2)")
    spark.sql(s"select id, name, price, ts from $tableName order by id").show(false)
  }
}
{code}
It should return:
{code:java}
+---+----+-----+----+
|id |name|price|ts  |
+---+----+-----+----+
|1  |a1  |10.02|1000|
+---+----+-----+----+
{code}
> Fix FLOAT to DECIMAL(p, s) when reading avro log files
> ------------------------------------------------------
>
> Key: HUDI-6033
> URL: https://issues.apache.org/jira/browse/HUDI-6033
> Project: Apache Hudi
> Issue Type: Bug
> Reporter: voon
> Assignee: voon
> Priority: Major