This is an automated email from the ASF dual-hosted git repository.
github-merge-queue[bot] pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new e1d8d463b5 Add optimize_with_context to FFI_PhysicalOptimizerRule
(#22584)
e1d8d463b5 is described below
commit e1d8d463b51e67e777b3ef744e80fa75593b3e5b
Author: Nathan <[email protected]>
AuthorDate: Thu Jun 4 21:41:07 2026 -0400
Add optimize_with_context to FFI_PhysicalOptimizerRule (#22584)
## Which issue does this PR close?
Closes #22334
## Rationale for this change
`FFI_PhysicalOptimizerRule` only plumbed `optimize`, `name`, and
`schema_check` — not `optimize_with_context`. Foreign rules that
override the context-aware variant had their override silently
discarded.
## What changes are included in this PR?
- Added `FFI_PhysicalOptimizerContext` struct to pass optimizer context
(config options only; the statistics registry is not plumbed) across FFI
- Added `optimize_with_context` function pointer to
`FFI_PhysicalOptimizerRule`
- `ForeignPhysicalOptimizerRule` now overrides `optimize_with_context`
to route through FFI
- Unit tests for context-aware round-trip (with and without statistics
registry)
## Are these changes tested?
Yes — two new unit tests (`test_optimize_with_context_round_trip`,
`test_optimize_with_context_with_registry`) and a new integration test
(`test_ffi_physical_optimizer_rule_with_context`) were added, and all
existing tests continue to pass.
## Are there any user-facing changes?
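API change: `FFI_PhysicalOptimizerRule` gains a new function-pointer field
(`optimize_with_context`), inserted before `private_data`, and a new public
struct `FFI_PhysicalOptimizerContext` is added. The new field changes the
struct's layout, so this is an ABI-breaking change for any external
consumer of this struct.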
---------
Co-authored-by: Nathan Bezualem <[email protected]>
---
datafusion/ffi/src/physical_optimizer.rs | 227 ++++++++++++++++++++++++-
datafusion/ffi/src/tests/mod.rs | 4 +
datafusion/ffi/src/tests/physical_optimizer.rs | 44 ++++-
datafusion/ffi/tests/ffi_physical_optimizer.rs | 28 ++-
4 files changed, 299 insertions(+), 4 deletions(-)
diff --git a/datafusion/ffi/src/physical_optimizer.rs
b/datafusion/ffi/src/physical_optimizer.rs
index 84dc40ce8f..3fb2132083 100644
--- a/datafusion/ffi/src/physical_optimizer.rs
+++ b/datafusion/ffi/src/physical_optimizer.rs
@@ -21,7 +21,7 @@ use std::sync::Arc;
use async_trait::async_trait;
use datafusion_common::config::ConfigOptions;
use datafusion_common::error::Result;
-use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_optimizer::{PhysicalOptimizerContext,
PhysicalOptimizerRule};
use datafusion_physical_plan::ExecutionPlan;
use stabby::string::String as SString;
use tokio::runtime::Handle;
@@ -31,6 +31,84 @@ use crate::execution_plan::FFI_ExecutionPlan;
use crate::util::FFI_Result;
use crate::{df_result, sresult_return};
+/// A stable struct for sharing [`PhysicalOptimizerContext`] across FFI
boundaries.
+///
+/// This provides access to configuration options for optimizer rules that need
+/// extended context beyond the plan itself.
+#[repr(C)]
+#[derive(Debug)]
+pub struct FFI_PhysicalOptimizerContext {
+ pub config_options:
+ unsafe extern "C" fn(&FFI_PhysicalOptimizerContext) ->
FFI_ConfigOptions,
+
+ /// Release the memory of the private data.
+ pub release: unsafe extern "C" fn(&mut FFI_PhysicalOptimizerContext),
+
+ /// Internal data. Only accessed by the provider.
+ pub private_data: *const c_void,
+}
+
+unsafe impl Send for FFI_PhysicalOptimizerContext {}
+unsafe impl Sync for FFI_PhysicalOptimizerContext {}
+
+struct OptimizerContextPrivateData {
+ config: ConfigOptions,
+}
+
+impl FFI_PhysicalOptimizerContext {
+ pub fn new(context: &dyn PhysicalOptimizerContext) -> Self {
+ let private_data = Box::new(OptimizerContextPrivateData {
+ config: context.config_options().clone(),
+ });
+ let private_data = Box::into_raw(private_data) as *const c_void;
+
+ Self {
+ config_options: context_config_options_fn,
+ release: context_release_fn,
+ private_data,
+ }
+ }
+
+ fn inner(&self) -> &OptimizerContextPrivateData {
+ unsafe { &*(self.private_data as *const OptimizerContextPrivateData) }
+ }
+}
+
+impl Drop for FFI_PhysicalOptimizerContext {
+ fn drop(&mut self) {
+ unsafe { (self.release)(self) }
+ }
+}
+
+unsafe extern "C" fn context_config_options_fn(
+ ctx: &FFI_PhysicalOptimizerContext,
+) -> FFI_ConfigOptions {
+ FFI_ConfigOptions::from(&ctx.inner().config)
+}
+
+unsafe extern "C" fn context_release_fn(ctx: &mut
FFI_PhysicalOptimizerContext) {
+ if !ctx.private_data.is_null() {
+ unsafe {
+ let _ = Box::from_raw(ctx.private_data as *mut
OptimizerContextPrivateData);
+ }
+ ctx.private_data = std::ptr::null();
+ }
+}
+
+/// Reconstructed [`PhysicalOptimizerContext`] on the consumer side of FFI.
+///
+/// `StatisticsRegistry` is not plumbed because it contains trait object
vtables
+/// that are only valid within the originating library.
+struct ForeignOptimizerContext {
+ config: ConfigOptions,
+}
+
+impl PhysicalOptimizerContext for ForeignOptimizerContext {
+ fn config_options(&self) -> &ConfigOptions {
+ &self.config
+ }
+}
+
/// A stable struct for sharing [`PhysicalOptimizerRule`] across FFI
boundaries.
#[repr(C)]
#[derive(Debug)]
@@ -55,6 +133,12 @@ pub struct FFI_PhysicalOptimizerRule {
/// Return the major DataFusion version number of this rule.
pub version: unsafe extern "C" fn() -> u64,
+ pub optimize_with_context: unsafe extern "C" fn(
+ &Self,
+ plan: &FFI_ExecutionPlan,
+ context: &FFI_PhysicalOptimizerContext,
+ ) -> FFI_Result<FFI_ExecutionPlan>,
+
/// Internal data. This is only to be accessed by the provider of the rule.
/// A [`ForeignPhysicalOptimizerRule`] should never attempt to access this
data.
pub private_data: *mut c_void,
@@ -98,6 +182,23 @@ unsafe extern "C" fn optimize_fn_wrapper(
FFI_Result::Ok(FFI_ExecutionPlan::new(optimized_plan, runtime))
}
+unsafe extern "C" fn optimize_with_context_fn_wrapper(
+ rule: &FFI_PhysicalOptimizerRule,
+ plan: &FFI_ExecutionPlan,
+ context: &FFI_PhysicalOptimizerContext,
+) -> FFI_Result<FFI_ExecutionPlan> {
+ let runtime = rule.runtime();
+ let inner = rule.inner();
+ let plan: Arc<dyn ExecutionPlan> = sresult_return!(plan.try_into());
+ let config = sresult_return!(ConfigOptions::try_from(unsafe {
+ (context.config_options)(context)
+ }));
+ let foreign_ctx = ForeignOptimizerContext { config };
+ let optimized_plan = sresult_return!(inner.optimize_with_context(plan,
&foreign_ctx));
+
+ FFI_Result::Ok(FFI_ExecutionPlan::new(optimized_plan, runtime))
+}
+
unsafe extern "C" fn name_fn_wrapper(rule: &FFI_PhysicalOptimizerRule) ->
SString {
let rule = rule.inner();
rule.name().into()
@@ -127,6 +228,7 @@ unsafe extern "C" fn clone_fn_wrapper(
FFI_PhysicalOptimizerRule {
optimize: optimize_fn_wrapper,
+ optimize_with_context: optimize_with_context_fn_wrapper,
name: name_fn_wrapper,
schema_check: schema_check_fn_wrapper,
clone: clone_fn_wrapper,
@@ -160,6 +262,7 @@ impl FFI_PhysicalOptimizerRule {
Self {
optimize: optimize_fn_wrapper,
+ optimize_with_context: optimize_with_context_fn_wrapper,
name: name_fn_wrapper,
schema_check: schema_check_fn_wrapper,
clone: clone_fn_wrapper,
@@ -220,6 +323,24 @@ impl PhysicalOptimizerRule for
ForeignPhysicalOptimizerRule {
(&optimized_plan).try_into()
}
+ fn optimize_with_context(
+ &self,
+ plan: Arc<dyn ExecutionPlan>,
+ context: &dyn PhysicalOptimizerContext,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ let ffi_context = FFI_PhysicalOptimizerContext::new(context);
+ let plan = FFI_ExecutionPlan::new(plan, None);
+
+ let optimized_plan = unsafe {
+ df_result!((self.rule.optimize_with_context)(
+ &self.rule,
+ &plan,
+ &ffi_context
+ ))?
+ };
+ (&optimized_plan).try_into()
+ }
+
fn name(&self) -> &str {
&self.name
}
@@ -236,8 +357,11 @@ mod tests {
use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::config::ConfigOptions;
use datafusion_common::error::Result;
- use datafusion_physical_optimizer::PhysicalOptimizerRule;
+ use datafusion_physical_optimizer::{
+ ConfigOnlyContext, PhysicalOptimizerContext, PhysicalOptimizerRule,
+ };
use datafusion_physical_plan::ExecutionPlan;
+ use datafusion_physical_plan::operator_statistics::StatisticsRegistry;
use super::*;
use crate::execution_plan::tests::EmptyExec;
@@ -265,6 +389,39 @@ mod tests {
}
}
+ /// A rule that returns an error from `optimize` but succeeds when
+ /// called via `optimize_with_context`, proving the context path is taken.
+ #[derive(Debug)]
+ struct ContextAwareRule;
+
+ impl PhysicalOptimizerRule for ContextAwareRule {
+ fn optimize(
+ &self,
+ _plan: Arc<dyn ExecutionPlan>,
+ _config: &ConfigOptions,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ Err(datafusion_common::DataFusionError::Plan(
+ "optimize should not be called directly".to_string(),
+ ))
+ }
+
+ fn optimize_with_context(
+ &self,
+ plan: Arc<dyn ExecutionPlan>,
+ _context: &dyn PhysicalOptimizerContext,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ Ok(plan)
+ }
+
+ fn name(&self) -> &str {
+ "context_aware_rule"
+ }
+
+ fn schema_check(&self) -> bool {
+ true
+ }
+ }
+
fn create_test_plan() -> Arc<dyn ExecutionPlan> {
let schema =
Arc::new(Schema::new(vec![Field::new("a", DataType::Float32,
false)]));
@@ -374,4 +531,70 @@ mod tests {
Ok(())
}
+
+ #[test]
+ fn test_optimize_with_context_round_trip() -> Result<()> {
+ let rule: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
+ Arc::new(ContextAwareRule);
+
+ let mut ffi_rule = FFI_PhysicalOptimizerRule::new(rule, None);
+ ffi_rule.library_marker_id = crate::mock_foreign_marker_id;
+
+ let foreign_rule: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
+ (&ffi_rule).into();
+
+ let plan = create_test_plan();
+ let config = ConfigOptions::new();
+ let context = ConfigOnlyContext::new(&config);
+
+ let optimized = foreign_rule.optimize_with_context(plan, &context)?;
+ assert_eq!(optimized.name(), "empty-exec");
+
+ Ok(())
+ }
+
+ /// Tests that `optimize_with_context` works even when the caller supplies
a
+ /// statistics registry. The registry cannot survive the FFI round-trip (it
+ /// contains trait object vtables that are library-local), so the provider
+ /// side will always see `None`. This test verifies the context-aware path
+ /// still succeeds in that scenario.
+ #[test]
+ fn test_optimize_with_context_with_registry() -> Result<()> {
+ let rule: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
+ Arc::new(ContextAwareRule);
+
+ let mut ffi_rule = FFI_PhysicalOptimizerRule::new(rule, None);
+ ffi_rule.library_marker_id = crate::mock_foreign_marker_id;
+
+ let foreign_rule: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
+ (&ffi_rule).into();
+
+ struct ContextWithRegistry {
+ config: ConfigOptions,
+ registry: StatisticsRegistry,
+ }
+
+ impl PhysicalOptimizerContext for ContextWithRegistry {
+ fn config_options(&self) -> &ConfigOptions {
+ &self.config
+ }
+
+ fn statistics_registry(&self) -> Option<&StatisticsRegistry> {
+ Some(&self.registry)
+ }
+ }
+
+ let ctx = ContextWithRegistry {
+ config: ConfigOptions::new(),
+ registry: StatisticsRegistry::default_with_builtin_providers(),
+ };
+
+ let plan = create_test_plan();
+ // The optimize_with_context path works, but the registry is not
+ // available on the provider side (it will be None).
+ let optimized = foreign_rule.optimize_with_context(plan, &ctx)?;
+ assert_eq!(optimized.name(), "empty-exec");
+
+ Ok(())
+ }
}
diff --git a/datafusion/ffi/src/tests/mod.rs b/datafusion/ffi/src/tests/mod.rs
index 62e62d8235..03b3a7ab24 100644
--- a/datafusion/ffi/src/tests/mod.rs
+++ b/datafusion/ffi/src/tests/mod.rs
@@ -113,6 +113,8 @@ pub struct ForeignLibraryModule {
pub create_physical_optimizer_rule: extern "C" fn() ->
FFI_PhysicalOptimizerRule,
+ pub create_context_aware_optimizer_rule: extern "C" fn() ->
FFI_PhysicalOptimizerRule,
+
pub version: extern "C" fn() -> u64,
}
@@ -259,6 +261,8 @@ pub extern "C" fn datafusion_ffi_get_module() ->
ForeignLibraryModule {
create_table_with_statistics,
create_physical_optimizer_rule:
physical_optimizer::create_physical_optimizer_rule,
+ create_context_aware_optimizer_rule:
+ physical_optimizer::create_context_aware_optimizer_rule,
version: super::version,
}
}
diff --git a/datafusion/ffi/src/tests/physical_optimizer.rs
b/datafusion/ffi/src/tests/physical_optimizer.rs
index 2476526125..581f454e52 100644
--- a/datafusion/ffi/src/tests/physical_optimizer.rs
+++ b/datafusion/ffi/src/tests/physical_optimizer.rs
@@ -19,7 +19,7 @@ use std::sync::Arc;
use datafusion_common::config::ConfigOptions;
use datafusion_common::error::Result;
-use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_optimizer::{PhysicalOptimizerContext,
PhysicalOptimizerRule};
use datafusion_physical_plan::ExecutionPlan;
use datafusion_physical_plan::limit::GlobalLimitExec;
@@ -52,3 +52,45 @@ pub(crate) extern "C" fn create_physical_optimizer_rule() ->
FFI_PhysicalOptimiz
let rule: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
Arc::new(AddLimitRule);
FFI_PhysicalOptimizerRule::new(rule, None)
}
+
+/// A rule that returns an error from `optimize()` (proving the context path
must
+/// be taken) but succeeds in `optimize_with_context()` by wrapping the plan
in a
+/// `GlobalLimitExec`.
+#[derive(Debug)]
+struct ContextAwareAddLimitRule;
+
+impl PhysicalOptimizerRule for ContextAwareAddLimitRule {
+ fn optimize(
+ &self,
+ _plan: Arc<dyn ExecutionPlan>,
+ _config: &ConfigOptions,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ Err(datafusion_common::DataFusionError::Plan(
+ "optimize should not be called directly; use optimize_with_context"
+ .to_string(),
+ ))
+ }
+
+ fn optimize_with_context(
+ &self,
+ plan: Arc<dyn ExecutionPlan>,
+ _context: &dyn PhysicalOptimizerContext,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ Ok(Arc::new(GlobalLimitExec::new(plan, 0, Some(10))))
+ }
+
+ fn name(&self) -> &str {
+ "context_aware_add_limit_rule"
+ }
+
+ fn schema_check(&self) -> bool {
+ true
+ }
+}
+
+pub(crate) extern "C" fn create_context_aware_optimizer_rule() ->
FFI_PhysicalOptimizerRule
+{
+ let rule: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
+ Arc::new(ContextAwareAddLimitRule);
+ FFI_PhysicalOptimizerRule::new(rule, None)
+}
diff --git a/datafusion/ffi/tests/ffi_physical_optimizer.rs
b/datafusion/ffi/tests/ffi_physical_optimizer.rs
index d860fda340..d8baf52288 100644
--- a/datafusion/ffi/tests/ffi_physical_optimizer.rs
+++ b/datafusion/ffi/tests/ffi_physical_optimizer.rs
@@ -25,7 +25,7 @@ mod tests {
use datafusion_ffi::execution_plan::tests::EmptyExec;
use datafusion_ffi::physical_optimizer::ForeignPhysicalOptimizerRule;
use datafusion_ffi::tests::utils::get_module;
- use datafusion_physical_optimizer::PhysicalOptimizerRule;
+ use datafusion_physical_optimizer::{ConfigOnlyContext,
PhysicalOptimizerRule};
use datafusion_physical_plan::ExecutionPlan;
fn create_test_plan() -> Arc<dyn ExecutionPlan> {
@@ -66,4 +66,30 @@ mod tests {
Ok(())
}
+
+ #[test]
+ fn test_ffi_physical_optimizer_rule_with_context() -> Result<(),
DataFusionError> {
+ let module = get_module()?;
+
+ let ffi_rule = (module.create_context_aware_optimizer_rule)();
+
+ let foreign_rule: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
+ (&ffi_rule).into();
+
+ // Verify that plain optimize fails (proving we need context path)
+ let plan = create_test_plan();
+ let config = ConfigOptions::new();
+ assert!(foreign_rule.optimize(plan, &config).is_err());
+
+ // Verify context-aware path works
+ let plan = create_test_plan();
+ let context = ConfigOnlyContext::new(&config);
+ let optimized = foreign_rule.optimize_with_context(plan, &context)?;
+
+ assert_eq!(optimized.name(), "GlobalLimitExec");
+ assert_eq!(optimized.children().len(), 1);
+ assert_eq!(optimized.children()[0].name(), "empty-exec");
+
+ Ok(())
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]