This is an automated email from the ASF dual-hosted git repository. github-merge-queue[bot] pushed a commit to branch gh-readonly-queue/main/pr-22559-0add0469eb87dbb7fd9b6f78ded41ca7c8b46b6f in repository https://gitbox.apache.org/repos/asf/datafusion.git
commit 72e3de70f638189fc12254e3ecf17c23fc0da880 Author: Geoffrey Claude <[email protected]> AuthorDate: Wed May 27 21:40:52 2026 +0200 Support transparent ExecutionPlan downcasts (#22559) ## Which issue does this PR close? - Closes #22557. ## Rationale for this change DataFusion 54 changed `ExecutionPlan` downcasting to use the `Any` supertrait directly. That removes `ExecutionPlan::as_any`, which had also served as a customization point for wrapper nodes: wrappers could identify as themselves internally while exposing the wrapped plan type to normal downcast-based inspection. This draft PR proposes one possible fix: add an explicit `ExecutionPlan::downcast_delegate()` hook for wrapper nodes that want their public `ExecutionPlan` downcast identity to be delegated to another plan. The proposed behavior intentionally preserves the old `as_any` override semantics: when a node opts into downcast delegation, intermediate delegating wrappers are invisible to `dyn ExecutionPlan::is::<T>()` and `downcast_ref::<T>()`. A different "dual identity" model, where wrappers can downcast both as themselves and as their delegates, may also be useful, but that would be a new behavior rather than a compatibility fix. This is only a suggested implementation for the linked issue. Other solution proposals, including different API names or a different shape for the hook, are very welcome. ## What changes are included in this PR? - Adds `ExecutionPlan::downcast_delegate()` with a default implementation returning `None`. - Updates `dyn ExecutionPlan::is::<T>()` and `downcast_ref::<T>()` to delegate to `downcast_delegate()` when present, otherwise use the current concrete plan type. - Documents that `downcast_delegate()` is only for type introspection and is independent from `children()` / plan traversal. - Adds tests for direct and nested downcast-delegating wrappers, including that intermediate delegating wrappers remain invisible to normal downcast-based inspection. An alternative API shape would make every plan expose an explicit downcast target. A concrete spelling could make the self-target case explicit: ```rust enum DowncastTarget<'a> { SelfTarget, Plan(&'a dyn ExecutionPlan), } fn downcast_target(&self) -> DowncastTarget<'_> { DowncastTarget::SelfTarget } pub fn downcast_ref<T: ExecutionPlan>(&self) -> Option<&T> { match self.downcast_target() { DowncastTarget::Plan(target) => target.downcast_ref::<T>(), DowncastTarget::SelfTarget => (self as &dyn Any).downcast_ref(), } } ``` That frames every plan as having a public downcast target, which is close to the old `as_any` mental model. A simpler conceptual version would be `fn downcast_target(&self) -> &dyn ExecutionPlan` with the default target being `self`, but the real helper still needs an explicit self-target base case to avoid recursing forever. This PR keeps the primary proposal as an explicit `Option`-based opt-in. ## Are these changes tested? Yes. - `cargo test -p datafusion-physical-plan execution_plan_downcast` - `cargo test -p datafusion-physical-plan --lib` - `cargo fmt --all -- --check` - `git diff --check` ## Are there any user-facing changes? Yes. This adds a new public `ExecutionPlan` trait method with a default implementation, and it changes `ExecutionPlan` downcast helpers to honor wrappers that explicitly opt into delegating public downcast identity. --- datafusion/physical-plan/src/execution_plan.rs | 110 ++++++++++++++++++++++++- 1 file changed, 108 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index b55d3c32cb..50eac566d9 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -115,6 +115,30 @@ pub trait ExecutionPlan: Any + Debug + DisplayAs + Send + Sync { } } + /// Returns the plan that provides this plan's public + /// [`ExecutionPlan`] downcast identity. + /// + /// This hook is for wrapper nodes that delegate their public downcast + /// identity to another plan while adding cross-cutting behavior such as + /// instrumentation. The default implementation returns `None`, meaning this + /// plan's concrete type is used for type introspection. + /// + /// Most `ExecutionPlan` implementations should use the default `None`; + /// override this only for wrapper plans that intentionally delegate their + /// public downcast identity to another plan. + /// + /// The `is` and `downcast_ref` helpers follow the returned delegate instead + /// of checking the current concrete type, making intermediate delegating + /// wrappers invisible to normal downcast-based inspection. + /// + /// Implementations that opt in should return the delegate plan, not `self`. + /// + /// This is independent from [`Self::children`] and should not be used for + /// plan traversal or optimizer rewrites. + fn downcast_delegate(&self) -> Option<&dyn ExecutionPlan> { + None + } + /// Get the schema for this execution plan fn schema(&self) -> SchemaRef { Arc::clone(self.properties().schema()) @@ -718,20 +742,32 @@ pub trait ExecutionPlan: Any + Debug + DisplayAs + Send + Sync { impl dyn ExecutionPlan { /// Returns `true` if the plan is of type `T`. /// + /// If this plan provides a [`ExecutionPlan::downcast_delegate`], delegates + /// to it. + /// /// Prefer this over `downcast_ref::<T>().is_some()`. Works correctly when /// called on `Arc<dyn ExecutionPlan>` via auto-deref. pub fn is<T: ExecutionPlan>(&self) -> bool { - (self as &dyn Any).is::<T>() + match self.downcast_delegate() { + Some(delegate) => delegate.is::<T>(), + None => (self as &dyn Any).is::<T>(), + } } /// Attempts to downcast this plan to a concrete type `T`, returning `None` /// if the plan is not of that type. /// + /// If this plan provides a [`ExecutionPlan::downcast_delegate`], delegates + /// to it. + /// /// Works correctly when called on `Arc<dyn ExecutionPlan>` via auto-deref, /// unlike `(&arc as &dyn Any).downcast_ref::<T>()` which would attempt to /// downcast the `Arc` itself. pub fn downcast_ref<T: ExecutionPlan>(&self) -> Option<&T> { - (self as &dyn Any).downcast_ref() + match self.downcast_delegate() { + Some(delegate) => delegate.downcast_ref::<T>(), + None => (self as &dyn Any).downcast_ref(), + } } } @@ -1642,6 +1678,58 @@ mod tests { } } + #[derive(Debug)] + struct DowncastDelegatingExec(Arc<dyn ExecutionPlan>); + + impl DisplayAs for DowncastDelegatingExec { + fn fmt_as( + &self, + _t: DisplayFormatType, + _f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + unimplemented!() + } + } + + impl ExecutionPlan for DowncastDelegatingExec { + fn name(&self) -> &'static str { + Self::static_name() + } + + fn properties(&self) -> &Arc<PlanProperties> { + unimplemented!() + } + + fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> { + vec![] + } + + fn with_new_children( + self: Arc<Self>, + _: Vec<Arc<dyn ExecutionPlan>>, + ) -> Result<Arc<dyn ExecutionPlan>> { + unimplemented!() + } + + fn downcast_delegate(&self) -> Option<&dyn ExecutionPlan> { + Some(self.0.as_ref()) + } + + fn execute( + &self, + _partition: usize, + _context: Arc<TaskContext>, + ) -> Result<SendableRecordBatchStream> { + unimplemented!() + } + + fn partition_statistics( + &self, + _partition: Option<usize>, + ) -> Result<Arc<Statistics>> { + unimplemented!() + } + } #[test] fn test_execution_plan_name() { let schema1 = Arc::new(Schema::empty()); @@ -1654,6 +1742,24 @@ mod tests { assert_eq!(RenamedEmptyExec::static_name(), "MyRenamedEmptyExec"); } + #[test] + fn test_execution_plan_downcast_delegates_to_downcast_delegate() { + let schema = Arc::new(Schema::empty()); + let inner: Arc<dyn ExecutionPlan> = Arc::new(EmptyExec::new(schema)); + let wrapped: Arc<dyn ExecutionPlan> = Arc::new(DowncastDelegatingExec(inner)); + let nested: Arc<dyn ExecutionPlan> = + Arc::new(DowncastDelegatingExec(Arc::clone(&wrapped))); + + for plan in [wrapped.as_ref(), nested.as_ref()] { + assert!(!plan.is::<DowncastDelegatingExec>()); + assert!(plan.downcast_ref::<DowncastDelegatingExec>().is_none()); + assert!(plan.is::<EmptyExec>()); + assert!(plan.downcast_ref::<EmptyExec>().is_some()); + assert!(!plan.is::<RenamedEmptyExec>()); + assert!(plan.downcast_ref::<RenamedEmptyExec>().is_none()); + } + } + /// A compilation test to ensure that the `ExecutionPlan::name()` method can /// be called from a trait object. /// Related ticket: https://github.com/apache/datafusion/pull/11047 --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
