This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 1f43b231763 [HUDI-9170] Fixing schema projection with file group reader (#12970)
1f43b231763 is described below

commit 1f43b231763a978bef8d340a654e9f6287241ec9
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Mon Mar 17 20:23:27 2025 -0700

    [HUDI-9170] Fixing schema projection with file group reader (#12970)
    
    Co-authored-by: Y Ethan Guo <[email protected]>
---
 .../read/HoodieFileGroupReaderSchemaHandler.java   |   7 +-
 .../read/TestHoodieFileGroupRecordBuffer.java      |  81 +++++++++++++
 .../TestHoodieDatasetBulkInsertHelper.java         |   2 +-
 .../apache/hudi/testutils/DataSourceTestUtils.java |  33 +++--
 .../src/test/resources/exampleEvolvedSchema.txt    |   5 +
 .../src/test/resources/exampleSchema.txt           |   5 +
 .../org/apache/hudi/TestHoodieSparkSqlWriter.scala |   2 +-
 .../apache/hudi/functional/TestCOWDataSource.scala |  41 +------
 .../apache/hudi/functional/TestMORDataSource.scala |  46 -------
 .../hudi/functional/TestSparkDataSource.scala      | 135 ++++++++++++++++++++-
 10 files changed, 257 insertions(+), 100 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderSchemaHandler.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderSchemaHandler.java
index 714d43784be..f007f9ef400 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderSchemaHandler.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderSchemaHandler.java
@@ -150,7 +150,8 @@ public class HoodieFileGroupReaderSchemaHandler<T> {
     return 
AvroInternalSchemaConverter.pruneAvroSchemaToInternalSchema(requiredSchema, 
internalSchema);
   }
 
-  private Schema generateRequiredSchema() {
+  @VisibleForTesting
+  Schema generateRequiredSchema() {
     //might need to change this if other queries than mor have mandatory fields
     if (!needsMORMerge) {
       return requestedSchema;
@@ -204,6 +205,10 @@ public class HoodieFileGroupReaderSchemaHandler<T> {
         requiredFields.add(preCombine);
       }
     }
+
+    if (dataSchema.getField(HoodieRecord.HOODIE_IS_DELETED_FIELD) != null) {
+      requiredFields.add(HoodieRecord.HOODIE_IS_DELETED_FIELD);
+    }
     return requiredFields.toArray(new String[0]);
   }
 
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupRecordBuffer.java b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupRecordBuffer.java
index 65683bd889a..ca3a09d96df 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupRecordBuffer.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupRecordBuffer.java
@@ -19,10 +19,25 @@
 
 package org.apache.hudi.common.table.read;
 
+import org.apache.hudi.common.config.RecordMergeMode;
+import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.model.DeleteRecord;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 
 import static org.apache.hudi.common.model.HoodieRecord.DEFAULT_ORDERING_VALUE;
 import static 
org.apache.hudi.common.table.read.HoodieBaseFileGroupRecordBuffer.getOrderingValue;
@@ -49,6 +64,72 @@ public class TestHoodieFileGroupRecordBuffer {
     assertEquals(convertedValue, getOrderingValue(readerContext, 
deleteRecord));
   }
 
+  @ParameterizedTest
+  @CsvSource({
+      "true, true, EVENT_TIME_ORDERING",
+      "true, false, EVENT_TIME_ORDERING",
+      "false, true, EVENT_TIME_ORDERING",
+      "false, false, EVENT_TIME_ORDERING",
+      "true, true, COMMIT_TIME_ORDERING",
+      "true, false, COMMIT_TIME_ORDERING",
+      "false, true, COMMIT_TIME_ORDERING",
+      "false, false, COMMIT_TIME_ORDERING",
+      "true, true, CUSTOM",
+      "true, false, CUSTOM",
+      "false, true, CUSTOM",
+      "false, false, CUSTOM"
+  })
+  public void testSchemaForMandatoryFields(boolean setPrecombine, boolean 
addHoodieIsDeleted, RecordMergeMode mergeMode) {
+    HoodieReaderContext readerContext = mock(HoodieReaderContext.class);
+    when(readerContext.getHasBootstrapBaseFile()).thenReturn(false);
+    when(readerContext.getHasLogFiles()).thenReturn(true);
+    HoodieRecordMerger recordMerger = mock(HoodieRecordMerger.class);
+    when(readerContext.getRecordMerger()).thenReturn(Option.of(recordMerger));
+    when(recordMerger.isProjectionCompatible()).thenReturn(false);
+
+    String preCombineField = "ts";
+    List<String> dataSchemaFields = new ArrayList<>();
+    dataSchemaFields.addAll(Arrays.asList(
+        HoodieRecord.RECORD_KEY_METADATA_FIELD, 
HoodieRecord.PARTITION_PATH_METADATA_FIELD, preCombineField,
+        "colA", "colB", "colC", "colD"));
+    if (addHoodieIsDeleted) {
+      dataSchemaFields.add(HoodieRecord.HOODIE_IS_DELETED_FIELD);
+    }
+
+    Schema dataSchema = getSchema(dataSchemaFields);
+    Schema requestedSchema = 
getSchema(Arrays.asList(HoodieRecord.RECORD_KEY_METADATA_FIELD, 
HoodieRecord.PARTITION_PATH_METADATA_FIELD));
+
+    HoodieTableConfig tableConfig = mock(HoodieTableConfig.class);
+    when(tableConfig.getRecordMergeMode()).thenReturn(mergeMode);
+    when(tableConfig.populateMetaFields()).thenReturn(true);
+    when(tableConfig.getPreCombineField()).thenReturn(setPrecombine ? 
preCombineField : StringUtils.EMPTY_STRING);
+
+    TypedProperties props = new TypedProperties();
+    HoodieFileGroupReaderSchemaHandler fileGroupReaderSchemaHandler = new 
HoodieFileGroupReaderSchemaHandler(readerContext,
+        dataSchema, requestedSchema, Option.empty(), tableConfig, props);
+    List<String> expectedFields = new ArrayList();
+    expectedFields.add(HoodieRecord.RECORD_KEY_METADATA_FIELD);
+    expectedFields.add(HoodieRecord.PARTITION_PATH_METADATA_FIELD);
+    if (setPrecombine && mergeMode != RecordMergeMode.COMMIT_TIME_ORDERING) { 
// commit time ordering does not project ordering field.
+      expectedFields.add(preCombineField);
+    }
+    if (addHoodieIsDeleted) {
+      expectedFields.add(HoodieRecord.HOODIE_IS_DELETED_FIELD);
+    }
+    Schema expectedSchema = mergeMode == RecordMergeMode.CUSTOM ? dataSchema : 
getSchema(expectedFields);
+    Schema actualSchema = 
fileGroupReaderSchemaHandler.generateRequiredSchema();
+    assertEquals(expectedSchema, actualSchema);
+  }
+
+  private Schema getSchema(List<String> fields) {
+    SchemaBuilder.FieldAssembler<Schema> schemaFieldAssembler = 
SchemaBuilder.builder().record("test_schema")
+        .namespace("test_namespace").fields();
+    for (String field : fields) {
+      schemaFieldAssembler = 
schemaFieldAssembler.name(field).type().stringType().noDefault();
+    }
+    return schemaFieldAssembler.endRecord();
+  }
+
   private void mockDeleteRecord(DeleteRecord deleteRecord,
                                 Comparable orderingValue) {
     when(deleteRecord.getOrderingValue()).thenReturn(orderingValue);
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java
index bb24ee0e52a..21b1a9be870 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java
@@ -207,7 +207,7 @@ public class TestHoodieDatasetBulkInsertHelper extends HoodieSparkClientTestBase
             .withPreCombineField("ts").build();
     List<Row> inserts = DataSourceTestUtils.generateRandomRows(10);
     Dataset<Row> toUpdateDataset = 
sqlContext.createDataFrame(inserts.subList(0, 5), structType);
-    List<Row> updates = 
DataSourceTestUtils.updateRowsWithHigherTs(toUpdateDataset);
+    List<Row> updates = 
DataSourceTestUtils.updateRowsWithUpdatedTs(toUpdateDataset);
     List<Row> rows = new ArrayList<>();
     rows.addAll(inserts);
     rows.addAll(updates);
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java
index 08a82284898..4c371136dbc 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java
@@ -68,10 +68,11 @@ public class DataSourceTestUtils {
     List<Row> toReturn = new ArrayList<>();
     List<String> partitions = Arrays.asList(new String[] 
{DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH, 
DEFAULT_THIRD_PARTITION_PATH});
     for (int i = 0; i < count; i++) {
-      Object[] values = new Object[3];
+      Object[] values = new Object[4];
       values[0] = 
HoodieTestDataGenerator.genPseudoRandomUUID(RANDOM).toString();
       values[1] = partitions.get(RANDOM.nextInt(3));
       values[2] = new Date().getTime();
+      values[3] = false;
       toReturn.add(RowFactory.create(values));
     }
     return toReturn;
@@ -80,10 +81,11 @@ public class DataSourceTestUtils {
   public static List<Row> generateRandomRowsByPartition(int count, String 
partition) {
     List<Row> toReturn = new ArrayList<>();
     for (int i = 0; i < count; i++) {
-      Object[] values = new Object[3];
+      Object[] values = new Object[4];
       values[0] = 
HoodieTestDataGenerator.genPseudoRandomUUID(RANDOM).toString();
       values[1] = partition;
       values[2] = new Date().getTime();
+      values[3] = false;
       toReturn.add(RowFactory.create(values));
     }
     return toReturn;
@@ -92,10 +94,11 @@ public class DataSourceTestUtils {
   public static List<Row> generateUpdates(List<Row> records, int count) {
     List<Row> toReturn = new ArrayList<>();
     for (int i = 0; i < count; i++) {
-      Object[] values = new Object[3];
+      Object[] values = new Object[4];
       values[0] = records.get(i).getString(0);
       values[1] = records.get(i).getAs(1);
       values[2] = new Date().getTime();
+      values[3] = false;
       toReturn.add(RowFactory.create(values));
     }
     return toReturn;
@@ -119,24 +122,36 @@ public class DataSourceTestUtils {
     List<Row> toReturn = new ArrayList<>();
     List<String> partitions = Arrays.asList(new String[] 
{DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH, 
DEFAULT_THIRD_PARTITION_PATH});
     for (int i = 0; i < count; i++) {
-      Object[] values = new Object[4];
+      Object[] values = new Object[5];
       values[0] = UUID.randomUUID().toString();
       values[1] = partitions.get(RANDOM.nextInt(3));
       values[2] = new Date().getTime();
-      values[3] = UUID.randomUUID().toString();
+      values[3] = false;
+      values[4] = UUID.randomUUID().toString();
       toReturn.add(RowFactory.create(values));
     }
     return toReturn;
   }
 
-  public static List<Row> updateRowsWithHigherTs(Dataset<Row> inputDf) {
+  public static List<Row> updateRowsWithUpdatedTs(Dataset<Row> inputDf) {
+    return updateRowsWithUpdatedTs(inputDf, false, false);
+  }
+
+  public static List<Row> updateRowsWithUpdatedTs(Dataset<Row> inputDf, 
Boolean lowerTs, Boolean updatePartitionPath) {
     List<Row> input = inputDf.collectAsList();
     List<Row> rows = new ArrayList<>();
     for (Row row : input) {
-      Object[] values = new Object[3];
+      Object[] values = new Object[4];
       values[0] = row.getAs("_row_key");
-      values[1] = row.getAs("partition");
-      values[2] = ((Long) row.getAs("ts")) + RANDOM.nextInt(1000);
+      String partition = row.getAs("partition");
+      if (updatePartitionPath) {
+        values[1] = partition.equals(DEFAULT_FIRST_PARTITION_PATH) ? 
DEFAULT_SECOND_PARTITION_PATH :
+            (partition.equals(DEFAULT_SECOND_PARTITION_PATH) ? 
DEFAULT_THIRD_PARTITION_PATH : DEFAULT_FIRST_PARTITION_PATH);
+      } else {
+        values[1] = partition;
+      }
+      values[2] = ((Long) row.getAs("ts")) + (lowerTs ? (-1 - 
RANDOM.nextInt(1000)) : RANDOM.nextInt(1000));
+      values[3] = false;
       rows.add(RowFactory.create(values));
     }
     return rows;
diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/exampleEvolvedSchema.txt b/hudi-spark-datasource/hudi-spark/src/test/resources/exampleEvolvedSchema.txt
index 5fcddac6a84..2db0ff3428d 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/resources/exampleEvolvedSchema.txt
+++ b/hudi-spark-datasource/hudi-spark/src/test/resources/exampleEvolvedSchema.txt
@@ -32,6 +32,11 @@
              "name": "ts",
              "type": ["long", "null"]
          },
+         {
+             "name": "_hoodie_is_deleted",
+             "type": ["boolean", "null"],
+             "default" : false
+         },
          {
              "name": "new_field",
              "type": ["string","null"]
diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/exampleSchema.txt b/hudi-spark-datasource/hudi-spark/src/test/resources/exampleSchema.txt
index c7c0ff73792..a311dc68543 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/resources/exampleSchema.txt
+++ b/hudi-spark-datasource/hudi-spark/src/test/resources/exampleSchema.txt
@@ -31,6 +31,11 @@
         {
             "name": "ts",
             "type": ["long", "null"]
+        },
+        {
+            "name": "_hoodie_is_deleted",
+            "type": ["boolean", "null"],
+            "default" : false
         }
     ]
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
index ec9cf372912..4f4854d497c 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
@@ -89,7 +89,7 @@ class TestHoodieSparkSqlWriter extends HoodieSparkWriterTestBase {
 
     // add some updates so that preCombine kicks in
     val toUpdateDataset = 
sqlContext.createDataFrame(DataSourceTestUtils.getUniqueRows(inserts, 40), 
structType)
-    val updates = DataSourceTestUtils.updateRowsWithHigherTs(toUpdateDataset)
+    val updates = DataSourceTestUtils.updateRowsWithUpdatedTs(toUpdateDataset)
     val records = inserts.asScala.union(updates.asScala)
     val recordsSeq = convertRowListToSeq(records.asJava)
     val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index 687314940ea..164103e0e77 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -33,15 +33,13 @@ import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, 
TableSchemaResolver}
 import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline, 
TimelineUtils}
 import org.apache.hudi.common.testutils.{HoodieTestDataGenerator, 
HoodieTestUtils}
-import 
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_FILE_NAME_GENERATOR
-import org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR
+import 
org.apache.hudi.common.testutils.HoodieTestUtils.{INSTANT_FILE_NAME_GENERATOR, 
INSTANT_GENERATOR}
 import 
org.apache.hudi.common.testutils.RawTripTestPayload.{deleteRecordsToStrings, 
recordsToStrings}
 import org.apache.hudi.common.util.{ClusteringUtils, Option}
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.config.metrics.HoodieMetricsConfig
 import org.apache.hudi.exception.{HoodieException, 
SchemaBackwardsCompatibilityException}
 import org.apache.hudi.exception.ExceptionUtil.getRootCause
-import org.apache.hudi.functional.TestCOWDataSource.convertColumnsToNullable
 import org.apache.hudi.hive.HiveSyncConfigHolder
 import org.apache.hudi.keygen.{ComplexKeyGenerator, CustomKeyGenerator, 
GlobalDeleteKeyGenerator, NonpartitionedKeyGenerator, SimpleKeyGenerator, 
TimestampBasedKeyGenerator}
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions
@@ -55,7 +53,7 @@ import org.apache.hadoop.fs.FileSystem
 import org.apache.spark.sql.{DataFrame, DataFrameWriter, Dataset, Encoders, 
Row, SaveMode, SparkSession, SparkSessionExtensions}
 import org.apache.spark.sql.functions.{col, concat, lit, udf, when}
 import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
-import org.apache.spark.sql.types.{ArrayType, BooleanType, DataTypes, 
DateType, IntegerType, LongType, MapType, StringType, StructField, StructType, 
TimestampType}
+import org.apache.spark.sql.types.{ArrayType, DataTypes, DateType, 
IntegerType, LongType, MapType, StringType, StructField, StructType, 
TimestampType}
 import org.joda.time.DateTime
 import org.joda.time.format.DateTimeFormat
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test}
@@ -1415,41 +1413,6 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
     assertEquals(3, snapshotDF1.select("partition").distinct().count())
   }
 
-  @ParameterizedTest
-  @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", 
"SPARK"))
-  def testHoodieIsDeletedCOW(recordType: HoodieRecordType): Unit = {
-    val (writeOpts, readOpts) = getWriterReaderOpts(recordType)
-
-    val numRecords = 100
-    val numRecordsToDelete = 2
-    val records0 = recordsToStrings(dataGen.generateInserts("000", 
numRecords)).asScala.toList
-    val df0 = spark.read.json(spark.sparkContext.parallelize(records0, 2))
-    df0.write.format("org.apache.hudi")
-      .options(writeOpts)
-      .mode(SaveMode.Overwrite)
-      .save(basePath)
-
-    val snapshotDF0 = spark.read.format("org.apache.hudi")
-      .options(readOpts)
-      .load(basePath)
-    assertEquals(numRecords, snapshotDF0.count())
-
-    val df1 = snapshotDF0.limit(numRecordsToDelete)
-    val dropDf = df1.drop(df1.columns.filter(_.startsWith("_hoodie_")): _*)
-    val df2 = convertColumnsToNullable(
-      dropDf.withColumn("_hoodie_is_deleted", lit(true).cast(BooleanType)),
-      "_hoodie_is_deleted"
-    )
-    df2.write.format("org.apache.hudi")
-      .options(writeOpts)
-      .mode(SaveMode.Append)
-      .save(basePath)
-    val snapshotDF2 = spark.read.format("org.apache.hudi")
-      .options(readOpts)
-      .load(basePath)
-    assertEquals(numRecords - numRecordsToDelete, snapshotDF2.count())
-  }
-
   @ParameterizedTest
   @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", 
"SPARK"))
   def testWriteSmallPrecisionDecimalTable(recordType: HoodieRecordType): Unit 
= {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index a4e240c57fc..2bb27a1a7fe 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -31,7 +31,6 @@ import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
 import org.apache.hudi.common.util.Option
 import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
 import org.apache.hudi.config.{HoodieCompactionConfig, HoodieIndexConfig, 
HoodieWriteConfig}
-import org.apache.hudi.functional.TestCOWDataSource.convertColumnsToNullable
 import org.apache.hudi.index.HoodieIndex.IndexType
 import 
org.apache.hudi.metadata.HoodieTableMetadataUtil.{metadataPartitionExists, 
PARTITION_NAME_SECONDARY_INDEX_PREFIX}
 import org.apache.hudi.storage.{StoragePath, StoragePathInfo}
@@ -43,7 +42,6 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
-import org.apache.spark.sql.types.BooleanType
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
 import org.junit.jupiter.params.ParameterizedTest
@@ -1192,50 +1190,6 @@ class TestMORDataSource extends HoodieSparkClientTestBase with SparkDatasetMixin
       spark.read.format("hudi").load(basePath).count())
   }
 
-  @ParameterizedTest
-  @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", 
"SPARK"))
-  def testHoodieIsDeletedMOR(recordType: HoodieRecordType): Unit =  {
-    val (writeOpts, readOpts) = getWriterReaderOpts(recordType)
-
-    val numRecords = 100
-    val numRecordsToDelete = 2
-    val schema = HoodieTestDataGenerator.SHORT_TRIP_SCHEMA
-    val records0 = recordsToStrings(dataGen.generateInsertsAsPerSchema("000", 
numRecords, schema)).asScala.toSeq
-    val inputDF0 = spark.read.json(spark.sparkContext.parallelize(records0, 2))
-    inputDF0.write.format("org.apache.hudi")
-      .options(writeOpts)
-      .option("hoodie.compact.inline", "false")
-      .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
-      .option(DataSourceWriteOptions.TABLE_TYPE.key, 
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
-      .mode(SaveMode.Overwrite)
-      .save(basePath)
-
-    val snapshotDF0 = spark.read.format("org.apache.hudi")
-      .options(readOpts)
-      .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
-      .load(basePath)
-    assertEquals(numRecords, snapshotDF0.count())
-
-    val df1 = snapshotDF0.limit(numRecordsToDelete)
-    val dropDf = df1.drop(df1.columns.filter(_.startsWith("_hoodie_")): _*)
-
-    val df2 = convertColumnsToNullable(
-      dropDf.withColumn("_hoodie_is_deleted", lit(true).cast(BooleanType)),
-      "_hoodie_is_deleted"
-    )
-    df2.write.format("org.apache.hudi")
-      .options(writeOpts)
-      .option(DataSourceWriteOptions.TABLE_TYPE.key, 
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
-      .mode(SaveMode.Append)
-      .save(basePath)
-
-    val snapshotDF2 = spark.read.format("org.apache.hudi")
-      .options(readOpts)
-      .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
-      .load(basePath)
-    assertEquals(numRecords - numRecordsToDelete, snapshotDF2.count())
-  }
-
   /**
    * This tests the case that query by with a specified partition condition on 
hudi table which is
    * different between the value of the partition field and the actual 
partition path,
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala
index 8873ada92d3..d891ed7c302 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala
@@ -19,19 +19,24 @@
 
 package org.apache.hudi.functional
 
-import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, 
HoodieDataSourceHelpers}
-import org.apache.hudi.common.config.HoodieMetadataConfig
-import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, 
DataSourceWriteOptions, HoodieDataSourceHelpers}
+import org.apache.hudi.common.config.{HoodieMetadataConfig, RecordMergeMode}
+import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType}
+import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
 import org.apache.hudi.config.{HoodieCompactionConfig, HoodieIndexConfig, 
HoodieWriteConfig}
+import org.apache.hudi.functional.CommonOptionUtils.getWriterReaderOpts
 import org.apache.hudi.hadoop.fs.HadoopFSUtils
+import org.apache.hudi.index.HoodieIndex.IndexType
 import org.apache.hudi.keygen.NonpartitionedKeyGenerator
 import org.apache.hudi.testutils.{DataSourceTestUtils, 
SparkClientFunctionalTestHarness}
 import 
org.apache.hudi.testutils.SparkClientFunctionalTestHarness.getSparkSqlConf
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql._
+import org.apache.spark.sql.functions.lit
+import org.apache.spark.sql.types.StructType
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.CsvSource
@@ -294,6 +299,127 @@ class TestSparkDataSource extends SparkClientFunctionalTestHarness {
     inputDf2.unpersist(true)
   }
 
+  @ParameterizedTest
+  @CsvSource(value = Array("COPY_ON_WRITE,8,EVENT_TIME_ORDERING,RECORD_INDEX",
+    "COPY_ON_WRITE,8,COMMIT_TIME_ORDERING,RECORD_INDEX",
+    "COPY_ON_WRITE,8,EVENT_TIME_ORDERING,GLOBAL_SIMPLE",
+    "COPY_ON_WRITE,8,COMMIT_TIME_ORDERING,GLOBAL_SIMPLE",
+    "MERGE_ON_READ,8,EVENT_TIME_ORDERING,RECORD_INDEX",
+    "MERGE_ON_READ,8,COMMIT_TIME_ORDERING,RECORD_INDEX",
+    "MERGE_ON_READ,8,EVENT_TIME_ORDERING,GLOBAL_SIMPLE",
+    "MERGE_ON_READ,8,COMMIT_TIME_ORDERING,GLOBAL_SIMPLE"))
+  def testDeletesWithHoodieIsDeleted(tableType: HoodieTableType, tableVersion: 
Int, mergeMode: RecordMergeMode, indexType: IndexType): Unit = {
+    var (writeOpts, readOpts) = getWriterReaderOpts(HoodieRecordType.AVRO)
+    writeOpts = writeOpts ++ Map("hoodie.write.table.version" -> 
tableVersion.toString,
+      "hoodie.datasource.write.table.type" -> tableType.name(),
+      "hoodie.datasource.write.precombine.field" -> "ts",
+      "hoodie.write.record.merge.mode" -> mergeMode.name(),
+      "hoodie.index.type" -> indexType.name(),
+      "hoodie.metadata.record.index.enable" -> "true",
+      "hoodie.record.index.update.partition.path" -> "true",
+      "hoodie.parquet.small.file.limit" -> "0")
+
+    writeOpts = writeOpts + (if (indexType == IndexType.RECORD_INDEX) {
+      "hoodie.record.index.update.partition.path" -> "true"
+    } else {
+      "hoodie.simple.index.update.partition.path" -> "true"
+    })
+
+    // generate the inserts
+    val schema = DataSourceTestUtils.getStructTypeExampleSchema
+    val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
+    val inserts = DataSourceTestUtils.generateRandomRows(400)
+    val df = 
spark.createDataFrame(spark.sparkContext.parallelize(convertRowListToSeq(inserts)),
 structType)
+
+    df.write.format("hudi")
+      .options(writeOpts)
+      .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    val hudiSnapshotDF1 = spark.read.format("hudi")
+      .options(readOpts)
+      .load(basePath)
+    assertEquals(400, hudiSnapshotDF1.count())
+
+    // ingest batch2 with mix of updates and deletes. some of them are 
updating same partition, some of them are moving to new partition.
+    // some are having higher ts and some are having lower ts.
+    ingestNewBatch(tableType, 200, structType, inserts.subList(0, 200), 
writeOpts)
+
+    val expectedRecordCount2 = if (mergeMode == 
RecordMergeMode.EVENT_TIME_ORDERING) 350 else 300;
+    val hudiSnapshotDF2 = spark.read.format("hudi")
+      .options(readOpts)
+      .load(basePath)
+    assertEquals(expectedRecordCount2, hudiSnapshotDF2.count())
+
+    // querying subset of column. even if not including _hoodie_is_deleted, 
snapshot read should return right data.
+    assertEquals(expectedRecordCount2, spark.read.format("hudi")
+      .options(readOpts).load(basePath).select("_hoodie_record_key", 
"_hoodie_partition_path").count())
+
+    // ingest batch3 with mix of updates and deletes. some of them are 
updating same partition, some of them are moving to new partition.
+    // some are having higher ts and some are having lower ts.
+    ingestNewBatch(tableType, 200, structType, inserts.subList(200, 400), 
writeOpts)
+
+    val expectedRecordCount3 = if (mergeMode == 
RecordMergeMode.EVENT_TIME_ORDERING) 300 else 200;
+    val hudiSnapshotDF3 = spark.read.format("hudi")
+      .options(readOpts)
+      .load(basePath)
+    assertEquals(expectedRecordCount3, hudiSnapshotDF3.count())
+
+    // querying subset of column. even if not including _hoodie_is_deleted, 
snapshot read should return right data.
+    assertEquals(expectedRecordCount3, spark.read.format("hudi")
+      .options(readOpts).load(basePath).select("_hoodie_record_key", 
"_hoodie_partition_path").count())
+  }
+
+  def ingestNewBatch(tableType: HoodieTableType, recordsToUpdate: Integer, 
structType: StructType, inserts: java.util.List[Row],
+                     writeOpts: Map[String, String]): Unit = {
+    val toUpdate = 
sqlContext.createDataFrame(DataSourceTestUtils.getUniqueRows(inserts, 
recordsToUpdate), structType).collectAsList()
+
+    val updateToSamePartitionHigherTs = 
sqlContext.createDataFrame(toUpdate.subList(0, recordsToUpdate / 4), structType)
+    val rowsToUpdate1 = 
DataSourceTestUtils.updateRowsWithUpdatedTs(updateToSamePartitionHigherTs)
+    val updates1 = rowsToUpdate1.subList(0, recordsToUpdate / 8)
+    val updateDf1 = 
spark.createDataFrame(spark.sparkContext.parallelize(convertRowListToSeq(updates1)),
 structType)
+    val deletes1 = rowsToUpdate1.subList(recordsToUpdate / 8, recordsToUpdate 
/ 4)
+    val deleteDf1 = 
spark.createDataFrame(spark.sparkContext.parallelize(convertRowListToSeq(deletes1)),
 structType)
+    val batch1 = deleteDf1.withColumn("_hoodie_is_deleted", 
lit(true)).union(updateDf1)
+    batch1.cache()
+
+    val updateToDiffPartitionHigherTs = 
sqlContext.createDataFrame(toUpdate.subList(recordsToUpdate / 4, 
recordsToUpdate / 2), structType)
+    val rowsToUpdate2 = 
DataSourceTestUtils.updateRowsWithUpdatedTs(updateToDiffPartitionHigherTs, 
false, true)
+    val updates2 = rowsToUpdate2.subList(0, recordsToUpdate / 8)
+    val updateDf2 = 
spark.createDataFrame(spark.sparkContext.parallelize(convertRowListToSeq(updates2)),
 structType)
+    val deletes2 = rowsToUpdate2.subList(recordsToUpdate / 8, recordsToUpdate 
/ 4)
+    val deleteDf2 = 
spark.createDataFrame(spark.sparkContext.parallelize(convertRowListToSeq(deletes2)),
 structType)
+    val batch2 = deleteDf2.withColumn("_hoodie_is_deleted", 
lit(true)).union(updateDf2)
+    batch2.cache()
+
+    val updateToSamePartitionLowerTs = 
sqlContext.createDataFrame(toUpdate.subList(recordsToUpdate / 2, 
recordsToUpdate * 3 / 4), structType)
+    val rowsToUpdate3 = 
DataSourceTestUtils.updateRowsWithUpdatedTs(updateToSamePartitionLowerTs, true, 
false)
+    val updates3 = rowsToUpdate3.subList(0, recordsToUpdate / 8)
+    val updateDf3 = 
spark.createDataFrame(spark.sparkContext.parallelize(convertRowListToSeq(updates3)),
 structType)
+    val deletes3 = rowsToUpdate3.subList(recordsToUpdate / 8, recordsToUpdate 
/ 4)
+    val deleteDf3 = 
spark.createDataFrame(spark.sparkContext.parallelize(convertRowListToSeq(deletes3)),
 structType)
+    val batch3 = deleteDf3.withColumn("_hoodie_is_deleted", 
lit(true)).union(updateDf3)
+    batch3.cache()
+
+    val updateToDiffPartitionLowerTs = 
sqlContext.createDataFrame(toUpdate.subList(recordsToUpdate * 3 / 4, 
recordsToUpdate), structType)
+    val rowsToUpdate4 = 
DataSourceTestUtils.updateRowsWithUpdatedTs(updateToDiffPartitionLowerTs, true, 
true)
+    val updates4 = rowsToUpdate4.subList(0, recordsToUpdate / 8)
+    val updateDf4 = 
spark.createDataFrame(spark.sparkContext.parallelize(convertRowListToSeq(updates4)),
 structType)
+    val deletes4 = rowsToUpdate4.subList(recordsToUpdate / 8, recordsToUpdate 
/ 4)
+    val deleteDf4 = 
spark.createDataFrame(spark.sparkContext.parallelize(convertRowListToSeq(deletes4)),
 structType)
+    val batch4 = deleteDf4.withColumn("_hoodie_is_deleted", 
lit(true)).union(updateDf4)
+    batch4.cache()
+
+    batch1.union(batch2).union(batch3).union(batch4).write.format("hudi")
+      .options(writeOpts)
+      .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
+      .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
+      .mode(SaveMode.Append)
+      .save(basePath)
+  }
+
   def compareUpdateDfWithHudiDf(inputDf: Dataset[Row], hudiDf: Dataset[Row], 
beforeRows: List[Row], colsToCompare: String): Unit = {
     val hudiWithoutMetaDf = 
hudiDf.drop(HoodieRecord.RECORD_KEY_METADATA_FIELD, 
HoodieRecord.PARTITION_PATH_METADATA_FIELD, 
HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, 
HoodieRecord.COMMIT_TIME_METADATA_FIELD, HoodieRecord.FILENAME_METADATA_FIELD)
     hudiWithoutMetaDf.registerTempTable("hudiTbl")
@@ -354,4 +480,7 @@ class TestSparkDataSource extends SparkClientFunctionalTestHarness {
     assertEquals(hudiDf1ToCompare.count, 
hudiDf1ToCompare.intersect(hudiDf2ToCompare).count)
     assertEquals(0, hudiDf1ToCompare.except(hudiDf2ToCompare).count)
   }
+
+  def convertRowListToSeq(inputList: java.util.List[Row]): Seq[Row] =
+    asScalaIteratorConverter(inputList.iterator).asScala.toSeq
 }

Reply via email to