This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 2ef9557  refactor: improve APIs for handling options (#161)
2ef9557 is described below

commit 2ef95573c7ce6ec2937f5a3724b1bb4c426533be
Author: Shiyan Xu <[email protected]>
AuthorDate: Fri Oct 11 21:39:21 2024 -1000

    refactor: improve APIs for handling options (#161)
    
    - Refactor Config APIs to handle options better
    - Refactor Table and Storage APIs to improve option passing
    - Add a new table config `hoodie.base.path` to carry `base_uri` info
    - Improve variable names
      - When we say configs, it should refer to `HudiConfigs`
    - Let's call string configs in key-value pairs "options"; they can be 
storage options, hudi options, etc.
---
 crates/core/src/config/mod.rs      | 114 ++++++++++++++++++++--
 crates/core/src/config/table.rs    |   7 +-
 crates/core/src/config/utils.rs    | 192 +++++++++++++++++++++++++++++++++++++
 crates/core/src/file_group/mod.rs  |   8 +-
 crates/core/src/storage/mod.rs     | 102 ++++++++++++++------
 crates/core/src/storage/utils.rs   |  33 +------
 crates/core/src/table/fs_view.rs   |  61 ++++++------
 crates/core/src/table/mod.rs       |  98 ++++++++++---------
 crates/core/src/table/partition.rs |  14 +--
 crates/core/src/table/timeline.rs  |  44 ++++-----
 crates/datafusion/src/lib.rs       |  13 ++-
 python/hudi/_internal.pyi          |   2 +-
 python/src/internal.rs             |  39 ++++----
 13 files changed, 512 insertions(+), 215 deletions(-)

diff --git a/crates/core/src/config/mod.rs b/crates/core/src/config/mod.rs
index 2893c19..580be44 100644
--- a/crates/core/src/config/mod.rs
+++ b/crates/core/src/config/mod.rs
@@ -21,11 +21,14 @@ use std::any::type_name;
 use std::collections::HashMap;
 use std::sync::Arc;
 
+use crate::storage::utils::parse_uri;
 use anyhow::Result;
+use url::Url;
 
 pub mod internal;
 pub mod read;
 pub mod table;
+pub mod utils;
 
 pub const HUDI_CONF_DIR: &str = "HUDI_CONF_DIR";
 
@@ -92,6 +95,17 @@ impl HudiConfigValue {
     pub fn to<T: 'static + std::fmt::Debug + From<HudiConfigValue>>(self) -> T 
{
         T::from(self)
     }
+
+    pub fn to_url(self) -> Result<Url> {
+        match self {
+            HudiConfigValue::String(v) => parse_uri(&v),
+            _ => panic!(
+                "Cannot cast {:?} to {}",
+                type_name::<Self>(),
+                type_name::<Url>()
+            ),
+        }
+    }
 }
 
 impl From<HudiConfigValue> for bool {
@@ -145,33 +159,61 @@ impl From<HudiConfigValue> for Vec<String> {
 /// Hudi configuration container.
 #[derive(Clone, Debug)]
 pub struct HudiConfigs {
-    pub raw_configs: Arc<HashMap<String, String>>,
+    raw_options: Arc<HashMap<String, String>>,
+}
+
+impl From<HashMap<String, String>> for HudiConfigs {
+    fn from(options: HashMap<String, String>) -> Self {
+        Self {
+            raw_options: Arc::new(options),
+        }
+    }
 }
 
 impl HudiConfigs {
-    /// Create [HudiConfigs] with key-value pairs of [String]s.
-    pub fn new(raw_configs: HashMap<String, String>) -> Self {
+    /// Create [HudiConfigs] using options in the form of key-value pairs.
+    pub fn new<I, K, V>(options: I) -> Self
+    where
+        I: IntoIterator<Item = (K, V)>,
+        K: AsRef<str>,
+        V: AsRef<str>,
+    {
+        let raw_options = options
+            .into_iter()
+            .map(|(k, v)| (k.as_ref().to_string(), v.as_ref().to_string()))
+            .collect();
         Self {
-            raw_configs: Arc::new(raw_configs),
+            raw_options: Arc::new(raw_options),
         }
     }
 
-    /// Create empty [HudiConfigs].
+    /// Create an empty [HudiConfigs].
     pub fn empty() -> Self {
         Self {
-            raw_configs: Arc::new(HashMap::new()),
+            raw_options: Arc::new(HashMap::new()),
         }
     }
 
+    /// Create a deep-copy of the configs as [String] options in the form of 
key-value pairs.
+    pub fn as_options(&self) -> HashMap<String, String> {
+        self.raw_options.as_ref().clone()
+    }
+
     pub fn validate(&self, parser: impl ConfigParser<Output = 
HudiConfigValue>) -> Result<()> {
-        parser.validate(&self.raw_configs)
+        parser.validate(&self.raw_options)
+    }
+
+    pub fn contains(&self, key: impl AsRef<str>) -> bool {
+        self.raw_options.contains_key(key.as_ref())
     }
 
+    /// Get value for the given config. Return [Result] with the value.
+    /// If the config is not found or value was not parsed properly, return 
[Err].
     pub fn get(
         &self,
         parser: impl ConfigParser<Output = HudiConfigValue>,
     ) -> Result<HudiConfigValue> {
-        parser.parse_value(&self.raw_configs)
+        parser.parse_value(&self.raw_options)
     }
 
     /// Get value or default value. If the config has no default value, this 
will panic.
@@ -179,7 +221,7 @@ impl HudiConfigs {
         &self,
         parser: impl ConfigParser<Output = HudiConfigValue>,
     ) -> HudiConfigValue {
-        parser.parse_value_or_default(&self.raw_configs)
+        parser.parse_value_or_default(&self.raw_options)
     }
 
     /// Get value or default value. If the config has no default value, this 
will return [None].
@@ -187,10 +229,62 @@ impl HudiConfigs {
         &self,
         parser: impl ConfigParser<Output = HudiConfigValue>,
     ) -> Option<HudiConfigValue> {
-        let res = parser.parse_value(&self.raw_configs);
+        let res = parser.parse_value(&self.raw_options);
         match res {
             Ok(v) => Some(v),
             Err(_) => parser.default_value(),
         }
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_from_hashmap() {
+        let mut options = HashMap::new();
+        options.insert("key1".to_string(), "value1".to_string());
+        options.insert("key2".to_string(), "value2".to_string());
+
+        let config = HudiConfigs::from(options.clone());
+
+        assert_eq!(*config.raw_options, options);
+    }
+
+    #[test]
+    fn test_new() {
+        let options = vec![("key1", "value1"), ("key2", "value2")];
+
+        let config = HudiConfigs::new(options);
+
+        let expected: HashMap<String, String> = vec![
+            ("key1".to_string(), "value1".to_string()),
+            ("key2".to_string(), "value2".to_string()),
+        ]
+        .into_iter()
+        .collect();
+
+        assert_eq!(*config.raw_options, expected);
+    }
+
+    #[test]
+    fn test_empty() {
+        let config = HudiConfigs::empty();
+
+        assert!(config.raw_options.is_empty());
+    }
+
+    #[test]
+    fn test_as_options() {
+        let mut options = HashMap::new();
+        options.insert("key1".to_string(), "value1".to_string());
+        options.insert("key2".to_string(), "value2".to_string());
+
+        let config = HudiConfigs::from(options.clone());
+
+        let result = config.as_options();
+
+        assert_eq!(result, options);
+    }
+}
diff --git a/crates/core/src/config/table.rs b/crates/core/src/config/table.rs
index 8a5d8d6..107e382 100644
--- a/crates/core/src/config/table.rs
+++ b/crates/core/src/config/table.rs
@@ -27,7 +27,7 @@ use strum_macros::{AsRefStr, EnumIter};
 
 use crate::config::{ConfigParser, HudiConfigValue};
 
-/// Configurations for Hudi tables, persisted in `hoodie.properties`.
+/// Configurations for Hudi tables, most of them are persisted in 
`hoodie.properties`.
 ///
 /// **Example**
 ///
@@ -47,6 +47,9 @@ pub enum HudiTableConfig {
     /// Currently only parquet is supported.
     BaseFileFormat,
 
+    /// Base path to the table.
+    BasePath,
+
     /// Table checksum is used to guard against partial writes in HDFS.
     /// It is added as the last entry in hoodie.properties and then used to 
validate while reading table config.
     Checksum,
@@ -105,6 +108,7 @@ impl AsRef<str> for HudiTableConfig {
     fn as_ref(&self) -> &str {
         match self {
             Self::BaseFileFormat => "hoodie.table.base.file.format",
+            Self::BasePath => "hoodie.base.path",
             Self::Checksum => "hoodie.table.checksum",
             Self::DatabaseName => "hoodie.database.name",
             Self::DropsPartitionFields => 
"hoodie.datasource.write.drop.partition.columns",
@@ -150,6 +154,7 @@ impl ConfigParser for HudiTableConfig {
             Self::BaseFileFormat => get_result
                 .and_then(BaseFileFormatValue::from_str)
                 .map(|v| HudiConfigValue::String(v.as_ref().to_string())),
+            Self::BasePath => get_result.map(|v| 
HudiConfigValue::String(v.to_string())),
             Self::Checksum => get_result
                 .and_then(|v| isize::from_str(v).map_err(|e| anyhow!(e)))
                 .map(HudiConfigValue::Integer),
diff --git a/crates/core/src/config/utils.rs b/crates/core/src/config/utils.rs
new file mode 100644
index 0000000..5fd84cd
--- /dev/null
+++ b/crates/core/src/config/utils.rs
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use anyhow::{Context, Result};
+use bytes::Bytes;
+use std::collections::HashMap;
+use std::io::{BufRead, BufReader, Cursor};
+
+pub fn empty_options<'a>() -> std::iter::Empty<(&'a str, &'a str)> {
+    std::iter::empty::<(&str, &str)>()
+}
+
+pub fn split_hudi_options_from_others<I, K, V>(
+    all_options: I,
+) -> (HashMap<String, String>, HashMap<String, String>)
+where
+    I: IntoIterator<Item = (K, V)>,
+    K: AsRef<str>,
+    V: Into<String>,
+{
+    let mut hudi_options = HashMap::new();
+    let mut others = HashMap::new();
+    for (k, v) in all_options {
+        if k.as_ref().starts_with("hoodie.") {
+            hudi_options.insert(k.as_ref().to_string(), v.into());
+        } else {
+            others.insert(k.as_ref().to_string(), v.into());
+        }
+    }
+
+    (hudi_options, others)
+}
+
+pub fn parse_data_for_options(data: &Bytes, split_chars: &str) -> 
Result<HashMap<String, String>> {
+    let cursor = Cursor::new(data);
+    let lines = BufReader::new(cursor).lines();
+    let mut options = HashMap::new();
+
+    for line in lines {
+        let line = line.context("Failed to read line")?;
+        let trimmed_line = line.trim();
+        if trimmed_line.is_empty() || trimmed_line.starts_with('#') {
+            continue;
+        }
+        let mut parts = trimmed_line.splitn(2, |c| split_chars.contains(c));
+        let key = parts
+            .next()
+            .context("Missing key in config line")?
+            .trim()
+            .to_owned();
+        let value = parts.next().unwrap_or("").trim().to_owned();
+        options.insert(key, value);
+    }
+
+    Ok(options)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_empty_options() {
+        let empty = empty_options();
+        assert_eq!(empty.count(), 0);
+    }
+
+    #[test]
+    fn test_split_hudi_options_from_others_empty() {
+        let (hudi, others) = split_hudi_options_from_others(Vec::<(&str, 
&str)>::new());
+        assert!(hudi.is_empty());
+        assert!(others.is_empty());
+    }
+
+    #[test]
+    fn test_split_hudi_options_from_others_mixed() {
+        let options = vec![
+            ("hoodie.option1", "value1"),
+            ("option2", "value2"),
+            ("hoodie.option3", "value3"),
+            ("option4", "value4"),
+        ];
+        let (hudi, others) = split_hudi_options_from_others(options);
+        assert_eq!(hudi.len(), 2);
+        assert_eq!(hudi["hoodie.option1"], "value1");
+        assert_eq!(hudi["hoodie.option3"], "value3");
+        assert_eq!(others.len(), 2);
+        assert_eq!(others["option2"], "value2");
+        assert_eq!(others["option4"], "value4");
+    }
+
+    #[test]
+    fn test_parse_data_comprehensive() {
+        let data = Bytes::from(
+            "key1=value1\n\
+             key2:value2\n\
+             key3 value3\n\
+             key4\tvalue4\n\
+             key5=value with spaces\n\
+             key6: another spaced value \n\
+             key7\tvalue with tab\n\
+             key8=value with = in it\n\
+             key9:value:with:colons\n\
+             empty_key=\n\
+             #comment line\n\
+             \n\
+             key10==double equals",
+        );
+
+        let result = parse_data_for_options(&data, "=: \t").unwrap();
+
+        let mut expected = HashMap::new();
+        expected.insert("key1".to_string(), "value1".to_string());
+        expected.insert("key2".to_string(), "value2".to_string());
+        expected.insert("key3".to_string(), "value3".to_string());
+        expected.insert("key4".to_string(), "value4".to_string());
+        expected.insert("key5".to_string(), "value with spaces".to_string());
+        expected.insert("key6".to_string(), "another spaced 
value".to_string());
+        expected.insert("key7".to_string(), "value with tab".to_string());
+        expected.insert("key8".to_string(), "value with = in it".to_string());
+        expected.insert("key9".to_string(), "value:with:colons".to_string());
+        expected.insert("empty_key".to_string(), "".to_string());
+        expected.insert("key10".to_string(), "=double equals".to_string());
+
+        assert_eq!(result, expected);
+    }
+
+    #[test]
+    fn test_parse_data_with_comments_and_empty_lines() {
+        let data = Bytes::from("key1=value1\n# This is a 
comment\n\nkey2:value2");
+        let result = parse_data_for_options(&data, "=:").unwrap();
+        let mut expected = HashMap::new();
+        expected.insert("key1".to_string(), "value1".to_string());
+        expected.insert("key2".to_string(), "value2".to_string());
+        assert_eq!(result, expected);
+    }
+
+    #[test]
+    fn test_parse_data_with_missing_value() {
+        let data = Bytes::from("key1=value1\nkey2=\nkey3:value3");
+        let result = parse_data_for_options(&data, "=:").unwrap();
+        let mut expected = HashMap::new();
+        expected.insert("key1".to_string(), "value1".to_string());
+        expected.insert("key2".to_string(), "".to_string());
+        expected.insert("key3".to_string(), "value3".to_string());
+        assert_eq!(result, expected);
+    }
+
+    #[test]
+    fn test_parse_data_with_trailing_spaces() {
+        let data = Bytes::from("key1 = value1  \n  key2:  value2  ");
+        let result = parse_data_for_options(&data, "=:").unwrap();
+        let mut expected = HashMap::new();
+        expected.insert("key1".to_string(), "value1".to_string());
+        expected.insert("key2".to_string(), "value2".to_string());
+        assert_eq!(result, expected);
+    }
+
+    #[test]
+    fn test_parse_data_empty_input() {
+        let data = Bytes::from("");
+        let result = parse_data_for_options(&data, "=:").unwrap();
+        assert!(result.is_empty());
+    }
+
+    #[test]
+    fn test_parse_data_invalid_line() {
+        let data = Bytes::from("key1=value1\ninvalid_line\nkey2=value2");
+        let result = parse_data_for_options(&data, "=").unwrap();
+        let mut expected = HashMap::new();
+        expected.insert("key1".to_string(), "value1".to_string());
+        expected.insert("invalid_line".to_string(), "".to_string());
+        expected.insert("key2".to_string(), "value2".to_string());
+        assert_eq!(result, expected);
+    }
+}
diff --git a/crates/core/src/file_group/mod.rs 
b/crates/core/src/file_group/mod.rs
index 7cd1f47..4605052 100644
--- a/crates/core/src/file_group/mod.rs
+++ b/crates/core/src/file_group/mod.rs
@@ -204,20 +204,20 @@ impl FileGroup {
 
     pub fn get_file_slice_as_of(&self, timestamp: &str) -> Option<&FileSlice> {
         let as_of = timestamp.to_string();
-        return if let Some((_, file_slice)) = 
self.file_slices.range(..=as_of).next_back() {
+        if let Some((_, file_slice)) = 
self.file_slices.range(..=as_of).next_back() {
             Some(file_slice)
         } else {
             None
-        };
+        }
     }
 
     pub fn get_file_slice_mut_as_of(&mut self, timestamp: &str) -> Option<&mut 
FileSlice> {
         let as_of = timestamp.to_string();
-        return if let Some((_, file_slice)) = 
self.file_slices.range_mut(..=as_of).next_back() {
+        if let Some((_, file_slice)) = 
self.file_slices.range_mut(..=as_of).next_back() {
             Some(file_slice)
         } else {
             None
-        };
+        }
     }
 }
 
diff --git a/crates/core/src/storage/mod.rs b/crates/core/src/storage/mod.rs
index 9985d0c..45c0699 100644
--- a/crates/core/src/storage/mod.rs
+++ b/crates/core/src/storage/mod.rs
@@ -22,6 +22,10 @@ use std::collections::HashMap;
 use std::path::PathBuf;
 use std::sync::Arc;
 
+use crate::config::table::HudiTableConfig;
+use crate::config::HudiConfigs;
+use crate::storage::file_info::FileInfo;
+use crate::storage::utils::join_url_segments;
 use anyhow::{anyhow, Context, Result};
 use arrow::compute::concat_batches;
 use arrow::record_batch::RecordBatch;
@@ -35,30 +39,57 @@ use parquet::arrow::ParquetRecordBatchStreamBuilder;
 use parquet::file::metadata::ParquetMetaData;
 use url::Url;
 
-use crate::storage::file_info::FileInfo;
-use crate::storage::utils::join_url_segments;
-
 pub mod file_info;
 pub mod file_stats;
 pub mod utils;
 
+#[allow(dead_code)]
 #[derive(Clone, Debug)]
 pub struct Storage {
-    base_url: Arc<Url>,
-    object_store: Arc<dyn ObjectStore>,
+    pub(crate) base_url: Arc<Url>,
+    pub(crate) object_store: Arc<dyn ObjectStore>,
+    pub(crate) options: Arc<HashMap<String, String>>,
+    pub(crate) hudi_configs: Arc<HudiConfigs>,
 }
 
 impl Storage {
-    pub fn new(base_url: Arc<Url>, options: &HashMap<String, String>) -> 
Result<Arc<Storage>> {
-        match parse_url_opts(&base_url, options) {
+    pub fn new(
+        options: Arc<HashMap<String, String>>,
+        hudi_configs: Arc<HudiConfigs>,
+    ) -> Result<Arc<Storage>> {
+        if !hudi_configs.contains(HudiTableConfig::BasePath) {
+            return Err(anyhow!(
+                "Failed to create storage: {} is required.",
+                HudiTableConfig::BasePath.as_ref()
+            ));
+        }
+
+        let base_url = hudi_configs.get(HudiTableConfig::BasePath)?.to_url()?;
+
+        match parse_url_opts(&base_url, options.as_ref()) {
             Ok((object_store, _)) => Ok(Arc::new(Storage {
-                base_url,
+                base_url: Arc::new(base_url),
                 object_store: Arc::new(object_store),
+                options,
+                hudi_configs,
             })),
             Err(e) => Err(anyhow!("Failed to create storage: {}", e)),
         }
     }
 
+    #[cfg(test)]
+    pub fn new_with_base_url(base_url: Url) -> Result<Arc<Storage>> {
+        let mut hudi_options = HashMap::new();
+        hudi_options.insert(
+            HudiTableConfig::BasePath.as_ref().to_string(),
+            base_url.as_str().to_string(),
+        );
+        Self::new(
+            Arc::new(HashMap::new()),
+            Arc::new(HudiConfigs::new(hudi_options)),
+        )
+    }
+
     #[cfg(feature = "datafusion")]
     pub fn register_object_store(
         &self,
@@ -220,17 +251,32 @@ pub async fn get_leaf_dirs(storage: &Storage, subdir: 
Option<&str>) -> Result<Ve
 
 #[cfg(test)]
 mod tests {
-    use std::collections::{HashMap, HashSet};
+    use super::*;
+    use std::collections::HashSet;
     use std::fs::canonicalize;
     use std::path::Path;
-    use std::sync::Arc;
-
-    use object_store::path::Path as ObjPath;
-    use url::Url;
 
     use crate::storage::file_info::FileInfo;
     use crate::storage::utils::join_url_segments;
     use crate::storage::{get_leaf_dirs, Storage};
+    use object_store::path::Path as ObjPath;
+    use url::Url;
+
+    #[test]
+    fn test_storage_new_error_no_base_path() {
+        let options = Arc::new(HashMap::new());
+        let hudi_configs = Arc::new(HudiConfigs::empty());
+        let result = Storage::new(options, hudi_configs);
+
+        assert!(
+            result.is_err(),
+            "Should return error when no base path is provided."
+        );
+        assert!(result
+            .unwrap_err()
+            .to_string()
+            .contains("Failed to create storage"));
+    }
 
     #[tokio::test]
     async fn storage_list_dirs() {
@@ -238,7 +284,7 @@ mod tests {
             
canonicalize(Path::new("tests/data/timeline/commits_stub")).unwrap(),
         )
         .unwrap();
-        let storage = Storage::new(Arc::new(base_url), 
&HashMap::new()).unwrap();
+        let storage = Storage::new_with_base_url(base_url).unwrap();
         let first_level_dirs: HashSet<String> =
             storage.list_dirs(None).await.unwrap().into_iter().collect();
         assert_eq!(
@@ -260,7 +306,7 @@ mod tests {
             
canonicalize(Path::new("tests/data/timeline/commits_stub")).unwrap(),
         )
         .unwrap();
-        let storage = Storage::new(Arc::new(base_url), 
&HashMap::new()).unwrap();
+        let storage = Storage::new_with_base_url(base_url).unwrap();
         let first_level_dirs: HashSet<ObjPath> = storage
             .list_dirs_as_obj_paths(None)
             .await
@@ -283,7 +329,7 @@ mod tests {
             
canonicalize(Path::new("tests/data/timeline/commits_stub")).unwrap(),
         )
         .unwrap();
-        let storage = Storage::new(Arc::new(base_url), 
&HashMap::new()).unwrap();
+        let storage = Storage::new_with_base_url(base_url).unwrap();
         let file_info_1: Vec<FileInfo> = storage
             .list_files(None)
             .await
@@ -293,7 +339,9 @@ mod tests {
         assert_eq!(
             file_info_1,
             vec![FileInfo {
-                uri: storage.base_url.join("a.parquet").unwrap().to_string(),
+                uri: join_url_segments(&storage.base_url, &["a.parquet"])
+                    .unwrap()
+                    .to_string(),
                 name: "a.parquet".to_string(),
                 size: 0,
             }]
@@ -307,9 +355,7 @@ mod tests {
         assert_eq!(
             file_info_2,
             vec![FileInfo {
-                uri: storage
-                    .base_url
-                    .join("part1/b.parquet")
+                uri: join_url_segments(&storage.base_url, &["part1/b.parquet"])
                     .unwrap()
                     .to_string(),
                 name: "b.parquet".to_string(),
@@ -325,9 +371,7 @@ mod tests {
         assert_eq!(
             file_info_3,
             vec![FileInfo {
-                uri: storage
-                    .base_url
-                    .join("part2/part22/c.parquet")
+                uri: join_url_segments(&storage.base_url, 
&["part2/part22/c.parquet"])
                     .unwrap()
                     .to_string(),
                 name: "c.parquet".to_string(),
@@ -342,7 +386,7 @@ mod tests {
             
canonicalize(Path::new("tests/data/timeline/commits_stub")).unwrap(),
         )
         .unwrap();
-        let storage = Storage::new(Arc::new(base_url), 
&HashMap::new()).unwrap();
+        let storage = Storage::new_with_base_url(base_url).unwrap();
         let leaf_dirs = get_leaf_dirs(&storage, None).await.unwrap();
         assert_eq!(
             leaf_dirs,
@@ -355,7 +399,7 @@ mod tests {
         let base_url =
             
Url::from_directory_path(canonicalize(Path::new("tests/data/leaf_dir")).unwrap())
                 .unwrap();
-        let storage = Storage::new(Arc::new(base_url), 
&HashMap::new()).unwrap();
+        let storage = Storage::new_with_base_url(base_url).unwrap();
         let leaf_dirs = get_leaf_dirs(&storage, None).await.unwrap();
         assert_eq!(
             leaf_dirs,
@@ -368,12 +412,14 @@ mod tests {
     async fn storage_get_file_info() {
         let base_url =
             
Url::from_directory_path(canonicalize(Path::new("tests/data")).unwrap()).unwrap();
-        let storage = Storage::new(Arc::new(base_url), 
&HashMap::new()).unwrap();
+        let storage = Storage::new_with_base_url(base_url).unwrap();
         let file_info = storage.get_file_info("a.parquet").await.unwrap();
         assert_eq!(file_info.name, "a.parquet");
         assert_eq!(
             file_info.uri,
-            storage.base_url.join("a.parquet").unwrap().as_ref()
+            join_url_segments(&storage.base_url, &["a.parquet"])
+                .unwrap()
+                .to_string()
         );
         assert_eq!(file_info.size, 866);
     }
@@ -382,7 +428,7 @@ mod tests {
     async fn storage_get_parquet_file_data() {
         let base_url =
             
Url::from_directory_path(canonicalize(Path::new("tests/data")).unwrap()).unwrap();
-        let storage = Storage::new(Arc::new(base_url), 
&HashMap::new()).unwrap();
+        let storage = Storage::new_with_base_url(base_url).unwrap();
         let file_data = 
storage.get_parquet_file_data("a.parquet").await.unwrap();
         assert_eq!(file_data.num_rows(), 5);
     }
diff --git a/crates/core/src/storage/utils.rs b/crates/core/src/storage/utils.rs
index 80c86c6..ba670f2 100644
--- a/crates/core/src/storage/utils.rs
+++ b/crates/core/src/storage/utils.rs
@@ -16,13 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-use std::collections::HashMap;
-use std::io::{BufRead, BufReader, Cursor};
 use std::path::{Path, PathBuf};
 use std::str::FromStr;
 
-use anyhow::{anyhow, Context, Result};
-use bytes::Bytes;
+use anyhow::{anyhow, Result};
 use url::{ParseError, Url};
 
 pub fn split_filename(filename: &str) -> Result<(String, String)> {
@@ -78,34 +75,6 @@ pub fn join_url_segments(base_url: &Url, segments: &[&str]) 
-> Result<Url> {
     Ok(url)
 }
 
-pub fn empty_options<'a>() -> std::iter::Empty<(&'a str, &'a str)> {
-    std::iter::empty::<(&str, &str)>()
-}
-
-pub async fn parse_config_data(data: &Bytes, split_chars: &str) -> 
Result<HashMap<String, String>> {
-    let cursor = Cursor::new(data);
-    let lines = BufReader::new(cursor).lines();
-    let mut configs = HashMap::new();
-
-    for line in lines {
-        let line = line.context("Failed to read line")?;
-        let trimmed_line = line.trim();
-        if trimmed_line.is_empty() || trimmed_line.starts_with('#') {
-            continue;
-        }
-        let mut parts = trimmed_line.splitn(2, |c| split_chars.contains(c));
-        let key = parts
-            .next()
-            .context("Missing key in config line")?
-            .trim()
-            .to_owned();
-        let value = parts.next().unwrap_or("").trim().to_owned();
-        configs.insert(key, value);
-    }
-
-    Ok(configs)
-}
-
 #[cfg(test)]
 mod tests {
     use std::str::FromStr;
diff --git a/crates/core/src/table/fs_view.rs b/crates/core/src/table/fs_view.rs
index 4172f12..0cc6ad1 100644
--- a/crates/core/src/table/fs_view.rs
+++ b/crates/core/src/table/fs_view.rs
@@ -29,26 +29,24 @@ use anyhow::Result;
 use arrow::record_batch::RecordBatch;
 use dashmap::DashMap;
 use futures::stream::{self, StreamExt, TryStreamExt};
-use url::Url;
 
 #[derive(Clone, Debug)]
 #[allow(dead_code)]
 pub struct FileSystemView {
-    configs: Arc<HudiConfigs>,
+    hudi_configs: Arc<HudiConfigs>,
     pub(crate) storage: Arc<Storage>,
     partition_to_file_groups: Arc<DashMap<String, Vec<FileGroup>>>,
 }
 
 impl FileSystemView {
     pub async fn new(
-        base_url: Arc<Url>,
+        hudi_configs: Arc<HudiConfigs>,
         storage_options: Arc<HashMap<String, String>>,
-        configs: Arc<HudiConfigs>,
     ) -> Result<Self> {
-        let storage = Storage::new(base_url, &storage_options)?;
+        let storage = Storage::new(storage_options.clone(), 
hudi_configs.clone())?;
         let partition_to_file_groups = Arc::new(DashMap::new());
         Ok(FileSystemView {
-            configs,
+            hudi_configs,
             storage,
             partition_to_file_groups,
         })
@@ -189,20 +187,30 @@ impl FileSystemView {
 
 #[cfg(test)]
 mod tests {
-    use hudi_tests::TestTable;
-    use std::collections::{HashMap, HashSet};
-    use std::sync::Arc;
-
+    use crate::config::table::HudiTableConfig;
     use crate::config::HudiConfigs;
     use crate::storage::Storage;
     use crate::table::fs_view::FileSystemView;
     use crate::table::partition::PartitionPruner;
     use crate::table::Table;
+    use hudi_tests::TestTable;
+    use std::collections::{HashMap, HashSet};
+    use std::sync::Arc;
+    use url::Url;
+
+    async fn create_test_fs_view(base_url: Url) -> FileSystemView {
+        FileSystemView::new(
+            Arc::new(HudiConfigs::new([(HudiTableConfig::BasePath, 
base_url)])),
+            Arc::new(HashMap::new()),
+        )
+        .await
+        .unwrap()
+    }
 
     #[tokio::test]
     async fn get_partition_paths_for_nonpartitioned_table() {
         let base_url = TestTable::V6Nonpartitioned.url();
-        let storage = Storage::new(Arc::new(base_url), 
&HashMap::new()).unwrap();
+        let storage = Storage::new_with_base_url(base_url).unwrap();
         let partition_pruner = PartitionPruner::empty();
         let partition_paths = FileSystemView::load_partition_paths(&storage, 
&partition_pruner)
             .await
@@ -215,7 +223,7 @@ mod tests {
     #[tokio::test]
     async fn get_partition_paths_for_complexkeygen_table() {
         let base_url = TestTable::V6ComplexkeygenHivestyle.url();
-        let storage = Storage::new(Arc::new(base_url), 
&HashMap::new()).unwrap();
+        let storage = Storage::new_with_base_url(base_url).unwrap();
         let partition_pruner = PartitionPruner::empty();
         let partition_paths = FileSystemView::load_partition_paths(&storage, 
&partition_pruner)
             .await
@@ -235,13 +243,7 @@ mod tests {
     #[tokio::test]
     async fn fs_view_get_latest_file_slices() {
         let base_url = TestTable::V6Nonpartitioned.url();
-        let fs_view = FileSystemView::new(
-            Arc::new(base_url),
-            Arc::new(HashMap::new()),
-            Arc::new(HudiConfigs::empty()),
-        )
-        .await
-        .unwrap();
+        let fs_view = create_test_fs_view(base_url).await;
 
         assert!(fs_view.partition_to_file_groups.is_empty());
         let partition_pruner = PartitionPruner::empty();
@@ -266,13 +268,7 @@ mod tests {
     async fn fs_view_get_latest_file_slices_with_replace_commit() {
         let base_url = 
TestTable::V6SimplekeygenNonhivestyleOverwritetable.url();
         let hudi_table = Table::new(base_url.path()).await.unwrap();
-        let fs_view = FileSystemView::new(
-            Arc::new(base_url),
-            Arc::new(HashMap::new()),
-            Arc::new(HudiConfigs::empty()),
-        )
-        .await
-        .unwrap();
+        let fs_view = create_test_fs_view(base_url).await;
 
         assert_eq!(fs_view.partition_to_file_groups.len(), 0);
         let partition_pruner = PartitionPruner::empty();
@@ -301,14 +297,10 @@ mod tests {
     async fn fs_view_get_latest_file_slices_with_partition_filters() {
         let base_url = TestTable::V6ComplexkeygenHivestyle.url();
         let hudi_table = Table::new(base_url.path()).await.unwrap();
-        let fs_view = FileSystemView::new(
-            Arc::new(base_url),
-            Arc::new(HashMap::new()),
-            Arc::new(HudiConfigs::empty()),
-        )
-        .await
-        .unwrap();
+        let fs_view = create_test_fs_view(base_url).await;
+
         assert_eq!(fs_view.partition_to_file_groups.len(), 0);
+
         let excludes = &hudi_table
             .timeline
             .get_replaced_file_groups()
@@ -318,7 +310,7 @@ mod tests {
         let partition_pruner = PartitionPruner::new(
             &["byteField < 20", "shortField = 300"],
             &partition_schema,
-            hudi_table.configs.as_ref(),
+            hudi_table.hudi_configs.as_ref(),
         )
         .unwrap();
         let file_slices = fs_view
@@ -327,6 +319,7 @@ mod tests {
             .unwrap();
         assert_eq!(fs_view.partition_to_file_groups.len(), 1);
         assert_eq!(file_slices.len(), 1);
+
         let fg_ids = file_slices
             .iter()
             .map(|fsl| fsl.file_group_id())
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 931f2c2..9ca5817 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -39,7 +39,7 @@
 //!
 //! pub async fn test() {
 //!     use arrow_schema::Schema;
-//! let base_uri = Url::from_file_path("/tmp/hudi_data").unwrap();
+//!     let base_uri = Url::from_file_path("/tmp/hudi_data").unwrap();
 //!     let hudi_table = Table::new(base_uri.path()).await.unwrap();
 //!     let schema = hudi_table.get_schema().await.unwrap();
 //! }
@@ -95,7 +95,6 @@ use arrow::record_batch::RecordBatch;
 use arrow_schema::{Field, Schema};
 use strum::IntoEnumIterator;
 use url::Url;
-
 use HudiInternalConfig::SkipConfigValidation;
 use HudiTableConfig::{DropsPartitionFields, TableType, TableVersion};
 use TableTypeValue::CopyOnWrite;
@@ -105,10 +104,11 @@ use crate::config::read::HudiReadConfig;
 use crate::config::read::HudiReadConfig::AsOfTimestamp;
 use crate::config::table::HudiTableConfig::PartitionFields;
 use crate::config::table::{HudiTableConfig, TableTypeValue};
+use crate::config::utils::parse_data_for_options;
+use crate::config::utils::{empty_options, split_hudi_options_from_others};
 use crate::config::HudiConfigs;
 use crate::config::HUDI_CONF_DIR;
 use crate::file_group::FileSlice;
-use crate::storage::utils::{empty_options, parse_config_data, parse_uri};
 use crate::storage::Storage;
 use crate::table::fs_view::FileSystemView;
 use crate::table::partition::PartitionPruner;
@@ -121,9 +121,8 @@ mod timeline;
 /// Hudi Table in-memory
 #[derive(Clone, Debug)]
 pub struct Table {
-    pub base_url: Arc<Url>,
-    pub configs: Arc<HudiConfigs>,
-    pub extra_options: Arc<HashMap<String, String>>,
+    pub hudi_configs: Arc<HudiConfigs>,
+    pub storage_options: Arc<HashMap<String, String>>,
     pub timeline: Timeline,
     pub file_system_view: FileSystemView,
 }
@@ -135,38 +134,38 @@ impl Table {
     }
 
     /// Create hudi table with options
-    pub async fn new_with_options<I, K, V>(base_uri: &str, all_options: I) -> 
Result<Self>
+    pub async fn new_with_options<I, K, V>(base_uri: &str, options: I) -> 
Result<Self>
     where
         I: IntoIterator<Item = (K, V)>,
         K: AsRef<str>,
         V: Into<String>,
     {
-        let base_url = Arc::new(parse_uri(base_uri)?);
-
-        let (configs, extra_options) = Self::load_configs(base_url.clone(), 
all_options)
+        let (hudi_configs, storage_options) = Self::load_configs(base_uri, 
options)
             .await
             .context("Failed to load table properties")?;
-        let configs = Arc::new(configs);
-        let extra_options = Arc::new(extra_options);
+        let hudi_configs = Arc::new(hudi_configs);
+        let storage_options = Arc::new(storage_options);
 
-        let timeline = Timeline::new(base_url.clone(), extra_options.clone(), 
configs.clone())
+        let timeline = Timeline::new(hudi_configs.clone(), 
storage_options.clone())
             .await
             .context("Failed to load timeline")?;
 
-        let file_system_view =
-            FileSystemView::new(base_url.clone(), extra_options.clone(), 
configs.clone())
-                .await
-                .context("Failed to load file system view")?;
+        let file_system_view = FileSystemView::new(hudi_configs.clone(), 
storage_options.clone())
+            .await
+            .context("Failed to load file system view")?;
 
         Ok(Table {
-            base_url,
-            configs,
-            extra_options,
+            hudi_configs,
+            storage_options,
             timeline,
             file_system_view,
         })
     }
 
+    pub fn base_url(&self) -> Result<Url> {
+        self.hudi_configs.get(HudiTableConfig::BasePath)?.to_url()
+    }
+
     #[cfg(feature = "datafusion")]
     pub fn register_storage(
         &self,
@@ -181,7 +180,7 @@ impl Table {
     }
 
     async fn load_configs<I, K, V>(
-        base_url: Arc<Url>,
+        base_uri: &str,
         all_options: I,
     ) -> Result<(HudiConfigs, HashMap<String, String>)>
     where
@@ -190,27 +189,32 @@ impl Table {
         V: Into<String>,
     {
         let mut hudi_options = HashMap::new();
-        let mut extra_options = HashMap::new();
+        let mut other_options = HashMap::new();
 
-        Self::imbue_cloud_env_vars(&mut extra_options);
+        Self::imbue_cloud_env_vars(&mut other_options);
 
-        for (k, v) in all_options {
-            if k.as_ref().starts_with("hoodie.") {
-                hudi_options.insert(k.as_ref().to_string(), v.into());
-            } else {
-                extra_options.insert(k.as_ref().to_string(), v.into());
-            }
-        }
+        let (hudi_opts, others) = split_hudi_options_from_others(all_options);
+        hudi_options.extend(hudi_opts);
+        other_options.extend(others);
+
+        hudi_options.insert(
+            HudiTableConfig::BasePath.as_ref().to_string(),
+            base_uri.to_string(),
+        );
 
-        let storage = Storage::new(base_url, &extra_options)?;
+        // create a [Storage] instance to load properties from storage layer.
+        let storage = Storage::new(
+            Arc::new(other_options.clone()),
+            Arc::new(HudiConfigs::new(&hudi_options)),
+        )?;
 
         Self::imbue_table_properties(&mut hudi_options, 
storage.clone()).await?;
 
-        Self::imbue_global_hudi_configs(&mut hudi_options, 
storage.clone()).await?;
+        Self::imbue_global_hudi_configs_if_not_present(&mut hudi_options, 
storage.clone()).await?;
 
         let hudi_configs = HudiConfigs::new(hudi_options);
 
-        Self::validate_configs(&hudi_configs).map(|_| (hudi_configs, 
extra_options))
+        Self::validate_configs(&hudi_configs).map(|_| (hudi_configs, 
other_options))
     }
 
     fn imbue_cloud_env_vars(options: &mut HashMap<String, String>) {
@@ -230,7 +234,7 @@ impl Table {
         storage: Arc<Storage>,
     ) -> Result<()> {
         let bytes = storage.get_file_data(".hoodie/hoodie.properties").await?;
-        let table_properties = parse_config_data(&bytes, "=").await?;
+        let table_properties = parse_data_for_options(&bytes, "=")?;
 
         // TODO: handle the case where the same key is present in both table 
properties and options
         for (k, v) in table_properties {
@@ -240,7 +244,7 @@ impl Table {
         Ok(())
     }
 
-    async fn imbue_global_hudi_configs(
+    async fn imbue_global_hudi_configs_if_not_present(
         options: &mut HashMap<String, String>,
         storage: Arc<Storage>,
     ) -> Result<()> {
@@ -253,7 +257,7 @@ impl Table {
             
.get_file_data_from_absolute_path(global_config_path.to_str().unwrap())
             .await
         {
-            if let Ok(global_configs) = parse_config_data(&bytes, " 
\t=").await {
+            if let Ok(global_configs) = parse_data_for_options(&bytes, " \t=") 
{
                 for (key, value) in global_configs {
                     if key.starts_with("hoodie.") && 
!options.contains_key(&key) {
                         options.insert(key.to_string(), value.to_string());
@@ -313,7 +317,7 @@ impl Table {
     /// Get the latest partition [Schema] of the table
     pub async fn get_partition_schema(&self) -> Result<Schema> {
         let partition_fields: HashSet<String> = self
-            .configs
+            .hudi_configs
             .get_or_default(PartitionFields)
             .to::<Vec<String>>()
             .into_iter()
@@ -350,7 +354,7 @@ impl Table {
     ///
     /// If the [AsOfTimestamp] configuration is set, the file slices at the 
specified timestamp will be returned.
     pub async fn get_file_slices(&self, filters: &[&str]) -> 
Result<Vec<FileSlice>> {
-        if let Some(timestamp) = self.configs.try_get(AsOfTimestamp) {
+        if let Some(timestamp) = self.hudi_configs.try_get(AsOfTimestamp) {
             self.get_file_slices_as_of(timestamp.to::<String>().as_str(), 
filters)
                 .await
         } else if let Some(timestamp) = 
self.timeline.get_latest_commit_timestamp() {
@@ -369,7 +373,7 @@ impl Table {
         let excludes = self.timeline.get_replaced_file_groups().await?;
         let partition_schema = self.get_partition_schema().await?;
         let partition_pruner =
-            PartitionPruner::new(filters, &partition_schema, 
self.configs.as_ref())?;
+            PartitionPruner::new(filters, &partition_schema, 
self.hudi_configs.as_ref())?;
         self.file_system_view
             .get_file_slices_as_of(timestamp, &partition_pruner, &excludes)
             .await
@@ -379,7 +383,7 @@ impl Table {
     ///
     /// If the [AsOfTimestamp] configuration is set, the records at the 
specified timestamp will be returned.
     pub async fn read_snapshot(&self, filters: &[&str]) -> 
Result<Vec<RecordBatch>> {
-        if let Some(timestamp) = self.configs.try_get(AsOfTimestamp) {
+        if let Some(timestamp) = self.hudi_configs.try_get(AsOfTimestamp) {
             self.read_snapshot_as_of(timestamp.to::<String>().as_str(), 
filters)
                 .await
         } else if let Some(timestamp) = 
self.timeline.get_latest_commit_timestamp() {
@@ -555,7 +559,7 @@ mod tests {
     #[tokio::test]
     async fn validate_invalid_table_props() {
         let table = 
get_test_table_without_validation("table_props_invalid").await;
-        let configs = table.configs;
+        let configs = table.hudi_configs;
         assert!(
             configs.validate(BaseFileFormat).is_err(),
             "required config is missing"
@@ -603,7 +607,7 @@ mod tests {
     #[tokio::test]
     async fn get_invalid_table_props() {
         let table = 
get_test_table_without_validation("table_props_invalid").await;
-        let configs = table.configs;
+        let configs = table.hudi_configs;
         assert!(configs.get(BaseFileFormat).is_err());
         assert!(configs.get(Checksum).is_err());
         assert!(configs.get(DatabaseName).is_err());
@@ -624,7 +628,7 @@ mod tests {
     #[tokio::test]
     async fn get_default_for_invalid_table_props() {
         let table = 
get_test_table_without_validation("table_props_invalid").await;
-        let configs = table.configs;
+        let configs = table.hudi_configs;
         assert!(panic::catch_unwind(|| 
configs.get_or_default(BaseFileFormat)).is_err());
         assert!(panic::catch_unwind(|| 
configs.get_or_default(Checksum)).is_err());
         assert_eq!(
@@ -654,7 +658,7 @@ mod tests {
     #[tokio::test]
     async fn get_valid_table_props() {
         let table = 
get_test_table_without_validation("table_props_valid").await;
-        let configs = table.configs;
+        let configs = table.hudi_configs;
         assert_eq!(
             configs.get(BaseFileFormat).unwrap().to::<String>(),
             "parquet"
@@ -691,7 +695,7 @@ mod tests {
     async fn get_global_table_props() {
         // Without the environment variable HUDI_CONF_DIR
         let table = 
get_test_table_without_validation("table_props_partial").await;
-        let configs = table.configs;
+        let configs = table.hudi_configs;
         assert!(configs.get(DatabaseName).is_err());
         assert!(configs.get(TableType).is_err());
         assert_eq!(configs.get(TableName).unwrap().to::<String>(), "trips");
@@ -701,7 +705,7 @@ mod tests {
         let hudi_conf_dir = base_path.join("random/wrong/dir");
         env::set_var(HUDI_CONF_DIR, hudi_conf_dir.as_os_str());
         let table = 
get_test_table_without_validation("table_props_partial").await;
-        let configs = table.configs;
+        let configs = table.hudi_configs;
         assert!(configs.get(DatabaseName).is_err());
         assert!(configs.get(TableType).is_err());
         assert_eq!(configs.get(TableName).unwrap().to::<String>(), "trips");
@@ -711,7 +715,7 @@ mod tests {
         let hudi_conf_dir = base_path.join("tests/data/hudi_conf_dir");
         env::set_var(HUDI_CONF_DIR, hudi_conf_dir.as_os_str());
         let table = 
get_test_table_without_validation("table_props_partial").await;
-        let configs = table.configs;
+        let configs = table.hudi_configs;
         assert_eq!(configs.get(DatabaseName).unwrap().to::<String>(), "tmpdb");
         assert_eq!(
             configs.get(TableType).unwrap().to::<String>(),
diff --git a/crates/core/src/table/partition.rs 
b/crates/core/src/table/partition.rs
index e34b738..7dcf482 100644
--- a/crates/core/src/table/partition.rs
+++ b/crates/core/src/table/partition.rs
@@ -406,16 +406,10 @@ mod tests {
     }
 
     fn create_hudi_configs(is_hive_style: bool, is_url_encoded: bool) -> 
HudiConfigs {
-        HudiConfigs::new(HashMap::from([
-            (
-                IsHiveStylePartitioning.as_ref().to_string(),
-                is_hive_style.to_string(),
-            ),
-            (
-                IsPartitionPathUrlencoded.as_ref().to_string(),
-                is_url_encoded.to_string(),
-            ),
-        ]))
+        HudiConfigs::new([
+            (IsHiveStylePartitioning, is_hive_style.to_string()),
+            (IsPartitionPathUrlencoded, is_url_encoded.to_string()),
+        ])
     }
     #[test]
     fn test_partition_pruner_new() {
diff --git a/crates/core/src/table/timeline.rs 
b/crates/core/src/table/timeline.rs
index e502cd6..6dc9df5 100644
--- a/crates/core/src/table/timeline.rs
+++ b/crates/core/src/table/timeline.rs
@@ -27,7 +27,6 @@ use anyhow::{anyhow, Context, Result};
 use arrow_schema::Schema;
 use parquet::arrow::parquet_to_arrow_schema;
 use serde_json::{Map, Value};
-use url::Url;
 
 use crate::config::HudiConfigs;
 use crate::file_group::FileGroup;
@@ -91,22 +90,21 @@ impl Instant {
 #[derive(Clone, Debug)]
 #[allow(dead_code)]
 pub struct Timeline {
-    configs: Arc<HudiConfigs>,
+    hudi_configs: Arc<HudiConfigs>,
     pub(crate) storage: Arc<Storage>,
     pub instants: Vec<Instant>,
 }
 
 impl Timeline {
     pub async fn new(
-        base_url: Arc<Url>,
+        hudi_configs: Arc<HudiConfigs>,
         storage_options: Arc<HashMap<String, String>>,
-        configs: Arc<HudiConfigs>,
     ) -> Result<Self> {
-        let storage = Storage::new(base_url, &storage_options)?;
+        let storage = Storage::new(storage_options.clone(), 
hudi_configs.clone())?;
         let instants = Self::load_completed_commits(&storage).await?;
         Ok(Self {
             storage,
-            configs,
+            hudi_configs,
             instants,
         })
     }
@@ -222,19 +220,23 @@ mod tests {
 
     use hudi_tests::TestTable;
 
+    use crate::config::table::HudiTableConfig;
     use crate::config::HudiConfigs;
     use crate::table::timeline::{Instant, State, Timeline};
 
-    #[tokio::test]
-    async fn timeline_read_latest_schema() {
-        let base_url = TestTable::V6Nonpartitioned.url();
-        let timeline = Timeline::new(
-            Arc::new(base_url),
+    async fn create_test_timeline(base_url: Url) -> Timeline {
+        Timeline::new(
+            Arc::new(HudiConfigs::new([(HudiTableConfig::BasePath, 
base_url)])),
             Arc::new(HashMap::new()),
-            Arc::new(HudiConfigs::empty()),
         )
         .await
-        .unwrap();
+        .unwrap()
+    }
+
+    #[tokio::test]
+    async fn timeline_read_latest_schema() {
+        let base_url = TestTable::V6Nonpartitioned.url();
+        let timeline = create_test_timeline(base_url).await;
         let table_schema = timeline.get_latest_schema().await.unwrap();
         assert_eq!(table_schema.fields.len(), 21)
     }
@@ -242,13 +244,7 @@ mod tests {
     #[tokio::test]
     async fn timeline_read_latest_schema_from_empty_table() {
         let base_url = TestTable::V6Empty.url();
-        let timeline = Timeline::new(
-            Arc::new(base_url),
-            Arc::new(HashMap::new()),
-            Arc::new(HudiConfigs::empty()),
-        )
-        .await
-        .unwrap();
+        let timeline = create_test_timeline(base_url).await;
         let table_schema = timeline.get_latest_schema().await;
         assert!(table_schema.is_err());
         assert_eq!(
@@ -263,13 +259,7 @@ mod tests {
             
canonicalize(Path::new("tests/data/timeline/commits_stub")).unwrap(),
         )
         .unwrap();
-        let timeline = Timeline::new(
-            Arc::new(base_url),
-            Arc::new(HashMap::new()),
-            Arc::new(HudiConfigs::empty()),
-        )
-        .await
-        .unwrap();
+        let timeline = create_test_timeline(base_url).await;
         assert_eq!(
             timeline.instants,
             vec![
diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs
index 6aa2035..b090696 100644
--- a/crates/datafusion/src/lib.rs
+++ b/crates/datafusion/src/lib.rs
@@ -39,7 +39,8 @@ use datafusion_expr::{Expr, TableType};
 use datafusion_physical_expr::create_physical_expr;
 
 use hudi_core::config::read::HudiReadConfig::InputPartitions;
-use hudi_core::storage::utils::{empty_options, get_scheme_authority, 
parse_uri};
+use hudi_core::config::utils::empty_options;
+use hudi_core::storage::utils::{get_scheme_authority, parse_uri};
 use hudi_core::table::Table as HudiTable;
 
 #[derive(Clone, Debug)]
@@ -66,7 +67,7 @@ impl HudiDataSource {
 
     fn get_input_partitions(&self) -> usize {
         self.table
-            .configs
+            .hudi_configs
             .get_or_default(InputPartitions)
             .to::<usize>()
     }
@@ -119,7 +120,13 @@ impl TableProvider for HudiDataSource {
             parquet_file_groups.push(parquet_file_group_vec)
         }
 
-        let url = 
ObjectStoreUrl::parse(get_scheme_authority(&self.table.base_url))?;
+        let base_url = self.table.base_url().map_err(|e| {
+            Execution(format!(
+                "Failed to get base path config from Hudi table: {}",
+                e
+            ))
+        })?;
+        let url = ObjectStoreUrl::parse(get_scheme_authority(&base_url))?;
         let fsc = FileScanConfig::new(url, self.schema())
             .with_file_groups(parquet_file_groups)
             .with_projection(projection.cloned())
diff --git a/python/hudi/_internal.pyi b/python/hudi/_internal.pyi
index bc6023b..47da6aa 100644
--- a/python/hudi/_internal.pyi
+++ b/python/hudi/_internal.pyi
@@ -37,7 +37,7 @@ class HudiFileSlice:
 class HudiTable:
     def __init__(
         self,
-        table_uri: str,
+        base_uri: str,
         options: Optional[Dict[str, str]] = None,
     ): ...
     def get_schema(self) -> "pyarrow.Schema": ...
diff --git a/python/src/internal.rs b/python/src/internal.rs
index 70b9f56..4c448b5 100644
--- a/python/src/internal.rs
+++ b/python/src/internal.rs
@@ -28,6 +28,12 @@ use tokio::runtime::Runtime;
 use hudi::file_group::FileSlice;
 use hudi::table::Table;
 
+macro_rules! vec_string_to_slice {
+    ($vec:expr) => {
+        &$vec.iter().map(AsRef::as_ref).collect::<Vec<_>>()
+    };
+}
+
 #[cfg(not(tarpaulin))]
 #[derive(Clone, Debug)]
 #[pyclass]
@@ -85,37 +91,34 @@ fn convert_file_slice(f: &FileSlice) -> HudiFileSlice {
     }
 }
 
-macro_rules! vec_string_to_slice {
-    ($vec:expr) => {
-        &$vec.iter().map(AsRef::as_ref).collect::<Vec<_>>()
-    };
-}
-
 #[cfg(not(tarpaulin))]
 #[pyclass]
 pub struct HudiTable {
-    _table: Table,
+    inner: Table,
 }
 
 #[cfg(not(tarpaulin))]
 #[pymethods]
 impl HudiTable {
     #[new]
-    #[pyo3(signature = (table_uri, options = None))]
-    fn new(table_uri: &str, options: Option<HashMap<String, String>>) -> 
PyResult<Self> {
-        let _table = rt().block_on(Table::new_with_options(
-            table_uri,
+    #[pyo3(signature = (base_uri, options=None))]
+    fn new_with_options(
+        base_uri: &str,
+        options: Option<HashMap<String, String>>,
+    ) -> PyResult<Self> {
+        let inner: Table = rt().block_on(Table::new_with_options(
+            base_uri,
             options.unwrap_or_default(),
         ))?;
-        Ok(HudiTable { _table })
+        Ok(HudiTable { inner })
     }
 
     fn get_schema(&self, py: Python) -> PyResult<PyObject> {
-        rt().block_on(self._table.get_schema())?.to_pyarrow(py)
+        rt().block_on(self.inner.get_schema())?.to_pyarrow(py)
     }
 
     fn get_partition_schema(&self, py: Python) -> PyResult<PyObject> {
-        rt().block_on(self._table.get_partition_schema())?
+        rt().block_on(self.inner.get_partition_schema())?
             .to_pyarrow(py)
     }
 
@@ -128,7 +131,7 @@ impl HudiTable {
     ) -> PyResult<Vec<Vec<HudiFileSlice>>> {
         py.allow_threads(|| {
             let file_slices = rt().block_on(
-                self._table
+                self.inner
                     .split_file_slices(n, 
vec_string_to_slice!(filters.unwrap_or_default())),
             )?;
             Ok(file_slices
@@ -146,7 +149,7 @@ impl HudiTable {
     ) -> PyResult<Vec<HudiFileSlice>> {
         py.allow_threads(|| {
             let file_slices = rt().block_on(
-                self._table
+                self.inner
                     
.get_file_slices(vec_string_to_slice!(filters.unwrap_or_default())),
             )?;
             Ok(file_slices.iter().map(convert_file_slice).collect())
@@ -154,14 +157,14 @@ impl HudiTable {
     }
 
     fn read_file_slice(&self, relative_path: &str, py: Python) -> 
PyResult<PyObject> {
-        rt().block_on(self._table.read_file_slice_by_path(relative_path))?
+        rt().block_on(self.inner.read_file_slice_by_path(relative_path))?
             .to_pyarrow(py)
     }
 
     #[pyo3(signature = (filters=None))]
     fn read_snapshot(&self, filters: Option<Vec<String>>, py: Python) -> 
PyResult<PyObject> {
         rt().block_on(
-            self._table
+            self.inner
                 
.read_snapshot(vec_string_to_slice!(filters.unwrap_or_default())),
         )?
         .to_pyarrow(py)

Reply via email to