This is an automated email from the ASF dual-hosted git repository.
gabotechs pushed a commit to branch branch-54
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/branch-54 by this push:
new 50aa138564 [branch-54] Support transparent ExecutionPlan downcasts (#22565)
50aa138564 is described below
commit 50aa138564580ff7abaaf3d689fab831d0dd2ce2
Author: Geoffrey Claude <[email protected]>
AuthorDate: Thu May 28 10:52:41 2026 +0200
[branch-54] Support transparent ExecutionPlan downcasts (#22565)
## Which issue does this PR close?
- Refs #22557.
- Companion PR to #22559, targeting `branch-54`.
## Rationale for this change
DataFusion 54 changed `ExecutionPlan` downcasting to use the `Any`
supertrait directly. That removes `ExecutionPlan::as_any`, which had
also served as a customization point for wrapper nodes: wrappers could
identify as themselves internally while exposing the wrapped plan type
to normal downcast-based inspection.
This PR adds an explicit `ExecutionPlan::downcast_delegate()` hook for
wrapper nodes that want their public `ExecutionPlan` downcast identity
to be delegated to another plan.
The proposed behavior intentionally preserves the old `as_any` override
semantics: when a node opts into downcast delegation, intermediate
delegating wrappers are invisible to `dyn ExecutionPlan::is::<T>()` and
`downcast_ref::<T>()`.
## What changes are included in this PR?
- Adds `ExecutionPlan::downcast_delegate()` with a default
implementation returning `None`.
- Updates `dyn ExecutionPlan::is::<T>()` and `downcast_ref::<T>()` to
delegate to `downcast_delegate()` when present, otherwise use the
current concrete plan type.
- Documents that `downcast_delegate()` is only for type introspection
and is independent from `children()` / plan traversal.
- Adds tests for direct and nested downcast-delegating wrappers,
including that intermediate delegating wrappers remain invisible to
normal downcast-based inspection.
## Are these changes tested?
Yes.
- `cargo test -p datafusion-physical-plan execution_plan_downcast`
- `cargo test -p datafusion-physical-plan --lib`
- `cargo fmt --all -- --check`
- `git diff --check`
## Are there any user-facing changes?
Yes. This adds a new public `ExecutionPlan` trait method with a default
implementation, and it changes `ExecutionPlan` downcast helpers to honor
wrappers that explicitly opt into delegating public downcast identity.
---
datafusion/physical-plan/src/execution_plan.rs | 110 ++++++++++++++++++++++++-
1 file changed, 108 insertions(+), 2 deletions(-)
diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs
index b55d3c32cb..50eac566d9 100644
--- a/datafusion/physical-plan/src/execution_plan.rs
+++ b/datafusion/physical-plan/src/execution_plan.rs
@@ -115,6 +115,30 @@ pub trait ExecutionPlan: Any + Debug + DisplayAs + Send + Sync {
}
}
+ /// Returns the plan that provides this plan's public
+ /// [`ExecutionPlan`] downcast identity.
+ ///
+ /// This hook is for wrapper nodes that delegate their public downcast
+ /// identity to another plan while adding cross-cutting behavior such as
+ /// instrumentation. The default implementation returns `None`, meaning this
+ /// plan's concrete type is used for type introspection.
+ ///
+ /// Most `ExecutionPlan` implementations should use the default `None`;
+ /// override this only for wrapper plans that intentionally delegate their
+ /// public downcast identity to another plan.
+ ///
+ /// The `is` and `downcast_ref` helpers follow the returned delegate instead
+ /// of checking the current concrete type, making intermediate delegating
+ /// wrappers invisible to normal downcast-based inspection.
+ ///
+ /// Implementations that opt in should return the delegate plan, not `self`.
+ ///
+ /// This is independent from [`Self::children`] and should not be used for
+ /// plan traversal or optimizer rewrites.
+ fn downcast_delegate(&self) -> Option<&dyn ExecutionPlan> {
+ None
+ }
+
/// Get the schema for this execution plan
fn schema(&self) -> SchemaRef {
Arc::clone(self.properties().schema())
@@ -718,20 +742,32 @@ pub trait ExecutionPlan: Any + Debug + DisplayAs + Send + Sync {
impl dyn ExecutionPlan {
/// Returns `true` if the plan is of type `T`.
///
+ /// If this plan provides a [`ExecutionPlan::downcast_delegate`], delegates
+ /// to it.
+ ///
/// Prefer this over `downcast_ref::<T>().is_some()`. Works correctly when
/// called on `Arc<dyn ExecutionPlan>` via auto-deref.
pub fn is<T: ExecutionPlan>(&self) -> bool {
- (self as &dyn Any).is::<T>()
+ match self.downcast_delegate() {
+ Some(delegate) => delegate.is::<T>(),
+ None => (self as &dyn Any).is::<T>(),
+ }
}
/// Attempts to downcast this plan to a concrete type `T`, returning `None`
/// if the plan is not of that type.
///
+ /// If this plan provides a [`ExecutionPlan::downcast_delegate`], delegates
+ /// to it.
+ ///
/// Works correctly when called on `Arc<dyn ExecutionPlan>` via auto-deref,
/// unlike `(&arc as &dyn Any).downcast_ref::<T>()` which would attempt to
/// downcast the `Arc` itself.
pub fn downcast_ref<T: ExecutionPlan>(&self) -> Option<&T> {
- (self as &dyn Any).downcast_ref()
+ match self.downcast_delegate() {
+ Some(delegate) => delegate.downcast_ref::<T>(),
+ None => (self as &dyn Any).downcast_ref(),
+ }
}
}
@@ -1642,6 +1678,58 @@ mod tests {
}
}
+ #[derive(Debug)]
+ struct DowncastDelegatingExec(Arc<dyn ExecutionPlan>);
+
+ impl DisplayAs for DowncastDelegatingExec {
+ fn fmt_as(
+ &self,
+ _t: DisplayFormatType,
+ _f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ unimplemented!()
+ }
+ }
+
+ impl ExecutionPlan for DowncastDelegatingExec {
+ fn name(&self) -> &'static str {
+ Self::static_name()
+ }
+
+ fn properties(&self) -> &Arc<PlanProperties> {
+ unimplemented!()
+ }
+
+ fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+ vec![]
+ }
+
+ fn with_new_children(
+ self: Arc<Self>,
+ _: Vec<Arc<dyn ExecutionPlan>>,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ unimplemented!()
+ }
+
+ fn downcast_delegate(&self) -> Option<&dyn ExecutionPlan> {
+ Some(self.0.as_ref())
+ }
+
+ fn execute(
+ &self,
+ _partition: usize,
+ _context: Arc<TaskContext>,
+ ) -> Result<SendableRecordBatchStream> {
+ unimplemented!()
+ }
+
+ fn partition_statistics(
+ &self,
+ _partition: Option<usize>,
+ ) -> Result<Arc<Statistics>> {
+ unimplemented!()
+ }
+ }
#[test]
fn test_execution_plan_name() {
let schema1 = Arc::new(Schema::empty());
@@ -1654,6 +1742,24 @@ mod tests {
assert_eq!(RenamedEmptyExec::static_name(), "MyRenamedEmptyExec");
}
+ #[test]
+ fn test_execution_plan_downcast_delegates_to_downcast_delegate() {
+ let schema = Arc::new(Schema::empty());
+ let inner: Arc<dyn ExecutionPlan> = Arc::new(EmptyExec::new(schema));
+ let wrapped: Arc<dyn ExecutionPlan> = Arc::new(DowncastDelegatingExec(inner));
+ let nested: Arc<dyn ExecutionPlan> =
+ Arc::new(DowncastDelegatingExec(Arc::clone(&wrapped)));
+
+ for plan in [wrapped.as_ref(), nested.as_ref()] {
+ assert!(!plan.is::<DowncastDelegatingExec>());
+ assert!(plan.downcast_ref::<DowncastDelegatingExec>().is_none());
+ assert!(plan.is::<EmptyExec>());
+ assert!(plan.downcast_ref::<EmptyExec>().is_some());
+ assert!(!plan.is::<RenamedEmptyExec>());
+ assert!(plan.downcast_ref::<RenamedEmptyExec>().is_none());
+ }
+ }
+
/// A compilation test to ensure that the `ExecutionPlan::name()` method can
/// be called from a trait object.
/// Related ticket: https://github.com/apache/datafusion/pull/11047
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]