This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 0cab8ac refactor: reorganize custom error types (#215)
0cab8ac is described below
commit 0cab8ac1b35cf5024a82649e56216f2836b10de9
Author: Shiyan Xu <[email protected]>
AuthorDate: Thu Dec 5 23:15:08 2024 -1000
refactor: reorganize custom error types (#215)
---
crates/core/src/config/error.rs | 47 +++++++++++
crates/core/src/config/internal.rs | 15 ++--
crates/core/src/config/mod.rs | 17 ++--
crates/core/src/config/read.rs | 24 ++----
crates/core/src/config/table.rs | 96 ++++++++++------------
crates/core/src/config/{utils.rs => util.rs} | 13 +--
crates/core/src/error.rs | 56 +++++++++++++
crates/core/src/file_group/mod.rs | 38 ++++++---
crates/core/src/file_group/reader.rs | 26 +++++-
crates/core/src/lib.rs | 47 +----------
crates/core/src/storage/error.rs | 45 ++++++++++
crates/core/src/storage/mod.rs | 67 ++++++++-------
crates/core/src/storage/{utils.rs => util.rs} | 39 +++------
crates/core/src/table/builder.rs | 9 +-
crates/core/src/table/fs_view.rs | 3 +-
crates/core/src/table/mod.rs | 33 +++-----
crates/core/src/table/partition.rs | 38 +++++----
crates/core/src/table/timeline.rs | 61 ++++++++++++--
.../.hoodie/20240402123035233.commit | 0
.../.hoodie/20240402123035233.commit.requested | 0
.../.hoodie/20240402123035233.inflight | 0
.../.hoodie/20240402144910683.commit | 7 ++
.../.hoodie/20240402144910683.commit.requested | 0
.../.hoodie/20240402144910683.inflight | 0
.../.hoodie/hoodie.properties | 0
crates/datafusion/src/lib.rs | 4 +-
python/src/internal.rs | 21 ++---
27 files changed, 436 insertions(+), 270 deletions(-)
diff --git a/crates/core/src/config/error.rs b/crates/core/src/config/error.rs
new file mode 100644
index 0000000..6d852f6
--- /dev/null
+++ b/crates/core/src/config/error.rs
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use std::num::ParseIntError;
+use std::str::ParseBoolError;
+use thiserror::Error;
+
+pub type Result<T, E = ConfigError> = std::result::Result<T, E>;
+
+#[derive(Error, Debug)]
+pub enum ConfigError {
+ /// ParseBool(key, value, source)
+ #[error("Failed to parse '{1}' for config '{0}': {2}")]
+ ParseBool(String, String, ParseBoolError),
+
+ /// ParseInt(key, value, source)
+ #[error("Failed to parse '{1}' for config '{0}': {2}")]
+ ParseInt(String, String, ParseIntError),
+
+ /// ParseLine(message)
+ #[error("Failed to parse line: {0}")]
+ ParseLine(String),
+
+ #[error("Config value '{0}' is not valid")]
+ InvalidValue(String),
+
+ #[error("Config '{0}' not found")]
+ NotFound(String),
+
+ #[error("Config value '{0}' is not supported")]
+ UnsupportedValue(String),
+}
diff --git a/crates/core/src/config/internal.rs
b/crates/core/src/config/internal.rs
index 4fdc276..61e1a88 100644
--- a/crates/core/src/config/internal.rs
+++ b/crates/core/src/config/internal.rs
@@ -23,11 +23,9 @@ use std::str::FromStr;
use strum_macros::EnumIter;
-use crate::{
- config::{ConfigParser, HudiConfigValue},
- CoreError::{ConfigNotFound, InvalidConfig},
- Result,
-};
+use crate::config::error::ConfigError::{NotFound, ParseBool};
+use crate::config::Result;
+use crate::config::{ConfigParser, HudiConfigValue};
/// Configurations for internal use.
///
@@ -67,15 +65,12 @@ impl ConfigParser for HudiInternalConfig {
let get_result = configs
.get(self.as_ref())
.map(|v| v.as_str())
- .ok_or(ConfigNotFound(self.as_ref().to_string()));
+ .ok_or(NotFound(self.key()));
match self {
Self::SkipConfigValidation => get_result
.and_then(|v| {
- bool::from_str(v).map_err(|e| InvalidConfig {
- item: Self::SkipConfigValidation.as_ref(),
- source: Box::new(e),
- })
+ bool::from_str(v).map_err(|e| ParseBool(self.key(), v.to_string(), e))
})
.map(HudiConfigValue::Boolean),
}
diff --git a/crates/core/src/config/mod.rs b/crates/core/src/config/mod.rs
index f586a89..bc554bd 100644
--- a/crates/core/src/config/mod.rs
+++ b/crates/core/src/config/mod.rs
@@ -21,14 +21,18 @@ use std::any::type_name;
use std::collections::HashMap;
use std::sync::Arc;
-use crate::{storage::utils::parse_uri, Result};
+use crate::config::error::{ConfigError, Result};
+use crate::storage::error::Result as StorageResult;
+use crate::storage::util::parse_uri;
use serde::{Deserialize, Serialize};
use url::Url;
+use ConfigError::NotFound;
+pub mod error;
pub mod internal;
pub mod read;
pub mod table;
-pub mod utils;
+pub mod util;
pub const HUDI_CONF_DIR: &str = "HUDI_CONF_DIR";
@@ -40,6 +44,10 @@ pub trait ConfigParser: AsRef<str> {
/// Supplies the default value of the configuration.
fn default_value(&self) -> Option<Self::Output>;
+ fn key(&self) -> String {
+ self.as_ref().to_string()
+ }
+
/// To indicate if the configuration is required or not, this helps in
validation.
fn is_required(&self) -> bool {
false
@@ -50,8 +58,7 @@ pub trait ConfigParser: AsRef<str> {
match self.parse_value(configs) {
Ok(_) => Ok(()),
Err(e) => {
- if !self.is_required() && e.to_string().ends_with("not found")
{
- // TODO: introduce error type to avoid checking "not found"
+ if !self.is_required() && matches!(e, NotFound(_)) {
Ok(())
} else {
Err(e)
@@ -98,7 +105,7 @@ impl HudiConfigValue {
/// A convenience method to convert [HudiConfigValue] to [Url] when the
value is a [String] and is intended to be a URL.
/// Panic if the value is not a [String].
- pub fn to_url(self) -> Result<Url> {
+ pub fn to_url(self) -> StorageResult<Url> {
match self {
HudiConfigValue::String(v) => parse_uri(&v),
_ => panic!(
diff --git a/crates/core/src/config/read.rs b/crates/core/src/config/read.rs
index a938953..ece692e 100644
--- a/crates/core/src/config/read.rs
+++ b/crates/core/src/config/read.rs
@@ -23,11 +23,9 @@ use std::str::FromStr;
use strum_macros::EnumIter;
-use crate::{
- config::{ConfigParser, HudiConfigValue},
- CoreError::{ConfigNotFound, InvalidConfig},
- Result,
-};
+use crate::config::error::ConfigError::{NotFound, ParseInt};
+use crate::config::Result;
+use crate::config::{ConfigParser, HudiConfigValue};
/// Configurations for reading Hudi tables.
///
@@ -79,15 +77,12 @@ impl ConfigParser for HudiReadConfig {
let get_result = configs
.get(self.as_ref())
.map(|v| v.as_str())
- .ok_or(ConfigNotFound(self.as_ref().to_string()));
+ .ok_or(NotFound(self.key()));
match self {
Self::InputPartitions => get_result
.and_then(|v| {
- usize::from_str(v).map_err(|e| InvalidConfig {
- item: Self::InputPartitions.as_ref(),
- source: Box::new(e),
- })
+ usize::from_str(v).map_err(|e| ParseInt(self.key(), v.to_string(), e))
})
.map(HudiConfigValue::UInteger),
Self::AsOfTimestamp => get_result.map(|v|
HudiConfigValue::String(v.to_string())),
@@ -97,10 +92,8 @@ impl ConfigParser for HudiReadConfig {
#[cfg(test)]
mod tests {
+ use super::*;
use crate::config::read::HudiReadConfig::InputPartitions;
- use crate::config::ConfigParser;
- use crate::CoreError::InvalidConfig;
- use std::collections::HashMap;
#[test]
fn parse_valid_config_value() {
@@ -113,10 +106,7 @@ mod tests {
fn parse_invalid_config_value() {
let options = HashMap::from([(InputPartitions.as_ref().to_string(),
"foo".to_string())]);
let value = InputPartitions.parse_value(&options);
- assert!(matches!(
- value.unwrap_err(),
- InvalidConfig { item: _, source: _ }
- ));
+ assert!(matches!(value.unwrap_err(), ParseInt(_, _, _)));
assert_eq!(
InputPartitions
.parse_value_or_default(&options)
diff --git a/crates/core/src/config/table.rs b/crates/core/src/config/table.rs
index bb19066..5b82b4c 100644
--- a/crates/core/src/config/table.rs
+++ b/crates/core/src/config/table.rs
@@ -23,11 +23,12 @@ use std::str::FromStr;
use strum_macros::{AsRefStr, EnumIter};
-use crate::{
- config::{ConfigParser, HudiConfigValue},
- CoreError::{self, ConfigNotFound, InvalidConfig, Unsupported},
- Result,
+use crate::config::error::ConfigError;
+use crate::config::error::ConfigError::{
+ InvalidValue, NotFound, ParseBool, ParseInt, UnsupportedValue,
};
+use crate::config::Result;
+use crate::config::{ConfigParser, HudiConfigValue};
/// Configurations for Hudi tables, most of them are persisted in
`hoodie.properties`.
///
@@ -148,7 +149,7 @@ impl ConfigParser for HudiTableConfig {
let get_result = configs
.get(self.as_ref())
.map(|v| v.as_str())
- .ok_or(ConfigNotFound(self.as_ref().to_string()));
+ .ok_or(NotFound(self.key()));
match self {
Self::BaseFileFormat => get_result
@@ -157,35 +158,23 @@ impl ConfigParser for HudiTableConfig {
Self::BasePath => get_result.map(|v|
HudiConfigValue::String(v.to_string())),
Self::Checksum => get_result
.and_then(|v| {
- isize::from_str(v).map_err(|e| InvalidConfig {
- item: Self::Checksum.as_ref(),
- source: Box::new(e),
- })
+ isize::from_str(v).map_err(|e| ParseInt(self.key(), v.to_string(), e))
})
.map(HudiConfigValue::Integer),
Self::DatabaseName => get_result.map(|v|
HudiConfigValue::String(v.to_string())),
Self::DropsPartitionFields => get_result
.and_then(|v| {
- bool::from_str(v).map_err(|e| InvalidConfig {
- item: Self::DropsPartitionFields.as_ref(),
- source: Box::new(e),
- })
+ bool::from_str(v).map_err(|e| ParseBool(self.key(), v.to_string(), e))
})
.map(HudiConfigValue::Boolean),
Self::IsHiveStylePartitioning => get_result
.and_then(|v| {
- bool::from_str(v).map_err(|e| InvalidConfig {
- item: Self::IsHiveStylePartitioning.as_ref(),
- source: Box::new(e),
- })
+ bool::from_str(v).map_err(|e| ParseBool(self.key(), v.to_string(), e))
})
.map(HudiConfigValue::Boolean),
Self::IsPartitionPathUrlencoded => get_result
.and_then(|v| {
- bool::from_str(v).map_err(|e| InvalidConfig {
- item: Self::IsPartitionPathUrlencoded.as_ref(),
- source: Box::new(e),
- })
+ bool::from_str(v).map_err(|e| ParseBool(self.key(), v.to_string(), e))
})
.map(HudiConfigValue::Boolean),
Self::KeyGeneratorClass => get_result.map(|v|
HudiConfigValue::String(v.to_string())),
@@ -194,10 +183,7 @@ impl ConfigParser for HudiTableConfig {
Self::PrecombineField => get_result.map(|v|
HudiConfigValue::String(v.to_string())),
Self::PopulatesMetaFields => get_result
.and_then(|v| {
- bool::from_str(v).map_err(|e| InvalidConfig {
- item: Self::PopulatesMetaFields.as_ref(),
- source: Box::new(e),
- })
+ bool::from_str(v).map_err(|e| ParseBool(self.key(), v.to_string(), e))
})
.map(HudiConfigValue::Boolean),
Self::RecordKeyFields => get_result
@@ -208,18 +194,12 @@ impl ConfigParser for HudiTableConfig {
.map(|v| HudiConfigValue::String(v.as_ref().to_string())),
Self::TableVersion => get_result
.and_then(|v| {
- isize::from_str(v).map_err(|e| InvalidConfig {
- item: Self::TableVersion.as_ref(),
- source: Box::new(e),
- })
+ isize::from_str(v).map_err(|e| ParseInt(self.key(), v.to_string(), e))
})
.map(HudiConfigValue::Integer),
Self::TimelineLayoutVersion => get_result
.and_then(|v| {
- isize::from_str(v).map_err(|e| InvalidConfig {
- item: Self::TimelineLayoutVersion.as_ref(),
- source: Box::new(e),
- })
+ isize::from_str(v).map_err(|e| ParseInt(self.key(), v.to_string(), e))
})
.map(HudiConfigValue::Integer),
}
@@ -236,13 +216,13 @@ pub enum TableTypeValue {
}
impl FromStr for TableTypeValue {
- type Err = CoreError;
+ type Err = ConfigError;
- fn from_str(s: &str) -> Result<Self> {
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.to_ascii_lowercase().as_str() {
"copy_on_write" | "copy-on-write" | "cow" => Ok(Self::CopyOnWrite),
"merge_on_read" | "merge-on-read" | "mor" => Ok(Self::MergeOnRead),
- v => Err(Unsupported(format!("Unsupported table type {}", v))),
+ v => Err(InvalidValue(v.to_string())),
}
}
}
@@ -255,21 +235,20 @@ pub enum BaseFileFormatValue {
}
impl FromStr for BaseFileFormatValue {
- type Err = CoreError;
+ type Err = ConfigError;
- fn from_str(s: &str) -> Result<Self> {
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.to_ascii_lowercase().as_str() {
"parquet" => Ok(Self::Parquet),
- v => Err(Unsupported(format!("Unsupported base file format {}", v))),
+ "orc" => Err(UnsupportedValue(s.to_string())),
+ v => Err(InvalidValue(v.to_string())),
}
}
}
#[cfg(test)]
mod tests {
- use std::str::FromStr;
-
- use crate::config::table::{BaseFileFormatValue, TableTypeValue};
+ use super::*;
#[test]
fn create_table_type() {
@@ -297,10 +276,22 @@ mod tests {
TableTypeValue::from_str("Merge-on-read").unwrap(),
TableTypeValue::MergeOnRead
);
- assert!(TableTypeValue::from_str("").is_err());
- assert!(TableTypeValue::from_str("copyonwrite").is_err());
- assert!(TableTypeValue::from_str("MERGEONREAD").is_err());
- assert!(TableTypeValue::from_str("foo").is_err());
+ assert!(matches!(
+ TableTypeValue::from_str("").unwrap_err(),
+ InvalidValue(_)
+ ));
+ assert!(matches!(
+ TableTypeValue::from_str("copyonwrite").unwrap_err(),
+ InvalidValue(_)
+ ));
+ assert!(matches!(
+ TableTypeValue::from_str("MERGEONREAD").unwrap_err(),
+ InvalidValue(_)
+ ));
+ assert!(matches!(
+ TableTypeValue::from_str("foo").unwrap_err(),
+ InvalidValue(_)
+ ));
}
#[test]
@@ -313,10 +304,13 @@ mod tests {
BaseFileFormatValue::from_str("PArquet").unwrap(),
BaseFileFormatValue::Parquet
);
- assert!(TableTypeValue::from_str("").is_err());
- assert!(
- TableTypeValue::from_str("orc").is_err(),
- "orc is not yet supported."
- );
+ assert!(matches!(
+ BaseFileFormatValue::from_str("").unwrap_err(),
+ InvalidValue(_)
+ ));
+ assert!(matches!(
+ BaseFileFormatValue::from_str("orc").unwrap_err(),
+ UnsupportedValue(_)
+ ));
}
}
diff --git a/crates/core/src/config/utils.rs b/crates/core/src/config/util.rs
similarity index 95%
rename from crates/core/src/config/utils.rs
rename to crates/core/src/config/util.rs
index 85784ec..33921c6 100644
--- a/crates/core/src/config/utils.rs
+++ b/crates/core/src/config/util.rs
@@ -18,12 +18,12 @@
*/
//! Config utilities.
+use crate::config::error::ConfigError::ParseLine;
+use crate::config::Result;
use bytes::Bytes;
use std::collections::HashMap;
use std::io::{BufRead, BufReader, Cursor};
-use crate::{CoreError, Result};
-
/// Returns an empty iterator to represent an empty set of options.
pub fn empty_options<'a>() -> std::iter::Empty<(&'a str, &'a str)> {
std::iter::empty::<(&str, &str)>()
@@ -58,7 +58,7 @@ pub fn parse_data_for_options(data: &Bytes, split_chars:
&str) -> Result<HashMap
let mut options = HashMap::new();
for line in lines {
- let line = line.map_err(|e| CoreError::Internal(format!("Failed to read line {e}")))?;
+ let line = line.map_err(|e| ParseLine(format!("Failed to parse config data: {:?}", e)))?;
let trimmed_line = line.trim();
if trimmed_line.is_empty() || trimmed_line.starts_with('#') {
continue;
@@ -66,9 +66,10 @@ pub fn parse_data_for_options(data: &Bytes, split_chars:
&str) -> Result<HashMap
let mut parts = trimmed_line.splitn(2, |c| split_chars.contains(c));
let key = parts
.next()
- .ok_or(CoreError::Internal(
- "Missing key in config line".to_string(),
- ))?
+ .ok_or(ParseLine(format!(
+ "Missing key in config line: {}",
+ trimmed_line
+ )))?
.trim()
.to_owned();
let value = parts.next().unwrap_or("").trim().to_owned();
diff --git a/crates/core/src/error.rs b/crates/core/src/error.rs
new file mode 100644
index 0000000..6d4f0b4
--- /dev/null
+++ b/crates/core/src/error.rs
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use crate::config::error::ConfigError;
+use crate::storage::error::StorageError;
+use thiserror::Error;
+
+pub type Result<T, E = CoreError> = std::result::Result<T, E>;
+
+#[derive(Error, Debug)]
+pub enum CoreError {
+ #[error(transparent)]
+ ArrowError(#[from] arrow::error::ArrowError),
+
+ #[error("Config error: {0}")]
+ Config(#[from] ConfigError),
+
+ #[error("File group error: {0}")]
+ FileGroup(String),
+
+ #[error("{0}: {1:?}")]
+ ReadFileSliceError(String, StorageError),
+
+ #[error("{0}")]
+ InvalidPartitionPath(String),
+
+ #[error(transparent)]
+ ParquetError(#[from] parquet::errors::ParquetError),
+
+ #[error("Storage error: {0}")]
+ Storage(#[from] StorageError),
+
+ #[error("Timeline error: {0}")]
+ Timeline(String),
+
+ #[error("{0}")]
+ Unsupported(String),
+
+ #[error(transparent)]
+ Utf8Error(#[from] std::str::Utf8Error),
+}
diff --git a/crates/core/src/file_group/mod.rs
b/crates/core/src/file_group/mod.rs
index 2fde1bf..58fb144 100644
--- a/crates/core/src/file_group/mod.rs
+++ b/crates/core/src/file_group/mod.rs
@@ -22,17 +22,17 @@
pub mod reader;
+use crate::error::CoreError;
+use crate::storage::file_info::FileInfo;
+use crate::storage::file_stats::FileStats;
+use crate::storage::Storage;
+use crate::Result;
use std::collections::BTreeMap;
use std::fmt;
use std::fmt::Formatter;
use std::hash::{Hash, Hasher};
use std::path::PathBuf;
-use crate::storage::file_info::FileInfo;
-use crate::storage::file_stats::FileStats;
-use crate::storage::Storage;
-use crate::{CoreError::Internal, Result};
-
/// Represents common metadata about a Hudi Base File.
#[derive(Clone, Debug)]
pub struct BaseFile {
@@ -52,10 +52,16 @@ impl BaseFile {
let err_msg = format!("Failed to parse file name '{}' for base file.",
file_name);
let (name, _) = file_name
.rsplit_once('.')
- .ok_or(Internal(err_msg.clone()))?;
+ .ok_or(CoreError::FileGroup(err_msg.clone()))?;
let parts: Vec<&str> = name.split('_').collect();
- let file_group_id =
parts.first().ok_or(Internal(err_msg.clone()))?.to_string();
- let commit_time =
parts.get(2).ok_or(Internal(err_msg.clone()))?.to_string();
+ let file_group_id = parts
+ .first()
+ .ok_or(CoreError::FileGroup(err_msg.clone()))?
+ .to_string();
+ let commit_time = parts
+ .get(2)
+ .ok_or(CoreError::FileGroup(err_msg.clone()))?
+ .to_string();
Ok((file_group_id, commit_time))
}
@@ -190,7 +196,7 @@ impl FileGroup {
pub fn add_base_file(&mut self, base_file: BaseFile) -> Result<&Self> {
let commit_time = base_file.commit_time.as_str();
if self.file_slices.contains_key(commit_time) {
- Err(Internal(format!(
+ Err(CoreError::FileGroup(format!(
"Commit time {0} is already present in File Group {1}",
commit_time.to_owned(),
self.id,
@@ -243,6 +249,18 @@ mod tests {
assert_eq!(base_file.commit_time, "20240402144910683");
}
+ #[test]
+ fn create_a_base_file_returns_error() {
+ let result = BaseFile::from_file_name("no_file_extension");
+ assert!(matches!(result.unwrap_err(), CoreError::FileGroup(_)));
+
+ let result = BaseFile::from_file_name(".parquet");
+ assert!(matches!(result.unwrap_err(), CoreError::FileGroup(_)));
+
+ let result = BaseFile::from_file_name("no-valid-delimiter.parquet");
+ assert!(matches!(result.unwrap_err(), CoreError::FileGroup(_)));
+ }
+
#[test]
fn load_a_valid_file_group() {
let mut fg =
FileGroup::new("5a226868-2934-4f84-a16f-55124630c68d-0".to_owned(), None);
@@ -277,7 +295,7 @@ mod tests {
"5a226868-2934-4f84-a16f-55124630c68d-0_2-10-0_20240402144910683.parquet",
);
assert!(res2.is_err());
- assert_eq!(res2.unwrap_err().to_string(), "Commit time 20240402144910683 is already present in File Group 5a226868-2934-4f84-a16f-55124630c68d-0");
+ assert_eq!(res2.unwrap_err().to_string(), "File group error: Commit time 20240402144910683 is already present in File Group 5a226868-2934-4f84-a16f-55124630c68d-0");
}
#[test]
diff --git a/crates/core/src/file_group/reader.rs
b/crates/core/src/file_group/reader.rs
index eeb283a..165ed9c 100644
--- a/crates/core/src/file_group/reader.rs
+++ b/crates/core/src/file_group/reader.rs
@@ -17,12 +17,14 @@
* under the License.
*/
use crate::config::table::HudiTableConfig;
-use crate::config::utils::split_hudi_options_from_others;
+use crate::config::util::split_hudi_options_from_others;
use crate::config::HudiConfigs;
+use crate::error::CoreError::ReadFileSliceError;
use crate::file_group::FileSlice;
use crate::storage::Storage;
use crate::Result;
use arrow_array::RecordBatch;
+use futures::TryFutureExt;
use std::sync::Arc;
/// File group reader handles all read operations against a file group.
@@ -58,7 +60,15 @@ impl FileGroupReader {
&self,
relative_path: &str,
) -> Result<RecordBatch> {
- self.storage.get_parquet_file_data(relative_path).await
+ self.storage
+ .get_parquet_file_data(relative_path)
+ .map_err(|e| {
+ ReadFileSliceError(
+ format!("Failed to read file slice at path '{}'", relative_path),
+ e,
+ )
+ })
+ .await
}
pub async fn read_file_slice(&self, file_slice: &FileSlice) ->
Result<RecordBatch> {
@@ -91,4 +101,16 @@ mod tests {
.contains(HudiTableConfig::BasePath));
Ok(())
}
+
+ #[tokio::test]
+ async fn test_read_file_slice_returns_error() {
+ let storage =
+
Storage::new_with_base_url(Url::parse("file:///non-existent-path/table").unwrap())
+ .unwrap();
+ let reader = FileGroupReader::new(storage);
+ let result = reader
+ .read_file_slice_by_base_file_path("non_existent_file")
+ .await;
+ assert!(matches!(result.unwrap_err(), ReadFileSliceError(_, _)));
+ }
}
diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs
index 6485c1d..42079a1 100644
--- a/crates/core/src/lib.rs
+++ b/crates/core/src/lib.rs
@@ -44,53 +44,10 @@
//! ```
pub mod config;
+pub mod error;
pub mod file_group;
pub mod storage;
pub mod table;
pub mod util;
-use thiserror::Error;
-
-#[derive(Error, Debug)]
-pub enum CoreError {
- #[error("Config '{0}' not found")]
- ConfigNotFound(String),
-
- #[error("Invalid config item '{item}', {source:?}")]
- InvalidConfig {
- item: &'static str,
- source: Box<dyn std::error::Error + Sync + Send + 'static>,
- },
-
- #[error("Parse url '{url}' failed, {source}")]
- UrlParse {
- url: String,
- source: url::ParseError,
- },
-
- #[error("Invalid file path '{name}', {detail}")]
- InvalidPath { name: String, detail: String },
-
- #[error("{0}")]
- Unsupported(String),
-
- #[error("{0}")]
- Internal(String),
-
- #[error(transparent)]
- Utf8Error(#[from] std::str::Utf8Error),
-
- #[error(transparent)]
- ObjectStore(#[from] object_store::Error),
-
- #[error(transparent)]
- ObjectStorePath(#[from] object_store::path::Error),
-
- #[error(transparent)]
- Parquet(#[from] parquet::errors::ParquetError),
-
- #[error(transparent)]
- Arrow(#[from] arrow::error::ArrowError),
-}
-
-type Result<T, E = CoreError> = std::result::Result<T, E>;
+use error::Result;
diff --git a/crates/core/src/storage/error.rs b/crates/core/src/storage/error.rs
new file mode 100644
index 0000000..09c749d
--- /dev/null
+++ b/crates/core/src/storage/error.rs
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use thiserror::Error;
+
+pub type Result<T, E = StorageError> = std::result::Result<T, E>;
+
+#[derive(Error, Debug)]
+pub enum StorageError {
+ #[error(transparent)]
+ ArrowError(#[from] arrow::error::ArrowError),
+
+ #[error("Failed to create storage: {0}")]
+ Creation(String),
+
+ #[error("Invalid path: {0}")]
+ InvalidPath(String),
+
+ #[error(transparent)]
+ ObjectStoreError(#[from] object_store::Error),
+
+ #[error(transparent)]
+ ObjectStorePathError(#[from] object_store::path::Error),
+
+ #[error(transparent)]
+ ParquetError(#[from] parquet::errors::ParquetError),
+
+ #[error(transparent)]
+ UrlParseError(#[from] url::ParseError),
+}
diff --git a/crates/core/src/storage/mod.rs b/crates/core/src/storage/mod.rs
index 0a9f0e1..7f31011 100644
--- a/crates/core/src/storage/mod.rs
+++ b/crates/core/src/storage/mod.rs
@@ -36,13 +36,15 @@ use url::Url;
use crate::config::table::HudiTableConfig;
use crate::config::HudiConfigs;
+use crate::storage::error::Result;
+use crate::storage::error::StorageError::{Creation, InvalidPath};
use crate::storage::file_info::FileInfo;
-use crate::storage::utils::join_url_segments;
-use crate::{CoreError, Result};
+use crate::storage::util::join_url_segments;
+pub mod error;
pub mod file_info;
pub mod file_stats;
-pub mod utils;
+pub mod util;
#[allow(dead_code)]
#[derive(Clone, Debug)]
@@ -60,14 +62,15 @@ impl Storage {
options: Arc<HashMap<String, String>>,
hudi_configs: Arc<HudiConfigs>,
) -> Result<Arc<Storage>> {
- if !hudi_configs.contains(HudiTableConfig::BasePath) {
- return Err(CoreError::Internal(format!(
- "Failed to create storage: {} is required.",
- HudiTableConfig::BasePath.as_ref()
- )));
- }
-
- let base_url = hudi_configs.get(HudiTableConfig::BasePath)?.to_url()?;
+ let base_url = match hudi_configs.try_get(HudiTableConfig::BasePath) {
+ Some(v) => v.to_url()?,
+ None => {
+ return Err(Creation(format!(
+ "{} is required.",
+ HudiTableConfig::BasePath.as_ref()
+ )))
+ }
+ };
match parse_url_opts(&base_url, options.as_ref()) {
Ok((object_store, _)) => Ok(Arc::new(Storage {
@@ -76,7 +79,7 @@ impl Storage {
options,
hudi_configs,
})),
- Err(e) => Err(CoreError::ObjectStore(e)),
- Err(e) => Err(Creation(format!("Failed to create storage: {}", e))),
}
}
@@ -109,10 +112,10 @@ impl Storage {
let uri = obj_url.to_string();
let name = obj_path
.filename()
- .ok_or(CoreError::InvalidPath {
- name: obj_path.to_string(),
- detail: "failed to get file name".to_string(),
- })?
+ .ok_or(InvalidPath(format!(
+ "Failed to get file name from {:?}",
+ obj_path
+ )))?
.to_string();
Ok(FileInfo {
uri,
@@ -176,10 +179,10 @@ impl Storage {
for dir in dir_paths {
dirs.push(
dir.filename()
- .ok_or(CoreError::InvalidPath {
- name: dir.to_string(),
- detail: "failed to get file name".to_string(),
- })?
+ .ok_or(InvalidPath(format!(
+ "Failed to get file name from: {:?}",
+ dir
+ )))?
.to_string(),
)
}
@@ -208,10 +211,10 @@ impl Storage {
let name = obj_meta
.location
.filename()
- .ok_or(CoreError::InvalidPath {
- name: obj_meta.location.to_string(),
- detail: "failed to get file name".to_string(),
- })?
+ .ok_or(InvalidPath(format!(
+ "Failed to get file name from {:?}",
+ obj_meta.location
+ )))?
.to_string();
let uri = join_url_segments(&prefix_url, &[&name])?.to_string();
file_info.push(FileInfo {
@@ -246,10 +249,10 @@ pub async fn get_leaf_dirs(storage: &Storage, subdir:
Option<&str>) -> Result<Ve
next_subdir.push(curr);
}
next_subdir.push(child_dir);
- let next_subdir =
next_subdir.to_str().ok_or(CoreError::InvalidPath {
- name: format!("{:?}", next_subdir),
- detail: "failed to convert path".to_string(),
- })?;
+ let next_subdir = next_subdir.to_str().ok_or(InvalidPath(format!(
+ "Failed to convert path: {:?}",
+ next_subdir
+ )))?;
let curr_leaf_dir = get_leaf_dirs(storage,
Some(next_subdir)).await?;
leaf_dirs.extend(curr_leaf_dir);
}
@@ -264,12 +267,6 @@ mod tests {
use std::fs::canonicalize;
use std::path::Path;
- use crate::storage::file_info::FileInfo;
- use crate::storage::utils::join_url_segments;
- use crate::storage::{get_leaf_dirs, Storage};
- use object_store::path::Path as ObjPath;
- use url::Url;
-
#[test]
fn test_storage_new_error_no_base_path() {
let options = Arc::new(HashMap::new());
@@ -299,7 +296,7 @@ mod tests {
result.is_err(),
"Should return error when no base path is invalid."
);
- assert!(matches!(result.unwrap_err(), CoreError::ObjectStore(_)));
+ assert!(matches!(result.unwrap_err(), Creation(_)));
}
#[tokio::test]
diff --git a/crates/core/src/storage/utils.rs b/crates/core/src/storage/util.rs
similarity index 82%
rename from crates/core/src/storage/utils.rs
rename to crates/core/src/storage/util.rs
index 10198a1..e3f32b9 100644
--- a/crates/core/src/storage/utils.rs
+++ b/crates/core/src/storage/util.rs
@@ -19,12 +19,10 @@
//! Utility functions for storage.
use std::path::Path;
-use url::{ParseError, Url};
+use url::Url;
-use crate::{
- CoreError::{InvalidPath, UrlParse},
- Result,
-};
+use crate::storage::error::StorageError::{InvalidPath, UrlParseError};
+use crate::storage::Result;
/// Splits a filename into a stem and an extension.
pub fn split_filename(filename: &str) -> Result<(String, String)> {
@@ -33,10 +31,7 @@ pub fn split_filename(filename: &str) -> Result<(String,
String)> {
let stem = path
.file_stem()
.and_then(|s| s.to_str())
- .ok_or_else(|| InvalidPath {
- name: filename.to_string(),
- detail: "no file stem found".to_string(),
- })?
+ .ok_or(InvalidPath(format!("No file stem found in {}", filename)))?
.to_string();
let extension = path
@@ -52,19 +47,12 @@ pub fn split_filename(filename: &str) -> Result<(String,
String)> {
pub fn parse_uri(uri: &str) -> Result<Url> {
let mut url = match Url::parse(uri) {
Ok(url) => url,
- Err(source) => Url::from_directory_path(uri).map_err(|_| UrlParse {
- url: uri.to_string(),
- source,
- })?,
+ Err(e) => Url::from_directory_path(uri).map_err(|_| UrlParseError(e))?,
};
if url.path().ends_with('/') {
- url.path_segments_mut()
- .map_err(|_| InvalidPath {
- name: uri.to_string(),
- detail: "parse uri failed".to_string(),
- })?
- .pop();
+ let err = InvalidPath(format!("Url {:?} cannot be a base", url));
+ url.path_segments_mut().map_err(|_| err)?.pop();
}
Ok(url)
@@ -85,12 +73,8 @@ pub fn join_url_segments(base_url: &Url, segments: &[&str])
-> Result<Url> {
for &seg in segments {
let segs: Vec<_> = seg.split('/').filter(|&s| !s.is_empty()).collect();
- url.path_segments_mut()
- .map_err(|_| UrlParse {
- url: base_url.to_string(),
- source: ParseError::RelativeUrlWithoutBase,
- })?
- .extend(segs);
+ let err = InvalidPath(format!("Url {:?} cannot be a base", url));
+ url.path_segments_mut().map_err(|_| err)?.extend(segs);
}
Ok(url)
@@ -98,12 +82,9 @@ pub fn join_url_segments(base_url: &Url, segments: &[&str])
-> Result<Url> {
#[cfg(test)]
mod tests {
+ use super::*;
use std::str::FromStr;
- use url::Url;
-
- use crate::storage::utils::{join_url_segments, parse_uri};
-
#[test]
fn parse_valid_uri_in_various_forms() {
let urls = vec![
diff --git a/crates/core/src/table/builder.rs b/crates/core/src/table/builder.rs
index eca7d2b..20df88e 100644
--- a/crates/core/src/table/builder.rs
+++ b/crates/core/src/table/builder.rs
@@ -31,13 +31,14 @@ use crate::config::read::HudiReadConfig;
use crate::config::table::HudiTableConfig::{DropsPartitionFields, TableType,
TableVersion};
use crate::config::table::TableTypeValue::CopyOnWrite;
use crate::config::table::{HudiTableConfig, TableTypeValue};
-use crate::config::utils::{parse_data_for_options,
split_hudi_options_from_others};
+use crate::config::util::{parse_data_for_options,
split_hudi_options_from_others};
use crate::config::{HudiConfigs, HUDI_CONF_DIR};
+use crate::error::CoreError;
use crate::storage::Storage;
use crate::table::fs_view::FileSystemView;
use crate::table::timeline::Timeline;
use crate::table::Table;
-use crate::{CoreError, Result};
+use crate::Result;
/// Builder for creating a [Table] instance.
#[derive(Debug, Clone)]
@@ -185,7 +186,9 @@ impl TableBuilder {
let hudi_options = &mut self.hudi_options;
Self::imbue_table_properties(hudi_options, storage.clone()).await?;
- // TODO load Hudi configs from env vars here before loading global
configs
+ // TODO support imbuing Hudi options from env vars HOODIE_ENV.*
+ // (see https://hudi.apache.org/docs/next/s3_hoodie)
+ // before loading global configs
Self::imbue_global_hudi_configs_if_absent(hudi_options,
storage.clone()).await
}
diff --git a/crates/core/src/table/fs_view.rs b/crates/core/src/table/fs_view.rs
index ee68eec..092b352 100644
--- a/crates/core/src/table/fs_view.rs
+++ b/crates/core/src/table/fs_view.rs
@@ -25,8 +25,9 @@ use crate::file_group::{BaseFile, FileGroup, FileSlice};
use crate::storage::file_info::FileInfo;
use crate::storage::{get_leaf_dirs, Storage};
+use crate::error::CoreError;
use crate::table::partition::PartitionPruner;
-use crate::{CoreError, Result};
+use crate::Result;
use dashmap::DashMap;
use futures::stream::{self, StreamExt, TryStreamExt};
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 4836d1c..da32a10 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -60,7 +60,7 @@
//! ```rust
//! use url::Url;
//! use hudi_core::table::Table;
-//! use hudi_core::storage::utils::parse_uri;
+//! use hudi_core::storage::util::parse_uri;
//!
//! pub async fn test() {
//! let base_uri = Url::from_file_path("/tmp/hudi_data").unwrap();
@@ -95,13 +95,14 @@ use crate::config::read::HudiReadConfig::AsOfTimestamp;
use crate::config::table::HudiTableConfig;
use crate::config::table::HudiTableConfig::PartitionFields;
use crate::config::HudiConfigs;
+use crate::error::CoreError;
use crate::file_group::reader::FileGroupReader;
use crate::file_group::FileSlice;
use crate::table::builder::TableBuilder;
use crate::table::fs_view::FileSystemView;
use crate::table::partition::PartitionPruner;
use crate::table::timeline::Timeline;
-use crate::{CoreError, Result};
+use crate::Result;
pub mod builder;
mod fs_view;
@@ -137,7 +138,10 @@ impl Table {
}
pub fn base_url(&self) -> Result<Url> {
- self.hudi_configs.get(HudiTableConfig::BasePath)?.to_url()
+ self.hudi_configs
+ .get(HudiTableConfig::BasePath)?
+ .to_url()
+ .map_err(CoreError::from)
}
pub fn hudi_options(&self) -> HashMap<String, String> {
@@ -161,6 +165,7 @@ impl Table {
.register_object_store(runtime_env.clone());
}
+ /// Get the latest [Schema] of the table.
pub async fn get_schema(&self) -> Result<Schema> {
self.timeline.get_latest_schema().await
}
@@ -259,19 +264,10 @@ impl Table {
filters: &[(&str, &str, &str)],
) -> Result<Vec<RecordBatch>> {
let file_slices = self.get_file_slices_as_of(timestamp,
filters).await?;
- let mut batches = Vec::new();
let fg_reader = self.create_file_group_reader();
- for f in file_slices {
- match fg_reader.read_file_slice(&f).await {
- Ok(batch) => batches.push(batch),
- Err(e) => {
- return Err(CoreError::Internal(format!(
- "Failed to read file slice {:?} - {}",
- f, e
- )))
- }
- }
- }
+ let batches =
+ futures::future::try_join_all(file_slices.iter().map(|f|
fg_reader.read_file_slice(f)))
+ .await?;
Ok(batches)
}
@@ -294,16 +290,14 @@ impl Table {
#[cfg(test)]
mod tests {
+ use super::*;
use arrow_array::StringArray;
- use std::collections::HashSet;
use std::fs::canonicalize;
use std::path::PathBuf;
use std::{env, panic};
- use url::Url;
use hudi_tests::{assert_not, TestTable};
- use crate::config::read::HudiReadConfig::AsOfTimestamp;
use crate::config::table::HudiTableConfig::{
BaseFileFormat, Checksum, DatabaseName, DropsPartitionFields,
IsHiveStylePartitioning,
IsPartitionPathUrlencoded, KeyGeneratorClass, PartitionFields,
PopulatesMetaFields,
@@ -311,9 +305,8 @@ mod tests {
TimelineLayoutVersion,
};
use crate::config::HUDI_CONF_DIR;
- use crate::storage::utils::join_url_segments;
+ use crate::storage::util::join_url_segments;
use crate::storage::Storage;
- use crate::table::Table;
/// Test helper to create a new `Table` instance without validating the
configuration.
///
diff --git a/crates/core/src/table/partition.rs
b/crates/core/src/table/partition.rs
index 8f03521..ca41f73 100644
--- a/crates/core/src/table/partition.rs
+++ b/crates/core/src/table/partition.rs
@@ -18,7 +18,9 @@
*/
use crate::config::table::HudiTableConfig;
use crate::config::HudiConfigs;
-use crate::{CoreError, Result};
+use crate::error::CoreError;
+use crate::error::CoreError::{InvalidPartitionPath, Unsupported};
+use crate::Result;
use arrow_array::{ArrayRef, Scalar, StringArray};
use arrow_cast::{cast_with_options, CastOptions};
use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
@@ -119,7 +121,7 @@ impl PartitionPruner {
let parts: Vec<&str> = partition_path.split('/').collect();
if parts.len() != self.schema.fields().len() {
- return Err(CoreError::Internal(format!(
+ return Err(InvalidPartitionPath(format!(
"Partition path should have {} part(s) but got {}",
self.schema.fields().len(),
parts.len()
@@ -132,14 +134,11 @@ impl PartitionPruner {
.zip(parts)
.map(|(field, part)| {
let value = if self.is_hive_style {
- let (name, value) = part.split_once('=').ok_or_else(|| {
- CoreError::Internal(format!(
- "Partition path should be hive-style but got {}",
- part
- ))
- })?;
+ let (name, value) =
part.split_once('=').ok_or(InvalidPartitionPath(
+ format!("Partition path should be hive-style but got
{}", part),
+ ))?;
if name != field.name() {
- return Err(CoreError::Internal(format!(
+ return Err(InvalidPartitionPath(format!(
"Partition path should contain {} but got {}",
field.name(),
name
@@ -179,13 +178,13 @@ impl Operator {
}
impl FromStr for Operator {
- type Err = crate::CoreError;
+ type Err = CoreError;
- fn from_str(s: &str) -> Result<Self> {
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
Operator::TOKEN_OP_PAIRS
.iter()
.find_map(|&(token, op)| if token == s { Some(op) } else { None })
- .ok_or_else(|| CoreError::Internal(format!("Unsupported operator:
{}", s)))
+ .ok_or(Unsupported(format!("Unsupported operator: {}", s)))
}
}
@@ -198,9 +197,11 @@ pub struct PartitionFilter {
}
impl TryFrom<((&str, &str, &str), &Schema)> for PartitionFilter {
- type Error = crate::CoreError;
+ type Error = CoreError;
- fn try_from((filter, partition_schema): ((&str, &str, &str), &Schema)) ->
Result<Self> {
+ fn try_from(
+ (filter, partition_schema): ((&str, &str, &str), &Schema),
+ ) -> Result<Self, Self::Error> {
let (field_name, operator_str, value_str) = filter;
let field: &Field = partition_schema.field_with_name(field_name)?;
@@ -435,6 +436,13 @@ mod tests {
let configs = create_hudi_configs(true, false);
let pruner = PartitionPruner::new(&[], &schema, &configs).unwrap();
- assert!(pruner.parse_segments("invalid/path").is_err());
+ let result =
pruner.parse_segments("date=2023-02-01/category=A/count=10/extra");
+ assert!(matches!(result.unwrap_err(), InvalidPartitionPath(_)));
+
+ let result = pruner.parse_segments("date=2023-02-01/category=A/10");
+ assert!(matches!(result.unwrap_err(), InvalidPartitionPath(_)));
+
+ let result =
pruner.parse_segments("date=2023-02-01/category=A/non_exist_field=10");
+ assert!(matches!(result.unwrap_err(), InvalidPartitionPath(_)));
}
}
diff --git a/crates/core/src/table/timeline.rs
b/crates/core/src/table/timeline.rs
index d147abf..d48aa55 100644
--- a/crates/core/src/table/timeline.rs
+++ b/crates/core/src/table/timeline.rs
@@ -28,10 +28,12 @@ use parquet::arrow::parquet_to_arrow_schema;
use serde_json::{Map, Value};
use crate::config::HudiConfigs;
+use crate::error::CoreError;
use crate::file_group::FileGroup;
-use crate::storage::utils::split_filename;
+use crate::storage::error::StorageError;
+use crate::storage::util::split_filename;
use crate::storage::Storage;
-use crate::{CoreError, Result};
+use crate::Result;
/// The [State] of an [Instant] represents the status of the action performed
on the table.
#[allow(dead_code)]
@@ -80,10 +82,11 @@ impl Instant {
commit_file_path.push(self.file_name());
commit_file_path
.to_str()
- .ok_or(CoreError::Internal(format!(
+ .ok_or(StorageError::InvalidPath(format!(
"Failed to get file path for {:?}",
self
)))
+ .map_err(CoreError::Storage)
.map(|s| s.to_string())
}
@@ -143,11 +146,15 @@ impl Timeline {
.storage
.get_file_data(instant.relative_path()?.as_str())
.await?;
+ let err_msg = format!("Failed to get commit metadata for {:?}",
instant);
let json: Value = serde_json::from_slice(&bytes)
- .map_err(|e| CoreError::Internal(format!("Invalid instant {:?}",
e)))?;
+ .map_err(|e| CoreError::Timeline(format!("{}: {:?}", err_msg,
e)))?;
let commit_metadata = json
.as_object()
- .ok_or_else(|| CoreError::Internal("Expected JSON
object".to_string()))?
+ .ok_or(CoreError::Timeline(format!(
+ "{}: not a JSON object",
+ err_msg
+ )))?
.clone();
Ok(commit_metadata)
}
@@ -178,7 +185,7 @@ impl Timeline {
None,
)?)
} else {
- Err(CoreError::Internal(
+ Err(CoreError::Timeline(
"Failed to resolve the latest schema: no file path
found".to_string(),
))
}
@@ -216,6 +223,7 @@ impl Timeline {
#[cfg(test)]
mod tests {
+ use super::*;
use std::collections::HashMap;
use std::fs::canonicalize;
use std::path::Path;
@@ -226,8 +234,6 @@ mod tests {
use hudi_tests::TestTable;
use crate::config::table::HudiTableConfig;
- use crate::config::HudiConfigs;
- use crate::table::timeline::{Instant, State, Timeline};
async fn create_test_timeline(base_url: Url) -> Timeline {
Timeline::new(
@@ -254,7 +260,7 @@ mod tests {
assert!(table_schema.is_err());
assert_eq!(
table_schema.err().unwrap().to_string(),
- "Failed to resolve the latest schema: no file path found"
+ "Timeline error: Failed to resolve the latest schema: no file path
found"
)
}
@@ -281,4 +287,41 @@ mod tests {
]
)
}
+
+ #[tokio::test]
+ async fn get_commit_metadata_returns_error() {
+ let base_url = Url::from_file_path(
+ canonicalize(Path::new(
+ "tests/data/timeline/commits_with_invalid_content",
+ ))
+ .unwrap(),
+ )
+ .unwrap();
+ let timeline = create_test_timeline(base_url).await;
+ let instant = Instant {
+ state: State::Completed,
+ action: "commit".to_owned(),
+ timestamp: "20240402123035233".to_owned(),
+ };
+
+ // Test error when reading empty commit metadata file
+ let result = timeline.get_commit_metadata(&instant).await;
+ assert!(result.is_err());
+ let err = result.unwrap_err();
+ assert!(matches!(err, CoreError::Timeline(_)));
+ assert!(err.to_string().contains("Failed to get commit metadata"));
+
+ let instant = Instant {
+ state: State::Completed,
+ action: "commit".to_owned(),
+ timestamp: "20240402144910683".to_owned(),
+ };
+
+ // Test error when reading a commit metadata file with invalid JSON
+ let result = timeline.get_commit_metadata(&instant).await;
+ assert!(result.is_err());
+ let err = result.unwrap_err();
+ assert!(matches!(err, CoreError::Timeline(_)));
+ assert!(err.to_string().contains("not a JSON object"));
+ }
}
diff --git
a/crates/core/tests/data/timeline/commits_with_invalid_content/.hoodie/20240402123035233.commit
b/crates/core/tests/data/timeline/commits_with_invalid_content/.hoodie/20240402123035233.commit
new file mode 100644
index 0000000..e69de29
diff --git
a/crates/core/tests/data/timeline/commits_with_invalid_content/.hoodie/20240402123035233.commit.requested
b/crates/core/tests/data/timeline/commits_with_invalid_content/.hoodie/20240402123035233.commit.requested
new file mode 100644
index 0000000..e69de29
diff --git
a/crates/core/tests/data/timeline/commits_with_invalid_content/.hoodie/20240402123035233.inflight
b/crates/core/tests/data/timeline/commits_with_invalid_content/.hoodie/20240402123035233.inflight
new file mode 100644
index 0000000..e69de29
diff --git
a/crates/core/tests/data/timeline/commits_with_invalid_content/.hoodie/20240402144910683.commit
b/crates/core/tests/data/timeline/commits_with_invalid_content/.hoodie/20240402144910683.commit
new file mode 100644
index 0000000..cb64946
--- /dev/null
+++
b/crates/core/tests/data/timeline/commits_with_invalid_content/.hoodie/20240402144910683.commit
@@ -0,0 +1,7 @@
+[
+ "should",
+ "not",
+ "be",
+ "an",
+ "array"
+]
diff --git
a/crates/core/tests/data/timeline/commits_with_invalid_content/.hoodie/20240402144910683.commit.requested
b/crates/core/tests/data/timeline/commits_with_invalid_content/.hoodie/20240402144910683.commit.requested
new file mode 100644
index 0000000..e69de29
diff --git
a/crates/core/tests/data/timeline/commits_with_invalid_content/.hoodie/20240402144910683.inflight
b/crates/core/tests/data/timeline/commits_with_invalid_content/.hoodie/20240402144910683.inflight
new file mode 100644
index 0000000..e69de29
diff --git
a/crates/core/tests/data/timeline/commits_with_invalid_content/.hoodie/hoodie.properties
b/crates/core/tests/data/timeline/commits_with_invalid_content/.hoodie/hoodie.properties
new file mode 100644
index 0000000..e69de29
diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs
index 1bf17ea..b5790b8 100644
--- a/crates/datafusion/src/lib.rs
+++ b/crates/datafusion/src/lib.rs
@@ -40,8 +40,8 @@ use datafusion_expr::{CreateExternalTable, Expr, TableType};
use datafusion_physical_expr::create_physical_expr;
use hudi_core::config::read::HudiReadConfig::InputPartitions;
-use hudi_core::config::utils::empty_options;
-use hudi_core::storage::utils::{get_scheme_authority, parse_uri};
+use hudi_core::config::util::empty_options;
+use hudi_core::storage::util::{get_scheme_authority, parse_uri};
use hudi_core::table::Table as HudiTable;
/// Create a `HudiDataSource`.
diff --git a/python/src/internal.rs b/python/src/internal.rs
index 549a503..a6a3552 100644
--- a/python/src/internal.rs
+++ b/python/src/internal.rs
@@ -26,13 +26,14 @@ use arrow::pyarrow::ToPyArrow;
use pyo3::{pyclass, pyfunction, pymethods, PyErr, PyObject, PyResult, Python};
use tokio::runtime::Runtime;
+use hudi::error::CoreError;
use hudi::file_group::reader::FileGroupReader;
use hudi::file_group::FileSlice;
+use hudi::storage::error::StorageError;
use hudi::table::builder::TableBuilder;
use hudi::table::Table;
use hudi::util::convert_vec_to_slice;
use hudi::util::vec_to_slice;
-use hudi::CoreError;
use pyo3::create_exception;
use pyo3::exceptions::PyException;
@@ -45,7 +46,7 @@ fn convert_to_py_err(err: CoreError) -> PyErr {
#[derive(thiserror::Error, Debug)]
pub enum PythonError {
- #[error("Error in Hudi core")]
+ #[error("Error in Hudi core: {0}")]
HudiCore(#[from] CoreError),
}
@@ -110,17 +111,17 @@ pub struct HudiFileSlice {
#[pymethods]
impl HudiFileSlice {
fn base_file_relative_path(&self) -> PyResult<String> {
- Ok(PathBuf::from(&self.partition_path)
+ let path = PathBuf::from(&self.partition_path)
.join(&self.base_file_name)
.to_str()
.map(String::from)
- .ok_or_else(|| {
- CoreError::Internal(format!(
- "Failed to get base file relative path for file slice:
{:?}",
- self
- ))
- })
- .map_err(PythonError::from)?)
+ .ok_or(StorageError::InvalidPath(format!(
+ "Failed to get base file relative path for file slice: {:?}",
+ self
+ )))
+ .map_err(CoreError::from)
+ .map_err(PythonError::from)?;
+ Ok(path)
}
}