This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 4bf6fd7 refactor: use `OptionResolver` to handle options (#341)
4bf6fd7 is described below
commit 4bf6fd70b8039d68b52a07876208b4ae9aa0ff35
Author: Shiyan Xu <[email protected]>
AuthorDate: Mon Jun 2 13:44:50 2025 -0500
refactor: use `OptionResolver` to handle options (#341)
Add `OptionResolver` to handle resolving options during table instance
creation. The logic can be reused when creating a file group reader.
---
crates/core/src/table/builder.rs | 153 +++++++++----------------
crates/core/src/table/mod.rs | 1 +
crates/core/src/table/validation.rs | 74 ++++++++++++
crates/core/src/util/{mod.rs => collection.rs} | 13 ++-
crates/core/src/util/mod.rs | 1 +
5 files changed, 141 insertions(+), 101 deletions(-)
diff --git a/crates/core/src/table/builder.rs b/crates/core/src/table/builder.rs
index 35e805e..ebb72e2 100644
--- a/crates/core/src/table/builder.rs
+++ b/crates/core/src/table/builder.rs
@@ -20,30 +20,29 @@
use paste::paste;
use std::collections::HashMap;
use std::env;
-use std::hash::Hash;
use std::path::PathBuf;
use std::sync::Arc;
-use strum::IntoEnumIterator;
-use crate::config::internal::HudiInternalConfig::SkipConfigValidation;
-use crate::config::read::HudiReadConfig;
use crate::config::table::HudiTableConfig;
-use crate::config::table::HudiTableConfig::{
- DropsPartitionFields, TableVersion, TimelineLayoutVersion,
-};
use crate::config::util::{parse_data_for_options, split_hudi_options_from_others};
use crate::config::{HudiConfigs, HUDI_CONF_DIR};
-use crate::error::CoreError;
-use crate::merge::record_merger::RecordMerger;
use crate::storage::Storage;
use crate::table::fs_view::FileSystemView;
+use crate::table::validation::validate_configs;
use crate::table::Table;
use crate::timeline::Timeline;
+use crate::util::collection::extend_if_absent;
use crate::Result;
/// Builder for creating a [Table] instance.
#[derive(Debug, Clone)]
pub struct TableBuilder {
+ option_resolver: OptionResolver,
+}
+
+/// Resolver for options including Hudi options, storage options, and generic options.
+#[derive(Debug, Clone)]
+pub struct OptionResolver {
base_uri: String,
hudi_options: HashMap<String, String>,
storage_options: HashMap<String, String>,
@@ -62,7 +61,8 @@ macro_rules! impl_with_options {
K: AsRef<str>,
V: Into<String>,
{
- self.$field.insert(k.as_ref().to_string(), v.into());
+ let option_resolver = &mut self.option_resolver;
+ option_resolver.$field.insert(k.as_ref().to_string(), v.into());
self
}
@@ -74,7 +74,8 @@ macro_rules! impl_with_options {
K: AsRef<str>,
V: Into<String>,
{
- self.$field.extend(options.into_iter().map(|(k, v)| (k.as_ref().to_string(), v.into())));
+ let option_resolver = &mut self.option_resolver;
+ option_resolver.$field.extend(options.into_iter().map(|(k, v)| (k.as_ref().to_string(), v.into())));
self
}
}
@@ -96,23 +97,22 @@ impl_with_options!(
impl TableBuilder {
/// Create Hudi table builder from base table uri
pub fn from_base_uri(base_uri: &str) -> Self {
- TableBuilder {
- base_uri: base_uri.to_string(),
- storage_options: HashMap::new(),
- hudi_options: HashMap::new(),
- options: HashMap::new(),
- }
+ let option_resolver = OptionResolver::new(base_uri);
+ TableBuilder { option_resolver }
}
pub async fn build(&mut self) -> Result<Table> {
- self.resolve_options().await?;
+ let option_resolver = &mut self.option_resolver;
+
+ option_resolver.resolve_options().await?;
- let hudi_configs = HudiConfigs::new(self.hudi_options.iter());
+ let hudi_configs = HudiConfigs::new(self.option_resolver.hudi_options.iter());
- Self::validate_configs(&hudi_configs).expect("Hudi configs are not valid.");
+ validate_configs(&hudi_configs).expect("Hudi configs are not valid.");
let hudi_configs = Arc::from(hudi_configs);
- let storage_options = Arc::from(self.storage_options.clone());
+
+ let storage_options = Arc::from(self.option_resolver.storage_options.clone());
let timeline =
Timeline::new_from_storage(hudi_configs.clone(), storage_options.clone()).await?;
@@ -127,6 +127,18 @@ impl TableBuilder {
file_system_view,
})
}
+}
+
+impl OptionResolver {
+ /// Create a new [OptionResolver] with the given base URI.
+ pub fn new(base_uri: &str) -> Self {
+ Self {
+ base_uri: base_uri.to_string(),
+ hudi_options: HashMap::new(),
+ storage_options: HashMap::new(),
+ options: HashMap::new(),
+ }
+ }
/// Resolve all options by combining the ones from hoodie.properties, user-provided options,
/// env vars, and global Hudi configs. The precedence order is as follows:
@@ -162,8 +174,8 @@ impl TableBuilder {
// Combine generic options (lower precedence) with explicit options.
// Note that we treat all non-Hudi options as storage options
- Self::extend_if_absent(&mut self.hudi_options, &generic_hudi_opts);
- Self::extend_if_absent(&mut self.storage_options, &generic_other_opts)
+ extend_if_absent(&mut self.hudi_options, &generic_hudi_opts);
+ extend_if_absent(&mut self.storage_options, &generic_other_opts)
}
fn resolve_cloud_env_vars(&mut self) {
@@ -235,62 +247,6 @@ impl TableBuilder {
Ok(())
}
-
- fn validate_configs(hudi_configs: &HudiConfigs) -> Result<()> {
- if hudi_configs
- .get_or_default(SkipConfigValidation)
- .to::<bool>()
- {
- return Ok(());
- }
-
- for conf in HudiTableConfig::iter() {
- hudi_configs.validate(conf)?
- }
-
- for conf in HudiReadConfig::iter() {
- hudi_configs.validate(conf)?
- }
-
- // additional validation
- let table_version = hudi_configs.get(TableVersion)?.to::<isize>();
- if !(5..=6).contains(&table_version) {
- return Err(CoreError::Unsupported(
- "Only support table version 5 and 6.".to_string(),
- ));
- }
-
- let timeline_layout_version = hudi_configs.get(TimelineLayoutVersion)?.to::<isize>();
- if timeline_layout_version != 1 {
- return Err(CoreError::Unsupported(
- "Only support timeline layout version 1.".to_string(),
- ));
- }
-
- let drops_partition_cols = hudi_configs
- .get_or_default(DropsPartitionFields)
- .to::<bool>();
- if drops_partition_cols {
- return Err(CoreError::Unsupported(format!(
- "Only support when `{}` is disabled",
- DropsPartitionFields.as_ref()
- )));
- }
-
- RecordMerger::validate_configs(hudi_configs)?;
-
- Ok(())
- }
-
- fn extend_if_absent<K, V>(target: &mut HashMap<K, V>, source: &HashMap<K, V>)
- where
- K: Eq + Hash + Clone,
- V: Clone,
- {
- for (key, value) in source {
- target.entry(key.clone()).or_insert_with(|| value.clone());
- }
- }
}
#[cfg(test)]
@@ -298,19 +254,15 @@ mod tests {
use super::*;
fn create_table_builder() -> TableBuilder {
- TableBuilder {
- base_uri: "test_uri".to_string(),
- hudi_options: HashMap::new(),
- storage_options: HashMap::new(),
- options: HashMap::new(),
- }
+ let option_resolver = OptionResolver::new("test_uri");
+ TableBuilder { option_resolver }
}
#[test]
fn test_with_hudi_option() {
let builder = create_table_builder();
- let updated = builder.with_hudi_option("key", "value");
+ let updated = builder.with_hudi_option("key", "value").option_resolver;
assert_eq!(updated.hudi_options["key"], "value")
}
@@ -319,7 +271,7 @@ mod tests {
let builder = create_table_builder();
let options = [("key1", "value1"), ("key2", "value2")];
- let updated = builder.with_hudi_options(options);
+ let updated = builder.with_hudi_options(options).option_resolver;
assert_eq!(updated.hudi_options["key1"], "value1");
assert_eq!(updated.hudi_options["key2"], "value2")
}
@@ -328,7 +280,7 @@ mod tests {
fn test_with_storage_option() {
let builder = create_table_builder();
- let updated = builder.with_storage_option("key", "value");
+ let updated = builder.with_storage_option("key", "value").option_resolver;
assert_eq!(updated.storage_options["key"], "value")
}
@@ -337,7 +289,7 @@ mod tests {
let builder = create_table_builder();
let options = [("key1", "value1"), ("key2", "value2")];
- let updated = builder.with_storage_options(options);
+ let updated = builder.with_storage_options(options).option_resolver;
assert_eq!(updated.storage_options["key1"], "value1");
assert_eq!(updated.storage_options["key2"], "value2");
}
@@ -346,7 +298,7 @@ mod tests {
fn test_with_option() {
let builder = create_table_builder();
- let updated = builder.with_option("key", "value");
+ let updated = builder.with_option("key", "value").option_resolver;
assert_eq!(updated.options["key"], "value")
}
@@ -355,7 +307,7 @@ mod tests {
let builder = create_table_builder();
let options = [("key1", "value1"), ("key2", "value2")];
- let updated = builder.with_options(options);
+ let updated = builder.with_options(options).option_resolver;
assert_eq!(updated.options["key1"], "value1");
assert_eq!(updated.options["key2"], "value2")
}
@@ -381,17 +333,18 @@ mod tests {
("hoodie.option2", "value2-1"),
]);
- builder.resolve_user_provided_options();
+ let resolver = &mut builder.option_resolver;
+ resolver.resolve_user_provided_options();
- assert_eq!(builder.hudi_options.len(), 4);
- assert_eq!(builder.hudi_options["hoodie.base.path"], "/tmp/hudi_data");
- assert_eq!(builder.hudi_options["hoodie.option1"], "value1-2");
- assert_eq!(builder.hudi_options["hoodie.option2"], "value2-1");
- assert_eq!(builder.hudi_options["hoodie.option3"], "value3");
- assert_eq!(builder.storage_options.len(), 2);
- assert_eq!(builder.storage_options["AWS_REGION"], "us-east-1");
+ assert_eq!(resolver.hudi_options.len(), 4);
+ assert_eq!(resolver.hudi_options["hoodie.base.path"], "/tmp/hudi_data");
+ assert_eq!(resolver.hudi_options["hoodie.option1"], "value1-2");
+ assert_eq!(resolver.hudi_options["hoodie.option2"], "value2-1");
+ assert_eq!(resolver.hudi_options["hoodie.option3"], "value3");
+ assert_eq!(resolver.storage_options.len(), 2);
+ assert_eq!(resolver.storage_options["AWS_REGION"], "us-east-1");
assert_eq!(
- builder.storage_options["AWS_ENDPOINT"],
+ resolver.storage_options["AWS_ENDPOINT"],
"s3.us-east-1.amazonaws.com"
);
}
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index f1619e9..7cb72da 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -91,6 +91,7 @@ pub mod builder;
mod fs_view;
mod listing;
pub mod partition;
+mod validation;
use crate::config::read::HudiReadConfig;
use crate::config::table::HudiTableConfig::PartitionFields;
diff --git a/crates/core/src/table/validation.rs b/crates/core/src/table/validation.rs
new file mode 100644
index 0000000..2694c62
--- /dev/null
+++ b/crates/core/src/table/validation.rs
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use crate::config::internal::HudiInternalConfig::SkipConfigValidation;
+use crate::config::read::HudiReadConfig;
+use crate::config::table::HudiTableConfig;
+use crate::config::table::HudiTableConfig::{
+ DropsPartitionFields, TableVersion, TimelineLayoutVersion,
+};
+use crate::config::HudiConfigs;
+use crate::error::CoreError;
+use crate::merge::record_merger::RecordMerger;
+use strum::IntoEnumIterator;
+
+pub fn validate_configs(hudi_configs: &HudiConfigs) -> crate::error::Result<()> {
+ if hudi_configs
+ .get_or_default(SkipConfigValidation)
+ .to::<bool>()
+ {
+ return Ok(());
+ }
+
+ for conf in HudiTableConfig::iter() {
+ hudi_configs.validate(conf)?
+ }
+
+ for conf in HudiReadConfig::iter() {
+ hudi_configs.validate(conf)?
+ }
+
+ // additional validation
+ let table_version = hudi_configs.get(TableVersion)?.to::<isize>();
+ if !(5..=6).contains(&table_version) {
+ return Err(CoreError::Unsupported(
+ "Only support table version 5 and 6.".to_string(),
+ ));
+ }
+
+ let timeline_layout_version = hudi_configs.get(TimelineLayoutVersion)?.to::<isize>();
+ if timeline_layout_version != 1 {
+ return Err(CoreError::Unsupported(
+ "Only support timeline layout version 1.".to_string(),
+ ));
+ }
+
+ let drops_partition_cols = hudi_configs
+ .get_or_default(DropsPartitionFields)
+ .to::<bool>();
+ if drops_partition_cols {
+ return Err(CoreError::Unsupported(format!(
+ "Only support when `{}` is disabled",
+ DropsPartitionFields.as_ref()
+ )));
+ }
+
+ RecordMerger::validate_configs(hudi_configs)?;
+
+ Ok(())
+}
diff --git a/crates/core/src/util/mod.rs b/crates/core/src/util/collection.rs
similarity index 73%
copy from crates/core/src/util/mod.rs
copy to crates/core/src/util/collection.rs
index 68bc85e..8c66ac3 100644
--- a/crates/core/src/util/mod.rs
+++ b/crates/core/src/util/collection.rs
@@ -16,4 +16,15 @@
* specific language governing permissions and limitations
* under the License.
*/
-pub mod arrow;
+use std::collections::HashMap;
+use std::hash::Hash;
+
+pub fn extend_if_absent<K, V>(target: &mut HashMap<K, V>, source: &HashMap<K, V>)
+where
+ K: Eq + Hash + Clone,
+ V: Clone,
+{
+ for (key, value) in source {
+ target.entry(key.clone()).or_insert_with(|| value.clone());
+ }
+}
diff --git a/crates/core/src/util/mod.rs b/crates/core/src/util/mod.rs
index 68bc85e..c9aa85b 100644
--- a/crates/core/src/util/mod.rs
+++ b/crates/core/src/util/mod.rs
@@ -17,3 +17,4 @@
* specific language governing permissions and limitations
* under the License.
*/
pub mod arrow;
+pub mod collection;