This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch registry
in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/registry by this push:
new eee563c32 feat: Add Operator Registry
eee563c32 is described below
commit eee563c32346989b59edf121f2a4576046d3bb3f
Author: Xuanwo <[email protected]>
AuthorDate: Wed Oct 1 01:23:26 2025 +0800
feat: Add Operator Registry
Signed-off-by: Xuanwo <[email protected]>
---
core/src/blocking/operator.rs | 9 ++
core/src/docs/rfcs/5444_operator_from_uri.md | 30 ++++---
core/src/services/fs/backend.rs | 30 ++++++-
core/src/services/fs/mod.rs | 2 +-
core/src/services/memory/backend.rs | 20 ++++-
core/src/services/memory/mod.rs | 2 +-
core/src/services/mod.rs | 6 ++
core/src/services/s3/backend.rs | 30 ++++++-
core/src/services/s3/mod.rs | 2 +-
core/src/types/builder.rs | 7 ++
core/src/types/mod.rs | 3 +
core/src/types/operator/builder.rs | 21 +++++
core/src/types/operator/mod.rs | 3 +
core/src/types/operator/registry.rs | 129 +++++++++++++++++++++++++++
14 files changed, 271 insertions(+), 23 deletions(-)
diff --git a/core/src/blocking/operator.rs b/core/src/blocking/operator.rs
index 74b4c36d0..0ae55953f 100644
--- a/core/src/blocking/operator.rs
+++ b/core/src/blocking/operator.rs
@@ -135,6 +135,15 @@ impl Operator {
})
}
+    /// Create a blocking operator from a URI plus extra options.
+    ///
+    /// The URI is first resolved into an async operator via `AsyncOperator::from_uri`,
+    /// which is then wrapped with [`Self::new`].
+    ///
+    /// # Errors
+    ///
+    /// Returns any error raised while loading the async operator from the URI, or
+    /// while wrapping it into a blocking operator.
+    pub fn from_uri(
+        uri: &str,
+        options: impl IntoIterator<Item = (String, String)>,
+    ) -> Result<Self> {
+        AsyncOperator::from_uri(uri, options).and_then(Self::new)
+    }
+
/// Get information of underlying accessor.
///
/// # Examples
diff --git a/core/src/docs/rfcs/5444_operator_from_uri.md
b/core/src/docs/rfcs/5444_operator_from_uri.md
index 036927234..5b5f21c2d 100644
--- a/core/src/docs/rfcs/5444_operator_from_uri.md
+++ b/core/src/docs/rfcs/5444_operator_from_uri.md
@@ -41,19 +41,19 @@ let op = Operator::from_uri("fs:///tmp/test", vec![])?;
OpenDAL will, by default, register services enabled by features in a global
`OperatorRegistry`. Users can also create custom operator registries to support
their own schemes or additional options.
```rust
-// Using with custom registry
+// Using a custom registry
let registry = OperatorRegistry::new();
-registry.register("custom", my_factory);
-let op = registry.parse("custom://endpoint", options)?;
-// The same service implementation can be registered under multiple schemes
-registry.register("s3", s3_factory);
-registry.register("minio", s3_factory); // MinIO is S3-compatible
-registry.register("r2", s3_factory); // Cloudflare R2 is S3-compatible
+// Register builtin builders under desired schemes
+registry.register::<services::S3>(services::S3_SCHEME);
+registry.register::<services::S3>("minio"); // MinIO is S3-compatible
+registry.register::<services::S3>("r2"); // Cloudflare R2 is S3-compatible
// Users can define their own scheme names for internal use
-registry.register("company-storage", s3_factory);
-registry.register("backup-storage", azblob_factory);
+registry.register::<services::S3>("company-storage");
+registry.register::<services::Azblob>("backup-storage");
+
+let op = registry.load("company-storage://bucket/path", [])?;
```
# Reference-level explanation
@@ -62,19 +62,19 @@ The implementation consists of three main components:
1. The `OperatorFactory` and `OperatorRegistry`:
-`OperatorFactory` is a function type that takes a URI and a map of options and
returns an `Operator`. `OperatorRegistry` manages operator factories for
different schemes.
+`OperatorFactory` is a function type that takes a URI string plus options and
returns an `Operator`. `OperatorRegistry` manages factories registered under
different schemes.
```rust
-type OperatorFactory = fn(http::Uri, HashMap<String, String>) ->
Result<Operator>;
+type OperatorFactory = fn(&str, Vec<(String, String)>) -> Result<Operator>;
pub struct OperatorRegistry { ... }
impl OperatorRegistry {
- fn register(&self, scheme: &str, factory: OperatorFactory) {
+ fn register<B: Builder>(&self, scheme: &str) {
...
}
- fn parse(&self, uri: &str, options: impl IntoIterator<Item = (String,
String)>) -> Result<Operator> {
+ fn load(&self, uri: &str, options: impl IntoIterator<Item = (String,
String)>) -> Result<Operator> {
...
}
}
@@ -86,7 +88,7 @@ impl OperatorRegistry {
```rust
impl Configurator for S3Config {
- fn from_uri(uri: &str, options: impl IntoIterator<Item = (String,
String)>) -> Result<Self> {
+ fn from_uri(uri: &Uri, options: &HashMap<String, String>) -> Result<Self> {
...
}
}
diff --git a/core/src/services/fs/backend.rs b/core/src/services/fs/backend.rs
index 6e2b11a4d..ab0332020 100644
--- a/core/src/services/fs/backend.rs
+++ b/core/src/services/fs/backend.rs
@@ -15,23 +15,49 @@
// specific language governing permissions and limitations
// under the License.
+use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use log::debug;
+use http::Uri;
+use percent_encoding::percent_decode_str;
+
use super::core::*;
use super::delete::FsDeleter;
use super::lister::FsLister;
use super::reader::FsReader;
use super::writer::FsWriter;
use super::writer::FsWriters;
-use super::DEFAULT_SCHEME;
+use super::FS_SCHEME;
use crate::raw::*;
use crate::services::FsConfig;
use crate::*;
impl Configurator for FsConfig {
type Builder = FsBuilder;
+    /// Build an [`FsConfig`] from a parsed URI plus options.
+    ///
+    /// An explicit `root` entry in `options` takes precedence. Otherwise the
+    /// percent-decoded URI path becomes `root`; it must be absolute and must not be
+    /// just `/`.
+    ///
+    /// # Errors
+    ///
+    /// Returns [`ErrorKind::ConfigInvalid`] when no `root` option is given and the
+    /// URI path does not decode to valid UTF-8, is empty or `/`, or is not absolute.
+    fn from_uri(uri: &Uri, options: &HashMap<String, String>) -> Result<Self> {
+        let mut map = options.clone();
+
+        if !map.contains_key("root") {
+            // Decode strictly: a lossy decode would silently turn invalid bytes into
+            // U+FFFD and point the operator at a different directory.
+            let path = percent_decode_str(uri.path())
+                .decode_utf8()
+                .map_err(|err| {
+                    Error::new(ErrorKind::ConfigInvalid, "fs uri path is not valid utf-8")
+                        .set_source(err)
+                })?;
+            if path.is_empty() || path == "/" {
+                return Err(Error::new(
+                    ErrorKind::ConfigInvalid,
+                    "fs uri requires absolute path",
+                ));
+            }
+            if !path.starts_with('/') {
+                return Err(Error::new(
+                    ErrorKind::ConfigInvalid,
+                    "fs uri root must be absolute",
+                ));
+            }
+            map.insert("root".to_string(), path.into_owned());
+        }
+
+        Self::from_iter(map)
+    }
fn into_builder(self) -> Self::Builder {
FsBuilder { config: self }
}
@@ -144,7 +170,7 @@ impl Builder for FsBuilder {
core: Arc::new(FsCore {
info: {
let am = AccessorInfo::default();
- am.set_scheme(DEFAULT_SCHEME)
+ am.set_scheme(FS_SCHEME)
.set_root(&root.to_string_lossy())
.set_native_capability(Capability {
stat: true,
diff --git a/core/src/services/fs/mod.rs b/core/src/services/fs/mod.rs
index cc5951c95..7fa64234a 100644
--- a/core/src/services/fs/mod.rs
+++ b/core/src/services/fs/mod.rs
@@ -17,7 +17,7 @@
/// Default scheme for fs service.
#[cfg(feature = "services-fs")]
-pub(super) const DEFAULT_SCHEME: &str = "fs";
+///
+/// It is the scheme reported in the fs accessor's info and the key under which
+/// the builtin registry registers the fs service.
+pub const FS_SCHEME: &str = "fs";
#[cfg(feature = "services-fs")]
mod core;
#[cfg(feature = "services-fs")]
diff --git a/core/src/services/memory/backend.rs
b/core/src/services/memory/backend.rs
index 8c2b4b55f..8c1d0d1cb 100644
--- a/core/src/services/memory/backend.rs
+++ b/core/src/services/memory/backend.rs
@@ -15,20 +15,36 @@
// specific language governing permissions and limitations
// under the License.
+use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::Arc;
+use http::Uri;
+use percent_encoding::percent_decode_str;
+
use super::core::*;
use super::delete::MemoryDeleter;
use super::lister::MemoryLister;
use super::writer::MemoryWriter;
-use super::DEFAULT_SCHEME;
+use super::MEMORY_SCHEME;
use crate::raw::oio;
use crate::raw::*;
use crate::services::MemoryConfig;
use crate::*;
impl Configurator for MemoryConfig {
type Builder = MemoryBuilder;
+    /// Build a [`MemoryConfig`] from a parsed URI plus options.
+    ///
+    /// An explicit `root` entry in `options` takes precedence. Otherwise, when the
+    /// URI path is neither empty nor `/`, it is percent-decoded, stripped of its
+    /// leading slashes, and used as `root`.
+    ///
+    /// # Errors
+    ///
+    /// Returns [`ErrorKind::ConfigInvalid`] when no `root` option is given and the
+    /// URI path does not decode to valid UTF-8.
+    fn from_uri(uri: &Uri, options: &HashMap<String, String>) -> Result<Self> {
+        let mut map = options.clone();
+
+        if !map.contains_key("root") {
+            // Decode strictly instead of silently replacing invalid bytes with U+FFFD.
+            let path = percent_decode_str(uri.path())
+                .decode_utf8()
+                .map_err(|err| {
+                    Error::new(ErrorKind::ConfigInvalid, "memory uri path is not valid utf-8")
+                        .set_source(err)
+                })?;
+            if !path.is_empty() && path != "/" {
+                map.insert("root".to_string(), path.trim_start_matches('/').to_string());
+            }
+        }
+
+        Self::from_iter(map)
+    }
fn into_builder(self) -> Self::Builder {
MemoryBuilder { config: self }
}
@@ -71,7 +87,7 @@ pub struct MemoryAccessor {
impl MemoryAccessor {
fn new(core: MemoryCore) -> Self {
let info = AccessorInfo::default();
- info.set_scheme(DEFAULT_SCHEME);
+ info.set_scheme(MEMORY_SCHEME);
info.set_name(&format!("{:p}", Arc::as_ptr(&core.data)));
info.set_root("/");
info.set_native_capability(Capability {
diff --git a/core/src/services/memory/mod.rs b/core/src/services/memory/mod.rs
index ef5c3f5eb..a308c8e42 100644
--- a/core/src/services/memory/mod.rs
+++ b/core/src/services/memory/mod.rs
@@ -17,7 +17,7 @@
/// Default scheme for memory service.
#[cfg(feature = "services-memory")]
-pub(super) const DEFAULT_SCHEME: &str = "memory";
+///
+/// It is the scheme reported in the memory accessor's info and the key under
+/// which the builtin registry registers the memory service.
+pub const MEMORY_SCHEME: &str = "memory";
#[cfg(feature = "services-memory")]
mod backend;
#[cfg(feature = "services-memory")]
diff --git a/core/src/services/mod.rs b/core/src/services/mod.rs
index a78fd9a17..dc97e0379 100644
--- a/core/src/services/mod.rs
+++ b/core/src/services/mod.rs
@@ -68,6 +68,8 @@ mod foundationdb;
pub use self::foundationdb::*;
mod fs;
+#[cfg(feature = "services-fs")]
+pub use fs::FS_SCHEME;
pub use fs::*;
mod ftp;
@@ -116,6 +118,8 @@ mod memcached;
pub use memcached::*;
mod memory;
+#[cfg(feature = "services-memory")]
+pub use self::memory::MEMORY_SCHEME;
pub use self::memory::*;
mod mini_moka;
@@ -161,6 +165,8 @@ mod rocksdb;
pub use self::rocksdb::*;
mod s3;
+#[cfg(feature = "services-s3")]
+pub use s3::S3_SCHEME;
pub use s3::*;
mod seafile;
diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs
index 292e90690..a55c95493 100644
--- a/core/src/services/s3/backend.rs
+++ b/core/src/services/s3/backend.rs
@@ -30,10 +30,12 @@ use constants::X_AMZ_META_PREFIX;
use constants::X_AMZ_VERSION_ID;
use http::Response;
use http::StatusCode;
+use http::Uri;
use log::debug;
use log::warn;
use md5::Digest;
use md5::Md5;
+use percent_encoding::percent_decode_str;
use reqsign::AwsAssumeRoleLoader;
use reqsign::AwsConfig;
use reqsign::AwsCredentialLoad;
@@ -50,7 +52,7 @@ use super::lister::S3Listers;
use super::lister::S3ObjectVersionsLister;
use super::writer::S3Writer;
use super::writer::S3Writers;
-use super::DEFAULT_SCHEME;
+use super::S3_SCHEME;
use crate::raw::oio::PageLister;
use crate::raw::*;
use crate::services::S3Config;
@@ -72,6 +74,30 @@ const DEFAULT_BATCH_MAX_OPERATIONS: usize = 1000;
impl Configurator for S3Config {
type Builder = S3Builder;
+    /// Build an [`S3Config`] from a URI such as `s3://bucket/path/to/root`.
+    ///
+    /// - `bucket`: taken from the URI host unless `options` already carries a
+    ///   non-empty `bucket`.
+    /// - `root`: taken from the percent-decoded URI path with surrounding `/`
+    ///   trimmed, unless `options` already carries `root`; an empty result leaves
+    ///   `root` unset.
+    ///
+    /// # Errors
+    ///
+    /// Returns [`ErrorKind::ConfigInvalid`] when no bucket can be determined, or when
+    /// the URI path is needed but does not decode to valid UTF-8.
+    fn from_uri(uri: &Uri, options: &HashMap<String, String>) -> Result<Self> {
+        let mut map = options.clone();
+
+        let bucket_missing = map.get("bucket").map(|v| v.is_empty()).unwrap_or(true);
+        if bucket_missing {
+            let bucket = uri
+                .authority()
+                .map(|authority| authority.host())
+                .filter(|host| !host.is_empty())
+                .ok_or_else(|| Error::new(ErrorKind::ConfigInvalid, "s3 uri requires bucket"))?;
+            map.insert("bucket".to_string(), bucket.to_string());
+        }
+
+        if !map.contains_key("root") {
+            // Decode strictly: a lossy decode would silently rewrite invalid bytes to
+            // U+FFFD and scope the operator to a different key prefix.
+            let path = percent_decode_str(uri.path())
+                .decode_utf8()
+                .map_err(|err| {
+                    Error::new(ErrorKind::ConfigInvalid, "s3 uri path is not valid utf-8")
+                        .set_source(err)
+                })?;
+            let trimmed = path.trim_matches('/');
+            if !trimmed.is_empty() {
+                map.insert("root".to_string(), trimmed.to_string());
+            }
+        }
+
+        Self::from_iter(map)
+    }
+
#[allow(deprecated)]
fn into_builder(self) -> Self::Builder {
S3Builder {
@@ -904,7 +930,7 @@ impl Builder for S3Builder {
core: Arc::new(S3Core {
info: {
let am = AccessorInfo::default();
- am.set_scheme(DEFAULT_SCHEME)
+ am.set_scheme(S3_SCHEME)
.set_root(&root)
.set_name(bucket)
.set_native_capability(Capability {
diff --git a/core/src/services/s3/mod.rs b/core/src/services/s3/mod.rs
index 44f4a3982..ab1c0810e 100644
--- a/core/src/services/s3/mod.rs
+++ b/core/src/services/s3/mod.rs
@@ -17,7 +17,7 @@
/// Default scheme for s3 service.
#[cfg(feature = "services-s3")]
-pub(super) const DEFAULT_SCHEME: &str = "s3";
+///
+/// It is the scheme reported in the s3 accessor's info and the key under which
+/// the builtin registry registers the s3 service.
+pub const S3_SCHEME: &str = "s3";
#[cfg(feature = "services-s3")]
mod core;
#[cfg(feature = "services-s3")]
diff --git a/core/src/types/builder.rs b/core/src/types/builder.rs
index 3490913cb..b8cc5c960 100644
--- a/core/src/types/builder.rs
+++ b/core/src/types/builder.rs
@@ -15,8 +15,10 @@
// specific language governing permissions and limitations
// under the License.
+use std::collections::HashMap;
use std::fmt::Debug;
+use http::Uri;
use serde::de::DeserializeOwned;
use serde::Serialize;
@@ -123,6 +125,11 @@ pub trait Configurator: Serialize + DeserializeOwned +
Debug + 'static {
/// Associated builder for this configuration.
type Builder: Builder;
+    /// Build this configuration from a parsed URI plus merged options.
+    ///
+    /// Services opt in to URI-based construction by overriding this method.
+    ///
+    /// # Errors
+    ///
+    /// The default implementation always fails with [`ErrorKind::Unsupported`].
+    fn from_uri(_uri: &Uri, _options: &HashMap<String, String>) -> Result<Self> {
+        let unsupported = Error::new(ErrorKind::Unsupported, "uri is not supported");
+        Err(unsupported)
+    }
+
/// Deserialize from an iterator.
///
/// This API is provided by opendal, developer should not implement it.
diff --git a/core/src/types/mod.rs b/core/src/types/mod.rs
index 9e0733792..dd8a09ede 100644
--- a/core/src/types/mod.rs
+++ b/core/src/types/mod.rs
@@ -46,7 +46,10 @@ mod operator;
pub use operator::operator_futures;
pub use operator::Operator;
pub use operator::OperatorBuilder;
+pub use operator::OperatorFactory;
pub use operator::OperatorInfo;
+pub use operator::OperatorRegistry;
+pub use operator::DEFAULT_OPERATOR_REGISTRY;
mod builder;
pub use builder::Builder;
diff --git a/core/src/types/operator/builder.rs
b/core/src/types/operator/builder.rs
index 7cbc273a3..29ae15c82 100644
--- a/core/src/types/operator/builder.rs
+++ b/core/src/types/operator/builder.rs
@@ -132,6 +132,27 @@ impl Operator {
Ok(OperatorBuilder::new(acc))
}
+    /// Create a new operator by parsing configuration from a URI.
+    ///
+    /// The URI's scheme selects a factory from [`crate::DEFAULT_OPERATOR_REGISTRY`],
+    /// and `options` supply additional configuration for that factory.
+    ///
+    /// # Errors
+    ///
+    /// Fails when the URI cannot be parsed, has no scheme, names a scheme that is not
+    /// registered, or when the selected service rejects the configuration.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// # use anyhow::Result;
+    /// use opendal::Operator;
+    ///
+    /// # fn example() -> Result<()> {
+    /// let op = Operator::from_uri("memory://localhost/", [])?;
+    /// # let _ = op;
+    /// # Ok(())
+    /// # }
+    /// ```
+    pub fn from_uri(
+        uri: &str,
+        options: impl IntoIterator<Item = (String, String)>,
+    ) -> Result<Operator> {
+        let registry = &*crate::DEFAULT_OPERATOR_REGISTRY;
+        registry.load(uri, options)
+    }
+
/// Create a new operator via given scheme and iterator of config value in
dynamic dispatch.
///
/// # Notes
diff --git a/core/src/types/operator/mod.rs b/core/src/types/operator/mod.rs
index 7ad917f72..1c0044c87 100644
--- a/core/src/types/operator/mod.rs
+++ b/core/src/types/operator/mod.rs
@@ -28,3 +28,6 @@ mod info;
pub use info::OperatorInfo;
pub mod operator_futures;
+
+mod registry;
+pub use registry::{OperatorFactory, OperatorRegistry,
DEFAULT_OPERATOR_REGISTRY};
diff --git a/core/src/types/operator/registry.rs
b/core/src/types/operator/registry.rs
new file mode 100644
index 000000000..7f11c003f
--- /dev/null
+++ b/core/src/types/operator/registry.rs
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::sync::{LazyLock, Mutex};
+
+use http::Uri;
+use percent_encoding::percent_decode_str;
+
+use crate::services;
+use crate::types::builder::{Builder, Configurator};
+use crate::{Error, ErrorKind, Operator, Result};
+
+/// Factory signature used to construct an [`Operator`] from a URI string and extra options.
+pub type OperatorFactory = fn(&str, Vec<(String, String)>) -> Result<Operator>;
+
+/// Default registry, lazily initialized with the builtin services enabled by cargo
+/// features.
+///
+/// [`Operator::from_uri`] resolves URIs through this registry, so schemes registered
+/// here become available to every later `Operator::from_uri` call in the process.
+pub static DEFAULT_OPERATOR_REGISTRY: LazyLock<OperatorRegistry> = LazyLock::new(|| {
+    let registry = OperatorRegistry::new();
+    register_builtin_services(&registry);
+    registry
+});
+
+/// A registry mapping URI schemes to [`OperatorFactory`] functions.
+///
+/// Schemes are matched case-insensitively: they are stored and looked up in ASCII
+/// lowercase. The map sits behind a mutex, so a registry can be shared and mutated
+/// through `&self`, as [`DEFAULT_OPERATOR_REGISTRY`] is.
+#[derive(Debug, Default)]
+pub struct OperatorRegistry {
+    factories: Mutex<HashMap<String, OperatorFactory>>,
+}
+
+impl OperatorRegistry {
+    /// Create a new, empty registry.
+    pub fn new() -> Self {
+        Self {
+            factories: Mutex::new(HashMap::new()),
+        }
+    }
+
+    /// Register builder `B` under `scheme`, replacing any factory previously
+    /// registered for the same (case-insensitive) scheme.
+    pub fn register<B: Builder>(&self, scheme: &str) {
+        let key = scheme.to_ascii_lowercase();
+        self.lock_factories().insert(key, factory::<B::Config>);
+    }
+
+    /// Load an [`Operator`] via the factory registered for the URI's scheme.
+    ///
+    /// The original `uri` string and the collected `options` are forwarded to the
+    /// factory.
+    ///
+    /// # Errors
+    ///
+    /// - [`ErrorKind::ConfigInvalid`] if `uri` cannot be parsed or has no scheme.
+    /// - [`ErrorKind::Unsupported`] if no factory is registered for the scheme.
+    /// - Any error returned by the factory itself.
+    pub fn load(
+        &self,
+        uri: &str,
+        options: impl IntoIterator<Item = (String, String)>,
+    ) -> Result<Operator> {
+        let parsed = uri.parse::<Uri>().map_err(|err| {
+            Error::new(ErrorKind::ConfigInvalid, "failed to parse uri").set_source(err)
+        })?;
+
+        let scheme = parsed
+            .scheme_str()
+            .ok_or_else(|| Error::new(ErrorKind::ConfigInvalid, "uri scheme is required"))?;
+
+        let key = scheme.to_ascii_lowercase();
+        // Copy the fn pointer out so the lock guard is dropped at the end of this
+        // statement and is not held while the factory runs.
+        let factory = self.lock_factories().get(key.as_str()).copied().ok_or_else(|| {
+            Error::new(ErrorKind::Unsupported, "scheme is not registered")
+                .with_context("scheme", scheme)
+        })?;
+
+        let opts: Vec<(String, String)> = options.into_iter().collect();
+        factory(uri, opts)
+    }
+
+    /// Lock the factory map, recovering from poisoning.
+    ///
+    /// The map only holds plain `fn` pointers keyed by scheme. A panic in another
+    /// thread therefore cannot leave it logically inconsistent, so later callers
+    /// should keep working instead of panicking.
+    fn lock_factories(&self) -> std::sync::MutexGuard<'_, HashMap<String, OperatorFactory>> {
+        self.factories
+            .lock()
+            .unwrap_or_else(std::sync::PoisonError::into_inner)
+    }
+}
+
+/// Register every builtin service whose cargo feature is enabled, each under its
+/// default scheme constant.
+fn register_builtin_services(registry: &OperatorRegistry) {
+    #[cfg(feature = "services-fs")]
+    {
+        registry.register::<services::Fs>(services::FS_SCHEME);
+    }
+    #[cfg(feature = "services-memory")]
+    {
+        registry.register::<services::Memory>(services::MEMORY_SCHEME);
+    }
+    #[cfg(feature = "services-s3")]
+    {
+        registry.register::<services::S3>(services::S3_SCHEME);
+    }
+}
+
+/// Factory adapter that builds an operator from a configurator type.
+///
+/// Configuration values are gathered from two sources, with keys lowercased:
+///
+/// 1. `key=value` pairs in the URI query string, with both sides percent-decoded;
+///    a pair without `=` gets an empty value;
+/// 2. the explicit `options`, which override query values with the same key.
+///
+/// The parsed URI and the merged map are then passed to [`Configurator::from_uri`].
+fn factory<C: Configurator>(uri: &str, options: Vec<(String, String)>) -> Result<Operator> {
+    let parsed = uri.parse::<Uri>().map_err(|err| {
+        Error::new(ErrorKind::ConfigInvalid, "failed to parse uri").set_source(err)
+    })?;
+
+    let decode = |raw: &str| percent_decode_str(raw).decode_utf8_lossy().to_string();
+
+    let mut params: HashMap<String, String> = parsed
+        .query()
+        .unwrap_or_default()
+        .split('&')
+        .filter(|pair| !pair.is_empty())
+        .map(|pair| {
+            let (key, value) = pair.split_once('=').unwrap_or((pair, ""));
+            (decode(key).to_ascii_lowercase(), decode(value))
+        })
+        .collect();
+
+    // Explicit options win over query parameters with the same key.
+    params.extend(
+        options
+            .into_iter()
+            .map(|(key, value)| (key.to_ascii_lowercase(), value)),
+    );
+
+    let cfg = C::from_uri(&parsed, &params)?;
+    Ok(Operator::from_config(cfg)?.finish())
+}