This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 357193d5550 [SPARK-40663][SQL] Migrate execution errors onto error classes: _LEGACY_ERROR_TEMP_2051-2075
357193d5550 is described below

commit 357193d55508051369228c5e7e252c17089da317
Author: itholic <haejoon....@databricks.com>
AuthorDate: Sun Oct 9 22:45:43 2022 +0300

    [SPARK-40663][SQL] Migrate execution errors onto error classes: _LEGACY_ERROR_TEMP_2051-2075

    ### What changes were proposed in this pull request?

    This PR proposes to migrate 25 execution errors onto the temporary error classes `_LEGACY_ERROR_TEMP_2051` through `_LEGACY_ERROR_TEMP_2075`. The `_LEGACY_ERROR_TEMP_` prefix indicates that these are dev-facing error messages which won't be exposed to end users.

    ### Why are the changes needed?

    To speed up the error class migration. Migrating onto temporary error classes lets us analyze the errors, so we can detect the most frequently raised error classes.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    ```
    $ build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite"
    $ build/sbt "test:testOnly *SQLQuerySuite"
    ```

    Closes #38116 from itholic/SPARK-40540-2051-2075.

    Lead-authored-by: itholic <haejoon....@databricks.com>
    Co-authored-by: Haejoon Lee <44108233+itho...@users.noreply.github.com>
    Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 core/src/main/resources/error/error-classes.json   | 129 +++++++++++++++
 .../spark/sql/errors/QueryExecutionErrors.scala    | 182 +++++++++++++--------
 .../sql/errors/QueryExecutionErrorsSuite.scala     |   5 +-
 .../datasources/parquet/ParquetSchemaSuite.scala   |   5 +-
 4 files changed, 249 insertions(+), 72 deletions(-)
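For readers new to the error-class framework, the shape of the migration is mechanical. A simplified before/after sketch of one method, drawn from the `QueryExecutionErrors.scala` diff below (surrounding plumbing elided):

```
// Before: the message text is hard-coded at the throw site.
def buildReaderUnsupportedForFileFormatError(format: String): Throwable = {
  new UnsupportedOperationException(s"buildReader is not supported for $format")
}

// After: the throw site only names an error class and supplies parameters;
// the message template lives in error-classes.json ("_LEGACY_ERROR_TEMP_2053").
def buildReaderUnsupportedForFileFormatError(
    format: String): SparkUnsupportedOperationException = {
  new SparkUnsupportedOperationException(
    errorClass = "_LEGACY_ERROR_TEMP_2053",
    messageParameters = Map("format" -> format))
}
```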
+ ] + }, + "_LEGACY_ERROR_TEMP_2056" : { + "message" : [ + "Unable to clear output directory <staticPrefixPath> prior to writing to it" + ] + }, + "_LEGACY_ERROR_TEMP_2057" : { + "message" : [ + "Unable to clear partition directory <path> prior to writing to it" + ] + }, + "_LEGACY_ERROR_TEMP_2058" : { + "message" : [ + "Failed to cast value `<value>` to `<dataType>` for partition column `<columnName>`" + ] + }, + "_LEGACY_ERROR_TEMP_2059" : { + "message" : [ + "End of stream" + ] + }, + "_LEGACY_ERROR_TEMP_2060" : { + "message" : [ + "The fallback v1 relation reports inconsistent schema:", + "Schema of v2 scan: <v2Schema>", + "Schema of v1 relation: <v1Schema>" + ] + }, + "_LEGACY_ERROR_TEMP_2061" : { + "message" : [ + "No records should be returned from EmptyDataReader" + ] + }, + "_LEGACY_ERROR_TEMP_2062" : { + "message" : [ + "<message>", + "It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by recreating the Dataset/DataFrame involved." + ] + }, + "_LEGACY_ERROR_TEMP_2063" : { + "message" : [ + "Parquet column cannot be converted in file <filePath>. Column: <column>, Expected: <logicalType>, Found: <physicalType>" + ] + }, + "_LEGACY_ERROR_TEMP_2064" : { + "message" : [ + "Encountered error while reading file <path>. Details:" + ] + }, + "_LEGACY_ERROR_TEMP_2065" : { + "message" : [ + "Cannot create columnar reader." + ] + }, + "_LEGACY_ERROR_TEMP_2066" : { + "message" : [ + "Invalid namespace name: <namespace>" + ] + }, + "_LEGACY_ERROR_TEMP_2067" : { + "message" : [ + "Unsupported partition transform: <transform>" + ] + }, + "_LEGACY_ERROR_TEMP_2068" : { + "message" : [ + "Missing database location" + ] + }, + "_LEGACY_ERROR_TEMP_2069" : { + "message" : [ + "Cannot remove reserved property: <property>" + ] + }, + "_LEGACY_ERROR_TEMP_2070" : { + "message" : [ + "Writing job failed." 
+ ] + }, + "_LEGACY_ERROR_TEMP_2071" : { + "message" : [ + "Commit denied for partition <partId> (task <taskId>, attempt <attemptId>, stage <stageId>.<stageAttempt>)" + ] + }, + "_LEGACY_ERROR_TEMP_2072" : { + "message" : [ + "Table implementation does not support writes: <ident>" + ] + }, + "_LEGACY_ERROR_TEMP_2073" : { + "message" : [ + "Cannot create JDBC table with partition" + ] + }, + "_LEGACY_ERROR_TEMP_2074" : { + "message" : [ + "user-specified schema" + ] + }, + "_LEGACY_ERROR_TEMP_2075" : { + "message" : [ + "Write is not supported for binary file data source" + ] } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala index 341c3e72de8..a3e1b980d1f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala @@ -32,7 +32,6 @@ import org.apache.hadoop.fs.permission.FsPermission import org.codehaus.commons.compiler.{CompileException, InternalCompilerException} import org.apache.spark._ -import org.apache.spark.executor.CommitDeniedException import org.apache.spark.launcher.SparkLauncher import org.apache.spark.memory.SparkOutOfMemoryError import org.apache.spark.sql.catalyst.{TableIdentifier, WalkedTypePath} @@ -742,17 +741,19 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase { messageParameters = Map("paths" -> allPaths.mkString(", "))) } - def failedToFindDataSourceError(provider: String, error: Throwable): Throwable = { - new ClassNotFoundException( - s""" - |Failed to find data source: $provider. Please find packages at - |https://spark.apache.org/third-party-projects.html - """.stripMargin, error) + def failedToFindDataSourceError( + provider: String, error: Throwable): SparkClassNotFoundException = { + new SparkClassNotFoundException( + errorClass = "_LEGACY_ERROR_TEMP_2051", + messageParameters = Map("provider" -> provider), + cause = error) } - def removedClassInSpark2Error(className: String, e: Throwable): Throwable = { - new ClassNotFoundException(s"$className was removed in Spark 2.0. " + - "Please check if your library is compatible with Spark 2.0", e) + def removedClassInSpark2Error(className: String, e: Throwable): SparkClassNotFoundException = { + new SparkClassNotFoundException( + errorClass = "_LEGACY_ERROR_TEMP_2052", + messageParameters = Map("className" -> className), + cause = e) } def incompatibleDataSourceRegisterError(e: Throwable): Throwable = { @@ -784,22 +785,24 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase { ) } - def buildReaderUnsupportedForFileFormatError(format: String): Throwable = { - new UnsupportedOperationException(s"buildReader is not supported for $format") + def buildReaderUnsupportedForFileFormatError( + format: String): SparkUnsupportedOperationException = { + new SparkUnsupportedOperationException( + errorClass = "_LEGACY_ERROR_TEMP_2053", + messageParameters = Map("format" -> format)) } def taskFailedWhileWritingRowsError(cause: Throwable): Throwable = { - new SparkException("Task failed while writing rows.", cause) + new SparkException( + errorClass = "_LEGACY_ERROR_TEMP_2054", + messageParameters = Map.empty, + cause = cause) } - def readCurrentFileNotFoundError(e: FileNotFoundException): Throwable = { - new FileNotFoundException( - s""" - |${e.getMessage}\n - |It is possible the underlying files have been updated. 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 341c3e72de8..a3e1b980d1f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -32,7 +32,6 @@ import org.apache.hadoop.fs.permission.FsPermission
 import org.codehaus.commons.compiler.{CompileException, InternalCompilerException}
 
 import org.apache.spark._
-import org.apache.spark.executor.CommitDeniedException
 import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.memory.SparkOutOfMemoryError
 import org.apache.spark.sql.catalyst.{TableIdentifier, WalkedTypePath}
@@ -742,17 +741,19 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
     messageParameters = Map("paths" -> allPaths.mkString(", ")))
   }
 
-  def failedToFindDataSourceError(provider: String, error: Throwable): Throwable = {
-    new ClassNotFoundException(
-      s"""
-         |Failed to find data source: $provider. Please find packages at
-         |https://spark.apache.org/third-party-projects.html
-       """.stripMargin, error)
+  def failedToFindDataSourceError(
+      provider: String, error: Throwable): SparkClassNotFoundException = {
+    new SparkClassNotFoundException(
+      errorClass = "_LEGACY_ERROR_TEMP_2051",
+      messageParameters = Map("provider" -> provider),
+      cause = error)
   }
 
-  def removedClassInSpark2Error(className: String, e: Throwable): Throwable = {
-    new ClassNotFoundException(s"$className was removed in Spark 2.0. " +
-      "Please check if your library is compatible with Spark 2.0", e)
+  def removedClassInSpark2Error(className: String, e: Throwable): SparkClassNotFoundException = {
+    new SparkClassNotFoundException(
+      errorClass = "_LEGACY_ERROR_TEMP_2052",
+      messageParameters = Map("className" -> className),
+      cause = e)
   }
 
   def incompatibleDataSourceRegisterError(e: Throwable): Throwable = {
@@ -784,22 +785,24 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
     )
   }
 
-  def buildReaderUnsupportedForFileFormatError(format: String): Throwable = {
-    new UnsupportedOperationException(s"buildReader is not supported for $format")
+  def buildReaderUnsupportedForFileFormatError(
+      format: String): SparkUnsupportedOperationException = {
+    new SparkUnsupportedOperationException(
+      errorClass = "_LEGACY_ERROR_TEMP_2053",
+      messageParameters = Map("format" -> format))
   }
 
   def taskFailedWhileWritingRowsError(cause: Throwable): Throwable = {
-    new SparkException("Task failed while writing rows.", cause)
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_2054",
+      messageParameters = Map.empty,
+      cause = cause)
   }
 
-  def readCurrentFileNotFoundError(e: FileNotFoundException): Throwable = {
-    new FileNotFoundException(
-      s"""
-         |${e.getMessage}\n
-         |It is possible the underlying files have been updated. You can explicitly invalidate
-         |the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by
-         |recreating the Dataset/DataFrame involved.
-       """.stripMargin)
+  def readCurrentFileNotFoundError(e: FileNotFoundException): SparkFileNotFoundException = {
+    new SparkFileNotFoundException(
+      errorClass = "_LEGACY_ERROR_TEMP_2055",
+      messageParameters = Map("message" -> e.getMessage))
   }
 
   def saveModeUnsupportedError(saveMode: Any, pathExists: Boolean): Throwable = {
@@ -810,41 +813,54 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
   }
 
   def cannotClearOutputDirectoryError(staticPrefixPath: Path): Throwable = {
-    new IOException(s"Unable to clear output directory $staticPrefixPath prior to writing to it")
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_2056",
+      messageParameters = Map("staticPrefixPath" -> staticPrefixPath.toString()),
+      cause = null)
   }
 
   def cannotClearPartitionDirectoryError(path: Path): Throwable = {
-    new IOException(s"Unable to clear partition directory $path prior to writing to it")
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_2057",
+      messageParameters = Map("path" -> path.toString()),
+      cause = null)
   }
 
   def failedToCastValueToDataTypeForPartitionColumnError(
-      value: String, dataType: DataType, columnName: String): Throwable = {
-    new RuntimeException(s"Failed to cast value `$value` to " +
-      s"`$dataType` for partition column `$columnName`")
+      value: String, dataType: DataType, columnName: String): SparkRuntimeException = {
+    new SparkRuntimeException(
+      errorClass = "_LEGACY_ERROR_TEMP_2058",
+      messageParameters = Map(
+        "value" -> value,
+        "dataType" -> dataType.toString(),
+        "columnName" -> columnName))
   }
 
   def endOfStreamError(): Throwable = {
-    new NoSuchElementException("End of stream")
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_2059",
+      messageParameters = Map.empty,
+      cause = null)
   }
 
   def fallbackV1RelationReportsInconsistentSchemaError(
-      v2Schema: StructType, v1Schema: StructType): Throwable = {
-    new IllegalArgumentException(
-      "The fallback v1 relation reports inconsistent schema:\n" +
-      "Schema of v2 scan: " + v2Schema + "\n" +
-      "Schema of v1 relation: " + v1Schema)
+      v2Schema: StructType, v1Schema: StructType): SparkIllegalArgumentException = {
+    new SparkIllegalArgumentException(
+      errorClass = "_LEGACY_ERROR_TEMP_2060",
+      messageParameters = Map("v2Schema" -> v2Schema.toString(), "v1Schema" -> v1Schema.toString()))
   }
 
   def noRecordsFromEmptyDataReaderError(): Throwable = {
-    new IOException("No records should be returned from EmptyDataReader")
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_2061",
+      messageParameters = Map.empty,
+      cause = null)
   }
 
-  def fileNotFoundError(e: FileNotFoundException): Throwable = {
-    new FileNotFoundException(
-      e.getMessage + "\n" +
-      "It is possible the underlying files have been updated. " +
-      "You can explicitly invalidate the cache in Spark by " +
-      "recreating the Dataset/DataFrame involved.")
+  def fileNotFoundError(e: FileNotFoundException): SparkFileNotFoundException = {
+    new SparkFileNotFoundException(
+      errorClass = "_LEGACY_ERROR_TEMP_2062",
+      messageParameters = Map("message" -> e.getMessage))
   }
 
   def unsupportedSchemaColumnConvertError(
@@ -853,66 +869,100 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
       logicalType: String,
       physicalType: String,
       e: Exception): Throwable = {
-    val message = "Parquet column cannot be converted in " +
-      s"file $filePath. Column: $column, " +
-      s"Expected: $logicalType, Found: $physicalType"
-    new QueryExecutionException(message, e)
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_2063",
+      messageParameters = Map(
+        "filePath" -> filePath,
+        "column" -> column,
+        "logicalType" -> logicalType,
+        "physicalType" -> physicalType),
+      cause = e)
   }
 
   def cannotReadFilesError(
       e: Throwable,
       path: String): Throwable = {
-    val message = s"Encountered error while reading file $path. Details: "
-    new QueryExecutionException(message, e)
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_2064",
+      messageParameters = Map("path" -> path),
+      cause = e)
   }
 
   def cannotCreateColumnarReaderError(): Throwable = {
-    new UnsupportedOperationException("Cannot create columnar reader.")
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_2065",
+      messageParameters = Map.empty,
+      cause = null)
   }
 
-  def invalidNamespaceNameError(namespace: Array[String]): Throwable = {
-    new IllegalArgumentException(s"Invalid namespace name: ${namespace.quoted}")
+  def invalidNamespaceNameError(namespace: Array[String]): SparkIllegalArgumentException = {
+    new SparkIllegalArgumentException(
+      errorClass = "_LEGACY_ERROR_TEMP_2066",
+      messageParameters = Map("namespace" -> namespace.quoted))
   }
 
-  def unsupportedPartitionTransformError(transform: Transform): Throwable = {
-    new UnsupportedOperationException(
-      s"Unsupported partition transform: $transform")
+  def unsupportedPartitionTransformError(
+      transform: Transform): SparkUnsupportedOperationException = {
+    new SparkUnsupportedOperationException(
+      errorClass = "_LEGACY_ERROR_TEMP_2067",
+      messageParameters = Map("transform" -> transform.toString()))
   }
 
-  def missingDatabaseLocationError(): Throwable = {
-    new IllegalArgumentException("Missing database location")
+  def missingDatabaseLocationError(): SparkIllegalArgumentException = {
+    new SparkIllegalArgumentException(
+      errorClass = "_LEGACY_ERROR_TEMP_2068",
+      messageParameters = Map.empty)
   }
 
-  def cannotRemoveReservedPropertyError(property: String): Throwable = {
-    new UnsupportedOperationException(s"Cannot remove reserved property: $property")
+  def cannotRemoveReservedPropertyError(property: String): SparkUnsupportedOperationException = {
+    new SparkUnsupportedOperationException(
+      errorClass = "_LEGACY_ERROR_TEMP_2069",
+      messageParameters = Map("property" -> property))
   }
 
   def writingJobFailedError(cause: Throwable): Throwable = {
-    new SparkException("Writing job failed.", cause)
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_2070",
+      messageParameters = Map.empty,
+      cause = cause)
   }
 
   def commitDeniedError(
       partId: Int, taskId: Long, attemptId: Int, stageId: Int, stageAttempt: Int): Throwable = {
-    val message = s"Commit denied for partition $partId (task $taskId, attempt $attemptId, " +
-      s"stage $stageId.$stageAttempt)"
-    new CommitDeniedException(message, stageId, partId, attemptId)
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_2071",
+      messageParameters = Map(
+        "partId" -> partId.toString(),
+        "taskId" -> taskId.toString(),
+        "attemptId" -> attemptId.toString(),
+        "stageId" -> stageId.toString(),
+        "stageAttempt" -> stageAttempt.toString()),
+      cause = null)
   }
 
   def unsupportedTableWritesError(ident: Identifier): Throwable = {
     new SparkException(
-      s"Table implementation does not support writes: ${ident.quoted}")
+      errorClass = "_LEGACY_ERROR_TEMP_2072",
+      messageParameters = Map("ident" -> ident.quoted),
+      cause = null)
   }
 
-  def cannotCreateJDBCTableWithPartitionsError(): Throwable = {
UnsupportedOperationException("Cannot create JDBC table with partition") + def cannotCreateJDBCTableWithPartitionsError(): SparkUnsupportedOperationException = { + new SparkUnsupportedOperationException( + errorClass = "_LEGACY_ERROR_TEMP_2073", + messageParameters = Map.empty) } - def unsupportedUserSpecifiedSchemaError(): Throwable = { - new UnsupportedOperationException("user-specified schema") + def unsupportedUserSpecifiedSchemaError(): SparkUnsupportedOperationException = { + new SparkUnsupportedOperationException( + errorClass = "_LEGACY_ERROR_TEMP_2074", + messageParameters = Map.empty) } - def writeUnsupportedForBinaryFileDataSourceError(): Throwable = { - new UnsupportedOperationException("Write is not supported for binary file data source") + def writeUnsupportedForBinaryFileDataSourceError(): SparkUnsupportedOperationException = { + new SparkUnsupportedOperationException( + errorClass = "_LEGACY_ERROR_TEMP_2075", + messageParameters = Map.empty) } def fileLengthExceedsMaxLengthError(status: FileStatus, maxLength: Int): Throwable = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala index 62adf28aca4..c047603934d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala @@ -30,7 +30,6 @@ import org.mockito.Mockito.{mock, when} import org.apache.spark._ import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row, SaveMode} import org.apache.spark.sql.catalyst.util.BadRecordException -import org.apache.spark.sql.execution.QueryExecutionException import org.apache.spark.sql.execution.datasources.jdbc.{DriverRegistry, JDBCOptions} import org.apache.spark.sql.execution.datasources.orc.OrcTest import org.apache.spark.sql.execution.datasources.parquet.ParquetTest @@ -329,9 +328,9 @@ class QueryExecutionErrorsSuite .load(path.getAbsolutePath).select($"money").collect() } } - assert(e1.getCause.isInstanceOf[QueryExecutionException]) + assert(e1.getCause.isInstanceOf[SparkException]) - val e2 = e1.getCause.asInstanceOf[QueryExecutionException] + val e2 = e1.getCause.asInstanceOf[SparkException] assert(e2.getCause.isInstanceOf[SparkException]) val e3 = e2.getCause.asInstanceOf[SparkException] diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala index 1bef75ef0d9..96f9ff58e85 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala @@ -28,7 +28,6 @@ import org.apache.parquet.schema.Type._ import org.apache.spark.SparkException import org.apache.spark.sql.catalyst.ScalaReflection -import org.apache.spark.sql.execution.QueryExecutionException import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType._ @@ -982,7 +981,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest { withTempPath { dir => val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = false) val expectedMessage = "Encountered error while reading file" - 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
index 62adf28aca4..c047603934d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
@@ -30,7 +30,6 @@ import org.mockito.Mockito.{mock, when}
 import org.apache.spark._
 import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row, SaveMode}
 import org.apache.spark.sql.catalyst.util.BadRecordException
-import org.apache.spark.sql.execution.QueryExecutionException
 import org.apache.spark.sql.execution.datasources.jdbc.{DriverRegistry, JDBCOptions}
 import org.apache.spark.sql.execution.datasources.orc.OrcTest
 import org.apache.spark.sql.execution.datasources.parquet.ParquetTest
@@ -329,9 +328,9 @@ class QueryExecutionErrorsSuite
           .load(path.getAbsolutePath).select($"money").collect()
       }
     }
-    assert(e1.getCause.isInstanceOf[QueryExecutionException])
+    assert(e1.getCause.isInstanceOf[SparkException])
 
-    val e2 = e1.getCause.asInstanceOf[QueryExecutionException]
+    val e2 = e1.getCause.asInstanceOf[SparkException]
     assert(e2.getCause.isInstanceOf[SparkException])
 
     val e3 = e2.getCause.asInstanceOf[SparkException]
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index 1bef75ef0d9..96f9ff58e85 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -28,7 +28,6 @@ import org.apache.parquet.schema.Type._
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.ScalaReflection
-import org.apache.spark.sql.execution.QueryExecutionException
 import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType._
@@ -982,7 +981,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
     withTempPath { dir =>
       val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = false)
       val expectedMessage = "Encountered error while reading file"
-      assert(e.getCause.isInstanceOf[QueryExecutionException])
+      assert(e.getCause.isInstanceOf[SparkException])
       assert(e.getCause.getCause.isInstanceOf[ParquetDecodingException])
       assert(e.getCause.getMessage.contains(expectedMessage))
     }
@@ -991,7 +990,7 @@
   test("schema mismatch failure error message for parquet vectorized reader") {
     withTempPath { dir =>
       val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = true)
-      assert(e.getCause.isInstanceOf[QueryExecutionException])
+      assert(e.getCause.isInstanceOf[SparkException])
       assert(e.getCause.getCause.isInstanceOf[SchemaColumnConvertNotSupportedException])
 
       // Check if the physical type is reporting correctly

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org