nseekhao opened a new issue, #6867: URL: https://github.com/apache/arrow-datafusion/issues/6867
### Is your feature request related to a problem or challenge?
When a plan contains a `SubqueryAlias` relation, `datafusion-substrait` bypasses
it. This is harmless for the producer: the generated Substrait plans are correct.
However, the DF plan generated by the consumer is incorrect, because the consumer
has no way to distinguish between different relations that read from the same
table.
This can be demonstrated in these examples:
```Rust
#[tokio::test]
async fn roundtrip_non_equi_inner_join_table_reuse() -> Result<()> {
    roundtrip("SELECT d1.a FROM data d1 JOIN data d2 ON d1.e <> d2.e").await
}
// thread 'tests::roundtrip_non_equi_inner_join_table_reuse' panicked at 'assertion failed: `(left == right)`
//   left: `"Projection: d1.a\n Inner Join: Filter: d1.e != d2.e\n SubqueryAlias: d1\n TableScan: data projection=[a, e]\n SubqueryAlias: d2\n TableScan: data projection=[e]"`,
//  right: `"Projection: data.a\n Inner Join: \n Projection: data.a\n Filter: data.e != data.e\n TableScan: data projection=[a, e], partial_filters=[data.e != data.e]\n TableScan: data projection=[e]"`',
// datafusion/substrait/tests/roundtrip_logical_plan.rs:628:9
```
The original DF plan is:
```Bash
Projection: d1.a
  Inner Join: Filter: d1.e != d2.e
    SubqueryAlias: d1
      TableScan: data projection=[a, e]
    SubqueryAlias: d2
      TableScan: data projection=[e]
```
Once this plan is fed through the producer, we get the correct Substrait
plan:
```Bash
Plan {
version: Some(
Version {
major_number: 0,
minor_number: 31,
patch_number: 0,
git_hash: "",
producer: "datafusion",
},
),
extension_uris: [],
extensions: [
SimpleExtensionDeclaration {
mapping_type: Some(
ExtensionFunction(
ExtensionFunction {
extension_uri_reference: 4294967295,
function_anchor: 0,
name: "not_equal",
},
),
),
},
],
relations: [
PlanRel {
rel_type: Some(
Root(
RelRoot {
input: Some(
Rel {
rel_type: Some(
Project(
ProjectRel {
common: None,
input: Some(
Rel {
rel_type: Some(
Join(
JoinRel {
common: None,
left: Some(
Rel {
rel_type: Some(
Read(
ReadRel {
common: None,
base_schema: Some(
NamedStruct {
names: [
"a",
"b",
"c",
"d",
"e",
"f",
],
r#struct: None,
},
),
filter: None,
best_effort_filter: None,
projection: Some(
MaskExpression {
select: Some(
StructSelect {
struct_items: [
StructItem {
field: 0,
child: None,
},
StructItem {
field: 4,
child: None,
},
],
},
),
maintain_singular_struct: false,
},
),
advanced_extension: None,
read_type: Some(
NamedTable(
NamedTable {
names: [
"data",
],
advanced_extension: None,
},
),
),
},
),
),
},
),
right: Some(
Rel {
rel_type: Some(
Read(
ReadRel {
common: None,
base_schema: Some(
NamedStruct {
names: [
"a",
"b",
"c",
"d",
"e",
"f",
],
r#struct: None,
},
),
filter: None,
best_effort_filter: None,
projection: Some(
MaskExpression {
select: Some(
StructSelect {
struct_items: [
StructItem {
field: 4,
child: None,
},
],
},
),
maintain_singular_struct: false,
},
),
advanced_extension: None,
read_type: Some(
NamedTable(
NamedTable {
names: [
"data",
],
advanced_extension: None,
},
),
),
},
),
),
},
),
expression: None,
post_join_filter: Some(
Expression {
rex_type: Some(
ScalarFunction(
ScalarFunction {
function_reference: 0,
arguments: [
FunctionArgument {
arg_type: Some(
Value(
Expression {
rex_type: Some(
Selection(
FieldReference {
reference_type: Some(
DirectReference(
ReferenceSegment {
reference_type: Some(
StructField(
StructField {
field: 1,
child: None,
},
),
),
},
),
),
root_type: None,
},
),
),
},
),
),
},
FunctionArgument {
arg_type: Some(
Value(
Expression {
rex_type: Some(
Selection(
FieldReference {
reference_type: Some(
DirectReference(
ReferenceSegment {
reference_type: Some(
StructField(
StructField {
field: 2,
child: None,
},
),
),
},
),
),
root_type: None,
},
),
),
},
),
),
},
],
options: [],
output_type: None,
args: [],
},
),
),
},
),
r#type: Inner,
advanced_extension: None,
},
),
),
},
),
expressions: [
Expression {
rex_type: Some(
Selection(
FieldReference {
reference_type: Some(
DirectReference(
ReferenceSegment {
reference_type: Some(
StructField(
StructField {
field: 0,
child: None,
},
),
),
},
),
),
root_type: None,
},
),
),
},
],
advanced_extension: None,
},
),
),
},
),
names: [
"d1.a",
],
},
),
),
},
],
advanced_extensions: None,
expected_type_urls: [],
}
```
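The Substrait plan stays correct without aliases because its field references are positional rather than name-based. A minimal sketch of that resolution, assuming the join output is the left input's projected columns followed by the right input's (the helper `resolve_join_field` is hypothetical and not part of `datafusion-substrait`):

```rust
/// Resolve a positional field index against a join's output, modeled here as
/// the left input's columns followed by the right input's columns.
/// Returns which side the column comes from, and its name.
/// Hypothetical helper for illustration only.
fn resolve_join_field<'a>(
    left: &[&'a str],
    right: &[&'a str],
    index: usize,
) -> Option<(&'static str, &'a str)> {
    if index < left.len() {
        Some(("left", left[index]))
    } else {
        right.get(index - left.len()).map(|name| ("right", *name))
    }
}

fn main() {
    // Columns kept by each ReadRel's projection mask in the plan above:
    // the left read selects fields 0 and 4 of the base schema (a, e),
    // the right read selects field 4 (e).
    let left = ["a", "e"];
    let right = ["e"];

    // Fields 1 and 2 are the arguments of `not_equal` in post_join_filter;
    // field 0 is the projected output column.
    for index in [1, 2, 0] {
        println!("field {index} -> {:?}", resolve_join_field(&left, &right, index));
    }
}
```

Under this reading, fields 1 and 2 land on different sides of the join even though both columns are named `e`, so the ambiguity only appears once the consumer turns positions back into qualified names.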
However, if we use the consumer to convert this Substrait plan back into a DF plan, we get:
```Bash
[Unoptimized plan]
Projection: data.a
  Inner Join: Filter: data.e != data.e
    TableScan: data projection=[a, e]
    TableScan: data projection=[e]
[Optimized plan]
Projection: data.a
  Inner Join:
    Projection: data.a
      Filter: data.e != data.e
        TableScan: data projection=[a, e], partial_filters=[data.e != data.e]
    TableScan: data projection=[e]
```
Notice that because DF has no way to distinguish the left `data` table from
the right `data` table, it treats them as the same `TableScan` relation. The
join filter becomes `data.e != data.e`, and the optimizer pushes it down to
one side of the join. Thus, the output DF plan is incorrect.
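The name collision can be reproduced with a toy model. The sketch below is not DataFusion code: `Rel`, `qualified_columns`, and `strip_aliases` are hypothetical stand-ins that only mimic how column qualifiers are derived and what happens when every `SubqueryAlias` is dropped during a round trip.

```rust
/// Toy stand-in for a logical plan node; these are NOT DataFusion's types.
enum Rel {
    TableScan { table: String, columns: Vec<String> },
    SubqueryAlias { alias: String, input: Box<Rel> },
}

/// Qualified output column names, in the style of `d1.a` or `data.a`.
fn qualified_columns(rel: &Rel) -> Vec<String> {
    match rel {
        Rel::TableScan { table, columns } => {
            columns.iter().map(|c| format!("{table}.{c}")).collect()
        }
        Rel::SubqueryAlias { alias, input } => qualified_columns(input)
            .iter()
            .map(|q| {
                // Replace the qualifier with the alias, keep the column name.
                let column = q.rsplit('.').next().unwrap_or(q.as_str());
                format!("{alias}.{column}")
            })
            .collect(),
    }
}

/// Drop every alias node, mimicking a round trip that bypasses `SubqueryAlias`.
fn strip_aliases(rel: Rel) -> Rel {
    match rel {
        Rel::SubqueryAlias { input, .. } => strip_aliases(*input),
        other => other,
    }
}

/// Build a scan of the `data` table with the given columns.
fn scan(columns: &[&str]) -> Rel {
    Rel::TableScan {
        table: "data".to_string(),
        columns: columns.iter().map(|c| c.to_string()).collect(),
    }
}

/// Wrap a relation in an alias node.
fn aliased(alias: &str, input: Rel) -> Rel {
    Rel::SubqueryAlias { alias: alias.to_string(), input: Box::new(input) }
}

fn main() {
    let left = aliased("d1", scan(&["a", "e"]));
    let right = aliased("d2", scan(&["e"]));
    // With aliases, the two `e` columns have distinct qualifiers.
    println!("{:?} vs {:?}", qualified_columns(&left), qualified_columns(&right));

    let (left, right) = (strip_aliases(left), strip_aliases(right));
    // Without aliases, both sides expose a column with the same qualified name.
    println!("{:?} vs {:?}", qualified_columns(&left), qualified_columns(&right));
}
```

Once the aliases are gone, both inputs expose a column with the same qualified name, which matches the `data.e != data.e` filter seen in the consumer's output.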
### Describe the solution you'd like
Preserve aliases in Substrait.
### Describe alternatives you've considered
N/A
### Additional context
Additional example:
```Rust
#[tokio::test]
async fn roundtrip_exists_filter() -> Result<()> {
    roundtrip(
        "SELECT b FROM data d1 WHERE EXISTS (SELECT * FROM data d2 WHERE d2.a = d1.a AND d2.e != d1.e)",
    )
    .await
}
// thread 'tests::roundtrip_exists_filter' panicked at 'assertion failed: `(left == right)`
//   left: `"Projection: d1.b\n LeftSemi Join: d1.a = __correlated_sq_1.a Filter: __correlated_sq_1.e != d1.e\n SubqueryAlias: d1\n TableScan: data projection=[a, b, e]\n SubqueryAlias: __correlated_sq_1\n SubqueryAlias: d2\n TableScan: data projection=[a, e]"`,
//  right: `"Projection: data.b\n LeftSemi Join: data.a = data.a\n Projection: data.a, data.b\n Filter: data.e != data.e\n TableScan: data projection=[a, b, e], partial_filters=[data.e != data.e]\n TableScan: data projection=[a]"`',
// datafusion/substrait/tests/roundtrip_logical_plan.rs:625:9
```
```Bash
[Original plan]
Projection: d1.b
  LeftSemi Join: d1.a = __correlated_sq_1.a Filter: __correlated_sq_1.e != d1.e
    SubqueryAlias: d1
      TableScan: data projection=[a, b, e]
    SubqueryAlias: __correlated_sq_1
      SubqueryAlias: d2
        TableScan: data projection=[a, e]
[Unoptimized plan from consumer]
Projection: data.b
  LeftSemi Join: data.a = data.a Filter: data.e != data.e
    TableScan: data projection=[a, b, e]
    TableScan: data projection=[a, e]
[Optimized plan from consumer (incorrect)]
Projection: data.b
  LeftSemi Join: data.a = data.a
    Projection: data.a, data.b
      Filter: data.e != data.e
        TableScan: data projection=[a, b, e], partial_filters=[data.e != data.e]
    TableScan: data projection=[a]
```
