This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new f94e1f3b8 refactor: migrate memcached service from adapter::kv to impl Access directly (#6714)
f94e1f3b8 is described below

commit f94e1f3b8eb080473b4d730adc9245d18c1d2cce
Author: Qinxuan Chen <[email protected]>
AuthorDate: Tue Oct 21 21:38:23 2025 +0800

    refactor: migrate memcached service from adapter::kv to impl Access directly (#6714)
    
    * refactor: migrate memcached service from adapter::kv to impl Access directly
    
    * adjust some imports
---
 core/src/services/memcached/backend.rs             | 184 +++++++++------------
 core/src/services/memcached/core.rs                | 132 +++++++++++++++
 core/src/services/memcached/{mod.rs => deleter.rs} |  28 +++-
 core/src/services/memcached/docs.md                |   5 +-
 core/src/services/memcached/mod.rs                 |   5 +-
 core/src/services/memcached/writer.rs              |  59 +++++++
 6 files changed, 294 insertions(+), 119 deletions(-)

diff --git a/core/src/services/memcached/backend.rs b/core/src/services/memcached/backend.rs
index 5cdde5a0d..7c7260084 100644
--- a/core/src/services/memcached/backend.rs
+++ b/core/src/services/memcached/backend.rs
@@ -15,16 +15,16 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::sync::Arc;
 use std::time::Duration;
 
-use bb8::RunError;
-use tokio::net::TcpStream;
 use tokio::sync::OnceCell;
 
-use super::binary;
-use crate::raw::adapters::kv;
+use super::config::MemcachedConfig;
+use super::core::*;
+use super::deleter::MemcachedDeleter;
+use super::writer::MemcachedWriter;
 use crate::raw::*;
-use crate::services::MemcachedConfig;
 use crate::*;
 
 /// [Memcached](https://memcached.org/) service support.
@@ -138,11 +138,11 @@ impl Builder for MemcachedBuilder {
         );
 
         let conn = OnceCell::new();
-        Ok(MemcachedBackend::new(Adapter {
+        Ok(MemcachedBackend::new(MemcachedCore {
+            conn,
             endpoint,
             username: self.config.username.clone(),
             password: self.config.password.clone(),
-            conn,
             default_ttl: self.config.default_ttl,
         })
         .with_normalized_root(root))
@@ -150,128 +150,92 @@ impl Builder for MemcachedBuilder {
 }
 
 /// Backend for memcached services.
-pub type MemcachedBackend = kv::Backend<Adapter>;
-
 #[derive(Clone, Debug)]
-pub struct Adapter {
-    endpoint: String,
-    username: Option<String>,
-    password: Option<String>,
-    default_ttl: Option<Duration>,
-    conn: OnceCell<bb8::Pool<MemcacheConnectionManager>>,
+pub struct MemcachedBackend {
+    core: Arc<MemcachedCore>,
+    root: String,
+    info: Arc<AccessorInfo>,
 }
 
-impl Adapter {
-    async fn conn(&self) -> Result<bb8::PooledConnection<'_, MemcacheConnectionManager>> {
-        let pool = self
-            .conn
-            .get_or_try_init(|| async {
-                let mgr = MemcacheConnectionManager::new(
-                    &self.endpoint,
-                    self.username.clone(),
-                    self.password.clone(),
-                );
-
-                bb8::Pool::builder().build(mgr).await.map_err(|err| {
-                    Error::new(ErrorKind::ConfigInvalid, "connect to memecached failed")
-                        .set_source(err)
-                })
-            })
-            .await?;
+impl MemcachedBackend {
+    pub fn new(core: MemcachedCore) -> Self {
+        let info = AccessorInfo::default();
+        info.set_scheme(Scheme::Memcached.into_static());
+        info.set_name("memcached");
+        info.set_root("/");
+        info.set_native_capability(Capability {
+            read: true,
+            stat: true,
+            write: true,
+            write_can_empty: true,
+            delete: true,
+            shared: true,
+            ..Default::default()
+        });
 
-        pool.get().await.map_err(|err| match err {
-            RunError::TimedOut => {
-                Error::new(ErrorKind::Unexpected, "get connection from pool failed").set_temporary()
-            }
-            RunError::User(err) => err,
-        })
-    }
-}
-
-impl kv::Adapter for Adapter {
-    type Scanner = ();
-
-    fn info(&self) -> kv::Info {
-        kv::Info::new(
-            Scheme::Memcached,
-            "memcached",
-            Capability {
-                read: true,
-                write: true,
-                shared: true,
-
-                ..Default::default()
-            },
-        )
-    }
-
-    async fn get(&self, key: &str) -> Result<Option<Buffer>> {
-        let mut conn = self.conn().await?;
-        let result = conn.get(&percent_encode_path(key)).await?;
-        Ok(result.map(Buffer::from))
+        Self {
+            core: Arc::new(core),
+            root: "/".to_string(),
+            info: Arc::new(info),
+        }
     }
 
-    async fn set(&self, key: &str, value: Buffer) -> Result<()> {
-        let mut conn = self.conn().await?;
-
-        conn.set(
-            &percent_encode_path(key),
-            &value.to_vec(),
-            // Set expiration to 0 if ttl not set.
-            self.default_ttl
-                .map(|v| v.as_secs() as u32)
-                .unwrap_or_default(),
-        )
-        .await
+    fn with_normalized_root(mut self, root: String) -> Self {
+        self.info.set_root(&root);
+        self.root = root;
+        self
     }
+}
 
-    async fn delete(&self, key: &str) -> Result<()> {
-        let mut conn = self.conn().await?;
+impl Access for MemcachedBackend {
+    type Reader = Buffer;
+    type Writer = MemcachedWriter;
+    type Lister = ();
+    type Deleter = oio::OneShotDeleter<MemcachedDeleter>;
 
-        conn.delete(&percent_encode_path(key)).await
+    fn info(&self) -> Arc<AccessorInfo> {
+        self.info.clone()
     }
-}
 
-/// A `bb8::ManageConnection` for `memcache_async::ascii::Protocol`.
-#[derive(Clone, Debug)]
-struct MemcacheConnectionManager {
-    address: String,
-    username: Option<String>,
-    password: Option<String>,
-}
+    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
+        let p = build_abs_path(&self.root, path);
 
-impl MemcacheConnectionManager {
-    fn new(address: &str, username: Option<String>, password: Option<String>) -> Self {
-        Self {
-            address: address.to_string(),
-            username,
-            password,
+        if p == build_abs_path(&self.root, "") {
+            Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
+        } else {
+            let bs = self.core.get(&p).await?;
+            match bs {
+                Some(bs) => Ok(RpStat::new(
+                    Metadata::new(EntryMode::FILE).with_content_length(bs.len() as u64),
+                )),
+                None => Err(Error::new(ErrorKind::NotFound, "kv not found in memcached")),
+            }
         }
     }
-}
-
-impl bb8::ManageConnection for MemcacheConnectionManager {
-    type Connection = binary::Connection;
-    type Error = Error;
 
-    /// TODO: Implement unix stream support.
-    async fn connect(&self) -> Result<Self::Connection, Self::Error> {
-        let conn = TcpStream::connect(&self.address)
-            .await
-            .map_err(new_std_io_error)?;
-        let mut conn = binary::Connection::new(conn);
+    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
+        let p = build_abs_path(&self.root, path);
+        let bs = match self.core.get(&p).await? {
+            Some(bs) => bs,
+            None => return Err(Error::new(ErrorKind::NotFound, "kv not found in memcached")),
+        };
+        Ok((RpRead::new(), bs.slice(args.range().to_range_as_usize())))
+    }
 
-        if let (Some(username), Some(password)) = (self.username.as_ref(), self.password.as_ref()) {
-            conn.auth(username, password).await?;
-        }
-        Ok(conn)
+    async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
+        let p = build_abs_path(&self.root, path);
+        Ok((RpWrite::new(), MemcachedWriter::new(self.core.clone(), p)))
     }
 
-    async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
-        conn.version().await.map(|_| ())
+    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
+        Ok((
+            RpDelete::default(),
+            oio::OneShotDeleter::new(MemcachedDeleter::new(self.core.clone(), self.root.clone())),
+        ))
     }
 
-    fn has_broken(&self, _: &mut Self::Connection) -> bool {
-        false
+    async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
+        let _ = build_abs_path(&self.root, path);
+        Ok((RpList::default(), ()))
     }
 }
diff --git a/core/src/services/memcached/core.rs b/core/src/services/memcached/core.rs
new file mode 100644
index 000000000..82c21584d
--- /dev/null
+++ b/core/src/services/memcached/core.rs
@@ -0,0 +1,132 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::time::Duration;
+
+use bb8::RunError;
+use tokio::net::TcpStream;
+use tokio::sync::OnceCell;
+
+use super::binary;
+use crate::raw::*;
+use crate::*;
+
+/// A `bb8::ManageConnection` for `memcache_async::ascii::Protocol`.
+#[derive(Clone, Debug)]
+pub struct MemcacheConnectionManager {
+    address: String,
+    username: Option<String>,
+    password: Option<String>,
+}
+
+impl MemcacheConnectionManager {
+    fn new(address: &str, username: Option<String>, password: Option<String>) -> Self {
+        Self {
+            address: address.to_string(),
+            username,
+            password,
+        }
+    }
+}
+
+impl bb8::ManageConnection for MemcacheConnectionManager {
+    type Connection = binary::Connection;
+    type Error = Error;
+
+    /// TODO: Implement unix stream support.
+    async fn connect(&self) -> Result<Self::Connection, Self::Error> {
+        let conn = TcpStream::connect(&self.address)
+            .await
+            .map_err(new_std_io_error)?;
+        let mut conn = binary::Connection::new(conn);
+
+        if let (Some(username), Some(password)) = (self.username.as_ref(), self.password.as_ref()) {
+            conn.auth(username, password).await?;
+        }
+        Ok(conn)
+    }
+
+    async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
+        conn.version().await.map(|_| ())
+    }
+
+    fn has_broken(&self, _: &mut Self::Connection) -> bool {
+        false
+    }
+}
+
+#[derive(Clone, Debug)]
+pub struct MemcachedCore {
+    pub conn: OnceCell<bb8::Pool<MemcacheConnectionManager>>,
+    pub endpoint: String,
+    pub username: Option<String>,
+    pub password: Option<String>,
+    pub default_ttl: Option<Duration>,
+}
+
+impl MemcachedCore {
+    async fn conn(&self) -> Result<bb8::PooledConnection<'_, MemcacheConnectionManager>> {
+        let pool = self
+            .conn
+            .get_or_try_init(|| async {
+                let mgr = MemcacheConnectionManager::new(
+                    &self.endpoint,
+                    self.username.clone(),
+                    self.password.clone(),
+                );
+
+                bb8::Pool::builder().build(mgr).await.map_err(|err| {
+                    Error::new(ErrorKind::ConfigInvalid, "connect to memecached failed")
+                        .set_source(err)
+                })
+            })
+            .await?;
+
+        pool.get().await.map_err(|err| match err {
+            RunError::TimedOut => {
+                Error::new(ErrorKind::Unexpected, "get connection from pool failed").set_temporary()
+            }
+            RunError::User(err) => err,
+        })
+    }
+
+    pub async fn get(&self, key: &str) -> Result<Option<Buffer>> {
+        let mut conn = self.conn().await?;
+        let result = conn.get(&percent_encode_path(key)).await?;
+        Ok(result.map(Buffer::from))
+    }
+
+    pub async fn set(&self, key: &str, value: Buffer) -> Result<()> {
+        let mut conn = self.conn().await?;
+
+        conn.set(
+            &percent_encode_path(key),
+            &value.to_vec(),
+            // Set expiration to 0 if ttl not set.
+            self.default_ttl
+                .map(|v| v.as_secs() as u32)
+                .unwrap_or_default(),
+        )
+        .await
+    }
+
+    pub async fn delete(&self, key: &str) -> Result<()> {
+        let mut conn = self.conn().await?;
+
+        conn.delete(&percent_encode_path(key)).await
+    }
+}
diff --git a/core/src/services/memcached/mod.rs b/core/src/services/memcached/deleter.rs
similarity index 60%
copy from core/src/services/memcached/mod.rs
copy to core/src/services/memcached/deleter.rs
index 4cb6779c7..725863711 100644
--- a/core/src/services/memcached/mod.rs
+++ b/core/src/services/memcached/deleter.rs
@@ -15,10 +15,28 @@
 // specific language governing permissions and limitations
 // under the License.
 
-mod binary;
+use std::sync::Arc;
 
-mod backend;
-pub use backend::MemcachedBuilder as Memcached;
+use super::core::*;
+use crate::raw::oio;
+use crate::raw::*;
+use crate::*;
 
-mod config;
-pub use config::MemcachedConfig;
+pub struct MemcachedDeleter {
+    core: Arc<MemcachedCore>,
+    root: String,
+}
+
+impl MemcachedDeleter {
+    pub fn new(core: Arc<MemcachedCore>, root: String) -> Self {
+        Self { core, root }
+    }
+}
+
+impl oio::OneShotDelete for MemcachedDeleter {
+    async fn delete_once(&self, path: String, _: OpDelete) -> Result<()> {
+        let p = build_abs_path(&self.root, &path);
+        self.core.delete(&p).await?;
+        Ok(())
+    }
+}
diff --git a/core/src/services/memcached/docs.md b/core/src/services/memcached/docs.md
index 40de9df81..f643930fa 100644
--- a/core/src/services/memcached/docs.md
+++ b/core/src/services/memcached/docs.md
@@ -2,16 +2,15 @@
 
 This service can be used to:
 
+- [ ] create_dir
 - [x] stat
 - [x] read
 - [x] write
-- [ ] create_dir
 - [x] delete
 - [ ] copy
 - [ ] rename
-- [ ] ~~list~~
+- [ ] list
 - [ ] ~~presign~~
-- [ ] blocking
 
 ## Configuration
 
diff --git a/core/src/services/memcached/mod.rs b/core/src/services/memcached/mod.rs
index 4cb6779c7..bd29318fb 100644
--- a/core/src/services/memcached/mod.rs
+++ b/core/src/services/memcached/mod.rs
@@ -15,9 +15,12 @@
 // specific language governing permissions and limitations
 // under the License.
 
+mod backend;
 mod binary;
+mod core;
+mod deleter;
+mod writer;
 
-mod backend;
 pub use backend::MemcachedBuilder as Memcached;
 
 mod config;
diff --git a/core/src/services/memcached/writer.rs b/core/src/services/memcached/writer.rs
new file mode 100644
index 000000000..bb2e843b3
--- /dev/null
+++ b/core/src/services/memcached/writer.rs
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use super::core::*;
+use crate::raw::oio;
+use crate::*;
+
+pub struct MemcachedWriter {
+    core: Arc<MemcachedCore>,
+    path: String,
+    buffer: oio::QueueBuf,
+}
+
+impl MemcachedWriter {
+    pub fn new(core: Arc<MemcachedCore>, path: String) -> Self {
+        Self {
+            core,
+            path,
+            buffer: oio::QueueBuf::new(),
+        }
+    }
+}
+
+impl oio::Write for MemcachedWriter {
+    async fn write(&mut self, bs: Buffer) -> Result<()> {
+        self.buffer.push(bs);
+        Ok(())
+    }
+
+    async fn close(&mut self) -> Result<Metadata> {
+        let buf = self.buffer.clone().collect();
+        let length = buf.len() as u64;
+        self.core.set(&self.path, buf).await?;
+
+        let meta = Metadata::new(EntryMode::from_path(&self.path)).with_content_length(length);
+        Ok(meta)
+    }
+
+    async fn abort(&mut self) -> Result<()> {
+        self.buffer.clear();
+        Ok(())
+    }
+}

Reply via email to