This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 0c3e5f0ed feat!(io): Implement Storage for OpenDal (#2080)
0c3e5f0ed is described below

commit 0c3e5f0ed7e00857d751581f46c787e13f2298cf
Author: Shawn Chang <[email protected]>
AuthorDate: Mon Feb 2 16:52:25 2026 -0800

    feat!(io): Implement Storage for OpenDal (#2080)
    
    ## Which issue does this PR close?
    
    - Closes #2083
    
    ## What changes are included in this PR?
    Wired `OpenDal` up to the `Storage` trait. `FileIO` still uses a hardcoded
    `OpenDal` for now, so there are no customer-facing behavior changes.
    
    - Implemented `Storage` for `OpenDal`
    - Added `OpenDalStorageFactory` and implemented `StorageFactory` (unused for now)
    - Updated `InputFile`/`OutputFile` to delegate operations to their storage
    - Updated `FileIO` to delegate operations to its inner storage (see the trait sketch below)
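    
    As a reference for reviewers, below is a condensed sketch of the trait
    surface this PR wires up. Signatures are abridged from the diff below
    (`#[typetag::serde]` and other attributes omitted); see
    `crates/iceberg/src/io/storage.rs` for the authoritative definitions.
    
    ```rust
    // Abridged sketch; Result, Bytes, FileMetadata, FileRead, FileWrite,
    // InputFile, OutputFile, and StorageConfig are the iceberg crate's types.
    #[async_trait::async_trait]
    pub trait Storage: std::fmt::Debug + Send + Sync {
        async fn exists(&self, path: &str) -> Result<bool>;
        async fn metadata(&self, path: &str) -> Result<FileMetadata>;
        async fn read(&self, path: &str) -> Result<Bytes>;
        async fn reader(&self, path: &str) -> Result<Box<dyn FileRead>>;
        async fn write(&self, path: &str, bs: Bytes) -> Result<()>;
        async fn writer(&self, path: &str) -> Result<Box<dyn FileWrite>>;
        async fn delete(&self, path: &str) -> Result<()>;
        async fn delete_prefix(&self, path: &str) -> Result<()>;
        fn new_input(&self, path: &str) -> Result<InputFile>;
        fn new_output(&self, path: &str) -> Result<OutputFile>;
    }
    
    /// Builds a `Storage` instance from configuration (abridged sketch).
    pub trait StorageFactory: std::fmt::Debug + Send + Sync {
        fn build(&self, config: &StorageConfig) -> Result<Arc<dyn Storage>>;
    }
    ```
    
    `FileIO`, `InputFile`, and `OutputFile` now call these methods instead of
    holding an `opendal::Operator` directly; `OpenDalStorage` remains the only
    implementation for now.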
    
    
    ## Are these changes tested?
    No new tests were added; this change relies on the existing tests.
---
 crates/iceberg/src/arrow/reader.rs                 |  12 +-
 crates/iceberg/src/io/file_io.rs                   | 131 ++----
 crates/iceberg/src/io/mod.rs                       |  31 +-
 .../src/io/{storage_azdls.rs => opendal/azdls.rs}  |   8 +-
 .../src/io/{storage_fs.rs => opendal/fs.rs}        |   0
 .../src/io/{storage_gcs.rs => opendal/gcs.rs}      |   0
 .../io/{storage_memory.rs => opendal/memory.rs}    |   0
 crates/iceberg/src/io/opendal/mod.rs               | 474 +++++++++++++++++++++
 .../src/io/{storage_oss.rs => opendal/oss.rs}      |   0
 .../src/io/{storage_s3.rs => opendal/s3.rs}        |   0
 crates/iceberg/src/io/storage.rs                   | 223 +---------
 crates/iceberg/src/puffin/metadata.rs              |  11 +-
 crates/iceberg/src/puffin/reader.rs                |   2 +-
 .../src/writer/file_writer/parquet_writer.rs       |  10 +-
 14 files changed, 543 insertions(+), 359 deletions(-)

diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs
index aa45a1297..5b3a7bb86 100644
--- a/crates/iceberg/src/arrow/reader.rs
+++ b/crates/iceberg/src/arrow/reader.rs
@@ -474,7 +474,7 @@ impl ArrowReader {
         file_io: FileIO,
         should_load_page_index: bool,
         arrow_reader_options: Option<ArrowReaderOptions>,
-    ) -> Result<ParquetRecordBatchStreamBuilder<ArrowFileReader<impl FileRead + Sized>>> {
+    ) -> Result<ParquetRecordBatchStreamBuilder<ArrowFileReader>> {
         // Get the metadata for the Parquet file we need to read and build
         // a reader for the data within
         let parquet_file = file_io.new_input(data_file_path)?;
@@ -1673,18 +1673,18 @@ impl BoundPredicateVisitor for PredicateConverter<'_> {
 }
 
 /// ArrowFileReader is a wrapper around a FileRead that impls parquets AsyncFileReader.
-pub struct ArrowFileReader<R: FileRead> {
+pub struct ArrowFileReader {
     meta: FileMetadata,
     preload_column_index: bool,
     preload_offset_index: bool,
     preload_page_index: bool,
     metadata_size_hint: Option<usize>,
-    r: R,
+    r: Box<dyn FileRead>,
 }
 
-impl<R: FileRead> ArrowFileReader<R> {
+impl ArrowFileReader {
     /// Create a new ArrowFileReader
-    pub fn new(meta: FileMetadata, r: R) -> Self {
+    pub fn new(meta: FileMetadata, r: Box<dyn FileRead>) -> Self {
         Self {
             meta,
             preload_column_index: false,
@@ -1723,7 +1723,7 @@ impl<R: FileRead> ArrowFileReader<R> {
     }
 }
 
-impl<R: FileRead> AsyncFileReader for ArrowFileReader<R> {
+impl AsyncFileReader for ArrowFileReader {
     fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
         Box::pin(
             self.r
diff --git a/crates/iceberg/src/io/file_io.rs b/crates/iceberg/src/io/file_io.rs
index ef9f8aa93..1ad71d853 100644
--- a/crates/iceberg/src/io/file_io.rs
+++ b/crates/iceberg/src/io/file_io.rs
@@ -21,10 +21,10 @@ use std::ops::Range;
 use std::sync::Arc;
 
 use bytes::Bytes;
-use opendal::Operator;
 use url::Url;
 
-use super::storage::OpenDalStorage;
+use super::opendal::OpenDalStorage;
+use super::storage::Storage;
 use crate::{Error, ErrorKind, Result};
 
 /// FileIO implementation, used to manipulate files in underlying storage.
@@ -89,8 +89,7 @@ impl FileIO {
     ///
     /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
     pub async fn delete(&self, path: impl AsRef<str>) -> Result<()> {
-        let (op, relative_path) = self.inner.create_operator(&path)?;
-        Ok(op.delete(relative_path).await?)
+        self.inner.delete(path.as_ref()).await
     }
 
     /// Remove the path and all nested dirs and files recursively.
@@ -104,14 +103,8 @@ impl FileIO {
     /// - If the path is a file or not exist, this function will be no-op.
     /// - If the path is a empty directory, this function will remove the directory itself.
     /// - If the path is a non-empty directory, this function will remove the directory and all nested files and directories.
-    pub async fn remove_dir_all(&self, path: impl AsRef<str>) -> Result<()> {
-        let (op, relative_path) = self.inner.create_operator(&path)?;
-        let path = if relative_path.ends_with('/') {
-            relative_path.to_string()
-        } else {
-            format!("{relative_path}/")
-        };
-        Ok(op.remove_all(&path).await?)
+    pub async fn delete_prefix(&self, path: impl AsRef<str>) -> Result<()> {
+        self.inner.delete_prefix(path.as_ref()).await
     }
 
     /// Check file exists.
@@ -120,8 +113,7 @@ impl FileIO {
     ///
     /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
     pub async fn exists(&self, path: impl AsRef<str>) -> Result<bool> {
-        let (op, relative_path) = self.inner.create_operator(&path)?;
-        Ok(op.exists(relative_path).await?)
+        self.inner.exists(path.as_ref()).await
     }
 
     /// Creates input file.
@@ -130,14 +122,7 @@ impl FileIO {
     ///
     /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
     pub fn new_input(&self, path: impl AsRef<str>) -> Result<InputFile> {
-        let (op, relative_path) = self.inner.create_operator(&path)?;
-        let path = path.as_ref().to_string();
-        let relative_path_pos = path.len() - relative_path.len();
-        Ok(InputFile {
-            op,
-            path,
-            relative_path_pos,
-        })
+        self.inner.new_input(path.as_ref())
     }
 
     /// Creates output file.
@@ -146,14 +131,7 @@ impl FileIO {
     ///
     /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
     pub fn new_output(&self, path: impl AsRef<str>) -> Result<OutputFile> {
-        let (op, relative_path) = self.inner.create_operator(&path)?;
-        let path = path.as_ref().to_string();
-        let relative_path_pos = path.len() - relative_path.len();
-        Ok(OutputFile {
-            op,
-            path,
-            relative_path_pos,
-        })
+        self.inner.new_output(path.as_ref())
     }
 }
 
@@ -291,24 +269,20 @@ pub trait FileRead: Send + Sync + Unpin + 'static {
     async fn read(&self, range: Range<u64>) -> crate::Result<Bytes>;
 }
 
-#[async_trait::async_trait]
-impl FileRead for opendal::Reader {
-    async fn read(&self, range: Range<u64>) -> crate::Result<Bytes> {
-        Ok(opendal::Reader::read(self, range).await?.to_bytes())
-    }
-}
-
 /// Input file is used for reading from files.
 #[derive(Debug)]
 pub struct InputFile {
-    op: Operator,
-    // Absolution path of file.
+    storage: Arc<dyn Storage>,
+    // Absolute path of file.
     path: String,
-    // Relative path of file to uri, starts at [`relative_path_pos`]
-    relative_path_pos: usize,
 }
 
 impl InputFile {
+    /// Creates a new input file.
+    pub fn new(storage: Arc<dyn Storage>, path: String) -> Self {
+        Self { storage, path }
+    }
+
     /// Absolute path to root uri.
     pub fn location(&self) -> &str {
         &self.path
@@ -316,34 +290,26 @@ impl InputFile {
 
     /// Check if file exists.
     pub async fn exists(&self) -> crate::Result<bool> {
-        Ok(self.op.exists(&self.path[self.relative_path_pos..]).await?)
+        self.storage.exists(&self.path).await
     }
 
     /// Fetch and returns metadata of file.
     pub async fn metadata(&self) -> crate::Result<FileMetadata> {
-        let meta = self.op.stat(&self.path[self.relative_path_pos..]).await?;
-
-        Ok(FileMetadata {
-            size: meta.content_length(),
-        })
+        self.storage.metadata(&self.path).await
     }
 
     /// Read and returns whole content of file.
     ///
     /// For continuous reading, use [`Self::reader`] instead.
     pub async fn read(&self) -> crate::Result<Bytes> {
-        Ok(self
-            .op
-            .read(&self.path[self.relative_path_pos..])
-            .await?
-            .to_bytes())
+        self.storage.read(&self.path).await
     }
 
     /// Creates [`FileRead`] for continuous reading.
     ///
     /// For one-time reading, use [`Self::read`] instead.
-    pub async fn reader(&self) -> crate::Result<impl FileRead + use<>> {
-        Ok(self.op.reader(&self.path[self.relative_path_pos..]).await?)
+    pub async fn reader(&self) -> crate::Result<Box<dyn FileRead>> {
+        self.storage.reader(&self.path).await
     }
 }
 
@@ -366,40 +332,20 @@ pub trait FileWrite: Send + Unpin + 'static {
     async fn close(&mut self) -> crate::Result<()>;
 }
 
-#[async_trait::async_trait]
-impl FileWrite for opendal::Writer {
-    async fn write(&mut self, bs: Bytes) -> crate::Result<()> {
-        Ok(opendal::Writer::write(self, bs).await?)
-    }
-
-    async fn close(&mut self) -> crate::Result<()> {
-        let _ = opendal::Writer::close(self).await?;
-        Ok(())
-    }
-}
-
-#[async_trait::async_trait]
-impl FileWrite for Box<dyn FileWrite> {
-    async fn write(&mut self, bs: Bytes) -> crate::Result<()> {
-        self.as_mut().write(bs).await
-    }
-
-    async fn close(&mut self) -> crate::Result<()> {
-        self.as_mut().close().await
-    }
-}
-
 /// Output file is used for writing to files..
 #[derive(Debug)]
 pub struct OutputFile {
-    op: Operator,
-    // Absolution path of file.
+    storage: Arc<dyn Storage>,
+    // Absolute path of file.
     path: String,
-    // Relative path of file to uri, starts at [`relative_path_pos`]
-    relative_path_pos: usize,
 }
 
 impl OutputFile {
+    /// Creates a new output file.
+    pub fn new(storage: Arc<dyn Storage>, path: String) -> Self {
+        Self { storage, path }
+    }
+
     /// Relative path to root uri.
     pub fn location(&self) -> &str {
         &self.path
@@ -407,22 +353,21 @@ impl OutputFile {
 
     /// Checks if file exists.
     pub async fn exists(&self) -> Result<bool> {
-        Ok(self.op.exists(&self.path[self.relative_path_pos..]).await?)
+        self.storage.exists(&self.path).await
     }
 
     /// Deletes file.
     ///
     /// If the file does not exist, it will not return error.
     pub async fn delete(&self) -> Result<()> {
-        Ok(self.op.delete(&self.path[self.relative_path_pos..]).await?)
+        self.storage.delete(&self.path).await
     }
 
     /// Converts into [`InputFile`].
     pub fn to_input_file(self) -> InputFile {
         InputFile {
-            op: self.op,
+            storage: self.storage,
             path: self.path,
-            relative_path_pos: self.relative_path_pos,
         }
     }
 
@@ -433,9 +378,7 @@ impl OutputFile {
     /// Calling `write` will overwrite the file if it exists.
     /// For continuous writing, use [`Self::writer`].
     pub async fn write(&self, bs: Bytes) -> crate::Result<()> {
-        let mut writer = self.writer().await?;
-        writer.write(bs).await?;
-        writer.close().await
+        self.storage.write(&self.path, bs).await
     }
 
     /// Creates output file for continuous writing.
@@ -444,9 +387,7 @@ impl OutputFile {
     ///
     /// For one-time writing, use [`Self::write`] instead.
     pub async fn writer(&self) -> crate::Result<Box<dyn FileWrite>> {
-        Ok(Box::new(
-            self.op.writer(&self.path[self.relative_path_pos..]).await?,
-        ))
+        self.storage.writer(&self.path).await
     }
 }
 
@@ -517,14 +458,14 @@ mod tests {
         assert!(file_io.exists(&a_path).await.unwrap());
 
         // Remove a file should be no-op.
-        file_io.remove_dir_all(&a_path).await.unwrap();
+        file_io.delete_prefix(&a_path).await.unwrap();
         assert!(file_io.exists(&a_path).await.unwrap());
 
         // Remove a not exist dir should be no-op.
-        file_io.remove_dir_all("not_exists/").await.unwrap();
+        file_io.delete_prefix("not_exists/").await.unwrap();
 
         // Remove a dir should remove all files in it.
-        file_io.remove_dir_all(&sub_dir_path).await.unwrap();
+        file_io.delete_prefix(&sub_dir_path).await.unwrap();
         assert!(!file_io.exists(&b_path).await.unwrap());
         assert!(!file_io.exists(&c_path).await.unwrap());
         assert!(file_io.exists(&a_path).await.unwrap());
@@ -543,7 +484,7 @@ mod tests {
         let file_io = create_local_file_io();
         assert!(!file_io.exists(&full_path).await.unwrap());
         assert!(file_io.delete(&full_path).await.is_ok());
-        assert!(file_io.remove_dir_all(&full_path).await.is_ok());
+        assert!(file_io.delete_prefix(&full_path).await.is_ok());
     }
 
     #[tokio::test]
diff --git a/crates/iceberg/src/io/mod.rs b/crates/iceberg/src/io/mod.rs
index 8e40bae97..888b868e3 100644
--- a/crates/iceberg/src/io/mod.rs
+++ b/crates/iceberg/src/io/mod.rs
@@ -68,38 +68,17 @@
 
 mod config;
 mod file_io;
+mod opendal;
 mod storage;
 
 pub use config::*;
 pub use file_io::*;
-pub use storage::{Storage, StorageFactory};
-pub(crate) mod object_cache;
-
-#[cfg(feature = "storage-azdls")]
-mod storage_azdls;
-#[cfg(feature = "storage-fs")]
-mod storage_fs;
-#[cfg(feature = "storage-gcs")]
-mod storage_gcs;
-#[cfg(feature = "storage-memory")]
-mod storage_memory;
-#[cfg(feature = "storage-oss")]
-mod storage_oss;
 #[cfg(feature = "storage-s3")]
-mod storage_s3;
+pub use opendal::CustomAwsCredentialLoader;
+pub use opendal::{OpenDalStorage, OpenDalStorageFactory};
+pub use storage::{Storage, StorageConfig, StorageFactory};
 
-#[cfg(feature = "storage-azdls")]
-use storage_azdls::*;
-#[cfg(feature = "storage-fs")]
-use storage_fs::*;
-#[cfg(feature = "storage-gcs")]
-use storage_gcs::*;
-#[cfg(feature = "storage-memory")]
-use storage_memory::*;
-#[cfg(feature = "storage-oss")]
-use storage_oss::*;
-#[cfg(feature = "storage-s3")]
-pub use storage_s3::*;
+pub(crate) mod object_cache;
 
 pub(crate) fn is_truthy(value: &str) -> bool {
     ["true", "t", "1", "on"].contains(&value.to_lowercase().as_str())
diff --git a/crates/iceberg/src/io/storage_azdls.rs b/crates/iceberg/src/io/opendal/azdls.rs
similarity index 98%
rename from crates/iceberg/src/io/storage_azdls.rs
rename to crates/iceberg/src/io/opendal/azdls.rs
index ce2b7427b..c957fd62a 100644
--- a/crates/iceberg/src/io/storage_azdls.rs
+++ b/crates/iceberg/src/io/opendal/azdls.rs
@@ -21,6 +21,7 @@ use std::str::FromStr;
 
 use opendal::Configurator;
 use opendal::services::AzdlsConfig;
+use serde::{Deserialize, Serialize};
 use url::Url;
 
 use crate::io::config::{
@@ -100,8 +101,8 @@ pub(crate) fn azdls_create_operator<'a>(
 ///   paths are expected to contain the `dfs` storage service.
 /// - `wasb[s]` is used to refer to files in Blob Storage directly; paths are
 ///   expected to contain the `blob` storage service.
-#[derive(Debug, PartialEq)]
-pub(crate) enum AzureStorageScheme {
+#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
+pub enum AzureStorageScheme {
     Abfs,
     Abfss,
     Wasb,
@@ -320,8 +321,7 @@ mod tests {
 
     use opendal::services::AzdlsConfig;
 
-    use super::{AzureStoragePath, AzureStorageScheme, azdls_create_operator};
-    use crate::io::azdls_config_parse;
+    use super::{AzureStoragePath, AzureStorageScheme, azdls_config_parse, azdls_create_operator};
 
     #[test]
     fn test_azdls_config_parse() {
diff --git a/crates/iceberg/src/io/storage_fs.rs b/crates/iceberg/src/io/opendal/fs.rs
similarity index 100%
rename from crates/iceberg/src/io/storage_fs.rs
rename to crates/iceberg/src/io/opendal/fs.rs
diff --git a/crates/iceberg/src/io/storage_gcs.rs b/crates/iceberg/src/io/opendal/gcs.rs
similarity index 100%
rename from crates/iceberg/src/io/storage_gcs.rs
rename to crates/iceberg/src/io/opendal/gcs.rs
diff --git a/crates/iceberg/src/io/storage_memory.rs b/crates/iceberg/src/io/opendal/memory.rs
similarity index 100%
rename from crates/iceberg/src/io/storage_memory.rs
rename to crates/iceberg/src/io/opendal/memory.rs
diff --git a/crates/iceberg/src/io/opendal/mod.rs b/crates/iceberg/src/io/opendal/mod.rs
new file mode 100644
index 000000000..fb49dc9e3
--- /dev/null
+++ b/crates/iceberg/src/io/opendal/mod.rs
@@ -0,0 +1,474 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! OpenDAL-based storage implementation.
+
+use std::ops::Range;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+#[cfg(feature = "storage-azdls")]
+use azdls::AzureStorageScheme;
+use bytes::Bytes;
+use opendal::layers::RetryLayer;
+#[cfg(feature = "storage-azdls")]
+use opendal::services::AzdlsConfig;
+#[cfg(feature = "storage-gcs")]
+use opendal::services::GcsConfig;
+#[cfg(feature = "storage-oss")]
+use opendal::services::OssConfig;
+#[cfg(feature = "storage-s3")]
+use opendal::services::S3Config;
+use opendal::{Operator, Scheme};
+#[cfg(feature = "storage-s3")]
+pub use s3::CustomAwsCredentialLoader;
+use serde::{Deserialize, Serialize};
+
+use super::{
+    FileIOBuilder, FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage,
+    StorageConfig, StorageFactory,
+};
+use crate::{Error, ErrorKind, Result};
+
+#[cfg(feature = "storage-azdls")]
+mod azdls;
+#[cfg(feature = "storage-fs")]
+mod fs;
+#[cfg(feature = "storage-gcs")]
+mod gcs;
+#[cfg(feature = "storage-memory")]
+mod memory;
+#[cfg(feature = "storage-oss")]
+mod oss;
+#[cfg(feature = "storage-s3")]
+mod s3;
+
+#[cfg(feature = "storage-azdls")]
+use azdls::*;
+#[cfg(feature = "storage-fs")]
+use fs::*;
+#[cfg(feature = "storage-gcs")]
+use gcs::*;
+#[cfg(feature = "storage-memory")]
+use memory::*;
+#[cfg(feature = "storage-oss")]
+use oss::*;
+#[cfg(feature = "storage-s3")]
+pub use s3::*;
+
+/// OpenDAL-based storage factory.
+///
+/// Maps scheme to the corresponding OpenDalStorage storage variant.
+///
+/// TODO this is currently not used, we still use OpenDalStorage::build() for now
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub enum OpenDalStorageFactory {
+    /// Memory storage factory.
+    #[cfg(feature = "storage-memory")]
+    Memory,
+    /// Local filesystem storage factory.
+    #[cfg(feature = "storage-fs")]
+    Fs,
+    /// S3 storage factory.
+    #[cfg(feature = "storage-s3")]
+    S3 {
+        /// Custom AWS credential loader.
+        #[serde(skip)]
+        customized_credential_load: Option<CustomAwsCredentialLoader>,
+    },
+    /// GCS storage factory.
+    #[cfg(feature = "storage-gcs")]
+    Gcs,
+    /// OSS storage factory.
+    #[cfg(feature = "storage-oss")]
+    Oss,
+    /// Azure Data Lake Storage factory.
+    #[cfg(feature = "storage-azdls")]
+    Azdls {
+        /// The configured Azure storage scheme.
+        configured_scheme: AzureStorageScheme,
+    },
+}
+
+#[typetag::serde(name = "OpenDalStorageFactory")]
+impl StorageFactory for OpenDalStorageFactory {
+    #[allow(unused_variables)]
+    fn build(&self, config: &StorageConfig) -> Result<Arc<dyn Storage>> {
+        match self {
+            #[cfg(feature = "storage-memory")]
+            OpenDalStorageFactory::Memory => {
+                Ok(Arc::new(OpenDalStorage::Memory(memory_config_build()?)))
+            }
+            #[cfg(feature = "storage-fs")]
+            OpenDalStorageFactory::Fs => Ok(Arc::new(OpenDalStorage::LocalFs)),
+            #[cfg(feature = "storage-s3")]
+            OpenDalStorageFactory::S3 {
+                customized_credential_load,
+            } => Ok(Arc::new(OpenDalStorage::S3 {
+                configured_scheme: "s3".to_string(),
+                config: s3_config_parse(config.props().clone())?.into(),
+                customized_credential_load: customized_credential_load.clone(),
+            })),
+            #[cfg(feature = "storage-gcs")]
+            OpenDalStorageFactory::Gcs => Ok(Arc::new(OpenDalStorage::Gcs {
+                config: gcs_config_parse(config.props().clone())?.into(),
+            })),
+            #[cfg(feature = "storage-oss")]
+            OpenDalStorageFactory::Oss => Ok(Arc::new(OpenDalStorage::Oss {
+                config: oss_config_parse(config.props().clone())?.into(),
+            })),
+            #[cfg(feature = "storage-azdls")]
+            OpenDalStorageFactory::Azdls { configured_scheme } => {
+                Ok(Arc::new(OpenDalStorage::Azdls {
+                    configured_scheme: configured_scheme.clone(),
+                    config: azdls_config_parse(config.props().clone())?.into(),
+                }))
+            }
+            #[cfg(all(
+                not(feature = "storage-memory"),
+                not(feature = "storage-fs"),
+                not(feature = "storage-s3"),
+                not(feature = "storage-gcs"),
+                not(feature = "storage-oss"),
+                not(feature = "storage-azdls"),
+            ))]
+            _ => Err(Error::new(
+                ErrorKind::FeatureUnsupported,
+                "No storage service has been enabled",
+            )),
+        }
+    }
+}
+
+/// Default memory operator for serde deserialization.
+#[cfg(feature = "storage-memory")]
+fn default_memory_operator() -> Operator {
+    memory_config_build().expect("Failed to create default memory operator")
+}
+
+/// OpenDAL-based storage implementation.
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub enum OpenDalStorage {
+    /// Memory storage variant.
+    #[cfg(feature = "storage-memory")]
+    Memory(#[serde(skip, default = "self::default_memory_operator")] Operator),
+    /// Local filesystem storage variant.
+    #[cfg(feature = "storage-fs")]
+    LocalFs,
+    /// S3 storage variant.
+    #[cfg(feature = "storage-s3")]
+    S3 {
+        /// s3 storage could have `s3://` and `s3a://`.
+        /// Storing the scheme string here to return the correct path.
+        configured_scheme: String,
+        /// S3 configuration.
+        config: Arc<S3Config>,
+        /// Custom AWS credential loader.
+        #[serde(skip)]
+        customized_credential_load: Option<CustomAwsCredentialLoader>,
+    },
+    /// GCS storage variant.
+    #[cfg(feature = "storage-gcs")]
+    Gcs {
+        /// GCS configuration.
+        config: Arc<GcsConfig>,
+    },
+    /// OSS storage variant.
+    #[cfg(feature = "storage-oss")]
+    Oss {
+        /// OSS configuration.
+        config: Arc<OssConfig>,
+    },
+    /// Azure Data Lake Storage variant.
+    /// Expects paths of the form
+    /// `abfs[s]://<filesystem>@<account>.dfs.<endpoint-suffix>/<path>` or
+    /// `wasb[s]://<container>@<account>.blob.<endpoint-suffix>/<path>`.
+    #[cfg(feature = "storage-azdls")]
+    #[allow(private_interfaces)]
+    Azdls {
+        /// The configured Azure storage scheme.
+        /// Because Azdls accepts multiple possible schemes, we store the full
+        /// passed scheme here to later validate schemes passed via paths.
+        configured_scheme: AzureStorageScheme,
+        /// Azure DLS configuration.
+        config: Arc<AzdlsConfig>,
+    },
+}
+
+impl OpenDalStorage {
+    /// Convert iceberg config to opendal config.
+    ///
+    /// TODO Switch to use OpenDalStorageFactory::build()
+    pub(crate) fn build(file_io_builder: FileIOBuilder) -> Result<Self> {
+        let (scheme_str, props, extensions) = file_io_builder.into_parts();
+        let _ = (&props, &extensions);
+        let scheme = Self::parse_scheme(&scheme_str)?;
+
+        match scheme {
+            #[cfg(feature = "storage-memory")]
+            Scheme::Memory => Ok(Self::Memory(memory_config_build()?)),
+            #[cfg(feature = "storage-fs")]
+            Scheme::Fs => Ok(Self::LocalFs),
+            #[cfg(feature = "storage-s3")]
+            Scheme::S3 => Ok(Self::S3 {
+                configured_scheme: scheme_str,
+                config: s3_config_parse(props)?.into(),
+                customized_credential_load: extensions
+                    .get::<CustomAwsCredentialLoader>()
+                    .map(Arc::unwrap_or_clone),
+            }),
+            #[cfg(feature = "storage-gcs")]
+            Scheme::Gcs => Ok(Self::Gcs {
+                config: gcs_config_parse(props)?.into(),
+            }),
+            #[cfg(feature = "storage-oss")]
+            Scheme::Oss => Ok(Self::Oss {
+                config: oss_config_parse(props)?.into(),
+            }),
+            #[cfg(feature = "storage-azdls")]
+            Scheme::Azdls => {
+                let scheme = scheme_str.parse::<AzureStorageScheme>()?;
+                Ok(Self::Azdls {
+                    config: azdls_config_parse(props)?.into(),
+                    configured_scheme: scheme,
+                })
+            }
+            // Update doc on [`FileIO`] when adding new schemes.
+            _ => Err(Error::new(
+                ErrorKind::FeatureUnsupported,
+                format!("Constructing file io from scheme: {scheme} not 
supported now",),
+            )),
+        }
+    }
+
+    /// Creates operator from path.
+    ///
+    /// # Arguments
+    ///
+    /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
+    ///
+    /// # Returns
+    ///
+    /// The return value consists of two parts:
+    ///
+    /// * An [`opendal::Operator`] instance used to operate on file.
+    /// * Relative path to the root uri of [`opendal::Operator`].
+    #[allow(unreachable_code, unused_variables)]
+    pub(crate) fn create_operator<'a>(
+        &self,
+        path: &'a impl AsRef<str>,
+    ) -> Result<(Operator, &'a str)> {
+        let path = path.as_ref();
+        let (operator, relative_path): (Operator, &str) = match self {
+            #[cfg(feature = "storage-memory")]
+            OpenDalStorage::Memory(op) => {
+                if let Some(stripped) = path.strip_prefix("memory:/") {
+                    (op.clone(), stripped)
+                } else {
+                    (op.clone(), &path[1..])
+                }
+            }
+            #[cfg(feature = "storage-fs")]
+            OpenDalStorage::LocalFs => {
+                let op = fs_config_build()?;
+                if let Some(stripped) = path.strip_prefix("file:/") {
+                    (op, stripped)
+                } else {
+                    (op, &path[1..])
+                }
+            }
+            #[cfg(feature = "storage-s3")]
+            OpenDalStorage::S3 {
+                configured_scheme,
+                config,
+                customized_credential_load,
+            } => {
+                let op = s3_config_build(config, customized_credential_load, path)?;
+                let op_info = op.info();
+
+                // Check prefix of s3 path.
+                let prefix = format!("{}://{}/", configured_scheme, op_info.name());
+                if path.starts_with(&prefix) {
+                    (op, &path[prefix.len()..])
+                } else {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        format!("Invalid s3 url: {path}, should start with 
{prefix}"),
+                    ));
+                }
+            }
+            #[cfg(feature = "storage-gcs")]
+            OpenDalStorage::Gcs { config } => {
+                let operator = gcs_config_build(config, path)?;
+                let prefix = format!("gs://{}/", operator.info().name());
+                if path.starts_with(&prefix) {
+                    (operator, &path[prefix.len()..])
+                } else {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        format!("Invalid gcs url: {path}, should start with 
{prefix}"),
+                    ));
+                }
+            }
+            #[cfg(feature = "storage-oss")]
+            OpenDalStorage::Oss { config } => {
+                let op = oss_config_build(config, path)?;
+                let prefix = format!("oss://{}/", op.info().name());
+                if path.starts_with(&prefix) {
+                    (op, &path[prefix.len()..])
+                } else {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        format!("Invalid oss url: {path}, should start with 
{prefix}"),
+                    ));
+                }
+            }
+            #[cfg(feature = "storage-azdls")]
+            OpenDalStorage::Azdls {
+                configured_scheme,
+                config,
+            } => azdls_create_operator(path, config, configured_scheme)?,
+            #[cfg(all(
+                not(feature = "storage-s3"),
+                not(feature = "storage-fs"),
+                not(feature = "storage-gcs"),
+                not(feature = "storage-oss"),
+                not(feature = "storage-azdls"),
+            ))]
+            _ => {
+                return Err(Error::new(
+                    ErrorKind::FeatureUnsupported,
+                    "No storage service has been enabled",
+                ));
+            }
+        };
+
+        // Transient errors are common for object stores; however there's no
+        // harm in retrying temporary failures for other storage backends as well.
+        let operator = operator.layer(RetryLayer::new());
+        Ok((operator, relative_path))
+    }
+
+    /// Parse scheme.
+    fn parse_scheme(scheme: &str) -> Result<Scheme> {
+        match scheme {
+            "memory" => Ok(Scheme::Memory),
+            "file" | "" => Ok(Scheme::Fs),
+            "s3" | "s3a" => Ok(Scheme::S3),
+            "gs" | "gcs" => Ok(Scheme::Gcs),
+            "oss" => Ok(Scheme::Oss),
+            "abfss" | "abfs" | "wasbs" | "wasb" => Ok(Scheme::Azdls),
+            s => Ok(s.parse::<Scheme>()?),
+        }
+    }
+}
+
+#[typetag::serde(name = "OpenDalStorage")]
+#[async_trait]
+impl Storage for OpenDalStorage {
+    async fn exists(&self, path: &str) -> Result<bool> {
+        let (op, relative_path) = self.create_operator(&path)?;
+        Ok(op.exists(relative_path).await?)
+    }
+
+    async fn metadata(&self, path: &str) -> Result<FileMetadata> {
+        let (op, relative_path) = self.create_operator(&path)?;
+        let meta = op.stat(relative_path).await?;
+        Ok(FileMetadata {
+            size: meta.content_length(),
+        })
+    }
+
+    async fn read(&self, path: &str) -> Result<Bytes> {
+        let (op, relative_path) = self.create_operator(&path)?;
+        Ok(op.read(relative_path).await?.to_bytes())
+    }
+
+    async fn reader(&self, path: &str) -> Result<Box<dyn FileRead>> {
+        let (op, relative_path) = self.create_operator(&path)?;
+        Ok(Box::new(op.reader(relative_path).await?))
+    }
+
+    async fn write(&self, path: &str, bs: Bytes) -> Result<()> {
+        let (op, relative_path) = self.create_operator(&path)?;
+        op.write(relative_path, bs).await?;
+        Ok(())
+    }
+
+    async fn writer(&self, path: &str) -> Result<Box<dyn FileWrite>> {
+        let (op, relative_path) = self.create_operator(&path)?;
+        Ok(Box::new(op.writer(relative_path).await?))
+    }
+
+    async fn delete(&self, path: &str) -> Result<()> {
+        let (op, relative_path) = self.create_operator(&path)?;
+        Ok(op.delete(relative_path).await?)
+    }
+
+    async fn delete_prefix(&self, path: &str) -> Result<()> {
+        let (op, relative_path) = self.create_operator(&path)?;
+        let path = if relative_path.ends_with('/') {
+            relative_path.to_string()
+        } else {
+            format!("{relative_path}/")
+        };
+        Ok(op.remove_all(&path).await?)
+    }
+
+    #[allow(unreachable_code, unused_variables)]
+    fn new_input(&self, path: &str) -> Result<InputFile> {
+        Ok(InputFile::new(Arc::new(self.clone()), path.to_string()))
+    }
+
+    #[allow(unreachable_code, unused_variables)]
+    fn new_output(&self, path: &str) -> Result<OutputFile> {
+        Ok(OutputFile::new(Arc::new(self.clone()), path.to_string()))
+    }
+}
+
+// OpenDAL implementations for FileRead and FileWrite traits
+
+#[async_trait]
+impl FileRead for opendal::Reader {
+    async fn read(&self, range: Range<u64>) -> Result<Bytes> {
+        Ok(opendal::Reader::read(self, range).await?.to_bytes())
+    }
+}
+
+#[async_trait]
+impl FileWrite for opendal::Writer {
+    async fn write(&mut self, bs: Bytes) -> Result<()> {
+        Ok(opendal::Writer::write(self, bs).await?)
+    }
+
+    async fn close(&mut self) -> Result<()> {
+        let _ = opendal::Writer::close(self).await?;
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[cfg(feature = "storage-memory")]
+    #[test]
+    fn test_default_memory_operator() {
+        let op = default_memory_operator();
+        assert_eq!(op.info().scheme().to_string(), "memory");
+    }
+}
diff --git a/crates/iceberg/src/io/storage_oss.rs b/crates/iceberg/src/io/opendal/oss.rs
similarity index 100%
rename from crates/iceberg/src/io/storage_oss.rs
rename to crates/iceberg/src/io/opendal/oss.rs
diff --git a/crates/iceberg/src/io/storage_s3.rs b/crates/iceberg/src/io/opendal/s3.rs
similarity index 100%
rename from crates/iceberg/src/io/storage_s3.rs
rename to crates/iceberg/src/io/opendal/s3.rs
diff --git a/crates/iceberg/src/io/storage.rs b/crates/iceberg/src/io/storage.rs
index 77978e223..15cc85ab1 100644
--- a/crates/iceberg/src/io/storage.rs
+++ b/crates/iceberg/src/io/storage.rs
@@ -15,30 +15,17 @@
 // specific language governing permissions and limitations
 // under the License.
 
+//! Storage interfaces for Iceberg.
+
 use std::fmt::Debug;
 use std::sync::Arc;
 
 use async_trait::async_trait;
 use bytes::Bytes;
-use opendal::layers::RetryLayer;
-#[cfg(feature = "storage-azdls")]
-use opendal::services::AzdlsConfig;
-#[cfg(feature = "storage-gcs")]
-use opendal::services::GcsConfig;
-#[cfg(feature = "storage-oss")]
-use opendal::services::OssConfig;
-#[cfg(feature = "storage-s3")]
-use opendal::services::S3Config;
-use opendal::{Operator, Scheme};
 
-#[cfg(feature = "storage-azdls")]
-use super::AzureStorageScheme;
-use super::{
-    FileIOBuilder, FileMetadata, FileRead, FileWrite, InputFile, OutputFile, StorageConfig,
-};
-#[cfg(feature = "storage-s3")]
-use crate::io::CustomAwsCredentialLoader;
-use crate::{Error, ErrorKind, Result};
+use super::{FileMetadata, FileRead, FileWrite, InputFile, OutputFile};
+use crate::Result;
+pub use crate::io::config::StorageConfig;
 
 /// Trait for storage operations in Iceberg.
 ///
@@ -153,203 +140,3 @@ pub trait StorageFactory: Debug + Send + Sync {
     /// if the storage could not be created.
     fn build(&self, config: &StorageConfig) -> Result<Arc<dyn Storage>>;
 }
-
-/// The storage carries all supported storage services in iceberg
-#[derive(Debug)]
-pub(crate) enum OpenDalStorage {
-    #[cfg(feature = "storage-memory")]
-    Memory(Operator),
-    #[cfg(feature = "storage-fs")]
-    LocalFs,
-    /// Expects paths of the form `s3[a]://<bucket>/<path>`.
-    #[cfg(feature = "storage-s3")]
-    S3 {
-        /// s3 storage could have `s3://` and `s3a://`.
-        /// Storing the scheme string here to return the correct path.
-        configured_scheme: String,
-        config: Arc<S3Config>,
-        customized_credential_load: Option<CustomAwsCredentialLoader>,
-    },
-    #[cfg(feature = "storage-gcs")]
-    Gcs { config: Arc<GcsConfig> },
-    #[cfg(feature = "storage-oss")]
-    Oss { config: Arc<OssConfig> },
-    /// Expects paths of the form
-    /// `abfs[s]://<filesystem>@<account>.dfs.<endpoint-suffix>/<path>` or
-    /// `wasb[s]://<container>@<account>.blob.<endpoint-suffix>/<path>`.
-    #[cfg(feature = "storage-azdls")]
-    Azdls {
-        /// Because Azdls accepts multiple possible schemes, we store the full
-        /// passed scheme here to later validate schemes passed via paths.
-        configured_scheme: AzureStorageScheme,
-        config: Arc<AzdlsConfig>,
-    },
-}
-
-impl OpenDalStorage {
-    /// Convert iceberg config to opendal config.
-    pub(crate) fn build(file_io_builder: FileIOBuilder) -> crate::Result<Self> {
-        let (scheme_str, props, extensions) = file_io_builder.into_parts();
-        let _ = (&props, &extensions);
-        let scheme = Self::parse_scheme(&scheme_str)?;
-
-        match scheme {
-            #[cfg(feature = "storage-memory")]
-            Scheme::Memory => Ok(Self::Memory(super::memory_config_build()?)),
-            #[cfg(feature = "storage-fs")]
-            Scheme::Fs => Ok(Self::LocalFs),
-            #[cfg(feature = "storage-s3")]
-            Scheme::S3 => Ok(Self::S3 {
-                configured_scheme: scheme_str,
-                config: super::s3_config_parse(props)?.into(),
-                customized_credential_load: extensions
-                    .get::<CustomAwsCredentialLoader>()
-                    .map(Arc::unwrap_or_clone),
-            }),
-            #[cfg(feature = "storage-gcs")]
-            Scheme::Gcs => Ok(Self::Gcs {
-                config: super::gcs_config_parse(props)?.into(),
-            }),
-            #[cfg(feature = "storage-oss")]
-            Scheme::Oss => Ok(Self::Oss {
-                config: super::oss_config_parse(props)?.into(),
-            }),
-            #[cfg(feature = "storage-azdls")]
-            Scheme::Azdls => {
-                let scheme = scheme_str.parse::<AzureStorageScheme>()?;
-                Ok(Self::Azdls {
-                    config: super::azdls_config_parse(props)?.into(),
-                    configured_scheme: scheme,
-                })
-            }
-            // Update doc on [`FileIO`] when adding new schemes.
-            _ => Err(Error::new(
-                ErrorKind::FeatureUnsupported,
-                format!("Constructing file io from scheme: {scheme} not 
supported now",),
-            )),
-        }
-    }
-
-    /// Creates operator from path.
-    ///
-    /// # Arguments
-    ///
-    /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
-    ///
-    /// # Returns
-    ///
-    /// The return value consists of two parts:
-    ///
-    /// * An [`opendal::Operator`] instance used to operate on file.
-    /// * Relative path to the root uri of [`opendal::Operator`].
-    pub(crate) fn create_operator<'a>(
-        &self,
-        path: &'a impl AsRef<str>,
-    ) -> crate::Result<(Operator, &'a str)> {
-        let path = path.as_ref();
-        let _ = path;
-        let (operator, relative_path): (Operator, &str) = match self {
-            #[cfg(feature = "storage-memory")]
-            OpenDalStorage::Memory(op) => {
-                if let Some(stripped) = path.strip_prefix("memory:/") {
-                    Ok::<_, crate::Error>((op.clone(), stripped))
-                } else {
-                    Ok::<_, crate::Error>((op.clone(), &path[1..]))
-                }
-            }
-            #[cfg(feature = "storage-fs")]
-            OpenDalStorage::LocalFs => {
-                let op = super::fs_config_build()?;
-
-                if let Some(stripped) = path.strip_prefix("file:/") {
-                    Ok::<_, crate::Error>((op, stripped))
-                } else {
-                    Ok::<_, crate::Error>((op, &path[1..]))
-                }
-            }
-            #[cfg(feature = "storage-s3")]
-            OpenDalStorage::S3 {
-                configured_scheme,
-                config,
-                customized_credential_load,
-            } => {
-                let op = super::s3_config_build(config, customized_credential_load, path)?;
-                let op_info = op.info();
-
-                // Check prefix of s3 path.
-                let prefix = format!("{}://{}/", configured_scheme, op_info.name());
-                if path.starts_with(&prefix) {
-                    Ok((op, &path[prefix.len()..]))
-                } else {
-                    Err(Error::new(
-                        ErrorKind::DataInvalid,
-                        format!("Invalid s3 url: {path}, should start with 
{prefix}"),
-                    ))
-                }
-            }
-            #[cfg(feature = "storage-gcs")]
-            OpenDalStorage::Gcs { config } => {
-                let operator = super::gcs_config_build(config, path)?;
-                let prefix = format!("gs://{}/", operator.info().name());
-                if path.starts_with(&prefix) {
-                    Ok((operator, &path[prefix.len()..]))
-                } else {
-                    Err(Error::new(
-                        ErrorKind::DataInvalid,
-                        format!("Invalid gcs url: {path}, should start with 
{prefix}"),
-                    ))
-                }
-            }
-            #[cfg(feature = "storage-oss")]
-            OpenDalStorage::Oss { config } => {
-                let op = super::oss_config_build(config, path)?;
-
-                // Check prefix of oss path.
-                let prefix = format!("oss://{}/", op.info().name());
-                if path.starts_with(&prefix) {
-                    Ok((op, &path[prefix.len()..]))
-                } else {
-                    Err(Error::new(
-                        ErrorKind::DataInvalid,
-                        format!("Invalid oss url: {path}, should start with 
{prefix}"),
-                    ))
-                }
-            }
-            #[cfg(feature = "storage-azdls")]
-            OpenDalStorage::Azdls {
-                configured_scheme,
-                config,
-            } => super::azdls_create_operator(path, config, configured_scheme),
-            #[cfg(all(
-                not(feature = "storage-s3"),
-                not(feature = "storage-fs"),
-                not(feature = "storage-gcs"),
-                not(feature = "storage-oss"),
-                not(feature = "storage-azdls"),
-            ))]
-            _ => Err(Error::new(
-                ErrorKind::FeatureUnsupported,
-                "No storage service has been enabled",
-            )),
-        }?;
-
-        // Transient errors are common for object stores; however there's no
-        // harm in retrying temporary failures for other storage backends as well.
-        let operator = operator.layer(RetryLayer::new());
-
-        Ok((operator, relative_path))
-    }
-
-    /// Parse scheme.
-    fn parse_scheme(scheme: &str) -> crate::Result<Scheme> {
-        match scheme {
-            "memory" => Ok(Scheme::Memory),
-            "file" | "" => Ok(Scheme::Fs),
-            "s3" | "s3a" => Ok(Scheme::S3),
-            "gs" | "gcs" => Ok(Scheme::Gcs),
-            "oss" => Ok(Scheme::Oss),
-            "abfss" | "abfs" | "wasbs" | "wasb" => Ok(Scheme::Azdls),
-            s => Ok(s.parse::<Scheme>()?),
-        }
-    }
-}
diff --git a/crates/iceberg/src/puffin/metadata.rs b/crates/iceberg/src/puffin/metadata.rs
index b09f2c7c1..3794eec45 100644
--- a/crates/iceberg/src/puffin/metadata.rs
+++ b/crates/iceberg/src/puffin/metadata.rs
@@ -285,10 +285,13 @@ impl FileMetadata {
 
         let input_file_length = input_file.metadata().await?.size;
         let footer_payload_length =
-            FileMetadata::read_footer_payload_length(&file_read, 
input_file_length).await?;
-        let footer_bytes =
-            FileMetadata::read_footer_bytes(&file_read, input_file_length, 
footer_payload_length)
-                .await?;
            FileMetadata::read_footer_payload_length(file_read.as_ref(), input_file_length).await?;
+        let footer_bytes = FileMetadata::read_footer_bytes(
+            file_read.as_ref(),
+            input_file_length,
+            footer_payload_length,
+        )
+        .await?;
 
         let magic_length = FileMetadata::MAGIC_LENGTH as usize;
         // check first four bytes of footer
diff --git a/crates/iceberg/src/puffin/reader.rs b/crates/iceberg/src/puffin/reader.rs
index a6308dceb..d272f02d4 100644
--- a/crates/iceberg/src/puffin/reader.rs
+++ b/crates/iceberg/src/puffin/reader.rs
@@ -19,7 +19,7 @@ use tokio::sync::OnceCell;
 
 use super::validate_puffin_compression;
 use crate::Result;
-use crate::io::{FileRead, InputFile};
+use crate::io::InputFile;
 use crate::puffin::blob::Blob;
 use crate::puffin::metadata::{BlobMetadata, FileMetadata};
 
diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs
index a75c74b3b..da04d5435 100644
--- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs
+++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs
@@ -212,7 +212,7 @@ impl SchemaVisitor for IndexByParquetPathName {
 pub struct ParquetWriter {
     schema: SchemaRef,
     output_file: OutputFile,
-    inner_writer: Option<AsyncArrowWriter<AsyncFileWriter<Box<dyn 
FileWrite>>>>,
+    inner_writer: Option<AsyncArrowWriter<AsyncFileWriter>>,
     writer_properties: WriterProperties,
     current_row_num: usize,
     nan_value_count_visitor: NanValueCountVisitor,
@@ -577,16 +577,16 @@ impl CurrentFileStatus for ParquetWriter {
 /// # NOTES
 ///
 /// We keep this wrapper been used inside only.
-struct AsyncFileWriter<W: FileWrite>(W);
+struct AsyncFileWriter(Box<dyn FileWrite>);
 
-impl<W: FileWrite> AsyncFileWriter<W> {
+impl AsyncFileWriter {
     /// Create a new `AsyncFileWriter` with the given writer.
-    pub fn new(writer: W) -> Self {
+    pub fn new(writer: Box<dyn FileWrite>) -> Self {
         Self(writer)
     }
 }
 
-impl<W: FileWrite> ArrowAsyncFileWriter for AsyncFileWriter<W> {
+impl ArrowAsyncFileWriter for AsyncFileWriter {
     fn write(&mut self, bs: Bytes) -> BoxFuture<'_, 
parquet::errors::Result<()>> {
         Box::pin(async {
             self.0

