zhou xiang created SPARK-33409:
----------------------------------

             Summary: Spark job cannot be killed in BroadcastNestedLoopJoin
                 Key: SPARK-33409
                 URL: https://issues.apache.org/jira/browse/SPARK-33409
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 3.0.1
            Reporter: zhou xiang


 

If we kill a Spark job in the Spark web UI, the task context will be marked as
interrupted, as the code below shows:
{code:java}
/**
 * Kills a task by setting the interrupted flag to true. This relies on the upper level Spark
 * code and user code to properly handle the flag. This function should be idempotent so it can
 * be called multiple times.
 * If interruptThread is true, we will also call Thread.interrupt() on the Task's executor thread.
 */
def kill(interruptThread: Boolean, reason: String): Unit = {
  require(reason != null)
  _reasonIfKilled = reason
  if (context != null) {
    context.markInterrupted(reason)
  }
  if (interruptThread && taskThread != null) {
    taskThread.interrupt()
  }
}{code}
 

And Spark checks the interrupted flag during the loop to stop the task, like this:
{code:java}
/**
 * :: DeveloperApi ::
 * An iterator that wraps around an existing iterator to provide task killing functionality.
 * It works by checking the interrupted flag in [[TaskContext]].
 */
@DeveloperApi
class InterruptibleIterator[+T](val context: TaskContext, val delegate: Iterator[T])
  extends Iterator[T] {

  def hasNext: Boolean = {
    // TODO(aarondav/rxin): Check Thread.interrupted instead of context.interrupted if interrupt
    // is allowed. The assumption is that Thread.interrupted does not have a memory fence in read
    // (just a volatile field in C), while context.interrupted is a volatile in the JVM, which
    // introduces an expensive read fence.
    context.killTaskIfInterrupted()
    delegate.hasNext
  }

  def next(): T = delegate.next()
}{code}
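Note that the kill check only fires for iterators that are wrapped this way; a minimal sketch of such wrapping (the helper below is only illustrative, it is not from the Spark source):
{code:java}
// Illustration only: wrapping an iterator makes its consumption cancellable,
// because every hasNext re-checks the task's kill flag via
// TaskContext.killTaskIfInterrupted(), which throws TaskKilledException once
// the task has been marked as killed.
def makeCancellable[T](iter: Iterator[T]): Iterator[T] =
  new InterruptibleIterator(TaskContext.get(), iter)
{code}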
In my case, my Spark SQL query contains a "not in" subquery, which gets planned as a "BroadcastNestedLoopJoin".

The related code is shown below:
{code:java}
private def leftExistenceJoin(
    relation: Broadcast[Array[InternalRow]],
    exists: Boolean): RDD[InternalRow] = {
  assert(buildSide == BuildRight)
  streamed.execute().mapPartitionsInternal { streamedIter =>
    val buildRows = relation.value
    val joinedRow = new JoinedRow

    if (condition.isDefined) {
      streamedIter.filter(l =>
        buildRows.exists(r => boundCondition(joinedRow(l, r))) == exists
      )
    } else if (buildRows.nonEmpty == exists) {
      streamedIter
    } else {
      Iterator.empty
    }
  }
}{code}
The "streamedIter" and "buildRows" both have millions of records,  the executor 
get stuck in the join loop, I found something wrong in my sql and try to kill 
the job, but the executor thread is not interrupted. I have to restart the 
executor to stop it.

I think we should also do the "context.killTaskIfInterrupted()" check in
BroadcastNestedLoopJoin to support real cancellation.
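A minimal sketch of what such a change could look like, based on the leftExistenceJoin code above (this is only an illustration, not an actual patch):
{code:java}
// Sketch only: wrap the streamed iterator in InterruptibleIterator so every
// hasNext re-checks the kill flag, and re-check it in the condition branch as
// well, since a single streamed row can trigger a scan over millions of
// build rows.
private def leftExistenceJoin(
    relation: Broadcast[Array[InternalRow]],
    exists: Boolean): RDD[InternalRow] = {
  assert(buildSide == BuildRight)
  streamed.execute().mapPartitionsInternal { streamedIter =>
    val taskContext = TaskContext.get()
    val buildRows = relation.value
    val joinedRow = new JoinedRow

    if (condition.isDefined) {
      new InterruptibleIterator(taskContext, streamedIter).filter { l =>
        taskContext.killTaskIfInterrupted()
        buildRows.exists(r => boundCondition(joinedRow(l, r))) == exists
      }
    } else if (buildRows.nonEmpty == exists) {
      streamedIter
    } else {
      Iterator.empty
    }
  }
}{code}
With this, the executor would throw TaskKilledException on the next streamed row after the task is killed, instead of running the nested loop to completion.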

 

 