This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch python-api
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit 43506af3afc5b37583ddd4d72077b73eb0a60a0b
Author: Xuanwo <[email protected]>
AuthorDate: Fri Nov 3 18:23:36 2023 +0800

    feat(bindings/python): Implement File and AsyncFile to replace Reader
    
    Signed-off-by: Xuanwo <[email protected]>
---
 bindings/python/python/opendal/__init__.pyi |  16 +-
 bindings/python/src/{reader.rs => file.rs}  | 291 ++++++++++++++++++----------
 bindings/python/src/lib.rs                  |  10 +-
 bindings/python/src/operator.rs             |  71 ++++---
 bindings/python/src/utils.rs                |  30 ++-
 bindings/python/tests/test_read.py          |  43 ++++
 6 files changed, 316 insertions(+), 145 deletions(-)

diff --git a/bindings/python/python/opendal/__init__.pyi b/bindings/python/python/opendal/__init__.pyi
index 2c33057e3..65ae74a21 100644
--- a/bindings/python/python/opendal/__init__.pyi
+++ b/bindings/python/python/opendal/__init__.pyi
@@ -22,8 +22,8 @@ class Error(Exception): ...
 class Operator:
     def __init__(self, scheme: str, **kwargs): ...
     def layer(self, layer: Layer): ...
+    def open(self, path: str, mode: str) -> File: ...
     def read(self, path: str) -> memoryview: ...
-    def open_reader(self, path: str) -> Reader: ...
     def write(
         self,
         path: str,
@@ -47,8 +47,8 @@ class Operator:
 class AsyncOperator:
     def __init__(self, scheme: str, **kwargs): ...
     def layer(self, layer: Layer): ...
+    async def open(self, path: str, mode: str) -> AsyncFile: ...
     async def read(self, path: str) -> memoryview: ...
-    def open_reader(self, path: str) -> AsyncReader: ...
     async def write(
         self,
         path: str,
@@ -74,18 +74,22 @@ class AsyncOperator:
     async def rename(self, source: str, target: str): ...
     async def remove_all(self, path: str): ...
 
-class Reader:
+class File:
     def read(self, size: Optional[int] = None) -> memoryview: ...
+    def write(self, bs: bytes): ...
     def seek(self, offset: int, whence: int = 0) -> int: ...
     def tell(self) -> int: ...
-    def __enter__(self) -> Reader: ...
+    def close(self): ...
+    def __enter__(self) -> File: ...
     def __exit__(self, exc_type, exc_value, traceback) -> None: ...
 
-class AsyncReader:
+class AsyncFile:
     async def read(self, size: Optional[int] = None) -> memoryview: ...
+    async def write(self, bs: bytes): ...
     async def seek(self, offset: int, whence: int = 0) -> int: ...
     async def tell(self) -> int: ...
-    def __aenter__(self) -> AsyncReader: ...
+    async def close(self): ...
+    def __aenter__(self) -> AsyncFile: ...
     def __aexit__(self, exc_type, exc_value, traceback) -> None: ...
 
 class Entry:
diff --git a/bindings/python/src/reader.rs b/bindings/python/src/file.rs
similarity index 50%
rename from bindings/python/src/reader.rs
rename to bindings/python/src/file.rs
index 584f21932..8db6452b3 100644
--- a/bindings/python/src/reader.rs
+++ b/bindings/python/src/file.rs
@@ -15,49 +15,62 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::io::Read;
 use std::io::Seek;
 use std::io::SeekFrom;
+use std::io::{Read, Write};
+use std::ops::DerefMut;
 use std::sync::Arc;
 
-use futures::AsyncReadExt;
 use futures::AsyncSeekExt;
+use futures::{AsyncReadExt, AsyncWriteExt};
 use pyo3::exceptions::PyIOError;
-use pyo3::exceptions::PyNotImplementedError;
 use pyo3::exceptions::PyValueError;
-use pyo3::ffi;
 use pyo3::prelude::*;
-use pyo3::AsPyPointer;
 use pyo3_asyncio::tokio::future_into_py;
 use tokio::sync::Mutex;
 
 use crate::*;
 
-/// A file-like blocking reader.
+/// A file-like object.
 /// Can be used as a context manager.
 #[pyclass(module = "opendal")]
-pub struct Reader(Option<ocore::BlockingReader>);
+pub struct File(FileState);
 
-impl Reader {
-    pub fn new(reader: ocore::BlockingReader) -> Self {
-        Self(Some(reader))
+enum FileState {
+    Reader(ocore::BlockingReader),
+    Writer(ocore::BlockingWriter),
+    Closed,
+}
+
+impl File {
+    pub fn new_reader(reader: ocore::BlockingReader) -> Self {
+        Self(FileState::Reader(reader))
     }
 
-    fn as_mut(&mut self) -> PyResult<&mut ocore::BlockingReader> {
-        let reader = self
-            .0
-            .as_mut()
-            .ok_or_else(|| PyValueError::new_err("I/O operation on closed file."))?;
-        Ok(reader)
+    pub fn new_writer(writer: ocore::BlockingWriter) -> Self {
+        Self(FileState::Writer(writer))
     }
 }
 
 #[pymethods]
-impl Reader {
+impl File {
     /// Read and return size bytes, or if size is not given, until EOF.
     #[pyo3(signature = (size=None,))]
     pub fn read<'p>(&'p mut self, py: Python<'p>, size: Option<usize>) -> PyResult<&'p PyAny> {
-        let reader = self.as_mut()?;
+        let reader = match &mut self.0 {
+            FileState::Reader(r) => r,
+            FileState::Writer(_) => {
+                return Err(PyIOError::new_err(
+                    "I/O operation failed for reading on write only file.",
+                ));
+            }
+            FileState::Closed => {
+                return Err(PyIOError::new_err(
+                    "I/O operation failed for reading on closed file.",
+                ));
+            }
+        };
+
         let buffer = match size {
             Some(size) => {
                 let mut buffer = vec![0; size];
@@ -74,18 +87,29 @@ impl Reader {
                 buffer
             }
         };
-        let buffer = Buffer::from(buffer).into_py(py);
-        let memoryview =
-            unsafe { py.from_owned_ptr_or_err(ffi::PyMemoryView_FromObject(buffer.as_ptr()))? };
-        Ok(memoryview)
+
+        Buffer::new(buffer).into_memory_view_ref(py)
     }
 
-    /// `Reader` doesn't support write.
-    /// Raises a `NotImplementedError` if called.
-    pub fn write(&mut self, _bs: &[u8]) -> PyResult<()> {
-        Err(PyNotImplementedError::new_err(
-            "Reader does not support write",
-        ))
+    /// Write bytes into the file.
+    pub fn write(&mut self, bs: &[u8]) -> PyResult<()> {
+        let writer = match &mut self.0 {
+            FileState::Reader(_) => {
+                return Err(PyIOError::new_err(
+                    "I/O operation failed for reading on read only file.",
+                ));
+            }
+            FileState::Writer(w) => w,
+            FileState::Closed => {
+                return Err(PyIOError::new_err(
+                    "I/O operation failed for reading on closed file.",
+                ));
+            }
+        };
+
+        writer
+            .write_all(bs)
+            .map_err(|err| PyIOError::new_err(err.to_string()))
     }
 
     /// Change the stream position to the given byte offset.
@@ -99,13 +123,27 @@ impl Reader {
     /// Return the new absolute position.
     #[pyo3(signature = (pos, whence = 0))]
     pub fn seek(&mut self, pos: i64, whence: u8) -> PyResult<u64> {
+        let reader = match &mut self.0 {
+            FileState::Reader(r) => r,
+            FileState::Writer(_) => {
+                return Err(PyIOError::new_err(
+                    "I/O operation failed for reading on write only file.",
+                ));
+            }
+            FileState::Closed => {
+                return Err(PyIOError::new_err(
+                    "I/O operation failed for reading on closed file.",
+                ));
+            }
+        };
+
         let whence = match whence {
             0 => SeekFrom::Start(pos as u64),
             1 => SeekFrom::Current(pos),
             2 => SeekFrom::End(pos),
             _ => return Err(PyValueError::new_err("invalid whence")),
         };
-        let reader = self.as_mut()?;
+
         reader
             .seek(whence)
             .map_err(|err| PyIOError::new_err(err.to_string()))
@@ -113,74 +151,82 @@ impl Reader {
 
     /// Return the current stream position.
     pub fn tell(&mut self) -> PyResult<u64> {
-        let reader = self.as_mut()?;
+        let reader = match &mut self.0 {
+            FileState::Reader(r) => r,
+            FileState::Writer(_) => {
+                return Err(PyIOError::new_err(
+                    "I/O operation failed for reading on write only file.",
+                ));
+            }
+            FileState::Closed => {
+                return Err(PyIOError::new_err(
+                    "I/O operation failed for reading on closed file.",
+                ));
+            }
+        };
+
         reader
             .stream_position()
             .map_err(|err| PyIOError::new_err(err.to_string()))
     }
 
+    fn close(&mut self) -> PyResult<()> {
+        self.0 = FileState::Closed;
+        Ok(())
+    }
+
     pub fn __enter__(slf: Py<Self>) -> Py<Self> {
         slf
     }
 
     pub fn __exit__(&mut self, _exc_type: PyObject, _exc_value: PyObject, _traceback: PyObject) {
-        drop(self.0.take());
+        self.0 = FileState::Closed;
     }
 }
 
-enum ReaderState {
-    Init {
-        operator: ocore::Operator,
-        path: String,
-    },
-    Open(ocore::Reader),
+/// A file-like async reader.
+/// Can be used as an async context manager.
+#[pyclass(module = "opendal")]
+pub struct AsyncFile(Arc<Mutex<AsyncFileState>>);
+
+enum AsyncFileState {
+    Reader(ocore::Reader),
+    Writer(ocore::Writer),
     Closed,
 }
 
-impl ReaderState {
-    async fn reader(&mut self) -> PyResult<&mut ocore::Reader> {
-        let reader = match self {
-            ReaderState::Init { operator, path } => {
-                let reader = operator.reader(path).await.map_err(format_pyerr)?;
-                *self = ReaderState::Open(reader);
-                if let ReaderState::Open(ref mut reader) = self {
-                    reader
-                } else {
-                    unreachable!()
-                }
-            }
-            ReaderState::Open(ref mut reader) => reader,
-            ReaderState::Closed => {
-                return Err(PyValueError::new_err("I/O operation on closed file."));
-            }
-        };
-        Ok(reader)
+impl AsyncFile {
+    pub fn new_reader(reader: ocore::Reader) -> Self {
+        Self(Arc::new(Mutex::new(AsyncFileState::Reader(reader))))
     }
 
-    fn close(&mut self) {
-        *self = ReaderState::Closed;
-    }
-}
-
-/// A file-like async reader.
-/// Can be used as an async context manager.
-#[pyclass(module = "opendal")]
-pub struct AsyncReader(Arc<Mutex<ReaderState>>);
-
-impl AsyncReader {
-    pub fn new(operator: ocore::Operator, path: String) -> Self {
-        Self(Arc::new(Mutex::new(ReaderState::Init { operator, path })))
+    pub fn new_writer(writer: ocore::Writer) -> Self {
+        Self(Arc::new(Mutex::new(AsyncFileState::Writer(writer))))
     }
 }
 
 #[pymethods]
-impl AsyncReader {
+impl AsyncFile {
     /// Read and return size bytes, or if size is not given, until EOF.
     pub fn read<'p>(&'p self, py: Python<'p>, size: Option<usize>) -> PyResult<&'p PyAny> {
-        let reader = self.0.clone();
+        let state = self.0.clone();
+
         future_into_py(py, async move {
-            let mut state = reader.lock().await;
-            let reader = state.reader().await?;
+            let mut guard = state.lock().await;
+            let reader = match guard.deref_mut() {
+                AsyncFileState::Reader(r) => r,
+                AsyncFileState::Writer(_) => {
+                    return Err(PyIOError::new_err(
+                        "I/O operation failed for reading on write only file.",
+                    ));
+                }
+                _ => {
+                    return Err(PyIOError::new_err(
+                        "I/O operation failed for reading on closed file.",
+                    ));
+                }
+            };
+
             let buffer = match size {
                 Some(size) => {
                     let mut buffer = vec![0; size];
@@ -199,25 +245,38 @@ impl AsyncReader {
                     buffer
                 }
             };
-            Python::with_gil(|py| {
-                let buffer = Buffer::from(buffer).into_py(py);
-                unsafe {
-                    PyObject::from_owned_ptr_or_err(
-                        py,
-                        ffi::PyMemoryView_FromObject(buffer.as_ptr()),
-                    )
-                }
-            })
+
+            Python::with_gil(|py| Buffer::new(buffer).into_memory_view(py))
         })
     }
 
-    /// `AsyncReader` doesn't support write.
-    /// Raises a `NotImplementedError` if called.
-    pub fn write<'p>(&'p mut self, py: Python<'p>, _bs: &'p [u8]) -> PyResult<&'p PyAny> {
-        future_into_py::<_, PyObject>(py, async move {
-            Err(PyNotImplementedError::new_err(
-                "AsyncReader does not support write",
-            ))
+    /// Write bytes into the file.
+    pub fn write<'p>(&'p mut self, py: Python<'p>, bs: &'p [u8]) -> PyResult<&'p PyAny> {
+        let state = self.0.clone();
+
+        // FIXME: can we avoid this clone?
+        let bs = bs.to_vec();
+
+        future_into_py(py, async move {
+            let mut guard = state.lock().await;
+            let writer = match guard.deref_mut() {
+                AsyncFileState::Reader(_) => {
+                    return Err(PyIOError::new_err(
+                        "I/O operation failed for reading on read only file.",
+                    ));
+                }
+                AsyncFileState::Writer(w) => w,
+                _ => {
+                    return Err(PyIOError::new_err(
+                        "I/O operation failed for reading on closed file.",
+                    ));
+                }
+            };
+
+            writer
+                .write_all(&bs)
+                .await
+                .map_err(|err| PyIOError::new_err(err.to_string()))
         })
     }
 
@@ -232,16 +291,31 @@ impl AsyncReader {
     /// Return the new absolute position.
     #[pyo3(signature = (pos, whence = 0))]
     pub fn seek<'p>(&'p mut self, py: Python<'p>, pos: i64, whence: u8) -> PyResult<&'p PyAny> {
+        let state = self.0.clone();
+
         let whence = match whence {
             0 => SeekFrom::Start(pos as u64),
             1 => SeekFrom::Current(pos),
             2 => SeekFrom::End(pos),
             _ => return Err(PyValueError::new_err("invalid whence")),
         };
-        let reader = self.0.clone();
+
         future_into_py(py, async move {
-            let mut state = reader.lock().await;
-            let reader = state.reader().await?;
+            let mut guard = state.lock().await;
+            let reader = match guard.deref_mut() {
+                AsyncFileState::Reader(r) => r,
+                AsyncFileState::Writer(_) => {
+                    return Err(PyIOError::new_err(
+                        "I/O operation failed for reading on write only file.",
+                    ));
+                }
+                _ => {
+                    return Err(PyIOError::new_err(
+                        "I/O operation failed for reading on closed file.",
+                    ));
+                }
+            };
+
             let ret = reader
                 .seek(whence)
                 .await
@@ -252,10 +326,24 @@ impl AsyncReader {
 
     /// Return the current stream position.
     pub fn tell<'p>(&'p mut self, py: Python<'p>) -> PyResult<&'p PyAny> {
-        let reader = self.0.clone();
+        let state = self.0.clone();
+
         future_into_py(py, async move {
-            let mut state = reader.lock().await;
-            let reader = state.reader().await?;
+            let mut guard = state.lock().await;
+            let reader = match guard.deref_mut() {
+                AsyncFileState::Reader(r) => r,
+                AsyncFileState::Writer(_) => {
+                    return Err(PyIOError::new_err(
+                        "I/O operation failed for reading on write only file.",
+                    ));
+                }
+                _ => {
+                    return Err(PyIOError::new_err(
+                        "I/O operation failed for reading on closed file.",
+                    ));
+                }
+            };
+
             let pos = reader
                 .stream_position()
                 .await
@@ -264,6 +352,15 @@ impl AsyncReader {
         })
     }
 
+    fn close<'p>(&'p mut self, py: Python<'p>) -> PyResult<&'p PyAny> {
+        let state = self.0.clone();
+        future_into_py(py, async move {
+            let mut state = state.lock().await;
+            *state = AsyncFileState::Closed;
+            Ok(())
+        })
+    }
+
     fn __aenter__<'a>(slf: PyRef<'a, Self>, py: Python<'a>) -> PyResult<&'a PyAny> {
         let slf = slf.into_py(py);
         future_into_py(py, async move { Ok(slf) })
@@ -276,10 +373,10 @@ impl AsyncReader {
         _exc_value: &'a PyAny,
         _traceback: &'a PyAny,
     ) -> PyResult<&'a PyAny> {
-        let reader = self.0.clone();
+        let state = self.0.clone();
         future_into_py(py, async move {
-            let mut state = reader.lock().await;
-            state.close();
+            let mut state = state.lock().await;
+            *state = AsyncFileState::Closed;
             Ok(())
         })
     }
diff --git a/bindings/python/src/lib.rs b/bindings/python/src/lib.rs
index 1903cf424..a9c41fc00 100644
--- a/bindings/python/src/lib.rs
+++ b/bindings/python/src/lib.rs
@@ -33,8 +33,8 @@ mod metadata;
 pub use metadata::*;
 mod operator;
 pub use operator::*;
-mod reader;
-pub use reader::*;
+mod file;
+pub use file::*;
 mod utils;
 pub use utils::*;
 
@@ -72,9 +72,11 @@ pub use utils::*;
 #[pymodule]
 fn _opendal(py: Python, m: &PyModule) -> PyResult<()> {
     m.add_class::<Operator>()?;
-    m.add_class::<Reader>()?;
     m.add_class::<AsyncOperator>()?;
-    m.add_class::<AsyncReader>()?;
+
+    m.add_class::<File>()?;
+    m.add_class::<AsyncFile>()?;
+
     m.add_class::<Entry>()?;
     m.add_class::<EntryMode>()?;
     m.add_class::<Metadata>()?;
diff --git a/bindings/python/src/operator.rs b/bindings/python/src/operator.rs
index 0c17b2c31..5bc95a2cc 100644
--- a/bindings/python/src/operator.rs
+++ b/bindings/python/src/operator.rs
@@ -20,10 +20,8 @@ use std::str::FromStr;
 use std::time::Duration;
 
 use pyo3::exceptions::PyValueError;
-use pyo3::ffi;
 use pyo3::prelude::*;
 use pyo3::types::{PyBytes, PyDict};
-use pyo3::AsPyPointer;
 use pyo3_asyncio::tokio::future_into_py;
 
 use crate::*;
@@ -77,24 +75,27 @@ impl Operator {
         Ok(Self(op.blocking()))
     }
 
-    /// Read the whole path into bytes.
-    pub fn read<'p>(&'p self, py: Python<'p>, path: &str) -> PyResult<&'p PyAny> {
-        let buffer = self
-            .0
-            .read(path)
-            .map_err(format_pyerr)
-            .map(Buffer::from)?
-            .into_py(py);
-        let memoryview =
-            unsafe { py.from_owned_ptr_or_err(ffi::PyMemoryView_FromObject(buffer.as_ptr()))? };
-        Ok(memoryview)
-    }
-
     /// Open a file-like reader for the given path.
-    pub fn open_reader(&self, path: &str) -> PyResult<Reader> {
-        let r = self.0.reader(path).map_err(format_pyerr)?;
+    pub fn open(&self, path: String, mode: String) -> PyResult<File> {
+        let this = self.0.clone();
 
-        Ok(Reader::new(r))
+        if mode == "rb" {
+            let r = this.reader(&path).map_err(format_pyerr)?;
+            Ok(File::new_reader(r))
+        } else if mode == "wb" {
+            let w = this.writer(&path).map_err(format_pyerr)?;
+            Ok(File::new_writer(w))
+        } else {
+            Err(Error::new_err(format!(
+                "OpenDAL doesn't support mode: {mode}"
+            )))
+        }
+    }
+
+    /// Read the whole path into bytes.
+    pub fn read<'p>(&'p self, py: Python<'p>, path: &str) -> PyResult<&'p PyAny> {
+        let buffer = self.0.read(path).map_err(format_pyerr)?;
+        Buffer::new(buffer).into_memory_view_ref(py)
     }
 
     /// Write bytes into given path.
@@ -232,28 +233,34 @@ impl AsyncOperator {
         Ok(Self(op))
     }
 
+    /// Open a file-like reader for the given path.
+    pub fn open<'p>(&'p self, py: Python<'p>, path: String, mode: String) -> PyResult<&'p PyAny> {
+        let this = self.0.clone();
+
+        future_into_py(py, async move {
+            if mode == "rb" {
+                let r = this.reader(&path).await.map_err(format_pyerr)?;
+                Ok(AsyncFile::new_reader(r))
+            } else if mode == "wb" {
+                let w = this.writer(&path).await.map_err(format_pyerr)?;
+                Ok(AsyncFile::new_writer(w))
+            } else {
+                Err(Error::new_err(format!(
+                    "OpenDAL doesn't support mode: {mode}"
+                )))
+            }
+        })
+    }
+
     /// Read the whole path into bytes.
     pub fn read<'p>(&'p self, py: Python<'p>, path: String) -> PyResult<&'p PyAny> {
         let this = self.0.clone();
         future_into_py(py, async move {
             let res: Vec<u8> = this.read(&path).await.map_err(format_pyerr)?;
-            Python::with_gil(|py| {
-                let buffer = Buffer::from(res).into_py(py);
-                unsafe {
-                    PyObject::from_owned_ptr_or_err(
-                        py,
-                        ffi::PyMemoryView_FromObject(buffer.as_ptr()),
-                    )
-                }
-            })
+            Python::with_gil(|py| Buffer::new(res).into_memory_view(py))
         })
     }
 
-    /// Open a file-like reader for the given path.
-    pub fn open_reader(&self, path: String) -> PyResult<AsyncReader> {
-        Ok(AsyncReader::new(self.0.clone(), path))
-    }
-
     /// Write bytes into given path.
     #[pyo3(signature = (path, bs, **kwargs))]
     pub fn write<'p>(
diff --git a/bindings/python/src/utils.rs b/bindings/python/src/utils.rs
index 328499910..24cbca9da 100644
--- a/bindings/python/src/utils.rs
+++ b/bindings/python/src/utils.rs
@@ -37,6 +37,30 @@ pub struct Buffer {
     inner: Vec<u8>,
 }
 
+impl Buffer {
+    pub fn new(inner: Vec<u8>) -> Self {
+        Buffer { inner }
+    }
+
+    /// Consume self to build a memory view
+    pub fn into_memory_view(self, py: Python) -> PyResult<Py<PyAny>> {
+        let buffer = self.into_py(py);
+
+        unsafe {
+            PyObject::from_owned_ptr_or_err(py, ffi::PyMemoryView_FromObject(buffer.as_ptr()))
+        }
+    }
+
+    /// Consume self to build a memory view ref.
+    pub fn into_memory_view_ref(self, py: Python) -> PyResult<&PyAny> {
+        let buffer = self.into_py(py);
+        let view =
+            unsafe { py.from_owned_ptr_or_err(ffi::PyMemoryView_FromObject(buffer.as_ptr()))? };
+
+        Ok(view)
+    }
+}
+
 #[pymethods]
 impl Buffer {
     unsafe fn __getbuffer__(
@@ -60,12 +84,6 @@ impl Buffer {
     }
 }
 
-impl From<Vec<u8>> for Buffer {
-    fn from(inner: Vec<u8>) -> Self {
-        Self { inner }
-    }
-}
-
 pub fn format_pyerr(err: ocore::Error) -> PyErr {
     use ocore::ErrorKind::*;
     match err.kind() {
diff --git a/bindings/python/tests/test_read.py b/bindings/python/tests/test_read.py
index 9c318943c..95cde619e 100644
--- a/bindings/python/tests/test_read.py
+++ b/bindings/python/tests/test_read.py
@@ -35,6 +35,19 @@ def test_sync_read(service_name, operator, async_operator):
 
     operator.delete(filename)
 
+@pytest.mark.need_capability("read", "write", "delete")
+def test_sync_reader(service_name, operator, async_operator):
+    size = randint(1, 1024)
+    filename = f"random_file_{str(uuid4())}"
+    content = os.urandom(size)
+    operator.write(filename, content)
+
+    with operator.open(filename, "rb") as reader:
+        read_content = reader.read()
+        assert read_content is not None
+        assert read_content == content
+
+    operator.delete(filename)
 
 @pytest.mark.asyncio
 @pytest.mark.need_capability("read", "write", "delete")
@@ -50,6 +63,36 @@ async def test_async_read(service_name, operator, async_operator):
 
     await async_operator.delete(filename)
 
+@pytest.mark.asyncio
+@pytest.mark.need_capability("read", "write", "delete")
+async def test_async_reader(service_name, operator, async_operator):
+    size = randint(1, 1024)
+    filename = f"random_file_{str(uuid4())}"
+    content = os.urandom(size)
+    await async_operator.write(filename, content)
+
+    async with await async_operator.open(filename, "rb") as reader:
+        read_content = await reader.read()
+        assert read_content is not None
+        assert read_content == content
+
+    await async_operator.delete(filename)
+
+@pytest.mark.asyncio
+@pytest.mark.need_capability("read", "write", "delete")
+async def test_async_reader_without_context(service_name, operator, async_operator):
+    size = randint(1, 1024)
+    filename = f"random_file_{str(uuid4())}"
+    content = os.urandom(size)
+    await async_operator.write(filename, content)
+
+    reader = await async_operator.open(filename, "rb")
+    read_content = await reader.read()
+    assert read_content is not None
+    assert read_content == content
+    await reader.close()
+
+    await async_operator.delete(filename)
 
 @pytest.mark.need_capability("read", "write", "delete", "stat")
 def test_sync_read_stat(service_name, operator, async_operator):

Reply via email to