This is an automated email from the ASF dual-hosted git repository.
manjusaka pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 73bbb8591 feat(bindings/python): support pickle [de]serialization for
Operator (#5324)
73bbb8591 is described below
commit 73bbb859172859f7938edf8c4e615a01600334fd
Author: TennyZhuang <[email protected]>
AuthorDate: Sat Nov 16 23:16:45 2024 +0800
feat(bindings/python): support pickle [de]serialization for Operator (#5324)
* feat(bindings/python): support pickle [de]serialization for Operator
* feat(core): add new cap shared
* add a missing file
* refine tests
* fix C binding
* fix java binding
* refine tests
* fix nodejs binding
---
bindings/python/src/file.rs | 1 +
bindings/python/src/operator.rs | 132 +++++++++++++++------
.../{test_pickle.py => test_async_pickle_types.py} | 35 +++---
.../tests/{test_pickle.py => test_pickle_rw.py} | 0
.../{test_pickle.py => test_sync_pickle_types.py} | 34 +++---
5 files changed, 123 insertions(+), 79 deletions(-)
diff --git a/bindings/python/src/file.rs b/bindings/python/src/file.rs
index b23c6d290..374db32de 100644
--- a/bindings/python/src/file.rs
+++ b/bindings/python/src/file.rs
@@ -350,6 +350,7 @@ impl AsyncFile {
#[pymethods]
impl AsyncFile {
/// Read and return at most size bytes, or if size is not given, until EOF.
+ #[pyo3(signature = (size=None))]
pub fn read<'p>(&'p self, py: Python<'p>, size: Option<usize>) ->
PyResult<Bound<PyAny>> {
let state = self.0.clone();
diff --git a/bindings/python/src/operator.rs b/bindings/python/src/operator.rs
index 0e6c51ce1..c412cdc91 100644
--- a/bindings/python/src/operator.rs
+++ b/bindings/python/src/operator.rs
@@ -22,6 +22,7 @@ use std::time::Duration;
use pyo3::prelude::*;
use pyo3::types::PyBytes;
use pyo3::types::PyDict;
+use pyo3::types::PyTuple;
use pyo3_async_runtimes::tokio::future_into_py;
use crate::*;
@@ -45,7 +46,11 @@ fn build_operator(
///
/// Create a new blocking `Operator` with the given `scheme` and
options(`**kwargs`).
#[pyclass(module = "opendal")]
-pub struct Operator(ocore::BlockingOperator);
+pub struct Operator {
+ core: ocore::BlockingOperator,
+ __scheme: ocore::Scheme,
+ __map: HashMap<String, String>,
+}
#[pymethods]
impl Operator {
@@ -65,18 +70,26 @@ impl Operator {
})
.unwrap_or_default();
- Ok(Operator(build_operator(scheme, map)?.blocking()))
+ Ok(Operator {
+ core: build_operator(scheme.clone(), map.clone())?.blocking(),
+ __scheme: scheme,
+ __map: map,
+ })
}
/// Add new layers upon existing operator
pub fn layer(&self, layer: &layers::Layer) -> PyResult<Self> {
- let op = layer.0.layer(self.0.clone().into());
- Ok(Self(op.blocking()))
+ let op = layer.0.layer(self.core.clone().into());
+ Ok(Self {
+ core: op.blocking(),
+ __scheme: self.__scheme.clone(),
+ __map: self.__map.clone(),
+ })
}
/// Open a file-like reader for the given path.
pub fn open(&self, path: String, mode: String) -> PyResult<File> {
- let this = self.0.clone();
+ let this = self.core.clone();
if mode == "rb" {
let r = this
.reader(&path)
@@ -96,7 +109,7 @@ impl Operator {
/// Read the whole path into bytes.
pub fn read<'p>(&'p self, py: Python<'p>, path: &str) ->
PyResult<Bound<PyAny>> {
- let buffer = self.0.read(path).map_err(format_pyerr)?.to_vec();
+ let buffer = self.core.read(path).map_err(format_pyerr)?.to_vec();
Buffer::new(buffer).into_bytes_ref(py)
}
@@ -104,7 +117,7 @@ impl Operator {
#[pyo3(signature = (path, bs, **kwargs))]
pub fn write(&self, path: &str, bs: Vec<u8>, kwargs: Option<WriteOptions>)
-> PyResult<()> {
let kwargs = kwargs.unwrap_or_default();
- let mut write = self.0.write_with(path, bs).append(kwargs.append);
+ let mut write = self.core.write_with(path, bs).append(kwargs.append);
if let Some(chunk) = kwargs.chunk {
write = write.chunk(chunk);
}
@@ -123,22 +136,25 @@ impl Operator {
/// Get current path's metadata **without cache** directly.
pub fn stat(&self, path: &str) -> PyResult<Metadata> {
- self.0.stat(path).map_err(format_pyerr).map(Metadata::new)
+ self.core
+ .stat(path)
+ .map_err(format_pyerr)
+ .map(Metadata::new)
}
/// Copy source to target.
pub fn copy(&self, source: &str, target: &str) -> PyResult<()> {
- self.0.copy(source, target).map_err(format_pyerr)
+ self.core.copy(source, target).map_err(format_pyerr)
}
/// Rename filename.
pub fn rename(&self, source: &str, target: &str) -> PyResult<()> {
- self.0.rename(source, target).map_err(format_pyerr)
+ self.core.rename(source, target).map_err(format_pyerr)
}
/// Remove all file
pub fn remove_all(&self, path: &str) -> PyResult<()> {
- self.0.remove_all(path).map_err(format_pyerr)
+ self.core.remove_all(path).map_err(format_pyerr)
}
/// Create a dir at given path.
@@ -154,7 +170,7 @@ impl Operator {
/// - Create on existing dir will succeed.
/// - Create dir is always recursive, works like `mkdir -p`
pub fn create_dir(&self, path: &str) -> PyResult<()> {
- self.0.create_dir(path).map_err(format_pyerr)
+ self.core.create_dir(path).map_err(format_pyerr)
}
/// Delete given path.
@@ -163,19 +179,19 @@ impl Operator {
///
/// - Delete not existing error won't return errors.
pub fn delete(&self, path: &str) -> PyResult<()> {
- self.0.delete(path).map_err(format_pyerr)
+ self.core.delete(path).map_err(format_pyerr)
}
/// List current dir path.
pub fn list(&self, path: &str) -> PyResult<BlockingLister> {
- let l = self.0.lister(path).map_err(format_pyerr)?;
+ let l = self.core.lister(path).map_err(format_pyerr)?;
Ok(BlockingLister::new(l))
}
/// List dir in flat way.
pub fn scan(&self, path: &str) -> PyResult<BlockingLister> {
let l = self
- .0
+ .core
.lister_with(path)
.recursive(true)
.call()
@@ -184,15 +200,21 @@ impl Operator {
}
pub fn capability(&self) -> PyResult<capability::Capability> {
- Ok(capability::Capability::new(self.0.info().full_capability()))
+ Ok(capability::Capability::new(
+ self.core.info().full_capability(),
+ ))
}
pub fn to_async_operator(&self) -> PyResult<AsyncOperator> {
- Ok(AsyncOperator(self.0.clone().into()))
+ Ok(AsyncOperator {
+ core: self.core.clone().into(),
+ __scheme: self.__scheme.clone(),
+ __map: self.__map.clone(),
+ })
}
fn __repr__(&self) -> String {
- let info = self.0.info();
+ let info = self.core.info();
let name = info.name();
if name.is_empty() {
format!("Operator(\"{}\", root=\"{}\")", info.scheme(),
info.root())
@@ -204,13 +226,24 @@ impl Operator {
)
}
}
+
+ fn __getnewargs_ex__(&self, py: Python) -> PyResult<PyObject> {
+ let args = vec![self.__scheme.to_string().to_object(py)];
+ let args = PyTuple::new_bound(py, args);
+ let kwargs = self.__map.clone().into_py(py);
+ Ok(PyTuple::new_bound(py, [args.to_object(py),
kwargs.to_object(py)]).to_object(py))
+ }
}
/// `AsyncOperator` is the entry for all public async APIs
///
/// Create a new `AsyncOperator` with the given `scheme` and
options(`**kwargs`).
#[pyclass(module = "opendal")]
-pub struct AsyncOperator(ocore::Operator);
+pub struct AsyncOperator {
+ core: ocore::Operator,
+ __scheme: ocore::Scheme,
+ __map: HashMap<String, String>,
+}
#[pymethods]
impl AsyncOperator {
@@ -230,13 +263,21 @@ impl AsyncOperator {
})
.unwrap_or_default();
- Ok(AsyncOperator(build_operator(scheme, map)?))
+ Ok(AsyncOperator {
+ core: build_operator(scheme.clone(), map.clone())?.into(),
+ __scheme: scheme,
+ __map: map,
+ })
}
/// Add new layers upon existing operator
pub fn layer(&self, layer: &layers::Layer) -> PyResult<Self> {
- let op = layer.0.layer(self.0.clone());
- Ok(Self(op))
+ let op = layer.0.layer(self.core.clone());
+ Ok(Self {
+ core: op,
+ __scheme: self.__scheme.clone(),
+ __map: self.__map.clone(),
+ })
}
/// Open a file-like reader for the given path.
@@ -246,7 +287,7 @@ impl AsyncOperator {
path: String,
mode: String,
) -> PyResult<Bound<PyAny>> {
- let this = self.0.clone();
+ let this = self.core.clone();
future_into_py(py, async move {
if mode == "rb" {
@@ -271,7 +312,7 @@ impl AsyncOperator {
/// Read the whole path into bytes.
pub fn read<'p>(&'p self, py: Python<'p>, path: String) ->
PyResult<Bound<PyAny>> {
- let this = self.0.clone();
+ let this = self.core.clone();
future_into_py(py, async move {
let res: Vec<u8> =
this.read(&path).await.map_err(format_pyerr)?.to_vec();
Python::with_gil(|py| Buffer::new(res).into_bytes(py))
@@ -288,7 +329,7 @@ impl AsyncOperator {
kwargs: Option<WriteOptions>,
) -> PyResult<Bound<PyAny>> {
let kwargs = kwargs.unwrap_or_default();
- let this = self.0.clone();
+ let this = self.core.clone();
let bs = bs.as_bytes().to_vec();
future_into_py(py, async move {
let mut write = this.write_with(&path, bs).append(kwargs.append);
@@ -310,7 +351,7 @@ impl AsyncOperator {
/// Get current path's metadata **without cache** directly.
pub fn stat<'p>(&'p self, py: Python<'p>, path: String) ->
PyResult<Bound<PyAny>> {
- let this = self.0.clone();
+ let this = self.core.clone();
future_into_py(py, async move {
let res: Metadata = this
.stat(&path)
@@ -329,7 +370,7 @@ impl AsyncOperator {
source: String,
target: String,
) -> PyResult<Bound<PyAny>> {
- let this = self.0.clone();
+ let this = self.core.clone();
future_into_py(py, async move {
this.copy(&source, &target).await.map_err(format_pyerr)
})
@@ -342,7 +383,7 @@ impl AsyncOperator {
source: String,
target: String,
) -> PyResult<Bound<PyAny>> {
- let this = self.0.clone();
+ let this = self.core.clone();
future_into_py(py, async move {
this.rename(&source, &target).await.map_err(format_pyerr)
})
@@ -350,7 +391,7 @@ impl AsyncOperator {
/// Remove all file
pub fn remove_all<'p>(&'p self, py: Python<'p>, path: String) ->
PyResult<Bound<PyAny>> {
- let this = self.0.clone();
+ let this = self.core.clone();
future_into_py(py, async move {
this.remove_all(&path).await.map_err(format_pyerr)
})
@@ -369,7 +410,7 @@ impl AsyncOperator {
/// - Create on existing dir will succeed.
/// - Create dir is always recursive, works like `mkdir -p`
pub fn create_dir<'p>(&'p self, py: Python<'p>, path: String) ->
PyResult<Bound<PyAny>> {
- let this = self.0.clone();
+ let this = self.core.clone();
future_into_py(py, async move {
this.create_dir(&path).await.map_err(format_pyerr)
})
@@ -381,7 +422,7 @@ impl AsyncOperator {
///
/// - Delete not existing error won't return errors.
pub fn delete<'p>(&'p self, py: Python<'p>, path: String) ->
PyResult<Bound<PyAny>> {
- let this = self.0.clone();
+ let this = self.core.clone();
future_into_py(
py,
async move { this.delete(&path).await.map_err(format_pyerr) },
@@ -390,7 +431,7 @@ impl AsyncOperator {
/// List current dir path.
pub fn list<'p>(&'p self, py: Python<'p>, path: String) ->
PyResult<Bound<PyAny>> {
- let this = self.0.clone();
+ let this = self.core.clone();
future_into_py(py, async move {
let lister = this.lister(&path).await.map_err(format_pyerr)?;
let pylister: PyObject = Python::with_gil(|py|
AsyncLister::new(lister).into_py(py));
@@ -400,7 +441,7 @@ impl AsyncOperator {
/// List dir in flat way.
pub fn scan<'p>(&'p self, py: Python<'p>, path: String) ->
PyResult<Bound<PyAny>> {
- let this = self.0.clone();
+ let this = self.core.clone();
future_into_py(py, async move {
let lister = this
.lister_with(&path)
@@ -419,7 +460,7 @@ impl AsyncOperator {
path: String,
expire_second: u64,
) -> PyResult<Bound<PyAny>> {
- let this = self.0.clone();
+ let this = self.core.clone();
future_into_py(py, async move {
let res = this
.presign_stat(&path, Duration::from_secs(expire_second))
@@ -438,7 +479,7 @@ impl AsyncOperator {
path: String,
expire_second: u64,
) -> PyResult<Bound<PyAny>> {
- let this = self.0.clone();
+ let this = self.core.clone();
future_into_py(py, async move {
let res = this
.presign_read(&path, Duration::from_secs(expire_second))
@@ -457,7 +498,7 @@ impl AsyncOperator {
path: String,
expire_second: u64,
) -> PyResult<Bound<PyAny>> {
- let this = self.0.clone();
+ let this = self.core.clone();
future_into_py(py, async move {
let res = this
.presign_write(&path, Duration::from_secs(expire_second))
@@ -470,15 +511,21 @@ impl AsyncOperator {
}
pub fn capability(&self) -> PyResult<capability::Capability> {
- Ok(capability::Capability::new(self.0.info().full_capability()))
+ Ok(capability::Capability::new(
+ self.core.info().full_capability(),
+ ))
}
pub fn to_operator(&self) -> PyResult<Operator> {
- Ok(Operator(self.0.clone().blocking()))
+ Ok(Operator {
+ core: self.core.clone().blocking(),
+ __scheme: self.__scheme.clone(),
+ __map: self.__map.clone(),
+ })
}
fn __repr__(&self) -> String {
- let info = self.0.info();
+ let info = self.core.info();
let name = info.name();
if name.is_empty() {
format!(
@@ -494,6 +541,13 @@ impl AsyncOperator {
)
}
}
+
+ fn __getnewargs_ex__(&self, py: Python) -> PyResult<PyObject> {
+ let args = vec![self.__scheme.to_string().to_object(py)];
+ let args = PyTuple::new_bound(py, args);
+ let kwargs = self.__map.clone().into_py(py);
+ Ok(PyTuple::new_bound(py, [args.to_object(py),
kwargs.to_object(py)]).to_object(py))
+ }
}
#[pyclass(module = "opendal")]
diff --git a/bindings/python/tests/test_pickle.py
b/bindings/python/tests/test_async_pickle_types.py
similarity index 57%
copy from bindings/python/tests/test_pickle.py
copy to bindings/python/tests/test_async_pickle_types.py
index cba001628..c163be3c2 100644
--- a/bindings/python/tests/test_pickle.py
+++ b/bindings/python/tests/test_async_pickle_types.py
@@ -17,31 +17,26 @@
import pytest
import pickle
-from datetime import datetime
+from random import randint
from uuid import uuid4
+import os
-@pytest.mark.need_capability("read", "write", "delete")
-def test_sync_file_pickle(service_name, operator, async_operator):
+@pytest.mark.asyncio
+@pytest.mark.need_capability("read", "write", "delete", "shared")
+async def test_operator_pickle(service_name, operator, async_operator):
"""
- Test pickle streaming serialization and deserialization using OpenDAL
operator.
+ Test AsyncOperator's pickle serialization and deserialization.
"""
- data = {
- "a": 1,
- "b": "hello",
- "c": [1, 2, 3],
- "d": {"e": 4},
- "f": None,
- "g": b"hello\nworld",
- "h": set([1, 2, 3]),
- "i": 1.23,
- "j": True,
- "k": datetime.strptime("2024-01-01", "%Y-%m-%d"),
- }
+ size = randint(1, 1024)
filename = f"random_file_{str(uuid4())}"
- with operator.open(filename, "wb") as f:
- pickle.dump(data, f)
+ content = os.urandom(size)
+ await async_operator.write(filename, content)
- with operator.open(filename, "rb") as f:
- assert pickle.load(f) == data
+ serialized = pickle.dumps(async_operator)
+
+ deserialized = pickle.loads(serialized)
+ assert await deserialized.read(filename) == content
+
+ await async_operator.delete(filename)
diff --git a/bindings/python/tests/test_pickle.py
b/bindings/python/tests/test_pickle_rw.py
similarity index 100%
copy from bindings/python/tests/test_pickle.py
copy to bindings/python/tests/test_pickle_rw.py
diff --git a/bindings/python/tests/test_pickle.py
b/bindings/python/tests/test_sync_pickle_types.py
similarity index 57%
rename from bindings/python/tests/test_pickle.py
rename to bindings/python/tests/test_sync_pickle_types.py
index cba001628..337ebfa6b 100644
--- a/bindings/python/tests/test_pickle.py
+++ b/bindings/python/tests/test_sync_pickle_types.py
@@ -17,31 +17,25 @@
import pytest
import pickle
-from datetime import datetime
+from random import randint
from uuid import uuid4
+import os
-@pytest.mark.need_capability("read", "write", "delete")
-def test_sync_file_pickle(service_name, operator, async_operator):
+@pytest.mark.need_capability("read", "write", "delete", "shared")
+def test_operator_pickle(service_name, operator, async_operator):
"""
- Test pickle streaming serialization and deserialization using OpenDAL
operator.
+ Test Operator's pickle serialization and deserialization.
"""
- data = {
- "a": 1,
- "b": "hello",
- "c": [1, 2, 3],
- "d": {"e": 4},
- "f": None,
- "g": b"hello\nworld",
- "h": set([1, 2, 3]),
- "i": 1.23,
- "j": True,
- "k": datetime.strptime("2024-01-01", "%Y-%m-%d"),
- }
+ size = randint(1, 1024)
filename = f"random_file_{str(uuid4())}"
- with operator.open(filename, "wb") as f:
- pickle.dump(data, f)
+ content = os.urandom(size)
+ operator.write(filename, content)
- with operator.open(filename, "rb") as f:
- assert pickle.load(f) == data
+ serialized = pickle.dumps(operator)
+
+ deserialized = pickle.loads(serialized)
+ assert deserialized.read(filename) == content
+
+ operator.delete(filename)