This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 107cffca4af [HUDI-7908] HoodieFileGroupReader fails if preCombine and partition fields are the same (#11473)
107cffca4af is described below

commit 107cffca4aff52140dc8a59e120cf9bf695e4979
Author: Vova Kolmakov <[email protected]>
AuthorDate: Sat Jun 29 12:42:57 2024 +0700

    [HUDI-7908] HoodieFileGroupReader fails if preCombine and partition fields are the same (#11473)
    
    Co-authored-by: Vova Kolmakov <[email protected]>
---
 ...odieFileGroupReaderBasedParquetFileFormat.scala |  4 +-
 .../spark/sql/hudi/dml/TestInsertTable.scala       | 44 ++++++++++++++++++++++
 2 files changed, 47 insertions(+), 1 deletion(-)

diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
index 0ad2bda9cf2..f7c09b007be 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
@@ -107,7 +107,9 @@ class 
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
                                               hadoopConf: Configuration): 
PartitionedFile => Iterator[InternalRow] = {
     //dataSchema is not always right due to spark bugs
     val partitionColumns = partitionSchema.fieldNames
-    val dataSchema = 
StructType(tableSchema.structTypeSchema.fields.filterNot(f => 
partitionColumns.contains(f.name)))
+    val preCombineField = 
options.getOrElse(HoodieTableConfig.PRECOMBINE_FIELD.key, "")
+    val dataSchema = 
StructType(tableSchema.structTypeSchema.fields.filterNot(f => 
partitionColumns.contains(f.name)
+      && preCombineField.equals(f.name)))
     val outputSchema = StructType(requiredSchema.fields ++ 
partitionSchema.fields)
     val isCount = requiredSchema.isEmpty && !isMOR && !isIncremental
     val augmentedStorageConf = new 
HadoopStorageConfiguration(hadoopConf).getInline
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
index b46723e97bf..0b1d4ca8999 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
@@ -2587,6 +2587,50 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
     })
   }
 
+  test(s"Test INSERT INTO with upsert operation type") {
+    if (HoodieSparkUtils.gteqSpark3_2) {
+      withTempDir { tmp =>
+        Seq("mor").foreach { tableType =>
+          val tableName = generateTableName
+          spark.sql(
+            s"""
+               |create table $tableName (
+               |  id int,
+               |  name string,
+               |  ts long,
+               |  price int
+               |) using hudi
+               |partitioned by (ts)
+               |tblproperties (
+               |  type = '$tableType',
+               |  primaryKey = 'id',
+               |  preCombineField = 'ts'
+               |)
+               |location '${tmp.getCanonicalPath}/$tableName'
+               |""".stripMargin
+          )
+
+          // Test insert into with upsert operation type
+          spark.sql(
+            s"""
+               | insert into $tableName
+               | values (1, 'a1', 1000, 10), (2, 'a2', 2000, 20), (3, 'a3', 
3000, 30), (4, 'a4', 2000, 10), (5, 'a5', 3000, 20), (6, 'a6', 4000, 30)
+               | """.stripMargin
+          )
+          checkAnswer(s"select id, name, price, ts from $tableName where price 
> 3000")(
+            Seq(6, "a6", 4000, 30)
+          )
+
+          // Test update
+          spark.sql(s"update $tableName set price = price + 1 where id = 6")
+          checkAnswer(s"select id, name, price, ts from $tableName where price 
> 3000")(
+            Seq(6, "a6", 4001, 30)
+          )
+        }
+      }
+    }
+  }
+
   test("Test Insert Into with extraMetadata") {
     withTempDir { tmp =>
       val tableName = generateTableName

Reply via email to