This is an automated email from the ASF dual-hosted git repository.
maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 885f4733c41 [SPARK-38753][SQL][TESTS] Move the tests for `WRITING_JOB_ABORTED` to `QueryExecutionErrorsSuite`
885f4733c41 is described below
commit 885f4733c413bdbb110946361247fbbd19f6bba9
Author: Max Gekk <[email protected]>
AuthorDate: Thu Apr 14 15:36:07 2022 +0300
[SPARK-38753][SQL][TESTS] Move the tests for `WRITING_JOB_ABORTED` to `QueryExecutionErrorsSuite`
### What changes were proposed in this pull request?
Move the test for the error class `WRITING_JOB_ABORTED` from `DataSourceV2Suite.scala` to `QueryExecutionErrorsSuite`.
### Why are the changes needed?
To improve code maintenance: all tests for error classes are placed in the `Query.*ErrorsSuite` suites. Also, the exception is raised from [QueryExecutionErrors](https://github.com/apache/spark/blob/073fd2ad5c16d193725954e76ce357e4a9d97449/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala#L665-L670), so the tests should live in `QueryExecutionErrorsSuite` for consistency.
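For context, the linked lines construct the exception roughly as follows (a paraphrased sketch, not a verbatim quote of `QueryExecutionErrors.scala`):
```
def writingJobAbortedError(e: Throwable): Throwable = {
  // Wrap the cause in a SparkException carrying the error class and SQLSTATE
  // that the moved test asserts on ("WRITING_JOB_ABORTED" / "40000").
  new SparkException(
    errorClass = "WRITING_JOB_ABORTED",
    messageParameters = Array.empty,
    cause = e)
}
```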
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
By running the affected test suites:
```
$ build/sbt "sql/test:testOnly *QueryExecutionErrorsSuite"
$ build/sbt "sql/test:testOnly *DataSourceV2Suite"
```
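To run only the moved test, ScalaTest's substring filter should also work (assuming the default sbt/ScalaTest runner arguments):
```
$ build/sbt "sql/test:testOnly *QueryExecutionErrorsSuite -- -z WRITING_JOB_ABORTED"
```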
Closes #36196 from MaxGekk/move-tests-WRITING_JOB_ABORTED.
Authored-by: Max Gekk <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
---
.../spark/sql/connector/DataSourceV2Suite.scala | 24 ----------------
.../sql/errors/QueryExecutionErrorsSuite.scala | 33 ++++++++++++++++++++++
2 files changed, 33 insertions(+), 24 deletions(-)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
index 44d4f1fa825..3fefaf72df4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
@@ -22,7 +22,6 @@ import java.util.OptionalLong
 
 import test.org.apache.spark.sql.connector._
 
-import org.apache.spark.SparkException
 import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.connector.catalog.{SupportsRead, Table, TableCapability, TableProvider}
@@ -349,29 +348,6 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS
           .option("path", path).mode("error").save()
       }
       assert(e2.getMessage.contains("please use Append or Overwrite modes instead"))
-
-        // test transaction
-        val failingUdf = org.apache.spark.sql.functions.udf {
-          var count = 0
-          (id: Long) => {
-            if (count > 5) {
-              throw new RuntimeException("testing error")
-            }
-            count += 1
-            id
-          }
-        }
-        // this input data will fail to read middle way.
-        val input = spark.range(15).select(failingUdf($"id").as(Symbol("i")))
-          .select($"i", -$"i" as Symbol("j"))
-        val e3 = intercept[SparkException] {
-          input.write.format(cls.getName).option("path", path).mode("overwrite").save()
-        }
-        assert(e3.getMessage.contains("Writing job aborted"))
-        assert(e3.getErrorClass == "WRITING_JOB_ABORTED")
-        assert(e3.getSqlState == "40000")
-        // make sure we don't have partial data.
-        assert(spark.read.format(cls.getName).option("path", path).load().collect().isEmpty)
     }
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
index e47e0823536..77eb6b28d54 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
@@ -19,9 +19,12 @@ package org.apache.spark.sql.errors
 import java.util.Locale
 
+import test.org.apache.spark.sql.connector.JavaSimpleWritableDataSource
+
 import org.apache.spark.{SparkArithmeticException, SparkException, SparkIllegalStateException,
   SparkRuntimeException, SparkUnsupportedOperationException, SparkUpgradeException}
 import org.apache.spark.sql.{DataFrame, QueryTest}
 import org.apache.spark.sql.catalyst.util.BadRecordException
+import org.apache.spark.sql.connector.SimpleWritableDataSource
 import org.apache.spark.sql.execution.QueryExecutionException
 import org.apache.spark.sql.execution.datasources.orc.OrcTest
 import org.apache.spark.sql.execution.datasources.parquet.ParquetTest
@@ -324,4 +327,34 @@ class QueryExecutionErrorsSuite extends QueryTest
     assert(e5.getSqlState === "42000")
     assert(e5.getMessage === "Cannot parse decimal")
   }
+
+  test("WRITING_JOB_ABORTED: read of input data fails in the middle") {
+    Seq(classOf[SimpleWritableDataSource], classOf[JavaSimpleWritableDataSource]).foreach { cls =>
+      withTempPath { file =>
+        val path = file.getCanonicalPath
+        assert(spark.read.format(cls.getName).option("path", path).load().collect().isEmpty)
+        // test transaction
+        val failingUdf = org.apache.spark.sql.functions.udf {
+          var count = 0
+          (id: Long) => {
+            if (count > 5) {
+              throw new RuntimeException("testing error")
+            }
+            count += 1
+            id
+          }
+        }
+        val input = spark.range(15).select(failingUdf($"id").as(Symbol("i")))
+          .select($"i", -$"i" as Symbol("j"))
+        val e = intercept[SparkException] {
+          input.write.format(cls.getName).option("path", path).mode("overwrite").save()
+        }
+        assert(e.getMessage === "Writing job aborted")
+        assert(e.getErrorClass === "WRITING_JOB_ABORTED")
+        assert(e.getSqlState === "40000")
+        // make sure we don't have partial data.
+        assert(spark.read.format(cls.getName).option("path", path).load().collect().isEmpty)
+      }
+    }
+  }
 }
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]