Repository: spark Updated Branches: refs/heads/master 20d8ef858 -> e2ae7bd04
[SPARK-12819] Deprecate TaskContext.isRunningLocally() We've already removed local execution but didn't deprecate `TaskContext.isRunningLocally()`; we should deprecate it for 2.0. Author: Josh Rosen <joshro...@databricks.com> Closes #10751 from JoshRosen/remove-local-exec-from-taskcontext. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e2ae7bd0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e2ae7bd0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e2ae7bd0 Branch: refs/heads/master Commit: e2ae7bd046f6d8d6a375c2e81e5a51d7d78ca984 Parents: 20d8ef8 Author: Josh Rosen <joshro...@databricks.com> Authored: Wed Jan 13 21:02:54 2016 -0800 Committer: Reynold Xin <r...@databricks.com> Committed: Wed Jan 13 21:02:54 2016 -0800 ---------------------------------------------------------------------- core/src/main/scala/org/apache/spark/CacheManager.scala | 5 ----- core/src/main/scala/org/apache/spark/TaskContext.scala | 3 ++- core/src/main/scala/org/apache/spark/TaskContextImpl.scala | 3 +-- core/src/main/scala/org/apache/spark/scheduler/Task.scala | 3 +-- .../src/test/scala/org/apache/spark/CacheManagerSuite.scala | 9 --------- 5 files changed, 4 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/e2ae7bd0/core/src/main/scala/org/apache/spark/CacheManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala index 4d20c73..36b536e 100644 --- a/core/src/main/scala/org/apache/spark/CacheManager.scala +++ b/core/src/main/scala/org/apache/spark/CacheManager.scala @@ -68,11 +68,6 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { logInfo(s"Partition $key not found, computing it") val computedValues = rdd.computeOrReadCheckpoint(partition, context) - // If the task is running locally, do not persist the result - if (context.isRunningLocally) { - return computedValues - } - // Otherwise, cache the values and keep track of any updates in block statuses val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] val cachedValues = putInBlockManager(key, computedValues, storageLevel, updatedBlocks) http://git-wip-us.apache.org/repos/asf/spark/blob/e2ae7bd0/core/src/main/scala/org/apache/spark/TaskContext.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala index e25ed0f..7704abc 100644 --- a/core/src/main/scala/org/apache/spark/TaskContext.scala +++ b/core/src/main/scala/org/apache/spark/TaskContext.scala @@ -97,8 +97,9 @@ abstract class TaskContext extends Serializable { /** * Returns true if the task is running locally in the driver program. - * @return + * @return false */ + @deprecated("Local execution was removed, so this always returns false", "2.0.0") def isRunningLocally(): Boolean /** http://git-wip-us.apache.org/repos/asf/spark/blob/e2ae7bd0/core/src/main/scala/org/apache/spark/TaskContextImpl.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala index 6c49363..94ff884 100644 --- a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala +++ b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala @@ -33,7 +33,6 @@ private[spark] class TaskContextImpl( override val taskMemoryManager: TaskMemoryManager, @transient private val metricsSystem: MetricsSystem, internalAccumulators: Seq[Accumulator[Long]], - val runningLocally: Boolean = false, val taskMetrics: TaskMetrics = TaskMetrics.empty) extends TaskContext with Logging { @@ -85,7 +84,7 @@ private[spark] class TaskContextImpl( override def isCompleted(): Boolean = completed - override def isRunningLocally(): Boolean = runningLocally + override def isRunningLocally(): Boolean = false override def isInterrupted(): Boolean = interrupted http://git-wip-us.apache.org/repos/asf/spark/blob/e2ae7bd0/core/src/main/scala/org/apache/spark/scheduler/Task.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala index 0379ca2..fca5792 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala @@ -74,8 +74,7 @@ private[spark] abstract class Task[T]( attemptNumber, taskMemoryManager, metricsSystem, - internalAccumulators, - runningLocally = false) + internalAccumulators) TaskContext.setTaskContext(context) context.taskMetrics.setHostname(Utils.localHostName()) context.taskMetrics.setAccumulatorsUpdater(context.collectInternalAccumulators) http://git-wip-us.apache.org/repos/asf/spark/blob/e2ae7bd0/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala index cb8bd04..30aa94c 100644 --- a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala @@ -82,15 +82,6 @@ class CacheManagerSuite extends SparkFunSuite with LocalSparkContext with Before assert(value.toList === List(5, 6, 7)) } - test("get uncached local rdd") { - // Local computation should not persist the resulting value, so don't expect a put(). - when(blockManager.get(RDDBlockId(0, 0))).thenReturn(None) - - val context = new TaskContextImpl(0, 0, 0, 0, null, null, Seq.empty, runningLocally = true) - val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY) - assert(value.toList === List(1, 2, 3, 4)) - } - test("verify task metrics updated correctly") { cacheManager = sc.env.cacheManager val context = TaskContext.empty() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org