Brenton Walker created SPARK-55531:
--------------------------------------

             Summary: Start pending barrier stages immediately when enough 
cores are available
                 Key: SPARK-55531
                 URL: https://issues.apache.org/jira/browse/SPARK-55531
             Project: Spark
          Issue Type: Improvement
          Components: Spark Core
    Affects Versions: 4.2.0
         Environment: This issue came up in experiments with Spark 3.2.4 
running on Ubuntu 20.04, but I verified it is still present in the 4.2.0 
snapshot running on Ubuntu 24.04.

Here is some code that reproduces the issue in a contrived way. In the current 
4.2.0 snapshot it takes about 35 sec to run; with the patch it takes 6-8 sec.

{code:java}
import java.util.concurrent.CountDownLatch

import scala.collection.mutable.ListBuffer

import org.apache.spark.{SparkConf, SparkContext}

object BarrierPollingTest {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("test")
      .set("spark.executor.cores", "1")
      .set("spark.executor.instances", "2")
      .setMaster("local[2]")

    val sc = new SparkContext(conf)
    sc.setLogLevel("INFO")
    val numJobs = 30
    val numTasks = 2

    // execute one job, so the executor(s) get(s) created
    sc.parallelize(1 to numTasks, numTasks).barrier().mapPartitions { i =>
      Thread.sleep(1000)
      Iterator(1)
    }.count()

    // queue up a bunch of jobs to make sure the barrier jobs cannot all
    // execute immediately
    val threadList = ListBuffer[Thread]()
    val doneSignal = new CountDownLatch(numJobs)
    val startTime = System.currentTimeMillis()
    for (jobNum <- 0 until numJobs) {
      println("submitting job: " + jobNum)
      val t = new Thread(new Runnable {
        def run(): Unit = {
          val jobId = jobNum
          sc.parallelize(1 to numTasks, numTasks).barrier().mapPartitions { i =>
            // make the first job take some time, so the other jobs queue
            // behind it
            if (jobId == 0) {
              Thread.sleep(3000)
            }
            Iterator(1)
          }.count()
          doneSignal.countDown()
        }
      })
      threadList += t
      t.start()
      // make sure the first job actually starts first
      if (jobNum == 0) {
        Thread.sleep(1000)
      }
    }

    println("*** waiting for jobs to finish... ***")
    doneSignal.await()
    // redundant once the latch has fired, but harmless
    for (t <- threadList) {
      t.join()
    }
    val stopTime = System.currentTimeMillis()
    // could create a listener and look at the times between tasks manually, or
    // just look at the total time it takes to complete all the jobs.
    println("\n\n *******************************")
    println(s"startTime: $startTime\tstopTime: $stopTime\telapsed: ${stopTime - startTime}")
    println(" *******************************\n\n")
  }
}
{code}

            Reporter: Brenton Walker
             Fix For: 4.2.0


If a barrier mode job (requiring _k_ cores) is pending, it is not serviced 
immediately when _k_ cores become available, unless all _k_ available cores are 
on the same executor.

The events that can cause a pending barrier job to be serviced are:
 * revive timer (1 sec interval)
 * job arrival
 * rare state changes like executor add/remove

This leads to an average delay of about 500 ms in starting pending barrier 
stages.  Barrier jobs already have scaling issues because cores must sit idle 
until enough are available to meet the requirement, so an extra 500 ms per 
stage can have a large effect on performance.

*More detail:*

When a task completes and its core becomes available, 
CoarseGrainedSchedulerBackend makes a resource offer for that one executor 
only.  That is fine for normal stages, but if a barrier stage is pending, the 
cores needed to service it may be on other executors.

A global offer of all executors is only triggered by the events listed above.
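To make this concrete, here is a minimal, Spark-independent sketch of the core-counting problem; the executor names, the {{freeCores}} map, and the {{canLaunchBarrier}} helper are all illustrative stand-ins, not Spark's actual internals:

{code:java}
import scala.collection.mutable

object SingleOfferSketch {
  // hypothetical model of the cluster: free cores per executor
  val freeCores: mutable.Map[String, Int] =
    mutable.Map("exec-1" -> 0, "exec-2" -> 1)

  // a pending barrier stage can launch only if the offered executors
  // together have enough free cores
  def canLaunchBarrier(offered: Seq[String], needed: Int): Boolean =
    offered.map(freeCores).sum >= needed

  def main(args: Array[String]): Unit = {
    // a task on exec-1 finishes, freeing its core
    freeCores("exec-1") = 1
    // current behavior: only the executor whose task finished is offered,
    // so a 2-core barrier stage still cannot launch
    println(canLaunchBarrier(Seq("exec-1"), needed = 2))           // false
    // a global offer sees both free cores and can launch the stage
    println(canLaunchBarrier(Seq("exec-1", "exec-2"), needed = 2)) // true
  }
}
{code}

Even though the cluster has the two cores the stage needs, the single-executor offer never exposes them together, so the stage waits for the next revive-timer tick.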

*Why?*

It is not clear whether this is intentional.

If there are N cores, the rate of task completions is proportional to N, and 
the number of executors to iterate over for a global resource offer is also 
proportional to N.  So making a global resource offer on every task completion 
costs O(N^2), which is not so nice.  The current behavior is an understandable 
optimization.

*Suggested fix:*

Lots of ways this could be addressed.  I implemented a minor patch that, when a 
task finishes, checks whether any barrier tasks are pending; if so, it makes a 
global resource offer.  I also added a unit test that verifies the change, and 
I plan to submit a pull request for it.

This keeps the behavior the same for non-barrier jobs, but allows barrier jobs 
to be serviced immediately when enough cores are available.
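The shape of the dispatch logic would be roughly the following sketch; {{pendingBarrierStages}}, {{onTaskFinished}}, and the {{Offer}} types are placeholders for illustration, not the actual Spark scheduler API:

{code:java}
object BarrierAwareDispatch {
  sealed trait Offer
  case class SingleExecutorOffer(executorId: String) extends Offer
  case object GlobalOffer extends Offer

  // placeholder for the scheduler's count of waiting barrier stages
  var pendingBarrierStages: Int = 0

  // on task completion: keep the cheap single-executor offer on the common
  // path, and fall back to a global offer only while barrier work is pending
  def onTaskFinished(executorId: String): Offer =
    if (pendingBarrierStages > 0) GlobalOffer
    else SingleExecutorOffer(executorId)
}
{code}

This confines the O(N^2) global-offer cost to the windows where a barrier stage is actually waiting, which are exactly the windows where the extra work pays off.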

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
