nsivabalan commented on code in PR #17850:
URL: https://github.com/apache/hudi/pull/17850#discussion_r2770846900


##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -335,6 +335,37 @@ public static final String getDefaultPayloadClassName() {
       .sinceVersion("1.0.0")
       .withDocumentation("Key Generator type to determine key generator 
class");
 
+  public static final ConfigProperty<String> PARTITION_VALUE_EXTRACTOR_CLASS = 
ConfigProperty
+      .key("hoodie.datasource.hive_sync.partition_extractor_class")

Review Comment:
   none of the table property will have `"hoodie.datasource.` as prefixes. 
   
   we should define two configs. 
   
   `hoodie.datasource.hive_sync.partition_extractor_class` for writer property. 
   and 
   `hoodie.table.hive_sync.partition_extractor_class` for table config. 
   
   users should not be able to directly set the table property. they should 
always set the writer property only i.e. 
`hoodie.datasource.hive_sync.partition_extractor_class`
   
   



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala:
##########
@@ -238,10 +242,25 @@ object HoodieSparkUtils extends SparkAdapterSupport with 
SparkVersionsSupport wi
       // we couldn't reconstruct initial partition column values from 
partition paths due to lost data after formatting.
       // But the output for these cases is in a string format, so we can pass 
partitionPath as UTF8String
       Array.fill(partitionColumns.length)(UTF8String.fromString(partitionPath))
+    } else if(usePartitionValueExtractorOnRead && 
!StringUtils.isNullOrEmpty(partitionValueExtractorClass)) {
+      try {
+        val partitionValueExtractor = 
Class.forName(partitionValueExtractorClass)

Review Comment:
   can we move this to a private method to keep this method lean
   
   may be, `parsePartitionValuesBasedOnPartitionValueExtrator`
   
   



##########
hudi-common/src/main/java/org/apache/hudi/hive/sync/PartitionValueExtractor.java:
##########
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.hudi.sync.common.model;
+package org.apache.hudi.hive.sync;

Review Comment:
   lets not change this



##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -335,6 +335,37 @@ public static final String getDefaultPayloadClassName() {
       .sinceVersion("1.0.0")
       .withDocumentation("Key Generator type to determine key generator 
class");
 
+  public static final ConfigProperty<String> PARTITION_VALUE_EXTRACTOR_CLASS = 
ConfigProperty
+      .key("hoodie.datasource.hive_sync.partition_extractor_class")
+      .defaultValue("org.apache.hudi.hive.MultiPartKeysValueExtractor")
+      .withInferFunction(cfg -> {
+        Option<String> partitionFieldsOpt = 
HoodieTableConfig.getPartitionFieldProp(cfg)
+            .or(() -> 
Option.ofNullable(cfg.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME)));
+
+        if (!partitionFieldsOpt.isPresent()) {
+          return Option.empty();

Review Comment:
   is this not NonPartitionedExtractor ?



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestCustomSlashKeyGenerator.scala:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.common
+
+import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.keygen.{BuiltinKeyGenerator, ComplexAvroKeyGenerator, 
KeyGenUtils}
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions
+
+import org.apache.avro.generic.GenericRecord
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+import java.util.Arrays
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * Key generator that converts partition values to slash-separated partition 
paths.
+ *
+ * This generator is useful when you have partition columns like:
+ * - datestr: "yyyy-mm-dd" format
+ * - country, state, city: regular string values
+ *
+ * And you want to create partition paths like: yyyy/mm/dd/country/state/city
+ *
+ * The first partition field (typically a date) will have its hyphens replaced 
with slashes.
+ * All partition fields are then combined with "/" as the separator.
+ */
+class TestCustomSlashKeyGenerator(props: TypedProperties) extends 
BuiltinKeyGenerator(props) {

Review Comment:
   we should name this class differently. 
   only test class should typically be named like this. 
   
   how about `MockSlashKeyGenerator` or `UserDefinedSlashKeyGenerator`
   
   



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala:
##########
@@ -267,6 +267,15 @@ object DataSourceReadOptions {
     .sinceVersion("1.1.0")
     .withDocumentation("Fully qualified class name of the catalog that is used 
by the Polaris spark client.")
 
+  val USE_PARTITION_VALUE_EXTRACTOR_ON_READ: ConfigProperty[String] = 
ConfigProperty
+    
.key("hoodie.datasource.read.partition.value.using.partion-value-extractor-class")
+    .defaultValue("true")

Review Comment:
   can we disable by default. 



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestCustomSlashPartitionValueExtractor.scala:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hudi.common
+
+import org.apache.hudi.hive.sync.PartitionValueExtractor
+
+import java.util
+
+class TestCustomSlashPartitionValueExtractor extends PartitionValueExtractor {

Review Comment:
   same here



##########
hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java:
##########
@@ -116,43 +114,7 @@ public class HoodieSyncConfig extends HoodieConfig {
       .markAdvanced()
       .withDocumentation("Field in the table to use for determining hive 
partition columns.");
 
-  public static final ConfigProperty<String> 
META_SYNC_PARTITION_EXTRACTOR_CLASS = ConfigProperty

Review Comment:
   lets leave this as is w/o any changes



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala:
##########
@@ -527,9 +527,17 @@ object HoodieFileIndex extends Logging {
       
properties.setProperty(DataSourceReadOptions.FILE_INDEX_LISTING_MODE_OVERRIDE.key,
 listingModeOverride)
     }
 
+    val usePartitionValueExtractorForReaders = getConfigValue(options, sqlConf,

Review Comment:
   again, lets try to align method names, variables names across the board so 
that its easier to maintain. 
   It could lead to confusion.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala:
##########
@@ -527,9 +527,17 @@ object HoodieFileIndex extends Logging {
       
properties.setProperty(DataSourceReadOptions.FILE_INDEX_LISTING_MODE_OVERRIDE.key,
 listingModeOverride)
     }
 
+    val usePartitionValueExtractorForReaders = getConfigValue(options, sqlConf,

Review Comment:
   `usePartitionValueExtractorForReaders` -> `usePartitionValueExtractorOnRead`



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestCustomParitionValueExtractor.scala:
##########
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.common
+
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.hadoop.fs.HadoopFSUtils
+import org.apache.hudi.metadata.HoodieBackedTableMetadata
+import org.apache.hudi.storage.StoragePath
+import org.apache.hudi.sync.common.HoodieSyncConfig
+
+import org.junit.jupiter.api.Assertions.assertTrue
+
+class TestCustomParitionValueExtractor extends HoodieSparkSqlTestBase {
+  test("Test custom partition value extractor interface") {
+    withTempDir { tmp =>
+      val targetTable = generateTableName
+      val tablePath = s"${tmp.getCanonicalPath}/$targetTable"
+
+      spark.sql(
+        s"""
+           |create table $targetTable (
+           |  `id` string,
+           |  `name` string,
+           |  `ts` bigint,
+           |  `datestr` string,
+           |  `country` string,
+           |  `state` string,
+           |  `city` string
+           |) using hudi
+           | tblproperties (
+           |  'primaryKey' = 'id',
+           |  'type' = 'COW',
+           |  'preCombineField'='ts',
+           |  'hoodie.datasource.write.hive_style_partitioning'='false',
+           |  
'hoodie.datasource.write.keygenerator.class'='org.apache.spark.sql.hudi.common.TestCustomSlashKeyGenerator',
+           |  
'hoodie.table.keygenerator.class'='org.apache.spark.sql.hudi.common.TestCustomSlashKeyGenerator',
+           |  
'${HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key()}'='org.apache.spark.sql.hudi.common.TestCustomSlashPartitionValueExtractor'
+           | )
+           | partitioned by (`datestr`, `country`, `state`, `city`)
+           | location '$tablePath'
+        """.stripMargin)
+      // yyyy/mm/dd/country/state/city
+      spark.sql(
+        s"""
+           | insert into $targetTable values
+           | (1, 'a1', 1000, '2024-01-01', "USA", "CA", "SFO"),
+           | (2, 'a2', 2000, '2024-01-01', "USA", "TX", "AU"),
+           | (3, 'a3', 3000, '2024-01-02', "USA", "CA", "LA"),
+           | (4, 'a4', 4000, '2024-01-02', "USA", "WA", "SEA"),
+           | (5, 'a5', 5000, '2024-01-03', "USA", "CA", "SFO")
+        """.stripMargin)
+
+      // check result after insert and merge data into target table
+      checkAnswer(s"select id, name, ts, datestr, country, state, city from 
$targetTable"
+        + s" where state = 'CA'")(
+        Seq(1, "a1", 1000, "2024-01-01", "USA", "CA", "SFO"),
+        Seq(3, "a3", 3000, "2024-01-02", "USA", "CA", "LA"),
+        Seq(5, "a5", 5000, "2024-01-03", "USA", "CA", "SFO")
+      )
+
+      // Verify table config has custom partition value extractor class set
+      val metaClient = HoodieTableMetaClient.builder()
+        
.setConf(HadoopFSUtils.getStorageConfWithCopy(spark.sparkContext.hadoopConfiguration))
+        .setBasePath(tablePath)
+        .build()
+      val tableConfig = metaClient.getTableConfig
+      val partitionExtractorClass = tableConfig.getProps.getProperty(
+        HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key())
+      assertTrue(partitionExtractorClass == 
"org.apache.spark.sql.hudi.common.TestCustomSlashPartitionValueExtractor",
+        s"Table config should have custom partition value extractor class set 
to TestCustomSlashPartitionValueExtractor, but got $partitionExtractorClass")
+
+      // Verify that partition paths are created with slash separated format 
(yyyy/MM/dd/country/state/city)
+      assertTrue(metaClient.getStorage.exists(new StoragePath(tablePath, 
"2024/01/01/USA/CA/SFO")),
+        s"Partition path 2024/01/01/USA/CA/SFO should exist")
+      assertTrue(metaClient.getStorage.exists(new StoragePath(tablePath, 
"2024/01/01/USA/TX/AU")),
+        s"Partition path 2024/01/01/USA/TX/AU should exist")
+      assertTrue(metaClient.getStorage.exists(new StoragePath(tablePath, 
"2024/01/02/USA/CA/LA")),
+        s"Partition path 2024/01/02/USA/CA/LA should exist")
+      assertTrue(metaClient.getStorage.exists(new StoragePath(tablePath, 
"2024/01/02/USA/WA/SEA")),
+        s"Partition path 2024/01/02/USA/WA/SEA should exist")
+      assertTrue(metaClient.getStorage.exists(new StoragePath(tablePath, 
"2024/01/03/USA/CA/SFO")),
+        s"Partition path 2024/01/03/USA/CA/SFO should exist")
+
+      val engine = new HoodieSparkEngineContext(spark.sparkContext)
+      val storage = metaClient.getStorage()
+      val metadataConfig = HoodieMetadataConfig.newBuilder().build()
+      val metadataTable = new HoodieBackedTableMetadata(engine, storage, 
metadataConfig, tablePath)
+      val partitionPaths = metadataTable.getAllPartitionPaths
+      assertTrue(partitionPaths.contains("2024/01/01/USA/CA/SFO"))
+      assertTrue(partitionPaths.contains("2024/01/01/USA/TX/AU"))
+      assertTrue(partitionPaths.contains("2024/01/02/USA/CA/LA"))
+      assertTrue(partitionPaths.contains("2024/01/02/USA/WA/SEA"))
+      assertTrue(partitionPaths.contains("2024/01/03/USA/CA/SFO"))
+      metadataTable.close()
+    }
+  }
+
+  test("Test custom partition value extractor with partition pruning and 
filtering") {
+    withTempDir { tmp =>
+      val targetTable = generateTableName
+      val tablePath = s"${tmp.getCanonicalPath}/$targetTable"
+
+      spark.sql(
+        s"""
+           |create table $targetTable (
+           |  `id` string,
+           |  `name` string,
+           |  `ts` bigint,
+           |  `datestr` string,
+           |  `country` string,
+           |  `state` string,
+           |  `city` string
+           |) using hudi
+           | tblproperties (
+           |  'primaryKey' = 'id',
+           |  'type' = 'COW',
+           |  'preCombineField'='ts',
+           |  'hoodie.datasource.write.hive_style_partitioning'='false',
+           |  
'hoodie.datasource.write.keygenerator.class'='org.apache.spark.sql.hudi.common.TestCustomSlashKeyGenerator',
+           |  
'${HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key()}'='org.apache.spark.sql.hudi.common.TestCustomSlashPartitionValueExtractor'
+           | )
+           | partitioned by (`datestr`, `country`, `state`, `city`)
+           | location '$tablePath'
+        """.stripMargin)
+
+      // Insert data across multiple partitions
+      spark.sql(
+        s"""
+           | insert into $targetTable values
+           | (1, 'a1', 1000, '2024-01-01', "USA", "CA", "SFO"),
+           | (2, 'a2', 2000, '2024-01-01', "USA", "CA", "LA"),
+           | (3, 'a3', 3000, '2024-01-01', "USA", "TX", "AU"),
+           | (4, 'a4', 4000, '2024-01-02', "USA", "CA", "SFO"),
+           | (5, 'a5', 5000, '2024-01-02', "USA", "WA", "SEA"),
+           | (6, 'a6', 6000, '2024-01-03', "USA", "CA", "LA"),
+           | (7, 'a7', 7000, '2024-01-03', "CAN", "ON", "TOR"),
+           | (8, 'a8', 8000, '2024-01-04', "USA", "NY", "NYC")
+        """.stripMargin)
+
+      // Test partition pruning with single partition column filter (state)
+      checkAnswer(s"select id, name, ts, datestr, country, state, city from 
$targetTable where state = 'CA' order by id")(
+        Seq(1, "a1", 1000, "2024-01-01", "USA", "CA", "SFO"),
+        Seq(2, "a2", 2000, "2024-01-01", "USA", "CA", "LA"),
+        Seq(4, "a4", 4000, "2024-01-02", "USA", "CA", "SFO"),
+        Seq(6, "a6", 6000, "2024-01-03", "USA", "CA", "LA")
+      )
+
+      // Test partition pruning with multiple partition column filters
+      checkAnswer(s"select id, name, ts, datestr, country, state, city from 
$targetTable where state = 'CA' and city = 'SFO' order by id")(
+        Seq(1, "a1", 1000, "2024-01-01", "USA", "CA", "SFO"),
+        Seq(4, "a4", 4000, "2024-01-02", "USA", "CA", "SFO")
+      )
+
+      // Test partition pruning with date filter
+      checkAnswer(s"select id, name, ts, datestr, country, state, city from 
$targetTable where datestr = '2024-01-01' order by id")(
+        Seq(1, "a1", 1000, "2024-01-01", "USA", "CA", "SFO"),
+        Seq(2, "a2", 2000, "2024-01-01", "USA", "CA", "LA"),
+        Seq(3, "a3", 3000, "2024-01-01", "USA", "TX", "AU")
+      )
+
+      // Test partition pruning with country filter
+      checkAnswer(s"select id, name, ts, datestr, country, state, city from 
$targetTable where country = 'CAN' order by id")(
+        Seq(7, "a7", 7000, "2024-01-03", "CAN", "ON", "TOR")
+      )
+
+      // Test partition pruning with combined date and state filter
+      checkAnswer(s"select id, name, ts, datestr, country, state, city from 
$targetTable where datestr = '2024-01-02' and state = 'CA' order by id")(
+        Seq(4, "a4", 4000, "2024-01-02", "USA", "CA", "SFO")
+      )
+
+      // Test partition pruning with IN clause
+      checkAnswer(s"select id, name, ts, datestr, country, state, city from 
$targetTable where state IN ('CA', 'NY') order by id")(
+        Seq(1, "a1", 1000, "2024-01-01", "USA", "CA", "SFO"),
+        Seq(2, "a2", 2000, "2024-01-01", "USA", "CA", "LA"),
+        Seq(4, "a4", 4000, "2024-01-02", "USA", "CA", "SFO"),
+        Seq(6, "a6", 6000, "2024-01-03", "USA", "CA", "LA"),
+        Seq(8, "a8", 8000, "2024-01-04", "USA", "NY", "NYC")
+      )
+
+      // Test reading with _hoodie_partition_path to verify custom partition 
format
+      checkAnswer(s"select id, _hoodie_partition_path from $targetTable where 
state = 'CA' order by id")(
+        Seq("1", "2024/01/01/USA/CA/SFO"),
+        Seq("2", "2024/01/01/USA/CA/LA"),
+        Seq("4", "2024/01/02/USA/CA/SFO"),
+        Seq("6", "2024/01/03/USA/CA/LA")
+      )
+
+      // Create DataFrame and analyze query plan to verify partition pruning
+      val dfWithStateFilter = spark.sql(s"select * from $targetTable where 
state = 'CA'")
+      val planWithStateFilter = 
dfWithStateFilter.queryExecution.executedPlan.toString()
+      // Verify partition filters are pushed down
+      assertTrue(planWithStateFilter.contains("PartitionFilters") || 
planWithStateFilter.contains("PushedFilters"),
+        s"Query plan should contain partition filters for state column")
+
+      // Test DataFrame API with multiple partition filters
+      val dfWithMultipleFilters = spark.table(targetTable)
+        .filter("state = 'CA' and datestr = '2024-01-01'")
+      val planWithMultipleFilters = 
dfWithMultipleFilters.queryExecution.executedPlan.toString()
+      assertTrue(planWithMultipleFilters.contains("PartitionFilters") || 
planWithMultipleFilters.contains("PushedFilters"),
+        s"Query plan should contain partition filters for multiple columns")
+
+      // Verify the filtered results
+      val multiFilterResults = dfWithMultipleFilters.select("id", "name", 
"state", "datestr").orderBy("id").collect()
+      assertTrue(multiFilterResults.length == 2, s"Expected 2 rows but got 
${multiFilterResults.length}")
+      assertTrue(multiFilterResults(0).getString(0) == "1", s"First row id 
should be 1")
+      assertTrue(multiFilterResults(0).getString(1) == "a1", s"First row name 
should be a1")
+      assertTrue(multiFilterResults(0).getString(2) == "CA", s"First row state 
should be CA")
+      assertTrue(multiFilterResults(0).getString(3) == "2024-01-01", s"First 
row datestr should be 2024-01-01")
+      assertTrue(multiFilterResults(1).getString(0) == "2", s"Second row id 
should be 2")
+      assertTrue(multiFilterResults(1).getString(1) == "a2", s"Second row name 
should be a2")
+
+      // Test DataFrame with country filter
+      val dfWithCountryFilter = spark.table(targetTable).filter("country = 
'CAN'")
+      val planWithCountryFilter = 
dfWithCountryFilter.queryExecution.executedPlan.toString()
+      assertTrue(planWithCountryFilter.contains("PartitionFilters") || 
planWithCountryFilter.contains("PushedFilters"),
+        s"Query plan should contain partition filters for country column")
+
+      val countryFilterResults = dfWithCountryFilter.select("id", 
"country").orderBy("id").collect()
+      assertTrue(countryFilterResults.length == 1, s"Expected 1 row but got 
${countryFilterResults.length}")
+      assertTrue(countryFilterResults(0).getString(0) == "7", s"Row id should 
be 7")
+      assertTrue(countryFilterResults(0).getString(1) == "CAN", s"Row country 
should be CAN")
+
+      // Verify all partitions exist as expected
+      val metaClient = HoodieTableMetaClient.builder()
+        
.setConf(HadoopFSUtils.getStorageConfWithCopy(spark.sparkContext.hadoopConfiguration))
+        .setBasePath(tablePath)
+        .build()
+
+      val engine = new HoodieSparkEngineContext(spark.sparkContext)
+      val storage = metaClient.getStorage()
+      val metadataConfig = HoodieMetadataConfig.newBuilder().build()
+      val metadataTable = new HoodieBackedTableMetadata(engine, storage, 
metadataConfig, tablePath)
+      val partitionPaths = metadataTable.getAllPartitionPaths
+
+      // Verify expected partition paths
+      assertTrue(partitionPaths.contains("2024/01/01/USA/CA/SFO"))
+      assertTrue(partitionPaths.contains("2024/01/01/USA/CA/LA"))
+      assertTrue(partitionPaths.contains("2024/01/01/USA/TX/AU"))
+      assertTrue(partitionPaths.contains("2024/01/02/USA/CA/SFO"))
+      assertTrue(partitionPaths.contains("2024/01/02/USA/WA/SEA"))
+      assertTrue(partitionPaths.contains("2024/01/03/USA/CA/LA"))
+      assertTrue(partitionPaths.contains("2024/01/03/CAN/ON/TOR"))
+      assertTrue(partitionPaths.contains("2024/01/04/USA/NY/NYC"))
+
+      metadataTable.close()
+    }
+  }

Review Comment:
   I understand hive style partitioning and the custom partition value 
extractor are mutually exclusive. 
   
   but can we add a test w/ custom partition value extractor and url encoding 
enabled. 
   



##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -335,6 +335,37 @@ public static final String getDefaultPayloadClassName() {
       .sinceVersion("1.0.0")
       .withDocumentation("Key Generator type to determine key generator 
class");
 
+  public static final ConfigProperty<String> PARTITION_VALUE_EXTRACTOR_CLASS = 
ConfigProperty
+      .key("hoodie.datasource.hive_sync.partition_extractor_class")
+      .defaultValue("org.apache.hudi.hive.MultiPartKeysValueExtractor")
+      .withInferFunction(cfg -> {

Review Comment:
   we should not be setting any default here right?
   ok to have the infer function. 



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestCustomParitionValueExtractor.scala:
##########
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.common
+
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.hadoop.fs.HadoopFSUtils
+import org.apache.hudi.metadata.HoodieBackedTableMetadata
+import org.apache.hudi.storage.StoragePath
+import org.apache.hudi.sync.common.HoodieSyncConfig
+
+import org.junit.jupiter.api.Assertions.assertTrue
+
+class TestCustomParitionValueExtractor extends HoodieSparkSqlTestBase {
+  test("Test custom partition value extractor interface") {
+    withTempDir { tmp =>
+      val targetTable = generateTableName
+      val tablePath = s"${tmp.getCanonicalPath}/$targetTable"
+
+      spark.sql(
+        s"""
+           |create table $targetTable (
+           |  `id` string,
+           |  `name` string,
+           |  `ts` bigint,
+           |  `datestr` string,
+           |  `country` string,
+           |  `state` string,
+           |  `city` string
+           |) using hudi
+           | tblproperties (
+           |  'primaryKey' = 'id',
+           |  'type' = 'COW',
+           |  'preCombineField'='ts',
+           |  'hoodie.datasource.write.hive_style_partitioning'='false',
+           |  
'hoodie.datasource.write.keygenerator.class'='org.apache.spark.sql.hudi.common.TestCustomSlashKeyGenerator',
+           |  
'hoodie.table.keygenerator.class'='org.apache.spark.sql.hudi.common.TestCustomSlashKeyGenerator',
+           |  
'${HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key()}'='org.apache.spark.sql.hudi.common.TestCustomSlashPartitionValueExtractor'
+           | )
+           | partitioned by (`datestr`, `country`, `state`, `city`)
+           | location '$tablePath'
+        """.stripMargin)
+      // yyyy/mm/dd/country/state/city
+      spark.sql(
+        s"""
+           | insert into $targetTable values
+           | (1, 'a1', 1000, '2024-01-01', "USA", "CA", "SFO"),
+           | (2, 'a2', 2000, '2024-01-01', "USA", "TX", "AU"),
+           | (3, 'a3', 3000, '2024-01-02', "USA", "CA", "LA"),
+           | (4, 'a4', 4000, '2024-01-02', "USA", "WA", "SEA"),
+           | (5, 'a5', 5000, '2024-01-03', "USA", "CA", "SFO")
+        """.stripMargin)
+
+      // check result after insert and merge data into target table
+      checkAnswer(s"select id, name, ts, datestr, country, state, city from 
$targetTable"
+        + s" where state = 'CA'")(
+        Seq(1, "a1", 1000, "2024-01-01", "USA", "CA", "SFO"),
+        Seq(3, "a3", 3000, "2024-01-02", "USA", "CA", "LA"),
+        Seq(5, "a5", 5000, "2024-01-03", "USA", "CA", "SFO")
+      )
+
+      // Verify table config has custom partition value extractor class set
+      val metaClient = HoodieTableMetaClient.builder()
+        
.setConf(HadoopFSUtils.getStorageConfWithCopy(spark.sparkContext.hadoopConfiguration))
+        .setBasePath(tablePath)
+        .build()
+      val tableConfig = metaClient.getTableConfig
+      val partitionExtractorClass = tableConfig.getProps.getProperty(
+        HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key())
+      assertTrue(partitionExtractorClass == 
"org.apache.spark.sql.hudi.common.TestCustomSlashPartitionValueExtractor",
+        s"Table config should have custom partition value extractor class set 
to TestCustomSlashPartitionValueExtractor, but got $partitionExtractorClass")
+
+      // Verify that partition paths are created with slash separated format 
(yyyy/MM/dd/country/state/city)
+      assertTrue(metaClient.getStorage.exists(new StoragePath(tablePath, 
"2024/01/01/USA/CA/SFO")),
+        s"Partition path 2024/01/01/USA/CA/SFO should exist")
+      assertTrue(metaClient.getStorage.exists(new StoragePath(tablePath, 
"2024/01/01/USA/TX/AU")),
+        s"Partition path 2024/01/01/USA/TX/AU should exist")
+      assertTrue(metaClient.getStorage.exists(new StoragePath(tablePath, 
"2024/01/02/USA/CA/LA")),
+        s"Partition path 2024/01/02/USA/CA/LA should exist")
+      assertTrue(metaClient.getStorage.exists(new StoragePath(tablePath, 
"2024/01/02/USA/WA/SEA")),
+        s"Partition path 2024/01/02/USA/WA/SEA should exist")
+      assertTrue(metaClient.getStorage.exists(new StoragePath(tablePath, 
"2024/01/03/USA/CA/SFO")),
+        s"Partition path 2024/01/03/USA/CA/SFO should exist")
+
+      val engine = new HoodieSparkEngineContext(spark.sparkContext)
+      val storage = metaClient.getStorage()
+      val metadataConfig = HoodieMetadataConfig.newBuilder().build()
+      val metadataTable = new HoodieBackedTableMetadata(engine, storage, 
metadataConfig, tablePath)
+      val partitionPaths = metadataTable.getAllPartitionPaths
+      assertTrue(partitionPaths.contains("2024/01/01/USA/CA/SFO"))
+      assertTrue(partitionPaths.contains("2024/01/01/USA/TX/AU"))
+      assertTrue(partitionPaths.contains("2024/01/02/USA/CA/LA"))
+      assertTrue(partitionPaths.contains("2024/01/02/USA/WA/SEA"))
+      assertTrue(partitionPaths.contains("2024/01/03/USA/CA/SFO"))
+      metadataTable.close()
+    }
+  }
+
+  test("Test custom partition value extractor with partition pruning and 
filtering") {
+    withTempDir { tmp =>
+      val targetTable = generateTableName
+      val tablePath = s"${tmp.getCanonicalPath}/$targetTable"
+
+      spark.sql(
+        s"""
+           |create table $targetTable (
+           |  `id` string,
+           |  `name` string,
+           |  `ts` bigint,
+           |  `datestr` string,
+           |  `country` string,
+           |  `state` string,
+           |  `city` string
+           |) using hudi
+           | tblproperties (
+           |  'primaryKey' = 'id',
+           |  'type' = 'COW',
+           |  'preCombineField'='ts',
+           |  'hoodie.datasource.write.hive_style_partitioning'='false',
+           |  
'hoodie.datasource.write.keygenerator.class'='org.apache.spark.sql.hudi.common.TestCustomSlashKeyGenerator',
+           |  
'${HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key()}'='org.apache.spark.sql.hudi.common.TestCustomSlashPartitionValueExtractor'
+           | )
+           | partitioned by (`datestr`, `country`, `state`, `city`)
+           | location '$tablePath'
+        """.stripMargin)
+
+      // Insert data across multiple partitions
+      spark.sql(
+        s"""
+           | insert into $targetTable values
+           | (1, 'a1', 1000, '2024-01-01', "USA", "CA", "SFO"),
+           | (2, 'a2', 2000, '2024-01-01', "USA", "CA", "LA"),
+           | (3, 'a3', 3000, '2024-01-01', "USA", "TX", "AU"),
+           | (4, 'a4', 4000, '2024-01-02', "USA", "CA", "SFO"),
+           | (5, 'a5', 5000, '2024-01-02', "USA", "WA", "SEA"),
+           | (6, 'a6', 6000, '2024-01-03', "USA", "CA", "LA"),
+           | (7, 'a7', 7000, '2024-01-03', "CAN", "ON", "TOR"),
+           | (8, 'a8', 8000, '2024-01-04', "USA", "NY", "NYC")
+        """.stripMargin)
+
+      // Test partition pruning with single partition column filter (state)
+      checkAnswer(s"select id, name, ts, datestr, country, state, city from 
$targetTable where state = 'CA' order by id")(
+        Seq(1, "a1", 1000, "2024-01-01", "USA", "CA", "SFO"),
+        Seq(2, "a2", 2000, "2024-01-01", "USA", "CA", "LA"),
+        Seq(4, "a4", 4000, "2024-01-02", "USA", "CA", "SFO"),
+        Seq(6, "a6", 6000, "2024-01-03", "USA", "CA", "LA")
+      )
+
+      // Test partition pruning with multiple partition column filters
+      checkAnswer(s"select id, name, ts, datestr, country, state, city from 
$targetTable where state = 'CA' and city = 'SFO' order by id")(
+        Seq(1, "a1", 1000, "2024-01-01", "USA", "CA", "SFO"),
+        Seq(4, "a4", 4000, "2024-01-02", "USA", "CA", "SFO")
+      )
+
+      // Test partition pruning with date filter
+      checkAnswer(s"select id, name, ts, datestr, country, state, city from 
$targetTable where datestr = '2024-01-01' order by id")(
+        Seq(1, "a1", 1000, "2024-01-01", "USA", "CA", "SFO"),
+        Seq(2, "a2", 2000, "2024-01-01", "USA", "CA", "LA"),
+        Seq(3, "a3", 3000, "2024-01-01", "USA", "TX", "AU")
+      )
+
+      // Test partition pruning with country filter
+      checkAnswer(s"select id, name, ts, datestr, country, state, city from 
$targetTable where country = 'CAN' order by id")(
+        Seq(7, "a7", 7000, "2024-01-03", "CAN", "ON", "TOR")
+      )
+
+      // Test partition pruning with combined date and state filter

Review Comment:
   proper way to assert here is. we should corrupt one of the parquet files in 
one another partition which does not match the predicate. and then, the query 
will only succeed if the partition pruning really worked. if not, query will 
hit FileNotFoundException. 
   But we can't afford to do it for every query. since one of the data file 
will be corrupted. So, may be we can do it for 1 or 2 of the partition pruning 
queries you have here. 



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala:
##########
@@ -267,6 +267,15 @@ object DataSourceReadOptions {
     .sinceVersion("1.1.0")
     .withDocumentation("Fully qualified class name of the catalog that is used 
by the Polaris spark client.")
 
+  val USE_PARTITION_VALUE_EXTRACTOR_ON_READ: ConfigProperty[String] = 
ConfigProperty
+    
.key("hoodie.datasource.read.partition.value.using.partion-value-extractor-class")
+    .defaultValue("true")

Review Comment:
   since we have an infer function, this might get exercised out of the box. 
lets keep OOB behavior untouched. 
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to