This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new d84854c  feat: use `object_store` for common storage APIs (#25)
d84854c is described below

commit d84854c2b3d58252aba9701f320432714cdc3b29
Author: Shiyan Xu <[email protected]>
AuthorDate: Fri Jun 21 17:28:45 2024 -0500

    feat: use `object_store` for common storage APIs (#25)
---
 Cargo.toml                                         |   4 +-
 crates/core/Cargo.toml                             |   7 +
 .../core/fixtures/timeline/commits_stub/a.parquet  |   0
 .../fixtures/timeline/commits_stub/part1/b.parquet |   0
 .../timeline/commits_stub/part2/part22/c.parquet   |   0
 .../commits_stub/part3/part32/part33/d.parquet     |   0
 crates/core/src/error.rs                           |   9 -
 crates/core/src/file_group/mod.rs                  |   3 +-
 crates/core/src/lib.rs                             |   2 +-
 .../src/{utils.rs => storage/file_metadata.rs}     |  30 ++-
 crates/core/src/storage/mod.rs                     | 217 +++++++++++++++++++++
 crates/core/src/table/config.rs                    |   8 +-
 crates/core/src/table/fs_view.rs                   | 136 +++++++------
 crates/core/src/table/mod.rs                       |   2 +-
 14 files changed, 317 insertions(+), 101 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 29294d8..4a9f419 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -53,10 +53,11 @@ datafusion-sql = { version = "35" }
 datafusion-physical-expr = { version = "35" }
 
 # serde
-serde = { version = "1.0.194", features = ["derive"] }
+serde = { version = "1.0.203", features = ["derive"] }
 serde_json = "1"
 
 # "stdlib"
+anyhow = { version = "1.0.86" }
 bytes = { version = "1" }
 chrono = { version = "=0.4.34", default-features = false, features = ["clock"] }
 tracing = { version = "0.1", features = ["log"] }
@@ -67,6 +68,7 @@ uuid = { version = "1" }
 
 # runtime / async
 async-trait = { version = "0.1" }
+async-recursion = { version = "1.1.1" }
 futures = { version = "0.3" }
 tokio = { version = "1" }
 num_cpus = { version = "1" }
diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml
index e2f9bf0..f97a526 100644
--- a/crates/core/Cargo.toml
+++ b/crates/core/Cargo.toml
@@ -36,6 +36,7 @@ arrow-ord = { workspace = true }
 arrow-row = { workspace = true }
 arrow-schema = { workspace = true, features = ["serde"] }
 arrow-select = { workspace = true }
+object_store = { workspace = true }
 parquet = { workspace = true, features = [
     "async",
     "object_store",
@@ -55,6 +56,7 @@ serde = { workspace = true, features = ["derive"] }
 serde_json = { workspace = true }
 
 # "stdlib"
+anyhow = { workspace = true }
 bytes = { workspace = true }
 chrono = { workspace = true, default-features = false, features = ["clock"] }
 hashbrown = "0.14.3"
@@ -63,6 +65,11 @@ thiserror = { workspace = true }
 uuid = { workspace = true, features = ["serde", "v4"] }
 url = { workspace = true }
 
+# runtime / async
+async-recursion = { workspace = true }
+async-trait = { workspace = true }
+tokio = { workspace = true }
+
 # test
 tempfile = "3.10.1"
 zip-extract = "0.1.3"
diff --git a/crates/core/fixtures/timeline/commits_stub/a.parquet b/crates/core/fixtures/timeline/commits_stub/a.parquet
new file mode 100644
index 0000000..e69de29
diff --git a/crates/core/fixtures/timeline/commits_stub/part1/b.parquet b/crates/core/fixtures/timeline/commits_stub/part1/b.parquet
new file mode 100644
index 0000000..e69de29
diff --git a/crates/core/fixtures/timeline/commits_stub/part2/part22/c.parquet b/crates/core/fixtures/timeline/commits_stub/part2/part22/c.parquet
new file mode 100644
index 0000000..e69de29
diff --git a/crates/core/fixtures/timeline/commits_stub/part3/part32/part33/d.parquet b/crates/core/fixtures/timeline/commits_stub/part3/part32/part33/d.parquet
new file mode 100644
index 0000000..e69de29
diff --git a/crates/core/src/error.rs b/crates/core/src/error.rs
index 4a2f0f2..e8f76c9 100644
--- a/crates/core/src/error.rs
+++ b/crates/core/src/error.rs
@@ -17,7 +17,6 @@
  * under the License.
  */
 
-use std::error::Error;
 use std::fmt::Debug;
 
 use thiserror::Error;
@@ -28,18 +27,10 @@ pub enum HudiFileGroupError {
     CommitTimeAlreadyExists(String, String),
 }
 
-#[derive(Debug, Error)]
-pub enum HudiFileSystemViewError {
-    #[error("Error in loading partitions: {0}")]
-    FailToLoadPartitions(Box<dyn Error>),
-}
-
 #[derive(Debug, Error)]
 pub enum HudiCoreError {
     #[error("Failed to load file group")]
     FailToLoadFileGroup(#[from] HudiFileGroupError),
-    #[error("Failed to build file system view")]
-    FailToBuildFileSystemView(#[from] HudiFileSystemViewError),
     #[error("Failed to load table properties")]
     LoadTablePropertiesError,
 }
diff --git a/crates/core/src/file_group/mod.rs b/crates/core/src/file_group/mod.rs
index 29761bd..b71c0ce 100644
--- a/crates/core/src/file_group/mod.rs
+++ b/crates/core/src/file_group/mod.rs
@@ -21,10 +21,9 @@ use std::collections::BTreeMap;
 use std::fmt;
 use std::fmt::Formatter;
 
-use hudi_fs::file_systems::FileMetadata;
-
 use crate::error::HudiFileGroupError;
 use crate::error::HudiFileGroupError::CommitTimeAlreadyExists;
+use crate::storage::file_metadata::FileMetadata;
 
 #[derive(Clone, Debug)]
 pub struct BaseFile {
diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs
index 86a191f..3198cbc 100644
--- a/crates/core/src/lib.rs
+++ b/crates/core/src/lib.rs
@@ -23,8 +23,8 @@ mod error;
 mod file_group;
 pub mod table;
 pub type HudiTable = Table;
+mod storage;
 mod timeline;
-mod utils;
 
 pub fn crate_version() -> &'static str {
     env!("CARGO_PKG_VERSION")
diff --git a/crates/core/src/utils.rs b/crates/core/src/storage/file_metadata.rs
similarity index 60%
rename from crates/core/src/utils.rs
rename to crates/core/src/storage/file_metadata.rs
index c4189c5..ee80a03 100644
--- a/crates/core/src/utils.rs
+++ b/crates/core/src/storage/file_metadata.rs
@@ -17,23 +17,21 @@
  * under the License.
  */
 
-use std::path::{Path, PathBuf};
-use std::{fs, io};
+#[derive(Clone, Debug, Default, Eq, PartialEq)]
+pub struct FileMetadata {
+    pub path: String,
+    pub name: String,
+    pub size: usize,
+    pub num_records: Option<usize>,
+}
 
-pub fn get_leaf_dirs(path: &Path) -> Result<Vec<PathBuf>, io::Error> {
-    let mut leaf_dirs = Vec::new();
-    let mut is_leaf_dir = true;
-    for entry in fs::read_dir(path)? {
-        let entry = entry?;
-        if entry.path().is_dir() {
-            is_leaf_dir = false;
-            let curr_sub_dir = entry.path();
-            let curr = get_leaf_dirs(&curr_sub_dir)?;
-            leaf_dirs.extend(curr);
+impl FileMetadata {
+    pub fn new(path: String, name: String, size: usize) -> FileMetadata {
+        FileMetadata {
+            path,
+            name,
+            size,
+            num_records: None,
         }
     }
-    if is_leaf_dir {
-        leaf_dirs.push(path.to_path_buf())
-    }
-    Ok(leaf_dirs)
 }
diff --git a/crates/core/src/storage/mod.rs b/crates/core/src/storage/mod.rs
new file mode 100644
index 0000000..97df419
--- /dev/null
+++ b/crates/core/src/storage/mod.rs
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use std::collections::HashMap;
+use std::path::PathBuf;
+
+use async_recursion::async_recursion;
+use object_store::path::Path as ObjPath;
+use object_store::{parse_url_opts, DynObjectStore, ObjectStore};
+use url::Url;
+
+use crate::storage::file_metadata::FileMetadata;
+
+pub(crate) mod file_metadata;
+
+#[allow(dead_code)]
+pub struct Storage {
+    base_url: Url,
+    object_store: Box<DynObjectStore>,
+    options: HashMap<String, String>,
+}
+
+#[allow(dead_code)]
+impl Storage {
+    pub fn new(base_uri: &str, options: HashMap<String, String>) -> Box<Storage> {
+        let base_url = Url::from_file_path(PathBuf::from(base_uri).as_path()).unwrap();
+        let object_store = parse_url_opts(&base_url, &options).unwrap().0;
+        Box::from(Storage {
+            base_url,
+            object_store,
+            options,
+        })
+    }
+
+    pub async fn get_file_metadata(&self, path: &str) -> FileMetadata {
+        let p = ObjPath::from(path);
+        let meta = self.object_store.head(&p).await.unwrap();
+        FileMetadata {
+            path: meta.location.to_string(),
+            name: p.filename().unwrap().to_string(),
+            size: meta.size,
+            num_records: None,
+        }
+    }
+
+    pub async fn list_dirs(&self, subdir: Option<&str>) -> Vec<String> {
+        self.list_dirs_as_paths(subdir)
+            .await
+            .into_iter()
+            .map(|p| p.filename().unwrap().to_string())
+            .collect()
+    }
+
+    pub async fn list_dirs_as_paths(&self, subdir: Option<&str>) -> Vec<ObjPath> {
+        let mut prefix_url = self.base_url.clone();
+        if let Some(subdir) = subdir {
+            prefix_url.path_segments_mut().unwrap().push(subdir);
+        }
+        let prefix = ObjPath::from_url_path(prefix_url.path()).unwrap();
+        self.object_store
+            .list_with_delimiter(Some(&prefix))
+            .await
+            .unwrap()
+            .common_prefixes
+    }
+
+    pub async fn list_files(&self, subdir: Option<&str>) -> Vec<FileMetadata> {
+        let mut prefix_url = self.base_url.clone();
+        if let Some(subdir) = subdir {
+            prefix_url.path_segments_mut().unwrap().push(subdir);
+        }
+        let prefix = ObjPath::from_url_path(prefix_url.path()).unwrap();
+        self.object_store
+            .list_with_delimiter(Some(&prefix))
+            .await
+            .unwrap()
+            .objects
+            .into_iter()
+            .map(|obj_meta| {
+                FileMetadata::new(
+                    obj_meta.location.to_string(),
+                    obj_meta.location.filename().unwrap().to_string(),
+                    obj_meta.size,
+                )
+            })
+            .collect()
+    }
+}
+
+#[async_recursion]
+pub async fn get_leaf_dirs(storage: &Storage, subdir: Option<&str>) -> Vec<String> {
+    let mut leaf_dirs = Vec::new();
+    let child_dirs = storage.list_dirs(subdir).await;
+    if child_dirs.is_empty() {
+        leaf_dirs.push(subdir.unwrap().to_owned());
+    } else {
+        for child_dir in child_dirs {
+            let mut next_subdir = PathBuf::new();
+            if let Some(curr) = subdir {
+                next_subdir.push(curr);
+            }
+            next_subdir.push(child_dir);
+            let curr_leaf_dir = get_leaf_dirs(storage, Some(next_subdir.to_str().unwrap())).await;
+            leaf_dirs.extend(curr_leaf_dir);
+        }
+    }
+    leaf_dirs
+}
+
+#[cfg(test)]
+mod tests {
+    use object_store::path::Path as ObjPath;
+    use std::collections::{HashMap, HashSet};
+    use std::fs::canonicalize;
+    use std::path::Path;
+
+    use url::Url;
+
+    use crate::storage::{get_leaf_dirs, Storage};
+
+    #[tokio::test]
+    async fn storage_list_dirs() {
+        let base_url = Url::from_directory_path(
+            canonicalize(Path::new("fixtures/timeline/commits_stub")).unwrap(),
+        )
+        .unwrap();
+        let storage = Storage::new(base_url.path(), HashMap::new());
+        let first_level_dirs: HashSet<String> = storage.list_dirs(None).await.into_iter().collect();
+        assert_eq!(
+            first_level_dirs,
+            vec![".hoodie", "part1", "part2", "part3"]
+                .into_iter()
+                .map(String::from)
+                .collect()
+        );
+        let second_level_dirs: Vec<String> = storage.list_dirs(Some("part2")).await;
+        assert_eq!(second_level_dirs, vec!["part22"]);
+        let no_dirs = storage.list_dirs(Some("part1")).await;
+        assert!(no_dirs.is_empty());
+    }
+
+    #[tokio::test]
+    async fn storage_list_dirs_as_paths() {
+        let base_url = Url::from_directory_path(
+            canonicalize(Path::new("fixtures/timeline/commits_stub")).unwrap(),
+        )
+        .unwrap();
+        let storage = Storage::new(base_url.path(), HashMap::new());
+        let first_level_dirs: HashSet<ObjPath> =
+            storage.list_dirs_as_paths(None).await.into_iter().collect();
+        let expected_paths: HashSet<ObjPath> = vec![".hoodie", "part1", "part2", "part3"]
+            .into_iter()
+            .map(|dir| ObjPath::from_url_path(base_url.join(dir).unwrap().path()).unwrap())
+            .collect();
+        assert_eq!(first_level_dirs, expected_paths);
+    }
+
+    #[tokio::test]
+    async fn storage_list_files() {
+        let base_url = Url::from_directory_path(
+            canonicalize(Path::new("fixtures/timeline/commits_stub")).unwrap(),
+        )
+        .unwrap();
+        let storage = Storage::new(base_url.path(), HashMap::new());
+        let file_names_1: Vec<String> = storage
+            .list_files(None)
+            .await
+            .into_iter()
+            .map(|file_metadata| file_metadata.name)
+            .collect();
+        assert_eq!(file_names_1, vec!["a.parquet"]);
+        let file_names_2: Vec<String> = storage
+            .list_files(Some("part1"))
+            .await
+            .into_iter()
+            .map(|file_metadata| file_metadata.name)
+            .collect();
+        assert_eq!(file_names_2, vec!["b.parquet"]);
+        let file_names_3: Vec<String> = storage
+            .list_files(Some("part2/part22"))
+            .await
+            .into_iter()
+            .map(|file_metadata| file_metadata.name)
+            .collect();
+        assert_eq!(file_names_3, vec!["c.parquet"]);
+    }
+
+    #[tokio::test]
+    async fn use_storage_to_get_leaf_dirs() {
+        let base_url = Url::from_directory_path(
+            canonicalize(Path::new("fixtures/timeline/commits_stub")).unwrap(),
+        )
+        .unwrap();
+        let storage = Storage::new(base_url.path(), HashMap::new());
+        let leaf_dirs = get_leaf_dirs(&storage, None).await;
+        assert_eq!(
+            leaf_dirs,
+            vec![".hoodie", "part1", "part2/part22", "part3/part32/part33"]
+        );
+    }
+}
diff --git a/crates/core/src/table/config.rs b/crates/core/src/table/config.rs
index 05a75db..2f64cc0 100644
--- a/crates/core/src/table/config.rs
+++ b/crates/core/src/table/config.rs
@@ -71,12 +71,8 @@ impl FromStr for TableType {
 
     fn from_str(s: &str) -> Result<Self, Self::Err> {
         match s.to_ascii_lowercase().as_str() {
-            "copy_on_write" => Ok(Self::CopyOnWrite),
-            "copy-on-write" => Ok(Self::CopyOnWrite),
-            "cow" => Ok(Self::CopyOnWrite),
-            "merge_on_read" => Ok(Self::MergeOnRead),
-            "merge-on-read" => Ok(Self::MergeOnRead),
-            "mor" => Ok(Self::MergeOnRead),
+            "copy_on_write" | "copy-on-write" | "cow" => Ok(Self::CopyOnWrite),
+            "merge_on_read" | "merge-on-read" | "mor" => Ok(Self::MergeOnRead),
             _ => Err(HudiCoreError::LoadTablePropertiesError),
         }
     }
diff --git a/crates/core/src/table/fs_view.rs b/crates/core/src/table/fs_view.rs
index f672dfc..ae8812b 100644
--- a/crates/core/src/table/fs_view.rs
+++ b/crates/core/src/table/fs_view.rs
@@ -17,18 +17,14 @@
  * under the License.
  */
 
-use std::error::Error;
+use std::collections::HashMap;
 use std::path::{Path, PathBuf};
-use std::{fs, io};
 
-use hashbrown::HashMap;
+use anyhow::{anyhow, Result};
 
-use hudi_fs::file_systems::FileMetadata;
-
-use crate::error::HudiFileSystemViewError;
-use crate::error::HudiFileSystemViewError::FailToLoadPartitions;
 use crate::file_group::{BaseFile, FileGroup, FileSlice};
-use crate::utils::get_leaf_dirs;
+use crate::storage::file_metadata::FileMetadata;
+use crate::storage::{get_leaf_dirs, Storage};
 
 #[derive(Clone, Debug)]
 pub struct FileSystemView {
@@ -37,69 +33,46 @@ pub struct FileSystemView {
 }
 
 impl FileSystemView {
-    pub fn new(base_path: &Path) -> Result<Self, HudiFileSystemViewError> {
-        let mut fs_view = FileSystemView {
+    pub fn new(base_path: &Path) -> Self {
+        FileSystemView {
             base_path: base_path.to_path_buf(),
             partition_to_file_groups: HashMap::new(),
-        };
-        fs_view.load_partitions()?;
-        Ok(fs_view)
-    }
-
-    fn load_partitions(&mut self) -> Result<(), HudiFileSystemViewError> {
-        match self.get_partition_paths() {
-            Ok(partition_paths) => {
-                for p in partition_paths {
-                    match self.get_file_groups(p.as_str()) {
-                        Ok(file_groups) => {
-                            self.partition_to_file_groups.insert(p, file_groups);
-                        }
-                        Err(e) => return Err(FailToLoadPartitions(e)),
-                    }
-                }
-            }
-            Err(e) => return Err(FailToLoadPartitions(Box::new(e))),
         }
-        Ok(())
     }
 
-    pub fn get_partition_paths(&self) -> Result<Vec<String>, io::Error> {
-        let mut first_level_partition_paths: Vec<PathBuf> = Vec::new();
-        for entry in fs::read_dir(self.base_path.as_path())? {
-            let p = entry?.path();
-            if p.is_dir() && p.file_name().and_then(|e| e.to_str()) != Some(".hoodie") {
-                first_level_partition_paths.push(p);
-            }
-        }
-        let mut full_partition_paths: Vec<PathBuf> = Vec::new();
-        for p in first_level_partition_paths {
-            full_partition_paths.extend(get_leaf_dirs(p.as_path())?)
-        }
-        let common_prefix_len = self.base_path.to_str().unwrap().len() + 1;
+    async fn get_partition_paths(&self) -> Result<Vec<String>> {
+        let storage = Storage::new(self.base_path.to_str().unwrap(), HashMap::new());
+        let top_level_dirs: Vec<String> = storage
+            .list_dirs(None)
+            .await
+            .into_iter()
+            .filter(|dir| dir != ".hoodie")
+            .collect();
         let mut partition_paths = Vec::new();
-        for p in full_partition_paths {
-            let full_partition_path = p.to_str().unwrap();
-            partition_paths.push(full_partition_path[common_prefix_len..].to_owned())
+        for dir in top_level_dirs {
+            partition_paths.extend(get_leaf_dirs(&storage, Some(&dir)).await);
         }
         Ok(partition_paths)
     }
 
-    pub fn get_file_groups(&self, partition_path: &str) -> Result<Vec<FileGroup>, Box<dyn Error>> {
-        let mut part_path = self.base_path.to_path_buf();
-        part_path.push(partition_path);
+    async fn get_file_groups(&self, partition_path: &str) -> Result<Vec<FileGroup>> {
+        let storage = Storage::new(self.base_path.to_str().unwrap(), HashMap::new());
+        let file_metadata: Vec<FileMetadata> = storage
+            .list_files(Some(partition_path))
+            .await
+            .into_iter()
+            .filter(|f| f.name.ends_with(".parquet"))
+            .collect();
         let mut fg_id_to_base_files: HashMap<String, Vec<BaseFile>> = HashMap::new();
-        for entry in fs::read_dir(part_path)? {
-            let p = entry?.path();
-            if p.is_file() && p.extension().and_then(|e| e.to_str()) == Some("parquet") {
-                let file_metadata = FileMetadata::from_path(p.as_path())?;
-                let base_file = BaseFile::from_file_metadata(file_metadata);
-                let fg_id = &base_file.file_group_id;
-                fg_id_to_base_files
-                    .entry(fg_id.to_owned())
-                    .or_default()
-                    .push(base_file);
-            }
+        for f in file_metadata {
+            let base_file = BaseFile::from_file_metadata(f);
+            let fg_id = &base_file.file_group_id;
+            fg_id_to_base_files
+                .entry(fg_id.to_owned())
+                .or_default()
+                .push(base_file);
         }
+
         let mut file_groups: Vec<FileGroup> = Vec::new();
         for (fg_id, base_files) in fg_id_to_base_files.into_iter() {
             let mut fg = FileGroup::new(fg_id.to_owned(), Some(partition_path.to_owned()));
@@ -111,8 +84,18 @@ impl FileSystemView {
         Ok(file_groups)
     }
 
-    pub fn get_latest_file_slices(&self) -> Vec<&FileSlice> {
+    pub fn get_latest_file_slices(&mut self) -> Vec<&FileSlice> {
         let mut file_slices = Vec::new();
+        let fs_view = self.clone();
+        let rt = tokio::runtime::Builder::new_current_thread()
+            .enable_all()
+            .build()
+            .unwrap();
+        let wrapper = async { get_partitions_and_file_groups(&fs_view).await };
+        let result = rt.block_on(wrapper).unwrap();
+        for (k, v) in result {
+            self.partition_to_file_groups.insert(k, v);
+        }
         for fgs in self.partition_to_file_groups.values() {
             for fg in fgs {
                 if let Some(file_slice) = fg.get_latest_file_slice() {
@@ -124,6 +107,29 @@ impl FileSystemView {
     }
 }
 
+async fn get_partitions_and_file_groups(
+    fs_view: &FileSystemView,
+) -> Result<HashMap<String, Vec<FileGroup>>> {
+    match fs_view.get_partition_paths().await {
+        Ok(mut partition_paths) => {
+            if partition_paths.is_empty() {
+                partition_paths.push("".to_string());
+            }
+            let mut partition_to_file_groups = HashMap::new();
+            for p in partition_paths {
+                match fs_view.get_file_groups(p.as_str()).await {
+                    Ok(file_groups) => {
+                        partition_to_file_groups.insert(p, file_groups);
+                    }
+                    Err(e) => return Err(anyhow!("Failed to load partitions: {}", e)),
+                }
+            }
+            Ok(partition_to_file_groups)
+        }
+        Err(e) => Err(anyhow!("Failed to load partitions: {}", e)),
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use std::collections::HashSet;
@@ -133,12 +139,12 @@ mod tests {
 
     use crate::table::fs_view::FileSystemView;
 
-    #[test]
-    fn get_partition_paths() {
+    #[tokio::test]
+    async fn get_partition_paths() {
         let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip");
         let target_table_path = extract_test_table(fixture_path);
-        let fs_view = FileSystemView::new(&target_table_path).unwrap();
-        let partition_paths = fs_view.get_partition_paths().unwrap();
+        let fs_view = FileSystemView::new(&target_table_path);
+        let partition_paths = fs_view.get_partition_paths().await.unwrap();
         let partition_path_set: HashSet<&str> =
             HashSet::from_iter(partition_paths.iter().map(|p| p.as_str()));
         assert_eq!(
@@ -151,7 +157,7 @@ mod tests {
     fn get_latest_file_slices() {
         let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip");
         let target_table_path = extract_test_table(fixture_path);
-        let fs_view = FileSystemView::new(&target_table_path).unwrap();
+        let mut fs_view = FileSystemView::new(&target_table_path);
         let file_slices = fs_view.get_latest_file_slices();
         assert_eq!(file_slices.len(), 5);
         let mut fg_ids = Vec::new();
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 2548554..d2584cd 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -101,7 +101,7 @@ impl Table {
 
     pub fn get_latest_file_slices(&self) -> Result<Vec<FileSlice>, Box<dyn Error>> {
         let mut file_slices = Vec::new();
-        let fs_view = FileSystemView::new(self.base_path.as_path())?;
+        let mut fs_view = FileSystemView::new(self.base_path.as_path());
         for f in fs_view.get_latest_file_slices() {
             file_slices.push(f.clone());
         }

Reply via email to