This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git
The following commit(s) were added to refs/heads/main by this push:
new eae0761 feature: add more object storage backends (#346)
eae0761 is described below
commit eae07614ea6f5e578bf5974214afdca703f8b496
Author: Jiwen liu <[email protected]>
AuthorDate: Tue Jun 2 09:22:54 2026 +0800
feature: add more object storage backends (#346)
---
crates/paimon/Cargo.toml | 16 +-
.../paimon/src/catalog/rest/rest_token_file_io.rs | 2 +-
crates/paimon/src/io/file_io.rs | 156 +++++++-
crates/paimon/src/io/mod.rs | 29 ++
crates/paimon/src/io/storage.rs | 239 +++++++++---
crates/paimon/src/io/storage_azdls.rs | 405 +++++++++++++++++++++
crates/paimon/src/io/storage_config.rs | 60 +++
crates/paimon/src/io/storage_cos.rs | 136 +++++++
crates/paimon/src/io/storage_gcs.rs | 201 ++++++++++
crates/paimon/src/io/storage_obs.rs | 133 +++++++
crates/paimon/src/io/storage_s3.rs | 76 +---
docs/src/architecture.md | 2 +-
docs/src/getting-started.md | 36 ++
docs/src/index.md | 2 +-
14 files changed, 1369 insertions(+), 124 deletions(-)
diff --git a/crates/paimon/Cargo.toml b/crates/paimon/Cargo.toml
index 184c25e..eeae73a 100644
--- a/crates/paimon/Cargo.toml
+++ b/crates/paimon/Cargo.toml
@@ -30,7 +30,17 @@ version.workspace = true
[features]
default = ["storage-memory", "storage-fs", "storage-oss"]
-storage-all = ["storage-memory", "storage-fs", "storage-oss", "storage-s3", "storage-hdfs"]
+storage-all = [
+ "storage-memory",
+ "storage-fs",
+ "storage-oss",
+ "storage-s3",
+ "storage-cos",
+ "storage-azdls",
+ "storage-obs",
+ "storage-gcs",
+ "storage-hdfs",
+]
fulltext = ["tantivy", "tempfile"]
vortex = ["dep:vortex", "dep:kanal"]
@@ -38,6 +48,10 @@ storage-memory = ["opendal/services-memory"]
storage-fs = ["opendal/services-fs"]
storage-oss = ["opendal/services-oss"]
storage-s3 = ["opendal/services-s3"]
+storage-cos = ["opendal/services-cos"]
+storage-azdls = ["opendal/services-azdls"]
+storage-obs = ["opendal/services-obs"]
+storage-gcs = ["opendal/services-gcs"]
storage-hdfs = ["opendal/services-hdfs-native"]
[dependencies]
diff --git a/crates/paimon/src/catalog/rest/rest_token_file_io.rs b/crates/paimon/src/catalog/rest/rest_token_file_io.rs
index 6233eb1..502a0d9 100644
--- a/crates/paimon/src/catalog/rest/rest_token_file_io.rs
+++ b/crates/paimon/src/catalog/rest/rest_token_file_io.rs
@@ -29,7 +29,6 @@ use crate::api::rest_api::RESTApi;
use crate::api::rest_util::RESTUtil;
use crate::catalog::Identifier;
use crate::common::{CatalogOptions, Options};
-use crate::io::storage_oss::OSS_ENDPOINT;
use crate::io::FileIO;
use crate::Result;
@@ -37,6 +36,7 @@ use super::rest_token::RESTToken;
/// Safe time margin (in milliseconds) before token expiration to trigger refresh.
const TOKEN_EXPIRATION_SAFE_TIME_MILLIS: i64 = 3_600_000;
+const OSS_ENDPOINT: &str = "fs.oss.endpoint";
/// A FileIO wrapper that supports getting data access tokens from a REST Server.
///
diff --git a/crates/paimon/src/io/file_io.rs b/crates/paimon/src/io/file_io.rs
index 7e78004..2144ed7 100644
--- a/crates/paimon/src/io/file_io.rs
+++ b/crates/paimon/src/io/file_io.rs
@@ -148,7 +148,7 @@ impl FileIO {
statuses.push(FileStatus {
size: meta.content_length(),
is_dir: meta.is_dir(),
- path: format!("{base_path}{entry_path}"),
+ path: status_path(base_path, entry_path),
last_modified: meta
.last_modified()
.map(|v| DateTime::<Utc>::from(SystemTime::from(v))),
@@ -186,7 +186,7 @@ impl FileIO {
statuses.push(FileStatus {
size: meta.content_length(),
is_dir: false,
- path: format!("{base_path}{entry_path}"),
+ path: status_path(base_path, entry_path),
last_modified: meta
.last_modified()
.map(|v| DateTime::<Utc>::from(SystemTime::from(v))),
@@ -280,6 +280,14 @@ impl FileIO {
}
}
+fn status_path(base_path: &str, entry_path: &str) -> String {
+ if base_path.ends_with('/') || entry_path.starts_with('/') {
+ format!("{base_path}{entry_path}")
+ } else {
+ format!("{base_path}/{entry_path}")
+ }
+}
+
fn looks_like_windows_drive_path(path: &str) -> bool {
let bytes = path.as_bytes();
bytes.len() >= 3
@@ -708,6 +716,150 @@ mod file_action_test {
}
}
+#[cfg(all(
+ test,
+ any(
+ feature = "storage-cos",
+ feature = "storage-obs",
+ feature = "storage-gcs",
+ feature = "storage-azdls"
+ )
+))]
+mod object_storage_path_test {
+ use super::*;
+
+ fn assert_relative_paths(file_io: &FileIO, path: &str, expected_relative_path: &str) {
+ let input = file_io.new_input(path).unwrap();
+ assert_eq!(input.location(), path);
+ assert_eq!(
+ &input.path[input.relative_path_pos..],
+ expected_relative_path
+ );
+
+ let output = file_io.new_output(path).unwrap();
+ assert_eq!(output.location(), path);
+ assert_eq!(
+ &output.path[output.relative_path_pos..],
+ expected_relative_path
+ );
+
+ let (_op, relative_path) = file_io.storage.create(path).unwrap();
+ assert_eq!(relative_path, expected_relative_path);
+
+ let base_path = &path[..path.len() - relative_path.len()];
+ assert_eq!(format!("{base_path}{relative_path}"), path);
+ }
+
+ #[cfg(feature = "storage-azdls")]
+ #[test]
+ fn test_azdls_root_status_path_without_trailing_slash() {
+ assert_eq!(
+ status_path(
+ "abfs://[email protected]",
+ "warehouse/"
+ ),
+ "abfs://[email protected]/warehouse/"
+ );
+ assert_eq!(
+ status_path(
+ "abfs://[email protected]/",
+ "warehouse/"
+ ),
+ "abfs://[email protected]/warehouse/"
+ );
+ }
+
+ #[cfg(feature = "storage-cos")]
+ #[test]
+ fn test_cos_file_io_relative_paths_and_scheme_aliases() {
+ for scheme in ["cosn", "cos"] {
+ let path = format!("{scheme}://bucket/warehouse/table/data.parquet");
+ let dir_path = format!("{scheme}://bucket/warehouse/table/");
+ let file_io = FileIO::from_path(&path)
+ .unwrap()
+ .with_props([
+ ("fs.cosn.endpoint", "https://cos.ap-shanghai.myqcloud.com"),
+ ("fs.cosn.userinfo.secretId", "secret-id"),
+ ("fs.cosn.userinfo.secretKey", "secret-key"),
+ ("fs.cosn.disable-config-load", "true"),
+ ])
+ .build()
+ .unwrap();
+
+ assert_relative_paths(&file_io, &path, "warehouse/table/data.parquet");
+ assert_relative_paths(&file_io, &dir_path, "warehouse/table/");
+ }
+ }
+
+ #[cfg(feature = "storage-obs")]
+ #[test]
+ fn test_obs_file_io_relative_paths() {
+ let file_io = FileIO::from_path("obs://bucket/warehouse")
+ .unwrap()
+ .with_props([
+ (
+ "fs.obs.endpoint",
+ "https://obs.cn-north-4.myhuaweicloud.com",
+ ),
+ ("fs.obs.access.key", "access-key"),
+ ("fs.obs.secret.key", "secret-key"),
+ ])
+ .build()
+ .unwrap();
+
+ assert_relative_paths(
+ &file_io,
+ "obs://bucket/warehouse/table/data.parquet",
+ "warehouse/table/data.parquet",
+ );
+ assert_relative_paths(
+ &file_io,
+ "obs://bucket/warehouse/table/",
+ "warehouse/table/",
+ );
+ }
+
+ #[cfg(feature = "storage-gcs")]
+ #[test]
+ fn test_gcs_file_io_relative_paths_and_scheme_aliases() {
+ for scheme in ["gs", "gcs"] {
+ let path = format!("{scheme}://bucket/warehouse/table/data.parquet");
+ let dir_path = format!("{scheme}://bucket/warehouse/table/");
+ let file_io = FileIO::from_path(&path)
+ .unwrap()
+ .with_props([
+ ("gcs.allow-anonymous", "true"),
+ ("gcs.disable-config-load", "true"),
+ ("gcs.disable-vm-metadata", "true"),
+ ])
+ .build()
+ .unwrap();
+
+ assert_relative_paths(&file_io, &path, "warehouse/table/data.parquet");
+ assert_relative_paths(&file_io, &dir_path, "warehouse/table/");
+ }
+ }
+
+ #[cfg(feature = "storage-azdls")]
+ #[test]
+ fn test_azdls_file_io_relative_paths_and_scheme_aliases() {
+ for scheme in ["abfs", "abfss"] {
+ let path = format!(
+
"{scheme}://[email protected]/warehouse/data.parquet"
+ );
+ let dir_path =
format!("{scheme}://[email protected]/warehouse/");
+ let file_io = FileIO::from_path(&path)
+ .unwrap()
+ .with_prop("azure.account-key", "account-key")
+ .build()
+ .unwrap();
+
+ assert_relative_paths(&file_io, &path, "warehouse/data.parquet");
+ assert_relative_paths(&file_io, &dir_path, "warehouse/");
+ }
+ }
+}
+
#[cfg(test)]
mod input_output_test {
use super::*;
diff --git a/crates/paimon/src/io/mod.rs b/crates/paimon/src/io/mod.rs
index 7e49c3c..31f7c0d 100644
--- a/crates/paimon/src/io/mod.rs
+++ b/crates/paimon/src/io/mod.rs
@@ -21,6 +21,15 @@ pub use file_io::*;
mod storage;
pub use storage::*;
+#[cfg(any(
+ feature = "storage-s3",
+ feature = "storage-cos",
+ feature = "storage-azdls",
+ feature = "storage-obs",
+ feature = "storage-gcs"
+))]
+mod storage_config;
+
#[cfg(feature = "storage-fs")]
mod storage_fs;
#[cfg(feature = "storage-fs")]
@@ -41,6 +50,26 @@ mod storage_s3;
#[cfg(feature = "storage-s3")]
use storage_s3::*;
+#[cfg(feature = "storage-cos")]
+mod storage_cos;
+#[cfg(feature = "storage-cos")]
+use storage_cos::*;
+
+#[cfg(feature = "storage-azdls")]
+mod storage_azdls;
+#[cfg(feature = "storage-azdls")]
+use storage_azdls::*;
+
+#[cfg(feature = "storage-obs")]
+mod storage_obs;
+#[cfg(feature = "storage-obs")]
+use storage_obs::*;
+
+#[cfg(feature = "storage-gcs")]
+mod storage_gcs;
+#[cfg(feature = "storage-gcs")]
+use storage_gcs::*;
+
#[cfg(feature = "storage-hdfs")]
mod storage_hdfs;
#[cfg(feature = "storage-hdfs")]
diff --git a/crates/paimon/src/io/storage.rs b/crates/paimon/src/io/storage.rs
index a57fcfc..59d2740 100644
--- a/crates/paimon/src/io/storage.rs
+++ b/crates/paimon/src/io/storage.rs
@@ -17,22 +17,47 @@
use std::collections::HashMap;
#[cfg(any(
+ feature = "storage-azdls",
+ feature = "storage-cos",
+ feature = "storage-gcs",
feature = "storage-oss",
+ feature = "storage-obs",
feature = "storage-s3",
feature = "storage-hdfs"
))]
use std::sync::Mutex;
-#[cfg(any(feature = "storage-oss", feature = "storage-s3"))]
+#[cfg(any(
+ feature = "storage-azdls",
+ feature = "storage-cos",
+ feature = "storage-gcs",
+ feature = "storage-oss",
+ feature = "storage-obs",
+ feature = "storage-s3"
+))]
use std::sync::MutexGuard;
+#[cfg(feature = "storage-azdls")]
+use super::AzdlsStorageConfig;
+#[cfg(feature = "storage-cos")]
+use opendal::services::CosConfig;
+#[cfg(feature = "storage-gcs")]
+use opendal::services::GcsConfig;
#[cfg(feature = "storage-hdfs")]
use opendal::services::HdfsNativeConfig;
+#[cfg(feature = "storage-obs")]
+use opendal::services::ObsConfig;
#[cfg(feature = "storage-oss")]
use opendal::services::OssConfig;
#[cfg(feature = "storage-s3")]
use opendal::services::S3Config;
use opendal::{Operator, Scheme};
-#[cfg(any(feature = "storage-oss", feature = "storage-s3"))]
+#[cfg(any(
+ feature = "storage-cos",
+ feature = "storage-gcs",
+ feature = "storage-oss",
+ feature = "storage-obs",
+ feature = "storage-s3"
+))]
use url::Url;
use crate::error;
@@ -56,6 +81,26 @@ pub enum Storage {
config: Box<S3Config>,
operators: Mutex<HashMap<String, Operator>>,
},
+ #[cfg(feature = "storage-cos")]
+ Cos {
+ config: Box<CosConfig>,
+ operators: Mutex<HashMap<String, Operator>>,
+ },
+ #[cfg(feature = "storage-azdls")]
+ Azdls {
+ config: Box<AzdlsStorageConfig>,
+ operators: Mutex<HashMap<String, Operator>>,
+ },
+ #[cfg(feature = "storage-obs")]
+ Obs {
+ config: Box<ObsConfig>,
+ operators: Mutex<HashMap<String, Operator>>,
+ },
+ #[cfg(feature = "storage-gcs")]
+ Gcs {
+ config: Box<GcsConfig>,
+ operators: Mutex<HashMap<String, Operator>>,
+ },
#[cfg(feature = "storage-hdfs")]
Hdfs {
config: Box<HdfsNativeConfig>,
@@ -93,6 +138,38 @@ impl Storage {
operators: Mutex::new(HashMap::new()),
})
}
+ #[cfg(feature = "storage-cos")]
+ Scheme::Cos => {
+ let config = super::cos_config_parse(props)?;
+ Ok(Self::Cos {
+ config: Box::new(config),
+ operators: Mutex::new(HashMap::new()),
+ })
+ }
+ #[cfg(feature = "storage-azdls")]
+ Scheme::Azdls => {
+ let config = super::azdls_config_parse(props)?;
+ Ok(Self::Azdls {
+ config: Box::new(config),
+ operators: Mutex::new(HashMap::new()),
+ })
+ }
+ #[cfg(feature = "storage-obs")]
+ Scheme::Obs => {
+ let config = super::obs_config_parse(props)?;
+ Ok(Self::Obs {
+ config: Box::new(config),
+ operators: Mutex::new(HashMap::new()),
+ })
+ }
+ #[cfg(feature = "storage-gcs")]
+ Scheme::Gcs => {
+ let config = super::gcs_config_parse(props)?;
+ Ok(Self::Gcs {
+ config: Box::new(config),
+ operators: Mutex::new(HashMap::new()),
+ })
+ }
#[cfg(feature = "storage-hdfs")]
Scheme::HdfsNative => {
let config = super::hdfs_config_parse(props)?;
@@ -115,16 +192,54 @@ impl Storage {
Storage::LocalFs { op } => Ok((op.clone(), Self::fs_relative_path(path)?)),
#[cfg(feature = "storage-oss")]
Storage::Oss { config, operators } => {
- let (bucket, relative_path) = Self::oss_bucket_and_relative_path(path)?;
+ let (bucket, relative_path) =
+ Self::bucket_and_relative_path(path, "OSS", &["oss"])?;
let op = Self::cached_oss_operator(config, operators, path, &bucket)?;
Ok((op, relative_path))
}
#[cfg(feature = "storage-s3")]
Storage::S3 { config, operators } => {
- let (bucket, relative_path) = Self::s3_bucket_and_relative_path(path)?;
+ let (bucket, relative_path) =
+ Self::bucket_and_relative_path(path, "S3", &["s3", "s3a"])?;
let op = Self::cached_s3_operator(config, operators, path, &bucket)?;
Ok((op, relative_path))
}
+ #[cfg(feature = "storage-cos")]
+ Storage::Cos { config, operators } => {
+ let (bucket, relative_path) =
+ Self::bucket_and_relative_path(path, "COS", &["cos", "cosn"])?;
+ let op = Self::cached_operator(operators, "COS", &bucket, || {
+ super::cos_config_build(config, path)
+ })?;
+ Ok((op, relative_path))
+ }
+ #[cfg(feature = "storage-azdls")]
+ Storage::Azdls { config, operators } => {
+ let relative_path = super::azdls_relative_path(path)?;
+ let cache_key = super::azdls_operator_cache_key(config, path)?;
+ let op = Self::cached_operator(operators, "Azure", &cache_key, || {
+ super::azdls_config_build(config, path)
+ })?;
+ Ok((op, relative_path))
+ }
+ #[cfg(feature = "storage-obs")]
+ Storage::Obs { config, operators } => {
+ let (bucket, relative_path) =
+ Self::bucket_and_relative_path(path, "OBS", &["obs"])?;
+ let op = Self::cached_operator(operators, "OBS", &bucket, || {
+ super::obs_config_build(config, path)
+ })?;
+ Ok((op, relative_path))
+ }
+ #[cfg(feature = "storage-gcs")]
+ Storage::Gcs { config, operators } => {
+ let (bucket, relative_path) =
+ Self::bucket_and_relative_path(path, "GCS", &["gcs", "gs"])?;
+ let op = Self::cached_operator(operators, "GCS", &bucket, || {
+ super::gcs_config_build(config, path)
+ })?;
+ Ok((op, relative_path))
+ }
#[cfg(feature = "storage-hdfs")]
Storage::Hdfs { config, op } => {
let relative_path = super::hdfs_relative_path(path)?;
@@ -166,59 +281,52 @@ impl Storage {
}
}
- #[cfg(feature = "storage-oss")]
- fn oss_bucket_and_relative_path(path: &str) -> crate::Result<(String, &str)> {
- let url = Url::parse(path).map_err(|_| error::Error::ConfigInvalid {
- message: format!("Invalid OSS url: {path}"),
- })?;
- let bucket = url
- .host_str()
- .ok_or_else(|| error::Error::ConfigInvalid {
- message: format!("Invalid OSS url: {path}, missing bucket"),
- })?
- .to_string();
- let prefix = format!("oss://{bucket}/");
- let relative_path =
- path.strip_prefix(&prefix)
- .ok_or_else(|| error::Error::ConfigInvalid {
- message: format!("Invalid OSS url: {path}, should start with {prefix}"),
- })?;
- Ok((bucket, relative_path))
- }
-
- #[cfg(feature = "storage-s3")]
- fn s3_bucket_and_relative_path(path: &str) -> crate::Result<(String, &str)> {
+ #[cfg(any(
+ feature = "storage-cos",
+ feature = "storage-gcs",
+ feature = "storage-obs",
+ feature = "storage-oss",
+ feature = "storage-s3"
+ ))]
+ fn bucket_and_relative_path<'a>(
+ path: &'a str,
+ storage_name: &str,
+ allowed_schemes: &[&str],
+ ) -> crate::Result<(String, &'a str)> {
let url = Url::parse(path).map_err(|_| error::Error::ConfigInvalid {
- message: format!("Invalid S3 url: {path}"),
+ message: format!("Invalid {storage_name} url: {path}"),
})?;
let bucket = url
.host_str()
.ok_or_else(|| error::Error::ConfigInvalid {
- message: format!("Invalid S3 url: {path}, missing bucket"),
+ message: format!("Invalid {storage_name} url: {path}, missing bucket"),
})?
.to_string();
let scheme = url.scheme();
- let prefix = match scheme {
- "s3" | "s3a" => format!("{scheme}://{bucket}/"),
- _ => {
- return Err(error::Error::ConfigInvalid {
- message: format!(
- "Invalid S3 url: {path}, should start with s3://{bucket}/ or s3a://{bucket}/"
- ),
- });
- }
- };
+ if !allowed_schemes.contains(&scheme) {
+ return Err(error::Error::ConfigInvalid {
+ message: format!("Invalid {storage_name} url: {path}, unsupported scheme {scheme}"),
+ });
+ }
+ let prefix = format!("{scheme}://{bucket}/");
let relative_path =
path.strip_prefix(&prefix)
.ok_or_else(|| error::Error::ConfigInvalid {
message: format!(
- "Invalid S3 url: {path}, should start with s3://{bucket}/ or s3a://{bucket}/"
- ),
+ "Invalid {storage_name} url: {path}, should start with {prefix}"
+ ),
})?;
Ok((bucket, relative_path))
}
- #[cfg(any(feature = "storage-oss", feature = "storage-s3"))]
+ #[cfg(any(
+ feature = "storage-azdls",
+ feature = "storage-cos",
+ feature = "storage-gcs",
+ feature = "storage-oss",
+ feature = "storage-obs",
+ feature = "storage-s3"
+ ))]
fn lock_operator_cache<'a>(
operators: &'a Mutex<HashMap<String, Operator>>,
storage_name: &str,
@@ -229,6 +337,30 @@ impl Storage {
})
}
+ #[cfg(any(
+ feature = "storage-azdls",
+ feature = "storage-cos",
+ feature = "storage-gcs",
+ feature = "storage-oss",
+ feature = "storage-obs",
+ feature = "storage-s3"
+ ))]
+ fn cached_operator(
+ operators: &Mutex<HashMap<String, Operator>>,
+ storage_name: &str,
+ cache_key: &str,
+ build: impl FnOnce() -> crate::Result<Operator>,
+ ) -> crate::Result<Operator> {
+ let mut operators = Self::lock_operator_cache(operators, storage_name)?;
+ if let Some(op) = operators.get(cache_key) {
+ return Ok(op.clone());
+ }
+
+ let op = build()?;
+ operators.insert(cache_key.to_string(), op.clone());
+ Ok(op)
+ }
+
#[cfg(feature = "storage-oss")]
fn cached_oss_operator(
config: &OssConfig,
@@ -236,14 +368,9 @@ impl Storage {
path: &str,
bucket: &str,
) -> crate::Result<Operator> {
- let mut operators = Self::lock_operator_cache(operators, "OSS")?;
- if let Some(op) = operators.get(bucket) {
- return Ok(op.clone());
- }
-
- let op = super::oss_config_build(config, path)?;
- operators.insert(bucket.to_string(), op.clone());
- Ok(op)
+ Self::cached_operator(operators, "OSS", bucket, || {
+ super::oss_config_build(config, path)
+ })
}
#[cfg(feature = "storage-s3")]
@@ -253,14 +380,9 @@ impl Storage {
path: &str,
bucket: &str,
) -> crate::Result<Operator> {
- let mut operators = Self::lock_operator_cache(operators, "S3")?;
- if let Some(op) = operators.get(bucket) {
- return Ok(op.clone());
- }
-
- let op = super::s3_config_build(config, path)?;
- operators.insert(bucket.to_string(), op.clone());
- Ok(op)
+ Self::cached_operator(operators, "S3", bucket, || {
+ super::s3_config_build(config, path)
+ })
}
fn parse_scheme(scheme: &str) -> crate::Result<Scheme> {
@@ -268,6 +390,9 @@ impl Storage {
"memory" => Ok(Scheme::Memory),
"file" | "" => Ok(Scheme::Fs),
"s3" | "s3a" => Ok(Scheme::S3),
+ "cosn" => Ok(Scheme::Cos),
+ "abfs" | "abfss" | "az" | "azure" => Ok(Scheme::Azdls),
+ "gs" => Ok(Scheme::Gcs),
"hdfs" => Ok(Scheme::HdfsNative),
s => Ok(s.parse::<Scheme>()?),
}
diff --git a/crates/paimon/src/io/storage_azdls.rs b/crates/paimon/src/io/storage_azdls.rs
new file mode 100644
index 0000000..8a71cdd
--- /dev/null
+++ b/crates/paimon/src/io/storage_azdls.rs
@@ -0,0 +1,405 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+
+use opendal::services::AzdlsConfig;
+use opendal::{Configurator, Operator};
+use url::Url;
+
+use crate::error::Error;
+use crate::Result;
+
+use super::storage_config::normalize_storage_config;
+
+const AZURE_ENDPOINT: &str = "azure.endpoint";
+const AZURE_ACCOUNT_NAME: &str = "azure.account-name";
+const AZURE_ACCOUNT_KEY: &str = "azure.account-key";
+const AZURE_SAS_TOKEN: &str = "azure.sas-token";
+
+const CONFIG_PREFIXES: &[&str] = &["fs.azure.", "fs.abfs.", "abfs.", "abfss.", "azure."];
+const MIRRORED_KEYS: &[(&str, &str)] = &[
+ ("azure.account-name", "azure.account.name"),
+ ("azure.account_name", "azure.account.name"),
+ ("azure.account-key", "azure.account.key"),
+ ("azure.account_key", "azure.account.key"),
+ ("azure.sas-token", "azure.sas.token"),
+ ("azure.sas_token", "azure.sas.token"),
+ ("azure.client-id", "azure.client.id"),
+ ("azure.client_id", "azure.client.id"),
+ ("azure.client-secret", "azure.client.secret"),
+ ("azure.client_secret", "azure.client.secret"),
+ ("azure.tenant-id", "azure.tenant.id"),
+ ("azure.tenant_id", "azure.tenant.id"),
+ ("azure.authority-host", "azure.authority.host"),
+ ("azure.authority_host", "azure.authority.host"),
+];
+
+#[derive(Debug, Clone)]
+pub struct AzdlsStorageConfig {
+ config: AzdlsConfig,
+ normalized: HashMap<String, String>,
+}
+
+pub(crate) fn azdls_config_parse(props: HashMap<String, String>) -> Result<AzdlsStorageConfig> {
+ let normalized = normalize_storage_config(props, CONFIG_PREFIXES, "azure.", MIRRORED_KEYS);
+ let config = config_from_normalized(&normalized);
+
+ Ok(AzdlsStorageConfig { config, normalized })
+}
+
+pub(crate) fn azdls_config_build(cfg: &AzdlsStorageConfig, path: &str) -> Result<Operator> {
+ let (cfg, relative_path) = azdls_config_for_path(cfg, path)?;
+
+ let builder = cfg.into_builder();
+ let op = Operator::new(builder)?.finish();
+
+ debug_assert_eq!(
+ relative_path,
+ azdls_relative_path(path).unwrap_or(relative_path)
+ );
+ Ok(op)
+}
+
+pub(crate) fn azdls_operator_cache_key(cfg: &AzdlsStorageConfig, path: &str) -> Result<String> {
+ let url = Url::parse(path).map_err(|_| Error::ConfigInvalid {
+ message: format!("Invalid Azure url: {path}"),
+ })?;
+ let filesystem = if cfg.config.filesystem.is_empty() {
+ filesystem_from_url(&url, path)?
+ } else {
+ cfg.config.filesystem.clone()
+ };
+ let endpoint = effective_endpoint(&cfg.config, &url)?;
+
+ Ok(format!("{}|{}", endpoint.trim_end_matches('/'), filesystem))
+}
+
+fn azdls_config_for_path<'a>(
+ storage_cfg: &AzdlsStorageConfig,
+ path: &'a str,
+) -> Result<(AzdlsConfig, &'a str)> {
+ let (filesystem, relative_path) = azdls_filesystem_and_relative_path(path)?;
+ let url = Url::parse(path).map_err(|_| Error::ConfigInvalid {
+ message: format!("Invalid Azure url: {path}"),
+ })?;
+
+ let mut cfg = storage_cfg.config.clone();
+ if cfg.filesystem.is_empty() {
+ cfg.filesystem = filesystem;
+ }
+
+ let endpoint = effective_endpoint(&cfg, &url)?;
+ apply_account_scoped_config(&mut cfg, &storage_cfg.normalized, &endpoint);
+ cfg.endpoint = Some(endpoint);
+ cfg.root = Some("/".to_string());
+
+ Ok((cfg, relative_path))
+}
+
+fn config_from_normalized(normalized: &HashMap<String, String>) -> AzdlsConfig {
+ AzdlsConfig {
+ endpoint: normalized.get(AZURE_ENDPOINT).cloned(),
+ account_name: normalized.get(AZURE_ACCOUNT_NAME).cloned(),
+ account_key: normalized.get(AZURE_ACCOUNT_KEY).cloned(),
+ sas_token: normalized.get(AZURE_SAS_TOKEN).cloned(),
+ client_id: normalized.get("azure.client-id").cloned(),
+ client_secret: normalized.get("azure.client-secret").cloned(),
+ tenant_id: normalized.get("azure.tenant-id").cloned(),
+ authority_host: normalized.get("azure.authority-host").cloned(),
+ ..Default::default()
+ }
+}
+
+fn effective_endpoint(cfg: &AzdlsConfig, url: &Url) -> Result<String> {
+ cfg.endpoint
+ .as_ref()
+ .map(|endpoint| endpoint.trim_end_matches('/').to_string())
+ .map(Ok)
+ .unwrap_or_else(|| default_endpoint(url))
+}
+
+pub(crate) fn azdls_filesystem_and_relative_path(path: &str) -> Result<(String, &str)> {
+ let url = Url::parse(path).map_err(|_| Error::ConfigInvalid {
+ message: format!("Invalid Azure url: {path}"),
+ })?;
+
+ let filesystem = filesystem_from_url(&url, path)?;
+
+ Ok((filesystem, azdls_relative_path(path)?))
+}
+
+pub(crate) fn azdls_relative_path(path: &str) -> Result<&str> {
+ let url = Url::parse(path).map_err(|_| Error::ConfigInvalid {
+ message: format!("Invalid Azure url: {path}"),
+ })?;
+
+ let path_start = path
+ .find("://")
+ .map(|pos| pos + 3)
+ .ok_or_else(|| Error::ConfigInvalid {
+ message: format!("Invalid Azure url: {path}"),
+ })?;
+ let after_scheme = &path[path_start..];
+ let path_start = after_scheme.find('/').map(|pos| path_start + pos + 1);
+ let url_path = path_start.map(|pos| &path[pos..]).unwrap_or("");
+
+ if !url.username().is_empty()
+ || !url
+ .host_str()
+ .ok_or_else(|| Error::ConfigInvalid {
+ message: format!("Invalid Azure url: {path}, missing filesystem"),
+ })?
+ .contains('.')
+ {
+ Ok(url_path)
+ } else {
+ let (_filesystem, relative_path) = url_path.split_once('/').unwrap_or((url_path, ""));
+ Ok(relative_path)
+ }
+}
+
+fn filesystem_from_url(url: &Url, path: &str) -> Result<String> {
+ if !url.username().is_empty() {
+ return Ok(url.username().to_string());
+ }
+
+ let host = url.host_str().ok_or_else(|| Error::ConfigInvalid {
+ message: format!("Invalid Azure url: {path}, missing filesystem"),
+ })?;
+
+ if !host.contains('.') {
+ return Ok(host.to_string());
+ }
+
+ url.path()
+ .strip_prefix('/')
+ .unwrap_or(url.path())
+ .split('/')
+ .next()
+ .filter(|v| !v.is_empty())
+ .ok_or_else(|| Error::ConfigInvalid {
+ message: format!("Invalid Azure url: {path}, missing filesystem"),
+ })
+ .map(|v| v.to_string())
+}
+
+fn default_endpoint(url: &Url) -> Result<String> {
+ if !url.username().is_empty() {
+ let host = url.host_str().ok_or_else(|| Error::ConfigInvalid {
+ message: format!("Invalid Azure url: {url}, missing account host"),
+ })?;
+ return Ok(format!("https://{host}"));
+ }
+
+ let host = url.host_str().ok_or_else(|| Error::ConfigInvalid {
+ message: format!("Invalid Azure url: {url}, missing account"),
+ })?;
+
+ if host.contains('.') {
+ Ok(format!("https://{host}"))
+ } else {
+ Err(Error::ConfigInvalid {
+ message: format!(
+ "Invalid Azure url: {url}, missing account host; set azure.endpoint for {host}"
+ ),
+ })
+ }
+}
+
+fn apply_account_scoped_config(
+ cfg: &mut AzdlsConfig,
+ normalized: &HashMap<String, String>,
+ endpoint: &str,
+) {
+ let Some(host) = endpoint_host(endpoint) else {
+ return;
+ };
+ let account = host.split('.').next().unwrap_or(host.as_str());
+
+ if cfg.account_key.is_none() {
+ cfg.account_key = first_scoped_value(
+ normalized,
+ &[
+ "azure.account.key",
+ "azure.account-key",
+ "azure.account_key",
+ ],
+ &[host.as_str(), account],
+ );
+ }
+
+ if cfg.sas_token.is_none() {
+ cfg.sas_token = first_scoped_value(
+ normalized,
+ &[
+ "azure.sas.token",
+ "azure.sas-token",
+ "azure.sas_token",
+ "azure.sas.fixed.token",
+ "azure.fixed.sas.token",
+ ],
+ &[host.as_str(), account],
+ );
+ }
+}
+
+fn endpoint_host(endpoint: &str) -> Option<String> {
+ Url::parse(endpoint)
+ .ok()
+ .and_then(|url| url.host_str().map(|host| host.to_string()))
+}
+
+fn first_scoped_value(
+ normalized: &HashMap<String, String>,
+ prefixes: &[&str],
+ suffixes: &[&str],
+) -> Option<String> {
+ prefixes.iter().find_map(|prefix| {
+ suffixes
+ .iter()
+ .find_map(|suffix| normalized.get(&format!("{prefix}.{suffix}")).cloned())
+ })
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ fn make_props(pairs: &[(&str, &str)]) -> HashMap<String, String> {
+ pairs
+ .iter()
+ .map(|(k, v)| (k.to_string(), v.to_string()))
+ .collect()
+ }
+
+ #[test]
+ fn test_azdls_config_parse_keys() {
+ let props = make_props(&[
+ ("fs.azure.account.key", "key"),
+ ("fs.azure.sas.token", "sas"),
+ ("azure.endpoint", "https://account.dfs.core.windows.net"),
+ ]);
+
+ let cfg = azdls_config_parse(props).unwrap();
+ assert_eq!(
+ cfg.config.endpoint.as_deref(),
+ Some("https://account.dfs.core.windows.net")
+ );
+ assert_eq!(cfg.config.account_key.as_deref(), Some("key"));
+ assert_eq!(cfg.config.sas_token.as_deref(), Some("sas"));
+ }
+
+ #[test]
+ fn test_azdls_config_parse_aliases() {
+ let props = make_props(&[
+ ("azure.account-name", "account"),
+ ("azure.client-secret", "secret"),
+ ("azure.tenant-id", "tenant"),
+ ]);
+
+ let cfg = azdls_config_parse(props).unwrap();
+ assert_eq!(cfg.config.account_name.as_deref(), Some("account"));
+ assert_eq!(cfg.config.client_secret.as_deref(), Some("secret"));
+ assert_eq!(cfg.config.tenant_id.as_deref(), Some("tenant"));
+ }
+
+ #[test]
+ fn test_azdls_config_uses_account_scoped_hadoop_key() {
+ let cfg = azdls_config_parse(make_props(&[(
+ "fs.azure.account.key.account.dfs.core.windows.net",
+ "account-key",
+ )]))
+ .unwrap();
+
+ let (cfg, _) =
+ azdls_config_for_path(&cfg, "abfs://fs@account.dfs.core.windows.net/path/to/file")
+ .unwrap();
+
+ assert_eq!(cfg.account_key.as_deref(), Some("account-key"));
+ }
+
+ #[test]
+ fn test_azdls_path_hadoop_authority_form() {
+ let (filesystem, relative_path) =
+ azdls_filesystem_and_relative_path("abfs://fs@account.dfs.core.windows.net/a/b")
+ .unwrap();
+ assert_eq!(filesystem, "fs");
+ assert_eq!(relative_path, "a/b");
+ }
+
+ #[test]
+ fn test_azdls_path_fsspec_form() {
+ let (filesystem, relative_path) =
+ azdls_filesystem_and_relative_path("abfs://fs/a/b").unwrap();
+ assert_eq!(filesystem, "fs");
+ assert_eq!(relative_path, "a/b");
+ }
+
+ #[test]
+ fn test_azdls_config_build_hadoop_form() {
+ let cfg = azdls_config_parse(HashMap::new()).unwrap();
+
+ let op = azdls_config_build(&cfg, "abfs://fs@account.dfs.core.windows.net/a/b").unwrap();
+ assert_eq!(op.info().name(), "fs");
+ }
+
+ #[test]
+ fn test_azdls_config_build_fsspec_form_requires_endpoint() {
+ let cfg = azdls_config_parse(HashMap::new()).unwrap();
+ let result = azdls_config_build(&cfg, "abfs://fs/a/b");
+ assert!(result.is_err());
+ }
+
+ #[test]
+ fn test_azdls_config_build_fsspec_form_with_endpoint() {
+ let cfg = azdls_config_parse(make_props(&[(
+ "azure.endpoint",
+ "https://account.dfs.core.windows.net",
+ )]))
+ .unwrap();
+
+ let op = azdls_config_build(&cfg, "abfs://fs/a/b").unwrap();
+ assert_eq!(op.info().name(), "fs");
+ }
+
+ #[test]
+ fn test_azdls_cache_key_includes_account_host() {
+ let cfg = azdls_config_parse(HashMap::new()).unwrap();
+
+ let account_a = azdls_operator_cache_key(
+ &cfg,
+ "abfs://fs@account-a.dfs.core.windows.net/path/to/file",
+ )
+ .unwrap();
+ let account_b = azdls_operator_cache_key(
+ &cfg,
+ "abfs://fs@account-b.dfs.core.windows.net/path/to/file",
+ )
+ .unwrap();
+
+ assert_ne!(account_a, account_b);
+ assert_eq!(account_a, "https://account-a.dfs.core.windows.net|fs");
+ }
+
+ #[test]
+ fn test_azdls_config_build_missing_filesystem() {
+ let cfg = azdls_config_parse(HashMap::new()).unwrap();
+ let result = azdls_config_build(&cfg, "abfs:///path/without/filesystem");
+ assert!(result.is_err());
+ }
+}
diff --git a/crates/paimon/src/io/storage_config.rs b/crates/paimon/src/io/storage_config.rs
new file mode 100644
index 0000000..90206db
--- /dev/null
+++ b/crates/paimon/src/io/storage_config.rs
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+
+pub(super) fn normalize_storage_config(
+ props: HashMap<String, String>,
+ config_prefixes: &[&str],
+ canonical_prefix: &str,
+ mirrored_keys: &[(&str, &str)],
+) -> HashMap<String, String> {
+ let mut result = HashMap::new();
+
+ for prefix in config_prefixes {
+ for (key, value) in &props {
+ if let Some(suffix) = key.strip_prefix(prefix) {
+ result.insert(format!("{canonical_prefix}{suffix}"), value.clone());
+ }
+ }
+ }
+
+ let mirrored_additions: Vec<(String, String)> = mirrored_keys
+ .iter()
+ .flat_map(|(a, b)| {
+ let mut pairs = Vec::new();
+
+ if !result.contains_key(*b) {
+ if let Some(v) = result.get(*a) {
+ pairs.push((b.to_string(), v.clone()));
+ }
+ }
+ if !result.contains_key(*a) {
+ if let Some(v) = result.get(*b) {
+ pairs.push((a.to_string(), v.clone()));
+ }
+ }
+ pairs
+ })
+ .collect();
+
+ for (k, v) in mirrored_additions {
+ result.insert(k, v);
+ }
+
+ result
+}
diff --git a/crates/paimon/src/io/storage_cos.rs b/crates/paimon/src/io/storage_cos.rs
new file mode 100644
index 0000000..b8bb3ea
--- /dev/null
+++ b/crates/paimon/src/io/storage_cos.rs
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+
+use opendal::services::CosConfig;
+use opendal::{Configurator, Operator};
+use url::Url;
+
+use crate::error::Error;
+use crate::Result;
+
+use super::storage_config::normalize_storage_config;
+
+const COS_ENDPOINT: &str = "fs.cosn.endpoint";
+const COS_SECRET_ID: &str = "fs.cosn.userinfo.secretId";
+const COS_SECRET_KEY: &str = "fs.cosn.userinfo.secretKey";
+
+const CONFIG_PREFIXES: &[&str] = &["fs.cosn.", "cosn.", "cos."];
+const MIRRORED_KEYS: &[(&str, &str)] = &[
+ ("fs.cosn.endpoint", "fs.cosn.userinfo.endpoint"),
+ ("fs.cosn.secret_id", "fs.cosn.userinfo.secretId"),
+ ("fs.cosn.secret-id", "fs.cosn.userinfo.secretId"),
+ ("fs.cosn.secret_key", "fs.cosn.userinfo.secretKey"),
+ ("fs.cosn.secret-key", "fs.cosn.userinfo.secretKey"),
+];
+
+pub(crate) fn cos_config_parse(props: HashMap<String, String>) -> Result<CosConfig> {
+ let normalized = normalize_storage_config(props, CONFIG_PREFIXES, "fs.cosn.", MIRRORED_KEYS);
+
+ let cfg = CosConfig {
+ endpoint: normalized.get(COS_ENDPOINT).cloned(),
+ secret_id: normalized.get(COS_SECRET_ID).cloned(),
+ secret_key: normalized.get(COS_SECRET_KEY).cloned(),
+ enable_versioning: normalized
+ .get("fs.cosn.enable-versioning")
+ .is_some_and(|v| v.eq_ignore_ascii_case("true")),
+ disable_config_load: normalized
+ .get("fs.cosn.disable-config-load")
+ .is_some_and(|v| v.eq_ignore_ascii_case("true")),
+ ..Default::default()
+ };
+
+ Ok(cfg)
+}
+
+pub(crate) fn cos_config_build(cfg: &CosConfig, path: &str) -> Result<Operator> {
+ let url = Url::parse(path).map_err(|_| Error::ConfigInvalid {
+ message: format!("Invalid COS url: {path}"),
+ })?;
+
+ let bucket = url.host_str().ok_or_else(|| Error::ConfigInvalid {
+ message: format!("Invalid COS url: {path}, missing bucket"),
+ })?;
+
+ let builder = cfg.clone().into_builder().bucket(bucket);
+ Ok(Operator::new(builder)?.finish())
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ fn make_props(pairs: &[(&str, &str)]) -> HashMap<String, String> {
+ pairs
+ .iter()
+ .map(|(k, v)| (k.to_string(), v.to_string()))
+ .collect()
+ }
+
+ #[test]
+ fn test_cos_config_parse_hadoop_keys() {
+ let props = make_props(&[
+ ("fs.cosn.endpoint", "https://cos.ap-shanghai.myqcloud.com"),
+ ("fs.cosn.userinfo.secretId", "sid"),
+ ("fs.cosn.userinfo.secretKey", "skey"),
+ ]);
+
+ let cfg = cos_config_parse(props).unwrap();
+ assert_eq!(
+ cfg.endpoint.as_deref(),
+ Some("https://cos.ap-shanghai.myqcloud.com")
+ );
+ assert_eq!(cfg.secret_id.as_deref(), Some("sid"));
+ assert_eq!(cfg.secret_key.as_deref(), Some("skey"));
+ }
+
+ #[test]
+ fn test_cos_config_parse_canonical_aliases() {
+ let props = make_props(&[
+ ("cos.endpoint", "https://cos.ap-singapore.myqcloud.com"),
+ ("cos.secret-id", "sid"),
+ ("cos.secret-key", "skey"),
+ ]);
+
+ let cfg = cos_config_parse(props).unwrap();
+ assert_eq!(
+ cfg.endpoint.as_deref(),
+ Some("https://cos.ap-singapore.myqcloud.com")
+ );
+ assert_eq!(cfg.secret_id.as_deref(), Some("sid"));
+ assert_eq!(cfg.secret_key.as_deref(), Some("skey"));
+ }
+
+ #[test]
+ fn test_cos_config_build_extracts_bucket() {
+ let cfg = CosConfig {
+ endpoint: Some("https://cos.ap-shanghai.myqcloud.com".to_string()),
+ ..Default::default()
+ };
+
+ let op = cos_config_build(&cfg, "cosn://my-bucket/some/path").unwrap();
+ assert_eq!(op.info().name(), "my-bucket");
+ }
+
+ #[test]
+ fn test_cos_config_build_missing_bucket() {
+ let cfg = CosConfig::default();
+ let result = cos_config_build(&cfg, "cosn:///path/without/bucket");
+ assert!(result.is_err());
+ }
+}
diff --git a/crates/paimon/src/io/storage_gcs.rs b/crates/paimon/src/io/storage_gcs.rs
new file mode 100644
index 0000000..575dae3
--- /dev/null
+++ b/crates/paimon/src/io/storage_gcs.rs
@@ -0,0 +1,201 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+
+use opendal::services::GcsConfig;
+use opendal::{Configurator, Operator};
+use url::Url;
+
+use crate::error::Error;
+use crate::Result;
+
+use super::storage_config::normalize_storage_config;
+
+const GCS_ENDPOINT: &str = "gcs.endpoint";
+const GCS_CREDENTIAL: &str = "gcs.credential";
+const GCS_CREDENTIAL_PATH: &str = "gcs.credential-path";
+const GCS_SERVICE_ACCOUNT: &str = "gcs.service-account";
+const GCS_ALLOW_ANONYMOUS: &str = "gcs.allow-anonymous";
+
+const CONFIG_PREFIXES: &[&str] = &["fs.gs.", "fs.gcs.", "gs.", "gcs."];
+const MIRRORED_KEYS: &[(&str, &str)] = &[
+ ("gcs.credential-path", "gcs.google_application_credentials"),
+ ("gcs.credential-path", "gcs.google-application-credentials"),
+ ("gcs.credential-path", "gcs.application-credentials"),
+ ("gcs.credential", "gcs.google_service_account_key"),
+ ("gcs.credential", "gcs.google-service-account-key"),
+ ("gcs.credential", "gcs.service-account-key"),
+ ("gcs.credential", "gcs.service_account_key"),
+ ("gcs.service-account", "gcs.google_service_account"),
+ ("gcs.service-account", "gcs.google-service-account"),
+ ("gcs.service-account", "gcs.service_account"),
+ ("gcs.predefined-acl", "gcs.predefined_acl"),
+ ("gcs.default-storage-class", "gcs.default_storage_class"),
+ ("gcs.allow-anonymous", "gcs.google_skip_signature"),
+ ("gcs.allow-anonymous", "gcs.google-skip-signature"),
+ ("gcs.allow_anonymous", "gcs.google_skip_signature"),
+ ("gcs.allow-anonymous", "gcs.allow_anonymous"),
+ ("gcs.allow-anonymous", "gcs.skip-signature"),
+ ("gcs.allow-anonymous", "gcs.skip_signature"),
+ ("gcs.skip-signature", "gcs.google_skip_signature"),
+ ("gcs.skip_signature", "gcs.google_skip_signature"),
+ ("gcs.disable-vm-metadata", "gcs.disable_vm_metadata"),
+ ("gcs.disable-config-load", "gcs.disable_config_load"),
+];
+
+#[allow(clippy::field_reassign_with_default)]
+pub(crate) fn gcs_config_parse(props: HashMap<String, String>) -> Result<GcsConfig> {
+ let normalized = normalize_storage_config(props, CONFIG_PREFIXES, "gcs.", MIRRORED_KEYS);
+
+ let mut cfg = GcsConfig::default();
+ cfg.endpoint = normalized.get(GCS_ENDPOINT).cloned();
+ cfg.credential = normalized.get(GCS_CREDENTIAL).cloned();
+ cfg.credential_path = normalized.get(GCS_CREDENTIAL_PATH).cloned();
+ cfg.service_account = normalized.get(GCS_SERVICE_ACCOUNT).cloned();
+ cfg.scope = normalized.get("gcs.scope").cloned();
+ cfg.predefined_acl = normalized.get("gcs.predefined-acl").cloned();
+ cfg.default_storage_class = normalized.get("gcs.default-storage-class").cloned();
+ cfg.token = normalized.get("gcs.token").cloned();
+ cfg.allow_anonymous = normalized
+ .get(GCS_ALLOW_ANONYMOUS)
+ .is_some_and(|v| v.eq_ignore_ascii_case("true"));
+ cfg.disable_vm_metadata = normalized
+ .get("gcs.disable-vm-metadata")
+ .is_some_and(|v| v.eq_ignore_ascii_case("true"));
+ cfg.disable_config_load = normalized
+ .get("gcs.disable-config-load")
+ .is_some_and(|v| v.eq_ignore_ascii_case("true"));
+
+ Ok(cfg)
+}
+
+pub(crate) fn gcs_config_build(cfg: &GcsConfig, path: &str) -> Result<Operator> {
+ let url = Url::parse(path).map_err(|_| Error::ConfigInvalid {
+ message: format!("Invalid GCS url: {path}"),
+ })?;
+
+ let bucket = url.host_str().ok_or_else(|| Error::ConfigInvalid {
+ message: format!("Invalid GCS url: {path}, missing bucket"),
+ })?;
+
+ let builder = cfg.clone().into_builder().bucket(bucket);
+ Ok(Operator::new(builder)?.finish())
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ fn make_props(pairs: &[(&str, &str)]) -> HashMap<String, String> {
+ pairs
+ .iter()
+ .map(|(k, v)| (k.to_string(), v.to_string()))
+ .collect()
+ }
+
+ #[test]
+ fn test_gcs_config_parse_keys() {
+ let props = make_props(&[
+ ("fs.gs.endpoint", "https://storage.googleapis.com"),
+ ("fs.gs.google_application_credentials", "/tmp/gcs.json"),
+ ("fs.gs.google_service_account_key", "credential-json"),
+ (
+ "fs.gs.google_service_account",
+ "[email protected]",
+ ),
+ ("fs.gs.predefined_acl", "bucketOwnerFullControl"),
+ ("fs.gs.default_storage_class", "NEARLINE"),
+ ]);
+
+ let cfg = gcs_config_parse(props).unwrap();
+ assert_eq!(
+ cfg.endpoint.as_deref(),
+ Some("https://storage.googleapis.com")
+ );
+ assert_eq!(cfg.credential_path.as_deref(), Some("/tmp/gcs.json"));
+ assert_eq!(cfg.credential.as_deref(), Some("credential-json"));
+ assert_eq!(
+ cfg.service_account.as_deref(),
+ Some("[email protected]")
+ );
+ assert_eq!(
+ cfg.predefined_acl.as_deref(),
+ Some("bucketOwnerFullControl")
+ );
+ assert_eq!(cfg.default_storage_class.as_deref(), Some("NEARLINE"));
+ }
+
+ #[test]
+ fn test_gcs_config_parse_canonical_aliases() {
+ let props = make_props(&[
+ ("gcs.credential-path", "/tmp/gcs.json"),
+ ("gcs.allow-anonymous", "true"),
+ ]);
+
+ let cfg = gcs_config_parse(props).unwrap();
+ assert_eq!(cfg.credential_path.as_deref(), Some("/tmp/gcs.json"));
+ assert!(cfg.allow_anonymous);
+ }
+
+ #[test]
+ fn test_gcs_config_parse_opendal_aliases() {
+ let props = make_props(&[
+ (
+ "gcs.google_application_credentials",
+ "/tmp/opendal-gcs.json",
+ ),
+ ("gcs.google_service_account_key", "credential-json"),
+ (
+ "gcs.google_service_account",
+ "[email protected]",
+ ),
+ ("gcs.google_skip_signature", "true"),
+ ("gcs.disable_vm_metadata", "true"),
+ ("gcs.disable_config_load", "true"),
+ ]);
+
+ let cfg = gcs_config_parse(props).unwrap();
+ assert_eq!(
+ cfg.credential_path.as_deref(),
+ Some("/tmp/opendal-gcs.json")
+ );
+ assert_eq!(cfg.credential.as_deref(), Some("credential-json"));
+ assert_eq!(
+ cfg.service_account.as_deref(),
+ Some("[email protected]")
+ );
+ assert!(cfg.allow_anonymous);
+ assert!(cfg.disable_vm_metadata);
+ assert!(cfg.disable_config_load);
+ }
+
+ #[test]
+ fn test_gcs_config_build_extracts_bucket() {
+ let cfg = GcsConfig::default();
+
+ let op = gcs_config_build(&cfg, "gs://my-bucket/some/path").unwrap();
+ assert_eq!(op.info().name(), "my-bucket");
+ }
+
+ #[test]
+ fn test_gcs_config_build_missing_bucket() {
+ let cfg = GcsConfig::default();
+ let result = gcs_config_build(&cfg, "gs:///path/without/bucket");
+ assert!(result.is_err());
+ }
+}
diff --git a/crates/paimon/src/io/storage_obs.rs b/crates/paimon/src/io/storage_obs.rs
new file mode 100644
index 0000000..2fde849
--- /dev/null
+++ b/crates/paimon/src/io/storage_obs.rs
@@ -0,0 +1,133 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+
+use opendal::services::ObsConfig;
+use opendal::{Configurator, Operator};
+use url::Url;
+
+use crate::error::Error;
+use crate::Result;
+
+use super::storage_config::normalize_storage_config;
+
+const OBS_ENDPOINT: &str = "fs.obs.endpoint";
+const OBS_ACCESS_KEY_ID: &str = "fs.obs.access.key";
+const OBS_SECRET_ACCESS_KEY: &str = "fs.obs.secret.key";
+
+const CONFIG_PREFIXES: &[&str] = &["fs.obs.", "obs."];
+const MIRRORED_KEYS: &[(&str, &str)] = &[
+ ("fs.obs.access-key-id", "fs.obs.access.key"),
+ ("fs.obs.access_key_id", "fs.obs.access.key"),
+ ("fs.obs.secret-access-key", "fs.obs.secret.key"),
+ ("fs.obs.secret_access_key", "fs.obs.secret.key"),
+];
+
+#[allow(clippy::field_reassign_with_default)]
+pub(crate) fn obs_config_parse(props: HashMap<String, String>) -> Result<ObsConfig> {
+ let normalized = normalize_storage_config(props, CONFIG_PREFIXES, "fs.obs.", MIRRORED_KEYS);
+
+ let mut cfg = ObsConfig::default();
+ cfg.endpoint = normalized.get(OBS_ENDPOINT).cloned();
+ cfg.access_key_id = normalized.get(OBS_ACCESS_KEY_ID).cloned();
+ cfg.secret_access_key = normalized.get(OBS_SECRET_ACCESS_KEY).cloned();
+ cfg.enable_versioning = normalized
+ .get("fs.obs.enable-versioning")
+ .is_some_and(|v| v.eq_ignore_ascii_case("true"));
+
+ Ok(cfg)
+}
+
+pub(crate) fn obs_config_build(cfg: &ObsConfig, path: &str) -> Result<Operator> {
+ let url = Url::parse(path).map_err(|_| Error::ConfigInvalid {
+ message: format!("Invalid OBS url: {path}"),
+ })?;
+
+ let bucket = url.host_str().ok_or_else(|| Error::ConfigInvalid {
+ message: format!("Invalid OBS url: {path}, missing bucket"),
+ })?;
+
+ let builder = cfg.clone().into_builder().bucket(bucket);
+ Ok(Operator::new(builder)?.finish())
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ fn make_props(pairs: &[(&str, &str)]) -> HashMap<String, String> {
+ pairs
+ .iter()
+ .map(|(k, v)| (k.to_string(), v.to_string()))
+ .collect()
+ }
+
+ #[test]
+ fn test_obs_config_parse_hadoop_keys() {
+ let props = make_props(&[
+ (
+ "fs.obs.endpoint",
+ "https://obs.cn-north-4.myhuaweicloud.com",
+ ),
+ ("fs.obs.access.key", "ak"),
+ ("fs.obs.secret.key", "sk"),
+ ]);
+
+ let cfg = obs_config_parse(props).unwrap();
+ assert_eq!(
+ cfg.endpoint.as_deref(),
+ Some("https://obs.cn-north-4.myhuaweicloud.com")
+ );
+ assert_eq!(cfg.access_key_id.as_deref(), Some("ak"));
+ assert_eq!(cfg.secret_access_key.as_deref(), Some("sk"));
+ }
+
+ #[test]
+ fn test_obs_config_parse_canonical_aliases() {
+ let props = make_props(&[
+ ("obs.endpoint", "https://obs.cn-north-4.myhuaweicloud.com"),
+ ("obs.access-key-id", "ak"),
+ ("obs.secret-access-key", "sk"),
+ ]);
+
+ let cfg = obs_config_parse(props).unwrap();
+ assert_eq!(
+ cfg.endpoint.as_deref(),
+ Some("https://obs.cn-north-4.myhuaweicloud.com")
+ );
+ assert_eq!(cfg.access_key_id.as_deref(), Some("ak"));
+ assert_eq!(cfg.secret_access_key.as_deref(), Some("sk"));
+ }
+
+ #[test]
+ #[allow(clippy::field_reassign_with_default)]
+ fn test_obs_config_build_extracts_bucket() {
+ let mut cfg = ObsConfig::default();
+ cfg.endpoint = Some("https://obs.cn-north-4.myhuaweicloud.com".to_string());
+
+ let op = obs_config_build(&cfg, "obs://my-bucket/some/path").unwrap();
+ assert_eq!(op.info().name(), "my-bucket");
+ }
+
+ #[test]
+ fn test_obs_config_build_missing_bucket() {
+ let cfg = ObsConfig::default();
+ let result = obs_config_build(&cfg, "obs:///path/without/bucket");
+ assert!(result.is_err());
+ }
+}
diff --git a/crates/paimon/src/io/storage_s3.rs b/crates/paimon/src/io/storage_s3.rs
index 57b77c7..6fe20b8 100644
--- a/crates/paimon/src/io/storage_s3.rs
+++ b/crates/paimon/src/io/storage_s3.rs
@@ -24,6 +24,8 @@ use url::Url;
use crate::error::Error;
use crate::Result;
+use super::storage_config::normalize_storage_config;
+
/// Configuration key for S3 endpoint.
///
/// Compatible with paimon-java's `s3.endpoint` / `fs.s3a.endpoint`.
@@ -79,14 +81,18 @@ const MIRRORED_KEYS: &[(&str, &str)] = &[
/// By default, virtual-hosted style addressing is enabled (matching AWS
/// and Java Paimon behavior). Set `s3.path-style-access=true` to switch
/// to path-style for S3-compatible stores like MinIO.
+#[allow(clippy::field_reassign_with_default)]
pub(crate) fn s3_config_parse(props: HashMap<String, String>) -> Result<S3Config> {
- let normalized = normalize_config(props);
+ let normalized = normalize_storage_config(props, JAVA_CONFIG_PREFIXES, "s3.", MIRRORED_KEYS);
let mut cfg = S3Config::default();
// Default to virtual-hosted style, matching AWS and Java Paimon.
// Only disable when path-style-access is explicitly set to true.
- cfg.enable_virtual_host_style = true;
+ let path_style_access = normalized
+ .get(S3_PATH_STYLE_ACCESS)
+ .is_some_and(|v| v.eq_ignore_ascii_case("true"));
+ cfg.enable_virtual_host_style = !path_style_access;
// Core connection settings.
cfg.endpoint = normalized.get(S3_ENDPOINT).cloned();
@@ -94,12 +100,6 @@ pub(crate) fn s3_config_parse(props: HashMap<String, String>) -> Result<S3Config
cfg.secret_access_key = normalized.get(S3_SECRET_KEY).cloned();
cfg.region = normalized.get(S3_REGION).cloned();
- if let Some(v) = normalized.get(S3_PATH_STYLE_ACCESS) {
- if v.eq_ignore_ascii_case("true") {
- cfg.enable_virtual_host_style = false;
- }
- }
-
// Session / assume-role credentials.
cfg.session_token = normalized.get("s3.session.token").cloned();
cfg.role_arn = normalized.get("s3.assumed.role.arn").cloned();
@@ -107,11 +107,9 @@ pub(crate) fn s3_config_parse(props: HashMap<String, String>) -> Result<S3Config
cfg.role_session_name = normalized.get("s3.assumed.role.session.name").cloned();
// Anonymous access.
- if let Some(v) = normalized.get("s3.anonymous") {
- if v.eq_ignore_ascii_case("true") {
- cfg.allow_anonymous = true;
- }
- }
+ cfg.allow_anonymous = normalized
+ .get("s3.anonymous")
+ .is_some_and(|v| v.eq_ignore_ascii_case("true"));
// Server-side encryption.
cfg.server_side_encryption = normalized.get("s3.sse.type").cloned();
@@ -144,53 +142,6 @@ pub(crate) fn s3_config_build(cfg: &S3Config, path: &str) -> Result<Operator> {
Ok(Operator::new(builder)?.finish())
}
-/// Normalize Java-compatible config keys to canonical `s3.*` form.
-///
-/// 1. Strips known prefixes (`fs.s3a.`, `s3a.`, `s3.`) and remaps to `s3.*`.
-/// 2. Applies mirrored key mappings for cross-compatibility.
-/// 3. Earlier prefixes in the list take lower priority (later ones overwrite).
-fn normalize_config(props: HashMap<String, String>) -> HashMap<String, String> {
- let mut result = HashMap::new();
-
- // First pass: normalize prefixes. Process in priority order —
- // `fs.s3a.` (lowest) → `s3a.` → `s3.` (highest, canonical).
- for prefix in JAVA_CONFIG_PREFIXES {
- for (key, value) in &props {
- if let Some(suffix) = key.strip_prefix(prefix) {
- let canonical = format!("s3.{suffix}");
- result.insert(canonical, value.clone());
- }
- }
- }
-
- // Second pass: apply mirrored keys bidirectionally (only if target not already set).
- let mirrored_additions: Vec<(String, String)> = MIRRORED_KEYS
- .iter()
- .flat_map(|(a, b)| {
- let mut pairs = Vec::new();
- // a → b
- if !result.contains_key(*b) {
- if let Some(v) = result.get(*a) {
- pairs.push((b.to_string(), v.clone()));
- }
- }
- // b → a
- if !result.contains_key(*a) {
- if let Some(v) = result.get(*b) {
- pairs.push((a.to_string(), v.clone()));
- }
- }
- pairs
- })
- .collect();
-
- for (k, v) in mirrored_additions {
- result.insert(k, v);
- }
-
- result
-}
-
#[cfg(test)]
mod tests {
use super::*;
@@ -353,6 +304,7 @@ mod tests {
}
#[test]
+ #[allow(clippy::field_reassign_with_default)]
fn test_s3_config_build_extracts_bucket() {
let mut cfg = S3Config::default();
cfg.endpoint = Some("https://s3.us-east-1.amazonaws.com".to_string());
@@ -363,6 +315,7 @@ mod tests {
}
#[test]
+ #[allow(clippy::field_reassign_with_default)]
fn test_s3_config_build_s3a_scheme() {
let mut cfg = S3Config::default();
cfg.endpoint = Some("https://s3.us-east-1.amazonaws.com".to_string());
@@ -390,7 +343,8 @@ mod tests {
fn test_mirrored_keys() {
// `s3.access.key` (dot form) should be mirrored from `s3.access-key` (dash form)
let props = make_props(&[("s3.access-key", "AKID")]);
- let normalized = normalize_config(props);
+ let normalized =
+ normalize_storage_config(props, JAVA_CONFIG_PREFIXES, "s3.", MIRRORED_KEYS);
assert_eq!(
normalized.get("s3.access.key").map(|s| s.as_str()),
Some("AKID")
diff --git a/docs/src/architecture.md b/docs/src/architecture.md
index f12950e..e47fb75 100644
--- a/docs/src/architecture.md
+++ b/docs/src/architecture.md
@@ -33,7 +33,7 @@ The core crate implements the Paimon table format, including:
- **Table** — Table abstraction for reading Paimon tables
- **Snapshot & Manifest** — Reading snapshot and manifest metadata
- **Schema** — Table schema management and evolution
-- **File IO** — Abstraction layer for storage backends (local filesystem, S3)
+- **File IO** — Abstraction layer for storage backends (local filesystem, object stores, HDFS)
- **File Format** — Parquet file reading and writing via Apache Arrow
### `crates/integrations/datafusion` — DataFusion Integration
diff --git a/docs/src/getting-started.md b/docs/src/getting-started.md
index 0b6d88a..90ec311 100644
--- a/docs/src/getting-started.md
+++ b/docs/src/getting-started.md
@@ -44,6 +44,11 @@ Available storage features:
| `storage-memory` | In-memory |
| `storage-s3` | Amazon S3 |
| `storage-oss` | Alibaba Cloud OSS|
+| `storage-cos` | Tencent Cloud COS|
+| `storage-azdls` | Azure Data Lake Storage Gen2 |
+| `storage-obs` | Huawei Cloud OBS |
+| `storage-gcs` | Google Cloud Storage |
+| `storage-hdfs` | HDFS |
| `storage-all` | All of the above |
## Catalog Management
@@ -78,6 +83,37 @@ options.set("fs.oss.accessKeySecret", "your-access-key-secret");
options.set("fs.oss.endpoint", "oss-cn-hangzhou.aliyuncs.com");
let catalog = CatalogFactory::create(options).await?;
+// Tencent Cloud COS
+let mut options = Options::new();
+options.set(CatalogOptions::WAREHOUSE, "cosn://bucket/warehouse");
+options.set("fs.cosn.userinfo.secretId", "your-secret-id");
+options.set("fs.cosn.userinfo.secretKey", "your-secret-key");
+options.set("fs.cosn.endpoint", "https://cos.ap-shanghai.myqcloud.com");
+let catalog = CatalogFactory::create(options).await?;
+
+// Azure Data Lake Storage Gen2
+let mut options = Options::new();
+options.set(CatalogOptions::WAREHOUSE, "abfs://filesystem@account.dfs.core.windows.net/warehouse");
+options.set("azure.account-key", "your-account-key");
+let catalog = CatalogFactory::create(options).await?;
+
+// If you use the short form "abfs://filesystem/warehouse", set the endpoint explicitly:
+// options.set("azure.endpoint", "https://account.dfs.core.windows.net");
+
+// Huawei Cloud OBS
+let mut options = Options::new();
+options.set(CatalogOptions::WAREHOUSE, "obs://bucket/warehouse");
+options.set("fs.obs.access.key", "your-access-key-id");
+options.set("fs.obs.secret.key", "your-secret-access-key");
+options.set("fs.obs.endpoint", "https://obs.cn-north-4.myhuaweicloud.com");
+let catalog = CatalogFactory::create(options).await?;
+
+// Google Cloud Storage
+let mut options = Options::new();
+options.set(CatalogOptions::WAREHOUSE, "gs://bucket/warehouse");
+options.set("gcs.credential-path", "/path/to/service-account.json");
+let catalog = CatalogFactory::create(options).await?;
+
// REST catalog
let mut options = Options::new();
options.set(CatalogOptions::METASTORE, "rest");
diff --git a/docs/src/index.md b/docs/src/index.md
index b9c1472..252c9dd 100644
--- a/docs/src/index.md
+++ b/docs/src/index.md
@@ -28,7 +28,7 @@ Apache Paimon Rust provides native Rust libraries for reading and writing Paimon
Key features:
- Native Rust reader for Paimon table format
-- Support for local filesystem, S3, and OSS storage backends
+- Support for local filesystem, S3, OSS, COS, Azure, OBS, GCS, and HDFS storage backends
- REST Catalog integration
- Apache DataFusion integration for SQL queries