This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new fb106f449 feat(services/memcached): Support connection via unix domain 
socket (#7112)
fb106f449 is described below

commit fb106f449544676745519673ba68e4dd53fe59cb
Author: Hugo <[email protected]>
AuthorDate: Thu Jan 22 13:04:51 2026 +0800

    feat(services/memcached): Support connection via unix domain socket (#7112)
    
    * feat(services/memcached): Support connection via unix domain socket
    
    - Switch dependency from `http` to `url` for endpoint parsing.
    - Introduce `SocketStream` to handle both `TcpStream` and `UnixStream`.
    
    * Update Cargo.lock
    
    * format toml
    
    * Update core.rs
    
    * fix(services/memcached): Fix build on non-unix platforms
    
    - Add `#[cfg(unix)]` checks for UnixSocket usage.
    - Refactor connection manager to use `Endpoint` enum.
    
    * trigger GitHub actions
    
    * Update core.rs
    
    * trigger GitHub actions
    
    * test(services/memcached): Add Unix socket test coverage
    
    - Add docker-compose-memcached-unix.yml for Unix socket setup
    - Add memcached_with_unix action for CI testing
    
    * fix: remove username and password
    
    ---------
    
    Co-authored-by: tison <[email protected]>
---
 .../memcached/memcached_with_unix/action.yml       |  52 ++++++++++
 core/Cargo.lock                                    |   2 +-
 core/services/memcached/Cargo.toml                 |   2 +-
 core/services/memcached/src/backend.rs             |  84 ++++++++++------
 core/services/memcached/src/binary.rs              |  12 +--
 core/services/memcached/src/core.rs                | 106 +++++++++++++++++++--
 .../memcached/docker-compose-memcached-unix.yml    |  44 ++++-----
 7 files changed, 230 insertions(+), 72 deletions(-)

diff --git a/.github/services/memcached/memcached_with_unix/action.yml 
b/.github/services/memcached/memcached_with_unix/action.yml
new file mode 100644
index 000000000..1fd09e689
--- /dev/null
+++ b/.github/services/memcached/memcached_with_unix/action.yml
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: memcached_with_unix
+description: 'Behavior test for memcached with unix socket'
+
+runs:
+  using: "composite"
+  steps:
+    - name: Setup memcached server with Unix socket
+      shell: bash
+      working-directory: fixtures/memcached
+      run: |
+        # Create directory for the socket
+        sudo mkdir -p /tmp/memcached
+        sudo chmod 777 /tmp/memcached
+        docker compose -f docker-compose-memcached-unix.yml up -d --wait
+    - name: Wait for socket
+      shell: bash
+      run: |
+        # Wait for the socket file to be created
+        for i in {1..30}; do
+          if [ -S /tmp/memcached/memcached.sock ]; then
+            echo "Socket is ready"
+            break
+          fi
+          echo "Waiting for socket... ($i/30)"
+          sleep 1
+        done
+        # Verify socket exists
+        ls -la /tmp/memcached/
+    - name: Setup
+      shell: bash
+      run: |
+        cat << EOF >> $GITHUB_ENV
+        OPENDAL_MEMCACHED_ENDPOINT=unix:///tmp/memcached/memcached.sock
+        OPENDAL_MEMCACHED_ROOT=/
+        EOF
diff --git a/core/Cargo.lock b/core/Cargo.lock
index 661025134..9c18a7c86 100644
--- a/core/Cargo.lock
+++ b/core/Cargo.lock
@@ -6766,10 +6766,10 @@ name = "opendal-service-memcached"
 version = "0.55.0"
 dependencies = [
  "fastpool",
- "http 1.4.0",
  "opendal-core",
  "serde",
  "tokio",
+ "url",
 ]
 
 [[package]]
diff --git a/core/services/memcached/Cargo.toml 
b/core/services/memcached/Cargo.toml
index fa8ea87aa..f5c27e6f5 100644
--- a/core/services/memcached/Cargo.toml
+++ b/core/services/memcached/Cargo.toml
@@ -32,10 +32,10 @@ all-features = true
 
 [dependencies]
 fastpool = "1.0.2"
-http = { workspace = true }
 opendal-core = { path = "../../core", version = "0.55.0", default-features = 
false }
 serde = { workspace = true, features = ["derive"] }
 tokio = { workspace = true, features = ["net", "io-util"] }
+url = "2.5.7"
 
 [dev-dependencies]
 tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
diff --git a/core/services/memcached/src/backend.rs 
b/core/services/memcached/src/backend.rs
index 12ab94f0b..4ea91c0c9 100644
--- a/core/services/memcached/src/backend.rs
+++ b/core/services/memcached/src/backend.rs
@@ -15,7 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::borrow::Cow;
 use std::sync::Arc;
+use url::Url;
 
 use opendal_core::raw::*;
 use opendal_core::*;
@@ -94,53 +96,73 @@ impl Builder for MemcachedBuilder {
     type Config = MemcachedConfig;
 
     fn build(self) -> Result<impl Access> {
-        let endpoint = self.config.endpoint.clone().ok_or_else(|| {
+        let endpoint_raw = self.config.endpoint.clone().ok_or_else(|| {
             Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
                 .with_context("service", MEMCACHED_SCHEME)
         })?;
-        let uri = http::Uri::try_from(&endpoint).map_err(|err| {
+
+        let url_str = if !endpoint_raw.contains("://") {
+            Cow::Owned(format!("tcp://{}", endpoint_raw))
+        } else {
+            Cow::Borrowed(endpoint_raw.as_str())
+        };
+
+        let parsed = Url::parse(&url_str).map_err(|err| {
             Error::new(ErrorKind::ConfigInvalid, "endpoint is invalid")
                 .with_context("service", MEMCACHED_SCHEME)
-                .with_context("endpoint", &endpoint)
+                .with_context("endpoint", &endpoint_raw)
                 .set_source(err)
         })?;
 
-        match uri.scheme_str() {
-            // If scheme is none, we will use tcp by default.
-            None => (),
-            Some(scheme) => {
-                // We only support tcp by now.
-                if scheme != "tcp" {
+        let endpoint = match parsed.scheme() {
+            "tcp" => {
+                let host = parsed.host_str().ok_or_else(|| {
+                    Error::new(ErrorKind::ConfigInvalid, "tcp endpoint doesn't 
have host")
+                        .with_context("service", MEMCACHED_SCHEME)
+                        .with_context("endpoint", &endpoint_raw)
+                })?;
+                let port = parsed.port().ok_or_else(|| {
+                    Error::new(ErrorKind::ConfigInvalid, "tcp endpoint doesn't 
have port")
+                        .with_context("service", MEMCACHED_SCHEME)
+                        .with_context("endpoint", &endpoint_raw)
+                })?;
+                Endpoint::Tcp(format!("{host}:{port}"))
+            }
+
+            #[cfg(unix)]
+            "unix" => {
+                let path = parsed.path();
+                if path.is_empty() {
                     return Err(Error::new(
                         ErrorKind::ConfigInvalid,
-                        "endpoint is using invalid scheme",
+                        "unix endpoint doesn't have path",
                     )
                     .with_context("service", MEMCACHED_SCHEME)
-                    .with_context("endpoint", &endpoint)
-                    .with_context("scheme", scheme.to_string()));
+                    .with_context("endpoint", &endpoint_raw));
                 }
+                Endpoint::Unix(path.to_string())
             }
-        };
 
-        let host = if let Some(host) = uri.host() {
-            host.to_string()
-        } else {
-            return Err(
-                Error::new(ErrorKind::ConfigInvalid, "endpoint doesn't have 
host")
-                    .with_context("service", MEMCACHED_SCHEME)
-                    .with_context("endpoint", &endpoint),
-            );
-        };
-        let port = if let Some(port) = uri.port_u16() {
-            port
-        } else {
-            return Err(
-                Error::new(ErrorKind::ConfigInvalid, "endpoint doesn't have 
port")
-                    .with_context("service", MEMCACHED_SCHEME)
-                    .with_context("endpoint", &endpoint),
-            );
+            #[cfg(not(unix))]
+            "unix" => {
+                return Err(Error::new(
+                    ErrorKind::ConfigInvalid,
+                    "unix socket is not supported on this platform",
+                )
+                .with_context("service", MEMCACHED_SCHEME)
+                .with_context("endpoint", &endpoint_raw));
+            }
+
+            scheme => {
+                return Err(Error::new(
+                    ErrorKind::ConfigInvalid,
+                    "endpoint is using invalid scheme, only tcp and unix are 
supported",
+                )
+                .with_context("service", MEMCACHED_SCHEME)
+                .with_context("endpoint", &endpoint_raw)
+                .with_context("scheme", scheme));
+            }
         };
-        let endpoint = format!("{host}:{port}",);
 
         let root = normalize_root(self.config.root.unwrap_or_else(|| 
"/".to_string()).as_str());
 
diff --git a/core/services/memcached/src/binary.rs 
b/core/services/memcached/src/binary.rs
index 205b94fca..ec9c72abc 100644
--- a/core/services/memcached/src/binary.rs
+++ b/core/services/memcached/src/binary.rs
@@ -15,13 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::core::SocketStream;
 use opendal_core::raw::*;
 use opendal_core::*;
 use tokio::io;
 use tokio::io::AsyncReadExt;
 use tokio::io::AsyncWriteExt;
 use tokio::io::BufReader;
-use tokio::net::TcpStream;
 
 pub(super) mod constants {
     pub const OK_STATUS: u16 = 0x0;
@@ -60,7 +60,7 @@ pub struct PacketHeader {
 }
 
 impl PacketHeader {
-    pub async fn write(self, writer: &mut TcpStream) -> io::Result<()> {
+    pub async fn write(self, writer: &mut SocketStream) -> io::Result<()> {
         writer.write_u8(self.magic).await?;
         writer.write_u8(self.opcode).await?;
         writer.write_u16(self.key_length).await?;
@@ -73,7 +73,7 @@ impl PacketHeader {
         Ok(())
     }
 
-    pub async fn read(reader: &mut TcpStream) -> Result<PacketHeader, 
io::Error> {
+    pub async fn read(reader: &mut SocketStream) -> Result<PacketHeader, 
io::Error> {
         let header = PacketHeader {
             magic: reader.read_u8().await?,
             opcode: reader.read_u8().await?,
@@ -98,11 +98,11 @@ pub struct Response {
 
 #[derive(Debug)]
 pub struct Connection {
-    io: BufReader<TcpStream>,
+    io: BufReader<SocketStream>,
 }
 
 impl Connection {
-    pub fn new(io: TcpStream) -> Self {
+    pub fn new(io: SocketStream) -> Self {
         Self {
             io: BufReader::new(io),
         }
@@ -246,7 +246,7 @@ impl Connection {
     }
 }
 
-pub async fn parse_response(reader: &mut TcpStream) -> Result<Response> {
+pub async fn parse_response(reader: &mut SocketStream) -> Result<Response> {
     let header = PacketHeader::read(reader).await.map_err(new_std_io_error)?;
 
     if header.vbucket_id_or_status != constants::OK_STATUS
diff --git a/core/services/memcached/src/core.rs 
b/core/services/memcached/src/core.rs
index f05b8f48f..1221bb08e 100644
--- a/core/services/memcached/src/core.rs
+++ b/core/services/memcached/src/core.rs
@@ -22,22 +22,104 @@ use fastpool::ObjectStatus;
 use fastpool::bounded;
 use opendal_core::raw::*;
 use opendal_core::*;
+use std::io;
+use std::net::SocketAddr;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
 use tokio::net::TcpStream;
+#[cfg(unix)]
+use tokio::net::UnixStream;
 
 use super::binary;
 
+#[derive(Debug)]
+pub enum SocketStream {
+    Tcp(TcpStream),
+    #[cfg(unix)]
+    Unix(UnixStream),
+}
+
+impl SocketStream {
+    pub async fn connect_tcp(addr_str: &str) -> io::Result<Self> {
+        let socket_addr: SocketAddr = addr_str
+            .parse()
+            .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
+        let stream = TcpStream::connect(socket_addr).await?;
+        Ok(SocketStream::Tcp(stream))
+    }
+
+    #[cfg(unix)]
+    pub async fn connect_unix(path: &str) -> io::Result<Self> {
+        let stream = UnixStream::connect(path).await?;
+        Ok(SocketStream::Unix(stream))
+    }
+}
+
+impl AsyncRead for SocketStream {
+    fn poll_read(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+        buf: &mut ReadBuf<'_>,
+    ) -> Poll<io::Result<()>> {
+        match self.get_mut() {
+            SocketStream::Tcp(s) => Pin::new(s).poll_read(cx, buf),
+            #[cfg(unix)]
+            SocketStream::Unix(s) => Pin::new(s).poll_read(cx, buf),
+        }
+    }
+}
+
+impl AsyncWrite for SocketStream {
+    fn poll_write(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+        buf: &[u8],
+    ) -> Poll<io::Result<usize>> {
+        match self.get_mut() {
+            SocketStream::Tcp(s) => Pin::new(s).poll_write(cx, buf),
+            #[cfg(unix)]
+            SocketStream::Unix(s) => Pin::new(s).poll_write(cx, buf),
+        }
+    }
+
+    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> 
Poll<io::Result<()>> {
+        match self.get_mut() {
+            SocketStream::Tcp(s) => Pin::new(s).poll_flush(cx),
+            #[cfg(unix)]
+            SocketStream::Unix(s) => Pin::new(s).poll_flush(cx),
+        }
+    }
+
+    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> 
Poll<io::Result<()>> {
+        match self.get_mut() {
+            SocketStream::Tcp(s) => Pin::new(s).poll_shutdown(cx),
+            #[cfg(unix)]
+            SocketStream::Unix(s) => Pin::new(s).poll_shutdown(cx),
+        }
+    }
+}
+
+/// Endpoint for memcached connection.
+#[derive(Clone, Debug)]
+pub enum Endpoint {
+    Tcp(String), // host:port
+    #[cfg(unix)]
+    Unix(String), // socket path
+}
+
 /// A connection manager for `memcache_async::ascii::Protocol`.
 #[derive(Clone)]
 struct MemcacheConnectionManager {
-    address: String,
+    endpoint: Endpoint,
     username: Option<String>,
     password: Option<String>,
 }
 
 impl MemcacheConnectionManager {
-    fn new(address: &str, username: Option<String>, password: Option<String>) 
-> Self {
+    fn new(endpoint: Endpoint, username: Option<String>, password: 
Option<String>) -> Self {
         Self {
-            address: address.to_string(),
+            endpoint,
             username,
             password,
         }
@@ -48,11 +130,17 @@ impl ManageObject for MemcacheConnectionManager {
     type Object = binary::Connection;
     type Error = Error;
 
-    /// TODO: Implement unix stream support.
     async fn create(&self) -> Result<Self::Object, Self::Error> {
-        let conn = TcpStream::connect(&self.address)
-            .await
-            .map_err(new_std_io_error)?;
+        let conn = match &self.endpoint {
+            Endpoint::Tcp(addr) => SocketStream::connect_tcp(addr)
+                .await
+                .map_err(new_std_io_error)?,
+            #[cfg(unix)]
+            Endpoint::Unix(path) => SocketStream::connect_unix(path)
+                .await
+                .map_err(new_std_io_error)?,
+        };
+
         let mut conn = binary::Connection::new(conn);
 
         if let (Some(username), Some(password)) = (self.username.as_ref(), 
self.password.as_ref()) {
@@ -81,7 +169,7 @@ pub struct MemcachedCore {
 
 impl MemcachedCore {
     pub fn new(
-        endpoint: String,
+        endpoint: Endpoint,
         username: Option<String>,
         password: Option<String>,
         default_ttl: Option<Duration>,
@@ -89,7 +177,7 @@ impl MemcachedCore {
     ) -> Self {
         let conn = bounded::Pool::new(
             bounded::PoolConfig::new(connection_pool_max_size.unwrap_or(10)),
-            MemcacheConnectionManager::new(endpoint.as_str(), username, 
password),
+            MemcacheConnectionManager::new(endpoint, username, password),
         );
 
         Self { default_ttl, conn }
diff --git a/core/services/memcached/Cargo.toml 
b/fixtures/memcached/docker-compose-memcached-unix.yml
similarity index 51%
copy from core/services/memcached/Cargo.toml
copy to fixtures/memcached/docker-compose-memcached-unix.yml
index fa8ea87aa..41c0ab5ad 100644
--- a/core/services/memcached/Cargo.toml
+++ b/fixtures/memcached/docker-compose-memcached-unix.yml
@@ -15,27 +15,23 @@
 # specific language governing permissions and limitations
 # under the License.
 
-[package]
-description = "Apache OpenDAL Memcached service implementation"
-name = "opendal-service-memcached"
-
-authors = { workspace = true }
-edition = { workspace = true }
-homepage = { workspace = true }
-license = { workspace = true }
-repository = { workspace = true }
-rust-version = { workspace = true }
-version = { workspace = true }
-
-[package.metadata.docs.rs]
-all-features = true
-
-[dependencies]
-fastpool = "1.0.2"
-http = { workspace = true }
-opendal-core = { path = "../../core", version = "0.55.0", default-features = 
false }
-serde = { workspace = true, features = ["derive"] }
-tokio = { workspace = true, features = ["net", "io-util"] }
-
-[dev-dependencies]
-tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
+services:
+  memcached:
+    image: bitnami/memcached:latest
+    environment:
+      # memcache's max item size is 1MiB, But opendal's behavior tests
+      # will produce larger file.
+      #
+      # Specify the setting here to make our test happy.
+      MEMCACHED_MAX_ITEM_SIZE: 16777216
+      # Enable Unix socket with extra flags
+      # -s: socket path
+      # -a: socket access mask (0777 for world-readable/writable)
+      MEMCACHED_EXTRA_FLAGS: "-s /tmp/memcached/memcached.sock -a 0777"
+    volumes:
+      - /tmp/memcached:/tmp/memcached
+    healthcheck:
+      test: ["CMD-SHELL", "test -S /tmp/memcached/memcached.sock"]
+      interval: 5s
+      timeout: 5s
+      retries: 5

Reply via email to