alamb commented on code in PR #18176:
URL: https://github.com/apache/datafusion/pull/18176#discussion_r2447837798
##########
datafusion/physical-plan/src/projection.rs:
##########
@@ -127,42 +128,23 @@ impl ProjectionExec {
{
let input_schema = input.schema();
// convert argument to Vec<ProjectionExpr>
- let expr = expr.into_iter().map(Into::into).collect::<Vec<_>>();
+ let expr_vec = expr.into_iter().map(Into::into).collect::<Vec<_>>();
+ let projection = Projection::new(expr_vec);
- let fields: Result<Vec<Field>> = expr
- .iter()
- .map(|proj_expr| {
- let metadata = proj_expr
- .expr
- .return_field(&input_schema)?
- .metadata()
- .clone();
-
- let field = Field::new(
- &proj_expr.alias,
- proj_expr.expr.data_type(&input_schema)?,
- proj_expr.expr.nullable(&input_schema)?,
- )
- .with_metadata(metadata);
-
- Ok(field)
- })
- .collect();
-
- let schema = Arc::new(Schema::new_with_metadata(
- fields?,
- input_schema.metadata().clone(),
- ));
+ let schema = Arc::new(projection.project_schema(&input_schema)?);
// Construct a map from the input expressions to the output expression
of the Projection
let projection_mapping = ProjectionMapping::try_new(
- expr.iter().map(|p| (Arc::clone(&p.expr), p.alias.clone())),
+ projection
Review Comment:
as a follow on, this might be a nice candidate to move to Projection (namely
`fn projection_mapping()`)
##########
datafusion/physical-plan/src/projection.rs:
##########
@@ -291,7 +453,7 @@ impl ExecutionPlan for ProjectionExec {
}
fn benefits_from_input_partitioning(&self) -> Vec<bool> {
- let all_simple_exprs = self.expr.iter().all(|proj_expr| {
+ let all_simple_exprs = self.projection.as_ref().iter().all(|proj_expr|
{
Review Comment:
You could also potentially add `Projection::iter()` to iterate directly over
exprs from a Projection to avoid the `as_ref()`, which might make this code more
concise.
##########
datafusion/physical-plan/src/projection.rs:
##########
@@ -232,6 +236,163 @@ impl From<(Arc<dyn PhysicalExpr>, String)> for
ProjectionExpr {
}
}
+impl From<&(Arc<dyn PhysicalExpr>, String)> for ProjectionExpr {
+ fn from(value: &(Arc<dyn PhysicalExpr>, String)) -> Self {
+ Self::new(Arc::clone(&value.0), value.1.clone())
+ }
+}
+
+impl From<ProjectionExpr> for (Arc<dyn PhysicalExpr>, String) {
+ fn from(value: ProjectionExpr) -> Self {
+ (value.expr, value.alias)
+ }
+}
+
+/// A collection of projection expressions.
+///
+/// This struct encapsulates multiple `ProjectionExpr` instances,
+/// representing a complete projection operation and provides
+/// methods to manipulate and analyze the projection as a whole.
+#[derive(Debug, Clone)]
+pub struct Projection {
Review Comment:
What would you think about moving `Projection` to its own module -- maybe in
`physical-expr-common/` 🤔
##########
datafusion/physical-plan/src/projection.rs:
##########
@@ -232,6 +236,163 @@ impl From<(Arc<dyn PhysicalExpr>, String)> for
ProjectionExpr {
}
}
+impl From<&(Arc<dyn PhysicalExpr>, String)> for ProjectionExpr {
+ fn from(value: &(Arc<dyn PhysicalExpr>, String)) -> Self {
+ Self::new(Arc::clone(&value.0), value.1.clone())
+ }
+}
+
+impl From<ProjectionExpr> for (Arc<dyn PhysicalExpr>, String) {
+ fn from(value: ProjectionExpr) -> Self {
+ (value.expr, value.alias)
+ }
+}
+
+/// A collection of projection expressions.
+///
+/// This struct encapsulates multiple `ProjectionExpr` instances,
+/// representing a complete projection operation and provides
+/// methods to manipulate and analyze the projection as a whole.
+#[derive(Debug, Clone)]
+pub struct Projection {
+ exprs: Vec<ProjectionExpr>,
+}
+
+impl From<Vec<ProjectionExpr>> for Projection {
+ fn from(value: Vec<ProjectionExpr>) -> Self {
+ Self { exprs: value }
+ }
+}
+
+impl From<&[ProjectionExpr]> for Projection {
+ fn from(value: &[ProjectionExpr]) -> Self {
+ Self {
+ exprs: value.to_vec(),
+ }
+ }
+}
+
+impl AsRef<[ProjectionExpr]> for Projection {
+ fn as_ref(&self) -> &[ProjectionExpr] {
+ &self.exprs
+ }
+}
+
+impl Projection {
+ pub fn new(exprs: Vec<ProjectionExpr>) -> Self {
+ Self { exprs }
+ }
+
+ /// Apply another projection on top of this projection, returning the
combined projection.
+ /// For example, if this projection is `SELECT c@2 AS x, b@1 AS y, a@0 as
z` and the other projection is `SELECT x@0 + 1 AS c1, y@1 + z@2 as c2`,
+ /// we return a projection equivalent to `SELECT c@2 + 1 AS c1, b@1 + a@0
as c2`.
+ ///
+ /// # Errors
+ /// This function returns an error if any expression in the `other`
projection cannot be
+ /// applied on top of this projection.
+ pub fn try_merge(&self, other: &Projection) -> Result<Projection> {
+ let mut new_exprs = Vec::with_capacity(other.exprs.len());
+ for proj_expr in &other.exprs {
+ let new_expr = update_expr(&proj_expr.expr, &self.exprs, true)?
+ .ok_or_else(|| {
+ internal_datafusion_err!(
+ "Failed to combine projections: expression {} could
not be applied on top of existing projections {}",
+ proj_expr.expr,
+ self.exprs.iter().map(|e| format!("{e}")).join(", ")
+ )
+ })?;
+ new_exprs.push(ProjectionExpr {
+ expr: new_expr,
+ alias: proj_expr.alias.clone(),
+ });
+ }
+ Ok(Projection::new(new_exprs))
+ }
+
+ /// Extract the column indices used in this projection.
+ /// For example, for a projection `SELECT a AS x, b + 1 AS y`, where `a`
is at index 0 and `b` is at index 1,
+ /// this function would return `[0, 1]`.
+ /// Repeated indices are returned only once, and the order is ascending.
+ pub fn column_indices(&self) -> Vec<usize> {
+ self.exprs
+ .iter()
+ .flat_map(|e| collect_columns(&e.expr).into_iter().map(|col|
col.index()))
+ .sorted_unstable()
+ .dedup()
+ .collect_vec()
+ }
+
+ /// Project a schema according to this projection.
Review Comment:
this is very nice
##########
datafusion/physical-plan/src/projection.rs:
##########
@@ -308,8 +470,11 @@ impl ExecutionPlan for ProjectionExec {
self: Arc<Self>,
mut children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
- ProjectionExec::try_new(self.expr.clone(), children.swap_remove(0))
- .map(|p| Arc::new(p) as _)
+ ProjectionExec::try_new(
+ self.projection.as_ref().to_vec(),
Review Comment:
If you made `Projection` implement `IntoIterator` over `ProjectionExpr`, you
could probably simply pass in `self.projection.clone()` here, which I think
would make for a nice interface.
##########
datafusion/physical-plan/src/projection.rs:
##########
@@ -320,7 +485,12 @@ impl ExecutionPlan for ProjectionExec {
trace!("Start ProjectionExec::execute for partition {} of context
session_id {} and task_id {:?}", partition, context.session_id(),
context.task_id());
Ok(Box::pin(ProjectionStream {
schema: Arc::clone(&self.schema),
- expr: self.expr.iter().map(|x| Arc::clone(&x.expr)).collect(),
+ expr: self
Review Comment:
this pattern of getting the raw exprs also comes up a bunch -- maybe it is
worth its own method too: `Projection::expr_iter()` or something 🤔
##########
datafusion/physical-plan/src/projection.rs:
##########
@@ -750,9 +909,14 @@ pub fn update_expr(
if sync_with_child {
state = RewriteState::RewrittenValid;
// Update the index of `column`:
- Ok(Transformed::yes(Arc::clone(
- &projected_exprs[column.index()].expr,
- )))
+ let projected_expr =
projected_exprs.get(column.index()).ok_or_else(|| {
Review Comment:
We could also implement
[`Index`](https://doc.rust-lang.org/std/ops/trait.Index.html) to keep the old
syntax as well.
##########
datafusion/physical-plan/src/projection.rs:
##########
@@ -1459,4 +1633,608 @@ mod tests {
);
assert!(stats.total_byte_size.is_exact().unwrap_or(false));
}
+
+ // Tests for Projection struct
+
+ #[test]
+ fn test_projection_new() -> Result<()> {
+ let exprs = vec![
+ ProjectionExpr {
+ expr: Arc::new(Column::new("a", 0)),
+ alias: "a".to_string(),
+ },
+ ProjectionExpr {
+ expr: Arc::new(Column::new("b", 1)),
+ alias: "b".to_string(),
+ },
+ ];
+ let projection = Projection::new(exprs.clone());
+ assert_eq!(projection.as_ref().len(), 2);
+ Ok(())
+ }
+
+ #[test]
+ fn test_projection_from_vec() -> Result<()> {
+ let exprs = vec![ProjectionExpr {
+ expr: Arc::new(Column::new("x", 0)),
+ alias: "x".to_string(),
+ }];
+ let projection: Projection = exprs.clone().into();
+ assert_eq!(projection.as_ref().len(), 1);
+ Ok(())
+ }
+
+ #[test]
+ fn test_projection_as_ref() -> Result<()> {
+ let exprs = vec![
+ ProjectionExpr {
+ expr: Arc::new(Column::new("col1", 0)),
+ alias: "col1".to_string(),
+ },
+ ProjectionExpr {
+ expr: Arc::new(Column::new("col2", 1)),
+ alias: "col2".to_string(),
+ },
+ ];
+ let projection = Projection::new(exprs);
+ let as_ref: &[ProjectionExpr] = projection.as_ref();
+ assert_eq!(as_ref.len(), 2);
+ Ok(())
+ }
+
+ #[test]
+ fn test_column_indices_single_column() -> Result<()> {
Review Comment:
I am not sure this adds much compared to the multi-column test right below
##########
datafusion/physical-plan/src/projection.rs:
##########
@@ -1459,4 +1633,608 @@ mod tests {
);
assert!(stats.total_byte_size.is_exact().unwrap_or(false));
}
+
+ // Tests for Projection struct
+
+ #[test]
+ fn test_projection_new() -> Result<()> {
+ let exprs = vec![
+ ProjectionExpr {
+ expr: Arc::new(Column::new("a", 0)),
+ alias: "a".to_string(),
+ },
+ ProjectionExpr {
+ expr: Arc::new(Column::new("b", 1)),
+ alias: "b".to_string(),
+ },
+ ];
+ let projection = Projection::new(exprs.clone());
+ assert_eq!(projection.as_ref().len(), 2);
+ Ok(())
+ }
+
+ #[test]
+ fn test_projection_from_vec() -> Result<()> {
+ let exprs = vec![ProjectionExpr {
+ expr: Arc::new(Column::new("x", 0)),
+ alias: "x".to_string(),
+ }];
+ let projection: Projection = exprs.clone().into();
+ assert_eq!(projection.as_ref().len(), 1);
+ Ok(())
+ }
+
+ #[test]
+ fn test_projection_as_ref() -> Result<()> {
+ let exprs = vec![
+ ProjectionExpr {
+ expr: Arc::new(Column::new("col1", 0)),
+ alias: "col1".to_string(),
+ },
+ ProjectionExpr {
+ expr: Arc::new(Column::new("col2", 1)),
+ alias: "col2".to_string(),
+ },
+ ];
+ let projection = Projection::new(exprs);
+ let as_ref: &[ProjectionExpr] = projection.as_ref();
+ assert_eq!(as_ref.len(), 2);
+ Ok(())
+ }
+
+ #[test]
+ fn test_column_indices_single_column() -> Result<()> {
+ let projection = Projection::new(vec![ProjectionExpr {
+ expr: Arc::new(Column::new("a", 3)),
+ alias: "a".to_string(),
+ }]);
+ assert_eq!(projection.column_indices(), vec![3]);
+ Ok(())
+ }
+
+ #[test]
+ fn test_column_indices_multiple_columns() -> Result<()> {
+ let projection = Projection::new(vec![
+ ProjectionExpr {
+ expr: Arc::new(Column::new("a", 0)),
+ alias: "a".to_string(),
+ },
+ ProjectionExpr {
+ expr: Arc::new(Column::new("b", 2)),
+ alias: "b".to_string(),
+ },
+ ProjectionExpr {
+ expr: Arc::new(Column::new("c", 5)),
+ alias: "c".to_string(),
+ },
+ ]);
+ assert_eq!(projection.column_indices(), vec![0, 2, 5]);
+ Ok(())
+ }
+
+ #[test]
+ fn test_column_indices_duplicates() -> Result<()> {
+ // Test that duplicate column indices appear only once
+ let projection = Projection::new(vec![
+ ProjectionExpr {
+ expr: Arc::new(Column::new("a", 1)),
+ alias: "a".to_string(),
+ },
+ ProjectionExpr {
+ expr: Arc::new(Column::new("b", 3)),
+ alias: "b".to_string(),
+ },
+ ProjectionExpr {
+ expr: Arc::new(Column::new("a2", 1)), // duplicate index
+ alias: "a2".to_string(),
+ },
+ ]);
+ assert_eq!(projection.column_indices(), vec![1, 3]);
+ Ok(())
+ }
+
+ #[test]
+ fn test_column_indices_unsorted() -> Result<()> {
+ // Test that column indices are sorted in the output
+ let projection = Projection::new(vec![
+ ProjectionExpr {
+ expr: Arc::new(Column::new("c", 5)),
+ alias: "c".to_string(),
+ },
+ ProjectionExpr {
+ expr: Arc::new(Column::new("a", 1)),
+ alias: "a".to_string(),
+ },
+ ProjectionExpr {
+ expr: Arc::new(Column::new("b", 3)),
+ alias: "b".to_string(),
+ },
+ ]);
+ assert_eq!(projection.column_indices(), vec![1, 3, 5]);
+ Ok(())
+ }
+
+ #[test]
+ fn test_column_indices_complex_expr() -> Result<()> {
+ // Test with complex expressions containing multiple columns
+ let expr = Arc::new(BinaryExpr::new(
+ Arc::new(Column::new("a", 1)),
+ Operator::Plus,
+ Arc::new(Column::new("b", 4)),
+ ));
+ let projection = Projection::new(vec![
+ ProjectionExpr {
+ expr,
+ alias: "sum".to_string(),
+ },
+ ProjectionExpr {
+ expr: Arc::new(Column::new("c", 2)),
+ alias: "c".to_string(),
+ },
+ ]);
+ // Should return [1, 2, 4] - all columns used, sorted and deduplicated
+ assert_eq!(projection.column_indices(), vec![1, 2, 4]);
+ Ok(())
+ }
+
+ #[test]
+ fn test_column_indices_empty() -> Result<()> {
+ let projection = Projection::new(vec![]);
+ assert_eq!(projection.column_indices(), Vec::<usize>::new());
+ Ok(())
+ }
+
+ #[test]
+ fn test_merge_simple_columns() -> Result<()> {
+ // First projection: SELECT c@2 AS x, b@1 AS y, a@0 AS z
+ let base_projection = Projection::new(vec![
+ ProjectionExpr {
+ expr: Arc::new(Column::new("c", 2)),
+ alias: "x".to_string(),
+ },
+ ProjectionExpr {
+ expr: Arc::new(Column::new("b", 1)),
+ alias: "y".to_string(),
+ },
+ ProjectionExpr {
+ expr: Arc::new(Column::new("a", 0)),
+ alias: "z".to_string(),
+ },
+ ]);
+
+ // Second projection: SELECT x@0 AS col1, y@1 AS col2
+ let top_projection = Projection::new(vec![
+ ProjectionExpr {
+ expr: Arc::new(Column::new("x", 0)),
+ alias: "col1".to_string(),
+ },
+ ProjectionExpr {
+ expr: Arc::new(Column::new("y", 1)),
+ alias: "col2".to_string(),
+ },
+ ]);
+
+ // Merge should produce: SELECT c@2 AS col1, b@1 AS col2
+ let merged = base_projection.try_merge(&top_projection)?;
+ assert_eq!(merged.as_ref().len(), 2);
+
+ // Check first expression (col1 should reference c@2)
+ let col1_expr =
merged.as_ref()[0].expr.as_any().downcast_ref::<Column>();
+ assert!(col1_expr.is_some());
+ let col1 = col1_expr.unwrap();
+ assert_eq!(col1.name(), "c");
+ assert_eq!(col1.index(), 2);
+ assert_eq!(merged.as_ref()[0].alias, "col1");
+
+ // Check second expression (col2 should reference b@1)
+ let col2_expr =
merged.as_ref()[1].expr.as_any().downcast_ref::<Column>();
+ assert!(col2_expr.is_some());
+ let col2 = col2_expr.unwrap();
+ assert_eq!(col2.name(), "b");
+ assert_eq!(col2.index(), 1);
+ assert_eq!(merged.as_ref()[1].alias, "col2");
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_merge_with_expressions() -> Result<()> {
+ // First projection: SELECT c@2 AS x, b@1 AS y, a@0 AS z
+ let base_projection = Projection::new(vec![
+ ProjectionExpr {
+ expr: Arc::new(Column::new("c", 2)),
+ alias: "x".to_string(),
+ },
+ ProjectionExpr {
+ expr: Arc::new(Column::new("b", 1)),
+ alias: "y".to_string(),
+ },
+ ProjectionExpr {
+ expr: Arc::new(Column::new("a", 0)),
+ alias: "z".to_string(),
+ },
+ ]);
+
+ // Second projection: SELECT x@0 + 1 AS c1, y@1 + z@2 AS c2
Review Comment:
Again, I recommend selecting in an order different from x, y, z to test
reordering.
##########
datafusion/physical-plan/src/projection.rs:
##########
@@ -1459,4 +1633,608 @@ mod tests {
);
assert!(stats.total_byte_size.is_exact().unwrap_or(false));
}
+
+ // Tests for Projection struct
+
+ #[test]
+ fn test_projection_new() -> Result<()> {
+ let exprs = vec![
+ ProjectionExpr {
+ expr: Arc::new(Column::new("a", 0)),
+ alias: "a".to_string(),
+ },
+ ProjectionExpr {
+ expr: Arc::new(Column::new("b", 1)),
+ alias: "b".to_string(),
+ },
+ ];
+ let projection = Projection::new(exprs.clone());
+ assert_eq!(projection.as_ref().len(), 2);
+ Ok(())
+ }
+
+ #[test]
+ fn test_projection_from_vec() -> Result<()> {
+ let exprs = vec![ProjectionExpr {
+ expr: Arc::new(Column::new("x", 0)),
+ alias: "x".to_string(),
+ }];
+ let projection: Projection = exprs.clone().into();
+ assert_eq!(projection.as_ref().len(), 1);
+ Ok(())
+ }
+
+ #[test]
+ fn test_projection_as_ref() -> Result<()> {
+ let exprs = vec![
+ ProjectionExpr {
+ expr: Arc::new(Column::new("col1", 0)),
+ alias: "col1".to_string(),
+ },
+ ProjectionExpr {
+ expr: Arc::new(Column::new("col2", 1)),
+ alias: "col2".to_string(),
+ },
+ ];
+ let projection = Projection::new(exprs);
+ let as_ref: &[ProjectionExpr] = projection.as_ref();
+ assert_eq!(as_ref.len(), 2);
+ Ok(())
+ }
+
+ #[test]
+ fn test_column_indices_single_column() -> Result<()> {
+ let projection = Projection::new(vec![ProjectionExpr {
+ expr: Arc::new(Column::new("a", 3)),
+ alias: "a".to_string(),
+ }]);
+ assert_eq!(projection.column_indices(), vec![3]);
+ Ok(())
+ }
+
+ #[test]
+ fn test_column_indices_multiple_columns() -> Result<()> {
+ let projection = Projection::new(vec![
+ ProjectionExpr {
+ expr: Arc::new(Column::new("a", 0)),
+ alias: "a".to_string(),
+ },
+ ProjectionExpr {
+ expr: Arc::new(Column::new("b", 2)),
+ alias: "b".to_string(),
+ },
+ ProjectionExpr {
+ expr: Arc::new(Column::new("c", 5)),
+ alias: "c".to_string(),
+ },
+ ]);
+ assert_eq!(projection.column_indices(), vec![0, 2, 5]);
+ Ok(())
+ }
+
+ #[test]
+ fn test_column_indices_duplicates() -> Result<()> {
+ // Test that duplicate column indices appear only once
+ let projection = Projection::new(vec![
+ ProjectionExpr {
+ expr: Arc::new(Column::new("a", 1)),
+ alias: "a".to_string(),
+ },
+ ProjectionExpr {
+ expr: Arc::new(Column::new("b", 3)),
+ alias: "b".to_string(),
+ },
+ ProjectionExpr {
+ expr: Arc::new(Column::new("a2", 1)), // duplicate index
+ alias: "a2".to_string(),
+ },
+ ]);
+ assert_eq!(projection.column_indices(), vec![1, 3]);
+ Ok(())
+ }
+
+ #[test]
+ fn test_column_indices_unsorted() -> Result<()> {
+ // Test that column indices are sorted in the output
+ let projection = Projection::new(vec![
+ ProjectionExpr {
+ expr: Arc::new(Column::new("c", 5)),
+ alias: "c".to_string(),
+ },
+ ProjectionExpr {
+ expr: Arc::new(Column::new("a", 1)),
+ alias: "a".to_string(),
+ },
+ ProjectionExpr {
+ expr: Arc::new(Column::new("b", 3)),
+ alias: "b".to_string(),
+ },
+ ]);
+ assert_eq!(projection.column_indices(), vec![1, 3, 5]);
+ Ok(())
+ }
+
+ #[test]
+ fn test_column_indices_complex_expr() -> Result<()> {
+ // Test with complex expressions containing multiple columns
+ let expr = Arc::new(BinaryExpr::new(
+ Arc::new(Column::new("a", 1)),
+ Operator::Plus,
+ Arc::new(Column::new("b", 4)),
+ ));
+ let projection = Projection::new(vec![
+ ProjectionExpr {
+ expr,
+ alias: "sum".to_string(),
+ },
+ ProjectionExpr {
+ expr: Arc::new(Column::new("c", 2)),
+ alias: "c".to_string(),
+ },
+ ]);
+ // Should return [1, 2, 4] - all columns used, sorted and deduplicated
+ assert_eq!(projection.column_indices(), vec![1, 2, 4]);
+ Ok(())
+ }
+
+ #[test]
+ fn test_column_indices_empty() -> Result<()> {
+ let projection = Projection::new(vec![]);
+ assert_eq!(projection.column_indices(), Vec::<usize>::new());
+ Ok(())
+ }
+
+ #[test]
+ fn test_merge_simple_columns() -> Result<()> {
+ // First projection: SELECT c@2 AS x, b@1 AS y, a@0 AS z
+ let base_projection = Projection::new(vec![
+ ProjectionExpr {
+ expr: Arc::new(Column::new("c", 2)),
+ alias: "x".to_string(),
+ },
+ ProjectionExpr {
+ expr: Arc::new(Column::new("b", 1)),
+ alias: "y".to_string(),
+ },
+ ProjectionExpr {
+ expr: Arc::new(Column::new("a", 0)),
+ alias: "z".to_string(),
+ },
+ ]);
+
+ // Second projection: SELECT x@0 AS col1, y@1 AS col2
+ let top_projection = Projection::new(vec![
+ ProjectionExpr {
+ expr: Arc::new(Column::new("x", 0)),
+ alias: "col1".to_string(),
+ },
+ ProjectionExpr {
+ expr: Arc::new(Column::new("y", 1)),
+ alias: "col2".to_string(),
+ },
+ ]);
+
+ // Merge should produce: SELECT c@2 AS col1, b@1 AS col2
+ let merged = base_projection.try_merge(&top_projection)?;
+ assert_eq!(merged.as_ref().len(), 2);
+
+ // Check first expression (col1 should reference c@2)
+ let col1_expr =
merged.as_ref()[0].expr.as_any().downcast_ref::<Column>();
+ assert!(col1_expr.is_some());
+ let col1 = col1_expr.unwrap();
+ assert_eq!(col1.name(), "c");
+ assert_eq!(col1.index(), 2);
+ assert_eq!(merged.as_ref()[0].alias, "col1");
+
+ // Check second expression (col2 should reference b@1)
+ let col2_expr =
merged.as_ref()[1].expr.as_any().downcast_ref::<Column>();
+ assert!(col2_expr.is_some());
+ let col2 = col2_expr.unwrap();
+ assert_eq!(col2.name(), "b");
+ assert_eq!(col2.index(), 1);
+ assert_eq!(merged.as_ref()[1].alias, "col2");
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_merge_with_expressions() -> Result<()> {
+ // First projection: SELECT c@2 AS x, b@1 AS y, a@0 AS z
+ let base_projection = Projection::new(vec![
+ ProjectionExpr {
+ expr: Arc::new(Column::new("c", 2)),
+ alias: "x".to_string(),
+ },
+ ProjectionExpr {
+ expr: Arc::new(Column::new("b", 1)),
+ alias: "y".to_string(),
+ },
+ ProjectionExpr {
+ expr: Arc::new(Column::new("a", 0)),
+ alias: "z".to_string(),
+ },
+ ]);
+
+ // Second projection: SELECT x@0 + 1 AS c1, y@1 + z@2 AS c2
+ let top_projection = Projection::new(vec![
+ ProjectionExpr {
+ expr: Arc::new(BinaryExpr::new(
+ Arc::new(Column::new("x", 0)),
+ Operator::Plus,
+ Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
+ )),
+ alias: "c1".to_string(),
+ },
+ ProjectionExpr {
+ expr: Arc::new(BinaryExpr::new(
+ Arc::new(Column::new("y", 1)),
+ Operator::Plus,
+ Arc::new(Column::new("z", 2)),
+ )),
+ alias: "c2".to_string(),
+ },
+ ]);
+
+ // Merge should produce: SELECT c@2 + 1 AS c1, b@1 + a@0 AS c2
+ let merged = base_projection.try_merge(&top_projection)?;
+ assert_eq!(merged.as_ref().len(), 2);
+ assert_eq!(merged.as_ref()[0].alias, "c1");
+ assert_eq!(merged.as_ref()[1].alias, "c2");
+
+ // Check that the expressions are BinaryExpr (not just Column)
+ assert!(merged.as_ref()[0].expr.as_any().is::<BinaryExpr>());
+ assert!(merged.as_ref()[1].expr.as_any().is::<BinaryExpr>());
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_merge_docstring_example() -> Result<()> {
+ // Example from the docstring:
Review Comment:
How about you just make this a doc test? That would ensure it is run
and also improve the docs.
##########
datafusion/physical-plan/src/projection.rs:
##########
@@ -1459,4 +1633,608 @@ mod tests {
);
assert!(stats.total_byte_size.is_exact().unwrap_or(false));
}
+
+ // Tests for Projection struct
+
+ #[test]
+ fn test_projection_new() -> Result<()> {
+ let exprs = vec![
+ ProjectionExpr {
+ expr: Arc::new(Column::new("a", 0)),
+ alias: "a".to_string(),
+ },
+ ProjectionExpr {
+ expr: Arc::new(Column::new("b", 1)),
+ alias: "b".to_string(),
+ },
+ ];
+ let projection = Projection::new(exprs.clone());
+ assert_eq!(projection.as_ref().len(), 2);
+ Ok(())
+ }
+
+ #[test]
+ fn test_projection_from_vec() -> Result<()> {
+ let exprs = vec![ProjectionExpr {
+ expr: Arc::new(Column::new("x", 0)),
+ alias: "x".to_string(),
+ }];
+ let projection: Projection = exprs.clone().into();
+ assert_eq!(projection.as_ref().len(), 1);
+ Ok(())
+ }
+
+ #[test]
+ fn test_projection_as_ref() -> Result<()> {
+ let exprs = vec![
+ ProjectionExpr {
+ expr: Arc::new(Column::new("col1", 0)),
+ alias: "col1".to_string(),
+ },
+ ProjectionExpr {
+ expr: Arc::new(Column::new("col2", 1)),
+ alias: "col2".to_string(),
+ },
+ ];
+ let projection = Projection::new(exprs);
+ let as_ref: &[ProjectionExpr] = projection.as_ref();
+ assert_eq!(as_ref.len(), 2);
+ Ok(())
+ }
+
+ #[test]
+ fn test_column_indices_single_column() -> Result<()> {
+ let projection = Projection::new(vec![ProjectionExpr {
+ expr: Arc::new(Column::new("a", 3)),
+ alias: "a".to_string(),
+ }]);
+ assert_eq!(projection.column_indices(), vec![3]);
+ Ok(())
+ }
+
+ #[test]
+ fn test_column_indices_multiple_columns() -> Result<()> {
+ let projection = Projection::new(vec![
+ ProjectionExpr {
+ expr: Arc::new(Column::new("a", 0)),
+ alias: "a".to_string(),
+ },
+ ProjectionExpr {
+ expr: Arc::new(Column::new("b", 2)),
+ alias: "b".to_string(),
+ },
+ ProjectionExpr {
+ expr: Arc::new(Column::new("c", 5)),
+ alias: "c".to_string(),
+ },
+ ]);
+ assert_eq!(projection.column_indices(), vec![0, 2, 5]);
+ Ok(())
+ }
+
+ #[test]
+ fn test_column_indices_duplicates() -> Result<()> {
+ // Test that duplicate column indices appear only once
+ let projection = Projection::new(vec![
+ ProjectionExpr {
+ expr: Arc::new(Column::new("a", 1)),
+ alias: "a".to_string(),
+ },
+ ProjectionExpr {
+ expr: Arc::new(Column::new("b", 3)),
+ alias: "b".to_string(),
+ },
+ ProjectionExpr {
+ expr: Arc::new(Column::new("a2", 1)), // duplicate index
+ alias: "a2".to_string(),
+ },
+ ]);
+ assert_eq!(projection.column_indices(), vec![1, 3]);
+ Ok(())
+ }
+
+ #[test]
+ fn test_column_indices_unsorted() -> Result<()> {
+ // Test that column indices are sorted in the output
+ let projection = Projection::new(vec![
+ ProjectionExpr {
+ expr: Arc::new(Column::new("c", 5)),
+ alias: "c".to_string(),
+ },
+ ProjectionExpr {
+ expr: Arc::new(Column::new("a", 1)),
+ alias: "a".to_string(),
+ },
+ ProjectionExpr {
+ expr: Arc::new(Column::new("b", 3)),
+ alias: "b".to_string(),
+ },
+ ]);
+ assert_eq!(projection.column_indices(), vec![1, 3, 5]);
+ Ok(())
+ }
+
+ #[test]
+ fn test_column_indices_complex_expr() -> Result<()> {
+ // Test with complex expressions containing multiple columns
+ let expr = Arc::new(BinaryExpr::new(
+ Arc::new(Column::new("a", 1)),
+ Operator::Plus,
+ Arc::new(Column::new("b", 4)),
+ ));
+ let projection = Projection::new(vec![
+ ProjectionExpr {
+ expr,
+ alias: "sum".to_string(),
+ },
+ ProjectionExpr {
+ expr: Arc::new(Column::new("c", 2)),
+ alias: "c".to_string(),
+ },
+ ]);
+ // Should return [1, 2, 4] - all columns used, sorted and deduplicated
+ assert_eq!(projection.column_indices(), vec![1, 2, 4]);
+ Ok(())
+ }
+
+ #[test]
+ fn test_column_indices_empty() -> Result<()> {
+ let projection = Projection::new(vec![]);
+ assert_eq!(projection.column_indices(), Vec::<usize>::new());
+ Ok(())
+ }
+
+ #[test]
+ fn test_merge_simple_columns() -> Result<()> {
+ // First projection: SELECT c@2 AS x, b@1 AS y, a@0 AS z
+ let base_projection = Projection::new(vec![
+ ProjectionExpr {
+ expr: Arc::new(Column::new("c", 2)),
+ alias: "x".to_string(),
+ },
+ ProjectionExpr {
+ expr: Arc::new(Column::new("b", 1)),
+ alias: "y".to_string(),
+ },
+ ProjectionExpr {
+ expr: Arc::new(Column::new("a", 0)),
+ alias: "z".to_string(),
+ },
+ ]);
+
+ // Second projection: SELECT x@0 AS col1, y@1 AS col2
+ let top_projection = Projection::new(vec![
+ ProjectionExpr {
+ expr: Arc::new(Column::new("x", 0)),
+ alias: "col1".to_string(),
+ },
+ ProjectionExpr {
+ expr: Arc::new(Column::new("y", 1)),
+ alias: "col2".to_string(),
+ },
+ ]);
+
+ // Merge should produce: SELECT c@2 AS col1, b@1 AS col2
+ let merged = base_projection.try_merge(&top_projection)?;
+ assert_eq!(merged.as_ref().len(), 2);
+
+ // Check first expression (col1 should reference c@2)
+ let col1_expr =
merged.as_ref()[0].expr.as_any().downcast_ref::<Column>();
+ assert!(col1_expr.is_some());
+ let col1 = col1_expr.unwrap();
+ assert_eq!(col1.name(), "c");
+ assert_eq!(col1.index(), 2);
+ assert_eq!(merged.as_ref()[0].alias, "col1");
+
+ // Check second expression (col2 should reference b@1)
+ let col2_expr =
merged.as_ref()[1].expr.as_any().downcast_ref::<Column>();
+ assert!(col2_expr.is_some());
+ let col2 = col2_expr.unwrap();
+ assert_eq!(col2.name(), "b");
+ assert_eq!(col2.index(), 1);
+ assert_eq!(merged.as_ref()[1].alias, "col2");
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_merge_with_expressions() -> Result<()> {
+ // First projection: SELECT c@2 AS x, b@1 AS y, a@0 AS z
+ let base_projection = Projection::new(vec![
+ ProjectionExpr {
+ expr: Arc::new(Column::new("c", 2)),
+ alias: "x".to_string(),
+ },
+ ProjectionExpr {
+ expr: Arc::new(Column::new("b", 1)),
+ alias: "y".to_string(),
+ },
+ ProjectionExpr {
+ expr: Arc::new(Column::new("a", 0)),
+ alias: "z".to_string(),
+ },
+ ]);
+
+ // Second projection: SELECT x@0 + 1 AS c1, y@1 + z@2 AS c2
+ let top_projection = Projection::new(vec![
+ ProjectionExpr {
+ expr: Arc::new(BinaryExpr::new(
+ Arc::new(Column::new("x", 0)),
+ Operator::Plus,
+ Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
+ )),
+ alias: "c1".to_string(),
+ },
+ ProjectionExpr {
+ expr: Arc::new(BinaryExpr::new(
+ Arc::new(Column::new("y", 1)),
+ Operator::Plus,
+ Arc::new(Column::new("z", 2)),
+ )),
+ alias: "c2".to_string(),
+ },
+ ]);
+
+ // Merge should produce: SELECT c@2 + 1 AS c1, b@1 + a@0 AS c2
+ let merged = base_projection.try_merge(&top_projection)?;
+ assert_eq!(merged.as_ref().len(), 2);
+ assert_eq!(merged.as_ref()[0].alias, "c1");
+ assert_eq!(merged.as_ref()[1].alias, "c2");
+
+ // Check that the expressions are BinaryExpr (not just Column)
+ assert!(merged.as_ref()[0].expr.as_any().is::<BinaryExpr>());
+ assert!(merged.as_ref()[1].expr.as_any().is::<BinaryExpr>());
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_merge_docstring_example() -> Result<()> {
+ // Example from the docstring:
+ // Base projection: SELECT c@2 AS x, b@1 AS y, a@0 AS z
+ let base = Projection::new(vec![
+ ProjectionExpr {
+ expr: Arc::new(Column::new("c", 2)),
+ alias: "x".to_string(),
+ },
+ ProjectionExpr {
+ expr: Arc::new(Column::new("b", 1)),
+ alias: "y".to_string(),
+ },
+ ProjectionExpr {
+ expr: Arc::new(Column::new("a", 0)),
+ alias: "z".to_string(),
+ },
+ ]);
+
+ // Top projection: SELECT x@0 + 1 AS c1, y@1 + z@2 AS c2
+ let top = Projection::new(vec![
+ ProjectionExpr {
+ expr: Arc::new(BinaryExpr::new(
+ Arc::new(Column::new("x", 0)),
+ Operator::Plus,
+ Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
+ )),
+ alias: "c1".to_string(),
+ },
+ ProjectionExpr {
+ expr: Arc::new(BinaryExpr::new(
+ Arc::new(Column::new("y", 1)),
+ Operator::Plus,
+ Arc::new(Column::new("z", 2)),
+ )),
+ alias: "c2".to_string(),
+ },
+ ]);
+
+ // Expected result: SELECT c@2 + 1 AS c1, b@1 + a@0 AS c2
+ let result = base.try_merge(&top)?;
+
+ assert_eq!(result.as_ref().len(), 2);
+ assert_eq!(result.as_ref()[0].alias, "c1");
+ assert_eq!(result.as_ref()[1].alias, "c2");
+
+ Ok(())
+ }
+
+ #[test]
+ fn try_merge_error() {
+ // Create a base projection
+ let base = Projection::new(vec![
+ ProjectionExpr {
+ expr: Arc::new(Column::new("a", 0)),
+ alias: "x".to_string(),
+ },
+ ProjectionExpr {
+ expr: Arc::new(Column::new("b", 1)),
+ alias: "y".to_string(),
+ },
+ ]);
+
+ // Create a top projection that references a non-existent column index
+ let top = Projection::new(vec![ProjectionExpr {
+ expr: Arc::new(Column::new("z", 5)), // Invalid index
+ alias: "result".to_string(),
+ }]);
+
+ // Attempt to merge and expect an error
+ let result = base.try_merge(&top);
+ assert!(result.is_err());
+
+ let err_msg = result.err().unwrap().to_string();
Review Comment:
I think checking for `is_err` is redundant (`err().unwrap()` will panic if
the result is not an `Err`) -- something like this might be more concise:
```suggestion
let err_msg = base.try_merge(&top).unwrap_err().to_string();
```
##########
datafusion/physical-plan/src/projection.rs:
##########
@@ -453,6 +585,23 @@ struct ProjectionStream {
baseline_metrics: BaselineMetrics,
}
+impl ProjectionStream {
Review Comment:
I suggest moving this up with the previous `impl ProjectionStream` block
##########
datafusion/physical-plan/src/projection.rs:
##########
@@ -1459,4 +1633,608 @@ mod tests {
);
assert!(stats.total_byte_size.is_exact().unwrap_or(false));
}
+
+ // Tests for Projection struct
+
+ #[test]
+ fn test_projection_new() -> Result<()> {
+ let exprs = vec![
+ ProjectionExpr {
+ expr: Arc::new(Column::new("a", 0)),
+ alias: "a".to_string(),
+ },
+ ProjectionExpr {
+ expr: Arc::new(Column::new("b", 1)),
+ alias: "b".to_string(),
+ },
+ ];
+ let projection = Projection::new(exprs.clone());
+ assert_eq!(projection.as_ref().len(), 2);
+ Ok(())
+ }
+
+ #[test]
+ fn test_projection_from_vec() -> Result<()> {
+ let exprs = vec![ProjectionExpr {
+ expr: Arc::new(Column::new("x", 0)),
+ alias: "x".to_string(),
+ }];
+ let projection: Projection = exprs.clone().into();
+ assert_eq!(projection.as_ref().len(), 1);
+ Ok(())
+ }
+
+ #[test]
+ fn test_projection_as_ref() -> Result<()> {
+ let exprs = vec![
+ ProjectionExpr {
+ expr: Arc::new(Column::new("col1", 0)),
+ alias: "col1".to_string(),
+ },
+ ProjectionExpr {
+ expr: Arc::new(Column::new("col2", 1)),
+ alias: "col2".to_string(),
+ },
+ ];
+ let projection = Projection::new(exprs);
+ let as_ref: &[ProjectionExpr] = projection.as_ref();
+ assert_eq!(as_ref.len(), 2);
+ Ok(())
+ }
+
+ #[test]
+ fn test_column_indices_single_column() -> Result<()> {
+ let projection = Projection::new(vec![ProjectionExpr {
+ expr: Arc::new(Column::new("a", 3)),
+ alias: "a".to_string(),
+ }]);
+ assert_eq!(projection.column_indices(), vec![3]);
+ Ok(())
+ }
+
+ #[test]
+ fn test_column_indices_multiple_columns() -> Result<()> {
+ let projection = Projection::new(vec![
+ ProjectionExpr {
+ expr: Arc::new(Column::new("a", 0)),
+ alias: "a".to_string(),
+ },
+ ProjectionExpr {
+ expr: Arc::new(Column::new("b", 2)),
+ alias: "b".to_string(),
+ },
+ ProjectionExpr {
+ expr: Arc::new(Column::new("c", 5)),
+ alias: "c".to_string(),
+ },
+ ]);
+ assert_eq!(projection.column_indices(), vec![0, 2, 5]);
+ Ok(())
+ }
+
+ #[test]
+ fn test_column_indices_duplicates() -> Result<()> {
+ // Test that duplicate column indices appear only once
+ let projection = Projection::new(vec![
+ ProjectionExpr {
+ expr: Arc::new(Column::new("a", 1)),
+ alias: "a".to_string(),
+ },
+ ProjectionExpr {
+ expr: Arc::new(Column::new("b", 3)),
+ alias: "b".to_string(),
+ },
+ ProjectionExpr {
+ expr: Arc::new(Column::new("a2", 1)), // duplicate index
+ alias: "a2".to_string(),
+ },
+ ]);
+ assert_eq!(projection.column_indices(), vec![1, 3]);
+ Ok(())
+ }
+
+ #[test]
+ fn test_column_indices_unsorted() -> Result<()> {
+ // Test that column indices are sorted in the output
+ let projection = Projection::new(vec![
+ ProjectionExpr {
+ expr: Arc::new(Column::new("c", 5)),
+ alias: "c".to_string(),
+ },
+ ProjectionExpr {
+ expr: Arc::new(Column::new("a", 1)),
+ alias: "a".to_string(),
+ },
+ ProjectionExpr {
+ expr: Arc::new(Column::new("b", 3)),
+ alias: "b".to_string(),
+ },
+ ]);
+ assert_eq!(projection.column_indices(), vec![1, 3, 5]);
+ Ok(())
+ }
+
+ #[test]
+ fn test_column_indices_complex_expr() -> Result<()> {
+ // Test with complex expressions containing multiple columns
+ let expr = Arc::new(BinaryExpr::new(
+ Arc::new(Column::new("a", 1)),
+ Operator::Plus,
+ Arc::new(Column::new("b", 4)),
+ ));
+ let projection = Projection::new(vec![
+ ProjectionExpr {
+ expr,
+ alias: "sum".to_string(),
+ },
+ ProjectionExpr {
+ expr: Arc::new(Column::new("c", 2)),
+ alias: "c".to_string(),
+ },
+ ]);
+ // Should return [1, 2, 4] - all columns used, sorted and deduplicated
+ assert_eq!(projection.column_indices(), vec![1, 2, 4]);
+ Ok(())
+ }
+
+ #[test]
+ fn test_column_indices_empty() -> Result<()> {
+ let projection = Projection::new(vec![]);
+ assert_eq!(projection.column_indices(), Vec::<usize>::new());
+ Ok(())
+ }
+
+ #[test]
+ fn test_merge_simple_columns() -> Result<()> {
+ // First projection: SELECT c@2 AS x, b@1 AS y, a@0 AS z
+ let base_projection = Projection::new(vec![
+ ProjectionExpr {
+ expr: Arc::new(Column::new("c", 2)),
+ alias: "x".to_string(),
+ },
+ ProjectionExpr {
+ expr: Arc::new(Column::new("b", 1)),
+ alias: "y".to_string(),
+ },
+ ProjectionExpr {
+ expr: Arc::new(Column::new("a", 0)),
+ alias: "z".to_string(),
+ },
+ ]);
+
+ // Second projection: SELECT x@0 AS col1, y@1 AS col2
+ let top_projection = Projection::new(vec![
+ ProjectionExpr {
+ expr: Arc::new(Column::new("x", 0)),
+ alias: "col1".to_string(),
+ },
+ ProjectionExpr {
Review Comment:
I recommend reversing the order here (select col2, col1, rather than col1,
col2) to ensure the correct indexes are used.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]