Repository: spark Updated Branches: refs/heads/master fe65361b0 -> 7beb375bf
[SPARK-22861][SQL] SQLAppStatusListener handles multi-job executions. When one execution has multiple jobs, we need to append to the set of stages, not replace them on every job. Added a unit test and ran existing tests on Jenkins. Author: Imran Rashid <[email protected]> Closes #20047 from squito/SPARK-22861. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7beb375b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7beb375b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7beb375b Branch: refs/heads/master Commit: 7beb375bf4e8400f830a7fc7ff414634dd6efc78 Parents: fe65361 Author: Imran Rashid <[email protected]> Authored: Thu Dec 21 15:37:55 2017 -0800 Committer: Marcelo Vanzin <[email protected]> Committed: Thu Dec 21 15:37:55 2017 -0800 ---------------------------------------------------------------------- .../sql/execution/ui/SQLAppStatusListener.scala | 2 +- .../ui/SQLAppStatusListenerSuite.scala | 43 ++++++++++++++++++++ 2 files changed, 44 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/7beb375b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala index aa78fa0..2295b8d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala @@ -87,7 +87,7 @@ class SQLAppStatusListener( } exec.jobs = exec.jobs + (jobId -> JobExecutionStatus.RUNNING) - exec.stages = event.stageIds.toSet + exec.stages ++= event.stageIds.toSet update(exec) } 
http://git-wip-us.apache.org/repos/asf/spark/blob/7beb375b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala index 5ebbeb4..7d84f45 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala @@ -383,6 +383,49 @@ class SQLAppStatusListenerSuite extends SparkFunSuite with SharedSQLContext with assertJobs(statusStore.execution(executionId), failed = Seq(0)) } + test("handle one execution with multiple jobs") { + val statusStore = createStatusStore() + val listener = statusStore.listener.get + + val executionId = 0 + val df = createTestDataFrame + listener.onOtherEvent(SparkListenerSQLExecutionStart( + executionId, + "test", + "test", + df.queryExecution.toString, + SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), + System.currentTimeMillis())) + + var stageId = 0 + def twoStageJob(jobId: Int): Unit = { + val stages = Seq(stageId, stageId + 1).map { id => createStageInfo(id, 0)} + stageId += 2 + listener.onJobStart(SparkListenerJobStart( + jobId = jobId, + time = System.currentTimeMillis(), + stageInfos = stages, + createProperties(executionId))) + stages.foreach { s => + listener.onStageSubmitted(SparkListenerStageSubmitted(s)) + listener.onStageCompleted(SparkListenerStageCompleted(s)) + } + listener.onJobEnd(SparkListenerJobEnd( + jobId = jobId, + time = System.currentTimeMillis(), + JobSucceeded + )) + } + // submit two jobs with the same executionId + twoStageJob(0) + twoStageJob(1) + listener.onOtherEvent(SparkListenerSQLExecutionEnd( + executionId, System.currentTimeMillis())) + + 
assertJobs(statusStore.execution(0), completed = 0 to 1) + assert(statusStore.execution(0).get.stages === (0 to 3).toSet) + } + test("SPARK-11126: no memory leak when running non SQL jobs") { val listener = spark.sharedState.statusStore.listener.get // At the beginning of this test case, there should be no live data in the listener. --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
