This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch registry-more in repository https://gitbox.apache.org/repos/asf/opendal.git
commit 621cba1bc71e5153de51b35e96cf6e5c09b78fea Author: Xuanwo <[email protected]> AuthorDate: Wed Oct 1 18:50:24 2025 +0800 Introduce operator uri Signed-off-by: Xuanwo <[email protected]> --- core/src/blocking/operator.rs | 8 +- core/src/docs/rfcs/5444_operator_from_uri.md | 32 ++-- core/src/services/azblob/backend.rs | 80 ++------ core/src/services/b2/backend.rs | 69 ++----- core/src/services/cos/backend.rs | 68 ++----- core/src/services/fs/backend.rs | 52 +++--- core/src/services/gcs/backend.rs | 69 ++----- core/src/services/memory/backend.rs | 25 ++- core/src/services/obs/backend.rs | 67 ++----- core/src/services/oss/backend.rs | 68 ++----- core/src/services/s3/backend.rs | 38 ++-- core/src/services/upyun/backend.rs | 69 ++----- core/src/types/builder.rs | 7 +- core/src/types/mod.rs | 2 + core/src/types/operator/builder.rs | 10 +- core/src/types/operator/mod.rs | 3 + core/src/types/operator/registry.rs | 56 +----- core/src/types/operator/uri.rs | 264 +++++++++++++++++++++++++++ 18 files changed, 459 insertions(+), 528 deletions(-) diff --git a/core/src/blocking/operator.rs b/core/src/blocking/operator.rs index 0ae55953f..34a83c4ff 100644 --- a/core/src/blocking/operator.rs +++ b/core/src/blocking/operator.rs @@ -17,6 +17,7 @@ use tokio::runtime::Handle; +use crate::types::IntoOperatorUri; use crate::Operator as AsyncOperator; use crate::*; @@ -136,11 +137,8 @@ impl Operator { } /// Create a blocking operator from URI based configuration. 
- pub fn from_uri( - uri: &str, - options: impl IntoIterator<Item = (String, String)>, - ) -> Result<Self> { - let op = AsyncOperator::from_uri(uri, options)?; + pub fn from_uri(uri: impl IntoOperatorUri) -> Result<Self> { + let op = AsyncOperator::from_uri(uri)?; Self::new(op) } diff --git a/core/src/docs/rfcs/5444_operator_from_uri.md b/core/src/docs/rfcs/5444_operator_from_uri.md index 5b5f21c2d..5bb203d5b 100644 --- a/core/src/docs/rfcs/5444_operator_from_uri.md +++ b/core/src/docs/rfcs/5444_operator_from_uri.md @@ -24,18 +24,17 @@ The new API allows creating operators directly from URIs: ```rust // Create an operator using URI -let op = Operator::from_uri("s3://my-bucket/path", vec![ - ("endpoint".to_string(), "http://localhost:8080"to_string()), -])?; +let op = Operator::from_uri("s3://my-bucket/path")?; // Users can pass options through the URI along with additional key-value pairs // The extra options will override identical options specified in the URI -let op = Operator::from_uri("s3://my-bucket/path?region=us-east-1", vec![ - ("endpoint".to_string(), "http://localhost:8080"to_string()), -])?; +let op = Operator::from_uri(( + "s3://my-bucket/path?region=us-east-1", + [("endpoint", "http://localhost:8080")], +))?; // Create a file system operator -let op = Operator::from_uri("fs:///tmp/test", vec![])?; +let op = Operator::from_uri("fs:///tmp/test")?; ``` OpenDAL will, by default, register services enabled by features in a global `OperatorRegistry`. Users can also create custom operator registries to support their own schemes or additional options. 
@@ -53,7 +52,7 @@ registry.register::<services::S3>("r2"); // Cloudflare R2 is S3-compatible registry.register::<services::S3>("company-storage"); registry.register::<services::Azblob>("backup-storage"); -let op = registry.load("company-storage://bucket/path", [])?; +let op = registry.load("company-storage://bucket/path")?; ``` # Reference-level explanation @@ -62,12 +61,10 @@ The implementation consists of three main components: 1. The `OperatorFactory` and `OperatorRegistry`: -`OperatorFactory` is a function type that takes a URI string plus options and returns an `Operator`. `OperatorRegistry` manages factories registered under different schemes. - -`OperatorFactory` is a function type that takes a URI string plus options and returns an `Operator`. `OperatorRegistry` manages factories registered under different schemes. +`OperatorFactory` is a function type that takes a parsed `OperatorUri` and returns an `Operator`. `OperatorRegistry` manages factories registered under different schemes. ```rust -type OperatorFactory = fn(&str, Vec<(String, String)>) -> Result<Operator>; +type OperatorFactory = fn(&OperatorUri) -> Result<Operator>; pub struct OperatorRegistry { ... } @@ -76,7 +73,7 @@ impl OperatorRegistry { ... } - fn load(&self, uri: &str, options: impl IntoIterator<Item = (String, String)>) -> Result<Operator> { + fn load(&self, uri: impl IntoOperatorUri) -> Result<Operator> { ... } } @@ -84,11 +81,11 @@ impl OperatorRegistry { 2. The `Configurator` trait extension: -`Configurator` will add a new API to create a configuration from a URI and options. Services should only parse the URI components relevant to their configuration (host, path, query parameters) without concerning themselves with the scheme portion. +`Configurator` will add a new API to create a configuration from a parsed `OperatorUri`. Services should only inspect the URI components relevant to their configuration (name, root, options) without concerning themselves with the scheme portion. 
```rust impl Configurator for S3Config { - fn from_uri(uri: &Uri, options: &HashMap<String, String>) -> Result<Self> { + fn from_uri(uri: &OperatorUri) -> Result<Self> { ... } } @@ -102,10 +99,7 @@ The `Operator` trait will add a new `from_uri` method to create an operator from ```rust impl Operator { - pub fn from_uri( - uri: &str, - options: impl IntoIterator<Item = (String, String)>, - ) -> Result<Self> { + pub fn from_uri(uri: impl IntoOperatorUri) -> Result<Self> { ... } } diff --git a/core/src/services/azblob/backend.rs b/core/src/services/azblob/backend.rs index 72b7436cc..812b657a8 100644 --- a/core/src/services/azblob/backend.rs +++ b/core/src/services/azblob/backend.rs @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. -use std::collections::HashMap; use std::fmt::Debug; use std::fmt::Formatter; use std::sync::Arc; @@ -24,9 +23,7 @@ use base64::prelude::BASE64_STANDARD; use base64::Engine; use http::Response; use http::StatusCode; -use http::Uri; use log::debug; -use percent_encoding::percent_decode_str; use reqsign::AzureStorageConfig; use reqsign::AzureStorageLoader; use reqsign::AzureStorageSigner; @@ -44,6 +41,7 @@ use super::writer::AzblobWriters; use super::AZBLOB_SCHEME; use crate::raw::*; use crate::services::AzblobConfig; +use crate::types::OperatorUri; use crate::*; const AZBLOB_BATCH_LIMIT: usize = 256; @@ -62,60 +60,15 @@ impl From<AzureStorageConfig> for AzblobConfig { impl Configurator for AzblobConfig { type Builder = AzblobBuilder; - fn from_uri(uri: &Uri, options: &HashMap<String, String>) -> Result<Self> { - let mut map = options.clone(); + fn from_uri(uri: &OperatorUri) -> Result<Self> { + let mut map = uri.options().clone(); - let has_container = map.get("container").map(|v| !v.is_empty()).unwrap_or(false); - - let path = percent_decode_str(uri.path()).decode_utf8_lossy(); - - if has_container { - if !map.contains_key("root") { - let trimmed = path.trim_matches('/'); - if 
!trimmed.is_empty() { - map.insert("root".to_string(), trimmed.to_string()); - } - } - } else if let Some(authority) = uri.authority() { - let host = authority.host(); - if host.is_empty() { - return Err(Error::new( - ErrorKind::ConfigInvalid, - "azblob uri requires container", - )); - } - map.insert("container".to_string(), host.to_string()); - - if !map.contains_key("root") { - let trimmed = path.trim_matches('/'); - if !trimmed.is_empty() { - map.insert("root".to_string(), trimmed.to_string()); - } - } - } else { - let trimmed = path.trim_matches('/'); - if trimmed.is_empty() { - return Err(Error::new( - ErrorKind::ConfigInvalid, - "azblob uri requires container", - )); - } - - let mut segments = trimmed.splitn(2, '/'); - let container = segments.next().unwrap(); - if container.is_empty() { - return Err(Error::new( - ErrorKind::ConfigInvalid, - "azblob uri requires container", - )); - } + if let Some(container) = uri.name() { map.insert("container".to_string(), container.to_string()); + } - if let Some(rest) = segments.next() { - if !rest.is_empty() && !map.contains_key("root") { - map.insert("root".to_string(), rest.to_string()); - } - } + if let Some(root) = uri.root() { + map.insert("root".to_string(), root.to_string()); } Self::from_iter(map) @@ -658,22 +611,29 @@ impl Access for AzblobBackend { #[cfg(test)] mod tests { use super::*; + use crate::types::OperatorUri; use crate::Configurator; - use http::Uri; - use std::collections::HashMap; #[test] fn from_uri_with_host_container() { - let uri: Uri = "azblob://my-container/path/to/root".parse().unwrap(); - let cfg = AzblobConfig::from_uri(&uri, &HashMap::new()).unwrap(); + let uri = OperatorUri::new( + "azblob://my-container/path/to/root".parse().unwrap(), + Vec::<(String, String)>::new(), + ) + .unwrap(); + let cfg = AzblobConfig::from_uri(&uri).unwrap(); assert_eq!(cfg.container, "my-container"); assert_eq!(cfg.root.as_deref(), Some("path/to/root")); } #[test] fn from_uri_with_path_container() { - let uri: 
Uri = "azblob:///my-container/nested/root".parse().unwrap(); - let cfg = AzblobConfig::from_uri(&uri, &HashMap::new()).unwrap(); + let uri = OperatorUri::new( + "azblob:///my-container/nested/root".parse().unwrap(), + Vec::<(String, String)>::new(), + ) + .unwrap(); + let cfg = AzblobConfig::from_uri(&uri).unwrap(); assert_eq!(cfg.container, "my-container"); assert_eq!(cfg.root.as_deref(), Some("nested/root")); } diff --git a/core/src/services/b2/backend.rs b/core/src/services/b2/backend.rs index 868039c8f..afe70b23d 100644 --- a/core/src/services/b2/backend.rs +++ b/core/src/services/b2/backend.rs @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. -use std::collections::HashMap; use std::fmt::Debug; use std::fmt::Formatter; use std::sync::Arc; @@ -23,9 +22,7 @@ use std::sync::Arc; use http::Request; use http::Response; use http::StatusCode; -use http::Uri; use log::debug; -use percent_encoding::percent_decode_str; use tokio::sync::RwLock; use super::core::constants; @@ -40,59 +37,20 @@ use super::writer::B2Writers; use super::B2_SCHEME; use crate::raw::*; use crate::services::B2Config; +use crate::types::OperatorUri; use crate::*; impl Configurator for B2Config { type Builder = B2Builder; - fn from_uri(uri: &Uri, options: &HashMap<String, String>) -> Result<Self> { - let mut map = options.clone(); + fn from_uri(uri: &OperatorUri) -> Result<Self> { + let mut map = uri.options().clone(); - let has_bucket = map.get("bucket").map(|v| !v.is_empty()).unwrap_or(false); - - let path = percent_decode_str(uri.path()).decode_utf8_lossy(); - - if has_bucket { - if !map.contains_key("root") { - let trimmed = path.trim_matches('/'); - if !trimmed.is_empty() { - map.insert("root".to_string(), trimmed.to_string()); - } - } - } else if let Some(authority) = uri.authority() { - let host = authority.host(); - if !host.is_empty() { - map.insert("bucket".to_string(), host.to_string()); - if !map.contains_key("root") { - let trimmed = 
path.trim_matches('/'); - if !trimmed.is_empty() { - map.insert("root".to_string(), trimmed.to_string()); - } - } - } + if let Some(name) = uri.name() { + map.insert("bucket".to_string(), name.to_string()); } - if !map.contains_key("bucket") { - let trimmed = path.trim_matches('/'); - if trimmed.is_empty() { - return Err(Error::new( - ErrorKind::ConfigInvalid, - "b2 uri requires bucket", - )); - } - let mut segments = trimmed.splitn(2, '/'); - let bucket = segments.next().unwrap(); - if bucket.is_empty() { - return Err(Error::new( - ErrorKind::ConfigInvalid, - "b2 uri requires bucket", - )); - } - map.insert("bucket".to_string(), bucket.to_string()); - if let Some(rest) = segments.next() { - if !rest.is_empty() && !map.contains_key("root") { - map.insert("root".to_string(), rest.to_string()); - } - } + if let Some(root) = uri.root() { + map.insert("root".to_string(), root.to_string()); } Self::from_iter(map) @@ -493,17 +451,18 @@ impl Access for B2Backend { #[cfg(test)] mod tests { use super::*; + use crate::types::OperatorUri; use crate::Configurator; - use http::Uri; - use std::collections::HashMap; #[test] fn from_uri_extracts_bucket_and_root() { - let uri: Uri = "b2://example-bucket/path/to/root".parse().unwrap(); - let mut options = HashMap::new(); - options.insert("bucket_id".to_string(), "bucket-id".to_string()); + let uri = OperatorUri::new( + "b2://example-bucket/path/to/root".parse().unwrap(), + vec![("bucket_id".to_string(), "bucket-id".to_string())], + ) + .unwrap(); - let cfg = B2Config::from_uri(&uri, &options).unwrap(); + let cfg = B2Config::from_uri(&uri).unwrap(); assert_eq!(cfg.bucket, "example-bucket"); assert_eq!(cfg.root.as_deref(), Some("path/to/root")); assert_eq!(cfg.bucket_id, "bucket-id"); diff --git a/core/src/services/cos/backend.rs b/core/src/services/cos/backend.rs index d1750fb5a..aa0b58ec6 100644 --- a/core/src/services/cos/backend.rs +++ b/core/src/services/cos/backend.rs @@ -15,7 +15,6 @@ // specific language governing permissions 
and limitations // under the License. -use std::collections::HashMap; use std::fmt::Debug; use std::sync::Arc; @@ -23,7 +22,6 @@ use http::Response; use http::StatusCode; use http::Uri; use log::debug; -use percent_encoding::percent_decode_str; use reqsign::TencentCosConfig; use reqsign::TencentCosCredentialLoader; use reqsign::TencentCosSigner; @@ -40,61 +38,20 @@ use super::COS_SCHEME; use crate::raw::oio::PageLister; use crate::raw::*; use crate::services::CosConfig; +use crate::types::OperatorUri; use crate::*; impl Configurator for CosConfig { type Builder = CosBuilder; - fn from_uri(uri: &Uri, options: &HashMap<String, String>) -> Result<Self> { - let mut map = options.clone(); + fn from_uri(uri: &OperatorUri) -> Result<Self> { + let mut map = uri.options().clone(); - let has_bucket = map.get("bucket").map(|v| !v.is_empty()).unwrap_or(false); - - let path = percent_decode_str(uri.path()).decode_utf8_lossy(); - - if has_bucket { - if !map.contains_key("root") { - let trimmed = path.trim_matches('/'); - if !trimmed.is_empty() { - map.insert("root".to_string(), trimmed.to_string()); - } - } - } else if let Some(authority) = uri.authority() { - let host = authority.host(); - if !host.is_empty() { - map.insert("bucket".to_string(), host.to_string()); - if !map.contains_key("root") { - let trimmed = path.trim_matches('/'); - if !trimmed.is_empty() { - map.insert("root".to_string(), trimmed.to_string()); - } - } - } + if let Some(name) = uri.name() { + map.insert("bucket".to_string(), name.to_string()); } - let has_bucket = map.get("bucket").map(|v| !v.is_empty()).unwrap_or(false); - - if !has_bucket { - let trimmed = path.trim_matches('/'); - if trimmed.is_empty() { - return Err(Error::new( - ErrorKind::ConfigInvalid, - "cos uri requires bucket", - )); - } - let mut segments = trimmed.splitn(2, '/'); - let bucket = segments.next().unwrap(); - if bucket.is_empty() { - return Err(Error::new( - ErrorKind::ConfigInvalid, - "cos uri requires bucket", - )); - } - 
map.insert("bucket".to_string(), bucket.to_string()); - if let Some(rest) = segments.next() { - if !rest.is_empty() && !map.contains_key("root") { - map.insert("root".to_string(), rest.to_string()); - } - } + if let Some(root) = uri.root() { + map.insert("root".to_string(), root.to_string()); } Self::from_iter(map) @@ -502,14 +459,17 @@ impl Access for CosBackend { #[cfg(test)] mod tests { use super::*; + use crate::types::OperatorUri; use crate::Configurator; - use http::Uri; - use std::collections::HashMap; #[test] fn from_uri_extracts_bucket_and_root() { - let uri: Uri = "cos://example-bucket/path/to/root".parse().unwrap(); - let cfg = CosConfig::from_uri(&uri, &HashMap::new()).unwrap(); + let uri = OperatorUri::new( + "cos://example-bucket/path/to/root".parse().unwrap(), + Vec::<(String, String)>::new(), + ) + .unwrap(); + let cfg = CosConfig::from_uri(&uri).unwrap(); assert_eq!(cfg.bucket.as_deref(), Some("example-bucket")); assert_eq!(cfg.root.as_deref(), Some("path/to/root")); } diff --git a/core/src/services/fs/backend.rs b/core/src/services/fs/backend.rs index 93d0e40fd..a5ddac1ff 100644 --- a/core/src/services/fs/backend.rs +++ b/core/src/services/fs/backend.rs @@ -15,15 +15,11 @@ // specific language governing permissions and limitations // under the License. 
-use std::collections::HashMap; use std::path::PathBuf; use std::sync::Arc; use log::debug; -use http::Uri; -use percent_encoding::percent_decode_str; - use super::core::*; use super::delete::FsDeleter; use super::lister::FsLister; @@ -33,27 +29,28 @@ use super::writer::FsWriters; use super::FS_SCHEME; use crate::raw::*; use crate::services::FsConfig; +use crate::types::OperatorUri; use crate::*; impl Configurator for FsConfig { type Builder = FsBuilder; - fn from_uri(uri: &Uri, options: &HashMap<String, String>) -> Result<Self> { - let mut map = options.clone(); - - if !map.contains_key("root") { - let path = percent_decode_str(uri.path()).decode_utf8_lossy(); - if path.is_empty() || path == "/" { - return Err(Error::new( - ErrorKind::ConfigInvalid, - "fs uri requires absolute path", - )); - } - if !path.starts_with('/') { - return Err(Error::new( - ErrorKind::ConfigInvalid, - "fs uri root must be absolute", - )); - } - map.insert("root".to_string(), path.to_string()); + fn from_uri(uri: &OperatorUri) -> Result<Self> { + let mut map = uri.options().clone(); + + if map.contains_key("root") { + return Self::from_iter(map); + } + + if let Some(root) = uri.root() { + let value = if uri.name().is_none() { + if root.starts_with('/') { + root.to_string() + } else { + format!("/{}", root) + } + } else { + root.to_string() + }; + map.insert("root".to_string(), value); } Self::from_iter(map) @@ -301,14 +298,17 @@ impl Access for FsBackend { #[cfg(test)] mod tests { use super::*; + use crate::types::OperatorUri; use crate::Configurator; - use http::Uri; - use std::collections::HashMap; #[test] fn from_uri_extracts_root() { - let uri: Uri = "fs:///tmp/data".parse().unwrap(); - let cfg = FsConfig::from_uri(&uri, &HashMap::new()).unwrap(); + let uri = OperatorUri::new( + "fs:///tmp/data".parse().unwrap(), + Vec::<(String, String)>::new(), + ) + .unwrap(); + let cfg = FsConfig::from_uri(&uri).unwrap(); assert_eq!(cfg.root.as_deref(), Some("/tmp/data")); } } diff --git 
a/core/src/services/gcs/backend.rs b/core/src/services/gcs/backend.rs index 00848905d..ff6e93ade 100644 --- a/core/src/services/gcs/backend.rs +++ b/core/src/services/gcs/backend.rs @@ -15,16 +15,13 @@ // specific language governing permissions and limitations // under the License. -use std::collections::HashMap; use std::fmt::Debug; use std::fmt::Formatter; use std::sync::Arc; use http::Response; use http::StatusCode; -use http::Uri; use log::debug; -use percent_encoding::percent_decode_str; use reqsign::GoogleCredentialLoader; use reqsign::GoogleSigner; use reqsign::GoogleTokenLoad; @@ -40,6 +37,7 @@ use super::GCS_SCHEME; use crate::raw::oio::BatchDeleter; use crate::raw::*; use crate::services::GcsConfig; +use crate::types::OperatorUri; use crate::*; const DEFAULT_GCS_ENDPOINT: &str = "https://storage.googleapis.com"; const DEFAULT_GCS_SCOPE: &str = "https://www.googleapis.com/auth/devstorage.read_write"; @@ -47,57 +45,15 @@ const DEFAULT_GCS_SCOPE: &str = "https://www.googleapis.com/auth/devstorage.read impl Configurator for GcsConfig { type Builder = GcsBuilder; - fn from_uri(uri: &Uri, options: &HashMap<String, String>) -> Result<Self> { - let mut map = options.clone(); + fn from_uri(uri: &OperatorUri) -> Result<Self> { + let mut map = uri.options().clone(); - let has_bucket = !map.get("bucket").map(|v| v.is_empty()).unwrap_or(true); - - let path = percent_decode_str(uri.path()).decode_utf8_lossy(); - - if has_bucket { - if !map.contains_key("root") { - let trimmed = path.trim_matches('/'); - if !trimmed.is_empty() { - map.insert("root".to_string(), trimmed.to_string()); - } - } - } else if let Some(authority) = uri.authority() { - let host = authority.host(); - if !host.is_empty() { - map.insert("bucket".to_string(), host.to_string()); - if !map.contains_key("root") { - let trimmed = path.trim_matches('/'); - if !trimmed.is_empty() { - map.insert("root".to_string(), trimmed.to_string()); - } - } - } + if let Some(name) = uri.name() { + 
map.insert("bucket".to_string(), name.to_string()); } - let has_bucket = !map.get("bucket").map(|v| v.is_empty()).unwrap_or(true); - - if !has_bucket { - let trimmed = path.trim_matches('/'); - if trimmed.is_empty() { - return Err(Error::new( - ErrorKind::ConfigInvalid, - "gcs uri requires bucket", - )); - } - let mut segments = trimmed.splitn(2, '/'); - let bucket = segments.next().unwrap(); - if bucket.is_empty() { - return Err(Error::new( - ErrorKind::ConfigInvalid, - "gcs uri requires bucket", - )); - } - map.insert("bucket".to_string(), bucket.to_string()); - if let Some(rest) = segments.next() { - if !rest.is_empty() && !map.contains_key("root") { - map.insert("root".to_string(), rest.to_string()); - } - } + if let Some(root) = uri.root() { + map.insert("root".to_string(), root.to_string()); } Self::from_iter(map) @@ -560,14 +516,17 @@ impl Access for GcsBackend { #[cfg(test)] mod tests { use super::*; + use crate::types::OperatorUri; use crate::Configurator; - use http::Uri; - use std::collections::HashMap; #[test] fn from_uri_extracts_bucket_and_root() { - let uri: Uri = "gcs://example-bucket/path/to/root".parse().unwrap(); - let cfg = GcsConfig::from_uri(&uri, &HashMap::new()).unwrap(); + let uri = OperatorUri::new( + "gcs://example-bucket/path/to/root".parse().unwrap(), + Vec::<(String, String)>::new(), + ) + .unwrap(); + let cfg = GcsConfig::from_uri(&uri).unwrap(); assert_eq!(cfg.bucket, "example-bucket"); assert_eq!(cfg.root.as_deref(), Some("path/to/root")); } diff --git a/core/src/services/memory/backend.rs b/core/src/services/memory/backend.rs index b1108fd63..5ee731efa 100644 --- a/core/src/services/memory/backend.rs +++ b/core/src/services/memory/backend.rs @@ -15,13 +15,9 @@ // specific language governing permissions and limitations // under the License. 
-use std::collections::HashMap; use std::fmt::Debug; use std::sync::Arc; -use http::Uri; -use percent_encoding::percent_decode_str; - use super::core::*; use super::delete::MemoryDeleter; use super::lister::MemoryLister; @@ -30,16 +26,16 @@ use super::MEMORY_SCHEME; use crate::raw::oio; use crate::raw::*; use crate::services::MemoryConfig; +use crate::types::OperatorUri; use crate::*; impl Configurator for MemoryConfig { type Builder = MemoryBuilder; - fn from_uri(uri: &Uri, options: &HashMap<String, String>) -> Result<Self> { - let mut map = options.clone(); + fn from_uri(uri: &OperatorUri) -> Result<Self> { + let mut map = uri.options().clone(); if !map.contains_key("root") { - let path = percent_decode_str(uri.path()).decode_utf8_lossy(); - if !path.is_empty() && path != "/" { - map.insert("root".to_string(), path.trim_start_matches('/').to_string()); + if let Some(root) = uri.root().filter(|v| !v.is_empty()) { + map.insert("root".to_string(), root.to_string()); } } @@ -193,9 +189,8 @@ impl Access for MemoryAccessor { #[cfg(test)] mod tests { use super::*; + use crate::types::OperatorUri; use crate::Configurator; - use http::Uri; - use std::collections::HashMap; #[test] fn test_accessor_metadata_name() { @@ -208,8 +203,12 @@ mod tests { #[test] fn from_uri_extracts_root() { - let uri: Uri = "memory://localhost/path/to/root".parse().unwrap(); - let cfg = MemoryConfig::from_uri(&uri, &HashMap::new()).unwrap(); + let uri = OperatorUri::new( + "memory://localhost/path/to/root".parse().unwrap(), + Vec::<(String, String)>::new(), + ) + .unwrap(); + let cfg = MemoryConfig::from_uri(&uri).unwrap(); assert_eq!(cfg.root.as_deref(), Some("path/to/root")); } } diff --git a/core/src/services/obs/backend.rs b/core/src/services/obs/backend.rs index f61167429..831c7f8ed 100644 --- a/core/src/services/obs/backend.rs +++ b/core/src/services/obs/backend.rs @@ -24,7 +24,6 @@ use http::Response; use http::StatusCode; use http::Uri; use log::debug; -use 
percent_encoding::percent_decode_str; use reqsign::HuaweicloudObsConfig; use reqsign::HuaweicloudObsCredentialLoader; use reqsign::HuaweicloudObsSigner; @@ -39,61 +38,20 @@ use super::writer::ObsWriters; use super::OBS_SCHEME; use crate::raw::*; use crate::services::ObsConfig; +use crate::types::OperatorUri; use crate::*; impl Configurator for ObsConfig { type Builder = ObsBuilder; - fn from_uri(uri: &Uri, options: &HashMap<String, String>) -> Result<Self> { - let mut map = options.clone(); + fn from_uri(uri: &OperatorUri) -> Result<Self> { + let mut map = uri.options().clone(); - let has_bucket = map.get("bucket").map(|v| !v.is_empty()).unwrap_or(false); - - let path = percent_decode_str(uri.path()).decode_utf8_lossy(); - - if has_bucket { - if !map.contains_key("root") { - let trimmed = path.trim_matches('/'); - if !trimmed.is_empty() { - map.insert("root".to_string(), trimmed.to_string()); - } - } - } else if let Some(authority) = uri.authority() { - let host = authority.host(); - if !host.is_empty() { - map.insert("bucket".to_string(), host.to_string()); - if !map.contains_key("root") { - let trimmed = path.trim_matches('/'); - if !trimmed.is_empty() { - map.insert("root".to_string(), trimmed.to_string()); - } - } - } + if let Some(name) = uri.name() { + map.insert("bucket".to_string(), name.to_string()); } - let has_bucket = map.get("bucket").map(|v| !v.is_empty()).unwrap_or(false); - - if !has_bucket { - let trimmed = path.trim_matches('/'); - if trimmed.is_empty() { - return Err(Error::new( - ErrorKind::ConfigInvalid, - "obs uri requires bucket", - )); - } - let mut segments = trimmed.splitn(2, '/'); - let bucket = segments.next().unwrap(); - if bucket.is_empty() { - return Err(Error::new( - ErrorKind::ConfigInvalid, - "obs uri requires bucket", - )); - } - map.insert("bucket".to_string(), bucket.to_string()); - if let Some(rest) = segments.next() { - if !rest.is_empty() && !map.contains_key("root") { - map.insert("root".to_string(), rest.to_string()); - } - 
} + if let Some(root) = uri.root() { + map.insert("root".to_string(), root.to_string()); } Self::from_iter(map) @@ -501,14 +459,17 @@ impl Access for ObsBackend { #[cfg(test)] mod tests { use super::*; + use crate::types::OperatorUri; use crate::Configurator; - use http::Uri; - use std::collections::HashMap; #[test] fn from_uri_extracts_bucket_and_root() { - let uri: Uri = "obs://example-bucket/path/to/root".parse().unwrap(); - let cfg = ObsConfig::from_uri(&uri, &HashMap::new()).unwrap(); + let uri = OperatorUri::new( + "obs://example-bucket/path/to/root".parse().unwrap(), + Vec::<(String, String)>::new(), + ) + .unwrap(); + let cfg = ObsConfig::from_uri(&uri).unwrap(); assert_eq!(cfg.bucket.as_deref(), Some("example-bucket")); assert_eq!(cfg.root.as_deref(), Some("path/to/root")); } diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs index 140738275..0f167d748 100644 --- a/core/src/services/oss/backend.rs +++ b/core/src/services/oss/backend.rs @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. 
-use std::collections::HashMap; use std::fmt::Debug; use std::fmt::Formatter; use std::sync::Arc; @@ -24,7 +23,6 @@ use http::Response; use http::StatusCode; use http::Uri; use log::debug; -use percent_encoding::percent_decode_str; use reqsign::AliyunConfig; use reqsign::AliyunLoader; use reqsign::AliyunOssSigner; @@ -40,63 +38,22 @@ use super::writer::OssWriters; use super::OSS_SCHEME; use crate::raw::*; use crate::services::OssConfig; +use crate::types::OperatorUri; use crate::*; const DEFAULT_BATCH_MAX_OPERATIONS: usize = 1000; impl Configurator for OssConfig { type Builder = OssBuilder; - fn from_uri(uri: &Uri, options: &HashMap<String, String>) -> Result<Self> { - let mut map = options.clone(); + fn from_uri(uri: &OperatorUri) -> Result<Self> { + let mut map = uri.options().clone(); - let has_bucket = !map.get("bucket").map(|v| v.is_empty()).unwrap_or(true); - - let path = percent_decode_str(uri.path()).decode_utf8_lossy(); - - if has_bucket { - if !map.contains_key("root") { - let trimmed = path.trim_matches('/'); - if !trimmed.is_empty() { - map.insert("root".to_string(), trimmed.to_string()); - } - } - } else if let Some(authority) = uri.authority() { - let host = authority.host(); - if !host.is_empty() { - map.insert("bucket".to_string(), host.to_string()); - if !map.contains_key("root") { - let trimmed = path.trim_matches('/'); - if !trimmed.is_empty() { - map.insert("root".to_string(), trimmed.to_string()); - } - } - } + if let Some(name) = uri.name() { + map.insert("bucket".to_string(), name.to_string()); } - let has_bucket = !map.get("bucket").map(|v| v.is_empty()).unwrap_or(true); - - if !has_bucket { - let trimmed = path.trim_matches('/'); - if trimmed.is_empty() { - return Err(Error::new( - ErrorKind::ConfigInvalid, - "oss uri requires bucket", - )); - } - let mut segments = trimmed.splitn(2, '/'); - let bucket = segments.next().unwrap(); - if bucket.is_empty() { - return Err(Error::new( - ErrorKind::ConfigInvalid, - "oss uri requires bucket", - )); 
- } - map.insert("bucket".to_string(), bucket.to_string()); - if let Some(rest) = segments.next() { - if !rest.is_empty() && !map.contains_key("root") { - map.insert("root".to_string(), rest.to_string()); - } - } + if let Some(root) = uri.root() { + map.insert("root".to_string(), root.to_string()); } Self::from_iter(map) @@ -794,14 +751,17 @@ impl Access for OssBackend { #[cfg(test)] mod tests { use super::*; + use crate::types::OperatorUri; use crate::Configurator; - use http::Uri; - use std::collections::HashMap; #[test] fn from_uri_extracts_bucket_and_root() { - let uri: Uri = "oss://example-bucket/path/to/root".parse().unwrap(); - let cfg = OssConfig::from_uri(&uri, &HashMap::new()).unwrap(); + let uri = OperatorUri::new( + "oss://example-bucket/path/to/root".parse().unwrap(), + Vec::<(String, String)>::new(), + ) + .unwrap(); + let cfg = OssConfig::from_uri(&uri).unwrap(); assert_eq!(cfg.bucket, "example-bucket"); assert_eq!(cfg.root.as_deref(), Some("path/to/root")); } diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs index 4f3676866..d590ee0b8 100644 --- a/core/src/services/s3/backend.rs +++ b/core/src/services/s3/backend.rs @@ -30,12 +30,10 @@ use constants::X_AMZ_META_PREFIX; use constants::X_AMZ_VERSION_ID; use http::Response; use http::StatusCode; -use http::Uri; use log::debug; use log::warn; use md5::Digest; use md5::Md5; -use percent_encoding::percent_decode_str; use reqsign::AwsAssumeRoleLoader; use reqsign::AwsConfig; use reqsign::AwsCredentialLoad; @@ -56,6 +54,7 @@ use super::S3_SCHEME; use crate::raw::oio::PageLister; use crate::raw::*; use crate::services::S3Config; +use crate::types::OperatorUri; use crate::*; /// Allow constructing correct region endpoint if user gives a global endpoint. 
@@ -74,25 +73,15 @@ const DEFAULT_BATCH_MAX_OPERATIONS: usize = 1000; impl Configurator for S3Config { type Builder = S3Builder; - fn from_uri(uri: &Uri, options: &HashMap<String, String>) -> Result<Self> { - let mut map = options.clone(); - - let bucket_missing = map.get("bucket").map(|v| v.is_empty()).unwrap_or(true); - if bucket_missing { - let bucket = uri - .authority() - .map(|authority| authority.host()) - .filter(|host| !host.is_empty()) - .ok_or_else(|| Error::new(ErrorKind::ConfigInvalid, "s3 uri requires bucket"))?; - map.insert("bucket".to_string(), bucket.to_string()); + fn from_uri(uri: &OperatorUri) -> Result<Self> { + let mut map = uri.options().clone(); + + if let Some(name) = uri.name() { + map.insert("bucket".to_string(), name.to_string()); } - if !map.contains_key("root") { - let path = percent_decode_str(uri.path()).decode_utf8_lossy(); - let trimmed = path.trim_matches('/'); - if !trimmed.is_empty() { - map.insert("root".to_string(), trimmed.to_string()); - } + if let Some(root) = uri.root() { + map.insert("root".to_string(), root.to_string()); } Self::from_iter(map) @@ -1188,9 +1177,8 @@ impl Access for S3Backend { #[cfg(test)] mod tests { use super::*; + use crate::types::OperatorUri; use crate::Configurator; - use http::Uri; - use std::collections::HashMap; #[test] fn test_is_valid_bucket() { @@ -1296,8 +1284,12 @@ mod tests { #[test] fn from_uri_extracts_bucket_and_root() { - let uri: Uri = "s3://example-bucket/path/to/root".parse().unwrap(); - let cfg = S3Config::from_uri(&uri, &HashMap::new()).unwrap(); + let uri = OperatorUri::new( + "s3://example-bucket/path/to/root".parse().unwrap(), + Vec::<(String, String)>::new(), + ) + .unwrap(); + let cfg = S3Config::from_uri(&uri).unwrap(); assert_eq!(cfg.bucket, "example-bucket"); assert_eq!(cfg.root.as_deref(), Some("path/to/root")); } diff --git a/core/src/services/upyun/backend.rs b/core/src/services/upyun/backend.rs index c13f421c4..f9e71c5b4 100644 --- a/core/src/services/upyun/backend.rs 
+++ b/core/src/services/upyun/backend.rs @@ -15,16 +15,13 @@ // specific language governing permissions and limitations // under the License. -use std::collections::HashMap; use std::fmt::Debug; use std::fmt::Formatter; use std::sync::Arc; use http::Response; use http::StatusCode; -use http::Uri; use log::debug; -use percent_encoding::percent_decode_str; use super::core::*; use super::delete::UpyunDeleter; @@ -35,61 +32,20 @@ use super::writer::UpyunWriters; use super::UPYUN_SCHEME; use crate::raw::*; use crate::services::UpyunConfig; +use crate::types::OperatorUri; use crate::*; impl Configurator for UpyunConfig { type Builder = UpyunBuilder; - fn from_uri(uri: &Uri, options: &HashMap<String, String>) -> Result<Self> { - let mut map = options.clone(); + fn from_uri(uri: &OperatorUri) -> Result<Self> { + let mut map = uri.options().clone(); - let has_bucket = !map.get("bucket").map(|v| v.is_empty()).unwrap_or(true); - - let path = percent_decode_str(uri.path()).decode_utf8_lossy(); - - if has_bucket { - if !map.contains_key("root") { - let trimmed = path.trim_matches('/'); - if !trimmed.is_empty() { - map.insert("root".to_string(), trimmed.to_string()); - } - } - } else if let Some(authority) = uri.authority() { - let host = authority.host(); - if !host.is_empty() { - map.insert("bucket".to_string(), host.to_string()); - if !map.contains_key("root") { - let trimmed = path.trim_matches('/'); - if !trimmed.is_empty() { - map.insert("root".to_string(), trimmed.to_string()); - } - } - } + if let Some(name) = uri.name() { + map.insert("bucket".to_string(), name.to_string()); } - let has_bucket = !map.get("bucket").map(|v| v.is_empty()).unwrap_or(true); - - if !has_bucket { - let trimmed = path.trim_matches('/'); - if trimmed.is_empty() { - return Err(Error::new( - ErrorKind::ConfigInvalid, - "upyun uri requires bucket", - )); - } - let mut segments = trimmed.splitn(2, '/'); - let bucket = segments.next().unwrap(); - if bucket.is_empty() { - return Err(Error::new( - 
ErrorKind::ConfigInvalid, - "upyun uri requires bucket", - )); - } - map.insert("bucket".to_string(), bucket.to_string()); - if let Some(rest) = segments.next() { - if !rest.is_empty() && !map.contains_key("root") { - map.insert("root".to_string(), rest.to_string()); - } - } + if let Some(root) = uri.root() { + map.insert("root".to_string(), root.to_string()); } Self::from_iter(map) @@ -378,14 +334,17 @@ impl Access for UpyunBackend { #[cfg(test)] mod tests { use super::*; + use crate::types::OperatorUri; use crate::Configurator; - use http::Uri; - use std::collections::HashMap; #[test] fn from_uri_extracts_bucket_and_root() { - let uri: Uri = "upyun://example-bucket/path/to/root".parse().unwrap(); - let cfg = UpyunConfig::from_uri(&uri, &HashMap::new()).unwrap(); + let uri = OperatorUri::new( + "upyun://example-bucket/path/to/root".parse().unwrap(), + Vec::<(String, String)>::new(), + ) + .unwrap(); + let cfg = UpyunConfig::from_uri(&uri).unwrap(); assert_eq!(cfg.bucket, "example-bucket"); assert_eq!(cfg.root.as_deref(), Some("path/to/root")); } diff --git a/core/src/types/builder.rs b/core/src/types/builder.rs index b8cc5c960..2ebe36144 100644 --- a/core/src/types/builder.rs +++ b/core/src/types/builder.rs @@ -15,14 +15,13 @@ // specific language governing permissions and limitations // under the License. -use std::collections::HashMap; use std::fmt::Debug; -use http::Uri; use serde::de::DeserializeOwned; use serde::Serialize; use crate::raw::*; +use crate::types::OperatorUri; use crate::*; /// Builder is used to set up underlying services. @@ -125,8 +124,8 @@ pub trait Configurator: Serialize + DeserializeOwned + Debug + 'static { /// Associated builder for this configuration. type Builder: Builder; - /// Build configuration from a URI plus merged options. - fn from_uri(_uri: &Uri, _options: &HashMap<String, String>) -> Result<Self> { + /// Build configuration from a parsed URI plus merged options. 
+ fn from_uri(_uri: &OperatorUri) -> Result<Self> { Err(Error::new(ErrorKind::Unsupported, "uri is not supported")) } diff --git a/core/src/types/mod.rs b/core/src/types/mod.rs index dd8a09ede..f26511073 100644 --- a/core/src/types/mod.rs +++ b/core/src/types/mod.rs @@ -44,11 +44,13 @@ pub use execute::*; mod operator; pub use operator::operator_futures; +pub use operator::IntoOperatorUri; pub use operator::Operator; pub use operator::OperatorBuilder; pub use operator::OperatorFactory; pub use operator::OperatorInfo; pub use operator::OperatorRegistry; +pub use operator::OperatorUri; pub use operator::DEFAULT_OPERATOR_REGISTRY; mod builder; diff --git a/core/src/types/operator/builder.rs b/core/src/types/operator/builder.rs index 29ae15c82..e50e2f32a 100644 --- a/core/src/types/operator/builder.rs +++ b/core/src/types/operator/builder.rs @@ -20,6 +20,7 @@ use std::sync::Arc; use crate::layers::*; use crate::raw::*; +use crate::types::IntoOperatorUri; use crate::*; /// # Operator build API @@ -141,16 +142,13 @@ impl Operator { /// use opendal::Operator; /// /// # fn example() -> Result<()> { - /// let op = Operator::from_uri("memory://localhost/", [])?; + /// let op = Operator::from_uri("memory://localhost/")?; /// # let _ = op; /// # Ok(()) /// # } /// ``` - pub fn from_uri( - uri: &str, - options: impl IntoIterator<Item = (String, String)>, - ) -> Result<Operator> { - crate::DEFAULT_OPERATOR_REGISTRY.load(uri, options) + pub fn from_uri(uri: impl IntoOperatorUri) -> Result<Operator> { + crate::DEFAULT_OPERATOR_REGISTRY.load(uri) } /// Create a new operator via given scheme and iterator of config value in dynamic dispatch. 
diff --git a/core/src/types/operator/mod.rs b/core/src/types/operator/mod.rs index 1c0044c87..87b27f21a 100644 --- a/core/src/types/operator/mod.rs +++ b/core/src/types/operator/mod.rs @@ -31,3 +31,6 @@ pub mod operator_futures; mod registry; pub use registry::{OperatorFactory, OperatorRegistry, DEFAULT_OPERATOR_REGISTRY}; + +mod uri; +pub use uri::{IntoOperatorUri, OperatorUri}; diff --git a/core/src/types/operator/registry.rs b/core/src/types/operator/registry.rs index 22d05c408..006c6d843 100644 --- a/core/src/types/operator/registry.rs +++ b/core/src/types/operator/registry.rs @@ -18,15 +18,13 @@ use std::collections::HashMap; use std::sync::{LazyLock, Mutex}; -use http::Uri; -use percent_encoding::percent_decode_str; - use crate::services; use crate::types::builder::{Builder, Configurator}; +use crate::types::{IntoOperatorUri, OperatorUri}; use crate::{Error, ErrorKind, Operator, Result}; /// Factory signature used to construct [`Operator`] from a URI and extra options. -pub type OperatorFactory = fn(&str, Vec<(String, String)>) -> Result<Operator>; +pub type OperatorFactory = fn(&OperatorUri) -> Result<Operator>; /// Default registry initialized with builtin services. pub static DEFAULT_OPERATOR_REGISTRY: LazyLock<OperatorRegistry> = LazyLock::new(|| { @@ -60,33 +58,22 @@ impl OperatorRegistry { } /// Load an [`Operator`] via the factory registered for the URI's scheme. 
- pub fn load( - &self, - uri: &str, - options: impl IntoIterator<Item = (String, String)>, - ) -> Result<Operator> { - let parsed = uri.parse::<Uri>().map_err(|err| { - Error::new(ErrorKind::ConfigInvalid, "failed to parse uri").set_source(err) - })?; - - let scheme = parsed - .scheme_str() - .ok_or_else(|| Error::new(ErrorKind::ConfigInvalid, "uri scheme is required"))?; + pub fn load(&self, uri: impl IntoOperatorUri) -> Result<Operator> { + let parsed = uri.into_operator_uri()?; + let scheme = parsed.scheme(); - let key = scheme.to_ascii_lowercase(); let factory = self .factories .lock() .expect("operator registry mutex poisoned") - .get(key.as_str()) + .get(scheme) .copied() .ok_or_else(|| { Error::new(ErrorKind::Unsupported, "scheme is not registered") - .with_context("scheme", scheme) + .with_context("scheme", scheme.to_string()) })?; - let opts: Vec<(String, String)> = options.into_iter().collect(); - factory(uri, opts) + factory(&parsed) } } @@ -114,30 +101,7 @@ fn register_builtin_services(registry: &OperatorRegistry) { } /// Factory adapter that builds an operator from a configurator type. 
-fn factory<C: Configurator>(uri: &str, options: Vec<(String, String)>) -> Result<Operator> { - let parsed = uri.parse::<Uri>().map_err(|err| { - Error::new(ErrorKind::ConfigInvalid, "failed to parse uri").set_source(err) - })?; - - let mut params = HashMap::new(); - if let Some(query) = parsed.query() { - for pair in query.split('&') { - if pair.is_empty() { - continue; - } - let mut parts = pair.splitn(2, '='); - let key = parts.next().unwrap_or(""); - let value = parts.next().unwrap_or(""); - let key = percent_decode_str(key).decode_utf8_lossy().to_string(); - let value = percent_decode_str(value).decode_utf8_lossy().to_string(); - params.insert(key.to_ascii_lowercase(), value); - } - } - - for (key, value) in options { - params.insert(key.to_ascii_lowercase(), value); - } - - let cfg = C::from_uri(&parsed, &params)?; +fn factory<C: Configurator>(uri: &OperatorUri) -> Result<Operator> { + let cfg = C::from_uri(uri)?; Ok(Operator::from_config(cfg)?.finish()) } diff --git a/core/src/types/operator/uri.rs b/core/src/types/operator/uri.rs new file mode 100644 index 000000000..823db7dbe --- /dev/null +++ b/core/src/types/operator/uri.rs @@ -0,0 +1,264 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::collections::HashMap; + +use http::Uri; +use percent_encoding::percent_decode_str; + +use crate::{Error, ErrorKind, Result}; + +/// Parsed representation of an operator URI with normalized components. +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct OperatorUri { + scheme: String, + name: Option<String>, + root: Option<String>, + options: HashMap<String, String>, +} + +impl OperatorUri { + /// Build [`OperatorUri`] from a [`Uri`] plus additional options. + pub fn new( + uri: Uri, + extra_options: impl IntoIterator<Item = (String, String)>, + ) -> Result<Self> { + let scheme = uri + .scheme_str() + .ok_or_else(|| Error::new(ErrorKind::ConfigInvalid, "uri scheme is required"))? + .to_ascii_lowercase(); + + let mut options = HashMap::new(); + + if let Some(query) = uri.query() { + for pair in query.split('&') { + if pair.is_empty() { + continue; + } + let mut parts = pair.splitn(2, '='); + let key = parts.next().unwrap_or(""); + let value = parts.next().unwrap_or(""); + let key = percent_decode_str(key) + .decode_utf8_lossy() + .to_ascii_lowercase(); + let value = percent_decode_str(value).decode_utf8_lossy().to_string(); + options.insert(key, value); + } + } + + for (key, value) in extra_options { + options.insert(key.to_ascii_lowercase(), value); + } + + let name = uri.authority().and_then(|authority| { + let host = authority.host(); + if host.is_empty() { + None + } else { + Some(host.to_string()) + } + }); + + let decoded_path = percent_decode_str(uri.path()).decode_utf8_lossy(); + let trimmed = decoded_path.trim_matches('/'); + let root = if trimmed.is_empty() { + None + } else { + Some(trimmed.to_string()) + }; + + Ok(Self { + scheme, + name, + root, + options, + }) + } + + /// Normalized scheme in lowercase. + pub fn scheme(&self) -> &str { + self.scheme.as_str() + } + + /// Name extracted from the URI authority, if present. 
+ pub fn name(&self) -> Option<&str> { + self.name.as_deref() + } + + /// Root path (without leading slash) extracted from the URI path, if present. + pub fn root(&self) -> Option<&str> { + self.root.as_deref() + } + + /// Normalized option map merged from query string and extra options (excluding reserved keys). + pub fn options(&self) -> &HashMap<String, String> { + &self.options + } +} + +/// Conversion trait that builds [`OperatorUri`] from various inputs. +pub trait IntoOperatorUri { + /// Convert the input into an [`OperatorUri`]. + fn into_operator_uri(self) -> Result<OperatorUri>; +} + +impl IntoOperatorUri for OperatorUri { + fn into_operator_uri(self) -> Result<OperatorUri> { + Ok(self) + } +} + +impl IntoOperatorUri for &OperatorUri { + fn into_operator_uri(self) -> Result<OperatorUri> { + Ok(self.clone()) + } +} + +impl IntoOperatorUri for Uri { + fn into_operator_uri(self) -> Result<OperatorUri> { + OperatorUri::new(self, Vec::<(String, String)>::new()) + } +} + +impl IntoOperatorUri for &Uri { + fn into_operator_uri(self) -> Result<OperatorUri> { + OperatorUri::new(self.clone(), Vec::<(String, String)>::new()) + } +} + +impl IntoOperatorUri for &str { + fn into_operator_uri(self) -> Result<OperatorUri> { + let uri = self.parse::<Uri>().map_err(|err| { + Error::new(ErrorKind::ConfigInvalid, "failed to parse uri").set_source(err) + })?; + OperatorUri::new(uri, Vec::<(String, String)>::new()) + } +} + +impl IntoOperatorUri for String { + fn into_operator_uri(self) -> Result<OperatorUri> { + let uri = self.parse::<Uri>().map_err(|err| { + Error::new(ErrorKind::ConfigInvalid, "failed to parse uri").set_source(err) + })?; + OperatorUri::new(uri, Vec::<(String, String)>::new()) + } +} + +impl<O, K, V> IntoOperatorUri for (Uri, O) +where + O: IntoIterator<Item = (K, V)>, + K: Into<String>, + V: Into<String>, +{ + fn into_operator_uri(self) -> Result<OperatorUri> { + let (uri, extra) = self; + let opts = extra + .into_iter() + .map(|(k, v)| (k.into(), 
v.into())) + .collect::<Vec<_>>(); + OperatorUri::new(uri, opts) + } +} + +impl<O, K, V> IntoOperatorUri for (&Uri, O) +where + O: IntoIterator<Item = (K, V)>, + K: Into<String>, + V: Into<String>, +{ + fn into_operator_uri(self) -> Result<OperatorUri> { + let (uri, extra) = self; + let opts = extra + .into_iter() + .map(|(k, v)| (k.into(), v.into())) + .collect::<Vec<_>>(); + OperatorUri::new(uri.clone(), opts) + } +} + +impl<O, K, V> IntoOperatorUri for (&str, O) +where + O: IntoIterator<Item = (K, V)>, + K: Into<String>, + V: Into<String>, +{ + fn into_operator_uri(self) -> Result<OperatorUri> { + let (base, extra) = self; + let uri = base.parse::<Uri>().map_err(|err| { + Error::new(ErrorKind::ConfigInvalid, "failed to parse uri").set_source(err) + })?; + let opts = extra + .into_iter() + .map(|(k, v)| (k.into(), v.into())) + .collect::<Vec<_>>(); + OperatorUri::new(uri, opts) + } +} + +impl<O, K, V> IntoOperatorUri for (String, O) +where + O: IntoIterator<Item = (K, V)>, + K: Into<String>, + V: Into<String>, +{ + fn into_operator_uri(self) -> Result<OperatorUri> { + let (base, extra) = self; + (&base[..], extra).into_operator_uri() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::types::IntoOperatorUri; + + #[test] + fn parse_uri_with_name_and_root() { + let uri = OperatorUri::new( + "s3://example-bucket/photos/2024".parse().unwrap(), + Vec::<(String, String)>::new(), + ) + .unwrap(); + + assert_eq!(uri.scheme(), "s3"); + assert_eq!(uri.name(), Some("example-bucket")); + assert_eq!(uri.root(), Some("photos/2024")); + assert!(uri.options().is_empty()); + } + + #[test] + fn into_operator_uri_merges_extra_options() { + let uri = ( + "s3://bucket/path?region=us-east-1", + vec![("region", "override"), ("endpoint", "https://custom")], + ) + .into_operator_uri() + .unwrap(); + + assert_eq!(uri.scheme(), "s3"); + assert_eq!(uri.name(), Some("bucket")); + assert_eq!(uri.root(), Some("path")); + assert_eq!( + 
uri.options().get("region").map(String::as_str), + Some("override") + ); + assert_eq!( + uri.options().get("endpoint").map(String::as_str), + Some("https://custom") + ); + } +}
