nsivabalan commented on code in PR #17850:
URL: https://github.com/apache/hudi/pull/17850#discussion_r2770846900
##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -335,6 +335,37 @@ public static final String getDefaultPayloadClassName() {
.sinceVersion("1.0.0")
.withDocumentation("Key Generator type to determine key generator
class");
+ public static final ConfigProperty<String> PARTITION_VALUE_EXTRACTOR_CLASS =
ConfigProperty
+ .key("hoodie.datasource.hive_sync.partition_extractor_class")
Review Comment:
None of the table properties have `hoodie.datasource.` as a prefix.
We should define two configs:
`hoodie.datasource.hive_sync.partition_extractor_class` for the writer property, and
`hoodie.table.hive_sync.partition_extractor_class` for the table config.
Users should not be able to set the table property directly; they should always set only the writer property, i.e.
`hoodie.datasource.hive_sync.partition_extractor_class`.
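A rough sketch of the split being suggested, written in the same builder style the diff uses (shown Scala-style as in DataSourceOptions.scala; names, defaults and placement are illustrative only, not the final API):
```scala
import org.apache.hudi.common.config.ConfigProperty

// Writer-facing option: what users actually set (existing key, unchanged).
val HIVE_SYNC_PARTITION_EXTRACTOR_CLASS: ConfigProperty[String] = ConfigProperty
  .key("hoodie.datasource.hive_sync.partition_extractor_class")
  .noDefaultValue()
  .withDocumentation("Partition value extractor class, set by the writer.")

// Table config: persisted in hoodie.properties, derived from the writer option;
// not meant to be set directly by users.
val TABLE_PARTITION_EXTRACTOR_CLASS: ConfigProperty[String] = ConfigProperty
  .key("hoodie.table.hive_sync.partition_extractor_class")
  .noDefaultValue()
  .withDocumentation("Partition value extractor class recorded in the table config.")
```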
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala:
##########
@@ -238,10 +242,25 @@ object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport wi
// we couldn't reconstruct initial partition column values from partition paths due to lost data after formatting.
// But the output for these cases is in a string format, so we can pass partitionPath as UTF8String
Array.fill(partitionColumns.length)(UTF8String.fromString(partitionPath))
+ } else if(usePartitionValueExtractorOnRead && !StringUtils.isNullOrEmpty(partitionValueExtractorClass)) {
+ try {
+ val partitionValueExtractor = Class.forName(partitionValueExtractorClass)
Review Comment:
Can we move this to a private method to keep this method lean?
Maybe something like `parsePartitionValuesBasedOnPartitionValueExtractor`.
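Something along these lines, for illustration only (assumes the extractor class name and partition path already in scope in this method, plus the existing `UTF8String` import and a `PartitionValueExtractor` import; helper name and return type are just a sketch):
```scala
import scala.jdk.CollectionConverters._

private def parsePartitionValuesBasedOnPartitionValueExtractor(extractorClass: String,
                                                               partitionPath: String): Array[UTF8String] = {
  // Instantiate the user-provided extractor reflectively and let it split the path
  // into one value per partition column.
  val extractor = Class.forName(extractorClass)
    .getDeclaredConstructor()
    .newInstance()
    .asInstanceOf[PartitionValueExtractor]
  extractor.extractPartitionValuesInPath(partitionPath)
    .asScala
    .map(UTF8String.fromString)
    .toArray
}
```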
##########
hudi-common/src/main/java/org/apache/hudi/hive/sync/PartitionValueExtractor.java:
##########
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.hudi.sync.common.model;
+package org.apache.hudi.hive.sync;
Review Comment:
Let's not change this.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -335,6 +335,37 @@ public static final String getDefaultPayloadClassName() {
.sinceVersion("1.0.0")
.withDocumentation("Key Generator type to determine key generator
class");
+ public static final ConfigProperty<String> PARTITION_VALUE_EXTRACTOR_CLASS =
ConfigProperty
+ .key("hoodie.datasource.hive_sync.partition_extractor_class")
+ .defaultValue("org.apache.hudi.hive.MultiPartKeysValueExtractor")
+ .withInferFunction(cfg -> {
+ Option<String> partitionFieldsOpt =
HoodieTableConfig.getPartitionFieldProp(cfg)
+ .or(() ->
Option.ofNullable(cfg.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME)));
+
+ if (!partitionFieldsOpt.isPresent()) {
+ return Option.empty();
Review Comment:
Is this not NonPartitionedExtractor?
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestCustomSlashKeyGenerator.scala:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.common
+
+import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.keygen.{BuiltinKeyGenerator, ComplexAvroKeyGenerator, KeyGenUtils}
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions
+
+import org.apache.avro.generic.GenericRecord
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+import java.util.Arrays
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * Key generator that converts partition values to slash-separated partition paths.
+ *
+ * This generator is useful when you have partition columns like:
+ * - datestr: "yyyy-mm-dd" format
+ * - country, state, city: regular string values
+ *
+ * And you want to create partition paths like: yyyy/mm/dd/country/state/city
+ *
+ * The first partition field (typically a date) will have its hyphens replaced with slashes.
+ * All partition fields are then combined with "/" as the separator.
+ */
+class TestCustomSlashKeyGenerator(props: TypedProperties) extends BuiltinKeyGenerator(props) {
Review Comment:
We should name this class differently; typically only actual test classes are named with a `Test` prefix.
How about `MockSlashKeyGenerator` or `UserDefinedSlashKeyGenerator`?
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala:
##########
@@ -267,6 +267,15 @@ object DataSourceReadOptions {
.sinceVersion("1.1.0")
.withDocumentation("Fully qualified class name of the catalog that is used
by the Polaris spark client.")
+ val USE_PARTITION_VALUE_EXTRACTOR_ON_READ: ConfigProperty[String] =
ConfigProperty
+
.key("hoodie.datasource.read.partition.value.using.partion-value-extractor-class")
+ .defaultValue("true")
Review Comment:
Can we disable this by default?
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestCustomSlashPartitionValueExtractor.scala:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hudi.common
+
+import org.apache.hudi.hive.sync.PartitionValueExtractor
+
+import java.util
+
+class TestCustomSlashPartitionValueExtractor extends PartitionValueExtractor {
Review Comment:
same here
##########
hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java:
##########
@@ -116,43 +114,7 @@ public class HoodieSyncConfig extends HoodieConfig {
.markAdvanced()
.withDocumentation("Field in the table to use for determining hive
partition columns.");
- public static final ConfigProperty<String>
META_SYNC_PARTITION_EXTRACTOR_CLASS = ConfigProperty
Review Comment:
Let's leave this as is, without any changes.
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala:
##########
@@ -527,9 +527,17 @@ object HoodieFileIndex extends Logging {
properties.setProperty(DataSourceReadOptions.FILE_INDEX_LISTING_MODE_OVERRIDE.key, listingModeOverride)
}
+ val usePartitionValueExtractorForReaders = getConfigValue(options, sqlConf,
Review Comment:
Again, let's try to align method names and variable names across the board so that it's easier to maintain; inconsistent naming could lead to confusion.
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala:
##########
@@ -527,9 +527,17 @@ object HoodieFileIndex extends Logging {
properties.setProperty(DataSourceReadOptions.FILE_INDEX_LISTING_MODE_OVERRIDE.key, listingModeOverride)
}
+ val usePartitionValueExtractorForReaders = getConfigValue(options, sqlConf,
Review Comment:
`usePartitionValueExtractorForReaders` -> `usePartitionValueExtractorOnRead`
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestCustomParitionValueExtractor.scala:
##########
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.common
+
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.hadoop.fs.HadoopFSUtils
+import org.apache.hudi.metadata.HoodieBackedTableMetadata
+import org.apache.hudi.storage.StoragePath
+import org.apache.hudi.sync.common.HoodieSyncConfig
+
+import org.junit.jupiter.api.Assertions.assertTrue
+
+class TestCustomParitionValueExtractor extends HoodieSparkSqlTestBase {
+ test("Test custom partition value extractor interface") {
+ withTempDir { tmp =>
+ val targetTable = generateTableName
+ val tablePath = s"${tmp.getCanonicalPath}/$targetTable"
+
+ spark.sql(
+ s"""
+ |create table $targetTable (
+ | `id` string,
+ | `name` string,
+ | `ts` bigint,
+ | `datestr` string,
+ | `country` string,
+ | `state` string,
+ | `city` string
+ |) using hudi
+ | tblproperties (
+ | 'primaryKey' = 'id',
+ | 'type' = 'COW',
+ | 'preCombineField'='ts',
+ | 'hoodie.datasource.write.hive_style_partitioning'='false',
+ | 'hoodie.datasource.write.keygenerator.class'='org.apache.spark.sql.hudi.common.TestCustomSlashKeyGenerator',
+ | 'hoodie.table.keygenerator.class'='org.apache.spark.sql.hudi.common.TestCustomSlashKeyGenerator',
+ | '${HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key()}'='org.apache.spark.sql.hudi.common.TestCustomSlashPartitionValueExtractor'
+ | )
+ | partitioned by (`datestr`, `country`, `state`, `city`)
+ | location '$tablePath'
+ """.stripMargin)
+ // yyyy/mm/dd/country/state/city
+ spark.sql(
+ s"""
+ | insert into $targetTable values
+ | (1, 'a1', 1000, '2024-01-01', "USA", "CA", "SFO"),
+ | (2, 'a2', 2000, '2024-01-01', "USA", "TX", "AU"),
+ | (3, 'a3', 3000, '2024-01-02', "USA", "CA", "LA"),
+ | (4, 'a4', 4000, '2024-01-02', "USA", "WA", "SEA"),
+ | (5, 'a5', 5000, '2024-01-03', "USA", "CA", "SFO")
+ """.stripMargin)
+
+ // check result after insert and merge data into target table
+ checkAnswer(s"select id, name, ts, datestr, country, state, city from
$targetTable"
+ + s" where state = 'CA'")(
+ Seq(1, "a1", 1000, "2024-01-01", "USA", "CA", "SFO"),
+ Seq(3, "a3", 3000, "2024-01-02", "USA", "CA", "LA"),
+ Seq(5, "a5", 5000, "2024-01-03", "USA", "CA", "SFO")
+ )
+
+ // Verify table config has custom partition value extractor class set
+ val metaClient = HoodieTableMetaClient.builder()
+ .setConf(HadoopFSUtils.getStorageConfWithCopy(spark.sparkContext.hadoopConfiguration))
+ .setBasePath(tablePath)
+ .build()
+ val tableConfig = metaClient.getTableConfig
+ val partitionExtractorClass = tableConfig.getProps.getProperty(
+ HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key())
+ assertTrue(partitionExtractorClass == "org.apache.spark.sql.hudi.common.TestCustomSlashPartitionValueExtractor",
+ s"Table config should have custom partition value extractor class set to TestCustomSlashPartitionValueExtractor, but got $partitionExtractorClass")
+
+ // Verify that partition paths are created with slash separated format (yyyy/MM/dd/country/state/city)
+ assertTrue(metaClient.getStorage.exists(new StoragePath(tablePath, "2024/01/01/USA/CA/SFO")),
+ s"Partition path 2024/01/01/USA/CA/SFO should exist")
+ assertTrue(metaClient.getStorage.exists(new StoragePath(tablePath, "2024/01/01/USA/TX/AU")),
+ s"Partition path 2024/01/01/USA/TX/AU should exist")
+ assertTrue(metaClient.getStorage.exists(new StoragePath(tablePath, "2024/01/02/USA/CA/LA")),
+ s"Partition path 2024/01/02/USA/CA/LA should exist")
+ assertTrue(metaClient.getStorage.exists(new StoragePath(tablePath, "2024/01/02/USA/WA/SEA")),
+ s"Partition path 2024/01/02/USA/WA/SEA should exist")
+ assertTrue(metaClient.getStorage.exists(new StoragePath(tablePath, "2024/01/03/USA/CA/SFO")),
+ s"Partition path 2024/01/03/USA/CA/SFO should exist")
+
+ val engine = new HoodieSparkEngineContext(spark.sparkContext)
+ val storage = metaClient.getStorage()
+ val metadataConfig = HoodieMetadataConfig.newBuilder().build()
+ val metadataTable = new HoodieBackedTableMetadata(engine, storage, metadataConfig, tablePath)
+ val partitionPaths = metadataTable.getAllPartitionPaths
+ assertTrue(partitionPaths.contains("2024/01/01/USA/CA/SFO"))
+ assertTrue(partitionPaths.contains("2024/01/01/USA/TX/AU"))
+ assertTrue(partitionPaths.contains("2024/01/02/USA/CA/LA"))
+ assertTrue(partitionPaths.contains("2024/01/02/USA/WA/SEA"))
+ assertTrue(partitionPaths.contains("2024/01/03/USA/CA/SFO"))
+ metadataTable.close()
+ }
+ }
+
+ test("Test custom partition value extractor with partition pruning and
filtering") {
+ withTempDir { tmp =>
+ val targetTable = generateTableName
+ val tablePath = s"${tmp.getCanonicalPath}/$targetTable"
+
+ spark.sql(
+ s"""
+ |create table $targetTable (
+ | `id` string,
+ | `name` string,
+ | `ts` bigint,
+ | `datestr` string,
+ | `country` string,
+ | `state` string,
+ | `city` string
+ |) using hudi
+ | tblproperties (
+ | 'primaryKey' = 'id',
+ | 'type' = 'COW',
+ | 'preCombineField'='ts',
+ | 'hoodie.datasource.write.hive_style_partitioning'='false',
+ | 'hoodie.datasource.write.keygenerator.class'='org.apache.spark.sql.hudi.common.TestCustomSlashKeyGenerator',
+ | '${HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key()}'='org.apache.spark.sql.hudi.common.TestCustomSlashPartitionValueExtractor'
+ | )
+ | partitioned by (`datestr`, `country`, `state`, `city`)
+ | location '$tablePath'
+ """.stripMargin)
+
+ // Insert data across multiple partitions
+ spark.sql(
+ s"""
+ | insert into $targetTable values
+ | (1, 'a1', 1000, '2024-01-01', "USA", "CA", "SFO"),
+ | (2, 'a2', 2000, '2024-01-01', "USA", "CA", "LA"),
+ | (3, 'a3', 3000, '2024-01-01', "USA", "TX", "AU"),
+ | (4, 'a4', 4000, '2024-01-02', "USA", "CA", "SFO"),
+ | (5, 'a5', 5000, '2024-01-02', "USA", "WA", "SEA"),
+ | (6, 'a6', 6000, '2024-01-03', "USA", "CA", "LA"),
+ | (7, 'a7', 7000, '2024-01-03', "CAN", "ON", "TOR"),
+ | (8, 'a8', 8000, '2024-01-04', "USA", "NY", "NYC")
+ """.stripMargin)
+
+ // Test partition pruning with single partition column filter (state)
+ checkAnswer(s"select id, name, ts, datestr, country, state, city from
$targetTable where state = 'CA' order by id")(
+ Seq(1, "a1", 1000, "2024-01-01", "USA", "CA", "SFO"),
+ Seq(2, "a2", 2000, "2024-01-01", "USA", "CA", "LA"),
+ Seq(4, "a4", 4000, "2024-01-02", "USA", "CA", "SFO"),
+ Seq(6, "a6", 6000, "2024-01-03", "USA", "CA", "LA")
+ )
+
+ // Test partition pruning with multiple partition column filters
+ checkAnswer(s"select id, name, ts, datestr, country, state, city from
$targetTable where state = 'CA' and city = 'SFO' order by id")(
+ Seq(1, "a1", 1000, "2024-01-01", "USA", "CA", "SFO"),
+ Seq(4, "a4", 4000, "2024-01-02", "USA", "CA", "SFO")
+ )
+
+ // Test partition pruning with date filter
+ checkAnswer(s"select id, name, ts, datestr, country, state, city from
$targetTable where datestr = '2024-01-01' order by id")(
+ Seq(1, "a1", 1000, "2024-01-01", "USA", "CA", "SFO"),
+ Seq(2, "a2", 2000, "2024-01-01", "USA", "CA", "LA"),
+ Seq(3, "a3", 3000, "2024-01-01", "USA", "TX", "AU")
+ )
+
+ // Test partition pruning with country filter
+ checkAnswer(s"select id, name, ts, datestr, country, state, city from
$targetTable where country = 'CAN' order by id")(
+ Seq(7, "a7", 7000, "2024-01-03", "CAN", "ON", "TOR")
+ )
+
+ // Test partition pruning with combined date and state filter
+ checkAnswer(s"select id, name, ts, datestr, country, state, city from
$targetTable where datestr = '2024-01-02' and state = 'CA' order by id")(
+ Seq(4, "a4", 4000, "2024-01-02", "USA", "CA", "SFO")
+ )
+
+ // Test partition pruning with IN clause
+ checkAnswer(s"select id, name, ts, datestr, country, state, city from
$targetTable where state IN ('CA', 'NY') order by id")(
+ Seq(1, "a1", 1000, "2024-01-01", "USA", "CA", "SFO"),
+ Seq(2, "a2", 2000, "2024-01-01", "USA", "CA", "LA"),
+ Seq(4, "a4", 4000, "2024-01-02", "USA", "CA", "SFO"),
+ Seq(6, "a6", 6000, "2024-01-03", "USA", "CA", "LA"),
+ Seq(8, "a8", 8000, "2024-01-04", "USA", "NY", "NYC")
+ )
+
+ // Test reading with _hoodie_partition_path to verify custom partition format
+ checkAnswer(s"select id, _hoodie_partition_path from $targetTable where state = 'CA' order by id")(
+ Seq("1", "2024/01/01/USA/CA/SFO"),
+ Seq("2", "2024/01/01/USA/CA/LA"),
+ Seq("4", "2024/01/02/USA/CA/SFO"),
+ Seq("6", "2024/01/03/USA/CA/LA")
+ )
+
+ // Create DataFrame and analyze query plan to verify partition pruning
+ val dfWithStateFilter = spark.sql(s"select * from $targetTable where state = 'CA'")
+ val planWithStateFilter = dfWithStateFilter.queryExecution.executedPlan.toString()
+ // Verify partition filters are pushed down
+ assertTrue(planWithStateFilter.contains("PartitionFilters") || planWithStateFilter.contains("PushedFilters"),
+ s"Query plan should contain partition filters for state column")
+
+ // Test DataFrame API with multiple partition filters
+ val dfWithMultipleFilters = spark.table(targetTable)
+ .filter("state = 'CA' and datestr = '2024-01-01'")
+ val planWithMultipleFilters = dfWithMultipleFilters.queryExecution.executedPlan.toString()
+ assertTrue(planWithMultipleFilters.contains("PartitionFilters") || planWithMultipleFilters.contains("PushedFilters"),
+ s"Query plan should contain partition filters for multiple columns")
+
+ // Verify the filtered results
+ val multiFilterResults = dfWithMultipleFilters.select("id", "name", "state", "datestr").orderBy("id").collect()
+ assertTrue(multiFilterResults.length == 2, s"Expected 2 rows but got ${multiFilterResults.length}")
+ assertTrue(multiFilterResults(0).getString(0) == "1", s"First row id should be 1")
+ assertTrue(multiFilterResults(0).getString(1) == "a1", s"First row name should be a1")
+ assertTrue(multiFilterResults(0).getString(2) == "CA", s"First row state should be CA")
+ assertTrue(multiFilterResults(0).getString(3) == "2024-01-01", s"First row datestr should be 2024-01-01")
+ assertTrue(multiFilterResults(1).getString(0) == "2", s"Second row id should be 2")
+ assertTrue(multiFilterResults(1).getString(1) == "a2", s"Second row name should be a2")
+
+ // Test DataFrame with country filter
+ val dfWithCountryFilter = spark.table(targetTable).filter("country = 'CAN'")
+ val planWithCountryFilter = dfWithCountryFilter.queryExecution.executedPlan.toString()
+ assertTrue(planWithCountryFilter.contains("PartitionFilters") || planWithCountryFilter.contains("PushedFilters"),
+ s"Query plan should contain partition filters for country column")
+
+ val countryFilterResults = dfWithCountryFilter.select("id", "country").orderBy("id").collect()
+ assertTrue(countryFilterResults.length == 1, s"Expected 1 row but got ${countryFilterResults.length}")
+ assertTrue(countryFilterResults(0).getString(0) == "7", s"Row id should be 7")
+ assertTrue(countryFilterResults(0).getString(1) == "CAN", s"Row country should be CAN")
+
+ // Verify all partitions exist as expected
+ val metaClient = HoodieTableMetaClient.builder()
+ .setConf(HadoopFSUtils.getStorageConfWithCopy(spark.sparkContext.hadoopConfiguration))
+ .setBasePath(tablePath)
+ .build()
+
+ val engine = new HoodieSparkEngineContext(spark.sparkContext)
+ val storage = metaClient.getStorage()
+ val metadataConfig = HoodieMetadataConfig.newBuilder().build()
+ val metadataTable = new HoodieBackedTableMetadata(engine, storage, metadataConfig, tablePath)
+ val partitionPaths = metadataTable.getAllPartitionPaths
+
+ // Verify expected partition paths
+ assertTrue(partitionPaths.contains("2024/01/01/USA/CA/SFO"))
+ assertTrue(partitionPaths.contains("2024/01/01/USA/CA/LA"))
+ assertTrue(partitionPaths.contains("2024/01/01/USA/TX/AU"))
+ assertTrue(partitionPaths.contains("2024/01/02/USA/CA/SFO"))
+ assertTrue(partitionPaths.contains("2024/01/02/USA/WA/SEA"))
+ assertTrue(partitionPaths.contains("2024/01/03/USA/CA/LA"))
+ assertTrue(partitionPaths.contains("2024/01/03/CAN/ON/TOR"))
+ assertTrue(partitionPaths.contains("2024/01/04/USA/NY/NYC"))
+
+ metadataTable.close()
+ }
+ }
Review Comment:
I understand hive-style partitioning and the custom partition value extractor are mutually exclusive,
but can we add a test with a custom partition value extractor and URL encoding enabled?
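For example, roughly (untested sketch reusing the existing test scaffolding; it assumes `hoodie.datasource.write.partitionpath.urlencode` is the right knob to flip, and that the custom extractor decodes the encoded values on read):
```scala
test("Test custom partition value extractor with url encoding enabled") {
  withTempDir { tmp =>
    val targetTable = generateTableName
    val tablePath = s"${tmp.getCanonicalPath}/$targetTable"
    spark.sql(
      s"""
         |create table $targetTable (
         |  `id` string, `name` string, `ts` bigint,
         |  `datestr` string, `country` string, `state` string, `city` string
         |) using hudi
         | tblproperties (
         |  'primaryKey' = 'id',
         |  'type' = 'COW',
         |  'preCombineField' = 'ts',
         |  'hoodie.datasource.write.hive_style_partitioning' = 'false',
         |  'hoodie.datasource.write.partitionpath.urlencode' = 'true',
         |  'hoodie.datasource.write.keygenerator.class' = 'org.apache.spark.sql.hudi.common.TestCustomSlashKeyGenerator',
         |  '${HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key()}' = 'org.apache.spark.sql.hudi.common.TestCustomSlashPartitionValueExtractor'
         | )
         | partitioned by (`datestr`, `country`, `state`, `city`)
         | location '$tablePath'
       """.stripMargin)
    // Special characters in partition values should now be percent-encoded in the path,
    // so the round trip (write path -> extractor on read) is what this test exercises.
    spark.sql(s"insert into $targetTable values (1, 'a1', 1000, '2024-01-01', 'USA', 'CA', 'SFO')")
    checkAnswer(s"select id, name, ts, datestr, country, state, city from $targetTable where state = 'CA'")(
      Seq(1, "a1", 1000, "2024-01-01", "USA", "CA", "SFO"))
  }
}
```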
##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -335,6 +335,37 @@ public static final String getDefaultPayloadClassName() {
.sinceVersion("1.0.0")
.withDocumentation("Key Generator type to determine key generator
class");
+ public static final ConfigProperty<String> PARTITION_VALUE_EXTRACTOR_CLASS =
ConfigProperty
+ .key("hoodie.datasource.hive_sync.partition_extractor_class")
+ .defaultValue("org.apache.hudi.hive.MultiPartKeysValueExtractor")
+ .withInferFunction(cfg -> {
Review Comment:
We should not be setting any default here, right?
It's ok to have the infer function.
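i.e. something along these lines (a sketch only, written Scala-style for brevity even though the property lives in HoodieTableConfig.java; the key follows the `hoodie.table.*` suggestion above and the fallback class names are what I would expect, not verified):
```scala
val PARTITION_VALUE_EXTRACTOR_CLASS: ConfigProperty[String] = ConfigProperty
  .key("hoodie.table.hive_sync.partition_extractor_class")
  .noDefaultValue()  // no default; rely on inference only
  .withInferFunction(cfg => {
    val partitionFields = HoodieTableConfig.getPartitionFieldProp(cfg)
    if (!partitionFields.isPresent) {
      // non-partitioned table: infer NonPartitionedExtractor instead of returning empty
      Option.of("org.apache.hudi.hive.NonPartitionedExtractor")
    } else {
      Option.of("org.apache.hudi.hive.MultiPartKeysValueExtractor")
    }
  })
  .withDocumentation("Partition value extractor class recorded in the table config.")
```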
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestCustomParitionValueExtractor.scala:
##########
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.common
+
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.hadoop.fs.HadoopFSUtils
+import org.apache.hudi.metadata.HoodieBackedTableMetadata
+import org.apache.hudi.storage.StoragePath
+import org.apache.hudi.sync.common.HoodieSyncConfig
+
+import org.junit.jupiter.api.Assertions.assertTrue
+
+class TestCustomParitionValueExtractor extends HoodieSparkSqlTestBase {
+ test("Test custom partition value extractor interface") {
+ withTempDir { tmp =>
+ val targetTable = generateTableName
+ val tablePath = s"${tmp.getCanonicalPath}/$targetTable"
+
+ spark.sql(
+ s"""
+ |create table $targetTable (
+ | `id` string,
+ | `name` string,
+ | `ts` bigint,
+ | `datestr` string,
+ | `country` string,
+ | `state` string,
+ | `city` string
+ |) using hudi
+ | tblproperties (
+ | 'primaryKey' = 'id',
+ | 'type' = 'COW',
+ | 'preCombineField'='ts',
+ | 'hoodie.datasource.write.hive_style_partitioning'='false',
+ | 'hoodie.datasource.write.keygenerator.class'='org.apache.spark.sql.hudi.common.TestCustomSlashKeyGenerator',
+ | 'hoodie.table.keygenerator.class'='org.apache.spark.sql.hudi.common.TestCustomSlashKeyGenerator',
+ | '${HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key()}'='org.apache.spark.sql.hudi.common.TestCustomSlashPartitionValueExtractor'
+ | )
+ | partitioned by (`datestr`, `country`, `state`, `city`)
+ | location '$tablePath'
+ """.stripMargin)
+ // yyyy/mm/dd/country/state/city
+ spark.sql(
+ s"""
+ | insert into $targetTable values
+ | (1, 'a1', 1000, '2024-01-01', "USA", "CA", "SFO"),
+ | (2, 'a2', 2000, '2024-01-01', "USA", "TX", "AU"),
+ | (3, 'a3', 3000, '2024-01-02', "USA", "CA", "LA"),
+ | (4, 'a4', 4000, '2024-01-02', "USA", "WA", "SEA"),
+ | (5, 'a5', 5000, '2024-01-03', "USA", "CA", "SFO")
+ """.stripMargin)
+
+ // check result after insert and merge data into target table
+ checkAnswer(s"select id, name, ts, datestr, country, state, city from
$targetTable"
+ + s" where state = 'CA'")(
+ Seq(1, "a1", 1000, "2024-01-01", "USA", "CA", "SFO"),
+ Seq(3, "a3", 3000, "2024-01-02", "USA", "CA", "LA"),
+ Seq(5, "a5", 5000, "2024-01-03", "USA", "CA", "SFO")
+ )
+
+ // Verify table config has custom partition value extractor class set
+ val metaClient = HoodieTableMetaClient.builder()
+ .setConf(HadoopFSUtils.getStorageConfWithCopy(spark.sparkContext.hadoopConfiguration))
+ .setBasePath(tablePath)
+ .build()
+ val tableConfig = metaClient.getTableConfig
+ val partitionExtractorClass = tableConfig.getProps.getProperty(
+ HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key())
+ assertTrue(partitionExtractorClass == "org.apache.spark.sql.hudi.common.TestCustomSlashPartitionValueExtractor",
+ s"Table config should have custom partition value extractor class set to TestCustomSlashPartitionValueExtractor, but got $partitionExtractorClass")
+
+ // Verify that partition paths are created with slash separated format (yyyy/MM/dd/country/state/city)
+ assertTrue(metaClient.getStorage.exists(new StoragePath(tablePath, "2024/01/01/USA/CA/SFO")),
+ s"Partition path 2024/01/01/USA/CA/SFO should exist")
+ assertTrue(metaClient.getStorage.exists(new StoragePath(tablePath, "2024/01/01/USA/TX/AU")),
+ s"Partition path 2024/01/01/USA/TX/AU should exist")
+ assertTrue(metaClient.getStorage.exists(new StoragePath(tablePath, "2024/01/02/USA/CA/LA")),
+ s"Partition path 2024/01/02/USA/CA/LA should exist")
+ assertTrue(metaClient.getStorage.exists(new StoragePath(tablePath, "2024/01/02/USA/WA/SEA")),
+ s"Partition path 2024/01/02/USA/WA/SEA should exist")
+ assertTrue(metaClient.getStorage.exists(new StoragePath(tablePath, "2024/01/03/USA/CA/SFO")),
+ s"Partition path 2024/01/03/USA/CA/SFO should exist")
+
+ val engine = new HoodieSparkEngineContext(spark.sparkContext)
+ val storage = metaClient.getStorage()
+ val metadataConfig = HoodieMetadataConfig.newBuilder().build()
+ val metadataTable = new HoodieBackedTableMetadata(engine, storage, metadataConfig, tablePath)
+ val partitionPaths = metadataTable.getAllPartitionPaths
+ assertTrue(partitionPaths.contains("2024/01/01/USA/CA/SFO"))
+ assertTrue(partitionPaths.contains("2024/01/01/USA/TX/AU"))
+ assertTrue(partitionPaths.contains("2024/01/02/USA/CA/LA"))
+ assertTrue(partitionPaths.contains("2024/01/02/USA/WA/SEA"))
+ assertTrue(partitionPaths.contains("2024/01/03/USA/CA/SFO"))
+ metadataTable.close()
+ }
+ }
+
+ test("Test custom partition value extractor with partition pruning and
filtering") {
+ withTempDir { tmp =>
+ val targetTable = generateTableName
+ val tablePath = s"${tmp.getCanonicalPath}/$targetTable"
+
+ spark.sql(
+ s"""
+ |create table $targetTable (
+ | `id` string,
+ | `name` string,
+ | `ts` bigint,
+ | `datestr` string,
+ | `country` string,
+ | `state` string,
+ | `city` string
+ |) using hudi
+ | tblproperties (
+ | 'primaryKey' = 'id',
+ | 'type' = 'COW',
+ | 'preCombineField'='ts',
+ | 'hoodie.datasource.write.hive_style_partitioning'='false',
+ | 'hoodie.datasource.write.keygenerator.class'='org.apache.spark.sql.hudi.common.TestCustomSlashKeyGenerator',
+ | '${HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key()}'='org.apache.spark.sql.hudi.common.TestCustomSlashPartitionValueExtractor'
+ | )
+ | partitioned by (`datestr`, `country`, `state`, `city`)
+ | location '$tablePath'
+ """.stripMargin)
+
+ // Insert data across multiple partitions
+ spark.sql(
+ s"""
+ | insert into $targetTable values
+ | (1, 'a1', 1000, '2024-01-01', "USA", "CA", "SFO"),
+ | (2, 'a2', 2000, '2024-01-01', "USA", "CA", "LA"),
+ | (3, 'a3', 3000, '2024-01-01', "USA", "TX", "AU"),
+ | (4, 'a4', 4000, '2024-01-02', "USA", "CA", "SFO"),
+ | (5, 'a5', 5000, '2024-01-02', "USA", "WA", "SEA"),
+ | (6, 'a6', 6000, '2024-01-03', "USA", "CA", "LA"),
+ | (7, 'a7', 7000, '2024-01-03', "CAN", "ON", "TOR"),
+ | (8, 'a8', 8000, '2024-01-04', "USA", "NY", "NYC")
+ """.stripMargin)
+
+ // Test partition pruning with single partition column filter (state)
+ checkAnswer(s"select id, name, ts, datestr, country, state, city from
$targetTable where state = 'CA' order by id")(
+ Seq(1, "a1", 1000, "2024-01-01", "USA", "CA", "SFO"),
+ Seq(2, "a2", 2000, "2024-01-01", "USA", "CA", "LA"),
+ Seq(4, "a4", 4000, "2024-01-02", "USA", "CA", "SFO"),
+ Seq(6, "a6", 6000, "2024-01-03", "USA", "CA", "LA")
+ )
+
+ // Test partition pruning with multiple partition column filters
+ checkAnswer(s"select id, name, ts, datestr, country, state, city from
$targetTable where state = 'CA' and city = 'SFO' order by id")(
+ Seq(1, "a1", 1000, "2024-01-01", "USA", "CA", "SFO"),
+ Seq(4, "a4", 4000, "2024-01-02", "USA", "CA", "SFO")
+ )
+
+ // Test partition pruning with date filter
+ checkAnswer(s"select id, name, ts, datestr, country, state, city from
$targetTable where datestr = '2024-01-01' order by id")(
+ Seq(1, "a1", 1000, "2024-01-01", "USA", "CA", "SFO"),
+ Seq(2, "a2", 2000, "2024-01-01", "USA", "CA", "LA"),
+ Seq(3, "a3", 3000, "2024-01-01", "USA", "TX", "AU")
+ )
+
+ // Test partition pruning with country filter
+ checkAnswer(s"select id, name, ts, datestr, country, state, city from
$targetTable where country = 'CAN' order by id")(
+ Seq(7, "a7", 7000, "2024-01-03", "CAN", "ON", "TOR")
+ )
+
+ // Test partition pruning with combined date and state filter
Review Comment:
The proper way to assert here is: corrupt one of the parquet files in another partition that does not match the predicate. Then the query will only succeed if partition pruning really worked; if not, the query will hit a FileNotFoundException.
But we can't afford to do that for every query, since one of the data files will be corrupted. So maybe we can do it for one or two of the partition pruning queries you have here.
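Roughly like this (sketch only; the helper and the chosen partition path are illustrative, using plain java.nio to overwrite a data file in a partition that the predicate should prune away):
```scala
import java.nio.file.{Files, Paths}
import scala.jdk.CollectionConverters._

// Overwrite the first parquet file under the given partition directory with garbage,
// so any attempt to actually read that partition fails.
def corruptFirstParquetFile(partitionDir: String): Unit = {
  val parquetFiles = Files.list(Paths.get(partitionDir)).iterator().asScala
    .filter(_.toString.endsWith(".parquet")).toList
  parquetFiles.headOption.foreach(f => Files.write(f, "corrupted".getBytes))
}

// Corrupt a partition that does NOT match the predicate below ...
corruptFirstParquetFile(s"$tablePath/2024/01/04/USA/NY/NYC")
// ... then this query only succeeds if pruning skipped that partition entirely;
// without pruning it would fail while reading the corrupted file.
checkAnswer(s"select id, name, ts, datestr, country, state, city from $targetTable where datestr = '2024-01-02' and state = 'CA' order by id")(
  Seq(4, "a4", 4000, "2024-01-02", "USA", "CA", "SFO"))
```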
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala:
##########
@@ -267,6 +267,15 @@ object DataSourceReadOptions {
.sinceVersion("1.1.0")
.withDocumentation("Fully qualified class name of the catalog that is used
by the Polaris spark client.")
+ val USE_PARTITION_VALUE_EXTRACTOR_ON_READ: ConfigProperty[String] =
ConfigProperty
+
.key("hoodie.datasource.read.partition.value.using.partion-value-extractor-class")
+ .defaultValue("true")
Review Comment:
Since we have an infer function, this might get exercised out of the box; let's keep the out-of-the-box behavior untouched.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]