This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 752f956f03 [HUDI-3933] Add UT cases to cover different key gen (#5638)
752f956f03 is described below
commit 752f956f036e5aaf8dcc480ab0baf0c1e3d7cec9
Author: Y Ethan Guo <[email protected]>
AuthorDate: Mon May 23 06:48:09 2022 -0700
[HUDI-3933] Add UT cases to cover different key gen (#5638)
---
.../hudi/functional/TestCOWDataSourceStorage.scala | 66 +++++++++++-----------
1 file changed, 32 insertions(+), 34 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
index 48bb46f81b..6f13dbc82f 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
@@ -26,14 +26,14 @@ import
org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.keygen.TimestampBasedKeyGenerator
import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config
-import org.apache.hudi.keygen.{ComplexKeyGenerator, TimestampBasedKeyGenerator}
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions,
HoodieDataSourceHelpers}
import org.apache.spark.sql._
import org.apache.spark.sql.functions.{col, lit}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
-import org.junit.jupiter.api.{Disabled, Tag}
+import org.junit.jupiter.api.Tag
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{CsvSource, ValueSource}
@@ -51,31 +51,33 @@ class TestCOWDataSourceStorage extends
SparkClientFunctionalTestHarness {
DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
- HoodieWriteConfig.TBL_NAME.key -> "hoodie_test"
+ HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+ DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "false"
)
val verificationCol: String = "driver"
val updatedVerificationVal: String = "driver_update"
@ParameterizedTest
- @CsvSource(Array(
- "true,org.apache.hudi.keygen.SimpleKeyGenerator",
- "true,org.apache.hudi.keygen.ComplexKeyGenerator",
- "true,org.apache.hudi.keygen.TimestampBasedKeyGenerator",
- "false,org.apache.hudi.keygen.SimpleKeyGenerator",
- "false,org.apache.hudi.keygen.ComplexKeyGenerator",
- "false,org.apache.hudi.keygen.TimestampBasedKeyGenerator"
- ))
- def testCopyOnWriteStorage(isMetadataEnabled: Boolean, keyGenClass: String):
Unit = {
- commonOpts += DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key() ->
keyGenClass
- if (classOf[ComplexKeyGenerator].getName.equals(keyGenClass)) {
- commonOpts += DataSourceWriteOptions.RECORDKEY_FIELD.key() -> "_row_key,
pii_col"
- }
- if (classOf[TimestampBasedKeyGenerator].getName.equals(keyGenClass)) {
- commonOpts += DataSourceWriteOptions.RECORDKEY_FIELD.key() -> "_row_key"
- commonOpts += DataSourceWriteOptions.PARTITIONPATH_FIELD.key() ->
"current_ts"
- commonOpts += Config.TIMESTAMP_TYPE_FIELD_PROP -> "EPOCHMILLISECONDS"
- commonOpts += Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP -> "yyyyMMdd"
+ @CsvSource(value = Array(
+ "true|org.apache.hudi.keygen.SimpleKeyGenerator|_row_key",
+ "true|org.apache.hudi.keygen.ComplexKeyGenerator|_row_key,nation.bytes",
+ "true|org.apache.hudi.keygen.TimestampBasedKeyGenerator|_row_key",
+ "false|org.apache.hudi.keygen.SimpleKeyGenerator|_row_key",
+ "false|org.apache.hudi.keygen.ComplexKeyGenerator|_row_key,nation.bytes",
+ "false|org.apache.hudi.keygen.TimestampBasedKeyGenerator|_row_key"
+ ), delimiter = '|')
+ def testCopyOnWriteStorage(isMetadataEnabled: Boolean, keyGenClass: String,
recordKeys: String): Unit = {
+ var options: Map[String, String] = commonOpts +
+ (HoodieMetadataConfig.ENABLE.key -> String.valueOf(isMetadataEnabled)) +
+ (DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key() -> keyGenClass) +
+ (DataSourceWriteOptions.RECORDKEY_FIELD.key() -> recordKeys)
+ val isTimestampBasedKeyGen: Boolean =
classOf[TimestampBasedKeyGenerator].getName.equals(keyGenClass)
+ if (isTimestampBasedKeyGen) {
+ options += DataSourceWriteOptions.RECORDKEY_FIELD.key() -> "_row_key"
+ options += Config.TIMESTAMP_TYPE_FIELD_PROP -> "DATE_STRING"
+ options += Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP -> "yyyy/MM/dd"
+ options += Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP -> "yyyyMMdd"
}
val dataGen = new HoodieTestDataGenerator(0xDEED)
val fs = FSUtils.getFs(basePath, spark.sparkContext.hadoopConfiguration)
@@ -83,14 +85,12 @@ class TestCOWDataSourceStorage extends
SparkClientFunctionalTestHarness {
val records0 = recordsToStrings(dataGen.generateInserts("000", 100)).toList
val inputDF0 = spark.read.json(spark.sparkContext.parallelize(records0, 2))
inputDF0.write.format("org.apache.hudi")
- .options(commonOpts)
+ .options(options)
.option(DataSourceWriteOptions.OPERATION.key,
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
- .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled)
.mode(SaveMode.Overwrite)
.save(basePath)
assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
- val commitInstantTime1 = HoodieDataSourceHelpers.latestCommit(fs, basePath)
// Snapshot query
val snapshotDF1 = spark.read.format("org.apache.hudi")
@@ -102,7 +102,7 @@ class TestCOWDataSourceStorage extends
SparkClientFunctionalTestHarness {
val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
val verificationRowKey =
inputDF1.limit(1).select("_row_key").first.getString(0)
var updateDf: DataFrame = null
- if (classOf[TimestampBasedKeyGenerator].getName.equals(keyGenClass)) {
+ if (isTimestampBasedKeyGen) {
// update current_ts to be same as original record so that partition
path does not change with timestamp based key gen
val originalRow = snapshotDF1.filter(col("_row_key") ===
verificationRowKey).collectAsList().get(0)
updateDf = inputDF1.filter(col("_row_key") === verificationRowKey)
@@ -116,8 +116,7 @@ class TestCOWDataSourceStorage extends
SparkClientFunctionalTestHarness {
}
updateDf.write.format("org.apache.hudi")
- .options(commonOpts)
- .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled)
+ .options(options)
.mode(SaveMode.Append)
.save(basePath)
val commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, basePath)
@@ -132,7 +131,7 @@ class TestCOWDataSourceStorage extends
SparkClientFunctionalTestHarness {
val records2 = recordsToStrings(dataGen.generateUpdates("002", 100)).toList
var inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
- if (classOf[TimestampBasedKeyGenerator].getName.equals(keyGenClass)) {
+ if (isTimestampBasedKeyGen) {
// incase of Timestamp based key gen, current_ts should not be updated.
but dataGen.generateUpdates() would have updated
// the value of current_ts. So, we need to revert it back to original
value.
// here is what we are going to do. Copy values to temp columns, join
with original df and update the current_ts
@@ -152,8 +151,7 @@ class TestCOWDataSourceStorage extends
SparkClientFunctionalTestHarness {
val uniqueKeyCnt = inputDF2.select("_row_key").distinct().count()
inputDF2.write.format("org.apache.hudi")
- .options(commonOpts)
- .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled)
+ .options(options)
.mode(SaveMode.Append)
.save(basePath)
@@ -191,8 +189,7 @@ class TestCOWDataSourceStorage extends
SparkClientFunctionalTestHarness {
val emptyRecords = recordsToStrings(dataGen.generateUpdates("003",
0)).toList
val emptyDF = spark.read.json(spark.sparkContext.parallelize(emptyRecords,
1))
emptyDF.write.format("org.apache.hudi")
- .options(commonOpts)
- .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled)
+ .options(options)
.mode(SaveMode.Append)
.save(basePath)
@@ -211,9 +208,10 @@ class TestCOWDataSourceStorage extends
SparkClientFunctionalTestHarness {
val hoodieIncViewDF3 = spark.read.format("org.apache.hudi")
.option(DataSourceReadOptions.QUERY_TYPE.key,
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
.option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, commitInstantTime2)
- .option(DataSourceReadOptions.INCR_PATH_GLOB.key, "/2016/*/*/*")
+ .option(DataSourceReadOptions.INCR_PATH_GLOB.key, if
(isTimestampBasedKeyGen) "/2016*/*" else "/2016/*/*/*")
.load(basePath)
-
assertEquals(hoodieIncViewDF2.filter(col("_hoodie_partition_path").contains("2016")).count(),
hoodieIncViewDF3.count())
+ assertEquals(hoodieIncViewDF2
+ .filter(col("_hoodie_partition_path").startsWith("2016")).count(),
hoodieIncViewDF3.count())
val timeTravelDF = spark.read.format("org.apache.hudi")
.option(DataSourceReadOptions.QUERY_TYPE.key,
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)