This is an automated email from the ASF dual-hosted git repository.

akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 453a45aa79 fix: issue #8838 discard extra sort when sorted element is 
wrapped (#9127)
453a45aa79 is described below

commit 453a45aa796fe798987057ef70a184a16d454660
Author: Lordworms <[email protected]>
AuthorDate: Tue Feb 20 01:47:46 2024 -0600

    fix: issue #8838 discard extra sort when sorted element is wrapped (#9127)
    
    * fix: issue #8838 discard extra sort when sorted element is wrapped
    
    fix: issue #8838 discard extra sort when sorted element is wrapped
    
    fix: issue #8838 discard extra sort when sorted element is wrapped
    
    * fix bugs
    
    * fix bugs
    
    * fix bugs
    
    * fix:bugs
    
    * adding tests
    
    * adding cast UTF8 type and disable scalar function situation
    
    * fix typo
---
 .../physical-expr/src/equivalence/properties.rs    | 93 ++++++++++++++++++++--
 datafusion/physical-expr/src/expressions/cast.rs   | 30 +++++--
 datafusion/physical-plan/src/projection.rs         | 16 ++--
 .../test_files/monotonic_projection_test.slt       | 86 ++++++++++++++++++++
 4 files changed, 209 insertions(+), 16 deletions(-)

diff --git a/datafusion/physical-expr/src/equivalence/properties.rs 
b/datafusion/physical-expr/src/equivalence/properties.rs
index 386e74b4a6..9e60ffe748 100644
--- a/datafusion/physical-expr/src/equivalence/properties.rs
+++ b/datafusion/physical-expr/src/equivalence/properties.rs
@@ -15,6 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::expressions::CastExpr;
+use arrow_schema::SchemaRef;
+use datafusion_common::{JoinSide, JoinType};
+use indexmap::IndexSet;
+use itertools::Itertools;
 use std::collections::{HashMap, HashSet};
 use std::hash::{Hash, Hasher};
 use std::sync::Arc;
@@ -31,12 +36,8 @@ use crate::{
     PhysicalSortRequirement,
 };
 
-use arrow_schema::{SchemaRef, SortOptions};
+use arrow_schema::SortOptions;
 use datafusion_common::tree_node::{Transformed, TreeNode};
-use datafusion_common::{JoinSide, JoinType};
-
-use indexmap::IndexSet;
-use itertools::Itertools;
 
 /// A `EquivalenceProperties` object stores useful information related to a 
schema.
 /// Currently, it keeps track of:
@@ -426,6 +427,87 @@ impl EquivalenceProperties {
         (!meet.is_empty()).then_some(meet)
     }
 
+    /// Rewrites a single ordering so that its components are expressed in terms
+    /// of order-preserving projection expressions where possible.
+    ///
+    /// This is a deliberately simplified analysis. A component `e` of
+    /// `sort_expr` is replaced by a projection expression `p` only when:
+    ///
+    /// 1. `p` is the only expression in `matching_exprs` that refers to `e`;
+    /// 2. `p` is a `CAST` whose input is exactly `e`; and
+    /// 3. `CastExpr::is_bigger_cast` accepts the cast from the type of `e`
+    ///    (resolved against `schema`).
+    ///
+    /// All other components are returned unchanged, including components whose
+    /// type cannot be resolved against `schema`. The result therefore always has
+    /// the same length as `sort_expr`, and a replaced component keeps its
+    /// original `SortOptions`.
+    ///
+    /// Monotonic scalar functions are not substituted yet.
+    /// TODO: precompute other order-preserving cases, e.g. `atan(x + 1000)`
+    /// could replace `x` when `x` is sorted ASC or DESC.
+    pub fn substitute_ordering_component(
+        matching_exprs: Arc<Vec<&Arc<dyn PhysicalExpr>>>,
+        sort_expr: &[PhysicalSortExpr],
+        schema: SchemaRef,
+    ) -> Vec<PhysicalSortExpr> {
+        sort_expr
+            .iter()
+            .map(|sort_expr| {
+                // Projection expressions that reference this ordering component.
+                let referring_exprs: Vec<_> = matching_exprs
+                    .iter()
+                    .filter(|matched| expr_refers(matched, &sort_expr.expr))
+                    .cloned()
+                    .collect();
+                // Substitute only when exactly one projection expression refers to
+                // the component; otherwise keep the component as it is.
+                if referring_exprs.len() != 1 {
+                    return sort_expr.clone();
+                }
+                let r_expr = referring_exprs[0];
+                match r_expr.as_any().downcast_ref::<CastExpr>() {
+                    // The cast must wrap exactly this component and be a widening
+                    // cast from its type. If the type cannot be resolved, skip the
+                    // substitution instead of panicking during planning.
+                    Some(cast_expr)
+                        if cast_expr.expr.eq(&sort_expr.expr)
+                            && sort_expr
+                                .expr
+                                .data_type(schema.as_ref())
+                                .map_or(false, |expr_type| {
+                                    cast_expr.is_bigger_cast(expr_type)
+                                }) =>
+                    {
+                        PhysicalSortExpr {
+                            expr: Arc::clone(r_expr),
+                            options: sort_expr.options,
+                        }
+                    }
+                    _ => sort_expr.clone(),
+                }
+            })
+            .collect()
+    }
+
+    /// Rewrites the ordering equivalence class of `self` so that ordering
+    /// components are expressed through the projection expressions in `exprs`,
+    /// wherever `substitute_ordering_component` accepts the substitution.
+    ///
+    /// Motivation: if the input is ordered by `A DESC, B DESC` and the projection
+    /// emits `CAST(A AS BIGINT)` instead of `A`, the original ordering no longer
+    /// mentions any projected expression. The dependency map built during
+    /// projection would then lose it. Substituting `A` with the cast keeps the
+    /// ordering visible; see
+    /// <https://github.com/apache/arrow-datafusion/issues/8838>.
+    ///
+    /// Only the ordering equivalence class is rewritten. `schema` is the input
+    /// schema used to resolve the types of ordering components.
+    pub fn substitute_oeq_class(
+        &mut self,
+        exprs: &[(Arc<dyn PhysicalExpr>, String)],
+        mapping: &ProjectionMapping,
+        schema: SchemaRef,
+    ) {
+        // Projection expressions that appear as a source in `mapping`.
+        let mut candidates = Vec::with_capacity(exprs.len());
+        for (expr, _) in exprs {
+            if mapping.iter().any(|(source, _)| source.eq(expr)) {
+                candidates.push(expr);
+            }
+        }
+        let candidates = Arc::new(candidates);
+
+        let mut substituted = Vec::new();
+        for ordering in std::mem::take(&mut self.oeq_class.orderings) {
+            substituted.push(Self::substitute_ordering_component(
+                Arc::clone(&candidates),
+                &ordering,
+                Arc::clone(&schema),
+            ));
+        }
+        self.oeq_class = OrderingEquivalenceClass::new(substituted);
+    }
+
     /// Projects argument `expr` according to `projection_mapping`, taking
     /// equivalences into account.
     ///
@@ -564,7 +646,6 @@ impl EquivalenceProperties {
 
         // Get dependency map for existing orderings:
         let dependency_map = self.construct_dependency_map(&mapping);
-
         let orderings = mapping.iter().flat_map(|(source, target)| {
             referred_dependencies(&dependency_map, source)
                 .into_iter()
diff --git a/datafusion/physical-expr/src/expressions/cast.rs 
b/datafusion/physical-expr/src/expressions/cast.rs
index 0c4ed3c125..b0e175e711 100644
--- a/datafusion/physical-expr/src/expressions/cast.rs
+++ b/datafusion/physical-expr/src/expressions/cast.rs
@@ -15,14 +15,14 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::physical_expr::down_cast_any_ref;
+use crate::sort_properties::SortProperties;
+use crate::PhysicalExpr;
 use std::any::Any;
 use std::fmt;
 use std::hash::{Hash, Hasher};
 use std::sync::Arc;
-
-use crate::physical_expr::down_cast_any_ref;
-use crate::sort_properties::SortProperties;
-use crate::PhysicalExpr;
+use DataType::*;
 
 use arrow::compute::{can_cast_types, kernels, CastOptions};
 use arrow::datatypes::{DataType, Schema};
@@ -41,7 +41,7 @@ const DEFAULT_CAST_OPTIONS: CastOptions<'static> = 
CastOptions {
 #[derive(Debug, Clone)]
 pub struct CastExpr {
     /// The expression to cast
-    expr: Arc<dyn PhysicalExpr>,
+    pub expr: Arc<dyn PhysicalExpr>,
     /// The data type to cast to
     cast_type: DataType,
     /// Cast options
@@ -76,6 +76,26 @@ impl CastExpr {
     pub fn cast_options(&self) -> &CastOptions<'static> {
         &self.cast_options
     }
+    /// Returns `true` when casting a value of type `src` to this expression's
+    /// target type is lossless and order-preserving: distinct inputs map to
+    /// distinct outputs in the same relative order.
+    ///
+    /// Ordering analysis relies on this. If the input is sorted lexicographically
+    /// by `(a, b)`, the output is sorted by `(CAST(a), b)` only when the cast never
+    /// maps two different `a` values to the same result. Otherwise rows with
+    /// different `a` but equal `CAST(a)` could violate the ordering on `b`.
+    ///
+    /// Casts to floating point are therefore accepted only when every source
+    /// value is exactly representable. `Float32` has a 24-bit significand and
+    /// `Float64` a 53-bit one, so `Int32`/`UInt32 -> Float32` and
+    /// `Int64`/`UInt64 -> Float64` (which round large values) are rejected.
+    pub fn is_bigger_cast(&self, src: DataType) -> bool {
+        if src == self.cast_type {
+            return true;
+        }
+        matches!(
+            (src, &self.cast_type),
+            (Int8, Int16 | Int32 | Int64)
+                | (Int16, Int32 | Int64)
+                | (Int32, Int64)
+                | (UInt8, UInt16 | UInt32 | UInt64)
+                | (UInt16, UInt32 | UInt64)
+                | (UInt32, UInt64)
+                // Unsigned values always fit in a strictly wider signed type.
+                | (UInt8, Int16 | Int32 | Int64)
+                | (UInt16, Int32 | Int64)
+                | (UInt32, Int64)
+                // Only exactly representable integer -> float conversions.
+                | (Int8 | Int16 | UInt8 | UInt16, Float32 | Float64)
+                | (Int32 | UInt32, Float64)
+                | (Utf8, LargeUtf8)
+        )
+    }
 }
 
 impl fmt::Display for CastExpr {
diff --git a/datafusion/physical-plan/src/projection.rs 
b/datafusion/physical-plan/src/projection.rs
index cc2ab62049..51423d37e7 100644
--- a/datafusion/physical-plan/src/projection.rs
+++ b/datafusion/physical-plan/src/projection.rs
@@ -70,7 +70,6 @@ impl ProjectionExec {
         input: Arc<dyn ExecutionPlan>,
     ) -> Result<Self> {
         let input_schema = input.schema();
-
         let fields: Result<Vec<Field>> = expr
             .iter()
             .map(|(e, name)| {
@@ -95,7 +94,10 @@ impl ProjectionExec {
         // construct a map from the input expressions to the output expression 
of the Projection
         let projection_mapping = ProjectionMapping::try_new(&expr, 
&input_schema)?;
 
-        let input_eqs = input.equivalence_properties();
+        let mut input_eqs = input.equivalence_properties();
+
+        input_eqs.substitute_oeq_class(&expr, &projection_mapping, 
input_schema.clone());
+
         let project_eqs = input_eqs.project(&projection_mapping, 
schema.clone());
         let output_ordering = project_eqs.oeq_class().output_ordering();
 
@@ -201,9 +203,13 @@ impl ExecutionPlan for ProjectionExec {
     }
 
     fn equivalence_properties(&self) -> EquivalenceProperties {
-        self.input
-            .equivalence_properties()
-            .project(&self.projection_mapping, self.schema())
+        // Rewrite the input orderings in terms of order-preserving projection
+        // expressions (e.g. widening casts) before projecting, so that they are
+        // not lost by the projection (issue #8838).
+        let mut equi_properties = self.input.equivalence_properties();
+        equi_properties.substitute_oeq_class(
+            &self.expr,
+            &self.projection_mapping,
+            self.input.schema(),
+        );
+        equi_properties.project(&self.projection_mapping, self.schema())
     }
 
     fn with_new_children(
diff --git a/datafusion/sqllogictest/test_files/monotonic_projection_test.slt 
b/datafusion/sqllogictest/test_files/monotonic_projection_test.slt
new file mode 100644
index 0000000000..57b8106733
--- /dev/null
+++ b/datafusion/sqllogictest/test_files/monotonic_projection_test.slt
@@ -0,0 +1,86 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+
+#   http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# prepare the table
+statement ok
+CREATE EXTERNAL TABLE delta_encoding_required_column (
+    c_customer_sk INT NOT NULL,
+    c_current_cdemo_sk INT NOT NULL
+)
+STORED AS CSV
+WITH ORDER (
+    c_customer_sk DESC,
+    c_current_cdemo_sk DESC
+)
+LOCATION '../../testing/data/csv/aggregate_test_100.csv';
+
+# test for the CAST substitution scenario (widening cast, no extra sort needed)
+query TT
+EXPLAIN
+SELECT 
+    CAST(c_customer_sk AS BIGINT) AS c_customer_sk_big,
+    c_current_cdemo_sk
+FROM delta_encoding_required_column
+ORDER BY c_customer_sk_big DESC, c_current_cdemo_sk DESC;
+----
+logical_plan
+Sort: c_customer_sk_big DESC NULLS FIRST, 
delta_encoding_required_column.c_current_cdemo_sk DESC NULLS FIRST
+--Projection: CAST(delta_encoding_required_column.c_customer_sk AS Int64) AS 
c_customer_sk_big, delta_encoding_required_column.c_current_cdemo_sk
+----TableScan: delta_encoding_required_column projection=[c_customer_sk, 
c_current_cdemo_sk]
+physical_plan
+SortPreservingMergeExec: [c_customer_sk_big@0 DESC,c_current_cdemo_sk@1 DESC]
+--ProjectionExec: expr=[CAST(c_customer_sk@0 AS Int64) as c_customer_sk_big, 
c_current_cdemo_sk@1 as c_current_cdemo_sk]
+----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, 
projection=[c_customer_sk, c_current_cdemo_sk], 
output_ordering=[c_customer_sk@0 DESC, c_current_cdemo_sk@1 DESC], 
has_header=false
+
+# test for a common rename (plain alias, no cast)
+query TT
+EXPLAIN
+SELECT 
+    c_customer_sk AS c_customer_sk_big,
+    c_current_cdemo_sk
+FROM delta_encoding_required_column
+ORDER BY c_customer_sk_big DESC, c_current_cdemo_sk DESC;
+----
+logical_plan
+Sort: c_customer_sk_big DESC NULLS FIRST, 
delta_encoding_required_column.c_current_cdemo_sk DESC NULLS FIRST
+--Projection: delta_encoding_required_column.c_customer_sk AS 
c_customer_sk_big, delta_encoding_required_column.c_current_cdemo_sk
+----TableScan: delta_encoding_required_column projection=[c_customer_sk, 
c_current_cdemo_sk]
+physical_plan
+ProjectionExec: expr=[c_customer_sk@0 as c_customer_sk_big, 
c_current_cdemo_sk@1 as c_current_cdemo_sk]
+--CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, 
projection=[c_customer_sk, c_current_cdemo_sk], 
output_ordering=[c_customer_sk@0 DESC, c_current_cdemo_sk@1 DESC], 
has_header=false
+
+
+# test for cast to Utf8 (not order-preserving, so the sort is kept)
+query TT
+EXPLAIN
+SELECT 
+    CAST(c_customer_sk AS STRING) AS c_customer_sk_big,
+    c_current_cdemo_sk
+FROM delta_encoding_required_column
+ORDER BY c_customer_sk_big DESC, c_current_cdemo_sk DESC;
+----
+logical_plan
+Sort: c_customer_sk_big DESC NULLS FIRST, 
delta_encoding_required_column.c_current_cdemo_sk DESC NULLS FIRST
+--Projection: CAST(delta_encoding_required_column.c_customer_sk AS Utf8) AS 
c_customer_sk_big, delta_encoding_required_column.c_current_cdemo_sk
+----TableScan: delta_encoding_required_column projection=[c_customer_sk, 
c_current_cdemo_sk]
+physical_plan
+SortPreservingMergeExec: [c_customer_sk_big@0 DESC,c_current_cdemo_sk@1 DESC]
+--SortExec: expr=[c_customer_sk_big@0 DESC,c_current_cdemo_sk@1 DESC]
+----ProjectionExec: expr=[CAST(c_customer_sk@0 AS Utf8) as c_customer_sk_big, 
c_current_cdemo_sk@1 as c_current_cdemo_sk]
+------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+--------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, 
projection=[c_customer_sk, c_current_cdemo_sk], 
output_ordering=[c_customer_sk@0 DESC, c_current_cdemo_sk@1 DESC], 
has_header=false

Reply via email to