This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 4531d710ba79 [SPARK-45138][SS] Define a new error class and apply it when checkpointing state to DFS fails
4531d710ba79 is described below

commit 4531d710ba79216c8c6626650675f83425c06fa4
Author: Neil Ramaswamy <neil.ramasw...@databricks.com>
AuthorDate: Thu Sep 21 11:59:03 2023 +0900

    [SPARK-45138][SS] Define a new error class and apply it when checkpointing state to DFS fails
    
    ### What changes were proposed in this pull request?
    
    In this change, we add a new error class that is applied when checkpointing state to the DFS fails during `commit`, for either state store provider.
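
    For illustration, here is a minimal sketch of how a caller could observe the new error. The `store` handle and the failure itself are hypothetical; `getErrorClass` comes from `SparkThrowable`, which `SparkException` implements:

    ```scala
    import org.apache.spark.SparkException

    // A checkpoint failure inside commit() now surfaces as a SparkException
    // carrying the new error class, with the original failure as the cause.
    try {
      store.commit()  // `store` is a StateStore obtained from a provider
    } catch {
      case e: SparkException =>
        assert(e.getErrorClass == "CANNOT_WRITE_STATE_STORE.CANNOT_COMMIT")
        throw e
    }
    ```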
    
    ### Why are the changes needed?
    
    Users might be confused when they see a raw `IOException` if, for example, a call to `commit` fails. The new error class is a neat, self-explanatory wrapper around such exceptions.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. When checkpointing state to the DFS fails, the error is now a `SparkException` with an error class.
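
    As a rough illustration (the exact rendering is an assumption, though the error class, template text, and provider string match the new definition and the updated tests below), a failed commit now reads along the lines of:

    ```
    org.apache.spark.SparkException: [CANNOT_WRITE_STATE_STORE.CANNOT_COMMIT]
    Error writing state store files for provider HDFSStateStore[id=(op=0,part=0)].
    Cannot perform commit during state checkpoint.
    ```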
    
    ### How was this patch tested?
    
    - Modified existing UTs
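
    For example, the commit-failure assertions now look roughly like this (paraphrasing the updated suites in the diff below; `store`, `quietly`, and the injected failure are the tests' fixtures):

    ```scala
    // A failing commit is expected to surface as a SparkException with the
    // new error class, wrapping the original cause (here an IOException).
    val e = intercept[SparkException] { quietly { store.commit() } }
    assert(e.getErrorClass == "CANNOT_WRITE_STATE_STORE.CANNOT_COMMIT")
    assert(e.getCause.isInstanceOf[IOException])
    ```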
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #42895 from neilramaswamy/nr-state-files-commit-logging.
    
    Authored-by: Neil Ramaswamy <neil.ramasw...@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../src/main/resources/error/error-classes.json    | 12 +++++
 ...ditions-cannot-write-state-store-error-class.md | 32 +++++++++++++
 docs/sql-error-conditions.md                       |  8 ++++
 .../spark/sql/errors/QueryExecutionErrors.scala    |  7 +++
 .../state/HDFSBackedStateStoreProvider.scala       |  8 ++--
 .../state/RocksDBStateStoreProvider.scala          | 15 ++++--
 .../streaming/state/RocksDBStateStoreSuite.scala   |  9 +++-
 .../streaming/state/StateStoreSuite.scala          | 54 +++++++++++++++-------
 8 files changed, 117 insertions(+), 28 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json
index 186e7b4640d8..d92ccfce5c52 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -314,6 +314,18 @@
       "<details>"
     ]
   },
+  "CANNOT_WRITE_STATE_STORE" : {
+    "message" : [
+      "Error writing state store files for provider <providerClass>."
+    ],
+    "subClass" : {
+      "CANNOT_COMMIT" : {
+        "message" : [
+          "Cannot perform commit during state checkpoint."
+        ]
+      }
+    }
+  },
   "CAST_INVALID_INPUT" : {
     "message" : [
       "The value <expression> of the type <sourceType> cannot be cast to 
<targetType> because it is malformed. Correct the value as per the syntax, or 
change its target type. Use `try_cast` to tolerate malformed input and return 
NULL instead. If necessary set <ansiConfig> to \"false\" to bypass this error."
diff --git a/docs/sql-error-conditions-cannot-write-state-store-error-class.md b/docs/sql-error-conditions-cannot-write-state-store-error-class.md
new file mode 100644
index 000000000000..ab7b852f892b
--- /dev/null
+++ b/docs/sql-error-conditions-cannot-write-state-store-error-class.md
@@ -0,0 +1,32 @@
+---
+layout: global
+title: CANNOT_WRITE_STATE_STORE error class
+displayTitle: CANNOT_WRITE_STATE_STORE error class
+license: |
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+---
+
+SQLSTATE: none assigned
+
+Error writing state store files for provider `<providerClass>`.
+
+This error class has the following derived error classes:
+
+## CANNOT_COMMIT
+
+Cannot perform commit during state checkpoint.
+
+
diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md
index 4f982e52bc86..1df00f72bc97 100644
--- a/docs/sql-error-conditions.md
+++ b/docs/sql-error-conditions.md
@@ -271,6 +271,14 @@ SQLSTATE: none assigned
 Cannot up cast `<expression>` from `<sourceType>` to `<targetType>`.
 `<details>`
 
+### [CANNOT_WRITE_STATE_STORE](sql-error-conditions-cannot-write-state-store-error-class.html)
+
+SQLSTATE: none assigned
+
+Error writing state store files for provider `<providerClass>`.
+
+For more details see [CANNOT_WRITE_STATE_STORE](sql-error-conditions-cannot-write-state-store-error-class.html)
+
 ### CAST_INVALID_INPUT
 
 [SQLSTATE: 22018](sql-error-conditions-sqlstates.html#class-22-data-exception)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 643cbc3cbdb9..e14fef1fad72 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -2249,6 +2249,13 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE
       cause = f)
   }
 
+  def failedToCommitStateFileError(providerClass: String, f: Throwable): Throwable = {
+    new SparkException(
+      errorClass = "CANNOT_WRITE_STATE_STORE.CANNOT_COMMIT",
+      messageParameters = Map("providerClass" -> providerClass),
+      cause = f)
+  }
+
   def cannotPurgeAsBreakInternalStateError(): SparkUnsupportedOperationException = {
     new SparkUnsupportedOperationException(
       errorClass = "_LEGACY_ERROR_TEMP_2260",
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index afa1fdaa2237..66832400aa14 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -135,17 +135,15 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
 
     /** Commit all the updates that have been made to the store, and return the new version. */
     override def commit(): Long = {
-      verify(state == UPDATING, "Cannot commit after already committed or aborted")
-
       try {
+        verify(state == UPDATING, "Cannot commit after already committed or aborted")
         commitUpdates(newVersion, mapToUpdate, compressedStream)
         state = COMMITTED
         logInfo(s"Committed version $newVersion for $this to file 
$finalDeltaFile")
         newVersion
       } catch {
-        case NonFatal(e) =>
-          throw new IllegalStateException(
-            s"Error committing version $newVersion into $this", e)
+        case e: Throwable =>
+          throw QueryExecutionErrors.failedToCommitStateFileError(this.toString(), e)
       }
     }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
index 37a8785f04d6..4254640201c5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
@@ -90,11 +90,16 @@ private[sql] class RocksDBStateStoreProvider
     }
 
     override def commit(): Long = synchronized {
-      verify(state == UPDATING, "Cannot commit after already committed or aborted")
-      val newVersion = rocksDB.commit()
-      state = COMMITTED
-      logInfo(s"Committed $newVersion for $id")
-      newVersion
+      try {
+        verify(state == UPDATING, "Cannot commit after already committed or aborted")
+        val newVersion = rocksDB.commit()
+        state = COMMITTED
+        logInfo(s"Committed $newVersion for $id")
+        newVersion
+      } catch {
+        case e: Throwable =>
+          throw QueryExecutionErrors.failedToCommitStateFileError(this.toString(), e)
+      }
     }
 
     override def abort(): Unit = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
index d113085fd1c4..d1cc7e0b3b9c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
@@ -152,6 +152,10 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
     newStoreProvider(storeId, numColsPrefixKey = 0)
   }
 
+  def newStoreProvider(storeId: StateStoreId, conf: Configuration): RocksDBStateStoreProvider = {
+    newStoreProvider(storeId, numColsPrefixKey = -1, conf = conf)
+  }
+
   override def newStoreProvider(numPrefixCols: Int): RocksDBStateStoreProvider = {
     newStoreProvider(StateStoreId(newDir(), Random.nextInt(), 0), numColsPrefixKey = numPrefixCols)
   }
@@ -159,11 +163,12 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
   def newStoreProvider(
       storeId: StateStoreId,
       numColsPrefixKey: Int,
-      sqlConf: Option[SQLConf] = None): RocksDBStateStoreProvider = {
+      sqlConf: Option[SQLConf] = None,
+      conf: Configuration = new Configuration): RocksDBStateStoreProvider = {
     val provider = new RocksDBStateStoreProvider()
     provider.init(
       storeId, keySchema, valueSchema, numColsPrefixKey = numColsPrefixKey,
-      new StateStoreConf(sqlConf.getOrElse(SQLConf.get)), new Configuration)
+      new StateStoreConf(sqlConf.getOrElse(SQLConf.get)), conf)
     provider
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index 093f42620112..e6d2f63267fd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -534,21 +534,6 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
     }
   }
 
-  testQuietly("SPARK-18342: commit fails when rename fails") {
-    import RenameReturnsFalseFileSystem._
-    val dir = scheme + "://" + newDir()
-    val conf = new Configuration()
-    conf.set(s"fs.$scheme.impl", classOf[RenameReturnsFalseFileSystem].getName)
-    tryWithProviderResource(newStoreProvider(
-      opId = Random.nextInt, partition = 0, dir = dir, hadoopConf = conf)) { provider =>
-
-      val store = provider.getStore(0)
-      put(store, "a", 0, 0)
-      val e = intercept[IllegalStateException](store.commit())
-      assert(e.getCause.getMessage.contains("Failed to rename"))
-    }
-  }
-
   test("SPARK-18416: do not create temp delta file until the store is 
updated") {
     val dir = newDir()
     val storeId = StateStoreProviderId(StateStoreId(dir, 0, 0), UUID.randomUUID)
@@ -692,8 +677,9 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
       // Fail commit for next version and verify that reloading resets the files
       CreateAtomicTestManager.shouldFailInCreateAtomic = true
       put(store, "11", 0, 11)
-      val e = intercept[IllegalStateException] { quietly { store.commit() } }
+      val e = intercept[SparkException] { quietly { store.commit() } }
       assert(e.getCause.isInstanceOf[IOException])
+      assert(e.getMessage.contains("Cannot perform commit"))
       CreateAtomicTestManager.shouldFailInCreateAtomic = false
 
       // Abort commit for next version and verify that reloading resets the files
@@ -799,6 +785,14 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
     newStoreProvider(storeId.operatorId, storeId.partitionId, dir = storeId.checkpointRootLocation)
   }
 
+  def newStoreProvider(storeId: StateStoreId, conf: Configuration): HDFSBackedStateStoreProvider = {
+    newStoreProvider(
+      storeId.operatorId,
+      storeId.partitionId,
+      dir = storeId.checkpointRootLocation,
+      hadoopConf = conf)
+  }
+
   override def newStoreProvider(
       minDeltasForSnapshot: Int,
       numOfVersToRetainInMemory: Int): HDFSBackedStateStoreProvider = {
@@ -1152,6 +1146,31 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
     }
   }
 
+  testQuietly("SPARK-18342: commit fails when rename fails") {
+    import RenameReturnsFalseFileSystem._
+
+    val ROCKSDB_STATE_STORE = "RocksDBStateStore"
+    val dir = scheme + "://" + newDir()
+    val conf = new Configuration()
+    conf.set(s"fs.$scheme.impl", classOf[RenameReturnsFalseFileSystem].getName)
+
+    val storeId = StateStoreId(dir, operatorId = 0, partitionId = 0)
+    tryWithProviderResource(newStoreProvider(storeId, conf)) { provider =>
+      val store = provider.getStore(0)
+      put(store, "a", 0, 0)
+      val e = intercept[SparkException](quietly { store.commit() } )
+
+      assert(e.getErrorClass == "CANNOT_WRITE_STATE_STORE.CANNOT_COMMIT")
+      if (store.getClass.getName contains ROCKSDB_STATE_STORE) {
+        assert(e.getMessage contains "RocksDBStateStore[id=(op=0,part=0)")
+      } else {
+        assert(e.getMessage contains "HDFSStateStore[id=(op=0,part=0)")
+      }
+      assert(e.getMessage contains "Error writing state store files")
+      assert(e.getCause.getMessage.contains("Failed to rename"))
+    }
+  }
+
   // This test illustrates state store iterator behavior differences leading to SPARK-38320.
   testWithAllCodec("SPARK-38320 - state store iterator behavior differences") {
     val ROCKSDB_STATE_STORE = "RocksDBStateStore"
@@ -1430,6 +1449,9 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
   /** Return a new provider with the given id */
   def newStoreProvider(storeId: StateStoreId): ProviderClass
 
+  /** Return a new provider with the given id and configuration */
+  def newStoreProvider(storeId: StateStoreId, conf: Configuration): ProviderClass
+
   /** Return a new provider with minimum delta and version to retain in memory */
   def newStoreProvider(minDeltasForSnapshot: Int, numOfVersToRetainInMemory: Int): ProviderClass
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
