[
https://issues.apache.org/jira/browse/HUDI-6033?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
voon updated HUDI-6033:
-----------------------
Description:
This issue only exists in MOR tables.
When performing a FLOAT to DECIMAL(p, s) cast, if a row's value has more
decimal places than the provided scale (s),
the error below is thrown.
For example, a float with 3 decimal places (dp), e.g. 3.123, when cast to
DECIMAL(3, 2) will throw the error below when the row/column is required to be
read out.
{code:java}
Caused by: org.apache.hudi.exception.HoodieException: Exception when reading
log file
at
org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternalV1(AbstractHoodieLogRecordReader.java:375)
at
org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternal(AbstractHoodieLogRecordReader.java:222)
at
org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.performScan(HoodieMergedLogRecordScanner.java:199)
at
org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:115)
at
org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:74)
at
org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner$Builder.build(HoodieMergedLogRecordScanner.java:465)
at org.apache.hudi.LogFileIterator$.scanLog(Iterators.scala:326)
at org.apache.hudi.LogFileIterator.<init>(Iterators.scala:92)
at
org.apache.hudi.HoodieMergeOnReadRDD.compute(HoodieMergeOnReadRDD.scala:90)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ArithmeticException: Rounding necessary
at java.math.BigDecimal.commonNeedIncrement(BigDecimal.java:4179)
at java.math.BigDecimal.needIncrement(BigDecimal.java:4235)
at java.math.BigDecimal.divideAndRound(BigDecimal.java:4143)
at java.math.BigDecimal.setScale(BigDecimal.java:2455)
at java.math.BigDecimal.setScale(BigDecimal.java:2515)
at
org.apache.hudi.avro.HoodieAvroUtils.rewritePrimaryTypeWithDiffSchemaType(HoodieAvroUtils.java:1032)
at
org.apache.hudi.avro.HoodieAvroUtils.rewritePrimaryType(HoodieAvroUtils.java:954)
at
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchemaInternal(HoodieAvroUtils.java:899)
at
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:834)
at
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchemaInternal(HoodieAvroUtils.java:897)
at
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:834)
at
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchemaInternal(HoodieAvroUtils.java:855)
at
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:834)
at
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:804)
at
org.apache.hudi.common.model.HoodieAvroIndexedRecord.rewriteRecordWithNewSchema(HoodieAvroIndexedRecord.java:123)
at
org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.lambda$composeEvolvedSchemaTransformer$5(AbstractHoodieLogRecordReader.java:848)
at
org.apache.hudi.common.util.collection.MappingIterator.next(MappingIterator.java:44)
at
org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processDataBlock(AbstractHoodieLogRecordReader.java:634)
at
org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processQueuedBlocksForInstant(AbstractHoodieLogRecordReader.java:674)
at
org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternalV1(AbstractHoodieLogRecordReader.java:366)
... 27 more {code}
This can be fixed by specifying the RoundingMode {*}HALF_EVEN{*},
which is what we use internally when performing an unsafe projection.
Reference:
https://github.com/apache/hudi/blob/master/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala#L289
To verify whether *HALF_EVEN* should be used, one can run the Spark-SQL test
below:
{code:java}
test("Test float to decimal schema evolution COW") {
withTempDir { tmp =>
// Create table with INMEMORY index to generate log only mor table.
val tableName = generateTableName
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| price float,
| ts long
|) using hudi
| location '${tmp.getCanonicalPath}'
| tblproperties (
| primaryKey ='id',
| type = 'cow',
| preCombineField = 'ts'
| )
""".stripMargin)
spark.sql(s"insert into $tableName values(1, 'a1', 10.025, 1000)")
spark.sql("SET hoodie.schema.on.read.enable=true")
spark.sql(s"ALTER TABLE $tableName ALTER COLUMN price TYPE DECIMAL(4, 2)")
spark.sql(s"select id, name, price, ts from $tableName order by
id").show(false)
}
}{code}
It should return:
{code:java}
+---+----+-----+----+
|id |name|price|ts |
+---+----+-----+----+
|1 |a1 |10.02|1000|
+---+----+-----+----+ {code}
was:
This issue only exists in MOR tables.
When performing a FLOAT to DECIMAL(p, s) cast, if a row's value has more
decimal places than the provided scale (s),
the error below is thrown.
For example, a float with 3 decimal places (dp), e.g. 3.123, when cast to
DECIMAL(3, 2) will throw the error below when the row/column is required to be
read out.
{code:java}
Caused by: org.apache.hudi.exception.HoodieException: Exception when reading
log file
at
org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternalV1(AbstractHoodieLogRecordReader.java:375)
at
org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternal(AbstractHoodieLogRecordReader.java:222)
at
org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.performScan(HoodieMergedLogRecordScanner.java:199)
at
org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:115)
at
org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:74)
at
org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner$Builder.build(HoodieMergedLogRecordScanner.java:465)
at org.apache.hudi.LogFileIterator$.scanLog(Iterators.scala:326)
at org.apache.hudi.LogFileIterator.<init>(Iterators.scala:92)
at
org.apache.hudi.HoodieMergeOnReadRDD.compute(HoodieMergeOnReadRDD.scala:90)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ArithmeticException: Rounding necessary
at java.math.BigDecimal.commonNeedIncrement(BigDecimal.java:4179)
at java.math.BigDecimal.needIncrement(BigDecimal.java:4235)
at java.math.BigDecimal.divideAndRound(BigDecimal.java:4143)
at java.math.BigDecimal.setScale(BigDecimal.java:2455)
at java.math.BigDecimal.setScale(BigDecimal.java:2515)
at
org.apache.hudi.avro.HoodieAvroUtils.rewritePrimaryTypeWithDiffSchemaType(HoodieAvroUtils.java:1032)
at
org.apache.hudi.avro.HoodieAvroUtils.rewritePrimaryType(HoodieAvroUtils.java:954)
at
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchemaInternal(HoodieAvroUtils.java:899)
at
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:834)
at
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchemaInternal(HoodieAvroUtils.java:897)
at
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:834)
at
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchemaInternal(HoodieAvroUtils.java:855)
at
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:834)
at
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:804)
at
org.apache.hudi.common.model.HoodieAvroIndexedRecord.rewriteRecordWithNewSchema(HoodieAvroIndexedRecord.java:123)
at
org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.lambda$composeEvolvedSchemaTransformer$5(AbstractHoodieLogRecordReader.java:848)
at
org.apache.hudi.common.util.collection.MappingIterator.next(MappingIterator.java:44)
at
org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processDataBlock(AbstractHoodieLogRecordReader.java:634)
at
org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processQueuedBlocksForInstant(AbstractHoodieLogRecordReader.java:674)
at
org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternalV1(AbstractHoodieLogRecordReader.java:366)
... 27 more {code}
This can be fixed by specifying the RoundingMode HALF_UP, which is
the default that Spark uses when performing a cast on COW tables.
To verify this, one can run the query below in Spark-SQL:
{code:java}
SELECT CAST("35.245" AS DECIMAL(4,2)); {code}
It should return:
{code:java}
35.25
{code}
Which is what HALF_UP rounding should return.
> Fix FLOAT to DECIMAL(p, s) when reading avro log files
> ------------------------------------------------------
>
> Key: HUDI-6033
> URL: https://issues.apache.org/jira/browse/HUDI-6033
> Project: Apache Hudi
> Issue Type: Bug
> Reporter: voon
> Assignee: voon
> Priority: Major
>
> This issue only exists in MOR tables.
>
> When performing a FLOAT to DECIMAL(p, s) cast, if a row's value has more
> decimal places than the provided scale (s), the error below is thrown.
>
> For example, a float with 3 decimal places (dp), e.g. 3.123, when cast to
> DECIMAL(3, 2) will throw the error below when the row/column is required to
> be read out.
>
> {code:java}
> Caused by: org.apache.hudi.exception.HoodieException: Exception when reading
> log file
> at
> org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternalV1(AbstractHoodieLogRecordReader.java:375)
> at
> org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternal(AbstractHoodieLogRecordReader.java:222)
> at
> org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.performScan(HoodieMergedLogRecordScanner.java:199)
> at
> org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:115)
> at
> org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:74)
> at
> org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner$Builder.build(HoodieMergedLogRecordScanner.java:465)
> at org.apache.hudi.LogFileIterator$.scanLog(Iterators.scala:326)
> at org.apache.hudi.LogFileIterator.<init>(Iterators.scala:92)
> at
> org.apache.hudi.HoodieMergeOnReadRDD.compute(HoodieMergeOnReadRDD.scala:90)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
> at org.apache.spark.scheduler.Task.run(Task.scala:131)
> at
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:750)
> Caused by: java.lang.ArithmeticException: Rounding necessary
> at java.math.BigDecimal.commonNeedIncrement(BigDecimal.java:4179)
> at java.math.BigDecimal.needIncrement(BigDecimal.java:4235)
> at java.math.BigDecimal.divideAndRound(BigDecimal.java:4143)
> at java.math.BigDecimal.setScale(BigDecimal.java:2455)
> at java.math.BigDecimal.setScale(BigDecimal.java:2515)
> at
> org.apache.hudi.avro.HoodieAvroUtils.rewritePrimaryTypeWithDiffSchemaType(HoodieAvroUtils.java:1032)
> at
> org.apache.hudi.avro.HoodieAvroUtils.rewritePrimaryType(HoodieAvroUtils.java:954)
> at
> org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchemaInternal(HoodieAvroUtils.java:899)
> at
> org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:834)
> at
> org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchemaInternal(HoodieAvroUtils.java:897)
> at
> org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:834)
> at
> org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchemaInternal(HoodieAvroUtils.java:855)
> at
> org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:834)
> at
> org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:804)
> at
> org.apache.hudi.common.model.HoodieAvroIndexedRecord.rewriteRecordWithNewSchema(HoodieAvroIndexedRecord.java:123)
> at
> org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.lambda$composeEvolvedSchemaTransformer$5(AbstractHoodieLogRecordReader.java:848)
> at
> org.apache.hudi.common.util.collection.MappingIterator.next(MappingIterator.java:44)
> at
> org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processDataBlock(AbstractHoodieLogRecordReader.java:634)
> at
> org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processQueuedBlocksForInstant(AbstractHoodieLogRecordReader.java:674)
> at
> org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternalV1(AbstractHoodieLogRecordReader.java:366)
> ... 27 more {code}
> This can be fixed by specifying the RoundingMode {*}HALF_EVEN{*},
> which is what we use internally when performing an unsafe projection.
>
> Reference:
> https://github.com/apache/hudi/blob/master/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala#L289
>
> To verify whether *HALF_EVEN* should be used, one can run the Spark-SQL test
> below:
> {code:java}
> test("Test float to decimal schema evolution COW") {
> withTempDir { tmp =>
> // Create table with INMEMORY index to generate log only mor table.
> val tableName = generateTableName
> spark.sql(
> s"""
> |create table $tableName (
> | id int,
> | name string,
> | price float,
> | ts long
> |) using hudi
> | location '${tmp.getCanonicalPath}'
> | tblproperties (
> | primaryKey ='id',
> | type = 'cow',
> | preCombineField = 'ts'
> | )
> """.stripMargin)
> spark.sql(s"insert into $tableName values(1, 'a1', 10.025, 1000)")
> spark.sql("SET hoodie.schema.on.read.enable=true")
> spark.sql(s"ALTER TABLE $tableName ALTER COLUMN price TYPE DECIMAL(4, 2)")
> spark.sql(s"select id, name, price, ts from $tableName order by
> id").show(false)
> }
> }{code}
> It should return:
> {code:java}
> +---+----+-----+----+
> |id |name|price|ts |
> +---+----+-----+----+
> |1 |a1 |10.02|1000|
> +---+----+-----+----+ {code}
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)