Copilot commented on code in PR #1844:
URL: https://github.com/apache/auron/pull/1844#discussion_r2670832107
##########
native-engine/datafusion-ext-plans/src/joins/bhj/semi_join.rs:
##########
@@ -189,15 +189,11 @@ impl<const P: JoinerParams> Joiner for SemiJoiner<P> {
let mut hashes_idx = 0;
for row_idx in 0..probed_batch.num_rows() {
- let key_is_valid = probed_valids
+ if probed_valids
.as_ref()
.map(|nb| nb.is_valid(row_idx))
- .unwrap_or(true);
- if P.mode == Anti && P.probe_is_join_side && !key_is_valid {
- probed_joined.set(row_idx, true);
- continue;
- }
- if key_is_valid {
+ .unwrap_or(true)
+ {
Review Comment:
Removing the special null-key handling for Anti joins changes the semantics.
The old code explicitly marked rows with null keys as joined
(`probed_joined.set(row_idx, true)`) when
`P.mode == Anti && P.probe_is_join_side`, which excluded them from the Anti
join output. Now rows with null keys skip the hash lookup but remain marked as
not joined, so they are included in the Anti join output. This behavior change
should be verified against SQL semantics for Anti joins with null keys,
especially since the removed test (`join_anti_with_null_keys`) covered exactly
this case.
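To make the two candidate behaviors concrete, here is a small Rust model of the probe loop. It is a toy under stated assumptions, not the Auron implementation: `anti_join_rows`, the `exclude_null_keys` flag, and the slice/`HashSet` inputs are hypothetical stand-ins for `probed_valids`, `probed_joined`, and the hash-map lookup. With `exclude_null_keys = false`, a NULL key never matches anything, so the row survives (the usual NOT EXISTS-style anti join result); with `true`, the model reproduces the removed special case.

```rust
use std::collections::HashSet;

/// Toy model of the anti-join probe loop (hypothetical; not the Auron code).
/// `probe_valids` stands in for an Arrow null buffer: `None` means "no nulls",
/// and `Some(v)` marks row `i` as NULL when `v[i]` is false.
/// Returns the indices of probe rows that appear in the anti join output.
fn anti_join_rows(
    probe_keys: &[i32],
    probe_valids: Option<&[bool]>,
    build_keys: &HashSet<i32>,
    exclude_null_keys: bool, // true = behavior of the removed special case
) -> Vec<usize> {
    let mut output = Vec::new();
    for row_idx in 0..probe_keys.len() {
        // Same shape as the new check in the diff: no validity buffer means valid.
        let key_is_valid = probe_valids.map(|nb| nb[row_idx]).unwrap_or(true);
        let joined = if key_is_valid {
            // Non-NULL key: joined iff some build row has an equal key.
            build_keys.contains(&probe_keys[row_idx])
        } else {
            // NULL key: never equal to anything, so it only counts as joined
            // when the removed special case forces it.
            exclude_null_keys
        };
        if !joined {
            output.push(row_idx);
        }
    }
    output
}

fn main() {
    let build = HashSet::from([1, 2]);
    let keys = [1, 0, 3]; // row 1's key value is ignored because the row is NULL
    let valids = [true, false, true];
    println!("{:?}", anti_join_rows(&keys, Some(&valids[..]), &build, false)); // [1, 2]
    println!("{:?}", anti_join_rows(&keys, Some(&valids[..]), &build, true)); // [2]
}
```

With build keys {1, 2} and a NULL key in row 1, the new code's behavior keeps rows 1 and 2, while the removed special case keeps only row 2; the removed test would have pinned down which of these the PR intends.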
##########
spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeBroadcastJoinBase.scala:
##########
@@ -126,44 +123,116 @@ abstract class NativeBroadcastJoinBase(
override def doExecuteNative(): NativeRDD = {
val leftRDD = NativeHelper.executeNative(left)
val rightRDD = NativeHelper.executeNative(right)
- val nativeMetrics = SparkMetricNode(metrics, leftRDD.metrics :: rightRDD.metrics :: Nil)
- val nativeSchema = this.nativeSchema
- val nativeJoinType = this.nativeJoinType
- val nativeJoinOn = this.nativeJoinOn
val (probedRDD, builtRDD) = broadcastSide match {
case BroadcastLeft => (rightRDD, leftRDD)
case BroadcastRight => (leftRDD, rightRDD)
}
+ // Handle the edge case when probed side is empty (no partitions)
+ // This matches Spark's BroadcastNestedLoopJoinExec behavior for condition.isEmpty case:
+ // val streamExists = !streamed.executeTake(1).isEmpty
+ // if (streamExists == exists) sparkContext.makeRDD(relation.value)
+ // else sparkContext.emptyRDD
+ // where exists = true for Semi, false for Anti
+ //
+ // Note: This optimization only applies to Semi/Anti joins.
+ // For ExistenceJoin, we need to let native join execute so that finish() can output
+ // all build rows with exists=false when streamed is empty.
+ logError(
+ "Debug: probedRDD.partitions.size = " + probedRDD.partitions.length
+ + ", builtRDD.partitions.size = " + builtRDD.partitions.length
+ + ", joinType = " + joinType.toString)
+ if (probedRDD.partitions.isEmpty) {
+ joinType match {
+ case LeftAnti =>
+ return builtRDD
+ case LeftSemi =>
+ return probedRDD
+ case _: ExistenceJoin =>
+ // For ExistenceJoin, when streamed is empty, we need to output all build rows
+ // with exists=false. We let native join execute with empty probed side,
+ // and native finish() will handle this correctly.
+ // Use builtRDD.partitions to ensure native join can execute.
Review Comment:
The empty-partition optimization does not account for which side is
broadcast. For LeftAnti, the code always returns builtRDD when probedRDD is
empty, but that is only correct when the right side is probed (BroadcastLeft).
When the left side is probed (BroadcastRight) and is empty, LeftAnti must
return an empty result, not the right (built) side. LeftSemi likely has a
related issue: under BroadcastLeft, probedRDD is the empty right side rather
than an empty left-side result. The optimization needs to check broadcastSide
to determine the correct behavior.
```suggestion
  // LeftAnti always outputs rows from the left side.
  // - BroadcastLeft: probed = right, built = left. With empty right, return all left.
  // - BroadcastRight: probed = left (empty), built = right. Result must be empty left.
  broadcastSide match {
    case BroadcastLeft =>
      return builtRDD
    case BroadcastRight =>
      return probedRDD
  }
case LeftSemi =>
  // LeftSemi always outputs rows from the left side and is empty if either:
  // - the left side is empty, or
  // - the right side is empty (no matches).
  // However, we can only safely return an existing RDD when it corresponds
  // to the left side and is already empty.
  broadcastSide match {
    case BroadcastRight =>
      // probedRDD is the left side and is empty, so this is a correct empty result.
      return probedRDD
    case BroadcastLeft =>
      // probedRDD is the right side (empty), builtRDD is the left side (non-empty).
      // We cannot cheaply construct an empty left RDD here, so skip the optimization
      // and let the native join executor produce the correct (empty) result.
  }
case _: ExistenceJoin =>
  // For ExistenceJoin, when streamed is empty, we need to output all build rows
  // with exists=false. We let native join execute with empty probed side,
  // and native finish() will handle this correctly.
  // Use builtRDD.partitions to ensure native join can execute.
```
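The decision table in the suggestion can be checked in isolation. The sketch below is a hypothetical Rust model: `BroadcastSide`, `JoinType`, `EmptyProbeAction`, and `on_empty_probe` are illustrative names, not Auron or Spark types. It maps each join type and broadcast side to what the planner may do when the probed side has no partitions. It is also consistent with the Spark rule quoted in the diff: that rule returns the broadcast rows (`relation.value`) as a left semi/anti result, so it describes the case where the left side is broadcast, giving all left rows for LeftAnti (`exists = false`) and an empty result for LeftSemi (`exists = true`) when the stream is empty.

```rust
/// Hypothetical mirrors of the Scala cases discussed in the review.
#[derive(Clone, Copy, Debug, PartialEq)]
enum BroadcastSide {
    Left,
    Right,
}

#[derive(Clone, Copy, Debug, PartialEq)]
enum JoinType {
    LeftSemi,
    LeftAnti,
    Existence,
}

/// What the planner may do when the probed side has no partitions.
#[derive(Debug, PartialEq)]
enum EmptyProbeAction {
    ReturnBuilt,  // the built RDD already is the correct result
    ReturnProbed, // the (empty) probed RDD already is the correct result
    RunNative,    // no safe shortcut; let the native join produce the result
}

fn on_empty_probe(join: JoinType, side: BroadcastSide) -> EmptyProbeAction {
    use EmptyProbeAction::*;
    match (join, side) {
        // Left is built and the probed right side is empty: every left row survives.
        (JoinType::LeftAnti, BroadcastSide::Left) => ReturnBuilt,
        // Left is probed and empty: the anti join output is empty too.
        (JoinType::LeftAnti, BroadcastSide::Right) => ReturnProbed,
        // Left is probed and empty: the empty probed RDD is the left side.
        (JoinType::LeftSemi, BroadcastSide::Right) => ReturnProbed,
        // Result is empty, but the only empty RDD at hand is the right side's.
        (JoinType::LeftSemi, BroadcastSide::Left) => RunNative,
        // ExistenceJoin still has to emit build rows with exists = false.
        (JoinType::Existence, _) => RunNative,
    }
}

fn main() {
    for join in [JoinType::LeftSemi, JoinType::LeftAnti, JoinType::Existence] {
        for side in [BroadcastSide::Left, BroadcastSide::Right] {
            println!("{:?} / {:?} -> {:?}", join, side, on_empty_probe(join, side));
        }
    }
}
```

Matching on the `(join, side)` pair keeps the table exhaustive, so the compiler rejects a version that forgets a broadcast side, which is exactly the gap the review points out in the original `joinType match`.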
--
This is an automated message from the Apache Git Service.