This is an automated email from the ASF dual-hosted git repository.
akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 453a45aa79 fix: issue #8838 discard extra sort when sorted element is
wrapped (#9127)
453a45aa79 is described below
commit 453a45aa796fe798987057ef70a184a16d454660
Author: Lordworms <[email protected]>
AuthorDate: Tue Feb 20 01:47:46 2024 -0600
fix: issue #8838 discard extra sort when sorted element is wrapped (#9127)
* fix: issue #8838 discard extra sort when sorted element is wrapped
fix: issue #8838 discard extra sort when sorted element is wrapped
fix: issue #8838 discard extra sort when sorted element is wrapped
* fix bugs
* fix bugs
* fix bugs
* fix:bugs
* adding tests
* adding cast UTF8 type and disable scalarfunction situation
* fix typo
---
.../physical-expr/src/equivalence/properties.rs | 93 ++++++++++++++++++++--
datafusion/physical-expr/src/expressions/cast.rs | 30 +++++--
datafusion/physical-plan/src/projection.rs | 16 ++--
.../test_files/monotonic_projection_test.slt | 86 ++++++++++++++++++++
4 files changed, 209 insertions(+), 16 deletions(-)
diff --git a/datafusion/physical-expr/src/equivalence/properties.rs
b/datafusion/physical-expr/src/equivalence/properties.rs
index 386e74b4a6..9e60ffe748 100644
--- a/datafusion/physical-expr/src/equivalence/properties.rs
+++ b/datafusion/physical-expr/src/equivalence/properties.rs
@@ -15,6 +15,11 @@
// specific language governing permissions and limitations
// under the License.
+use crate::expressions::CastExpr;
+use arrow_schema::SchemaRef;
+use datafusion_common::{JoinSide, JoinType};
+use indexmap::IndexSet;
+use itertools::Itertools;
use std::collections::{HashMap, HashSet};
use std::hash::{Hash, Hasher};
use std::sync::Arc;
@@ -31,12 +36,8 @@ use crate::{
PhysicalSortRequirement,
};
-use arrow_schema::{SchemaRef, SortOptions};
+use arrow_schema::SortOptions;
use datafusion_common::tree_node::{Transformed, TreeNode};
-use datafusion_common::{JoinSide, JoinType};
-
-use indexmap::IndexSet;
-use itertools::Itertools;
/// A `EquivalenceProperties` object stores useful information related to a
schema.
/// Currently, it keeps track of:
@@ -426,6 +427,87 @@ impl EquivalenceProperties {
(!meet.is_empty()).then_some(meet)
}
+    /// Rewrites the components of one ordering so that they line up with the
+    /// expressions a projection produces.
+    ///
+    /// This is a deliberately simplified substitution. A sort expression is
+    /// replaced only when all of the following hold:
+    ///
+    /// 1. exactly one entry of `matching_exprs` refers to it,
+    /// 2. that entry is a [`CastExpr`] applied directly to the sort expression,
+    /// 3. [`CastExpr::is_bigger_cast`] accepts the cast for the sort
+    ///    expression's data type.
+    ///
+    /// The replacement is the cast expression itself, carrying the original
+    /// sort options. Every other sort expression is returned unchanged, so the
+    /// result always has the same length as `sort_expr`.
+    ///
+    /// If the data type of a sort expression cannot be resolved against
+    /// `schema`, that expression is kept as is instead of panicking.
+    ///
+    /// TODO: single-argument monotonic scalar functions are not substituted
+    /// yet. We could precompute every scenario that is computable; for example
+    /// `atan(x + 1000)` should also be substituted when `x` is DESC or ASC.
+    ///
+    /// # Arguments
+    ///
+    /// * `matching_exprs` - projection expressions to consider as substitutes.
+    /// * `sort_expr` - the ordering whose components are rewritten.
+    /// * `schema` - schema used to resolve the data type of each sort expression.
+    pub fn substitute_ordering_component(
+        matching_exprs: Arc<Vec<&Arc<dyn PhysicalExpr>>>,
+        sort_expr: &[PhysicalSortExpr],
+        schema: SchemaRef,
+    ) -> Vec<PhysicalSortExpr> {
+        // NOTE: there is intentionally no pre-filter here. Comparing a
+        // `PhysicalExpr` with a `PhysicalSortExpr` can never report equality,
+        // so such a filter would either keep everything or, with an empty
+        // `matching_exprs`, silently drop the whole ordering.
+        sort_expr
+            .iter()
+            .map(|sort_expr| {
+                let referring_exprs: Vec<&Arc<dyn PhysicalExpr>> = matching_exprs
+                    .iter()
+                    .filter(|matched| expr_refers(matched, &sort_expr.expr))
+                    .cloned()
+                    .collect();
+                // Zero or several referring expressions: nothing to substitute.
+                let r_expr = match referring_exprs.as_slice() {
+                    [only] => *only,
+                    _ => return sort_expr.clone(),
+                };
+                // Only CAST wrappers are substitutable for now.
+                let cast_expr = match r_expr.as_any().downcast_ref::<CastExpr>() {
+                    Some(cast_expr) => cast_expr,
+                    None => return sort_expr.clone(),
+                };
+                // A failed type lookup is treated as "not substitutable".
+                let is_accepted_cast = sort_expr
+                    .expr
+                    .data_type(schema.as_ref())
+                    .map(|expr_type| cast_expr.is_bigger_cast(expr_type))
+                    .unwrap_or(false);
+                // The cast must wrap exactly the sort expression, not merely
+                // something that refers to it.
+                if is_accepted_cast && cast_expr.expr.eq(&sort_expr.expr) {
+                    PhysicalSortExpr {
+                        expr: Arc::clone(r_expr),
+                        options: sort_expr.options,
+                    }
+                } else {
+                    sort_expr.clone()
+                }
+            })
+            .collect()
+    }
+    /// Substitutes the stored orderings before a projection is applied.
+    ///
+    /// Suppose the input is ordered by `A DESC, B DESC`. If the projection
+    /// outputs `A` and `B` unchanged, the original ordering can be reused
+    /// directly. If `A` is wrapped instead, e.g. `A -> CAST(A AS Int64)`, the
+    /// original ordering no longer matches the projected expressions and the
+    /// dependency map built from it is wrong. Substituting the ordering first
+    /// avoids this; see issue 8838:
+    /// <https://github.com/apache/arrow-datafusion/issues/8838>
+    ///
+    /// Every ordering currently held in `self.oeq_class` is passed through
+    /// [`Self::substitute_ordering_component`], and `self.oeq_class` is then
+    /// replaced by a new class built from the results.
+    ///
+    /// # Arguments
+    ///
+    /// * `exprs` - the projection expressions paired with their output names.
+    /// * `mapping` - the projection mapping; only expressions that appear as
+    ///   one of its sources are offered as substitutes.
+    /// * `schema` - schema forwarded to the per-ordering substitution.
+    pub fn substitute_oeq_class(
+        &mut self,
+        exprs: &[(Arc<dyn PhysicalExpr>, String)],
+        mapping: &ProjectionMapping,
+        schema: SchemaRef,
+    ) {
+        // Keep the projection expressions that occur as a mapping source.
+        let mut projected_sources = Vec::new();
+        for (candidate, _) in exprs {
+            if mapping.iter().any(|(source, _)| source.eq(candidate)) {
+                projected_sources.push(candidate);
+            }
+        }
+        let projected_sources = Arc::new(projected_sources);
+        // Move the orderings out so they can be rebuilt without copying them.
+        let previous = std::mem::take(&mut self.oeq_class.orderings);
+        let mut rewritten = Vec::with_capacity(previous.len());
+        for ordering in previous {
+            rewritten.push(Self::substitute_ordering_component(
+                Arc::clone(&projected_sources),
+                &ordering,
+                Arc::clone(&schema),
+            ));
+        }
+        self.oeq_class = OrderingEquivalenceClass::new(rewritten);
+    }
/// Projects argument `expr` according to `projection_mapping`, taking
/// equivalences into account.
///
@@ -564,7 +646,6 @@ impl EquivalenceProperties {
// Get dependency map for existing orderings:
let dependency_map = self.construct_dependency_map(&mapping);
-
let orderings = mapping.iter().flat_map(|(source, target)| {
referred_dependencies(&dependency_map, source)
.into_iter()
diff --git a/datafusion/physical-expr/src/expressions/cast.rs
b/datafusion/physical-expr/src/expressions/cast.rs
index 0c4ed3c125..b0e175e711 100644
--- a/datafusion/physical-expr/src/expressions/cast.rs
+++ b/datafusion/physical-expr/src/expressions/cast.rs
@@ -15,14 +15,14 @@
// specific language governing permissions and limitations
// under the License.
+use crate::physical_expr::down_cast_any_ref;
+use crate::sort_properties::SortProperties;
+use crate::PhysicalExpr;
use std::any::Any;
use std::fmt;
use std::hash::{Hash, Hasher};
use std::sync::Arc;
-
-use crate::physical_expr::down_cast_any_ref;
-use crate::sort_properties::SortProperties;
-use crate::PhysicalExpr;
+use DataType::*;
use arrow::compute::{can_cast_types, kernels, CastOptions};
use arrow::datatypes::{DataType, Schema};
@@ -41,7 +41,7 @@ const DEFAULT_CAST_OPTIONS: CastOptions<'static> =
CastOptions {
#[derive(Debug, Clone)]
pub struct CastExpr {
/// The expression to cast
- expr: Arc<dyn PhysicalExpr>,
+ pub expr: Arc<dyn PhysicalExpr>,
/// The data type to cast to
cast_type: DataType,
/// Cast options
@@ -76,6 +76,26 @@ impl CastExpr {
pub fn cast_options(&self) -> &CastOptions<'static> {
&self.cast_options
}
+    /// Returns `true` when casting from `src` to this expression's target
+    /// type is a widening cast that keeps distinct values distinct and in the
+    /// same relative order.
+    ///
+    /// A cast to the identical type is always accepted. Otherwise only the
+    /// pairs listed below qualify:
+    ///
+    /// * signed integer to a wider signed integer,
+    /// * unsigned integer to a wider unsigned integer,
+    /// * integer to a float whose significand holds every source value
+    ///   exactly: 8- and 16-bit integers to `Float32`/`Float64`, 32-bit
+    ///   integers to `Float64`,
+    /// * `Utf8` to `LargeUtf8`.
+    ///
+    /// `Int32`/`UInt32 -> Float32` and `Int64`/`UInt64 -> Float64` are
+    /// rejected on purpose. `Float32` has a 24-bit and `Float64` a 53-bit
+    /// significand, so those casts round large values and can map different
+    /// inputs to the same output. Such a cast is still monotone, but the ties
+    /// it creates break a lexicographic ordering over the columns that follow.
+    pub fn is_bigger_cast(&self, src: DataType) -> bool {
+        if src == self.cast_type {
+            return true;
+        }
+        matches!(
+            (src, &self.cast_type),
+            (Int8, Int16 | Int32 | Int64)
+                | (Int16, Int32 | Int64)
+                | (Int32, Int64)
+                | (UInt8, UInt16 | UInt32 | UInt64)
+                | (UInt16, UInt32 | UInt64)
+                | (UInt32, UInt64)
+                | (Int8 | Int16 | UInt8 | UInt16, Float32 | Float64)
+                | (Int32 | UInt32, Float64)
+                | (Utf8, LargeUtf8)
+        )
+    }
}
impl fmt::Display for CastExpr {
diff --git a/datafusion/physical-plan/src/projection.rs
b/datafusion/physical-plan/src/projection.rs
index cc2ab62049..51423d37e7 100644
--- a/datafusion/physical-plan/src/projection.rs
+++ b/datafusion/physical-plan/src/projection.rs
@@ -70,7 +70,6 @@ impl ProjectionExec {
input: Arc<dyn ExecutionPlan>,
) -> Result<Self> {
let input_schema = input.schema();
-
let fields: Result<Vec<Field>> = expr
.iter()
.map(|(e, name)| {
@@ -95,7 +94,10 @@ impl ProjectionExec {
// construct a map from the input expressions to the output expression
of the Projection
let projection_mapping = ProjectionMapping::try_new(&expr,
&input_schema)?;
- let input_eqs = input.equivalence_properties();
+ let mut input_eqs = input.equivalence_properties();
+
+ input_eqs.substitute_oeq_class(&expr, &projection_mapping,
input_schema.clone());
+
let project_eqs = input_eqs.project(&projection_mapping,
schema.clone());
let output_ordering = project_eqs.oeq_class().output_ordering();
@@ -201,9 +203,13 @@ impl ExecutionPlan for ProjectionExec {
}
fn equivalence_properties(&self) -> EquivalenceProperties {
- self.input
- .equivalence_properties()
- .project(&self.projection_mapping, self.schema())
+ let mut equi_properties = self.input.equivalence_properties();
+ equi_properties.substitute_oeq_class(
+ &self.expr,
+ &self.projection_mapping,
+ self.input.schema().clone(),
+ );
+ equi_properties.project(&self.projection_mapping, self.schema())
}
fn with_new_children(
diff --git a/datafusion/sqllogictest/test_files/monotonic_projection_test.slt
b/datafusion/sqllogictest/test_files/monotonic_projection_test.slt
new file mode 100644
index 0000000000..57b8106733
--- /dev/null
+++ b/datafusion/sqllogictest/test_files/monotonic_projection_test.slt
@@ -0,0 +1,86 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# prepare the table
+statement ok
+CREATE EXTERNAL TABLE delta_encoding_required_column (
+ c_customer_sk INT NOT NULL,
+ c_current_cdemo_sk INT NOT NULL
+)
+STORED AS CSV
+WITH ORDER (
+ c_customer_sk DESC,
+ c_current_cdemo_sk DESC
+)
+LOCATION '../../testing/data/csv/aggregate_test_100.csv';
+
+# test for substitute CAST scenario
+query TT
+EXPLAIN
+SELECT
+ CAST(c_customer_sk AS BIGINT) AS c_customer_sk_big,
+ c_current_cdemo_sk
+FROM delta_encoding_required_column
+ORDER BY c_customer_sk_big DESC, c_current_cdemo_sk DESC;
+----
+logical_plan
+Sort: c_customer_sk_big DESC NULLS FIRST,
delta_encoding_required_column.c_current_cdemo_sk DESC NULLS FIRST
+--Projection: CAST(delta_encoding_required_column.c_customer_sk AS Int64) AS
c_customer_sk_big, delta_encoding_required_column.c_current_cdemo_sk
+----TableScan: delta_encoding_required_column projection=[c_customer_sk,
c_current_cdemo_sk]
+physical_plan
+SortPreservingMergeExec: [c_customer_sk_big@0 DESC,c_current_cdemo_sk@1 DESC]
+--ProjectionExec: expr=[CAST(c_customer_sk@0 AS Int64) as c_customer_sk_big,
c_current_cdemo_sk@1 as c_current_cdemo_sk]
+----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]},
projection=[c_customer_sk, c_current_cdemo_sk],
output_ordering=[c_customer_sk@0 DESC, c_current_cdemo_sk@1 DESC],
has_header=false
+
+# test for common rename
+query TT
+EXPLAIN
+SELECT
+ c_customer_sk AS c_customer_sk_big,
+ c_current_cdemo_sk
+FROM delta_encoding_required_column
+ORDER BY c_customer_sk_big DESC, c_current_cdemo_sk DESC;
+----
+logical_plan
+Sort: c_customer_sk_big DESC NULLS FIRST,
delta_encoding_required_column.c_current_cdemo_sk DESC NULLS FIRST
+--Projection: delta_encoding_required_column.c_customer_sk AS
c_customer_sk_big, delta_encoding_required_column.c_current_cdemo_sk
+----TableScan: delta_encoding_required_column projection=[c_customer_sk,
c_current_cdemo_sk]
+physical_plan
+ProjectionExec: expr=[c_customer_sk@0 as c_customer_sk_big,
c_current_cdemo_sk@1 as c_current_cdemo_sk]
+--CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]},
projection=[c_customer_sk, c_current_cdemo_sk],
output_ordering=[c_customer_sk@0 DESC, c_current_cdemo_sk@1 DESC],
has_header=false
+
+
+# test for cast Utf8
+query TT
+EXPLAIN
+SELECT
+ CAST(c_customer_sk AS STRING) AS c_customer_sk_big,
+ c_current_cdemo_sk
+FROM delta_encoding_required_column
+ORDER BY c_customer_sk_big DESC, c_current_cdemo_sk DESC;
+----
+logical_plan
+Sort: c_customer_sk_big DESC NULLS FIRST,
delta_encoding_required_column.c_current_cdemo_sk DESC NULLS FIRST
+--Projection: CAST(delta_encoding_required_column.c_customer_sk AS Utf8) AS
c_customer_sk_big, delta_encoding_required_column.c_current_cdemo_sk
+----TableScan: delta_encoding_required_column projection=[c_customer_sk,
c_current_cdemo_sk]
+physical_plan
+SortPreservingMergeExec: [c_customer_sk_big@0 DESC,c_current_cdemo_sk@1 DESC]
+--SortExec: expr=[c_customer_sk_big@0 DESC,c_current_cdemo_sk@1 DESC]
+----ProjectionExec: expr=[CAST(c_customer_sk@0 AS Utf8) as c_customer_sk_big,
c_current_cdemo_sk@1 as c_current_cdemo_sk]
+------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+--------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]},
projection=[c_customer_sk, c_current_cdemo_sk],
output_ordering=[c_customer_sk@0 DESC, c_current_cdemo_sk@1 DESC],
has_header=false