Repository: spark
Updated Branches:
  refs/heads/branch-2.3 373ac642f -> 23ba4416e


[SPARK-23481][WEBUI] lastStageAttempt should fail when a stage doesn't exist

## What changes were proposed in this pull request?

The issue here is `AppStatusStore.lastStageAttempt` will return the next 
available stage in the store when a stage doesn't exist.

This PR adds `last(stageId)` to ensure it returns a correct `StageData`

## How was this patch tested?

The new unit test.

Author: Shixiong Zhu <zsxw...@gmail.com>

Closes #20654 from zsxwing/SPARK-23481.

(cherry picked from commit 744d5af652ee8cece361cbca31e5201134e0fb42)
Signed-off-by: Shixiong Zhu <zsxw...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/23ba4416
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/23ba4416
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/23ba4416

Branch: refs/heads/branch-2.3
Commit: 23ba4416e1bbbaa818876d7a837f7a5e260aa048
Parents: 373ac64
Author: Shixiong Zhu <zsxw...@gmail.com>
Authored: Wed Feb 21 15:37:28 2018 -0800
Committer: Shixiong Zhu <zsxw...@gmail.com>
Committed: Wed Feb 21 15:37:36 2018 -0800

----------------------------------------------------------------------
 .../apache/spark/status/AppStatusStore.scala    |  6 +++-
 .../spark/status/AppStatusListenerSuite.scala   | 33 ++++++++++++++++++++
 2 files changed, 38 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/23ba4416/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala 
b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
index efc2853..688f25a 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -95,7 +95,11 @@ private[spark] class AppStatusStore(
   }
 
   def lastStageAttempt(stageId: Int): v1.StageData = {
-    val it = 
store.view(classOf[StageDataWrapper]).index("stageId").reverse().first(stageId)
+    val it = store.view(classOf[StageDataWrapper])
+      .index("stageId")
+      .reverse()
+      .first(stageId)
+      .last(stageId)
       .closeableIterator()
     try {
       if (it.hasNext()) {

http://git-wip-us.apache.org/repos/asf/spark/blob/23ba4416/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala 
b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
index 01d76a2..f3fa4c9 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
@@ -1057,6 +1057,39 @@ class AppStatusListenerSuite extends SparkFunSuite with 
BeforeAndAfter {
     }
   }
 
+  test("lastStageAttempt should fail when the stage doesn't exist") {
+    val testConf = conf.clone().set(MAX_RETAINED_STAGES, 1)
+    val listener = new AppStatusListener(store, testConf, true)
+    val appStore = new AppStatusStore(store)
+
+    val stage1 = new StageInfo(1, 0, "stage1", 4, Nil, Nil, "details1")
+    val stage2 = new StageInfo(2, 0, "stage2", 4, Nil, Nil, "details2")
+    val stage3 = new StageInfo(3, 0, "stage3", 4, Nil, Nil, "details3")
+
+    time += 1
+    stage1.submissionTime = Some(time)
+    listener.onStageSubmitted(SparkListenerStageSubmitted(stage1, new 
Properties()))
+    stage1.completionTime = Some(time)
+    listener.onStageCompleted(SparkListenerStageCompleted(stage1))
+
+    // Make stage 3 complete before stage 2 so that stage 3 will be evicted
+    time += 1
+    stage3.submissionTime = Some(time)
+    listener.onStageSubmitted(SparkListenerStageSubmitted(stage3, new 
Properties()))
+    stage3.completionTime = Some(time)
+    listener.onStageCompleted(SparkListenerStageCompleted(stage3))
+
+    time += 1
+    stage2.submissionTime = Some(time)
+    listener.onStageSubmitted(SparkListenerStageSubmitted(stage2, new 
Properties()))
+    stage2.completionTime = Some(time)
+    listener.onStageCompleted(SparkListenerStageCompleted(stage2))
+
+    assert(appStore.asOption(appStore.lastStageAttempt(1)) === None)
+    assert(appStore.asOption(appStore.lastStageAttempt(2)).map(_.stageId) === 
Some(2))
+    assert(appStore.asOption(appStore.lastStageAttempt(3)) === None)
+  }
+
   test("driver logs") {
     val listener = new AppStatusListener(store, conf, true)
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to