coderfender commented on code in PR #4429:
URL: https://github.com/apache/datafusion-comet/pull/4429#discussion_r3367795619
##########
spark/src/main/scala/org/apache/spark/sql/comet/operators.scala:
##########
@@ -1924,6 +1924,158 @@ trait CometHashJoin {
}
}
+case class CometBroadcastNestedLoopJoinExec(
+ override val nativeOp: Operator,
+ override val originalPlan: SparkPlan,
+ override val output: Seq[Attribute],
+ override val outputOrdering: Seq[SortOrder],
+ joinType: JoinType,
+ condition: Option[Expression],
+ buildSide: BuildSide,
+ override val left: SparkPlan,
+ override val right: SparkPlan,
+ override val serializedPlanOpt: SerializedPlan)
+ extends CometBinaryExec {
+
+ // Mirror Spark's BroadcastNestedLoopJoinExec: output partitioning derives
from the streamed
+ // (non-broadcast) side. Reading from live left/right rather than
originalPlan keeps this
+ // correct after an AQE child swap.
+ private def streamedPlan: SparkPlan = buildSide match {
+ case BuildLeft => right
+ case BuildRight => left
+ }
+
+ override def outputPartitioning: Partitioning =
streamedPlan.outputPartitioning
+
+ override def withNewChildrenInternal(newLeft: SparkPlan, newRight:
SparkPlan): SparkPlan =
+ this.copy(left = newLeft, right = newRight)
+
+ override def stringArgs: Iterator[Any] =
+ Iterator(joinType, buildSide, condition, left, right)
+
+ override def equals(obj: Any): Boolean = {
+ obj match {
+ case other: CometBroadcastNestedLoopJoinExec =>
+ this.output == other.output &&
+ this.joinType == other.joinType &&
+ this.condition == other.condition &&
+ this.buildSide == other.buildSide &&
+ this.left == other.left &&
+ this.right == other.right &&
+ this.serializedPlanOpt == other.serializedPlanOpt
+ case _ =>
+ false
+ }
+ }
+
+ override def hashCode(): Int =
+ Objects.hashCode(output, joinType, condition, buildSide, left, right)
+}
+
+object CometBroadcastNestedLoopJoinExec extends
CometOperatorSerde[BroadcastNestedLoopJoinExec] {
+
+ /**
+ * Get the optional Comet configuration entry that is used to enable or
disable native support
+ * for this operator.
+ */
+ override def enabledConfig: Option[ConfigEntry[Boolean]] = {
+ Some(CometConf.COMET_EXEC_BROADCAST_NESTED_LOOP_JOIN_ENABLED)
+ }
+
+ private val broadcastBuildReplicationReason =
+ "BNLJ combinations that emit per-build-row results need a cross-partition
merge that" +
+ " DataFusion's NestedLoopJoinExec does not provide. Affects:
LeftOuter+BuildLeft," +
+ " RightOuter+BuildRight, FullOuter, LeftSemi+BuildLeft,
LeftAnti+BuildLeft."
+
+ override def getSupportLevel(op: BroadcastNestedLoopJoinExec): SupportLevel =
Review Comment:
Agreed. I was initially gating the join behind the config in the test suite
to work out the support matrix through testing. I have since removed the
now-redundant config.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]