This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 6e60b232c769 [SPARK-46968][SQL] Replace `UnsupportedOperationException` by `SparkUnsupportedOperationException` in `sql`
6e60b232c769 is described below
commit 6e60b232c7693738b1d005858e5dac24e7bafcaf
Author: Max Gekk <[email protected]>
AuthorDate: Sat Feb 3 00:22:06 2024 -0800
[SPARK-46968][SQL] Replace `UnsupportedOperationException` by `SparkUnsupportedOperationException` in `sql`
### What changes were proposed in this pull request?
In the PR, I propose to replace all occurrences of `UnsupportedOperationException` with `SparkUnsupportedOperationException` in the `sql` code base, and to introduce new legacy error classes with the `_LEGACY_ERROR_TEMP_` prefix.
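As an illustration, here is a condensed sketch of the migration pattern applied below to `RocksDB.createColFamilyIfAbsent` (the standalone method is only a sketch, and the literal `"default"` stands in for `StateStore.DEFAULT_COL_FAMILY_NAME`):
```scala
import org.apache.spark.SparkUnsupportedOperationException

def createColFamilyIfAbsent(colFamilyName: String): Unit = {
  if (colFamilyName == "default") { // reserved name; stands in for StateStore.DEFAULT_COL_FAMILY_NAME
    // Before: a plain Java exception with a hard-coded message:
    //   throw new UnsupportedOperationException(
    //     "Failed to create column family with reserved " + s"name=$colFamilyName")
    // After: a Spark exception bound to an error class declared in
    // error-classes.json; the variable part travels as a message parameter.
    throw new SparkUnsupportedOperationException(
      errorClass = "_LEGACY_ERROR_TEMP_3197",
      messageParameters = Map("colFamilyName" -> colFamilyName))
  }
}
```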
### Why are the changes needed?
To unify Spark SQL exceptions, and to port Java exceptions to Spark exceptions with error classes.
### Does this PR introduce _any_ user-facing change?
Yes, it can if a user's code assumes a particular format of `UnsupportedOperationException` messages.
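As a hypothetical example of how affected user code can be made robust: instead of parsing the message text, match on the error class carried by the new exception (`store` and the handler body are placeholders; `getErrorClass` comes from Spark's `SparkThrowable` interface):
```scala
import org.apache.spark.SparkUnsupportedOperationException

try {
  store.createColFamilyIfAbsent("default") // placeholder call that now throws
} catch {
  // Fragile: `case e: UnsupportedOperationException if e.getMessage.contains("reserved")`
  // breaks as soon as the wording changes.
  // Robust: the error class is a stable identifier.
  case e: SparkUnsupportedOperationException
      if e.getErrorClass == "_LEGACY_ERROR_TEMP_3197" =>
    () // placeholder: handle the reserved-name case
}
```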
### How was this patch tested?
By running the affected test suites:
```
$ build/sbt "core/testOnly *SparkThrowableSuite"
```
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #44937 from MaxGekk/migrate-UnsupportedOperationException-api.
Authored-by: Max Gekk <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
common/utils/src/main/resources/error/error-classes.json | 10 ++++++++++
.../org/apache/spark/sql/catalyst/trees/QueryContexts.scala | 12 ++++++------
.../scala/org/apache/spark/sql/catalyst/util/UDTUtils.scala | 3 ++-
.../org/apache/spark/sql/execution/UnsafeRowSerializer.scala | 2 +-
.../sql/execution/streaming/CompactibleFileStreamLog.scala | 4 ++--
.../spark/sql/execution/streaming/ValueStateImpl.scala | 2 --
.../streaming/state/HDFSBackedStateStoreProvider.scala | 5 ++---
.../apache/spark/sql/execution/streaming/state/RocksDB.scala | 7 ++++---
8 files changed, 27 insertions(+), 18 deletions(-)
diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json
index 8399311cbfc4..ef9e81c98e05 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -7489,6 +7489,16 @@
"Datatype not supported <dt>"
]
},
+ "_LEGACY_ERROR_TEMP_3193" : {
+ "message" : [
+ "Creating multiple column families with HDFSBackedStateStoreProvider is
not supported"
+ ]
+ },
+ "_LEGACY_ERROR_TEMP_3197" : {
+ "message" : [
+ "Failed to create column family with reserved name=<colFamilyName>"
+ ]
+ },
"_LEGACY_ERROR_USER_RAISED_EXCEPTION" : {
"message" : [
"<errorMessage>"
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/trees/QueryContexts.scala b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/trees/QueryContexts.scala
index 57271e535afb..c716002ef35c 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/trees/QueryContexts.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/trees/QueryContexts.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.catalyst.trees
-import org.apache.spark.{QueryContext, QueryContextType}
+import org.apache.spark.{QueryContext, QueryContextType, SparkUnsupportedOperationException}
/** The class represents error context of a SQL query. */
case class SQLQueryContext(
@@ -131,16 +131,16 @@ case class SQLQueryContext(
originStartIndex.get <= originStopIndex.get
}
- override def callSite: String = throw new UnsupportedOperationException
+ override def callSite: String = throw SparkUnsupportedOperationException()
}
case class DataFrameQueryContext(stackTrace: Seq[StackTraceElement]) extends QueryContext {
override val contextType = QueryContextType.DataFrame
- override def objectType: String = throw new UnsupportedOperationException
- override def objectName: String = throw new UnsupportedOperationException
- override def startIndex: Int = throw new UnsupportedOperationException
- override def stopIndex: Int = throw new UnsupportedOperationException
+ override def objectType: String = throw SparkUnsupportedOperationException()
+ override def objectName: String = throw SparkUnsupportedOperationException()
+ override def startIndex: Int = throw SparkUnsupportedOperationException()
+ override def stopIndex: Int = throw SparkUnsupportedOperationException()
override val fragment: String = {
stackTrace.headOption.map { firstElem =>
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/UDTUtils.scala b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/UDTUtils.scala
index 98768a35e8a5..a98aa26d02ef 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/UDTUtils.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/UDTUtils.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.catalyst.util
import scala.util.control.NonFatal
+import org.apache.spark.SparkUnsupportedOperationException
import org.apache.spark.sql.types.UserDefinedType
import org.apache.spark.util.SparkClassUtils
@@ -53,6 +54,6 @@ private[sql] object UDTUtils extends UDTUtils {
private[sql] object DefaultUDTUtils extends UDTUtils {
override def toRow(value: Any, udt: UserDefinedType[Any]): Any = {
- throw new UnsupportedOperationException()
+ throw SparkUnsupportedOperationException()
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala
index 8563bbcd7960..42fcfa8d60fa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala
@@ -37,7 +37,7 @@ import org.apache.spark.unsafe.Platform
* instance that is backed by an on-heap byte array.
*
* Note that this serializer implements only the [[Serializer]] methods that are used during
- * shuffle, so certain [[SerializerInstance]] methods will throw UnsupportedOperationException.
+ * shuffle, so certain [[SerializerInstance]] methods will throw SparkUnsupportedOperationException.
*
* @param numFields the number of fields in the row being serialized.
*/
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
index ef7cefe2394d..8d38bba1f2a6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
@@ -178,8 +178,8 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
* CompactibleFileStreamLog maintains logs by itself, and manual purging might break internal
* state, specifically which latest compaction batch is purged.
*
- * To simplify the situation, this method just throws UnsupportedOperationException regardless
- * of given parameter, and let CompactibleFileStreamLog handles purging by itself.
+ * To simplify the situation, this method just throws SparkUnsupportedOperationException
+ * regardless of given parameter, and let CompactibleFileStreamLog handles purging by itself.
*/
override def purge(thresholdBatchId: Long): Unit =
throw QueryExecutionErrors.cannotPurgeAsBreakInternalStateError()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ValueStateImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ValueStateImpl.scala
index d82ce5ba1125..11ae7f65b43d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ValueStateImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ValueStateImpl.scala
@@ -16,8 +16,6 @@
*/
package org.apache.spark.sql.execution.streaming
-import java.io.Serializable
-
import org.apache.commons.lang3.SerializationUtils
import org.apache.spark.internal.Logging
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index ffb618d0fbb0..dd04053c5471 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -31,7 +31,7 @@ import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
-import org.apache.spark.{SparkConf, SparkEnv}
+import org.apache.spark.{SparkConf, SparkEnv, SparkUnsupportedOperationException}
import org.apache.spark.internal.Logging
import org.apache.spark.io.CompressionCodec
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
@@ -115,8 +115,7 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
override def id: StateStoreId = HDFSBackedStateStoreProvider.this.stateStoreId
override def createColFamilyIfAbsent(colFamilyName: String): Unit = {
- throw new UnsupportedOperationException("Creating multiple column
families with " +
- "HDFSBackedStateStoreProvider is not supported")
+ throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3193")
}
override def get(key: UnsafeRow, colFamilyName: String): UnsafeRow = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index bf1a1c50d350..b3d981e4b25d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -34,7 +34,7 @@ import org.rocksdb.{RocksDB => NativeRocksDB, _}
import org.rocksdb.CompressionType._
import org.rocksdb.TickerType._
-import org.apache.spark.TaskContext
+import org.apache.spark.{SparkUnsupportedOperationException, TaskContext}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.errors.QueryExecutionErrors
@@ -252,8 +252,9 @@ class RocksDB(
*/
def createColFamilyIfAbsent(colFamilyName: String): Unit = {
if (colFamilyName == StateStore.DEFAULT_COL_FAMILY_NAME) {
- throw new UnsupportedOperationException("Failed to create column family
with reserved " +
- s"name=$colFamilyName")
+ throw new SparkUnsupportedOperationException(
+ errorClass = "_LEGACY_ERROR_TEMP_3197",
+ messageParameters = Map("colFamilyName" -> colFamilyName).toMap)
}
if (!checkColFamilyExists(colFamilyName)) {