[
https://issues.apache.org/jira/browse/SPARK-40487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17606536#comment-17606536
]
Apache Spark commented on SPARK-40487:
--------------------------------------
User 'xingczhao' has created a pull request for this issue:
https://github.com/apache/spark/pull/37930
> Make defaultJoin in BroadcastNestedLoopJoinExec run in parallel
> ---------------------------------------------------------------
>
> Key: SPARK-40487
> URL: https://issues.apache.org/jira/browse/SPARK-40487
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 3.4.0
> Reporter: Xingchao, Zhang
> Priority: Major
>
> 'Part 1' and 'Part 2' in the code below do not depend on each other (neither uses the other's result), so they could run in parallel:
> {code:java}
> /**
> * The implementation for these joins:
> *
> * LeftOuter with BuildLeft
> * RightOuter with BuildRight
> * FullOuter
> */
> private def defaultJoin(relation: Broadcast[Array[InternalRow]]):
> RDD[InternalRow] = {
> val streamRdd = streamed.execute()
> // Part 1
> val matchedBroadcastRows = getMatchedBroadcastRowsBitSet(streamRdd,
> relation)
> val notMatchedBroadcastRows: Seq[InternalRow] = {
> val nulls = new GenericInternalRow(streamed.output.size)
> val buf: CompactBuffer[InternalRow] = new CompactBuffer()
> val joinedRow = new JoinedRow
> joinedRow.withLeft(nulls)
> var i = 0
> val buildRows = relation.value
> while (i < buildRows.length) {
> if (!matchedBroadcastRows.get(i)) {
> buf += joinedRow.withRight(buildRows(i)).copy()
> }
> i += 1
> }
> buf
> }
> // Part 2
> val matchedStreamRows = streamRdd.mapPartitionsInternal { streamedIter =>
> val buildRows = relation.value
> val joinedRow = new JoinedRow
> val nulls = new GenericInternalRow(broadcast.output.size)
> streamedIter.flatMap { streamedRow =>
> var i = 0
> var foundMatch = false
> val matchedRows = new CompactBuffer[InternalRow]
> while (i < buildRows.length) {
> if (boundCondition(joinedRow(streamedRow, buildRows(i)))) {
> matchedRows += joinedRow.copy()
> foundMatch = true
> }
> i += 1
> }
> if (!foundMatch && joinType == FullOuter) {
> matchedRows += joinedRow(streamedRow, nulls).copy()
> }
> matchedRows.iterator
> }
> }
> // Union
> sparkContext.union(
> matchedStreamRows,
> sparkContext.makeRDD(notMatchedBroadcastRows)
> )
> }{code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org