This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new bde87d66d0d [SPARK-44252][SS] Define a new error class and apply for the case where loading state from DFS fails
bde87d66d0d is described below
commit bde87d66d0d23d35ed82d412dac602c105b959a4
Author: Lucy Yao <[email protected]>
AuthorDate: Fri Jul 21 08:51:32 2023 +0900
[SPARK-44252][SS] Define a new error class and apply for the case where loading state from DFS fails
### What changes were proposed in this pull request?
Migrated errors from the StateStoreProvider.getStore() and
StateStoreProvider.getReadStore() entry points to the new error class framework.
The ticket for this issue is:
https://issues.apache.org/jira/browse/SPARK-44252.
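At a glance, the migration replaces bare IllegalStateException / require() failures
with SparkException instances that carry a named error class. A minimal sketch of the
new helper pattern (lifted from the QueryExecutionErrors changes in the diff below):

    def unexpectedStateStoreVersion(version: Long): Throwable = {
      new SparkException(
        errorClass = "CANNOT_LOAD_STATE_STORE.UNEXPECTED_VERSION",
        messageParameters = Map("version" -> version.toString),
        cause = null)
    }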
### Why are the changes needed?
Essentially, we wrap errors raised while loading state in getStore and
getReadStore under a new loading error class to give better error context.
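As a condensed sketch of that wrapping (the full hunk is in
RocksDBStateStoreProvider.scala below), any failure during load is rethrown under the
new error class with the original exception kept as the cause:

    override def getStore(version: Long): StateStore = {
      try {
        if (version < 0) {
          throw QueryExecutionErrors.unexpectedStateStoreVersion(version)
        }
        rocksDB.load(version)
        new RocksDBStateStore(version)
      } catch {
        // Surfaces as CANNOT_LOAD_STATE_STORE.UNCATEGORIZED, cause preserved.
        case e: Throwable => throw QueryExecutionErrors.cannotLoadStore(e)
      }
    }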
### Does this PR introduce _any_ user-facing change?
Yes
### How was this patch tested?
Wrote and updated tests that ensure the new errors are thrown as expected.
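For example, the updated suites intercept the SparkException and assert on the error
class and message parameters via checkError (sketch mirroring RocksDBSuite below):

    val ex = intercept[SparkException] {
      provider.getStore(-1)
    }
    checkError(
      ex,
      errorClass = "CANNOT_LOAD_STATE_STORE.UNCATEGORIZED",
      parameters = Map.empty)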
Closes #41705 from lucyyao-db/SC-132521-OSS.
Authored-by: Lucy Yao <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../src/main/resources/error/error-classes.json | 69 +++++++++++--
...nditions-cannot-load-state-store-error-class.md | 69 +++++++++++++
docs/sql-error-conditions.md | 8 ++
.../spark/sql/errors/QueryExecutionErrors.scala | 107 +++++++++++++++++++--
.../state/HDFSBackedStateStoreProvider.scala | 31 +++---
.../sql/execution/streaming/state/RocksDB.scala | 8 +-
.../streaming/state/RocksDBFileManager.scala | 9 +-
.../state/RocksDBStateStoreProvider.scala | 27 ++++--
.../sql/execution/streaming/state/StateStore.scala | 9 +-
.../streaming/state/StateStoreChangelog.scala | 6 +-
.../execution/streaming/state/RocksDBSuite.scala | 77 +++++++++++++--
.../streaming/state/StateStoreSuite.scala | 63 ++++++++++--
12 files changed, 416 insertions(+), 67 deletions(-)
diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json
index 4debf3da0b8..7913a9b9241 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -143,6 +143,65 @@
"Could not load Protobuf class with name <protobufClassName>.
<explanation>."
]
},
+ "CANNOT_LOAD_STATE_STORE" : {
+ "message" : [
+ "An error occurred during loading state."
+ ],
+ "subClass" : {
+ "CANNOT_READ_CHECKPOINT" : {
+ "message" : [
+ "Cannot read RocksDB checkpoint metadata. Expected
<expectedVersion>, but found <actualVersion>."
+ ]
+ },
+ "CANNOT_READ_DELTA_FILE_KEY_SIZE" : {
+ "message" : [
+ "Error reading delta file <fileToRead> of <clazz>: key size cannot
be <keySize>."
+ ]
+ },
+ "CANNOT_READ_DELTA_FILE_NOT_EXISTS" : {
+ "message" : [
+ "Error reading delta file <fileToRead> of <clazz>: <fileToRead> does
not exist."
+ ]
+ },
+ "CANNOT_READ_SNAPSHOT_FILE_KEY_SIZE" : {
+ "message" : [
+ "Error reading snapshot file <fileToRead> of <clazz>: key size
cannot be <keySize>."
+ ]
+ },
+ "CANNOT_READ_SNAPSHOT_FILE_VALUE_SIZE" : {
+ "message" : [
+ "Error reading snapshot file <fileToRead> of <clazz>: value size
cannot be <valueSize>."
+ ]
+ },
+ "CANNOT_READ_STREAMING_STATE_FILE" : {
+ "message" : [
+ "Error reading streaming state file of <fileToRead> does not exist.
If the stream job is restarted with a new or updated state operation, please
create a new checkpoint location or clear the existing checkpoint location."
+ ]
+ },
+ "UNCATEGORIZED" : {
+ "message" : [
+ ""
+ ]
+ },
+ "UNEXPECTED_FILE_SIZE" : {
+ "message" : [
+ "Copied <dfsFile> to <localFile>, expected <expectedSize> bytes,
found <localFileSize> bytes."
+ ]
+ },
+ "UNEXPECTED_VERSION" : {
+ "message" : [
+ "Version cannot be <version> because it is less than 0."
+ ]
+ },
+ "UNRELEASED_THREAD_ERROR" : {
+ "message" : [
+ "<loggingId>: RocksDB instance could not be acquired by
<newAcquiredThreadInfo> as it was not released by <acquiredThreadInfo> after
<timeWaitedMs> ms.",
+ "Thread holding the lock has trace: <stackTraceOutput>"
+ ]
+ }
+ },
+ "sqlState" : "58030"
+ },
"CANNOT_MERGE_INCOMPATIBLE_DATA_TYPE" : {
"message" : [
"Failed to merge incompatible data types <left> and <right>. Please
check the data types of the columns being merged and ensure that they are
compatible. If necessary, consider casting the columns to compatible data types
before attempting the merge."
@@ -5749,16 +5808,6 @@
"Foreach writer has been aborted due to a task failure."
]
},
- "_LEGACY_ERROR_TEMP_2258" : {
- "message" : [
- "Error reading delta file <fileToRead> of <clazz>: key size cannot be
<keySize>."
- ]
- },
- "_LEGACY_ERROR_TEMP_2259" : {
- "message" : [
- "Error reading snapshot file <fileToRead> of <clazz>: <message>"
- ]
- },
"_LEGACY_ERROR_TEMP_2260" : {
"message" : [
"Cannot purge as it might break internal state."
diff --git a/docs/sql-error-conditions-cannot-load-state-store-error-class.md b/docs/sql-error-conditions-cannot-load-state-store-error-class.md
new file mode 100644
index 00000000000..50450be689f
--- /dev/null
+++ b/docs/sql-error-conditions-cannot-load-state-store-error-class.md
@@ -0,0 +1,69 @@
+---
+layout: global
+title: CANNOT_LOAD_STATE_STORE error class
+displayTitle: CANNOT_LOAD_STATE_STORE error class
+license: |
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+---
+
+SQLSTATE: 58030
+
+An error occurred during loading state.
+
+This error class has the following derived error classes:
+
+## CANNOT_READ_CHECKPOINT
+
+Cannot read RocksDB checkpoint metadata. Expected `<expectedVersion>`, but found `<actualVersion>`.
+
+## CANNOT_READ_DELTA_FILE_KEY_SIZE
+
+Error reading delta file `<fileToRead>` of `<clazz>`: key size cannot be `<keySize>`.
+
+## CANNOT_READ_DELTA_FILE_NOT_EXISTS
+
+Error reading delta file `<fileToRead>` of `<clazz>`: `<fileToRead>` does not exist.
+
+## CANNOT_READ_SNAPSHOT_FILE_KEY_SIZE
+
+Error reading snapshot file `<fileToRead>` of `<clazz>`: key size cannot be `<keySize>`.
+
+## CANNOT_READ_SNAPSHOT_FILE_VALUE_SIZE
+
+Error reading snapshot file `<fileToRead>` of `<clazz>`: value size cannot be `<valueSize>`.
+
+## CANNOT_READ_STREAMING_STATE_FILE
+
+Error reading streaming state file of `<fileToRead>` does not exist. If the stream job is restarted with a new or updated state operation, please create a new checkpoint location or clear the existing checkpoint location.
+
+## UNCATEGORIZED
+
+
+
+## UNEXPECTED_FILE_SIZE
+
+Copied `<dfsFile>` to `<localFile>`, expected `<expectedSize>` bytes, found `<localFileSize>` bytes.
+
+## UNEXPECTED_VERSION
+
+Version cannot be `<version>` because it is less than 0.
+
+## UNRELEASED_THREAD_ERROR
+
+`<loggingId>`: RocksDB instance could not be acquired by `<newAcquiredThreadInfo>` as it was not released by `<acquiredThreadInfo>` after `<timeWaitedMs>` ms.
+Thread holding the lock has trace: `<stackTraceOutput>`
+
+
diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md
index 00fe6d75f53..cd04c414df3 100644
--- a/docs/sql-error-conditions.md
+++ b/docs/sql-error-conditions.md
@@ -171,6 +171,14 @@ SQLSTATE: none assigned
Could not load Protobuf class with name `<protobufClassName>`. `<explanation>`.
+### [CANNOT_LOAD_STATE_STORE](sql-error-conditions-cannot-load-state-store-error-class.html)
+
+SQLSTATE: 58030
+
+An error occurred during loading state.
+
+For more details see [CANNOT_LOAD_STATE_STORE](sql-error-conditions-cannot-load-state-store-error-class.html)
+
### CANNOT_MERGE_INCOMPATIBLE_DATA_TYPE
[SQLSTATE: 42825](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 433d23eaf3f..983648ff673 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.errors
-import java.io.{FileNotFoundException, IOException}
+import java.io.{File, FileNotFoundException, IOException}
import java.lang.reflect.InvocationTargetException
import java.net.{URISyntaxException, URL}
import java.time.{DateTimeException, LocalDate}
@@ -2442,26 +2442,64 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
s"but it's ${endSeconds.toString} now.")
}
- def failedToReadDeltaFileError(fileToRead: Path, clazz: String, keySize: Int): Throwable = {
+ def failedToReadDeltaFileKeySizeError(
+ fileToRead: Path,
+ clazz: String,
+ keySize: Int): Throwable = {
new SparkException(
- errorClass = "_LEGACY_ERROR_TEMP_2258",
+ errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_DELTA_FILE_KEY_SIZE",
messageParameters = Map(
"fileToRead" -> fileToRead.toString(),
"clazz" -> clazz,
- "keySize" -> keySize.toString()),
+ "keySize" -> keySize.toString),
cause = null)
}
- def failedToReadSnapshotFileError(fileToRead: Path, clazz: String, message: String): Throwable = {
+ def failedToReadDeltaFileNotExistsError(
+ fileToRead: Path,
+ clazz: String,
+ f: Throwable): Throwable = {
new SparkException(
- errorClass = "_LEGACY_ERROR_TEMP_2259",
+ errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_DELTA_FILE_NOT_EXISTS",
+ messageParameters = Map(
+ "fileToRead" -> fileToRead.toString(),
+ "clazz" -> clazz),
+ cause = f)
+ }
+
+ def failedToReadSnapshotFileKeySizeError(
+ fileToRead: Path,
+ clazz: String,
+ keySize: Int): Throwable = {
+ new SparkException(
+ errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_SNAPSHOT_FILE_KEY_SIZE",
messageParameters = Map(
"fileToRead" -> fileToRead.toString(),
"clazz" -> clazz,
- "message" -> message),
+ "keySize" -> keySize.toString),
cause = null)
}
+ def failedToReadSnapshotFileValueSizeError(
+ fileToRead: Path,
+ clazz: String,
+ valueSize: Int): Throwable = {
+ new SparkException(
+ errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_SNAPSHOT_FILE_VALUE_SIZE",
+ messageParameters = Map(
+ "fileToRead" -> fileToRead.toString(),
+ "clazz" -> clazz,
+ "valueSize" -> valueSize.toString),
+ cause = null)
+ }
+
+ def failedToReadStreamingStateFileError(fileToRead: Path, f: Throwable): Throwable = {
+ new SparkException(
+ errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_STREAMING_STATE_FILE",
+ messageParameters = Map("fileToRead" -> fileToRead.toString()),
+ cause = f)
+ }
+
def cannotPurgeAsBreakInternalStateError(): SparkUnsupportedOperationException = {
new SparkUnsupportedOperationException(
errorClass = "_LEGACY_ERROR_TEMP_2260",
@@ -2836,6 +2874,61 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
"enumString" -> enumString))
}
+ def unreleasedThreadError(
+ loggingId: String,
+ newAcquiredThreadInfo: String,
+ acquiredThreadInfo: String,
+ timeWaitedMs: Long,
+ stackTraceOutput: String): Throwable = {
+ new SparkException (
+ errorClass = "CANNOT_LOAD_STATE_STORE.UNRELEASED_THREAD_ERROR",
+ messageParameters = Map(
+ "loggingId" -> loggingId,
+ "newAcquiredThreadInfo" -> newAcquiredThreadInfo,
+ "acquiredThreadInfo" -> acquiredThreadInfo,
+ "timeWaitedMs" -> timeWaitedMs.toString,
+ "stackTraceOutput" -> stackTraceOutput),
+ cause = null)
+ }
+
+ def cannotReadCheckpoint(expectedVersion: String, actualVersion: String): Throwable = {
+ new SparkException (
+ errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_CHECKPOINT",
+ messageParameters = Map(
+ "expectedVersion" -> expectedVersion,
+ "actualVersion" -> actualVersion),
+ cause = null)
+ }
+
+ def unexpectedFileSize(
+ dfsFile: Path,
+ localFile: File,
+ expectedSize: Long,
+ localFileSize: Long): Throwable = {
+ new SparkException(
+ errorClass = "CANNOT_LOAD_STATE_STORE.UNEXPECTED_FILE_SIZE",
+ messageParameters = Map(
+ "dfsFile" -> dfsFile.toString,
+ "localFile" -> localFile.toString,
+ "expectedSize" -> expectedSize.toString,
+ "localFileSize" -> localFileSize.toString),
+ cause = null)
+ }
+
+ def unexpectedStateStoreVersion(version: Long): Throwable = {
+ new SparkException(
+ errorClass = "CANNOT_LOAD_STATE_STORE.UNEXPECTED_VERSION",
+ messageParameters = Map("version" -> version.toString),
+ cause = null)
+ }
+
+ def cannotLoadStore(e: Throwable): Throwable = {
+ new SparkException(
+ errorClass = "CANNOT_LOAD_STATE_STORE.UNCATEGORIZED",
+ messageParameters = Map.empty,
+ cause = e)
+ }
+
def hllInvalidLgK(function: String, min: Int, max: Int, value: String): Throwable = {
new SparkRuntimeException(
errorClass = "HLL_INVALID_LG_K",
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index fbf4b357a35..afa1fdaa223 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -218,12 +218,19 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
}
private def getLoadedMapForStore(version: Long): HDFSBackedStateStoreMap = synchronized {
- require(version >= 0, "Version cannot be less than 0")
- val newMap = HDFSBackedStateStoreMap.create(keySchema, numColsPrefixKey)
- if (version > 0) {
- newMap.putAll(loadMap(version))
+ try {
+ if (version < 0) {
+ throw QueryExecutionErrors.unexpectedStateStoreVersion(version)
+ }
+ val newMap = HDFSBackedStateStoreMap.create(keySchema, numColsPrefixKey)
+ if (version > 0) {
+ newMap.putAll(loadMap(version))
+ }
+ newMap
+ }
+ catch {
+ case e: Throwable => throw QueryExecutionErrors.cannotLoadStore(e)
}
- newMap
}
override def init(
@@ -457,8 +464,7 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
fm.open(fileToRead)
} catch {
case f: FileNotFoundException =>
- throw new IllegalStateException(
- s"Error reading delta file $fileToRead of $this: $fileToRead does not exist", f)
+ throw QueryExecutionErrors.failedToReadDeltaFileNotExistsError(fileToRead, toString(), f)
}
try {
input = decompressStream(sourceStream)
@@ -469,7 +475,8 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
if (keySize == -1) {
eof = true
} else if (keySize < 0) {
- throw QueryExecutionErrors.failedToReadDeltaFileError(fileToRead, toString(), keySize)
+ throw QueryExecutionErrors.failedToReadDeltaFileKeySizeError(
+ fileToRead, toString(), keySize)
} else {
val keyRowBuffer = new Array[Byte](keySize)
ByteStreams.readFully(input, keyRowBuffer, 0, keySize)
@@ -572,8 +579,8 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
if (keySize == -1) {
eof = true
} else if (keySize < 0) {
- throw QueryExecutionErrors.failedToReadSnapshotFileError(
- fileToRead, toString(), s"key size cannot be $keySize")
+ throw QueryExecutionErrors.failedToReadSnapshotFileKeySizeError(
+ fileToRead, toString(), keySize)
} else {
val keyRowBuffer = new Array[Byte](keySize)
ByteStreams.readFully(input, keyRowBuffer, 0, keySize)
@@ -583,8 +590,8 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
val valueSize = input.readInt()
if (valueSize < 0) {
- throw QueryExecutionErrors.failedToReadSnapshotFileError(
- fileToRead, toString(), s"value size cannot be $valueSize")
+ throw QueryExecutionErrors.failedToReadSnapshotFileValueSizeError(
+ fileToRead, toString(), valueSize)
} else {
val valueRowBuffer = new Array[Byte](valueSize)
ByteStreams.readFully(input, valueRowBuffer, 0, valueSize)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 65299ea37ef..7961c5e716b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -34,6 +34,7 @@ import org.rocksdb.TickerType._
import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.util.{NextIterator, Utils}
/**
@@ -553,11 +554,8 @@ class RocksDB(
}
if (isAcquiredByDifferentThread) {
val stackTraceOutput = acquiredThreadInfo.threadRef.get.get.getStackTrace.mkString("\n")
- val msg = s"RocksDB instance could not be acquired by $newAcquiredThreadInfo as it " +
- s"was not released by $acquiredThreadInfo after $timeWaitedMs ms.\n" +
- s"Thread holding the lock has trace: $stackTraceOutput"
- logError(msg)
- throw new IllegalStateException(s"$loggingId: $msg")
+ throw QueryExecutionErrors.unreleasedThreadError(loggingId, newAcquiredThreadInfo.toString,
+ acquiredThreadInfo.toString, timeWaitedMs, stackTraceOutput)
} else {
acquiredThreadInfo = newAcquiredThreadInfo
// Add a listener to always release the lock when the task (if active) completes
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
index 0891d773713..ed04472b62c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
@@ -40,6 +40,7 @@ import org.json4s.jackson.Serialization
import org.apache.spark.{SparkConf, SparkEnv}
import org.apache.spark.internal.Logging
import org.apache.spark.io.CompressionCodec
+import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.streaming.CheckpointFileManager
import org.apache.spark.util.Utils
@@ -526,9 +527,8 @@ class RocksDBFileManager(
val localFileSize = localFile.length()
val expectedSize = file.sizeBytes
if (localFileSize != expectedSize) {
- throw new IllegalStateException(
- s"Copied $dfsFile to $localFile," +
- s" expected $expectedSize bytes, found $localFileSize bytes ")
+ throw QueryExecutionErrors.unexpectedFileSize(dfsFile, localFile,
expectedSize,
+ localFileSize)
}
filesCopied += 1
bytesCopied += localFileSize
@@ -717,8 +717,7 @@ object RocksDBCheckpointMetadata {
try {
val versionLine = reader.readLine()
if (versionLine != s"v$VERSION") {
- throw new IllegalStateException(
- s"Cannot read RocksDB checkpoint metadata of version $versionLine")
+ throw QueryExecutionErrors.cannotReadCheckpoint(versionLine, s"v$VERSION")
}
Serialization.read[RocksDBCheckpointMetadata](reader)
} finally {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
index 10f207c7ec1..53fd06fd24c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
@@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.spark.{SparkConf, SparkEnv}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.Utils
@@ -190,15 +191,29 @@ private[sql] class RocksDBStateStoreProvider
override def stateStoreId: StateStoreId = stateStoreId_
override def getStore(version: Long): StateStore = {
- require(version >= 0, "Version cannot be less than 0")
- rocksDB.load(version)
- new RocksDBStateStore(version)
+ try {
+ if (version < 0) {
+ throw QueryExecutionErrors.unexpectedStateStoreVersion(version)
+ }
+ rocksDB.load(version)
+ new RocksDBStateStore(version)
+ }
+ catch {
+ case e: Throwable => throw QueryExecutionErrors.cannotLoadStore(e)
+ }
}
override def getReadStore(version: Long): StateStore = {
- require(version >= 0, "Version cannot be less than 0")
- rocksDB.load(version, true)
- new RocksDBStateStore(version)
+ try {
+ if (version < 0) {
+ throw QueryExecutionErrors.unexpectedStateStoreVersion(version)
+ }
+ rocksDB.load(version, true)
+ new RocksDBStateStore(version)
+ }
+ catch {
+ case e: Throwable => throw QueryExecutionErrors.cannotLoadStore(e)
+ }
}
override def doMaintenance(): Unit = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index 96c7b61f205..359cff81aea 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -32,6 +32,7 @@ import org.apache.spark.{SparkContext, SparkEnv}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.catalyst.util.UnsafeRowUtils
+import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.execution.streaming.StatefulOperatorStateInfo
import org.apache.spark.sql.types.StructType
@@ -486,7 +487,9 @@ object StateStore extends Logging {
version: Long,
storeConf: StateStoreConf,
hadoopConf: Configuration): ReadStateStore = {
- require(version >= 0)
+ if (version < 0) {
+ throw QueryExecutionErrors.unexpectedStateStoreVersion(version)
+ }
val storeProvider = getStateStoreProvider(storeProviderId, keySchema, valueSchema,
numColsPrefixKey, storeConf, hadoopConf)
storeProvider.getReadStore(version)
@@ -501,7 +504,9 @@ object StateStore extends Logging {
version: Long,
storeConf: StateStoreConf,
hadoopConf: Configuration): StateStore = {
- require(version >= 0)
+ if (version < 0) {
+ throw QueryExecutionErrors.unexpectedStateStoreVersion(version)
+ }
val storeProvider = getStateStoreProvider(storeProviderId, keySchema, valueSchema,
numColsPrefixKey, storeConf, hadoopConf)
storeProvider.getStore(version)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
index 372cbb6d986..f15feb2b2ae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.{FSError, Path}
import org.apache.spark.internal.Logging
import org.apache.spark.io.CompressionCodec
+import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.streaming.CheckpointFileManager
import org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream
import org.apache.spark.util.NextIterator
@@ -130,10 +131,7 @@ class StateStoreChangelogReader(
fm.open(fileToRead)
} catch {
case f: FileNotFoundException =>
- throw new IllegalStateException(
- s"Error reading streaming state file of $fileToRead does not exist. " +
- "If the stream job is restarted with a new or updated state
operation, please" +
- " create a new checkpoint location or clear the existing checkpoint
location.", f)
+ throw
QueryExecutionErrors.failedToReadStreamingStateFileError(fileToRead, f)
}
private val input: DataInputStream = decompressStream(sourceStream)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index e31b05c362f..b4b67f381d2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration
import org.scalactic.source.Position
import org.scalatest.Tag
+import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.util.quietly
import org.apache.spark.sql.execution.streaming.CreateAtomicTestManager
import org.apache.spark.sql.internal.SQLConf
@@ -123,12 +124,37 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
}
test("RocksDB: load version that doesn't exist") {
+ val provider = new RocksDBStateStoreProvider()
+ var ex = intercept[SparkException] {
+ provider.getStore(-1)
+ }
+ checkError(
+ ex,
+ errorClass = "CANNOT_LOAD_STATE_STORE.UNCATEGORIZED",
+ parameters = Map.empty
+ )
+ ex = intercept[SparkException] {
+ provider.getReadStore(-1)
+ }
+ checkError(
+ ex,
+ errorClass = "CANNOT_LOAD_STATE_STORE.UNCATEGORIZED",
+ parameters = Map.empty
+ )
+
val remoteDir = Utils.createTempDir().toString
new File(remoteDir).delete() // to make sure that the directory gets created
withDB(remoteDir) { db =>
- intercept[IllegalStateException] {
+ ex = intercept[SparkException] {
db.load(1)
}
+ checkError(
+ ex,
+ errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_STREAMING_STATE_FILE",
+ parameters = Map(
+ "fileToRead" -> s"$remoteDir/1.changelog"
+ )
+ )
}
}
@@ -704,12 +730,21 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
db.load(0) // Current thread should be able to load again
// Another thread should not be able to load while current thread is using it
- val ex = intercept[IllegalStateException] {
+ var ex = intercept[SparkException] {
ThreadUtils.runInNewThread("concurrent-test-thread-1") { db.load(0) }
}
- // Assert that the error message contains the stack trace
- assert(ex.getMessage.contains("Thread holding the lock has trace:"))
- assert(ex.getMessage.contains("runInNewThread"))
+ checkError(
+ ex,
+ errorClass = "CANNOT_LOAD_STATE_STORE.UNRELEASED_THREAD_ERROR",
+ parameters = Map(
+ "loggingId" -> "\\[Thread-\\d+\\]",
+ "newAcquiredThreadInfo" -> "\\[ThreadId: Some\\(\\d+\\)\\]",
+ "acquiredThreadInfo" -> "\\[ThreadId: Some\\(\\d+\\)\\]",
+ "timeWaitedMs" -> "\\d+",
+ "stackTraceOutput" -> "(?s).*"
+ ),
+ matchPVals = true
+ )
// Commit should release the instance allowing other threads to load new version
db.commit()
@@ -720,9 +755,21 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
// Another thread should not be able to load while current thread is using it
db.load(2)
- intercept[IllegalStateException] {
+ ex = intercept[SparkException] {
ThreadUtils.runInNewThread("concurrent-test-thread-2") { db.load(2) }
}
+ checkError(
+ ex,
+ errorClass = "CANNOT_LOAD_STATE_STORE.UNRELEASED_THREAD_ERROR",
+ parameters = Map(
+ "loggingId" -> "\\[Thread-\\d+\\]",
+ "newAcquiredThreadInfo" -> "\\[ThreadId: Some\\(\\d+\\)\\]",
+ "acquiredThreadInfo" -> "\\[ThreadId: Some\\(\\d+\\)\\]",
+ "timeWaitedMs" -> "\\d+",
+ "stackTraceOutput" -> "(?s).*"
+ ),
+ matchPVals = true
+ )
// Rollback should release the instance allowing other threads to load new version
db.rollback()
@@ -752,6 +799,24 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
}
test("checkpoint metadata serde roundtrip") {
+ // expect read metadata error when metadata uses unsupported version
+ withTempDir { dir =>
+ val file2 = new File(dir, "json")
+ val json2 = """{"sstFiles":[],"numKeys":0}"""
+ FileUtils.write(file2, s"v2\n$json2")
+ val e = intercept[SparkException] {
+ RocksDBCheckpointMetadata.readFromFile(file2)
+ }
+ checkError(
+ e,
+ errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_CHECKPOINT",
+ parameters = Map(
+ "expectedVersion" -> "v2",
+ "actualVersion" -> "v1"
+ )
+ )
+ }
+
def checkJsonRoundtrip(metadata: RocksDBCheckpointMetadata, json: String): Unit = {
assert(metadata.json == json)
withTempDir { dir =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index 02aa12b325f..6c4e259bac5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -236,22 +236,40 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
// Corrupt snapshot file and verify that it throws error
assert(getData(provider, snapshotVersion) === Set(("a", 0) -> snapshotVersion))
corruptFile(provider, snapshotVersion, isSnapshot = true)
- intercept[Exception] {
+ var e = intercept[SparkException] {
getData(provider, snapshotVersion)
}
+ checkError(
+ e,
+ errorClass = "CANNOT_LOAD_STATE_STORE.UNCATEGORIZED",
+ parameters = Map.empty
+ )
// Corrupt delta file and verify that it throws error
assert(getData(provider, snapshotVersion - 1) === Set(("a", 0) -> (snapshotVersion - 1)))
corruptFile(provider, snapshotVersion - 1, isSnapshot = false)
- intercept[Exception] {
+ e = intercept[SparkException] {
getData(provider, snapshotVersion - 1)
}
+ checkError(
+ e,
+ errorClass = "CANNOT_LOAD_STATE_STORE.UNCATEGORIZED",
+ parameters = Map.empty
+ )
// Delete delta file and verify that it throws error
deleteFilesEarlierThanVersion(provider, snapshotVersion)
- intercept[Exception] {
+ e = intercept[SparkException] {
getData(provider, snapshotVersion - 1)
}
+ checkError(
+ e.getCause.asInstanceOf[SparkThrowable],
+ errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_DELTA_FILE_NOT_EXISTS",
+ parameters = Map(
+ "fileToRead" -> s"${provider.stateStoreId.storeCheckpointLocation()}/1.delta",
+ "clazz" -> s"${provider.toString}"
+ )
+ )
}
}
@@ -900,12 +918,17 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
assert(getLatestData(provider) === Set(("b", 0) -> 2))
// Trying to get newer versions should fail
- intercept[Exception] {
+ var e = intercept[SparkException] {
provider.getStore(2)
}
- intercept[Exception] {
+ assert(e.getCause.isInstanceOf[SparkException])
+ assert(e.getCause.getMessage.contains("does not exist"))
+
+ e = intercept[SparkException] {
getData(provider, 2)
}
+ assert(e.getCause.isInstanceOf[SparkException])
+ assert(e.getCause.getMessage.contains("does not exist"))
// New updates to the reloaded store with new version, and does not change old version
tryWithProviderResource(newStoreProvider(store.id)) { reloadedProvider =>
@@ -1043,9 +1066,14 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
testWithAllCodec("getStore with invalid versions") {
tryWithProviderResource(newStoreProvider()) { provider =>
def checkInvalidVersion(version: Int): Unit = {
- intercept[Exception] {
+ val e = intercept[SparkException] {
provider.getStore(version)
}
+ checkError(
+ e,
+ errorClass = "CANNOT_LOAD_STATE_STORE.UNCATEGORIZED",
+ parameters = Map.empty
+ )
}
checkInvalidVersion(-1)
@@ -1196,16 +1224,31 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
val hadoopConf = new Configuration()
// Verify that trying to get incorrect versions throw errors
- intercept[IllegalArgumentException] {
+ var e = intercept[SparkException] {
StateStore.get(
storeId, keySchema, valueSchema, 0, -1, storeConf, hadoopConf)
}
- assert(!StateStore.isLoaded(storeId)) // version -1 should not attempt to load the store
-
- intercept[IllegalStateException] {
+ checkError(
+ e,
+ errorClass = "CANNOT_LOAD_STATE_STORE.UNEXPECTED_VERSION",
+ parameters = Map(
+ "version" -> "-1"
+ )
+ )
+
+ e = intercept[SparkException] {
StateStore.get(
storeId, keySchema, valueSchema, 0, 1, storeConf, hadoopConf)
}
+ checkError(
+ e.getCause.asInstanceOf[SparkThrowable],
+ errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_DELTA_FILE_NOT_EXISTS",
+ parameters = Map(
+ "fileToRead" -> s"$dir/0/0/1.delta",
+ "clazz" -> "HDFSStateStoreProvider\\[.+\\]"
+ ),
+ matchPVals = true
+ )
// Increase version of the store and try to get again
val store0 = StateStore.get(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]