This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new cac1bea2c5b [HUDI-8902] Fix schema evolution from float to double for
avro log blocks (#13289)
cac1bea2c5b is described below
commit cac1bea2c5bde002f78b2203f9295832c6831716
Author: Jon Vexler <[email protected]>
AuthorDate: Tue May 13 21:35:30 2025 -0400
[HUDI-8902] Fix schema evolution from float to double for avro log blocks
(#13289)
* fix for avro log blocks and for vectorized reader
* add test and revert vectorized change
---------
Co-authored-by: Jonathan Vexler <=>
---
.../java/org/apache/hudi/avro/HoodieAvroUtils.java | 3 +
.../apache/spark/sql/hudi/ddl/TestSpark3DDL.scala | 87 +++++++++++++++++++++-
2 files changed, 88 insertions(+), 2 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index ca91a51b892..f1595d8100d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -1296,6 +1296,9 @@ public class HoodieAvroUtils {
case STRING:
case BYTES:
return needsRewriteToString(writerSchema, false);
+ case DOUBLE:
+ // Float -> Double must be rewritten via String: direct widening keeps float rounding noise (2.03f would read back as 2.0299999713897705)
+ return writerSchema.getType().equals(Schema.Type.FLOAT);
default:
return false;
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
index 16f20b74323..ee13b61dd71 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hudi.ddl
import org.apache.hudi.{DataSourceWriteOptions, DefaultSparkRecordMerger,
QuickstartUtils}
import org.apache.hudi.common.config.{HoodieReaderConfig, HoodieStorageConfig}
-import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType}
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
import org.apache.hudi.common.table.TableSchemaResolver
import org.apache.hudi.common.table.timeline.HoodieInstant
@@ -30,14 +30,17 @@ import org.apache.hudi.testutils.DataSourceTestUtils
import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient
import org.apache.hadoop.fs.Path
+import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.functions.{arrays_zip, col, expr, lit}
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
-import org.apache.spark.sql.types.StringType
+import org.apache.spark.sql.streaming.OutputMode.Append
+import org.apache.spark.sql.types.{DoubleType, FloatType, IntegerType,
StringType, StructField, StructType}
import org.junit.jupiter.api.Assertions.assertEquals
+import scala.Seq
import scala.collection.JavaConverters._
class TestSpark3DDL extends HoodieSparkSqlTestBase {
@@ -183,6 +186,86 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
}
}
+ test("Test float to double evolution") {
+ withTempDir { tmp =>
+ Seq(HoodieTableType.COPY_ON_WRITE,
HoodieTableType.MERGE_ON_READ).foreach { tableType =>
+ val tableName = generateTableName
+ val tablePath = s"${new Path(tmp.getCanonicalPath,
tableName).toUri.toString}"
+ spark.sql("set hoodie.schema.on.read.enable=false")
+
+ val structType = StructType(Array(
+ StructField("id", StringType, true),
+ StructField("ts", IntegerType, true),
+ StructField("partition", StringType, true),
+ StructField("col", FloatType, true)
+ ))
+
+ val data = Seq(Row("r1", 0, "p1", 1.01f), Row("r2", 0, "p1", 2.02f),
Row("r3", 0, "p2", 3.03f))
+ val rowRdd: RDD[Row] = spark.sparkContext.parallelize(data)
+ val df = spark.createDataFrame(rowRdd, structType)
+ df.write.format("hudi")
+ .option("hoodie.datasource.write.recordkey.field", "id")
+ .option("hoodie.datasource.write.precombine.field", "ts")
+ .option("hoodie.datasource.write.partitionpath.field", "partition")
+ .option("hoodie.table.name", tableName)
+ .option("hoodie.datasource.write.table.type", tableType.name())
+ .mode(SaveMode.Overwrite)
+ .save(tablePath)
+
+ checkAnswer(spark.read.format("hudi").load(tablePath).select("id",
"col").orderBy("id").collect())(
+ Seq("r1", 1.01f),
+ Seq("r2", 2.02f),
+ Seq("r3", 3.03f)
+ )
+
+
+ val data2 = Seq(Row("r2", 1, "p1", 2.03f))
+ val rowRdd2: RDD[Row] = spark.sparkContext.parallelize(data2)
+ val df2 = spark.createDataFrame(rowRdd2, structType)
+ df2.write.format("hudi")
+ .option("hoodie.datasource.write.recordkey.field", "id")
+ .option("hoodie.datasource.write.precombine.field", "ts")
+ .option("hoodie.datasource.write.partitionpath.field", "partition")
+ .option("hoodie.table.name", tableName)
+ .option("hoodie.datasource.write.table.type", tableType.name())
+ .mode(SaveMode.Append)
+ .save(tablePath)
+
+
+ checkAnswer(spark.read.format("hudi").load(tablePath).select("id",
"col").orderBy("id").collect())(
+ Seq("r1", 1.01f),
+ Seq("r2", 2.03f),
+ Seq("r3", 3.03f)
+ )
+
+ val structType3 = StructType(Array(
+ StructField("id", StringType, true),
+ StructField("ts", IntegerType, true),
+ StructField("partition", StringType, true),
+ StructField("col", DoubleType, true)
+ ))
+
+ val data3 = Seq(Row("r1", 2, "p1", 1.000000000001d))
+ val rowRdd3: RDD[Row] = spark.sparkContext.parallelize(data3)
+ val df3 = spark.createDataFrame(rowRdd3, structType3)
+ df3.write.format("hudi")
+ .option("hoodie.datasource.write.recordkey.field", "id")
+ .option("hoodie.datasource.write.precombine.field", "ts")
+ .option("hoodie.datasource.write.partitionpath.field", "partition")
+ .option("hoodie.table.name", tableName)
+ .option("hoodie.datasource.write.table.type", tableType.name())
+ .mode(SaveMode.Append)
+ .save(tablePath)
+
+ checkAnswer(spark.read.format("hudi").load(tablePath).select("id",
"col").orderBy("id").collect())(
+ Seq("r1", 1.000000000001d),
+ Seq("r2", 2.03d),
+ Seq("r3", 3.03d)
+ )
+ }
+ }
+ }
+
test("Test Enable and Disable Schema on read") {
withTempDir { tmp =>
Seq("cow", "mor").foreach { tableType =>