This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 01ef2fc feat: support loading hudi global configs (#118)
01ef2fc is described below
commit 01ef2fc196b77a4a55a8d2f6b6f74fd011938968
Author: zzhpro <[email protected]>
AuthorDate: Tue Sep 10 15:02:20 2024 +0800
feat: support loading hudi global configs (#118)
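Load `hudi-defaults.conf` from the directory named by the `HUDI_CONF_DIR`
environment variable (default: `/etc/hudi/conf`) when creating table
instances. Only `hoodie.*` entries are applied, and only for keys that are
not already set.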
---------
Co-authored-by: zzhpro <[email protected]>
Co-authored-by: Shiyan Xu <[email protected]>
---
crates/core/src/config/mod.rs | 2 +
crates/core/src/storage/mod.rs | 7 +
crates/core/src/storage/utils.rs | 30 ++++-
crates/core/src/table/mod.rs | 144 +++++++++++++++++----
.../tests/data/hudi_conf_dir/hudi-defaults.conf | 22 ++++
.../table_props_partial/.hoodie/hoodie.properties | 37 ++++++
6 files changed, 215 insertions(+), 27 deletions(-)
diff --git a/crates/core/src/config/mod.rs b/crates/core/src/config/mod.rs
index 2b37dc7..3ba5750 100644
--- a/crates/core/src/config/mod.rs
+++ b/crates/core/src/config/mod.rs
@@ -26,6 +26,8 @@ pub mod internal;
pub mod read;
pub mod table;
+pub const HUDI_CONF_DIR: &str = "HUDI_CONF_DIR";
+
pub trait ConfigParser: AsRef<str> {
type Output;
diff --git a/crates/core/src/storage/mod.rs b/crates/core/src/storage/mod.rs
index c7eb1ec..5ddfde4 100644
--- a/crates/core/src/storage/mod.rs
+++ b/crates/core/src/storage/mod.rs
@@ -101,6 +101,13 @@ impl Storage {
Ok(bytes)
}
+    /// Reads the full contents of the object addressed by `absolute_path`.
+    ///
+    /// The path is used as given rather than being resolved against this storage's
+    /// base URL: it is converted with `ObjPath::from_absolute_path` and fetched from
+    /// `self.object_store`.
+    ///
+    /// # Errors
+    ///
+    /// Fails if the path cannot be converted to an object path or if the object
+    /// cannot be fetched or read.
+    pub async fn get_file_data_from_absolute_path(&self, absolute_path: &str) -> Result<Bytes> {
+        let fs_path = PathBuf::from(absolute_path);
+        let object_path = ObjPath::from_absolute_path(fs_path)?;
+        Ok(self.object_store.get(&object_path).await?.bytes().await?)
+    }
+
pub async fn get_parquet_file_data(&self, relative_path: &str) ->
Result<RecordBatch> {
let obj_url = join_url_segments(&self.base_url, &[relative_path])?;
let obj_path = ObjPath::from_url_path(obj_url.path())?;
diff --git a/crates/core/src/storage/utils.rs b/crates/core/src/storage/utils.rs
index a38f813..80c86c6 100644
--- a/crates/core/src/storage/utils.rs
+++ b/crates/core/src/storage/utils.rs
@@ -16,11 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-
+use std::collections::HashMap;
+use std::io::{BufRead, BufReader, Cursor};
use std::path::{Path, PathBuf};
use std::str::FromStr;
-use anyhow::{anyhow, Result};
+use anyhow::{anyhow, Context, Result};
+use bytes::Bytes;
use url::{ParseError, Url};
pub fn split_filename(filename: &str) -> Result<(String, String)> {
@@ -80,6 +82,30 @@ pub fn empty_options<'a>() -> std::iter::Empty<(&'a str, &'a
str)> {
std::iter::empty::<(&str, &str)>()
}
+/// Parses properties-style `key<separator>value` lines into a map.
+///
+/// Blank lines and lines whose first non-whitespace character is `#` are
+/// skipped. Every other line is split at the first character contained in
+/// `split_chars`, and surrounding whitespace is trimmed from the key and the
+/// value. If that first separator is whitespace, one explicit separator that
+/// follows it is also consumed. So `key = value` yields `value` rather than
+/// `= value`, the same rule `java.util.Properties` applies. A line with no
+/// separator maps its key to an empty value. A repeated key keeps the value
+/// from its last occurrence.
+///
+/// The function performs no asynchronous work; it is `async` only because
+/// existing callers `.await` it.
+///
+/// # Errors
+///
+/// Returns an error if `data` cannot be read as UTF-8 text.
+pub async fn parse_config_data(
+    data: &Bytes,
+    split_chars: &str,
+) -> Result<HashMap<String, String>> {
+    let mut configs = HashMap::new();
+    for line in BufReader::new(Cursor::new(data)).lines() {
+        let line = line.context("Failed to read line")?;
+        let trimmed_line = line.trim();
+        if trimmed_line.is_empty() || trimmed_line.starts_with('#') {
+            continue;
+        }
+        let (key, value) = split_key_value(trimmed_line, split_chars);
+        configs.insert(key.to_owned(), value.to_owned());
+    }
+    Ok(configs)
+}
+
+/// Splits one trimmed, non-comment config line into a `(key, value)` pair
+/// according to the rules documented on [`parse_config_data`].
+fn split_key_value<'a>(line: &'a str, split_chars: &str) -> (&'a str, &'a str) {
+    let is_separator = |c: char| split_chars.contains(c);
+    match line.char_indices().find(|&(_, c)| is_separator(c)) {
+        None => (line.trim(), ""),
+        Some((idx, separator)) => {
+            let key = line[..idx].trim();
+            let mut value = line[idx + separator.len_utf8()..].trim_start();
+            // A whitespace separator only ended the key. As in `key = value`,
+            // one explicit separator may still precede the value.
+            if separator.is_whitespace() {
+                if let Some(next) = value.chars().next().filter(|&c| is_separator(c)) {
+                    value = &value[next.len_utf8()..];
+                }
+            }
+            (key, value.trim())
+        }
+    }
+}
+
#[cfg(test)]
mod tests {
use std::str::FromStr;
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index e4e1a93..82df7ed 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -19,7 +19,7 @@
use std::collections::HashMap;
use std::env;
-use std::io::{BufRead, BufReader};
+use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
@@ -38,8 +38,9 @@ use crate::config::read::HudiReadConfig;
use crate::config::read::HudiReadConfig::AsOfTimestamp;
use crate::config::table::{HudiTableConfig, TableTypeValue};
use crate::config::HudiConfigs;
+use crate::config::HUDI_CONF_DIR;
use crate::file_group::FileSlice;
-use crate::storage::utils::{empty_options, parse_uri};
+use crate::storage::utils::{empty_options, parse_config_data, parse_uri};
use crate::storage::Storage;
use crate::table::fs_view::FileSystemView;
use crate::table::timeline::Timeline;
@@ -115,7 +116,6 @@ impl Table {
K: AsRef<str>,
V: Into<String>,
{
- // TODO: load hudi global config
let mut hudi_options = HashMap::new();
let mut extra_options = HashMap::new();
@@ -128,34 +128,68 @@ impl Table {
extra_options.insert(k.as_ref().to_string(), v.into());
}
}
+
let storage = Storage::new(base_url, &extra_options)?;
- let data = storage.get_file_data(".hoodie/hoodie.properties").await?;
- let cursor = std::io::Cursor::new(data);
- let lines = BufReader::new(cursor).lines();
- for line in lines {
- let line = line?;
- let trimmed_line = line.trim();
- if trimmed_line.is_empty() || trimmed_line.starts_with('#') {
- continue;
- }
- let mut parts = trimmed_line.splitn(2, '=');
- let key = parts.next().unwrap().to_owned();
- let value = parts.next().unwrap_or("").to_owned();
- // `hoodie.properties` takes precedence TODO handle conflicts
where applicable
- hudi_options.insert(key, value);
- }
+
+ Self::imbue_table_properties(&mut hudi_options,
storage.clone()).await?;
+
+ Self::imbue_global_hudi_configs(&mut hudi_options,
storage.clone()).await?;
+
let hudi_configs = HudiConfigs::new(hudi_options);
Self::validate_configs(&hudi_configs).map(|_| (hudi_configs,
extra_options))
}
+    /// Copies environment variables whose names start with `AWS_`, `AZURE_`, or
+    /// `GOOGLE_` into `options`, keyed by the lowercased variable name.
+    ///
+    /// Entries already present in `options` are kept, so explicitly passed
+    /// options take precedence over the environment. Variables whose name or
+    /// value is not valid Unicode are skipped.
     fn imbue_cloud_env_vars(options: &mut HashMap<String, String>) {
-        let prefixes = ["AWS_", "AZURE_", "GOOGLE_"];
-        options.extend(
-            env::vars()
-                .filter(|(key, _)| prefixes.iter().any(|prefix| key.starts_with(prefix)))
-                .map(|(k, v)| (k.to_ascii_lowercase(), v)),
-        );
+        const PREFIXES: [&str; 3] = ["AWS_", "AZURE_", "GOOGLE_"];
+
+        // `env::vars()` panics if any variable in the process environment is not
+        // valid Unicode, so iterate the OS strings and skip such entries instead.
+        for (key, value) in env::vars_os() {
+            let (key, value) = match (key.into_string(), value.into_string()) {
+                (Ok(key), Ok(value)) => (key, value),
+                _ => continue,
+            };
+            if PREFIXES.iter().any(|prefix| key.starts_with(prefix)) {
+                // Single lookup that keeps any value the caller already supplied.
+                options.entry(key.to_ascii_lowercase()).or_insert(value);
+            }
+        }
+    }
+
+    /// Loads `.hoodie/hoodie.properties` from the table's storage and merges its
+    /// `key=value` entries into `options`.
+    ///
+    /// Table properties overwrite any value already present for the same key.
+    ///
+    /// # Errors
+    ///
+    /// Fails if the properties file cannot be read or parsed.
+    async fn imbue_table_properties(
+        options: &mut HashMap<String, String>,
+        storage: Arc<Storage>,
+    ) -> Result<()> {
+        let bytes = storage.get_file_data(".hoodie/hoodie.properties").await?;
+        let table_properties = parse_config_data(&bytes, "=").await?;
+
+        // TODO: handle the case where the same key is present in both table
+        // properties and options
+        // The parsed keys and values are already owned, so move them in directly.
+        options.extend(table_properties);
+
+        Ok(())
+    }
+
+    /// Merges `hoodie.*` entries from the global `hudi-defaults.conf` into
+    /// `options`, never overriding a key that is already present.
+    ///
+    /// The file is looked up in the directory named by the `HUDI_CONF_DIR`
+    /// environment variable. It falls back to `/etc/hudi/conf` when that variable
+    /// is unset or not valid Unicode. Lines are split at the first space, tab, or
+    /// `=`. Loading is best effort: if the file cannot be read or parsed,
+    /// `options` is left unchanged and `Ok(())` is returned.
+    async fn imbue_global_hudi_configs(
+        options: &mut HashMap<String, String>,
+        storage: Arc<Storage>,
+    ) -> Result<()> {
+        const DEFAULT_HUDI_CONF_DIR: &str = "/etc/hudi/conf";
+        const GLOBAL_CONFIG_FILE_NAME: &str = "hudi-defaults.conf";
+
+        let global_config_path = env::var(HUDI_CONF_DIR)
+            .map(PathBuf::from)
+            .unwrap_or_else(|_| PathBuf::from(DEFAULT_HUDI_CONF_DIR))
+            .join(GLOBAL_CONFIG_FILE_NAME);
+
+        // NOTE(review): the absolute path is resolved through the table's object
+        // store. For a non-local table this may not read the local file system;
+        // confirm this is intended.
+        let global_configs = match global_config_path.to_str() {
+            Some(path) => match storage.get_file_data_from_absolute_path(path).await {
+                Ok(bytes) => parse_config_data(&bytes, " \t=").await.ok(),
+                Err(_) => None,
+            },
+            // Not expected: the path is built only from UTF-8 components.
+            None => None,
+        };
+
+        for (key, value) in global_configs.into_iter().flatten() {
+            if key.starts_with("hoodie.") {
+                // Single lookup. Values from table properties or caller options win.
+                options.entry(key).or_insert(value);
+            }
+        }
+
+        Ok(())
     }
fn validate_configs(hudi_configs: &HudiConfigs) -> Result<()> {
@@ -278,8 +312,8 @@ impl Table {
mod tests {
use std::collections::HashSet;
use std::fs::canonicalize;
- use std::panic;
use std::path::Path;
+ use std::{env, panic};
use url::Url;
@@ -292,6 +326,7 @@ mod tests {
PrecombineField, RecordKeyFields, TableName, TableType, TableVersion,
TimelineLayoutVersion,
};
+ use crate::config::HUDI_CONF_DIR;
use crate::storage::utils::join_url_segments;
use crate::table::Table;
@@ -599,4 +634,63 @@ mod tests {
assert_eq!(configs.get(TableVersion).unwrap().to::<isize>(), 6);
assert_eq!(configs.get(TimelineLayoutVersion).unwrap().to::<isize>(),
1);
}
+
+    #[tokio::test]
+    async fn get_global_table_props() {
+        /// Restores `HUDI_CONF_DIR` to its pre-test value when dropped, so the
+        /// override does not leak into later tests even if an assertion panics.
+        struct HudiConfDirGuard(Option<std::ffi::OsString>);
+
+        impl Drop for HudiConfDirGuard {
+            fn drop(&mut self) {
+                match self.0.take() {
+                    Some(original) => env::set_var(HUDI_CONF_DIR, original),
+                    None => env::remove_var(HUDI_CONF_DIR),
+                }
+            }
+        }
+
+        /// Opens the `table_props_partial` fixture with config validation skipped.
+        async fn load_partial_table() -> Table {
+            let base_url = Url::from_file_path(
+                canonicalize(Path::new("tests/data/table_props_partial")).unwrap(),
+            )
+            .unwrap();
+            Table::new_with_options(
+                base_url.as_str(),
+                [("hoodie.internal.skip.config.validation", "true")],
+            )
+            .await
+            .unwrap()
+        }
+
+        // NOTE(review): tests run on parallel threads of one process, so other
+        // tests that open tables can still observe HUDI_CONF_DIR while it is set.
+        let _guard = HudiConfDirGuard(env::var_os(HUDI_CONF_DIR));
+
+        // Without the environment variable HUDI_CONF_DIR. Clear it explicitly
+        // because the developer's shell may define it. The default
+        // /etc/hudi/conf/hudi-defaults.conf is assumed not to exist here.
+        env::remove_var(HUDI_CONF_DIR);
+        let configs = load_partial_table().await.configs;
+        assert!(configs.get(DatabaseName).is_err());
+        assert!(configs.get(TableType).is_err());
+        assert_eq!(configs.get(TableName).unwrap().to::<String>(), "trips");
+
+        // Environment variable HUDI_CONF_DIR points to nothing
+        let hudi_conf_dir = env::current_dir().unwrap().join("random/wrong/dir");
+        env::set_var(HUDI_CONF_DIR, hudi_conf_dir.as_os_str());
+        let configs = load_partial_table().await.configs;
+        assert!(configs.get(DatabaseName).is_err());
+        assert!(configs.get(TableType).is_err());
+        assert_eq!(configs.get(TableName).unwrap().to::<String>(), "trips");
+
+        // With global config
+        let hudi_conf_dir = env::current_dir()
+            .unwrap()
+            .join("tests/data/hudi_conf_dir");
+        env::set_var(HUDI_CONF_DIR, hudi_conf_dir.as_os_str());
+        let configs = load_partial_table().await.configs;
+        assert_eq!(configs.get(DatabaseName).unwrap().to::<String>(), "tmpdb");
+        assert_eq!(
+            configs.get(TableType).unwrap().to::<String>(),
+            "MERGE_ON_READ"
+        );
+        assert_eq!(configs.get(TableName).unwrap().to::<String>(), "trips");
+    }
}
diff --git a/crates/core/tests/data/hudi_conf_dir/hudi-defaults.conf
b/crates/core/tests/data/hudi_conf_dir/hudi-defaults.conf
new file mode 100644
index 0000000..d264bc0
--- /dev/null
+++ b/crates/core/tests/data/hudi_conf_dir/hudi-defaults.conf
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Default system properties included when running Hudi jobs.
+# This is useful for setting default environmental settings.
+
+hoodie.database.name tmpdb
+hoodie.table.type= mor
\ No newline at end of file
diff --git
a/crates/core/tests/data/table_props_partial/.hoodie/hoodie.properties
b/crates/core/tests/data/table_props_partial/.hoodie/hoodie.properties
new file mode 100644
index 0000000..3ceea57
--- /dev/null
+++ b/crates/core/tests/data/table_props_partial/.hoodie/hoodie.properties
@@ -0,0 +1,37 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+hoodie.table.metadata.partitions=files
+hoodie.table.precombine.field=ts
+hoodie.table.partition.fields=city
+hoodie.archivelog.folder=archived
+hoodie.table.cdc.enabled=false
+hoodie.timeline.layout.version=1
+hoodie.table.checksum=3761586722
+hoodie.datasource.write.drop.partition.columns=false
+hoodie.table.recordkey.fields=uuid
+hoodie.table.name=trips
+hoodie.partition.metafile.use.base.format=false
+hoodie.datasource.write.hive_style_partitioning=false
+hoodie.table.metadata.partitions.inflight=
+hoodie.populate.meta.fields=true
+hoodie.table.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator
+hoodie.table.base.file.format=PARQUET
+hoodie.datasource.write.partitionpath.urlencode=false
+hoodie.table.version=6