This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 52a9245 refactor: improve error handling in storage module (#34)
52a9245 is described below
commit 52a924557ee18effadc02749ec7cdb1001ad6b58
Author: Shiyan Xu <[email protected]>
AuthorDate: Tue Jul 2 22:18:26 2024 -0500
refactor: improve error handling in storage module (#34)
---
crates/core/fixtures/leaf_dir/.gitkeep | 0
crates/core/src/storage/mod.rs | 198 +++++++++++++++++++++------------
crates/core/src/table/fs_view.rs | 15 +--
crates/core/src/table/mod.rs | 11 +-
crates/core/src/table/timeline.rs | 4 +-
python/hudi/_internal.pyi | 2 +-
python/hudi/table.py | 2 +-
python/tests/test_table_read.py | 6 +-
8 files changed, 143 insertions(+), 95 deletions(-)
diff --git a/crates/core/fixtures/leaf_dir/.gitkeep b/crates/core/fixtures/leaf_dir/.gitkeep
new file mode 100644
index 0000000..e69de29
diff --git a/crates/core/src/storage/mod.rs b/crates/core/src/storage/mod.rs
index 0f09c05..43dd0e7 100644
--- a/crates/core/src/storage/mod.rs
+++ b/crates/core/src/storage/mod.rs
@@ -21,7 +21,8 @@ use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
-use anyhow::{anyhow, Result};
+use anyhow::{anyhow, Context, Result};
+use arrow::compute::concat_batches;
use arrow::record_batch::RecordBatch;
use async_recursion::async_recursion;
use bytes::Bytes;
@@ -60,16 +61,21 @@ impl Storage {
}
}
- #[allow(dead_code)]
- pub async fn get_file_info(&self, relative_path: &str) -> FileInfo {
- let obj_url = join_url_segments(&self.base_url, &[relative_path]).unwrap();
- let obj_path = ObjPath::from_url_path(obj_url.path()).unwrap();
- let meta = self.object_store.head(&obj_path).await.unwrap();
- FileInfo {
- uri: obj_url.to_string(),
- name: obj_path.filename().unwrap().to_string(),
+ #[cfg(test)]
+ async fn get_file_info(&self, relative_path: &str) -> Result<FileInfo> {
+ let obj_url = join_url_segments(&self.base_url, &[relative_path])?;
+ let obj_path = ObjPath::from_url_path(obj_url.path())?;
+ let meta = self.object_store.head(&obj_path).await?;
+ let uri = obj_url.to_string();
+ let name = obj_path
+ .filename()
+ .ok_or(anyhow!("Failed to get file name for {}", obj_path))?
+ .to_string();
+ Ok(FileInfo {
+ uri,
+ name,
size: meta.size,
- }
+ })
}
pub async fn get_parquet_file_metadata(&self, relative_path: &str) -> Result<ParquetMetaData> {
@@ -79,79 +85,100 @@ impl Storage {
let meta = obj_store.head(&obj_path).await?;
let reader = ParquetObjectReader::new(obj_store, meta);
let builder = ParquetRecordBatchStreamBuilder::new(reader).await?;
- Ok(builder.metadata().as_ref().to_owned())
+ Ok(builder.metadata().as_ref().clone())
}
- pub async fn get_file_data(&self, relative_path: &str) -> Bytes {
- let obj_url = join_url_segments(&self.base_url, &[relative_path]).unwrap();
- let obj_path = ObjPath::from_url_path(obj_url.path()).unwrap();
- let result = self.object_store.get(&obj_path).await.unwrap();
- result.bytes().await.unwrap()
+ pub async fn get_file_data(&self, relative_path: &str) -> Result<Bytes> {
+ let obj_url = join_url_segments(&self.base_url, &[relative_path])?;
+ let obj_path = ObjPath::from_url_path(obj_url.path())?;
+ let result = self.object_store.get(&obj_path).await?;
+ let bytes = result.bytes().await?;
+ Ok(bytes)
}
- pub async fn get_parquet_file_data(&self, relative_path: &str) -> Vec<RecordBatch> {
- let obj_url = join_url_segments(&self.base_url, &[relative_path]).unwrap();
- let obj_path = ObjPath::from_url_path(obj_url.path()).unwrap();
+ pub async fn get_parquet_file_data(&self, relative_path: &str) -> Result<RecordBatch> {
+ let obj_url = join_url_segments(&self.base_url, &[relative_path])?;
+ let obj_path = ObjPath::from_url_path(obj_url.path())?;
let obj_store = self.object_store.clone();
- let meta = obj_store.head(&obj_path).await.unwrap();
+ let meta = obj_store.head(&obj_path).await?;
+
+ // read parquet
let reader = ParquetObjectReader::new(obj_store, meta);
- let stream = ParquetRecordBatchStreamBuilder::new(reader)
- .await
- .unwrap()
- .build()
- .unwrap();
- stream
- .collect::<Vec<_>>()
- .await
- .into_iter()
- .map(|r| r.unwrap())
- .collect()
+ let builder = ParquetRecordBatchStreamBuilder::new(reader).await?;
+ let schema = builder.schema().clone();
+ let mut stream = builder.build()?;
+ let mut batches = Vec::new();
+
+ while let Some(r) = stream.next().await {
+ let batch = r.context("Failed to read record batch.")?;
+ batches.push(batch)
+ }
+
+ if batches.is_empty() {
+ return Ok(RecordBatch::new_empty(schema.clone()));
+ }
+
+ concat_batches(&schema, &batches)
+ .map_err(|e| anyhow!("Failed to concat record batches: {}", e))
}
- pub async fn list_dirs(&self, subdir: Option<&str>) -> Vec<String> {
- self.list_dirs_as_obj_paths(subdir)
- .await
- .into_iter()
- .map(|p| p.filename().unwrap().to_string())
- .collect()
+ pub async fn list_dirs(&self, subdir: Option<&str>) -> Result<Vec<String>> {
+ let dir_paths = self.list_dirs_as_obj_paths(subdir).await?;
+ let mut dirs = Vec::new();
+ for dir in dir_paths {
+ dirs.push(
+ dir.filename()
+ .ok_or(anyhow!("Failed to get file name for {}", dir))?
+ .to_string(),
+ )
+ }
+ Ok(dirs)
}
- async fn list_dirs_as_obj_paths(&self, subdir: Option<&str>) -> Vec<ObjPath> {
- let prefix_url = join_url_segments(&self.base_url, &[subdir.unwrap_or_default()]).unwrap();
- let prefix_path = ObjPath::from_url_path(prefix_url.path()).unwrap();
- self.object_store
+ async fn list_dirs_as_obj_paths(&self, subdir: Option<&str>) -> Result<Vec<ObjPath>> {
+ let prefix_url = join_url_segments(&self.base_url, &[subdir.unwrap_or_default()])?;
+ let prefix_path = ObjPath::from_url_path(prefix_url.path())?;
+ let list_res = self
+ .object_store
.list_with_delimiter(Some(&prefix_path))
- .await
- .unwrap()
- .common_prefixes
+ .await?;
+ Ok(list_res.common_prefixes)
}
- pub async fn list_files(&self, subdir: Option<&str>) -> Vec<FileInfo> {
- let prefix_url = join_url_segments(&self.base_url, &[subdir.unwrap_or_default()]).unwrap();
- let prefix_path = ObjPath::from_url_path(prefix_url.path()).unwrap();
- self.object_store
+ pub async fn list_files(&self, subdir: Option<&str>) -> Result<Vec<FileInfo>> {
+ let prefix_url = join_url_segments(&self.base_url, &[subdir.unwrap_or_default()])?;
+ let prefix_path = ObjPath::from_url_path(prefix_url.path())?;
+ let list_res = self
+ .object_store
.list_with_delimiter(Some(&prefix_path))
- .await
- .unwrap()
- .objects
- .into_iter()
- .map(|obj_meta| FileInfo {
- uri: join_url_segments(&prefix_url, &[obj_meta.location.filename().unwrap()])
- .unwrap()
- .to_string(),
- name: obj_meta.location.filename().unwrap().to_string(),
+ .await?;
+ let mut file_info = Vec::new();
+ for obj_meta in list_res.objects {
+ let name = obj_meta
+ .location
+ .filename()
+ .ok_or(anyhow!(
+ "Failed to get file name for {:?}",
+ obj_meta.location
+ ))?
+ .to_string();
+ let uri = join_url_segments(&prefix_url, &[&name])?.to_string();
+ file_info.push(FileInfo {
+ uri,
+ name,
size: obj_meta.size,
- })
- .collect()
+ });
+ }
+ Ok(file_info)
}
}
#[async_recursion]
-pub async fn get_leaf_dirs(storage: &Storage, subdir: Option<&str>) -> Vec<String> {
+pub async fn get_leaf_dirs(storage: &Storage, subdir: Option<&str>) -> Result<Vec<String>> {
let mut leaf_dirs = Vec::new();
- let child_dirs = storage.list_dirs(subdir).await;
+ let child_dirs = storage.list_dirs(subdir).await?;
if child_dirs.is_empty() {
- leaf_dirs.push(subdir.unwrap().to_owned());
+ leaf_dirs.push(subdir.unwrap_or_default().to_owned());
} else {
for child_dir in child_dirs {
let mut next_subdir = PathBuf::new();
@@ -159,11 +186,14 @@ pub async fn get_leaf_dirs(storage: &Storage, subdir: Option<&str>) -> Vec<Strin
next_subdir.push(curr);
}
next_subdir.push(child_dir);
- let curr_leaf_dir = get_leaf_dirs(storage, Some(next_subdir.to_str().unwrap())).await;
+ let next_subdir = next_subdir
+ .to_str()
+ .ok_or(anyhow!("Failed to convert path: {:?}", next_subdir))?;
+ let curr_leaf_dir = get_leaf_dirs(storage, Some(next_subdir)).await?;
leaf_dirs.extend(curr_leaf_dir);
}
}
- leaf_dirs
+ Ok(leaf_dirs)
}
#[cfg(test)]
@@ -187,7 +217,8 @@ mod tests {
)
.unwrap();
let storage = Storage::new(Arc::new(base_url), Arc::new(HashMap::new())).unwrap();
- let first_level_dirs: HashSet<String> = storage.list_dirs(None).await.into_iter().collect();
+ let first_level_dirs: HashSet<String> =
+ storage.list_dirs(None).await.unwrap().into_iter().collect();
assert_eq!(
first_level_dirs,
vec![".hoodie", "part1", "part2", "part3"]
@@ -195,9 +226,9 @@ mod tests {
.map(String::from)
.collect()
);
- let second_level_dirs: Vec<String> = storage.list_dirs(Some("part2")).await;
+ let second_level_dirs: Vec<String> = storage.list_dirs(Some("part2")).await.unwrap();
assert_eq!(second_level_dirs, vec!["part22"]);
- let no_dirs = storage.list_dirs(Some("part1")).await;
+ let no_dirs = storage.list_dirs(Some("part1")).await.unwrap();
assert!(no_dirs.is_empty());
}
@@ -211,6 +242,7 @@ mod tests {
let first_level_dirs: HashSet<ObjPath> = storage
.list_dirs_as_obj_paths(None)
.await
+ .unwrap()
.into_iter()
.collect();
let expected_paths: HashSet<ObjPath> = vec![".hoodie", "part1",
"part2", "part3"]
@@ -230,7 +262,12 @@ mod tests {
)
.unwrap();
let storage = Storage::new(Arc::new(base_url), Arc::new(HashMap::new())).unwrap();
- let file_info_1: Vec<FileInfo> = storage.list_files(None).await.into_iter().collect();
+ let file_info_1: Vec<FileInfo> = storage
+ .list_files(None)
+ .await
+ .unwrap()
+ .into_iter()
+ .collect();
assert_eq!(
file_info_1,
vec![FileInfo {
@@ -242,6 +279,7 @@ mod tests {
let file_info_2: Vec<FileInfo> = storage
.list_files(Some("part1"))
.await
+ .unwrap()
.into_iter()
.collect();
assert_eq!(
@@ -259,6 +297,7 @@ mod tests {
let file_info_3: Vec<FileInfo> = storage
.list_files(Some("part2/part22"))
.await
+ .unwrap()
.into_iter()
.collect();
assert_eq!(
@@ -282,19 +321,33 @@ mod tests {
)
.unwrap();
let storage = Storage::new(Arc::new(base_url), Arc::new(HashMap::new())).unwrap();
- let leaf_dirs = get_leaf_dirs(&storage, None).await;
+ let leaf_dirs = get_leaf_dirs(&storage, None).await.unwrap();
assert_eq!(
leaf_dirs,
vec![".hoodie", "part1", "part2/part22", "part3/part32/part33"]
);
}
+ #[tokio::test]
+ async fn use_storage_to_get_leaf_dirs_for_leaf_dir() {
+ let base_url =
+ Url::from_directory_path(canonicalize(Path::new("fixtures/leaf_dir")).unwrap())
+ .unwrap();
+ let storage = Storage::new(Arc::new(base_url), Arc::new(HashMap::new())).unwrap();
+ let leaf_dirs = get_leaf_dirs(&storage, None).await.unwrap();
+ assert_eq!(
+ leaf_dirs,
+ vec![""],
+ "Listing a leaf dir should get the relative path to itself."
+ );
+ }
+
#[tokio::test]
async fn storage_get_file_info() {
let base_url =
Url::from_directory_path(canonicalize(Path::new("fixtures")).unwrap()).unwrap();
let storage = Storage::new(Arc::new(base_url), Arc::new(HashMap::new())).unwrap();
- let file_info = storage.get_file_info("a.parquet").await;
+ let file_info = storage.get_file_info("a.parquet").await.unwrap();
assert_eq!(file_info.name, "a.parquet");
assert_eq!(
file_info.uri,
@@ -308,8 +361,7 @@ mod tests {
let base_url =
Url::from_directory_path(canonicalize(Path::new("fixtures")).unwrap()).unwrap();
let storage = Storage::new(Arc::new(base_url), Arc::new(HashMap::new())).unwrap();
- let file_data = storage.get_parquet_file_data("a.parquet").await;
- assert_eq!(file_data.len(), 1);
- assert_eq!(file_data.first().unwrap().num_rows(), 5);
+ let file_data = storage.get_parquet_file_data("a.parquet").await.unwrap();
+ assert_eq!(file_data.num_rows(), 5);
}
}
diff --git a/crates/core/src/table/fs_view.rs b/crates/core/src/table/fs_view.rs
index b7cd77c..ad4f5f5 100644
--- a/crates/core/src/table/fs_view.rs
+++ b/crates/core/src/table/fs_view.rs
@@ -58,13 +58,13 @@ impl FileSystemView {
async fn load_partition_paths(storage: &Storage) -> Result<Vec<String>> {
let top_level_dirs: Vec<String> = storage
.list_dirs(None)
- .await
+ .await?
.into_iter()
.filter(|dir| dir != ".hoodie")
.collect();
let mut partition_paths = Vec::new();
for dir in top_level_dirs {
- partition_paths.extend(get_leaf_dirs(storage, Some(&dir)).await);
+ partition_paths.extend(get_leaf_dirs(storage, Some(&dir)).await?);
}
if partition_paths.is_empty() {
partition_paths.push("".to_string())
@@ -94,7 +94,7 @@ impl FileSystemView {
) -> Result<Vec<FileGroup>> {
let file_info: Vec<FileInfo> = storage
.list_files(Some(partition_path))
- .await
+ .await?
.into_iter()
.filter(|f| f.name.ends_with(".parquet"))
.collect();
@@ -152,13 +152,10 @@ impl FileSystemView {
pub async fn read_file_slice_by_path_unchecked(
&self,
relative_path: &str,
- ) -> Result<Vec<RecordBatch>> {
- Ok(self.storage.get_parquet_file_data(relative_path).await)
+ ) -> Result<RecordBatch> {
+ self.storage.get_parquet_file_data(relative_path).await
}
- pub async fn read_file_slice_unchecked(
- &self,
- file_slice: &FileSlice,
- ) -> Result<Vec<RecordBatch>> {
+ pub async fn read_file_slice_unchecked(&self, file_slice: &FileSlice) -> Result<RecordBatch> {
self.read_file_slice_by_path_unchecked(&file_slice.base_file_relative_path())
.await
}
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 52daf12..d1be74f 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -85,7 +85,7 @@ impl Table {
storage_options: Arc<HashMap<String, String>>,
) -> Result<HashMap<String, String>> {
let storage = Storage::new(base_url, storage_options)?;
- let data = storage.get_file_data(".hoodie/hoodie.properties").await;
+ let data = storage.get_file_data(".hoodie/hoodie.properties").await?;
let cursor = std::io::Cursor::new(data);
let lines = BufReader::new(cursor).lines();
let mut properties: HashMap<String, String> = HashMap::new();
@@ -146,7 +146,7 @@ impl Table {
let mut batches = Vec::new();
for f in file_slices {
match self.file_system_view.read_file_slice_unchecked(&f).await {
- Ok(batch) => batches.extend(batch),
+ Ok(batch) => batches.push(batch),
Err(e) => return Err(anyhow!("Failed to read file slice {:?} -
{}", f, e)),
}
}
@@ -162,7 +162,7 @@ impl Table {
Ok(file_paths)
}
- pub async fn read_file_slice_by_path(&self, relative_path: &str) -> Result<Vec<RecordBatch>> {
+ pub async fn read_file_slice_by_path(&self, relative_path: &str) -> Result<RecordBatch> {
self.file_system_view
.read_file_slice_by_path_unchecked(relative_path)
.await
@@ -329,9 +329,8 @@ mod tests {
)
.await
.unwrap();
- assert_eq!(batches.len(), 1);
- assert_eq!(batches.first().unwrap().num_rows(), 4);
- assert_eq!(batches.first().unwrap().num_columns(), 21);
+ assert_eq!(batches.num_rows(), 4);
+ assert_eq!(batches.num_columns(), 21);
}
#[tokio::test]
diff --git a/crates/core/src/table/timeline.rs b/crates/core/src/table/timeline.rs
index 70fc6ee..3a68387 100644
--- a/crates/core/src/table/timeline.rs
+++ b/crates/core/src/table/timeline.rs
@@ -98,7 +98,7 @@ impl Timeline {
async fn load_completed_commit_instants(storage: &Storage) -> Result<Vec<Instant>> {
let mut completed_commits = Vec::new();
- for file_info in storage.list_files(Some(".hoodie")).await {
+ for file_info in storage.list_files(Some(".hoodie")).await? {
let (file_stem, file_ext) = split_filename(file_info.name.as_str())?;
if file_ext == "commit" {
completed_commits.push(Instant {
@@ -128,7 +128,7 @@ impl Timeline {
"Failed to get commit file path for instant: {:?}",
instant
))?;
- let bytes = self.storage.get_file_data(relative_path).await;
+ let bytes = self.storage.get_file_data(relative_path).await?;
let json: Value = serde_json::from_slice(&bytes)?;
let commit_metadata = json
.as_object()
diff --git a/python/hudi/_internal.pyi b/python/hudi/_internal.pyi
index 67dd0cc..b91b492 100644
--- a/python/hudi/_internal.pyi
+++ b/python/hudi/_internal.pyi
@@ -49,7 +49,7 @@ class BindingHudiTable:
def get_file_slices(self) -> List[HudiFileSlice]: ...
- def read_file_slice(self, base_file_relative_path) -> List["pyarrow.RecordBatch"]: ...
+ def read_file_slice(self, base_file_relative_path) -> pyarrow.RecordBatch: ...
def read_snapshot(self) -> List["pyarrow.RecordBatch"]: ...
diff --git a/python/hudi/table.py b/python/hudi/table.py
index 943f423..def024d 100644
--- a/python/hudi/table.py
+++ b/python/hudi/table.py
@@ -47,7 +47,7 @@ class HudiTable:
def get_file_slices(self) -> List[HudiFileSlice]:
return self._table.get_file_slices()
- def read_file_slice(self, base_file_relative_path: str) -> List["pyarrow.RecordBatch"]:
+ def read_file_slice(self, base_file_relative_path: str) -> "pyarrow.RecordBatch":
return self._table.read_file_slice(base_file_relative_path)
def read_snapshot(self) -> List["pyarrow.RecordBatch"]:
diff --git a/python/tests/test_table_read.py b/python/tests/test_table_read.py
index aec6d70..883958c 100644
--- a/python/tests/test_table_read.py
+++ b/python/tests/test_table_read.py
@@ -36,15 +36,15 @@ def test_sample_table(get_sample_table):
assert len(file_slices) == 5
assert set(f.commit_time for f in file_slices) == {'20240402123035233', '20240402144910683'}
assert all(f.num_records == 1 for f in file_slices)
- file_slice_paths = [f.base_file_relative_path() for f in file_slices]
+ file_slice_paths = [f.base_file_relative_path for f in file_slices]
assert set(file_slice_paths) == {'chennai/68d3c349-f621-4cd8-9e8b-c6dd8eb20d08-0_4-12-0_20240402123035233.parquet',
'san_francisco/d9082ffd-2eb1-4394-aefc-deb4a61ecc57-0_1-9-0_20240402123035233.parquet',
'san_francisco/780b8586-3ad0-48ef-a6a1-d2217845ce4a-0_0-8-0_20240402123035233.parquet',
'san_francisco/5a226868-2934-4f84-a16f-55124630c68d-0_0-7-24_20240402144910683.parquet',
'sao_paulo/ee915c68-d7f8-44f6-9759-e691add290d8-0_3-11-0_20240402123035233.parquet'}
- batches = table.read_file_slice(file_slice_paths[0])
- t = pa.Table.from_batches(batches)
+ batch = table.read_file_slice(file_slice_paths[0])
+ t = pa.Table.from_batches([batch])
assert t.num_rows == 1
assert t.num_columns == 11