This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 107cffca4af [HUDI-7908] HoodieFileGroupReader fails if preCombine and
partition fields are the same (#11473)
107cffca4af is described below
commit 107cffca4aff52140dc8a59e120cf9bf695e4979
Author: Vova Kolmakov <[email protected]>
AuthorDate: Sat Jun 29 12:42:57 2024 +0700
[HUDI-7908] HoodieFileGroupReader fails if preCombine and partition fields
are the same (#11473)
Co-authored-by: Vova Kolmakov <[email protected]>
---
...odieFileGroupReaderBasedParquetFileFormat.scala | 4 +-
.../spark/sql/hudi/dml/TestInsertTable.scala | 44 ++++++++++++++++++++++
2 files changed, 47 insertions(+), 1 deletion(-)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
index 0ad2bda9cf2..f7c09b007be 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
@@ -107,7 +107,9 @@ class
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
hadoopConf: Configuration):
PartitionedFile => Iterator[InternalRow] = {
//dataSchema is not always right due to spark bugs
val partitionColumns = partitionSchema.fieldNames
- val dataSchema =
StructType(tableSchema.structTypeSchema.fields.filterNot(f =>
partitionColumns.contains(f.name)))
+ val preCombineField =
options.getOrElse(HoodieTableConfig.PRECOMBINE_FIELD.key, "")
+ val dataSchema =
StructType(tableSchema.structTypeSchema.fields.filterNot(f =>
partitionColumns.contains(f.name)
+ && !preCombineField.equals(f.name)))
val outputSchema = StructType(requiredSchema.fields ++
partitionSchema.fields)
val isCount = requiredSchema.isEmpty && !isMOR && !isIncremental
val augmentedStorageConf = new
HadoopStorageConfiguration(hadoopConf).getInline
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
index b46723e97bf..0b1d4ca8999 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
@@ -2587,6 +2587,50 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
})
}
+ test(s"Test INSERT INTO with upsert operation type") {
+ if (HoodieSparkUtils.gteqSpark3_2) {
+ withTempDir { tmp =>
+ Seq("mor").foreach { tableType =>
+ val tableName = generateTableName
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | ts long,
+ | price int
+ |) using hudi
+ |partitioned by (ts)
+ |tblproperties (
+ | type = '$tableType',
+ | primaryKey = 'id',
+ | preCombineField = 'ts'
+ |)
+ |location '${tmp.getCanonicalPath}/$tableName'
+ |""".stripMargin
+ )
+
+ // Test insert into with upsert operation type
+ spark.sql(
+ s"""
+ | insert into $tableName
+ | values (1, 'a1', 1000, 10), (2, 'a2', 2000, 20), (3, 'a3',
3000, 30), (4, 'a4', 2000, 10), (5, 'a5', 3000, 20), (6, 'a6', 4000, 30)
+ | """.stripMargin
+ )
+ checkAnswer(s"select id, name, price, ts from $tableName where price
> 3000")(
+ Seq(6, "a6", 4000, 30)
+ )
+
+ // Test update
+ spark.sql(s"update $tableName set price = price + 1 where id = 6")
+ checkAnswer(s"select id, name, price, ts from $tableName where price
> 3000")(
+ Seq(6, "a6", 4001, 30)
+ )
+ }
+ }
+ }
+ }
+
test("Test Insert Into with extraMetadata") {
withTempDir { tmp =>
val tableName = generateTableName