Hi, I am running a PySpark app on thousands of cores (the number of partitions is a small multiple of the number of cores), and overall application performance is fine. However, I noticed that at the end of the job, PySpark initiates its clean-up procedure, and as part of it runs a job shown in the Web UI as "runJob at PythonRDD.scala:361" for each executor/core. The pain point is that this step runs sequentially and has become the bottleneck in our application. Even though each of these jobs takes only about 0.5 sec on average, the time adds up when running with thousands of executors.
Looking into the code for destroyPythonWorker in SparkEnv.scala, is this behavior the result of stopWorker being executed sequentially within foreach? Let me know if I'm missing something and what can be done to fix the issue.

    private[spark]
    def destroyPythonWorker(pythonExec: String, envVars: Map[String, String], worker: Socket) {
      synchronized {
        val key = (pythonExec, envVars)
        pythonWorkers.get(key).foreach(_.stopWorker(worker))
      }
    }

Spark version: 1.5.0-cdh5.5.1

Thanks
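P.S. One naive idea I had, as an untested sketch (I may well be misreading the shutdown path): if the per-worker stopWorker calls are safe to issue from multiple threads, could they be fired off concurrently instead of one at a time? Something along these lines, where `factory` is the PythonWorkerFactory for a given (pythonExec, envVars) key and `workers` is a hypothetical collection of the open worker sockets to shut down:

    import java.net.Socket
    import scala.concurrent.{Await, Future}
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._

    // Untested sketch, not a patch against the actual Spark code path: issue the
    // stopWorker calls concurrently instead of one by one, assuming stopWorker is
    // safe to call from multiple threads.
    def stopWorkersConcurrently(factory: PythonWorkerFactory, workers: Seq[Socket]): Unit = {
      val shutdowns = workers.map { w => Future(factory.stopWorker(w)) }
      // Wait for all shutdowns to complete, with a bound so clean-up cannot hang forever.
      Await.ready(Future.sequence(shutdowns), 5.minutes)
    }

Would a change in that direction make sense, or is there a reason the shutdown has to be serialized?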