Repository: spark
Updated Branches:
  refs/heads/master e44697606 -> 5b5a69bea


[SPARK-20923] turn tracking of TaskMetrics._updatedBlockStatuses off

## What changes were proposed in this pull request?
Turn tracking of TaskMetrics._updatedBlockStatuses off by default. As far as I 
can see, it's not used by anything, and it uses a lot of memory when caching and 
processing a lot of blocks.  In my case it was taking 5GB of a 10GB heap, and I 
even went up to a 50GB heap and the job still ran out of memory.  With this 
change in place the same job easily runs in less than 10GB of heap.

We leave the API in place, as well as a config to turn it back on, just in case 
anyone is using it.  TaskMetrics is exposed via SparkListenerTaskEnd, so if 
users are relying on it they can turn tracking back on.

## How was this patch tested?

Ran the unit tests that were modified and manually tested on a couple of jobs (with 
and without caching).  Clicked through the UI and didn't see anything missing.
Ran my very large Hive query job with 200,000 small tasks, 1000 executors, and 
6+TB of cached data; this runs fine now, whereas without this change it would go 
into full GCs and eventually die.

Author: Thomas Graves <[email protected]>
Author: Tom Graves <[email protected]>

Closes #18162 from tgravescs/SPARK-20923.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5b5a69be
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5b5a69be
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5b5a69be

Branch: refs/heads/master
Commit: 5b5a69bea9de806e2c39b04b248ee82a7b664d7b
Parents: e446976
Author: Thomas Graves <[email protected]>
Authored: Fri Jun 23 09:19:02 2017 +0800
Committer: Wenchen Fan <[email protected]>
Committed: Fri Jun 23 09:19:02 2017 +0800

----------------------------------------------------------------------
 .../org/apache/spark/executor/TaskMetrics.scala |  6 ++++
 .../apache/spark/internal/config/package.scala  |  8 +++++
 .../org/apache/spark/storage/BlockManager.scala |  6 ++--
 .../spark/storage/BlockManagerSuite.scala       | 32 +++++++++++++++++++-
 4 files changed, 49 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5b5a69be/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala 
b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 341a6da..85b2745 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -112,6 +112,12 @@ class TaskMetrics private[spark] () extends Serializable {
 
   /**
    * Storage statuses of any blocks that have been updated as a result of this 
task.
+   *
+   * Tracking the _updatedBlockStatuses can use a lot of memory.
+   * It is not used anywhere inside of Spark so we would ideally remove it, 
but its exposed to
+   * the user in SparkListenerTaskEnd so the api is kept for compatibility.
+   * Tracking can be turned off to save memory via config
+   * TASK_METRICS_TRACK_UPDATED_BLOCK_STATUSES.
    */
   def updatedBlockStatuses: Seq[(BlockId, BlockStatus)] = {
     // This is called on driver. All accumulator updates have a fixed value. 
So it's safe to use

http://git-wip-us.apache.org/repos/asf/spark/blob/5b5a69be/core/src/main/scala/org/apache/spark/internal/config/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala 
b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 615497d..462c189 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -322,4 +322,12 @@ package object config {
         "above this threshold. This is to avoid a giant request takes too much 
memory.")
       .bytesConf(ByteUnit.BYTE)
       .createWithDefaultString("200m")
+
+  private[spark] val TASK_METRICS_TRACK_UPDATED_BLOCK_STATUSES =
+    ConfigBuilder("spark.taskMetrics.trackUpdatedBlockStatuses")
+      .doc("Enable tracking of updatedBlockStatuses in the TaskMetrics. Off by 
default since " +
+        "tracking the block statuses can use a lot of memory and its not used 
anywhere within " +
+        "spark.")
+      .booleanConf
+      .createWithDefault(false)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/5b5a69be/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 74be703..adbe3cf 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1473,8 +1473,10 @@ private[spark] class BlockManager(
   }
 
   private def addUpdatedBlockStatusToTaskMetrics(blockId: BlockId, status: 
BlockStatus): Unit = {
-    Option(TaskContext.get()).foreach { c =>
-      c.taskMetrics().incUpdatedBlockStatuses(blockId -> status)
+    if (conf.get(config.TASK_METRICS_TRACK_UPDATED_BLOCK_STATUSES)) {
+      Option(TaskContext.get()).foreach { c =>
+        c.taskMetrics().incUpdatedBlockStatuses(blockId -> status)
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5b5a69be/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 88f1829..086adcc 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -922,8 +922,38 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
     }
   }
 
+  test("turn off updated block statuses") {
+    val conf = new SparkConf()
+    conf.set(TASK_METRICS_TRACK_UPDATED_BLOCK_STATUSES, false)
+    store = makeBlockManager(12000, testConf = Some(conf))
+
+    store.registerTask(0)
+    val list = List.fill(2)(new Array[Byte](2000))
+
+    def getUpdatedBlocks(task: => Unit): Seq[(BlockId, BlockStatus)] = {
+      val context = TaskContext.empty()
+      try {
+        TaskContext.setTaskContext(context)
+        task
+      } finally {
+        TaskContext.unset()
+      }
+      context.taskMetrics.updatedBlockStatuses
+    }
+
+    // 1 updated block (i.e. list1)
+    val updatedBlocks1 = getUpdatedBlocks {
+      store.putIterator(
+        "list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    }
+    assert(updatedBlocks1.size === 0)
+  }
+
+
   test("updated block statuses") {
-    store = makeBlockManager(12000)
+    val conf = new SparkConf()
+    conf.set(TASK_METRICS_TRACK_UPDATED_BLOCK_STATUSES, true)
+    store = makeBlockManager(12000, testConf = Some(conf))
     store.registerTask(0)
     val list = List.fill(2)(new Array[Byte](2000))
     val bigList = List.fill(8)(new Array[Byte](2000))


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to