This is an automated email from the ASF dual-hosted git repository.

joshrosen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 5a4b075f95f [SPARK-40261][CORE] Exclude DirectTaskResult metadata when 
calculating result size
5a4b075f95f is described below

commit 5a4b075f95f4cb305ba96d6de34d3c004e15f241
Author: Ziqi Liu <ziqi....@databricks.com>
AuthorDate: Wed Aug 31 17:38:35 2022 -0700

    [SPARK-40261][CORE] Exclude DirectTaskResult metadata when calculating 
result size
    
    ### What changes were proposed in this pull request?
    When calculating driver result size, only counting actual result value 
while excluding other metadata (e.g., accumUpdates) in the serialized result 
task object.
    
    ### Why are the changes needed?
    metadata should not be counted because they will be discarded by the driver 
immediately after being processed, and will lead to unexpected exception when 
running jobs with tons of task but actually return small results.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    Unit test
    
    Closes #37713 from liuzqt/SPARK-40261.
    
    Lead-authored-by: Ziqi Liu <ziqi....@databricks.com>
    Co-authored-by: liuzqt <ziqi....@databricks.com>
    Signed-off-by: Josh Rosen <joshro...@databricks.com>
---
 .../scala/org/apache/spark/scheduler/TaskResultGetter.scala |  2 +-
 .../org/apache/spark/scheduler/TaskResultGetterSuite.scala  | 13 +++++++++++++
 2 files changed, 14 insertions(+), 1 deletion(-)

diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
index 2dabee39131..cfc1f79fab2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
@@ -63,7 +63,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, 
scheduler: TaskSchedul
         try {
           val (result, size) = 
serializer.get().deserialize[TaskResult[_]](serializedData) match {
             case directResult: DirectTaskResult[_] =>
-              if (!taskSetManager.canFetchMoreResults(serializedData.limit())) 
{
+              if 
(!taskSetManager.canFetchMoreResults(directResult.valueBytes.limit())) {
                 // kill the task so that it will not become zombie task
                 scheduler.handleFailedTask(taskSetManager, tid, 
TaskState.KILLED, TaskKilled(
                   "Tasks result size has exceeded maxResultSize"))
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
index ea44a2d948c..1583d3b96ee 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
@@ -35,6 +35,7 @@ import org.scalatest.concurrent.Eventually._
 import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.TestUtils.JavaSourceFromString
+import org.apache.spark.internal.config.MAX_RESULT_SIZE
 import org.apache.spark.internal.config.Network.RPC_MESSAGE_MAX_SIZE
 import org.apache.spark.storage.TaskResultBlockId
 import org.apache.spark.util.{MutableURLClassLoader, RpcUtils, ThreadUtils, 
Utils}
@@ -297,6 +298,18 @@ class TaskResultGetterSuite extends SparkFunSuite with 
BeforeAndAfter with Local
     assert(unknownFailure.findFirstMatchIn(message).isDefined)
   }
 
+  test("SPARK-40261: task result metadata should not be counted into result 
size") {
+    val conf = new SparkConf().set(MAX_RESULT_SIZE.key, "1M")
+    sc = new SparkContext("local", "test", conf)
+    val rdd = sc.parallelize(1 to 10000, 10000)
+    // This will trigger 10k task but return empty result. The total 
serialized return tasks
+    // size(including accumUpdates metadata) would be ~10M in total in this 
example, but the result
+    // value itself is pretty small(empty arrays)
+    // Even setting MAX_RESULT_SIZE to a small value(1M here), it should not 
throw exception
+    // because the actual result is small
+    assert(rdd.filter(_ < 0).collect().isEmpty)
+  }
+
 }
 
 private class UndeserializableException extends Exception {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to