This is an automated email from the ASF dual-hosted git repository.

akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new eb13f598fa Adding Constant Check for FilterExec (#9649)
eb13f598fa is described below

commit eb13f598fa722382cb6580ff9fc31e458ad7a8a9
Author: Lordworms <[email protected]>
AuthorDate: Mon Mar 18 09:48:12 2024 -0500

    Adding Constant Check for FilterExec (#9649)
    
    * fix bug that added an unnecessary SortExec
    
    * adding tests
    
    * optimize code
    
    * Update datafusion/physical-plan/src/filter.rs
    
    Co-authored-by: Mustafa Akur <[email protected]>
    
    * optimize code
    
    * optimize code
    
    * optimize code
    
    * optimize code
    
    * fix clippy
    
    ---------
    
    Co-authored-by: Mustafa Akur <[email protected]>
---
 datafusion/physical-plan/src/filter.rs             | 27 +++++++++-
 .../test_files/filter_without_sort_exec.slt        | 61 ++++++++++++++++++++++
 2 files changed, 87 insertions(+), 1 deletion(-)

diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs
index 4155b00820..72f885a939 100644
--- a/datafusion/physical-plan/src/filter.rs
+++ b/datafusion/physical-plan/src/filter.rs
@@ -159,6 +159,27 @@ impl FilterExec {
         })
     }
 
+    /// Collects expressions that become constant once `input` is filtered by
+    /// `predicate`.
+    ///
+    /// `predicate` is split into its conjuncts. For every conjunct of the form
+    /// `lhs = rhs` where one side is already constant according to `input`'s
+    /// equivalence properties, the other side is returned: every row that
+    /// passes the filter has it equal to that constant. Conjuncts that are not
+    /// a `BinaryExpr` with `Operator::Eq` are ignored.
+    ///
+    /// NOTE(review): constancy is checked only against `input`'s properties,
+    /// so a side that becomes constant solely through another conjunct of the
+    /// same predicate is not detected here, and other equality-implying forms
+    /// (e.g. `c1 BETWEEN 0 AND 0`) are not recognised.
+    fn extend_constants(
+        input: &Arc<dyn ExecutionPlan>,
+        predicate: &Arc<dyn PhysicalExpr>,
+    ) -> Vec<Arc<dyn PhysicalExpr>> {
+        let mut res_constants = Vec::new();
+        let input_eqs = input.equivalence_properties();
+
+        let conjunctions = split_conjunction(predicate);
+        for conjunction in conjunctions {
+            if let Some(binary) = conjunction.as_any().downcast_ref::<BinaryExpr>() {
+                if binary.op() == &Operator::Eq {
+                    // If one side is already constant in the input, the other
+                    // side equals it on every surviving row, so it is constant
+                    // in the filter's output as well.
+                    if input_eqs.is_expr_constant(binary.left()) {
+                        res_constants.push(binary.right().clone())
+                    } else if input_eqs.is_expr_constant(binary.right()) {
+                        res_constants.push(binary.left().clone())
+                    }
+                }
+            }
+        }
+        res_constants
+    }
     /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
     fn compute_properties(
         input: &Arc<dyn ExecutionPlan>,
@@ -181,8 +202,12 @@ impl FilterExec {
             .into_iter()
             .filter(|column| 
stats.column_statistics[column.index()].is_singleton())
             .map(|column| Arc::new(column) as _);
+        // Constants derived from column statistics (columns whose statistics are singletons).
         eq_properties = eq_properties.add_constants(constants);
-
+        // Constants implied by the predicate itself (e.g. after `a = '1'`, `a`
+        // is constant in the output).
+        // TODO: recognise other predicate forms that imply equality, such as
+        // `c1 BETWEEN 0 AND 0`.
+        eq_properties =
+            eq_properties.add_constants(Self::extend_constants(input, 
predicate));
         Ok(PlanProperties::new(
             eq_properties,
             input.output_partitioning().clone(), // Output Partitioning
diff --git a/datafusion/sqllogictest/test_files/filter_without_sort_exec.slt b/datafusion/sqllogictest/test_files/filter_without_sort_exec.slt
new file mode 100644
index 0000000000..05e622db8a
--- /dev/null
+++ b/datafusion/sqllogictest/test_files/filter_without_sort_exec.slt
@@ -0,0 +1,61 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+
+#   http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Tests for #9649: an equality predicate such as `col = 'A'` makes `col`
+# constant in the FilterExec output. An ORDER BY that omits that column is
+# then still satisfied by the source ordering, so the physical plans below
+# must not contain a SortExec.
+
+# Prepare an unbounded source ordered by ("date", "ticker", "time").
+# NOTE(review): LOCATION has a `.parquet` extension although the table is
+# STORED AS CSV; only EXPLAIN queries run below, so the file is presumably
+# never read, but a matching extension would be less confusing.
+statement ok
+CREATE UNBOUNDED EXTERNAL TABLE data (
+    "date"   VARCHAR,
+    "ticker" VARCHAR,
+    "time"   VARCHAR,
+) STORED AS CSV
+WITH ORDER ("date", "ticker", "time")
+LOCATION './a.parquet';
+
+
+# "ticker" is fixed by the filter, so the source ordering satisfies
+# ORDER BY "date", "time" without an extra SortExec.
+query TT
+explain SELECT * FROM data
+WHERE ticker = 'A'
+ORDER BY "date", "time";
+----
+logical_plan
+Sort: data.date ASC NULLS LAST, data.time ASC NULLS LAST
+--Filter: data.ticker = Utf8("A")
+----TableScan: data projection=[date, ticker, time]
+physical_plan
+SortPreservingMergeExec: [date@0 ASC NULLS LAST,time@2 ASC NULLS LAST]
+--CoalesceBatchesExec: target_batch_size=8192
+----FilterExec: ticker@1 = A
+------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+--------StreamingTableExec: partition_sizes=1, projection=[date, ticker, time], infinite_source=true, output_ordering=[date@0 ASC NULLS LAST, ticker@1 ASC NULLS LAST, time@2 ASC NULLS LAST]
+
+# "date" is fixed by the filter, so the source ordering satisfies
+# ORDER BY "ticker", "time" without an extra SortExec.
+query TT
+explain SELECT * FROM data
+WHERE date = 'A'
+ORDER BY "ticker", "time";
+----
+logical_plan
+Sort: data.ticker ASC NULLS LAST, data.time ASC NULLS LAST
+--Filter: data.date = Utf8("A")
+----TableScan: data projection=[date, ticker, time]
+physical_plan
+SortPreservingMergeExec: [ticker@1 ASC NULLS LAST,time@2 ASC NULLS LAST]
+--CoalesceBatchesExec: target_batch_size=8192
+----FilterExec: date@0 = A
+------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+--------StreamingTableExec: partition_sizes=1, projection=[date, ticker, time], infinite_source=true, output_ordering=[date@0 ASC NULLS LAST, ticker@1 ASC NULLS LAST, time@2 ASC NULLS LAST]

Reply via email to