Copilot commented on code in PR #1844:
URL: https://github.com/apache/auron/pull/1844#discussion_r2674942224


##########
native-engine/datafusion-ext-plans/src/joins/bhj/semi_join.rs:
##########
@@ -193,10 +193,15 @@ impl<const P: JoinerParams> Joiner for SemiJoiner<P> {
                 .as_ref()
                 .map(|nb| nb.is_valid(row_idx))
                 .unwrap_or(true);
-            if P.mode == Anti && P.probe_is_join_side && !key_is_valid {
+            if P.mode == Anti
+                && P.probe_is_join_side
+                && !key_is_valid
+                && self.join_params.is_null_aware_anti_join
+            {
                 probed_joined.set(row_idx, true);
                 continue;
             }

Review Comment:
   The null-aware anti join filtering logic only applies when the probe is on 
the join side. However, the condition checks should be grouped differently to 
make the logic clearer. The key_is_valid check is now separated by an early 
continue, which could lead to confusion about when probed_joined is set versus 
when the hash lookup occurs.
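
   One possible regrouping, sketched below as a hedged example: extract the whole predicate into a named helper so the relationship between `key_is_valid`, the early `continue`, and the later hash lookup is explicit. This is a simplified standalone sketch, not the PR's actual types (`JoinerParams` is a const generic there, and `probed_joined` is omitted):
   ```rust
   // Hypothetical sketch (simplified: the PR's `P` is a const generic
   // `JoinerParams`; here it is a plain struct). Naming the predicate makes
   // the early-continue read as one condition instead of four clauses.
   #[derive(PartialEq)]
   pub enum Mode {
       Anti,
       #[allow(dead_code)]
       Semi,
   }

   pub struct Params {
       pub mode: Mode,
       pub probe_is_join_side: bool,
       pub is_null_aware_anti_join: bool,
   }

   /// True when a null probe key lets a null-aware anti join skip the
   /// hash lookup and mark the row as joined immediately.
   pub fn skips_null_key(p: &Params, key_is_valid: bool) -> bool {
       p.mode == Mode::Anti
           && p.probe_is_join_side
           && p.is_null_aware_anti_join
           && !key_is_valid
   }
   ```
   At the call site the loop body would then read `if skips_null_key(&p, key_is_valid) { probed_joined.set(row_idx, true); continue; }`, keeping the set-vs-lookup decision in one place.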



##########
spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeBroadcastJoinBase.scala:
##########
@@ -126,44 +124,116 @@ abstract class NativeBroadcastJoinBase(
   override def doExecuteNative(): NativeRDD = {
     val leftRDD = NativeHelper.executeNative(left)
     val rightRDD = NativeHelper.executeNative(right)
-    val nativeMetrics = SparkMetricNode(metrics, leftRDD.metrics :: rightRDD.metrics :: Nil)
-    val nativeSchema = this.nativeSchema
-    val nativeJoinType = this.nativeJoinType
-    val nativeJoinOn = this.nativeJoinOn
 
     val (probedRDD, builtRDD) = broadcastSide match {
       case BroadcastLeft => (rightRDD, leftRDD)
       case BroadcastRight => (leftRDD, rightRDD)
     }
 
+    // Handle the edge case when probed side is empty (no partitions)
+    // This matches Spark's BroadcastNestedLoopJoinExec behavior for condition.isEmpty case:
+    //   val streamExists = !streamed.executeTake(1).isEmpty
+    //   if (streamExists == exists) sparkContext.makeRDD(relation.value)
+    //   else sparkContext.emptyRDD
+    // where exists = true for Semi, false for Anti
+    //
+    // Note: This optimization only applies to Semi/Anti joins.
+    // For ExistenceJoin, we need to let native join execute so that finish() 
can output
+    // all build rows with exists=false when streamed is empty.
+    logError(
+      "Debug: probedRDD.partitions.size = " + probedRDD.partitions.length
+        + ", builtRDD.partitions.size = " + builtRDD.partitions.length
+        + ", joinType = " + joinType.toString)

Review Comment:
   Debug logging statements using logError are left in production code. These 
should be removed before merging as they will pollute error logs with non-error 
messages during normal execution.



##########
spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeBroadcastJoinBase.scala:
##########
@@ -126,44 +124,116 @@ abstract class NativeBroadcastJoinBase(
   override def doExecuteNative(): NativeRDD = {
     val leftRDD = NativeHelper.executeNative(left)
     val rightRDD = NativeHelper.executeNative(right)
-    val nativeMetrics = SparkMetricNode(metrics, leftRDD.metrics :: rightRDD.metrics :: Nil)
-    val nativeSchema = this.nativeSchema
-    val nativeJoinType = this.nativeJoinType
-    val nativeJoinOn = this.nativeJoinOn
 
     val (probedRDD, builtRDD) = broadcastSide match {
       case BroadcastLeft => (rightRDD, leftRDD)
       case BroadcastRight => (leftRDD, rightRDD)
     }
 
+    // Handle the edge case when probed side is empty (no partitions)
+    // This matches Spark's BroadcastNestedLoopJoinExec behavior for condition.isEmpty case:
+    //   val streamExists = !streamed.executeTake(1).isEmpty
+    //   if (streamExists == exists) sparkContext.makeRDD(relation.value)
+    //   else sparkContext.emptyRDD
+    // where exists = true for Semi, false for Anti
+    //
+    // Note: This optimization only applies to Semi/Anti joins.
+    // For ExistenceJoin, we need to let native join execute so that finish() can output
+    // all build rows with exists=false when streamed is empty.
+    logError(
+      "Debug: probedRDD.partitions.size = " + probedRDD.partitions.length
+        + ", builtRDD.partitions.size = " + builtRDD.partitions.length
+        + ", joinType = " + joinType.toString)
+    if (probedRDD.partitions.isEmpty) {
+      joinType match {
+        case LeftAnti =>
+          return builtRDD
+        case LeftSemi =>
+          return probedRDD
+        case _: ExistenceJoin =>
+        // For ExistenceJoin, when streamed is empty, we need to output all build rows
+        // with exists=false. We let native join execute with empty probed side,
+        // and native finish() will handle this correctly.
+        // Use builtRDD.partitions to ensure native join can execute.

Review Comment:
   The empty case block for ExistenceJoin at line 153-157 is not 
self-documenting. While the comment explains the intent, the empty case body 
makes it unclear that this is intentional fall-through behavior. Consider 
adding a comment directly in the case block or restructuring to make the intent 
more explicit.
   ```suggestion
            // For ExistenceJoin, when streamed is empty, we need to output all build rows
            // with exists=false. We let native join execute with empty probed side,
            // and native finish() will handle this correctly. Use builtRDD.partitions
            // to ensure native join can execute. This case intentionally performs no
            // early return so we fall through to the common execution path.
            ()
   ```



##########
spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeBroadcastJoinBase.scala:
##########
@@ -126,44 +124,116 @@ abstract class NativeBroadcastJoinBase(
   override def doExecuteNative(): NativeRDD = {
     val leftRDD = NativeHelper.executeNative(left)
     val rightRDD = NativeHelper.executeNative(right)
-    val nativeMetrics = SparkMetricNode(metrics, leftRDD.metrics :: rightRDD.metrics :: Nil)
-    val nativeSchema = this.nativeSchema
-    val nativeJoinType = this.nativeJoinType
-    val nativeJoinOn = this.nativeJoinOn
 
     val (probedRDD, builtRDD) = broadcastSide match {
       case BroadcastLeft => (rightRDD, leftRDD)
       case BroadcastRight => (leftRDD, rightRDD)
     }
 
+    // Handle the edge case when probed side is empty (no partitions)
+    // This matches Spark's BroadcastNestedLoopJoinExec behavior for condition.isEmpty case:
+    //   val streamExists = !streamed.executeTake(1).isEmpty
+    //   if (streamExists == exists) sparkContext.makeRDD(relation.value)
+    //   else sparkContext.emptyRDD
+    // where exists = true for Semi, false for Anti
+    //
+    // Note: This optimization only applies to Semi/Anti joins.
+    // For ExistenceJoin, we need to let native join execute so that finish() can output
+    // all build rows with exists=false when streamed is empty.
+    logError(
+      "Debug: probedRDD.partitions.size = " + probedRDD.partitions.length
+        + ", builtRDD.partitions.size = " + builtRDD.partitions.length
+        + ", joinType = " + joinType.toString)
+    if (probedRDD.partitions.isEmpty) {
+      joinType match {
+        case LeftAnti =>
+          return builtRDD
+        case LeftSemi =>
+          return probedRDD
+        case _: ExistenceJoin =>
+        // For ExistenceJoin, when streamed is empty, we need to output all build rows
+        // with exists=false. We let native join execute with empty probed side,
+        // and native finish() will handle this correctly.
+        // Use builtRDD.partitions to ensure native join can execute.
+        case _ =>
+      }
+    }
+    logError(
+      "Debug: probedRDDV2.partitions.size = " + probedRDD.partitions.length
+        + ", builtRDD.partitions.size = " + builtRDD.partitions.length
+        + ", joinType = " + joinType.toString)

Review Comment:
   Debug logging statements using logError are left in production code. These 
should be removed before merging as they will pollute error logs with non-error 
messages during normal execution.



##########
spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeBroadcastJoinBase.scala:
##########
@@ -126,44 +124,116 @@ abstract class NativeBroadcastJoinBase(
   override def doExecuteNative(): NativeRDD = {
     val leftRDD = NativeHelper.executeNative(left)
     val rightRDD = NativeHelper.executeNative(right)
-    val nativeMetrics = SparkMetricNode(metrics, leftRDD.metrics :: rightRDD.metrics :: Nil)
-    val nativeSchema = this.nativeSchema
-    val nativeJoinType = this.nativeJoinType
-    val nativeJoinOn = this.nativeJoinOn
 
     val (probedRDD, builtRDD) = broadcastSide match {
       case BroadcastLeft => (rightRDD, leftRDD)
       case BroadcastRight => (leftRDD, rightRDD)
     }
 
+    // Handle the edge case when probed side is empty (no partitions)
+    // This matches Spark's BroadcastNestedLoopJoinExec behavior for condition.isEmpty case:
+    //   val streamExists = !streamed.executeTake(1).isEmpty
+    //   if (streamExists == exists) sparkContext.makeRDD(relation.value)
+    //   else sparkContext.emptyRDD
+    // where exists = true for Semi, false for Anti
+    //
+    // Note: This optimization only applies to Semi/Anti joins.
+    // For ExistenceJoin, we need to let native join execute so that finish() can output
+    // all build rows with exists=false when streamed is empty.
+    logError(
+      "Debug: probedRDD.partitions.size = " + probedRDD.partitions.length
+        + ", builtRDD.partitions.size = " + builtRDD.partitions.length
+        + ", joinType = " + joinType.toString)
+    if (probedRDD.partitions.isEmpty) {
+      joinType match {
+        case LeftAnti =>
+          return builtRDD
+        case LeftSemi =>
+          return probedRDD
+        case _: ExistenceJoin =>
+        // For ExistenceJoin, when streamed is empty, we need to output all build rows
+        // with exists=false. We let native join execute with empty probed side,
+        // and native finish() will handle this correctly.
+        // Use builtRDD.partitions to ensure native join can execute.
+        case _ =>
+      }
+    }
+    logError(
+      "Debug: probedRDDV2.partitions.size = " + probedRDD.partitions.length
+        + ", builtRDD.partitions.size = " + builtRDD.partitions.length
+        + ", joinType = " + joinType.toString)
+
+    val nativeMetrics = SparkMetricNode(metrics, leftRDD.metrics :: rightRDD.metrics :: Nil)
+    val nativeSchema = this.nativeSchema
+    val nativeJoinType = this.nativeJoinType
+    val nativeJoinOn = this.nativeJoinOn
+
     val probedShuffleReadFull = probedRDD.isShuffleReadFull && (broadcastSide match {
       case BroadcastLeft =>
         Seq(FullOuter, RightOuter).contains(joinType)
       case BroadcastRight =>
         Seq(FullOuter, LeftOuter, LeftSemi, LeftAnti).contains(joinType)
     })
 
+    // For ExistenceJoin with empty probed side, use builtRDD.partitions to ensure
+    // native join can execute and finish() will output all build rows with exists=false
+    val (rddPartitions, rddPartitioner, rddDependencies) =
+      if (probedRDD.partitions.isEmpty && joinType.isInstanceOf[ExistenceJoin]) {
+        (builtRDD.partitions, builtRDD.partitioner, new OneToOneDependency(builtRDD) :: Nil)
+      } else {
+        (probedRDD.partitions, probedRDD.partitioner, new OneToOneDependency(probedRDD) :: Nil)
+      }
+
     new NativeRDD(
       sparkContext,
       nativeMetrics,
-      probedRDD.partitions,
-      rddPartitioner = probedRDD.partitioner,
-      rddDependencies = new OneToOneDependency(probedRDD) :: Nil,
+      rddPartitions,
+      rddPartitioner = rddPartitioner,
+      rddDependencies = rddDependencies,
       probedShuffleReadFull,
       (partition, context) => {
         val partition0 = new Partition() {
           override def index: Int = 0
         }
-        val (leftChild, rightChild) = broadcastSide match {
-          case BroadcastLeft =>
-            (
-              leftRDD.nativePlan(partition0, context),
-              rightRDD.nativePlan(rightRDD.partitions(partition.index), context))
-          case BroadcastRight =>
-            (
-              leftRDD.nativePlan(leftRDD.partitions(partition.index), context),
-              rightRDD.nativePlan(partition0, context))
-        }
+        val (leftChild, rightChild) =
+          if (probedRDD.partitions.isEmpty && joinType.isInstanceOf[ExistenceJoin]) {

Review Comment:
   There is duplicated logic checking for empty probed RDD and ExistenceJoin. 
This condition is checked at line 181 and again at line 199, which makes the 
code harder to maintain. Consider extracting this logic into a boolean variable 
at the beginning of the method to avoid duplication.
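
   One way to apply this, sketched as a hypothetical fragment against the names in the diff (not compilable on its own; `probedRDD`, `builtRDD`, and `joinType` come from the surrounding method):
   ```scala
   // Compute the flag once, right after probedRDD/builtRDD are selected,
   // and reuse it for both the partition choice and the child-plan choice.
   val useBuiltPartitions =
     probedRDD.partitions.isEmpty && joinType.isInstanceOf[ExistenceJoin]

   val (rddPartitions, rddPartitioner, rddDependencies) =
     if (useBuiltPartitions) {
       (builtRDD.partitions, builtRDD.partitioner, new OneToOneDependency(builtRDD) :: Nil)
     } else {
       (probedRDD.partitions, probedRDD.partitioner, new OneToOneDependency(probedRDD) :: Nil)
     }
   ```
   The later `(leftChild, rightChild)` selection would then branch on `useBuiltPartitions` instead of re-evaluating the same predicate.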



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
