mustafasrepo commented on code in PR #7917:
URL: https://github.com/apache/arrow-datafusion/pull/7917#discussion_r1369829412
##########
datafusion/physical-expr/src/equivalence.rs:
##########
@@ -885,6 +885,240 @@ fn req_satisfied(given: LexOrderingRef, req:
&[PhysicalSortRequirement]) -> bool
true
}
+/// Builds the equivalence properties of a join's output from the equivalence
+/// properties of its two inputs.
+///
+/// Which input classes are carried over depends on `join_type`:
+/// - Inner, Left, Full and Right joins take the left classes as they are,
+///   followed by the right classes with every column index increased by
+///   `left_columns_len`.
+/// - LeftSemi and LeftAnti joins take only the left classes.
+/// - RightSemi and RightAnti joins take only the right classes, unshifted.
+///
+/// For inner joins, each pair in `on` additionally contributes an equality
+/// between its first column and its second column re-indexed by
+/// `left_columns_len`.
+pub fn combine_join_equivalence_properties(
+    join_type: JoinType,
+    left_properties: EquivalenceProperties,
+    right_properties: EquivalenceProperties,
+    left_columns_len: usize,
+    on: &[(Column, Column)],
+    schema: SchemaRef,
+) -> EquivalenceProperties {
+    // Re-indexes a right-side column by adding `left_columns_len` to its index.
+    let shift = |col: &Column| Column::new(col.name(), left_columns_len + col.index());
+
+    let mut combined = EquivalenceProperties::new(schema);
+    match join_type {
+        JoinType::LeftSemi | JoinType::LeftAnti => {
+            combined.extend(left_properties.classes().to_vec());
+        }
+        JoinType::RightSemi | JoinType::RightAnti => {
+            combined.extend(right_properties.classes().to_vec());
+        }
+        JoinType::Inner | JoinType::Left | JoinType::Full | JoinType::Right => {
+            combined.extend(left_properties.classes().to_vec());
+
+            let mut shifted_right = Vec::new();
+            for class in right_properties.classes().iter() {
+                let head = shift(class.head());
+                let others: Vec<_> =
+                    class.others().iter().map(|col| shift(col)).collect();
+                shifted_right.push(EquivalentClass::new(head, others));
+            }
+            combined.extend(shifted_right);
+        }
+    }
+
+    // Only inner joins register the join keys as equal columns.
+    if matches!(join_type, JoinType::Inner) {
+        for (left_key, right_key) in on.iter() {
+            let shifted_right_key = shift(right_key);
+            combined.add_equal_conditions((left_key, &shifted_right_key));
+        }
+    }
+    combined
+}
+
+/// Builds the equivalence properties of a cross join's output.
+///
+/// The left input's equivalence classes are added unchanged. The right input's
+/// classes are added after them, with the index of every column (head and
+/// others alike) increased by `left_columns_len`.
+pub fn cross_join_equivalence_properties(
+    left_properties: EquivalenceProperties,
+    right_properties: EquivalenceProperties,
+    left_columns_len: usize,
+    schema: SchemaRef,
+) -> EquivalenceProperties {
+    // Re-indexes a right-side column by adding `left_columns_len` to its index.
+    let shift = |col: &Column| Column::new(col.name(), left_columns_len + col.index());
+
+    let shifted_right: Vec<_> = right_properties
+        .classes()
+        .iter()
+        .map(|class| {
+            let head = shift(class.head());
+            let others: Vec<_> = class.others().iter().map(|col| shift(col)).collect();
+            EquivalentClass::new(head, others)
+        })
+        .collect();
+
+    let mut combined = EquivalenceProperties::new(schema);
+    combined.extend(left_properties.classes().to_vec());
+    combined.extend(shifted_right);
+    combined
+}
+
+/// Rewrites the right table's ordering equivalence class so that it:
+/// - refers to valid column indices in the join's output schema, and
+/// - is normalized with respect to the join's equivalence properties.
+///
+/// For Inner, Left, Full and Right joins the output schema combines the left
+/// and right schemas, so the right-side column indices are first offset by
+/// `left_columns_len`. For every other join type no offset is applied.
+///
+/// In both cases the class is then normalized with `join_eq_properties`, so
+/// that each expression in it is either the head of one of the equivalent
+/// classes or has no equivalent column. Once normalized this way, the
+/// expressions can safely be used for ordering equivalence normalization.
+///
+/// Returns an error if applying the offset fails.
+fn get_updated_right_ordering_equivalent_class(
+    join_type: &JoinType,
+    right_oeq_class: &OrderingEquivalentClass,
+    left_columns_len: usize,
+    join_eq_properties: &EquivalenceProperties,
+) -> Result<OrderingEquivalentClass> {
+    // Join types whose output schema is the left schema followed by the right one.
+    let needs_offset = matches!(
+        join_type,
+        JoinType::Inner | JoinType::Left | JoinType::Full | JoinType::Right
+    );
+    if !needs_offset {
+        return Ok(
+            right_oeq_class.normalize_with_equivalence_properties(join_eq_properties)
+        );
+    }
+
+    let shifted_class = right_oeq_class.add_offset(left_columns_len)?;
+    Ok(shifted_class.normalize_with_equivalence_properties(join_eq_properties))
+}
+
+/// Calculate ordering equivalence properties for the given join operation.
+pub fn combine_join_ordering_equivalence_properties(
+ join_type: &JoinType,
+ left_oeq_properties: &OrderingEquivalenceProperties,
+ right_oeq_properties: &OrderingEquivalenceProperties,
+ schema: SchemaRef,
+ maintains_input_order: &[bool],
+ probe_side: Option<JoinSide>,
+ join_eq_properties: EquivalenceProperties,
+) -> Result<OrderingEquivalenceProperties> {
+ let mut new_properties = OrderingEquivalenceProperties::new(schema);
+ let left_columns_len = left_oeq_properties.schema().fields().len();
+ // All joins have 2 children
+ assert_eq!(maintains_input_order.len(), 2);
+ let left_maintains = maintains_input_order[0];
+ let right_maintains = maintains_input_order[1];
+ match (left_maintains, right_maintains) {
+ (true, true) => return plan_err!("Cannot maintain ordering of both
sides"),
+ (true, false) => {
+ // In this special case, right side ordering can be prefixed with
left side ordering.
+ if let (
+ Some(JoinSide::Left),
+ JoinType::Inner,
+ Some(left_oeq_class),
+ Some(right_oeq_class),
+ ) = (
+ probe_side,
+ join_type,
+ left_oeq_properties.oeq_class(),
+ right_oeq_properties.oeq_class(),
+ ) {
+ let updated_right_oeq =
get_updated_right_ordering_equivalent_class(
+ join_type,
+ right_oeq_class,
+ left_columns_len,
+ &join_eq_properties,
+ )?;
+
+ // Right side ordering equivalence properties should be
prepended with
+ // those of the left side while constructing output ordering
equivalence
+ // properties since stream side is the left side.
+ //
+ // If the right table ordering equivalences contain `b ASC`,
and the output
+ // ordering of the left table is `a ASC`, then the ordering
equivalence `b ASC`
+ // for the right table should be converted to `a ASC, b ASC`
before it is added
+ // to the ordering equivalences of the join.
+ let mut orderings = vec![];
+ for left_ordering in left_oeq_class.iter() {
+ for right_ordering in updated_right_oeq.iter() {
+ let mut ordering = left_ordering.to_vec();
+ ordering.extend(right_ordering.to_vec());
+ let ordering_normalized =
+ join_eq_properties.normalize_sort_exprs(&ordering);
+ orderings.push(ordering_normalized);
+ }
+ }
Review Comment:
Previously, we were prefixing with the left ordering. We now prefix with all of
the orderings inside the left ordering equivalence class, and no longer use the
left `output_ordering`. This is the only change in this function.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]