This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch python-api in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit 43506af3afc5b37583ddd4d72077b73eb0a60a0b Author: Xuanwo <[email protected]> AuthorDate: Fri Nov 3 18:23:36 2023 +0800 feat(bindings/python): Implement File and AsyncFile to replace Reader Signed-off-by: Xuanwo <[email protected]> --- bindings/python/python/opendal/__init__.pyi | 16 +- bindings/python/src/{reader.rs => file.rs} | 291 ++++++++++++++++++---------- bindings/python/src/lib.rs | 10 +- bindings/python/src/operator.rs | 71 ++++--- bindings/python/src/utils.rs | 30 ++- bindings/python/tests/test_read.py | 43 ++++ 6 files changed, 316 insertions(+), 145 deletions(-) diff --git a/bindings/python/python/opendal/__init__.pyi b/bindings/python/python/opendal/__init__.pyi index 2c33057e3..65ae74a21 100644 --- a/bindings/python/python/opendal/__init__.pyi +++ b/bindings/python/python/opendal/__init__.pyi @@ -22,8 +22,8 @@ class Error(Exception): ... class Operator: def __init__(self, scheme: str, **kwargs): ... def layer(self, layer: Layer): ... + def open(self, path: str, mode: str) -> File: ... def read(self, path: str) -> memoryview: ... - def open_reader(self, path: str) -> Reader: ... def write( self, path: str, @@ -47,8 +47,8 @@ class Operator: class AsyncOperator: def __init__(self, scheme: str, **kwargs): ... def layer(self, layer: Layer): ... + async def open(self, path: str, mode: str) -> AsyncFile: ... async def read(self, path: str) -> memoryview: ... - def open_reader(self, path: str) -> AsyncReader: ... async def write( self, path: str, @@ -74,18 +74,22 @@ class AsyncOperator: async def rename(self, source: str, target: str): ... async def remove_all(self, path: str): ... -class Reader: +class File: def read(self, size: Optional[int] = None) -> memoryview: ... + def write(self, bs: bytes): ... def seek(self, offset: int, whence: int = 0) -> int: ... def tell(self) -> int: ... - def __enter__(self) -> Reader: ... + def close(self): ... + def __enter__(self) -> File: ... def __exit__(self, exc_type, exc_value, traceback) -> None: ... 
-class AsyncReader: +class AsyncFile: async def read(self, size: Optional[int] = None) -> memoryview: ... + async def write(self, bs: bytes): ... async def seek(self, offset: int, whence: int = 0) -> int: ... async def tell(self) -> int: ... - def __aenter__(self) -> AsyncReader: ... + async def close(self): ... + def __aenter__(self) -> AsyncFile: ... def __aexit__(self, exc_type, exc_value, traceback) -> None: ... class Entry: diff --git a/bindings/python/src/reader.rs b/bindings/python/src/file.rs similarity index 50% rename from bindings/python/src/reader.rs rename to bindings/python/src/file.rs index 584f21932..8db6452b3 100644 --- a/bindings/python/src/reader.rs +++ b/bindings/python/src/file.rs @@ -15,49 +15,62 @@ // specific language governing permissions and limitations // under the License. -use std::io::Read; use std::io::Seek; use std::io::SeekFrom; +use std::io::{Read, Write}; +use std::ops::DerefMut; use std::sync::Arc; -use futures::AsyncReadExt; use futures::AsyncSeekExt; +use futures::{AsyncReadExt, AsyncWriteExt}; use pyo3::exceptions::PyIOError; -use pyo3::exceptions::PyNotImplementedError; use pyo3::exceptions::PyValueError; -use pyo3::ffi; use pyo3::prelude::*; -use pyo3::AsPyPointer; use pyo3_asyncio::tokio::future_into_py; use tokio::sync::Mutex; use crate::*; -/// A file-like blocking reader. +/// A file-like object. /// Can be used as a context manager. 
#[pyclass(module = "opendal")] -pub struct Reader(Option<ocore::BlockingReader>); +pub struct File(FileState); -impl Reader { - pub fn new(reader: ocore::BlockingReader) -> Self { - Self(Some(reader)) +enum FileState { + Reader(ocore::BlockingReader), + Writer(ocore::BlockingWriter), + Closed, +} + +impl File { + pub fn new_reader(reader: ocore::BlockingReader) -> Self { + Self(FileState::Reader(reader)) } - fn as_mut(&mut self) -> PyResult<&mut ocore::BlockingReader> { - let reader = self - .0 - .as_mut() - .ok_or_else(|| PyValueError::new_err("I/O operation on closed file."))?; - Ok(reader) + pub fn new_writer(writer: ocore::BlockingWriter) -> Self { + Self(FileState::Writer(writer)) } } #[pymethods] -impl Reader { +impl File { /// Read and return size bytes, or if size is not given, until EOF. #[pyo3(signature = (size=None,))] pub fn read<'p>(&'p mut self, py: Python<'p>, size: Option<usize>) -> PyResult<&'p PyAny> { - let reader = self.as_mut()?; + let reader = match &mut self.0 { + FileState::Reader(r) => r, + FileState::Writer(_) => { + return Err(PyIOError::new_err( + "I/O operation failed for reading on write only file.", + )); + } + FileState::Closed => { + return Err(PyIOError::new_err( + "I/O operation failed for reading on closed file.", + )); + } + }; + let buffer = match size { Some(size) => { let mut buffer = vec![0; size]; @@ -74,18 +87,29 @@ impl Reader { buffer } }; - let buffer = Buffer::from(buffer).into_py(py); - let memoryview = - unsafe { py.from_owned_ptr_or_err(ffi::PyMemoryView_FromObject(buffer.as_ptr()))? }; - Ok(memoryview) + + Buffer::new(buffer).into_memory_view_ref(py) } - /// `Reader` doesn't support write. - /// Raises a `NotImplementedError` if called. - pub fn write(&mut self, _bs: &[u8]) -> PyResult<()> { - Err(PyNotImplementedError::new_err( - "Reader does not support write", - )) + /// Write bytes into the file. 
+ pub fn write(&mut self, bs: &[u8]) -> PyResult<()> { + let writer = match &mut self.0 { + FileState::Reader(_) => { + return Err(PyIOError::new_err( + "I/O operation failed for reading on read only file.", + )); + } + FileState::Writer(w) => w, + FileState::Closed => { + return Err(PyIOError::new_err( + "I/O operation failed for reading on closed file.", + )); + } + }; + + writer + .write_all(bs) + .map_err(|err| PyIOError::new_err(err.to_string())) } /// Change the stream position to the given byte offset. @@ -99,13 +123,27 @@ impl Reader { /// Return the new absolute position. #[pyo3(signature = (pos, whence = 0))] pub fn seek(&mut self, pos: i64, whence: u8) -> PyResult<u64> { + let reader = match &mut self.0 { + FileState::Reader(r) => r, + FileState::Writer(_) => { + return Err(PyIOError::new_err( + "I/O operation failed for reading on write only file.", + )); + } + FileState::Closed => { + return Err(PyIOError::new_err( + "I/O operation failed for reading on closed file.", + )); + } + }; + let whence = match whence { 0 => SeekFrom::Start(pos as u64), 1 => SeekFrom::Current(pos), 2 => SeekFrom::End(pos), _ => return Err(PyValueError::new_err("invalid whence")), }; - let reader = self.as_mut()?; + reader .seek(whence) .map_err(|err| PyIOError::new_err(err.to_string())) @@ -113,74 +151,82 @@ impl Reader { /// Return the current stream position. 
pub fn tell(&mut self) -> PyResult<u64> { - let reader = self.as_mut()?; + let reader = match &mut self.0 { + FileState::Reader(r) => r, + FileState::Writer(_) => { + return Err(PyIOError::new_err( + "I/O operation failed for reading on write only file.", + )); + } + FileState::Closed => { + return Err(PyIOError::new_err( + "I/O operation failed for reading on closed file.", + )); + } + }; + reader .stream_position() .map_err(|err| PyIOError::new_err(err.to_string())) } + fn close(&mut self) -> PyResult<()> { + self.0 = FileState::Closed; + Ok(()) + } + pub fn __enter__(slf: Py<Self>) -> Py<Self> { slf } pub fn __exit__(&mut self, _exc_type: PyObject, _exc_value: PyObject, _traceback: PyObject) { - drop(self.0.take()); + self.0 = FileState::Closed; } } -enum ReaderState { - Init { - operator: ocore::Operator, - path: String, - }, - Open(ocore::Reader), +/// A file-like async reader. +/// Can be used as an async context manager. +#[pyclass(module = "opendal")] +pub struct AsyncFile(Arc<Mutex<AsyncFileState>>); + +enum AsyncFileState { + Reader(ocore::Reader), + Writer(ocore::Writer), Closed, } -impl ReaderState { - async fn reader(&mut self) -> PyResult<&mut ocore::Reader> { - let reader = match self { - ReaderState::Init { operator, path } => { - let reader = operator.reader(path).await.map_err(format_pyerr)?; - *self = ReaderState::Open(reader); - if let ReaderState::Open(ref mut reader) = self { - reader - } else { - unreachable!() - } - } - ReaderState::Open(ref mut reader) => reader, - ReaderState::Closed => { - return Err(PyValueError::new_err("I/O operation on closed file.")); - } - }; - Ok(reader) +impl AsyncFile { + pub fn new_reader(reader: ocore::Reader) -> Self { + Self(Arc::new(Mutex::new(AsyncFileState::Reader(reader)))) } - fn close(&mut self) { - *self = ReaderState::Closed; - } -} - -/// A file-like async reader. -/// Can be used as an async context manager. 
-#[pyclass(module = "opendal")] -pub struct AsyncReader(Arc<Mutex<ReaderState>>); - -impl AsyncReader { - pub fn new(operator: ocore::Operator, path: String) -> Self { - Self(Arc::new(Mutex::new(ReaderState::Init { operator, path }))) + pub fn new_writer(writer: ocore::Writer) -> Self { + Self(Arc::new(Mutex::new(AsyncFileState::Writer(writer)))) } } #[pymethods] -impl AsyncReader { +impl AsyncFile { /// Read and return size bytes, or if size is not given, until EOF. pub fn read<'p>(&'p self, py: Python<'p>, size: Option<usize>) -> PyResult<&'p PyAny> { - let reader = self.0.clone(); + let state = self.0.clone(); + future_into_py(py, async move { - let mut state = reader.lock().await; - let reader = state.reader().await?; + let mut guard = state.lock().await; + let reader = match guard.deref_mut() { + AsyncFileState::Reader(r) => r, + AsyncFileState::Writer(_) => { + return Err(PyIOError::new_err( + "I/O operation failed for reading on write only file.", + )); + } + _ => { + return Err(PyIOError::new_err( + "I/O operation failed for reading on closed file.", + )); + } + }; + let buffer = match size { Some(size) => { let mut buffer = vec![0; size]; @@ -199,25 +245,38 @@ impl AsyncReader { buffer } }; - Python::with_gil(|py| { - let buffer = Buffer::from(buffer).into_py(py); - unsafe { - PyObject::from_owned_ptr_or_err( - py, - ffi::PyMemoryView_FromObject(buffer.as_ptr()), - ) - } - }) + + Python::with_gil(|py| Buffer::new(buffer).into_memory_view(py)) }) } - /// `AsyncReader` doesn't support write. - /// Raises a `NotImplementedError` if called. - pub fn write<'p>(&'p mut self, py: Python<'p>, _bs: &'p [u8]) -> PyResult<&'p PyAny> { - future_into_py::<_, PyObject>(py, async move { - Err(PyNotImplementedError::new_err( - "AsyncReader does not support write", - )) + /// Write bytes into the file. + pub fn write<'p>(&'p mut self, py: Python<'p>, bs: &'p [u8]) -> PyResult<&'p PyAny> { + let state = self.0.clone(); + + // FIXME: can we avoid this clone? 
+ let bs = bs.to_vec(); + + future_into_py(py, async move { + let mut guard = state.lock().await; + let writer = match guard.deref_mut() { + AsyncFileState::Reader(_) => { + return Err(PyIOError::new_err( + "I/O operation failed for reading on read only file.", + )); + } + AsyncFileState::Writer(w) => w, + _ => { + return Err(PyIOError::new_err( + "I/O operation failed for reading on closed file.", + )); + } + }; + + writer + .write_all(&bs) + .await + .map_err(|err| PyIOError::new_err(err.to_string())) }) } @@ -232,16 +291,31 @@ impl AsyncReader { /// Return the new absolute position. #[pyo3(signature = (pos, whence = 0))] pub fn seek<'p>(&'p mut self, py: Python<'p>, pos: i64, whence: u8) -> PyResult<&'p PyAny> { + let state = self.0.clone(); + let whence = match whence { 0 => SeekFrom::Start(pos as u64), 1 => SeekFrom::Current(pos), 2 => SeekFrom::End(pos), _ => return Err(PyValueError::new_err("invalid whence")), }; - let reader = self.0.clone(); + future_into_py(py, async move { - let mut state = reader.lock().await; - let reader = state.reader().await?; + let mut guard = state.lock().await; + let reader = match guard.deref_mut() { + AsyncFileState::Reader(r) => r, + AsyncFileState::Writer(_) => { + return Err(PyIOError::new_err( + "I/O operation failed for reading on write only file.", + )); + } + _ => { + return Err(PyIOError::new_err( + "I/O operation failed for reading on closed file.", + )); + } + }; + let ret = reader .seek(whence) .await @@ -252,10 +326,24 @@ impl AsyncReader { /// Return the current stream position. 
pub fn tell<'p>(&'p mut self, py: Python<'p>) -> PyResult<&'p PyAny> { - let reader = self.0.clone(); + let state = self.0.clone(); + future_into_py(py, async move { - let mut state = reader.lock().await; - let reader = state.reader().await?; + let mut guard = state.lock().await; + let reader = match guard.deref_mut() { + AsyncFileState::Reader(r) => r, + AsyncFileState::Writer(_) => { + return Err(PyIOError::new_err( + "I/O operation failed for reading on write only file.", + )); + } + _ => { + return Err(PyIOError::new_err( + "I/O operation failed for reading on closed file.", + )); + } + }; + let pos = reader .stream_position() .await @@ -264,6 +352,15 @@ impl AsyncReader { }) } + fn close<'p>(&'p mut self, py: Python<'p>) -> PyResult<&'p PyAny> { + let state = self.0.clone(); + future_into_py(py, async move { + let mut state = state.lock().await; + *state = AsyncFileState::Closed; + Ok(()) + }) + } + fn __aenter__<'a>(slf: PyRef<'a, Self>, py: Python<'a>) -> PyResult<&'a PyAny> { let slf = slf.into_py(py); future_into_py(py, async move { Ok(slf) }) @@ -276,10 +373,10 @@ impl AsyncReader { _exc_value: &'a PyAny, _traceback: &'a PyAny, ) -> PyResult<&'a PyAny> { - let reader = self.0.clone(); + let state = self.0.clone(); future_into_py(py, async move { - let mut state = reader.lock().await; - state.close(); + let mut state = state.lock().await; + *state = AsyncFileState::Closed; Ok(()) }) } diff --git a/bindings/python/src/lib.rs b/bindings/python/src/lib.rs index 1903cf424..a9c41fc00 100644 --- a/bindings/python/src/lib.rs +++ b/bindings/python/src/lib.rs @@ -33,8 +33,8 @@ mod metadata; pub use metadata::*; mod operator; pub use operator::*; -mod reader; -pub use reader::*; +mod file; +pub use file::*; mod utils; pub use utils::*; @@ -72,9 +72,11 @@ pub use utils::*; #[pymodule] fn _opendal(py: Python, m: &PyModule) -> PyResult<()> { m.add_class::<Operator>()?; - m.add_class::<Reader>()?; m.add_class::<AsyncOperator>()?; - m.add_class::<AsyncReader>()?; + + 
m.add_class::<File>()?; + m.add_class::<AsyncFile>()?; + m.add_class::<Entry>()?; m.add_class::<EntryMode>()?; m.add_class::<Metadata>()?; diff --git a/bindings/python/src/operator.rs b/bindings/python/src/operator.rs index 0c17b2c31..5bc95a2cc 100644 --- a/bindings/python/src/operator.rs +++ b/bindings/python/src/operator.rs @@ -20,10 +20,8 @@ use std::str::FromStr; use std::time::Duration; use pyo3::exceptions::PyValueError; -use pyo3::ffi; use pyo3::prelude::*; use pyo3::types::{PyBytes, PyDict}; -use pyo3::AsPyPointer; use pyo3_asyncio::tokio::future_into_py; use crate::*; @@ -77,24 +75,27 @@ impl Operator { Ok(Self(op.blocking())) } - /// Read the whole path into bytes. - pub fn read<'p>(&'p self, py: Python<'p>, path: &str) -> PyResult<&'p PyAny> { - let buffer = self - .0 - .read(path) - .map_err(format_pyerr) - .map(Buffer::from)? - .into_py(py); - let memoryview = - unsafe { py.from_owned_ptr_or_err(ffi::PyMemoryView_FromObject(buffer.as_ptr()))? }; - Ok(memoryview) - } - /// Open a file-like reader for the given path. - pub fn open_reader(&self, path: &str) -> PyResult<Reader> { - let r = self.0.reader(path).map_err(format_pyerr)?; + pub fn open(&self, path: String, mode: String) -> PyResult<File> { + let this = self.0.clone(); - Ok(Reader::new(r)) + if mode == "rb" { + let r = this.reader(&path).map_err(format_pyerr)?; + Ok(File::new_reader(r)) + } else if mode == "wb" { + let w = this.writer(&path).map_err(format_pyerr)?; + Ok(File::new_writer(w)) + } else { + Err(Error::new_err(format!( + "OpenDAL doesn't support mode: {mode}" + ))) + } + } + + /// Read the whole path into bytes. + pub fn read<'p>(&'p self, py: Python<'p>, path: &str) -> PyResult<&'p PyAny> { + let buffer = self.0.read(path).map_err(format_pyerr)?; + Buffer::new(buffer).into_memory_view_ref(py) } /// Write bytes into given path. @@ -232,28 +233,34 @@ impl AsyncOperator { Ok(Self(op)) } + /// Open a file-like reader for the given path. 
+ pub fn open<'p>(&'p self, py: Python<'p>, path: String, mode: String) -> PyResult<&'p PyAny> { + let this = self.0.clone(); + + future_into_py(py, async move { + if mode == "rb" { + let r = this.reader(&path).await.map_err(format_pyerr)?; + Ok(AsyncFile::new_reader(r)) + } else if mode == "wb" { + let w = this.writer(&path).await.map_err(format_pyerr)?; + Ok(AsyncFile::new_writer(w)) + } else { + Err(Error::new_err(format!( + "OpenDAL doesn't support mode: {mode}" + ))) + } + }) + } + /// Read the whole path into bytes. pub fn read<'p>(&'p self, py: Python<'p>, path: String) -> PyResult<&'p PyAny> { let this = self.0.clone(); future_into_py(py, async move { let res: Vec<u8> = this.read(&path).await.map_err(format_pyerr)?; - Python::with_gil(|py| { - let buffer = Buffer::from(res).into_py(py); - unsafe { - PyObject::from_owned_ptr_or_err( - py, - ffi::PyMemoryView_FromObject(buffer.as_ptr()), - ) - } - }) + Python::with_gil(|py| Buffer::new(res).into_memory_view(py)) }) } - /// Open a file-like reader for the given path. - pub fn open_reader(&self, path: String) -> PyResult<AsyncReader> { - Ok(AsyncReader::new(self.0.clone(), path)) - } - /// Write bytes into given path. #[pyo3(signature = (path, bs, **kwargs))] pub fn write<'p>( diff --git a/bindings/python/src/utils.rs b/bindings/python/src/utils.rs index 328499910..24cbca9da 100644 --- a/bindings/python/src/utils.rs +++ b/bindings/python/src/utils.rs @@ -37,6 +37,30 @@ pub struct Buffer { inner: Vec<u8>, } +impl Buffer { + pub fn new(inner: Vec<u8>) -> Self { + Buffer { inner } + } + + /// Consume self to build a memory view + pub fn into_memory_view(self, py: Python) -> PyResult<Py<PyAny>> { + let buffer = self.into_py(py); + + unsafe { + PyObject::from_owned_ptr_or_err(py, ffi::PyMemoryView_FromObject(buffer.as_ptr())) + } + } + + /// Consume self to build a memory view ref. 
+ pub fn into_memory_view_ref(self, py: Python) -> PyResult<&PyAny> { + let buffer = self.into_py(py); + let view = + unsafe { py.from_owned_ptr_or_err(ffi::PyMemoryView_FromObject(buffer.as_ptr()))? }; + + Ok(view) + } +} + #[pymethods] impl Buffer { unsafe fn __getbuffer__( @@ -60,12 +84,6 @@ impl Buffer { } } -impl From<Vec<u8>> for Buffer { - fn from(inner: Vec<u8>) -> Self { - Self { inner } - } -} - pub fn format_pyerr(err: ocore::Error) -> PyErr { use ocore::ErrorKind::*; match err.kind() { diff --git a/bindings/python/tests/test_read.py b/bindings/python/tests/test_read.py index 9c318943c..95cde619e 100644 --- a/bindings/python/tests/test_read.py +++ b/bindings/python/tests/test_read.py @@ -35,6 +35,19 @@ def test_sync_read(service_name, operator, async_operator): operator.delete(filename) +@pytest.mark.need_capability("read", "write", "delete") +def test_sync_reader(service_name, operator, async_operator): + size = randint(1, 1024) + filename = f"random_file_{str(uuid4())}" + content = os.urandom(size) + operator.write(filename, content) + + with operator.open(filename, "rb") as reader: + read_content = reader.read() + assert read_content is not None + assert read_content == content + + operator.delete(filename) @pytest.mark.asyncio @pytest.mark.need_capability("read", "write", "delete") @@ -50,6 +63,36 @@ async def test_async_read(service_name, operator, async_operator): await async_operator.delete(filename) +@pytest.mark.asyncio +@pytest.mark.need_capability("read", "write", "delete") +async def test_async_reader(service_name, operator, async_operator): + size = randint(1, 1024) + filename = f"random_file_{str(uuid4())}" + content = os.urandom(size) + await async_operator.write(filename, content) + + async with await async_operator.open(filename, "rb") as reader: + read_content = await reader.read() + assert read_content is not None + assert read_content == content + + await async_operator.delete(filename) + +@pytest.mark.asyncio
+@pytest.mark.need_capability("read", "write", "delete") +async def test_async_reader_without_context(service_name, operator, async_operator): + size = randint(1, 1024) + filename = f"random_file_{str(uuid4())}" + content = os.urandom(size) + await async_operator.write(filename, content) + + reader = await async_operator.open(filename, "rb") + read_content = await reader.read() + assert read_content is not None + assert read_content == content + await reader.close() + + await async_operator.delete(filename) @pytest.mark.need_capability("read", "write", "delete", "stat") def test_sync_read_stat(service_name, operator, async_operator):
