mbutrovich commented on code in PR #4374:
URL: https://github.com/apache/datafusion-comet/pull/4374#discussion_r3283813792
##########
spark/src/main/scala/org/apache/spark/sql/comet/CometBroadcastExchangeExec.scala:
##########
@@ -85,7 +85,13 @@ case class CometBroadcastExchangeExec(
"number of coalesced rows for broadcast"))
override def doCanonicalize(): SparkPlan = {
- CometBroadcastExchangeExec(null, null, mode, child.canonicalized)
+ // originalPlan is the source-of-truth for the projection applied during broadcast
+ // (e.g. count+1 vs count-1 in correlated IN with count-bug decorrelation). Two
+ // CometBroadcastExchanges with identical Comet children but different originalPlans
+ // produce different broadcast values, so their canonical forms must differ to
+ // prevent AQE's ReusedExchange from incorrectly merging them. See issue #4242.
+ val canonicalOriginal = if (originalPlan != null) originalPlan.canonicalized else null
+ CometBroadcastExchangeExec(canonicalOriginal, null, mode, child.canonicalized)
Review Comment:
Pre-existing thing, but since this PR is reworking canonicalization, it
might be worth addressing in the same change. Spark's
BroadcastExchangeExec.doCanonicalize calls mode.canonicalized, but here we pass
the bare mode. HashedRelationBroadcastMode carries the join-key expressions
with non-canonical exprIds, so two semantically equivalent Comet broadcasts may
end up with non-matching canonical forms and miss legitimate reuse
opportunities. Worth either fixing here or filing a follow-up issue.
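To make the missed-reuse concern concrete, here is a minimal toy model. It does not use Spark or Comet. `Attr`, `KeyedMode` and `Exchange` are hypothetical stand-ins for `AttributeReference`, `HashedRelationBroadcastMode` and the broadcast exchange. The canonicalization is deliberately simplified: names are erased and ids are replaced by ordinals in the child's output, which is roughly the spirit of Catalyst's normalization but not its actual implementation.

```scala
// Toy model only: Attr, KeyedMode and Exchange are hypothetical stand-ins,
// not Spark or Comet classes.

// An attribute identified by a display name and a per-query id, loosely
// mirroring Catalyst's AttributeReference and its exprId.
final case class Attr(name: String, exprId: Long)

// Stand-in for HashedRelationBroadcastMode: it carries only the join keys.
final case class KeyedMode(keys: Seq[Attr]) {
  // Simplified canonicalization: drop the cosmetic name and replace each
  // key's id with its ordinal in the input, so equivalent keys compare equal
  // while keys on different columns stay distinct. (indexWhere yields -1 for
  // a key missing from the input; the demo data never hits that case.)
  def canonicalizedAgainst(input: Seq[Attr]): KeyedMode =
    KeyedMode(keys.map(k => Attr("none", input.indexWhere(_.exprId == k.exprId).toLong)))
}

// Stand-in for a broadcast exchange over a child with the given output.
final case class Exchange(mode: KeyedMode, childOutput: Seq[Attr], child: String) {
  // Child output with names erased and ids replaced by ordinals.
  private def canonicalOutput: Seq[Attr] =
    childOutput.indices.map(i => Attr("none", i.toLong))

  // Like the current Comet code: the child is canonicalized, the mode is not.
  def canonicalBareMode: Exchange = Exchange(mode, canonicalOutput, child)

  // Like Spark's BroadcastExchangeExec: the mode is canonicalized as well.
  def canonicalWithMode: Exchange =
    Exchange(mode.canonicalizedAgainst(childOutput), canonicalOutput, child)
}

object CanonicalModeDemo {
  def main(args: Array[String]): Unit = {
    // a and b: same plan shape and join key, planned with different exprIds.
    val a = Exchange(KeyedMode(Seq(Attr("k", 101))), Seq(Attr("k", 101), Attr("v", 102)), "scan(t)")
    val b = Exchange(KeyedMode(Seq(Attr("k", 201))), Seq(Attr("k", 201), Attr("v", 202)), "scan(t)")
    // c: same child, but keyed on v instead of k.
    val c = Exchange(KeyedMode(Seq(Attr("v", 302))), Seq(Attr("k", 301), Attr("v", 302)), "scan(t)")

    println(a.canonicalBareMode == b.canonicalBareMode) // false: reuse is missed
    println(a.canonicalWithMode == b.canonicalWithMode) // true: reuse is possible
    println(a.canonicalWithMode == c.canonicalWithMode) // false: different keys stay distinct
  }
}
```

In `doCanonicalize` itself, the corresponding change would presumably be to pass `mode.canonicalized` instead of `mode` to the `CometBroadcastExchangeExec` constructor, mirroring Spark. I haven't checked how Comet builds its mode, so treat that as something to verify rather than a drop-in fix.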
--
This is an automated message from the Apache Git Service.