This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 22bb7b7 fix(io): return correct list_status paths and reuse storage operators (#101)
22bb7b7 is described below
commit 22bb7b7fcac68933d8ce8371c0aec5a02560895f
Author: Zach <[email protected]>
AuthorDate: Tue Mar 3 21:48:46 2026 +0800
fix(io): return correct list_status paths and reuse storage operators (#101)
---
crates/paimon/src/io/file_io.rs | 43 ++++++++++++++++++++++++++++++++++++++++-
1 file changed, 42 insertions(+), 1 deletion(-)
diff --git a/crates/paimon/src/io/file_io.rs b/crates/paimon/src/io/file_io.rs
index e10e59d..3b0a4d6 100644
--- a/crates/paimon/src/io/file_io.rs
+++ b/crates/paimon/src/io/file_io.rs
@@ -117,6 +117,7 @@ impl FileIO {
/// FIXME: how to handle large dir? Better to return a stream instead?
pub async fn list_status(&self, path: &str) -> Result<Vec<FileStatus>> {
let (op, relative_path) = self.storage.create(path)?;
+ let base_path = &path[..path.len() - relative_path.len()];
// Opendal list() expects directory path to end with `/`.
// use normalize_root to make sure it end with `/`.
let list_path = normalize_root(relative_path);
@@ -137,7 +138,7 @@ impl FileIO {
statuses.push(FileStatus {
size: meta.content_length(),
is_dir: meta.is_dir(),
- path: entry.path().to_string(),
+ path: format!("{base_path}{}", entry.path()),
last_modified: meta.last_modified(),
});
}
@@ -381,6 +382,7 @@ impl OutputFile {
#[cfg(test)]
mod file_action_test {
+ use std::collections::BTreeSet;
use std::fs;
use super::*;
@@ -461,6 +463,39 @@ mod file_action_test {
file_io.delete_file(dst).await.unwrap();
}
+ async fn common_test_list_status_paths(file_io: &FileIO, dir_path: &str) {
+ if let Some(local_dir) = dir_path.strip_prefix("file:/") {
+ let _ = fs::remove_dir_all(local_dir);
+ }
+
+ file_io.mkdirs(dir_path).await.unwrap();
+
+ let file_a = format!("{dir_path}a.txt");
+ let file_b = format!("{dir_path}b.txt");
+ for file in [&file_a, &file_b] {
+ file_io
+ .new_output(file)
+ .unwrap()
+ .write(Bytes::from("test data"))
+ .await
+ .unwrap();
+ }
+
+ let statuses = file_io.list_status(dir_path).await.unwrap();
+ assert_eq!(statuses.len(), 2);
+
+ let expected_paths: BTreeSet<String> =
+ [file_a.clone(), file_b.clone()].into_iter().collect();
+ let actual_paths: BTreeSet<String> =
+ statuses.iter().map(|status| status.path.clone()).collect();
+ assert_eq!(
+ actual_paths, expected_paths,
+ "list_status should return exact entry paths"
+ );
+
+ file_io.delete_dir(dir_path).await.unwrap();
+ }
+
#[tokio::test]
async fn test_delete_file_memory() {
let file_io = setup_memory_file_io();
@@ -501,6 +536,12 @@ mod file_action_test {
)
.await;
}
+
+ #[tokio::test]
+ async fn test_list_status_fs_should_return_entry_paths() {
+ let file_io = setup_fs_file_io();
+ common_test_list_status_paths(&file_io, "file:/tmp/test_list_status_paths_fs/").await;
+ }
}
#[cfg(test)]