This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
new d84854c feat: use `object_store` for common storage APIs (#25)
d84854c is described below
commit d84854c2b3d58252aba9701f320432714cdc3b29
Author: Shiyan Xu <[email protected]>
AuthorDate: Fri Jun 21 17:28:45 2024 -0500
feat: use `object_store` for common storage APIs (#25)
---
Cargo.toml | 4 +-
crates/core/Cargo.toml | 7 +
.../core/fixtures/timeline/commits_stub/a.parquet | 0
.../fixtures/timeline/commits_stub/part1/b.parquet | 0
.../timeline/commits_stub/part2/part22/c.parquet | 0
.../commits_stub/part3/part32/part33/d.parquet | 0
crates/core/src/error.rs | 9 -
crates/core/src/file_group/mod.rs | 3 +-
crates/core/src/lib.rs | 2 +-
.../src/{utils.rs => storage/file_metadata.rs} | 30 ++-
crates/core/src/storage/mod.rs | 217 +++++++++++++++++++++
crates/core/src/table/config.rs | 8 +-
crates/core/src/table/fs_view.rs | 136 +++++++------
crates/core/src/table/mod.rs | 2 +-
14 files changed, 317 insertions(+), 101 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index 29294d8..4a9f419 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -53,10 +53,11 @@ datafusion-sql = { version = "35" }
datafusion-physical-expr = { version = "35" }
# serde
-serde = { version = "1.0.194", features = ["derive"] }
+serde = { version = "1.0.203", features = ["derive"] }
serde_json = "1"
# "stdlib"
+anyhow = { version = "1.0.86" }
bytes = { version = "1" }
chrono = { version = "=0.4.34", default-features = false, features = ["clock"] }
tracing = { version = "0.1", features = ["log"] }
@@ -67,6 +68,7 @@ uuid = { version = "1" }
# runtime / async
async-trait = { version = "0.1" }
+async-recursion = { version = "1.1.1" }
futures = { version = "0.3" }
tokio = { version = "1" }
num_cpus = { version = "1" }
diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml
index e2f9bf0..f97a526 100644
--- a/crates/core/Cargo.toml
+++ b/crates/core/Cargo.toml
@@ -36,6 +36,7 @@ arrow-ord = { workspace = true }
arrow-row = { workspace = true }
arrow-schema = { workspace = true, features = ["serde"] }
arrow-select = { workspace = true }
+object_store = { workspace = true }
parquet = { workspace = true, features = [
"async",
"object_store",
@@ -55,6 +56,7 @@ serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
# "stdlib"
+anyhow = { workspace = true }
bytes = { workspace = true }
chrono = { workspace = true, default-features = false, features = ["clock"] }
hashbrown = "0.14.3"
@@ -63,6 +65,11 @@ thiserror = { workspace = true }
uuid = { workspace = true, features = ["serde", "v4"] }
url = { workspace = true }
+# runtime / async
+async-recursion = { workspace = true }
+async-trait = { workspace = true }
+tokio = { workspace = true }
+
# test
tempfile = "3.10.1"
zip-extract = "0.1.3"
diff --git a/crates/core/fixtures/timeline/commits_stub/a.parquet b/crates/core/fixtures/timeline/commits_stub/a.parquet
new file mode 100644
index 0000000..e69de29
diff --git a/crates/core/fixtures/timeline/commits_stub/part1/b.parquet b/crates/core/fixtures/timeline/commits_stub/part1/b.parquet
new file mode 100644
index 0000000..e69de29
diff --git a/crates/core/fixtures/timeline/commits_stub/part2/part22/c.parquet b/crates/core/fixtures/timeline/commits_stub/part2/part22/c.parquet
new file mode 100644
index 0000000..e69de29
diff --git a/crates/core/fixtures/timeline/commits_stub/part3/part32/part33/d.parquet b/crates/core/fixtures/timeline/commits_stub/part3/part32/part33/d.parquet
new file mode 100644
index 0000000..e69de29
diff --git a/crates/core/src/error.rs b/crates/core/src/error.rs
index 4a2f0f2..e8f76c9 100644
--- a/crates/core/src/error.rs
+++ b/crates/core/src/error.rs
@@ -17,7 +17,6 @@
* under the License.
*/
-use std::error::Error;
use std::fmt::Debug;
use thiserror::Error;
@@ -28,18 +27,10 @@ pub enum HudiFileGroupError {
CommitTimeAlreadyExists(String, String),
}
-#[derive(Debug, Error)]
-pub enum HudiFileSystemViewError {
- #[error("Error in loading partitions: {0}")]
- FailToLoadPartitions(Box<dyn Error>),
-}
-
#[derive(Debug, Error)]
pub enum HudiCoreError {
#[error("Failed to load file group")]
FailToLoadFileGroup(#[from] HudiFileGroupError),
- #[error("Failed to build file system view")]
- FailToBuildFileSystemView(#[from] HudiFileSystemViewError),
#[error("Failed to load table properties")]
LoadTablePropertiesError,
}
diff --git a/crates/core/src/file_group/mod.rs b/crates/core/src/file_group/mod.rs
index 29761bd..b71c0ce 100644
--- a/crates/core/src/file_group/mod.rs
+++ b/crates/core/src/file_group/mod.rs
@@ -21,10 +21,9 @@ use std::collections::BTreeMap;
use std::fmt;
use std::fmt::Formatter;
-use hudi_fs::file_systems::FileMetadata;
-
use crate::error::HudiFileGroupError;
use crate::error::HudiFileGroupError::CommitTimeAlreadyExists;
+use crate::storage::file_metadata::FileMetadata;
#[derive(Clone, Debug)]
pub struct BaseFile {
diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs
index 86a191f..3198cbc 100644
--- a/crates/core/src/lib.rs
+++ b/crates/core/src/lib.rs
@@ -23,8 +23,8 @@ mod error;
mod file_group;
pub mod table;
pub type HudiTable = Table;
+mod storage;
mod timeline;
-mod utils;
pub fn crate_version() -> &'static str {
env!("CARGO_PKG_VERSION")
diff --git a/crates/core/src/utils.rs b/crates/core/src/storage/file_metadata.rs
similarity index 60%
rename from crates/core/src/utils.rs
rename to crates/core/src/storage/file_metadata.rs
index c4189c5..ee80a03 100644
--- a/crates/core/src/utils.rs
+++ b/crates/core/src/storage/file_metadata.rs
@@ -17,23 +17,21 @@
* under the License.
*/
-use std::path::{Path, PathBuf};
-use std::{fs, io};
+#[derive(Clone, Debug, Default, Eq, PartialEq)]
+pub struct FileMetadata {
+ pub path: String,
+ pub name: String,
+ pub size: usize,
+ pub num_records: Option<usize>,
+}
-pub fn get_leaf_dirs(path: &Path) -> Result<Vec<PathBuf>, io::Error> {
- let mut leaf_dirs = Vec::new();
- let mut is_leaf_dir = true;
- for entry in fs::read_dir(path)? {
- let entry = entry?;
- if entry.path().is_dir() {
- is_leaf_dir = false;
- let curr_sub_dir = entry.path();
- let curr = get_leaf_dirs(&curr_sub_dir)?;
- leaf_dirs.extend(curr);
+impl FileMetadata {
+ pub fn new(path: String, name: String, size: usize) -> FileMetadata {
+ FileMetadata {
+ path,
+ name,
+ size,
+ num_records: None,
}
}
- if is_leaf_dir {
- leaf_dirs.push(path.to_path_buf())
- }
- Ok(leaf_dirs)
}
diff --git a/crates/core/src/storage/mod.rs b/crates/core/src/storage/mod.rs
new file mode 100644
index 0000000..97df419
--- /dev/null
+++ b/crates/core/src/storage/mod.rs
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use std::collections::HashMap;
+use std::path::PathBuf;
+
+use async_recursion::async_recursion;
+use object_store::path::Path as ObjPath;
+use object_store::{parse_url_opts, DynObjectStore, ObjectStore};
+use url::Url;
+
+use crate::storage::file_metadata::FileMetadata;
+
+pub(crate) mod file_metadata;
+
+#[allow(dead_code)]
+pub struct Storage {
+ base_url: Url,
+ object_store: Box<DynObjectStore>,
+ options: HashMap<String, String>,
+}
+
+#[allow(dead_code)]
+impl Storage {
+    pub fn new(base_uri: &str, options: HashMap<String, String>) -> Box<Storage> {
+        let base_url = Url::from_file_path(PathBuf::from(base_uri).as_path()).unwrap();
+ let object_store = parse_url_opts(&base_url, &options).unwrap().0;
+ Box::from(Storage {
+ base_url,
+ object_store,
+ options,
+ })
+ }
+
+ pub async fn get_file_metadata(&self, path: &str) -> FileMetadata {
+ let p = ObjPath::from(path);
+ let meta = self.object_store.head(&p).await.unwrap();
+ FileMetadata {
+ path: meta.location.to_string(),
+ name: p.filename().unwrap().to_string(),
+ size: meta.size,
+ num_records: None,
+ }
+ }
+
+ pub async fn list_dirs(&self, subdir: Option<&str>) -> Vec<String> {
+ self.list_dirs_as_paths(subdir)
+ .await
+ .into_iter()
+ .map(|p| p.filename().unwrap().to_string())
+ .collect()
+ }
+
+    pub async fn list_dirs_as_paths(&self, subdir: Option<&str>) -> Vec<ObjPath> {
+ let mut prefix_url = self.base_url.clone();
+ if let Some(subdir) = subdir {
+ prefix_url.path_segments_mut().unwrap().push(subdir);
+ }
+ let prefix = ObjPath::from_url_path(prefix_url.path()).unwrap();
+ self.object_store
+ .list_with_delimiter(Some(&prefix))
+ .await
+ .unwrap()
+ .common_prefixes
+ }
+
+ pub async fn list_files(&self, subdir: Option<&str>) -> Vec<FileMetadata> {
+ let mut prefix_url = self.base_url.clone();
+ if let Some(subdir) = subdir {
+ prefix_url.path_segments_mut().unwrap().push(subdir);
+ }
+ let prefix = ObjPath::from_url_path(prefix_url.path()).unwrap();
+ self.object_store
+ .list_with_delimiter(Some(&prefix))
+ .await
+ .unwrap()
+ .objects
+ .into_iter()
+ .map(|obj_meta| {
+ FileMetadata::new(
+ obj_meta.location.to_string(),
+ obj_meta.location.filename().unwrap().to_string(),
+ obj_meta.size,
+ )
+ })
+ .collect()
+ }
+}
+
+#[async_recursion]
+pub async fn get_leaf_dirs(storage: &Storage, subdir: Option<&str>) -> Vec<String> {
+ let mut leaf_dirs = Vec::new();
+ let child_dirs = storage.list_dirs(subdir).await;
+ if child_dirs.is_empty() {
+ leaf_dirs.push(subdir.unwrap().to_owned());
+ } else {
+ for child_dir in child_dirs {
+ let mut next_subdir = PathBuf::new();
+ if let Some(curr) = subdir {
+ next_subdir.push(curr);
+ }
+ next_subdir.push(child_dir);
+            let curr_leaf_dir = get_leaf_dirs(storage, Some(next_subdir.to_str().unwrap())).await;
+ leaf_dirs.extend(curr_leaf_dir);
+ }
+ }
+ leaf_dirs
+}
+
+#[cfg(test)]
+mod tests {
+ use object_store::path::Path as ObjPath;
+ use std::collections::{HashMap, HashSet};
+ use std::fs::canonicalize;
+ use std::path::Path;
+
+ use url::Url;
+
+ use crate::storage::{get_leaf_dirs, Storage};
+
+ #[tokio::test]
+ async fn storage_list_dirs() {
+ let base_url = Url::from_directory_path(
+ canonicalize(Path::new("fixtures/timeline/commits_stub")).unwrap(),
+ )
+ .unwrap();
+ let storage = Storage::new(base_url.path(), HashMap::new());
+        let first_level_dirs: HashSet<String> = storage.list_dirs(None).await.into_iter().collect();
+ assert_eq!(
+ first_level_dirs,
+ vec![".hoodie", "part1", "part2", "part3"]
+ .into_iter()
+ .map(String::from)
+ .collect()
+ );
+        let second_level_dirs: Vec<String> = storage.list_dirs(Some("part2")).await;
+ assert_eq!(second_level_dirs, vec!["part22"]);
+ let no_dirs = storage.list_dirs(Some("part1")).await;
+ assert!(no_dirs.is_empty());
+ }
+
+ #[tokio::test]
+ async fn storage_list_dirs_as_paths() {
+ let base_url = Url::from_directory_path(
+ canonicalize(Path::new("fixtures/timeline/commits_stub")).unwrap(),
+ )
+ .unwrap();
+ let storage = Storage::new(base_url.path(), HashMap::new());
+ let first_level_dirs: HashSet<ObjPath> =
+ storage.list_dirs_as_paths(None).await.into_iter().collect();
+        let expected_paths: HashSet<ObjPath> = vec![".hoodie", "part1", "part2", "part3"]
+ .into_iter()
+            .map(|dir| ObjPath::from_url_path(base_url.join(dir).unwrap().path()).unwrap())
+ .collect();
+ assert_eq!(first_level_dirs, expected_paths);
+ }
+
+ #[tokio::test]
+ async fn storage_list_files() {
+ let base_url = Url::from_directory_path(
+ canonicalize(Path::new("fixtures/timeline/commits_stub")).unwrap(),
+ )
+ .unwrap();
+ let storage = Storage::new(base_url.path(), HashMap::new());
+ let file_names_1: Vec<String> = storage
+ .list_files(None)
+ .await
+ .into_iter()
+ .map(|file_metadata| file_metadata.name)
+ .collect();
+ assert_eq!(file_names_1, vec!["a.parquet"]);
+ let file_names_2: Vec<String> = storage
+ .list_files(Some("part1"))
+ .await
+ .into_iter()
+ .map(|file_metadata| file_metadata.name)
+ .collect();
+ assert_eq!(file_names_2, vec!["b.parquet"]);
+ let file_names_3: Vec<String> = storage
+ .list_files(Some("part2/part22"))
+ .await
+ .into_iter()
+ .map(|file_metadata| file_metadata.name)
+ .collect();
+ assert_eq!(file_names_3, vec!["c.parquet"]);
+ }
+
+ #[tokio::test]
+ async fn use_storage_to_get_leaf_dirs() {
+ let base_url = Url::from_directory_path(
+ canonicalize(Path::new("fixtures/timeline/commits_stub")).unwrap(),
+ )
+ .unwrap();
+ let storage = Storage::new(base_url.path(), HashMap::new());
+ let leaf_dirs = get_leaf_dirs(&storage, None).await;
+ assert_eq!(
+ leaf_dirs,
+ vec![".hoodie", "part1", "part2/part22", "part3/part32/part33"]
+ );
+ }
+}
diff --git a/crates/core/src/table/config.rs b/crates/core/src/table/config.rs
index 05a75db..2f64cc0 100644
--- a/crates/core/src/table/config.rs
+++ b/crates/core/src/table/config.rs
@@ -71,12 +71,8 @@ impl FromStr for TableType {
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.to_ascii_lowercase().as_str() {
- "copy_on_write" => Ok(Self::CopyOnWrite),
- "copy-on-write" => Ok(Self::CopyOnWrite),
- "cow" => Ok(Self::CopyOnWrite),
- "merge_on_read" => Ok(Self::MergeOnRead),
- "merge-on-read" => Ok(Self::MergeOnRead),
- "mor" => Ok(Self::MergeOnRead),
+ "copy_on_write" | "copy-on-write" | "cow" => Ok(Self::CopyOnWrite),
+ "merge_on_read" | "merge-on-read" | "mor" => Ok(Self::MergeOnRead),
_ => Err(HudiCoreError::LoadTablePropertiesError),
}
}
diff --git a/crates/core/src/table/fs_view.rs b/crates/core/src/table/fs_view.rs
index f672dfc..ae8812b 100644
--- a/crates/core/src/table/fs_view.rs
+++ b/crates/core/src/table/fs_view.rs
@@ -17,18 +17,14 @@
* under the License.
*/
-use std::error::Error;
+use std::collections::HashMap;
use std::path::{Path, PathBuf};
-use std::{fs, io};
-use hashbrown::HashMap;
+use anyhow::{anyhow, Result};
-use hudi_fs::file_systems::FileMetadata;
-
-use crate::error::HudiFileSystemViewError;
-use crate::error::HudiFileSystemViewError::FailToLoadPartitions;
use crate::file_group::{BaseFile, FileGroup, FileSlice};
-use crate::utils::get_leaf_dirs;
+use crate::storage::file_metadata::FileMetadata;
+use crate::storage::{get_leaf_dirs, Storage};
#[derive(Clone, Debug)]
pub struct FileSystemView {
@@ -37,69 +33,46 @@ pub struct FileSystemView {
}
impl FileSystemView {
- pub fn new(base_path: &Path) -> Result<Self, HudiFileSystemViewError> {
- let mut fs_view = FileSystemView {
+ pub fn new(base_path: &Path) -> Self {
+ FileSystemView {
base_path: base_path.to_path_buf(),
partition_to_file_groups: HashMap::new(),
- };
- fs_view.load_partitions()?;
- Ok(fs_view)
- }
-
- fn load_partitions(&mut self) -> Result<(), HudiFileSystemViewError> {
- match self.get_partition_paths() {
- Ok(partition_paths) => {
- for p in partition_paths {
- match self.get_file_groups(p.as_str()) {
- Ok(file_groups) => {
- self.partition_to_file_groups.insert(p,
file_groups);
- }
- Err(e) => return Err(FailToLoadPartitions(e)),
- }
- }
- }
- Err(e) => return Err(FailToLoadPartitions(Box::new(e))),
}
- Ok(())
}
- pub fn get_partition_paths(&self) -> Result<Vec<String>, io::Error> {
- let mut first_level_partition_paths: Vec<PathBuf> = Vec::new();
- for entry in fs::read_dir(self.base_path.as_path())? {
- let p = entry?.path();
- if p.is_dir() && p.file_name().and_then(|e| e.to_str()) !=
Some(".hoodie") {
- first_level_partition_paths.push(p);
- }
- }
- let mut full_partition_paths: Vec<PathBuf> = Vec::new();
- for p in first_level_partition_paths {
- full_partition_paths.extend(get_leaf_dirs(p.as_path())?)
- }
- let common_prefix_len = self.base_path.to_str().unwrap().len() + 1;
+ async fn get_partition_paths(&self) -> Result<Vec<String>> {
+        let storage = Storage::new(self.base_path.to_str().unwrap(), HashMap::new());
+ let top_level_dirs: Vec<String> = storage
+ .list_dirs(None)
+ .await
+ .into_iter()
+ .filter(|dir| dir != ".hoodie")
+ .collect();
let mut partition_paths = Vec::new();
- for p in full_partition_paths {
- let full_partition_path = p.to_str().unwrap();
-
partition_paths.push(full_partition_path[common_prefix_len..].to_owned())
+ for dir in top_level_dirs {
+ partition_paths.extend(get_leaf_dirs(&storage, Some(&dir)).await);
}
Ok(partition_paths)
}
- pub fn get_file_groups(&self, partition_path: &str) ->
Result<Vec<FileGroup>, Box<dyn Error>> {
- let mut part_path = self.base_path.to_path_buf();
- part_path.push(partition_path);
+    async fn get_file_groups(&self, partition_path: &str) -> Result<Vec<FileGroup>> {
+        let storage = Storage::new(self.base_path.to_str().unwrap(), HashMap::new());
+ let file_metadata: Vec<FileMetadata> = storage
+ .list_files(Some(partition_path))
+ .await
+ .into_iter()
+ .filter(|f| f.name.ends_with(".parquet"))
+ .collect();
let mut fg_id_to_base_files: HashMap<String, Vec<BaseFile>> =
HashMap::new();
- for entry in fs::read_dir(part_path)? {
- let p = entry?.path();
- if p.is_file() && p.extension().and_then(|e| e.to_str()) ==
Some("parquet") {
- let file_metadata = FileMetadata::from_path(p.as_path())?;
- let base_file = BaseFile::from_file_metadata(file_metadata);
- let fg_id = &base_file.file_group_id;
- fg_id_to_base_files
- .entry(fg_id.to_owned())
- .or_default()
- .push(base_file);
- }
+ for f in file_metadata {
+ let base_file = BaseFile::from_file_metadata(f);
+ let fg_id = &base_file.file_group_id;
+ fg_id_to_base_files
+ .entry(fg_id.to_owned())
+ .or_default()
+ .push(base_file);
}
+
let mut file_groups: Vec<FileGroup> = Vec::new();
for (fg_id, base_files) in fg_id_to_base_files.into_iter() {
let mut fg = FileGroup::new(fg_id.to_owned(),
Some(partition_path.to_owned()));
@@ -111,8 +84,18 @@ impl FileSystemView {
Ok(file_groups)
}
- pub fn get_latest_file_slices(&self) -> Vec<&FileSlice> {
+ pub fn get_latest_file_slices(&mut self) -> Vec<&FileSlice> {
let mut file_slices = Vec::new();
+ let fs_view = self.clone();
+ let rt = tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()
+ .unwrap();
+ let wrapper = async { get_partitions_and_file_groups(&fs_view).await };
+ let result = rt.block_on(wrapper).unwrap();
+ for (k, v) in result {
+ self.partition_to_file_groups.insert(k, v);
+ }
for fgs in self.partition_to_file_groups.values() {
for fg in fgs {
if let Some(file_slice) = fg.get_latest_file_slice() {
@@ -124,6 +107,29 @@ impl FileSystemView {
}
}
+async fn get_partitions_and_file_groups(
+ fs_view: &FileSystemView,
+) -> Result<HashMap<String, Vec<FileGroup>>> {
+ match fs_view.get_partition_paths().await {
+ Ok(mut partition_paths) => {
+ if partition_paths.is_empty() {
+ partition_paths.push("".to_string());
+ }
+ let mut partition_to_file_groups = HashMap::new();
+ for p in partition_paths {
+ match fs_view.get_file_groups(p.as_str()).await {
+ Ok(file_groups) => {
+ partition_to_file_groups.insert(p, file_groups);
+ }
+                    Err(e) => return Err(anyhow!("Failed to load partitions: {}", e)),
+ }
+ }
+ Ok(partition_to_file_groups)
+ }
+ Err(e) => Err(anyhow!("Failed to load partitions: {}", e)),
+ }
+}
+
#[cfg(test)]
mod tests {
use std::collections::HashSet;
@@ -133,12 +139,12 @@ mod tests {
use crate::table::fs_view::FileSystemView;
- #[test]
- fn get_partition_paths() {
+ #[tokio::test]
+ async fn get_partition_paths() {
let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip");
let target_table_path = extract_test_table(fixture_path);
- let fs_view = FileSystemView::new(&target_table_path).unwrap();
- let partition_paths = fs_view.get_partition_paths().unwrap();
+ let fs_view = FileSystemView::new(&target_table_path);
+ let partition_paths = fs_view.get_partition_paths().await.unwrap();
let partition_path_set: HashSet<&str> =
HashSet::from_iter(partition_paths.iter().map(|p| p.as_str()));
assert_eq!(
@@ -151,7 +157,7 @@ mod tests {
fn get_latest_file_slices() {
let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip");
let target_table_path = extract_test_table(fixture_path);
- let fs_view = FileSystemView::new(&target_table_path).unwrap();
+ let mut fs_view = FileSystemView::new(&target_table_path);
let file_slices = fs_view.get_latest_file_slices();
assert_eq!(file_slices.len(), 5);
let mut fg_ids = Vec::new();
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 2548554..d2584cd 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -101,7 +101,7 @@ impl Table {
pub fn get_latest_file_slices(&self) -> Result<Vec<FileSlice>, Box<dyn
Error>> {
let mut file_slices = Vec::new();
- let fs_view = FileSystemView::new(self.base_path.as_path())?;
+ let mut fs_view = FileSystemView::new(self.base_path.as_path());
for f in fs_view.get_latest_file_slices() {
file_slices.push(f.clone());
}