This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 357193d5550 [SPARK-40663][SQL] Migrate execution errors onto error classes: _LEGACY_ERROR_TEMP_2051-2075
357193d5550 is described below

commit 357193d55508051369228c5e7e252c17089da317
Author: itholic <haejoon....@databricks.com>
AuthorDate: Sun Oct 9 22:45:43 2022 +0300

    [SPARK-40663][SQL] Migrate execution errors onto error classes: _LEGACY_ERROR_TEMP_2051-2075
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to migrate 25 execution errors onto the temporary error classes `_LEGACY_ERROR_TEMP_2051` through `_LEGACY_ERROR_TEMP_2075`.
    
    The `_LEGACY_ERROR_TEMP_` prefix marks these as dev-facing error classes whose messages won't be exposed to end users.
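
    As a rough sketch of the pattern (illustrative only, not code from this PR; the provider name below is made up), a migrated helper now builds the exception from an error class plus named message parameters instead of an interpolated string:

    ```scala
    import org.apache.spark.SparkClassNotFoundException

    // The message template for _LEGACY_ERROR_TEMP_2051 lives in
    // core/src/main/resources/error/error-classes.json; the throw site only
    // supplies the named parameters.
    val err = new SparkClassNotFoundException(
      errorClass = "_LEGACY_ERROR_TEMP_2051",
      messageParameters = Map("provider" -> "com.example.datasource"),
      cause = new ClassNotFoundException("com.example.datasource"))
    assert(err.getErrorClass == "_LEGACY_ERROR_TEMP_2051")
    ```

    Because every such exception implements `SparkThrowable`, its error class can be inspected programmatically, which is what enables the analysis mentioned below.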
    
    ### Why are the changes needed?
    
    To speed up the error class migration.
    
    Migrating onto temporary error classes allows us to analyze the errors, so we can detect the most popular error classes.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    ```
    $ build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite"
    $ build/sbt "test:testOnly *SQLQuerySuite"
    ```
    
    Closes #38116 from itholic/SPARK-40540-2051-2075.
    
    Lead-authored-by: itholic <haejoon....@databricks.com>
    Co-authored-by: Haejoon Lee <44108233+itho...@users.noreply.github.com>
    Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 core/src/main/resources/error/error-classes.json   | 129 +++++++++++++++
 .../spark/sql/errors/QueryExecutionErrors.scala    | 182 +++++++++++++--------
 .../sql/errors/QueryExecutionErrorsSuite.scala     |   5 +-
 .../datasources/parquet/ParquetSchemaSuite.scala   |   5 +-
 4 files changed, 249 insertions(+), 72 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json
index 146c99d5ca5..3e1655d80e4 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -3264,5 +3264,134 @@
     "message" : [
       "Expected exactly one path to be specified, but got: <paths>"
     ]
+  },
+  "_LEGACY_ERROR_TEMP_2051" : {
+    "message" : [
+      "Failed to find data source: <provider>. Please find packages at 
https://spark.apache.org/third-party-projects.html";
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_2052" : {
+    "message" : [
+      "<className> was removed in Spark 2.0. Please check if your library is 
compatible with Spark 2.0"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_2053" : {
+    "message" : [
+      "buildReader is not supported for <format>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_2054" : {
+    "message" : [
+      "Task failed while writing rows."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_2055" : {
+    "message" : [
+      "<message>",
+      "It is possible the underlying files have been updated. You can 
explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' 
command in SQL or by recreating the Dataset/DataFrame involved."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_2056" : {
+    "message" : [
+      "Unable to clear output directory <staticPrefixPath> prior to writing to 
it"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_2057" : {
+    "message" : [
+      "Unable to clear partition directory <path> prior to writing to it"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_2058" : {
+    "message" : [
+      "Failed to cast value `<value>` to `<dataType>` for partition column 
`<columnName>`"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_2059" : {
+    "message" : [
+      "End of stream"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_2060" : {
+    "message" : [
+      "The fallback v1 relation reports inconsistent schema:",
+      "Schema of v2 scan: <v2Schema>",
+      "Schema of v1 relation: <v1Schema>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_2061" : {
+    "message" : [
+      "No records should be returned from EmptyDataReader"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_2062" : {
+    "message" : [
+      "<message>",
+      "It is possible the underlying files have been updated. You can 
explicitly invalidate the cache in Spark by recreating the Dataset/DataFrame 
involved."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_2063" : {
+    "message" : [
+      "Parquet column cannot be converted in file <filePath>. Column: 
<column>, Expected: <logicalType>, Found: <physicalType>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_2064" : {
+    "message" : [
+      "Encountered error while reading file <path>. Details:"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_2065" : {
+    "message" : [
+      "Cannot create columnar reader."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_2066" : {
+    "message" : [
+      "Invalid namespace name: <namespace>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_2067" : {
+    "message" : [
+      "Unsupported partition transform: <transform>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_2068" : {
+    "message" : [
+      "Missing database location"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_2069" : {
+    "message" : [
+      "Cannot remove reserved property: <property>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_2070" : {
+    "message" : [
+      "Writing job failed."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_2071" : {
+    "message" : [
+      "Commit denied for partition <partId> (task <taskId>, attempt 
<attemptId>, stage <stageId>.<stageAttempt>)"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_2072" : {
+    "message" : [
+      "Table implementation does not support writes: <ident>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_2073" : {
+    "message" : [
+      "Cannot create JDBC table with partition"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_2074" : {
+    "message" : [
+      "user-specified schema"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_2075" : {
+    "message" : [
+      "Write is not supported for binary file data source"
+    ]
   }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 341c3e72de8..a3e1b980d1f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -32,7 +32,6 @@ import org.apache.hadoop.fs.permission.FsPermission
 import org.codehaus.commons.compiler.{CompileException, InternalCompilerException}
 
 import org.apache.spark._
-import org.apache.spark.executor.CommitDeniedException
 import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.memory.SparkOutOfMemoryError
 import org.apache.spark.sql.catalyst.{TableIdentifier, WalkedTypePath}
@@ -742,17 +741,19 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
       messageParameters = Map("paths" -> allPaths.mkString(", ")))
   }
 
-  def failedToFindDataSourceError(provider: String, error: Throwable): Throwable = {
-    new ClassNotFoundException(
-      s"""
-         |Failed to find data source: $provider. Please find packages at
-         |https://spark.apache.org/third-party-projects.html
-       """.stripMargin, error)
+  def failedToFindDataSourceError(
+      provider: String, error: Throwable): SparkClassNotFoundException = {
+    new SparkClassNotFoundException(
+      errorClass = "_LEGACY_ERROR_TEMP_2051",
+      messageParameters = Map("provider" -> provider),
+      cause = error)
   }
 
-  def removedClassInSpark2Error(className: String, e: Throwable): Throwable = {
-    new ClassNotFoundException(s"$className was removed in Spark 2.0. " +
-      "Please check if your library is compatible with Spark 2.0", e)
+  def removedClassInSpark2Error(className: String, e: Throwable): SparkClassNotFoundException = {
+    new SparkClassNotFoundException(
+      errorClass = "_LEGACY_ERROR_TEMP_2052",
+      messageParameters = Map("className" -> className),
+      cause = e)
   }
 
   def incompatibleDataSourceRegisterError(e: Throwable): Throwable = {
@@ -784,22 +785,24 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
     )
   }
 
-  def buildReaderUnsupportedForFileFormatError(format: String): Throwable = {
-    new UnsupportedOperationException(s"buildReader is not supported for 
$format")
+  def buildReaderUnsupportedForFileFormatError(
+      format: String): SparkUnsupportedOperationException = {
+    new SparkUnsupportedOperationException(
+      errorClass = "_LEGACY_ERROR_TEMP_2053",
+      messageParameters = Map("format" -> format))
   }
 
   def taskFailedWhileWritingRowsError(cause: Throwable): Throwable = {
-    new SparkException("Task failed while writing rows.", cause)
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_2054",
+      messageParameters = Map.empty,
+      cause = cause)
   }
 
-  def readCurrentFileNotFoundError(e: FileNotFoundException): Throwable = {
-    new FileNotFoundException(
-      s"""
-         |${e.getMessage}\n
-         |It is possible the underlying files have been updated. You can explicitly invalidate
-         |the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by
-         |recreating the Dataset/DataFrame involved.
-       """.stripMargin)
+  def readCurrentFileNotFoundError(e: FileNotFoundException): SparkFileNotFoundException = {
+    new SparkFileNotFoundException(
+      errorClass = "_LEGACY_ERROR_TEMP_2055",
+      messageParameters = Map("message" -> e.getMessage))
   }
 
   def saveModeUnsupportedError(saveMode: Any, pathExists: Boolean): Throwable = {
@@ -810,41 +813,54 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
   }
 
   def cannotClearOutputDirectoryError(staticPrefixPath: Path): Throwable = {
-    new IOException(s"Unable to clear output directory $staticPrefixPath prior 
to writing to it")
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_2056",
+      messageParameters = Map("staticPrefixPath" -> 
staticPrefixPath.toString()),
+      cause = null)
   }
 
   def cannotClearPartitionDirectoryError(path: Path): Throwable = {
-    new IOException(s"Unable to clear partition directory $path prior to 
writing to it")
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_2057",
+      messageParameters = Map("path" -> path.toString()),
+      cause = null)
   }
 
   def failedToCastValueToDataTypeForPartitionColumnError(
-      value: String, dataType: DataType, columnName: String): Throwable = {
-    new RuntimeException(s"Failed to cast value `$value` to " +
-      s"`$dataType` for partition column `$columnName`")
+      value: String, dataType: DataType, columnName: String): SparkRuntimeException = {
+    new SparkRuntimeException(
+      errorClass = "_LEGACY_ERROR_TEMP_2058",
+      messageParameters = Map(
+        "value" -> value,
+        "dataType" -> dataType.toString(),
+        "columnName" -> columnName))
   }
 
   def endOfStreamError(): Throwable = {
-    new NoSuchElementException("End of stream")
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_2059",
+      messageParameters = Map.empty,
+      cause = null)
   }
 
   def fallbackV1RelationReportsInconsistentSchemaError(
-      v2Schema: StructType, v1Schema: StructType): Throwable = {
-    new IllegalArgumentException(
-      "The fallback v1 relation reports inconsistent schema:\n" +
-        "Schema of v2 scan:     " + v2Schema + "\n" +
-        "Schema of v1 relation: " + v1Schema)
+      v2Schema: StructType, v1Schema: StructType): SparkIllegalArgumentException = {
+    new SparkIllegalArgumentException(
+      errorClass = "_LEGACY_ERROR_TEMP_2060",
+      messageParameters = Map("v2Schema" -> v2Schema.toString(), "v1Schema" -> 
v1Schema.toString()))
   }
 
   def noRecordsFromEmptyDataReaderError(): Throwable = {
-    new IOException("No records should be returned from EmptyDataReader")
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_2061",
+      messageParameters = Map.empty,
+      cause = null)
   }
 
-  def fileNotFoundError(e: FileNotFoundException): Throwable = {
-    new FileNotFoundException(
-      e.getMessage + "\n" +
-        "It is possible the underlying files have been updated. " +
-        "You can explicitly invalidate the cache in Spark by " +
-        "recreating the Dataset/DataFrame involved.")
+  def fileNotFoundError(e: FileNotFoundException): SparkFileNotFoundException = {
+    new SparkFileNotFoundException(
+      errorClass = "_LEGACY_ERROR_TEMP_2062",
+      messageParameters = Map("message" -> e.getMessage))
   }
 
   def unsupportedSchemaColumnConvertError(
@@ -853,66 +869,100 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
       logicalType: String,
       physicalType: String,
       e: Exception): Throwable = {
-    val message = "Parquet column cannot be converted in " +
-      s"file $filePath. Column: $column, " +
-      s"Expected: $logicalType, Found: $physicalType"
-    new QueryExecutionException(message, e)
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_2063",
+      messageParameters = Map(
+        "filePath" -> filePath,
+        "column" -> column,
+        "logicalType" -> logicalType,
+        "physicalType" -> physicalType),
+      cause = e)
   }
 
   def cannotReadFilesError(
       e: Throwable,
       path: String): Throwable = {
-    val message = s"Encountered error while reading file $path. Details: "
-    new QueryExecutionException(message, e)
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_2064",
+      messageParameters = Map("path" -> path),
+      cause = e)
   }
 
   def cannotCreateColumnarReaderError(): Throwable = {
-    new UnsupportedOperationException("Cannot create columnar reader.")
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_2065",
+      messageParameters = Map.empty,
+      cause = null)
   }
 
-  def invalidNamespaceNameError(namespace: Array[String]): Throwable = {
-    new IllegalArgumentException(s"Invalid namespace name: 
${namespace.quoted}")
+  def invalidNamespaceNameError(namespace: Array[String]): SparkIllegalArgumentException = {
+    new SparkIllegalArgumentException(
+      errorClass = "_LEGACY_ERROR_TEMP_2066",
+      messageParameters = Map("namespace" -> namespace.quoted))
   }
 
-  def unsupportedPartitionTransformError(transform: Transform): Throwable = {
-    new UnsupportedOperationException(
-      s"Unsupported partition transform: $transform")
+  def unsupportedPartitionTransformError(
+      transform: Transform): SparkUnsupportedOperationException = {
+    new SparkUnsupportedOperationException(
+      errorClass = "_LEGACY_ERROR_TEMP_2067",
+      messageParameters = Map("transform" -> transform.toString()))
   }
 
-  def missingDatabaseLocationError(): Throwable = {
-    new IllegalArgumentException("Missing database location")
+  def missingDatabaseLocationError(): SparkIllegalArgumentException = {
+    new SparkIllegalArgumentException(
+      errorClass = "_LEGACY_ERROR_TEMP_2068",
+      messageParameters = Map.empty)
   }
 
-  def cannotRemoveReservedPropertyError(property: String): Throwable = {
-    new UnsupportedOperationException(s"Cannot remove reserved property: 
$property")
+  def cannotRemoveReservedPropertyError(property: String): SparkUnsupportedOperationException = {
+    new SparkUnsupportedOperationException(
+      errorClass = "_LEGACY_ERROR_TEMP_2069",
+      messageParameters = Map("property" -> property))
   }
 
   def writingJobFailedError(cause: Throwable): Throwable = {
-    new SparkException("Writing job failed.", cause)
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_2070",
+      messageParameters = Map.empty,
+      cause = cause)
   }
 
   def commitDeniedError(
       partId: Int, taskId: Long, attemptId: Int, stageId: Int, stageAttempt: Int): Throwable = {
-    val message = s"Commit denied for partition $partId (task $taskId, attempt $attemptId, " +
-      s"stage $stageId.$stageAttempt)"
-    new CommitDeniedException(message, stageId, partId, attemptId)
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_2071",
+      messageParameters = Map(
+        "partId" -> partId.toString(),
+        "taskId" -> taskId.toString(),
+        "attemptId" -> attemptId.toString(),
+        "stageId" -> stageId.toString(),
+        "stageAttempt" -> stageAttempt.toString()),
+      cause = null)
   }
 
   def unsupportedTableWritesError(ident: Identifier): Throwable = {
     new SparkException(
-      s"Table implementation does not support writes: ${ident.quoted}")
+      errorClass = "_LEGACY_ERROR_TEMP_2072",
+      messageParameters = Map("idnt" -> ident.quoted),
+      cause = null)
   }
 
-  def cannotCreateJDBCTableWithPartitionsError(): Throwable = {
-    new UnsupportedOperationException("Cannot create JDBC table with 
partition")
+  def cannotCreateJDBCTableWithPartitionsError(): SparkUnsupportedOperationException = {
+    new SparkUnsupportedOperationException(
+      errorClass = "_LEGACY_ERROR_TEMP_2073",
+      messageParameters = Map.empty)
   }
 
-  def unsupportedUserSpecifiedSchemaError(): Throwable = {
-    new UnsupportedOperationException("user-specified schema")
+  def unsupportedUserSpecifiedSchemaError(): SparkUnsupportedOperationException = {
+    new SparkUnsupportedOperationException(
+      errorClass = "_LEGACY_ERROR_TEMP_2074",
+      messageParameters = Map.empty)
   }
 
-  def writeUnsupportedForBinaryFileDataSourceError(): Throwable = {
-    new UnsupportedOperationException("Write is not supported for binary file 
data source")
+  def writeUnsupportedForBinaryFileDataSourceError(): SparkUnsupportedOperationException = {
+    new SparkUnsupportedOperationException(
+      errorClass = "_LEGACY_ERROR_TEMP_2075",
+      messageParameters = Map.empty)
   }
 
   def fileLengthExceedsMaxLengthError(status: FileStatus, maxLength: Int): Throwable = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
index 62adf28aca4..c047603934d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
@@ -30,7 +30,6 @@ import org.mockito.Mockito.{mock, when}
 import org.apache.spark._
 import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row, SaveMode}
 import org.apache.spark.sql.catalyst.util.BadRecordException
-import org.apache.spark.sql.execution.QueryExecutionException
 import org.apache.spark.sql.execution.datasources.jdbc.{DriverRegistry, JDBCOptions}
 import org.apache.spark.sql.execution.datasources.orc.OrcTest
 import org.apache.spark.sql.execution.datasources.parquet.ParquetTest
@@ -329,9 +328,9 @@ class QueryExecutionErrorsSuite
           .load(path.getAbsolutePath).select($"money").collect()
       }
     }
-    assert(e1.getCause.isInstanceOf[QueryExecutionException])
+    assert(e1.getCause.isInstanceOf[SparkException])
 
-    val e2 = e1.getCause.asInstanceOf[QueryExecutionException]
+    val e2 = e1.getCause.asInstanceOf[SparkException]
     assert(e2.getCause.isInstanceOf[SparkException])
 
     val e3 = e2.getCause.asInstanceOf[SparkException]
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index 1bef75ef0d9..96f9ff58e85 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -28,7 +28,6 @@ import org.apache.parquet.schema.Type._
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.ScalaReflection
-import org.apache.spark.sql.execution.QueryExecutionException
 import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType._
@@ -982,7 +981,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
     withTempPath { dir =>
       val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = false)
       val expectedMessage = "Encountered error while reading file"
-      assert(e.getCause.isInstanceOf[QueryExecutionException])
+      assert(e.getCause.isInstanceOf[SparkException])
       assert(e.getCause.getCause.isInstanceOf[ParquetDecodingException])
       assert(e.getCause.getMessage.contains(expectedMessage))
     }
@@ -991,7 +990,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
   test("schema mismatch failure error message for parquet vectorized reader") {
     withTempPath { dir =>
       val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = true)
-      assert(e.getCause.isInstanceOf[QueryExecutionException])
+      assert(e.getCause.isInstanceOf[SparkException])
       assert(e.getCause.getCause.isInstanceOf[SchemaColumnConvertNotSupportedException])
 
       // Check if the physical type is reporting correctly

