[
https://issues.apache.org/jira/browse/SPARK-55531?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Brenton Walker updated SPARK-55531:
-----------------------------------
Environment: Ubuntu 20.04 - 24.04 (was: This issue came up in experiments
with Spark 3.2.4 running on Ubuntu 20.04, but I verified it is still present
in the 4.2.0 snapshot running on Ubuntu 24.04.
Here is some code that reproduces the issue in a contrived way. In the current
4.2.0 snapshot this takes about 35 sec to run. With the patch it takes 6-8 sec.
{code:java}
import java.util.concurrent.CountDownLatch
import scala.collection.mutable.ListBuffer
import org.apache.spark.{SparkConf, SparkContext}

object BarrierPollingTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("test")
      .set("spark.executor.cores", "1")
      .set("spark.executor.instances", "2")
      .setMaster("local[2]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("INFO")
    val numJobs = 30
    val numTasks = 2
    // execute one job, so the executor(s) get created
    sc.parallelize(1 to numTasks, numTasks).barrier().mapPartitions { i =>
      Thread.sleep(1000)
      Iterator(1)
    }.count()
    // queue up a bunch of jobs to make sure that the BEM jobs execute immediately
    val threadList = ListBuffer[Thread]()
    val doneSignal = new CountDownLatch(numJobs)
    val startTime = System.currentTimeMillis()
    // 0 until numJobs matches the latch count of numJobs
    for (jobNum <- 0 until numJobs) {
      println("submitting job: " + jobNum)
      val t = new Thread(new Runnable {
        def run(): Unit = {
          val jobId = jobNum
          sc.parallelize(1 to numTasks, numTasks).barrier().mapPartitions { i =>
            // make the first job take some time, so the other jobs queue behind it
            if (jobId == 0) {
              Thread.sleep(3000)
            }
            Iterator(1)
          }.count()
          doneSignal.countDown()
        }
      })
      threadList += t
      t.start()
      // make sure the first job actually starts first
      if (jobNum == 0) {
        Thread.sleep(1000)
      }
    }
    println("*** waiting for jobs to finish... ***")
    doneSignal.await()
    for (t <- threadList) {
      t.join()
    }
    val stopTime = System.currentTimeMillis()
    // could create a listener and look at the times between tasks manually,
    // or just look at the total time it takes to complete all the jobs.
    println("\n\n *******************************")
    println("startTime: " + startTime + "\tstopTime: " + stopTime +
      "\telapsed: " + (stopTime - startTime))
    println(" *******************************\n\n")
  }
}
{code}
)
> Start pending barrier stages immediately when enough cores are available
> ------------------------------------------------------------------------
>
> Key: SPARK-55531
> URL: https://issues.apache.org/jira/browse/SPARK-55531
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core
> Affects Versions: 4.2.0
> Environment: Ubuntu 20.04 - 24.04
> Reporter: Brenton Walker
> Priority: Minor
> Labels: patch, performance, pull-request-available
> Fix For: 4.2.0
>
> Original Estimate: 168h
> Remaining Estimate: 168h
>
> If a barrier mode job (requiring _k_ cores) is pending, it is not serviced
> immediately when _k_ cores become available, unless all _k_ available cores
> are on the same executor.
> The events that can cause a pending barrier job to be serviced are:
> * revive timer (1 sec interval)
> * job arrival
> * rare state changes like executor add/remove
> This leads to an average delay of roughly 500 ms (half the revive interval)
> before a pending barrier stage starts. Barrier jobs already have scaling
> issues, because cores have to sit idle until enough are available to meet the
> requirements, so the extra 500 ms can have a large effect on performance.
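> The 500 ms figure is just half the 1-second revive interval: if the last
> needed core frees up at a uniformly random moment between revive ticks, the
> pending stage waits on average half an interval for the next tick. A small
> standalone simulation illustrates this (plain Scala with no Spark
> dependencies; the object and method names here are invented for this sketch):

```scala
// Standalone sketch, not Spark code: estimates the average extra wait when a
// barrier stage becomes runnable at a random offset within a revive interval
// but is only serviced at the next 1-second revive tick.
object ReviveDelaySim {
  def averageDelayMs(trials: Int, seed: Long): Double = {
    val reviveIntervalMs = 1000.0
    val rng = new scala.util.Random(seed)
    val totalDelay = (1 to trials).map { _ =>
      val readyAt = rng.nextDouble() * reviveIntervalMs // cores free at random offset
      reviveIntervalMs - readyAt                        // wait until the next tick
    }.sum
    totalDelay / trials
  }

  def main(args: Array[String]): Unit =
    println(f"average extra delay: ${averageDelayMs(100000, 42L)}%.1f ms") // ~500 ms
}
```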
> *More detail:*
> When a task completes and its core becomes available,
> CoarseGrainedSchedulerBackend only makes a resource offer for that one
> executor. That is fine for normal stages, but if a barrier stage is pending,
> the cores needed to service it may be on other executors.
> A global offer across all executors is only triggered by the events listed
> above.
> *Why?*
> It is not clear whether this is intentional.
> If there are N cores, the rate of task completions is proportional to N, and
> the number of executors to iterate over for a global resource offer is also
> proportional to N. Making a global offer on every task completion would
> therefore cost O(N^2), which scales poorly, so the current behavior is an
> understandable optimization.
> *Suggested fix:*
> There are several ways this could be addressed. I implemented a minor patch
> that, when a task finishes, checks whether any barrier tasks are pending; if
> so, it makes a global offer. I also added a unit test that verifies the
> change, and I plan to submit a pull request.
> This keeps the behavior the same for non-barrier jobs, but allows barrier
> jobs to be serviced immediately when enough cores are available.
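> The decision point the patch adds can be sketched as follows. This is a
> standalone stub, not the actual Spark change; the names SchedulerBackendStub,
> onTaskFinished, barrierPending and lastOfferScope are invented for this
> sketch (the real change lives in CoarseGrainedSchedulerBackend):

```scala
// Illustrative stub only; none of these names are Spark APIs.
object BarrierOfferSketch {
  class SchedulerBackendStub(executorIds: Seq[String], var barrierPending: Boolean) {
    var lastOfferScope: Seq[String] = Nil // which executors the last offer covered

    // Called when a task finishes on executorId and frees a core.
    def onTaskFinished(executorId: String): Unit = {
      if (barrierPending) {
        // Patched behavior: a pending barrier stage may need cores spread
        // across executors, so make a global offer.
        lastOfferScope = executorIds
      } else {
        // Existing behavior: offer only the executor that just freed a core.
        lastOfferScope = Seq(executorId)
      }
    }
  }
}
```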
> *Reproducing the issue:*
> This issue came up in experiments with Spark 3.2.4 running on Ubuntu 20.04,
> but I verified it is still present in the 4.2.0 snapshot running on Ubuntu
> 24.04.
> Here is some code that reproduces the issue in a contrived way. In the
> current 4.2.0 snapshot this takes about 35 sec to run. With the patch it
> takes 6-8 sec.
> {code:java}
> import java.util.concurrent.CountDownLatch
> import scala.collection.mutable.ListBuffer
> import org.apache.spark.{SparkConf, SparkContext}
>
> object BarrierPollingTest {
>   def main(args: Array[String]): Unit = {
>     val conf = new SparkConf()
>       .setAppName("test")
>       .set("spark.executor.cores", "1")
>       .set("spark.executor.instances", "2")
>       .setMaster("local[2]")
>     val sc = new SparkContext(conf)
>     sc.setLogLevel("INFO")
>     val numJobs = 30
>     val numTasks = 2
>     // execute one job, so the executor(s) get created
>     sc.parallelize(1 to numTasks, numTasks).barrier().mapPartitions { i =>
>       Thread.sleep(1000)
>       Iterator(1)
>     }.count()
>     // queue up a bunch of jobs to make sure that the BEM jobs execute immediately
>     val threadList = ListBuffer[Thread]()
>     val doneSignal = new CountDownLatch(numJobs)
>     val startTime = System.currentTimeMillis()
>     // 0 until numJobs matches the latch count of numJobs
>     for (jobNum <- 0 until numJobs) {
>       println("submitting job: " + jobNum)
>       val t = new Thread(new Runnable {
>         def run(): Unit = {
>           val jobId = jobNum
>           sc.parallelize(1 to numTasks, numTasks).barrier().mapPartitions { i =>
>             // make the first job take some time, so the other jobs queue behind it
>             if (jobId == 0) {
>               Thread.sleep(3000)
>             }
>             Iterator(1)
>           }.count()
>           doneSignal.countDown()
>         }
>       })
>       threadList += t
>       t.start()
>       // make sure the first job actually starts first
>       if (jobNum == 0) {
>         Thread.sleep(1000)
>       }
>     }
>     println("*** waiting for jobs to finish... ***")
>     doneSignal.await()
>     for (t <- threadList) {
>       t.join()
>     }
>     val stopTime = System.currentTimeMillis()
>     // could create a listener and look at the times between tasks manually,
>     // or just look at the total time it takes to complete all the jobs.
>     println("\n\n *******************************")
>     println("startTime: " + startTime + "\tstopTime: " + stopTime +
>       "\telapsed: " + (stopTime - startTime))
>     println(" *******************************\n\n")
>   }
> }
> {code}
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]