[
https://issues.apache.org/jira/browse/SPARK-55531?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Brenton Walker updated SPARK-55531:
-----------------------------------
Description:
If a barrier mode job (requiring _k_ cores) is pending, it is not serviced
immediately when _k_ cores become available, unless all _k_ available cores are
on the same executor.
The events that can cause a pending barrier job to be serviced are:
* revive timer (1 sec interval)
* job arrival
* rare state changes like executor add/remove
This leads to an average 500 ms delay in starting pending barrier stages.
Barrier jobs already have scaling issues because cores have to sit idle until
enough are available to meet the requirements. The extra 500 ms could have a
large effect on performance.
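The 500 ms figure can be reproduced with a short calculation. This is a sketch under two assumptions that are not stated above: the revive timer is the only event that triggers a global offer, and the moment the _k_-th core becomes free is uniformly distributed within the timer period T = 1 s. The symbols W (wait until the next timer tick) and u (offset within the period) are introduced here for illustration only.

```latex
\[
  \mathbb{E}[W] \;=\; \frac{1}{T}\int_{0}^{T} (T - u)\,\mathrm{d}u \;=\; \frac{T}{2} \;=\; 500\ \text{ms}
  \qquad (T = 1\ \text{s})
\]
```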
*More detail:*
When a task completes and its core becomes available,
CoarseGrainedSchedulerBackend makes a resource offer for only that one executor.
That is OK for normal stages, but if a barrier stage is pending, the cores
needed to service it may be on other executors.
A global offer of all executors is only triggered by the events listed above.
*Why?*
It is not clear whether this is intentional.
If there are N cores, then the rate of task completions is proportional to N.
The number of executors to be iterated over for a global resource offer is also
proportional to N. So the load of doing global resource offers for every task
completion is O(N^2), which is not so nice. The current situation is an
understandable optimization.
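The scaling argument above can be written out explicitly. This is a rough sketch: the per-core completion rate \mu, the per-executor cost a of including an executor in an offer, and the number of cores per executor c are hypothetical constants introduced only to make the proportionality visible.

```latex
\[
  \text{load}
  \;=\; \underbrace{\mu N}_{\text{task completions per second}}
  \times \underbrace{a\,\frac{N}{c}}_{\text{cost of one global offer}}
  \;=\; \frac{\mu a}{c}\,N^{2}
  \;=\; O(N^{2})
\]
```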
*Suggested fix:*
There are many ways this could be addressed. I implemented a minor patch that, when a
task finishes, checks whether any barrier tasks are pending. If barrier tasks are
pending, it makes a global offer. I also added a unit test that verifies the
change. I was going to submit a pull request for this.
This would keep the behavior the same for non-barrier jobs, but allow barrier
jobs to be serviced immediately when enough cores are available.
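The idea behind the suggested fix can be illustrated with a toy model. This is not Spark code and not the actual patch: `Executor`, `OfferPolicy`, and every method name below are hypothetical, and the model ignores locality, resource profiles, and everything else the real scheduler considers. It only shows why a single-executor offer cannot start a barrier stage whose cores are spread over several executors, while a global offer can.

```scala
// Toy model of the offer decision; all names are hypothetical, none are Spark APIs.
final case class Executor(id: String, freeCores: Int)

object OfferPolicy {
  // Offer only the executor on which a task just finished.
  def singleExecutorOffer(all: Seq[Executor], finishedOn: String): Seq[Executor] =
    all.filter(e => e.id == finishedOn && e.freeCores > 0)

  // Offer every executor that has at least one free core.
  def globalOffer(all: Seq[Executor]): Seq[Executor] =
    all.filter(_.freeCores > 0)

  // Sketch of the suggested behavior: go global only when a barrier stage is pending.
  def offerAfterTaskFinish(
      all: Seq[Executor],
      finishedOn: String,
      barrierPending: Boolean): Seq[Executor] =
    if (barrierPending) globalOffer(all) else singleExecutorOffer(all, finishedOn)

  // A barrier stage needing k slots can start only if the offer covers k free cores.
  def canLaunchBarrier(offer: Seq[Executor], k: Int): Boolean =
    offer.map(_.freeCores).sum >= k
}

object OfferPolicyDemo {
  def main(args: Array[String]): Unit = {
    // Two single-core executors, both idle; the pending barrier stage needs k = 2.
    val executors = Seq(Executor("exec-1", 1), Executor("exec-2", 1))
    val k = 2

    val single = OfferPolicy.singleExecutorOffer(executors, "exec-1")
    val withFix = OfferPolicy.offerAfterTaskFinish(executors, "exec-1", barrierPending = true)

    println(s"single-executor offer can launch: ${OfferPolicy.canLaunchBarrier(single, k)}")  // false
    println(s"offer with barrier check can launch: ${OfferPolicy.canLaunchBarrier(withFix, k)}")  // true
  }
}
```

With `barrierPending = false` the model falls back to the single-executor offer, which mirrors the claim that non-barrier jobs keep their current behavior.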
*Reproducing the issue:*
> Start pending barrier stages immediately when enough cores are available
> ------------------------------------------------------------------------
>
> Key: SPARK-55531
> URL: https://issues.apache.org/jira/browse/SPARK-55531
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core
> Affects Versions: 4.2.0
> Environment: This issue came up in experiments with Spark 3.2.4
> running on Ubuntu 20.04, but I verified it is still here in the 4.2.0
> snapshot running on Ubuntu 24.04.
> Here is some code that reproduces the issue in a contrived way. In the
> current 4.2.0 snapshot this takes about 35 sec to run. With the patch it
> takes 6-8 sec.
>
>
> {code:java}
> import java.util.concurrent.CountDownLatch
>
> import scala.collection.mutable.ListBuffer
>
> import org.apache.spark.{SparkConf, SparkContext}
>
> object BarrierPollingTest {
>   def main(args: Array[String]): Unit = {
>     val conf = new SparkConf()
>       .setAppName("test")
>       .set("spark.executor.cores", "1")
>       .set("spark.executor.instances", "2")
>       .setMaster("local[2]")
>     val sc = new SparkContext(conf)
>     sc.setLogLevel("INFO")
>     val numJobs = 30
>     val numTasks = 2
>
>     // execute one job, so the executor(s) get(s) created
>     sc.parallelize(1 to numTasks, numTasks).barrier().mapPartitions { _ =>
>       Thread.sleep(1000)
>       Iterator(1)
>     }.count()
>
>     // queue up a bunch of jobs to make sure that the BEM jobs execute immediately
>     val threadList = ListBuffer[Thread]()
>     // the loop below submits numJobs + 1 jobs (0 to numJobs, inclusive),
>     // so the latch has to count that many completions
>     val doneSignal = new CountDownLatch(numJobs + 1)
>     val startTime = System.currentTimeMillis()
>     for (jobNum <- 0 to numJobs) {
>       println("submitting job: " + jobNum)
>       val t = new Thread(new Runnable {
>         def run(): Unit = {
>           val jobId = jobNum
>           sc.parallelize(1 to numTasks, numTasks).barrier().mapPartitions { _ =>
>             // make the first job take some time, so the other jobs queue behind it.
>             if (jobId == 0) {
>               Thread.sleep(3000)
>             }
>             Iterator(1)
>           }.count()
>           doneSignal.countDown()
>         }
>       })
>       threadList += t
>       t.start()
>       // make sure the first job actually starts first
>       if (jobNum == 0) {
>         Thread.sleep(1000)
>       }
>     }
>
>     println("*** waiting for jobs to finish... ***")
>     doneSignal.await()
>     for (t <- threadList) {
>       t.join() // redundant once the latch has reached zero, but harmless
>     }
>     val stopTime = System.currentTimeMillis()
>
>     // could create a listener and look at the times between tasks manually, or
>     // just look at the total time it takes to complete all the jobs.
>     println("\n\n *******************************")
>     println(s"startTime: $startTime\tstopTime: $stopTime\telapsed: ${stopTime - startTime}")
>     println(" *******************************\n\n")
>   }
> }
> {code}
>
>
> Reporter: Brenton Walker
> Priority: Minor
> Labels: patch, performance, pull-request-available
> Fix For: 4.2.0
>
> Original Estimate: 168h
> Remaining Estimate: 168h
>