Rename kill -> cancel in user facing API / documentation.
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/357733d2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/357733d2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/357733d2 Branch: refs/heads/master Commit: 357733d292230968c6bda68dbe9407c560d97c91 Parents: ddf64f0 Author: Reynold Xin <r...@apache.org> Authored: Thu Oct 10 13:27:38 2013 -0700 Committer: Reynold Xin <r...@apache.org> Committed: Thu Oct 10 13:27:38 2013 -0700 ---------------------------------------------------------------------- .../scala/org/apache/spark/SparkContext.scala | 10 ++---- .../scala/org/apache/spark/rdd/HadoopRDD.scala | 38 ++++++++++---------- .../apache/spark/scheduler/DAGScheduler.scala | 8 ++--- .../org/apache/spark/scheduler/JobWaiter.scala | 2 +- 4 files changed, 27 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/357733d2/core/src/main/scala/org/apache/spark/SparkContext.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 515a3b8..2a82c5c 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -866,15 +866,11 @@ class SparkContext( } /** - * Kill a running job. + * Cancel all jobs that have been scheduled or are running. */ - def killJob(jobId: Int) { - dagScheduler.killJob(jobId) - } - - def killAllJobs() { + def cancelAllJobs() { dagScheduler.activeJobs.foreach { job => - killJob(job.jobId) + dagScheduler.cancelJob(job.jobId) } } http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/357733d2/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 18c4c01..8d1dbf7 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -40,13 +40,13 @@ import org.apache.hadoop.conf.{Configuration, Configurable} * created from the Configuration. */ class HadoopFileRDD[K, V]( - sc: SparkContext, - path: String, - broadcastedConf: Broadcast[SerializableWritable[Configuration]], - inputFormatClass: Class[_ <: InputFormat[K, V]], - keyClass: Class[K], - valueClass: Class[V], - minSplits: Int) + sc: SparkContext, + path: String, + broadcastedConf: Broadcast[SerializableWritable[Configuration]], + inputFormatClass: Class[_ <: InputFormat[K, V]], + keyClass: Class[K], + valueClass: Class[V], + minSplits: Int) extends HadoopRDD[K, V](sc, broadcastedConf, inputFormatClass, keyClass, valueClass, minSplits) { override def getJobConf(): JobConf = { @@ -85,21 +85,21 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp * A base class that provides core functionality for reading data partitions stored in Hadoop. */ class HadoopRDD[K, V]( - sc: SparkContext, - broadcastedConf: Broadcast[SerializableWritable[Configuration]], - inputFormatClass: Class[_ <: InputFormat[K, V]], - keyClass: Class[K], - valueClass: Class[V], - minSplits: Int) + sc: SparkContext, + broadcastedConf: Broadcast[SerializableWritable[Configuration]], + inputFormatClass: Class[_ <: InputFormat[K, V]], + keyClass: Class[K], + valueClass: Class[V], + minSplits: Int) extends RDD[(K, V)](sc, Nil) with Logging { def this( - sc: SparkContext, - conf: JobConf, - inputFormatClass: Class[_ <: InputFormat[K, V]], - keyClass: Class[K], - valueClass: Class[V], - minSplits: Int) = { + sc: SparkContext, + conf: JobConf, + inputFormatClass: Class[_ <: InputFormat[K, V]], + keyClass: Class[K], + valueClass: Class[V], + minSplits: Int) = { this( sc, sc.broadcast(new SerializableWritable(conf)) http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/357733d2/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 93303a9..7278237 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -265,7 +265,7 @@ class DAGScheduler( /** * Submit a job to the job scheduler and get a JobWaiter object back. The JobWaiter object - * can be used to block until the the job finishes executing or can be used to kill the job. + * can be used to block until the the job finishes executing or can be used to cancel the job. */ def submitJob[T, U]( rdd: RDD[T], @@ -334,10 +334,10 @@ class DAGScheduler( } /** - * Kill a job that is running or waiting in the queue. + * Cancel a job that is running or waiting in the queue. */ - def killJob(jobId: Int): Unit = this.synchronized { - logInfo("Asked to kill job " + jobId) + def cancelJob(jobId: Int): Unit = this.synchronized { + logInfo("Asked to cancel job " + jobId) eventQueue.put(JobCancelled(jobId)) } http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/357733d2/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala index 7e152a6..7274547 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala @@ -40,7 +40,7 @@ private[spark] class JobWaiter[T]( private var jobResult: JobResult = if (jobFinished) JobSucceeded else null def kill() { - dagScheduler.killJob(jobId) + dagScheduler.cancelJob(jobId) } override def taskSucceeded(index: Int, result: Any): Unit = synchronized {