Repository: spark Updated Branches: refs/heads/master e44697606 -> 5b5a69bea
[SPARK-20923] turn tracking of TaskMetrics._updatedBlockStatuses off ## What changes were proposed in this pull request? Turn tracking of TaskMetrics._updatedBlockStatuses off by default. As far as I can see it's not used by anything and it uses a lot of memory when caching and processing a lot of blocks. In my case it was taking 5GB of a 10GB heap and I even went up to 50GB heap and the job still ran out of memory. With this change in place the same job easily runs in less than 10GB of heap. We leave the API there as well as a config to turn it back on just in case anyone is using it. TaskMetrics is exposed via SparkListenerTaskEnd so if users are relying on it they can turn it back on. ## How was this patch tested? Ran unit tests that were modified and manually tested on a couple of jobs (with and without caching). Clicked through the UI and didn't see anything missing. Ran my very large Hive query job with 200,000 small tasks, 1000 executors, cached 6+TB of data; this runs fine now, whereas without this change it would go into full GCs and eventually die. Author: Thomas Graves <[email protected]> Author: Tom Graves <[email protected]> Closes #18162 from tgravescs/SPARK-20923. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5b5a69be Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5b5a69be Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5b5a69be Branch: refs/heads/master Commit: 5b5a69bea9de806e2c39b04b248ee82a7b664d7b Parents: e446976 Author: Thomas Graves <[email protected]> Authored: Fri Jun 23 09:19:02 2017 +0800 Committer: Wenchen Fan <[email protected]> Committed: Fri Jun 23 09:19:02 2017 +0800 ---------------------------------------------------------------------- .../org/apache/spark/executor/TaskMetrics.scala | 6 ++++ .../apache/spark/internal/config/package.scala | 8 +++++ .../org/apache/spark/storage/BlockManager.scala | 6 ++-- .../spark/storage/BlockManagerSuite.scala | 32 +++++++++++++++++++- 4 files changed, 49 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/5b5a69be/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index 341a6da..85b2745 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -112,6 +112,12 @@ class TaskMetrics private[spark] () extends Serializable { /** * Storage statuses of any blocks that have been updated as a result of this task. + * + * Tracking the _updatedBlockStatuses can use a lot of memory. + * It is not used anywhere inside of Spark so we would ideally remove it, but its exposed to + * the user in SparkListenerTaskEnd so the api is kept for compatibility. + * Tracking can be turned off to save memory via config + * TASK_METRICS_TRACK_UPDATED_BLOCK_STATUSES. 
*/ def updatedBlockStatuses: Seq[(BlockId, BlockStatus)] = { // This is called on driver. All accumulator updates have a fixed value. So it's safe to use http://git-wip-us.apache.org/repos/asf/spark/blob/5b5a69be/core/src/main/scala/org/apache/spark/internal/config/package.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 615497d..462c189 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -322,4 +322,12 @@ package object config { "above this threshold. This is to avoid a giant request takes too much memory.") .bytesConf(ByteUnit.BYTE) .createWithDefaultString("200m") + + private[spark] val TASK_METRICS_TRACK_UPDATED_BLOCK_STATUSES = + ConfigBuilder("spark.taskMetrics.trackUpdatedBlockStatuses") + .doc("Enable tracking of updatedBlockStatuses in the TaskMetrics. 
Off by default since " + + "tracking the block statuses can use a lot of memory and its not used anywhere within " + + "spark.") + .booleanConf + .createWithDefault(false) } http://git-wip-us.apache.org/repos/asf/spark/blob/5b5a69be/core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 74be703..adbe3cf 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1473,8 +1473,10 @@ private[spark] class BlockManager( } private def addUpdatedBlockStatusToTaskMetrics(blockId: BlockId, status: BlockStatus): Unit = { - Option(TaskContext.get()).foreach { c => - c.taskMetrics().incUpdatedBlockStatuses(blockId -> status) + if (conf.get(config.TASK_METRICS_TRACK_UPDATED_BLOCK_STATUSES)) { + Option(TaskContext.get()).foreach { c => + c.taskMetrics().incUpdatedBlockStatuses(blockId -> status) + } } } http://git-wip-us.apache.org/repos/asf/spark/blob/5b5a69be/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 88f1829..086adcc 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -922,8 +922,38 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE } } + test("turn off updated block statuses") { + val conf = new SparkConf() + conf.set(TASK_METRICS_TRACK_UPDATED_BLOCK_STATUSES, false) + store = makeBlockManager(12000, testConf = Some(conf)) + + store.registerTask(0) + val list = 
List.fill(2)(new Array[Byte](2000)) + + def getUpdatedBlocks(task: => Unit): Seq[(BlockId, BlockStatus)] = { + val context = TaskContext.empty() + try { + TaskContext.setTaskContext(context) + task + } finally { + TaskContext.unset() + } + context.taskMetrics.updatedBlockStatuses + } + + // 1 updated block (i.e. list1) + val updatedBlocks1 = getUpdatedBlocks { + store.putIterator( + "list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) + } + assert(updatedBlocks1.size === 0) + } + + test("updated block statuses") { - store = makeBlockManager(12000) + val conf = new SparkConf() + conf.set(TASK_METRICS_TRACK_UPDATED_BLOCK_STATUSES, true) + store = makeBlockManager(12000, testConf = Some(conf)) store.registerTask(0) val list = List.fill(2)(new Array[Byte](2000)) val bigList = List.fill(8)(new Array[Byte](2000)) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
