[ 
https://issues.apache.org/jira/browse/SPARK-55531?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Brenton Walker updated SPARK-55531:
-----------------------------------
    Environment: Ubuntu 20.04 - 24.04  (was: This issue came up in experiments 
with Spark 3.2.4 running on Ubuntu 20.04, but I verified it is still present in 
the 4.2.0 snapshot running on Ubuntu 24.04.

Here is some code that reproduces the issue in a contrived way.  In the current 
4.2.0 snapshot this takes about 35 sec to run. With the patch it takes 6-8 sec.

{code:java}
import java.util.concurrent.CountDownLatch

import scala.collection.mutable.ListBuffer

import org.apache.spark.{SparkConf, SparkContext}

object BarrierPollingTest {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("test")
      .set("spark.executor.cores", "1")
      .set("spark.executor.instances", "2")
      .setMaster("local[2]")

    val sc = new SparkContext(conf)
    sc.setLogLevel("INFO")
    val numJobs = 30
    val numTasks = 2

    // execute one job, so the executor(s) get(s) created
    sc.parallelize(1 to numTasks, numTasks).barrier().mapPartitions { i =>
      Thread.sleep(1000)
      Iterator(1)
    }.count()

    // queue up a bunch of jobs to make sure that the barrier execution
    // mode (BEM) jobs execute immediately
    val threadList = ListBuffer[Thread]()
    val doneSignal = new CountDownLatch(numJobs)
    val startTime = System.currentTimeMillis()
    // "until" rather than "to": submit exactly numJobs jobs, matching the latch
    for (jobNum <- 0 until numJobs) {
      println("submitting job: " + jobNum)
      val t = new Thread(new Runnable {
        def run(): Unit = {
          val jobId = jobNum
          sc.parallelize(1 to numTasks, numTasks).barrier().mapPartitions { i =>
            // make the first job take some time, so the other jobs queue
            // behind it.
            if (jobId == 0) {
              Thread.sleep(3000)
            }
            Iterator(1)
          }.count()
          doneSignal.countDown()
        }
      })
      threadList += t
      t.start()
      // make sure the first job actually starts first
      if (jobNum == 0) {
        Thread.sleep(1000)
      }
    }

    println("*** waiting for jobs to finish... ***")
    doneSignal.await()
    for (t <- threadList) {
      t.join()  // redundant once the latch has reached zero; kept as a safeguard
    }
    val stopTime = System.currentTimeMillis()
    // could create a listener and look at the times between tasks manually, or
    // just look at the total time it takes to complete all the jobs.
    println("\n\n *******************************")
    println(s"startTime: $startTime\tstopTime: $stopTime\telapsed: ${stopTime - startTime}")
    println(" *******************************\n\n")

  }

}
{code}
 )

> Start pending barrier stages immediately when enough cores are available
> ------------------------------------------------------------------------
>
>                 Key: SPARK-55531
>                 URL: https://issues.apache.org/jira/browse/SPARK-55531
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>    Affects Versions: 4.2.0
>         Environment: Ubuntu 20.04 - 24.04
>            Reporter: Brenton Walker
>            Priority: Minor
>              Labels: patch, performance, pull-request-available
>             Fix For: 4.2.0
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> If a barrier mode job (requiring _k_ cores) is pending, it is not serviced 
> immediately when _k_ cores become available, unless all _k_ available cores 
> are on the same executor.
> The events that can cause a pending barrier job to be serviced are:
>  * revive timer (1 sec interval)
>  * job arrival
>  * rare state changes like executor add/remove
> Because the revive timer fires once per second, a pending barrier stage waits 
> about 500 ms on average after its cores free up before it starts.  Barrier 
> jobs already have scaling issues because cores have to sit idle until enough 
> are available to meet the requirements.  The extra 500 ms could have a large 
> effect on performance.
> *More detail:*
> When a task completes and the core becomes available, 
> CoarseGrainedSchedulerBackend only makes a resource offer of that one 
> executor.  That is OK for normal stages, but if a barrier stage is pending, 
> the cores needed to service it may be on other executors.
> A global offer of all executors is only triggered by the events listed above.
> *Why?*
> It is not clear whether this is intentional.
> If there are N cores, then the rate of task completions is proportional to N. 
>  The number of executors to be iterated over for a global resource offer is 
> also proportional to N.  So the load of doing global resource offers for 
> every task completion is O(N^2), which scales poorly.  The current behavior 
> is an understandable optimization.
> *Suggested fix:*
> There are many ways this could be addressed.  I implemented a minor patch 
> that, when a task finishes, checks whether any barrier tasks are pending.  If 
> barrier tasks are pending, it makes a global offer.  I also added a unit test 
> that verifies the change.  I was going to submit a pull request for this.
> This would keep the behavior the same for non-barrier jobs, but allow barrier 
> jobs to be serviced immediately when enough cores are available.
> *Reproducing the issue:*
> This issue came up in experiments with Spark 3.2.4 running on Ubuntu 20.04, 
> but I verified it is still present in the 4.2.0 snapshot running on Ubuntu 
> 24.04.
> Here is some code that reproduces the issue in a contrived way.  In the 
> current 4.2.0 snapshot this takes about 35 sec to run. With the patch it 
> takes 6-8 sec.
> {code:java}
> import java.util.concurrent.CountDownLatch
> import scala.collection.mutable.ListBuffer
> import org.apache.spark.{SparkConf, SparkContext}
>
> object BarrierPollingTest {
>   def main(args: Array[String]): Unit = {
>     val conf = new SparkConf()
>       .setAppName("test")
>       .set("spark.executor.cores", "1")
>       .set("spark.executor.instances", "2")
>       .setMaster("local[2]")
>     val sc = new SparkContext(conf)
>     sc.setLogLevel("INFO")
>     val numJobs = 30
>     val numTasks = 2
>     // execute one job, so the executor(s) get(s) created
>     sc.parallelize(1 to numTasks, numTasks).barrier().mapPartitions { i =>
>       Thread.sleep(1000)
>       Iterator(1)
>     }.count()
>     // queue up a bunch of jobs to make sure that the barrier execution
>     // mode (BEM) jobs execute immediately
>     val threadList = ListBuffer[Thread]()
>     val doneSignal = new CountDownLatch(numJobs)
>     val startTime = System.currentTimeMillis()
>     // "until" rather than "to": submit exactly numJobs jobs, matching the latch
>     for (jobNum <- 0 until numJobs) {
>       println("submitting job: " + jobNum)
>       val t = new Thread(new Runnable {
>         def run(): Unit = {
>           val jobId = jobNum
>           sc.parallelize(1 to numTasks, numTasks).barrier().mapPartitions { i =>
>             // make the first job take some time, so the other jobs queue
>             // behind it.
>             if (jobId == 0) {
>               Thread.sleep(3000)
>             }
>             Iterator(1)
>           }.count()
>           doneSignal.countDown()
>         }
>       })
>       threadList += t
>       t.start()
>       // make sure the first job actually starts first
>       if (jobNum == 0) {
>         Thread.sleep(1000)
>       }
>     }
>     println("*** waiting for jobs to finish... ***")
>     doneSignal.await()
>     for (t <- threadList) {
>       t.join()  // redundant once the latch has reached zero; kept as a safeguard
>     }
>     val stopTime = System.currentTimeMillis()
>     // could create a listener and look at the times between tasks manually,
>     // or just look at the total time it takes to complete all the jobs.
>     println("\n\n *******************************")
>     println(s"startTime: $startTime\tstopTime: $stopTime\telapsed: ${stopTime - startTime}")
>     println(" *******************************\n\n")
>   }
> }
> {code}
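
The 500 ms figure in the description can be checked with a small model. This is only a sketch: it assumes that the cores free up at a moment uniformly distributed within one revive interval and that no other event (job arrival, executor add/remove) triggers a global offer first. The object and method names are made up for illustration and are not part of Spark.

```scala
object ReviveDelayModel {
  /** Wait (ms) from the moment cores free up until the next periodic revive tick. */
  def waitUntilNextTick(freeAtMs: Long, intervalMs: Long): Long = {
    val sinceLastTick = freeAtMs % intervalMs
    if (sinceLastTick == 0) 0L else intervalMs - sinceLastTick
  }

  /** Mean wait over every millisecond offset in one interval (uniform-arrival assumption). */
  def meanWaitMs(intervalMs: Long): Double =
    (0L until intervalMs).map(waitUntilNextTick(_, intervalMs)).sum.toDouble / intervalMs

  def main(args: Array[String]): Unit = {
    // 1000 ms matches the revive timer interval quoted in the description.
    println(s"mean wait for a 1000 ms timer: ${meanWaitMs(1000L)} ms")
  }
}
```

Under these assumptions the mean wait is half the timer interval, which matches the roughly 500 ms average delay reported in the description.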
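
The suggested fix in the description above can be illustrated with a toy model of the offer decision. This is a minimal sketch and not the actual patch: Executor, OfferPolicy, and their methods are hypothetical names invented here and do not correspond to classes in CoarseGrainedSchedulerBackend or elsewhere in Spark.

```scala
// Toy model only: these types and names are hypothetical, not Spark's scheduler classes.
final case class Executor(id: String, freeCores: Int)

object OfferPolicy {
  /** Executors to include in the resource offer made after a task finishes. */
  def executorsToOffer(
      finishedOn: String,
      executors: Seq[Executor],
      barrierStagePending: Boolean): Seq[Executor] = {
    if (barrierStagePending) {
      // Global offer: a barrier stage needs all k slots at once,
      // and the free cores may be spread over several executors.
      executors
    } else {
      // Behavior described in the issue: offer only the executor that freed a core.
      executors.filter(_.id == finishedOn)
    }
  }

  /** True when the offered executors together hold enough free cores for k barrier tasks. */
  def canLaunchBarrier(offered: Seq[Executor], k: Int): Boolean =
    offered.map(_.freeCores).sum >= k
}

object OfferPolicyDemo {
  def main(args: Array[String]): Unit = {
    // Two single-core executors, both idle; a 2-task barrier stage is waiting.
    val executors = Seq(Executor("exec-1", 1), Executor("exec-2", 1))
    val k = 2
    val single = OfferPolicy.executorsToOffer("exec-2", executors, barrierStagePending = false)
    val global = OfferPolicy.executorsToOffer("exec-2", executors, barrierStagePending = true)
    println(s"single-executor offer can launch: ${OfferPolicy.canLaunchBarrier(single, k)}")
    println(s"global offer can launch: ${OfferPolicy.canLaunchBarrier(global, k)}")
  }
}
```

In this model the single-executor offer sees only one free core and cannot start the 2-task barrier stage, while the global offer sees both. Non-barrier workloads keep the cheaper single-executor path, so the O(N^2) cost discussed in the description is only paid while a barrier stage is actually pending.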



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
