This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new ff26b602c92b [SPARK-45798][CONNECT] Followup: add serverSessionId to SessionHolderInfo
ff26b602c92b is described below

commit ff26b602c92b920867fadf90eb6157ea291e14b8
Author: Juliusz Sompolski <ju...@databricks.com>
AuthorDate: Wed Dec 13 12:16:21 2023 -0800

    [SPARK-45798][CONNECT] Followup: add serverSessionId to SessionHolderInfo

    ### What changes were proposed in this pull request?

    Small followup to https://github.com/apache/spark/pull/43664 - add serverSessionId to SessionHolderInfo.

    ### Why are the changes needed?

    SessionHolderInfo should contain this kind of information about the session.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    NA

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

    Closes #44334 from juliuszsompolski/SPARK-45798-fup.

    Authored-by: Juliusz Sompolski <ju...@databricks.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../apache/spark/sql/connect/service/SessionHolder.scala | 16 ++++++++++++----
 1 file changed, 12 insertions(+), 4 deletions(-)

diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala
index 0fdf55ff42a0..f097f2db5889 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql.connect.planner.PythonStreamingQueryListener
 import org.apache.spark.sql.connect.planner.StreamingForeachBatchHelper
 import org.apache.spark.sql.connect.service.SessionHolder.{ERROR_CACHE_SIZE, ERROR_CACHE_TIMEOUT_SEC}
 import org.apache.spark.sql.streaming.StreamingQueryListener
-import org.apache.spark.util.SystemClock
+import org.apache.spark.util.{SystemClock, Utils}
 
 // Unique key identifying session by combination of user, and session id
 case class SessionKey(userId: String, sessionId: String)
@@ -95,8 +95,14 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
   // Returns the server side session ID and asserts that it must be different from the client-side
   // session ID.
   def serverSessionId: String = {
-    assert(session.sessionUUID != sessionId)
-    session.sessionUUID
+    if (Utils.isTesting && session == null) {
+      // Testing-only: Some sessions created by SessionHolder.forTesting are not fully initialized
+      // and don't have an underlying SparkSession.
+      ""
+    } else {
+      assert(session.sessionUUID != sessionId)
+      session.sessionUUID
+    }
   }
 
   /**
@@ -238,7 +244,7 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
     logInfo(s"Closing session with userId: $userId and sessionId: $sessionId")
     closedTimeMs = Some(System.currentTimeMillis())
 
-    if (eventManager.status == SessionStatus.Pending) {
+    if (Utils.isTesting && eventManager.status == SessionStatus.Pending) {
       // Testing-only: Some sessions created by SessionHolder.forTesting are not fully initialized
       // and can't be closed.
       return
@@ -288,6 +294,7 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
     SessionHolderInfo(
       userId = userId,
       sessionId = sessionId,
+      serverSessionId = serverSessionId,
       status = eventManager.status,
       startTimeMs = startTimeMs,
       lastAccessTimeMs = lastAccessTimeMs,
@@ -390,6 +397,7 @@ object SessionHolder {
 case class SessionHolderInfo(
     userId: String,
     sessionId: String,
+    serverSessionId: String,
     status: SessionStatus,
     customInactiveTimeoutMs: Option[Long],
     startTimeMs: Long,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org