This is an automated email from the ASF dual-hosted git repository. xushiyan pushed a commit to branch use-option-resolver in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
commit 59ecf51a65970bd7a08c6b16419acf8ffcabefe9 Author: Shiyan Xu <[email protected]> AuthorDate: Mon Jun 2 13:29:52 2025 -0500 refactor: use `OptionResolver` to handle options --- crates/core/src/table/builder.rs | 153 +++++++++---------------- crates/core/src/table/mod.rs | 1 + crates/core/src/table/validation.rs | 74 ++++++++++++ crates/core/src/util/{mod.rs => collection.rs} | 13 ++- crates/core/src/util/mod.rs | 1 + 5 files changed, 141 insertions(+), 101 deletions(-) diff --git a/crates/core/src/table/builder.rs b/crates/core/src/table/builder.rs index 35e805e..ebb72e2 100644 --- a/crates/core/src/table/builder.rs +++ b/crates/core/src/table/builder.rs @@ -20,30 +20,29 @@ use paste::paste; use std::collections::HashMap; use std::env; -use std::hash::Hash; use std::path::PathBuf; use std::sync::Arc; -use strum::IntoEnumIterator; -use crate::config::internal::HudiInternalConfig::SkipConfigValidation; -use crate::config::read::HudiReadConfig; use crate::config::table::HudiTableConfig; -use crate::config::table::HudiTableConfig::{ - DropsPartitionFields, TableVersion, TimelineLayoutVersion, -}; use crate::config::util::{parse_data_for_options, split_hudi_options_from_others}; use crate::config::{HudiConfigs, HUDI_CONF_DIR}; -use crate::error::CoreError; -use crate::merge::record_merger::RecordMerger; use crate::storage::Storage; use crate::table::fs_view::FileSystemView; +use crate::table::validation::validate_configs; use crate::table::Table; use crate::timeline::Timeline; +use crate::util::collection::extend_if_absent; use crate::Result; /// Builder for creating a [Table] instance. #[derive(Debug, Clone)] pub struct TableBuilder { + option_resolver: OptionResolver, +} + +/// Resolver for options including Hudi options, storage options, and generic options. +#[derive(Debug, Clone)] +pub struct OptionResolver { base_uri: String, hudi_options: HashMap<String, String>, storage_options: HashMap<String, String>, @@ -62,7 +61,8 @@ macro_rules! impl_with_options { K: AsRef<str>, V: Into<String>, { - self.$field.insert(k.as_ref().to_string(), v.into()); + let option_resolver = &mut self.option_resolver; + option_resolver.$field.insert(k.as_ref().to_string(), v.into()); self } @@ -74,7 +74,8 @@ macro_rules! impl_with_options { K: AsRef<str>, V: Into<String>, { - self.$field.extend(options.into_iter().map(|(k, v)| (k.as_ref().to_string(), v.into()))); + let option_resolver = &mut self.option_resolver; + option_resolver.$field.extend(options.into_iter().map(|(k, v)| (k.as_ref().to_string(), v.into()))); self } } @@ -96,23 +97,22 @@ impl_with_options!( impl TableBuilder { /// Create Hudi table builder from base table uri pub fn from_base_uri(base_uri: &str) -> Self { - TableBuilder { - base_uri: base_uri.to_string(), - storage_options: HashMap::new(), - hudi_options: HashMap::new(), - options: HashMap::new(), - } + let option_resolver = OptionResolver::new(base_uri); + TableBuilder { option_resolver } } pub async fn build(&mut self) -> Result<Table> { - self.resolve_options().await?; + let option_resolver = &mut self.option_resolver; + + option_resolver.resolve_options().await?; - let hudi_configs = HudiConfigs::new(self.hudi_options.iter()); + let hudi_configs = HudiConfigs::new(self.option_resolver.hudi_options.iter()); - Self::validate_configs(&hudi_configs).expect("Hudi configs are not valid."); + validate_configs(&hudi_configs).expect("Hudi configs are not valid."); let hudi_configs = Arc::from(hudi_configs); - let storage_options = Arc::from(self.storage_options.clone()); + + let storage_options = Arc::from(self.option_resolver.storage_options.clone()); let timeline = Timeline::new_from_storage(hudi_configs.clone(), storage_options.clone()).await?; @@ -127,6 +127,18 @@ impl TableBuilder { file_system_view, }) } +} + +impl OptionResolver { + /// Create a new [OptionResolver] with the given base URI. + pub fn new(base_uri: &str) -> Self { + Self { + base_uri: base_uri.to_string(), + hudi_options: HashMap::new(), + storage_options: HashMap::new(), + options: HashMap::new(), + } + } /// Resolve all options by combining the ones from hoodie.properties, user-provided options, /// env vars, and global Hudi configs. The precedence order is as follows: @@ -162,8 +174,8 @@ impl TableBuilder { // Combine generic options (lower precedence) with explicit options. // Note that we treat all non-Hudi options as storage options - Self::extend_if_absent(&mut self.hudi_options, &generic_hudi_opts); - Self::extend_if_absent(&mut self.storage_options, &generic_other_opts) + extend_if_absent(&mut self.hudi_options, &generic_hudi_opts); + extend_if_absent(&mut self.storage_options, &generic_other_opts) } fn resolve_cloud_env_vars(&mut self) { @@ -235,62 +247,6 @@ impl TableBuilder { Ok(()) } - - fn validate_configs(hudi_configs: &HudiConfigs) -> Result<()> { - if hudi_configs - .get_or_default(SkipConfigValidation) - .to::<bool>() - { - return Ok(()); - } - - for conf in HudiTableConfig::iter() { - hudi_configs.validate(conf)? - } - - for conf in HudiReadConfig::iter() { - hudi_configs.validate(conf)? - } - - // additional validation - let table_version = hudi_configs.get(TableVersion)?.to::<isize>(); - if !(5..=6).contains(&table_version) { - return Err(CoreError::Unsupported( - "Only support table version 5 and 6.".to_string(), - )); - } - - let timeline_layout_version = hudi_configs.get(TimelineLayoutVersion)?.to::<isize>(); - if timeline_layout_version != 1 { - return Err(CoreError::Unsupported( - "Only support timeline layout version 1.".to_string(), - )); - } - - let drops_partition_cols = hudi_configs - .get_or_default(DropsPartitionFields) - .to::<bool>(); - if drops_partition_cols { - return Err(CoreError::Unsupported(format!( - "Only support when `{}` is disabled", - DropsPartitionFields.as_ref() - ))); - } - - RecordMerger::validate_configs(hudi_configs)?; - - Ok(()) - } - - fn extend_if_absent<K, V>(target: &mut HashMap<K, V>, source: &HashMap<K, V>) - where - K: Eq + Hash + Clone, - V: Clone, - { - for (key, value) in source { - target.entry(key.clone()).or_insert_with(|| value.clone()); - } - } } #[cfg(test)] @@ -298,19 +254,15 @@ mod tests { use super::*; fn create_table_builder() -> TableBuilder { - TableBuilder { - base_uri: "test_uri".to_string(), - hudi_options: HashMap::new(), - storage_options: HashMap::new(), - options: HashMap::new(), - } + let option_resolver = OptionResolver::new("test_uri"); + TableBuilder { option_resolver } } #[test] fn test_with_hudi_option() { let builder = create_table_builder(); - let updated = builder.with_hudi_option("key", "value"); + let updated = builder.with_hudi_option("key", "value").option_resolver; assert_eq!(updated.hudi_options["key"], "value") } @@ -319,7 +271,7 @@ mod tests { let builder = create_table_builder(); let options = [("key1", "value1"), ("key2", "value2")]; - let updated = builder.with_hudi_options(options); + let updated = builder.with_hudi_options(options).option_resolver; assert_eq!(updated.hudi_options["key1"], "value1"); assert_eq!(updated.hudi_options["key2"], "value2") } @@ -328,7 +280,7 @@ mod tests { fn test_with_storage_option() { let builder = create_table_builder(); - let updated = builder.with_storage_option("key", "value"); + let updated = builder.with_storage_option("key", "value").option_resolver; assert_eq!(updated.storage_options["key"], "value") } @@ -337,7 +289,7 @@ mod tests { let builder = create_table_builder(); let options = [("key1", "value1"), ("key2", "value2")]; - let updated = builder.with_storage_options(options); + let updated = builder.with_storage_options(options).option_resolver; assert_eq!(updated.storage_options["key1"], "value1"); assert_eq!(updated.storage_options["key2"], "value2"); } @@ -346,7 +298,7 @@ mod tests { fn test_with_option() { let builder = create_table_builder(); - let updated = builder.with_option("key", "value"); + let updated = builder.with_option("key", "value").option_resolver; assert_eq!(updated.options["key"], "value") } @@ -355,7 +307,7 @@ mod tests { let builder = create_table_builder(); let options = [("key1", "value1"), ("key2", "value2")]; - let updated = builder.with_options(options); + let updated = builder.with_options(options).option_resolver; assert_eq!(updated.options["key1"], "value1"); assert_eq!(updated.options["key2"], "value2") } @@ -381,17 +333,18 @@ mod tests { ("hoodie.option2", "value2-1"), ]); - builder.resolve_user_provided_options(); + let resolver = &mut builder.option_resolver; + resolver.resolve_user_provided_options(); - assert_eq!(builder.hudi_options.len(), 4); - assert_eq!(builder.hudi_options["hoodie.base.path"], "/tmp/hudi_data"); - assert_eq!(builder.hudi_options["hoodie.option1"], "value1-2"); - assert_eq!(builder.hudi_options["hoodie.option2"], "value2-1"); - assert_eq!(builder.hudi_options["hoodie.option3"], "value3"); - assert_eq!(builder.storage_options.len(), 2); - assert_eq!(builder.storage_options["AWS_REGION"], "us-east-1"); + assert_eq!(resolver.hudi_options.len(), 4); + assert_eq!(resolver.hudi_options["hoodie.base.path"], "/tmp/hudi_data"); + assert_eq!(resolver.hudi_options["hoodie.option1"], "value1-2"); + assert_eq!(resolver.hudi_options["hoodie.option2"], "value2-1"); + assert_eq!(resolver.hudi_options["hoodie.option3"], "value3"); + assert_eq!(resolver.storage_options.len(), 2); + assert_eq!(resolver.storage_options["AWS_REGION"], "us-east-1"); assert_eq!( - builder.storage_options["AWS_ENDPOINT"], + resolver.storage_options["AWS_ENDPOINT"], "s3.us-east-1.amazonaws.com" ); } diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs index f1619e9..7cb72da 100644 --- a/crates/core/src/table/mod.rs +++ b/crates/core/src/table/mod.rs @@ -91,6 +91,7 @@ pub mod builder; mod fs_view; mod listing; pub mod partition; +mod validation; use crate::config::read::HudiReadConfig; use crate::config::table::HudiTableConfig::PartitionFields; diff --git a/crates/core/src/table/validation.rs b/crates/core/src/table/validation.rs new file mode 100644 index 0000000..2694c62 --- /dev/null +++ b/crates/core/src/table/validation.rs @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +use crate::config::internal::HudiInternalConfig::SkipConfigValidation; +use crate::config::read::HudiReadConfig; +use crate::config::table::HudiTableConfig; +use crate::config::table::HudiTableConfig::{ + DropsPartitionFields, TableVersion, TimelineLayoutVersion, +}; +use crate::config::HudiConfigs; +use crate::error::CoreError; +use crate::merge::record_merger::RecordMerger; +use strum::IntoEnumIterator; + +pub fn validate_configs(hudi_configs: &HudiConfigs) -> crate::error::Result<()> { + if hudi_configs + .get_or_default(SkipConfigValidation) + .to::<bool>() + { + return Ok(()); + } + + for conf in HudiTableConfig::iter() { + hudi_configs.validate(conf)? + } + + for conf in HudiReadConfig::iter() { + hudi_configs.validate(conf)? + } + + // additional validation + let table_version = hudi_configs.get(TableVersion)?.to::<isize>(); + if !(5..=6).contains(&table_version) { + return Err(CoreError::Unsupported( + "Only support table version 5 and 6.".to_string(), + )); + } + + let timeline_layout_version = hudi_configs.get(TimelineLayoutVersion)?.to::<isize>(); + if timeline_layout_version != 1 { + return Err(CoreError::Unsupported( + "Only support timeline layout version 1.".to_string(), + )); + } + + let drops_partition_cols = hudi_configs + .get_or_default(DropsPartitionFields) + .to::<bool>(); + if drops_partition_cols { + return Err(CoreError::Unsupported(format!( + "Only support when `{}` is disabled", + DropsPartitionFields.as_ref() + ))); + } + + RecordMerger::validate_configs(hudi_configs)?; + + Ok(()) +} diff --git a/crates/core/src/util/mod.rs b/crates/core/src/util/collection.rs similarity index 73% copy from crates/core/src/util/mod.rs copy to crates/core/src/util/collection.rs index 68bc85e..8c66ac3 100644 --- a/crates/core/src/util/mod.rs +++ b/crates/core/src/util/collection.rs @@ -16,4 +16,15 @@ * specific language governing permissions and limitations * under the License. */ -pub mod arrow; +use std::collections::HashMap; +use std::hash::Hash; + +pub fn extend_if_absent<K, V>(target: &mut HashMap<K, V>, source: &HashMap<K, V>) +where + K: Eq + Hash + Clone, + V: Clone, +{ + for (key, value) in source { + target.entry(key.clone()).or_insert_with(|| value.clone()); + } +} diff --git a/crates/core/src/util/mod.rs b/crates/core/src/util/mod.rs index 68bc85e..c9aa85b 100644 --- a/crates/core/src/util/mod.rs +++ b/crates/core/src/util/mod.rs @@ -17,3 +17,4 @@ * under the License. */ pub mod arrow; +pub mod collection;
