Copilot commented on code in PR #1844:
URL: https://github.com/apache/auron/pull/1844#discussion_r2681283514
##########
spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeBroadcastJoinBase.scala:
##########
@@ -127,44 +124,103 @@ abstract class NativeBroadcastJoinBase(
override def doExecuteNative(): NativeRDD = {
val leftRDD = NativeHelper.executeNative(left)
val rightRDD = NativeHelper.executeNative(right)
- val nativeMetrics = SparkMetricNode(metrics, leftRDD.metrics ::
rightRDD.metrics :: Nil)
- val nativeSchema = this.nativeSchema
- val nativeJoinType = this.nativeJoinType
- val nativeJoinOn = this.nativeJoinOn
val (probedRDD, builtRDD) = broadcastSide match {
case BroadcastLeft => (rightRDD, leftRDD)
case BroadcastRight => (leftRDD, rightRDD)
}
+ // Handle the edge case when probed side is empty (no partitions)
+ // This matches Spark's BroadcastNestedLoopJoinExec behavior for
condition.isEmpty case:
+ // val streamExists = !streamed.executeTake(1).isEmpty
+ // if (streamExists == exists) sparkContext.makeRDD(relation.value)
+ // else sparkContext.emptyRDD
+ // where exists = true for Semi, false for Anti
+ //
+ // Note: This optimization only applies to Semi/Anti joins.
+ // For ExistenceJoin, we need to let native join execute so that finish()
can output
+ // all build rows with exists=false when streamed is empty.
+ if (probedRDD.partitions.isEmpty) {
+ joinType match {
+ case LeftAnti =>
+ return builtRDD
+ case LeftSemi =>
+ return probedRDD
+ case _ =>
+ }
+ }
+
+ val nativeMetrics = SparkMetricNode(metrics, leftRDD.metrics ::
rightRDD.metrics :: Nil)
+ val nativeSchema = this.nativeSchema
+ val nativeJoinType = this.nativeJoinType
+ val nativeJoinOn = this.nativeJoinOn
+
val probedShuffleReadFull = probedRDD.isShuffleReadFull && (broadcastSide
match {
case BroadcastLeft =>
Seq(FullOuter, RightOuter).contains(joinType)
case BroadcastRight =>
Seq(FullOuter, LeftOuter, LeftSemi, LeftAnti).contains(joinType)
})
+ // For ExistenceJoin with empty probed side, use builtRDD.partitions to
ensure
+ // native join can execute and finish() will output all build rows with
exists=false
+ val (rddPartitions, rddPartitioner, rddDependencies) =
+ if (probedRDD.partitions.isEmpty &&
joinType.isInstanceOf[ExistenceJoin]) {
+ (builtRDD.partitions, builtRDD.partitioner, new
OneToOneDependency(builtRDD) :: Nil)
+ } else {
+ (probedRDD.partitions, probedRDD.partitioner, new
OneToOneDependency(probedRDD) :: Nil)
+ }
+
new NativeRDD(
sparkContext,
nativeMetrics,
- probedRDD.partitions,
- rddPartitioner = probedRDD.partitioner,
- rddDependencies = new OneToOneDependency(probedRDD) :: Nil,
+ rddPartitions,
+ rddPartitioner = rddPartitioner,
+ rddDependencies = rddDependencies,
probedShuffleReadFull,
(partition, context) => {
val partition0 = new Partition() {
override def index: Int = 0
}
- val (leftChild, rightChild) = broadcastSide match {
- case BroadcastLeft =>
- (
- leftRDD.nativePlan(partition0, context),
- rightRDD.nativePlan(rightRDD.partitions(partition.index),
context))
- case BroadcastRight =>
- (
- leftRDD.nativePlan(leftRDD.partitions(partition.index), context),
- rightRDD.nativePlan(partition0, context))
- }
+ val (leftChild, rightChild) =
+ if (probedRDD.partitions.isEmpty &&
joinType.isInstanceOf[ExistenceJoin]) {
Review Comment:
   The condition `probedRDD.partitions.isEmpty && joinType.isInstanceOf[ExistenceJoin]` is repeated in multiple places (lines 168 and 186). Consider extracting it into a well-named boolean variable (e.g., `isExistenceJoinWithEmptyProbed`) to improve maintainability and reduce duplication.
##########
spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeBroadcastJoinBase.scala:
##########
@@ -127,44 +124,103 @@ abstract class NativeBroadcastJoinBase(
override def doExecuteNative(): NativeRDD = {
val leftRDD = NativeHelper.executeNative(left)
val rightRDD = NativeHelper.executeNative(right)
- val nativeMetrics = SparkMetricNode(metrics, leftRDD.metrics ::
rightRDD.metrics :: Nil)
- val nativeSchema = this.nativeSchema
- val nativeJoinType = this.nativeJoinType
- val nativeJoinOn = this.nativeJoinOn
val (probedRDD, builtRDD) = broadcastSide match {
case BroadcastLeft => (rightRDD, leftRDD)
case BroadcastRight => (leftRDD, rightRDD)
}
+ // Handle the edge case when probed side is empty (no partitions)
+ // This matches Spark's BroadcastNestedLoopJoinExec behavior for
condition.isEmpty case:
+ // val streamExists = !streamed.executeTake(1).isEmpty
+ // if (streamExists == exists) sparkContext.makeRDD(relation.value)
+ // else sparkContext.emptyRDD
+ // where exists = true for Semi, false for Anti
+ //
+ // Note: This optimization only applies to Semi/Anti joins.
+ // For ExistenceJoin, we need to let native join execute so that finish()
can output
+ // all build rows with exists=false when streamed is empty.
+ if (probedRDD.partitions.isEmpty) {
+ joinType match {
+ case LeftAnti =>
+ return builtRDD
+ case LeftSemi =>
+ return probedRDD
+ case _ =>
+ }
+ }
Review Comment:
   The early-return logic for LeftAnti when probedRDD is empty is incorrect when broadcastSide is BroadcastRight. In that case probedRDD is leftRDD and builtRDD is rightRDD, so for a LeftAnti join with an empty left side the result should be empty (leftRDD/probedRDD), not the right side (rightRDD/builtRDD). The current code would incorrectly return rightRDD.
   The fix should determine which side is actually the left side based on broadcastSide. For LeftAnti: if the left side is empty, return empty; otherwise, if the right side is empty, return the left side.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]