This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch refactor-python
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit cf92431c0cc22f0f1aba6131f8788a81a680b871
Author: Xuanwo <[email protected]>
AuthorDate: Fri Nov 3 16:38:00 2023 +0800

    refactor(bindings/python): Refactor layout for python bindings
    
    Signed-off-by: Xuanwo <[email protected]>
---
 bindings/python/src/layers.rs                   |   7 +-
 bindings/python/src/lib.rs                      | 537 +-----------------------
 bindings/python/src/lister.rs                   |  81 ++++
 bindings/python/src/metadata.rs                 | 117 ++++++
 bindings/python/src/{asyncio.rs => operator.rs} | 437 ++++++++++---------
 bindings/python/src/reader.rs                   | 286 +++++++++++++
 bindings/python/src/utils.rs                    |  78 ++++
 7 files changed, 827 insertions(+), 716 deletions(-)

diff --git a/bindings/python/src/layers.rs b/bindings/python/src/layers.rs
index d270c6be4..4b78e27e4 100644
--- a/bindings/python/src/layers.rs
+++ b/bindings/python/src/layers.rs
@@ -17,10 +17,11 @@
 
 use std::time::Duration;
 
-use ::opendal as od;
 use opendal::Operator;
 use pyo3::prelude::*;
 
+use crate::*;
+
 pub trait PythonLayer: Send + Sync {
     fn layer(&self, op: Operator) -> Operator;
 }
@@ -30,7 +31,7 @@ pub struct Layer(pub Box<dyn PythonLayer>);
 
 #[pyclass(module = "opendal.layers", extends=Layer)]
 #[derive(Clone)]
-pub struct RetryLayer(od::layers::RetryLayer);
+pub struct RetryLayer(ocore::layers::RetryLayer);
 
 impl PythonLayer for RetryLayer {
     fn layer(&self, op: Operator) -> Operator {
@@ -55,7 +56,7 @@ impl RetryLayer {
         max_delay: Option<f64>,
         min_delay: Option<f64>,
     ) -> PyResult<PyClassInitializer<Self>> {
-        let mut retry = od::layers::RetryLayer::default();
+        let mut retry = ocore::layers::RetryLayer::default();
         if let Some(max_times) = max_times {
             retry = retry.with_max_times(max_times);
         }
diff --git a/bindings/python/src/lib.rs b/bindings/python/src/lib.rs
index c2e293fd7..1903cf424 100644
--- a/bindings/python/src/lib.rs
+++ b/bindings/python/src/lib.rs
@@ -18,526 +18,25 @@
 // Suppress clippy::redundant_closure warning from pyo3 generated code
 #![allow(clippy::redundant_closure)]
 
-use std::collections::HashMap;
-use std::io::Read;
-use std::io::Seek;
-use std::io::SeekFrom;
-use std::os::raw::c_int;
-use std::str::FromStr;
-
-use ::opendal as od;
-use pyo3::create_exception;
-use pyo3::exceptions::PyException;
-use pyo3::exceptions::PyFileExistsError;
-use pyo3::exceptions::PyFileNotFoundError;
-use pyo3::exceptions::PyIOError;
-use pyo3::exceptions::PyNotImplementedError;
-use pyo3::exceptions::PyPermissionError;
-use pyo3::exceptions::PyValueError;
-use pyo3::ffi;
+// Expose the opendal rust core as `ocore`.
+// We use `ocore::Xxx` to refer to all types from the opendal rust core.
+pub use ::opendal as ocore;
 use pyo3::prelude::*;
-use pyo3::types::PyDict;
-use pyo3::AsPyPointer;
 
-mod asyncio;
 mod capability;
+pub use capability::*;
 mod layers;
-
-use crate::asyncio::*;
-
-create_exception!(opendal, Error, PyException, "OpenDAL related errors");
-
-/// A bytes-like object that implements buffer protocol.
-#[pyclass(module = "opendal")]
-struct Buffer {
-    inner: Vec<u8>,
-}
-
-#[pymethods]
-impl Buffer {
-    unsafe fn __getbuffer__(
-        slf: PyRefMut<Self>,
-        view: *mut ffi::Py_buffer,
-        flags: c_int,
-    ) -> PyResult<()> {
-        let bytes = slf.inner.as_slice();
-        let ret = ffi::PyBuffer_FillInfo(
-            view,
-            slf.as_ptr() as *mut _,
-            bytes.as_ptr() as *mut _,
-            bytes.len().try_into().unwrap(),
-            1, // read only
-            flags,
-        );
-        if ret == -1 {
-            return Err(PyErr::fetch(slf.py()));
-        }
-        Ok(())
-    }
-}
-
-impl From<Vec<u8>> for Buffer {
-    fn from(inner: Vec<u8>) -> Self {
-        Self { inner }
-    }
-}
-
-fn build_operator(
-    scheme: od::Scheme,
-    map: HashMap<String, String>,
-    blocking: bool,
-) -> PyResult<od::Operator> {
-    let mut op = od::Operator::via_map(scheme, map).map_err(format_pyerr)?;
-    if blocking && !op.info().full_capability().blocking {
-        let runtime = pyo3_asyncio::tokio::get_runtime();
-        let _guard = runtime.enter();
-        op = op.layer(od::layers::BlockingLayer::create().expect("blocking 
layer must be created"));
-    }
-
-    Ok(op)
-}
-
-/// `Operator` is the entry for all public blocking APIs
-///
-/// Create a new blocking `Operator` with the given `scheme` and 
options(`**kwargs`).
-#[pyclass(module = "opendal")]
-struct Operator(od::BlockingOperator);
-
-#[pymethods]
-impl Operator {
-    #[new]
-    #[pyo3(signature = (scheme, *, **map))]
-    pub fn new(scheme: &str, map: Option<&PyDict>) -> PyResult<Self> {
-        let scheme = od::Scheme::from_str(scheme)
-            .map_err(|err| {
-                od::Error::new(od::ErrorKind::Unexpected, "unsupported 
scheme").set_source(err)
-            })
-            .map_err(format_pyerr)?;
-        let map = map
-            .map(|v| {
-                v.extract::<HashMap<String, String>>()
-                    .expect("must be valid hashmap")
-            })
-            .unwrap_or_default();
-
-        Ok(Operator(build_operator(scheme, map, true)?.blocking()))
-    }
-
-    /// Add new layers upon existing operator
-    pub fn layer(&self, layer: &layers::Layer) -> PyResult<Self> {
-        let op = layer.0.layer(self.0.clone().into());
-        Ok(Self(op.blocking()))
-    }
-
-    /// Read the whole path into bytes.
-    pub fn read<'p>(&'p self, py: Python<'p>, path: &str) -> PyResult<&'p 
PyAny> {
-        let buffer = self
-            .0
-            .read(path)
-            .map_err(format_pyerr)
-            .map(Buffer::from)?
-            .into_py(py);
-        let memoryview =
-            unsafe { 
py.from_owned_ptr_or_err(ffi::PyMemoryView_FromObject(buffer.as_ptr()))? };
-        Ok(memoryview)
-    }
-
-    /// Open a file-like reader for the given path.
-    pub fn open_reader(&self, path: &str) -> PyResult<Reader> {
-        self.0
-            .reader(path)
-            .map(|reader| Reader(Some(reader)))
-            .map_err(format_pyerr)
-    }
-
-    /// Write bytes into given path.
-    #[pyo3(signature = (path, bs, **kwargs))]
-    pub fn write(&self, path: &str, bs: Vec<u8>, kwargs: Option<&PyDict>) -> 
PyResult<()> {
-        let opwrite = build_opwrite(kwargs)?;
-        let mut write = self.0.write_with(path, bs).append(opwrite.append());
-        if let Some(buffer) = opwrite.buffer() {
-            write = write.buffer(buffer);
-        }
-        if let Some(content_type) = opwrite.content_type() {
-            write = write.content_type(content_type);
-        }
-        if let Some(content_disposition) = opwrite.content_disposition() {
-            write = write.content_disposition(content_disposition);
-        }
-        if let Some(cache_control) = opwrite.cache_control() {
-            write = write.cache_control(cache_control);
-        }
-
-        write.call().map_err(format_pyerr)
-    }
-
-    /// Get current path's metadata **without cache** directly.
-    pub fn stat(&self, path: &str) -> PyResult<Metadata> {
-        self.0.stat(path).map_err(format_pyerr).map(Metadata)
-    }
-
-    /// Copy source to target.
-    pub fn copy(&self, source: &str, target: &str) -> PyResult<()> {
-        self.0.copy(source, target).map_err(format_pyerr)
-    }
-
-    /// Rename filename.
-    pub fn rename(&self, source: &str, target: &str) -> PyResult<()> {
-        self.0.rename(source, target).map_err(format_pyerr)
-    }
-
-    /// Remove all file
-    pub fn remove_all(&self, path: &str) -> PyResult<()> {
-        self.0.remove_all(path).map_err(format_pyerr)
-    }
-
-    /// Create a dir at given path.
-    ///
-    /// # Notes
-    ///
-    /// To indicate that a path is a directory, it is compulsory to include
-    /// a trailing / in the path. Failure to do so may result in
-    /// `NotADirectory` error being returned by OpenDAL.
-    ///
-    /// # Behavior
-    ///
-    /// - Create on existing dir will succeed.
-    /// - Create dir is always recursive, works like `mkdir -p`
-    pub fn create_dir(&self, path: &str) -> PyResult<()> {
-        self.0.create_dir(path).map_err(format_pyerr)
-    }
-
-    /// Delete given path.
-    ///
-    /// # Notes
-    ///
-    /// - Delete not existing error won't return errors.
-    pub fn delete(&self, path: &str) -> PyResult<()> {
-        self.0.delete(path).map_err(format_pyerr)
-    }
-
-    /// List current dir path.
-    pub fn list(&self, path: &str) -> PyResult<BlockingLister> {
-        Ok(BlockingLister(self.0.lister(path).map_err(format_pyerr)?))
-    }
-
-    /// List dir in flat way.
-    pub fn scan(&self, path: &str) -> PyResult<BlockingLister> {
-        Ok(BlockingLister(
-            self.0
-                .lister_with(path)
-                .delimiter("")
-                .call()
-                .map_err(format_pyerr)?,
-        ))
-    }
-
-    pub fn capability(&self) -> PyResult<capability::Capability> {
-        Ok(capability::Capability::new(self.0.info().full_capability()))
-    }
-
-    fn __repr__(&self) -> String {
-        let info = self.0.info();
-        let name = info.name();
-        if name.is_empty() {
-            format!("Operator(\"{}\", root=\"{}\")", info.scheme(), 
info.root())
-        } else {
-            format!(
-                "Operator(\"{}\", root=\"{}\", name=\"{name}\")",
-                info.scheme(),
-                info.root()
-            )
-        }
-    }
-}
-
-/// A file-like blocking reader.
-/// Can be used as a context manager.
-#[pyclass(module = "opendal")]
-struct Reader(Option<od::BlockingReader>);
-
-impl Reader {
-    fn as_mut(&mut self) -> PyResult<&mut od::BlockingReader> {
-        let reader = self
-            .0
-            .as_mut()
-            .ok_or_else(|| PyValueError::new_err("I/O operation on closed 
file."))?;
-        Ok(reader)
-    }
-}
-
-#[pymethods]
-impl Reader {
-    /// Read and return size bytes, or if size is not given, until EOF.
-    #[pyo3(signature = (size=None,))]
-    pub fn read<'p>(&'p mut self, py: Python<'p>, size: Option<usize>) -> 
PyResult<&'p PyAny> {
-        let reader = self.as_mut()?;
-        let buffer = match size {
-            Some(size) => {
-                let mut buffer = vec![0; size];
-                reader
-                    .read_exact(&mut buffer)
-                    .map_err(|err| PyIOError::new_err(err.to_string()))?;
-                buffer
-            }
-            None => {
-                let mut buffer = Vec::new();
-                reader
-                    .read_to_end(&mut buffer)
-                    .map_err(|err| PyIOError::new_err(err.to_string()))?;
-                buffer
-            }
-        };
-        let buffer = Buffer::from(buffer).into_py(py);
-        let memoryview =
-            unsafe { 
py.from_owned_ptr_or_err(ffi::PyMemoryView_FromObject(buffer.as_ptr()))? };
-        Ok(memoryview)
-    }
-
-    /// `Reader` doesn't support write.
-    /// Raises a `NotImplementedError` if called.
-    pub fn write(&mut self, _bs: &[u8]) -> PyResult<()> {
-        Err(PyNotImplementedError::new_err(
-            "Reader does not support write",
-        ))
-    }
-
-    /// Change the stream position to the given byte offset.
-    /// offset is interpreted relative to the position indicated by `whence`.
-    /// The default value for whence is `SEEK_SET`. Values for `whence` are:
-    ///
-    /// * `SEEK_SET` or `0` – start of the stream (the default); offset should 
be zero or positive
-    /// * `SEEK_CUR` or `1` – current stream position; offset may be negative
-    /// * `SEEK_END` or `2` – end of the stream; offset is usually negative
-    ///
-    /// Return the new absolute position.
-    #[pyo3(signature = (pos, whence = 0))]
-    pub fn seek(&mut self, pos: i64, whence: u8) -> PyResult<u64> {
-        let whence = match whence {
-            0 => SeekFrom::Start(pos as u64),
-            1 => SeekFrom::Current(pos),
-            2 => SeekFrom::End(pos),
-            _ => return Err(PyValueError::new_err("invalid whence")),
-        };
-        let reader = self.as_mut()?;
-        reader
-            .seek(whence)
-            .map_err(|err| PyIOError::new_err(err.to_string()))
-    }
-
-    /// Return the current stream position.
-    pub fn tell(&mut self) -> PyResult<u64> {
-        let reader = self.as_mut()?;
-        reader
-            .stream_position()
-            .map_err(|err| PyIOError::new_err(err.to_string()))
-    }
-
-    pub fn __enter__(slf: Py<Self>) -> Py<Self> {
-        slf
-    }
-
-    pub fn __exit__(&mut self, _exc_type: PyObject, _exc_value: PyObject, 
_traceback: PyObject) {
-        drop(self.0.take());
-    }
-}
-
-#[pyclass(unsendable, module = "opendal")]
-struct BlockingLister(od::BlockingLister);
-
-#[pymethods]
-impl BlockingLister {
-    fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
-        slf
-    }
-    fn __next__(mut slf: PyRefMut<'_, Self>) -> PyResult<Option<PyObject>> {
-        match slf.0.next() {
-            Some(Ok(entry)) => Ok(Some(Entry(entry).into_py(slf.py()))),
-            Some(Err(err)) => {
-                let pyerr = format_pyerr(err);
-                Err(pyerr)
-            }
-            None => Ok(None),
-        }
-    }
-}
-
-#[pyclass(module = "opendal")]
-struct Entry(od::Entry);
-
-#[pymethods]
-impl Entry {
-    /// Path of entry. Path is relative to operator's root.
-    #[getter]
-    pub fn path(&self) -> &str {
-        self.0.path()
-    }
-
-    fn __str__(&self) -> &str {
-        self.0.path()
-    }
-
-    fn __repr__(&self) -> String {
-        format!("Entry({:?})", self.0.path())
-    }
-}
-
-#[pyclass(module = "opendal")]
-struct Metadata(od::Metadata);
-
-#[pymethods]
-impl Metadata {
-    #[getter]
-    pub fn content_disposition(&self) -> Option<&str> {
-        self.0.content_disposition()
-    }
-
-    /// Content length of this entry.
-    #[getter]
-    pub fn content_length(&self) -> u64 {
-        self.0.content_length()
-    }
-
-    /// Content MD5 of this entry.
-    #[getter]
-    pub fn content_md5(&self) -> Option<&str> {
-        self.0.content_md5()
-    }
-
-    /// Content Type of this entry.
-    #[getter]
-    pub fn content_type(&self) -> Option<&str> {
-        self.0.content_type()
-    }
-
-    /// ETag of this entry.
-    #[getter]
-    pub fn etag(&self) -> Option<&str> {
-        self.0.etag()
-    }
-
-    /// mode represent this entry's mode.
-    #[getter]
-    pub fn mode(&self) -> EntryMode {
-        EntryMode(self.0.mode())
-    }
-}
-
-#[pyclass(module = "opendal")]
-struct EntryMode(od::EntryMode);
-
-#[pymethods]
-impl EntryMode {
-    /// Returns `True` if this is a file.
-    pub fn is_file(&self) -> bool {
-        self.0.is_file()
-    }
-
-    /// Returns `True` if this is a directory.
-    pub fn is_dir(&self) -> bool {
-        self.0.is_dir()
-    }
-
-    pub fn __repr__(&self) -> &'static str {
-        match self.0 {
-            od::EntryMode::FILE => "EntryMode.FILE",
-            od::EntryMode::DIR => "EntryMode.DIR",
-            od::EntryMode::Unknown => "EntryMode.UNKNOWN",
-        }
-    }
-}
-
-#[pyclass(module = "opendal")]
-struct PresignedRequest(od::raw::PresignedRequest);
-
-#[pymethods]
-impl PresignedRequest {
-    /// Return the URL of this request.
-    #[getter]
-    pub fn url(&self) -> String {
-        self.0.uri().to_string()
-    }
-
-    /// Return the HTTP method of this request.
-    #[getter]
-    pub fn method(&self) -> &str {
-        self.0.method().as_str()
-    }
-
-    /// Return the HTTP headers of this request.
-    #[getter]
-    pub fn headers(&self) -> PyResult<HashMap<&str, &str>> {
-        let mut headers = HashMap::new();
-        for (k, v) in self.0.header().iter() {
-            let k = k.as_str();
-            let v = v.to_str().map_err(|err| Error::new_err(err.to_string()))?;
-            if headers.insert(k, v).is_some() {
-                return Err(Error::new_err("duplicate header"));
-            }
-        }
-        Ok(headers)
-    }
-}
-
-fn format_pyerr(err: od::Error) -> PyErr {
-    use od::ErrorKind::*;
-    match err.kind() {
-        NotFound => PyFileNotFoundError::new_err(err.to_string()),
-        AlreadyExists => PyFileExistsError::new_err(err.to_string()),
-        PermissionDenied => PyPermissionError::new_err(err.to_string()),
-        Unsupported => PyNotImplementedError::new_err(err.to_string()),
-        _ => Error::new_err(err.to_string()),
-    }
-}
-
-/// recognize OpWrite-equivalent options passed as python dict
-pub(crate) fn build_opwrite(kwargs: Option<&PyDict>) -> 
PyResult<od::raw::OpWrite> {
-    use od::raw::OpWrite;
-    let mut op = OpWrite::new();
-
-    let dict = if let Some(kwargs) = kwargs {
-        kwargs
-    } else {
-        return Ok(op);
-    };
-
-    if let Some(append) = dict.get_item("append") {
-        let v = append
-            .extract::<bool>()
-            .map_err(|err| PyValueError::new_err(format!("append must be bool, 
got {}", err)))?;
-        op = op.with_append(v);
-    }
-
-    if let Some(buffer) = dict.get_item("buffer") {
-        let v = buffer
-            .extract::<usize>()
-            .map_err(|err| PyValueError::new_err(format!("buffer must be 
usize, got {}", err)))?;
-        op = op.with_buffer(v);
-    }
-
-    if let Some(content_type) = dict.get_item("content_type") {
-        let v = content_type.extract::<String>().map_err(|err| {
-            PyValueError::new_err(format!("content_type must be str, got {}", 
err))
-        })?;
-        op = op.with_content_type(v.as_str());
-    }
-
-    if let Some(content_disposition) = dict.get_item("content_disposition") {
-        let v = content_disposition.extract::<String>().map_err(|err| {
-            PyValueError::new_err(format!("content_disposition must be str, 
got {}", err))
-        })?;
-        op = op.with_content_disposition(v.as_str());
-    }
-
-    if let Some(cache_control) = dict.get_item("cache_control") {
-        let v = cache_control.extract::<String>().map_err(|err| {
-            PyValueError::new_err(format!("cache_control must be str, got {}", 
err))
-        })?;
-        op = op.with_cache_control(v.as_str());
-    }
-
-    Ok(op)
-}
+pub use layers::*;
+mod lister;
+pub use lister::*;
+mod metadata;
+pub use metadata::*;
+mod operator;
+pub use operator::*;
+mod reader;
+pub use reader::*;
+mod utils;
+pub use utils::*;
 
 /// OpenDAL Python binding
 ///
@@ -580,13 +79,13 @@ fn _opendal(py: Python, m: &PyModule) -> PyResult<()> {
     m.add_class::<EntryMode>()?;
     m.add_class::<Metadata>()?;
     m.add_class::<PresignedRequest>()?;
-    m.add_class::<capability::Capability>()?;
+    m.add_class::<Capability>()?;
     m.add("Error", py.get_type::<Error>())?;
 
     // Layer module
     let layers_module = PyModule::new(py, "layers")?;
-    layers_module.add_class::<layers::Layer>()?;
-    layers_module.add_class::<layers::RetryLayer>()?;
+    layers_module.add_class::<Layer>()?;
+    layers_module.add_class::<RetryLayer>()?;
     m.add_submodule(layers_module)?;
     py.import("sys")?
         .getattr("modules")?
diff --git a/bindings/python/src/lister.rs b/bindings/python/src/lister.rs
new file mode 100644
index 000000000..d2e7fe89c
--- /dev/null
+++ b/bindings/python/src/lister.rs
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use futures::TryStreamExt;
+use std::sync::Arc;
+
+use pyo3::exceptions::PyStopAsyncIteration;
+use pyo3::prelude::*;
+use pyo3_asyncio::tokio::future_into_py;
+use tokio::sync::Mutex;
+
+use crate::*;
+
+#[pyclass(unsendable, module = "opendal")]
+pub struct BlockingLister(ocore::BlockingLister);
+
+impl BlockingLister {
+    /// Create a new blocking lister.
+    pub fn new(inner: ocore::BlockingLister) -> Self {
+        Self(inner)
+    }
+}
+
+#[pymethods]
+impl BlockingLister {
+    fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
+        slf
+    }
+    fn __next__(mut slf: PyRefMut<'_, Self>) -> PyResult<Option<PyObject>> {
+        match slf.0.next() {
+            Some(Ok(entry)) => Ok(Some(Entry::new(entry).into_py(slf.py()))),
+            Some(Err(err)) => {
+                let pyerr = format_pyerr(err);
+                Err(pyerr)
+            }
+            None => Ok(None),
+        }
+    }
+}
+
+#[pyclass(module = "opendal")]
+pub struct AsyncLister(Arc<Mutex<ocore::Lister>>);
+
+impl AsyncLister {
+    pub fn new(lister: ocore::Lister) -> Self {
+        Self(Arc::new(Mutex::new(lister)))
+    }
+}
+
+#[pymethods]
+impl AsyncLister {
+    fn __aiter__(slf: PyRef<'_, Self>) -> PyRef<Self> {
+        slf
+    }
+    fn __anext__(slf: PyRefMut<'_, Self>) -> PyResult<Option<PyObject>> {
+        let lister = slf.0.clone();
+        let fut = future_into_py(slf.py(), async move {
+            let mut lister = lister.lock().await;
+            let entry = lister.try_next().await.map_err(format_pyerr)?;
+            match entry {
+                Some(entry) => Ok(Python::with_gil(|py| 
Entry::new(entry).into_py(py))),
+                None => Err(PyStopAsyncIteration::new_err("stream exhausted")),
+            }
+        })?;
+        Ok(Some(fut.into()))
+    }
+}
diff --git a/bindings/python/src/metadata.rs b/bindings/python/src/metadata.rs
new file mode 100644
index 000000000..e03eeda18
--- /dev/null
+++ b/bindings/python/src/metadata.rs
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use pyo3::prelude::*;
+
+use crate::*;
+
+#[pyclass(module = "opendal")]
+pub struct Entry(ocore::Entry);
+
+impl Entry {
+    pub fn new(entry: ocore::Entry) -> Self {
+        Self(entry)
+    }
+}
+
+#[pymethods]
+impl Entry {
+    /// Path of entry. Path is relative to operator's root.
+    #[getter]
+    pub fn path(&self) -> &str {
+        self.0.path()
+    }
+
+    fn __str__(&self) -> &str {
+        self.0.path()
+    }
+
+    fn __repr__(&self) -> String {
+        format!("Entry({:?})", self.0.path())
+    }
+}
+
+#[pyclass(module = "opendal")]
+pub struct Metadata(ocore::Metadata);
+
+impl Metadata {
+    pub fn new(meta: ocore::Metadata) -> Self {
+        Self(meta)
+    }
+}
+
+#[pymethods]
+impl Metadata {
+    #[getter]
+    pub fn content_disposition(&self) -> Option<&str> {
+        self.0.content_disposition()
+    }
+
+    /// Content length of this entry.
+    #[getter]
+    pub fn content_length(&self) -> u64 {
+        self.0.content_length()
+    }
+
+    /// Content MD5 of this entry.
+    #[getter]
+    pub fn content_md5(&self) -> Option<&str> {
+        self.0.content_md5()
+    }
+
+    /// Content Type of this entry.
+    #[getter]
+    pub fn content_type(&self) -> Option<&str> {
+        self.0.content_type()
+    }
+
+    /// ETag of this entry.
+    #[getter]
+    pub fn etag(&self) -> Option<&str> {
+        self.0.etag()
+    }
+
+    /// Mode of this entry (file, directory, or unknown).
+    #[getter]
+    pub fn mode(&self) -> EntryMode {
+        EntryMode(self.0.mode())
+    }
+}
+
+#[pyclass(module = "opendal")]
+pub struct EntryMode(ocore::EntryMode);
+
+#[pymethods]
+impl EntryMode {
+    /// Returns `True` if this is a file.
+    pub fn is_file(&self) -> bool {
+        self.0.is_file()
+    }
+
+    /// Returns `True` if this is a directory.
+    pub fn is_dir(&self) -> bool {
+        self.0.is_dir()
+    }
+
+    pub fn __repr__(&self) -> &'static str {
+        match self.0 {
+            ocore::EntryMode::FILE => "EntryMode.FILE",
+            ocore::EntryMode::DIR => "EntryMode.DIR",
+            ocore::EntryMode::Unknown => "EntryMode.UNKNOWN",
+        }
+    }
+}
diff --git a/bindings/python/src/asyncio.rs b/bindings/python/src/operator.rs
similarity index 52%
rename from bindings/python/src/asyncio.rs
rename to bindings/python/src/operator.rs
index 2ef6253d6..0c17b2c31 100644
--- a/bindings/python/src/asyncio.rs
+++ b/bindings/python/src/operator.rs
@@ -16,52 +16,204 @@
 // under the License.
 
 use std::collections::HashMap;
-use std::io::SeekFrom;
 use std::str::FromStr;
-use std::sync::Arc;
 use std::time::Duration;
 
-use ::opendal as od;
-use futures::TryStreamExt;
-use pyo3::exceptions::PyIOError;
-use pyo3::exceptions::PyNotImplementedError;
-use pyo3::exceptions::PyStopAsyncIteration;
 use pyo3::exceptions::PyValueError;
 use pyo3::ffi;
 use pyo3::prelude::*;
-use pyo3::types::PyBytes;
-use pyo3::types::PyDict;
+use pyo3::types::{PyBytes, PyDict};
 use pyo3::AsPyPointer;
 use pyo3_asyncio::tokio::future_into_py;
-use tokio::io::AsyncReadExt;
-use tokio::io::AsyncSeekExt;
-use tokio::sync::Mutex;
 
-use crate::build_operator;
-use crate::build_opwrite;
-use crate::format_pyerr;
-use crate::layers;
-use crate::Buffer;
-use crate::Entry;
-use crate::Metadata;
-use crate::PresignedRequest;
+use crate::*;
+
+fn build_operator(
+    scheme: ocore::Scheme,
+    map: HashMap<String, String>,
+    blocking: bool,
+) -> PyResult<ocore::Operator> {
+    let mut op = ocore::Operator::via_map(scheme, map).map_err(format_pyerr)?;
+    if blocking && !op.info().full_capability().blocking {
+        let runtime = pyo3_asyncio::tokio::get_runtime();
+        let _guard = runtime.enter();
+        op = op
+            .layer(ocore::layers::BlockingLayer::create().expect("blocking 
layer must be created"));
+    }
+
+    Ok(op)
+}
+
+/// `Operator` is the entry for all public blocking APIs
+///
+/// Create a new blocking `Operator` with the given `scheme` and 
options(`**kwargs`).
+#[pyclass(module = "opendal")]
+pub struct Operator(ocore::BlockingOperator);
+
+#[pymethods]
+impl Operator {
+    #[new]
+    #[pyo3(signature = (scheme, *, **map))]
+    pub fn new(scheme: &str, map: Option<&PyDict>) -> PyResult<Self> {
+        let scheme = ocore::Scheme::from_str(scheme)
+            .map_err(|err| {
+                ocore::Error::new(ocore::ErrorKind::Unexpected, "unsupported 
scheme")
+                    .set_source(err)
+            })
+            .map_err(format_pyerr)?;
+        let map = map
+            .map(|v| {
+                v.extract::<HashMap<String, String>>()
+                    .expect("must be valid hashmap")
+            })
+            .unwrap_or_default();
 
-use crate::capability;
+        Ok(Operator(build_operator(scheme, map, true)?.blocking()))
+    }
+
+    /// Add new layers upon existing operator
+    pub fn layer(&self, layer: &layers::Layer) -> PyResult<Self> {
+        let op = layer.0.layer(self.0.clone().into());
+        Ok(Self(op.blocking()))
+    }
+
+    /// Read the whole path into bytes.
+    pub fn read<'p>(&'p self, py: Python<'p>, path: &str) -> PyResult<&'p 
PyAny> {
+        let buffer = self
+            .0
+            .read(path)
+            .map_err(format_pyerr)
+            .map(Buffer::from)?
+            .into_py(py);
+        let memoryview =
+            unsafe { 
py.from_owned_ptr_or_err(ffi::PyMemoryView_FromObject(buffer.as_ptr()))? };
+        Ok(memoryview)
+    }
+
+    /// Open a file-like reader for the given path.
+    pub fn open_reader(&self, path: &str) -> PyResult<Reader> {
+        let r = self.0.reader(path).map_err(format_pyerr)?;
+
+        Ok(Reader::new(r))
+    }
+
+    /// Write bytes into given path.
+    #[pyo3(signature = (path, bs, **kwargs))]
+    pub fn write(&self, path: &str, bs: Vec<u8>, kwargs: Option<&PyDict>) -> 
PyResult<()> {
+        let opwrite = build_opwrite(kwargs)?;
+        let mut write = self.0.write_with(path, bs).append(opwrite.append());
+        if let Some(buffer) = opwrite.buffer() {
+            write = write.buffer(buffer);
+        }
+        if let Some(content_type) = opwrite.content_type() {
+            write = write.content_type(content_type);
+        }
+        if let Some(content_disposition) = opwrite.content_disposition() {
+            write = write.content_disposition(content_disposition);
+        }
+        if let Some(cache_control) = opwrite.cache_control() {
+            write = write.cache_control(cache_control);
+        }
+
+        write.call().map_err(format_pyerr)
+    }
+
+    /// Get current path's metadata **without cache** directly.
+    pub fn stat(&self, path: &str) -> PyResult<Metadata> {
+        self.0.stat(path).map_err(format_pyerr).map(Metadata::new)
+    }
+
+    /// Copy source to target.
+    pub fn copy(&self, source: &str, target: &str) -> PyResult<()> {
+        self.0.copy(source, target).map_err(format_pyerr)
+    }
+
+    /// Rename filename.
+    pub fn rename(&self, source: &str, target: &str) -> PyResult<()> {
+        self.0.rename(source, target).map_err(format_pyerr)
+    }
+
+    /// Remove all files at the given path.
+    pub fn remove_all(&self, path: &str) -> PyResult<()> {
+        self.0.remove_all(path).map_err(format_pyerr)
+    }
+
+    /// Create a dir at given path.
+    ///
+    /// # Notes
+    ///
+    /// To indicate that a path is a directory, it is compulsory to include
+    /// a trailing / in the path. Failure to do so may result in
+    /// `NotADirectory` error being returned by OpenDAL.
+    ///
+    /// # Behavior
+    ///
+    /// - Create on existing dir will succeed.
+    /// - Create dir is always recursive, works like `mkdir -p`
+    pub fn create_dir(&self, path: &str) -> PyResult<()> {
+        self.0.create_dir(path).map_err(format_pyerr)
+    }
+
+    /// Delete given path.
+    ///
+    /// # Notes
+    ///
+    /// - Deleting a path that does not exist won't return an error.
+    pub fn delete(&self, path: &str) -> PyResult<()> {
+        self.0.delete(path).map_err(format_pyerr)
+    }
+
+    /// List current dir path.
+    pub fn list(&self, path: &str) -> PyResult<BlockingLister> {
+        let l = self.0.lister(path).map_err(format_pyerr)?;
+        Ok(BlockingLister::new(l))
+    }
+
+    /// List dir in flat way.
+    pub fn scan(&self, path: &str) -> PyResult<BlockingLister> {
+        let l = self
+            .0
+            .lister_with(path)
+            .delimiter("")
+            .call()
+            .map_err(format_pyerr)?;
+        Ok(BlockingLister::new(l))
+    }
+
+    pub fn capability(&self) -> PyResult<capability::Capability> {
+        Ok(capability::Capability::new(self.0.info().full_capability()))
+    }
+
+    fn __repr__(&self) -> String {
+        let info = self.0.info();
+        let name = info.name();
+        if name.is_empty() {
+            format!("Operator(\"{}\", root=\"{}\")", info.scheme(), 
info.root())
+        } else {
+            format!(
+                "Operator(\"{}\", root=\"{}\", name=\"{name}\")",
+                info.scheme(),
+                info.root()
+            )
+        }
+    }
+}
 
 /// `AsyncOperator` is the entry for all public async APIs
 ///
 /// Create a new `AsyncOperator` with the given `scheme` and 
options(`**kwargs`).
 #[pyclass(module = "opendal")]
-pub struct AsyncOperator(od::Operator);
+pub struct AsyncOperator(ocore::Operator);
 
 #[pymethods]
 impl AsyncOperator {
     #[new]
     #[pyo3(signature = (scheme, *,  **map))]
     pub fn new(scheme: &str, map: Option<&PyDict>) -> PyResult<Self> {
-        let scheme = od::Scheme::from_str(scheme)
+        let scheme = ocore::Scheme::from_str(scheme)
             .map_err(|err| {
-                od::Error::new(od::ErrorKind::Unexpected, "unsupported 
scheme").set_source(err)
+                ocore::Error::new(ocore::ErrorKind::Unexpected, "unsupported 
scheme")
+                    .set_source(err)
             })
             .map_err(format_pyerr)?;
         let map = map
@@ -99,10 +251,7 @@ impl AsyncOperator {
 
     /// Open a file-like reader for the given path.
+    ///
+    /// This performs no I/O: the returned `AsyncReader` opens the underlying
+    /// reader lazily, on its first read, seek or tell.
     pub fn open_reader(&self, path: String) -> PyResult<AsyncReader> {
-        Ok(AsyncReader::new(ReaderState::Init {
-            operator: self.0.clone(),
-            path,
-        }))
+        Ok(AsyncReader::new(self.0.clone(), path))
     }
 
     /// Write bytes into given path.
@@ -139,7 +288,11 @@ impl AsyncOperator {
     pub fn stat<'p>(&'p self, py: Python<'p>, path: String) -> PyResult<&'p 
PyAny> {
         let this = self.0.clone();
         future_into_py(py, async move {
-            let res: Metadata = 
this.stat(&path).await.map_err(format_pyerr).map(Metadata)?;
+            let res: Metadata = this
+                .stat(&path)
+                .await
+                .map_err(format_pyerr)
+                .map(Metadata::new)?;
 
             Ok(res)
         })
@@ -315,187 +468,83 @@ impl AsyncOperator {
     }
 }
 
-enum ReaderState {
-    Init {
-        operator: od::Operator,
-        path: String,
-    },
-    Open(od::Reader),
-    Closed,
-}
-
-impl ReaderState {
-    async fn reader(&mut self) -> PyResult<&mut od::Reader> {
-        let reader = match self {
-            ReaderState::Init { operator, path } => {
-                let reader = 
operator.reader(path).await.map_err(format_pyerr)?;
-                *self = ReaderState::Open(reader);
-                if let ReaderState::Open(ref mut reader) = self {
-                    reader
-                } else {
-                    unreachable!()
-                }
-            }
-            ReaderState::Open(ref mut reader) => reader,
-            ReaderState::Closed => {
-                return Err(PyValueError::new_err("I/O operation on closed 
file."));
-            }
-        };
-        Ok(reader)
-    }
-
-    fn close(&mut self) {
-        *self = ReaderState::Closed;
-    }
-}
-
-/// A file-like async reader.
-/// Can be used as an async context manager.
-#[pyclass(module = "opendal")]
-pub struct AsyncReader(Arc<Mutex<ReaderState>>);
-
-impl AsyncReader {
-    fn new(reader: ReaderState) -> Self {
-        Self(Arc::new(Mutex::new(reader)))
-    }
-}
-
-#[pymethods]
-impl AsyncReader {
-    /// Read and return size bytes, or if size is not given, until EOF.
-    pub fn read<'p>(&'p self, py: Python<'p>, size: Option<usize>) -> 
PyResult<&'p PyAny> {
-        let reader = self.0.clone();
-        future_into_py(py, async move {
-            let mut state = reader.lock().await;
-            let reader = state.reader().await?;
-            let buffer = match size {
-                Some(size) => {
-                    let mut buffer = vec![0; size];
-                    reader
-                        .read_exact(&mut buffer)
-                        .await
-                        .map_err(|err| PyIOError::new_err(err.to_string()))?;
-                    buffer
-                }
-                None => {
-                    let mut buffer = Vec::new();
-                    reader
-                        .read_to_end(&mut buffer)
-                        .await
-                        .map_err(|err| PyIOError::new_err(err.to_string()))?;
-                    buffer
-                }
-            };
-            Python::with_gil(|py| {
-                let buffer = Buffer::from(buffer).into_py(py);
-                unsafe {
-                    PyObject::from_owned_ptr_or_err(
-                        py,
-                        ffi::PyMemoryView_FromObject(buffer.as_ptr()),
-                    )
-                }
-            })
-        })
+/// Build an `OpWrite` from the optional Python `**kwargs` dict of write options.
+///
+/// Recognized keys (any other key is ignored):
+/// - `append` (`bool`) -> `OpWrite::with_append`
+/// - `buffer` (`usize`) -> `OpWrite::with_buffer`
+/// - `content_type`, `content_disposition`, `cache_control` (`str`) -> the
+///   matching `OpWrite::with_*` setter
+///
+/// Returns the default `OpWrite::new()` when `kwargs` is `None`.
+///
+/// # Errors
+///
+/// Returns a Python `ValueError` when a recognized key holds a value that cannot
+/// be extracted as the expected type.
+pub(crate) fn build_opwrite(kwargs: Option<&PyDict>) -> 
PyResult<ocore::raw::OpWrite> {
+    use ocore::raw::OpWrite;
+    let mut op = OpWrite::new();
+
+    // Without kwargs there is nothing to apply; hand back the defaults.
+    let dict = if let Some(kwargs) = kwargs {
+        kwargs
+    } else {
+        return Ok(op);
+    };
+
+    // Every key is optional; a present key with the wrong type is a ValueError.
+    if let Some(append) = dict.get_item("append") {
+        let v = append
+            .extract::<bool>()
+            .map_err(|err| PyValueError::new_err(format!("append must be bool, 
got {}", err)))?;
+        op = op.with_append(v);
     }
 
-    /// `AsyncReader` doesn't support write.
-    /// Raises a `NotImplementedError` if called.
-    pub fn write<'p>(&'p mut self, py: Python<'p>, _bs: &'p [u8]) -> 
PyResult<&'p PyAny> {
-        future_into_py::<_, PyObject>(py, async move {
-            Err(PyNotImplementedError::new_err(
-                "AsyncReader does not support write",
-            ))
-        })
+    if let Some(buffer) = dict.get_item("buffer") {
+        let v = buffer
+            .extract::<usize>()
+            .map_err(|err| PyValueError::new_err(format!("buffer must be 
usize, got {}", err)))?;
+        op = op.with_buffer(v);
     }
 
-    /// Change the stream position to the given byte offset.
-    /// offset is interpreted relative to the position indicated by `whence`.
-    /// The default value for whence is `SEEK_SET`. Values for `whence` are:
-    ///
-    /// * `SEEK_SET` or `0` – start of the stream (the default); offset should 
be zero or positive
-    /// * `SEEK_CUR` or `1` – current stream position; offset may be negative
-    /// * `SEEK_END` or `2` – end of the stream; offset is usually negative
-    ///
-    /// Return the new absolute position.
-    #[pyo3(signature = (pos, whence = 0))]
-    pub fn seek<'p>(&'p mut self, py: Python<'p>, pos: i64, whence: u8) -> 
PyResult<&'p PyAny> {
-        let whence = match whence {
-            0 => SeekFrom::Start(pos as u64),
-            1 => SeekFrom::Current(pos),
-            2 => SeekFrom::End(pos),
-            _ => return Err(PyValueError::new_err("invalid whence")),
-        };
-        let reader = self.0.clone();
-        future_into_py(py, async move {
-            let mut state = reader.lock().await;
-            let reader = state.reader().await?;
-            let ret = reader
-                .seek(whence)
-                .await
-                .map_err(|err| PyIOError::new_err(err.to_string()))?;
-            Ok(Python::with_gil(|py| ret.into_py(py)))
-        })
+    if let Some(content_type) = dict.get_item("content_type") {
+        let v = content_type.extract::<String>().map_err(|err| {
+            PyValueError::new_err(format!("content_type must be str, got {}", 
err))
+        })?;
+        op = op.with_content_type(v.as_str());
     }
 
-    /// Return the current stream position.
-    pub fn tell<'p>(&'p mut self, py: Python<'p>) -> PyResult<&'p PyAny> {
-        let reader = self.0.clone();
-        future_into_py(py, async move {
-            let mut state = reader.lock().await;
-            let reader = state.reader().await?;
-            let pos = reader
-                .stream_position()
-                .await
-                .map_err(|err| PyIOError::new_err(err.to_string()))?;
-            Ok(Python::with_gil(|py| pos.into_py(py)))
-        })
+    if let Some(content_disposition) = dict.get_item("content_disposition") {
+        let v = content_disposition.extract::<String>().map_err(|err| {
+            PyValueError::new_err(format!("content_disposition must be str, 
got {}", err))
+        })?;
+        op = op.with_content_disposition(v.as_str());
     }
 
-    fn __aenter__<'a>(slf: PyRef<'a, Self>, py: Python<'a>) -> PyResult<&'a 
PyAny> {
-        let slf = slf.into_py(py);
-        future_into_py(py, async move { Ok(slf) })
+    if let Some(cache_control) = dict.get_item("cache_control") {
+        let v = cache_control.extract::<String>().map_err(|err| {
+            PyValueError::new_err(format!("cache_control must be str, got {}", 
err))
+        })?;
+        op = op.with_cache_control(v.as_str());
     }
 
-    fn __aexit__<'a>(
-        &self,
-        py: Python<'a>,
-        _exc_type: &'a PyAny,
-        _exc_value: &'a PyAny,
-        _traceback: &'a PyAny,
-    ) -> PyResult<&'a PyAny> {
-        let reader = self.0.clone();
-        future_into_py(py, async move {
-            let mut state = reader.lock().await;
-            state.close();
-            Ok(())
-        })
-    }
+    Ok(op)
 }
 
+/// Python wrapper around `ocore::raw::PresignedRequest`, exposing its URL,
+/// HTTP method and headers as read-only properties.
 #[pyclass(module = "opendal")]
-struct AsyncLister(Arc<Mutex<od::Lister>>);
+pub struct PresignedRequest(ocore::raw::PresignedRequest);
 
-impl AsyncLister {
-    fn new(lister: od::Lister) -> Self {
-        Self(Arc::new(Mutex::new(lister)))
+#[pymethods]
+impl PresignedRequest {
+    /// Return the URL of this request.
+    #[getter]
+    pub fn url(&self) -> String {
+        self.0.uri().to_string()
     }
-}
 
-#[pymethods]
-impl AsyncLister {
-    fn __aiter__(slf: PyRef<'_, Self>) -> PyRef<Self> {
-        slf
-    }
-    fn __anext__(slf: PyRefMut<'_, Self>) -> PyResult<Option<PyObject>> {
-        let lister = slf.0.clone();
-        let fut = future_into_py(slf.py(), async move {
-            let mut lister = lister.lock().await;
-            let entry = lister.try_next().await.map_err(format_pyerr)?;
-            match entry {
-                Some(entry) => Ok(Python::with_gil(|py| 
Entry(entry).into_py(py))),
-                None => Err(PyStopAsyncIteration::new_err("stream exhausted")),
+    /// Return the HTTP method of this request.
+    #[getter]
+    pub fn method(&self) -> &str {
+        self.0.method().as_str()
+    }
+
+    /// Return the HTTP headers of this request.
+    ///
+    /// Raises `opendal.Error` if a header value cannot be converted with
+    /// `to_str()`, or if the same header name occurs more than once.
+    #[getter]
+    pub fn headers(&self) -> PyResult<HashMap<&str, &str>> {
+        // NOTE(review): HTTP allows repeated header names; such a request cannot be
+        // represented by this flat name -> value map and is rejected instead.
+        let mut headers = HashMap::new();
+        for (k, v) in self.0.header().iter() {
+            let k = k.as_str();
+            let v = v.to_str().map_err(|err| Error::new_err(err.to_string()))?;
+            if headers.insert(k, v).is_some() {
+                return Err(Error::new_err("duplicate header"));
             }
-        })?;
-        Ok(Some(fut.into()))
+        }
+        Ok(headers)
     }
 }
diff --git a/bindings/python/src/reader.rs b/bindings/python/src/reader.rs
new file mode 100644
index 000000000..584f21932
--- /dev/null
+++ b/bindings/python/src/reader.rs
@@ -0,0 +1,286 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::io::Read;
+use std::io::Seek;
+use std::io::SeekFrom;
+use std::sync::Arc;
+
+use futures::AsyncReadExt;
+use futures::AsyncSeekExt;
+use pyo3::exceptions::PyIOError;
+use pyo3::exceptions::PyNotImplementedError;
+use pyo3::exceptions::PyValueError;
+use pyo3::ffi;
+use pyo3::prelude::*;
+use pyo3::AsPyPointer;
+use pyo3_asyncio::tokio::future_into_py;
+use tokio::sync::Mutex;
+
+use crate::*;
+
+/// A file-like blocking reader.
+/// Can be used as a context manager.
+///
+/// The inner reader is `None` once the reader has been closed.
+#[pyclass(module = "opendal")]
+pub struct Reader(Option<ocore::BlockingReader>);
+
+impl Reader {
+    /// Wrap an already-opened blocking reader; the result starts out open.
+    pub fn new(reader: ocore::BlockingReader) -> Self {
+        Reader(Some(reader))
+    }
+
+    /// Borrow the inner reader, or fail with `ValueError` if it has been closed.
+    fn as_mut(&mut self) -> PyResult<&mut ocore::BlockingReader> {
+        match self.0 {
+            Some(ref mut inner) => Ok(inner),
+            None => Err(PyValueError::new_err("I/O operation on closed file.")),
+        }
+    }
+}
+
+#[pymethods]
+impl Reader {
+    /// Read and return up to `size` bytes, or until EOF if `size` is not given.
+    ///
+    /// As with Python file objects, fewer than `size` bytes are returned when EOF
+    /// is reached first, and an empty result is returned at EOF. The bytes are
+    /// returned as a read-only `memoryview`.
+    #[pyo3(signature = (size=None,))]
+    pub fn read<'p>(&'p mut self, py: Python<'p>, size: Option<usize>) -> PyResult<&'p PyAny> {
+        let reader = self.as_mut()?;
+        let mut buffer = Vec::new();
+        let res = match size {
+            // `take` stops after `size` bytes but, unlike `read_exact`, treats an early
+            // EOF as a short read instead of an `UnexpectedEof` error. It is called via
+            // the trait path so no other in-scope `take` can be picked.
+            Some(size) => Read::take(reader, size as u64).read_to_end(&mut buffer),
+            None => reader.read_to_end(&mut buffer),
+        };
+        res.map_err(|err| PyIOError::new_err(err.to_string()))?;
+        let buffer = Buffer::from(buffer).into_py(py);
+        // SAFETY: `buffer` is a live `Buffer` object, and `from_owned_ptr_or_err` takes
+        // ownership of the new reference returned by `PyMemoryView_FromObject`, turning a
+        // NULL return into the raised error.
+        let memoryview =
+            unsafe { py.from_owned_ptr_or_err(ffi::PyMemoryView_FromObject(buffer.as_ptr()))? };
+        Ok(memoryview)
+    }
+
+    /// `Reader` doesn't support write.
+    /// Raises a `NotImplementedError` if called.
+    pub fn write(&mut self, _bs: &[u8]) -> PyResult<()> {
+        Err(PyNotImplementedError::new_err("Reader does not support write"))
+    }
+
+    /// Change the stream position to the given byte offset.
+    /// offset is interpreted relative to the position indicated by `whence`.
+    /// The default value for whence is `SEEK_SET`. Values for `whence` are:
+    ///
+    /// * `SEEK_SET` or `0` – start of the stream (the default); offset must be zero or positive
+    /// * `SEEK_CUR` or `1` – current stream position; offset may be negative
+    /// * `SEEK_END` or `2` – end of the stream; offset is usually negative
+    ///
+    /// Return the new absolute position.
+    ///
+    /// Raises `ValueError` for an unknown `whence`, for a negative `SEEK_SET`
+    /// offset, or when the reader has been closed.
+    #[pyo3(signature = (pos, whence = 0))]
+    pub fn seek(&mut self, pos: i64, whence: u8) -> PyResult<u64> {
+        let whence = match whence {
+            0 => {
+                // `pos as u64` would silently wrap a negative offset into a huge one,
+                // so reject it up front like Python's built-in file objects do.
+                let start = u64::try_from(pos).map_err(|_| {
+                    PyValueError::new_err(format!("negative seek position {pos}"))
+                })?;
+                SeekFrom::Start(start)
+            }
+            1 => SeekFrom::Current(pos),
+            2 => SeekFrom::End(pos),
+            _ => return Err(PyValueError::new_err("invalid whence")),
+        };
+        let reader = self.as_mut()?;
+        reader
+            .seek(whence)
+            .map_err(|err| PyIOError::new_err(err.to_string()))
+    }
+
+    /// Return the current stream position.
+    pub fn tell(&mut self) -> PyResult<u64> {
+        let reader = self.as_mut()?;
+        reader
+            .stream_position()
+            .map_err(|err| PyIOError::new_err(err.to_string()))
+    }
+
+    /// Enter the context manager; returns the reader itself.
+    pub fn __enter__(slf: Py<Self>) -> Py<Self> {
+        slf
+    }
+
+    /// Exit the context manager by dropping (closing) the inner reader.
+    ///
+    /// Later I/O calls raise `ValueError`; exceptions are not suppressed.
+    pub fn __exit__(&mut self, _exc_type: PyObject, _exc_value: PyObject, _traceback: PyObject) {
+        drop(self.0.take());
+    }
+}
+
+/// Lifecycle of the reader behind an `AsyncReader`.
+enum ReaderState {
+    /// Not opened yet; holds what is needed to open it on first use.
+    Init {
+        operator: ocore::Operator,
+        path: String,
+    },
+    /// Opened and ready for I/O.
+    Open(ocore::Reader),
+    /// Closed; any further I/O is rejected.
+    Closed,
+}
+
+impl ReaderState {
+    /// Return the open reader, opening it first if this is the first use.
+    ///
+    /// Fails with `ValueError` once the state is `Closed`, or with the mapped
+    /// OpenDAL error if opening fails (the state then stays `Init`).
+    async fn reader(&mut self) -> PyResult<&mut ocore::Reader> {
+        if let ReaderState::Init { operator, path } = self {
+            let opened = operator.reader(path).await.map_err(format_pyerr)?;
+            *self = ReaderState::Open(opened);
+        }
+        match self {
+            ReaderState::Open(reader) => Ok(reader),
+            ReaderState::Closed => Err(PyValueError::new_err("I/O operation on closed file.")),
+            // The branch above either replaced `Init` with `Open` or returned early.
+            ReaderState::Init { .. } => unreachable!(),
+        }
+    }
+
+    /// Move to `Closed`, dropping any open reader.
+    fn close(&mut self) {
+        *self = ReaderState::Closed;
+    }
+}
+
+/// A file-like async reader.
+/// Can be used as an async context manager.
+///
+/// The state sits behind an `Arc` of a tokio `Mutex`, so each awaitable returned by
+/// a method can own a handle to it and hold the lock across `.await`.
+#[pyclass(module = "opendal")]
+pub struct AsyncReader(Arc<Mutex<ReaderState>>);
+
+impl AsyncReader {
+    /// Create a reader for `path` on `operator`; nothing is opened until first use.
+    pub fn new(operator: ocore::Operator, path: String) -> Self {
+        let state = ReaderState::Init { operator, path };
+        AsyncReader(Arc::new(Mutex::new(state)))
+    }
+}
+
+#[pymethods]
+impl AsyncReader {
+    /// Read and return up to `size` bytes, or until EOF if `size` is not given.
+    ///
+    /// As with Python file objects, fewer than `size` bytes are returned when EOF
+    /// is reached first, and an empty result is returned at EOF. The awaitable
+    /// resolves to a read-only `memoryview`.
+    #[pyo3(signature = (size=None,))]
+    pub fn read<'p>(&'p self, py: Python<'p>, size: Option<usize>) -> PyResult<&'p PyAny> {
+        let reader = self.0.clone();
+        future_into_py(py, async move {
+            let mut state = reader.lock().await;
+            let reader = state.reader().await?;
+            let mut buffer = Vec::new();
+            let res = match size {
+                // `take` stops after `size` bytes but, unlike `read_exact`, treats an
+                // early EOF as a short read instead of an `UnexpectedEof` error. It is
+                // called via the trait path so no other in-scope `take` can be picked.
+                Some(size) => {
+                    AsyncReadExt::take(reader, size as u64)
+                        .read_to_end(&mut buffer)
+                        .await
+                }
+                None => reader.read_to_end(&mut buffer).await,
+            };
+            res.map_err(|err| PyIOError::new_err(err.to_string()))?;
+            Python::with_gil(|py| {
+                let buffer = Buffer::from(buffer).into_py(py);
+                // SAFETY: `buffer` is a live `Buffer` object, and `from_owned_ptr_or_err`
+                // takes ownership of the new reference returned by
+                // `PyMemoryView_FromObject`, turning a NULL return into the raised error.
+                unsafe {
+                    PyObject::from_owned_ptr_or_err(
+                        py,
+                        ffi::PyMemoryView_FromObject(buffer.as_ptr()),
+                    )
+                }
+            })
+        })
+    }
+
+    /// `AsyncReader` doesn't support write.
+    /// Raises a `NotImplementedError` if called.
+    pub fn write<'p>(&'p mut self, py: Python<'p>, _bs: &'p [u8]) -> PyResult<&'p PyAny> {
+        future_into_py::<_, PyObject>(py, async move {
+            Err(PyNotImplementedError::new_err(
+                "AsyncReader does not support write",
+            ))
+        })
+    }
+
+    /// Change the stream position to the given byte offset.
+    /// offset is interpreted relative to the position indicated by `whence`.
+    /// The default value for whence is `SEEK_SET`. Values for `whence` are:
+    ///
+    /// * `SEEK_SET` or `0` – start of the stream (the default); offset must be zero or positive
+    /// * `SEEK_CUR` or `1` – current stream position; offset may be negative
+    /// * `SEEK_END` or `2` – end of the stream; offset is usually negative
+    ///
+    /// Return the new absolute position.
+    ///
+    /// Raises `ValueError` immediately for an unknown `whence` or a negative
+    /// `SEEK_SET` offset, and when awaited if the reader has been closed.
+    #[pyo3(signature = (pos, whence = 0))]
+    pub fn seek<'p>(&'p mut self, py: Python<'p>, pos: i64, whence: u8) -> PyResult<&'p PyAny> {
+        let whence = match whence {
+            0 => {
+                // `pos as u64` would silently wrap a negative offset into a huge one,
+                // so reject it up front like Python's built-in file objects do.
+                let start = u64::try_from(pos).map_err(|_| {
+                    PyValueError::new_err(format!("negative seek position {pos}"))
+                })?;
+                SeekFrom::Start(start)
+            }
+            1 => SeekFrom::Current(pos),
+            2 => SeekFrom::End(pos),
+            _ => return Err(PyValueError::new_err("invalid whence")),
+        };
+        let reader = self.0.clone();
+        future_into_py(py, async move {
+            let mut state = reader.lock().await;
+            let reader = state.reader().await?;
+            let ret = reader
+                .seek(whence)
+                .await
+                .map_err(|err| PyIOError::new_err(err.to_string()))?;
+            Ok(Python::with_gil(|py| ret.into_py(py)))
+        })
+    }
+
+    /// Return the current stream position.
+    pub fn tell<'p>(&'p mut self, py: Python<'p>) -> PyResult<&'p PyAny> {
+        let reader = self.0.clone();
+        future_into_py(py, async move {
+            let mut state = reader.lock().await;
+            let reader = state.reader().await?;
+            let pos = reader
+                .stream_position()
+                .await
+                .map_err(|err| PyIOError::new_err(err.to_string()))?;
+            Ok(Python::with_gil(|py| pos.into_py(py)))
+        })
+    }
+
+    /// Enter the async context manager; the awaitable resolves to the reader itself.
+    fn __aenter__<'a>(slf: PyRef<'a, Self>, py: Python<'a>) -> PyResult<&'a PyAny> {
+        let slf = slf.into_py(py);
+        future_into_py(py, async move { Ok(slf) })
+    }
+
+    /// Exit the async context manager by closing the reader.
+    ///
+    /// Later I/O calls raise `ValueError`; exceptions are not suppressed.
+    fn __aexit__<'a>(
+        &self,
+        py: Python<'a>,
+        _exc_type: &'a PyAny,
+        _exc_value: &'a PyAny,
+        _traceback: &'a PyAny,
+    ) -> PyResult<&'a PyAny> {
+        let reader = self.0.clone();
+        future_into_py(py, async move {
+            let mut state = reader.lock().await;
+            state.close();
+            Ok(())
+        })
+    }
+}
diff --git a/bindings/python/src/utils.rs b/bindings/python/src/utils.rs
new file mode 100644
index 000000000..328499910
--- /dev/null
+++ b/bindings/python/src/utils.rs
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::os::raw::c_int;
+
+use pyo3::create_exception;
+use pyo3::exceptions::PyException;
+use pyo3::exceptions::PyFileExistsError;
+use pyo3::exceptions::PyFileNotFoundError;
+use pyo3::exceptions::PyNotImplementedError;
+use pyo3::exceptions::PyPermissionError;
+use pyo3::ffi;
+use pyo3::prelude::*;
+use pyo3::AsPyPointer;
+
+use crate::*;
+
+// `opendal.Error`: the catch-all Python exception for this module. `format_pyerr`
+// falls back to it for OpenDAL error kinds without a closer built-in exception.
+create_exception!(opendal, Error, PyException, "OpenDAL related errors");
+
+/// A bytes-like object that implements buffer protocol.
+///
+/// It owns a `Vec<u8>` and exports it to Python as a read-only buffer, which lets
+/// the readers return results as a `memoryview` over the bytes without copying.
+#[pyclass(module = "opendal")]
+pub struct Buffer {
+    inner: Vec<u8>,
+}
+
+#[pymethods]
+impl Buffer {
+    /// Buffer-protocol export hook: fill `view` with a read-only view of `inner`.
+    ///
+    /// # Safety
+    ///
+    /// `view` must point to a writable `Py_buffer` supplied by the interpreter.
+    unsafe fn __getbuffer__(
+        slf: PyRefMut<Self>,
+        view: *mut ffi::Py_buffer,
+        flags: c_int,
+    ) -> PyResult<()> {
+        let bytes = slf.inner.as_slice();
+        // SAFETY: `bytes` points into `inner`, which nothing in `Buffer` mutates after
+        // construction. Per the CPython docs, `PyBuffer_FillInfo` stores a new reference
+        // to the exporter (`slf`) in `view.obj`, keeping the allocation alive until the
+        // consumer releases the view.
+        let ret = ffi::PyBuffer_FillInfo(
+            view,
+            slf.as_ptr() as *mut _,
+            bytes.as_ptr() as *mut _,
+            // A `Vec`'s length never exceeds `isize::MAX`, so converting it to the
+            // `Py_ssize_t` length cannot fail.
+            bytes.len().try_into().unwrap(),
+            1, // read only
+            flags,
+        );
+        // -1 means CPython rejected the request (e.g. a writable buffer was asked for)
+        // and set an exception, which is propagated here.
+        if ret == -1 {
+            return Err(PyErr::fetch(slf.py()));
+        }
+        Ok(())
+    }
+}
+
+impl From<Vec<u8>> for Buffer {
+    fn from(inner: Vec<u8>) -> Self {
+        Self { inner }
+    }
+}
+
+pub fn format_pyerr(err: ocore::Error) -> PyErr {
+    use ocore::ErrorKind::*;
+    match err.kind() {
+        NotFound => PyFileNotFoundError::new_err(err.to_string()),
+        AlreadyExists => PyFileExistsError::new_err(err.to_string()),
+        PermissionDenied => PyPermissionError::new_err(err.to_string()),
+        Unsupported => PyNotImplementedError::new_err(err.to_string()),
+        _ => Error::new_err(err.to_string()),
+    }
+}

Reply via email to