This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 4efea04db rfc: Making Storage a Trait (#1885)
4efea04db is described below

commit 4efea04db1d91b9aa0cd2f57ef97b991cac1d40c
Author: Shawn Chang <[email protected]>
AuthorDate: Thu Jan 15 16:54:14 2026 -0800

    rfc: Making Storage a Trait (#1885)
    
    ## Which issue does this PR close?
    
    - Related to
      - #1314
      - #172
    
    ## What changes are included in this PR?
    
    
    ## Are these changes tested?
    
    ---------
    
    Co-authored-by: Renjie Liu <[email protected]>
---
 docs/rfcs/0002_storage_trait.md | 862 ++++++++++++++++++++++++++++++++++++++++
 1 file changed, 862 insertions(+)

diff --git a/docs/rfcs/0002_storage_trait.md b/docs/rfcs/0002_storage_trait.md
new file mode 100644
index 000000000..b68141aec
--- /dev/null
+++ b/docs/rfcs/0002_storage_trait.md
@@ -0,0 +1,862 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+-->
+
+# Making Storage a Trait
+
+## Background
+
+### Existing Implementation
+
+The existing code implements storage functionality through a concrete `Storage` enum that handles different storage backends (S3, local filesystem, GCS, etc.). This implementation is tightly coupled with OpenDAL as the underlying storage layer. The `FileIO` struct wraps this `Storage` enum and provides a high-level API for file operations.
+
+```rust
+// Current: Concrete enum with variants for each backend
+pub(crate) enum Storage {
+    #[cfg(feature = "storage-memory")]
+    Memory(Operator),
+    #[cfg(feature = "storage-fs")]
+    LocalFs,
+    #[cfg(feature = "storage-s3")]
+    S3 {
+        configured_scheme: String,
+        config: Arc<S3Config>,
+        customized_credential_load: Option<CustomAwsCredentialLoader>,
+    },
+    #[cfg(feature = "storage-gcs")]
+    Gcs { config: Arc<GcsConfig> },
+    // ... other variants
+}
+```
+
+Current structure:
+
+- **FileIO:** Main interface for file operations, wraps `Arc<Storage>`
+- **Storage:** Enum with variants for different storage backends
+- **InputFile / OutputFile:** Concrete structs that hold an `Operator` and path
+
+### Problem Statement
+
+The original design has several limitations:
+
+- **Tight Coupling** – All storage logic depends on OpenDAL, limiting flexibility. Users cannot easily opt in to other storage implementations such as `object_store`
+- **Customization Barriers** – Users cannot easily add custom behaviors or optimizations
+- **No Extensibility** – Adding new backends requires modifying the core enum in the `iceberg` crate
+
+As discussed in Issue #1314, making `Storage` a trait would allow pluggable storage and better integration with existing systems.
+
+---
+
+## High-Level Architecture
+
+The new design introduces a trait-based storage abstraction with a factory pattern for creating storage instances. This enables pluggable storage backends while maintaining a clean separation between the core Iceberg library and storage implementations.
+
+### Component Overview
+
+```
+┌─────────────────────────────────────────────────────────────────────────────┐
+│                              User Application                                │
+└─────────────────────────────────────────────────────────────────────────────┘
+                                      │
+                                      ▼
+┌─────────────────────────────────────────────────────────────────────────────┐
+│                                 Catalog                                      │
+│  ┌─────────────────────────────────────────────────────────────────────┐    │
+│  │  CatalogBuilder::with_storage_factory(storage_factory)              │    │
+│  │  - Accepts optional StorageFactory injection                        │    │
+│  │  - Falls back to LocalFsStorageFactory if not provided              │    │
+│  └─────────────────────────────────────────────────────────────────────┘    │
+└─────────────────────────────────────────────────────────────────────────────┘
+                                      │
+                                      ▼
+┌─────────────────────────────────────────────────────────────────────────────┐
+│                          FileIO / FileIOBuilder                              │
+│  ┌─────────────────────────────────────────────────────────────────────┐    │
+│  │  FileIOBuilder:                                                     │    │
+│  │  - factory: Arc<dyn StorageFactory>                                 │    │
+│  │  - config: StorageConfig                                            │    │
+│  │  - Methods: new(), with_prop(), with_props(), config(), build()     │    │
+│  │                                                                     │    │
+│  │  FileIO:                                                            │    │
+│  │  - config: StorageConfig (properties only, no scheme)               │    │
+│  │  - factory: Arc<dyn StorageFactory>                                 │    │
+│  │  - storage: OnceCell<Arc<dyn Storage>> (lazy initialization)        │    │
+│  │  - Methods: new_with_memory(), new_with_fs(), into_builder()        │    │
+│  └─────────────────────────────────────────────────────────────────────┘    │
+│                                                                              │
+│  Methods: new_input(), new_output(), delete(), exists(), delete_prefix()    │
+└─────────────────────────────────────────────────────────────────────────────┘
+                                      │
+                    ┌─────────────────┴─────────────────┐
+                    ▼                                   ▼
+┌───────────────────────────────┐     ┌───────────────────────────────────────┐
+│        StorageFactory         │     │              Storage                   │
+│  (trait in iceberg crate)     │     │        (trait in iceberg crate)        │
+│                               │     │                                        │
+│  fn build(&self, config)      │────▶│  async fn exists(&self, path)          │
+│     -> Arc<dyn Storage>       │     │  async fn read(&self, path)            │
+│                               │     │  async fn write(&self, path, bytes)    │
+│                               │     │  async fn delete(&self, path)          │
+│                               │     │  fn new_input(&self, path)             │
+│                               │     │  fn new_output(&self, path)            │
+└───────────────────────────────┘     └───────────────────────────────────────┘
+            │                                           ▲
+            │                                           │
+            ▼                                           │
+┌─────────────────────────────────────────────────────────────────────────────┐
+│                        Storage Implementations                               │
+│                                                                              │
+│  ┌─────────────────────┐  ┌─────────────────────┐  ┌─────────────────────┐  │
+│  │   MemoryStorage     │  │   LocalFsStorage    │  │   OpenDalStorage    │  │
+│  │   (iceberg crate)   │  │   (iceberg crate)   │  │ (iceberg-storage-   │  │
+│  │                     │  │                     │  │      opendal)       │  │
+│  │  - In-memory HashMap│  │  - std::fs ops      │  │  - S3, GCS, Azure   │  │
+│  │  - For testing      │  │  - For local files  │  │  - OSS, filesystem  │  │
+│  └─────────────────────┘  └─────────────────────┘  └─────────────────────┘  │
+└─────────────────────────────────────────────────────────────────────────────┘
+```
+
+### Data Flow
+
+```
+┌──────────────────────────────────────────────────────────────────────────────┐
+│                           FileIO Creation Flow                               │
+└──────────────────────────────────────────────────────────────────────────────┘
+
+  User Code                 FileIOBuilder                  StorageFactory
+      │                          │                              │
+      │  FileIOBuilder::new()    │                              │
+      │─────────────────────────▶│                              │
+      │                          │                              │
+      │  .with_prop()            │                              │
+      │─────────────────────────▶│                              │
+      │                          │                              │
+      │  .build()                │                              │
+      │─────────────────────────▶│                              │
+      │                          │                              │
+      │◀─────────────────────────│                              │
+      │  FileIO                  │                              │
+      │                          │                              │
+      │  new_input(path)         │                              │
+      │─────────────────────────▶│                              │
+      │                          │  (lazy) factory.build()      │
+      │                          │─────────────────────────────▶│
+      │                          │                              │
+      │                          │◀─────────────────────────────│
+      │                          │  Arc<dyn Storage>            │
+      │                          │                              │
+      │◀─────────────────────────│                              │
+      │  InputFile               │                              │
+
+
+┌──────────────────────────────────────────────────────────────────────────────┐
+│                        Catalog with FileIO Injection                         │
+└──────────────────────────────────────────────────────────────────────────────┘
+
+  User Code                  CatalogBuilder                  Catalog
+      │                          │                              │
+      │  ::default()             │                              │
+      │─────────────────────────▶│                              │
+      │                          │                              │
+      │  .with_storage_factory(factory)  │                              │
+      │─────────────────────────────────▶│                              │
+      │                          │                              │
+      │  .load(name, props)      │                              │
+      │─────────────────────────▶│                              │
+      │                          │  new(config, Some(file_io))  │
+      │                          │─────────────────────────────▶│
+      │                          │                              │
+      │◀─────────────────────────│◀─────────────────────────────│
+      │  Catalog                 │                              │
+```
+
+### Crate Structure
+
+```
+crates/
+├── iceberg/                         # Core Iceberg functionality
+│   └── src/
+│       └── io/
+│           ├── mod.rs               # Re-exports
+│           ├── storage.rs           # Storage + StorageFactory traits
+│           ├── file_io.rs           # FileIO, InputFile, OutputFile
+│           ├── config/              # StorageConfig and backend configs
+│           │   ├── mod.rs           # StorageConfig
+│           │   ├── s3.rs            # S3Config constants
+│           │   ├── gcs.rs           # GcsConfig constants
+│           │   ├── oss.rs           # OssConfig constants
+│           │   └── azdls.rs         # AzdlsConfig constants
+│           ├── memory.rs            # MemoryStorage (built-in)
+│           └── local_fs.rs          # LocalFsStorage (built-in)
+│
+├── storage/
+│   └── opendal/                     # OpenDAL-based implementations
+│       └── src/
+│           ├── lib.rs               # Re-exports
+│           ├── storage.rs           # OpenDalStorage + OpenDalStorageFactory
+│           ├── storage_s3.rs        # S3 support
+│           ├── storage_gcs.rs       # GCS support
+│           ├── storage_oss.rs       # OSS support
+│           ├── storage_azdls.rs     # Azure support
+│           └── storage_fs.rs        # Filesystem support
+│
+└── catalog/                         # Catalog implementations
+    ├── rest/                        # Uses with_storage_factory injection
+    ├── glue/                        # Uses with_storage_factory injection
+    ├── hms/                         # Uses with_storage_factory injection
+    ├── s3tables/                    # Uses with_storage_factory injection
+    └── sql/                         # Uses with_storage_factory injection
+```
+
+---
+
+## Design Phase 1: Storage Trait and Core Types
+
+Phase 1 focuses on converting Storage from an enum to a trait, introducing `StorageFactory` and `StorageConfig`, and updating `FileIO`, `InputFile`, and `OutputFile` to use the trait-based abstraction.
+
+### Storage Trait
+
+The `Storage` trait is defined in the `iceberg` crate and provides the interface for all storage operations. It uses `typetag` for serialization support across process boundaries.
+
+```rust
+#[async_trait]
+#[typetag::serde(tag = "type")]
+pub trait Storage: Debug + Send + Sync {
+    /// Check if a file exists at the given path
+    async fn exists(&self, path: &str) -> Result<bool>;
+
+    /// Get metadata from an input path
+    async fn metadata(&self, path: &str) -> Result<FileMetadata>;
+
+    /// Read bytes from a path
+    async fn read(&self, path: &str) -> Result<Bytes>;
+
+    /// Get FileRead from a path
+    async fn reader(&self, path: &str) -> Result<Box<dyn FileRead>>;
+
+    /// Write bytes to an output path
+    async fn write(&self, path: &str, bs: Bytes) -> Result<()>;
+
+    /// Get FileWrite from a path
+    async fn writer(&self, path: &str) -> Result<Box<dyn FileWrite>>;
+
+    /// Delete a file at the given path
+    async fn delete(&self, path: &str) -> Result<()>;
+
+    /// Delete all files with the given prefix
+    async fn delete_prefix(&self, path: &str) -> Result<()>;
+
+    /// Create a new input file for reading
+    fn new_input(&self, path: &str) -> Result<InputFile>;
+
+    /// Create a new output file for writing
+    fn new_output(&self, path: &str) -> Result<OutputFile>;
+}
+```
+
+### StorageFactory Trait
+
+The `StorageFactory` trait creates `Storage` instances from configuration. This enables lazy initialization and custom storage injection.
+
+```rust
+#[typetag::serde(tag = "type")]
+pub trait StorageFactory: Debug + Send + Sync {
+    /// Build a new Storage instance from the given configuration.
+    fn build(&self, config: &StorageConfig) -> Result<Arc<dyn Storage>>;
+}
+```
+
+### StorageConfig
+
+`StorageConfig` is a pure property container without a scheme. The storage type is determined by explicit factory selection:
+
+```rust
+#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
+pub struct StorageConfig {
+    /// Configuration properties for the storage backend
+    props: HashMap<String, String>,
+}
+
+impl StorageConfig {
+    pub fn new() -> Self;
+    pub fn from_props(props: HashMap<String, String>) -> Self;
+    pub fn props(&self) -> &HashMap<String, String>;
+    pub fn get(&self, key: &str) -> Option<&String>;
+    pub fn with_prop(self, key: impl Into<String>, value: impl Into<String>) -> Self;
+    pub fn with_props(self, props: impl IntoIterator<Item = (impl Into<String>, impl Into<String>)>) -> Self;
+}
+```
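To make the builder-style API above concrete, here is a minimal, self-contained sketch of the property-container pattern. This `StorageConfig` is an illustrative stand-in written against std only, not the actual implementation:

```rust
use std::collections::HashMap;

// Illustrative stand-in for the proposed StorageConfig: a pure
// property bag with consuming builder-style setters.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct StorageConfig {
    props: HashMap<String, String>,
}

impl StorageConfig {
    pub fn new() -> Self {
        Self::default()
    }

    pub fn get(&self, key: &str) -> Option<&String> {
        self.props.get(key)
    }

    pub fn with_prop(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        self.props.insert(key.into(), value.into());
        self
    }

    pub fn with_props(
        mut self,
        props: impl IntoIterator<Item = (impl Into<String>, impl Into<String>)>,
    ) -> Self {
        for (k, v) in props {
            self.props.insert(k.into(), v.into());
        }
        self
    }
}

fn main() {
    let config = StorageConfig::new()
        .with_prop("s3.region", "us-east-1")
        .with_props([("s3.endpoint", "http://localhost:9000")]);
    assert_eq!(config.get("s3.region").map(String::as_str), Some("us-east-1"));
    assert!(config.get("s3.bucket").is_none());
}
```

The consuming setters allow call chaining while keeping the struct itself a plain, serializable map of strings.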
+
+#### Backend-Specific Configuration Types
+
+In addition to `StorageConfig`, we provide typed configuration structs for each storage backend.
+These can be constructed from `StorageConfig` using `TryFrom` and provide a structured way to access backend-specific settings:
+
+- `S3Config` - Amazon S3 configuration
+- `GcsConfig` - Google Cloud Storage configuration
+- `OssConfig` - Alibaba Cloud OSS configuration
+- `AzdlsConfig` - Azure Data Lake Storage configuration
+
+Example of `S3Config`:
+
+```rust
+#[derive(Clone, Debug, Default, Serialize, Deserialize, TypedBuilder)]
+pub struct S3Config {
+    #[builder(default, setter(strip_option, into))]
+    endpoint: Option<String>,
+    #[builder(default, setter(strip_option, into))]
+    access_key_id: Option<String>,
+    #[builder(default, setter(strip_option, into))]
+    secret_access_key: Option<String>,
+    #[builder(default, setter(strip_option, into))]
+    region: Option<String>,
+    #[builder(default)]
+    allow_anonymous: bool,
+    // ... other S3-specific fields
+}
+
+impl S3Config {
+    /// Returns the S3 endpoint URL.
+    pub fn endpoint(&self) -> Option<&str> {
+        self.endpoint.as_deref()
+    }
+
+    // ... other getter methods
+}
+
+// Fallible conversion from StorageConfig
+impl TryFrom<&StorageConfig> for S3Config {
+    type Error = iceberg::Error;
+
+    fn try_from(config: &StorageConfig) -> Result<Self> {/* ... */}
+}
+```
+
+Usage with the builder pattern:
+
+```rust
+let s3_config = S3Config::builder()
+    .region("us-east-1")
+    .access_key_id("my-access-key")
+    .secret_access_key("my-secret-key")
+    .build();
+
+assert_eq!(s3_config.region(), Some("us-east-1"));
+```
+
+These typed configs are used internally by storage implementations (e.g., `OpenDalStorage`) to parse properties from `StorageConfig` into strongly-typed configuration.
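The `TryFrom` direction can be sketched in isolation. In this illustrative, self-contained example a plain `HashMap` stands in for `StorageConfig`, and the `S3Config` here is a simplified placeholder with only two fields:

```rust
use std::collections::HashMap;

// Simplified stand-in for S3Config, holding only two of its fields.
#[derive(Debug, Default, PartialEq)]
struct S3Config {
    region: Option<String>,
    endpoint: Option<String>,
}

// Parse flat string properties into the typed config, the way the
// proposed TryFrom<&StorageConfig> impl would read StorageConfig props.
impl TryFrom<&HashMap<String, String>> for S3Config {
    type Error = String;

    fn try_from(props: &HashMap<String, String>) -> Result<Self, Self::Error> {
        Ok(S3Config {
            region: props.get("s3.region").cloned(),
            endpoint: props.get("s3.endpoint").cloned(),
        })
    }
}

fn main() {
    let props = HashMap::from([
        ("s3.region".to_string(), "us-east-1".to_string()),
    ]);
    let cfg = S3Config::try_from(&props).expect("valid config");
    assert_eq!(cfg.region.as_deref(), Some("us-east-1"));
    assert_eq!(cfg.endpoint, None);
}
```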
+
+### FileIO and FileIOBuilder
+
+`FileIO` is redesigned to use lazy storage initialization with a factory pattern. Configuration is done via `FileIOBuilder`:
+
+```rust
+#[derive(Clone)]
+pub struct FileIO {
+    /// Storage configuration containing properties
+    config: StorageConfig,
+    /// Factory for creating storage instances
+    factory: Arc<dyn StorageFactory>,
+    /// Cached storage instance (lazily initialized)
+    storage: Arc<OnceCell<Arc<dyn Storage>>>,
+}
+
+impl FileIO {
+    /// Create a new FileIO backed by in-memory storage.
+    pub fn new_with_memory() -> Self;
+
+    /// Create a new FileIO backed by local filesystem storage.
+    pub fn new_with_fs() -> Self;
+
+    /// Convert this FileIO into a FileIOBuilder for modification.
+    pub fn into_builder(self) -> FileIOBuilder;
+
+    // File operations delegate to the lazily-initialized storage
+    pub async fn delete(&self, path: impl AsRef<str>) -> Result<()>;
+    pub async fn delete_prefix(&self, path: impl AsRef<str>) -> Result<()>;
+    pub async fn exists(&self, path: impl AsRef<str>) -> Result<bool>;
+    pub fn new_input(&self, path: impl AsRef<str>) -> Result<InputFile>;
+    pub fn new_output(&self, path: impl AsRef<str>) -> Result<OutputFile>;
+}
+
+/// Builder for creating FileIO instances.
+pub struct FileIOBuilder {
+    factory: Arc<dyn StorageFactory>,
+    config: StorageConfig,
+}
+
+impl FileIOBuilder {
+    /// Create a new FileIOBuilder with the given storage factory.
+    pub fn new(factory: Arc<dyn StorageFactory>) -> Self;
+
+    /// Add a configuration property.
+    pub fn with_prop(self, key: impl Into<String>, value: impl Into<String>) -> Self;
+
+    /// Add multiple configuration properties.
+    pub fn with_props(self, props: impl IntoIterator<Item = (impl Into<String>, impl Into<String>)>) -> Self;
+
+    /// Get the storage configuration.
+    pub fn config(&self) -> &StorageConfig;
+
+    /// Build the FileIO instance.
+    pub fn build(self) -> Result<FileIO>;
+}
+```
+
+Key changes from the old design:
+- `FileIOBuilder` is used for configuration with explicit factory injection
+- `FileIO` has convenience constructors (`new_with_memory()`, `new_with_fs()`) for common cases
+- Removed `Extensions` - custom behavior is now provided via `StorageFactory`
+- Storage is lazily initialized on first use via `OnceCell`
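The lazy-initialization point can be sketched in isolation. This self-contained example uses the standard library's `OnceLock` as a stand-in for the async-aware `OnceCell` the real `FileIO` would hold, and the trait and storage type here are trivial placeholders:

```rust
use std::sync::{Arc, OnceLock};

// Placeholder trait standing in for the real Storage trait.
trait Storage: Send + Sync {
    fn name(&self) -> &str;
}

struct MemoryStorage;

impl Storage for MemoryStorage {
    fn name(&self) -> &str {
        "memory"
    }
}

struct FileIO {
    // Cached storage instance; built on first use, then reused.
    storage: Arc<OnceLock<Arc<dyn Storage>>>,
}

impl FileIO {
    fn storage(&self) -> &Arc<dyn Storage> {
        // In the real design this closure would call factory.build(&config).
        self.storage.get_or_init(|| Arc::new(MemoryStorage))
    }
}

fn main() {
    let file_io = FileIO { storage: Arc::new(OnceLock::new()) };
    let a = Arc::clone(file_io.storage());
    let b = Arc::clone(file_io.storage());
    assert!(Arc::ptr_eq(&a, &b)); // same cached instance on every access
    assert_eq!(a.name(), "memory");
}
```

Because the cell lives behind an `Arc`, cloned `FileIO` handles share the same lazily-built storage.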
+
+### InputFile and OutputFile Changes
+
+`InputFile` and `OutputFile` now hold a reference to `Arc<dyn Storage>` instead of an `Operator`:
+
+```rust
+pub struct InputFile {
+    storage: Arc<dyn Storage>,
+    path: String,
+}
+
+impl InputFile {
+    pub fn new(storage: Arc<dyn Storage>, path: String) -> Self;
+    pub fn location(&self) -> &str;
+    pub async fn exists(&self) -> Result<bool>;
+    pub async fn metadata(&self) -> Result<FileMetadata>;
+    pub async fn read(&self) -> Result<Bytes>;
+    pub async fn reader(&self) -> Result<Box<dyn FileRead>>;
+}
+
+pub struct OutputFile {
+    storage: Arc<dyn Storage>,
+    path: String,
+}
+
+impl OutputFile {
+    pub fn new(storage: Arc<dyn Storage>, path: String) -> Self;
+    pub fn location(&self) -> &str;
+    pub async fn exists(&self) -> Result<bool>;
+    pub async fn delete(&self) -> Result<()>;
+    pub fn to_input_file(self) -> InputFile;
+    pub async fn write(&self, bs: Bytes) -> Result<()>;
+    pub async fn writer(&self) -> Result<Box<dyn FileWrite>>;
+}
+```
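The delegation pattern behind these structs is straightforward; the following self-contained sketch uses synchronous stand-ins for the async trait methods to stay runnable without an async runtime:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Synchronous stand-in for the Storage trait, for illustration only.
trait Storage: Send + Sync {
    fn read(&self, path: &str) -> Option<Vec<u8>>;
    fn write(&self, path: &str, bytes: Vec<u8>);
}

#[derive(Default)]
struct MemoryStorage {
    data: Mutex<HashMap<String, Vec<u8>>>,
}

impl Storage for MemoryStorage {
    fn read(&self, path: &str) -> Option<Vec<u8>> {
        self.data.lock().unwrap().get(path).cloned()
    }
    fn write(&self, path: &str, bytes: Vec<u8>) {
        self.data.lock().unwrap().insert(path.to_string(), bytes);
    }
}

// InputFile/OutputFile each hold the shared storage handle plus a path
// and delegate every operation to the storage.
struct InputFile {
    storage: Arc<dyn Storage>,
    path: String,
}

impl InputFile {
    fn read(&self) -> Option<Vec<u8>> {
        self.storage.read(&self.path)
    }
}

struct OutputFile {
    storage: Arc<dyn Storage>,
    path: String,
}

impl OutputFile {
    fn write(&self, bytes: Vec<u8>) {
        self.storage.write(&self.path, bytes);
    }
    fn to_input_file(self) -> InputFile {
        InputFile { storage: self.storage, path: self.path }
    }
}

fn main() {
    let storage: Arc<dyn Storage> = Arc::new(MemoryStorage::default());
    let out = OutputFile { storage: Arc::clone(&storage), path: "memory://t/f.txt".into() };
    out.write(b"hello".to_vec());
    let input = out.to_input_file();
    assert_eq!(input.read(), Some(b"hello".to_vec()));
}
```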
+
+### Built-in Storage Implementations
+
+The `iceberg` crate includes two built-in storage implementations for testing and basic use cases:
+
+#### MemoryStorage
+
+In-memory storage using a thread-safe `HashMap`, primarily for testing:
+
+```rust
+#[derive(Debug, Clone, Default)]
+pub struct MemoryStorage {
+    data: Arc<RwLock<HashMap<String, Bytes>>>,
+}
+
+#[derive(Clone, Debug, Default, Serialize, Deserialize)]
+pub struct MemoryStorageFactory;
+
+#[typetag::serde]
+impl StorageFactory for MemoryStorageFactory {
+    fn build(&self, _config: &StorageConfig) -> Result<Arc<dyn Storage>> {
+        // StorageConfig carries no scheme; this factory ignores the
+        // properties and always returns an in-memory storage.
+        Ok(Arc::new(MemoryStorage::new()))
+    }
+}
+```
+
+#### LocalFsStorage
+
+Local filesystem storage using standard Rust `std::fs` operations:
+
+```rust
+#[derive(Debug, Clone, Default, Serialize, Deserialize)]
+pub struct LocalFsStorage;
+
+#[derive(Clone, Debug, Default, Serialize, Deserialize)]
+pub struct LocalFsStorageFactory;
+
+#[typetag::serde]
+impl StorageFactory for LocalFsStorageFactory {
+    fn build(&self, _config: &StorageConfig) -> Result<Arc<dyn Storage>> {
+        // StorageConfig carries no scheme; this factory ignores the
+        // properties and always returns a local-filesystem storage.
+        Ok(Arc::new(LocalFsStorage::new()))
+    }
+}
+```
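As an illustration of how `LocalFsStorage` maps trait methods onto `std::fs`, `delete_prefix` reduces to recursive removal under a path. The helper name and layout below are illustrative, not the actual implementation:

```rust
use std::fs;
use std::path::Path;

// Illustrative: delete everything under a path prefix, as a
// LocalFsStorage::delete_prefix implementation might.
fn delete_prefix(prefix: &str) -> std::io::Result<()> {
    let path = Path::new(prefix);
    if path.is_dir() {
        fs::remove_dir_all(path)?; // removes the whole directory tree
    } else if path.exists() {
        fs::remove_file(path)?; // a plain file matching the prefix
    }
    Ok(())
}

fn main() -> std::io::Result<()> {
    let dir = std::env::temp_dir().join("iceberg_storage_rfc_demo");
    fs::create_dir_all(dir.join("data"))?;
    fs::write(dir.join("data").join("part-0.parquet"), b"bytes")?;
    delete_prefix(dir.to_str().unwrap())?;
    assert!(!dir.exists());
    Ok(())
}
```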
+
+### CatalogBuilder Changes
+
+The `CatalogBuilder` trait is extended with `with_storage_factory()` to allow `StorageFactory` injection:
+
+```rust
+pub trait CatalogBuilder: Default + Debug + Send + Sync {
+    type C: Catalog;
+
+    /// Set a custom StorageFactory to use for storage operations.
+    ///
+    /// When a StorageFactory is provided, the catalog will use it to build FileIO
+    /// instances for all storage operations instead of using the default factory.
+    fn with_storage_factory(self, storage_factory: Arc<dyn StorageFactory>) -> Self;
+
+    /// Create a new catalog instance.
+    fn load(
+        self,
+        name: impl Into<String>,
+        props: HashMap<String, String>,
+    ) -> impl Future<Output = Result<Self::C>> + Send;
+}
+```
+
+Catalog implementations store the optional `StorageFactory` and use it when provided:
+
+```rust
+pub struct GlueCatalogBuilder {
+    config: GlueCatalogConfig,
+    storage_factory: Option<Arc<dyn StorageFactory>>,  // New field
+}
+
+impl CatalogBuilder for GlueCatalogBuilder {
+    fn with_storage_factory(mut self, storage_factory: Arc<dyn StorageFactory>) -> Self {
+        self.storage_factory = Some(storage_factory);
+        self
+    }
+
+    // In load():
+    // use the provided StorageFactory, or LocalFsStorageFactory as the fallback
+    let factory = self.storage_factory.unwrap_or_else(|| Arc::new(LocalFsStorageFactory));
+    let file_io = FileIOBuilder::new(factory)
+        .with_props(file_io_props)
+        .build()?;
+}
+```
+
+---
+
+## Design Phase 2: Separate Storage Crate
+
+Phase 2 moves the concrete OpenDAL-based implementations to a separate crate (`iceberg-storage-opendal`).
+
+### iceberg-storage-opendal Crate
+
+This crate provides OpenDAL-based storage implementations for cloud storage backends:
+
+```rust
+// crates/storage/opendal/src/storage.rs
+
+/// Explicit storage factory variants for OpenDAL-based backends.
+///
+/// Each variant represents a specific storage backend. Path scheme
+/// validation is handled by the underlying Storage implementation
+/// when operations are performed.
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub enum OpenDalStorageFactory {
+    /// Local filesystem storage factory.
+    #[cfg(feature = "storage-fs")]
+    Fs,
+
+    /// Amazon S3 storage factory.
+    #[cfg(feature = "storage-s3")]
+    S3,
+
+    /// Google Cloud Storage factory.
+    #[cfg(feature = "storage-gcs")]
+    Gcs,
+
+    /// Alibaba Cloud OSS storage factory.
+    #[cfg(feature = "storage-oss")]
+    Oss,
+
+    /// Azure Data Lake Storage factory.
+    #[cfg(feature = "storage-azdls")]
+    Azdls,
+}
+
+#[typetag::serde]
+impl StorageFactory for OpenDalStorageFactory {
+    fn build(&self, config: &StorageConfig) -> Result<Arc<dyn Storage>> {
+        let storage = match self {
+            #[cfg(feature = "storage-fs")]
+            Self::Fs => OpenDalStorage::LocalFs,
+
+            #[cfg(feature = "storage-s3")]
+            Self::S3 => {
+                let iceberg_s3_config = S3Config::try_from(config)?;
+                let opendal_s3_config = s3_config_to_opendal(&iceberg_s3_config);
+                OpenDalStorage::S3 {
+                    configured_scheme: "s3".to_string(),
+                    config: opendal_s3_config.into(),
+                    customized_credential_load: None,
+                }
+            }
+            // ... other variants
+        };
+        Ok(Arc::new(storage))
+    }
+}
+
+/// Unified OpenDAL-based storage implementation.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub enum OpenDalStorage {
+    #[cfg(feature = "storage-fs")]
+    LocalFs,
+
+    #[cfg(feature = "storage-s3")]
+    S3 {
+        configured_scheme: String,
+        config: Arc<S3Config>,
+        #[serde(skip)]
+        customized_credential_load: Option<CustomAwsCredentialLoader>,
+    },
+
+    #[cfg(feature = "storage-gcs")]
+    Gcs { config: Arc<GcsConfig> },
+
+    #[cfg(feature = "storage-oss")]
+    Oss { config: Arc<OssConfig> },
+
+    #[cfg(feature = "storage-azdls")]
+    Azdls {
+        configured_scheme: AzureStorageScheme,
+        config: Arc<AzdlsConfig>,
+    },
+}
+
+impl OpenDalStorage {
+    /// Creates operator from path.
+    fn create_operator<'a>(&self, path: &'a str) -> Result<(Operator, &'a str)>;
+}
+
+#[async_trait]
+#[typetag::serde]
+impl Storage for OpenDalStorage {
+    // Delegates all operations to the appropriate OpenDAL operator
+}
+```
+
+Feature flags in `iceberg-storage-opendal`:
+- `storage-s3`: Enables the S3 storage backend
+- `storage-gcs`: Enables the Google Cloud Storage backend
+- `storage-oss`: Enables the Alibaba Cloud OSS backend
+- `storage-azdls`: Enables the Azure Data Lake Storage backend
+- `storage-fs`: Enables the OpenDAL filesystem backend
+- `storage-all`: Enables all storage backends
+
+---
+
+## Example Usage
+
+### Basic Usage with Memory Storage (Testing)
+
+```rust
+use iceberg::io::FileIO;
+
+// Create in-memory FileIO for testing
+let file_io = FileIO::new_with_memory();
+
+// Write and read files
+let output = file_io.new_output("memory://test/file.txt")?;
+output.write("Hello, World!".into()).await?;
+
+let input = file_io.new_input("memory://test/file.txt")?;
+let content = input.read().await?;
+assert_eq!(content, bytes::Bytes::from("Hello, World!"));
+```
+
+### Using OpenDAL Storage Factory
+
+```rust
+use std::sync::Arc;
+use iceberg::io::FileIOBuilder;
+use iceberg_storage_opendal::OpenDalStorageFactory;
+
+// Create FileIO with explicit S3 factory
+let file_io = FileIOBuilder::new(Arc::new(OpenDalStorageFactory::S3))
+    .with_prop("s3.region", "us-east-1")
+    .with_prop("s3.access-key-id", "my-access-key")
+    .with_prop("s3.secret-access-key", "my-secret-key")
+    .build()?;
+
+// Use the FileIO
+let input = file_io.new_input("s3://my-bucket/warehouse/table/metadata.json")?;
+let metadata = input.read().await?;
+```
+
+### Using Catalogs with Custom Storage
+
+When using a catalog without injecting a custom `StorageFactory`, the catalog falls back to `LocalFsStorageFactory`. For cloud storage, inject the appropriate factory:
+
+```rust
+use std::collections::HashMap;
+use std::sync::Arc;
+use iceberg::{CatalogBuilder, TableIdent};
+use iceberg_catalog_glue::GlueCatalogBuilder;
+use iceberg_storage_opendal::OpenDalStorageFactory;
+
+// Inject S3 storage factory for cloud storage support
+let catalog = GlueCatalogBuilder::default()
+    .with_storage_factory(Arc::new(OpenDalStorageFactory::S3))
+    .load("my_catalog", HashMap::from([
+        ("warehouse".to_string(), "s3://my-bucket/warehouse".to_string()),
+        ("s3.region".to_string(), "us-east-1".to_string()),
+    ]))
+    .await?;
+
+// Load and scan a table - storage is handled automatically
+let table = catalog.load_table(&TableIdent::from_strs(["db", "my_table"])?).await?;
+let scan = table.scan().build()?;
+```
+
+### Injecting Custom StorageFactory into Catalogs
+
+For advanced use cases, you can inject a custom `StorageFactory` with specific storage configuration:
+
+```rust
+use std::collections::HashMap;
+use std::sync::Arc;
+use iceberg::CatalogBuilder;
+use iceberg_catalog_glue::GlueCatalogBuilder;
+use iceberg_storage_opendal::OpenDalStorageFactory;
+
+// Create a custom StorageFactory
+let storage_factory = Arc::new(OpenDalStorageFactory::S3);
+
+// Inject StorageFactory into catalog
+let catalog = GlueCatalogBuilder::default()
+    .with_storage_factory(storage_factory)
+    .load("my_catalog", HashMap::from([
+        ("warehouse".to_string(), "s3://my-bucket/warehouse".to_string()),
+        ("s3.region".to_string(), "us-east-1".to_string()),
+    ]))
+    .await?;
+```
+
+### Implementing Custom Storage
+
+To implement a custom storage backend, implement the `Storage` trait with `#[typetag::serde]`:
+
+```rust
+use std::sync::Arc;
+use async_trait::async_trait;
+use iceberg::io::{Storage, StorageFactory, StorageConfig, InputFile, OutputFile};
+
+#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
+pub struct MyCustomStorage { /* fields */ }
+
+#[async_trait]
+#[typetag::serde]
+impl Storage for MyCustomStorage {
+    // Implement all required methods: exists, metadata, read, reader,
+    // write, writer, delete, delete_prefix, new_input, new_output
+}
+
+#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
+pub struct MyCustomStorageFactory;
+
+#[typetag::serde]
+impl StorageFactory for MyCustomStorageFactory {
+    fn build(&self, _config: &StorageConfig) -> iceberg::Result<Arc<dyn Storage>> {
+        Ok(Arc::new(MyCustomStorage { /* ... */ }))
+    }
+}
+```
+
+### Routing to Multiple Storage Backends
+
+To use different storage implementations for different schemes (e.g., a native S3 client for S3 and OpenDAL for other schemes), implement routing at the `Storage` level:
+
+```rust
+use std::sync::Arc;
+use async_trait::async_trait;
+use iceberg::io::{Storage, StorageFactory, StorageConfig, InputFile, OutputFile};
+
+/// A storage that routes to different backends based on path scheme
+#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
+pub struct RoutingStorage {
+    s3_storage: NativeS3Storage,      // Custom S3 implementation
+    opendal_storage: OpenDalStorage,  // OpenDAL for other schemes
+}
+
+#[async_trait]
+#[typetag::serde]
+impl Storage for RoutingStorage {
+    async fn read(&self, path: &str) -> iceberg::Result<bytes::Bytes> {
+        if path.starts_with("s3://") || path.starts_with("s3a://") {
+            self.s3_storage.read(path).await
+        } else {
+            self.opendal_storage.read(path).await
+        }
+    }
+
+    // Route other methods similarly...
+}
+
+/// Factory that creates RoutingStorage
+#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
+pub struct RoutingStorageFactory;
+
+#[typetag::serde]
+impl StorageFactory for RoutingStorageFactory {
+    fn build(&self, config: &StorageConfig) -> iceberg::Result<Arc<dyn Storage>> {
+        Ok(Arc::new(RoutingStorage {
+            s3_storage: NativeS3Storage::new(config)?,
+            opendal_storage: OpenDalStorage::build_from_config(config)?,
+        }))
+    }
+}
+```
+
+---
+
+
+## Implementation Plan
+
+### Phase 1: Storage Trait
+- Define `Storage` trait in `iceberg` crate
+- Define `StorageFactory` trait in `iceberg` crate
+- Introduce `StorageConfig` for configuration properties
+- Update `FileIO` to use lazy storage initialization with factory pattern
+- Update `InputFile`/`OutputFile` to use `Arc<dyn Storage>`
+- Implement `MemoryStorage` and `LocalFsStorage` in `iceberg` crate
+- Add `with_storage_factory()` to `CatalogBuilder` trait
+- Update all catalog implementations to support StorageFactory injection
+- Improve naming: Storage handles locations rather than paths
+
+### Phase 2: Separate Storage Crate
+- Create `iceberg-storage-opendal` crate with `OpenDalStorage` and `OpenDalStorageFactory`
+- Move S3, GCS, OSS, Azure implementations to `iceberg-storage-opendal`
+- Remove storage feature flags from `iceberg` crate
+
+### Future Work
+- Implement a ResolvingStorage backed by OpenDAL
+- Add `object_store`-based storage implementations
+- Consider introducing `IoErrorKind` for storage-specific error handling
+- Introduce custom key values in StorageConfigs

