This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 9237fb2 [SPARK-32124][CORE][FOLLOW-UP] Use the invalid value Int.MinValue to fill the map index when the event logs from the old Spark version 9237fb2 is described below commit 9237fb2ca90d16208634d70cdff1a0ea9ddcce26 Author: Yuanjian Li <xyliyuanj...@gmail.com> AuthorDate: Wed Jul 8 09:36:06 2020 +0900 [SPARK-32124][CORE][FOLLOW-UP] Use the invalid value Int.MinValue to fill the map index when the event logs from the old Spark version ### What changes were proposed in this pull request? Use the invalid value Int.MinValue to fill the map index when the event logs from the old Spark version. ### Why are the changes needed? Follow up PR for #28941. ### Does this PR introduce _any_ user-facing change? When we use the Spark version 3.0 history server reading the event log written by the old Spark version, we use the invalid value -2 to fill the map index. ### How was this patch tested? Existing UT. Closes #28965 from xuanyuanking/follow-up. Authored-by: Yuanjian Li <xyliyuanj...@gmail.com> Signed-off-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensou...@gmail.com> (cherry picked from commit 365961155a655f19c9184b16ccd493838c848213) Signed-off-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensou...@gmail.com> --- core/src/main/scala/org/apache/spark/TaskEndReason.scala | 3 ++- core/src/main/scala/org/apache/spark/util/JsonProtocol.scala | 8 ++++++-- core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala | 2 +- 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala index b13028f..6606d31 100644 --- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala +++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala @@ -90,7 +90,8 @@ case class FetchFailed( extends TaskFailedReason { override def toErrorString: String = { val bmAddressString = if (bmAddress == null) "null" else bmAddress.toString - s"FetchFailed($bmAddressString, shuffleId=$shuffleId, mapIndex=$mapIndex, " + + val mapIndexString = if (mapIndex == Int.MinValue) "Unknown" else mapIndex.toString + s"FetchFailed($bmAddressString, shuffleId=$shuffleId, mapIndex=$mapIndexString, " + s"mapId=$mapId, reduceId=$reduceId, message=\n$message\n)" } diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 74ff5c7..78fbd0c 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -993,8 +993,12 @@ private[spark] object JsonProtocol { val blockManagerAddress = blockManagerIdFromJson(json \ "Block Manager Address") val shuffleId = (json \ "Shuffle ID").extract[Int] val mapId = (json \ "Map ID").extract[Long] - val mapIndex = (json \ "Map Index") match { - case JNothing => 0 + val mapIndex = json \ "Map Index" match { + case JNothing => + // Note, we use the invalid value Int.MinValue here to fill the map index for backward + // compatibility. Otherwise, the fetch failed event will be dropped when the history + // server loads the event log written by the Spark version before 3.0. + Int.MinValue case x => x.extract[Int] } val reduceId = (json \ "Reduce ID").extract[Int] diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 98aaa9e..b77cd81 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -312,7 +312,7 @@ class JsonProtocolSuite extends SparkFunSuite { val oldEvent = JsonProtocol.taskEndReasonToJson(fetchFailed) .removeField({ _._1 == "Map Index" }) val expectedFetchFailed = FetchFailed(BlockManagerId("With or", "without you", 15), 17, 16L, - 0, 19, "ignored") + Int.MinValue, 19, "ignored") assert(expectedFetchFailed === JsonProtocol.taskEndReasonFromJson(oldEvent)) } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org