This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
new 0749938b6109 [SPARK-45896][SQL][3.4] Construct `ValidateExternalType`
with the correct expected type
0749938b6109 is described below
commit 0749938b610970d26d9ee65a17f500483230b0a2
Author: Bruce Robbins <[email protected]>
AuthorDate: Sun Nov 12 18:23:04 2023 -0800
[SPARK-45896][SQL][3.4] Construct `ValidateExternalType` with the correct
expected type
### What changes were proposed in this pull request?
This is a backport of #43770.
When creating a serializer for a `Map` or `Seq` with an element of type
`Option`, pass an expected type of `Option` to `ValidateExternalType` rather
than the `Option`'s type argument.
### Why are the changes needed?
In 3.4.1, 3.5.0, and master, the following code gets an error:
```
scala> val df = Seq(Seq(Some(Seq(0)))).toDF("a")
val df = Seq(Seq(Some(Seq(0)))).toDF("a")
org.apache.spark.SparkRuntimeException: [EXPRESSION_ENCODING_FAILED] Failed
to encode a value of the expressions: mapobjects(lambdavariable(MapObject,
ObjectType(class java.lang.Object), true, -1),
mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), true,
-2), assertnotnull(validateexternaltype(lambdavariable(MapObject,
ObjectType(class java.lang.Object), true, -2), IntegerType, IntegerType)),
unwrapoption(ObjectType(interface scala.collection.immutable.Seq), vali [...]
...
Caused by: java.lang.RuntimeException: scala.Some is not a valid external
type for schema of array<int>
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.MapObjects_0$(Unknown
Source)
...
```
However, this code works in 3.3.3.
Similarly, this code gets an error:
```
scala> val df = Seq(Seq(Some(java.sql.Timestamp.valueOf("2023-01-01
00:00:00")))).toDF("a")
val df = Seq(Seq(Some(java.sql.Timestamp.valueOf("2023-01-01
00:00:00")))).toDF("a")
org.apache.spark.SparkRuntimeException: [EXPRESSION_ENCODING_FAILED] Failed
to encode a value of the expressions: mapobjects(lambdavariable(MapObject,
ObjectType(class java.lang.Object), true, -1), staticinvoke(class
org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType,
fromJavaTimestamp, unwrapoption(ObjectType(class java.sql.Timestamp),
validateexternaltype(lambdavariable(MapObject, ObjectType(class
java.lang.Object), true, -1), TimestampType, ObjectType(class scala.Opti [...]
...
Caused by: java.lang.RuntimeException: scala.Some is not a valid external
type for schema of timestamp
...
```
As with the first example, this code works in 3.3.3.
`ScalaReflection#validateAndSerializeElement` will construct
`ValidateExternalType` with an expected type of the `Option`'s type parameter.
Therefore, for element types `Option[Seq/Date/Timestamp/BigDecimal]`,
`ValidateExternalType` will try to validate that the element is of the
contained type (e.g., `BigDecimal`) rather than of type `Option`. Since the
element type is of type `Option`, the validation fails.
Validation currently works by accident for element types
`Option[Map/<primitive-type>]`, simply because in that case
`ValidateExternalType` ignores the passed expected type and tries to validate
based on the encoder's `clsTag` field (which, for the `OptionEncoder`, will be
class `Option`).
### Does this PR introduce _any_ user-facing change?
Other than fixing the bug, no.
### How was this patch tested?
New unit tests.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #43775 from bersprockets/encoding_error_br34.
Authored-by: Bruce Robbins <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../org/apache/spark/sql/catalyst/ScalaReflection.scala | 7 ++++++-
.../spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala | 12 ++++++++++++
.../src/test/scala/org/apache/spark/sql/DatasetSuite.scala | 9 +++++++++
3 files changed, 27 insertions(+), 1 deletion(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index b18613bdad3a..4e4ca4ee0632 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -564,10 +564,15 @@ object ScalaReflection extends ScalaReflection {
private def validateAndSerializeElement(
enc: AgnosticEncoder[_],
nullable: Boolean): Expression => Expression = { input =>
+ val expected = enc match {
+ case OptionEncoder(_) => lenientExternalDataTypeFor(enc)
+ case _ => enc.dataType
+ }
+
expressionWithNullSafety(
serializerFor(
enc,
- ValidateExternalType(input, enc.dataType,
lenientExternalDataTypeFor(enc))),
+ ValidateExternalType(input, expected,
lenientExternalDataTypeFor(enc))),
nullable,
WalkedTypePath())
}
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala
index 79417c4ca1fe..9e19c3755b24 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala
@@ -476,6 +476,18 @@ class ExpressionEncoderSuite extends
CodegenInterpretedPlanTest with AnalysisTes
encodeDecodeTest(Option.empty[Int], "empty option of int")
encodeDecodeTest(Option("abc"), "option of string")
encodeDecodeTest(Option.empty[String], "empty option of string")
+ encodeDecodeTest(Seq(Some(Seq(0))), "SPARK-45896: seq of option of seq")
+ encodeDecodeTest(Map(0 -> Some(Seq(0))), "SPARK-45896: map of option of seq")
+ encodeDecodeTest(Seq(Some(Timestamp.valueOf("2023-01-01 00:00:00"))),
+ "SPARK-45896: seq of option of timestamp")
+ encodeDecodeTest(Map(0 -> Some(Timestamp.valueOf("2023-01-01 00:00:00"))),
+ "SPARK-45896: map of option of timestamp")
+ encodeDecodeTest(Seq(Some(Date.valueOf("2023-01-01"))),
+ "SPARK-45896: seq of option of date")
+ encodeDecodeTest(Map(0 -> Some(Date.valueOf("2023-01-01"))),
+ "SPARK-45896: map of option of date")
+ encodeDecodeTest(Seq(Some(BigDecimal(200))), "SPARK-45896: seq of option of
bigdecimal")
+ encodeDecodeTest(Map(0 -> Some(BigDecimal(200))), "SPARK-45896: map of
option of bigdecimal")
encodeDecodeTest(ScroogeLikeExample(1),
"SPARK-40385 class with only a companion object constructor")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 6a1aa25c6e21..7dec558f8df3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -270,6 +270,13 @@ class DatasetSuite extends QueryTest
(ClassData("one", 2), 1L), (ClassData("two", 3), 1L))
}
+ test("SPARK-45896: seq of option of seq") {
+ val ds = Seq(DataSeqOptSeq(Seq(Some(Seq(0))))).toDS()
+ checkDataset(
+ ds,
+ DataSeqOptSeq(Seq(Some(List(0)))))
+ }
+
test("select") {
val ds = Seq(("a", 1), ("b", 2), ("c", 3)).toDS()
checkDataset(
@@ -2561,6 +2568,8 @@ case class ClassNullableData(a: String, b: Integer)
case class NestedStruct(f: ClassData)
case class DeepNestedStruct(f: NestedStruct)
+case class DataSeqOptSeq(a: Seq[Option[Seq[Int]]])
+
/**
* A class used to test serialization using encoders. This class throws
exceptions when using
* Java serialization -- so the only way it can be "serialized" is through our
encoders.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]