suryaprasanna commented on code in PR #17850: URL: https://github.com/apache/hudi/pull/17850#discussion_r2781046360
########## hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestCustomParitionValueExtractor.scala: ########## @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi.common + +import org.apache.hudi.client.common.HoodieSparkEngineContext +import org.apache.hudi.common.config.HoodieMetadataConfig +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.hadoop.fs.HadoopFSUtils +import org.apache.hudi.metadata.HoodieBackedTableMetadata +import org.apache.hudi.storage.StoragePath +import org.apache.hudi.sync.common.HoodieSyncConfig + +import org.junit.jupiter.api.Assertions.assertTrue + +class TestCustomParitionValueExtractor extends HoodieSparkSqlTestBase { + test("Test custom partition value extractor interface") { + withTempDir { tmp => + val targetTable = generateTableName + val tablePath = s"${tmp.getCanonicalPath}/$targetTable" + + spark.sql( + s""" + |create table $targetTable ( + | `id` string, + | `name` string, + | `ts` bigint, + | `datestr` string, + | `country` string, + | `state` string, + | `city` string + |) using hudi + | tblproperties ( + | 'primaryKey' = 'id', + | 'type' = 'COW', + | 'preCombineField'='ts', + | 'hoodie.datasource.write.hive_style_partitioning'='false', + | 'hoodie.datasource.write.keygenerator.class'='org.apache.spark.sql.hudi.common.TestCustomSlashKeyGenerator', + | 'hoodie.table.keygenerator.class'='org.apache.spark.sql.hudi.common.TestCustomSlashKeyGenerator', + | '${HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key()}'='org.apache.spark.sql.hudi.common.TestCustomSlashPartitionValueExtractor' + | ) + | partitioned by (`datestr`, `country`, `state`, `city`) + | location '$tablePath' + """.stripMargin) + // yyyy/mm/dd/country/state/city + spark.sql( + s""" + | insert into $targetTable values + | (1, 'a1', 1000, '2024-01-01', "USA", "CA", "SFO"), + | (2, 'a2', 2000, '2024-01-01', "USA", "TX", "AU"), + | (3, 'a3', 3000, '2024-01-02', "USA", "CA", "LA"), + | (4, 'a4', 4000, '2024-01-02', "USA", "WA", "SEA"), + | (5, 'a5', 5000, '2024-01-03', "USA", "CA", "SFO") + """.stripMargin) + + // check result after insert and merge data into target table + checkAnswer(s"select id, name, ts, datestr, country, state, city from $targetTable" + + s" where state = 'CA'")( + Seq(1, "a1", 1000, "2024-01-01", "USA", "CA", "SFO"), + Seq(3, "a3", 3000, "2024-01-02", "USA", "CA", "LA"), + Seq(5, "a5", 5000, "2024-01-03", "USA", "CA", "SFO") + ) + + // Verify table config has custom partition value extractor class set + val metaClient = HoodieTableMetaClient.builder() + .setConf(HadoopFSUtils.getStorageConfWithCopy(spark.sparkContext.hadoopConfiguration)) + .setBasePath(tablePath) + .build() + val tableConfig = metaClient.getTableConfig + val partitionExtractorClass = tableConfig.getProps.getProperty( + HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key()) + assertTrue(partitionExtractorClass == "org.apache.spark.sql.hudi.common.TestCustomSlashPartitionValueExtractor", + s"Table config should have custom partition value extractor class set to TestCustomSlashPartitionValueExtractor, but got $partitionExtractorClass") + + // Verify that partition paths are created with slash separated format (yyyy/MM/dd/country/state/city) + assertTrue(metaClient.getStorage.exists(new StoragePath(tablePath, "2024/01/01/USA/CA/SFO")), + s"Partition path 2024/01/01/USA/CA/SFO should exist") + assertTrue(metaClient.getStorage.exists(new StoragePath(tablePath, "2024/01/01/USA/TX/AU")), + s"Partition path 2024/01/01/USA/TX/AU should exist") + assertTrue(metaClient.getStorage.exists(new StoragePath(tablePath, "2024/01/02/USA/CA/LA")), + s"Partition path 2024/01/02/USA/CA/LA should exist") + assertTrue(metaClient.getStorage.exists(new StoragePath(tablePath, "2024/01/02/USA/WA/SEA")), + s"Partition path 2024/01/02/USA/WA/SEA should exist") + assertTrue(metaClient.getStorage.exists(new StoragePath(tablePath, "2024/01/03/USA/CA/SFO")), + s"Partition path 2024/01/03/USA/CA/SFO should exist") + + val engine = new HoodieSparkEngineContext(spark.sparkContext) + val storage = metaClient.getStorage() + val metadataConfig = HoodieMetadataConfig.newBuilder().build() + val metadataTable = new HoodieBackedTableMetadata(engine, storage, metadataConfig, tablePath) + val partitionPaths = metadataTable.getAllPartitionPaths + assertTrue(partitionPaths.contains("2024/01/01/USA/CA/SFO")) + assertTrue(partitionPaths.contains("2024/01/01/USA/TX/AU")) + assertTrue(partitionPaths.contains("2024/01/02/USA/CA/LA")) + assertTrue(partitionPaths.contains("2024/01/02/USA/WA/SEA")) + assertTrue(partitionPaths.contains("2024/01/03/USA/CA/SFO")) + metadataTable.close() + } + } + + test("Test custom partition value extractor with partition pruning and filtering") { + withTempDir { tmp => + val targetTable = generateTableName + val tablePath = s"${tmp.getCanonicalPath}/$targetTable" + + spark.sql( + s""" + |create table $targetTable ( + | `id` string, + | `name` string, + | `ts` bigint, + | `datestr` string, + | `country` string, + | `state` string, + | `city` string + |) using hudi + | tblproperties ( + | 'primaryKey' = 'id', + | 'type' = 'COW', + | 'preCombineField'='ts', + | 'hoodie.datasource.write.hive_style_partitioning'='false', + | 'hoodie.datasource.write.keygenerator.class'='org.apache.spark.sql.hudi.common.TestCustomSlashKeyGenerator', + | '${HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key()}'='org.apache.spark.sql.hudi.common.TestCustomSlashPartitionValueExtractor' + | ) + | partitioned by (`datestr`, `country`, `state`, `city`) + | location '$tablePath' + """.stripMargin) + + // Insert data across multiple partitions + spark.sql( + s""" + | insert into $targetTable values + | (1, 'a1', 1000, '2024-01-01', "USA", "CA", "SFO"), + | (2, 'a2', 2000, '2024-01-01', "USA", "CA", "LA"), + | (3, 'a3', 3000, '2024-01-01', "USA", "TX", "AU"), + | (4, 'a4', 4000, '2024-01-02', "USA", "CA", "SFO"), + | (5, 'a5', 5000, '2024-01-02', "USA", "WA", "SEA"), + | (6, 'a6', 6000, '2024-01-03', "USA", "CA", "LA"), + | (7, 'a7', 7000, '2024-01-03', "CAN", "ON", "TOR"), + | (8, 'a8', 8000, '2024-01-04', "USA", "NY", "NYC") + """.stripMargin) + + // Test partition pruning with single partition column filter (state) + checkAnswer(s"select id, name, ts, datestr, country, state, city from $targetTable where state = 'CA' order by id")( + Seq(1, "a1", 1000, "2024-01-01", "USA", "CA", "SFO"), + Seq(2, "a2", 2000, "2024-01-01", "USA", "CA", "LA"), + Seq(4, "a4", 4000, "2024-01-02", "USA", "CA", "SFO"), + Seq(6, "a6", 6000, "2024-01-03", "USA", "CA", "LA") + ) + + // Test partition pruning with multiple partition column filters + checkAnswer(s"select id, name, ts, datestr, country, state, city from $targetTable where state = 'CA' and city = 'SFO' order by id")( + Seq(1, "a1", 1000, "2024-01-01", "USA", "CA", "SFO"), + Seq(4, "a4", 4000, "2024-01-02", "USA", "CA", "SFO") + ) + + // Test partition pruning with date filter + checkAnswer(s"select id, name, ts, datestr, country, state, city from $targetTable where datestr = '2024-01-01' order by id")( + Seq(1, "a1", 1000, "2024-01-01", "USA", "CA", "SFO"), + Seq(2, "a2", 2000, "2024-01-01", "USA", "CA", "LA"), + Seq(3, "a3", 3000, "2024-01-01", "USA", "TX", "AU") + ) + + // Test partition pruning with country filter + checkAnswer(s"select id, name, ts, datestr, country, state, city from $targetTable where country = 'CAN' order by id")( + Seq(7, "a7", 7000, "2024-01-03", "CAN", "ON", "TOR") + ) + + // Test partition pruning with combined date and state filter Review Comment: Added the test case with corrupted parquet file. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
