This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 8562708 refactor(io): Split io into smaller mods (#438)
8562708 is described below
commit 85627083f0791629be3d73413af936165ebd38eb
Author: Xuanwo <[email protected]>
AuthorDate: Fri Jul 5 21:27:01 2024 +0800
refactor(io): Split io into smaller mods (#438)
* refactor(io): Split io into smaller mods
Signed-off-by: Xuanwo <[email protected]>
* Fix test
Signed-off-by: Xuanwo <[email protected]>
* Format
Signed-off-by: Xuanwo <[email protected]>
* Fix cap
Signed-off-by: Xuanwo <[email protected]>
* Remove not used deps
Signed-off-by: Xuanwo <[email protected]>
---------
Signed-off-by: Xuanwo <[email protected]>
---
Cargo.toml | 10 +-
crates/iceberg/Cargo.toml | 3 -
crates/iceberg/src/{io.rs => io/file_io.rs} | 308 +++++++---------------------
crates/iceberg/src/io/mod.rs | 58 ++++++
crates/iceberg/src/io/storage.rs | 122 +++++++++++
crates/iceberg/src/io/storage_fs.rs | 49 +++++
crates/iceberg/src/io/storage_s3.rs | 100 +++++++++
7 files changed, 413 insertions(+), 237 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index ce4d300..e51d24f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -18,11 +18,11 @@
[workspace]
resolver = "2"
members = [
- "crates/catalog/*",
- "crates/examples",
- "crates/iceberg",
- "crates/integrations/*",
- "crates/test_utils",
+ "crates/catalog/*",
+ "crates/examples",
+ "crates/iceberg",
+ "crates/integrations/*",
+ "crates/test_utils",
]
[workspace.package]
diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml
index c43f54f..b610680 100644
--- a/crates/iceberg/Cargo.toml
+++ b/crates/iceberg/Cargo.toml
@@ -56,12 +56,10 @@ bitvec = { workspace = true }
bytes = { workspace = true }
chrono = { workspace = true }
derive_builder = { workspace = true }
-either = { workspace = true }
fnv = { workspace = true }
futures = { workspace = true }
itertools = { workspace = true }
lazy_static = { workspace = true }
-log = { workspace = true }
murmur3 = { workspace = true }
once_cell = { workspace = true }
opendal = { workspace = true }
@@ -78,7 +76,6 @@ serde_with = { workspace = true }
tokio = { workspace = true, optional = true }
typed-builder = { workspace = true }
url = { workspace = true }
-urlencoding = { workspace = true }
uuid = { workspace = true }
[dev-dependencies]
diff --git a/crates/iceberg/src/io.rs b/crates/iceberg/src/io/file_io.rs
similarity index 66%
rename from crates/iceberg/src/io.rs
rename to crates/iceberg/src/io/file_io.rs
index c045b22..54b2cd4 100644
--- a/crates/iceberg/src/io.rs
+++ b/crates/iceberg/src/io/file_io.rs
@@ -15,71 +15,15 @@
// specific language governing permissions and limitations
// under the License.
-//! File io implementation.
-//!
-//! # How to build `FileIO`
-//!
-//! We provided a `FileIOBuilder` to build `FileIO` from scratch. For example:
-//! ```rust
-//! use iceberg::io::{FileIOBuilder, S3_REGION};
-//!
-//! let file_io = FileIOBuilder::new("s3")
-//! .with_prop(S3_REGION, "us-east-1")
-//! .build()
-//! .unwrap();
-//! ```
-//!
-//! Or you can pass a path to ask `FileIO` to infer schema for you:
-//! ```rust
-//! use iceberg::io::{FileIO, S3_REGION};
-//! let file_io = FileIO::from_path("s3://bucket/a")
-//! .unwrap()
-//! .with_prop(S3_REGION, "us-east-1")
-//! .build()
-//! .unwrap();
-//! ```
-//!
-//! # How to use `FileIO`
-//!
-//! Currently `FileIO` provides simple methods for file operations:
-//!
-//! - `delete`: Delete file.
-//! - `is_exist`: Check if file exists.
-//! - `new_input`: Create input file for reading.
-//! - `new_output`: Create output file for writing.
-
+use super::storage::Storage;
+use crate::{Error, ErrorKind, Result};
use bytes::Bytes;
+use opendal::Operator;
+use std::collections::HashMap;
use std::ops::Range;
-use std::{collections::HashMap, sync::Arc};
-
-use crate::{error::Result, Error, ErrorKind};
-use once_cell::sync::Lazy;
-use opendal::{Operator, Scheme};
+use std::sync::Arc;
use url::Url;
-/// Following are arguments for [s3 file
io](https://py.iceberg.apache.org/configuration/#s3).
-/// S3 endopint.
-pub const S3_ENDPOINT: &str = "s3.endpoint";
-/// S3 access key id.
-pub const S3_ACCESS_KEY_ID: &str = "s3.access-key-id";
-/// S3 secret access key.
-pub const S3_SECRET_ACCESS_KEY: &str = "s3.secret-access-key";
-/// S3 region.
-pub const S3_REGION: &str = "s3.region";
-
-/// A mapping from iceberg s3 configuration key to [`opendal::Operator`]
configuration key.
-static S3_CONFIG_MAPPING: Lazy<HashMap<&'static str, &'static str>> =
Lazy::new(|| {
- let mut m = HashMap::with_capacity(4);
- m.insert(S3_ENDPOINT, "endpoint");
- m.insert(S3_ACCESS_KEY_ID, "access_key_id");
- m.insert(S3_SECRET_ACCESS_KEY, "secret_access_key");
- m.insert(S3_REGION, "region");
-
- m
-});
-
-const DEFAULT_ROOT_PATH: &str = "/";
-
/// FileIO implementation, used to manipulate files in underlying storage.
///
/// # Note
@@ -91,59 +35,6 @@ pub struct FileIO {
inner: Arc<Storage>,
}
-/// Builder for [`FileIO`].
-#[derive(Debug)]
-pub struct FileIOBuilder {
- /// This is used to infer scheme of operator.
- ///
- /// If this is `None`, then [`FileIOBuilder::build`](FileIOBuilder::build)
will build a local file io.
- scheme_str: Option<String>,
- /// Arguments for operator.
- props: HashMap<String, String>,
-}
-
-impl FileIOBuilder {
- /// Creates a new builder with scheme.
- pub fn new(scheme_str: impl ToString) -> Self {
- Self {
- scheme_str: Some(scheme_str.to_string()),
- props: HashMap::default(),
- }
- }
-
- /// Creates a new builder for local file io.
- pub fn new_fs_io() -> Self {
- Self {
- scheme_str: None,
- props: HashMap::default(),
- }
- }
-
- /// Add argument for operator.
- pub fn with_prop(mut self, key: impl ToString, value: impl ToString) ->
Self {
- self.props.insert(key.to_string(), value.to_string());
- self
- }
-
- /// Add argument for operator.
- pub fn with_props(
- mut self,
- args: impl IntoIterator<Item = (impl ToString, impl ToString)>,
- ) -> Self {
- self.props
- .extend(args.into_iter().map(|e| (e.0.to_string(),
e.1.to_string())));
- self
- }
-
- /// Builds [`FileIO`].
- pub fn build(self) -> Result<FileIO> {
- let storage = Storage::build(self)?;
- Ok(FileIO {
- inner: Arc::new(storage),
- })
- }
-}
-
impl FileIO {
/// Try to infer file io scheme from path.
///
@@ -151,7 +42,7 @@ impl FileIO {
/// If it's not a valid url, will try to detect if it's a file path.
///
/// Otherwise will return parsing error.
- pub fn from_path(path: impl AsRef<str>) -> Result<FileIOBuilder> {
+ pub fn from_path(path: impl AsRef<str>) -> crate::Result<FileIOBuilder> {
let url = Url::parse(path.as_ref())
.map_err(Error::from)
.or_else(|e| {
@@ -205,6 +96,66 @@ impl FileIO {
}
}
+/// Builder for [`FileIO`].
+///
+/// Collects an optional scheme string and a map of string properties that
+/// [`FileIOBuilder::build`] passes on to the storage layer.
+#[derive(Debug)]
+pub struct FileIOBuilder {
+    /// This is used to infer scheme of operator.
+    ///
+    /// If this is `None`, then [`FileIOBuilder::build`](FileIOBuilder::build) will build a local file io.
+    scheme_str: Option<String>,
+    /// Arguments for operator.
+    props: HashMap<String, String>,
+}
+
+impl FileIOBuilder {
+    /// Starts a builder for the storage identified by `scheme_str`.
+    pub fn new(scheme_str: impl ToString) -> Self {
+        let scheme_str = Some(scheme_str.to_string());
+        Self {
+            scheme_str,
+            props: HashMap::default(),
+        }
+    }
+
+    /// Starts a builder without a scheme, which builds a local file io.
+    pub fn new_fs_io() -> Self {
+        Self {
+            props: HashMap::default(),
+            scheme_str: None,
+        }
+    }
+
+    /// Consumes the builder and returns its scheme string and properties.
+    ///
+    /// The scheme string is empty when no scheme was set.
+    pub(crate) fn into_parts(self) -> (String, HashMap<String, String>) {
+        let Self { scheme_str, props } = self;
+        (scheme_str.unwrap_or_default(), props)
+    }
+
+    /// Sets a single property; a value already stored under `key` is replaced.
+    pub fn with_prop(mut self, key: impl ToString, value: impl ToString) -> Self {
+        let (key, value) = (key.to_string(), value.to_string());
+        self.props.insert(key, value);
+        self
+    }
+
+    /// Sets every property yielded by `args`, replacing values of existing keys.
+    pub fn with_props(
+        mut self,
+        args: impl IntoIterator<Item = (impl ToString, impl ToString)>,
+    ) -> Self {
+        for (key, value) in args {
+            self.props.insert(key.to_string(), value.to_string());
+        }
+        self
+    }
+
+    /// Builds [`FileIO`].
+    ///
+    /// Fails with whatever error the storage construction reports.
+    pub fn build(self) -> crate::Result<FileIO> {
+        Storage::build(self).map(|storage| FileIO {
+            inner: Arc::new(storage),
+        })
+    }
+}
+
/// The struct that represents the metadata of a file.
///
/// TODO: we can add last modified time, content type, etc. in the future.
@@ -224,12 +175,12 @@ pub trait FileRead: Send + Unpin + 'static {
/// Read file content with given range.
///
/// TODO: we can support reading non-contiguous bytes in the future.
- async fn read(&self, range: Range<u64>) -> Result<Bytes>;
+ async fn read(&self, range: Range<u64>) -> crate::Result<Bytes>;
}
#[async_trait::async_trait]
impl FileRead for opendal::Reader {
- async fn read(&self, range: Range<u64>) -> Result<Bytes> {
+ async fn read(&self, range: Range<u64>) -> crate::Result<Bytes> {
Ok(opendal::Reader::read(self, range).await?.to_bytes())
}
}
@@ -251,7 +202,7 @@ impl InputFile {
}
/// Check if file exists.
- pub async fn exists(&self) -> Result<bool> {
+ pub async fn exists(&self) -> crate::Result<bool> {
Ok(self
.op
.is_exist(&self.path[self.relative_path_pos..])
@@ -259,7 +210,7 @@ impl InputFile {
}
/// Fetch and returns metadata of file.
- pub async fn metadata(&self) -> Result<FileMetadata> {
+ pub async fn metadata(&self) -> crate::Result<FileMetadata> {
let meta = self.op.stat(&self.path[self.relative_path_pos..]).await?;
Ok(FileMetadata {
@@ -270,7 +221,7 @@ impl InputFile {
/// Read and returns whole content of file.
///
/// For continues reading, use [`Self::reader`] instead.
- pub async fn read(&self) -> Result<Bytes> {
+ pub async fn read(&self) -> crate::Result<Bytes> {
Ok(self
.op
.read(&self.path[self.relative_path_pos..])
@@ -281,7 +232,7 @@ impl InputFile {
/// Creates [`FileRead`] for continues reading.
///
/// For one-time reading, use [`Self::read`] instead.
- pub async fn reader(&self) -> Result<impl FileRead> {
+ pub async fn reader(&self) -> crate::Result<impl FileRead> {
Ok(self.op.reader(&self.path[self.relative_path_pos..]).await?)
}
}
@@ -297,21 +248,21 @@ pub trait FileWrite: Send + Unpin + 'static {
/// Write bytes to file.
///
/// TODO: we can support writing non-contiguous bytes in the future.
- async fn write(&mut self, bs: Bytes) -> Result<()>;
+ async fn write(&mut self, bs: Bytes) -> crate::Result<()>;
/// Close file.
///
/// Calling close on closed file will generate an error.
- async fn close(&mut self) -> Result<()>;
+ async fn close(&mut self) -> crate::Result<()>;
}
#[async_trait::async_trait]
impl FileWrite for opendal::Writer {
- async fn write(&mut self, bs: Bytes) -> Result<()> {
+ async fn write(&mut self, bs: Bytes) -> crate::Result<()> {
Ok(opendal::Writer::write(self, bs).await?)
}
- async fn close(&mut self) -> Result<()> {
+ async fn close(&mut self) -> crate::Result<()> {
Ok(opendal::Writer::close(self).await?)
}
}
@@ -333,7 +284,7 @@ impl OutputFile {
}
/// Checks if file exists.
- pub async fn exists(&self) -> Result<bool> {
+ pub async fn exists(&self) -> crate::Result<bool> {
Ok(self
.op
.is_exist(&self.path[self.relative_path_pos..])
@@ -355,7 +306,7 @@ impl OutputFile {
///
/// Calling `write` will overwrite the file if it exists.
/// For continues writing, use [`Self::writer`].
- pub async fn write(&self, bs: Bytes) -> Result<()> {
+ pub async fn write(&self, bs: Bytes) -> crate::Result<()> {
let mut writer = self.writer().await?;
writer.write(bs).await?;
writer.close().await
@@ -366,114 +317,13 @@ impl OutputFile {
/// # Notes
///
/// For one-time writing, use [`Self::write`] instead.
- pub async fn writer(&self) -> Result<Box<dyn FileWrite>> {
+ pub async fn writer(&self) -> crate::Result<Box<dyn FileWrite>> {
Ok(Box::new(
self.op.writer(&self.path[self.relative_path_pos..]).await?,
))
}
}
-// We introduce this because I don't want to handle unsupported `Scheme` in
every method.
-#[derive(Debug)]
-enum Storage {
- LocalFs {
- op: Operator,
- },
- S3 {
- scheme_str: String,
- props: HashMap<String, String>,
- },
-}
-
-impl Storage {
- /// Creates operator from path.
- ///
- /// # Arguments
- ///
- /// * path: It should be *absolute* path starting with scheme string used
to construct [`FileIO`].
- ///
- /// # Returns
- ///
- /// The return value consists of two parts:
- ///
- /// * An [`opendal::Operator`] instance used to operate on file.
- /// * Relative path to the root uri of [`opendal::Operator`].
- ///
- fn create_operator<'a>(&self, path: &'a impl AsRef<str>) ->
Result<(Operator, &'a str)> {
- let path = path.as_ref();
- match self {
- Storage::LocalFs { op } => {
- if let Some(stripped) = path.strip_prefix("file:/") {
- Ok((op.clone(), stripped))
- } else {
- Ok((op.clone(), &path[1..]))
- }
- }
- Storage::S3 { scheme_str, props } => {
- let mut props = props.clone();
- let url = Url::parse(path)?;
- let bucket = url.host_str().ok_or_else(|| {
- Error::new(
- ErrorKind::DataInvalid,
- format!("Invalid s3 url: {}, missing bucket", path),
- )
- })?;
-
- props.insert("bucket".to_string(), bucket.to_string());
-
- let prefix = format!("{}://{}/", scheme_str, bucket);
- if path.starts_with(&prefix) {
- Ok((Operator::via_map(Scheme::S3, props)?,
&path[prefix.len()..]))
- } else {
- Err(Error::new(
- ErrorKind::DataInvalid,
- format!("Invalid s3 url: {}, should start with {}",
path, prefix),
- ))
- }
- }
- }
- }
-
- /// Parse scheme.
- fn parse_scheme(scheme: &str) -> Result<Scheme> {
- match scheme {
- "file" | "" => Ok(Scheme::Fs),
- "s3" | "s3a" => Ok(Scheme::S3),
- s => Ok(s.parse::<Scheme>()?),
- }
- }
-
- /// Convert iceberg config to opendal config.
- fn build(file_io_builder: FileIOBuilder) -> Result<Self> {
- let scheme_str = file_io_builder.scheme_str.unwrap_or("".to_string());
- let scheme = Self::parse_scheme(&scheme_str)?;
- let mut new_props = HashMap::default();
- new_props.insert("root".to_string(), DEFAULT_ROOT_PATH.to_string());
-
- match scheme {
- Scheme::Fs => Ok(Self::LocalFs {
- op: Operator::via_map(Scheme::Fs, new_props)?,
- }),
- Scheme::S3 => {
- for prop in file_io_builder.props {
- if let Some(op_key) =
S3_CONFIG_MAPPING.get(prop.0.as_str()) {
- new_props.insert(op_key.to_string(), prop.1);
- }
- }
-
- Ok(Self::S3 {
- scheme_str,
- props: new_props,
- })
- }
- _ => Err(Error::new(
- ErrorKind::FeatureUnsupported,
- format!("Constructing file io from scheme: {scheme} not
supported now",),
- )),
- }
- }
-}
-
#[cfg(test)]
mod tests {
use std::io::Write;
diff --git a/crates/iceberg/src/io/mod.rs b/crates/iceberg/src/io/mod.rs
new file mode 100644
index 0000000..6da605d
--- /dev/null
+++ b/crates/iceberg/src/io/mod.rs
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! File io implementation.
+//!
+//! # How to build `FileIO`
+//!
+//! We provide a `FileIOBuilder` to build `FileIO` from scratch. For example:
+//! ```rust
+//! use iceberg::io::{FileIOBuilder, S3_REGION};
+//!
+//! let file_io = FileIOBuilder::new("s3")
+//! .with_prop(S3_REGION, "us-east-1")
+//! .build()
+//! .unwrap();
+//! ```
+//!
+//! Or you can pass a path to ask `FileIO` to infer the scheme for you:
+//! ```rust
+//! use iceberg::io::{FileIO, S3_REGION};
+//! let file_io = FileIO::from_path("s3://bucket/a")
+//! .unwrap()
+//! .with_prop(S3_REGION, "us-east-1")
+//! .build()
+//! .unwrap();
+//! ```
+//!
+//! # How to use `FileIO`
+//!
+//! Currently `FileIO` provides simple methods for file operations:
+//!
+//! - `delete`: Delete file.
+//! - `is_exist`: Check if file exists.
+//! - `new_input`: Create input file for reading.
+//! - `new_output`: Create output file for writing.
+
+mod file_io;
+pub use file_io::*;
+
+mod storage;
+mod storage_s3;
+pub use storage_s3::*;
+mod storage_fs;
+use storage_fs::*;
diff --git a/crates/iceberg/src/io/storage.rs b/crates/iceberg/src/io/storage.rs
new file mode 100644
index 0000000..b188c29
--- /dev/null
+++ b/crates/iceberg/src/io/storage.rs
@@ -0,0 +1,122 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use super::{FileIOBuilder, FsConfig, S3Config};
+use crate::{Error, ErrorKind};
+use opendal::{Operator, Scheme};
+
+/// The storage carries all supported storage services in iceberg
+#[derive(Debug)]
+pub(crate) enum Storage {
+    /// Local filesystem storage, used for the `file` scheme and for an empty scheme string.
+    LocalFs {
+        /// Configuration from which the filesystem operator is built.
+        config: FsConfig,
+    },
+    /// S3 storage, used for the `s3` and `s3a` schemes.
+    S3 {
+        /// s3 storage could have `s3://` and `s3a://`.
+        /// Storing the scheme string here to return the correct path.
+        scheme_str: String,
+        /// Configuration from which the S3 operator is built.
+        config: S3Config,
+    },
+}
+
+impl Storage {
+    /// Convert iceberg config to opendal config.
+    ///
+    /// # Errors
+    ///
+    /// Propagates any error from parsing the scheme string, and returns
+    /// [`ErrorKind::FeatureUnsupported`] when the parsed scheme is neither fs nor s3.
+    pub(crate) fn build(file_io_builder: FileIOBuilder) -> crate::Result<Self> {
+        let (scheme_str, props) = file_io_builder.into_parts();
+        let scheme = Self::parse_scheme(&scheme_str)?;
+
+        match scheme {
+            Scheme::Fs => Ok(Self::LocalFs {
+                config: FsConfig::new(props),
+            }),
+            Scheme::S3 => Ok(Self::S3 {
+                scheme_str,
+                config: S3Config::new(props),
+            }),
+            _ => Err(Error::new(
+                ErrorKind::FeatureUnsupported,
+                format!("Constructing file io from scheme: {scheme} not supported now",),
+            )),
+        }
+    }
+
+    /// Creates operator from path.
+    ///
+    /// # Arguments
+    ///
+    /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
+    ///
+    /// # Returns
+    ///
+    /// The return value consists of two parts:
+    ///
+    /// * An [`opendal::Operator`] instance used to operate on file.
+    /// * Relative path to the root uri of [`opendal::Operator`].
+    ///
+    /// # Errors
+    ///
+    /// Returns [`ErrorKind::DataInvalid`] when a local path is too short to strip its
+    /// leading character, or when an s3 path does not start with
+    /// `<scheme>://<bucket>/`. Errors from building the operator are propagated.
+    pub(crate) fn create_operator<'a>(
+        &self,
+        path: &'a impl AsRef<str>,
+    ) -> crate::Result<(Operator, &'a str)> {
+        let path = path.as_ref();
+        match self {
+            Storage::LocalFs { config } => {
+                let op = config.build(path)?;
+
+                if let Some(stripped) = path.strip_prefix("file:/") {
+                    return Ok((op, stripped));
+                }
+
+                // Drop the first byte (the leading `/` of an absolute path). `get` is used
+                // instead of indexing so that an empty path, or one that starts with a
+                // multi-byte character, produces an error instead of a panic.
+                match path.get(1..) {
+                    Some(relative) => Ok((op, relative)),
+                    None => Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        format!("Invalid file path: {path}, should be an absolute path"),
+                    )),
+                }
+            }
+            Storage::S3 { scheme_str, config } => {
+                let op = config.build(path)?;
+                let op_info = op.info();
+
+                // Check prefix of s3 path.
+                let prefix = format!("{}://{}/", scheme_str, op_info.name());
+                match path.strip_prefix(prefix.as_str()) {
+                    Some(relative) => Ok((op, relative)),
+                    None => Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        format!("Invalid s3 url: {}, should start with {}", path, prefix),
+                    )),
+                }
+            }
+        }
+    }
+
+    /// Parse scheme.
+    ///
+    /// `file` and the empty string map to fs, `s3` and `s3a` map to s3; any other
+    /// string is handed to [`Scheme`]'s own parser.
+    fn parse_scheme(scheme: &str) -> crate::Result<Scheme> {
+        match scheme {
+            "file" | "" => Ok(Scheme::Fs),
+            "s3" | "s3a" => Ok(Scheme::S3),
+            s => Ok(s.parse::<Scheme>()?),
+        }
+    }
+}
+
+/// redact_secret will redact the secret part of the string.
+///
+/// Strings of six characters or fewer are replaced entirely by `***`; longer strings
+/// keep their first three and last three characters around `***`.
+///
+/// Lengths and cut points are measured in characters, not bytes, so a secret that
+/// contains multi-byte UTF-8 characters cannot cause a slicing panic.
+#[inline]
+pub(crate) fn redact_secret(s: &str) -> String {
+    if s.chars().count() <= 6 {
+        return "***".to_string();
+    }
+
+    // Byte offset just past the third character.
+    let head_end = s.char_indices().nth(3).map_or(s.len(), |(idx, _)| idx);
+    // Byte offset at which the third character from the end starts.
+    let tail_start = s.char_indices().rev().nth(2).map_or(0, |(idx, _)| idx);
+
+    format!("{}***{}", &s[..head_end], &s[tail_start..])
+}
diff --git a/crates/iceberg/src/io/storage_fs.rs
b/crates/iceberg/src/io/storage_fs.rs
new file mode 100644
index 0000000..38c3fa1
--- /dev/null
+++ b/crates/iceberg/src/io/storage_fs.rs
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::Result;
+use opendal::{Operator, Scheme};
+use std::collections::HashMap;
+use std::fmt::{Debug, Formatter};
+
+/// Configuration for the local filesystem storage; it currently has no fields.
+///
+/// # TODO
+///
+/// opendal has a plan to introduce native config support.
+/// We manually parse the config here, and this code will eventually be removed.
+#[derive(Default, Clone)]
+pub(crate) struct FsConfig {}
+
+impl Debug for FsConfig {
+    /// Writes the bare type name; the config has no fields to show.
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.write_str("FsConfig")
+    }
+}
+
+impl FsConfig {
+    /// Decode from iceberg props.
+    ///
+    /// No property is read at the moment; the result is always the default config.
+    pub fn new(_: HashMap<String, String>) -> Self {
+        FsConfig {}
+    }
+
+    /// Build a new opendal operator for the given path.
+    ///
+    /// The path is ignored: the fs operator is always rooted at `/`.
+    pub fn build(&self, _: &str) -> Result<Operator> {
+        let mut props = HashMap::with_capacity(1);
+        props.insert("root".to_string(), "/".to_string());
+
+        let op = Operator::via_map(Scheme::Fs, props)?;
+        Ok(op)
+    }
+}
diff --git a/crates/iceberg/src/io/storage_s3.rs
b/crates/iceberg/src/io/storage_s3.rs
new file mode 100644
index 0000000..d001e06
--- /dev/null
+++ b/crates/iceberg/src/io/storage_s3.rs
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::io::storage::redact_secret;
+use crate::{Error, ErrorKind, Result};
+use opendal::{Operator, Scheme};
+use std::collections::HashMap;
+use std::fmt::{Debug, Formatter};
+use url::Url;
+
+/// Following are arguments for [s3 file io](https://py.iceberg.apache.org/configuration/#s3).
+///
+/// Property key for the S3 endpoint.
+pub const S3_ENDPOINT: &str = "s3.endpoint";
+/// Property key for the S3 access key id.
+pub const S3_ACCESS_KEY_ID: &str = "s3.access-key-id";
+/// Property key for the S3 secret access key.
+pub const S3_SECRET_ACCESS_KEY: &str = "s3.secret-access-key";
+/// Property key for the S3 region.
+pub const S3_REGION: &str = "s3.region";
+
+/// Configuration for the S3 storage.
+///
+/// Every field defaults to an empty string.
+///
+/// # TODO
+///
+/// opendal has a plan to introduce native config support.
+/// We manually parse the config here, and this code will eventually be removed.
+#[derive(Default, Clone)]
+pub(crate) struct S3Config {
+    /// S3 endpoint.
+    pub endpoint: String,
+    /// S3 access key id.
+    pub access_key_id: String,
+    /// S3 secret access key.
+    pub secret_access_key: String,
+    /// S3 region.
+    pub region: String,
+}
+
+impl Debug for S3Config {
+    /// Formats the config with both credential fields passed through `redact_secret`.
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        let access_key_id = redact_secret(&self.access_key_id);
+        let secret_access_key = redact_secret(&self.secret_access_key);
+
+        let mut out = f.debug_struct("S3Config");
+        out.field("endpoint", &self.endpoint);
+        out.field("region", &self.region);
+        out.field("access_key_id", &access_key_id);
+        out.field("secret_access_key", &secret_access_key);
+        out.finish()
+    }
+}
+
+impl S3Config {
+    /// Decode from iceberg props.
+    ///
+    /// Reads the `S3_ENDPOINT`, `S3_ACCESS_KEY_ID`, `S3_SECRET_ACCESS_KEY` and
+    /// `S3_REGION` keys; a key that is absent leaves its field as an empty string.
+    pub fn new(m: HashMap<String, String>) -> Self {
+        let prop = |key: &str| m.get(key).cloned().unwrap_or_default();
+
+        Self {
+            endpoint: prop(S3_ENDPOINT),
+            access_key_id: prop(S3_ACCESS_KEY_ID),
+            secret_access_key: prop(S3_SECRET_ACCESS_KEY),
+            region: prop(S3_REGION),
+        }
+    }
+
+    /// Build a new opendal operator for the given path.
+    ///
+    /// The bucket is taken from the host part of `path`; a path that does not parse
+    /// as a url, or that has no host, is rejected.
+    pub fn build(&self, path: &str) -> Result<Operator> {
+        let url = Url::parse(path)?;
+        let bucket = match url.host_str() {
+            Some(host) => host.to_string(),
+            None => {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    format!("Invalid s3 url: {}, missing bucket", path),
+                ))
+            }
+        };
+
+        let props = HashMap::from([
+            ("bucket".to_string(), bucket),
+            ("endpoint".to_string(), self.endpoint.clone()),
+            ("access_key_id".to_string(), self.access_key_id.clone()),
+            (
+                "secret_access_key".to_string(),
+                self.secret_access_key.clone(),
+            ),
+            ("region".to_string(), self.region.clone()),
+        ]);
+
+        Ok(Operator::via_map(Scheme::S3, props)?)
+    }
+}