alamb commented on code in PR #8006:
URL: https://github.com/apache/arrow-datafusion/pull/8006#discussion_r1380428109


##########
datafusion/physical-plan/src/aggregates/order/mod.rs:
##########
@@ -18,13 +18,12 @@
 use arrow_array::ArrayRef;
 use arrow_schema::Schema;
 use datafusion_common::Result;
-use datafusion_physical_expr::EmitTo;
-
-use super::{AggregationOrdering, GroupByOrderMode};
+use datafusion_physical_expr::{EmitTo, PhysicalSortExpr};
 
 mod full;
 mod partial;
 
+use crate::windows::PartitionSearchMode;

Review Comment:
   This change effectively removes the duplication between `GroupByOrderMode` 
and `PartitionSearchMode`, which represent the same thing, right?
   
   I find `PartitionSearchMode` a confusing name as the term "partition" is 
pretty overloaded already (like each ExecutionPlan has input/output partitions, 
and WindowExec deals with partitions). 
   
   Also, the fact that it is in the `windows` module seems to be a mismatch, 
given it is now used in the Aggregation logic.
   
   Maybe it could go into its own module 🤔 and be called `SortOrderMode` or 
something
   
   Also, can we clarify what `PartitionSearchMode::PartiallySorted` means -- 
specifically, does it represent a prefix of the columns that is sorted?
   
   This could all be done as follow on PRs



##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -331,16 +333,14 @@ impl GroupedHashAggregateStream {
         let reservation = MemoryConsumer::new(name)
             .with_can_spill(true)
             .register(context.memory_pool());
-
-        let group_ordering = agg
-            .aggregation_ordering
-            .as_ref()
-            .map(|aggregation_ordering| {
-                GroupOrdering::try_new(&group_schema, aggregation_ordering)
-            })
-            // return error if any
-            .transpose()?
-            .unwrap_or(GroupOrdering::None);
+        let (ordering, _) = agg

Review Comment:
   👌  very nice



##########
datafusion/physical-plan/src/coalesce_partitions.rs:
##########
@@ -100,10 +100,6 @@ impl ExecutionPlan for CoalescePartitionsExec {
         None
     }
 
-    fn equivalence_properties(&self) -> EquivalenceProperties {

Review Comment:
   It seems like the equivalence properties would not be changed by 
`CoalescePartitionsExec` (the sort order would, of course)



##########
datafusion/physical-plan/src/projection.rs:
##########
@@ -100,63 +93,20 @@ impl ProjectionExec {
             input_schema.metadata().clone(),
         ));
 
-        // construct a map from the input columns to the output columns of the 
Projection
-        let mut columns_map: HashMap<Column, Vec<Column>> = HashMap::new();
-        for (expr_idx, (expression, name)) in expr.iter().enumerate() {
-            if let Some(column) = expression.as_any().downcast_ref::<Column>() 
{
-                // For some executors, logical and physical plan schema fields
-                // are not the same. The information in a `Column` comes from
-                // the logical plan schema. Therefore, to produce correct 
results
-                // we use the field in the input schema with the same index. 
This
-                // corresponds to the physical plan `Column`.
-                let idx = column.index();
-                let matching_input_field = input_schema.field(idx);
-                let matching_input_column = 
Column::new(matching_input_field.name(), idx);
-                let entry = 
columns_map.entry(matching_input_column).or_default();
-                entry.push(Column::new(name, expr_idx));
-            };
-        }
-
-        // Output Ordering need to respect the alias
-        let child_output_ordering = input.output_ordering();
-        let output_ordering = match child_output_ordering {
-            Some(sort_exprs) => {
-                let normalized_exprs = sort_exprs
-                    .iter()
-                    .map(|sort_expr| {
-                        let expr = normalize_out_expr_with_columns_map(
-                            sort_expr.expr.clone(),
-                            &columns_map,
-                        );
-                        PhysicalSortExpr {
-                            expr,
-                            options: sort_expr.options,
-                        }
-                    })
-                    .collect::<Vec<_>>();
-                Some(normalized_exprs)
-            }
-            None => None,
-        };
-
-        let orderings = find_orderings_of_exprs(
-            &expr,
-            input.output_ordering(),
-            input.equivalence_properties(),
-            input.ordering_equivalence_properties(),
-        )?;
+        // construct a map from the input expressions to the output expression 
of the Projection
+        let projection_mapping = calculate_projection_mapping(&expr, 
&input_schema)?;

Review Comment:
   😍 



##########
datafusion/physical-expr/src/utils.rs:
##########
@@ -15,62 +15,25 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::equivalence::{EquivalenceProperties, OrderingEquivalenceProperties};
-use crate::expressions::{BinaryExpr, Column, UnKnownColumn};
-use crate::sort_properties::{ExprOrdering, SortProperties};
-use crate::update_ordering;
-use crate::{PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement};
+use std::borrow::Borrow;
+use std::collections::{HashMap, HashSet};
+use std::sync::Arc;
+
+use crate::expressions::{BinaryExpr, Column};
+use crate::{PhysicalExpr, PhysicalSortExpr};
 
 use arrow::array::{make_array, Array, ArrayRef, BooleanArray, 
MutableArrayData};
 use arrow::compute::{and_kleene, is_not_null, SlicesIterator};
 use arrow::datatypes::SchemaRef;
-use arrow_schema::SortOptions;
 use datafusion_common::tree_node::{
     Transformed, TreeNode, TreeNodeRewriter, VisitRecursion,
 };
-use datafusion_common::utils::longest_consecutive_prefix;
 use datafusion_common::Result;
 use datafusion_expr::Operator;
 
 use itertools::Itertools;
 use petgraph::graph::NodeIndex;
 use petgraph::stable_graph::StableGraph;
-use std::borrow::Borrow;
-use std::collections::HashMap;
-use std::collections::HashSet;
-use std::sync::Arc;
-
-/// Compare the two expr lists are equal no matter the order.
-/// For example two InListExpr can be considered to be equals no matter the 
order:
-///
-/// In('a','b','c') == In('c','b','a')
-pub fn expr_list_eq_any_order(

Review Comment:
   FWIW this kind of change (moving functions to other modules and renaming 
them) is something that we could probably do as individual PRs that would be 
quick to review as they would be mostly mechanical. 
   
   That would help make it easier to find the parts of a PR such as this one 
that need more careful review.



##########
datafusion/physical-plan/src/aggregates/order/mod.rs:
##########
@@ -18,13 +18,12 @@
 use arrow_array::ArrayRef;
 use arrow_schema::Schema;
 use datafusion_common::Result;
-use datafusion_physical_expr::EmitTo;
-
-use super::{AggregationOrdering, GroupByOrderMode};
+use datafusion_physical_expr::{EmitTo, PhysicalSortExpr};
 
 mod full;
 mod partial;
 
+use crate::windows::PartitionSearchMode;

Review Comment:
   This change effectively removes the duplication between `GroupByOrderMode` 
and `PartitionSearchMode`, which represent the same thing, right?
   
   I find `PartitionSearchMode` a confusing name as the term "partition" is 
pretty overloaded already (like each ExecutionPlan has input/output partitions, 
and WindowExec deals with partitions). 
   
   Also, the fact that it is in the `windows` module seems to be a mismatch, 
given it is now used in the Aggregation logic.
   
   Maybe it could go into its own module 🤔 and be called `SortOrderMode` or 
something
   
   Also, can we clarify what `PartitionSearchMode::PartiallySorted` means -- 
specifically, does it represent a prefix of the columns that is sorted?
   
   This could all be done as follow on PRs



##########
datafusion/physical-plan/src/projection.rs:
##########
@@ -100,63 +93,20 @@ impl ProjectionExec {
             input_schema.metadata().clone(),
         ));
 
-        // construct a map from the input columns to the output columns of the 
Projection
-        let mut columns_map: HashMap<Column, Vec<Column>> = HashMap::new();
-        for (expr_idx, (expression, name)) in expr.iter().enumerate() {
-            if let Some(column) = expression.as_any().downcast_ref::<Column>() 
{
-                // For some executors, logical and physical plan schema fields
-                // are not the same. The information in a `Column` comes from
-                // the logical plan schema. Therefore, to produce correct 
results
-                // we use the field in the input schema with the same index. 
This
-                // corresponds to the physical plan `Column`.
-                let idx = column.index();
-                let matching_input_field = input_schema.field(idx);
-                let matching_input_column = 
Column::new(matching_input_field.name(), idx);
-                let entry = 
columns_map.entry(matching_input_column).or_default();
-                entry.push(Column::new(name, expr_idx));
-            };
-        }
-
-        // Output Ordering need to respect the alias
-        let child_output_ordering = input.output_ordering();
-        let output_ordering = match child_output_ordering {
-            Some(sort_exprs) => {
-                let normalized_exprs = sort_exprs
-                    .iter()
-                    .map(|sort_expr| {
-                        let expr = normalize_out_expr_with_columns_map(
-                            sort_expr.expr.clone(),
-                            &columns_map,
-                        );
-                        PhysicalSortExpr {
-                            expr,
-                            options: sort_expr.options,
-                        }
-                    })
-                    .collect::<Vec<_>>();
-                Some(normalized_exprs)
-            }
-            None => None,
-        };
-
-        let orderings = find_orderings_of_exprs(
-            &expr,
-            input.output_ordering(),
-            input.equivalence_properties(),
-            input.ordering_equivalence_properties(),
-        )?;
+        // construct a map from the input expressions to the output expression 
of the Projection
+        let projection_mapping = calculate_projection_mapping(&expr, 
&input_schema)?;

Review Comment:
   😍 



##########
datafusion/physical-plan/src/joins/nested_loop_join.rs:
##########
@@ -191,18 +190,6 @@ impl ExecutionPlan for NestedLoopJoinExec {
         distribution_from_join_type(&self.join_type)
     }
 
-    fn equivalence_properties(&self) -> EquivalenceProperties {

Review Comment:
   Shouldn't this still have `join_equivalent_properties` rather than being 
removed?



##########
datafusion/physical-plan/src/common.rs:
##########
@@ -373,6 +375,38 @@ pub fn batch_byte_size(batch: &RecordBatch) -> usize {
     batch.get_array_memory_size()
 }
 
+/// Constructs the mapping between a projection's input and output
+pub fn calculate_projection_mapping(

Review Comment:
   To make this easier to discover, perhaps it could be a method in 
`ProjectionMapping` such as `ProjectionMapping::try_new`



##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -331,16 +333,14 @@ impl GroupedHashAggregateStream {
         let reservation = MemoryConsumer::new(name)
             .with_can_spill(true)
             .register(context.memory_pool());
-
-        let group_ordering = agg
-            .aggregation_ordering
-            .as_ref()
-            .map(|aggregation_ordering| {
-                GroupOrdering::try_new(&group_schema, aggregation_ordering)
-            })
-            // return error if any
-            .transpose()?
-            .unwrap_or(GroupOrdering::None);
+        let (ordering, _) = agg

Review Comment:
   👌  very nice



##########
datafusion/physical-plan/src/projection.rs:
##########
@@ -60,15 +56,12 @@ pub struct ProjectionExec {
     input: Arc<dyn ExecutionPlan>,
     /// The output ordering
     output_ordering: Option<Vec<PhysicalSortExpr>>,
-    /// The columns map used to normalize out expressions like Partitioning 
and PhysicalSortExpr
-    /// The key is the column from the input schema and the values are the 
columns from the output schema
-    columns_map: HashMap<Column, Vec<Column>>,
+    /// The mapping used to normalize expressions like Partitioning and
+    /// PhysicalSortExpr. The key is the expression from the input schema
+    /// and the value is the expression from the output schema.
+    projection_mapping: Vec<(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)>,

Review Comment:
   This is another pattern that has appeared more than once (also in GroupBy) 
-- maybe it could be `ProjectionMapping` (which could also be made into a 
struct w/ a constructor)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to