mbutrovich commented on code in PR #4429:
URL: https://github.com/apache/datafusion-comet/pull/4429#discussion_r3363491497


##########
spark/src/main/scala/org/apache/spark/sql/comet/operators.scala:
##########
@@ -1924,6 +1924,158 @@ trait CometHashJoin {
   }
 }
 
+case class CometBroadcastNestedLoopJoinExec(

Review Comment:
   Should this define a `metrics` map like `CometMetricNode.hashJoinMetrics` 
does for BHJ (and `sortMergeJoinMetrics` for SMJ)? As written it inherits 
`baselineMetrics`, so the native NLJ's build/probe/join timing metrics won't 
surface in the Spark UI.



##########
spark/src/main/scala/org/apache/spark/sql/comet/operators.scala:
##########
@@ -1924,6 +1924,158 @@ trait CometHashJoin {
   }
 }
 
+case class CometBroadcastNestedLoopJoinExec(
+    override val nativeOp: Operator,
+    override val originalPlan: SparkPlan,
+    override val output: Seq[Attribute],
+    override val outputOrdering: Seq[SortOrder],
+    joinType: JoinType,
+    condition: Option[Expression],
+    buildSide: BuildSide,
+    override val left: SparkPlan,
+    override val right: SparkPlan,
+    override val serializedPlanOpt: SerializedPlan)
+    extends CometBinaryExec {
+
+  // Mirror Spark's BroadcastNestedLoopJoinExec: output partitioning derives from the streamed
+  // (non-broadcast) side. Reading from live left/right rather than originalPlan keeps this
+  // correct after an AQE child swap.
+  private def streamedPlan: SparkPlan = buildSide match {
+    case BuildLeft => right
+    case BuildRight => left
+  }
+
+  override def outputPartitioning: Partitioning = streamedPlan.outputPartitioning
+
+  override def withNewChildrenInternal(newLeft: SparkPlan, newRight: SparkPlan): SparkPlan =
+    this.copy(left = newLeft, right = newRight)
+
+  override def stringArgs: Iterator[Any] =
+    Iterator(joinType, buildSide, condition, left, right)
+
+  override def equals(obj: Any): Boolean = {
+    obj match {
+      case other: CometBroadcastNestedLoopJoinExec =>
+        this.output == other.output &&
+        this.joinType == other.joinType &&
+        this.condition == other.condition &&
+        this.buildSide == other.buildSide &&
+        this.left == other.left &&
+        this.right == other.right &&
+        this.serializedPlanOpt == other.serializedPlanOpt
+      case _ =>
+        false
+    }
+  }
+
+  override def hashCode(): Int =
+    Objects.hashCode(output, joinType, condition, buildSide, left, right)
+}
+
+object CometBroadcastNestedLoopJoinExec extends CometOperatorSerde[BroadcastNestedLoopJoinExec] {
+
+  /**
+   * Get the optional Comet configuration entry that is used to enable or disable native support
+   * for this operator.
+   */
+  override def enabledConfig: Option[ConfigEntry[Boolean]] = {
+    Some(CometConf.COMET_EXEC_BROADCAST_NESTED_LOOP_JOIN_ENABLED)
+  }
+
+  private val broadcastBuildReplicationReason =
+    "BNLJ combinations that emit per-build-row results need a cross-partition merge that" +
+      " DataFusion's NestedLoopJoinExec does not provide. Affects: LeftOuter+BuildLeft," +
+      " RightOuter+BuildRight, FullOuter, LeftSemi+BuildLeft, LeftAnti+BuildLeft."
+
+  override def getSupportLevel(op: BroadcastNestedLoopJoinExec): SupportLevel =

Review Comment:
   The support matrix here lines up exactly with Spark's 
`BroadcastNestedLoopJoinExec.outputPartitioning` supported set, and the 
reasoning in `broadcastBuildReplicationReason` is spot on. Every case returns 
either `Compatible(None)` or `Unsupported(...)`, never `Incompatible`. Since 
`isOperatorEnabled` only consults the allow-incompat config on the 
`Incompatible` branch, the 
`getOperatorAllowIncompatConfigKey("BroadcastNestedLoopJoinExec")` line in the 
`CometJoinSuite` override is dead. Is that intentional, or leftover?
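
To make the dead-config point concrete, here is a minimal, self-contained sketch. The `SupportLevel` types and `isOperatorEnabled` below are simplified stand-ins written for illustration, not Comet's actual definitions; they assume only what is described above, namely that the allow-incompat flag is consulted on the `Incompatible` branch and nowhere else.

```scala
// Simplified stand-ins for Comet's support levels (assumption: not the real definitions).
sealed trait SupportLevel
final case class Compatible(notes: Option[String] = None) extends SupportLevel
final case class Incompatible(notes: Option[String] = None) extends SupportLevel
final case class Unsupported(notes: Option[String] = None) extends SupportLevel

object AllowIncompatSketch {
  // Hypothetical model of the enablement check: the allow-incompat flag
  // is read only when the operator reports Incompatible.
  def isOperatorEnabled(level: SupportLevel, allowIncompat: Boolean): Boolean =
    level match {
      case Compatible(_) => true
      case Incompatible(_) => allowIncompat
      case Unsupported(_) => false
    }

  def main(args: Array[String]): Unit = {
    // The only two levels that the BNLJ getSupportLevel is said to return.
    val levels: Seq[SupportLevel] =
      Seq(Compatible(), Unsupported(Some("needs cross-partition merge")))
    for (level <- levels) {
      // Same answer whether the flag is on or off.
      assert(
        isOperatorEnabled(level, allowIncompat = true) ==
          isOperatorEnabled(level, allowIncompat = false))
    }
    println("allow-incompat has no effect on Compatible or Unsupported")
  }
}
```

Under this model, toggling the flag only changes the outcome for an `Incompatible` result, so a suite-level override would matter only if `getSupportLevel` gained such a case.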



##########
spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala:
##########
@@ -702,4 +705,102 @@ class CometJoinSuite extends CometTestBase {
       }
     }
   }
+
+  test("BroadcastNestedLoopJoin with unequal filter") {

Review Comment:
   The compatible matrix is well covered. Could we add a couple of result-only 
`checkSparkAnswer` cases for the unsupported combinations (`FullOuter`, 
`LeftOuter` with `BuildLeft`) to confirm they fall back to Spark and still 
match? A broadcast-reuse case (same relation feeding two BNLJs under AQE) would 
also be reassuring given reuse is handled generically rather than in this 
operator. Minor: no coverage for a join condition referencing nullable or 
null-producing columns, and no left-outer-without-condition case.
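
As a rough aid for choosing the fallback cases, the sketch below enumerates the join type and build side pairs that the `broadcastBuildReplicationReason` string lists as unsupported. `JoinKind`, `Side`, and `fallsBackToSpark` are local, hypothetical stand-ins rather than Spark or Comet types, and the model encodes only the list in that string; it does not exercise the real `getSupportLevel`.

```scala
object BnljFallbackMatrix {
  // Local stand-ins for Spark's JoinType and BuildSide (illustrative names only).
  sealed trait JoinKind
  case object Inner extends JoinKind
  case object LeftOuter extends JoinKind
  case object RightOuter extends JoinKind
  case object FullOuter extends JoinKind
  case object LeftSemi extends JoinKind
  case object LeftAnti extends JoinKind

  sealed trait Side
  case object BuildLeft extends Side
  case object BuildRight extends Side

  // Combinations listed in broadcastBuildReplicationReason as unsupported.
  def fallsBackToSpark(join: JoinKind, side: Side): Boolean = (join, side) match {
    case (FullOuter, _) => true
    case (LeftOuter, BuildLeft) => true
    case (RightOuter, BuildRight) => true
    case (LeftSemi, BuildLeft) => true
    case (LeftAnti, BuildLeft) => true
    case _ => false
  }

  val joins: Seq[JoinKind] =
    Seq(Inner, LeftOuter, RightOuter, FullOuter, LeftSemi, LeftAnti)
  val sides: Seq[Side] = Seq(BuildLeft, BuildRight)

  // Every (join, side) pair that a result-only fallback test could exercise.
  val fallbackCases: Seq[(JoinKind, Side)] =
    for (j <- joins; s <- sides if fallsBackToSpark(j, s)) yield (j, s)

  def main(args: Array[String]): Unit =
    fallbackCases.foreach { case (j, s) => println(s"$j + $s") }
}
```

A table like this could drive a parameterized test that runs each pair through `checkSparkAnswer`, though wiring it to real hints and join types is left to the suite.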



##########
spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala:
##########
@@ -38,7 +38,10 @@ class CometJoinSuite extends CometTestBase {
   override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit
       pos: Position): Unit = {
     super.test(testName, testTags: _*) {
-      withSQLConf(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") {
+      withSQLConf(
+        CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
+        CometConf.COMET_EXEC_BROADCAST_NESTED_LOOP_JOIN_ENABLED.key -> "true",
+        CometConf.getOperatorAllowIncompatConfigKey("BroadcastNestedLoopJoinExec") -> "true") {

Review Comment:
   Similar to the other comment in operators.scala. This allow-incompat config 
is never read for BNLJ. `getSupportLevel` only ever returns `Compatible(None)` 
or `Unsupported(...)`, never `Incompatible`, and `isOperatorEnabled` only 
consults allow-incompat on the `Incompatible` branch. Can this be dropped, or 
was an `Incompatible` case intended?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
