This is an automated email from the ASF dual-hosted git repository.
maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new aaee89a12fd [SPARK-41575][SQL] Assign name to _LEGACY_ERROR_TEMP_2054
aaee89a12fd is described below
commit aaee89a12fd9b8ca3c57fa4283a51ce229dd7b71
Author: itholic <[email protected]>
AuthorDate: Tue Jan 10 16:25:15 2023 +0300
[SPARK-41575][SQL] Assign name to _LEGACY_ERROR_TEMP_2054
### What changes were proposed in this pull request?
This PR proposes to assign the name "TASK_WRITE_FAILED" to the legacy error class _LEGACY_ERROR_TEMP_2054.
### Why are the changes needed?
We should assign a proper name to each _LEGACY_ERROR_TEMP_* error class.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
`./build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite*"`
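The updated suites also assert the new error class directly; a representative pattern (mirroring the diff below, with the table name and path regex as placeholders) looks roughly like this:

```scala
// Inside a suite that provides SparkFunSuite's checkError helper and a test table `t`.
// matchPVals = true treats expected parameter values as regexes, since the output
// path of the failed write task is not deterministic.
val e = intercept[SparkException](sql("INSERT INTO t VALUES (1, '123456')"))
checkError(
  exception = e.getCause.asInstanceOf[SparkException],
  errorClass = "TASK_WRITE_FAILED",
  parameters = Map("path" -> ".*t.*"),
  matchPVals = true
)
```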
Closes #39394 from itholic/LEGACY_2054.
Authored-by: itholic <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
---
core/src/main/resources/error/error-classes.json | 10 +--
.../spark/sql/errors/QueryExecutionErrors.scala | 6 +-
.../execution/datasources/FileFormatWriter.scala | 2 +-
.../apache/spark/sql/CharVarcharTestSuite.scala | 82 +++++++++++++++-------
.../org/apache/spark/sql/sources/InsertSuite.scala | 16 +++--
.../spark/sql/HiveCharVarcharTestSuite.scala | 27 +++++++
6 files changed, 104 insertions(+), 39 deletions(-)
diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json
index a3acb940585..edf46a0fe09 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -1187,6 +1187,11 @@
],
"sqlState" : "42000"
},
+ "TASK_WRITE_FAILED" : {
+ "message" : [
+ "Task failed while writing rows to <path>."
+ ]
+ },
"TEMP_TABLE_OR_VIEW_ALREADY_EXISTS" : {
"message" : [
"Cannot create the temporary view <relationName> because it already
exists.",
@@ -3728,11 +3733,6 @@
"buildReader is not supported for <format>"
]
},
- "_LEGACY_ERROR_TEMP_2054" : {
- "message" : [
- "Task failed while writing rows. <message>"
- ]
- },
"_LEGACY_ERROR_TEMP_2055" : {
"message" : [
"<message>",
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 17fc38812f8..9598933d941 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -782,10 +782,10 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
messageParameters = Map("format" -> format))
}
- def taskFailedWhileWritingRowsError(cause: Throwable): Throwable = {
+ def taskFailedWhileWritingRowsError(path: String, cause: Throwable): Throwable = {
new SparkException(
- errorClass = "_LEGACY_ERROR_TEMP_2054",
- messageParameters = Map("message" -> cause.getMessage),
+ errorClass = "TASK_WRITE_FAILED",
+ messageParameters = Map("path" -> path),
cause = cause)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 6285095c647..5c4d662c145 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -423,7 +423,7 @@ object FileFormatWriter extends Logging {
// We throw the exception and let Executor throw ExceptionFailure to abort the job.
throw new TaskOutputFileAlreadyExistException(f)
case t: Throwable =>
- throw QueryExecutionErrors.taskFailedWhileWritingRowsError(t)
+ throw QueryExecutionErrors.taskFailedWhileWritingRowsError(description.path, t)
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
index 95c2e5085d9..c0ceebaa9a6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
@@ -178,26 +178,6 @@ trait CharVarcharTestSuite extends QueryTest with SQLTestUtils {
}
}
- test("char/varchar type values length check: partitioned columns of other
types") {
- Seq("CHAR(5)", "VARCHAR(5)").foreach { typ =>
- withTable("t") {
- sql(s"CREATE TABLE t(i STRING, c $typ) USING $format PARTITIONED BY
(c)")
- Seq(1, 10, 100, 1000, 10000).foreach { v =>
- sql(s"INSERT OVERWRITE t VALUES ('1', $v)")
- checkPlainResult(spark.table("t"), typ, v.toString)
- sql(s"ALTER TABLE t DROP PARTITION(c=$v)")
- checkAnswer(spark.table("t"), Nil)
- }
-
- val e1 = intercept[SparkException](sql(s"INSERT OVERWRITE t VALUES
('1', 100000)"))
- assert(e1.getCause.getMessage.contains("Exceeds char/varchar type
length limitation: 5"))
-
- val e2 = intercept[RuntimeException](sql("ALTER TABLE t DROP
PARTITION(c=100000)"))
- assert(e2.getMessage.contains("Exceeds char/varchar type length
limitation: 5"))
- }
- }
- }
-
test("char type values should be padded: nested in struct") {
withTable("t") {
sql(s"CREATE TABLE t(i STRING, c STRUCT<c: CHAR(5)>) USING $format")
@@ -332,12 +312,18 @@ trait CharVarcharTestSuite extends QueryTest with SQLTestUtils {
test("length check for input string values: partitioned columns") {
// DS V2 doesn't support partitioned table.
if (!conf.contains(SQLConf.DEFAULT_CATALOG.key)) {
+ val tableName = "t"
testTableWrite { typeName =>
- sql(s"CREATE TABLE t(i INT, c $typeName(5)) USING $format PARTITIONED
BY (c)")
- sql("INSERT INTO t VALUES (1, null)")
- checkAnswer(spark.table("t"), Row(1, null))
- val e = intercept[SparkException](sql("INSERT INTO t VALUES (1,
'123456')"))
- assert(e.getCause.getMessage.contains(s"Exceeds char/varchar type
length limitation: 5"))
+ sql(s"CREATE TABLE $tableName(i INT, c $typeName(5)) USING $format
PARTITIONED BY (c)")
+ sql(s"INSERT INTO $tableName VALUES (1, null)")
+ checkAnswer(spark.table(tableName), Row(1, null))
+ val e = intercept[SparkException](sql(s"INSERT INTO $tableName VALUES
(1, '123456')"))
+ checkError(
+ exception = e.getCause.asInstanceOf[SparkException],
+ errorClass = "TASK_WRITE_FAILED",
+ parameters = Map("path" -> s".*$tableName.*"),
+ matchPVals = true
+ )
}
}
}
@@ -884,6 +870,32 @@ class FileSourceCharVarcharTestSuite extends CharVarcharTestSuite with SharedSpa
}
}
}
+
+ test("char/varchar type values length check: partitioned columns of other
types") {
+ val tableName = "t"
+ Seq("CHAR(5)", "VARCHAR(5)").foreach { typ =>
+ withTable(tableName) {
+ sql(s"CREATE TABLE $tableName(i STRING, c $typ) USING $format
PARTITIONED BY (c)")
+ Seq(1, 10, 100, 1000, 10000).foreach { v =>
+ sql(s"INSERT OVERWRITE $tableName VALUES ('1', $v)")
+ checkPlainResult(spark.table(tableName), typ, v.toString)
+ sql(s"ALTER TABLE $tableName DROP PARTITION(c=$v)")
+ checkAnswer(spark.table(tableName), Nil)
+ }
+
+ val e1 = intercept[SparkException](sql(s"INSERT OVERWRITE $tableName
VALUES ('1', 100000)"))
+ checkError(
+ exception = e1.getCause.asInstanceOf[SparkException],
+ errorClass = "TASK_WRITE_FAILED",
+ parameters = Map("path" -> s".*$tableName"),
+ matchPVals = true
+ )
+
+ val e2 = intercept[RuntimeException](sql("ALTER TABLE t DROP
PARTITION(c=100000)"))
+ assert(e2.getMessage.contains("Exceeds char/varchar type length
limitation: 5"))
+ }
+ }
+ }
}
class DSV2CharVarcharTestSuite extends CharVarcharTestSuite
@@ -894,4 +906,24 @@ class DSV2CharVarcharTestSuite extends CharVarcharTestSuite
.set("spark.sql.catalog.testcat",
classOf[InMemoryPartitionTableCatalog].getName)
.set(SQLConf.DEFAULT_CATALOG.key, "testcat")
}
+
+ test("char/varchar type values length check: partitioned columns of other
types") {
+ Seq("CHAR(5)", "VARCHAR(5)").foreach { typ =>
+ withTable("t") {
+ sql(s"CREATE TABLE t(i STRING, c $typ) USING $format PARTITIONED BY
(c)")
+ Seq(1, 10, 100, 1000, 10000).foreach { v =>
+ sql(s"INSERT OVERWRITE t VALUES ('1', $v)")
+ checkPlainResult(spark.table("t"), typ, v.toString)
+ sql(s"ALTER TABLE t DROP PARTITION(c=$v)")
+ checkAnswer(spark.table("t"), Nil)
+ }
+
+ val e1 = intercept[SparkException](sql(s"INSERT OVERWRITE t VALUES
('1', 100000)"))
+ assert(e1.getCause.getMessage.contains("Exceeds char/varchar type
length limitation: 5"))
+
+ val e2 = intercept[RuntimeException](sql("ALTER TABLE t DROP
PARTITION(c=100000)"))
+ assert(e2.getMessage.contains("Exceeds char/varchar type length
limitation: 5"))
+ }
+ }
+ }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index 5df9b2ae598..d544b5fde5c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -2027,27 +2027,33 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
}
test("Stop task set if FileAlreadyExistsException was thrown") {
+ val tableName = "t"
Seq(true, false).foreach { fastFail =>
withSQLConf("fs.file.impl" ->
classOf[FileExistingTestFileSystem].getName,
"fs.file.impl.disable.cache" -> "true",
SQLConf.FASTFAIL_ON_FILEFORMAT_OUTPUT.key -> fastFail.toString) {
- withTable("t") {
+ withTable(tableName) {
sql(
- """
- |CREATE TABLE t(i INT, part1 INT) USING PARQUET
+ s"""
+ |CREATE TABLE $tableName(i INT, part1 INT) USING PARQUET
|PARTITIONED BY (part1)
""".stripMargin)
val df = Seq((1, 1)).toDF("i", "part1")
val err = intercept[SparkException] {
- df.write.mode("overwrite").format("parquet").insertInto("t")
+ df.write.mode("overwrite").format("parquet").insertInto(tableName)
}
if (fastFail) {
assert(err.getMessage.contains("can not write to output file: " +
"org.apache.hadoop.fs.FileAlreadyExistsException"))
} else {
- assert(err.getMessage.contains("Task failed while writing rows"))
+ checkError(
+ exception = err.getCause.asInstanceOf[SparkException],
+ errorClass = "TASK_WRITE_FAILED",
+ parameters = Map("path" -> s".*$tableName"),
+ matchPVals = true
+ )
}
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/HiveCharVarcharTestSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/HiveCharVarcharTestSuite.scala
index 182047a8c64..1e7820f0c19 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/HiveCharVarcharTestSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/HiveCharVarcharTestSuite.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql
+import org.apache.spark.SparkException
import org.apache.spark.sql.execution.command.CharVarcharDDLTestBase
import org.apache.spark.sql.hive.test.TestHiveSingleton
@@ -73,6 +74,32 @@ class HiveCharVarcharTestSuite extends CharVarcharTestSuite with TestHiveSinglet
}
}
}
+
+ test("char/varchar type values length check: partitioned columns of other
types") {
+ val tableName = "t"
+ Seq("CHAR(5)", "VARCHAR(5)").foreach { typ =>
+ withTable(tableName) {
+ sql(s"CREATE TABLE $tableName(i STRING, c $typ) USING $format
PARTITIONED BY (c)")
+ Seq(1, 10, 100, 1000, 10000).foreach { v =>
+ sql(s"INSERT OVERWRITE $tableName VALUES ('1', $v)")
+ checkPlainResult(spark.table(tableName), typ, v.toString)
+ sql(s"ALTER TABLE $tableName DROP PARTITION(c=$v)")
+ checkAnswer(spark.table(tableName), Nil)
+ }
+
+ val e1 = intercept[SparkException](sql(s"INSERT OVERWRITE $tableName
VALUES ('1', 100000)"))
+ checkError(
+ exception = e1.getCause.asInstanceOf[SparkException],
+ errorClass = "TASK_WRITE_FAILED",
+ parameters = Map("path" -> s".*$tableName.*"),
+ matchPVals = true
+ )
+
+ val e2 = intercept[RuntimeException](sql("ALTER TABLE t DROP
PARTITION(c=100000)"))
+ assert(e2.getMessage.contains("Exceeds char/varchar type length
limitation: 5"))
+ }
+ }
+ }
}
class HiveCharVarcharDDLTestSuite extends CharVarcharDDLTestBase with TestHiveSingleton {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]