alamb commented on code in PR #22161:
URL: https://github.com/apache/datafusion/pull/22161#discussion_r3326861282


##########
datafusion/optimizer/src/optimize_projections/mod.rs:
##########
@@ -484,6 +504,135 @@ fn optimize_projections(
     }
 }
 
+fn remove_row_preserving_unused_unnest_from_duplicate_insensitive_input(

Review Comment:
   Some comments here might help future readers -- specifically an example SQL 
/ plan that is matched and the rewrite that is done



##########
datafusion/optimizer/src/optimize_projections/mod.rs:
##########
@@ -484,6 +504,135 @@ fn optimize_projections(
     }
 }
 
+fn remove_row_preserving_unused_unnest_from_duplicate_insensitive_input(
+    input: &LogicalPlan,
+    required_exprs: &[Expr],
+) -> Result<Option<LogicalPlan>> {
+    if required_exprs.iter().any(Expr::is_volatile) {
+        return Ok(None);
+    }
+
+    match input {
+        LogicalPlan::Unnest(unnest)
+            if can_remove_unused_unnest_for_exprs(unnest, required_exprs)? =>
+        {
+            Ok(Some(Arc::unwrap_or_clone(Arc::clone(&unnest.input))))

Review Comment:
   The `unwrap_or_clone` will never avoid the clone here, as there is always 
another live reference to the `Arc` (the unnest input)
   
   You would have to adjust this function to take the owned input which might 
be nice 



##########
datafusion/optimizer/src/optimize_projections/mod.rs:
##########
@@ -484,6 +504,135 @@ fn optimize_projections(
     }
 }
 
+fn remove_row_preserving_unused_unnest_from_duplicate_insensitive_input(
+    input: &LogicalPlan,
+    required_exprs: &[Expr],
+) -> Result<Option<LogicalPlan>> {
+    if required_exprs.iter().any(Expr::is_volatile) {
+        return Ok(None);
+    }
+
+    match input {
+        LogicalPlan::Unnest(unnest)
+            if can_remove_unused_unnest_for_exprs(unnest, required_exprs)? =>
+        {
+            Ok(Some(Arc::unwrap_or_clone(Arc::clone(&unnest.input))))
+        }
+        LogicalPlan::Projection(projection) => {
+            let LogicalPlan::Unnest(unnest) = projection.input.as_ref() else {
+                return Ok(None);
+            };
+            let required_projection_exprs = RequiredIndices::new()
+                .with_exprs(&projection.schema, required_exprs.iter())
+                .get_at_indices(&projection.expr);
+
+            if can_remove_unused_unnest_for_exprs(unnest, 
&required_projection_exprs)? {
+                Projection::try_new(required_projection_exprs, 
Arc::clone(&unnest.input))
+                    .map(LogicalPlan::Projection)
+                    .map(Some)
+            } else {
+                Ok(None)
+            }
+        }
+        _ => Ok(None),
+    }
+}
+
+fn can_remove_unused_unnest_for_exprs(unnest: &Unnest, exprs: &[Expr]) -> 
Result<bool> {
+    // This rewrite is only safe when UNNEST cannot eliminate input rows, no
+    // required expression depends on an unnested output column, and row
+    // multiplication cannot affect expression evaluation.
+    if !unnest_preserves_at_least_one_row_per_input(unnest)
+        || exprs.iter().any(Expr::is_volatile)
+    {
+        return Ok(false);
+    }
+
+    let mut columns = HashSet::new();
+    for expr in exprs {
+        expr_to_columns(expr, &mut columns)?;
+    }
+
+    for column in columns {
+        let output_index = unnest.schema.index_of_column(&column)?;
+        if is_unnested_input_index(unnest, 
unnest.dependency_indices[output_index]) {
+            return Ok(false);
+        }
+    }
+
+    Ok(true)
+}
+
+fn is_unnested_input_index(unnest: &Unnest, input_index: usize) -> bool {
+    unnest
+        .list_type_columns
+        .iter()
+        .any(|(idx, _)| *idx == input_index)
+        || unnest.struct_type_columns.contains(&input_index)
+}
+
+fn unnest_preserves_at_least_one_row_per_input(unnest: &Unnest) -> bool {
+    unnest.list_type_columns.iter().all(|(input_index, list)| {
+        list.depth == 1
+            && unnest_input_expr(unnest, *input_index)
+                .and_then(literal_non_empty_list)
+                .unwrap_or(false)
+    })
+}
+
+fn unnest_input_expr(unnest: &Unnest, input_index: usize) -> Option<&Expr> {
+    match unnest.input.as_ref() {
+        LogicalPlan::Projection(projection) => 
projection.expr.get(input_index),
+        _ => None,
+    }
+}
+
+fn literal_non_empty_list(expr: &Expr) -> Option<bool> {
+    let expr = match expr {
+        Expr::Alias(Alias { expr, .. }) => expr.as_ref(),
+        _ => expr,
+    };
+    let Expr::Literal(value, _) = expr else {
+        return None;
+    };
+
+    match value {
+        ScalarValue::List(array) => {
+            Some(has_valid_non_empty_first_value(array.as_ref(), || {
+                array.value_length(0) > 0
+            }))
+        }
+        ScalarValue::LargeList(array) => {
+            Some(has_valid_non_empty_first_value(array.as_ref(), || {
+                array.value_length(0) > 0
+            }))
+        }
+        ScalarValue::FixedSizeList(array) => {
+            Some(has_valid_non_empty_first_value(array.as_ref(), || {
+                array.value_length() > 0
+            }))
+        }
+        ScalarValue::ListView(array) => {
+            Some(has_valid_non_empty_first_value(array.as_ref(), || {
+                array.value_sizes()[0] > 0
+            }))
+        }
+        ScalarValue::LargeListView(array) => {
+            Some(has_valid_non_empty_first_value(array.as_ref(), || {
+                array.value_sizes()[0] > 0
+            }))
+        }
+        _ => None,
+    }
+}
+
+fn has_valid_non_empty_first_value(
+    array: &impl Array,
+    first_value_non_empty: impl FnOnce() -> bool,

Review Comment:
   Why does it need to be a closure? It would probably be shorter and easier to 
read if you either just passed in another boolean `first_value_non_empty` or 
just inlined this function 🤔 



##########
datafusion/optimizer/src/optimize_projections/mod.rs:
##########
@@ -484,6 +504,135 @@ fn optimize_projections(
     }
 }
 
+fn remove_row_preserving_unused_unnest_from_duplicate_insensitive_input(
+    input: &LogicalPlan,
+    required_exprs: &[Expr],
+) -> Result<Option<LogicalPlan>> {
+    if required_exprs.iter().any(Expr::is_volatile) {
+        return Ok(None);
+    }
+
+    match input {
+        LogicalPlan::Unnest(unnest)
+            if can_remove_unused_unnest_for_exprs(unnest, required_exprs)? =>
+        {
+            Ok(Some(Arc::unwrap_or_clone(Arc::clone(&unnest.input))))
+        }
+        LogicalPlan::Projection(projection) => {
+            let LogicalPlan::Unnest(unnest) = projection.input.as_ref() else {
+                return Ok(None);
+            };
+            let required_projection_exprs = RequiredIndices::new()
+                .with_exprs(&projection.schema, required_exprs.iter())
+                .get_at_indices(&projection.expr);
+
+            if can_remove_unused_unnest_for_exprs(unnest, 
&required_projection_exprs)? {
+                Projection::try_new(required_projection_exprs, 
Arc::clone(&unnest.input))
+                    .map(LogicalPlan::Projection)
+                    .map(Some)
+            } else {
+                Ok(None)
+            }
+        }
+        _ => Ok(None),
+    }
+}
+
+fn can_remove_unused_unnest_for_exprs(unnest: &Unnest, exprs: &[Expr]) -> 
Result<bool> {
+    // This rewrite is only safe when UNNEST cannot eliminate input rows, no
+    // required expression depends on an unnested output column, and row
+    // multiplication cannot affect expression evaluation.
+    if !unnest_preserves_at_least_one_row_per_input(unnest)
+        || exprs.iter().any(Expr::is_volatile)
+    {
+        return Ok(false);
+    }
+
+    let mut columns = HashSet::new();
+    for expr in exprs {
+        expr_to_columns(expr, &mut columns)?;
+    }
+
+    for column in columns {
+        let output_index = unnest.schema.index_of_column(&column)?;
+        if is_unnested_input_index(unnest, 
unnest.dependency_indices[output_index]) {
+            return Ok(false);
+        }
+    }
+
+    Ok(true)
+}
+
+fn is_unnested_input_index(unnest: &Unnest, input_index: usize) -> bool {
+    unnest
+        .list_type_columns
+        .iter()
+        .any(|(idx, _)| *idx == input_index)
+        || unnest.struct_type_columns.contains(&input_index)
+}
+
+fn unnest_preserves_at_least_one_row_per_input(unnest: &Unnest) -> bool {
+    unnest.list_type_columns.iter().all(|(input_index, list)| {
+        list.depth == 1
+            && unnest_input_expr(unnest, *input_index)
+                .and_then(literal_non_empty_list)
+                .unwrap_or(false)
+    })
+}
+
+fn unnest_input_expr(unnest: &Unnest, input_index: usize) -> Option<&Expr> {
+    match unnest.input.as_ref() {
+        LogicalPlan::Projection(projection) => 
projection.expr.get(input_index),
+        _ => None,
+    }
+}
+
+fn literal_non_empty_list(expr: &Expr) -> Option<bool> {

Review Comment:
   this also seems like it would be easier to find as a method on `Expr`



##########
datafusion/optimizer/src/optimize_projections/mod.rs:
##########
@@ -484,6 +504,135 @@ fn optimize_projections(
     }
 }
 
+fn remove_row_preserving_unused_unnest_from_duplicate_insensitive_input(
+    input: &LogicalPlan,
+    required_exprs: &[Expr],
+) -> Result<Option<LogicalPlan>> {

Review Comment:
   Could we use the `Transformed` structure here rather than `Option`? Then you 
could probably do owned updates more easily



##########
datafusion/optimizer/src/optimize_projections/mod.rs:
##########
@@ -484,6 +504,135 @@ fn optimize_projections(
     }
 }
 
+fn remove_row_preserving_unused_unnest_from_duplicate_insensitive_input(
+    input: &LogicalPlan,
+    required_exprs: &[Expr],
+) -> Result<Option<LogicalPlan>> {
+    if required_exprs.iter().any(Expr::is_volatile) {
+        return Ok(None);
+    }
+
+    match input {
+        LogicalPlan::Unnest(unnest)
+            if can_remove_unused_unnest_for_exprs(unnest, required_exprs)? =>
+        {
+            Ok(Some(Arc::unwrap_or_clone(Arc::clone(&unnest.input))))
+        }
+        LogicalPlan::Projection(projection) => {
+            let LogicalPlan::Unnest(unnest) = projection.input.as_ref() else {
+                return Ok(None);
+            };
+            let required_projection_exprs = RequiredIndices::new()
+                .with_exprs(&projection.schema, required_exprs.iter())
+                .get_at_indices(&projection.expr);
+
+            if can_remove_unused_unnest_for_exprs(unnest, 
&required_projection_exprs)? {
+                Projection::try_new(required_projection_exprs, 
Arc::clone(&unnest.input))
+                    .map(LogicalPlan::Projection)
+                    .map(Some)
+            } else {
+                Ok(None)
+            }
+        }
+        _ => Ok(None),
+    }
+}
+
+fn can_remove_unused_unnest_for_exprs(unnest: &Unnest, exprs: &[Expr]) -> 
Result<bool> {
+    // This rewrite is only safe when UNNEST cannot eliminate input rows, no
+    // required expression depends on an unnested output column, and row
+    // multiplication cannot affect expression evaluation.
+    if !unnest_preserves_at_least_one_row_per_input(unnest)
+        || exprs.iter().any(Expr::is_volatile)
+    {
+        return Ok(false);
+    }
+
+    let mut columns = HashSet::new();
+    for expr in exprs {
+        expr_to_columns(expr, &mut columns)?;
+    }
+
+    for column in columns {
+        let output_index = unnest.schema.index_of_column(&column)?;
+        if is_unnested_input_index(unnest, 
unnest.dependency_indices[output_index]) {
+            return Ok(false);
+        }
+    }
+
+    Ok(true)
+}
+
+fn is_unnested_input_index(unnest: &Unnest, input_index: usize) -> bool {
+    unnest
+        .list_type_columns
+        .iter()
+        .any(|(idx, _)| *idx == input_index)
+        || unnest.struct_type_columns.contains(&input_index)
+}
+
+fn unnest_preserves_at_least_one_row_per_input(unnest: &Unnest) -> bool {

Review Comment:
   It might be worth considering making these methods on `Unnest` so they are 
easier to discover



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to