This is an automated email from the ASF dual-hosted git repository. joshrosen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 5a4b075f95f [SPARK-40261][CORE] Exclude DirectTaskResult metadata when calculating result size 5a4b075f95f is described below commit 5a4b075f95f4cb305ba96d6de34d3c004e15f241 Author: Ziqi Liu <ziqi....@databricks.com> AuthorDate: Wed Aug 31 17:38:35 2022 -0700 [SPARK-40261][CORE] Exclude DirectTaskResult metadata when calculating result size ### What changes were proposed in this pull request? When calculating driver result size, only count the actual result value while excluding other metadata (e.g., accumUpdates) in the serialized task result object. ### Why are the changes needed? Metadata should not be counted because it will be discarded by the driver immediately after being processed, and counting it leads to an unexpected exception when running jobs with a huge number of tasks that actually return small results. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Unit test Closes #37713 from liuzqt/SPARK-40261. 
Lead-authored-by: Ziqi Liu <ziqi....@databricks.com> Co-authored-by: liuzqt <ziqi....@databricks.com> Signed-off-by: Josh Rosen <joshro...@databricks.com> --- .../scala/org/apache/spark/scheduler/TaskResultGetter.scala | 2 +- .../org/apache/spark/scheduler/TaskResultGetterSuite.scala | 13 +++++++++++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala index 2dabee39131..cfc1f79fab2 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala @@ -63,7 +63,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul try { val (result, size) = serializer.get().deserialize[TaskResult[_]](serializedData) match { case directResult: DirectTaskResult[_] => - if (!taskSetManager.canFetchMoreResults(serializedData.limit())) { + if (!taskSetManager.canFetchMoreResults(directResult.valueBytes.limit())) { // kill the task so that it will not become zombie task scheduler.handleFailedTask(taskSetManager, tid, TaskState.KILLED, TaskKilled( "Tasks result size has exceeded maxResultSize")) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala index ea44a2d948c..1583d3b96ee 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala @@ -35,6 +35,7 @@ import org.scalatest.concurrent.Eventually._ import org.apache.spark._ import org.apache.spark.TaskState.TaskState import org.apache.spark.TestUtils.JavaSourceFromString +import org.apache.spark.internal.config.MAX_RESULT_SIZE import org.apache.spark.internal.config.Network.RPC_MESSAGE_MAX_SIZE import org.apache.spark.storage.TaskResultBlockId import 
org.apache.spark.util.{MutableURLClassLoader, RpcUtils, ThreadUtils, Utils} @@ -297,6 +298,18 @@ class TaskResultGetterSuite extends SparkFunSuite with BeforeAndAfter with Local assert(unknownFailure.findFirstMatchIn(message).isDefined) } + test("SPARK-40261: task result metadata should not be counted into result size") { + val conf = new SparkConf().set(MAX_RESULT_SIZE.key, "1M") + sc = new SparkContext("local", "test", conf) + val rdd = sc.parallelize(1 to 10000, 10000) + // This will trigger 10k task but return empty result. The total serialized return tasks + // size(including accumUpdates metadata) would be ~10M in total in this example, but the result + // value itself is pretty small(empty arrays) + // Even setting MAX_RESULT_SIZE to a small value(1M here), it should not throw exception + // because the actual result is small + assert(rdd.filter(_ < 0).collect().isEmpty) + } + } private class UndeserializableException extends Exception { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org