This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new a40e46fe6dc [SPARK-44591][CONNECT][WEBUI] Use jobTags in SparkListenerSQLExecutionStart to link SQL Execution IDs for Spark UI Connect page
a40e46fe6dc is described below

commit a40e46fe6dc35226b27335bb1431583f455f1e58
Author: Jason Li <jason...@databricks.com>
AuthorDate: Tue Aug 1 00:15:09 2023 -0700

    [SPARK-44591][CONNECT][WEBUI] Use jobTags in SparkListenerSQLExecutionStart to link SQL Execution IDs for Spark UI Connect page

    ### What changes were proposed in this pull request?

    Use the jobTags in SparkListenerSQLExecutionStart to find the SQL execution that corresponds to a Spark Connect request, rather than reading SparkListenerJobStart.props.getProperty(SQLExecution.EXECUTION_ID_KEY), which does not work when the SQL execution never triggers a job.

    ### Why are the changes needed?

    This change handles cases where a SQL execution does not trigger any job, so the SQL execution ID cannot be retrieved from SparkListenerJobStart.

    ### Does this PR introduce _any_ user-facing change?

    ### How was this patch tested?

    Updated unit tests, plus local manual testing.

    Closes #42244 from jasonli-db/spark-connect-ui-sql-start.

    Authored-by: Jason Li <jason...@databricks.com>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../connect/ui/SparkConnectServerListener.scala    | 49 ++++++++++++--
 .../ui/SparkConnectServerListenerSuite.scala       | 74 ++++++++++++----------
 2 files changed, 83 insertions(+), 40 deletions(-)
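For readers skimming the patch: the key enabler is that `SparkListenerSQLExecutionStart` now carries the job tags of the operation that started the execution, so a listener can link a SQL execution ID to a Spark Connect operation without waiting for any job to start. Below is a minimal, self-contained sketch of that pattern, not the PR's exact code; `SqlExecIdIndex` and the map it keeps are illustrative names, while `jobTags`, `executionId`, and the `ExecuteJobTag` extractor are the pieces the patch actually relies on.

```scala
// Minimal sketch, assuming a Spark version where SparkListenerSQLExecutionStart
// carries jobTags (as in this patch). SqlExecIdIndex is a hypothetical name.
import scala.collection.mutable

import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.connect.service.ExecuteJobTag
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart

class SqlExecIdIndex extends SparkListener {
  // Connect job tag -> SQL execution IDs observed for that operation.
  private val sqlExecIdsByTag = mutable.Map.empty[String, Set[String]]

  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case e: SparkListenerSQLExecutionStart =>
      // The start event itself names the operation's job tags, so the link
      // is made even when the execution never launches a Spark job.
      val executeJobTagOpt = e.jobTags.find {
        case ExecuteJobTag(_) => true
        case _ => false
      }
      executeJobTagOpt.foreach { tag =>
        sqlExecIdsByTag(tag) =
          sqlExecIdsByTag.getOrElse(tag, Set.empty) + e.executionId.toString
      }
    case _ => // other events are irrelevant to this sketch
  }
}
```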
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerListener.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerListener.scala
index b40e847f404..90f9afebcb6 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerListener.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerListener.scala
@@ -26,7 +26,7 @@ import org.apache.spark.internal.config.Status.LIVE_ENTITY_UPDATE_PERIOD
 import org.apache.spark.scheduler._
 import org.apache.spark.sql.connect.config.Connect.{CONNECT_UI_SESSION_LIMIT, CONNECT_UI_STATEMENT_LIMIT}
 import org.apache.spark.sql.connect.service._
-import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart
 import org.apache.spark.status.{ElementTrackingStore, KVUtils, LiveEntity}
 
 private[connect] class SparkConnectServerListener(
@@ -80,12 +80,9 @@ private[connect] class SparkConnectServerListener(
     }
     val executeJobTag = executeJobTagOpt.get
     val exec = executionList.get(executeJobTag)
-    val executionIdOpt: Option[String] = Option(jobStart.properties)
-      .flatMap { p => Option(p.getProperty(SQLExecution.EXECUTION_ID_KEY)) }
     if (exec.nonEmpty) {
       exec.foreach { exec =>
         exec.jobId += jobStart.jobId.toString
-        executionIdOpt.foreach { execId => exec.sqlExecId += execId }
         updateLiveStore(exec)
       }
     } else {
@@ -105,8 +102,8 @@ private[connect] class SparkConnectServerListener(
           exec.userId,
           exec.operationId,
           exec.sparkSessionTags)
+        liveExec.sqlExecId = exec.sqlExecId
         liveExec.jobId += jobStart.jobId.toString
-        executionIdOpt.foreach { execId => exec.sqlExecId += execId }
         updateStoreWithTriggerEnabled(liveExec)
         executionList.remove(liveExec.jobTag)
       }
@@ -115,6 +112,7 @@ private[connect] class SparkConnectServerListener(
 
   override def onOtherEvent(event: SparkListenerEvent): Unit = {
     event match {
+      case e: SparkListenerSQLExecutionStart => onSQLExecutionStart(e)
       case e: SparkListenerConnectOperationStarted => onOperationStarted(e)
       case e: SparkListenerConnectOperationAnalyzed => onOperationAnalyzed(e)
       case e: SparkListenerConnectOperationReadyForExecution => onOperationReadyForExecution(e)
@@ -128,6 +126,45 @@ private[connect] class SparkConnectServerListener(
     }
   }
 
+  def onSQLExecutionStart(e: SparkListenerSQLExecutionStart): Unit = {
+    val executeJobTagOpt = e.jobTags.find {
+      case ExecuteJobTag(_) => true
+      case _ => false
+    }
+    if (executeJobTagOpt.isEmpty) {
+      return
+    }
+    val executeJobTag = executeJobTagOpt.get
+    val exec = executionList.get(executeJobTag)
+    if (exec.nonEmpty) {
+      exec.foreach { exec =>
+        exec.sqlExecId += e.executionId.toString
+        updateLiveStore(exec)
+      }
+    } else {
+      // This block guards against potential event re-ordering where a SQLExecutionStart
+      // event is processed after a ConnectOperationClosed event, in which case the Execution
+      // has already been evicted from the executionList.
+      val storeExecInfo =
+        KVUtils.viewToSeq(kvstore.view(classOf[ExecutionInfo]), Int.MaxValue)(exec =>
+          exec.jobTag == executeJobTag)
+      storeExecInfo.foreach { exec =>
+        val liveExec = getOrCreateExecution(
+          exec.jobTag,
+          exec.statement,
+          exec.sessionId,
+          exec.startTimestamp,
+          exec.userId,
+          exec.operationId,
+          exec.sparkSessionTags)
+        liveExec.jobId = exec.jobId
+        liveExec.sqlExecId += e.executionId.toString
+        updateStoreWithTriggerEnabled(liveExec)
+        executionList.remove(liveExec.jobTag)
+      }
+    }
+  }
+
   private def onOperationStarted(e: SparkListenerConnectOperationStarted) = synchronized {
     val executionData = getOrCreateExecution(
       e.jobTag,
@@ -326,7 +363,7 @@ private[connect] class LiveExecutionData(
   var closeTimestamp: Long = 0L
   var detail: String = ""
   var state: ExecutionState.Value = ExecutionState.STARTED
-  val jobId: ArrayBuffer[String] = ArrayBuffer[String]()
+  var jobId: ArrayBuffer[String] = ArrayBuffer[String]()
   var sqlExecId: mutable.Set[String] = mutable.Set[String]()
 
   override protected def doUpdate(): Any = {
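Under the same assumptions, the sketch above can be driven by a hand-built event, mirroring the updated test suite shown next; note that no `SparkListenerJobStart` is involved. The `null` arguments stand in for description/plan fields that this code path never inspects.

```scala
import org.apache.spark.sql.connect.service.ExecuteJobTag
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart

val jobTag = ExecuteJobTag("sessionId", "userId", "operationId")
val index = new SqlExecIdIndex() // hypothetical listener from the sketch above

index.onOtherEvent(
  SparkListenerSQLExecutionStart(
    0,                          // executionId
    None,                       // root execution ID
    null, null, null, null,     // description/details/plan fields, unused here
    System.currentTimeMillis(), // start time
    null,                       // modified configs, unused here
    Set(jobTag)))               // job tags carried by the event itself
```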
diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ui/SparkConnectServerListenerSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ui/SparkConnectServerListenerSuite.scala
index 9292e44f177..7cdc0135201 100644
--- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ui/SparkConnectServerListenerSuite.scala
+++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ui/SparkConnectServerListenerSuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark.internal.config.Status.{ASYNC_TRACKING_ENABLED, LIVE_ENT
 import org.apache.spark.scheduler.SparkListenerJobStart
 import org.apache.spark.sql.connect.config.Connect.{CONNECT_UI_SESSION_LIMIT, CONNECT_UI_STATEMENT_LIMIT}
 import org.apache.spark.sql.connect.service._
+import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart
 import org.apache.spark.status.ElementTrackingStore
 import org.apache.spark.util.kvstore.InMemoryStore
 
@@ -36,6 +37,8 @@ class SparkConnectServerListenerSuite
 
   private var kvstore: ElementTrackingStore = _
 
+  private val jobTag = ExecuteJobTag("sessionId", "userId", "operationId")
+
   after {
     if (kvstore != null) {
       kvstore.close()
@@ -47,12 +50,11 @@ class SparkConnectServerListenerSuite
     test(s"listener events should store successfully (live = $live)") {
       val (statusStore: SparkConnectServerAppStatusStore, listener: SparkConnectServerListener) =
         createAppStatusStore(live)
-
       listener.onOtherEvent(
         SparkListenerConnectSessionStarted("sessionId", "user", System.currentTimeMillis()))
       listener.onOtherEvent(
         SparkListenerConnectOperationStarted(
-          ExecuteJobTag("sessionId", "userId", "operationId"),
+          jobTag,
           "operationId",
           System.currentTimeMillis(),
           "sessionId",
@@ -62,22 +64,24 @@ class SparkConnectServerListenerSuite
           None,
           Set()))
       listener.onOtherEvent(
-        SparkListenerConnectOperationAnalyzed(
-          ExecuteJobTag("sessionId", "userId", "operationId"),
-          "operationId",
-          System.currentTimeMillis()))
+        SparkListenerConnectOperationAnalyzed(jobTag, "operationId", System.currentTimeMillis()))
+      listener.onOtherEvent(
+        SparkListenerSQLExecutionStart(
+          0,
+          None,
+          null,
+          null,
+          null,
+          null,
+          System.currentTimeMillis(),
+          null,
+          Set(jobTag)))
       listener.onJobStart(
         SparkListenerJobStart(0, System.currentTimeMillis(), Nil, createProperties))
       listener.onOtherEvent(
-        SparkListenerConnectOperationFinished(
-          ExecuteJobTag("sessionId", "userId", "operationId"),
-          "sessionId",
-          System.currentTimeMillis()))
+        SparkListenerConnectOperationFinished(jobTag, "sessionId", System.currentTimeMillis()))
       listener.onOtherEvent(
-        SparkListenerConnectOperationClosed(
-          ExecuteJobTag("sessionId", "userId", "operationId"),
-          "sessionId",
-          System.currentTimeMillis()))
+        SparkListenerConnectOperationClosed(jobTag, "sessionId", System.currentTimeMillis()))
 
       if (live) {
         assert(statusStore.getOnlineSessionNum === 1)
@@ -95,10 +99,11 @@ class SparkConnectServerListenerSuite
 
       val storeExecData = statusStore.getExecutionList.head
 
-      assert(storeExecData.jobTag === ExecuteJobTag("sessionId", "userId", "operationId"))
+      assert(storeExecData.jobTag === jobTag)
       assert(storeExecData.sessionId === "sessionId")
       assert(storeExecData.statement === "dummy query")
       assert(storeExecData.jobId === Seq("0"))
+      assert(storeExecData.sqlExecId === Set("0"))
       assert(listener.noLiveData())
     }
   }
@@ -132,15 +137,16 @@ class SparkConnectServerListenerSuite
     }
   }
 
-  test("update execution info when jobstart event come after execution end event") {
+  test(
+    "update execution info when event reordering causes job and sql" +
+      " start to come after operation closed") {
     val (statusStore: SparkConnectServerAppStatusStore, listener: SparkConnectServerListener) =
       createAppStatusStore(true)
-
     listener.onOtherEvent(
       SparkListenerConnectSessionStarted("sessionId", "userId", System.currentTimeMillis()))
     listener.onOtherEvent(
       SparkListenerConnectOperationStarted(
-        ExecuteJobTag("sessionId", "userId", "operationId"),
+        jobTag,
         "operationId",
         System.currentTimeMillis(),
         "sessionId",
@@ -150,21 +156,22 @@ class SparkConnectServerListenerSuite
         None,
         Set()))
     listener.onOtherEvent(
-      SparkListenerConnectOperationAnalyzed(
-        ExecuteJobTag("sessionId", "userId", "operationId"),
-        "operationId",
-        System.currentTimeMillis()))
+      SparkListenerConnectOperationAnalyzed(jobTag, "operationId", System.currentTimeMillis()))
     listener.onOtherEvent(
-      SparkListenerConnectOperationFinished(
-        ExecuteJobTag("sessionId", "userId", "operationId"),
-        "operationId",
-        System.currentTimeMillis()))
+      SparkListenerConnectOperationFinished(jobTag, "operationId", System.currentTimeMillis()))
     listener.onOtherEvent(
-      SparkListenerConnectOperationClosed(
-        ExecuteJobTag("sessionId", "userId", "operationId"),
-        "operationId",
-        System.currentTimeMillis()))
-
+      SparkListenerConnectOperationClosed(jobTag, "operationId", System.currentTimeMillis()))
+    listener.onOtherEvent(
+      SparkListenerSQLExecutionStart(
+        0,
+        None,
+        null,
+        null,
+        null,
+        null,
+        System.currentTimeMillis(),
+        null,
+        Set(jobTag)))
     listener.onJobStart(
       SparkListenerJobStart(0, System.currentTimeMillis(), Nil, createProperties))
     listener.onOtherEvent(
@@ -172,6 +179,7 @@ class SparkConnectServerListenerSuite
     val exec = statusStore.getExecution(ExecuteJobTag("sessionId", "userId", "operationId"))
     assert(exec.isDefined)
     assert(exec.get.jobId === Seq("0"))
+    assert(exec.get.sqlExecId === Set("0"))
     assert(listener.noLiveData())
   }
 
@@ -207,9 +215,7 @@ class SparkConnectServerListenerSuite
 
   private def createProperties: Properties = {
     val properties = new Properties()
-    properties.setProperty(
-      SparkContext.SPARK_JOB_TAGS,
-      ExecuteJobTag("sessionId", "userId", "operationId"))
+    properties.setProperty(SparkContext.SPARK_JOB_TAGS, jobTag)
     properties
   }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org