[ 
https://issues.apache.org/jira/browse/SPARK-40487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-40487:
------------------------------------

    Assignee: Apache Spark

> Make defaultJoin in BroadcastNestedLoopJoinExec running in parallel
> -------------------------------------------------------------------
>
>                 Key: SPARK-40487
>                 URL: https://issues.apache.org/jira/browse/SPARK-40487
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.4.0
>            Reporter: Xingchao, Zhang
>            Assignee: Apache Spark
>            Priority: Major
>
> 'Part 1' and 'Part 2' could run in parallel
> {code:java}
>   /**
>    * The implementation for these joins:
>    *
>    *   LeftOuter with BuildLeft
>    *   RightOuter with BuildRight
>    *   FullOuter
>    */
>   private def defaultJoin(relation: Broadcast[Array[InternalRow]]): 
> RDD[InternalRow] = {
>     val streamRdd = streamed.execute()
>     // Part 1
>     val matchedBroadcastRows = getMatchedBroadcastRowsBitSet(streamRdd, 
> relation)
>     val notMatchedBroadcastRows: Seq[InternalRow] = {
>       val nulls = new GenericInternalRow(streamed.output.size)
>       val buf: CompactBuffer[InternalRow] = new CompactBuffer()
>       val joinedRow = new JoinedRow
>       joinedRow.withLeft(nulls)
>       var i = 0
>       val buildRows = relation.value
>       while (i < buildRows.length) {
>         if (!matchedBroadcastRows.get(i)) {
>           buf += joinedRow.withRight(buildRows(i)).copy()
>         }
>         i += 1
>       }
>       buf
>     }
>     // Part 2
>     val matchedStreamRows = streamRdd.mapPartitionsInternal { streamedIter =>
>       val buildRows = relation.value
>       val joinedRow = new JoinedRow
>       val nulls = new GenericInternalRow(broadcast.output.size)
>       streamedIter.flatMap { streamedRow =>
>         var i = 0
>         var foundMatch = false
>         val matchedRows = new CompactBuffer[InternalRow]
>         while (i < buildRows.length) {
>           if (boundCondition(joinedRow(streamedRow, buildRows(i)))) {
>             matchedRows += joinedRow.copy()
>             foundMatch = true
>           }
>           i += 1
>         }
>         if (!foundMatch && joinType == FullOuter) {
>           matchedRows += joinedRow(streamedRow, nulls).copy()
>         }
>         matchedRows.iterator
>       }
>     }
>     // Union
>     sparkContext.union(
>       matchedStreamRows,
>       sparkContext.makeRDD(notMatchedBroadcastRows)
>     )
>   }{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to