This is an automated email from the ASF dual-hosted git repository.
ueshin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 09d93ece2cf7 [SPARK-50602][SQL] Fix transpose to show a proper error message when invalid index columns are specified
09d93ece2cf7 is described below
commit 09d93ece2cf78ff265c305fcf79c5a4f4b5bd5a4
Author: Takuya Ueshin <[email protected]>
AuthorDate: Thu Dec 19 14:11:07 2024 -0800
[SPARK-50602][SQL] Fix transpose to show a proper error message when invalid index columns are specified
### What changes were proposed in this pull request?
Fixes `transpose` to show a proper error message when invalid index columns
are specified.
### Why are the changes needed?
When invalid index columns are specified, `transpose` currently fails with an `INTERNAL_ERROR` instead of a proper user-facing analysis error.
```py
>>> df = spark.range(10).transpose(sf.col("id") + 1)
Traceback (most recent call last):
...
py4j.protocol.Py4JJavaError: An error occurred while calling o40.transpose.
: org.apache.spark.SparkException: [INTERNAL_ERROR] Found the unresolved operator: 'UnresolvedTranspose [unresolvedalias((id#0L + cast(1 as bigint)))] SQLSTATE: XX000
...
```
### Does this PR introduce _any_ user-facing change?
Yes. A proper error message is now shown instead of `INTERNAL_ERROR`:
```py
>>> df = spark.range(10).transpose(sf.col("id") + 1)
Traceback (most recent call last):
...
pyspark.errors.exceptions.captured.AnalysisException: [TRANSPOSE_INVALID_INDEX_COLUMN] Invalid index column for TRANSPOSE because: Index column must be an atomic attribute SQLSTATE: 42804
```
### How was this patch tested?
Added the related tests.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #49219 from ueshin/issues/SPARK-50602/fix_transpose.
Authored-by: Takuya Ueshin <[email protected]>
Signed-off-by: Takuya Ueshin <[email protected]>
---
python/pyspark/sql/tests/test_dataframe.py | 12 ++++++++++++
.../src/main/scala/org/apache/spark/sql/Dataset.scala | 2 +-
.../org/apache/spark/sql/DataFrameTransposeSuite.scala | 17 +++++++++++++++++
3 files changed, 30 insertions(+), 1 deletion(-)
diff --git a/python/pyspark/sql/tests/test_dataframe.py b/python/pyspark/sql/tests/test_dataframe.py
index cd6a57429cfa..a0234a527f63 100644
--- a/python/pyspark/sql/tests/test_dataframe.py
+++ b/python/pyspark/sql/tests/test_dataframe.py
@@ -1044,6 +1044,18 @@ class DataFrameTestsMixin:
messageParameters={"dt1": '"STRING"', "dt2": '"BIGINT"'},
)
+ def test_transpose_with_invalid_index_columns(self):
+ # SPARK-50602: invalid index columns
+ df = self.spark.createDataFrame([{"a": "x", "b": "y", "c": "z"}])
+
+ with self.assertRaises(AnalysisException) as pe:
+ df.transpose(col("a") + 1).collect()
+ self.check_error(
+ exception=pe.exception,
+ errorClass="TRANSPOSE_INVALID_INDEX_COLUMN",
+ messageParameters={"reason": "Index column must be an atomic attribute"},
+ )
+
class DataFrameTests(DataFrameTestsMixin, ReusedSQLTestCase):
def test_query_execution_unsupported_in_classic(self):
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index c5c9dde901c9..c8c1bacfb9de 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -1013,7 +1013,7 @@ class Dataset[T] private[sql](
/** @inheritdoc */
def transpose(indexColumn: Column): DataFrame = withPlan {
UnresolvedTranspose(
- Seq(indexColumn.named),
+ Seq(indexColumn.expr),
logicalPlan
)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTransposeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTransposeSuite.scala
index 51de8553216c..ce1c8d7ceb64 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTransposeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTransposeSuite.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql
+import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
@@ -173,4 +174,20 @@ class DataFrameTransposeSuite extends QueryTest with SharedSparkSession {
)
assertResult(Array("key", "A", "B"))(transposedDf.columns)
}
+
+ test("SPARK-50602: invalid index columns") {
+ val df = Seq(
+ ("A", 1, 2),
+ ("B", 3, 4),
+ (null, 5, 6)
+ ).toDF("id", "val1", "val2")
+
+ checkError(
+ exception = intercept[AnalysisException] {
+ df.transpose($"id" + lit(1))
+ },
+ condition = "TRANSPOSE_INVALID_INDEX_COLUMN",
+ parameters = Map("reason" -> "Index column must be an atomic attribute")
+ )
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]