nsivabalan commented on code in PR #12122:
URL: https://github.com/apache/hudi/pull/12122#discussion_r1807438965


##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java:
##########
@@ -110,6 +117,17 @@ protected <T> void processNextRecord(HoodieRecord<T> 
newRecord) throws IOExcepti
     }
   }
 
+  /**
+   * Processing time based delete is found when
+   * 1. The current record is a delete whose orderingVal is default value, or
+   * 2. The current record's metadata contains the flag: 
PROCESSING_TIME_BASED_DELETE_FOUND.
+   */
+  private <T> boolean hasProcessingTimeBasedDelete(HoodieRecord<T> record) 
throws IOException {
+    return (record.isDelete(readerSchema, getPayloadProps())

Review Comment:
   can we flip the comparison. lets first check 
(record.getMetadata().isPresent() && 
record.getMetaDataInfo(PROCESSING_TIME_BASED_DELETE_FOUND).isPresent()
   and then check for ordering value from the record. 



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestDeleteRecordLogic.scala:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read
+
+import org.apache.hudi.DataSourceWriteOptions.{OPERATION, PRECOMBINE_FIELD, 
RECORDKEY_FIELD, TABLE_TYPE}
+import org.apache.hudi.common.config.{HoodieReaderConfig, HoodieStorageConfig}
+import org.apache.hudi.config.{HoodieCompactionConfig, HoodieWriteConfig}
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
+import org.apache.hudi.{DataSourceWriteOptions, DefaultSparkRecordMerger}
+import org.apache.spark.sql.SaveMode
+import org.junit.jupiter.api.Assertions.assertTrue
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.{Arguments, MethodSource}
+
+class TestDeleteRecordLogic extends SparkClientFunctionalTestHarness{
+  val expected = Seq(
+    (14, "5", "rider-Z", "driver-Z", 17.85, 3),
+    (-9, "4", "rider-DDDD", "driver-DDDD", 20.0, 1),
+    (10, "3", "rider-C", "driver-C", 33.9, 10),
+    (10, "2", "rider-B", "driver-B", 27.7, 1))
+
+  @ParameterizedTest
+  @MethodSource(Array("provideParams"))
+  def testDeleteLogic(useFgReader: String, tableType: String, recordType: 
String): Unit = {
+    val sparkOpts: Map[String, String] = Map(
+      HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key -> "parquet",
+      HoodieWriteConfig.RECORD_MERGER_IMPLS.key -> 
classOf[DefaultSparkRecordMerger].getName)
+    val fgReaderOpts: Map[String, String] = Map(
+      HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key -> useFgReader)
+
+    val opts = if (recordType.equals("SPARK")) sparkOpts ++ fgReaderOpts else 
fgReaderOpts
+    val columns = Seq("ts", "key", "rider", "driver", "fare", "number")
+
+    val data = Seq(
+      (10, "1", "rider-A", "driver-A", 19.10, 7),
+      (10, "2", "rider-B", "driver-B", 27.70, 1),
+      (10, "3", "rider-C", "driver-C", 33.90, 10),
+      (-1, "4", "rider-D", "driver-D", 34.15, 6),
+      (10, "5", "rider-E", "driver-E", 17.85, 10))
+    val inserts = spark.createDataFrame(data).toDF(columns: _*)
+    inserts.write.format("hudi").
+      option(RECORDKEY_FIELD.key(), "key").
+      option(PRECOMBINE_FIELD.key(), "ts").
+      option(TABLE_TYPE.key(), tableType).
+      option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
+      options(opts).
+      mode(SaveMode.Overwrite).
+      save(basePath)
+
+    val updateData = Seq(
+      (11, "1", "rider-X", "driver-X", 19.10, 9),
+      (9, "2", "rider-Y", "driver-Y", 27.70, 7))
+    val updates = spark.createDataFrame(updateData).toDF(columns: _*)
+    updates.write.format("hudi").
+      option(RECORDKEY_FIELD.key(), "key").
+      option(PRECOMBINE_FIELD.key(), "ts").
+      option(TABLE_TYPE.key(), tableType).
+      option(OPERATION.key(), "upsert").
+      options(opts).
+      mode(SaveMode.Append).
+      save(basePath)
+
+    val deletesData = Seq((-5, "4", "rider-D", "driver-D", 34.15, 6))
+    val deletes = spark.createDataFrame(deletesData).toDF(columns: _*)
+    deletes.write.format("hudi").
+      option(RECORDKEY_FIELD.key(), "key").
+      option(PRECOMBINE_FIELD.key(), "ts").
+      option(TABLE_TYPE.key(), tableType).
+      option(OPERATION.key(), "delete").
+      options(opts).
+      mode(SaveMode.Append).
+      save(basePath)
+
+    val secondUpdateData = Seq(
+      (14, "5", "rider-Z", "driver-Z", 17.85, 3),
+      (-10, "4", "rider-DD", "driver-DD", 34.15, 5))
+    val secondUpdates = spark.createDataFrame(secondUpdateData).toDF(columns: 
_*)
+    secondUpdates.write.format("hudi").
+      option(RECORDKEY_FIELD.key(), "key").
+      option(PRECOMBINE_FIELD.key(), "ts").
+      option(TABLE_TYPE.key(), tableType).
+      option(OPERATION.key(), "upsert").
+      options(opts).
+      mode(SaveMode.Append).
+      save(basePath)
+
+    val secondDeletesData = Seq(
+      (10, "4", "rider-D", "driver-D", 34.15, 6),
+      (0, "1", "rider-X", "driver-X", 19.10, 8))
+    val secondDeletes = spark.createDataFrame(secondDeletesData).toDF(columns: 
_*)
+    secondDeletes.write.format("hudi").
+      option(RECORDKEY_FIELD.key(), "key").
+      option(PRECOMBINE_FIELD.key(), "ts").
+      option(TABLE_TYPE.key(), tableType).
+      option(OPERATION.key(), "delete").
+      options(opts).
+      mode(SaveMode.Append).
+      save(basePath)
+
+    val thirdUpdateData = Seq((-8, "4", "rider-DDD", "driver-DDD", 20.00, 1))
+    val thirdUpdates = spark.createDataFrame(thirdUpdateData).toDF(columns: _*)
+    thirdUpdates.write.format("hudi").
+      option(RECORDKEY_FIELD.key(), "key").
+      option(PRECOMBINE_FIELD.key(), "ts").
+      option(TABLE_TYPE.key(), tableType).
+      option(OPERATION.key(), "upsert").
+      options(opts).
+      mode(SaveMode.Append).
+      save(basePath)
+
+    val thirdDeletesData = Seq(
+      (10, "4", "rider-D4", "driver-D4", 34.15, 6),
+      (0, "1", "rider-X", "driver-X", 19.10, 8))
+    val thirdDeletes = spark.createDataFrame(thirdDeletesData).toDF(columns: 
_*)
+    thirdDeletes.write.format("hudi").
+      option(RECORDKEY_FIELD.key(), "key").
+      option(PRECOMBINE_FIELD.key(), "ts").
+      option(TABLE_TYPE.key(), tableType).
+      option(OPERATION.key(), "delete").
+      options(opts).
+      mode(SaveMode.Append).
+      save(basePath)
+
+    val fourUpdateData = Seq((-9, "4", "rider-DDDD", "driver-DDDD", 20.00, 1))

Review Comment:
   lets do a round of validation before compaction. and once after compaction. 
   



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestDeleteRecordLogic.scala:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read
+
+import org.apache.hudi.DataSourceWriteOptions.{OPERATION, PRECOMBINE_FIELD, 
RECORDKEY_FIELD, TABLE_TYPE}
+import org.apache.hudi.common.config.{HoodieReaderConfig, HoodieStorageConfig}
+import org.apache.hudi.config.{HoodieCompactionConfig, HoodieWriteConfig}
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
+import org.apache.hudi.{DataSourceWriteOptions, DefaultSparkRecordMerger}
+import org.apache.spark.sql.SaveMode
+import org.junit.jupiter.api.Assertions.assertTrue
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.{Arguments, MethodSource}
+
+class TestDeleteRecordLogic extends SparkClientFunctionalTestHarness{
+  val expected = Seq(
+    (14, "5", "rider-Z", "driver-Z", 17.85, 3),
+    (-9, "4", "rider-DDDD", "driver-DDDD", 20.0, 1),
+    (10, "3", "rider-C", "driver-C", 33.9, 10),
+    (10, "2", "rider-B", "driver-B", 27.7, 1))
+
+  @ParameterizedTest
+  @MethodSource(Array("provideParams"))
+  def testDeleteLogic(useFgReader: String, tableType: String, recordType: 
String): Unit = {
+    val sparkOpts: Map[String, String] = Map(
+      HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key -> "parquet",
+      HoodieWriteConfig.RECORD_MERGER_IMPLS.key -> 
classOf[DefaultSparkRecordMerger].getName)
+    val fgReaderOpts: Map[String, String] = Map(
+      HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key -> useFgReader)
+
+    val opts = if (recordType.equals("SPARK")) sparkOpts ++ fgReaderOpts else 
fgReaderOpts
+    val columns = Seq("ts", "key", "rider", "driver", "fare", "number")
+
+    val data = Seq(
+      (10, "1", "rider-A", "driver-A", 19.10, 7),
+      (10, "2", "rider-B", "driver-B", 27.70, 1),
+      (10, "3", "rider-C", "driver-C", 33.90, 10),
+      (-1, "4", "rider-D", "driver-D", 34.15, 6),
+      (10, "5", "rider-E", "driver-E", 17.85, 10))
+    val inserts = spark.createDataFrame(data).toDF(columns: _*)
+    inserts.write.format("hudi").
+      option(RECORDKEY_FIELD.key(), "key").
+      option(PRECOMBINE_FIELD.key(), "ts").
+      option(TABLE_TYPE.key(), tableType).
+      option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
+      options(opts).
+      mode(SaveMode.Overwrite).
+      save(basePath)
+
+    val updateData = Seq(
+      (11, "1", "rider-X", "driver-X", 19.10, 9),
+      (9, "2", "rider-Y", "driver-Y", 27.70, 7))
+    val updates = spark.createDataFrame(updateData).toDF(columns: _*)
+    updates.write.format("hudi").
+      option(RECORDKEY_FIELD.key(), "key").
+      option(PRECOMBINE_FIELD.key(), "ts").
+      option(TABLE_TYPE.key(), tableType).
+      option(OPERATION.key(), "upsert").
+      options(opts).
+      mode(SaveMode.Append).
+      save(basePath)
+
+    val deletesData = Seq((-5, "4", "rider-D", "driver-D", 34.15, 6))
+    val deletes = spark.createDataFrame(deletesData).toDF(columns: _*)
+    deletes.write.format("hudi").
+      option(RECORDKEY_FIELD.key(), "key").
+      option(PRECOMBINE_FIELD.key(), "ts").
+      option(TABLE_TYPE.key(), tableType).
+      option(OPERATION.key(), "delete").
+      options(opts).
+      mode(SaveMode.Append).
+      save(basePath)
+
+    val secondUpdateData = Seq(
+      (14, "5", "rider-Z", "driver-Z", 17.85, 3),
+      (-10, "4", "rider-DD", "driver-DD", 34.15, 5))
+    val secondUpdates = spark.createDataFrame(secondUpdateData).toDF(columns: 
_*)
+    secondUpdates.write.format("hudi").
+      option(RECORDKEY_FIELD.key(), "key").
+      option(PRECOMBINE_FIELD.key(), "ts").
+      option(TABLE_TYPE.key(), tableType).
+      option(OPERATION.key(), "upsert").
+      options(opts).
+      mode(SaveMode.Append).
+      save(basePath)
+
+    val secondDeletesData = Seq(
+      (10, "4", "rider-D", "driver-D", 34.15, 6),
+      (0, "1", "rider-X", "driver-X", 19.10, 8))
+    val secondDeletes = spark.createDataFrame(secondDeletesData).toDF(columns: 
_*)
+    secondDeletes.write.format("hudi").
+      option(RECORDKEY_FIELD.key(), "key").
+      option(PRECOMBINE_FIELD.key(), "ts").
+      option(TABLE_TYPE.key(), tableType).
+      option(OPERATION.key(), "delete").
+      options(opts).
+      mode(SaveMode.Append).
+      save(basePath)
+
+    val thirdUpdateData = Seq((-8, "4", "rider-DDD", "driver-DDD", 20.00, 1))
+    val thirdUpdates = spark.createDataFrame(thirdUpdateData).toDF(columns: _*)
+    thirdUpdates.write.format("hudi").
+      option(RECORDKEY_FIELD.key(), "key").
+      option(PRECOMBINE_FIELD.key(), "ts").
+      option(TABLE_TYPE.key(), tableType).
+      option(OPERATION.key(), "upsert").
+      options(opts).
+      mode(SaveMode.Append).
+      save(basePath)
+
+    val thirdDeletesData = Seq(
+      (10, "4", "rider-D4", "driver-D4", 34.15, 6),
+      (0, "1", "rider-X", "driver-X", 19.10, 8))
+    val thirdDeletes = spark.createDataFrame(thirdDeletesData).toDF(columns: 
_*)
+    thirdDeletes.write.format("hudi").
+      option(RECORDKEY_FIELD.key(), "key").
+      option(PRECOMBINE_FIELD.key(), "ts").
+      option(TABLE_TYPE.key(), tableType).
+      option(OPERATION.key(), "delete").
+      options(opts).
+      mode(SaveMode.Append).
+      save(basePath)
+
+    val fourUpdateData = Seq((-9, "4", "rider-DDDD", "driver-DDDD", 20.00, 1))
+    val fourUpdates = spark.createDataFrame(fourUpdateData).toDF(columns: _*)
+    fourUpdates.write.format("hudi").
+      option(RECORDKEY_FIELD.key(), "key").
+      option(PRECOMBINE_FIELD.key(), "ts").
+      option(TABLE_TYPE.key(), tableType).
+      option(HoodieCompactionConfig.INLINE_COMPACT.key(),

Review Comment:
   you may need to set hoodie.compact.inline.max.delta.commits = 1 here. so we 
know for sure compaction kicked in for MOR table. 
   just by setting HoodieCompactionConfig.INLINE_COMPACT.key() = true, 
compaction may not kick in. 



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestDeleteRecordLogic.scala:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read
+
+import org.apache.hudi.DataSourceWriteOptions.{OPERATION, PRECOMBINE_FIELD, 
RECORDKEY_FIELD, TABLE_TYPE}
+import org.apache.hudi.common.config.{HoodieReaderConfig, HoodieStorageConfig}
+import org.apache.hudi.config.{HoodieCompactionConfig, HoodieWriteConfig}
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
+import org.apache.hudi.{DataSourceWriteOptions, DefaultSparkRecordMerger}
+import org.apache.spark.sql.SaveMode
+import org.junit.jupiter.api.Assertions.assertTrue
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.{Arguments, MethodSource}
+
+class TestDeleteRecordLogic extends SparkClientFunctionalTestHarness{
+  val expected = Seq(
+    (14, "5", "rider-Z", "driver-Z", 17.85, 3),
+    (-9, "4", "rider-DDDD", "driver-DDDD", 20.0, 1),
+    (10, "3", "rider-C", "driver-C", 33.9, 10),
+    (10, "2", "rider-B", "driver-B", 27.7, 1))
+
+  @ParameterizedTest
+  @MethodSource(Array("provideParams"))
+  def testDeleteLogic(useFgReader: String, tableType: String, recordType: 
String): Unit = {
+    val sparkOpts: Map[String, String] = Map(
+      HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key -> "parquet",
+      HoodieWriteConfig.RECORD_MERGER_IMPLS.key -> 
classOf[DefaultSparkRecordMerger].getName)
+    val fgReaderOpts: Map[String, String] = Map(
+      HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key -> useFgReader)
+
+    val opts = if (recordType.equals("SPARK")) sparkOpts ++ fgReaderOpts else 
fgReaderOpts
+    val columns = Seq("ts", "key", "rider", "driver", "fare", "number")
+
+    val data = Seq(
+      (10, "1", "rider-A", "driver-A", 19.10, 7),
+      (10, "2", "rider-B", "driver-B", 27.70, 1),
+      (10, "3", "rider-C", "driver-C", 33.90, 10),
+      (-1, "4", "rider-D", "driver-D", 34.15, 6),
+      (10, "5", "rider-E", "driver-E", 17.85, 10))
+    val inserts = spark.createDataFrame(data).toDF(columns: _*)
+    inserts.write.format("hudi").
+      option(RECORDKEY_FIELD.key(), "key").
+      option(PRECOMBINE_FIELD.key(), "ts").
+      option(TABLE_TYPE.key(), tableType).
+      option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
+      options(opts).
+      mode(SaveMode.Overwrite).
+      save(basePath)
+
+    val updateData = Seq(
+      (11, "1", "rider-X", "driver-X", 19.10, 9),
+      (9, "2", "rider-Y", "driver-Y", 27.70, 7))
+    val updates = spark.createDataFrame(updateData).toDF(columns: _*)
+    updates.write.format("hudi").
+      option(RECORDKEY_FIELD.key(), "key").
+      option(PRECOMBINE_FIELD.key(), "ts").
+      option(TABLE_TYPE.key(), tableType).
+      option(OPERATION.key(), "upsert").
+      options(opts).
+      mode(SaveMode.Append).
+      save(basePath)
+
+    val deletesData = Seq((-5, "4", "rider-D", "driver-D", 34.15, 6))
+    val deletes = spark.createDataFrame(deletesData).toDF(columns: _*)
+    deletes.write.format("hudi").
+      option(RECORDKEY_FIELD.key(), "key").
+      option(PRECOMBINE_FIELD.key(), "ts").
+      option(TABLE_TYPE.key(), tableType).
+      option(OPERATION.key(), "delete").
+      options(opts).
+      mode(SaveMode.Append).
+      save(basePath)
+
+    val secondUpdateData = Seq(
+      (14, "5", "rider-Z", "driver-Z", 17.85, 3),
+      (-10, "4", "rider-DD", "driver-DD", 34.15, 5))
+    val secondUpdates = spark.createDataFrame(secondUpdateData).toDF(columns: 
_*)
+    secondUpdates.write.format("hudi").
+      option(RECORDKEY_FIELD.key(), "key").
+      option(PRECOMBINE_FIELD.key(), "ts").
+      option(TABLE_TYPE.key(), tableType).
+      option(OPERATION.key(), "upsert").
+      options(opts).
+      mode(SaveMode.Append).
+      save(basePath)
+
+    val secondDeletesData = Seq(
+      (10, "4", "rider-D", "driver-D", 34.15, 6),
+      (0, "1", "rider-X", "driver-X", 19.10, 8))
+    val secondDeletes = spark.createDataFrame(secondDeletesData).toDF(columns: 
_*)
+    secondDeletes.write.format("hudi").
+      option(RECORDKEY_FIELD.key(), "key").
+      option(PRECOMBINE_FIELD.key(), "ts").
+      option(TABLE_TYPE.key(), tableType).
+      option(OPERATION.key(), "delete").
+      options(opts).
+      mode(SaveMode.Append).
+      save(basePath)
+
+    val thirdUpdateData = Seq((-8, "4", "rider-DDD", "driver-DDD", 20.00, 1))
+    val thirdUpdates = spark.createDataFrame(thirdUpdateData).toDF(columns: _*)
+    thirdUpdates.write.format("hudi").
+      option(RECORDKEY_FIELD.key(), "key").
+      option(PRECOMBINE_FIELD.key(), "ts").
+      option(TABLE_TYPE.key(), tableType).
+      option(OPERATION.key(), "upsert").
+      options(opts).
+      mode(SaveMode.Append).
+      save(basePath)
+
+    val thirdDeletesData = Seq(
+      (10, "4", "rider-D4", "driver-D4", 34.15, 6),
+      (0, "1", "rider-X", "driver-X", 19.10, 8))
+    val thirdDeletes = spark.createDataFrame(thirdDeletesData).toDF(columns: 
_*)
+    thirdDeletes.write.format("hudi").
+      option(RECORDKEY_FIELD.key(), "key").
+      option(PRECOMBINE_FIELD.key(), "ts").
+      option(TABLE_TYPE.key(), tableType).
+      option(OPERATION.key(), "delete").

Review Comment:
   can you set HoodieCompactionConfig.INLINE_COMPACT.key() = false for these 
ingests. so that we know compaction does not kick in. 
   we added automatic compaction for spark data source writes (which is after 5 
commits). 
   So, lets ensure we do not trigger compaction unless we explicitly want to



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java:
##########
@@ -278,14 +288,16 @@ protected Option<DeleteRecord> 
doProcessNextDeletedRecord(DeleteRecord deleteRec
           if 
(isDeleteRecordWithNaturalOrder(existingRecordMetadataPair.getLeft(), 
existingOrderingVal)) {
             return Option.empty();
           }
-          Comparable deleteOrderingVal = deleteRecord.getOrderingValue();
-          // Checks the ordering value does not equal to 0
-          // because we use 0 as the default value which means natural order
-          boolean chooseExisting = !deleteOrderingVal.equals(0)
-              && ReflectionUtils.isSameClass(existingOrderingVal, 
deleteOrderingVal)
-              && existingOrderingVal.compareTo(deleteOrderingVal) > 0;
-          if (chooseExisting) {
-            // The DELETE message is obsolete if the old message has greater 
orderingVal.
+          Comparable deleteOrderingVal = deleteRecord.getOrderingValue() == 
null ? orderingFieldDefault : deleteRecord.getOrderingValue();
+          // Here existing record represents newer record with the same key, 
which can be a delete or non-delete record.
+          // Therefore, we should use event time based merging if possible. 
So, the newer record is returned if
+          // 1. the delete is processing time based, or
+          // 2. the delete is event time based, and has higher value.

Review Comment:
   minor. 
   2. delete is event time based, and the existing record has higher value. 



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/TypeCaster.java:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.log;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TypeCaster {
+  private static final Logger LOG = LoggerFactory.getLogger(TypeCaster.class);
+  private TypeCaster() {
+  }
+
+  public static Comparable<?> castValue(Comparable<?> value, Schema.Type 
newType) {
+    if (value instanceof Integer) {
+      Integer v = (Integer) value;
+      switch (newType) {
+        case INT:
+          return v;
+        case LONG:
+          return v.longValue();
+        case FLOAT:
+          return v.floatValue();
+        case DOUBLE:
+          return v.doubleValue();
+        case STRING:
+          return v.toString();
+        default:
+          LOG.warn("Cannot cast integer to {}", newType);
+          return null;
+      }
+    } else if (value instanceof Long) {
+      Long v = (Long) value;
+      switch (newType) {
+        case LONG:
+          return v;
+        case FLOAT:
+          return v.floatValue();
+        case DOUBLE:
+          return v.doubleValue();
+        case STRING:
+          return v.toString();
+        default:
+          LOG.warn("Cannot cast integer to {}", newType);
+          return null;
+      }
+    } else if (value instanceof Float) {
+      Float v = (Float) value;
+      switch (newType) {
+        case FLOAT:
+          return v;
+        case DOUBLE:
+          return v.doubleValue();
+        case STRING:
+          return v.toString();
+        default:
+          LOG.warn("Cannot cast integer to {}", newType);
+          return null;
+      }
+    } else if (value instanceof Double) {
+      Double v = (Double) value;
+      switch (newType) {
+        case DOUBLE:
+          return v;
+        case STRING:
+          return v.toString();
+        default:
+          LOG.warn("Cannot cast integer to {}", newType);
+          return null;
+      }
+    } else if (value instanceof String) {
+      String v = (String) value;
+      if (newType == Type.STRING) {
+        return v;
+      } else {
+        LOG.warn("Cannot cast integer to {}", newType);
+        return null;
+      }
+    } else {
+      throw new UnsupportedOperationException(

Review Comment:
   does this mean that we can only support ordering values of certain data 
types? I am not asking to fix it in this patch. just trying to gauge whats the 
limitation we have around this 



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestDeleteRecordLogic.scala:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read
+
+import org.apache.hudi.DataSourceWriteOptions.{OPERATION, PRECOMBINE_FIELD, 
RECORDKEY_FIELD, TABLE_TYPE}
+import org.apache.hudi.common.config.{HoodieReaderConfig, HoodieStorageConfig}
+import org.apache.hudi.config.{HoodieCompactionConfig, HoodieWriteConfig}
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
+import org.apache.hudi.{DataSourceWriteOptions, DefaultSparkRecordMerger}
+import org.apache.spark.sql.SaveMode
+import org.junit.jupiter.api.Assertions.assertTrue
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.{Arguments, MethodSource}
+
+class TestDeleteRecordLogic extends SparkClientFunctionalTestHarness{
+  val expected = Seq(
+    (14, "5", "rider-Z", "driver-Z", 17.85, 3),
+    (-9, "4", "rider-DDDD", "driver-DDDD", 20.0, 1),
+    (10, "3", "rider-C", "driver-C", 33.9, 10),
+    (10, "2", "rider-B", "driver-B", 27.7, 1))
+
+  @ParameterizedTest
+  @MethodSource(Array("provideParams"))
+  def testDeleteLogic(useFgReader: String, tableType: String, recordType: 
String): Unit = {
+    val sparkOpts: Map[String, String] = Map(
+      HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key -> "parquet",
+      HoodieWriteConfig.RECORD_MERGER_IMPLS.key -> 
classOf[DefaultSparkRecordMerger].getName)
+    val fgReaderOpts: Map[String, String] = Map(
+      HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key -> useFgReader)
+
+    val opts = if (recordType.equals("SPARK")) sparkOpts ++ fgReaderOpts else 
fgReaderOpts
+    val columns = Seq("ts", "key", "rider", "driver", "fare", "number")
+
+    val data = Seq(
+      (10, "1", "rider-A", "driver-A", 19.10, 7),
+      (10, "2", "rider-B", "driver-B", 27.70, 1),
+      (10, "3", "rider-C", "driver-C", 33.90, 10),
+      (-1, "4", "rider-D", "driver-D", 34.15, 6),
+      (10, "5", "rider-E", "driver-E", 17.85, 10))
+    val inserts = spark.createDataFrame(data).toDF(columns: _*)
+    inserts.write.format("hudi").
+      option(RECORDKEY_FIELD.key(), "key").
+      option(PRECOMBINE_FIELD.key(), "ts").
+      option(TABLE_TYPE.key(), tableType).
+      option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
+      options(opts).
+      mode(SaveMode.Overwrite).
+      save(basePath)
+
+    val updateData = Seq(
+      (11, "1", "rider-X", "driver-X", 19.10, 9),
+      (9, "2", "rider-Y", "driver-Y", 27.70, 7))
+    val updates = spark.createDataFrame(updateData).toDF(columns: _*)
+    updates.write.format("hudi").
+      option(RECORDKEY_FIELD.key(), "key").
+      option(PRECOMBINE_FIELD.key(), "ts").
+      option(TABLE_TYPE.key(), tableType).
+      option(OPERATION.key(), "upsert").
+      options(opts).
+      mode(SaveMode.Append).
+      save(basePath)
+
+    val deletesData = Seq((-5, "4", "rider-D", "driver-D", 34.15, 6))
+    val deletes = spark.createDataFrame(deletesData).toDF(columns: _*)
+    deletes.write.format("hudi").
+      option(RECORDKEY_FIELD.key(), "key").
+      option(PRECOMBINE_FIELD.key(), "ts").
+      option(TABLE_TYPE.key(), tableType).
+      option(OPERATION.key(), "delete").
+      options(opts).
+      mode(SaveMode.Append).
+      save(basePath)
+
+    val secondUpdateData = Seq(
+      (14, "5", "rider-Z", "driver-Z", 17.85, 3),
+      (-10, "4", "rider-DD", "driver-DD", 34.15, 5))
+    val secondUpdates = spark.createDataFrame(secondUpdateData).toDF(columns: 
_*)
+    secondUpdates.write.format("hudi").
+      option(RECORDKEY_FIELD.key(), "key").
+      option(PRECOMBINE_FIELD.key(), "ts").
+      option(TABLE_TYPE.key(), tableType).
+      option(OPERATION.key(), "upsert").
+      options(opts).
+      mode(SaveMode.Append).
+      save(basePath)
+
+    val secondDeletesData = Seq(
+      (10, "4", "rider-D", "driver-D", 34.15, 6),
+      (0, "1", "rider-X", "driver-X", 19.10, 8))
+    val secondDeletes = spark.createDataFrame(secondDeletesData).toDF(columns: 
_*)
+    secondDeletes.write.format("hudi").
+      option(RECORDKEY_FIELD.key(), "key").
+      option(PRECOMBINE_FIELD.key(), "ts").
+      option(TABLE_TYPE.key(), tableType).
+      option(OPERATION.key(), "delete").
+      options(opts).
+      mode(SaveMode.Append).
+      save(basePath)
+
+    val thirdUpdateData = Seq((-8, "4", "rider-DDD", "driver-DDD", 20.00, 1))
+    val thirdUpdates = spark.createDataFrame(thirdUpdateData).toDF(columns: _*)
+    thirdUpdates.write.format("hudi").
+      option(RECORDKEY_FIELD.key(), "key").
+      option(PRECOMBINE_FIELD.key(), "ts").
+      option(TABLE_TYPE.key(), tableType).
+      option(OPERATION.key(), "upsert").
+      options(opts).
+      mode(SaveMode.Append).
+      save(basePath)
+
+    val thirdDeletesData = Seq(
+      (10, "4", "rider-D4", "driver-D4", 34.15, 6),
+      (0, "1", "rider-X", "driver-X", 19.10, 8))
+    val thirdDeletes = spark.createDataFrame(thirdDeletesData).toDF(columns: 
_*)
+    thirdDeletes.write.format("hudi").
+      option(RECORDKEY_FIELD.key(), "key").
+      option(PRECOMBINE_FIELD.key(), "ts").
+      option(TABLE_TYPE.key(), tableType).
+      option(OPERATION.key(), "delete").
+      options(opts).
+      mode(SaveMode.Append).
+      save(basePath)
+
+    val fourUpdateData = Seq((-9, "4", "rider-DDDD", "driver-DDDD", 20.00, 1))

Review Comment:
   and lets validate that in case of MOR table, compaction has not kicked in 
while we do the first validation. 
   
   and post compaction, lets validate that 1 compaction commit is seen. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to