This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 59c1cce8 feat(layer): add madsim layer (#2006)
59c1cce8 is described below

commit 59c1cce881d4f1d9009b13b7c5a0597b3d79b418
Author: Sky Fan <[email protected]>
AuthorDate: Tue Apr 18 16:57:51 2023 +0800

    feat(layer): add madsim layer (#2006)
    
    * fix typo
    
    * ,
    
    * archive
    
    * skeleton
    
    * fmt
    
    * panic in blocking func
    
    * add license header
    
    * fmt
    
    * add doc
    
    * delete unrelated changes
    
    * refactor: madsim as a feature, delete real mod
    
    * fix conflict
    
    * delete useless modification
    
    * update madsim version
    
    * mark blocking func as not supported
    
    * fix details
    
    * rename struct
    
    * throw error, rename var, set cap
---
 core/Cargo.toml           |   4 +
 core/src/layers/madsim.rs | 419 ++++++++++++++++++++++++++++++++++++++++++++++
 core/src/layers/mod.rs    |  12 ++
 3 files changed, 435 insertions(+)

diff --git a/core/Cargo.toml b/core/Cargo.toml
index f8b3ec78..cb1590d5 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -74,6 +74,7 @@ layers-all = [
   "layers-prometheus",
   "layers-tracing",
   "layers-minitrace",
+  "layers-madsim"
 ]
 # Enable layers chaos support
 layers-chaos = ["dep:rand"]
@@ -81,6 +82,8 @@ layers-chaos = ["dep:rand"]
 layers-metrics = ["dep:metrics"]
 # Enable layers prometheus support
 layers-prometheus = ["dep:prometheus"]
+# Enable layers madsim support
+layers-madsim = ["dep:madsim"]
 # Enable layers minitrace support.
 layers-minitrace = ["dep:minitrace"]
 # Enable layers tracing support.
@@ -163,6 +166,7 @@ http = "0.2.5"
 hyper = "0.14"
 lazy-regex = { version = "2.5.0", optional = true }
 log = "0.4"
+madsim = {version = "0.2.21", optional = true }
 md-5 = "0.10"
 metrics = { version = "0.20", optional = true }
 minitrace = { version = "0.4.0", optional = true }
diff --git a/core/src/layers/madsim.rs b/core/src/layers/madsim.rs
new file mode 100644
index 00000000..b76ebc94
--- /dev/null
+++ b/core/src/layers/madsim.rs
@@ -0,0 +1,419 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::ops::{OpList, OpRead, OpScan, OpWrite};
+use crate::raw::oio::Entry;
+use crate::raw::AccessorCapability;
+use crate::raw::AccessorInfo;
+use crate::raw::{oio, Accessor, Layer, LayeredAccessor, RpList, RpRead, 
RpScan, RpWrite};
+use crate::types::Error;
+use crate::types::ErrorKind;
+use async_trait::async_trait;
+use bytes::Bytes;
+use madsim::net::Endpoint;
+use madsim::net::Payload;
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::io::Result;
+use std::io::SeekFrom;
+use std::net::SocketAddr;
+use std::sync::Arc;
+use std::sync::Mutex;
+use std::task::{Context, Poll};
+
+/// Add deterministic simulation for async operations, powered by [`madsim`](https://docs.rs/madsim/latest/madsim/).
+///
+/// # Note
+///
+/// - Blocking operations are not supported, as [`madsim`](https://docs.rs/madsim/latest/madsim/) is async only.
+/// - The inner accessor is discarded: read and write requests go to the
+///   [`MadsimServer`] listening on the configured address instead.
+///
+/// # Examples
+///
+/// ```
+/// use opendal::Operator;
+/// use opendal::services;
+/// use opendal::layers::MadsimLayer;
+/// use opendal::layers::MadsimServer;
+/// use madsim::{runtime::Handle, time::sleep};
+/// use std::time::Duration;
+///
+/// #[cfg(madsim)]
+/// #[madsim::test]
+/// async fn deterministic_simulation_test() {
+///     let handle = Handle::current();
+///     let ip1 = "10.0.0.1".parse().unwrap();
+///     let ip2 = "10.0.0.2".parse().unwrap();
+///     let server_addr = "10.0.0.1:2379";
+///     let server = handle.create_node().name("server").ip(ip1).build();
+///     let client = handle.create_node().name("client").ip(ip2).build();
+///
+///     server.spawn(async move {
+///         MadsimServer::serve(server_addr.parse().unwrap())
+///             .await
+///             .unwrap();
+///     });
+///     sleep(Duration::from_secs(1)).await;
+///
+///     let handle = client.spawn(async move {
+///         let mut builder = services::Fs::default();
+///         builder.root(".");
+///         let op = Operator::new(builder)
+///             .unwrap()
+///             .layer(MadsimLayer::new(server_addr))
+///             .finish();
+///
+///         let path = "hello.txt";
+///         let data = "Hello, World!";
+///         op.write(path, data).await.unwrap();
+///         assert_eq!(data.as_bytes(), op.read(path).await.unwrap());
+///     });
+///     handle.await.unwrap();
+/// }
+/// ```
+/// This layer is only compiled when the `layers-madsim` feature is enabled and
+/// the `madsim` cfg is set, so pass `RUSTFLAGS="--cfg madsim"` when building or testing:
+/// ```shell
+/// RUSTFLAGS="--cfg madsim" cargo test
+/// ```
+#[derive(Debug, Copy, Clone)]
+pub struct MadsimLayer {
+    /// Address of the sim server that the produced accessor connects to.
+    addr: SocketAddr,
+}
+
+impl MadsimLayer {
+    /// Create a layer that talks to the sim server at `endpoint`.
+    ///
+    /// `endpoint` is parsed as a [`SocketAddr`], e.g. `"10.0.0.1:2379"`.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `endpoint` cannot be parsed as a socket address.
+    pub fn new(endpoint: &str) -> Self {
+        Self {
+            addr: endpoint.parse().unwrap(),
+        }
+    }
+}
+
+impl<A: Accessor> Layer<A> for MadsimLayer {
+    type LayeredAccessor = MadsimAccessor;
+
+    /// Build the simulated accessor. The inner accessor is dropped unused;
+    /// only the server address is carried over.
+    fn layer(&self, _: A) -> Self::LayeredAccessor {
+        MadsimAccessor { addr: self.addr }
+    }
+}
+
+/// Accessor produced by `MadsimLayer`.
+///
+/// Read and write requests are sent to the sim server at `addr`; list, scan
+/// and every blocking operation return `ErrorKind::Unsupported`.
+#[derive(Debug)]
+pub struct MadsimAccessor {
+    /// Address of the sim server every request is sent to.
+    addr: SocketAddr,
+}
+
+#[async_trait]
+impl LayeredAccessor for MadsimAccessor {
+    type Inner = ();
+    type Reader = MadsimReader;
+    type BlockingReader = ();
+    type Writer = MadsimWriter;
+    type BlockingWriter = ();
+    type Pager = MadsimPager;
+    type BlockingPager = ();
+
+    /// There is no wrapped accessor; the unit value stands in for it.
+    fn inner(&self) -> &Self::Inner {
+        &()
+    }
+
+    /// Report the accessor as "madsim" with only read and write capabilities.
+    fn metadata(&self) -> AccessorInfo {
+        let mut info = AccessorInfo::default();
+        info.set_name("madsim");
+        info.set_capabilities(AccessorCapability::Read | AccessorCapability::Write);
+        info
+    }
+
+    /// Ask the sim server for the content stored at `path`.
+    ///
+    /// A path the server has no entry for yields a reader with no data and a
+    /// content length of 0.
+    ///
+    /// # Errors
+    ///
+    /// Returns `ErrorKind::Unexpected` when connecting, sending or receiving
+    /// fails, or when the reply is not a `ReadResponse`. These used to panic,
+    /// which was inconsistent with the writer, which propagates its errors.
+    async fn read(&self, path: &str, args: OpRead) -> crate::Result<(RpRead, Self::Reader)> {
+        let req = Request::Read(path.to_string(), args);
+        let ep = Endpoint::connect(self.addr)
+            .await
+            .map_err(|e| sim_error("fail to connect to sim server", e))?;
+        let (tx, mut rx) = ep
+            .connect1(self.addr)
+            .await
+            .map_err(|e| sim_error("fail to connect1 to sim server", e))?;
+        tx.send(Box::new(req))
+            .await
+            .map_err(|e| sim_error("fail to send request to sim server", e))?;
+        let resp = rx
+            .recv()
+            .await
+            .map_err(|e| sim_error("fail to recv response from sim server", e))?;
+        let resp = resp.downcast::<ReadResponse>().map_err(|_| {
+            Error::new(
+                ErrorKind::Unexpected,
+                "fail to downcast response to ReadResponse",
+            )
+        })?;
+        let content_length = resp.data.as_ref().map_or(0, |b| b.len());
+        Ok((
+            RpRead::new(content_length as u64),
+            MadsimReader { data: resp.data },
+        ))
+    }
+
+    /// Hand out a writer for `path`; nothing is sent to the server until the
+    /// writer's `write` is called.
+    async fn write(&self, path: &str, args: OpWrite) -> crate::Result<(RpWrite, Self::Writer)> {
+        Ok((
+            RpWrite::default(),
+            MadsimWriter {
+                path: path.to_string(),
+                args,
+                addr: self.addr,
+            },
+        ))
+    }
+
+    /// Not implemented yet; always returns `ErrorKind::Unsupported`.
+    async fn list(&self, _path: &str, _args: OpList) -> crate::Result<(RpList, Self::Pager)> {
+        Err(Error::new(
+            ErrorKind::Unsupported,
+            "will be supported in the future",
+        ))
+    }
+
+    /// Not implemented yet; always returns `ErrorKind::Unsupported`.
+    async fn scan(&self, _path: &str, _args: OpScan) -> crate::Result<(RpScan, Self::Pager)> {
+        Err(Error::new(
+            ErrorKind::Unsupported,
+            "will be supported in the future",
+        ))
+    }
+
+    /// Blocking operations are not supported by this layer.
+    fn blocking_read(
+        &self,
+        _path: &str,
+        _args: OpRead,
+    ) -> crate::Result<(RpRead, Self::BlockingReader)> {
+        Err(Error::new(
+            ErrorKind::Unsupported,
+            "will not be supported in MadsimLayer",
+        ))
+    }
+
+    /// Blocking operations are not supported by this layer.
+    fn blocking_write(
+        &self,
+        _path: &str,
+        _args: OpWrite,
+    ) -> crate::Result<(RpWrite, Self::BlockingWriter)> {
+        Err(Error::new(
+            ErrorKind::Unsupported,
+            "will not be supported in MadsimLayer",
+        ))
+    }
+
+    /// Blocking operations are not supported by this layer.
+    fn blocking_list(
+        &self,
+        _path: &str,
+        _args: OpList,
+    ) -> crate::Result<(RpList, Self::BlockingPager)> {
+        Err(Error::new(
+            ErrorKind::Unsupported,
+            "will not be supported in MadsimLayer",
+        ))
+    }
+
+    /// Blocking operations are not supported by this layer.
+    fn blocking_scan(
+        &self,
+        _path: &str,
+        _args: OpScan,
+    ) -> crate::Result<(RpScan, Self::BlockingPager)> {
+        Err(Error::new(
+            ErrorKind::Unsupported,
+            "will not be supported in MadsimLayer",
+        ))
+    }
+}
+
+/// Wrap an I/O failure from the simulated network in an `Unexpected` error,
+/// keeping the original error as its source.
+fn sim_error(message: &str, source: std::io::Error) -> Error {
+    Error::new(ErrorKind::Unexpected, message).set_source(source)
+}
+
+/// Reader returned by `MadsimAccessor::read`; serves the bytes the sim server replied with.
+pub struct MadsimReader {
+    /// Bytes not yet handed to the caller; `None` when the server had no entry for the path.
+    data: Option<Bytes>,
+}
+
+impl oio::Read for MadsimReader {
+    /// Copy as many of the remaining bytes as fit into `buf` and consume them.
+    ///
+    /// Returns `Ok(0)` once everything has been read (or when there was no
+    /// data). The previous version copied the whole payload on every call,
+    /// which panicked when `buf` was shorter than the data and never
+    /// signalled end-of-file.
+    fn poll_read(&mut self, _cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<crate::Result<usize>> {
+        match self.data.as_mut() {
+            Some(data) => {
+                let len = data.len().min(buf.len());
+                // `split_to` removes the first `len` bytes from `data`, so the
+                // next call resumes where this one stopped.
+                buf[..len].copy_from_slice(&data.split_to(len));
+                Poll::Ready(Ok(len))
+            }
+            None => Poll::Ready(Ok(0)),
+        }
+    }
+
+    /// Seeking is not implemented yet; always returns `ErrorKind::Unsupported`.
+    fn poll_seek(&mut self, _cx: &mut Context<'_>, _pos: SeekFrom) -> Poll<crate::Result<u64>> {
+        Poll::Ready(Err(Error::new(
+            ErrorKind::Unsupported,
+            "will be supported in the future",
+        )))
+    }
+
+    /// Streaming is not implemented yet; always yields an `ErrorKind::Unsupported` error.
+    fn poll_next(&mut self, _cx: &mut Context<'_>) -> Poll<Option<crate::Result<Bytes>>> {
+        Poll::Ready(Some(Err(Error::new(
+            ErrorKind::Unsupported,
+            "will be supported in the future",
+        ))))
+    }
+}
+
+/// Writer returned by `MadsimAccessor::write`; each `write` call ships its bytes to the sim server.
+pub struct MadsimWriter {
+    /// Path the written bytes are stored under on the server.
+    path: String,
+    /// Write arguments supplied by the caller; they are not part of the request sent to the server.
+    args: OpWrite,
+    /// Address of the sim server.
+    addr: SocketAddr,
+}
+
+#[async_trait]
+impl oio::Write for MadsimWriter {
+    /// Open a connection to the sim server, send `bs` as a write request for
+    /// `self.path`, and wait for the server's reply before returning.
+    async fn write(&mut self, bs: Bytes) -> crate::Result<()> {
+        let endpoint = Endpoint::connect(self.addr).await?;
+        let (sender, mut receiver) = endpoint.connect1(self.addr).await?;
+        let request = Request::Write(self.path.clone(), bs);
+        sender.send(Box::new(request)).await?;
+        // The reply carries no information; receiving it only confirms the
+        // server handled the request.
+        receiver.recv().await?;
+        Ok(())
+    }
+
+    /// Appending is not implemented yet; always returns `ErrorKind::Unsupported`.
+    async fn append(&mut self, bs: Bytes) -> crate::Result<()> {
+        Err(Error::new(
+            ErrorKind::Unsupported,
+            "will be supported in the future",
+        ))
+    }
+
+    /// Aborting is not implemented yet; always returns `ErrorKind::Unsupported`.
+    async fn abort(&mut self) -> crate::Result<()> {
+        Err(Error::new(
+            ErrorKind::Unsupported,
+            "will be supported in the future",
+        ))
+    }
+
+    /// Nothing is buffered, so closing is a no-op.
+    async fn close(&mut self) -> crate::Result<()> {
+        Ok(())
+    }
+}
+
+/// Placeholder pager type for `MadsimAccessor`, whose `list` and `scan`
+/// currently return `ErrorKind::Unsupported` without creating one.
+pub struct MadsimPager {}
+
+#[async_trait]
+impl oio::Page for MadsimPager {
+    /// Paging is not implemented yet; always returns `ErrorKind::Unsupported`.
+    async fn next(&mut self) -> crate::Result<Option<Vec<Entry>>> {
+        Err(Error::new(
+            ErrorKind::Unsupported,
+            "will be supported in the future",
+        ))
+    }
+}
+
+impl From<std::io::Error> for Error {
+    fn from(e: std::io::Error) -> Self {
+        Error::new(ErrorKind::Unexpected, "madsim error")
+    }
+}
+
+/// A simulated server. This is an experimental feature; docs are not ready yet.
+#[derive(Default, Clone)]
+pub struct MadsimServer;
+
+impl MadsimServer {
+    /// Bind an endpoint at `addr` and serve read/write requests from an
+    /// in-memory `SimService` shared by all connections.
+    ///
+    /// The loop only ends when binding or accepting a connection fails, in
+    /// which case that I/O error is returned (`Result` here is `std::io::Result`).
+    ///
+    /// # Panics
+    ///
+    /// A connection task panics if the received payload is not a `Request`.
+    pub async fn serve(addr: SocketAddr) -> Result<()> {
+        let ep = Endpoint::bind(addr).await?;
+        let service = Arc::new(SimService::default());
+        loop {
+            let (tx, mut rx, _) = ep.accept1().await?;
+            let service = service.clone();
+            // Each accepted connection gets its own task, which handles
+            // exactly one request and sends back one response.
+            madsim::task::spawn(async move {
+                let request = *rx
+                    .recv()
+                    .await?
+                    .downcast::<Request>()
+                    .expect("invalid request");
+                let response = match request {
+                    Request::Read(path, args) => {
+                        Box::new(service.read(&path, args).await) as Payload
+                    }
+                    Request::Write(path, args) => {
+                        Box::new(service.write(&path, args).await) as Payload
+                    }
+                };
+                tx.send(response).await?;
+                // The task's result is not observed: the spawn handle is dropped.
+                Ok(()) as Result<()>
+            });
+        }
+    }
+}
+
+/// Message sent from the accessor side to `MadsimServer`, boxed as a madsim payload.
+enum Request {
+    /// Read the bytes stored at the given path. The `OpRead` arguments travel
+    /// along but `SimService::read` does not use them.
+    Read(String, OpRead),
+    /// Store the given bytes at the given path.
+    Write(String, Bytes),
+}
+
+/// In-memory store behind `MadsimServer`: maps a path to the bytes last written to it.
+#[derive(Default)]
+pub struct SimService {
+    inner: Mutex<HashMap<String, Bytes>>,
+}
+
+impl SimService {
+    /// Look up `path` and return a copy of its bytes, or `None` in the
+    /// response when nothing was written there. `args` is not used.
+    async fn read(&self, path: &str, args: OpRead) -> ReadResponse {
+        let data = self.inner.lock().unwrap().get(path).cloned();
+        ReadResponse { data }
+    }
+
+    /// Store `data` under `path`, replacing any previous value.
+    async fn write(&self, path: &str, data: Bytes) -> WriteResponse {
+        self.inner.lock().unwrap().insert(path.to_string(), data);
+        WriteResponse {}
+    }
+}
+
+/// Reply to `Request::Read`.
+struct ReadResponse {
+    /// Stored bytes for the requested path, or `None` if the path has no entry.
+    data: Option<Bytes>,
+}
+
+/// Reply to `Request::Write`; carries no data and only acknowledges the request.
+struct WriteResponse {}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use crate::{services, Operator};
+    use madsim::{runtime::Handle, time::sleep};
+    use std::time::Duration;
+
+    /// Round-trip test: a server node runs `MadsimServer`, a client node
+    /// writes a file through an operator layered with `MadsimLayer` and reads
+    /// the same bytes back.
+    #[madsim::test]
+    async fn test_madsim_layer() {
+        let handle = Handle::current();
+        let ip1 = "10.0.0.1".parse().unwrap();
+        let ip2 = "10.0.0.2".parse().unwrap();
+        let server_addr = "10.0.0.1:2379";
+        let server = handle.create_node().name("server").ip(ip1).build();
+        let client = handle.create_node().name("client").ip(ip2).build();
+
+        server.spawn(async move {
+            MadsimServer::serve(server_addr.parse().unwrap())
+                .await
+                .unwrap();
+        });
+        // Presumably gives the server time to bind before the client connects.
+        sleep(Duration::from_secs(1)).await;
+
+        let handle = client.spawn(async move {
+            // The Fs builder only satisfies `Operator::new`; `MadsimLayer`
+            // drops the inner accessor it is given.
+            let mut builder = services::Fs::default();
+            builder.root(".");
+            let op = Operator::new(builder)
+                .unwrap()
+                .layer(MadsimLayer::new(server_addr))
+                .finish();
+
+            let path = "hello.txt";
+            let data = "Hello, World!";
+            op.write(path, data).await.unwrap();
+            assert_eq!(data.as_bytes(), op.read(path).await.unwrap());
+        });
+        handle.await.unwrap();
+    }
+}
diff --git a/core/src/layers/mod.rs b/core/src/layers/mod.rs
index 956b9792..3b6b7756 100644
--- a/core/src/layers/mod.rs
+++ b/core/src/layers/mod.rs
@@ -62,3 +62,15 @@ pub(crate) use error_context::ErrorContextLayer;
 
 mod complete;
 pub(crate) use complete::CompleteLayer;
+
+#[cfg(feature = "layers-madsim")]
+#[cfg(madsim)]
+mod madsim;
+
+#[cfg(feature = "layers-madsim")]
+#[cfg(madsim)]
+pub use self::madsim::MadsimLayer;
+
+#[cfg(feature = "layers-madsim")]
+#[cfg(madsim)]
+pub use self::madsim::MadsimServer;

Reply via email to