This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new d1fe9279c95 [HUDI-8832] Add merge mode test coverage for DML (#12610)
d1fe9279c95 is described below

commit d1fe9279c95dd1f4f02bfe03328fca7e2e90c7ec
Author: Davis-Zhang-Onehouse 
<[email protected]>
AuthorDate: Sun Jan 26 09:21:39 2025 -0800

    [HUDI-8832] Add merge mode test coverage for DML (#12610)
---
 .../hudi/dml/TestMergeModeCommitTimeOrdering.scala | 247 ++++++++++++++++++
 .../hudi/dml/TestMergeModeEventTimeOrdering.scala  | 275 +++++++++++++++++++++
 2 files changed, 522 insertions(+)

diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeCommitTimeOrdering.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeCommitTimeOrdering.scala
new file mode 100644
index 00000000000..a0af78797e6
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeCommitTimeOrdering.scala
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.dml
+
+import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
+
/**
 * SQL DML test coverage for Hudi tables configured with the
 * COMMIT_TIME_ORDERING record merge mode (`hoodie.record.merge.mode`),
 * where the most recently committed write wins regardless of the value of
 * the ordering/preCombine field (`ts`).
 *
 * Covers insert/update/delete with equal, lower, and higher `ts` values, and
 * MERGE INTO delete/update/insert paths.
 *
 * NOTE(review): only MOR is exercised; COW is disabled pending HUDI-8850
 * (see the commented-out table-type Seq below).
 */
class TestMergeModeCommitTimeOrdering extends HoodieSparkSqlTestBase {

  Seq("mor").foreach { tableType =>
    // [HUDI-8850] For COW commit time ordering does not work.
    // Seq("cow", "mor").foreach { tableType =>
    test(s"Test $tableType table with COMMIT_TIME_ORDERING merge mode") {
      // Setting small.file.group.candidates.limit to 0 — presumably so each
      // write goes to a fresh file group instead of being packed into an
      // existing small file, forcing merge logic to run at read time.
      // TODO(review): confirm intent.
      withSparkSqlSessionConfig("hoodie.merge.small.file.group.candidates.limit" -> "0"
      ) {
        withRecordType()(withTempDir { tmp =>
          val tableName = generateTableName
          // Create table with COMMIT_TIME_ORDERING
          spark.sql(
            s"""
               | create table $tableName (
               |  id int,
               |  name string,
               |  price double,
               |  ts long
               | ) using hudi
               | tblproperties (
               |  type = '$tableType',
               |  primaryKey = 'id',
               |  preCombineField = 'ts',
               |  hoodie.record.merge.mode = 'COMMIT_TIME_ORDERING'
               | )
               | location '${tmp.getCanonicalPath}'
             """.stripMargin)

          // Insert initial records with ts=100
          spark.sql(
            s"""
               | insert into $tableName
               | select 1 as id, 'A' as name, 10.0 as price, 100 as ts
               | union all
               | select 2, 'B', 20.0, 100
             """.stripMargin)

          // Verify inserting records with the same ts value are visible (COMMIT_TIME_ORDERING)
          spark.sql(
            s"""
              | insert into $tableName
              | select 1 as id, 'A_equal' as name, 60.0 as price, 100 as ts
              | union all
              | select 2, 'B_equal', 70.0, 100
            """.stripMargin)

          checkAnswer(s"select id, name, price, ts from $tableName order by id")(
            Seq(1, "A_equal", 60.0, 100),
            Seq(2, "B_equal", 70.0, 100)
          )

          // Verify updating records with the same ts value are visible (COMMIT_TIME_ORDERING)
          spark.sql(
            s"""
               | update $tableName
               | set price = 50.0, ts = 100
               | where id = 1
             """.stripMargin)

          checkAnswer(s"select id, name, price, ts from $tableName order by id")(
            Seq(1, "A_equal", 50.0, 100),
            Seq(2, "B_equal", 70.0, 100)
          )

          // Verify inserting records with a lower ts value are visible (COMMIT_TIME_ORDERING)
          // (latest commit wins even though ts=99 < 100)
          spark.sql(
            s"""
               | insert into $tableName
               | select 1 as id, 'A' as name, 30.0 as price, 99 as ts
               | union all
               | select 2, 'B', 40.0, 99
             """.stripMargin)

          checkAnswer(s"select id, name, price, ts from $tableName order by id")(
            Seq(1, "A", 30.0, 99),
            Seq(2, "B", 40.0, 99)
          )

          // Verify updating records with a lower ts value are visible (COMMIT_TIME_ORDERING)
          spark.sql(
            s"""
               | update $tableName
               | set price = 50.0, ts = 98
               | where id = 1
             """.stripMargin)

          checkAnswer(s"select id, name, price, ts from $tableName order by id")(
            Seq(1, "A", 50.0, 98),
            Seq(2, "B", 40.0, 99)
          )

          // Verify inserting records with a higher ts value are visible (COMMIT_TIME_ORDERING)
          spark.sql(
            s"""
               | insert into $tableName
               | select 1 as id, 'A' as name, 30.0 as price, 101 as ts
               | union all
               | select 2, 'B', 40.0, 101
             """.stripMargin)

          // Verify records with ts=101 are visible
          checkAnswer(s"select id, name, price, ts from $tableName order by id")(
            Seq(1, "A", 30.0, 101),
            Seq(2, "B", 40.0, 101)
          )

          // Verify updating records with a higher ts value are visible (COMMIT_TIME_ORDERING)
          spark.sql(
            s"""
               | update $tableName
               | set price = 50.0, ts = 102
               | where id = 1
             """.stripMargin)

          // Verify final state after all operations
          checkAnswer(s"select id, name, price, ts from $tableName order by id")(
            Seq(1, "A", 50.0, 102),
            Seq(2, "B", 40.0, 101)
          )

          // Delete record
          spark.sql(s"delete from $tableName where id = 1")

          // Verify deletion
          checkAnswer(s"select id, name, price, ts from $tableName order by id")(
            Seq(2, "B", 40.0, 101)
          )
        })
      }
    }

    test(s"Test merge operations with COMMIT_TIME_ORDERING for $tableType table") {
      withSparkSqlSessionConfig("hoodie.merge.small.file.group.candidates.limit" -> "0") {
        withRecordType()(withTempDir { tmp =>
          val tableName = generateTableName
          // Create table with COMMIT_TIME_ORDERING
          spark.sql(
            s"""
               | create table $tableName (
               |  id int,
               |  name string,
               |  price double,
               |  ts long
               | ) using hudi
               | tblproperties (
               |  type = '$tableType',
               |  primaryKey = 'id',
               |  preCombineField = 'ts',
               |  hoodie.record.merge.mode = 'COMMIT_TIME_ORDERING'
               | )
               | location '${tmp.getCanonicalPath}'
             """.stripMargin)

          // Insert initial records
          spark.sql(
            s"""
               | insert into $tableName
               | select 1 as id, 'A' as name, 10.0 as price, 100 as ts union all
               | select 0, 'X', 20.0, 100 union all
               | select 2, 'B', 20.0, 100 union all
               | select 3, 'C', 30.0, 100 union all
               | select 4, 'D', 40.0, 100 union all
               | select 5, 'E', 50.0, 100 union all
               | select 6, 'F', 60.0, 100
             """.stripMargin)

          // Merge operation - delete with higher, lower and equal ordering field value, all should take effect.
          // (ids 0, 1, 2 are removed regardless of source ts under commit-time ordering)
          spark.sql(
            s"""
               | merge into $tableName t
               | using (
               |   select 1 as id, 'B2' as name, 25.0 as price, 101 as ts union all
               |   select 2, '', 55.0, 99 as ts union all
               |   select 0, '', 55.0, 100 as ts
               | ) s
               | on t.id = s.id
               | when matched then delete
             """.stripMargin)

          // Merge operation - update with mixed ts values
          // (all three updates land: commit-time ordering ignores lower ts=99)
          spark.sql(
            s"""
               | merge into $tableName t
               | using (
               |   select 4 as id, 'D2' as name, 45.0 as price, 101 as ts union all
               |   select 5, 'E2', 55.0, 99 as ts union all
               |   select 6, 'F2', 65.0, 100 as ts
               | ) s
               | on t.id = s.id
               | when matched then update set *
             """.stripMargin)

          // Verify state after merges
          checkAnswer(s"select id, name, price, ts from $tableName order by id")(
            Seq(3, "C", 30.0, 100),
            Seq(4, "D2", 45.0, 101),
            Seq(5, "E2", 55.0, 99),
            Seq(6, "F2", 65.0, 100)
          )

          // Insert new records through merge
          spark.sql(
            s"""
               | merge into $tableName t
               | using (
               |   select 7 as id, 'D2' as name, 45.0 as price, 100 as ts union all
               |   select 8, 'E2', 55.0, 100 as ts
               | ) s
               | on t.id = s.id
               | when not matched then insert *
             """.stripMargin)

          // Verify final state
          checkAnswer(s"select id, name, price, ts from $tableName order by id")(
            Seq(3, "C", 30.0, 100),
            Seq(4, "D2", 45.0, 101),
            Seq(5, "E2", 55.0, 99),
            Seq(6, "F2", 65.0, 100),
            Seq(7, "D2", 45.0, 100),
            Seq(8, "E2", 55.0, 100)
          )
        })
      }
    }
  }
}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeEventTimeOrdering.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeEventTimeOrdering.scala
new file mode 100644
index 00000000000..9a362dfe384
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeEventTimeOrdering.scala
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.dml
+
+import org.apache.hudi.DataSourceWriteOptions
+import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
+
/**
 * SQL DML test coverage for Hudi tables configured with the
 * EVENT_TIME_ORDERING record merge mode (`hoodie.record.merge.mode`),
 * where a write only wins if its ordering/preCombine field (`ts`) is greater
 * than or equal to the stored record's value — lower-`ts` writes are dropped.
 *
 * Covers insert/update with equal, lower, higher, and missing `ts`, delete,
 * and MERGE INTO delete/update/insert paths.
 */
class TestMergeModeEventTimeOrdering extends HoodieSparkSqlTestBase {

  Seq("cow", "mor").foreach { tableType =>
    test(s"Test $tableType table with EVENT_TIME_ORDERING merge mode") {
      // small.file.group.candidates.limit=0 keeps each write in its own file
      // group (presumably to force merge-on-read resolution — TODO confirm);
      // partial updates for MERGE INTO are explicitly enabled.
      withSparkSqlSessionConfig(
        "hoodie.merge.small.file.group.candidates.limit" -> "0",
        DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key -> "true"
      ) {
        withRecordType()(withTempDir { tmp =>
          val tableName = generateTableName
          // Create table with EVENT_TIME_ORDERING
          spark.sql(
            s"""
               | create table $tableName (
               |  id int,
               |  name string,
               |  price double,
               |  ts long
               | ) using hudi
               | tblproperties (
               |  type = '$tableType',
               |  primaryKey = 'id',
               |  preCombineField = 'ts',
               |  hoodie.record.merge.mode = 'EVENT_TIME_ORDERING'
               | )
               | location '${tmp.getCanonicalPath}'
             """.stripMargin)

          // Insert initial records with ts=100
          spark.sql(
            s"""
               | insert into $tableName
               | select 1 as id, 'A' as name, 10.0 as price, 100 as ts
               | union all
               | select 2, 'B', 20.0, 100
             """.stripMargin)

          // Verify inserting records with the same ts value are visible
          // (equal event time: the later write wins)
          spark.sql(
            s"""
               | insert into $tableName
               | select 1 as id, 'A_equal' as name, 60.0 as price, 100 as ts
               | union all
               | select 2, 'B_equal', 70.0, 100
            """.stripMargin)

          checkAnswer(s"select id, name, price, ts from $tableName order by id")(
            Seq(1, "A_equal", 60.0, 100),
            Seq(2, "B_equal", 70.0, 100)
          )

          // Verify updating records with the same ts value are visible
          spark.sql(
            s"""
               | update $tableName
               | set price = 50.0, ts = 100
               | where id = 1
             """.stripMargin)

          checkAnswer(s"select id, name, price, ts from $tableName order by id")(
            Seq(1, "A_equal", 50.0, 100),
            Seq(2, "B_equal", 70.0, 100)
          )

          // Insert records with lower ts=99 (should not be visible)
          spark.sql(
            s"""
               | insert into $tableName
               | select 1 as id, 'A' as name, 30.0 as price, 99 as ts
               | union all
               | select 2, 'B', 40.0, 99
             """.stripMargin)

          checkAnswer(s"select id, name, price, ts from $tableName order by id")(
            Seq(1, "A_equal", 50.0, 100),
            Seq(2, "B_equal", 70.0, 100)
          )

          // Update record with a lower ts=98 (should not be visible)
          spark.sql(
            s"""
               | update $tableName
               | set price = 50.0, ts = 98
               | where id = 1
             """.stripMargin)

          checkAnswer(s"select id, name, price, ts from $tableName order by id")(
            Seq(1, "A_equal", 50.0, 100),
            Seq(2, "B_equal", 70.0, 100)
          )

          // Insert records with higher ts=101 (should be visible)
          spark.sql(
            s"""
               | insert into $tableName
               | select 1 as id, 'A' as name, 30.0 as price, 101 as ts
               | union all
               | select 2, 'B', 40.0, 101
             """.stripMargin)

          // Verify records with ts=101 are visible
          checkAnswer(s"select id, name, price, ts from $tableName order by id")(
            Seq(1, "A", 30.0, 101),
            Seq(2, "B", 40.0, 101)
          )

          // Update with a higher ts=102 is visible
          spark.sql(
            s"""
               | update $tableName
               | set price = 50.0, ts = 102
               | where id = 1
             """.stripMargin)

          checkAnswer(s"select id, name, price, ts from $tableName order by id")(
            Seq(1, "A", 50.0, 102),
            Seq(2, "B", 40.0, 101)
          )

          // Insert records with missing ts is not allowed
          checkExceptionContain(
            s"""
               | insert into $tableName
               | select 1 as id, 'A_missing_ts' as name, 31.0 as price
               | union all
               | select 2, 'B_missing_ts', 41.0
             """.stripMargin)("not enough data columns")

          // Update with missing ts is the same as COMMIT_TIME_ORDERING (should be visible)
          spark.sql(
            s"""
               | update $tableName
               | set price = 53.0
               | where id = 1
             """.stripMargin)

          checkAnswer(s"select id, name, price, ts from $tableName order by id")(
            Seq(1, "A", 53.0, 102),
            Seq(2, "B", 40.0, 101)
          )
          // Delete record with no ts.
          // [HUDI-8851] For MOR we hit NPE
          if (tableType.equals("cow")) {
            spark.sql(s"delete from $tableName where id = 1")
            // Verify deletion
            checkAnswer(s"select id, name, price, ts from $tableName order by id")(
              Seq(2, "B", 40.0, 101)
            )
          }
        })
      }
    }
  }

  Seq("mor").foreach { tableType =>
    // [HUDI-8915]: COW MIT delete does not honor event time ordering.
    //  Seq("cow", "mor").foreach { tableType =>
    test(s"Test merge operations with EVENT_TIME_ORDERING for $tableType table") {
      withSparkSqlSessionConfig("hoodie.merge.small.file.group.candidates.limit" -> "0") {
        withRecordType()(withTempDir { tmp =>
          val tableName = generateTableName
          // Create table with EVENT_TIME_ORDERING
          spark.sql(
            s"""
               | create table $tableName (
               |  id int,
               |  name string,
               |  price double,
               |  ts long
               | ) using hudi
               | tblproperties (
               |  type = '$tableType',
               |  primaryKey = 'id',
               |  preCombineField = 'ts',
               |  hoodie.record.merge.mode = 'EVENT_TIME_ORDERING'
               | )
               | location '${tmp.getCanonicalPath}'
             """.stripMargin)

          // Insert initial records with ts=100
          spark.sql(
            s"""
               | insert into $tableName
               | select 0 as id, 'A0' as name, 0.0 as price, 100 as ts union all
               | select 1, 'A', 10.0, 100 union all
               | select 2, 'B', 20.0, 100 union all
               | select 3, 'C', 30.0, 100 union all
               | select 4, 'D', 40.0, 100 union all
               | select 5, 'E', 50.0, 100 union all
               | select 6, 'F', 60.0, 100
             """.stripMargin)

          // Merge operation - delete with arbitrary ts value (lower, equal and higher). Lower ts won't take effect.
          // (ids 0 and 1 are deleted; id 2 survives because source ts=99 < stored ts=100)
          spark.sql(
            s"""
               | merge into $tableName t
               | using (
               |   select 0 as id, 'B2' as name, 25.0 as price, 100 as ts union all
               |   select 1 as id, 'B2' as name, 25.0 as price, 101 as ts union all
               |   select 2 as id, 'B2' as name, 25.0 as price, 99 as ts
               | ) s
               | on t.id = s.id
               | when matched then delete
             """.stripMargin)

          // Merge operation - update with mixed ts values (only equal or higher ts should take effect)
          spark.sql(
            s"""
               | merge into $tableName t
               | using (
               |   select 4 as id, 'D2' as name, 45.0 as price, 101 as ts union all
               |   select 5, 'E2', 55.0, 99 as ts union all
               |   select 6, 'F2', 65.0, 100 as ts
               | ) s
               | on t.id = s.id
               | when matched then update set *
             """.stripMargin)

          // Verify state after merges (id 5 untouched: its update carried a lower ts)
          checkAnswer(s"select id, name, price, ts from $tableName order by id")(
            Seq(2, "B", 20.0, 100),
            Seq(3, "C", 30.0, 100),
            Seq(4, "D2", 45.0, 101),
            Seq(5, "E", 50.0, 100),
            Seq(6, "F2", 65.0, 100)
          )

          // Insert new records through merge
          // (not-matched inserts land regardless of ts — there is nothing to order against)
          spark.sql(
            s"""
               | merge into $tableName t
               | using (
               |   select 7 as id, 'G' as name, 70.0 as price, 99 as ts union all
               |   select 8, 'H', 80.0, 99 as ts
               | ) s
               | on t.id = s.id
               | when not matched then insert *
             """.stripMargin)

          checkAnswer(s"select id, name, price, ts from $tableName order by id")(
            Seq(2, "B", 20.0, 100),
            Seq(3, "C", 30.0, 100),
            Seq(4, "D2", 45.0, 101),
            Seq(5, "E", 50.0, 100),
            Seq(6, "F2", 65.0, 100),
            Seq(7, "G", 70.0, 99),
            Seq(8, "H", 80.0, 99)
          )
        })
      }
    }
  }
}

Reply via email to