mustafasrepo commented on code in PR #5290:
URL: https://github.com/apache/arrow-datafusion/pull/5290#discussion_r1112608135
##########
datafusion/physical-expr/src/utils.rs:
##########
@@ -235,6 +268,186 @@ pub fn ordering_satisfy_concrete<F: FnOnce() ->
EquivalenceProperties>(
}
}
+/// Checks whether the required ordering requirements are satisfied by the
provided [PhysicalSortExpr]s.
+pub fn ordering_satisfy_requirement<F: FnOnce() -> EquivalenceProperties>(
+ provided: Option<&[PhysicalSortExpr]>,
+ required: Option<&[PhysicalSortRequirements]>,
+ equal_properties: F,
+) -> bool {
+ match (provided, required) {
+ (_, None) => true,
+ (None, Some(_)) => false,
+ (Some(provided), Some(required)) => {
+ ordering_satisfy_requirement_concrete(provided, required,
equal_properties)
+ }
+ }
+}
+
+pub fn ordering_satisfy_requirement_concrete<F: FnOnce() ->
EquivalenceProperties>(
+ provided: &[PhysicalSortExpr],
+ required: &[PhysicalSortRequirements],
+ equal_properties: F,
+) -> bool {
+ if required.len() > provided.len() {
+ false
+ } else if required
+ .iter()
+ .zip(provided.iter())
+ .all(|(order1, order2)| order2.satisfy(order1))
+ {
+ true
+ } else if let eq_classes @ [_, ..] = equal_properties().classes() {
+ let normalized_requirements = required
+ .iter()
+ .map(|e| {
+ normalize_sort_requirement_with_equivalence_properties(
+ e.clone(),
+ eq_classes,
+ )
+ })
+ .collect::<Vec<_>>();
+ let normalized_provided_exprs = provided
+ .iter()
+ .map(|e| {
+ normalize_sort_expr_with_equivalence_properties(e.clone(),
eq_classes)
+ })
+ .collect::<Vec<_>>();
+ normalized_requirements
+ .iter()
+ .zip(normalized_provided_exprs.iter())
+ .all(|(order1, order2)| order2.satisfy(order1))
+ } else {
+ false
+ }
+}
+
+/// Provided requirements are compatible with the required, which means the
provided requirements are equal or more specific than the required
+pub fn requirements_compatible<F: FnOnce() -> EquivalenceProperties>(
+ provided: Option<&[PhysicalSortRequirements]>,
+ required: Option<&[PhysicalSortRequirements]>,
+ equal_properties: F,
+) -> bool {
+ match (provided, required) {
+ (_, None) => true,
+ (None, Some(_)) => false,
+ (Some(provided), Some(required)) => {
+ if required.len() > provided.len() {
+ false
+ } else if required
+ .iter()
+ .zip(provided.iter())
+ .all(|(req, pro)| pro.compatible(req))
+ {
+ true
+ } else if let eq_classes @ [_, ..] = equal_properties().classes() {
+ let normalized_required = required
+ .iter()
+ .map(|e| {
+ normalize_sort_requirement_with_equivalence_properties(
+ e.clone(),
+ eq_classes,
+ )
+ })
+ .collect::<Vec<_>>();
+ let normalized_provided = provided
+ .iter()
+ .map(|e| {
+ normalize_sort_requirement_with_equivalence_properties(
+ e.clone(),
+ eq_classes,
+ )
+ })
+ .collect::<Vec<_>>();
+ normalized_required
+ .iter()
+ .zip(normalized_provided.iter())
+ .all(|(req, pro)| pro.compatible(req))
+ } else {
+ false
+ }
+ }
+ }
+}
+
+pub fn map_columns_before_projection(
+ parent_required: &[Arc<dyn PhysicalExpr>],
+ proj_exprs: &[(Arc<dyn PhysicalExpr>, String)],
+) -> Vec<Arc<dyn PhysicalExpr>> {
+ let mut column_mapping = HashMap::new();
+ for (expression, name) in proj_exprs.iter() {
+ if let Some(column) = expression.as_any().downcast_ref::<Column>() {
+ column_mapping.insert(name.clone(), column.clone());
+ };
+ }
+ let new_required: Vec<Arc<dyn PhysicalExpr>> = parent_required
+ .iter()
+ .filter_map(|r| {
+ if let Some(column) = r.as_any().downcast_ref::<Column>() {
+ column_mapping.get(column.name())
+ } else {
+ None
+ }
+ })
+ .map(|e| Arc::new(e.clone()) as Arc<dyn PhysicalExpr>)
+ .collect::<Vec<_>>();
+ new_required
+}
+
+pub fn map_requirement_before_projection(
+ parent_required: Option<&[PhysicalSortRequirements]>,
+ proj_exprs: &[(Arc<dyn PhysicalExpr>, String)],
+) -> Option<Vec<PhysicalSortRequirements>> {
+ if let Some(requirement) = parent_required {
+ let required_expr = create_sort_expr_from_requirement(requirement)
+ .iter()
+ .map(|sort_expr| sort_expr.expr.clone())
+ .collect::<Vec<_>>();
+ let new_exprs = map_columns_before_projection(&required_expr,
proj_exprs);
+ if new_exprs.len() == requirement.len() {
+ let new_request = new_exprs
+ .iter()
+ .zip(requirement.iter())
+ .map(|(new, old)| PhysicalSortRequirements {
+ expr: new.clone(),
+ sort_options: old.sort_options,
+ })
+ .collect::<Vec<_>>();
+ Some(new_request)
+ } else {
+ None
+ }
+ } else {
+ None
+ }
+}
+
+pub fn create_sort_expr_from_requirement(
+ required: &[PhysicalSortRequirements],
+) -> Vec<PhysicalSortExpr> {
+ let parent_required_expr = required
+ .iter()
+ .map(|prop| {
+ if prop.sort_options.is_some() {
+ PhysicalSortExpr {
+ expr: prop.expr.clone(),
+ options: prop.sort_options.unwrap(),
+ }
+ } else {
Review Comment:
I think we can use `If let Some(sort_options) = prop.sort_options` idiom
here. This would remove `.unwrap()`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]