This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 7cb92a5d25a7 [SPARK-51378][CORE] Apply JsonProtocol's accumulableExcludeList to ExecutorMetricsUpdate and TaskEndReason
7cb92a5d25a7 is described below

commit 7cb92a5d25a7cba730be3b6c88333f4574cf7c37
Author: Josh Rosen <joshro...@databricks.com>
AuthorDate: Tue Mar 4 15:36:50 2025 +0900

    [SPARK-51378][CORE] Apply JsonProtocol's accumulableExcludeList to ExecutorMetricsUpdate and TaskEndReason

    ### What changes were proposed in this pull request?

    This PR updates `JsonProtocol` so that its `accumulableExcludeList` is considered when logging accumulables in TaskEndReason and ExecutorMetricsUpdate events. This exclusion list was added in https://github.com/apache/spark/pull/17412 and originally applied only to StageInfo and TaskInfo logging.

    ### Why are the changes needed?

    Reduce event log size and improve event logger throughput by avoiding the need to process possibly large / expensive accumulators. This gap for ExecutorMetricsUpdate was noted in a comment (https://github.com/apache/spark/pull/17412#issuecomment-331018069); I recently rediscovered it in my own Spark usage.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Existing unit tests.

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

    Closes #50141 from JoshRosen/apply-accumulableExcludeList-to-other-event-log-fields.

    Authored-by: Josh Rosen <joshro...@databricks.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
    (cherry picked from commit 62f7f14edc53b12ecd6a3eb3b65254124efcc9f0)
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../scala/org/apache/spark/util/JsonProtocol.scala | 14 ++++------
 .../org/apache/spark/util/JsonProtocolSuite.scala  | 30 +++++-----------------
 2 files changed, 11 insertions(+), 33 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index e30380f41566..df809f4fad74 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -376,9 +376,8 @@ private[spark] object JsonProtocol extends JsonUtils {
         g.writeNumberField("Task ID", taskId)
         g.writeNumberField("Stage ID", stageId)
         g.writeNumberField("Stage Attempt ID", stageAttemptId)
-        g.writeArrayFieldStart("Accumulator Updates")
-        updates.foreach(accumulableInfoToJson(_, g))
-        g.writeEndArray()
+        g.writeFieldName("Accumulator Updates")
+        accumulablesToJson(updates, g)
         g.writeEndObject()
       }
       g.writeEndArray()
@@ -496,7 +495,7 @@ private[spark] object JsonProtocol extends JsonUtils {
   def accumulablesToJson(
       accumulables: Iterable[AccumulableInfo],
       g: JsonGenerator,
-     includeTaskMetricsAccumulators: Boolean = true): Unit = {
+      includeTaskMetricsAccumulators: Boolean = true): Unit = {
     g.writeStartArray()
     accumulables
       .filterNot { acc =>
@@ -714,11 +713,8 @@ private[spark] object JsonProtocol extends JsonUtils {
         reason.foreach(g.writeStringField("Loss Reason", _))
       case taskKilled: TaskKilled =>
        g.writeStringField("Kill Reason", taskKilled.reason)
-        g.writeArrayFieldStart("Accumulator Updates")
-        taskKilled.accumUpdates.foreach { info =>
-          accumulableInfoToJson(info, g)
-        }
-        g.writeEndArray()
+        g.writeFieldName("Accumulator Updates")
+        accumulablesToJson(taskKilled.accumUpdates, g)
       case _ =>
         // no extra fields to write
     }
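For context on the main-code change above: accumulablesToJson filters accumulators by name against accumulableExcludeList before writing the "Accumulator Updates" array, so routing the ExecutorMetricsUpdate and TaskKilled paths through it applies the same exclusion that StageInfo and TaskInfo logging already used. The sketch below is an illustrative, self-contained rendering of that filtering pattern, not Spark's actual code; the simplified AccumulableInfo case class and the single excluded name are assumptions for the example.

// Minimal sketch (illustration only) of the name-based exclusion applied by
// accumulablesToJson before accumulator updates are serialized.
case class AccumulableInfo(id: Long, name: Option[String], update: Option[Any])

object AccumulableExclusionSketch {
  // Assumed contents; stands in for JsonProtocol.accumulableExcludeList.
  val accumulableExcludeList: Set[String] = Set("internal.metrics.updatedBlockStatuses")

  // Drop accumulators whose name is on the exclude list (the same name check
  // that appears in the test diff below).
  def filterAccumulables(accums: Seq[AccumulableInfo]): Seq[AccumulableInfo] =
    accums.filterNot { acc => acc.name.exists(accumulableExcludeList.contains) }

  def main(args: Array[String]): Unit = {
    val updates = Seq(
      AccumulableInfo(12, Some("internal.metrics.updatedBlockStatuses"), Some(Seq.empty)),
      AccumulableInfo(13, Some("internal.metrics.shuffle.read.remoteBlocksFetched"), Some(0L)))
    // Only the second accumulator survives and would appear under "Accumulator Updates".
    filterAccumulables(updates).foreach(a => println(a.name.getOrElse("<unnamed>")))
  }
}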
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 30c9693e6dee..cfba4c02c944 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -1166,7 +1166,9 @@ private[spark] object JsonProtocolSuite extends Assertions {
         assert(taskId1 === taskId2)
         assert(stageId1 === stageId2)
         assert(stageAttemptId1 === stageAttemptId2)
-        assertSeqEquals[AccumulableInfo](updates1, updates2, (a, b) => a.equals(b))
+        val filteredUpdates = updates1
+          .filterNot { acc => acc.name.exists(accumulableExcludeList.contains) }
+        assertSeqEquals[AccumulableInfo](filteredUpdates, updates2, (a, b) => a.equals(b))
       })
     assertSeqEquals[((Int, Int), ExecutorMetrics)](
       e1.executorUpdates.toSeq.sortBy(_._1),
@@ -1299,7 +1301,9 @@ private[spark] object JsonProtocolSuite extends Assertions {
        assert(r1.description === r2.description)
        assertSeqEquals(r1.stackTrace, r2.stackTrace, assertStackTraceElementEquals)
        assert(r1.fullStackTrace === r2.fullStackTrace)
-       assertSeqEquals[AccumulableInfo](r1.accumUpdates, r2.accumUpdates, (a, b) => a.equals(b))
+       val filteredUpdates = r1.accumUpdates
+         .filterNot { acc => acc.name.exists(accumulableExcludeList.contains) }
+       assertSeqEquals[AccumulableInfo](filteredUpdates, r2.accumUpdates, (a, b) => a.equals(b))
      case (TaskResultLost, TaskResultLost) =>
      case (r1: TaskKilled, r2: TaskKilled) =>
        assert(r1.reason == r2.reason)
@@ -2790,28 +2794,6 @@ private[spark] object JsonProtocolSuite extends Assertions {
      |          "Count Failed Values": true
      |        },
      |        {
-      |          "ID": 12,
-      |          "Name": "$UPDATED_BLOCK_STATUSES",
-      |          "Update": [
-      |            {
-      |              "Block ID": "rdd_0_0",
-      |              "Status": {
-      |                "Storage Level": {
-      |                  "Use Disk": true,
-      |                  "Use Memory": true,
-      |                  "Use Off Heap": false,
-      |                  "Deserialized": false,
-      |                  "Replication": 2
-      |                },
-      |                "Memory Size": 0,
-      |                "Disk Size": 0
-      |              }
-      |            }
-      |          ],
-      |          "Internal": true,
-      |          "Count Failed Values": true
-      |        },
-      |        {
      |          "ID": 13,
      |          "Name": "${shuffleRead.REMOTE_BLOCKS_FETCHED}",
      |          "Update": 0,
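As a quick way to observe the effect on an existing log, one could count how many event-log lines still mention an excluded accumulator. This is a hedged sketch, not part of the patch: the excluded name and the log path are assumptions, and it presumes an uncompressed event log with one JSON event per line.

import scala.io.Source

object EventLogScanSketch {
  def main(args: Array[String]): Unit = {
    // Assumed excluded accumulator name and a hypothetical event-log path.
    val excludedName = "internal.metrics.updatedBlockStatuses"
    val path = args.headOption.getOrElse("/tmp/spark-events/app-20250304-0001")
    val source = Source.fromFile(path)
    try {
      // Count raw lines that mention the excluded accumulator; after SPARK-51378,
      // ExecutorMetricsUpdate and TaskEndReason events should no longer add such lines.
      val hits = source.getLines().count(_.contains(excludedName))
      println(s"lines mentioning $excludedName: $hits")
    } finally {
      source.close()
    }
  }
}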