This is an automated email from the ASF dual-hosted git repository.
akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new eb13f598fa Adding Constant Check for FilterExec (#9649)
eb13f598fa is described below
commit eb13f598fa722382cb6580ff9fc31e458ad7a8a9
Author: Lordworms <[email protected]>
AuthorDate: Mon Mar 18 09:48:12 2024 -0500
Adding Constant Check for FilterExec (#9649)
* fix bugs in adding extra SortExec
* adding tests
* optimize code
* Update datafusion/physical-plan/src/filter.rs
Co-authored-by: Mustafa Akur
<[email protected]>
* optimize code
* optimize code
* optimize code
* optimize code
* fix clippy
---------
Co-authored-by: Mustafa Akur
<[email protected]>
---
datafusion/physical-plan/src/filter.rs | 27 +++++++++-
.../test_files/filter_without_sort_exec.slt | 61 ++++++++++++++++++++++
2 files changed, 87 insertions(+), 1 deletion(-)
diff --git a/datafusion/physical-plan/src/filter.rs
b/datafusion/physical-plan/src/filter.rs
index 4155b00820..72f885a939 100644
--- a/datafusion/physical-plan/src/filter.rs
+++ b/datafusion/physical-plan/src/filter.rs
@@ -159,6 +159,27 @@ impl FilterExec {
})
}
+ fn extend_constants(
+ input: &Arc<dyn ExecutionPlan>,
+ predicate: &Arc<dyn PhysicalExpr>,
+ ) -> Vec<Arc<dyn PhysicalExpr>> {
+ let mut res_constants = Vec::new();
+ let input_eqs = input.equivalence_properties();
+
+ let conjunctions = split_conjunction(predicate);
+ for conjunction in conjunctions {
+ if let Some(binary) =
conjunction.as_any().downcast_ref::<BinaryExpr>() {
+ if binary.op() == &Operator::Eq {
+ if input_eqs.is_expr_constant(binary.left()) {
+ res_constants.push(binary.right().clone())
+ } else if input_eqs.is_expr_constant(binary.right()) {
+ res_constants.push(binary.left().clone())
+ }
+ }
+ }
+ }
+ res_constants
+ }
/// This function creates the cache object that stores the plan properties
such as schema, equivalence properties, ordering, partitioning, etc.
fn compute_properties(
input: &Arc<dyn ExecutionPlan>,
@@ -181,8 +202,12 @@ impl FilterExec {
.into_iter()
.filter(|column|
stats.column_statistics[column.index()].is_singleton())
.map(|column| Arc::new(column) as _);
+ // this is for statistics
eq_properties = eq_properties.add_constants(constants);
-
+ // this is for a logical constant (for example: given a = '1', a can be
marked as a constant)
+ // TODO: how to deal with multiple ways to represent equality (for
example c1 between 0 and 0)
+ eq_properties =
+ eq_properties.add_constants(Self::extend_constants(input,
predicate));
Ok(PlanProperties::new(
eq_properties,
input.output_partitioning().clone(), // Output Partitioning
diff --git a/datafusion/sqllogictest/test_files/filter_without_sort_exec.slt
b/datafusion/sqllogictest/test_files/filter_without_sort_exec.slt
new file mode 100644
index 0000000000..05e622db8a
--- /dev/null
+++ b/datafusion/sqllogictest/test_files/filter_without_sort_exec.slt
@@ -0,0 +1,61 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# prepare table
+statement ok
+CREATE UNBOUNDED EXTERNAL TABLE data (
+ "date" VARCHAR,
+ "ticker" VARCHAR,
+ "time" VARCHAR,
+) STORED AS CSV
+WITH ORDER ("date", "ticker", "time")
+LOCATION './a.parquet';
+
+
+# query
+query TT
+explain SELECT * FROM data
+WHERE ticker = 'A'
+ORDER BY "date", "time";
+----
+logical_plan
+Sort: data.date ASC NULLS LAST, data.time ASC NULLS LAST
+--Filter: data.ticker = Utf8("A")
+----TableScan: data projection=[date, ticker, time]
+physical_plan
+SortPreservingMergeExec: [date@0 ASC NULLS LAST,time@2 ASC NULLS LAST]
+--CoalesceBatchesExec: target_batch_size=8192
+----FilterExec: ticker@1 = A
+------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+--------StreamingTableExec: partition_sizes=1, projection=[date, ticker,
time], infinite_source=true, output_ordering=[date@0 ASC NULLS LAST, ticker@1
ASC NULLS LAST, time@2 ASC NULLS LAST]
+
+# query
+query TT
+explain SELECT * FROM data
+WHERE date = 'A'
+ORDER BY "ticker", "time";
+----
+logical_plan
+Sort: data.ticker ASC NULLS LAST, data.time ASC NULLS LAST
+--Filter: data.date = Utf8("A")
+----TableScan: data projection=[date, ticker, time]
+physical_plan
+SortPreservingMergeExec: [ticker@1 ASC NULLS LAST,time@2 ASC NULLS LAST]
+--CoalesceBatchesExec: target_batch_size=8192
+----FilterExec: date@0 = A
+------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+--------StreamingTableExec: partition_sizes=1, projection=[date, ticker,
time], infinite_source=true, output_ordering=[date@0 ASC NULLS LAST, ticker@1
ASC NULLS LAST, time@2 ASC NULLS LAST]