Repository: spark
Updated Branches:
  refs/heads/branch-2.1 6a1b2eb4c -> e3cec18e1


[SPARK-20084][CORE] Remove internal.metrics.updatedBlockStatuses from history files.

## What changes were proposed in this pull request?

Remove accumulator updates for internal.metrics.updatedBlockStatuses from 
SparkListenerTaskEnd entries in the history file. These can cause history files 
to grow to hundreds of GB because the value of the accumulator contains all 
tracked blocks.
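
The fix applies a name-based blacklist to accumulator updates before they are serialized to JSON, so the oversized internal.metrics.updatedBlockStatuses value never reaches the event log. A minimal sketch of the filtering idea, using a simplified, hypothetical stand-in for Spark's AccumulableInfo rather than the actual patch (which is in the diff below):

```scala
// Sketch only: a simplified stand-in for org.apache.spark.scheduler.AccumulableInfo,
// whose `name` field is an Option.
case class AccumulableInfo(id: Long, name: Option[String], value: Option[Any])

object AccumulableFilter {
  // Accumulators whose per-task updates should not be written to the event log.
  private val blacklist = Set("internal.metrics.updatedBlockStatuses")

  // Drop any accumulator whose name matches the blacklist before serializing.
  def filter(accumulables: Seq[AccumulableInfo]): Seq[AccumulableInfo] =
    accumulables.filterNot(_.name.exists(blacklist.contains))
}
```

The patch itself adds an equivalent helper, accumulablesToJson, and routes stage, task, and exception-failure accumulators through it.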

## How was this patch tested?

The existing History UI tests cover use of the history file.

Author: Ryan Blue <b...@apache.org>

Closes #17412 from rdblue/SPARK-20084-remove-block-accumulator-info.

(cherry picked from commit c4c03eed67c05a78dc8944f6119ea708d6b955be)
Signed-off-by: Marcelo Vanzin <van...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e3cec18e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e3cec18e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e3cec18e

Branch: refs/heads/branch-2.1
Commit: e3cec18e1844c2d791754325113f90f005323f9f
Parents: 6a1b2eb
Author: Ryan Blue <b...@apache.org>
Authored: Fri Mar 31 09:42:49 2017 -0700
Committer: Marcelo Vanzin <van...@cloudera.com>
Committed: Fri Mar 31 09:43:08 2017 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/util/JsonProtocol.scala   | 15 +++++++++++----
 1 file changed, 11 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e3cec18e/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index c11eb3f..7e734bd 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -264,8 +264,7 @@ private[spark] object JsonProtocol {
     ("Submission Time" -> submissionTime) ~
     ("Completion Time" -> completionTime) ~
     ("Failure Reason" -> failureReason) ~
-    ("Accumulables" -> JArray(
-      stageInfo.accumulables.values.map(accumulableInfoToJson).toList))
+    ("Accumulables" -> accumulablesToJson(stageInfo.accumulables.values))
   }
 
   def taskInfoToJson(taskInfo: TaskInfo): JValue = {
@@ -281,7 +280,15 @@ private[spark] object JsonProtocol {
     ("Finish Time" -> taskInfo.finishTime) ~
     ("Failed" -> taskInfo.failed) ~
     ("Killed" -> taskInfo.killed) ~
-    ("Accumulables" -> 
JArray(taskInfo.accumulables.toList.map(accumulableInfoToJson)))
+    ("Accumulables" -> accumulablesToJson(taskInfo.accumulables))
+  }
+
+  private lazy val accumulableBlacklist = Set("internal.metrics.updatedBlockStatuses")
+
+  def accumulablesToJson(accumulables: Traversable[AccumulableInfo]): JArray = {
+    JArray(accumulables
+        .filterNot(_.name.exists(accumulableBlacklist.contains))
+        .toList.map(accumulableInfoToJson))
   }
 
   def accumulableInfoToJson(accumulableInfo: AccumulableInfo): JValue = {
@@ -376,7 +383,7 @@ private[spark] object JsonProtocol {
         ("Message" -> fetchFailed.message)
       case exceptionFailure: ExceptionFailure =>
         val stackTrace = stackTraceToJson(exceptionFailure.stackTrace)
-        val accumUpdates = JArray(exceptionFailure.accumUpdates.map(accumulableInfoToJson).toList)
+        val accumUpdates = accumulablesToJson(exceptionFailure.accumUpdates)
         ("Class Name" -> exceptionFailure.className) ~
         ("Description" -> exceptionFailure.description) ~
         ("Stack Trace" -> stackTrace) ~

