This is an automated email from the ASF dual-hosted git repository.
maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new c6dccc7dd41 [SPARK-39163][SQL] Throw an exception w/ error class for an invalid bucket file
c6dccc7dd41 is described below
commit c6dccc7dd412a95007f5bb2584d69b85ff9ebf8e
Author: panbingkun <[email protected]>
AuthorDate: Thu May 19 20:39:35 2022 +0300
[SPARK-39163][SQL] Throw an exception w/ error class for an invalid bucket file
### What changes were proposed in this pull request?
In the PR, I propose to use the INVALID_BUCKET_FILE error class for an invalid bucket file.
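For illustration only (not part of the patch): with this change, a scan over malformed bucket files surfaces a `SparkException` carrying a machine-readable error class instead of a bare `IllegalStateException`. A minimal sketch, assuming an active `SparkSession` named `spark` and a hypothetical `bucketed_table` whose data files are malformed:
```scala
import org.apache.spark.SparkException

try {
  // Triggers a bucket-aware scan over the (malformed) bucketed table.
  spark.table("bucketed_table").groupBy("i").count().count()
} catch {
  case e: SparkException =>
    // The error class can now be inspected programmatically ...
    assert(e.getErrorClass == "INVALID_BUCKET_FILE")
    // ... and the message is rendered from error-classes.json:
    // "Invalid bucket file: <path>"
}
```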
### Why are the changes needed?
Porting the execution error for an invalid bucket file to the new error framework should improve the user experience with Spark SQL.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
UT: a new test in `QueryExecutionErrorsSuite`, plus updated assertions in `AdaptiveQueryExecSuite`.
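For reference, the new test follows the reproduction recipe sketched below, e.g. for spark-shell (the table name is illustrative; `scala.reflect.io.Directory` stands in for Spark's internal `Utils.deleteRecursively`, which is private to the `spark` package):
```scala
import java.io.File
import java.net.URI
import scala.reflect.io.Directory

import spark.implicits._

val df = (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")
df.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table")

// Replace the table directory with plain, non-bucketed parquet files,
// so the data file names no longer carry bucket ids.
val warehousePath = new URI(spark.sessionState.conf.warehousePath).getPath
val tableDir = new File(warehousePath, "bucketed_table")
new Directory(tableDir).deleteRecursively()
df.write.parquet(tableDir.getAbsolutePath)

// Any bucket-aware scan now throws a SparkException
// with error class INVALID_BUCKET_FILE.
spark.table("bucketed_table").groupBy("i").count().count()
```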
Closes #36603 from panbingkun/SPARK-39163.
Authored-by: panbingkun <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
---
core/src/main/resources/error/error-classes.json | 3 +++
.../spark/sql/errors/QueryExecutionErrors.scala | 5 ++++
.../spark/sql/execution/DataSourceScanExec.scala | 4 ++--
.../sql/errors/QueryExecutionErrorsSuite.scala | 28 ++++++++++++++++++++--
.../adaptive/AdaptiveQueryExecSuite.scala | 6 ++---
.../spark/sql/sources/BucketedReadSuite.scala | 23 ------------------
6 files changed, 38 insertions(+), 31 deletions(-)
diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json
index e4ee09ea8a7..1a139c018e8 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -115,6 +115,9 @@
"INVALID_ARRAY_INDEX_IN_ELEMENT_AT" : {
"message" : [ "The index <indexValue> is out of bounds. The array has
<arraySize> elements. To return NULL instead, use `try_element_at`. If
necessary set <config> to \"false\" to bypass this error." ]
},
+ "INVALID_BUCKET_FILE" : {
+ "message" : [ "Invalid bucket file: <path>" ]
+ },
"INVALID_FIELD_NAME" : {
"message" : [ "Field name <fieldName> is invalid: <path> is not a struct."
],
"sqlState" : "42000"
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index a155b0694b5..1e664100545 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -2000,4 +2000,9 @@ object QueryExecutionErrors extends QueryErrorsBase {
s"add ${toSQLValue(amount, IntegerType)} $unit to " +
s"${toSQLValue(DateTimeUtils.microsToInstant(micros),
TimestampType)}"))
}
+
+ def invalidBucketFile(path: String): Throwable = {
+ new SparkException(errorClass = "INVALID_BUCKET_FILE", messageParameters = Array(path),
+ cause = null)
+ }
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index f7b627cef08..f5d349d975f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
import org.apache.spark.sql.catalyst.util.{truncatedString, CaseInsensitiveMap}
+import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource}
import org.apache.spark.sql.execution.datasources.v2.PushedDownOperators
@@ -618,8 +619,7 @@ case class FileSourceScanExec(
}.groupBy { f =>
BucketingUtils
.getBucketId(new Path(f.filePath).getName)
- // TODO(SPARK-39163): Throw an exception w/ error class for an invalid bucket file
- .getOrElse(throw new IllegalStateException(s"Invalid bucket file ${f.filePath}"))
+ .getOrElse(throw QueryExecutionErrors.invalidBucketFile(f.filePath))
}
val prunedFilesGroupedToBuckets = if (optionalBucketSet.isDefined) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
index bdc0772c1de..bbf6c0dda79 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
@@ -17,8 +17,8 @@
package org.apache.spark.sql.errors
-import java.io.IOException
-import java.net.URL
+import java.io.{File, IOException}
+import java.net.{URI, URL}
import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet, ResultSetMetaData}
import java.util.{Locale, Properties, ServiceConfigurationError}
@@ -587,6 +587,30 @@ class QueryExecutionErrorsSuite
JdbcDialects.unregisterDialect(testH2DialectUnrecognizedSQLType)
}
+
+ test("INVALID_BUCKET_FILE: error if there exists any malformed bucket
files") {
+ val df1 = (0 until 50).map(i => (i % 5, i % 13, i.toString)).
+ toDF("i", "j", "k").as("df1")
+
+ withTable("bucketed_table") {
+ df1.write.format("parquet").bucketBy(8, "i").
+ saveAsTable("bucketed_table")
+ val warehouseFilePath = new URI(spark.sessionState.conf.warehousePath).getPath
+ val tableDir = new File(warehouseFilePath, "bucketed_table")
+ Utils.deleteRecursively(tableDir)
+ df1.write.parquet(tableDir.getAbsolutePath)
+
+ val aggregated = spark.table("bucketed_table").groupBy("i").count()
+
+ checkErrorClass(
+ exception = intercept[SparkException] {
+ aggregated.count()
+ },
+ errorClass = "INVALID_BUCKET_FILE",
+ msg = "Invalid bucket file: .+",
+ matchMsg = true)
+ }
+ }
}
class FakeFileSystemSetPermission extends LocalFileSystem {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 0f71c028962..51d02f4a7c6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -860,10 +860,8 @@ class AdaptiveQueryExecSuite
val error = intercept[SparkException] {
aggregated.count()
}
- // TODO(SPARK-39163): Throw an exception w/ error class for an invalid bucket file
- assert(error.getErrorClass === "INTERNAL_ERROR")
- assert(error.getCause.toString contains "Invalid bucket file")
- assert(error.getCause.getSuppressed.size === 0)
+ assert(error.getErrorClass === "INVALID_BUCKET_FILE")
+ assert(error.getMessage contains "Invalid bucket file")
}
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index c39edbc5860..fc7c4e5761b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -17,12 +17,8 @@
package org.apache.spark.sql.sources
-import java.io.File
-import java.net.URI
-
import scala.util.Random
-import org.apache.spark.SparkException
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions
@@ -37,7 +33,6 @@ import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
-import org.apache.spark.util.Utils
import org.apache.spark.util.collection.BitSet
class BucketedReadWithoutHiveSupportSuite
@@ -833,24 +828,6 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
}
}
- test("error if there exists any malformed bucket files") {
- withTable("bucketed_table") {
- df1.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table")
- val warehouseFilePath = new URI(spark.sessionState.conf.warehousePath).getPath
- val tableDir = new File(warehouseFilePath, "bucketed_table")
- Utils.deleteRecursively(tableDir)
- df1.write.parquet(tableDir.getAbsolutePath)
-
- val aggregated = spark.table("bucketed_table").groupBy("i").count()
- val e = intercept[SparkException] {
- aggregated.count()
- }
- // TODO(SPARK-39163): Throw an exception w/ error class for an invalid bucket file
- assert(e.getErrorClass === "INTERNAL_ERROR")
- assert(e.getCause.toString contains "Invalid bucket file")
- }
- }
-
test("disable bucketing when the output doesn't contain all bucketing
columns") {
withTable("bucketed_table") {
df1.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]