This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 59f1c66848c [HUDI-6953] Adding test for composite keys with bulk insert row writer (#10214)
59f1c66848c is described below
commit 59f1c66848c3ddbfff1ea5fe3eacd39f1adf9a3a
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Sat Mar 2 21:57:23 2024 -0800
[HUDI-6953] Adding test for composite keys with bulk insert row writer (#10214)
---
.../apache/hudi/functional/TestCOWDataSource.scala | 21 +++++++++++++++++++++
1 file changed, 21 insertions(+)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index 5614b414927..ff87a90cef8 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -487,6 +487,27 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
assertEquals(snapshotDF2.count(), (validRecordsFromBatch1 + validRecordsFromBatch2))
}
+ @Test
+ def bulkInsertCompositeKeys(): Unit = {
+ val (writeOpts, readOpts) = getWriterReaderOpts(HoodieRecordType.AVRO)
+
+ // Insert Operation
+ val records = recordsToStrings(dataGen.generateInserts("000", 100)).toList
+ val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+
+ val inputDf1 = inputDF.withColumn("new_col",lit("value1"))
+ val inputDf2 = inputDF.withColumn("new_col", lit(null).cast("String") )
+
+ inputDf1.union(inputDf2).write.format("hudi")
+ .options(writeOpts)
+ .option(DataSourceWriteOptions.RECORDKEY_FIELD.key, "_row_key,new_col")
+ .option(DataSourceWriteOptions.OPERATION.key(),"bulk_insert")
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+
+ assertEquals(200, spark.read.format("org.apache.hudi").options(readOpts).load(basePath).count())
+ }
+
/**
* This tests the case that query by with a specified partition condition on hudi table which is
* different between the value of the partition field and the actual partition path,