This is an automated email from the ASF dual-hosted git repository. github-merge-queue[bot] pushed a commit to branch gh-readonly-queue/main/pr-22584-d1ec74e0d16aacede8091224ca2f8463f0671842 in repository https://gitbox.apache.org/repos/asf/datafusion.git
commit e1d8d463b51e67e777b3ef744e80fa75593b3e5b Author: Nathan <[email protected]> AuthorDate: Thu Jun 4 21:41:07 2026 -0400 Add optimize_with_context to FFI_PhysicalOptimizerRule (#22584) ## Which issue does this PR close? Closes #22334 ## Rationale for this change `FFI_PhysicalOptimizerRule` only plumbed `optimize`, `name`, and `schema_check` — not `optimize_with_context`. Foreign rules that override the context-aware variant had their override silently discarded. ## What changes are included in this PR? - Added `FFI_PhysicalOptimizerContext` struct to pass optimizer context (config options) across FFI; the statistics registry is not plumbed, because it contains trait object vtables that are only valid within the originating library - Added `optimize_with_context` function pointer to `FFI_PhysicalOptimizerRule` - `ForeignPhysicalOptimizerRule` now overrides `optimize_with_context` to route through FFI - Unit tests for context-aware round-trip (with and without a caller-supplied statistics registry) ## Are these changes tested? Yes — two new unit tests (`test_optimize_with_context_round_trip`, `test_optimize_with_context_with_registry`) and one new integration test (`test_ffi_physical_optimizer_rule_with_context`) that loads a context-aware rule from the foreign test library; all existing tests continue to pass. ## Are there any user-facing changes? API change: `FFI_PhysicalOptimizerRule` gains a new field (`optimize_with_context`). This is a layout change for any external consumer of this struct. 
--------- Co-authored-by: Nathan Bezualem <[email protected]> --- datafusion/ffi/src/physical_optimizer.rs | 227 ++++++++++++++++++++++++- datafusion/ffi/src/tests/mod.rs | 4 + datafusion/ffi/src/tests/physical_optimizer.rs | 44 ++++- datafusion/ffi/tests/ffi_physical_optimizer.rs | 28 ++- 4 files changed, 299 insertions(+), 4 deletions(-) diff --git a/datafusion/ffi/src/physical_optimizer.rs b/datafusion/ffi/src/physical_optimizer.rs index 84dc40ce8f..3fb2132083 100644 --- a/datafusion/ffi/src/physical_optimizer.rs +++ b/datafusion/ffi/src/physical_optimizer.rs @@ -21,7 +21,7 @@ use std::sync::Arc; use async_trait::async_trait; use datafusion_common::config::ConfigOptions; use datafusion_common::error::Result; -use datafusion_physical_optimizer::PhysicalOptimizerRule; +use datafusion_physical_optimizer::{PhysicalOptimizerContext, PhysicalOptimizerRule}; use datafusion_physical_plan::ExecutionPlan; use stabby::string::String as SString; use tokio::runtime::Handle; @@ -31,6 +31,84 @@ use crate::execution_plan::FFI_ExecutionPlan; use crate::util::FFI_Result; use crate::{df_result, sresult_return}; +/// A stable struct for sharing [`PhysicalOptimizerContext`] across FFI boundaries. +/// +/// This provides access to configuration options for optimizer rules that need +/// extended context beyond the plan itself. +#[repr(C)] +#[derive(Debug)] +pub struct FFI_PhysicalOptimizerContext { + pub config_options: + unsafe extern "C" fn(&FFI_PhysicalOptimizerContext) -> FFI_ConfigOptions, + + /// Release the memory of the private data. + pub release: unsafe extern "C" fn(&mut FFI_PhysicalOptimizerContext), + + /// Internal data. Only accessed by the provider. 
+ pub private_data: *const c_void, +} + +unsafe impl Send for FFI_PhysicalOptimizerContext {} +unsafe impl Sync for FFI_PhysicalOptimizerContext {} + +struct OptimizerContextPrivateData { + config: ConfigOptions, +} + +impl FFI_PhysicalOptimizerContext { + pub fn new(context: &dyn PhysicalOptimizerContext) -> Self { + let private_data = Box::new(OptimizerContextPrivateData { + config: context.config_options().clone(), + }); + let private_data = Box::into_raw(private_data) as *const c_void; + + Self { + config_options: context_config_options_fn, + release: context_release_fn, + private_data, + } + } + + fn inner(&self) -> &OptimizerContextPrivateData { + unsafe { &*(self.private_data as *const OptimizerContextPrivateData) } + } +} + +impl Drop for FFI_PhysicalOptimizerContext { + fn drop(&mut self) { + unsafe { (self.release)(self) } + } +} + +unsafe extern "C" fn context_config_options_fn( + ctx: &FFI_PhysicalOptimizerContext, +) -> FFI_ConfigOptions { + FFI_ConfigOptions::from(&ctx.inner().config) +} + +unsafe extern "C" fn context_release_fn(ctx: &mut FFI_PhysicalOptimizerContext) { + if !ctx.private_data.is_null() { + unsafe { + let _ = Box::from_raw(ctx.private_data as *mut OptimizerContextPrivateData); + } + ctx.private_data = std::ptr::null(); + } +} + +/// Reconstructed [`PhysicalOptimizerContext`] on the consumer side of FFI. +/// +/// `StatisticsRegistry` is not plumbed because it contains trait object vtables +/// that are only valid within the originating library. +struct ForeignOptimizerContext { + config: ConfigOptions, +} + +impl PhysicalOptimizerContext for ForeignOptimizerContext { + fn config_options(&self) -> &ConfigOptions { + &self.config + } +} + /// A stable struct for sharing [`PhysicalOptimizerRule`] across FFI boundaries. #[repr(C)] #[derive(Debug)] @@ -55,6 +133,12 @@ pub struct FFI_PhysicalOptimizerRule { /// Return the major DataFusion version number of this rule. 
pub version: unsafe extern "C" fn() -> u64, + pub optimize_with_context: unsafe extern "C" fn( + &Self, + plan: &FFI_ExecutionPlan, + context: &FFI_PhysicalOptimizerContext, + ) -> FFI_Result<FFI_ExecutionPlan>, + /// Internal data. This is only to be accessed by the provider of the rule. /// A [`ForeignPhysicalOptimizerRule`] should never attempt to access this data. pub private_data: *mut c_void, @@ -98,6 +182,23 @@ unsafe extern "C" fn optimize_fn_wrapper( FFI_Result::Ok(FFI_ExecutionPlan::new(optimized_plan, runtime)) } +unsafe extern "C" fn optimize_with_context_fn_wrapper( + rule: &FFI_PhysicalOptimizerRule, + plan: &FFI_ExecutionPlan, + context: &FFI_PhysicalOptimizerContext, +) -> FFI_Result<FFI_ExecutionPlan> { + let runtime = rule.runtime(); + let inner = rule.inner(); + let plan: Arc<dyn ExecutionPlan> = sresult_return!(plan.try_into()); + let config = sresult_return!(ConfigOptions::try_from(unsafe { + (context.config_options)(context) + })); + let foreign_ctx = ForeignOptimizerContext { config }; + let optimized_plan = sresult_return!(inner.optimize_with_context(plan, &foreign_ctx)); + + FFI_Result::Ok(FFI_ExecutionPlan::new(optimized_plan, runtime)) +} + unsafe extern "C" fn name_fn_wrapper(rule: &FFI_PhysicalOptimizerRule) -> SString { let rule = rule.inner(); rule.name().into() @@ -127,6 +228,7 @@ unsafe extern "C" fn clone_fn_wrapper( FFI_PhysicalOptimizerRule { optimize: optimize_fn_wrapper, + optimize_with_context: optimize_with_context_fn_wrapper, name: name_fn_wrapper, schema_check: schema_check_fn_wrapper, clone: clone_fn_wrapper, @@ -160,6 +262,7 @@ impl FFI_PhysicalOptimizerRule { Self { optimize: optimize_fn_wrapper, + optimize_with_context: optimize_with_context_fn_wrapper, name: name_fn_wrapper, schema_check: schema_check_fn_wrapper, clone: clone_fn_wrapper, @@ -220,6 +323,24 @@ impl PhysicalOptimizerRule for ForeignPhysicalOptimizerRule { (&optimized_plan).try_into() } + fn optimize_with_context( + &self, + plan: Arc<dyn ExecutionPlan>, + 
context: &dyn PhysicalOptimizerContext, + ) -> Result<Arc<dyn ExecutionPlan>> { + let ffi_context = FFI_PhysicalOptimizerContext::new(context); + let plan = FFI_ExecutionPlan::new(plan, None); + + let optimized_plan = unsafe { + df_result!((self.rule.optimize_with_context)( + &self.rule, + &plan, + &ffi_context + ))? + }; + (&optimized_plan).try_into() + } + fn name(&self) -> &str { &self.name } @@ -236,8 +357,11 @@ mod tests { use arrow::datatypes::{DataType, Field, Schema}; use datafusion_common::config::ConfigOptions; use datafusion_common::error::Result; - use datafusion_physical_optimizer::PhysicalOptimizerRule; + use datafusion_physical_optimizer::{ + ConfigOnlyContext, PhysicalOptimizerContext, PhysicalOptimizerRule, + }; use datafusion_physical_plan::ExecutionPlan; + use datafusion_physical_plan::operator_statistics::StatisticsRegistry; use super::*; use crate::execution_plan::tests::EmptyExec; @@ -265,6 +389,39 @@ mod tests { } } + /// A rule that returns an error from `optimize` but succeeds when + /// called via `optimize_with_context`, proving the context path is taken. 
+ #[derive(Debug)] + struct ContextAwareRule; + + impl PhysicalOptimizerRule for ContextAwareRule { + fn optimize( + &self, + _plan: Arc<dyn ExecutionPlan>, + _config: &ConfigOptions, + ) -> Result<Arc<dyn ExecutionPlan>> { + Err(datafusion_common::DataFusionError::Plan( + "optimize should not be called directly".to_string(), + )) + } + + fn optimize_with_context( + &self, + plan: Arc<dyn ExecutionPlan>, + _context: &dyn PhysicalOptimizerContext, + ) -> Result<Arc<dyn ExecutionPlan>> { + Ok(plan) + } + + fn name(&self) -> &str { + "context_aware_rule" + } + + fn schema_check(&self) -> bool { + true + } + } + fn create_test_plan() -> Arc<dyn ExecutionPlan> { let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)])); @@ -374,4 +531,70 @@ mod tests { Ok(()) } + + #[test] + fn test_optimize_with_context_round_trip() -> Result<()> { + let rule: Arc<dyn PhysicalOptimizerRule + Send + Sync> = + Arc::new(ContextAwareRule); + + let mut ffi_rule = FFI_PhysicalOptimizerRule::new(rule, None); + ffi_rule.library_marker_id = crate::mock_foreign_marker_id; + + let foreign_rule: Arc<dyn PhysicalOptimizerRule + Send + Sync> = + (&ffi_rule).into(); + + let plan = create_test_plan(); + let config = ConfigOptions::new(); + let context = ConfigOnlyContext::new(&config); + + let optimized = foreign_rule.optimize_with_context(plan, &context)?; + assert_eq!(optimized.name(), "empty-exec"); + + Ok(()) + } + + /// Tests that `optimize_with_context` works even when the caller supplies a + /// statistics registry. The registry cannot survive the FFI round-trip (it + /// contains trait object vtables that are library-local), so the provider + /// side will always see `None`. This test verifies the context-aware path + /// still succeeds in that scenario. 
+ #[test] + fn test_optimize_with_context_with_registry() -> Result<()> { + let rule: Arc<dyn PhysicalOptimizerRule + Send + Sync> = + Arc::new(ContextAwareRule); + + let mut ffi_rule = FFI_PhysicalOptimizerRule::new(rule, None); + ffi_rule.library_marker_id = crate::mock_foreign_marker_id; + + let foreign_rule: Arc<dyn PhysicalOptimizerRule + Send + Sync> = + (&ffi_rule).into(); + + struct ContextWithRegistry { + config: ConfigOptions, + registry: StatisticsRegistry, + } + + impl PhysicalOptimizerContext for ContextWithRegistry { + fn config_options(&self) -> &ConfigOptions { + &self.config + } + + fn statistics_registry(&self) -> Option<&StatisticsRegistry> { + Some(&self.registry) + } + } + + let ctx = ContextWithRegistry { + config: ConfigOptions::new(), + registry: StatisticsRegistry::default_with_builtin_providers(), + }; + + let plan = create_test_plan(); + // The optimize_with_context path works, but the registry is not + // available on the provider side (it will be None). + let optimized = foreign_rule.optimize_with_context(plan, &ctx)?; + assert_eq!(optimized.name(), "empty-exec"); + + Ok(()) + } } diff --git a/datafusion/ffi/src/tests/mod.rs b/datafusion/ffi/src/tests/mod.rs index 62e62d8235..03b3a7ab24 100644 --- a/datafusion/ffi/src/tests/mod.rs +++ b/datafusion/ffi/src/tests/mod.rs @@ -113,6 +113,8 @@ pub struct ForeignLibraryModule { pub create_physical_optimizer_rule: extern "C" fn() -> FFI_PhysicalOptimizerRule, + pub create_context_aware_optimizer_rule: extern "C" fn() -> FFI_PhysicalOptimizerRule, + pub version: extern "C" fn() -> u64, } @@ -259,6 +261,8 @@ pub extern "C" fn datafusion_ffi_get_module() -> ForeignLibraryModule { create_table_with_statistics, create_physical_optimizer_rule: physical_optimizer::create_physical_optimizer_rule, + create_context_aware_optimizer_rule: + physical_optimizer::create_context_aware_optimizer_rule, version: super::version, } } diff --git a/datafusion/ffi/src/tests/physical_optimizer.rs 
b/datafusion/ffi/src/tests/physical_optimizer.rs index 2476526125..581f454e52 100644 --- a/datafusion/ffi/src/tests/physical_optimizer.rs +++ b/datafusion/ffi/src/tests/physical_optimizer.rs @@ -19,7 +19,7 @@ use std::sync::Arc; use datafusion_common::config::ConfigOptions; use datafusion_common::error::Result; -use datafusion_physical_optimizer::PhysicalOptimizerRule; +use datafusion_physical_optimizer::{PhysicalOptimizerContext, PhysicalOptimizerRule}; use datafusion_physical_plan::ExecutionPlan; use datafusion_physical_plan::limit::GlobalLimitExec; @@ -52,3 +52,45 @@ pub(crate) extern "C" fn create_physical_optimizer_rule() -> FFI_PhysicalOptimiz let rule: Arc<dyn PhysicalOptimizerRule + Send + Sync> = Arc::new(AddLimitRule); FFI_PhysicalOptimizerRule::new(rule, None) } + +/// A rule that returns an error from `optimize()` (proving the context path must +/// be taken) but succeeds in `optimize_with_context()` by wrapping the plan in a +/// `GlobalLimitExec`. +#[derive(Debug)] +struct ContextAwareAddLimitRule; + +impl PhysicalOptimizerRule for ContextAwareAddLimitRule { + fn optimize( + &self, + _plan: Arc<dyn ExecutionPlan>, + _config: &ConfigOptions, + ) -> Result<Arc<dyn ExecutionPlan>> { + Err(datafusion_common::DataFusionError::Plan( + "optimize should not be called directly; use optimize_with_context" + .to_string(), + )) + } + + fn optimize_with_context( + &self, + plan: Arc<dyn ExecutionPlan>, + _context: &dyn PhysicalOptimizerContext, + ) -> Result<Arc<dyn ExecutionPlan>> { + Ok(Arc::new(GlobalLimitExec::new(plan, 0, Some(10)))) + } + + fn name(&self) -> &str { + "context_aware_add_limit_rule" + } + + fn schema_check(&self) -> bool { + true + } +} + +pub(crate) extern "C" fn create_context_aware_optimizer_rule() -> FFI_PhysicalOptimizerRule +{ + let rule: Arc<dyn PhysicalOptimizerRule + Send + Sync> = + Arc::new(ContextAwareAddLimitRule); + FFI_PhysicalOptimizerRule::new(rule, None) +} diff --git a/datafusion/ffi/tests/ffi_physical_optimizer.rs 
b/datafusion/ffi/tests/ffi_physical_optimizer.rs index d860fda340..d8baf52288 100644 --- a/datafusion/ffi/tests/ffi_physical_optimizer.rs +++ b/datafusion/ffi/tests/ffi_physical_optimizer.rs @@ -25,7 +25,7 @@ mod tests { use datafusion_ffi::execution_plan::tests::EmptyExec; use datafusion_ffi::physical_optimizer::ForeignPhysicalOptimizerRule; use datafusion_ffi::tests::utils::get_module; - use datafusion_physical_optimizer::PhysicalOptimizerRule; + use datafusion_physical_optimizer::{ConfigOnlyContext, PhysicalOptimizerRule}; use datafusion_physical_plan::ExecutionPlan; fn create_test_plan() -> Arc<dyn ExecutionPlan> { @@ -66,4 +66,30 @@ mod tests { Ok(()) } + + #[test] + fn test_ffi_physical_optimizer_rule_with_context() -> Result<(), DataFusionError> { + let module = get_module()?; + + let ffi_rule = (module.create_context_aware_optimizer_rule)(); + + let foreign_rule: Arc<dyn PhysicalOptimizerRule + Send + Sync> = + (&ffi_rule).into(); + + // Verify that plain optimize fails (proving we need context path) + let plan = create_test_plan(); + let config = ConfigOptions::new(); + assert!(foreign_rule.optimize(plan, &config).is_err()); + + // Verify context-aware path works + let plan = create_test_plan(); + let context = ConfigOnlyContext::new(&config); + let optimized = foreign_rule.optimize_with_context(plan, &context)?; + + assert_eq!(optimized.name(), "GlobalLimitExec"); + assert_eq!(optimized.children().len(), 1); + assert_eq!(optimized.children()[0].name(), "empty-exec"); + + Ok(()) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
