This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch 6560-reqsign-core-v1-s3 in repository https://gitbox.apache.org/repos/asf/opendal.git
commit 7b9dab80f528542d8c8cb28002ea7e13cde0557e Author: Xuanwo <[email protected]> AuthorDate: Mon Oct 13 13:03:32 2025 +0800 Save work Signed-off-by: Xuanwo <[email protected]> --- core/Cargo.lock | 111 +++++++++++++---- core/Cargo.toml | 12 +- core/src/services/s3/backend.rs | 264 +++++++++++++++++----------------------- core/src/services/s3/core.rs | 160 ++++++++++-------------- core/src/services/s3/writer.rs | 12 +- 5 files changed, 272 insertions(+), 287 deletions(-) diff --git a/core/Cargo.lock b/core/Cargo.lock index 7d1d09700..c88d54cb6 100644 --- a/core/Cargo.lock +++ b/core/Cargo.lock @@ -4440,7 +4440,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a793df0d7afeac54f95b471d3af7f0d4fb975699f972341a4b76988d49cdf0c" dependencies = [ "cfg-if", - "windows-targets 0.53.0", + "windows-targets 0.53.3", ] [[package]] @@ -5360,11 +5360,15 @@ dependencies = [ "prometheus 0.14.0", "prometheus-client", "prost 0.13.5", - "quick-xml 0.38.3", + "quick-xml", "rand 0.8.5", "redb", "redis", "reqsign", + "reqsign-aws-v4", + "reqsign-core", + "reqsign-file-read-tokio", + "reqsign-http-send-reqwest", "reqwest", "rocksdb", "rustls-native-certs 0.8.1", @@ -6513,16 +6517,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "quick-xml" -version = "0.37.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "331e97a1af0bf59823e6eadffe373d7b27f485be8748f71471c662c1f269b7fb" -dependencies = [ - "memchr", - "serde", -] - [[package]] name = "quick-xml" version = "0.38.3" @@ -6941,18 +6935,86 @@ dependencies = [ "log", "once_cell", "percent-encoding", - "quick-xml 0.37.5", "rand 0.8.5", "reqwest", "rsa", + "serde", + "serde_json", + "sha1", + "sha2", +] + +[[package]] +name = "reqsign-aws-v4" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c50993dfb45a89b82dba66b2251984baad70e1b3c502db980f077f095615a26e" +dependencies = [ + "anyhow", + "async-trait", + "bytes", + 
"form_urlencoded", + "http 1.3.1", + "log", + "percent-encoding", + "quick-xml", + "reqsign-core", "rust-ini", "serde", "serde_json", + "serde_urlencoded", + "sha1", +] + +[[package]] +name = "reqsign-core" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f2f07d63648c81c8dbccc19e8e10ef8d57daafb8174e4c2a75f14f33fe8c5ec" +dependencies = [ + "anyhow", + "async-trait", + "base64 0.22.1", + "bytes", + "form_urlencoded", + "hex", + "hmac", + "http 1.3.1", + "jiff", + "log", + "percent-encoding", "sha1", "sha2", + "windows-sys 0.61.2", +] + +[[package]] +name = "reqsign-file-read-tokio" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "262eb485bb6e8213b13ef10e86ef8613539fb03daa2123b57d96675f784b15b6" +dependencies = [ + "anyhow", + "async-trait", + "reqsign-core", "tokio", ] +[[package]] +name = "reqsign-http-send-reqwest" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ff9bb6507b23175dbda8a91ae1a0ad2317471f6ee117e500d1cf6b9ed1eeb0b" +dependencies = [ + "anyhow", + "async-trait", + "bytes", + "http 1.3.1", + "http-body-util", + "reqsign-core", + "reqwest", +] + [[package]] name = "reqwest" version = "0.12.23" @@ -9702,7 +9764,7 @@ dependencies = [ "windows-collections", "windows-core 0.61.2", "windows-future", - "windows-link 0.1.1", + "windows-link 0.1.3", "windows-numerics", ] @@ -9735,7 +9797,7 @@ checksum = "c0fdd3ddb90610c7638aa2b3a3ab2904fb9e5cdbecc643ddb3647212781c4ae3" dependencies = [ "windows-implement 0.60.0", "windows-interface 0.59.1", - "windows-link 0.1.1", + "windows-link 0.1.3", "windows-result 0.3.4", "windows-strings", ] @@ -9747,7 +9809,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc6a41e98427b19fe4b73c550f060b59fa592d7d686537eebf9385621bfbad8e" dependencies = [ "windows-core 0.61.2", - "windows-link 0.1.1", + "windows-link 0.1.3", "windows-threading", ] @@ -9797,9 
+9859,9 @@ dependencies = [ [[package]] name = "windows-link" -version = "0.1.1" +version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76840935b766e1b0a05c0066835fb9ec80071d4c09a16f6bd5f7e655e3c14c38" +checksum = "5e6ad25900d524eaabdbbb96d20b4311e1e7ae1699af4fb28c17ae66c80d798a" [[package]] name = "windows-link" @@ -9814,7 +9876,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9150af68066c4c5c07ddc0ce30421554771e528bde427614c61038bc2c92c2b1" dependencies = [ "windows-core 0.61.2", - "windows-link 0.1.1", + "windows-link 0.1.3", ] [[package]] @@ -9832,7 +9894,7 @@ version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "56f42bd332cc6c8eac5af113fc0c1fd6a8fd2aa08a0119358686e5160d0586c6" dependencies = [ - "windows-link 0.1.1", + "windows-link 0.1.3", ] [[package]] @@ -9841,7 +9903,7 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "56e6c93f3a0c3b36176cb1327a4958a0353d5d166c2a35cb268ace15e91d3b57" dependencies = [ - "windows-link 0.1.1", + "windows-link 0.1.3", ] [[package]] @@ -9913,10 +9975,11 @@ dependencies = [ [[package]] name = "windows-targets" -version = "0.53.0" +version = "0.53.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1e4c7e8ceaaf9cb7d7507c974735728ab453b67ef8f18febdd7c11fe59dca8b" +checksum = "d5fe6031c4041849d7c496a8ded650796e7b6ecc19df1a431c1a363342e5dc91" dependencies = [ + "windows-link 0.1.3", "windows_aarch64_gnullvm 0.53.0", "windows_aarch64_msvc 0.53.0", "windows_i686_gnu 0.53.0", @@ -9933,7 +9996,7 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b66463ad2e0ea3bbf808b7f1d371311c80e115c0b71d60efc142cafbcfb057a6" dependencies = [ - "windows-link 0.1.1", + "windows-link 0.1.3", ] [[package]] diff --git a/core/Cargo.toml b/core/Cargo.toml index cede6ede5..8f4a4cafa 100644 --- a/core/Cargo.toml +++ 
b/core/Cargo.toml @@ -199,9 +199,10 @@ services-redis = ["dep:redis", "dep:bb8", "redis?/tokio-rustls-comp"] services-redis-native-tls = ["services-redis", "redis?/tokio-native-tls-comp"] services-rocksdb = ["dep:rocksdb", "internal-tokio-rt"] services-s3 = [ - "dep:reqsign", - "reqsign?/services-aws", - "reqsign?/reqwest_request", + "dep:reqsign-core", + "dep:reqsign-aws-v4", + "dep:reqsign-file-read-tokio", + "dep:reqsign-http-send-reqwest", "dep:crc32c", ] services-seafile = [] @@ -279,6 +280,11 @@ sqlx = { version = "0.8.0", features = [ # For http based services. reqsign = { version = "0.16.5", default-features = false, optional = true } +# For S3 service migration to v1 +reqsign-core = { version = "2.0", default-features = false, optional = true } +reqsign-aws-v4 = { version = "2.0", default-features = false, optional = true } +reqsign-file-read-tokio = { version = "2.0", default-features = false, optional = true } +reqsign-http-send-reqwest = { version = "2.0", default-features = false, optional = true } # for self-referencing structs ouroboros = { version = "0.18.4", optional = true } diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs index 4570ce6b0..d0cb7a695 100644 --- a/core/src/services/s3/backend.rs +++ b/core/src/services/s3/backend.rs @@ -22,7 +22,6 @@ use std::fmt::Write; use std::str::FromStr; use std::sync::Arc; use std::sync::LazyLock; -use std::sync::atomic::AtomicBool; use base64::Engine; use base64::prelude::BASE64_STANDARD; @@ -36,11 +35,16 @@ use log::warn; use md5::Digest; use md5::Md5; use percent_encoding::percent_decode_str; -use reqsign::AwsAssumeRoleLoader; -use reqsign::AwsConfig; -use reqsign::AwsCredentialLoad; -use reqsign::AwsDefaultLoader; -use reqsign::AwsV4Signer; +use reqsign_aws_v4::Credential; +use reqsign_aws_v4::DefaultCredentialProvider; +use reqsign_aws_v4::RequestSigner as AwsV4Signer; +use reqsign_aws_v4::StaticCredentialProvider; +use reqsign_core::Context; +use 
reqsign_core::ProvideCredential; +use reqsign_core::ProvideCredentialChain; +use reqsign_core::Signer; +use reqsign_file_read_tokio::TokioFileRead; +use reqsign_http_send_reqwest::ReqwestHttpSend; use reqwest::Url; use super::S3_SCHEME; @@ -102,9 +106,9 @@ impl Configurator for S3Config { fn into_builder(self) -> Self::Builder { S3Builder { config: self, - customized_credential_load: None, http_client: None, + credential_providers: None, } } } @@ -117,10 +121,9 @@ impl Configurator for S3Config { pub struct S3Builder { config: S3Config, - customized_credential_load: Option<Box<dyn AwsCredentialLoad>>, - #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")] http_client: Option<HttpClient>, + credential_providers: Option<ProvideCredentialChain<Credential>>, } impl Debug for S3Builder { @@ -492,15 +495,6 @@ impl S3Builder { self } - /// Adding a customized credential load for service. - /// - /// If customized_credential_load has been set, we will ignore all other - /// credential load methods. - pub fn customized_credential_load(mut self, cred: Box<dyn AwsCredentialLoad>) -> Self { - self.customized_credential_load = Some(cred); - self - } - /// Specify the http client that used by this service. /// /// # Notes @@ -521,31 +515,37 @@ impl S3Builder { self } - /// Check if `bucket` is valid + /// Replace the credential providers with a custom chain. + pub fn credential_provider_chain(mut self, chain: ProvideCredentialChain<Credential>) -> Self { + self.credential_providers = Some(chain); + self + } + + /// Check if `bucket` is valid. /// `bucket` must be not empty and if `enable_virtual_host_style` is true - /// it couldn't contain dot(.) character - fn is_bucket_valid(&self) -> bool { - if self.config.bucket.is_empty() { + /// it could not contain dot (.) character. 
+ fn is_bucket_valid(config: &S3Config) -> bool { + if config.bucket.is_empty() { return false; } // If enable virtual host style, `bucket` will reside in domain part, // for example `https://bucket_name.s3.us-east-1.amazonaws.com`, // so `bucket` with dot can't be recognized correctly for this format. - if self.config.enable_virtual_host_style && self.config.bucket.contains('.') { + if config.enable_virtual_host_style && config.bucket.contains('.') { return false; } true } /// Build endpoint with given region. - fn build_endpoint(&self, region: &str) -> String { + fn build_endpoint(config: &S3Config, region: &str) -> String { let bucket = { - debug_assert!(self.is_bucket_valid(), "bucket must be valid"); + debug_assert!(Self::is_bucket_valid(config), "bucket must be valid"); - self.config.bucket.as_str() + config.bucket.as_str() }; - let mut endpoint = match &self.config.endpoint { + let mut endpoint = match &config.endpoint { Some(endpoint) => { if endpoint.starts_with("http") { endpoint.to_string() @@ -576,7 +576,7 @@ impl S3Builder { }; // Apply virtual host style. - if self.config.enable_virtual_host_style { + if config.enable_virtual_host_style { endpoint = endpoint.replace("//", &format!("//{bucket}.")) } else { write!(endpoint, "/{bucket}").expect("write into string must succeed"); @@ -745,15 +745,22 @@ impl S3Builder { impl Builder for S3Builder { type Config = S3Config; - fn build(mut self) -> Result<impl Access> { + fn build(self) -> Result<impl Access> { debug!("backend build started: {:?}", &self); - let root = normalize_root(&self.config.root.clone().unwrap_or_default()); + #[allow(deprecated)] + let S3Builder { + config, + http_client, + credential_providers, + } = self; + + let root = normalize_root(&config.root.clone().unwrap_or_default()); debug!("backend use root {}", &root); // Handle bucket name. 
- let bucket = if self.is_bucket_valid() { - Ok(&self.config.bucket) + let bucket = if Self::is_bucket_valid(&config) { + Ok(&config.bucket) } else { Err( Error::new(ErrorKind::ConfigInvalid, "The bucket is misconfigured") @@ -762,14 +769,14 @@ impl Builder for S3Builder { }?; debug!("backend use bucket {}", &bucket); - let default_storage_class = match &self.config.default_storage_class { + let default_storage_class = match &config.default_storage_class { None => None, Some(v) => Some( build_header_value(v).map_err(|err| err.with_context("key", "storage_class"))?, ), }; - let server_side_encryption = match &self.config.server_side_encryption { + let server_side_encryption = match &config.server_side_encryption { None => None, Some(v) => Some( build_header_value(v) @@ -778,7 +785,7 @@ impl Builder for S3Builder { }; let server_side_encryption_aws_kms_key_id = - match &self.config.server_side_encryption_aws_kms_key_id { + match &config.server_side_encryption_aws_kms_key_id { None => None, Some(v) => Some(build_header_value(v).map_err(|err| { err.with_context("key", "server_side_encryption_aws_kms_key_id") @@ -786,7 +793,7 @@ impl Builder for S3Builder { }; let server_side_encryption_customer_algorithm = - match &self.config.server_side_encryption_customer_algorithm { + match &config.server_side_encryption_customer_algorithm { None => None, Some(v) => Some(build_header_value(v).map_err(|err| { err.with_context("key", "server_side_encryption_customer_algorithm") @@ -794,7 +801,7 @@ impl Builder for S3Builder { }; let server_side_encryption_customer_key = - match &self.config.server_side_encryption_customer_key { + match &config.server_side_encryption_customer_key { None => None, Some(v) => Some(build_header_value(v).map_err(|err| { err.with_context("key", "server_side_encryption_customer_key") @@ -802,14 +809,14 @@ impl Builder for S3Builder { }; let server_side_encryption_customer_key_md5 = - match &self.config.server_side_encryption_customer_key_md5 { + match 
&config.server_side_encryption_customer_key_md5 { None => None, Some(v) => Some(build_header_value(v).map_err(|err| { err.with_context("key", "server_side_encryption_customer_key_md5") })?), }; - let checksum_algorithm = match self.config.checksum_algorithm.as_deref() { + let checksum_algorithm = match config.checksum_algorithm.as_deref() { Some("crc32c") => Some(ChecksumAlgorithm::Crc32c), None => None, v => { @@ -820,109 +827,65 @@ impl Builder for S3Builder { } }; - // This is our current config. - let mut cfg = AwsConfig::default(); - if !self.config.disable_config_load { - #[cfg(not(target_arch = "wasm32"))] - { - cfg = cfg.from_profile(); - cfg = cfg.from_env(); - } - } - - if let Some(ref v) = self.config.region { - cfg.region = Some(v.to_string()); - } - - if cfg.region.is_none() { - return Err(Error::new( - ErrorKind::ConfigInvalid, - "region is missing. Please find it by S3::detect_region() or set them in env.", - ) - .with_operation("Builder::build") - .with_context("service", Scheme::S3)); - } - - let region = cfg.region.to_owned().unwrap(); + // Determine the region + let region = if let Some(ref v) = config.region { + v.to_string() + } else { + // Try to get region from environment + std::env::var("AWS_REGION") + .or_else(|_| std::env::var("AWS_DEFAULT_REGION")) + .map_err(|_| { + Error::new( + ErrorKind::ConfigInvalid, + "region is missing. Please find it by S3::detect_region() or set them in env.", + ) + .with_operation("Builder::build") + .with_context("service", Scheme::S3) + })? + }; debug!("backend use region: {region}"); - // Retain the user's endpoint if it exists; otherwise, try loading it from the environment. - self.config.endpoint = self.config.endpoint.or_else(|| cfg.endpoint_url.clone()); - // Building endpoint. - let endpoint = self.build_endpoint(&region); + let endpoint = Self::build_endpoint(&config, &region); debug!("backend use endpoint: {endpoint}"); - // Setting all value from user input if available. 
- if let Some(v) = self.config.access_key_id { - cfg.access_key_id = Some(v) - } - if let Some(v) = self.config.secret_access_key { - cfg.secret_access_key = Some(v) - } - if let Some(v) = self.config.session_token { - cfg.session_token = Some(v) - } + // Create the context for reqsign-core + let ctx = Context::new() + .with_file_read(TokioFileRead) + .with_http_send(ReqwestHttpSend::new(GLOBAL_REQWEST_CLIENT.clone())); - let mut loader: Option<Box<dyn AwsCredentialLoad>> = None; - // If customized_credential_load is set, we will use it. - if let Some(v) = self.customized_credential_load { - loader = Some(v); - } + let mut provider = if let Some(chain) = credential_providers { + chain + } else { + let mut builder = DefaultCredentialProvider::builder(); - // If role_arn is set, we must use AssumeRoleLoad. - if let Some(role_arn) = self.config.role_arn { - // use current env as source credential loader. - let default_loader = - AwsDefaultLoader::new(GLOBAL_REQWEST_CLIENT.clone().clone(), cfg.clone()); - - // Build the config for assume role. 
- let mut assume_role_cfg = AwsConfig { - region: Some(region.clone()), - role_arn: Some(role_arn), - external_id: self.config.external_id.clone(), - sts_regional_endpoints: "regional".to_string(), - ..Default::default() - }; + if config.disable_config_load { + builder = builder.disable_env(true).disable_profile(true); + } - // override default role_session_name if set - if let Some(name) = self.config.role_session_name { - assume_role_cfg.role_session_name = name; + if config.disable_ec2_metadata { + builder = builder.disable_imds(true); } - let assume_role_loader = AwsAssumeRoleLoader::new( - GLOBAL_REQWEST_CLIENT.clone().clone(), - assume_role_cfg, - Box::new(default_loader), - ) - .map_err(|err| { - Error::new( - ErrorKind::ConfigInvalid, - "The assume_role_loader is misconfigured", - ) - .with_context("service", Scheme::S3) - .set_source(err) - })?; - loader = Some(Box::new(assume_role_loader)); + ProvideCredentialChain::new().push(builder.build()) + }; + + if let (Some(ak), Some(sk)) = (&config.access_key_id, &config.secret_access_key) { + let static_provider = if let Some(token) = config.session_token.as_deref() { + StaticCredentialProvider::new(ak, sk).with_session_token(token) + } else { + StaticCredentialProvider::new(ak, sk) + }; + provider = provider.push_front(static_provider); } - // If loader is not set, we will use default loader. 
- let loader = match loader { - Some(v) => v, - None => { - let mut default_loader = - AwsDefaultLoader::new(GLOBAL_REQWEST_CLIENT.clone().clone(), cfg); - if self.config.disable_ec2_metadata { - default_loader = default_loader.with_disable_ec2_metadata(); - } - Box::new(default_loader) - } - }; + // Create request signer for S3 + let request_signer = AwsV4Signer::new("s3", &region); - let signer = AwsV4Signer::new("s3", &region); + // Create the signer + let signer = Signer::new(ctx, provider, request_signer); - let delete_max_size = self - .config + let delete_max_size = config .delete_max_size .unwrap_or(DEFAULT_BATCH_MAX_OPERATIONS); @@ -939,16 +902,11 @@ impl Builder for S3Builder { stat_with_if_none_match: true, stat_with_if_modified_since: true, stat_with_if_unmodified_since: true, - stat_with_override_cache_control: !self - .config - .disable_stat_with_override, - stat_with_override_content_disposition: !self - .config - .disable_stat_with_override, - stat_with_override_content_type: !self - .config + stat_with_override_cache_control: !config.disable_stat_with_override, + stat_with_override_content_disposition: !config .disable_stat_with_override, - stat_with_version: self.config.enable_versioning, + stat_with_override_content_type: !config.disable_stat_with_override, + stat_with_version: config.enable_versioning, read: true, read_with_if_match: true, @@ -958,17 +916,17 @@ impl Builder for S3Builder { read_with_override_cache_control: true, read_with_override_content_disposition: true, read_with_override_content_type: true, - read_with_version: self.config.enable_versioning, + read_with_version: config.enable_versioning, write: true, write_can_empty: true, write_can_multi: true, - write_can_append: self.config.enable_write_with_append, + write_can_append: config.enable_write_with_append, write_with_cache_control: true, write_with_content_type: true, write_with_content_encoding: true, - write_with_if_match: !self.config.disable_write_with_if_match, + 
write_with_if_match: !config.disable_write_with_if_match, write_with_if_not_exists: true, write_with_user_metadata: true, @@ -987,7 +945,7 @@ impl Builder for S3Builder { delete: true, delete_max_size: Some(delete_max_size), - delete_with_version: self.config.enable_versioning, + delete_with_version: config.enable_versioning, copy: true, @@ -995,8 +953,8 @@ impl Builder for S3Builder { list_with_limit: true, list_with_start_after: true, list_with_recursive: true, - list_with_versions: self.config.enable_versioning, - list_with_deleted: self.config.enable_versioning, + list_with_versions: config.enable_versioning, + list_with_deleted: config.enable_versioning, presign: true, presign_stat: true, @@ -1010,7 +968,7 @@ impl Builder for S3Builder { // allow deprecated api here for compatibility #[allow(deprecated)] - if let Some(client) = self.http_client { + if let Some(client) = http_client { am.update_http_client(|_| client); } @@ -1025,12 +983,10 @@ impl Builder for S3Builder { server_side_encryption_customer_key, server_side_encryption_customer_key_md5, default_storage_class, - allow_anonymous: self.config.allow_anonymous, - disable_list_objects_v2: self.config.disable_list_objects_v2, - enable_request_payer: self.config.enable_request_payer, + allow_anonymous: config.allow_anonymous, + disable_list_objects_v2: config.disable_list_objects_v2, + enable_request_payer: config.enable_request_payer, signer, - loader, - credential_loaded: AtomicBool::new(false), checksum_algorithm, }), }) @@ -1170,9 +1126,9 @@ impl Access for S3Backend { "operation is not supported", )), }; - let mut req = req?; + let req = req?; - self.core.sign_query(&mut req, expire).await?; + let req = self.core.sign_query(req, expire).await?; // We don't need this request anymore, consume it directly. 
let (parts, _) = req.into_parts(); @@ -1206,7 +1162,7 @@ mod tests { if enable_virtual_host_style { b = b.enable_virtual_host_style(); } - assert_eq!(b.is_bucket_valid(), expected) + assert_eq!(S3Builder::is_bucket_valid(&b.config), expected) } } @@ -1227,7 +1183,7 @@ mod tests { b = b.endpoint(endpoint); } - let endpoint = b.build_endpoint("us-east-2"); + let endpoint = S3Builder::build_endpoint(&b.config, "us-east-2"); assert_eq!(endpoint, "https://s3.us-east-2.amazonaws.com/test"); } @@ -1239,7 +1195,7 @@ mod tests { b = b.endpoint(endpoint); } - let endpoint = b.build_endpoint("us-east-2"); + let endpoint = S3Builder::build_endpoint(&b.config, "us-east-2"); assert_eq!(endpoint, "https://test.s3.us-east-2.amazonaws.com"); } } diff --git a/core/src/services/s3/core.rs b/core/src/services/s3/core.rs index 22e7d1f57..d063ce921 100644 --- a/core/src/services/s3/core.rs +++ b/core/src/services/s3/core.rs @@ -21,8 +21,6 @@ use std::fmt::Display; use std::fmt::Formatter; use std::fmt::Write; use std::sync::Arc; -use std::sync::atomic; -use std::sync::atomic::AtomicBool; use std::time::Duration; use base64::Engine; @@ -43,9 +41,8 @@ use http::header::IF_MATCH; use http::header::IF_MODIFIED_SINCE; use http::header::IF_NONE_MATCH; use http::header::IF_UNMODIFIED_SINCE; -use reqsign::AwsCredential; -use reqsign::AwsCredentialLoad; -use reqsign::AwsV4Signer; +use reqsign_aws_v4::Credential; +use reqsign_core::Signer; use serde::Deserialize; use serde::Serialize; @@ -104,9 +101,7 @@ pub struct S3Core { pub disable_list_objects_v2: bool, pub enable_request_payer: bool, - pub signer: AwsV4Signer, - pub loader: Box<dyn AwsCredentialLoad>, - pub credential_loaded: AtomicBool, + pub signer: Signer<Credential>, pub checksum_algorithm: Option<ChecksumAlgorithm>, } @@ -121,52 +116,43 @@ impl Debug for S3Core { } impl S3Core { - /// If credential is not found, we will not sign the request. 
- async fn load_credential(&self) -> Result<Option<AwsCredential>> { - let cred = self - .loader - .load_credential(GLOBAL_REQWEST_CLIENT.clone()) + pub async fn sign_query<T>(&self, req: Request<T>, duration: Duration) -> Result<Request<T>> { + // Skip signing for anonymous access + if self.allow_anonymous { + return Ok(req); + } + + // Sign the request with presigned URL + let (mut parts, body) = req.into_parts(); + + self.signer + .sign(&mut parts, Some(duration)) .await - .map_err(new_request_credential_error)?; + .map_err(|e| new_request_sign_error(e.into()))?; - if let Some(cred) = cred { - // Update credential_loaded to true if we have load credential successfully. - self.credential_loaded - .store(true, atomic::Ordering::Relaxed); - return Ok(Some(cred)); - } + // Always remove host header, let users' client to set it based on HTTP + // version. + // + // As discussed in <https://github.com/seanmonstar/reqwest/issues/1809>, + // google server could send RST_STREAM of PROTOCOL_ERROR if our request + // contains host header. + parts.headers.remove(HOST); - // If we have load credential before but failed to load this time, we should - // return error instead. - if self.credential_loaded.load(atomic::Ordering::Relaxed) { - return Err(Error::new( - ErrorKind::PermissionDenied, - "credential was previously loaded successfully but has failed this time", - ) - .set_temporary()); - } + Ok(Request::from_parts(parts, body)) + } - // Credential is empty and users allow anonymous access, we will not sign the request. 
+ pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> { + // Skip signing for anonymous access if self.allow_anonymous { - return Ok(None); + return self.info.http_client().send(req).await; } - Err(Error::new( - ErrorKind::PermissionDenied, - "no valid credential found and anonymous access is not allowed", - )) - } - - pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> { - let cred = if let Some(cred) = self.load_credential().await? { - cred - } else { - return Ok(()); - }; + let (mut parts, body) = req.into_parts(); self.signer - .sign(req, &cred) - .map_err(new_request_sign_error)?; + .sign(&mut parts, None) + .await + .map_err(|e| new_request_sign_error(e.into()))?; // Always remove host header, let users' client to set it based on HTTP // version. @@ -174,21 +160,26 @@ impl S3Core { // As discussed in <https://github.com/seanmonstar/reqwest/issues/1809>, // google server could send RST_STREAM of PROTOCOL_ERROR if our request // contains host header. - req.headers_mut().remove(HOST); + parts.headers.remove(HOST); - Ok(()) + self.info + .http_client() + .send(Request::from_parts(parts, body)) + .await } - pub async fn sign_query<T>(&self, req: &mut Request<T>, duration: Duration) -> Result<()> { - let cred = if let Some(cred) = self.load_credential().await? { - cred - } else { - return Ok(()); - }; + pub async fn fetch(&self, req: Request<Buffer>) -> Result<Response<HttpBody>> { + // Skip signing for anonymous access + if self.allow_anonymous { + return self.info.http_client().fetch(req).await; + } + + let (mut parts, body) = req.into_parts(); self.signer - .sign_query(req, duration, &cred) - .map_err(new_request_sign_error)?; + .sign(&mut parts, None) + .await + .map_err(|e| new_request_sign_error(e.into()))?; // Always remove host header, let users' client to set it based on HTTP // version. 
@@ -196,14 +187,12 @@ impl S3Core { // As discussed in <https://github.com/seanmonstar/reqwest/issues/1809>, // google server could send RST_STREAM of PROTOCOL_ERROR if our request // contains host header. - req.headers_mut().remove(HOST); + parts.headers.remove(HOST); - Ok(()) - } - - #[inline] - pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> { - self.info.http_client().send(req).await + self.info + .http_client() + .fetch(Request::from_parts(parts, body)) + .await } /// # Note @@ -529,11 +518,8 @@ impl S3Core { range: BytesRange, args: &OpRead, ) -> Result<Response<HttpBody>> { - let mut req = self.s3_get_object_request(path, range, args)?; - - self.sign(&mut req).await?; - - self.info.http_client().fetch(req).await + let req = self.s3_get_object_request(path, range, args)?; + self.fetch(req).await } pub fn s3_put_object_request( @@ -610,10 +596,7 @@ impl S3Core { } pub async fn s3_head_object(&self, path: &str, args: OpStat) -> Result<Response<Buffer>> { - let mut req = self.s3_head_object_request(path, args)?; - - self.sign(&mut req).await?; - + let req = self.s3_head_object_request(path, args)?; self.send(req).await } @@ -641,14 +624,12 @@ impl S3Core { // Set request payer header if enabled. req = self.insert_request_payer_header(req); - let mut req = req + let req = req // Inject operation to the request. .extension(Operation::Delete) .body(Buffer::new()) .map_err(new_request_build_error)?; - self.sign(&mut req).await?; - self.send(req).await } @@ -703,15 +684,13 @@ impl S3Core { // Set request payer header if enabled. req = self.insert_request_payer_header(req); - let mut req = req + let req = req // Inject operation to the request. .extension(Operation::Copy) .header(constants::X_AMZ_COPY_SOURCE, &source) .body(Buffer::new()) .map_err(new_request_build_error)?; - self.sign(&mut req).await?; - self.send(req).await } @@ -744,14 +723,12 @@ impl S3Core { // Set request payer header if enabled. 
req = self.insert_request_payer_header(req); - let mut req = req + let req = req // Inject operation to the request. .extension(Operation::List) .body(Buffer::new()) .map_err(new_request_build_error)?; - self.sign(&mut req).await?; - self.send(req).await } @@ -796,14 +773,12 @@ impl S3Core { // Set request payer header if enabled. req = self.insert_request_payer_header(req); - let mut req = req + let req = req // Inject operation to the request. .extension(Operation::List) .body(Buffer::new()) .map_err(new_request_build_error)?; - self.sign(&mut req).await?; - self.send(req).await } @@ -854,9 +829,7 @@ impl S3Core { // Inject operation to the request. req = req.extension(Operation::Write); - let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?; - - self.sign(&mut req).await?; + let req = req.body(Buffer::new()).map_err(new_request_build_error)?; self.send(req).await } @@ -937,12 +910,10 @@ impl S3Core { // Inject operation to the request. req = req.extension(Operation::Write); - let mut req = req + let req = req .body(Buffer::from(Bytes::from(content))) .map_err(new_request_build_error)?; - self.sign(&mut req).await?; - self.send(req).await } @@ -966,13 +937,12 @@ impl S3Core { // Set request payer header if enabled. req = self.insert_request_payer_header(req); - let mut req = req + let req = req // Inject operation to the request. .extension(Operation::Write) .body(Buffer::new()) .map_err(new_request_build_error)?; - self.sign(&mut req).await?; self.send(req).await } @@ -1008,12 +978,10 @@ impl S3Core { // Inject operation to the request. req = req.extension(Operation::Delete); - let mut req = req + let req = req .body(Buffer::from(Bytes::from(content))) .map_err(new_request_build_error)?; - self.sign(&mut req).await?; - self.send(req).await } @@ -1057,14 +1025,12 @@ impl S3Core { // Set request payer header if enabled. req = self.insert_request_payer_header(req); - let mut req = req + let req = req // Inject operation to the request. 
.extension(Operation::List) .body(Buffer::new()) .map_err(new_request_build_error)?; - self.sign(&mut req).await?; - self.send(req).await } } diff --git a/core/src/services/s3/writer.rs b/core/src/services/s3/writer.rs index 54a1b6e3f..82932cae8 100644 --- a/core/src/services/s3/writer.rs +++ b/core/src/services/s3/writer.rs @@ -66,12 +66,10 @@ impl S3Writer { impl oio::MultipartWrite for S3Writer { async fn write_once(&self, size: u64, body: Buffer) -> Result<Metadata> { - let mut req = self + let req = self .core .s3_put_object_request(&self.path, Some(size), &self.op, body)?; - self.core.sign(&mut req).await?; - let resp = self.core.send(req).await?; let status = resp.status(); @@ -117,7 +115,7 @@ impl oio::MultipartWrite for S3Writer { let checksum = self.core.calculate_checksum(&body); - let mut req = self.core.s3_upload_part_request( + let req = self.core.s3_upload_part_request( &self.path, upload_id, part_number, @@ -126,8 +124,6 @@ impl oio::MultipartWrite for S3Writer { checksum.clone(), )?; - self.core.sign(&mut req).await?; - let resp = self.core.send(req).await?; let status = resp.status(); @@ -242,12 +238,10 @@ impl oio::AppendWrite for S3Writer { } async fn append(&self, offset: u64, size: u64, body: Buffer) -> Result<Metadata> { - let mut req = self + let req = self .core .s3_append_object_request(&self.path, offset, size, &self.op, body)?; - self.core.sign(&mut req).await?; - let resp = self.core.send(req).await?; let status = resp.status();
