This is an automated email from the ASF dual-hosted git repository.
akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 95ba48bd22 Better Equivalence (ordering and exact equivalence)
Propagation through ProjectionExec (#8484)
95ba48bd22 is described below
commit 95ba48bd2291dd5c303bdaf88cbb55c79d395930
Author: Mustafa Akur <[email protected]>
AuthorDate: Mon Dec 11 20:05:55 2023 +0300
Better Equivalence (ordering and exact equivalence) Propagation through
ProjectionExec (#8484)
* Better projection support complex expression support
---------
Co-authored-by: metesynnada <[email protected]>
Co-authored-by: Mehmet Ozan Kabak <[email protected]>
---
datafusion/physical-expr/src/equivalence.rs | 1746 ++++++++++++++++++++++++++-
1 file changed, 1679 insertions(+), 67 deletions(-)
diff --git a/datafusion/physical-expr/src/equivalence.rs
b/datafusion/physical-expr/src/equivalence.rs
index 4a562f4ef1..defd7b5786 100644
--- a/datafusion/physical-expr/src/equivalence.rs
+++ b/datafusion/physical-expr/src/equivalence.rs
@@ -15,7 +15,8 @@
// specific language governing permissions and limitations
// under the License.
-use std::hash::Hash;
+use std::collections::{HashMap, HashSet};
+use std::hash::{Hash, Hasher};
use std::sync::Arc;
use crate::expressions::{Column, Literal};
@@ -26,12 +27,14 @@ use crate::{
LexRequirement, LexRequirementRef, PhysicalExpr, PhysicalSortExpr,
PhysicalSortRequirement,
};
+
use arrow::datatypes::SchemaRef;
use arrow_schema::SortOptions;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{JoinSide, JoinType, Result};
use indexmap::IndexSet;
+use itertools::Itertools;
/// An `EquivalenceClass` is a set of [`Arc<dyn PhysicalExpr>`]s that are known
/// to have the same value for all tuples in a relation. These are generated by
@@ -465,31 +468,6 @@ impl EquivalenceGroup {
.map(|children| expr.clone().with_new_children(children).unwrap())
}
- /// Projects `ordering` according to the given projection mapping.
- /// If the resulting ordering is invalid after projection, returns `None`.
- fn project_ordering(
- &self,
- mapping: &ProjectionMapping,
- ordering: LexOrderingRef,
- ) -> Option<LexOrdering> {
- // If any sort expression is invalid after projection, rest of the
- // ordering shouldn't be projected either. For example, if input
ordering
- // is [a ASC, b ASC, c ASC], and column b is not valid after
projection,
- // the result should be [a ASC], not [a ASC, c ASC], even if column c
is
- // valid after projection.
- let result = ordering
- .iter()
- .map_while(|sort_expr| {
- self.project_expr(mapping, &sort_expr.expr)
- .map(|expr| PhysicalSortExpr {
- expr,
- options: sort_expr.options,
- })
- })
- .collect::<Vec<_>>();
- (!result.is_empty()).then_some(result)
- }
-
/// Projects this equivalence group according to the given projection
mapping.
pub fn project(&self, mapping: &ProjectionMapping) -> Self {
let projected_classes = self.iter().filter_map(|cls| {
@@ -724,8 +702,21 @@ impl OrderingEquivalenceClass {
// Append orderings in `other` to all existing orderings in this
equivalence
// class.
pub fn join_suffix(mut self, other: &Self) -> Self {
- for ordering in other.iter() {
- for idx in 0..self.orderings.len() {
+ let n_ordering = self.orderings.len();
+ // Replicate entries before cross product
+ let n_cross = std::cmp::max(n_ordering, other.len() * n_ordering);
+ self.orderings = self
+ .orderings
+ .iter()
+ .cloned()
+ .cycle()
+ .take(n_cross)
+ .collect();
+ // Suffix orderings of other to the current orderings.
+ for (outer_idx, ordering) in other.iter().enumerate() {
+ for idx in 0..n_ordering {
+ // Calculate cross product index
+ let idx = outer_idx * n_ordering + idx;
self.orderings[idx].extend(ordering.iter().cloned());
}
}
@@ -1196,6 +1187,181 @@ impl EquivalenceProperties {
self.eq_group.project_expr(projection_mapping, expr)
}
+ /// Constructs a dependency map based on existing orderings referred to in
+ /// the projection.
+ ///
+ /// This function analyzes the orderings in the normalized
order-equivalence
+ /// class and builds a dependency map. The dependency map captures
relationships
+ /// between expressions within the orderings, helping to identify
dependencies
+ /// and construct valid projected orderings during projection operations.
+ ///
+ /// # Parameters
+ ///
+ /// - `mapping`: A reference to the `ProjectionMapping` that defines the
+ /// relationship between source and target expressions.
+ ///
+ /// # Returns
+ ///
+ /// A [`DependencyMap`] representing the dependency map, where each
+ /// [`DependencyNode`] contains dependencies for the key
[`PhysicalSortExpr`].
+ ///
+ /// # Example
+ ///
+ /// Assume we have two equivalent orderings: `[a ASC, b ASC]` and `[a ASC,
c ASC]`,
+ /// and the projection mapping is `[a -> a_new, b -> b_new, b + c -> b +
c]`.
+ /// Then, the dependency map will be:
+ ///
+ /// ```text
+ /// a ASC: Node {Some(a_new ASC), HashSet{}}
+ /// b ASC: Node {Some(b_new ASC), HashSet{a ASC}}
+ /// c ASC: Node {None, HashSet{a ASC}}
+ /// ```
+ fn construct_dependency_map(&self, mapping: &ProjectionMapping) ->
DependencyMap {
+ let mut dependency_map = HashMap::new();
+ for ordering in self.normalized_oeq_class().iter() {
+ for (idx, sort_expr) in ordering.iter().enumerate() {
+ let target_sort_expr =
+ self.project_expr(&sort_expr.expr, mapping).map(|expr| {
+ PhysicalSortExpr {
+ expr,
+ options: sort_expr.options,
+ }
+ });
+ let is_projected = target_sort_expr.is_some();
+ if is_projected
+ || mapping
+ .iter()
+ .any(|(source, _)| expr_refers(source,
&sort_expr.expr))
+ {
+ // Previous ordering is a dependency. Note that there is
no
+ // dependency for a leading ordering (i.e. the first sort
+ // expression).
+ let dependency = idx.checked_sub(1).map(|a| &ordering[a]);
+ // Add sort expressions that can be projected or referred
to
+ // by any of the projection expressions to the dependency
map:
+ dependency_map
+ .entry(sort_expr.clone())
+ .or_insert_with(|| DependencyNode {
+ target_sort_expr: target_sort_expr.clone(),
+ dependencies: HashSet::new(),
+ })
+ .insert_dependency(dependency);
+ }
+ if !is_projected {
+ // If we can not project, stop constructing the dependency
+ // map as remaining dependencies will be invalid after
projection.
+ break;
+ }
+ }
+ }
+ dependency_map
+ }
+
+ /// Returns a new `ProjectionMapping` where source expressions are
normalized.
+ ///
+ /// This normalization ensures that source expressions are transformed
into a
+ /// consistent representation. This is beneficial for algorithms that rely
on
+ /// exact equalities, as it allows for more precise and reliable
comparisons.
+ ///
+ /// # Parameters
+ ///
+ /// - `mapping`: A reference to the original `ProjectionMapping` to be
normalized.
+ ///
+ /// # Returns
+ ///
+ /// A new `ProjectionMapping` with normalized source expressions.
+ fn normalized_mapping(&self, mapping: &ProjectionMapping) ->
ProjectionMapping {
+ // Construct the mapping where source expressions are normalized. In
this way
+        // in the algorithms below, we can work on exact equalities.
+ ProjectionMapping {
+ map: mapping
+ .iter()
+ .map(|(source, target)| {
+ let normalized_source =
self.eq_group.normalize_expr(source.clone());
+ (normalized_source, target.clone())
+ })
+ .collect(),
+ }
+ }
+
+ /// Computes projected orderings based on a given projection mapping.
+ ///
+ /// This function takes a `ProjectionMapping` and computes the possible
+ /// orderings for the projected expressions. It considers dependencies
+ /// between expressions and generates valid orderings according to the
+ /// specified sort properties.
+ ///
+ /// # Parameters
+ ///
+ /// - `mapping`: A reference to the `ProjectionMapping` that defines the
+ /// relationship between source and target expressions.
+ ///
+ /// # Returns
+ ///
+ /// A vector of `LexOrdering` containing all valid orderings after
projection.
+ fn projected_orderings(&self, mapping: &ProjectionMapping) ->
Vec<LexOrdering> {
+ let mapping = self.normalized_mapping(mapping);
+
+ // Get dependency map for existing orderings:
+ let dependency_map = self.construct_dependency_map(&mapping);
+
+ let orderings = mapping.iter().flat_map(|(source, target)| {
+ referred_dependencies(&dependency_map, source)
+ .into_iter()
+ .filter_map(|relevant_deps| {
+ if let SortProperties::Ordered(options) =
+ get_expr_ordering(source, &relevant_deps)
+ {
+ Some((options, relevant_deps))
+ } else {
+ // Do not consider unordered cases
+ None
+ }
+ })
+ .flat_map(|(options, relevant_deps)| {
+ let sort_expr = PhysicalSortExpr {
+ expr: target.clone(),
+ options,
+ };
+ // Generate dependent orderings (i.e. prefixes for
`sort_expr`):
+ let mut dependency_orderings =
+ generate_dependency_orderings(&relevant_deps,
&dependency_map);
+ // Append `sort_expr` to the dependent orderings:
+ for ordering in dependency_orderings.iter_mut() {
+ ordering.push(sort_expr.clone());
+ }
+ dependency_orderings
+ })
+ });
+
+ // Add valid projected orderings. For example, if existing ordering is
+ // `a + b` and projection is `[a -> a_new, b -> b_new]`, we need to
+ // preserve `a_new + b_new` as ordered. Please note that `a_new` and
+ // `b_new` themselves need not be ordered. Such dependencies cannot be
+ // deduced via the pass above.
+ let projected_orderings = dependency_map.iter().flat_map(|(sort_expr,
node)| {
+ let mut prefixes = construct_prefix_orderings(sort_expr,
&dependency_map);
+ if prefixes.is_empty() {
+ // If prefix is empty, there is no dependency. Insert
+ // empty ordering:
+ prefixes = vec![vec![]];
+ }
+            // Append current ordering on top of its dependencies:
+ for ordering in prefixes.iter_mut() {
+ if let Some(target) = &node.target_sort_expr {
+ ordering.push(target.clone())
+ }
+ }
+ prefixes
+ });
+
+ // Simplify each ordering by removing redundant sections:
+ orderings
+ .chain(projected_orderings)
+ .map(collapse_lex_ordering)
+ .collect()
+ }
+
/// Projects constants based on the provided `ProjectionMapping`.
///
/// This function takes a `ProjectionMapping` and identifies/projects
@@ -1240,28 +1406,13 @@ impl EquivalenceProperties {
projection_mapping: &ProjectionMapping,
output_schema: SchemaRef,
) -> Self {
- let mut projected_orderings = self
- .oeq_class
- .iter()
- .filter_map(|order|
self.eq_group.project_ordering(projection_mapping, order))
- .collect::<Vec<_>>();
- for (source, target) in projection_mapping.iter() {
- let expr_ordering = ExprOrdering::new(source.clone())
- .transform_up(&|expr| Ok(update_ordering(expr, self)))
- // Guaranteed to always return `Ok`.
- .unwrap();
- if let SortProperties::Ordered(options) = expr_ordering.state {
- // Push new ordering to the state.
- projected_orderings.push(vec![PhysicalSortExpr {
- expr: target.clone(),
- options,
- }]);
- }
- }
+ let projected_constants = self.projected_constants(projection_mapping);
+ let projected_eq_group = self.eq_group.project(projection_mapping);
+ let projected_orderings = self.projected_orderings(projection_mapping);
Self {
- eq_group: self.eq_group.project(projection_mapping),
+ eq_group: projected_eq_group,
oeq_class: OrderingEquivalenceClass::new(projected_orderings),
- constants: self.projected_constants(projection_mapping),
+ constants: projected_constants,
schema: output_schema,
}
}
@@ -1397,6 +1548,270 @@ fn is_constant_recurse(
!children.is_empty() && children.iter().all(|c|
is_constant_recurse(constants, c))
}
+/// This function examines whether a referring expression directly refers to a
+/// given referred expression or if any of its children in the expression tree
+/// refer to the specified expression.
+///
+/// # Parameters
+///
+/// - `referring_expr`: A reference to the referring expression (`Arc<dyn
PhysicalExpr>`).
+/// - `referred_expr`: A reference to the referred expression (`Arc<dyn
PhysicalExpr>`)
+///
+/// # Returns
+///
+/// A boolean value indicating whether `referring_expr` refers to (needs it to
evaluate its result)
+/// `referred_expr` or not.
+fn expr_refers(
+ referring_expr: &Arc<dyn PhysicalExpr>,
+ referred_expr: &Arc<dyn PhysicalExpr>,
+) -> bool {
+ referring_expr.eq(referred_expr)
+ || referring_expr
+ .children()
+ .iter()
+ .any(|child| expr_refers(child, referred_expr))
+}
+
+/// Wrapper struct for `Arc<dyn PhysicalExpr>` to use them as keys in a hash
map.
+#[derive(Debug, Clone)]
+struct ExprWrapper(Arc<dyn PhysicalExpr>);
+
+impl PartialEq<Self> for ExprWrapper {
+ fn eq(&self, other: &Self) -> bool {
+ self.0.eq(&other.0)
+ }
+}
+
+impl Eq for ExprWrapper {}
+
+impl Hash for ExprWrapper {
+ fn hash<H: Hasher>(&self, state: &mut H) {
+ self.0.hash(state);
+ }
+}
+
+/// This function analyzes the dependency map to collect referred dependencies
for
+/// a given source expression.
+///
+/// # Parameters
+///
+/// - `dependency_map`: A reference to the `DependencyMap` where each
+/// `PhysicalSortExpr` is associated with a `DependencyNode`.
+/// - `source`: A reference to the source expression (`Arc<dyn PhysicalExpr>`)
+/// for which relevant dependencies need to be identified.
+///
+/// # Returns
+///
+/// A `Vec<Dependencies>` containing the dependencies for the given source
+/// expression. These dependencies are expressions that are referred to by
+/// the source expression based on the provided dependency map.
+fn referred_dependencies(
+ dependency_map: &DependencyMap,
+ source: &Arc<dyn PhysicalExpr>,
+) -> Vec<Dependencies> {
+ // Associate `PhysicalExpr`s with `PhysicalSortExpr`s that contain them:
+ let mut expr_to_sort_exprs = HashMap::<ExprWrapper, Dependencies>::new();
+ for sort_expr in dependency_map
+ .keys()
+ .filter(|sort_expr| expr_refers(source, &sort_expr.expr))
+ {
+ let key = ExprWrapper(sort_expr.expr.clone());
+ expr_to_sort_exprs
+ .entry(key)
+ .or_default()
+ .insert(sort_expr.clone());
+ }
+
+ // Generate all valid dependencies for the source. For example, if the
source
+ // is `a + b` and the map is `[a -> (a ASC, a DESC), b -> (b ASC)]`, we get
+ // `vec![HashSet(a ASC, b ASC), HashSet(a DESC, b ASC)]`.
+ expr_to_sort_exprs
+ .values()
+ .multi_cartesian_product()
+ .map(|referred_deps| referred_deps.into_iter().cloned().collect())
+ .collect()
+}
+
+/// This function recursively analyzes the dependencies of the given sort
+/// expression within the given dependency map to construct lexicographical
+/// orderings that include the sort expression and its dependencies.
+///
+/// # Parameters
+///
+/// - `referred_sort_expr`: A reference to the sort expression
(`PhysicalSortExpr`)
+/// for which lexicographical orderings satisfying its dependencies are to be
+/// constructed.
+/// - `dependency_map`: A reference to the `DependencyMap` that contains
+/// dependencies for different `PhysicalSortExpr`s.
+///
+/// # Returns
+///
+/// A vector of lexicographical orderings (`Vec<LexOrdering>`) based on the
given
+/// sort expression and its dependencies.
+fn construct_orderings(
+ referred_sort_expr: &PhysicalSortExpr,
+ dependency_map: &DependencyMap,
+) -> Vec<LexOrdering> {
+ // We are sure that `referred_sort_expr` is inside `dependency_map`.
+ let node = &dependency_map[referred_sort_expr];
+    // Since we work on intermediate nodes, we are sure `node.target_sort_expr`
+ // exists.
+ let target_sort_expr = node.target_sort_expr.clone().unwrap();
+ if node.dependencies.is_empty() {
+ vec![vec![target_sort_expr]]
+ } else {
+ node.dependencies
+ .iter()
+ .flat_map(|dep| {
+ let mut orderings = construct_orderings(dep, dependency_map);
+ for ordering in orderings.iter_mut() {
+ ordering.push(target_sort_expr.clone())
+ }
+ orderings
+ })
+ .collect()
+ }
+}
+
+/// This function retrieves the dependencies of the given relevant sort
expression
+/// from the given dependency map. It then constructs prefix orderings by
recursively
+/// analyzing the dependencies and including them in the orderings.
+///
+/// # Parameters
+///
+/// - `relevant_sort_expr`: A reference to the relevant sort expression
+/// (`PhysicalSortExpr`) for which prefix orderings are to be constructed.
+/// - `dependency_map`: A reference to the `DependencyMap` containing
dependencies.
+///
+/// # Returns
+///
+/// A vector of prefix orderings (`Vec<LexOrdering>`) based on the given
relevant
+/// sort expression and its dependencies.
+fn construct_prefix_orderings(
+ relevant_sort_expr: &PhysicalSortExpr,
+ dependency_map: &DependencyMap,
+) -> Vec<LexOrdering> {
+ dependency_map[relevant_sort_expr]
+ .dependencies
+ .iter()
+ .flat_map(|dep| construct_orderings(dep, dependency_map))
+ .collect()
+}
+
+/// Given a set of relevant dependencies (`dependencies`) and a map of
dependencies
+/// (`dependency_map`), this function generates all possible prefix orderings
+/// based on the given dependencies.
+///
+/// # Parameters
+///
+/// * `dependencies` - A reference to the dependencies.
+/// * `dependency_map` - A reference to the map of dependencies for
expressions.
+///
+/// # Returns
+///
+/// A vector of lexical orderings (`Vec<LexOrdering>`) representing all valid
orderings
+/// based on the given dependencies.
+fn generate_dependency_orderings(
+ dependencies: &Dependencies,
+ dependency_map: &DependencyMap,
+) -> Vec<LexOrdering> {
+ // Construct all the valid prefix orderings for each expression appearing
+ // in the projection:
+ let relevant_prefixes = dependencies
+ .iter()
+ .flat_map(|dep| {
+ let prefixes = construct_prefix_orderings(dep, dependency_map);
+ (!prefixes.is_empty()).then_some(prefixes)
+ })
+ .collect::<Vec<_>>();
+
+ // No dependency, dependent is a leading ordering.
+ if relevant_prefixes.is_empty() {
+ // Return an empty ordering:
+ return vec![vec![]];
+ }
+
+ // Generate all possible orderings where dependencies are satisfied for the
+ // current projection expression. For example, if expression is `a + b
ASC`,
+ // and the dependency for `a ASC` is `[c ASC]`, the dependency for `b ASC`
+ // is `[d DESC]`, then we generate `[c ASC, d DESC, a + b ASC]` and
+ // `[d DESC, c ASC, a + b ASC]`.
+ relevant_prefixes
+ .into_iter()
+ .multi_cartesian_product()
+ .flat_map(|prefix_orderings| {
+ prefix_orderings
+ .iter()
+ .permutations(prefix_orderings.len())
+ .map(|prefixes|
prefixes.into_iter().flatten().cloned().collect())
+ .collect::<Vec<_>>()
+ })
+ .collect()
+}
+
+/// This function examines the given expression and the sort expressions it
+/// refers to in order to determine the ordering properties of the expression.
+///
+/// # Parameters
+///
+/// - `expr`: A reference to the source expression (`Arc<dyn PhysicalExpr>`)
for
+/// which ordering properties need to be determined.
+/// - `dependencies`: A reference to `Dependencies`, containing sort
expressions
+/// referred to by `expr`.
+///
+/// # Returns
+///
+/// A `SortProperties` indicating the ordering information of the given
expression.
+fn get_expr_ordering(
+ expr: &Arc<dyn PhysicalExpr>,
+ dependencies: &Dependencies,
+) -> SortProperties {
+ if let Some(column_order) = dependencies.iter().find(|&order|
expr.eq(&order.expr)) {
+ // If exact match is found, return its ordering.
+ SortProperties::Ordered(column_order.options)
+ } else {
+ // Find orderings of its children
+ let child_states = expr
+ .children()
+ .iter()
+ .map(|child| get_expr_ordering(child, dependencies))
+ .collect::<Vec<_>>();
+ // Calculate expression ordering using ordering of its children.
+ expr.get_ordering(&child_states)
+ }
+}
+
+/// Represents a node in the dependency map used to construct projected
orderings.
+///
+/// A `DependencyNode` contains information about a particular sort expression,
+/// including its target sort expression and a set of dependencies on other
sort
+/// expressions.
+///
+/// # Fields
+///
+/// - `target_sort_expr`: An optional `PhysicalSortExpr` representing the
target
+/// sort expression associated with the node. It is `None` if the sort
expression
+/// cannot be projected.
+/// - `dependencies`: A [`Dependencies`] containing dependencies on other sort
+/// expressions that are referred to by the target sort expression.
+#[derive(Debug, Clone, PartialEq, Eq)]
+struct DependencyNode {
+ target_sort_expr: Option<PhysicalSortExpr>,
+ dependencies: Dependencies,
+}
+
+impl DependencyNode {
+    // Insert the dependency into the state (if it exists).
+ fn insert_dependency(&mut self, dependency: Option<&PhysicalSortExpr>) {
+ if let Some(dep) = dependency {
+ self.dependencies.insert(dep.clone());
+ }
+ }
+}
+
+type DependencyMap = HashMap<PhysicalSortExpr, DependencyNode>;
+type Dependencies = HashSet<PhysicalSortExpr>;
+
/// Calculate ordering equivalence properties for the given join operation.
pub fn join_equivalence_properties(
left: EquivalenceProperties,
@@ -1544,7 +1959,7 @@ mod tests {
use arrow::datatypes::{DataType, Field, Schema};
use arrow_array::{ArrayRef, Float64Array, RecordBatch, UInt32Array};
use arrow_schema::{Fields, SortOptions, TimeUnit};
- use datafusion_common::{Result, ScalarValue};
+ use datafusion_common::{plan_datafusion_err, DataFusionError, Result,
ScalarValue};
use datafusion_expr::{BuiltinScalarFunction, Operator};
use itertools::{izip, Itertools};
@@ -1552,6 +1967,37 @@ mod tests {
use rand::seq::SliceRandom;
use rand::{Rng, SeedableRng};
+ fn output_schema(
+ mapping: &ProjectionMapping,
+ input_schema: &Arc<Schema>,
+ ) -> Result<SchemaRef> {
+ // Calculate output schema
+ let fields: Result<Vec<Field>> = mapping
+ .iter()
+ .map(|(source, target)| {
+ let name = target
+ .as_any()
+ .downcast_ref::<Column>()
+ .ok_or_else(|| plan_datafusion_err!("Expects to have
column"))?
+ .name();
+ let field = Field::new(
+ name,
+ source.data_type(input_schema)?,
+ source.nullable(input_schema)?,
+ );
+
+ Ok(field)
+ })
+ .collect();
+
+ let output_schema = Arc::new(Schema::new_with_metadata(
+ fields?,
+ input_schema.metadata().clone(),
+ ));
+
+ Ok(output_schema)
+ }
+
// Generate a schema which consists of 8 columns (a, b, c, d, e, f, g, h)
fn create_test_schema() -> Result<SchemaRef> {
let a = Field::new("a", DataType::Int32, true);
@@ -1679,7 +2125,7 @@ mod tests {
.map(|(expr, options)| {
PhysicalSortRequirement::new((*expr).clone(), *options)
})
- .collect::<Vec<_>>()
+ .collect()
}
// Convert each tuple to PhysicalSortExpr
@@ -1692,7 +2138,7 @@ mod tests {
expr: (*expr).clone(),
options: *options,
})
- .collect::<Vec<_>>()
+ .collect()
}
// Convert each inner tuple to PhysicalSortExpr
@@ -1705,6 +2151,56 @@ mod tests {
.collect()
}
+ // Convert each tuple to PhysicalSortExpr
+ fn convert_to_sort_exprs_owned(
+ in_data: &[(Arc<dyn PhysicalExpr>, SortOptions)],
+ ) -> Vec<PhysicalSortExpr> {
+ in_data
+ .iter()
+ .map(|(expr, options)| PhysicalSortExpr {
+ expr: (*expr).clone(),
+ options: *options,
+ })
+ .collect()
+ }
+
+ // Convert each inner tuple to PhysicalSortExpr
+ fn convert_to_orderings_owned(
+ orderings: &[Vec<(Arc<dyn PhysicalExpr>, SortOptions)>],
+ ) -> Vec<Vec<PhysicalSortExpr>> {
+ orderings
+ .iter()
+ .map(|sort_exprs| convert_to_sort_exprs_owned(sort_exprs))
+ .collect()
+ }
+
+ // Apply projection to the input_data, return projected equivalence
properties and record batch
+ fn apply_projection(
+ proj_exprs: Vec<(Arc<dyn PhysicalExpr>, String)>,
+ input_data: &RecordBatch,
+ input_eq_properties: &EquivalenceProperties,
+ ) -> Result<(RecordBatch, EquivalenceProperties)> {
+ let input_schema = input_data.schema();
+ let projection_mapping = ProjectionMapping::try_new(&proj_exprs,
&input_schema)?;
+
+ let output_schema = output_schema(&projection_mapping, &input_schema)?;
+ let num_rows = input_data.num_rows();
+ // Apply projection to the input record batch.
+ let projected_values = projection_mapping
+ .iter()
+ .map(|(source, _target)|
source.evaluate(input_data)?.into_array(num_rows))
+ .collect::<Result<Vec<_>>>()?;
+ let projected_batch = if projected_values.is_empty() {
+ RecordBatch::new_empty(output_schema.clone())
+ } else {
+ RecordBatch::try_new(output_schema.clone(), projected_values)?
+ };
+
+ let projected_eq =
+ input_eq_properties.project(&projection_mapping, output_schema);
+ Ok((projected_batch, projected_eq))
+ }
+
#[test]
fn add_equal_conditions_test() -> Result<()> {
let schema = Arc::new(Schema::new(vec![
@@ -1774,13 +2270,16 @@ mod tests {
let input_properties =
EquivalenceProperties::new(input_schema.clone());
let col_a = col("a", &input_schema)?;
- let out_schema = Arc::new(Schema::new(vec![
- Field::new("a1", DataType::Int64, true),
- Field::new("a2", DataType::Int64, true),
- Field::new("a3", DataType::Int64, true),
- Field::new("a4", DataType::Int64, true),
- ]));
+ // a as a1, a as a2, a as a3, a as a3
+ let proj_exprs = vec![
+ (col_a.clone(), "a1".to_string()),
+ (col_a.clone(), "a2".to_string()),
+ (col_a.clone(), "a3".to_string()),
+ (col_a.clone(), "a4".to_string()),
+ ];
+ let projection_mapping = ProjectionMapping::try_new(&proj_exprs,
&input_schema)?;
+ let out_schema = output_schema(&projection_mapping, &input_schema)?;
// a as a1, a as a2, a as a3, a as a3
let proj_exprs = vec![
(col_a.clone(), "a1".to_string()),
@@ -3686,30 +4185,1143 @@ mod tests {
}
#[test]
- fn test_expr_consists_of_constants() -> Result<()> {
+ fn project_orderings() -> Result<()> {
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, true),
Field::new("b", DataType::Int32, true),
Field::new("c", DataType::Int32, true),
Field::new("d", DataType::Int32, true),
+ Field::new("e", DataType::Int32, true),
Field::new("ts", DataType::Timestamp(TimeUnit::Nanosecond, None),
true),
]));
- let col_a = col("a", &schema)?;
- let col_b = col("b", &schema)?;
- let col_d = col("d", &schema)?;
+ let col_a = &col("a", &schema)?;
+ let col_b = &col("b", &schema)?;
+ let col_c = &col("c", &schema)?;
+ let col_d = &col("d", &schema)?;
+ let col_e = &col("e", &schema)?;
+ let col_ts = &col("ts", &schema)?;
+ let interval =
Arc::new(Literal::new(ScalarValue::IntervalDayTime(Some(2))))
+ as Arc<dyn PhysicalExpr>;
+ let date_bin_func = &create_physical_expr(
+ &BuiltinScalarFunction::DateBin,
+ &[interval, col_ts.clone()],
+ &schema,
+ &ExecutionProps::default(),
+ )?;
+ let a_plus_b = Arc::new(BinaryExpr::new(
+ col_a.clone(),
+ Operator::Plus,
+ col_b.clone(),
+ )) as Arc<dyn PhysicalExpr>;
let b_plus_d = Arc::new(BinaryExpr::new(
col_b.clone(),
Operator::Plus,
col_d.clone(),
)) as Arc<dyn PhysicalExpr>;
+ let b_plus_e = Arc::new(BinaryExpr::new(
+ col_b.clone(),
+ Operator::Plus,
+ col_e.clone(),
+ )) as Arc<dyn PhysicalExpr>;
+ let c_plus_d = Arc::new(BinaryExpr::new(
+ col_c.clone(),
+ Operator::Plus,
+ col_d.clone(),
+ )) as Arc<dyn PhysicalExpr>;
- let constants = vec![col_a.clone(), col_b.clone()];
- let expr = b_plus_d.clone();
- assert!(!is_constant_recurse(&constants, &expr));
+ let option_asc = SortOptions {
+ descending: false,
+ nulls_first: false,
+ };
+ let option_desc = SortOptions {
+ descending: true,
+ nulls_first: true,
+ };
- let constants = vec![col_a.clone(), col_b.clone(), col_d.clone()];
- let expr = b_plus_d.clone();
- assert!(is_constant_recurse(&constants, &expr));
+ let test_cases = vec![
+ // ---------- TEST CASE 1 ------------
+ (
+ // orderings
+ vec![
+ // [b ASC]
+ vec![(col_b, option_asc)],
+ ],
+ // projection exprs
+ vec![(col_b, "b_new".to_string()), (col_a,
"a_new".to_string())],
+ // expected
+ vec![
+ // [b_new ASC]
+ vec![("b_new", option_asc)],
+ ],
+ ),
+ // ---------- TEST CASE 2 ------------
+ (
+ // orderings
+ vec![
+ // empty ordering
+ ],
+ // projection exprs
+ vec![(col_c, "c_new".to_string()), (col_b,
"b_new".to_string())],
+ // expected
+ vec![
+ // no ordering at the output
+ ],
+ ),
+ // ---------- TEST CASE 3 ------------
+ (
+ // orderings
+ vec![
+ // [ts ASC]
+ vec![(col_ts, option_asc)],
+ ],
+ // projection exprs
+ vec![
+ (col_b, "b_new".to_string()),
+ (col_a, "a_new".to_string()),
+ (col_ts, "ts_new".to_string()),
+ (date_bin_func, "date_bin_res".to_string()),
+ ],
+ // expected
+ vec![
+ // [date_bin_res ASC]
+ vec![("date_bin_res", option_asc)],
+ // [ts_new ASC]
+ vec![("ts_new", option_asc)],
+ ],
+ ),
+ // ---------- TEST CASE 4 ------------
+ (
+ // orderings
+ vec![
+ // [a ASC, ts ASC]
+ vec![(col_a, option_asc), (col_ts, option_asc)],
+ // [b ASC, ts ASC]
+ vec![(col_b, option_asc), (col_ts, option_asc)],
+ ],
+ // projection exprs
+ vec![
+ (col_b, "b_new".to_string()),
+ (col_a, "a_new".to_string()),
+ (col_ts, "ts_new".to_string()),
+ (date_bin_func, "date_bin_res".to_string()),
+ ],
+ // expected
+ vec![
+ // [a_new ASC, ts_new ASC]
+ vec![("a_new", option_asc), ("ts_new", option_asc)],
+ // [a_new ASC, date_bin_res ASC]
+ vec![("a_new", option_asc), ("date_bin_res", option_asc)],
+ // [b_new ASC, ts_new ASC]
+ vec![("b_new", option_asc), ("ts_new", option_asc)],
+ // [b_new ASC, date_bin_res ASC]
+ vec![("b_new", option_asc), ("date_bin_res", option_asc)],
+ ],
+ ),
+ // ---------- TEST CASE 5 ------------
+ (
+ // orderings
+ vec![
+ // [a + b ASC]
+ vec![(&a_plus_b, option_asc)],
+ ],
+ // projection exprs
+ vec![
+ (col_b, "b_new".to_string()),
+ (col_a, "a_new".to_string()),
+ (&a_plus_b, "a+b".to_string()),
+ ],
+ // expected
+ vec![
+ // [a + b ASC]
+ vec![("a+b", option_asc)],
+ ],
+ ),
+ // ---------- TEST CASE 6 ------------
+ (
+ // orderings
+ vec![
+ // [a + b ASC, c ASC]
+ vec![(&a_plus_b, option_asc), (&col_c, option_asc)],
+ ],
+ // projection exprs
+ vec![
+ (col_b, "b_new".to_string()),
+ (col_a, "a_new".to_string()),
+ (col_c, "c_new".to_string()),
+ (&a_plus_b, "a+b".to_string()),
+ ],
+ // expected
+ vec![
+ // [a + b ASC, c_new ASC]
+ vec![("a+b", option_asc), ("c_new", option_asc)],
+ ],
+ ),
+ // ------- TEST CASE 7 ----------
+ (
+ vec![
+ // [a ASC, b ASC, c ASC]
+ vec![(col_a, option_asc), (col_b, option_asc)],
+ // [a ASC, d ASC]
+ vec![(col_a, option_asc), (col_d, option_asc)],
+ ],
+ // b as b_new, a as a_new, d as d_new b+d
+ vec![
+ (col_b, "b_new".to_string()),
+ (col_a, "a_new".to_string()),
+ (col_d, "d_new".to_string()),
+ (&b_plus_d, "b+d".to_string()),
+ ],
+ // expected
+ vec![
+ // [a_new ASC, b_new ASC]
+ vec![("a_new", option_asc), ("b_new", option_asc)],
+ // [a_new ASC, d_new ASC]
+ vec![("a_new", option_asc), ("d_new", option_asc)],
+ // [a_new ASC, b+d ASC]
+ vec![("a_new", option_asc), ("b+d", option_asc)],
+ ],
+ ),
+ // ------- TEST CASE 8 ----------
+ (
+ // orderings
+ vec![
+ // [b+d ASC]
+ vec![(&b_plus_d, option_asc)],
+ ],
+ // proj exprs
+ vec![
+ (col_b, "b_new".to_string()),
+ (col_a, "a_new".to_string()),
+ (col_d, "d_new".to_string()),
+ (&b_plus_d, "b+d".to_string()),
+ ],
+ // expected
+ vec![
+ // [b+d ASC]
+ vec![("b+d", option_asc)],
+ ],
+ ),
+ // ------- TEST CASE 9 ----------
+ (
+ // orderings
+ vec![
+ // [a ASC, d ASC, b ASC]
+ vec![
+ (col_a, option_asc),
+ (col_d, option_asc),
+ (col_b, option_asc),
+ ],
+ // [c ASC]
+ vec![(col_c, option_asc)],
+ ],
+ // proj exprs
+ vec![
+ (col_b, "b_new".to_string()),
+ (col_a, "a_new".to_string()),
+ (col_d, "d_new".to_string()),
+ (col_c, "c_new".to_string()),
+ ],
+ // expected
+ vec![
+ // [a_new ASC, d_new ASC, b_new ASC]
+ vec![
+ ("a_new", option_asc),
+ ("d_new", option_asc),
+ ("b_new", option_asc),
+ ],
+ // [c_new ASC],
+ vec![("c_new", option_asc)],
+ ],
+ ),
+ // ------- TEST CASE 10 ----------
+ (
+ vec![
+ // [a ASC, b ASC, c ASC]
+ vec![
+ (col_a, option_asc),
+ (col_b, option_asc),
+ (col_c, option_asc),
+ ],
+ // [a ASC, d ASC]
+ vec![(col_a, option_asc), (col_d, option_asc)],
+ ],
+ // proj exprs
+ vec![
+ (col_b, "b_new".to_string()),
+ (col_a, "a_new".to_string()),
+ (col_c, "c_new".to_string()),
+ (&c_plus_d, "c+d".to_string()),
+ ],
+ // expected
+ vec![
+ // [a_new ASC, b_new ASC, c_new ASC]
+ vec![
+ ("a_new", option_asc),
+ ("b_new", option_asc),
+ ("c_new", option_asc),
+ ],
+ // [a_new ASC, b_new ASC, c+d ASC]
+ vec![
+ ("a_new", option_asc),
+ ("b_new", option_asc),
+ ("c+d", option_asc),
+ ],
+ ],
+ ),
+ // ------- TEST CASE 11 ----------
+ (
+ // orderings
+ vec![
+ // [a ASC, b ASC]
+ vec![(col_a, option_asc), (col_b, option_asc)],
+ // [a ASC, d ASC]
+ vec![(col_a, option_asc), (col_d, option_asc)],
+ ],
+ // proj exprs
+ vec![
+ (col_b, "b_new".to_string()),
+ (col_a, "a_new".to_string()),
+ (&b_plus_d, "b+d".to_string()),
+ ],
+ // expected
+ vec![
+ // [a_new ASC, b_new ASC]
+ vec![("a_new", option_asc), ("b_new", option_asc)],
+ // [a_new ASC, b + d ASC]
+ vec![("a_new", option_asc), ("b+d", option_asc)],
+ ],
+ ),
+ // ------- TEST CASE 12 ----------
+ (
+ // orderings
+ vec![
+ // [a ASC, b ASC, c ASC]
+ vec![
+ (col_a, option_asc),
+ (col_b, option_asc),
+ (col_c, option_asc),
+ ],
+ ],
+ // proj exprs
+ vec![(col_c, "c_new".to_string()), (col_a,
"a_new".to_string())],
+ // expected
+ vec![
+ // [a_new ASC]
+ vec![("a_new", option_asc)],
+ ],
+ ),
+ // ------- TEST CASE 13 ----------
+ (
+ // orderings
+ vec![
+ // [a ASC, b ASC, c ASC]
+ vec![
+ (col_a, option_asc),
+ (col_b, option_asc),
+ (col_c, option_asc),
+ ],
+ // [a ASC, a + b ASC, c ASC]
+ vec![
+ (col_a, option_asc),
+ (&a_plus_b, option_asc),
+ (col_c, option_asc),
+ ],
+ ],
+ // proj exprs
+ vec![
+ (col_c, "c_new".to_string()),
+ (col_b, "b_new".to_string()),
+ (col_a, "a_new".to_string()),
+ (&a_plus_b, "a+b".to_string()),
+ ],
+ // expected
+ vec![
+ // [a_new ASC, b_new ASC, c_new ASC]
+ vec![
+ ("a_new", option_asc),
+ ("b_new", option_asc),
+ ("c_new", option_asc),
+ ],
+ // [a_new ASC, a+b ASC, c_new ASC]
+ vec![
+ ("a_new", option_asc),
+ ("a+b", option_asc),
+ ("c_new", option_asc),
+ ],
+ ],
+ ),
+ // ------- TEST CASE 14 ----------
+ (
+ // orderings
+ vec![
+ // [a ASC, b ASC]
+ vec![(col_a, option_asc), (col_b, option_asc)],
+ // [c ASC, b ASC]
+ vec![(col_c, option_asc), (col_b, option_asc)],
+ // [d ASC, e ASC]
+ vec![(col_d, option_asc), (col_e, option_asc)],
+ ],
+ // proj exprs
+ vec![
+ (col_c, "c_new".to_string()),
+ (col_d, "d_new".to_string()),
+ (col_a, "a_new".to_string()),
+ (&b_plus_e, "b+e".to_string()),
+ ],
+ // expected
+ vec![
+ // [a_new ASC, d_new ASC, b+e ASC]
+ vec![
+ ("a_new", option_asc),
+ ("d_new", option_asc),
+ ("b+e", option_asc),
+ ],
+ // [d_new ASC, a_new ASC, b+e ASC]
+ vec![
+ ("d_new", option_asc),
+ ("a_new", option_asc),
+ ("b+e", option_asc),
+ ],
+ // [c_new ASC, d_new ASC, b+e ASC]
+ vec![
+ ("c_new", option_asc),
+ ("d_new", option_asc),
+ ("b+e", option_asc),
+ ],
+ // [d_new ASC, c_new ASC, b+e ASC]
+ vec![
+ ("d_new", option_asc),
+ ("c_new", option_asc),
+ ("b+e", option_asc),
+ ],
+ ],
+ ),
+ // ------- TEST CASE 15 ----------
+ (
+ // orderings
+ vec![
+ // [a ASC, c ASC, b ASC]
+ vec![
+ (col_a, option_asc),
+ (col_c, option_asc),
+ (&col_b, option_asc),
+ ],
+ ],
+ // proj exprs
+ vec![
+ (col_c, "c_new".to_string()),
+ (col_a, "a_new".to_string()),
+ (&a_plus_b, "a+b".to_string()),
+ ],
+ // expected
+ vec![
+ // [a_new ASC, c_new ASC, a+b ASC]
+ vec![
+ ("a_new", option_asc),
+ ("c_new", option_asc),
+ ("a+b", option_asc),
+ ],
+ ],
+ ),
+ // ------- TEST CASE 16 ----------
+ (
+ // orderings
+ vec![
+ // [a ASC, b ASC]
+ vec![(col_a, option_asc), (col_b, option_asc)],
+ // [c ASC, b DESC]
+ vec![(col_c, option_asc), (col_b, option_desc)],
+ // [e ASC]
+ vec![(col_e, option_asc)],
+ ],
+ // proj exprs
+ vec![
+ (col_c, "c_new".to_string()),
+ (col_a, "a_new".to_string()),
+ (col_b, "b_new".to_string()),
+ (&b_plus_e, "b+e".to_string()),
+ ],
+ // expected
+ vec![
+ // [a_new ASC, b_new ASC]
+ vec![("a_new", option_asc), ("b_new", option_asc)],
+ // [a_new ASC, b+e ASC]
+ vec![("a_new", option_asc), ("b+e", option_asc)],
+ // [c_new ASC, b_new DESC]
+ vec![("c_new", option_asc), ("b_new", option_desc)],
+ ],
+ ),
+ ];
+
+ for (idx, (orderings, proj_exprs, expected)) in
test_cases.into_iter().enumerate()
+ {
+ let mut eq_properties = EquivalenceProperties::new(schema.clone());
+
+ let orderings = convert_to_orderings(&orderings);
+ eq_properties.add_new_orderings(orderings);
+
+ let proj_exprs = proj_exprs
+ .into_iter()
+ .map(|(expr, name)| (expr.clone(), name))
+ .collect::<Vec<_>>();
+ let projection_mapping = ProjectionMapping::try_new(&proj_exprs,
&schema)?;
+ let output_schema = output_schema(&projection_mapping, &schema)?;
+
+ let expected = expected
+ .into_iter()
+ .map(|ordering| {
+ ordering
+ .into_iter()
+ .map(|(name, options)| {
+ (col(name, &output_schema).unwrap(), options)
+ })
+ .collect::<Vec<_>>()
+ })
+ .collect::<Vec<_>>();
+ let expected = convert_to_orderings_owned(&expected);
+
+ let projected_eq = eq_properties.project(&projection_mapping,
output_schema);
+ let orderings = projected_eq.oeq_class();
+
+ let err_msg = format!(
+ "test_idx: {:?}, actual: {:?}, expected: {:?},
projection_mapping: {:?}",
+ idx, orderings.orderings, expected, projection_mapping
+ );
+
+ assert_eq!(orderings.len(), expected.len(), "{}", err_msg);
+ for expected_ordering in &expected {
+ assert!(orderings.contains(expected_ordering), "{}", err_msg)
+ }
+ }
+
+ Ok(())
+ }
+
+ #[test]
+ fn project_orderings2() -> Result<()> {
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("a", DataType::Int32, true),
+ Field::new("b", DataType::Int32, true),
+ Field::new("c", DataType::Int32, true),
+ Field::new("d", DataType::Int32, true),
+ Field::new("ts", DataType::Timestamp(TimeUnit::Nanosecond, None),
true),
+ ]));
+ let col_a = &col("a", &schema)?;
+ let col_b = &col("b", &schema)?;
+ let col_c = &col("c", &schema)?;
+ let col_ts = &col("ts", &schema)?;
+ let a_plus_b = Arc::new(BinaryExpr::new(
+ col_a.clone(),
+ Operator::Plus,
+ col_b.clone(),
+ )) as Arc<dyn PhysicalExpr>;
+ let interval =
Arc::new(Literal::new(ScalarValue::IntervalDayTime(Some(2))))
+ as Arc<dyn PhysicalExpr>;
+ let date_bin_ts = &create_physical_expr(
+ &BuiltinScalarFunction::DateBin,
+ &[interval, col_ts.clone()],
+ &schema,
+ &ExecutionProps::default(),
+ )?;
+
+ let round_c = &create_physical_expr(
+ &BuiltinScalarFunction::Round,
+ &[col_c.clone()],
+ &schema,
+ &ExecutionProps::default(),
+ )?;
+
+ let option_asc = SortOptions {
+ descending: false,
+ nulls_first: false,
+ };
+
+ let proj_exprs = vec![
+ (col_b, "b_new".to_string()),
+ (col_a, "a_new".to_string()),
+ (col_c, "c_new".to_string()),
+ (date_bin_ts, "date_bin_res".to_string()),
+ (round_c, "round_c_res".to_string()),
+ ];
+ let proj_exprs = proj_exprs
+ .into_iter()
+ .map(|(expr, name)| (expr.clone(), name))
+ .collect::<Vec<_>>();
+ let projection_mapping = ProjectionMapping::try_new(&proj_exprs,
&schema)?;
+ let output_schema = output_schema(&projection_mapping, &schema)?;
+
+ let col_a_new = &col("a_new", &output_schema)?;
+ let col_b_new = &col("b_new", &output_schema)?;
+ let col_c_new = &col("c_new", &output_schema)?;
+ let col_date_bin_res = &col("date_bin_res", &output_schema)?;
+ let col_round_c_res = &col("round_c_res", &output_schema)?;
+ let a_new_plus_b_new = Arc::new(BinaryExpr::new(
+ col_a_new.clone(),
+ Operator::Plus,
+ col_b_new.clone(),
+ )) as Arc<dyn PhysicalExpr>;
+
+ let test_cases = vec![
+ // ---------- TEST CASE 1 ------------
+ (
+ // orderings
+ vec![
+ // [a ASC]
+ vec![(col_a, option_asc)],
+ ],
+ // expected
+ vec![
+ // [a_new ASC]
+ vec![(col_a_new, option_asc)],
+ ],
+ ),
+ // ---------- TEST CASE 2 ------------
+ (
+ // orderings
+ vec![
+ // [a+b ASC]
+ vec![(&a_plus_b, option_asc)],
+ ],
+ // expected
+ vec![
+ // [a+b ASC]
+ vec![(&a_new_plus_b_new, option_asc)],
+ ],
+ ),
+ // ---------- TEST CASE 3 ------------
+ (
+ // orderings
+ vec![
+ // [a ASC, ts ASC]
+ vec![(col_a, option_asc), (col_ts, option_asc)],
+ ],
+ // expected
+ vec![
+ // [a_new ASC, date_bin_res ASC]
+ vec![(col_a_new, option_asc), (col_date_bin_res,
option_asc)],
+ ],
+ ),
+ // ---------- TEST CASE 4 ------------
+ (
+ // orderings
+ vec![
+ // [a ASC, ts ASC, b ASC]
+ vec![
+ (col_a, option_asc),
+ (col_ts, option_asc),
+ (col_b, option_asc),
+ ],
+ ],
+ // expected
+ vec![
+ // [a_new ASC, date_bin_res ASC]
+ // Please note that result is not [a_new ASC, date_bin_res
ASC, b_new ASC]
+ // because, datebin_res may not be 1-1 function. Hence
without introducing ts
+ // dependency we cannot guarantee any ordering after
date_bin_res column.
+ vec![(col_a_new, option_asc), (col_date_bin_res,
option_asc)],
+ ],
+ ),
+ // ---------- TEST CASE 5 ------------
+ (
+ // orderings
+ vec![
+ // [a ASC, c ASC]
+ vec![(col_a, option_asc), (col_c, option_asc)],
+ ],
+ // expected
+ vec![
+ // [a_new ASC, round_c_res ASC]
+ vec![(col_a_new, option_asc), (col_round_c_res,
option_asc)],
+ // [a_new ASC, c_new ASC]
+ vec![(col_a_new, option_asc), (col_c_new, option_asc)],
+ ],
+ ),
+ // ---------- TEST CASE 6 ------------
+ (
+ // orderings
+ vec![
+ // [c ASC, b ASC]
+ vec![(col_c, option_asc), (col_b, option_asc)],
+ ],
+ // expected
+ vec![
+ // [round_c_res ASC]
+ vec![(col_round_c_res, option_asc)],
+ // [c_new ASC, b_new ASC]
+ vec![(col_c_new, option_asc), (col_b_new, option_asc)],
+ ],
+ ),
+ // ---------- TEST CASE 7 ------------
+ (
+ // orderings
+ vec![
+ // [a+b ASC, c ASC]
+ vec![(&a_plus_b, option_asc), (col_c, option_asc)],
+ ],
+ // expected
+ vec![
+ // [a+b ASC, round(c) ASC]
+ vec![
+ (&a_new_plus_b_new, option_asc),
+ (&col_round_c_res, option_asc),
+ ],
+ // [a+b ASC, c_new ASC]
+ vec![(&a_new_plus_b_new, option_asc), (col_c_new,
option_asc)],
+ ],
+ ),
+ ];
+
+ for (idx, (orderings, expected)) in test_cases.iter().enumerate() {
+ let mut eq_properties = EquivalenceProperties::new(schema.clone());
+
+ let orderings = convert_to_orderings(orderings);
+ eq_properties.add_new_orderings(orderings);
+
+ let expected = convert_to_orderings(expected);
+
+ let projected_eq =
+ eq_properties.project(&projection_mapping,
output_schema.clone());
+ let orderings = projected_eq.oeq_class();
+
+ let err_msg = format!(
+ "test idx: {:?}, actual: {:?}, expected: {:?},
projection_mapping: {:?}",
+ idx, orderings.orderings, expected, projection_mapping
+ );
+
+ assert_eq!(orderings.len(), expected.len(), "{}", err_msg);
+ for expected_ordering in &expected {
+ assert!(orderings.contains(expected_ordering), "{}", err_msg)
+ }
+ }
+ Ok(())
+ }
+
+ #[test]
+ fn project_orderings3() -> Result<()> {
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("a", DataType::Int32, true),
+ Field::new("b", DataType::Int32, true),
+ Field::new("c", DataType::Int32, true),
+ Field::new("d", DataType::Int32, true),
+ Field::new("e", DataType::Int32, true),
+ Field::new("f", DataType::Int32, true),
+ ]));
+ let col_a = &col("a", &schema)?;
+ let col_b = &col("b", &schema)?;
+ let col_c = &col("c", &schema)?;
+ let col_d = &col("d", &schema)?;
+ let col_e = &col("e", &schema)?;
+ let col_f = &col("f", &schema)?;
+ let a_plus_b = Arc::new(BinaryExpr::new(
+ col_a.clone(),
+ Operator::Plus,
+ col_b.clone(),
+ )) as Arc<dyn PhysicalExpr>;
+
+ let option_asc = SortOptions {
+ descending: false,
+ nulls_first: false,
+ };
+
+ let proj_exprs = vec![
+ (col_c, "c_new".to_string()),
+ (col_d, "d_new".to_string()),
+ (&a_plus_b, "a+b".to_string()),
+ ];
+ let proj_exprs = proj_exprs
+ .into_iter()
+ .map(|(expr, name)| (expr.clone(), name))
+ .collect::<Vec<_>>();
+ let projection_mapping = ProjectionMapping::try_new(&proj_exprs,
&schema)?;
+ let output_schema = output_schema(&projection_mapping, &schema)?;
+
+ let col_a_plus_b_new = &col("a+b", &output_schema)?;
+ let col_c_new = &col("c_new", &output_schema)?;
+ let col_d_new = &col("d_new", &output_schema)?;
+
+ let test_cases = vec![
+ // ---------- TEST CASE 1 ------------
+ (
+ // orderings
+ vec![
+ // [d ASC, b ASC]
+ vec![(col_d, option_asc), (col_b, option_asc)],
+ // [c ASC, a ASC]
+ vec![(col_c, option_asc), (col_a, option_asc)],
+ ],
+ // equal conditions
+ vec![],
+ // expected
+ vec![
+ // [d_new ASC, c_new ASC, a+b ASC]
+ vec![
+ (col_d_new, option_asc),
+ (col_c_new, option_asc),
+ (col_a_plus_b_new, option_asc),
+ ],
+ // [c_new ASC, d_new ASC, a+b ASC]
+ vec![
+ (col_c_new, option_asc),
+ (col_d_new, option_asc),
+ (col_a_plus_b_new, option_asc),
+ ],
+ ],
+ ),
+ // ---------- TEST CASE 2 ------------
+ (
+ // orderings
+ vec![
+ // [d ASC, b ASC]
+ vec![(col_d, option_asc), (col_b, option_asc)],
+ // [c ASC, e ASC], Please note that a=e
+ vec![(col_c, option_asc), (col_e, option_asc)],
+ ],
+ // equal conditions
+ vec![(col_e, col_a)],
+ // expected
+ vec![
+ // [d_new ASC, c_new ASC, a+b ASC]
+ vec![
+ (col_d_new, option_asc),
+ (col_c_new, option_asc),
+ (col_a_plus_b_new, option_asc),
+ ],
+ // [c_new ASC, d_new ASC, a+b ASC]
+ vec![
+ (col_c_new, option_asc),
+ (col_d_new, option_asc),
+ (col_a_plus_b_new, option_asc),
+ ],
+ ],
+ ),
+ // ---------- TEST CASE 3 ------------
+ (
+ // orderings
+ vec![
+ // [d ASC, b ASC]
+ vec![(col_d, option_asc), (col_b, option_asc)],
+ // [c ASC, e ASC], Please note that a=f
+ vec![(col_c, option_asc), (col_e, option_asc)],
+ ],
+ // equal conditions
+ vec![(col_a, col_f)],
+ // expected
+ vec![
+ // [d_new ASC]
+ vec![(col_d_new, option_asc)],
+ // [c_new ASC]
+ vec![(col_c_new, option_asc)],
+ ],
+ ),
+ ];
+ for (orderings, equal_columns, expected) in test_cases {
+ let mut eq_properties = EquivalenceProperties::new(schema.clone());
+ for (lhs, rhs) in equal_columns {
+ eq_properties.add_equal_conditions(lhs, rhs);
+ }
+
+ let orderings = convert_to_orderings(&orderings);
+ eq_properties.add_new_orderings(orderings);
+
+ let expected = convert_to_orderings(&expected);
+
+ let projected_eq =
+ eq_properties.project(&projection_mapping,
output_schema.clone());
+ let orderings = projected_eq.oeq_class();
+
+ let err_msg = format!(
+ "actual: {:?}, expected: {:?}, projection_mapping: {:?}",
+ orderings.orderings, expected, projection_mapping
+ );
+
+ assert_eq!(orderings.len(), expected.len(), "{}", err_msg);
+ for expected_ordering in &expected {
+ assert!(orderings.contains(expected_ordering), "{}", err_msg)
+ }
+ }
+
+ Ok(())
+ }
+
+ #[test]
+ fn project_orderings_random() -> Result<()> {
+ const N_RANDOM_SCHEMA: usize = 20;
+ const N_ELEMENTS: usize = 125;
+ const N_DISTINCT: usize = 5;
+
+ for seed in 0..N_RANDOM_SCHEMA {
+ // Create a random schema with random properties
+ let (test_schema, eq_properties) = create_random_schema(seed as
u64)?;
+ // Generate a data that satisfies properties given
+ let table_data_with_properties =
+ generate_table_for_eq_properties(&eq_properties, N_ELEMENTS,
N_DISTINCT)?;
+ // Floor(a)
+ let floor_a = create_physical_expr(
+ &BuiltinScalarFunction::Floor,
+ &[col("a", &test_schema)?],
+ &test_schema,
+ &ExecutionProps::default(),
+ )?;
+ // a + b
+ let a_plus_b = Arc::new(BinaryExpr::new(
+ col("a", &test_schema)?,
+ Operator::Plus,
+ col("b", &test_schema)?,
+ )) as Arc<dyn PhysicalExpr>;
+ let proj_exprs = vec![
+ (col("a", &test_schema)?, "a_new"),
+ (col("b", &test_schema)?, "b_new"),
+ (col("c", &test_schema)?, "c_new"),
+ (col("d", &test_schema)?, "d_new"),
+ (col("e", &test_schema)?, "e_new"),
+ (col("f", &test_schema)?, "f_new"),
+ (floor_a, "floor(a)"),
+ (a_plus_b, "a+b"),
+ ];
+
+ for n_req in 0..=proj_exprs.len() {
+ for proj_exprs in proj_exprs.iter().combinations(n_req) {
+ let proj_exprs = proj_exprs
+ .into_iter()
+ .map(|(expr, name)| (expr.clone(), name.to_string()))
+ .collect::<Vec<_>>();
+ let (projected_batch, projected_eq) = apply_projection(
+ proj_exprs.clone(),
+ &table_data_with_properties,
+ &eq_properties,
+ )?;
+
+ // Make sure each ordering after projection is valid.
+ for ordering in projected_eq.oeq_class().iter() {
+ let err_msg = format!(
+ "Error in test case ordering:{:?},
eq_properties.oeq_class: {:?}, eq_properties.eq_group: {:?},
eq_properties.constants: {:?}, proj_exprs: {:?}",
+ ordering, eq_properties.oeq_class,
eq_properties.eq_group, eq_properties.constants, proj_exprs
+ );
+ // Since ordered section satisfies schema, we expect
+ // that result will be same after sort (e.g sort was
unnecessary).
+ assert!(
+ is_table_same_after_sort(
+ ordering.clone(),
+ projected_batch.clone(),
+ )?,
+ "{}",
+ err_msg
+ );
+ }
+ }
+ }
+ }
+
+ Ok(())
+ }
+
+ #[test]
+ fn ordering_satisfy_after_projection_random() -> Result<()> {
+ const N_RANDOM_SCHEMA: usize = 20;
+ const N_ELEMENTS: usize = 125;
+ const N_DISTINCT: usize = 5;
+ const SORT_OPTIONS: SortOptions = SortOptions {
+ descending: false,
+ nulls_first: false,
+ };
+
+ for seed in 0..N_RANDOM_SCHEMA {
+ // Create a random schema with random properties
+ let (test_schema, eq_properties) = create_random_schema(seed as
u64)?;
+ // Generate a data that satisfies properties given
+ let table_data_with_properties =
+ generate_table_for_eq_properties(&eq_properties, N_ELEMENTS,
N_DISTINCT)?;
+ // Floor(a)
+ let floor_a = create_physical_expr(
+ &BuiltinScalarFunction::Floor,
+ &[col("a", &test_schema)?],
+ &test_schema,
+ &ExecutionProps::default(),
+ )?;
+ // a + b
+ let a_plus_b = Arc::new(BinaryExpr::new(
+ col("a", &test_schema)?,
+ Operator::Plus,
+ col("b", &test_schema)?,
+ )) as Arc<dyn PhysicalExpr>;
+ let proj_exprs = vec![
+ (col("a", &test_schema)?, "a_new"),
+ (col("b", &test_schema)?, "b_new"),
+ (col("c", &test_schema)?, "c_new"),
+ (col("d", &test_schema)?, "d_new"),
+ (col("e", &test_schema)?, "e_new"),
+ (col("f", &test_schema)?, "f_new"),
+ (floor_a, "floor(a)"),
+ (a_plus_b, "a+b"),
+ ];
+
+ for n_req in 0..=proj_exprs.len() {
+ for proj_exprs in proj_exprs.iter().combinations(n_req) {
+ let proj_exprs = proj_exprs
+ .into_iter()
+ .map(|(expr, name)| (expr.clone(), name.to_string()))
+ .collect::<Vec<_>>();
+ let (projected_batch, projected_eq) = apply_projection(
+ proj_exprs.clone(),
+ &table_data_with_properties,
+ &eq_properties,
+ )?;
+
+ let projection_mapping =
+ ProjectionMapping::try_new(&proj_exprs, &test_schema)?;
+
+ let projected_exprs = projection_mapping
+ .iter()
+ .map(|(_source, target)| target.clone())
+ .collect::<Vec<_>>();
+
+ for n_req in 0..=projected_exprs.len() {
+ for exprs in
projected_exprs.iter().combinations(n_req) {
+ let requirement = exprs
+ .into_iter()
+ .map(|expr| PhysicalSortExpr {
+ expr: expr.clone(),
+ options: SORT_OPTIONS,
+ })
+ .collect::<Vec<_>>();
+ let expected = is_table_same_after_sort(
+ requirement.clone(),
+ projected_batch.clone(),
+ )?;
+ let err_msg = format!(
+ "Error in test case requirement:{:?},
expected: {:?}, eq_properties.oeq_class: {:?}, eq_properties.eq_group: {:?},
eq_properties.constants: {:?}, projected_eq.oeq_class: {:?},
projected_eq.eq_group: {:?}, projected_eq.constants: {:?}, projection_mapping:
{:?}",
+ requirement, expected,
eq_properties.oeq_class, eq_properties.eq_group, eq_properties.constants,
projected_eq.oeq_class, projected_eq.eq_group, projected_eq.constants,
projection_mapping
+ );
+ // Check whether ordering_satisfy API result and
+ // experimental result matches.
+ assert_eq!(
+ projected_eq.ordering_satisfy(&requirement),
+ expected,
+ "{}",
+ err_msg
+ );
+ }
+ }
+ }
+ }
+ }
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_expr_consists_of_constants() -> Result<()> {
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("a", DataType::Int32, true),
+ Field::new("b", DataType::Int32, true),
+ Field::new("c", DataType::Int32, true),
+ Field::new("d", DataType::Int32, true),
+ Field::new("ts", DataType::Timestamp(TimeUnit::Nanosecond, None),
true),
+ ]));
+ let col_a = col("a", &schema)?;
+ let col_b = col("b", &schema)?;
+ let col_d = col("d", &schema)?;
+ let b_plus_d = Arc::new(BinaryExpr::new(
+ col_b.clone(),
+ Operator::Plus,
+ col_d.clone(),
+ )) as Arc<dyn PhysicalExpr>;
+
+ let constants = vec![col_a.clone(), col_b.clone()];
+ let expr = b_plus_d.clone();
+ assert!(!is_constant_recurse(&constants, &expr));
+
+ let constants = vec![col_a.clone(), col_b.clone(), col_d.clone()];
+ let expr = b_plus_d.clone();
+ assert!(is_constant_recurse(&constants, &expr));
+ Ok(())
+ }
+
+ #[test]
+ fn test_join_equivalence_properties() -> Result<()> {
+ let schema = create_test_schema()?;
+ let col_a = &col("a", &schema)?;
+ let col_b = &col("b", &schema)?;
+ let col_c = &col("c", &schema)?;
+ let offset = schema.fields.len();
+ let col_a2 = &add_offset_to_expr(col_a.clone(), offset);
+ let col_b2 = &add_offset_to_expr(col_b.clone(), offset);
+ let option_asc = SortOptions {
+ descending: false,
+ nulls_first: false,
+ };
+ let test_cases = vec![
+ // ------- TEST CASE 1 --------
+ // [a ASC], [b ASC]
+ (
+ // [a ASC], [b ASC]
+ vec![vec![(col_a, option_asc)], vec![(col_b, option_asc)]],
+ // [a ASC], [b ASC]
+ vec![vec![(col_a, option_asc)], vec![(col_b, option_asc)]],
+ // expected [a ASC, a2 ASC], [a ASC, b2 ASC], [b ASC, a2 ASC],
[b ASC, b2 ASC]
+ vec![
+ vec![(col_a, option_asc), (col_a2, option_asc)],
+ vec![(col_a, option_asc), (col_b2, option_asc)],
+ vec![(col_b, option_asc), (col_a2, option_asc)],
+ vec![(col_b, option_asc), (col_b2, option_asc)],
+ ],
+ ),
+ // ------- TEST CASE 2 --------
+ // [a ASC], [b ASC]
+ (
+ // [a ASC], [b ASC], [c ASC]
+ vec![
+ vec![(col_a, option_asc)],
+ vec![(col_b, option_asc)],
+ vec![(col_c, option_asc)],
+ ],
+ // [a ASC], [b ASC]
+ vec![vec![(col_a, option_asc)], vec![(col_b, option_asc)]],
+ // expected [a ASC, a2 ASC], [a ASC, b2 ASC], [b ASC, a2 ASC],
[b ASC, b2 ASC], [c ASC, a2 ASC], [c ASC, b2 ASC]
+ vec![
+ vec![(col_a, option_asc), (col_a2, option_asc)],
+ vec![(col_a, option_asc), (col_b2, option_asc)],
+ vec![(col_b, option_asc), (col_a2, option_asc)],
+ vec![(col_b, option_asc), (col_b2, option_asc)],
+ vec![(col_c, option_asc), (col_a2, option_asc)],
+ vec![(col_c, option_asc), (col_b2, option_asc)],
+ ],
+ ),
+ ];
+ for (left_orderings, right_orderings, expected) in test_cases {
+ let mut left_eq_properties =
EquivalenceProperties::new(schema.clone());
+ let mut right_eq_properties =
EquivalenceProperties::new(schema.clone());
+ let left_orderings = convert_to_orderings(&left_orderings);
+ let right_orderings = convert_to_orderings(&right_orderings);
+ let expected = convert_to_orderings(&expected);
+ left_eq_properties.add_new_orderings(left_orderings);
+ right_eq_properties.add_new_orderings(right_orderings);
+ let join_eq = join_equivalence_properties(
+ left_eq_properties,
+ right_eq_properties,
+ &JoinType::Inner,
+ Arc::new(Schema::empty()),
+ &[true, false],
+ Some(JoinSide::Left),
+ &[],
+ );
+ let orderings = &join_eq.oeq_class.orderings;
+ let err_msg = format!("expected: {:?}, actual:{:?}", expected,
orderings);
+ assert_eq!(
+ join_eq.oeq_class.orderings.len(),
+ expected.len(),
+ "{}",
+ err_msg
+ );
+ for ordering in orderings {
+ assert!(
+ expected.contains(ordering),
+ "{}, ordering: {:?}",
+ err_msg,
+ ordering
+ );
+ }
+ }
Ok(())
}
}