This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new eae0761  feature: add more object storage backends (#346)
eae0761 is described below

commit eae07614ea6f5e578bf5974214afdca703f8b496
Author: Jiwen liu <[email protected]>
AuthorDate: Tue Jun 2 09:22:54 2026 +0800

    feature: add more object storage backends (#346)
---
 crates/paimon/Cargo.toml                           |  16 +-
 .../paimon/src/catalog/rest/rest_token_file_io.rs  |   2 +-
 crates/paimon/src/io/file_io.rs                    | 156 +++++++-
 crates/paimon/src/io/mod.rs                        |  29 ++
 crates/paimon/src/io/storage.rs                    | 239 +++++++++---
 crates/paimon/src/io/storage_azdls.rs              | 405 +++++++++++++++++++++
 crates/paimon/src/io/storage_config.rs             |  60 +++
 crates/paimon/src/io/storage_cos.rs                | 136 +++++++
 crates/paimon/src/io/storage_gcs.rs                | 201 ++++++++++
 crates/paimon/src/io/storage_obs.rs                | 133 +++++++
 crates/paimon/src/io/storage_s3.rs                 |  76 +---
 docs/src/architecture.md                           |   2 +-
 docs/src/getting-started.md                        |  36 ++
 docs/src/index.md                                  |   2 +-
 14 files changed, 1369 insertions(+), 124 deletions(-)

diff --git a/crates/paimon/Cargo.toml b/crates/paimon/Cargo.toml
index 184c25e..eeae73a 100644
--- a/crates/paimon/Cargo.toml
+++ b/crates/paimon/Cargo.toml
@@ -30,7 +30,17 @@ version.workspace = true
 
 [features]
 default = ["storage-memory", "storage-fs", "storage-oss"]
-storage-all = ["storage-memory", "storage-fs", "storage-oss", "storage-s3", "storage-hdfs"]
+storage-all = [
+    "storage-memory",
+    "storage-fs",
+    "storage-oss",
+    "storage-s3",
+    "storage-cos",
+    "storage-azdls",
+    "storage-obs",
+    "storage-gcs",
+    "storage-hdfs",
+]
 fulltext = ["tantivy", "tempfile"]
 vortex = ["dep:vortex", "dep:kanal"]
 
@@ -38,6 +48,10 @@ storage-memory = ["opendal/services-memory"]
 storage-fs = ["opendal/services-fs"]
 storage-oss = ["opendal/services-oss"]
 storage-s3 = ["opendal/services-s3"]
+storage-cos = ["opendal/services-cos"]
+storage-azdls = ["opendal/services-azdls"]
+storage-obs = ["opendal/services-obs"]
+storage-gcs = ["opendal/services-gcs"]
 storage-hdfs = ["opendal/services-hdfs-native"]
 
 [dependencies]
diff --git a/crates/paimon/src/catalog/rest/rest_token_file_io.rs b/crates/paimon/src/catalog/rest/rest_token_file_io.rs
index 6233eb1..502a0d9 100644
--- a/crates/paimon/src/catalog/rest/rest_token_file_io.rs
+++ b/crates/paimon/src/catalog/rest/rest_token_file_io.rs
@@ -29,7 +29,6 @@ use crate::api::rest_api::RESTApi;
 use crate::api::rest_util::RESTUtil;
 use crate::catalog::Identifier;
 use crate::common::{CatalogOptions, Options};
-use crate::io::storage_oss::OSS_ENDPOINT;
 use crate::io::FileIO;
 use crate::Result;
 
@@ -37,6 +36,7 @@ use super::rest_token::RESTToken;
 
 /// Safe time margin (in milliseconds) before token expiration to trigger refresh.
 const TOKEN_EXPIRATION_SAFE_TIME_MILLIS: i64 = 3_600_000;
+const OSS_ENDPOINT: &str = "fs.oss.endpoint";
 
 /// A FileIO wrapper that supports getting data access tokens from a REST Server.
 ///
diff --git a/crates/paimon/src/io/file_io.rs b/crates/paimon/src/io/file_io.rs
index 7e78004..2144ed7 100644
--- a/crates/paimon/src/io/file_io.rs
+++ b/crates/paimon/src/io/file_io.rs
@@ -148,7 +148,7 @@ impl FileIO {
             statuses.push(FileStatus {
                 size: meta.content_length(),
                 is_dir: meta.is_dir(),
-                path: format!("{base_path}{entry_path}"),
+                path: status_path(base_path, entry_path),
                 last_modified: meta
                     .last_modified()
                     .map(|v| DateTime::<Utc>::from(SystemTime::from(v))),
@@ -186,7 +186,7 @@ impl FileIO {
             statuses.push(FileStatus {
                 size: meta.content_length(),
                 is_dir: false,
-                path: format!("{base_path}{entry_path}"),
+                path: status_path(base_path, entry_path),
                 last_modified: meta
                     .last_modified()
                     .map(|v| DateTime::<Utc>::from(SystemTime::from(v))),
@@ -280,6 +280,14 @@ impl FileIO {
     }
 }
 
+fn status_path(base_path: &str, entry_path: &str) -> String {
+    if base_path.ends_with('/') || entry_path.starts_with('/') {
+        format!("{base_path}{entry_path}")
+    } else {
+        format!("{base_path}/{entry_path}")
+    }
+}
+
 fn looks_like_windows_drive_path(path: &str) -> bool {
     let bytes = path.as_bytes();
     bytes.len() >= 3
@@ -708,6 +716,150 @@ mod file_action_test {
     }
 }
 
+#[cfg(all(
+    test,
+    any(
+        feature = "storage-cos",
+        feature = "storage-obs",
+        feature = "storage-gcs",
+        feature = "storage-azdls"
+    )
+))]
+mod object_storage_path_test {
+    use super::*;
+
+    fn assert_relative_paths(file_io: &FileIO, path: &str, expected_relative_path: &str) {
+        let input = file_io.new_input(path).unwrap();
+        assert_eq!(input.location(), path);
+        assert_eq!(
+            &input.path[input.relative_path_pos..],
+            expected_relative_path
+        );
+
+        let output = file_io.new_output(path).unwrap();
+        assert_eq!(output.location(), path);
+        assert_eq!(
+            &output.path[output.relative_path_pos..],
+            expected_relative_path
+        );
+
+        let (_op, relative_path) = file_io.storage.create(path).unwrap();
+        assert_eq!(relative_path, expected_relative_path);
+
+        let base_path = &path[..path.len() - relative_path.len()];
+        assert_eq!(format!("{base_path}{relative_path}"), path);
+    }
+
+    #[cfg(feature = "storage-azdls")]
+    #[test]
+    fn test_azdls_root_status_path_without_trailing_slash() {
+        assert_eq!(
+            status_path(
+                "abfs://[email protected]",
+                "warehouse/"
+            ),
+            "abfs://[email protected]/warehouse/"
+        );
+        assert_eq!(
+            status_path(
+                "abfs://[email protected]/",
+                "warehouse/"
+            ),
+            "abfs://[email protected]/warehouse/"
+        );
+    }
+
+    #[cfg(feature = "storage-cos")]
+    #[test]
+    fn test_cos_file_io_relative_paths_and_scheme_aliases() {
+        for scheme in ["cosn", "cos"] {
+            let path = format!("{scheme}://bucket/warehouse/table/data.parquet");
+            let dir_path = format!("{scheme}://bucket/warehouse/table/");
+            let file_io = FileIO::from_path(&path)
+                .unwrap()
+                .with_props([
+                    ("fs.cosn.endpoint", "https://cos.ap-shanghai.myqcloud.com"),
+                    ("fs.cosn.userinfo.secretId", "secret-id"),
+                    ("fs.cosn.userinfo.secretKey", "secret-key"),
+                    ("fs.cosn.disable-config-load", "true"),
+                ])
+                .build()
+                .unwrap();
+
+            assert_relative_paths(&file_io, &path, "warehouse/table/data.parquet");
+            assert_relative_paths(&file_io, &dir_path, "warehouse/table/");
+        }
+    }
+
+    #[cfg(feature = "storage-obs")]
+    #[test]
+    fn test_obs_file_io_relative_paths() {
+        let file_io = FileIO::from_path("obs://bucket/warehouse")
+            .unwrap()
+            .with_props([
+                (
+                    "fs.obs.endpoint",
+                    "https://obs.cn-north-4.myhuaweicloud.com",
+                ),
+                ("fs.obs.access.key", "access-key"),
+                ("fs.obs.secret.key", "secret-key"),
+            ])
+            .build()
+            .unwrap();
+
+        assert_relative_paths(
+            &file_io,
+            "obs://bucket/warehouse/table/data.parquet",
+            "warehouse/table/data.parquet",
+        );
+        assert_relative_paths(
+            &file_io,
+            "obs://bucket/warehouse/table/",
+            "warehouse/table/",
+        );
+    }
+
+    #[cfg(feature = "storage-gcs")]
+    #[test]
+    fn test_gcs_file_io_relative_paths_and_scheme_aliases() {
+        for scheme in ["gs", "gcs"] {
+            let path = format!("{scheme}://bucket/warehouse/table/data.parquet");
+            let dir_path = format!("{scheme}://bucket/warehouse/table/");
+            let file_io = FileIO::from_path(&path)
+                .unwrap()
+                .with_props([
+                    ("gcs.allow-anonymous", "true"),
+                    ("gcs.disable-config-load", "true"),
+                    ("gcs.disable-vm-metadata", "true"),
+                ])
+                .build()
+                .unwrap();
+
+            assert_relative_paths(&file_io, &path, "warehouse/table/data.parquet");
+            assert_relative_paths(&file_io, &dir_path, "warehouse/table/");
+        }
+    }
+
+    #[cfg(feature = "storage-azdls")]
+    #[test]
+    fn test_azdls_file_io_relative_paths_and_scheme_aliases() {
+        for scheme in ["abfs", "abfss"] {
+            let path = format!(
+                "{scheme}://[email protected]/warehouse/data.parquet"
+            );
+            let dir_path = format!("{scheme}://[email protected]/warehouse/");
+            let file_io = FileIO::from_path(&path)
+                .unwrap()
+                .with_prop("azure.account-key", "account-key")
+                .build()
+                .unwrap();
+
+            assert_relative_paths(&file_io, &path, "warehouse/data.parquet");
+            assert_relative_paths(&file_io, &dir_path, "warehouse/");
+        }
+    }
+}
+
 #[cfg(test)]
 mod input_output_test {
     use super::*;
diff --git a/crates/paimon/src/io/mod.rs b/crates/paimon/src/io/mod.rs
index 7e49c3c..31f7c0d 100644
--- a/crates/paimon/src/io/mod.rs
+++ b/crates/paimon/src/io/mod.rs
@@ -21,6 +21,15 @@ pub use file_io::*;
 mod storage;
 pub use storage::*;
 
+#[cfg(any(
+    feature = "storage-s3",
+    feature = "storage-cos",
+    feature = "storage-azdls",
+    feature = "storage-obs",
+    feature = "storage-gcs"
+))]
+mod storage_config;
+
 #[cfg(feature = "storage-fs")]
 mod storage_fs;
 #[cfg(feature = "storage-fs")]
@@ -41,6 +50,26 @@ mod storage_s3;
 #[cfg(feature = "storage-s3")]
 use storage_s3::*;
 
+#[cfg(feature = "storage-cos")]
+mod storage_cos;
+#[cfg(feature = "storage-cos")]
+use storage_cos::*;
+
+#[cfg(feature = "storage-azdls")]
+mod storage_azdls;
+#[cfg(feature = "storage-azdls")]
+use storage_azdls::*;
+
+#[cfg(feature = "storage-obs")]
+mod storage_obs;
+#[cfg(feature = "storage-obs")]
+use storage_obs::*;
+
+#[cfg(feature = "storage-gcs")]
+mod storage_gcs;
+#[cfg(feature = "storage-gcs")]
+use storage_gcs::*;
+
 #[cfg(feature = "storage-hdfs")]
 mod storage_hdfs;
 #[cfg(feature = "storage-hdfs")]
diff --git a/crates/paimon/src/io/storage.rs b/crates/paimon/src/io/storage.rs
index a57fcfc..59d2740 100644
--- a/crates/paimon/src/io/storage.rs
+++ b/crates/paimon/src/io/storage.rs
@@ -17,22 +17,47 @@
 
 use std::collections::HashMap;
 #[cfg(any(
+    feature = "storage-azdls",
+    feature = "storage-cos",
+    feature = "storage-gcs",
     feature = "storage-oss",
+    feature = "storage-obs",
     feature = "storage-s3",
     feature = "storage-hdfs"
 ))]
 use std::sync::Mutex;
-#[cfg(any(feature = "storage-oss", feature = "storage-s3"))]
+#[cfg(any(
+    feature = "storage-azdls",
+    feature = "storage-cos",
+    feature = "storage-gcs",
+    feature = "storage-oss",
+    feature = "storage-obs",
+    feature = "storage-s3"
+))]
 use std::sync::MutexGuard;
 
+#[cfg(feature = "storage-azdls")]
+use super::AzdlsStorageConfig;
+#[cfg(feature = "storage-cos")]
+use opendal::services::CosConfig;
+#[cfg(feature = "storage-gcs")]
+use opendal::services::GcsConfig;
 #[cfg(feature = "storage-hdfs")]
 use opendal::services::HdfsNativeConfig;
+#[cfg(feature = "storage-obs")]
+use opendal::services::ObsConfig;
 #[cfg(feature = "storage-oss")]
 use opendal::services::OssConfig;
 #[cfg(feature = "storage-s3")]
 use opendal::services::S3Config;
 use opendal::{Operator, Scheme};
-#[cfg(any(feature = "storage-oss", feature = "storage-s3"))]
+#[cfg(any(
+    feature = "storage-cos",
+    feature = "storage-gcs",
+    feature = "storage-oss",
+    feature = "storage-obs",
+    feature = "storage-s3"
+))]
 use url::Url;
 
 use crate::error;
@@ -56,6 +81,26 @@ pub enum Storage {
         config: Box<S3Config>,
         operators: Mutex<HashMap<String, Operator>>,
     },
+    #[cfg(feature = "storage-cos")]
+    Cos {
+        config: Box<CosConfig>,
+        operators: Mutex<HashMap<String, Operator>>,
+    },
+    #[cfg(feature = "storage-azdls")]
+    Azdls {
+        config: Box<AzdlsStorageConfig>,
+        operators: Mutex<HashMap<String, Operator>>,
+    },
+    #[cfg(feature = "storage-obs")]
+    Obs {
+        config: Box<ObsConfig>,
+        operators: Mutex<HashMap<String, Operator>>,
+    },
+    #[cfg(feature = "storage-gcs")]
+    Gcs {
+        config: Box<GcsConfig>,
+        operators: Mutex<HashMap<String, Operator>>,
+    },
     #[cfg(feature = "storage-hdfs")]
     Hdfs {
         config: Box<HdfsNativeConfig>,
@@ -93,6 +138,38 @@ impl Storage {
                     operators: Mutex::new(HashMap::new()),
                 })
             }
+            #[cfg(feature = "storage-cos")]
+            Scheme::Cos => {
+                let config = super::cos_config_parse(props)?;
+                Ok(Self::Cos {
+                    config: Box::new(config),
+                    operators: Mutex::new(HashMap::new()),
+                })
+            }
+            #[cfg(feature = "storage-azdls")]
+            Scheme::Azdls => {
+                let config = super::azdls_config_parse(props)?;
+                Ok(Self::Azdls {
+                    config: Box::new(config),
+                    operators: Mutex::new(HashMap::new()),
+                })
+            }
+            #[cfg(feature = "storage-obs")]
+            Scheme::Obs => {
+                let config = super::obs_config_parse(props)?;
+                Ok(Self::Obs {
+                    config: Box::new(config),
+                    operators: Mutex::new(HashMap::new()),
+                })
+            }
+            #[cfg(feature = "storage-gcs")]
+            Scheme::Gcs => {
+                let config = super::gcs_config_parse(props)?;
+                Ok(Self::Gcs {
+                    config: Box::new(config),
+                    operators: Mutex::new(HashMap::new()),
+                })
+            }
             #[cfg(feature = "storage-hdfs")]
             Scheme::HdfsNative => {
                 let config = super::hdfs_config_parse(props)?;
@@ -115,16 +192,54 @@ impl Storage {
             Storage::LocalFs { op } => Ok((op.clone(), Self::fs_relative_path(path)?)),
             #[cfg(feature = "storage-oss")]
             Storage::Oss { config, operators } => {
-                let (bucket, relative_path) = Self::oss_bucket_and_relative_path(path)?;
+                let (bucket, relative_path) =
+                    Self::bucket_and_relative_path(path, "OSS", &["oss"])?;
                 let op = Self::cached_oss_operator(config, operators, path, &bucket)?;
                 Ok((op, relative_path))
             }
             #[cfg(feature = "storage-s3")]
             Storage::S3 { config, operators } => {
-                let (bucket, relative_path) = Self::s3_bucket_and_relative_path(path)?;
+                let (bucket, relative_path) =
+                    Self::bucket_and_relative_path(path, "S3", &["s3", "s3a"])?;
                 let op = Self::cached_s3_operator(config, operators, path, &bucket)?;
                 Ok((op, relative_path))
             }
+            #[cfg(feature = "storage-cos")]
+            Storage::Cos { config, operators } => {
+                let (bucket, relative_path) =
+                    Self::bucket_and_relative_path(path, "COS", &["cos", "cosn"])?;
+                let op = Self::cached_operator(operators, "COS", &bucket, || {
+                    super::cos_config_build(config, path)
+                })?;
+                Ok((op, relative_path))
+            }
+            #[cfg(feature = "storage-azdls")]
+            Storage::Azdls { config, operators } => {
+                let relative_path = super::azdls_relative_path(path)?;
+                let cache_key = super::azdls_operator_cache_key(config, path)?;
+                let op = Self::cached_operator(operators, "Azure", &cache_key, || {
+                    super::azdls_config_build(config, path)
+                })?;
+                Ok((op, relative_path))
+            }
+            #[cfg(feature = "storage-obs")]
+            Storage::Obs { config, operators } => {
+                let (bucket, relative_path) =
+                    Self::bucket_and_relative_path(path, "OBS", &["obs"])?;
+                let op = Self::cached_operator(operators, "OBS", &bucket, || {
+                    super::obs_config_build(config, path)
+                })?;
+                Ok((op, relative_path))
+            }
+            #[cfg(feature = "storage-gcs")]
+            Storage::Gcs { config, operators } => {
+                let (bucket, relative_path) =
+                    Self::bucket_and_relative_path(path, "GCS", &["gcs", "gs"])?;
+                let op = Self::cached_operator(operators, "GCS", &bucket, || {
+                    super::gcs_config_build(config, path)
+                })?;
+                Ok((op, relative_path))
+            }
             #[cfg(feature = "storage-hdfs")]
             Storage::Hdfs { config, op } => {
                 let relative_path = super::hdfs_relative_path(path)?;
@@ -166,59 +281,52 @@ impl Storage {
         }
     }
 
-    #[cfg(feature = "storage-oss")]
-    fn oss_bucket_and_relative_path(path: &str) -> crate::Result<(String, &str)> {
-        let url = Url::parse(path).map_err(|_| error::Error::ConfigInvalid {
-            message: format!("Invalid OSS url: {path}"),
-        })?;
-        let bucket = url
-            .host_str()
-            .ok_or_else(|| error::Error::ConfigInvalid {
-                message: format!("Invalid OSS url: {path}, missing bucket"),
-            })?
-            .to_string();
-        let prefix = format!("oss://{bucket}/");
-        let relative_path =
-            path.strip_prefix(&prefix)
-                .ok_or_else(|| error::Error::ConfigInvalid {
-                    message: format!("Invalid OSS url: {path}, should start with {prefix}"),
-                })?;
-        Ok((bucket, relative_path))
-    }
-
-    #[cfg(feature = "storage-s3")]
-    fn s3_bucket_and_relative_path(path: &str) -> crate::Result<(String, &str)> {
+    #[cfg(any(
+        feature = "storage-cos",
+        feature = "storage-gcs",
+        feature = "storage-obs",
+        feature = "storage-oss",
+        feature = "storage-s3"
+    ))]
+    fn bucket_and_relative_path<'a>(
+        path: &'a str,
+        storage_name: &str,
+        allowed_schemes: &[&str],
+    ) -> crate::Result<(String, &'a str)> {
         let url = Url::parse(path).map_err(|_| error::Error::ConfigInvalid {
-            message: format!("Invalid S3 url: {path}"),
+            message: format!("Invalid {storage_name} url: {path}"),
         })?;
         let bucket = url
             .host_str()
             .ok_or_else(|| error::Error::ConfigInvalid {
-                message: format!("Invalid S3 url: {path}, missing bucket"),
+                message: format!("Invalid {storage_name} url: {path}, missing bucket"),
             })?
             .to_string();
         let scheme = url.scheme();
-        let prefix = match scheme {
-            "s3" | "s3a" => format!("{scheme}://{bucket}/"),
-            _ => {
-                return Err(error::Error::ConfigInvalid {
-                    message: format!(
-                        "Invalid S3 url: {path}, should start with s3://{bucket}/ or s3a://{bucket}/"
-                    ),
-                });
-            }
-        };
+        if !allowed_schemes.contains(&scheme) {
+            return Err(error::Error::ConfigInvalid {
+                message: format!("Invalid {storage_name} url: {path}, unsupported scheme {scheme}"),
+            });
+        }
+        let prefix = format!("{scheme}://{bucket}/");
         let relative_path =
             path.strip_prefix(&prefix)
                 .ok_or_else(|| error::Error::ConfigInvalid {
                     message: format!(
-                    "Invalid S3 url: {path}, should start with s3://{bucket}/ or s3a://{bucket}/"
-                ),
+                        "Invalid {storage_name} url: {path}, should start with {prefix}"
+                    ),
                 })?;
         Ok((bucket, relative_path))
     }
 
-    #[cfg(any(feature = "storage-oss", feature = "storage-s3"))]
+    #[cfg(any(
+        feature = "storage-azdls",
+        feature = "storage-cos",
+        feature = "storage-gcs",
+        feature = "storage-oss",
+        feature = "storage-obs",
+        feature = "storage-s3"
+    ))]
     fn lock_operator_cache<'a>(
         operators: &'a Mutex<HashMap<String, Operator>>,
         storage_name: &str,
@@ -229,6 +337,30 @@ impl Storage {
         })
     }
 
+    #[cfg(any(
+        feature = "storage-azdls",
+        feature = "storage-cos",
+        feature = "storage-gcs",
+        feature = "storage-oss",
+        feature = "storage-obs",
+        feature = "storage-s3"
+    ))]
+    fn cached_operator(
+        operators: &Mutex<HashMap<String, Operator>>,
+        storage_name: &str,
+        cache_key: &str,
+        build: impl FnOnce() -> crate::Result<Operator>,
+    ) -> crate::Result<Operator> {
+        let mut operators = Self::lock_operator_cache(operators, storage_name)?;
+        if let Some(op) = operators.get(cache_key) {
+            return Ok(op.clone());
+        }
+
+        let op = build()?;
+        operators.insert(cache_key.to_string(), op.clone());
+        Ok(op)
+    }
+
     #[cfg(feature = "storage-oss")]
     fn cached_oss_operator(
         config: &OssConfig,
@@ -236,14 +368,9 @@ impl Storage {
         path: &str,
         bucket: &str,
     ) -> crate::Result<Operator> {
-        let mut operators = Self::lock_operator_cache(operators, "OSS")?;
-        if let Some(op) = operators.get(bucket) {
-            return Ok(op.clone());
-        }
-
-        let op = super::oss_config_build(config, path)?;
-        operators.insert(bucket.to_string(), op.clone());
-        Ok(op)
+        Self::cached_operator(operators, "OSS", bucket, || {
+            super::oss_config_build(config, path)
+        })
     }
 
     #[cfg(feature = "storage-s3")]
@@ -253,14 +380,9 @@ impl Storage {
         path: &str,
         bucket: &str,
     ) -> crate::Result<Operator> {
-        let mut operators = Self::lock_operator_cache(operators, "S3")?;
-        if let Some(op) = operators.get(bucket) {
-            return Ok(op.clone());
-        }
-
-        let op = super::s3_config_build(config, path)?;
-        operators.insert(bucket.to_string(), op.clone());
-        Ok(op)
+        Self::cached_operator(operators, "S3", bucket, || {
+            super::s3_config_build(config, path)
+        })
     }
 
     fn parse_scheme(scheme: &str) -> crate::Result<Scheme> {
@@ -268,6 +390,9 @@ impl Storage {
             "memory" => Ok(Scheme::Memory),
             "file" | "" => Ok(Scheme::Fs),
             "s3" | "s3a" => Ok(Scheme::S3),
+            "cosn" => Ok(Scheme::Cos),
+            "abfs" | "abfss" | "az" | "azure" => Ok(Scheme::Azdls),
+            "gs" => Ok(Scheme::Gcs),
             "hdfs" => Ok(Scheme::HdfsNative),
             s => Ok(s.parse::<Scheme>()?),
         }
diff --git a/crates/paimon/src/io/storage_azdls.rs b/crates/paimon/src/io/storage_azdls.rs
new file mode 100644
index 0000000..8a71cdd
--- /dev/null
+++ b/crates/paimon/src/io/storage_azdls.rs
@@ -0,0 +1,405 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+
+use opendal::services::AzdlsConfig;
+use opendal::{Configurator, Operator};
+use url::Url;
+
+use crate::error::Error;
+use crate::Result;
+
+use super::storage_config::normalize_storage_config;
+
+const AZURE_ENDPOINT: &str = "azure.endpoint";
+const AZURE_ACCOUNT_NAME: &str = "azure.account-name";
+const AZURE_ACCOUNT_KEY: &str = "azure.account-key";
+const AZURE_SAS_TOKEN: &str = "azure.sas-token";
+
+const CONFIG_PREFIXES: &[&str] = &["fs.azure.", "fs.abfs.", "abfs.", "abfss.", "azure."];
+const MIRRORED_KEYS: &[(&str, &str)] = &[
+    ("azure.account-name", "azure.account.name"),
+    ("azure.account_name", "azure.account.name"),
+    ("azure.account-key", "azure.account.key"),
+    ("azure.account_key", "azure.account.key"),
+    ("azure.sas-token", "azure.sas.token"),
+    ("azure.sas_token", "azure.sas.token"),
+    ("azure.client-id", "azure.client.id"),
+    ("azure.client_id", "azure.client.id"),
+    ("azure.client-secret", "azure.client.secret"),
+    ("azure.client_secret", "azure.client.secret"),
+    ("azure.tenant-id", "azure.tenant.id"),
+    ("azure.tenant_id", "azure.tenant.id"),
+    ("azure.authority-host", "azure.authority.host"),
+    ("azure.authority_host", "azure.authority.host"),
+];
+
+#[derive(Debug, Clone)]
+pub struct AzdlsStorageConfig {
+    config: AzdlsConfig,
+    normalized: HashMap<String, String>,
+}
+
+pub(crate) fn azdls_config_parse(props: HashMap<String, String>) -> Result<AzdlsStorageConfig> {
+    let normalized = normalize_storage_config(props, CONFIG_PREFIXES, "azure.", MIRRORED_KEYS);
+    let config = config_from_normalized(&normalized);
+
+    Ok(AzdlsStorageConfig { config, normalized })
+}
+
+pub(crate) fn azdls_config_build(cfg: &AzdlsStorageConfig, path: &str) -> Result<Operator> {
+    let (cfg, relative_path) = azdls_config_for_path(cfg, path)?;
+
+    let builder = cfg.into_builder();
+    let op = Operator::new(builder)?.finish();
+
+    debug_assert_eq!(
+        relative_path,
+        azdls_relative_path(path).unwrap_or(relative_path)
+    );
+    Ok(op)
+}
+
+pub(crate) fn azdls_operator_cache_key(cfg: &AzdlsStorageConfig, path: &str) -> Result<String> {
+    let url = Url::parse(path).map_err(|_| Error::ConfigInvalid {
+        message: format!("Invalid Azure url: {path}"),
+    })?;
+    let filesystem = if cfg.config.filesystem.is_empty() {
+        filesystem_from_url(&url, path)?
+    } else {
+        cfg.config.filesystem.clone()
+    };
+    let endpoint = effective_endpoint(&cfg.config, &url)?;
+
+    Ok(format!("{}|{}", endpoint.trim_end_matches('/'), filesystem))
+}
+
+fn azdls_config_for_path<'a>(
+    storage_cfg: &AzdlsStorageConfig,
+    path: &'a str,
+) -> Result<(AzdlsConfig, &'a str)> {
+    let (filesystem, relative_path) = azdls_filesystem_and_relative_path(path)?;
+    let url = Url::parse(path).map_err(|_| Error::ConfigInvalid {
+        message: format!("Invalid Azure url: {path}"),
+    })?;
+
+    let mut cfg = storage_cfg.config.clone();
+    if cfg.filesystem.is_empty() {
+        cfg.filesystem = filesystem;
+    }
+
+    let endpoint = effective_endpoint(&cfg, &url)?;
+    apply_account_scoped_config(&mut cfg, &storage_cfg.normalized, &endpoint);
+    cfg.endpoint = Some(endpoint);
+    cfg.root = Some("/".to_string());
+
+    Ok((cfg, relative_path))
+}
+
+fn config_from_normalized(normalized: &HashMap<String, String>) -> AzdlsConfig {
+    AzdlsConfig {
+        endpoint: normalized.get(AZURE_ENDPOINT).cloned(),
+        account_name: normalized.get(AZURE_ACCOUNT_NAME).cloned(),
+        account_key: normalized.get(AZURE_ACCOUNT_KEY).cloned(),
+        sas_token: normalized.get(AZURE_SAS_TOKEN).cloned(),
+        client_id: normalized.get("azure.client-id").cloned(),
+        client_secret: normalized.get("azure.client-secret").cloned(),
+        tenant_id: normalized.get("azure.tenant-id").cloned(),
+        authority_host: normalized.get("azure.authority-host").cloned(),
+        ..Default::default()
+    }
+}
+
+fn effective_endpoint(cfg: &AzdlsConfig, url: &Url) -> Result<String> {
+    cfg.endpoint
+        .as_ref()
+        .map(|endpoint| endpoint.trim_end_matches('/').to_string())
+        .map(Ok)
+        .unwrap_or_else(|| default_endpoint(url))
+}
+
+pub(crate) fn azdls_filesystem_and_relative_path(path: &str) -> Result<(String, &str)> {
+    let url = Url::parse(path).map_err(|_| Error::ConfigInvalid {
+        message: format!("Invalid Azure url: {path}"),
+    })?;
+
+    let filesystem = filesystem_from_url(&url, path)?;
+
+    Ok((filesystem, azdls_relative_path(path)?))
+}
+
+pub(crate) fn azdls_relative_path(path: &str) -> Result<&str> {
+    let url = Url::parse(path).map_err(|_| Error::ConfigInvalid {
+        message: format!("Invalid Azure url: {path}"),
+    })?;
+
+    let path_start = path
+        .find("://")
+        .map(|pos| pos + 3)
+        .ok_or_else(|| Error::ConfigInvalid {
+            message: format!("Invalid Azure url: {path}"),
+        })?;
+    let after_scheme = &path[path_start..];
+    let path_start = after_scheme.find('/').map(|pos| path_start + pos + 1);
+    let url_path = path_start.map(|pos| &path[pos..]).unwrap_or("");
+
+    if !url.username().is_empty()
+        || !url
+            .host_str()
+            .ok_or_else(|| Error::ConfigInvalid {
+                message: format!("Invalid Azure url: {path}, missing filesystem"),
+            })?
+            .contains('.')
+    {
+        Ok(url_path)
+    } else {
+        let (_filesystem, relative_path) = url_path.split_once('/').unwrap_or((url_path, ""));
+        Ok(relative_path)
+    }
+}
+
+fn filesystem_from_url(url: &Url, path: &str) -> Result<String> {
+    if !url.username().is_empty() {
+        return Ok(url.username().to_string());
+    }
+
+    let host = url.host_str().ok_or_else(|| Error::ConfigInvalid {
+        message: format!("Invalid Azure url: {path}, missing filesystem"),
+    })?;
+
+    if !host.contains('.') {
+        return Ok(host.to_string());
+    }
+
+    url.path()
+        .strip_prefix('/')
+        .unwrap_or(url.path())
+        .split('/')
+        .next()
+        .filter(|v| !v.is_empty())
+        .ok_or_else(|| Error::ConfigInvalid {
+            message: format!("Invalid Azure url: {path}, missing filesystem"),
+        })
+        .map(|v| v.to_string())
+}
+
+fn default_endpoint(url: &Url) -> Result<String> {
+    if !url.username().is_empty() {
+        let host = url.host_str().ok_or_else(|| Error::ConfigInvalid {
+            message: format!("Invalid Azure url: {url}, missing account host"),
+        })?;
+        return Ok(format!("https://{host}"));
+    }
+
+    let host = url.host_str().ok_or_else(|| Error::ConfigInvalid {
+        message: format!("Invalid Azure url: {url}, missing account"),
+    })?;
+
+    if host.contains('.') {
+        Ok(format!("https://{host}"))
+    } else {
+        Err(Error::ConfigInvalid {
+            message: format!(
+                "Invalid Azure url: {url}, missing account host; set azure.endpoint for {host}"
+            ),
+        })
+    }
+}
+
+fn apply_account_scoped_config(
+    cfg: &mut AzdlsConfig,
+    normalized: &HashMap<String, String>,
+    endpoint: &str,
+) {
+    let Some(host) = endpoint_host(endpoint) else {
+        return;
+    };
+    let account = host.split('.').next().unwrap_or(host.as_str());
+
+    if cfg.account_key.is_none() {
+        cfg.account_key = first_scoped_value(
+            normalized,
+            &[
+                "azure.account.key",
+                "azure.account-key",
+                "azure.account_key",
+            ],
+            &[host.as_str(), account],
+        );
+    }
+
+    if cfg.sas_token.is_none() {
+        cfg.sas_token = first_scoped_value(
+            normalized,
+            &[
+                "azure.sas.token",
+                "azure.sas-token",
+                "azure.sas_token",
+                "azure.sas.fixed.token",
+                "azure.fixed.sas.token",
+            ],
+            &[host.as_str(), account],
+        );
+    }
+}
+
+fn endpoint_host(endpoint: &str) -> Option<String> {
+    Url::parse(endpoint)
+        .ok()
+        .and_then(|url| url.host_str().map(|host| host.to_string()))
+}
+
+fn first_scoped_value(
+    normalized: &HashMap<String, String>,
+    prefixes: &[&str],
+    suffixes: &[&str],
+) -> Option<String> {
+    prefixes.iter().find_map(|prefix| {
+        suffixes
+            .iter()
+            .find_map(|suffix| normalized.get(&format!("{prefix}.{suffix}")).cloned())
+    })
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    fn make_props(pairs: &[(&str, &str)]) -> HashMap<String, String> {
+        pairs
+            .iter()
+            .map(|(k, v)| (k.to_string(), v.to_string()))
+            .collect()
+    }
+
+    #[test]
+    fn test_azdls_config_parse_keys() {
+        let props = make_props(&[
+            ("fs.azure.account.key", "key"),
+            ("fs.azure.sas.token", "sas"),
+            ("azure.endpoint", "https://account.dfs.core.windows.net"),
+        ]);
+
+        let cfg = azdls_config_parse(props).unwrap();
+        assert_eq!(
+            cfg.config.endpoint.as_deref(),
+            Some("https://account.dfs.core.windows.net")
+        );
+        assert_eq!(cfg.config.account_key.as_deref(), Some("key"));
+        assert_eq!(cfg.config.sas_token.as_deref(), Some("sas"));
+    }
+
+    #[test]
+    fn test_azdls_config_parse_aliases() {
+        let props = make_props(&[
+            ("azure.account-name", "account"),
+            ("azure.client-secret", "secret"),
+            ("azure.tenant-id", "tenant"),
+        ]);
+
+        let cfg = azdls_config_parse(props).unwrap();
+        assert_eq!(cfg.config.account_name.as_deref(), Some("account"));
+        assert_eq!(cfg.config.client_secret.as_deref(), Some("secret"));
+        assert_eq!(cfg.config.tenant_id.as_deref(), Some("tenant"));
+    }
+
+    #[test]
+    fn test_azdls_config_uses_account_scoped_hadoop_key() {
+        let cfg = azdls_config_parse(make_props(&[(
+            "fs.azure.account.key.account.dfs.core.windows.net",
+            "account-key",
+        )]))
+        .unwrap();
+
+        let (cfg, _) =
+            azdls_config_for_path(&cfg, "abfs://fs@account.dfs.core.windows.net/path/to/file")
+                .unwrap();
+
+        assert_eq!(cfg.account_key.as_deref(), Some("account-key"));
+    }
+
+    #[test]
+    fn test_azdls_path_hadoop_authority_form() {
+        let (filesystem, relative_path) =
+            azdls_filesystem_and_relative_path("abfs://fs@account.dfs.core.windows.net/a/b")
+                .unwrap();
+        assert_eq!(filesystem, "fs");
+        assert_eq!(relative_path, "a/b");
+    }
+
+    #[test]
+    fn test_azdls_path_fsspec_form() {
+        let (filesystem, relative_path) =
+            azdls_filesystem_and_relative_path("abfs://fs/a/b").unwrap();
+        assert_eq!(filesystem, "fs");
+        assert_eq!(relative_path, "a/b");
+    }
+
+    #[test]
+    fn test_azdls_config_build_hadoop_form() {
+        let cfg = azdls_config_parse(HashMap::new()).unwrap();
+
+        let op = azdls_config_build(&cfg, "abfs://fs@account.dfs.core.windows.net/a/b").unwrap();
+        assert_eq!(op.info().name(), "fs");
+    }
+
+    #[test]
+    fn test_azdls_config_build_fsspec_form_requires_endpoint() {
+        let cfg = azdls_config_parse(HashMap::new()).unwrap();
+        let result = azdls_config_build(&cfg, "abfs://fs/a/b");
+        assert!(result.is_err());
+    }
+
+    #[test]
+    fn test_azdls_config_build_fsspec_form_with_endpoint() {
+        let cfg = azdls_config_parse(make_props(&[(
+            "azure.endpoint",
+            "https://account.dfs.core.windows.net",
+        )]))
+        .unwrap();
+
+        let op = azdls_config_build(&cfg, "abfs://fs/a/b").unwrap();
+        assert_eq!(op.info().name(), "fs");
+    }
+
+    #[test]
+    fn test_azdls_cache_key_includes_account_host() {
+        let cfg = azdls_config_parse(HashMap::new()).unwrap();
+
+        let account_a = azdls_operator_cache_key(
+            &cfg,
+            "abfs://fs@account-a.dfs.core.windows.net/path/to/file",
+        )
+        .unwrap();
+        let account_b = azdls_operator_cache_key(
+            &cfg,
+            "abfs://fs@account-b.dfs.core.windows.net/path/to/file",
+        )
+        .unwrap();
+
+        assert_ne!(account_a, account_b);
+        assert_eq!(account_a, "https://account-a.dfs.core.windows.net|fs");
+    }
+
+    #[test]
+    fn test_azdls_config_build_missing_filesystem() {
+        let cfg = azdls_config_parse(HashMap::new()).unwrap();
+        let result = azdls_config_build(&cfg, "abfs:///path/without/filesystem");
+        assert!(result.is_err());
+    }
+}
diff --git a/crates/paimon/src/io/storage_config.rs b/crates/paimon/src/io/storage_config.rs
new file mode 100644
index 0000000..90206db
--- /dev/null
+++ b/crates/paimon/src/io/storage_config.rs
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+
+pub(super) fn normalize_storage_config(
+    props: HashMap<String, String>,
+    config_prefixes: &[&str],
+    canonical_prefix: &str,
+    mirrored_keys: &[(&str, &str)],
+) -> HashMap<String, String> {
+    let mut result = HashMap::new();
+
+    for prefix in config_prefixes {
+        for (key, value) in &props {
+            if let Some(suffix) = key.strip_prefix(prefix) {
+                result.insert(format!("{canonical_prefix}{suffix}"), value.clone());
+            }
+        }
+    }
+
+    let mirrored_additions: Vec<(String, String)> = mirrored_keys
+        .iter()
+        .flat_map(|(a, b)| {
+            let mut pairs = Vec::new();
+
+            if !result.contains_key(*b) {
+                if let Some(v) = result.get(*a) {
+                    pairs.push((b.to_string(), v.clone()));
+                }
+            }
+            if !result.contains_key(*a) {
+                if let Some(v) = result.get(*b) {
+                    pairs.push((a.to_string(), v.clone()));
+                }
+            }
+            pairs
+        })
+        .collect();
+
+    for (k, v) in mirrored_additions {
+        result.insert(k, v);
+    }
+
+    result
+}
diff --git a/crates/paimon/src/io/storage_cos.rs b/crates/paimon/src/io/storage_cos.rs
new file mode 100644
index 0000000..b8bb3ea
--- /dev/null
+++ b/crates/paimon/src/io/storage_cos.rs
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+
+use opendal::services::CosConfig;
+use opendal::{Configurator, Operator};
+use url::Url;
+
+use crate::error::Error;
+use crate::Result;
+
+use super::storage_config::normalize_storage_config;
+
+const COS_ENDPOINT: &str = "fs.cosn.endpoint";
+const COS_SECRET_ID: &str = "fs.cosn.userinfo.secretId";
+const COS_SECRET_KEY: &str = "fs.cosn.userinfo.secretKey";
+
+const CONFIG_PREFIXES: &[&str] = &["fs.cosn.", "cosn.", "cos."];
+const MIRRORED_KEYS: &[(&str, &str)] = &[
+    ("fs.cosn.endpoint", "fs.cosn.userinfo.endpoint"),
+    ("fs.cosn.secret_id", "fs.cosn.userinfo.secretId"),
+    ("fs.cosn.secret-id", "fs.cosn.userinfo.secretId"),
+    ("fs.cosn.secret_key", "fs.cosn.userinfo.secretKey"),
+    ("fs.cosn.secret-key", "fs.cosn.userinfo.secretKey"),
+];
+
+pub(crate) fn cos_config_parse(props: HashMap<String, String>) -> Result<CosConfig> {
+    let normalized = normalize_storage_config(props, CONFIG_PREFIXES, "fs.cosn.", MIRRORED_KEYS);
+
+    let cfg = CosConfig {
+        endpoint: normalized.get(COS_ENDPOINT).cloned(),
+        secret_id: normalized.get(COS_SECRET_ID).cloned(),
+        secret_key: normalized.get(COS_SECRET_KEY).cloned(),
+        enable_versioning: normalized
+            .get("fs.cosn.enable-versioning")
+            .is_some_and(|v| v.eq_ignore_ascii_case("true")),
+        disable_config_load: normalized
+            .get("fs.cosn.disable-config-load")
+            .is_some_and(|v| v.eq_ignore_ascii_case("true")),
+        ..Default::default()
+    };
+
+    Ok(cfg)
+}
+
+pub(crate) fn cos_config_build(cfg: &CosConfig, path: &str) -> Result<Operator> {
+    let url = Url::parse(path).map_err(|_| Error::ConfigInvalid {
+        message: format!("Invalid COS url: {path}"),
+    })?;
+
+    let bucket = url.host_str().ok_or_else(|| Error::ConfigInvalid {
+        message: format!("Invalid COS url: {path}, missing bucket"),
+    })?;
+
+    let builder = cfg.clone().into_builder().bucket(bucket);
+    Ok(Operator::new(builder)?.finish())
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    fn make_props(pairs: &[(&str, &str)]) -> HashMap<String, String> {
+        pairs
+            .iter()
+            .map(|(k, v)| (k.to_string(), v.to_string()))
+            .collect()
+    }
+
+    #[test]
+    fn test_cos_config_parse_hadoop_keys() {
+        let props = make_props(&[
+            ("fs.cosn.endpoint", "https://cos.ap-shanghai.myqcloud.com"),
+            ("fs.cosn.userinfo.secretId", "sid"),
+            ("fs.cosn.userinfo.secretKey", "skey"),
+        ]);
+
+        let cfg = cos_config_parse(props).unwrap();
+        assert_eq!(
+            cfg.endpoint.as_deref(),
+            Some("https://cos.ap-shanghai.myqcloud.com")
+        );
+        assert_eq!(cfg.secret_id.as_deref(), Some("sid"));
+        assert_eq!(cfg.secret_key.as_deref(), Some("skey"));
+    }
+
+    #[test]
+    fn test_cos_config_parse_canonical_aliases() {
+        let props = make_props(&[
+            ("cos.endpoint", "https://cos.ap-singapore.myqcloud.com"),
+            ("cos.secret-id", "sid"),
+            ("cos.secret-key", "skey"),
+        ]);
+
+        let cfg = cos_config_parse(props).unwrap();
+        assert_eq!(
+            cfg.endpoint.as_deref(),
+            Some("https://cos.ap-singapore.myqcloud.com")
+        );
+        assert_eq!(cfg.secret_id.as_deref(), Some("sid"));
+        assert_eq!(cfg.secret_key.as_deref(), Some("skey"));
+    }
+
+    #[test]
+    fn test_cos_config_build_extracts_bucket() {
+        let cfg = CosConfig {
+            endpoint: Some("https://cos.ap-shanghai.myqcloud.com".to_string()),
+            ..Default::default()
+        };
+
+        let op = cos_config_build(&cfg, "cosn://my-bucket/some/path").unwrap();
+        assert_eq!(op.info().name(), "my-bucket");
+    }
+
+    #[test]
+    fn test_cos_config_build_missing_bucket() {
+        let cfg = CosConfig::default();
+        let result = cos_config_build(&cfg, "cosn:///path/without/bucket");
+        assert!(result.is_err());
+    }
+}
diff --git a/crates/paimon/src/io/storage_gcs.rs b/crates/paimon/src/io/storage_gcs.rs
new file mode 100644
index 0000000..575dae3
--- /dev/null
+++ b/crates/paimon/src/io/storage_gcs.rs
@@ -0,0 +1,201 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+
+use opendal::services::GcsConfig;
+use opendal::{Configurator, Operator};
+use url::Url;
+
+use crate::error::Error;
+use crate::Result;
+
+use super::storage_config::normalize_storage_config;
+
+const GCS_ENDPOINT: &str = "gcs.endpoint";
+const GCS_CREDENTIAL: &str = "gcs.credential";
+const GCS_CREDENTIAL_PATH: &str = "gcs.credential-path";
+const GCS_SERVICE_ACCOUNT: &str = "gcs.service-account";
+const GCS_ALLOW_ANONYMOUS: &str = "gcs.allow-anonymous";
+
+const CONFIG_PREFIXES: &[&str] = &["fs.gs.", "fs.gcs.", "gs.", "gcs."];
+const MIRRORED_KEYS: &[(&str, &str)] = &[
+    ("gcs.credential-path", "gcs.google_application_credentials"),
+    ("gcs.credential-path", "gcs.google-application-credentials"),
+    ("gcs.credential-path", "gcs.application-credentials"),
+    ("gcs.credential", "gcs.google_service_account_key"),
+    ("gcs.credential", "gcs.google-service-account-key"),
+    ("gcs.credential", "gcs.service-account-key"),
+    ("gcs.credential", "gcs.service_account_key"),
+    ("gcs.service-account", "gcs.google_service_account"),
+    ("gcs.service-account", "gcs.google-service-account"),
+    ("gcs.service-account", "gcs.service_account"),
+    ("gcs.predefined-acl", "gcs.predefined_acl"),
+    ("gcs.default-storage-class", "gcs.default_storage_class"),
+    ("gcs.allow-anonymous", "gcs.google_skip_signature"),
+    ("gcs.allow-anonymous", "gcs.google-skip-signature"),
+    ("gcs.allow_anonymous", "gcs.google_skip_signature"),
+    ("gcs.allow-anonymous", "gcs.allow_anonymous"),
+    ("gcs.allow-anonymous", "gcs.skip-signature"),
+    ("gcs.allow-anonymous", "gcs.skip_signature"),
+    ("gcs.skip-signature", "gcs.google_skip_signature"),
+    ("gcs.skip_signature", "gcs.google_skip_signature"),
+    ("gcs.disable-vm-metadata", "gcs.disable_vm_metadata"),
+    ("gcs.disable-config-load", "gcs.disable_config_load"),
+];
+
+#[allow(clippy::field_reassign_with_default)]
+pub(crate) fn gcs_config_parse(props: HashMap<String, String>) -> Result<GcsConfig> {
+    let normalized = normalize_storage_config(props, CONFIG_PREFIXES, "gcs.", MIRRORED_KEYS);
+
+    let mut cfg = GcsConfig::default();
+    cfg.endpoint = normalized.get(GCS_ENDPOINT).cloned();
+    cfg.credential = normalized.get(GCS_CREDENTIAL).cloned();
+    cfg.credential_path = normalized.get(GCS_CREDENTIAL_PATH).cloned();
+    cfg.service_account = normalized.get(GCS_SERVICE_ACCOUNT).cloned();
+    cfg.scope = normalized.get("gcs.scope").cloned();
+    cfg.predefined_acl = normalized.get("gcs.predefined-acl").cloned();
+    cfg.default_storage_class = normalized.get("gcs.default-storage-class").cloned();
+    cfg.token = normalized.get("gcs.token").cloned();
+    cfg.allow_anonymous = normalized
+        .get(GCS_ALLOW_ANONYMOUS)
+        .is_some_and(|v| v.eq_ignore_ascii_case("true"));
+    cfg.disable_vm_metadata = normalized
+        .get("gcs.disable-vm-metadata")
+        .is_some_and(|v| v.eq_ignore_ascii_case("true"));
+    cfg.disable_config_load = normalized
+        .get("gcs.disable-config-load")
+        .is_some_and(|v| v.eq_ignore_ascii_case("true"));
+
+    Ok(cfg)
+}
+
+pub(crate) fn gcs_config_build(cfg: &GcsConfig, path: &str) -> Result<Operator> {
+    let url = Url::parse(path).map_err(|_| Error::ConfigInvalid {
+        message: format!("Invalid GCS url: {path}"),
+    })?;
+
+    let bucket = url.host_str().ok_or_else(|| Error::ConfigInvalid {
+        message: format!("Invalid GCS url: {path}, missing bucket"),
+    })?;
+
+    let builder = cfg.clone().into_builder().bucket(bucket);
+    Ok(Operator::new(builder)?.finish())
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    fn make_props(pairs: &[(&str, &str)]) -> HashMap<String, String> {
+        pairs
+            .iter()
+            .map(|(k, v)| (k.to_string(), v.to_string()))
+            .collect()
+    }
+
+    #[test]
+    fn test_gcs_config_parse_keys() {
+        let props = make_props(&[
+            ("fs.gs.endpoint", "https://storage.googleapis.com"),
+            ("fs.gs.google_application_credentials", "/tmp/gcs.json"),
+            ("fs.gs.google_service_account_key", "credential-json"),
+            (
+                "fs.gs.google_service_account",
+                "[email protected]",
+            ),
+            ("fs.gs.predefined_acl", "bucketOwnerFullControl"),
+            ("fs.gs.default_storage_class", "NEARLINE"),
+        ]);
+
+        let cfg = gcs_config_parse(props).unwrap();
+        assert_eq!(
+            cfg.endpoint.as_deref(),
+            Some("https://storage.googleapis.com")
+        );
+        assert_eq!(cfg.credential_path.as_deref(), Some("/tmp/gcs.json"));
+        assert_eq!(cfg.credential.as_deref(), Some("credential-json"));
+        assert_eq!(
+            cfg.service_account.as_deref(),
+            Some("[email protected]")
+        );
+        assert_eq!(
+            cfg.predefined_acl.as_deref(),
+            Some("bucketOwnerFullControl")
+        );
+        assert_eq!(cfg.default_storage_class.as_deref(), Some("NEARLINE"));
+    }
+
+    #[test]
+    fn test_gcs_config_parse_canonical_aliases() {
+        let props = make_props(&[
+            ("gcs.credential-path", "/tmp/gcs.json"),
+            ("gcs.allow-anonymous", "true"),
+        ]);
+
+        let cfg = gcs_config_parse(props).unwrap();
+        assert_eq!(cfg.credential_path.as_deref(), Some("/tmp/gcs.json"));
+        assert!(cfg.allow_anonymous);
+    }
+
+    #[test]
+    fn test_gcs_config_parse_opendal_aliases() {
+        let props = make_props(&[
+            (
+                "gcs.google_application_credentials",
+                "/tmp/opendal-gcs.json",
+            ),
+            ("gcs.google_service_account_key", "credential-json"),
+            (
+                "gcs.google_service_account",
+                "[email protected]",
+            ),
+            ("gcs.google_skip_signature", "true"),
+            ("gcs.disable_vm_metadata", "true"),
+            ("gcs.disable_config_load", "true"),
+        ]);
+
+        let cfg = gcs_config_parse(props).unwrap();
+        assert_eq!(
+            cfg.credential_path.as_deref(),
+            Some("/tmp/opendal-gcs.json")
+        );
+        assert_eq!(cfg.credential.as_deref(), Some("credential-json"));
+        assert_eq!(
+            cfg.service_account.as_deref(),
+            Some("[email protected]")
+        );
+        assert!(cfg.allow_anonymous);
+        assert!(cfg.disable_vm_metadata);
+        assert!(cfg.disable_config_load);
+    }
+
+    #[test]
+    fn test_gcs_config_build_extracts_bucket() {
+        let cfg = GcsConfig::default();
+
+        let op = gcs_config_build(&cfg, "gs://my-bucket/some/path").unwrap();
+        assert_eq!(op.info().name(), "my-bucket");
+    }
+
+    #[test]
+    fn test_gcs_config_build_missing_bucket() {
+        let cfg = GcsConfig::default();
+        let result = gcs_config_build(&cfg, "gs:///path/without/bucket");
+        assert!(result.is_err());
+    }
+}
diff --git a/crates/paimon/src/io/storage_obs.rs b/crates/paimon/src/io/storage_obs.rs
new file mode 100644
index 0000000..2fde849
--- /dev/null
+++ b/crates/paimon/src/io/storage_obs.rs
@@ -0,0 +1,133 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+
+use opendal::services::ObsConfig;
+use opendal::{Configurator, Operator};
+use url::Url;
+
+use crate::error::Error;
+use crate::Result;
+
+use super::storage_config::normalize_storage_config;
+
+const OBS_ENDPOINT: &str = "fs.obs.endpoint";
+const OBS_ACCESS_KEY_ID: &str = "fs.obs.access.key";
+const OBS_SECRET_ACCESS_KEY: &str = "fs.obs.secret.key";
+
+const CONFIG_PREFIXES: &[&str] = &["fs.obs.", "obs."];
+const MIRRORED_KEYS: &[(&str, &str)] = &[
+    ("fs.obs.access-key-id", "fs.obs.access.key"),
+    ("fs.obs.access_key_id", "fs.obs.access.key"),
+    ("fs.obs.secret-access-key", "fs.obs.secret.key"),
+    ("fs.obs.secret_access_key", "fs.obs.secret.key"),
+];
+
+#[allow(clippy::field_reassign_with_default)]
+pub(crate) fn obs_config_parse(props: HashMap<String, String>) -> Result<ObsConfig> {
+    let normalized = normalize_storage_config(props, CONFIG_PREFIXES, "fs.obs.", MIRRORED_KEYS);
+
+    let mut cfg = ObsConfig::default();
+    cfg.endpoint = normalized.get(OBS_ENDPOINT).cloned();
+    cfg.access_key_id = normalized.get(OBS_ACCESS_KEY_ID).cloned();
+    cfg.secret_access_key = normalized.get(OBS_SECRET_ACCESS_KEY).cloned();
+    cfg.enable_versioning = normalized
+        .get("fs.obs.enable-versioning")
+        .is_some_and(|v| v.eq_ignore_ascii_case("true"));
+
+    Ok(cfg)
+}
+
+pub(crate) fn obs_config_build(cfg: &ObsConfig, path: &str) -> Result<Operator> {
+    let url = Url::parse(path).map_err(|_| Error::ConfigInvalid {
+        message: format!("Invalid OBS url: {path}"),
+    })?;
+
+    let bucket = url.host_str().ok_or_else(|| Error::ConfigInvalid {
+        message: format!("Invalid OBS url: {path}, missing bucket"),
+    })?;
+
+    let builder = cfg.clone().into_builder().bucket(bucket);
+    Ok(Operator::new(builder)?.finish())
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    fn make_props(pairs: &[(&str, &str)]) -> HashMap<String, String> {
+        pairs
+            .iter()
+            .map(|(k, v)| (k.to_string(), v.to_string()))
+            .collect()
+    }
+
+    #[test]
+    fn test_obs_config_parse_hadoop_keys() {
+        let props = make_props(&[
+            (
+                "fs.obs.endpoint",
+                "https://obs.cn-north-4.myhuaweicloud.com",
+            ),
+            ("fs.obs.access.key", "ak"),
+            ("fs.obs.secret.key", "sk"),
+        ]);
+
+        let cfg = obs_config_parse(props).unwrap();
+        assert_eq!(
+            cfg.endpoint.as_deref(),
+            Some("https://obs.cn-north-4.myhuaweicloud.com")
+        );
+        assert_eq!(cfg.access_key_id.as_deref(), Some("ak"));
+        assert_eq!(cfg.secret_access_key.as_deref(), Some("sk"));
+    }
+
+    #[test]
+    fn test_obs_config_parse_canonical_aliases() {
+        let props = make_props(&[
+            ("obs.endpoint", "https://obs.cn-north-4.myhuaweicloud.com"),
+            ("obs.access-key-id", "ak"),
+            ("obs.secret-access-key", "sk"),
+        ]);
+
+        let cfg = obs_config_parse(props).unwrap();
+        assert_eq!(
+            cfg.endpoint.as_deref(),
+            Some("https://obs.cn-north-4.myhuaweicloud.com")
+        );
+        assert_eq!(cfg.access_key_id.as_deref(), Some("ak"));
+        assert_eq!(cfg.secret_access_key.as_deref(), Some("sk"));
+    }
+
+    #[test]
+    #[allow(clippy::field_reassign_with_default)]
+    fn test_obs_config_build_extracts_bucket() {
+        let mut cfg = ObsConfig::default();
+        cfg.endpoint = Some("https://obs.cn-north-4.myhuaweicloud.com".to_string());
+
+        let op = obs_config_build(&cfg, "obs://my-bucket/some/path").unwrap();
+        assert_eq!(op.info().name(), "my-bucket");
+    }
+
+    #[test]
+    fn test_obs_config_build_missing_bucket() {
+        let cfg = ObsConfig::default();
+        let result = obs_config_build(&cfg, "obs:///path/without/bucket");
+        assert!(result.is_err());
+    }
+}
diff --git a/crates/paimon/src/io/storage_s3.rs b/crates/paimon/src/io/storage_s3.rs
index 57b77c7..6fe20b8 100644
--- a/crates/paimon/src/io/storage_s3.rs
+++ b/crates/paimon/src/io/storage_s3.rs
@@ -24,6 +24,8 @@ use url::Url;
 use crate::error::Error;
 use crate::Result;
 
+use super::storage_config::normalize_storage_config;
+
 /// Configuration key for S3 endpoint.
 ///
 /// Compatible with paimon-java's `s3.endpoint` / `fs.s3a.endpoint`.
@@ -79,14 +81,18 @@ const MIRRORED_KEYS: &[(&str, &str)] = &[
 /// By default, virtual-hosted style addressing is enabled (matching AWS
 /// and Java Paimon behavior). Set `s3.path-style-access=true` to switch
 /// to path-style for S3-compatible stores like MinIO.
+#[allow(clippy::field_reassign_with_default)]
 pub(crate) fn s3_config_parse(props: HashMap<String, String>) -> Result<S3Config> {
-    let normalized = normalize_config(props);
+    let normalized = normalize_storage_config(props, JAVA_CONFIG_PREFIXES, "s3.", MIRRORED_KEYS);
 
     let mut cfg = S3Config::default();
 
     // Default to virtual-hosted style, matching AWS and Java Paimon.
     // Only disable when path-style-access is explicitly set to true.
-    cfg.enable_virtual_host_style = true;
+    let path_style_access = normalized
+        .get(S3_PATH_STYLE_ACCESS)
+        .is_some_and(|v| v.eq_ignore_ascii_case("true"));
+    cfg.enable_virtual_host_style = !path_style_access;
 
     // Core connection settings.
     cfg.endpoint = normalized.get(S3_ENDPOINT).cloned();
@@ -94,12 +100,6 @@ pub(crate) fn s3_config_parse(props: HashMap<String, String>) -> Result<S3Config
     cfg.secret_access_key = normalized.get(S3_SECRET_KEY).cloned();
     cfg.region = normalized.get(S3_REGION).cloned();
 
-    if let Some(v) = normalized.get(S3_PATH_STYLE_ACCESS) {
-        if v.eq_ignore_ascii_case("true") {
-            cfg.enable_virtual_host_style = false;
-        }
-    }
-
     // Session / assume-role credentials.
     cfg.session_token = normalized.get("s3.session.token").cloned();
     cfg.role_arn = normalized.get("s3.assumed.role.arn").cloned();
@@ -107,11 +107,9 @@ pub(crate) fn s3_config_parse(props: HashMap<String, String>) -> Result<S3Config
     cfg.role_session_name = normalized.get("s3.assumed.role.session.name").cloned();
 
     // Anonymous access.
-    if let Some(v) = normalized.get("s3.anonymous") {
-        if v.eq_ignore_ascii_case("true") {
-            cfg.allow_anonymous = true;
-        }
-    }
+    cfg.allow_anonymous = normalized
+        .get("s3.anonymous")
+        .is_some_and(|v| v.eq_ignore_ascii_case("true"));
 
     // Server-side encryption.
     cfg.server_side_encryption = normalized.get("s3.sse.type").cloned();
@@ -144,53 +142,6 @@ pub(crate) fn s3_config_build(cfg: &S3Config, path: &str) -> Result<Operator> {
     Ok(Operator::new(builder)?.finish())
 }
 
-/// Normalize Java-compatible config keys to canonical `s3.*` form.
-///
-/// 1. Strips known prefixes (`fs.s3a.`, `s3a.`, `s3.`) and remaps to `s3.*`.
-/// 2. Applies mirrored key mappings for cross-compatibility.
-/// 3. Earlier prefixes in the list take lower priority (later ones overwrite).
-fn normalize_config(props: HashMap<String, String>) -> HashMap<String, String> {
-    let mut result = HashMap::new();
-
-    // First pass: normalize prefixes. Process in priority order —
-    // `fs.s3a.` (lowest) → `s3a.` → `s3.` (highest, canonical).
-    for prefix in JAVA_CONFIG_PREFIXES {
-        for (key, value) in &props {
-            if let Some(suffix) = key.strip_prefix(prefix) {
-                let canonical = format!("s3.{suffix}");
-                result.insert(canonical, value.clone());
-            }
-        }
-    }
-
-    // Second pass: apply mirrored keys bidirectionally (only if target not already set).
-    let mirrored_additions: Vec<(String, String)> = MIRRORED_KEYS
-        .iter()
-        .flat_map(|(a, b)| {
-            let mut pairs = Vec::new();
-            // a → b
-            if !result.contains_key(*b) {
-                if let Some(v) = result.get(*a) {
-                    pairs.push((b.to_string(), v.clone()));
-                }
-            }
-            // b → a
-            if !result.contains_key(*a) {
-                if let Some(v) = result.get(*b) {
-                    pairs.push((a.to_string(), v.clone()));
-                }
-            }
-            pairs
-        })
-        .collect();
-
-    for (k, v) in mirrored_additions {
-        result.insert(k, v);
-    }
-
-    result
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -353,6 +304,7 @@ mod tests {
     }
 
     #[test]
+    #[allow(clippy::field_reassign_with_default)]
     fn test_s3_config_build_extracts_bucket() {
         let mut cfg = S3Config::default();
         cfg.endpoint = Some("https://s3.us-east-1.amazonaws.com".to_string());
@@ -363,6 +315,7 @@ mod tests {
     }
 
     #[test]
+    #[allow(clippy::field_reassign_with_default)]
     fn test_s3_config_build_s3a_scheme() {
         let mut cfg = S3Config::default();
         cfg.endpoint = Some("https://s3.us-east-1.amazonaws.com".to_string());
@@ -390,7 +343,8 @@ mod tests {
     fn test_mirrored_keys() {
         // `s3.access.key` (dot form) should be mirrored from `s3.access-key` (dash form)
         let props = make_props(&[("s3.access-key", "AKID")]);
-        let normalized = normalize_config(props);
+        let normalized =
+            normalize_storage_config(props, JAVA_CONFIG_PREFIXES, "s3.", MIRRORED_KEYS);
         assert_eq!(
             normalized.get("s3.access.key").map(|s| s.as_str()),
             Some("AKID")
diff --git a/docs/src/architecture.md b/docs/src/architecture.md
index f12950e..e47fb75 100644
--- a/docs/src/architecture.md
+++ b/docs/src/architecture.md
@@ -33,7 +33,7 @@ The core crate implements the Paimon table format, including:
 - **Table** — Table abstraction for reading Paimon tables
 - **Snapshot & Manifest** — Reading snapshot and manifest metadata
 - **Schema** — Table schema management and evolution
-- **File IO** — Abstraction layer for storage backends (local filesystem, S3)
+- **File IO** — Abstraction layer for storage backends (local filesystem, object stores, HDFS)
 - **File Format** — Parquet file reading and writing via Apache Arrow
 
 ### `crates/integrations/datafusion` — DataFusion Integration
diff --git a/docs/src/getting-started.md b/docs/src/getting-started.md
index 0b6d88a..90ec311 100644
--- a/docs/src/getting-started.md
+++ b/docs/src/getting-started.md
@@ -44,6 +44,11 @@ Available storage features:
 | `storage-memory` | In-memory        |
 | `storage-s3`     | Amazon S3        |
 | `storage-oss`    | Alibaba Cloud OSS|
+| `storage-cos`    | Tencent Cloud COS|
+| `storage-azdls`  | Azure Data Lake Storage Gen2 |
+| `storage-obs`    | Huawei Cloud OBS |
+| `storage-gcs`    | Google Cloud Storage |
+| `storage-hdfs`   | HDFS             |
 | `storage-all`    | All of the above |
 
 ## Catalog Management
@@ -78,6 +83,37 @@ options.set("fs.oss.accessKeySecret", "your-access-key-secret");
 options.set("fs.oss.endpoint", "oss-cn-hangzhou.aliyuncs.com");
 let catalog = CatalogFactory::create(options).await?;
 
+// Tencent Cloud COS
+let mut options = Options::new();
+options.set(CatalogOptions::WAREHOUSE, "cosn://bucket/warehouse");
+options.set("fs.cosn.userinfo.secretId", "your-secret-id");
+options.set("fs.cosn.userinfo.secretKey", "your-secret-key");
+options.set("fs.cosn.endpoint", "https://cos.ap-shanghai.myqcloud.com");
+let catalog = CatalogFactory::create(options).await?;
+
+// Azure Data Lake Storage Gen2
+let mut options = Options::new();
+options.set(CatalogOptions::WAREHOUSE, "abfs://filesystem@account.dfs.core.windows.net/warehouse");
+options.set("azure.account-key", "your-account-key");
+let catalog = CatalogFactory::create(options).await?;
+
+// If you use the short form "abfs://filesystem/warehouse", set the endpoint explicitly:
+// options.set("azure.endpoint", "https://account.dfs.core.windows.net");
+
+// Huawei Cloud OBS
+let mut options = Options::new();
+options.set(CatalogOptions::WAREHOUSE, "obs://bucket/warehouse");
+options.set("fs.obs.access.key", "your-access-key-id");
+options.set("fs.obs.secret.key", "your-secret-access-key");
+options.set("fs.obs.endpoint", "https://obs.cn-north-4.myhuaweicloud.com");
+let catalog = CatalogFactory::create(options).await?;
+
+// Google Cloud Storage
+let mut options = Options::new();
+options.set(CatalogOptions::WAREHOUSE, "gs://bucket/warehouse");
+options.set("gcs.credential-path", "/path/to/service-account.json");
+let catalog = CatalogFactory::create(options).await?;
+
 // REST catalog
 let mut options = Options::new();
 options.set(CatalogOptions::METASTORE, "rest");
diff --git a/docs/src/index.md b/docs/src/index.md
index b9c1472..252c9dd 100644
--- a/docs/src/index.md
+++ b/docs/src/index.md
@@ -28,7 +28,7 @@ Apache Paimon Rust provides native Rust libraries for reading and writing Paimon
 Key features:
 
 - Native Rust reader for Paimon table format
-- Support for local filesystem, S3, and OSS storage backends
+- Support for local filesystem, S3, OSS, COS, Azure, OBS, GCS, and HDFS storage backends
 - REST Catalog integration
 - Apache DataFusion integration for SQL queries
 

Reply via email to