This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 0c3e5f0ed feat!(io): Implement Storage for OpenDal (#2080)
0c3e5f0ed is described below
commit 0c3e5f0ed7e00857d751581f46c787e13f2298cf
Author: Shawn Chang <[email protected]>
AuthorDate: Mon Feb 2 16:52:25 2026 -0800
feat!(io): Implement Storage for OpenDal (#2080)
## Which issue does this PR close?
- Closes #2083
## What changes are included in this PR?
Wired `OpenDal` up to the `Storage` trait. `FileIO` still uses the hardcoded
`OpenDal` implementation for now, so there are no customer-facing behavior changes:
- Implemented `Storage` for `OpenDal`
- Added `OpenDalStorageFactory` and implemented `StorageFactory` (unused for now)
- Updated `InputFile`/`OutputFile` to delegate operations to storage
- Updated `FileIO` to delegate operations to its inner storage (see the sketch below)
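
To make the delegation concrete, here is a minimal, illustrative sketch (not part of this commit; `demo` is a hypothetical helper) of the unchanged user-facing call path, assuming the `storage-memory` feature is enabled:

```rust
use bytes::Bytes;
use iceberg::io::FileIOBuilder;

async fn demo() -> iceberg::Result<()> {
    // `FileIO` still resolves "memory" to the hardcoded OpenDAL backend, but
    // every call below now routes through the `Storage` trait object.
    let file_io = FileIOBuilder::new("memory").build()?;

    // FileIO::new_output delegates to Storage::new_output.
    let output = file_io.new_output("memory:/tmp/demo.txt")?;
    // OutputFile::write delegates to Storage::write.
    output.write(Bytes::from_static(b"hello")).await?;

    // FileIO::new_input delegates to Storage::new_input, and
    // InputFile::read delegates to Storage::read.
    let input = file_io.new_input("memory:/tmp/demo.txt")?;
    assert_eq!(input.read().await?, Bytes::from_static(b"hello"));
    Ok(())
}
```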
## Are these changes tested?
No new tests were added; the change relies on existing tests.
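
For reviewers who want to poke at the (currently unused) factory path by hand, a hedged sketch follows. `OpenDalStorageFactory` and the `Storage`/`StorageFactory` traits come from this commit, but `build_fs_storage` is a hypothetical helper and how a `StorageConfig` is obtained is an assumption (this diff only shows its `props()` accessor):

```rust
use std::sync::Arc;

use iceberg::io::{OpenDalStorageFactory, Storage, StorageConfig, StorageFactory};

fn build_fs_storage(config: &StorageConfig) -> iceberg::Result<Arc<dyn Storage>> {
    // Requires the `storage-fs` feature; the S3/Gcs/Oss/Azdls variants pull
    // their settings from `config.props()` instead.
    let factory = OpenDalStorageFactory::Fs;
    let storage: Arc<dyn Storage> = factory.build(config)?;

    // The returned trait object hands out input/output files directly.
    let _input = storage.new_input("file:/tmp/data.parquet")?;
    Ok(storage)
}
```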
---
crates/iceberg/src/arrow/reader.rs | 12 +-
crates/iceberg/src/io/file_io.rs | 131 ++----
crates/iceberg/src/io/mod.rs | 31 +-
.../src/io/{storage_azdls.rs => opendal/azdls.rs} | 8 +-
.../src/io/{storage_fs.rs => opendal/fs.rs} | 0
.../src/io/{storage_gcs.rs => opendal/gcs.rs} | 0
.../io/{storage_memory.rs => opendal/memory.rs} | 0
crates/iceberg/src/io/opendal/mod.rs | 474 +++++++++++++++++++++
.../src/io/{storage_oss.rs => opendal/oss.rs} | 0
.../src/io/{storage_s3.rs => opendal/s3.rs} | 0
crates/iceberg/src/io/storage.rs | 223 +---------
crates/iceberg/src/puffin/metadata.rs | 11 +-
crates/iceberg/src/puffin/reader.rs | 2 +-
.../src/writer/file_writer/parquet_writer.rs | 10 +-
14 files changed, 543 insertions(+), 359 deletions(-)
diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs
index aa45a1297..5b3a7bb86 100644
--- a/crates/iceberg/src/arrow/reader.rs
+++ b/crates/iceberg/src/arrow/reader.rs
@@ -474,7 +474,7 @@ impl ArrowReader {
file_io: FileIO,
should_load_page_index: bool,
arrow_reader_options: Option<ArrowReaderOptions>,
- ) -> Result<ParquetRecordBatchStreamBuilder<ArrowFileReader<impl FileRead + Sized>>> {
+ ) -> Result<ParquetRecordBatchStreamBuilder<ArrowFileReader>> {
// Get the metadata for the Parquet file we need to read and build
// a reader for the data within
let parquet_file = file_io.new_input(data_file_path)?;
@@ -1673,18 +1673,18 @@ impl BoundPredicateVisitor for PredicateConverter<'_> {
}
/// ArrowFileReader is a wrapper around a FileRead that impls parquets AsyncFileReader.
-pub struct ArrowFileReader<R: FileRead> {
+pub struct ArrowFileReader {
meta: FileMetadata,
preload_column_index: bool,
preload_offset_index: bool,
preload_page_index: bool,
metadata_size_hint: Option<usize>,
- r: R,
+ r: Box<dyn FileRead>,
}
-impl<R: FileRead> ArrowFileReader<R> {
+impl ArrowFileReader {
/// Create a new ArrowFileReader
- pub fn new(meta: FileMetadata, r: R) -> Self {
+ pub fn new(meta: FileMetadata, r: Box<dyn FileRead>) -> Self {
Self {
meta,
preload_column_index: false,
@@ -1723,7 +1723,7 @@ impl<R: FileRead> ArrowFileReader<R> {
}
}
-impl<R: FileRead> AsyncFileReader for ArrowFileReader<R> {
+impl AsyncFileReader for ArrowFileReader {
fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
Box::pin(
self.r
diff --git a/crates/iceberg/src/io/file_io.rs b/crates/iceberg/src/io/file_io.rs
index ef9f8aa93..1ad71d853 100644
--- a/crates/iceberg/src/io/file_io.rs
+++ b/crates/iceberg/src/io/file_io.rs
@@ -21,10 +21,10 @@ use std::ops::Range;
use std::sync::Arc;
use bytes::Bytes;
-use opendal::Operator;
use url::Url;
-use super::storage::OpenDalStorage;
+use super::opendal::OpenDalStorage;
+use super::storage::Storage;
use crate::{Error, ErrorKind, Result};
/// FileIO implementation, used to manipulate files in underlying storage.
@@ -89,8 +89,7 @@ impl FileIO {
///
/// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
pub async fn delete(&self, path: impl AsRef<str>) -> Result<()> {
- let (op, relative_path) = self.inner.create_operator(&path)?;
- Ok(op.delete(relative_path).await?)
+ self.inner.delete(path.as_ref()).await
}
/// Remove the path and all nested dirs and files recursively.
@@ -104,14 +103,8 @@ impl FileIO {
/// - If the path is a file or not exist, this function will be no-op.
/// - If the path is a empty directory, this function will remove the directory itself.
/// - If the path is a non-empty directory, this function will remove the directory and all nested files and directories.
- pub async fn remove_dir_all(&self, path: impl AsRef<str>) -> Result<()> {
- let (op, relative_path) = self.inner.create_operator(&path)?;
- let path = if relative_path.ends_with('/') {
- relative_path.to_string()
- } else {
- format!("{relative_path}/")
- };
- Ok(op.remove_all(&path).await?)
+ pub async fn delete_prefix(&self, path: impl AsRef<str>) -> Result<()> {
+ self.inner.delete_prefix(path.as_ref()).await
}
/// Check file exists.
@@ -120,8 +113,7 @@ impl FileIO {
///
/// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
pub async fn exists(&self, path: impl AsRef<str>) -> Result<bool> {
- let (op, relative_path) = self.inner.create_operator(&path)?;
- Ok(op.exists(relative_path).await?)
+ self.inner.exists(path.as_ref()).await
}
/// Creates input file.
@@ -130,14 +122,7 @@ impl FileIO {
///
/// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
pub fn new_input(&self, path: impl AsRef<str>) -> Result<InputFile> {
- let (op, relative_path) = self.inner.create_operator(&path)?;
- let path = path.as_ref().to_string();
- let relative_path_pos = path.len() - relative_path.len();
- Ok(InputFile {
- op,
- path,
- relative_path_pos,
- })
+ self.inner.new_input(path.as_ref())
}
/// Creates output file.
@@ -146,14 +131,7 @@ impl FileIO {
///
/// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
pub fn new_output(&self, path: impl AsRef<str>) -> Result<OutputFile> {
- let (op, relative_path) = self.inner.create_operator(&path)?;
- let path = path.as_ref().to_string();
- let relative_path_pos = path.len() - relative_path.len();
- Ok(OutputFile {
- op,
- path,
- relative_path_pos,
- })
+ self.inner.new_output(path.as_ref())
}
}
@@ -291,24 +269,20 @@ pub trait FileRead: Send + Sync + Unpin + 'static {
async fn read(&self, range: Range<u64>) -> crate::Result<Bytes>;
}
-#[async_trait::async_trait]
-impl FileRead for opendal::Reader {
- async fn read(&self, range: Range<u64>) -> crate::Result<Bytes> {
- Ok(opendal::Reader::read(self, range).await?.to_bytes())
- }
-}
-
/// Input file is used for reading from files.
#[derive(Debug)]
pub struct InputFile {
- op: Operator,
- // Absolution path of file.
+ storage: Arc<dyn Storage>,
+ // Absolute path of file.
path: String,
- // Relative path of file to uri, starts at [`relative_path_pos`]
- relative_path_pos: usize,
}
impl InputFile {
+ /// Creates a new input file.
+ pub fn new(storage: Arc<dyn Storage>, path: String) -> Self {
+ Self { storage, path }
+ }
+
/// Absolute path to root uri.
pub fn location(&self) -> &str {
&self.path
@@ -316,34 +290,26 @@ impl InputFile {
/// Check if file exists.
pub async fn exists(&self) -> crate::Result<bool> {
- Ok(self.op.exists(&self.path[self.relative_path_pos..]).await?)
+ self.storage.exists(&self.path).await
}
/// Fetch and returns metadata of file.
pub async fn metadata(&self) -> crate::Result<FileMetadata> {
- let meta = self.op.stat(&self.path[self.relative_path_pos..]).await?;
-
- Ok(FileMetadata {
- size: meta.content_length(),
- })
+ self.storage.metadata(&self.path).await
}
/// Read and returns whole content of file.
///
/// For continuous reading, use [`Self::reader`] instead.
pub async fn read(&self) -> crate::Result<Bytes> {
- Ok(self
- .op
- .read(&self.path[self.relative_path_pos..])
- .await?
- .to_bytes())
+ self.storage.read(&self.path).await
}
/// Creates [`FileRead`] for continuous reading.
///
/// For one-time reading, use [`Self::read`] instead.
- pub async fn reader(&self) -> crate::Result<impl FileRead + use<>> {
- Ok(self.op.reader(&self.path[self.relative_path_pos..]).await?)
+ pub async fn reader(&self) -> crate::Result<Box<dyn FileRead>> {
+ self.storage.reader(&self.path).await
}
}
@@ -366,40 +332,20 @@ pub trait FileWrite: Send + Unpin + 'static {
async fn close(&mut self) -> crate::Result<()>;
}
-#[async_trait::async_trait]
-impl FileWrite for opendal::Writer {
- async fn write(&mut self, bs: Bytes) -> crate::Result<()> {
- Ok(opendal::Writer::write(self, bs).await?)
- }
-
- async fn close(&mut self) -> crate::Result<()> {
- let _ = opendal::Writer::close(self).await?;
- Ok(())
- }
-}
-
-#[async_trait::async_trait]
-impl FileWrite for Box<dyn FileWrite> {
- async fn write(&mut self, bs: Bytes) -> crate::Result<()> {
- self.as_mut().write(bs).await
- }
-
- async fn close(&mut self) -> crate::Result<()> {
- self.as_mut().close().await
- }
-}
-
/// Output file is used for writing to files..
#[derive(Debug)]
pub struct OutputFile {
- op: Operator,
- // Absolution path of file.
+ storage: Arc<dyn Storage>,
+ // Absolute path of file.
path: String,
- // Relative path of file to uri, starts at [`relative_path_pos`]
- relative_path_pos: usize,
}
impl OutputFile {
+ /// Creates a new output file.
+ pub fn new(storage: Arc<dyn Storage>, path: String) -> Self {
+ Self { storage, path }
+ }
+
/// Relative path to root uri.
pub fn location(&self) -> &str {
&self.path
@@ -407,22 +353,21 @@ impl OutputFile {
/// Checks if file exists.
pub async fn exists(&self) -> Result<bool> {
- Ok(self.op.exists(&self.path[self.relative_path_pos..]).await?)
+ self.storage.exists(&self.path).await
}
/// Deletes file.
///
/// If the file does not exist, it will not return error.
pub async fn delete(&self) -> Result<()> {
- Ok(self.op.delete(&self.path[self.relative_path_pos..]).await?)
+ self.storage.delete(&self.path).await
}
/// Converts into [`InputFile`].
pub fn to_input_file(self) -> InputFile {
InputFile {
- op: self.op,
+ storage: self.storage,
path: self.path,
- relative_path_pos: self.relative_path_pos,
}
}
@@ -433,9 +378,7 @@ impl OutputFile {
/// Calling `write` will overwrite the file if it exists.
/// For continuous writing, use [`Self::writer`].
pub async fn write(&self, bs: Bytes) -> crate::Result<()> {
- let mut writer = self.writer().await?;
- writer.write(bs).await?;
- writer.close().await
+ self.storage.write(&self.path, bs).await
}
/// Creates output file for continuous writing.
@@ -444,9 +387,7 @@ impl OutputFile {
///
/// For one-time writing, use [`Self::write`] instead.
pub async fn writer(&self) -> crate::Result<Box<dyn FileWrite>> {
- Ok(Box::new(
- self.op.writer(&self.path[self.relative_path_pos..]).await?,
- ))
+ self.storage.writer(&self.path).await
}
}
@@ -517,14 +458,14 @@ mod tests {
assert!(file_io.exists(&a_path).await.unwrap());
// Remove a file should be no-op.
- file_io.remove_dir_all(&a_path).await.unwrap();
+ file_io.delete_prefix(&a_path).await.unwrap();
assert!(file_io.exists(&a_path).await.unwrap());
// Remove a not exist dir should be no-op.
- file_io.remove_dir_all("not_exists/").await.unwrap();
+ file_io.delete_prefix("not_exists/").await.unwrap();
// Remove a dir should remove all files in it.
- file_io.remove_dir_all(&sub_dir_path).await.unwrap();
+ file_io.delete_prefix(&sub_dir_path).await.unwrap();
assert!(!file_io.exists(&b_path).await.unwrap());
assert!(!file_io.exists(&c_path).await.unwrap());
assert!(file_io.exists(&a_path).await.unwrap());
@@ -543,7 +484,7 @@ mod tests {
let file_io = create_local_file_io();
assert!(!file_io.exists(&full_path).await.unwrap());
assert!(file_io.delete(&full_path).await.is_ok());
- assert!(file_io.remove_dir_all(&full_path).await.is_ok());
+ assert!(file_io.delete_prefix(&full_path).await.is_ok());
}
#[tokio::test]
diff --git a/crates/iceberg/src/io/mod.rs b/crates/iceberg/src/io/mod.rs
index 8e40bae97..888b868e3 100644
--- a/crates/iceberg/src/io/mod.rs
+++ b/crates/iceberg/src/io/mod.rs
@@ -68,38 +68,17 @@
mod config;
mod file_io;
+mod opendal;
mod storage;
pub use config::*;
pub use file_io::*;
-pub use storage::{Storage, StorageFactory};
-pub(crate) mod object_cache;
-
-#[cfg(feature = "storage-azdls")]
-mod storage_azdls;
-#[cfg(feature = "storage-fs")]
-mod storage_fs;
-#[cfg(feature = "storage-gcs")]
-mod storage_gcs;
-#[cfg(feature = "storage-memory")]
-mod storage_memory;
-#[cfg(feature = "storage-oss")]
-mod storage_oss;
#[cfg(feature = "storage-s3")]
-mod storage_s3;
+pub use opendal::CustomAwsCredentialLoader;
+pub use opendal::{OpenDalStorage, OpenDalStorageFactory};
+pub use storage::{Storage, StorageConfig, StorageFactory};
-#[cfg(feature = "storage-azdls")]
-use storage_azdls::*;
-#[cfg(feature = "storage-fs")]
-use storage_fs::*;
-#[cfg(feature = "storage-gcs")]
-use storage_gcs::*;
-#[cfg(feature = "storage-memory")]
-use storage_memory::*;
-#[cfg(feature = "storage-oss")]
-use storage_oss::*;
-#[cfg(feature = "storage-s3")]
-pub use storage_s3::*;
+pub(crate) mod object_cache;
pub(crate) fn is_truthy(value: &str) -> bool {
["true", "t", "1", "on"].contains(&value.to_lowercase().as_str())
diff --git a/crates/iceberg/src/io/storage_azdls.rs b/crates/iceberg/src/io/opendal/azdls.rs
similarity index 98%
rename from crates/iceberg/src/io/storage_azdls.rs
rename to crates/iceberg/src/io/opendal/azdls.rs
index ce2b7427b..c957fd62a 100644
--- a/crates/iceberg/src/io/storage_azdls.rs
+++ b/crates/iceberg/src/io/opendal/azdls.rs
@@ -21,6 +21,7 @@ use std::str::FromStr;
use opendal::Configurator;
use opendal::services::AzdlsConfig;
+use serde::{Deserialize, Serialize};
use url::Url;
use crate::io::config::{
@@ -100,8 +101,8 @@ pub(crate) fn azdls_create_operator<'a>(
/// paths are expected to contain the `dfs` storage service.
/// - `wasb[s]` is used to refer to files in Blob Storage directly; paths are
/// expected to contain the `blob` storage service.
-#[derive(Debug, PartialEq)]
-pub(crate) enum AzureStorageScheme {
+#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
+pub enum AzureStorageScheme {
Abfs,
Abfss,
Wasb,
@@ -320,8 +321,7 @@ mod tests {
use opendal::services::AzdlsConfig;
- use super::{AzureStoragePath, AzureStorageScheme, azdls_create_operator};
- use crate::io::azdls_config_parse;
+ use super::{AzureStoragePath, AzureStorageScheme, azdls_config_parse, azdls_create_operator};
#[test]
fn test_azdls_config_parse() {
diff --git a/crates/iceberg/src/io/storage_fs.rs b/crates/iceberg/src/io/opendal/fs.rs
similarity index 100%
rename from crates/iceberg/src/io/storage_fs.rs
rename to crates/iceberg/src/io/opendal/fs.rs
diff --git a/crates/iceberg/src/io/storage_gcs.rs b/crates/iceberg/src/io/opendal/gcs.rs
similarity index 100%
rename from crates/iceberg/src/io/storage_gcs.rs
rename to crates/iceberg/src/io/opendal/gcs.rs
diff --git a/crates/iceberg/src/io/storage_memory.rs b/crates/iceberg/src/io/opendal/memory.rs
similarity index 100%
rename from crates/iceberg/src/io/storage_memory.rs
rename to crates/iceberg/src/io/opendal/memory.rs
diff --git a/crates/iceberg/src/io/opendal/mod.rs b/crates/iceberg/src/io/opendal/mod.rs
new file mode 100644
index 000000000..fb49dc9e3
--- /dev/null
+++ b/crates/iceberg/src/io/opendal/mod.rs
@@ -0,0 +1,474 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! OpenDAL-based storage implementation.
+
+use std::ops::Range;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+#[cfg(feature = "storage-azdls")]
+use azdls::AzureStorageScheme;
+use bytes::Bytes;
+use opendal::layers::RetryLayer;
+#[cfg(feature = "storage-azdls")]
+use opendal::services::AzdlsConfig;
+#[cfg(feature = "storage-gcs")]
+use opendal::services::GcsConfig;
+#[cfg(feature = "storage-oss")]
+use opendal::services::OssConfig;
+#[cfg(feature = "storage-s3")]
+use opendal::services::S3Config;
+use opendal::{Operator, Scheme};
+#[cfg(feature = "storage-s3")]
+pub use s3::CustomAwsCredentialLoader;
+use serde::{Deserialize, Serialize};
+
+use super::{
+ FileIOBuilder, FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage,
+ StorageConfig, StorageFactory,
+};
+use crate::{Error, ErrorKind, Result};
+
+#[cfg(feature = "storage-azdls")]
+mod azdls;
+#[cfg(feature = "storage-fs")]
+mod fs;
+#[cfg(feature = "storage-gcs")]
+mod gcs;
+#[cfg(feature = "storage-memory")]
+mod memory;
+#[cfg(feature = "storage-oss")]
+mod oss;
+#[cfg(feature = "storage-s3")]
+mod s3;
+
+#[cfg(feature = "storage-azdls")]
+use azdls::*;
+#[cfg(feature = "storage-fs")]
+use fs::*;
+#[cfg(feature = "storage-gcs")]
+use gcs::*;
+#[cfg(feature = "storage-memory")]
+use memory::*;
+#[cfg(feature = "storage-oss")]
+use oss::*;
+#[cfg(feature = "storage-s3")]
+pub use s3::*;
+
+/// OpenDAL-based storage factory.
+///
+/// Maps scheme to the corresponding OpenDalStorage storage variant.
+///
+/// TODO this is currently not used, we still use OpenDalStorage::build() for now
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub enum OpenDalStorageFactory {
+ /// Memory storage factory.
+ #[cfg(feature = "storage-memory")]
+ Memory,
+ /// Local filesystem storage factory.
+ #[cfg(feature = "storage-fs")]
+ Fs,
+ /// S3 storage factory.
+ #[cfg(feature = "storage-s3")]
+ S3 {
+ /// Custom AWS credential loader.
+ #[serde(skip)]
+ customized_credential_load: Option<CustomAwsCredentialLoader>,
+ },
+ /// GCS storage factory.
+ #[cfg(feature = "storage-gcs")]
+ Gcs,
+ /// OSS storage factory.
+ #[cfg(feature = "storage-oss")]
+ Oss,
+ /// Azure Data Lake Storage factory.
+ #[cfg(feature = "storage-azdls")]
+ Azdls {
+ /// The configured Azure storage scheme.
+ configured_scheme: AzureStorageScheme,
+ },
+}
+
+#[typetag::serde(name = "OpenDalStorageFactory")]
+impl StorageFactory for OpenDalStorageFactory {
+ #[allow(unused_variables)]
+ fn build(&self, config: &StorageConfig) -> Result<Arc<dyn Storage>> {
+ match self {
+ #[cfg(feature = "storage-memory")]
+ OpenDalStorageFactory::Memory => {
+ Ok(Arc::new(OpenDalStorage::Memory(memory_config_build()?)))
+ }
+ #[cfg(feature = "storage-fs")]
+ OpenDalStorageFactory::Fs => Ok(Arc::new(OpenDalStorage::LocalFs)),
+ #[cfg(feature = "storage-s3")]
+ OpenDalStorageFactory::S3 {
+ customized_credential_load,
+ } => Ok(Arc::new(OpenDalStorage::S3 {
+ configured_scheme: "s3".to_string(),
+ config: s3_config_parse(config.props().clone())?.into(),
+ customized_credential_load: customized_credential_load.clone(),
+ })),
+ #[cfg(feature = "storage-gcs")]
+ OpenDalStorageFactory::Gcs => Ok(Arc::new(OpenDalStorage::Gcs {
+ config: gcs_config_parse(config.props().clone())?.into(),
+ })),
+ #[cfg(feature = "storage-oss")]
+ OpenDalStorageFactory::Oss => Ok(Arc::new(OpenDalStorage::Oss {
+ config: oss_config_parse(config.props().clone())?.into(),
+ })),
+ #[cfg(feature = "storage-azdls")]
+ OpenDalStorageFactory::Azdls { configured_scheme } => {
+ Ok(Arc::new(OpenDalStorage::Azdls {
+ configured_scheme: configured_scheme.clone(),
+ config: azdls_config_parse(config.props().clone())?.into(),
+ }))
+ }
+ #[cfg(all(
+ not(feature = "storage-memory"),
+ not(feature = "storage-fs"),
+ not(feature = "storage-s3"),
+ not(feature = "storage-gcs"),
+ not(feature = "storage-oss"),
+ not(feature = "storage-azdls"),
+ ))]
+ _ => Err(Error::new(
+ ErrorKind::FeatureUnsupported,
+ "No storage service has been enabled",
+ )),
+ }
+ }
+}
+
+/// Default memory operator for serde deserialization.
+#[cfg(feature = "storage-memory")]
+fn default_memory_operator() -> Operator {
+ memory_config_build().expect("Failed to create default memory operator")
+}
+
+/// OpenDAL-based storage implementation.
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub enum OpenDalStorage {
+ /// Memory storage variant.
+ #[cfg(feature = "storage-memory")]
+ Memory(#[serde(skip, default = "self::default_memory_operator")] Operator),
+ /// Local filesystem storage variant.
+ #[cfg(feature = "storage-fs")]
+ LocalFs,
+ /// S3 storage variant.
+ #[cfg(feature = "storage-s3")]
+ S3 {
+ /// s3 storage could have `s3://` and `s3a://`.
+ /// Storing the scheme string here to return the correct path.
+ configured_scheme: String,
+ /// S3 configuration.
+ config: Arc<S3Config>,
+ /// Custom AWS credential loader.
+ #[serde(skip)]
+ customized_credential_load: Option<CustomAwsCredentialLoader>,
+ },
+ /// GCS storage variant.
+ #[cfg(feature = "storage-gcs")]
+ Gcs {
+ /// GCS configuration.
+ config: Arc<GcsConfig>,
+ },
+ /// OSS storage variant.
+ #[cfg(feature = "storage-oss")]
+ Oss {
+ /// OSS configuration.
+ config: Arc<OssConfig>,
+ },
+ /// Azure Data Lake Storage variant.
+ /// Expects paths of the form
+ /// `abfs[s]://<filesystem>@<account>.dfs.<endpoint-suffix>/<path>` or
+ /// `wasb[s]://<container>@<account>.blob.<endpoint-suffix>/<path>`.
+ #[cfg(feature = "storage-azdls")]
+ #[allow(private_interfaces)]
+ Azdls {
+ /// The configured Azure storage scheme.
+ /// Because Azdls accepts multiple possible schemes, we store the full
+ /// passed scheme here to later validate schemes passed via paths.
+ configured_scheme: AzureStorageScheme,
+ /// Azure DLS configuration.
+ config: Arc<AzdlsConfig>,
+ },
+}
+
+impl OpenDalStorage {
+ /// Convert iceberg config to opendal config.
+ ///
+ /// TODO Switch to use OpenDalStorageFactory::build()
+ pub(crate) fn build(file_io_builder: FileIOBuilder) -> Result<Self> {
+ let (scheme_str, props, extensions) = file_io_builder.into_parts();
+ let _ = (&props, &extensions);
+ let scheme = Self::parse_scheme(&scheme_str)?;
+
+ match scheme {
+ #[cfg(feature = "storage-memory")]
+ Scheme::Memory => Ok(Self::Memory(memory_config_build()?)),
+ #[cfg(feature = "storage-fs")]
+ Scheme::Fs => Ok(Self::LocalFs),
+ #[cfg(feature = "storage-s3")]
+ Scheme::S3 => Ok(Self::S3 {
+ configured_scheme: scheme_str,
+ config: s3_config_parse(props)?.into(),
+ customized_credential_load: extensions
+ .get::<CustomAwsCredentialLoader>()
+ .map(Arc::unwrap_or_clone),
+ }),
+ #[cfg(feature = "storage-gcs")]
+ Scheme::Gcs => Ok(Self::Gcs {
+ config: gcs_config_parse(props)?.into(),
+ }),
+ #[cfg(feature = "storage-oss")]
+ Scheme::Oss => Ok(Self::Oss {
+ config: oss_config_parse(props)?.into(),
+ }),
+ #[cfg(feature = "storage-azdls")]
+ Scheme::Azdls => {
+ let scheme = scheme_str.parse::<AzureStorageScheme>()?;
+ Ok(Self::Azdls {
+ config: azdls_config_parse(props)?.into(),
+ configured_scheme: scheme,
+ })
+ }
+ // Update doc on [`FileIO`] when adding new schemes.
+ _ => Err(Error::new(
+ ErrorKind::FeatureUnsupported,
+ format!("Constructing file io from scheme: {scheme} not
supported now",),
+ )),
+ }
+ }
+
+ /// Creates operator from path.
+ ///
+ /// # Arguments
+ ///
+ /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
+ ///
+ /// # Returns
+ ///
+ /// The return value consists of two parts:
+ ///
+ /// * An [`opendal::Operator`] instance used to operate on file.
+ /// * Relative path to the root uri of [`opendal::Operator`].
+ #[allow(unreachable_code, unused_variables)]
+ pub(crate) fn create_operator<'a>(
+ &self,
+ path: &'a impl AsRef<str>,
+ ) -> Result<(Operator, &'a str)> {
+ let path = path.as_ref();
+ let (operator, relative_path): (Operator, &str) = match self {
+ #[cfg(feature = "storage-memory")]
+ OpenDalStorage::Memory(op) => {
+ if let Some(stripped) = path.strip_prefix("memory:/") {
+ (op.clone(), stripped)
+ } else {
+ (op.clone(), &path[1..])
+ }
+ }
+ #[cfg(feature = "storage-fs")]
+ OpenDalStorage::LocalFs => {
+ let op = fs_config_build()?;
+ if let Some(stripped) = path.strip_prefix("file:/") {
+ (op, stripped)
+ } else {
+ (op, &path[1..])
+ }
+ }
+ #[cfg(feature = "storage-s3")]
+ OpenDalStorage::S3 {
+ configured_scheme,
+ config,
+ customized_credential_load,
+ } => {
+ let op = s3_config_build(config, customized_credential_load, path)?;
+ let op_info = op.info();
+
+ // Check prefix of s3 path.
+ let prefix = format!("{}://{}/", configured_scheme, op_info.name());
+ if path.starts_with(&prefix) {
+ (op, &path[prefix.len()..])
+ } else {
+ return Err(Error::new(
+ ErrorKind::DataInvalid,
+ format!("Invalid s3 url: {path}, should start with
{prefix}"),
+ ));
+ }
+ }
+ #[cfg(feature = "storage-gcs")]
+ OpenDalStorage::Gcs { config } => {
+ let operator = gcs_config_build(config, path)?;
+ let prefix = format!("gs://{}/", operator.info().name());
+ if path.starts_with(&prefix) {
+ (operator, &path[prefix.len()..])
+ } else {
+ return Err(Error::new(
+ ErrorKind::DataInvalid,
+ format!("Invalid gcs url: {path}, should start with
{prefix}"),
+ ));
+ }
+ }
+ #[cfg(feature = "storage-oss")]
+ OpenDalStorage::Oss { config } => {
+ let op = oss_config_build(config, path)?;
+ let prefix = format!("oss://{}/", op.info().name());
+ if path.starts_with(&prefix) {
+ (op, &path[prefix.len()..])
+ } else {
+ return Err(Error::new(
+ ErrorKind::DataInvalid,
+ format!("Invalid oss url: {path}, should start with
{prefix}"),
+ ));
+ }
+ }
+ #[cfg(feature = "storage-azdls")]
+ OpenDalStorage::Azdls {
+ configured_scheme,
+ config,
+ } => azdls_create_operator(path, config, configured_scheme)?,
+ #[cfg(all(
+ not(feature = "storage-s3"),
+ not(feature = "storage-fs"),
+ not(feature = "storage-gcs"),
+ not(feature = "storage-oss"),
+ not(feature = "storage-azdls"),
+ ))]
+ _ => {
+ return Err(Error::new(
+ ErrorKind::FeatureUnsupported,
+ "No storage service has been enabled",
+ ));
+ }
+ };
+
+ // Transient errors are common for object stores; however there's no
+ // harm in retrying temporary failures for other storage backends as well.
+ let operator = operator.layer(RetryLayer::new());
+ Ok((operator, relative_path))
+ }
+
+ /// Parse scheme.
+ fn parse_scheme(scheme: &str) -> Result<Scheme> {
+ match scheme {
+ "memory" => Ok(Scheme::Memory),
+ "file" | "" => Ok(Scheme::Fs),
+ "s3" | "s3a" => Ok(Scheme::S3),
+ "gs" | "gcs" => Ok(Scheme::Gcs),
+ "oss" => Ok(Scheme::Oss),
+ "abfss" | "abfs" | "wasbs" | "wasb" => Ok(Scheme::Azdls),
+ s => Ok(s.parse::<Scheme>()?),
+ }
+ }
+}
+
+#[typetag::serde(name = "OpenDalStorage")]
+#[async_trait]
+impl Storage for OpenDalStorage {
+ async fn exists(&self, path: &str) -> Result<bool> {
+ let (op, relative_path) = self.create_operator(&path)?;
+ Ok(op.exists(relative_path).await?)
+ }
+
+ async fn metadata(&self, path: &str) -> Result<FileMetadata> {
+ let (op, relative_path) = self.create_operator(&path)?;
+ let meta = op.stat(relative_path).await?;
+ Ok(FileMetadata {
+ size: meta.content_length(),
+ })
+ }
+
+ async fn read(&self, path: &str) -> Result<Bytes> {
+ let (op, relative_path) = self.create_operator(&path)?;
+ Ok(op.read(relative_path).await?.to_bytes())
+ }
+
+ async fn reader(&self, path: &str) -> Result<Box<dyn FileRead>> {
+ let (op, relative_path) = self.create_operator(&path)?;
+ Ok(Box::new(op.reader(relative_path).await?))
+ }
+
+ async fn write(&self, path: &str, bs: Bytes) -> Result<()> {
+ let (op, relative_path) = self.create_operator(&path)?;
+ op.write(relative_path, bs).await?;
+ Ok(())
+ }
+
+ async fn writer(&self, path: &str) -> Result<Box<dyn FileWrite>> {
+ let (op, relative_path) = self.create_operator(&path)?;
+ Ok(Box::new(op.writer(relative_path).await?))
+ }
+
+ async fn delete(&self, path: &str) -> Result<()> {
+ let (op, relative_path) = self.create_operator(&path)?;
+ Ok(op.delete(relative_path).await?)
+ }
+
+ async fn delete_prefix(&self, path: &str) -> Result<()> {
+ let (op, relative_path) = self.create_operator(&path)?;
+ let path = if relative_path.ends_with('/') {
+ relative_path.to_string()
+ } else {
+ format!("{relative_path}/")
+ };
+ Ok(op.remove_all(&path).await?)
+ }
+
+ #[allow(unreachable_code, unused_variables)]
+ fn new_input(&self, path: &str) -> Result<InputFile> {
+ Ok(InputFile::new(Arc::new(self.clone()), path.to_string()))
+ }
+
+ #[allow(unreachable_code, unused_variables)]
+ fn new_output(&self, path: &str) -> Result<OutputFile> {
+ Ok(OutputFile::new(Arc::new(self.clone()), path.to_string()))
+ }
+}
+
+// OpenDAL implementations for FileRead and FileWrite traits
+
+#[async_trait]
+impl FileRead for opendal::Reader {
+ async fn read(&self, range: Range<u64>) -> Result<Bytes> {
+ Ok(opendal::Reader::read(self, range).await?.to_bytes())
+ }
+}
+
+#[async_trait]
+impl FileWrite for opendal::Writer {
+ async fn write(&mut self, bs: Bytes) -> Result<()> {
+ Ok(opendal::Writer::write(self, bs).await?)
+ }
+
+ async fn close(&mut self) -> Result<()> {
+ let _ = opendal::Writer::close(self).await?;
+ Ok(())
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[cfg(feature = "storage-memory")]
+ #[test]
+ fn test_default_memory_operator() {
+ let op = default_memory_operator();
+ assert_eq!(op.info().scheme().to_string(), "memory");
+ }
+}
diff --git a/crates/iceberg/src/io/storage_oss.rs b/crates/iceberg/src/io/opendal/oss.rs
similarity index 100%
rename from crates/iceberg/src/io/storage_oss.rs
rename to crates/iceberg/src/io/opendal/oss.rs
diff --git a/crates/iceberg/src/io/storage_s3.rs b/crates/iceberg/src/io/opendal/s3.rs
similarity index 100%
rename from crates/iceberg/src/io/storage_s3.rs
rename to crates/iceberg/src/io/opendal/s3.rs
diff --git a/crates/iceberg/src/io/storage.rs b/crates/iceberg/src/io/storage.rs
index 77978e223..15cc85ab1 100644
--- a/crates/iceberg/src/io/storage.rs
+++ b/crates/iceberg/src/io/storage.rs
@@ -15,30 +15,17 @@
// specific language governing permissions and limitations
// under the License.
+//! Storage interfaces for Iceberg.
+
use std::fmt::Debug;
use std::sync::Arc;
use async_trait::async_trait;
use bytes::Bytes;
-use opendal::layers::RetryLayer;
-#[cfg(feature = "storage-azdls")]
-use opendal::services::AzdlsConfig;
-#[cfg(feature = "storage-gcs")]
-use opendal::services::GcsConfig;
-#[cfg(feature = "storage-oss")]
-use opendal::services::OssConfig;
-#[cfg(feature = "storage-s3")]
-use opendal::services::S3Config;
-use opendal::{Operator, Scheme};
-#[cfg(feature = "storage-azdls")]
-use super::AzureStorageScheme;
-use super::{
- FileIOBuilder, FileMetadata, FileRead, FileWrite, InputFile, OutputFile, StorageConfig,
-};
-#[cfg(feature = "storage-s3")]
-use crate::io::CustomAwsCredentialLoader;
-use crate::{Error, ErrorKind, Result};
+use super::{FileMetadata, FileRead, FileWrite, InputFile, OutputFile};
+use crate::Result;
+pub use crate::io::config::StorageConfig;
/// Trait for storage operations in Iceberg.
///
@@ -153,203 +140,3 @@ pub trait StorageFactory: Debug + Send + Sync {
/// if the storage could not be created.
fn build(&self, config: &StorageConfig) -> Result<Arc<dyn Storage>>;
}
-
-/// The storage carries all supported storage services in iceberg
-#[derive(Debug)]
-pub(crate) enum OpenDalStorage {
- #[cfg(feature = "storage-memory")]
- Memory(Operator),
- #[cfg(feature = "storage-fs")]
- LocalFs,
- /// Expects paths of the form `s3[a]://<bucket>/<path>`.
- #[cfg(feature = "storage-s3")]
- S3 {
- /// s3 storage could have `s3://` and `s3a://`.
- /// Storing the scheme string here to return the correct path.
- configured_scheme: String,
- config: Arc<S3Config>,
- customized_credential_load: Option<CustomAwsCredentialLoader>,
- },
- #[cfg(feature = "storage-gcs")]
- Gcs { config: Arc<GcsConfig> },
- #[cfg(feature = "storage-oss")]
- Oss { config: Arc<OssConfig> },
- /// Expects paths of the form
- /// `abfs[s]://<filesystem>@<account>.dfs.<endpoint-suffix>/<path>` or
- /// `wasb[s]://<container>@<account>.blob.<endpoint-suffix>/<path>`.
- #[cfg(feature = "storage-azdls")]
- Azdls {
- /// Because Azdls accepts multiple possible schemes, we store the full
- /// passed scheme here to later validate schemes passed via paths.
- configured_scheme: AzureStorageScheme,
- config: Arc<AzdlsConfig>,
- },
-}
-
-impl OpenDalStorage {
- /// Convert iceberg config to opendal config.
- pub(crate) fn build(file_io_builder: FileIOBuilder) -> crate::Result<Self> {
- let (scheme_str, props, extensions) = file_io_builder.into_parts();
- let _ = (&props, &extensions);
- let scheme = Self::parse_scheme(&scheme_str)?;
-
- match scheme {
- #[cfg(feature = "storage-memory")]
- Scheme::Memory => Ok(Self::Memory(super::memory_config_build()?)),
- #[cfg(feature = "storage-fs")]
- Scheme::Fs => Ok(Self::LocalFs),
- #[cfg(feature = "storage-s3")]
- Scheme::S3 => Ok(Self::S3 {
- configured_scheme: scheme_str,
- config: super::s3_config_parse(props)?.into(),
- customized_credential_load: extensions
- .get::<CustomAwsCredentialLoader>()
- .map(Arc::unwrap_or_clone),
- }),
- #[cfg(feature = "storage-gcs")]
- Scheme::Gcs => Ok(Self::Gcs {
- config: super::gcs_config_parse(props)?.into(),
- }),
- #[cfg(feature = "storage-oss")]
- Scheme::Oss => Ok(Self::Oss {
- config: super::oss_config_parse(props)?.into(),
- }),
- #[cfg(feature = "storage-azdls")]
- Scheme::Azdls => {
- let scheme = scheme_str.parse::<AzureStorageScheme>()?;
- Ok(Self::Azdls {
- config: super::azdls_config_parse(props)?.into(),
- configured_scheme: scheme,
- })
- }
- // Update doc on [`FileIO`] when adding new schemes.
- _ => Err(Error::new(
- ErrorKind::FeatureUnsupported,
- format!("Constructing file io from scheme: {scheme} not
supported now",),
- )),
- }
- }
-
- /// Creates operator from path.
- ///
- /// # Arguments
- ///
- /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
- ///
- /// # Returns
- ///
- /// The return value consists of two parts:
- ///
- /// * An [`opendal::Operator`] instance used to operate on file.
- /// * Relative path to the root uri of [`opendal::Operator`].
- pub(crate) fn create_operator<'a>(
- &self,
- path: &'a impl AsRef<str>,
- ) -> crate::Result<(Operator, &'a str)> {
- let path = path.as_ref();
- let _ = path;
- let (operator, relative_path): (Operator, &str) = match self {
- #[cfg(feature = "storage-memory")]
- OpenDalStorage::Memory(op) => {
- if let Some(stripped) = path.strip_prefix("memory:/") {
- Ok::<_, crate::Error>((op.clone(), stripped))
- } else {
- Ok::<_, crate::Error>((op.clone(), &path[1..]))
- }
- }
- #[cfg(feature = "storage-fs")]
- OpenDalStorage::LocalFs => {
- let op = super::fs_config_build()?;
-
- if let Some(stripped) = path.strip_prefix("file:/") {
- Ok::<_, crate::Error>((op, stripped))
- } else {
- Ok::<_, crate::Error>((op, &path[1..]))
- }
- }
- #[cfg(feature = "storage-s3")]
- OpenDalStorage::S3 {
- configured_scheme,
- config,
- customized_credential_load,
- } => {
- let op = super::s3_config_build(config, customized_credential_load, path)?;
- let op_info = op.info();
-
- // Check prefix of s3 path.
- let prefix = format!("{}://{}/", configured_scheme, op_info.name());
- if path.starts_with(&prefix) {
- Ok((op, &path[prefix.len()..]))
- } else {
- Err(Error::new(
- ErrorKind::DataInvalid,
- format!("Invalid s3 url: {path}, should start with
{prefix}"),
- ))
- }
- }
- #[cfg(feature = "storage-gcs")]
- OpenDalStorage::Gcs { config } => {
- let operator = super::gcs_config_build(config, path)?;
- let prefix = format!("gs://{}/", operator.info().name());
- if path.starts_with(&prefix) {
- Ok((operator, &path[prefix.len()..]))
- } else {
- Err(Error::new(
- ErrorKind::DataInvalid,
- format!("Invalid gcs url: {path}, should start with
{prefix}"),
- ))
- }
- }
- #[cfg(feature = "storage-oss")]
- OpenDalStorage::Oss { config } => {
- let op = super::oss_config_build(config, path)?;
-
- // Check prefix of oss path.
- let prefix = format!("oss://{}/", op.info().name());
- if path.starts_with(&prefix) {
- Ok((op, &path[prefix.len()..]))
- } else {
- Err(Error::new(
- ErrorKind::DataInvalid,
- format!("Invalid oss url: {path}, should start with
{prefix}"),
- ))
- }
- }
- #[cfg(feature = "storage-azdls")]
- OpenDalStorage::Azdls {
- configured_scheme,
- config,
- } => super::azdls_create_operator(path, config, configured_scheme),
- #[cfg(all(
- not(feature = "storage-s3"),
- not(feature = "storage-fs"),
- not(feature = "storage-gcs"),
- not(feature = "storage-oss"),
- not(feature = "storage-azdls"),
- ))]
- _ => Err(Error::new(
- ErrorKind::FeatureUnsupported,
- "No storage service has been enabled",
- )),
- }?;
-
- // Transient errors are common for object stores; however there's no
- // harm in retrying temporary failures for other storage backends as well.
- let operator = operator.layer(RetryLayer::new());
-
- Ok((operator, relative_path))
- }
-
- /// Parse scheme.
- fn parse_scheme(scheme: &str) -> crate::Result<Scheme> {
- match scheme {
- "memory" => Ok(Scheme::Memory),
- "file" | "" => Ok(Scheme::Fs),
- "s3" | "s3a" => Ok(Scheme::S3),
- "gs" | "gcs" => Ok(Scheme::Gcs),
- "oss" => Ok(Scheme::Oss),
- "abfss" | "abfs" | "wasbs" | "wasb" => Ok(Scheme::Azdls),
- s => Ok(s.parse::<Scheme>()?),
- }
- }
-}
diff --git a/crates/iceberg/src/puffin/metadata.rs b/crates/iceberg/src/puffin/metadata.rs
index b09f2c7c1..3794eec45 100644
--- a/crates/iceberg/src/puffin/metadata.rs
+++ b/crates/iceberg/src/puffin/metadata.rs
@@ -285,10 +285,13 @@ impl FileMetadata {
let input_file_length = input_file.metadata().await?.size;
let footer_payload_length =
- FileMetadata::read_footer_payload_length(&file_read, input_file_length).await?;
- let footer_bytes =
- FileMetadata::read_footer_bytes(&file_read, input_file_length, footer_payload_length)
- .await?;
+ FileMetadata::read_footer_payload_length(file_read.as_ref(), input_file_length).await?;
+ let footer_bytes = FileMetadata::read_footer_bytes(
+ file_read.as_ref(),
+ input_file_length,
+ footer_payload_length,
+ )
+ .await?;
let magic_length = FileMetadata::MAGIC_LENGTH as usize;
// check first four bytes of footer
diff --git a/crates/iceberg/src/puffin/reader.rs b/crates/iceberg/src/puffin/reader.rs
index a6308dceb..d272f02d4 100644
--- a/crates/iceberg/src/puffin/reader.rs
+++ b/crates/iceberg/src/puffin/reader.rs
@@ -19,7 +19,7 @@ use tokio::sync::OnceCell;
use super::validate_puffin_compression;
use crate::Result;
-use crate::io::{FileRead, InputFile};
+use crate::io::InputFile;
use crate::puffin::blob::Blob;
use crate::puffin::metadata::{BlobMetadata, FileMetadata};
diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs
index a75c74b3b..da04d5435 100644
--- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs
+++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs
@@ -212,7 +212,7 @@ impl SchemaVisitor for IndexByParquetPathName {
pub struct ParquetWriter {
schema: SchemaRef,
output_file: OutputFile,
- inner_writer: Option<AsyncArrowWriter<AsyncFileWriter<Box<dyn FileWrite>>>>,
+ inner_writer: Option<AsyncArrowWriter<AsyncFileWriter>>,
writer_properties: WriterProperties,
current_row_num: usize,
nan_value_count_visitor: NanValueCountVisitor,
@@ -577,16 +577,16 @@ impl CurrentFileStatus for ParquetWriter {
/// # NOTES
///
/// We keep this wrapper been used inside only.
-struct AsyncFileWriter<W: FileWrite>(W);
+struct AsyncFileWriter(Box<dyn FileWrite>);
-impl<W: FileWrite> AsyncFileWriter<W> {
+impl AsyncFileWriter {
/// Create a new `AsyncFileWriter` with the given writer.
- pub fn new(writer: W) -> Self {
+ pub fn new(writer: Box<dyn FileWrite>) -> Self {
Self(writer)
}
}
-impl<W: FileWrite> ArrowAsyncFileWriter for AsyncFileWriter<W> {
+impl ArrowAsyncFileWriter for AsyncFileWriter {
fn write(&mut self, bs: Bytes) -> BoxFuture<'_, parquet::errors::Result<()>> {
Box::pin(async {
self.0