This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 7cb92a5d25a7 [SPARK-51378][CORE] Apply JsonProtocol's accumulableExcludeList to ExecutorMetricsUpdate and TaskEndReason
7cb92a5d25a7 is described below

commit 7cb92a5d25a7cba730be3b6c88333f4574cf7c37
Author: Josh Rosen <joshro...@databricks.com>
AuthorDate: Tue Mar 4 15:36:50 2025 +0900

    [SPARK-51378][CORE] Apply JsonProtocol's accumulableExcludeList to ExecutorMetricsUpdate and TaskEndReason
    
    ### What changes were proposed in this pull request?
    
    This PR updates `JsonProtocol` so that its `accumulableExcludeList` is considered when logging accumulables in TaskEndReason and ExecutorMetricsUpdate events.
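    
    For illustration, the filter that `accumulablesToJson` now applies on every logging path looks roughly like this (a simplified sketch, not the exact Spark source; the exclude list currently contains the internal `updatedBlockStatuses` accumulator):
    
        // Drop any accumulable whose name appears on the exclude list
        // before it is serialized into the event log.
        accumulables.filterNot { acc =>
          acc.name.exists(accumulableExcludeList.contains)
        }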
    
    This exclusion list was originally added in https://github.com/apache/spark/pull/17412, but at that time it applied only to StageInfo and TaskInfo logging.
    
    ### Why are the changes needed?
    
    Reduce event log size and improve event logger throughput by avoiding the need to process potentially large or expensive accumulators.
    
    This gap for ExecutorMetricsUpdate was noted in a comment (https://github.com/apache/spark/pull/17412#issuecomment-331018069); I recently rediscovered it in my own Spark usage.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing unit tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #50141 from JoshRosen/apply-accumulableExcludeList-to-other-event-log-fields.
    
    Authored-by: Josh Rosen <joshro...@databricks.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
    (cherry picked from commit 62f7f14edc53b12ecd6a3eb3b65254124efcc9f0)
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../scala/org/apache/spark/util/JsonProtocol.scala | 14 ++++------
 .../org/apache/spark/util/JsonProtocolSuite.scala  | 30 +++++-----------------
 2 files changed, 11 insertions(+), 33 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index e30380f41566..df809f4fad74 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -376,9 +376,8 @@ private[spark] object JsonProtocol extends JsonUtils {
       g.writeNumberField("Task ID", taskId)
       g.writeNumberField("Stage ID", stageId)
       g.writeNumberField("Stage Attempt ID", stageAttemptId)
-      g.writeArrayFieldStart("Accumulator Updates")
-      updates.foreach(accumulableInfoToJson(_, g))
-      g.writeEndArray()
+      g.writeFieldName("Accumulator Updates")
+      accumulablesToJson(updates, g)
       g.writeEndObject()
     }
     g.writeEndArray()
@@ -496,7 +495,7 @@ private[spark] object JsonProtocol extends JsonUtils {
   def accumulablesToJson(
       accumulables: Iterable[AccumulableInfo],
       g: JsonGenerator,
-    includeTaskMetricsAccumulators: Boolean = true): Unit = {
+      includeTaskMetricsAccumulators: Boolean = true): Unit = {
     g.writeStartArray()
     accumulables
         .filterNot { acc =>
@@ -714,11 +713,8 @@ private[spark] object JsonProtocol extends JsonUtils {
         reason.foreach(g.writeStringField("Loss Reason", _))
       case taskKilled: TaskKilled =>
         g.writeStringField("Kill Reason", taskKilled.reason)
-        g.writeArrayFieldStart("Accumulator Updates")
-        taskKilled.accumUpdates.foreach { info =>
-          accumulableInfoToJson(info, g)
-        }
-        g.writeEndArray()
+        g.writeFieldName("Accumulator Updates")
+        accumulablesToJson(taskKilled.accumUpdates, g)
       case _ =>
         // no extra fields to write
     }
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 30c9693e6dee..cfba4c02c944 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -1166,7 +1166,9 @@ private[spark] object JsonProtocolSuite extends Assertions {
             assert(taskId1 === taskId2)
             assert(stageId1 === stageId2)
             assert(stageAttemptId1 === stageAttemptId2)
-            assertSeqEquals[AccumulableInfo](updates1, updates2, (a, b) => a.equals(b))
+            val filteredUpdates = updates1
+              .filterNot { acc => acc.name.exists(accumulableExcludeList.contains) }
+            assertSeqEquals[AccumulableInfo](filteredUpdates, updates2, (a, b) => a.equals(b))
           })
         assertSeqEquals[((Int, Int), ExecutorMetrics)](
           e1.executorUpdates.toSeq.sortBy(_._1),
@@ -1299,7 +1301,9 @@ private[spark] object JsonProtocolSuite extends Assertions {
         assert(r1.description === r2.description)
        assertSeqEquals(r1.stackTrace, r2.stackTrace, assertStackTraceElementEquals)
         assert(r1.fullStackTrace === r2.fullStackTrace)
-        assertSeqEquals[AccumulableInfo](r1.accumUpdates, r2.accumUpdates, (a, b) => a.equals(b))
+        val filteredUpdates = r1.accumUpdates
+          .filterNot { acc => acc.name.exists(accumulableExcludeList.contains) }
+        assertSeqEquals[AccumulableInfo](filteredUpdates, r2.accumUpdates, (a, b) => a.equals(b))
       case (TaskResultLost, TaskResultLost) =>
       case (r1: TaskKilled, r2: TaskKilled) =>
         assert(r1.reason == r2.reason)
@@ -2790,28 +2794,6 @@ private[spark] object JsonProtocolSuite extends Assertions {
       |          "Count Failed Values": true
       |        },
       |        {
-      |          "ID": 12,
-      |          "Name": "$UPDATED_BLOCK_STATUSES",
-      |          "Update": [
-      |            {
-      |              "Block ID": "rdd_0_0",
-      |              "Status": {
-      |                "Storage Level": {
-      |                  "Use Disk": true,
-      |                  "Use Memory": true,
-      |                  "Use Off Heap": false,
-      |                  "Deserialized": false,
-      |                  "Replication": 2
-      |                },
-      |                "Memory Size": 0,
-      |                "Disk Size": 0
-      |              }
-      |            }
-      |          ],
-      |          "Internal": true,
-      |          "Count Failed Values": true
-      |        },
-      |        {
       |          "ID": 13,
       |          "Name": "${shuffleRead.REMOTE_BLOCKS_FETCHED}",
       |          "Update": 0,
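
For readers who want to see the end-to-end effect, below is a small self-contained Scala sketch of the pattern the patch converges on: write the "Accumulator Updates" field name, then delegate the array to a shared helper that applies the exclude-list filter. It uses Jackson's JsonGenerator directly; the case class and demo object are simplified stand-ins, not Spark's actual types.

    import java.io.StringWriter
    import com.fasterxml.jackson.core.{JsonFactory, JsonGenerator}

    // Simplified stand-in for Spark's AccumulableInfo.
    case class AccumulableInfo(id: Long, name: Option[String])

    object AccumulableFilterDemo {
      // The real list lives in JsonProtocol; it currently contains the
      // internal updatedBlockStatuses accumulator.
      val accumulableExcludeList = Set("internal.metrics.updatedBlockStatuses")

      // Mirrors the shape of JsonProtocol.accumulablesToJson: writes a JSON
      // array, skipping accumulables whose names are on the exclude list.
      def accumulablesToJson(accumulables: Iterable[AccumulableInfo], g: JsonGenerator): Unit = {
        g.writeStartArray()
        accumulables
          .filterNot(acc => acc.name.exists(accumulableExcludeList.contains))
          .foreach { acc =>
            g.writeStartObject()
            g.writeNumberField("ID", acc.id)
            acc.name.foreach(g.writeStringField("Name", _))
            g.writeEndObject()
          }
        g.writeEndArray()
      }

      def main(args: Array[String]): Unit = {
        val out = new StringWriter()
        val g = new JsonFactory().createGenerator(out)
        g.writeStartObject()
        // The pattern used for ExecutorMetricsUpdate and TaskKilled after
        // this change: write the field name, then delegate to the helper.
        g.writeFieldName("Accumulator Updates")
        accumulablesToJson(Seq(
          AccumulableInfo(1, Some("my.counter")),
          AccumulableInfo(12, Some("internal.metrics.updatedBlockStatuses"))), g)
        g.writeEndObject()
        g.close()
        println(out)  // the excluded accumulable (ID 12) is absent
      }
    }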

