This is an automated email from the ASF dual-hosted git repository.

joshrosen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f9a8ca54e7ad [SPARK-42204][CORE] Add option to disable redundant 
logging of TaskMetrics internal accumulators in event logs
f9a8ca54e7ad is described below

commit f9a8ca54e7adb025aca709169c8d76ff1b2f7cd7
Author: Josh Rosen <[email protected]>
AuthorDate: Fri Sep 6 15:57:50 2024 -0700

    [SPARK-42204][CORE] Add option to disable redundant logging of TaskMetrics 
internal accumulators in event logs
    
    ### What changes were proposed in this pull request?
    
    This PR adds an off-by-default option to JsonProtocol to have it exclude 
certain redundant accumulator information from Spark event logs in order to 
save space and processing time.
    
    Several event logs types contain both TaskMetrics and Accumulables, but 
there is redundancy in how the TaskMetrics data is stored:
    
    - TaskMetrics are stored in a map called "Task Metrics" which maps from 
metric names to metric values.
    - An "Accumulables" field contains information on accumulator updates from 
the task, but this field includes updates from the TaskMetrics internal 
accumulators (both the value from the task, plus a running "sum-so-far" from 
all of the tasks completed up to that point).
    
    The redundant task metrics accumulables are not actually used by the Spark 
History Server: I verified this by reading AppStatusListener and 
SQLAppStatusListener.
    
    I believe that this redundancy was introduced back in 
[SPARK-10620](https://issues.apache.org/jira/browse/SPARK-10620) when Spark 
1.x's separate TaskMetrics implementation was replaced by the current 
accumulator-based version.
    
    In this PR, I add logic to exclude TaskMetrics internal accumulators when 
writing this field (if a new flag is enabled).
    
    The new `spark.eventLog.includeTaskMetricsAccumulators` configuration 
(default `false`, meaning "keep the redundant information") can be set to 
`true` to exclude these redundant internal accumulator updates.
    
    For now, I am merging this off-by-default, but in a followup PR for Spark 
4.0.0 we might consider a change-of-default to `true` (in which case the flag 
would serve as an "escape-hatch" for users who want to restore the old 
behavior. Although I think it's somewhat unlikely that third-party non-Spark 
consumers of the event logs would be relying on this redundant information, 
this is changing a longstanding user-facing data format and thus needs a flag.
    
    ### Why are the changes needed?
    
    This change reduces the size of Spark event logs, especially for logs from 
applications that run many tasks. It should also have slight benefits on event 
log read and write speed (although I haven't tried to quantify this).
    
    ### Does this PR introduce _any_ user-facing change?
    
    No user-facing changes in Spark History Server.
    
    This flag's effects could be considered a user-facing change from the 
perspective of third-party code which does its own direct processing of Spark 
event logs, hence the config. However, in this PR (by itself) the flag is 
off-by-default. Out-of-the-box user-facing changes will be discussed / proposed 
in a separate flag-flip PR.
    
    ### How was this patch tested?
    
    New unit tests in `JsonProtocolSuite`.
    
    Manual tests of event log size in `spark-shell` with a job that runs 
`spark.parallelize(1 to 1000, 1000).count()`. For this toy query, this PR's 
change shrunk the uncompressed event log size by ~15%. The relative size 
reduction will be even greater once other issues like 
https://issues.apache.org/jira/browse/SPARK-42206 or 
https://issues.apache.org/jira/browse/SPARK-42203 are fixed. The relative 
reduction will be smaller for tasks with many SQL metrics because those 
accumulables canno [...]
    
    Closes #39763 from 
JoshRosen/SPARK-42204-remove-redundant-logging-of-taskmetrics-internal-accumulators-in-jsonprotocol.
    
    Authored-by: Josh Rosen <[email protected]>
    Signed-off-by: Josh Rosen <[email protected]>
---
 .../org/apache/spark/internal/config/package.scala |  12 +++
 .../spark/scheduler/EventLoggingListener.scala     |   8 +-
 .../scala/org/apache/spark/util/JsonProtocol.scala | 108 ++++++++++++++++-----
 .../org/apache/spark/util/JsonProtocolSuite.scala  | 104 ++++++++++++++++++--
 4 files changed, 194 insertions(+), 38 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala 
b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 0e19143411e9..47019c04aada 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -271,6 +271,18 @@ package object config {
       .toSequence
       
.createWithDefault(GarbageCollectionMetrics.OLD_GENERATION_BUILTIN_GARBAGE_COLLECTORS)
 
+  private[spark] val EVENT_LOG_INCLUDE_TASK_METRICS_ACCUMULATORS =
+    ConfigBuilder("spark.eventLog.includeTaskMetricsAccumulators")
+      .doc("Whether to include TaskMetrics' underlying accumulator values in 
the event log (as " +
+        "part of the Task/Stage/Job metrics' 'Accumulables' fields. This 
configuration defaults " +
+        "to false because the TaskMetrics values are already logged in the 
'Task Metrics' " +
+        "fields (so the accumulator updates are redundant). This flag exists 
only as a " +
+        "backwards-compatibility escape hatch for applications that might rely 
on the old " +
+        "behavior. See SPARK-42204 for details.")
+      .version("4.0.0")
+      .booleanConf
+      .createWithDefault(false)
+
   private[spark] val EVENT_LOG_OVERWRITE =
     ConfigBuilder("spark.eventLog.overwrite")
       .version("1.0.0")
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala 
b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index efd8fecb974e..1e46142fab25 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -31,7 +31,7 @@ import org.apache.spark.deploy.history.EventLogFileWriter
 import org.apache.spark.executor.ExecutorMetrics
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
-import org.apache.spark.util.{JsonProtocol, Utils}
+import org.apache.spark.util.{JsonProtocol, JsonProtocolOptions, Utils}
 
 /**
  * A SparkListener that logs events to persistent storage.
@@ -74,6 +74,8 @@ private[spark] class EventLoggingListener(
   private val liveStageExecutorMetrics =
     mutable.HashMap.empty[(Int, Int), mutable.HashMap[String, ExecutorMetrics]]
 
+  private[this] val jsonProtocolOptions = new JsonProtocolOptions(sparkConf)
+
   /**
    * Creates the log file in the configured log directory.
    */
@@ -84,7 +86,7 @@ private[spark] class EventLoggingListener(
 
   private def initEventLog(): Unit = {
     val metadata = SparkListenerLogStart(SPARK_VERSION)
-    val eventJson = JsonProtocol.sparkEventToJsonString(metadata)
+    val eventJson = JsonProtocol.sparkEventToJsonString(metadata, 
jsonProtocolOptions)
     logWriter.writeEvent(eventJson, flushLogger = true)
     if (testing && loggedEvents != null) {
       loggedEvents += eventJson
@@ -93,7 +95,7 @@ private[spark] class EventLoggingListener(
 
   /** Log the event as JSON. */
   private def logEvent(event: SparkListenerEvent, flushLogger: Boolean = 
false): Unit = {
-    val eventJson = JsonProtocol.sparkEventToJsonString(event)
+    val eventJson = JsonProtocol.sparkEventToJsonString(event, 
jsonProtocolOptions)
     logWriter.writeEvent(eventJson, flushLogger)
     if (testing) {
       loggedEvents += eventJson
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala 
b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 19cefbc0479a..e30380f41566 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -28,6 +28,7 @@ import org.json4s.jackson.JsonMethods.compact
 
 import org.apache.spark._
 import org.apache.spark.executor._
+import org.apache.spark.internal.config._
 import org.apache.spark.metrics.ExecutorMetricType
 import org.apache.spark.rdd.{DeterministicLevel, RDDOperationScope}
 import org.apache.spark.resource.{ExecutorResourceRequest, 
ResourceInformation, ResourceProfile, TaskResourceRequest}
@@ -37,6 +38,16 @@ import org.apache.spark.storage._
 import org.apache.spark.util.ArrayImplicits._
 import org.apache.spark.util.Utils.weakIntern
 
+/**
+ * Helper class for passing configuration options to JsonProtocol.
+ * We use this instead of passing SparkConf directly because it lets us avoid
+ * repeated re-parsing of configuration values on each read.
+ */
+private[spark] class JsonProtocolOptions(conf: SparkConf) {
+  val includeTaskMetricsAccumulators: Boolean =
+    conf.get(EVENT_LOG_INCLUDE_TASK_METRICS_ACCUMULATORS)
+}
+
 /**
  * Serializes SparkListener events to/from JSON.  This protocol provides 
strong backwards-
  * and forwards-compatibility guarantees: any version of Spark should be able 
to read JSON output
@@ -55,30 +66,41 @@ import org.apache.spark.util.Utils.weakIntern
 private[spark] object JsonProtocol extends JsonUtils {
   // TODO: Remove this file and put JSON serialization into each individual 
class.
 
+  private[util]
+  val defaultOptions: JsonProtocolOptions = new JsonProtocolOptions(new 
SparkConf(false))
+
   /** ------------------------------------------------- *
    * JSON serialization methods for SparkListenerEvents |
    * -------------------------------------------------- */
 
+  // Only for use in tests. Production code should use the two-argument 
overload defined below.
   def sparkEventToJsonString(event: SparkListenerEvent): String = {
+    sparkEventToJsonString(event, defaultOptions)
+  }
+
+  def sparkEventToJsonString(event: SparkListenerEvent, options: 
JsonProtocolOptions): String = {
     toJsonString { generator =>
-      writeSparkEventToJson(event, generator)
+      writeSparkEventToJson(event, generator, options)
     }
   }
 
-  def writeSparkEventToJson(event: SparkListenerEvent, g: JsonGenerator): Unit 
= {
+  def writeSparkEventToJson(
+      event: SparkListenerEvent,
+      g: JsonGenerator,
+      options: JsonProtocolOptions): Unit = {
     event match {
       case stageSubmitted: SparkListenerStageSubmitted =>
-        stageSubmittedToJson(stageSubmitted, g)
+        stageSubmittedToJson(stageSubmitted, g, options)
       case stageCompleted: SparkListenerStageCompleted =>
-        stageCompletedToJson(stageCompleted, g)
+        stageCompletedToJson(stageCompleted, g, options)
       case taskStart: SparkListenerTaskStart =>
-        taskStartToJson(taskStart, g)
+        taskStartToJson(taskStart, g, options)
       case taskGettingResult: SparkListenerTaskGettingResult =>
-        taskGettingResultToJson(taskGettingResult, g)
+        taskGettingResultToJson(taskGettingResult, g, options)
       case taskEnd: SparkListenerTaskEnd =>
-        taskEndToJson(taskEnd, g)
+        taskEndToJson(taskEnd, g, options)
       case jobStart: SparkListenerJobStart =>
-        jobStartToJson(jobStart, g)
+        jobStartToJson(jobStart, g, options)
       case jobEnd: SparkListenerJobEnd =>
         jobEndToJson(jobEnd, g)
       case environmentUpdate: SparkListenerEnvironmentUpdate =>
@@ -112,12 +134,15 @@ private[spark] object JsonProtocol extends JsonUtils {
     }
   }
 
-  def stageSubmittedToJson(stageSubmitted: SparkListenerStageSubmitted, g: 
JsonGenerator): Unit = {
+  def stageSubmittedToJson(
+      stageSubmitted: SparkListenerStageSubmitted,
+      g: JsonGenerator,
+      options: JsonProtocolOptions): Unit = {
     g.writeStartObject()
     g.writeStringField("Event", 
SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.stageSubmitted)
     g.writeFieldName("Stage Info")
     // SPARK-42205: don't log accumulables in start events:
-    stageInfoToJson(stageSubmitted.stageInfo, g, includeAccumulables = false)
+    stageInfoToJson(stageSubmitted.stageInfo, g, options, includeAccumulables 
= false)
     Option(stageSubmitted.properties).foreach { properties =>
       g.writeFieldName("Properties")
       propertiesToJson(properties, g)
@@ -125,38 +150,48 @@ private[spark] object JsonProtocol extends JsonUtils {
     g.writeEndObject()
   }
 
-  def stageCompletedToJson(stageCompleted: SparkListenerStageCompleted, g: 
JsonGenerator): Unit = {
+  def stageCompletedToJson(
+      stageCompleted: SparkListenerStageCompleted,
+      g: JsonGenerator,
+      options: JsonProtocolOptions): Unit = {
     g.writeStartObject()
     g.writeStringField("Event", 
SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.stageCompleted)
     g.writeFieldName("Stage Info")
-    stageInfoToJson(stageCompleted.stageInfo, g, includeAccumulables = true)
+    stageInfoToJson(stageCompleted.stageInfo, g, options, includeAccumulables 
= true)
     g.writeEndObject()
   }
 
-  def taskStartToJson(taskStart: SparkListenerTaskStart, g: JsonGenerator): 
Unit = {
+  def taskStartToJson(
+      taskStart: SparkListenerTaskStart,
+      g: JsonGenerator,
+      options: JsonProtocolOptions): Unit = {
     g.writeStartObject()
     g.writeStringField("Event", 
SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.taskStart)
     g.writeNumberField("Stage ID", taskStart.stageId)
     g.writeNumberField("Stage Attempt ID", taskStart.stageAttemptId)
     g.writeFieldName("Task Info")
     // SPARK-42205: don't log accumulables in start events:
-    taskInfoToJson(taskStart.taskInfo, g, includeAccumulables = false)
+    taskInfoToJson(taskStart.taskInfo, g, options, includeAccumulables = false)
     g.writeEndObject()
   }
 
   def taskGettingResultToJson(
       taskGettingResult: SparkListenerTaskGettingResult,
-      g: JsonGenerator): Unit = {
+      g: JsonGenerator,
+      options: JsonProtocolOptions): Unit = {
     val taskInfo = taskGettingResult.taskInfo
     g.writeStartObject()
     g.writeStringField("Event", 
SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.taskGettingResult)
     g.writeFieldName("Task Info")
     // SPARK-42205: don't log accumulables in "task getting result" events:
-    taskInfoToJson(taskInfo, g, includeAccumulables = false)
+    taskInfoToJson(taskInfo, g, options, includeAccumulables = false)
     g.writeEndObject()
   }
 
-  def taskEndToJson(taskEnd: SparkListenerTaskEnd, g: JsonGenerator): Unit = {
+  def taskEndToJson(
+      taskEnd: SparkListenerTaskEnd,
+      g: JsonGenerator,
+      options: JsonProtocolOptions): Unit = {
     g.writeStartObject()
     g.writeStringField("Event", 
SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.taskEnd)
     g.writeNumberField("Stage ID", taskEnd.stageId)
@@ -165,7 +200,7 @@ private[spark] object JsonProtocol extends JsonUtils {
     g.writeFieldName("Task End Reason")
     taskEndReasonToJson(taskEnd.reason, g)
     g.writeFieldName("Task Info")
-    taskInfoToJson(taskEnd.taskInfo, g, includeAccumulables = true)
+    taskInfoToJson(taskEnd.taskInfo, g, options, includeAccumulables = true)
     g.writeFieldName("Task Executor Metrics")
     executorMetricsToJson(taskEnd.taskExecutorMetrics, g)
     Option(taskEnd.taskMetrics).foreach { m =>
@@ -175,7 +210,10 @@ private[spark] object JsonProtocol extends JsonUtils {
     g.writeEndObject()
   }
 
-  def jobStartToJson(jobStart: SparkListenerJobStart, g: JsonGenerator): Unit 
= {
+  def jobStartToJson(
+      jobStart: SparkListenerJobStart,
+      g: JsonGenerator,
+      options: JsonProtocolOptions): Unit = {
     g.writeStartObject()
     g.writeStringField("Event", 
SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.jobStart)
     g.writeNumberField("Job ID", jobStart.jobId)
@@ -186,7 +224,7 @@ private[spark] object JsonProtocol extends JsonUtils {
     // the job was submitted: it is technically possible for a stage to belong 
to multiple
     // concurrent jobs, so this situation can arise even without races 
occurring between
     // event logging and stage completion.
-    jobStart.stageInfos.foreach(stageInfoToJson(_, g, includeAccumulables = 
true))
+    jobStart.stageInfos.foreach(stageInfoToJson(_, g, options, 
includeAccumulables = true))
     g.writeEndArray()
     g.writeArrayFieldStart("Stage IDs")
     jobStart.stageIds.foreach(g.writeNumber)
@@ -386,6 +424,7 @@ private[spark] object JsonProtocol extends JsonUtils {
   def stageInfoToJson(
       stageInfo: StageInfo,
       g: JsonGenerator,
+      options: JsonProtocolOptions,
       includeAccumulables: Boolean): Unit = {
     g.writeStartObject()
     g.writeNumberField("Stage ID", stageInfo.stageId)
@@ -404,7 +443,10 @@ private[spark] object JsonProtocol extends JsonUtils {
     stageInfo.failureReason.foreach(g.writeStringField("Failure Reason", _))
     g.writeFieldName("Accumulables")
     if (includeAccumulables) {
-      accumulablesToJson(stageInfo.accumulables.values, g)
+      accumulablesToJson(
+        stageInfo.accumulables.values,
+        g,
+        includeTaskMetricsAccumulators = 
options.includeTaskMetricsAccumulators)
     } else {
       g.writeStartArray()
       g.writeEndArray()
@@ -418,6 +460,7 @@ private[spark] object JsonProtocol extends JsonUtils {
   def taskInfoToJson(
       taskInfo: TaskInfo,
       g: JsonGenerator,
+      options: JsonProtocolOptions,
       includeAccumulables: Boolean): Unit = {
     g.writeStartObject()
     g.writeNumberField("Task ID", taskInfo.taskId)
@@ -435,7 +478,10 @@ private[spark] object JsonProtocol extends JsonUtils {
     g.writeBooleanField("Killed", taskInfo.killed)
     g.writeFieldName("Accumulables")
     if (includeAccumulables) {
-      accumulablesToJson(taskInfo.accumulables, g)
+      accumulablesToJson(
+        taskInfo.accumulables,
+        g,
+        includeTaskMetricsAccumulators = 
options.includeTaskMetricsAccumulators)
     } else {
       g.writeStartArray()
       g.writeEndArray()
@@ -443,13 +489,23 @@ private[spark] object JsonProtocol extends JsonUtils {
     g.writeEndObject()
   }
 
-  private lazy val accumulableExcludeList = 
Set("internal.metrics.updatedBlockStatuses")
+  private[util] val accumulableExcludeList = 
Set(InternalAccumulator.UPDATED_BLOCK_STATUSES)
+
+  private[this] val taskMetricAccumulableNames = 
TaskMetrics.empty.nameToAccums.keySet.toSet
 
-  def accumulablesToJson(accumulables: Iterable[AccumulableInfo], g: 
JsonGenerator): Unit = {
+  def accumulablesToJson(
+      accumulables: Iterable[AccumulableInfo],
+      g: JsonGenerator,
+    includeTaskMetricsAccumulators: Boolean = true): Unit = {
     g.writeStartArray()
     accumulables
-        .filterNot(_.name.exists(accumulableExcludeList.contains))
-        .toList.sortBy(_.id).foreach(a => accumulableInfoToJson(a, g))
+        .filterNot { acc =>
+          acc.name.exists(accumulableExcludeList.contains) ||
+          (!includeTaskMetricsAccumulators && 
acc.name.exists(taskMetricAccumulableNames.contains))
+        }
+        .toList
+        .sortBy(_.id)
+        .foreach(a => accumulableInfoToJson(a, g))
     g.writeEndArray()
   }
 
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala 
b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index cdee6ccda706..30c9693e6dee 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -32,6 +32,7 @@ import org.scalatest.exceptions.TestFailedException
 
 import org.apache.spark._
 import org.apache.spark.executor._
+import org.apache.spark.internal.config._
 import org.apache.spark.metrics.ExecutorMetricType
 import org.apache.spark.rdd.{DeterministicLevel, RDDOperationScope}
 import org.apache.spark.resource._
@@ -276,7 +277,8 @@ class JsonProtocolSuite extends SparkFunSuite {
 
   test("StageInfo backward compatibility (details, accumulables)") {
     val info = makeStageInfo(1, 2, 3, 4L, 5L)
-    val newJson = toJsonString(JsonProtocol.stageInfoToJson(info, _, 
includeAccumulables = true))
+    val newJson = toJsonString(
+      JsonProtocol.stageInfoToJson(info, _, defaultOptions, 
includeAccumulables = true))
 
     // Fields added after 1.0.0.
     assert(info.details.nonEmpty)
@@ -294,7 +296,8 @@ class JsonProtocolSuite extends SparkFunSuite {
 
   test("StageInfo resourceProfileId") {
     val info = makeStageInfo(1, 2, 3, 4L, 5L, 5)
-    val json = toJsonString(JsonProtocol.stageInfoToJson(info, _, 
includeAccumulables = true))
+    val json = toJsonString(
+      JsonProtocol.stageInfoToJson(info, _, defaultOptions, 
includeAccumulables = true))
 
     // Fields added after 1.0.0.
     assert(info.details.nonEmpty)
@@ -471,7 +474,7 @@ class JsonProtocolSuite extends SparkFunSuite {
       stageIds.map(id => new StageInfo(id, 0, "unknown", 0, Seq.empty, 
Seq.empty, "unknown",
         resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID))
     val jobStart = SparkListenerJobStart(10, jobSubmissionTime, stageInfos, 
properties)
-    val oldEvent = toJsonString(JsonProtocol.jobStartToJson(jobStart, 
_)).removeField("Stage Infos")
+    val oldEvent = sparkEventToJsonString(jobStart).removeField("Stage Infos")
     val expectedJobStart =
       SparkListenerJobStart(10, jobSubmissionTime, dummyStageInfos, properties)
     assertEquals(expectedJobStart, JsonProtocol.jobStartFromJson(oldEvent))
@@ -483,8 +486,7 @@ class JsonProtocolSuite extends SparkFunSuite {
     val stageIds = Seq[Int](1, 2, 3, 4)
     val stageInfos = stageIds.map(x => makeStageInfo(x * 10, x * 20, x * 30, x 
* 40L, x * 50L))
     val jobStart = SparkListenerJobStart(11, jobSubmissionTime, stageInfos, 
properties)
-    val oldStartEvent = toJsonString(JsonProtocol.jobStartToJson(jobStart, _))
-      .removeField("Submission Time")
+    val oldStartEvent = 
sparkEventToJsonString(jobStart).removeField("Submission Time")
     val expectedJobStart = SparkListenerJobStart(11, -1, stageInfos, 
properties)
     assertEquals(expectedJobStart, 
JsonProtocol.jobStartFromJson(oldStartEvent))
 
@@ -519,8 +521,9 @@ class JsonProtocolSuite extends SparkFunSuite {
     val stageInfo = new StageInfo(1, 1, "me-stage", 1, Seq.empty, Seq(1, 2, 
3), "details",
       resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
     val oldStageInfo =
-      toJsonString(JsonProtocol.stageInfoToJson(stageInfo, _, 
includeAccumulables = true))
-        .removeField("Parent IDs")
+      toJsonString(
+        JsonProtocol.stageInfoToJson(stageInfo, _, defaultOptions, 
includeAccumulables = true)
+      ).removeField("Parent IDs")
     val expectedStageInfo = new StageInfo(1, 1, "me-stage", 1, Seq.empty, 
Seq.empty, "details",
       resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
     assertEquals(expectedStageInfo, 
JsonProtocol.stageInfoFromJson(oldStageInfo))
@@ -785,6 +788,87 @@ class JsonProtocolSuite extends SparkFunSuite {
     assert(JsonProtocol.sparkEventFromJson(unknownFieldsJson) === expected)
   }
 
+  test("SPARK-42204: spark.eventLog.includeTaskMetricsAccumulators config") {
+    val includeConf = new JsonProtocolOptions(
+      new SparkConf().set(EVENT_LOG_INCLUDE_TASK_METRICS_ACCUMULATORS, true))
+    val excludeConf = new JsonProtocolOptions(
+      new SparkConf().set(EVENT_LOG_INCLUDE_TASK_METRICS_ACCUMULATORS, false))
+
+    val taskMetricsAccumulables = TaskMetrics
+      .empty
+      .nameToAccums
+      .view
+      .filterKeys(!JsonProtocol.accumulableExcludeList.contains(_))
+      .values
+      .map(_.toInfo(Some(1), None))
+      .toSeq
+
+    val taskInfoWithTaskMetricsAccums = makeTaskInfo(222L, 333, 1, 333, 444L, 
false)
+    taskInfoWithTaskMetricsAccums.setAccumulables(taskMetricsAccumulables)
+    val taskInfoWithoutTaskMetricsAccums = makeTaskInfo(222L, 333, 1, 333, 
444L, false)
+    taskInfoWithoutTaskMetricsAccums.setAccumulables(Seq.empty)
+
+    val stageInfoWithTaskMetricsAccums = makeStageInfo(100, 200, 300, 400L, 
500L)
+    stageInfoWithTaskMetricsAccums.accumulables.clear()
+    stageInfoWithTaskMetricsAccums.accumulables ++= 
taskMetricsAccumulables.map(x => (x.id, x))
+    val stageInfoWithoutTaskMetricsAccums = makeStageInfo(100, 200, 300, 400L, 
500L)
+    stageInfoWithoutTaskMetricsAccums.accumulables.clear()
+
+    // Test events which should be impacted by the config.
+
+    // TaskEnd
+    {
+      val originalEvent = SparkListenerTaskEnd(1, 0, "ShuffleMapTask", Success,
+        taskInfoWithTaskMetricsAccums,
+        new ExecutorMetrics(Array(12L, 23L, 45L, 67L, 78L, 89L,
+          90L, 123L, 456L, 789L, 40L, 20L, 20L, 10L, 20L, 10L, 301L)),
+        makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, 0,
+          hasHadoopInput = false, hasOutput = false))
+      assertEquals(
+        originalEvent,
+        sparkEventFromJson(sparkEventToJsonString(originalEvent, includeConf)))
+      val trimmedEvent = originalEvent.copy(taskInfo = 
taskInfoWithoutTaskMetricsAccums)
+      assertEquals(
+        trimmedEvent,
+        sparkEventFromJson(sparkEventToJsonString(originalEvent, excludeConf)))
+    }
+
+    // StageCompleted
+    {
+      val originalEvent = 
SparkListenerStageCompleted(stageInfoWithTaskMetricsAccums)
+      assertEquals(
+        originalEvent,
+        sparkEventFromJson(sparkEventToJsonString(originalEvent, includeConf)))
+      val trimmedEvent = originalEvent.copy(stageInfo = 
stageInfoWithoutTaskMetricsAccums)
+      assertEquals(
+        trimmedEvent,
+        sparkEventFromJson(sparkEventToJsonString(originalEvent, excludeConf)))
+    }
+
+    // JobStart
+    {
+      val originalEvent =
+        SparkListenerJobStart(1, 1, Seq(stageInfoWithTaskMetricsAccums), 
properties)
+      assertEquals(
+        originalEvent,
+        sparkEventFromJson(sparkEventToJsonString(originalEvent, includeConf)))
+      val trimmedEvent = originalEvent.copy(stageInfos = 
Seq(stageInfoWithoutTaskMetricsAccums))
+      assertEquals(
+        trimmedEvent,
+        sparkEventFromJson(sparkEventToJsonString(originalEvent, excludeConf)))
+    }
+
+    // ExecutorMetricsUpdate events should be unaffected by the config:
+    val executorMetricsUpdate =
+      SparkListenerExecutorMetricsUpdate("0", Seq((0, 0, 0, 
taskMetricsAccumulables)))
+    assert(
+      sparkEventToJsonString(executorMetricsUpdate, includeConf) ===
+      sparkEventToJsonString(executorMetricsUpdate, excludeConf))
+    assertEquals(
+      
JsonProtocol.sparkEventFromJson(sparkEventToJsonString(executorMetricsUpdate, 
includeConf)),
+      executorMetricsUpdate)
+    }
+
   test("SPARK-42403: properly handle null string values") {
     // Null string values can appear in a few different event types,
     // so we test multiple known cases here:
@@ -966,7 +1050,8 @@ private[spark] object JsonProtocolSuite extends Assertions 
{
 
   private def testStageInfo(info: StageInfo): Unit = {
     val newInfo = JsonProtocol.stageInfoFromJson(
-      toJsonString(JsonProtocol.stageInfoToJson(info, _, includeAccumulables = 
true)))
+      toJsonString(
+        JsonProtocol.stageInfoToJson(info, _, defaultOptions, 
includeAccumulables = true)))
     assertEquals(info, newInfo)
   }
 
@@ -990,7 +1075,8 @@ private[spark] object JsonProtocolSuite extends Assertions 
{
 
   private def testTaskInfo(info: TaskInfo): Unit = {
     val newInfo = JsonProtocol.taskInfoFromJson(
-      toJsonString(JsonProtocol.taskInfoToJson(info, _, includeAccumulables = 
true)))
+      toJsonString(
+        JsonProtocol.taskInfoToJson(info, _, defaultOptions, 
includeAccumulables = true)))
     assertEquals(info, newInfo)
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to