This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 8562708  refactor(io): Split io into smaller mods (#438)
8562708 is described below

commit 85627083f0791629be3d73413af936165ebd38eb
Author: Xuanwo <[email protected]>
AuthorDate: Fri Jul 5 21:27:01 2024 +0800

    refactor(io): Split io into smaller mods (#438)
    
    * refactor(io): Split io into smaller mods
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Fix test
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Format
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Fix cap
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Remove not used deps
    
    Signed-off-by: Xuanwo <[email protected]>
    
    ---------
    
    Signed-off-by: Xuanwo <[email protected]>
---
 Cargo.toml                                  |  10 +-
 crates/iceberg/Cargo.toml                   |   3 -
 crates/iceberg/src/{io.rs => io/file_io.rs} | 308 +++++++---------------------
 crates/iceberg/src/io/mod.rs                |  58 ++++++
 crates/iceberg/src/io/storage.rs            | 122 +++++++++++
 crates/iceberg/src/io/storage_fs.rs         |  49 +++++
 crates/iceberg/src/io/storage_s3.rs         | 100 +++++++++
 7 files changed, 413 insertions(+), 237 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index ce4d300..e51d24f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -18,11 +18,11 @@
 [workspace]
 resolver = "2"
 members = [
-    "crates/catalog/*",
-    "crates/examples",
-    "crates/iceberg",
-    "crates/integrations/*",
-    "crates/test_utils",
+  "crates/catalog/*",
+  "crates/examples",
+  "crates/iceberg",
+  "crates/integrations/*",
+  "crates/test_utils",
 ]
 
 [workspace.package]
diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml
index c43f54f..b610680 100644
--- a/crates/iceberg/Cargo.toml
+++ b/crates/iceberg/Cargo.toml
@@ -56,12 +56,10 @@ bitvec = { workspace = true }
 bytes = { workspace = true }
 chrono = { workspace = true }
 derive_builder = { workspace = true }
-either = { workspace = true }
 fnv = { workspace = true }
 futures = { workspace = true }
 itertools = { workspace = true }
 lazy_static = { workspace = true }
-log = { workspace = true }
 murmur3 = { workspace = true }
 once_cell = { workspace = true }
 opendal = { workspace = true }
@@ -78,7 +76,6 @@ serde_with = { workspace = true }
 tokio = { workspace = true, optional = true }
 typed-builder = { workspace = true }
 url = { workspace = true }
-urlencoding = { workspace = true }
 uuid = { workspace = true }
 
 [dev-dependencies]
diff --git a/crates/iceberg/src/io.rs b/crates/iceberg/src/io/file_io.rs
similarity index 66%
rename from crates/iceberg/src/io.rs
rename to crates/iceberg/src/io/file_io.rs
index c045b22..54b2cd4 100644
--- a/crates/iceberg/src/io.rs
+++ b/crates/iceberg/src/io/file_io.rs
@@ -15,71 +15,15 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! File io implementation.
-//!
-//! # How to build `FileIO`
-//!
-//! We provided a `FileIOBuilder` to build `FileIO` from scratch. For example:
-//! ```rust
-//! use iceberg::io::{FileIOBuilder, S3_REGION};
-//!
-//! let file_io = FileIOBuilder::new("s3")
-//!     .with_prop(S3_REGION, "us-east-1")
-//!     .build()
-//!     .unwrap();
-//! ```
-//!
-//! Or you can pass a path to ask `FileIO` to infer schema for you:
-//! ```rust
-//! use iceberg::io::{FileIO, S3_REGION};
-//! let file_io = FileIO::from_path("s3://bucket/a")
-//!     .unwrap()
-//!     .with_prop(S3_REGION, "us-east-1")
-//!     .build()
-//!     .unwrap();
-//! ```
-//!
-//! # How to use `FileIO`
-//!
-//! Currently `FileIO` provides simple methods for file operations:
-//!
-//! - `delete`: Delete file.
-//! - `is_exist`: Check if file exists.
-//! - `new_input`: Create input file for reading.
-//! - `new_output`: Create output file for writing.
-
+use super::storage::Storage;
+use crate::{Error, ErrorKind, Result};
 use bytes::Bytes;
+use opendal::Operator;
+use std::collections::HashMap;
 use std::ops::Range;
-use std::{collections::HashMap, sync::Arc};
-
-use crate::{error::Result, Error, ErrorKind};
-use once_cell::sync::Lazy;
-use opendal::{Operator, Scheme};
+use std::sync::Arc;
 use url::Url;
 
-/// Following are arguments for [s3 file 
io](https://py.iceberg.apache.org/configuration/#s3).
-/// S3 endopint.
-pub const S3_ENDPOINT: &str = "s3.endpoint";
-/// S3 access key id.
-pub const S3_ACCESS_KEY_ID: &str = "s3.access-key-id";
-/// S3 secret access key.
-pub const S3_SECRET_ACCESS_KEY: &str = "s3.secret-access-key";
-/// S3 region.
-pub const S3_REGION: &str = "s3.region";
-
-/// A mapping from iceberg s3 configuration key to [`opendal::Operator`] 
configuration key.
-static S3_CONFIG_MAPPING: Lazy<HashMap<&'static str, &'static str>> = 
Lazy::new(|| {
-    let mut m = HashMap::with_capacity(4);
-    m.insert(S3_ENDPOINT, "endpoint");
-    m.insert(S3_ACCESS_KEY_ID, "access_key_id");
-    m.insert(S3_SECRET_ACCESS_KEY, "secret_access_key");
-    m.insert(S3_REGION, "region");
-
-    m
-});
-
-const DEFAULT_ROOT_PATH: &str = "/";
-
 /// FileIO implementation, used to manipulate files in underlying storage.
 ///
 /// # Note
@@ -91,59 +35,6 @@ pub struct FileIO {
     inner: Arc<Storage>,
 }
 
-/// Builder for [`FileIO`].
-#[derive(Debug)]
-pub struct FileIOBuilder {
-    /// This is used to infer scheme of operator.
-    ///
-    /// If this is `None`, then [`FileIOBuilder::build`](FileIOBuilder::build) 
will build a local file io.
-    scheme_str: Option<String>,
-    /// Arguments for operator.
-    props: HashMap<String, String>,
-}
-
-impl FileIOBuilder {
-    /// Creates a new builder with scheme.
-    pub fn new(scheme_str: impl ToString) -> Self {
-        Self {
-            scheme_str: Some(scheme_str.to_string()),
-            props: HashMap::default(),
-        }
-    }
-
-    /// Creates a new builder for local file io.
-    pub fn new_fs_io() -> Self {
-        Self {
-            scheme_str: None,
-            props: HashMap::default(),
-        }
-    }
-
-    /// Add argument for operator.
-    pub fn with_prop(mut self, key: impl ToString, value: impl ToString) -> 
Self {
-        self.props.insert(key.to_string(), value.to_string());
-        self
-    }
-
-    /// Add argument for operator.
-    pub fn with_props(
-        mut self,
-        args: impl IntoIterator<Item = (impl ToString, impl ToString)>,
-    ) -> Self {
-        self.props
-            .extend(args.into_iter().map(|e| (e.0.to_string(), 
e.1.to_string())));
-        self
-    }
-
-    /// Builds [`FileIO`].
-    pub fn build(self) -> Result<FileIO> {
-        let storage = Storage::build(self)?;
-        Ok(FileIO {
-            inner: Arc::new(storage),
-        })
-    }
-}
-
 impl FileIO {
     /// Try to infer file io scheme from path.
     ///
@@ -151,7 +42,7 @@ impl FileIO {
     /// If it's not a valid url, will try to detect if it's a file path.
     ///
     /// Otherwise will return parsing error.
-    pub fn from_path(path: impl AsRef<str>) -> Result<FileIOBuilder> {
+    pub fn from_path(path: impl AsRef<str>) -> crate::Result<FileIOBuilder> {
         let url = Url::parse(path.as_ref())
             .map_err(Error::from)
             .or_else(|e| {
@@ -205,6 +96,66 @@ impl FileIO {
     }
 }
 
+/// Builder for [`FileIO`].
+#[derive(Debug)]
+pub struct FileIOBuilder {
+    /// This is used to infer scheme of operator.
+    ///
+    /// If this is `None`, then [`FileIOBuilder::build`](FileIOBuilder::build) 
will build a local file io.
+    scheme_str: Option<String>,
+    /// Arguments for operator.
+    props: HashMap<String, String>,
+}
+
+impl FileIOBuilder {
+    /// Creates a new builder with scheme.
+    pub fn new(scheme_str: impl ToString) -> Self {
+        Self {
+            scheme_str: Some(scheme_str.to_string()),
+            props: HashMap::default(),
+        }
+    }
+
+    /// Creates a new builder for local file io.
+    pub fn new_fs_io() -> Self {
+        Self {
+            scheme_str: None,
+            props: HashMap::default(),
+        }
+    }
+
+    /// Fetch the scheme string.
+    ///
+    /// The scheme_str will be empty if it's None.
+    pub(crate) fn into_parts(self) -> (String, HashMap<String, String>) {
+        (self.scheme_str.unwrap_or_default(), self.props)
+    }
+
+    /// Add argument for operator.
+    pub fn with_prop(mut self, key: impl ToString, value: impl ToString) -> 
Self {
+        self.props.insert(key.to_string(), value.to_string());
+        self
+    }
+
+    /// Add argument for operator.
+    pub fn with_props(
+        mut self,
+        args: impl IntoIterator<Item = (impl ToString, impl ToString)>,
+    ) -> Self {
+        self.props
+            .extend(args.into_iter().map(|e| (e.0.to_string(), 
e.1.to_string())));
+        self
+    }
+
+    /// Builds [`FileIO`].
+    pub fn build(self) -> crate::Result<FileIO> {
+        let storage = Storage::build(self)?;
+        Ok(FileIO {
+            inner: Arc::new(storage),
+        })
+    }
+}
+
 /// The struct the represents the metadata of a file.
 ///
 /// TODO: we can add last modified time, content type, etc. in the future.
@@ -224,12 +175,12 @@ pub trait FileRead: Send + Unpin + 'static {
     /// Read file content with given range.
     ///
     /// TODO: we can support reading non-contiguous bytes in the future.
-    async fn read(&self, range: Range<u64>) -> Result<Bytes>;
+    async fn read(&self, range: Range<u64>) -> crate::Result<Bytes>;
 }
 
 #[async_trait::async_trait]
 impl FileRead for opendal::Reader {
-    async fn read(&self, range: Range<u64>) -> Result<Bytes> {
+    async fn read(&self, range: Range<u64>) -> crate::Result<Bytes> {
         Ok(opendal::Reader::read(self, range).await?.to_bytes())
     }
 }
@@ -251,7 +202,7 @@ impl InputFile {
     }
 
     /// Check if file exists.
-    pub async fn exists(&self) -> Result<bool> {
+    pub async fn exists(&self) -> crate::Result<bool> {
         Ok(self
             .op
             .is_exist(&self.path[self.relative_path_pos..])
@@ -259,7 +210,7 @@ impl InputFile {
     }
 
     /// Fetch and returns metadata of file.
-    pub async fn metadata(&self) -> Result<FileMetadata> {
+    pub async fn metadata(&self) -> crate::Result<FileMetadata> {
         let meta = self.op.stat(&self.path[self.relative_path_pos..]).await?;
 
         Ok(FileMetadata {
@@ -270,7 +221,7 @@ impl InputFile {
     /// Read and returns whole content of file.
     ///
     /// For continues reading, use [`Self::reader`] instead.
-    pub async fn read(&self) -> Result<Bytes> {
+    pub async fn read(&self) -> crate::Result<Bytes> {
         Ok(self
             .op
             .read(&self.path[self.relative_path_pos..])
@@ -281,7 +232,7 @@ impl InputFile {
     /// Creates [`FileRead`] for continues reading.
     ///
     /// For one-time reading, use [`Self::read`] instead.
-    pub async fn reader(&self) -> Result<impl FileRead> {
+    pub async fn reader(&self) -> crate::Result<impl FileRead> {
         Ok(self.op.reader(&self.path[self.relative_path_pos..]).await?)
     }
 }
@@ -297,21 +248,21 @@ pub trait FileWrite: Send + Unpin + 'static {
     /// Write bytes to file.
     ///
     /// TODO: we can support writing non-contiguous bytes in the future.
-    async fn write(&mut self, bs: Bytes) -> Result<()>;
+    async fn write(&mut self, bs: Bytes) -> crate::Result<()>;
 
     /// Close file.
     ///
     /// Calling close on closed file will generate an error.
-    async fn close(&mut self) -> Result<()>;
+    async fn close(&mut self) -> crate::Result<()>;
 }
 
 #[async_trait::async_trait]
 impl FileWrite for opendal::Writer {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
+    async fn write(&mut self, bs: Bytes) -> crate::Result<()> {
         Ok(opendal::Writer::write(self, bs).await?)
     }
 
-    async fn close(&mut self) -> Result<()> {
+    async fn close(&mut self) -> crate::Result<()> {
         Ok(opendal::Writer::close(self).await?)
     }
 }
@@ -333,7 +284,7 @@ impl OutputFile {
     }
 
     /// Checks if file exists.
-    pub async fn exists(&self) -> Result<bool> {
+    pub async fn exists(&self) -> crate::Result<bool> {
         Ok(self
             .op
             .is_exist(&self.path[self.relative_path_pos..])
@@ -355,7 +306,7 @@ impl OutputFile {
     ///
     /// Calling `write` will overwrite the file if it exists.
     /// For continues writing, use [`Self::writer`].
-    pub async fn write(&self, bs: Bytes) -> Result<()> {
+    pub async fn write(&self, bs: Bytes) -> crate::Result<()> {
         let mut writer = self.writer().await?;
         writer.write(bs).await?;
         writer.close().await
@@ -366,114 +317,13 @@ impl OutputFile {
     /// # Notes
     ///
     /// For one-time writing, use [`Self::write`] instead.
-    pub async fn writer(&self) -> Result<Box<dyn FileWrite>> {
+    pub async fn writer(&self) -> crate::Result<Box<dyn FileWrite>> {
         Ok(Box::new(
             self.op.writer(&self.path[self.relative_path_pos..]).await?,
         ))
     }
 }
 
-// We introduce this because I don't want to handle unsupported `Scheme` in 
every method.
-#[derive(Debug)]
-enum Storage {
-    LocalFs {
-        op: Operator,
-    },
-    S3 {
-        scheme_str: String,
-        props: HashMap<String, String>,
-    },
-}
-
-impl Storage {
-    /// Creates operator from path.
-    ///
-    /// # Arguments
-    ///
-    /// * path: It should be *absolute* path starting with scheme string used 
to construct [`FileIO`].
-    ///
-    /// # Returns
-    ///
-    /// The return value consists of two parts:
-    ///
-    /// * An [`opendal::Operator`] instance used to operate on file.
-    /// * Relative path to the root uri of [`opendal::Operator`].
-    ///
-    fn create_operator<'a>(&self, path: &'a impl AsRef<str>) -> 
Result<(Operator, &'a str)> {
-        let path = path.as_ref();
-        match self {
-            Storage::LocalFs { op } => {
-                if let Some(stripped) = path.strip_prefix("file:/") {
-                    Ok((op.clone(), stripped))
-                } else {
-                    Ok((op.clone(), &path[1..]))
-                }
-            }
-            Storage::S3 { scheme_str, props } => {
-                let mut props = props.clone();
-                let url = Url::parse(path)?;
-                let bucket = url.host_str().ok_or_else(|| {
-                    Error::new(
-                        ErrorKind::DataInvalid,
-                        format!("Invalid s3 url: {}, missing bucket", path),
-                    )
-                })?;
-
-                props.insert("bucket".to_string(), bucket.to_string());
-
-                let prefix = format!("{}://{}/", scheme_str, bucket);
-                if path.starts_with(&prefix) {
-                    Ok((Operator::via_map(Scheme::S3, props)?, 
&path[prefix.len()..]))
-                } else {
-                    Err(Error::new(
-                        ErrorKind::DataInvalid,
-                        format!("Invalid s3 url: {}, should start with {}", 
path, prefix),
-                    ))
-                }
-            }
-        }
-    }
-
-    /// Parse scheme.
-    fn parse_scheme(scheme: &str) -> Result<Scheme> {
-        match scheme {
-            "file" | "" => Ok(Scheme::Fs),
-            "s3" | "s3a" => Ok(Scheme::S3),
-            s => Ok(s.parse::<Scheme>()?),
-        }
-    }
-
-    /// Convert iceberg config to opendal config.
-    fn build(file_io_builder: FileIOBuilder) -> Result<Self> {
-        let scheme_str = file_io_builder.scheme_str.unwrap_or("".to_string());
-        let scheme = Self::parse_scheme(&scheme_str)?;
-        let mut new_props = HashMap::default();
-        new_props.insert("root".to_string(), DEFAULT_ROOT_PATH.to_string());
-
-        match scheme {
-            Scheme::Fs => Ok(Self::LocalFs {
-                op: Operator::via_map(Scheme::Fs, new_props)?,
-            }),
-            Scheme::S3 => {
-                for prop in file_io_builder.props {
-                    if let Some(op_key) = 
S3_CONFIG_MAPPING.get(prop.0.as_str()) {
-                        new_props.insert(op_key.to_string(), prop.1);
-                    }
-                }
-
-                Ok(Self::S3 {
-                    scheme_str,
-                    props: new_props,
-                })
-            }
-            _ => Err(Error::new(
-                ErrorKind::FeatureUnsupported,
-                format!("Constructing file io from scheme: {scheme} not 
supported now",),
-            )),
-        }
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use std::io::Write;
diff --git a/crates/iceberg/src/io/mod.rs b/crates/iceberg/src/io/mod.rs
new file mode 100644
index 0000000..6da605d
--- /dev/null
+++ b/crates/iceberg/src/io/mod.rs
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! File io implementation.
+//!
+//! # How to build `FileIO`
+//!
+//! We provided a `FileIOBuilder` to build `FileIO` from scratch. For example:
+//! ```rust
+//! use iceberg::io::{FileIOBuilder, S3_REGION};
+//!
+//! let file_io = FileIOBuilder::new("s3")
+//!     .with_prop(S3_REGION, "us-east-1")
+//!     .build()
+//!     .unwrap();
+//! ```
+//!
+//! Or you can pass a path to ask `FileIO` to infer schema for you:
+//! ```rust
+//! use iceberg::io::{FileIO, S3_REGION};
+//! let file_io = FileIO::from_path("s3://bucket/a")
+//!     .unwrap()
+//!     .with_prop(S3_REGION, "us-east-1")
+//!     .build()
+//!     .unwrap();
+//! ```
+//!
+//! # How to use `FileIO`
+//!
+//! Currently `FileIO` provides simple methods for file operations:
+//!
+//! - `delete`: Delete file.
+//! - `is_exist`: Check if file exists.
+//! - `new_input`: Create input file for reading.
+//! - `new_output`: Create output file for writing.
+
+mod file_io;
+pub use file_io::*;
+
+mod storage;
+mod storage_s3;
+pub use storage_s3::*;
+mod storage_fs;
+use storage_fs::*;
diff --git a/crates/iceberg/src/io/storage.rs b/crates/iceberg/src/io/storage.rs
new file mode 100644
index 0000000..b188c29
--- /dev/null
+++ b/crates/iceberg/src/io/storage.rs
@@ -0,0 +1,122 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use super::{FileIOBuilder, FsConfig, S3Config};
+use crate::{Error, ErrorKind};
+use opendal::{Operator, Scheme};
+
+/// The storage carries all supported storage services in iceberg
+#[derive(Debug)]
+pub(crate) enum Storage {
+    LocalFs {
+        config: FsConfig,
+    },
+    S3 {
+        /// s3 storage could have `s3://` and `s3a://`.
+        /// Storing the scheme string here to return the correct path.
+        scheme_str: String,
+        config: S3Config,
+    },
+}
+
+impl Storage {
+    /// Convert iceberg config to opendal config.
+    pub(crate) fn build(file_io_builder: FileIOBuilder) -> crate::Result<Self> 
{
+        let (scheme_str, props) = file_io_builder.into_parts();
+        let scheme = Self::parse_scheme(&scheme_str)?;
+
+        match scheme {
+            Scheme::Fs => Ok(Self::LocalFs {
+                config: FsConfig::new(props),
+            }),
+            Scheme::S3 => Ok(Self::S3 {
+                scheme_str,
+                config: S3Config::new(props),
+            }),
+            _ => Err(Error::new(
+                ErrorKind::FeatureUnsupported,
+                format!("Constructing file io from scheme: {scheme} not 
supported now",),
+            )),
+        }
+    }
+
+    /// Creates operator from path.
+    ///
+    /// # Arguments
+    ///
+    /// * path: It should be *absolute* path starting with scheme string used 
to construct [`FileIO`].
+    ///
+    /// # Returns
+    ///
+    /// The return value consists of two parts:
+    ///
+    /// * An [`opendal::Operator`] instance used to operate on file.
+    /// * Relative path to the root uri of [`opendal::Operator`].
+    ///
+    pub(crate) fn create_operator<'a>(
+        &self,
+        path: &'a impl AsRef<str>,
+    ) -> crate::Result<(Operator, &'a str)> {
+        let path = path.as_ref();
+        match self {
+            Storage::LocalFs { config } => {
+                let op = config.build(path)?;
+
+                if let Some(stripped) = path.strip_prefix("file:/") {
+                    Ok((op, stripped))
+                } else {
+                    Ok((op, &path[1..]))
+                }
+            }
+            Storage::S3 { scheme_str, config } => {
+                let op = config.build(path)?;
+                let op_info = op.info();
+
+                // Check prefix of s3 path.
+                let prefix = format!("{}://{}/", scheme_str, op_info.name());
+                if path.starts_with(&prefix) {
+                    Ok((op, &path[prefix.len()..]))
+                } else {
+                    Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        format!("Invalid s3 url: {}, should start with {}", 
path, prefix),
+                    ))
+                }
+            }
+        }
+    }
+
+    /// Parse scheme.
+    fn parse_scheme(scheme: &str) -> crate::Result<Scheme> {
+        match scheme {
+            "file" | "" => Ok(Scheme::Fs),
+            "s3" | "s3a" => Ok(Scheme::S3),
+            s => Ok(s.parse::<Scheme>()?),
+        }
+    }
+}
+
+/// redact_secret will redact the secret part of the string.
+#[inline]
+pub(crate) fn redact_secret(s: &str) -> String {
+    let len = s.len();
+    if len <= 6 {
+        return "***".to_string();
+    }
+
+    format!("{}***{}", &s[0..3], &s[len - 3..len])
+}
diff --git a/crates/iceberg/src/io/storage_fs.rs 
b/crates/iceberg/src/io/storage_fs.rs
new file mode 100644
index 0000000..38c3fa1
--- /dev/null
+++ b/crates/iceberg/src/io/storage_fs.rs
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::Result;
+use opendal::{Operator, Scheme};
+use std::collections::HashMap;
+use std::fmt::{Debug, Formatter};
+
+/// # TODO
+///
+/// opendal has a plan to introduce native config support.
+/// We manually parse the config here and those code will be finally removed.
+#[derive(Default, Clone)]
+pub(crate) struct FsConfig {}
+
+impl Debug for FsConfig {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("FsConfig").finish()
+    }
+}
+
+impl FsConfig {
+    /// Decode from iceberg props.
+    pub fn new(_: HashMap<String, String>) -> Self {
+        Self::default()
+    }
+
+    /// Build new opendal operator from given path.
+    ///
+    /// fs always build from `/`
+    pub fn build(&self, _: &str) -> Result<Operator> {
+        let m = HashMap::from_iter([("root".to_string(), "/".to_string())]);
+        Ok(Operator::via_map(Scheme::Fs, m)?)
+    }
+}
diff --git a/crates/iceberg/src/io/storage_s3.rs 
b/crates/iceberg/src/io/storage_s3.rs
new file mode 100644
index 0000000..d001e06
--- /dev/null
+++ b/crates/iceberg/src/io/storage_s3.rs
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::io::storage::redact_secret;
+use crate::{Error, ErrorKind, Result};
+use opendal::{Operator, Scheme};
+use std::collections::HashMap;
+use std::fmt::{Debug, Formatter};
+use url::Url;
+
+/// Following are arguments for [s3 file 
io](https://py.iceberg.apache.org/configuration/#s3).
+/// S3 endpoint.
+pub const S3_ENDPOINT: &str = "s3.endpoint";
+/// S3 access key id.
+pub const S3_ACCESS_KEY_ID: &str = "s3.access-key-id";
+/// S3 secret access key.
+pub const S3_SECRET_ACCESS_KEY: &str = "s3.secret-access-key";
+/// S3 region.
+pub const S3_REGION: &str = "s3.region";
+
+/// # TODO
+///
+/// opendal has a plan to introduce native config support.
+/// We manually parse the config here and those code will be finally removed.
+#[derive(Default, Clone)]
+pub(crate) struct S3Config {
+    pub endpoint: String,
+    pub access_key_id: String,
+    pub secret_access_key: String,
+    pub region: String,
+}
+
+impl Debug for S3Config {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("S3Config")
+            .field("endpoint", &self.endpoint)
+            .field("region", &self.region)
+            .field("access_key_id", &redact_secret(&self.access_key_id))
+            .field("secret_access_key", 
&redact_secret(&self.secret_access_key))
+            .finish()
+    }
+}
+
+impl S3Config {
+    /// Decode from iceberg props.
+    pub fn new(m: HashMap<String, String>) -> Self {
+        let mut cfg = Self::default();
+        if let Some(endpoint) = m.get(S3_ENDPOINT) {
+            cfg.endpoint = endpoint.clone();
+        };
+        if let Some(access_key_id) = m.get(S3_ACCESS_KEY_ID) {
+            cfg.access_key_id = access_key_id.clone();
+        };
+        if let Some(secret_access_key) = m.get(S3_SECRET_ACCESS_KEY) {
+            cfg.secret_access_key = secret_access_key.clone();
+        };
+        if let Some(region) = m.get(S3_REGION) {
+            cfg.region = region.clone();
+        };
+
+        cfg
+    }
+
+    /// Build new opendal operator from given path.
+    pub fn build(&self, path: &str) -> Result<Operator> {
+        let url = Url::parse(path)?;
+        let bucket = url.host_str().ok_or_else(|| {
+            Error::new(
+                ErrorKind::DataInvalid,
+                format!("Invalid s3 url: {}, missing bucket", path),
+            )
+        })?;
+
+        let mut m = HashMap::with_capacity(5);
+        m.insert("bucket".to_string(), bucket.to_string());
+        m.insert("endpoint".to_string(), self.endpoint.clone());
+        m.insert("access_key_id".to_string(), self.access_key_id.clone());
+        m.insert(
+            "secret_access_key".to_string(),
+            self.secret_access_key.clone(),
+        );
+        m.insert("region".to_string(), self.region.clone());
+
+        Ok(Operator::via_map(Scheme::S3, m)?)
+    }
+}

Reply via email to