This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 59c1cce8 feat(layer): add madsim layer (#2006)
59c1cce8 is described below
commit 59c1cce881d4f1d9009b13b7c5a0597b3d79b418
Author: Sky Fan <[email protected]>
AuthorDate: Tue Apr 18 16:57:51 2023 +0800
feat(layer): add madsim layer (#2006)
* fix typo
* ,
* archive
* skeleton
* fmt
* panic in blocking func
* add license header
* fmt
* add doc
* delete unrelated changes
* refactor: madsim as a feature, delete real mod
* fix conflict
* delete useless modification
* update madsim version
* mark blocking func as not supported
* fix details
* rename struct
* throw error, rename var, set cap
---
core/Cargo.toml | 4 +
core/src/layers/madsim.rs | 419 ++++++++++++++++++++++++++++++++++++++++++++++
core/src/layers/mod.rs | 12 ++
3 files changed, 435 insertions(+)
diff --git a/core/Cargo.toml b/core/Cargo.toml
index f8b3ec78..cb1590d5 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -74,6 +74,7 @@ layers-all = [
"layers-prometheus",
"layers-tracing",
"layers-minitrace",
+ "layers-madsim"
]
# Enable layers chaos support
layers-chaos = ["dep:rand"]
@@ -81,6 +82,8 @@ layers-chaos = ["dep:rand"]
layers-metrics = ["dep:metrics"]
# Enable layers prometheus support
layers-prometheus = ["dep:prometheus"]
+# Enable layers madsim support
+layers-madsim = ["dep:madsim"]
# Enable layers minitrace support.
layers-minitrace = ["dep:minitrace"]
# Enable layers tracing support.
@@ -163,6 +166,7 @@ http = "0.2.5"
hyper = "0.14"
lazy-regex = { version = "2.5.0", optional = true }
log = "0.4"
+madsim = {version = "0.2.21", optional = true }
md-5 = "0.10"
metrics = { version = "0.20", optional = true }
minitrace = { version = "0.4.0", optional = true }
diff --git a/core/src/layers/madsim.rs b/core/src/layers/madsim.rs
new file mode 100644
index 00000000..b76ebc94
--- /dev/null
+++ b/core/src/layers/madsim.rs
@@ -0,0 +1,419 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::ops::{OpList, OpRead, OpScan, OpWrite};
+use crate::raw::oio::Entry;
+use crate::raw::AccessorCapability;
+use crate::raw::AccessorInfo;
+use crate::raw::{oio, Accessor, Layer, LayeredAccessor, RpList, RpRead,
RpScan, RpWrite};
+use crate::types::Error;
+use crate::types::ErrorKind;
+use async_trait::async_trait;
+use bytes::Bytes;
+use madsim::net::Endpoint;
+use madsim::net::Payload;
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::io::Result;
+use std::io::SeekFrom;
+use std::net::SocketAddr;
+use std::sync::Arc;
+use std::sync::Mutex;
+use std::task::{Context, Poll};
+
+/// Add deterministic simulation for async operations, powered by
[`madsim`](https://docs.rs/madsim/latest/madsim/).
+///
+/// # Note
+///
+/// - blocking operations are not supported, as
[`madsim`](https://docs.rs/madsim/latest/madsim/) is async only.
+///
+///
+/// # Examples
+///
+/// ```
+/// use opendal::Operator;
+/// use opendal::services;
+/// use opendal::layers::MadsimLayer;
+/// use opendal::layers::MadsimServer;
+/// use madsim::{net::NetSim, runtime::Handle, time::sleep};
+/// use std::time::Duration;
+///
+/// #[cfg(madsim)]
+/// #[madsim::test]
+/// async fn deterministic_simulation_test(){
+/// let handle = Handle::current();
+/// let ip1 = "10.0.0.1".parse().unwrap();
+/// let ip2 = "10.0.0.2".parse().unwrap();
+/// let server_addr = "10.0.0.1:2379".parse().unwrap();
+/// let server = handle.create_node().name("server").ip(ip1).build();
+/// let client = handle.create_node().name("client").ip(ip2).build();
+///
+/// server.spawn(async move {
+/// SimServer::serve(server_addr).await.unwrap();
+/// });
+/// sleep(Duration::from_secs(1)).await;
+///
+/// let handle = client.spawn(async move {
+/// let mut builder = services::Fs::default();
+/// builder.root(".");
+/// let op = Operator::new(builder)
+/// .unwrap()
+/// .layer(MadsimLayer::new(server_addr))
+/// .finish();
+///
+/// let path = "hello.txt";
+/// let data = "Hello, World!";
+/// op.write(path, data).await.unwrap();
+/// assert_eq!(data.as_bytes(), op.read(path).await.unwrap());
+/// });
+/// handle.await.unwrap();
+/// }
+/// ```
+/// To enable logging output, please set `RUSTFLAGS="--cfg madsim"`:
+/// ```shell
+/// RUSTFLAGS="--cfg madsim" cargo test
+/// ```
+#[derive(Debug, Copy, Clone)]
+pub struct MadsimLayer {
+ addr: SocketAddr,
+}
+
+impl MadsimLayer {
+ pub fn new(endpoint: &str) -> Self {
+ Self {
+ addr: endpoint.parse().unwrap(),
+ }
+ }
+}
+
+impl<A: Accessor> Layer<A> for MadsimLayer {
+ type LayeredAccessor = MadsimAccessor;
+
+ fn layer(&self, _: A) -> Self::LayeredAccessor {
+ MadsimAccessor { addr: self.addr }
+ }
+}
+
+#[derive(Debug)]
+pub struct MadsimAccessor {
+ addr: SocketAddr,
+}
+
+#[async_trait]
+impl LayeredAccessor for MadsimAccessor {
+ type Inner = ();
+ type Reader = MadsimReader;
+ type BlockingReader = ();
+ type Writer = MadsimWriter;
+ type BlockingWriter = ();
+ type Pager = MadsimPager;
+ type BlockingPager = ();
+
+ fn inner(&self) -> &Self::Inner {
+ &()
+ }
+
+ fn metadata(&self) -> AccessorInfo {
+ let mut info = AccessorInfo::default();
+ info.set_name("madsim");
+ info.set_capabilities(AccessorCapability::Read |
AccessorCapability::Write);
+ info
+ }
+
+ async fn read(&self, path: &str, args: OpRead) -> crate::Result<(RpRead,
Self::Reader)> {
+ let req = Request::Read(path.to_string(), args);
+ let ep = Endpoint::connect(self.addr)
+ .await
+ .expect("fail to connect to sim server");
+ let (tx, mut rx) = ep
+ .connect1(self.addr)
+ .await
+ .expect("fail to connect1 to sim server");
+ tx.send(Box::new(req))
+ .await
+ .expect("fail to send request to sim server");
+ let resp = rx
+ .recv()
+ .await
+ .expect("fail to recv response from sim server");
+ let resp = resp
+ .downcast::<ReadResponse>()
+ .expect("fail to downcast response to ReadResponse");
+ let content_length = resp.data.as_ref().map(|b| b.len()).unwrap_or(0);
+ Ok((
+ RpRead::new(content_length as u64),
+ MadsimReader { data: resp.data },
+ ))
+ }
+
+ async fn write(&self, path: &str, args: OpWrite) ->
crate::Result<(RpWrite, Self::Writer)> {
+ Ok((
+ RpWrite::default(),
+ MadsimWriter {
+ path: path.to_string(),
+ args,
+ addr: self.addr,
+ },
+ ))
+ }
+
+ async fn list(&self, path: &str, args: OpList) -> crate::Result<(RpList,
Self::Pager)> {
+ Err(Error::new(
+ ErrorKind::Unsupported,
+ "will be supported in the future",
+ ))
+ }
+
+ async fn scan(&self, path: &str, args: OpScan) -> crate::Result<(RpScan,
Self::Pager)> {
+ Err(Error::new(
+ ErrorKind::Unsupported,
+ "will be supported in the future",
+ ))
+ }
+
+ fn blocking_read(
+ &self,
+ path: &str,
+ args: OpRead,
+ ) -> crate::Result<(RpRead, Self::BlockingReader)> {
+ Err(Error::new(
+ ErrorKind::Unsupported,
+ "will not be supported in MadsimLayer",
+ ))
+ }
+
+ fn blocking_write(
+ &self,
+ path: &str,
+ args: OpWrite,
+ ) -> crate::Result<(RpWrite, Self::BlockingWriter)> {
+ Err(Error::new(
+ ErrorKind::Unsupported,
+ "will not be supported in MadsimLayer",
+ ))
+ }
+
+ fn blocking_list(
+ &self,
+ path: &str,
+ args: OpList,
+ ) -> crate::Result<(RpList, Self::BlockingPager)> {
+ Err(Error::new(
+ ErrorKind::Unsupported,
+ "will not be supported in MadsimLayer",
+ ))
+ }
+
+ fn blocking_scan(
+ &self,
+ path: &str,
+ args: OpScan,
+ ) -> crate::Result<(RpScan, Self::BlockingPager)> {
+ Err(Error::new(
+ ErrorKind::Unsupported,
+ "will not be supported in MadsimLayer",
+ ))
+ }
+}
+
+pub struct MadsimReader {
+ data: Option<Bytes>,
+}
+
+impl oio::Read for MadsimReader {
+ fn poll_read(&mut self, _cx: &mut Context<'_>, buf: &mut [u8]) ->
Poll<crate::Result<usize>> {
+ if let Some(ref data) = self.data {
+ let len = data.len();
+ buf[..len].copy_from_slice(data);
+ Poll::Ready(Ok(len))
+ } else {
+ Poll::Ready(Ok(0))
+ }
+ }
+
+ fn poll_seek(&mut self, cx: &mut Context<'_>, pos: SeekFrom) ->
Poll<crate::Result<u64>> {
+ Poll::Ready(Err(Error::new(
+ ErrorKind::Unsupported,
+ "will be supported in the future",
+ )))
+ }
+
+ fn poll_next(&mut self, cx: &mut Context<'_>) ->
Poll<Option<crate::Result<Bytes>>> {
+ Poll::Ready(Some(Err(Error::new(
+ ErrorKind::Unsupported,
+ "will be supported in the future",
+ ))))
+ }
+}
+
+pub struct MadsimWriter {
+ path: String,
+ args: OpWrite,
+ addr: SocketAddr,
+}
+
+#[async_trait]
+impl oio::Write for MadsimWriter {
+ async fn write(&mut self, bs: Bytes) -> crate::Result<()> {
+ let req = Request::Write(self.path.to_string(), bs);
+ let ep = Endpoint::connect(self.addr).await?;
+ let (tx, mut rx) = ep.connect1(self.addr).await?;
+ tx.send(Box::new(req)).await?;
+ rx.recv().await?;
+ Ok(())
+ }
+
+ async fn append(&mut self, bs: Bytes) -> crate::Result<()> {
+ Err(Error::new(
+ ErrorKind::Unsupported,
+ "will be supported in the future",
+ ))
+ }
+
+ async fn abort(&mut self) -> crate::Result<()> {
+ Err(Error::new(
+ ErrorKind::Unsupported,
+ "will be supported in the future",
+ ))
+ }
+
+ async fn close(&mut self) -> crate::Result<()> {
+ Ok(())
+ }
+}
+
+pub struct MadsimPager {}
+
+#[async_trait]
+impl oio::Page for MadsimPager {
+ async fn next(&mut self) -> crate::Result<Option<Vec<Entry>>> {
+ Err(Error::new(
+ ErrorKind::Unsupported,
+ "will be supported in the future",
+ ))
+ }
+}
+
+impl From<std::io::Error> for Error {
+ fn from(e: std::io::Error) -> Self {
+ Error::new(ErrorKind::Unexpected, "madsim error")
+ }
+}
+
+/// A simulated server.This an experimental feature, docs are not ready yet.
+#[derive(Default, Clone)]
+pub struct MadsimServer;
+
+impl MadsimServer {
+ pub async fn serve(addr: SocketAddr) -> Result<()> {
+ let ep = Endpoint::bind(addr).await?;
+ let service = Arc::new(SimService::default());
+ loop {
+ let (tx, mut rx, _) = ep.accept1().await?;
+ let service = service.clone();
+ madsim::task::spawn(async move {
+ let request = *rx
+ .recv()
+ .await?
+ .downcast::<Request>()
+ .expect("invalid request");
+ let response = match request {
+ Request::Read(path, args) => {
+ Box::new(service.read(&path, args).await) as Payload
+ }
+ Request::Write(path, args) => {
+ Box::new(service.write(&path, args).await) as Payload
+ }
+ };
+ tx.send(response).await?;
+ Ok(()) as Result<()>
+ });
+ }
+ }
+}
+
+enum Request {
+ Read(String, OpRead),
+ Write(String, Bytes),
+}
+
+#[derive(Default)]
+pub struct SimService {
+ inner: Mutex<HashMap<String, Bytes>>,
+}
+
+impl SimService {
+ async fn read(&self, path: &str, args: OpRead) -> ReadResponse {
+ let inner = self.inner.lock().unwrap();
+ let data = inner.get(path);
+ ReadResponse {
+ data: data.cloned(),
+ }
+ }
+
+ async fn write(&self, path: &str, data: Bytes) -> WriteResponse {
+ let mut inner = self.inner.lock().unwrap();
+ inner.insert(path.to_string(), data);
+ WriteResponse {}
+ }
+}
+
+struct ReadResponse {
+ data: Option<Bytes>,
+}
+
+struct WriteResponse {}
+
+#[cfg(test)]
+mod test {
+ use super::*;
+ use crate::{services, Operator};
+ use madsim::{runtime::Handle, time::sleep};
+ use std::time::Duration;
+
+ #[madsim::test]
+ async fn test_madsim_layer() {
+ let handle = Handle::current();
+ let ip1 = "10.0.0.1".parse().unwrap();
+ let ip2 = "10.0.0.2".parse().unwrap();
+ let server_addr = "10.0.0.1:2379";
+ let server = handle.create_node().name("server").ip(ip1).build();
+ let client = handle.create_node().name("client").ip(ip2).build();
+
+ server.spawn(async move {
+ MadsimServer::serve(server_addr.parse().unwrap())
+ .await
+ .unwrap();
+ });
+ sleep(Duration::from_secs(1)).await;
+
+ let handle = client.spawn(async move {
+ let mut builder = services::Fs::default();
+ builder.root(".");
+ let op = Operator::new(builder)
+ .unwrap()
+ .layer(MadsimLayer::new(server_addr))
+ .finish();
+
+ let path = "hello.txt";
+ let data = "Hello, World!";
+ op.write(path, data).await.unwrap();
+ assert_eq!(data.as_bytes(), op.read(path).await.unwrap());
+ });
+ handle.await.unwrap();
+ }
+}
diff --git a/core/src/layers/mod.rs b/core/src/layers/mod.rs
index 956b9792..3b6b7756 100644
--- a/core/src/layers/mod.rs
+++ b/core/src/layers/mod.rs
@@ -62,3 +62,15 @@ pub(crate) use error_context::ErrorContextLayer;
mod complete;
pub(crate) use complete::CompleteLayer;
+
+#[cfg(feature = "layers-madsim")]
+#[cfg(madsim)]
+mod madsim;
+
+#[cfg(feature = "layers-madsim")]
+#[cfg(madsim)]
+pub use self::madsim::MadsimLayer;
+
+#[cfg(feature = "layers-madsim")]
+#[cfg(madsim)]
+pub use self::madsim::MadsimServer;