This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch python-layer
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit cbca2592185384aef4c87c01048e0406b6613fb0
Author: Xuanwo <[email protected]>
AuthorDate: Thu Nov 2 14:41:14 2023 +0800

    feat(bindings/python): Add layer API for operator
    
    Signed-off-by: Xuanwo <[email protected]>
---
 bindings/python/python/opendal/__init__.pyi  |  2 ++
 bindings/python/python/opendal/layers.pyi    |  6 ----
 bindings/python/src/asyncio.rs               | 12 +++++--
 bindings/python/src/layers.rs                | 51 +++++++++-------------------
 bindings/python/src/lib.rs                   | 38 +++++++--------------
 bindings/python/tests/conftest.py            |  4 +--
 core/src/types/operator/blocking_operator.rs |  6 ++++
 7 files changed, 48 insertions(+), 71 deletions(-)

diff --git a/bindings/python/python/opendal/__init__.pyi 
b/bindings/python/python/opendal/__init__.pyi
index df8134e3e..4d7f8a7ee 100644
--- a/bindings/python/python/opendal/__init__.pyi
+++ b/bindings/python/python/opendal/__init__.pyi
@@ -21,6 +21,7 @@ class Error(Exception): ...
 
 class Operator:
     def __init__(self, scheme: str, **kwargs): ...
+    def layer(self, layer: Layer): ...
     def read(self, path: str) -> memoryview: ...
     def open_reader(self, path: str) -> Reader: ...
     def write(
@@ -43,6 +44,7 @@ class Operator:
 
 class AsyncOperator:
     def __init__(self, scheme: str, **kwargs): ...
+    def layer(self, layer: Layer): ...
     async def read(self, path: str) -> memoryview: ...
     def open_reader(self, path: str) -> AsyncReader: ...
     async def write(
diff --git a/bindings/python/python/opendal/layers.pyi 
b/bindings/python/python/opendal/layers.pyi
index 6ae35f50e..bd2bc2e02 100644
--- a/bindings/python/python/opendal/layers.pyi
+++ b/bindings/python/python/opendal/layers.pyi
@@ -15,12 +15,6 @@
 # specific language governing permissions and limitations
 # under the License.
 
-class ConcurrentLimitLayer:
-    def __init__(self, permits: int) -> None: ...
-
-class ImmutableIndexLayer:
-    def insert(self, key: str) -> None: ...
-
 class RetryLayer:
     def __init__(
         self,
diff --git a/bindings/python/src/asyncio.rs b/bindings/python/src/asyncio.rs
index 2ebd99dd2..12c9eefee 100644
--- a/bindings/python/src/asyncio.rs
+++ b/bindings/python/src/asyncio.rs
@@ -57,8 +57,8 @@ pub struct AsyncOperator(od::Operator);
 #[pymethods]
 impl AsyncOperator {
     #[new]
-    #[pyo3(signature = (scheme, *, layers=Vec::new(), **map))]
-    pub fn new(scheme: &str, layers: Vec<layers::Layer>, map: Option<&PyDict>) 
-> PyResult<Self> {
+    #[pyo3(signature = (scheme, *,  **map))]
+    pub fn new(scheme: &str, map: Option<&PyDict>) -> PyResult<Self> {
         let scheme = od::Scheme::from_str(scheme)
             .map_err(|err| {
                 od::Error::new(od::ErrorKind::Unexpected, "unsupported 
scheme").set_source(err)
@@ -71,7 +71,13 @@ impl AsyncOperator {
             })
             .unwrap_or_default();
 
-        Ok(AsyncOperator(build_operator(scheme, map, layers, false)?))
+        Ok(AsyncOperator(build_operator(scheme, map, false)?))
+    }
+
+    /// Add new layers upon existing operator
+    pub fn layer(&self, layer: &layers::Layer) -> PyResult<Self> {
+        let op = layer.0.layer(self.0.clone());
+        Ok(Self(op))
     }
 
     /// Read the whole path into bytes.
diff --git a/bindings/python/src/layers.rs b/bindings/python/src/layers.rs
index 1e7fe72f6..52b33d559 100644
--- a/bindings/python/src/layers.rs
+++ b/bindings/python/src/layers.rs
@@ -18,47 +18,26 @@
 use std::time::Duration;
 
 use ::opendal as od;
+use opendal::Operator;
 use pyo3::prelude::*;
 
-#[derive(FromPyObject)]
-pub enum Layer {
-    ConcurrentLimit(ConcurrentLimitLayer),
-    ImmutableIndex(ImmutableIndexLayer),
-    Retry(RetryLayer),
+pub trait PythonLayer: Send + Sync {
+    fn layer(&self, op: Operator) -> Operator;
 }
 
-#[pyclass(module = "opendal.layers")]
-#[derive(Clone)]
-pub struct ConcurrentLimitLayer(pub od::layers::ConcurrentLimitLayer);
+#[pyclass(module = "opendal.layers", subclass)]
+pub struct Layer(pub Box<dyn PythonLayer>);
 
-#[pymethods]
-impl ConcurrentLimitLayer {
-    #[new]
-    fn new(permits: usize) -> Self {
-        Self(od::layers::ConcurrentLimitLayer::new(permits))
-    }
-}
-
-#[pyclass(module = "opendal.layers")]
+#[pyclass(module = "opendal.layers", extends=Layer)]
 #[derive(Clone)]
-pub struct ImmutableIndexLayer(pub od::layers::ImmutableIndexLayer);
-
-#[pymethods]
-impl ImmutableIndexLayer {
-    #[new]
-    fn new() -> Self {
-        Self(od::layers::ImmutableIndexLayer::default())
-    }
+pub struct RetryLayer(od::layers::RetryLayer);
 
-    fn insert(&mut self, key: String) {
-        self.0.insert(key);
+impl PythonLayer for RetryLayer {
+    fn layer(&self, op: Operator) -> Operator {
+        op.layer(self.0.clone())
     }
 }
 
-#[pyclass(module = "opendal.layers")]
-#[derive(Clone)]
-pub struct RetryLayer(pub od::layers::RetryLayer);
-
 #[pymethods]
 impl RetryLayer {
     #[new]
@@ -75,7 +54,7 @@ impl RetryLayer {
         jitter: bool,
         max_delay: Option<f64>,
         min_delay: Option<f64>,
-    ) -> PyResult<Self> {
+    ) -> PyResult<(Self, Layer)> {
         let mut retry = od::layers::RetryLayer::default();
         if let Some(max_times) = max_times {
             retry = retry.with_max_times(max_times);
@@ -92,14 +71,16 @@ impl RetryLayer {
         if let Some(min_delay) = min_delay {
             retry = retry.with_min_delay(Duration::from_micros((min_delay * 
1000000.0) as u64));
         }
-        Ok(Self(retry))
+
+        let retry_layer = Self(retry);
+
+        Ok((retry_layer.clone(), Layer(Box::new(retry_layer))))
     }
 }
 
 pub fn create_submodule(py: Python) -> PyResult<&PyModule> {
     let submod = PyModule::new(py, "layers")?;
-    submod.add_class::<ConcurrentLimitLayer>()?;
-    submod.add_class::<ImmutableIndexLayer>()?;
+    submod.add_class::<Layer>()?;
     submod.add_class::<RetryLayer>()?;
     Ok(submod)
 }
diff --git a/bindings/python/src/lib.rs b/bindings/python/src/lib.rs
index 8271c483a..dee9db8f4 100644
--- a/bindings/python/src/lib.rs
+++ b/bindings/python/src/lib.rs
@@ -82,25 +82,9 @@ impl From<Vec<u8>> for Buffer {
     }
 }
 
-fn add_layers(mut op: od::Operator, layers: Vec<layers::Layer>) -> 
PyResult<od::Operator> {
-    for layer in layers {
-        match layer {
-            layers::Layer::Retry(layers::RetryLayer(inner)) => op = 
op.layer(inner),
-            layers::Layer::ImmutableIndex(layers::ImmutableIndexLayer(inner)) 
=> {
-                op = op.layer(inner)
-            }
-            
layers::Layer::ConcurrentLimit(layers::ConcurrentLimitLayer(inner)) => {
-                op = op.layer(inner)
-            }
-        }
-    }
-    Ok(op)
-}
-
 fn build_operator(
     scheme: od::Scheme,
     map: HashMap<String, String>,
-    layers: Vec<layers::Layer>,
     blocking: bool,
 ) -> PyResult<od::Operator> {
     let mut op = od::Operator::via_map(scheme, map).map_err(format_pyerr)?;
@@ -110,7 +94,7 @@ fn build_operator(
         op = op.layer(od::layers::BlockingLayer::create().expect("blocking 
layer must be created"));
     }
 
-    add_layers(op, layers)
+    Ok(op)
 }
 
 /// `Operator` is the entry for all public blocking APIs
@@ -122,8 +106,8 @@ struct Operator(od::BlockingOperator);
 #[pymethods]
 impl Operator {
     #[new]
-    #[pyo3(signature = (scheme, *, layers=Vec::new(), **map))]
-    pub fn new(scheme: &str, layers: Vec<layers::Layer>, map: Option<&PyDict>) 
-> PyResult<Self> {
+    #[pyo3(signature = (scheme, *, **map))]
+    pub fn new(scheme: &str, map: Option<&PyDict>) -> PyResult<Self> {
         let scheme = od::Scheme::from_str(scheme)
             .map_err(|err| {
                 od::Error::new(od::ErrorKind::Unexpected, "unsupported 
scheme").set_source(err)
@@ -136,9 +120,13 @@ impl Operator {
             })
             .unwrap_or_default();
 
-        Ok(Operator(
-            build_operator(scheme, map, layers, true)?.blocking(),
-        ))
+        Ok(Operator(build_operator(scheme, map, true)?.blocking()))
+    }
+
+    /// Add new layers upon existing operator
+    pub fn layer(&self, layer: &layers::Layer) -> PyResult<Self> {
+        let op = layer.0.layer(self.0.clone().into());
+        Ok(Self(op.blocking()))
     }
 
     /// Read the whole path into bytes.
@@ -585,11 +573,11 @@ fn _opendal(py: Python, m: &PyModule) -> PyResult<()> {
     m.add_class::<capability::Capability>()?;
     m.add("Error", py.get_type::<Error>())?;
 
-    let layers = layers::create_submodule(py)?;
-    m.add_submodule(layers)?;
+    let layers_module = layers::create_submodule(py)?;
+    m.add_submodule(layers_module)?;
     py.import("sys")?
         .getattr("modules")?
-        .set_item("opendal.layers", layers)?;
+        .set_item("opendal.layers", layers_module)?;
 
     Ok(())
 }
diff --git a/bindings/python/tests/conftest.py 
b/bindings/python/tests/conftest.py
index e4a88caa7..bc831de69 100644
--- a/bindings/python/tests/conftest.py
+++ b/bindings/python/tests/conftest.py
@@ -56,12 +56,12 @@ def setup_config(service_name):
 
 @pytest.fixture()
 def operator(service_name, setup_config):
-    return opendal.Operator(service_name, **setup_config)
+    return opendal.Operator(service_name, 
**setup_config).layer(opendal.layers.RetryLayer())
 
 
 @pytest.fixture()
 def async_operator(service_name, setup_config):
-    return opendal.AsyncOperator(service_name, **setup_config)
+    return opendal.AsyncOperator(service_name, 
**setup_config).layer(opendal.layers.RetryLayer())
 
 
 @pytest.fixture(autouse=True)
diff --git a/core/src/types/operator/blocking_operator.rs 
b/core/src/types/operator/blocking_operator.rs
index f55a1c48a..e8b8572da 100644
--- a/core/src/types/operator/blocking_operator.rs
+++ b/core/src/types/operator/blocking_operator.rs
@@ -1140,3 +1140,9 @@ impl BlockingOperator {
         ))
     }
 }
+
+impl From<BlockingOperator> for Operator {
+    fn from(v: BlockingOperator) -> Self {
+        Operator::from_inner(v.accessor).with_limit(v.limit)
+    }
+}

Reply via email to