This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch python-layer in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit cbca2592185384aef4c87c01048e0406b6613fb0 Author: Xuanwo <[email protected]> AuthorDate: Thu Nov 2 14:41:14 2023 +0800 feat(bindings/python): Add layer API for operator Signed-off-by: Xuanwo <[email protected]> --- bindings/python/python/opendal/__init__.pyi | 2 ++ bindings/python/python/opendal/layers.pyi | 6 ---- bindings/python/src/asyncio.rs | 12 +++++-- bindings/python/src/layers.rs | 51 +++++++++------------------- bindings/python/src/lib.rs | 38 +++++++-------------- bindings/python/tests/conftest.py | 4 +-- core/src/types/operator/blocking_operator.rs | 6 ++++ 7 files changed, 48 insertions(+), 71 deletions(-) diff --git a/bindings/python/python/opendal/__init__.pyi b/bindings/python/python/opendal/__init__.pyi index df8134e3e..4d7f8a7ee 100644 --- a/bindings/python/python/opendal/__init__.pyi +++ b/bindings/python/python/opendal/__init__.pyi @@ -21,6 +21,7 @@ class Error(Exception): ... class Operator: def __init__(self, scheme: str, **kwargs): ... + def layer(self, layer: Layer): ... def read(self, path: str) -> memoryview: ... def open_reader(self, path: str) -> Reader: ... def write( @@ -43,6 +44,7 @@ class Operator: class AsyncOperator: def __init__(self, scheme: str, **kwargs): ... + def layer(self, layer: Layer): ... async def read(self, path: str) -> memoryview: ... def open_reader(self, path: str) -> AsyncReader: ... async def write( diff --git a/bindings/python/python/opendal/layers.pyi b/bindings/python/python/opendal/layers.pyi index 6ae35f50e..bd2bc2e02 100644 --- a/bindings/python/python/opendal/layers.pyi +++ b/bindings/python/python/opendal/layers.pyi @@ -15,12 +15,6 @@ # specific language governing permissions and limitations # under the License. -class ConcurrentLimitLayer: - def __init__(self, permits: int) -> None: ... - -class ImmutableIndexLayer: - def insert(self, key: str) -> None: ... - class RetryLayer: def __init__( self, diff --git a/bindings/python/src/asyncio.rs b/bindings/python/src/asyncio.rs index 2ebd99dd2..12c9eefee 100644 --- a/bindings/python/src/asyncio.rs +++ b/bindings/python/src/asyncio.rs @@ -57,8 +57,8 @@ pub struct AsyncOperator(od::Operator); #[pymethods] impl AsyncOperator { #[new] - #[pyo3(signature = (scheme, *, layers=Vec::new(), **map))] - pub fn new(scheme: &str, layers: Vec<layers::Layer>, map: Option<&PyDict>) -> PyResult<Self> { + #[pyo3(signature = (scheme, *, **map))] + pub fn new(scheme: &str, map: Option<&PyDict>) -> PyResult<Self> { let scheme = od::Scheme::from_str(scheme) .map_err(|err| { od::Error::new(od::ErrorKind::Unexpected, "unsupported scheme").set_source(err) @@ -71,7 +71,13 @@ impl AsyncOperator { }) .unwrap_or_default(); - Ok(AsyncOperator(build_operator(scheme, map, layers, false)?)) + Ok(AsyncOperator(build_operator(scheme, map, false)?)) + } + + /// Add new layers upon existing operator + pub fn layer(&self, layer: &layers::Layer) -> PyResult<Self> { + let op = layer.0.layer(self.0.clone()); + Ok(Self(op)) } /// Read the whole path into bytes. diff --git a/bindings/python/src/layers.rs b/bindings/python/src/layers.rs index 1e7fe72f6..52b33d559 100644 --- a/bindings/python/src/layers.rs +++ b/bindings/python/src/layers.rs @@ -18,47 +18,26 @@ use std::time::Duration; use ::opendal as od; +use opendal::Operator; use pyo3::prelude::*; -#[derive(FromPyObject)] -pub enum Layer { - ConcurrentLimit(ConcurrentLimitLayer), - ImmutableIndex(ImmutableIndexLayer), - Retry(RetryLayer), +pub trait PythonLayer: Send + Sync { + fn layer(&self, op: Operator) -> Operator; } -#[pyclass(module = "opendal.layers")] -#[derive(Clone)] -pub struct ConcurrentLimitLayer(pub od::layers::ConcurrentLimitLayer); +#[pyclass(module = "opendal.layers", subclass)] +pub struct Layer(pub Box<dyn PythonLayer>); -#[pymethods] -impl ConcurrentLimitLayer { - #[new] - fn new(permits: usize) -> Self { - Self(od::layers::ConcurrentLimitLayer::new(permits)) - } -} - -#[pyclass(module = "opendal.layers")] +#[pyclass(module = "opendal.layers", extends=Layer)] #[derive(Clone)] -pub struct ImmutableIndexLayer(pub od::layers::ImmutableIndexLayer); - -#[pymethods] -impl ImmutableIndexLayer { - #[new] - fn new() -> Self { - Self(od::layers::ImmutableIndexLayer::default()) - } +pub struct RetryLayer(od::layers::RetryLayer); - fn insert(&mut self, key: String) { - self.0.insert(key); +impl PythonLayer for RetryLayer { + fn layer(&self, op: Operator) -> Operator { + op.layer(self.0.clone()) } } -#[pyclass(module = "opendal.layers")] -#[derive(Clone)] -pub struct RetryLayer(pub od::layers::RetryLayer); - #[pymethods] impl RetryLayer { #[new] @@ -75,7 +54,7 @@ impl RetryLayer { jitter: bool, max_delay: Option<f64>, min_delay: Option<f64>, - ) -> PyResult<Self> { + ) -> PyResult<(Self, Layer)> { let mut retry = od::layers::RetryLayer::default(); if let Some(max_times) = max_times { retry = retry.with_max_times(max_times); @@ -92,14 +71,16 @@ impl RetryLayer { if let Some(min_delay) = min_delay { retry = retry.with_min_delay(Duration::from_micros((min_delay * 1000000.0) as u64)); } - Ok(Self(retry)) + + let retry_layer = Self(retry); + + Ok((retry_layer.clone(), Layer(Box::new(retry_layer)))) } } pub fn create_submodule(py: Python) -> PyResult<&PyModule> { let submod = PyModule::new(py, "layers")?; - submod.add_class::<ConcurrentLimitLayer>()?; - submod.add_class::<ImmutableIndexLayer>()?; + submod.add_class::<Layer>()?; submod.add_class::<RetryLayer>()?; Ok(submod) } diff --git a/bindings/python/src/lib.rs b/bindings/python/src/lib.rs index 8271c483a..dee9db8f4 100644 --- a/bindings/python/src/lib.rs +++ b/bindings/python/src/lib.rs @@ -82,25 +82,9 @@ impl From<Vec<u8>> for Buffer { } } -fn add_layers(mut op: od::Operator, layers: Vec<layers::Layer>) -> PyResult<od::Operator> { - for layer in layers { - match layer { - layers::Layer::Retry(layers::RetryLayer(inner)) => op = op.layer(inner), - layers::Layer::ImmutableIndex(layers::ImmutableIndexLayer(inner)) => { - op = op.layer(inner) - } - layers::Layer::ConcurrentLimit(layers::ConcurrentLimitLayer(inner)) => { - op = op.layer(inner) - } - } - } - Ok(op) -} - fn build_operator( scheme: od::Scheme, map: HashMap<String, String>, - layers: Vec<layers::Layer>, blocking: bool, ) -> PyResult<od::Operator> { let mut op = od::Operator::via_map(scheme, map).map_err(format_pyerr)?; @@ -110,7 +94,7 @@ fn build_operator( op = op.layer(od::layers::BlockingLayer::create().expect("blocking layer must be created")); } - add_layers(op, layers) + Ok(op) } /// `Operator` is the entry for all public blocking APIs @@ -122,8 +106,8 @@ struct Operator(od::BlockingOperator); #[pymethods] impl Operator { #[new] - #[pyo3(signature = (scheme, *, layers=Vec::new(), **map))] - pub fn new(scheme: &str, layers: Vec<layers::Layer>, map: Option<&PyDict>) -> PyResult<Self> { + #[pyo3(signature = (scheme, *, **map))] + pub fn new(scheme: &str, map: Option<&PyDict>) -> PyResult<Self> { let scheme = od::Scheme::from_str(scheme) .map_err(|err| { od::Error::new(od::ErrorKind::Unexpected, "unsupported scheme").set_source(err) @@ -136,9 +120,13 @@ impl Operator { }) .unwrap_or_default(); - Ok(Operator( - build_operator(scheme, map, layers, true)?.blocking(), - )) + Ok(Operator(build_operator(scheme, map, true)?.blocking())) + } + + /// Add new layers upon existing operator + pub fn layer(&self, layer: &layers::Layer) -> PyResult<Self> { + let op = layer.0.layer(self.0.clone().into()); + Ok(Self(op.blocking())) } /// Read the whole path into bytes. @@ -585,11 +573,11 @@ fn _opendal(py: Python, m: &PyModule) -> PyResult<()> { m.add_class::<capability::Capability>()?; m.add("Error", py.get_type::<Error>())?; - let layers = layers::create_submodule(py)?; - m.add_submodule(layers)?; + let layers_module = layers::create_submodule(py)?; + m.add_submodule(layers_module)?; py.import("sys")? .getattr("modules")? - .set_item("opendal.layers", layers)?; + .set_item("opendal.layers", layers_module)?; Ok(()) } diff --git a/bindings/python/tests/conftest.py b/bindings/python/tests/conftest.py index e4a88caa7..bc831de69 100644 --- a/bindings/python/tests/conftest.py +++ b/bindings/python/tests/conftest.py @@ -56,12 +56,12 @@ def setup_config(service_name): @pytest.fixture() def operator(service_name, setup_config): - return opendal.Operator(service_name, **setup_config) + return opendal.Operator(service_name, **setup_config).layer(opendal.layers.RetryLayer()) @pytest.fixture() def async_operator(service_name, setup_config): - return opendal.AsyncOperator(service_name, **setup_config) + return opendal.AsyncOperator(service_name, **setup_config).layer(opendal.layers.RetryLayer()) @pytest.fixture(autouse=True) diff --git a/core/src/types/operator/blocking_operator.rs b/core/src/types/operator/blocking_operator.rs index f55a1c48a..e8b8572da 100644 --- a/core/src/types/operator/blocking_operator.rs +++ b/core/src/types/operator/blocking_operator.rs @@ -1140,3 +1140,9 @@ impl BlockingOperator { )) } } + +impl From<BlockingOperator> for Operator { + fn from(v: BlockingOperator) -> Self { + Operator::from_inner(v.accessor).with_limit(v.limit) + } +}
