This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 580e622d73 Early exit on column normalisation to improve DataFrame performance (#14636)
580e622d73 is described below
commit 580e622d7388669df6fd2e0e69f45a11c7ad4208
Author: Dmitrii Blaginin <[email protected]>
AuthorDate: Mon Feb 17 11:57:12 2025 +0000
Early exit on column normalisation to improve DataFrame performance (#14636)
* Exit early if the column is normalized
* `Alias` original column
* Add references test
---
datafusion/core/src/dataframe/mod.rs | 3 ++-
datafusion/core/tests/dataframe/mod.rs | 26 +++++++++++++++++++++-----
datafusion/expr/src/logical_plan/builder.rs | 8 +++++++-
3 files changed, 30 insertions(+), 7 deletions(-)
diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs
index fd0765236e..4e5ccab14f 100644
--- a/datafusion/core/src/dataframe/mod.rs
+++ b/datafusion/core/src/dataframe/mod.rs
@@ -1811,7 +1811,8 @@ impl DataFrame {
.iter()
.map(|(qualifier, field)| {
if qualifier.eq(&qualifier_rename) && field.as_ref() == field_rename {
- col(Column::from((qualifier, field))).alias(new_name)
+ col(Column::from((qualifier, field)))
+ .alias_qualified(qualifier.cloned(), new_name)
} else {
col(Column::from((qualifier, field)))
}
diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs
index b0d469d751..d545157607 100644
--- a/datafusion/core/tests/dataframe/mod.rs
+++ b/datafusion/core/tests/dataframe/mod.rs
@@ -63,7 +63,7 @@ use datafusion::{assert_batches_eq, assert_batches_sorted_eq};
use datafusion_catalog::TableProvider;
use datafusion_common::{
assert_contains, Constraint, Constraints, DataFusionError, ParamValues, ScalarValue,
- UnnestOptions,
+ TableReference, UnnestOptions,
};
use datafusion_common_runtime::SpawnedTask;
use datafusion_execution::config::SessionConfig;
@@ -1617,9 +1617,25 @@ async fn with_column_renamed() -> Result<()> {
// accepts table qualifier
.with_column_renamed("aggregate_test_100.c2", "two")?
// no-op for missing column
- .with_column_renamed("c4", "boom")?
- .collect()
- .await?;
+ .with_column_renamed("c4", "boom")?;
+
+ let references: Vec<_> = df_sum_renamed
+ .schema()
+ .iter()
+ .map(|(a, _)| a.cloned())
+ .collect();
+
+ assert_eq!(
+ references,
+ vec![
+ Some(TableReference::bare("aggregate_test_100")), // table name is preserved
+ Some(TableReference::bare("aggregate_test_100")),
+ Some(TableReference::bare("aggregate_test_100")),
+ None // total column
+ ]
+ );
+
+ let batches = &df_sum_renamed.collect().await?;
assert_batches_sorted_eq!(
[
@@ -1629,7 +1645,7 @@ async fn with_column_renamed() -> Result<()> {
"| a | 3 | -72 | -69 |",
"+-----+-----+-----+-------+",
],
- &df_sum_renamed
+ batches
);
Ok(())
diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs
index b1107f2add..45889e96b5 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -847,10 +847,16 @@ impl LogicalPlanBuilder {
plan: &LogicalPlan,
column: impl Into<Column>,
) -> Result<Column> {
+ let column = column.into();
+ if column.relation.is_some() {
+ // column is already normalized
+ return Ok(column);
+ }
+
let schema = plan.schema();
let fallback_schemas = plan.fallback_normalize_schemas();
let using_columns = plan.using_columns()?;
- column.into().normalize_with_schemas_and_ambiguity_check(
+ column.normalize_with_schemas_and_ambiguity_check(
&[&[schema], &fallback_schemas],
&using_columns,
)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]