This is an automated email from the ASF dual-hosted git repository.

manjusaka pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 73bbb8591 feat(bindings/python): support pickle [de]serialization for Operator (#5324)
73bbb8591 is described below

commit 73bbb859172859f7938edf8c4e615a01600334fd
Author: TennyZhuang <[email protected]>
AuthorDate: Sat Nov 16 23:16:45 2024 +0800

    feat(bindings/python): support pickle [de]serialization for Operator (#5324)
    
    * feat(bindings/python): support pickle [de]serialization for Operator
    
    * feat(core): add new cap shared
    
    * add a missing file
    
    * refine tests
    
    * fix C binding
    
    * fix java binding
    
    * refine tests
    
    * fix nodejs binding
---
 bindings/python/src/file.rs                        |   1 +
 bindings/python/src/operator.rs                    | 132 +++++++++++++++------
 .../{test_pickle.py => test_async_pickle_types.py} |  35 +++---
 .../tests/{test_pickle.py => test_pickle_rw.py}    |   0
 .../{test_pickle.py => test_sync_pickle_types.py}  |  34 +++---
 5 files changed, 123 insertions(+), 79 deletions(-)

diff --git a/bindings/python/src/file.rs b/bindings/python/src/file.rs
index b23c6d290..374db32de 100644
--- a/bindings/python/src/file.rs
+++ b/bindings/python/src/file.rs
@@ -350,6 +350,7 @@ impl AsyncFile {
 #[pymethods]
 impl AsyncFile {
     /// Read and return at most size bytes, or if size is not given, until EOF.
+    #[pyo3(signature = (size=None))]
     pub fn read<'p>(&'p self, py: Python<'p>, size: Option<usize>) -> PyResult<Bound<PyAny>> {
         let state = self.0.clone();
 
diff --git a/bindings/python/src/operator.rs b/bindings/python/src/operator.rs
index 0e6c51ce1..c412cdc91 100644
--- a/bindings/python/src/operator.rs
+++ b/bindings/python/src/operator.rs
@@ -22,6 +22,7 @@ use std::time::Duration;
 use pyo3::prelude::*;
 use pyo3::types::PyBytes;
 use pyo3::types::PyDict;
+use pyo3::types::PyTuple;
 use pyo3_async_runtimes::tokio::future_into_py;
 
 use crate::*;
@@ -45,7 +46,11 @@ fn build_operator(
 ///
 /// Create a new blocking `Operator` with the given `scheme` and options(`**kwargs`).
 #[pyclass(module = "opendal")]
-pub struct Operator(ocore::BlockingOperator);
+pub struct Operator {
+    core: ocore::BlockingOperator,
+    __scheme: ocore::Scheme,
+    __map: HashMap<String, String>,
+}
 
 #[pymethods]
 impl Operator {
@@ -65,18 +70,26 @@ impl Operator {
             })
             .unwrap_or_default();
 
-        Ok(Operator(build_operator(scheme, map)?.blocking()))
+        Ok(Operator {
+            core: build_operator(scheme.clone(), map.clone())?.blocking(),
+            __scheme: scheme,
+            __map: map,
+        })
     }
 
     /// Add new layers upon existing operator
     pub fn layer(&self, layer: &layers::Layer) -> PyResult<Self> {
-        let op = layer.0.layer(self.0.clone().into());
-        Ok(Self(op.blocking()))
+        let op = layer.0.layer(self.core.clone().into());
+        Ok(Self {
+            core: op.blocking(),
+            __scheme: self.__scheme.clone(),
+            __map: self.__map.clone(),
+        })
     }
 
     /// Open a file-like reader for the given path.
     pub fn open(&self, path: String, mode: String) -> PyResult<File> {
-        let this = self.0.clone();
+        let this = self.core.clone();
         if mode == "rb" {
             let r = this
                 .reader(&path)
@@ -96,7 +109,7 @@ impl Operator {
 
     /// Read the whole path into bytes.
     pub fn read<'p>(&'p self, py: Python<'p>, path: &str) -> PyResult<Bound<PyAny>> {
-        let buffer = self.0.read(path).map_err(format_pyerr)?.to_vec();
+        let buffer = self.core.read(path).map_err(format_pyerr)?.to_vec();
         Buffer::new(buffer).into_bytes_ref(py)
     }
 
@@ -104,7 +117,7 @@ impl Operator {
     #[pyo3(signature = (path, bs, **kwargs))]
     pub fn write(&self, path: &str, bs: Vec<u8>, kwargs: Option<WriteOptions>) -> PyResult<()> {
         let kwargs = kwargs.unwrap_or_default();
-        let mut write = self.0.write_with(path, bs).append(kwargs.append);
+        let mut write = self.core.write_with(path, bs).append(kwargs.append);
         if let Some(chunk) = kwargs.chunk {
             write = write.chunk(chunk);
         }
@@ -123,22 +136,25 @@ impl Operator {
 
     /// Get current path's metadata **without cache** directly.
     pub fn stat(&self, path: &str) -> PyResult<Metadata> {
-        self.0.stat(path).map_err(format_pyerr).map(Metadata::new)
+        self.core
+            .stat(path)
+            .map_err(format_pyerr)
+            .map(Metadata::new)
     }
 
     /// Copy source to target.
     pub fn copy(&self, source: &str, target: &str) -> PyResult<()> {
-        self.0.copy(source, target).map_err(format_pyerr)
+        self.core.copy(source, target).map_err(format_pyerr)
     }
 
     /// Rename filename.
     pub fn rename(&self, source: &str, target: &str) -> PyResult<()> {
-        self.0.rename(source, target).map_err(format_pyerr)
+        self.core.rename(source, target).map_err(format_pyerr)
     }
 
     /// Remove all file
     pub fn remove_all(&self, path: &str) -> PyResult<()> {
-        self.0.remove_all(path).map_err(format_pyerr)
+        self.core.remove_all(path).map_err(format_pyerr)
     }
 
     /// Create a dir at given path.
@@ -154,7 +170,7 @@ impl Operator {
     /// - Create on existing dir will succeed.
     /// - Create dir is always recursive, works like `mkdir -p`
     pub fn create_dir(&self, path: &str) -> PyResult<()> {
-        self.0.create_dir(path).map_err(format_pyerr)
+        self.core.create_dir(path).map_err(format_pyerr)
     }
 
     /// Delete given path.
@@ -163,19 +179,19 @@ impl Operator {
     ///
     /// - Delete not existing error won't return errors.
     pub fn delete(&self, path: &str) -> PyResult<()> {
-        self.0.delete(path).map_err(format_pyerr)
+        self.core.delete(path).map_err(format_pyerr)
     }
 
     /// List current dir path.
     pub fn list(&self, path: &str) -> PyResult<BlockingLister> {
-        let l = self.0.lister(path).map_err(format_pyerr)?;
+        let l = self.core.lister(path).map_err(format_pyerr)?;
         Ok(BlockingLister::new(l))
     }
 
     /// List dir in flat way.
     pub fn scan(&self, path: &str) -> PyResult<BlockingLister> {
         let l = self
-            .0
+            .core
             .lister_with(path)
             .recursive(true)
             .call()
@@ -184,15 +200,21 @@ impl Operator {
     }
 
     pub fn capability(&self) -> PyResult<capability::Capability> {
-        Ok(capability::Capability::new(self.0.info().full_capability()))
+        Ok(capability::Capability::new(
+            self.core.info().full_capability(),
+        ))
     }
 
     pub fn to_async_operator(&self) -> PyResult<AsyncOperator> {
-        Ok(AsyncOperator(self.0.clone().into()))
+        Ok(AsyncOperator {
+            core: self.core.clone().into(),
+            __scheme: self.__scheme.clone(),
+            __map: self.__map.clone(),
+        })
     }
 
     fn __repr__(&self) -> String {
-        let info = self.0.info();
+        let info = self.core.info();
         let name = info.name();
         if name.is_empty() {
             format!("Operator(\"{}\", root=\"{}\")", info.scheme(), info.root())
@@ -204,13 +226,24 @@ impl Operator {
             )
         }
     }
+
+    fn __getnewargs_ex__(&self, py: Python) -> PyResult<PyObject> {
+        let args = vec![self.__scheme.to_string().to_object(py)];
+        let args = PyTuple::new_bound(py, args);
+        let kwargs = self.__map.clone().into_py(py);
+        Ok(PyTuple::new_bound(py, [args.to_object(py), kwargs.to_object(py)]).to_object(py))
+    }
 }
 
 /// `AsyncOperator` is the entry for all public async APIs
 ///
 /// Create a new `AsyncOperator` with the given `scheme` and options(`**kwargs`).
 #[pyclass(module = "opendal")]
-pub struct AsyncOperator(ocore::Operator);
+pub struct AsyncOperator {
+    core: ocore::Operator,
+    __scheme: ocore::Scheme,
+    __map: HashMap<String, String>,
+}
 
 #[pymethods]
 impl AsyncOperator {
@@ -230,13 +263,21 @@ impl AsyncOperator {
             })
             .unwrap_or_default();
 
-        Ok(AsyncOperator(build_operator(scheme, map)?))
+        Ok(AsyncOperator {
+            core: build_operator(scheme.clone(), map.clone())?.into(),
+            __scheme: scheme,
+            __map: map,
+        })
     }
 
     /// Add new layers upon existing operator
     pub fn layer(&self, layer: &layers::Layer) -> PyResult<Self> {
-        let op = layer.0.layer(self.0.clone());
-        Ok(Self(op))
+        let op = layer.0.layer(self.core.clone());
+        Ok(Self {
+            core: op,
+            __scheme: self.__scheme.clone(),
+            __map: self.__map.clone(),
+        })
     }
 
     /// Open a file-like reader for the given path.
@@ -246,7 +287,7 @@ impl AsyncOperator {
         path: String,
         mode: String,
     ) -> PyResult<Bound<PyAny>> {
-        let this = self.0.clone();
+        let this = self.core.clone();
 
         future_into_py(py, async move {
             if mode == "rb" {
@@ -271,7 +312,7 @@ impl AsyncOperator {
 
     /// Read the whole path into bytes.
     pub fn read<'p>(&'p self, py: Python<'p>, path: String) -> PyResult<Bound<PyAny>> {
-        let this = self.0.clone();
+        let this = self.core.clone();
         future_into_py(py, async move {
             let res: Vec<u8> = this.read(&path).await.map_err(format_pyerr)?.to_vec();
             Python::with_gil(|py| Buffer::new(res).into_bytes(py))
@@ -288,7 +329,7 @@ impl AsyncOperator {
         kwargs: Option<WriteOptions>,
     ) -> PyResult<Bound<PyAny>> {
         let kwargs = kwargs.unwrap_or_default();
-        let this = self.0.clone();
+        let this = self.core.clone();
         let bs = bs.as_bytes().to_vec();
         future_into_py(py, async move {
             let mut write = this.write_with(&path, bs).append(kwargs.append);
@@ -310,7 +351,7 @@ impl AsyncOperator {
 
     /// Get current path's metadata **without cache** directly.
     pub fn stat<'p>(&'p self, py: Python<'p>, path: String) -> PyResult<Bound<PyAny>> {
-        let this = self.0.clone();
+        let this = self.core.clone();
         future_into_py(py, async move {
             let res: Metadata = this
                 .stat(&path)
@@ -329,7 +370,7 @@ impl AsyncOperator {
         source: String,
         target: String,
     ) -> PyResult<Bound<PyAny>> {
-        let this = self.0.clone();
+        let this = self.core.clone();
         future_into_py(py, async move {
             this.copy(&source, &target).await.map_err(format_pyerr)
         })
@@ -342,7 +383,7 @@ impl AsyncOperator {
         source: String,
         target: String,
     ) -> PyResult<Bound<PyAny>> {
-        let this = self.0.clone();
+        let this = self.core.clone();
         future_into_py(py, async move {
             this.rename(&source, &target).await.map_err(format_pyerr)
         })
@@ -350,7 +391,7 @@ impl AsyncOperator {
 
     /// Remove all file
     pub fn remove_all<'p>(&'p self, py: Python<'p>, path: String) -> PyResult<Bound<PyAny>> {
-        let this = self.0.clone();
+        let this = self.core.clone();
         future_into_py(py, async move {
             this.remove_all(&path).await.map_err(format_pyerr)
         })
@@ -369,7 +410,7 @@ impl AsyncOperator {
     /// - Create on existing dir will succeed.
     /// - Create dir is always recursive, works like `mkdir -p`
     pub fn create_dir<'p>(&'p self, py: Python<'p>, path: String) -> PyResult<Bound<PyAny>> {
-        let this = self.0.clone();
+        let this = self.core.clone();
         future_into_py(py, async move {
             this.create_dir(&path).await.map_err(format_pyerr)
         })
@@ -381,7 +422,7 @@ impl AsyncOperator {
     ///
     /// - Delete not existing error won't return errors.
     pub fn delete<'p>(&'p self, py: Python<'p>, path: String) -> PyResult<Bound<PyAny>> {
-        let this = self.0.clone();
+        let this = self.core.clone();
         future_into_py(
             py,
             async move { this.delete(&path).await.map_err(format_pyerr) },
@@ -390,7 +431,7 @@ impl AsyncOperator {
 
     /// List current dir path.
     pub fn list<'p>(&'p self, py: Python<'p>, path: String) -> PyResult<Bound<PyAny>> {
-        let this = self.0.clone();
+        let this = self.core.clone();
         future_into_py(py, async move {
             let lister = this.lister(&path).await.map_err(format_pyerr)?;
             let pylister: PyObject = Python::with_gil(|py| AsyncLister::new(lister).into_py(py));
@@ -400,7 +441,7 @@ impl AsyncOperator {
 
     /// List dir in flat way.
     pub fn scan<'p>(&'p self, py: Python<'p>, path: String) -> PyResult<Bound<PyAny>> {
-        let this = self.0.clone();
+        let this = self.core.clone();
         future_into_py(py, async move {
             let lister = this
                 .lister_with(&path)
@@ -419,7 +460,7 @@ impl AsyncOperator {
         path: String,
         expire_second: u64,
     ) -> PyResult<Bound<PyAny>> {
-        let this = self.0.clone();
+        let this = self.core.clone();
         future_into_py(py, async move {
             let res = this
                 .presign_stat(&path, Duration::from_secs(expire_second))
@@ -438,7 +479,7 @@ impl AsyncOperator {
         path: String,
         expire_second: u64,
     ) -> PyResult<Bound<PyAny>> {
-        let this = self.0.clone();
+        let this = self.core.clone();
         future_into_py(py, async move {
             let res = this
                 .presign_read(&path, Duration::from_secs(expire_second))
@@ -457,7 +498,7 @@ impl AsyncOperator {
         path: String,
         expire_second: u64,
     ) -> PyResult<Bound<PyAny>> {
-        let this = self.0.clone();
+        let this = self.core.clone();
         future_into_py(py, async move {
             let res = this
                 .presign_write(&path, Duration::from_secs(expire_second))
@@ -470,15 +511,21 @@ impl AsyncOperator {
     }
 
     pub fn capability(&self) -> PyResult<capability::Capability> {
-        Ok(capability::Capability::new(self.0.info().full_capability()))
+        Ok(capability::Capability::new(
+            self.core.info().full_capability(),
+        ))
     }
 
     pub fn to_operator(&self) -> PyResult<Operator> {
-        Ok(Operator(self.0.clone().blocking()))
+        Ok(Operator {
+            core: self.core.clone().blocking(),
+            __scheme: self.__scheme.clone(),
+            __map: self.__map.clone(),
+        })
     }
 
     fn __repr__(&self) -> String {
-        let info = self.0.info();
+        let info = self.core.info();
         let name = info.name();
         if name.is_empty() {
             format!(
@@ -494,6 +541,13 @@ impl AsyncOperator {
             )
         }
     }
+
+    fn __getnewargs_ex__(&self, py: Python) -> PyResult<PyObject> {
+        let args = vec![self.__scheme.to_string().to_object(py)];
+        let args = PyTuple::new_bound(py, args);
+        let kwargs = self.__map.clone().into_py(py);
+        Ok(PyTuple::new_bound(py, [args.to_object(py), kwargs.to_object(py)]).to_object(py))
+    }
 }
 
 #[pyclass(module = "opendal")]
diff --git a/bindings/python/tests/test_pickle.py b/bindings/python/tests/test_async_pickle_types.py
similarity index 57%
copy from bindings/python/tests/test_pickle.py
copy to bindings/python/tests/test_async_pickle_types.py
index cba001628..c163be3c2 100644
--- a/bindings/python/tests/test_pickle.py
+++ b/bindings/python/tests/test_async_pickle_types.py
@@ -17,31 +17,26 @@
 
 import pytest
 import pickle
-from datetime import datetime
+from random import randint
 from uuid import uuid4
+import os
 
 
-@pytest.mark.need_capability("read", "write", "delete")
-def test_sync_file_pickle(service_name, operator, async_operator):
+@pytest.mark.asyncio
+@pytest.mark.need_capability("read", "write", "delete", "shared")
+async def test_operator_pickle(service_name, operator, async_operator):
     """
-    Test pickle streaming serialization and deserialization using OpenDAL operator.
+    Test AsyncOperator's pickle serialization and deserialization.
     """
-    data = {
-        "a": 1,
-        "b": "hello",
-        "c": [1, 2, 3],
-        "d": {"e": 4},
-        "f": None,
-        "g": b"hello\nworld",
-        "h": set([1, 2, 3]),
-        "i": 1.23,
-        "j": True,
-        "k": datetime.strptime("2024-01-01", "%Y-%m-%d"),
-    }
 
+    size = randint(1, 1024)
     filename = f"random_file_{str(uuid4())}"
-    with operator.open(filename, "wb") as f:
-        pickle.dump(data, f)
+    content = os.urandom(size)
+    await async_operator.write(filename, content)
 
-    with operator.open(filename, "rb") as f:
-        assert pickle.load(f) == data
+    serialized = pickle.dumps(async_operator)
+
+    deserialized = pickle.loads(serialized)
+    assert await deserialized.read(filename) == content
+
+    await async_operator.delete(filename)
diff --git a/bindings/python/tests/test_pickle.py b/bindings/python/tests/test_pickle_rw.py
similarity index 100%
copy from bindings/python/tests/test_pickle.py
copy to bindings/python/tests/test_pickle_rw.py
diff --git a/bindings/python/tests/test_pickle.py b/bindings/python/tests/test_sync_pickle_types.py
similarity index 57%
rename from bindings/python/tests/test_pickle.py
rename to bindings/python/tests/test_sync_pickle_types.py
index cba001628..337ebfa6b 100644
--- a/bindings/python/tests/test_pickle.py
+++ b/bindings/python/tests/test_sync_pickle_types.py
@@ -17,31 +17,25 @@
 
 import pytest
 import pickle
-from datetime import datetime
+from random import randint
 from uuid import uuid4
+import os
 
 
-@pytest.mark.need_capability("read", "write", "delete")
-def test_sync_file_pickle(service_name, operator, async_operator):
+@pytest.mark.need_capability("read", "write", "delete", "shared")
+def test_operator_pickle(service_name, operator, async_operator):
     """
-    Test pickle streaming serialization and deserialization using OpenDAL operator.
+    Test Operator's pickle serialization and deserialization.
     """
-    data = {
-        "a": 1,
-        "b": "hello",
-        "c": [1, 2, 3],
-        "d": {"e": 4},
-        "f": None,
-        "g": b"hello\nworld",
-        "h": set([1, 2, 3]),
-        "i": 1.23,
-        "j": True,
-        "k": datetime.strptime("2024-01-01", "%Y-%m-%d"),
-    }
 
+    size = randint(1, 1024)
     filename = f"random_file_{str(uuid4())}"
-    with operator.open(filename, "wb") as f:
-        pickle.dump(data, f)
+    content = os.urandom(size)
+    operator.write(filename, content)
 
-    with operator.open(filename, "rb") as f:
-        assert pickle.load(f) == data
+    serialized = pickle.dumps(operator)
+
+    deserialized = pickle.loads(serialized)
+    assert deserialized.read(filename) == content
+
+    operator.delete(filename)

Reply via email to