Copilot commented on code in PR #1844:
URL: https://github.com/apache/auron/pull/1844#discussion_r2681283514


##########
spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeBroadcastJoinBase.scala:
##########
@@ -127,44 +124,103 @@ abstract class NativeBroadcastJoinBase(
   override def doExecuteNative(): NativeRDD = {
     val leftRDD = NativeHelper.executeNative(left)
     val rightRDD = NativeHelper.executeNative(right)
-    val nativeMetrics = SparkMetricNode(metrics, leftRDD.metrics :: 
rightRDD.metrics :: Nil)
-    val nativeSchema = this.nativeSchema
-    val nativeJoinType = this.nativeJoinType
-    val nativeJoinOn = this.nativeJoinOn
 
     val (probedRDD, builtRDD) = broadcastSide match {
       case BroadcastLeft => (rightRDD, leftRDD)
       case BroadcastRight => (leftRDD, rightRDD)
     }
 
+    // Handle the edge case when probed side is empty (no partitions)
+    // This matches Spark's BroadcastNestedLoopJoinExec behavior for 
condition.isEmpty case:
+    //   val streamExists = !streamed.executeTake(1).isEmpty
+    //   if (streamExists == exists) sparkContext.makeRDD(relation.value)
+    //   else sparkContext.emptyRDD
+    // where exists = true for Semi, false for Anti
+    //
+    // Note: This optimization only applies to Semi/Anti joins.
+    // For ExistenceJoin, we need to let native join execute so that finish() 
can output
+    // all build rows with exists=false when streamed is empty.
+    if (probedRDD.partitions.isEmpty) {
+      joinType match {
+        case LeftAnti =>
+          return builtRDD
+        case LeftSemi =>
+          return probedRDD
+        case _ =>
+      }
+    }
+
+    val nativeMetrics = SparkMetricNode(metrics, leftRDD.metrics :: 
rightRDD.metrics :: Nil)
+    val nativeSchema = this.nativeSchema
+    val nativeJoinType = this.nativeJoinType
+    val nativeJoinOn = this.nativeJoinOn
+
     val probedShuffleReadFull = probedRDD.isShuffleReadFull && (broadcastSide 
match {
       case BroadcastLeft =>
         Seq(FullOuter, RightOuter).contains(joinType)
       case BroadcastRight =>
         Seq(FullOuter, LeftOuter, LeftSemi, LeftAnti).contains(joinType)
     })
 
+    // For ExistenceJoin with empty probed side, use builtRDD.partitions to 
ensure
+    // native join can execute and finish() will output all build rows with 
exists=false
+    val (rddPartitions, rddPartitioner, rddDependencies) =
+      if (probedRDD.partitions.isEmpty && 
joinType.isInstanceOf[ExistenceJoin]) {
+        (builtRDD.partitions, builtRDD.partitioner, new 
OneToOneDependency(builtRDD) :: Nil)
+      } else {
+        (probedRDD.partitions, probedRDD.partitioner, new 
OneToOneDependency(probedRDD) :: Nil)
+      }
+
     new NativeRDD(
       sparkContext,
       nativeMetrics,
-      probedRDD.partitions,
-      rddPartitioner = probedRDD.partitioner,
-      rddDependencies = new OneToOneDependency(probedRDD) :: Nil,
+      rddPartitions,
+      rddPartitioner = rddPartitioner,
+      rddDependencies = rddDependencies,
       probedShuffleReadFull,
       (partition, context) => {
         val partition0 = new Partition() {
           override def index: Int = 0
         }
-        val (leftChild, rightChild) = broadcastSide match {
-          case BroadcastLeft =>
-            (
-              leftRDD.nativePlan(partition0, context),
-              rightRDD.nativePlan(rightRDD.partitions(partition.index), 
context))
-          case BroadcastRight =>
-            (
-              leftRDD.nativePlan(leftRDD.partitions(partition.index), context),
-              rightRDD.nativePlan(partition0, context))
-        }
+        val (leftChild, rightChild) =
+          if (probedRDD.partitions.isEmpty && 
joinType.isInstanceOf[ExistenceJoin]) {

Review Comment:
   The condition `probedRDD.partitions.isEmpty && 
joinType.isInstanceOf[ExistenceJoin]` is repeated in two places (lines 168 
and 186 of the new file). Consider computing it once into a well-named boolean 
(e.g., `isExistenceJoinWithEmptyProbed`). This improves maintainability and 
keeps the partition selection and the child-plan selection from drifting apart.



##########
spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeBroadcastJoinBase.scala:
##########
@@ -127,44 +124,103 @@ abstract class NativeBroadcastJoinBase(
   override def doExecuteNative(): NativeRDD = {
     val leftRDD = NativeHelper.executeNative(left)
     val rightRDD = NativeHelper.executeNative(right)
-    val nativeMetrics = SparkMetricNode(metrics, leftRDD.metrics :: 
rightRDD.metrics :: Nil)
-    val nativeSchema = this.nativeSchema
-    val nativeJoinType = this.nativeJoinType
-    val nativeJoinOn = this.nativeJoinOn
 
     val (probedRDD, builtRDD) = broadcastSide match {
       case BroadcastLeft => (rightRDD, leftRDD)
       case BroadcastRight => (leftRDD, rightRDD)
     }
 
+    // Handle the edge case when probed side is empty (no partitions)
+    // This matches Spark's BroadcastNestedLoopJoinExec behavior for 
condition.isEmpty case:
+    //   val streamExists = !streamed.executeTake(1).isEmpty
+    //   if (streamExists == exists) sparkContext.makeRDD(relation.value)
+    //   else sparkContext.emptyRDD
+    // where exists = true for Semi, false for Anti
+    //
+    // Note: This optimization only applies to Semi/Anti joins.
+    // For ExistenceJoin, we need to let native join execute so that finish() 
can output
+    // all build rows with exists=false when streamed is empty.
+    if (probedRDD.partitions.isEmpty) {
+      joinType match {
+        case LeftAnti =>
+          return builtRDD
+        case LeftSemi =>
+          return probedRDD
+        case _ =>
+      }
+    }

Review Comment:
   The early return for LeftAnti when probedRDD is empty is incorrect 
when broadcastSide is BroadcastRight. In that case probedRDD is leftRDD 
and builtRDD is rightRDD. A LeftAnti join with an empty left side must 
produce an empty result (i.e., leftRDD/probedRDD), not the right side 
(rightRDD/builtRDD). The current code would therefore incorrectly return 
rightRDD.
   
   The fix should branch on broadcastSide to determine which RDD is the left 
side. For LeftAnti:
   - if the left side is empty, return the (empty) left side;
   - if only the right side is empty, return the left side unchanged.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to