Copilot commented on code in PR #1844:
URL: https://github.com/apache/auron/pull/1844#discussion_r2674942224
##########
native-engine/datafusion-ext-plans/src/joins/bhj/semi_join.rs:
##########
@@ -193,10 +193,15 @@ impl<const P: JoinerParams> Joiner for SemiJoiner<P> {
.as_ref()
.map(|nb| nb.is_valid(row_idx))
.unwrap_or(true);
- if P.mode == Anti && P.probe_is_join_side && !key_is_valid {
+ if P.mode == Anti
+ && P.probe_is_join_side
+ && !key_is_valid
+ && self.join_params.is_null_aware_anti_join
+ {
probed_joined.set(row_idx, true);
continue;
}
Review Comment:
The null-aware anti-join filtering logic only applies when the probe is on
the join side. However, the condition checks should be grouped differently to
make the logic clearer: the key_is_valid check is now separated from the hash
lookup by an early continue, which could cause confusion about when
probed_joined is set versus when the hash lookup occurs.
##########
spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeBroadcastJoinBase.scala:
##########
@@ -126,44 +124,116 @@ abstract class NativeBroadcastJoinBase(
override def doExecuteNative(): NativeRDD = {
val leftRDD = NativeHelper.executeNative(left)
val rightRDD = NativeHelper.executeNative(right)
- val nativeMetrics = SparkMetricNode(metrics, leftRDD.metrics ::
rightRDD.metrics :: Nil)
- val nativeSchema = this.nativeSchema
- val nativeJoinType = this.nativeJoinType
- val nativeJoinOn = this.nativeJoinOn
val (probedRDD, builtRDD) = broadcastSide match {
case BroadcastLeft => (rightRDD, leftRDD)
case BroadcastRight => (leftRDD, rightRDD)
}
+ // Handle the edge case when probed side is empty (no partitions)
+ // This matches Spark's BroadcastNestedLoopJoinExec behavior for
condition.isEmpty case:
+ // val streamExists = !streamed.executeTake(1).isEmpty
+ // if (streamExists == exists) sparkContext.makeRDD(relation.value)
+ // else sparkContext.emptyRDD
+ // where exists = true for Semi, false for Anti
+ //
+ // Note: This optimization only applies to Semi/Anti joins.
+ // For ExistenceJoin, we need to let native join execute so that finish()
can output
+ // all build rows with exists=false when streamed is empty.
+ logError(
+ "Debug: probedRDD.partitions.size = " + probedRDD.partitions.length
+ + ", builtRDD.partitions.size = " + builtRDD.partitions.length
+ + ", joinType = " + joinType.toString)
Review Comment:
Debug logging statements using logError have been left in production code.
They should be removed before merging, as they will pollute the error logs
with non-error messages during normal execution.
##########
spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeBroadcastJoinBase.scala:
##########
@@ -126,44 +124,116 @@ abstract class NativeBroadcastJoinBase(
override def doExecuteNative(): NativeRDD = {
val leftRDD = NativeHelper.executeNative(left)
val rightRDD = NativeHelper.executeNative(right)
- val nativeMetrics = SparkMetricNode(metrics, leftRDD.metrics ::
rightRDD.metrics :: Nil)
- val nativeSchema = this.nativeSchema
- val nativeJoinType = this.nativeJoinType
- val nativeJoinOn = this.nativeJoinOn
val (probedRDD, builtRDD) = broadcastSide match {
case BroadcastLeft => (rightRDD, leftRDD)
case BroadcastRight => (leftRDD, rightRDD)
}
+ // Handle the edge case when probed side is empty (no partitions)
+ // This matches Spark's BroadcastNestedLoopJoinExec behavior for
condition.isEmpty case:
+ // val streamExists = !streamed.executeTake(1).isEmpty
+ // if (streamExists == exists) sparkContext.makeRDD(relation.value)
+ // else sparkContext.emptyRDD
+ // where exists = true for Semi, false for Anti
+ //
+ // Note: This optimization only applies to Semi/Anti joins.
+ // For ExistenceJoin, we need to let native join execute so that finish()
can output
+ // all build rows with exists=false when streamed is empty.
+ logError(
+ "Debug: probedRDD.partitions.size = " + probedRDD.partitions.length
+ + ", builtRDD.partitions.size = " + builtRDD.partitions.length
+ + ", joinType = " + joinType.toString)
+ if (probedRDD.partitions.isEmpty) {
+ joinType match {
+ case LeftAnti =>
+ return builtRDD
+ case LeftSemi =>
+ return probedRDD
+ case _: ExistenceJoin =>
+ // For ExistenceJoin, when streamed is empty, we need to output all
build rows
+ // with exists=false. We let native join execute with empty probed
side,
+ // and native finish() will handle this correctly.
+ // Use builtRDD.partitions to ensure native join can execute.
Review Comment:
The empty case block for ExistenceJoin at lines 153-157 is not
self-documenting. Although the preceding comment explains the intent, the
empty case body makes it unclear that the fall-through behavior is
intentional. Consider adding a comment directly inside the case block, or
restructuring the code, to make the intent more explicit.
```suggestion
// For ExistenceJoin, when streamed is empty, we need to output
all build rows
// with exists=false. We let native join execute with empty probed
side,
// and native finish() will handle this correctly. Use
builtRDD.partitions
// to ensure native join can execute. This case intentionally
performs no
// early return so we fall through to the common execution path.
()
```
##########
spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeBroadcastJoinBase.scala:
##########
@@ -126,44 +124,116 @@ abstract class NativeBroadcastJoinBase(
override def doExecuteNative(): NativeRDD = {
val leftRDD = NativeHelper.executeNative(left)
val rightRDD = NativeHelper.executeNative(right)
- val nativeMetrics = SparkMetricNode(metrics, leftRDD.metrics ::
rightRDD.metrics :: Nil)
- val nativeSchema = this.nativeSchema
- val nativeJoinType = this.nativeJoinType
- val nativeJoinOn = this.nativeJoinOn
val (probedRDD, builtRDD) = broadcastSide match {
case BroadcastLeft => (rightRDD, leftRDD)
case BroadcastRight => (leftRDD, rightRDD)
}
+ // Handle the edge case when probed side is empty (no partitions)
+ // This matches Spark's BroadcastNestedLoopJoinExec behavior for
condition.isEmpty case:
+ // val streamExists = !streamed.executeTake(1).isEmpty
+ // if (streamExists == exists) sparkContext.makeRDD(relation.value)
+ // else sparkContext.emptyRDD
+ // where exists = true for Semi, false for Anti
+ //
+ // Note: This optimization only applies to Semi/Anti joins.
+ // For ExistenceJoin, we need to let native join execute so that finish()
can output
+ // all build rows with exists=false when streamed is empty.
+ logError(
+ "Debug: probedRDD.partitions.size = " + probedRDD.partitions.length
+ + ", builtRDD.partitions.size = " + builtRDD.partitions.length
+ + ", joinType = " + joinType.toString)
+ if (probedRDD.partitions.isEmpty) {
+ joinType match {
+ case LeftAnti =>
+ return builtRDD
+ case LeftSemi =>
+ return probedRDD
+ case _: ExistenceJoin =>
+ // For ExistenceJoin, when streamed is empty, we need to output all
build rows
+ // with exists=false. We let native join execute with empty probed
side,
+ // and native finish() will handle this correctly.
+ // Use builtRDD.partitions to ensure native join can execute.
+ case _ =>
+ }
+ }
+ logError(
+ "Debug: probedRDDV2.partitions.size = " + probedRDD.partitions.length
+ + ", builtRDD.partitions.size = " + builtRDD.partitions.length
+ + ", joinType = " + joinType.toString)
Review Comment:
Debug logging statements using logError have been left in production code.
They should be removed before merging, as they will pollute the error logs
with non-error messages during normal execution.
##########
spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeBroadcastJoinBase.scala:
##########
@@ -126,44 +124,116 @@ abstract class NativeBroadcastJoinBase(
override def doExecuteNative(): NativeRDD = {
val leftRDD = NativeHelper.executeNative(left)
val rightRDD = NativeHelper.executeNative(right)
- val nativeMetrics = SparkMetricNode(metrics, leftRDD.metrics ::
rightRDD.metrics :: Nil)
- val nativeSchema = this.nativeSchema
- val nativeJoinType = this.nativeJoinType
- val nativeJoinOn = this.nativeJoinOn
val (probedRDD, builtRDD) = broadcastSide match {
case BroadcastLeft => (rightRDD, leftRDD)
case BroadcastRight => (leftRDD, rightRDD)
}
+ // Handle the edge case when probed side is empty (no partitions)
+ // This matches Spark's BroadcastNestedLoopJoinExec behavior for
condition.isEmpty case:
+ // val streamExists = !streamed.executeTake(1).isEmpty
+ // if (streamExists == exists) sparkContext.makeRDD(relation.value)
+ // else sparkContext.emptyRDD
+ // where exists = true for Semi, false for Anti
+ //
+ // Note: This optimization only applies to Semi/Anti joins.
+ // For ExistenceJoin, we need to let native join execute so that finish()
can output
+ // all build rows with exists=false when streamed is empty.
+ logError(
+ "Debug: probedRDD.partitions.size = " + probedRDD.partitions.length
+ + ", builtRDD.partitions.size = " + builtRDD.partitions.length
+ + ", joinType = " + joinType.toString)
+ if (probedRDD.partitions.isEmpty) {
+ joinType match {
+ case LeftAnti =>
+ return builtRDD
+ case LeftSemi =>
+ return probedRDD
+ case _: ExistenceJoin =>
+ // For ExistenceJoin, when streamed is empty, we need to output all
build rows
+ // with exists=false. We let native join execute with empty probed
side,
+ // and native finish() will handle this correctly.
+ // Use builtRDD.partitions to ensure native join can execute.
+ case _ =>
+ }
+ }
+ logError(
+ "Debug: probedRDDV2.partitions.size = " + probedRDD.partitions.length
+ + ", builtRDD.partitions.size = " + builtRDD.partitions.length
+ + ", joinType = " + joinType.toString)
+
+ val nativeMetrics = SparkMetricNode(metrics, leftRDD.metrics ::
rightRDD.metrics :: Nil)
+ val nativeSchema = this.nativeSchema
+ val nativeJoinType = this.nativeJoinType
+ val nativeJoinOn = this.nativeJoinOn
+
val probedShuffleReadFull = probedRDD.isShuffleReadFull && (broadcastSide
match {
case BroadcastLeft =>
Seq(FullOuter, RightOuter).contains(joinType)
case BroadcastRight =>
Seq(FullOuter, LeftOuter, LeftSemi, LeftAnti).contains(joinType)
})
+ // For ExistenceJoin with empty probed side, use builtRDD.partitions to
ensure
+ // native join can execute and finish() will output all build rows with
exists=false
+ val (rddPartitions, rddPartitioner, rddDependencies) =
+ if (probedRDD.partitions.isEmpty &&
joinType.isInstanceOf[ExistenceJoin]) {
+ (builtRDD.partitions, builtRDD.partitioner, new
OneToOneDependency(builtRDD) :: Nil)
+ } else {
+ (probedRDD.partitions, probedRDD.partitioner, new
OneToOneDependency(probedRDD) :: Nil)
+ }
+
new NativeRDD(
sparkContext,
nativeMetrics,
- probedRDD.partitions,
- rddPartitioner = probedRDD.partitioner,
- rddDependencies = new OneToOneDependency(probedRDD) :: Nil,
+ rddPartitions,
+ rddPartitioner = rddPartitioner,
+ rddDependencies = rddDependencies,
probedShuffleReadFull,
(partition, context) => {
val partition0 = new Partition() {
override def index: Int = 0
}
- val (leftChild, rightChild) = broadcastSide match {
- case BroadcastLeft =>
- (
- leftRDD.nativePlan(partition0, context),
- rightRDD.nativePlan(rightRDD.partitions(partition.index),
context))
- case BroadcastRight =>
- (
- leftRDD.nativePlan(leftRDD.partitions(partition.index), context),
- rightRDD.nativePlan(partition0, context))
- }
+ val (leftChild, rightChild) =
+ if (probedRDD.partitions.isEmpty &&
joinType.isInstanceOf[ExistenceJoin]) {
Review Comment:
There is duplicated logic checking for an empty probed RDD combined with an
ExistenceJoin: the same condition is evaluated at lines 181 and 199, which
makes the code harder to maintain. Consider extracting this check into a
boolean variable at the beginning of the method to avoid the duplication.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]