This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new d331fa2 fix alias (#1067)
d331fa2 is described below
commit d331fa2b87b0723eca486b1951a3f734ef6276a3
Author: carlos <[email protected]>
AuthorDate: Fri Oct 8 21:59:36 2021 +0800
fix alias (#1067)
---
ballista/rust/core/proto/ballista.proto | 3 +
.../rust/core/src/serde/logical_plan/from_proto.rs | 9 +-
.../rust/core/src/serde/logical_plan/to_proto.rs | 30 ++++---
datafusion/src/execution/context.rs | 16 ++--
datafusion/src/logical_plan/builder.rs | 78 ++++++++++-------
datafusion/src/logical_plan/plan.rs | 9 +-
.../src/optimizer/common_subexpr_eliminate.rs | 3 +
datafusion/src/optimizer/filter_push_down.rs | 1 +
datafusion/src/optimizer/limit_push_down.rs | 2 +
datafusion/src/optimizer/projection_push_down.rs | 3 +
datafusion/src/optimizer/utils.rs | 3 +-
datafusion/src/sql/planner.rs | 98 +++++++++++++---------
datafusion/tests/sql.rs | 38 +++++----
13 files changed, 184 insertions(+), 109 deletions(-)
diff --git a/ballista/rust/core/proto/ballista.proto
b/ballista/rust/core/proto/ballista.proto
index a3ed18a..47cc801 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -311,6 +311,9 @@ message AvroTableScanNode {
message ProjectionNode {
LogicalPlanNode input = 1;
repeated LogicalExprNode expr = 2;
+ oneof optional_alias {
+ string alias = 3;
+ }
}
message SelectionNode {
diff --git a/ballista/rust/core/src/serde/logical_plan/from_proto.rs
b/ballista/rust/core/src/serde/logical_plan/from_proto.rs
index 9c7658c..c9ef97e 100644
--- a/ballista/rust/core/src/serde/logical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/from_proto.rs
@@ -63,7 +63,14 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
.map(|expr| expr.try_into())
.collect::<Result<Vec<_>, _>>()?;
LogicalPlanBuilder::from(input)
- .project(x)?
+ .project_with_alias(
+ x,
+ projection.optional_alias.as_ref().map(|a| match a {
+                            protobuf::projection_node::OptionalAlias::Alias(alias) => {
+ alias.clone()
+ }
+ }),
+ )?
.build()
.map_err(|e| e.into())
}
diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs
b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
index f5c2414..bd7fc4d 100644
--- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
@@ -801,19 +801,23 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
)))
}
}
- LogicalPlan::Projection { expr, input, .. } => {
- Ok(protobuf::LogicalPlanNode {
-                logical_plan_type: Some(LogicalPlanType::Projection(Box::new(
- protobuf::ProjectionNode {
- input: Some(Box::new(input.as_ref().try_into()?)),
- expr: expr
- .iter()
- .map(|expr| expr.try_into())
- .collect::<Result<Vec<_>, BallistaError>>()?,
- },
- ))),
- })
- }
+ LogicalPlan::Projection {
+ expr, input, alias, ..
+ } => Ok(protobuf::LogicalPlanNode {
+ logical_plan_type: Some(LogicalPlanType::Projection(Box::new(
+ protobuf::ProjectionNode {
+ input: Some(Box::new(input.as_ref().try_into()?)),
+                    expr: expr.iter().map(|expr| expr.try_into()).collect::<Result<
+ Vec<_>,
+ BallistaError,
+ >>(
+ )?,
+ optional_alias: alias
+ .clone()
+                        .map(protobuf::projection_node::OptionalAlias::Alias),
+ },
+ ))),
+ }),
LogicalPlan::Filter { predicate, input } => {
let input: protobuf::LogicalPlanNode =
input.as_ref().try_into()?;
Ok(protobuf::LogicalPlanNode {
diff --git a/datafusion/src/execution/context.rs
b/datafusion/src/execution/context.rs
index de63925..7272e57 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -2047,7 +2047,7 @@ mod tests {
async fn join_partitioned() -> Result<()> {
// self join on partition id (workaround for duplicate column name)
let results = execute(
- "SELECT 1 FROM test JOIN (SELECT c1 AS id1 FROM test) ON c1=id1",
+            "SELECT 1 FROM test JOIN (SELECT c1 AS id1 FROM test) AS a ON c1=id1",
4,
)
.await?;
@@ -2080,7 +2080,7 @@ mod tests {
let results = plan_and_collect(
&mut ctx,
"SELECT * FROM t as t1 \
- JOIN (SELECT * FROM t as t2) \
+ JOIN (SELECT * FROM t) as t2 \
ON t1.nanos = t2.nanos",
)
.await
@@ -2090,7 +2090,7 @@ mod tests {
let results = plan_and_collect(
&mut ctx,
"SELECT * FROM t as t1 \
- JOIN (SELECT * FROM t as t2) \
+ JOIN (SELECT * FROM t) as t2 \
ON t1.micros = t2.micros",
)
.await
@@ -2100,7 +2100,7 @@ mod tests {
let results = plan_and_collect(
&mut ctx,
"SELECT * FROM t as t1 \
- JOIN (SELECT * FROM t as t2) \
+ JOIN (SELECT * FROM t) as t2 \
ON t1.millis = t2.millis",
)
.await
@@ -2967,12 +2967,12 @@ mod tests {
#[tokio::test]
async fn ctx_sql_should_optimize_plan() -> Result<()> {
let mut ctx = ExecutionContext::new();
- let plan1 =
- ctx.create_logical_plan("SELECT * FROM (SELECT 1) WHERE TRUE AND
TRUE")?;
+ let plan1 = ctx
+            .create_logical_plan("SELECT * FROM (SELECT 1) AS one WHERE TRUE AND TRUE")?;
let opt_plan1 = ctx.optimize(&plan1)?;
- let plan2 = ctx.sql("SELECT * FROM (SELECT 1) WHERE TRUE AND TRUE")?;
+        let plan2 = ctx.sql("SELECT * FROM (SELECT 1) AS one WHERE TRUE AND TRUE")?;
assert_eq!(
format!("{:?}", opt_plan1),
@@ -3727,7 +3727,7 @@ mod tests {
SELECT i, 'a' AS cat FROM catalog_a.schema_a.table_a
UNION ALL
SELECT i, 'b' AS cat FROM catalog_b.schema_b.table_b
- )
+ ) AS all
GROUP BY cat
ORDER BY cat
",
diff --git a/datafusion/src/logical_plan/builder.rs
b/datafusion/src/logical_plan/builder.rs
index c23102c..d4f941a 100644
--- a/datafusion/src/logical_plan/builder.rs
+++ b/datafusion/src/logical_plan/builder.rs
@@ -236,34 +236,22 @@ impl LogicalPlanBuilder {
Ok(Self::from(table_scan))
}
- /// Apply a projection.
- ///
- /// # Errors
- /// This function errors under any of the following conditions:
- /// * Two or more expressions have the same name
- /// * An invalid expression is used (e.g. a `sort` expression)
+ /// Apply a projection without alias.
pub fn project(&self, expr: impl IntoIterator<Item = Expr>) ->
Result<Self> {
- let input_schema = self.plan.schema();
- let mut projected_expr = vec![];
- for e in expr {
- match e {
- Expr::Wildcard => {
- projected_expr.extend(expand_wildcard(input_schema,
&self.plan)?)
- }
- _ => projected_expr
- .push(columnize_expr(normalize_col(e, &self.plan)?,
input_schema)),
- }
- }
-
- validate_unique_names("Projections", projected_expr.iter(),
input_schema)?;
-
- let schema = DFSchema::new(exprlist_to_fields(&projected_expr,
input_schema)?)?;
+ self.project_with_alias(expr, None)
+ }
- Ok(Self::from(LogicalPlan::Projection {
- expr: projected_expr,
- input: Arc::new(self.plan.clone()),
- schema: DFSchemaRef::new(schema),
- }))
+ /// Apply a projection with alias
+ pub fn project_with_alias(
+ &self,
+ expr: impl IntoIterator<Item = Expr>,
+ alias: Option<String>,
+ ) -> Result<Self> {
+ Ok(Self::from(project_with_alias(
+ self.plan.clone(),
+ expr,
+ alias,
+ )?))
}
/// Apply a filter
@@ -477,12 +465,9 @@ impl LogicalPlanBuilder {
let group_expr = normalize_cols(group_expr, &self.plan)?;
let aggr_expr = normalize_cols(aggr_expr, &self.plan)?;
let all_expr = group_expr.iter().chain(aggr_expr.iter());
-
validate_unique_names("Aggregations", all_expr.clone(),
self.plan.schema())?;
-
let aggr_schema =
DFSchema::new(exprlist_to_fields(all_expr, self.plan.schema())?)?;
-
Ok(Self::from(LogicalPlan::Aggregate {
input: Arc::new(self.plan.clone()),
group_expr,
@@ -615,6 +600,41 @@ pub fn union_with_alias(
})
}
+/// Project with optional alias
+/// # Errors
+/// This function errors under any of the following conditions:
+/// * Two or more expressions have the same name
+/// * An invalid expression is used (e.g. a `sort` expression)
+pub fn project_with_alias(
+ plan: LogicalPlan,
+ expr: impl IntoIterator<Item = Expr>,
+ alias: Option<String>,
+) -> Result<LogicalPlan> {
+ let input_schema = plan.schema();
+ let mut projected_expr = vec![];
+ for e in expr {
+ match e {
+ Expr::Wildcard => {
+ projected_expr.extend(expand_wildcard(input_schema, &plan)?)
+ }
+ _ => projected_expr
+ .push(columnize_expr(normalize_col(e, &plan)?, input_schema)),
+ }
+ }
+ validate_unique_names("Projections", projected_expr.iter(), input_schema)?;
+ let input_schema = DFSchema::new(exprlist_to_fields(&projected_expr,
input_schema)?)?;
+ let schema = match alias {
+ Some(ref alias) => input_schema.replace_qualifier(alias.as_str()),
+ None => input_schema,
+ };
+ Ok(LogicalPlan::Projection {
+ expr: projected_expr,
+ input: Arc::new(plan.clone()),
+ schema: DFSchemaRef::new(schema),
+ alias,
+ })
+}
+
/// Resolves an `Expr::Wildcard` to a collection of `Expr::Column`'s.
pub(crate) fn expand_wildcard(
schema: &DFSchema,
diff --git a/datafusion/src/logical_plan/plan.rs
b/datafusion/src/logical_plan/plan.rs
index 4b91ade..7552fc6 100644
--- a/datafusion/src/logical_plan/plan.rs
+++ b/datafusion/src/logical_plan/plan.rs
@@ -76,6 +76,8 @@ pub enum LogicalPlan {
input: Arc<LogicalPlan>,
/// The schema description of the output
schema: DFSchemaRef,
+ /// Projection output relation alias
+ alias: Option<String>,
},
/// Filters rows from its input that do not match an
/// expression (essentially a WHERE clause with a predicate
@@ -723,7 +725,9 @@ impl LogicalPlan {
Ok(())
}
- LogicalPlan::Projection { ref expr, .. } => {
+ LogicalPlan::Projection {
+ ref expr, alias, ..
+ } => {
write!(f, "Projection: ")?;
for (i, expr_item) in expr.iter().enumerate() {
if i > 0 {
@@ -731,6 +735,9 @@ impl LogicalPlan {
}
write!(f, "{:?}", expr_item)?;
}
+ if let Some(a) = alias {
+ write!(f, ", alias={}", a)?;
+ }
Ok(())
}
LogicalPlan::Filter {
diff --git a/datafusion/src/optimizer/common_subexpr_eliminate.rs
b/datafusion/src/optimizer/common_subexpr_eliminate.rs
index 4bda181..f8381e8 100644
--- a/datafusion/src/optimizer/common_subexpr_eliminate.rs
+++ b/datafusion/src/optimizer/common_subexpr_eliminate.rs
@@ -81,6 +81,7 @@ fn optimize(plan: &LogicalPlan, execution_props:
&ExecutionProps) -> Result<Logi
expr,
input,
schema,
+ alias,
} => {
let mut arrays = vec![];
for e in expr {
@@ -103,6 +104,7 @@ fn optimize(plan: &LogicalPlan, execution_props:
&ExecutionProps) -> Result<Logi
expr: new_expr.pop().unwrap(),
input: Arc::new(new_input),
schema: schema.clone(),
+ alias: alias.clone(),
})
}
LogicalPlan::Filter { predicate, input } => {
@@ -278,6 +280,7 @@ fn build_project_plan(
expr: project_exprs,
input: Arc::new(input),
schema: Arc::new(schema),
+ alias: None,
})
}
diff --git a/datafusion/src/optimizer/filter_push_down.rs
b/datafusion/src/optimizer/filter_push_down.rs
index 7fc6540..1f82ca8 100644
--- a/datafusion/src/optimizer/filter_push_down.rs
+++ b/datafusion/src/optimizer/filter_push_down.rs
@@ -318,6 +318,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) ->
Result<LogicalPlan> {
input,
expr,
schema,
+ alias: _,
} => {
// A projection is filter-commutable, but re-writes all predicate
expressions
// collect projection.
diff --git a/datafusion/src/optimizer/limit_push_down.rs
b/datafusion/src/optimizer/limit_push_down.rs
index 46738c5..d02777c 100644
--- a/datafusion/src/optimizer/limit_push_down.rs
+++ b/datafusion/src/optimizer/limit_push_down.rs
@@ -80,6 +80,7 @@ fn limit_push_down(
expr,
input,
schema,
+ alias,
},
upper_limit,
) => {
@@ -93,6 +94,7 @@ fn limit_push_down(
execution_props,
)?),
schema: schema.clone(),
+ alias: alias.clone(),
})
}
(
diff --git a/datafusion/src/optimizer/projection_push_down.rs
b/datafusion/src/optimizer/projection_push_down.rs
index 927e3cb..0a84c9d 100644
--- a/datafusion/src/optimizer/projection_push_down.rs
+++ b/datafusion/src/optimizer/projection_push_down.rs
@@ -140,6 +140,7 @@ fn optimize_plan(
input,
expr,
schema,
+ alias,
} => {
// projection:
// * remove any expression that is not required
@@ -190,6 +191,7 @@ fn optimize_plan(
expr: new_expr,
input: Arc::new(new_input),
schema: DFSchemaRef::new(DFSchema::new(new_fields)?),
+ alias: alias.clone(),
})
}
}
@@ -744,6 +746,7 @@ mod tests {
expr,
input: Arc::new(table_scan),
schema: Arc::new(projected_schema),
+ alias: None,
};
assert_fields_eq(&plan, vec!["a", "b"]);
diff --git a/datafusion/src/optimizer/utils.rs
b/datafusion/src/optimizer/utils.rs
index 435daef..6e64bf3 100644
--- a/datafusion/src/optimizer/utils.rs
+++ b/datafusion/src/optimizer/utils.rs
@@ -118,10 +118,11 @@ pub fn from_plan(
inputs: &[LogicalPlan],
) -> Result<LogicalPlan> {
match plan {
- LogicalPlan::Projection { schema, .. } => Ok(LogicalPlan::Projection {
+        LogicalPlan::Projection { schema, alias, .. } => Ok(LogicalPlan::Projection {
expr: expr.to_vec(),
input: Arc::new(inputs[0].clone()),
schema: schema.clone(),
+ alias: alias.clone(),
}),
LogicalPlan::Filter { .. } => Ok(LogicalPlan::Filter {
predicate: expr[0].clone(),
diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs
index 4f50f6b..2db2b5c 100644
--- a/datafusion/src/sql/planner.rs
+++ b/datafusion/src/sql/planner.rs
@@ -64,6 +64,7 @@ use super::{
resolve_positions_to_exprs,
},
};
+use crate::logical_plan::builder::project_with_alias;
/// The ContextProvider trait allows the query planner to obtain meta-data
about tables and
/// functions referenced in SQL statements
@@ -158,7 +159,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
ctes: &mut HashMap<String, LogicalPlan>,
) -> Result<LogicalPlan> {
match set_expr {
- SetExpr::Select(s) => self.select_to_plan(s.as_ref(), ctes),
+ SetExpr::Select(s) => self.select_to_plan(s.as_ref(), ctes, alias),
SetExpr::SetOperation {
op,
left,
@@ -502,11 +503,10 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
relation: &TableFactor,
ctes: &mut HashMap<String, LogicalPlan>,
) -> Result<LogicalPlan> {
- let (plan, columns_alias) = match relation {
+ let (plan, alias) = match relation {
TableFactor::Table { name, alias, .. } => {
let table_name = name.to_string();
let cte = ctes.get(&table_name);
- let columns_alias = alias.clone().map(|x| x.columns);
(
match (
cte,
@@ -528,21 +528,38 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
name
))),
}?,
- columns_alias,
+ alias,
)
}
TableFactor::Derived {
subquery, alias, ..
- } => (
- self.query_to_plan_with_alias(
+ } => {
+ // if alias is None, return Err
+ if alias.is_none() {
+ return Err(DataFusionError::Plan(
+ "subquery in FROM must have an alias".to_string(),
+ ));
+ }
+ let logical_plan = self.query_to_plan_with_alias(
subquery,
alias.as_ref().map(|a| a.name.value.to_string()),
ctes,
- )?,
- alias.clone().map(|x| x.columns),
- ),
+ )?;
+ (
+ project_with_alias(
+ logical_plan.clone(),
+ logical_plan
+ .schema()
+ .fields()
+ .iter()
+ .map(|field| col(field.name())),
+ alias.as_ref().map(|a| a.name.value.to_string()),
+ )?,
+ alias,
+ )
+ }
TableFactor::NestedJoin(table_with_joins) => {
- (self.plan_table_with_joins(table_with_joins, ctes)?, None)
+ (self.plan_table_with_joins(table_with_joins, ctes)?, &None)
}
// @todo Support TableFactory::TableFunction?
_ => {
@@ -552,8 +569,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
)))
}
};
-
- if let Some(columns_alias) = columns_alias {
+ if let Some(alias) = alias {
+ let columns_alias = alias.clone().columns;
if columns_alias.is_empty() {
// sqlparser-rs encodes AS t as an empty list of column alias
Ok(plan)
@@ -564,15 +581,16 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
columns_alias.len(),
)));
} else {
- let fields = plan.schema().fields().clone();
- LogicalPlanBuilder::from(plan)
- .project(
- fields
+ Ok(LogicalPlanBuilder::from(plan.clone())
+ .project_with_alias(
+ plan.schema()
+ .fields()
.iter()
.zip(columns_alias.iter())
.map(|(field, ident)|
col(field.name()).alias(&ident.value)),
+ Some(alias.clone().name.value),
)?
- .build()
+ .build()?)
}
} else {
Ok(plan)
@@ -584,6 +602,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
&self,
select: &Select,
ctes: &mut HashMap<String, LogicalPlan>,
+ alias: Option<String>,
) -> Result<LogicalPlan> {
let plans = self.plan_from_tables(&select.from, ctes)?;
@@ -782,8 +801,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
} else {
plan
};
-
- self.project(plan, select_exprs_post_aggr)
+ project_with_alias(plan, select_exprs_post_aggr, alias)
}
/// Returns the `Expr`'s corresponding to a SQL query's SELECT expressions.
@@ -2033,12 +2051,14 @@ mod tests {
FROM (
SELECT first_name AS fn1, last_name, birth_date, age
FROM person
- )
- )";
- let expected = "Projection: #fn2, #person.last_name\
- \n Projection: #fn1 AS fn2, #person.last_name,
#person.birth_date\
- \n Projection: #person.first_name AS fn1,
#person.last_name, #person.birth_date, #person.age\
- \n TableScan: person projection=None";
+ ) AS a
+ ) AS b";
+ let expected = "Projection: #b.fn2, #b.last_name\
+ \n Projection: #b.fn2, #b.last_name, #b.birth_date,
alias=b\
+ \n Projection: #a.fn1 AS fn2, #a.last_name,
#a.birth_date, alias=b\
+ \n Projection: #a.fn1, #a.last_name,
#a.birth_date, #a.age, alias=a\
+ \n Projection: #person.first_name AS fn1,
#person.last_name, #person.birth_date, #person.age, alias=a\
+ \n TableScan: person projection=None";
quick_test(sql, expected);
}
@@ -2049,14 +2069,15 @@ mod tests {
SELECT first_name AS fn1, age
FROM person
WHERE age > 20
- )
+ ) AS a
WHERE fn1 = 'X' AND age < 30";
- let expected = "Projection: #fn1, #person.age\
- \n Filter: #fn1 = Utf8(\"X\") AND #person.age <
Int64(30)\
- \n Projection: #person.first_name AS fn1,
#person.age\
- \n Filter: #person.age > Int64(20)\
- \n TableScan: person projection=None";
+ let expected = "Projection: #a.fn1, #a.age\
+ \n Filter: #a.fn1 = Utf8(\"X\") AND #a.age <
Int64(30)\
+ \n Projection: #a.fn1, #a.age, alias=a\
+ \n Projection: #person.first_name AS fn1,
#person.age, alias=a\
+ \n Filter: #person.age > Int64(20)\
+ \n TableScan: person projection=None";
quick_test(sql, expected);
}
@@ -2065,8 +2086,8 @@ mod tests {
fn table_with_column_alias() {
let sql = "SELECT a, b, c
FROM lineitem l (a, b, c)";
- let expected = "Projection: #a, #b, #c\
- \n Projection: #l.l_item_id AS a, #l.l_description AS
b, #l.price AS c\
+ let expected = "Projection: #l.a, #l.b, #l.c\
+ \n Projection: #l.l_item_id AS a, #l.l_description AS
b, #l.price AS c, alias=l\
\n TableScan: l projection=None";
quick_test(sql, expected);
}
@@ -2394,11 +2415,12 @@ mod tests {
\n TableScan: person projection=None",
);
quick_test(
- "SELECT * FROM (SELECT first_name, last_name FROM person) GROUP BY
first_name, last_name",
- "Projection: #person.first_name, #person.last_name\
- \n Aggregate: groupBy=[[#person.first_name, #person.last_name]],
aggr=[[]]\
- \n Projection: #person.first_name, #person.last_name\
- \n TableScan: person projection=None",
+ "SELECT * FROM (SELECT first_name, last_name FROM person) AS a
GROUP BY first_name, last_name",
+ "Projection: #a.first_name, #a.last_name\
+ \n Aggregate: groupBy=[[#a.first_name, #a.last_name]], aggr=[[]]\
+ \n Projection: #a.first_name, #a.last_name, alias=a\
+ \n Projection: #person.first_name, #person.last_name,
alias=a\
+ \n TableScan: person projection=None",
);
}
diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index e24540e..5ae3585 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -308,8 +308,8 @@ async fn csv_select_nested() -> Result<()> {
FROM aggregate_test_100
WHERE c1 = 'a' AND c2 >= 4
ORDER BY c2 ASC, c3 ASC
- )
- )";
+ ) AS a
+ ) AS b";
let actual = execute_to_batches(&mut ctx, sql).await;
let expected = vec![
"+----+----+------+",
@@ -600,7 +600,7 @@ async fn select_distinct_simple_4() {
async fn projection_same_fields() -> Result<()> {
let mut ctx = ExecutionContext::new();
- let sql = "select (1+1) as a from (select 1 as a);";
+ let sql = "select (1+1) as a from (select 1 as a) as b;";
let actual = execute_to_batches(&mut ctx, sql).await;
let expected = vec!["+---+", "| a |", "+---+", "| 2 |", "+---+"];
@@ -2106,13 +2106,13 @@ async fn cross_join() {
);
// Two partitions (from UNION) on the left
- let sql = "SELECT * FROM (SELECT t1_id, t1_name FROM t1 UNION ALL SELECT
t1_id, t1_name FROM t1) t1 CROSS JOIN t2";
+ let sql = "SELECT * FROM (SELECT t1_id, t1_name FROM t1 UNION ALL SELECT
t1_id, t1_name FROM t1) AS t1 CROSS JOIN t2";
let actual = execute(&mut ctx, sql).await;
assert_eq!(4 * 4 * 2, actual.len());
// Two partitions (from UNION) on the right
- let sql = "SELECT t1_id, t1_name, t2_name FROM t1 CROSS JOIN (SELECT
t2_name FROM t2 UNION ALL SELECT t2_name FROM t2)";
+ let sql = "SELECT t1_id, t1_name, t2_name FROM t1 CROSS JOIN (SELECT
t2_name FROM t2 UNION ALL SELECT t2_name FROM t2) AS t2";
let actual = execute(&mut ctx, sql).await;
assert_eq!(4 * 4 * 2, actual.len());
@@ -2179,7 +2179,7 @@ async fn test_join_timestamp() -> Result<()> {
let sql = "SELECT * \
FROM timestamp as a \
- JOIN (SELECT * FROM timestamp as b) \
+ JOIN (SELECT * FROM timestamp) as b \
ON a.time = b.time \
ORDER BY a.time";
let actual = execute_to_batches(&mut ctx, sql).await;
@@ -2220,7 +2220,7 @@ async fn test_join_float32() -> Result<()> {
let sql = "SELECT * \
FROM population as a \
- JOIN (SELECT * FROM population as b) \
+ JOIN (SELECT * FROM population) as b \
ON a.population = b.population \
ORDER BY a.population";
let actual = execute_to_batches(&mut ctx, sql).await;
@@ -2261,7 +2261,7 @@ async fn test_join_float64() -> Result<()> {
let sql = "SELECT * \
FROM population as a \
- JOIN (SELECT * FROM population as b) \
+ JOIN (SELECT * FROM population) as b \
ON a.population = b.population \
ORDER BY a.population";
let actual = execute_to_batches(&mut ctx, sql).await;
@@ -2513,11 +2513,11 @@ async fn explain_analyze_baseline_metrics() {
FROM aggregate_test_100 \
WHERE c13 != 'C2GT5KVyOPZpgKVl110TyZO0NcJ434' \
GROUP BY c1 \
- ORDER BY c1 ) \
+ ORDER BY c1 ) AS a \
UNION ALL \
SELECT 1 as cnt \
UNION ALL \
- SELECT lead(c1, 1) OVER () as cnt FROM (select 1 as c1) \
+ SELECT lead(c1, 1) OVER () as cnt FROM (select 1 as c1) AS b \
LIMIT 3";
println!("running query: {}", sql);
let plan = ctx.create_logical_plan(sql).unwrap();
@@ -2540,7 +2540,7 @@ async fn explain_analyze_baseline_metrics() {
);
assert_metrics!(
&formatted,
- "SortExec: [c1@1 ASC]",
+ "SortExec: [c1@0 ASC]",
"metrics=[output_rows=5, elapsed_compute="
);
assert_metrics!(
@@ -4664,9 +4664,9 @@ async fn
test_physical_plan_display_indent_multi_children() {
// ensure indenting works for nodes with multiple children
register_aggregate_csv(&mut ctx).unwrap();
let sql = "SELECT c1 \
- FROM (select c1 from aggregate_test_100)\
+ FROM (select c1 from aggregate_test_100) AS a \
JOIN\
- (select c1 as c2 from aggregate_test_100)\
+ (select c1 as c2 from aggregate_test_100) AS b \
ON c1=c2\
";
@@ -4681,13 +4681,15 @@ async fn
test_physical_plan_display_indent_multi_children() {
" CoalesceBatchesExec: target_batch_size=4096",
" RepartitionExec: partitioning=Hash([Column { name: \"c1\",
index: 0 }], 3)",
" ProjectionExec: expr=[c1@0 as c1]",
- " RepartitionExec: partitioning=RoundRobinBatch(3)",
- " CsvExec:
source=Path(ARROW_TEST_DATA/csv/aggregate_test_100.csv:
[ARROW_TEST_DATA/csv/aggregate_test_100.csv]), has_header=true",
+ " ProjectionExec: expr=[c1@0 as c1]",
+ " RepartitionExec: partitioning=RoundRobinBatch(3)",
+ " CsvExec:
source=Path(ARROW_TEST_DATA/csv/aggregate_test_100.csv:
[ARROW_TEST_DATA/csv/aggregate_test_100.csv]), has_header=true",
" CoalesceBatchesExec: target_batch_size=4096",
" RepartitionExec: partitioning=Hash([Column { name: \"c2\",
index: 0 }], 3)",
- " ProjectionExec: expr=[c1@0 as c2]",
- " RepartitionExec: partitioning=RoundRobinBatch(3)",
- " CsvExec:
source=Path(ARROW_TEST_DATA/csv/aggregate_test_100.csv:
[ARROW_TEST_DATA/csv/aggregate_test_100.csv]), has_header=true",
+ " ProjectionExec: expr=[c2@0 as c2]",
+ " ProjectionExec: expr=[c1@0 as c2]",
+ " RepartitionExec: partitioning=RoundRobinBatch(3)",
+ " CsvExec:
source=Path(ARROW_TEST_DATA/csv/aggregate_test_100.csv:
[ARROW_TEST_DATA/csv/aggregate_test_100.csv]), has_header=true",
];
let data_path = datafusion::test_util::arrow_test_data();