mbutrovich opened a new issue, #4014:
URL: https://github.com/apache/datafusion-comet/issues/4014

   ### Describe the bug
   
   When `CometExecRule` converts a `BroadcastExchangeExec` to 
`CometBroadcastExchangeExec`, any `ReusedExchangeExec` nodes that referenced 
the original `BroadcastExchangeExec` are not updated. This breaks exchange 
reuse, resulting in duplicate broadcast execution and an extra project node in 
the plan.
   
   This is caused by rule ordering: `ReuseExchangeAndSubquery` runs as a 
physical preparation rule and sets up `ReusedExchangeExec` → 
`BroadcastExchangeExec`. Then `CometExecRule` runs as a columnar rule (after 
preparation) and converts the join's `BroadcastExchangeExec` to 
`CometBroadcastExchangeExec`. The `SubqueryBroadcastExec` inside the DPP filter 
still holds a reference to the original `BroadcastExchangeExec`. The two 
exchanges have different canonicalized types and hashes, so they can't be 
matched for reuse.
   
   Returning the wrapped exchange's canonicalized form from 
`CometBroadcastExchangeExec.doCanonicalize()` would be incorrect because the 
two exchanges produce different output formats (Arrow columnar vs Spark 
`HashedRelation`). They are not interchangeable at runtime.
   
   This was exposed by adding non-AQE DPP support to `CometNativeScanExec`. 
Previously the DPP scan fell back to `FileSourceScanExec`, so the join wasn't 
fully native and `CometExecRule` didn't convert the broadcast exchange. With 
DPP working natively, the full join tree converts to Comet operators and the 
reuse breaks.
   
   ### Steps to reproduce
   
   ```scala
   test("DPP join exchange reuse") {
     withTempDir { dir =>
       val path = s"${dir.getAbsolutePath}/data"
       spark.range(100).selectExpr(
         "id % 10 as key", "cast(id * 2 as int) as a",
         "cast(id * 3 as int) as b", "cast(id as string) as c",
         "array(id, id + 1, id + 3) as d")
         .write.partitionBy("key").parquet(path)
       spark.read.parquet(path).createOrReplaceTempView("testView")
   
       withSQLConf(
         SQLConf.USE_V1_SOURCE_LIST.key -> "parquet",
         SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
         val query =
           """select * from (select key, a, c, b from testView) as t1
             |join (select key, a, b, c from testView) as t2
             |on t1.key = t2.key where t2.a > 50""".stripMargin
         val df = sql(query)
         df.collect()
         val plan = df.queryExecution.executedPlan
   
         val reused = collectWithSubqueries(plan) {
           case e: ReusedExchangeExec => e
         }
         assert(reused.nonEmpty, "Expected exchange reuse")
       }
     }
   }
   ```
   
   ### Expected behavior
   
   The plan should contain a `ReusedExchangeExec` on the broadcast side of the 
join, reusing the broadcast from the DPP `SubqueryBroadcastExec`:
   
   ```
   CometBroadcastHashJoin
   :- CometProject [key, a, c, b]
   :  +- CometNativeScan [a, b, c, key]  (DPP scan)
   :     +- SubqueryBroadcast
   :        +- BroadcastExchange         (original)
   +- ReusedExchange                     (reuses BroadcastExchange)
   ```
   
   ### Additional context
   
   Actual plan (no reuse, extra project):
   
   ```
   CometBroadcastHashJoin
   :- CometProject [key, a, c, b]
   :  +- CometNativeScan [a, b, c, key]  (DPP scan)
   :     +- SubqueryBroadcast
   :        +- BroadcastExchange          (Spark exchange, in subquery)
   +- CometBroadcastExchange              (Comet exchange, NOT reused)
      +- CometProject [key, a, b, c]      (extra project)
         +- CometFilter
            +- CometNativeScan
   ```
   
   Canonicalization output confirms the mismatch:
   ```
   CometBroadcastExchangeExec canonicalized: CometBroadcastExchangeExec, 
hash=-356598217
   BroadcastExchangeExec canonicalized: BroadcastExchangeExec, hash=-555436138
   ```
   
   On main (where DPP falls back to Spark), the join stays as 
`BroadcastHashJoinExec` with `ReusedExchangeExec` and only 2 project nodes.
   
   This surfaces as a failure in Spark's `RemoveRedundantProjectsSuite` "join 
with ordering requirement" test, which expects 2 project nodes but finds 3.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to