This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 2c25f432e feat(bindings/python): Add layer API for operator (#3464)
2c25f432e is described below
commit 2c25f432ef3e796e0c09f2fd0fe2709258b66486
Author: Xuanwo <[email protected]>
AuthorDate: Thu Nov 2 15:44:18 2023 +0800
feat(bindings/python): Add layer API for operator (#3464)
* feat(bindings/python): Add layer API for operator
Signed-off-by: Xuanwo <[email protected]>
* add upgrade
Signed-off-by: Xuanwo <[email protected]>
* Use PyClassInitializer instead
Signed-off-by: Xuanwo <[email protected]>
---------
Signed-off-by: Xuanwo <[email protected]>
---
bindings/python/python/opendal/__init__.pyi | 2 ++
bindings/python/python/opendal/layers.pyi | 6 ----
bindings/python/src/asyncio.rs | 12 +++++--
bindings/python/src/layers.rs | 53 ++++++++++------------------
bindings/python/src/lib.rs | 38 +++++++-------------
bindings/python/tests/conftest.py | 4 +--
bindings/python/upgrade.md | 11 ++++++
core/src/types/operator/blocking_operator.rs | 6 ++++
8 files changed, 61 insertions(+), 71 deletions(-)
diff --git a/bindings/python/python/opendal/__init__.pyi
b/bindings/python/python/opendal/__init__.pyi
index df8134e3e..4d7f8a7ee 100644
--- a/bindings/python/python/opendal/__init__.pyi
+++ b/bindings/python/python/opendal/__init__.pyi
@@ -21,6 +21,7 @@ class Error(Exception): ...
class Operator:
def __init__(self, scheme: str, **kwargs): ...
+ def layer(self, layer: Layer): ...
def read(self, path: str) -> memoryview: ...
def open_reader(self, path: str) -> Reader: ...
def write(
@@ -43,6 +44,7 @@ class Operator:
class AsyncOperator:
def __init__(self, scheme: str, **kwargs): ...
+ def layer(self, layer: Layer): ...
async def read(self, path: str) -> memoryview: ...
def open_reader(self, path: str) -> AsyncReader: ...
async def write(
diff --git a/bindings/python/python/opendal/layers.pyi
b/bindings/python/python/opendal/layers.pyi
index 6ae35f50e..bd2bc2e02 100644
--- a/bindings/python/python/opendal/layers.pyi
+++ b/bindings/python/python/opendal/layers.pyi
@@ -15,12 +15,6 @@
# specific language governing permissions and limitations
# under the License.
-class ConcurrentLimitLayer:
- def __init__(self, permits: int) -> None: ...
-
-class ImmutableIndexLayer:
- def insert(self, key: str) -> None: ...
-
class RetryLayer:
def __init__(
self,
diff --git a/bindings/python/src/asyncio.rs b/bindings/python/src/asyncio.rs
index 2ebd99dd2..12c9eefee 100644
--- a/bindings/python/src/asyncio.rs
+++ b/bindings/python/src/asyncio.rs
@@ -57,8 +57,8 @@ pub struct AsyncOperator(od::Operator);
#[pymethods]
impl AsyncOperator {
#[new]
- #[pyo3(signature = (scheme, *, layers=Vec::new(), **map))]
- pub fn new(scheme: &str, layers: Vec<layers::Layer>, map: Option<&PyDict>)
-> PyResult<Self> {
+ #[pyo3(signature = (scheme, *, **map))]
+ pub fn new(scheme: &str, map: Option<&PyDict>) -> PyResult<Self> {
let scheme = od::Scheme::from_str(scheme)
.map_err(|err| {
od::Error::new(od::ErrorKind::Unexpected, "unsupported
scheme").set_source(err)
@@ -71,7 +71,13 @@ impl AsyncOperator {
})
.unwrap_or_default();
- Ok(AsyncOperator(build_operator(scheme, map, layers, false)?))
+ Ok(AsyncOperator(build_operator(scheme, map, false)?))
+ }
+
+ /// Add new layers upon existing operator
+ pub fn layer(&self, layer: &layers::Layer) -> PyResult<Self> {
+ let op = layer.0.layer(self.0.clone());
+ Ok(Self(op))
}
/// Read the whole path into bytes.
diff --git a/bindings/python/src/layers.rs b/bindings/python/src/layers.rs
index 1e7fe72f6..c3c9a8cc9 100644
--- a/bindings/python/src/layers.rs
+++ b/bindings/python/src/layers.rs
@@ -18,47 +18,26 @@
use std::time::Duration;
use ::opendal as od;
+use opendal::Operator;
use pyo3::prelude::*;
-#[derive(FromPyObject)]
-pub enum Layer {
- ConcurrentLimit(ConcurrentLimitLayer),
- ImmutableIndex(ImmutableIndexLayer),
- Retry(RetryLayer),
+pub trait PythonLayer: Send + Sync {
+ fn layer(&self, op: Operator) -> Operator;
}
-#[pyclass(module = "opendal.layers")]
-#[derive(Clone)]
-pub struct ConcurrentLimitLayer(pub od::layers::ConcurrentLimitLayer);
+#[pyclass(module = "opendal.layers", subclass)]
+pub struct Layer(pub Box<dyn PythonLayer>);
-#[pymethods]
-impl ConcurrentLimitLayer {
- #[new]
- fn new(permits: usize) -> Self {
- Self(od::layers::ConcurrentLimitLayer::new(permits))
- }
-}
-
-#[pyclass(module = "opendal.layers")]
+#[pyclass(module = "opendal.layers", extends=Layer)]
#[derive(Clone)]
-pub struct ImmutableIndexLayer(pub od::layers::ImmutableIndexLayer);
-
-#[pymethods]
-impl ImmutableIndexLayer {
- #[new]
- fn new() -> Self {
- Self(od::layers::ImmutableIndexLayer::default())
- }
+pub struct RetryLayer(od::layers::RetryLayer);
- fn insert(&mut self, key: String) {
- self.0.insert(key);
+impl PythonLayer for RetryLayer {
+ fn layer(&self, op: Operator) -> Operator {
+ op.layer(self.0.clone())
}
}
-#[pyclass(module = "opendal.layers")]
-#[derive(Clone)]
-pub struct RetryLayer(pub od::layers::RetryLayer);
-
#[pymethods]
impl RetryLayer {
#[new]
@@ -75,7 +54,7 @@ impl RetryLayer {
jitter: bool,
max_delay: Option<f64>,
min_delay: Option<f64>,
- ) -> PyResult<Self> {
+ ) -> PyResult<PyClassInitializer<Self>> {
let mut retry = od::layers::RetryLayer::default();
if let Some(max_times) = max_times {
retry = retry.with_max_times(max_times);
@@ -92,14 +71,18 @@ impl RetryLayer {
if let Some(min_delay) = min_delay {
retry = retry.with_min_delay(Duration::from_micros((min_delay *
1000000.0) as u64));
}
- Ok(Self(retry))
+
+ let retry_layer = Self(retry);
+ let class =
PyClassInitializer::from(Layer(Box::new(retry_layer.clone())))
+ .add_subclass(retry_layer);
+
+ Ok(class)
}
}
pub fn create_submodule(py: Python) -> PyResult<&PyModule> {
let submod = PyModule::new(py, "layers")?;
- submod.add_class::<ConcurrentLimitLayer>()?;
- submod.add_class::<ImmutableIndexLayer>()?;
+ submod.add_class::<Layer>()?;
submod.add_class::<RetryLayer>()?;
Ok(submod)
}
diff --git a/bindings/python/src/lib.rs b/bindings/python/src/lib.rs
index 8271c483a..dee9db8f4 100644
--- a/bindings/python/src/lib.rs
+++ b/bindings/python/src/lib.rs
@@ -82,25 +82,9 @@ impl From<Vec<u8>> for Buffer {
}
}
-fn add_layers(mut op: od::Operator, layers: Vec<layers::Layer>) ->
PyResult<od::Operator> {
- for layer in layers {
- match layer {
- layers::Layer::Retry(layers::RetryLayer(inner)) => op =
op.layer(inner),
- layers::Layer::ImmutableIndex(layers::ImmutableIndexLayer(inner))
=> {
- op = op.layer(inner)
- }
-
layers::Layer::ConcurrentLimit(layers::ConcurrentLimitLayer(inner)) => {
- op = op.layer(inner)
- }
- }
- }
- Ok(op)
-}
-
fn build_operator(
scheme: od::Scheme,
map: HashMap<String, String>,
- layers: Vec<layers::Layer>,
blocking: bool,
) -> PyResult<od::Operator> {
let mut op = od::Operator::via_map(scheme, map).map_err(format_pyerr)?;
@@ -110,7 +94,7 @@ fn build_operator(
op = op.layer(od::layers::BlockingLayer::create().expect("blocking
layer must be created"));
}
- add_layers(op, layers)
+ Ok(op)
}
/// `Operator` is the entry for all public blocking APIs
@@ -122,8 +106,8 @@ struct Operator(od::BlockingOperator);
#[pymethods]
impl Operator {
#[new]
- #[pyo3(signature = (scheme, *, layers=Vec::new(), **map))]
- pub fn new(scheme: &str, layers: Vec<layers::Layer>, map: Option<&PyDict>)
-> PyResult<Self> {
+ #[pyo3(signature = (scheme, *, **map))]
+ pub fn new(scheme: &str, map: Option<&PyDict>) -> PyResult<Self> {
let scheme = od::Scheme::from_str(scheme)
.map_err(|err| {
od::Error::new(od::ErrorKind::Unexpected, "unsupported
scheme").set_source(err)
@@ -136,9 +120,13 @@ impl Operator {
})
.unwrap_or_default();
- Ok(Operator(
- build_operator(scheme, map, layers, true)?.blocking(),
- ))
+ Ok(Operator(build_operator(scheme, map, true)?.blocking()))
+ }
+
+ /// Add new layers upon existing operator
+ pub fn layer(&self, layer: &layers::Layer) -> PyResult<Self> {
+ let op = layer.0.layer(self.0.clone().into());
+ Ok(Self(op.blocking()))
}
/// Read the whole path into bytes.
@@ -585,11 +573,11 @@ fn _opendal(py: Python, m: &PyModule) -> PyResult<()> {
m.add_class::<capability::Capability>()?;
m.add("Error", py.get_type::<Error>())?;
- let layers = layers::create_submodule(py)?;
- m.add_submodule(layers)?;
+ let layers_module = layers::create_submodule(py)?;
+ m.add_submodule(layers_module)?;
py.import("sys")?
.getattr("modules")?
- .set_item("opendal.layers", layers)?;
+ .set_item("opendal.layers", layers_module)?;
Ok(())
}
diff --git a/bindings/python/tests/conftest.py
b/bindings/python/tests/conftest.py
index e4a88caa7..bc831de69 100644
--- a/bindings/python/tests/conftest.py
+++ b/bindings/python/tests/conftest.py
@@ -56,12 +56,12 @@ def setup_config(service_name):
@pytest.fixture()
def operator(service_name, setup_config):
- return opendal.Operator(service_name, **setup_config)
+ return opendal.Operator(service_name,
**setup_config).layer(opendal.layers.RetryLayer())
@pytest.fixture()
def async_operator(service_name, setup_config):
- return opendal.AsyncOperator(service_name, **setup_config)
+ return opendal.AsyncOperator(service_name,
**setup_config).layer(opendal.layers.RetryLayer())
@pytest.fixture(autouse=True)
diff --git a/bindings/python/upgrade.md b/bindings/python/upgrade.md
new file mode 100644
index 000000000..91dc34480
--- /dev/null
+++ b/bindings/python/upgrade.md
@@ -0,0 +1,11 @@
+# Unreleased
+
+## Breaking change for layers
+
+`Operator` and `AsyncOperator` no longer accept a `layers` argument. Instead, we
provide a `layer` API:
+
+```python
+op = opendal.Operator("memory").layer(opendal.layers.RetryLayer())
+```
+
+We also removed the unused layers `ConcurrentLimitLayer` and `ImmutableIndexLayer`
along with this change.
diff --git a/core/src/types/operator/blocking_operator.rs
b/core/src/types/operator/blocking_operator.rs
index f55a1c48a..e8b8572da 100644
--- a/core/src/types/operator/blocking_operator.rs
+++ b/core/src/types/operator/blocking_operator.rs
@@ -1140,3 +1140,9 @@ impl BlockingOperator {
))
}
}
+
+impl From<BlockingOperator> for Operator {
+ fn from(v: BlockingOperator) -> Self {
+ Operator::from_inner(v.accessor).with_limit(v.limit)
+ }
+}