alamb commented on code in PR #8006:
URL: https://github.com/apache/arrow-datafusion/pull/8006#discussion_r1380428109
##########
datafusion/physical-plan/src/aggregates/order/mod.rs:
##########
@@ -18,13 +18,12 @@
use arrow_array::ArrayRef;
use arrow_schema::Schema;
use datafusion_common::Result;
-use datafusion_physical_expr::EmitTo;
-
-use super::{AggregationOrdering, GroupByOrderMode};
+use datafusion_physical_expr::{EmitTo, PhysicalSortExpr};
mod full;
mod partial;
+use crate::windows::PartitionSearchMode;
Review Comment:
This change effectively removes the duplication between `GroupByOrderMode`
and `PartitionSearchMode`, which represent the same thing, right?
I find `PartitionSearchMode` a confusing name, as the term "partition" is
already pretty overloaded (e.g. each ExecutionPlan has input/output partitions,
and WindowExec deals with partitions).
Also, the fact that it lives in the `windows` module seems like a mismatch,
given that it is now used in the aggregation logic.
Maybe it could go into its own module 🤔 and be called `SortOrderMode` or
something similar.
Also, can we clarify what `PartitionSearchMode::PartiallySorted` means --
specifically, does it mean that a prefix of the columns is sorted?
This could all be done as follow-on PRs.
##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -331,16 +333,14 @@ impl GroupedHashAggregateStream {
let reservation = MemoryConsumer::new(name)
.with_can_spill(true)
.register(context.memory_pool());
-
- let group_ordering = agg
- .aggregation_ordering
- .as_ref()
- .map(|aggregation_ordering| {
- GroupOrdering::try_new(&group_schema, aggregation_ordering)
- })
- // return error if any
- .transpose()?
- .unwrap_or(GroupOrdering::None);
+ let (ordering, _) = agg
Review Comment:
👌 very nice
##########
datafusion/physical-plan/src/coalesce_partitions.rs:
##########
@@ -100,10 +100,6 @@ impl ExecutionPlan for CoalescePartitionsExec {
None
}
- fn equivalence_properties(&self) -> EquivalenceProperties {
Review Comment:
It seems like the equivalence properties would not be changed by
`CoalescePartitionsExec` (though the sort order would be, of course).
##########
datafusion/physical-plan/src/projection.rs:
##########
@@ -100,63 +93,20 @@ impl ProjectionExec {
input_schema.metadata().clone(),
));
- // construct a map from the input columns to the output columns of the Projection
- let mut columns_map: HashMap<Column, Vec<Column>> = HashMap::new();
- for (expr_idx, (expression, name)) in expr.iter().enumerate() {
- if let Some(column) = expression.as_any().downcast_ref::<Column>() {
- // For some executors, logical and physical plan schema fields
- // are not the same. The information in a `Column` comes from
- // the logical plan schema. Therefore, to produce correct results
- // we use the field in the input schema with the same index. This
- // corresponds to the physical plan `Column`.
- let idx = column.index();
- let matching_input_field = input_schema.field(idx);
- let matching_input_column = Column::new(matching_input_field.name(), idx);
- let entry = columns_map.entry(matching_input_column).or_default();
- entry.push(Column::new(name, expr_idx));
- };
- }
-
- // Output Ordering need to respect the alias
- let child_output_ordering = input.output_ordering();
- let output_ordering = match child_output_ordering {
- Some(sort_exprs) => {
- let normalized_exprs = sort_exprs
- .iter()
- .map(|sort_expr| {
- let expr = normalize_out_expr_with_columns_map(
- sort_expr.expr.clone(),
- &columns_map,
- );
- PhysicalSortExpr {
- expr,
- options: sort_expr.options,
- }
- })
- .collect::<Vec<_>>();
- Some(normalized_exprs)
- }
- None => None,
- };
-
- let orderings = find_orderings_of_exprs(
- &expr,
- input.output_ordering(),
- input.equivalence_properties(),
- input.ordering_equivalence_properties(),
- )?;
+ // construct a map from the input expressions to the output expression of the Projection
+ let projection_mapping = calculate_projection_mapping(&expr, &input_schema)?;
Review Comment:
😍
##########
datafusion/physical-expr/src/utils.rs:
##########
@@ -15,62 +15,25 @@
// specific language governing permissions and limitations
// under the License.
-use crate::equivalence::{EquivalenceProperties, OrderingEquivalenceProperties};
-use crate::expressions::{BinaryExpr, Column, UnKnownColumn};
-use crate::sort_properties::{ExprOrdering, SortProperties};
-use crate::update_ordering;
-use crate::{PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement};
+use std::borrow::Borrow;
+use std::collections::{HashMap, HashSet};
+use std::sync::Arc;
+
+use crate::expressions::{BinaryExpr, Column};
+use crate::{PhysicalExpr, PhysicalSortExpr};
use arrow::array::{make_array, Array, ArrayRef, BooleanArray, MutableArrayData};
use arrow::compute::{and_kleene, is_not_null, SlicesIterator};
use arrow::datatypes::SchemaRef;
-use arrow_schema::SortOptions;
use datafusion_common::tree_node::{
Transformed, TreeNode, TreeNodeRewriter, VisitRecursion,
};
-use datafusion_common::utils::longest_consecutive_prefix;
use datafusion_common::Result;
use datafusion_expr::Operator;
use itertools::Itertools;
use petgraph::graph::NodeIndex;
use petgraph::stable_graph::StableGraph;
-use std::borrow::Borrow;
-use std::collections::HashMap;
-use std::collections::HashSet;
-use std::sync::Arc;
-
-/// Compare the two expr lists are equal no matter the order.
-/// For example two InListExpr can be considered to be equals no matter the order:
-///
-/// In('a','b','c') == In('c','b','a')
-pub fn expr_list_eq_any_order(
Review Comment:
FWIW, this kind of change (moving functions to other modules and renaming
them) is something that we could probably do in individual PRs, which would be
quick to review as they would be mostly mechanical.
That would make it easier to find the parts of a PR such as this one
that need more careful review.
##########
datafusion/physical-plan/src/aggregates/order/mod.rs:
##########
@@ -18,13 +18,12 @@
use arrow_array::ArrayRef;
use arrow_schema::Schema;
use datafusion_common::Result;
-use datafusion_physical_expr::EmitTo;
-
-use super::{AggregationOrdering, GroupByOrderMode};
+use datafusion_physical_expr::{EmitTo, PhysicalSortExpr};
mod full;
mod partial;
+use crate::windows::PartitionSearchMode;
Review Comment:
This change effectively removes the duplication between `GroupByOrderMode`
and `PartitionSearchMode`, which represent the same thing, right?
I find `PartitionSearchMode` a confusing name, as the term "partition" is
already pretty overloaded (e.g. each ExecutionPlan has input/output partitions,
and WindowExec deals with partitions).
Also, the fact that it lives in the `windows` module seems like a mismatch,
given that it is now used in the aggregation logic.
Maybe it could go into its own module 🤔 and be called `SortOrderMode` or
something similar.
Also, can we clarify what `PartitionSearchMode::PartiallySorted` means --
specifically, does it mean that a prefix of the columns is sorted?
This could all be done as follow-on PRs.
##########
datafusion/physical-plan/src/projection.rs:
##########
@@ -100,63 +93,20 @@ impl ProjectionExec {
input_schema.metadata().clone(),
));
- // construct a map from the input columns to the output columns of the Projection
- let mut columns_map: HashMap<Column, Vec<Column>> = HashMap::new();
- for (expr_idx, (expression, name)) in expr.iter().enumerate() {
- if let Some(column) = expression.as_any().downcast_ref::<Column>() {
- // For some executors, logical and physical plan schema fields
- // are not the same. The information in a `Column` comes from
- // the logical plan schema. Therefore, to produce correct results
- // we use the field in the input schema with the same index. This
- // corresponds to the physical plan `Column`.
- let idx = column.index();
- let matching_input_field = input_schema.field(idx);
- let matching_input_column = Column::new(matching_input_field.name(), idx);
- let entry = columns_map.entry(matching_input_column).or_default();
- entry.push(Column::new(name, expr_idx));
- };
- }
-
- // Output Ordering need to respect the alias
- let child_output_ordering = input.output_ordering();
- let output_ordering = match child_output_ordering {
- Some(sort_exprs) => {
- let normalized_exprs = sort_exprs
- .iter()
- .map(|sort_expr| {
- let expr = normalize_out_expr_with_columns_map(
- sort_expr.expr.clone(),
- &columns_map,
- );
- PhysicalSortExpr {
- expr,
- options: sort_expr.options,
- }
- })
- .collect::<Vec<_>>();
- Some(normalized_exprs)
- }
- None => None,
- };
-
- let orderings = find_orderings_of_exprs(
- &expr,
- input.output_ordering(),
- input.equivalence_properties(),
- input.ordering_equivalence_properties(),
- )?;
+ // construct a map from the input expressions to the output expression of the Projection
+ let projection_mapping = calculate_projection_mapping(&expr, &input_schema)?;
Review Comment:
😍
##########
datafusion/physical-plan/src/joins/nested_loop_join.rs:
##########
@@ -191,18 +190,6 @@ impl ExecutionPlan for NestedLoopJoinExec {
distribution_from_join_type(&self.join_type)
}
- fn equivalence_properties(&self) -> EquivalenceProperties {
Review Comment:
Shouldn't this still have `join_equivalent_properties` rather than being
removed?
##########
datafusion/physical-plan/src/common.rs:
##########
@@ -373,6 +375,38 @@ pub fn batch_byte_size(batch: &RecordBatch) -> usize {
batch.get_array_memory_size()
}
+/// Constructs the mapping between a projection's input and output
+pub fn calculate_projection_mapping(
Review Comment:
To make this easier to discover, perhaps it could be a method on
`ProjectionMapping`, such as `ProjectionMapping::try_new`.
##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -331,16 +333,14 @@ impl GroupedHashAggregateStream {
let reservation = MemoryConsumer::new(name)
.with_can_spill(true)
.register(context.memory_pool());
-
- let group_ordering = agg
- .aggregation_ordering
- .as_ref()
- .map(|aggregation_ordering| {
- GroupOrdering::try_new(&group_schema, aggregation_ordering)
- })
- // return error if any
- .transpose()?
- .unwrap_or(GroupOrdering::None);
+ let (ordering, _) = agg
Review Comment:
👌 very nice
##########
datafusion/physical-plan/src/projection.rs:
##########
@@ -60,15 +56,12 @@ pub struct ProjectionExec {
input: Arc<dyn ExecutionPlan>,
/// The output ordering
output_ordering: Option<Vec<PhysicalSortExpr>>,
- /// The columns map used to normalize out expressions like Partitioning and PhysicalSortExpr
- /// The key is the column from the input schema and the values are the columns from the output schema
- columns_map: HashMap<Column, Vec<Column>>,
+ /// The mapping used to normalize expressions like Partitioning and
+ /// PhysicalSortExpr. The key is the expression from the input schema
+ /// and the value is the expression from the output schema.
+ projection_mapping: Vec<(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)>,
Review Comment:
This is another pattern that has appeared more than once (also in GroupBy)
-- maybe it could be a `ProjectionMapping` type (which could also be made into
a struct with a constructor).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]