Xingchao, Zhang created SPARK-40487:
---------------------------------------

             Summary: Make defaultJoin in BroadcastNestedLoopJoinExec run 
in parallel
                 Key: SPARK-40487
                 URL: https://issues.apache.org/jira/browse/SPARK-40487
             Project: Spark
          Issue Type: Improvement
          Components: SQL
    Affects Versions: 3.4.0
            Reporter: Xingchao, Zhang


'Part 1' and 'Part 2' in the code below could run in parallel:
{code:java}
  /**
   * The implementation for these joins:
   *
   *   LeftOuter with BuildLeft
   *   RightOuter with BuildRight
   *   FullOuter
   *
   * The result is the union of two pieces: the joined rows produced while
   * scanning the streamed side ("Part 2"), and one null-padded row for every
   * broadcast row that was never matched ("Part 1").
   *
   * @param relation broadcast variable whose value is the array of build-side rows
   * @return an RDD containing the rows of both pieces
   */
  private def defaultJoin(relation: Broadcast[Array[InternalRow]]): 
RDD[InternalRow] = {
    val streamRdd = streamed.execute()

    // Part 1: collect the broadcast rows that have no match on the streamed side.
    // NOTE(review): getMatchedBroadcastRowsBitSet is not shown here; presumably
    // bit i is set when build row i matched at least one streamed row - confirm.
    val matchedBroadcastRows = getMatchedBroadcastRowsBitSet(streamRdd, 
relation)
    val notMatchedBroadcastRows: Seq[InternalRow] = {
      // Null placeholder as wide as the streamed side's output.
      val nulls = new GenericInternalRow(streamed.output.size)
      val buf: CompactBuffer[InternalRow] = new CompactBuffer()
      // A single JoinedRow instance is reused for every build row, which is
      // why each result is copied before it is buffered.
      val joinedRow = new JoinedRow
      joinedRow.withLeft(nulls)
      var i = 0
      val buildRows = relation.value
      while (i < buildRows.length) {
        // Keep only the build rows whose bit is not set.
        if (!matchedBroadcastRows.get(i)) {
          buf += joinedRow.withRight(buildRows(i)).copy()
        }
        i += 1
      }
      buf
    }

    // Part 2: for each streamed row, emit every (streamed, build) pair that
    // satisfies the join condition.
    val matchedStreamRows = streamRdd.mapPartitionsInternal { streamedIter =>
      val buildRows = relation.value
      // Reused across all rows of this partition; results are copied below.
      val joinedRow = new JoinedRow
      // Null placeholder as wide as the broadcast side's output.
      val nulls = new GenericInternalRow(broadcast.output.size)

      streamedIter.flatMap { streamedRow =>
        var i = 0
        var foundMatch = false
        val matchedRows = new CompactBuffer[InternalRow]

        // Test the streamed row against every build row.
        while (i < buildRows.length) {
          if (boundCondition(joinedRow(streamedRow, buildRows(i)))) {
            matchedRows += joinedRow.copy()
            foundMatch = true
          }
          i += 1
        }

        // Only FullOuter keeps a streamed row that matched nothing, padded
        // with nulls; for the other join types it produces no output here.
        if (!foundMatch && joinType == FullOuter) {
          matchedRows += joinedRow(streamedRow, nulls).copy()
        }
        matchedRows.iterator
      }
    }

    // Union: combine the Part 2 RDD with an RDD built from the Part 1 rows.
    sparkContext.union(
      matchedStreamRows,
      sparkContext.makeRDD(notMatchedBroadcastRows)
    )
  }{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to