danny0405 commented on code in PR #7998:
URL: https://github.com/apache/hudi/pull/7998#discussion_r1118429803
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala:
##########
@@ -960,6 +960,87 @@ class TestHoodieSparkSqlWriter {
assert(spark.read.format("hudi").load(tempBasePath).where("age >=
2000").count() == 10)
}
+ /**
+ * Test upsert for CoW table without precombine field and combine before
upsert disabled.
+ */
+ @Test
+ def testUpsertWithoutPrecombineFieldAndCombineBeforeUpsertDisabled(): Unit =
{
+ val options = Map(DataSourceWriteOptions.TABLE_TYPE.key ->
HoodieTableType.COPY_ON_WRITE.name(),
+ DataSourceWriteOptions.RECORDKEY_FIELD.key -> "keyid",
+ DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "",
+ DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key ->
"org.apache.hudi.keygen.NonpartitionedKeyGenerator",
+ HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+ HoodieWriteConfig.COMBINE_BEFORE_UPSERT.key -> "false",
+ "hoodie.insert.shuffle.parallelism" -> "1",
+ "hoodie.upsert.shuffle.parallelism" -> "1"
+ )
+
+ val df = spark.range(0, 10).toDF("keyid")
+ .withColumn("age", expr("keyid + 1000"))
+ df.write.format("hudi")
+ .options(options.updated(DataSourceWriteOptions.OPERATION.key, "insert"))
+ .mode(SaveMode.Overwrite).save(tempBasePath)
+
+ // upsert same records again, should work
+ val df_update = spark.range(0, 10).toDF("keyid")
+ .withColumn("age", expr("keyid + 2000"))
+ df_update.write.format("hudi")
+ .options(options.updated(DataSourceWriteOptions.OPERATION.key, "upsert"))
+ .mode(SaveMode.Append).save(tempBasePath)
+ val df_result_1 =
spark.read.format("hudi").load(tempBasePath).selectExpr("keyid", "age")
+ assert(df_result_1.count() == 10)
+ assert(df_result_1.where("age >= 2000").count() == 10)
+
+ // insert duplicated rows (overwrite because of bug, non-strict mode does
not work with append)
+ val df_with_duplicates = df.union(df)
+ df_with_duplicates.write.format("hudi")
+ .options(options.updated(DataSourceWriteOptions.OPERATION.key, "insert"))
+ .mode(SaveMode.Overwrite).save(tempBasePath)
+ val df_result_2 =
spark.read.format("hudi").load(tempBasePath).selectExpr("keyid", "age")
+ assert(df_result_2.count() == 20)
+ assert(df_result_2.distinct().count() == 10)
+ assert(df_result_2.where("age >= 1000 and age < 2000").count() == 20)
+
+ // upsert with duplicates, should update but not deduplicate
+ val df_with_duplicates_update = df_with_duplicates.withColumn("age",
expr("keyid + 3000"))
+ df_with_duplicates_update.write.format("hudi")
+ .options(options.updated(DataSourceWriteOptions.OPERATION.key, "upsert"))
+ .mode(SaveMode.Append).save(tempBasePath)
+ val df_result_3 =
spark.read.format("hudi").load(tempBasePath).selectExpr("keyid", "age")
+ assert(df_result_3.distinct().count() == 10)
+ assert(df_result_3.count() == 20)
+ assert(df_result_3.where("age >= 3000").count() == 20)
+ }
+
+ /**
+ * Test upsert for CoW table with combine before upsert disabled.
+ */
Review Comment:
The comment says `CoW`, which is inconsistent with the actual tests.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]