This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
new d5f2231 refactor: improve thread safety and error handling (#32)
d5f2231 is described below
commit d5f2231d838c854f6992902855df9732cfd878cf
Author: Shiyan Xu <[email protected]>
AuthorDate: Mon Jul 1 20:03:53 2024 -0500
refactor: improve thread safety and error handling (#32)
---
Cargo.toml | 1 +
crates/core/Cargo.toml | 1 +
crates/core/src/file_group/mod.rs | 24 ++-
crates/core/src/lib.rs | 1 -
crates/core/src/storage/mod.rs | 45 ++---
crates/core/src/table/fs_view.rs | 182 ++++++++++-----------
crates/core/src/table/mod.rs | 162 +++++++++---------
.../src/{timeline/mod.rs => table/timeline.rs} | 85 +++++++---
crates/datafusion/Cargo.toml | 1 +
crates/datafusion/src/lib.rs | 47 +++---
python/Cargo.toml | 4 +-
python/hudi/_internal.pyi | 2 +-
python/src/lib.rs | 64 ++++----
python/tests/test_table_read.py | 2 +-
14 files changed, 337 insertions(+), 284 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index e3c5b99..1b66057 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -68,6 +68,7 @@ uuid = { version = "1" }
# runtime / async
async-trait = { version = "0.1" }
async-recursion = { version = "1.1.1" }
+dashmap = { version = "6.0.1" }
futures = { version = "0.3" }
tokio = { version = "1", features = ["rt-multi-thread"]}
num_cpus = { version = "1" }
diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml
index 6363804..5d04337 100644
--- a/crates/core/Cargo.toml
+++ b/crates/core/Cargo.toml
@@ -67,5 +67,6 @@ url = { workspace = true }
# runtime / async
async-recursion = { workspace = true }
async-trait = { workspace = true }
+dashmap = { workspace = true }
tokio = { workspace = true }
futures = { workspace = true }
diff --git a/crates/core/src/file_group/mod.rs
b/crates/core/src/file_group/mod.rs
index ece19a4..6b9b22c 100644
--- a/crates/core/src/file_group/mod.rs
+++ b/crates/core/src/file_group/mod.rs
@@ -22,9 +22,11 @@ use std::fmt;
use std::fmt::Formatter;
use std::path::PathBuf;
+use anyhow::{anyhow, Result};
+
use crate::storage::file_info::FileInfo;
use crate::storage::file_stats::FileStats;
-use anyhow::{anyhow, Result};
+use crate::storage::Storage;
#[derive(Clone, Debug)]
pub struct BaseFile {
@@ -63,10 +65,6 @@ impl BaseFile {
stats: None,
})
}
-
- pub fn populate_stats(&mut self, stats: FileStats) {
- self.stats = Some(stats)
- }
}
#[derive(Clone, Debug)]
@@ -81,9 +79,9 @@ impl FileSlice {
}
pub fn base_file_relative_path(&self) -> String {
- let partition_path = self.partition_path.clone().unwrap_or_default();
+ let ptn = self.partition_path.as_deref().unwrap_or_default();
let file_name = &self.base_file.info.name;
- PathBuf::from(partition_path)
+ PathBuf::from(ptn)
.join(file_name)
.to_str()
.unwrap()
@@ -97,6 +95,18 @@ impl FileSlice {
pub fn set_base_file(&mut self, base_file: BaseFile) {
self.base_file = base_file
}
+
+ pub async fn load_stats(&mut self, storage: &Storage) -> Result<()> {
+ if self.base_file.stats.is_none() {
+ let parquet_meta = storage
+ .get_parquet_file_metadata(&self.base_file_relative_path())
+ .await;
+ let num_records = parquet_meta.file_metadata().num_rows();
+ let stats = FileStats { num_records };
+ self.base_file.stats = Some(stats);
+ }
+ Ok(())
+ }
}
#[derive(Clone, Debug)]
diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs
index 1586ff8..533d0fb 100644
--- a/crates/core/src/lib.rs
+++ b/crates/core/src/lib.rs
@@ -23,7 +23,6 @@ pub mod file_group;
pub mod table;
pub type HudiTable = Table;
mod storage;
-mod timeline;
pub fn crate_version() -> &'static str {
env!("CARGO_PKG_VERSION")
diff --git a/crates/core/src/storage/mod.rs b/crates/core/src/storage/mod.rs
index b35f30d..76e085f 100644
--- a/crates/core/src/storage/mod.rs
+++ b/crates/core/src/storage/mod.rs
@@ -21,6 +21,7 @@ use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
+use anyhow::{anyhow, Result};
use arrow::record_batch::RecordBatch;
use async_recursion::async_recursion;
use bytes::Bytes;
@@ -40,20 +41,23 @@ pub(crate) mod file_stats;
pub(crate) mod utils;
#[allow(dead_code)]
+#[derive(Clone, Debug)]
pub struct Storage {
- base_url: Url,
+ base_url: Arc<Url>,
+ options: Arc<HashMap<String, String>>,
object_store: Arc<dyn ObjectStore>,
- options: HashMap<String, String>,
}
impl Storage {
- pub fn new(base_url: Url, options: HashMap<String, String>) ->
Box<Storage> {
- let object_store = parse_url_opts(&base_url, &options).unwrap().0;
- Box::from(Storage {
- base_url,
- object_store: Arc::new(object_store),
- options,
- })
+ pub fn new(base_url: Arc<Url>, options: Arc<HashMap<String, String>>) ->
Result<Arc<Storage>> {
+ match parse_url_opts(&base_url, &*options) {
+ Ok(object_store) => Ok(Arc::new(Storage {
+ base_url,
+ options,
+ object_store: Arc::new(object_store.0),
+ })),
+ Err(e) => Err(anyhow!("Failed to create storage: {}", e)),
+ }
}
#[allow(dead_code)]
@@ -167,6 +171,7 @@ mod tests {
use std::collections::{HashMap, HashSet};
use std::fs::canonicalize;
use std::path::Path;
+ use std::sync::Arc;
use object_store::path::Path as ObjPath;
use url::Url;
@@ -181,7 +186,7 @@ mod tests {
canonicalize(Path::new("fixtures/timeline/commits_stub")).unwrap(),
)
.unwrap();
- let storage = Storage::new(base_url, HashMap::new());
+ let storage = Storage::new(Arc::new(base_url),
Arc::new(HashMap::new())).unwrap();
let first_level_dirs: HashSet<String> =
storage.list_dirs(None).await.into_iter().collect();
assert_eq!(
first_level_dirs,
@@ -202,7 +207,7 @@ mod tests {
canonicalize(Path::new("fixtures/timeline/commits_stub")).unwrap(),
)
.unwrap();
- let storage = Storage::new(base_url, HashMap::new());
+ let storage = Storage::new(Arc::new(base_url),
Arc::new(HashMap::new())).unwrap();
let first_level_dirs: HashSet<ObjPath> = storage
.list_dirs_as_obj_paths(None)
.await
@@ -224,12 +229,12 @@ mod tests {
canonicalize(Path::new("fixtures/timeline/commits_stub")).unwrap(),
)
.unwrap();
- let storage = Storage::new(base_url.clone(), HashMap::new());
+ let storage = Storage::new(Arc::new(base_url),
Arc::new(HashMap::new())).unwrap();
let file_info_1: Vec<FileInfo> =
storage.list_files(None).await.into_iter().collect();
assert_eq!(
file_info_1,
vec![FileInfo {
- uri: base_url.clone().join("a.parquet").unwrap().to_string(),
+ uri: storage.base_url.join("a.parquet").unwrap().to_string(),
name: "a.parquet".to_string(),
size: 0,
}]
@@ -242,8 +247,8 @@ mod tests {
assert_eq!(
file_info_2,
vec![FileInfo {
- uri: base_url
- .clone()
+ uri: storage
+ .base_url
.join("part1/b.parquet")
.unwrap()
.to_string(),
@@ -259,8 +264,8 @@ mod tests {
assert_eq!(
file_info_3,
vec![FileInfo {
- uri: base_url
- .clone()
+ uri: storage
+ .base_url
.join("part2/part22/c.parquet")
.unwrap()
.to_string(),
@@ -276,7 +281,7 @@ mod tests {
canonicalize(Path::new("fixtures/timeline/commits_stub")).unwrap(),
)
.unwrap();
- let storage = Storage::new(base_url, HashMap::new());
+ let storage = Storage::new(Arc::new(base_url),
Arc::new(HashMap::new())).unwrap();
let leaf_dirs = get_leaf_dirs(&storage, None).await;
assert_eq!(
leaf_dirs,
@@ -288,7 +293,7 @@ mod tests {
async fn storage_get_file_info() {
let base_url =
Url::from_directory_path(canonicalize(Path::new("fixtures")).unwrap()).unwrap();
- let storage = Storage::new(base_url, HashMap::new());
+ let storage = Storage::new(Arc::new(base_url),
Arc::new(HashMap::new())).unwrap();
let file_info = storage.get_file_info("a.parquet").await;
assert_eq!(file_info.name, "a.parquet");
assert_eq!(
@@ -302,7 +307,7 @@ mod tests {
async fn storage_get_parquet_file_data() {
let base_url =
Url::from_directory_path(canonicalize(Path::new("fixtures")).unwrap()).unwrap();
- let storage = Storage::new(base_url, HashMap::new());
+ let storage = Storage::new(Arc::new(base_url),
Arc::new(HashMap::new())).unwrap();
let file_data = storage.get_parquet_file_data("a.parquet").await;
assert_eq!(file_data.len(), 1);
assert_eq!(file_data.first().unwrap().num_rows(), 5);
diff --git a/crates/core/src/table/fs_view.rs b/crates/core/src/table/fs_view.rs
index f1976d8..8f278dd 100644
--- a/crates/core/src/table/fs_view.rs
+++ b/crates/core/src/table/fs_view.rs
@@ -18,33 +18,44 @@
*/
use std::collections::HashMap;
-use std::path::PathBuf;
+use std::sync::Arc;
use anyhow::{anyhow, Result};
use arrow::record_batch::RecordBatch;
+use dashmap::DashMap;
use url::Url;
use crate::file_group::{BaseFile, FileGroup, FileSlice};
use crate::storage::file_info::FileInfo;
-use crate::storage::file_stats::FileStats;
use crate::storage::{get_leaf_dirs, Storage};
#[derive(Clone, Debug)]
+#[allow(dead_code)]
pub struct FileSystemView {
- pub base_url: Url,
- partition_to_file_groups: HashMap<String, Vec<FileGroup>>,
+ props: Arc<HashMap<String, String>>,
+ storage: Arc<Storage>,
+ partition_to_file_groups: Arc<DashMap<String, Vec<FileGroup>>>,
}
impl FileSystemView {
- pub fn new(base_url: Url) -> Self {
- FileSystemView {
- base_url,
- partition_to_file_groups: HashMap::new(),
- }
+ pub async fn new(
+ base_url: Arc<Url>,
+ storage_options: Arc<HashMap<String, String>>,
+ props: Arc<HashMap<String, String>>,
+ ) -> Result<Self> {
+ let storage = Storage::new(base_url, storage_options)?;
+ let partition_paths = Self::get_partition_paths(&storage).await?;
+ let partition_to_file_groups =
+ Self::load_file_groups_for_partitions(&storage,
partition_paths).await?;
+ let partition_to_file_groups =
Arc::new(DashMap::from_iter(partition_to_file_groups));
+ Ok(FileSystemView {
+ props,
+ storage,
+ partition_to_file_groups,
+ })
}
- async fn get_partition_paths(&self) -> Result<Vec<String>> {
- let storage = Storage::new(self.base_url.clone(), HashMap::new());
+ async fn get_partition_paths(storage: &Storage) -> Result<Vec<String>> {
let top_level_dirs: Vec<String> = storage
.list_dirs(None)
.await
@@ -53,19 +64,41 @@ impl FileSystemView {
.collect();
let mut partition_paths = Vec::new();
for dir in top_level_dirs {
- partition_paths.extend(get_leaf_dirs(&storage, Some(&dir)).await);
+ partition_paths.extend(get_leaf_dirs(storage, Some(&dir)).await);
+ }
+ if partition_paths.is_empty() {
+ partition_paths.push("".to_string())
}
Ok(partition_paths)
}
- async fn get_file_groups(&self, partition_path: &str) ->
Result<Vec<FileGroup>> {
- let storage = Storage::new(self.base_url.clone(), HashMap::new());
+ async fn load_file_groups_for_partitions(
+ storage: &Storage,
+ partition_paths: Vec<String>,
+ ) -> Result<HashMap<String, Vec<FileGroup>>> {
+ let mut partition_to_file_groups = HashMap::new();
+ for p in partition_paths {
+ match Self::load_file_groups_for_partition(storage,
p.as_str()).await {
+ Ok(file_groups) => {
+ partition_to_file_groups.insert(p, file_groups);
+ }
+ Err(e) => return Err(anyhow!("Failed to load partitions: {}",
e)),
+ }
+ }
+ Ok(partition_to_file_groups)
+ }
+
+ async fn load_file_groups_for_partition(
+ storage: &Storage,
+ partition_path: &str,
+ ) -> Result<Vec<FileGroup>> {
let file_info: Vec<FileInfo> = storage
.list_files(Some(partition_path))
.await
.into_iter()
.filter(|f| f.name.ends_with(".parquet"))
.collect();
+
let mut fg_id_to_base_files: HashMap<String, Vec<BaseFile>> =
HashMap::new();
for f in file_info {
let base_file = BaseFile::from_file_info(f)?;
@@ -87,109 +120,68 @@ impl FileSystemView {
Ok(file_groups)
}
- pub async fn load_file_groups(&mut self) {
- let fs_view = self.clone();
- let result = get_partitions_and_file_groups(&fs_view).await.unwrap();
- for (k, v) in result {
- self.partition_to_file_groups.insert(k, v);
- }
- }
-
- pub fn get_latest_file_slices(&self) -> Vec<&FileSlice> {
+ pub fn get_latest_file_slices(&self) -> Result<Vec<FileSlice>> {
let mut file_slices = Vec::new();
- for fgs in self.partition_to_file_groups.values() {
- for fg in fgs {
- if let Some(file_slice) = fg.get_latest_file_slice() {
- file_slices.push(file_slice)
+ for fgs in self.partition_to_file_groups.iter() {
+ let fgs_ref = fgs.value();
+ for fg in fgs_ref {
+ if let Some(fsl) = fg.get_latest_file_slice() {
+ file_slices.push(fsl.clone())
}
}
}
- file_slices
+ Ok(file_slices)
}
- pub async fn get_latest_file_slices_with_stats(&mut self) -> Vec<&mut
FileSlice> {
- let mut file_slices = Vec::new();
- let file_groups = &mut self.partition_to_file_groups.values_mut();
- for fgs in file_groups {
- for fg in fgs {
+ pub async fn load_latest_file_slices_stats(&self) -> Result<()> {
+ for mut fgs in self.partition_to_file_groups.iter_mut() {
+ let fgs_ref = fgs.value_mut();
+ for fg in fgs_ref {
if let Some(file_slice) = fg.get_latest_file_slice_mut() {
- let _ = load_file_slice_stats(&self.base_url,
file_slice).await;
- file_slices.push(file_slice)
+ file_slice
+ .load_stats(&self.storage)
+ .await
+ .expect("Successful loading file stats.");
}
}
}
- file_slices
+ Ok(())
}
- pub async fn read_file_slice(&self, relative_path: &str) ->
Vec<RecordBatch> {
- let storage = Storage::new(self.base_url.clone(), HashMap::new());
- storage.get_parquet_file_data(relative_path).await
+ pub async fn read_file_slice_by_path(&self, relative_path: &str) ->
Result<Vec<RecordBatch>> {
+ Ok(self.storage.get_parquet_file_data(relative_path).await)
}
-}
-
-async fn load_file_slice_stats(base_url: &Url, file_slice: &mut FileSlice) ->
Result<()> {
- let base_file = &mut file_slice.base_file;
- if base_file.stats.is_none() {
- let storage = Storage::new(base_url.clone(), HashMap::new());
- let ptn = file_slice.partition_path.clone();
- let mut relative_path = PathBuf::from(ptn.unwrap_or("".to_string()));
- let base_file_name = &base_file.info.name;
- relative_path.push(base_file_name);
- let parquet_meta = storage
- .get_parquet_file_metadata(relative_path.to_str().unwrap())
- .await;
- let num_records = parquet_meta.file_metadata().num_rows();
- base_file.populate_stats(FileStats { num_records });
- }
- Ok(())
-}
-
-async fn get_partitions_and_file_groups(
- fs_view: &FileSystemView,
-) -> Result<HashMap<String, Vec<FileGroup>>> {
- match fs_view.get_partition_paths().await {
- Ok(mut partition_paths) => {
- if partition_paths.is_empty() {
- partition_paths.push("".to_string());
- }
- let mut partition_to_file_groups = HashMap::new();
- for p in partition_paths {
- match fs_view.get_file_groups(p.as_str()).await {
- Ok(file_groups) => {
- partition_to_file_groups.insert(p, file_groups);
- }
- Err(e) => return Err(anyhow!("Failed to load partitions:
{}", e)),
- }
- }
- Ok(partition_to_file_groups)
- }
- Err(e) => Err(anyhow!("Failed to load partitions: {}", e)),
+ pub async fn read_file_slice(&self, file_slice: &FileSlice) ->
Result<Vec<RecordBatch>> {
+ self.read_file_slice_by_path(&file_slice.base_file_relative_path())
+ .await
}
}
#[cfg(test)]
mod tests {
- use std::collections::HashSet;
+ use std::collections::{HashMap, HashSet};
+ use std::sync::Arc;
use hudi_tests::TestTable;
+ use crate::storage::Storage;
use crate::table::fs_view::FileSystemView;
#[tokio::test]
async fn get_partition_paths_for_nonpartitioned_table() {
let base_url = TestTable::V6Nonpartitioned.url();
- let fs_view = FileSystemView::new(base_url);
- let partition_paths = fs_view.get_partition_paths().await.unwrap();
+ let storage = Storage::new(Arc::new(base_url),
Arc::new(HashMap::new())).unwrap();
+ let partition_paths =
FileSystemView::get_partition_paths(&storage).await.unwrap();
let partition_path_set: HashSet<&str> =
HashSet::from_iter(partition_paths.iter().map(|p| p.as_str()));
- assert_eq!(partition_path_set, HashSet::new(),)
+ assert_eq!(partition_path_set, HashSet::from([""]))
}
#[tokio::test]
async fn get_partition_paths_for_complexkeygen_table() {
let base_url = TestTable::V6ComplexkeygenHivestyle.url();
- let fs_view = FileSystemView::new(base_url);
- let partition_paths = fs_view.get_partition_paths().await.unwrap();
+ let storage = Storage::new(Arc::new(base_url),
Arc::new(HashMap::new())).unwrap();
+ let partition_paths =
FileSystemView::get_partition_paths(&storage).await.unwrap();
let partition_path_set: HashSet<&str> =
HashSet::from_iter(partition_paths.iter().map(|p| p.as_str()));
assert_eq!(
@@ -203,17 +195,21 @@ mod tests {
}
#[tokio::test]
- async fn get_latest_file_slices() {
+ async fn fs_view_get_latest_file_slices() {
let base_url = TestTable::V6Nonpartitioned.url();
- let mut fs_view = FileSystemView::new(base_url);
- fs_view.load_file_groups().await;
- let file_slices = fs_view.get_latest_file_slices();
+ let fs_view = FileSystemView::new(
+ Arc::new(base_url),
+ Arc::new(HashMap::new()),
+ Arc::new(HashMap::new()),
+ )
+ .await
+ .unwrap();
+ let file_slices = fs_view.get_latest_file_slices().unwrap();
assert_eq!(file_slices.len(), 1);
- let mut fg_ids = Vec::new();
- for f in file_slices {
- let fp = f.file_group_id();
- fg_ids.push(fp);
- }
+ let fg_ids = file_slices
+ .iter()
+ .map(|fsl| fsl.file_group_id())
+ .collect::<Vec<_>>();
assert_eq!(fg_ids, vec!["a079bdb3-731c-4894-b855-abfcd6921007-0"])
}
}
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 86f1008..825b2d4 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -21,10 +21,11 @@ use std::collections::HashMap;
use std::io::{BufRead, BufReader};
use std::path::PathBuf;
use std::str::FromStr;
+use std::sync::Arc;
-use anyhow::Result;
+use anyhow::{anyhow, Context, Result};
use arrow::record_batch::RecordBatch;
-use arrow_schema::SchemaRef;
+use arrow_schema::Schema;
use url::Url;
use crate::file_group::FileSlice;
@@ -33,53 +34,60 @@ use crate::table::config::BaseFileFormat;
use crate::table::config::{ConfigKey, TableType};
use crate::table::fs_view::FileSystemView;
use crate::table::metadata::ProvidesTableMetadata;
-use crate::timeline::Timeline;
+use crate::table::timeline::Timeline;
mod config;
mod fs_view;
mod metadata;
+mod timeline;
#[derive(Debug, Clone)]
-#[allow(dead_code)]
pub struct Table {
- pub base_url: Url,
- pub props: HashMap<String, String>,
- pub file_system_view: Option<FileSystemView>,
- pub storage_options: HashMap<String, String>,
+ pub base_url: Arc<Url>,
+ pub storage_options: Arc<HashMap<String, String>>,
+ pub props: Arc<HashMap<String, String>>,
+ pub timeline: Timeline,
+ pub file_system_view: FileSystemView,
}
impl Table {
- pub async fn new(base_uri: &str, storage_options: HashMap<String, String>)
-> Self {
- let base_url =
Url::from_file_path(PathBuf::from(base_uri).as_path()).unwrap();
- match Self::load_properties(
- base_url.clone(),
- ".hoodie/hoodie.properties".to_string(),
- storage_options.clone(),
- )
- .await
- {
- Ok(props) => Self {
- base_url,
- props,
- file_system_view: None,
- storage_options,
- },
- Err(e) => {
- panic!("Failed to load table properties: {}", e)
- }
- }
+ pub async fn new(base_uri: &str, storage_options: HashMap<String, String>)
-> Result<Self> {
+ let base_url = Url::from_file_path(PathBuf::from(base_uri))
+ .map_err(|_| anyhow!("Failed to create table URL: {}", base_uri))?;
+ let base_url = Arc::new(base_url);
+ let storage_options = Arc::new(storage_options);
+
+ let props = Self::load_properties(base_url.clone(),
storage_options.clone())
+ .await
+ .context("Failed to create a table")?;
+
+ let props = Arc::new(props);
+ let timeline = Timeline::new(base_url.clone(),
storage_options.clone(), props.clone())
+ .await
+ .context("Failed to load timeline")?;
+
+ let file_system_view =
+ FileSystemView::new(base_url.clone(), storage_options.clone(),
props.clone())
+ .await
+ .context("Failed to load file system view")?;
+
+ Ok(Table {
+ base_url,
+ storage_options,
+ props,
+ timeline,
+ file_system_view,
+ })
}
async fn load_properties(
- base_url: Url,
- props_path: String,
- storage_options: HashMap<String, String>,
+ base_url: Arc<Url>,
+ storage_options: Arc<HashMap<String, String>>,
) -> Result<HashMap<String, String>> {
- let storage = Storage::new(base_url, storage_options);
- let data = storage.get_file_data(props_path.as_str()).await;
+ let storage = Storage::new(base_url, storage_options)?;
+ let data = storage.get_file_data(".hoodie/hoodie.properties").await;
let cursor = std::io::Cursor::new(data);
- let reader = BufReader::new(cursor);
- let lines = reader.lines();
+ let lines = BufReader::new(cursor).lines();
let mut properties: HashMap<String, String> = HashMap::new();
for line in lines {
let line = line?;
@@ -102,59 +110,35 @@ impl Table {
}
}
- #[cfg(test)]
- async fn get_timeline(&self) -> Result<Timeline> {
- Timeline::new(self.base_url.clone()).await
+ pub async fn get_schema(&self) -> Result<Schema> {
+ self.timeline.get_latest_schema().await
}
- pub async fn get_latest_schema(&self) -> SchemaRef {
- let timeline_result = Timeline::new(self.base_url.clone()).await;
- match timeline_result {
- Ok(timeline) => {
- let schema_result = timeline.get_latest_schema().await;
- match schema_result {
- Ok(schema) => SchemaRef::from(schema),
- Err(e) => panic!("Failed to resolve table schema: {}", e),
- }
- }
- Err(e) => panic!("Failed to resolve table schema: {}", e),
- }
+ pub async fn get_file_slices(&self) -> Result<Vec<FileSlice>> {
+ self.file_system_view
+ .load_latest_file_slices_stats()
+ .await
+ .expect("Successful loading of file slice stats.");
+ self.file_system_view.get_latest_file_slices()
}
- pub async fn get_latest_file_slices(&mut self) -> Result<Vec<FileSlice>> {
- if self.file_system_view.is_none() {
- let mut new_fs_view = FileSystemView::new(self.base_url.clone());
- new_fs_view.load_file_groups().await;
- self.file_system_view = Some(new_fs_view);
- }
-
- let fs_view = self.file_system_view.as_mut().unwrap();
-
- let mut file_slices = Vec::new();
- for f in fs_view.get_latest_file_slices_with_stats().await {
- file_slices.push(f.clone());
+ #[cfg(test)]
+ async fn get_file_paths(&self) -> Result<Vec<String>> {
+ let mut file_paths = Vec::new();
+ for f in self.get_file_slices().await? {
+ file_paths.push(f.base_file_path().to_string());
}
- Ok(file_slices)
+ Ok(file_paths)
}
- pub async fn read_file_slice(&mut self, relative_path: &str) ->
Vec<RecordBatch> {
- if self.file_system_view.is_none() {
- let mut new_fs_view = FileSystemView::new(self.base_url.clone());
- new_fs_view.load_file_groups().await;
- self.file_system_view = Some(new_fs_view);
- }
-
- let fs_view = self.file_system_view.as_ref().unwrap();
- fs_view.read_file_slice(relative_path).await
+ pub async fn read_file_slice_by_path(&self, relative_path: &str) ->
Result<Vec<RecordBatch>> {
+ self.file_system_view
+ .read_file_slice_by_path(relative_path)
+ .await
}
- pub async fn get_latest_file_paths(&mut self) -> Result<Vec<String>> {
- let mut file_paths = Vec::new();
- for f in self.get_latest_file_slices().await? {
- file_paths.push(f.base_file_path().to_string());
- }
- println!("{:?}", file_paths);
- Ok(file_paths)
+ pub async fn read_file_slice(&self, file_slice: &FileSlice) ->
Result<Vec<RecordBatch>> {
+ self.file_system_view.read_file_slice(file_slice).await
}
}
@@ -258,10 +242,11 @@ mod tests {
#[tokio::test]
async fn hudi_table_get_latest_schema() {
let base_url = TestTable::V6Nonpartitioned.url();
- let hudi_table = Table::new(base_url.path(), HashMap::new()).await;
+ let hudi_table = Table::new(base_url.path(),
HashMap::new()).await.unwrap();
let fields: Vec<String> = hudi_table
- .get_latest_schema()
+ .get_schema()
.await
+ .unwrap()
.all_fields()
.into_iter()
.map(|f| f.name().to_string())
@@ -310,12 +295,13 @@ mod tests {
#[tokio::test]
async fn hudi_table_read_file_slice() {
let base_url = TestTable::V6Nonpartitioned.url();
- let mut hudi_table = Table::new(base_url.path(), HashMap::new()).await;
+ let hudi_table = Table::new(base_url.path(),
HashMap::new()).await.unwrap();
let batches = hudi_table
- .read_file_slice(
+ .read_file_slice_by_path(
"a079bdb3-731c-4894-b855-abfcd6921007-0_0-203-274_20240418173551906.parquet",
)
- .await;
+ .await
+ .unwrap();
assert_eq!(batches.len(), 1);
assert_eq!(batches.first().unwrap().num_rows(), 4);
assert_eq!(batches.first().unwrap().num_columns(), 21);
@@ -324,10 +310,10 @@ mod tests {
#[tokio::test]
async fn hudi_table_get_latest_file_paths() {
let base_url = TestTable::V6ComplexkeygenHivestyle.url();
- let mut hudi_table = Table::new(base_url.path(), HashMap::new()).await;
- assert_eq!(hudi_table.get_timeline().await.unwrap().instants.len(), 2);
+ let hudi_table = Table::new(base_url.path(),
HashMap::new()).await.unwrap();
+ assert_eq!(hudi_table.timeline.instants.len(), 2);
let actual: HashSet<String> =
-
HashSet::from_iter(hudi_table.get_latest_file_paths().await.unwrap());
+ HashSet::from_iter(hudi_table.get_file_paths().await.unwrap());
let expected: HashSet<String> = HashSet::from_iter(vec![
"byteField=10/shortField=300/a22e8257-e249-45e9-ba46-115bc85adcba-0_0-161-223_20240418173235694.parquet",
"byteField=20/shortField=100/bb7c3a45-387f-490d-aab2-981c3f1a8ada-0_0-140-198_20240418173213674.parquet",
@@ -342,7 +328,9 @@ mod tests {
async fn hudi_table_get_table_metadata() {
let base_path =
canonicalize(Path::new("fixtures/table_metadata/sample_table_properties")).unwrap();
- let table = Table::new(base_path.to_str().unwrap(),
HashMap::new()).await;
+ let table = Table::new(base_path.to_str().unwrap(), HashMap::new())
+ .await
+ .unwrap();
assert_eq!(table.base_file_format(), Parquet);
assert_eq!(table.checksum(), 3761586722);
assert_eq!(table.database_name(), "default");
diff --git a/crates/core/src/timeline/mod.rs b/crates/core/src/table/timeline.rs
similarity index 69%
rename from crates/core/src/timeline/mod.rs
rename to crates/core/src/table/timeline.rs
index 311751a..9dcf6e2 100644
--- a/crates/core/src/timeline/mod.rs
+++ b/crates/core/src/table/timeline.rs
@@ -17,12 +17,14 @@
* under the License.
*/
+use std::cmp::Ordering;
use std::collections::HashMap;
use std::fmt::Debug;
use std::path::PathBuf;
+use std::sync::Arc;
use anyhow::{anyhow, Result};
-use arrow_schema::SchemaRef;
+use arrow_schema::Schema;
use parquet::arrow::parquet_to_arrow_schema;
use serde_json::{Map, Value};
use url::Url;
@@ -31,20 +33,32 @@ use crate::storage::utils::split_filename;
use crate::storage::Storage;
#[allow(dead_code)]
-#[derive(Debug, Clone, PartialEq)]
+#[derive(Clone, Debug, Eq, PartialEq)]
pub enum State {
Requested,
Inflight,
Completed,
}
-#[derive(Debug, Clone, PartialEq)]
+#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Instant {
state: State,
action: String,
timestamp: String,
}
+impl PartialOrd for Instant {
+ fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+ Some(self.timestamp.cmp(&other.timestamp))
+ }
+}
+
+impl Ord for Instant {
+ fn cmp(&self, other: &Self) -> Ordering {
+ self.timestamp.cmp(&other.timestamp)
+ }
+}
+
impl Instant {
pub fn state_suffix(&self) -> String {
match self.state {
@@ -59,20 +73,30 @@ impl Instant {
}
}
-#[derive(Debug, Clone)]
+#[derive(Clone, Debug)]
+#[allow(dead_code)]
pub struct Timeline {
- pub base_url: Url,
+ props: Arc<HashMap<String, String>>,
+ storage: Arc<Storage>,
pub instants: Vec<Instant>,
}
impl Timeline {
- pub async fn new(base_url: Url) -> Result<Self> {
- let instants = Self::load_completed_commit_instants(&base_url).await?;
- Ok(Self { base_url, instants })
+ pub async fn new(
+ base_url: Arc<Url>,
+ storage_options: Arc<HashMap<String, String>>,
+ props: Arc<HashMap<String, String>>,
+ ) -> Result<Self> {
+ let storage = Storage::new(base_url, storage_options)?;
+ let instants = Self::load_completed_commit_instants(&storage).await?;
+ Ok(Self {
+ storage,
+ props,
+ instants,
+ })
}
- async fn load_completed_commit_instants(base_url: &Url) ->
Result<Vec<Instant>> {
- let storage = Storage::new(base_url.clone(), HashMap::new());
+ async fn load_completed_commit_instants(storage: &Storage) ->
Result<Vec<Instant>> {
let mut completed_commits = Vec::new();
for file_info in storage.list_files(Some(".hoodie")).await {
let (file_stem, file_ext) =
split_filename(file_info.name.as_str())?;
@@ -84,8 +108,7 @@ impl Timeline {
})
}
}
- // TODO: encapsulate sorting within Instant
- completed_commits.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
+ completed_commits.sort();
Ok(completed_commits)
}
@@ -94,10 +117,11 @@ impl Timeline {
Some(instant) => {
let mut commit_file_path = PathBuf::from(".hoodie");
commit_file_path.push(instant.file_name());
- let storage = Storage::new(self.base_url.clone(),
HashMap::new());
- let bytes = storage
- .get_file_data(commit_file_path.to_str().unwrap())
- .await;
+ let relative_path = commit_file_path.to_str().ok_or(anyhow!(
+ "Failed to get commit file path for instant: {:?}",
+ instant
+ ));
+ let bytes = self.storage.get_file_data(relative_path?).await;
let json: Value = serde_json::from_slice(&bytes)?;
let commit_metadata = json
.as_object()
@@ -109,20 +133,19 @@ impl Timeline {
}
}
- pub async fn get_latest_schema(&self) -> Result<SchemaRef> {
- let commit_metadata = self.get_latest_commit_metadata().await.unwrap();
+ pub async fn get_latest_schema(&self) -> Result<Schema> {
+ let commit_metadata = self.get_latest_commit_metadata().await?;
if let Some(partition_to_write_stats) =
commit_metadata["partitionToWriteStats"].as_object()
{
if let Some((_, value)) = partition_to_write_stats.iter().next() {
if let Some(first_value) = value.as_array().and_then(|arr|
arr.first()) {
if let Some(path) = first_value["path"].as_str() {
- let storage = Storage::new(self.base_url.clone(),
HashMap::new());
- let parquet_meta =
storage.get_parquet_file_metadata(path).await;
+ let parquet_meta =
self.storage.get_parquet_file_metadata(path).await;
let arrow_schema = parquet_to_arrow_schema(
parquet_meta.file_metadata().schema_descr(),
None,
)?;
- return Ok(SchemaRef::from(arrow_schema));
+ return Ok(arrow_schema);
}
}
}
@@ -133,19 +156,27 @@ impl Timeline {
#[cfg(test)]
mod tests {
+ use std::collections::HashMap;
use std::fs::canonicalize;
use std::path::Path;
+ use std::sync::Arc;
use url::Url;
use hudi_tests::TestTable;
- use crate::timeline::{Instant, State, Timeline};
+ use crate::table::timeline::{Instant, State, Timeline};
#[tokio::test]
async fn read_latest_schema() {
let base_url = TestTable::V6Nonpartitioned.url();
- let timeline = Timeline::new(base_url).await.unwrap();
+ let timeline = Timeline::new(
+ Arc::new(base_url),
+ Arc::new(HashMap::new()),
+ Arc::new(HashMap::new()),
+ )
+ .await
+ .unwrap();
let table_schema = timeline.get_latest_schema().await.unwrap();
assert_eq!(table_schema.fields.len(), 21)
}
@@ -155,7 +186,13 @@ mod tests {
let base_url =
Url::from_file_path(canonicalize(Path::new("fixtures/timeline/commits_stub")).unwrap())
.unwrap();
- let timeline = Timeline::new(base_url).await.unwrap();
+ let timeline = Timeline::new(
+ Arc::new(base_url),
+ Arc::new(HashMap::new()),
+ Arc::new(HashMap::new()),
+ )
+ .await
+ .unwrap();
assert_eq!(
timeline.instants,
vec![
diff --git a/crates/datafusion/Cargo.toml b/crates/datafusion/Cargo.toml
index 84c17a5..bb1f5df 100644
--- a/crates/datafusion/Cargo.toml
+++ b/crates/datafusion/Cargo.toml
@@ -59,6 +59,7 @@ serde_json = { workspace = true }
tokio = { workspace = true }
# "stdlib"
+anyhow = { workspace = true }
bytes = { workspace = true }
chrono = { workspace = true, default-features = false, features = ["clock"] }
hashbrown = "0.14.3"
diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs
index f064961..f4a1bba 100644
--- a/crates/datafusion/src/lib.rs
+++ b/crates/datafusion/src/lib.rs
@@ -39,13 +39,20 @@ use hudi_core::HudiTable;
#[derive(Debug, Clone)]
pub struct HudiDataSource {
- table: HudiTable,
+ table: Arc<HudiTable>,
}
impl HudiDataSource {
- pub async fn new(base_uri: &str, storage_options: HashMap<String, String>)
-> Self {
- Self {
- table: HudiTable::new(base_uri, storage_options).await,
+ pub async fn new(
+ base_uri: &str,
+ storage_options: HashMap<String, String>,
+ ) -> datafusion_common::Result<Self> {
+ match HudiTable::new(base_uri, storage_options).await {
+ Ok(t) => Ok(Self { table: Arc::new(t) }),
+ Err(e) => Err(DataFusionError::Execution(format!(
+ "Failed to create Hudi table: {}",
+ e
+ ))),
}
}
@@ -58,20 +65,18 @@ impl HudiDataSource {
}
async fn get_record_batches(&mut self) -> datafusion_common::Result<Vec<RecordBatch>> {
- match self.table.get_latest_file_slices().await {
- Ok(file_slices) => {
- let mut record_batches = Vec::new();
- for f in file_slices {
- let relative_path = f.base_file_relative_path();
- let records = self.table.read_file_slice(&relative_path).await;
- record_batches.extend(records)
- }
- Ok(record_batches)
- }
- Err(_e) => Err(DataFusionError::Execution(
- "Failed to read records from table.".to_owned(),
- )),
+ let file_slices = self.table.get_file_slices().await.map_err(|e| {
+ DataFusionError::Execution(format!("Failed to load file slices from table: {}", e))
+ })?;
+
+ let mut record_batches = Vec::new();
+ for fsl in file_slices {
+ let batches = self.table.read_file_slice(&fsl).await.map_err(|e| {
+ DataFusionError::Execution(format!("Failed to read records
from table: {}", e))
+ })?;
+ record_batches.extend(batches)
}
+ Ok(record_batches)
}
}
@@ -85,9 +90,9 @@ impl TableProvider for HudiDataSource {
let table = self.table.clone();
let handle = thread::spawn(move || {
let rt = tokio::runtime::Runtime::new().unwrap();
- rt.block_on(async { table.get_latest_schema().await })
+ rt.block_on(async { table.get_schema().await })
});
- handle.join().unwrap()
+ SchemaRef::from(handle.join().unwrap().unwrap())
}
fn table_type(&self) -> TableType {
@@ -196,7 +201,9 @@ mod tests {
);
let ctx = SessionContext::new_with_config(config);
let base_url = TestTable::V6ComplexkeygenHivestyle.url();
- let hudi = HudiDataSource::new(base_url.path(), HashMap::new()).await;
+ let hudi = HudiDataSource::new(base_url.path(), HashMap::new())
+ .await
+ .unwrap();
ctx.register_table("hudi_table_complexkeygen", Arc::new(hudi))
.unwrap();
let df: DataFrame = ctx
diff --git a/python/Cargo.toml b/python/Cargo.toml
index 3db2cc1..8f1d079 100644
--- a/python/Cargo.toml
+++ b/python/Cargo.toml
@@ -42,13 +42,15 @@ arrow-row = { workspace = true }
arrow-schema = { workspace = true }
arrow-select = { workspace = true }
+anyhow = { workspace = true }
+
# runtime / async
futures = { workspace = true }
tokio = { workspace = true }
[dependencies.pyo3]
version = "0.20.3"
-features = ["extension-module", "abi3", "abi3-py38"]
+features = ["extension-module", "abi3", "abi3-py38", "anyhow"]
[dependencies.hudi]
path = "../crates/hudi"
diff --git a/python/hudi/_internal.pyi b/python/hudi/_internal.pyi
index b34c25a..83ed929 100644
--- a/python/hudi/_internal.pyi
+++ b/python/hudi/_internal.pyi
@@ -30,10 +30,10 @@ class HudiFileSlice:
partition_path: str
commit_time: str
base_file_name: str
- base_file_path: str
base_file_size: int
num_records: int
+ def base_file_relative_path(self) -> str: ...
class BindingHudiTable:
diff --git a/python/src/lib.rs b/python/src/lib.rs
index 89851bf..2436ee5 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -21,6 +21,7 @@ use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::OnceLock;
+use anyhow::anyhow;
use arrow::pyarrow::ToPyArrow;
use pyo3::prelude::*;
use tokio::runtime::Runtime;
@@ -29,6 +30,7 @@ use hudi::file_group::FileSlice;
use hudi::HudiTable;
#[cfg(not(tarpaulin))]
+#[derive(Clone, Debug)]
#[pyclass]
struct HudiFileSlice {
#[pyo3(get)]
@@ -40,32 +42,45 @@ struct HudiFileSlice {
#[pyo3(get)]
base_file_name: String,
#[pyo3(get)]
- base_file_path: String,
- #[pyo3(get)]
base_file_size: usize,
#[pyo3(get)]
num_records: i64,
}
#[cfg(not(tarpaulin))]
+#[pymethods]
impl HudiFileSlice {
- pub fn from_file_slice(f: FileSlice) -> Self {
- let partition_path = f.partition_path.clone().unwrap_or("".to_string());
- let mut p = PathBuf::from(&partition_path);
- p.push(f.base_file.info.name.clone());
- let base_file_path = p.to_str().unwrap().to_string();
- Self {
- file_group_id: f.file_group_id().to_string(),
- partition_path,
- commit_time: f.base_file.commit_time,
- base_file_name: f.base_file.info.name,
- base_file_path,
- base_file_size: f.base_file.info.size,
- num_records: f.base_file.stats.unwrap().num_records,
+ pub fn base_file_relative_path(&self) -> PyResult<String> {
+ let mut p = PathBuf::from(&self.partition_path);
+ p.push(&self.base_file_name);
+ match p.to_str() {
+ Some(s) => Ok(s.to_string()),
+ None => Err(PyErr::from(anyhow!(
+ "Failed to get base file relative path for file slice: {:?}",
+ self
+ ))),
}
}
}
+#[cfg(not(tarpaulin))]
+fn convert_file_slice(f: &FileSlice) -> HudiFileSlice {
+ let file_group_id = f.file_group_id().to_string();
+ let partition_path = f.partition_path.as_deref().unwrap_or_default().to_string();
+ let commit_time = f.base_file.commit_time.to_string();
+ let base_file_name = f.base_file.info.name.clone();
+ let base_file_size = f.base_file.info.size;
+ let num_records = f.base_file.stats.clone().unwrap_or_default().num_records;
+ HudiFileSlice {
+ file_group_id,
+ partition_path,
+ commit_time,
+ base_file_name,
+ base_file_size,
+ num_records,
+ }
+}
+
#[cfg(not(tarpaulin))]
#[pyclass]
struct BindingHudiTable {
@@ -81,32 +96,23 @@ impl BindingHudiTable {
let _table = rt().block_on(HudiTable::new(
table_uri,
storage_options.unwrap_or_default(),
- ));
+ ))?;
Ok(BindingHudiTable { _table })
}
pub fn schema(&self, py: Python) -> PyResult<PyObject> {
- rt().block_on(self._table.get_latest_schema())
- .to_pyarrow(py)
+ rt().block_on(self._table.get_schema())?.to_pyarrow(py)
}
pub fn get_latest_file_slices(&mut self, py: Python) -> PyResult<Vec<HudiFileSlice>> {
py.allow_threads(|| {
- let res = rt().block_on(self._table.get_latest_file_slices());
- match res {
- Ok(file_slices) => Ok(file_slices
- .into_iter()
- .map(HudiFileSlice::from_file_slice)
- .collect()),
- Err(_e) => {
- panic!("Failed to retrieve the latest file slices.")
- }
- }
+ let file_slices = rt().block_on(self._table.get_file_slices())?;
+ Ok(file_slices.iter().map(convert_file_slice).collect())
})
}
pub fn read_file_slice(&mut self, relative_path: &str, py: Python) -> PyResult<PyObject> {
- rt().block_on(self._table.read_file_slice(relative_path))
+ rt().block_on(self._table.read_file_slice_by_path(relative_path))?
.to_pyarrow(py)
}
}
diff --git a/python/tests/test_table_read.py b/python/tests/test_table_read.py
index 20ce42c..f0266f0 100644
--- a/python/tests/test_table_read.py
+++ b/python/tests/test_table_read.py
@@ -36,7 +36,7 @@ def test_sample_table(get_sample_table):
assert len(file_slices) == 5
assert set(f.commit_time for f in file_slices) == {'20240402123035233', '20240402144910683'}
assert all(f.num_records == 1 for f in file_slices)
- file_slice_paths = [f.base_file_path for f in file_slices]
+ file_slice_paths = [f.base_file_relative_path() for f in file_slices]
assert set(file_slice_paths) ==
{'chennai/68d3c349-f621-4cd8-9e8b-c6dd8eb20d08-0_4-12-0_20240402123035233.parquet',
'san_francisco/d9082ffd-2eb1-4394-aefc-deb4a61ecc57-0_1-9-0_20240402123035233.parquet',
'san_francisco/780b8586-3ad0-48ef-a6a1-d2217845ce4a-0_0-8-0_20240402123035233.parquet',