This is an automated email from the ASF dual-hosted git repository.

joshrosen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new bb59b489204 [SPARK-42205][CORE] Don't log accumulator values in stage / task start and getting result event logs
bb59b489204 is described below

commit bb59b4892042bd4ad0c18f2a7fb2380c292d9e3e
Author: Josh Rosen <[email protected]>
AuthorDate: Fri Sep 29 11:27:51 2023 -0700

    [SPARK-42205][CORE] Don't log accumulator values in stage / task start and getting result event logs
    
    ### What changes were proposed in this pull request?
    
    This PR modifies JsonProtocol in order to skip logging of accumulator values in the logs for SparkListenerTaskStart, SparkListenerStageSubmitted, and SparkListenerTaskGettingResult events.
    
    These events contain mutable TaskInfo and StageInfo objects, which in turn contain Accumulables fields. When a task or stage is submitted, Accumulables is initially empty. When the task or stage finishes, this field is updated with values from the finished task or stage.
    
    If a task or stage finishes _before_ the start event has been logged by the event logging listener, then the start event will contain the Accumulable values from the task or stage end event.
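    
    As a minimal illustration of that mutability (a sketch only: it mirrors what JsonProtocolSuite does and relies on private[spark] constructors, so it only compiles inside Spark's own sources; the concrete values are made up):
    
    ```scala
    import org.apache.spark.resource.ResourceProfile
    import org.apache.spark.scheduler.{AccumulableInfo, StageInfo}
    
    val stageInfo = new StageInfo(1, 1, "me-stage", 1, Seq.empty, Seq(1, 2, 3), "details",
      resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
    // At submission time the Accumulables map is empty...
    assert(stageInfo.accumulables.isEmpty)
    
    // ...but when the stage completes, the same StageInfo object is mutated in place:
    val acc = new AccumulableInfo(1L, Some("Accumulable1"), Some("delta1"), Some("val1"),
      internal = false, countFailedValues = false)
    stageInfo.accumulables(acc.id) = acc
    
    // An event-logging listener that serializes the SparkListenerStageSubmitted event
    // only after this point would (without this change) also log the completed-stage values.
    ```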
    
    This PR updates JsonProtocol to log an empty Accumulables value for stage start, task start, and task getting-result events.
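    
    Concretely, the new includeAccumulables flag on the StageInfo / TaskInfo serializers controls this. Continuing the sketch above (again Spark-internal code, since JsonProtocol is private[spark]):
    
    ```scala
    import org.apache.spark.util.JsonProtocol
    import org.apache.spark.util.JsonProtocol.toJsonString
    
    // Start and "getting result" events: "Accumulables" is now written as an empty JSON array.
    val startJson =
      toJsonString(JsonProtocol.stageInfoToJson(stageInfo, _, includeAccumulables = false))
    
    // End events are unchanged and still carry the full accumulator values.
    val endJson =
      toJsonString(JsonProtocol.stageInfoToJson(stageInfo, _, includeAccumulables = true))
    ```
    
    In the start-event JSON this shows up as an empty `"Accumulables": []` array, matching the updated expected output in JsonProtocolSuite.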
    
    I considered and rejected an alternative approach in which the listener event itself would contain an immutable snapshot of the TaskInfo or StageInfo, as that would increase memory pressure on the driver during periods of heavy event logging.
    
    The accumulable values in the start events are not used: I confirmed this by checking the AppStatusListener and SQLAppStatusListener code.
    
    I have deliberately chosen to **not** drop the field for _job_ start events because it is technically possible (but rare) for a job to reference stages that are already completed at the time that the job is submitted (a stage can technically belong to multiple jobs), and in that case it seems consistent to have the StageInfo accurately reflect all of the information about the already-completed stage.
    
    ### Why are the changes needed?
    
    This information isn't used by the History Server and contributes to wasteful bloat in event log sizes. In one real-world log, I found that ~10% of the uncompressed log size was due to these redundant Accumulable fields in stage and task start events.
    
    I don't think that we need to worry about backwards-compatibility here because the old behavior was non-deterministic: whether or not a start event log contained accumulator updates was a function of the relative speed of task completion and the processing rate of the event logging listener; it seems unlikely that any third-party event log consumers would be relying on such an inconsistently present value when they could instead rely on the values in the corresponding end events.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    New and updated tests in JsonProtocolSuite.
    
    Closes #39767 from JoshRosen/SPARK-42205-dont-log-accumulables-in-jsonprotocol-start-events.
    
    Authored-by: Josh Rosen <[email protected]>
    Signed-off-by: Josh Rosen <[email protected]>
---
 .../scala/org/apache/spark/util/JsonProtocol.scala |  44 +++--
 .../org/apache/spark/util/JsonProtocolSuite.scala  | 183 +++++++++------------
 2 files changed, 109 insertions(+), 118 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 8654b658809..6525bd321e6 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -115,7 +115,8 @@ private[spark] object JsonProtocol extends JsonUtils {
     g.writeStartObject()
     g.writeStringField("Event", 
SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.stageSubmitted)
     g.writeFieldName("Stage Info")
-    stageInfoToJson(stageSubmitted.stageInfo, g)
+    // SPARK-42205: don't log accumulables in start events:
+    stageInfoToJson(stageSubmitted.stageInfo, g, includeAccumulables = false)
     Option(stageSubmitted.properties).foreach { properties =>
       g.writeFieldName("Properties")
       propertiesToJson(properties, g)
@@ -127,7 +128,7 @@ private[spark] object JsonProtocol extends JsonUtils {
     g.writeStartObject()
     g.writeStringField("Event", 
SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.stageCompleted)
     g.writeFieldName("Stage Info")
-    stageInfoToJson(stageCompleted.stageInfo, g)
+    stageInfoToJson(stageCompleted.stageInfo, g, includeAccumulables = true)
     g.writeEndObject()
   }
 
@@ -137,7 +138,8 @@ private[spark] object JsonProtocol extends JsonUtils {
     g.writeNumberField("Stage ID", taskStart.stageId)
     g.writeNumberField("Stage Attempt ID", taskStart.stageAttemptId)
     g.writeFieldName("Task Info")
-    taskInfoToJson(taskStart.taskInfo, g)
+    // SPARK-42205: don't log accumulables in start events:
+    taskInfoToJson(taskStart.taskInfo, g, includeAccumulables = false)
     g.writeEndObject()
   }
 
@@ -148,7 +150,8 @@ private[spark] object JsonProtocol extends JsonUtils {
     g.writeStartObject()
     g.writeStringField("Event", 
SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.taskGettingResult)
     g.writeFieldName("Task Info")
-    taskInfoToJson(taskInfo, g)
+    // SPARK-42205: don't log accumulables in "task getting result" events:
+    taskInfoToJson(taskInfo, g, includeAccumulables = false)
     g.writeEndObject()
   }
 
@@ -161,7 +164,7 @@ private[spark] object JsonProtocol extends JsonUtils {
     g.writeFieldName("Task End Reason")
     taskEndReasonToJson(taskEnd.reason, g)
     g.writeFieldName("Task Info")
-    taskInfoToJson(taskEnd.taskInfo, g)
+    taskInfoToJson(taskEnd.taskInfo, g, includeAccumulables = true)
     g.writeFieldName("Task Executor Metrics")
     executorMetricsToJson(taskEnd.taskExecutorMetrics, g)
     Option(taskEnd.taskMetrics).foreach { m =>
@@ -177,7 +180,12 @@ private[spark] object JsonProtocol extends JsonUtils {
     g.writeNumberField("Job ID", jobStart.jobId)
     g.writeNumberField("Submission Time", jobStart.time)
     g.writeArrayFieldStart("Stage Infos")  // Added in Spark 1.2.0
-    jobStart.stageInfos.foreach(stageInfoToJson(_, g))
+    // SPARK-42205: here, we purposely include accumulables so that we accurately log all
+    // available information about stages that may have already completed by the time
+    // the job was submitted: it is technically possible for a stage to belong to multiple
+    // concurrent jobs, so this situation can arise even without races occurring between
+    // event logging and stage completion.
+    jobStart.stageInfos.foreach(stageInfoToJson(_, g, includeAccumulables = true))
     g.writeEndArray()
     g.writeArrayFieldStart("Stage IDs")
     jobStart.stageIds.foreach(g.writeNumber)
@@ -373,7 +381,10 @@ private[spark] object JsonProtocol extends JsonUtils {
    * JSON serialization methods for classes SparkListenerEvents depend on |
    * -------------------------------------------------------------------- */
 
-  def stageInfoToJson(stageInfo: StageInfo, g: JsonGenerator): Unit = {
+  def stageInfoToJson(
+      stageInfo: StageInfo,
+      g: JsonGenerator,
+      includeAccumulables: Boolean): Unit = {
     g.writeStartObject()
     g.writeNumberField("Stage ID", stageInfo.stageId)
     g.writeNumberField("Stage Attempt ID", stageInfo.attemptNumber)
@@ -390,14 +401,22 @@ private[spark] object JsonProtocol extends JsonUtils {
     stageInfo.completionTime.foreach(g.writeNumberField("Completion Time", _))
     stageInfo.failureReason.foreach(g.writeStringField("Failure Reason", _))
     g.writeFieldName("Accumulables")
-    accumulablesToJson(stageInfo.accumulables.values, g)
+    if (includeAccumulables) {
+      accumulablesToJson(stageInfo.accumulables.values, g)
+    } else {
+      g.writeStartArray()
+      g.writeEndArray()
+    }
     g.writeNumberField("Resource Profile Id", stageInfo.resourceProfileId)
     g.writeBooleanField("Shuffle Push Enabled", stageInfo.isShufflePushEnabled)
     g.writeNumberField("Shuffle Push Mergers Count", 
stageInfo.shuffleMergerCount)
     g.writeEndObject()
   }
 
-  def taskInfoToJson(taskInfo: TaskInfo, g: JsonGenerator): Unit = {
+  def taskInfoToJson(
+      taskInfo: TaskInfo,
+      g: JsonGenerator,
+      includeAccumulables: Boolean): Unit = {
     g.writeStartObject()
     g.writeNumberField("Task ID", taskInfo.taskId)
     g.writeNumberField("Index", taskInfo.index)
@@ -413,7 +432,12 @@ private[spark] object JsonProtocol extends JsonUtils {
     g.writeBooleanField("Failed", taskInfo.failed)
     g.writeBooleanField("Killed", taskInfo.killed)
     g.writeFieldName("Accumulables")
-    accumulablesToJson(taskInfo.accumulables, g)
+    if (includeAccumulables) {
+      accumulablesToJson(taskInfo.accumulables, g)
+    } else {
+      g.writeStartArray()
+      g.writeEndArray()
+    }
     g.writeEndObject()
   }
 
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 1a8170bf0f6..5a6c332ec02 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -42,18 +42,24 @@ import org.apache.spark.shuffle.MetadataFetchFailedException
 import org.apache.spark.storage._
 
 class JsonProtocolSuite extends SparkFunSuite {
-  import JsonProtocol.toJsonString
+  import JsonProtocol._
   import JsonProtocolSuite._
 
   test("SparkListenerEvent") {
     val stageSubmitted =
-      SparkListenerStageSubmitted(makeStageInfo(100, 200, 300, 400L, 500L), properties)
+      SparkListenerStageSubmitted(
+        makeStageInfo(100, 200, 300, 400L, 500L, includeAccumulables = false), properties)
     val stageSubmittedWithNullProperties =
-      SparkListenerStageSubmitted(makeStageInfo(100, 200, 300, 400L, 500L), properties = null)
+      SparkListenerStageSubmitted(
+        makeStageInfo(100, 200, 300, 400L, 500L, includeAccumulables = false), properties = null)
     val stageCompleted = SparkListenerStageCompleted(makeStageInfo(101, 201, 301, 401L, 501L))
-    val taskStart = SparkListenerTaskStart(111, 0, makeTaskInfo(222L, 333, 1, 333, 444L, false))
-    val taskGettingResult =
-      SparkListenerTaskGettingResult(makeTaskInfo(1000L, 2000, 5, 2000, 3000L, true))
+    val taskStart =
+      SparkListenerTaskStart(
+        111,
+        0,
+        makeTaskInfo(222L, 333, 1, 333, 444L, speculative = false, includeAccumulables = false))
+    val taskGettingResult = SparkListenerTaskGettingResult(
+        makeTaskInfo(1000L, 2000, 5, 2000, 3000L, speculative = true, includeAccumulables = false))
     val taskEnd = SparkListenerTaskEnd(1, 0, "ShuffleMapTask", Success,
       makeTaskInfo(123L, 234, 67, 234, 345L, false),
       new ExecutorMetrics(Array(543L, 123456L, 12345L, 1234L, 123L, 12L, 432L,
@@ -270,7 +276,7 @@ class JsonProtocolSuite extends SparkFunSuite {
 
   test("StageInfo backward compatibility (details, accumulables)") {
     val info = makeStageInfo(1, 2, 3, 4L, 5L)
-    val newJson = toJsonString(JsonProtocol.stageInfoToJson(info, _))
+    val newJson = toJsonString(JsonProtocol.stageInfoToJson(info, _, includeAccumulables = true))
 
     // Fields added after 1.0.0.
     assert(info.details.nonEmpty)
@@ -288,7 +294,7 @@ class JsonProtocolSuite extends SparkFunSuite {
 
   test("StageInfo resourceProfileId") {
     val info = makeStageInfo(1, 2, 3, 4L, 5L, 5)
-    val json = toJsonString(JsonProtocol.stageInfoToJson(info, _))
+    val json = toJsonString(JsonProtocol.stageInfoToJson(info, _, includeAccumulables = true))
 
     // Fields added after 1.0.0.
     assert(info.details.nonEmpty)
@@ -512,8 +518,9 @@ class JsonProtocolSuite extends SparkFunSuite {
     // Prior to Spark 1.4.0, StageInfo did not have the "Parent IDs" property
     val stageInfo = new StageInfo(1, 1, "me-stage", 1, Seq.empty, Seq(1, 2, 3), "details",
       resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
-    val oldStageInfo = toJsonString(JsonProtocol.stageInfoToJson(stageInfo, _))
-      .removeField("Parent IDs")
+    val oldStageInfo =
+      toJsonString(JsonProtocol.stageInfoToJson(stageInfo, _, includeAccumulables = true))
+        .removeField("Parent IDs")
     val expectedStageInfo = new StageInfo(1, 1, "me-stage", 1, Seq.empty, Seq.empty, "details",
       resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
     assertEquals(expectedStageInfo, JsonProtocol.stageInfoFromJson(oldStageInfo))
@@ -874,12 +881,42 @@ class JsonProtocolSuite extends SparkFunSuite {
     val jobFailedEvent = JsonProtocol.sparkEventFromJson(exJobFailureNoStackJson)
     testEvent(jobFailedEvent, exJobFailureExpectedJson)
   }
+
+  test("SPARK-42205: don't log accumulables in start and getting result 
events") {
+    // Simulate case where a job / stage / task completes before the event 
logging
+    // listener logs the event. In this case, the TaskInfo / StageInfo will 
have
+    // accumulables from the finished task / stage, but we want to skip logging
+    // them because they are redundant with the accumulables in the end event 
and
+    // the history server only uses the value from the end event because start
+    // events normally will not contain accumulable values.
+    val stageInfo = makeStageInfo(1, 200, 300, 400, 500)
+    assert(stageInfo.accumulables.nonEmpty)
+    val taskInfo = makeTaskInfo(1, 200, 300, 400, 500, false)
+    assert(taskInfo.accumulables.nonEmpty)
+
+    val stageSubmitted = SparkListenerStageSubmitted(stageInfo)
+    val taskStart = SparkListenerTaskStart(1, 0, taskInfo)
+    val jobStart = SparkListenerJobStart(10, jobSubmissionTime, 
Seq(stageInfo), properties)
+    val gettingResult = SparkListenerTaskGettingResult(taskInfo)
+
+    assert(
+      
stageSubmittedFromJson(sparkEventToJsonString(stageSubmitted)).stageInfo.accumulables.isEmpty)
+    assert(
+      
taskStartFromJson(sparkEventToJsonString(taskStart)).taskInfo.accumulables.isEmpty)
+    assert(
+      taskGettingResultFromJson(sparkEventToJsonString(gettingResult))
+        .taskInfo.accumulables.isEmpty)
+
+    // Deliberately not fixed for job starts because a job might legitimately 
reference
+    // stages that have completed even before the job start event is emitted.
+    testEvent(jobStart, sparkEventToJsonString(jobStart))
+  }
 }
 
 
 private[spark] object JsonProtocolSuite extends Assertions {
   import InternalAccumulator._
-  import JsonProtocol.toJsonString
+  import JsonProtocol._
 
   private val mapper = new ObjectMapper()
 
@@ -929,7 +966,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
 
   private def testStageInfo(info: StageInfo): Unit = {
     val newInfo = JsonProtocol.stageInfoFromJson(
-      toJsonString(JsonProtocol.stageInfoToJson(info, _)))
+      toJsonString(JsonProtocol.stageInfoToJson(info, _, includeAccumulables = true)))
     assertEquals(info, newInfo)
   }
 
@@ -953,7 +990,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
 
   private def testTaskInfo(info: TaskInfo): Unit = {
     val newInfo = JsonProtocol.taskInfoFromJson(
-      toJsonString(JsonProtocol.taskInfoToJson(info, _)))
+      toJsonString(JsonProtocol.taskInfoToJson(info, _, includeAccumulables = true)))
     assertEquals(info, newInfo)
   }
 
@@ -1326,7 +1363,8 @@ private[spark] object JsonProtocolSuite extends Assertions {
       c: Int,
       d: Long,
       e: Long,
-      rpId: Int = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID) = {
+      rpId: Int = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID,
+      includeAccumulables: Boolean = true) = {
     val rddInfos = (0 until a % 5).map { i =>
       if (i == (a % 5) - 1) {
         makeRddInfo(a + i, b + i, c + i, d + i, e + i, DeterministicLevel.INDETERMINATE)
@@ -1336,17 +1374,30 @@ private[spark] object JsonProtocolSuite extends Assertions {
     }
     val stageInfo = new StageInfo(a, 0, "greetings", b, rddInfos, Seq(100, 200, 300), "details",
       resourceProfileId = rpId)
-    val (acc1, acc2) = (makeAccumulableInfo(1), makeAccumulableInfo(2))
-    stageInfo.accumulables(acc1.id) = acc1
-    stageInfo.accumulables(acc2.id) = acc2
+    if (includeAccumulables) {
+      val (acc1, acc2) = (makeAccumulableInfo(1), makeAccumulableInfo(2))
+      stageInfo.accumulables(acc1.id) = acc1
+      stageInfo.accumulables(acc2.id) = acc2
+    }
     stageInfo
   }
 
-  private def makeTaskInfo(a: Long, b: Int, c: Int, d: Int, e: Long, speculative: Boolean) = {
+  private def makeTaskInfo(
+      a: Long,
+      b: Int,
+      c: Int,
+      d: Int,
+      e: Long,
+      speculative: Boolean,
+      includeAccumulables: Boolean = true) = {
     val taskInfo = new TaskInfo(a, b, c, d, e,
       "executor", "your kind sir", TaskLocality.NODE_LOCAL, speculative)
-    taskInfo.setAccumulables(
-      List(makeAccumulableInfo(1), makeAccumulableInfo(2), makeAccumulableInfo(3, internal = true)))
+    if (includeAccumulables) {
+      taskInfo.setAccumulables(List(
+        makeAccumulableInfo(1),
+        makeAccumulableInfo(2),
+        makeAccumulableInfo(3, internal = true)))
+    }
     taskInfo
   }
 
@@ -1459,24 +1510,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
       |    "RDD Info": [],
       |    "Parent IDs" : [100, 200, 300],
       |    "Details": "details",
-      |    "Accumulables": [
-      |      {
-      |        "ID": 1,
-      |        "Name": "Accumulable1",
-      |        "Update": "delta1",
-      |        "Value": "val1",
-      |        "Internal": false,
-      |        "Count Failed Values": false
-      |      },
-      |      {
-      |        "ID": 2,
-      |        "Name": "Accumulable2",
-      |        "Update": "delta2",
-      |        "Value": "val2",
-      |        "Internal": false,
-      |        "Count Failed Values": false
-      |      }
-      |    ],
+      |    "Accumulables": [],
       |    "Resource Profile Id" : 0,
       |    "Shuffle Push Enabled" : false,
       |    "Shuffle Push Mergers Count" : 0
@@ -1502,24 +1536,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
       |    "RDD Info": [],
       |    "Parent IDs" : [100, 200, 300],
       |    "Details": "details",
-      |    "Accumulables": [
-      |      {
-      |        "ID": 1,
-      |        "Name": "Accumulable1",
-      |        "Update": "delta1",
-      |        "Value": "val1",
-      |        "Internal": false,
-      |        "Count Failed Values": false
-      |      },
-      |      {
-      |        "ID": 2,
-      |        "Name": "Accumulable2",
-      |        "Update": "delta2",
-      |        "Value": "val2",
-      |        "Internal": false,
-      |        "Count Failed Values": false
-      |      }
-      |    ],
+      |    "Accumulables": [],
       |    "Resource Profile Id" : 0,
       |    "Shuffle Push Enabled" : false,
       |    "Shuffle Push Mergers Count" : 0
@@ -1604,32 +1621,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
       |    "Finish Time": 0,
       |    "Failed": false,
       |    "Killed": false,
-      |    "Accumulables": [
-      |      {
-      |        "ID": 1,
-      |        "Name": "Accumulable1",
-      |        "Update": "delta1",
-      |        "Value": "val1",
-      |        "Internal": false,
-      |        "Count Failed Values": false
-      |      },
-      |      {
-      |        "ID": 2,
-      |        "Name": "Accumulable2",
-      |        "Update": "delta2",
-      |        "Value": "val2",
-      |        "Internal": false,
-      |        "Count Failed Values": false
-      |      },
-      |      {
-      |        "ID": 3,
-      |        "Name": "Accumulable3",
-      |        "Update": "delta3",
-      |        "Value": "val3",
-      |        "Internal": true,
-      |        "Count Failed Values": false
-      |      }
-      |    ]
+      |    "Accumulables": []
       |  }
       |}
     """.stripMargin
@@ -1652,32 +1644,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
       |    "Finish Time": 0,
       |    "Failed": false,
       |    "Killed": false,
-      |    "Accumulables": [
-      |      {
-      |        "ID": 1,
-      |        "Name": "Accumulable1",
-      |        "Update": "delta1",
-      |        "Value": "val1",
-      |        "Internal": false,
-      |        "Count Failed Values": false
-      |      },
-      |      {
-      |        "ID": 2,
-      |        "Name": "Accumulable2",
-      |        "Update": "delta2",
-      |        "Value": "val2",
-      |        "Internal": false,
-      |        "Count Failed Values": false
-      |      },
-      |      {
-      |        "ID": 3,
-      |        "Name": "Accumulable3",
-      |        "Update": "delta3",
-      |        "Value": "val3",
-      |        "Internal": true,
-      |        "Count Failed Values": false
-      |      }
-      |    ]
+      |    "Accumulables": []
       |  }
       |}
     """.stripMargin

