This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 2ef54656e feat(services): Implement ConfigDeserializer and add
S3Config as example (#3490)
2ef54656e is described below
commit 2ef54656edbbc883705a5eb376bd831911021503
Author: Xuanwo <[email protected]>
AuthorDate: Mon Nov 6 21:25:23 2023 +0800
feat(services): Implement ConfigDeserializer and add S3Config as example
(#3490)
* feat(services): Implement ConfigDeserializer and add S3Config as example
Signed-off-by: Xuanwo <[email protected]>
* Fix on support
Signed-off-by: Xuanwo <[email protected]>
* Add test case
Signed-off-by: Xuanwo <[email protected]>
---------
Signed-off-by: Xuanwo <[email protected]>
---
core/src/raw/serde_util.rs | 379 ++++++++++++++++++++++++++++++++++++++++
core/src/services/mod.rs | 2 +
core/src/services/s3/backend.rs | 335 ++++++++++++++++++++++-------------
core/src/services/s3/mod.rs | 1 +
4 files changed, 596 insertions(+), 121 deletions(-)
diff --git a/core/src/raw/serde_util.rs b/core/src/raw/serde_util.rs
index 5d04ede15..5edde1dde 100644
--- a/core/src/raw/serde_util.rs
+++ b/core/src/raw/serde_util.rs
@@ -15,6 +15,12 @@
// specific language governing permissions and limitations
// under the License.
+use serde::de::value::{MapDeserializer, SeqDeserializer};
+use serde::de::{self, Deserializer, IntoDeserializer, Visitor};
+use std::collections::hash_map::IntoIter;
+use std::collections::HashMap;
+use std::iter::empty;
+
use crate::*;
/// Parse xml deserialize error into opendal::Error.
@@ -31,3 +37,376 @@ pub fn new_json_serialize_error(e: serde_json::Error) ->
Error {
pub fn new_json_deserialize_error(e: serde_json::Error) -> Error {
Error::new(ErrorKind::Unexpected, "deserialize json").set_source(e)
}
+
+/// ConfigDeserializer is used to deserialize given configs from
`HashMap<String, String>`.
+///
+/// This is only used by our services config.
+pub struct ConfigDeserializer(MapDeserializer<'static, Pairs,
de::value::Error>);
+
+impl ConfigDeserializer {
+ /// Create a new config deserializer.
+ pub fn new(map: HashMap<String, String>) -> Self {
+ let pairs = Pairs(map.into_iter());
+ Self(MapDeserializer::new(pairs))
+ }
+}
+
+impl<'de> Deserializer<'de> for ConfigDeserializer {
+ type Error = de::value::Error;
+
+ fn deserialize_any<V>(self, visitor: V) -> std::result::Result<V::Value,
Self::Error>
+ where
+ V: Visitor<'de>,
+ {
+ self.deserialize_map(visitor)
+ }
+
+ fn deserialize_map<V>(self, visitor: V) -> std::result::Result<V::Value,
Self::Error>
+ where
+ V: Visitor<'de>,
+ {
+ visitor.visit_map(self.0)
+ }
+
+ serde::forward_to_deserialize_any! {
+ bool u8 u16 u32 u64 i8 i16 i32 i64 f32 f64 char str string unit seq
+ bytes byte_buf unit_struct tuple_struct
+ identifier tuple ignored_any option newtype_struct enum
+ struct
+ }
+}
+
+/// Pairs is used to implement Iterator to meet the requirement of
[`MapDeserializer`].
+struct Pairs(IntoIter<String, String>);
+
+impl Iterator for Pairs {
+ type Item = (String, Pair);
+
+ fn next(&mut self) -> Option<Self::Item> {
+ self.0.next().map(|(k, v)| (k.to_lowercase(), Pair(k, v)))
+ }
+}
+
+/// Pair is used to hold both key and value of a config for better error
output.
+struct Pair(String, String);
+
+impl<'de> IntoDeserializer<'de, de::value::Error> for Pair {
+ type Deserializer = Self;
+
+ fn into_deserializer(self) -> Self::Deserializer {
+ self
+ }
+}
+
+impl<'de> Deserializer<'de> for Pair {
+ type Error = de::value::Error;
+
+ fn deserialize_any<V>(self, visitor: V) -> std::result::Result<V::Value,
Self::Error>
+ where
+ V: Visitor<'de>,
+ {
+ self.1.into_deserializer().deserialize_any(visitor)
+ }
+
+ fn deserialize_bool<V>(self, visitor: V) -> std::result::Result<V::Value,
Self::Error>
+ where
+ V: Visitor<'de>,
+ {
+ match self.1.to_lowercase().as_str() {
+ "true" | "on" =>
true.into_deserializer().deserialize_bool(visitor),
+ "false" | "off" =>
false.into_deserializer().deserialize_bool(visitor),
+ _ => Err(de::Error::custom(format_args!(
+ "parse config '{}' with value '{}' failed for {:?}",
+ self.0, self.1, "invalid bool value"
+ ))),
+ }
+ }
+
+ fn deserialize_i8<V>(self, visitor: V) -> std::result::Result<V::Value,
Self::Error>
+ where
+ V: Visitor<'de>,
+ {
+ match self.1.parse::<i8>() {
+ Ok(val) => val.into_deserializer().deserialize_i8(visitor),
+ Err(e) => Err(de::Error::custom(format_args!(
+ "parse config '{}' with value '{}' failed for {:?}",
+ self.0, self.1, e
+ ))),
+ }
+ }
+
+ fn deserialize_i16<V>(self, visitor: V) -> std::result::Result<V::Value,
Self::Error>
+ where
+ V: Visitor<'de>,
+ {
+ match self.1.parse::<i16>() {
+ Ok(val) => val.into_deserializer().deserialize_i16(visitor),
+ Err(e) => Err(de::Error::custom(format_args!(
+ "parse config '{}' with value '{}' failed for {:?}",
+ self.0, self.1, e
+ ))),
+ }
+ }
+
+ fn deserialize_i32<V>(self, visitor: V) -> std::result::Result<V::Value,
Self::Error>
+ where
+ V: Visitor<'de>,
+ {
+ match self.1.parse::<i32>() {
+ Ok(val) => val.into_deserializer().deserialize_i32(visitor),
+ Err(e) => Err(de::Error::custom(format_args!(
+ "parse config '{}' with value '{}' failed for {:?}",
+ self.0, self.1, e
+ ))),
+ }
+ }
+
+ fn deserialize_i64<V>(self, visitor: V) -> std::result::Result<V::Value,
Self::Error>
+ where
+ V: Visitor<'de>,
+ {
+ match self.1.parse::<i64>() {
+ Ok(val) => val.into_deserializer().deserialize_i64(visitor),
+ Err(e) => Err(de::Error::custom(format_args!(
+ "parse config '{}' with value '{}' failed for {:?}",
+ self.0, self.1, e
+ ))),
+ }
+ }
+
+ fn deserialize_u8<V>(self, visitor: V) -> std::result::Result<V::Value,
Self::Error>
+ where
+ V: Visitor<'de>,
+ {
+ match self.1.parse::<u8>() {
+ Ok(val) => val.into_deserializer().deserialize_u8(visitor),
+ Err(e) => Err(de::Error::custom(format_args!(
+ "parse config '{}' with value '{}' failed for {:?}",
+ self.0, self.1, e
+ ))),
+ }
+ }
+
+ fn deserialize_u16<V>(self, visitor: V) -> std::result::Result<V::Value,
Self::Error>
+ where
+ V: Visitor<'de>,
+ {
+ match self.1.parse::<u16>() {
+ Ok(val) => val.into_deserializer().deserialize_u16(visitor),
+ Err(e) => Err(de::Error::custom(format_args!(
+ "parse config '{}' with value '{}' failed for {:?}",
+ self.0, self.1, e
+ ))),
+ }
+ }
+
+ fn deserialize_u32<V>(self, visitor: V) -> std::result::Result<V::Value,
Self::Error>
+ where
+ V: Visitor<'de>,
+ {
+ match self.1.parse::<u32>() {
+ Ok(val) => val.into_deserializer().deserialize_u32(visitor),
+ Err(e) => Err(de::Error::custom(format_args!(
+ "parse config '{}' with value '{}' failed for {:?}",
+ self.0, self.1, e
+ ))),
+ }
+ }
+
+ fn deserialize_u64<V>(self, visitor: V) -> std::result::Result<V::Value,
Self::Error>
+ where
+ V: Visitor<'de>,
+ {
+ match self.1.parse::<u64>() {
+ Ok(val) => val.into_deserializer().deserialize_u64(visitor),
+ Err(e) => Err(de::Error::custom(format_args!(
+ "parse config '{}' with value '{}' failed for {:?}",
+ self.0, self.1, e
+ ))),
+ }
+ }
+
+ fn deserialize_f32<V>(self, visitor: V) -> std::result::Result<V::Value,
Self::Error>
+ where
+ V: Visitor<'de>,
+ {
+ match self.1.parse::<f32>() {
+ Ok(val) => val.into_deserializer().deserialize_f32(visitor),
+ Err(e) => Err(de::Error::custom(format_args!(
+ "parse config '{}' with value '{}' failed for {:?}",
+ self.0, self.1, e
+ ))),
+ }
+ }
+
+ fn deserialize_f64<V>(self, visitor: V) -> std::result::Result<V::Value,
Self::Error>
+ where
+ V: Visitor<'de>,
+ {
+ match self.1.parse::<f64>() {
+ Ok(val) => val.into_deserializer().deserialize_f64(visitor),
+ Err(e) => Err(de::Error::custom(format_args!(
+ "parse config '{}' with value '{}' failed for {:?}",
+ self.0, self.1, e
+ ))),
+ }
+ }
+
+ fn deserialize_option<V>(self, visitor: V) ->
std::result::Result<V::Value, Self::Error>
+ where
+ V: Visitor<'de>,
+ {
+ if self.1.is_empty() {
+ visitor.visit_none()
+ } else {
+ visitor.visit_some(self)
+ }
+ }
+
+ fn deserialize_seq<V>(self, visitor: V) -> std::result::Result<V::Value,
Self::Error>
+ where
+ V: Visitor<'de>,
+ {
+ // Return empty instead of `[""]`.
+ if self.1.is_empty() {
+ SeqDeserializer::new(empty::<Pair>())
+ .deserialize_seq(visitor)
+ .map_err(|e| {
+ de::Error::custom(format_args!(
+ "parse config '{}' with value '{}' failed for {:?}",
+ self.0, self.1, e
+ ))
+ })
+ } else {
+ let values = self
+ .1
+ .split(',')
+ .map(|v| Pair(self.0.clone(), v.trim().to_owned()));
+ SeqDeserializer::new(values)
+ .deserialize_seq(visitor)
+ .map_err(|e| {
+ de::Error::custom(format_args!(
+ "parse config '{}' with value '{}' failed for {:?}",
+ self.0, self.1, e
+ ))
+ })
+ }
+ }
+
+ serde::forward_to_deserialize_any! {
+ char str string unit newtype_struct enum
+ bytes byte_buf map unit_struct tuple_struct
+ identifier tuple ignored_any
+ struct
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use serde::Deserialize;
+
+ #[derive(Debug, Default, Deserialize, Eq, PartialEq)]
+ #[serde(default)]
+ #[non_exhaustive]
+ pub struct TestConfig {
+ bool_value: bool,
+ bool_option_value_none: Option<bool>,
+ bool_option_value_some: Option<bool>,
+ bool_value_with_on: bool,
+ bool_value_with_off: bool,
+
+ string_value: String,
+ string_option_value_none: Option<String>,
+ string_option_value_some: Option<String>,
+
+ u8_value: u8,
+ u16_value: u16,
+ u32_value: u32,
+ u64_value: u64,
+ i8_value: i8,
+ i16_value: i16,
+ i32_value: i32,
+ i64_value: i64,
+
+ vec_value: Vec<String>,
+ vec_value_two: Vec<String>,
+ vec_none: Option<Vec<String>>,
+ vec_empty: Vec<String>,
+ }
+
+ #[test]
+ fn test_config_deserializer() {
+ let mut map = HashMap::new();
+ map.insert("bool_value", "true");
+ map.insert("bool_option_value_none", "");
+ map.insert("bool_option_value_some", "false");
+ map.insert("bool_value_with_on", "on");
+ map.insert("bool_value_with_off", "off");
+ map.insert("string_value", "hello");
+ map.insert("string_option_value_none", "");
+ map.insert("string_option_value_some", "hello");
+ map.insert("u8_value", "8");
+ map.insert("u16_value", "16");
+ map.insert("u32_value", "32");
+ map.insert("u64_value", "64");
+ map.insert("i8_value", "-8");
+ map.insert("i16_value", "16");
+ map.insert("i32_value", "-32");
+ map.insert("i64_value", "64");
+ map.insert("vec_value", "hello");
+ map.insert("vec_value_two", "hello,world");
+ map.insert("vec_none", "");
+ map.insert("vec_empty", "");
+ let map = map
+ .into_iter()
+ .map(|(k, v)| (k.to_string(), v.to_string()))
+ .collect();
+
+ let output =
TestConfig::deserialize(ConfigDeserializer::new(map)).unwrap();
+ assert_eq!(
+ output,
+ TestConfig {
+ bool_value: true,
+ bool_option_value_none: None,
+ bool_option_value_some: Some(false),
+ bool_value_with_on: true,
+ bool_value_with_off: false,
+ string_value: "hello".to_string(),
+ string_option_value_none: None,
+ string_option_value_some: Some("hello".to_string()),
+ u8_value: 8,
+ u16_value: 16,
+ u32_value: 32,
+ u64_value: 64,
+ i8_value: -8,
+ i16_value: 16,
+ i32_value: -32,
+ i64_value: 64,
+ vec_value: vec!["hello".to_string()],
+ vec_value_two: vec!["hello".to_string(), "world".to_string()],
+ vec_none: None,
+ vec_empty: vec![],
+ }
+ );
+ }
+
+ #[test]
+ fn test_part_config_deserializer() {
+ let mut map = HashMap::new();
+ map.insert("bool_value", "true");
+ let map = map
+ .into_iter()
+ .map(|(k, v)| (k.to_string(), v.to_string()))
+ .collect();
+
+ let output =
TestConfig::deserialize(ConfigDeserializer::new(map)).unwrap();
+ assert_eq!(
+ output,
+ TestConfig {
+ bool_value: true,
+ ..TestConfig::default()
+ }
+ );
+ }
+}
diff --git a/core/src/services/mod.rs b/core/src/services/mod.rs
index e9f9b4ea3..cb84289a3 100644
--- a/core/src/services/mod.rs
+++ b/core/src/services/mod.rs
@@ -147,6 +147,8 @@ pub use self::rocksdb::Rocksdb;
#[cfg(feature = "services-s3")]
mod s3;
#[cfg(feature = "services-s3")]
+pub use s3::S3Config;
+#[cfg(feature = "services-s3")]
pub use s3::S3;
#[cfg(feature = "services-sftp")]
diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs
index 1afa5cf02..840e05149 100644
--- a/core/src/services/s3/backend.rs
+++ b/core/src/services/s3/backend.rs
@@ -36,6 +36,7 @@ use reqsign::AwsConfig;
use reqsign::AwsCredentialLoad;
use reqsign::AwsDefaultLoader;
use reqsign::AwsV4Signer;
+use serde::Deserialize;
use super::core::*;
use super::error::parse_error;
@@ -59,51 +60,174 @@ static ENDPOINT_TEMPLATES: Lazy<HashMap<&'static str,
&'static str>> = Lazy::new
const DEFAULT_BATCH_MAX_OPERATIONS: usize = 1000;
+/// Config for AWS S3 and compatible services (including MinIO, DigitalOcean
Spaces, Tencent Cloud Object Storage (COS) and so on).
+#[derive(Default, Deserialize)]
+#[serde(default)]
+#[non_exhaustive]
+pub struct S3Config {
+ /// root of this backend.
+ ///
+ /// All operations will happen under this root.
+ ///
+ /// default to `/` if not set.
+ pub root: Option<String>,
+ /// bucket name of this backend.
+ ///
+ /// required.
+ pub bucket: String,
+ /// endpoint of this backend.
+ ///
+ /// Endpoint must be full uri, e.g.
+ ///
+ /// - AWS S3: `https://s3.amazonaws.com` or
`https://s3.{region}.amazonaws.com`
+ /// - Cloudflare R2: `https://<ACCOUNT_ID>.r2.cloudflarestorage.com`
+ /// - Aliyun OSS: `https://{region}.aliyuncs.com`
+ /// - Tencent COS: `https://cos.{region}.myqcloud.com`
+ /// - Minio: `http://127.0.0.1:9000`
+ ///
+ /// If user inputs endpoint without scheme like "s3.amazonaws.com", we
+ /// will prepend "https://" before it.
+ ///
+ /// default to `https://s3.amazonaws.com` if not set.
+ pub endpoint: Option<String>,
+    /// Region represents the signing region of this endpoint. This is required
+ /// if you are using the default AWS S3 endpoint.
+ ///
+ /// If using a custom endpoint,
+ /// - If region is set, we will take user's input first.
+ /// - If not, we will try to load it from environment.
+ pub region: Option<String>,
+
+ /// access_key_id of this backend.
+ ///
+ /// - If access_key_id is set, we will take user's input first.
+ /// - If not, we will try to load it from environment.
+ pub access_key_id: Option<String>,
+ /// secret_access_key of this backend.
+ ///
+ /// - If secret_access_key is set, we will take user's input first.
+ /// - If not, we will try to load it from environment.
+ pub secret_access_key: Option<String>,
+ /// security_token (aka, session token) of this backend.
+ ///
+    /// This token will expire after some time, so it's recommended to set
security_token
+ /// by hand.
+ pub security_token: Option<String>,
+ /// role_arn for this backend.
+ ///
+ /// If `role_arn` is set, we will use already known config as source
+ /// credential to assume role with `role_arn`.
+ pub role_arn: Option<String>,
+ /// external_id for this backend.
+ pub external_id: Option<String>,
+ /// Disable config load so that opendal will not load config from
+ /// environment.
+ ///
+    /// For example:
+ ///
+ /// - envs like `AWS_ACCESS_KEY_ID`
+ /// - files like `~/.aws/config`
+ pub disable_config_load: bool,
+    /// Disable loading credentials from EC2 metadata.
+    ///
+    /// This option disables opendal's default behavior of loading
+    /// credentials from EC2 metadata (a.k.a. IMDSv2).
+ pub disable_ec2_metadata: bool,
+    /// Allow opendal to send requests without signing when credentials
+    /// are not loaded.
+ pub allow_anonymous: bool,
+ /// server_side_encryption for this backend.
+ ///
+ /// Available values: `AES256`, `aws:kms`.
+ pub server_side_encryption: Option<String>,
+ /// server_side_encryption_aws_kms_key_id for this backend
+ ///
+    /// - If `server_side_encryption` is set to `aws:kms`, and
`server_side_encryption_aws_kms_key_id`
+    /// is not set, S3 will use the AWS-managed KMS key to encrypt data.
+    /// - If `server_side_encryption` is set to `aws:kms`, and
`server_side_encryption_aws_kms_key_id`
+    /// is a valid KMS key id, S3 will use the provided KMS key to encrypt
data.
+ /// - If the `server_side_encryption_aws_kms_key_id` is invalid or not
found, an error will be
+ /// returned.
+ /// - If `server_side_encryption` is not `aws:kms`, setting
`server_side_encryption_aws_kms_key_id`
+ /// is a noop.
+ pub server_side_encryption_aws_kms_key_id: Option<String>,
+ /// server_side_encryption_customer_algorithm for this backend.
+ ///
+ /// Available values: `AES256`.
+ pub server_side_encryption_customer_algorithm: Option<String>,
+ /// server_side_encryption_customer_key for this backend.
+ ///
+ /// # Value
+ ///
+ /// base64 encoded key that matches algorithm specified in
+ /// `server_side_encryption_customer_algorithm`.
+ pub server_side_encryption_customer_key: Option<String>,
+ /// Set server_side_encryption_customer_key_md5 for this backend.
+ ///
+ /// # Value
+ ///
+ /// MD5 digest of key specified in `server_side_encryption_customer_key`.
+ pub server_side_encryption_customer_key_md5: Option<String>,
+ /// default storage_class for this backend.
+ ///
+ /// Available values:
+ /// - `DEEP_ARCHIVE`
+ /// - `GLACIER`
+ /// - `GLACIER_IR`
+ /// - `INTELLIGENT_TIERING`
+ /// - `ONEZONE_IA`
+ /// - `OUTPOSTS`
+ /// - `REDUCED_REDUNDANCY`
+ /// - `STANDARD`
+ /// - `STANDARD_IA`
+ ///
+    /// S3-compatible services don't support all of them.
+ pub default_storage_class: Option<String>,
+ /// Enable virtual host style so that opendal will send API requests
+ /// in virtual host style instead of path style.
+ ///
+ /// - By default, opendal will send API to
`https://s3.us-east-1.amazonaws.com/bucket_name`
+ /// - Enabled, opendal will send API to
`https://bucket_name.s3.us-east-1.amazonaws.com`
+ pub enable_virtual_host_style: bool,
+ /// Set maximum batch operations of this backend.
+ ///
+ /// Some compatible services have a limit on the number of operations in a
batch request.
+    /// For example, R2 could return `Internal Error` when batch deleting 1000
files.
+ ///
+    /// Please tune this value based on the service's documentation.
+ pub batch_max_operations: Option<usize>,
+}
+
+impl Debug for S3Config {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ let mut d = f.debug_struct("S3Config");
+
+ d.field("root", &self.root)
+ .field("bucket", &self.bucket)
+ .field("endpoint", &self.endpoint)
+ .field("region", &self.region);
+
+ d.finish_non_exhaustive()
+ }
+}
+
/// Aws S3 and compatible services (including minio, digitalocean space,
Tencent Cloud Object Storage(COS) and so on) support.
/// For more information about s3-compatible services, refer to [Compatible
Services](#compatible-services).
#[doc = include_str!("docs.md")]
#[doc = include_str!("compatible_services.md")]
#[derive(Default)]
pub struct S3Builder {
- root: Option<String>,
-
- bucket: String,
- endpoint: Option<String>,
- region: Option<String>,
-
- // Credentials related values.
- access_key_id: Option<String>,
- secret_access_key: Option<String>,
- security_token: Option<String>,
- role_arn: Option<String>,
- external_id: Option<String>,
- disable_config_load: bool,
- disable_ec2_metadata: bool,
- allow_anonymous: bool,
- customed_credential_load: Option<Box<dyn AwsCredentialLoad>>,
-
- // S3 feature
- server_side_encryption: Option<String>,
- server_side_encryption_aws_kms_key_id: Option<String>,
- server_side_encryption_customer_algorithm: Option<String>,
- server_side_encryption_customer_key: Option<String>,
- server_side_encryption_customer_key_md5: Option<String>,
- default_storage_class: Option<String>,
- enable_virtual_host_style: bool,
- batch_max_operations: Option<usize>,
+ config: S3Config,
+ customed_credential_load: Option<Box<dyn AwsCredentialLoad>>,
http_client: Option<HttpClient>,
}
impl Debug for S3Builder {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
- let mut d = f.debug_struct("Builder");
-
- d.field("root", &self.root)
- .field("bucket", &self.bucket)
- .field("endpoint", &self.endpoint)
- .field("region", &self.region);
+ let mut d = f.debug_struct("S3Builder");
+ d.field("config", &self.config);
d.finish_non_exhaustive()
}
}
@@ -113,7 +237,7 @@ impl S3Builder {
///
/// All operations will happen under this root.
pub fn root(&mut self, root: &str) -> &mut Self {
- self.root = if root.is_empty() {
+ self.config.root = if root.is_empty() {
None
} else {
Some(root.to_string())
@@ -124,7 +248,7 @@ impl S3Builder {
/// Set bucket name of this backend.
pub fn bucket(&mut self, bucket: &str) -> &mut Self {
- self.bucket = bucket.to_string();
+ self.config.bucket = bucket.to_string();
self
}
@@ -144,7 +268,7 @@ impl S3Builder {
pub fn endpoint(&mut self, endpoint: &str) -> &mut Self {
if !endpoint.is_empty() {
// Trim trailing `/` so that we can accept `http://127.0.0.1:9000/`
- self.endpoint = Some(endpoint.trim_end_matches('/').to_string())
+ self.config.endpoint =
Some(endpoint.trim_end_matches('/').to_string())
}
self
@@ -158,7 +282,7 @@ impl S3Builder {
/// - If not, we will try to load it from environment.
pub fn region(&mut self, region: &str) -> &mut Self {
if !region.is_empty() {
- self.region = Some(region.to_string())
+ self.config.region = Some(region.to_string())
}
self
@@ -170,7 +294,7 @@ impl S3Builder {
/// - If not, we will try to load it from environment.
pub fn access_key_id(&mut self, v: &str) -> &mut Self {
if !v.is_empty() {
- self.access_key_id = Some(v.to_string())
+ self.config.access_key_id = Some(v.to_string())
}
self
@@ -182,7 +306,7 @@ impl S3Builder {
/// - If not, we will try to load it from environment.
pub fn secret_access_key(&mut self, v: &str) -> &mut Self {
if !v.is_empty() {
- self.secret_access_key = Some(v.to_string())
+ self.config.secret_access_key = Some(v.to_string())
}
self
@@ -194,7 +318,7 @@ impl S3Builder {
/// credential to assume role with `role_arn`.
pub fn role_arn(&mut self, v: &str) -> &mut Self {
if !v.is_empty() {
- self.role_arn = Some(v.to_string())
+ self.config.role_arn = Some(v.to_string())
}
self
@@ -203,7 +327,7 @@ impl S3Builder {
/// Set external_id for this backend.
pub fn external_id(&mut self, v: &str) -> &mut Self {
if !v.is_empty() {
- self.external_id = Some(v.to_string())
+ self.config.external_id = Some(v.to_string())
}
self
@@ -223,7 +347,7 @@ impl S3Builder {
/// - `STANDARD_IA`
pub fn default_storage_class(&mut self, v: &str) -> &mut Self {
if !v.is_empty() {
- self.default_storage_class = Some(v.to_string())
+ self.config.default_storage_class = Some(v.to_string())
}
self
@@ -241,7 +365,7 @@ impl S3Builder {
/// Please use `server_side_encryption_with_*` helpers if even possible.
pub fn server_side_encryption(&mut self, v: &str) -> &mut Self {
if !v.is_empty() {
- self.server_side_encryption = Some(v.to_string())
+ self.config.server_side_encryption = Some(v.to_string())
}
self
@@ -266,7 +390,7 @@ impl S3Builder {
/// Please use `server_side_encryption_with_*` helpers if even possible.
pub fn server_side_encryption_aws_kms_key_id(&mut self, v: &str) -> &mut
Self {
if !v.is_empty() {
- self.server_side_encryption_aws_kms_key_id = Some(v.to_string())
+ self.config.server_side_encryption_aws_kms_key_id =
Some(v.to_string())
}
self
@@ -284,7 +408,7 @@ impl S3Builder {
/// Please use `server_side_encryption_with_*` helpers if even possible.
pub fn server_side_encryption_customer_algorithm(&mut self, v: &str) ->
&mut Self {
if !v.is_empty() {
- self.server_side_encryption_customer_algorithm =
Some(v.to_string())
+ self.config.server_side_encryption_customer_algorithm =
Some(v.to_string())
}
self
@@ -305,7 +429,7 @@ impl S3Builder {
/// Please use `server_side_encryption_with_*` helpers if even possible.
pub fn server_side_encryption_customer_key(&mut self, v: &str) -> &mut
Self {
if !v.is_empty() {
- self.server_side_encryption_customer_key = Some(v.to_string())
+ self.config.server_side_encryption_customer_key =
Some(v.to_string())
}
self
@@ -325,7 +449,7 @@ impl S3Builder {
/// Please use `server_side_encryption_with_*` helpers if even possible.
pub fn server_side_encryption_customer_key_md5(&mut self, v: &str) -> &mut
Self {
if !v.is_empty() {
- self.server_side_encryption_customer_key_md5 = Some(v.to_string())
+ self.config.server_side_encryption_customer_key_md5 =
Some(v.to_string())
}
self
@@ -337,7 +461,7 @@ impl S3Builder {
///
/// NOTE: This function should not be used along with other
`server_side_encryption_with_` functions.
pub fn server_side_encryption_with_aws_managed_kms_key(&mut self) -> &mut
Self {
- self.server_side_encryption = Some("aws:kms".to_string());
+ self.config.server_side_encryption = Some("aws:kms".to_string());
self
}
@@ -350,8 +474,8 @@ impl S3Builder {
&mut self,
aws_kms_key_id: &str,
) -> &mut Self {
- self.server_side_encryption = Some("aws:kms".to_string());
- self.server_side_encryption_aws_kms_key_id =
Some(aws_kms_key_id.to_string());
+ self.config.server_side_encryption = Some("aws:kms".to_string());
+ self.config.server_side_encryption_aws_kms_key_id =
Some(aws_kms_key_id.to_string());
self
}
@@ -361,7 +485,7 @@ impl S3Builder {
///
/// NOTE: This function should not be used along with other
`server_side_encryption_with_` functions.
pub fn server_side_encryption_with_s3_key(&mut self) -> &mut Self {
- self.server_side_encryption = Some("AES256".to_string());
+ self.config.server_side_encryption = Some("AES256".to_string());
self
}
@@ -375,9 +499,9 @@ impl S3Builder {
algorithm: &str,
key: &[u8],
) -> &mut Self {
- self.server_side_encryption_customer_algorithm =
Some(algorithm.to_string());
- self.server_side_encryption_customer_key =
Some(BASE64_STANDARD.encode(key));
- self.server_side_encryption_customer_key_md5 =
+ self.config.server_side_encryption_customer_algorithm =
Some(algorithm.to_string());
+ self.config.server_side_encryption_customer_key =
Some(BASE64_STANDARD.encode(key));
+ self.config.server_side_encryption_customer_key_md5 =
Some(BASE64_STANDARD.encode(Md5::digest(key).as_slice()));
self
}
@@ -389,7 +513,7 @@ impl S3Builder {
/// security token's lifetime is short and requires users to refresh in
time.
pub fn security_token(&mut self, token: &str) -> &mut Self {
if !token.is_empty() {
- self.security_token = Some(token.to_string());
+ self.config.security_token = Some(token.to_string());
}
self
}
@@ -402,7 +526,7 @@ impl S3Builder {
/// - envs like `AWS_ACCESS_KEY_ID`
/// - files like `~/.aws/config`
pub fn disable_config_load(&mut self) -> &mut Self {
- self.disable_config_load = true;
+ self.config.disable_config_load = true;
self
}
@@ -411,14 +535,14 @@ impl S3Builder {
/// This option is used to disable the default behavior of opendal
/// to load credential from ec2 metadata, a.k.a, IMDSv2
pub fn disable_ec2_metadata(&mut self) -> &mut Self {
- self.disable_ec2_metadata = true;
+ self.config.disable_ec2_metadata = true;
self
}
/// Allow anonymous will allow opendal to send request without signing
/// when credential is not loaded.
pub fn allow_anonymous(&mut self) -> &mut Self {
- self.allow_anonymous = true;
+ self.config.allow_anonymous = true;
self
}
@@ -428,7 +552,7 @@ impl S3Builder {
/// - By default, opendal will send API to
`https://s3.us-east-1.amazonaws.com/bucket_name`
/// - Enabled, opendal will send API to
`https://bucket_name.s3.us-east-1.amazonaws.com`
pub fn enable_virtual_host_style(&mut self) -> &mut Self {
- self.enable_virtual_host_style = true;
+ self.config.enable_virtual_host_style = true;
self
}
@@ -456,13 +580,13 @@ impl S3Builder {
/// `bucket` must be not empty and if `enable_virtual_host_style` is true
/// it couldn't contain dot(.) character
fn is_bucket_valid(&self) -> bool {
- if self.bucket.is_empty() {
+ if self.config.bucket.is_empty() {
return false;
}
// If enable virtual host style, `bucket` will reside in domain part,
// for example `https://bucket_name.s3.us-east-1.amazonaws.com`,
// so `bucket` with dot can't be recognized correctly for this format.
- if self.enable_virtual_host_style && self.bucket.contains('.') {
+ if self.config.enable_virtual_host_style &&
self.config.bucket.contains('.') {
return false;
}
true
@@ -473,10 +597,10 @@ impl S3Builder {
let bucket = {
debug_assert!(self.is_bucket_valid(), "bucket must be valid");
- self.bucket.as_str()
+ self.config.bucket.as_str()
};
- let mut endpoint = match &self.endpoint {
+ let mut endpoint = match &self.config.endpoint {
Some(endpoint) => {
if endpoint.starts_with("http") {
endpoint.to_string()
@@ -501,7 +625,7 @@ impl S3Builder {
};
// Apply virtual host style.
- if self.enable_virtual_host_style {
+ if self.config.enable_virtual_host_style {
endpoint = endpoint.replace("//", &format!("//{bucket}."))
} else {
write!(endpoint, "/{bucket}").expect("write into string must
succeed");
@@ -512,7 +636,7 @@ impl S3Builder {
/// Set maximum batch operations of this backend.
pub fn batch_max_operations(&mut self, batch_max_operations: usize) ->
&mut Self {
- self.batch_max_operations = Some(batch_max_operations);
+ self.config.batch_max_operations = Some(batch_max_operations);
self
}
@@ -638,57 +762,25 @@ impl Builder for S3Builder {
type Accessor = S3Backend;
fn from_map(map: HashMap<String, String>) -> Self {
- let mut builder = S3Builder::default();
-
- map.get("root").map(|v| builder.root(v));
- map.get("bucket").map(|v| builder.bucket(v));
- map.get("endpoint").map(|v| builder.endpoint(v));
- map.get("region").map(|v| builder.region(v));
- map.get("access_key_id").map(|v| builder.access_key_id(v));
- map.get("secret_access_key")
- .map(|v| builder.secret_access_key(v));
- map.get("security_token").map(|v| builder.security_token(v));
- map.get("role_arn").map(|v| builder.role_arn(v));
- map.get("external_id").map(|v| builder.external_id(v));
- map.get("server_side_encryption")
- .map(|v| builder.server_side_encryption(v));
- map.get("server_side_encryption_aws_kms_key_id")
- .map(|v| builder.server_side_encryption_aws_kms_key_id(v));
- map.get("server_side_encryption_customer_algorithm")
- .map(|v| builder.server_side_encryption_customer_algorithm(v));
- map.get("server_side_encryption_customer_key")
- .map(|v| builder.server_side_encryption_customer_key(v));
- map.get("server_side_encryption_customer_key_md5")
- .map(|v| builder.server_side_encryption_customer_key_md5(v));
- map.get("disable_config_load")
- .filter(|v| *v == "on" || *v == "true")
- .map(|_| builder.disable_config_load());
- map.get("disable_ec2_metadata")
- .filter(|v| *v == "on" || *v == "true")
- .map(|_| builder.disable_ec2_metadata());
- map.get("enable_virtual_host_style")
- .filter(|v| *v == "on" || *v == "true")
- .map(|_| builder.enable_virtual_host_style());
- map.get("allow_anonymous")
- .filter(|v| *v == "on" || *v == "true")
- .map(|_| builder.allow_anonymous());
- map.get("default_storage_class")
- .map(|v: &String| builder.default_storage_class(v));
- map.get("batch_max_operations")
- .map(|v| builder.batch_max_operations(v.parse().expect("input must
be a number")));
-
- builder
+ let config = S3Config::deserialize(ConfigDeserializer::new(map))
+ .expect("config deserialize must succeed");
+
+ S3Builder {
+ config,
+ customed_credential_load: None,
+ http_client: None,
+ }
}
fn build(&mut self) -> Result<Self::Accessor> {
debug!("backend build started: {:?}", &self);
- let root = normalize_root(&self.root.take().unwrap_or_default());
+ let root =
normalize_root(&self.config.root.clone().unwrap_or_default());
debug!("backend use root {}", &root);
// Handle bucket name.
let bucket = if self.is_bucket_valid() {
- Ok(&self.bucket)
+ Ok(&self.config.bucket)
} else {
Err(
Error::new(ErrorKind::ConfigInvalid, "The bucket is
misconfigured")
@@ -697,14 +789,14 @@ impl Builder for S3Builder {
}?;
debug!("backend use bucket {}", &bucket);
- let default_storage_class = match &self.default_storage_class {
+ let default_storage_class = match &self.config.default_storage_class {
None => None,
Some(v) => Some(
build_header_value(v).map_err(|err| err.with_context("key",
"storage_class"))?,
),
};
- let server_side_encryption = match &self.server_side_encryption {
+ let server_side_encryption = match &self.config.server_side_encryption
{
None => None,
Some(v) => Some(
build_header_value(v)
@@ -713,7 +805,7 @@ impl Builder for S3Builder {
};
let server_side_encryption_aws_kms_key_id =
- match &self.server_side_encryption_aws_kms_key_id {
+ match &self.config.server_side_encryption_aws_kms_key_id {
None => None,
Some(v) => Some(build_header_value(v).map_err(|err| {
err.with_context("key",
"server_side_encryption_aws_kms_key_id")
@@ -721,7 +813,7 @@ impl Builder for S3Builder {
};
let server_side_encryption_customer_algorithm =
- match &self.server_side_encryption_customer_algorithm {
+ match &self.config.server_side_encryption_customer_algorithm {
None => None,
Some(v) => Some(build_header_value(v).map_err(|err| {
err.with_context("key",
"server_side_encryption_customer_algorithm")
@@ -729,7 +821,7 @@ impl Builder for S3Builder {
};
let server_side_encryption_customer_key =
- match &self.server_side_encryption_customer_key {
+ match &self.config.server_side_encryption_customer_key {
None => None,
Some(v) => Some(build_header_value(v).map_err(|err| {
err.with_context("key",
"server_side_encryption_customer_key")
@@ -737,7 +829,7 @@ impl Builder for S3Builder {
};
let server_side_encryption_customer_key_md5 =
- match &self.server_side_encryption_customer_key_md5 {
+ match &self.config.server_side_encryption_customer_key_md5 {
None => None,
Some(v) => Some(build_header_value(v).map_err(|err| {
err.with_context("key",
"server_side_encryption_customer_key_md5")
@@ -755,12 +847,12 @@ impl Builder for S3Builder {
// This is our current config.
let mut cfg = AwsConfig::default();
- if !self.disable_config_load {
+ if !self.config.disable_config_load {
cfg = cfg.from_profile();
cfg = cfg.from_env();
}
- if let Some(v) = self.region.take() {
+ if let Some(v) = self.config.region.take() {
cfg.region = Some(v);
}
if cfg.region.is_none() {
@@ -780,13 +872,13 @@ impl Builder for S3Builder {
debug!("backend use endpoint: {endpoint}");
// Setting all value from user input if available.
- if let Some(v) = self.access_key_id.take() {
+ if let Some(v) = self.config.access_key_id.take() {
cfg.access_key_id = Some(v)
}
- if let Some(v) = self.secret_access_key.take() {
+ if let Some(v) = self.config.secret_access_key.take() {
cfg.secret_access_key = Some(v)
}
- if let Some(v) = self.security_token.take() {
+ if let Some(v) = self.config.security_token.take() {
cfg.session_token = Some(v)
}
@@ -797,7 +889,7 @@ impl Builder for S3Builder {
}
// If role_arn is set, we must use AssumeRoleLoad.
- if let Some(role_arn) = self.role_arn.take() {
+ if let Some(role_arn) = self.config.role_arn.take() {
// use current env as source credential loader.
let default_loader = AwsDefaultLoader::new(client.client(),
cfg.clone());
@@ -805,7 +897,7 @@ impl Builder for S3Builder {
let assume_role_cfg = AwsConfig {
region: Some(region.clone()),
role_arn: Some(role_arn),
- external_id: self.external_id.clone(),
+ external_id: self.config.external_id.clone(),
sts_regional_endpoints: "regional".to_string(),
..Default::default()
};
@@ -829,7 +921,7 @@ impl Builder for S3Builder {
Some(v) => v,
None => {
let mut default_loader =
AwsDefaultLoader::new(client.client(), cfg);
- if self.disable_ec2_metadata {
+ if self.config.disable_ec2_metadata {
default_loader =
default_loader.with_disable_ec2_metadata();
}
@@ -840,6 +932,7 @@ impl Builder for S3Builder {
let signer = AwsV4Signer::new("s3", ®ion);
let batch_max_operations = self
+ .config
.batch_max_operations
.unwrap_or(DEFAULT_BATCH_MAX_OPERATIONS);
debug!("backend build finished");
@@ -854,7 +947,7 @@ impl Builder for S3Builder {
server_side_encryption_customer_key,
server_side_encryption_customer_key_md5,
default_storage_class,
- allow_anonymous: self.allow_anonymous,
+ allow_anonymous: self.config.allow_anonymous,
signer,
loader,
client,
diff --git a/core/src/services/s3/mod.rs b/core/src/services/s3/mod.rs
index 876dfacb4..9e343f9c1 100644
--- a/core/src/services/s3/mod.rs
+++ b/core/src/services/s3/mod.rs
@@ -17,6 +17,7 @@
mod backend;
pub use backend::S3Builder as S3;
+pub use backend::S3Config;
mod core;
mod error;