This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 897b5c1150 feat: support repartitioning of FFI execution plans (#20449)
897b5c1150 is described below
commit 897b5c1150a89f5d64cbe84e70859101641ca9c8
Author: Tim Saucer <[email protected]>
AuthorDate: Thu Mar 19 11:16:21 2026 -0400
feat: support repartitioning of FFI execution plans (#20449)
## Which issue does this PR close?
This is a blocker for https://github.com/apache/datafusion/issues/20450
## Rationale for this change
This PR introduces an important concept in the FFI work to avoid
creating wrappers upon wrappers of plans. It was discovered as part of
the work to create FFI physical optimizer rules. Suppose we have a
foreign plan. Then we attempt to turn this into an FFI plan. What we
will end up with currently is a FFI plan where the underlying private
data is a foreign plan that additionally contains a FFI plan. Instead
*any* time we are creating an FFI object we should check to see if it is
locally downcastable to a Foreign plan and if so to just access the
already existing FFI object.
This pattern is adapted across all FFI objects in this PR.
With this work in place we can also properly support repartitioning via
FFI as well as `new_with_children` via FFI.
## What changes are included in this PR?
- Adds access pattern for creating new FFI objects. When they are
already locally downcastable to a Foreign wrapper then we simply get
the underlying existing FFI object instead of creating a wrapper around
a wrapper.
- Implement repartitioning and new_with_children via FFI on execution
plans.
## Are these changes tested?
Integration tests are added.
## Are there any user-facing changes?
The one user-facing change is that for some of the aggregates and
accumulators that take in closures we require these closures to be
static so that we can downcast the boxed traits.
---------
Co-authored-by: Copilot <[email protected]>
---
datafusion/catalog/src/table.rs | 2 +-
datafusion/expr-common/src/accumulator.rs | 2 +-
datafusion/expr-common/src/groups_accumulator.rs | 2 +-
datafusion/expr/src/partition_evaluator.rs | 2 +-
datafusion/ffi/src/catalog_provider.rs | 5 +
datafusion/ffi/src/catalog_provider_list.rs | 7 +
datafusion/ffi/src/execution_plan.rs | 248 ++++++++++++++++-----
datafusion/ffi/src/physical_expr/mod.rs | 4 +
.../ffi/src/proto/logical_extension_codec.rs | 7 +
.../ffi/src/proto/physical_extension_codec.rs | 37 +--
datafusion/ffi/src/schema_provider.rs | 5 +
datafusion/ffi/src/session/mod.rs | 4 +
datafusion/ffi/src/table_provider.rs | 3 +
datafusion/ffi/src/tests/mod.rs | 12 +
datafusion/ffi/src/udaf/accumulator.rs | 8 +
datafusion/ffi/src/udaf/groups_accumulator.rs | 8 +
datafusion/ffi/src/udaf/mod.rs | 4 +
datafusion/ffi/src/udf/mod.rs | 4 +
datafusion/ffi/src/udtf.rs | 7 +
datafusion/ffi/src/udwf/mod.rs | 4 +
datafusion/ffi/src/udwf/partition_evaluator.rs | 8 +
datafusion/ffi/tests/ffi_execution_plan.rs | 108 +++++++++
datafusion/ffi/tests/ffi_integration.rs | 6 +-
.../src/aggregate/groups_accumulator/bool_op.rs | 6 +-
.../src/aggregate/groups_accumulator/prim_op.rs | 6 +-
datafusion/functions-aggregate/src/average.rs | 6 +-
datafusion/proto/src/logical_plan/mod.rs | 2 +-
datafusion/proto/src/physical_plan/mod.rs | 3 +-
datafusion/spark/src/function/aggregate/avg.rs | 6 +-
29 files changed, 429 insertions(+), 97 deletions(-)
diff --git a/datafusion/catalog/src/table.rs b/datafusion/catalog/src/table.rs
index f31d4d52ce..c9b4e974c8 100644
--- a/datafusion/catalog/src/table.rs
+++ b/datafusion/catalog/src/table.rs
@@ -486,7 +486,7 @@ pub trait TableProviderFactory: Debug + Sync + Send {
}
/// A trait for table function implementations
-pub trait TableFunctionImpl: Debug + Sync + Send {
+pub trait TableFunctionImpl: Debug + Sync + Send + Any {
/// Create a table provider
fn call(&self, args: &[Expr]) -> Result<Arc<dyn TableProvider>>;
}
diff --git a/datafusion/expr-common/src/accumulator.rs
b/datafusion/expr-common/src/accumulator.rs
index 3acf110a0b..59fb6a5952 100644
--- a/datafusion/expr-common/src/accumulator.rs
+++ b/datafusion/expr-common/src/accumulator.rs
@@ -48,7 +48,7 @@ use std::fmt::Debug;
/// [`evaluate`]: Self::evaluate
/// [`merge_batch`]: Self::merge_batch
/// [window function]: https://en.wikipedia.org/wiki/Window_function_(SQL)
-pub trait Accumulator: Send + Sync + Debug {
+pub trait Accumulator: Send + Sync + Debug + std::any::Any {
/// Updates the accumulator's state from its input.
///
/// `values` contains the arguments to this aggregate function.
diff --git a/datafusion/expr-common/src/groups_accumulator.rs
b/datafusion/expr-common/src/groups_accumulator.rs
index 08c9f01f13..9053f7a8ea 100644
--- a/datafusion/expr-common/src/groups_accumulator.rs
+++ b/datafusion/expr-common/src/groups_accumulator.rs
@@ -108,7 +108,7 @@ impl EmitTo {
///
/// [`Accumulator`]: crate::accumulator::Accumulator
/// [Aggregating Millions of Groups Fast blog]:
https://arrow.apache.org/blog/2023/08/05/datafusion_fast_grouping/
-pub trait GroupsAccumulator: Send {
+pub trait GroupsAccumulator: Send + std::any::Any {
/// Updates the accumulator's state from its arguments, encoded as
/// a vector of [`ArrayRef`]s.
///
diff --git a/datafusion/expr/src/partition_evaluator.rs
b/datafusion/expr/src/partition_evaluator.rs
index 0671f31f6d..5a4e20e5ac 100644
--- a/datafusion/expr/src/partition_evaluator.rs
+++ b/datafusion/expr/src/partition_evaluator.rs
@@ -90,7 +90,7 @@ use crate::window_state::WindowAggState;
/// For more background, please also see the [User defined Window Functions in
DataFusion blog]
///
/// [User defined Window Functions in DataFusion blog]:
https://datafusion.apache.org/blog/2025/04/19/user-defined-window-functions
-pub trait PartitionEvaluator: Debug + Send {
+pub trait PartitionEvaluator: Debug + Send + std::any::Any {
/// When the window frame has a fixed beginning (e.g UNBOUNDED
/// PRECEDING), some functions such as FIRST_VALUE, LAST_VALUE and
/// NTH_VALUE do not need the (unbounded) input once they have
diff --git a/datafusion/ffi/src/catalog_provider.rs
b/datafusion/ffi/src/catalog_provider.rs
index 61e26f1663..ff588a89a7 100644
--- a/datafusion/ffi/src/catalog_provider.rs
+++ b/datafusion/ffi/src/catalog_provider.rs
@@ -250,6 +250,11 @@ impl FFI_CatalogProvider {
runtime: Option<Handle>,
logical_codec: FFI_LogicalExtensionCodec,
) -> Self {
+ if let Some(provider) =
provider.as_any().downcast_ref::<ForeignCatalogProvider>()
+ {
+ return provider.0.clone();
+ }
+
let private_data = Box::new(ProviderPrivateData { provider, runtime });
Self {
diff --git a/datafusion/ffi/src/catalog_provider_list.rs
b/datafusion/ffi/src/catalog_provider_list.rs
index 40f8be3871..65574a7ac3 100644
--- a/datafusion/ffi/src/catalog_provider_list.rs
+++ b/datafusion/ffi/src/catalog_provider_list.rs
@@ -212,6 +212,13 @@ impl FFI_CatalogProviderList {
runtime: Option<Handle>,
logical_codec: FFI_LogicalExtensionCodec,
) -> Self {
+ if let Some(provider) = provider
+ .as_any()
+ .downcast_ref::<ForeignCatalogProviderList>()
+ {
+ return provider.0.clone();
+ }
+
let private_data = Box::new(ProviderPrivateData { provider, runtime });
Self {
diff --git a/datafusion/ffi/src/execution_plan.rs
b/datafusion/ffi/src/execution_plan.rs
index 064e4a8953..eba16d9390 100644
--- a/datafusion/ffi/src/execution_plan.rs
+++ b/datafusion/ffi/src/execution_plan.rs
@@ -20,7 +20,8 @@ use std::pin::Pin;
use std::sync::Arc;
use abi_stable::StableAbi;
-use abi_stable::std_types::{RString, RVec};
+use abi_stable::std_types::{ROption, RResult, RString, RVec};
+use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::TreeNodeRecursion;
use datafusion_common::{DataFusionError, Result};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
@@ -29,11 +30,12 @@ use datafusion_physical_plan::{
};
use tokio::runtime::Handle;
+use crate::config::FFI_ConfigOptions;
use crate::execution::FFI_TaskContext;
use crate::plan_properties::FFI_PlanProperties;
use crate::record_batch_stream::FFI_RecordBatchStream;
use crate::util::FFIResult;
-use crate::{df_result, rresult};
+use crate::{df_result, rresult, rresult_return};
/// A stable struct for sharing a [`ExecutionPlan`] across FFI boundaries.
#[repr(C)]
@@ -45,6 +47,9 @@ pub struct FFI_ExecutionPlan {
/// Return a vector of children plans
pub children: unsafe extern "C" fn(plan: &Self) -> RVec<FFI_ExecutionPlan>,
+ pub with_new_children:
+ unsafe extern "C" fn(plan: &Self, children: RVec<Self>) ->
FFIResult<Self>,
+
/// Return the plan name.
pub name: unsafe extern "C" fn(plan: &Self) -> RString,
@@ -56,6 +61,12 @@ pub struct FFI_ExecutionPlan {
context: FFI_TaskContext,
) -> FFIResult<FFI_RecordBatchStream>,
+ pub repartitioned: unsafe extern "C" fn(
+ plan: &Self,
+ target_partitions: usize,
+ config: FFI_ConfigOptions,
+ ) -> FFIResult<ROption<FFI_ExecutionPlan>>,
+
/// Used to create a clone on the provider of the execution plan. This
should
/// only need to be called by the receiver of the plan.
pub clone: unsafe extern "C" fn(plan: &Self) -> Self,
@@ -86,6 +97,11 @@ impl FFI_ExecutionPlan {
let private_data = self.private_data as *const
ExecutionPlanPrivateData;
unsafe { &(*private_data).plan }
}
+
+ fn runtime(&self) -> Option<Handle> {
+ let private_data = self.private_data as *const
ExecutionPlanPrivateData;
+ unsafe { (*private_data).runtime.clone() }
+ }
}
unsafe extern "C" fn properties_fn_wrapper(
@@ -97,19 +113,34 @@ unsafe extern "C" fn properties_fn_wrapper(
unsafe extern "C" fn children_fn_wrapper(
plan: &FFI_ExecutionPlan,
) -> RVec<FFI_ExecutionPlan> {
- unsafe {
- let private_data = plan.private_data as *const
ExecutionPlanPrivateData;
- let plan = &(*private_data).plan;
- let runtime = &(*private_data).runtime;
+ let runtime = plan.runtime();
+ let plan = plan.inner();
- let children: Vec<_> = plan
- .children()
- .into_iter()
- .map(|child| FFI_ExecutionPlan::new(Arc::clone(child),
runtime.clone()))
- .collect();
+ let children: Vec<_> = plan
+ .children()
+ .into_iter()
+ .map(|child| FFI_ExecutionPlan::new(Arc::clone(child),
runtime.clone()))
+ .collect();
- children.into()
- }
+ children.into()
+}
+
+unsafe extern "C" fn with_new_children_fn_wrapper(
+ plan: &FFI_ExecutionPlan,
+ children: RVec<FFI_ExecutionPlan>,
+) -> FFIResult<FFI_ExecutionPlan> {
+ let runtime = plan.runtime();
+ let plan = Arc::clone(plan.inner());
+ let children = rresult_return!(
+ children
+ .iter()
+ .map(<Arc<dyn ExecutionPlan>>::try_from)
+ .collect::<Result<Vec<_>>>()
+ );
+
+ let new_plan = rresult_return!(plan.with_new_children(children));
+
+ RResult::ROk(FFI_ExecutionPlan::new(new_plan, runtime))
}
unsafe extern "C" fn execute_fn_wrapper(
@@ -117,17 +148,34 @@ unsafe extern "C" fn execute_fn_wrapper(
partition: usize,
context: FFI_TaskContext,
) -> FFIResult<FFI_RecordBatchStream> {
- unsafe {
- let ctx = context.into();
- let private_data = plan.private_data as *const
ExecutionPlanPrivateData;
- let plan = &(*private_data).plan;
- let runtime = (*private_data).runtime.clone();
-
- rresult!(
- plan.execute(partition, ctx)
- .map(|rbs| FFI_RecordBatchStream::new(rbs, runtime))
- )
- }
+ let ctx = context.into();
+ let runtime = plan.runtime();
+ let plan = plan.inner();
+
+ let _runtime_guard = runtime.as_ref().map(|rt| rt.enter());
+
+ rresult!(
+ plan.execute(partition, ctx)
+ .map(|rbs| FFI_RecordBatchStream::new(rbs, runtime))
+ )
+}
+
+unsafe extern "C" fn repartitioned_fn_wrapper(
+ plan: &FFI_ExecutionPlan,
+ target_partitions: usize,
+ config: FFI_ConfigOptions,
+) -> FFIResult<ROption<FFI_ExecutionPlan>> {
+ let maybe_config: Result<ConfigOptions, DataFusionError> =
config.try_into();
+ let config = rresult_return!(maybe_config);
+ let runtime = plan.runtime();
+ let plan = plan.inner();
+
+ rresult!(
+ plan.repartitioned(target_partitions, &config)
+ .map(|maybe_plan| maybe_plan
+ .map(|plan| FFI_ExecutionPlan::new(plan, runtime))
+ .into())
+ )
}
unsafe extern "C" fn name_fn_wrapper(plan: &FFI_ExecutionPlan) -> RString {
@@ -145,12 +193,10 @@ unsafe extern "C" fn release_fn_wrapper(plan: &mut
FFI_ExecutionPlan) {
}
unsafe extern "C" fn clone_fn_wrapper(plan: &FFI_ExecutionPlan) ->
FFI_ExecutionPlan {
- unsafe {
- let private_data = plan.private_data as *const
ExecutionPlanPrivateData;
- let plan_data = &(*private_data);
+ let runtime = plan.runtime();
+ let plan = plan.inner();
- FFI_ExecutionPlan::new(Arc::clone(&plan_data.plan),
plan_data.runtime.clone())
- }
+ FFI_ExecutionPlan::new(Arc::clone(plan), runtime)
}
impl Clone for FFI_ExecutionPlan {
@@ -159,16 +205,74 @@ impl Clone for FFI_ExecutionPlan {
}
}
+/// Helper function to recursively identify any children that do not
+/// have a runtime set but should because they are local to this same
+/// library. This does imply a restriction that all execution plans
+/// in this chain that are within the same library use the same runtime.
+fn pass_runtime_to_children(
+ plan: &Arc<dyn ExecutionPlan>,
+ runtime: &Handle,
+) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+ let mut updated_children = false;
+ let plan_is_foreign = plan.as_any().is::<ForeignExecutionPlan>();
+
+ let children = plan
+ .children()
+ .into_iter()
+ .map(|child| {
+ let child = match pass_runtime_to_children(child, runtime)? {
+ Some(child) => {
+ updated_children = true;
+ child
+ }
+ None => Arc::clone(child),
+ };
+
+ // If the parent is foreign and the child is local to this
library, then when
+ // we called `children()` above we will get something other than a
+ // `ForeignExecutionPlan`. In this case wrap the plan in a
`ForeignExecutionPlan`
+ // because when we call `with_new_children` below it will extract
the
+ // FFI plan that does contain the runtime.
+ if plan_is_foreign && !child.as_any().is::<ForeignExecutionPlan>()
{
+ updated_children = true;
+ let ffi_child = FFI_ExecutionPlan::new(child,
Some(runtime.clone()));
+ let foreign_child = ForeignExecutionPlan::try_from(ffi_child);
+ foreign_child.map(|c| Arc::new(c) as Arc<dyn ExecutionPlan>)
+ } else {
+ Ok(child)
+ }
+ })
+ .collect::<Result<Vec<_>>>()?;
+ if updated_children {
+ Arc::clone(plan).with_new_children(children).map(Some)
+ } else {
+ Ok(None)
+ }
+}
+
impl FFI_ExecutionPlan {
/// This function is called on the provider's side.
- pub fn new(plan: Arc<dyn ExecutionPlan>, runtime: Option<Handle>) -> Self {
- let private_data = Box::new(ExecutionPlanPrivateData { plan, runtime
});
+ pub fn new(mut plan: Arc<dyn ExecutionPlan>, runtime: Option<Handle>) ->
Self {
+ // Note to developers: `pass_runtime_to_children` relies on the logic
here to
+ // get the underlying FFI plan during calls to `new_with_children`.
+ if let Some(plan) =
plan.as_any().downcast_ref::<ForeignExecutionPlan>() {
+ return plan.plan.clone();
+ }
+
+ if let Some(rt) = &runtime
+ && let Ok(Some(p)) = pass_runtime_to_children(&plan, rt)
+ {
+ plan = p;
+ }
+ let private_data = Box::new(ExecutionPlanPrivateData { plan, runtime
});
Self {
properties: properties_fn_wrapper,
children: children_fn_wrapper,
+ with_new_children: with_new_children_fn_wrapper,
name: name_fn_wrapper,
execute: execute_fn_wrapper,
+ repartitioned: repartitioned_fn_wrapper,
clone: clone_fn_wrapper,
release: release_fn_wrapper,
private_data: Box::into_raw(private_data) as *mut c_void,
@@ -228,28 +332,34 @@ impl TryFrom<&FFI_ExecutionPlan> for Arc<dyn
ExecutionPlan> {
fn try_from(plan: &FFI_ExecutionPlan) -> Result<Self, Self::Error> {
if (plan.library_marker_id)() == crate::get_library_marker_id() {
- return Ok(Arc::clone(plan.inner()));
+ Ok(Arc::clone(plan.inner()))
+ } else {
+ let plan = ForeignExecutionPlan::try_from(plan.clone())?;
+ Ok(Arc::new(plan))
}
+ }
+}
+impl TryFrom<FFI_ExecutionPlan> for ForeignExecutionPlan {
+ type Error = DataFusionError;
+ fn try_from(plan: FFI_ExecutionPlan) -> Result<Self, Self::Error> {
unsafe {
- let name = (plan.name)(plan).into();
+ let name = (plan.name)(&plan).into();
- let properties: PlanProperties =
(plan.properties)(plan).try_into()?;
+ let properties: PlanProperties =
(plan.properties)(&plan).try_into()?;
- let children_rvec = (plan.children)(plan);
+ let children_rvec = (plan.children)(&plan);
let children = children_rvec
.iter()
.map(<Arc<dyn ExecutionPlan>>::try_from)
.collect::<Result<Vec<_>>>()?;
- let plan = ForeignExecutionPlan {
+ Ok(ForeignExecutionPlan {
name,
- plan: plan.clone(),
+ plan,
properties: Arc::new(properties),
children,
- };
-
- Ok(Arc::new(plan))
+ })
}
}
}
@@ -275,12 +385,14 @@ impl ExecutionPlan for ForeignExecutionPlan {
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
- Ok(Arc::new(ForeignExecutionPlan {
- plan: self.plan.clone(),
- name: self.name.clone(),
- children,
- properties: Arc::clone(&self.properties),
- }))
+ let children = children
+ .into_iter()
+ .map(|child| FFI_ExecutionPlan::new(child, None))
+ .collect::<RVec<_>>();
+ let new_plan =
+ unsafe { df_result!((self.plan.with_new_children)(&self.plan,
children))? };
+
+ (&new_plan).try_into()
}
fn execute(
@@ -310,13 +422,28 @@ impl ExecutionPlan for ForeignExecutionPlan {
}
Ok(tnr)
}
+
+ fn repartitioned(
+ &self,
+ target_partitions: usize,
+ config: &ConfigOptions,
+ ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+ let config = config.into();
+ let maybe_plan: Option<FFI_ExecutionPlan> = df_result!(unsafe {
+ (self.plan.repartitioned)(&self.plan, target_partitions, config)
+ })?
+ .into();
+
+ maybe_plan
+ .map(|plan| <Arc<dyn ExecutionPlan>>::try_from(&plan))
+ .transpose()
+ }
}
-#[cfg(test)]
-pub(crate) mod tests {
- use arrow::datatypes::{DataType, Field, Schema};
- use datafusion::physical_plan::Partitioning;
- use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
+#[cfg(any(test, feature = "integration-tests"))]
+pub mod tests {
+ use datafusion_physical_plan::Partitioning;
+ use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
use super::*;
@@ -330,7 +457,7 @@ pub(crate) mod tests {
pub fn new(schema: arrow::datatypes::SchemaRef) -> Self {
Self {
props: Arc::new(PlanProperties::new(
-
datafusion::physical_expr::EquivalenceProperties::new(schema),
+
datafusion_physical_expr::EquivalenceProperties::new(schema),
Partitioning::UnknownPartitioning(3),
EmissionType::Incremental,
Boundedness::Bounded,
@@ -404,8 +531,9 @@ pub(crate) mod tests {
#[test]
fn test_round_trip_ffi_execution_plan() -> Result<()> {
- let schema =
- Arc::new(Schema::new(vec![Field::new("a", DataType::Float32,
false)]));
+ let schema = Arc::new(arrow::datatypes::Schema::new(vec![
+ arrow::datatypes::Field::new("a",
arrow::datatypes::DataType::Float32, false),
+ ]));
let original_plan = Arc::new(EmptyExec::new(schema));
let original_name = original_plan.name().to_string();
@@ -417,7 +545,7 @@ pub(crate) mod tests {
assert_eq!(original_name, foreign_plan.name());
- let display =
datafusion::physical_plan::display::DisplayableExecutionPlan::new(
+ let display =
datafusion_physical_plan::display::DisplayableExecutionPlan::new(
foreign_plan.as_ref(),
);
@@ -432,8 +560,9 @@ pub(crate) mod tests {
#[test]
fn test_ffi_execution_plan_children() -> Result<()> {
- let schema =
- Arc::new(Schema::new(vec![Field::new("a", DataType::Float32,
false)]));
+ let schema = Arc::new(arrow::datatypes::Schema::new(vec![
+ arrow::datatypes::Field::new("a",
arrow::datatypes::DataType::Float32, false),
+ ]));
// Version 1: Adding child to the foreign plan
let child_plan = Arc::new(EmptyExec::new(Arc::clone(&schema)));
@@ -471,8 +600,9 @@ pub(crate) mod tests {
#[test]
fn test_ffi_execution_plan_local_bypass() {
- let schema =
- Arc::new(Schema::new(vec![Field::new("a", DataType::Float32,
false)]));
+ let schema = Arc::new(arrow::datatypes::Schema::new(vec![
+ arrow::datatypes::Field::new("a",
arrow::datatypes::DataType::Float32, false),
+ ]));
let plan = Arc::new(EmptyExec::new(schema));
diff --git a/datafusion/ffi/src/physical_expr/mod.rs
b/datafusion/ffi/src/physical_expr/mod.rs
index d268dd613f..189a1e4782 100644
--- a/datafusion/ffi/src/physical_expr/mod.rs
+++ b/datafusion/ffi/src/physical_expr/mod.rs
@@ -448,6 +448,10 @@ impl Drop for FFI_PhysicalExpr {
impl From<Arc<dyn PhysicalExpr>> for FFI_PhysicalExpr {
/// Creates a new [`FFI_PhysicalExpr`].
fn from(expr: Arc<dyn PhysicalExpr>) -> Self {
+ if let Some(expr) =
expr.as_any().downcast_ref::<ForeignPhysicalExpr>() {
+ return expr.expr.clone();
+ }
+
let private_data = Box::new(PhysicalExprPrivateData { expr });
Self {
diff --git a/datafusion/ffi/src/proto/logical_extension_codec.rs
b/datafusion/ffi/src/proto/logical_extension_codec.rs
index 3781a40539..2beeead703 100644
--- a/datafusion/ffi/src/proto/logical_extension_codec.rs
+++ b/datafusion/ffi/src/proto/logical_extension_codec.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+use std::any::Any;
use std::ffi::c_void;
use std::sync::Arc;
@@ -296,6 +297,12 @@ impl FFI_LogicalExtensionCodec {
runtime: Option<Handle>,
task_ctx_provider: impl Into<FFI_TaskContextProvider>,
) -> Self {
+ if let Some(codec) = (Arc::clone(&codec) as Arc<dyn Any>)
+ .downcast_ref::<ForeignLogicalExtensionCodec>()
+ {
+ return codec.0.clone();
+ }
+
let task_ctx_provider = task_ctx_provider.into();
let private_data = Box::new(LogicalExtensionCodecPrivateData { codec,
runtime });
diff --git a/datafusion/ffi/src/proto/physical_extension_codec.rs
b/datafusion/ffi/src/proto/physical_extension_codec.rs
index 0577e72366..9d5c2e8af3 100644
--- a/datafusion/ffi/src/proto/physical_extension_codec.rs
+++ b/datafusion/ffi/src/proto/physical_extension_codec.rs
@@ -15,9 +15,6 @@
// specific language governing permissions and limitations
// under the License.
-use std::ffi::c_void;
-use std::sync::Arc;
-
use abi_stable::StableAbi;
use abi_stable::std_types::{RResult, RSlice, RStr, RVec};
use datafusion_common::error::Result;
@@ -27,6 +24,7 @@ use datafusion_expr::{
};
use datafusion_physical_plan::ExecutionPlan;
use datafusion_proto::physical_plan::PhysicalExtensionCodec;
+use std::{any::Any, ffi::c_void, sync::Arc};
use tokio::runtime::Handle;
use crate::execution::FFI_TaskContextProvider;
@@ -111,14 +109,14 @@ unsafe impl Send for FFI_PhysicalExtensionCodec {}
unsafe impl Sync for FFI_PhysicalExtensionCodec {}
struct PhysicalExtensionCodecPrivateData {
- provider: Arc<dyn PhysicalExtensionCodec>,
+ codec: Arc<dyn PhysicalExtensionCodec>,
runtime: Option<Handle>,
}
impl FFI_PhysicalExtensionCodec {
fn inner(&self) -> &Arc<dyn PhysicalExtensionCodec> {
let private_data = self.private_data as *const
PhysicalExtensionCodecPrivateData;
- unsafe { &(*private_data).provider }
+ unsafe { &(*private_data).codec }
}
fn runtime(&self) -> &Option<Handle> {
@@ -132,6 +130,7 @@ unsafe extern "C" fn try_decode_fn_wrapper(
buf: RSlice<u8>,
inputs: RVec<FFI_ExecutionPlan>,
) -> FFIResult<FFI_ExecutionPlan> {
+ let runtime = codec.runtime().clone();
let task_ctx: Arc<TaskContext> =
rresult_return!((&codec.task_ctx_provider).try_into());
let codec = codec.inner();
@@ -144,7 +143,7 @@ unsafe extern "C" fn try_decode_fn_wrapper(
let plan =
rresult_return!(codec.try_decode(buf.as_ref(), &inputs,
task_ctx.as_ref()));
- RResult::ROk(FFI_ExecutionPlan::new(plan, None))
+ RResult::ROk(FFI_ExecutionPlan::new(plan, runtime))
}
unsafe extern "C" fn try_encode_fn_wrapper(
@@ -240,11 +239,10 @@ unsafe extern "C" fn try_encode_udwf_fn_wrapper(
RResult::ROk(bytes.into())
}
-unsafe extern "C" fn release_fn_wrapper(provider: &mut
FFI_PhysicalExtensionCodec) {
+unsafe extern "C" fn release_fn_wrapper(codec: &mut
FFI_PhysicalExtensionCodec) {
unsafe {
- let private_data = Box::from_raw(
- provider.private_data as *mut PhysicalExtensionCodecPrivateData,
- );
+ let private_data =
+ Box::from_raw(codec.private_data as *mut
PhysicalExtensionCodecPrivateData);
drop(private_data);
}
}
@@ -267,13 +265,18 @@ impl Drop for FFI_PhysicalExtensionCodec {
impl FFI_PhysicalExtensionCodec {
/// Creates a new [`FFI_PhysicalExtensionCodec`].
pub fn new(
- provider: Arc<dyn PhysicalExtensionCodec + Send>,
+ codec: Arc<dyn PhysicalExtensionCodec + Send>,
runtime: Option<Handle>,
task_ctx_provider: impl Into<FFI_TaskContextProvider>,
) -> Self {
+ if let Some(codec) = (Arc::clone(&codec) as Arc<dyn Any>)
+ .downcast_ref::<ForeignPhysicalExtensionCodec>()
+ {
+ return codec.0.clone();
+ }
+
let task_ctx_provider = task_ctx_provider.into();
- let private_data =
- Box::new(PhysicalExtensionCodecPrivateData { provider, runtime });
+ let private_data = Box::new(PhysicalExtensionCodecPrivateData { codec,
runtime });
Self {
try_decode: try_decode_fn_wrapper,
@@ -306,11 +309,11 @@ unsafe impl Send for ForeignPhysicalExtensionCodec {}
unsafe impl Sync for ForeignPhysicalExtensionCodec {}
impl From<&FFI_PhysicalExtensionCodec> for Arc<dyn PhysicalExtensionCodec> {
- fn from(provider: &FFI_PhysicalExtensionCodec) -> Self {
- if (provider.library_marker_id)() == crate::get_library_marker_id() {
- Arc::clone(provider.inner())
+ fn from(codec: &FFI_PhysicalExtensionCodec) -> Self {
+ if (codec.library_marker_id)() == crate::get_library_marker_id() {
+ Arc::clone(codec.inner())
} else {
- Arc::new(ForeignPhysicalExtensionCodec(provider.clone()))
+ Arc::new(ForeignPhysicalExtensionCodec(codec.clone()))
}
}
}
diff --git a/datafusion/ffi/src/schema_provider.rs
b/datafusion/ffi/src/schema_provider.rs
index b8e44b134f..5d1348e232 100644
--- a/datafusion/ffi/src/schema_provider.rs
+++ b/datafusion/ffi/src/schema_provider.rs
@@ -259,6 +259,11 @@ impl FFI_SchemaProvider {
runtime: Option<Handle>,
logical_codec: FFI_LogicalExtensionCodec,
) -> Self {
+ if let Some(provider) =
provider.as_any().downcast_ref::<ForeignSchemaProvider>()
+ {
+ return provider.0.clone();
+ }
+
let owner_name = provider.owner_name().map(|s| s.into()).into();
let private_data = Box::new(ProviderPrivateData { provider, runtime });
diff --git a/datafusion/ffi/src/session/mod.rs
b/datafusion/ffi/src/session/mod.rs
index 6b8664a437..007181356e 100644
--- a/datafusion/ffi/src/session/mod.rs
+++ b/datafusion/ffi/src/session/mod.rs
@@ -339,6 +339,10 @@ impl FFI_SessionRef {
runtime: Option<Handle>,
logical_codec: FFI_LogicalExtensionCodec,
) -> Self {
+ if let Some(session) =
session.as_any().downcast_ref::<ForeignSession>() {
+ return session.session.clone();
+ }
+
let private_data = Box::new(SessionPrivateData { session, runtime });
Self {
diff --git a/datafusion/ffi/src/table_provider.rs
b/datafusion/ffi/src/table_provider.rs
index 1559549e63..4a89bb025a 100644
--- a/datafusion/ffi/src/table_provider.rs
+++ b/datafusion/ffi/src/table_provider.rs
@@ -391,6 +391,9 @@ impl FFI_TableProvider {
runtime: Option<Handle>,
logical_codec: FFI_LogicalExtensionCodec,
) -> Self {
+ if let Some(provider) =
provider.as_any().downcast_ref::<ForeignTableProvider>() {
+ return provider.0.clone();
+ }
let private_data = Box::new(ProviderPrivateData { provider, runtime });
Self {
diff --git a/datafusion/ffi/src/tests/mod.rs b/datafusion/ffi/src/tests/mod.rs
index f594993e35..378d75b956 100644
--- a/datafusion/ffi/src/tests/mod.rs
+++ b/datafusion/ffi/src/tests/mod.rs
@@ -37,6 +37,8 @@ use udf_udaf_udwf::{
use crate::catalog_provider::FFI_CatalogProvider;
use crate::catalog_provider_list::FFI_CatalogProviderList;
use crate::config::extension_options::FFI_ExtensionOptions;
+use crate::execution_plan::FFI_ExecutionPlan;
+use crate::execution_plan::tests::EmptyExec;
use crate::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
use crate::table_provider::FFI_TableProvider;
use crate::table_provider_factory::FFI_TableProviderFactory;
@@ -100,6 +102,8 @@ pub struct ForeignLibraryModule {
/// Create extension options, for either ConfigOptions or TableOptions
pub create_extension_options: extern "C" fn() -> FFI_ExtensionOptions,
+ pub create_empty_exec: extern "C" fn() -> FFI_ExecutionPlan,
+
pub version: extern "C" fn() -> u64,
}
@@ -149,6 +153,13 @@ extern "C" fn construct_table_provider_factory(
table_provider_factory::create(codec)
}
+pub(crate) extern "C" fn create_empty_exec() -> FFI_ExecutionPlan {
+ let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Float32,
false)]));
+
+ let plan = Arc::new(EmptyExec::new(schema));
+ FFI_ExecutionPlan::new(plan, None)
+}
+
#[export_root_module]
/// This defines the entry point for using the module.
pub fn get_foreign_library_module() -> ForeignLibraryModuleRef {
@@ -165,6 +176,7 @@ pub fn get_foreign_library_module() ->
ForeignLibraryModuleRef {
create_stddev_udaf: create_ffi_stddev_func,
create_rank_udwf: create_ffi_rank_func,
create_extension_options: config::create_extension_options,
+ create_empty_exec,
version: super::version,
}
.leak_into_prefix()
diff --git a/datafusion/ffi/src/udaf/accumulator.rs
b/datafusion/ffi/src/udaf/accumulator.rs
index 6d2b86a3f2..125b28598b 100644
--- a/datafusion/ffi/src/udaf/accumulator.rs
+++ b/datafusion/ffi/src/udaf/accumulator.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+use std::any::Any;
use std::ffi::c_void;
use std::ops::Deref;
use std::ptr::null_mut;
@@ -204,6 +205,13 @@ unsafe extern "C" fn release_fn_wrapper(accumulator: &mut
FFI_Accumulator) {
impl From<Box<dyn Accumulator>> for FFI_Accumulator {
fn from(accumulator: Box<dyn Accumulator>) -> Self {
+ if (accumulator.as_ref() as &dyn Any).is::<ForeignAccumulator>() {
+ let accumulator = (accumulator as Box<dyn Any>)
+ .downcast::<ForeignAccumulator>()
+ .expect("already checked type");
+ return accumulator.accumulator;
+ }
+
let supports_retract_batch = accumulator.supports_retract_batch();
let private_data = AccumulatorPrivateData { accumulator };
diff --git a/datafusion/ffi/src/udaf/groups_accumulator.rs
b/datafusion/ffi/src/udaf/groups_accumulator.rs
index fc4ce4b8ba..0dc8edbfe5 100644
--- a/datafusion/ffi/src/udaf/groups_accumulator.rs
+++ b/datafusion/ffi/src/udaf/groups_accumulator.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+use std::any::Any;
use std::ffi::c_void;
use std::ops::Deref;
use std::ptr::null_mut;
@@ -245,6 +246,13 @@ unsafe extern "C" fn release_fn_wrapper(accumulator: &mut
FFI_GroupsAccumulator)
impl From<Box<dyn GroupsAccumulator>> for FFI_GroupsAccumulator {
fn from(accumulator: Box<dyn GroupsAccumulator>) -> Self {
+ if (accumulator.as_ref() as &dyn Any).is::<ForeignGroupsAccumulator>()
{
+ let accumulator = (accumulator as Box<dyn Any>)
+ .downcast::<ForeignGroupsAccumulator>()
+ .expect("already checked type");
+ return accumulator.accumulator;
+ }
+
let supports_convert_to_state =
accumulator.supports_convert_to_state();
let private_data = GroupsAccumulatorPrivateData { accumulator };
diff --git a/datafusion/ffi/src/udaf/mod.rs b/datafusion/ffi/src/udaf/mod.rs
index 22cbe8cff0..8e791b28b1 100644
--- a/datafusion/ffi/src/udaf/mod.rs
+++ b/datafusion/ffi/src/udaf/mod.rs
@@ -371,6 +371,10 @@ impl Clone for FFI_AggregateUDF {
impl From<Arc<AggregateUDF>> for FFI_AggregateUDF {
fn from(udaf: Arc<AggregateUDF>) -> Self {
+ if let Some(udaf) =
udaf.inner().as_any().downcast_ref::<ForeignAggregateUDF>() {
+ return udaf.udaf.clone();
+ }
+
let name = udaf.name().into();
let aliases = udaf.aliases().iter().map(|a|
a.to_owned().into()).collect();
let is_nullable = udaf.is_nullable();
diff --git a/datafusion/ffi/src/udf/mod.rs b/datafusion/ffi/src/udf/mod.rs
index c6ef0f2a50..0202fd8bcf 100644
--- a/datafusion/ffi/src/udf/mod.rs
+++ b/datafusion/ffi/src/udf/mod.rs
@@ -230,6 +230,10 @@ impl Clone for FFI_ScalarUDF {
impl From<Arc<ScalarUDF>> for FFI_ScalarUDF {
fn from(udf: Arc<ScalarUDF>) -> Self {
+ if let Some(udf) =
udf.inner().as_any().downcast_ref::<ForeignScalarUDF>() {
+ return udf.udf.clone();
+ }
+
let name = udf.name().into();
let aliases = udf.aliases().iter().map(|a|
a.to_owned().into()).collect();
let volatility = udf.signature().volatility.into();
diff --git a/datafusion/ffi/src/udtf.rs b/datafusion/ffi/src/udtf.rs
index 6024ec755d..35c13c1169 100644
--- a/datafusion/ffi/src/udtf.rs
+++ b/datafusion/ffi/src/udtf.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+use std::any::Any;
use std::ffi::c_void;
use std::sync::Arc;
@@ -166,6 +167,12 @@ impl FFI_TableFunction {
runtime: Option<Handle>,
logical_codec: FFI_LogicalExtensionCodec,
) -> Self {
+ if let Some(udtf) =
+ (Arc::clone(&udtf) as Arc<dyn
Any>).downcast_ref::<ForeignTableFunction>()
+ {
+ return udtf.0.clone();
+ }
+
let private_data = Box::new(TableFunctionPrivateData { udtf, runtime
});
Self {
diff --git a/datafusion/ffi/src/udwf/mod.rs b/datafusion/ffi/src/udwf/mod.rs
index dbac00fd43..2e4bd0a294 100644
--- a/datafusion/ffi/src/udwf/mod.rs
+++ b/datafusion/ffi/src/udwf/mod.rs
@@ -222,6 +222,10 @@ impl Clone for FFI_WindowUDF {
impl From<Arc<WindowUDF>> for FFI_WindowUDF {
fn from(udf: Arc<WindowUDF>) -> Self {
+ if let Some(udwf) =
udf.inner().as_any().downcast_ref::<ForeignWindowUDF>() {
+ return udwf.udf.clone();
+ }
+
let name = udf.name().into();
let aliases = udf.aliases().iter().map(|a|
a.to_owned().into()).collect();
let volatility = udf.signature().volatility.into();
diff --git a/datafusion/ffi/src/udwf/partition_evaluator.rs
b/datafusion/ffi/src/udwf/partition_evaluator.rs
index 8df02511aa..6820c6e335 100644
--- a/datafusion/ffi/src/udwf/partition_evaluator.rs
+++ b/datafusion/ffi/src/udwf/partition_evaluator.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+use std::any::Any;
use std::ffi::c_void;
use std::ops::Range;
@@ -205,6 +206,13 @@ unsafe extern "C" fn release_fn_wrapper(evaluator: &mut
FFI_PartitionEvaluator)
impl From<Box<dyn PartitionEvaluator>> for FFI_PartitionEvaluator {
fn from(evaluator: Box<dyn PartitionEvaluator>) -> Self {
+ if (evaluator.as_ref() as &dyn Any).is::<ForeignPartitionEvaluator>() {
+ let evaluator = (evaluator as Box<dyn Any>)
+ .downcast::<ForeignPartitionEvaluator>()
+ .expect("already checked type");
+ return evaluator.evaluator;
+ }
+
let is_causal = evaluator.is_causal();
let supports_bounded_execution =
evaluator.supports_bounded_execution();
let include_rank = evaluator.include_rank();
diff --git a/datafusion/ffi/tests/ffi_execution_plan.rs
b/datafusion/ffi/tests/ffi_execution_plan.rs
new file mode 100644
index 0000000000..d81f947dc8
--- /dev/null
+++ b/datafusion/ffi/tests/ffi_execution_plan.rs
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#[cfg(feature = "integration-tests")]
+mod tests {
+ use arrow::datatypes::Field;
+ use arrow::datatypes::Schema;
+ use arrow_schema::DataType;
+ use datafusion_common::DataFusionError;
+ use datafusion_ffi::execution_plan::FFI_ExecutionPlan;
+ use datafusion_ffi::execution_plan::ForeignExecutionPlan;
+ use datafusion_ffi::execution_plan::{ExecutionPlanPrivateData,
tests::EmptyExec};
+ use datafusion_ffi::tests::utils::get_module;
+ use datafusion_physical_plan::ExecutionPlan;
+ use std::sync::Arc;
+
+ #[test]
+ fn test_ffi_execution_plan_new_sets_runtimes_on_children()
+ -> Result<(), DataFusionError> {
+ // We want to test the case where we have two libraries.
+ // Library A will have a foreign plan from Library B, called
child_plan.
+ // Library A will add a plan called grandchild_plan under child_plan
+ // Library A will create a plan called parent_plan, that has child_plan
+ // under it. So we should have:
+ // parent_plan (local) -> child_plan (foreign) -> grandchild_plan
(local)
+ // Then we want to turn parent_plan into a FFI plan.
+ // Verify that grandchild_plan also gets the same runtime as
parent_plan.
+
+ let module = get_module()?;
+
+ fn generate_local_plan() -> Arc<dyn ExecutionPlan> {
+ let schema =
+ Arc::new(Schema::new(vec![Field::new("a", DataType::Float32,
false)]));
+
+ Arc::new(EmptyExec::new(schema))
+ }
+
+ let child_plan =
+ module
+ .create_empty_exec()
+ .ok_or(DataFusionError::NotImplemented(
+ "External module failed to implement
create_empty_exec".to_string(),
+ ))?();
+ let child_plan: Arc<dyn ExecutionPlan> = (&child_plan)
+ .try_into()
+            .expect("should be able to create plan");
+ assert!(child_plan.as_any().is::<ForeignExecutionPlan>());
+
+ let grandchild_plan = generate_local_plan();
+
+ let child_plan = child_plan.with_new_children(vec![grandchild_plan])?;
+
+ unsafe {
+ // Originally the runtime is not set. We go through the unsafe
casting
+ // of data here because the `inner()` function is private and this
is
+ // only an integration test so we do not want to expose it.
+ let ffi_child = FFI_ExecutionPlan::new(Arc::clone(&child_plan),
None);
+ let ffi_grandchild =
+ (ffi_child.children)(&ffi_child).into_iter().next().unwrap();
+
+ let grandchild_private_data =
+ ffi_grandchild.private_data as *const ExecutionPlanPrivateData;
+ assert!((*grandchild_private_data).runtime.is_none());
+ }
+
+ let parent_plan =
generate_local_plan().with_new_children(vec![child_plan])?;
+
+ // Adding the grandchild beneath this FFI plan should get the runtime
passed down.
+ let runtime = tokio::runtime::Builder::new_current_thread()
+ .build()
+ .unwrap();
+ let ffi_parent =
+ FFI_ExecutionPlan::new(parent_plan,
Some(runtime.handle().clone()));
+
+ unsafe {
+ let ffi_child = (ffi_parent.children)(&ffi_parent)
+ .into_iter()
+ .next()
+ .unwrap();
+ let ffi_grandchild =
+ (ffi_child.children)(&ffi_child).into_iter().next().unwrap();
+ assert_eq!(
+ (ffi_grandchild.library_marker_id)(),
+ (ffi_parent.library_marker_id)()
+ );
+
+ let grandchild_private_data =
+ ffi_grandchild.private_data as *const ExecutionPlanPrivateData;
+ assert!((*grandchild_private_data).runtime.is_some());
+ }
+
+ Ok(())
+ }
+}
diff --git a/datafusion/ffi/tests/ffi_integration.rs
b/datafusion/ffi/tests/ffi_integration.rs
index 80538d4f92..1be486589b 100644
--- a/datafusion/ffi/tests/ffi_integration.rs
+++ b/datafusion/ffi/tests/ffi_integration.rs
@@ -58,9 +58,9 @@ mod tests {
let results = df.collect().await?;
assert_eq!(results.len(), 3);
- assert_eq!(results[0], create_record_batch(1, 5));
- assert_eq!(results[1], create_record_batch(6, 1));
- assert_eq!(results[2], create_record_batch(7, 5));
+ assert!(results.contains(&create_record_batch(1, 5)));
+ assert!(results.contains(&create_record_batch(6, 1)));
+ assert!(results.contains(&create_record_batch(7, 5)));
Ok(())
}
diff --git
a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/bool_op.rs
b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/bool_op.rs
index f716b48f0c..d1d8924a2c 100644
---
a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/bool_op.rs
+++
b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/bool_op.rs
@@ -37,7 +37,7 @@ use super::accumulate::NullState;
#[derive(Debug)]
pub struct BooleanGroupsAccumulator<F>
where
- F: Fn(bool, bool) -> bool + Send + Sync,
+ F: Fn(bool, bool) -> bool + Send + Sync + 'static,
{
/// values per group
values: BooleanBufferBuilder,
@@ -55,7 +55,7 @@ where
impl<F> BooleanGroupsAccumulator<F>
where
- F: Fn(bool, bool) -> bool + Send + Sync,
+ F: Fn(bool, bool) -> bool + Send + Sync + 'static,
{
pub fn new(bool_fn: F, identity: bool) -> Self {
Self {
@@ -69,7 +69,7 @@ where
impl<F> GroupsAccumulator for BooleanGroupsAccumulator<F>
where
- F: Fn(bool, bool) -> bool + Send + Sync,
+ F: Fn(bool, bool) -> bool + Send + Sync + 'static,
{
fn update_batch(
&mut self,
diff --git
a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/prim_op.rs
b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/prim_op.rs
index acf875b686..a81b89e1e4 100644
---
a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/prim_op.rs
+++
b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/prim_op.rs
@@ -41,7 +41,7 @@ use super::accumulate::NullState;
pub struct PrimitiveGroupsAccumulator<T, F>
where
T: ArrowPrimitiveType + Send,
- F: Fn(&mut T::Native, T::Native) + Send + Sync,
+ F: Fn(&mut T::Native, T::Native) + Send + Sync + 'static,
{
/// values per group, stored as the native type
values: Vec<T::Native>,
@@ -62,7 +62,7 @@ where
impl<T, F> PrimitiveGroupsAccumulator<T, F>
where
T: ArrowPrimitiveType + Send,
- F: Fn(&mut T::Native, T::Native) + Send + Sync,
+ F: Fn(&mut T::Native, T::Native) + Send + Sync + 'static,
{
pub fn new(data_type: &DataType, prim_fn: F) -> Self {
Self {
@@ -84,7 +84,7 @@ where
impl<T, F> GroupsAccumulator for PrimitiveGroupsAccumulator<T, F>
where
T: ArrowPrimitiveType + Send,
- F: Fn(&mut T::Native, T::Native) + Send + Sync,
+ F: Fn(&mut T::Native, T::Native) + Send + Sync + 'static,
{
fn update_batch(
&mut self,
diff --git a/datafusion/functions-aggregate/src/average.rs
b/datafusion/functions-aggregate/src/average.rs
index 543116db1d..1ddb549ae8 100644
--- a/datafusion/functions-aggregate/src/average.rs
+++ b/datafusion/functions-aggregate/src/average.rs
@@ -754,7 +754,7 @@ impl Accumulator for DurationAvgAccumulator {
struct AvgGroupsAccumulator<T, F>
where
T: ArrowNumericType + Send,
- F: Fn(T::Native, u64) -> Result<T::Native> + Send,
+ F: Fn(T::Native, u64) -> Result<T::Native> + Send + 'static,
{
/// The type of the internal sum
sum_data_type: DataType,
@@ -778,7 +778,7 @@ where
impl<T, F> AvgGroupsAccumulator<T, F>
where
T: ArrowNumericType + Send,
- F: Fn(T::Native, u64) -> Result<T::Native> + Send,
+ F: Fn(T::Native, u64) -> Result<T::Native> + Send + 'static,
{
pub fn new(sum_data_type: &DataType, return_data_type: &DataType, avg_fn:
F) -> Self {
debug!(
@@ -800,7 +800,7 @@ where
impl<T, F> GroupsAccumulator for AvgGroupsAccumulator<T, F>
where
T: ArrowNumericType + Send,
- F: Fn(T::Native, u64) -> Result<T::Native> + Send,
+ F: Fn(T::Native, u64) -> Result<T::Native> + Send + 'static,
{
fn update_batch(
&mut self,
diff --git a/datafusion/proto/src/logical_plan/mod.rs
b/datafusion/proto/src/logical_plan/mod.rs
index a5d74d7f49..1e0264690d 100644
--- a/datafusion/proto/src/logical_plan/mod.rs
+++ b/datafusion/proto/src/logical_plan/mod.rs
@@ -106,7 +106,7 @@ pub trait AsLogicalPlan: Debug + Send + Sync + Clone {
Self: Sized;
}
-pub trait LogicalExtensionCodec: Debug + Send + Sync {
+pub trait LogicalExtensionCodec: Debug + Send + Sync + std::any::Any {
fn try_decode(
&self,
buf: &[u8],
diff --git a/datafusion/proto/src/physical_plan/mod.rs
b/datafusion/proto/src/physical_plan/mod.rs
index 99cd39d9f2..cf06b60d1c 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+use std::any::Any;
use std::cell::RefCell;
use std::collections::HashMap;
use std::fmt::Debug;
@@ -3654,7 +3655,7 @@ pub trait AsExecutionPlan: Debug + Send + Sync + Clone {
Self: Sized;
}
-pub trait PhysicalExtensionCodec: Debug + Send + Sync {
+pub trait PhysicalExtensionCodec: Debug + Send + Sync + Any {
fn try_decode(
&self,
buf: &[u8],
diff --git a/datafusion/spark/src/function/aggregate/avg.rs
b/datafusion/spark/src/function/aggregate/avg.rs
index bbcda9b0f8..9ad712713d 100644
--- a/datafusion/spark/src/function/aggregate/avg.rs
+++ b/datafusion/spark/src/function/aggregate/avg.rs
@@ -213,7 +213,7 @@ impl Accumulator for AvgAccumulator {
struct AvgGroupsAccumulator<T, F>
where
T: ArrowNumericType + Send,
- F: Fn(T::Native, i64) -> Result<T::Native> + Send,
+ F: Fn(T::Native, i64) -> Result<T::Native> + Send + 'static,
{
/// The type of the returned average
return_data_type: DataType,
@@ -231,7 +231,7 @@ where
impl<T, F> AvgGroupsAccumulator<T, F>
where
T: ArrowNumericType + Send,
- F: Fn(T::Native, i64) -> Result<T::Native> + Send,
+ F: Fn(T::Native, i64) -> Result<T::Native> + Send + 'static,
{
pub fn new(return_data_type: &DataType, avg_fn: F) -> Self {
Self {
@@ -246,7 +246,7 @@ where
impl<T, F> GroupsAccumulator for AvgGroupsAccumulator<T, F>
where
T: ArrowNumericType + Send,
- F: Fn(T::Native, i64) -> Result<T::Native> + Send,
+ F: Fn(T::Native, i64) -> Result<T::Native> + Send + 'static,
{
fn update_batch(
&mut self,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]