yihua commented on code in PR #11770:
URL: https://github.com/apache/hudi/pull/11770#discussion_r1720407316
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlWithCustomKeyGenerator.scala:
##########
@@ -240,136 +255,145 @@ class TestSparkSqlWithCustomKeyGenerator extends HoodieSparkSqlTestBase {
     // Only testing Spark 3.3 and above as lower Spark versions do not support
     // ALTER TABLE .. SET TBLPROPERTIES .. to store table-level properties in Hudi Catalog
     if (HoodieSparkUtils.gteqSpark3_3) {
-      withTempDir { tmp => {
-        val tableNameNonPartitioned = generateTableName
-        val tableNameSimpleKey = generateTableName
-        val tableNameCustom1 = generateTableName
-        val tableNameCustom2 = generateTableName
-
-        val tablePathNonPartitioned = tmp.getCanonicalPath + "/" + tableNameNonPartitioned
-        val tablePathSimpleKey = tmp.getCanonicalPath + "/" + tableNameSimpleKey
-        val tablePathCustom1 = tmp.getCanonicalPath + "/" + tableNameCustom1
-        val tablePathCustom2 = tmp.getCanonicalPath + "/" + tableNameCustom2
-
-        val tableType = "MERGE_ON_READ"
-        val writePartitionFields1 = "segment:simple"
-        val writePartitionFields2 = "ts:timestamp,segment:simple"
-
-        prepareTableWithKeyGenerator(
-          tableNameNonPartitioned, tablePathNonPartitioned, tableType,
-          NONPARTITIONED_KEY_GEN_CLASS_NAME, "", Map())
-        prepareTableWithKeyGenerator(
-          tableNameSimpleKey, tablePathSimpleKey, tableType,
-          SIMPLE_KEY_GEN_CLASS_NAME, "segment", Map())
-        prepareTableWithKeyGenerator(
-          tableNameCustom1, tablePathCustom1, tableType,
-          CUSTOM_KEY_GEN_CLASS_NAME, writePartitionFields1, Map())
-        prepareTableWithKeyGenerator(
-          tableNameCustom2, tablePathCustom2, tableType,
-          CUSTOM_KEY_GEN_CLASS_NAME, writePartitionFields2, TS_KEY_GEN_CONFIGS)
-
-        // Non-partitioned table does not require additional partition path field write config
-        createTableWithSql(tableNameNonPartitioned, tablePathNonPartitioned, "")
-        // Partitioned table with simple key generator does not require additional partition path field write config
-        createTableWithSql(tableNameSimpleKey, tablePathSimpleKey, "")
-        // Partitioned table with custom key generator requires additional partition path field write config
-        // Without that, right now the SQL DML fails
-        createTableWithSql(tableNameCustom1, tablePathCustom1, "")
-        createTableWithSql(tableNameCustom2, tablePathCustom2,
-          s"hoodie.datasource.write.partitionpath.field = '$writePartitionFields2', "
-            + TS_KEY_GEN_CONFIGS.map(e => e._1 + " = '" + e._2 + "'").mkString(", "))
-
-        val segmentPartitionFunc = (_: Integer, segment: String) => segment
-        val customPartitionFunc = (ts: Integer, segment: String) => TS_FORMATTER_FUNC.apply(ts) + "/" + segment
-
-        testFirstRoundInserts(tableNameNonPartitioned, TS_TO_STRING_FUNC, (_, _) => "")
-        testFirstRoundInserts(tableNameSimpleKey, TS_TO_STRING_FUNC, segmentPartitionFunc)
-        // INSERT INTO should fail for tableNameCustom1
-        val sourceTableName = tableNameCustom1 + "_source"
-        prepareParquetSource(sourceTableName, Seq("(7, 'a7', 1399.0, 1706800227, 'cat1')"))
-        assertThrows[HoodieException] {
-          spark.sql(
-            s"""
-               | INSERT INTO $tableNameCustom1
-               | SELECT * from $sourceTableName
-               | """.stripMargin)
+      for (extractPartition <- Seq(true, false)) {
+        withSQLConf(EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.key() -> extractPartition.toString) {
Review Comment:
Similar here.
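
For context, the change under review wraps the test body in a loop over both settings of the read config via `withSQLConf`. As a rough illustration of what such a helper does (a sketch built on Spark's public `RuntimeConfig` API, not Hudi's actual `HoodieSparkSqlTestBase` implementation): it applies the given conf overrides, runs the body, and restores the previous values so runs stay isolated.

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: scope SQL conf overrides to a block, then restore prior values.
// `spark` is any active SparkSession.
def withSQLConf(spark: SparkSession)(pairs: (String, String)*)(body: => Unit): Unit = {
  // Remember the current value of each key (None if the key was unset).
  val previous = pairs.map { case (key, _) => key -> spark.conf.getOption(key) }
  pairs.foreach { case (key, value) => spark.conf.set(key, value) }
  try body finally previous.foreach {
    case (key, Some(value)) => spark.conf.set(key, value)
    case (key, None)        => spark.conf.unset(key)
  }
}
```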
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlWithCustomKeyGenerator.scala:
##########
@@ -41,194 +42,208 @@ class TestSparkSqlWithCustomKeyGenerator extends HoodieSparkSqlTestBase {
   private val LOG = LoggerFactory.getLogger(getClass)
   test("Test Spark SQL DML with custom key generator") {
-    withTempDir { tmp =>
-      Seq(
-        Seq("COPY_ON_WRITE", "ts:timestamp,segment:simple",
-          "(ts=202401, segment='cat2')", "202401/cat2",
-          Seq("202312/cat2", "202312/cat4", "202401/cat1", "202401/cat3", "202402/cat1", "202402/cat3", "202402/cat5"),
-          TS_FORMATTER_FUNC,
-          (ts: Integer, segment: String) => TS_FORMATTER_FUNC.apply(ts) + "/" + segment),
-        Seq("MERGE_ON_READ", "segment:simple",
-          "(segment='cat3')", "cat3",
-          Seq("cat1", "cat2", "cat4", "cat5"),
-          TS_TO_STRING_FUNC,
-          (_: Integer, segment: String) => segment),
-        Seq("MERGE_ON_READ", "ts:timestamp",
-          "(ts=202312)", "202312",
-          Seq("202401", "202402"),
-          TS_FORMATTER_FUNC,
-          (ts: Integer, _: String) => TS_FORMATTER_FUNC.apply(ts)),
-        Seq("MERGE_ON_READ", "ts:timestamp,segment:simple",
-          "(ts=202401, segment='cat2')", "202401/cat2",
-          Seq("202312/cat2", "202312/cat4", "202401/cat1", "202401/cat3", "202402/cat1", "202402/cat3", "202402/cat5"),
-          TS_FORMATTER_FUNC,
-          (ts: Integer, segment: String) => TS_FORMATTER_FUNC.apply(ts) + "/" + segment)
-      ).foreach { testParams =>
-        withTable(generateTableName) { tableName =>
-          LOG.warn("Testing with parameters: " + testParams)
-          val tableType = testParams(0).asInstanceOf[String]
-          val writePartitionFields = testParams(1).asInstanceOf[String]
-          val dropPartitionStatement = testParams(2).asInstanceOf[String]
-          val droppedPartition = testParams(3).asInstanceOf[String]
-          val expectedPartitions = testParams(4).asInstanceOf[Seq[String]]
-          val tsGenFunc = testParams(5).asInstanceOf[Integer => String]
-          val partitionGenFunc = testParams(6).asInstanceOf[(Integer, String) => String]
-          val tablePath = tmp.getCanonicalPath + "/" + tableName
-          val timestampKeyGeneratorConfig = if (writePartitionFields.contains("timestamp")) {
-            TS_KEY_GEN_CONFIGS
-          } else {
-            Map[String, String]()
-          }
-          val timestampKeyGenProps = if (timestampKeyGeneratorConfig.nonEmpty) {
-            ", " + timestampKeyGeneratorConfig.map(e => e._1 + " = '" + e._2 + "'").mkString(", ")
-          } else {
-            ""
-          }
-
-          prepareTableWithKeyGenerator(
-            tableName, tablePath, tableType,
-            CUSTOM_KEY_GEN_CLASS_NAME, writePartitionFields, timestampKeyGeneratorConfig)
-
-          // SQL CTAS with table properties containing key generator write configs
-          createTableWithSql(tableName, tablePath,
-            s"hoodie.datasource.write.partitionpath.field = '$writePartitionFields'" + timestampKeyGenProps)
-
-          // Prepare source and test SQL INSERT INTO
-          val sourceTableName = tableName + "_source"
-          prepareParquetSource(sourceTableName, Seq(
-            "(7, 'a7', 1399.0, 1706800227, 'cat1')",
-            "(8, 'a8', 26.9, 1706800227, 'cat3')",
-            "(9, 'a9', 299.0, 1701443427, 'cat4')"))
-          spark.sql(
-            s"""
-               | INSERT INTO $tableName
-               | SELECT * from ${tableName}_source
-               | """.stripMargin)
-          validateResults(
-            tableName,
-            s"SELECT id, name, cast(price as string), cast(ts as string), segment from $tableName",
-            tsGenFunc,
-            partitionGenFunc,
-            Seq(),
-            Seq(1, "a1", "1.6", 1704121827, "cat1"),
-            Seq(2, "a2", "10.8", 1704121827, "cat1"),
-            Seq(3, "a3", "30.0", 1706800227, "cat1"),
-            Seq(4, "a4", "103.4", 1701443427, "cat2"),
-            Seq(5, "a5", "1999.0", 1704121827, "cat2"),
-            Seq(6, "a6", "80.0", 1704121827, "cat3"),
-            Seq(7, "a7", "1399.0", 1706800227, "cat1"),
-            Seq(8, "a8", "26.9", 1706800227, "cat3"),
-            Seq(9, "a9", "299.0", 1701443427, "cat4")
-          )
-
-          // Test SQL UPDATE
-          spark.sql(
-            s"""
-               | UPDATE $tableName
-               | SET price = price + 10.0
-               | WHERE id between 4 and 7
-               | """.stripMargin)
-          validateResults(
-            tableName,
-            s"SELECT id, name, cast(price as string), cast(ts as string), segment from $tableName",
-            tsGenFunc,
-            partitionGenFunc,
-            Seq(),
-            Seq(1, "a1", "1.6", 1704121827, "cat1"),
-            Seq(2, "a2", "10.8", 1704121827, "cat1"),
-            Seq(3, "a3", "30.0", 1706800227, "cat1"),
-            Seq(4, "a4", "113.4", 1701443427, "cat2"),
-            Seq(5, "a5", "2009.0", 1704121827, "cat2"),
-            Seq(6, "a6", "90.0", 1704121827, "cat3"),
-            Seq(7, "a7", "1409.0", 1706800227, "cat1"),
-            Seq(8, "a8", "26.9", 1706800227, "cat3"),
-            Seq(9, "a9", "299.0", 1701443427, "cat4")
-          )
-
-          // Test SQL MERGE INTO
-          spark.sql(
-            s"""
-               | MERGE INTO $tableName as target
-               | USING (
-               | SELECT 1 as id, 'a1' as name, 1.6 as price, 1704121827 as ts, 'cat1' as segment, 'delete' as flag
-               | UNION
-               | SELECT 2 as id, 'a2' as name, 11.9 as price, 1704121827 as ts, 'cat1' as segment, '' as flag
-               | UNION
-               | SELECT 6 as id, 'a6' as name, 99.0 as price, 1704121827 as ts, 'cat3' as segment, '' as flag
-               | UNION
-               | SELECT 8 as id, 'a8' as name, 24.9 as price, 1706800227 as ts, 'cat3' as segment, '' as flag
-               | UNION
-               | SELECT 10 as id, 'a10' as name, 888.8 as price, 1706800227 as ts, 'cat5' as segment, '' as flag
-               | ) source
-               | on target.id = source.id
-               | WHEN MATCHED AND flag != 'delete' THEN UPDATE SET
-               | id = source.id, name = source.name, price = source.price, ts = source.ts, segment = source.segment
-               | WHEN MATCHED AND flag = 'delete' THEN DELETE
-               | WHEN NOT MATCHED THEN INSERT (id, name, price, ts, segment)
-               | values (source.id, source.name, source.price, source.ts, source.segment)
-               | """.stripMargin)
-          validateResults(
-            tableName,
-            s"SELECT id, name, cast(price as string), cast(ts as string), segment from $tableName",
-            tsGenFunc,
-            partitionGenFunc,
-            Seq(),
-            Seq(2, "a2", "11.9", 1704121827, "cat1"),
-            Seq(3, "a3", "30.0", 1706800227, "cat1"),
-            Seq(4, "a4", "113.4", 1701443427, "cat2"),
-            Seq(5, "a5", "2009.0", 1704121827, "cat2"),
-            Seq(6, "a6", "99.0", 1704121827, "cat3"),
-            Seq(7, "a7", "1409.0", 1706800227, "cat1"),
-            Seq(8, "a8", "24.9", 1706800227, "cat3"),
-            Seq(9, "a9", "299.0", 1701443427, "cat4"),
-            Seq(10, "a10", "888.8", 1706800227, "cat5")
-          )
-
-          // Test SQL DELETE
-          spark.sql(
-            s"""
-               | DELETE FROM $tableName
-               | WHERE id = 7
-               | """.stripMargin)
-          validateResults(
-            tableName,
-            s"SELECT id, name, cast(price as string), cast(ts as string), segment from $tableName",
-            tsGenFunc,
-            partitionGenFunc,
-            Seq(),
-            Seq(2, "a2", "11.9", 1704121827, "cat1"),
-            Seq(3, "a3", "30.0", 1706800227, "cat1"),
-            Seq(4, "a4", "113.4", 1701443427, "cat2"),
-            Seq(5, "a5", "2009.0", 1704121827, "cat2"),
-            Seq(6, "a6", "99.0", 1704121827, "cat3"),
-            Seq(8, "a8", "24.9", 1706800227, "cat3"),
-            Seq(9, "a9", "299.0", 1701443427, "cat4"),
-            Seq(10, "a10", "888.8", 1706800227, "cat5")
-          )
-
-          // Test DROP PARTITION
-          assertTrue(getSortedTablePartitions(tableName).contains(droppedPartition))
-          spark.sql(
-            s"""
-               | ALTER TABLE $tableName DROP PARTITION $dropPartitionStatement
-               |""".stripMargin)
-          validatePartitions(tableName, Seq(droppedPartition), expectedPartitions)
-
-          if (HoodieSparkUtils.isSpark3) {
-            // Test INSERT OVERWRITE, only supported in Spark 3.x
-            spark.sql(
-              s"""
-                 | INSERT OVERWRITE $tableName
-                 | SELECT 100 as id, 'a100' as name, 299.0 as price, 1706800227 as ts, 'cat10' as segment
-                 | """.stripMargin)
-            validateResults(
-              tableName,
-              s"SELECT id, name, cast(price as string), cast(ts as string), segment from $tableName",
-              tsGenFunc,
-              partitionGenFunc,
-              Seq(),
-              Seq(100, "a100", "299.0", 1706800227, "cat10")
-            )
+    for (extractPartition <- Seq(true, false)) {
+      withSQLConf(EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.key() -> extractPartition.toString) {
Review Comment:
I think it's OK to use the default value and deprecate `EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH` in the future.
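
If the config is deprecated as suggested, the two-way loop added in this PR could later collapse to a single run on the default value. A hypothetical before/after sketch, where `runDmlTestBody()` is a stand-in for the shared assertions in this test (not an actual helper in the file):

```scala
// Current PR: exercise the DML test body under both settings of the flag.
for (extractPartition <- Seq(true, false)) {
  withSQLConf(EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.key() -> extractPartition.toString) {
    runDmlTestBody() // hypothetical placeholder for the shared test logic
  }
}

// After deprecation: no override needed; only the default behavior is tested.
runDmlTestBody()
```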