This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch config-refactor in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit 6da129cbfa7d7a0ba5f1d1553f70705cba9602dd Author: Xuanwo <[email protected]> AuthorDate: Mon Nov 6 20:30:49 2023 +0800 feat(services): Implement ConfigDeserializer and add S3Config as example Signed-off-by: Xuanwo <[email protected]> --- core/src/raw/serde_util.rs | 372 ++++++++++++++++++++++++++++++++++++++++ core/src/services/mod.rs | 2 + core/src/services/s3/backend.rs | 335 +++++++++++++++++++++++------------- core/src/services/s3/mod.rs | 1 + 4 files changed, 589 insertions(+), 121 deletions(-) diff --git a/core/src/raw/serde_util.rs b/core/src/raw/serde_util.rs index 5d04ede15..93c2cae3b 100644 --- a/core/src/raw/serde_util.rs +++ b/core/src/raw/serde_util.rs @@ -15,6 +15,12 @@ // specific language governing permissions and limitations // under the License. +use serde::de::value::{MapDeserializer, SeqDeserializer}; +use serde::de::{self, Deserializer, IntoDeserializer, Visitor}; +use std::collections::hash_map::IntoIter; +use std::collections::HashMap; +use std::iter::empty; + use crate::*; /// Parse xml deserialize error into opendal::Error. @@ -31,3 +37,369 @@ pub fn new_json_serialize_error(e: serde_json::Error) -> Error { pub fn new_json_deserialize_error(e: serde_json::Error) -> Error { Error::new(ErrorKind::Unexpected, "deserialize json").set_source(e) } + +/// ConfigDeserializer is used to deserialize given configs from `HashMap<String, String>`. +/// +/// This is only used by our services config. +pub struct ConfigDeserializer(MapDeserializer<'static, Pairs, de::value::Error>); + +impl ConfigDeserializer { + /// Create a new config deserializer. 
+ pub fn new(map: HashMap<String, String>) -> Self { + let pairs = Pairs(map.into_iter()); + Self(MapDeserializer::new(pairs)) + } +} + +impl<'de> Deserializer<'de> for ConfigDeserializer { + type Error = de::value::Error; + + fn deserialize_any<V>(self, visitor: V) -> std::result::Result<V::Value, Self::Error> + where + V: Visitor<'de>, + { + self.deserialize_map(visitor) + } + + fn deserialize_map<V>(self, visitor: V) -> std::result::Result<V::Value, Self::Error> + where + V: Visitor<'de>, + { + visitor.visit_map(self.0) + } + + serde::forward_to_deserialize_any! { + bool u8 u16 u32 u64 i8 i16 i32 i64 f32 f64 char str string unit seq + bytes byte_buf unit_struct tuple_struct + identifier tuple ignored_any option newtype_struct enum + struct + } +} + +/// Pairs is used to implement Iterator to meet the requirement of [`MapDeserializer`]. +struct Pairs(IntoIter<String, String>); + +impl Iterator for Pairs { + type Item = (String, Pair); + + fn next(&mut self) -> Option<Self::Item> { + self.0.next().map(|(k, v)| (k.to_lowercase(), Pair(k, v))) + } +} + +/// Pair is used to hold both key and value of a config for better error output. 
+struct Pair(String, String); + +impl<'de> IntoDeserializer<'de, de::value::Error> for Pair { + type Deserializer = Self; + + fn into_deserializer(self) -> Self::Deserializer { + self + } +} + +impl<'de> Deserializer<'de> for Pair { + type Error = de::value::Error; + + fn deserialize_any<V>(self, visitor: V) -> std::result::Result<V::Value, Self::Error> + where + V: Visitor<'de>, + { + self.1.into_deserializer().deserialize_any(visitor) + } + + fn deserialize_bool<V>(self, visitor: V) -> std::result::Result<V::Value, Self::Error> + where + V: Visitor<'de>, + { + match self.1.parse::<bool>() { + Ok(val) => val.into_deserializer().deserialize_bool(visitor), + Err(e) => Err(de::Error::custom(format_args!( + "parse config '{}' with value '{}' failed for {:?}", + self.0, self.1, e + ))), + } + } + + fn deserialize_i8<V>(self, visitor: V) -> std::result::Result<V::Value, Self::Error> + where + V: Visitor<'de>, + { + match self.1.parse::<i8>() { + Ok(val) => val.into_deserializer().deserialize_i8(visitor), + Err(e) => Err(de::Error::custom(format_args!( + "parse config '{}' with value '{}' failed for {:?}", + self.0, self.1, e + ))), + } + } + + fn deserialize_i16<V>(self, visitor: V) -> std::result::Result<V::Value, Self::Error> + where + V: Visitor<'de>, + { + match self.1.parse::<i16>() { + Ok(val) => val.into_deserializer().deserialize_i16(visitor), + Err(e) => Err(de::Error::custom(format_args!( + "parse config '{}' with value '{}' failed for {:?}", + self.0, self.1, e + ))), + } + } + + fn deserialize_i32<V>(self, visitor: V) -> std::result::Result<V::Value, Self::Error> + where + V: Visitor<'de>, + { + match self.1.parse::<i32>() { + Ok(val) => val.into_deserializer().deserialize_i32(visitor), + Err(e) => Err(de::Error::custom(format_args!( + "parse config '{}' with value '{}' failed for {:?}", + self.0, self.1, e + ))), + } + } + + fn deserialize_i64<V>(self, visitor: V) -> std::result::Result<V::Value, Self::Error> + where + V: Visitor<'de>, + { + match 
self.1.parse::<i64>() { + Ok(val) => val.into_deserializer().deserialize_i64(visitor), + Err(e) => Err(de::Error::custom(format_args!( + "parse config '{}' with value '{}' failed for {:?}", + self.0, self.1, e + ))), + } + } + + fn deserialize_u8<V>(self, visitor: V) -> std::result::Result<V::Value, Self::Error> + where + V: Visitor<'de>, + { + match self.1.parse::<u8>() { + Ok(val) => val.into_deserializer().deserialize_u8(visitor), + Err(e) => Err(de::Error::custom(format_args!( + "parse config '{}' with value '{}' failed for {:?}", + self.0, self.1, e + ))), + } + } + + fn deserialize_u16<V>(self, visitor: V) -> std::result::Result<V::Value, Self::Error> + where + V: Visitor<'de>, + { + match self.1.parse::<u16>() { + Ok(val) => val.into_deserializer().deserialize_u16(visitor), + Err(e) => Err(de::Error::custom(format_args!( + "parse config '{}' with value '{}' failed for {:?}", + self.0, self.1, e + ))), + } + } + + fn deserialize_u32<V>(self, visitor: V) -> std::result::Result<V::Value, Self::Error> + where + V: Visitor<'de>, + { + match self.1.parse::<u32>() { + Ok(val) => val.into_deserializer().deserialize_u32(visitor), + Err(e) => Err(de::Error::custom(format_args!( + "parse config '{}' with value '{}' failed for {:?}", + self.0, self.1, e + ))), + } + } + + fn deserialize_u64<V>(self, visitor: V) -> std::result::Result<V::Value, Self::Error> + where + V: Visitor<'de>, + { + match self.1.parse::<u64>() { + Ok(val) => val.into_deserializer().deserialize_u64(visitor), + Err(e) => Err(de::Error::custom(format_args!( + "parse config '{}' with value '{}' failed for {:?}", + self.0, self.1, e + ))), + } + } + + fn deserialize_f32<V>(self, visitor: V) -> std::result::Result<V::Value, Self::Error> + where + V: Visitor<'de>, + { + match self.1.parse::<f32>() { + Ok(val) => val.into_deserializer().deserialize_f32(visitor), + Err(e) => Err(de::Error::custom(format_args!( + "parse config '{}' with value '{}' failed for {:?}", + self.0, self.1, e + ))), + } + } + + fn 
deserialize_f64<V>(self, visitor: V) -> std::result::Result<V::Value, Self::Error> + where + V: Visitor<'de>, + { + match self.1.parse::<f64>() { + Ok(val) => val.into_deserializer().deserialize_f64(visitor), + Err(e) => Err(de::Error::custom(format_args!( + "parse config '{}' with value '{}' failed for {:?}", + self.0, self.1, e + ))), + } + } + + fn deserialize_option<V>(self, visitor: V) -> std::result::Result<V::Value, Self::Error> + where + V: Visitor<'de>, + { + if self.1.is_empty() { + visitor.visit_none() + } else { + visitor.visit_some(self) + } + } + + fn deserialize_seq<V>(self, visitor: V) -> std::result::Result<V::Value, Self::Error> + where + V: Visitor<'de>, + { + // Return empty instead of `[""]`. + if self.1.is_empty() { + SeqDeserializer::new(empty::<Pair>()) + .deserialize_seq(visitor) + .map_err(|e| { + de::Error::custom(format_args!( + "parse config '{}' with value '{}' failed for {:?}", + self.0, self.1, e + )) + }) + } else { + let values = self + .1 + .split(',') + .map(|v| Pair(self.0.clone(), v.trim().to_owned())); + SeqDeserializer::new(values) + .deserialize_seq(visitor) + .map_err(|e| { + de::Error::custom(format_args!( + "parse config '{}' with value '{}' failed for {:?}", + self.0, self.1, e + )) + }) + } + } + + serde::forward_to_deserialize_any! 
{ + char str string unit newtype_struct enum + bytes byte_buf map unit_struct tuple_struct + identifier tuple ignored_any + struct + } +} + +#[cfg(test)] +mod tests { + use super::*; + use serde::Deserialize; + + #[derive(Debug, Default, Deserialize, Eq, PartialEq)] + #[serde(default)] + #[non_exhaustive] + pub struct TestConfig { + bool_value: bool, + bool_option_value_none: Option<bool>, + bool_option_value_some: Option<bool>, + + string_value: String, + string_option_value_none: Option<String>, + string_option_value_some: Option<String>, + + u8_value: u8, + u16_value: u16, + u32_value: u32, + u64_value: u64, + i8_value: i8, + i16_value: i16, + i32_value: i32, + i64_value: i64, + + vec_value: Vec<String>, + vec_value_two: Vec<String>, + vec_none: Option<Vec<String>>, + vec_empty: Vec<String>, + } + + #[test] + fn test_config_deserializer() { + let mut map = HashMap::new(); + map.insert("bool_value", "true"); + map.insert("bool_option_value_none", ""); + map.insert("bool_option_value_some", "false"); + map.insert("string_value", "hello"); + map.insert("string_option_value_none", ""); + map.insert("string_option_value_some", "hello"); + map.insert("u8_value", "8"); + map.insert("u16_value", "16"); + map.insert("u32_value", "32"); + map.insert("u64_value", "64"); + map.insert("i8_value", "-8"); + map.insert("i16_value", "16"); + map.insert("i32_value", "-32"); + map.insert("i64_value", "64"); + map.insert("vec_value", "hello"); + map.insert("vec_value_two", "hello,world"); + map.insert("vec_none", ""); + map.insert("vec_empty", ""); + let map = map + .into_iter() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect(); + + let output = TestConfig::deserialize(ConfigDeserializer::new(map)).unwrap(); + assert_eq!( + output, + TestConfig { + bool_value: true, + bool_option_value_none: None, + bool_option_value_some: Some(false), + string_value: "hello".to_string(), + string_option_value_none: None, + string_option_value_some: Some("hello".to_string()), + u8_value: 
8, + u16_value: 16, + u32_value: 32, + u64_value: 64, + i8_value: -8, + i16_value: 16, + i32_value: -32, + i64_value: 64, + vec_value: vec!["hello".to_string()], + vec_value_two: vec!["hello".to_string(), "world".to_string()], + vec_none: None, + vec_empty: vec![], + } + ); + } + + #[test] + fn test_part_config_deserializer() { + let mut map = HashMap::new(); + map.insert("bool_value", "true"); + let map = map + .into_iter() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect(); + + let output = TestConfig::deserialize(ConfigDeserializer::new(map)).unwrap(); + assert_eq!( + output, + TestConfig { + bool_value: true, + ..TestConfig::default() + } + ); + } +} diff --git a/core/src/services/mod.rs b/core/src/services/mod.rs index e9f9b4ea3..cb84289a3 100644 --- a/core/src/services/mod.rs +++ b/core/src/services/mod.rs @@ -147,6 +147,8 @@ pub use self::rocksdb::Rocksdb; #[cfg(feature = "services-s3")] mod s3; #[cfg(feature = "services-s3")] +pub use s3::S3Config; +#[cfg(feature = "services-s3")] pub use s3::S3; #[cfg(feature = "services-sftp")] diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs index 1afa5cf02..840e05149 100644 --- a/core/src/services/s3/backend.rs +++ b/core/src/services/s3/backend.rs @@ -36,6 +36,7 @@ use reqsign::AwsConfig; use reqsign::AwsCredentialLoad; use reqsign::AwsDefaultLoader; use reqsign::AwsV4Signer; +use serde::Deserialize; use super::core::*; use super::error::parse_error; @@ -59,51 +60,174 @@ static ENDPOINT_TEMPLATES: Lazy<HashMap<&'static str, &'static str>> = Lazy::new const DEFAULT_BATCH_MAX_OPERATIONS: usize = 1000; +/// Config for Aws S3 and compatible services (including minio, digitalocean space, Tencent Cloud Object Storage(COS) and so on) support. +#[derive(Default, Deserialize)] +#[serde(default)] +#[non_exhaustive] +pub struct S3Config { + /// root of this backend. + /// + /// All operations will happen under this root. + /// + /// default to `/` if not set. 
+ pub root: Option<String>, + /// bucket name of this backend. + /// + /// required. + pub bucket: String, + /// endpoint of this backend. + /// + /// Endpoint must be full uri, e.g. + /// + /// - AWS S3: `https://s3.amazonaws.com` or `https://s3.{region}.amazonaws.com` + /// - Cloudflare R2: `https://<ACCOUNT_ID>.r2.cloudflarestorage.com` + /// - Aliyun OSS: `https://{region}.aliyuncs.com` + /// - Tencent COS: `https://cos.{region}.myqcloud.com` + /// - Minio: `http://127.0.0.1:9000` + /// + /// If user inputs endpoint without scheme like "s3.amazonaws.com", we + /// will prepend "https://" before it. + /// + /// default to `https://s3.amazonaws.com` if not set. + pub endpoint: Option<String>, + /// Region represent the signing region of this endpoint. This is required + /// if you are using the default AWS S3 endpoint. + /// + /// If using a custom endpoint, + /// - If region is set, we will take user's input first. + /// - If not, we will try to load it from environment. + pub region: Option<String>, + + /// access_key_id of this backend. + /// + /// - If access_key_id is set, we will take user's input first. + /// - If not, we will try to load it from environment. + pub access_key_id: Option<String>, + /// secret_access_key of this backend. + /// + /// - If secret_access_key is set, we will take user's input first. + /// - If not, we will try to load it from environment. + pub secret_access_key: Option<String>, + /// security_token (aka, session token) of this backend. + /// + /// This token will expire after sometime, it's recommended to set security_token + /// by hand. + pub security_token: Option<String>, + /// role_arn for this backend. + /// + /// If `role_arn` is set, we will use already known config as source + /// credential to assume role with `role_arn`. + pub role_arn: Option<String>, + /// external_id for this backend. + pub external_id: Option<String>, + /// Disable config load so that opendal will not load config from + /// environment. 
+ /// + /// For examples: + /// + /// - envs like `AWS_ACCESS_KEY_ID` + /// - files like `~/.aws/config` + pub disable_config_load: bool, + /// Disable load credential from ec2 metadata. + /// + /// This option is used to disable the default behavior of opendal + /// to load credential from ec2 metadata, a.k.a, IMDSv2 + pub disable_ec2_metadata: bool, + /// Allow anonymous will allow opendal to send request without signing + /// when credential is not loaded. + pub allow_anonymous: bool, + /// server_side_encryption for this backend. + /// + /// Available values: `AES256`, `aws:kms`. + pub server_side_encryption: Option<String>, + /// server_side_encryption_aws_kms_key_id for this backend + /// + /// - If `server_side_encryption` set to `aws:kms`, and `server_side_encryption_aws_kms_key_id` + /// is not set, S3 will use aws managed kms key to encrypt data. + /// - If `server_side_encryption` set to `aws:kms`, and `server_side_encryption_aws_kms_key_id` + /// is a valid kms key id, S3 will use the provided kms key to encrypt data. + /// - If the `server_side_encryption_aws_kms_key_id` is invalid or not found, an error will be + /// returned. + /// - If `server_side_encryption` is not `aws:kms`, setting `server_side_encryption_aws_kms_key_id` + /// is a noop. + pub server_side_encryption_aws_kms_key_id: Option<String>, + /// server_side_encryption_customer_algorithm for this backend. + /// + /// Available values: `AES256`. + pub server_side_encryption_customer_algorithm: Option<String>, + /// server_side_encryption_customer_key for this backend. + /// + /// # Value + /// + /// base64 encoded key that matches algorithm specified in + /// `server_side_encryption_customer_algorithm`. + pub server_side_encryption_customer_key: Option<String>, + /// Set server_side_encryption_customer_key_md5 for this backend. + /// + /// # Value + /// + /// MD5 digest of key specified in `server_side_encryption_customer_key`. 
+ pub server_side_encryption_customer_key_md5: Option<String>, + /// default storage_class for this backend. + /// + /// Available values: + /// - `DEEP_ARCHIVE` + /// - `GLACIER` + /// - `GLACIER_IR` + /// - `INTELLIGENT_TIERING` + /// - `ONEZONE_IA` + /// - `OUTPOSTS` + /// - `REDUCED_REDUNDANCY` + /// - `STANDARD` + /// - `STANDARD_IA` + /// + /// S3 compatible services don't support all of them + pub default_storage_class: Option<String>, + /// Enable virtual host style so that opendal will send API requests + /// in virtual host style instead of path style. + /// + /// - By default, opendal will send API to `https://s3.us-east-1.amazonaws.com/bucket_name` + /// - Enabled, opendal will send API to `https://bucket_name.s3.us-east-1.amazonaws.com` + pub enable_virtual_host_style: bool, + /// Set maximum batch operations of this backend. + /// + /// Some compatible services have a limit on the number of operations in a batch request. + /// For example, R2 could return `Internal Error` while batch delete 1000 files. + /// + /// Please tune this value based on services' document. + pub batch_max_operations: Option<usize>, +} + +impl Debug for S3Config { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let mut d = f.debug_struct("S3Config"); + + d.field("root", &self.root) + .field("bucket", &self.bucket) + .field("endpoint", &self.endpoint) + .field("region", &self.region); + + d.finish_non_exhaustive() + } +} + /// Aws S3 and compatible services (including minio, digitalocean space, Tencent Cloud Object Storage(COS) and so on) support. /// For more information about s3-compatible services, refer to [Compatible Services](#compatible-services). #[doc = include_str!("docs.md")] #[doc = include_str!("compatible_services.md")] #[derive(Default)] pub struct S3Builder { - root: Option<String>, - - bucket: String, - endpoint: Option<String>, - region: Option<String>, - - // Credentials related values. 
- access_key_id: Option<String>, - secret_access_key: Option<String>, - security_token: Option<String>, - role_arn: Option<String>, - external_id: Option<String>, - disable_config_load: bool, - disable_ec2_metadata: bool, - allow_anonymous: bool, - customed_credential_load: Option<Box<dyn AwsCredentialLoad>>, - - // S3 feature - server_side_encryption: Option<String>, - server_side_encryption_aws_kms_key_id: Option<String>, - server_side_encryption_customer_algorithm: Option<String>, - server_side_encryption_customer_key: Option<String>, - server_side_encryption_customer_key_md5: Option<String>, - default_storage_class: Option<String>, - enable_virtual_host_style: bool, - batch_max_operations: Option<usize>, + config: S3Config, + customed_credential_load: Option<Box<dyn AwsCredentialLoad>>, http_client: Option<HttpClient>, } impl Debug for S3Builder { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - let mut d = f.debug_struct("Builder"); - - d.field("root", &self.root) - .field("bucket", &self.bucket) - .field("endpoint", &self.endpoint) - .field("region", &self.region); + let mut d = f.debug_struct("S3Builder"); + d.field("config", &self.config); d.finish_non_exhaustive() } } @@ -113,7 +237,7 @@ impl S3Builder { /// /// All operations will happen under this root. pub fn root(&mut self, root: &str) -> &mut Self { - self.root = if root.is_empty() { + self.config.root = if root.is_empty() { None } else { Some(root.to_string()) @@ -124,7 +248,7 @@ impl S3Builder { /// Set bucket name of this backend. 
pub fn bucket(&mut self, bucket: &str) -> &mut Self { - self.bucket = bucket.to_string(); + self.config.bucket = bucket.to_string(); self } @@ -144,7 +268,7 @@ impl S3Builder { pub fn endpoint(&mut self, endpoint: &str) -> &mut Self { if !endpoint.is_empty() { // Trim trailing `/` so that we can accept `http://127.0.0.1:9000/` - self.endpoint = Some(endpoint.trim_end_matches('/').to_string()) + self.config.endpoint = Some(endpoint.trim_end_matches('/').to_string()) } self @@ -158,7 +282,7 @@ impl S3Builder { /// - If not, we will try to load it from environment. pub fn region(&mut self, region: &str) -> &mut Self { if !region.is_empty() { - self.region = Some(region.to_string()) + self.config.region = Some(region.to_string()) } self @@ -170,7 +294,7 @@ impl S3Builder { /// - If not, we will try to load it from environment. pub fn access_key_id(&mut self, v: &str) -> &mut Self { if !v.is_empty() { - self.access_key_id = Some(v.to_string()) + self.config.access_key_id = Some(v.to_string()) } self @@ -182,7 +306,7 @@ impl S3Builder { /// - If not, we will try to load it from environment. pub fn secret_access_key(&mut self, v: &str) -> &mut Self { if !v.is_empty() { - self.secret_access_key = Some(v.to_string()) + self.config.secret_access_key = Some(v.to_string()) } self @@ -194,7 +318,7 @@ impl S3Builder { /// credential to assume role with `role_arn`. pub fn role_arn(&mut self, v: &str) -> &mut Self { if !v.is_empty() { - self.role_arn = Some(v.to_string()) + self.config.role_arn = Some(v.to_string()) } self @@ -203,7 +327,7 @@ impl S3Builder { /// Set external_id for this backend. 
pub fn external_id(&mut self, v: &str) -> &mut Self { if !v.is_empty() { - self.external_id = Some(v.to_string()) + self.config.external_id = Some(v.to_string()) } self @@ -223,7 +347,7 @@ impl S3Builder { /// - `STANDARD_IA` pub fn default_storage_class(&mut self, v: &str) -> &mut Self { if !v.is_empty() { - self.default_storage_class = Some(v.to_string()) + self.config.default_storage_class = Some(v.to_string()) } self @@ -241,7 +365,7 @@ impl S3Builder { /// Please use `server_side_encryption_with_*` helpers if even possible. pub fn server_side_encryption(&mut self, v: &str) -> &mut Self { if !v.is_empty() { - self.server_side_encryption = Some(v.to_string()) + self.config.server_side_encryption = Some(v.to_string()) } self @@ -266,7 +390,7 @@ impl S3Builder { /// Please use `server_side_encryption_with_*` helpers if even possible. pub fn server_side_encryption_aws_kms_key_id(&mut self, v: &str) -> &mut Self { if !v.is_empty() { - self.server_side_encryption_aws_kms_key_id = Some(v.to_string()) + self.config.server_side_encryption_aws_kms_key_id = Some(v.to_string()) } self @@ -284,7 +408,7 @@ impl S3Builder { /// Please use `server_side_encryption_with_*` helpers if even possible. pub fn server_side_encryption_customer_algorithm(&mut self, v: &str) -> &mut Self { if !v.is_empty() { - self.server_side_encryption_customer_algorithm = Some(v.to_string()) + self.config.server_side_encryption_customer_algorithm = Some(v.to_string()) } self @@ -305,7 +429,7 @@ impl S3Builder { /// Please use `server_side_encryption_with_*` helpers if even possible. pub fn server_side_encryption_customer_key(&mut self, v: &str) -> &mut Self { if !v.is_empty() { - self.server_side_encryption_customer_key = Some(v.to_string()) + self.config.server_side_encryption_customer_key = Some(v.to_string()) } self @@ -325,7 +449,7 @@ impl S3Builder { /// Please use `server_side_encryption_with_*` helpers if even possible. 
pub fn server_side_encryption_customer_key_md5(&mut self, v: &str) -> &mut Self { if !v.is_empty() { - self.server_side_encryption_customer_key_md5 = Some(v.to_string()) + self.config.server_side_encryption_customer_key_md5 = Some(v.to_string()) } self @@ -337,7 +461,7 @@ impl S3Builder { /// /// NOTE: This function should not be used along with other `server_side_encryption_with_` functions. pub fn server_side_encryption_with_aws_managed_kms_key(&mut self) -> &mut Self { - self.server_side_encryption = Some("aws:kms".to_string()); + self.config.server_side_encryption = Some("aws:kms".to_string()); self } @@ -350,8 +474,8 @@ impl S3Builder { &mut self, aws_kms_key_id: &str, ) -> &mut Self { - self.server_side_encryption = Some("aws:kms".to_string()); - self.server_side_encryption_aws_kms_key_id = Some(aws_kms_key_id.to_string()); + self.config.server_side_encryption = Some("aws:kms".to_string()); + self.config.server_side_encryption_aws_kms_key_id = Some(aws_kms_key_id.to_string()); self } @@ -361,7 +485,7 @@ impl S3Builder { /// /// NOTE: This function should not be used along with other `server_side_encryption_with_` functions. 
pub fn server_side_encryption_with_s3_key(&mut self) -> &mut Self { - self.server_side_encryption = Some("AES256".to_string()); + self.config.server_side_encryption = Some("AES256".to_string()); self } @@ -375,9 +499,9 @@ impl S3Builder { algorithm: &str, key: &[u8], ) -> &mut Self { - self.server_side_encryption_customer_algorithm = Some(algorithm.to_string()); - self.server_side_encryption_customer_key = Some(BASE64_STANDARD.encode(key)); - self.server_side_encryption_customer_key_md5 = + self.config.server_side_encryption_customer_algorithm = Some(algorithm.to_string()); + self.config.server_side_encryption_customer_key = Some(BASE64_STANDARD.encode(key)); + self.config.server_side_encryption_customer_key_md5 = Some(BASE64_STANDARD.encode(Md5::digest(key).as_slice())); self } @@ -389,7 +513,7 @@ impl S3Builder { /// security token's lifetime is short and requires users to refresh in time. pub fn security_token(&mut self, token: &str) -> &mut Self { if !token.is_empty() { - self.security_token = Some(token.to_string()); + self.config.security_token = Some(token.to_string()); } self } @@ -402,7 +526,7 @@ impl S3Builder { /// - envs like `AWS_ACCESS_KEY_ID` /// - files like `~/.aws/config` pub fn disable_config_load(&mut self) -> &mut Self { - self.disable_config_load = true; + self.config.disable_config_load = true; self } @@ -411,14 +535,14 @@ impl S3Builder { /// This option is used to disable the default behavior of opendal /// to load credential from ec2 metadata, a.k.a, IMDSv2 pub fn disable_ec2_metadata(&mut self) -> &mut Self { - self.disable_ec2_metadata = true; + self.config.disable_ec2_metadata = true; self } /// Allow anonymous will allow opendal to send request without signing /// when credential is not loaded. 
pub fn allow_anonymous(&mut self) -> &mut Self { - self.allow_anonymous = true; + self.config.allow_anonymous = true; self } @@ -428,7 +552,7 @@ impl S3Builder { /// - By default, opendal will send API to `https://s3.us-east-1.amazonaws.com/bucket_name` /// - Enabled, opendal will send API to `https://bucket_name.s3.us-east-1.amazonaws.com` pub fn enable_virtual_host_style(&mut self) -> &mut Self { - self.enable_virtual_host_style = true; + self.config.enable_virtual_host_style = true; self } @@ -456,13 +580,13 @@ impl S3Builder { /// `bucket` must be not empty and if `enable_virtual_host_style` is true /// it couldn't contain dot(.) character fn is_bucket_valid(&self) -> bool { - if self.bucket.is_empty() { + if self.config.bucket.is_empty() { return false; } // If enable virtual host style, `bucket` will reside in domain part, // for example `https://bucket_name.s3.us-east-1.amazonaws.com`, // so `bucket` with dot can't be recognized correctly for this format. - if self.enable_virtual_host_style && self.bucket.contains('.') { + if self.config.enable_virtual_host_style && self.config.bucket.contains('.') { return false; } true @@ -473,10 +597,10 @@ impl S3Builder { let bucket = { debug_assert!(self.is_bucket_valid(), "bucket must be valid"); - self.bucket.as_str() + self.config.bucket.as_str() }; - let mut endpoint = match &self.endpoint { + let mut endpoint = match &self.config.endpoint { Some(endpoint) => { if endpoint.starts_with("http") { endpoint.to_string() @@ -501,7 +625,7 @@ impl S3Builder { }; // Apply virtual host style. - if self.enable_virtual_host_style { + if self.config.enable_virtual_host_style { endpoint = endpoint.replace("//", &format!("//{bucket}.")) } else { write!(endpoint, "/{bucket}").expect("write into string must succeed"); @@ -512,7 +636,7 @@ impl S3Builder { /// Set maximum batch operations of this backend. 
pub fn batch_max_operations(&mut self, batch_max_operations: usize) -> &mut Self { - self.batch_max_operations = Some(batch_max_operations); + self.config.batch_max_operations = Some(batch_max_operations); self } @@ -638,57 +762,25 @@ impl Builder for S3Builder { type Accessor = S3Backend; fn from_map(map: HashMap<String, String>) -> Self { - let mut builder = S3Builder::default(); - - map.get("root").map(|v| builder.root(v)); - map.get("bucket").map(|v| builder.bucket(v)); - map.get("endpoint").map(|v| builder.endpoint(v)); - map.get("region").map(|v| builder.region(v)); - map.get("access_key_id").map(|v| builder.access_key_id(v)); - map.get("secret_access_key") - .map(|v| builder.secret_access_key(v)); - map.get("security_token").map(|v| builder.security_token(v)); - map.get("role_arn").map(|v| builder.role_arn(v)); - map.get("external_id").map(|v| builder.external_id(v)); - map.get("server_side_encryption") - .map(|v| builder.server_side_encryption(v)); - map.get("server_side_encryption_aws_kms_key_id") - .map(|v| builder.server_side_encryption_aws_kms_key_id(v)); - map.get("server_side_encryption_customer_algorithm") - .map(|v| builder.server_side_encryption_customer_algorithm(v)); - map.get("server_side_encryption_customer_key") - .map(|v| builder.server_side_encryption_customer_key(v)); - map.get("server_side_encryption_customer_key_md5") - .map(|v| builder.server_side_encryption_customer_key_md5(v)); - map.get("disable_config_load") - .filter(|v| *v == "on" || *v == "true") - .map(|_| builder.disable_config_load()); - map.get("disable_ec2_metadata") - .filter(|v| *v == "on" || *v == "true") - .map(|_| builder.disable_ec2_metadata()); - map.get("enable_virtual_host_style") - .filter(|v| *v == "on" || *v == "true") - .map(|_| builder.enable_virtual_host_style()); - map.get("allow_anonymous") - .filter(|v| *v == "on" || *v == "true") - .map(|_| builder.allow_anonymous()); - map.get("default_storage_class") - .map(|v: &String| builder.default_storage_class(v)); 
- map.get("batch_max_operations") - .map(|v| builder.batch_max_operations(v.parse().expect("input must be a number"))); - - builder + let config = S3Config::deserialize(ConfigDeserializer::new(map)) + .expect("config deserialize must succeed"); + + S3Builder { + config, + customed_credential_load: None, + http_client: None, + } } fn build(&mut self) -> Result<Self::Accessor> { debug!("backend build started: {:?}", &self); - let root = normalize_root(&self.root.take().unwrap_or_default()); + let root = normalize_root(&self.config.root.clone().unwrap_or_default()); debug!("backend use root {}", &root); // Handle bucket name. let bucket = if self.is_bucket_valid() { - Ok(&self.bucket) + Ok(&self.config.bucket) } else { Err( Error::new(ErrorKind::ConfigInvalid, "The bucket is misconfigured") @@ -697,14 +789,14 @@ impl Builder for S3Builder { }?; debug!("backend use bucket {}", &bucket); - let default_storage_class = match &self.default_storage_class { + let default_storage_class = match &self.config.default_storage_class { None => None, Some(v) => Some( build_header_value(v).map_err(|err| err.with_context("key", "storage_class"))?, ), }; - let server_side_encryption = match &self.server_side_encryption { + let server_side_encryption = match &self.config.server_side_encryption { None => None, Some(v) => Some( build_header_value(v) @@ -713,7 +805,7 @@ impl Builder for S3Builder { }; let server_side_encryption_aws_kms_key_id = - match &self.server_side_encryption_aws_kms_key_id { + match &self.config.server_side_encryption_aws_kms_key_id { None => None, Some(v) => Some(build_header_value(v).map_err(|err| { err.with_context("key", "server_side_encryption_aws_kms_key_id") @@ -721,7 +813,7 @@ impl Builder for S3Builder { }; let server_side_encryption_customer_algorithm = - match &self.server_side_encryption_customer_algorithm { + match &self.config.server_side_encryption_customer_algorithm { None => None, Some(v) => Some(build_header_value(v).map_err(|err| { 
err.with_context("key", "server_side_encryption_customer_algorithm") @@ -729,7 +821,7 @@ impl Builder for S3Builder { }; let server_side_encryption_customer_key = - match &self.server_side_encryption_customer_key { + match &self.config.server_side_encryption_customer_key { None => None, Some(v) => Some(build_header_value(v).map_err(|err| { err.with_context("key", "server_side_encryption_customer_key") @@ -737,7 +829,7 @@ impl Builder for S3Builder { }; let server_side_encryption_customer_key_md5 = - match &self.server_side_encryption_customer_key_md5 { + match &self.config.server_side_encryption_customer_key_md5 { None => None, Some(v) => Some(build_header_value(v).map_err(|err| { err.with_context("key", "server_side_encryption_customer_key_md5") @@ -755,12 +847,12 @@ impl Builder for S3Builder { // This is our current config. let mut cfg = AwsConfig::default(); - if !self.disable_config_load { + if !self.config.disable_config_load { cfg = cfg.from_profile(); cfg = cfg.from_env(); } - if let Some(v) = self.region.take() { + if let Some(v) = self.config.region.take() { cfg.region = Some(v); } if cfg.region.is_none() { @@ -780,13 +872,13 @@ impl Builder for S3Builder { debug!("backend use endpoint: {endpoint}"); // Setting all value from user input if available. - if let Some(v) = self.access_key_id.take() { + if let Some(v) = self.config.access_key_id.take() { cfg.access_key_id = Some(v) } - if let Some(v) = self.secret_access_key.take() { + if let Some(v) = self.config.secret_access_key.take() { cfg.secret_access_key = Some(v) } - if let Some(v) = self.security_token.take() { + if let Some(v) = self.config.security_token.take() { cfg.session_token = Some(v) } @@ -797,7 +889,7 @@ impl Builder for S3Builder { } // If role_arn is set, we must use AssumeRoleLoad. - if let Some(role_arn) = self.role_arn.take() { + if let Some(role_arn) = self.config.role_arn.take() { // use current env as source credential loader. 
let default_loader = AwsDefaultLoader::new(client.client(), cfg.clone()); @@ -805,7 +897,7 @@ impl Builder for S3Builder { let assume_role_cfg = AwsConfig { region: Some(region.clone()), role_arn: Some(role_arn), - external_id: self.external_id.clone(), + external_id: self.config.external_id.clone(), sts_regional_endpoints: "regional".to_string(), ..Default::default() }; @@ -829,7 +921,7 @@ impl Builder for S3Builder { Some(v) => v, None => { let mut default_loader = AwsDefaultLoader::new(client.client(), cfg); - if self.disable_ec2_metadata { + if self.config.disable_ec2_metadata { default_loader = default_loader.with_disable_ec2_metadata(); } @@ -840,6 +932,7 @@ impl Builder for S3Builder { let signer = AwsV4Signer::new("s3", &region); let batch_max_operations = self + .config .batch_max_operations .unwrap_or(DEFAULT_BATCH_MAX_OPERATIONS); debug!("backend build finished"); @@ -854,7 +947,7 @@ impl Builder for S3Builder { server_side_encryption_customer_key, server_side_encryption_customer_key_md5, default_storage_class, - allow_anonymous: self.allow_anonymous, + allow_anonymous: self.config.allow_anonymous, signer, loader, client, diff --git a/core/src/services/s3/mod.rs b/core/src/services/s3/mod.rs index 876dfacb4..9e343f9c1 100644 --- a/core/src/services/s3/mod.rs +++ b/core/src/services/s3/mod.rs @@ -17,6 +17,7 @@ mod backend; pub use backend::S3Builder as S3; +pub use backend::S3Config; mod core; mod error;
