This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new ec6abece2d Fix None Projections in Projection Pushdown (#9005)
ec6abece2d is described below
commit ec6abece2dcfa68007b87c69eefa6b0d7333f628
Author: Berkay Şahin <[email protected]>
AuthorDate: Fri Jan 26 14:40:20 2024 +0300
Fix None Projections in Projection Pushdown (#9005)
* Fix none projections
* Update select.slt
---
.../src/physical_optimizer/projection_pushdown.rs | 26 +++++++++++++++-------
datafusion/sqllogictest/test_files/select.slt | 14 ++++++++++++
2 files changed, 32 insertions(+), 8 deletions(-)
diff --git a/datafusion/core/src/physical_optimizer/projection_pushdown.rs b/datafusion/core/src/physical_optimizer/projection_pushdown.rs
index 34d1af8556..1d1bee6180 100644
--- a/datafusion/core/src/physical_optimizer/projection_pushdown.rs
+++ b/datafusion/core/src/physical_optimizer/projection_pushdown.rs
@@ -163,8 +163,12 @@ fn try_swapping_with_csv(
// This process can be moved into CsvExec, but it would be an overlap of their responsibility.
all_alias_free_columns(projection.expr()).then(|| {
let mut file_scan = csv.base_config().clone();
- let new_projections =
- new_projections_for_columns(projection, &file_scan.projection);
+ let new_projections = new_projections_for_columns(
+ projection,
+ &file_scan
+ .projection
+ .unwrap_or((0..csv.schema().fields().len()).collect()),
+ );
file_scan.projection = Some(new_projections);
Arc::new(CsvExec::new(
@@ -188,8 +192,11 @@ fn try_swapping_with_memory(
// This process can be moved into MemoryExec, but it would be an overlap of their responsibility.
all_alias_free_columns(projection.expr())
.then(|| {
- let new_projections =
- new_projections_for_columns(projection, memory.projection());
+ let all_projections = (0..memory.schema().fields().len()).collect();
+ let new_projections = new_projections_for_columns(
+ projection,
+ memory.projection().as_ref().unwrap_or(&all_projections),
+ );
MemoryExec::try_new(
memory.partitions(),
@@ -216,8 +223,11 @@ fn try_swapping_with_streaming_table(
.projection()
.as_ref()
.map(|i| i.as_ref().to_vec());
- let new_projections =
- new_projections_for_columns(projection, &streaming_table_projections);
+ let new_projections = new_projections_for_columns(
+ projection,
+ &streaming_table_projections
+ .unwrap_or((0..streaming_table.schema().fields().len()).collect()),
+ );
let mut lex_orderings = vec![];
for lex_ordering in streaming_table.projected_output_ordering().into_iter() {
@@ -833,7 +843,7 @@ fn all_alias_free_columns(exprs: &[(Arc<dyn PhysicalExpr>, String)]) -> bool {
/// ensure that all expressions are `Column` expressions without aliases.
fn new_projections_for_columns(
projection: &ProjectionExec,
- source: &Option<Vec<usize>>,
+ source: &[usize],
) -> Vec<usize> {
projection
.expr()
@@ -841,7 +851,7 @@ fn new_projections_for_columns(
.filter_map(|(expr, _)| {
expr.as_any()
.downcast_ref::<Column>()
- .and_then(|expr| source.as_ref().map(|proj| proj[expr.index()]))
+ .map(|expr| source[expr.index()])
})
.collect()
}
diff --git a/datafusion/sqllogictest/test_files/select.slt b/datafusion/sqllogictest/test_files/select.slt
index 2b47fec7ac..b7bbc07065 100644
--- a/datafusion/sqllogictest/test_files/select.slt
+++ b/datafusion/sqllogictest/test_files/select.slt
@@ -1569,3 +1569,17 @@ query I
select count(1) from v;
----
1
+
+# run below query without logical optimizations
+statement ok
+set datafusion.optimizer.max_passes=0;
+
+statement ok
+CREATE TABLE t(a int, b int);
+
+query I
+select a from t;
+----
+
+statement ok
+set datafusion.optimizer.max_passes=3;
\ No newline at end of file