This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 4531d710ba79 [SPARK-45138][SS] Define a new error class and apply it when checkpointing state to DFS fails 4531d710ba79 is described below commit 4531d710ba79216c8c6626650675f83425c06fa4 Author: Neil Ramaswamy <neil.ramasw...@databricks.com> AuthorDate: Thu Sep 21 11:59:03 2023 +0900 [SPARK-45138][SS] Define a new error class and apply it when checkpointing state to DFS fails ### What changes were proposed in this pull request? In this change, we add a new error class for when checkpointing state to the DFS, for either state store provider, fails during `commit`. ### Why are the changes needed? Users might be confused when they see an `IOException`, for example, if a call to `commit` fails. This is a neat and self-explanatory wrapper around such exceptions. ### Does this PR introduce _any_ user-facing change? Yes, the error message when DFS checkpoint fails is now a `SparkException` with an error class. ### How was this patch tested? - Modified existing UTs ### Was this patch authored or co-authored using generative AI tooling? No Closes #42895 from neilramaswamy/nr-state-files-commit-logging. 
Authored-by: Neil Ramaswamy <neil.ramasw...@databricks.com> Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> --- .../src/main/resources/error/error-classes.json | 12 +++++ ...ditions-cannot-write-state-store-error-class.md | 32 +++++++++++++ docs/sql-error-conditions.md | 8 ++++ .../spark/sql/errors/QueryExecutionErrors.scala | 7 +++ .../state/HDFSBackedStateStoreProvider.scala | 8 ++-- .../state/RocksDBStateStoreProvider.scala | 15 ++++-- .../streaming/state/RocksDBStateStoreSuite.scala | 9 +++- .../streaming/state/StateStoreSuite.scala | 54 +++++++++++++++------- 8 files changed, 117 insertions(+), 28 deletions(-) diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json index 186e7b4640d8..d92ccfce5c52 100644 --- a/common/utils/src/main/resources/error/error-classes.json +++ b/common/utils/src/main/resources/error/error-classes.json @@ -314,6 +314,18 @@ "<details>" ] }, + "CANNOT_WRITE_STATE_STORE" : { + "message" : [ + "Error writing state store files for provider <providerClass>." + ], + "subClass" : { + "CANNOT_COMMIT" : { + "message" : [ + "Cannot perform commit during state checkpoint." + ] + } + } + }, "CAST_INVALID_INPUT" : { "message" : [ "The value <expression> of the type <sourceType> cannot be cast to <targetType> because it is malformed. Correct the value as per the syntax, or change its target type. Use `try_cast` to tolerate malformed input and return NULL instead. If necessary set <ansiConfig> to \"false\" to bypass this error." 
diff --git a/docs/sql-error-conditions-cannot-write-state-store-error-class.md b/docs/sql-error-conditions-cannot-write-state-store-error-class.md new file mode 100644 index 000000000000..ab7b852f892b --- /dev/null +++ b/docs/sql-error-conditions-cannot-write-state-store-error-class.md @@ -0,0 +1,32 @@ +--- +layout: global +title: CANNOT_WRITE_STATE_STORE error class +displayTitle: CANNOT_WRITE_STATE_STORE error class +license: | + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--- + +SQLSTATE: none assigned + +Error writing state store files for provider `<providerClass>`. + +This error class has the following derived error classes: + +## CANNOT_COMMIT + +Cannot perform commit during state checkpoint. + + diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md index 4f982e52bc86..1df00f72bc97 100644 --- a/docs/sql-error-conditions.md +++ b/docs/sql-error-conditions.md @@ -271,6 +271,14 @@ SQLSTATE: none assigned Cannot up cast `<expression>` from `<sourceType>` to `<targetType>`. `<details>` +### [CANNOT_WRITE_STATE_STORE](sql-error-conditions-cannot-write-state-store-error-class.html) + +SQLSTATE: none assigned + +Error writing state store files for provider `<providerClass>`. 
+ +For more details see [CANNOT_WRITE_STATE_STORE](sql-error-conditions-cannot-write-state-store-error-class.html) + ### CAST_INVALID_INPUT [SQLSTATE: 22018](sql-error-conditions-sqlstates.html#class-22-data-exception) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala index 643cbc3cbdb9..e14fef1fad72 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala @@ -2249,6 +2249,13 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE cause = f) } + def failedToCommitStateFileError(providerClass: String, f: Throwable): Throwable = { + new SparkException( + errorClass = "CANNOT_WRITE_STATE_STORE.CANNOT_COMMIT", + messageParameters = Map("providerClass" -> providerClass), + cause = f) + } + def cannotPurgeAsBreakInternalStateError(): SparkUnsupportedOperationException = { new SparkUnsupportedOperationException( errorClass = "_LEGACY_ERROR_TEMP_2260", diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala index afa1fdaa2237..66832400aa14 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala @@ -135,17 +135,15 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with /** Commit all the updates that have been made to the store, and return the new version. 
*/ override def commit(): Long = { - verify(state == UPDATING, "Cannot commit after already committed or aborted") - try { + verify(state == UPDATING, "Cannot commit after already committed or aborted") commitUpdates(newVersion, mapToUpdate, compressedStream) state = COMMITTED logInfo(s"Committed version $newVersion for $this to file $finalDeltaFile") newVersion } catch { - case NonFatal(e) => - throw new IllegalStateException( - s"Error committing version $newVersion into $this", e) + case e: Throwable => + throw QueryExecutionErrors.failedToCommitStateFileError(this.toString(), e) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala index 37a8785f04d6..4254640201c5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala @@ -90,11 +90,16 @@ private[sql] class RocksDBStateStoreProvider } override def commit(): Long = synchronized { - verify(state == UPDATING, "Cannot commit after already committed or aborted") - val newVersion = rocksDB.commit() - state = COMMITTED - logInfo(s"Committed $newVersion for $id") - newVersion + try { + verify(state == UPDATING, "Cannot commit after already committed or aborted") + val newVersion = rocksDB.commit() + state = COMMITTED + logInfo(s"Committed $newVersion for $id") + newVersion + } catch { + case e: Throwable => + throw QueryExecutionErrors.failedToCommitStateFileError(this.toString(), e) + } } override def abort(): Unit = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala index d113085fd1c4..d1cc7e0b3b9c 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala @@ -152,6 +152,10 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid newStoreProvider(storeId, numColsPrefixKey = 0) } + def newStoreProvider(storeId: StateStoreId, conf: Configuration): RocksDBStateStoreProvider = { + newStoreProvider(storeId, numColsPrefixKey = -1, conf = conf) + } + override def newStoreProvider(numPrefixCols: Int): RocksDBStateStoreProvider = { newStoreProvider(StateStoreId(newDir(), Random.nextInt(), 0), numColsPrefixKey = numPrefixCols) } @@ -159,11 +163,12 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid def newStoreProvider( storeId: StateStoreId, numColsPrefixKey: Int, - sqlConf: Option[SQLConf] = None): RocksDBStateStoreProvider = { + sqlConf: Option[SQLConf] = None, + conf: Configuration = new Configuration): RocksDBStateStoreProvider = { val provider = new RocksDBStateStoreProvider() provider.init( storeId, keySchema, valueSchema, numColsPrefixKey = numColsPrefixKey, - new StateStoreConf(sqlConf.getOrElse(SQLConf.get)), new Configuration) + new StateStoreConf(sqlConf.getOrElse(SQLConf.get)), conf) provider } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala index 093f42620112..e6d2f63267fd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala @@ -534,21 +534,6 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider] } } - testQuietly("SPARK-18342: commit fails when rename fails") { - import RenameReturnsFalseFileSystem._ - val dir = scheme + 
"://" + newDir() - val conf = new Configuration() - conf.set(s"fs.$scheme.impl", classOf[RenameReturnsFalseFileSystem].getName) - tryWithProviderResource(newStoreProvider( - opId = Random.nextInt, partition = 0, dir = dir, hadoopConf = conf)) { provider => - - val store = provider.getStore(0) - put(store, "a", 0, 0) - val e = intercept[IllegalStateException](store.commit()) - assert(e.getCause.getMessage.contains("Failed to rename")) - } - } - test("SPARK-18416: do not create temp delta file until the store is updated") { val dir = newDir() val storeId = StateStoreProviderId(StateStoreId(dir, 0, 0), UUID.randomUUID) @@ -692,8 +677,9 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider] // Fail commit for next version and verify that reloading resets the files CreateAtomicTestManager.shouldFailInCreateAtomic = true put(store, "11", 0, 11) - val e = intercept[IllegalStateException] { quietly { store.commit() } } + val e = intercept[SparkException] { quietly { store.commit() } } assert(e.getCause.isInstanceOf[IOException]) + assert(e.getMessage.contains("Cannot perform commit")) CreateAtomicTestManager.shouldFailInCreateAtomic = false // Abort commit for next version and verify that reloading resets the files @@ -799,6 +785,14 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider] newStoreProvider(storeId.operatorId, storeId.partitionId, dir = storeId.checkpointRootLocation) } + def newStoreProvider(storeId: StateStoreId, conf: Configuration): HDFSBackedStateStoreProvider = { + newStoreProvider( + storeId.operatorId, + storeId.partitionId, + dir = storeId.checkpointRootLocation, + hadoopConf = conf) + } + override def newStoreProvider( minDeltasForSnapshot: Int, numOfVersToRetainInMemory: Int): HDFSBackedStateStoreProvider = { @@ -1152,6 +1146,31 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider] } } + testQuietly("SPARK-18342: commit fails when rename fails") { + import 
RenameReturnsFalseFileSystem._ + + val ROCKSDB_STATE_STORE = "RocksDBStateStore" + val dir = scheme + "://" + newDir() + val conf = new Configuration() + conf.set(s"fs.$scheme.impl", classOf[RenameReturnsFalseFileSystem].getName) + + val storeId = StateStoreId(dir, operatorId = 0, partitionId = 0) + tryWithProviderResource(newStoreProvider(storeId, conf)) { provider => + val store = provider.getStore(0) + put(store, "a", 0, 0) + val e = intercept[SparkException](quietly { store.commit() } ) + + assert(e.getErrorClass == "CANNOT_WRITE_STATE_STORE.CANNOT_COMMIT") + if (store.getClass.getName contains ROCKSDB_STATE_STORE) { + assert(e.getMessage contains "RocksDBStateStore[id=(op=0,part=0)") + } else { + assert(e.getMessage contains "HDFSStateStore[id=(op=0,part=0)") + } + assert(e.getMessage contains "Error writing state store files") + assert(e.getCause.getMessage.contains("Failed to rename")) + } + } + // This test illustrates state store iterator behavior differences leading to SPARK-38320. testWithAllCodec("SPARK-38320 - state store iterator behavior differences") { val ROCKSDB_STATE_STORE = "RocksDBStateStore" @@ -1430,6 +1449,9 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider] /** Return a new provider with the given id */ def newStoreProvider(storeId: StateStoreId): ProviderClass + /** Return a new provider with the given id and configuration */ + def newStoreProvider(storeId: StateStoreId, conf: Configuration): ProviderClass + /** Return a new provider with minimum delta and version to retain in memory */ def newStoreProvider(minDeltasForSnapshot: Int, numOfVersToRetainInMemory: Int): ProviderClass --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org