Repository: spark Updated Branches: refs/heads/branch-1.4 6f78d03d2 -> ed75cc02b
[SPARK-7563] OutputCommitCoordinator.stop() should only run on the driver This fixes a bug where an executor that exits can cause the driver's OutputCommitCoordinator to stop. To fix this, we use an `isDriver` flag and check it in `stop()`. See https://issues.apache.org/jira/browse/SPARK-7563 for more details. Author: Josh Rosen <[email protected]> Closes #6197 from JoshRosen/SPARK-7563 and squashes the following commits: 04b2cc5 [Josh Rosen] [SPARK-7563] OutputCommitCoordinator.stop() should only be executed on the driver (cherry picked from commit 2c04c8a1aed34cce420b3d30d9e885daa6e03d74) Signed-off-by: Patrick Wendell <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ed75cc02 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ed75cc02 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ed75cc02 Branch: refs/heads/branch-1.4 Commit: ed75cc02bcd950b9e239912fec444edfd96f6566 Parents: 6f78d03 Author: Josh Rosen <[email protected]> Authored: Fri May 15 18:06:01 2015 -0700 Committer: Patrick Wendell <[email protected]> Committed: Fri May 15 18:06:12 2015 -0700 ---------------------------------------------------------------------- core/src/main/scala/org/apache/spark/SparkEnv.scala | 2 +- .../apache/spark/scheduler/OutputCommitCoordinator.scala | 10 ++++++---- .../spark/scheduler/OutputCommitCoordinatorSuite.scala | 2 +- 3 files changed, 8 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/ed75cc02/core/src/main/scala/org/apache/spark/SparkEnv.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index a5d831c..3271145 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ 
b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -379,7 +379,7 @@ object SparkEnv extends Logging { } val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse { - new OutputCommitCoordinator(conf) + new OutputCommitCoordinator(conf, isDriver) } val outputCommitCoordinatorRef = registerOrLookupEndpoint("OutputCommitCoordinator", new OutputCommitCoordinatorEndpoint(rpcEnv, outputCommitCoordinator)) http://git-wip-us.apache.org/repos/asf/spark/blob/ed75cc02/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala index 0b1d47c..8321037 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala @@ -38,7 +38,7 @@ private case class AskPermissionToCommitOutput(stage: Int, task: Long, taskAttem * This class was introduced in SPARK-4879; see that JIRA issue (and the associated pull requests) * for an extensive design discussion. 
*/ -private[spark] class OutputCommitCoordinator(conf: SparkConf) extends Logging { +private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) extends Logging { // Initialized by SparkEnv var coordinatorRef: Option[RpcEndpointRef] = None @@ -129,9 +129,11 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf) extends Logging { } def stop(): Unit = synchronized { - coordinatorRef.foreach(_ send StopCoordinator) - coordinatorRef = None - authorizedCommittersByStage.clear() + if (isDriver) { + coordinatorRef.foreach(_ send StopCoordinator) + coordinatorRef = None + authorizedCommittersByStage.clear() + } } // Marked private[scheduler] instead of private so this can be mocked in tests http://git-wip-us.apache.org/repos/asf/spark/blob/ed75cc02/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala index cf97707..7078a7a 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala @@ -81,7 +81,7 @@ class OutputCommitCoordinatorSuite extends FunSuite with BeforeAndAfter { conf: SparkConf, isLocal: Boolean, listenerBus: LiveListenerBus): SparkEnv = { - outputCommitCoordinator = spy(new OutputCommitCoordinator(conf)) + outputCommitCoordinator = spy(new OutputCommitCoordinator(conf, isDriver = true)) // Use Mockito.spy() to maintain the default infrastructure everywhere else. // This mocking allows us to control the coordinator responses in test cases. 
SparkEnv.createDriverEnv(conf, isLocal, listenerBus, Some(outputCommitCoordinator)) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org For additional commands, e-mail: commits-help@spark.apache.org
