Repository: spark
Updated Branches:
  refs/heads/branch-2.3 632c0d911 -> 49e1eb8bd
[SPARK-25837][CORE] Fix potential slowdown in AppStatusListener when cleaning up stages

## What changes were proposed in this pull request?

* Update the `AppStatusListener` `cleanupStages` method to remove the tasks of those stages in a single pass, instead of one pass per stage.
* This fixes an issue where `cleanupStages` could fall behind. That backed up the executor in `ElementTrackingStore`, so stages and jobs were not cleaned up properly.

Tasks seem most susceptible to this because there are so many of them. However, a similar issue could arise anywhere else the `KVStore` `view` method is used. A broader fix might involve updates to `KVStoreView` and `InMemoryView`, because this interface and its implementation appear to allow multiple, inefficient traversals of the stored data.

## How was this patch tested?

Using existing tests in AppStatusListenerSuite.

This is my original work and I license the work to the project under the project's open source license.

Closes #22883 from patrickbrownsync/cleanup-stages-fix.
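The following toy cost model shows why deleting tasks stage by stage can scale badly. It assumes that each per-stage `view` query ends up visiting the whole task collection, which the message above suggests `InMemoryView` may do. `TaskRecord` and `TraversalCost` are hypothetical names, not Spark classes, and the visit counts describe only this model.

```scala
// Hypothetical model: a flat, unindexed collection of task records, each tagged with
// the (stageId, stageAttemptId) it belongs to. This is not Spark's TaskDataWrapper.
final case class TaskRecord(taskId: Long, stageId: Int, stageAttemptId: Int)

object TraversalCost {
  /** Per-stage approach (assumed full scan per stage). Returns (records visited, records matched). */
  def perStage(tasks: Seq[TaskRecord], stages: Seq[(Int, Int)]): (Long, Long) = {
    var visited = 0L
    var matched = 0L
    for ((stageId, attemptId) <- stages; t <- tasks) {
      visited += 1
      if (t.stageId == stageId && t.stageAttemptId == attemptId) matched += 1
    }
    (visited, matched)
  }

  /** Single-pass approach: build a set of (stageId, attemptId) keys once, then scan tasks once. */
  def singlePass(tasks: Seq[TaskRecord], stages: Seq[(Int, Int)]): (Long, Long) = {
    val keys = stages.toSet
    var visited = 0L
    var matched = 0L
    tasks.foreach { t =>
      visited += 1
      if (keys.contains((t.stageId, t.stageAttemptId))) matched += 1
    }
    (visited, matched)
  }

  def main(args: Array[String]): Unit = {
    // 100 stages (attempt 0) with 1,000 tasks each: 100,000 task records in total.
    val tasks = for (s <- 0 until 100; i <- 0 until 1000) yield TaskRecord(s.toLong * 1000 + i, s, 0)
    // Clean up the first 50 stages.
    val toClean = (0 until 50).map(s => (s, 0))
    println(perStage(tasks, toClean))   // (5000000,50000)
    println(singlePass(tasks, toClean)) // (100000,50000)
  }
}
```

Both approaches find the same 50,000 tasks. In this model, the per-stage version visits `stages × tasks` records, while the single pass visits each task once.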
Authored-by: Patrick Brown <patrick.br...@blyncsy.com>
Signed-off-by: Marcelo Vanzin <van...@cloudera.com>
(cherry picked from commit e9d3ca0b7993995f24f5c555a570bc2521119e12)
Signed-off-by: Marcelo Vanzin <van...@cloudera.com>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/49e1eb8b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/49e1eb8b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/49e1eb8b

Branch: refs/heads/branch-2.3
Commit: 49e1eb8bdeff2ea13e235ed3a82173887c48643e
Parents: 632c0d9
Author: Patrick Brown <patrick.br...@blyncsy.com>
Authored: Thu Nov 1 09:34:29 2018 -0700
Committer: Marcelo Vanzin <van...@cloudera.com>
Committed: Thu Nov 1 09:38:21 2018 -0700

----------------------------------------------------------------------
 .../apache/spark/status/AppStatusListener.scala | 19 +++++++++----------
 1 file changed, 9 insertions(+), 10 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/49e1eb8b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
index d57c977..3164dc7 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -950,16 +950,6 @@ private[spark] class AppStatusListener(
         kvstore.delete(e.getClass(), e.id)
       }
 
-      val tasks = kvstore.view(classOf[TaskDataWrapper])
-        .index("stage")
-        .first(key)
-        .last(key)
-        .asScala
-
-      tasks.foreach { t =>
-        kvstore.delete(t.getClass(), t.taskId)
-      }
-
       // Check whether there are remaining attempts for the same stage. If there aren't, then
       // also delete the RDD graph data.
       val remainingAttempts = kvstore.view(classOf[StageDataWrapper])
@@ -982,6 +972,15 @@ private[spark] class AppStatusListener(
 
       cleanupCachedQuantiles(key)
     }
+
+    // Delete tasks for all stages in one pass, as deleting them for each stage individually is slow
+    val tasks = kvstore.view(classOf[TaskDataWrapper]).asScala
+    val keys = stages.map { s => (s.info.stageId, s.info.attemptId) }.toSet
+    tasks.foreach { t =>
+      if (keys.contains((t.stageId, t.stageAttemptId))) {
+        kvstore.delete(t.getClass(), t.taskId)
+      }
+    }
   }
 
   private def cleanupTasks(stage: LiveStage): Unit = {
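The single-pass deletion in the second hunk can be approximated outside Spark with plain Scala collections. This is a sketch under stated assumptions. `StageAttempt`, `TaskRow` and `SinglePassCleanup` are hypothetical stand-ins for `StageDataWrapper` and `TaskDataWrapper`, and a `mutable.Map` keyed by task id stands in for `KVStore`. The patch deletes while iterating the `KVStore` view. The sketch instead collects the matching task ids before removing any of them, because modifying a Scala `mutable.Map` during iteration is not safe.

```scala
import scala.collection.mutable

// Hypothetical stand-ins; only the fields the cleanup needs are modeled.
final case class StageAttempt(stageId: Int, attemptId: Int)
final case class TaskRow(taskId: Long, stageId: Int, stageAttemptId: Int)

object SinglePassCleanup {
  /**
   * Removes every task that belongs to one of `stages` from `store` (keyed by taskId)
   * using a single traversal of the stored tasks. Returns the number of tasks removed.
   */
  def deleteTasksForStages(store: mutable.Map[Long, TaskRow], stages: Seq[StageAttempt]): Int = {
    // Same idea as the patch: a Set of (stageId, attemptId) pairs makes membership checks cheap.
    val keys = stages.map(s => (s.stageId, s.attemptId)).toSet
    // Materialize the ids first so the map is not modified while it is being iterated.
    val doomed = store.valuesIterator
      .filter(t => keys.contains((t.stageId, t.stageAttemptId)))
      .map(_.taskId)
      .toList
    doomed.foreach(store.remove)
    doomed.size
  }

  def main(args: Array[String]): Unit = {
    val store = mutable.Map[Long, TaskRow]()
    Seq(TaskRow(1L, 0, 0), TaskRow(2L, 0, 1), TaskRow(3L, 1, 0)).foreach(t => store(t.taskId) = t)
    // Task 2 belongs to attempt 1 of stage 0, which is not being cleaned up, so it survives.
    val removed = deleteTasksForStages(store, Seq(StageAttempt(0, 0), StageAttempt(1, 0)))
    println(s"removed=$removed remaining=${store.keys.toList.sorted}") // removed=2 remaining=List(2)
  }
}
```

The diff shows one trade-off. The new code scans every stored task, not only the tasks of the stages being removed. That single scan replaces one `view` query per stage, which the commit message identifies as the slow part.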