This is an automated email from the ASF dual-hosted git repository.

github-merge-queue[bot] pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new e1d8d463b5 Add optimize_with_context to FFI_PhysicalOptimizerRule (#22584)
e1d8d463b5 is described below

commit e1d8d463b51e67e777b3ef744e80fa75593b3e5b
Author: Nathan <[email protected]>
AuthorDate: Thu Jun 4 21:41:07 2026 -0400

    Add optimize_with_context to FFI_PhysicalOptimizerRule (#22584)
    
    ## Which issue does this PR close?
    
    Closes #22334
    
    ## Rationale for this change
    
    `FFI_PhysicalOptimizerRule` only plumbed `optimize`, `name`, and
    `schema_check` — not `optimize_with_context`. Foreign rules that
    override the context-aware variant had their override silently
    discarded.
    
    ## What changes are included in this PR?
    
    - Added `FFI_PhysicalOptimizerContext` struct to pass optimizer context
    (config + statistics registry) across FFI
    - Added `optimize_with_context` function pointer to
    `FFI_PhysicalOptimizerRule`
    - `ForeignPhysicalOptimizerRule` now overrides `optimize_with_context`
    to route through FFI
    - Unit tests for context-aware round-trip (with and without statistics
    registry)
    
    ## Are these changes tested?
    
    Yes — two new tests (`test_optimize_with_context_round_trip`,
    `test_optimize_with_context_with_registry`) plus all existing tests
    continue to pass.
    
    ## Are there any user-facing changes?
    
    API change: `FFI_PhysicalOptimizerRule` gains a new field
    (`optimize_with_context`). This is a layout change for any external
    consumer of this struct.
    
    ---------
    
    Co-authored-by: Nathan Bezualem <[email protected]>
---
 datafusion/ffi/src/physical_optimizer.rs       | 227 ++++++++++++++++++++++++-
 datafusion/ffi/src/tests/mod.rs                |   4 +
 datafusion/ffi/src/tests/physical_optimizer.rs |  44 ++++-
 datafusion/ffi/tests/ffi_physical_optimizer.rs |  28 ++-
 4 files changed, 299 insertions(+), 4 deletions(-)

diff --git a/datafusion/ffi/src/physical_optimizer.rs b/datafusion/ffi/src/physical_optimizer.rs
index 84dc40ce8f..3fb2132083 100644
--- a/datafusion/ffi/src/physical_optimizer.rs
+++ b/datafusion/ffi/src/physical_optimizer.rs
@@ -21,7 +21,7 @@ use std::sync::Arc;
 use async_trait::async_trait;
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::error::Result;
-use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_optimizer::{PhysicalOptimizerContext, PhysicalOptimizerRule};
 use datafusion_physical_plan::ExecutionPlan;
 use stabby::string::String as SString;
 use tokio::runtime::Handle;
@@ -31,6 +31,84 @@ use crate::execution_plan::FFI_ExecutionPlan;
 use crate::util::FFI_Result;
 use crate::{df_result, sresult_return};

+/// A stable struct for sharing [`PhysicalOptimizerContext`] across FFI boundaries.
+///
+/// This provides access to configuration options for optimizer rules that need
+/// extended context beyond the plan itself.
+#[repr(C)]
+#[derive(Debug)]
+pub struct FFI_PhysicalOptimizerContext {
+    pub config_options:
+        unsafe extern "C" fn(&FFI_PhysicalOptimizerContext) -> FFI_ConfigOptions,
+
+    /// Release the memory of the private data.
+    pub release: unsafe extern "C" fn(&mut FFI_PhysicalOptimizerContext),
+
+    /// Internal data. Only accessed by the provider.
+    pub private_data: *const c_void,
+}
+
+unsafe impl Send for FFI_PhysicalOptimizerContext {}
+unsafe impl Sync for FFI_PhysicalOptimizerContext {}
+
+struct OptimizerContextPrivateData {
+    config: ConfigOptions,
+}
+
+impl FFI_PhysicalOptimizerContext {
+    pub fn new(context: &dyn PhysicalOptimizerContext) -> Self {
+        let private_data = Box::new(OptimizerContextPrivateData {
+            config: context.config_options().clone(),
+        });
+        let private_data = Box::into_raw(private_data) as *const c_void;
+
+        Self {
+            config_options: context_config_options_fn,
+            release: context_release_fn,
+            private_data,
+        }
+    }
+
+    fn inner(&self) -> &OptimizerContextPrivateData {
+        unsafe { &*(self.private_data as *const OptimizerContextPrivateData) }
+    }
+}
+
+impl Drop for FFI_PhysicalOptimizerContext {
+    fn drop(&mut self) {
+        unsafe { (self.release)(self) }
+    }
+}
+
+unsafe extern "C" fn context_config_options_fn(
+    ctx: &FFI_PhysicalOptimizerContext,
+) -> FFI_ConfigOptions {
+    FFI_ConfigOptions::from(&ctx.inner().config)
+}
+
+unsafe extern "C" fn context_release_fn(ctx: &mut FFI_PhysicalOptimizerContext) {
+    if !ctx.private_data.is_null() {
+        unsafe {
+            let _ = Box::from_raw(ctx.private_data as *mut OptimizerContextPrivateData);
+        }
+        ctx.private_data = std::ptr::null();
+    }
+}
+
+/// Reconstructed [`PhysicalOptimizerContext`] on the consumer side of FFI.
+///
+/// `StatisticsRegistry` is not plumbed because it contains trait object vtables
+/// that are only valid within the originating library.
+struct ForeignOptimizerContext {
+    config: ConfigOptions,
+}
+
+impl PhysicalOptimizerContext for ForeignOptimizerContext {
+    fn config_options(&self) -> &ConfigOptions {
+        &self.config
+    }
+}
+
 /// A stable struct for sharing [`PhysicalOptimizerRule`] across FFI boundaries.
 #[repr(C)]
 #[derive(Debug)]
@@ -55,6 +133,12 @@ pub struct FFI_PhysicalOptimizerRule {
     /// Return the major DataFusion version number of this rule.
     pub version: unsafe extern "C" fn() -> u64,

+    pub optimize_with_context: unsafe extern "C" fn(
+        &Self,
+        plan: &FFI_ExecutionPlan,
+        context: &FFI_PhysicalOptimizerContext,
+    ) -> FFI_Result<FFI_ExecutionPlan>,
+
     /// Internal data. This is only to be accessed by the provider of the rule.
     /// A [`ForeignPhysicalOptimizerRule`] should never attempt to access this data.
     pub private_data: *mut c_void,
@@ -98,6 +182,23 @@ unsafe extern "C" fn optimize_fn_wrapper(
     FFI_Result::Ok(FFI_ExecutionPlan::new(optimized_plan, runtime))
 }

+unsafe extern "C" fn optimize_with_context_fn_wrapper(
+    rule: &FFI_PhysicalOptimizerRule,
+    plan: &FFI_ExecutionPlan,
+    context: &FFI_PhysicalOptimizerContext,
+) -> FFI_Result<FFI_ExecutionPlan> {
+    let runtime = rule.runtime();
+    let inner = rule.inner();
+    let plan: Arc<dyn ExecutionPlan> = sresult_return!(plan.try_into());
+    let config = sresult_return!(ConfigOptions::try_from(unsafe {
+        (context.config_options)(context)
+    }));
+    let foreign_ctx = ForeignOptimizerContext { config };
+    let optimized_plan = sresult_return!(inner.optimize_with_context(plan, &foreign_ctx));
+
+    FFI_Result::Ok(FFI_ExecutionPlan::new(optimized_plan, runtime))
+}
+
 unsafe extern "C" fn name_fn_wrapper(rule: &FFI_PhysicalOptimizerRule) -> SString {
     let rule = rule.inner();
     rule.name().into()
@@ -127,6 +228,7 @@ unsafe extern "C" fn clone_fn_wrapper(

     FFI_PhysicalOptimizerRule {
         optimize: optimize_fn_wrapper,
+        optimize_with_context: optimize_with_context_fn_wrapper,
         name: name_fn_wrapper,
         schema_check: schema_check_fn_wrapper,
         clone: clone_fn_wrapper,
@@ -160,6 +262,7 @@ impl FFI_PhysicalOptimizerRule {

         Self {
             optimize: optimize_fn_wrapper,
+            optimize_with_context: optimize_with_context_fn_wrapper,
             name: name_fn_wrapper,
             schema_check: schema_check_fn_wrapper,
             clone: clone_fn_wrapper,
@@ -220,6 +323,24 @@ impl PhysicalOptimizerRule for ForeignPhysicalOptimizerRule {
         (&optimized_plan).try_into()
     }

+    fn optimize_with_context(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        context: &dyn PhysicalOptimizerContext,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let ffi_context = FFI_PhysicalOptimizerContext::new(context);
+        let plan = FFI_ExecutionPlan::new(plan, None);
+
+        let optimized_plan = unsafe {
+            df_result!((self.rule.optimize_with_context)(
+                &self.rule,
+                &plan,
+                &ffi_context
+            ))?
+        };
+        (&optimized_plan).try_into()
+    }
+
     fn name(&self) -> &str {
         &self.name
     }
@@ -236,8 +357,11 @@ mod tests {
     use arrow::datatypes::{DataType, Field, Schema};
     use datafusion_common::config::ConfigOptions;
     use datafusion_common::error::Result;
-    use datafusion_physical_optimizer::PhysicalOptimizerRule;
+    use datafusion_physical_optimizer::{
+        ConfigOnlyContext, PhysicalOptimizerContext, PhysicalOptimizerRule,
+    };
     use datafusion_physical_plan::ExecutionPlan;
+    use datafusion_physical_plan::operator_statistics::StatisticsRegistry;

     use super::*;
     use crate::execution_plan::tests::EmptyExec;
@@ -265,6 +389,39 @@ mod tests {
         }
     }

+    /// A rule that returns an error from `optimize` but succeeds when
+    /// called via `optimize_with_context`, proving the context path is taken.
+    #[derive(Debug)]
+    struct ContextAwareRule;
+
+    impl PhysicalOptimizerRule for ContextAwareRule {
+        fn optimize(
+            &self,
+            _plan: Arc<dyn ExecutionPlan>,
+            _config: &ConfigOptions,
+        ) -> Result<Arc<dyn ExecutionPlan>> {
+            Err(datafusion_common::DataFusionError::Plan(
+                "optimize should not be called directly".to_string(),
+            ))
+        }
+
+        fn optimize_with_context(
+            &self,
+            plan: Arc<dyn ExecutionPlan>,
+            _context: &dyn PhysicalOptimizerContext,
+        ) -> Result<Arc<dyn ExecutionPlan>> {
+            Ok(plan)
+        }
+
+        fn name(&self) -> &str {
+            "context_aware_rule"
+        }
+
+        fn schema_check(&self) -> bool {
+            true
+        }
+    }
+
     fn create_test_plan() -> Arc<dyn ExecutionPlan> {
         let schema =
             Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)]));
@@ -374,4 +531,70 @@ mod tests {

         Ok(())
     }
+
+    #[test]
+    fn test_optimize_with_context_round_trip() -> Result<()> {
+        let rule: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
+            Arc::new(ContextAwareRule);
+
+        let mut ffi_rule = FFI_PhysicalOptimizerRule::new(rule, None);
+        ffi_rule.library_marker_id = crate::mock_foreign_marker_id;
+
+        let foreign_rule: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
+            (&ffi_rule).into();
+
+        let plan = create_test_plan();
+        let config = ConfigOptions::new();
+        let context = ConfigOnlyContext::new(&config);
+
+        let optimized = foreign_rule.optimize_with_context(plan, &context)?;
+        assert_eq!(optimized.name(), "empty-exec");
+
+        Ok(())
+    }
+
+    /// Tests that `optimize_with_context` works even when the caller supplies a
+    /// statistics registry. The registry cannot survive the FFI round-trip (it
+    /// contains trait object vtables that are library-local), so the provider
+    /// side will always see `None`. This test verifies the context-aware path
+    /// still succeeds in that scenario.
+    #[test]
+    fn test_optimize_with_context_with_registry() -> Result<()> {
+        let rule: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
+            Arc::new(ContextAwareRule);
+
+        let mut ffi_rule = FFI_PhysicalOptimizerRule::new(rule, None);
+        ffi_rule.library_marker_id = crate::mock_foreign_marker_id;
+
+        let foreign_rule: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
+            (&ffi_rule).into();
+
+        struct ContextWithRegistry {
+            config: ConfigOptions,
+            registry: StatisticsRegistry,
+        }
+
+        impl PhysicalOptimizerContext for ContextWithRegistry {
+            fn config_options(&self) -> &ConfigOptions {
+                &self.config
+            }
+
+            fn statistics_registry(&self) -> Option<&StatisticsRegistry> {
+                Some(&self.registry)
+            }
+        }
+
+        let ctx = ContextWithRegistry {
+            config: ConfigOptions::new(),
+            registry: StatisticsRegistry::default_with_builtin_providers(),
+        };
+
+        let plan = create_test_plan();
+        // The optimize_with_context path works, but the registry is not
+        // available on the provider side (it will be None).
+        let optimized = foreign_rule.optimize_with_context(plan, &ctx)?;
+        assert_eq!(optimized.name(), "empty-exec");
+
+        Ok(())
+    }
 }
diff --git a/datafusion/ffi/src/tests/mod.rs b/datafusion/ffi/src/tests/mod.rs
index 62e62d8235..03b3a7ab24 100644
--- a/datafusion/ffi/src/tests/mod.rs
+++ b/datafusion/ffi/src/tests/mod.rs
@@ -113,6 +113,8 @@ pub struct ForeignLibraryModule {

     pub create_physical_optimizer_rule: extern "C" fn() -> FFI_PhysicalOptimizerRule,

+    pub create_context_aware_optimizer_rule: extern "C" fn() -> FFI_PhysicalOptimizerRule,
+
     pub version: extern "C" fn() -> u64,
 }

@@ -259,6 +261,8 @@ pub extern "C" fn datafusion_ffi_get_module() -> ForeignLibraryModule {
         create_table_with_statistics,
         create_physical_optimizer_rule:
             physical_optimizer::create_physical_optimizer_rule,
+        create_context_aware_optimizer_rule:
+            physical_optimizer::create_context_aware_optimizer_rule,
         version: super::version,
     }
 }
diff --git a/datafusion/ffi/src/tests/physical_optimizer.rs b/datafusion/ffi/src/tests/physical_optimizer.rs
index 2476526125..581f454e52 100644
--- a/datafusion/ffi/src/tests/physical_optimizer.rs
+++ b/datafusion/ffi/src/tests/physical_optimizer.rs
@@ -19,7 +19,7 @@ use std::sync::Arc;

 use datafusion_common::config::ConfigOptions;
 use datafusion_common::error::Result;
-use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_optimizer::{PhysicalOptimizerContext, PhysicalOptimizerRule};
 use datafusion_physical_plan::ExecutionPlan;
 use datafusion_physical_plan::limit::GlobalLimitExec;

@@ -52,3 +52,45 @@ pub(crate) extern "C" fn create_physical_optimizer_rule() -> FFI_PhysicalOptimiz
     let rule: Arc<dyn PhysicalOptimizerRule + Send + Sync> = Arc::new(AddLimitRule);
     FFI_PhysicalOptimizerRule::new(rule, None)
 }
+
+/// A rule that returns an error from `optimize()` (proving the context path must
+/// be taken) but succeeds in `optimize_with_context()` by wrapping the plan in a
+/// `GlobalLimitExec`.
+#[derive(Debug)]
+struct ContextAwareAddLimitRule;
+
+impl PhysicalOptimizerRule for ContextAwareAddLimitRule {
+    fn optimize(
+        &self,
+        _plan: Arc<dyn ExecutionPlan>,
+        _config: &ConfigOptions,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        Err(datafusion_common::DataFusionError::Plan(
+            "optimize should not be called directly; use optimize_with_context"
+                .to_string(),
+        ))
+    }
+
+    fn optimize_with_context(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        _context: &dyn PhysicalOptimizerContext,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        Ok(Arc::new(GlobalLimitExec::new(plan, 0, Some(10))))
+    }
+
+    fn name(&self) -> &str {
+        "context_aware_add_limit_rule"
+    }
+
+    fn schema_check(&self) -> bool {
+        true
+    }
+}
+
+pub(crate) extern "C" fn create_context_aware_optimizer_rule() -> FFI_PhysicalOptimizerRule
+{
+    let rule: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
+        Arc::new(ContextAwareAddLimitRule);
+    FFI_PhysicalOptimizerRule::new(rule, None)
+}
diff --git a/datafusion/ffi/tests/ffi_physical_optimizer.rs b/datafusion/ffi/tests/ffi_physical_optimizer.rs
index d860fda340..d8baf52288 100644
--- a/datafusion/ffi/tests/ffi_physical_optimizer.rs
+++ b/datafusion/ffi/tests/ffi_physical_optimizer.rs
@@ -25,7 +25,7 @@ mod tests {
     use datafusion_ffi::execution_plan::tests::EmptyExec;
     use datafusion_ffi::physical_optimizer::ForeignPhysicalOptimizerRule;
     use datafusion_ffi::tests::utils::get_module;
-    use datafusion_physical_optimizer::PhysicalOptimizerRule;
+    use datafusion_physical_optimizer::{ConfigOnlyContext, PhysicalOptimizerRule};
     use datafusion_physical_plan::ExecutionPlan;

     fn create_test_plan() -> Arc<dyn ExecutionPlan> {
@@ -66,4 +66,30 @@ mod tests {

         Ok(())
     }
+
+    #[test]
+    fn test_ffi_physical_optimizer_rule_with_context() -> Result<(), DataFusionError> {
+        let module = get_module()?;
+
+        let ffi_rule = (module.create_context_aware_optimizer_rule)();
+
+        let foreign_rule: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
+            (&ffi_rule).into();
+
+        // Verify that plain optimize fails (proving we need context path)
+        let plan = create_test_plan();
+        let config = ConfigOptions::new();
+        assert!(foreign_rule.optimize(plan, &config).is_err());
+
+        // Verify context-aware path works
+        let plan = create_test_plan();
+        let context = ConfigOnlyContext::new(&config);
+        let optimized = foreign_rule.optimize_with_context(plan, &context)?;
+
+        assert_eq!(optimized.name(), "GlobalLimitExec");
+        assert_eq!(optimized.children().len(), 1);
+        assert_eq!(optimized.children()[0].name(), "empty-exec");
+
+        Ok(())
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to