This is an automated email from the ASF dual-hosted git repository.

timsaucer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-python.git


The following commit(s) were added to refs/heads/main by this push:
     new d021e6af feat: import user-defined physical optimizer rules over FFI (#1557)
d021e6af is described below

commit d021e6afa8e08bee42fb9673ba811352c464bdf7
Author: Tim Saucer <[email protected]>
AuthorDate: Thu Jun 4 07:57:47 2026 -0400

    feat: import user-defined physical optimizer rules over FFI (#1557)
    
    * feat: user-defined OptimizerRule and AnalyzerRule from Python
    
    Expose `SessionContext.add_optimizer_rule` and
    `SessionContext.add_analyzer_rule` symmetric with the existing
    `remove_optimizer_rule`. Each accepts a Python subclass of the new
    `datafusion.optimizer.OptimizerRule` / `AnalyzerRule` ABCs.
    
    Implementation:
    
    * New `crates/core/src/optimizer_rules.rs` wraps user Python instances
      in `PyOptimizerRuleAdapter` / `PyAnalyzerRuleAdapter`, which
      implement the upstream `OptimizerRule` / `AnalyzerRule` traits.
    * `OptimizerRule.rewrite(plan)` returns `None` for "no change" or a
      new `LogicalPlan`. The adapter maps that to
      `Transformed::no` / `Transformed::yes` so the upstream optimizer's
      fixed-point loop terminates correctly.
    * `AnalyzerRule.analyze(plan)` must always return a `LogicalPlan`;
      returning `None` surfaces a `DataFusionError::Execution` naming the
      offending rule.
    * The upstream `&dyn OptimizerConfig` / `&ConfigOptions` arguments are
      not surfaced to Python in this MVP; rules that need configuration
      should capture it at construction time (for example by holding a
      `SessionContext` reference) or be implemented in Rust.
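The `None`-versus-plan contract in the bullets above is what lets a fixed-point optimizer stop. Below is a minimal Python model of that contract. It is not the adapter code itself, which lived in Rust and was removed later in this same push. `Transformed`, `adapt_optimizer_result`, `adapt_analyzer_result`, `optimize_to_fixed_point`, and the toy `drop_noop` rule are hypothetical names. Plans are modeled as tuples of node names, and the pass cap of 16 is an arbitrary assumption.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Callable, Generic, Optional, TypeVar

Plan = TypeVar("Plan")


@dataclass(frozen=True)
class Transformed(Generic[Plan]):
    """Hypothetical stand-in for upstream `Transformed<T>`: a plan plus a changed flag."""

    data: Plan
    transformed: bool


def adapt_optimizer_result(plan: Plan, result: Optional[Plan]) -> Transformed[Plan]:
    # `None` means "no change": keep the input plan and flag it as untouched,
    # like the Transformed::no / Transformed::yes mapping described above.
    if result is None:
        return Transformed(plan, False)
    return Transformed(result, True)


def adapt_analyzer_result(rule_name: str, result: Optional[Plan]) -> Plan:
    # Analyzer rules must always return a plan. A Python exception naming the
    # rule stands in for DataFusionError::Execution here.
    if result is None:
        raise RuntimeError(f"analyzer rule {rule_name!r} returned None instead of a plan")
    return result


def optimize_to_fixed_point(
    plan: Plan, rule: Callable[[Plan], Optional[Plan]], max_passes: int = 16
) -> Plan:
    # Re-run the rule until it reports "no change" or the pass budget runs out.
    for _ in range(max_passes):
        step = adapt_optimizer_result(plan, rule(plan))
        plan = step.data
        if not step.transformed:
            break
    return plan


def drop_noop(plan: tuple[str, ...]) -> Optional[tuple[str, ...]]:
    # Toy rule: remove one redundant "Noop" node per pass, or return None when done.
    if "Noop" not in plan:
        return None
    nodes = list(plan)
    nodes.remove("Noop")
    return tuple(nodes)


print(optimize_to_fixed_point(("Projection", "Noop", "Noop", "Scan"), drop_noop))
# ('Projection', 'Scan')
```

If a rule returned the input plan itself instead of `None`, this model would keep re-running the rule until the pass cap. Returning `None` is what lets the loop stop early.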
    
    Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
    
    * feat: import FFI physical optimizer rules; drop Python logical rules
    
    Replace the Python-defined OptimizerRule/AnalyzerRule approach with
    FFI-imported physical optimizer rules.
    
    The Python logical-rule approach could observe plans but not transform
    them: there are no Python constructors for LogicalPlan node variants, so a
    rule could only return None or the input plan unchanged. The audience for
    custom rules also overlaps strongly with people who can write Rust.
    
    DataFusion exposes no FFI bridge for the logical OptimizerRule/AnalyzerRule
    traits, but it does export FFI_PhysicalOptimizerRule for the physical
    PhysicalOptimizerRule trait. This commit imports physical rules through
    that bridge instead.
    
    Changes:
    
    * Remove crates/core/src/optimizer_rules.rs,
      python/datafusion/optimizer.py, python/tests/test_optimizer.py, and the
      SessionContext.add_optimizer_rule / add_analyzer_rule methods.
      remove_optimizer_rule is unchanged (pre-existing).
    * New crates/core/src/physical_optimizer.rs reads a
      __datafusion_physical_optimizer_rule__ capsule and converts it via
      Arc<dyn PhysicalOptimizerRule>::from(&FFI_PhysicalOptimizerRule).
    * SessionContext gains a physical_optimizer_rules constructor argument.
      Upstream offers no API to add physical rules to a live context, so they
      are appended to the builder at construction time only.
    * The datafusion-ffi-example crate gains MyPhysicalOptimizerRule, a
      counter-backed rule used by _test_physical_optimizer_rule.py to prove
      the rule fires over FFI during physical planning.
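The counter in the last bullet works because every capsule export builds a fresh rule object that shares one counter with the Python-visible handle. In the Rust example this happens through `Arc::clone(&self.optimize_calls)`. The pure-Python model below shows that sharing. `SharedCounter`, `CountingRule`, `RuleHandle`, `export_rule`, and `plan_query` are hypothetical names. The toy "planner" runs each rule once in order, and a lock-guarded integer stands in for `AtomicUsize`.

```python
import threading
from typing import List


class SharedCounter:
    """Lock-guarded integer standing in for Rust's Arc<AtomicUsize>."""

    def __init__(self) -> None:
        self._value = 0
        self._lock = threading.Lock()

    def increment(self) -> None:
        with self._lock:
            self._value += 1

    def get(self) -> int:
        with self._lock:
            return self._value


class CountingRule:
    """Pass-through rule: returns the plan unchanged and bumps the shared counter."""

    name = "counting_physical_optimizer_rule"

    def __init__(self, calls: SharedCounter) -> None:
        self._calls = calls

    def optimize(self, plan: str) -> str:
        self._calls.increment()
        return plan


class RuleHandle:
    """Hypothetical analogue of MyPhysicalOptimizerRule: owns the counter, exports rules."""

    def __init__(self) -> None:
        self._calls = SharedCounter()

    def optimize_calls(self) -> int:
        return self._calls.get()

    def export_rule(self) -> CountingRule:
        # Each export creates a new rule object, but all exports share one counter,
        # so calls made through any exported copy are visible on the handle.
        return CountingRule(self._calls)


def plan_query(plan: str, rules: List[CountingRule]) -> str:
    # Toy "physical planner": apply every registered rule once, in order.
    for rule in rules:
        plan = rule.optimize(plan)
    return plan


handle = RuleHandle()
before = handle.optimize_calls()
result = plan_query("Scan", [handle.export_rule()])
print(before, handle.optimize_calls(), result)  # 0 1 Scan
```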
    
    Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
    
    * refactor: type physical_optimizer_rules with an Exportable Protocol
    
    Replace the `list[Any]` hint on the SessionContext
    `physical_optimizer_rules` argument with a
    `PhysicalOptimizerRuleExportable` Protocol, matching the existing
    `TableProviderExportable` / `*Exportable` pattern used for other
    FFI-capsule objects.
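The `*Exportable` pattern relies on structural typing. Any object with the right dunder method satisfies the Protocol, without inheriting from anything in datafusion. The sketch below mirrors the method shape of `PhysicalOptimizerRuleExportable` from this diff. It also adds `@runtime_checkable`, which is an assumption not present in the actual code, so that `isinstance` can demonstrate the match. `FakeExporter` and `NotAnExporter` are hypothetical classes.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class PhysicalOptimizerRuleExportable(Protocol):
    """Same method shape as the Protocol in this commit; runtime_checkable is added here."""

    def __datafusion_physical_optimizer_rule__(self) -> object: ...


class FakeExporter:
    # Hypothetical: a real exporter returns a PyCapsule wrapping FFI_PhysicalOptimizerRule.
    def __datafusion_physical_optimizer_rule__(self) -> object:
        return object()


class NotAnExporter:
    pass


print(isinstance(FakeExporter(), PhysicalOptimizerRuleExportable))   # True
print(isinstance(NotAnExporter(), PhysicalOptimizerRuleExportable))  # False
```

A runtime-checkable Protocol only confirms that the method exists. The capsule's contents are still validated on the Rust side when the rule is imported.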
    
    Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
    
    * docs: reference PhysicalOptimizerRuleExportable in SessionContext docstring
    
    Point the `physical_optimizer_rules` argument docs at the new
    `PhysicalOptimizerRuleExportable` Protocol instead of describing the duck
    type inline.
    
    Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
    
    * docs: move FFI capsule detail to PhysicalOptimizerRuleExportable
    
    The PyCapsule / FFI_PhysicalOptimizerRule mechanics describe the Protocol,
    not the SessionContext constructor. Move that detail onto
    PhysicalOptimizerRuleExportable and leave the constructor argument docs
    focused on behavior.
    
    Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
    
    * docs: drop redundant comment in SessionContext constructor
    
    Remove the explanatory comment about FFI bridge availability; the same
    information already lives on PhysicalOptimizerRuleExportable.
    
    Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
    
    * docs: drop module-level doc comment from physical_optimizer
    
    Sibling FFI-import modules (udf, udaf, catalog, table) carry no
    module-level docs, and the rst-style markup did not match Rust
    conventions. The function doc comment already states intent.
    
    Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
    
    * refactor: import physical optimizer rule via from_pycapsule! macro
    
    Replace the hand-written crates/core/src/physical_optimizer.rs with a
    `from_pycapsule!` invocation in the util crate, matching
    `physical_codec_from_pycapsule` and the other FFI capsule importers. The
    macro already handles the hasattr/getattr/cast/validate/pointer_checked
    sequence and the infallible `Arc::from(&FFI)` conversion, so the dedicated
    module is no longer needed.
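The hasattr/getattr/cast/validate/pointer_checked sequence can be mirrored from Python with `ctypes` and the CPython capsule C API. This is a sketch of the checks only, not the macro itself, and it runs on CPython only. The exporter class and its dummy `c_int` payload are hypothetical; a real exporter wraps an `FFI_PhysicalOptimizerRule`. The final `Arc::from(&FFI)` conversion has no Python equivalent, so the sketch stops at the raw pointer.

```python
import ctypes

# CPython capsule C API, accessed through ctypes (CPython-only).
_api = ctypes.pythonapi
_api.PyCapsule_New.restype = ctypes.py_object
_api.PyCapsule_New.argtypes = [ctypes.c_void_p, ctypes.c_char_p, ctypes.c_void_p]
_api.PyCapsule_IsValid.restype = ctypes.c_int
_api.PyCapsule_IsValid.argtypes = [ctypes.py_object, ctypes.c_char_p]
_api.PyCapsule_GetPointer.restype = ctypes.c_void_p
_api.PyCapsule_GetPointer.argtypes = [ctypes.py_object, ctypes.c_char_p]

ATTR = "__datafusion_physical_optimizer_rule__"
# Capsules store the name pointer without copying it, so keep these bytes alive.
CAPSULE_NAME = b"datafusion_physical_optimizer_rule"


class DummyRuleExporter:
    """Hypothetical exporter; the payload is a dummy int instead of an FFI struct."""

    def __init__(self) -> None:
        self._payload = ctypes.c_int(7)  # must outlive every capsule pointing at it

    def __datafusion_physical_optimizer_rule__(self) -> object:
        return _api.PyCapsule_New(ctypes.addressof(self._payload), CAPSULE_NAME, None)


def rule_pointer_from_pycapsule(obj: object) -> int:
    # 1. hasattr: the object must expose the export method.
    if not hasattr(obj, ATTR):
        raise TypeError(f"expected an object with {ATTR}()")
    # 2. getattr + call: ask the object for its capsule.
    capsule = getattr(obj, ATTR)()
    # 3. cast: the result must actually be a PyCapsule.
    if type(capsule).__name__ != "PyCapsule":
        raise TypeError(f"{ATTR}() returned {type(capsule).__name__}, not a PyCapsule")
    # 4. validate: the capsule name must match exactly.
    if not _api.PyCapsule_IsValid(capsule, CAPSULE_NAME):
        raise ValueError("capsule name mismatch")
    # 5. pointer_checked: fetch the non-null pointer under the expected name.
    return _api.PyCapsule_GetPointer(capsule, CAPSULE_NAME)


exporter = DummyRuleExporter()
ptr = rule_pointer_from_pycapsule(exporter)
print(ctypes.c_int.from_address(ptr).value)  # 7
```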
    
    Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
    
    * docs: trim PhysicalOptimizerRuleExportable docstring
    
    Drop the sentence about logical-rule FFI availability; it is background,
    not type-hint information, and removing it keeps the Protocol docstring in
    line with the other *Exportable hints.
    
    Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
    
    * Minor refactor
    
    * refactor: register physical optimizer rules via live add method
    
    Drop the `physical_optimizer_rules` constructor argument on
    `SessionContext` and replace it with `add_physical_optimizer_rule`,
    matching the existing `register_*` shape on the same class. The new
    method rebuilds the session state via
    `SessionStateBuilder::new_from_existing`
    so previously registered tables, UDFs, and catalogs are preserved.
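The rebuild-and-swap approach can be modeled in a few lines of Python. The method takes the write lock, copies the current state, appends the rule, and stores the result, so everything registered before the call survives. This is a hypothetical sketch. `SessionState`, `SessionHandle`, and their fields are illustrative stand-ins, not DataFusion's `SessionState` or `SessionStateBuilder` API, and a `threading.Lock` plays the role of the context's `RwLock`.

```python
import threading
from dataclasses import dataclass, field, replace
from typing import Dict, Tuple


@dataclass(frozen=True)
class SessionState:
    """Illustrative immutable state: registered tables plus physical optimizer rules."""

    tables: Dict[str, list] = field(default_factory=dict)
    physical_optimizer_rules: Tuple[str, ...] = ()


class SessionHandle:
    """Hypothetical stand-in for a context that owns its state behind a lock."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._state = SessionState()

    def register_table(self, name: str, rows: list) -> None:
        with self._lock:
            self._state = replace(self._state, tables={**self._state.tables, name: rows})

    def add_physical_optimizer_rule(self, rule: str) -> None:
        # Hold the lock for the whole read-copy-write so concurrent callers
        # cannot overwrite each other's rules.
        with self._lock:
            self._state = replace(
                self._state,
                physical_optimizer_rules=self._state.physical_optimizer_rules + (rule,),
            )

    def state(self) -> SessionState:
        with self._lock:
            return self._state


ctx = SessionHandle()
ctx.register_table("t", [1, 2, 3])
ctx.add_physical_optimizer_rule("counting_physical_optimizer_rule")
print(ctx.state().tables, ctx.state().physical_optimizer_rules)
# {'t': [1, 2, 3]} ('counting_physical_optimizer_rule',)
```

In this model, snapshots taken before the call keep the old rule list, because each update stores a new state object rather than mutating the existing one.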
    
    Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
    
    * test: drop redundant FFI physical optimizer rule export test
    
    Coverage subsumed by test_ffi_physical_optimizer_rule_runs_during_planning,
    which exercises the same capsule export via add_physical_optimizer_rule.
    
    Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
    
    ---------
    
    Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
 crates/core/src/context.rs                         | 14 +++-
 crates/util/src/lib.rs                             |  9 ++
 .../python/tests/_test_physical_optimizer_rule.py  | 45 ++++++++++
 examples/datafusion-ffi-example/src/lib.rs         |  3 +
 .../src/physical_optimizer.rs                      | 98 ++++++++++++++++++++++
 python/datafusion/context.py                       | 34 ++++++++
 6 files changed, 202 insertions(+), 1 deletion(-)

diff --git a/crates/core/src/context.rs b/crates/core/src/context.rs
index d714861a..da0df751 100644
--- a/crates/core/src/context.rs
+++ b/crates/core/src/context.rs
@@ -59,7 +59,8 @@ use datafusion_proto::physical_plan::PhysicalExtensionCodec;
 use datafusion_python_util::{
     create_logical_extension_capsule, create_physical_extension_capsule,
     ffi_logical_codec_from_pycapsule, get_global_ctx, get_tokio_runtime,
-    physical_codec_from_pycapsule, spawn_future, wait_for_future,
+    physical_codec_from_pycapsule, physical_optimizer_rule_from_pycapsule, spawn_future,
+    wait_for_future,
 };
 use object_store::ObjectStore;
 use pyo3::IntoPyObjectExt;
@@ -1195,6 +1196,17 @@ impl PySessionContext {
         self.ctx.remove_optimizer_rule(name)
     }
 
+    pub fn add_physical_optimizer_rule(&self, rule: Bound<'_, PyAny>) -> PyDataFusionResult<()> {
+        let rule = physical_optimizer_rule_from_pycapsule(&rule)?;
+        let state_ref = self.ctx.state_ref();
+        let mut guard = state_ref.write();
+        let new_state = SessionStateBuilder::new_from_existing(guard.clone())
+            .with_physical_optimizer_rule(rule)
+            .build();
+        *guard = new_state;
+        Ok(())
+    }
+
     pub fn table_provider(&self, name: &str, py: Python) -> PyResult<PyTable> {
         let provider = wait_for_future(py, self.ctx.table_provider(name))
             // Outer error: runtime/async failure
diff --git a/crates/util/src/lib.rs b/crates/util/src/lib.rs
index 75b8eeec..07aa0a2d 100644
--- a/crates/util/src/lib.rs
+++ b/crates/util/src/lib.rs
@@ -24,7 +24,9 @@ use datafusion::datasource::TableProvider;
 use datafusion::execution::TaskContext;
 use datafusion::execution::context::SessionContext;
 use datafusion::logical_expr::Volatility;
+use datafusion::physical_optimizer::PhysicalOptimizerRule;
 use datafusion_ffi::execution::FFI_TaskContextProvider;
+use datafusion_ffi::physical_optimizer::FFI_PhysicalOptimizerRule;
 use datafusion_ffi::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
 use datafusion_ffi::proto::physical_extension_codec::FFI_PhysicalExtensionCodec;
 use datafusion_ffi::table_provider::FFI_TableProvider;
@@ -332,6 +334,13 @@ from_pycapsule!(
     dyn PhysicalExtensionCodec
 );
 
+from_pycapsule!(
+    physical_optimizer_rule_from_pycapsule,
+    "datafusion_physical_optimizer_rule",
+    FFI_PhysicalOptimizerRule,
+    dyn PhysicalOptimizerRule + Send + Sync
+);
+
 try_from_pycapsule!(
     task_context_from_pycapsule,
     "datafusion_task_context_provider",
diff --git a/examples/datafusion-ffi-example/python/tests/_test_physical_optimizer_rule.py b/examples/datafusion-ffi-example/python/tests/_test_physical_optimizer_rule.py
new file mode 100644
index 00000000..0c877d78
--- /dev/null
+++ b/examples/datafusion-ffi-example/python/tests/_test_physical_optimizer_rule.py
@@ -0,0 +1,45 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import pyarrow as pa
+from datafusion import SessionContext
+from datafusion_ffi_example import MyPhysicalOptimizerRule
+
+
+def test_ffi_physical_optimizer_rule_runs_during_planning():
+    """A rule added via add_physical_optimizer_rule is invoked while the
+    physical plan is built, and the query still returns correct results."""
+    rule = MyPhysicalOptimizerRule()
+    ctx = SessionContext()
+    ctx.add_physical_optimizer_rule(rule)
+    batch = pa.RecordBatch.from_arrays(
+        [pa.array([1, 2, 3])],
+        names=["a"],
+    )
+    ctx.register_record_batches("t", [[batch]])
+
+    before = rule.optimize_calls()
+    result = ctx.sql("SELECT a FROM t").collect()
+    after = rule.optimize_calls()
+
+    assert after > before, (
+        f"Expected user FFI physical optimizer rule to fire, "
+        f"before={before} after={after}"
+    )
+    assert result[0].column(0).to_pylist() == [1, 2, 3]
diff --git a/examples/datafusion-ffi-example/src/lib.rs b/examples/datafusion-ffi-example/src/lib.rs
index 3323ac98..eccf7b81 100644
--- a/examples/datafusion-ffi-example/src/lib.rs
+++ b/examples/datafusion-ffi-example/src/lib.rs
@@ -22,6 +22,7 @@ use crate::catalog_provider::{FixedSchemaProvider, MyCatalogProvider, MyCatalogP
 use crate::config::MyConfig;
 use crate::logical_extension_codec::MyLogicalExtensionCodec;
 use crate::physical_extension_codec::MyPhysicalExtensionCodec;
+use crate::physical_optimizer::MyPhysicalOptimizerRule;
 use crate::scalar_udf::IsNullUDF;
 use crate::table_function::MyTableFunction;
 use crate::table_provider::MyTableProvider;
@@ -33,6 +34,7 @@ pub(crate) mod catalog_provider;
 pub(crate) mod config;
 pub(crate) mod logical_extension_codec;
 pub(crate) mod physical_extension_codec;
+pub(crate) mod physical_optimizer;
 pub(crate) mod scalar_udf;
 pub(crate) mod table_function;
 pub(crate) mod table_provider;
@@ -55,5 +57,6 @@ fn datafusion_ffi_example(m: &Bound<'_, PyModule>) -> PyResult<()> {
     m.add_class::<MyConfig>()?;
     m.add_class::<MyLogicalExtensionCodec>()?;
     m.add_class::<MyPhysicalExtensionCodec>()?;
+    m.add_class::<MyPhysicalOptimizerRule>()?;
     Ok(())
 }
diff --git a/examples/datafusion-ffi-example/src/physical_optimizer.rs b/examples/datafusion-ffi-example/src/physical_optimizer.rs
new file mode 100644
index 00000000..0acd1bb4
--- /dev/null
+++ b/examples/datafusion-ffi-example/src/physical_optimizer.rs
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+use std::sync::atomic::{AtomicUsize, Ordering};
+
+use datafusion::common::Result;
+use datafusion::common::config::ConfigOptions;
+use datafusion::physical_optimizer::PhysicalOptimizerRule;
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion_ffi::physical_optimizer::FFI_PhysicalOptimizerRule;
+use datafusion_python_util::get_tokio_runtime;
+use pyo3::prelude::*;
+use pyo3::types::PyCapsule;
+
+/// A physical optimizer rule that leaves every plan unchanged but bumps a
+/// shared counter each time it runs. Tests use the counter to prove that a
+/// session built with this rule actually routed physical planning through a
+/// user-supplied [`PhysicalOptimizerRule`] over FFI.
+#[derive(Debug)]
+struct CountingPhysicalOptimizerRule {
+    optimize_calls: Arc<AtomicUsize>,
+}
+
+impl PhysicalOptimizerRule for CountingPhysicalOptimizerRule {
+    fn optimize(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        _config: &ConfigOptions,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        self.optimize_calls.fetch_add(1, Ordering::SeqCst);
+        Ok(plan)
+    }
+
+    fn name(&self) -> &str {
+        "counting_physical_optimizer_rule"
+    }
+
+    fn schema_check(&self) -> bool {
+        // The plan is returned unchanged, so the schema is preserved.
+        true
+    }
+}
+
+/// Python-visible handle that produces an [`FFI_PhysicalOptimizerRule`] and
+/// exposes the shared call counter.
+#[pyclass(
+    from_py_object,
+    name = "MyPhysicalOptimizerRule",
+    module = "datafusion_ffi_example",
+    subclass
+)]
+#[derive(Debug, Default, Clone)]
+pub(crate) struct MyPhysicalOptimizerRule {
+    optimize_calls: Arc<AtomicUsize>,
+}
+
+#[pymethods]
+impl MyPhysicalOptimizerRule {
+    #[new]
+    fn new() -> Self {
+        Self::default()
+    }
+
+    fn optimize_calls(&self) -> usize {
+        self.optimize_calls.load(Ordering::SeqCst)
+    }
+
+    fn __datafusion_physical_optimizer_rule__<'py>(
+        &self,
+        py: Python<'py>,
+    ) -> PyResult<Bound<'py, PyCapsule>> {
+        let rule: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
+            Arc::new(CountingPhysicalOptimizerRule {
+                optimize_calls: Arc::clone(&self.optimize_calls),
+            });
+
+        let runtime = get_tokio_runtime().handle().clone();
+        let ffi = FFI_PhysicalOptimizerRule::new(rule, Some(runtime));
+
+        let name = cr"datafusion_physical_optimizer_rule".into();
+        PyCapsule::new(py, ffi, Some(name))
+    }
+}
diff --git a/python/datafusion/context.py b/python/datafusion/context.py
index 52bd600c..accb60f1 100644
--- a/python/datafusion/context.py
+++ b/python/datafusion/context.py
@@ -133,6 +133,16 @@ class TableProviderExportable(Protocol):
     def __datafusion_table_provider__(self, session: Any) -> object: ...  # noqa: D105
 
 
+class PhysicalOptimizerRuleExportable(Protocol):
+    """Type hint for object that has __datafusion_physical_optimizer_rule__ PyCapsule.
+
+    The method returns a PyCapsule wrapping an ``FFI_PhysicalOptimizerRule``,
+    typically produced by a separate compiled extension.
+    """
+
+    def __datafusion_physical_optimizer_rule__(self) -> object: ...  # noqa: D105
+
+
 class SessionConfig:
     """Session configuration options."""
 
@@ -1566,6 +1576,30 @@ class SessionContext:
         """
         return self.ctx.remove_optimizer_rule(name)
 
+    def add_physical_optimizer_rule(
+        self, rule: PhysicalOptimizerRuleExportable
+    ) -> None:
+        """Append a user-defined physical optimizer rule to the session.
+
+        The rule is imported via its ``__datafusion_physical_optimizer_rule__``
+        PyCapsule, typically produced by a separate compiled extension. The
+        underlying :class:`SessionState` is rebuilt from its current state
+        with the new rule appended, so previously registered tables, UDFs,
+        and catalogs are preserved.
+
+        Args:
+            rule: Object exposing ``__datafusion_physical_optimizer_rule__``,
+                a :class:`PhysicalOptimizerRuleExportable`.
+
+        Examples:
+            >>> from datafusion import SessionContext
+            >>> ctx = SessionContext()
+            >>> from my_extension import MyPhysicalOptimizerRule  # doctest: +SKIP
+            >>> rule = MyPhysicalOptimizerRule()  # doctest: +SKIP
+            >>> ctx.add_physical_optimizer_rule(rule)  # doctest: +SKIP
+        """
+        self.ctx.add_physical_optimizer_rule(rule)
+
     def table_provider(self, name: str) -> Table:
         """Return the :py:class:`~datafusion.catalog.Table` for the given table name.
 

