This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push:
new fb106f449 feat(services/memcached): Support connection via unix domain
socket (#7112)
fb106f449 is described below
commit fb106f449544676745519673ba68e4dd53fe59cb
Author: Hugo <[email protected]>
AuthorDate: Thu Jan 22 13:04:51 2026 +0800
feat(services/memcached): Support connection via unix domain socket (#7112)
* feat(services/memcached): Support connection via unix domain socket
- Switch dependency from `http` to `url` for endpoint parsing.
- Introduce `SocketStream` to handle both `TcpStream` and `UnixStream`.
* Update Cargo.lock
* format toml
* Update core.rs
* fix(services/memcached): Fix build on non-unix platforms
- Add `#[cfg(unix)]` checks for UnixSocket usage.
- Refactor connection manager to use `Endpoint` enum.
* trigger GitHub actions
* Update core.rs
* trigger GitHub actions
* test(services/memcached): Add Unix socket test coverage
- Add docker-compose-memcached-unix.yml for Unix socket setup
- Add memcached_with_unix action for CI testing
* fix: remove username and password
---------
Co-authored-by: tison <[email protected]>
---
.../memcached/memcached_with_unix/action.yml | 52 ++++++++++
core/Cargo.lock | 2 +-
core/services/memcached/Cargo.toml | 2 +-
core/services/memcached/src/backend.rs | 84 ++++++++++------
core/services/memcached/src/binary.rs | 12 +--
core/services/memcached/src/core.rs | 106 +++++++++++++++++++--
.../memcached/docker-compose-memcached-unix.yml | 44 ++++-----
7 files changed, 230 insertions(+), 72 deletions(-)
diff --git a/.github/services/memcached/memcached_with_unix/action.yml
b/.github/services/memcached/memcached_with_unix/action.yml
new file mode 100644
index 000000000..1fd09e689
--- /dev/null
+++ b/.github/services/memcached/memcached_with_unix/action.yml
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: memcached_with_unix
+description: 'Behavior test for memcached with unix socket'
+
+runs:
+ using: "composite"
+ steps:
+ - name: Setup memcached server with Unix socket
+ shell: bash
+ working-directory: fixtures/memcached
+ run: |
+ # Create directory for the socket
+ sudo mkdir -p /tmp/memcached
+ sudo chmod 777 /tmp/memcached
+ docker compose -f docker-compose-memcached-unix.yml up -d --wait
+ - name: Wait for socket
+ shell: bash
+ run: |
+ # Wait for the socket file to be created
+ for i in {1..30}; do
+ if [ -S /tmp/memcached/memcached.sock ]; then
+ echo "Socket is ready"
+ break
+ fi
+ echo "Waiting for socket... ($i/30)"
+ sleep 1
+ done
+ # Verify socket exists
+ ls -la /tmp/memcached/
+ - name: Setup
+ shell: bash
+ run: |
+ cat << EOF >> $GITHUB_ENV
+ OPENDAL_MEMCACHED_ENDPOINT=unix:///tmp/memcached/memcached.sock
+ OPENDAL_MEMCACHED_ROOT=/
+ EOF
diff --git a/core/Cargo.lock b/core/Cargo.lock
index 661025134..9c18a7c86 100644
--- a/core/Cargo.lock
+++ b/core/Cargo.lock
@@ -6766,10 +6766,10 @@ name = "opendal-service-memcached"
version = "0.55.0"
dependencies = [
"fastpool",
- "http 1.4.0",
"opendal-core",
"serde",
"tokio",
+ "url",
]
[[package]]
diff --git a/core/services/memcached/Cargo.toml
b/core/services/memcached/Cargo.toml
index fa8ea87aa..f5c27e6f5 100644
--- a/core/services/memcached/Cargo.toml
+++ b/core/services/memcached/Cargo.toml
@@ -32,10 +32,10 @@ all-features = true
[dependencies]
fastpool = "1.0.2"
-http = { workspace = true }
opendal-core = { path = "../../core", version = "0.55.0", default-features =
false }
serde = { workspace = true, features = ["derive"] }
tokio = { workspace = true, features = ["net", "io-util"] }
+url = "2.5.7"
[dev-dependencies]
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
diff --git a/core/services/memcached/src/backend.rs
b/core/services/memcached/src/backend.rs
index 12ab94f0b..4ea91c0c9 100644
--- a/core/services/memcached/src/backend.rs
+++ b/core/services/memcached/src/backend.rs
@@ -15,7 +15,9 @@
// specific language governing permissions and limitations
// under the License.
+use std::borrow::Cow;
use std::sync::Arc;
+use url::Url;
use opendal_core::raw::*;
use opendal_core::*;
@@ -94,53 +96,73 @@ impl Builder for MemcachedBuilder {
type Config = MemcachedConfig;
fn build(self) -> Result<impl Access> {
- let endpoint = self.config.endpoint.clone().ok_or_else(|| {
+ let endpoint_raw = self.config.endpoint.clone().ok_or_else(|| {
Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
.with_context("service", MEMCACHED_SCHEME)
})?;
- let uri = http::Uri::try_from(&endpoint).map_err(|err| {
+
+ let url_str = if !endpoint_raw.contains("://") {
+ Cow::Owned(format!("tcp://{}", endpoint_raw))
+ } else {
+ Cow::Borrowed(endpoint_raw.as_str())
+ };
+
+ let parsed = Url::parse(&url_str).map_err(|err| {
Error::new(ErrorKind::ConfigInvalid, "endpoint is invalid")
.with_context("service", MEMCACHED_SCHEME)
- .with_context("endpoint", &endpoint)
+ .with_context("endpoint", &endpoint_raw)
.set_source(err)
})?;
- match uri.scheme_str() {
- // If scheme is none, we will use tcp by default.
- None => (),
- Some(scheme) => {
- // We only support tcp by now.
- if scheme != "tcp" {
+ let endpoint = match parsed.scheme() {
+ "tcp" => {
+ let host = parsed.host_str().ok_or_else(|| {
+ Error::new(ErrorKind::ConfigInvalid, "tcp endpoint doesn't
have host")
+ .with_context("service", MEMCACHED_SCHEME)
+ .with_context("endpoint", &endpoint_raw)
+ })?;
+ let port = parsed.port().ok_or_else(|| {
+ Error::new(ErrorKind::ConfigInvalid, "tcp endpoint doesn't
have port")
+ .with_context("service", MEMCACHED_SCHEME)
+ .with_context("endpoint", &endpoint_raw)
+ })?;
+ Endpoint::Tcp(format!("{host}:{port}"))
+ }
+
+ #[cfg(unix)]
+ "unix" => {
+ let path = parsed.path();
+ if path.is_empty() {
return Err(Error::new(
ErrorKind::ConfigInvalid,
- "endpoint is using invalid scheme",
+ "unix endpoint doesn't have path",
)
.with_context("service", MEMCACHED_SCHEME)
- .with_context("endpoint", &endpoint)
- .with_context("scheme", scheme.to_string()));
+ .with_context("endpoint", &endpoint_raw));
}
+ Endpoint::Unix(path.to_string())
}
- };
- let host = if let Some(host) = uri.host() {
- host.to_string()
- } else {
- return Err(
- Error::new(ErrorKind::ConfigInvalid, "endpoint doesn't have
host")
- .with_context("service", MEMCACHED_SCHEME)
- .with_context("endpoint", &endpoint),
- );
- };
- let port = if let Some(port) = uri.port_u16() {
- port
- } else {
- return Err(
- Error::new(ErrorKind::ConfigInvalid, "endpoint doesn't have
port")
- .with_context("service", MEMCACHED_SCHEME)
- .with_context("endpoint", &endpoint),
- );
+ #[cfg(not(unix))]
+ "unix" => {
+ return Err(Error::new(
+ ErrorKind::ConfigInvalid,
+ "unix socket is not supported on this platform",
+ )
+ .with_context("service", MEMCACHED_SCHEME)
+ .with_context("endpoint", &endpoint_raw));
+ }
+
+ scheme => {
+ return Err(Error::new(
+ ErrorKind::ConfigInvalid,
+ "endpoint is using invalid scheme, only tcp and unix are
supported",
+ )
+ .with_context("service", MEMCACHED_SCHEME)
+ .with_context("endpoint", &endpoint_raw)
+ .with_context("scheme", scheme));
+ }
};
- let endpoint = format!("{host}:{port}",);
let root = normalize_root(self.config.root.unwrap_or_else(||
"/".to_string()).as_str());
diff --git a/core/services/memcached/src/binary.rs
b/core/services/memcached/src/binary.rs
index 205b94fca..ec9c72abc 100644
--- a/core/services/memcached/src/binary.rs
+++ b/core/services/memcached/src/binary.rs
@@ -15,13 +15,13 @@
// specific language governing permissions and limitations
// under the License.
+use crate::core::SocketStream;
use opendal_core::raw::*;
use opendal_core::*;
use tokio::io;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
use tokio::io::BufReader;
-use tokio::net::TcpStream;
pub(super) mod constants {
pub const OK_STATUS: u16 = 0x0;
@@ -60,7 +60,7 @@ pub struct PacketHeader {
}
impl PacketHeader {
- pub async fn write(self, writer: &mut TcpStream) -> io::Result<()> {
+ pub async fn write(self, writer: &mut SocketStream) -> io::Result<()> {
writer.write_u8(self.magic).await?;
writer.write_u8(self.opcode).await?;
writer.write_u16(self.key_length).await?;
@@ -73,7 +73,7 @@ impl PacketHeader {
Ok(())
}
- pub async fn read(reader: &mut TcpStream) -> Result<PacketHeader,
io::Error> {
+ pub async fn read(reader: &mut SocketStream) -> Result<PacketHeader,
io::Error> {
let header = PacketHeader {
magic: reader.read_u8().await?,
opcode: reader.read_u8().await?,
@@ -98,11 +98,11 @@ pub struct Response {
#[derive(Debug)]
pub struct Connection {
- io: BufReader<TcpStream>,
+ io: BufReader<SocketStream>,
}
impl Connection {
- pub fn new(io: TcpStream) -> Self {
+ pub fn new(io: SocketStream) -> Self {
Self {
io: BufReader::new(io),
}
@@ -246,7 +246,7 @@ impl Connection {
}
}
-pub async fn parse_response(reader: &mut TcpStream) -> Result<Response> {
+pub async fn parse_response(reader: &mut SocketStream) -> Result<Response> {
let header = PacketHeader::read(reader).await.map_err(new_std_io_error)?;
if header.vbucket_id_or_status != constants::OK_STATUS
diff --git a/core/services/memcached/src/core.rs
b/core/services/memcached/src/core.rs
index f05b8f48f..1221bb08e 100644
--- a/core/services/memcached/src/core.rs
+++ b/core/services/memcached/src/core.rs
@@ -22,22 +22,104 @@ use fastpool::ObjectStatus;
use fastpool::bounded;
use opendal_core::raw::*;
use opendal_core::*;
+use std::io;
+use std::net::SocketAddr;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio::net::TcpStream;
+#[cfg(unix)]
+use tokio::net::UnixStream;
use super::binary;
+#[derive(Debug)]
+pub enum SocketStream {
+ Tcp(TcpStream),
+ #[cfg(unix)]
+ Unix(UnixStream),
+}
+
+impl SocketStream {
+ pub async fn connect_tcp(addr_str: &str) -> io::Result<Self> {
+ let socket_addr: SocketAddr = addr_str
+ .parse()
+ .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
+ let stream = TcpStream::connect(socket_addr).await?;
+ Ok(SocketStream::Tcp(stream))
+ }
+
+ #[cfg(unix)]
+ pub async fn connect_unix(path: &str) -> io::Result<Self> {
+ let stream = UnixStream::connect(path).await?;
+ Ok(SocketStream::Unix(stream))
+ }
+}
+
+impl AsyncRead for SocketStream {
+ fn poll_read(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &mut ReadBuf<'_>,
+ ) -> Poll<io::Result<()>> {
+ match self.get_mut() {
+ SocketStream::Tcp(s) => Pin::new(s).poll_read(cx, buf),
+ #[cfg(unix)]
+ SocketStream::Unix(s) => Pin::new(s).poll_read(cx, buf),
+ }
+ }
+}
+
+impl AsyncWrite for SocketStream {
+ fn poll_write(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ match self.get_mut() {
+ SocketStream::Tcp(s) => Pin::new(s).poll_write(cx, buf),
+ #[cfg(unix)]
+ SocketStream::Unix(s) => Pin::new(s).poll_write(cx, buf),
+ }
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) ->
Poll<io::Result<()>> {
+ match self.get_mut() {
+ SocketStream::Tcp(s) => Pin::new(s).poll_flush(cx),
+ #[cfg(unix)]
+ SocketStream::Unix(s) => Pin::new(s).poll_flush(cx),
+ }
+ }
+
+ fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) ->
Poll<io::Result<()>> {
+ match self.get_mut() {
+ SocketStream::Tcp(s) => Pin::new(s).poll_shutdown(cx),
+ #[cfg(unix)]
+ SocketStream::Unix(s) => Pin::new(s).poll_shutdown(cx),
+ }
+ }
+}
+
+/// Endpoint for memcached connection.
+#[derive(Clone, Debug)]
+pub enum Endpoint {
+ Tcp(String), // host:port
+ #[cfg(unix)]
+ Unix(String), // socket path
+}
+
/// A connection manager for `memcache_async::ascii::Protocol`.
#[derive(Clone)]
struct MemcacheConnectionManager {
- address: String,
+ endpoint: Endpoint,
username: Option<String>,
password: Option<String>,
}
impl MemcacheConnectionManager {
- fn new(address: &str, username: Option<String>, password: Option<String>)
-> Self {
+ fn new(endpoint: Endpoint, username: Option<String>, password:
Option<String>) -> Self {
Self {
- address: address.to_string(),
+ endpoint,
username,
password,
}
@@ -48,11 +130,17 @@ impl ManageObject for MemcacheConnectionManager {
type Object = binary::Connection;
type Error = Error;
- /// TODO: Implement unix stream support.
async fn create(&self) -> Result<Self::Object, Self::Error> {
- let conn = TcpStream::connect(&self.address)
- .await
- .map_err(new_std_io_error)?;
+ let conn = match &self.endpoint {
+ Endpoint::Tcp(addr) => SocketStream::connect_tcp(addr)
+ .await
+ .map_err(new_std_io_error)?,
+ #[cfg(unix)]
+ Endpoint::Unix(path) => SocketStream::connect_unix(path)
+ .await
+ .map_err(new_std_io_error)?,
+ };
+
let mut conn = binary::Connection::new(conn);
if let (Some(username), Some(password)) = (self.username.as_ref(),
self.password.as_ref()) {
@@ -81,7 +169,7 @@ pub struct MemcachedCore {
impl MemcachedCore {
pub fn new(
- endpoint: String,
+ endpoint: Endpoint,
username: Option<String>,
password: Option<String>,
default_ttl: Option<Duration>,
@@ -89,7 +177,7 @@ impl MemcachedCore {
) -> Self {
let conn = bounded::Pool::new(
bounded::PoolConfig::new(connection_pool_max_size.unwrap_or(10)),
- MemcacheConnectionManager::new(endpoint.as_str(), username,
password),
+ MemcacheConnectionManager::new(endpoint, username, password),
);
Self { default_ttl, conn }
diff --git a/core/services/memcached/Cargo.toml
b/fixtures/memcached/docker-compose-memcached-unix.yml
similarity index 51%
copy from core/services/memcached/Cargo.toml
copy to fixtures/memcached/docker-compose-memcached-unix.yml
index fa8ea87aa..41c0ab5ad 100644
--- a/core/services/memcached/Cargo.toml
+++ b/fixtures/memcached/docker-compose-memcached-unix.yml
@@ -15,27 +15,23 @@
# specific language governing permissions and limitations
# under the License.
-[package]
-description = "Apache OpenDAL Memcached service implementation"
-name = "opendal-service-memcached"
-
-authors = { workspace = true }
-edition = { workspace = true }
-homepage = { workspace = true }
-license = { workspace = true }
-repository = { workspace = true }
-rust-version = { workspace = true }
-version = { workspace = true }
-
-[package.metadata.docs.rs]
-all-features = true
-
-[dependencies]
-fastpool = "1.0.2"
-http = { workspace = true }
-opendal-core = { path = "../../core", version = "0.55.0", default-features =
false }
-serde = { workspace = true, features = ["derive"] }
-tokio = { workspace = true, features = ["net", "io-util"] }
-
-[dev-dependencies]
-tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
+services:
+ memcached:
+ image: bitnami/memcached:latest
+ environment:
+ # memcache's max item size is 1MiB, But opendal's behavior tests
+ # will produce larger file.
+ #
+ # Specify the setting here to make our test happy.
+ MEMCACHED_MAX_ITEM_SIZE: 16777216
+ # Enable Unix socket with extra flags
+ # -s: socket path
+ # -a: socket access mask (0777 for world-readable/writable)
+ MEMCACHED_EXTRA_FLAGS: "-s /tmp/memcached/memcached.sock -a 0777"
+ volumes:
+ - /tmp/memcached:/tmp/memcached
+ healthcheck:
+ test: ["CMD-SHELL", "test -S /tmp/memcached/memcached.sock"]
+ interval: 5s
+ timeout: 5s
+ retries: 5