This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new d5f2231  refactor: improve thread safety and error handling (#32)
d5f2231 is described below

commit d5f2231d838c854f6992902855df9732cfd878cf
Author: Shiyan Xu <[email protected]>
AuthorDate: Mon Jul 1 20:03:53 2024 -0500

    refactor: improve thread safety and error handling (#32)
---
 Cargo.toml                                         |   1 +
 crates/core/Cargo.toml                             |   1 +
 crates/core/src/file_group/mod.rs                  |  24 ++-
 crates/core/src/lib.rs                             |   1 -
 crates/core/src/storage/mod.rs                     |  45 ++---
 crates/core/src/table/fs_view.rs                   | 182 ++++++++++-----------
 crates/core/src/table/mod.rs                       | 162 +++++++++---------
 .../src/{timeline/mod.rs => table/timeline.rs}     |  85 +++++++---
 crates/datafusion/Cargo.toml                       |   1 +
 crates/datafusion/src/lib.rs                       |  47 +++---
 python/Cargo.toml                                  |   4 +-
 python/hudi/_internal.pyi                          |   2 +-
 python/src/lib.rs                                  |  64 ++++----
 python/tests/test_table_read.py                    |   2 +-
 14 files changed, 337 insertions(+), 284 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index e3c5b99..1b66057 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -68,6 +68,7 @@ uuid = { version = "1" }
 # runtime / async
 async-trait = { version = "0.1" }
 async-recursion = { version = "1.1.1" }
+dashmap = { version = "6.0.1" }
 futures = { version = "0.3" }
 tokio = { version = "1", features = ["rt-multi-thread"]}
 num_cpus = { version = "1" }
diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml
index 6363804..5d04337 100644
--- a/crates/core/Cargo.toml
+++ b/crates/core/Cargo.toml
@@ -67,5 +67,6 @@ url = { workspace = true }
 # runtime / async
 async-recursion = { workspace = true }
 async-trait = { workspace = true }
+dashmap = { workspace = true }
 tokio = { workspace = true }
 futures = { workspace = true }
diff --git a/crates/core/src/file_group/mod.rs 
b/crates/core/src/file_group/mod.rs
index ece19a4..6b9b22c 100644
--- a/crates/core/src/file_group/mod.rs
+++ b/crates/core/src/file_group/mod.rs
@@ -22,9 +22,11 @@ use std::fmt;
 use std::fmt::Formatter;
 use std::path::PathBuf;
 
+use anyhow::{anyhow, Result};
+
 use crate::storage::file_info::FileInfo;
 use crate::storage::file_stats::FileStats;
-use anyhow::{anyhow, Result};
+use crate::storage::Storage;
 
 #[derive(Clone, Debug)]
 pub struct BaseFile {
@@ -63,10 +65,6 @@ impl BaseFile {
             stats: None,
         })
     }
-
-    pub fn populate_stats(&mut self, stats: FileStats) {
-        self.stats = Some(stats)
-    }
 }
 
 #[derive(Clone, Debug)]
@@ -81,9 +79,9 @@ impl FileSlice {
     }
 
     pub fn base_file_relative_path(&self) -> String {
-        let partition_path = self.partition_path.clone().unwrap_or_default();
+        let ptn = self.partition_path.as_deref().unwrap_or_default();
         let file_name = &self.base_file.info.name;
-        PathBuf::from(partition_path)
+        PathBuf::from(ptn)
             .join(file_name)
             .to_str()
             .unwrap()
@@ -97,6 +95,18 @@ impl FileSlice {
     pub fn set_base_file(&mut self, base_file: BaseFile) {
         self.base_file = base_file
     }
+
+    pub async fn load_stats(&mut self, storage: &Storage) -> Result<()> {
+        if self.base_file.stats.is_none() {
+            let parquet_meta = storage
+                .get_parquet_file_metadata(&self.base_file_relative_path())
+                .await;
+            let num_records = parquet_meta.file_metadata().num_rows();
+            let stats = FileStats { num_records };
+            self.base_file.stats = Some(stats);
+        }
+        Ok(())
+    }
 }
 
 #[derive(Clone, Debug)]
diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs
index 1586ff8..533d0fb 100644
--- a/crates/core/src/lib.rs
+++ b/crates/core/src/lib.rs
@@ -23,7 +23,6 @@ pub mod file_group;
 pub mod table;
 pub type HudiTable = Table;
 mod storage;
-mod timeline;
 
 pub fn crate_version() -> &'static str {
     env!("CARGO_PKG_VERSION")
diff --git a/crates/core/src/storage/mod.rs b/crates/core/src/storage/mod.rs
index b35f30d..76e085f 100644
--- a/crates/core/src/storage/mod.rs
+++ b/crates/core/src/storage/mod.rs
@@ -21,6 +21,7 @@ use std::collections::HashMap;
 use std::path::PathBuf;
 use std::sync::Arc;
 
+use anyhow::{anyhow, Result};
 use arrow::record_batch::RecordBatch;
 use async_recursion::async_recursion;
 use bytes::Bytes;
@@ -40,20 +41,23 @@ pub(crate) mod file_stats;
 pub(crate) mod utils;
 
 #[allow(dead_code)]
+#[derive(Clone, Debug)]
 pub struct Storage {
-    base_url: Url,
+    base_url: Arc<Url>,
+    options: Arc<HashMap<String, String>>,
     object_store: Arc<dyn ObjectStore>,
-    options: HashMap<String, String>,
 }
 
 impl Storage {
-    pub fn new(base_url: Url, options: HashMap<String, String>) -> 
Box<Storage> {
-        let object_store = parse_url_opts(&base_url, &options).unwrap().0;
-        Box::from(Storage {
-            base_url,
-            object_store: Arc::new(object_store),
-            options,
-        })
+    pub fn new(base_url: Arc<Url>, options: Arc<HashMap<String, String>>) -> 
Result<Arc<Storage>> {
+        match parse_url_opts(&base_url, &*options) {
+            Ok(object_store) => Ok(Arc::new(Storage {
+                base_url,
+                options,
+                object_store: Arc::new(object_store.0),
+            })),
+            Err(e) => Err(anyhow!("Failed to create storage: {}", e)),
+        }
     }
 
     #[allow(dead_code)]
@@ -167,6 +171,7 @@ mod tests {
     use std::collections::{HashMap, HashSet};
     use std::fs::canonicalize;
     use std::path::Path;
+    use std::sync::Arc;
 
     use object_store::path::Path as ObjPath;
     use url::Url;
@@ -181,7 +186,7 @@ mod tests {
             canonicalize(Path::new("fixtures/timeline/commits_stub")).unwrap(),
         )
         .unwrap();
-        let storage = Storage::new(base_url, HashMap::new());
+        let storage = Storage::new(Arc::new(base_url), 
Arc::new(HashMap::new())).unwrap();
         let first_level_dirs: HashSet<String> = 
storage.list_dirs(None).await.into_iter().collect();
         assert_eq!(
             first_level_dirs,
@@ -202,7 +207,7 @@ mod tests {
             canonicalize(Path::new("fixtures/timeline/commits_stub")).unwrap(),
         )
         .unwrap();
-        let storage = Storage::new(base_url, HashMap::new());
+        let storage = Storage::new(Arc::new(base_url), 
Arc::new(HashMap::new())).unwrap();
         let first_level_dirs: HashSet<ObjPath> = storage
             .list_dirs_as_obj_paths(None)
             .await
@@ -224,12 +229,12 @@ mod tests {
             canonicalize(Path::new("fixtures/timeline/commits_stub")).unwrap(),
         )
         .unwrap();
-        let storage = Storage::new(base_url.clone(), HashMap::new());
+        let storage = Storage::new(Arc::new(base_url), 
Arc::new(HashMap::new())).unwrap();
         let file_info_1: Vec<FileInfo> = 
storage.list_files(None).await.into_iter().collect();
         assert_eq!(
             file_info_1,
             vec![FileInfo {
-                uri: base_url.clone().join("a.parquet").unwrap().to_string(),
+                uri: storage.base_url.join("a.parquet").unwrap().to_string(),
                 name: "a.parquet".to_string(),
                 size: 0,
             }]
@@ -242,8 +247,8 @@ mod tests {
         assert_eq!(
             file_info_2,
             vec![FileInfo {
-                uri: base_url
-                    .clone()
+                uri: storage
+                    .base_url
                     .join("part1/b.parquet")
                     .unwrap()
                     .to_string(),
@@ -259,8 +264,8 @@ mod tests {
         assert_eq!(
             file_info_3,
             vec![FileInfo {
-                uri: base_url
-                    .clone()
+                uri: storage
+                    .base_url
                     .join("part2/part22/c.parquet")
                     .unwrap()
                     .to_string(),
@@ -276,7 +281,7 @@ mod tests {
             canonicalize(Path::new("fixtures/timeline/commits_stub")).unwrap(),
         )
         .unwrap();
-        let storage = Storage::new(base_url, HashMap::new());
+        let storage = Storage::new(Arc::new(base_url), 
Arc::new(HashMap::new())).unwrap();
         let leaf_dirs = get_leaf_dirs(&storage, None).await;
         assert_eq!(
             leaf_dirs,
@@ -288,7 +293,7 @@ mod tests {
     async fn storage_get_file_info() {
         let base_url =
             
Url::from_directory_path(canonicalize(Path::new("fixtures")).unwrap()).unwrap();
-        let storage = Storage::new(base_url, HashMap::new());
+        let storage = Storage::new(Arc::new(base_url), 
Arc::new(HashMap::new())).unwrap();
         let file_info = storage.get_file_info("a.parquet").await;
         assert_eq!(file_info.name, "a.parquet");
         assert_eq!(
@@ -302,7 +307,7 @@ mod tests {
     async fn storage_get_parquet_file_data() {
         let base_url =
             
Url::from_directory_path(canonicalize(Path::new("fixtures")).unwrap()).unwrap();
-        let storage = Storage::new(base_url, HashMap::new());
+        let storage = Storage::new(Arc::new(base_url), 
Arc::new(HashMap::new())).unwrap();
         let file_data = storage.get_parquet_file_data("a.parquet").await;
         assert_eq!(file_data.len(), 1);
         assert_eq!(file_data.first().unwrap().num_rows(), 5);
diff --git a/crates/core/src/table/fs_view.rs b/crates/core/src/table/fs_view.rs
index f1976d8..8f278dd 100644
--- a/crates/core/src/table/fs_view.rs
+++ b/crates/core/src/table/fs_view.rs
@@ -18,33 +18,44 @@
  */
 
 use std::collections::HashMap;
-use std::path::PathBuf;
+use std::sync::Arc;
 
 use anyhow::{anyhow, Result};
 use arrow::record_batch::RecordBatch;
+use dashmap::DashMap;
 use url::Url;
 
 use crate::file_group::{BaseFile, FileGroup, FileSlice};
 use crate::storage::file_info::FileInfo;
-use crate::storage::file_stats::FileStats;
 use crate::storage::{get_leaf_dirs, Storage};
 
 #[derive(Clone, Debug)]
+#[allow(dead_code)]
 pub struct FileSystemView {
-    pub base_url: Url,
-    partition_to_file_groups: HashMap<String, Vec<FileGroup>>,
+    props: Arc<HashMap<String, String>>,
+    storage: Arc<Storage>,
+    partition_to_file_groups: Arc<DashMap<String, Vec<FileGroup>>>,
 }
 
 impl FileSystemView {
-    pub fn new(base_url: Url) -> Self {
-        FileSystemView {
-            base_url,
-            partition_to_file_groups: HashMap::new(),
-        }
+    pub async fn new(
+        base_url: Arc<Url>,
+        storage_options: Arc<HashMap<String, String>>,
+        props: Arc<HashMap<String, String>>,
+    ) -> Result<Self> {
+        let storage = Storage::new(base_url, storage_options)?;
+        let partition_paths = Self::get_partition_paths(&storage).await?;
+        let partition_to_file_groups =
+            Self::load_file_groups_for_partitions(&storage, 
partition_paths).await?;
+        let partition_to_file_groups = 
Arc::new(DashMap::from_iter(partition_to_file_groups));
+        Ok(FileSystemView {
+            props,
+            storage,
+            partition_to_file_groups,
+        })
     }
 
-    async fn get_partition_paths(&self) -> Result<Vec<String>> {
-        let storage = Storage::new(self.base_url.clone(), HashMap::new());
+    async fn get_partition_paths(storage: &Storage) -> Result<Vec<String>> {
         let top_level_dirs: Vec<String> = storage
             .list_dirs(None)
             .await
@@ -53,19 +64,41 @@ impl FileSystemView {
             .collect();
         let mut partition_paths = Vec::new();
         for dir in top_level_dirs {
-            partition_paths.extend(get_leaf_dirs(&storage, Some(&dir)).await);
+            partition_paths.extend(get_leaf_dirs(storage, Some(&dir)).await);
+        }
+        if partition_paths.is_empty() {
+            partition_paths.push("".to_string())
         }
         Ok(partition_paths)
     }
 
-    async fn get_file_groups(&self, partition_path: &str) -> 
Result<Vec<FileGroup>> {
-        let storage = Storage::new(self.base_url.clone(), HashMap::new());
+    async fn load_file_groups_for_partitions(
+        storage: &Storage,
+        partition_paths: Vec<String>,
+    ) -> Result<HashMap<String, Vec<FileGroup>>> {
+        let mut partition_to_file_groups = HashMap::new();
+        for p in partition_paths {
+            match Self::load_file_groups_for_partition(storage, 
p.as_str()).await {
+                Ok(file_groups) => {
+                    partition_to_file_groups.insert(p, file_groups);
+                }
+                Err(e) => return Err(anyhow!("Failed to load partitions: {}", 
e)),
+            }
+        }
+        Ok(partition_to_file_groups)
+    }
+
+    async fn load_file_groups_for_partition(
+        storage: &Storage,
+        partition_path: &str,
+    ) -> Result<Vec<FileGroup>> {
         let file_info: Vec<FileInfo> = storage
             .list_files(Some(partition_path))
             .await
             .into_iter()
             .filter(|f| f.name.ends_with(".parquet"))
             .collect();
+
         let mut fg_id_to_base_files: HashMap<String, Vec<BaseFile>> = 
HashMap::new();
         for f in file_info {
             let base_file = BaseFile::from_file_info(f)?;
@@ -87,109 +120,68 @@ impl FileSystemView {
         Ok(file_groups)
     }
 
-    pub async fn load_file_groups(&mut self) {
-        let fs_view = self.clone();
-        let result = get_partitions_and_file_groups(&fs_view).await.unwrap();
-        for (k, v) in result {
-            self.partition_to_file_groups.insert(k, v);
-        }
-    }
-
-    pub fn get_latest_file_slices(&self) -> Vec<&FileSlice> {
+    pub fn get_latest_file_slices(&self) -> Result<Vec<FileSlice>> {
         let mut file_slices = Vec::new();
-        for fgs in self.partition_to_file_groups.values() {
-            for fg in fgs {
-                if let Some(file_slice) = fg.get_latest_file_slice() {
-                    file_slices.push(file_slice)
+        for fgs in self.partition_to_file_groups.iter() {
+            let fgs_ref = fgs.value();
+            for fg in fgs_ref {
+                if let Some(fsl) = fg.get_latest_file_slice() {
+                    file_slices.push(fsl.clone())
                 }
             }
         }
-        file_slices
+        Ok(file_slices)
     }
 
-    pub async fn get_latest_file_slices_with_stats(&mut self) -> Vec<&mut 
FileSlice> {
-        let mut file_slices = Vec::new();
-        let file_groups = &mut self.partition_to_file_groups.values_mut();
-        for fgs in file_groups {
-            for fg in fgs {
+    pub async fn load_latest_file_slices_stats(&self) -> Result<()> {
+        for mut fgs in self.partition_to_file_groups.iter_mut() {
+            let fgs_ref = fgs.value_mut();
+            for fg in fgs_ref {
                 if let Some(file_slice) = fg.get_latest_file_slice_mut() {
-                    let _ = load_file_slice_stats(&self.base_url, 
file_slice).await;
-                    file_slices.push(file_slice)
+                    file_slice
+                        .load_stats(&self.storage)
+                        .await
+                        .expect("Successful loading file stats.");
                 }
             }
         }
-        file_slices
+        Ok(())
     }
 
-    pub async fn read_file_slice(&self, relative_path: &str) -> 
Vec<RecordBatch> {
-        let storage = Storage::new(self.base_url.clone(), HashMap::new());
-        storage.get_parquet_file_data(relative_path).await
+    pub async fn read_file_slice_by_path(&self, relative_path: &str) -> 
Result<Vec<RecordBatch>> {
+        Ok(self.storage.get_parquet_file_data(relative_path).await)
     }
-}
-
-async fn load_file_slice_stats(base_url: &Url, file_slice: &mut FileSlice) -> 
Result<()> {
-    let base_file = &mut file_slice.base_file;
-    if base_file.stats.is_none() {
-        let storage = Storage::new(base_url.clone(), HashMap::new());
-        let ptn = file_slice.partition_path.clone();
-        let mut relative_path = PathBuf::from(ptn.unwrap_or("".to_string()));
-        let base_file_name = &base_file.info.name;
-        relative_path.push(base_file_name);
-        let parquet_meta = storage
-            .get_parquet_file_metadata(relative_path.to_str().unwrap())
-            .await;
-        let num_records = parquet_meta.file_metadata().num_rows();
-        base_file.populate_stats(FileStats { num_records });
-    }
-    Ok(())
-}
-
-async fn get_partitions_and_file_groups(
-    fs_view: &FileSystemView,
-) -> Result<HashMap<String, Vec<FileGroup>>> {
-    match fs_view.get_partition_paths().await {
-        Ok(mut partition_paths) => {
-            if partition_paths.is_empty() {
-                partition_paths.push("".to_string());
-            }
-            let mut partition_to_file_groups = HashMap::new();
-            for p in partition_paths {
-                match fs_view.get_file_groups(p.as_str()).await {
-                    Ok(file_groups) => {
-                        partition_to_file_groups.insert(p, file_groups);
-                    }
-                    Err(e) => return Err(anyhow!("Failed to load partitions: 
{}", e)),
-                }
-            }
-            Ok(partition_to_file_groups)
-        }
-        Err(e) => Err(anyhow!("Failed to load partitions: {}", e)),
+    pub async fn read_file_slice(&self, file_slice: &FileSlice) -> 
Result<Vec<RecordBatch>> {
+        self.read_file_slice_by_path(&file_slice.base_file_relative_path())
+            .await
     }
 }
 
 #[cfg(test)]
 mod tests {
-    use std::collections::HashSet;
+    use std::collections::{HashMap, HashSet};
+    use std::sync::Arc;
 
     use hudi_tests::TestTable;
 
+    use crate::storage::Storage;
     use crate::table::fs_view::FileSystemView;
 
     #[tokio::test]
     async fn get_partition_paths_for_nonpartitioned_table() {
         let base_url = TestTable::V6Nonpartitioned.url();
-        let fs_view = FileSystemView::new(base_url);
-        let partition_paths = fs_view.get_partition_paths().await.unwrap();
+        let storage = Storage::new(Arc::new(base_url), 
Arc::new(HashMap::new())).unwrap();
+        let partition_paths = 
FileSystemView::get_partition_paths(&storage).await.unwrap();
         let partition_path_set: HashSet<&str> =
             HashSet::from_iter(partition_paths.iter().map(|p| p.as_str()));
-        assert_eq!(partition_path_set, HashSet::new(),)
+        assert_eq!(partition_path_set, HashSet::from([""]))
     }
 
     #[tokio::test]
     async fn get_partition_paths_for_complexkeygen_table() {
         let base_url = TestTable::V6ComplexkeygenHivestyle.url();
-        let fs_view = FileSystemView::new(base_url);
-        let partition_paths = fs_view.get_partition_paths().await.unwrap();
+        let storage = Storage::new(Arc::new(base_url), 
Arc::new(HashMap::new())).unwrap();
+        let partition_paths = 
FileSystemView::get_partition_paths(&storage).await.unwrap();
         let partition_path_set: HashSet<&str> =
             HashSet::from_iter(partition_paths.iter().map(|p| p.as_str()));
         assert_eq!(
@@ -203,17 +195,21 @@ mod tests {
     }
 
     #[tokio::test]
-    async fn get_latest_file_slices() {
+    async fn fs_view_get_latest_file_slices() {
         let base_url = TestTable::V6Nonpartitioned.url();
-        let mut fs_view = FileSystemView::new(base_url);
-        fs_view.load_file_groups().await;
-        let file_slices = fs_view.get_latest_file_slices();
+        let fs_view = FileSystemView::new(
+            Arc::new(base_url),
+            Arc::new(HashMap::new()),
+            Arc::new(HashMap::new()),
+        )
+        .await
+        .unwrap();
+        let file_slices = fs_view.get_latest_file_slices().unwrap();
         assert_eq!(file_slices.len(), 1);
-        let mut fg_ids = Vec::new();
-        for f in file_slices {
-            let fp = f.file_group_id();
-            fg_ids.push(fp);
-        }
+        let fg_ids = file_slices
+            .iter()
+            .map(|fsl| fsl.file_group_id())
+            .collect::<Vec<_>>();
         assert_eq!(fg_ids, vec!["a079bdb3-731c-4894-b855-abfcd6921007-0"])
     }
 }
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 86f1008..825b2d4 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -21,10 +21,11 @@ use std::collections::HashMap;
 use std::io::{BufRead, BufReader};
 use std::path::PathBuf;
 use std::str::FromStr;
+use std::sync::Arc;
 
-use anyhow::Result;
+use anyhow::{anyhow, Context, Result};
 use arrow::record_batch::RecordBatch;
-use arrow_schema::SchemaRef;
+use arrow_schema::Schema;
 use url::Url;
 
 use crate::file_group::FileSlice;
@@ -33,53 +34,60 @@ use crate::table::config::BaseFileFormat;
 use crate::table::config::{ConfigKey, TableType};
 use crate::table::fs_view::FileSystemView;
 use crate::table::metadata::ProvidesTableMetadata;
-use crate::timeline::Timeline;
+use crate::table::timeline::Timeline;
 
 mod config;
 mod fs_view;
 mod metadata;
+mod timeline;
 
 #[derive(Debug, Clone)]
-#[allow(dead_code)]
 pub struct Table {
-    pub base_url: Url,
-    pub props: HashMap<String, String>,
-    pub file_system_view: Option<FileSystemView>,
-    pub storage_options: HashMap<String, String>,
+    pub base_url: Arc<Url>,
+    pub storage_options: Arc<HashMap<String, String>>,
+    pub props: Arc<HashMap<String, String>>,
+    pub timeline: Timeline,
+    pub file_system_view: FileSystemView,
 }
 
 impl Table {
-    pub async fn new(base_uri: &str, storage_options: HashMap<String, String>) 
-> Self {
-        let base_url = 
Url::from_file_path(PathBuf::from(base_uri).as_path()).unwrap();
-        match Self::load_properties(
-            base_url.clone(),
-            ".hoodie/hoodie.properties".to_string(),
-            storage_options.clone(),
-        )
-        .await
-        {
-            Ok(props) => Self {
-                base_url,
-                props,
-                file_system_view: None,
-                storage_options,
-            },
-            Err(e) => {
-                panic!("Failed to load table properties: {}", e)
-            }
-        }
+    pub async fn new(base_uri: &str, storage_options: HashMap<String, String>) 
-> Result<Self> {
+        let base_url = Url::from_file_path(PathBuf::from(base_uri))
+            .map_err(|_| anyhow!("Failed to create table URL: {}", base_uri))?;
+        let base_url = Arc::new(base_url);
+        let storage_options = Arc::new(storage_options);
+
+        let props = Self::load_properties(base_url.clone(), 
storage_options.clone())
+            .await
+            .context("Failed to create a table")?;
+
+        let props = Arc::new(props);
+        let timeline = Timeline::new(base_url.clone(), 
storage_options.clone(), props.clone())
+            .await
+            .context("Failed to load timeline")?;
+
+        let file_system_view =
+            FileSystemView::new(base_url.clone(), storage_options.clone(), 
props.clone())
+                .await
+                .context("Failed to load file system view")?;
+
+        Ok(Table {
+            base_url,
+            storage_options,
+            props,
+            timeline,
+            file_system_view,
+        })
     }
 
     async fn load_properties(
-        base_url: Url,
-        props_path: String,
-        storage_options: HashMap<String, String>,
+        base_url: Arc<Url>,
+        storage_options: Arc<HashMap<String, String>>,
     ) -> Result<HashMap<String, String>> {
-        let storage = Storage::new(base_url, storage_options);
-        let data = storage.get_file_data(props_path.as_str()).await;
+        let storage = Storage::new(base_url, storage_options)?;
+        let data = storage.get_file_data(".hoodie/hoodie.properties").await;
         let cursor = std::io::Cursor::new(data);
-        let reader = BufReader::new(cursor);
-        let lines = reader.lines();
+        let lines = BufReader::new(cursor).lines();
         let mut properties: HashMap<String, String> = HashMap::new();
         for line in lines {
             let line = line?;
@@ -102,59 +110,35 @@ impl Table {
         }
     }
 
-    #[cfg(test)]
-    async fn get_timeline(&self) -> Result<Timeline> {
-        Timeline::new(self.base_url.clone()).await
+    pub async fn get_schema(&self) -> Result<Schema> {
+        self.timeline.get_latest_schema().await
     }
 
-    pub async fn get_latest_schema(&self) -> SchemaRef {
-        let timeline_result = Timeline::new(self.base_url.clone()).await;
-        match timeline_result {
-            Ok(timeline) => {
-                let schema_result = timeline.get_latest_schema().await;
-                match schema_result {
-                    Ok(schema) => SchemaRef::from(schema),
-                    Err(e) => panic!("Failed to resolve table schema: {}", e),
-                }
-            }
-            Err(e) => panic!("Failed to resolve table schema: {}", e),
-        }
+    pub async fn get_file_slices(&self) -> Result<Vec<FileSlice>> {
+        self.file_system_view
+            .load_latest_file_slices_stats()
+            .await
+            .expect("Successful loading of file slice stats.");
+        self.file_system_view.get_latest_file_slices()
     }
 
-    pub async fn get_latest_file_slices(&mut self) -> Result<Vec<FileSlice>> {
-        if self.file_system_view.is_none() {
-            let mut new_fs_view = FileSystemView::new(self.base_url.clone());
-            new_fs_view.load_file_groups().await;
-            self.file_system_view = Some(new_fs_view);
-        }
-
-        let fs_view = self.file_system_view.as_mut().unwrap();
-
-        let mut file_slices = Vec::new();
-        for f in fs_view.get_latest_file_slices_with_stats().await {
-            file_slices.push(f.clone());
+    #[cfg(test)]
+    async fn get_file_paths(&self) -> Result<Vec<String>> {
+        let mut file_paths = Vec::new();
+        for f in self.get_file_slices().await? {
+            file_paths.push(f.base_file_path().to_string());
         }
-        Ok(file_slices)
+        Ok(file_paths)
     }
 
-    pub async fn read_file_slice(&mut self, relative_path: &str) -> 
Vec<RecordBatch> {
-        if self.file_system_view.is_none() {
-            let mut new_fs_view = FileSystemView::new(self.base_url.clone());
-            new_fs_view.load_file_groups().await;
-            self.file_system_view = Some(new_fs_view);
-        }
-
-        let fs_view = self.file_system_view.as_ref().unwrap();
-        fs_view.read_file_slice(relative_path).await
+    pub async fn read_file_slice_by_path(&self, relative_path: &str) -> 
Result<Vec<RecordBatch>> {
+        self.file_system_view
+            .read_file_slice_by_path(relative_path)
+            .await
     }
 
-    pub async fn get_latest_file_paths(&mut self) -> Result<Vec<String>> {
-        let mut file_paths = Vec::new();
-        for f in self.get_latest_file_slices().await? {
-            file_paths.push(f.base_file_path().to_string());
-        }
-        println!("{:?}", file_paths);
-        Ok(file_paths)
+    pub async fn read_file_slice(&self, file_slice: &FileSlice) -> 
Result<Vec<RecordBatch>> {
+        self.file_system_view.read_file_slice(file_slice).await
     }
 }
 
@@ -258,10 +242,11 @@ mod tests {
     #[tokio::test]
     async fn hudi_table_get_latest_schema() {
         let base_url = TestTable::V6Nonpartitioned.url();
-        let hudi_table = Table::new(base_url.path(), HashMap::new()).await;
+        let hudi_table = Table::new(base_url.path(), 
HashMap::new()).await.unwrap();
         let fields: Vec<String> = hudi_table
-            .get_latest_schema()
+            .get_schema()
             .await
+            .unwrap()
             .all_fields()
             .into_iter()
             .map(|f| f.name().to_string())
@@ -310,12 +295,13 @@ mod tests {
     #[tokio::test]
     async fn hudi_table_read_file_slice() {
         let base_url = TestTable::V6Nonpartitioned.url();
-        let mut hudi_table = Table::new(base_url.path(), HashMap::new()).await;
+        let hudi_table = Table::new(base_url.path(), 
HashMap::new()).await.unwrap();
         let batches = hudi_table
-            .read_file_slice(
+            .read_file_slice_by_path(
                 
"a079bdb3-731c-4894-b855-abfcd6921007-0_0-203-274_20240418173551906.parquet",
             )
-            .await;
+            .await
+            .unwrap();
         assert_eq!(batches.len(), 1);
         assert_eq!(batches.first().unwrap().num_rows(), 4);
         assert_eq!(batches.first().unwrap().num_columns(), 21);
@@ -324,10 +310,10 @@ mod tests {
     #[tokio::test]
     async fn hudi_table_get_latest_file_paths() {
         let base_url = TestTable::V6ComplexkeygenHivestyle.url();
-        let mut hudi_table = Table::new(base_url.path(), HashMap::new()).await;
-        assert_eq!(hudi_table.get_timeline().await.unwrap().instants.len(), 2);
+        let hudi_table = Table::new(base_url.path(), 
HashMap::new()).await.unwrap();
+        assert_eq!(hudi_table.timeline.instants.len(), 2);
         let actual: HashSet<String> =
-            
HashSet::from_iter(hudi_table.get_latest_file_paths().await.unwrap());
+            HashSet::from_iter(hudi_table.get_file_paths().await.unwrap());
         let expected: HashSet<String> = HashSet::from_iter(vec![
             
"byteField=10/shortField=300/a22e8257-e249-45e9-ba46-115bc85adcba-0_0-161-223_20240418173235694.parquet",
             
"byteField=20/shortField=100/bb7c3a45-387f-490d-aab2-981c3f1a8ada-0_0-140-198_20240418173213674.parquet",
@@ -342,7 +328,9 @@ mod tests {
     async fn hudi_table_get_table_metadata() {
         let base_path =
             
canonicalize(Path::new("fixtures/table_metadata/sample_table_properties")).unwrap();
-        let table = Table::new(base_path.to_str().unwrap(), 
HashMap::new()).await;
+        let table = Table::new(base_path.to_str().unwrap(), HashMap::new())
+            .await
+            .unwrap();
         assert_eq!(table.base_file_format(), Parquet);
         assert_eq!(table.checksum(), 3761586722);
         assert_eq!(table.database_name(), "default");
diff --git a/crates/core/src/timeline/mod.rs b/crates/core/src/table/timeline.rs
similarity index 69%
rename from crates/core/src/timeline/mod.rs
rename to crates/core/src/table/timeline.rs
index 311751a..9dcf6e2 100644
--- a/crates/core/src/timeline/mod.rs
+++ b/crates/core/src/table/timeline.rs
@@ -17,12 +17,14 @@
  * under the License.
  */
 
+use std::cmp::Ordering;
 use std::collections::HashMap;
 use std::fmt::Debug;
 use std::path::PathBuf;
+use std::sync::Arc;
 
 use anyhow::{anyhow, Result};
-use arrow_schema::SchemaRef;
+use arrow_schema::Schema;
 use parquet::arrow::parquet_to_arrow_schema;
 use serde_json::{Map, Value};
 use url::Url;
@@ -31,20 +33,32 @@ use crate::storage::utils::split_filename;
 use crate::storage::Storage;
 
 #[allow(dead_code)]
-#[derive(Debug, Clone, PartialEq)]
+#[derive(Clone, Debug, Eq, PartialEq)]
 pub enum State {
     Requested,
     Inflight,
     Completed,
 }
 
-#[derive(Debug, Clone, PartialEq)]
+#[derive(Clone, Debug, Eq, PartialEq)]
 pub struct Instant {
     state: State,
     action: String,
     timestamp: String,
 }
 
+impl PartialOrd for Instant {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        Some(self.timestamp.cmp(&other.timestamp))
+    }
+}
+
+impl Ord for Instant {
+    fn cmp(&self, other: &Self) -> Ordering {
+        self.timestamp.cmp(&other.timestamp)
+    }
+}
+
 impl Instant {
     pub fn state_suffix(&self) -> String {
         match self.state {
@@ -59,20 +73,30 @@ impl Instant {
     }
 }
 
-#[derive(Debug, Clone)]
+#[derive(Clone, Debug)]
+#[allow(dead_code)]
 pub struct Timeline {
-    pub base_url: Url,
+    props: Arc<HashMap<String, String>>,
+    storage: Arc<Storage>,
     pub instants: Vec<Instant>,
 }
 
 impl Timeline {
-    pub async fn new(base_url: Url) -> Result<Self> {
-        let instants = Self::load_completed_commit_instants(&base_url).await?;
-        Ok(Self { base_url, instants })
+    pub async fn new(
+        base_url: Arc<Url>,
+        storage_options: Arc<HashMap<String, String>>,
+        props: Arc<HashMap<String, String>>,
+    ) -> Result<Self> {
+        let storage = Storage::new(base_url, storage_options)?;
+        let instants = Self::load_completed_commit_instants(&storage).await?;
+        Ok(Self {
+            storage,
+            props,
+            instants,
+        })
     }
 
-    async fn load_completed_commit_instants(base_url: &Url) -> Result<Vec<Instant>> {
-        let storage = Storage::new(base_url.clone(), HashMap::new());
+    async fn load_completed_commit_instants(storage: &Storage) -> Result<Vec<Instant>> {
         let mut completed_commits = Vec::new();
         for file_info in storage.list_files(Some(".hoodie")).await {
             let (file_stem, file_ext) = split_filename(file_info.name.as_str())?;
@@ -84,8 +108,7 @@ impl Timeline {
                 })
             }
         }
-        // TODO: encapsulate sorting within Instant
-        completed_commits.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
+        completed_commits.sort();
         Ok(completed_commits)
     }
 
@@ -94,10 +117,11 @@ impl Timeline {
             Some(instant) => {
                 let mut commit_file_path = PathBuf::from(".hoodie");
                 commit_file_path.push(instant.file_name());
-                let storage = Storage::new(self.base_url.clone(), HashMap::new());
-                let bytes = storage
-                    .get_file_data(commit_file_path.to_str().unwrap())
-                    .await;
+                let relative_path = commit_file_path.to_str().ok_or(anyhow!(
+                    "Failed to get commit file path for instant: {:?}",
+                    instant
+                ));
+                let bytes = self.storage.get_file_data(relative_path?).await;
                 let json: Value = serde_json::from_slice(&bytes)?;
                 let commit_metadata = json
                     .as_object()
@@ -109,20 +133,19 @@ impl Timeline {
         }
     }
 
-    pub async fn get_latest_schema(&self) -> Result<SchemaRef> {
-        let commit_metadata = self.get_latest_commit_metadata().await.unwrap();
+    pub async fn get_latest_schema(&self) -> Result<Schema> {
+        let commit_metadata = self.get_latest_commit_metadata().await?;
         if let Some(partition_to_write_stats) = commit_metadata["partitionToWriteStats"].as_object()
         {
             if let Some((_, value)) = partition_to_write_stats.iter().next() {
                 if let Some(first_value) = value.as_array().and_then(|arr| arr.first()) {
                     if let Some(path) = first_value["path"].as_str() {
-                        let storage = Storage::new(self.base_url.clone(), HashMap::new());
-                        let parquet_meta = storage.get_parquet_file_metadata(path).await;
+                        let parquet_meta = self.storage.get_parquet_file_metadata(path).await;
                         let arrow_schema = parquet_to_arrow_schema(
                             parquet_meta.file_metadata().schema_descr(),
                             None,
                         )?;
-                        return Ok(SchemaRef::from(arrow_schema));
+                        return Ok(arrow_schema);
                     }
                 }
             }
@@ -133,19 +156,27 @@ impl Timeline {
 
 #[cfg(test)]
 mod tests {
+    use std::collections::HashMap;
     use std::fs::canonicalize;
     use std::path::Path;
+    use std::sync::Arc;
 
     use url::Url;
 
     use hudi_tests::TestTable;
 
-    use crate::timeline::{Instant, State, Timeline};
+    use crate::table::timeline::{Instant, State, Timeline};
 
     #[tokio::test]
     async fn read_latest_schema() {
         let base_url = TestTable::V6Nonpartitioned.url();
-        let timeline = Timeline::new(base_url).await.unwrap();
+        let timeline = Timeline::new(
+            Arc::new(base_url),
+            Arc::new(HashMap::new()),
+            Arc::new(HashMap::new()),
+        )
+        .await
+        .unwrap();
         let table_schema = timeline.get_latest_schema().await.unwrap();
         assert_eq!(table_schema.fields.len(), 21)
     }
@@ -155,7 +186,13 @@ mod tests {
         let base_url =
             Url::from_file_path(canonicalize(Path::new("fixtures/timeline/commits_stub")).unwrap())
                 .unwrap();
-        let timeline = Timeline::new(base_url).await.unwrap();
+        let timeline = Timeline::new(
+            Arc::new(base_url),
+            Arc::new(HashMap::new()),
+            Arc::new(HashMap::new()),
+        )
+        .await
+        .unwrap();
         assert_eq!(
             timeline.instants,
             vec![
diff --git a/crates/datafusion/Cargo.toml b/crates/datafusion/Cargo.toml
index 84c17a5..bb1f5df 100644
--- a/crates/datafusion/Cargo.toml
+++ b/crates/datafusion/Cargo.toml
@@ -59,6 +59,7 @@ serde_json = { workspace = true }
 tokio = { workspace = true }
 
 # "stdlib"
+anyhow = { workspace = true }
 bytes = { workspace = true }
 chrono = { workspace = true, default-features = false, features = ["clock"] }
 hashbrown = "0.14.3"
diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs
index f064961..f4a1bba 100644
--- a/crates/datafusion/src/lib.rs
+++ b/crates/datafusion/src/lib.rs
@@ -39,13 +39,20 @@ use hudi_core::HudiTable;
 
 #[derive(Debug, Clone)]
 pub struct HudiDataSource {
-    table: HudiTable,
+    table: Arc<HudiTable>,
 }
 
 impl HudiDataSource {
-    pub async fn new(base_uri: &str, storage_options: HashMap<String, String>) -> Self {
-        Self {
-            table: HudiTable::new(base_uri, storage_options).await,
+    pub async fn new(
+        base_uri: &str,
+        storage_options: HashMap<String, String>,
+    ) -> datafusion_common::Result<Self> {
+        match HudiTable::new(base_uri, storage_options).await {
+            Ok(t) => Ok(Self { table: Arc::new(t) }),
+            Err(e) => Err(DataFusionError::Execution(format!(
+                "Failed to create Hudi table: {}",
+                e
+            ))),
         }
     }
 
@@ -58,20 +65,18 @@ impl HudiDataSource {
     }
 
     async fn get_record_batches(&mut self) -> datafusion_common::Result<Vec<RecordBatch>> {
-        match self.table.get_latest_file_slices().await {
-            Ok(file_slices) => {
-                let mut record_batches = Vec::new();
-                for f in file_slices {
-                    let relative_path = f.base_file_relative_path();
-                    let records = 
self.table.read_file_slice(&relative_path).await;
-                    record_batches.extend(records)
-                }
-                Ok(record_batches)
-            }
-            Err(_e) => Err(DataFusionError::Execution(
-                "Failed to read records from table.".to_owned(),
-            )),
+        let file_slices = self.table.get_file_slices().await.map_err(|e| {
+            DataFusionError::Execution(format!("Failed to load file slices from table: {}", e))
+        })?;
+
+        let mut record_batches = Vec::new();
+        for fsl in file_slices {
+            let batches = self.table.read_file_slice(&fsl).await.map_err(|e| {
+                DataFusionError::Execution(format!("Failed to read records from table: {}", e))
+            })?;
+            record_batches.extend(batches)
         }
+        Ok(record_batches)
     }
 }
 
@@ -85,9 +90,9 @@ impl TableProvider for HudiDataSource {
         let table = self.table.clone();
         let handle = thread::spawn(move || {
             let rt = tokio::runtime::Runtime::new().unwrap();
-            rt.block_on(async { table.get_latest_schema().await })
+            rt.block_on(async { table.get_schema().await })
         });
-        handle.join().unwrap()
+        SchemaRef::from(handle.join().unwrap().unwrap())
     }
 
     fn table_type(&self) -> TableType {
@@ -196,7 +201,9 @@ mod tests {
         );
         let ctx = SessionContext::new_with_config(config);
         let base_url = TestTable::V6ComplexkeygenHivestyle.url();
-        let hudi = HudiDataSource::new(base_url.path(), HashMap::new()).await;
+        let hudi = HudiDataSource::new(base_url.path(), HashMap::new())
+            .await
+            .unwrap();
         ctx.register_table("hudi_table_complexkeygen", Arc::new(hudi))
             .unwrap();
         let df: DataFrame = ctx
diff --git a/python/Cargo.toml b/python/Cargo.toml
index 3db2cc1..8f1d079 100644
--- a/python/Cargo.toml
+++ b/python/Cargo.toml
@@ -42,13 +42,15 @@ arrow-row = { workspace = true }
 arrow-schema = { workspace = true }
 arrow-select = { workspace = true }
 
+anyhow = { workspace = true }
+
 # runtime / async
 futures = { workspace = true }
 tokio = { workspace = true }
 
 [dependencies.pyo3]
 version = "0.20.3"
-features = ["extension-module", "abi3", "abi3-py38"]
+features = ["extension-module", "abi3", "abi3-py38", "anyhow"]
 
 [dependencies.hudi]
 path = "../crates/hudi"
diff --git a/python/hudi/_internal.pyi b/python/hudi/_internal.pyi
index b34c25a..83ed929 100644
--- a/python/hudi/_internal.pyi
+++ b/python/hudi/_internal.pyi
@@ -30,10 +30,10 @@ class HudiFileSlice:
     partition_path: str
     commit_time: str
     base_file_name: str
-    base_file_path: str
     base_file_size: int
     num_records: int
 
+    def base_file_relative_path(self) -> str: ...
 
 class BindingHudiTable:
 
diff --git a/python/src/lib.rs b/python/src/lib.rs
index 89851bf..2436ee5 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -21,6 +21,7 @@ use std::collections::HashMap;
 use std::path::PathBuf;
 use std::sync::OnceLock;
 
+use anyhow::anyhow;
 use arrow::pyarrow::ToPyArrow;
 use pyo3::prelude::*;
 use tokio::runtime::Runtime;
@@ -29,6 +30,7 @@ use hudi::file_group::FileSlice;
 use hudi::HudiTable;
 
 #[cfg(not(tarpaulin))]
+#[derive(Clone, Debug)]
 #[pyclass]
 struct HudiFileSlice {
     #[pyo3(get)]
@@ -40,32 +42,45 @@ struct HudiFileSlice {
     #[pyo3(get)]
     base_file_name: String,
     #[pyo3(get)]
-    base_file_path: String,
-    #[pyo3(get)]
     base_file_size: usize,
     #[pyo3(get)]
     num_records: i64,
 }
 
 #[cfg(not(tarpaulin))]
+#[pymethods]
 impl HudiFileSlice {
-    pub fn from_file_slice(f: FileSlice) -> Self {
-        let partition_path = 
f.partition_path.clone().unwrap_or("".to_string());
-        let mut p = PathBuf::from(&partition_path);
-        p.push(f.base_file.info.name.clone());
-        let base_file_path = p.to_str().unwrap().to_string();
-        Self {
-            file_group_id: f.file_group_id().to_string(),
-            partition_path,
-            commit_time: f.base_file.commit_time,
-            base_file_name: f.base_file.info.name,
-            base_file_path,
-            base_file_size: f.base_file.info.size,
-            num_records: f.base_file.stats.unwrap().num_records,
+    pub fn base_file_relative_path(&self) -> PyResult<String> {
+        let mut p = PathBuf::from(&self.partition_path);
+        p.push(&self.base_file_name);
+        match p.to_str() {
+            Some(s) => Ok(s.to_string()),
+            None => Err(PyErr::from(anyhow!(
+                "Failed to get base file relative path for file slice: {:?}",
+                self
+            ))),
         }
     }
 }
 
+#[cfg(not(tarpaulin))]
+fn convert_file_slice(f: &FileSlice) -> HudiFileSlice {
+    let file_group_id = f.file_group_id().to_string();
+    let partition_path = f.partition_path.as_deref().unwrap_or_default().to_string();
+    let commit_time = f.base_file.commit_time.to_string();
+    let base_file_name = f.base_file.info.name.clone();
+    let base_file_size = f.base_file.info.size;
+    let num_records = f.base_file.stats.clone().unwrap_or_default().num_records;
+    HudiFileSlice {
+        file_group_id,
+        partition_path,
+        commit_time,
+        base_file_name,
+        base_file_size,
+        num_records,
+    }
+}
+
 #[cfg(not(tarpaulin))]
 #[pyclass]
 struct BindingHudiTable {
@@ -81,32 +96,23 @@ impl BindingHudiTable {
         let _table = rt().block_on(HudiTable::new(
             table_uri,
             storage_options.unwrap_or_default(),
-        ));
+        ))?;
         Ok(BindingHudiTable { _table })
     }
 
     pub fn schema(&self, py: Python) -> PyResult<PyObject> {
-        rt().block_on(self._table.get_latest_schema())
-            .to_pyarrow(py)
+        rt().block_on(self._table.get_schema())?.to_pyarrow(py)
     }
 
     pub fn get_latest_file_slices(&mut self, py: Python) -> PyResult<Vec<HudiFileSlice>> {
         py.allow_threads(|| {
-            let res = rt().block_on(self._table.get_latest_file_slices());
-            match res {
-                Ok(file_slices) => Ok(file_slices
-                    .into_iter()
-                    .map(HudiFileSlice::from_file_slice)
-                    .collect()),
-                Err(_e) => {
-                    panic!("Failed to retrieve the latest file slices.")
-                }
-            }
+            let file_slices = rt().block_on(self._table.get_file_slices())?;
+            Ok(file_slices.iter().map(convert_file_slice).collect())
         })
     }
 
     pub fn read_file_slice(&mut self, relative_path: &str, py: Python) -> PyResult<PyObject> {
-        rt().block_on(self._table.read_file_slice(relative_path))
+        rt().block_on(self._table.read_file_slice_by_path(relative_path))?
             .to_pyarrow(py)
     }
 }
diff --git a/python/tests/test_table_read.py b/python/tests/test_table_read.py
index 20ce42c..f0266f0 100644
--- a/python/tests/test_table_read.py
+++ b/python/tests/test_table_read.py
@@ -36,7 +36,7 @@ def test_sample_table(get_sample_table):
     assert len(file_slices) == 5
     assert set(f.commit_time for f in file_slices) == {'20240402123035233', '20240402144910683'}
     assert all(f.num_records == 1 for f in file_slices)
-    file_slice_paths = [f.base_file_path for f in file_slices]
+    file_slice_paths = [f.base_file_relative_path() for f in file_slices]
     assert set(file_slice_paths) == 
{'chennai/68d3c349-f621-4cd8-9e8b-c6dd8eb20d08-0_4-12-0_20240402123035233.parquet',
                                 
'san_francisco/d9082ffd-2eb1-4394-aefc-deb4a61ecc57-0_1-9-0_20240402123035233.parquet',
                                 
'san_francisco/780b8586-3ad0-48ef-a6a1-d2217845ce4a-0_0-8-0_20240402123035233.parquet',


Reply via email to