Brenton Walker created SPARK-55531:
--------------------------------------
Summary: Start pending barrier stages immediately when enough
cores are available
Key: SPARK-55531
URL: https://issues.apache.org/jira/browse/SPARK-55531
Project: Spark
Issue Type: Improvement
Components: Spark Core
Affects Versions: 4.2.0
Environment: This issue came up in experiments with Spark 3.2.4
running on Ubuntu 20.04, but I verified it is still present in the current 4.2.0
snapshot running on Ubuntu 24.04.
Here is some code that reproduces the issue in a contrived way. On the current
4.2.0 snapshot it takes about 35 sec to run; with the patch it takes 6-8 sec.
{code:scala}
import java.util.concurrent.CountDownLatch

import scala.collection.mutable.ListBuffer

import org.apache.spark.{SparkConf, SparkContext}

object BarrierPollingTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("test")
      .set("spark.executor.cores", "1")
      .set("spark.executor.instances", "2")
      .setMaster("local[2]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("INFO")

    val numJobs = 30
    val numTasks = 2

    // execute one job, so the executor(s) get(s) created
    sc.parallelize(1 to numTasks, numTasks).barrier().mapPartitions { i =>
      Thread.sleep(1000)
      Iterator(1)
    }.count()

    // queue up a bunch of jobs to test whether the pending barrier (BEM) jobs
    // execute immediately once cores free up
    val threadList = ListBuffer[Thread]()
    val doneSignal: CountDownLatch = new CountDownLatch(numJobs)
    val startTime = java.lang.System.currentTimeMillis()
    for (jobNum <- 0 until numJobs) {
      println("submitting job: " + jobNum)
      val t = new Thread(new Runnable {
        def run(): Unit = {
          val jobId = jobNum
          sc.parallelize(1 to numTasks, numTasks).barrier().mapPartitions { i =>
            // make the first job take some time, so the other jobs queue behind it
            if (jobId == 0) {
              Thread.sleep(3000)
            }
            Iterator(1)
          }.count()
          doneSignal.countDown()
        }
      })
      threadList += t
      t.start()
      // make sure the first job actually starts first
      if (jobNum == 0) {
        Thread.sleep(1000)
      }
    }

    println("*** waiting for jobs to finish... ***")
    doneSignal.await()
    for (t <- threadList) {
      t.join() // unnecessary?
    }
    val stopTime = java.lang.System.currentTimeMillis()

    // could create a listener and look at the times between tasks manually, or
    // just look at the total time it takes to complete all the jobs.
    println("\n\n *******************************")
    println("startTime: " + startTime + "\tstopTime: " + stopTime +
      "\telapsed: " + (stopTime - startTime))
    println(" *******************************\n\n")
  }
}
{code}
Reporter: Brenton Walker
Fix For: 4.2.0
If a barrier mode job (requiring _k_ cores) is pending, it is not serviced
immediately when _k_ cores become available, unless all _k_ available cores are
on the same executor.
The events that can cause a pending barrier job to be serviced are:
* revive timer (spark.scheduler.revive.interval, 1 sec by default)
* job arrival
* rare state changes like executor add/remove
In practice a pending barrier stage usually has to wait for the revive timer, which
adds an average delay of about 500 ms (half the 1 sec revive interval) before it
starts. Barrier jobs already have scaling issues because cores must sit idle until
enough of them are available to meet the stage's requirement, so an extra 500 ms per
stage can have a large effect on performance.
*More detail:*
When a task completes and its core becomes available,
CoarseGrainedSchedulerBackend only makes a resource offer for that one executor
(the one whose core was just freed). That is fine for normal stages, but if a
barrier stage is pending, the cores needed to service it may be on other executors.
A global offer across all executors is only triggered by the events listed above.
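For illustration, here is a self-contained toy model of that distinction (not Spark
code; the executor ids, core counts, and function names are made up). It shows why a
single-executor offer cannot launch a pending barrier stage whose required cores are
spread across executors, while a global offer can:
{code:scala}
// Toy model only: illustrates single-executor vs. global resource offers.
object OfferToyModel {
  // hypothetical cluster state: free cores per executor
  val freeCores = Map("exec-1" -> 1, "exec-2" -> 1)
  // a pending barrier stage with 2 tasks needs 2 cores at once
  val barrierCoresNeeded = 2

  // single-executor offer (current behavior after a task finishes on execId):
  // the barrier stage can only launch if this one executor has enough free cores
  def singleExecutorOfferCanLaunch(execId: String): Boolean =
    freeCores(execId) >= barrierCoresNeeded

  // global offer (revive timer / job arrival / executor add-remove):
  // free cores across all executors are considered together
  def globalOfferCanLaunch(): Boolean =
    freeCores.values.sum >= barrierCoresNeeded

  def main(args: Array[String]): Unit = {
    // a task just finished on exec-1 and freed its single core
    println("single-executor offer launches barrier stage: " +
      singleExecutorOfferCanLaunch("exec-1"))   // false: exec-1 alone has 1 core
    println("global offer launches barrier stage: " +
      globalOfferCanLaunch())                   // true: 1 + 1 >= 2
  }
}
{code}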
*Why?*
It is not clear whether this is intentional. If there are N cores, the rate of task
completions is proportional to N, and the number of executors that must be iterated
over for a global resource offer is also proportional to N. So making a global
resource offer on every task completion costs O(N^2) scheduler work, which does not
scale well. The current single-executor offer is an understandable optimization.
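As a rough illustration (the numbers are hypothetical): with 1,000 single-core
executors running 1-second tasks, there are on the order of 1,000 task completions
per second, and a global offer on each one would scan all 1,000 executors, i.e. on
the order of 10^6 executor offers per second on the driver, whereas single-executor
offers keep that work linear at roughly 1,000 per second.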
*Suggested fix:*
There are many ways this could be addressed. I implemented a minor patch that, when a
task finishes, checks whether any barrier tasks are pending and, if so, makes a
global resource offer instead of a single-executor one. I also added a unit test that
verifies the change, and I plan to submit a pull request for it.
This keeps the behavior the same for non-barrier jobs, but allows barrier jobs to be
serviced immediately when enough cores become available.
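For concreteness, here is a minimal sketch of the decision the patch makes, expressed
against the toy model above rather than Spark's internals; it is not the actual patch,
and barrierStagePending / onTaskFinished are hypothetical names standing in for the
scheduler's real pending-barrier check and task-completion hook:
{code:scala}
// Sketch only: proposed decision on task completion, reusing the toy model above.
object OfferDecisionSketch {
  import OfferToyModel.{globalOfferCanLaunch, singleExecutorOfferCanLaunch}

  // hypothetical stand-in for "the scheduler currently has a pending barrier stage"
  def barrierStagePending: Boolean = true

  // called when a task finishes on execId and frees a core
  def onTaskFinished(execId: String): Unit = {
    if (barrierStagePending) {
      // proposed: make a global offer so a waiting barrier stage can start right away
      println("global offer, barrier stage can launch: " + globalOfferCanLaunch())
    } else {
      // current behavior: offer only the executor whose core was freed
      println("single-executor offer, barrier stage can launch: " +
        singleExecutorOfferCanLaunch(execId))
    }
  }

  def main(args: Array[String]): Unit = onTaskFinished("exec-1")
}
{code}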