alamb commented on code in PR #6566:
URL: https://github.com/apache/arrow-datafusion/pull/6566#discussion_r1225302032
##########
datafusion/sql/src/select.rs:
##########
@@ -555,3 +565,288 @@ fn match_window_definitions(
}
Ok(())
}
+
+fn create_function_physical_name(
+ fun: &str,
+ distinct: bool,
+ args: &[Expr],
+) -> Result<String> {
+ let names: Vec<String> = args
+ .iter()
+ .map(|e| create_physical_name(e, false))
+ .collect::<Result<_>>()?;
+
+ let distinct_str = match distinct {
+ true => "DISTINCT ",
+ false => "",
+ };
+ Ok(format!("{}({}{})", fun, distinct_str, names.join(",")))
+}
+
+fn physical_name(e: &Expr) -> Result<String> {
+ create_physical_name(e, true)
+}
+
+fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result<String> {
+ match e {
+ Expr::Column(c) => {
+ if is_first_expr {
+ Ok(c.name.clone())
+ } else {
+ Ok(c.flat_name())
+ }
+ }
+ Expr::Alias(_, name) => Ok(name.clone()),
+ Expr::ScalarVariable(_, variable_names) => Ok(variable_names.join(".")),
+ Expr::Literal(value) => Ok(format!("{value:?}")),
+ Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
+ let left = create_physical_name(left, false)?;
+ let right = create_physical_name(right, false)?;
+ Ok(format!("{left} {op} {right}"))
+ }
+ Expr::Case(case) => {
Review Comment:
For example, this appears to be the same code as
https://github.com/apache/arrow-datafusion/blob/1af846bd8de387ce7a6e61a2008917a7610b9a7b/datafusion/physical-expr/src/expressions/case.rs#L66-L77
If we ever changed the code in physical-expr and did not change this code,
would that cause problems?
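To make the concern concrete, here is a minimal sketch of one way to avoid the drift: a single helper that both layers call, so the name format lives in one place. The function `case_name`, its signature, and the exact `CASE ... END` layout are assumptions for illustration only. They are not existing DataFusion APIs, and the layout is not guaranteed to match what physical-expr prints.

```rust
/// Hypothetical shared helper (not part of DataFusion): builds a display
/// name for a CASE expression from already-rendered sub-expression names.
/// If the SQL planner and the physical layer both called one function like
/// this, a format change could not leave the two copies out of sync.
pub fn case_name(
    base: Option<&str>,
    when_then: &[(&str, &str)],
    else_expr: Option<&str>,
) -> String {
    let mut name = String::from("CASE ");
    // Optional base expression, as in `CASE x WHEN ...`.
    if let Some(b) = base {
        name.push_str(b);
        name.push(' ');
    }
    // One `WHEN .. THEN ..` clause per pair, in order.
    for (w, t) in when_then {
        name.push_str(&format!("WHEN {w} THEN {t} "));
    }
    // Optional ELSE branch.
    if let Some(e) = else_expr {
        name.push_str(&format!("ELSE {e} "));
    }
    name.push_str("END");
    name
}
```

With two copies of this logic, a unit test asserting that both produce the same string for the same expression would at least catch a divergence early.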
##########
datafusion/sql/src/select.rs:
##########
@@ -194,7 +198,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
.iter()
.map(|expr| rebase_expr(expr, &window_func_exprs, &plan))
.collect::<Result<Vec<Expr>>>()?;
-
+ if select.into.is_some() {
+ for expr in select_exprs_post_aggr.iter_mut() {
+ if let Expr::Column(_) = expr.clone() {
+ *expr = expr.clone().alias(physical_name(expr)?);
+ }
+ }
+ }
Review Comment:
I am sorry if my past comments have been confusing. Here is what I was
trying to say earlier in
https://github.com/apache/arrow-datafusion/pull/6566#discussion_r1221355361:
I ran this command to get some logs (with the extra debug in PR
https://github.com/apache/arrow-datafusion/pull/6626):
```shell
RUST_LOG=debug cargo test --test sqllogictests -- ddl 2>&1 | tee /tmp/debug.log
```
Here is the content of debug.log:
[debug.log](https://github.com/apache/arrow-datafusion/files/11712340/debug.log)
From the log, here is the `LogicalPlan`, which shows that the `WindowAggr`
declares it produces a column named `SUM(test_table.c1) ORDER BY [test_table.c2 ASC NULLS
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW` (yes that whole thing!)
```
Projection: SUM(test_table.c1) ORDER BY [test_table.c2 ASC NULLS LAST]
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, test_table.c2, test_table.c3
WindowAggr: windowExpr=[[SUM(test_table.c1) ORDER BY [test_table.c2
ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
TableScan: test_table projection=[c1, c2, c3]
```
Here is the final `ExecutionPlan`, also showing the same giant column as the
declared output name:
```
[2023-06-10T11:43:29Z DEBUG datafusion::physical_plan::planner] Optimized
physical plan:
ProjectionExec: expr=[SUM(test_table.c1) ORDER BY [test_table.c2 ASC
NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as
SUM(test_table.c1), c2@1 as c2, c3@2 as c3]
BoundedWindowAggExec: wdw=[SUM(test_table.c1): Ok(Field { name:
"SUM(test_table.c1)", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: Preceding(Float32(NULL)), end_bound: CurrentRow }], mode=[Sorted]
SortPreservingMergeExec: [c2@1 ASC NULLS LAST]
SortExec: expr=[c2@1 ASC NULLS LAST]
MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
```
However, looking at the logs, the execution plan actually produces a
column named `SUM(test_table.c1)`:
```
[2023-06-10T11:43:29Z DEBUG datafusion::datasource::memory] mem schema does
not contain batches schema.
Target_schema: Schema { fields: [
Field { name: "SUM(test_table.c1) ORDER BY [test_table.c2 ASC NULLS LAST]
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: Int64, nullable:
true, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "c2", data_type: Float32, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} },
Field { name: "c3", data_type: Utf8, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }], metadata: {}
}.
Batches Schema: Schema { fields: [
Field { name: "SUM(test_table.c1)", data_type: Int64, nullable: true,
dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "c2", data_type: Float32, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} },
Field { name: "c3", data_type: Utf8, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }], metadata: {}
}
```
Thus, what I was trying to say earlier is that I think the root of the
problem is the mismatch between the output field name that the plans declare
and the field name that the WindowExec actually produces.
So I think we should fix this bug by resolving the mismatch. Either:
1. Update the Logical/Physical plans so the field names of WindowAgg match
what the `BoundedWindowAggExec` actually produces
2. OR update `BoundedWindowAggExec` to produce the field names declared by
the `WindowAggExec`
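As a rough illustration of the check that would surface this mismatch, the sketch below compares the declared field names against the names a batch actually carries, position by position. It uses plain string slices instead of Arrow `Schema`/`Field` types to stay self-contained, and `name_mismatches` is a hypothetical helper, not an existing DataFusion function.

```rust
/// Hypothetical helper (not a DataFusion API): returns
/// `(index, declared_name, produced_name)` for every position where the
/// declared schema and the produced batches disagree on the field name.
/// Note: `zip` stops at the shorter slice, so a difference in field count
/// would need a separate length check.
pub fn name_mismatches<'a>(
    declared: &[&'a str],
    produced: &[&'a str],
) -> Vec<(usize, &'a str, &'a str)> {
    declared
        .iter()
        .zip(produced.iter())
        .enumerate()
        .filter(|(_, (d, p))| d != p)
        .map(|(i, (d, p))| (i, *d, *p))
        .collect()
}
```

Fed the names from the log above, this would report a single mismatch at index 0: the plan declares the long `SUM(test_table.c1) ORDER BY ...` name while the batches carry `SUM(test_table.c1)`. Either option in the list would make the result empty.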
##########
datafusion/sql/src/select.rs:
##########
@@ -555,3 +565,288 @@ fn match_window_definitions(
}
Ok(())
}
+
+fn create_function_physical_name(
Review Comment:
Does this functionality need to remain in sync with the creation of physical
names?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]