This is an automated email from the ASF dual-hosted git repository.
akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new b0d24912ac Extend Ordering Equivalence Support (#6956)
b0d24912ac is described below
commit b0d24912acd9d86ca6ce3af26b4d845ec250048d
Author: Mustafa Akur <[email protected]>
AuthorDate: Mon Jul 17 15:41:09 2023 +0300
Extend Ordering Equivalence Support (#6956)
* initial implementation
* minor changes
* Fix bug, ordering equivalence random head
* minor changes
* Add ordering equivalence for sort merge join
* Add ordering equivalence for sort merge join
* Update comment
* Add ordering equivalence support for hash join
* Code enhancements/comment improvements
* Add projection cast handling
* Fix output ordering for sort merge join
* projection bug fix
* Minor changes
* minor changes
* simplify sort_merge_join
* Update equivalence implementation
* Update cast implementation
* More idiomatic code
* simplifications
* remove unnecessary code
* Comment improvements
* Fix formatting
* Address reviews
---------
Co-authored-by: Mehmet Ozan Kabak <[email protected]>
---
.../core/src/physical_plan/coalesce_batches.rs | 12 +-
.../core/src/physical_plan/coalesce_partitions.rs | 9 +-
datafusion/core/src/physical_plan/filter.rs | 15 +-
.../core/src/physical_plan/joins/hash_join.rs | 101 ++++++----
.../src/physical_plan/joins/sort_merge_join.rs | 163 ++++++++++++----
datafusion/core/src/physical_plan/joins/utils.rs | 98 ++++++++--
datafusion/core/src/physical_plan/limit.rs | 29 +--
datafusion/core/src/physical_plan/projection.rs | 22 ++-
.../core/src/physical_plan/repartition/mod.rs | 25 ++-
.../physical_plan/sorts/sort_preserving_merge.rs | 21 ++-
.../core/tests/sqllogictests/test_files/joins.slt | 206 +++++++++++++++++++++
.../core/tests/sqllogictests/test_files/window.slt | 50 +++++
datafusion/physical-expr/src/equivalence.rs | 201 ++++++++++++--------
datafusion/physical-expr/src/expressions/cast.rs | 2 +-
datafusion/physical-expr/src/lib.rs | 3 +-
datafusion/physical-expr/src/sort_expr.rs | 9 +-
datafusion/physical-expr/src/utils.rs | 20 ++
17 files changed, 769 insertions(+), 217 deletions(-)
diff --git a/datafusion/core/src/physical_plan/coalesce_batches.rs b/datafusion/core/src/physical_plan/coalesce_batches.rs
index 9454a36af7..994f75ce4c 100644
--- a/datafusion/core/src/physical_plan/coalesce_batches.rs
+++ b/datafusion/core/src/physical_plan/coalesce_batches.rs
@@ -27,20 +27,22 @@ use crate::physical_plan::{
DisplayFormatType, EquivalenceProperties, ExecutionPlan, Partitioning,
RecordBatchStream, SendableRecordBatchStream,
};
-use datafusion_common::Result;
use arrow::datatypes::SchemaRef;
use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;
+use datafusion_common::Result;
use datafusion_execution::TaskContext;
-use futures::stream::{Stream, StreamExt};
-use log::trace;
+use datafusion_physical_expr::OrderingEquivalenceProperties;
use super::expressions::PhysicalSortExpr;
use super::metrics::{BaselineMetrics, MetricsSet};
use super::DisplayAs;
use super::{metrics::ExecutionPlanMetricsSet, Statistics};
+use futures::stream::{Stream, StreamExt};
+use log::trace;
+
/// CoalesceBatchesExec combines small batches into larger batches for more efficient use of
/// vectorized processing by upstream operators.
#[derive(Debug)]
@@ -134,6 +136,10 @@ impl ExecutionPlan for CoalesceBatchesExec {
self.input.equivalence_properties()
}
+ fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties {
+ self.input.ordering_equivalence_properties()
+ }
+
fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
diff --git a/datafusion/core/src/physical_plan/coalesce_partitions.rs b/datafusion/core/src/physical_plan/coalesce_partitions.rs
index 82f74a62bb..bc48b5f5e1 100644
--- a/datafusion/core/src/physical_plan/coalesce_partitions.rs
+++ b/datafusion/core/src/physical_plan/coalesce_partitions.rs
@@ -21,18 +21,17 @@
use std::any::Any;
use std::sync::Arc;
-use arrow::datatypes::SchemaRef;
-
use super::expressions::PhysicalSortExpr;
use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use super::stream::{ObservedStream, RecordBatchReceiverStream};
-use super::{DisplayAs, Statistics};
+use super::{DisplayAs, SendableRecordBatchStream, Statistics};
+
use crate::physical_plan::{
DisplayFormatType, EquivalenceProperties, ExecutionPlan, Partitioning,
};
-use datafusion_common::{DataFusionError, Result};
-use super::SendableRecordBatchStream;
+use arrow::datatypes::SchemaRef;
+use datafusion_common::{DataFusionError, Result};
use datafusion_execution::TaskContext;
/// Merge execution plan executes partitions in parallel and combines them into a single
diff --git a/datafusion/core/src/physical_plan/filter.rs b/datafusion/core/src/physical_plan/filter.rs
index d2cfc3ce8a..6cb2490ee4 100644
--- a/datafusion/core/src/physical_plan/filter.rs
+++ b/datafusion/core/src/physical_plan/filter.rs
@@ -27,24 +27,27 @@ use super::expressions::PhysicalSortExpr;
use super::{
ColumnStatistics, DisplayAs, RecordBatchStream, SendableRecordBatchStream, Statistics,
};
+
use crate::physical_plan::{
metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet},
Column, DisplayFormatType, EquivalenceProperties, ExecutionPlan, Partitioning,
PhysicalExpr,
};
+
use arrow::compute::filter_record_batch;
use arrow::datatypes::{DataType, SchemaRef};
use arrow::record_batch::RecordBatch;
use datafusion_common::cast::as_boolean_array;
use datafusion_common::{DataFusionError, Result};
+use datafusion_execution::TaskContext;
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::BinaryExpr;
-use datafusion_physical_expr::{split_conjunction, AnalysisContext};
-
-use log::trace;
+use datafusion_physical_expr::{
+ split_conjunction, AnalysisContext, OrderingEquivalenceProperties,
+};
-use datafusion_execution::TaskContext;
use futures::stream::{Stream, StreamExt};
+use log::trace;
/// FilterExec evaluates a boolean predicate against all input batches to determine which rows to
/// include in its output batches.
@@ -148,6 +151,10 @@ impl ExecutionPlan for FilterExec {
input_properties
}
+ fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties {
+ self.input.ordering_equivalence_properties()
+ }
+
fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs
index 47403a70f4..ce1d6dbcc0 100644
--- a/datafusion/core/src/physical_plan/joins/hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/hash_join.rs
@@ -18,46 +18,15 @@
//! Defines the join plan for executing partitions in parallel and then joining the results
//! into a set of partitions.
-use ahash::RandomState;
-use arrow::array::Array;
-use arrow::array::{
- Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array,
- StringArray, TimestampNanosecondArray, UInt16Array, UInt32Array, UInt64Array,
- UInt8Array,
-};
-use arrow::buffer::BooleanBuffer;
-use arrow::compute::{and, eq_dyn, is_null, or_kleene, take, FilterBuilder};
-use arrow::datatypes::{ArrowNativeType, DataType};
-use arrow::datatypes::{Schema, SchemaRef};
-use arrow::record_batch::RecordBatch;
-use arrow::{
- array::{
- ArrayRef, BooleanArray, Date32Array, Date64Array, Decimal128Array,
- DictionaryArray, FixedSizeBinaryArray, LargeStringArray, PrimitiveArray,
- Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray,
- Time64NanosecondArray, TimestampMicrosecondArray, TimestampMillisecondArray,
- TimestampSecondArray, UInt32BufferBuilder, UInt64BufferBuilder,
- },
- datatypes::{
- Int16Type, Int32Type, Int64Type, Int8Type, UInt16Type, UInt32Type, UInt64Type,
- UInt8Type,
- },
- util::bit_util,
-};
-use arrow_array::cast::downcast_array;
-use arrow_schema::ArrowError;
-use futures::{ready, Stream, StreamExt, TryStreamExt};
use std::fmt;
use std::mem::size_of;
use std::sync::Arc;
use std::task::Poll;
use std::{any::Any, usize, vec};
-use datafusion_common::cast::{as_dictionary_array, as_string_array};
-use datafusion_execution::memory_pool::MemoryReservation;
-
use crate::physical_plan::joins::utils::{
- adjust_indices_by_join_type, apply_join_filter_to_indices, build_batch_from_indices,
+ add_offset_to_ordering_equivalence_classes, adjust_indices_by_join_type,
+ apply_join_filter_to_indices, build_batch_from_indices,
calculate_hash_join_output_order, get_final_indices_from_bit_map,
need_produce_result_in_final, JoinSide,
};
@@ -68,6 +37,7 @@ use crate::physical_plan::{
expressions::Column,
expressions::PhysicalSortExpr,
hash_utils::create_hashes,
+ joins::hash_join_utils::JoinHashMap,
joins::utils::{
adjust_right_output_partitioning, build_join_schema,
check_join_is_valid,
combine_join_equivalence_properties, estimate_join_statistics,
@@ -78,17 +48,42 @@ use crate::physical_plan::{
DisplayFormatType, Distribution, EquivalenceProperties, ExecutionPlan, Partitioning,
PhysicalExpr, RecordBatchStream, SendableRecordBatchStream, Statistics,
};
-use arrow::array::BooleanBufferBuilder;
-use arrow::datatypes::TimeUnit;
-use datafusion_common::JoinType;
-use datafusion_common::{DataFusionError, Result};
-use datafusion_execution::{memory_pool::MemoryConsumer, TaskContext};
use super::{
utils::{OnceAsync, OnceFut},
PartitionMode,
};
-use crate::physical_plan::joins::hash_join_utils::JoinHashMap;
+
+use arrow::buffer::BooleanBuffer;
+use arrow::compute::{and, eq_dyn, is_null, or_kleene, take, FilterBuilder};
+use arrow::record_batch::RecordBatch;
+use arrow::{
+ array::{
+ Array, ArrayRef, BooleanArray, BooleanBufferBuilder, Date32Array, Date64Array,
+ Decimal128Array, DictionaryArray, FixedSizeBinaryArray, Float32Array,
+ Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, LargeStringArray,
+ PrimitiveArray, StringArray, Time32MillisecondArray, Time32SecondArray,
+ Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray,
+ TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
+ UInt16Array, UInt32Array, UInt32BufferBuilder, UInt64Array, UInt64BufferBuilder,
+ UInt8Array,
+ },
+ datatypes::{
+ ArrowNativeType, DataType, Int16Type, Int32Type, Int64Type, Int8Type, Schema,
+ SchemaRef, TimeUnit, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
+ },
+ util::bit_util,
+};
+use arrow_array::cast::downcast_array;
+use arrow_schema::ArrowError;
+use datafusion_common::cast::{as_dictionary_array, as_string_array};
+use datafusion_common::{DataFusionError, JoinType, Result};
+use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
+use datafusion_execution::TaskContext;
+use datafusion_physical_expr::OrderingEquivalenceProperties;
+
+use ahash::RandomState;
+use futures::{ready, Stream, StreamExt, TryStreamExt};
type JoinLeftData = (JoinHashMap, RecordBatch, MemoryReservation);
@@ -381,6 +376,34 @@ impl ExecutionPlan for HashJoinExec {
)
}
+ fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties {
+ let mut new_properties = OrderingEquivalenceProperties::new(self.schema());
+ let left_columns_len = self.left.schema().fields.len();
+ let right_oeq_properties = self.right.ordering_equivalence_properties();
+ match self.join_type {
+ JoinType::RightAnti | JoinType::RightSemi => {
+ // For `RightAnti` and `RightSemi` joins, the right table schema remains valid.
+ // Hence, its ordering equivalence properties can be used as is.
+ new_properties.extend(right_oeq_properties.classes().iter().cloned());
+ }
+ JoinType::Inner => {
+ // For `Inner` joins, the right table schema is no longer valid.
+ // Size of the left table is added as an offset to the right table
+ // columns when constructing the join output schema.
+ let updated_right_classes = add_offset_to_ordering_equivalence_classes(
+ right_oeq_properties.classes(),
+ left_columns_len,
+ )
+ .unwrap();
+ new_properties.extend(updated_right_classes);
+ }
+ // In other cases, we cannot propagate ordering equivalences as
+ // the output ordering is not preserved.
+ _ => {}
+ }
+ new_properties
+ }
+
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
vec![self.left.clone(), self.right.clone()]
}
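For intuition, here is a minimal sketch of the index arithmetic the `Inner` arm above relies on (illustrative only, with a simplified stand-in type; not part of this patch): a right-side column at index `i` appears at index `left_columns_len + i` in the join output schema.

    // Simplified stand-in for a physical column reference.
    struct Col {
        name: String,
        index: usize,
    }

    // Shift right-side column indices by the left table width, as done when
    // propagating right-side ordering equivalences through an inner join.
    fn add_offset(cols: &mut [Col], left_columns_len: usize) {
        for col in cols.iter_mut() {
            col.index += left_columns_len;
        }
    }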
diff --git a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
index 2a628a6146..9b8e9e85cd 100644
--- a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
+++ b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
@@ -30,17 +30,10 @@ use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
-use arrow::array::*;
-use arrow::compute::{concat_batches, take, SortOptions};
-use arrow::datatypes::{DataType, SchemaRef, TimeUnit};
-use arrow::error::ArrowError;
-use arrow::record_batch::RecordBatch;
-use datafusion_physical_expr::PhysicalSortRequirement;
-use futures::{Stream, StreamExt};
-
use crate::physical_plan::expressions::Column;
use crate::physical_plan::expressions::PhysicalSortExpr;
use crate::physical_plan::joins::utils::{
+ add_offset_to_lex_ordering, add_offset_to_ordering_equivalence_classes,
build_join_schema, check_join_is_valid, combine_join_equivalence_properties,
estimate_join_statistics, partitioned_join_output_partitioning, JoinOn,
};
@@ -50,13 +43,20 @@ use crate::physical_plan::{
ExecutionPlan, Partitioning, PhysicalExpr, RecordBatchStream,
SendableRecordBatchStream, Statistics,
};
-use datafusion_common::DataFusionError;
-use datafusion_common::JoinType;
-use datafusion_common::Result;
+
+use arrow::array::*;
+use arrow::compute::{concat_batches, take, SortOptions};
+use arrow::datatypes::{DataType, SchemaRef, TimeUnit};
+use arrow::error::ArrowError;
+use arrow::record_batch::RecordBatch;
+use datafusion_common::{DataFusionError, JoinType, Result};
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
use datafusion_execution::TaskContext;
+use datafusion_physical_expr::utils::normalize_sort_exprs;
+use datafusion_physical_expr::{OrderingEquivalenceProperties, PhysicalSortRequirement};
-use datafusion_common::tree_node::{Transformed, TreeNode};
+use futures::{Stream, StreamExt};
+use itertools::Itertools;
/// join execution plan executes partitions in parallel and combines them into a set of
/// partitions.
@@ -86,6 +86,38 @@ pub struct SortMergeJoinExec {
pub(crate) null_equals_null: bool,
}
+/// Replaces the right column (first index in the `on_column` tuple) with
+/// the left column (zeroth index in the tuple) inside `right_ordering`.
+fn replace_on_columns_of_right_ordering(
+ on_columns: &[(Column, Column)],
+ right_ordering: &mut [PhysicalSortExpr],
+ left_columns_len: usize,
+) {
+ for (left_col, right_col) in on_columns {
+ let right_col = Column::new(right_col.name(), right_col.index() + left_columns_len);
+ for item in right_ordering.iter_mut() {
+ if let Some(col) = item.expr.as_any().downcast_ref::<Column>() {
+ if right_col.eq(col) {
+ item.expr = Arc::new(left_col.clone()) as _;
+ }
+ }
+ }
+ }
+}
+
+/// Merge left and right sort expressions, checking for duplicates.
+fn merge_vectors(
+ left: &[PhysicalSortExpr],
+ right: &[PhysicalSortExpr],
+) -> Vec<PhysicalSortExpr> {
+ left.iter()
+ .cloned()
+ .chain(right.iter().cloned())
+ .unique()
+ .collect()
+}
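A rough illustration of `merge_vectors` (assumed behavior via `Itertools::unique`, with strings standing in for sort expressions; not part of this patch): duplicates from the right side are dropped while the left-to-right order is kept.

    use itertools::Itertools;

    fn merge(left: &[&str], right: &[&str]) -> Vec<String> {
        // e.g. merge(&["a ASC"], &["a ASC", "b ASC"]) == ["a ASC", "b ASC"]
        left.iter()
            .chain(right.iter())
            .map(|s| s.to_string())
            .unique()
            .collect()
    }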
+
impl SortMergeJoinExec {
/// Tries to create a new [SortMergeJoinExec].
/// The inputs are sorted using `sort_options`, which are applied to the columns in the `on`
@@ -134,10 +166,26 @@ impl SortMergeJoinExec {
.unzip();
let output_ordering = match join_type {
- JoinType::Inner
- | JoinType::Left
- | JoinType::LeftSemi
- | JoinType::LeftAnti => {
+ JoinType::Inner => {
+ match (left.output_ordering(), right.output_ordering()) {
+ // If both sides have orderings, ordering of the right hand side
+ // can be appended to the left side ordering for inner joins.
+ (Some(left_ordering), Some(right_ordering)) => {
+ let left_columns_len = left.schema().fields.len();
+ let mut right_ordering =
+ add_offset_to_lex_ordering(right_ordering, left_columns_len)?;
+ replace_on_columns_of_right_ordering(
+ &on,
+ &mut right_ordering,
+ left_columns_len,
+ );
+ Some(merge_vectors(left_ordering, &right_ordering))
+ }
+ (Some(left_ordering), _) => Some(left_ordering.to_vec()),
+ _ => None,
+ }
+ }
+ JoinType::Left | JoinType::LeftSemi | JoinType::LeftAnti => {
left.output_ordering().map(|sort_exprs| sort_exprs.to_vec())
}
JoinType::RightSemi | JoinType::RightAnti => right
@@ -148,29 +196,7 @@ impl SortMergeJoinExec {
right
.output_ordering()
.map(|sort_exprs| {
- let new_sort_exprs: Result<Vec<PhysicalSortExpr>> = sort_exprs
- .iter()
- .map(|e| {
- let new_expr =
- e.expr.clone().transform_down(&|e| match e
- .as_any()
- .downcast_ref::<Column>(
- ) {
- Some(col) => {
- Ok(Transformed::Yes(Arc::new(Column::new(
- col.name(),
- left_columns_len + col.index(),
- ))))
- }
- None => Ok(Transformed::No(e)),
- });
- Ok(PhysicalSortExpr {
- expr: new_expr?,
- options: e.options,
- })
- })
- .collect();
- new_sort_exprs
+ add_offset_to_lex_ordering(sort_exprs, left_columns_len)
})
.map_or(Ok(None), |v| v.map(Some))?
}
@@ -295,6 +321,65 @@ impl ExecutionPlan for SortMergeJoinExec {
)
}
+ fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties {
+ let mut new_properties = OrderingEquivalenceProperties::new(self.schema());
+ let left_columns_len = self.left.schema().fields.len();
+ let left_oeq_properties = self.left.ordering_equivalence_properties();
+ let right_oeq_properties = self.right.ordering_equivalence_properties();
+ match self.join_type {
+ JoinType::Inner => {
+ // Since the left side is the stream side for this `SortMergeJoin` implementation,
+ // the global ordering of the left table is preserved at the output. Hence, left
+ // side ordering equivalences are still valid.
+ new_properties.extend(left_oeq_properties.classes().iter().cloned());
+ if let Some(output_ordering) = &self.output_ordering {
+ // Update right table ordering equivalence expression indices; i.e.
+ // add left table size as an offset.
+ let updated_right_oeq_classes =
+ add_offset_to_ordering_equivalence_classes(
+ right_oeq_properties.classes(),
+ left_columns_len,
+ )
+ .unwrap();
+ let left_output_ordering = self.left.output_ordering().unwrap_or(&[]);
+ // Right side ordering equivalence properties should be prepended with
+ // those of the left side while constructing output ordering equivalence
+ // properties for `SortMergeJoin`. As an example:
+ //
+ // If the right table ordering equivalences contain `b ASC`, and the output
+ // ordering of the left table is `a ASC`, then the ordering equivalence `b ASC`
+ // for the right table should be converted to `a ASC, b ASC` before it is added
+ // to the ordering equivalences of `SortMergeJoinExec`.
+ for oeq_class in updated_right_oeq_classes {
+ for ordering in oeq_class.others() {
+ // Entries inside ordering equivalence should be normalized before insertion.
+ let normalized_ordering = normalize_sort_exprs(
+ ordering,
+ self.equivalence_properties().classes(),
+ &[],
+ );
+ let new_oeq_ordering =
+ merge_vectors(left_output_ordering, &normalized_ordering);
+ new_properties.add_equal_conditions((
+ output_ordering,
+ &new_oeq_ordering,
+ ));
+ }
+ }
+ }
+ }
+ JoinType::Left | JoinType::LeftSemi | JoinType::LeftAnti => {
+ new_properties.extend(left_oeq_properties.classes().iter().cloned());
+ }
+ JoinType::Right | JoinType::RightSemi | JoinType::RightAnti => {
+ new_properties.extend(right_oeq_properties.classes().iter().cloned());
+ }
+ // All ordering equivalences from left and/or right sides are invalidated.
+ _ => {}
+ }
+ new_properties
+ }
+
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
vec![self.left.clone(), self.right.clone()]
}
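To make the `a ASC, b ASC` example in the comments above concrete, here is a hedged trace under assumed tables `t1(a)` (left, stream side) and `t2(b)` (right); column indices and names are illustrative only.

    // Left output ordering:                       [a@0 ASC]
    // Right ordering equivalence, after offset:   [b@1 ASC]
    // normalize_sort_exprs + merge_vectors:       [a@0 ASC, b@1 ASC]
    // Registered for the join output:
    //     add_equal_conditions(([a@0 ASC], [a@0 ASC, b@1 ASC]))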
diff --git a/datafusion/core/src/physical_plan/joins/utils.rs b/datafusion/core/src/physical_plan/joins/utils.rs
index d4be2cfe10..ea9c11a3ed 100644
--- a/datafusion/core/src/physical_plan/joins/utils.rs
+++ b/datafusion/core/src/physical_plan/joins/utils.rs
@@ -17,17 +17,6 @@
//! Join related functionality used both on logical and physical plans
-use arrow::array::{
- downcast_array, new_null_array, Array, BooleanBufferBuilder, UInt32Array,
- UInt32Builder, UInt64Array,
-};
-use arrow::compute;
-use arrow::datatypes::{Field, Schema, SchemaBuilder};
-use arrow::record_batch::{RecordBatch, RecordBatchOptions};
-use datafusion_physical_expr::expressions::Column;
-use futures::future::{BoxFuture, Shared};
-use futures::{ready, FutureExt};
-use parking_lot::Mutex;
use std::cmp::max;
use std::collections::HashSet;
use std::fmt::{Display, Formatter};
@@ -36,21 +25,32 @@ use std::sync::Arc;
use std::task::{Context, Poll};
use std::usize;
-use datafusion_common::cast::as_boolean_array;
-use datafusion_common::{ScalarValue, SharedResult};
-
-use datafusion_common::tree_node::{Transformed, TreeNode};
-use datafusion_physical_expr::{EquivalentClass, PhysicalExpr, PhysicalSortExpr};
-
-use datafusion_common::JoinType;
-use datafusion_common::{DataFusionError, Result};
-
use crate::physical_plan::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder};
use crate::physical_plan::SchemaRef;
use crate::physical_plan::{
ColumnStatistics, EquivalenceProperties, ExecutionPlan, Partitioning,
Statistics,
};
+use arrow::array::{
+ downcast_array, new_null_array, Array, BooleanBufferBuilder, UInt32Array,
+ UInt32Builder, UInt64Array,
+};
+use arrow::compute;
+use arrow::datatypes::{Field, Schema, SchemaBuilder};
+use arrow::record_batch::{RecordBatch, RecordBatchOptions};
+use datafusion_common::cast::as_boolean_array;
+use datafusion_common::tree_node::{Transformed, TreeNode};
+use datafusion_common::{DataFusionError, JoinType, Result, ScalarValue, SharedResult};
+use datafusion_physical_expr::expressions::Column;
+use datafusion_physical_expr::{
+ EquivalentClass, LexOrdering, LexOrderingRef, OrderingEquivalentClass, PhysicalExpr,
+ PhysicalSortExpr,
+};
+
+use futures::future::{BoxFuture, Shared};
+use futures::{ready, FutureExt};
+use parking_lot::Mutex;
+
/// The on clause of the join, as vector of (left, right) columns.
pub type JoinOn = Vec<(Column, Column)>;
/// Reference for JoinOn.
@@ -283,6 +283,64 @@ pub fn cross_join_equivalence_properties(
new_properties
}
+/// Adds the `offset` value to `Column` indices inside `expr`. This function is
+/// generally used during the update of the right table schema in join operations.
+pub(crate) fn add_offset_to_expr(
+ expr: Arc<dyn PhysicalExpr>,
+ offset: usize,
+) -> Result<Arc<dyn PhysicalExpr>> {
+ expr.transform_down(&|e| match e.as_any().downcast_ref::<Column>() {
+ Some(col) => Ok(Transformed::Yes(Arc::new(Column::new(
+ col.name(),
+ offset + col.index(),
+ )))),
+ None => Ok(Transformed::No(e)),
+ })
+}
+
+/// Adds the `offset` value to `Column` indices inside `sort_expr.expr`.
+pub(crate) fn add_offset_to_sort_expr(
+ sort_expr: &PhysicalSortExpr,
+ offset: usize,
+) -> Result<PhysicalSortExpr> {
+ Ok(PhysicalSortExpr {
+ expr: add_offset_to_expr(sort_expr.expr.clone(), offset)?,
+ options: sort_expr.options,
+ })
+}
+
+/// Adds the `offset` value to `Column` indices for each `sort_expr.expr`
+/// inside `sort_exprs`.
+pub(crate) fn add_offset_to_lex_ordering(
+ sort_exprs: LexOrderingRef,
+ offset: usize,
+) -> Result<LexOrdering> {
+ sort_exprs
+ .iter()
+ .map(|sort_expr| add_offset_to_sort_expr(sort_expr, offset))
+ .collect()
+}
+
+/// Adds the `offset` value to `Column` indices for all expressions inside the
+/// given `OrderingEquivalentClass`es.
+pub(crate) fn add_offset_to_ordering_equivalence_classes(
+ oeq_classes: &[OrderingEquivalentClass],
+ offset: usize,
+) -> Result<Vec<OrderingEquivalentClass>> {
+ oeq_classes
+ .iter()
+ .map(|prop| {
+ let new_head = add_offset_to_lex_ordering(prop.head(), offset)?;
+ let new_others = prop
+ .others()
+ .iter()
+ .map(|ordering| add_offset_to_lex_ordering(ordering, offset))
+ .collect::<Result<Vec<_>>>()?;
+ Ok(OrderingEquivalentClass::new(new_head, new_others))
+ })
+ .collect()
+}
+
impl Display for JoinSide {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
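A hedged usage sketch of the offset helpers added above (in-crate context assumed; `Column` displays as `name@index`):

    use std::sync::Arc;
    use datafusion_common::Result;
    use datafusion_physical_expr::expressions::Column;
    use datafusion_physical_expr::PhysicalExpr;

    fn offset_example() -> Result<()> {
        // Right-side column `b` at index 1; the left table has 5 columns,
        // so `b` lands at index 6 in the join output schema.
        let expr: Arc<dyn PhysicalExpr> = Arc::new(Column::new("b", 1));
        let shifted = add_offset_to_expr(expr, 5)?;
        assert_eq!(shifted.to_string(), "b@6");
        Ok(())
    }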
diff --git a/datafusion/core/src/physical_plan/limit.rs b/datafusion/core/src/physical_plan/limit.rs
index 572cea006c..93f6cd7c2c 100644
--- a/datafusion/core/src/physical_plan/limit.rs
+++ b/datafusion/core/src/physical_plan/limit.rs
@@ -17,9 +17,6 @@
//! Defines the LIMIT plan
-use futures::stream::Stream;
-use futures::stream::StreamExt;
-use log::trace;
use std::any::Any;
use std::pin::Pin;
use std::sync::Arc;
@@ -28,19 +25,21 @@ use std::task::{Context, Poll};
use crate::physical_plan::{
DisplayFormatType, Distribution, EquivalenceProperties, ExecutionPlan, Partitioning,
};
+
+use super::expressions::PhysicalSortExpr;
+use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
+use super::{DisplayAs, RecordBatchStream, SendableRecordBatchStream, Statistics};
+
use arrow::array::ArrayRef;
use arrow::datatypes::SchemaRef;
use arrow::record_batch::{RecordBatch, RecordBatchOptions};
use datafusion_common::{DataFusionError, Result};
-
-use super::expressions::PhysicalSortExpr;
-use super::DisplayAs;
-use super::{
- metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet},
- RecordBatchStream, SendableRecordBatchStream, Statistics,
-};
-
use datafusion_execution::TaskContext;
+use datafusion_physical_expr::OrderingEquivalenceProperties;
+
+use futures::stream::Stream;
+use futures::stream::StreamExt;
+use log::trace;
/// Limit execution plan
#[derive(Debug)]
@@ -140,6 +139,10 @@ impl ExecutionPlan for GlobalLimitExec {
self.input.equivalence_properties()
}
+ fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties {
+ self.input.ordering_equivalence_properties()
+ }
+
fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
@@ -317,6 +320,10 @@ impl ExecutionPlan for LocalLimitExec {
self.input.equivalence_properties()
}
+ fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties {
+ self.input.ordering_equivalence_properties()
+ }
+
fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
diff --git a/datafusion/core/src/physical_plan/projection.rs b/datafusion/core/src/physical_plan/projection.rs
index c6845c7afe..dac5227503 100644
--- a/datafusion/core/src/physical_plan/projection.rs
+++ b/datafusion/core/src/physical_plan/projection.rs
@@ -41,6 +41,8 @@ use super::expressions::{Column, PhysicalSortExpr};
use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use super::{DisplayAs, RecordBatchStream, SendableRecordBatchStream, Statistics};
+use datafusion_physical_expr::equivalence::update_ordering_equivalence_with_cast;
+use datafusion_physical_expr::expressions::CastExpr;
use datafusion_physical_expr::{
normalize_out_expr_with_columns_map, project_equivalence_properties,
project_ordering_equivalence_properties, OrderingEquivalenceProperties,
@@ -245,11 +247,29 @@ impl ExecutionPlan for ProjectionExec {
fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties {
let mut new_properties = OrderingEquivalenceProperties::new(self.schema());
+ if self.output_ordering.is_none() {
+ // If there is no output ordering, return an "empty" equivalence set:
+ return new_properties;
+ }
+
+ let mut input_oeq = self.input().ordering_equivalence_properties();
+ // Stores cast expression and its `Column` version in the output:
+ let mut cast_exprs: Vec<(CastExpr, Column)> = vec![];
+ for (idx, (expr, name)) in self.expr.iter().enumerate() {
+ if let Some(cast_expr) = expr.as_any().downcast_ref::<CastExpr>() {
+ let target_col = Column::new(name, idx);
+ cast_exprs.push((cast_expr.clone(), target_col));
+ }
+ }
+
+ update_ordering_equivalence_with_cast(&cast_exprs, &mut input_oeq);
+
project_ordering_equivalence_properties(
- self.input.ordering_equivalence_properties(),
+ input_oeq,
&self.columns_map,
&mut new_properties,
);
+
new_properties
}
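A hedged sketch of the cast bookkeeping above (simplified string stand-ins; not part of this patch): an ordering key that equals a cast's input expression is restated over the projected output column.

    // E.g. with projection entry `CAST(a AS BIGINT) AS a_big` at output index 0,
    // an ordering key equal to the cast input "a" becomes "a_big@0".
    fn rewrite_ordering(ordering: &mut [String], cast_input_to_col: &[(String, String)]) {
        for key in ordering.iter_mut() {
            for (cast_input, target_col) in cast_input_to_col {
                if *key == *cast_input {
                    *key = target_col.clone();
                }
            }
        }
    }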
diff --git a/datafusion/core/src/physical_plan/repartition/mod.rs b/datafusion/core/src/physical_plan/repartition/mod.rs
index 6175c19576..c5b8b2da42 100644
--- a/datafusion/core/src/physical_plan/repartition/mod.rs
+++ b/datafusion/core/src/physical_plan/repartition/mod.rs
@@ -24,19 +24,16 @@ use std::sync::Arc;
use std::task::{Context, Poll};
use std::{any::Any, vec};
+use crate::physical_plan::common::transpose;
use crate::physical_plan::hash_utils::create_hashes;
+use crate::physical_plan::metrics::BaselineMetrics;
use crate::physical_plan::repartition::distributor_channels::{
channels, partition_aware_channels,
};
+use crate::physical_plan::sorts::streaming_merge;
use crate::physical_plan::{
DisplayFormatType, EquivalenceProperties, ExecutionPlan, Partitioning, Statistics,
};
-use arrow::array::{ArrayRef, UInt64Builder};
-use arrow::datatypes::SchemaRef;
-use arrow::record_batch::RecordBatch;
-use datafusion_common::{DataFusionError, Result};
-use datafusion_execution::memory_pool::MemoryConsumer;
-use log::trace;
use self::distributor_channels::{DistributionReceiver, DistributionSender};
@@ -45,14 +42,18 @@ use super::expressions::PhysicalSortExpr;
use super::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet};
use super::{DisplayAs, RecordBatchStream, SendableRecordBatchStream};
-use crate::physical_plan::common::transpose;
-use crate::physical_plan::metrics::BaselineMetrics;
-use crate::physical_plan::sorts::streaming_merge;
+use arrow::array::{ArrayRef, UInt64Builder};
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+use datafusion_common::{DataFusionError, Result};
+use datafusion_execution::memory_pool::MemoryConsumer;
use datafusion_execution::TaskContext;
-use datafusion_physical_expr::PhysicalExpr;
+use datafusion_physical_expr::{OrderingEquivalenceProperties, PhysicalExpr};
+
use futures::stream::Stream;
use futures::{FutureExt, StreamExt};
use hashbrown::HashMap;
+use log::trace;
use parking_lot::Mutex;
use tokio::task::JoinHandle;
@@ -399,6 +400,10 @@ impl ExecutionPlan for RepartitionExec {
self.input.equivalence_properties()
}
+ fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties {
+ self.input.ordering_equivalence_properties()
+ }
+
fn execute(
&self,
partition: usize,
diff --git a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
index 8262a18f23..0dad1d30dd 100644
--- a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
@@ -20,22 +20,25 @@
use std::any::Any;
use std::sync::Arc;
-use arrow::datatypes::SchemaRef;
-use log::{debug, trace};
-
use crate::physical_plan::common::spawn_buffered;
+use crate::physical_plan::expressions::PhysicalSortExpr;
use crate::physical_plan::metrics::{
BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet,
};
use crate::physical_plan::sorts::streaming_merge;
-use crate::physical_plan::DisplayAs;
use crate::physical_plan::{
- expressions::PhysicalSortExpr, DisplayFormatType, Distribution, ExecutionPlan,
- Partitioning, SendableRecordBatchStream, Statistics,
+ DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
+ SendableRecordBatchStream, Statistics,
};
+
+use arrow::datatypes::SchemaRef;
use datafusion_common::{DataFusionError, Result};
use datafusion_execution::TaskContext;
-use datafusion_physical_expr::{EquivalenceProperties, PhysicalSortRequirement};
+use datafusion_physical_expr::{
+ EquivalenceProperties, OrderingEquivalenceProperties, PhysicalSortRequirement,
+};
+
+use log::{debug, trace};
/// Sort preserving merge execution plan
///
@@ -163,6 +166,10 @@ impl ExecutionPlan for SortPreservingMergeExec {
self.input.equivalence_properties()
}
+ fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties {
+ self.input.ordering_equivalence_properties()
+ }
+
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
vec![self.input.clone()]
}
diff --git a/datafusion/core/tests/sqllogictests/test_files/joins.slt b/datafusion/core/tests/sqllogictests/test_files/joins.slt
index c2517ead0f..fdb76064ab 100644
--- a/datafusion/core/tests/sqllogictests/test_files/joins.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/joins.slt
@@ -3060,3 +3060,209 @@ set datafusion.execution.target_partitions = 2;
statement ok
set datafusion.execution.batch_size = 4096;
+
+
+####
+# Config setup
+####
+statement ok
+set datafusion.explain.logical_plan_only = false;
+
+statement ok
+set datafusion.optimizer.prefer_hash_join = false;
+
+statement ok
+set datafusion.execution.target_partitions = 2;
+
+statement ok
+CREATE EXTERNAL TABLE annotated_data (
+ a0 INTEGER,
+ a INTEGER,
+ b INTEGER,
+ c INTEGER,
+ d INTEGER
+)
+STORED AS CSV
+WITH HEADER ROW
+WITH ORDER (a ASC NULLS FIRST, b ASC, c ASC)
+LOCATION 'tests/data/window_2.csv';
+
+# sort merge join should propagate ordering equivalence of the left side
+# for inner joins. Hence, the final requirement rn1 ASC is already satisfied
+# at the end of SortMergeJoinExec.
+query TT
+EXPLAIN SELECT *
+ FROM (SELECT *, ROW_NUMBER() OVER() as rn1
+ FROM annotated_data ) as l_table
+ JOIN annotated_data as r_table
+ ON l_table.a = r_table.a
+ ORDER BY l_table.rn1
+----
+logical_plan
+Sort: l_table.rn1 ASC NULLS LAST
+--Inner Join: l_table.a = r_table.a
+----SubqueryAlias: l_table
+------Projection: annotated_data.a0, annotated_data.a, annotated_data.b, annotated_data.c, annotated_data.d, ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS rn1
+--------WindowAggr: windowExpr=[[ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]
+----------TableScan: annotated_data projection=[a0, a, b, c, d]
+----SubqueryAlias: r_table
+------TableScan: annotated_data projection=[a0, a, b, c, d]
+physical_plan
+SortPreservingMergeExec: [rn1@5 ASC NULLS LAST]
+--SortMergeJoin: join_type=Inner, on=[(a@1, a@1)]
+----CoalesceBatchesExec: target_batch_size=4096
+------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=1
+--------ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@5 as rn1]
+----------BoundedWindowAggExec: wdw=[ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }], mode=[Sorted]
+------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], has_header=true
+----CoalesceBatchesExec: target_batch_size=4096
+------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=1
+--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], has_header=true
+
+# sort merge join should propagate ordering equivalence of the right side
+# for right joins. Hence, the final requirement rn1 ASC is already satisfied
+# at the end of SortMergeJoinExec.
+query TT
+EXPLAIN SELECT *
+ FROM annotated_data as l_table
+ RIGHT JOIN (SELECT *, ROW_NUMBER() OVER() as rn1
+ FROM annotated_data ) as r_table
+ ON l_table.a = r_table.a
+ ORDER BY r_table.rn1
+----
+logical_plan
+Sort: r_table.rn1 ASC NULLS LAST
+--Right Join: l_table.a = r_table.a
+----SubqueryAlias: l_table
+------TableScan: annotated_data projection=[a0, a, b, c, d]
+----SubqueryAlias: r_table
+------Projection: annotated_data.a0, annotated_data.a, annotated_data.b, annotated_data.c, annotated_data.d, ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS rn1
+--------WindowAggr: windowExpr=[[ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]
+----------TableScan: annotated_data projection=[a0, a, b, c, d]
+physical_plan
+SortPreservingMergeExec: [rn1@10 ASC NULLS LAST]
+--SortMergeJoin: join_type=Right, on=[(a@1, a@1)]
+----CoalesceBatchesExec: target_batch_size=4096
+------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=1
+--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], has_header=true
+----CoalesceBatchesExec: target_batch_size=4096
+------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=1
+--------ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@5 as rn1]
+----------BoundedWindowAggExec: wdw=[ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }], mode=[Sorted]
+------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], has_header=true
+
+# SortMergeJoin should append the ordering equivalences of the right table
+# to the global ordering as a lexicographical suffix. The query below
+# shouldn't add any SortExec for its ORDER BY clause, since the requirement
+# is already satisfied at the output of SortMergeJoinExec.
+query TT
+EXPLAIN SELECT *
+ FROM (SELECT *, ROW_NUMBER() OVER() as rn1
+ FROM annotated_data ) as l_table
+ JOIN (SELECT *, ROW_NUMBER() OVER() as rn1
+ FROM annotated_data ) as r_table
+ ON l_table.a = r_table.a
+ ORDER BY l_table.a ASC NULLS FIRST, l_table.b, l_table.c, r_table.rn1
+----
+logical_plan
+Sort: l_table.a ASC NULLS FIRST, l_table.b ASC NULLS LAST, l_table.c ASC NULLS LAST, r_table.rn1 ASC NULLS LAST
+--Inner Join: l_table.a = r_table.a
+----SubqueryAlias: l_table
+------Projection: annotated_data.a0, annotated_data.a, annotated_data.b, annotated_data.c, annotated_data.d, ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS rn1
+--------WindowAggr: windowExpr=[[ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]
+----------TableScan: annotated_data projection=[a0, a, b, c, d]
+----SubqueryAlias: r_table
+------Projection: annotated_data.a0, annotated_data.a, annotated_data.b, annotated_data.c, annotated_data.d, ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS rn1
+--------WindowAggr: windowExpr=[[ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]
+----------TableScan: annotated_data projection=[a0, a, b, c, d]
+physical_plan
+SortPreservingMergeExec: [a@1 ASC,b@2 ASC NULLS LAST,c@3 ASC NULLS LAST,rn1@11 ASC NULLS LAST]
+--SortMergeJoin: join_type=Inner, on=[(a@1, a@1)]
+----CoalesceBatchesExec: target_batch_size=4096
+------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=1
+--------ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@5 as rn1]
+----------BoundedWindowAggExec: wdw=[ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }], mode=[Sorted]
+------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], has_header=true
+----CoalesceBatchesExec: target_batch_size=4096
+------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=1
+--------ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@5 as rn1]
+----------BoundedWindowAggExec: wdw=[ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }], mode=[Sorted]
+------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], has_header=true
+
+statement ok
+set datafusion.optimizer.prefer_hash_join = true;
+
+# To preserve the ordering up to the hash join, set target_partitions to 1.
+# Otherwise, the inserted RepartitionExecs may break the ordering.
+statement ok
+set datafusion.execution.target_partitions = 1;
+
+# hash join should propagate ordering equivalence of the right side for INNER joins.
+# Hence, the final requirement rn1 ASC is already satisfied at the end of HashJoinExec.
+query TT
+EXPLAIN SELECT *
+ FROM annotated_data as l_table
+ JOIN (SELECT *, ROW_NUMBER() OVER() as rn1
+ FROM annotated_data) as r_table
+ ON l_table.a = r_table.a
+ ORDER BY r_table.rn1
+----
+logical_plan
+Sort: r_table.rn1 ASC NULLS LAST
+--Inner Join: l_table.a = r_table.a
+----SubqueryAlias: l_table
+------TableScan: annotated_data projection=[a0, a, b, c, d]
+----SubqueryAlias: r_table
+------Projection: annotated_data.a0, annotated_data.a, annotated_data.b, annotated_data.c, annotated_data.d, ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS rn1
+--------WindowAggr: windowExpr=[[ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]
+----------TableScan: annotated_data projection=[a0, a, b, c, d]
+physical_plan
+CoalesceBatchesExec: target_batch_size=4096
+--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@1, a@1)]
+----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], has_header=true
+----ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@5 as rn1]
+------BoundedWindowAggExec: wdw=[ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }], mode=[Sorted]
+--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], has_header=true
+
+# hash join should propagate ordering equivalence of the right side for RIGHT ANTI joins.
+# Hence, the final requirement rn1 ASC is already satisfied at the end of HashJoinExec.
+query TT
+EXPLAIN SELECT *
+ FROM annotated_data as l_table
+ RIGHT ANTI JOIN (SELECT *, ROW_NUMBER() OVER() as rn1
+ FROM annotated_data) as r_table
+ ON l_table.a = r_table.a
+ ORDER BY r_table.rn1
+----
+logical_plan
+Sort: r_table.rn1 ASC NULLS LAST
+--RightAnti Join: l_table.a = r_table.a
+----SubqueryAlias: l_table
+------TableScan: annotated_data projection=[a]
+----SubqueryAlias: r_table
+------Projection: annotated_data.a0, annotated_data.a, annotated_data.b, annotated_data.c, annotated_data.d, ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS rn1
+--------WindowAggr: windowExpr=[[ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]
+----------TableScan: annotated_data projection=[a0, a, b, c, d]
+physical_plan
+CoalesceBatchesExec: target_batch_size=4096
+--HashJoinExec: mode=CollectLeft, join_type=RightAnti, on=[(a@0, a@1)]
+----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a], output_ordering=[a@0 ASC], has_header=true
+----ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@5 as rn1]
+------BoundedWindowAggExec: wdw=[ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }], mode=[Sorted]
+--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], has_header=true
+
+####
+# Config teardown
+####
+
+statement ok
+set datafusion.explain.logical_plan_only = true;
+
+statement ok
+set datafusion.optimizer.prefer_hash_join = true;
+
+statement ok
+set datafusion.execution.target_partitions = 2;
+
+statement ok
+drop table annotated_data;
diff --git a/datafusion/core/tests/sqllogictests/test_files/window.slt b/datafusion/core/tests/sqllogictests/test_files/window.slt
index 21f76eb9cb..5d97b311d1 100644
--- a/datafusion/core/tests/sqllogictests/test_files/window.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/window.slt
@@ -2429,6 +2429,29 @@ GlobalLimitExec: skip=0, fetch=5
------SortExec: expr=[CAST(c9@1 AS Int32) + c5@0 DESC]
--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c5, c9], has_header=true
+# Ordering equivalence should be preserved through cast expressions
+query TT
+EXPLAIN SELECT c9, rn1 FROM (SELECT c9,
+ CAST(ROW_NUMBER() OVER(ORDER BY c9 DESC) as BIGINT) as rn1
+ FROM aggregate_test_100
+ ORDER BY c9 DESC)
+ ORDER BY rn1 ASC
+ LIMIT 5
+----
+logical_plan
+Limit: skip=0, fetch=5
+--Sort: rn1 ASC NULLS LAST, fetch=5
+----Sort: aggregate_test_100.c9 DESC NULLS FIRST
+------Projection: aggregate_test_100.c9, CAST(ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS Int64) AS rn1
+--------WindowAggr: windowExpr=[[ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+----------TableScan: aggregate_test_100 projection=[c9]
+physical_plan
+GlobalLimitExec: skip=0, fetch=5
+--ProjectionExec: expr=[c9@0 as c9, CAST(ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 AS Int64) as rn1]
+----BoundedWindowAggExec: wdw=[ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
+------SortExec: expr=[c9@0 DESC]
+--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], has_header=true
+
# The following query has a type error. We should test that the error can be detected
# from either the logical plan (when `skip_failed_rules` is set to `false`) or
# the physical plan (when `skip_failed_rules` is set to `true`).
@@ -3055,6 +3078,33 @@ statement error DataFusion error: Error during planning: Aggregate ORDER BY is n
EXPLAIN SELECT a, b, LAST_VALUE(c ORDER BY a ASC) OVER (order by a ASC) as last_c
FROM annotated_data_infinite2
+# Ordering equivalence information should propagate through
+# FilterExec, LimitExec, CoalesceBatchesExec, etc.
+# The query below should work without breaking the pipeline.
+query TT
+EXPLAIN SELECT * FROM (SELECT *, ROW_NUMBER() OVER(ORDER BY a ASC) as rn1
+ FROM annotated_data_infinite2
+ ORDER BY rn1 ASC
+ LIMIT 5)
+ WHERE rn1 < 50
+ ORDER BY rn1 ASC
+----
+logical_plan
+Sort: rn1 ASC NULLS LAST
+--Filter: rn1 < UInt64(50)
+----Limit: skip=0, fetch=5
+------Sort: rn1 ASC NULLS LAST, fetch=5
+--------Projection: annotated_data_infinite2.a0, annotated_data_infinite2.a, annotated_data_infinite2.b, annotated_data_infinite2.c, annotated_data_infinite2.d, ROW_NUMBER() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn1
+----------WindowAggr: windowExpr=[[ROW_NUMBER() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+------------TableScan: annotated_data_infinite2 projection=[a0, a, b, c, d]
+physical_plan
+CoalesceBatchesExec: target_batch_size=4096
+--FilterExec: rn1@5 < 50
+----GlobalLimitExec: skip=0, fetch=5
+------ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, ROW_NUMBER() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as rn1]
+--------BoundedWindowAggExec: wdw=[ROW_NUMBER() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "ROW_NUMBER() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: CurrentRow }], mode=[Sorted]
+----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], infinite_source=true, output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], has_header=true
+
statement ok
drop table annotated_data_finite2
diff --git a/datafusion/physical-expr/src/equivalence.rs b/datafusion/physical-expr/src/equivalence.rs
index 4253917927..f458f4c709 100644
--- a/datafusion/physical-expr/src/equivalence.rs
+++ b/datafusion/physical-expr/src/equivalence.rs
@@ -15,14 +15,16 @@
// specific language governing permissions and limitations
// under the License.
-use crate::expressions::{BinaryExpr, Column};
+use crate::expressions::{CastExpr, Column};
use crate::{
normalize_expr_with_equivalence_properties, LexOrdering, PhysicalExpr,
PhysicalSortExpr,
};
use arrow::datatypes::SchemaRef;
+use arrow_schema::Fields;
+use crate::utils::collect_columns;
use std::collections::{HashMap, HashSet};
use std::hash::Hash;
use std::sync::Arc;
@@ -114,26 +116,6 @@ impl<T: Eq + Clone + Hash> EquivalenceProperties<T> {
}
}
-// Helper function to calculate column info recursively
-fn get_column_indices_helper(
- indices: &mut Vec<(usize, String)>,
- expr: &Arc<dyn PhysicalExpr>,
-) {
- if let Some(col) = expr.as_any().downcast_ref::<Column>() {
- indices.push((col.index(), col.name().to_string()))
- } else if let Some(binary_expr) = expr.as_any().downcast_ref::<BinaryExpr>() {
- get_column_indices_helper(indices, binary_expr.left());
- get_column_indices_helper(indices, binary_expr.right());
- };
-}
-
-/// Get index and name of each column that is in the expression (Can return multiple entries for `BinaryExpr`s)
-fn get_column_indices(expr: &Arc<dyn PhysicalExpr>) -> Vec<(usize, String)> {
- let mut result = vec![];
- get_column_indices_helper(&mut result, expr);
- result
-}
-
/// `OrderingEquivalenceProperties` keeps track of columns that describe the
/// global ordering of the schema. These columns are not necessarily same; e.g.
/// ```text
@@ -232,34 +214,54 @@ impl<T: Eq + Hash + Clone> EquivalentClass<T> {
/// For this case, we say that `vec![a ASC, b ASC]`, and `vec![c DESC, d ASC]` are ordering equivalent.
pub type OrderingEquivalentClass = EquivalentClass<LexOrdering>;
-impl OrderingEquivalentClass {
- /// This function extends ordering equivalences with alias information.
- /// For instance, assume column a and b are aliases,
- /// and column (a ASC), (c DESC) are ordering equivalent. We append (b ASC) to ordering equivalence,
- /// since b is alias of colum a. After this function (a ASC), (c DESC), (b ASC) would be ordering equivalent.
- fn update_with_aliases(&mut self, columns_map: &HashMap<Column, Vec<Column>>) {
- for (column, columns) in columns_map {
- let col_expr = Arc::new(column.clone()) as Arc<dyn PhysicalExpr>;
- let mut to_insert = vec![];
- for ordering in std::iter::once(&self.head).chain(self.others.iter()) {
- for (idx, item) in ordering.iter().enumerate() {
- if item.expr.eq(&col_expr) {
- for col in columns {
- let col_expr = Arc::new(col.clone()) as Arc<dyn PhysicalExpr>;
- let mut normalized = self.head.clone();
- // Change the corresponding entry in the head with the alias column:
- let entry = &mut normalized[idx];
- (entry.expr, entry.options) = (col_expr, item.options);
- to_insert.push(normalized);
- }
- }
- }
- }
- for items in to_insert {
- self.insert(items);
+/// Update each expression in `ordering` with alias expressions. Assume
+/// `ordering` is `a ASC, b ASC` and `c` is alias of `b`. Then, the result
+/// will be `a ASC, c ASC`.
+fn update_with_alias(
+ mut ordering: LexOrdering,
+ oeq_alias_map: &[(Column, Column)],
+) -> LexOrdering {
+ for (source_col, target_col) in oeq_alias_map {
+ let source_col: Arc<dyn PhysicalExpr> = Arc::new(source_col.clone());
+ // Replace invalidated columns with their aliases in the ordering expression.
+ let target_col: Arc<dyn PhysicalExpr> = Arc::new(target_col.clone());
+ for item in ordering.iter_mut() {
+ if item.expr.eq(&source_col) {
+ // Change the corresponding entry with alias expression
+ item.expr = target_col.clone();
}
}
}
+ ordering
+}
+
+impl OrderingEquivalentClass {
+ /// This function updates ordering equivalences with alias information.
+ /// For instance, assume columns `a` and `b` are aliases (a as b), and
+ /// orderings `a ASC` and `c DESC` are equivalent. Here, we replace column
+ /// `a` with `b` in ordering equivalence expressions. After this function,
+ /// `a ASC`, `c DESC` will be converted to `b ASC`, `c DESC`.
+ fn update_with_aliases(
+ &mut self,
+ oeq_alias_map: &[(Column, Column)],
+ fields: &Fields,
+ ) {
+ let is_head_invalid = self.head.iter().any(|sort_expr| {
+ collect_columns(&sort_expr.expr)
+ .iter()
+ .any(|col| is_column_invalid_in_new_schema(col, fields))
+ });
+ // If head is invalidated, update head with alias expressions
+ if is_head_invalid {
+ self.head = update_with_alias(self.head.clone(), oeq_alias_map);
+ } else {
+ let new_oeq_expr = update_with_alias(self.head.clone(), oeq_alias_map);
+ self.insert(new_oeq_expr);
+ }
+ for ordering in self.others.clone().into_iter() {
+ self.insert(update_with_alias(ordering, oeq_alias_map));
+ }
+ }
}
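A hedged sketch of `update_with_alias` on string stand-ins (sort options elided; not part of this patch):

    // With alias map `a -> b`, the ordering ["a", "c"] becomes ["b", "c"].
    fn update_with_alias(mut ordering: Vec<String>, source: &str, target: &str) -> Vec<String> {
        for expr in ordering.iter_mut() {
            if *expr == source {
                *expr = target.to_string();
            }
        }
        ordering
    }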
/// This is a builder object facilitating incremental construction
@@ -342,6 +344,22 @@ impl OrderingEquivalenceBuilder {
}
}
+/// Checks whether `column` is invalid (i.e. no longer present) after projection.
+fn is_column_invalid_in_new_schema(column: &Column, fields: &Fields) -> bool {
+ let idx = column.index();
+ idx >= fields.len() || fields[idx].name() != column.name()
+}
+
+/// Gets first aliased version of `col` found in `alias_map`.
+fn get_alias_column(
+ col: &Column,
+ alias_map: &HashMap<Column, Vec<Column>>,
+) -> Option<Column> {
+ alias_map
+ .iter()
+ .find_map(|(column, columns)| column.eq(col).then(|| columns[0].clone()))
+}
+
/// This function applies the given projection to the given equivalence
/// properties to compute the resulting (projected) equivalence properties; e.g.
/// 1) Adding an alias, which can introduce additional equivalence properties,
@@ -352,10 +370,21 @@ pub fn project_equivalence_properties(
alias_map: &HashMap<Column, Vec<Column>>,
output_eq: &mut EquivalenceProperties,
) {
+ // Get schema and fields of projection output
+ let schema = output_eq.schema();
+ let fields = schema.fields();
+
let mut eq_classes = input_eq.classes().to_vec();
for (column, columns) in alias_map {
let mut find_match = false;
for class in eq_classes.iter_mut() {
+ // If `class.head` is invalidated in the new schema, replace it with its
+ // alias, so that the head is not arbitrarily reassigned from one of the
+ // entries in `class.others`.
+ if is_column_invalid_in_new_schema(&class.head, fields) {
+ if let Some(alias_col) = get_alias_column(&class.head, alias_map) {
+ class.head = alias_col;
+ }
+ }
if class.contains(column) {
for col in columns {
class.insert(col.clone());
@@ -370,15 +399,10 @@ pub fn project_equivalence_properties(
}
// Prune columns that are no longer in the schema from equivalences.
- let schema = output_eq.schema();
- let fields = schema.fields();
for class in eq_classes.iter_mut() {
let columns_to_remove = class
.iter()
- .filter(|column| {
- let idx = column.index();
- idx >= fields.len() || fields[idx].name() != column.name()
- })
+ .filter(|column| is_column_invalid_in_new_schema(column, fields))
.cloned()
.collect::<Vec<_>>();
for column in columns_to_remove {
@@ -402,25 +426,33 @@ pub fn project_ordering_equivalence_properties(
columns_map: &HashMap<Column, Vec<Column>>,
output_eq: &mut OrderingEquivalenceProperties,
) {
+ // Get schema and fields of projection output
+ let schema = output_eq.schema();
+ let fields = schema.fields();
+
let mut eq_classes = input_eq.classes().to_vec();
+ let mut oeq_alias_map = vec![];
+ for (column, columns) in columns_map {
+ if is_column_invalid_in_new_schema(column, fields) {
+ oeq_alias_map.push((column.clone(), columns[0].clone()));
+ }
+ }
for class in eq_classes.iter_mut() {
- class.update_with_aliases(columns_map);
+ class.update_with_aliases(&oeq_alias_map, fields);
}
    // Prune columns that are no longer in the schema from the OrderingEquivalenceProperties.
- let schema = output_eq.schema();
- let fields = schema.fields();
for class in eq_classes.iter_mut() {
let sort_exprs_to_remove = class
.iter()
.filter(|sort_exprs| {
sort_exprs.iter().any(|sort_expr| {
- let col_infos = get_column_indices(&sort_expr.expr);
+ let cols_in_expr = collect_columns(&sort_expr.expr);
                    // If any of the columns used in the expression is invalid,
                    // remove the expression from the ordering equivalences.
- col_infos.into_iter().any(|(idx, name)| {
- idx >= fields.len() || fields[idx].name() != &name
- })
+ cols_in_expr
+ .iter()
+ .any(|col| is_column_invalid_in_new_schema(col, fields))
})
})
.cloned()
@@ -434,6 +466,42 @@ pub fn project_ordering_equivalence_properties(
output_eq.extend(eq_classes);
}
+/// Updates `ordering` by replacing any expression that matches the input of
+/// one of the given cast expressions with the corresponding target column
+/// after projection. If no expression is replaced, returns `None`.
+fn update_with_cast_exprs(
+ cast_exprs: &[(CastExpr, Column)],
+ mut ordering: LexOrdering,
+) -> Option<LexOrdering> {
+ let mut is_changed = false;
+ for sort_expr in ordering.iter_mut() {
+ for (cast_expr, target_col) in cast_exprs.iter() {
+ if sort_expr.expr.eq(cast_expr.expr()) {
+ sort_expr.expr = Arc::new(target_col.clone()) as _;
+ is_changed = true;
+ }
+ }
+ }
+ is_changed.then_some(ordering)
+}
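
A hedged, self-contained sketch of this substitution (the `CAST(a AS Int64)`
projection, the output column name, and all indices are invented; it leans on
the `cast` helper and on the `Clone` derive this commit adds to `CastExpr`):

    use std::sync::Arc;

    use arrow::compute::kernels::sort::SortOptions;
    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion_common::Result;
    use datafusion_physical_expr::expressions::{cast, CastExpr, Column};
    use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr};

    fn main() -> Result<()> {
        // Input schema has `a: Int32`; the projection computes `CAST(a AS Int64)`
        // and exposes it as output column 0.
        let input_schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let cast_a = cast(Arc::new(Column::new("a", 0)), &input_schema, DataType::Int64)?;
        let cast_expr = cast_a.as_any().downcast_ref::<CastExpr>().unwrap().clone();
        let target_col = Column::new("CAST(a AS Int64)", 0);

        // The input ordering `a ASC` gets re-expressed via the projected column.
        let mut ordering: LexOrdering = vec![PhysicalSortExpr {
            expr: Arc::new(Column::new("a", 0)),
            options: SortOptions::default(),
        }];
        let mut is_changed = false;
        for sort_expr in ordering.iter_mut() {
            // Compare against the cast's input expression, as above.
            if sort_expr.expr.as_any().downcast_ref::<Column>()
                == cast_expr.expr().as_any().downcast_ref::<Column>()
            {
                sort_expr.expr = Arc::new(target_col.clone()) as _;
                is_changed = true;
            }
        }
        assert!(is_changed);
        Ok(())
    }
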
+
+/// Updates cast expressions inside the ordering equivalence
+/// properties with their target columns after projection.
+pub fn update_ordering_equivalence_with_cast(
+ cast_exprs: &[(CastExpr, Column)],
+ input_oeq: &mut OrderingEquivalenceProperties,
+) {
+ for cls in input_oeq.classes.iter_mut() {
+ for ordering in
+ std::iter::once(cls.head().clone()).chain(cls.others().clone().into_iter())
+ {
+ if let Some(updated_ordering) = update_with_cast_exprs(cast_exprs, ordering) {
+ cls.insert(updated_ordering);
+ }
+ }
+ }
+}
+
#[cfg(test)]
mod tests {
use super::*;
@@ -441,7 +509,6 @@ mod tests {
use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::Result;
- use datafusion_expr::Operator;
use std::sync::Arc;
#[test]
@@ -534,18 +601,4 @@ mod tests {
Ok(())
}
-
- #[test]
- fn test_get_column_infos() -> Result<()> {
- let expr1 = Arc::new(Column::new("col1", 2)) as _;
- assert_eq!(get_column_indices(&expr1), vec![(2, "col1".to_string())]);
- let expr2 = Arc::new(Column::new("col2", 5)) as _;
- assert_eq!(get_column_indices(&expr2), vec![(5, "col2".to_string())]);
- let expr3 = Arc::new(BinaryExpr::new(expr1, Operator::Plus, expr2)) as _;
- assert_eq!(
- get_column_indices(&expr3),
- vec![(2, "col1".to_string()), (5, "col2".to_string())]
- );
- Ok(())
- }
}
diff --git a/datafusion/physical-expr/src/expressions/cast.rs b/datafusion/physical-expr/src/expressions/cast.rs
index b5916def86..b6c3536a1e 100644
--- a/datafusion/physical-expr/src/expressions/cast.rs
+++ b/datafusion/physical-expr/src/expressions/cast.rs
@@ -41,7 +41,7 @@ fn default_cast_options() -> CastOptions<'static> {
}
/// CAST expression casts an expression to a specific data type and returns a runtime error on invalid cast
-#[derive(Debug)]
+#[derive(Debug, Clone)]
pub struct CastExpr {
/// The expression to cast
expr: Arc<dyn PhysicalExpr>,
diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs
index b695ee169e..165faba442 100644
--- a/datafusion/physical-expr/src/lib.rs
+++ b/datafusion/physical-expr/src/lib.rs
@@ -59,7 +59,8 @@ pub use physical_expr::{AnalysisContext, ExprBoundaries, PhysicalExpr, PhysicalE
pub use planner::create_physical_expr;
pub use scalar_function::ScalarFunctionExpr;
pub use sort_expr::{
- LexOrdering, LexOrderingReq, PhysicalSortExpr, PhysicalSortRequirement,
+ LexOrdering, LexOrderingRef, LexOrderingReq, PhysicalSortExpr,
+ PhysicalSortRequirement,
};
pub use utils::{
expr_list_eq_any_order, expr_list_eq_strict_order,
diff --git a/datafusion/physical-expr/src/sort_expr.rs b/datafusion/physical-expr/src/sort_expr.rs
index 659e8c85e8..0ba4627bc7 100644
--- a/datafusion/physical-expr/src/sort_expr.rs
+++ b/datafusion/physical-expr/src/sort_expr.rs
@@ -17,13 +17,15 @@
//! Sort expressions
+use std::hash::{Hash, Hasher};
+use std::sync::Arc;
+
use crate::PhysicalExpr;
+
use arrow::compute::kernels::sort::{SortColumn, SortOptions};
use arrow::record_batch::RecordBatch;
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::ColumnarValue;
-use std::hash::{Hash, Hasher};
-use std::sync::Arc;
/// Represents Sort operation for a column in a RecordBatch
#[derive(Clone, Debug)]
@@ -227,5 +229,8 @@ fn to_str(options: &SortOptions) -> &str {
///`LexOrdering` is a type alias for lexicographical ordering definition, `Vec<PhysicalSortExpr>`
pub type LexOrdering = Vec<PhysicalSortExpr>;
+///`LexOrderingRef` is a type alias for lexicographical ordering reference, `&[PhysicalSortExpr]`
+pub type LexOrderingRef<'a> = &'a [PhysicalSortExpr];
+
///`LexOrderingReq` is a type alias for lexicographical ordering requirement definition, `Vec<PhysicalSortRequirement>`
pub type LexOrderingReq = Vec<PhysicalSortRequirement>;
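
A small usage sketch of these aliases (column names are illustrative): owners
hold a `LexOrdering`, while functions that only inspect an ordering can borrow
it as a `LexOrderingRef`.

    use std::sync::Arc;

    use arrow::compute::kernels::sort::SortOptions;
    use datafusion_physical_expr::expressions::Column;
    use datafusion_physical_expr::{LexOrdering, LexOrderingRef, PhysicalSortExpr};

    // Read-only consumers borrow the ordering as a slice.
    fn format_ordering(ordering: LexOrderingRef) -> String {
        ordering
            .iter()
            .map(|sort_expr| sort_expr.to_string())
            .collect::<Vec<_>>()
            .join(", ")
    }

    fn main() {
        let ordering: LexOrdering = vec![
            PhysicalSortExpr {
                expr: Arc::new(Column::new("a", 0)),
                options: SortOptions::default(),
            },
            PhysicalSortExpr {
                expr: Arc::new(Column::new("b", 1)),
                options: SortOptions { descending: true, nulls_first: true },
            },
        ];
        // Prints something like: a@0 ASC, b@1 DESC
        println!("{}", format_ordering(&ordering));
    }
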
diff --git a/datafusion/physical-expr/src/utils.rs b/datafusion/physical-expr/src/utils.rs
index f95ec032eb..3fbb6ab9f9 100644
--- a/datafusion/physical-expr/src/utils.rs
+++ b/datafusion/physical-expr/src/utils.rs
@@ -1464,4 +1464,24 @@ mod tests {
assert_eq!(collapse_vec(vec![3, 1, 2, 3, 2, 3]), vec![3, 1, 2]);
Ok(())
}
+
+ #[test]
+ fn test_collect_columns() -> Result<()> {
+ let expr1 = Arc::new(Column::new("col1", 2)) as _;
+ let mut expected = HashSet::new();
+ expected.insert(Column::new("col1", 2));
+ assert_eq!(collect_columns(&expr1), expected);
+
+ let expr2 = Arc::new(Column::new("col2", 5)) as _;
+ let mut expected = HashSet::new();
+ expected.insert(Column::new("col2", 5));
+ assert_eq!(collect_columns(&expr2), expected);
+
+ let expr3 = Arc::new(BinaryExpr::new(expr1, Operator::Plus, expr2)) as _;
+ let mut expected = HashSet::new();
+ expected.insert(Column::new("col1", 2));
+ expected.insert(Column::new("col2", 5));
+ assert_eq!(collect_columns(&expr3), expected);
+ Ok(())
+ }
}