This is an automated email from the ASF dual-hosted git repository.
hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 3df41f5f211e [SPARK-55275] SQL State Coverage: IllegalStateException
3df41f5f211e is described below
commit 3df41f5f211e495103a1b934872a25683637944f
Author: Garland Zhang <[email protected]>
AuthorDate: Tue Mar 3 09:30:52 2026 -0400
[SPARK-55275] SQL State Coverage: IllegalStateException
### What changes were proposed in this pull request?
Update IllegalStateException => SparkIllegalStateException in spark connect
layer
### Why are the changes needed?
This makes errors raised in the Spark Connect layer more traceable, since each one now carries a dedicated error condition and SQL state.
### Does this PR introduce _any_ user-facing change?
Yes. Changes exception types to their Spark equivalents
### How was this patch tested?
Updated existing tests and added a new suite covering the error constructors
### Was this patch authored or co-authored using generative AI tooling?
Yes
Closes #54056 from garlandz-db/SPARK-55275.
Authored-by: Garland Zhang <[email protected]>
Signed-off-by: Herman van Hövell <[email protected]>
---
.../src/main/resources/error/error-conditions.json | 78 ++++++++
.../src/main/resources/error/error-states.json | 6 +
.../spark/sql/connect/IllegalStateErrors.scala | 123 +++++++++++++
.../execution/ExecuteResponseObserver.scala | 12 +-
.../connect/pipelines/PipelineEventSender.scala | 4 +-
.../planner/StreamingForeachBatchHelper.scala | 11 +-
.../planner/StreamingQueryListenerHelper.scala | 8 +-
.../sql/connect/service/ExecuteEventsManager.scala | 18 +-
.../sql/connect/service/SessionEventsManager.scala | 10 +-
.../spark/sql/connect/service/SessionHolder.scala | 9 +-
.../service/SparkConnectExecutionManager.scala | 3 +-
.../sql/connect/service/SparkConnectService.scala | 4 +-
.../sql/connect/IllegalStateErrorsSuite.scala | 203 +++++++++++++++++++++
.../pipelines/PipelineEventSenderSuite.scala | 4 +-
14 files changed, 456 insertions(+), 37 deletions(-)
diff --git a/common/utils/src/main/resources/error/error-conditions.json
b/common/utils/src/main/resources/error/error-conditions.json
index 63f57f0315c0..1c1f0a9eedf5 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -5516,6 +5516,84 @@
],
"sqlState" : "42601"
},
+ "SPARK_CONNECT_ILLEGAL_STATE" : {
+ "message" : [
+ "Spark Connect encountered an illegal state condition."
+ ],
+ "subClass" : {
+ "DATA_INTEGRITY_CURSOR_OUT_OF_BOUNDS" : {
+ "message" : [
+ "Cursor out of bounds: <cursor> exceeds batch size <batchSize>."
+ ]
+ },
+ "EXECUTION_STATE_EXECUTE_HOLDER_ALREADY_EXISTS" : {
+ "message" : [
+ "ExecuteHolder with opId=<operationId> already exists!"
+ ]
+ },
+ "EXECUTION_STATE_EXECUTE_HOLDER_ALREADY_EXISTS_GRAPH" : {
+ "message" : [
+ "Pipeline execution for graph ID <graphId> already exists. Stop the
existing execution before starting a new one."
+ ]
+ },
+ "EXECUTION_STATE_OPERATION_ORPHANED" : {
+ "message" : [
+ "Operation <key> has been orphaned."
+ ]
+ },
+ "SESSION_MANAGEMENT_SERVICE_NOT_STARTED" : {
+ "message" : [
+ "Attempting to stop the Spark Connect service that has not been
started."
+ ]
+ },
+ "SESSION_MANAGEMENT_SESSION_ALREADY_CLOSED" : {
+ "message" : [
+ "Session <key> is already closed."
+ ]
+ },
+ "STATE_CONSISTENCY_CLEANER_ALREADY_SET" : {
+ "message" : [
+ "Cleaner for query <queryKey> has already been set for session
<key>."
+ ]
+ },
+
"STATE_CONSISTENCY_EXECUTION_STATE_TRANSITION_INVALID_OPERATION_STATUS_MISMATCH"
: {
+ "message" : [
+ "operationId: <operationId> with status <currentStatus> is not
within statuses <validStatuses> for event <eventStatus>"
+ ]
+ },
+
"STATE_CONSISTENCY_EXECUTION_STATE_TRANSITION_INVALID_SESSION_NOT_STARTED" : {
+ "message" : [
+ "sessionId: <sessionId> with status <sessionStatus> is not Started
for event <eventStatus>"
+ ]
+ },
+ "STATE_CONSISTENCY_NO_BATCHES_AVAILABLE" : {
+ "message" : [
+ "No batches available. Invalid response: <response>."
+ ]
+ },
+ "STATE_CONSISTENCY_SESSION_STATE_TRANSITION_INVALID" : {
+ "message" : [
+ "sessionId: <sessionId> with status <fromState> is not within
statuses <validStates> for event <toState>"
+ ]
+ },
+ "STREAMING_QUERY_UNEXPECTED_RETURN_VALUE" : {
+ "message" : [
+ "Unexpected return value <value> from Python worker in <context> for
session <key>."
+ ]
+ },
+ "STREAM_LIFECYCLE_ALREADY_COMPLETED" : {
+ "message" : [
+ "Stream <operation> can't be called after stream completed"
+ ]
+ },
+ "STREAM_LIFECYCLE_EVENT_SEND_AFTER_SHUTDOWN" : {
+ "message" : [
+ "Cannot send event after shutdown for session <key>."
+ ]
+ }
+ },
+ "sqlState" : "XXSC0"
+ },
"SPARK_JOB_CANCELLED" : {
"message" : [
"Job <jobId> cancelled <reason>"
diff --git a/common/utils/src/main/resources/error/error-states.json
b/common/utils/src/main/resources/error/error-states.json
index 7b3050bd2266..c2b2bb2ed463 100644
--- a/common/utils/src/main/resources/error/error-states.json
+++ b/common/utils/src/main/resources/error/error-states.json
@@ -7530,6 +7530,12 @@
"standard": "N",
"usedBy": ["Spark"]
},
+ "XXSC0": {
+ "description": "Connect Server - Illegal State",
+ "origin": "Spark",
+ "standard": "N",
+ "usedBy": ["Spark"]
+ },
"XXKD0": {
"description": "Analysis - Bad plan",
"origin": "Databricks",
diff --git
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/IllegalStateErrors.scala
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/IllegalStateErrors.scala
new file mode 100644
index 000000000000..475b703c8292
--- /dev/null
+++
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/IllegalStateErrors.scala
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect
+
+import org.apache.spark.SparkIllegalStateException
+import org.apache.spark.sql.connect.service.{ExecuteStatus, SessionStatus}
+
+object IllegalStateErrors {
+
+ def streamLifecycleAlreadyCompleted(operation: String):
SparkIllegalStateException =
+ new SparkIllegalStateException(
+ errorClass =
"SPARK_CONNECT_ILLEGAL_STATE.STREAM_LIFECYCLE_ALREADY_COMPLETED",
+ messageParameters = Map("operation" -> operation))
+
+ def cursorOutOfBounds(cursor: Long, batchSize: Long):
SparkIllegalStateException =
+ new SparkIllegalStateException(
+ errorClass =
"SPARK_CONNECT_ILLEGAL_STATE.DATA_INTEGRITY_CURSOR_OUT_OF_BOUNDS",
+ messageParameters = Map("cursor" -> cursor.toString, "batchSize" ->
batchSize.toString))
+
+ def executionStateTransitionInvalidOperationStatus(
+ operationId: String,
+ currentStatus: ExecuteStatus,
+ validStatuses: List[ExecuteStatus],
+ eventStatus: ExecuteStatus): SparkIllegalStateException =
+ new SparkIllegalStateException(
+ errorClass = "SPARK_CONNECT_ILLEGAL_STATE." +
+
"STATE_CONSISTENCY_EXECUTION_STATE_TRANSITION_INVALID_OPERATION_STATUS_MISMATCH",
+ messageParameters = Map(
+ "operationId" -> operationId,
+ "currentStatus" -> currentStatus.toString,
+ "validStatuses" -> validStatuses.map(_.toString).mkString(", "),
+ "eventStatus" -> eventStatus.toString))
+
+ def executionStateTransitionInvalidSessionNotStarted(
+ sessionId: String,
+ sessionStatus: SessionStatus,
+ eventStatus: ExecuteStatus): SparkIllegalStateException =
+ new SparkIllegalStateException(
+ errorClass = "SPARK_CONNECT_ILLEGAL_STATE." +
+
"STATE_CONSISTENCY_EXECUTION_STATE_TRANSITION_INVALID_SESSION_NOT_STARTED",
+ messageParameters = Map(
+ "sessionId" -> sessionId,
+ "sessionStatus" -> sessionStatus.toString,
+ "eventStatus" -> eventStatus.toString))
+
+ def executeHolderAlreadyExists(operationId: String):
SparkIllegalStateException =
+ new SparkIllegalStateException(
+ errorClass =
"SPARK_CONNECT_ILLEGAL_STATE.EXECUTION_STATE_EXECUTE_HOLDER_ALREADY_EXISTS",
+ messageParameters = Map("operationId" -> operationId))
+
+ def executeHolderAlreadyExistsGraphId(graphId: String):
SparkIllegalStateException =
+ new SparkIllegalStateException(
+ errorClass = "SPARK_CONNECT_ILLEGAL_STATE." +
+ "EXECUTION_STATE_EXECUTE_HOLDER_ALREADY_EXISTS_GRAPH",
+ messageParameters = Map("graphId" -> graphId))
+
+ def sessionAlreadyClosed(sessionKey: String): SparkIllegalStateException =
+ new SparkIllegalStateException(
+ errorClass =
"SPARK_CONNECT_ILLEGAL_STATE.SESSION_MANAGEMENT_SESSION_ALREADY_CLOSED",
+ messageParameters = Map("key" -> sessionKey))
+
+ def operationOrphaned(executeKey: String): SparkIllegalStateException =
+ new SparkIllegalStateException(
+ errorClass =
"SPARK_CONNECT_ILLEGAL_STATE.EXECUTION_STATE_OPERATION_ORPHANED",
+ messageParameters = Map("key" -> executeKey))
+
+ def sessionStateTransitionInvalid(
+ sessionId: String,
+ fromState: SessionStatus,
+ toState: SessionStatus,
+ validStates: List[SessionStatus]): SparkIllegalStateException =
+ new SparkIllegalStateException(
+ errorClass =
+
"SPARK_CONNECT_ILLEGAL_STATE.STATE_CONSISTENCY_SESSION_STATE_TRANSITION_INVALID",
+ messageParameters = Map(
+ "sessionId" -> sessionId,
+ "fromState" -> fromState.toString,
+ "toState" -> toState.toString,
+ "validStates" -> validStates.map(_.toString).mkString(", ")))
+
+ def serviceNotStarted(): SparkIllegalStateException =
+ new SparkIllegalStateException(
+ errorClass =
"SPARK_CONNECT_ILLEGAL_STATE.SESSION_MANAGEMENT_SERVICE_NOT_STARTED",
+ messageParameters = Map.empty)
+
+ def streamingQueryUnexpectedReturnValue(
+ key: String,
+ value: Int,
+ context: String): SparkIllegalStateException =
+ new SparkIllegalStateException(
+ errorClass =
"SPARK_CONNECT_ILLEGAL_STATE.STREAMING_QUERY_UNEXPECTED_RETURN_VALUE",
+ messageParameters = Map("key" -> key, "value" -> value.toString,
"context" -> context))
+
+ def cleanerAlreadySet(key: String, queryKey: String):
SparkIllegalStateException =
+ new SparkIllegalStateException(
+ errorClass =
"SPARK_CONNECT_ILLEGAL_STATE.STATE_CONSISTENCY_CLEANER_ALREADY_SET",
+ messageParameters = Map("key" -> key, "queryKey" -> queryKey))
+
+ def eventSendAfterShutdown(key: String): SparkIllegalStateException =
+ new SparkIllegalStateException(
+ errorClass =
"SPARK_CONNECT_ILLEGAL_STATE.STREAM_LIFECYCLE_EVENT_SEND_AFTER_SHUTDOWN",
+ messageParameters = Map("key" -> key))
+
+ def noBatchesAvailable(response: String): SparkIllegalStateException =
+ new SparkIllegalStateException(
+ errorClass =
"SPARK_CONNECT_ILLEGAL_STATE.STATE_CONSISTENCY_NO_BATCHES_AVAILABLE",
+ messageParameters = Map("response" -> response))
+}
diff --git
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala
index 2473df0e53f1..9984b7c2ea5a 100644
---
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala
+++
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala
@@ -28,6 +28,7 @@ import org.apache.spark.{SparkEnv, SparkSQLException}
import org.apache.spark.connect.proto
import org.apache.spark.internal.Logging
import org.apache.spark.internal.LogKeys
+import org.apache.spark.sql.connect.IllegalStateErrors
import
org.apache.spark.sql.connect.config.Connect.CONNECT_EXECUTE_REATTACHABLE_OBSERVER_RETRY_BUFFER_SIZE
import org.apache.spark.sql.connect.service.ExecuteHolder
@@ -133,7 +134,7 @@ private[connect] class ExecuteResponseObserver[T <:
Message](val executeHolder:
def onNext(r: T): Unit = {
if (!tryOnNext(r)) {
- throw new IllegalStateException("Stream onNext can't be called after
stream completed")
+ throw IllegalStateErrors.streamLifecycleAlreadyCompleted("onNext")
}
}
@@ -142,14 +143,14 @@ private[connect] class ExecuteResponseObserver[T <:
Message](val executeHolder:
*/
def onNextComplete(r: T): Unit = responseLock.synchronized {
if (!tryOnNext(r)) {
- throw new IllegalStateException("Stream onNext can't be called after
stream completed")
+ throw IllegalStateErrors.streamLifecycleAlreadyCompleted("onNext")
}
onCompleted()
}
def onError(t: Throwable): Unit = responseLock.synchronized {
if (finalProducedIndex.nonEmpty) {
- throw new IllegalStateException("Stream onError can't be called after
stream completed")
+ throw IllegalStateErrors.streamLifecycleAlreadyCompleted("onError")
}
error = Some(t)
finalProducedIndex = Some(lastProducedIndex) // no responses to be send
after error.
@@ -161,7 +162,7 @@ private[connect] class ExecuteResponseObserver[T <:
Message](val executeHolder:
def onCompleted(): Unit = responseLock.synchronized {
if (finalProducedIndex.nonEmpty) {
- throw new IllegalStateException("Stream onCompleted can't be called
after stream completed")
+ throw IllegalStateErrors.streamLifecycleAlreadyCompleted("onCompleted")
}
finalProducedIndex = Some(lastProducedIndex)
logDebug(
@@ -203,8 +204,7 @@ private[connect] class ExecuteResponseObserver[T <:
Message](val executeHolder:
messageParameters = Map("index" -> index.toString, "responseId" ->
responseId))
} else if (getLastResponseIndex().exists(index > _)) {
// If index > lastIndex, it's out of bounds. This is an internal error.
- throw new IllegalStateException(
- s"Cursor position $index is beyond last index
${getLastResponseIndex()}.")
+ throw IllegalStateErrors.cursorOutOfBounds(index,
getLastResponseIndex().get)
}
ret
}
diff --git
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelineEventSender.scala
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelineEventSender.scala
index 5ea8c6f70312..79dd07b6a658 100644
---
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelineEventSender.scala
+++
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelineEventSender.scala
@@ -28,6 +28,7 @@ import io.grpc.stub.StreamObserver
import org.apache.spark.connect.proto
import org.apache.spark.connect.proto.ExecutePlanResponse
import org.apache.spark.internal.{Logging, LogKeys}
+import org.apache.spark.sql.connect.IllegalStateErrors
import org.apache.spark.sql.connect.service.SessionHolder
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.pipelines.common.FlowStatus
@@ -86,8 +87,7 @@ class PipelineEventSender(
})
}
} else {
- throw new IllegalStateException(
- s"Cannot send event after shutdown for session
${sessionHolder.sessionId}")
+ throw
IllegalStateErrors.eventSendAfterShutdown(sessionHolder.key.toString)
}
}
diff --git
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala
index 72662d2cb048..97da96894e2a 100644
---
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala
+++
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala
@@ -30,6 +30,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.internal.LogKeys.{DATAFRAME_ID, PYTHON_EXEC, QUERY_ID,
RUN_ID_STRING, SESSION_ID, USER_ID}
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.catalyst.encoders.{AgnosticEncoder,
AgnosticEncoders}
+import org.apache.spark.sql.connect.IllegalStateErrors
import org.apache.spark.sql.connect.common.ForeachWriterPacket
import org.apache.spark.sql.connect.config.Connect
import org.apache.spark.sql.connect.service.SessionHolder
@@ -184,10 +185,10 @@ object StreamingForeachBatchHelper extends Logging {
errorClass = "PYTHON_EXCEPTION",
messageParameters = Map("msg" -> msg, "traceback" -> traceback))
case otherValue =>
- throw new IllegalStateException(
- s"[session: ${sessionHolder.sessionId}] [userId:
${sessionHolder.userId}] " +
- s"Unexpected return value $otherValue from the " +
- s"Python worker.")
+ throw IllegalStateErrors.streamingQueryUnexpectedReturnValue(
+ sessionHolder.key.toString,
+ otherValue,
+ "foreachBatch function")
}
} catch {
// TODO: Better handling (e.g. retries) on exceptions like
EOFException to avoid
@@ -233,7 +234,7 @@ object StreamingForeachBatchHelper extends Logging {
Option(cleanerCache.putIfAbsent(key, cleaner)) match {
case Some(_) =>
- throw new IllegalStateException(s"Unexpected: a cleaner for query
$key is already set")
+ throw
IllegalStateErrors.cleanerAlreadySet(sessionHolder.key.toString, key.toString)
case None => // Inserted. Normal.
}
}
diff --git
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingQueryListenerHelper.scala
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingQueryListenerHelper.scala
index faab81778482..1a48cd094cbd 100644
---
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingQueryListenerHelper.scala
+++
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingQueryListenerHelper.scala
@@ -23,6 +23,7 @@ import org.apache.spark.SparkException
import org.apache.spark.api.python.{PythonException, PythonWorkerUtils,
SimplePythonFunction, SpecialLengths, StreamingPythonRunner}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.LogKeys.FUNCTION_NAME
+import org.apache.spark.sql.connect.IllegalStateErrors
import org.apache.spark.sql.connect.config.Connect
import org.apache.spark.sql.connect.service.{SessionHolder,
SparkConnectService}
import org.apache.spark.sql.streaming.StreamingQueryListener
@@ -98,9 +99,10 @@ class PythonStreamingQueryListener(listener:
SimplePythonFunction, sessionHolder
errorClass = "PYTHON_EXCEPTION",
messageParameters = Map("msg" -> msg, "traceback" -> traceback))
case otherValue =>
- throw new IllegalStateException(
- s"Unexpected return value $otherValue from the " +
- s"Python worker.")
+ throw IllegalStateErrors.streamingQueryUnexpectedReturnValue(
+ sessionHolder.key.toString,
+ otherValue,
+ s"streaming query listener function $functionName")
}
} catch {
case eof: EOFException =>
diff --git
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteEventsManager.scala
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteEventsManager.scala
index 351be8875ba1..fcf01d5d29ab 100644
---
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteEventsManager.scala
+++
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteEventsManager.scala
@@ -24,6 +24,7 @@ import org.apache.spark.connect.proto
import org.apache.spark.scheduler.SparkListenerEvent
import org.apache.spark.sql.catalyst.{QueryPlanningTracker,
QueryPlanningTrackerCallback}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.connect.IllegalStateErrors
import org.apache.spark.sql.connect.common.ProtoUtils
import org.apache.spark.util.{Clock, Utils}
@@ -350,16 +351,17 @@ case class ExecuteEventsManager(executeHolder:
ExecuteHolder, clock: Clock) {
validStatuses: List[ExecuteStatus],
eventStatus: ExecuteStatus): Unit = {
if (validStatuses.find(s => s == status).isEmpty) {
- throw new IllegalStateException(s"""
- operationId: $operationId with status ${status}
- is not within statuses $validStatuses for event $eventStatus
- """)
+ throw IllegalStateErrors.executionStateTransitionInvalidOperationStatus(
+ executeHolder.operationId,
+ status,
+ validStatuses,
+ eventStatus)
}
if (sessionHolder.eventManager.status != SessionStatus.Started) {
- throw new IllegalStateException(s"""
- sessionId: $sessionId with status $sessionStatus
- is not Started for event $eventStatus
- """)
+ throw
IllegalStateErrors.executionStateTransitionInvalidSessionNotStarted(
+ sessionHolder.sessionId,
+ sessionStatus,
+ eventStatus)
}
_status = eventStatus
}
diff --git
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionEventsManager.scala
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionEventsManager.scala
index 0a466594fad7..20abc2fbf335 100644
---
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionEventsManager.scala
+++
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionEventsManager.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.connect.service
import org.apache.spark.scheduler.SparkListenerEvent
+import org.apache.spark.sql.connect.IllegalStateErrors
import org.apache.spark.util.{Clock}
sealed abstract class SessionStatus(value: Int)
@@ -82,10 +83,11 @@ case class SessionEventsManager(sessionHolder:
SessionHolder, clock: Clock) {
validStatuses: List[SessionStatus],
eventStatus: SessionStatus): Unit = {
if (validStatuses.find(s => s == status).isEmpty) {
- throw new IllegalStateException(s"""
- sessionId: $sessionId with status ${status}
- is not within statuses $validStatuses for event $eventStatus
- """)
+ throw IllegalStateErrors.sessionStateTransitionInvalid(
+ sessionHolder.sessionId,
+ status,
+ eventStatus,
+ validStatuses)
}
_status = eventStatus
}
diff --git
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala
index 912543ac13dd..e5f9b7fe85f1 100644
---
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala
+++
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala
@@ -35,6 +35,7 @@ import org.apache.spark.internal.{Logging, LogKeys}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.classic.SparkSession
+import org.apache.spark.sql.connect.IllegalStateErrors
import org.apache.spark.sql.connect.common.InvalidPlanInput
import org.apache.spark.sql.connect.config.Connect
import org.apache.spark.sql.connect.ml.MLCache
@@ -175,7 +176,7 @@ case class SessionHolder(userId: String, sessionId: String,
session: SparkSessio
activeOperationIds.synchronized {
if (activeOperationIds.contains(operationId)) {
- throw new IllegalStateException(s"ExecuteHolder with
opId=${operationId} already exists!")
+ throw IllegalStateErrors.executeHolderAlreadyExists(operationId)
}
activeOperationIds.add(operationId)
}
@@ -368,7 +369,7 @@ case class SessionHolder(userId: String, sessionId: String,
session: SparkSessio
// called only once, since removing the session from
SparkConnectSessionManager.sessionStore is
// synchronized and guaranteed to happen only once.
if (closedTimeMs.isDefined) {
- throw new IllegalStateException(s"Session $key is already closed.")
+ throw IllegalStateErrors.sessionAlreadyClosed(key.toString)
}
logInfo(
log"Closing session with userId: ${MDC(LogKeys.USER_ID, userId)} and " +
@@ -532,9 +533,7 @@ case class SessionHolder(userId: String, sessionId: String,
session: SparkSessio
graphId,
(_, existing) => {
if (Option(existing).isDefined) {
- throw new IllegalStateException(
- s"Pipeline execution for graph ID $graphId already exists. " +
- s"Stop the existing execution before starting a new one.")
+ throw IllegalStateErrors.executeHolderAlreadyExistsGraphId(graphId)
}
pipelineUpdateContext
diff --git
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala
index 768c6a858188..bb51438ce90f 100644
---
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala
+++
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala
@@ -32,6 +32,7 @@ import org.apache.spark.{SparkEnv, SparkSQLException}
import org.apache.spark.connect.proto
import org.apache.spark.internal.{Logging, LogKeys}
import org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_MILLIS
+import org.apache.spark.sql.connect.IllegalStateErrors
import
org.apache.spark.sql.connect.config.Connect.{CONNECT_EXECUTE_MANAGER_ABANDONED_TOMBSTONES_SIZE,
CONNECT_EXECUTE_MANAGER_DETACHED_TIMEOUT,
CONNECT_EXECUTE_MANAGER_MAINTENANCE_INTERVAL}
import org.apache.spark.sql.connect.execution.ExecuteGrpcResponseSender
import org.apache.spark.util.ThreadUtils
@@ -225,7 +226,7 @@ private[connect] class SparkConnectExecutionManager()
extends Logging {
} else if (executeHolder.isOrphan()) {
logWarning(log"Reattach to an orphan operation.")
removeExecuteHolder(executeHolder.key)
- throw new IllegalStateException("Operation was orphaned because of an
internal error.")
+ throw IllegalStateErrors.operationOrphaned(executeHolder.key.toString)
}
val responseSender =
diff --git
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
index c14c21bd6ccb..f2dd9c1b1cef 100644
---
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
+++
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
@@ -40,6 +40,7 @@ import org.apache.spark.internal.config.UI.UI_ENABLED
import org.apache.spark.scheduler.{LiveListenerBus, SparkListenerEvent}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.classic.ClassicConversions._
+import org.apache.spark.sql.connect.IllegalStateErrors
import org.apache.spark.sql.connect.config.Connect.{getAuthenticateToken,
CONNECT_GRPC_BINDING_ADDRESS, CONNECT_GRPC_BINDING_PORT,
CONNECT_GRPC_MARSHALLER_RECURSION_LIMIT, CONNECT_GRPC_MAX_INBOUND_MESSAGE_SIZE,
CONNECT_GRPC_PORT_MAX_RETRIES}
import org.apache.spark.sql.connect.execution.ConnectProgressExecutionListener
import org.apache.spark.sql.connect.ui.{SparkConnectServerAppStatusStore,
SparkConnectServerListener, SparkConnectServerTab}
@@ -468,8 +469,7 @@ object SparkConnectService extends Logging {
}
if (!started) {
- throw new IllegalStateException(
- "Attempting to stop the Spark Connect service that has not been
started.")
+ throw IllegalStateErrors.serviceNotStarted()
}
if (server != null) {
diff --git
a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/IllegalStateErrorsSuite.scala
b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/IllegalStateErrorsSuite.scala
new file mode 100644
index 000000000000..2708a0a3fe2b
--- /dev/null
+++
b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/IllegalStateErrorsSuite.scala
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.connect.service.{ExecuteStatus, SessionStatus}
+
+class IllegalStateErrorsSuite extends SparkFunSuite {
+
+ test("streamLifecycleAlreadyCompleted should construct error correctly") {
+ val operation = "next"
+ val error = IllegalStateErrors.streamLifecycleAlreadyCompleted(operation)
+ assert(error.isInstanceOf[org.apache.spark.SparkIllegalStateException])
+ assert(error.getCondition.contains("STREAM_LIFECYCLE_ALREADY_COMPLETED"))
+ assert(error.getMessage.contains(operation))
+ }
+
+ test("cursorOutOfBounds should construct error correctly") {
+ val cursor = 15L
+ val batchSize = 10L
+ val error = IllegalStateErrors.cursorOutOfBounds(cursor, batchSize)
+ assert(error.getCondition.contains("DATA_INTEGRITY_CURSOR_OUT_OF_BOUNDS"))
+ assert(error.getMessage.contains(cursor.toString))
+ assert(error.getMessage.contains(batchSize.toString))
+ }
+
+ test("executionStateTransitionInvalidOperationStatus should construct error
correctly") {
+ val operationId = "op-123"
+ val currentStatus = ExecuteStatus.Pending
+ val validStatuses = List(ExecuteStatus.Started, ExecuteStatus.Finished)
+ val eventStatus = ExecuteStatus.Analyzed
+ val error =
IllegalStateErrors.executionStateTransitionInvalidOperationStatus(
+ operationId,
+ currentStatus,
+ validStatuses,
+ eventStatus)
+ val expectedCondition = "SPARK_CONNECT_ILLEGAL_STATE." +
+
"STATE_CONSISTENCY_EXECUTION_STATE_TRANSITION_INVALID_OPERATION_STATUS_MISMATCH"
+ assert(error.getCondition.contains(expectedCondition))
+ assert(error.getMessage.contains(operationId))
+ assert(error.getMessage.contains(currentStatus.toString))
+ }
+
+ test("executionStateTransitionInvalidSessionNotStarted should construct
error correctly") {
+ val sessionId = "session-456"
+ val sessionStatus = SessionStatus.Pending
+ val eventStatus = ExecuteStatus.Started
+ val error =
IllegalStateErrors.executionStateTransitionInvalidSessionNotStarted(
+ sessionId,
+ sessionStatus,
+ eventStatus)
+ val expectedCondition = "SPARK_CONNECT_ILLEGAL_STATE." +
+
"STATE_CONSISTENCY_EXECUTION_STATE_TRANSITION_INVALID_SESSION_NOT_STARTED"
+ assert(error.getCondition.contains(expectedCondition))
+ assert(error.getMessage.contains(sessionId))
+ assert(error.getMessage.contains(sessionStatus.toString))
+ }
+
+ test("executeHolderAlreadyExists should construct error correctly") {
+ val operationId = "op-789"
+ val error = IllegalStateErrors.executeHolderAlreadyExists(operationId)
+
assert(error.getCondition.contains("EXECUTION_STATE_EXECUTE_HOLDER_ALREADY_EXISTS"))
+ assert(error.getMessage.contains(operationId))
+ }
+
+ test("executeHolderAlreadyExistsGraphId should construct error correctly") {
+ val graphId = "graph-123"
+ val error = IllegalStateErrors.executeHolderAlreadyExistsGraphId(graphId)
+ val expectedCondition =
+ "EXECUTION_STATE_EXECUTE_HOLDER_ALREADY_EXISTS_GRAPH"
+ assert(error.getCondition.contains(expectedCondition))
+ assert(error.getMessage.contains(graphId))
+ }
+
+ test("sessionAlreadyClosed should construct error correctly") {
+ val sessionKey = "session-key-456"
+ val error = IllegalStateErrors.sessionAlreadyClosed(sessionKey)
+
assert(error.getCondition.contains("SESSION_MANAGEMENT_SESSION_ALREADY_CLOSED"))
+ assert(error.getMessage.contains(sessionKey))
+ }
+
+ test("operationOrphaned should construct error correctly") {
+ val executeKey = "execute-key-789"
+ val error = IllegalStateErrors.operationOrphaned(executeKey)
+ assert(error.getCondition.contains("EXECUTION_STATE_OPERATION_ORPHANED"))
+ assert(error.getMessage.contains(executeKey))
+ }
+
+ test("sessionStateTransitionInvalid should construct error correctly") {
+ val sessionId = "session-111"
+ val fromState = SessionStatus.Started
+ val toState = SessionStatus.Closed
+ val validStates = List(SessionStatus.Pending, SessionStatus.Started)
+ val error =
+ IllegalStateErrors.sessionStateTransitionInvalid(sessionId, fromState,
toState, validStates)
+ val expectedCondition =
+ "STATE_CONSISTENCY_SESSION_STATE_TRANSITION_INVALID"
+ assert(error.getCondition.contains(expectedCondition))
+ assert(error.getMessage.contains(sessionId))
+ assert(error.getMessage.contains(fromState.toString))
+ assert(error.getMessage.contains(toState.toString))
+ }
+
+ test("serviceNotStarted should construct error correctly") {
+ val error = IllegalStateErrors.serviceNotStarted()
+ assert(error.getCondition.contains("SESSION_MANAGEMENT_SERVICE_NOT_STARTED"))
+ }
+
+ test("streamingQueryUnexpectedReturnValue should construct error correctly") {
+ val key = "query-key-123"
+ val value = 42
+ val context = "test-context"
+ val error = IllegalStateErrors.streamingQueryUnexpectedReturnValue(key, value, context)
+ assert(error.getCondition.contains("STREAMING_QUERY_UNEXPECTED_RETURN_VALUE"))
+ assert(error.getMessage.contains(key))
+ assert(error.getMessage.contains(value.toString))
+ assert(error.getMessage.contains(context))
+ }
+
+ test("cleanerAlreadySet should construct error correctly") {
+ val key = "session-key-222"
+ val queryKey = "query-key-333"
+ val error = IllegalStateErrors.cleanerAlreadySet(key, queryKey)
+ assert(error.getCondition.contains("STATE_CONSISTENCY_CLEANER_ALREADY_SET"))
+ assert(error.getMessage.contains(key))
+ assert(error.getMessage.contains(queryKey))
+ }
+
+ test("eventSendAfterShutdown should construct error correctly") {
+ val key = "session-key-444"
+ val error = IllegalStateErrors.eventSendAfterShutdown(key)
+ assert(error.getCondition.contains("STREAM_LIFECYCLE_EVENT_SEND_AFTER_SHUTDOWN"))
+ assert(error.getMessage.contains(key))
+ }
+
+ test("noBatchesAvailable should construct error correctly") {
+ val response = "empty-response"
+ val error = IllegalStateErrors.noBatchesAvailable(response)
+ assert(error.getCondition.contains("STATE_CONSISTENCY_NO_BATCHES_AVAILABLE"))
+ assert(error.getMessage.contains(response))
+ }
+
+ test("error messages should handle special characters correctly") {
+ val operation = "operation with spaces and special chars: <>&\""
+ val error = IllegalStateErrors.streamLifecycleAlreadyCompleted(operation)
+ assert(error.getMessage.contains(operation))
+ }
+
+ test("error messages should handle empty strings") {
+ val emptyString = ""
+ val error = IllegalStateErrors.streamLifecycleAlreadyCompleted(emptyString)
+ assert(error.isInstanceOf[org.apache.spark.SparkIllegalStateException])
+ }
+
+ test("all errors should be SparkIllegalStateException instances") {
+ val errors = Seq(
+ IllegalStateErrors.streamLifecycleAlreadyCompleted("op"),
+ IllegalStateErrors.cursorOutOfBounds(1L, 2L),
+ IllegalStateErrors.executionStateTransitionInvalidOperationStatus(
+ "op",
+ ExecuteStatus.Pending,
+ List(ExecuteStatus.Started),
+ ExecuteStatus.Analyzed),
+ IllegalStateErrors.executionStateTransitionInvalidSessionNotStarted(
+ "session",
+ SessionStatus.Pending,
+ ExecuteStatus.Started),
+ IllegalStateErrors.executeHolderAlreadyExists("op"),
+ IllegalStateErrors.executeHolderAlreadyExistsGraphId("graph"),
+ IllegalStateErrors.sessionAlreadyClosed("key"),
+ IllegalStateErrors.operationOrphaned("key"),
+ IllegalStateErrors.sessionStateTransitionInvalid(
+ "session",
+ SessionStatus.Started,
+ SessionStatus.Closed,
+ List(SessionStatus.Pending)),
+ IllegalStateErrors.serviceNotStarted(),
+ IllegalStateErrors.streamingQueryUnexpectedReturnValue("key", 123, "context"),
+ IllegalStateErrors.cleanerAlreadySet("key", "queryKey"),
+ IllegalStateErrors.eventSendAfterShutdown("key"),
+ IllegalStateErrors.noBatchesAvailable("response"))
+
+ errors.foreach { error =>
+ assert(error.isInstanceOf[org.apache.spark.SparkIllegalStateException])
+ assert(error.getCondition.contains("SPARK_CONNECT_ILLEGAL_STATE"))
+ }
+ }
+}
diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PipelineEventSenderSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PipelineEventSenderSuite.scala
index fc3cce0f7459..d6942132b908 100644
--- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PipelineEventSenderSuite.scala
+++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PipelineEventSenderSuite.scala
@@ -26,7 +26,7 @@ import org.scalatestplus.mockito.MockitoSugar
import org.apache.spark.connect.proto.ExecutePlanResponse
import org.apache.spark.sql.classic.{RuntimeConfig, SparkSession}
-import org.apache.spark.sql.connect.service.SessionHolder
+import org.apache.spark.sql.connect.service.{SessionHolder, SessionKey}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.pipelines.common.FlowStatus
import org.apache.spark.sql.pipelines.common.RunState.{COMPLETED, RUNNING}
@@ -39,7 +39,9 @@ class PipelineEventSenderSuite extends SparkDeclarativePipelinesServerTest with
val mockObserver = mock[StreamObserver[ExecutePlanResponse]]
val mockSessionHolder = mock[SessionHolder]
when(mockSessionHolder.sessionId).thenReturn("test-session-id")
+ when(mockSessionHolder.userId).thenReturn("test-user-id")
when(mockSessionHolder.serverSessionId).thenReturn("test-server-session-id")
+ when(mockSessionHolder.key).thenReturn(SessionKey("test-user-id", "test-session-id"))
val mockSession = mock[SparkSession]
val mockConf = mock[RuntimeConfig]
when(mockSessionHolder.session).thenReturn(mockSession)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]