This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new c4084495d refactor!(storage): Move OpenDal Storage to a new crate 
iceberg-storage-opendal (#2207)
c4084495d is described below

commit c4084495d3dd38299324e6f570ebd52ebe845ee8
Author: Shawn Chang <[email protected]>
AuthorDate: Wed Mar 4 18:58:26 2026 -0800

    refactor!(storage): Move OpenDal Storage to a new crate 
iceberg-storage-opendal (#2207)
    
    ## Which issue does this PR close?
    
    <!--
    We generally require a GitHub issue to be filed for all bug fixes and
    enhancements and this helps us generate change logs for our releases.
    You can link an issue to this PR using the GitHub syntax. For example
    `Closes #123` indicates that this PR will close issue #123.
    -->
    
    - Closes #2209
    
    ## What changes are included in this PR?
    - Moved the OpenDal Storage implementation to a new crate,
    iceberg-storage-opendal (crates/storage/opendal)
    - Renamed feature flags `storage-s3`, `storage-xxx` to `opendal-s3`,
    `opendal-xxx`
    
    <!--
    Provide a summary of the modifications in this PR. List the main changes
    such as new features, bug fixes, refactoring, or any other updates.
    -->
    
    ## Are these changes tested?
    Relying on the existing unit tests.
    <!--
    Specify what test covers (unit test, integration test, etc.).
    
    If tests are not included in your PR, please explain why (for example,
    are they covered by existing tests)?
    -->
---
 .github/workflows/publish.yml                      |   1 +
 Cargo.lock                                         |  26 ++-
 Cargo.toml                                         |   3 +
 bindings/python/Cargo.toml                         |   1 +
 bindings/python/src/datafusion_table_provider.rs   |   3 +-
 crates/catalog/glue/Cargo.toml                     |   1 +
 crates/catalog/glue/src/catalog.rs                 |   5 +-
 crates/catalog/glue/tests/glue_catalog_test.rs     |   6 +-
 crates/catalog/hms/Cargo.toml                      |   1 +
 crates/catalog/hms/tests/hms_catalog_test.rs       |   6 +-
 crates/catalog/s3tables/Cargo.toml                 |   1 +
 crates/catalog/s3tables/src/catalog.rs             |   3 +-
 crates/examples/Cargo.toml                         |   3 +-
 crates/examples/src/oss_backend.rs                 |   2 +-
 crates/iceberg/Cargo.toml                          |  12 +-
 crates/iceberg/src/catalog/mod.rs                  |   3 +-
 crates/iceberg/src/error.rs                        |   6 -
 crates/iceberg/src/io/storage/mod.rs               |  14 --
 crates/integration_tests/Cargo.toml                |   1 +
 crates/integration_tests/tests/common/mod.rs       |   2 +-
 .../tests/conflict_commit_test.rs                  |   2 +-
 .../integration_tests/tests/read_evolved_schema.rs |   2 +-
 .../tests/read_positional_deletes.rs               |   2 +-
 .../s3tables => storage/opendal}/Cargo.toml        |  43 ++--
 .../opendal => storage/opendal/src}/azdls.rs       |  26 ++-
 .../storage/opendal => storage/opendal/src}/fs.rs  |   7 +-
 .../storage/opendal => storage/opendal/src}/gcs.rs |  15 +-
 .../opendal/mod.rs => storage/opendal/src/lib.rs}  | 241 ++++++++++++---------
 .../opendal => storage/opendal/src}/memory.rs      |   7 +-
 .../storage/opendal => storage/opendal/src}/oss.rs |   7 +-
 .../storage/opendal => storage/opendal/src}/s3.rs  |  17 +-
 .../opendal/fs.rs => storage/opendal/src/utils.rs} |  20 +-
 .../opendal}/tests/file_io_gcs_test.rs             |   7 +-
 .../opendal}/tests/file_io_s3_test.rs              |   6 +-
 34 files changed, 292 insertions(+), 210 deletions(-)

diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml
index 4504f6e2f..c1c904615 100644
--- a/.github/workflows/publish.yml
+++ b/.github/workflows/publish.yml
@@ -39,6 +39,7 @@ jobs:
         # Order here is sensitive, as it will be used to determine the order 
of publishing
         package:
           - "crates/iceberg"
+          - "crates/storage/opendal"
           - "crates/catalog/glue"
           - "crates/catalog/hms"
           - "crates/catalog/rest"
diff --git a/Cargo.lock b/Cargo.lock
index 8f5df819a..9a460707e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3299,13 +3299,11 @@ dependencies = [
  "moka",
  "murmur3",
  "once_cell",
- "opendal",
  "ordered-float 4.6.0",
  "parquet",
  "pretty_assertions",
  "rand 0.8.5",
  "regex",
- "reqsign",
  "reqwest",
  "roaring",
  "serde",
@@ -3341,6 +3339,7 @@ dependencies = [
  "aws-config",
  "aws-sdk-glue",
  "iceberg",
+ "iceberg-storage-opendal",
  "iceberg_test_utils",
  "serde_json",
  "tokio",
@@ -3357,6 +3356,7 @@ dependencies = [
  "faststr",
  "hive_metastore",
  "iceberg",
+ "iceberg-storage-opendal",
  "iceberg_test_utils",
  "linkedbytes",
  "metainfo",
@@ -3415,6 +3415,7 @@ dependencies = [
  "aws-config",
  "aws-sdk-s3tables",
  "iceberg",
+ "iceberg-storage-opendal",
  "iceberg_test_utils",
  "itertools 0.13.0",
  "tokio",
@@ -3458,6 +3459,7 @@ dependencies = [
  "futures",
  "iceberg",
  "iceberg-catalog-rest",
+ "iceberg-storage-opendal",
  "tokio",
 ]
 
@@ -3470,6 +3472,7 @@ dependencies = [
  "futures",
  "iceberg",
  "iceberg-catalog-rest",
+ "iceberg-storage-opendal",
  "iceberg_test_utils",
  "ordered-float 2.10.1",
  "parquet",
@@ -3521,6 +3524,25 @@ dependencies = [
  "tracing",
 ]
 
+[[package]]
+name = "iceberg-storage-opendal"
+version = "0.8.0"
+dependencies = [
+ "anyhow",
+ "async-trait",
+ "bytes",
+ "cfg-if",
+ "iceberg",
+ "iceberg_test_utils",
+ "opendal",
+ "reqsign",
+ "reqwest",
+ "serde",
+ "tokio",
+ "typetag",
+ "url",
+]
+
 [[package]]
 name = "iceberg_test_utils"
 version = "0.8.0"
diff --git a/Cargo.toml b/Cargo.toml
index 6eba22459..c5d0c4239 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -24,6 +24,7 @@ members = [
   "crates/integration_tests",
   "crates/integrations/*",
   "crates/sqllogictest",
+  "crates/storage/*",
   "crates/test_utils",
 ]
 resolver = "2"
@@ -59,6 +60,7 @@ backon = "1.5.1"
 base64 = "0.22.1"
 bimap = "0.6"
 bytes = "1.11"
+cfg-if = "1"
 chrono = "0.4.41"
 clap = { version = "4.5.48", features = ["derive", "cargo"] }
 dashmap = "6"
@@ -85,6 +87,7 @@ iceberg-catalog-rest = { version = "0.8.0", path = 
"./crates/catalog/rest" }
 iceberg-catalog-s3tables = { version = "0.8.0", path = 
"./crates/catalog/s3tables" }
 iceberg-catalog-sql = { version = "0.8.0", path = "./crates/catalog/sql" }
 iceberg-datafusion = { version = "0.8.0", path = 
"./crates/integrations/datafusion" }
+iceberg-storage-opendal = { version = "0.8.0", path = 
"./crates/storage/opendal" }
 indicatif = "0.18"
 itertools = "0.13"
 libtest-mimic = "0.8.1"
diff --git a/bindings/python/Cargo.toml b/bindings/python/Cargo.toml
index aaa8c7bec..10826798a 100644
--- a/bindings/python/Cargo.toml
+++ b/bindings/python/Cargo.toml
@@ -33,6 +33,7 @@ crate-type = ["cdylib"]
 [dependencies]
 arrow = { version = "57.1", features = ["pyarrow", "chrono-tz"] }
 iceberg = { path = "../../crates/iceberg" }
+iceberg-storage-opendal = { path = "../../crates/storage/opendal", features = 
["opendal-s3", "opendal-fs", "opendal-memory"] }
 pyo3 = { version = "0.26", features = ["extension-module", "abi3-py310"] }
 iceberg-datafusion = { path = "../../crates/integrations/datafusion" }
 datafusion-ffi = { version = "52.1" }
diff --git a/bindings/python/src/datafusion_table_provider.rs 
b/bindings/python/src/datafusion_table_provider.rs
index d4a0234ff..7fa9f53db 100644
--- a/bindings/python/src/datafusion_table_provider.rs
+++ b/bindings/python/src/datafusion_table_provider.rs
@@ -22,9 +22,10 @@ use std::sync::Arc;
 use datafusion_ffi::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
 use datafusion_ffi::table_provider::FFI_TableProvider;
 use iceberg::TableIdent;
-use iceberg::io::{FileIOBuilder, OpenDalStorageFactory, StorageFactory};
+use iceberg::io::{FileIOBuilder, StorageFactory};
 use iceberg::table::StaticTable;
 use iceberg_datafusion::table::IcebergStaticTableProvider;
+use iceberg_storage_opendal::OpenDalStorageFactory;
 use pyo3::exceptions::{PyRuntimeError, PyValueError};
 use pyo3::prelude::{PyAnyMethods, PyCapsuleMethods, *};
 use pyo3::types::{PyAny, PyCapsule};
diff --git a/crates/catalog/glue/Cargo.toml b/crates/catalog/glue/Cargo.toml
index f42fedeae..e41253de3 100644
--- a/crates/catalog/glue/Cargo.toml
+++ b/crates/catalog/glue/Cargo.toml
@@ -34,6 +34,7 @@ async-trait = { workspace = true }
 aws-config = { workspace = true }
 aws-sdk-glue = { workspace = true }
 iceberg = { workspace = true }
+iceberg-storage-opendal = { workspace = true, features = ["opendal-s3"] }
 serde_json = { workspace = true }
 tokio = { workspace = true }
 tracing = { workspace = true }
diff --git a/crates/catalog/glue/src/catalog.rs 
b/crates/catalog/glue/src/catalog.rs
index 789490006..bf2f39233 100644
--- a/crates/catalog/glue/src/catalog.rs
+++ b/crates/catalog/glue/src/catalog.rs
@@ -25,8 +25,8 @@ use aws_sdk_glue::operation::create_table::CreateTableError;
 use aws_sdk_glue::operation::update_table::UpdateTableError;
 use aws_sdk_glue::types::TableInput;
 use iceberg::io::{
-    FileIO, FileIOBuilder, OpenDalStorageFactory, S3_ACCESS_KEY_ID, 
S3_ENDPOINT, S3_REGION,
-    S3_SECRET_ACCESS_KEY, S3_SESSION_TOKEN, StorageFactory,
+    FileIO, FileIOBuilder, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, 
S3_SECRET_ACCESS_KEY,
+    S3_SESSION_TOKEN, StorageFactory,
 };
 use iceberg::spec::{TableMetadata, TableMetadataBuilder};
 use iceberg::table::Table;
@@ -34,6 +34,7 @@ use iceberg::{
     Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, 
NamespaceIdent, Result,
     TableCommit, TableCreation, TableIdent,
 };
+use iceberg_storage_opendal::OpenDalStorageFactory;
 
 use crate::error::{from_aws_build_error, from_aws_sdk_error};
 use crate::utils::{
diff --git a/crates/catalog/glue/tests/glue_catalog_test.rs 
b/crates/catalog/glue/tests/glue_catalog_test.rs
index a209ae09c..0b7dbe9f2 100644
--- a/crates/catalog/glue/tests/glue_catalog_test.rs
+++ b/crates/catalog/glue/tests/glue_catalog_test.rs
@@ -23,10 +23,7 @@
 use std::collections::HashMap;
 use std::sync::Arc;
 
-use iceberg::io::{
-    FileIOBuilder, OpenDalStorageFactory, S3_ACCESS_KEY_ID, S3_ENDPOINT, 
S3_REGION,
-    S3_SECRET_ACCESS_KEY,
-};
+use iceberg::io::{FileIOBuilder, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, 
S3_SECRET_ACCESS_KEY};
 use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
 use iceberg::transaction::{ApplyTransactionAction, Transaction};
 use iceberg::{
@@ -36,6 +33,7 @@ use iceberg_catalog_glue::{
     AWS_ACCESS_KEY_ID, AWS_REGION_NAME, AWS_SECRET_ACCESS_KEY, 
GLUE_CATALOG_PROP_URI,
     GLUE_CATALOG_PROP_WAREHOUSE, GlueCatalog, GlueCatalogBuilder,
 };
+use iceberg_storage_opendal::OpenDalStorageFactory;
 use iceberg_test_utils::{
     cleanup_namespace, get_glue_endpoint, get_minio_endpoint, 
normalize_test_name_with_parts,
     set_up,
diff --git a/crates/catalog/hms/Cargo.toml b/crates/catalog/hms/Cargo.toml
index a6517fb7b..c657394b8 100644
--- a/crates/catalog/hms/Cargo.toml
+++ b/crates/catalog/hms/Cargo.toml
@@ -54,6 +54,7 @@ motore-macros = { workspace = true }
 volo = { workspace = true }
 
 [dev-dependencies]
+iceberg-storage-opendal = { workspace = true, features = ["opendal-s3"] }
 iceberg_test_utils = { path = "../../test_utils", features = ["tests"] }
 
 [package.metadata.cargo-machete]
diff --git a/crates/catalog/hms/tests/hms_catalog_test.rs 
b/crates/catalog/hms/tests/hms_catalog_test.rs
index c5d77ac37..74c9e52e9 100644
--- a/crates/catalog/hms/tests/hms_catalog_test.rs
+++ b/crates/catalog/hms/tests/hms_catalog_test.rs
@@ -23,16 +23,14 @@
 use std::collections::HashMap;
 use std::sync::Arc;
 
-use iceberg::io::{
-    FileIOBuilder, OpenDalStorageFactory, S3_ACCESS_KEY_ID, S3_ENDPOINT, 
S3_REGION,
-    S3_SECRET_ACCESS_KEY,
-};
+use iceberg::io::{FileIOBuilder, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, 
S3_SECRET_ACCESS_KEY};
 use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
 use iceberg::{Catalog, CatalogBuilder, Namespace, NamespaceIdent, 
TableCreation, TableIdent};
 use iceberg_catalog_hms::{
     HMS_CATALOG_PROP_THRIFT_TRANSPORT, HMS_CATALOG_PROP_URI, 
HMS_CATALOG_PROP_WAREHOUSE,
     HmsCatalog, HmsCatalogBuilder, THRIFT_TRANSPORT_BUFFERED,
 };
+use iceberg_storage_opendal::OpenDalStorageFactory;
 use iceberg_test_utils::{
     cleanup_namespace, get_hms_endpoint, get_minio_endpoint, 
normalize_test_name_with_parts, set_up,
 };
diff --git a/crates/catalog/s3tables/Cargo.toml 
b/crates/catalog/s3tables/Cargo.toml
index fde08b9a4..2fe096fec 100644
--- a/crates/catalog/s3tables/Cargo.toml
+++ b/crates/catalog/s3tables/Cargo.toml
@@ -35,6 +35,7 @@ async-trait = { workspace = true }
 aws-config = { workspace = true }
 aws-sdk-s3tables = { workspace = true }
 iceberg = { workspace = true }
+iceberg-storage-opendal = { workspace = true, features = ["opendal-s3"] }
 
 
 [dev-dependencies]
diff --git a/crates/catalog/s3tables/src/catalog.rs 
b/crates/catalog/s3tables/src/catalog.rs
index 58100ccbc..afe28ae45 100644
--- a/crates/catalog/s3tables/src/catalog.rs
+++ b/crates/catalog/s3tables/src/catalog.rs
@@ -26,13 +26,14 @@ use aws_sdk_s3tables::operation::get_table::GetTableOutput;
 use aws_sdk_s3tables::operation::list_tables::ListTablesOutput;
 use 
aws_sdk_s3tables::operation::update_table_metadata_location::UpdateTableMetadataLocationError;
 use aws_sdk_s3tables::types::OpenTableFormat;
-use iceberg::io::{FileIO, FileIOBuilder, OpenDalStorageFactory, 
StorageFactory};
+use iceberg::io::{FileIO, FileIOBuilder, StorageFactory};
 use iceberg::spec::{TableMetadata, TableMetadataBuilder};
 use iceberg::table::Table;
 use iceberg::{
     Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, 
NamespaceIdent, Result,
     TableCommit, TableCreation, TableIdent,
 };
+use iceberg_storage_opendal::OpenDalStorageFactory;
 
 use crate::utils::create_sdk_config;
 
diff --git a/crates/examples/Cargo.toml b/crates/examples/Cargo.toml
index c7874d9a1..cfdd78ee9 100644
--- a/crates/examples/Cargo.toml
+++ b/crates/examples/Cargo.toml
@@ -28,6 +28,7 @@ version = { workspace = true }
 futures = { workspace = true }
 iceberg = { workspace = true }
 iceberg-catalog-rest = { workspace = true }
+iceberg-storage-opendal = { workspace = true, optional = true }
 tokio = { workspace = true, features = ["full"] }
 
 [[example]]
@@ -45,4 +46,4 @@ required-features = ["storage-oss"]
 
 [features]
 default = []
-storage-oss = ["iceberg/storage-oss"]
+storage-oss = ["iceberg-storage-opendal/opendal-oss"]
diff --git a/crates/examples/src/oss_backend.rs 
b/crates/examples/src/oss_backend.rs
index 9835b8dc4..2d56cceb7 100644
--- a/crates/examples/src/oss_backend.rs
+++ b/crates/examples/src/oss_backend.rs
@@ -19,9 +19,9 @@ use std::collections::HashMap;
 use std::sync::Arc;
 
 use futures::stream::StreamExt;
-use iceberg::io::OpenDalStorageFactory;
 use iceberg::{Catalog, CatalogBuilder, NamespaceIdent, TableIdent};
 use iceberg_catalog_rest::{REST_CATALOG_PROP_URI, RestCatalogBuilder};
+use iceberg_storage_opendal::OpenDalStorageFactory;
 
 // Configure these values according to your environment
 
diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml
index d6d931c86..41ee77161 100644
--- a/crates/iceberg/Cargo.toml
+++ b/crates/iceberg/Cargo.toml
@@ -29,15 +29,7 @@ license = { workspace = true }
 repository = { workspace = true }
 
 [features]
-default = ["storage-memory", "storage-fs", "storage-s3"]
-storage-all = ["storage-memory", "storage-fs", "storage-s3", "storage-gcs"]
-
-storage-azdls = ["opendal/services-azdls"]
-storage-fs = ["opendal/services-fs"]
-storage-gcs = ["opendal/services-gcs"]
-storage-memory = ["opendal/services-memory"]
-storage-oss = ["opendal/services-oss"]
-storage-s3 = ["opendal/services-s3", "reqsign"]
+default = []
 
 
 [dependencies]
@@ -68,11 +60,9 @@ itertools = { workspace = true }
 moka = { version = "0.12.10", features = ["future"] }
 murmur3 = { workspace = true }
 once_cell = { workspace = true }
-opendal = { workspace = true }
 ordered-float = { workspace = true }
 parquet = { workspace = true, features = ["async"] }
 rand = { workspace = true }
-reqsign = { version = "0.16.3", optional = true, default-features = false }
 reqwest = { workspace = true }
 roaring = { workspace = true }
 fastnum = { workspace = true }
diff --git a/crates/iceberg/src/catalog/mod.rs 
b/crates/iceberg/src/catalog/mod.rs
index e5014c3e7..8db9674ad 100644
--- a/crates/iceberg/src/catalog/mod.rs
+++ b/crates/iceberg/src/catalog/mod.rs
@@ -129,7 +129,8 @@ pub trait CatalogBuilder: Default + Debug + Send + Sync {
     ///
     /// ```rust,ignore
     /// use iceberg::CatalogBuilder;
-    /// use iceberg::io::{OpenDalStorageFactory, StorageFactory};
+    /// use iceberg::io::StorageFactory;
+    /// use iceberg_storage_opendal::OpenDalStorageFactory;
     /// use std::sync::Arc;
     ///
     /// let catalog = MyCatalogBuilder::default()
diff --git a/crates/iceberg/src/error.rs b/crates/iceberg/src/error.rs
index 8810ef500..a0399a808 100644
--- a/crates/iceberg/src/error.rs
+++ b/crates/iceberg/src/error.rs
@@ -384,12 +384,6 @@ define_from_err!(
     "Failure in conversion with avro"
 );
 
-define_from_err!(
-    opendal::Error,
-    ErrorKind::Unexpected,
-    "Failure in doing io operation"
-);
-
 define_from_err!(
     url::ParseError,
     ErrorKind::DataInvalid,
diff --git a/crates/iceberg/src/io/storage/mod.rs 
b/crates/iceberg/src/io/storage/mod.rs
index 31ceaf243..3c7c555a5 100644
--- a/crates/iceberg/src/io/storage/mod.rs
+++ b/crates/iceberg/src/io/storage/mod.rs
@@ -20,7 +20,6 @@
 mod config;
 mod local_fs;
 mod memory;
-mod opendal;
 
 use std::fmt::Debug;
 use std::sync::Arc;
@@ -30,9 +29,6 @@ use bytes::Bytes;
 pub use config::*;
 pub use local_fs::{LocalFsStorage, LocalFsStorageFactory};
 pub use memory::{MemoryStorage, MemoryStorageFactory};
-#[cfg(feature = "storage-s3")]
-pub use opendal::CustomAwsCredentialLoader;
-pub use opendal::{OpenDalStorage, OpenDalStorageFactory};
 
 use super::{FileMetadata, FileRead, FileWrite, InputFile, OutputFile};
 use crate::Result;
@@ -69,11 +65,6 @@ use crate::Result;
 ///     }
 ///     // ... implement other methods
 /// }
-///
-/// TODO remove below when the trait is integrated with FileIO and Catalog
-/// # NOTE
-/// This trait is under heavy development and is not used anywhere as of now
-/// Please DO NOT implement it
 /// ```
 #[async_trait]
 #[typetag::serde(tag = "type")]
@@ -130,11 +121,6 @@ pub trait Storage: Debug + Send + Sync {
 ///         todo!()
 ///     }
 /// }
-///
-/// TODO remove below when the trait is integrated with FileIO and Catalog
-/// # NOTE
-/// This trait is under heavy development and is not used anywhere as of now
-/// Please DO NOT implement it
 /// ```
 #[typetag::serde(tag = "type")]
 pub trait StorageFactory: Debug + Send + Sync {
diff --git a/crates/integration_tests/Cargo.toml 
b/crates/integration_tests/Cargo.toml
index 2ed211769..774ef13fc 100644
--- a/crates/integration_tests/Cargo.toml
+++ b/crates/integration_tests/Cargo.toml
@@ -30,6 +30,7 @@ arrow-schema = { workspace = true }
 futures = { workspace = true }
 iceberg = { workspace = true }
 iceberg-catalog-rest = { workspace = true }
+iceberg-storage-opendal = { workspace = true, features = ["opendal-s3"] }
 iceberg_test_utils = { path = "../test_utils", features = ["tests"] }
 parquet = { workspace = true }
 tokio = { workspace = true }
diff --git a/crates/integration_tests/tests/common/mod.rs 
b/crates/integration_tests/tests/common/mod.rs
index 957c8be5c..e49a57465 100644
--- a/crates/integration_tests/tests/common/mod.rs
+++ b/crates/integration_tests/tests/common/mod.rs
@@ -18,11 +18,11 @@
 use std::collections::HashMap;
 use std::sync::Arc;
 
-use iceberg::io::OpenDalStorageFactory;
 use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
 use iceberg::{Catalog, CatalogBuilder, Namespace, NamespaceIdent};
 use iceberg_catalog_rest::RestCatalogBuilder;
 use iceberg_integration_tests::get_test_fixture;
+use iceberg_storage_opendal::OpenDalStorageFactory;
 
 pub async fn random_ns() -> Namespace {
     let fixture = get_test_fixture();
diff --git a/crates/integration_tests/tests/conflict_commit_test.rs 
b/crates/integration_tests/tests/conflict_commit_test.rs
index dc3030519..3b1362b95 100644
--- a/crates/integration_tests/tests/conflict_commit_test.rs
+++ b/crates/integration_tests/tests/conflict_commit_test.rs
@@ -24,7 +24,6 @@ use std::sync::Arc;
 use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, 
StringArray};
 use common::{random_ns, test_schema};
 use futures::TryStreamExt;
-use iceberg::io::OpenDalStorageFactory;
 use iceberg::transaction::{ApplyTransactionAction, Transaction};
 use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
 use iceberg::writer::file_writer::ParquetWriterBuilder;
@@ -36,6 +35,7 @@ use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
 use iceberg::{Catalog, CatalogBuilder, TableCreation};
 use iceberg_catalog_rest::RestCatalogBuilder;
 use iceberg_integration_tests::get_test_fixture;
+use iceberg_storage_opendal::OpenDalStorageFactory;
 use parquet::file::properties::WriterProperties;
 
 #[tokio::test]
diff --git a/crates/integration_tests/tests/read_evolved_schema.rs 
b/crates/integration_tests/tests/read_evolved_schema.rs
index 78d833b3a..ae25a0898 100644
--- a/crates/integration_tests/tests/read_evolved_schema.rs
+++ b/crates/integration_tests/tests/read_evolved_schema.rs
@@ -22,11 +22,11 @@ use std::sync::Arc;
 use arrow_array::{Decimal128Array, Float64Array, Int64Array, StringArray};
 use futures::TryStreamExt;
 use iceberg::expr::Reference;
-use iceberg::io::OpenDalStorageFactory;
 use iceberg::spec::Datum;
 use iceberg::{Catalog, CatalogBuilder, TableIdent};
 use iceberg_catalog_rest::RestCatalogBuilder;
 use iceberg_integration_tests::get_test_fixture;
+use iceberg_storage_opendal::OpenDalStorageFactory;
 use ordered_float::OrderedFloat;
 
 #[tokio::test]
diff --git a/crates/integration_tests/tests/read_positional_deletes.rs 
b/crates/integration_tests/tests/read_positional_deletes.rs
index 71ff128d1..d4c4afeaf 100644
--- a/crates/integration_tests/tests/read_positional_deletes.rs
+++ b/crates/integration_tests/tests/read_positional_deletes.rs
@@ -20,10 +20,10 @@
 use std::sync::Arc;
 
 use futures::TryStreamExt;
-use iceberg::io::OpenDalStorageFactory;
 use iceberg::{Catalog, CatalogBuilder, TableIdent};
 use iceberg_catalog_rest::RestCatalogBuilder;
 use iceberg_integration_tests::get_test_fixture;
+use iceberg_storage_opendal::OpenDalStorageFactory;
 
 #[tokio::test]
 async fn test_read_table_with_positional_deletes() {
diff --git a/crates/catalog/s3tables/Cargo.toml 
b/crates/storage/opendal/Cargo.toml
similarity index 52%
copy from crates/catalog/s3tables/Cargo.toml
copy to crates/storage/opendal/Cargo.toml
index fde08b9a4..e0a3cf8ed 100644
--- a/crates/catalog/s3tables/Cargo.toml
+++ b/crates/storage/opendal/Cargo.toml
@@ -16,28 +16,43 @@
 # under the License.
 
 [package]
+name = "iceberg-storage-opendal"
 edition = { workspace = true }
-homepage = { workspace = true }
-name = "iceberg-catalog-s3tables"
-rust-version = { workspace = true }
 version = { workspace = true }
-readme = "README.md"
-
-categories = ["database"]
-description = "Apache Iceberg Rust S3Tables Catalog"
-keywords = ["iceberg", "sql", "catalog"]
 license = { workspace = true }
 repository = { workspace = true }
 
+categories = ["database"]
+description = "Apache Iceberg OpenDAL storage implementation"
+keywords = ["iceberg", "opendal", "storage"]
+
+[features]
+default = ["opendal-memory", "opendal-fs", "opendal-s3"]
+opendal-all = ["opendal-memory", "opendal-fs", "opendal-s3", "opendal-gcs", 
"opendal-oss", "opendal-azdls"]
+
+opendal-azdls = ["opendal/services-azdls"]
+opendal-fs = ["opendal/services-fs"]
+opendal-gcs = ["opendal/services-gcs"]
+opendal-memory = ["opendal/services-memory"]
+opendal-oss = ["opendal/services-oss"]
+opendal-s3 = ["opendal/services-s3", "reqsign"]
+
 [dependencies]
 anyhow = { workspace = true }
-async-trait = { workspace = true }
-aws-config = { workspace = true }
-aws-sdk-s3tables = { workspace = true }
+cfg-if = { workspace = true }
 iceberg = { workspace = true }
-
+opendal = { workspace = true }
+async-trait = { workspace = true }
+bytes = { workspace = true }
+reqsign = { version = "0.16.3", optional = true, default-features = false }
+reqwest = { workspace = true }
+serde = { workspace = true }
+typetag = { workspace = true }
+url = { workspace = true }
 
 [dev-dependencies]
+async-trait = { workspace = true }
 iceberg_test_utils = { path = "../../test_utils", features = ["tests"] }
-itertools = { workspace = true }
-tokio = { workspace = true }
+reqsign = { version = "0.16.3", default-features = false }
+reqwest = { workspace = true }
+tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
diff --git a/crates/iceberg/src/io/storage/opendal/azdls.rs 
b/crates/storage/opendal/src/azdls.rs
similarity index 97%
rename from crates/iceberg/src/io/storage/opendal/azdls.rs
rename to crates/storage/opendal/src/azdls.rs
index 759ff301e..70caae7c4 100644
--- a/crates/iceberg/src/io/storage/opendal/azdls.rs
+++ b/crates/storage/opendal/src/azdls.rs
@@ -19,16 +19,28 @@ use std::collections::HashMap;
 use std::fmt::Display;
 use std::str::FromStr;
 
+use iceberg::io::{
+    ADLS_ACCOUNT_KEY, ADLS_ACCOUNT_NAME, ADLS_AUTHORITY_HOST, ADLS_CLIENT_ID, 
ADLS_CLIENT_SECRET,
+    ADLS_CONNECTION_STRING, ADLS_SAS_TOKEN, ADLS_TENANT_ID,
+};
+use iceberg::{Error, ErrorKind, Result};
 use opendal::Configurator;
 use opendal::services::AzdlsConfig;
 use serde::{Deserialize, Serialize};
 use url::Url;
 
-use crate::io::{
-    ADLS_ACCOUNT_KEY, ADLS_ACCOUNT_NAME, ADLS_AUTHORITY_HOST, ADLS_CLIENT_ID, 
ADLS_CLIENT_SECRET,
-    ADLS_CONNECTION_STRING, ADLS_SAS_TOKEN, ADLS_TENANT_ID,
-};
-use crate::{Error, ErrorKind, Result, ensure_data_valid};
+use crate::utils::from_opendal_error;
+
+/// Local version of `ensure_data_valid` macro since the iceberg crate's macro
+/// uses `$crate::error::Error` paths that don't resolve from external crates
+/// (the `error` module is private).
+macro_rules! ensure_data_valid {
+    ($cond:expr, $fmt:literal, $($arg:tt)*) => {
+        if !$cond {
+            return Err(Error::new(ErrorKind::DataInvalid, format!($fmt, 
$($arg)*)));
+        }
+    };
+}
 
 /// Parses adls.* prefixed configuration properties.
 pub(crate) fn azdls_config_parse(mut properties: HashMap<String, String>) -> 
Result<AzdlsConfig> {
@@ -201,7 +213,9 @@ fn azdls_config_build(config: &AzdlsConfig, path: 
&AzureStoragePath) -> Result<o
     }
     builder = builder.filesystem(&path.filesystem);
 
-    Ok(opendal::Operator::new(builder)?.finish())
+    Ok(opendal::Operator::new(builder)
+        .map_err(from_opendal_error)?
+        .finish())
 }
 
 /// Represents a fully qualified path to blob/ file in Azure Storage.
diff --git a/crates/iceberg/src/io/storage/opendal/fs.rs 
b/crates/storage/opendal/src/fs.rs
similarity index 87%
copy from crates/iceberg/src/io/storage/opendal/fs.rs
copy to crates/storage/opendal/src/fs.rs
index d3e121a08..bfce06c7a 100644
--- a/crates/iceberg/src/io/storage/opendal/fs.rs
+++ b/crates/storage/opendal/src/fs.rs
@@ -15,15 +15,18 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use iceberg::Result;
 use opendal::Operator;
 use opendal::services::FsConfig;
 
-use crate::Result;
+use crate::utils::from_opendal_error;
 
 /// Build new opendal operator from give path.
 pub(crate) fn fs_config_build() -> Result<Operator> {
     let mut cfg = FsConfig::default();
     cfg.root = Some("/".to_string());
 
-    Ok(Operator::from_config(cfg)?.finish())
+    Ok(Operator::from_config(cfg)
+        .map_err(from_opendal_error)?
+        .finish())
 }
diff --git a/crates/iceberg/src/io/storage/opendal/gcs.rs 
b/crates/storage/opendal/src/gcs.rs
similarity index 91%
rename from crates/iceberg/src/io/storage/opendal/gcs.rs
rename to crates/storage/opendal/src/gcs.rs
index 4cb8aa859..7d4738c90 100644
--- a/crates/iceberg/src/io/storage/opendal/gcs.rs
+++ b/crates/storage/opendal/src/gcs.rs
@@ -18,15 +18,16 @@
 
 use std::collections::HashMap;
 
+use iceberg::io::{
+    GCS_ALLOW_ANONYMOUS, GCS_CREDENTIALS_JSON, GCS_DISABLE_CONFIG_LOAD, 
GCS_DISABLE_VM_METADATA,
+    GCS_NO_AUTH, GCS_SERVICE_PATH, GCS_TOKEN,
+};
+use iceberg::{Error, ErrorKind, Result};
 use opendal::Operator;
 use opendal::services::GcsConfig;
 use url::Url;
 
-use crate::io::{
-    GCS_ALLOW_ANONYMOUS, GCS_CREDENTIALS_JSON, GCS_DISABLE_CONFIG_LOAD, 
GCS_DISABLE_VM_METADATA,
-    GCS_NO_AUTH, GCS_SERVICE_PATH, GCS_TOKEN, is_truthy,
-};
-use crate::{Error, ErrorKind, Result};
+use crate::utils::{from_opendal_error, is_truthy};
 
 /// Parse iceberg properties to [`GcsConfig`].
 pub(crate) fn gcs_config_parse(mut m: HashMap<String, String>) -> 
Result<GcsConfig> {
@@ -81,5 +82,7 @@ pub(crate) fn gcs_config_build(cfg: &GcsConfig, path: &str) 
-> Result<Operator>
 
     let mut cfg = cfg.clone();
     cfg.bucket = bucket.to_string();
-    Ok(Operator::from_config(cfg)?.finish())
+    Ok(Operator::from_config(cfg)
+        .map_err(from_opendal_error)?
+        .finish())
 }
diff --git a/crates/iceberg/src/io/storage/opendal/mod.rs 
b/crates/storage/opendal/src/lib.rs
similarity index 70%
rename from crates/iceberg/src/io/storage/opendal/mod.rs
rename to crates/storage/opendal/src/lib.rs
index 52d00687e..1e5043aca 100644
--- a/crates/iceberg/src/io/storage/opendal/mod.rs
+++ b/crates/storage/opendal/src/lib.rs
@@ -15,60 +15,76 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! OpenDAL-based storage implementation.
+//! OpenDAL-based storage implementation for Apache Iceberg.
+//!
+//! This crate provides [`OpenDalStorage`] and [`OpenDalStorageFactory`],
+//! which implement the [`Storage`](iceberg::io::Storage) and
+//! [`StorageFactory`](iceberg::io::StorageFactory) traits from the `iceberg` 
crate
+//! using [OpenDAL](https://opendal.apache.org/) as the backend.
+
+mod utils;
 
-use std::ops::Range;
 use std::sync::Arc;
 
 use async_trait::async_trait;
-#[cfg(feature = "storage-azdls")]
-use azdls::AzureStorageScheme;
 use bytes::Bytes;
+use cfg_if::cfg_if;
+use iceberg::io::{
+    FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage, 
StorageConfig,
+    StorageFactory,
+};
+use iceberg::{Error, ErrorKind, Result};
 use opendal::Operator;
 use opendal::layers::RetryLayer;
-#[cfg(feature = "storage-azdls")]
-use opendal::services::AzdlsConfig;
-#[cfg(feature = "storage-gcs")]
-use opendal::services::GcsConfig;
-#[cfg(feature = "storage-oss")]
-use opendal::services::OssConfig;
-#[cfg(feature = "storage-s3")]
-use opendal::services::S3Config;
-#[cfg(feature = "storage-s3")]
-pub use s3::CustomAwsCredentialLoader;
 use serde::{Deserialize, Serialize};
+use utils::from_opendal_error;
 
-use crate::io::{
-    FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage, 
StorageConfig,
-    StorageFactory,
-};
-use crate::{Error, ErrorKind, Result};
-
-#[cfg(feature = "storage-azdls")]
-mod azdls;
-#[cfg(feature = "storage-fs")]
-mod fs;
-#[cfg(feature = "storage-gcs")]
-mod gcs;
-#[cfg(feature = "storage-memory")]
-mod memory;
-#[cfg(feature = "storage-oss")]
-mod oss;
-#[cfg(feature = "storage-s3")]
-mod s3;
-
-#[cfg(feature = "storage-azdls")]
-use azdls::*;
-#[cfg(feature = "storage-fs")]
-use fs::*;
-#[cfg(feature = "storage-gcs")]
-use gcs::*;
-#[cfg(feature = "storage-memory")]
-use memory::*;
-#[cfg(feature = "storage-oss")]
-use oss::*;
-#[cfg(feature = "storage-s3")]
-pub use s3::*;
+cfg_if! {
+    if #[cfg(feature = "opendal-azdls")] {
+        mod azdls;
+        use azdls::AzureStorageScheme;
+        use azdls::*;
+        use opendal::services::AzdlsConfig;
+    }
+}
+
+cfg_if! {
+    if #[cfg(feature = "opendal-fs")] {
+        mod fs;
+        use fs::*;
+    }
+}
+
+cfg_if! {
+    if #[cfg(feature = "opendal-gcs")] {
+        mod gcs;
+        use gcs::*;
+        use opendal::services::GcsConfig;
+    }
+}
+
+cfg_if! {
+    if #[cfg(feature = "opendal-memory")] {
+        mod memory;
+        use memory::*;
+    }
+}
+
+cfg_if! {
+    if #[cfg(feature = "opendal-oss")] {
+        mod oss;
+        use opendal::services::OssConfig;
+        use oss::*;
+    }
+}
+
+cfg_if! {
+    if #[cfg(feature = "opendal-s3")] {
+        mod s3;
+        use opendal::services::S3Config;
+        pub use s3::*;
+    }
+}
 
 /// OpenDAL-based storage factory.
 ///
@@ -77,29 +93,29 @@ pub use s3::*;
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub enum OpenDalStorageFactory {
     /// Memory storage factory.
-    #[cfg(feature = "storage-memory")]
+    #[cfg(feature = "opendal-memory")]
     Memory,
     /// Local filesystem storage factory.
-    #[cfg(feature = "storage-fs")]
+    #[cfg(feature = "opendal-fs")]
     Fs,
     /// S3 storage factory.
-    #[cfg(feature = "storage-s3")]
+    #[cfg(feature = "opendal-s3")]
     S3 {
         /// s3 storage could have `s3://` and `s3a://`.
         /// Storing the scheme string here to return the correct path.
         configured_scheme: String,
         /// Custom AWS credential loader.
         #[serde(skip)]
-        customized_credential_load: Option<CustomAwsCredentialLoader>,
+        customized_credential_load: Option<s3::CustomAwsCredentialLoader>,
     },
     /// GCS storage factory.
-    #[cfg(feature = "storage-gcs")]
+    #[cfg(feature = "opendal-gcs")]
     Gcs,
     /// OSS storage factory.
-    #[cfg(feature = "storage-oss")]
+    #[cfg(feature = "opendal-oss")]
     Oss,
     /// Azure Data Lake Storage factory.
-    #[cfg(feature = "storage-azdls")]
+    #[cfg(feature = "opendal-azdls")]
     Azdls {
         /// The configured Azure storage scheme.
         configured_scheme: AzureStorageScheme,
@@ -111,13 +127,13 @@ impl StorageFactory for OpenDalStorageFactory {
     #[allow(unused_variables)]
     fn build(&self, config: &StorageConfig) -> Result<Arc<dyn Storage>> {
         match self {
-            #[cfg(feature = "storage-memory")]
+            #[cfg(feature = "opendal-memory")]
             OpenDalStorageFactory::Memory => {
                 Ok(Arc::new(OpenDalStorage::Memory(memory_config_build()?)))
             }
-            #[cfg(feature = "storage-fs")]
+            #[cfg(feature = "opendal-fs")]
             OpenDalStorageFactory::Fs => Ok(Arc::new(OpenDalStorage::LocalFs)),
-            #[cfg(feature = "storage-s3")]
+            #[cfg(feature = "opendal-s3")]
             OpenDalStorageFactory::S3 {
                 configured_scheme,
                 customized_credential_load,
@@ -126,15 +142,15 @@ impl StorageFactory for OpenDalStorageFactory {
                 config: s3_config_parse(config.props().clone())?.into(),
                 customized_credential_load: customized_credential_load.clone(),
             })),
-            #[cfg(feature = "storage-gcs")]
+            #[cfg(feature = "opendal-gcs")]
             OpenDalStorageFactory::Gcs => Ok(Arc::new(OpenDalStorage::Gcs {
                 config: gcs_config_parse(config.props().clone())?.into(),
             })),
-            #[cfg(feature = "storage-oss")]
+            #[cfg(feature = "opendal-oss")]
             OpenDalStorageFactory::Oss => Ok(Arc::new(OpenDalStorage::Oss {
                 config: oss_config_parse(config.props().clone())?.into(),
             })),
-            #[cfg(feature = "storage-azdls")]
+            #[cfg(feature = "opendal-azdls")]
             OpenDalStorageFactory::Azdls { configured_scheme } => {
                 Ok(Arc::new(OpenDalStorage::Azdls {
                     configured_scheme: configured_scheme.clone(),
@@ -142,12 +158,12 @@ impl StorageFactory for OpenDalStorageFactory {
                 }))
             }
             #[cfg(all(
-                not(feature = "storage-memory"),
-                not(feature = "storage-fs"),
-                not(feature = "storage-s3"),
-                not(feature = "storage-gcs"),
-                not(feature = "storage-oss"),
-                not(feature = "storage-azdls"),
+                not(feature = "opendal-memory"),
+                not(feature = "opendal-fs"),
+                not(feature = "opendal-s3"),
+                not(feature = "opendal-gcs"),
+                not(feature = "opendal-oss"),
+                not(feature = "opendal-azdls"),
             ))]
             _ => Err(Error::new(
                 ErrorKind::FeatureUnsupported,
@@ -158,7 +174,7 @@ impl StorageFactory for OpenDalStorageFactory {
 }
 
 /// Default memory operator for serde deserialization.
-#[cfg(feature = "storage-memory")]
+#[cfg(feature = "opendal-memory")]
 fn default_memory_operator() -> Operator {
     memory_config_build().expect("Failed to create default memory operator")
 }
@@ -167,13 +183,13 @@ fn default_memory_operator() -> Operator {
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub enum OpenDalStorage {
     /// Memory storage variant.
-    #[cfg(feature = "storage-memory")]
+    #[cfg(feature = "opendal-memory")]
     Memory(#[serde(skip, default = "self::default_memory_operator")] Operator),
     /// Local filesystem storage variant.
-    #[cfg(feature = "storage-fs")]
+    #[cfg(feature = "opendal-fs")]
     LocalFs,
     /// S3 storage variant.
-    #[cfg(feature = "storage-s3")]
+    #[cfg(feature = "opendal-s3")]
     S3 {
         /// s3 storage could have `s3://` and `s3a://`.
         /// Storing the scheme string here to return the correct path.
@@ -182,16 +198,16 @@ pub enum OpenDalStorage {
         config: Arc<S3Config>,
         /// Custom AWS credential loader.
         #[serde(skip)]
-        customized_credential_load: Option<CustomAwsCredentialLoader>,
+        customized_credential_load: Option<s3::CustomAwsCredentialLoader>,
     },
     /// GCS storage variant.
-    #[cfg(feature = "storage-gcs")]
+    #[cfg(feature = "opendal-gcs")]
     Gcs {
         /// GCS configuration.
         config: Arc<GcsConfig>,
     },
     /// OSS storage variant.
-    #[cfg(feature = "storage-oss")]
+    #[cfg(feature = "opendal-oss")]
     Oss {
         /// OSS configuration.
         config: Arc<OssConfig>,
@@ -200,7 +216,7 @@ pub enum OpenDalStorage {
     /// Expects paths of the form
     /// `abfs[s]://<filesystem>@<account>.dfs.<endpoint-suffix>/<path>` or
     /// `wasb[s]://<container>@<account>.blob.<endpoint-suffix>/<path>`.
-    #[cfg(feature = "storage-azdls")]
+    #[cfg(feature = "opendal-azdls")]
     #[allow(private_interfaces)]
     Azdls {
         /// The configured Azure storage scheme.
@@ -217,7 +233,7 @@ impl OpenDalStorage {
     ///
     /// # Arguments
     ///
-    /// * path: It should be *absolute* path starting with scheme string used 
to construct [`FileIO`].
+    /// * path: It should be *absolute* path starting with scheme string used 
to construct [`FileIO`](iceberg::io::FileIO).
     ///
     /// # Returns
     ///
@@ -232,7 +248,7 @@ impl OpenDalStorage {
     ) -> Result<(Operator, &'a str)> {
         let path = path.as_ref();
         let (operator, relative_path): (Operator, &str) = match self {
-            #[cfg(feature = "storage-memory")]
+            #[cfg(feature = "opendal-memory")]
             OpenDalStorage::Memory(op) => {
                 if let Some(stripped) = path.strip_prefix("memory:/") {
                     (op.clone(), stripped)
@@ -240,7 +256,7 @@ impl OpenDalStorage {
                     (op.clone(), &path[1..])
                 }
             }
-            #[cfg(feature = "storage-fs")]
+            #[cfg(feature = "opendal-fs")]
             OpenDalStorage::LocalFs => {
                 let op = fs_config_build()?;
                 if let Some(stripped) = path.strip_prefix("file:/") {
@@ -249,7 +265,7 @@ impl OpenDalStorage {
                     (op, &path[1..])
                 }
             }
-            #[cfg(feature = "storage-s3")]
+            #[cfg(feature = "opendal-s3")]
             OpenDalStorage::S3 {
                 configured_scheme,
                 config,
@@ -269,7 +285,7 @@ impl OpenDalStorage {
                     ));
                 }
             }
-            #[cfg(feature = "storage-gcs")]
+            #[cfg(feature = "opendal-gcs")]
             OpenDalStorage::Gcs { config } => {
                 let operator = gcs_config_build(config, path)?;
                 let prefix = format!("gs://{}/", operator.info().name());
@@ -282,7 +298,7 @@ impl OpenDalStorage {
                     ));
                 }
             }
-            #[cfg(feature = "storage-oss")]
+            #[cfg(feature = "opendal-oss")]
             OpenDalStorage::Oss { config } => {
                 let op = oss_config_build(config, path)?;
                 let prefix = format!("oss://{}/", op.info().name());
@@ -295,17 +311,17 @@ impl OpenDalStorage {
                     ));
                 }
             }
-            #[cfg(feature = "storage-azdls")]
+            #[cfg(feature = "opendal-azdls")]
             OpenDalStorage::Azdls {
                 configured_scheme,
                 config,
             } => azdls_create_operator(path, config, configured_scheme)?,
             #[cfg(all(
-                not(feature = "storage-s3"),
-                not(feature = "storage-fs"),
-                not(feature = "storage-gcs"),
-                not(feature = "storage-oss"),
-                not(feature = "storage-azdls"),
+                not(feature = "opendal-s3"),
+                not(feature = "opendal-fs"),
+                not(feature = "opendal-gcs"),
+                not(feature = "opendal-oss"),
+                not(feature = "opendal-azdls"),
             ))]
             _ => {
                 return Err(Error::new(
@@ -327,12 +343,12 @@ impl OpenDalStorage {
 impl Storage for OpenDalStorage {
     async fn exists(&self, path: &str) -> Result<bool> {
         let (op, relative_path) = self.create_operator(&path)?;
-        Ok(op.exists(relative_path).await?)
+        Ok(op.exists(relative_path).await.map_err(from_opendal_error)?)
     }
 
     async fn metadata(&self, path: &str) -> Result<FileMetadata> {
         let (op, relative_path) = self.create_operator(&path)?;
-        let meta = op.stat(relative_path).await?;
+        let meta = op.stat(relative_path).await.map_err(from_opendal_error)?;
         Ok(FileMetadata {
             size: meta.content_length(),
         })
@@ -340,28 +356,38 @@ impl Storage for OpenDalStorage {
 
     async fn read(&self, path: &str) -> Result<Bytes> {
         let (op, relative_path) = self.create_operator(&path)?;
-        Ok(op.read(relative_path).await?.to_bytes())
+        Ok(op
+            .read(relative_path)
+            .await
+            .map_err(from_opendal_error)?
+            .to_bytes())
     }
 
     async fn reader(&self, path: &str) -> Result<Box<dyn FileRead>> {
         let (op, relative_path) = self.create_operator(&path)?;
-        Ok(Box::new(op.reader(relative_path).await?))
+        Ok(Box::new(OpenDalReader(
+            op.reader(relative_path).await.map_err(from_opendal_error)?,
+        )))
     }
 
     async fn write(&self, path: &str, bs: Bytes) -> Result<()> {
         let (op, relative_path) = self.create_operator(&path)?;
-        op.write(relative_path, bs).await?;
+        op.write(relative_path, bs)
+            .await
+            .map_err(from_opendal_error)?;
         Ok(())
     }
 
     async fn writer(&self, path: &str) -> Result<Box<dyn FileWrite>> {
         let (op, relative_path) = self.create_operator(&path)?;
-        Ok(Box::new(op.writer(relative_path).await?))
+        Ok(Box::new(OpenDalWriter(
+            op.writer(relative_path).await.map_err(from_opendal_error)?,
+        )))
     }
 
     async fn delete(&self, path: &str) -> Result<()> {
         let (op, relative_path) = self.create_operator(&path)?;
-        Ok(op.delete(relative_path).await?)
+        Ok(op.delete(relative_path).await.map_err(from_opendal_error)?)
     }
 
     async fn delete_prefix(&self, path: &str) -> Result<()> {
@@ -371,7 +397,7 @@ impl Storage for OpenDalStorage {
         } else {
             format!("{relative_path}/")
         };
-        Ok(op.remove_all(&path).await?)
+        Ok(op.remove_all(&path).await.map_err(from_opendal_error)?)
     }
 
     #[allow(unreachable_code, unused_variables)]
@@ -385,23 +411,38 @@ impl Storage for OpenDalStorage {
     }
 }
 
-// OpenDAL implementations for FileRead and FileWrite traits
+// Newtype wrappers for opendal types to satisfy orphan rules.
+// We can't implement iceberg's FileRead/FileWrite traits directly on opendal's
+// Reader/Writer since neither trait nor type is defined in this crate.
+
+/// Wrapper around `opendal::Reader` that implements `FileRead`.
+pub(crate) struct OpenDalReader(pub(crate) opendal::Reader);
 
 #[async_trait]
-impl FileRead for opendal::Reader {
-    async fn read(&self, range: Range<u64>) -> Result<Bytes> {
-        Ok(opendal::Reader::read(self, range).await?.to_bytes())
+impl FileRead for OpenDalReader {
+    async fn read(&self, range: std::ops::Range<u64>) -> Result<Bytes> {
+        Ok(opendal::Reader::read(&self.0, range)
+            .await
+            .map_err(from_opendal_error)?
+            .to_bytes())
     }
 }
 
+/// Wrapper around `opendal::Writer` that implements `FileWrite`.
+pub(crate) struct OpenDalWriter(pub(crate) opendal::Writer);
+
 #[async_trait]
-impl FileWrite for opendal::Writer {
+impl FileWrite for OpenDalWriter {
     async fn write(&mut self, bs: Bytes) -> Result<()> {
-        Ok(opendal::Writer::write(self, bs).await?)
+        Ok(opendal::Writer::write(&mut self.0, bs)
+            .await
+            .map_err(from_opendal_error)?)
     }
 
     async fn close(&mut self) -> Result<()> {
-        let _ = opendal::Writer::close(self).await?;
+        let _ = opendal::Writer::close(&mut self.0)
+            .await
+            .map_err(from_opendal_error)?;
         Ok(())
     }
 }
@@ -410,7 +451,7 @@ impl FileWrite for opendal::Writer {
 mod tests {
     use super::*;
 
-    #[cfg(feature = "storage-memory")]
+    #[cfg(feature = "opendal-memory")]
     #[test]
     fn test_default_memory_operator() {
         let op = default_memory_operator();
diff --git a/crates/iceberg/src/io/storage/opendal/memory.rs 
b/crates/storage/opendal/src/memory.rs
similarity index 84%
rename from crates/iceberg/src/io/storage/opendal/memory.rs
rename to crates/storage/opendal/src/memory.rs
index b8023717b..5957d0e84 100644
--- a/crates/iceberg/src/io/storage/opendal/memory.rs
+++ b/crates/storage/opendal/src/memory.rs
@@ -15,11 +15,14 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use iceberg::Result;
 use opendal::Operator;
 use opendal::services::MemoryConfig;
 
-use crate::Result;
+use crate::utils::from_opendal_error;
 
 pub(crate) fn memory_config_build() -> Result<Operator> {
-    Ok(Operator::from_config(MemoryConfig::default())?.finish())
+    Ok(Operator::from_config(MemoryConfig::default())
+        .map_err(from_opendal_error)?
+        .finish())
 }
diff --git a/crates/iceberg/src/io/storage/opendal/oss.rs 
b/crates/storage/opendal/src/oss.rs
similarity index 89%
rename from crates/iceberg/src/io/storage/opendal/oss.rs
rename to crates/storage/opendal/src/oss.rs
index 98b68f7cd..add8b7a0f 100644
--- a/crates/iceberg/src/io/storage/opendal/oss.rs
+++ b/crates/storage/opendal/src/oss.rs
@@ -17,12 +17,13 @@
 
 use std::collections::HashMap;
 
+use iceberg::io::{OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET, OSS_ENDPOINT};
+use iceberg::{Error, ErrorKind, Result};
 use opendal::services::OssConfig;
 use opendal::{Configurator, Operator};
 use url::Url;
 
-use crate::io::{OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET, OSS_ENDPOINT};
-use crate::{Error, ErrorKind, Result};
+use crate::utils::from_opendal_error;
 
 /// Parse iceberg props to oss config.
 pub(crate) fn oss_config_parse(mut m: HashMap<String, String>) -> 
Result<OssConfig> {
@@ -52,5 +53,5 @@ pub(crate) fn oss_config_build(cfg: &OssConfig, path: &str) 
-> Result<Operator>
 
     let builder = cfg.clone().into_builder().bucket(bucket);
 
-    Ok(Operator::new(builder)?.finish())
+    Ok(Operator::new(builder).map_err(from_opendal_error)?.finish())
 }
diff --git a/crates/iceberg/src/io/storage/opendal/s3.rs 
b/crates/storage/opendal/src/s3.rs
similarity index 96%
rename from crates/iceberg/src/io/storage/opendal/s3.rs
rename to crates/storage/opendal/src/s3.rs
index 6b4d64e3a..7db88d273 100644
--- a/crates/iceberg/src/io/storage/opendal/s3.rs
+++ b/crates/storage/opendal/src/s3.rs
@@ -19,19 +19,20 @@ use std::collections::HashMap;
 use std::sync::Arc;
 
 use async_trait::async_trait;
+use iceberg::io::{
+    CLIENT_REGION, S3_ACCESS_KEY_ID, S3_ALLOW_ANONYMOUS, S3_ASSUME_ROLE_ARN,
+    S3_ASSUME_ROLE_EXTERNAL_ID, S3_ASSUME_ROLE_SESSION_NAME, 
S3_DISABLE_CONFIG_LOAD,
+    S3_DISABLE_EC2_METADATA, S3_ENDPOINT, S3_PATH_STYLE_ACCESS, S3_REGION, 
S3_SECRET_ACCESS_KEY,
+    S3_SESSION_TOKEN, S3_SSE_KEY, S3_SSE_MD5, S3_SSE_TYPE,
+};
+use iceberg::{Error, ErrorKind, Result};
 use opendal::services::S3Config;
 use opendal::{Configurator, Operator};
 pub use reqsign::{AwsCredential, AwsCredentialLoad};
 use reqwest::Client;
 use url::Url;
 
-use crate::io::{
-    CLIENT_REGION, S3_ACCESS_KEY_ID, S3_ALLOW_ANONYMOUS, S3_ASSUME_ROLE_ARN,
-    S3_ASSUME_ROLE_EXTERNAL_ID, S3_ASSUME_ROLE_SESSION_NAME, 
S3_DISABLE_CONFIG_LOAD,
-    S3_DISABLE_EC2_METADATA, S3_ENDPOINT, S3_PATH_STYLE_ACCESS, S3_REGION, 
S3_SECRET_ACCESS_KEY,
-    S3_SESSION_TOKEN, S3_SSE_KEY, S3_SSE_MD5, S3_SSE_TYPE, is_truthy,
-};
-use crate::{Error, ErrorKind, Result};
+use crate::utils::{from_opendal_error, is_truthy};
 
 /// Parse iceberg props to s3 config.
 pub(crate) fn s3_config_parse(mut m: HashMap<String, String>) -> 
Result<S3Config> {
@@ -141,7 +142,7 @@ pub(crate) fn s3_config_build(
             
.customized_credential_load(customized_credential_load.clone().into_opendal_loader());
     }
 
-    Ok(Operator::new(builder)?.finish())
+    Ok(Operator::new(builder).map_err(from_opendal_error)?.finish())
 }
 
 /// Custom AWS credential loader.
diff --git a/crates/iceberg/src/io/storage/opendal/fs.rs 
b/crates/storage/opendal/src/utils.rs
similarity index 68%
rename from crates/iceberg/src/io/storage/opendal/fs.rs
rename to crates/storage/opendal/src/utils.rs
index d3e121a08..56f8c1805 100644
--- a/crates/iceberg/src/io/storage/opendal/fs.rs
+++ b/crates/storage/opendal/src/utils.rs
@@ -15,15 +15,15 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use opendal::Operator;
-use opendal::services::FsConfig;
-
-use crate::Result;
-
-/// Build new opendal operator from give path.
-pub(crate) fn fs_config_build() -> Result<Operator> {
-    let mut cfg = FsConfig::default();
-    cfg.root = Some("/".to_string());
+pub(crate) fn is_truthy(value: &str) -> bool {
+    ["true", "t", "1", "on"].contains(&value.to_lowercase().as_str())
+}
 
-    Ok(Operator::from_config(cfg)?.finish())
+/// Convert an opendal error into an iceberg error.
+pub(crate) fn from_opendal_error(e: opendal::Error) -> iceberg::Error {
+    iceberg::Error::new(
+        iceberg::ErrorKind::Unexpected,
+        "Failure in doing io operation",
+    )
+    .with_source(e)
 }
diff --git a/crates/iceberg/tests/file_io_gcs_test.rs 
b/crates/storage/opendal/tests/file_io_gcs_test.rs
similarity index 95%
rename from crates/iceberg/tests/file_io_gcs_test.rs
rename to crates/storage/opendal/tests/file_io_gcs_test.rs
index 75bc9fae1..5e0449113 100644
--- a/crates/iceberg/tests/file_io_gcs_test.rs
+++ b/crates/storage/opendal/tests/file_io_gcs_test.rs
@@ -19,15 +19,14 @@
 //!
 //! These tests assume Docker containers are started externally via `make 
docker-up`.
 
-#[cfg(all(test, feature = "storage-gcs"))]
+#[cfg(feature = "opendal-gcs")]
 mod tests {
     use std::collections::HashMap;
     use std::sync::Arc;
 
     use bytes::Bytes;
-    use iceberg::io::{
-        FileIO, FileIOBuilder, GCS_NO_AUTH, GCS_SERVICE_PATH, 
OpenDalStorageFactory,
-    };
+    use iceberg::io::{FileIO, FileIOBuilder, GCS_NO_AUTH, GCS_SERVICE_PATH};
+    use iceberg_storage_opendal::OpenDalStorageFactory;
     use iceberg_test_utils::{get_gcs_endpoint, set_up};
 
     static FAKE_GCS_BUCKET: &str = "test-bucket";
diff --git a/crates/iceberg/tests/file_io_s3_test.rs 
b/crates/storage/opendal/tests/file_io_s3_test.rs
similarity index 97%
rename from crates/iceberg/tests/file_io_s3_test.rs
rename to crates/storage/opendal/tests/file_io_s3_test.rs
index 1c029cdbc..5801af060 100644
--- a/crates/iceberg/tests/file_io_s3_test.rs
+++ b/crates/storage/opendal/tests/file_io_s3_test.rs
@@ -19,15 +19,15 @@
 //!
 //! These tests assume Docker containers are started externally via `make 
docker-up`.
 //! Each test uses unique file paths based on module path to avoid conflicts.
-#[cfg(all(test, feature = "storage-s3"))]
+#[cfg(feature = "opendal-s3")]
 mod tests {
     use std::sync::Arc;
 
     use async_trait::async_trait;
     use iceberg::io::{
-        CustomAwsCredentialLoader, FileIO, FileIOBuilder, 
OpenDalStorageFactory, S3_ACCESS_KEY_ID,
-        S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY,
+        FileIO, FileIOBuilder, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, 
S3_SECRET_ACCESS_KEY,
     };
+    use iceberg_storage_opendal::{CustomAwsCredentialLoader, 
OpenDalStorageFactory};
     use iceberg_test_utils::{get_minio_endpoint, 
normalize_test_name_with_parts, set_up};
     use reqsign::{AwsCredential, AwsCredentialLoad};
     use reqwest::Client;

Reply via email to