This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 2fcc964  feat: add `HudiConfigs` for parsing and managing named 
configs (#37)
2fcc964 is described below

commit 2fcc964e945bbc9129af1955aa6b623f4a1367ed
Author: Shiyan Xu <[email protected]>
AuthorDate: Fri Jul 5 05:07:30 2024 -0500

    feat: add `HudiConfigs` for parsing and managing named configs (#37)
    
    - introduce trait `ConfigParser` and `HudiConfigs`
    - introduce enums `HudiReadConfig` and `HudiTableConfig` to manage 
different categories of configs
    - make use of `HudiConfigs` and config enums to parse, validate, and 
retrieve hudi configs
    
    Fixes #38
---
 crates/core/Cargo.toml                             |   1 +
 crates/core/src/config/mod.rs                      | 171 +++++++-----
 crates/core/src/config/read.rs                     | 101 +++++++
 crates/core/src/config/table.rs                    | 226 ++++++++++++++++
 crates/core/src/table/config.rs                    | 144 ----------
 crates/core/src/table/fs_view.rs                   |  10 +-
 crates/core/src/table/metadata.rs                  |  55 ----
 crates/core/src/table/mod.rs                       | 292 +++++++++++----------
 crates/core/src/table/timeline.rs                  |  14 +-
 .../.hoodie/hoodie.properties                      |  27 +-
 .../.hoodie/hoodie.properties                      |   3 +-
 crates/datafusion/src/lib.rs                       |  20 +-
 crates/tests/src/utils.rs                          |  14 +
 13 files changed, 639 insertions(+), 439 deletions(-)

diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml
index 5d04337..06e15a4 100644
--- a/crates/core/Cargo.toml
+++ b/crates/core/Cargo.toml
@@ -61,6 +61,7 @@ bytes = { workspace = true }
 chrono = { workspace = true, default-features = false, features = ["clock"] }
 hashbrown = "0.14.3"
 regex = { workspace = true }
+strum_macros = { workspace = true }
 uuid = { workspace = true, features = ["serde", "v4"] }
 url = { workspace = true }
 
diff --git a/crates/core/src/config/mod.rs b/crates/core/src/config/mod.rs
index 3322df3..2399f93 100644
--- a/crates/core/src/config/mod.rs
+++ b/crates/core/src/config/mod.rs
@@ -16,103 +16,144 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+use std::any::type_name;
 use std::collections::HashMap;
+use std::sync::Arc;
 
-use anyhow::{anyhow, Context, Result};
+use anyhow::Result;
 
-pub trait OptionsParser {
+pub mod read;
+pub mod table;
+
+pub trait ConfigParser: AsRef<str> {
     type Output;
 
-    fn parse_value(&self, options: &HashMap<String, String>) -> 
Result<Self::Output>;
+    fn default_value(&self) -> Option<Self::Output>;
 
-    fn parse_value_or_default(&self, options: &HashMap<String, String>) -> 
Self::Output;
-}
+    fn is_required(&self) -> bool {
+        false
+    }
+
+    fn validate(&self, configs: &HashMap<String, String>) -> Result<()> {
+        match self.parse_value(configs) {
+            Ok(_) => Ok(()),
+            Err(e) => {
+                if !self.is_required() && e.to_string().ends_with("not found") 
{
+                    // TODO: introduce error type to avoid checking "not found"
+                    Ok(())
+                } else {
+                    Err(e)
+                }
+            }
+        }
+    }
 
-#[derive(Clone, Debug, PartialEq, Eq, Hash)]
-pub enum HudiConfig {
-    ReadInputPartitions,
+    fn parse_value(&self, configs: &HashMap<String, String>) -> 
Result<Self::Output>;
+
+    fn parse_value_or_default(&self, configs: &HashMap<String, String>) -> 
Self::Output {
+        self.parse_value(configs).unwrap_or_else(|_| {
+            self.default_value()
+                .unwrap_or_else(|| panic!("No default value for config '{}'", 
self.as_ref()))
+        })
+    }
 }
 
 #[derive(Debug)]
 pub enum HudiConfigValue {
+    Boolean(bool),
     Integer(isize),
+    UInteger(usize),
+    String(String),
+    List(Vec<String>),
 }
 
 impl HudiConfigValue {
-    pub fn cast<T: 'static + TryFrom<isize> + TryFrom<usize> + 
std::fmt::Debug>(&self) -> T {
-        match self {
-            HudiConfigValue::Integer(value) => 
T::try_from(*value).unwrap_or_else(|_| {
-                panic!("Failed to convert isize to {}", 
std::any::type_name::<T>())
-            }),
+    pub fn to<T: 'static + std::fmt::Debug + From<HudiConfigValue>>(self) -> T 
{
+        T::from(self)
+    }
+}
+
+impl From<HudiConfigValue> for bool {
+    fn from(value: HudiConfigValue) -> Self {
+        match value {
+            HudiConfigValue::Boolean(v) => v,
+            _ => panic!("Cannot cast {:?} to {}", value, type_name::<Self>()),
         }
     }
 }
 
-impl HudiConfig {
-    fn default_value(&self) -> Option<HudiConfigValue> {
-        match self {
-            Self::ReadInputPartitions => Some(HudiConfigValue::Integer(0)),
+impl From<HudiConfigValue> for isize {
+    fn from(value: HudiConfigValue) -> Self {
+        match value {
+            HudiConfigValue::Integer(v) => v,
+            _ => panic!("Cannot cast {:?} to {}", value, type_name::<Self>()),
         }
     }
 }
 
-impl AsRef<str> for HudiConfig {
-    fn as_ref(&self) -> &str {
-        match self {
-            Self::ReadInputPartitions => "hoodie.read.input.partitions",
+impl From<HudiConfigValue> for usize {
+    fn from(value: HudiConfigValue) -> Self {
+        match value {
+            HudiConfigValue::UInteger(v) => v,
+            _ => panic!("Cannot cast {:?} to {}", value, type_name::<Self>()),
         }
     }
 }
 
-impl OptionsParser for HudiConfig {
-    type Output = HudiConfigValue;
-
-    fn parse_value(&self, options: &HashMap<String, String>) -> 
Result<Self::Output> {
-        match self {
-            HudiConfig::ReadInputPartitions => 
options.get(self.as_ref()).map_or_else(
-                || Err(anyhow!("Config '{}' not found", self.as_ref())),
-                |v| {
-                    v.parse::<isize>()
-                        .map(HudiConfigValue::Integer)
-                        .with_context(|| {
-                            format!("Failed to parse '{}' for config '{}'", v, 
self.as_ref())
-                        })
-                },
-            ),
+impl From<HudiConfigValue> for String {
+    fn from(value: HudiConfigValue) -> Self {
+        match value {
+            HudiConfigValue::Boolean(v) => v.to_string(),
+            HudiConfigValue::Integer(v) => v.to_string(),
+            HudiConfigValue::UInteger(v) => v.to_string(),
+            HudiConfigValue::String(v) => v,
+            _ => panic!("Cannot cast {:?} to {}", value, type_name::<Self>()),
         }
     }
+}
 
-    fn parse_value_or_default(&self, options: &HashMap<String, String>) -> 
Self::Output {
-        self.parse_value(options).unwrap_or_else(|_| {
-            self.default_value()
-                .unwrap_or_else(|| panic!("No default value for config '{}'", 
self.as_ref()))
-        })
+impl From<HudiConfigValue> for Vec<String> {
+    fn from(value: HudiConfigValue) -> Self {
+        match value {
+            HudiConfigValue::List(v) => v,
+            _ => panic!("Cannot cast {:?} to {}", value, type_name::<Self>()),
+        }
     }
 }
 
-#[cfg(test)]
-mod tests {
-    use crate::config::HudiConfig::ReadInputPartitions;
-    use crate::config::OptionsParser;
-    use std::collections::HashMap;
-
-    #[test]
-    fn parse_invalid_config_value() {
-        let options =
-            HashMap::from([(ReadInputPartitions.as_ref().to_string(), 
"foo".to_string())]);
-        let value = ReadInputPartitions.parse_value(&options);
-        assert_eq!(
-            value.err().unwrap().to_string(),
-            format!(
-                "Failed to parse 'foo' for config '{}'",
-                ReadInputPartitions.as_ref()
-            )
-        );
-        assert_eq!(
-            ReadInputPartitions
-                .parse_value_or_default(&options)
-                .cast::<isize>(),
-            0
-        );
+#[derive(Clone, Debug)]
+pub struct HudiConfigs {
+    pub raw_configs: Arc<HashMap<String, String>>,
+}
+
+impl HudiConfigs {
+    pub fn new(raw_configs: HashMap<String, String>) -> Self {
+        Self {
+            raw_configs: Arc::new(raw_configs),
+        }
+    }
+
+    pub fn empty() -> Self {
+        Self {
+            raw_configs: Arc::new(HashMap::new()),
+        }
+    }
+
+    pub fn validate(&self, parser: impl ConfigParser<Output = 
HudiConfigValue>) -> Result<()> {
+        parser.validate(&self.raw_configs)
+    }
+
+    pub fn get(
+        &self,
+        parser: impl ConfigParser<Output = HudiConfigValue>,
+    ) -> Result<HudiConfigValue> {
+        parser.parse_value(&self.raw_configs)
+    }
+
+    pub fn get_or_default(
+        &self,
+        parser: impl ConfigParser<Output = HudiConfigValue>,
+    ) -> HudiConfigValue {
+        parser.parse_value_or_default(&self.raw_configs)
     }
 }
diff --git a/crates/core/src/config/read.rs b/crates/core/src/config/read.rs
new file mode 100644
index 0000000..43879de
--- /dev/null
+++ b/crates/core/src/config/read.rs
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use std::collections::HashMap;
+use std::str::FromStr;
+
+use anyhow::{anyhow, Result};
+
+use crate::config::{ConfigParser, HudiConfigValue};
+
+#[derive(Clone, Debug, PartialEq, Eq, Hash)]
+pub enum HudiReadConfig {
+    InputPartitions,
+}
+
+impl AsRef<str> for HudiReadConfig {
+    fn as_ref(&self) -> &str {
+        match self {
+            Self::InputPartitions => "hoodie.read.input.partitions",
+        }
+    }
+}
+
+impl ConfigParser for HudiReadConfig {
+    type Output = HudiConfigValue;
+
+    fn default_value(&self) -> Option<HudiConfigValue> {
+        match self {
+            HudiReadConfig::InputPartitions => 
Some(HudiConfigValue::UInteger(0usize)),
+        }
+    }
+
+    fn parse_value(&self, configs: &HashMap<String, String>) -> 
Result<Self::Output> {
+        let get_result = configs
+            .get(self.as_ref())
+            .map(|v| v.as_str())
+            .ok_or(anyhow!("Config '{}' not found", self.as_ref()));
+
+        match self {
+            HudiReadConfig::InputPartitions => get_result
+                .and_then(|v| {
+                    usize::from_str(v).map_err(|e| {
+                        anyhow!(
+                            "Failed to parse '{}' for config '{}': {}",
+                            v,
+                            self.as_ref(),
+                            e
+                        )
+                    })
+                })
+                .map(HudiConfigValue::UInteger),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::collections::HashMap;
+
+    use crate::config::read::HudiReadConfig::InputPartitions;
+    use crate::config::ConfigParser;
+
+    #[test]
+    fn parse_valid_config_value() {
+        let options = HashMap::from([(InputPartitions.as_ref().to_string(), 
"100".to_string())]);
+        let value = 
InputPartitions.parse_value(&options).unwrap().to::<usize>();
+        assert_eq!(value, 100usize);
+    }
+
+    #[test]
+    fn parse_invalid_config_value() {
+        let options = HashMap::from([(InputPartitions.as_ref().to_string(), 
"foo".to_string())]);
+        let value = InputPartitions.parse_value(&options);
+        assert!(value.err().unwrap().to_string().starts_with(&format!(
+            "Failed to parse 'foo' for config '{}'",
+            InputPartitions.as_ref()
+        )));
+        assert_eq!(
+            InputPartitions
+                .parse_value_or_default(&options)
+                .to::<usize>(),
+            0
+        );
+    }
+}
diff --git a/crates/core/src/config/table.rs b/crates/core/src/config/table.rs
new file mode 100644
index 0000000..d503e32
--- /dev/null
+++ b/crates/core/src/config/table.rs
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use std::collections::HashMap;
+use std::str::FromStr;
+
+use anyhow::anyhow;
+use anyhow::Result;
+use strum_macros::AsRefStr;
+
+use crate::config::{ConfigParser, HudiConfigValue};
+
+#[derive(Clone, Debug, PartialEq, Eq, Hash)]
+pub enum HudiTableConfig {
+    BaseFileFormat,
+    Checksum,
+    DatabaseName,
+    DropsPartitionFields,
+    IsHiveStylePartitioning,
+    IsPartitionPathUrlencoded,
+    KeyGeneratorClass,
+    PartitionFields,
+    PrecombineField,
+    PopulatesMetaFields,
+    RecordKeyFields,
+    TableName,
+    TableType,
+    TableVersion,
+    TimelineLayoutVersion,
+}
+
+impl AsRef<str> for HudiTableConfig {
+    fn as_ref(&self) -> &str {
+        match self {
+            Self::BaseFileFormat => "hoodie.table.base.file.format",
+            Self::Checksum => "hoodie.table.checksum",
+            Self::DatabaseName => "hoodie.database.name",
+            Self::DropsPartitionFields => 
"hoodie.datasource.write.drop.partition.columns",
+            Self::IsHiveStylePartitioning => 
"hoodie.datasource.write.hive_style_partitioning",
+            Self::IsPartitionPathUrlencoded => 
"hoodie.datasource.write.partitionpath.urlencode",
+            Self::KeyGeneratorClass => "hoodie.table.keygenerator.class",
+            Self::PartitionFields => "hoodie.table.partition.fields",
+            Self::PrecombineField => "hoodie.table.precombine.field",
+            Self::PopulatesMetaFields => "hoodie.populate.meta.fields",
+            Self::RecordKeyFields => "hoodie.table.recordkey.fields",
+            Self::TableName => "hoodie.table.name",
+            Self::TableType => "hoodie.table.type",
+            Self::TableVersion => "hoodie.table.version",
+            Self::TimelineLayoutVersion => "hoodie.timeline.layout.version",
+        }
+    }
+}
+
+impl ConfigParser for HudiTableConfig {
+    type Output = HudiConfigValue;
+
+    fn default_value(&self) -> Option<Self::Output> {
+        match self {
+            Self::DatabaseName => 
Some(HudiConfigValue::String("default".to_string())),
+            Self::DropsPartitionFields => 
Some(HudiConfigValue::Boolean(false)),
+            Self::PopulatesMetaFields => Some(HudiConfigValue::Boolean(true)),
+            _ => None,
+        }
+    }
+
+    fn is_required(&self) -> bool {
+        matches!(
+            self,
+            Self::BaseFileFormat | Self::TableName | Self::TableType | 
Self::TableVersion
+        )
+    }
+
+    fn parse_value(&self, configs: &HashMap<String, String>) -> 
Result<Self::Output> {
+        let get_result = configs
+            .get(self.as_ref())
+            .map(|v| v.as_str())
+            .ok_or(anyhow!("Config '{}' not found", self.as_ref()));
+
+        match self {
+            Self::BaseFileFormat => get_result
+                .and_then(BaseFileFormatValue::from_str)
+                .map(|v| HudiConfigValue::String(v.as_ref().to_string())),
+            Self::Checksum => get_result
+                .and_then(|v| isize::from_str(v).map_err(|e| anyhow!(e)))
+                .map(HudiConfigValue::Integer),
+            Self::DatabaseName => get_result.map(|v| 
HudiConfigValue::String(v.to_string())),
+            Self::DropsPartitionFields => get_result
+                .and_then(|v| bool::from_str(v).map_err(|e| anyhow!(e)))
+                .map(HudiConfigValue::Boolean),
+            Self::IsHiveStylePartitioning => get_result
+                .and_then(|v| bool::from_str(v).map_err(|e| anyhow!(e)))
+                .map(HudiConfigValue::Boolean),
+            Self::IsPartitionPathUrlencoded => get_result
+                .and_then(|v| bool::from_str(v).map_err(|e| anyhow!(e)))
+                .map(HudiConfigValue::Boolean),
+            Self::KeyGeneratorClass => get_result.map(|v| 
HudiConfigValue::String(v.to_string())),
+            Self::PartitionFields => get_result
+                .map(|v| 
HudiConfigValue::List(v.split(',').map(str::to_string).collect())),
+            Self::PrecombineField => get_result.map(|v| 
HudiConfigValue::String(v.to_string())),
+            Self::PopulatesMetaFields => get_result
+                .and_then(|v| bool::from_str(v).map_err(|e| anyhow!(e)))
+                .map(HudiConfigValue::Boolean),
+            Self::RecordKeyFields => get_result
+                .map(|v| 
HudiConfigValue::List(v.split(',').map(str::to_string).collect())),
+            Self::TableName => get_result.map(|v| 
HudiConfigValue::String(v.to_string())),
+            Self::TableType => get_result
+                .and_then(TableTypeValue::from_str)
+                .map(|v| HudiConfigValue::String(v.as_ref().to_string())),
+            Self::TableVersion => get_result
+                .and_then(|v| isize::from_str(v).map_err(|e| anyhow!(e)))
+                .map(HudiConfigValue::Integer),
+            Self::TimelineLayoutVersion => get_result
+                .and_then(|v| isize::from_str(v).map_err(|e| anyhow!(e)))
+                .map(HudiConfigValue::Integer),
+        }
+    }
+}
+
+#[derive(Clone, Debug, PartialEq, AsRefStr)]
+pub enum TableTypeValue {
+    #[strum(serialize = "COPY_ON_WRITE")]
+    CopyOnWrite,
+    #[strum(serialize = "MERGE_ON_READ")]
+    MergeOnRead,
+}
+
+impl FromStr for TableTypeValue {
+    type Err = anyhow::Error;
+
+    fn from_str(s: &str) -> Result<Self> {
+        match s.to_ascii_lowercase().as_str() {
+            "copy_on_write" | "copy-on-write" | "cow" => Ok(Self::CopyOnWrite),
+            "merge_on_read" | "merge-on-read" | "mor" => Ok(Self::MergeOnRead),
+            _ => Err(anyhow!("Unsupported table type: {}", s)),
+        }
+    }
+}
+
+#[derive(Clone, Debug, PartialEq, AsRefStr)]
+pub enum BaseFileFormatValue {
+    #[strum(serialize = "parquet")]
+    Parquet,
+}
+
+impl FromStr for BaseFileFormatValue {
+    type Err = anyhow::Error;
+
+    fn from_str(s: &str) -> Result<Self> {
+        match s.to_ascii_lowercase().as_str() {
+            "parquet" => Ok(Self::Parquet),
+            _ => Err(anyhow!("Unsupported base file format: {}", s)),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::str::FromStr;
+
+    use crate::config::table::{BaseFileFormatValue, TableTypeValue};
+
+    #[test]
+    fn create_table_type() {
+        assert_eq!(
+            TableTypeValue::from_str("cow").unwrap(),
+            TableTypeValue::CopyOnWrite
+        );
+        assert_eq!(
+            TableTypeValue::from_str("copy_on_write").unwrap(),
+            TableTypeValue::CopyOnWrite
+        );
+        assert_eq!(
+            TableTypeValue::from_str("COPY-ON-WRITE").unwrap(),
+            TableTypeValue::CopyOnWrite
+        );
+        assert_eq!(
+            TableTypeValue::from_str("MOR").unwrap(),
+            TableTypeValue::MergeOnRead
+        );
+        assert_eq!(
+            TableTypeValue::from_str("Merge_on_read").unwrap(),
+            TableTypeValue::MergeOnRead
+        );
+        assert_eq!(
+            TableTypeValue::from_str("Merge-on-read").unwrap(),
+            TableTypeValue::MergeOnRead
+        );
+        assert!(TableTypeValue::from_str("").is_err());
+        assert!(TableTypeValue::from_str("copyonwrite").is_err());
+        assert!(TableTypeValue::from_str("MERGEONREAD").is_err());
+        assert!(TableTypeValue::from_str("foo").is_err());
+    }
+
+    #[test]
+    fn create_base_file_format() {
+        assert_eq!(
+            BaseFileFormatValue::from_str("parquet").unwrap(),
+            BaseFileFormatValue::Parquet
+        );
+        assert_eq!(
+            BaseFileFormatValue::from_str("PArquet").unwrap(),
+            BaseFileFormatValue::Parquet
+        );
+        assert!(BaseFileFormatValue::from_str("").is_err());
+        assert!(
+            BaseFileFormatValue::from_str("orc").is_err(),
+            "orc is not yet supported."
+        );
+    }
+}
diff --git a/crates/core/src/table/config.rs b/crates/core/src/table/config.rs
deleted file mode 100644
index 6a1a6ce..0000000
--- a/crates/core/src/table/config.rs
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-use anyhow::anyhow;
-use anyhow::Result;
-use std::str::FromStr;
-
-pub enum ConfigKey {
-    BaseFileFormat,
-    Checksum,
-    DatabaseName,
-    DropsPartitionFields,
-    IsHiveStylePartitioning,
-    IsPartitionPathUrlencoded,
-    KeyGeneratorClass,
-    PartitionFields,
-    PrecombineField,
-    PopulatesMetaFields,
-    RecordKeyFields,
-    TableName,
-    TableType,
-    TableVersion,
-    TimelineLayoutVersion,
-}
-
-impl AsRef<str> for ConfigKey {
-    fn as_ref(&self) -> &str {
-        match self {
-            Self::BaseFileFormat => "hoodie.table.base.file.format",
-            Self::Checksum => "hoodie.table.checksum",
-            Self::DatabaseName => "hoodie.database.name",
-            Self::DropsPartitionFields => 
"hoodie.datasource.write.drop.partition.columns",
-            Self::IsHiveStylePartitioning => 
"hoodie.datasource.write.hive_style_partitioning",
-            Self::IsPartitionPathUrlencoded => 
"hoodie.datasource.write.partitionpath.urlencode",
-            Self::KeyGeneratorClass => "hoodie.table.keygenerator.class",
-            Self::PartitionFields => "hoodie.table.partition.fields",
-            Self::PrecombineField => "hoodie.table.precombine.field",
-            Self::PopulatesMetaFields => "hoodie.populate.meta.fields",
-            Self::RecordKeyFields => "hoodie.table.recordkey.fields",
-            Self::TableName => "hoodie.table.name",
-            Self::TableType => "hoodie.table.type",
-            Self::TableVersion => "hoodie.table.version",
-            Self::TimelineLayoutVersion => "hoodie.timeline.layout.version",
-        }
-    }
-}
-
-#[derive(Debug, PartialEq)]
-pub enum TableType {
-    CopyOnWrite,
-    MergeOnRead,
-}
-
-impl FromStr for TableType {
-    type Err = anyhow::Error;
-
-    fn from_str(s: &str) -> Result<Self> {
-        match s.to_ascii_lowercase().as_str() {
-            "copy_on_write" | "copy-on-write" | "cow" => Ok(Self::CopyOnWrite),
-            "merge_on_read" | "merge-on-read" | "mor" => Ok(Self::MergeOnRead),
-            _ => Err(anyhow!("Unsupported table type: {}", s)),
-        }
-    }
-}
-
-#[derive(Debug, PartialEq)]
-pub enum BaseFileFormat {
-    Parquet,
-}
-
-impl FromStr for BaseFileFormat {
-    type Err = anyhow::Error;
-
-    fn from_str(s: &str) -> Result<Self> {
-        match s.to_ascii_lowercase().as_str() {
-            "parquet" => Ok(Self::Parquet),
-            _ => Err(anyhow!("Unsupported base file format: {}", s)),
-        }
-    }
-}
-#[cfg(test)]
-mod tests {
-    use crate::table::config::{BaseFileFormat, TableType};
-    use std::str::FromStr;
-
-    #[test]
-    fn create_table_type() {
-        assert_eq!(TableType::from_str("cow").unwrap(), 
TableType::CopyOnWrite);
-        assert_eq!(
-            TableType::from_str("copy_on_write").unwrap(),
-            TableType::CopyOnWrite
-        );
-        assert_eq!(
-            TableType::from_str("COPY-ON-WRITE").unwrap(),
-            TableType::CopyOnWrite
-        );
-        assert_eq!(TableType::from_str("MOR").unwrap(), 
TableType::MergeOnRead);
-        assert_eq!(
-            TableType::from_str("Merge_on_read").unwrap(),
-            TableType::MergeOnRead
-        );
-        assert_eq!(
-            TableType::from_str("Merge-on-read").unwrap(),
-            TableType::MergeOnRead
-        );
-        assert!(TableType::from_str("").is_err());
-        assert!(TableType::from_str("copyonwrite").is_err());
-        assert!(TableType::from_str("MERGEONREAD").is_err());
-        assert!(TableType::from_str("foo").is_err());
-    }
-
-    #[test]
-    fn create_base_file_format() {
-        assert_eq!(
-            BaseFileFormat::from_str("parquet").unwrap(),
-            BaseFileFormat::Parquet
-        );
-        assert_eq!(
-            BaseFileFormat::from_str("PArquet").unwrap(),
-            BaseFileFormat::Parquet
-        );
-        assert!(TableType::from_str("").is_err());
-        assert!(
-            TableType::from_str("orc").is_err(),
-            "orc is not yet supported."
-        );
-    }
-}
diff --git a/crates/core/src/table/fs_view.rs b/crates/core/src/table/fs_view.rs
index ad4f5f5..02d3ca4 100644
--- a/crates/core/src/table/fs_view.rs
+++ b/crates/core/src/table/fs_view.rs
@@ -25,6 +25,7 @@ use arrow::record_batch::RecordBatch;
 use dashmap::DashMap;
 use url::Url;
 
+use crate::config::HudiConfigs;
 use crate::file_group::{BaseFile, FileGroup, FileSlice};
 use crate::storage::file_info::FileInfo;
 use crate::storage::{get_leaf_dirs, Storage};
@@ -32,7 +33,7 @@ use crate::storage::{get_leaf_dirs, Storage};
 #[derive(Clone, Debug)]
 #[allow(dead_code)]
 pub struct FileSystemView {
-    props: Arc<HashMap<String, String>>,
+    configs: Arc<HudiConfigs>,
     storage: Arc<Storage>,
     partition_to_file_groups: Arc<DashMap<String, Vec<FileGroup>>>,
 }
@@ -41,7 +42,7 @@ impl FileSystemView {
     pub async fn new(
         base_url: Arc<Url>,
         storage_options: Arc<HashMap<String, String>>,
-        props: Arc<HashMap<String, String>>,
+        configs: Arc<HudiConfigs>,
     ) -> Result<Self> {
         let storage = Storage::new(base_url, storage_options)?;
         let partition_paths = Self::load_partition_paths(&storage).await?;
@@ -49,7 +50,7 @@ impl FileSystemView {
             Self::load_file_groups_for_partitions(&storage, 
partition_paths).await?;
         let partition_to_file_groups = 
Arc::new(DashMap::from_iter(partition_to_file_groups));
         Ok(FileSystemView {
-            props,
+            configs,
             storage,
             partition_to_file_groups,
         })
@@ -168,6 +169,7 @@ mod tests {
 
     use hudi_tests::TestTable;
 
+    use crate::config::HudiConfigs;
     use crate::storage::Storage;
     use crate::table::fs_view::FileSystemView;
 
@@ -208,7 +210,7 @@ mod tests {
         let fs_view = FileSystemView::new(
             Arc::new(base_url),
             Arc::new(HashMap::new()),
-            Arc::new(HashMap::new()),
+            Arc::new(HudiConfigs::empty()),
         )
         .await
         .unwrap();
diff --git a/crates/core/src/table/metadata.rs 
b/crates/core/src/table/metadata.rs
deleted file mode 100644
index fb2d41b..0000000
--- a/crates/core/src/table/metadata.rs
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-use crate::table::config::{BaseFileFormat, TableType};
-
-pub trait ProvidesTableMetadata {
-    fn base_file_format(&self) -> BaseFileFormat;
-
-    fn checksum(&self) -> i64;
-
-    fn database_name(&self) -> String;
-
-    fn drops_partition_fields(&self) -> bool;
-
-    fn is_hive_style_partitioning(&self) -> bool;
-
-    fn is_partition_path_urlencoded(&self) -> bool;
-
-    fn is_partitioned(&self) -> bool;
-
-    fn key_generator_class(&self) -> String;
-
-    fn location(&self) -> String;
-
-    fn partition_fields(&self) -> Vec<String>;
-
-    fn precombine_field(&self) -> String;
-
-    fn populates_meta_fields(&self) -> bool;
-
-    fn record_key_fields(&self) -> Vec<String>;
-
-    fn table_name(&self) -> String;
-
-    fn table_type(&self) -> TableType;
-
-    fn table_version(&self) -> u32;
-
-    fn timeline_layout_version(&self) -> u32;
-}
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 849f0e0..e09fd10 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -19,7 +19,6 @@
 
 use std::collections::HashMap;
 use std::io::{BufRead, BufReader};
-use std::str::FromStr;
 use std::sync::Arc;
 
 use anyhow::{anyhow, Context, Result};
@@ -27,25 +26,21 @@ use arrow::record_batch::RecordBatch;
 use arrow_schema::Schema;
 use url::Url;
 
+use crate::config::HudiConfigs;
 use crate::file_group::FileSlice;
 use crate::storage::utils::parse_uri;
 use crate::storage::Storage;
-use crate::table::config::BaseFileFormat;
-use crate::table::config::{ConfigKey, TableType};
 use crate::table::fs_view::FileSystemView;
-use crate::table::metadata::ProvidesTableMetadata;
 use crate::table::timeline::Timeline;
 
-mod config;
 mod fs_view;
-mod metadata;
 mod timeline;
 
-#[derive(Debug, Clone)]
+#[derive(Clone, Debug)]
 pub struct Table {
     pub base_url: Arc<Url>,
     pub storage_options: Arc<HashMap<String, String>>,
-    pub props: Arc<HashMap<String, String>>,
+    pub configs: Arc<HudiConfigs>,
     pub timeline: Timeline,
     pub file_system_view: FileSystemView,
 }
@@ -55,24 +50,24 @@ impl Table {
         let base_url = Arc::new(parse_uri(base_uri)?);
         let storage_options = Arc::new(storage_options);
 
-        let props = Self::load_properties(base_url.clone(), 
storage_options.clone())
+        let configs = Self::load_properties(base_url.clone(), 
storage_options.clone())
             .await
             .context("Failed to load table properties")?;
+        let configs = Arc::new(configs);
 
-        let props = Arc::new(props);
-        let timeline = Timeline::new(base_url.clone(), 
storage_options.clone(), props.clone())
+        let timeline = Timeline::new(base_url.clone(), 
storage_options.clone(), configs.clone())
             .await
             .context("Failed to load timeline")?;
 
         let file_system_view =
-            FileSystemView::new(base_url.clone(), storage_options.clone(), 
props.clone())
+            FileSystemView::new(base_url.clone(), storage_options.clone(), 
configs.clone())
                 .await
                 .context("Failed to load file system view")?;
 
         Ok(Table {
             base_url,
             storage_options,
-            props,
+            configs,
             timeline,
             file_system_view,
         })
@@ -81,7 +76,7 @@ impl Table {
     async fn load_properties(
         base_url: Arc<Url>,
         storage_options: Arc<HashMap<String, String>>,
-    ) -> Result<HashMap<String, String>> {
+    ) -> Result<HudiConfigs> {
         let storage = Storage::new(base_url, storage_options)?;
         let data = storage.get_file_data(".hoodie/hoodie.properties").await?;
         let cursor = std::io::Cursor::new(data);
@@ -98,14 +93,7 @@ impl Table {
             let value = parts.next().unwrap_or("").to_owned();
             properties.insert(key, value);
         }
-        Ok(properties)
-    }
-
-    pub fn get_property(&self, key: &str) -> &str {
-        match self.props.get(key) {
-            Some(value) => value,
-            None => panic!("Failed to retrieve property {}", key),
-        }
+        Ok(HudiConfigs::new(properties))
     }
 
     pub async fn get_schema(&self) -> Result<Schema> {
@@ -178,101 +166,24 @@ impl Table {
     }
 }
 
-impl ProvidesTableMetadata for Table {
-    fn base_file_format(&self) -> BaseFileFormat {
-        
BaseFileFormat::from_str(self.get_property(ConfigKey::BaseFileFormat.as_ref())).unwrap()
-    }
-
-    fn checksum(&self) -> i64 {
-        i64::from_str(self.get_property(ConfigKey::Checksum.as_ref())).unwrap()
-    }
-
-    fn database_name(&self) -> String {
-        match self.props.get(ConfigKey::DatabaseName.as_ref()) {
-            Some(value) => value.to_string(),
-            None => "default".to_string(),
-        }
-    }
-
-    fn drops_partition_fields(&self) -> bool {
-        
bool::from_str(self.get_property(ConfigKey::DropsPartitionFields.as_ref())).unwrap()
-    }
-
-    fn is_hive_style_partitioning(&self) -> bool {
-        
bool::from_str(self.get_property(ConfigKey::IsHiveStylePartitioning.as_ref())).unwrap()
-    }
-
-    fn is_partition_path_urlencoded(&self) -> bool {
-        
bool::from_str(self.get_property(ConfigKey::IsPartitionPathUrlencoded.as_ref())).unwrap()
-    }
-
-    fn is_partitioned(&self) -> bool {
-        !self
-            .key_generator_class()
-            .ends_with("NonpartitionedKeyGenerator")
-    }
-
-    fn key_generator_class(&self) -> String {
-        self.get_property(ConfigKey::KeyGeneratorClass.as_ref())
-            .to_string()
-    }
-
-    fn location(&self) -> String {
-        self.base_url.path().to_string()
-    }
-
-    fn partition_fields(&self) -> Vec<String> {
-        self.get_property(ConfigKey::PartitionFields.as_ref())
-            .split(',')
-            .map(str::to_string)
-            .collect()
-    }
-
-    fn precombine_field(&self) -> String {
-        self.get_property(ConfigKey::PrecombineField.as_ref())
-            .to_string()
-    }
-
-    fn populates_meta_fields(&self) -> bool {
-        
bool::from_str(self.get_property(ConfigKey::PopulatesMetaFields.as_ref())).unwrap()
-    }
-
-    fn record_key_fields(&self) -> Vec<String> {
-        self.get_property(ConfigKey::RecordKeyFields.as_ref())
-            .split(',')
-            .map(str::to_string)
-            .collect()
-    }
-
-    fn table_name(&self) -> String {
-        self.get_property(ConfigKey::TableName.as_ref()).to_string()
-    }
-
-    fn table_type(&self) -> TableType {
-        
TableType::from_str(self.get_property(ConfigKey::TableType.as_ref())).unwrap()
-    }
-
-    fn table_version(&self) -> u32 {
-        
u32::from_str(self.get_property(ConfigKey::TableVersion.as_ref())).unwrap()
-    }
-
-    fn timeline_layout_version(&self) -> u32 {
-        
u32::from_str(self.get_property(ConfigKey::TimelineLayoutVersion.as_ref())).unwrap()
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use std::collections::{HashMap, HashSet};
     use std::fs::canonicalize;
+    use std::panic;
     use std::path::Path;
 
-    use hudi_tests::TestTable;
+    use url::Url;
 
+    use hudi_tests::{assert_not, TestTable};
+
+    use crate::config::table::HudiTableConfig::{
+        BaseFileFormat, Checksum, DatabaseName, DropsPartitionFields, 
IsHiveStylePartitioning,
+        IsPartitionPathUrlencoded, KeyGeneratorClass, PartitionFields, 
PopulatesMetaFields,
+        PrecombineField, RecordKeyFields, TableName, TableType, TableVersion,
+        TimelineLayoutVersion,
+    };
     use crate::storage::utils::join_url_segments;
-    use crate::table::config::BaseFileFormat::Parquet;
-    use crate::table::config::TableType::CopyOnWrite;
-    use crate::table::metadata::ProvidesTableMetadata;
     use crate::table::Table;
 
     #[tokio::test]
@@ -411,33 +322,146 @@ mod tests {
     }
 
     #[tokio::test]
-    async fn hudi_table_get_table_metadata() {
-        let base_path = canonicalize(Path::new(
-            "tests/data/table_metadata/sample_table_properties",
-        ))
-        .unwrap();
-        let table = Table::new(base_path.to_str().unwrap(), HashMap::new())
-            .await
-            .unwrap();
-        assert_eq!(table.base_file_format(), Parquet);
-        assert_eq!(table.checksum(), 3761586722);
-        assert_eq!(table.database_name(), "default");
-        assert!(!table.drops_partition_fields());
-        assert!(!table.is_hive_style_partitioning());
-        assert!(!table.is_partition_path_urlencoded());
-        assert!(table.is_partitioned());
+    async fn validate_invalid_table_props() {
+        let base_url =
+            
Url::from_file_path(canonicalize(Path::new("tests/data/table_props_invalid")).unwrap())
+                .unwrap();
+        let table = Table::new(base_url.as_str(), 
HashMap::new()).await.unwrap();
+        let configs = table.configs;
+        assert!(
+            configs.validate(BaseFileFormat).is_err(),
+            "required config is missing"
+        );
+        assert!(configs.validate(Checksum).is_err());
+        assert!(
+            configs.validate(DatabaseName).is_ok(),
+            "non-required config is missing"
+        );
+        assert!(configs.validate(DropsPartitionFields).is_err());
+        assert!(configs.validate(IsHiveStylePartitioning).is_err());
+        assert!(configs.validate(IsPartitionPathUrlencoded).is_err());
+        assert!(
+            configs.validate(KeyGeneratorClass).is_ok(),
+            "non-required config is missing"
+        );
+        assert!(
+            configs.validate(PartitionFields).is_ok(),
+            "non-required config is missing"
+        );
+        assert!(
+            configs.validate(PrecombineField).is_ok(),
+            "non-required config is missing"
+        );
+        assert!(
+            configs.validate(PopulatesMetaFields).is_ok(),
+            "non-required config is missing"
+        );
+        assert!(
+            configs.validate(RecordKeyFields).is_ok(),
+            "non-required config is missing"
+        );
+        assert!(
+            configs.validate(TableName).is_err(),
+            "required config is missing"
+        );
+        assert!(
+            configs.validate(TableType).is_ok(),
+            "Valid table type value"
+        );
+        assert!(configs.validate(TableVersion).is_err());
+        assert!(configs.validate(TimelineLayoutVersion).is_err());
+    }
+
+    #[tokio::test]
+    async fn get_invalid_table_props() {
+        let base_url =
+            
Url::from_file_path(canonicalize(Path::new("tests/data/table_props_invalid")).unwrap())
+                .unwrap();
+        let table = Table::new(base_url.as_str(), 
HashMap::new()).await.unwrap();
+        let configs = table.configs;
+        assert!(configs.get(BaseFileFormat).is_err());
+        assert!(configs.get(Checksum).is_err());
+        assert!(configs.get(DatabaseName).is_err());
+        assert!(configs.get(DropsPartitionFields).is_err());
+        assert!(configs.get(IsHiveStylePartitioning).is_err());
+        assert!(configs.get(IsPartitionPathUrlencoded).is_err());
+        assert!(configs.get(KeyGeneratorClass).is_err());
+        assert!(configs.get(PartitionFields).is_err());
+        assert!(configs.get(PrecombineField).is_err());
+        assert!(configs.get(PopulatesMetaFields).is_err());
+        assert!(configs.get(RecordKeyFields).is_err());
+        assert!(configs.get(TableName).is_err());
+        assert!(configs.get(TableType).is_ok(), "Valid table type value");
+        assert!(configs.get(TableVersion).is_err());
+        assert!(configs.get(TimelineLayoutVersion).is_err());
+    }
+
+    #[tokio::test]
+    async fn get_default_for_invalid_table_props() {
+        let base_url =
+            
Url::from_file_path(canonicalize(Path::new("tests/data/table_props_invalid")).unwrap())
+                .unwrap();
+        let table = Table::new(base_url.as_str(), 
HashMap::new()).await.unwrap();
+        let configs = table.configs;
+        assert!(panic::catch_unwind(|| 
configs.get_or_default(BaseFileFormat)).is_err());
+        assert!(panic::catch_unwind(|| 
configs.get_or_default(Checksum)).is_err());
+        assert_eq!(
+            configs.get_or_default(DatabaseName).to::<String>(),
+            "default"
+        );
+        assert_not!(configs.get_or_default(DropsPartitionFields).to::<bool>());
+        assert!(panic::catch_unwind(|| 
configs.get_or_default(IsHiveStylePartitioning)).is_err());
+        assert!(panic::catch_unwind(|| 
configs.get_or_default(IsPartitionPathUrlencoded)).is_err());
+        assert!(panic::catch_unwind(|| 
configs.get_or_default(KeyGeneratorClass)).is_err());
+        assert!(panic::catch_unwind(|| 
configs.get_or_default(PartitionFields)).is_err());
+        assert!(panic::catch_unwind(|| 
configs.get_or_default(PrecombineField)).is_err());
+        assert!(configs.get_or_default(PopulatesMetaFields).to::<bool>());
+        assert!(panic::catch_unwind(|| 
configs.get_or_default(RecordKeyFields)).is_err());
+        assert!(panic::catch_unwind(|| 
configs.get_or_default(TableName)).is_err());
+        assert_eq!(
+            configs.get_or_default(TableType).to::<String>(),
+            "COPY_ON_WRITE"
+        );
+        assert!(panic::catch_unwind(|| 
configs.get_or_default(TableVersion)).is_err());
+        assert!(panic::catch_unwind(|| 
configs.get_or_default(TimelineLayoutVersion)).is_err());
+    }
+
+    #[tokio::test]
+    async fn get_valid_table_props() {
+        let base_url =
+            
Url::from_file_path(canonicalize(Path::new("tests/data/table_props_valid")).unwrap())
+                .unwrap();
+        let table = Table::new(base_url.as_str(), 
HashMap::new()).await.unwrap();
+        let configs = table.configs;
+        assert_eq!(
+            configs.get(BaseFileFormat).unwrap().to::<String>(),
+            "parquet"
+        );
+        assert_eq!(configs.get(Checksum).unwrap().to::<isize>(), 3761586722);
+        assert_eq!(configs.get(DatabaseName).unwrap().to::<String>(), "db");
+        assert!(!configs.get(DropsPartitionFields).unwrap().to::<bool>());
+        assert!(!configs.get(IsHiveStylePartitioning).unwrap().to::<bool>());
+        assert!(!configs.get(IsPartitionPathUrlencoded).unwrap().to::<bool>());
         assert_eq!(
-            table.key_generator_class(),
+            configs.get(KeyGeneratorClass).unwrap().to::<String>(),
             "org.apache.hudi.keygen.SimpleKeyGenerator"
         );
-        assert_eq!(table.location(), base_path.to_str().unwrap());
-        assert_eq!(table.partition_fields(), vec!["city"]);
-        assert_eq!(table.precombine_field(), "ts");
-        assert!(table.populates_meta_fields());
-        assert_eq!(table.record_key_fields(), vec!["uuid"]);
-        assert_eq!(table.table_name(), "trips");
-        assert_eq!(table.table_type(), CopyOnWrite);
-        assert_eq!(table.table_version(), 6);
-        assert_eq!(table.timeline_layout_version(), 1);
+        assert_eq!(
+            configs.get(PartitionFields).unwrap().to::<Vec<String>>(),
+            vec!["city"]
+        );
+        assert_eq!(configs.get(PrecombineField).unwrap().to::<String>(), "ts");
+        assert!(configs.get(PopulatesMetaFields).unwrap().to::<bool>());
+        assert_eq!(
+            configs.get(RecordKeyFields).unwrap().to::<Vec<String>>(),
+            vec!["uuid"]
+        );
+        assert_eq!(configs.get(TableName).unwrap().to::<String>(), "trips");
+        assert_eq!(
+            configs.get(TableType).unwrap().to::<String>(),
+            "COPY_ON_WRITE"
+        );
+        assert_eq!(configs.get(TableVersion).unwrap().to::<isize>(), 6);
+        assert_eq!(configs.get(TimelineLayoutVersion).unwrap().to::<isize>(), 
1);
     }
 }
diff --git a/crates/core/src/table/timeline.rs 
b/crates/core/src/table/timeline.rs
index 51601a4..d2d476d 100644
--- a/crates/core/src/table/timeline.rs
+++ b/crates/core/src/table/timeline.rs
@@ -29,6 +29,7 @@ use parquet::arrow::parquet_to_arrow_schema;
 use serde_json::{Map, Value};
 use url::Url;
 
+use crate::config::HudiConfigs;
 use crate::storage::utils::split_filename;
 use crate::storage::Storage;
 
@@ -76,7 +77,7 @@ impl Instant {
 #[derive(Clone, Debug)]
 #[allow(dead_code)]
 pub struct Timeline {
-    props: Arc<HashMap<String, String>>,
+    configs: Arc<HudiConfigs>,
     storage: Arc<Storage>,
     pub instants: Vec<Instant>,
 }
@@ -85,13 +86,13 @@ impl Timeline {
     pub async fn new(
         base_url: Arc<Url>,
         storage_options: Arc<HashMap<String, String>>,
-        props: Arc<HashMap<String, String>>,
+        configs: Arc<HudiConfigs>,
     ) -> Result<Self> {
         let storage = Storage::new(base_url, storage_options)?;
         let instants = Self::load_completed_commit_instants(&storage).await?;
         Ok(Self {
             storage,
-            props,
+            configs,
             instants,
         })
     }
@@ -179,6 +180,7 @@ mod tests {
 
     use hudi_tests::TestTable;
 
+    use crate::config::HudiConfigs;
     use crate::table::timeline::{Instant, State, Timeline};
 
     #[tokio::test]
@@ -187,7 +189,7 @@ mod tests {
         let timeline = Timeline::new(
             Arc::new(base_url),
             Arc::new(HashMap::new()),
-            Arc::new(HashMap::new()),
+            Arc::new(HudiConfigs::empty()),
         )
         .await
         .unwrap();
@@ -201,7 +203,7 @@ mod tests {
         let timeline = Timeline::new(
             Arc::new(base_url),
             Arc::new(HashMap::new()),
-            Arc::new(HashMap::new()),
+            Arc::new(HudiConfigs::empty()),
         )
         .await
         .unwrap();
@@ -222,7 +224,7 @@ mod tests {
         let timeline = Timeline::new(
             Arc::new(base_url),
             Arc::new(HashMap::new()),
-            Arc::new(HashMap::new()),
+            Arc::new(HudiConfigs::empty()),
         )
         .await
         .unwrap();
diff --git 
a/crates/core/tests/data/table_metadata/sample_table_properties/.hoodie/hoodie.properties
 b/crates/core/tests/data/table_props_invalid/.hoodie/hoodie.properties
similarity index 51%
copy from 
crates/core/tests/data/table_metadata/sample_table_properties/.hoodie/hoodie.properties
copy to crates/core/tests/data/table_props_invalid/.hoodie/hoodie.properties
index 9c0c7e2..242d34f 100644
--- 
a/crates/core/tests/data/table_metadata/sample_table_properties/.hoodie/hoodie.properties
+++ b/crates/core/tests/data/table_props_invalid/.hoodie/hoodie.properties
@@ -17,22 +17,11 @@
 # under the License.
 #
 
-hoodie.table.type=COPY_ON_WRITE
-hoodie.table.metadata.partitions=files
-hoodie.table.precombine.field=ts
-hoodie.table.partition.fields=city
-hoodie.archivelog.folder=archived
-hoodie.table.cdc.enabled=false
-hoodie.timeline.layout.version=1
-hoodie.table.checksum=3761586722
-hoodie.datasource.write.drop.partition.columns=false
-hoodie.table.recordkey.fields=uuid
-hoodie.table.name=trips
-hoodie.partition.metafile.use.base.format=false
-hoodie.datasource.write.hive_style_partitioning=false
-hoodie.table.metadata.partitions.inflight=
-hoodie.populate.meta.fields=true
-hoodie.table.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator
-hoodie.table.base.file.format=PARQUET
-hoodie.datasource.write.partitionpath.urlencode=false
-hoodie.table.version=6
+hoodie.table.type=cow
+hoodie.table.base.file.format=foo
+hoodie.table.checksum=bar
+hoodie.table.version=baz
+hoodie.timeline.layout.version=_
+hoodie.datasource.write.drop.partition.columns=1
+hoodie.datasource.write.hive_style_partitioning=0
+hoodie.datasource.write.partitionpath.urlencode=-1
diff --git 
a/crates/core/tests/data/table_metadata/sample_table_properties/.hoodie/hoodie.properties
 b/crates/core/tests/data/table_props_valid/.hoodie/hoodie.properties
similarity index 96%
rename from 
crates/core/tests/data/table_metadata/sample_table_properties/.hoodie/hoodie.properties
rename to crates/core/tests/data/table_props_valid/.hoodie/hoodie.properties
index 9c0c7e2..d1e5ac6 100644
--- 
a/crates/core/tests/data/table_metadata/sample_table_properties/.hoodie/hoodie.properties
+++ b/crates/core/tests/data/table_props_valid/.hoodie/hoodie.properties
@@ -17,7 +17,7 @@
 # under the License.
 #
 
-hoodie.table.type=COPY_ON_WRITE
+hoodie.table.type=copy_on_write
 hoodie.table.metadata.partitions=files
 hoodie.table.precombine.field=ts
 hoodie.table.partition.fields=city
@@ -28,6 +28,7 @@ hoodie.table.checksum=3761586722
 hoodie.datasource.write.drop.partition.columns=false
 hoodie.table.recordkey.fields=uuid
 hoodie.table.name=trips
+hoodie.database.name=db
 hoodie.partition.metafile.use.base.format=false
 hoodie.datasource.write.hive_style_partitioning=false
 hoodie.table.metadata.partitions.inflight=
diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs
index 3b0a73a..1435d94 100644
--- a/crates/datafusion/src/lib.rs
+++ b/crates/datafusion/src/lib.rs
@@ -38,8 +38,8 @@ use datafusion_expr::{Expr, TableType};
 use datafusion_physical_expr::create_physical_expr;
 use DataFusionError::Execution;
 
-use hudi_core::config::HudiConfig::ReadInputPartitions;
-use hudi_core::config::OptionsParser;
+use hudi_core::config::read::HudiReadConfig::InputPartitions;
+use hudi_core::config::ConfigParser;
 use hudi_core::storage::utils::{get_scheme_authority, parse_uri};
 use hudi_core::HudiTable;
 
@@ -51,9 +51,9 @@ pub struct HudiDataSource {
 
 impl HudiDataSource {
     pub async fn new(base_uri: &str, options: HashMap<String, String>) -> 
Result<Self> {
-        let input_partitions = ReadInputPartitions
+        let input_partitions = InputPartitions
             .parse_value_or_default(&options)
-            .cast::<usize>();
+            .to::<usize>();
         match HudiTable::new(base_uri, options).await {
             Ok(t) => Ok(Self {
                 table: Arc::new(t),
@@ -136,15 +136,13 @@ mod tests {
     use datafusion::prelude::{SessionConfig, SessionContext};
     use datafusion_common::ScalarValue;
 
-    use hudi_core::config::HudiConfig::ReadInputPartitions;
-    use hudi_tests::utils::get_bool_column;
+    use hudi_core::config::read::HudiReadConfig::InputPartitions;
     use hudi_tests::TestTable::{
-        V6Nonpartitioned, V6SimplekeygenHivestyleNoMetafields, 
V6SimplekeygenNonhivestyle,
-        V6TimebasedkeygenNonhivestyle,
+        V6ComplexkeygenHivestyle, V6Nonpartitioned, 
V6SimplekeygenHivestyleNoMetafields,
+        V6SimplekeygenNonhivestyle, V6TimebasedkeygenNonhivestyle,
     };
     use hudi_tests::{utils, TestTable};
-    use utils::{get_i32_column, get_str_column};
-    use TestTable::V6ComplexkeygenHivestyle;
+    use utils::{get_bool_column, get_i32_column, get_str_column};
 
     use crate::HudiDataSource;
 
@@ -215,7 +213,7 @@ mod tests {
             println!(">>> testing for {}", test_table.as_ref());
             let ctx = prepare_session_context(
                 test_table,
-                &[(ReadInputPartitions.as_ref().to_string(), "2".to_string())],
+                &[(InputPartitions.as_ref().to_string(), "2".to_string())],
             )
             .await;
 
diff --git a/crates/tests/src/utils.rs b/crates/tests/src/utils.rs
index 0f7d44b..73fafd9 100644
--- a/crates/tests/src/utils.rs
+++ b/crates/tests/src/utils.rs
@@ -20,6 +20,20 @@
 use arrow::record_batch::RecordBatch;
 use arrow_array::{Array, BooleanArray, Int32Array, StringArray};
 
+#[macro_export]
+macro_rules! assert_not {
+    ($cond:expr) => {
+        if $cond {
+            panic!("assertion failed: condition is true");
+        }
+    };
+    ($cond:expr, $($arg:tt)+) => {
+        if $cond {
+            panic!("assertion failed: condition is true: {}", 
format_args!($($arg)+));
+        }
+    };
+}
+
 #[macro_export]
 macro_rules! assert_approx_eq {
     ($a:expr, $b:expr, $delta:expr) => {{

Reply via email to