This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new e65dd7848c4 [SPARK-40663][SQL] Migrate execution errors onto error classes: _LEGACY_ERROR_TEMP_2226-2250 e65dd7848c4 is described below commit e65dd7848c44d1a087490a08f69834f0ec5adb27 Author: itholic <haejoon....@databricks.com> AuthorDate: Mon Oct 17 10:40:26 2022 +0300 [SPARK-40663][SQL] Migrate execution errors onto error classes: _LEGACY_ERROR_TEMP_2226-2250 ### What changes were proposed in this pull request? This PR proposes to migrate 25 execution errors onto temporary error classes with the prefix `_LEGACY_ERROR_TEMP_2226` to `_LEGACY_ERROR_TEMP_2250`. The `_LEGACY_ERROR_TEMP_` prefix indicates that these are dev-facing error messages, and they won't be exposed to end users. ### Why are the changes needed? To speed up the error class migration. The migration onto temporary error classes allows us to analyze the errors, so we can detect the most popular error classes. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? ``` $ build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite" $ build/sbt "test:testOnly *SQLQuerySuite" $ build/sbt -Phive-thriftserver "hive-thriftserver/testOnly org.apache.spark.sql.hive.thriftserver.ThriftServerQueryTestSuite" ``` Closes #38173 from itholic/SPARK-40540-2226-2250. 
Authored-by: itholic <haejoon....@databricks.com> Signed-off-by: Max Gekk <max.g...@gmail.com> --- core/src/main/resources/error/error-classes.json | 126 +++++++++++++ .../spark/sql/errors/QueryExecutionErrors.scala | 201 ++++++++++++++------- .../test/scala/org/apache/spark/sql/RowTest.scala | 3 +- 3 files changed, 267 insertions(+), 63 deletions(-) diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json index 8462fe2a42b..784b8c04d89 100644 --- a/core/src/main/resources/error/error-classes.json +++ b/core/src/main/resources/error/error-classes.json @@ -4081,5 +4081,131 @@ "this message in error, you can disable it with the SQL conf", "<StreamingCheckpointEscaptedPathCheckEnabled>." ] + }, + "_LEGACY_ERROR_TEMP_2226" : { + "message" : [ + "null literals can't be casted to <name>" + ] + }, + "_LEGACY_ERROR_TEMP_2227" : { + "message" : [ + "<name> is not an UserDefinedType. Please make sure registering an UserDefinedType for <userClass>" + ] + }, + "_LEGACY_ERROR_TEMP_2228" : { + "message" : [ + "Can not load in UserDefinedType <name> for user class <userClass>." + ] + }, + "_LEGACY_ERROR_TEMP_2229" : { + "message" : [ + "<name> is not a public class. Only public classes are supported." + ] + }, + "_LEGACY_ERROR_TEMP_2230" : { + "message" : [ + "Primitive types are not supported." + ] + }, + "_LEGACY_ERROR_TEMP_2231" : { + "message" : [ + "fieldIndex on a Row without schema is undefined." + ] + }, + "_LEGACY_ERROR_TEMP_2232" : { + "message" : [ + "Value at index <index> is null" + ] + }, + "_LEGACY_ERROR_TEMP_2233" : { + "message" : [ + "Only Data Sources providing FileFormat are supported: <providingClass>" + ] + }, + "_LEGACY_ERROR_TEMP_2234" : { + "message" : [ + "Failed to set original ACL <aclEntries> back to the created path: <path>. Exception: <message>" + ] + }, + "_LEGACY_ERROR_TEMP_2235" : { + "message" : [ + "Multiple failures in stage materialization." 
+ ] + }, + "_LEGACY_ERROR_TEMP_2236" : { + "message" : [ + "Unrecognized compression scheme type ID: <typeId>" + ] + }, + "_LEGACY_ERROR_TEMP_2237" : { + "message" : [ + "<className>.getParentLogger is not yet implemented." + ] + }, + "_LEGACY_ERROR_TEMP_2238" : { + "message" : [ + "Unable to create Parquet converter for <typeName> whose Parquet type is <parquetType> without decimal metadata. Please read this column/field as Spark BINARY type." + ] + }, + "_LEGACY_ERROR_TEMP_2239" : { + "message" : [ + "Unable to create Parquet converter for decimal type <t> whose Parquet type is <parquetType>. Parquet DECIMAL type can only be backed by INT32, INT64, FIXED_LEN_BYTE_ARRAY, or BINARY." + ] + }, + "_LEGACY_ERROR_TEMP_2240" : { + "message" : [ + "Unable to create Parquet converter for data type <t> whose Parquet type is <parquetType>" + ] + }, + "_LEGACY_ERROR_TEMP_2241" : { + "message" : [ + "Nonatomic partition table <tableName> can not add multiple partitions." + ] + }, + "_LEGACY_ERROR_TEMP_2242" : { + "message" : [ + "<provider> source does not support user-specified schema." + ] + }, + "_LEGACY_ERROR_TEMP_2243" : { + "message" : [ + "Nonatomic partition table <tableName> can not drop multiple partitions." + ] + }, + "_LEGACY_ERROR_TEMP_2244" : { + "message" : [ + "The table <tableName> does not support truncation of multiple partition." 
+ ] + }, + "_LEGACY_ERROR_TEMP_2245" : { + "message" : [ + "Table does not support overwrite by expression: <table>" + ] + }, + "_LEGACY_ERROR_TEMP_2246" : { + "message" : [ + "Table does not support dynamic partition overwrite: <table>" + ] + }, + "_LEGACY_ERROR_TEMP_2247" : { + "message" : [ + "Failed merging schema:", + "<schema>" + ] + }, + "_LEGACY_ERROR_TEMP_2248" : { + "message" : [ + "Cannot broadcast the table over <maxBroadcastTableRows> rows: <numRows> rows" + ] + }, + "_LEGACY_ERROR_TEMP_2249" : { + "message" : [ + "Cannot broadcast the table that is larger than <maxBroadcastTableBytes>GB: <dataSize> GB" + ] + }, + "_LEGACY_ERROR_TEMP_2250" : { + "message" : [ + "Not enough memory to build and broadcast the table to all worker nodes. As a workaround, you can either disable broadcast by setting <autoBroadcastjoinThreshold> to -1 or increase the spark driver memory by setting <driverMemory> to a higher value<analyzeTblMsg>" + ] } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala index 10b197b0b94..89c0cf5fafa 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala @@ -2134,38 +2134,64 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase { new UnsupportedOperationException } - def nullLiteralsCannotBeCastedError(name: String): Throwable = { - new UnsupportedOperationException(s"null literals can't be casted to $name") + def nullLiteralsCannotBeCastedError(name: String): SparkUnsupportedOperationException = { + new SparkUnsupportedOperationException( + errorClass = "_LEGACY_ERROR_TEMP_2226", + messageParameters = Map( + "name" -> name)) } def notUserDefinedTypeError(name: String, userClass: String): Throwable = { - new SparkException(s"$name is not an UserDefinedType. 
Please make sure registering " + - s"an UserDefinedType for ${userClass}") + new SparkException( + errorClass = "_LEGACY_ERROR_TEMP_2227", + messageParameters = Map( + "name" -> name, + "userClass" -> userClass), + cause = null) } def cannotLoadUserDefinedTypeError(name: String, userClass: String): Throwable = { - new SparkException(s"Can not load in UserDefinedType ${name} for user class ${userClass}.") + new SparkException( + errorClass = "_LEGACY_ERROR_TEMP_2228", + messageParameters = Map( + "name" -> name, + "userClass" -> userClass), + cause = null) } - def notPublicClassError(name: String): Throwable = { - new UnsupportedOperationException( - s"$name is not a public class. Only public classes are supported.") + def notPublicClassError(name: String): SparkUnsupportedOperationException = { + new SparkUnsupportedOperationException( + errorClass = "_LEGACY_ERROR_TEMP_2229", + messageParameters = Map( + "name" -> name)) } - def primitiveTypesNotSupportedError(): Throwable = { - new UnsupportedOperationException("Primitive types are not supported.") + def primitiveTypesNotSupportedError(): SparkUnsupportedOperationException = { + new SparkUnsupportedOperationException( + errorClass = "_LEGACY_ERROR_TEMP_2230", + messageParameters = Map.empty) } - def fieldIndexOnRowWithoutSchemaError(): Throwable = { - new UnsupportedOperationException("fieldIndex on a Row without schema is undefined.") + def fieldIndexOnRowWithoutSchemaError(): SparkUnsupportedOperationException = { + new SparkUnsupportedOperationException( + errorClass = "_LEGACY_ERROR_TEMP_2231", + messageParameters = Map.empty) } def valueIsNullError(index: Int): Throwable = { - new NullPointerException(s"Value at index ${toSQLValue(index, IntegerType)} is null") + new SparkException( + errorClass = "_LEGACY_ERROR_TEMP_2232", + messageParameters = Map( + "index" -> toSQLValue(index, IntegerType)), + cause = null) } def onlySupportDataSourcesProvidingFileFormatError(providingClass: String): Throwable = { - new 
SparkException(s"Only Data Sources providing FileFormat are supported: $providingClass") + new SparkException( + errorClass = "_LEGACY_ERROR_TEMP_2233", + messageParameters = Map( + "providingClass" -> providingClass), + cause = null) } def failToSetOriginalPermissionBackError( @@ -2180,90 +2206,139 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase { "message" -> e.getMessage)) } - def failToSetOriginalACLBackError(aclEntries: String, path: Path, e: Throwable): Throwable = { - new SecurityException(s"Failed to set original ACL $aclEntries back to " + - s"the created path: $path. Exception: ${e.getMessage}") + def failToSetOriginalACLBackError( + aclEntries: String, path: Path, e: Throwable): SparkSecurityException = { + new SparkSecurityException( + errorClass = "_LEGACY_ERROR_TEMP_2234", + messageParameters = Map( + "aclEntries" -> aclEntries, + "path" -> path.toString(), + "message" -> e.getMessage)) } def multiFailuresInStageMaterializationError(error: Throwable): Throwable = { - new SparkException("Multiple failures in stage materialization.", error) + new SparkException( + errorClass = "_LEGACY_ERROR_TEMP_2235", + messageParameters = Map.empty, + cause = error) } - def unrecognizedCompressionSchemaTypeIDError(typeId: Int): Throwable = { - new UnsupportedOperationException(s"Unrecognized compression scheme type ID: $typeId") + def unrecognizedCompressionSchemaTypeIDError(typeId: Int): SparkUnsupportedOperationException = { + new SparkUnsupportedOperationException( + errorClass = "_LEGACY_ERROR_TEMP_2236", + messageParameters = Map( + "typeId" -> typeId.toString())) } - def getParentLoggerNotImplementedError(className: String): Throwable = { - new SQLFeatureNotSupportedException(s"$className.getParentLogger is not yet implemented.") + def getParentLoggerNotImplementedError( + className: String): SparkSQLFeatureNotSupportedException = { + new SparkSQLFeatureNotSupportedException( + errorClass = "_LEGACY_ERROR_TEMP_2237", + messageParameters = 
Map( + "className" -> className)) } - def cannotCreateParquetConverterForTypeError(t: DecimalType, parquetType: String): Throwable = { - new RuntimeException( - s""" - |Unable to create Parquet converter for ${t.typeName} - |whose Parquet type is $parquetType without decimal metadata. Please read this - |column/field as Spark BINARY type. - """.stripMargin.replaceAll("\n", " ")) + def cannotCreateParquetConverterForTypeError( + t: DecimalType, parquetType: String): SparkRuntimeException = { + new SparkRuntimeException( + errorClass = "_LEGACY_ERROR_TEMP_2238", + messageParameters = Map( + "typeName" -> t.typeName, + "parquetType" -> parquetType)) } def cannotCreateParquetConverterForDecimalTypeError( - t: DecimalType, parquetType: String): Throwable = { - new RuntimeException( - s""" - |Unable to create Parquet converter for decimal type ${t.json} whose Parquet type is - |$parquetType. Parquet DECIMAL type can only be backed by INT32, INT64, - |FIXED_LEN_BYTE_ARRAY, or BINARY. - """.stripMargin.replaceAll("\n", " ")) + t: DecimalType, parquetType: String): SparkRuntimeException = { + new SparkRuntimeException( + errorClass = "_LEGACY_ERROR_TEMP_2239", + messageParameters = Map( + "t" -> t.json, + "parquetType" -> parquetType)) } def cannotCreateParquetConverterForDataTypeError( - t: DataType, parquetType: String): Throwable = { - new RuntimeException(s"Unable to create Parquet converter for data type ${t.json} " + - s"whose Parquet type is $parquetType") + t: DataType, parquetType: String): SparkRuntimeException = { + new SparkRuntimeException( + errorClass = "_LEGACY_ERROR_TEMP_2240", + messageParameters = Map( + "t" -> t.json, + "parquetType" -> parquetType)) } - def cannotAddMultiPartitionsOnNonatomicPartitionTableError(tableName: String): Throwable = { - new UnsupportedOperationException( - s"Nonatomic partition table $tableName can not add multiple partitions.") + def cannotAddMultiPartitionsOnNonatomicPartitionTableError( + tableName: String): 
SparkUnsupportedOperationException = { + new SparkUnsupportedOperationException( + errorClass = "_LEGACY_ERROR_TEMP_2241", + messageParameters = Map( + "tableName" -> tableName)) } - def userSpecifiedSchemaUnsupportedByDataSourceError(provider: TableProvider): Throwable = { - new UnsupportedOperationException( - s"${provider.getClass.getSimpleName} source does not support user-specified schema.") + def userSpecifiedSchemaUnsupportedByDataSourceError( + provider: TableProvider): SparkUnsupportedOperationException = { + new SparkUnsupportedOperationException( + errorClass = "_LEGACY_ERROR_TEMP_2242", + messageParameters = Map( + "provider" -> provider.getClass.getSimpleName)) } - def cannotDropMultiPartitionsOnNonatomicPartitionTableError(tableName: String): Throwable = { - new UnsupportedOperationException( - s"Nonatomic partition table $tableName can not drop multiple partitions.") + def cannotDropMultiPartitionsOnNonatomicPartitionTableError( + tableName: String): SparkUnsupportedOperationException = { + new SparkUnsupportedOperationException( + errorClass = "_LEGACY_ERROR_TEMP_2243", + messageParameters = Map( + "tableName" -> tableName)) } - def truncateMultiPartitionUnsupportedError(tableName: String): Throwable = { - new UnsupportedOperationException( - s"The table $tableName does not support truncation of multiple partition.") + def truncateMultiPartitionUnsupportedError( + tableName: String): SparkUnsupportedOperationException = { + new SparkUnsupportedOperationException( + errorClass = "_LEGACY_ERROR_TEMP_2244", + messageParameters = Map( + "tableName" -> tableName)) } def overwriteTableByUnsupportedExpressionError(table: Table): Throwable = { - new SparkException(s"Table does not support overwrite by expression: $table") + new SparkException( + errorClass = "_LEGACY_ERROR_TEMP_2245", + messageParameters = Map( + "table" -> table.toString()), + cause = null) } def dynamicPartitionOverwriteUnsupportedByTableError(table: Table): Throwable = { - new 
SparkException(s"Table does not support dynamic partition overwrite: $table") + new SparkException( + errorClass = "_LEGACY_ERROR_TEMP_2246", + messageParameters = Map( + "table" -> table.toString()), + cause = null) } def failedMergingSchemaError(schema: StructType, e: SparkException): Throwable = { - new SparkException(s"Failed merging schema:\n${schema.treeString}", e) + new SparkException( + errorClass = "_LEGACY_ERROR_TEMP_2247", + messageParameters = Map( + "schema" -> schema.treeString), + cause = e) } def cannotBroadcastTableOverMaxTableRowsError( maxBroadcastTableRows: Long, numRows: Long): Throwable = { new SparkException( - s"Cannot broadcast the table over $maxBroadcastTableRows rows: $numRows rows") + errorClass = "_LEGACY_ERROR_TEMP_2248", + messageParameters = Map( + "maxBroadcastTableRows" -> maxBroadcastTableRows.toString(), + "numRows" -> numRows.toString()), + cause = null) } def cannotBroadcastTableOverMaxTableBytesError( maxBroadcastTableBytes: Long, dataSize: Long): Throwable = { - new SparkException("Cannot broadcast the table that is larger than" + - s" ${maxBroadcastTableBytes >> 30}GB: ${dataSize >> 30} GB") + new SparkException( + errorClass = "_LEGACY_ERROR_TEMP_2249", + messageParameters = Map( + "maxBroadcastTableBytes" -> (maxBroadcastTableBytes >> 30).toString(), + "dataSize" -> (dataSize >> 30).toString()), + cause = null) } def notEnoughMemoryToBuildAndBroadcastTableError( @@ -2274,11 +2349,13 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase { } else { "." } - new OutOfMemoryError("Not enough memory to build and broadcast the table to all " + - "worker nodes. 
As a workaround, you can either disable broadcast by setting " + - s"${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key} to -1 or increase the spark " + - s"driver memory by setting ${SparkLauncher.DRIVER_MEMORY} to a higher value$analyzeTblMsg") - .initCause(oe.getCause) + new SparkException( + errorClass = "_LEGACY_ERROR_TEMP_2250", + messageParameters = Map( + "autoBroadcastjoinThreshold" -> SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, + "driverMemory" -> SparkLauncher.DRIVER_MEMORY, + "analyzeTblMsg" -> analyzeTblMsg), + cause = oe).initCause(oe.getCause) } def executeCodePathUnsupportedError(execName: String): Throwable = { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/RowTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/RowTest.scala index 82731cdb220..ec40989e6b7 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/RowTest.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/RowTest.scala @@ -24,6 +24,7 @@ import org.scalatest.funspec.AnyFunSpec import org.scalatest.matchers.must.Matchers import org.scalatest.matchers.should.Matchers._ +import org.apache.spark.SparkException import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{GenericRow, GenericRowWithSchema} import org.apache.spark.sql.types._ @@ -86,7 +87,7 @@ class RowTest extends AnyFunSpec with Matchers { } it("getAs() on type extending AnyVal throws an exception when accessing field that is null") { - intercept[NullPointerException] { + intercept[SparkException] { sampleRowWithoutCol3.getInt(sampleRowWithoutCol3.fieldIndex("col3")) } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org