This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 094e7ee90a physical plan: add `reset_plan_states `, plan re-use
benchmark (#19806)
094e7ee90a is described below
commit 094e7ee90ad2b0ee993b8682b34828152958bc29
Author: Albert Skalt <[email protected]>
AuthorDate: Thu Jan 15 21:10:30 2026 +0300
physical plan: add `reset_plan_states `, plan re-use benchmark (#19806)
## Which issue does this PR close?
<!--
We generally require a GitHub issue to be filed for all bug fixes and
enhancements and this helps us generate change logs for our releases.
You can link an issue to this PR using the GitHub syntax. For example
`Closes #123` indicates that this PR will close issue #123.
-->
- Part of https://github.com/apache/datafusion/issues/19796
## What changes are included in this PR?
This patch adds a benchmark that measures the overhead of creating an
independent instance of an execution plan so that it can be re-executed
without repeating the planning stage. Several typical plans are tested,
covering projection, aggregation, filtering, and repartitioning.
## Are there any user-facing changes?
<!--
If there are user-facing changes then we may require documentation to be
updated before approving the PR.
-->
The function `reset_plan_states(...)` is now publicly exported.
---
datafusion/core/Cargo.toml | 4 +
datafusion/core/benches/reset_plan_states.rs | 198 ++++++++++++++++++++++++
datafusion/physical-plan/src/execution_plan.rs | 25 +++
datafusion/physical-plan/src/recursive_query.rs | 16 +-
datafusion/physical-plan/src/test.rs | 2 +-
5 files changed, 229 insertions(+), 16 deletions(-)
diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index d485c04e38..5c7e944e59 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -281,3 +281,7 @@ name = "spm"
harness = false
name = "preserve_file_partitioning"
required-features = ["parquet"]
+
+[[bench]]
+harness = false
+name = "reset_plan_states"
diff --git a/datafusion/core/benches/reset_plan_states.rs
b/datafusion/core/benches/reset_plan_states.rs
new file mode 100644
index 0000000000..f2f81f755b
--- /dev/null
+++ b/datafusion/core/benches/reset_plan_states.rs
@@ -0,0 +1,198 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::{Arc, LazyLock};
+
+use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef};
+use criterion::{Criterion, criterion_group, criterion_main};
+use datafusion::prelude::SessionContext;
+use datafusion_catalog::MemTable;
+use datafusion_physical_plan::ExecutionPlan;
+use datafusion_physical_plan::displayable;
+use datafusion_physical_plan::execution_plan::reset_plan_states;
+use tokio::runtime::Runtime;
+
+/// Number of `Int64` columns in the benchmark schema shared by tables `t` and `v`.
+const NUM_FIELDS: usize = 1000;
+/// Number of `<column> = <literal>` terms joined with `AND` in each generated predicate.
+const PREDICATE_LEN: usize = 50;
+
+/// Schema shared by both benchmark tables: `NUM_FIELDS` non-nullable `Int64`
+/// columns named by [`col_name`] (`x_0`, `x_1`, ...).
+static SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
+    Arc::new(Schema::new(
+        (0..NUM_FIELDS)
+            // Reuse `col_name` so column naming is defined in one place.
+            .map(|i| Arc::new(Field::new(col_name(i), DataType::Int64, false)))
+            .collect::<Fields>(),
+    ))
+});
+
+/// Name of the `i`-th column of the benchmark tables: `x_<i>`.
+fn col_name(i: usize) -> String {
+    let mut name = String::from("x_");
+    name.push_str(&i.to_string());
+    name
+}
+
+/// Alias of the `i`-th aggregate expression: `aggr_<i>`.
+fn aggr_name(i: usize) -> String {
+    let mut alias = String::from("aggr_");
+    alias += &i.to_string();
+    alias
+}
+
+/// Plans `sql` against `ctx` (blocking on `rt`) and returns the physical plan.
+///
+/// Panics if SQL parsing or planning fails, which is acceptable during
+/// benchmark setup.
+fn physical_plan(
+    ctx: &SessionContext,
+    rt: &Runtime,
+    sql: &str,
+) -> Arc<dyn ExecutionPlan> {
+    let build = async {
+        let df = ctx.sql(sql).await.unwrap();
+        df.create_physical_plan().await.unwrap()
+    };
+    rt.block_on(build)
+}
+
+/// Builds the conjunction `name(0) = 0 AND name(1) = 1 AND ...` with `len`
+/// terms, where each column name is produced by `col_name`.
+///
+/// Returns an empty string when `len` is 0.
+fn predicate(col_name: impl Fn(usize) -> String, len: usize) -> String {
+    (0..len)
+        .map(|i| format!("{} = {i}", col_name(i)))
+        .collect::<Vec<_>>()
+        .join(" AND ")
+}
+
+/// Returns the SQL text of a query of the form:
+///
+/// ```sql
+/// SELECT AVG(x_0) AS aggr_0, ..., AVG(x_N) AS aggr_N FROM t
+/// WHERE p1
+/// HAVING p2
+/// ```
+///
+/// where `N = NUM_FIELDS - 1`, `p1` is a `PREDICATE_LEN`-term conjunction over
+/// the input columns and `p2` is a `PREDICATE_LEN`-term conjunction over the
+/// aggregate aliases. The query is meant to produce a plan with filtering and
+/// aggregation.
+fn query1() -> String {
+    let mut query = String::from("SELECT ");
+    for i in 0..NUM_FIELDS {
+        if i > 0 {
+            query.push_str(", ");
+        }
+        query.push_str("AVG(");
+        query.push_str(&col_name(i));
+        query.push_str(") AS ");
+        query.push_str(&aggr_name(i));
+    }
+    query.push_str(" FROM t WHERE ");
+    query.push_str(&predicate(col_name, PREDICATE_LEN));
+    query.push_str(" HAVING ");
+    query.push_str(&predicate(aggr_name, PREDICATE_LEN));
+    query
+}
+
+/// Returns the SQL text of a query of the form:
+///
+/// ```sql
+/// SELECT t.x_0, v.x_2, t.x_4, ... FROM t JOIN v ON t.x_0 = v.x_0
+/// WHERE p
+/// ```
+///
+/// The projection takes every second column, alternating the source table
+/// between `t` and `v`. `p` is a `PREDICATE_LEN`-term conjunction over
+/// columns of `t`.
+fn query2() -> String {
+    let mut query = String::from("SELECT ");
+    for i in (0..NUM_FIELDS).step_by(2) {
+        if i > 0 {
+            query.push_str(", ");
+        }
+        // Alternate the qualifying table for consecutive selected columns.
+        let table = if (i / 2) % 2 == 0 { "t" } else { "v" };
+        query.push_str(table);
+        query.push('.');
+        query.push_str(&col_name(i));
+    }
+    query.push_str(" FROM t JOIN v ON t.x_0 = v.x_0 WHERE ");
+    query.push_str(&predicate(
+        |i| format!("t.{}", col_name(i)),
+        PREDICATE_LEN,
+    ));
+    query
+}
+
+/// Returns the SQL text of a query of the form:
+///
+/// ```sql
+/// SELECT x_0 + x_1, x_2 + x_3, ... FROM t
+/// WHERE p
+/// ```
+///
+/// i.e. a non-trivial projection of `NUM_FIELDS / 2` sums over adjacent column
+/// pairs, filtered by a `PREDICATE_LEN`-term conjunction `p` over the columns.
+fn query3() -> String {
+    let mut query = String::from("SELECT ");
+
+    // Create a non-trivial projection: one sum per adjacent column pair.
+    for i in 0..NUM_FIELDS / 2 {
+        if i > 0 {
+            query.push_str(", ");
+        }
+        query.push_str(&col_name(i * 2));
+        query.push_str(" + ");
+        query.push_str(&col_name(i * 2 + 1));
+    }
+
+    query.push_str(" FROM t WHERE ");
+    query.push_str(&predicate(col_name, PREDICATE_LEN));
+    query
+}
+
+/// Times one `reset_plan_states` call on a fresh reference to `plan` per iteration.
+/// The result goes through `black_box` so the reset cannot be optimized away.
+fn run_reset_states(b: &mut criterion::Bencher, plan: &Arc<dyn ExecutionPlan>) {
+    b.iter(|| std::hint::black_box(reset_plan_states(Arc::clone(plan)).unwrap()));
+}
+
+/// Measures the overhead of turning an already-built execution plan into an
+/// independent, re-executable instance via [`reset_plan_states`], i.e. the
+/// cost of re-using a plan instead of planning the query again.
+fn bench_reset_plan_states(c: &mut Criterion) {
+    let rt = Runtime::new().unwrap();
+    let ctx = SessionContext::new();
+
+    // Both tables share the same wide schema and consist of two empty
+    // partitions: the plans are only built and reset, never executed.
+    for table in ["t", "v"] {
+        let mem_table =
+            MemTable::try_new(Arc::clone(&SCHEMA), vec![vec![], vec![]]).unwrap();
+        ctx.register_table(table, Arc::new(mem_table)).unwrap();
+    }
+
+    // Plans the query once, outside the timed loop, and returns the closure
+    // criterion runs for the measurement.
+    macro_rules! bench_query {
+        ($query_producer: expr) => {{
+            let sql = $query_producer();
+            let plan = physical_plan(&ctx, &rt, &sql);
+            log::debug!("plan:\n{}", displayable(plan.as_ref()).indent(true));
+            move |b| run_reset_states(b, &plan)
+        }};
+    }
+
+    c.bench_function("query1", bench_query!(query1));
+    c.bench_function("query2", bench_query!(query2));
+    c.bench_function("query3", bench_query!(query3));
+}
+
+// Register the benchmark group and generate `main` (the `[[bench]]` target is
+// declared with `harness = false`).
+criterion_group!(benches, bench_reset_plan_states);
+criterion_main!(benches);
diff --git a/datafusion/physical-plan/src/execution_plan.rs
b/datafusion/physical-plan/src/execution_plan.rs
index 06da0b8933..8d72921d06 100644
--- a/datafusion/physical-plan/src/execution_plan.rs
+++ b/datafusion/physical-plan/src/execution_plan.rs
@@ -26,6 +26,7 @@ use crate::sort_pushdown::SortOrderPushdownResult;
pub use crate::stream::EmptyRecordBatchStream;
pub use datafusion_common::hash_utils;
+use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
pub use datafusion_common::utils::project_schema;
pub use datafusion_common::{ColumnStatistics, Statistics, internal_err};
pub use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
@@ -1384,6 +1385,30 @@ pub fn check_not_null_constraints(
Ok(batch)
}
+/// Makes a plan ready to be re-executed by returning a new plan in which the
+/// state of every node has been reset.
+///
+/// Some plans change their internal state during execution, which makes them
+/// unable to be executed again. This function visits the plan bottom-up and
+/// calls [`ExecutionPlan::reset_state`] on every node, so an already-built
+/// plan can be executed again without re-planning the query.
+///
+/// An example is `CrossJoinExec`, which loads its left input into memory and
+/// keeps it in the plan node. If that input can change between executions
+/// (for instance when it is derived from the work table of a recursive query,
+/// which changes on every iteration), the cached data must be cleared before
+/// the plan is executed again.
+///
+/// # Limitations
+///
+/// This function does not make a plan safely re-executable if the plan:
+///
+/// * uses dynamic filters, or
+/// * represents a recursive query.
+///
+/// # Errors
+///
+/// Returns an error if [`ExecutionPlan::reset_state`] fails for any node.
+pub fn reset_plan_states(plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPlan>> {
+    plan.transform_up(|node| {
+        let new_node = Arc::clone(&node).reset_state()?;
+        Ok(Transformed::yes(new_node))
+    })
+    .data()
+}
+
/// Utility function yielding a string representation of the given
[`ExecutionPlan`].
pub fn get_plan_string(plan: &Arc<dyn ExecutionPlan>) -> Vec<String> {
let formatted = displayable(plan.as_ref()).indent(true).to_string();
diff --git a/datafusion/physical-plan/src/recursive_query.rs
b/datafusion/physical-plan/src/recursive_query.rs
index 683dbb4e49..936a02581e 100644
--- a/datafusion/physical-plan/src/recursive_query.rs
+++ b/datafusion/physical-plan/src/recursive_query.rs
@@ -24,7 +24,7 @@ use std::task::{Context, Poll};
use super::work_table::{ReservedBatches, WorkTable};
use crate::aggregates::group_values::{GroupValues, new_group_values};
use crate::aggregates::order::GroupOrdering;
-use crate::execution_plan::{Boundedness, EmissionType};
+use crate::execution_plan::{Boundedness, EmissionType, reset_plan_states};
use crate::metrics::{
BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RecordOutput,
};
@@ -387,20 +387,6 @@ fn assign_work_table(
.data()
}
-/// Some plans will change their internal states after execution, making them
unable to be executed again.
-/// This function uses [`ExecutionPlan::reset_state`] to reset any internal
state within the plan.
-///
-/// An example is `CrossJoinExec`, which loads the left table into memory and
stores it in the plan.
-/// However, if the data of the left table is derived from the work table, it
will become outdated
-/// as the work table changes. When the next iteration executes this plan
again, we must clear the left table.
-fn reset_plan_states(plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn
ExecutionPlan>> {
- plan.transform_up(|plan| {
- let new_plan = Arc::clone(&plan).reset_state()?;
- Ok(Transformed::yes(new_plan))
- })
- .data()
-}
-
impl Stream for RecursiveQueryStream {
type Item = Result<RecordBatch>;
diff --git a/datafusion/physical-plan/src/test.rs
b/datafusion/physical-plan/src/test.rs
index c94b5a4131..c6d0940c35 100644
--- a/datafusion/physical-plan/src/test.rs
+++ b/datafusion/physical-plan/src/test.rs
@@ -146,7 +146,7 @@ impl ExecutionPlan for TestMemoryExec {
self: Arc<Self>,
_: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
- unimplemented!()
+ Ok(self)
}
fn repartitioned(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]