yihua commented on code in PR #12303:
URL: https://github.com/apache/hudi/pull/12303#discussion_r1857514230


##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlWithCustomKeyGenerator.scala:
##########
@@ -44,451 +42,407 @@ class TestSparkSqlWithCustomKeyGenerator extends 
HoodieSparkSqlTestBase {
   private val LOG = LoggerFactory.getLogger(getClass)
 
   test("Test Spark SQL DML with custom key generator") {
-    for (extractPartition <- Seq(true, false)) {
-      withSQLConf(EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.key() -> 
extractPartition.toString) {
-        withTempDir { tmp =>
-          Seq(
-            Seq("COPY_ON_WRITE", "ts:timestamp,segment:simple",
-              "(ts=202401, segment='cat2')", "202401/cat2",
-              Seq("202312/cat2", "202312/cat4", "202401/cat1", "202401/cat3", 
"202402/cat1", "202402/cat3", "202402/cat5"),
-              TS_FORMATTER_FUNC,
-              (ts: Integer, segment: String) => TS_FORMATTER_FUNC.apply(ts) + 
"/" + segment, false),
-            Seq("MERGE_ON_READ", "segment:simple",
-              "(segment='cat3')", "cat3",
-              Seq("cat1", "cat2", "cat4", "cat5"),
-              TS_TO_STRING_FUNC,
-              (_: Integer, segment: String) => segment, false),
-            Seq("MERGE_ON_READ", "ts:timestamp",
-              "(ts=202312)", "202312",
-              Seq("202401", "202402"),
-              TS_FORMATTER_FUNC,
-              (ts: Integer, _: String) => TS_FORMATTER_FUNC.apply(ts), false),
-            Seq("MERGE_ON_READ", "ts:timestamp,segment:simple",
-              "(ts=202401, segment='cat2')", "202401/cat2",
-              Seq("202312/cat2", "202312/cat4", "202401/cat1", "202401/cat3", 
"202402/cat1", "202402/cat3", "202402/cat5"),
-              TS_FORMATTER_FUNC,
-              (ts: Integer, segment: String) => TS_FORMATTER_FUNC.apply(ts) + 
"/" + segment, false),
-            Seq("MERGE_ON_READ", "ts:timestamp,segment:simple",
-              "(ts=202401, segment='cat2')", "202401/cat2",
-              Seq("202312/cat2", "202312/cat4", "202401/cat1", "202401/cat3", 
"202402/cat1", "202402/cat3", "202402/cat5"),
-              TS_FORMATTER_FUNC,
-              (ts: Integer, segment: String) => TS_FORMATTER_FUNC.apply(ts) + 
"/" + segment, true)
-          ).foreach { testParams =>
-            withTable(generateTableName) { tableName =>
-              LOG.warn("Testing with parameters: " + testParams)
-              val tableType = testParams(0).asInstanceOf[String]
-              val writePartitionFields = testParams(1).asInstanceOf[String]
-              val dropPartitionStatement = testParams(2).asInstanceOf[String]
-              val droppedPartition = testParams(3).asInstanceOf[String]
-              val expectedPartitions = testParams(4).asInstanceOf[Seq[String]]
-              val tsGenFunc = testParams(5).asInstanceOf[Integer => String]
-              val partitionGenFunc = testParams(6).asInstanceOf[(Integer, 
String) => String]
-              val tablePath = tmp.getCanonicalPath + "/" + tableName
-              val timestampKeyGeneratorConfig = if 
(writePartitionFields.contains("timestamp")) {
-                TS_KEY_GEN_CONFIGS
-              } else {
-                Map[String, String]()
-              }
-              val timestampKeyGenProps = if 
(timestampKeyGeneratorConfig.nonEmpty) {
-                ", " + timestampKeyGeneratorConfig.map(e => e._1 + " = '" + 
e._2 + "'").mkString(", ")
-              } else {
-                ""
-              }
-              val useOlderPartitionFieldFormat = 
testParams(7).asInstanceOf[Boolean]
-
-              prepareTableWithKeyGenerator(
-                tableName, tablePath, tableType,
-                CUSTOM_KEY_GEN_CLASS_NAME, writePartitionFields, 
timestampKeyGeneratorConfig)
-
-              if (useOlderPartitionFieldFormat) {
-                var metaClient = createMetaClient(spark, tablePath)
-                val props = new TypedProperties()
-                props.put(HoodieTableConfig.PARTITION_FIELDS.key(), 
metaClient.getTableConfig.getPartitionFieldProp)
-                HoodieTableConfig.update(metaClient.getStorage, 
metaClient.getMetaPath, props)
-                metaClient = createMetaClient(spark, tablePath)
-                assertEquals(metaClient.getTableConfig.getPartitionFieldProp, 
HoodieTableConfig.getPartitionFieldPropForKeyGenerator(metaClient.getTableConfig).orElse(""))
-              }
-
-              // SQL CTAS with table properties containing key generator write 
configs
-              createTableWithSql(tableName, tablePath,
-                s"hoodie.datasource.write.partitionpath.field = 
'$writePartitionFields'" + timestampKeyGenProps)
-
-              // Prepare source and test SQL INSERT INTO
-              val sourceTableName = tableName + "_source"
-              prepareParquetSource(sourceTableName, Seq(
-                "(7, 'a7', 1399.0, 1706800227, 'cat1')",
-                "(8, 'a8', 26.9, 1706800227, 'cat3')",
-                "(9, 'a9', 299.0, 1701443427, 'cat4')"))
-              spark.sql(
-                s"""
-                   | INSERT INTO $tableName
-                   | SELECT * from ${tableName}_source
-                   | """.stripMargin)
-              val sqlStr = s"SELECT id, name, cast(price as string), cast(ts 
as string), segment from $tableName"
-              validateResults(
-                tableName,
-                sqlStr,
-                extractPartition,
-                tsGenFunc,
-                partitionGenFunc,
-                Seq(),
-                Seq(1, "a1", "1.6", 1704121827, "cat1"),
-                Seq(2, "a2", "10.8", 1704121827, "cat1"),
-                Seq(3, "a3", "30.0", 1706800227, "cat1"),
-                Seq(4, "a4", "103.4", 1701443427, "cat2"),
-                Seq(5, "a5", "1999.0", 1704121827, "cat2"),
-                Seq(6, "a6", "80.0", 1704121827, "cat3"),
-                Seq(7, "a7", "1399.0", 1706800227, "cat1"),
-                Seq(8, "a8", "26.9", 1706800227, "cat3"),
-                Seq(9, "a9", "299.0", 1701443427, "cat4")
-              )
-
-              // Test SQL UPDATE
-              spark.sql(
-                s"""
-                   | UPDATE $tableName
-                   | SET price = price + 10.0
-                   | WHERE id between 4 and 7
-                   | """.stripMargin)
-              validateResults(
-                tableName,
-                sqlStr,
-                extractPartition,
-                tsGenFunc,
-                partitionGenFunc,
-                Seq(),
-                Seq(1, "a1", "1.6", 1704121827, "cat1"),
-                Seq(2, "a2", "10.8", 1704121827, "cat1"),
-                Seq(3, "a3", "30.0", 1706800227, "cat1"),
-                Seq(4, "a4", "113.4", 1701443427, "cat2"),
-                Seq(5, "a5", "2009.0", 1704121827, "cat2"),
-                Seq(6, "a6", "90.0", 1704121827, "cat3"),
-                Seq(7, "a7", "1409.0", 1706800227, "cat1"),
-                Seq(8, "a8", "26.9", 1706800227, "cat3"),
-                Seq(9, "a9", "299.0", 1701443427, "cat4")
-              )
-
-              // Test SQL MERGE INTO
-              spark.sql(
-                s"""
-                   | MERGE INTO $tableName as target
-                   | USING (
-                   |   SELECT 1 as id, 'a1' as name, 1.6 as price, 1704121827 
as ts, 'cat1' as segment, 'delete' as flag
-                   |   UNION
-                   |   SELECT 2 as id, 'a2' as name, 11.9 as price, 1704121827 
as ts, 'cat1' as segment, '' as flag
-                   |   UNION
-                   |   SELECT 6 as id, 'a6' as name, 99.0 as price, 1704121827 
as ts, 'cat3' as segment, '' as flag
-                   |   UNION
-                   |   SELECT 8 as id, 'a8' as name, 24.9 as price, 1706800227 
as ts, 'cat3' as segment, '' as flag
-                   |   UNION
-                   |   SELECT 10 as id, 'a10' as name, 888.8 as price, 
1706800227 as ts, 'cat5' as segment, '' as flag
-                   | ) source
-                   | on target.id = source.id
-                   | WHEN MATCHED AND flag != 'delete' THEN UPDATE SET
-                   |   id = source.id, name = source.name, price = 
source.price, ts = source.ts, segment = source.segment
-                   | WHEN MATCHED AND flag = 'delete' THEN DELETE
-                   | WHEN NOT MATCHED THEN INSERT (id, name, price, ts, 
segment)
-                   |   values (source.id, source.name, source.price, 
source.ts, source.segment)
-                   | """.stripMargin)
-              validateResults(
-                tableName,
-                sqlStr,
-                extractPartition,
-                tsGenFunc,
-                partitionGenFunc,
-                Seq(),
-                Seq(2, "a2", "11.9", 1704121827, "cat1"),
-                Seq(3, "a3", "30.0", 1706800227, "cat1"),
-                Seq(4, "a4", "113.4", 1701443427, "cat2"),
-                Seq(5, "a5", "2009.0", 1704121827, "cat2"),
-                Seq(6, "a6", "99.0", 1704121827, "cat3"),
-                Seq(7, "a7", "1409.0", 1706800227, "cat1"),
-                Seq(8, "a8", "24.9", 1706800227, "cat3"),
-                Seq(9, "a9", "299.0", 1701443427, "cat4"),
-                Seq(10, "a10", "888.8", 1706800227, "cat5")
-              )
-
-              // Test SQL DELETE
-              spark.sql(
-                s"""
-                   | DELETE FROM $tableName
-                   | WHERE id = 7
-                   | """.stripMargin)
-              validateResults(
-                tableName,
-                sqlStr,
-                extractPartition,
-                tsGenFunc,
-                partitionGenFunc,
-                Seq(),
-                Seq(2, "a2", "11.9", 1704121827, "cat1"),
-                Seq(3, "a3", "30.0", 1706800227, "cat1"),
-                Seq(4, "a4", "113.4", 1701443427, "cat2"),
-                Seq(5, "a5", "2009.0", 1704121827, "cat2"),
-                Seq(6, "a6", "99.0", 1704121827, "cat3"),
-                Seq(8, "a8", "24.9", 1706800227, "cat3"),
-                Seq(9, "a9", "299.0", 1701443427, "cat4"),
-                Seq(10, "a10", "888.8", 1706800227, "cat5")
-              )
-
-              // Test DROP PARTITION
-              
assertTrue(getSortedTablePartitions(tableName).contains(droppedPartition))
-              spark.sql(
-                s"""
-                   | ALTER TABLE $tableName DROP PARTITION 
$dropPartitionStatement
-                   |""".stripMargin)
-              validatePartitions(tableName, Seq(droppedPartition), 
expectedPartitions)
-
-              if (HoodieSparkUtils.isSpark3) {
-                // Test INSERT OVERWRITE, only supported in Spark 3.x
-                spark.sql(
-                  s"""
-                     | INSERT OVERWRITE $tableName
-                     | SELECT 100 as id, 'a100' as name, 299.0 as price, 
1706800227 as ts, 'cat10' as segment
-                     | """.stripMargin)
-                validateResults(
-                  tableName,
-                  sqlStr,
-                  extractPartition,
-                  tsGenFunc,
-                  partitionGenFunc,
-                  Seq(),
-                  Seq(100, "a100", "299.0", 1706800227, "cat10")
-                )
-              }
-
-              // Validate ts field is still of type int in the table
-              validateTsFieldSchema(tablePath, "ts", Schema.Type.INT)
-              if (useOlderPartitionFieldFormat) {
-                val metaClient = createMetaClient(spark, tablePath)
-                assertEquals(metaClient.getTableConfig.getPartitionFieldProp, 
HoodieTableConfig.getPartitionFieldPropForKeyGenerator(metaClient.getTableConfig).orElse(""))
-              }
-            }
+    withTempDir { tmp =>

Review Comment:
   These changes go well beyond what was changed in #11710.  Could you strictly 
revert only what that PR changed?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to