This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch refactor-python in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit cf92431c0cc22f0f1aba6131f8788a81a680b871 Author: Xuanwo <[email protected]> AuthorDate: Fri Nov 3 16:38:00 2023 +0800 refactor(bindings/python): Refactor layout for python bindings Signed-off-by: Xuanwo <[email protected]> --- bindings/python/src/layers.rs | 7 +- bindings/python/src/lib.rs | 537 +----------------------- bindings/python/src/lister.rs | 81 ++++ bindings/python/src/metadata.rs | 117 ++++++ bindings/python/src/{asyncio.rs => operator.rs} | 437 ++++++++++--------- bindings/python/src/reader.rs | 286 +++++++++++++ bindings/python/src/utils.rs | 78 ++++ 7 files changed, 827 insertions(+), 716 deletions(-) diff --git a/bindings/python/src/layers.rs b/bindings/python/src/layers.rs index d270c6be4..4b78e27e4 100644 --- a/bindings/python/src/layers.rs +++ b/bindings/python/src/layers.rs @@ -17,10 +17,11 @@ use std::time::Duration; -use ::opendal as od; use opendal::Operator; use pyo3::prelude::*; +use crate::*; + pub trait PythonLayer: Send + Sync { fn layer(&self, op: Operator) -> Operator; } @@ -30,7 +31,7 @@ pub struct Layer(pub Box<dyn PythonLayer>); #[pyclass(module = "opendal.layers", extends=Layer)] #[derive(Clone)] -pub struct RetryLayer(od::layers::RetryLayer); +pub struct RetryLayer(ocore::layers::RetryLayer); impl PythonLayer for RetryLayer { fn layer(&self, op: Operator) -> Operator { @@ -55,7 +56,7 @@ impl RetryLayer { max_delay: Option<f64>, min_delay: Option<f64>, ) -> PyResult<PyClassInitializer<Self>> { - let mut retry = od::layers::RetryLayer::default(); + let mut retry = ocore::layers::RetryLayer::default(); if let Some(max_times) = max_times { retry = retry.with_max_times(max_times); } diff --git a/bindings/python/src/lib.rs b/bindings/python/src/lib.rs index c2e293fd7..1903cf424 100644 --- a/bindings/python/src/lib.rs +++ b/bindings/python/src/lib.rs @@ -18,526 +18,25 @@ // Suppress clippy::redundant_closure warning from pyo3 generated code #![allow(clippy::redundant_closure)] -use std::collections::HashMap; -use std::io::Read; -use std::io::Seek; -use std::io::SeekFrom; -use std::os::raw::c_int; -use std::str::FromStr; - -use ::opendal as od; -use pyo3::create_exception; -use pyo3::exceptions::PyException; -use pyo3::exceptions::PyFileExistsError; -use pyo3::exceptions::PyFileNotFoundError; -use pyo3::exceptions::PyIOError; -use pyo3::exceptions::PyNotImplementedError; -use pyo3::exceptions::PyPermissionError; -use pyo3::exceptions::PyValueError; -use pyo3::ffi; +// expose the opendal rust core as `core`. +// We will use `ocore::Xxx` to represents all types from opendal rust core. +pub use ::opendal as ocore; use pyo3::prelude::*; -use pyo3::types::PyDict; -use pyo3::AsPyPointer; -mod asyncio; mod capability; +pub use capability::*; mod layers; - -use crate::asyncio::*; - -create_exception!(opendal, Error, PyException, "OpenDAL related errors"); - -/// A bytes-like object that implements buffer protocol. -#[pyclass(module = "opendal")] -struct Buffer { - inner: Vec<u8>, -} - -#[pymethods] -impl Buffer { - unsafe fn __getbuffer__( - slf: PyRefMut<Self>, - view: *mut ffi::Py_buffer, - flags: c_int, - ) -> PyResult<()> { - let bytes = slf.inner.as_slice(); - let ret = ffi::PyBuffer_FillInfo( - view, - slf.as_ptr() as *mut _, - bytes.as_ptr() as *mut _, - bytes.len().try_into().unwrap(), - 1, // read only - flags, - ); - if ret == -1 { - return Err(PyErr::fetch(slf.py())); - } - Ok(()) - } -} - -impl From<Vec<u8>> for Buffer { - fn from(inner: Vec<u8>) -> Self { - Self { inner } - } -} - -fn build_operator( - scheme: od::Scheme, - map: HashMap<String, String>, - blocking: bool, -) -> PyResult<od::Operator> { - let mut op = od::Operator::via_map(scheme, map).map_err(format_pyerr)?; - if blocking && !op.info().full_capability().blocking { - let runtime = pyo3_asyncio::tokio::get_runtime(); - let _guard = runtime.enter(); - op = op.layer(od::layers::BlockingLayer::create().expect("blocking layer must be created")); - } - - Ok(op) -} - -/// `Operator` is the entry for all public blocking APIs -/// -/// Create a new blocking `Operator` with the given `scheme` and options(`**kwargs`). -#[pyclass(module = "opendal")] -struct Operator(od::BlockingOperator); - -#[pymethods] -impl Operator { - #[new] - #[pyo3(signature = (scheme, *, **map))] - pub fn new(scheme: &str, map: Option<&PyDict>) -> PyResult<Self> { - let scheme = od::Scheme::from_str(scheme) - .map_err(|err| { - od::Error::new(od::ErrorKind::Unexpected, "unsupported scheme").set_source(err) - }) - .map_err(format_pyerr)?; - let map = map - .map(|v| { - v.extract::<HashMap<String, String>>() - .expect("must be valid hashmap") - }) - .unwrap_or_default(); - - Ok(Operator(build_operator(scheme, map, true)?.blocking())) - } - - /// Add new layers upon existing operator - pub fn layer(&self, layer: &layers::Layer) -> PyResult<Self> { - let op = layer.0.layer(self.0.clone().into()); - Ok(Self(op.blocking())) - } - - /// Read the whole path into bytes. - pub fn read<'p>(&'p self, py: Python<'p>, path: &str) -> PyResult<&'p PyAny> { - let buffer = self - .0 - .read(path) - .map_err(format_pyerr) - .map(Buffer::from)? - .into_py(py); - let memoryview = - unsafe { py.from_owned_ptr_or_err(ffi::PyMemoryView_FromObject(buffer.as_ptr()))? }; - Ok(memoryview) - } - - /// Open a file-like reader for the given path. - pub fn open_reader(&self, path: &str) -> PyResult<Reader> { - self.0 - .reader(path) - .map(|reader| Reader(Some(reader))) - .map_err(format_pyerr) - } - - /// Write bytes into given path. - #[pyo3(signature = (path, bs, **kwargs))] - pub fn write(&self, path: &str, bs: Vec<u8>, kwargs: Option<&PyDict>) -> PyResult<()> { - let opwrite = build_opwrite(kwargs)?; - let mut write = self.0.write_with(path, bs).append(opwrite.append()); - if let Some(buffer) = opwrite.buffer() { - write = write.buffer(buffer); - } - if let Some(content_type) = opwrite.content_type() { - write = write.content_type(content_type); - } - if let Some(content_disposition) = opwrite.content_disposition() { - write = write.content_disposition(content_disposition); - } - if let Some(cache_control) = opwrite.cache_control() { - write = write.cache_control(cache_control); - } - - write.call().map_err(format_pyerr) - } - - /// Get current path's metadata **without cache** directly. - pub fn stat(&self, path: &str) -> PyResult<Metadata> { - self.0.stat(path).map_err(format_pyerr).map(Metadata) - } - - /// Copy source to target. - pub fn copy(&self, source: &str, target: &str) -> PyResult<()> { - self.0.copy(source, target).map_err(format_pyerr) - } - - /// Rename filename. - pub fn rename(&self, source: &str, target: &str) -> PyResult<()> { - self.0.rename(source, target).map_err(format_pyerr) - } - - /// Remove all file - pub fn remove_all(&self, path: &str) -> PyResult<()> { - self.0.remove_all(path).map_err(format_pyerr) - } - - /// Create a dir at given path. - /// - /// # Notes - /// - /// To indicate that a path is a directory, it is compulsory to include - /// a trailing / in the path. Failure to do so may result in - /// `NotADirectory` error being returned by OpenDAL. - /// - /// # Behavior - /// - /// - Create on existing dir will succeed. - /// - Create dir is always recursive, works like `mkdir -p` - pub fn create_dir(&self, path: &str) -> PyResult<()> { - self.0.create_dir(path).map_err(format_pyerr) - } - - /// Delete given path. - /// - /// # Notes - /// - /// - Delete not existing error won't return errors. - pub fn delete(&self, path: &str) -> PyResult<()> { - self.0.delete(path).map_err(format_pyerr) - } - - /// List current dir path. - pub fn list(&self, path: &str) -> PyResult<BlockingLister> { - Ok(BlockingLister(self.0.lister(path).map_err(format_pyerr)?)) - } - - /// List dir in flat way. - pub fn scan(&self, path: &str) -> PyResult<BlockingLister> { - Ok(BlockingLister( - self.0 - .lister_with(path) - .delimiter("") - .call() - .map_err(format_pyerr)?, - )) - } - - pub fn capability(&self) -> PyResult<capability::Capability> { - Ok(capability::Capability::new(self.0.info().full_capability())) - } - - fn __repr__(&self) -> String { - let info = self.0.info(); - let name = info.name(); - if name.is_empty() { - format!("Operator(\"{}\", root=\"{}\")", info.scheme(), info.root()) - } else { - format!( - "Operator(\"{}\", root=\"{}\", name=\"{name}\")", - info.scheme(), - info.root() - ) - } - } -} - -/// A file-like blocking reader. -/// Can be used as a context manager. -#[pyclass(module = "opendal")] -struct Reader(Option<od::BlockingReader>); - -impl Reader { - fn as_mut(&mut self) -> PyResult<&mut od::BlockingReader> { - let reader = self - .0 - .as_mut() - .ok_or_else(|| PyValueError::new_err("I/O operation on closed file."))?; - Ok(reader) - } -} - -#[pymethods] -impl Reader { - /// Read and return size bytes, or if size is not given, until EOF. - #[pyo3(signature = (size=None,))] - pub fn read<'p>(&'p mut self, py: Python<'p>, size: Option<usize>) -> PyResult<&'p PyAny> { - let reader = self.as_mut()?; - let buffer = match size { - Some(size) => { - let mut buffer = vec![0; size]; - reader - .read_exact(&mut buffer) - .map_err(|err| PyIOError::new_err(err.to_string()))?; - buffer - } - None => { - let mut buffer = Vec::new(); - reader - .read_to_end(&mut buffer) - .map_err(|err| PyIOError::new_err(err.to_string()))?; - buffer - } - }; - let buffer = Buffer::from(buffer).into_py(py); - let memoryview = - unsafe { py.from_owned_ptr_or_err(ffi::PyMemoryView_FromObject(buffer.as_ptr()))? }; - Ok(memoryview) - } - - /// `Reader` doesn't support write. - /// Raises a `NotImplementedError` if called. - pub fn write(&mut self, _bs: &[u8]) -> PyResult<()> { - Err(PyNotImplementedError::new_err( - "Reader does not support write", - )) - } - - /// Change the stream position to the given byte offset. - /// offset is interpreted relative to the position indicated by `whence`. - /// The default value for whence is `SEEK_SET`. Values for `whence` are: - /// - /// * `SEEK_SET` or `0` – start of the stream (the default); offset should be zero or positive - /// * `SEEK_CUR` or `1` – current stream position; offset may be negative - /// * `SEEK_END` or `2` – end of the stream; offset is usually negative - /// - /// Return the new absolute position. - #[pyo3(signature = (pos, whence = 0))] - pub fn seek(&mut self, pos: i64, whence: u8) -> PyResult<u64> { - let whence = match whence { - 0 => SeekFrom::Start(pos as u64), - 1 => SeekFrom::Current(pos), - 2 => SeekFrom::End(pos), - _ => return Err(PyValueError::new_err("invalid whence")), - }; - let reader = self.as_mut()?; - reader - .seek(whence) - .map_err(|err| PyIOError::new_err(err.to_string())) - } - - /// Return the current stream position. - pub fn tell(&mut self) -> PyResult<u64> { - let reader = self.as_mut()?; - reader - .stream_position() - .map_err(|err| PyIOError::new_err(err.to_string())) - } - - pub fn __enter__(slf: Py<Self>) -> Py<Self> { - slf - } - - pub fn __exit__(&mut self, _exc_type: PyObject, _exc_value: PyObject, _traceback: PyObject) { - drop(self.0.take()); - } -} - -#[pyclass(unsendable, module = "opendal")] -struct BlockingLister(od::BlockingLister); - -#[pymethods] -impl BlockingLister { - fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> { - slf - } - fn __next__(mut slf: PyRefMut<'_, Self>) -> PyResult<Option<PyObject>> { - match slf.0.next() { - Some(Ok(entry)) => Ok(Some(Entry(entry).into_py(slf.py()))), - Some(Err(err)) => { - let pyerr = format_pyerr(err); - Err(pyerr) - } - None => Ok(None), - } - } -} - -#[pyclass(module = "opendal")] -struct Entry(od::Entry); - -#[pymethods] -impl Entry { - /// Path of entry. Path is relative to operator's root. - #[getter] - pub fn path(&self) -> &str { - self.0.path() - } - - fn __str__(&self) -> &str { - self.0.path() - } - - fn __repr__(&self) -> String { - format!("Entry({:?})", self.0.path()) - } -} - -#[pyclass(module = "opendal")] -struct Metadata(od::Metadata); - -#[pymethods] -impl Metadata { - #[getter] - pub fn content_disposition(&self) -> Option<&str> { - self.0.content_disposition() - } - - /// Content length of this entry. - #[getter] - pub fn content_length(&self) -> u64 { - self.0.content_length() - } - - /// Content MD5 of this entry. - #[getter] - pub fn content_md5(&self) -> Option<&str> { - self.0.content_md5() - } - - /// Content Type of this entry. - #[getter] - pub fn content_type(&self) -> Option<&str> { - self.0.content_type() - } - - /// ETag of this entry. - #[getter] - pub fn etag(&self) -> Option<&str> { - self.0.etag() - } - - /// mode represent this entry's mode. - #[getter] - pub fn mode(&self) -> EntryMode { - EntryMode(self.0.mode()) - } -} - -#[pyclass(module = "opendal")] -struct EntryMode(od::EntryMode); - -#[pymethods] -impl EntryMode { - /// Returns `True` if this is a file. - pub fn is_file(&self) -> bool { - self.0.is_file() - } - - /// Returns `True` if this is a directory. - pub fn is_dir(&self) -> bool { - self.0.is_dir() - } - - pub fn __repr__(&self) -> &'static str { - match self.0 { - od::EntryMode::FILE => "EntryMode.FILE", - od::EntryMode::DIR => "EntryMode.DIR", - od::EntryMode::Unknown => "EntryMode.UNKNOWN", - } - } -} - -#[pyclass(module = "opendal")] -struct PresignedRequest(od::raw::PresignedRequest); - -#[pymethods] -impl PresignedRequest { - /// Return the URL of this request. - #[getter] - pub fn url(&self) -> String { - self.0.uri().to_string() - } - - /// Return the HTTP method of this request. - #[getter] - pub fn method(&self) -> &str { - self.0.method().as_str() - } - - /// Return the HTTP headers of this request. - #[getter] - pub fn headers(&self) -> PyResult<HashMap<&str, &str>> { - let mut headers = HashMap::new(); - for (k, v) in self.0.header().iter() { - let k = k.as_str(); - let v = v.to_str().map_err(|err| Error::new_err(err.to_string()))?; - if headers.insert(k, v).is_some() { - return Err(Error::new_err("duplicate header")); - } - } - Ok(headers) - } -} - -fn format_pyerr(err: od::Error) -> PyErr { - use od::ErrorKind::*; - match err.kind() { - NotFound => PyFileNotFoundError::new_err(err.to_string()), - AlreadyExists => PyFileExistsError::new_err(err.to_string()), - PermissionDenied => PyPermissionError::new_err(err.to_string()), - Unsupported => PyNotImplementedError::new_err(err.to_string()), - _ => Error::new_err(err.to_string()), - } -} - -/// recognize OpWrite-equivalent options passed as python dict -pub(crate) fn build_opwrite(kwargs: Option<&PyDict>) -> PyResult<od::raw::OpWrite> { - use od::raw::OpWrite; - let mut op = OpWrite::new(); - - let dict = if let Some(kwargs) = kwargs { - kwargs - } else { - return Ok(op); - }; - - if let Some(append) = dict.get_item("append") { - let v = append - .extract::<bool>() - .map_err(|err| PyValueError::new_err(format!("append must be bool, got {}", err)))?; - op = op.with_append(v); - } - - if let Some(buffer) = dict.get_item("buffer") { - let v = buffer - .extract::<usize>() - .map_err(|err| PyValueError::new_err(format!("buffer must be usize, got {}", err)))?; - op = op.with_buffer(v); - } - - if let Some(content_type) = dict.get_item("content_type") { - let v = content_type.extract::<String>().map_err(|err| { - PyValueError::new_err(format!("content_type must be str, got {}", err)) - })?; - op = op.with_content_type(v.as_str()); - } - - if let Some(content_disposition) = dict.get_item("content_disposition") { - let v = content_disposition.extract::<String>().map_err(|err| { - PyValueError::new_err(format!("content_disposition must be str, got {}", err)) - })?; - op = op.with_content_disposition(v.as_str()); - } - - if let Some(cache_control) = dict.get_item("cache_control") { - let v = cache_control.extract::<String>().map_err(|err| { - PyValueError::new_err(format!("cache_control must be str, got {}", err)) - })?; - op = op.with_cache_control(v.as_str()); - } - - Ok(op) -} +pub use layers::*; +mod lister; +pub use lister::*; +mod metadata; +pub use metadata::*; +mod operator; +pub use operator::*; +mod reader; +pub use reader::*; +mod utils; +pub use utils::*; /// OpenDAL Python binding /// @@ -580,13 +79,13 @@ fn _opendal(py: Python, m: &PyModule) -> PyResult<()> { m.add_class::<EntryMode>()?; m.add_class::<Metadata>()?; m.add_class::<PresignedRequest>()?; - m.add_class::<capability::Capability>()?; + m.add_class::<Capability>()?; m.add("Error", py.get_type::<Error>())?; // Layer module let layers_module = PyModule::new(py, "layers")?; - layers_module.add_class::<layers::Layer>()?; - layers_module.add_class::<layers::RetryLayer>()?; + layers_module.add_class::<Layer>()?; + layers_module.add_class::<RetryLayer>()?; m.add_submodule(layers_module)?; py.import("sys")? .getattr("modules")? diff --git a/bindings/python/src/lister.rs b/bindings/python/src/lister.rs new file mode 100644 index 000000000..d2e7fe89c --- /dev/null +++ b/bindings/python/src/lister.rs @@ -0,0 +1,81 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use futures::TryStreamExt; +use std::sync::Arc; + +use pyo3::exceptions::PyStopAsyncIteration; +use pyo3::prelude::*; +use pyo3_asyncio::tokio::future_into_py; +use tokio::sync::Mutex; + +use crate::*; + +#[pyclass(unsendable, module = "opendal")] +pub struct BlockingLister(ocore::BlockingLister); + +impl BlockingLister { + /// Create a new blocking lister. + pub fn new(inner: ocore::BlockingLister) -> Self { + Self(inner) + } +} + +#[pymethods] +impl BlockingLister { + fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> { + slf + } + fn __next__(mut slf: PyRefMut<'_, Self>) -> PyResult<Option<PyObject>> { + match slf.0.next() { + Some(Ok(entry)) => Ok(Some(Entry::new(entry).into_py(slf.py()))), + Some(Err(err)) => { + let pyerr = format_pyerr(err); + Err(pyerr) + } + None => Ok(None), + } + } +} + +#[pyclass(module = "opendal")] +pub struct AsyncLister(Arc<Mutex<ocore::Lister>>); + +impl AsyncLister { + pub fn new(lister: ocore::Lister) -> Self { + Self(Arc::new(Mutex::new(lister))) + } +} + +#[pymethods] +impl AsyncLister { + fn __aiter__(slf: PyRef<'_, Self>) -> PyRef<Self> { + slf + } + fn __anext__(slf: PyRefMut<'_, Self>) -> PyResult<Option<PyObject>> { + let lister = slf.0.clone(); + let fut = future_into_py(slf.py(), async move { + let mut lister = lister.lock().await; + let entry = lister.try_next().await.map_err(format_pyerr)?; + match entry { + Some(entry) => Ok(Python::with_gil(|py| Entry::new(entry).into_py(py))), + None => Err(PyStopAsyncIteration::new_err("stream exhausted")), + } + })?; + Ok(Some(fut.into())) + } +} diff --git a/bindings/python/src/metadata.rs b/bindings/python/src/metadata.rs new file mode 100644 index 000000000..e03eeda18 --- /dev/null +++ b/bindings/python/src/metadata.rs @@ -0,0 +1,117 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use pyo3::prelude::*; + +use crate::*; + +#[pyclass(module = "opendal")] +pub struct Entry(ocore::Entry); + +impl Entry { + pub fn new(entry: ocore::Entry) -> Self { + Self(entry) + } +} + +#[pymethods] +impl Entry { + /// Path of entry. Path is relative to operator's root. + #[getter] + pub fn path(&self) -> &str { + self.0.path() + } + + fn __str__(&self) -> &str { + self.0.path() + } + + fn __repr__(&self) -> String { + format!("Entry({:?})", self.0.path()) + } +} + +#[pyclass(module = "opendal")] +pub struct Metadata(ocore::Metadata); + +impl Metadata { + pub fn new(meta: ocore::Metadata) -> Self { + Self(meta) + } +} + +#[pymethods] +impl Metadata { + #[getter] + pub fn content_disposition(&self) -> Option<&str> { + self.0.content_disposition() + } + + /// Content length of this entry. + #[getter] + pub fn content_length(&self) -> u64 { + self.0.content_length() + } + + /// Content MD5 of this entry. + #[getter] + pub fn content_md5(&self) -> Option<&str> { + self.0.content_md5() + } + + /// Content Type of this entry. + #[getter] + pub fn content_type(&self) -> Option<&str> { + self.0.content_type() + } + + /// ETag of this entry. + #[getter] + pub fn etag(&self) -> Option<&str> { + self.0.etag() + } + + /// mode represent this entry's mode. + #[getter] + pub fn mode(&self) -> EntryMode { + EntryMode(self.0.mode()) + } +} + +#[pyclass(module = "opendal")] +pub struct EntryMode(ocore::EntryMode); + +#[pymethods] +impl EntryMode { + /// Returns `True` if this is a file. + pub fn is_file(&self) -> bool { + self.0.is_file() + } + + /// Returns `True` if this is a directory. + pub fn is_dir(&self) -> bool { + self.0.is_dir() + } + + pub fn __repr__(&self) -> &'static str { + match self.0 { + ocore::EntryMode::FILE => "EntryMode.FILE", + ocore::EntryMode::DIR => "EntryMode.DIR", + ocore::EntryMode::Unknown => "EntryMode.UNKNOWN", + } + } +} diff --git a/bindings/python/src/asyncio.rs b/bindings/python/src/operator.rs similarity index 52% rename from bindings/python/src/asyncio.rs rename to bindings/python/src/operator.rs index 2ef6253d6..0c17b2c31 100644 --- a/bindings/python/src/asyncio.rs +++ b/bindings/python/src/operator.rs @@ -16,52 +16,204 @@ // under the License. use std::collections::HashMap; -use std::io::SeekFrom; use std::str::FromStr; -use std::sync::Arc; use std::time::Duration; -use ::opendal as od; -use futures::TryStreamExt; -use pyo3::exceptions::PyIOError; -use pyo3::exceptions::PyNotImplementedError; -use pyo3::exceptions::PyStopAsyncIteration; use pyo3::exceptions::PyValueError; use pyo3::ffi; use pyo3::prelude::*; -use pyo3::types::PyBytes; -use pyo3::types::PyDict; +use pyo3::types::{PyBytes, PyDict}; use pyo3::AsPyPointer; use pyo3_asyncio::tokio::future_into_py; -use tokio::io::AsyncReadExt; -use tokio::io::AsyncSeekExt; -use tokio::sync::Mutex; -use crate::build_operator; -use crate::build_opwrite; -use crate::format_pyerr; -use crate::layers; -use crate::Buffer; -use crate::Entry; -use crate::Metadata; -use crate::PresignedRequest; +use crate::*; + +fn build_operator( + scheme: ocore::Scheme, + map: HashMap<String, String>, + blocking: bool, +) -> PyResult<ocore::Operator> { + let mut op = ocore::Operator::via_map(scheme, map).map_err(format_pyerr)?; + if blocking && !op.info().full_capability().blocking { + let runtime = pyo3_asyncio::tokio::get_runtime(); + let _guard = runtime.enter(); + op = op + .layer(ocore::layers::BlockingLayer::create().expect("blocking layer must be created")); + } + + Ok(op) +} + +/// `Operator` is the entry for all public blocking APIs +/// +/// Create a new blocking `Operator` with the given `scheme` and options(`**kwargs`). +#[pyclass(module = "opendal")] +pub struct Operator(ocore::BlockingOperator); + +#[pymethods] +impl Operator { + #[new] + #[pyo3(signature = (scheme, *, **map))] + pub fn new(scheme: &str, map: Option<&PyDict>) -> PyResult<Self> { + let scheme = ocore::Scheme::from_str(scheme) + .map_err(|err| { + ocore::Error::new(ocore::ErrorKind::Unexpected, "unsupported scheme") + .set_source(err) + }) + .map_err(format_pyerr)?; + let map = map + .map(|v| { + v.extract::<HashMap<String, String>>() + .expect("must be valid hashmap") + }) + .unwrap_or_default(); -use crate::capability; + Ok(Operator(build_operator(scheme, map, true)?.blocking())) + } + + /// Add new layers upon existing operator + pub fn layer(&self, layer: &layers::Layer) -> PyResult<Self> { + let op = layer.0.layer(self.0.clone().into()); + Ok(Self(op.blocking())) + } + + /// Read the whole path into bytes. + pub fn read<'p>(&'p self, py: Python<'p>, path: &str) -> PyResult<&'p PyAny> { + let buffer = self + .0 + .read(path) + .map_err(format_pyerr) + .map(Buffer::from)? + .into_py(py); + let memoryview = + unsafe { py.from_owned_ptr_or_err(ffi::PyMemoryView_FromObject(buffer.as_ptr()))? }; + Ok(memoryview) + } + + /// Open a file-like reader for the given path. + pub fn open_reader(&self, path: &str) -> PyResult<Reader> { + let r = self.0.reader(path).map_err(format_pyerr)?; + + Ok(Reader::new(r)) + } + + /// Write bytes into given path. + #[pyo3(signature = (path, bs, **kwargs))] + pub fn write(&self, path: &str, bs: Vec<u8>, kwargs: Option<&PyDict>) -> PyResult<()> { + let opwrite = build_opwrite(kwargs)?; + let mut write = self.0.write_with(path, bs).append(opwrite.append()); + if let Some(buffer) = opwrite.buffer() { + write = write.buffer(buffer); + } + if let Some(content_type) = opwrite.content_type() { + write = write.content_type(content_type); + } + if let Some(content_disposition) = opwrite.content_disposition() { + write = write.content_disposition(content_disposition); + } + if let Some(cache_control) = opwrite.cache_control() { + write = write.cache_control(cache_control); + } + + write.call().map_err(format_pyerr) + } + + /// Get current path's metadata **without cache** directly. + pub fn stat(&self, path: &str) -> PyResult<Metadata> { + self.0.stat(path).map_err(format_pyerr).map(Metadata::new) + } + + /// Copy source to target. + pub fn copy(&self, source: &str, target: &str) -> PyResult<()> { + self.0.copy(source, target).map_err(format_pyerr) + } + + /// Rename filename. + pub fn rename(&self, source: &str, target: &str) -> PyResult<()> { + self.0.rename(source, target).map_err(format_pyerr) + } + + /// Remove all file + pub fn remove_all(&self, path: &str) -> PyResult<()> { + self.0.remove_all(path).map_err(format_pyerr) + } + + /// Create a dir at given path. + /// + /// # Notes + /// + /// To indicate that a path is a directory, it is compulsory to include + /// a trailing / in the path. Failure to do so may result in + /// `NotADirectory` error being returned by OpenDAL. + /// + /// # Behavior + /// + /// - Create on existing dir will succeed. + /// - Create dir is always recursive, works like `mkdir -p` + pub fn create_dir(&self, path: &str) -> PyResult<()> { + self.0.create_dir(path).map_err(format_pyerr) + } + + /// Delete given path. + /// + /// # Notes + /// + /// - Delete not existing error won't return errors. + pub fn delete(&self, path: &str) -> PyResult<()> { + self.0.delete(path).map_err(format_pyerr) + } + + /// List current dir path. + pub fn list(&self, path: &str) -> PyResult<BlockingLister> { + let l = self.0.lister(path).map_err(format_pyerr)?; + Ok(BlockingLister::new(l)) + } + + /// List dir in flat way. + pub fn scan(&self, path: &str) -> PyResult<BlockingLister> { + let l = self + .0 + .lister_with(path) + .delimiter("") + .call() + .map_err(format_pyerr)?; + Ok(BlockingLister::new(l)) + } + + pub fn capability(&self) -> PyResult<capability::Capability> { + Ok(capability::Capability::new(self.0.info().full_capability())) + } + + fn __repr__(&self) -> String { + let info = self.0.info(); + let name = info.name(); + if name.is_empty() { + format!("Operator(\"{}\", root=\"{}\")", info.scheme(), info.root()) + } else { + format!( + "Operator(\"{}\", root=\"{}\", name=\"{name}\")", + info.scheme(), + info.root() + ) + } + } +} /// `AsyncOperator` is the entry for all public async APIs /// /// Create a new `AsyncOperator` with the given `scheme` and options(`**kwargs`). #[pyclass(module = "opendal")] -pub struct AsyncOperator(od::Operator); +pub struct AsyncOperator(ocore::Operator); #[pymethods] impl AsyncOperator { #[new] #[pyo3(signature = (scheme, *, **map))] pub fn new(scheme: &str, map: Option<&PyDict>) -> PyResult<Self> { - let scheme = od::Scheme::from_str(scheme) + let scheme = ocore::Scheme::from_str(scheme) .map_err(|err| { - od::Error::new(od::ErrorKind::Unexpected, "unsupported scheme").set_source(err) + ocore::Error::new(ocore::ErrorKind::Unexpected, "unsupported scheme") + .set_source(err) }) .map_err(format_pyerr)?; let map = map @@ -99,10 +251,7 @@ impl AsyncOperator { /// Open a file-like reader for the given path. pub fn open_reader(&self, path: String) -> PyResult<AsyncReader> { - Ok(AsyncReader::new(ReaderState::Init { - operator: self.0.clone(), - path, - })) + Ok(AsyncReader::new(self.0.clone(), path)) } /// Write bytes into given path. @@ -139,7 +288,11 @@ impl AsyncOperator { pub fn stat<'p>(&'p self, py: Python<'p>, path: String) -> PyResult<&'p PyAny> { let this = self.0.clone(); future_into_py(py, async move { - let res: Metadata = this.stat(&path).await.map_err(format_pyerr).map(Metadata)?; + let res: Metadata = this + .stat(&path) + .await + .map_err(format_pyerr) + .map(Metadata::new)?; Ok(res) }) @@ -315,187 +468,83 @@ impl AsyncOperator { } } -enum ReaderState { - Init { - operator: od::Operator, - path: String, - }, - Open(od::Reader), - Closed, -} - -impl ReaderState { - async fn reader(&mut self) -> PyResult<&mut od::Reader> { - let reader = match self { - ReaderState::Init { operator, path } => { - let reader = operator.reader(path).await.map_err(format_pyerr)?; - *self = ReaderState::Open(reader); - if let ReaderState::Open(ref mut reader) = self { - reader - } else { - unreachable!() - } - } - ReaderState::Open(ref mut reader) => reader, - ReaderState::Closed => { - return Err(PyValueError::new_err("I/O operation on closed file.")); - } - }; - Ok(reader) - } - - fn close(&mut self) { - *self = ReaderState::Closed; - } -} - -/// A file-like async reader. -/// Can be used as an async context manager. -#[pyclass(module = "opendal")] -pub struct AsyncReader(Arc<Mutex<ReaderState>>); - -impl AsyncReader { - fn new(reader: ReaderState) -> Self { - Self(Arc::new(Mutex::new(reader))) - } -} - -#[pymethods] -impl AsyncReader { - /// Read and return size bytes, or if size is not given, until EOF. - pub fn read<'p>(&'p self, py: Python<'p>, size: Option<usize>) -> PyResult<&'p PyAny> { - let reader = self.0.clone(); - future_into_py(py, async move { - let mut state = reader.lock().await; - let reader = state.reader().await?; - let buffer = match size { - Some(size) => { - let mut buffer = vec![0; size]; - reader - .read_exact(&mut buffer) - .await - .map_err(|err| PyIOError::new_err(err.to_string()))?; - buffer - } - None => { - let mut buffer = Vec::new(); - reader - .read_to_end(&mut buffer) - .await - .map_err(|err| PyIOError::new_err(err.to_string()))?; - buffer - } - }; - Python::with_gil(|py| { - let buffer = Buffer::from(buffer).into_py(py); - unsafe { - PyObject::from_owned_ptr_or_err( - py, - ffi::PyMemoryView_FromObject(buffer.as_ptr()), - ) - } - }) - }) +/// recognize OpWrite-equivalent options passed as python dict +pub(crate) fn build_opwrite(kwargs: Option<&PyDict>) -> PyResult<ocore::raw::OpWrite> { + use ocore::raw::OpWrite; + let mut op = OpWrite::new(); + + let dict = if let Some(kwargs) = kwargs { + kwargs + } else { + return Ok(op); + }; + + if let Some(append) = dict.get_item("append") { + let v = append + .extract::<bool>() + .map_err(|err| PyValueError::new_err(format!("append must be bool, got {}", err)))?; + op = op.with_append(v); } - /// `AsyncReader` doesn't support write. - /// Raises a `NotImplementedError` if called. - pub fn write<'p>(&'p mut self, py: Python<'p>, _bs: &'p [u8]) -> PyResult<&'p PyAny> { - future_into_py::<_, PyObject>(py, async move { - Err(PyNotImplementedError::new_err( - "AsyncReader does not support write", - )) - }) + if let Some(buffer) = dict.get_item("buffer") { + let v = buffer + .extract::<usize>() + .map_err(|err| PyValueError::new_err(format!("buffer must be usize, got {}", err)))?; + op = op.with_buffer(v); } - /// Change the stream position to the given byte offset. - /// offset is interpreted relative to the position indicated by `whence`. - /// The default value for whence is `SEEK_SET`. Values for `whence` are: - /// - /// * `SEEK_SET` or `0` – start of the stream (the default); offset should be zero or positive - /// * `SEEK_CUR` or `1` – current stream position; offset may be negative - /// * `SEEK_END` or `2` – end of the stream; offset is usually negative - /// - /// Return the new absolute position. - #[pyo3(signature = (pos, whence = 0))] - pub fn seek<'p>(&'p mut self, py: Python<'p>, pos: i64, whence: u8) -> PyResult<&'p PyAny> { - let whence = match whence { - 0 => SeekFrom::Start(pos as u64), - 1 => SeekFrom::Current(pos), - 2 => SeekFrom::End(pos), - _ => return Err(PyValueError::new_err("invalid whence")), - }; - let reader = self.0.clone(); - future_into_py(py, async move { - let mut state = reader.lock().await; - let reader = state.reader().await?; - let ret = reader - .seek(whence) - .await - .map_err(|err| PyIOError::new_err(err.to_string()))?; - Ok(Python::with_gil(|py| ret.into_py(py))) - }) + if let Some(content_type) = dict.get_item("content_type") { + let v = content_type.extract::<String>().map_err(|err| { + PyValueError::new_err(format!("content_type must be str, got {}", err)) + })?; + op = op.with_content_type(v.as_str()); } - /// Return the current stream position. - pub fn tell<'p>(&'p mut self, py: Python<'p>) -> PyResult<&'p PyAny> { - let reader = self.0.clone(); - future_into_py(py, async move { - let mut state = reader.lock().await; - let reader = state.reader().await?; - let pos = reader - .stream_position() - .await - .map_err(|err| PyIOError::new_err(err.to_string()))?; - Ok(Python::with_gil(|py| pos.into_py(py))) - }) + if let Some(content_disposition) = dict.get_item("content_disposition") { + let v = content_disposition.extract::<String>().map_err(|err| { + PyValueError::new_err(format!("content_disposition must be str, got {}", err)) + })?; + op = op.with_content_disposition(v.as_str()); } - fn __aenter__<'a>(slf: PyRef<'a, Self>, py: Python<'a>) -> PyResult<&'a PyAny> { - let slf = slf.into_py(py); - future_into_py(py, async move { Ok(slf) }) + if let Some(cache_control) = dict.get_item("cache_control") { + let v = cache_control.extract::<String>().map_err(|err| { + PyValueError::new_err(format!("cache_control must be str, got {}", err)) + })?; + op = op.with_cache_control(v.as_str()); } - fn __aexit__<'a>( - &self, - py: Python<'a>, - _exc_type: &'a PyAny, - _exc_value: &'a PyAny, - _traceback: &'a PyAny, - ) -> PyResult<&'a PyAny> { - let reader = self.0.clone(); - future_into_py(py, async move { - let mut state = reader.lock().await; - state.close(); - Ok(()) - }) - } + Ok(op) } #[pyclass(module = "opendal")] -struct AsyncLister(Arc<Mutex<od::Lister>>); +pub struct PresignedRequest(ocore::raw::PresignedRequest); -impl AsyncLister { - fn new(lister: od::Lister) -> Self { - Self(Arc::new(Mutex::new(lister))) +#[pymethods] +impl PresignedRequest { + /// Return the URL of this request. + #[getter] + pub fn url(&self) -> String { + self.0.uri().to_string() } -} -#[pymethods] -impl AsyncLister { - fn __aiter__(slf: PyRef<'_, Self>) -> PyRef<Self> { - slf - } - fn __anext__(slf: PyRefMut<'_, Self>) -> PyResult<Option<PyObject>> { - let lister = slf.0.clone(); - let fut = future_into_py(slf.py(), async move { - let mut lister = lister.lock().await; - let entry = lister.try_next().await.map_err(format_pyerr)?; - match entry { - Some(entry) => Ok(Python::with_gil(|py| Entry(entry).into_py(py))), - None => Err(PyStopAsyncIteration::new_err("stream exhausted")), + /// Return the HTTP method of this request. + #[getter] + pub fn method(&self) -> &str { + self.0.method().as_str() + } + + /// Return the HTTP headers of this request. + #[getter] + pub fn headers(&self) -> PyResult<HashMap<&str, &str>> { + let mut headers = HashMap::new(); + for (k, v) in self.0.header().iter() { + let k = k.as_str(); + let v = v.to_str().map_err(|err| Error::new_err(err.to_string()))?; + if headers.insert(k, v).is_some() { + return Err(Error::new_err("duplicate header")); } - })?; - Ok(Some(fut.into())) + } + Ok(headers) } } diff --git a/bindings/python/src/reader.rs b/bindings/python/src/reader.rs new file mode 100644 index 000000000..584f21932 --- /dev/null +++ b/bindings/python/src/reader.rs @@ -0,0 +1,286 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::io::Read; +use std::io::Seek; +use std::io::SeekFrom; +use std::sync::Arc; + +use futures::AsyncReadExt; +use futures::AsyncSeekExt; +use pyo3::exceptions::PyIOError; +use pyo3::exceptions::PyNotImplementedError; +use pyo3::exceptions::PyValueError; +use pyo3::ffi; +use pyo3::prelude::*; +use pyo3::AsPyPointer; +use pyo3_asyncio::tokio::future_into_py; +use tokio::sync::Mutex; + +use crate::*; + +/// A file-like blocking reader. +/// Can be used as a context manager. +#[pyclass(module = "opendal")] +pub struct Reader(Option<ocore::BlockingReader>); + +impl Reader { + pub fn new(reader: ocore::BlockingReader) -> Self { + Self(Some(reader)) + } + + fn as_mut(&mut self) -> PyResult<&mut ocore::BlockingReader> { + let reader = self + .0 + .as_mut() + .ok_or_else(|| PyValueError::new_err("I/O operation on closed file."))?; + Ok(reader) + } +} + +#[pymethods] +impl Reader { + /// Read and return size bytes, or if size is not given, until EOF. + #[pyo3(signature = (size=None,))] + pub fn read<'p>(&'p mut self, py: Python<'p>, size: Option<usize>) -> PyResult<&'p PyAny> { + let reader = self.as_mut()?; + let buffer = match size { + Some(size) => { + let mut buffer = vec![0; size]; + reader + .read_exact(&mut buffer) + .map_err(|err| PyIOError::new_err(err.to_string()))?; + buffer + } + None => { + let mut buffer = Vec::new(); + reader + .read_to_end(&mut buffer) + .map_err(|err| PyIOError::new_err(err.to_string()))?; + buffer + } + }; + let buffer = Buffer::from(buffer).into_py(py); + let memoryview = + unsafe { py.from_owned_ptr_or_err(ffi::PyMemoryView_FromObject(buffer.as_ptr()))? }; + Ok(memoryview) + } + + /// `Reader` doesn't support write. + /// Raises a `NotImplementedError` if called. + pub fn write(&mut self, _bs: &[u8]) -> PyResult<()> { + Err(PyNotImplementedError::new_err( + "Reader does not support write", + )) + } + + /// Change the stream position to the given byte offset. + /// offset is interpreted relative to the position indicated by `whence`. + /// The default value for whence is `SEEK_SET`. Values for `whence` are: + /// + /// * `SEEK_SET` or `0` – start of the stream (the default); offset should be zero or positive + /// * `SEEK_CUR` or `1` – current stream position; offset may be negative + /// * `SEEK_END` or `2` – end of the stream; offset is usually negative + /// + /// Return the new absolute position. + #[pyo3(signature = (pos, whence = 0))] + pub fn seek(&mut self, pos: i64, whence: u8) -> PyResult<u64> { + let whence = match whence { + 0 => SeekFrom::Start(pos as u64), + 1 => SeekFrom::Current(pos), + 2 => SeekFrom::End(pos), + _ => return Err(PyValueError::new_err("invalid whence")), + }; + let reader = self.as_mut()?; + reader + .seek(whence) + .map_err(|err| PyIOError::new_err(err.to_string())) + } + + /// Return the current stream position. + pub fn tell(&mut self) -> PyResult<u64> { + let reader = self.as_mut()?; + reader + .stream_position() + .map_err(|err| PyIOError::new_err(err.to_string())) + } + + pub fn __enter__(slf: Py<Self>) -> Py<Self> { + slf + } + + pub fn __exit__(&mut self, _exc_type: PyObject, _exc_value: PyObject, _traceback: PyObject) { + drop(self.0.take()); + } +} + +enum ReaderState { + Init { + operator: ocore::Operator, + path: String, + }, + Open(ocore::Reader), + Closed, +} + +impl ReaderState { + async fn reader(&mut self) -> PyResult<&mut ocore::Reader> { + let reader = match self { + ReaderState::Init { operator, path } => { + let reader = operator.reader(path).await.map_err(format_pyerr)?; + *self = ReaderState::Open(reader); + if let ReaderState::Open(ref mut reader) = self { + reader + } else { + unreachable!() + } + } + ReaderState::Open(ref mut reader) => reader, + ReaderState::Closed => { + return Err(PyValueError::new_err("I/O operation on closed file.")); + } + }; + Ok(reader) + } + + fn close(&mut self) { + *self = ReaderState::Closed; + } +} + +/// A file-like async reader. +/// Can be used as an async context manager. +#[pyclass(module = "opendal")] +pub struct AsyncReader(Arc<Mutex<ReaderState>>); + +impl AsyncReader { + pub fn new(operator: ocore::Operator, path: String) -> Self { + Self(Arc::new(Mutex::new(ReaderState::Init { operator, path }))) + } +} + +#[pymethods] +impl AsyncReader { + /// Read and return size bytes, or if size is not given, until EOF. + pub fn read<'p>(&'p self, py: Python<'p>, size: Option<usize>) -> PyResult<&'p PyAny> { + let reader = self.0.clone(); + future_into_py(py, async move { + let mut state = reader.lock().await; + let reader = state.reader().await?; + let buffer = match size { + Some(size) => { + let mut buffer = vec![0; size]; + reader + .read_exact(&mut buffer) + .await + .map_err(|err| PyIOError::new_err(err.to_string()))?; + buffer + } + None => { + let mut buffer = Vec::new(); + reader + .read_to_end(&mut buffer) + .await + .map_err(|err| PyIOError::new_err(err.to_string()))?; + buffer + } + }; + Python::with_gil(|py| { + let buffer = Buffer::from(buffer).into_py(py); + unsafe { + PyObject::from_owned_ptr_or_err( + py, + ffi::PyMemoryView_FromObject(buffer.as_ptr()), + ) + } + }) + }) + } + + /// `AsyncReader` doesn't support write. + /// Raises a `NotImplementedError` if called. + pub fn write<'p>(&'p mut self, py: Python<'p>, _bs: &'p [u8]) -> PyResult<&'p PyAny> { + future_into_py::<_, PyObject>(py, async move { + Err(PyNotImplementedError::new_err( + "AsyncReader does not support write", + )) + }) + } + + /// Change the stream position to the given byte offset. + /// offset is interpreted relative to the position indicated by `whence`. + /// The default value for whence is `SEEK_SET`. Values for `whence` are: + /// + /// * `SEEK_SET` or `0` – start of the stream (the default); offset should be zero or positive + /// * `SEEK_CUR` or `1` – current stream position; offset may be negative + /// * `SEEK_END` or `2` – end of the stream; offset is usually negative + /// + /// Return the new absolute position. + #[pyo3(signature = (pos, whence = 0))] + pub fn seek<'p>(&'p mut self, py: Python<'p>, pos: i64, whence: u8) -> PyResult<&'p PyAny> { + let whence = match whence { + 0 => SeekFrom::Start(pos as u64), + 1 => SeekFrom::Current(pos), + 2 => SeekFrom::End(pos), + _ => return Err(PyValueError::new_err("invalid whence")), + }; + let reader = self.0.clone(); + future_into_py(py, async move { + let mut state = reader.lock().await; + let reader = state.reader().await?; + let ret = reader + .seek(whence) + .await + .map_err(|err| PyIOError::new_err(err.to_string()))?; + Ok(Python::with_gil(|py| ret.into_py(py))) + }) + } + + /// Return the current stream position. + pub fn tell<'p>(&'p mut self, py: Python<'p>) -> PyResult<&'p PyAny> { + let reader = self.0.clone(); + future_into_py(py, async move { + let mut state = reader.lock().await; + let reader = state.reader().await?; + let pos = reader + .stream_position() + .await + .map_err(|err| PyIOError::new_err(err.to_string()))?; + Ok(Python::with_gil(|py| pos.into_py(py))) + }) + } + + fn __aenter__<'a>(slf: PyRef<'a, Self>, py: Python<'a>) -> PyResult<&'a PyAny> { + let slf = slf.into_py(py); + future_into_py(py, async move { Ok(slf) }) + } + + fn __aexit__<'a>( + &self, + py: Python<'a>, + _exc_type: &'a PyAny, + _exc_value: &'a PyAny, + _traceback: &'a PyAny, + ) -> PyResult<&'a PyAny> { + let reader = self.0.clone(); + future_into_py(py, async move { + let mut state = reader.lock().await; + state.close(); + Ok(()) + }) + } +} diff --git a/bindings/python/src/utils.rs b/bindings/python/src/utils.rs new file mode 100644 index 000000000..328499910 --- /dev/null +++ b/bindings/python/src/utils.rs @@ -0,0 +1,78 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::os::raw::c_int; + +use pyo3::create_exception; +use pyo3::exceptions::PyException; +use pyo3::exceptions::PyFileExistsError; +use pyo3::exceptions::PyFileNotFoundError; +use pyo3::exceptions::PyNotImplementedError; +use pyo3::exceptions::PyPermissionError; +use pyo3::ffi; +use pyo3::prelude::*; +use pyo3::AsPyPointer; + +use crate::*; + +create_exception!(opendal, Error, PyException, "OpenDAL related errors"); + +/// A bytes-like object that implements buffer protocol. +#[pyclass(module = "opendal")] +pub struct Buffer { + inner: Vec<u8>, +} + +#[pymethods] +impl Buffer { + unsafe fn __getbuffer__( + slf: PyRefMut<Self>, + view: *mut ffi::Py_buffer, + flags: c_int, + ) -> PyResult<()> { + let bytes = slf.inner.as_slice(); + let ret = ffi::PyBuffer_FillInfo( + view, + slf.as_ptr() as *mut _, + bytes.as_ptr() as *mut _, + bytes.len().try_into().unwrap(), + 1, // read only + flags, + ); + if ret == -1 { + return Err(PyErr::fetch(slf.py())); + } + Ok(()) + } +} + +impl From<Vec<u8>> for Buffer { + fn from(inner: Vec<u8>) -> Self { + Self { inner } + } +} + +pub fn format_pyerr(err: ocore::Error) -> PyErr { + use ocore::ErrorKind::*; + match err.kind() { + NotFound => PyFileNotFoundError::new_err(err.to_string()), + AlreadyExists => PyFileExistsError::new_err(err.to_string()), + PermissionDenied => PyPermissionError::new_err(err.to_string()), + Unsupported => PyNotImplementedError::new_err(err.to_string()), + _ => Error::new_err(err.to_string()), + } +}
