This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 0cab8ac  refactor: reorganize custom error types (#215)
0cab8ac is described below

commit 0cab8ac1b35cf5024a82649e56216f2836b10de9
Author: Shiyan Xu <[email protected]>
AuthorDate: Thu Dec 5 23:15:08 2024 -1000

    refactor: reorganize custom error types (#215)
---
 crates/core/src/config/error.rs                    | 47 +++++++++++
 crates/core/src/config/internal.rs                 | 15 ++--
 crates/core/src/config/mod.rs                      | 17 ++--
 crates/core/src/config/read.rs                     | 24 ++----
 crates/core/src/config/table.rs                    | 96 ++++++++++------------
 crates/core/src/config/{utils.rs => util.rs}       | 13 +--
 crates/core/src/error.rs                           | 56 +++++++++++++
 crates/core/src/file_group/mod.rs                  | 38 ++++++---
 crates/core/src/file_group/reader.rs               | 26 +++++-
 crates/core/src/lib.rs                             | 47 +----------
 crates/core/src/storage/error.rs                   | 45 ++++++++++
 crates/core/src/storage/mod.rs                     | 67 ++++++++-------
 crates/core/src/storage/{utils.rs => util.rs}      | 39 +++------
 crates/core/src/table/builder.rs                   |  9 +-
 crates/core/src/table/fs_view.rs                   |  3 +-
 crates/core/src/table/mod.rs                       | 33 +++-----
 crates/core/src/table/partition.rs                 | 38 +++++----
 crates/core/src/table/timeline.rs                  | 61 ++++++++++++--
 .../.hoodie/20240402123035233.commit               |  0
 .../.hoodie/20240402123035233.commit.requested     |  0
 .../.hoodie/20240402123035233.inflight             |  0
 .../.hoodie/20240402144910683.commit               |  7 ++
 .../.hoodie/20240402144910683.commit.requested     |  0
 .../.hoodie/20240402144910683.inflight             |  0
 .../.hoodie/hoodie.properties                      |  0
 crates/datafusion/src/lib.rs                       |  4 +-
 python/src/internal.rs                             | 21 ++---
 27 files changed, 436 insertions(+), 270 deletions(-)

diff --git a/crates/core/src/config/error.rs b/crates/core/src/config/error.rs
new file mode 100644
index 0000000..6d852f6
--- /dev/null
+++ b/crates/core/src/config/error.rs
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use std::num::ParseIntError;
+use std::str::ParseBoolError;
+use thiserror::Error;
+
+pub type Result<T, E = ConfigError> = std::result::Result<T, E>;
+
+#[derive(Error, Debug)]
+pub enum ConfigError {
+    /// ParseBool(key, value, source)
+    #[error("Failed to parse '{1}' for config '{0}': {2}")]
+    ParseBool(String, String, ParseBoolError),
+
+    /// ParseInt(key, value, source)
+    #[error("Failed to parse '{1}' for config '{0}': {2}")]
+    ParseInt(String, String, ParseIntError),
+
+    /// ParseLine(message)
+    #[error("Failed to parse line: {0}'")]
+    ParseLine(String),
+
+    #[error("Config value '{0}' is not valid")]
+    InvalidValue(String),
+
+    #[error("Config '{0}' not found")]
+    NotFound(String),
+
+    #[error("Config value '{0}' is not supported")]
+    UnsupportedValue(String),
+}
diff --git a/crates/core/src/config/internal.rs b/crates/core/src/config/internal.rs
index 4fdc276..61e1a88 100644
--- a/crates/core/src/config/internal.rs
+++ b/crates/core/src/config/internal.rs
@@ -23,11 +23,9 @@ use std::str::FromStr;
 
 use strum_macros::EnumIter;
 
-use crate::{
-    config::{ConfigParser, HudiConfigValue},
-    CoreError::{ConfigNotFound, InvalidConfig},
-    Result,
-};
+use crate::config::error::ConfigError::{NotFound, ParseBool};
+use crate::config::Result;
+use crate::config::{ConfigParser, HudiConfigValue};
 
 /// Configurations for internal use.
 ///
@@ -67,15 +65,12 @@ impl ConfigParser for HudiInternalConfig {
         let get_result = configs
             .get(self.as_ref())
             .map(|v| v.as_str())
-            .ok_or(ConfigNotFound(self.as_ref().to_string()));
+            .ok_or(NotFound(self.key()));
 
         match self {
             Self::SkipConfigValidation => get_result
                 .and_then(|v| {
-                    bool::from_str(v).map_err(|e| InvalidConfig {
-                        item: Self::SkipConfigValidation.as_ref(),
-                        source: Box::new(e),
-                    })
+                    bool::from_str(v).map_err(|e| ParseBool(self.key(), 
v.to_string(), e))
                 })
                 .map(HudiConfigValue::Boolean),
         }
diff --git a/crates/core/src/config/mod.rs b/crates/core/src/config/mod.rs
index f586a89..bc554bd 100644
--- a/crates/core/src/config/mod.rs
+++ b/crates/core/src/config/mod.rs
@@ -21,14 +21,18 @@ use std::any::type_name;
 use std::collections::HashMap;
 use std::sync::Arc;
 
-use crate::{storage::utils::parse_uri, Result};
+use crate::config::error::{ConfigError, Result};
+use crate::storage::error::Result as StorageResult;
+use crate::storage::util::parse_uri;
 use serde::{Deserialize, Serialize};
 use url::Url;
+use ConfigError::NotFound;
 
+pub mod error;
 pub mod internal;
 pub mod read;
 pub mod table;
-pub mod utils;
+pub mod util;
 
 pub const HUDI_CONF_DIR: &str = "HUDI_CONF_DIR";
 
@@ -40,6 +44,10 @@ pub trait ConfigParser: AsRef<str> {
     /// Supplies the default value of the configuration.
     fn default_value(&self) -> Option<Self::Output>;
 
+    fn key(&self) -> String {
+        self.as_ref().to_string()
+    }
+
     /// To indicate if the configuration is required or not, this helps in 
validation.
     fn is_required(&self) -> bool {
         false
@@ -50,8 +58,7 @@ pub trait ConfigParser: AsRef<str> {
         match self.parse_value(configs) {
             Ok(_) => Ok(()),
             Err(e) => {
-                if !self.is_required() && e.to_string().ends_with("not found") 
{
-                    // TODO: introduce error type to avoid checking "not found"
+                if !self.is_required() && matches!(e, NotFound(_)) {
                     Ok(())
                 } else {
                     Err(e)
@@ -98,7 +105,7 @@ impl HudiConfigValue {
 
     /// A convenience method to convert [HudiConfigValue] to [Url] when the 
value is a [String] and is intended to be a URL.
     /// Panic if the value is not a [String].
-    pub fn to_url(self) -> Result<Url> {
+    pub fn to_url(self) -> StorageResult<Url> {
         match self {
             HudiConfigValue::String(v) => parse_uri(&v),
             _ => panic!(
diff --git a/crates/core/src/config/read.rs b/crates/core/src/config/read.rs
index a938953..ece692e 100644
--- a/crates/core/src/config/read.rs
+++ b/crates/core/src/config/read.rs
@@ -23,11 +23,9 @@ use std::str::FromStr;
 
 use strum_macros::EnumIter;
 
-use crate::{
-    config::{ConfigParser, HudiConfigValue},
-    CoreError::{ConfigNotFound, InvalidConfig},
-    Result,
-};
+use crate::config::error::ConfigError::{NotFound, ParseInt};
+use crate::config::Result;
+use crate::config::{ConfigParser, HudiConfigValue};
 
 /// Configurations for reading Hudi tables.
 ///
@@ -79,15 +77,12 @@ impl ConfigParser for HudiReadConfig {
         let get_result = configs
             .get(self.as_ref())
             .map(|v| v.as_str())
-            .ok_or(ConfigNotFound(self.as_ref().to_string()));
+            .ok_or(NotFound(self.key()));
 
         match self {
             Self::InputPartitions => get_result
                 .and_then(|v| {
-                    usize::from_str(v).map_err(|e| InvalidConfig {
-                        item: Self::InputPartitions.as_ref(),
-                        source: Box::new(e),
-                    })
+                    usize::from_str(v).map_err(|e| ParseInt(self.key(), 
v.to_string(), e))
                 })
                 .map(HudiConfigValue::UInteger),
             Self::AsOfTimestamp => get_result.map(|v| 
HudiConfigValue::String(v.to_string())),
@@ -97,10 +92,8 @@ impl ConfigParser for HudiReadConfig {
 
 #[cfg(test)]
 mod tests {
+    use super::*;
     use crate::config::read::HudiReadConfig::InputPartitions;
-    use crate::config::ConfigParser;
-    use crate::CoreError::InvalidConfig;
-    use std::collections::HashMap;
 
     #[test]
     fn parse_valid_config_value() {
@@ -113,10 +106,7 @@ mod tests {
     fn parse_invalid_config_value() {
         let options = HashMap::from([(InputPartitions.as_ref().to_string(), 
"foo".to_string())]);
         let value = InputPartitions.parse_value(&options);
-        assert!(matches!(
-            value.unwrap_err(),
-            InvalidConfig { item: _, source: _ }
-        ));
+        assert!(matches!(value.unwrap_err(), ParseInt(_, _, _)));
         assert_eq!(
             InputPartitions
                 .parse_value_or_default(&options)
diff --git a/crates/core/src/config/table.rs b/crates/core/src/config/table.rs
index bb19066..5b82b4c 100644
--- a/crates/core/src/config/table.rs
+++ b/crates/core/src/config/table.rs
@@ -23,11 +23,12 @@ use std::str::FromStr;
 
 use strum_macros::{AsRefStr, EnumIter};
 
-use crate::{
-    config::{ConfigParser, HudiConfigValue},
-    CoreError::{self, ConfigNotFound, InvalidConfig, Unsupported},
-    Result,
+use crate::config::error::ConfigError;
+use crate::config::error::ConfigError::{
+    InvalidValue, NotFound, ParseBool, ParseInt, UnsupportedValue,
 };
+use crate::config::Result;
+use crate::config::{ConfigParser, HudiConfigValue};
 
 /// Configurations for Hudi tables, most of them are persisted in 
`hoodie.properties`.
 ///
@@ -148,7 +149,7 @@ impl ConfigParser for HudiTableConfig {
         let get_result = configs
             .get(self.as_ref())
             .map(|v| v.as_str())
-            .ok_or(ConfigNotFound(self.as_ref().to_string()));
+            .ok_or(NotFound(self.key()));
 
         match self {
             Self::BaseFileFormat => get_result
@@ -157,35 +158,23 @@ impl ConfigParser for HudiTableConfig {
             Self::BasePath => get_result.map(|v| 
HudiConfigValue::String(v.to_string())),
             Self::Checksum => get_result
                 .and_then(|v| {
-                    isize::from_str(v).map_err(|e| InvalidConfig {
-                        item: Self::Checksum.as_ref(),
-                        source: Box::new(e),
-                    })
+                    isize::from_str(v).map_err(|e| ParseInt(self.key(), 
v.to_string(), e))
                 })
                 .map(HudiConfigValue::Integer),
             Self::DatabaseName => get_result.map(|v| 
HudiConfigValue::String(v.to_string())),
             Self::DropsPartitionFields => get_result
                 .and_then(|v| {
-                    bool::from_str(v).map_err(|e| InvalidConfig {
-                        item: Self::DropsPartitionFields.as_ref(),
-                        source: Box::new(e),
-                    })
+                    bool::from_str(v).map_err(|e| ParseBool(self.key(), 
v.to_string(), e))
                 })
                 .map(HudiConfigValue::Boolean),
             Self::IsHiveStylePartitioning => get_result
                 .and_then(|v| {
-                    bool::from_str(v).map_err(|e| InvalidConfig {
-                        item: Self::IsHiveStylePartitioning.as_ref(),
-                        source: Box::new(e),
-                    })
+                    bool::from_str(v).map_err(|e| ParseBool(self.key(), 
v.to_string(), e))
                 })
                 .map(HudiConfigValue::Boolean),
             Self::IsPartitionPathUrlencoded => get_result
                 .and_then(|v| {
-                    bool::from_str(v).map_err(|e| InvalidConfig {
-                        item: Self::IsPartitionPathUrlencoded.as_ref(),
-                        source: Box::new(e),
-                    })
+                    bool::from_str(v).map_err(|e| ParseBool(self.key(), 
v.to_string(), e))
                 })
                 .map(HudiConfigValue::Boolean),
             Self::KeyGeneratorClass => get_result.map(|v| 
HudiConfigValue::String(v.to_string())),
@@ -194,10 +183,7 @@ impl ConfigParser for HudiTableConfig {
             Self::PrecombineField => get_result.map(|v| 
HudiConfigValue::String(v.to_string())),
             Self::PopulatesMetaFields => get_result
                 .and_then(|v| {
-                    bool::from_str(v).map_err(|e| InvalidConfig {
-                        item: Self::PopulatesMetaFields.as_ref(),
-                        source: Box::new(e),
-                    })
+                    bool::from_str(v).map_err(|e| ParseBool(self.key(), 
v.to_string(), e))
                 })
                 .map(HudiConfigValue::Boolean),
             Self::RecordKeyFields => get_result
@@ -208,18 +194,12 @@ impl ConfigParser for HudiTableConfig {
                 .map(|v| HudiConfigValue::String(v.as_ref().to_string())),
             Self::TableVersion => get_result
                 .and_then(|v| {
-                    isize::from_str(v).map_err(|e| InvalidConfig {
-                        item: Self::TableVersion.as_ref(),
-                        source: Box::new(e),
-                    })
+                    isize::from_str(v).map_err(|e| ParseInt(self.key(), 
v.to_string(), e))
                 })
                 .map(HudiConfigValue::Integer),
             Self::TimelineLayoutVersion => get_result
                 .and_then(|v| {
-                    isize::from_str(v).map_err(|e| InvalidConfig {
-                        item: Self::TimelineLayoutVersion.as_ref(),
-                        source: Box::new(e),
-                    })
+                    isize::from_str(v).map_err(|e| ParseInt(self.key(), 
v.to_string(), e))
                 })
                 .map(HudiConfigValue::Integer),
         }
@@ -236,13 +216,13 @@ pub enum TableTypeValue {
 }
 
 impl FromStr for TableTypeValue {
-    type Err = CoreError;
+    type Err = ConfigError;
 
-    fn from_str(s: &str) -> Result<Self> {
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
         match s.to_ascii_lowercase().as_str() {
             "copy_on_write" | "copy-on-write" | "cow" => Ok(Self::CopyOnWrite),
             "merge_on_read" | "merge-on-read" | "mor" => Ok(Self::MergeOnRead),
-            v => Err(Unsupported(format!("Unsupported table type {}", v))),
+            v => Err(InvalidValue(v.to_string())),
         }
     }
 }
@@ -255,21 +235,20 @@ pub enum BaseFileFormatValue {
 }
 
 impl FromStr for BaseFileFormatValue {
-    type Err = CoreError;
+    type Err = ConfigError;
 
-    fn from_str(s: &str) -> Result<Self> {
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
         match s.to_ascii_lowercase().as_str() {
             "parquet" => Ok(Self::Parquet),
-            v => Err(Unsupported(format!("Unsupported base file format {}", 
v))),
+            "orc" => Err(UnsupportedValue(s.to_string())),
+            v => Err(InvalidValue(v.to_string())),
         }
     }
 }
 
 #[cfg(test)]
 mod tests {
-    use std::str::FromStr;
-
-    use crate::config::table::{BaseFileFormatValue, TableTypeValue};
+    use super::*;
 
     #[test]
     fn create_table_type() {
@@ -297,10 +276,22 @@ mod tests {
             TableTypeValue::from_str("Merge-on-read").unwrap(),
             TableTypeValue::MergeOnRead
         );
-        assert!(TableTypeValue::from_str("").is_err());
-        assert!(TableTypeValue::from_str("copyonwrite").is_err());
-        assert!(TableTypeValue::from_str("MERGEONREAD").is_err());
-        assert!(TableTypeValue::from_str("foo").is_err());
+        assert!(matches!(
+            TableTypeValue::from_str("").unwrap_err(),
+            InvalidValue(_)
+        ));
+        assert!(matches!(
+            TableTypeValue::from_str("copyonwrite").unwrap_err(),
+            InvalidValue(_)
+        ));
+        assert!(matches!(
+            TableTypeValue::from_str("MERGEONREAD").unwrap_err(),
+            InvalidValue(_)
+        ));
+        assert!(matches!(
+            TableTypeValue::from_str("foo").unwrap_err(),
+            InvalidValue(_)
+        ));
     }
 
     #[test]
@@ -313,10 +304,13 @@ mod tests {
             BaseFileFormatValue::from_str("PArquet").unwrap(),
             BaseFileFormatValue::Parquet
         );
-        assert!(TableTypeValue::from_str("").is_err());
-        assert!(
-            TableTypeValue::from_str("orc").is_err(),
-            "orc is not yet supported."
-        );
+        assert!(matches!(
+            BaseFileFormatValue::from_str("").unwrap_err(),
+            InvalidValue(_)
+        ));
+        assert!(matches!(
+            BaseFileFormatValue::from_str("orc").unwrap_err(),
+            UnsupportedValue(_)
+        ));
     }
 }
diff --git a/crates/core/src/config/utils.rs b/crates/core/src/config/util.rs
similarity index 95%
rename from crates/core/src/config/utils.rs
rename to crates/core/src/config/util.rs
index 85784ec..33921c6 100644
--- a/crates/core/src/config/utils.rs
+++ b/crates/core/src/config/util.rs
@@ -18,12 +18,12 @@
  */
 //! Config utilities.
 
+use crate::config::error::ConfigError::ParseLine;
+use crate::config::Result;
 use bytes::Bytes;
 use std::collections::HashMap;
 use std::io::{BufRead, BufReader, Cursor};
 
-use crate::{CoreError, Result};
-
 /// Returns an empty iterator to represent an empty set of options.
 pub fn empty_options<'a>() -> std::iter::Empty<(&'a str, &'a str)> {
     std::iter::empty::<(&str, &str)>()
@@ -58,7 +58,7 @@ pub fn parse_data_for_options(data: &Bytes, split_chars: &str) -> Result<HashMap
     let mut options = HashMap::new();
 
     for line in lines {
-        let line = line.map_err(|e| CoreError::Internal(format!("Failed to 
read line {e}")))?;
+        let line = line.map_err(|e| ParseLine(format!("Failed to parse config 
data: {:?}", e)))?;
         let trimmed_line = line.trim();
         if trimmed_line.is_empty() || trimmed_line.starts_with('#') {
             continue;
@@ -66,9 +66,10 @@ pub fn parse_data_for_options(data: &Bytes, split_chars: &str) -> Result<HashMap
         let mut parts = trimmed_line.splitn(2, |c| split_chars.contains(c));
         let key = parts
             .next()
-            .ok_or(CoreError::Internal(
-                "Missing key in config line".to_string(),
-            ))?
+            .ok_or(ParseLine(format!(
+                "Missing key in config line: {}",
+                trimmed_line
+            )))?
             .trim()
             .to_owned();
         let value = parts.next().unwrap_or("").trim().to_owned();
diff --git a/crates/core/src/error.rs b/crates/core/src/error.rs
new file mode 100644
index 0000000..6d4f0b4
--- /dev/null
+++ b/crates/core/src/error.rs
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use crate::config::error::ConfigError;
+use crate::storage::error::StorageError;
+use thiserror::Error;
+
+pub type Result<T, E = CoreError> = std::result::Result<T, E>;
+
+#[derive(Error, Debug)]
+pub enum CoreError {
+    #[error(transparent)]
+    ArrowError(#[from] arrow::error::ArrowError),
+
+    #[error("Config error: {0}")]
+    Config(#[from] ConfigError),
+
+    #[error("File group error: {0}")]
+    FileGroup(String),
+
+    #[error("{0}: {1:?}")]
+    ReadFileSliceError(String, StorageError),
+
+    #[error("{0}")]
+    InvalidPartitionPath(String),
+
+    #[error(transparent)]
+    ParquetError(#[from] parquet::errors::ParquetError),
+
+    #[error("Storage error: {0}")]
+    Storage(#[from] StorageError),
+
+    #[error("Timeline error: {0}")]
+    Timeline(String),
+
+    #[error("{0}")]
+    Unsupported(String),
+
+    #[error(transparent)]
+    Utf8Error(#[from] std::str::Utf8Error),
+}
diff --git a/crates/core/src/file_group/mod.rs b/crates/core/src/file_group/mod.rs
index 2fde1bf..58fb144 100644
--- a/crates/core/src/file_group/mod.rs
+++ b/crates/core/src/file_group/mod.rs
@@ -22,17 +22,17 @@
 
 pub mod reader;
 
+use crate::error::CoreError;
+use crate::storage::file_info::FileInfo;
+use crate::storage::file_stats::FileStats;
+use crate::storage::Storage;
+use crate::Result;
 use std::collections::BTreeMap;
 use std::fmt;
 use std::fmt::Formatter;
 use std::hash::{Hash, Hasher};
 use std::path::PathBuf;
 
-use crate::storage::file_info::FileInfo;
-use crate::storage::file_stats::FileStats;
-use crate::storage::Storage;
-use crate::{CoreError::Internal, Result};
-
 /// Represents common metadata about a Hudi Base File.
 #[derive(Clone, Debug)]
 pub struct BaseFile {
@@ -52,10 +52,16 @@ impl BaseFile {
         let err_msg = format!("Failed to parse file name '{}' for base file.", 
file_name);
         let (name, _) = file_name
             .rsplit_once('.')
-            .ok_or(Internal(err_msg.clone()))?;
+            .ok_or(CoreError::FileGroup(err_msg.clone()))?;
         let parts: Vec<&str> = name.split('_').collect();
-        let file_group_id = 
parts.first().ok_or(Internal(err_msg.clone()))?.to_string();
-        let commit_time = 
parts.get(2).ok_or(Internal(err_msg.clone()))?.to_string();
+        let file_group_id = parts
+            .first()
+            .ok_or(CoreError::FileGroup(err_msg.clone()))?
+            .to_string();
+        let commit_time = parts
+            .get(2)
+            .ok_or(CoreError::FileGroup(err_msg.clone()))?
+            .to_string();
         Ok((file_group_id, commit_time))
     }
 
@@ -190,7 +196,7 @@ impl FileGroup {
     pub fn add_base_file(&mut self, base_file: BaseFile) -> Result<&Self> {
         let commit_time = base_file.commit_time.as_str();
         if self.file_slices.contains_key(commit_time) {
-            Err(Internal(format!(
+            Err(CoreError::FileGroup(format!(
                 "Commit time {0} is already present in File Group {1}",
                 commit_time.to_owned(),
                 self.id,
@@ -243,6 +249,18 @@ mod tests {
         assert_eq!(base_file.commit_time, "20240402144910683");
     }
 
+    #[test]
+    fn create_a_base_file_returns_error() {
+        let result = BaseFile::from_file_name("no_file_extension");
+        assert!(matches!(result.unwrap_err(), CoreError::FileGroup(_)));
+
+        let result = BaseFile::from_file_name(".parquet");
+        assert!(matches!(result.unwrap_err(), CoreError::FileGroup(_)));
+
+        let result = BaseFile::from_file_name("no-valid-delimiter.parquet");
+        assert!(matches!(result.unwrap_err(), CoreError::FileGroup(_)));
+    }
+
     #[test]
     fn load_a_valid_file_group() {
         let mut fg = 
FileGroup::new("5a226868-2934-4f84-a16f-55124630c68d-0".to_owned(), None);
@@ -277,7 +295,7 @@ mod tests {
             
"5a226868-2934-4f84-a16f-55124630c68d-0_2-10-0_20240402144910683.parquet",
         );
         assert!(res2.is_err());
-        assert_eq!(res2.unwrap_err().to_string(), "Commit time 
20240402144910683 is already present in File Group 
5a226868-2934-4f84-a16f-55124630c68d-0");
+        assert_eq!(res2.unwrap_err().to_string(), "File group error: Commit 
time 20240402144910683 is already present in File Group 
5a226868-2934-4f84-a16f-55124630c68d-0");
     }
 
     #[test]
diff --git a/crates/core/src/file_group/reader.rs b/crates/core/src/file_group/reader.rs
index eeb283a..165ed9c 100644
--- a/crates/core/src/file_group/reader.rs
+++ b/crates/core/src/file_group/reader.rs
@@ -17,12 +17,14 @@
  * under the License.
  */
 use crate::config::table::HudiTableConfig;
-use crate::config::utils::split_hudi_options_from_others;
+use crate::config::util::split_hudi_options_from_others;
 use crate::config::HudiConfigs;
+use crate::error::CoreError::ReadFileSliceError;
 use crate::file_group::FileSlice;
 use crate::storage::Storage;
 use crate::Result;
 use arrow_array::RecordBatch;
+use futures::TryFutureExt;
 use std::sync::Arc;
 
 /// File group reader handles all read operations against a file group.
@@ -58,7 +60,15 @@ impl FileGroupReader {
         &self,
         relative_path: &str,
     ) -> Result<RecordBatch> {
-        self.storage.get_parquet_file_data(relative_path).await
+        self.storage
+            .get_parquet_file_data(relative_path)
+            .map_err(|e| {
+                ReadFileSliceError(
+                    format!("Failed to read file slice at path '{}'", 
relative_path),
+                    e,
+                )
+            })
+            .await
     }
 
     pub async fn read_file_slice(&self, file_slice: &FileSlice) -> 
Result<RecordBatch> {
@@ -91,4 +101,16 @@ mod tests {
             .contains(HudiTableConfig::BasePath));
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_read_file_slice_returns_error() {
+        let storage =
+            
Storage::new_with_base_url(Url::parse("file:///non-existent-path/table").unwrap())
+                .unwrap();
+        let reader = FileGroupReader::new(storage);
+        let result = reader
+            .read_file_slice_by_base_file_path("non_existent_file")
+            .await;
+        assert!(matches!(result.unwrap_err(), ReadFileSliceError(_, _)));
+    }
 }
diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs
index 6485c1d..42079a1 100644
--- a/crates/core/src/lib.rs
+++ b/crates/core/src/lib.rs
@@ -44,53 +44,10 @@
 //! ```
 
 pub mod config;
+pub mod error;
 pub mod file_group;
 pub mod storage;
 pub mod table;
 pub mod util;
 
-use thiserror::Error;
-
-#[derive(Error, Debug)]
-pub enum CoreError {
-    #[error("Config '{0}' not found")]
-    ConfigNotFound(String),
-
-    #[error("Invalid config item '{item}', {source:?}")]
-    InvalidConfig {
-        item: &'static str,
-        source: Box<dyn std::error::Error + Sync + Send + 'static>,
-    },
-
-    #[error("Parse url '{url}' failed, {source}")]
-    UrlParse {
-        url: String,
-        source: url::ParseError,
-    },
-
-    #[error("Invalid file path '{name}', {detail}")]
-    InvalidPath { name: String, detail: String },
-
-    #[error("{0}")]
-    Unsupported(String),
-
-    #[error("{0}")]
-    Internal(String),
-
-    #[error(transparent)]
-    Utf8Error(#[from] std::str::Utf8Error),
-
-    #[error(transparent)]
-    ObjectStore(#[from] object_store::Error),
-
-    #[error(transparent)]
-    ObjectStorePath(#[from] object_store::path::Error),
-
-    #[error(transparent)]
-    Parquet(#[from] parquet::errors::ParquetError),
-
-    #[error(transparent)]
-    Arrow(#[from] arrow::error::ArrowError),
-}
-
-type Result<T, E = CoreError> = std::result::Result<T, E>;
+use error::Result;
diff --git a/crates/core/src/storage/error.rs b/crates/core/src/storage/error.rs
new file mode 100644
index 0000000..09c749d
--- /dev/null
+++ b/crates/core/src/storage/error.rs
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use thiserror::Error;
+
+pub type Result<T, E = StorageError> = std::result::Result<T, E>;
+
+#[derive(Error, Debug)]
+pub enum StorageError {
+    #[error(transparent)]
+    ArrowError(#[from] arrow::error::ArrowError),
+
+    #[error("Failed to create storage: {0}")]
+    Creation(String),
+
+    #[error("Invalid path: {0}")]
+    InvalidPath(String),
+
+    #[error(transparent)]
+    ObjectStoreError(#[from] object_store::Error),
+
+    #[error(transparent)]
+    ObjectStorePathError(#[from] object_store::path::Error),
+
+    #[error(transparent)]
+    ParquetError(#[from] parquet::errors::ParquetError),
+
+    #[error(transparent)]
+    UrlParseError(#[from] url::ParseError),
+}
diff --git a/crates/core/src/storage/mod.rs b/crates/core/src/storage/mod.rs
index 0a9f0e1..7f31011 100644
--- a/crates/core/src/storage/mod.rs
+++ b/crates/core/src/storage/mod.rs
@@ -36,13 +36,15 @@ use url::Url;
 
 use crate::config::table::HudiTableConfig;
 use crate::config::HudiConfigs;
+use crate::storage::error::Result;
+use crate::storage::error::StorageError::{Creation, InvalidPath};
 use crate::storage::file_info::FileInfo;
-use crate::storage::utils::join_url_segments;
-use crate::{CoreError, Result};
+use crate::storage::util::join_url_segments;
 
+pub mod error;
 pub mod file_info;
 pub mod file_stats;
-pub mod utils;
+pub mod util;
 
 #[allow(dead_code)]
 #[derive(Clone, Debug)]
@@ -60,14 +62,15 @@ impl Storage {
         options: Arc<HashMap<String, String>>,
         hudi_configs: Arc<HudiConfigs>,
     ) -> Result<Arc<Storage>> {
-        if !hudi_configs.contains(HudiTableConfig::BasePath) {
-            return Err(CoreError::Internal(format!(
-                "Failed to create storage: {} is required.",
-                HudiTableConfig::BasePath.as_ref()
-            )));
-        }
-
-        let base_url = hudi_configs.get(HudiTableConfig::BasePath)?.to_url()?;
+        let base_url = match hudi_configs.try_get(HudiTableConfig::BasePath) {
+            Some(v) => v.to_url()?,
+            None => {
+                return Err(Creation(format!(
+                    "{} is required.",
+                    HudiTableConfig::BasePath.as_ref()
+                )))
+            }
+        };
 
         match parse_url_opts(&base_url, options.as_ref()) {
             Ok((object_store, _)) => Ok(Arc::new(Storage {
@@ -76,7 +79,7 @@ impl Storage {
                 options,
                 hudi_configs,
             })),
-            Err(e) => Err(CoreError::ObjectStore(e)),
+            Err(e) => Err(Creation(format!("Failed to create storage: {}", 
e))),
         }
     }
 
@@ -109,10 +112,10 @@ impl Storage {
         let uri = obj_url.to_string();
         let name = obj_path
             .filename()
-            .ok_or(CoreError::InvalidPath {
-                name: obj_path.to_string(),
-                detail: "failed to get file name".to_string(),
-            })?
+            .ok_or(InvalidPath(format!(
+                "Failed to get file name from {:?}",
+                obj_path
+            )))?
             .to_string();
         Ok(FileInfo {
             uri,
@@ -176,10 +179,10 @@ impl Storage {
         for dir in dir_paths {
             dirs.push(
                 dir.filename()
-                    .ok_or(CoreError::InvalidPath {
-                        name: dir.to_string(),
-                        detail: "failed to get file name".to_string(),
-                    })?
+                    .ok_or(InvalidPath(format!(
+                        "Failed to get file name from: {:?}",
+                        dir
+                    )))?
                     .to_string(),
             )
         }
@@ -208,10 +211,10 @@ impl Storage {
             let name = obj_meta
                 .location
                 .filename()
-                .ok_or(CoreError::InvalidPath {
-                    name: obj_meta.location.to_string(),
-                    detail: "failed to get file name".to_string(),
-                })?
+                .ok_or(InvalidPath(format!(
+                    "Failed to get file name from {:?}",
+                    obj_meta.location
+                )))?
                 .to_string();
             let uri = join_url_segments(&prefix_url, &[&name])?.to_string();
             file_info.push(FileInfo {
@@ -246,10 +249,10 @@ pub async fn get_leaf_dirs(storage: &Storage, subdir: Option<&str>) -> Result<Ve
                 next_subdir.push(curr);
             }
             next_subdir.push(child_dir);
-            let next_subdir = next_subdir.to_str().ok_or(CoreError::InvalidPath {
-                name: format!("{:?}", next_subdir),
-                detail: "failed to convert path".to_string(),
-            })?;
+            let next_subdir = next_subdir.to_str().ok_or(InvalidPath(format!(
+                "Failed to convert path: {:?}",
+                next_subdir
+            )))?;
             let curr_leaf_dir = get_leaf_dirs(storage, Some(next_subdir)).await?;
             leaf_dirs.extend(curr_leaf_dir);
         }
@@ -264,12 +267,6 @@ mod tests {
     use std::fs::canonicalize;
     use std::path::Path;
 
-    use crate::storage::file_info::FileInfo;
-    use crate::storage::utils::join_url_segments;
-    use crate::storage::{get_leaf_dirs, Storage};
-    use object_store::path::Path as ObjPath;
-    use url::Url;
-
     #[test]
     fn test_storage_new_error_no_base_path() {
         let options = Arc::new(HashMap::new());
@@ -299,7 +296,7 @@ mod tests {
             result.is_err(),
             "Should return error when no base path is invalid."
         );
-        assert!(matches!(result.unwrap_err(), CoreError::ObjectStore(_)));
+        assert!(matches!(result.unwrap_err(), Creation(_)));
     }
 
     #[tokio::test]
diff --git a/crates/core/src/storage/utils.rs b/crates/core/src/storage/util.rs
similarity index 82%
rename from crates/core/src/storage/utils.rs
rename to crates/core/src/storage/util.rs
index 10198a1..e3f32b9 100644
--- a/crates/core/src/storage/utils.rs
+++ b/crates/core/src/storage/util.rs
@@ -19,12 +19,10 @@
 
 //! Utility functions for storage.
 use std::path::Path;
-use url::{ParseError, Url};
+use url::Url;
 
-use crate::{
-    CoreError::{InvalidPath, UrlParse},
-    Result,
-};
+use crate::storage::error::StorageError::{InvalidPath, UrlParseError};
+use crate::storage::Result;
 
 /// Splits a filename into a stem and an extension.
 pub fn split_filename(filename: &str) -> Result<(String, String)> {
@@ -33,10 +31,7 @@ pub fn split_filename(filename: &str) -> Result<(String, String)> {
     let stem = path
         .file_stem()
         .and_then(|s| s.to_str())
-        .ok_or_else(|| InvalidPath {
-            name: filename.to_string(),
-            detail: "no file stem found".to_string(),
-        })?
+        .ok_or(InvalidPath(format!("No file stem found in {}", filename)))?
         .to_string();
 
     let extension = path
@@ -52,19 +47,12 @@ pub fn split_filename(filename: &str) -> Result<(String, String)> {
 pub fn parse_uri(uri: &str) -> Result<Url> {
     let mut url = match Url::parse(uri) {
         Ok(url) => url,
-        Err(source) => Url::from_directory_path(uri).map_err(|_| UrlParse {
-            url: uri.to_string(),
-            source,
-        })?,
+        Err(e) => Url::from_directory_path(uri).map_err(|_| UrlParseError(e))?,
     };
 
     if url.path().ends_with('/') {
-        url.path_segments_mut()
-            .map_err(|_| InvalidPath {
-                name: uri.to_string(),
-                detail: "parse uri failed".to_string(),
-            })?
-            .pop();
+        let err = InvalidPath(format!("Url {:?} cannot be a base", url));
+        url.path_segments_mut().map_err(|_| err)?.pop();
     }
 
     Ok(url)
@@ -85,12 +73,8 @@ pub fn join_url_segments(base_url: &Url, segments: &[&str]) -> Result<Url> {
 
     for &seg in segments {
         let segs: Vec<_> = seg.split('/').filter(|&s| !s.is_empty()).collect();
-        url.path_segments_mut()
-            .map_err(|_| UrlParse {
-                url: base_url.to_string(),
-                source: ParseError::RelativeUrlWithoutBase,
-            })?
-            .extend(segs);
+        let err = InvalidPath(format!("Url {:?} cannot be a base", url));
+        url.path_segments_mut().map_err(|_| err)?.extend(segs);
     }
 
     Ok(url)
@@ -98,12 +82,9 @@ pub fn join_url_segments(base_url: &Url, segments: &[&str]) -> Result<Url> {
 
 #[cfg(test)]
 mod tests {
+    use super::*;
     use std::str::FromStr;
 
-    use url::Url;
-
-    use crate::storage::utils::{join_url_segments, parse_uri};
-
     #[test]
     fn parse_valid_uri_in_various_forms() {
         let urls = vec![
diff --git a/crates/core/src/table/builder.rs b/crates/core/src/table/builder.rs
index eca7d2b..20df88e 100644
--- a/crates/core/src/table/builder.rs
+++ b/crates/core/src/table/builder.rs
@@ -31,13 +31,14 @@ use crate::config::read::HudiReadConfig;
 use crate::config::table::HudiTableConfig::{DropsPartitionFields, TableType, TableVersion};
 use crate::config::table::TableTypeValue::CopyOnWrite;
 use crate::config::table::{HudiTableConfig, TableTypeValue};
-use crate::config::utils::{parse_data_for_options, split_hudi_options_from_others};
+use crate::config::util::{parse_data_for_options, split_hudi_options_from_others};
 use crate::config::{HudiConfigs, HUDI_CONF_DIR};
+use crate::error::CoreError;
 use crate::storage::Storage;
 use crate::table::fs_view::FileSystemView;
 use crate::table::timeline::Timeline;
 use crate::table::Table;
-use crate::{CoreError, Result};
+use crate::Result;
 
 /// Builder for creating a [Table] instance.
 #[derive(Debug, Clone)]
@@ -185,7 +186,9 @@ impl TableBuilder {
         let hudi_options = &mut self.hudi_options;
         Self::imbue_table_properties(hudi_options, storage.clone()).await?;
 
-        // TODO load Hudi configs from env vars here before loading global configs
+        // TODO support imbuing Hudi options from env vars HOODIE_ENV.*
+        // (see https://hudi.apache.org/docs/next/s3_hoodie)
+        // before loading global configs
 
         Self::imbue_global_hudi_configs_if_absent(hudi_options, storage.clone()).await
     }
diff --git a/crates/core/src/table/fs_view.rs b/crates/core/src/table/fs_view.rs
index ee68eec..092b352 100644
--- a/crates/core/src/table/fs_view.rs
+++ b/crates/core/src/table/fs_view.rs
@@ -25,8 +25,9 @@ use crate::file_group::{BaseFile, FileGroup, FileSlice};
 use crate::storage::file_info::FileInfo;
 use crate::storage::{get_leaf_dirs, Storage};
 
+use crate::error::CoreError;
 use crate::table::partition::PartitionPruner;
-use crate::{CoreError, Result};
+use crate::Result;
 use dashmap::DashMap;
 use futures::stream::{self, StreamExt, TryStreamExt};
 
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 4836d1c..da32a10 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -60,7 +60,7 @@
 //! ```rust
 //! use url::Url;
 //! use hudi_core::table::Table;
-//! use hudi_core::storage::utils::parse_uri;
+//! use hudi_core::storage::util::parse_uri;
 //!
 //! pub async fn test() {
 //!     let base_uri = Url::from_file_path("/tmp/hudi_data").unwrap();
@@ -95,13 +95,14 @@ use crate::config::read::HudiReadConfig::AsOfTimestamp;
 use crate::config::table::HudiTableConfig;
 use crate::config::table::HudiTableConfig::PartitionFields;
 use crate::config::HudiConfigs;
+use crate::error::CoreError;
 use crate::file_group::reader::FileGroupReader;
 use crate::file_group::FileSlice;
 use crate::table::builder::TableBuilder;
 use crate::table::fs_view::FileSystemView;
 use crate::table::partition::PartitionPruner;
 use crate::table::timeline::Timeline;
-use crate::{CoreError, Result};
+use crate::Result;
 
 pub mod builder;
 mod fs_view;
@@ -137,7 +138,10 @@ impl Table {
     }
 
     pub fn base_url(&self) -> Result<Url> {
-        self.hudi_configs.get(HudiTableConfig::BasePath)?.to_url()
+        self.hudi_configs
+            .get(HudiTableConfig::BasePath)?
+            .to_url()
+            .map_err(CoreError::from)
     }
 
     pub fn hudi_options(&self) -> HashMap<String, String> {
@@ -161,6 +165,7 @@ impl Table {
             .register_object_store(runtime_env.clone());
     }
 
+    /// Get the latest [Schema] of the table.
     pub async fn get_schema(&self) -> Result<Schema> {
         self.timeline.get_latest_schema().await
     }
@@ -259,19 +264,10 @@ impl Table {
         filters: &[(&str, &str, &str)],
     ) -> Result<Vec<RecordBatch>> {
         let file_slices = self.get_file_slices_as_of(timestamp, filters).await?;
-        let mut batches = Vec::new();
         let fg_reader = self.create_file_group_reader();
-        for f in file_slices {
-            match fg_reader.read_file_slice(&f).await {
-                Ok(batch) => batches.push(batch),
-                Err(e) => {
-                    return Err(CoreError::Internal(format!(
-                        "Failed to read file slice {:?} - {}",
-                        f, e
-                    )))
-                }
-            }
-        }
+        let batches =
+            futures::future::try_join_all(file_slices.iter().map(|f| fg_reader.read_file_slice(f)))
+                .await?;
         Ok(batches)
     }
 
@@ -294,16 +290,14 @@ impl Table {
 
 #[cfg(test)]
 mod tests {
+    use super::*;
     use arrow_array::StringArray;
-    use std::collections::HashSet;
     use std::fs::canonicalize;
     use std::path::PathBuf;
     use std::{env, panic};
-    use url::Url;
 
     use hudi_tests::{assert_not, TestTable};
 
-    use crate::config::read::HudiReadConfig::AsOfTimestamp;
     use crate::config::table::HudiTableConfig::{
         BaseFileFormat, Checksum, DatabaseName, DropsPartitionFields, IsHiveStylePartitioning,
         IsPartitionPathUrlencoded, KeyGeneratorClass, PartitionFields, PopulatesMetaFields,
@@ -311,9 +305,8 @@ mod tests {
         TimelineLayoutVersion,
     };
     use crate::config::HUDI_CONF_DIR;
-    use crate::storage::utils::join_url_segments;
+    use crate::storage::util::join_url_segments;
     use crate::storage::Storage;
-    use crate::table::Table;
 
     /// Test helper to create a new `Table` instance without validating the configuration.
     ///
diff --git a/crates/core/src/table/partition.rs b/crates/core/src/table/partition.rs
index 8f03521..ca41f73 100644
--- a/crates/core/src/table/partition.rs
+++ b/crates/core/src/table/partition.rs
@@ -18,7 +18,9 @@
  */
 use crate::config::table::HudiTableConfig;
 use crate::config::HudiConfigs;
-use crate::{CoreError, Result};
+use crate::error::CoreError;
+use crate::error::CoreError::{InvalidPartitionPath, Unsupported};
+use crate::Result;
 use arrow_array::{ArrayRef, Scalar, StringArray};
 use arrow_cast::{cast_with_options, CastOptions};
 use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
@@ -119,7 +121,7 @@ impl PartitionPruner {
         let parts: Vec<&str> = partition_path.split('/').collect();
 
         if parts.len() != self.schema.fields().len() {
-            return Err(CoreError::Internal(format!(
+            return Err(InvalidPartitionPath(format!(
                 "Partition path should have {} part(s) but got {}",
                 self.schema.fields().len(),
                 parts.len()
@@ -132,14 +134,11 @@ impl PartitionPruner {
             .zip(parts)
             .map(|(field, part)| {
                 let value = if self.is_hive_style {
-                    let (name, value) = part.split_once('=').ok_or_else(|| {
-                        CoreError::Internal(format!(
-                            "Partition path should be hive-style but got {}",
-                            part
-                        ))
-                    })?;
+                    let (name, value) = part.split_once('=').ok_or(InvalidPartitionPath(
+                        format!("Partition path should be hive-style but got {}", part),
+                    ))?;
                     if name != field.name() {
-                        return Err(CoreError::Internal(format!(
+                        return Err(InvalidPartitionPath(format!(
                             "Partition path should contain {} but got {}",
                             field.name(),
                             name
@@ -179,13 +178,13 @@ impl Operator {
 }
 
 impl FromStr for Operator {
-    type Err = crate::CoreError;
+    type Err = CoreError;
 
-    fn from_str(s: &str) -> Result<Self> {
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
         Operator::TOKEN_OP_PAIRS
             .iter()
             .find_map(|&(token, op)| if token == s { Some(op) } else { None })
-            .ok_or_else(|| CoreError::Internal(format!("Unsupported operator: {}", s)))
+            .ok_or(Unsupported(format!("Unsupported operator: {}", s)))
     }
 }
 
@@ -198,9 +197,11 @@ pub struct PartitionFilter {
 }
 
 impl TryFrom<((&str, &str, &str), &Schema)> for PartitionFilter {
-    type Error = crate::CoreError;
+    type Error = CoreError;
 
-    fn try_from((filter, partition_schema): ((&str, &str, &str), &Schema)) -> Result<Self> {
+    fn try_from(
+        (filter, partition_schema): ((&str, &str, &str), &Schema),
+    ) -> Result<Self, Self::Error> {
         let (field_name, operator_str, value_str) = filter;
 
         let field: &Field = partition_schema.field_with_name(field_name)?;
@@ -435,6 +436,13 @@ mod tests {
         let configs = create_hudi_configs(true, false);
         let pruner = PartitionPruner::new(&[], &schema, &configs).unwrap();
 
-        assert!(pruner.parse_segments("invalid/path").is_err());
+        let result = pruner.parse_segments("date=2023-02-01/category=A/count=10/extra");
+        assert!(matches!(result.unwrap_err(), InvalidPartitionPath(_)));
+
+        let result = pruner.parse_segments("date=2023-02-01/category=A/10");
+        assert!(matches!(result.unwrap_err(), InvalidPartitionPath(_)));
+
+        let result = pruner.parse_segments("date=2023-02-01/category=A/non_exist_field=10");
+        assert!(matches!(result.unwrap_err(), InvalidPartitionPath(_)));
     }
 }
diff --git a/crates/core/src/table/timeline.rs b/crates/core/src/table/timeline.rs
index d147abf..d48aa55 100644
--- a/crates/core/src/table/timeline.rs
+++ b/crates/core/src/table/timeline.rs
@@ -28,10 +28,12 @@ use parquet::arrow::parquet_to_arrow_schema;
 use serde_json::{Map, Value};
 
 use crate::config::HudiConfigs;
+use crate::error::CoreError;
 use crate::file_group::FileGroup;
-use crate::storage::utils::split_filename;
+use crate::storage::error::StorageError;
+use crate::storage::util::split_filename;
 use crate::storage::Storage;
-use crate::{CoreError, Result};
+use crate::Result;
 
 /// The [State] of an [Instant] represents the status of the action performed on the table.
 #[allow(dead_code)]
@@ -80,10 +82,11 @@ impl Instant {
         commit_file_path.push(self.file_name());
         commit_file_path
             .to_str()
-            .ok_or(CoreError::Internal(format!(
+            .ok_or(StorageError::InvalidPath(format!(
                 "Failed to get file path for {:?}",
                 self
             )))
+            .map_err(CoreError::Storage)
             .map(|s| s.to_string())
     }
 
@@ -143,11 +146,15 @@ impl Timeline {
             .storage
             .get_file_data(instant.relative_path()?.as_str())
             .await?;
+        let err_msg = format!("Failed to get commit metadata for {:?}", instant);
         let json: Value = serde_json::from_slice(&bytes)
-            .map_err(|e| CoreError::Internal(format!("Invalid instant {:?}", e)))?;
+            .map_err(|e| CoreError::Timeline(format!("{}: {:?}", err_msg, e)))?;
         let commit_metadata = json
             .as_object()
-            .ok_or_else(|| CoreError::Internal("Expected JSON object".to_string()))?
+            .ok_or(CoreError::Timeline(format!(
+                "{}: not a JSON object",
+                err_msg
+            )))?
             .clone();
         Ok(commit_metadata)
     }
@@ -178,7 +185,7 @@ impl Timeline {
                 None,
             )?)
         } else {
-            Err(CoreError::Internal(
+            Err(CoreError::Timeline(
                 "Failed to resolve the latest schema: no file path 
found".to_string(),
             ))
         }
@@ -216,6 +223,7 @@ impl Timeline {
 
 #[cfg(test)]
 mod tests {
+    use super::*;
     use std::collections::HashMap;
     use std::fs::canonicalize;
     use std::path::Path;
@@ -226,8 +234,6 @@ mod tests {
     use hudi_tests::TestTable;
 
     use crate::config::table::HudiTableConfig;
-    use crate::config::HudiConfigs;
-    use crate::table::timeline::{Instant, State, Timeline};
 
     async fn create_test_timeline(base_url: Url) -> Timeline {
         Timeline::new(
@@ -254,7 +260,7 @@ mod tests {
         assert!(table_schema.is_err());
         assert_eq!(
             table_schema.err().unwrap().to_string(),
-            "Failed to resolve the latest schema: no file path found"
+            "Timeline error: Failed to resolve the latest schema: no file path 
found"
         )
     }
 
@@ -281,4 +287,41 @@ mod tests {
             ]
         )
     }
+
+    #[tokio::test]
+    async fn get_commit_metadata_returns_error() {
+        let base_url = Url::from_file_path(
+            canonicalize(Path::new(
+                "tests/data/timeline/commits_with_invalid_content",
+            ))
+            .unwrap(),
+        )
+        .unwrap();
+        let timeline = create_test_timeline(base_url).await;
+        let instant = Instant {
+            state: State::Completed,
+            action: "commit".to_owned(),
+            timestamp: "20240402123035233".to_owned(),
+        };
+
+        // Test error when reading empty commit metadata file
+        let result = timeline.get_commit_metadata(&instant).await;
+        assert!(result.is_err());
+        let err = result.unwrap_err();
+        assert!(matches!(err, CoreError::Timeline(_)));
+        assert!(err.to_string().contains("Failed to get commit metadata"));
+
+        let instant = Instant {
+            state: State::Completed,
+            action: "commit".to_owned(),
+            timestamp: "20240402144910683".to_owned(),
+        };
+
+        // Test error when reading a commit metadata file with invalid JSON
+        let result = timeline.get_commit_metadata(&instant).await;
+        assert!(result.is_err());
+        let err = result.unwrap_err();
+        assert!(matches!(err, CoreError::Timeline(_)));
+        assert!(err.to_string().contains("not a JSON object"));
+    }
 }
diff --git a/crates/core/tests/data/timeline/commits_with_invalid_content/.hoodie/20240402123035233.commit b/crates/core/tests/data/timeline/commits_with_invalid_content/.hoodie/20240402123035233.commit
new file mode 100644
index 0000000..e69de29
diff --git a/crates/core/tests/data/timeline/commits_with_invalid_content/.hoodie/20240402123035233.commit.requested b/crates/core/tests/data/timeline/commits_with_invalid_content/.hoodie/20240402123035233.commit.requested
new file mode 100644
index 0000000..e69de29
diff --git a/crates/core/tests/data/timeline/commits_with_invalid_content/.hoodie/20240402123035233.inflight b/crates/core/tests/data/timeline/commits_with_invalid_content/.hoodie/20240402123035233.inflight
new file mode 100644
index 0000000..e69de29
diff --git a/crates/core/tests/data/timeline/commits_with_invalid_content/.hoodie/20240402144910683.commit b/crates/core/tests/data/timeline/commits_with_invalid_content/.hoodie/20240402144910683.commit
new file mode 100644
index 0000000..cb64946
--- /dev/null
+++ b/crates/core/tests/data/timeline/commits_with_invalid_content/.hoodie/20240402144910683.commit
@@ -0,0 +1,7 @@
+[
+    "should",
+    "not",
+    "be",
+    "an",
+    "array"
+]
diff --git a/crates/core/tests/data/timeline/commits_with_invalid_content/.hoodie/20240402144910683.commit.requested b/crates/core/tests/data/timeline/commits_with_invalid_content/.hoodie/20240402144910683.commit.requested
new file mode 100644
index 0000000..e69de29
diff --git a/crates/core/tests/data/timeline/commits_with_invalid_content/.hoodie/20240402144910683.inflight b/crates/core/tests/data/timeline/commits_with_invalid_content/.hoodie/20240402144910683.inflight
new file mode 100644
index 0000000..e69de29
diff --git a/crates/core/tests/data/timeline/commits_with_invalid_content/.hoodie/hoodie.properties b/crates/core/tests/data/timeline/commits_with_invalid_content/.hoodie/hoodie.properties
new file mode 100644
index 0000000..e69de29
diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs
index 1bf17ea..b5790b8 100644
--- a/crates/datafusion/src/lib.rs
+++ b/crates/datafusion/src/lib.rs
@@ -40,8 +40,8 @@ use datafusion_expr::{CreateExternalTable, Expr, TableType};
 use datafusion_physical_expr::create_physical_expr;
 
 use hudi_core::config::read::HudiReadConfig::InputPartitions;
-use hudi_core::config::utils::empty_options;
-use hudi_core::storage::utils::{get_scheme_authority, parse_uri};
+use hudi_core::config::util::empty_options;
+use hudi_core::storage::util::{get_scheme_authority, parse_uri};
 use hudi_core::table::Table as HudiTable;
 
 /// Create a `HudiDataSource`.
diff --git a/python/src/internal.rs b/python/src/internal.rs
index 549a503..a6a3552 100644
--- a/python/src/internal.rs
+++ b/python/src/internal.rs
@@ -26,13 +26,14 @@ use arrow::pyarrow::ToPyArrow;
 use pyo3::{pyclass, pyfunction, pymethods, PyErr, PyObject, PyResult, Python};
 use tokio::runtime::Runtime;
 
+use hudi::error::CoreError;
 use hudi::file_group::reader::FileGroupReader;
 use hudi::file_group::FileSlice;
+use hudi::storage::error::StorageError;
 use hudi::table::builder::TableBuilder;
 use hudi::table::Table;
 use hudi::util::convert_vec_to_slice;
 use hudi::util::vec_to_slice;
-use hudi::CoreError;
 use pyo3::create_exception;
 use pyo3::exceptions::PyException;
 
@@ -45,7 +46,7 @@ fn convert_to_py_err(err: CoreError) -> PyErr {
 
 #[derive(thiserror::Error, Debug)]
 pub enum PythonError {
-    #[error("Error in Hudi core")]
+    #[error("Error in Hudi core: {0}")]
     HudiCore(#[from] CoreError),
 }
 
@@ -110,17 +111,17 @@ pub struct HudiFileSlice {
 #[pymethods]
 impl HudiFileSlice {
     fn base_file_relative_path(&self) -> PyResult<String> {
-        Ok(PathBuf::from(&self.partition_path)
+        let path = PathBuf::from(&self.partition_path)
             .join(&self.base_file_name)
             .to_str()
             .map(String::from)
-            .ok_or_else(|| {
-                CoreError::Internal(format!(
-                    "Failed to get base file relative path for file slice: 
{:?}",
-                    self
-                ))
-            })
-            .map_err(PythonError::from)?)
+            .ok_or(StorageError::InvalidPath(format!(
+                "Failed to get base file relative path for file slice: {:?}",
+                self
+            )))
+            .map_err(CoreError::from)
+            .map_err(PythonError::from)?;
+        Ok(path)
     }
 }
 

Reply via email to