This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 8248c91 refactor(iceberg): Remove an extra config parse logic (#499)
8248c91 is described below
commit 8248c919c50ea7f36f2af0dd8a4699c7574f88b2
Author: Xuanwo <[email protected]>
AuthorDate: Mon Jul 29 22:18:53 2024 +0800
refactor(iceberg): Remove an extra config parse logic (#499)
* refactor(iceberg): Remove an extra config parse logic
Signed-off-by: Xuanwo <[email protected]>
* Format toml
Signed-off-by: Xuanwo <[email protected]>
* reduce some allocs
Signed-off-by: Xuanwo <[email protected]>
* Cleanup more
Signed-off-by: Xuanwo <[email protected]>
---------
Signed-off-by: Xuanwo <[email protected]>
---
crates/iceberg/src/io/storage.rs | 47 ++++++-----------
crates/iceberg/src/io/storage_fs.rs | 38 +++----------
crates/iceberg/src/io/storage_memory.rs | 28 ++--------
crates/iceberg/src/io/storage_s3.rs | 94 +++++++++++----------------------
4 files changed, 56 insertions(+), 151 deletions(-)
diff --git a/crates/iceberg/src/io/storage.rs b/crates/iceberg/src/io/storage.rs
index 7383b8f..d13ac04 100644
--- a/crates/iceberg/src/io/storage.rs
+++ b/crates/iceberg/src/io/storage.rs
@@ -15,30 +15,28 @@
// specific language governing permissions and limitations
// under the License.
+use std::sync::Arc;
+
+#[cfg(feature = "storage-s3")]
+use opendal::services::S3Config;
use opendal::{Operator, Scheme};
use super::FileIOBuilder;
-#[cfg(feature = "storage-fs")]
-use super::FsConfig;
-#[cfg(feature = "storage-memory")]
-use super::MemoryConfig;
-#[cfg(feature = "storage-s3")]
-use super::S3Config;
use crate::{Error, ErrorKind};
/// The storage carries all supported storage services in iceberg
#[derive(Debug)]
pub(crate) enum Storage {
#[cfg(feature = "storage-memory")]
- Memory { config: MemoryConfig },
+ Memory,
#[cfg(feature = "storage-fs")]
- LocalFs { config: FsConfig },
+ LocalFs,
#[cfg(feature = "storage-s3")]
S3 {
/// s3 storage could have `s3://` and `s3a://`.
/// Storing the scheme string here to return the correct path.
scheme_str: String,
- config: S3Config,
+ config: Arc<S3Config>,
},
}
@@ -50,17 +48,13 @@ impl Storage {
match scheme {
#[cfg(feature = "storage-memory")]
- Scheme::Memory => Ok(Self::Memory {
- config: MemoryConfig::new(props),
- }),
+ Scheme::Memory => Ok(Self::Memory),
#[cfg(feature = "storage-fs")]
- Scheme::Fs => Ok(Self::LocalFs {
- config: FsConfig::new(props),
- }),
+ Scheme::Fs => Ok(Self::LocalFs),
#[cfg(feature = "storage-s3")]
Scheme::S3 => Ok(Self::S3 {
scheme_str,
- config: S3Config::new(props),
+ config: super::s3_config_parse(props).into(),
}),
_ => Err(Error::new(
ErrorKind::FeatureUnsupported,
@@ -88,8 +82,8 @@ impl Storage {
let path = path.as_ref();
match self {
#[cfg(feature = "storage-memory")]
- Storage::Memory { config } => {
- let op = config.build(path)?;
+ Storage::Memory => {
+ let op = super::memory_config_build()?;
if let Some(stripped) = path.strip_prefix("memory:/") {
Ok((op, stripped))
@@ -98,8 +92,8 @@ impl Storage {
}
}
#[cfg(feature = "storage-fs")]
- Storage::LocalFs { config } => {
- let op = config.build(path)?;
+ Storage::LocalFs => {
+ let op = super::fs_config_build()?;
if let Some(stripped) = path.strip_prefix("file:/") {
Ok((op, stripped))
@@ -109,7 +103,7 @@ impl Storage {
}
#[cfg(feature = "storage-s3")]
Storage::S3 { scheme_str, config } => {
- let op = config.build(path)?;
+ let op = super::s3_config_build(config, path)?;
let op_info = op.info();
// Check prefix of s3 path.
@@ -141,14 +135,3 @@ impl Storage {
}
}
}
-
-/// redact_secret will redact the secret part of the string.
-#[inline]
-pub(crate) fn redact_secret(s: &str) -> String {
- let len = s.len();
- if len <= 6 {
- return "***".to_string();
- }
-
- format!("{}***{}", &s[0..3], &s[len - 3..len])
-}
diff --git a/crates/iceberg/src/io/storage_fs.rs b/crates/iceberg/src/io/storage_fs.rs
index 1942156..ff38d76 100644
--- a/crates/iceberg/src/io/storage_fs.rs
+++ b/crates/iceberg/src/io/storage_fs.rs
@@ -15,39 +15,15 @@
// specific language governing permissions and limitations
// under the License.
-use std::collections::HashMap;
-use std::fmt::{Debug, Formatter};
-
-use opendal::{Operator, Scheme};
+use opendal::services::FsConfig;
+use opendal::Operator;
use crate::Result;
-/// # TODO
-///
-/// opendal has a plan to introduce native config support.
-/// We manually parse the config here and those code will be finally removed.
-#[derive(Default, Clone)]
-pub(crate) struct FsConfig {}
-
-impl Debug for FsConfig {
- fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
- f.debug_struct("FsConfig").finish()
- }
-}
-
-impl FsConfig {
- /// Decode from iceberg props.
- pub fn new(_: HashMap<String, String>) -> Self {
- Self::default()
- }
+/// Build new opendal operator from give path.
+pub(crate) fn fs_config_build() -> Result<Operator> {
+ let mut cfg = FsConfig::default();
+ cfg.root = Some("/".to_string());
- /// Build new opendal operator from give path.
- ///
- /// fs always build from `/`
- pub fn build(&self, _: &str) -> Result<Operator> {
- Ok(Operator::via_iter(Scheme::Fs, [(
- "root".to_string(),
- "/".to_string(),
- )])?)
- }
+ Ok(Operator::from_config(cfg)?.finish())
}
diff --git a/crates/iceberg/src/io/storage_memory.rs b/crates/iceberg/src/io/storage_memory.rs
index e160c23..ffc082d 100644
--- a/crates/iceberg/src/io/storage_memory.rs
+++ b/crates/iceberg/src/io/storage_memory.rs
@@ -15,31 +15,11 @@
// specific language governing permissions and limitations
// under the License.
-use std::collections::HashMap;
-use std::fmt::{Debug, Formatter};
-
-use opendal::{Operator, Scheme};
+use opendal::services::MemoryConfig;
+use opendal::Operator;
use crate::Result;
-#[derive(Default, Clone)]
-pub(crate) struct MemoryConfig {}
-
-impl Debug for MemoryConfig {
- fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
- f.debug_struct("MemoryConfig").finish()
- }
-}
-
-impl MemoryConfig {
- /// Decode from iceberg props.
- pub fn new(_: HashMap<String, String>) -> Self {
- Self::default()
- }
-
- /// Build new opendal operator from given path.
- pub fn build(&self, _: &str) -> Result<Operator> {
- let m = HashMap::new();
- Ok(Operator::via_iter(Scheme::Memory, m)?)
- }
+pub(crate) fn memory_config_build() -> Result<Operator> {
+ Ok(Operator::from_config(MemoryConfig::default())?.finish())
}
diff --git a/crates/iceberg/src/io/storage_s3.rs b/crates/iceberg/src/io/storage_s3.rs
index 9332f65..acce18e 100644
--- a/crates/iceberg/src/io/storage_s3.rs
+++ b/crates/iceberg/src/io/storage_s3.rs
@@ -16,12 +16,11 @@
// under the License.
use std::collections::HashMap;
-use std::fmt::{Debug, Formatter};
-use opendal::{Operator, Scheme};
+use opendal::services::S3Config;
+use opendal::Operator;
use url::Url;
-use crate::io::storage::redact_secret;
use crate::{Error, ErrorKind, Result};
/// Following are arguments for [s3 file io](https://py.iceberg.apache.org/configuration/#s3).
@@ -34,69 +33,36 @@ pub const S3_SECRET_ACCESS_KEY: &str = "s3.secret-access-key";
/// S3 region.
pub const S3_REGION: &str = "s3.region";
-/// # TODO
-///
-/// opendal has a plan to introduce native config support.
-/// We manually parse the config here and those code will be finally removed.
-#[derive(Default, Clone)]
-pub(crate) struct S3Config {
- pub endpoint: String,
- pub access_key_id: String,
- pub secret_access_key: String,
- pub region: String,
-}
+/// Parse iceberg props to s3 config.
+pub(crate) fn s3_config_parse(mut m: HashMap<String, String>) -> S3Config {
+ let mut cfg = S3Config::default();
+ if let Some(endpoint) = m.remove(S3_ENDPOINT) {
+ cfg.endpoint = Some(endpoint);
+ };
+ if let Some(access_key_id) = m.remove(S3_ACCESS_KEY_ID) {
+ cfg.access_key_id = Some(access_key_id);
+ };
+ if let Some(secret_access_key) = m.remove(S3_SECRET_ACCESS_KEY) {
+ cfg.secret_access_key = Some(secret_access_key);
+ };
+ if let Some(region) = m.remove(S3_REGION) {
+ cfg.region = Some(region);
+ };
-impl Debug for S3Config {
- fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
- f.debug_struct("S3Config")
- .field("endpoint", &self.endpoint)
- .field("region", &self.region)
- .field("access_key_id", &redact_secret(&self.access_key_id))
- .field("secret_access_key", &redact_secret(&self.secret_access_key))
- .finish()
- }
+ cfg
}
-impl S3Config {
- /// Decode from iceberg props.
- pub fn new(m: HashMap<String, String>) -> Self {
- let mut cfg = Self::default();
- if let Some(endpoint) = m.get(S3_ENDPOINT) {
- cfg.endpoint = endpoint.clone();
- };
- if let Some(access_key_id) = m.get(S3_ACCESS_KEY_ID) {
- cfg.access_key_id = access_key_id.clone();
- };
- if let Some(secret_access_key) = m.get(S3_SECRET_ACCESS_KEY) {
- cfg.secret_access_key = secret_access_key.clone();
- };
- if let Some(region) = m.get(S3_REGION) {
- cfg.region = region.clone();
- };
-
- cfg
- }
-
- /// Build new opendal operator from give path.
- pub fn build(&self, path: &str) -> Result<Operator> {
- let url = Url::parse(path)?;
- let bucket = url.host_str().ok_or_else(|| {
- Error::new(
- ErrorKind::DataInvalid,
- format!("Invalid s3 url: {}, missing bucket", path),
- )
- })?;
-
- let mut m = HashMap::with_capacity(5);
- m.insert("bucket".to_string(), bucket.to_string());
- m.insert("endpoint".to_string(), self.endpoint.clone());
- m.insert("access_key_id".to_string(), self.access_key_id.clone());
- m.insert(
- "secret_access_key".to_string(),
- self.secret_access_key.clone(),
- );
- m.insert("region".to_string(), self.region.clone());
+/// Build new opendal operator from give path.
+pub(crate) fn s3_config_build(cfg: &S3Config, path: &str) -> Result<Operator> {
+ let url = Url::parse(path)?;
+ let bucket = url.host_str().ok_or_else(|| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ format!("Invalid s3 url: {}, missing bucket", path),
+ )
+ })?;
- Ok(Operator::via_iter(Scheme::S3, m)?)
- }
+ let mut cfg = cfg.clone();
+ cfg.bucket = bucket.to_string();
+ Ok(Operator::from_config(cfg)?.finish())
}