Copilot commented on code in PR #1844:
URL: https://github.com/apache/auron/pull/1844#discussion_r2670832107


##########
native-engine/datafusion-ext-plans/src/joins/bhj/semi_join.rs:
##########
@@ -189,15 +189,11 @@ impl<const P: JoinerParams> Joiner for SemiJoiner<P> {
         let mut hashes_idx = 0;
 
         for row_idx in 0..probed_batch.num_rows() {
-            let key_is_valid = probed_valids
+            if probed_valids
                 .as_ref()
                 .map(|nb| nb.is_valid(row_idx))
-                .unwrap_or(true);
-            if P.mode == Anti && P.probe_is_join_side && !key_is_valid {
-                probed_joined.set(row_idx, true);
-                continue;
-            }
-            if key_is_valid {
+                .unwrap_or(true)
+            {

Review Comment:
   The removal of the special null-key handling for Anti joins changes semantic behavior. The old code explicitly marked rows with null keys as "joined" (`probed_joined.set(row_idx, true)`) when `P.mode == Anti && P.probe_is_join_side`, which excluded them from the Anti join output. Now, rows with null keys skip the hash lookup but remain marked as not joined, so they are included in the Anti join output. This behavior change should be verified against SQL semantics for Anti joins with null keys; note that the removed test (`join_anti_with_null_keys`) specifically exercised this behavior.
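
   To make the two candidate semantics concrete, here is a minimal standalone sketch over plain collections (not the `SemiJoiner`/DataFusion code; `anti_join` and `null_aware_anti_join` are hypothetical names) contrasting a plain anti join, where a null key is kept, with the null-aware (`NOT IN`-style) behavior that the removed branch implemented:

   ```rust
   use std::collections::HashSet;

   // Plain LEFT ANTI semantics: a null key never equals any build-side key,
   // so the row has no match and is KEPT in the anti output (the new behavior).
   fn anti_join(left: &[Option<i32>], right: &HashSet<i32>) -> Vec<Option<i32>> {
       left.iter()
           .filter(|k| match k {
               Some(v) => !right.contains(v),
               None => true, // no match possible => included in anti output
           })
           .cloned()
           .collect()
   }

   // Null-aware anti (NOT IN style): a null key makes the predicate UNKNOWN,
   // so the row is EXCLUDED (the behavior of the removed branch).
   fn null_aware_anti_join(left: &[Option<i32>], right: &HashSet<i32>) -> Vec<Option<i32>> {
       left.iter()
           .filter(|k| match k {
               Some(v) => !right.contains(v),
               None => false, // UNKNOWN => filtered out
           })
           .cloned()
           .collect()
   }
   ```

   With `left = [Some(1), Some(2), None]` and a build side of `{2, 3}`, the first variant yields `[Some(1), None]` and the second `[Some(1)]`, which is exactly the behavioral difference described above.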



##########
spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeBroadcastJoinBase.scala:
##########
@@ -126,44 +123,116 @@ abstract class NativeBroadcastJoinBase(
   override def doExecuteNative(): NativeRDD = {
     val leftRDD = NativeHelper.executeNative(left)
     val rightRDD = NativeHelper.executeNative(right)
-    val nativeMetrics = SparkMetricNode(metrics, leftRDD.metrics :: rightRDD.metrics :: Nil)
-    val nativeSchema = this.nativeSchema
-    val nativeJoinType = this.nativeJoinType
-    val nativeJoinOn = this.nativeJoinOn
 
     val (probedRDD, builtRDD) = broadcastSide match {
       case BroadcastLeft => (rightRDD, leftRDD)
       case BroadcastRight => (leftRDD, rightRDD)
     }
 
+    // Handle the edge case when probed side is empty (no partitions)
+    // This matches Spark's BroadcastNestedLoopJoinExec behavior for condition.isEmpty case:
+    //   val streamExists = !streamed.executeTake(1).isEmpty
+    //   if (streamExists == exists) sparkContext.makeRDD(relation.value)
+    //   else sparkContext.emptyRDD
+    // where exists = true for Semi, false for Anti
+    //
+    // Note: This optimization only applies to Semi/Anti joins.
+    // For ExistenceJoin, we need to let native join execute so that finish() can output
+    // all build rows with exists=false when streamed is empty.
+    logError(
+      "Debug: probedRDD.partitions.size = " + probedRDD.partitions.length
+        + ", builtRDD.partitions.size = " + builtRDD.partitions.length
+        + ", joinType = " + joinType.toString)
+    if (probedRDD.partitions.isEmpty) {
+      joinType match {
+        case LeftAnti =>
+          return builtRDD
+        case LeftSemi =>
+          return probedRDD
+        case _: ExistenceJoin =>
+        // For ExistenceJoin, when streamed is empty, we need to output all build rows
+        // with exists=false. We let native join execute with empty probed side,
+        // and native finish() will handle this correctly.
+        // Use builtRDD.partitions to ensure native join can execute.

Review Comment:
   The empty-partition optimization doesn't account for which side is being broadcast. For LeftAnti, the code always returns builtRDD when probedRDD is empty, but that is only correct when the right side is the probed side (the BroadcastLeft case). When the left side is probed (BroadcastRight) and is empty, LeftAnti must return an empty result, not the right/built side. The LeftSemi branch has the mirror-image problem: returning the empty probedRDD is only a correct empty-left result when probedRDD actually is the left side. The optimization needs to match on broadcastSide to pick the correct behavior.
   ```suggestion
            // LeftAnti always outputs rows from the left side.
            // - BroadcastLeft: probed = right, built = left. With empty right, return all left.
            // - BroadcastRight: probed = left (empty), built = right. Result must be empty left.
            broadcastSide match {
              case BroadcastLeft =>
                return builtRDD
              case BroadcastRight =>
                return probedRDD
            }
          case LeftSemi =>
            // LeftSemi always outputs rows from the left side and is empty if either:
            // - the left (probed) side is empty, or
            // - the right (probed) side is empty (no matches).
            // However, we can only safely return an existing RDD when it corresponds
            // to the left side and is already empty.
            broadcastSide match {
              case BroadcastRight =>
                // probedRDD is the left side and is empty, so this is a correct empty result.
                return probedRDD
              case BroadcastLeft =>
                // probedRDD is the right side (empty), builtRDD is the left side (non-empty).
                // We cannot cheaply construct an empty left RDD here, so skip the optimization
                // and let the native join executor produce the correct (empty) result.
            }
          case _: ExistenceJoin =>
            // For ExistenceJoin, when streamed is empty, we need to output all build rows
            // with exists=false. We let native join execute with empty probed side,
            // and native finish() will handle this correctly.
            // Use builtRDD.partitions to ensure native join can execute.
   ```
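
   The LeftAnti decision table the suggestion encodes can be sanity-checked with a standalone sketch over plain vectors (hypothetical names, Rust rather than Scala for brevity; this is not the Spark/Auron code):

   ```rust
   #[derive(Clone, Copy, Debug)]
   enum BroadcastSide {
       BroadcastLeft,  // probed = right, built = left
       BroadcastRight, // probed = left,  built = right
   }

   // Returns Some(shortcut result) when the empty-probed optimization applies
   // for LeftAnti, or None when the native join must still run.
   fn left_anti_shortcut(
       side: BroadcastSide,
       probed: Vec<i32>,
       built: Vec<i32>,
   ) -> Option<Vec<i32>> {
       if !probed.is_empty() {
           return None; // shortcut only fires when the probed side is empty
       }
       match side {
           // probed = right (empty): nothing can match, keep every left row.
           BroadcastSide::BroadcastLeft => Some(built),
           // probed = left (empty): LeftAnti outputs left rows, so the result
           // is the (already empty) probed side.
           BroadcastSide::BroadcastRight => Some(probed),
       }
   }
   ```

   Under BroadcastLeft with an empty right side the shortcut returns all built (left) rows; under BroadcastRight with an empty left side it returns the empty probed side, matching the correctness argument in the comment above.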



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to