This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new cac1bea2c5b [HUDI-8902] Fix schema evolution from float to double for 
avro log blocks (#13289)
cac1bea2c5b is described below

commit cac1bea2c5bde002f78b2203f9295832c6831716
Author: Jon Vexler <[email protected]>
AuthorDate: Tue May 13 21:35:30 2025 -0400

    [HUDI-8902] Fix schema evolution from float to double for avro log blocks 
(#13289)
    
    * fix for avro log blocks and for vectorized reader
    * add test and revert vectorized change
    
    ---------
    
    Co-authored-by: Jonathan Vexler <=>
---
 .../java/org/apache/hudi/avro/HoodieAvroUtils.java |  3 +
 .../apache/spark/sql/hudi/ddl/TestSpark3DDL.scala  | 87 +++++++++++++++++++++-
 2 files changed, 88 insertions(+), 2 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index ca91a51b892..f1595d8100d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -1296,6 +1296,9 @@ public class HoodieAvroUtils {
       case STRING:
       case BYTES:
         return needsRewriteToString(writerSchema, false);
+      case DOUBLE:
+        // To maintain precision, you need to convert Float -> String -> Double
+        return writerSchema.getType().equals(Schema.Type.FLOAT);
       default:
         return false;
     }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
index 16f20b74323..ee13b61dd71 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hudi.ddl
 
 import org.apache.hudi.{DataSourceWriteOptions, DefaultSparkRecordMerger, 
QuickstartUtils}
 import org.apache.hudi.common.config.{HoodieReaderConfig, HoodieStorageConfig}
-import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType}
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
 import org.apache.hudi.common.table.TableSchemaResolver
 import org.apache.hudi.common.table.timeline.HoodieInstant
@@ -30,14 +30,17 @@ import org.apache.hudi.testutils.DataSourceTestUtils
 import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient
 
 import org.apache.hadoop.fs.Path
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{Row, SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.functions.{arrays_zip, col, expr, lit}
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
 import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
-import org.apache.spark.sql.types.StringType
+import org.apache.spark.sql.streaming.OutputMode.Append
+import org.apache.spark.sql.types.{DoubleType, FloatType, IntegerType, 
StringType, StructField, StructType}
 import org.junit.jupiter.api.Assertions.assertEquals
 
+import scala.Seq
 import scala.collection.JavaConverters._
 
 class TestSpark3DDL extends HoodieSparkSqlTestBase {
@@ -183,6 +186,86 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
     }
   }
 
+  test("Test float to double evolution") {
+    withTempDir { tmp =>
+      Seq(HoodieTableType.COPY_ON_WRITE, 
HoodieTableType.MERGE_ON_READ).foreach { tableType =>
+        val tableName = generateTableName
+        val tablePath = s"${new Path(tmp.getCanonicalPath, 
tableName).toUri.toString}"
+        spark.sql("set hoodie.schema.on.read.enable=false")
+
+        val structType = StructType(Array(
+          StructField("id", StringType, true),
+          StructField("ts", IntegerType, true),
+          StructField("partition", StringType, true),
+          StructField("col", FloatType, true)
+        ))
+
+        val data = Seq(Row("r1", 0, "p1", 1.01f), Row("r2", 0, "p1", 2.02f), 
Row("r3", 0, "p2", 3.03f))
+        val rowRdd: RDD[Row] = spark.sparkContext.parallelize(data)
+        val df = spark.createDataFrame(rowRdd, structType)
+        df.write.format("hudi")
+          .option("hoodie.datasource.write.recordkey.field", "id")
+          .option("hoodie.datasource.write.precombine.field", "ts")
+          .option("hoodie.datasource.write.partitionpath.field", "partition")
+          .option("hoodie.table.name", tableName)
+          .option("hoodie.datasource.write.table.type", tableType.name())
+          .mode(SaveMode.Overwrite)
+          .save(tablePath)
+
+        checkAnswer(spark.read.format("hudi").load(tablePath).select("id", 
"col").orderBy("id").collect())(
+          Seq("r1", 1.01f),
+          Seq("r2", 2.02f),
+          Seq("r3", 3.03f)
+        )
+
+
+        val data2 = Seq(Row("r2", 1, "p1", 2.03f))
+        val rowRdd2: RDD[Row] = spark.sparkContext.parallelize(data2)
+        val df2 = spark.createDataFrame(rowRdd2, structType)
+        df2.write.format("hudi")
+          .option("hoodie.datasource.write.recordkey.field", "id")
+          .option("hoodie.datasource.write.precombine.field", "ts")
+          .option("hoodie.datasource.write.partitionpath.field", "partition")
+          .option("hoodie.table.name", tableName)
+          .option("hoodie.datasource.write.table.type", tableType.name())
+          .mode(SaveMode.Append)
+          .save(tablePath)
+
+
+        checkAnswer(spark.read.format("hudi").load(tablePath).select("id", 
"col").orderBy("id").collect())(
+          Seq("r1", 1.01f),
+          Seq("r2", 2.03f),
+          Seq("r3", 3.03f)
+        )
+
+        val structType3 = StructType(Array(
+          StructField("id", StringType, true),
+          StructField("ts", IntegerType, true),
+          StructField("partition", StringType, true),
+          StructField("col", DoubleType, true)
+        ))
+
+        val data3 = Seq(Row("r1", 2, "p1", 1.000000000001d))
+        val rowRdd3: RDD[Row] = spark.sparkContext.parallelize(data3)
+        val df3 = spark.createDataFrame(rowRdd3, structType3)
+        df3.write.format("hudi")
+          .option("hoodie.datasource.write.recordkey.field", "id")
+          .option("hoodie.datasource.write.precombine.field", "ts")
+          .option("hoodie.datasource.write.partitionpath.field", "partition")
+          .option("hoodie.table.name", tableName)
+          .option("hoodie.datasource.write.table.type", tableType.name())
+          .mode(SaveMode.Append)
+          .save(tablePath)
+
+        checkAnswer(spark.read.format("hudi").load(tablePath).select("id", 
"col").orderBy("id").collect())(
+          Seq("r1", 1.000000000001d),
+          Seq("r2", 2.03d),
+          Seq("r3", 3.03d)
+        )
+      }
+    }
+  }
+
   test("Test Enable and Disable Schema on read") {
     withTempDir { tmp =>
       Seq("cow", "mor").foreach { tableType =>

Reply via email to