Xingchao, Zhang created SPARK-40487:
---------------------------------------
Summary: Make defaultJoin in BroadcastNestedLoopJoinExec run
in parallel
Key: SPARK-40487
URL: https://issues.apache.org/jira/browse/SPARK-40487
Project: Spark
Issue Type: Improvement
Components: SQL
Affects Versions: 3.4.0
Reporter: Xingchao, Zhang
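In BroadcastNestedLoopJoinExec.defaultJoin (shown below), the sections marked 'Part 1' and 'Part 2' do not use each other's results, so they could run in parallel: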
{code:java}
/**
* The implementation for these joins:
*
* LeftOuter with BuildLeft
* RightOuter with BuildRight
* FullOuter
*
* The result is the union of two pieces:
*
* Part 1: every broadcast row whose bit is NOT set in the matched-rows bit set,
* joined with an all-null row that is as wide as the streamed side's output.
*
* Part 2: every (streamed row, broadcast row) pair accepted by `boundCondition`,
* plus - for FullOuter only - each streamed row that matched nothing, joined
* with an all-null row that is as wide as the broadcast side's output.
*
* In every emitted row the streamed side (or its null padding) is the left half
* and the broadcast side (or its null padding) is the right half.
*
* @param relation the broadcast array of build-side rows
* @return the union of the Part 2 RDD and an RDD built from the Part 1 rows
*/
private def defaultJoin(relation: Broadcast[Array[InternalRow]]):
RDD[InternalRow] = {
val streamRdd = streamed.execute()
// Part 1: collect the broadcast rows that no streamed row matched.
// NOTE(review): getMatchedBroadcastRowsBitSet is defined elsewhere; presumably
// bit i is set when broadcast row i matched at least one streamed row - confirm.
val matchedBroadcastRows = getMatchedBroadcastRowsBitSet(streamRdd,
relation)
// Built eagerly as a local Seq (this loop is not inside any RDD closure).
val notMatchedBroadcastRows: Seq[InternalRow] = {
// Null padding standing in for the streamed side's columns.
val nulls = new GenericInternalRow(streamed.output.size)
val buf: CompactBuffer[InternalRow] = new CompactBuffer()
// A single JoinedRow is reused; its left half stays fixed to the null padding.
val joinedRow = new JoinedRow
joinedRow.withLeft(nulls)
var i = 0
val buildRows = relation.value
while (i < buildRows.length) {
if (!matchedBroadcastRows.get(i)) {
// copy() is needed because joinedRow is mutated again on the next iteration.
buf += joinedRow.withRight(buildRows(i)).copy()
}
i += 1
}
buf
}
// Part 2: for each streamed row, emit one joined row per matching broadcast row.
// This section does not read matchedBroadcastRows or notMatchedBroadcastRows.
val matchedStreamRows = streamRdd.mapPartitionsInternal { streamedIter =>
val buildRows = relation.value
// One JoinedRow per partition, reused for every candidate pair.
val joinedRow = new JoinedRow
// Null padding standing in for the broadcast side's columns.
val nulls = new GenericInternalRow(broadcast.output.size)
streamedIter.flatMap { streamedRow =>
var i = 0
var foundMatch = false
val matchedRows = new CompactBuffer[InternalRow]
// Nested loop: test the current streamed row against every broadcast row.
while (i < buildRows.length) {
if (boundCondition(joinedRow(streamedRow, buildRows(i)))) {
// joinedRow still holds the pair just tested; copy it before it is reused.
matchedRows += joinedRow.copy()
foundMatch = true
}
i += 1
}
// Only FullOuter keeps a streamed row that matched no broadcast row.
if (!foundMatch && joinType == FullOuter) {
matchedRows += joinedRow(streamedRow, nulls).copy()
}
matchedRows.iterator
}
}
// Union: combine the Part 2 RDD with an RDD made from the local Part 1 rows.
sparkContext.union(
matchedStreamRows,
sparkContext.makeRDD(notMatchedBroadcastRows)
)
}{code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]