This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new bde87d66d0d [SPARK-44252][SS] Define a new error class and apply for the case where loading state from DFS fails
bde87d66d0d is described below

commit bde87d66d0d23d35ed82d412dac602c105b959a4
Author: Lucy Yao <lucy....@databricks.com>
AuthorDate: Fri Jul 21 08:51:32 2023 +0900

    [SPARK-44252][SS] Define a new error class and apply for the case where loading state from DFS fails
    
    ### What changes were proposed in this pull request?
    
    Migrated errors from the StateStoreProvider.getStore() and StateStoreProvider.getReadStore() entry points to the new error class framework.
    
    The ticket for this issue is: https://issues.apache.org/jira/browse/SPARK-44252.
    
    ### Why are the changes needed?
    
    Essentially, we wrap errors thrown from getStore and getReadStore in a new loading error class to give better error context.
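    
    The wrapping in RocksDBStateStoreProvider.getStore / getReadStore follows roughly this shape (a condensed sketch of the pattern applied in the diff below, not the complete provider code):
    
    ```scala
    override def getStore(version: Long): StateStore = {
      try {
        if (version < 0) {
          throw QueryExecutionErrors.unexpectedStateStoreVersion(version)
        }
        rocksDB.load(version)
        new RocksDBStateStore(version)
      } catch {
        // Any failure while loading state is rethrown under CANNOT_LOAD_STATE_STORE
        case e: Throwable => throw QueryExecutionErrors.cannotLoadStore(e)
      }
    }
    ```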
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes
    
    ### How was this patch tested?
    
    Wrote and updated tests that ensure the new errors are thrown as expected.
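    
    For example, the suites assert on the new error class with checkError (a sketch mirroring the RocksDBSuite changes in the diff below):
    
    ```scala
    val ex = intercept[SparkException] {
      provider.getStore(-1)
    }
    checkError(
      ex,
      errorClass = "CANNOT_LOAD_STATE_STORE.UNCATEGORIZED",
      parameters = Map.empty)
    ```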
    
    Closes #41705 from lucyyao-db/SC-132521-OSS.
    
    Authored-by: Lucy Yao <lucy....@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../src/main/resources/error/error-classes.json    |  69 +++++++++++--
 ...nditions-cannot-load-state-store-error-class.md |  69 +++++++++++++
 docs/sql-error-conditions.md                       |   8 ++
 .../spark/sql/errors/QueryExecutionErrors.scala    | 107 +++++++++++++++++++--
 .../state/HDFSBackedStateStoreProvider.scala       |  31 +++---
 .../sql/execution/streaming/state/RocksDB.scala    |   8 +-
 .../streaming/state/RocksDBFileManager.scala       |   9 +-
 .../state/RocksDBStateStoreProvider.scala          |  27 ++++--
 .../sql/execution/streaming/state/StateStore.scala |   9 +-
 .../streaming/state/StateStoreChangelog.scala      |   6 +-
 .../execution/streaming/state/RocksDBSuite.scala   |  77 +++++++++++++--
 .../streaming/state/StateStoreSuite.scala          |  63 ++++++++++--
 12 files changed, 416 insertions(+), 67 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json
index 4debf3da0b8..7913a9b9241 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -143,6 +143,65 @@
       "Could not load Protobuf class with name <protobufClassName>. <explanation>."
     ]
   },
+  "CANNOT_LOAD_STATE_STORE" : {
+    "message" : [
+      "An error occurred during loading state."
+    ],
+    "subClass" : {
+      "CANNOT_READ_CHECKPOINT" : {
+        "message" : [
+          "Cannot read RocksDB checkpoint metadata. Expected <expectedVersion>, but found <actualVersion>."
+        ]
+      },
+      "CANNOT_READ_DELTA_FILE_KEY_SIZE" : {
+        "message" : [
+          "Error reading delta file <fileToRead> of <clazz>: key size cannot be <keySize>."
+        ]
+      },
+      "CANNOT_READ_DELTA_FILE_NOT_EXISTS" : {
+        "message" : [
+          "Error reading delta file <fileToRead> of <clazz>: <fileToRead> does not exist."
+        ]
+      },
+      "CANNOT_READ_SNAPSHOT_FILE_KEY_SIZE" : {
+        "message" : [
+          "Error reading snapshot file <fileToRead> of <clazz>: key size cannot be <keySize>."
+        ]
+      },
+      "CANNOT_READ_SNAPSHOT_FILE_VALUE_SIZE" : {
+        "message" : [
+          "Error reading snapshot file <fileToRead> of <clazz>: value size cannot be <valueSize>."
+        ]
+      },
+      "CANNOT_READ_STREAMING_STATE_FILE" : {
+        "message" : [
+          "Error reading streaming state file of <fileToRead> does not exist. If the stream job is restarted with a new or updated state operation, please create a new checkpoint location or clear the existing checkpoint location."
+        ]
+      },
+      "UNCATEGORIZED" : {
+        "message" : [
+          ""
+        ]
+      },
+      "UNEXPECTED_FILE_SIZE" : {
+        "message" : [
+          "Copied <dfsFile> to <localFile>, expected <expectedSize> bytes, found <localFileSize> bytes."
+        ]
+      },
+      "UNEXPECTED_VERSION" : {
+        "message" : [
+          "Version cannot be <version> because it is less than 0."
+        ]
+      },
+      "UNRELEASED_THREAD_ERROR" : {
+        "message" : [
+          "<loggingId>: RocksDB instance could not be acquired by <newAcquiredThreadInfo> as it was not released by <acquiredThreadInfo> after <timeWaitedMs> ms.",
+          "Thread holding the lock has trace: <stackTraceOutput>"
+        ]
+      }
+    },
+    "sqlState" : "58030"
+  },
   "CANNOT_MERGE_INCOMPATIBLE_DATA_TYPE" : {
     "message" : [
       "Failed to merge incompatible data types <left> and <right>. Please check the data types of the columns being merged and ensure that they are compatible. If necessary, consider casting the columns to compatible data types before attempting the merge."
@@ -5749,16 +5808,6 @@
       "Foreach writer has been aborted due to a task failure."
     ]
   },
-  "_LEGACY_ERROR_TEMP_2258" : {
-    "message" : [
-      "Error reading delta file <fileToRead> of <clazz>: key size cannot be <keySize>."
-    ]
-  },
-  "_LEGACY_ERROR_TEMP_2259" : {
-    "message" : [
-      "Error reading snapshot file <fileToRead> of <clazz>: <message>"
-    ]
-  },
   "_LEGACY_ERROR_TEMP_2260" : {
     "message" : [
       "Cannot purge as it might break internal state."
diff --git a/docs/sql-error-conditions-cannot-load-state-store-error-class.md b/docs/sql-error-conditions-cannot-load-state-store-error-class.md
new file mode 100644
index 00000000000..50450be689f
--- /dev/null
+++ b/docs/sql-error-conditions-cannot-load-state-store-error-class.md
@@ -0,0 +1,69 @@
+---
+layout: global
+title: CANNOT_LOAD_STATE_STORE error class
+displayTitle: CANNOT_LOAD_STATE_STORE error class
+license: |
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+---
+
+SQLSTATE: 58030
+
+An error occurred during loading state.
+
+This error class has the following derived error classes:
+
+## CANNOT_READ_CHECKPOINT
+
+Cannot read RocksDB checkpoint metadata. Expected `<expectedVersion>`, but found `<actualVersion>`.
+
+## CANNOT_READ_DELTA_FILE_KEY_SIZE
+
+Error reading delta file `<fileToRead>` of `<clazz>`: key size cannot be `<keySize>`.
+
+## CANNOT_READ_DELTA_FILE_NOT_EXISTS
+
+Error reading delta file `<fileToRead>` of `<clazz>`: `<fileToRead>` does not exist.
+
+## CANNOT_READ_SNAPSHOT_FILE_KEY_SIZE
+
+Error reading snapshot file `<fileToRead>` of `<clazz>`: key size cannot be `<keySize>`.
+
+## CANNOT_READ_SNAPSHOT_FILE_VALUE_SIZE
+
+Error reading snapshot file `<fileToRead>` of `<clazz>`: value size cannot be `<valueSize>`.
+
+## CANNOT_READ_STREAMING_STATE_FILE
+
+Error reading streaming state file of `<fileToRead>` does not exist. If the stream job is restarted with a new or updated state operation, please create a new checkpoint location or clear the existing checkpoint location.
+
+## UNCATEGORIZED
+
+
+
+## UNEXPECTED_FILE_SIZE
+
+Copied `<dfsFile>` to `<localFile>`, expected `<expectedSize>` bytes, found `<localFileSize>` bytes.
+
+## UNEXPECTED_VERSION
+
+Version cannot be `<version>` because it is less than 0.
+
+## UNRELEASED_THREAD_ERROR
+
+`<loggingId>`: RocksDB instance could not be acquired by `<newAcquiredThreadInfo>` as it was not released by `<acquiredThreadInfo>` after `<timeWaitedMs>` ms.
+Thread holding the lock has trace: `<stackTraceOutput>`
+
+
diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md
index 00fe6d75f53..cd04c414df3 100644
--- a/docs/sql-error-conditions.md
+++ b/docs/sql-error-conditions.md
@@ -171,6 +171,14 @@ SQLSTATE: none assigned
 
 Could not load Protobuf class with name `<protobufClassName>`. `<explanation>`.
 
+### [CANNOT_LOAD_STATE_STORE](sql-error-conditions-cannot-load-state-store-error-class.html)
+
+SQLSTATE: 58030
+
+An error occurred during loading state.
+
+For more details see [CANNOT_LOAD_STATE_STORE](sql-error-conditions-cannot-load-state-store-error-class.html)
+
 ### CANNOT_MERGE_INCOMPATIBLE_DATA_TYPE
 
 [SQLSTATE: 42825](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 433d23eaf3f..983648ff673 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.errors
 
-import java.io.{FileNotFoundException, IOException}
+import java.io.{File, FileNotFoundException, IOException}
 import java.lang.reflect.InvocationTargetException
 import java.net.{URISyntaxException, URL}
 import java.time.{DateTimeException, LocalDate}
@@ -2442,26 +2442,64 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
         s"but it's ${endSeconds.toString} now.")
   }
 
-  def failedToReadDeltaFileError(fileToRead: Path, clazz: String, keySize: Int): Throwable = {
+  def failedToReadDeltaFileKeySizeError(
+      fileToRead: Path,
+      clazz: String,
+      keySize: Int): Throwable = {
     new SparkException(
-      errorClass = "_LEGACY_ERROR_TEMP_2258",
+      errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_DELTA_FILE_KEY_SIZE",
       messageParameters = Map(
         "fileToRead" -> fileToRead.toString(),
         "clazz" -> clazz,
-        "keySize" -> keySize.toString()),
+        "keySize" -> keySize.toString),
       cause = null)
   }
 
-  def failedToReadSnapshotFileError(fileToRead: Path, clazz: String, message: String): Throwable = {
+  def failedToReadDeltaFileNotExistsError(
+      fileToRead: Path,
+      clazz: String,
+      f: Throwable): Throwable = {
     new SparkException(
-      errorClass = "_LEGACY_ERROR_TEMP_2259",
+      errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_DELTA_FILE_NOT_EXISTS",
+      messageParameters = Map(
+        "fileToRead" -> fileToRead.toString(),
+        "clazz" -> clazz),
+      cause = f)
+  }
+
+  def failedToReadSnapshotFileKeySizeError(
+      fileToRead: Path,
+      clazz: String,
+      keySize: Int): Throwable = {
+    new SparkException(
+      errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_SNAPSHOT_FILE_KEY_SIZE",
       messageParameters = Map(
         "fileToRead" -> fileToRead.toString(),
         "clazz" -> clazz,
-        "message" -> message),
+        "keySize" -> keySize.toString),
       cause = null)
   }
 
+  def failedToReadSnapshotFileValueSizeError(
+      fileToRead: Path,
+      clazz: String,
+      valueSize: Int): Throwable = {
+    new SparkException(
+      errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_SNAPSHOT_FILE_VALUE_SIZE",
+      messageParameters = Map(
+        "fileToRead" -> fileToRead.toString(),
+        "clazz" -> clazz,
+        "valueSize" -> valueSize.toString),
+      cause = null)
+  }
+
+  def failedToReadStreamingStateFileError(fileToRead: Path, f: Throwable): Throwable = {
+    new SparkException(
+      errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_STREAMING_STATE_FILE",
+      messageParameters = Map("fileToRead" -> fileToRead.toString()),
+      cause = f)
+  }
+
   def cannotPurgeAsBreakInternalStateError(): SparkUnsupportedOperationException = {
     new SparkUnsupportedOperationException(
       errorClass = "_LEGACY_ERROR_TEMP_2260",
@@ -2836,6 +2874,61 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
         "enumString" -> enumString))
   }
 
+  def unreleasedThreadError(
+      loggingId: String,
+      newAcquiredThreadInfo: String,
+      acquiredThreadInfo: String,
+      timeWaitedMs: Long,
+      stackTraceOutput: String): Throwable = {
+    new SparkException (
+      errorClass = "CANNOT_LOAD_STATE_STORE.UNRELEASED_THREAD_ERROR",
+      messageParameters = Map(
+        "loggingId" -> loggingId,
+        "newAcquiredThreadInfo" -> newAcquiredThreadInfo,
+        "acquiredThreadInfo" -> acquiredThreadInfo,
+        "timeWaitedMs" -> timeWaitedMs.toString,
+        "stackTraceOutput" -> stackTraceOutput),
+      cause = null)
+  }
+
+  def cannotReadCheckpoint(expectedVersion: String, actualVersion: String): Throwable = {
+    new SparkException (
+      errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_CHECKPOINT",
+      messageParameters = Map(
+        "expectedVersion" -> expectedVersion,
+        "actualVersion" -> actualVersion),
+      cause = null)
+  }
+
+  def unexpectedFileSize(
+      dfsFile: Path,
+      localFile: File,
+      expectedSize: Long,
+      localFileSize: Long): Throwable = {
+    new SparkException(
+      errorClass = "CANNOT_LOAD_STATE_STORE.UNEXPECTED_FILE_SIZE",
+      messageParameters = Map(
+        "dfsFile" -> dfsFile.toString,
+        "localFile" -> localFile.toString,
+        "expectedSize" -> expectedSize.toString,
+        "localFileSize" -> localFileSize.toString),
+      cause = null)
+  }
+
+  def unexpectedStateStoreVersion(version: Long): Throwable = {
+    new SparkException(
+      errorClass = "CANNOT_LOAD_STATE_STORE.UNEXPECTED_VERSION",
+      messageParameters = Map("version" -> version.toString),
+      cause = null)
+  }
+
+  def cannotLoadStore(e: Throwable): Throwable = {
+    new SparkException(
+      errorClass = "CANNOT_LOAD_STATE_STORE.UNCATEGORIZED",
+      messageParameters = Map.empty,
+      cause = e)
+  }
+
   def hllInvalidLgK(function: String, min: Int, max: Int, value: String): Throwable = {
     new SparkRuntimeException(
       errorClass = "HLL_INVALID_LG_K",
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index fbf4b357a35..afa1fdaa223 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -218,12 +218,19 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
   }
 
   private def getLoadedMapForStore(version: Long): HDFSBackedStateStoreMap = synchronized {
-    require(version >= 0, "Version cannot be less than 0")
-    val newMap = HDFSBackedStateStoreMap.create(keySchema, numColsPrefixKey)
-    if (version > 0) {
-      newMap.putAll(loadMap(version))
+    try {
+      if (version < 0) {
+        throw QueryExecutionErrors.unexpectedStateStoreVersion(version)
+      }
+      val newMap = HDFSBackedStateStoreMap.create(keySchema, numColsPrefixKey)
+      if (version > 0) {
+        newMap.putAll(loadMap(version))
+      }
+      newMap
+    }
+    catch {
+      case e: Throwable => throw QueryExecutionErrors.cannotLoadStore(e)
     }
-    newMap
   }
 
   override def init(
@@ -457,8 +464,7 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
       fm.open(fileToRead)
     } catch {
       case f: FileNotFoundException =>
-        throw new IllegalStateException(
-          s"Error reading delta file $fileToRead of $this: $fileToRead does not exist", f)
+        throw QueryExecutionErrors.failedToReadDeltaFileNotExistsError(fileToRead, toString(), f)
     }
     try {
       input = decompressStream(sourceStream)
@@ -469,7 +475,8 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
         if (keySize == -1) {
           eof = true
         } else if (keySize < 0) {
-          throw QueryExecutionErrors.failedToReadDeltaFileError(fileToRead, toString(), keySize)
+          throw QueryExecutionErrors.failedToReadDeltaFileKeySizeError(
+            fileToRead, toString(), keySize)
         } else {
           val keyRowBuffer = new Array[Byte](keySize)
           ByteStreams.readFully(input, keyRowBuffer, 0, keySize)
@@ -572,8 +579,8 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
         if (keySize == -1) {
           eof = true
         } else if (keySize < 0) {
-          throw QueryExecutionErrors.failedToReadSnapshotFileError(
-            fileToRead, toString(), s"key size cannot be $keySize")
+          throw QueryExecutionErrors.failedToReadSnapshotFileKeySizeError(
+            fileToRead, toString(), keySize)
         } else {
           val keyRowBuffer = new Array[Byte](keySize)
           ByteStreams.readFully(input, keyRowBuffer, 0, keySize)
@@ -583,8 +590,8 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
 
           val valueSize = input.readInt()
           if (valueSize < 0) {
-            throw QueryExecutionErrors.failedToReadSnapshotFileError(
-              fileToRead, toString(), s"value size cannot be $valueSize")
+            throw QueryExecutionErrors.failedToReadSnapshotFileValueSizeError(
+              fileToRead, toString(), valueSize)
           } else {
             val valueRowBuffer = new Array[Byte](valueSize)
             ByteStreams.readFully(input, valueRowBuffer, 0, valueSize)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 65299ea37ef..7961c5e716b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -34,6 +34,7 @@ import org.rocksdb.TickerType._
 import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.util.{NextIterator, Utils}
 
 /**
@@ -553,11 +554,8 @@ class RocksDB(
     }
     if (isAcquiredByDifferentThread) {
       val stackTraceOutput = acquiredThreadInfo.threadRef.get.get.getStackTrace.mkString("\n")
-      val msg = s"RocksDB instance could not be acquired by $newAcquiredThreadInfo as it " +
-        s"was not released by $acquiredThreadInfo after $timeWaitedMs ms.\n" +
-        s"Thread holding the lock has trace: $stackTraceOutput"
-      logError(msg)
-      throw new IllegalStateException(s"$loggingId: $msg")
+      throw QueryExecutionErrors.unreleasedThreadError(loggingId, newAcquiredThreadInfo.toString,
+        acquiredThreadInfo.toString, timeWaitedMs, stackTraceOutput)
     } else {
       acquiredThreadInfo = newAcquiredThreadInfo
       // Add a listener to always release the lock when the task (if active) completes
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
index 0891d773713..ed04472b62c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
@@ -40,6 +40,7 @@ import org.json4s.jackson.Serialization
 import org.apache.spark.{SparkConf, SparkEnv}
 import org.apache.spark.internal.Logging
 import org.apache.spark.io.CompressionCodec
+import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.streaming.CheckpointFileManager
 import org.apache.spark.util.Utils
 
@@ -526,9 +527,8 @@ class RocksDBFileManager(
         val localFileSize = localFile.length()
         val expectedSize = file.sizeBytes
         if (localFileSize != expectedSize) {
-          throw new IllegalStateException(
-            s"Copied $dfsFile to $localFile," +
-              s" expected $expectedSize bytes, found $localFileSize bytes ")
+          throw QueryExecutionErrors.unexpectedFileSize(dfsFile, localFile, expectedSize,
+            localFileSize)
         }
         filesCopied += 1
         bytesCopied += localFileSize
@@ -717,8 +717,7 @@ object RocksDBCheckpointMetadata {
     try {
       val versionLine = reader.readLine()
       if (versionLine != s"v$VERSION") {
-        throw new IllegalStateException(
-          s"Cannot read RocksDB checkpoint metadata of version $versionLine")
+        throw QueryExecutionErrors.cannotReadCheckpoint(versionLine, s"v$VERSION")
       }
       Serialization.read[RocksDBCheckpointMetadata](reader)
     } finally {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
index 10f207c7ec1..53fd06fd24c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
@@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.spark.{SparkConf, SparkEnv}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils
 
@@ -190,15 +191,29 @@ private[sql] class RocksDBStateStoreProvider
   override def stateStoreId: StateStoreId = stateStoreId_
 
   override def getStore(version: Long): StateStore = {
-    require(version >= 0, "Version cannot be less than 0")
-    rocksDB.load(version)
-    new RocksDBStateStore(version)
+    try {
+      if (version < 0) {
+        throw QueryExecutionErrors.unexpectedStateStoreVersion(version)
+      }
+      rocksDB.load(version)
+      new RocksDBStateStore(version)
+    }
+    catch {
+      case e: Throwable => throw QueryExecutionErrors.cannotLoadStore(e)
+    }
   }
 
   override def getReadStore(version: Long): StateStore = {
-    require(version >= 0, "Version cannot be less than 0")
-    rocksDB.load(version, true)
-    new RocksDBStateStore(version)
+    try {
+      if (version < 0) {
+        throw QueryExecutionErrors.unexpectedStateStoreVersion(version)
+      }
+      rocksDB.load(version, true)
+      new RocksDBStateStore(version)
+    }
+    catch {
+      case e: Throwable => throw QueryExecutionErrors.cannotLoadStore(e)
+    }
   }
 
   override def doMaintenance(): Unit = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index 96c7b61f205..359cff81aea 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -32,6 +32,7 @@ import org.apache.spark.{SparkContext, SparkEnv}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.catalyst.util.UnsafeRowUtils
+import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.execution.streaming.StatefulOperatorStateInfo
 import org.apache.spark.sql.types.StructType
@@ -486,7 +487,9 @@ object StateStore extends Logging {
       version: Long,
       storeConf: StateStoreConf,
       hadoopConf: Configuration): ReadStateStore = {
-    require(version >= 0)
+    if (version < 0) {
+      throw QueryExecutionErrors.unexpectedStateStoreVersion(version)
+    }
     val storeProvider = getStateStoreProvider(storeProviderId, keySchema, valueSchema,
       numColsPrefixKey, storeConf, hadoopConf)
     storeProvider.getReadStore(version)
@@ -501,7 +504,9 @@ object StateStore extends Logging {
       version: Long,
       storeConf: StateStoreConf,
       hadoopConf: Configuration): StateStore = {
-    require(version >= 0)
+    if (version < 0) {
+      throw QueryExecutionErrors.unexpectedStateStoreVersion(version)
+    }
     val storeProvider = getStateStoreProvider(storeProviderId, keySchema, valueSchema,
       numColsPrefixKey, storeConf, hadoopConf)
     storeProvider.getStore(version)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
index 372cbb6d986..f15feb2b2ae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.{FSError, Path}
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.io.CompressionCodec
+import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.streaming.CheckpointFileManager
 import org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream
 import org.apache.spark.util.NextIterator
@@ -130,10 +131,7 @@ class StateStoreChangelogReader(
     fm.open(fileToRead)
   } catch {
     case f: FileNotFoundException =>
-      throw new IllegalStateException(
-        s"Error reading streaming state file of $fileToRead does not exist. " +
-          "If the stream job is restarted with a new or updated state operation, please" +
-          " create a new checkpoint location or clear the existing checkpoint location.", f)
+      throw QueryExecutionErrors.failedToReadStreamingStateFileError(fileToRead, f)
   }
   private val input: DataInputStream = decompressStream(sourceStream)
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index e31b05c362f..b4b67f381d2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration
 import org.scalactic.source.Position
 import org.scalatest.Tag
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.util.quietly
 import org.apache.spark.sql.execution.streaming.CreateAtomicTestManager
 import org.apache.spark.sql.internal.SQLConf
@@ -123,12 +124,37 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
   }
 
   test("RocksDB: load version that doesn't exist") {
+    val provider = new RocksDBStateStoreProvider()
+    var ex = intercept[SparkException] {
+      provider.getStore(-1)
+    }
+    checkError(
+      ex,
+      errorClass = "CANNOT_LOAD_STATE_STORE.UNCATEGORIZED",
+      parameters = Map.empty
+    )
+    ex = intercept[SparkException] {
+      provider.getReadStore(-1)
+    }
+    checkError(
+      ex,
+      errorClass = "CANNOT_LOAD_STATE_STORE.UNCATEGORIZED",
+      parameters = Map.empty
+    )
+
     val remoteDir = Utils.createTempDir().toString
     new File(remoteDir).delete()  // to make sure that the directory gets created
     withDB(remoteDir) { db =>
-      intercept[IllegalStateException] {
+      ex = intercept[SparkException] {
         db.load(1)
       }
+      checkError(
+        ex,
+        errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_STREAMING_STATE_FILE",
+        parameters = Map(
+          "fileToRead" -> s"$remoteDir/1.changelog"
+        )
+      )
     }
   }
 
@@ -704,12 +730,21 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
         db.load(0)  // Current thread should be able to load again
 
         // Another thread should not be able to load while current thread is using it
-        val ex = intercept[IllegalStateException] {
+        var ex = intercept[SparkException] {
           ThreadUtils.runInNewThread("concurrent-test-thread-1") { db.load(0) }
         }
-        // Assert that the error message contains the stack trace
-        assert(ex.getMessage.contains("Thread holding the lock has trace:"))
-        assert(ex.getMessage.contains("runInNewThread"))
+        checkError(
+          ex,
+          errorClass = "CANNOT_LOAD_STATE_STORE.UNRELEASED_THREAD_ERROR",
+          parameters = Map(
+            "loggingId" -> "\\[Thread-\\d+\\]",
+            "newAcquiredThreadInfo" -> "\\[ThreadId: Some\\(\\d+\\)\\]",
+            "acquiredThreadInfo" -> "\\[ThreadId: Some\\(\\d+\\)\\]",
+            "timeWaitedMs" -> "\\d+",
+            "stackTraceOutput" -> "(?s).*"
+          ),
+          matchPVals = true
+        )
 
         // Commit should release the instance allowing other threads to load new version
         db.commit()
@@ -720,9 +755,21 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
 
         // Another thread should not be able to load while current thread is using it
         db.load(2)
-        intercept[IllegalStateException] {
+        ex = intercept[SparkException] {
           ThreadUtils.runInNewThread("concurrent-test-thread-2") { db.load(2) }
         }
+        checkError(
+          ex,
+          errorClass = "CANNOT_LOAD_STATE_STORE.UNRELEASED_THREAD_ERROR",
+          parameters = Map(
+            "loggingId" -> "\\[Thread-\\d+\\]",
+            "newAcquiredThreadInfo" -> "\\[ThreadId: Some\\(\\d+\\)\\]",
+            "acquiredThreadInfo" -> "\\[ThreadId: Some\\(\\d+\\)\\]",
+            "timeWaitedMs" -> "\\d+",
+            "stackTraceOutput" -> "(?s).*"
+          ),
+          matchPVals = true
+        )
 
         // Rollback should release the instance allowing other threads to load new version
         db.rollback()
@@ -752,6 +799,24 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
   }
 
   test("checkpoint metadata serde roundtrip") {
+    // expect read metadata error when metadata uses unsupported version
+    withTempDir { dir =>
+      val file2 = new File(dir, "json")
+      val json2 = """{"sstFiles":[],"numKeys":0}"""
+      FileUtils.write(file2, s"v2\n$json2")
+      val e = intercept[SparkException] {
+        RocksDBCheckpointMetadata.readFromFile(file2)
+      }
+      checkError(
+        e,
+        errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_CHECKPOINT",
+        parameters = Map(
+          "expectedVersion" -> "v2",
+          "actualVersion" -> "v1"
+        )
+      )
+    }
+
     def checkJsonRoundtrip(metadata: RocksDBCheckpointMetadata, json: String): Unit = {
       assert(metadata.json == json)
       withTempDir { dir =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index 02aa12b325f..6c4e259bac5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -236,22 +236,40 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
       // Corrupt snapshot file and verify that it throws error
       assert(getData(provider, snapshotVersion) === Set(("a", 0) -> snapshotVersion))
       corruptFile(provider, snapshotVersion, isSnapshot = true)
-      intercept[Exception] {
+      var e = intercept[SparkException] {
         getData(provider, snapshotVersion)
       }
+      checkError(
+        e,
+        errorClass = "CANNOT_LOAD_STATE_STORE.UNCATEGORIZED",
+        parameters = Map.empty
+      )
 
       // Corrupt delta file and verify that it throws error
       assert(getData(provider, snapshotVersion - 1) === Set(("a", 0) -> (snapshotVersion - 1)))
       corruptFile(provider, snapshotVersion - 1, isSnapshot = false)
-      intercept[Exception] {
+      e = intercept[SparkException] {
         getData(provider, snapshotVersion - 1)
       }
+      checkError(
+        e,
+        errorClass = "CANNOT_LOAD_STATE_STORE.UNCATEGORIZED",
+        parameters = Map.empty
+      )
 
       // Delete delta file and verify that it throws error
       deleteFilesEarlierThanVersion(provider, snapshotVersion)
-      intercept[Exception] {
+      e = intercept[SparkException] {
         getData(provider, snapshotVersion - 1)
       }
+      checkError(
+        e.getCause.asInstanceOf[SparkThrowable],
+        errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_DELTA_FILE_NOT_EXISTS",
+        parameters = Map(
+          "fileToRead" -> s"${provider.stateStoreId.storeCheckpointLocation()}/1.delta",
+          "clazz" -> s"${provider.toString}"
+        )
+      )
     }
   }
 
@@ -900,12 +918,17 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
       assert(getLatestData(provider) === Set(("b", 0) -> 2))
 
       // Trying to get newer versions should fail
-      intercept[Exception] {
+      var e = intercept[SparkException] {
         provider.getStore(2)
       }
-      intercept[Exception] {
+      assert(e.getCause.isInstanceOf[SparkException])
+      assert(e.getCause.getMessage.contains("does not exist"))
+
+      e = intercept[SparkException] {
         getData(provider, 2)
       }
+      assert(e.getCause.isInstanceOf[SparkException])
+      assert(e.getCause.getMessage.contains("does not exist"))
 
       // New updates to the reloaded store with new version, and does not change old version
       tryWithProviderResource(newStoreProvider(store.id)) { reloadedProvider =>
@@ -1043,9 +1066,14 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
   testWithAllCodec("getStore with invalid versions") {
     tryWithProviderResource(newStoreProvider()) { provider =>
       def checkInvalidVersion(version: Int): Unit = {
-        intercept[Exception] {
+        val e = intercept[SparkException] {
           provider.getStore(version)
         }
+        checkError(
+          e,
+          errorClass = "CANNOT_LOAD_STATE_STORE.UNCATEGORIZED",
+          parameters = Map.empty
+        )
       }
 
       checkInvalidVersion(-1)
@@ -1196,16 +1224,31 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
           val hadoopConf = new Configuration()
 
           // Verify that trying to get incorrect versions throw errors
-          intercept[IllegalArgumentException] {
+          var e = intercept[SparkException] {
             StateStore.get(
               storeId, keySchema, valueSchema, 0, -1, storeConf, hadoopConf)
           }
-          assert(!StateStore.isLoaded(storeId)) // version -1 should not attempt to load the store
-
-          intercept[IllegalStateException] {
+          checkError(
+            e,
+            errorClass = "CANNOT_LOAD_STATE_STORE.UNEXPECTED_VERSION",
+            parameters = Map(
+              "version" -> "-1"
+            )
+          )
+
+          e = intercept[SparkException] {
             StateStore.get(
               storeId, keySchema, valueSchema, 0, 1, storeConf, hadoopConf)
           }
+          checkError(
+            e.getCause.asInstanceOf[SparkThrowable],
+            errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_DELTA_FILE_NOT_EXISTS",
+            parameters = Map(
+              "fileToRead" -> s"$dir/0/0/1.delta",
+              "clazz" -> "HDFSStateStoreProvider\\[.+\\]"
+            ),
+            matchPVals = true
+          )
 
           // Increase version of the store and try to get again
           val store0 = StateStore.get(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

