This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 752f956f03 [HUDI-3933] Add UT cases to cover different key gen (#5638)
752f956f03 is described below

commit 752f956f036e5aaf8dcc480ab0baf0c1e3d7cec9
Author: Y Ethan Guo <[email protected]>
AuthorDate: Mon May 23 06:48:09 2022 -0700

    [HUDI-3933] Add UT cases to cover different key gen (#5638)
---
 .../hudi/functional/TestCOWDataSourceStorage.scala | 66 +++++++++++-----------
 1 file changed, 32 insertions(+), 34 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
index 48bb46f81b..6f13dbc82f 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
@@ -26,14 +26,14 @@ import 
org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
 import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.keygen.TimestampBasedKeyGenerator
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config
-import org.apache.hudi.keygen.{ComplexKeyGenerator, TimestampBasedKeyGenerator}
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
 import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, 
HoodieDataSourceHelpers}
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions.{col, lit}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
-import org.junit.jupiter.api.{Disabled, Tag}
+import org.junit.jupiter.api.Tag
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.{CsvSource, ValueSource}
 
@@ -51,31 +51,33 @@ class TestCOWDataSourceStorage extends 
SparkClientFunctionalTestHarness {
     DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
     DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
     DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
-    HoodieWriteConfig.TBL_NAME.key -> "hoodie_test"
+    HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+    DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "false"
   )
 
   val verificationCol: String = "driver"
   val updatedVerificationVal: String = "driver_update"
 
   @ParameterizedTest
-  @CsvSource(Array(
-    "true,org.apache.hudi.keygen.SimpleKeyGenerator",
-    "true,org.apache.hudi.keygen.ComplexKeyGenerator",
-    "true,org.apache.hudi.keygen.TimestampBasedKeyGenerator",
-    "false,org.apache.hudi.keygen.SimpleKeyGenerator",
-    "false,org.apache.hudi.keygen.ComplexKeyGenerator",
-    "false,org.apache.hudi.keygen.TimestampBasedKeyGenerator"
-  ))
-  def testCopyOnWriteStorage(isMetadataEnabled: Boolean, keyGenClass: String): 
Unit = {
-    commonOpts += DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key() -> 
keyGenClass
-    if (classOf[ComplexKeyGenerator].getName.equals(keyGenClass)) {
-      commonOpts += DataSourceWriteOptions.RECORDKEY_FIELD.key() -> "_row_key, 
pii_col"
-    }
-    if (classOf[TimestampBasedKeyGenerator].getName.equals(keyGenClass)) {
-      commonOpts += DataSourceWriteOptions.RECORDKEY_FIELD.key() -> "_row_key"
-      commonOpts += DataSourceWriteOptions.PARTITIONPATH_FIELD.key() -> 
"current_ts"
-      commonOpts += Config.TIMESTAMP_TYPE_FIELD_PROP -> "EPOCHMILLISECONDS"
-      commonOpts += Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP -> "yyyyMMdd"
+  @CsvSource(value = Array(
+    "true|org.apache.hudi.keygen.SimpleKeyGenerator|_row_key",
+    "true|org.apache.hudi.keygen.ComplexKeyGenerator|_row_key,nation.bytes",
+    "true|org.apache.hudi.keygen.TimestampBasedKeyGenerator|_row_key",
+    "false|org.apache.hudi.keygen.SimpleKeyGenerator|_row_key",
+    "false|org.apache.hudi.keygen.ComplexKeyGenerator|_row_key,nation.bytes",
+    "false|org.apache.hudi.keygen.TimestampBasedKeyGenerator|_row_key"
+  ), delimiter = '|')
+  def testCopyOnWriteStorage(isMetadataEnabled: Boolean, keyGenClass: String, 
recordKeys: String): Unit = {
+    var options: Map[String, String] = commonOpts +
+      (HoodieMetadataConfig.ENABLE.key -> String.valueOf(isMetadataEnabled)) +
+      (DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key() -> keyGenClass) +
+      (DataSourceWriteOptions.RECORDKEY_FIELD.key() -> recordKeys)
+    val isTimestampBasedKeyGen: Boolean = 
classOf[TimestampBasedKeyGenerator].getName.equals(keyGenClass)
+    if (isTimestampBasedKeyGen) {
+      options += DataSourceWriteOptions.RECORDKEY_FIELD.key() -> "_row_key"
+      options += Config.TIMESTAMP_TYPE_FIELD_PROP -> "DATE_STRING"
+      options += Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP -> "yyyy/MM/dd"
+      options += Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP -> "yyyyMMdd"
     }
     val dataGen = new HoodieTestDataGenerator(0xDEED)
     val fs = FSUtils.getFs(basePath, spark.sparkContext.hadoopConfiguration)
@@ -83,14 +85,12 @@ class TestCOWDataSourceStorage extends 
SparkClientFunctionalTestHarness {
     val records0 = recordsToStrings(dataGen.generateInserts("000", 100)).toList
     val inputDF0 = spark.read.json(spark.sparkContext.parallelize(records0, 2))
     inputDF0.write.format("org.apache.hudi")
-      .options(commonOpts)
+      .options(options)
       .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
-      .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled)
       .mode(SaveMode.Overwrite)
       .save(basePath)
 
     assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
-    val commitInstantTime1 = HoodieDataSourceHelpers.latestCommit(fs, basePath)
 
     // Snapshot query
     val snapshotDF1 = spark.read.format("org.apache.hudi")
@@ -102,7 +102,7 @@ class TestCOWDataSourceStorage extends 
SparkClientFunctionalTestHarness {
     val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
     val verificationRowKey = 
inputDF1.limit(1).select("_row_key").first.getString(0)
     var updateDf: DataFrame = null
-    if (classOf[TimestampBasedKeyGenerator].getName.equals(keyGenClass)) {
+    if (isTimestampBasedKeyGen) {
       // update current_ts to be same as original record so that partition 
path does not change with timestamp based key gen
       val originalRow = snapshotDF1.filter(col("_row_key") === 
verificationRowKey).collectAsList().get(0)
       updateDf = inputDF1.filter(col("_row_key") === verificationRowKey)
@@ -116,8 +116,7 @@ class TestCOWDataSourceStorage extends 
SparkClientFunctionalTestHarness {
     }
 
     updateDf.write.format("org.apache.hudi")
-      .options(commonOpts)
-      .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled)
+      .options(options)
       .mode(SaveMode.Append)
       .save(basePath)
     val commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, basePath)
@@ -132,7 +131,7 @@ class TestCOWDataSourceStorage extends 
SparkClientFunctionalTestHarness {
     val records2 = recordsToStrings(dataGen.generateUpdates("002", 100)).toList
     var inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
 
-    if (classOf[TimestampBasedKeyGenerator].getName.equals(keyGenClass)) {
+    if (isTimestampBasedKeyGen) {
       // In case of Timestamp based key gen, current_ts should not be updated. 
but dataGen.generateUpdates() would have updated
       // the value of current_ts. So, we need to revert it back to original 
value.
       // here is what we are going to do. Copy values to temp columns, join 
with original df and update the current_ts
@@ -152,8 +151,7 @@ class TestCOWDataSourceStorage extends 
SparkClientFunctionalTestHarness {
     val uniqueKeyCnt = inputDF2.select("_row_key").distinct().count()
 
     inputDF2.write.format("org.apache.hudi")
-      .options(commonOpts)
-      .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled)
+      .options(options)
       .mode(SaveMode.Append)
       .save(basePath)
 
@@ -191,8 +189,7 @@ class TestCOWDataSourceStorage extends 
SparkClientFunctionalTestHarness {
     val emptyRecords = recordsToStrings(dataGen.generateUpdates("003", 
0)).toList
     val emptyDF = spark.read.json(spark.sparkContext.parallelize(emptyRecords, 
1))
     emptyDF.write.format("org.apache.hudi")
-      .options(commonOpts)
-      .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled)
+      .options(options)
       .mode(SaveMode.Append)
       .save(basePath)
 
@@ -211,9 +208,10 @@ class TestCOWDataSourceStorage extends 
SparkClientFunctionalTestHarness {
     val hoodieIncViewDF3 = spark.read.format("org.apache.hudi")
       .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
       .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, commitInstantTime2)
-      .option(DataSourceReadOptions.INCR_PATH_GLOB.key, "/2016/*/*/*")
+      .option(DataSourceReadOptions.INCR_PATH_GLOB.key, if 
(isTimestampBasedKeyGen) "/2016*/*" else "/2016/*/*/*")
       .load(basePath)
-    
assertEquals(hoodieIncViewDF2.filter(col("_hoodie_partition_path").contains("2016")).count(),
 hoodieIncViewDF3.count())
+    assertEquals(hoodieIncViewDF2
+      .filter(col("_hoodie_partition_path").startsWith("2016")).count(), 
hoodieIncViewDF3.count())
 
     val timeTravelDF = spark.read.format("org.apache.hudi")
       .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)

Reply via email to