nsivabalan commented on code in PR #12610:
URL: https://github.com/apache/hudi/pull/12610#discussion_r1925991077


##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeEventTimeOrdering.scala:
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.dml
+
+import org.apache.hudi.DataSourceWriteOptions
+import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
+
+class TestMergeModeEventTimeOrdering extends HoodieSparkSqlTestBase {
+
+  Seq("cow", "mor").foreach { tableType =>
+    test(s"Test $tableType table with EVENT_TIME_ORDERING merge mode") {
+      withSparkSqlSessionConfig(
+        "hoodie.merge.small.file.group.candidates.limit" -> "0",
+        DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key -> "true"
+      ) {
+        withRecordType()(withTempDir { tmp =>
+          val tableName = generateTableName
+          // Create table with EVENT_TIME_ORDERING
+          spark.sql(
+            s"""
+               | create table $tableName (
+               |  id int,
+               |  name string,
+               |  price double,
+               |  ts long
+               | ) using hudi
+               | tblproperties (
+               |  type = '$tableType',
+               |  primaryKey = 'id',
+               |  preCombineField = 'ts',
+               |  hoodie.record.merge.mode = 'EVENT_TIME_ORDERING'
+               | )
+               | location '${tmp.getCanonicalPath}'
+             """.stripMargin)
+
+          // Insert initial records with ts=100
+          spark.sql(
+            s"""
+               | insert into $tableName
+               | select 1 as id, 'A' as name, 10.0 as price, 100 as ts
+               | union all
+               | select 2, 'B', 20.0, 100
+             """.stripMargin)
+
+          // Verify inserting records with the same ts value are visible
+          spark.sql(
+            s"""
+               | insert into $tableName
+               | select 1 as id, 'A_equal' as name, 60.0 as price, 100 as ts
+               | union all
+               | select 2, 'B_equal', 70.0, 100
+            """.stripMargin)
+
+          checkAnswer(s"select id, name, price, ts from $tableName order by 
id")(
+            Seq(1, "A_equal", 60.0, 100),
+            Seq(2, "B_equal", 70.0, 100)
+          )
+
+          // Verify updating records with the same ts value are visible
+          spark.sql(
+            s"""
+               | update $tableName
+               | set price = 50.0, ts = 100
+               | where id = 1
+             """.stripMargin)
+
+          checkAnswer(s"select id, name, price, ts from $tableName order by 
id")(
+            Seq(1, "A_equal", 50.0, 100),
+            Seq(2, "B_equal", 70.0, 100)
+          )
+
+          // Insert records with lower ts=99 (should not be visible)
+          spark.sql(
+            s"""
+               | insert into $tableName
+               | select 1 as id, 'A' as name, 30.0 as price, 99 as ts
+               | union all
+               | select 2, 'B', 40.0, 99
+             """.stripMargin)
+
+          checkAnswer(s"select id, name, price, ts from $tableName order by 
id")(
+            Seq(1, "A_equal", 50.0, 100),
+            Seq(2, "B_equal", 70.0, 100)
+          )
+
+          // Update record with a lower ts=98 (should not be visible)
+          spark.sql(
+            s"""
+               | update $tableName
+               | set price = 50.0, ts = 98
+               | where id = 1
+             """.stripMargin)
+
+          checkAnswer(s"select id, name, price, ts from $tableName order by 
id")(
+            Seq(1, "A_equal", 50.0, 100),
+            Seq(2, "B_equal", 70.0, 100)
+          )
+
+          // Insert records with higher ts=101 (should be visible)
+          spark.sql(
+            s"""
+               | insert into $tableName
+               | select 1 as id, 'A' as name, 30.0 as price, 101 as ts
+               | union all
+               | select 2, 'B', 40.0, 101
+             """.stripMargin)
+
+          // Verify records with ts=101 are visible
+          checkAnswer(s"select id, name, price, ts from $tableName order by 
id")(
+            Seq(1, "A", 30.0, 101),
+            Seq(2, "B", 40.0, 101)
+          )
+
+          // Update with a higher ts=102 is visible
+          spark.sql(
+            s"""
+               | update $tableName
+               | set price = 50.0, ts = 102
+               | where id = 1
+             """.stripMargin)
+
+          checkAnswer(s"select id, name, price, ts from $tableName order by 
id")(
+            Seq(1, "A", 50.0, 102),
+            Seq(2, "B", 40.0, 101)
+          )
+
+          // Insert records with missing ts is not allowed
+          checkExceptionContain(
+            s"""
+               | insert into $tableName
+               | select 1 as id, 'A_missing_ts' as name, 31.0 as price
+               | union all
+               | select 2, 'B_missing_ts', 41.0
+             """.stripMargin)("not enough data columns")
+
+          // Update with missing ts is the same as COMMIT_TIME_ORDERING 
(should be visible)
+          spark.sql(
+            s"""
+               | update $tableName
+               | set price = 53.0
+               | where id = 1
+             """.stripMargin)
+
+          checkAnswer(s"select id, name, price, ts from $tableName order by 
id")(
+            Seq(1, "A", 53.0, 102),
+            Seq(2, "B", 40.0, 101)
+          )
+          // Delete record with no ts.
+          // [HUDI-8851] For MOR we hit NPE
+          if (tableType.equals("cow")) {
+            spark.sql(s"delete from $tableName where id = 1")
+            // Verify deletion
+            checkAnswer(s"select id, name, price, ts from $tableName order by 
id")(
+              Seq(2, "B", 40.0, 101)

Review Comment:
   I feel we need to fix HUDI-8851 for 1.0.1.



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeEventTimeOrdering.scala:
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.dml
+
+import org.apache.hudi.DataSourceWriteOptions
+import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
+
+class TestMergeModeEventTimeOrdering extends HoodieSparkSqlTestBase {
+
+  Seq("cow", "mor").foreach { tableType =>
+    test(s"Test $tableType table with EVENT_TIME_ORDERING merge mode") {
+      withSparkSqlSessionConfig(
+        "hoodie.merge.small.file.group.candidates.limit" -> "0",
+        DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key -> "true"
+      ) {
+        withRecordType()(withTempDir { tmp =>
+          val tableName = generateTableName
+          // Create table with EVENT_TIME_ORDERING
+          spark.sql(
+            s"""
+               | create table $tableName (
+               |  id int,
+               |  name string,
+               |  price double,
+               |  ts long
+               | ) using hudi
+               | tblproperties (
+               |  type = '$tableType',
+               |  primaryKey = 'id',
+               |  preCombineField = 'ts',
+               |  hoodie.record.merge.mode = 'EVENT_TIME_ORDERING'
+               | )
+               | location '${tmp.getCanonicalPath}'
+             """.stripMargin)
+
+          // Insert initial records with ts=100
+          spark.sql(
+            s"""
+               | insert into $tableName
+               | select 1 as id, 'A' as name, 10.0 as price, 100 as ts
+               | union all
+               | select 2, 'B', 20.0, 100
+             """.stripMargin)
+
+          // Verify inserting records with the same ts value are visible
+          spark.sql(
+            s"""
+               | insert into $tableName

Review Comment:
   Looks like there is a gap here.
   An insert operation in spark-sql is translated to WriteOperation.INSERT, not
WriteOperation.UPSERT.
   So, if it's an INSERT, we should not dedup the records.
   So, if you ingest the same set of records again, we should see duplicate
records after the 2nd ingest.
   
   We can take that up separately, but we are not honoring
`hoodie.merge.allow.duplicate.on.inserts` properly.



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeCommitTimeOrdering.scala:
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.dml
+
+import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
+
+class TestMergeModeCommitTimeOrdering extends HoodieSparkSqlTestBase {
+
+  Seq("mor").foreach { tableType =>
+    // [HUDI-8850] For COW commit time ordering does not work.

Review Comment:
   I am curious as to why commit time ordering does not work with COW.



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeEventTimeOrdering.scala:
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.dml
+
+import org.apache.hudi.DataSourceWriteOptions
+import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
+
+class TestMergeModeEventTimeOrdering extends HoodieSparkSqlTestBase {
+
+  Seq("cow", "mor").foreach { tableType =>
+    test(s"Test $tableType table with EVENT_TIME_ORDERING merge mode") {
+      withSparkSqlSessionConfig(
+        "hoodie.merge.small.file.group.candidates.limit" -> "0",
+        DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key -> "true"
+      ) {
+        withRecordType()(withTempDir { tmp =>
+          val tableName = generateTableName
+          // Create table with EVENT_TIME_ORDERING
+          spark.sql(
+            s"""
+               | create table $tableName (
+               |  id int,
+               |  name string,
+               |  price double,
+               |  ts long
+               | ) using hudi
+               | tblproperties (
+               |  type = '$tableType',
+               |  primaryKey = 'id',
+               |  preCombineField = 'ts',
+               |  hoodie.record.merge.mode = 'EVENT_TIME_ORDERING'
+               | )
+               | location '${tmp.getCanonicalPath}'
+             """.stripMargin)
+
+          // Insert initial records with ts=100
+          spark.sql(
+            s"""
+               | insert into $tableName
+               | select 1 as id, 'A' as name, 10.0 as price, 100 as ts
+               | union all
+               | select 2, 'B', 20.0, 100
+             """.stripMargin)
+
+          // Verify inserting records with the same ts value are visible
+          spark.sql(
+            s"""
+               | insert into $tableName
+               | select 1 as id, 'A_equal' as name, 60.0 as price, 100 as ts
+               | union all
+               | select 2, 'B_equal', 70.0, 100
+            """.stripMargin)
+
+          checkAnswer(s"select id, name, price, ts from $tableName order by 
id")(
+            Seq(1, "A_equal", 60.0, 100),
+            Seq(2, "B_equal", 70.0, 100)
+          )
+
+          // Verify updating records with the same ts value are visible
+          spark.sql(
+            s"""
+               | update $tableName
+               | set price = 50.0, ts = 100
+               | where id = 1
+             """.stripMargin)
+
+          checkAnswer(s"select id, name, price, ts from $tableName order by 
id")(
+            Seq(1, "A_equal", 50.0, 100),
+            Seq(2, "B_equal", 70.0, 100)
+          )
+
+          // Insert records with lower ts=99 (should not be visible)
+          spark.sql(
+            s"""
+               | insert into $tableName
+               | select 1 as id, 'A' as name, 30.0 as price, 99 as ts
+               | union all
+               | select 2, 'B', 40.0, 99
+             """.stripMargin)
+
+          checkAnswer(s"select id, name, price, ts from $tableName order by 
id")(
+            Seq(1, "A_equal", 50.0, 100),
+            Seq(2, "B_equal", 70.0, 100)
+          )
+
+          // Update record with a lower ts=98 (should not be visible)
+          spark.sql(
+            s"""
+               | update $tableName
+               | set price = 50.0, ts = 98
+               | where id = 1
+             """.stripMargin)
+
+          checkAnswer(s"select id, name, price, ts from $tableName order by 
id")(
+            Seq(1, "A_equal", 50.0, 100),
+            Seq(2, "B_equal", 70.0, 100)
+          )
+
+          // Insert records with higher ts=101 (should be visible)
+          spark.sql(
+            s"""
+               | insert into $tableName
+               | select 1 as id, 'A' as name, 30.0 as price, 101 as ts
+               | union all
+               | select 2, 'B', 40.0, 101
+             """.stripMargin)
+
+          // Verify records with ts=101 are visible
+          checkAnswer(s"select id, name, price, ts from $tableName order by 
id")(
+            Seq(1, "A", 30.0, 101),
+            Seq(2, "B", 40.0, 101)
+          )
+
+          // Update with a higher ts=102 is visible
+          spark.sql(
+            s"""
+               | update $tableName
+               | set price = 50.0, ts = 102
+               | where id = 1
+             """.stripMargin)
+
+          checkAnswer(s"select id, name, price, ts from $tableName order by 
id")(
+            Seq(1, "A", 50.0, 102),
+            Seq(2, "B", 40.0, 101)
+          )
+
+          // Insert records with missing ts is not allowed
+          checkExceptionContain(
+            s"""
+               | insert into $tableName
+               | select 1 as id, 'A_missing_ts' as name, 31.0 as price
+               | union all
+               | select 2, 'B_missing_ts', 41.0
+             """.stripMargin)("not enough data columns")
+
+          // Update with missing ts is the same as COMMIT_TIME_ORDERING 
(should be visible)
+          spark.sql(
+            s"""
+               | update $tableName
+               | set price = 53.0
+               | where id = 1
+             """.stripMargin)
+
+          checkAnswer(s"select id, name, price, ts from $tableName order by 
id")(
+            Seq(1, "A", 53.0, 102),
+            Seq(2, "B", 40.0, 101)
+          )
+          // Delete record with no ts.
+          // [HUDI-8851] For MOR we hit NPE
+          if (tableType.equals("cow")) {
+            spark.sql(s"delete from $tableName where id = 1")
+            // Verify deletion
+            checkAnswer(s"select id, name, price, ts from $tableName order by 
id")(
+              Seq(2, "B", 40.0, 101)
+            )
+          }
+        })
+      }
+    }
+
+    test(s"Test merge operations with EVENT_TIME_ORDERING for $tableType 
table") {
+      
withSparkSqlSessionConfig("hoodie.merge.small.file.group.candidates.limit" -> 
"0") {
+        withRecordType()(withTempDir { tmp =>
+          val tableName = generateTableName
+          // Create table with EVENT_TIME_ORDERING
+          spark.sql(
+            s"""
+               | create table $tableName (
+               |  id int,
+               |  name string,
+               |  price double,
+               |  ts long
+               | ) using hudi
+               | tblproperties (
+               |  type = '$tableType',
+               |  primaryKey = 'id',
+               |  preCombineField = 'ts',
+               |  hoodie.record.merge.mode = 'EVENT_TIME_ORDERING'
+               | )
+               | location '${tmp.getCanonicalPath}'
+             """.stripMargin)
+
+          // Insert initial records with ts=100
+          spark.sql(
+            s"""
+               | insert into $tableName
+               | select 0 as id, 'A0' as name, 0.0 as price, 100 as ts union 
all
+               | select 1, 'A', 10.0, 100 union all
+               | select 2, 'B', 20.0, 100 union all
+               | select 3, 'C', 30.0, 100 union all
+               | select 4, 'D', 40.0, 100 union all
+               | select 5, 'E', 50.0, 100 union all
+               | select 6, 'F', 60.0, 100
+             """.stripMargin)
+
+          // Merge operation - delete with an equal or higher ts (should work)
+          spark.sql(

Review Comment:
   Do we have the case of a delete with a lower ordering value covered? Can you
point me to it? If not, can we add it, please?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to