mbutrovich opened a new issue, #4014:
URL: https://github.com/apache/datafusion-comet/issues/4014
### Describe the bug
When `CometExecRule` converts a `BroadcastExchangeExec` to
`CometBroadcastExchangeExec`, any `ReusedExchangeExec` nodes that referenced
the original `BroadcastExchangeExec` are not updated. This breaks exchange
reuse, resulting in duplicate broadcast execution and an extra project node in
the plan.
This is caused by rule ordering: `ReuseExchangeAndSubquery` runs as a
physical preparation rule and inserts `ReusedExchangeExec` nodes that point at
the original `BroadcastExchangeExec`. Then `CometExecRule` runs as a columnar rule (after
preparation) and converts the join's `BroadcastExchangeExec` to
`CometBroadcastExchangeExec`. The `SubqueryBroadcastExec` inside the DPP filter
still holds a reference to the original `BroadcastExchangeExec`. The two
exchanges have different canonicalized types and hashes, so they can't be
matched for reuse.
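For context, Spark's exchange reuse keys exchanges by their canonicalized plan, so two exchanges are only deduplicated when their canonicalized forms compare equal. A minimal sketch of that keying idea (not the actual `ReuseExchangeAndSubquery` implementation, and it ignores subquery traversal):
```scala
import scala.collection.mutable
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.exchange.{Exchange, ReusedExchangeExec}

// Sketch of the reuse keying: each exchange is looked up under its canonicalized plan.
// BroadcastExchangeExec and CometBroadcastExchangeExec canonicalize to different node
// types (and different hashes), so this lookup can never unify one with the other.
def reuseExchangesSketch(plan: SparkPlan): SparkPlan = {
  val exchanges = mutable.Map.empty[SparkPlan, Exchange]
  plan.transformUp {
    case exchange: Exchange =>
      val cached = exchanges.getOrElseUpdate(exchange.canonicalized, exchange)
      if (cached ne exchange) ReusedExchangeExec(exchange.output, cached) else exchange
  }
}
```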
Returning the wrapped exchange's canonicalized form from
`CometBroadcastExchangeExec.doCanonicalize()` would be incorrect because the
two exchanges produce different output formats (Arrow columnar vs Spark
`HashedRelation`). They are not interchangeable at runtime.
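For illustration, the rejected change would amount to an override like the one below on the Comet exchange. This is only a fragment, and the `originalPlan` field name is an assumption rather than a quote from the actual class:
```scala
// Rejected sketch: canonicalize the Comet exchange to the Spark exchange it replaced so
// the two hash alike and the reuse rule treats them as interchangeable. Unsound, because
// a consumer rewired onto the Comet exchange would receive Arrow columnar batches where
// it expects a Spark HashedRelation.
override protected def doCanonicalize(): SparkPlan = originalPlan.canonicalized
```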
This was exposed by adding non-AQE DPP support to `CometNativeScanExec`.
Previously the DPP scan fell back to `FileSourceScanExec`, so the join wasn't
fully native and `CometExecRule` didn't convert the broadcast exchange. With
DPP working natively, the full join tree converts to Comet operators and the
reuse breaks.
### Steps to reproduce
```scala
test("DPP join exchange reuse") {
withTempDir { dir =>
val path = s"${dir.getAbsolutePath}/data"
spark.range(100).selectExpr(
"id % 10 as key", "cast(id * 2 as int) as a",
"cast(id * 3 as int) as b", "cast(id as string) as c",
"array(id, id + 1, id + 3) as d")
.write.partitionBy("key").parquet(path)
spark.read.parquet(path).createOrReplaceTempView("testView")
withSQLConf(
SQLConf.USE_V1_SOURCE_LIST.key -> "parquet",
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
val query =
"""select * from (select key, a, c, b from testView) as t1
|join (select key, a, b, c from testView) as t2
|on t1.key = t2.key where t2.a > 50""".stripMargin
val df = sql(query)
df.collect()
val plan = df.queryExecution.executedPlan
val reused = collectWithSubqueries(plan) {
case e: ReusedExchangeExec => e
}
assert(reused.nonEmpty, "Expected exchange reuse")
}
}
}
```
### Expected behavior
The plan should contain a `ReusedExchangeExec` on the broadcast side of the
join, reusing the broadcast from the DPP `SubqueryBroadcastExec`:
```
CometBroadcastHashJoin
:- CometProject [key, a, c, b]
:  +- CometNativeScan [a, b, c, key] (DPP scan)
:        +- SubqueryBroadcast
:           +- BroadcastExchange (original)
+- ReusedExchange (reuses BroadcastExchange)
```
### Additional context
Actual plan (no reuse, extra project):
```
CometBroadcastHashJoin
:- CometProject [key, a, c, b]
:  +- CometNativeScan [a, b, c, key] (DPP scan)
:        +- SubqueryBroadcast
:           +- BroadcastExchange (Spark exchange, in subquery)
+- CometBroadcastExchange (Comet exchange, NOT reused)
   +- CometProject [key, a, b, c] (extra project)
      +- CometFilter
         +- CometNativeScan
```
Canonicalization output confirms the mismatch:
```
CometBroadcastExchangeExec canonicalized: CometBroadcastExchangeExec, hash=-356598217
BroadcastExchangeExec canonicalized: BroadcastExchangeExec, hash=-555436138
```
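The numbers above come from comparing the canonicalized forms directly. A hedged debugging sketch (the helper name is made up, and the two exchange nodes would be collected from `df.queryExecution.executedPlan`):
```scala
import org.apache.spark.sql.execution.SparkPlan

// Print a node's canonicalized class and hash, the values the reuse matching compares.
def showCanonicalized(label: String, exchange: SparkPlan): Unit = {
  val canonical = exchange.canonicalized
  println(s"$label canonicalized: ${canonical.getClass.getSimpleName}, hash=${canonical.hashCode()}")
}

// With the two exchanges collected from the executed plan, the classes (and hashes)
// differ, so sameResult is false and no ReusedExchangeExec is ever inserted:
// showCanonicalized("CometBroadcastExchangeExec", cometExchange)
// showCanonicalized("BroadcastExchangeExec", sparkExchange)
// assert(!cometExchange.sameResult(sparkExchange))
```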
On main (where DPP falls back to Spark), the join stays as
`BroadcastHashJoinExec` with `ReusedExchangeExec` and only 2 project nodes.
This surfaces as a failure in Spark's `RemoveRedundantProjectsSuite` "join
with ordering requirement" test, which expects 2 project nodes but finds 3.