This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new d61c1ea7f700 [SPARK-52335][CONNECT][SQL] Unify the 'invalid bucket
count' error for both Connect and Classic
d61c1ea7f700 is described below
commit d61c1ea7f700ccf4a0ca37e9caef0ee811881b1e
Author: Yihong He <[email protected]>
AuthorDate: Fri May 30 08:30:51 2025 +0900
[SPARK-52335][CONNECT][SQL] Unify the 'invalid bucket count' error for both
Connect and Classic
### What changes were proposed in this pull request?
This PR unifies the error handling for invalid bucket count validation
between Spark Connect and Classic Spark. The main changes are:
1. Updated the error message in `error-conditions.json` for
`INVALID_BUCKET_COUNT` to be more descriptive and consistent
2. Removed the legacy error condition `_LEGACY_ERROR_TEMP_1083` since its
functionality is now merged into `INVALID_BUCKET_COUNT`
3. Removed the `InvalidCommandInput` class and its usage in Connect since
we're now using the standard `AnalysisException` with `INVALID_BUCKET_COUNT`
error condition
4. Updated the bucket count validation in `SparkConnectPlanner` to rely on
the standard error handling path
5. Updated the test case in `SparkConnectProtoSuite` to verify the new
unified error handling
The key improvement is that both Connect and Classic now use the same error
condition and message format for invalid bucket count errors, making the error
handling more consistent across Spark's different interfaces. The error message
now includes both the maximum allowed bucket count and the invalid value
received, providing better guidance to users.
This change simplifies the error handling codebase by removing duplicate
error definitions and standardizing on a single error condition for this
validation case.
### Why are the changes needed?
The changes are needed to:
1. Provide consistent error messages across Spark Connect and Classic
interfaces
2. Simplify error handling by removing duplicate error definitions
3. Improve error message clarity by including the maximum allowed bucket
count in the error message
4. Maintain better code maintainability by reducing code duplication in
error handling
The unified error message now clearly indicates both the requirement
(bucket count > 0) and the upper limit (≤ bucketing.maxBuckets), making it more
helpful for users to understand and fix the issue.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
`build/sbt "connect/testOnly *SparkConnectProtoSuite"`
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #51039 from heyihong/SPARK-52335.
Authored-by: Yihong He <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../src/main/resources/error/error-conditions.json | 7 +----
python/pyspark/errors/exceptions/connect.py | 7 -----
.../spark/sql/errors/QueryCompilationErrors.scala | 2 +-
.../sql/connect/common/InvalidCommandInput.scala | 36 ----------------------
.../sql/connect/planner/InvalidInputErrors.scala | 5 +--
.../sql/connect/planner/SparkConnectPlanner.scala | 3 --
.../connect/planner/SparkConnectProtoSuite.scala | 13 +++++---
7 files changed, 12 insertions(+), 61 deletions(-)
diff --git a/common/utils/src/main/resources/error/error-conditions.json
b/common/utils/src/main/resources/error/error-conditions.json
index 3e689cecac3b..8dddca5077a6 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -2382,7 +2382,7 @@
},
"INVALID_BUCKET_COUNT" : {
"message" : [
- "BucketBy must specify a bucket count > 0, received <numBuckets>
instead."
+ "Number of buckets should be greater than 0 but less than or equal to
bucketing.maxBuckets (`<bucketingMaxBuckets>`). Got `<numBuckets>`."
],
"sqlState" : "22003"
},
@@ -7042,11 +7042,6 @@
"Partition [<specString>] did not specify locationUri."
]
},
- "_LEGACY_ERROR_TEMP_1083" : {
- "message" : [
- "Number of buckets should be greater than 0 but less than or equal to
bucketing.maxBuckets (`<bucketingMaxBuckets>`). Got `<numBuckets>`."
- ]
- },
"_LEGACY_ERROR_TEMP_1089" : {
"message" : [
"Column statistics deserialization is not supported for column <name> of
data type: <dataType>."
diff --git a/python/pyspark/errors/exceptions/connect.py
b/python/pyspark/errors/exceptions/connect.py
index fafdd6b84297..45c96a84f14d 100644
--- a/python/pyspark/errors/exceptions/connect.py
+++ b/python/pyspark/errors/exceptions/connect.py
@@ -371,12 +371,6 @@ class InvalidPlanInput(SparkConnectGrpcException):
"""
-class InvalidCommandInput(SparkConnectGrpcException):
- """
- Error thrown when a connect command is not valid.
- """
-
-
class StreamingPythonRunnerInitializationException(
SparkConnectGrpcException, BaseStreamingPythonRunnerInitException
):
@@ -411,7 +405,6 @@ EXCEPTION_CLASS_MAPPING = {
"org.apache.spark.SparkNoSuchElementException":
SparkNoSuchElementException,
"org.apache.spark.SparkException": SparkException,
"org.apache.spark.sql.connect.common.InvalidPlanInput": InvalidPlanInput,
- "org.apache.spark.sql.connect.common.InvalidCommandInput":
InvalidCommandInput,
"org.apache.spark.api.python.StreamingPythonRunner"
"$StreamingPythonRunnerInitializationException":
StreamingPythonRunnerInitializationException,
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 9de52533dbbb..14f279ad5ad7 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -1276,7 +1276,7 @@ private[sql] object QueryCompilationErrors extends
QueryErrorsBase with Compilat
def invalidBucketNumberError(bucketingMaxBuckets: Int, numBuckets: Int):
Throwable = {
new AnalysisException(
- errorClass = "_LEGACY_ERROR_TEMP_1083",
+ errorClass = "INVALID_BUCKET_COUNT",
messageParameters = Map(
"bucketingMaxBuckets" -> bucketingMaxBuckets.toString,
"numBuckets" -> numBuckets.toString))
diff --git
a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/common/InvalidCommandInput.scala
b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/common/InvalidCommandInput.scala
deleted file mode 100644
index 38efa547e9dc..000000000000
---
a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/common/InvalidCommandInput.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.connect.common
-
-import scala.jdk.CollectionConverters._
-
-import org.apache.spark.{SparkThrowable, SparkThrowableHelper}
-
-/**
- * Error thrown when a connect command is not valid.
- */
-final case class InvalidCommandInput(
- private val errorCondition: String,
- private val messageParameters: Map[String, String] = Map.empty,
- private val cause: Throwable = null)
- extends Exception(SparkThrowableHelper.getMessage(errorCondition,
messageParameters), cause)
- with SparkThrowable {
-
- override def getCondition: String = errorCondition
-
- override def getMessageParameters: java.util.Map[String, String] =
messageParameters.asJava
-}
diff --git
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/InvalidInputErrors.scala
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/InvalidInputErrors.scala
index 0f00d3643992..8076c62b0092 100644
---
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/InvalidInputErrors.scala
+++
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/InvalidInputErrors.scala
@@ -21,7 +21,7 @@ import scala.collection.mutable
import org.apache.spark.SparkThrowableHelper
import org.apache.spark.connect.proto
-import org.apache.spark.sql.connect.common.{InvalidCommandInput,
InvalidPlanInput}
+import org.apache.spark.sql.connect.common.InvalidPlanInput
import org.apache.spark.sql.errors.DataTypeErrors.{quoteByDefault, toSQLType}
import org.apache.spark.sql.types.DataType
@@ -214,9 +214,6 @@ object InvalidInputErrors {
def unionByNameAllowMissingColRequiresByName(): InvalidPlanInput =
InvalidPlanInput("UnionByName `allowMissingCol` can be true only if
`byName` is true.")
- def invalidBucketCount(numBuckets: Int): InvalidCommandInput =
- InvalidCommandInput("INVALID_BUCKET_COUNT", Map("numBuckets" ->
numBuckets.toString))
-
def unsupportedUserDefinedFunctionImplementation(clazz: Class[_]):
InvalidPlanInput =
InvalidPlanInput(s"Unsupported UserDefinedFunction implementation:
${clazz}")
}
diff --git
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 199fe7da3291..5978560c67d4 100644
---
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -3077,9 +3077,6 @@ class SparkConnectPlanner(
if (writeOperation.hasBucketBy) {
val op = writeOperation.getBucketBy
val cols = op.getBucketColumnNamesList.asScala
- if (op.getNumBuckets <= 0) {
- throw InvalidInputErrors.invalidBucketCount(op.getNumBuckets)
- }
w.bucketBy(op.getNumBuckets, cols.head, cols.tail.toSeq: _*)
}
diff --git
a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala
b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala
index e5f19e714895..5c43715d2dd1 100644
---
a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala
+++
b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala
@@ -34,7 +34,7 @@ import
org.apache.spark.sql.catalyst.plans.logical.{CollectMetrics, Distinct, Lo
import org.apache.spark.sql.catalyst.types.DataTypeUtils
import org.apache.spark.sql.classic.ClassicConversions._
import org.apache.spark.sql.classic.DataFrame
-import org.apache.spark.sql.connect.common.{InvalidCommandInput,
InvalidPlanInput}
+import org.apache.spark.sql.connect.common.InvalidPlanInput
import
org.apache.spark.sql.connect.common.LiteralValueProtoConverter.toLiteralProto
import org.apache.spark.sql.connect.dsl.MockRemoteSession
import org.apache.spark.sql.connect.dsl.commands._
@@ -657,13 +657,18 @@ class SparkConnectProtoSuite extends PlanTest with
SparkConnectPlanTest {
}
test("Write with invalid bucketBy configuration") {
- val cmd = localRelation.write(bucketByCols = Seq("id"), numBuckets =
Some(0))
+ val cmd = localRelation.write(
+ tableName = Some("testtable"),
+ tableSaveMethod = Some("save_as_table"),
+ format = Some("parquet"),
+ bucketByCols = Seq("id"),
+ numBuckets = Some(0))
checkError(
- exception = intercept[InvalidCommandInput] {
+ exception = intercept[AnalysisException] {
transform(cmd)
},
condition = "INVALID_BUCKET_COUNT",
- parameters = Map("numBuckets" -> "0"))
+ parameters = Map("bucketingMaxBuckets" -> "100000", "numBuckets" -> "0"))
}
test("Write to Path") {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]