yihua commented on code in PR #12970:
URL: https://github.com/apache/hudi/pull/12970#discussion_r1999641315
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala:
##########
@@ -294,6 +299,127 @@ class TestSparkDataSource extends
SparkClientFunctionalTestHarness {
inputDf2.unpersist(true)
}
+ @ParameterizedTest
+ @CsvSource(value = Array("COPY_ON_WRITE,8,EVENT_TIME_ORDERING,RECORD_INDEX",
+ "COPY_ON_WRITE,8,COMMIT_TIME_ORDERING,RECORD_INDEX",
+ "COPY_ON_WRITE,8,EVENT_TIME_ORDERING,GLOBAL_SIMPLE",
+ "COPY_ON_WRITE,8,COMMIT_TIME_ORDERING,GLOBAL_SIMPLE",
+ "MERGE_ON_READ,8,EVENT_TIME_ORDERING,RECORD_INDEX",
+ "MERGE_ON_READ,8,COMMIT_TIME_ORDERING,RECORD_INDEX",
+ "MERGE_ON_READ,8,EVENT_TIME_ORDERING,GLOBAL_SIMPLE",
+ "MERGE_ON_READ,8,COMMIT_TIME_ORDERING,GLOBAL_SIMPLE"))
+ def testDeletesWithHoodieIsDeleted(tableType: HoodieTableType, tableVersion:
Int, mergeMode: RecordMergeMode, indexType: IndexType): Unit = {
+ var (writeOpts, readOpts) = getWriterReaderOpts(HoodieRecordType.AVRO)
+ writeOpts = writeOpts + ("hoodie.write.table.version" ->
tableVersion.toString,
+ "hoodie.datasource.write.table.type" -> tableType.name(),
+ "hoodie.datasource.write.precombine.field" -> "ts",
+ "hoodie.write.record.merge.mode" -> mergeMode.name(),
+ "hoodie.index.type" -> indexType.name(),
+ "hoodie.metadata.record.index.enable" -> "true",
+ "hoodie.record.index.update.partition.path" -> "true",
+ "hoodie.parquet.small.file.limit" -> "0")
+
+ writeOpts = writeOpts + (if (indexType == IndexType.RECORD_INDEX) {
+ "hoodie.record.index.update.partition.path" -> "true"
+ } else {
+ "hoodie.simple.index.update.partition.path" -> "true"
+ })
+
+ // generate the inserts
+ val schema = DataSourceTestUtils.getStructTypeExampleSchema
+ val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
+ val inserts = DataSourceTestUtils.generateRandomRows(400)
+ val df =
spark.createDataFrame(spark.sparkContext.parallelize(convertRowListToSeq(inserts)),
structType)
+
+ df.write.format("hudi")
+ .options(writeOpts)
+ .option(DataSourceWriteOptions.OPERATION.key,
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+ .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+
+ val hudiSnapshotDF1 = spark.read.format("hudi")
+ .options(readOpts)
+ .load(basePath)
+ assertEquals(400, hudiSnapshotDF1.count())
+
+ // ingest batch2 with mix of updates and deletes. some of them are
updating same partition, some of them are moving to new partition.
+ // some are having higher ts and some are having lower ts.
+ ingestNewBatch(tableType, 200, structType, inserts.subList(0, 200),
writeOpts)
+
+ val expectedRecordCount2 = if (mergeMode ==
RecordMergeMode.EVENT_TIME_ORDERING) 350 else 300;
+ val hudiSnapshotDF2 = spark.read.format("hudi")
+ .options(readOpts)
+ .load(basePath)
+ assertEquals(expectedRecordCount2, hudiSnapshotDF2.count())
+
+ // querying subset of column. even if not including _hoodie_is_deleted,
snapshot read should return right data.
+ assertEquals(expectedRecordCount2, spark.read.format("hudi")
+ .options(readOpts).load(basePath).select("_hoodie_record_key",
"_hoodie_partition_path").count())
+
+ // ingest batch3 with mix of updates and deletes. some of them are
updating same partition, some of them are moving to new partition.
+ // some are having higher ts and some are having lower ts.
+ ingestNewBatch(tableType, 200, structType, inserts.subList(200, 400),
writeOpts)
+
+ val expectedRecordCount3 = if (mergeMode ==
RecordMergeMode.EVENT_TIME_ORDERING) 300 else 200;
+ val hudiSnapshotDF3 = spark.read.format("hudi")
+ .options(readOpts)
+ .load(basePath)
+ assertEquals(expectedRecordCount3, hudiSnapshotDF3.count())
+
+ // querying subset of column. even if not including _hoodie_is_deleted,
snapshot read should return right data.
+ assertEquals(expectedRecordCount3, spark.read.format("hudi")
+ .options(readOpts).load(basePath).select("_hoodie_record_key",
"_hoodie_partition_path").count())
+ }
+
+ def ingestNewBatch(tableType: HoodieTableType, recordsToUpdate: Integer,
structType: StructType, inserts: java.util.List[Row],
Review Comment:
nit: there is a lot of similar code in this method. Filed HUDI-9193 as a
follow-up to abstract it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]