zhou xiang created SPARK-33409:
----------------------------------

             Summary: Spark job can not be killed in BroadcastNestedLoopJoin
                 Key: SPARK-33409
                 URL: https://issues.apache.org/jira/browse/SPARK-33409
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 3.0.1
            Reporter: zhou xiang
If we kill a Spark job in the Spark web UI, the task context is marked interrupted, as the code below shows:

{code:java}
/**
 * Kills a task by setting the interrupted flag to true. This relies on the upper level Spark
 * code and user code to properly handle the flag. This function should be idempotent so it can
 * be called multiple times.
 * If interruptThread is true, we will also call Thread.interrupt() on the Task's executor thread.
 */
def kill(interruptThread: Boolean, reason: String): Unit = {
  require(reason != null)
  _reasonIfKilled = reason
  if (context != null) {
    context.markInterrupted(reason)
  }
  if (interruptThread && taskThread != null) {
    taskThread.interrupt()
  }
}
{code}

Spark then checks the interrupt flag during a loop in order to stop it, like this:

{code:java}
/**
 * :: DeveloperApi ::
 * An iterator that wraps around an existing iterator to provide task killing functionality.
 * It works by checking the interrupted flag in [[TaskContext]].
 */
@DeveloperApi
class InterruptibleIterator[+T](val context: TaskContext, val delegate: Iterator[T])
  extends Iterator[T] {

  def hasNext: Boolean = {
    // TODO(aarondav/rxin): Check Thread.interrupted instead of context.interrupted if interrupt
    // is allowed. The assumption is that Thread.interrupted does not have a memory fence in read
    // (just a volatile field in C), while context.interrupted is a volatile in the JVM, which
    // introduces an expensive read fence.
    context.killTaskIfInterrupted()
    delegate.hasNext
  }

  def next(): T = delegate.next()
}
{code}

In my case, there is a "not in" in my Spark SQL query, which leads to a BroadcastNestedLoopJoin. The related code is below:

{code:java}
private def leftExistenceJoin(
    relation: Broadcast[Array[InternalRow]],
    exists: Boolean): RDD[InternalRow] = {
  assert(buildSide == BuildRight)
  streamed.execute().mapPartitionsInternal { streamedIter =>
    val buildRows = relation.value
    val joinedRow = new JoinedRow

    if (condition.isDefined) {
      streamedIter.filter(l =>
        buildRows.exists(r => boundCondition(joinedRow(l, r))) == exists
      )
    } else if (buildRows.nonEmpty == exists) {
      streamedIter
    } else {
      Iterator.empty
    }
  }
}
{code}

The "streamedIter" and "buildRows" both have millions of records, so the executor gets stuck in the join loop. I found something wrong in my SQL and tried to kill the job, but the executor thread was not interrupted; I had to restart the executor to stop it. I think we should also perform the check "context.killTaskIfInterrupted()" in BroadcastNestedLoopJoin to support real cancellation.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
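To illustrate the proposed fix, here is a minimal standalone sketch (not Spark code) of the cooperative-cancellation pattern that InterruptibleIterator uses: a wrapping iterator whose hasNext polls a kill flag, so a long streamed-side scan can be stopped mid-loop. The KillableIterator class and the AtomicBoolean standing in for TaskContext.killTaskIfInterrupted() are hypothetical names for this sketch only.

{code:java}
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

public class KillableJoinSketch {

    /**
     * Wraps an iterator and checks a shared kill flag before every element,
     * mimicking how InterruptibleIterator calls context.killTaskIfInterrupted().
     */
    static class KillableIterator<T> implements Iterator<T> {
        private final Iterator<T> delegate;
        private final AtomicBoolean killed; // stand-in for the task's interrupted flag

        KillableIterator(Iterator<T> delegate, AtomicBoolean killed) {
            this.delegate = delegate;
            this.killed = killed;
        }

        @Override
        public boolean hasNext() {
            // The cheap per-row check that the streamed iterator in
            // BroadcastNestedLoopJoin currently lacks.
            if (killed.get()) {
                throw new RuntimeException("task killed");
            }
            return delegate.hasNext();
        }

        @Override
        public T next() {
            return delegate.next();
        }
    }

    public static void main(String[] args) {
        AtomicBoolean killed = new AtomicBoolean(false);
        Iterator<Integer> rows =
            new KillableIterator<>(List.of(1, 2, 3, 4, 5).iterator(), killed);
        int seen = 0;
        try {
            while (rows.hasNext()) {
                rows.next();
                seen++;
                if (seen == 2) {
                    killed.set(true); // simulate the kill from the web UI arriving mid-loop
                }
            }
        } catch (RuntimeException e) {
            System.out.println("stopped after " + seen + " rows: " + e.getMessage());
        }
    }
}
{code}

Running this prints "stopped after 2 rows: task killed": the loop exits at the next hasNext call after the flag is set, instead of scanning all remaining rows, which is exactly the behavior the join loop is missing.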