This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
new f1c2818 feat: add config validation when creating table (#49)
f1c2818 is described below
commit f1c2818d212e3bca37c08c883508953875535718
Author: Shiyan Xu <[email protected]>
AuthorDate: Fri Jul 5 23:54:56 2024 -0500
feat: add config validation when creating table (#49)
Guard table creation by validating all recognizable hudi configs.
Fixes #40
---
crates/core/Cargo.toml | 1 +
crates/core/src/config/read.rs | 6 +-
crates/core/src/config/table.rs | 9 +--
crates/core/src/storage/mod.rs | 25 ++++----
crates/core/src/table/fs_view.rs | 6 +-
crates/core/src/table/mod.rs | 128 ++++++++++++++++++++++++++++++++------
crates/core/src/table/timeline.rs | 2 +-
7 files changed, 132 insertions(+), 45 deletions(-)
diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml
index 06e15a4..5e94cf3 100644
--- a/crates/core/Cargo.toml
+++ b/crates/core/Cargo.toml
@@ -61,6 +61,7 @@ bytes = { workspace = true }
chrono = { workspace = true, default-features = false, features = ["clock"] }
hashbrown = "0.14.3"
regex = { workspace = true }
+strum = { workspace = true }
strum_macros = { workspace = true }
uuid = { workspace = true, features = ["serde", "v4"] }
url = { workspace = true }
diff --git a/crates/core/src/config/read.rs b/crates/core/src/config/read.rs
index 43879de..2af57e8 100644
--- a/crates/core/src/config/read.rs
+++ b/crates/core/src/config/read.rs
@@ -20,11 +20,11 @@
use std::collections::HashMap;
use std::str::FromStr;
-use anyhow::{anyhow, Result};
-
use crate::config::{ConfigParser, HudiConfigValue};
+use anyhow::{anyhow, Result};
+use strum_macros::EnumIter;
-#[derive(Clone, Debug, PartialEq, Eq, Hash)]
+#[derive(Clone, Debug, PartialEq, Eq, Hash, EnumIter)]
pub enum HudiReadConfig {
InputPartitions,
}
diff --git a/crates/core/src/config/table.rs b/crates/core/src/config/table.rs
index d503e32..a23ecbd 100644
--- a/crates/core/src/config/table.rs
+++ b/crates/core/src/config/table.rs
@@ -22,11 +22,11 @@ use std::str::FromStr;
use anyhow::anyhow;
use anyhow::Result;
-use strum_macros::AsRefStr;
+use strum_macros::{AsRefStr, EnumIter};
use crate::config::{ConfigParser, HudiConfigValue};
-#[derive(Clone, Debug, PartialEq, Eq, Hash)]
+#[derive(Clone, Debug, PartialEq, Eq, Hash, EnumIter)]
pub enum HudiTableConfig {
BaseFileFormat,
Checksum,
@@ -80,10 +80,7 @@ impl ConfigParser for HudiTableConfig {
}
fn is_required(&self) -> bool {
- matches!(
- self,
- Self::BaseFileFormat | Self::TableName | Self::TableType |
Self::TableVersion
- )
+ matches!(self, Self::TableName | Self::TableType | Self::TableVersion)
}
fn parse_value(&self, configs: &HashMap<String, String>) ->
Result<Self::Output> {
diff --git a/crates/core/src/storage/mod.rs b/crates/core/src/storage/mod.rs
index 42cb147..374f334 100644
--- a/crates/core/src/storage/mod.rs
+++ b/crates/core/src/storage/mod.rs
@@ -41,21 +41,18 @@ pub mod file_info;
pub mod file_stats;
pub mod utils;
-#[allow(dead_code)]
#[derive(Clone, Debug)]
pub struct Storage {
base_url: Arc<Url>,
- options: Arc<HashMap<String, String>>,
object_store: Arc<dyn ObjectStore>,
}
impl Storage {
- pub fn new(base_url: Arc<Url>, options: Arc<HashMap<String, String>>) ->
Result<Arc<Storage>> {
- match parse_url_opts(&base_url, &*options) {
- Ok(object_store) => Ok(Arc::new(Storage {
+ pub fn new(base_url: Arc<Url>, options: &HashMap<String, String>) ->
Result<Arc<Storage>> {
+ match parse_url_opts(&base_url, options) {
+ Ok((object_store, _)) => Ok(Arc::new(Storage {
base_url,
- options,
- object_store: Arc::new(object_store.0),
+ object_store: Arc::new(object_store),
})),
Err(e) => Err(anyhow!("Failed to create storage: {}", e)),
}
@@ -216,7 +213,7 @@ mod tests {
canonicalize(Path::new("tests/data/timeline/commits_stub")).unwrap(),
)
.unwrap();
- let storage = Storage::new(Arc::new(base_url),
Arc::new(HashMap::new())).unwrap();
+ let storage = Storage::new(Arc::new(base_url),
&HashMap::new()).unwrap();
let first_level_dirs: HashSet<String> =
storage.list_dirs(None).await.unwrap().into_iter().collect();
assert_eq!(
@@ -238,7 +235,7 @@ mod tests {
canonicalize(Path::new("tests/data/timeline/commits_stub")).unwrap(),
)
.unwrap();
- let storage = Storage::new(Arc::new(base_url),
Arc::new(HashMap::new())).unwrap();
+ let storage = Storage::new(Arc::new(base_url),
&HashMap::new()).unwrap();
let first_level_dirs: HashSet<ObjPath> = storage
.list_dirs_as_obj_paths(None)
.await
@@ -261,7 +258,7 @@ mod tests {
canonicalize(Path::new("tests/data/timeline/commits_stub")).unwrap(),
)
.unwrap();
- let storage = Storage::new(Arc::new(base_url),
Arc::new(HashMap::new())).unwrap();
+ let storage = Storage::new(Arc::new(base_url),
&HashMap::new()).unwrap();
let file_info_1: Vec<FileInfo> = storage
.list_files(None)
.await
@@ -320,7 +317,7 @@ mod tests {
canonicalize(Path::new("tests/data/timeline/commits_stub")).unwrap(),
)
.unwrap();
- let storage = Storage::new(Arc::new(base_url),
Arc::new(HashMap::new())).unwrap();
+ let storage = Storage::new(Arc::new(base_url),
&HashMap::new()).unwrap();
let leaf_dirs = get_leaf_dirs(&storage, None).await.unwrap();
assert_eq!(
leaf_dirs,
@@ -333,7 +330,7 @@ mod tests {
let base_url =
Url::from_directory_path(canonicalize(Path::new("tests/data/leaf_dir")).unwrap())
.unwrap();
- let storage = Storage::new(Arc::new(base_url),
Arc::new(HashMap::new())).unwrap();
+ let storage = Storage::new(Arc::new(base_url),
&HashMap::new()).unwrap();
let leaf_dirs = get_leaf_dirs(&storage, None).await.unwrap();
assert_eq!(
leaf_dirs,
@@ -346,7 +343,7 @@ mod tests {
async fn storage_get_file_info() {
let base_url =
Url::from_directory_path(canonicalize(Path::new("tests/data")).unwrap()).unwrap();
- let storage = Storage::new(Arc::new(base_url),
Arc::new(HashMap::new())).unwrap();
+ let storage = Storage::new(Arc::new(base_url),
&HashMap::new()).unwrap();
let file_info = storage.get_file_info("a.parquet").await.unwrap();
assert_eq!(file_info.name, "a.parquet");
assert_eq!(
@@ -360,7 +357,7 @@ mod tests {
async fn storage_get_parquet_file_data() {
let base_url =
Url::from_directory_path(canonicalize(Path::new("tests/data")).unwrap()).unwrap();
- let storage = Storage::new(Arc::new(base_url),
Arc::new(HashMap::new())).unwrap();
+ let storage = Storage::new(Arc::new(base_url),
&HashMap::new()).unwrap();
let file_data =
storage.get_parquet_file_data("a.parquet").await.unwrap();
assert_eq!(file_data.num_rows(), 5);
}
diff --git a/crates/core/src/table/fs_view.rs b/crates/core/src/table/fs_view.rs
index 02d3ca4..bdc05fc 100644
--- a/crates/core/src/table/fs_view.rs
+++ b/crates/core/src/table/fs_view.rs
@@ -44,7 +44,7 @@ impl FileSystemView {
storage_options: Arc<HashMap<String, String>>,
configs: Arc<HudiConfigs>,
) -> Result<Self> {
- let storage = Storage::new(base_url, storage_options)?;
+ let storage = Storage::new(base_url, &storage_options)?;
let partition_paths = Self::load_partition_paths(&storage).await?;
let partition_to_file_groups =
Self::load_file_groups_for_partitions(&storage,
partition_paths).await?;
@@ -176,7 +176,7 @@ mod tests {
#[tokio::test]
async fn get_partition_paths_for_nonpartitioned_table() {
let base_url = TestTable::V6Nonpartitioned.url();
- let storage = Storage::new(Arc::new(base_url),
Arc::new(HashMap::new())).unwrap();
+ let storage = Storage::new(Arc::new(base_url),
&HashMap::new()).unwrap();
let partition_paths = FileSystemView::load_partition_paths(&storage)
.await
.unwrap();
@@ -188,7 +188,7 @@ mod tests {
#[tokio::test]
async fn get_partition_paths_for_complexkeygen_table() {
let base_url = TestTable::V6ComplexkeygenHivestyle.url();
- let storage = Storage::new(Arc::new(base_url),
Arc::new(HashMap::new())).unwrap();
+ let storage = Storage::new(Arc::new(base_url),
&HashMap::new()).unwrap();
let partition_paths = FileSystemView::load_partition_paths(&storage)
.await
.unwrap();
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index e09fd10..1493394 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -19,13 +19,20 @@
use std::collections::HashMap;
use std::io::{BufRead, BufReader};
+use std::str::FromStr;
use std::sync::Arc;
use anyhow::{anyhow, Context, Result};
use arrow::record_batch::RecordBatch;
use arrow_schema::Schema;
+use strum::IntoEnumIterator;
use url::Url;
+use HudiTableConfig::{DropsPartitionFields, TableType, TableVersion};
+use TableTypeValue::CopyOnWrite;
+
+use crate::config::read::HudiReadConfig;
+use crate::config::table::{HudiTableConfig, TableTypeValue};
use crate::config::HudiConfigs;
use crate::file_group::FileSlice;
use crate::storage::utils::parse_uri;
@@ -39,49 +46,57 @@ mod timeline;
#[derive(Clone, Debug)]
pub struct Table {
pub base_url: Arc<Url>,
- pub storage_options: Arc<HashMap<String, String>>,
pub configs: Arc<HudiConfigs>,
+ pub extra_options: Arc<HashMap<String, String>>,
pub timeline: Timeline,
pub file_system_view: FileSystemView,
}
impl Table {
- pub async fn new(base_uri: &str, storage_options: HashMap<String, String>)
-> Result<Self> {
+ pub async fn new(base_uri: &str, all_options: HashMap<String, String>) ->
Result<Self> {
let base_url = Arc::new(parse_uri(base_uri)?);
- let storage_options = Arc::new(storage_options);
- let configs = Self::load_properties(base_url.clone(),
storage_options.clone())
+ let (configs, extra_options) = Self::load_configs(base_url.clone(),
&all_options)
.await
.context("Failed to load table properties")?;
let configs = Arc::new(configs);
+ let extra_options = Arc::new(extra_options);
- let timeline = Timeline::new(base_url.clone(),
storage_options.clone(), configs.clone())
+ let timeline = Timeline::new(base_url.clone(), extra_options.clone(),
configs.clone())
.await
.context("Failed to load timeline")?;
let file_system_view =
- FileSystemView::new(base_url.clone(), storage_options.clone(),
configs.clone())
+ FileSystemView::new(base_url.clone(), extra_options.clone(),
configs.clone())
.await
.context("Failed to load file system view")?;
Ok(Table {
base_url,
- storage_options,
configs,
+ extra_options,
timeline,
file_system_view,
})
}
- async fn load_properties(
+ async fn load_configs(
base_url: Arc<Url>,
- storage_options: Arc<HashMap<String, String>>,
- ) -> Result<HudiConfigs> {
- let storage = Storage::new(base_url, storage_options)?;
+ all_options: &HashMap<String, String>,
+ ) -> Result<(HudiConfigs, HashMap<String, String>)> {
+ let mut hudi_options = HashMap::new();
+ let mut extra_options = HashMap::new();
+ for (k, v) in all_options {
+ if k.starts_with("hoodie.") {
+ hudi_options.insert(k.clone(), v.clone());
+ } else {
+ extra_options.insert(k.clone(), v.clone());
+ }
+ }
+ let storage = Storage::new(base_url, &extra_options)?;
let data = storage.get_file_data(".hoodie/hoodie.properties").await?;
let cursor = std::io::Cursor::new(data);
let lines = BufReader::new(cursor).lines();
- let mut properties: HashMap<String, String> = HashMap::new();
for line in lines {
let line = line?;
let trimmed_line = line.trim();
@@ -91,9 +106,54 @@ impl Table {
let mut parts = trimmed_line.splitn(2, '=');
let key = parts.next().unwrap().to_owned();
let value = parts.next().unwrap_or("").to_owned();
- properties.insert(key, value);
+ // `hoodie.properties` takes precedence TODO handle conflicts
where applicable
+ hudi_options.insert(key, value);
+ }
+ let hudi_configs = HudiConfigs::new(hudi_options);
+
+ Self::validate_configs(&hudi_configs, &extra_options).map(|_|
(hudi_configs, extra_options))
+ }
+
+ fn validate_configs(
+ hudi_configs: &HudiConfigs,
+ extra_options: &HashMap<String, String>,
+ ) -> Result<()> {
+ if extra_options
+ .get("hoodie_internal.skip.config.validation")
+ .and_then(|v| bool::from_str(v).ok())
+ .unwrap_or(false)
+ {
+ return Ok(());
+ }
+
+ for conf in HudiTableConfig::iter() {
+ hudi_configs.validate(conf)?
}
- Ok(HudiConfigs::new(properties))
+
+ for conf in HudiReadConfig::iter() {
+ hudi_configs.validate(conf)?
+ }
+
+ // additional validation
+ let table_type = hudi_configs.get(TableType)?.to::<String>();
+ if TableTypeValue::from_str(&table_type)? != CopyOnWrite {
+ return Err(anyhow!("Only support copy-on-write table."));
+ }
+
+ let table_version = hudi_configs.get(TableVersion)?.to::<isize>();
+ if !(5..=6).contains(&table_version) {
+ return Err(anyhow!("Only support table version 5 and 6."));
+ }
+
+ let drops_partition_cols =
hudi_configs.get(DropsPartitionFields)?.to::<bool>();
+ if drops_partition_cols {
+ return Err(anyhow!(
+ "Only support when `{}` is disabled",
+ DropsPartitionFields.as_ref()
+ ));
+ }
+
+ Ok(())
}
pub async fn get_schema(&self) -> Result<Schema> {
@@ -326,7 +386,15 @@ mod tests {
let base_url =
Url::from_file_path(canonicalize(Path::new("tests/data/table_props_invalid")).unwrap())
.unwrap();
- let table = Table::new(base_url.as_str(),
HashMap::new()).await.unwrap();
+ let table = Table::new(
+ base_url.as_str(),
+ HashMap::from_iter(vec![(
+ "hoodie_internal.skip.config.validation".to_string(),
+ "true".to_string(),
+ )]),
+ )
+ .await
+ .unwrap();
let configs = table.configs;
assert!(
configs.validate(BaseFileFormat).is_err(),
@@ -377,7 +445,15 @@ mod tests {
let base_url =
Url::from_file_path(canonicalize(Path::new("tests/data/table_props_invalid")).unwrap())
.unwrap();
- let table = Table::new(base_url.as_str(),
HashMap::new()).await.unwrap();
+ let table = Table::new(
+ base_url.as_str(),
+ HashMap::from_iter(vec![(
+ "hoodie_internal.skip.config.validation".to_string(),
+ "true".to_string(),
+ )]),
+ )
+ .await
+ .unwrap();
let configs = table.configs;
assert!(configs.get(BaseFileFormat).is_err());
assert!(configs.get(Checksum).is_err());
@@ -401,7 +477,15 @@ mod tests {
let base_url =
Url::from_file_path(canonicalize(Path::new("tests/data/table_props_invalid")).unwrap())
.unwrap();
- let table = Table::new(base_url.as_str(),
HashMap::new()).await.unwrap();
+ let table = Table::new(
+ base_url.as_str(),
+ HashMap::from_iter(vec![(
+ "hoodie_internal.skip.config.validation".to_string(),
+ "true".to_string(),
+ )]),
+ )
+ .await
+ .unwrap();
let configs = table.configs;
assert!(panic::catch_unwind(||
configs.get_or_default(BaseFileFormat)).is_err());
assert!(panic::catch_unwind(||
configs.get_or_default(Checksum)).is_err());
@@ -431,7 +515,15 @@ mod tests {
let base_url =
Url::from_file_path(canonicalize(Path::new("tests/data/table_props_valid")).unwrap())
.unwrap();
- let table = Table::new(base_url.as_str(),
HashMap::new()).await.unwrap();
+ let table = Table::new(
+ base_url.as_str(),
+ HashMap::from_iter(vec![(
+ "hoodie_internal.skip.config.validation".to_string(),
+ "true".to_string(),
+ )]),
+ )
+ .await
+ .unwrap();
let configs = table.configs;
assert_eq!(
configs.get(BaseFileFormat).unwrap().to::<String>(),
diff --git a/crates/core/src/table/timeline.rs
b/crates/core/src/table/timeline.rs
index d2d476d..9010738 100644
--- a/crates/core/src/table/timeline.rs
+++ b/crates/core/src/table/timeline.rs
@@ -88,7 +88,7 @@ impl Timeline {
storage_options: Arc<HashMap<String, String>>,
configs: Arc<HudiConfigs>,
) -> Result<Self> {
- let storage = Storage::new(base_url, storage_options)?;
+ let storage = Storage::new(base_url, &storage_options)?;
let instants = Self::load_completed_commit_instants(&storage).await?;
Ok(Self {
storage,