This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 01ef2fc  feat: support loading hudi global configs (#118)
01ef2fc is described below

commit 01ef2fc196b77a4a55a8d2f6b6f74fd011938968
Author: zzhpro <[email protected]>
AuthorDate: Tue Sep 10 15:02:20 2024 +0800

    feat: support loading hudi global configs (#118)
    
    Load `hudi-defaults.conf` from `HUDI_CONF_DIR` when creating table instances.
    
    ---------
    
    Co-authored-by: zzhpro <[email protected]>
    Co-authored-by: Shiyan Xu <[email protected]>
---
 crates/core/src/config/mod.rs                      |   2 +
 crates/core/src/storage/mod.rs                     |   7 +
 crates/core/src/storage/utils.rs                   |  30 ++++-
 crates/core/src/table/mod.rs                       | 144 +++++++++++++++++----
 .../tests/data/hudi_conf_dir/hudi-defaults.conf    |  22 ++++
 .../table_props_partial/.hoodie/hoodie.properties  |  37 ++++++
 6 files changed, 215 insertions(+), 27 deletions(-)

diff --git a/crates/core/src/config/mod.rs b/crates/core/src/config/mod.rs
index 2b37dc7..3ba5750 100644
--- a/crates/core/src/config/mod.rs
+++ b/crates/core/src/config/mod.rs
@@ -26,6 +26,8 @@ pub mod internal;
 pub mod read;
 pub mod table;
 
+pub const HUDI_CONF_DIR: &str = "HUDI_CONF_DIR";
+
 pub trait ConfigParser: AsRef<str> {
     type Output;
 
diff --git a/crates/core/src/storage/mod.rs b/crates/core/src/storage/mod.rs
index c7eb1ec..5ddfde4 100644
--- a/crates/core/src/storage/mod.rs
+++ b/crates/core/src/storage/mod.rs
@@ -101,6 +101,13 @@ impl Storage {
         Ok(bytes)
     }
 
+    pub async fn get_file_data_from_absolute_path(&self, absolute_path: &str) -> Result<Bytes> {
+        let obj_path = ObjPath::from_absolute_path(PathBuf::from(absolute_path))?;
+        let result = self.object_store.get(&obj_path).await?;
+        let bytes = result.bytes().await?;
+        Ok(bytes)
+    }
+
     pub async fn get_parquet_file_data(&self, relative_path: &str) -> Result<RecordBatch> {
         let obj_url = join_url_segments(&self.base_url, &[relative_path])?;
         let obj_path = ObjPath::from_url_path(obj_url.path())?;
diff --git a/crates/core/src/storage/utils.rs b/crates/core/src/storage/utils.rs
index a38f813..80c86c6 100644
--- a/crates/core/src/storage/utils.rs
+++ b/crates/core/src/storage/utils.rs
@@ -16,11 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
+use std::collections::HashMap;
+use std::io::{BufRead, BufReader, Cursor};
 use std::path::{Path, PathBuf};
 use std::str::FromStr;
 
-use anyhow::{anyhow, Result};
+use anyhow::{anyhow, Context, Result};
+use bytes::Bytes;
 use url::{ParseError, Url};
 
 pub fn split_filename(filename: &str) -> Result<(String, String)> {
@@ -80,6 +82,30 @@ pub fn empty_options<'a>() -> std::iter::Empty<(&'a str, &'a str)> {
     std::iter::empty::<(&str, &str)>()
 }
 
+pub async fn parse_config_data(data: &Bytes, split_chars: &str) -> Result<HashMap<String, String>> {
+    let cursor = Cursor::new(data);
+    let lines = BufReader::new(cursor).lines();
+    let mut configs = HashMap::new();
+
+    for line in lines {
+        let line = line.context("Failed to read line")?;
+        let trimmed_line = line.trim();
+        if trimmed_line.is_empty() || trimmed_line.starts_with('#') {
+            continue;
+        }
+        let mut parts = trimmed_line.splitn(2, |c| split_chars.contains(c));
+        let key = parts
+            .next()
+            .context("Missing key in config line")?
+            .trim()
+            .to_owned();
+        let value = parts.next().unwrap_or("").trim().to_owned();
+        configs.insert(key, value);
+    }
+
+    Ok(configs)
+}
+
 #[cfg(test)]
 mod tests {
     use std::str::FromStr;
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index e4e1a93..82df7ed 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -19,7 +19,7 @@
 
 use std::collections::HashMap;
 use std::env;
-use std::io::{BufRead, BufReader};
+use std::path::PathBuf;
 use std::str::FromStr;
 use std::sync::Arc;
 
@@ -38,8 +38,9 @@ use crate::config::read::HudiReadConfig;
 use crate::config::read::HudiReadConfig::AsOfTimestamp;
 use crate::config::table::{HudiTableConfig, TableTypeValue};
 use crate::config::HudiConfigs;
+use crate::config::HUDI_CONF_DIR;
 use crate::file_group::FileSlice;
-use crate::storage::utils::{empty_options, parse_uri};
+use crate::storage::utils::{empty_options, parse_config_data, parse_uri};
 use crate::storage::Storage;
 use crate::table::fs_view::FileSystemView;
 use crate::table::timeline::Timeline;
@@ -115,7 +116,6 @@ impl Table {
         K: AsRef<str>,
         V: Into<String>,
     {
-        // TODO: load hudi global config
         let mut hudi_options = HashMap::new();
         let mut extra_options = HashMap::new();
 
@@ -128,34 +128,68 @@ impl Table {
                 extra_options.insert(k.as_ref().to_string(), v.into());
             }
         }
+
         let storage = Storage::new(base_url, &extra_options)?;
-        let data = storage.get_file_data(".hoodie/hoodie.properties").await?;
-        let cursor = std::io::Cursor::new(data);
-        let lines = BufReader::new(cursor).lines();
-        for line in lines {
-            let line = line?;
-            let trimmed_line = line.trim();
-            if trimmed_line.is_empty() || trimmed_line.starts_with('#') {
-                continue;
-            }
-            let mut parts = trimmed_line.splitn(2, '=');
-            let key = parts.next().unwrap().to_owned();
-            let value = parts.next().unwrap_or("").to_owned();
-            // `hoodie.properties` takes precedence TODO handle conflicts where applicable
-            hudi_options.insert(key, value);
-        }
+
+        Self::imbue_table_properties(&mut hudi_options, storage.clone()).await?;
+
+        Self::imbue_global_hudi_configs(&mut hudi_options, storage.clone()).await?;
+
         let hudi_configs = HudiConfigs::new(hudi_options);
 
         Self::validate_configs(&hudi_configs).map(|_| (hudi_configs, extra_options))
     }
 
     fn imbue_cloud_env_vars(options: &mut HashMap<String, String>) {
-        let prefixes = ["AWS_", "AZURE_", "GOOGLE_"];
-        options.extend(
-            env::vars()
-                .filter(|(key, _)| prefixes.iter().any(|prefix| key.starts_with(prefix)))
-                .map(|(k, v)| (k.to_ascii_lowercase(), v)),
-        );
+        const PREFIXES: [&str; 3] = ["AWS_", "AZURE_", "GOOGLE_"];
+
+        for (key, value) in env::vars() {
+            if PREFIXES.iter().any(|prefix| key.starts_with(prefix))
+                && !options.contains_key(&key.to_ascii_lowercase())
+            {
+                options.insert(key.to_ascii_lowercase(), value);
+            }
+        }
+    }
+
+    async fn imbue_table_properties(
+        options: &mut HashMap<String, String>,
+        storage: Arc<Storage>,
+    ) -> Result<()> {
+        let bytes = storage.get_file_data(".hoodie/hoodie.properties").await?;
+        let table_properties = parse_config_data(&bytes, "=").await?;
+
+        // TODO: handle the case where the same key is present in both table properties and options
+        for (k, v) in table_properties {
+            options.insert(k.to_string(), v.to_string());
+        }
+
+        Ok(())
+    }
+
+    async fn imbue_global_hudi_configs(
+        options: &mut HashMap<String, String>,
+        storage: Arc<Storage>,
+    ) -> Result<()> {
+        let global_config_path = env::var(HUDI_CONF_DIR)
+            .map(PathBuf::from)
+            .unwrap_or_else(|_| PathBuf::from("/etc/hudi/conf"))
+            .join("hudi-defaults.conf");
+
+        if let Ok(bytes) = storage
+            .get_file_data_from_absolute_path(global_config_path.to_str().unwrap())
+            .await
+        {
+            if let Ok(global_configs) = parse_config_data(&bytes, " \t=").await {
+                for (key, value) in global_configs {
+                    if key.starts_with("hoodie.") && !options.contains_key(&key) {
+                        options.insert(key.to_string(), value.to_string());
+                    }
+                }
+            }
+        }
+
+        Ok(())
     }
 
     fn validate_configs(hudi_configs: &HudiConfigs) -> Result<()> {
@@ -278,8 +312,8 @@ impl Table {
 mod tests {
     use std::collections::HashSet;
     use std::fs::canonicalize;
-    use std::panic;
     use std::path::Path;
+    use std::{env, panic};
 
     use url::Url;
 
@@ -292,6 +326,7 @@ mod tests {
         PrecombineField, RecordKeyFields, TableName, TableType, TableVersion,
         TimelineLayoutVersion,
     };
+    use crate::config::HUDI_CONF_DIR;
     use crate::storage::utils::join_url_segments;
     use crate::table::Table;
 
@@ -599,4 +634,63 @@ mod tests {
         assert_eq!(configs.get(TableVersion).unwrap().to::<isize>(), 6);
+        assert_eq!(configs.get(TimelineLayoutVersion).unwrap().to::<isize>(), 1);
     }
+
+    #[tokio::test]
+    async fn get_global_table_props() {
+        // Without the environment variable HUDI_CONF_DIR
+        let base_url =
+            Url::from_file_path(canonicalize(Path::new("tests/data/table_props_partial")).unwrap())
+                .unwrap();
+        let table = Table::new_with_options(
+            base_url.as_str(),
+            [("hoodie.internal.skip.config.validation", "true")],
+        )
+        .await
+        .unwrap();
+        let configs = table.configs;
+        assert!(configs.get(DatabaseName).is_err());
+        assert!(configs.get(TableType).is_err());
+        assert_eq!(configs.get(TableName).unwrap().to::<String>(), "trips");
+
+        // Environment variable HUDI_CONF_DIR points to nothing
+        let base_path = env::current_dir().unwrap();
+        let hudi_conf_dir = base_path.join("random/wrong/dir");
+        env::set_var(HUDI_CONF_DIR, hudi_conf_dir.as_os_str());
+        let base_url =
+            Url::from_file_path(canonicalize(Path::new("tests/data/table_props_partial")).unwrap())
+                .unwrap();
+        let table = Table::new_with_options(
+            base_url.as_str(),
+            [("hoodie.internal.skip.config.validation", "true")],
+        )
+        .await
+        .unwrap();
+        let configs = table.configs;
+        assert!(configs.get(DatabaseName).is_err());
+        assert!(configs.get(TableType).is_err());
+        assert_eq!(configs.get(TableName).unwrap().to::<String>(), "trips");
+
+        // With global config
+        let base_path = env::current_dir().unwrap();
+        let hudi_conf_dir = base_path.join("tests/data/hudi_conf_dir");
+        env::set_var(HUDI_CONF_DIR, hudi_conf_dir.as_os_str());
+
+        let base_url =
+            Url::from_file_path(canonicalize(Path::new("tests/data/table_props_partial")).unwrap())
+                .unwrap();
+        let table = Table::new_with_options(
+            base_url.as_str(),
+            [("hoodie.internal.skip.config.validation", "true")],
+        )
+        .await
+        .unwrap();
+        let configs = table.configs;
+        assert_eq!(configs.get(DatabaseName).unwrap().to::<String>(), "tmpdb");
+        assert_eq!(
+            configs.get(TableType).unwrap().to::<String>(),
+            "MERGE_ON_READ"
+        );
+        assert_eq!(configs.get(TableName).unwrap().to::<String>(), "trips");
+        env::remove_var(HUDI_CONF_DIR)
+    }
 }
diff --git a/crates/core/tests/data/hudi_conf_dir/hudi-defaults.conf b/crates/core/tests/data/hudi_conf_dir/hudi-defaults.conf
new file mode 100644
index 0000000..d264bc0
--- /dev/null
+++ b/crates/core/tests/data/hudi_conf_dir/hudi-defaults.conf
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Default system properties included when running Hudi jobs.
+# This is useful for setting default environmental settings.
+
+hoodie.database.name        tmpdb
+hoodie.table.type= mor
\ No newline at end of file
diff --git a/crates/core/tests/data/table_props_partial/.hoodie/hoodie.properties b/crates/core/tests/data/table_props_partial/.hoodie/hoodie.properties
new file mode 100644
index 0000000..3ceea57
--- /dev/null
+++ b/crates/core/tests/data/table_props_partial/.hoodie/hoodie.properties
@@ -0,0 +1,37 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+hoodie.table.metadata.partitions=files
+hoodie.table.precombine.field=ts
+hoodie.table.partition.fields=city
+hoodie.archivelog.folder=archived
+hoodie.table.cdc.enabled=false
+hoodie.timeline.layout.version=1
+hoodie.table.checksum=3761586722
+hoodie.datasource.write.drop.partition.columns=false
+hoodie.table.recordkey.fields=uuid
+hoodie.table.name=trips
+hoodie.partition.metafile.use.base.format=false
+hoodie.datasource.write.hive_style_partitioning=false
+hoodie.table.metadata.partitions.inflight=
+hoodie.populate.meta.fields=true
+hoodie.table.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator
+hoodie.table.base.file.format=PARQUET
+hoodie.datasource.write.partitionpath.urlencode=false
+hoodie.table.version=6

Reply via email to