This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch registry
in repository https://gitbox.apache.org/repos/asf/opendal.git


The following commit(s) were added to refs/heads/registry by this push:
     new eee563c32 feat: Add Operator Registry
eee563c32 is described below

commit eee563c32346989b59edf121f2a4576046d3bb3f
Author: Xuanwo <[email protected]>
AuthorDate: Wed Oct 1 01:23:26 2025 +0800

    feat: Add Operator Registry
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/blocking/operator.rs                |   9 ++
 core/src/docs/rfcs/5444_operator_from_uri.md |  30 ++++---
 core/src/services/fs/backend.rs              |  30 ++++++-
 core/src/services/fs/mod.rs                  |   2 +-
 core/src/services/memory/backend.rs          |  20 ++++-
 core/src/services/memory/mod.rs              |   2 +-
 core/src/services/mod.rs                     |   6 ++
 core/src/services/s3/backend.rs              |  30 ++++++-
 core/src/services/s3/mod.rs                  |   2 +-
 core/src/types/builder.rs                    |   7 ++
 core/src/types/mod.rs                        |   3 +
 core/src/types/operator/builder.rs           |  21 +++++
 core/src/types/operator/mod.rs               |   3 +
 core/src/types/operator/registry.rs          | 129 +++++++++++++++++++++++++++
 14 files changed, 271 insertions(+), 23 deletions(-)

diff --git a/core/src/blocking/operator.rs b/core/src/blocking/operator.rs
index 74b4c36d0..0ae55953f 100644
--- a/core/src/blocking/operator.rs
+++ b/core/src/blocking/operator.rs
@@ -135,6 +135,15 @@ impl Operator {
         })
     }
 
+    /// Create a blocking operator from URI based configuration.
+    pub fn from_uri(
+        uri: &str,
+        options: impl IntoIterator<Item = (String, String)>,
+    ) -> Result<Self> {
+        let op = AsyncOperator::from_uri(uri, options)?;
+        Self::new(op)
+    }
+
     /// Get information of underlying accessor.
     ///
     /// # Examples
diff --git a/core/src/docs/rfcs/5444_operator_from_uri.md b/core/src/docs/rfcs/5444_operator_from_uri.md
index 036927234..5b5f21c2d 100644
--- a/core/src/docs/rfcs/5444_operator_from_uri.md
+++ b/core/src/docs/rfcs/5444_operator_from_uri.md
@@ -41,19 +41,19 @@ let op = Operator::from_uri("fs:///tmp/test", vec![])?;
 OpenDAL will, by default, register services enabled by features in a global `OperatorRegistry`. Users can also create custom operator registries to support their own schemes or additional options.
 
 ```rust
-// Using with custom registry
+// Using a custom registry
 let registry = OperatorRegistry::new();
-registry.register("custom", my_factory);
-let op = registry.parse("custom://endpoint", options)?;
 
-// The same service implementation can be registered under multiple schemes
-registry.register("s3", s3_factory);
-registry.register("minio", s3_factory);  // MinIO is S3-compatible
-registry.register("r2", s3_factory);     // Cloudflare R2 is S3-compatible
+// Register builtin builders under desired schemes
+registry.register::<services::S3>(services::S3_SCHEME);
+registry.register::<services::S3>("minio");  // MinIO is S3-compatible
+registry.register::<services::S3>("r2");     // Cloudflare R2 is S3-compatible
 
 // Users can define their own scheme names for internal use
-registry.register("company-storage", s3_factory);
-registry.register("backup-storage", azblob_factory);
+registry.register::<services::S3>("company-storage");
+registry.register::<services::Azblob>("backup-storage");
+
+let op = registry.load("company-storage://bucket/path", [])?;
 ```
 
 # Reference-level explanation
@@ -62,19 +62,21 @@ The implementation consists of three main components:
 
 1. The `OperatorFactory` and `OperatorRegistry`:
 
-`OperatorFactory` is a function type that takes a URI and a map of options and returns an `Operator`. `OperatorRegistry` manages operator factories for different schemes.
+`OperatorFactory` is a function type that takes a URI string plus options and returns an `Operator`. `OperatorRegistry` manages factories registered under different schemes.
+
+`OperatorFactory` is a function type that takes a URI string plus options and returns an `Operator`. `OperatorRegistry` manages factories registered under different schemes.
 
 ```rust
-type OperatorFactory = fn(http::Uri, HashMap<String, String>) -> Result<Operator>;
+type OperatorFactory = fn(&str, Vec<(String, String)>) -> Result<Operator>;
 
 pub struct OperatorRegistry { ... }
 
 impl OperatorRegistry {
-    fn register(&self, scheme: &str, factory: OperatorFactory) {
+    fn register<B: Builder>(&self, scheme: &str) {
         ...
     }
 
-    fn parse(&self, uri: &str, options: impl IntoIterator<Item = (String, String)>) -> Result<Operator> {
+    fn load(&self, uri: &str, options: impl IntoIterator<Item = (String, String)>) -> Result<Operator> {
         ...
     }
 }
@@ -86,7 +88,7 @@ impl OperatorRegistry {
 
 ```rust
 impl Configurator for S3Config {
-    fn from_uri(uri: &str, options: impl IntoIterator<Item = (String, String)>) -> Result<Self> {
+    fn from_uri(uri: &Uri, options: &HashMap<String, String>) -> Result<Self> {
         ...
     }
 }
diff --git a/core/src/services/fs/backend.rs b/core/src/services/fs/backend.rs
index 6e2b11a4d..ab0332020 100644
--- a/core/src/services/fs/backend.rs
+++ b/core/src/services/fs/backend.rs
@@ -15,23 +15,49 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::collections::HashMap;
 use std::path::PathBuf;
 use std::sync::Arc;
 
 use log::debug;
 
+use http::Uri;
+use percent_encoding::percent_decode_str;
+
 use super::core::*;
 use super::delete::FsDeleter;
 use super::lister::FsLister;
 use super::reader::FsReader;
 use super::writer::FsWriter;
 use super::writer::FsWriters;
-use super::DEFAULT_SCHEME;
+use super::FS_SCHEME;
 use crate::raw::*;
 use crate::services::FsConfig;
 use crate::*;
 impl Configurator for FsConfig {
     type Builder = FsBuilder;
+    fn from_uri(uri: &Uri, options: &HashMap<String, String>) -> Result<Self> {
+        let mut map = options.clone();
+
+        if !map.contains_key("root") {
+            let path = percent_decode_str(uri.path()).decode_utf8_lossy();
+            if path.is_empty() || path == "/" {
+                return Err(Error::new(
+                    ErrorKind::ConfigInvalid,
+                    "fs uri requires absolute path",
+                ));
+            }
+            if !path.starts_with('/') {
+                return Err(Error::new(
+                    ErrorKind::ConfigInvalid,
+                    "fs uri root must be absolute",
+                ));
+            }
+            map.insert("root".to_string(), path.to_string());
+        }
+
+        Self::from_iter(map)
+    }
     fn into_builder(self) -> Self::Builder {
         FsBuilder { config: self }
     }
@@ -144,7 +170,7 @@ impl Builder for FsBuilder {
             core: Arc::new(FsCore {
                 info: {
                     let am = AccessorInfo::default();
-                    am.set_scheme(DEFAULT_SCHEME)
+                    am.set_scheme(FS_SCHEME)
                         .set_root(&root.to_string_lossy())
                         .set_native_capability(Capability {
                             stat: true,
diff --git a/core/src/services/fs/mod.rs b/core/src/services/fs/mod.rs
index cc5951c95..7fa64234a 100644
--- a/core/src/services/fs/mod.rs
+++ b/core/src/services/fs/mod.rs
@@ -17,7 +17,7 @@
 
 /// Default scheme for fs service.
 #[cfg(feature = "services-fs")]
-pub(super) const DEFAULT_SCHEME: &str = "fs";
+pub const FS_SCHEME: &str = "fs";
 #[cfg(feature = "services-fs")]
 mod core;
 #[cfg(feature = "services-fs")]
diff --git a/core/src/services/memory/backend.rs b/core/src/services/memory/backend.rs
index 8c2b4b55f..8c1d0d1cb 100644
--- a/core/src/services/memory/backend.rs
+++ b/core/src/services/memory/backend.rs
@@ -15,20 +15,36 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::collections::HashMap;
 use std::fmt::Debug;
 use std::sync::Arc;
 
+use http::Uri;
+use percent_encoding::percent_decode_str;
+
 use super::core::*;
 use super::delete::MemoryDeleter;
 use super::lister::MemoryLister;
 use super::writer::MemoryWriter;
-use super::DEFAULT_SCHEME;
+use super::MEMORY_SCHEME;
 use crate::raw::oio;
 use crate::raw::*;
 use crate::services::MemoryConfig;
 use crate::*;
 impl Configurator for MemoryConfig {
     type Builder = MemoryBuilder;
+    fn from_uri(uri: &Uri, options: &HashMap<String, String>) -> Result<Self> {
+        let mut map = options.clone();
+
+        if !map.contains_key("root") {
+            let path = percent_decode_str(uri.path()).decode_utf8_lossy();
+            if !path.is_empty() && path != "/" {
+                map.insert("root".to_string(), path.trim_start_matches('/').to_string());
+            }
+        }
+
+        Self::from_iter(map)
+    }
     fn into_builder(self) -> Self::Builder {
         MemoryBuilder { config: self }
     }
@@ -71,7 +87,7 @@ pub struct MemoryAccessor {
 impl MemoryAccessor {
     fn new(core: MemoryCore) -> Self {
         let info = AccessorInfo::default();
-        info.set_scheme(DEFAULT_SCHEME);
+        info.set_scheme(MEMORY_SCHEME);
         info.set_name(&format!("{:p}", Arc::as_ptr(&core.data)));
         info.set_root("/");
         info.set_native_capability(Capability {
diff --git a/core/src/services/memory/mod.rs b/core/src/services/memory/mod.rs
index ef5c3f5eb..a308c8e42 100644
--- a/core/src/services/memory/mod.rs
+++ b/core/src/services/memory/mod.rs
@@ -17,7 +17,7 @@
 
 /// Default scheme for memory service.
 #[cfg(feature = "services-memory")]
-pub(super) const DEFAULT_SCHEME: &str = "memory";
+pub const MEMORY_SCHEME: &str = "memory";
 #[cfg(feature = "services-memory")]
 mod backend;
 #[cfg(feature = "services-memory")]
diff --git a/core/src/services/mod.rs b/core/src/services/mod.rs
index a78fd9a17..dc97e0379 100644
--- a/core/src/services/mod.rs
+++ b/core/src/services/mod.rs
@@ -68,6 +68,8 @@ mod foundationdb;
 pub use self::foundationdb::*;
 
 mod fs;
+#[cfg(feature = "services-fs")]
+pub use fs::FS_SCHEME;
 pub use fs::*;
 
 mod ftp;
@@ -116,6 +118,8 @@ mod memcached;
 pub use memcached::*;
 
 mod memory;
+#[cfg(feature = "services-memory")]
+pub use self::memory::MEMORY_SCHEME;
 pub use self::memory::*;
 
 mod mini_moka;
@@ -161,6 +165,8 @@ mod rocksdb;
 pub use self::rocksdb::*;
 
 mod s3;
+#[cfg(feature = "services-s3")]
+pub use s3::S3_SCHEME;
 pub use s3::*;
 
 mod seafile;
diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs
index 292e90690..a55c95493 100644
--- a/core/src/services/s3/backend.rs
+++ b/core/src/services/s3/backend.rs
@@ -30,10 +30,12 @@ use constants::X_AMZ_META_PREFIX;
 use constants::X_AMZ_VERSION_ID;
 use http::Response;
 use http::StatusCode;
+use http::Uri;
 use log::debug;
 use log::warn;
 use md5::Digest;
 use md5::Md5;
+use percent_encoding::percent_decode_str;
 use reqsign::AwsAssumeRoleLoader;
 use reqsign::AwsConfig;
 use reqsign::AwsCredentialLoad;
@@ -50,7 +52,7 @@ use super::lister::S3Listers;
 use super::lister::S3ObjectVersionsLister;
 use super::writer::S3Writer;
 use super::writer::S3Writers;
-use super::DEFAULT_SCHEME;
+use super::S3_SCHEME;
 use crate::raw::oio::PageLister;
 use crate::raw::*;
 use crate::services::S3Config;
@@ -72,6 +74,30 @@ const DEFAULT_BATCH_MAX_OPERATIONS: usize = 1000;
 impl Configurator for S3Config {
     type Builder = S3Builder;
 
+    fn from_uri(uri: &Uri, options: &HashMap<String, String>) -> Result<Self> {
+        let mut map = options.clone();
+
+        let bucket_missing = map.get("bucket").map(|v| v.is_empty()).unwrap_or(true);
+        if bucket_missing {
+            let bucket = uri
+                .authority()
+                .map(|authority| authority.host())
+                .filter(|host| !host.is_empty())
+                .ok_or_else(|| Error::new(ErrorKind::ConfigInvalid, "s3 uri requires bucket"))?;
+            map.insert("bucket".to_string(), bucket.to_string());
+        }
+
+        if !map.contains_key("root") {
+            let path = percent_decode_str(uri.path()).decode_utf8_lossy();
+            let trimmed = path.trim_matches('/');
+            if !trimmed.is_empty() {
+                map.insert("root".to_string(), trimmed.to_string());
+            }
+        }
+
+        Self::from_iter(map)
+    }
+
     #[allow(deprecated)]
     fn into_builder(self) -> Self::Builder {
         S3Builder {
@@ -904,7 +930,7 @@ impl Builder for S3Builder {
             core: Arc::new(S3Core {
                 info: {
                     let am = AccessorInfo::default();
-                    am.set_scheme(DEFAULT_SCHEME)
+                    am.set_scheme(S3_SCHEME)
                         .set_root(&root)
                         .set_name(bucket)
                         .set_native_capability(Capability {
diff --git a/core/src/services/s3/mod.rs b/core/src/services/s3/mod.rs
index 44f4a3982..ab1c0810e 100644
--- a/core/src/services/s3/mod.rs
+++ b/core/src/services/s3/mod.rs
@@ -17,7 +17,7 @@
 
 /// Default scheme for s3 service.
 #[cfg(feature = "services-s3")]
-pub(super) const DEFAULT_SCHEME: &str = "s3";
+pub const S3_SCHEME: &str = "s3";
 #[cfg(feature = "services-s3")]
 mod core;
 #[cfg(feature = "services-s3")]
diff --git a/core/src/types/builder.rs b/core/src/types/builder.rs
index 3490913cb..b8cc5c960 100644
--- a/core/src/types/builder.rs
+++ b/core/src/types/builder.rs
@@ -15,8 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::collections::HashMap;
 use std::fmt::Debug;
 
+use http::Uri;
 use serde::de::DeserializeOwned;
 use serde::Serialize;
 
@@ -123,6 +125,11 @@ pub trait Configurator: Serialize + DeserializeOwned + Debug + 'static {
     /// Associated builder for this configuration.
     type Builder: Builder;
 
+    /// Build configuration from a URI plus merged options.
+    fn from_uri(_uri: &Uri, _options: &HashMap<String, String>) -> Result<Self> {
+        Err(Error::new(ErrorKind::Unsupported, "uri is not supported"))
+    }
+
     /// Deserialize from an iterator.
     ///
     /// This API is provided by opendal, developer should not implement it.
diff --git a/core/src/types/mod.rs b/core/src/types/mod.rs
index 9e0733792..dd8a09ede 100644
--- a/core/src/types/mod.rs
+++ b/core/src/types/mod.rs
@@ -46,7 +46,10 @@ mod operator;
 pub use operator::operator_futures;
 pub use operator::Operator;
 pub use operator::OperatorBuilder;
+pub use operator::OperatorFactory;
 pub use operator::OperatorInfo;
+pub use operator::OperatorRegistry;
+pub use operator::DEFAULT_OPERATOR_REGISTRY;
 
 mod builder;
 pub use builder::Builder;
diff --git a/core/src/types/operator/builder.rs b/core/src/types/operator/builder.rs
index 7cbc273a3..29ae15c82 100644
--- a/core/src/types/operator/builder.rs
+++ b/core/src/types/operator/builder.rs
@@ -132,6 +132,27 @@ impl Operator {
         Ok(OperatorBuilder::new(acc))
     }
 
+    /// Create a new operator by parsing configuration from a URI.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// # use anyhow::Result;
+    /// use opendal::Operator;
+    ///
+    /// # fn example() -> Result<()> {
+    /// let op = Operator::from_uri("memory://localhost/", [])?;
+    /// # let _ = op;
+    /// # Ok(())
+    /// # }
+    /// ```
+    pub fn from_uri(
+        uri: &str,
+        options: impl IntoIterator<Item = (String, String)>,
+    ) -> Result<Operator> {
+        crate::DEFAULT_OPERATOR_REGISTRY.load(uri, options)
+    }
+
     /// Create a new operator via given scheme and iterator of config value in dynamic dispatch.
     ///
     /// # Notes
diff --git a/core/src/types/operator/mod.rs b/core/src/types/operator/mod.rs
index 7ad917f72..1c0044c87 100644
--- a/core/src/types/operator/mod.rs
+++ b/core/src/types/operator/mod.rs
@@ -28,3 +28,6 @@ mod info;
 pub use info::OperatorInfo;
 
 pub mod operator_futures;
+
+mod registry;
+pub use registry::{OperatorFactory, OperatorRegistry, DEFAULT_OPERATOR_REGISTRY};
diff --git a/core/src/types/operator/registry.rs b/core/src/types/operator/registry.rs
new file mode 100644
index 000000000..7f11c003f
--- /dev/null
+++ b/core/src/types/operator/registry.rs
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::sync::{LazyLock, Mutex};
+
+use http::Uri;
+use percent_encoding::percent_decode_str;
+
+use crate::services;
+use crate::types::builder::{Builder, Configurator};
+use crate::{Error, ErrorKind, Operator, Result};
+
+/// Factory signature used to construct [`Operator`] from a URI and extra options.
+pub type OperatorFactory = fn(&str, Vec<(String, String)>) -> Result<Operator>;
+
+/// Default registry initialized with builtin services.
+pub static DEFAULT_OPERATOR_REGISTRY: LazyLock<OperatorRegistry> = LazyLock::new(|| {
+    let registry = OperatorRegistry::new();
+    register_builtin_services(&registry);
+    registry
+});
+
+/// Global registry that maps schemes to [`OperatorFactory`] functions.
+#[derive(Debug, Default)]
+pub struct OperatorRegistry {
+    factories: Mutex<HashMap<String, OperatorFactory>>,
+}
+
+impl OperatorRegistry {
+    /// Create a new, empty registry.
+    pub fn new() -> Self {
+        Self {
+            factories: Mutex::new(HashMap::new()),
+        }
+    }
+
+    /// Register a builder for the given scheme.
+    pub fn register<B: Builder>(&self, scheme: &str) {
+        let key = scheme.to_ascii_lowercase();
+        let mut guard = self
+            .factories
+            .lock()
+            .expect("operator registry mutex poisoned");
+        guard.insert(key, factory::<B::Config>);
+    }
+
+    /// Load an [`Operator`] via the factory registered for the URI's scheme.
+    pub fn load(
+        &self,
+        uri: &str,
+        options: impl IntoIterator<Item = (String, String)>,
+    ) -> Result<Operator> {
+        let parsed = uri.parse::<Uri>().map_err(|err| {
+            Error::new(ErrorKind::ConfigInvalid, "failed to parse uri").set_source(err)
+        })?;
+
+        let scheme = parsed
+            .scheme_str()
+            .ok_or_else(|| Error::new(ErrorKind::ConfigInvalid, "uri scheme is required"))?;
+
+        let key = scheme.to_ascii_lowercase();
+        let factory = self
+            .factories
+            .lock()
+            .expect("operator registry mutex poisoned")
+            .get(key.as_str())
+            .copied()
+            .ok_or_else(|| {
+                Error::new(ErrorKind::Unsupported, "scheme is not registered")
+                    .with_context("scheme", scheme)
+            })?;
+
+        let opts: Vec<(String, String)> = options.into_iter().collect();
+        factory(uri, opts)
+    }
+}
+
+fn register_builtin_services(registry: &OperatorRegistry) {
+    #[cfg(feature = "services-memory")]
+    registry.register::<services::Memory>(services::MEMORY_SCHEME);
+    #[cfg(feature = "services-fs")]
+    registry.register::<services::Fs>(services::FS_SCHEME);
+    #[cfg(feature = "services-s3")]
+    registry.register::<services::S3>(services::S3_SCHEME);
+}
+
+/// Factory adapter that builds an operator from a configurator type.
+fn factory<C: Configurator>(uri: &str, options: Vec<(String, String)>) -> Result<Operator> {
+    let parsed = uri.parse::<Uri>().map_err(|err| {
+        Error::new(ErrorKind::ConfigInvalid, "failed to parse uri").set_source(err)
+    })?;
+
+    let mut params = HashMap::new();
+    if let Some(query) = parsed.query() {
+        for pair in query.split('&') {
+            if pair.is_empty() {
+                continue;
+            }
+            let mut parts = pair.splitn(2, '=');
+            let key = parts.next().unwrap_or("");
+            let value = parts.next().unwrap_or("");
+            let key = percent_decode_str(key).decode_utf8_lossy().to_string();
+            let value = percent_decode_str(value).decode_utf8_lossy().to_string();
+            params.insert(key.to_ascii_lowercase(), value);
+        }
+    }
+
+    for (key, value) in options {
+        params.insert(key.to_ascii_lowercase(), value);
+    }
+
+    let cfg = C::from_uri(&parsed, &params)?;
+    Ok(Operator::from_config(cfg)?.finish())
+}

Reply via email to