This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 2a43943b3 feat(io): Add Storage, StorageFactory, StorageConfig (#2061)
2a43943b3 is described below
commit 2a43943b3a8c252679ab86c77c2adfa1d41cd0a1
Author: Shawn Chang <[email protected]>
AuthorDate: Mon Jan 26 17:57:50 2026 -0800
feat(io): Add Storage, StorageFactory, StorageConfig (#2061)
## Which issue does this PR close?
- Closes #2053 #2054
- Related to #2055
## What changes are included in this PR?
- `Storage`, `StorageFactory` trait, and `StorageConfig`
- They are not used anywhere for now.
## Are these changes tested?
Added some simple unit tests for `StorageConfig`.
---
Cargo.lock | 73 +++++++++--
Cargo.toml | 1 +
crates/iceberg/Cargo.toml | 1 +
crates/iceberg/src/io/config/mod.rs | 237 ++++++++++++++++++++++++++++++++++++
crates/iceberg/src/io/mod.rs | 3 +
crates/iceberg/src/io/storage.rs | 129 ++++++++++++++++++--
6 files changed, 425 insertions(+), 19 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 3eba261ba..003a52a38 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2497,7 +2497,7 @@ dependencies = [
"libc",
"option-ext",
"redox_users",
- "windows-sys 0.59.0",
+ "windows-sys 0.61.2",
]
[[package]]
@@ -2632,6 +2632,17 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f"
+[[package]]
+name = "erased-serde"
+version = "0.4.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "89e8918065695684b2b0702da20382d5ae6065cf3327bc2d6436bd49a71ce9f3"
+dependencies = [
+ "serde",
+ "serde_core",
+ "typeid",
+]
+
[[package]]
name = "errno"
version = "0.3.14"
@@ -2639,7 +2650,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb"
dependencies = [
"libc",
- "windows-sys 0.59.0",
+ "windows-sys 0.61.2",
]
[[package]]
@@ -3312,7 +3323,7 @@ dependencies = [
"libc",
"percent-encoding",
"pin-project-lite",
- "socket2 0.5.10",
+ "socket2 0.6.1",
"tokio",
"tower-service",
"tracing",
@@ -3398,6 +3409,7 @@ dependencies = [
"tempfile",
"tokio",
"typed-builder",
+ "typetag",
"url",
"uuid",
"zstd",
@@ -3787,6 +3799,15 @@ dependencies = [
"tokio",
]
+[[package]]
+name = "inventory"
+version = "0.3.21"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bc61209c082fbeb19919bee74b176221b27223e27b65d781eb91af24eb1fb46e"
+dependencies = [
+ "rustversion",
+]
+
[[package]]
name = "ipnet"
version = "2.11.0"
@@ -3845,7 +3866,7 @@ dependencies = [
"portable-atomic",
"portable-atomic-util",
"serde_core",
- "windows-sys 0.59.0",
+ "windows-sys 0.61.2",
]
[[package]]
@@ -4366,7 +4387,7 @@ version = "0.50.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5"
dependencies = [
- "windows-sys 0.59.0",
+ "windows-sys 0.61.2",
]
[[package]]
@@ -5166,7 +5187,7 @@ dependencies = [
"quinn-udp",
"rustc-hash",
"rustls 0.23.35",
- "socket2 0.5.10",
+ "socket2 0.6.1",
"thiserror 2.0.17",
"tokio",
"tracing",
@@ -5203,9 +5224,9 @@ dependencies = [
"cfg_aliases",
"libc",
"once_cell",
- "socket2 0.5.10",
+ "socket2 0.6.1",
"tracing",
- "windows-sys 0.59.0",
+ "windows-sys 0.60.2",
]
[[package]]
@@ -5697,7 +5718,7 @@ dependencies = [
"errno",
"libc",
"linux-raw-sys",
- "windows-sys 0.59.0",
+ "windows-sys 0.61.2",
]
[[package]]
@@ -6699,7 +6720,7 @@ dependencies = [
"getrandom 0.3.4",
"once_cell",
"rustix",
- "windows-sys 0.59.0",
+ "windows-sys 0.61.2",
]
[[package]]
@@ -7111,12 +7132,42 @@ dependencies = [
"syn 2.0.111",
]
+[[package]]
+name = "typeid"
+version = "1.0.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bc7d623258602320d5c55d1bc22793b57daff0ec7efc270ea7d55ce1d5f5471c"
+
[[package]]
name = "typenum"
version = "1.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "562d481066bde0658276a35467c4af00bdc6ee726305698a55b86e61d7ad82bb"
+[[package]]
+name = "typetag"
+version = "0.2.21"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "be2212c8a9b9bcfca32024de14998494cf9a5dfa59ea1b829de98bac374b86bf"
+dependencies = [
+ "erased-serde",
+ "inventory",
+ "once_cell",
+ "serde",
+ "typetag-impl",
+]
+
+[[package]]
+name = "typetag-impl"
+version = "0.2.21"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "27a7a9b72ba121f6f1f6c3632b85604cac41aedb5ddc70accbebb6cac83de846"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.111",
+]
+
[[package]]
name = "typify"
version = "0.5.0"
@@ -7514,7 +7565,7 @@ version = "0.1.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22"
dependencies = [
- "windows-sys 0.48.0",
+ "windows-sys 0.61.2",
]
[[package]]
diff --git a/Cargo.toml b/Cargo.toml
index 517bfa36e..148a45449 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -128,6 +128,7 @@ toml = "0.8"
tracing = "0.1.41"
tracing-subscriber = "0.3.20"
typed-builder = "0.20"
+typetag = "0.2"
url = "2.5.7"
uuid = { version = "1.18", features = ["v7"] }
volo = "0.10.6"
diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml
index 6f1332a44..9a5de7736 100644
--- a/crates/iceberg/Cargo.toml
+++ b/crates/iceberg/Cargo.toml
@@ -86,6 +86,7 @@ serde_with = { workspace = true }
strum = { workspace = true, features = ["derive"] }
tokio = { workspace = true, optional = false, features = ["sync"] }
typed-builder = { workspace = true }
+typetag = { workspace = true }
url = { workspace = true }
uuid = { workspace = true }
zstd = { workspace = true }
diff --git a/crates/iceberg/src/io/config/mod.rs b/crates/iceberg/src/io/config/mod.rs
new file mode 100644
index 000000000..648e8baf4
--- /dev/null
+++ b/crates/iceberg/src/io/config/mod.rs
@@ -0,0 +1,237 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// TODO Add specific configs
+//! Storage configuration for storage backends.
+//!
+//! This module provides configuration types for storage backends.
+//! The configuration types are designed to be used with the `StorageFactory`
+//! trait to create storage instances.
+//!
+//! # Available Configurations
+//!
+//! - [`StorageConfig`]: Base configuration containing properties for storage backends
+//!
+//! Backend-specific configurations (`S3Config` for Amazon S3, `GcsConfig` for
+//! Google Cloud Storage, `OssConfig` for Alibaba Cloud OSS, and `AzdlsConfig`
+//! for Azure Data Lake Storage) are planned but not yet available.
+
+// TODO Add specific configs
+// mod azdls;
+// mod gcs;
+// mod oss;
+// mod s3;
+
+use std::collections::HashMap;
+
+// TODO Add specific configs
+// pub use azdls::*;
+// pub use gcs::*;
+// pub use oss::*;
+// pub use s3::*;
+use serde::{Deserialize, Serialize};
+
+/// Configuration properties for storage backends.
+///
+/// This struct contains only configuration properties without specifying
+/// which storage backend to use. The storage type is determined by the
+/// explicit factory selection.
+///
+/// # Example
+///
+/// ```
+/// use iceberg::io::StorageConfig;
+///
+/// let config = StorageConfig::new()
+///     .with_prop("region", "us-east-1")
+///     .with_props([("bucket", "my-bucket")]);
+///
+/// assert_eq!(config.get("region").map(String::as_str), Some("us-east-1"));
+/// assert_eq!(config.props().len(), 2);
+/// ```
+#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
+pub struct StorageConfig {
+    /// Configuration properties for the storage backend
+    props: HashMap<String, String>,
+}
+
+impl StorageConfig {
+    /// Create a new, empty `StorageConfig`.
+    ///
+    /// Equivalent to [`StorageConfig::default`].
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    /// Create a `StorageConfig` from existing properties.
+    ///
+    /// # Arguments
+    ///
+    /// * `props` - Configuration properties for the storage backend
+    pub fn from_props(props: HashMap<String, String>) -> Self {
+        Self { props }
+    }
+
+    /// Get all configuration properties.
+    pub fn props(&self) -> &HashMap<String, String> {
+        &self.props
+    }
+
+    /// Get a specific configuration property by key.
+    ///
+    /// # Arguments
+    ///
+    /// * `key` - The property key to look up
+    ///
+    /// # Returns
+    ///
+    /// `Some` with a reference to the property value if the key is present,
+    /// otherwise `None`.
+    pub fn get(&self, key: &str) -> Option<&String> {
+        self.props.get(key)
+    }
+
+    /// Add a configuration property.
+    ///
+    /// This is a builder-style method that returns `self` for chaining.
+    /// If `key` is already present, its value is replaced.
+    ///
+    /// # Arguments
+    ///
+    /// * `key` - The property key
+    /// * `value` - The property value
+    #[must_use]
+    pub fn with_prop(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
+        self.props.insert(key.into(), value.into());
+        self
+    }
+
+    /// Add multiple configuration properties.
+    ///
+    /// This is a builder-style method that returns `self` for chaining.
+    /// Pairs are inserted in iteration order, so a key that is already present
+    /// (or repeated within `props`) ends up with the last value seen.
+    ///
+    /// # Arguments
+    ///
+    /// * `props` - An iterator of key-value pairs to add
+    #[must_use]
+    pub fn with_props(
+        mut self,
+        props: impl IntoIterator<Item = (impl Into<String>, impl Into<String>)>,
+    ) -> Self {
+        self.props
+            .extend(props.into_iter().map(|(k, v)| (k.into(), v.into())));
+        self
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_storage_config_new() {
+        let config = StorageConfig::new();
+
+        assert!(config.props().is_empty());
+    }
+
+    #[test]
+    fn test_storage_config_from_props() {
+        let props = HashMap::from([
+            ("region".to_string(), "us-east-1".to_string()),
+            ("bucket".to_string(), "my-bucket".to_string()),
+        ]);
+        let config = StorageConfig::from_props(props.clone());
+
+        assert_eq!(config.props(), &props);
+    }
+
+    #[test]
+    fn test_storage_config_default() {
+        let config = StorageConfig::default();
+
+        assert!(config.props().is_empty());
+        assert_eq!(config, StorageConfig::new());
+    }
+
+    #[test]
+    fn test_storage_config_get() {
+        let config = StorageConfig::new().with_prop("region", "us-east-1");
+
+        assert_eq!(config.get("region").map(String::as_str), Some("us-east-1"));
+        assert_eq!(config.get("nonexistent"), None);
+    }
+
+    #[test]
+    fn test_storage_config_with_prop() {
+        let config = StorageConfig::new()
+            .with_prop("region", "us-east-1")
+            .with_prop("bucket", "my-bucket");
+
+        assert_eq!(config.get("region").map(String::as_str), Some("us-east-1"));
+        assert_eq!(config.get("bucket").map(String::as_str), Some("my-bucket"));
+    }
+
+    #[test]
+    fn test_storage_config_with_prop_overwrites_existing_key() {
+        let config = StorageConfig::new()
+            .with_prop("region", "us-east-1")
+            .with_prop("region", "eu-west-1");
+
+        assert_eq!(config.get("region").map(String::as_str), Some("eu-west-1"));
+        assert_eq!(config.props().len(), 1);
+    }
+
+    #[test]
+    fn test_storage_config_with_props() {
+        let additional_props = [("key1", "value1"), ("key2", "value2")];
+        let config = StorageConfig::new().with_props(additional_props);
+
+        assert_eq!(config.get("key1").map(String::as_str), Some("value1"));
+        assert_eq!(config.get("key2").map(String::as_str), Some("value2"));
+    }
+
+    #[test]
+    fn test_storage_config_with_props_merges_and_overwrites() {
+        let config = StorageConfig::new()
+            .with_prop("region", "us-east-1")
+            .with_props([("region", "eu-west-1"), ("bucket", "my-bucket")]);
+
+        assert_eq!(config.get("region").map(String::as_str), Some("eu-west-1"));
+        assert_eq!(config.get("bucket").map(String::as_str), Some("my-bucket"));
+        assert_eq!(config.props().len(), 2);
+    }
+
+    #[test]
+    fn test_storage_config_clone() {
+        let config = StorageConfig::new().with_prop("region", "us-east-1");
+        let cloned = config.clone();
+
+        assert_eq!(config, cloned);
+        assert_eq!(cloned.get("region").map(String::as_str), Some("us-east-1"));
+    }
+
+    #[test]
+    fn test_storage_config_serialization_roundtrip() {
+        let config = StorageConfig::new()
+            .with_prop("region", "us-east-1")
+            .with_prop("bucket", "my-bucket");
+
+        let serialized = serde_json::to_string(&config).unwrap();
+        let deserialized: StorageConfig = serde_json::from_str(&serialized).unwrap();
+
+        assert_eq!(config, deserialized);
+    }
+
+    #[test]
+    fn test_storage_config_serialized_shape() {
+        // A single key keeps the JSON output independent of HashMap iteration order.
+        let config = StorageConfig::new().with_prop("region", "us-east-1");
+
+        let serialized = serde_json::to_string(&config).unwrap();
+
+        assert_eq!(serialized, r#"{"props":{"region":"us-east-1"}}"#);
+    }
+
+    #[test]
+    fn test_storage_config_clone_independence() {
+        let original = StorageConfig::new().with_prop("region", "us-east-1");
+
+        // Modify the clone
+        let cloned = original
+            .clone()
+            .with_prop("region", "eu-west-1")
+            .with_prop("new_key", "new_value");
+
+        // Original should be unchanged
+        assert_eq!(original.get("region").map(String::as_str), Some("us-east-1"));
+        assert_eq!(original.get("new_key"), None);
+
+        // Clone should have the new values
+        assert_eq!(cloned.get("region").map(String::as_str), Some("eu-west-1"));
+        assert_eq!(cloned.get("new_key").map(String::as_str), Some("new_value"));
+    }
+
+    #[test]
+    fn test_storage_config_from_props_empty() {
+        let config = StorageConfig::from_props(HashMap::new());
+
+        assert!(config.props().is_empty());
+    }
+
+    #[test]
+    fn test_storage_config_serialization_empty() {
+        let config = StorageConfig::new();
+
+        let serialized = serde_json::to_string(&config).unwrap();
+        let deserialized: StorageConfig = serde_json::from_str(&serialized).unwrap();
+
+        assert_eq!(config, deserialized);
+        assert!(deserialized.props().is_empty());
+    }
+}
diff --git a/crates/iceberg/src/io/mod.rs b/crates/iceberg/src/io/mod.rs
index 5eb596434..55f2f262c 100644
--- a/crates/iceberg/src/io/mod.rs
+++ b/crates/iceberg/src/io/mod.rs
@@ -66,6 +66,7 @@
//! - `new_input`: Create input file for reading.
//! - `new_output`: Create output file for writing.
+mod config;
mod file_io;
mod storage;
@@ -85,6 +86,8 @@ mod storage_oss;
#[cfg(feature = "storage-s3")]
mod storage_s3;
+pub use config::*;
+pub use storage::{Storage, StorageFactory};
#[cfg(feature = "storage-azdls")]
pub use storage_azdls::*;
#[cfg(feature = "storage-fs")]
diff --git a/crates/iceberg/src/io/storage.rs b/crates/iceberg/src/io/storage.rs
index 32a95eb1d..77978e223 100644
--- a/crates/iceberg/src/io/storage.rs
+++ b/crates/iceberg/src/io/storage.rs
@@ -15,14 +15,11 @@
// specific language governing permissions and limitations
// under the License.
-#[cfg(any(
- feature = "storage-s3",
- feature = "storage-gcs",
- feature = "storage-oss",
- feature = "storage-azdls",
-))]
+use std::fmt::Debug;
use std::sync::Arc;
+use async_trait::async_trait;
+use bytes::Bytes;
use opendal::layers::RetryLayer;
#[cfg(feature = "storage-azdls")]
use opendal::services::AzdlsConfig;
@@ -36,10 +33,126 @@ use opendal::{Operator, Scheme};
#[cfg(feature = "storage-azdls")]
use super::AzureStorageScheme;
-use super::FileIOBuilder;
+use super::{
+ FileIOBuilder, FileMetadata, FileRead, FileWrite, InputFile, OutputFile,
StorageConfig,
+};
#[cfg(feature = "storage-s3")]
use crate::io::CustomAwsCredentialLoader;
-use crate::{Error, ErrorKind};
+use crate::{Error, ErrorKind, Result};
+
+/// Trait for storage operations in Iceberg.
+///
+/// The trait supports serialization via `typetag`, allowing storage instances to be
+/// serialized and deserialized across process boundaries.
+///
+/// Third-party implementations can implement this trait to provide custom storage backends.
+///
+/// # Implementing Custom Storage
+///
+/// To implement a custom storage backend:
+///
+/// 1. Create a struct that implements this trait
+/// 2. Add the `#[typetag::serde]` attribute for serialization support
+/// 3. Implement all required methods
+///
+/// # Example
+///
+/// ```rust,ignore
+/// #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
+/// struct MyStorage {
+///     // custom fields
+/// }
+///
+/// #[async_trait]
+/// #[typetag::serde]
+/// impl Storage for MyStorage {
+///     async fn exists(&self, path: &str) -> Result<bool> {
+///         // implementation
+///         todo!()
+///     }
+///     // ... implement other methods
+/// }
+/// ```
+///
+/// # Note
+///
+/// This trait is under heavy development and is not used anywhere as of now.
+/// Please DO NOT implement it.
+// TODO: remove the note above when the trait is integrated with FileIO and Catalog.
+#[async_trait]
+#[typetag::serde(tag = "type")]
+pub trait Storage: Debug + Send + Sync {
+    /// Check if a file exists at the given path.
+    async fn exists(&self, path: &str) -> Result<bool>;
+
+    /// Get metadata for the file at the given path.
+    async fn metadata(&self, path: &str) -> Result<FileMetadata>;
+
+    /// Read bytes from the given path.
+    async fn read(&self, path: &str) -> Result<Bytes>;
+
+    /// Get a [`FileRead`] for the given path.
+    async fn reader(&self, path: &str) -> Result<Box<dyn FileRead>>;
+
+    /// Write `bs` to the given output path.
+    async fn write(&self, path: &str, bs: Bytes) -> Result<()>;
+
+    /// Get a [`FileWrite`] for the given path.
+    async fn writer(&self, path: &str) -> Result<Box<dyn FileWrite>>;
+
+    /// Delete the file at the given path.
+    async fn delete(&self, path: &str) -> Result<()>;
+
+    /// Delete all files under the given prefix.
+    ///
+    /// `path` is the prefix to match, not a single file path.
+    async fn delete_prefix(&self, path: &str) -> Result<()>;
+
+    /// Create a new [`InputFile`] for reading from the given path.
+    fn new_input(&self, path: &str) -> Result<InputFile>;
+
+    /// Create a new [`OutputFile`] for writing to the given path.
+    fn new_output(&self, path: &str) -> Result<OutputFile>;
+}
+
+/// Factory for creating [`Storage`] instances from configuration.
+///
+/// Implement this trait to provide custom storage backends. The factory pattern
+/// allows for lazy initialization of storage instances and enables users to
+/// inject custom storage implementations into catalogs.
+///
+/// # Example
+///
+/// ```rust,ignore
+/// #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
+/// struct MyCustomStorageFactory {
+///     // custom configuration
+/// }
+///
+/// #[typetag::serde]
+/// impl StorageFactory for MyCustomStorageFactory {
+///     fn build(&self, config: &StorageConfig) -> Result<Arc<dyn Storage>> {
+///         // Create and return custom storage implementation
+///         todo!()
+///     }
+/// }
+/// ```
+///
+/// # Note
+///
+/// This trait is under heavy development and is not used anywhere as of now.
+/// Please DO NOT implement it.
+// TODO: remove the note above when the trait is integrated with FileIO and Catalog.
+#[typetag::serde(tag = "type")]
+pub trait StorageFactory: Debug + Send + Sync {
+    /// Build a new [`Storage`] instance from the given configuration.
+    ///
+    /// # Arguments
+    ///
+    /// * `config` - The storage configuration properties. The storage backend
+    ///   itself is determined by the factory implementation, not by `config`.
+    ///
+    /// # Returns
+    ///
+    /// A `Result` containing an `Arc<dyn Storage>` on success, or an error
+    /// if the storage could not be created.
+    fn build(&self, config: &StorageConfig) -> Result<Arc<dyn Storage>>;
+}
/// The storage carries all supported storage services in iceberg
#[derive(Debug)]