This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 59777222e72 [SPARK-45888][SS] Apply error class framework to State
(Metadata) Data Source
59777222e72 is described below
commit 59777222e726c63cbd9077a2c76f762e06f6a5b3
Author: Jungtaek Lim <[email protected]>
AuthorDate: Wed Dec 6 22:38:40 2023 +0900
[SPARK-45888][SS] Apply error class framework to State (Metadata) Data
Source
### What changes were proposed in this pull request?
This PR proposes to apply the error class framework to the new data sources,
State Data Source and State Metadata Data Source.
### Why are the changes needed?
The error class framework is the standard way to represent all exceptions in Spark.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Modified unit tests (StateDataSourceReadSuite and OperatorStateMetadataSuite) to check the new error classes.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #44025 from HeartSaVioR/SPARK-45888.
Authored-by: Jungtaek Lim <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
common/utils/src/main/resources/error/README.md | 1 +
.../src/main/resources/error/error-classes.json | 75 ++++++++++
...itions-stds-invalid-option-value-error-class.md | 40 ++++++
docs/sql-error-conditions.md | 60 ++++++++
.../datasources/v2/state/StateDataSource.scala | 33 +++--
.../v2/state/StateDataSourceErrors.scala | 160 +++++++++++++++++++++
.../datasources/v2/state/StateScanBuilder.scala | 3 +-
.../datasources/v2/state/StateTable.scala | 9 +-
.../StreamStreamJoinStatePartitionReader.scala | 2 +-
.../v2/state/metadata/StateMetadataSource.scala | 4 +-
.../v2/state/StateDataSourceReadSuite.scala | 33 +++--
.../state/OperatorStateMetadataSuite.scala | 6 +-
12 files changed, 389 insertions(+), 37 deletions(-)
diff --git a/common/utils/src/main/resources/error/README.md
b/common/utils/src/main/resources/error/README.md
index 556a634e992..b062c773907 100644
--- a/common/utils/src/main/resources/error/README.md
+++ b/common/utils/src/main/resources/error/README.md
@@ -636,6 +636,7 @@ The following SQLSTATEs are collated from:
|42613 |42 |Syntax Error or Access Rule Violation |613
|Clauses are mutually exclusive. |DB2 |N
|DB2
|
|42614 |42 |Syntax Error or Access Rule Violation |614 |A
duplicate keyword or clause is invalid. |DB2 |N
|DB2
|
|42615 |42 |Syntax Error or Access Rule Violation |615
|An invalid alternative was detected. |DB2 |N
|DB2
|
+|42616 |42 |Syntax Error or Access Rule Violation |616
|Invalid options specified |DB2 |N
|DB2
|
|42617 |42 |Syntax Error or Access Rule Violation |617
|The statement string is blank or empty. |DB2 |N
|DB2
|
|42618 |42 |Syntax Error or Access Rule Violation |618 |A
variable is not allowed. |DB2 |N
|DB2
|
|42620 |42 |Syntax Error or Access Rule Violation |620
|Read-only SCROLL was specified with the UPDATE clause. |DB2 |N
|DB2
|
diff --git a/common/utils/src/main/resources/error/error-classes.json
b/common/utils/src/main/resources/error/error-classes.json
index e54d346e1bc..7a672fa5e55 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -3066,6 +3066,81 @@
],
"sqlState" : "42713"
},
+ "STDS_COMMITTED_BATCH_UNAVAILABLE" : {
+ "message" : [
+ "No committed batch found, checkpoint location: <checkpointLocation>.
Ensure that the query has run and committed any microbatch before stopping."
+ ],
+ "sqlState" : "KD006"
+ },
+ "STDS_CONFLICT_OPTIONS" : {
+ "message" : [
+ "The options <options> cannot be specified together. Please specify the
one."
+ ],
+ "sqlState" : "42613"
+ },
+ "STDS_FAILED_TO_READ_STATE_SCHEMA" : {
+ "message" : [
+ "Failed to read the state schema. Either the file does not exist, or the
file is corrupted. options: <sourceOptions>.",
+ "Rerun the streaming query to construct the state schema, and report to
the corresponding communities or vendors if the error persists."
+ ],
+ "sqlState" : "42K03"
+ },
+ "STDS_INTERNAL_ERROR" : {
+ "message" : [
+ "Internal error: <message>",
+ "Please, report this bug to the corresponding communities or vendors,
and provide the full stack trace."
+ ],
+ "sqlState" : "XXKST"
+ },
+ "STDS_INVALID_OPTION_VALUE" : {
+ "message" : [
+ "Invalid value for source option '<optionName>':"
+ ],
+ "subClass" : {
+ "IS_EMPTY" : {
+ "message" : [
+ "cannot be empty."
+ ]
+ },
+ "IS_NEGATIVE" : {
+ "message" : [
+ "cannot be negative."
+ ]
+ },
+ "WITH_MESSAGE" : {
+ "message" : [
+ "<message>"
+ ]
+ }
+ },
+ "sqlState" : "42616"
+ },
+ "STDS_NO_PARTITION_DISCOVERED_IN_STATE_STORE" : {
+ "message" : [
+ "The state does not have any partition. Please double check that the
query points to the valid state. options: <sourceOptions>"
+ ],
+ "sqlState" : "KD006"
+ },
+ "STDS_OFFSET_LOG_UNAVAILABLE" : {
+ "message" : [
+ "The offset log for <batchId> does not exist, checkpoint location:
<checkpointLocation>.",
+ "Please specify the batch ID which is available for querying - you can
query the available batch IDs via using state metadata data source."
+ ],
+ "sqlState" : "KD006"
+ },
+ "STDS_OFFSET_METADATA_LOG_UNAVAILABLE" : {
+ "message" : [
+ "Metadata is not available for offset log for <batchId>, checkpoint
location: <checkpointLocation>.",
+ "The checkpoint seems to be only run with older Spark version(s). Run
the streaming query with the recent Spark version, so that Spark constructs the
state metadata."
+ ],
+ "sqlState" : "KD006"
+ },
+ "STDS_REQUIRED_OPTION_UNSPECIFIED" : {
+ "message" : [
+ "'<optionName>' must be specified."
+ ],
+ "sqlState" : "42601"
+ },
"STREAM_FAILED" : {
"message" : [
"Query [id = <id>, runId = <runId>] terminated with exception: <message>"
diff --git a/docs/sql-error-conditions-stds-invalid-option-value-error-class.md
b/docs/sql-error-conditions-stds-invalid-option-value-error-class.md
new file mode 100644
index 00000000000..ec0f15ed9f7
--- /dev/null
+++ b/docs/sql-error-conditions-stds-invalid-option-value-error-class.md
@@ -0,0 +1,40 @@
+---
+layout: global
+title: STDS_INVALID_OPTION_VALUE error class
+displayTitle: STDS_INVALID_OPTION_VALUE error class
+license: |
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+---
+
+[SQLSTATE:
42616](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
+
+Invalid value for source option '`<optionName>`':
+
+This error class has the following derived error classes:
+
+## IS_EMPTY
+
+cannot be empty.
+
+## IS_NEGATIVE
+
+cannot be negative.
+
+## WITH_MESSAGE
+
+`<message>`
+
+
diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md
index c9990d3856c..d97e2ceef4c 100644
--- a/docs/sql-error-conditions.md
+++ b/docs/sql-error-conditions.md
@@ -1930,6 +1930,66 @@ Star (*) is not allowed in a select list when GROUP BY
an ordinal position is us
Static partition column `<staticName>` is also specified in the column list.
+### STDS_COMMITTED_BATCH_UNAVAILABLE
+
+SQLSTATE: KD006
+
+No committed batch found, checkpoint location: `<checkpointLocation>`. Ensure
that the query has run and committed any microbatch before stopping.
+
+### STDS_CONFLICT_OPTIONS
+
+[SQLSTATE:
42613](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
+
+The options `<options>` cannot be specified together. Please specify the one.
+
+### STDS_FAILED_TO_READ_STATE_SCHEMA
+
+[SQLSTATE:
42K03](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
+
+Failed to read the state schema. Either the file does not exist, or the file
is corrupted. options: `<sourceOptions>`.
+Rerun the streaming query to construct the state schema, and report to the
corresponding communities or vendors if the error persists.
+
+### STDS_INTERNAL_ERROR
+
+[SQLSTATE: XXKST](sql-error-conditions-sqlstates.html#class-XX-internal-error)
+
+Internal error: `<message>`
+Please, report this bug to the corresponding communities or vendors, and
provide the full stack trace.
+
+###
[STDS_INVALID_OPTION_VALUE](sql-error-conditions-stds-invalid-option-value-error-class.html)
+
+[SQLSTATE:
42616](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
+
+Invalid value for source option '`<optionName>`':
+
+For more details see
[STDS_INVALID_OPTION_VALUE](sql-error-conditions-stds-invalid-option-value-error-class.html)
+
+### STDS_NO_PARTITION_DISCOVERED_IN_STATE_STORE
+
+SQLSTATE: KD006
+
+The state does not have any partition. Please double check that the query
points to the valid state. options: `<sourceOptions>`
+
+### STDS_OFFSET_LOG_UNAVAILABLE
+
+SQLSTATE: KD006
+
+The offset log for `<batchId>` does not exist, checkpoint location:
`<checkpointLocation>`.
+Please specify the batch ID which is available for querying - you can query
the available batch IDs via using state metadata data source.
+
+### STDS_OFFSET_METADATA_LOG_UNAVAILABLE
+
+SQLSTATE: KD006
+
+Metadata is not available for offset log for `<batchId>`, checkpoint location:
`<checkpointLocation>`.
+The checkpoint seems to be only run with older Spark version(s). Run the
streaming query with the recent Spark version, so that Spark constructs the
state metadata.
+
+### STDS_REQUIRED_OPTION_UNSPECIFIED
+
+[SQLSTATE:
42601](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
+
+'`<optionName>`' must be specified.
+
### STREAM_FAILED
[SQLSTATE: XXKST](sql-error-conditions-sqlstates.html#class-XX-internal-error)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala
index 55173a7e887..1192accaabe 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala
@@ -85,8 +85,7 @@ class StateDataSource extends TableProvider with
DataSourceRegister {
.add("value", valueSchema)
} catch {
case NonFatal(e) =>
- throw new IllegalArgumentException("Failed to read the state schema.
Either the file " +
- s"does not exist, or the file is corrupted. options:
$sourceOptions", e)
+ throw StateDataSourceErrors.failedToReadStateSchema(sourceOptions, e)
}
}
@@ -96,8 +95,7 @@ class StateDataSource extends TableProvider with
DataSourceRegister {
offsetLog.get(batchId) match {
case Some(value) =>
val metadata = value.metadata.getOrElse(
- throw new IllegalStateException(s"Metadata is not available for
offset log for " +
- s"$batchId, checkpoint location $checkpointLocation")
+ throw StateDataSourceErrors.offsetMetadataLogUnavailable(batchId,
checkpointLocation)
)
val clonedRuntimeConf = new
RuntimeConfig(session.sessionState.conf.clone())
@@ -105,8 +103,7 @@ class StateDataSource extends TableProvider with
DataSourceRegister {
StateStoreConf(clonedRuntimeConf.sqlConf)
case _ =>
- throw new IllegalStateException(s"The offset log for $batchId does not
exist, " +
- s"checkpoint location $checkpointLocation")
+ throw StateDataSourceErrors.offsetLogUnavailable(batchId,
checkpointLocation)
}
}
@@ -120,6 +117,11 @@ case class StateSourceOptions(
storeName: String,
joinSide: JoinSideValues) {
def stateCheckpointLocation: Path = new Path(resolvedCpLocation,
DIR_NAME_STATE)
+
+ override def toString: String = {
+ s"StateSourceOptions(checkpointLocation=$resolvedCpLocation,
batchId=$batchId, " +
+ s"operatorId=$operatorId, storeName=$storeName, joinSide=$joinSide)"
+ }
}
object StateSourceOptions extends DataSourceOptions {
@@ -146,7 +148,7 @@ object StateSourceOptions extends DataSourceOptions {
hadoopConf: Configuration,
options: CaseInsensitiveStringMap): StateSourceOptions = {
val checkpointLocation = Option(options.get(PATH)).orElse {
- throw new IllegalArgumentException(s"'$PATH' must be specified.")
+ throw StateDataSourceErrors.requiredOptionUnspecified(PATH)
}.get
val resolvedCpLocation = resolvedCheckpointLocation(hadoopConf,
checkpointLocation)
@@ -156,14 +158,14 @@ object StateSourceOptions extends DataSourceOptions {
}.get
if (batchId < 0) {
- throw new IllegalArgumentException(s"'$BATCH_ID' cannot be negative.")
+ throw StateDataSourceErrors.invalidOptionValueIsNegative(BATCH_ID)
}
val operatorId = Option(options.get(OPERATOR_ID)).map(_.toInt)
.orElse(Some(0)).get
if (operatorId < 0) {
- throw new IllegalArgumentException(s"'$OPERATOR_ID' cannot be negative.")
+ throw StateDataSourceErrors.invalidOptionValueIsNegative(OPERATOR_ID)
}
val storeName = Option(options.get(STORE_NAME))
@@ -171,7 +173,7 @@ object StateSourceOptions extends DataSourceOptions {
.getOrElse(StateStoreId.DEFAULT_STORE_NAME)
if (storeName.isEmpty) {
- throw new IllegalArgumentException(s"'$STORE_NAME' cannot be an empty
string.")
+ throw StateDataSourceErrors.invalidOptionValueIsEmpty(STORE_NAME)
}
val joinSide = try {
@@ -179,14 +181,12 @@ object StateSourceOptions extends DataSourceOptions {
.map(JoinSideValues.withName).getOrElse(JoinSideValues.none)
} catch {
case _: NoSuchElementException =>
- // convert to IllegalArgumentException
- throw new IllegalArgumentException(s"Incorrect value of the option " +
- s"'$JOIN_SIDE'. Valid values are
${JoinSideValues.values.mkString(",")}")
+ throw StateDataSourceErrors.invalidOptionValue(JOIN_SIDE,
+ s"Valid values are ${JoinSideValues.values.mkString(",")}")
}
if (joinSide != JoinSideValues.none && storeName !=
StateStoreId.DEFAULT_STORE_NAME) {
- throw new IllegalArgumentException(s"The options '$JOIN_SIDE' and " +
- s"'$STORE_NAME' cannot be specified together. Please specify either
one.")
+ throw StateDataSourceErrors.conflictOptions(Seq(JOIN_SIDE, STORE_NAME))
}
StateSourceOptions(resolvedCpLocation, batchId, operatorId, storeName,
joinSide)
@@ -205,8 +205,7 @@ object StateSourceOptions extends DataSourceOptions {
new Path(checkpointLocation, DIR_NAME_COMMITS).toString)
commitLog.getLatest() match {
case Some((lastId, _)) => lastId
- case None => throw new IllegalStateException("No committed batch found,
" +
- s"checkpoint location: $checkpointLocation")
+ case None => throw
StateDataSourceErrors.committedBatchUnavailable(checkpointLocation)
}
}
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceErrors.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceErrors.scala
new file mode 100644
index 00000000000..fe81d65c926
--- /dev/null
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceErrors.scala
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2.state
+
+import org.apache.spark.SparkRuntimeException
+
+/**
+ * Object for grouping error messages from (most) exceptions thrown from State
Data Source.
+ * State Metadata Data Source may (re/co)use this object.
+ *
+ * ERROR_CLASS has a prefix of "STDS_" representing STateDataSource.
+ */
+object StateDataSourceErrors {
+ def internalError(message: String): StateDataSourceException = {
+ new StateDataSourceInternalError(message)
+ }
+
+ def invalidOptionValue(optionName: String, message: String):
StateDataSourceException = {
+ new StateDataSourceInvalidOptionValue(optionName, message)
+ }
+
+ def invalidOptionValueIsNegative(optionName: String):
StateDataSourceException = {
+ new StateDataSourceInvalidOptionValueIsNegative(optionName)
+ }
+
+ def invalidOptionValueIsEmpty(optionName: String): StateDataSourceException
= {
+ new StateDataSourceInvalidOptionValueIsEmpty(optionName)
+ }
+
+ def requiredOptionUnspecified(missingOptionName: String):
StateDataSourceException = {
+ new StateDataSourceUnspecifiedRequiredOption(missingOptionName)
+ }
+
+ def offsetLogUnavailable(
+ batchId: Long,
+ checkpointLocation: String): StateDataSourceException = {
+ new StateDataSourceOffsetLogUnavailable(batchId, checkpointLocation)
+ }
+
+ def offsetMetadataLogUnavailable(
+ batchId: Long,
+ checkpointLocation: String): StateDataSourceException = {
+ new StateDataSourceOffsetMetadataLogUnavailable(batchId,
checkpointLocation)
+ }
+
+ def failedToReadStateSchema(
+ sourceOptions: StateSourceOptions,
+ cause: Throwable): StateDataSourceException = {
+ new StateDataSourceReadStateSchemaFailure(sourceOptions, cause)
+ }
+
+ def conflictOptions(options: Seq[String]): StateDataSourceException = {
+ new StateDataSourceConflictOptions(options)
+ }
+
+ def committedBatchUnavailable(checkpointLocation: String):
StateDataSourceException = {
+ new StataDataSourceCommittedBatchUnavailable(checkpointLocation)
+ }
+
+ def noPartitionDiscoveredInStateStore(
+ sourceOptions: StateSourceOptions): StateDataSourceException = {
+ new StateDataSourceNoPartitionDiscoveredInStateStore(sourceOptions)
+ }
+}
+
+abstract class StateDataSourceException(
+ errorClass: String,
+ messageParameters: Map[String, String],
+ cause: Throwable)
+ extends SparkRuntimeException(
+ errorClass,
+ messageParameters,
+ cause)
+
+class StateDataSourceInternalError(message: String, cause: Throwable = null)
+ extends StateDataSourceException(
+ "STDS_INTERNAL_ERROR",
+ Map("message" -> message),
+ cause)
+
+class StateDataSourceInvalidOptionValue(optionName: String, message: String)
+ extends StateDataSourceException(
+ "STDS_INVALID_OPTION_VALUE.WITH_MESSAGE",
+ Map("optionName" -> optionName, "message" -> message),
+ cause = null)
+
+class StateDataSourceInvalidOptionValueIsNegative(optionName: String)
+ extends StateDataSourceException(
+ "STDS_INVALID_OPTION_VALUE.IS_NEGATIVE",
+ Map("optionName" -> optionName),
+ cause = null)
+
+class StateDataSourceInvalidOptionValueIsEmpty(optionName: String)
+ extends StateDataSourceException(
+ "STDS_INVALID_OPTION_VALUE.IS_EMPTY",
+ Map("optionName" -> optionName),
+ cause = null)
+
+class StateDataSourceUnspecifiedRequiredOption(
+ missingOptionName: String)
+ extends StateDataSourceException(
+ "STDS_REQUIRED_OPTION_UNSPECIFIED",
+ Map("optionName" -> missingOptionName),
+ cause = null)
+
+class StateDataSourceOffsetLogUnavailable(
+ batchId: Long,
+ checkpointLocation: String)
+ extends StateDataSourceException(
+ "STDS_OFFSET_LOG_UNAVAILABLE",
+ Map("batchId" -> batchId.toString, "checkpointLocation" ->
checkpointLocation),
+ cause = null)
+
+class StateDataSourceOffsetMetadataLogUnavailable(
+ batchId: Long,
+ checkpointLocation: String)
+ extends StateDataSourceException(
+ "STDS_OFFSET_METADATA_LOG_UNAVAILABLE",
+ Map("batchId" -> batchId.toString, "checkpointLocation" ->
checkpointLocation),
+ cause = null)
+
+class StateDataSourceReadStateSchemaFailure(
+ sourceOptions: StateSourceOptions,
+ cause: Throwable)
+ extends StateDataSourceException(
+ "STDS_FAILED_TO_READ_STATE_SCHEMA",
+ Map("sourceOptions" -> sourceOptions.toString),
+ cause)
+
+class StateDataSourceConflictOptions(options: Seq[String])
+ extends StateDataSourceException(
+ "STDS_CONFLICT_OPTIONS",
+ Map("options" -> options.map(x => s"'$x'").mkString("[", ", ", "]")),
+ cause = null)
+
+class StataDataSourceCommittedBatchUnavailable(checkpointLocation: String)
+ extends StateDataSourceException(
+ "STDS_COMMITTED_BATCH_UNAVAILABLE",
+ Map("checkpointLocation" -> checkpointLocation),
+ cause = null)
+
+class StateDataSourceNoPartitionDiscoveredInStateStore(sourceOptions:
StateSourceOptions)
+ extends StateDataSourceException(
+ "STDS_NO_PARTITION_DISCOVERED_IN_STATE_STORE",
+ Map("sourceOptions" -> sourceOptions.toString),
+ cause = null)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateScanBuilder.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateScanBuilder.scala
index 214f5f97330..0d69bf708e9 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateScanBuilder.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateScanBuilder.scala
@@ -67,8 +67,7 @@ class StateScan(
})
if (partitions.headOption.isEmpty) {
- throw new IllegalArgumentException("The state does not have any
partition. Please double " +
- s"check that the query points to the valid state. options:
$sourceOptions")
+ throw
StateDataSourceErrors.noPartitionDiscoveredInStateStore(sourceOptions)
} else {
// just a dummy query id because we are actually not running streaming
query
val queryId = UUID.randomUUID()
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateTable.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateTable.scala
index 8b4e2737744..96c1c01cede 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateTable.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateTable.scala
@@ -41,10 +41,11 @@ class StateTable(
import StateTable._
if (!isValidSchema(schema)) {
- throw new IllegalStateException(s"Invalid schema is provided. Provided
schema: $schema for " +
- s"checkpoint location: ${sourceOptions.stateCheckpointLocation} ,
operatorId: " +
- s"${sourceOptions.operatorId} , storeName: ${sourceOptions.storeName}, "
+
- s"joinSide: ${sourceOptions.joinSide}")
+ throw StateDataSourceErrors.internalError(
+ s"Invalid schema is provided. Provided schema: $schema for " +
+ s"checkpoint location: ${sourceOptions.stateCheckpointLocation} ,
operatorId: " +
+ s"${sourceOptions.operatorId} , storeName: ${sourceOptions.storeName},
" +
+ s"joinSide: ${sourceOptions.joinSide}")
}
override def name(): String = {
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StreamStreamJoinStatePartitionReader.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StreamStreamJoinStatePartitionReader.scala
index 1a3d42aa066..26492f8790c 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StreamStreamJoinStatePartitionReader.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StreamStreamJoinStatePartitionReader.scala
@@ -67,7 +67,7 @@ class StreamStreamJoinStatePartitionReader(
case JoinSideValues.left => LeftSide
case JoinSideValues.right => RightSide
case JoinSideValues.none =>
- throw new IllegalStateException("Unexpected join side for stream-stream
read!")
+ throw StateDataSourceErrors.internalError("Unexpected join side for
stream-stream read!")
}
/*
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/metadata/StateMetadataSource.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/metadata/StateMetadataSource.scala
index ca123a9e501..4f88eccb748 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/metadata/StateMetadataSource.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/metadata/StateMetadataSource.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.catalog.{MetadataColumn,
SupportsMetadataColumns, SupportsRead, Table, TableCapability, TableProvider}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.read.{Batch, InputPartition,
PartitionReader, PartitionReaderFactory, Scan, ScanBuilder}
+import
org.apache.spark.sql.execution.datasources.v2.state.StateSourceOptions.PATH
import org.apache.spark.sql.execution.streaming.CheckpointFileManager
import org.apache.spark.sql.execution.streaming.state.{OperatorStateMetadata,
OperatorStateMetadataReader, OperatorStateMetadataV1}
import org.apache.spark.sql.sources.DataSourceRegister
@@ -95,8 +96,7 @@ class StateMetadataTable extends Table with SupportsRead with
SupportsMetadataCo
override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder
= {
() => {
if (!options.containsKey("path")) {
- throw new IllegalArgumentException("Checkpoint path is not specified
for" +
- " state metadata data source.")
+ throw StateDataSourceErrors.requiredOptionUnspecified(PATH)
}
new StateMetadataScan(options.get("path"))
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala
index bfc9ad2fe0f..86c3ab70af6 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala
@@ -49,7 +49,7 @@ class StateDataSourceNegativeTestSuite extends
StateDataSourceTestBase {
CheckLastBatch((6, 0), (7, 1), (8, 0))
)
- intercept[IllegalArgumentException] {
+ intercept[StateDataSourceReadStateSchemaFailure] {
spark.read.format("statestore").load(tempDir.getAbsolutePath)
}
}
@@ -67,7 +67,7 @@ class StateDataSourceNegativeTestSuite extends
StateDataSourceTestBase {
offsetLog.purgeAfter(0)
commitLog.purgeAfter(-1)
- intercept[IllegalStateException] {
+ intercept[StataDataSourceCommittedBatchUnavailable] {
spark.read.format("statestore").load(tempDir.getAbsolutePath)
}
}
@@ -98,67 +98,79 @@ class StateDataSourceNegativeTestSuite extends
StateDataSourceTestBase {
rewriteStateSchemaFileToDummy()
- intercept[IllegalArgumentException] {
+ intercept[StateDataSourceReadStateSchemaFailure] {
spark.read.format("statestore").load(tempDir.getAbsolutePath)
}
}
}
test("ERROR: path is not specified") {
- intercept[IllegalArgumentException] {
+ val exc = intercept[StateDataSourceUnspecifiedRequiredOption] {
spark.read.format("statestore").load()
}
+ checkError(exc, "STDS_REQUIRED_OPTION_UNSPECIFIED", "42601",
+ Map("optionName" -> StateSourceOptions.PATH))
}
test("ERROR: operator ID specified to negative") {
withTempDir { tempDir =>
- intercept[IllegalArgumentException] {
+ val exc = intercept[StateDataSourceInvalidOptionValueIsNegative] {
spark.read.format("statestore")
.option(StateSourceOptions.OPERATOR_ID, -1)
// trick to bypass getting the last committed batch before
validating operator ID
.option(StateSourceOptions.BATCH_ID, 0)
.load(tempDir.getAbsolutePath)
}
+ checkError(exc, "STDS_INVALID_OPTION_VALUE.IS_NEGATIVE", "42616",
+ Map("optionName" -> StateSourceOptions.OPERATOR_ID))
}
}
test("ERROR: batch ID specified to negative") {
withTempDir { tempDir =>
- intercept[IllegalArgumentException] {
+ val exc = intercept[StateDataSourceInvalidOptionValueIsNegative] {
spark.read.format("statestore")
.option(StateSourceOptions.BATCH_ID, -1)
.load(tempDir.getAbsolutePath)
}
+ checkError(exc, "STDS_INVALID_OPTION_VALUE.IS_NEGATIVE", "42616",
+ Map("optionName" -> StateSourceOptions.BATCH_ID))
}
}
test("ERROR: store name is empty") {
withTempDir { tempDir =>
- intercept[IllegalArgumentException] {
+ val exc = intercept[StateDataSourceInvalidOptionValueIsEmpty] {
spark.read.format("statestore")
.option(StateSourceOptions.STORE_NAME, "")
// trick to bypass getting the last committed batch before
validating operator ID
.option(StateSourceOptions.BATCH_ID, 0)
.load(tempDir.getAbsolutePath)
}
+ checkError(exc, "STDS_INVALID_OPTION_VALUE.IS_EMPTY", "42616",
+ Map("optionName" -> StateSourceOptions.STORE_NAME))
}
}
test("ERROR: invalid value for joinSide option") {
withTempDir { tempDir =>
- intercept[IllegalArgumentException] {
+ val exc = intercept[StateDataSourceInvalidOptionValue] {
spark.read.format("statestore")
.option(StateSourceOptions.JOIN_SIDE, "both")
// trick to bypass getting the last committed batch before
validating operator ID
.option(StateSourceOptions.BATCH_ID, 0)
.load(tempDir.getAbsolutePath)
}
+ checkError(exc, "STDS_INVALID_OPTION_VALUE.WITH_MESSAGE", "42616",
+ Map(
+ "optionName" -> StateSourceOptions.JOIN_SIDE,
+ "message" -> "Valid values are left,right,none"))
}
}
test("ERROR: both options `joinSide` and `storeName` are specified") {
withTempDir { tempDir =>
- intercept[IllegalArgumentException] {
+ val exc = intercept[StateDataSourceConflictOptions] {
spark.read.format("statestore")
.option(StateSourceOptions.JOIN_SIDE, "right")
.option(StateSourceOptions.STORE_NAME, "right-keyToNumValues")
@@ -166,6 +178,9 @@ class StateDataSourceNegativeTestSuite extends
StateDataSourceTestBase {
.option(StateSourceOptions.BATCH_ID, 0)
.load(tempDir.getAbsolutePath)
}
+ checkError(exc, "STDS_CONFLICT_OPTIONS", "42613",
+ Map("options" ->
+ s"['${StateSourceOptions.JOIN_SIDE}',
'${StateSourceOptions.STORE_NAME}']"))
}
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala
index 340187fa495..9115af456eb 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.streaming.state
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{Column, Row}
+import
org.apache.spark.sql.execution.datasources.v2.state.{StateDataSourceUnspecifiedRequiredOption,
StateSourceOptions}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{OutputMode, StreamTest}
@@ -208,9 +209,10 @@ class OperatorStateMetadataSuite extends StreamTest with
SharedSparkSession {
}
test("State metadata data source handle missing argument") {
- val e = intercept[IllegalArgumentException] {
+ val exc = intercept[StateDataSourceUnspecifiedRequiredOption] {
spark.read.format("state-metadata").load().collect()
}
- assert(e.getMessage == "Checkpoint path is not specified for state
metadata data source.")
+ checkError(exc, "STDS_REQUIRED_OPTION_UNSPECIFIED", "42601",
+ Map("optionName" -> StateSourceOptions.PATH))
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]