This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch registry-more
in repository https://gitbox.apache.org/repos/asf/opendal.git

commit 621cba1bc71e5153de51b35e96cf6e5c09b78fea
Author: Xuanwo <[email protected]>
AuthorDate: Wed Oct 1 18:50:24 2025 +0800

    Introduce operator uri
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/blocking/operator.rs                |   8 +-
 core/src/docs/rfcs/5444_operator_from_uri.md |  32 ++--
 core/src/services/azblob/backend.rs          |  80 ++------
 core/src/services/b2/backend.rs              |  69 ++-----
 core/src/services/cos/backend.rs             |  68 ++-----
 core/src/services/fs/backend.rs              |  52 +++---
 core/src/services/gcs/backend.rs             |  69 ++-----
 core/src/services/memory/backend.rs          |  25 ++-
 core/src/services/obs/backend.rs             |  67 ++-----
 core/src/services/oss/backend.rs             |  68 ++-----
 core/src/services/s3/backend.rs              |  38 ++--
 core/src/services/upyun/backend.rs           |  69 ++-----
 core/src/types/builder.rs                    |   7 +-
 core/src/types/mod.rs                        |   2 +
 core/src/types/operator/builder.rs           |  10 +-
 core/src/types/operator/mod.rs               |   3 +
 core/src/types/operator/registry.rs          |  56 +-----
 core/src/types/operator/uri.rs               | 264 +++++++++++++++++++++++++++
 18 files changed, 459 insertions(+), 528 deletions(-)

diff --git a/core/src/blocking/operator.rs b/core/src/blocking/operator.rs
index 0ae55953f..34a83c4ff 100644
--- a/core/src/blocking/operator.rs
+++ b/core/src/blocking/operator.rs
@@ -17,6 +17,7 @@
 
 use tokio::runtime::Handle;
 
+use crate::types::IntoOperatorUri;
 use crate::Operator as AsyncOperator;
 use crate::*;
 
@@ -136,11 +137,8 @@ impl Operator {
     }
 
     /// Create a blocking operator from URI based configuration.
-    pub fn from_uri(
-        uri: &str,
-        options: impl IntoIterator<Item = (String, String)>,
-    ) -> Result<Self> {
-        let op = AsyncOperator::from_uri(uri, options)?;
+    pub fn from_uri(uri: impl IntoOperatorUri) -> Result<Self> {
+        let op = AsyncOperator::from_uri(uri)?;
         Self::new(op)
     }
 
diff --git a/core/src/docs/rfcs/5444_operator_from_uri.md b/core/src/docs/rfcs/5444_operator_from_uri.md
index 5b5f21c2d..5bb203d5b 100644
--- a/core/src/docs/rfcs/5444_operator_from_uri.md
+++ b/core/src/docs/rfcs/5444_operator_from_uri.md
@@ -24,18 +24,17 @@ The new API allows creating operators directly from URIs:
 
 ```rust
 // Create an operator using URI
-let op = Operator::from_uri("s3://my-bucket/path", vec![
-    ("endpoint".to_string(), "http://localhost:8080"to_string()),
-])?;
+let op = Operator::from_uri("s3://my-bucket/path")?;
 
 // Users can pass options through the URI along with additional key-value pairs
 // The extra options will override identical options specified in the URI
-let op = Operator::from_uri("s3://my-bucket/path?region=us-east-1", vec![
-    ("endpoint".to_string(), "http://localhost:8080"to_string()),
-])?;
+let op = Operator::from_uri((
+    "s3://my-bucket/path?region=us-east-1",
+    [("endpoint", "http://localhost:8080")],
+))?;
 
 // Create a file system operator
-let op = Operator::from_uri("fs:///tmp/test", vec![])?;
+let op = Operator::from_uri("fs:///tmp/test")?;
 ```
 
 OpenDAL will, by default, register services enabled by features in a global `OperatorRegistry`. Users can also create custom operator registries to support their own schemes or additional options.
@@ -53,7 +52,7 @@ registry.register::<services::S3>("r2");     // Cloudflare R2 is S3-compatible
 registry.register::<services::S3>("company-storage");
 registry.register::<services::Azblob>("backup-storage");
 
-let op = registry.load("company-storage://bucket/path", [])?;
+let op = registry.load("company-storage://bucket/path")?;
 ```
 
 # Reference-level explanation
@@ -62,12 +61,10 @@ The implementation consists of three main components:
 
 1. The `OperatorFactory` and `OperatorRegistry`:
 
-`OperatorFactory` is a function type that takes a URI string plus options and returns an `Operator`. `OperatorRegistry` manages factories registered under different schemes.
-
-`OperatorFactory` is a function type that takes a URI string plus options and returns an `Operator`. `OperatorRegistry` manages factories registered under different schemes.
+`OperatorFactory` is a function type that takes a parsed `OperatorUri` and returns an `Operator`. `OperatorRegistry` manages factories registered under different schemes.
 
 ```rust
-type OperatorFactory = fn(&str, Vec<(String, String)>) -> Result<Operator>;
+type OperatorFactory = fn(&OperatorUri) -> Result<Operator>;
 
 pub struct OperatorRegistry { ... }
 
@@ -76,7 +73,7 @@ impl OperatorRegistry {
         ...
     }
 
-    fn load(&self, uri: &str, options: impl IntoIterator<Item = (String, String)>) -> Result<Operator> {
+    fn load(&self, uri: impl IntoOperatorUri) -> Result<Operator> {
         ...
     }
 }
@@ -84,11 +81,11 @@ impl OperatorRegistry {
 
 2. The `Configurator` trait extension:
 
-`Configurator` will add a new API to create a configuration from a URI and options. Services should only parse the URI components relevant to their configuration (host, path, query parameters) without concerning themselves with the scheme portion.
+`Configurator` will add a new API to create a configuration from a parsed `OperatorUri`. Services should only inspect the URI components relevant to their configuration (name, root, options) without concerning themselves with the scheme portion.
 
 ```rust
 impl Configurator for S3Config {
-    fn from_uri(uri: &Uri, options: &HashMap<String, String>) -> Result<Self> {
+    fn from_uri(uri: &OperatorUri) -> Result<Self> {
         ...
     }
 }
@@ -102,10 +99,7 @@ The `Operator` trait will add a new `from_uri` method to create an operator from
 
 ```rust
 impl Operator {
-    pub fn from_uri(
-        uri: &str,
-        options: impl IntoIterator<Item = (String, String)>,
-    ) -> Result<Self> {
+    pub fn from_uri(uri: impl IntoOperatorUri) -> Result<Self> {
         ...
     }
 }
diff --git a/core/src/services/azblob/backend.rs b/core/src/services/azblob/backend.rs
index 72b7436cc..812b657a8 100644
--- a/core/src/services/azblob/backend.rs
+++ b/core/src/services/azblob/backend.rs
@@ -15,7 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::collections::HashMap;
 use std::fmt::Debug;
 use std::fmt::Formatter;
 use std::sync::Arc;
@@ -24,9 +23,7 @@ use base64::prelude::BASE64_STANDARD;
 use base64::Engine;
 use http::Response;
 use http::StatusCode;
-use http::Uri;
 use log::debug;
-use percent_encoding::percent_decode_str;
 use reqsign::AzureStorageConfig;
 use reqsign::AzureStorageLoader;
 use reqsign::AzureStorageSigner;
@@ -44,6 +41,7 @@ use super::writer::AzblobWriters;
 use super::AZBLOB_SCHEME;
 use crate::raw::*;
 use crate::services::AzblobConfig;
+use crate::types::OperatorUri;
 use crate::*;
 const AZBLOB_BATCH_LIMIT: usize = 256;
 
@@ -62,60 +60,15 @@ impl From<AzureStorageConfig> for AzblobConfig {
 impl Configurator for AzblobConfig {
     type Builder = AzblobBuilder;
 
-    fn from_uri(uri: &Uri, options: &HashMap<String, String>) -> Result<Self> {
-        let mut map = options.clone();
+    fn from_uri(uri: &OperatorUri) -> Result<Self> {
+        let mut map = uri.options().clone();
 
-        let has_container = map.get("container").map(|v| !v.is_empty()).unwrap_or(false);
-
-        let path = percent_decode_str(uri.path()).decode_utf8_lossy();
-
-        if has_container {
-            if !map.contains_key("root") {
-                let trimmed = path.trim_matches('/');
-                if !trimmed.is_empty() {
-                    map.insert("root".to_string(), trimmed.to_string());
-                }
-            }
-        } else if let Some(authority) = uri.authority() {
-            let host = authority.host();
-            if host.is_empty() {
-                return Err(Error::new(
-                    ErrorKind::ConfigInvalid,
-                    "azblob uri requires container",
-                ));
-            }
-            map.insert("container".to_string(), host.to_string());
-
-            if !map.contains_key("root") {
-                let trimmed = path.trim_matches('/');
-                if !trimmed.is_empty() {
-                    map.insert("root".to_string(), trimmed.to_string());
-                }
-            }
-        } else {
-            let trimmed = path.trim_matches('/');
-            if trimmed.is_empty() {
-                return Err(Error::new(
-                    ErrorKind::ConfigInvalid,
-                    "azblob uri requires container",
-                ));
-            }
-
-            let mut segments = trimmed.splitn(2, '/');
-            let container = segments.next().unwrap();
-            if container.is_empty() {
-                return Err(Error::new(
-                    ErrorKind::ConfigInvalid,
-                    "azblob uri requires container",
-                ));
-            }
+        if let Some(container) = uri.name() {
             map.insert("container".to_string(), container.to_string());
+        }
 
-            if let Some(rest) = segments.next() {
-                if !rest.is_empty() && !map.contains_key("root") {
-                    map.insert("root".to_string(), rest.to_string());
-                }
-            }
+        if let Some(root) = uri.root() {
+            map.insert("root".to_string(), root.to_string());
         }
 
         Self::from_iter(map)
@@ -658,22 +611,29 @@ impl Access for AzblobBackend {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::types::OperatorUri;
     use crate::Configurator;
-    use http::Uri;
-    use std::collections::HashMap;
 
     #[test]
     fn from_uri_with_host_container() {
-        let uri: Uri = "azblob://my-container/path/to/root".parse().unwrap();
-        let cfg = AzblobConfig::from_uri(&uri, &HashMap::new()).unwrap();
+        let uri = OperatorUri::new(
+            "azblob://my-container/path/to/root".parse().unwrap(),
+            Vec::<(String, String)>::new(),
+        )
+        .unwrap();
+        let cfg = AzblobConfig::from_uri(&uri).unwrap();
         assert_eq!(cfg.container, "my-container");
         assert_eq!(cfg.root.as_deref(), Some("path/to/root"));
     }
 
     #[test]
     fn from_uri_with_path_container() {
-        let uri: Uri = "azblob:///my-container/nested/root".parse().unwrap();
-        let cfg = AzblobConfig::from_uri(&uri, &HashMap::new()).unwrap();
+        let uri = OperatorUri::new(
+            "azblob:///my-container/nested/root".parse().unwrap(),
+            Vec::<(String, String)>::new(),
+        )
+        .unwrap();
+        let cfg = AzblobConfig::from_uri(&uri).unwrap();
         assert_eq!(cfg.container, "my-container");
         assert_eq!(cfg.root.as_deref(), Some("nested/root"));
     }
diff --git a/core/src/services/b2/backend.rs b/core/src/services/b2/backend.rs
index 868039c8f..afe70b23d 100644
--- a/core/src/services/b2/backend.rs
+++ b/core/src/services/b2/backend.rs
@@ -15,7 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::collections::HashMap;
 use std::fmt::Debug;
 use std::fmt::Formatter;
 use std::sync::Arc;
@@ -23,9 +22,7 @@ use std::sync::Arc;
 use http::Request;
 use http::Response;
 use http::StatusCode;
-use http::Uri;
 use log::debug;
-use percent_encoding::percent_decode_str;
 use tokio::sync::RwLock;
 
 use super::core::constants;
@@ -40,59 +37,20 @@ use super::writer::B2Writers;
 use super::B2_SCHEME;
 use crate::raw::*;
 use crate::services::B2Config;
+use crate::types::OperatorUri;
 use crate::*;
 impl Configurator for B2Config {
     type Builder = B2Builder;
 
-    fn from_uri(uri: &Uri, options: &HashMap<String, String>) -> Result<Self> {
-        let mut map = options.clone();
+    fn from_uri(uri: &OperatorUri) -> Result<Self> {
+        let mut map = uri.options().clone();
 
-        let has_bucket = map.get("bucket").map(|v| !v.is_empty()).unwrap_or(false);
-
-        let path = percent_decode_str(uri.path()).decode_utf8_lossy();
-
-        if has_bucket {
-            if !map.contains_key("root") {
-                let trimmed = path.trim_matches('/');
-                if !trimmed.is_empty() {
-                    map.insert("root".to_string(), trimmed.to_string());
-                }
-            }
-        } else if let Some(authority) = uri.authority() {
-            let host = authority.host();
-            if !host.is_empty() {
-                map.insert("bucket".to_string(), host.to_string());
-                if !map.contains_key("root") {
-                    let trimmed = path.trim_matches('/');
-                    if !trimmed.is_empty() {
-                        map.insert("root".to_string(), trimmed.to_string());
-                    }
-                }
-            }
+        if let Some(name) = uri.name() {
+            map.insert("bucket".to_string(), name.to_string());
         }
 
-        if !map.contains_key("bucket") {
-            let trimmed = path.trim_matches('/');
-            if trimmed.is_empty() {
-                return Err(Error::new(
-                    ErrorKind::ConfigInvalid,
-                    "b2 uri requires bucket",
-                ));
-            }
-            let mut segments = trimmed.splitn(2, '/');
-            let bucket = segments.next().unwrap();
-            if bucket.is_empty() {
-                return Err(Error::new(
-                    ErrorKind::ConfigInvalid,
-                    "b2 uri requires bucket",
-                ));
-            }
-            map.insert("bucket".to_string(), bucket.to_string());
-            if let Some(rest) = segments.next() {
-                if !rest.is_empty() && !map.contains_key("root") {
-                    map.insert("root".to_string(), rest.to_string());
-                }
-            }
+        if let Some(root) = uri.root() {
+            map.insert("root".to_string(), root.to_string());
         }
 
         Self::from_iter(map)
@@ -493,17 +451,18 @@ impl Access for B2Backend {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::types::OperatorUri;
     use crate::Configurator;
-    use http::Uri;
-    use std::collections::HashMap;
 
     #[test]
     fn from_uri_extracts_bucket_and_root() {
-        let uri: Uri = "b2://example-bucket/path/to/root".parse().unwrap();
-        let mut options = HashMap::new();
-        options.insert("bucket_id".to_string(), "bucket-id".to_string());
+        let uri = OperatorUri::new(
+            "b2://example-bucket/path/to/root".parse().unwrap(),
+            vec![("bucket_id".to_string(), "bucket-id".to_string())],
+        )
+        .unwrap();
 
-        let cfg = B2Config::from_uri(&uri, &options).unwrap();
+        let cfg = B2Config::from_uri(&uri).unwrap();
         assert_eq!(cfg.bucket, "example-bucket");
         assert_eq!(cfg.root.as_deref(), Some("path/to/root"));
         assert_eq!(cfg.bucket_id, "bucket-id");
diff --git a/core/src/services/cos/backend.rs b/core/src/services/cos/backend.rs
index d1750fb5a..aa0b58ec6 100644
--- a/core/src/services/cos/backend.rs
+++ b/core/src/services/cos/backend.rs
@@ -15,7 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::collections::HashMap;
 use std::fmt::Debug;
 use std::sync::Arc;
 
@@ -23,7 +22,6 @@ use http::Response;
 use http::StatusCode;
 use http::Uri;
 use log::debug;
-use percent_encoding::percent_decode_str;
 use reqsign::TencentCosConfig;
 use reqsign::TencentCosCredentialLoader;
 use reqsign::TencentCosSigner;
@@ -40,61 +38,20 @@ use super::COS_SCHEME;
 use crate::raw::oio::PageLister;
 use crate::raw::*;
 use crate::services::CosConfig;
+use crate::types::OperatorUri;
 use crate::*;
 impl Configurator for CosConfig {
     type Builder = CosBuilder;
 
-    fn from_uri(uri: &Uri, options: &HashMap<String, String>) -> Result<Self> {
-        let mut map = options.clone();
+    fn from_uri(uri: &OperatorUri) -> Result<Self> {
+        let mut map = uri.options().clone();
 
-        let has_bucket = map.get("bucket").map(|v| !v.is_empty()).unwrap_or(false);
-
-        let path = percent_decode_str(uri.path()).decode_utf8_lossy();
-
-        if has_bucket {
-            if !map.contains_key("root") {
-                let trimmed = path.trim_matches('/');
-                if !trimmed.is_empty() {
-                    map.insert("root".to_string(), trimmed.to_string());
-                }
-            }
-        } else if let Some(authority) = uri.authority() {
-            let host = authority.host();
-            if !host.is_empty() {
-                map.insert("bucket".to_string(), host.to_string());
-                if !map.contains_key("root") {
-                    let trimmed = path.trim_matches('/');
-                    if !trimmed.is_empty() {
-                        map.insert("root".to_string(), trimmed.to_string());
-                    }
-                }
-            }
+        if let Some(name) = uri.name() {
+            map.insert("bucket".to_string(), name.to_string());
         }
 
-        let has_bucket = map.get("bucket").map(|v| !v.is_empty()).unwrap_or(false);
-
-        if !has_bucket {
-            let trimmed = path.trim_matches('/');
-            if trimmed.is_empty() {
-                return Err(Error::new(
-                    ErrorKind::ConfigInvalid,
-                    "cos uri requires bucket",
-                ));
-            }
-            let mut segments = trimmed.splitn(2, '/');
-            let bucket = segments.next().unwrap();
-            if bucket.is_empty() {
-                return Err(Error::new(
-                    ErrorKind::ConfigInvalid,
-                    "cos uri requires bucket",
-                ));
-            }
-            map.insert("bucket".to_string(), bucket.to_string());
-            if let Some(rest) = segments.next() {
-                if !rest.is_empty() && !map.contains_key("root") {
-                    map.insert("root".to_string(), rest.to_string());
-                }
-            }
+        if let Some(root) = uri.root() {
+            map.insert("root".to_string(), root.to_string());
         }
 
         Self::from_iter(map)
@@ -502,14 +459,17 @@ impl Access for CosBackend {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::types::OperatorUri;
     use crate::Configurator;
-    use http::Uri;
-    use std::collections::HashMap;
 
     #[test]
     fn from_uri_extracts_bucket_and_root() {
-        let uri: Uri = "cos://example-bucket/path/to/root".parse().unwrap();
-        let cfg = CosConfig::from_uri(&uri, &HashMap::new()).unwrap();
+        let uri = OperatorUri::new(
+            "cos://example-bucket/path/to/root".parse().unwrap(),
+            Vec::<(String, String)>::new(),
+        )
+        .unwrap();
+        let cfg = CosConfig::from_uri(&uri).unwrap();
         assert_eq!(cfg.bucket.as_deref(), Some("example-bucket"));
         assert_eq!(cfg.root.as_deref(), Some("path/to/root"));
     }
diff --git a/core/src/services/fs/backend.rs b/core/src/services/fs/backend.rs
index 93d0e40fd..a5ddac1ff 100644
--- a/core/src/services/fs/backend.rs
+++ b/core/src/services/fs/backend.rs
@@ -15,15 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::collections::HashMap;
 use std::path::PathBuf;
 use std::sync::Arc;
 
 use log::debug;
 
-use http::Uri;
-use percent_encoding::percent_decode_str;
-
 use super::core::*;
 use super::delete::FsDeleter;
 use super::lister::FsLister;
@@ -33,27 +29,28 @@ use super::writer::FsWriters;
 use super::FS_SCHEME;
 use crate::raw::*;
 use crate::services::FsConfig;
+use crate::types::OperatorUri;
 use crate::*;
 impl Configurator for FsConfig {
     type Builder = FsBuilder;
-    fn from_uri(uri: &Uri, options: &HashMap<String, String>) -> Result<Self> {
-        let mut map = options.clone();
-
-        if !map.contains_key("root") {
-            let path = percent_decode_str(uri.path()).decode_utf8_lossy();
-            if path.is_empty() || path == "/" {
-                return Err(Error::new(
-                    ErrorKind::ConfigInvalid,
-                    "fs uri requires absolute path",
-                ));
-            }
-            if !path.starts_with('/') {
-                return Err(Error::new(
-                    ErrorKind::ConfigInvalid,
-                    "fs uri root must be absolute",
-                ));
-            }
-            map.insert("root".to_string(), path.to_string());
+    fn from_uri(uri: &OperatorUri) -> Result<Self> {
+        let mut map = uri.options().clone();
+
+        if map.contains_key("root") {
+            return Self::from_iter(map);
+        }
+
+        if let Some(root) = uri.root() {
+            let value = if uri.name().is_none() {
+                if root.starts_with('/') {
+                    root.to_string()
+                } else {
+                    format!("/{}", root)
+                }
+            } else {
+                root.to_string()
+            };
+            map.insert("root".to_string(), value);
         }
 
         Self::from_iter(map)
@@ -301,14 +298,17 @@ impl Access for FsBackend {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::types::OperatorUri;
     use crate::Configurator;
-    use http::Uri;
-    use std::collections::HashMap;
 
     #[test]
     fn from_uri_extracts_root() {
-        let uri: Uri = "fs:///tmp/data".parse().unwrap();
-        let cfg = FsConfig::from_uri(&uri, &HashMap::new()).unwrap();
+        let uri = OperatorUri::new(
+            "fs:///tmp/data".parse().unwrap(),
+            Vec::<(String, String)>::new(),
+        )
+        .unwrap();
+        let cfg = FsConfig::from_uri(&uri).unwrap();
         assert_eq!(cfg.root.as_deref(), Some("/tmp/data"));
     }
 }
diff --git a/core/src/services/gcs/backend.rs b/core/src/services/gcs/backend.rs
index 00848905d..ff6e93ade 100644
--- a/core/src/services/gcs/backend.rs
+++ b/core/src/services/gcs/backend.rs
@@ -15,16 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::collections::HashMap;
 use std::fmt::Debug;
 use std::fmt::Formatter;
 use std::sync::Arc;
 
 use http::Response;
 use http::StatusCode;
-use http::Uri;
 use log::debug;
-use percent_encoding::percent_decode_str;
 use reqsign::GoogleCredentialLoader;
 use reqsign::GoogleSigner;
 use reqsign::GoogleTokenLoad;
@@ -40,6 +37,7 @@ use super::GCS_SCHEME;
 use crate::raw::oio::BatchDeleter;
 use crate::raw::*;
 use crate::services::GcsConfig;
+use crate::types::OperatorUri;
 use crate::*;
 const DEFAULT_GCS_ENDPOINT: &str = "https://storage.googleapis.com";
 const DEFAULT_GCS_SCOPE: &str = "https://www.googleapis.com/auth/devstorage.read_write";
@@ -47,57 +45,15 @@ const DEFAULT_GCS_SCOPE: &str = "https://www.googleapis.com/auth/devstorage.read
 impl Configurator for GcsConfig {
     type Builder = GcsBuilder;
 
-    fn from_uri(uri: &Uri, options: &HashMap<String, String>) -> Result<Self> {
-        let mut map = options.clone();
+    fn from_uri(uri: &OperatorUri) -> Result<Self> {
+        let mut map = uri.options().clone();
 
-        let has_bucket = !map.get("bucket").map(|v| v.is_empty()).unwrap_or(true);
-
-        let path = percent_decode_str(uri.path()).decode_utf8_lossy();
-
-        if has_bucket {
-            if !map.contains_key("root") {
-                let trimmed = path.trim_matches('/');
-                if !trimmed.is_empty() {
-                    map.insert("root".to_string(), trimmed.to_string());
-                }
-            }
-        } else if let Some(authority) = uri.authority() {
-            let host = authority.host();
-            if !host.is_empty() {
-                map.insert("bucket".to_string(), host.to_string());
-                if !map.contains_key("root") {
-                    let trimmed = path.trim_matches('/');
-                    if !trimmed.is_empty() {
-                        map.insert("root".to_string(), trimmed.to_string());
-                    }
-                }
-            }
+        if let Some(name) = uri.name() {
+            map.insert("bucket".to_string(), name.to_string());
         }
 
-        let has_bucket = !map.get("bucket").map(|v| v.is_empty()).unwrap_or(true);
-
-        if !has_bucket {
-            let trimmed = path.trim_matches('/');
-            if trimmed.is_empty() {
-                return Err(Error::new(
-                    ErrorKind::ConfigInvalid,
-                    "gcs uri requires bucket",
-                ));
-            }
-            let mut segments = trimmed.splitn(2, '/');
-            let bucket = segments.next().unwrap();
-            if bucket.is_empty() {
-                return Err(Error::new(
-                    ErrorKind::ConfigInvalid,
-                    "gcs uri requires bucket",
-                ));
-            }
-            map.insert("bucket".to_string(), bucket.to_string());
-            if let Some(rest) = segments.next() {
-                if !rest.is_empty() && !map.contains_key("root") {
-                    map.insert("root".to_string(), rest.to_string());
-                }
-            }
+        if let Some(root) = uri.root() {
+            map.insert("root".to_string(), root.to_string());
         }
 
         Self::from_iter(map)
@@ -560,14 +516,17 @@ impl Access for GcsBackend {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::types::OperatorUri;
     use crate::Configurator;
-    use http::Uri;
-    use std::collections::HashMap;
 
     #[test]
     fn from_uri_extracts_bucket_and_root() {
-        let uri: Uri = "gcs://example-bucket/path/to/root".parse().unwrap();
-        let cfg = GcsConfig::from_uri(&uri, &HashMap::new()).unwrap();
+        let uri = OperatorUri::new(
+            "gcs://example-bucket/path/to/root".parse().unwrap(),
+            Vec::<(String, String)>::new(),
+        )
+        .unwrap();
+        let cfg = GcsConfig::from_uri(&uri).unwrap();
         assert_eq!(cfg.bucket, "example-bucket");
         assert_eq!(cfg.root.as_deref(), Some("path/to/root"));
     }
diff --git a/core/src/services/memory/backend.rs b/core/src/services/memory/backend.rs
index b1108fd63..5ee731efa 100644
--- a/core/src/services/memory/backend.rs
+++ b/core/src/services/memory/backend.rs
@@ -15,13 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::collections::HashMap;
 use std::fmt::Debug;
 use std::sync::Arc;
 
-use http::Uri;
-use percent_encoding::percent_decode_str;
-
 use super::core::*;
 use super::delete::MemoryDeleter;
 use super::lister::MemoryLister;
@@ -30,16 +26,16 @@ use super::MEMORY_SCHEME;
 use crate::raw::oio;
 use crate::raw::*;
 use crate::services::MemoryConfig;
+use crate::types::OperatorUri;
 use crate::*;
 impl Configurator for MemoryConfig {
     type Builder = MemoryBuilder;
-    fn from_uri(uri: &Uri, options: &HashMap<String, String>) -> Result<Self> {
-        let mut map = options.clone();
 
+    fn from_uri(uri: &OperatorUri) -> Result<Self> {
+        let mut map = uri.options().clone();
         if !map.contains_key("root") {
-            let path = percent_decode_str(uri.path()).decode_utf8_lossy();
-            if !path.is_empty() && path != "/" {
-                map.insert("root".to_string(), path.trim_start_matches('/').to_string());
+            if let Some(root) = uri.root().filter(|v| !v.is_empty()) {
+                map.insert("root".to_string(), root.to_string());
             }
         }
 
@@ -193,9 +189,8 @@ impl Access for MemoryAccessor {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::types::OperatorUri;
     use crate::Configurator;
-    use http::Uri;
-    use std::collections::HashMap;
 
     #[test]
     fn test_accessor_metadata_name() {
@@ -208,8 +203,12 @@ mod tests {
 
     #[test]
     fn from_uri_extracts_root() {
-        let uri: Uri = "memory://localhost/path/to/root".parse().unwrap();
-        let cfg = MemoryConfig::from_uri(&uri, &HashMap::new()).unwrap();
+        let uri = OperatorUri::new(
+            "memory://localhost/path/to/root".parse().unwrap(),
+            Vec::<(String, String)>::new(),
+        )
+        .unwrap();
+        let cfg = MemoryConfig::from_uri(&uri).unwrap();
         assert_eq!(cfg.root.as_deref(), Some("path/to/root"));
     }
 }
diff --git a/core/src/services/obs/backend.rs b/core/src/services/obs/backend.rs
index f61167429..831c7f8ed 100644
--- a/core/src/services/obs/backend.rs
+++ b/core/src/services/obs/backend.rs
@@ -24,7 +24,6 @@ use http::Response;
 use http::StatusCode;
 use http::Uri;
 use log::debug;
-use percent_encoding::percent_decode_str;
 use reqsign::HuaweicloudObsConfig;
 use reqsign::HuaweicloudObsCredentialLoader;
 use reqsign::HuaweicloudObsSigner;
@@ -39,61 +38,20 @@ use super::writer::ObsWriters;
 use super::OBS_SCHEME;
 use crate::raw::*;
 use crate::services::ObsConfig;
+use crate::types::OperatorUri;
 use crate::*;
 impl Configurator for ObsConfig {
     type Builder = ObsBuilder;
 
-    fn from_uri(uri: &Uri, options: &HashMap<String, String>) -> Result<Self> {
-        let mut map = options.clone();
+    fn from_uri(uri: &OperatorUri) -> Result<Self> {
+        let mut map = uri.options().clone();
 
-        let has_bucket = map.get("bucket").map(|v| !v.is_empty()).unwrap_or(false);
-
-        let path = percent_decode_str(uri.path()).decode_utf8_lossy();
-
-        if has_bucket {
-            if !map.contains_key("root") {
-                let trimmed = path.trim_matches('/');
-                if !trimmed.is_empty() {
-                    map.insert("root".to_string(), trimmed.to_string());
-                }
-            }
-        } else if let Some(authority) = uri.authority() {
-            let host = authority.host();
-            if !host.is_empty() {
-                map.insert("bucket".to_string(), host.to_string());
-                if !map.contains_key("root") {
-                    let trimmed = path.trim_matches('/');
-                    if !trimmed.is_empty() {
-                        map.insert("root".to_string(), trimmed.to_string());
-                    }
-                }
-            }
+        if let Some(name) = uri.name() {
+            map.insert("bucket".to_string(), name.to_string());
         }
 
-        let has_bucket = map.get("bucket").map(|v| !v.is_empty()).unwrap_or(false);
-
-        if !has_bucket {
-            let trimmed = path.trim_matches('/');
-            if trimmed.is_empty() {
-                return Err(Error::new(
-                    ErrorKind::ConfigInvalid,
-                    "obs uri requires bucket",
-                ));
-            }
-            let mut segments = trimmed.splitn(2, '/');
-            let bucket = segments.next().unwrap();
-            if bucket.is_empty() {
-                return Err(Error::new(
-                    ErrorKind::ConfigInvalid,
-                    "obs uri requires bucket",
-                ));
-            }
-            map.insert("bucket".to_string(), bucket.to_string());
-            if let Some(rest) = segments.next() {
-                if !rest.is_empty() && !map.contains_key("root") {
-                    map.insert("root".to_string(), rest.to_string());
-                }
-            }
+        if let Some(root) = uri.root() {
+            map.insert("root".to_string(), root.to_string());
         }
 
         Self::from_iter(map)
@@ -501,14 +459,17 @@ impl Access for ObsBackend {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::types::OperatorUri;
     use crate::Configurator;
-    use http::Uri;
-    use std::collections::HashMap;
 
     #[test]
     fn from_uri_extracts_bucket_and_root() {
-        let uri: Uri = "obs://example-bucket/path/to/root".parse().unwrap();
-        let cfg = ObsConfig::from_uri(&uri, &HashMap::new()).unwrap();
+        let uri = OperatorUri::new(
+            "obs://example-bucket/path/to/root".parse().unwrap(),
+            Vec::<(String, String)>::new(),
+        )
+        .unwrap();
+        let cfg = ObsConfig::from_uri(&uri).unwrap();
         assert_eq!(cfg.bucket.as_deref(), Some("example-bucket"));
         assert_eq!(cfg.root.as_deref(), Some("path/to/root"));
     }
diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs
index 140738275..0f167d748 100644
--- a/core/src/services/oss/backend.rs
+++ b/core/src/services/oss/backend.rs
@@ -15,7 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::collections::HashMap;
 use std::fmt::Debug;
 use std::fmt::Formatter;
 use std::sync::Arc;
@@ -24,7 +23,6 @@ use http::Response;
 use http::StatusCode;
 use http::Uri;
 use log::debug;
-use percent_encoding::percent_decode_str;
 use reqsign::AliyunConfig;
 use reqsign::AliyunLoader;
 use reqsign::AliyunOssSigner;
@@ -40,63 +38,22 @@ use super::writer::OssWriters;
 use super::OSS_SCHEME;
 use crate::raw::*;
 use crate::services::OssConfig;
+use crate::types::OperatorUri;
 use crate::*;
 const DEFAULT_BATCH_MAX_OPERATIONS: usize = 1000;
 
 impl Configurator for OssConfig {
     type Builder = OssBuilder;
 
-    fn from_uri(uri: &Uri, options: &HashMap<String, String>) -> Result<Self> {
-        let mut map = options.clone();
+    fn from_uri(uri: &OperatorUri) -> Result<Self> {
+        let mut map = uri.options().clone();
 
-        let has_bucket = !map.get("bucket").map(|v| v.is_empty()).unwrap_or(true);
-
-        let path = percent_decode_str(uri.path()).decode_utf8_lossy();
-
-        if has_bucket {
-            if !map.contains_key("root") {
-                let trimmed = path.trim_matches('/');
-                if !trimmed.is_empty() {
-                    map.insert("root".to_string(), trimmed.to_string());
-                }
-            }
-        } else if let Some(authority) = uri.authority() {
-            let host = authority.host();
-            if !host.is_empty() {
-                map.insert("bucket".to_string(), host.to_string());
-                if !map.contains_key("root") {
-                    let trimmed = path.trim_matches('/');
-                    if !trimmed.is_empty() {
-                        map.insert("root".to_string(), trimmed.to_string());
-                    }
-                }
-            }
+        if let Some(name) = uri.name() {
+            map.insert("bucket".to_string(), name.to_string());
         }
 
-        let has_bucket = !map.get("bucket").map(|v| v.is_empty()).unwrap_or(true);
-
-        if !has_bucket {
-            let trimmed = path.trim_matches('/');
-            if trimmed.is_empty() {
-                return Err(Error::new(
-                    ErrorKind::ConfigInvalid,
-                    "oss uri requires bucket",
-                ));
-            }
-            let mut segments = trimmed.splitn(2, '/');
-            let bucket = segments.next().unwrap();
-            if bucket.is_empty() {
-                return Err(Error::new(
-                    ErrorKind::ConfigInvalid,
-                    "oss uri requires bucket",
-                ));
-            }
-            map.insert("bucket".to_string(), bucket.to_string());
-            if let Some(rest) = segments.next() {
-                if !rest.is_empty() && !map.contains_key("root") {
-                    map.insert("root".to_string(), rest.to_string());
-                }
-            }
+        if let Some(root) = uri.root() {
+            map.insert("root".to_string(), root.to_string());
         }
 
         Self::from_iter(map)
@@ -794,14 +751,17 @@ impl Access for OssBackend {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::types::OperatorUri;
     use crate::Configurator;
-    use http::Uri;
-    use std::collections::HashMap;
 
     #[test]
     fn from_uri_extracts_bucket_and_root() {
-        let uri: Uri = "oss://example-bucket/path/to/root".parse().unwrap();
-        let cfg = OssConfig::from_uri(&uri, &HashMap::new()).unwrap();
+        let uri = OperatorUri::new(
+            "oss://example-bucket/path/to/root".parse().unwrap(),
+            Vec::<(String, String)>::new(),
+        )
+        .unwrap();
+        let cfg = OssConfig::from_uri(&uri).unwrap();
         assert_eq!(cfg.bucket, "example-bucket");
         assert_eq!(cfg.root.as_deref(), Some("path/to/root"));
     }
diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs
index 4f3676866..d590ee0b8 100644
--- a/core/src/services/s3/backend.rs
+++ b/core/src/services/s3/backend.rs
@@ -30,12 +30,10 @@ use constants::X_AMZ_META_PREFIX;
 use constants::X_AMZ_VERSION_ID;
 use http::Response;
 use http::StatusCode;
-use http::Uri;
 use log::debug;
 use log::warn;
 use md5::Digest;
 use md5::Md5;
-use percent_encoding::percent_decode_str;
 use reqsign::AwsAssumeRoleLoader;
 use reqsign::AwsConfig;
 use reqsign::AwsCredentialLoad;
@@ -56,6 +54,7 @@ use super::S3_SCHEME;
 use crate::raw::oio::PageLister;
 use crate::raw::*;
 use crate::services::S3Config;
+use crate::types::OperatorUri;
 use crate::*;
 
 /// Allow constructing correct region endpoint if user gives a global endpoint.
@@ -74,25 +73,15 @@ const DEFAULT_BATCH_MAX_OPERATIONS: usize = 1000;
 impl Configurator for S3Config {
     type Builder = S3Builder;
 
-    fn from_uri(uri: &Uri, options: &HashMap<String, String>) -> Result<Self> {
-        let mut map = options.clone();
-
-        let bucket_missing = map.get("bucket").map(|v| v.is_empty()).unwrap_or(true);
-        if bucket_missing {
-            let bucket = uri
-                .authority()
-                .map(|authority| authority.host())
-                .filter(|host| !host.is_empty())
-                .ok_or_else(|| Error::new(ErrorKind::ConfigInvalid, "s3 uri requires bucket"))?;
-            map.insert("bucket".to_string(), bucket.to_string());
+    fn from_uri(uri: &OperatorUri) -> Result<Self> {
+        let mut map = uri.options().clone();
+
+        if let Some(name) = uri.name() {
+            map.insert("bucket".to_string(), name.to_string());
         }
 
-        if !map.contains_key("root") {
-            let path = percent_decode_str(uri.path()).decode_utf8_lossy();
-            let trimmed = path.trim_matches('/');
-            if !trimmed.is_empty() {
-                map.insert("root".to_string(), trimmed.to_string());
-            }
+        if let Some(root) = uri.root() {
+            map.insert("root".to_string(), root.to_string());
         }
 
         Self::from_iter(map)
@@ -1188,9 +1177,8 @@ impl Access for S3Backend {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::types::OperatorUri;
     use crate::Configurator;
-    use http::Uri;
-    use std::collections::HashMap;
 
     #[test]
     fn test_is_valid_bucket() {
@@ -1296,8 +1284,12 @@ mod tests {
 
     #[test]
     fn from_uri_extracts_bucket_and_root() {
-        let uri: Uri = "s3://example-bucket/path/to/root".parse().unwrap();
-        let cfg = S3Config::from_uri(&uri, &HashMap::new()).unwrap();
+        let uri = OperatorUri::new(
+            "s3://example-bucket/path/to/root".parse().unwrap(),
+            Vec::<(String, String)>::new(),
+        )
+        .unwrap();
+        let cfg = S3Config::from_uri(&uri).unwrap();
         assert_eq!(cfg.bucket, "example-bucket");
         assert_eq!(cfg.root.as_deref(), Some("path/to/root"));
     }
diff --git a/core/src/services/upyun/backend.rs b/core/src/services/upyun/backend.rs
index c13f421c4..f9e71c5b4 100644
--- a/core/src/services/upyun/backend.rs
+++ b/core/src/services/upyun/backend.rs
@@ -15,16 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::collections::HashMap;
 use std::fmt::Debug;
 use std::fmt::Formatter;
 use std::sync::Arc;
 
 use http::Response;
 use http::StatusCode;
-use http::Uri;
 use log::debug;
-use percent_encoding::percent_decode_str;
 
 use super::core::*;
 use super::delete::UpyunDeleter;
@@ -35,61 +32,20 @@ use super::writer::UpyunWriters;
 use super::UPYUN_SCHEME;
 use crate::raw::*;
 use crate::services::UpyunConfig;
+use crate::types::OperatorUri;
 use crate::*;
 impl Configurator for UpyunConfig {
     type Builder = UpyunBuilder;
 
-    fn from_uri(uri: &Uri, options: &HashMap<String, String>) -> Result<Self> {
-        let mut map = options.clone();
+    fn from_uri(uri: &OperatorUri) -> Result<Self> {
+        let mut map = uri.options().clone();
 
-        let has_bucket = !map.get("bucket").map(|v| v.is_empty()).unwrap_or(true);
-
-        let path = percent_decode_str(uri.path()).decode_utf8_lossy();
-
-        if has_bucket {
-            if !map.contains_key("root") {
-                let trimmed = path.trim_matches('/');
-                if !trimmed.is_empty() {
-                    map.insert("root".to_string(), trimmed.to_string());
-                }
-            }
-        } else if let Some(authority) = uri.authority() {
-            let host = authority.host();
-            if !host.is_empty() {
-                map.insert("bucket".to_string(), host.to_string());
-                if !map.contains_key("root") {
-                    let trimmed = path.trim_matches('/');
-                    if !trimmed.is_empty() {
-                        map.insert("root".to_string(), trimmed.to_string());
-                    }
-                }
-            }
+        if let Some(name) = uri.name() {
+            map.insert("bucket".to_string(), name.to_string());
         }
 
-        let has_bucket = !map.get("bucket").map(|v| v.is_empty()).unwrap_or(true);
-
-        if !has_bucket {
-            let trimmed = path.trim_matches('/');
-            if trimmed.is_empty() {
-                return Err(Error::new(
-                    ErrorKind::ConfigInvalid,
-                    "upyun uri requires bucket",
-                ));
-            }
-            let mut segments = trimmed.splitn(2, '/');
-            let bucket = segments.next().unwrap();
-            if bucket.is_empty() {
-                return Err(Error::new(
-                    ErrorKind::ConfigInvalid,
-                    "upyun uri requires bucket",
-                ));
-            }
-            map.insert("bucket".to_string(), bucket.to_string());
-            if let Some(rest) = segments.next() {
-                if !rest.is_empty() && !map.contains_key("root") {
-                    map.insert("root".to_string(), rest.to_string());
-                }
-            }
+        if let Some(root) = uri.root() {
+            map.insert("root".to_string(), root.to_string());
         }
 
         Self::from_iter(map)
@@ -378,14 +334,17 @@ impl Access for UpyunBackend {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::types::OperatorUri;
     use crate::Configurator;
-    use http::Uri;
-    use std::collections::HashMap;
 
     #[test]
     fn from_uri_extracts_bucket_and_root() {
-        let uri: Uri = "upyun://example-bucket/path/to/root".parse().unwrap();
-        let cfg = UpyunConfig::from_uri(&uri, &HashMap::new()).unwrap();
+        let uri = OperatorUri::new(
+            "upyun://example-bucket/path/to/root".parse().unwrap(),
+            Vec::<(String, String)>::new(),
+        )
+        .unwrap();
+        let cfg = UpyunConfig::from_uri(&uri).unwrap();
         assert_eq!(cfg.bucket, "example-bucket");
         assert_eq!(cfg.root.as_deref(), Some("path/to/root"));
     }
diff --git a/core/src/types/builder.rs b/core/src/types/builder.rs
index b8cc5c960..2ebe36144 100644
--- a/core/src/types/builder.rs
+++ b/core/src/types/builder.rs
@@ -15,14 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::collections::HashMap;
 use std::fmt::Debug;
 
-use http::Uri;
 use serde::de::DeserializeOwned;
 use serde::Serialize;
 
 use crate::raw::*;
+use crate::types::OperatorUri;
 use crate::*;
 
 /// Builder is used to set up underlying services.
@@ -125,8 +124,8 @@ pub trait Configurator: Serialize + DeserializeOwned + Debug + 'static {
     /// Associated builder for this configuration.
     type Builder: Builder;
 
-    /// Build configuration from a URI plus merged options.
-    fn from_uri(_uri: &Uri, _options: &HashMap<String, String>) -> Result<Self> {
+    /// Build configuration from a parsed URI plus merged options.
+    fn from_uri(_uri: &OperatorUri) -> Result<Self> {
         Err(Error::new(ErrorKind::Unsupported, "uri is not supported"))
     }
 
diff --git a/core/src/types/mod.rs b/core/src/types/mod.rs
index dd8a09ede..f26511073 100644
--- a/core/src/types/mod.rs
+++ b/core/src/types/mod.rs
@@ -44,11 +44,13 @@ pub use execute::*;
 
 mod operator;
 pub use operator::operator_futures;
+pub use operator::IntoOperatorUri;
 pub use operator::Operator;
 pub use operator::OperatorBuilder;
 pub use operator::OperatorFactory;
 pub use operator::OperatorInfo;
 pub use operator::OperatorRegistry;
+pub use operator::OperatorUri;
 pub use operator::DEFAULT_OPERATOR_REGISTRY;
 
 mod builder;
diff --git a/core/src/types/operator/builder.rs b/core/src/types/operator/builder.rs
index 29ae15c82..e50e2f32a 100644
--- a/core/src/types/operator/builder.rs
+++ b/core/src/types/operator/builder.rs
@@ -20,6 +20,7 @@ use std::sync::Arc;
 
 use crate::layers::*;
 use crate::raw::*;
+use crate::types::IntoOperatorUri;
 use crate::*;
 
 /// # Operator build API
@@ -141,16 +142,13 @@ impl Operator {
     /// use opendal::Operator;
     ///
     /// # fn example() -> Result<()> {
-    /// let op = Operator::from_uri("memory://localhost/", [])?;
+    /// let op = Operator::from_uri("memory://localhost/")?;
     /// # let _ = op;
     /// # Ok(())
     /// # }
     /// ```
-    pub fn from_uri(
-        uri: &str,
-        options: impl IntoIterator<Item = (String, String)>,
-    ) -> Result<Operator> {
-        crate::DEFAULT_OPERATOR_REGISTRY.load(uri, options)
+    pub fn from_uri(uri: impl IntoOperatorUri) -> Result<Operator> {
+        crate::DEFAULT_OPERATOR_REGISTRY.load(uri)
     }
 
     /// Create a new operator via given scheme and iterator of config value in dynamic dispatch.
diff --git a/core/src/types/operator/mod.rs b/core/src/types/operator/mod.rs
index 1c0044c87..87b27f21a 100644
--- a/core/src/types/operator/mod.rs
+++ b/core/src/types/operator/mod.rs
@@ -31,3 +31,6 @@ pub mod operator_futures;
 
 mod registry;
 pub use registry::{OperatorFactory, OperatorRegistry, DEFAULT_OPERATOR_REGISTRY};
+
+mod uri;
+pub use uri::{IntoOperatorUri, OperatorUri};
diff --git a/core/src/types/operator/registry.rs b/core/src/types/operator/registry.rs
index 22d05c408..006c6d843 100644
--- a/core/src/types/operator/registry.rs
+++ b/core/src/types/operator/registry.rs
@@ -18,15 +18,13 @@
 use std::collections::HashMap;
 use std::sync::{LazyLock, Mutex};
 
-use http::Uri;
-use percent_encoding::percent_decode_str;
-
 use crate::services;
 use crate::types::builder::{Builder, Configurator};
+use crate::types::{IntoOperatorUri, OperatorUri};
 use crate::{Error, ErrorKind, Operator, Result};
 
 /// Factory signature used to construct [`Operator`] from a URI and extra options.
-pub type OperatorFactory = fn(&str, Vec<(String, String)>) -> Result<Operator>;
+pub type OperatorFactory = fn(&OperatorUri) -> Result<Operator>;
 
 /// Default registry initialized with builtin services.
 pub static DEFAULT_OPERATOR_REGISTRY: LazyLock<OperatorRegistry> = LazyLock::new(|| {
@@ -60,33 +58,22 @@ impl OperatorRegistry {
     }
 
     /// Load an [`Operator`] via the factory registered for the URI's scheme.
-    pub fn load(
-        &self,
-        uri: &str,
-        options: impl IntoIterator<Item = (String, String)>,
-    ) -> Result<Operator> {
-        let parsed = uri.parse::<Uri>().map_err(|err| {
-            Error::new(ErrorKind::ConfigInvalid, "failed to parse uri").set_source(err)
-        })?;
-
-        let scheme = parsed
-            .scheme_str()
-            .ok_or_else(|| Error::new(ErrorKind::ConfigInvalid, "uri scheme is required"))?;
+    pub fn load(&self, uri: impl IntoOperatorUri) -> Result<Operator> {
+        let parsed = uri.into_operator_uri()?;
+        let scheme = parsed.scheme();
 
-        let key = scheme.to_ascii_lowercase();
         let factory = self
             .factories
             .lock()
             .expect("operator registry mutex poisoned")
-            .get(key.as_str())
+            .get(scheme)
             .copied()
             .ok_or_else(|| {
                 Error::new(ErrorKind::Unsupported, "scheme is not registered")
-                    .with_context("scheme", scheme)
+                    .with_context("scheme", scheme.to_string())
             })?;
 
-        let opts: Vec<(String, String)> = options.into_iter().collect();
-        factory(uri, opts)
+        factory(&parsed)
     }
 }
 
@@ -114,30 +101,7 @@ fn register_builtin_services(registry: &OperatorRegistry) {
 }
 
 /// Factory adapter that builds an operator from a configurator type.
-fn factory<C: Configurator>(uri: &str, options: Vec<(String, String)>) -> Result<Operator> {
-    let parsed = uri.parse::<Uri>().map_err(|err| {
-        Error::new(ErrorKind::ConfigInvalid, "failed to parse uri").set_source(err)
-    })?;
-
-    let mut params = HashMap::new();
-    if let Some(query) = parsed.query() {
-        for pair in query.split('&') {
-            if pair.is_empty() {
-                continue;
-            }
-            let mut parts = pair.splitn(2, '=');
-            let key = parts.next().unwrap_or("");
-            let value = parts.next().unwrap_or("");
-            let key = percent_decode_str(key).decode_utf8_lossy().to_string();
-            let value = percent_decode_str(value).decode_utf8_lossy().to_string();
-            params.insert(key.to_ascii_lowercase(), value);
-        }
-    }
-
-    for (key, value) in options {
-        params.insert(key.to_ascii_lowercase(), value);
-    }
-
-    let cfg = C::from_uri(&parsed, &params)?;
+fn factory<C: Configurator>(uri: &OperatorUri) -> Result<Operator> {
+    let cfg = C::from_uri(uri)?;
     Ok(Operator::from_config(cfg)?.finish())
 }
diff --git a/core/src/types/operator/uri.rs b/core/src/types/operator/uri.rs
new file mode 100644
index 000000000..823db7dbe
--- /dev/null
+++ b/core/src/types/operator/uri.rs
@@ -0,0 +1,264 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+
+use http::Uri;
+use percent_encoding::percent_decode_str;
+
+use crate::{Error, ErrorKind, Result};
+
+/// Parsed representation of an operator URI with normalized components.
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub struct OperatorUri {
+    scheme: String,
+    name: Option<String>,
+    root: Option<String>,
+    options: HashMap<String, String>,
+}
+
+impl OperatorUri {
+    /// Build [`OperatorUri`] from a [`Uri`] plus additional options.
+    pub fn new(
+        uri: Uri,
+        extra_options: impl IntoIterator<Item = (String, String)>,
+    ) -> Result<Self> {
+        let scheme = uri
+            .scheme_str()
+            .ok_or_else(|| Error::new(ErrorKind::ConfigInvalid, "uri scheme is required"))?
+            .to_ascii_lowercase();
+
+        let mut options = HashMap::new();
+
+        if let Some(query) = uri.query() {
+            for pair in query.split('&') {
+                if pair.is_empty() {
+                    continue;
+                }
+                let mut parts = pair.splitn(2, '=');
+                let key = parts.next().unwrap_or("");
+                let value = parts.next().unwrap_or("");
+                let key = percent_decode_str(key)
+                    .decode_utf8_lossy()
+                    .to_ascii_lowercase();
+                let value = percent_decode_str(value).decode_utf8_lossy().to_string();
+                options.insert(key, value);
+            }
+        }
+
+        for (key, value) in extra_options {
+            options.insert(key.to_ascii_lowercase(), value);
+        }
+
+        let name = uri.authority().and_then(|authority| {
+            let host = authority.host();
+            if host.is_empty() {
+                None
+            } else {
+                Some(host.to_string())
+            }
+        });
+
+        let decoded_path = percent_decode_str(uri.path()).decode_utf8_lossy();
+        let trimmed = decoded_path.trim_matches('/');
+        let root = if trimmed.is_empty() {
+            None
+        } else {
+            Some(trimmed.to_string())
+        };
+
+        Ok(Self {
+            scheme,
+            name,
+            root,
+            options,
+        })
+    }
+
+    /// Normalized scheme in lowercase.
+    pub fn scheme(&self) -> &str {
+        self.scheme.as_str()
+    }
+
+    /// Name extracted from the URI authority, if present.
+    pub fn name(&self) -> Option<&str> {
+        self.name.as_deref()
+    }
+
+    /// Root path (without leading slash) extracted from the URI path, if present.
+    pub fn root(&self) -> Option<&str> {
+        self.root.as_deref()
+    }
+
+    /// Normalized option map merged from query string and extra options (excluding reserved keys).
+    pub fn options(&self) -> &HashMap<String, String> {
+        &self.options
+    }
+}
+
+/// Conversion trait that builds [`OperatorUri`] from various inputs.
+pub trait IntoOperatorUri {
+    /// Convert the input into an [`OperatorUri`].
+    fn into_operator_uri(self) -> Result<OperatorUri>;
+}
+
+impl IntoOperatorUri for OperatorUri {
+    fn into_operator_uri(self) -> Result<OperatorUri> {
+        Ok(self)
+    }
+}
+
+impl IntoOperatorUri for &OperatorUri {
+    fn into_operator_uri(self) -> Result<OperatorUri> {
+        Ok(self.clone())
+    }
+}
+
+impl IntoOperatorUri for Uri {
+    fn into_operator_uri(self) -> Result<OperatorUri> {
+        OperatorUri::new(self, Vec::<(String, String)>::new())
+    }
+}
+
+impl IntoOperatorUri for &Uri {
+    fn into_operator_uri(self) -> Result<OperatorUri> {
+        OperatorUri::new(self.clone(), Vec::<(String, String)>::new())
+    }
+}
+
+impl IntoOperatorUri for &str {
+    fn into_operator_uri(self) -> Result<OperatorUri> {
+        let uri = self.parse::<Uri>().map_err(|err| {
+            Error::new(ErrorKind::ConfigInvalid, "failed to parse uri").set_source(err)
+        })?;
+        OperatorUri::new(uri, Vec::<(String, String)>::new())
+    }
+}
+
+impl IntoOperatorUri for String {
+    fn into_operator_uri(self) -> Result<OperatorUri> {
+        let uri = self.parse::<Uri>().map_err(|err| {
+            Error::new(ErrorKind::ConfigInvalid, "failed to parse uri").set_source(err)
+        })?;
+        OperatorUri::new(uri, Vec::<(String, String)>::new())
+    }
+}
+
+impl<O, K, V> IntoOperatorUri for (Uri, O)
+where
+    O: IntoIterator<Item = (K, V)>,
+    K: Into<String>,
+    V: Into<String>,
+{
+    fn into_operator_uri(self) -> Result<OperatorUri> {
+        let (uri, extra) = self;
+        let opts = extra
+            .into_iter()
+            .map(|(k, v)| (k.into(), v.into()))
+            .collect::<Vec<_>>();
+        OperatorUri::new(uri, opts)
+    }
+}
+
+impl<O, K, V> IntoOperatorUri for (&Uri, O)
+where
+    O: IntoIterator<Item = (K, V)>,
+    K: Into<String>,
+    V: Into<String>,
+{
+    fn into_operator_uri(self) -> Result<OperatorUri> {
+        let (uri, extra) = self;
+        let opts = extra
+            .into_iter()
+            .map(|(k, v)| (k.into(), v.into()))
+            .collect::<Vec<_>>();
+        OperatorUri::new(uri.clone(), opts)
+    }
+}
+
+impl<O, K, V> IntoOperatorUri for (&str, O)
+where
+    O: IntoIterator<Item = (K, V)>,
+    K: Into<String>,
+    V: Into<String>,
+{
+    fn into_operator_uri(self) -> Result<OperatorUri> {
+        let (base, extra) = self;
+        let uri = base.parse::<Uri>().map_err(|err| {
+            Error::new(ErrorKind::ConfigInvalid, "failed to parse uri").set_source(err)
+        })?;
+        let opts = extra
+            .into_iter()
+            .map(|(k, v)| (k.into(), v.into()))
+            .collect::<Vec<_>>();
+        OperatorUri::new(uri, opts)
+    }
+}
+
+impl<O, K, V> IntoOperatorUri for (String, O)
+where
+    O: IntoIterator<Item = (K, V)>,
+    K: Into<String>,
+    V: Into<String>,
+{
+    fn into_operator_uri(self) -> Result<OperatorUri> {
+        let (base, extra) = self;
+        (&base[..], extra).into_operator_uri()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::types::IntoOperatorUri;
+
+    #[test]
+    fn parse_uri_with_name_and_root() {
+        let uri = OperatorUri::new(
+            "s3://example-bucket/photos/2024".parse().unwrap(),
+            Vec::<(String, String)>::new(),
+        )
+        .unwrap();
+
+        assert_eq!(uri.scheme(), "s3");
+        assert_eq!(uri.name(), Some("example-bucket"));
+        assert_eq!(uri.root(), Some("photos/2024"));
+        assert!(uri.options().is_empty());
+    }
+
+    #[test]
+    fn into_operator_uri_merges_extra_options() {
+        let uri = (
+            "s3://bucket/path?region=us-east-1",
+            vec![("region", "override"), ("endpoint", "https://custom")],
+        )
+            .into_operator_uri()
+            .unwrap();
+
+        assert_eq!(uri.scheme(), "s3");
+        assert_eq!(uri.name(), Some("bucket"));
+        assert_eq!(uri.root(), Some("path"));
+        assert_eq!(
+            uri.options().get("region").map(String::as_str),
+            Some("override")
+        );
+        assert_eq!(
+            uri.options().get("endpoint").map(String::as_str),
+            Some("https://custom")
+        );
+    }
+}

Reply via email to