Repository: spark
Updated Branches:
  refs/heads/master fe65361b0 -> 7beb375bf


[SPARK-22861][SQL] SQLAppStatusListener handles multi-job executions.

When one execution has multiple jobs, we need to append to the set of
stages, not replace them on every job.

Added unit test and ran existing tests on Jenkins

Author: Imran Rashid <[email protected]>

Closes #20047 from squito/SPARK-22861.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7beb375b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7beb375b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7beb375b

Branch: refs/heads/master
Commit: 7beb375bf4e8400f830a7fc7ff414634dd6efc78
Parents: fe65361
Author: Imran Rashid <[email protected]>
Authored: Thu Dec 21 15:37:55 2017 -0800
Committer: Marcelo Vanzin <[email protected]>
Committed: Thu Dec 21 15:37:55 2017 -0800

----------------------------------------------------------------------
 .../sql/execution/ui/SQLAppStatusListener.scala |  2 +-
 .../ui/SQLAppStatusListenerSuite.scala          | 43 ++++++++++++++++++++
 2 files changed, 44 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7beb375b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
index aa78fa0..2295b8d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
@@ -87,7 +87,7 @@ class SQLAppStatusListener(
     }
 
     exec.jobs = exec.jobs + (jobId -> JobExecutionStatus.RUNNING)
-    exec.stages = event.stageIds.toSet
+    exec.stages ++= event.stageIds.toSet
     update(exec)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7beb375b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
index 5ebbeb4..7d84f45 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
@@ -383,6 +383,49 @@ class SQLAppStatusListenerSuite extends SparkFunSuite with SharedSQLContext with
     assertJobs(statusStore.execution(executionId), failed = Seq(0))
   }
 
+  test("handle one execution with multiple jobs") {
+    val statusStore = createStatusStore()
+    val listener = statusStore.listener.get
+
+    val executionId = 0
+    val df = createTestDataFrame
+    listener.onOtherEvent(SparkListenerSQLExecutionStart(
+      executionId,
+      "test",
+      "test",
+      df.queryExecution.toString,
+      SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
+      System.currentTimeMillis()))
+
+    var stageId = 0
+    def twoStageJob(jobId: Int): Unit = {
+      val stages = Seq(stageId, stageId + 1).map { id => createStageInfo(id, 0)}
+      stageId += 2
+      listener.onJobStart(SparkListenerJobStart(
+        jobId = jobId,
+        time = System.currentTimeMillis(),
+        stageInfos = stages,
+        createProperties(executionId)))
+      stages.foreach { s =>
+        listener.onStageSubmitted(SparkListenerStageSubmitted(s))
+        listener.onStageCompleted(SparkListenerStageCompleted(s))
+      }
+      listener.onJobEnd(SparkListenerJobEnd(
+        jobId = jobId,
+        time = System.currentTimeMillis(),
+        JobSucceeded
+      ))
+    }
+    // submit two jobs with the same executionId
+    twoStageJob(0)
+    twoStageJob(1)
+    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
+      executionId, System.currentTimeMillis()))
+
+    assertJobs(statusStore.execution(0), completed = 0 to 1)
+    assert(statusStore.execution(0).get.stages === (0 to 3).toSet)
+  }
+
   test("SPARK-11126: no memory leak when running non SQL jobs") {
     val listener = spark.sharedState.statusStore.listener.get
     // At the beginning of this test case, there should be no live data in the listener.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to