This is an automated email from the ASF dual-hosted git repository.
timsaucer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-python.git
The following commit(s) were added to refs/heads/main by this push:
new d021e6af feat: import user-defined physical optimizer rules over FFI (#1557)
d021e6af is described below
commit d021e6afa8e08bee42fb9673ba811352c464bdf7
Author: Tim Saucer <[email protected]>
AuthorDate: Thu Jun 4 07:57:47 2026 -0400
feat: import user-defined physical optimizer rules over FFI (#1557)
* feat: user-defined OptimizerRule and AnalyzerRule from Python
Expose `SessionContext.add_optimizer_rule` and
`SessionContext.add_analyzer_rule` symmetric with the existing
`remove_optimizer_rule`. Each accepts a Python subclass of the new
`datafusion.optimizer.OptimizerRule` / `AnalyzerRule` ABCs.
Implementation:
* New `crates/core/src/optimizer_rules.rs` wraps user Python instances
in `PyOptimizerRuleAdapter` / `PyAnalyzerRuleAdapter`, which
implement the upstream `OptimizerRule` / `AnalyzerRule` traits.
* `OptimizerRule.rewrite(plan)` returns `None` for "no change" or a
new `LogicalPlan`. The adapter maps that to
`Transformed::no` / `Transformed::yes` so the upstream optimizer's
fixed-point loop terminates correctly.
* `AnalyzerRule.analyze(plan)` must always return a `LogicalPlan`;
returning `None` surfaces a `DataFusionError::Execution` naming the
offending rule.
* The upstream `&dyn OptimizerConfig` / `&ConfigOptions` arguments are
not surfaced to Python in this MVP; rules that need configuration
should capture it at construction time (for example by holding a
`SessionContext` reference) or be implemented in Rust.
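
As a rough illustration of why the `None` versus new-plan distinction
matters, here is a toy model in plain Python. It is not DataFusion's
optimizer loop: `run_to_fixed_point`, `drop_noop_filters`, and the
tuple-of-strings "plan" are hypothetical stand-ins. It only shows that a
rule which reports "no change" lets a fixed-point loop stop.

```python
from typing import Callable, Optional, Sequence, Tuple

Plan = Tuple[str, ...]  # hypothetical stand-in for a logical plan
Rule = Callable[[Plan], Optional[Plan]]


def run_to_fixed_point(
    plan: Plan, rules: Sequence[Rule], max_passes: int = 8
) -> Tuple[Plan, int]:
    """Apply rules until one full pass reports no change (toy model)."""
    for passes in range(1, max_passes + 1):
        changed = False
        for rule in rules:
            rewritten = rule(plan)
            if rewritten is not None:  # plays the role of Transformed::yes
                plan = rewritten
                changed = True
        if not changed:  # every rule returned None, i.e. Transformed::no
            return plan, passes
    return plan, max_passes


def drop_noop_filters(plan: Plan) -> Optional[Plan]:
    """Hypothetical rule: remove always-true filters, else report no change."""
    kept = tuple(step for step in plan if step != "filter(true)")
    return kept if kept != plan else None


plan, passes = run_to_fixed_point(
    ("scan", "filter(true)", "project"), [drop_noop_filters]
)
```

A rule that returned an equal plan instead of `None` on every call would
keep this toy loop running until `max_passes`.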
Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
* feat: import FFI physical optimizer rules; drop Python logical rules
Replace the Python-defined OptimizerRule/AnalyzerRule approach with
FFI-imported physical optimizer rules.
The Python logical-rule approach could observe plans but not transform
them: there are no Python constructors for LogicalPlan node variants, so a rule
could only return None or the input plan unchanged. The audience for custom
rules also overlaps strongly with people who can write Rust.
DataFusion exposes no FFI bridge for the logical OptimizerRule/AnalyzerRule
traits, but it does export FFI_PhysicalOptimizerRule for the physical
PhysicalOptimizerRule trait. This commit imports those instead.
Changes:
* Remove crates/core/src/optimizer_rules.rs,
python/datafusion/optimizer.py, python/tests/test_optimizer.py, and the
SessionContext.add_optimizer_rule / add_analyzer_rule methods.
remove_optimizer_rule is unchanged (pre-existing).
* New crates/core/src/physical_optimizer.rs reads a
__datafusion_physical_optimizer_rule__ capsule and converts it via
Arc<dyn PhysicalOptimizerRule>::from(&FFI_PhysicalOptimizerRule).
* SessionContext gains a physical_optimizer_rules constructor argument.
Upstream offers no API to add physical rules to a live context, so they are
appended to the builder at construction time only.
* The datafusion-ffi-example crate gains MyPhysicalOptimizerRule, a
counter-backed rule used by _test_physical_optimizer_rule.py to prove the rule
fires over FFI during physical planning.
Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
* refactor: type physical_optimizer_rules with an Exportable Protocol
Replace the `list[Any]` hint on the SessionContext
`physical_optimizer_rules` argument with a `PhysicalOptimizerRuleExportable`
Protocol, matching the existing `TableProviderExportable` / `*Exportable`
pattern used for other FFI-capsule objects.
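
A minimal sketch of the Protocol pattern, using only the standard
library. The `@runtime_checkable` decorator, `FakeRule`, and
`is_exportable` are assumptions added for illustration; the Protocol in
this commit is a plain type hint, and a real implementation returns a
PyCapsule rather than a bare `object()`.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable  # assumption: added here so isinstance() can be shown
class PhysicalOptimizerRuleExportable(Protocol):
    """Structural type: anything exposing the capsule-export method."""

    def __datafusion_physical_optimizer_rule__(self) -> object: ...


class FakeRule:
    """Hypothetical stand-in for a rule from a compiled extension."""

    def __datafusion_physical_optimizer_rule__(self) -> object:
        return object()  # a real rule would return a PyCapsule here


def is_exportable(candidate: object) -> bool:
    # Structural check: only the presence of the method is tested.
    return isinstance(candidate, PhysicalOptimizerRuleExportable)


fake_ok = is_exportable(FakeRule())
plain_ok = is_exportable(object())
```

Because the check is structural, `FakeRule` does not need to inherit from
the Protocol for a type checker to accept it.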
Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
* docs: reference PhysicalOptimizerRuleExportable in SessionContext docstring
Point the `physical_optimizer_rules` argument docs at the new
`PhysicalOptimizerRuleExportable` Protocol instead of describing the duck
type inline.
Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
* docs: move FFI capsule detail to PhysicalOptimizerRuleExportable
The PyCapsule / FFI_PhysicalOptimizerRule mechanics describe the Protocol,
not the SessionContext constructor. Move that detail onto
PhysicalOptimizerRuleExportable and leave the constructor argument docs focused
on behavior.
Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
* docs: drop redundant comment in SessionContext constructor
Remove the explanatory comment about FFI bridge availability; the same
information already lives on PhysicalOptimizerRuleExportable.
Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
* docs: drop module-level doc comment from physical_optimizer
Sibling FFI-import modules (udf, udaf, catalog, table) carry no
module-level docs, and the rst-style markup did not match Rust conventions. The
function doc comment already states intent.
Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
* refactor: import physical optimizer rule via from_pycapsule! macro
Replace the hand-written crates/core/src/physical_optimizer.rs with a
`from_pycapsule!` invocation in the util crate, matching
`physical_codec_from_pycapsule` and the other FFI capsule importers. The macro
already handles the hasattr/getattr/cast/validate/pointer_checked sequence and
the infallible `Arc::from(&FFI)` conversion, so the dedicated module is no
longer needed.
Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
* docs: trim PhysicalOptimizerRuleExportable docstring
Drop the sentence about logical-rule FFI availability; it is background,
not type-hint information, and keeps the Protocol docstring in line with the
other *Exportable hints.
Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
* Minor refactor
* refactor: register physical optimizer rules via live add method
Drop the `physical_optimizer_rules` constructor argument on
`SessionContext` and replace it with `add_physical_optimizer_rule`,
matching the existing `register_*` shape on the same class. The new
method rebuilds the session state via `SessionStateBuilder::new_from_existing`
so previously registered tables, UDFs, and catalogs are preserved.
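
The rebuild-from-existing idea can be sketched with a toy model in plain
Python. `ToyState` and `ToyContext` are hypothetical stand-ins, not the
DataFusion types; the actual method in this commit takes a write lock on
the session state before swapping it.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class ToyState:
    """Hypothetical stand-in for SessionState: immutable once built."""

    tables: tuple = ()
    physical_rules: tuple = ()


class ToyContext:
    """Hypothetical stand-in for SessionContext with a swappable state."""

    def __init__(self) -> None:
        self._state = ToyState()

    def register_table(self, name: str) -> None:
        self._state = replace(self._state, tables=(*self._state.tables, name))

    def add_physical_optimizer_rule(self, rule_name: str) -> None:
        # Copy every existing field, append the rule, then swap the state in.
        self._state = replace(
            self._state,
            physical_rules=(*self._state.physical_rules, rule_name),
        )

    @property
    def state(self) -> ToyState:
        return self._state


ctx = ToyContext()
ctx.register_table("t")
ctx.add_physical_optimizer_rule("counting_rule")
```

The table registered before the rule was added is still present in the
rebuilt state, which is the property the commit message describes.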
Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
* test: drop redundant FFI physical optimizer rule export test
Coverage subsumed by test_ffi_physical_optimizer_rule_runs_during_planning,
which exercises the same capsule export via add_physical_optimizer_rule.
Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
---------
Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
crates/core/src/context.rs | 14 +++-
crates/util/src/lib.rs | 9 ++
.../python/tests/_test_physical_optimizer_rule.py | 45 ++++++++++
examples/datafusion-ffi-example/src/lib.rs | 3 +
.../src/physical_optimizer.rs | 98 ++++++++++++++++++++++
python/datafusion/context.py | 34 ++++++++
6 files changed, 202 insertions(+), 1 deletion(-)
diff --git a/crates/core/src/context.rs b/crates/core/src/context.rs
index d714861a..da0df751 100644
--- a/crates/core/src/context.rs
+++ b/crates/core/src/context.rs
@@ -59,7 +59,8 @@ use datafusion_proto::physical_plan::PhysicalExtensionCodec;
use datafusion_python_util::{
create_logical_extension_capsule, create_physical_extension_capsule,
ffi_logical_codec_from_pycapsule, get_global_ctx, get_tokio_runtime,
- physical_codec_from_pycapsule, spawn_future, wait_for_future,
+ physical_codec_from_pycapsule, physical_optimizer_rule_from_pycapsule, spawn_future,
+ wait_for_future,
};
use object_store::ObjectStore;
use pyo3::IntoPyObjectExt;
@@ -1195,6 +1196,17 @@ impl PySessionContext {
self.ctx.remove_optimizer_rule(name)
}
+ pub fn add_physical_optimizer_rule(&self, rule: Bound<'_, PyAny>) -> PyDataFusionResult<()> {
+ let rule = physical_optimizer_rule_from_pycapsule(&rule)?;
+ let state_ref = self.ctx.state_ref();
+ let mut guard = state_ref.write();
+ let new_state = SessionStateBuilder::new_from_existing(guard.clone())
+ .with_physical_optimizer_rule(rule)
+ .build();
+ *guard = new_state;
+ Ok(())
+ }
+
pub fn table_provider(&self, name: &str, py: Python) -> PyResult<PyTable> {
let provider = wait_for_future(py, self.ctx.table_provider(name))
// Outer error: runtime/async failure
diff --git a/crates/util/src/lib.rs b/crates/util/src/lib.rs
index 75b8eeec..07aa0a2d 100644
--- a/crates/util/src/lib.rs
+++ b/crates/util/src/lib.rs
@@ -24,7 +24,9 @@ use datafusion::datasource::TableProvider;
use datafusion::execution::TaskContext;
use datafusion::execution::context::SessionContext;
use datafusion::logical_expr::Volatility;
+use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion_ffi::execution::FFI_TaskContextProvider;
+use datafusion_ffi::physical_optimizer::FFI_PhysicalOptimizerRule;
use datafusion_ffi::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
use datafusion_ffi::proto::physical_extension_codec::FFI_PhysicalExtensionCodec;
use datafusion_ffi::table_provider::FFI_TableProvider;
@@ -332,6 +334,13 @@ from_pycapsule!(
dyn PhysicalExtensionCodec
);
+from_pycapsule!(
+ physical_optimizer_rule_from_pycapsule,
+ "datafusion_physical_optimizer_rule",
+ FFI_PhysicalOptimizerRule,
+ dyn PhysicalOptimizerRule + Send + Sync
+);
+
try_from_pycapsule!(
task_context_from_pycapsule,
"datafusion_task_context_provider",
diff --git a/examples/datafusion-ffi-example/python/tests/_test_physical_optimizer_rule.py b/examples/datafusion-ffi-example/python/tests/_test_physical_optimizer_rule.py
new file mode 100644
index 00000000..0c877d78
--- /dev/null
+++ b/examples/datafusion-ffi-example/python/tests/_test_physical_optimizer_rule.py
@@ -0,0 +1,45 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import pyarrow as pa
+from datafusion import SessionContext
+from datafusion_ffi_example import MyPhysicalOptimizerRule
+
+
+def test_ffi_physical_optimizer_rule_runs_during_planning():
+    """A rule added via add_physical_optimizer_rule is invoked while the
+    physical plan is built, and the query still returns correct results."""
+    rule = MyPhysicalOptimizerRule()
+    ctx = SessionContext()
+    ctx.add_physical_optimizer_rule(rule)
+    batch = pa.RecordBatch.from_arrays(
+        [pa.array([1, 2, 3])],
+        names=["a"],
+    )
+    ctx.register_record_batches("t", [[batch]])
+
+    before = rule.optimize_calls()
+    result = ctx.sql("SELECT a FROM t").collect()
+    after = rule.optimize_calls()
+
+    assert after > before, (
+        f"Expected user FFI physical optimizer rule to fire, "
+        f"before={before} after={after}"
+    )
+    assert result[0].column(0).to_pylist() == [1, 2, 3]
diff --git a/examples/datafusion-ffi-example/src/lib.rs b/examples/datafusion-ffi-example/src/lib.rs
index 3323ac98..eccf7b81 100644
--- a/examples/datafusion-ffi-example/src/lib.rs
+++ b/examples/datafusion-ffi-example/src/lib.rs
@@ -22,6 +22,7 @@ use crate::catalog_provider::{FixedSchemaProvider, MyCatalogProvider, MyCatalogP
use crate::config::MyConfig;
use crate::logical_extension_codec::MyLogicalExtensionCodec;
use crate::physical_extension_codec::MyPhysicalExtensionCodec;
+use crate::physical_optimizer::MyPhysicalOptimizerRule;
use crate::scalar_udf::IsNullUDF;
use crate::table_function::MyTableFunction;
use crate::table_provider::MyTableProvider;
@@ -33,6 +34,7 @@ pub(crate) mod catalog_provider;
pub(crate) mod config;
pub(crate) mod logical_extension_codec;
pub(crate) mod physical_extension_codec;
+pub(crate) mod physical_optimizer;
pub(crate) mod scalar_udf;
pub(crate) mod table_function;
pub(crate) mod table_provider;
@@ -55,5 +57,6 @@ fn datafusion_ffi_example(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<MyConfig>()?;
m.add_class::<MyLogicalExtensionCodec>()?;
m.add_class::<MyPhysicalExtensionCodec>()?;
+ m.add_class::<MyPhysicalOptimizerRule>()?;
Ok(())
}
diff --git a/examples/datafusion-ffi-example/src/physical_optimizer.rs b/examples/datafusion-ffi-example/src/physical_optimizer.rs
new file mode 100644
index 00000000..0acd1bb4
--- /dev/null
+++ b/examples/datafusion-ffi-example/src/physical_optimizer.rs
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+use std::sync::atomic::{AtomicUsize, Ordering};
+
+use datafusion::common::Result;
+use datafusion::common::config::ConfigOptions;
+use datafusion::physical_optimizer::PhysicalOptimizerRule;
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion_ffi::physical_optimizer::FFI_PhysicalOptimizerRule;
+use datafusion_python_util::get_tokio_runtime;
+use pyo3::prelude::*;
+use pyo3::types::PyCapsule;
+
+/// A physical optimizer rule that leaves every plan unchanged but bumps a
+/// shared counter each time it runs. Tests use the counter to prove that a
+/// session built with this rule actually routed physical planning through a
+/// user-supplied [`PhysicalOptimizerRule`] over FFI.
+#[derive(Debug)]
+struct CountingPhysicalOptimizerRule {
+    optimize_calls: Arc<AtomicUsize>,
+}
+
+impl PhysicalOptimizerRule for CountingPhysicalOptimizerRule {
+    fn optimize(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        _config: &ConfigOptions,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        self.optimize_calls.fetch_add(1, Ordering::SeqCst);
+        Ok(plan)
+    }
+
+    fn name(&self) -> &str {
+        "counting_physical_optimizer_rule"
+    }
+
+    fn schema_check(&self) -> bool {
+        // The plan is returned unchanged, so the schema is preserved.
+        true
+    }
+}
+
+/// Python-visible handle that produces an [`FFI_PhysicalOptimizerRule`] and
+/// exposes the shared call counter.
+#[pyclass(
+    from_py_object,
+    name = "MyPhysicalOptimizerRule",
+    module = "datafusion_ffi_example",
+    subclass
+)]
+#[derive(Debug, Default, Clone)]
+pub(crate) struct MyPhysicalOptimizerRule {
+    optimize_calls: Arc<AtomicUsize>,
+}
+
+#[pymethods]
+impl MyPhysicalOptimizerRule {
+    #[new]
+    fn new() -> Self {
+        Self::default()
+    }
+
+    fn optimize_calls(&self) -> usize {
+        self.optimize_calls.load(Ordering::SeqCst)
+    }
+
+    fn __datafusion_physical_optimizer_rule__<'py>(
+        &self,
+        py: Python<'py>,
+    ) -> PyResult<Bound<'py, PyCapsule>> {
+        let rule: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
+            Arc::new(CountingPhysicalOptimizerRule {
+                optimize_calls: Arc::clone(&self.optimize_calls),
+            });
+
+        let runtime = get_tokio_runtime().handle().clone();
+        let ffi = FFI_PhysicalOptimizerRule::new(rule, Some(runtime));
+
+        let name = cr"datafusion_physical_optimizer_rule".into();
+        PyCapsule::new(py, ffi, Some(name))
+    }
+}
diff --git a/python/datafusion/context.py b/python/datafusion/context.py
index 52bd600c..accb60f1 100644
--- a/python/datafusion/context.py
+++ b/python/datafusion/context.py
@@ -133,6 +133,16 @@ class TableProviderExportable(Protocol):
def __datafusion_table_provider__(self, session: Any) -> object: ... # noqa: D105
+class PhysicalOptimizerRuleExportable(Protocol):
+ """Type hint for object that has __datafusion_physical_optimizer_rule__ PyCapsule.
+
+ The method returns a PyCapsule wrapping an ``FFI_PhysicalOptimizerRule``,
+ typically produced by a separate compiled extension.
+ """
+
+ def __datafusion_physical_optimizer_rule__(self) -> object: ... # noqa: D105
+
+
class SessionConfig:
"""Session configuration options."""
@@ -1566,6 +1576,30 @@ class SessionContext:
"""
return self.ctx.remove_optimizer_rule(name)
+ def add_physical_optimizer_rule(
+ self, rule: PhysicalOptimizerRuleExportable
+ ) -> None:
+ """Append a user-defined physical optimizer rule to the session.
+
+ The rule is imported via its ``__datafusion_physical_optimizer_rule__``
+ PyCapsule, typically produced by a separate compiled extension. The
+ underlying :class:`SessionState` is rebuilt from its current state
+ with the new rule appended, so previously registered tables, UDFs,
+ and catalogs are preserved.
+
+ Args:
+ rule: Object exposing ``__datafusion_physical_optimizer_rule__``,
+ a :class:`PhysicalOptimizerRuleExportable`.
+
+ Examples:
+ >>> from datafusion import SessionContext
+ >>> ctx = SessionContext()
+ >>> from my_extension import MyPhysicalOptimizerRule # doctest: +SKIP
+ >>> rule = MyPhysicalOptimizerRule() # doctest: +SKIP
+ >>> ctx.add_physical_optimizer_rule(rule) # doctest: +SKIP
+ """
+ self.ctx.add_physical_optimizer_rule(rule)
+
def table_provider(self, name: str) -> Table:
"""Return the :py:class:`~datafusion.catalog.Table` for the given table name.