Repository: spark Updated Branches: refs/heads/master 9306b8c6c -> c0cbbdeaf
SPARK-3093 : masterLock in Worker is no longer need there's no need to use masterLock in Worker now since all communications are within Akka actor Author: CrazyJvm <[email protected]> Closes #2008 from CrazyJvm/no-need-master-lock and squashes the following commits: dd39e20 [CrazyJvm] fix format 58e7fa5 [CrazyJvm] there's no need to use masterLock now since all communications are within Akka actor Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c0cbbdea Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c0cbbdea Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c0cbbdea Branch: refs/heads/master Commit: c0cbbdeaf4f2033be03d32e3ea0288812b4edbf6 Parents: 9306b8c Author: CrazyJvm <[email protected]> Authored: Mon Aug 18 09:34:36 2014 -0700 Committer: Aaron Davidson <[email protected]> Committed: Mon Aug 18 09:34:36 2014 -0700 ---------------------------------------------------------------------- .../org/apache/spark/deploy/worker/Worker.scala | 41 +++++++------------- 1 file changed, 14 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/c0cbbdea/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 80fde7e..81400af 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -72,7 +72,6 @@ private[spark] class Worker( val APP_DATA_RETENTION_SECS = conf.getLong("spark.worker.cleanup.appDataTtl", 7 * 24 * 3600) val testing: Boolean = sys.props.contains("spark.testing") - val masterLock: Object = new Object() var master: ActorSelection = null var masterAddress: Address = null var activeMasterUrl: String = "" @@ -145,18 +144,16 @@ private[spark] class Worker( } def changeMaster(url: String, uiUrl: String) { - masterLock.synchronized { - activeMasterUrl = url - activeMasterWebUiUrl = uiUrl - master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl)) - masterAddress = activeMasterUrl match { - case Master.sparkUrlRegex(_host, _port) => - Address("akka.tcp", Master.systemName, _host, _port.toInt) - case x => - throw new SparkException("Invalid spark URL: " + x) - } - connected = true + activeMasterUrl = url + activeMasterWebUiUrl = uiUrl + master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl)) + masterAddress = activeMasterUrl match { + case Master.sparkUrlRegex(_host, _port) => + Address("akka.tcp", Master.systemName, _host, _port.toInt) + case x => + throw new SparkException("Invalid spark URL: " + x) } + connected = true } def tryRegisterAllMasters() { @@ -199,9 +196,7 @@ private[spark] class Worker( } case SendHeartbeat => - masterLock.synchronized { - if (connected) { master ! Heartbeat(workerId) } - } + if (connected) { master ! Heartbeat(workerId) } case WorkDirCleanup => // Spin up a separate thread (in a future) to do the dir cleanup; don't tie up worker actor @@ -244,9 +239,7 @@ private[spark] class Worker( manager.start() coresUsed += cores_ memoryUsed += memory_ - masterLock.synchronized { - master ! ExecutorStateChanged(appId, execId, manager.state, None, None) - } + master ! ExecutorStateChanged(appId, execId, manager.state, None, None) } catch { case e: Exception => { logError("Failed to launch executor %s/%d for %s".format(appId, execId, appDesc.name)) @@ -254,17 +247,13 @@ private[spark] class Worker( executors(appId + "/" + execId).kill() executors -= appId + "/" + execId } - masterLock.synchronized { - master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None) - } + master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None) } } } case ExecutorStateChanged(appId, execId, state, message, exitStatus) => - masterLock.synchronized { - master ! ExecutorStateChanged(appId, execId, state, message, exitStatus) - } + master ! ExecutorStateChanged(appId, execId, state, message, exitStatus) val fullId = appId + "/" + execId if (ExecutorState.isFinished(state)) { executors.get(fullId) match { @@ -330,9 +319,7 @@ private[spark] class Worker( case _ => logDebug(s"Driver $driverId changed state to $state") } - masterLock.synchronized { - master ! DriverStateChanged(driverId, state, exception) - } + master ! DriverStateChanged(driverId, state, exception) val driver = drivers.remove(driverId).get finishedDrivers(driverId) = driver memoryUsed -= driver.driverDesc.mem --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
