mingmwang commented on code in PR #5290: URL: https://github.com/apache/arrow-datafusion/pull/5290#discussion_r1114107508
########## datafusion/physical-expr/src/utils.rs: ########## @@ -235,6 +268,186 @@ pub fn ordering_satisfy_concrete<F: FnOnce() -> EquivalenceProperties>( } } +/// Checks whether the required ordering requirements are satisfied by the provided [PhysicalSortExpr]s. +pub fn ordering_satisfy_requirement<F: FnOnce() -> EquivalenceProperties>( + provided: Option<&[PhysicalSortExpr]>, + required: Option<&[PhysicalSortRequirements]>, + equal_properties: F, +) -> bool { + match (provided, required) { + (_, None) => true, + (None, Some(_)) => false, + (Some(provided), Some(required)) => { + ordering_satisfy_requirement_concrete(provided, required, equal_properties) + } + } +} + +pub fn ordering_satisfy_requirement_concrete<F: FnOnce() -> EquivalenceProperties>( + provided: &[PhysicalSortExpr], + required: &[PhysicalSortRequirements], + equal_properties: F, +) -> bool { + if required.len() > provided.len() { + false + } else if required + .iter() + .zip(provided.iter()) + .all(|(order1, order2)| order2.satisfy(order1)) + { + true + } else if let eq_classes @ [_, ..] = equal_properties().classes() { + let normalized_requirements = required + .iter() + .map(|e| { + normalize_sort_requirement_with_equivalence_properties( + e.clone(), + eq_classes, + ) + }) + .collect::<Vec<_>>(); + let normalized_provided_exprs = provided + .iter() + .map(|e| { + normalize_sort_expr_with_equivalence_properties(e.clone(), eq_classes) + }) + .collect::<Vec<_>>(); + normalized_requirements + .iter() + .zip(normalized_provided_exprs.iter()) + .all(|(order1, order2)| order2.satisfy(order1)) + } else { + false + } +} + +/// Provided requirements are compatible with the required, which means the provided requirements are equal or more specific than the required +pub fn requirements_compatible<F: FnOnce() -> EquivalenceProperties>( + provided: Option<&[PhysicalSortRequirements]>, + required: Option<&[PhysicalSortRequirements]>, + equal_properties: F, +) -> bool { + match (provided, required) { + (_, None) => true, + (None, Some(_)) => false, + (Some(provided), Some(required)) => { + if required.len() > provided.len() { + false + } else if required + .iter() + .zip(provided.iter()) + .all(|(req, pro)| pro.compatible(req)) + { + true + } else if let eq_classes @ [_, ..] = equal_properties().classes() { + let normalized_required = required + .iter() + .map(|e| { + normalize_sort_requirement_with_equivalence_properties( + e.clone(), + eq_classes, + ) + }) + .collect::<Vec<_>>(); + let normalized_provided = provided + .iter() + .map(|e| { + normalize_sort_requirement_with_equivalence_properties( + e.clone(), + eq_classes, + ) + }) + .collect::<Vec<_>>(); + normalized_required + .iter() + .zip(normalized_provided.iter()) + .all(|(req, pro)| pro.compatible(req)) + } else { + false + } + } + } +} + +pub fn map_columns_before_projection( + parent_required: &[Arc<dyn PhysicalExpr>], + proj_exprs: &[(Arc<dyn PhysicalExpr>, String)], +) -> Vec<Arc<dyn PhysicalExpr>> { + let mut column_mapping = HashMap::new(); + for (expression, name) in proj_exprs.iter() { + if let Some(column) = expression.as_any().downcast_ref::<Column>() { + column_mapping.insert(name.clone(), column.clone()); + }; + } + let new_required: Vec<Arc<dyn PhysicalExpr>> = parent_required + .iter() + .filter_map(|r| { + if let Some(column) = r.as_any().downcast_ref::<Column>() { + column_mapping.get(column.name()) + } else { + None + } + }) + .map(|e| Arc::new(e.clone()) as Arc<dyn PhysicalExpr>) + .collect::<Vec<_>>(); + new_required +} + +pub fn map_requirement_before_projection( + parent_required: Option<&[PhysicalSortRequirements]>, + proj_exprs: &[(Arc<dyn PhysicalExpr>, String)], +) -> Option<Vec<PhysicalSortRequirements>> { + if let Some(requirement) = parent_required { + let required_expr = create_sort_expr_from_requirement(requirement) + .iter() + .map(|sort_expr| sort_expr.expr.clone()) + .collect::<Vec<_>>(); + let new_exprs = map_columns_before_projection(&required_expr, proj_exprs); + if new_exprs.len() == requirement.len() { + let new_request = new_exprs + .iter() + .zip(requirement.iter()) + .map(|(new, old)| PhysicalSortRequirements { + expr: new.clone(), + sort_options: old.sort_options, + }) + .collect::<Vec<_>>(); + Some(new_request) + } else { + None + } + } else { + None + } +} + +pub fn create_sort_expr_from_requirement( + required: &[PhysicalSortRequirements], +) -> Vec<PhysicalSortExpr> { + let parent_required_expr = required + .iter() + .map(|prop| { + if prop.sort_options.is_some() { + PhysicalSortExpr { + expr: prop.expr.clone(), + options: prop.sort_options.unwrap(), + } + } else { Review Comment: Sure. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org