This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new f2e0dc7  feat(io): refactor FileIO to reuse storage operators and 
improve path handling (#106)
f2e0dc7 is described below

commit f2e0dc78e4ca05ff568463736e81dbac333667bd
Author: Zach <[email protected]>
AuthorDate: Thu Apr 2 11:46:23 2026 +0800

    feat(io): refactor FileIO to reuse storage operators and improve path 
handling (#106)
---
 crates/paimon/src/io/file_io.rs | 101 ++++++++++++++-----
 crates/paimon/src/io/storage.rs | 211 +++++++++++++++++++++++++++++-----------
 2 files changed, 230 insertions(+), 82 deletions(-)

diff --git a/crates/paimon/src/io/file_io.rs b/crates/paimon/src/io/file_io.rs
index 84c134a..aba327f 100644
--- a/crates/paimon/src/io/file_io.rs
+++ b/crates/paimon/src/io/file_io.rs
@@ -19,10 +19,11 @@ use crate::error::*;
 use std::collections::HashMap;
 use std::ops::Range;
 use std::sync::Arc;
+use std::time::SystemTime;
 
 use bytes::Bytes;
+use chrono::{DateTime, Utc};
 use opendal::raw::normalize_root;
-use opendal::raw::Timestamp;
 use opendal::Operator;
 use snafu::ResultExt;
 use url::Url;
@@ -105,7 +106,9 @@ impl FileIO {
         Ok(FileStatus {
             size: meta.content_length(),
             is_dir: meta.is_dir(),
-            last_modified: meta.last_modified(),
+            last_modified: meta
+                .last_modified()
+                .map(|v| DateTime::<Utc>::from(SystemTime::from(v))),
             path: path.to_string(),
         })
     }
@@ -129,20 +132,18 @@ impl FileIO {
         let mut statuses = Vec::new();
         let list_path_normalized = list_path.trim_start_matches('/');
         for entry in entries {
-            // opendal list_with includes the root directory itself as the 
first entry.
-            // The root entry's path equals list_path (with or without leading 
slash).
-            // Skip it so callers only see the direct children.
             let entry_path = entry.path();
-            let entry_path_normalized = entry_path.trim_start_matches('/');
-            if entry_path_normalized == list_path_normalized {
+            if entry_path.trim_start_matches('/') == list_path_normalized {
                 continue;
             }
             let meta = entry.metadata();
             statuses.push(FileStatus {
                 size: meta.content_length(),
                 is_dir: meta.is_dir(),
-                path: format!("{base_path}{}", entry.path()),
-                last_modified: meta.last_modified(),
+                path: format!("{base_path}{entry_path}"),
+                last_modified: meta
+                    .last_modified()
+                    .map(|v| DateTime::<Utc>::from(SystemTime::from(v))),
             });
         }
 
@@ -298,7 +299,7 @@ pub struct FileStatus {
     pub size: u64,
     pub is_dir: bool,
     pub path: String,
-    pub last_modified: Option<Timestamp>,
+    pub last_modified: Option<DateTime<Utc>>,
 }
 
 #[derive(Debug)]
@@ -324,7 +325,9 @@ impl InputFile {
             size: meta.content_length(),
             is_dir: meta.is_dir(),
             path: self.path.clone(),
-            last_modified: meta.last_modified(),
+            last_modified: meta
+                .last_modified()
+                .map(|v| DateTime::<Utc>::from(SystemTime::from(v))),
         })
     }
 
@@ -387,17 +390,11 @@ mod file_action_test {
     use bytes::Bytes;
 
     fn setup_memory_file_io() -> FileIO {
-        let storage = Storage::Memory;
-        FileIO {
-            storage: Arc::new(storage),
-        }
+        FileIOBuilder::new("memory").build().unwrap()
     }
 
     fn setup_fs_file_io() -> FileIO {
-        let storage = Storage::LocalFs;
-        FileIO {
-            storage: Arc::new(storage),
-        }
+        FileIOBuilder::new("file").build().unwrap()
     }
 
     async fn common_test_get_status(file_io: &FileIO, path: &str) {
@@ -500,6 +497,62 @@ mod file_action_test {
         common_test_delete_file(&file_io, 
"memory:/test_file_delete_mem").await;
     }
 
+    #[tokio::test]
+    async fn test_empty_path_should_return_error_for_exists_fs() {
+        let file_io = setup_fs_file_io();
+        let result = file_io.exists("").await;
+        assert!(matches!(result, Err(Error::ConfigInvalid { .. })));
+    }
+
+    #[tokio::test]
+    async fn test_empty_path_should_return_error_for_exists_memory() {
+        let file_io = setup_memory_file_io();
+        let result = file_io.exists("").await;
+        assert!(matches!(result, Err(Error::ConfigInvalid { .. })));
+    }
+
+    #[tokio::test]
+    async fn test_memory_operator_reuse_across_file_io_calls() {
+        let file_io = setup_memory_file_io();
+        let path = "memory:/tmp/reuse_case";
+        let dir = "memory:/tmp/";
+
+        file_io
+            .new_output(path)
+            .unwrap()
+            .write(Bytes::from("data"))
+            .await
+            .unwrap();
+
+        assert!(file_io.exists(path).await.unwrap());
+        assert_eq!(file_io.get_status(path).await.unwrap().size, 4);
+        assert!(file_io
+            .list_status(dir)
+            .await
+            .unwrap()
+            .iter()
+            .any(|status| status.path == path));
+
+        file_io.delete_dir(dir).await.unwrap();
+    }
+
+    #[tokio::test]
+    async fn test_memory_operator_not_shared_between_file_io_instances() {
+        let file_io_1 = setup_memory_file_io();
+        let file_io_2 = setup_memory_file_io();
+        let path = "memory:/tmp/reuse_isolation_case";
+
+        file_io_1
+            .new_output(path)
+            .unwrap()
+            .write(Bytes::from("data"))
+            .await
+            .unwrap();
+
+        assert!(file_io_1.exists(path).await.unwrap());
+        assert!(!file_io_2.exists(path).await.unwrap());
+    }
+
     #[tokio::test]
     async fn test_get_status_fs() {
         let file_io = setup_fs_file_io();
@@ -548,17 +601,11 @@ mod input_output_test {
     use bytes::Bytes;
 
     fn setup_memory_file_io() -> FileIO {
-        let storage = Storage::Memory;
-        FileIO {
-            storage: Arc::new(storage),
-        }
+        FileIOBuilder::new("memory").build().unwrap()
     }
 
     fn setup_fs_file_io() -> FileIO {
-        let storage = Storage::LocalFs;
-        FileIO {
-            storage: Arc::new(storage),
-        }
+        FileIOBuilder::new("file").build().unwrap()
     }
 
     async fn common_test_output_file_write_and_read(file_io: &FileIO, path: 
&str) {
diff --git a/crates/paimon/src/io/storage.rs b/crates/paimon/src/io/storage.rs
index 77739a1..31eab86 100644
--- a/crates/paimon/src/io/storage.rs
+++ b/crates/paimon/src/io/storage.rs
@@ -15,11 +15,17 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::collections::HashMap;
+#[cfg(any(feature = "storage-oss", feature = "storage-s3"))]
+use std::sync::{Mutex, MutexGuard};
+
 #[cfg(feature = "storage-oss")]
 use opendal::services::OssConfig;
 #[cfg(feature = "storage-s3")]
 use opendal::services::S3Config;
 use opendal::{Operator, Scheme};
+#[cfg(any(feature = "storage-oss", feature = "storage-s3"))]
+use url::Url;
 
 use crate::error;
 
@@ -29,37 +35,49 @@ use super::FileIOBuilder;
 #[derive(Debug)]
 pub enum Storage {
     #[cfg(feature = "storage-memory")]
-    Memory,
+    Memory { op: Operator },
     #[cfg(feature = "storage-fs")]
-    LocalFs,
+    LocalFs { op: Operator },
     #[cfg(feature = "storage-oss")]
-    Oss { config: Box<OssConfig> },
+    Oss {
+        config: Box<OssConfig>,
+        operators: Mutex<HashMap<String, Operator>>,
+    },
     #[cfg(feature = "storage-s3")]
-    S3 { config: Box<S3Config> },
+    S3 {
+        config: Box<S3Config>,
+        operators: Mutex<HashMap<String, Operator>>,
+    },
 }
 
 impl Storage {
     pub(crate) fn build(file_io_builder: FileIOBuilder) -> crate::Result<Self> 
{
-        let (scheme_str, _props) = file_io_builder.into_parts();
+        let (scheme_str, props) = file_io_builder.into_parts();
         let scheme = Self::parse_scheme(&scheme_str)?;
 
         match scheme {
             #[cfg(feature = "storage-memory")]
-            Scheme::Memory => Ok(Self::Memory),
+            Scheme::Memory => Ok(Self::Memory {
+                op: super::memory_config_build()?,
+            }),
             #[cfg(feature = "storage-fs")]
-            Scheme::Fs => Ok(Self::LocalFs),
+            Scheme::Fs => Ok(Self::LocalFs {
+                op: super::fs_config_build()?,
+            }),
             #[cfg(feature = "storage-oss")]
             Scheme::Oss => {
-                let config = super::oss_config_parse(_props)?;
+                let config = super::oss_config_parse(props)?;
                 Ok(Self::Oss {
                     config: Box::new(config),
+                    operators: Mutex::new(HashMap::new()),
                 })
             }
             #[cfg(feature = "storage-s3")]
             Scheme::S3 => {
-                let config = super::s3_config_parse(_props)?;
+                let config = super::s3_config_parse(props)?;
                 Ok(Self::S3 {
                     config: Box::new(config),
+                    operators: Mutex::new(HashMap::new()),
                 })
             }
             _ => Err(error::Error::IoUnsupported {
@@ -71,58 +89,141 @@ impl Storage {
     pub(crate) fn create<'a>(&self, path: &'a str) -> crate::Result<(Operator, 
&'a str)> {
         match self {
             #[cfg(feature = "storage-memory")]
-            Storage::Memory => {
-                let op = super::memory_config_build()?;
-
-                if let Some(stripped) = path.strip_prefix("memory:/") {
-                    Ok((op, stripped))
-                } else {
-                    Ok((op, &path[1..]))
-                }
-            }
+            Storage::Memory { op } => Ok((op.clone(), 
Self::memory_relative_path(path)?)),
             #[cfg(feature = "storage-fs")]
-            Storage::LocalFs => {
-                let op = super::fs_config_build()?;
-
-                if let Some(stripped) = path.strip_prefix("file:/") {
-                    Ok((op, stripped))
-                } else {
-                    Ok((op, &path[1..]))
-                }
-            }
+            Storage::LocalFs { op } => Ok((op.clone(), 
Self::fs_relative_path(path)?)),
             #[cfg(feature = "storage-oss")]
-            Storage::Oss { config } => {
-                let op = super::oss_config_build(config, path)?;
-                let prefix = format!("oss://{}/", op.info().name());
-                if let Some(stripped) = path.strip_prefix(&prefix) {
-                    Ok((op, stripped))
-                } else {
-                    Err(error::Error::ConfigInvalid {
-                        message: format!("Invalid OSS url: {path}, should 
start with {prefix}"),
-                    })
-                }
+            Storage::Oss { config, operators } => {
+                let (bucket, relative_path) = 
Self::oss_bucket_and_relative_path(path)?;
+                let op = Self::cached_oss_operator(config, operators, path, 
&bucket)?;
+                Ok((op, relative_path))
             }
             #[cfg(feature = "storage-s3")]
-            Storage::S3 { config } => {
-                let op = super::s3_config_build(config, path)?;
-                // Support both s3:// and s3a:// URL prefixes.
-                let info = op.info();
-                let bucket = info.name();
-                let s3_prefix = format!("s3://{}/", bucket);
-                let s3a_prefix = format!("s3a://{}/", bucket);
-                if let Some(stripped) = path.strip_prefix(&s3_prefix) {
-                    Ok((op, stripped))
-                } else if let Some(stripped) = path.strip_prefix(&s3a_prefix) {
-                    Ok((op, stripped))
-                } else {
-                    Err(error::Error::ConfigInvalid {
-                        message: format!(
-                            "Invalid S3 url: {path}, should start with 
{s3_prefix} or {s3a_prefix}"
-                        ),
-                    })
-                }
+            Storage::S3 { config, operators } => {
+                let (bucket, relative_path) = 
Self::s3_bucket_and_relative_path(path)?;
+                let op = Self::cached_s3_operator(config, operators, path, 
&bucket)?;
+                Ok((op, relative_path))
+            }
+        }
+    }
+
+    #[cfg(feature = "storage-memory")]
+    fn memory_relative_path(path: &str) -> crate::Result<&str> {
+        if let Some(stripped) = path.strip_prefix("memory:/") {
+            Ok(stripped)
+        } else {
+            path.get(1..).ok_or_else(|| error::Error::ConfigInvalid {
+                message: format!("Invalid memory path: {path}"),
+            })
+        }
+    }
+
+    #[cfg(feature = "storage-fs")]
+    fn fs_relative_path(path: &str) -> crate::Result<&str> {
+        if let Some(stripped) = path.strip_prefix("file:/") {
+            Ok(stripped)
+        } else {
+            path.get(1..).ok_or_else(|| error::Error::ConfigInvalid {
+                message: format!("Invalid file path: {path}"),
+            })
+        }
+    }
+
+    #[cfg(feature = "storage-oss")]
+    fn oss_bucket_and_relative_path(path: &str) -> crate::Result<(String, 
&str)> {
+        let url = Url::parse(path).map_err(|_| error::Error::ConfigInvalid {
+            message: format!("Invalid OSS url: {path}"),
+        })?;
+        let bucket = url
+            .host_str()
+            .ok_or_else(|| error::Error::ConfigInvalid {
+                message: format!("Invalid OSS url: {path}, missing bucket"),
+            })?
+            .to_string();
+        let prefix = format!("oss://{bucket}/");
+        let relative_path =
+            path.strip_prefix(&prefix)
+                .ok_or_else(|| error::Error::ConfigInvalid {
+                    message: format!("Invalid OSS url: {path}, should start 
with {prefix}"),
+                })?;
+        Ok((bucket, relative_path))
+    }
+
+    #[cfg(feature = "storage-s3")]
+    fn s3_bucket_and_relative_path<'a>(path: &'a str) -> 
crate::Result<(String, &'a str)> {
+        let url = Url::parse(path).map_err(|_| error::Error::ConfigInvalid {
+            message: format!("Invalid S3 url: {path}"),
+        })?;
+        let bucket = url
+            .host_str()
+            .ok_or_else(|| error::Error::ConfigInvalid {
+                message: format!("Invalid S3 url: {path}, missing bucket"),
+            })?
+            .to_string();
+        let scheme = url.scheme();
+        let prefix = match scheme {
+            "s3" | "s3a" => format!("{scheme}://{bucket}/"),
+            _ => {
+                return Err(error::Error::ConfigInvalid {
+                    message: format!(
+                        "Invalid S3 url: {path}, should start with 
s3://{bucket}/ or s3a://{bucket}/"
+                    ),
+                });
             }
+        };
+        let relative_path =
+            path.strip_prefix(&prefix)
+                .ok_or_else(|| error::Error::ConfigInvalid {
+                    message: format!(
+                    "Invalid S3 url: {path}, should start with s3://{bucket}/ 
or s3a://{bucket}/"
+                ),
+                })?;
+        Ok((bucket, relative_path))
+    }
+
+    #[cfg(any(feature = "storage-oss", feature = "storage-s3"))]
+    fn lock_operator_cache<'a>(
+        operators: &'a Mutex<HashMap<String, Operator>>,
+        storage_name: &str,
+    ) -> crate::Result<MutexGuard<'a, HashMap<String, Operator>>> {
+        operators.lock().map_err(|_| error::Error::UnexpectedError {
+            message: format!("Failed to lock {storage_name} operator cache"),
+            source: None,
+        })
+    }
+
+    #[cfg(feature = "storage-oss")]
+    fn cached_oss_operator(
+        config: &OssConfig,
+        operators: &Mutex<HashMap<String, Operator>>,
+        path: &str,
+        bucket: &str,
+    ) -> crate::Result<Operator> {
+        let mut operators = Self::lock_operator_cache(operators, "OSS")?;
+        if let Some(op) = operators.get(bucket) {
+            return Ok(op.clone());
+        }
+
+        let op = super::oss_config_build(config, path)?;
+        operators.insert(bucket.to_string(), op.clone());
+        Ok(op)
+    }
+
+    #[cfg(feature = "storage-s3")]
+    fn cached_s3_operator(
+        config: &S3Config,
+        operators: &Mutex<HashMap<String, Operator>>,
+        path: &str,
+        bucket: &str,
+    ) -> crate::Result<Operator> {
+        let mut operators = Self::lock_operator_cache(operators, "S3")?;
+        if let Some(op) = operators.get(bucket) {
+            return Ok(op.clone());
         }
+
+        let op = super::s3_config_build(config, path)?;
+        operators.insert(bucket.to_string(), op.clone());
+        Ok(op)
     }
 
     fn parse_scheme(scheme: &str) -> crate::Result<Scheme> {

Reply via email to the mailing list.