This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 318decc17 refactor(bindings/python): Refactor layout for python
bindings (#3473)
318decc17 is described below
commit 318decc17f62e375214d8dbbb8158c1f9ecfe705
Author: Xuanwo <[email protected]>
AuthorDate: Fri Nov 3 17:00:04 2023 +0800
refactor(bindings/python): Refactor layout for python bindings (#3473)
* remove submodule
Signed-off-by: Xuanwo <[email protected]>
* refactor(bindings/python): Refactor layout for python bindings
Signed-off-by: Xuanwo <[email protected]>
---------
Signed-off-by: Xuanwo <[email protected]>
---
bindings/python/src/layers.rs | 14 +-
bindings/python/src/lib.rs | 538 +-----------------------
bindings/python/src/lister.rs | 81 ++++
bindings/python/src/metadata.rs | 117 ++++++
bindings/python/src/{asyncio.rs => operator.rs} | 437 ++++++++++---------
bindings/python/src/reader.rs | 286 +++++++++++++
bindings/python/src/utils.rs | 78 ++++
7 files changed, 829 insertions(+), 722 deletions(-)
diff --git a/bindings/python/src/layers.rs b/bindings/python/src/layers.rs
index c3c9a8cc9..4b78e27e4 100644
--- a/bindings/python/src/layers.rs
+++ b/bindings/python/src/layers.rs
@@ -17,10 +17,11 @@
use std::time::Duration;
-use ::opendal as od;
use opendal::Operator;
use pyo3::prelude::*;
+use crate::*;
+
pub trait PythonLayer: Send + Sync {
fn layer(&self, op: Operator) -> Operator;
}
@@ -30,7 +31,7 @@ pub struct Layer(pub Box<dyn PythonLayer>);
#[pyclass(module = "opendal.layers", extends=Layer)]
#[derive(Clone)]
-pub struct RetryLayer(od::layers::RetryLayer);
+pub struct RetryLayer(ocore::layers::RetryLayer);
impl PythonLayer for RetryLayer {
fn layer(&self, op: Operator) -> Operator {
@@ -55,7 +56,7 @@ impl RetryLayer {
max_delay: Option<f64>,
min_delay: Option<f64>,
) -> PyResult<PyClassInitializer<Self>> {
- let mut retry = od::layers::RetryLayer::default();
+ let mut retry = ocore::layers::RetryLayer::default();
if let Some(max_times) = max_times {
retry = retry.with_max_times(max_times);
}
@@ -79,10 +80,3 @@ impl RetryLayer {
Ok(class)
}
}
-
-pub fn create_submodule(py: Python) -> PyResult<&PyModule> {
- let submod = PyModule::new(py, "layers")?;
- submod.add_class::<Layer>()?;
- submod.add_class::<RetryLayer>()?;
- Ok(submod)
-}
diff --git a/bindings/python/src/lib.rs b/bindings/python/src/lib.rs
index cacbadc9c..1903cf424 100644
--- a/bindings/python/src/lib.rs
+++ b/bindings/python/src/lib.rs
@@ -18,526 +18,25 @@
// Suppress clippy::redundant_closure warning from pyo3 generated code
#![allow(clippy::redundant_closure)]
-use std::collections::HashMap;
-use std::io::Read;
-use std::io::Seek;
-use std::io::SeekFrom;
-use std::os::raw::c_int;
-use std::str::FromStr;
-
-use ::opendal as od;
-use pyo3::create_exception;
-use pyo3::exceptions::PyException;
-use pyo3::exceptions::PyFileExistsError;
-use pyo3::exceptions::PyFileNotFoundError;
-use pyo3::exceptions::PyIOError;
-use pyo3::exceptions::PyNotImplementedError;
-use pyo3::exceptions::PyPermissionError;
-use pyo3::exceptions::PyValueError;
-use pyo3::ffi;
+// Expose the opendal rust core as `ocore`.
+// We will use `ocore::Xxx` to represent all types from the opendal rust core.
+pub use ::opendal as ocore;
use pyo3::prelude::*;
-use pyo3::types::PyDict;
-use pyo3::AsPyPointer;
-mod asyncio;
mod capability;
+pub use capability::*;
mod layers;
-
-use crate::asyncio::*;
-
-create_exception!(opendal, Error, PyException, "OpenDAL related errors");
-
-/// A bytes-like object that implements buffer protocol.
-#[pyclass(module = "opendal")]
-struct Buffer {
- inner: Vec<u8>,
-}
-
-#[pymethods]
-impl Buffer {
- unsafe fn __getbuffer__(
- slf: PyRefMut<Self>,
- view: *mut ffi::Py_buffer,
- flags: c_int,
- ) -> PyResult<()> {
- let bytes = slf.inner.as_slice();
- let ret = ffi::PyBuffer_FillInfo(
- view,
- slf.as_ptr() as *mut _,
- bytes.as_ptr() as *mut _,
- bytes.len().try_into().unwrap(),
- 1, // read only
- flags,
- );
- if ret == -1 {
- return Err(PyErr::fetch(slf.py()));
- }
- Ok(())
- }
-}
-
-impl From<Vec<u8>> for Buffer {
- fn from(inner: Vec<u8>) -> Self {
- Self { inner }
- }
-}
-
-fn build_operator(
- scheme: od::Scheme,
- map: HashMap<String, String>,
- blocking: bool,
-) -> PyResult<od::Operator> {
- let mut op = od::Operator::via_map(scheme, map).map_err(format_pyerr)?;
- if blocking && !op.info().full_capability().blocking {
- let runtime = pyo3_asyncio::tokio::get_runtime();
- let _guard = runtime.enter();
- op = op.layer(od::layers::BlockingLayer::create().expect("blocking
layer must be created"));
- }
-
- Ok(op)
-}
-
-/// `Operator` is the entry for all public blocking APIs
-///
-/// Create a new blocking `Operator` with the given `scheme` and
options(`**kwargs`).
-#[pyclass(module = "opendal")]
-struct Operator(od::BlockingOperator);
-
-#[pymethods]
-impl Operator {
- #[new]
- #[pyo3(signature = (scheme, *, **map))]
- pub fn new(scheme: &str, map: Option<&PyDict>) -> PyResult<Self> {
- let scheme = od::Scheme::from_str(scheme)
- .map_err(|err| {
- od::Error::new(od::ErrorKind::Unexpected, "unsupported
scheme").set_source(err)
- })
- .map_err(format_pyerr)?;
- let map = map
- .map(|v| {
- v.extract::<HashMap<String, String>>()
- .expect("must be valid hashmap")
- })
- .unwrap_or_default();
-
- Ok(Operator(build_operator(scheme, map, true)?.blocking()))
- }
-
- /// Add new layers upon existing operator
- pub fn layer(&self, layer: &layers::Layer) -> PyResult<Self> {
- let op = layer.0.layer(self.0.clone().into());
- Ok(Self(op.blocking()))
- }
-
- /// Read the whole path into bytes.
- pub fn read<'p>(&'p self, py: Python<'p>, path: &str) -> PyResult<&'p
PyAny> {
- let buffer = self
- .0
- .read(path)
- .map_err(format_pyerr)
- .map(Buffer::from)?
- .into_py(py);
- let memoryview =
- unsafe {
py.from_owned_ptr_or_err(ffi::PyMemoryView_FromObject(buffer.as_ptr()))? };
- Ok(memoryview)
- }
-
- /// Open a file-like reader for the given path.
- pub fn open_reader(&self, path: &str) -> PyResult<Reader> {
- self.0
- .reader(path)
- .map(|reader| Reader(Some(reader)))
- .map_err(format_pyerr)
- }
-
- /// Write bytes into given path.
- #[pyo3(signature = (path, bs, **kwargs))]
- pub fn write(&self, path: &str, bs: Vec<u8>, kwargs: Option<&PyDict>) ->
PyResult<()> {
- let opwrite = build_opwrite(kwargs)?;
- let mut write = self.0.write_with(path, bs).append(opwrite.append());
- if let Some(buffer) = opwrite.buffer() {
- write = write.buffer(buffer);
- }
- if let Some(content_type) = opwrite.content_type() {
- write = write.content_type(content_type);
- }
- if let Some(content_disposition) = opwrite.content_disposition() {
- write = write.content_disposition(content_disposition);
- }
- if let Some(cache_control) = opwrite.cache_control() {
- write = write.cache_control(cache_control);
- }
-
- write.call().map_err(format_pyerr)
- }
-
- /// Get current path's metadata **without cache** directly.
- pub fn stat(&self, path: &str) -> PyResult<Metadata> {
- self.0.stat(path).map_err(format_pyerr).map(Metadata)
- }
-
- /// Copy source to target.
- pub fn copy(&self, source: &str, target: &str) -> PyResult<()> {
- self.0.copy(source, target).map_err(format_pyerr)
- }
-
- /// Rename filename.
- pub fn rename(&self, source: &str, target: &str) -> PyResult<()> {
- self.0.rename(source, target).map_err(format_pyerr)
- }
-
- /// Remove all file
- pub fn remove_all(&self, path: &str) -> PyResult<()> {
- self.0.remove_all(path).map_err(format_pyerr)
- }
-
- /// Create a dir at given path.
- ///
- /// # Notes
- ///
- /// To indicate that a path is a directory, it is compulsory to include
- /// a trailing / in the path. Failure to do so may result in
- /// `NotADirectory` error being returned by OpenDAL.
- ///
- /// # Behavior
- ///
- /// - Create on existing dir will succeed.
- /// - Create dir is always recursive, works like `mkdir -p`
- pub fn create_dir(&self, path: &str) -> PyResult<()> {
- self.0.create_dir(path).map_err(format_pyerr)
- }
-
- /// Delete given path.
- ///
- /// # Notes
- ///
- /// - Delete not existing error won't return errors.
- pub fn delete(&self, path: &str) -> PyResult<()> {
- self.0.delete(path).map_err(format_pyerr)
- }
-
- /// List current dir path.
- pub fn list(&self, path: &str) -> PyResult<BlockingLister> {
- Ok(BlockingLister(self.0.lister(path).map_err(format_pyerr)?))
- }
-
- /// List dir in flat way.
- pub fn scan(&self, path: &str) -> PyResult<BlockingLister> {
- Ok(BlockingLister(
- self.0
- .lister_with(path)
- .delimiter("")
- .call()
- .map_err(format_pyerr)?,
- ))
- }
-
- pub fn capability(&self) -> PyResult<capability::Capability> {
- Ok(capability::Capability::new(self.0.info().full_capability()))
- }
-
- fn __repr__(&self) -> String {
- let info = self.0.info();
- let name = info.name();
- if name.is_empty() {
- format!("Operator(\"{}\", root=\"{}\")", info.scheme(),
info.root())
- } else {
- format!(
- "Operator(\"{}\", root=\"{}\", name=\"{name}\")",
- info.scheme(),
- info.root()
- )
- }
- }
-}
-
-/// A file-like blocking reader.
-/// Can be used as a context manager.
-#[pyclass(module = "opendal")]
-struct Reader(Option<od::BlockingReader>);
-
-impl Reader {
- fn as_mut(&mut self) -> PyResult<&mut od::BlockingReader> {
- let reader = self
- .0
- .as_mut()
- .ok_or_else(|| PyValueError::new_err("I/O operation on closed
file."))?;
- Ok(reader)
- }
-}
-
-#[pymethods]
-impl Reader {
- /// Read and return size bytes, or if size is not given, until EOF.
- #[pyo3(signature = (size=None,))]
- pub fn read<'p>(&'p mut self, py: Python<'p>, size: Option<usize>) ->
PyResult<&'p PyAny> {
- let reader = self.as_mut()?;
- let buffer = match size {
- Some(size) => {
- let mut buffer = vec![0; size];
- reader
- .read_exact(&mut buffer)
- .map_err(|err| PyIOError::new_err(err.to_string()))?;
- buffer
- }
- None => {
- let mut buffer = Vec::new();
- reader
- .read_to_end(&mut buffer)
- .map_err(|err| PyIOError::new_err(err.to_string()))?;
- buffer
- }
- };
- let buffer = Buffer::from(buffer).into_py(py);
- let memoryview =
- unsafe {
py.from_owned_ptr_or_err(ffi::PyMemoryView_FromObject(buffer.as_ptr()))? };
- Ok(memoryview)
- }
-
- /// `Reader` doesn't support write.
- /// Raises a `NotImplementedError` if called.
- pub fn write(&mut self, _bs: &[u8]) -> PyResult<()> {
- Err(PyNotImplementedError::new_err(
- "Reader does not support write",
- ))
- }
-
- /// Change the stream position to the given byte offset.
- /// offset is interpreted relative to the position indicated by `whence`.
- /// The default value for whence is `SEEK_SET`. Values for `whence` are:
- ///
- /// * `SEEK_SET` or `0` – start of the stream (the default); offset should
be zero or positive
- /// * `SEEK_CUR` or `1` – current stream position; offset may be negative
- /// * `SEEK_END` or `2` – end of the stream; offset is usually negative
- ///
- /// Return the new absolute position.
- #[pyo3(signature = (pos, whence = 0))]
- pub fn seek(&mut self, pos: i64, whence: u8) -> PyResult<u64> {
- let whence = match whence {
- 0 => SeekFrom::Start(pos as u64),
- 1 => SeekFrom::Current(pos),
- 2 => SeekFrom::End(pos),
- _ => return Err(PyValueError::new_err("invalid whence")),
- };
- let reader = self.as_mut()?;
- reader
- .seek(whence)
- .map_err(|err| PyIOError::new_err(err.to_string()))
- }
-
- /// Return the current stream position.
- pub fn tell(&mut self) -> PyResult<u64> {
- let reader = self.as_mut()?;
- reader
- .stream_position()
- .map_err(|err| PyIOError::new_err(err.to_string()))
- }
-
- pub fn __enter__(slf: Py<Self>) -> Py<Self> {
- slf
- }
-
- pub fn __exit__(&mut self, _exc_type: PyObject, _exc_value: PyObject,
_traceback: PyObject) {
- drop(self.0.take());
- }
-}
-
-#[pyclass(unsendable, module = "opendal")]
-struct BlockingLister(od::BlockingLister);
-
-#[pymethods]
-impl BlockingLister {
- fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
- slf
- }
- fn __next__(mut slf: PyRefMut<'_, Self>) -> PyResult<Option<PyObject>> {
- match slf.0.next() {
- Some(Ok(entry)) => Ok(Some(Entry(entry).into_py(slf.py()))),
- Some(Err(err)) => {
- let pyerr = format_pyerr(err);
- Err(pyerr)
- }
- None => Ok(None),
- }
- }
-}
-
-#[pyclass(module = "opendal")]
-struct Entry(od::Entry);
-
-#[pymethods]
-impl Entry {
- /// Path of entry. Path is relative to operator's root.
- #[getter]
- pub fn path(&self) -> &str {
- self.0.path()
- }
-
- fn __str__(&self) -> &str {
- self.0.path()
- }
-
- fn __repr__(&self) -> String {
- format!("Entry({:?})", self.0.path())
- }
-}
-
-#[pyclass(module = "opendal")]
-struct Metadata(od::Metadata);
-
-#[pymethods]
-impl Metadata {
- #[getter]
- pub fn content_disposition(&self) -> Option<&str> {
- self.0.content_disposition()
- }
-
- /// Content length of this entry.
- #[getter]
- pub fn content_length(&self) -> u64 {
- self.0.content_length()
- }
-
- /// Content MD5 of this entry.
- #[getter]
- pub fn content_md5(&self) -> Option<&str> {
- self.0.content_md5()
- }
-
- /// Content Type of this entry.
- #[getter]
- pub fn content_type(&self) -> Option<&str> {
- self.0.content_type()
- }
-
- /// ETag of this entry.
- #[getter]
- pub fn etag(&self) -> Option<&str> {
- self.0.etag()
- }
-
- /// mode represent this entry's mode.
- #[getter]
- pub fn mode(&self) -> EntryMode {
- EntryMode(self.0.mode())
- }
-}
-
-#[pyclass(module = "opendal")]
-struct EntryMode(od::EntryMode);
-
-#[pymethods]
-impl EntryMode {
- /// Returns `True` if this is a file.
- pub fn is_file(&self) -> bool {
- self.0.is_file()
- }
-
- /// Returns `True` if this is a directory.
- pub fn is_dir(&self) -> bool {
- self.0.is_dir()
- }
-
- pub fn __repr__(&self) -> &'static str {
- match self.0 {
- od::EntryMode::FILE => "EntryMode.FILE",
- od::EntryMode::DIR => "EntryMode.DIR",
- od::EntryMode::Unknown => "EntryMode.UNKNOWN",
- }
- }
-}
-
-#[pyclass(module = "opendal")]
-struct PresignedRequest(od::raw::PresignedRequest);
-
-#[pymethods]
-impl PresignedRequest {
- /// Return the URL of this request.
- #[getter]
- pub fn url(&self) -> String {
- self.0.uri().to_string()
- }
-
- /// Return the HTTP method of this request.
- #[getter]
- pub fn method(&self) -> &str {
- self.0.method().as_str()
- }
-
- /// Return the HTTP headers of this request.
- #[getter]
- pub fn headers(&self) -> PyResult<HashMap<&str, &str>> {
- let mut headers = HashMap::new();
- for (k, v) in self.0.header().iter() {
- let k = k.as_str();
- let v = v.to_str().map_err(|err| Error::new_err(err.to_string()))?;
- if headers.insert(k, v).is_some() {
- return Err(Error::new_err("duplicate header"));
- }
- }
- Ok(headers)
- }
-}
-
-fn format_pyerr(err: od::Error) -> PyErr {
- use od::ErrorKind::*;
- match err.kind() {
- NotFound => PyFileNotFoundError::new_err(err.to_string()),
- AlreadyExists => PyFileExistsError::new_err(err.to_string()),
- PermissionDenied => PyPermissionError::new_err(err.to_string()),
- Unsupported => PyNotImplementedError::new_err(err.to_string()),
- _ => Error::new_err(err.to_string()),
- }
-}
-
-/// recognize OpWrite-equivalent options passed as python dict
-pub(crate) fn build_opwrite(kwargs: Option<&PyDict>) ->
PyResult<od::raw::OpWrite> {
- use od::raw::OpWrite;
- let mut op = OpWrite::new();
-
- let dict = if let Some(kwargs) = kwargs {
- kwargs
- } else {
- return Ok(op);
- };
-
- if let Some(append) = dict.get_item("append") {
- let v = append
- .extract::<bool>()
- .map_err(|err| PyValueError::new_err(format!("append must be bool,
got {}", err)))?;
- op = op.with_append(v);
- }
-
- if let Some(buffer) = dict.get_item("buffer") {
- let v = buffer
- .extract::<usize>()
- .map_err(|err| PyValueError::new_err(format!("buffer must be
usize, got {}", err)))?;
- op = op.with_buffer(v);
- }
-
- if let Some(content_type) = dict.get_item("content_type") {
- let v = content_type.extract::<String>().map_err(|err| {
- PyValueError::new_err(format!("content_type must be str, got {}",
err))
- })?;
- op = op.with_content_type(v.as_str());
- }
-
- if let Some(content_disposition) = dict.get_item("content_disposition") {
- let v = content_disposition.extract::<String>().map_err(|err| {
- PyValueError::new_err(format!("content_disposition must be str,
got {}", err))
- })?;
- op = op.with_content_disposition(v.as_str());
- }
-
- if let Some(cache_control) = dict.get_item("cache_control") {
- let v = cache_control.extract::<String>().map_err(|err| {
- PyValueError::new_err(format!("cache_control must be str, got {}",
err))
- })?;
- op = op.with_cache_control(v.as_str());
- }
-
- Ok(op)
-}
+pub use layers::*;
+mod lister;
+pub use lister::*;
+mod metadata;
+pub use metadata::*;
+mod operator;
+pub use operator::*;
+mod reader;
+pub use reader::*;
+mod utils;
+pub use utils::*;
/// OpenDAL Python binding
///
@@ -580,10 +79,13 @@ fn _opendal(py: Python, m: &PyModule) -> PyResult<()> {
m.add_class::<EntryMode>()?;
m.add_class::<Metadata>()?;
m.add_class::<PresignedRequest>()?;
- m.add_class::<capability::Capability>()?;
+ m.add_class::<Capability>()?;
m.add("Error", py.get_type::<Error>())?;
- let layers_module = layers::create_submodule(py)?;
+ // Layer module
+ let layers_module = PyModule::new(py, "layers")?;
+ layers_module.add_class::<Layer>()?;
+ layers_module.add_class::<RetryLayer>()?;
m.add_submodule(layers_module)?;
py.import("sys")?
.getattr("modules")?
diff --git a/bindings/python/src/lister.rs b/bindings/python/src/lister.rs
new file mode 100644
index 000000000..d2e7fe89c
--- /dev/null
+++ b/bindings/python/src/lister.rs
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use futures::TryStreamExt;
+use std::sync::Arc;
+
+use pyo3::exceptions::PyStopAsyncIteration;
+use pyo3::prelude::*;
+use pyo3_asyncio::tokio::future_into_py;
+use tokio::sync::Mutex;
+
+use crate::*;
+
+#[pyclass(unsendable, module = "opendal")]
+pub struct BlockingLister(ocore::BlockingLister);
+
+impl BlockingLister {
+ /// Create a new blocking lister.
+ pub fn new(inner: ocore::BlockingLister) -> Self {
+ Self(inner)
+ }
+}
+
+#[pymethods]
+impl BlockingLister {
+ fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
+ slf
+ }
+ fn __next__(mut slf: PyRefMut<'_, Self>) -> PyResult<Option<PyObject>> {
+ match slf.0.next() {
+ Some(Ok(entry)) => Ok(Some(Entry::new(entry).into_py(slf.py()))),
+ Some(Err(err)) => {
+ let pyerr = format_pyerr(err);
+ Err(pyerr)
+ }
+ None => Ok(None),
+ }
+ }
+}
+
+#[pyclass(module = "opendal")]
+pub struct AsyncLister(Arc<Mutex<ocore::Lister>>);
+
+impl AsyncLister {
+ pub fn new(lister: ocore::Lister) -> Self {
+ Self(Arc::new(Mutex::new(lister)))
+ }
+}
+
+#[pymethods]
+impl AsyncLister {
+ fn __aiter__(slf: PyRef<'_, Self>) -> PyRef<Self> {
+ slf
+ }
+ fn __anext__(slf: PyRefMut<'_, Self>) -> PyResult<Option<PyObject>> {
+ let lister = slf.0.clone();
+ let fut = future_into_py(slf.py(), async move {
+ let mut lister = lister.lock().await;
+ let entry = lister.try_next().await.map_err(format_pyerr)?;
+ match entry {
+ Some(entry) => Ok(Python::with_gil(|py|
Entry::new(entry).into_py(py))),
+ None => Err(PyStopAsyncIteration::new_err("stream exhausted")),
+ }
+ })?;
+ Ok(Some(fut.into()))
+ }
+}
diff --git a/bindings/python/src/metadata.rs b/bindings/python/src/metadata.rs
new file mode 100644
index 000000000..e03eeda18
--- /dev/null
+++ b/bindings/python/src/metadata.rs
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use pyo3::prelude::*;
+
+use crate::*;
+
+#[pyclass(module = "opendal")]
+pub struct Entry(ocore::Entry);
+
+impl Entry {
+ pub fn new(entry: ocore::Entry) -> Self {
+ Self(entry)
+ }
+}
+
+#[pymethods]
+impl Entry {
+ /// Path of entry. Path is relative to operator's root.
+ #[getter]
+ pub fn path(&self) -> &str {
+ self.0.path()
+ }
+
+ fn __str__(&self) -> &str {
+ self.0.path()
+ }
+
+ fn __repr__(&self) -> String {
+ format!("Entry({:?})", self.0.path())
+ }
+}
+
+#[pyclass(module = "opendal")]
+pub struct Metadata(ocore::Metadata);
+
+impl Metadata {
+ pub fn new(meta: ocore::Metadata) -> Self {
+ Self(meta)
+ }
+}
+
+#[pymethods]
+impl Metadata {
+ #[getter]
+ pub fn content_disposition(&self) -> Option<&str> {
+ self.0.content_disposition()
+ }
+
+ /// Content length of this entry.
+ #[getter]
+ pub fn content_length(&self) -> u64 {
+ self.0.content_length()
+ }
+
+ /// Content MD5 of this entry.
+ #[getter]
+ pub fn content_md5(&self) -> Option<&str> {
+ self.0.content_md5()
+ }
+
+ /// Content Type of this entry.
+ #[getter]
+ pub fn content_type(&self) -> Option<&str> {
+ self.0.content_type()
+ }
+
+ /// ETag of this entry.
+ #[getter]
+ pub fn etag(&self) -> Option<&str> {
+ self.0.etag()
+ }
+
+ /// Mode of this entry.
+ #[getter]
+ pub fn mode(&self) -> EntryMode {
+ EntryMode(self.0.mode())
+ }
+}
+
+#[pyclass(module = "opendal")]
+pub struct EntryMode(ocore::EntryMode);
+
+#[pymethods]
+impl EntryMode {
+ /// Returns `True` if this is a file.
+ pub fn is_file(&self) -> bool {
+ self.0.is_file()
+ }
+
+ /// Returns `True` if this is a directory.
+ pub fn is_dir(&self) -> bool {
+ self.0.is_dir()
+ }
+
+ pub fn __repr__(&self) -> &'static str {
+ match self.0 {
+ ocore::EntryMode::FILE => "EntryMode.FILE",
+ ocore::EntryMode::DIR => "EntryMode.DIR",
+ ocore::EntryMode::Unknown => "EntryMode.UNKNOWN",
+ }
+ }
+}
diff --git a/bindings/python/src/asyncio.rs b/bindings/python/src/operator.rs
similarity index 52%
rename from bindings/python/src/asyncio.rs
rename to bindings/python/src/operator.rs
index 2ef6253d6..0c17b2c31 100644
--- a/bindings/python/src/asyncio.rs
+++ b/bindings/python/src/operator.rs
@@ -16,52 +16,204 @@
// under the License.
use std::collections::HashMap;
-use std::io::SeekFrom;
use std::str::FromStr;
-use std::sync::Arc;
use std::time::Duration;
-use ::opendal as od;
-use futures::TryStreamExt;
-use pyo3::exceptions::PyIOError;
-use pyo3::exceptions::PyNotImplementedError;
-use pyo3::exceptions::PyStopAsyncIteration;
use pyo3::exceptions::PyValueError;
use pyo3::ffi;
use pyo3::prelude::*;
-use pyo3::types::PyBytes;
-use pyo3::types::PyDict;
+use pyo3::types::{PyBytes, PyDict};
use pyo3::AsPyPointer;
use pyo3_asyncio::tokio::future_into_py;
-use tokio::io::AsyncReadExt;
-use tokio::io::AsyncSeekExt;
-use tokio::sync::Mutex;
-use crate::build_operator;
-use crate::build_opwrite;
-use crate::format_pyerr;
-use crate::layers;
-use crate::Buffer;
-use crate::Entry;
-use crate::Metadata;
-use crate::PresignedRequest;
+use crate::*;
+
+fn build_operator(
+ scheme: ocore::Scheme,
+ map: HashMap<String, String>,
+ blocking: bool,
+) -> PyResult<ocore::Operator> {
+ let mut op = ocore::Operator::via_map(scheme, map).map_err(format_pyerr)?;
+ if blocking && !op.info().full_capability().blocking {
+ let runtime = pyo3_asyncio::tokio::get_runtime();
+ let _guard = runtime.enter();
+ op = op
+ .layer(ocore::layers::BlockingLayer::create().expect("blocking
layer must be created"));
+ }
+
+ Ok(op)
+}
+
+/// `Operator` is the entry for all public blocking APIs
+///
+/// Create a new blocking `Operator` with the given `scheme` and options(`**kwargs`).
+#[pyclass(module = "opendal")]
+pub struct Operator(ocore::BlockingOperator);
+
+#[pymethods]
+impl Operator {
+ #[new]
+ #[pyo3(signature = (scheme, *, **map))]
+ pub fn new(scheme: &str, map: Option<&PyDict>) -> PyResult<Self> {
+ let scheme = ocore::Scheme::from_str(scheme)
+ .map_err(|err| {
+ ocore::Error::new(ocore::ErrorKind::Unexpected, "unsupported
scheme")
+ .set_source(err)
+ })
+ .map_err(format_pyerr)?;
+ let map = map
+ .map(|v| {
+ v.extract::<HashMap<String, String>>()
+ .expect("must be valid hashmap")
+ })
+ .unwrap_or_default();
-use crate::capability;
+ Ok(Operator(build_operator(scheme, map, true)?.blocking()))
+ }
+
+ /// Add new layers upon existing operator
+ pub fn layer(&self, layer: &layers::Layer) -> PyResult<Self> {
+ let op = layer.0.layer(self.0.clone().into());
+ Ok(Self(op.blocking()))
+ }
+
+ /// Read the whole path into bytes.
+ pub fn read<'p>(&'p self, py: Python<'p>, path: &str) -> PyResult<&'p
PyAny> {
+ let buffer = self
+ .0
+ .read(path)
+ .map_err(format_pyerr)
+ .map(Buffer::from)?
+ .into_py(py);
+ let memoryview =
+ unsafe {
py.from_owned_ptr_or_err(ffi::PyMemoryView_FromObject(buffer.as_ptr()))? };
+ Ok(memoryview)
+ }
+
+ /// Open a file-like reader for the given path.
+ pub fn open_reader(&self, path: &str) -> PyResult<Reader> {
+ let r = self.0.reader(path).map_err(format_pyerr)?;
+
+ Ok(Reader::new(r))
+ }
+
+ /// Write bytes into given path.
+ #[pyo3(signature = (path, bs, **kwargs))]
+ pub fn write(&self, path: &str, bs: Vec<u8>, kwargs: Option<&PyDict>) ->
PyResult<()> {
+ let opwrite = build_opwrite(kwargs)?;
+ let mut write = self.0.write_with(path, bs).append(opwrite.append());
+ if let Some(buffer) = opwrite.buffer() {
+ write = write.buffer(buffer);
+ }
+ if let Some(content_type) = opwrite.content_type() {
+ write = write.content_type(content_type);
+ }
+ if let Some(content_disposition) = opwrite.content_disposition() {
+ write = write.content_disposition(content_disposition);
+ }
+ if let Some(cache_control) = opwrite.cache_control() {
+ write = write.cache_control(cache_control);
+ }
+
+ write.call().map_err(format_pyerr)
+ }
+
+ /// Get current path's metadata **without cache** directly.
+ pub fn stat(&self, path: &str) -> PyResult<Metadata> {
+ self.0.stat(path).map_err(format_pyerr).map(Metadata::new)
+ }
+
+ /// Copy source to target.
+ pub fn copy(&self, source: &str, target: &str) -> PyResult<()> {
+ self.0.copy(source, target).map_err(format_pyerr)
+ }
+
+ /// Rename source to target.
+ pub fn rename(&self, source: &str, target: &str) -> PyResult<()> {
+ self.0.rename(source, target).map_err(format_pyerr)
+ }
+
+ /// Remove all files.
+ pub fn remove_all(&self, path: &str) -> PyResult<()> {
+ self.0.remove_all(path).map_err(format_pyerr)
+ }
+
+ /// Create a dir at given path.
+ ///
+ /// # Notes
+ ///
+ /// To indicate that a path is a directory, it is compulsory to include
+ /// a trailing / in the path. Failure to do so may result in
+ /// `NotADirectory` error being returned by OpenDAL.
+ ///
+ /// # Behavior
+ ///
+ /// - Create on existing dir will succeed.
+ /// - Create dir is always recursive, works like `mkdir -p`
+ pub fn create_dir(&self, path: &str) -> PyResult<()> {
+ self.0.create_dir(path).map_err(format_pyerr)
+ }
+
+ /// Delete given path.
+ ///
+ /// # Notes
+ ///
+ /// - Deleting a path that does not exist won't return an error.
+ pub fn delete(&self, path: &str) -> PyResult<()> {
+ self.0.delete(path).map_err(format_pyerr)
+ }
+
+ /// List current dir path.
+ pub fn list(&self, path: &str) -> PyResult<BlockingLister> {
+ let l = self.0.lister(path).map_err(format_pyerr)?;
+ Ok(BlockingLister::new(l))
+ }
+
+ /// List dir in flat way.
+ pub fn scan(&self, path: &str) -> PyResult<BlockingLister> {
+ let l = self
+ .0
+ .lister_with(path)
+ .delimiter("")
+ .call()
+ .map_err(format_pyerr)?;
+ Ok(BlockingLister::new(l))
+ }
+
+ pub fn capability(&self) -> PyResult<capability::Capability> {
+ Ok(capability::Capability::new(self.0.info().full_capability()))
+ }
+
+ fn __repr__(&self) -> String {
+ let info = self.0.info();
+ let name = info.name();
+ if name.is_empty() {
+ format!("Operator(\"{}\", root=\"{}\")", info.scheme(),
info.root())
+ } else {
+ format!(
+ "Operator(\"{}\", root=\"{}\", name=\"{name}\")",
+ info.scheme(),
+ info.root()
+ )
+ }
+ }
+}
/// `AsyncOperator` is the entry for all public async APIs
///
/// Create a new `AsyncOperator` with the given `scheme` and
options(`**kwargs`).
#[pyclass(module = "opendal")]
-pub struct AsyncOperator(od::Operator);
+pub struct AsyncOperator(ocore::Operator);
#[pymethods]
impl AsyncOperator {
#[new]
#[pyo3(signature = (scheme, *, **map))]
pub fn new(scheme: &str, map: Option<&PyDict>) -> PyResult<Self> {
- let scheme = od::Scheme::from_str(scheme)
+ let scheme = ocore::Scheme::from_str(scheme)
.map_err(|err| {
- od::Error::new(od::ErrorKind::Unexpected, "unsupported
scheme").set_source(err)
+ ocore::Error::new(ocore::ErrorKind::Unexpected, "unsupported
scheme")
+ .set_source(err)
})
.map_err(format_pyerr)?;
let map = map
@@ -99,10 +251,7 @@ impl AsyncOperator {
/// Open a file-like reader for the given path.
pub fn open_reader(&self, path: String) -> PyResult<AsyncReader> {
- Ok(AsyncReader::new(ReaderState::Init {
- operator: self.0.clone(),
- path,
- }))
+ Ok(AsyncReader::new(self.0.clone(), path))
}
/// Write bytes into given path.
@@ -139,7 +288,11 @@ impl AsyncOperator {
pub fn stat<'p>(&'p self, py: Python<'p>, path: String) -> PyResult<&'p
PyAny> {
let this = self.0.clone();
future_into_py(py, async move {
- let res: Metadata =
this.stat(&path).await.map_err(format_pyerr).map(Metadata)?;
+ let res: Metadata = this
+ .stat(&path)
+ .await
+ .map_err(format_pyerr)
+ .map(Metadata::new)?;
Ok(res)
})
@@ -315,187 +468,83 @@ impl AsyncOperator {
}
}
-enum ReaderState {
- Init {
- operator: od::Operator,
- path: String,
- },
- Open(od::Reader),
- Closed,
-}
-
-impl ReaderState {
- async fn reader(&mut self) -> PyResult<&mut od::Reader> {
- let reader = match self {
- ReaderState::Init { operator, path } => {
- let reader =
operator.reader(path).await.map_err(format_pyerr)?;
- *self = ReaderState::Open(reader);
- if let ReaderState::Open(ref mut reader) = self {
- reader
- } else {
- unreachable!()
- }
- }
- ReaderState::Open(ref mut reader) => reader,
- ReaderState::Closed => {
- return Err(PyValueError::new_err("I/O operation on closed
file."));
- }
- };
- Ok(reader)
- }
-
- fn close(&mut self) {
- *self = ReaderState::Closed;
- }
-}
-
-/// A file-like async reader.
-/// Can be used as an async context manager.
-#[pyclass(module = "opendal")]
-pub struct AsyncReader(Arc<Mutex<ReaderState>>);
-
-impl AsyncReader {
- fn new(reader: ReaderState) -> Self {
- Self(Arc::new(Mutex::new(reader)))
- }
-}
-
-#[pymethods]
-impl AsyncReader {
- /// Read and return size bytes, or if size is not given, until EOF.
- pub fn read<'p>(&'p self, py: Python<'p>, size: Option<usize>) ->
PyResult<&'p PyAny> {
- let reader = self.0.clone();
- future_into_py(py, async move {
- let mut state = reader.lock().await;
- let reader = state.reader().await?;
- let buffer = match size {
- Some(size) => {
- let mut buffer = vec![0; size];
- reader
- .read_exact(&mut buffer)
- .await
- .map_err(|err| PyIOError::new_err(err.to_string()))?;
- buffer
- }
- None => {
- let mut buffer = Vec::new();
- reader
- .read_to_end(&mut buffer)
- .await
- .map_err(|err| PyIOError::new_err(err.to_string()))?;
- buffer
- }
- };
- Python::with_gil(|py| {
- let buffer = Buffer::from(buffer).into_py(py);
- unsafe {
- PyObject::from_owned_ptr_or_err(
- py,
- ffi::PyMemoryView_FromObject(buffer.as_ptr()),
- )
- }
- })
- })
+/// Recognize `OpWrite`-equivalent options passed as a Python dict.
+pub(crate) fn build_opwrite(kwargs: Option<&PyDict>) ->
PyResult<ocore::raw::OpWrite> {
+ use ocore::raw::OpWrite;
+ let mut op = OpWrite::new();
+
+ let dict = if let Some(kwargs) = kwargs {
+ kwargs
+ } else {
+ return Ok(op);
+ };
+
+ if let Some(append) = dict.get_item("append") {
+ let v = append
+ .extract::<bool>()
+ .map_err(|err| PyValueError::new_err(format!("append must be bool,
got {}", err)))?;
+ op = op.with_append(v);
}
- /// `AsyncReader` doesn't support write.
- /// Raises a `NotImplementedError` if called.
- pub fn write<'p>(&'p mut self, py: Python<'p>, _bs: &'p [u8]) ->
PyResult<&'p PyAny> {
- future_into_py::<_, PyObject>(py, async move {
- Err(PyNotImplementedError::new_err(
- "AsyncReader does not support write",
- ))
- })
+ if let Some(buffer) = dict.get_item("buffer") {
+ let v = buffer
+ .extract::<usize>()
+ .map_err(|err| PyValueError::new_err(format!("buffer must be
usize, got {}", err)))?;
+ op = op.with_buffer(v);
}
- /// Change the stream position to the given byte offset.
- /// offset is interpreted relative to the position indicated by `whence`.
- /// The default value for whence is `SEEK_SET`. Values for `whence` are:
- ///
- /// * `SEEK_SET` or `0` – start of the stream (the default); offset should
be zero or positive
- /// * `SEEK_CUR` or `1` – current stream position; offset may be negative
- /// * `SEEK_END` or `2` – end of the stream; offset is usually negative
- ///
- /// Return the new absolute position.
- #[pyo3(signature = (pos, whence = 0))]
- pub fn seek<'p>(&'p mut self, py: Python<'p>, pos: i64, whence: u8) ->
PyResult<&'p PyAny> {
- let whence = match whence {
- 0 => SeekFrom::Start(pos as u64),
- 1 => SeekFrom::Current(pos),
- 2 => SeekFrom::End(pos),
- _ => return Err(PyValueError::new_err("invalid whence")),
- };
- let reader = self.0.clone();
- future_into_py(py, async move {
- let mut state = reader.lock().await;
- let reader = state.reader().await?;
- let ret = reader
- .seek(whence)
- .await
- .map_err(|err| PyIOError::new_err(err.to_string()))?;
- Ok(Python::with_gil(|py| ret.into_py(py)))
- })
+ if let Some(content_type) = dict.get_item("content_type") {
+ let v = content_type.extract::<String>().map_err(|err| {
+ PyValueError::new_err(format!("content_type must be str, got {}",
err))
+ })?;
+ op = op.with_content_type(v.as_str());
}
- /// Return the current stream position.
- pub fn tell<'p>(&'p mut self, py: Python<'p>) -> PyResult<&'p PyAny> {
- let reader = self.0.clone();
- future_into_py(py, async move {
- let mut state = reader.lock().await;
- let reader = state.reader().await?;
- let pos = reader
- .stream_position()
- .await
- .map_err(|err| PyIOError::new_err(err.to_string()))?;
- Ok(Python::with_gil(|py| pos.into_py(py)))
- })
+ if let Some(content_disposition) = dict.get_item("content_disposition") {
+ let v = content_disposition.extract::<String>().map_err(|err| {
+ PyValueError::new_err(format!("content_disposition must be str,
got {}", err))
+ })?;
+ op = op.with_content_disposition(v.as_str());
}
- fn __aenter__<'a>(slf: PyRef<'a, Self>, py: Python<'a>) -> PyResult<&'a
PyAny> {
- let slf = slf.into_py(py);
- future_into_py(py, async move { Ok(slf) })
+ if let Some(cache_control) = dict.get_item("cache_control") {
+ let v = cache_control.extract::<String>().map_err(|err| {
+ PyValueError::new_err(format!("cache_control must be str, got {}",
err))
+ })?;
+ op = op.with_cache_control(v.as_str());
}
- fn __aexit__<'a>(
- &self,
- py: Python<'a>,
- _exc_type: &'a PyAny,
- _exc_value: &'a PyAny,
- _traceback: &'a PyAny,
- ) -> PyResult<&'a PyAny> {
- let reader = self.0.clone();
- future_into_py(py, async move {
- let mut state = reader.lock().await;
- state.close();
- Ok(())
- })
- }
+ Ok(op)
}
#[pyclass(module = "opendal")]
-struct AsyncLister(Arc<Mutex<od::Lister>>);
+pub struct PresignedRequest(ocore::raw::PresignedRequest);
-impl AsyncLister {
- fn new(lister: od::Lister) -> Self {
- Self(Arc::new(Mutex::new(lister)))
+#[pymethods]
+impl PresignedRequest {
+ /// Return the URL of this request.
+ #[getter]
+ pub fn url(&self) -> String {
+ self.0.uri().to_string()
}
-}
-#[pymethods]
-impl AsyncLister {
- fn __aiter__(slf: PyRef<'_, Self>) -> PyRef<Self> {
- slf
- }
- fn __anext__(slf: PyRefMut<'_, Self>) -> PyResult<Option<PyObject>> {
- let lister = slf.0.clone();
- let fut = future_into_py(slf.py(), async move {
- let mut lister = lister.lock().await;
- let entry = lister.try_next().await.map_err(format_pyerr)?;
- match entry {
- Some(entry) => Ok(Python::with_gil(|py|
Entry(entry).into_py(py))),
- None => Err(PyStopAsyncIteration::new_err("stream exhausted")),
+ /// Return the HTTP method of this request.
+ #[getter]
+ pub fn method(&self) -> &str {
+ self.0.method().as_str()
+ }
+
+ /// Return the HTTP headers of this request.
+ #[getter]
+ pub fn headers(&self) -> PyResult<HashMap<&str, &str>> {
+ let mut headers = HashMap::new();
+ for (k, v) in self.0.header().iter() {
+ let k = k.as_str();
+ let v = v.to_str().map_err(|err| Error::new_err(err.to_string()))?;
+ if headers.insert(k, v).is_some() {
+ return Err(Error::new_err("duplicate header"));
}
- })?;
- Ok(Some(fut.into()))
+ }
+ Ok(headers)
}
}
diff --git a/bindings/python/src/reader.rs b/bindings/python/src/reader.rs
new file mode 100644
index 000000000..584f21932
--- /dev/null
+++ b/bindings/python/src/reader.rs
@@ -0,0 +1,286 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::io::Read;
+use std::io::Seek;
+use std::io::SeekFrom;
+use std::sync::Arc;
+
+use futures::AsyncReadExt;
+use futures::AsyncSeekExt;
+use pyo3::exceptions::PyIOError;
+use pyo3::exceptions::PyNotImplementedError;
+use pyo3::exceptions::PyValueError;
+use pyo3::ffi;
+use pyo3::prelude::*;
+use pyo3::AsPyPointer;
+use pyo3_asyncio::tokio::future_into_py;
+use tokio::sync::Mutex;
+
+use crate::*;
+
+/// A file-like blocking reader.
+/// Can be used as a context manager.
+///
+/// The wrapped reader is `None` once the reader has been closed.
+#[pyclass(module = "opendal")]
+pub struct Reader(Option<ocore::BlockingReader>);
+
+impl Reader {
+    /// Wrap an already opened blocking reader.
+    pub fn new(reader: ocore::BlockingReader) -> Self {
+        Self(Some(reader))
+    }
+
+    /// Borrow the underlying reader mutably.
+    ///
+    /// Fails with `ValueError` when the reader has been closed (inner value is `None`).
+    fn as_mut(&mut self) -> PyResult<&mut ocore::BlockingReader> {
+        // The message is kept on a single line so it carries no embedded line break.
+        self.0
+            .as_mut()
+            .ok_or_else(|| PyValueError::new_err("I/O operation on closed file."))
+    }
+}
+
+#[pymethods]
+impl Reader {
+    /// Read and return up to `size` bytes, or if size is not given, until EOF.
+    ///
+    /// Fewer than `size` bytes are returned when the stream ends first; an
+    /// empty result means the stream is at EOF. The data is handed back as a
+    /// `memoryview` over an internal buffer object.
+    #[pyo3(signature = (size=None,))]
+    pub fn read<'p>(&'p mut self, py: Python<'p>, size: Option<usize>) -> PyResult<&'p PyAny> {
+        let reader = self.as_mut()?;
+        let mut buffer = Vec::new();
+        match size {
+            Some(size) => {
+                // `read_exact` would raise on a short tail instead of returning
+                // the remaining bytes, so cap the read at `size` instead.
+                // Fully qualified so the call cannot resolve to another
+                // trait's `take`.
+                let limit = u64::try_from(size).unwrap_or(u64::MAX);
+                Read::take(&mut *reader, limit)
+                    .read_to_end(&mut buffer)
+                    .map_err(|err| PyIOError::new_err(err.to_string()))?;
+            }
+            None => {
+                reader
+                    .read_to_end(&mut buffer)
+                    .map_err(|err| PyIOError::new_err(err.to_string()))?;
+            }
+        }
+        let buffer = Buffer::from(buffer).into_py(py);
+        // SAFETY: `buffer` is a live Python object while the call runs, and
+        // `from_owned_ptr_or_err` either takes ownership of the returned
+        // reference or converts a null result into the pending Python error.
+        let memoryview =
+            unsafe { py.from_owned_ptr_or_err(ffi::PyMemoryView_FromObject(buffer.as_ptr()))? };
+        Ok(memoryview)
+    }
+
+    /// `Reader` doesn't support write.
+    /// Raises a `NotImplementedError` if called.
+    pub fn write(&mut self, _bs: &[u8]) -> PyResult<()> {
+        Err(PyNotImplementedError::new_err(
+            "Reader does not support write",
+        ))
+    }
+
+    /// Change the stream position to the given byte offset.
+    /// offset is interpreted relative to the position indicated by `whence`.
+    /// The default value for whence is `SEEK_SET`. Values for `whence` are:
+    ///
+    /// * `SEEK_SET` or `0` – start of the stream (the default); offset should be zero or positive
+    /// * `SEEK_CUR` or `1` – current stream position; offset may be negative
+    /// * `SEEK_END` or `2` – end of the stream; offset is usually negative
+    ///
+    /// Return the new absolute position.
+    ///
+    /// Raises `ValueError` for an unknown `whence` or a negative offset with
+    /// `SEEK_SET`, and `IOError` when the underlying seek fails.
+    #[pyo3(signature = (pos, whence = 0))]
+    pub fn seek(&mut self, pos: i64, whence: u8) -> PyResult<u64> {
+        let whence = match whence {
+            0 => {
+                // A plain `as u64` cast would turn a negative offset into a
+                // huge positive one; reject it explicitly instead.
+                let start = u64::try_from(pos)
+                    .map_err(|_| PyValueError::new_err("negative seek position"))?;
+                SeekFrom::Start(start)
+            }
+            1 => SeekFrom::Current(pos),
+            2 => SeekFrom::End(pos),
+            _ => return Err(PyValueError::new_err("invalid whence")),
+        };
+        let reader = self.as_mut()?;
+        reader
+            .seek(whence)
+            .map_err(|err| PyIOError::new_err(err.to_string()))
+    }
+
+    /// Return the current stream position.
+    pub fn tell(&mut self) -> PyResult<u64> {
+        let reader = self.as_mut()?;
+        reader
+            .stream_position()
+            .map_err(|err| PyIOError::new_err(err.to_string()))
+    }
+
+    /// Context-manager entry: hand back the reader itself.
+    pub fn __enter__(slf: Py<Self>) -> Py<Self> {
+        slf
+    }
+
+    /// Context-manager exit: drop the underlying reader so later I/O fails as closed.
+    pub fn __exit__(&mut self, _exc_type: PyObject, _exc_value: PyObject, _traceback: PyObject) {
+        drop(self.0.take());
+    }
+}
+
+/// Lifecycle of the reader held by `AsyncReader`.
+enum ReaderState {
+ /// Nothing opened yet; holds the operator and path needed to open a reader.
+ Init {
+ operator: ocore::Operator,
+ path: String,
+ },
+ /// A reader has been opened and is available for I/O.
+ Open(ocore::Reader),
+ /// The reader was closed; requesting it yields a "closed file" error.
+ Closed,
+}
+
+impl ReaderState {
+    /// Return the open reader, opening it first when the state is still `Init`.
+    ///
+    /// Errors from opening are converted with `format_pyerr`; a `Closed`
+    /// state yields `ValueError`.
+    async fn reader(&mut self) -> PyResult<&mut ocore::Reader> {
+        let reader = match self {
+            ReaderState::Init { operator, path } => {
+                let reader = operator.reader(path).await.map_err(format_pyerr)?;
+                *self = ReaderState::Open(reader);
+                // Match again to borrow the reader that now lives inside `self`.
+                if let ReaderState::Open(ref mut reader) = self {
+                    reader
+                } else {
+                    unreachable!()
+                }
+            }
+            ReaderState::Open(ref mut reader) => reader,
+            ReaderState::Closed => {
+                // Kept on one line so the message carries no embedded line break.
+                return Err(PyValueError::new_err("I/O operation on closed file."));
+            }
+        };
+        Ok(reader)
+    }
+
+    /// Mark the state as closed, dropping whatever it held before.
+    fn close(&mut self) {
+        *self = ReaderState::Closed;
+    }
+}
+
+/// A file-like async reader.
+/// Can be used as an async context manager.
+#[pyclass(module = "opendal")]
+pub struct AsyncReader(Arc<Mutex<ReaderState>>);
+
+impl AsyncReader {
+    /// Create a reader for `path` on `operator`, starting in the `Init` state.
+    pub fn new(operator: ocore::Operator, path: String) -> Self {
+        let initial = ReaderState::Init { operator, path };
+        let shared = Arc::new(Mutex::new(initial));
+        Self(shared)
+    }
+}
+
+#[pymethods]
+impl AsyncReader {
+    /// Read and return up to `size` bytes, or if size is not given, until EOF.
+    ///
+    /// Fewer than `size` bytes are returned when the stream ends first; an
+    /// empty result means the stream is at EOF. The awaitable resolves to a
+    /// `memoryview` over an internal buffer object.
+    #[pyo3(signature = (size=None,))]
+    pub fn read<'p>(&'p self, py: Python<'p>, size: Option<usize>) -> PyResult<&'p PyAny> {
+        let reader = self.0.clone();
+        future_into_py(py, async move {
+            let mut state = reader.lock().await;
+            let reader = state.reader().await?;
+            let mut buffer = Vec::new();
+            match size {
+                Some(size) => {
+                    // `read_exact` would raise on a short tail instead of
+                    // returning the remaining bytes, so cap the read at
+                    // `size` instead. Fully qualified so the call cannot
+                    // resolve to another trait's `take`.
+                    let limit = u64::try_from(size).unwrap_or(u64::MAX);
+                    AsyncReadExt::take(&mut *reader, limit)
+                        .read_to_end(&mut buffer)
+                        .await
+                        .map_err(|err| PyIOError::new_err(err.to_string()))?;
+                }
+                None => {
+                    reader
+                        .read_to_end(&mut buffer)
+                        .await
+                        .map_err(|err| PyIOError::new_err(err.to_string()))?;
+                }
+            }
+            Python::with_gil(|py| {
+                let buffer = Buffer::from(buffer).into_py(py);
+                // SAFETY: `buffer` is a live Python object while the call
+                // runs, and `from_owned_ptr_or_err` either takes ownership of
+                // the returned reference or converts a null result into the
+                // pending Python error.
+                unsafe {
+                    PyObject::from_owned_ptr_or_err(
+                        py,
+                        ffi::PyMemoryView_FromObject(buffer.as_ptr()),
+                    )
+                }
+            })
+        })
+    }
+
+    /// `AsyncReader` doesn't support write.
+    /// Raises a `NotImplementedError` if called.
+    pub fn write<'p>(&'p mut self, py: Python<'p>, _bs: &'p [u8]) -> PyResult<&'p PyAny> {
+        future_into_py::<_, PyObject>(py, async move {
+            Err(PyNotImplementedError::new_err(
+                "AsyncReader does not support write",
+            ))
+        })
+    }
+
+    /// Change the stream position to the given byte offset.
+    /// offset is interpreted relative to the position indicated by `whence`.
+    /// The default value for whence is `SEEK_SET`. Values for `whence` are:
+    ///
+    /// * `SEEK_SET` or `0` – start of the stream (the default); offset should be zero or positive
+    /// * `SEEK_CUR` or `1` – current stream position; offset may be negative
+    /// * `SEEK_END` or `2` – end of the stream; offset is usually negative
+    ///
+    /// Return the new absolute position.
+    ///
+    /// Raises `ValueError` immediately for an unknown `whence` or a negative
+    /// offset with `SEEK_SET`; seek failures surface as `IOError` when awaited.
+    #[pyo3(signature = (pos, whence = 0))]
+    pub fn seek<'p>(&'p mut self, py: Python<'p>, pos: i64, whence: u8) -> PyResult<&'p PyAny> {
+        let whence = match whence {
+            0 => {
+                // A plain `as u64` cast would turn a negative offset into a
+                // huge positive one; reject it explicitly instead.
+                let start = u64::try_from(pos)
+                    .map_err(|_| PyValueError::new_err("negative seek position"))?;
+                SeekFrom::Start(start)
+            }
+            1 => SeekFrom::Current(pos),
+            2 => SeekFrom::End(pos),
+            _ => return Err(PyValueError::new_err("invalid whence")),
+        };
+        let reader = self.0.clone();
+        future_into_py(py, async move {
+            let mut state = reader.lock().await;
+            let reader = state.reader().await?;
+            let ret = reader
+                .seek(whence)
+                .await
+                .map_err(|err| PyIOError::new_err(err.to_string()))?;
+            Ok(Python::with_gil(|py| ret.into_py(py)))
+        })
+    }
+
+    /// Return the current stream position.
+    pub fn tell<'p>(&'p mut self, py: Python<'p>) -> PyResult<&'p PyAny> {
+        let reader = self.0.clone();
+        future_into_py(py, async move {
+            let mut state = reader.lock().await;
+            let reader = state.reader().await?;
+            let pos = reader
+                .stream_position()
+                .await
+                .map_err(|err| PyIOError::new_err(err.to_string()))?;
+            Ok(Python::with_gil(|py| pos.into_py(py)))
+        })
+    }
+
+    /// Async context-manager entry: resolve to the reader itself.
+    fn __aenter__<'a>(slf: PyRef<'a, Self>, py: Python<'a>) -> PyResult<&'a PyAny> {
+        let slf = slf.into_py(py);
+        future_into_py(py, async move { Ok(slf) })
+    }
+
+    /// Async context-manager exit: mark the shared state as closed.
+    fn __aexit__<'a>(
+        &self,
+        py: Python<'a>,
+        _exc_type: &'a PyAny,
+        _exc_value: &'a PyAny,
+        _traceback: &'a PyAny,
+    ) -> PyResult<&'a PyAny> {
+        let reader = self.0.clone();
+        future_into_py(py, async move {
+            let mut state = reader.lock().await;
+            state.close();
+            Ok(())
+        })
+    }
+}
diff --git a/bindings/python/src/utils.rs b/bindings/python/src/utils.rs
new file mode 100644
index 000000000..328499910
--- /dev/null
+++ b/bindings/python/src/utils.rs
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::os::raw::c_int;
+
+use pyo3::create_exception;
+use pyo3::exceptions::PyException;
+use pyo3::exceptions::PyFileExistsError;
+use pyo3::exceptions::PyFileNotFoundError;
+use pyo3::exceptions::PyNotImplementedError;
+use pyo3::exceptions::PyPermissionError;
+use pyo3::ffi;
+use pyo3::prelude::*;
+use pyo3::AsPyPointer;
+
+use crate::*;
+
+// Declares the Python exception type `Error` for the `opendal` module,
+// with `PyException` as its base.
+create_exception!(opendal, Error, PyException, "OpenDAL related errors");
+
+/// A bytes-like object that implements buffer protocol.
+#[pyclass(module = "opendal")]
+pub struct Buffer {
+ // Owned bytes that are exposed through the buffer protocol.
+ inner: Vec<u8>,
+}
+
+#[pymethods]
+impl Buffer {
+ // Buffer-protocol hook: fills `view` with a description of the owned
+ // bytes by delegating to `PyBuffer_FillInfo`, passing `flags` through.
+ // A `-1` return is surfaced as the Python error fetched from the interpreter.
+ unsafe fn __getbuffer__(
+ slf: PyRefMut<Self>,
+ view: *mut ffi::Py_buffer,
+ flags: c_int,
+ ) -> PyResult<()> {
+ let bytes = slf.inner.as_slice();
+ let ret = ffi::PyBuffer_FillInfo(
+ view,
+ // The exporting object recorded in the view is this `Buffer` itself.
+ slf.as_ptr() as *mut _,
+ bytes.as_ptr() as *mut _,
+ // Converts the length to the integer type the C API expects;
+ // panics if the value does not fit.
+ bytes.len().try_into().unwrap(),
+ 1, // read only
+ flags,
+ );
+ if ret == -1 {
+ return Err(PyErr::fetch(slf.py()));
+ }
+ Ok(())
+ }
+}
+
+impl From<Vec<u8>> for Buffer {
+    /// Take ownership of `inner` as the buffer's backing bytes.
+    fn from(inner: Vec<u8>) -> Self {
+        Buffer { inner }
+    }
+}
+
+/// Convert a core error into the closest Python exception.
+///
+/// `NotFound`, `AlreadyExists`, `PermissionDenied` and `Unsupported` map to
+/// the matching Python builtin exceptions; every other kind becomes `Error`.
+/// The exception message is the error's display text.
+pub fn format_pyerr(err: ocore::Error) -> PyErr {
+    use ocore::ErrorKind;
+
+    // Render the message once; every branch reports the same text.
+    let message = err.to_string();
+    match err.kind() {
+        ErrorKind::NotFound => PyFileNotFoundError::new_err(message),
+        ErrorKind::AlreadyExists => PyFileExistsError::new_err(message),
+        ErrorKind::PermissionDenied => PyPermissionError::new_err(message),
+        ErrorKind::Unsupported => PyNotImplementedError::new_err(message),
+        _ => Error::new_err(message),
+    }
+}