This is an automated email from the ASF dual-hosted git repository.
viirya pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new f3ba9d9 [SPARK-35653][SQL] Fix CatalystToExternalMap interpreted path fails for Map with case classes as keys or values
f3ba9d9 is described below
commit f3ba9d9408352e6b0ac6d1fc02d4d9c3a91b5952
Author: Emil Ejbyfeldt <[email protected]>
AuthorDate: Thu Jun 10 09:37:27 2021 -0700
[SPARK-35653][SQL] Fix CatalystToExternalMap interpreted path fails for Map with case classes as keys or values
### What changes were proposed in this pull request?
Use the key/value LambdaFunction to convert the elements instead of
CatalystTypeConverters.createToScalaConverter. This is how it is done in
MapObjects, which correctly handles Arrays with case classes.
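For context, this is roughly the conversion pattern the interpreted path now follows: bind the current catalyst element into a one-field row and evaluate the lambda expression against it, just as interpreted MapObjects does. A minimal standalone sketch using catalyst's internal expression API (the `elem`/`addOne` names are illustrative, not from the patch):

```scala
import org.apache.spark.sql.catalyst.expressions.{Add, GenericInternalRow, Literal}
import org.apache.spark.sql.catalyst.expressions.objects.LambdaVariable
import org.apache.spark.sql.types.IntegerType

// A loop variable, like the key/value variables CatalystToExternalMap creates.
val elem = LambdaVariable("elem", IntegerType, nullable = false)
// A "lambda function": an expression tree over that loop variable.
val addOne = Add(elem, Literal(1))

// The interpreted path binds each element into a one-field row and evaluates
// the lambda against it; an interpreted LambdaVariable reads ordinal 0 of that row.
val row = new GenericInternalRow(1)
row.update(0, 41)
assert(addOne.eval(row) == 42)
```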
### Why are the changes needed?
Before these changes the added test cases would fail with the following:
```
[info] - encode/decode for map with case class as value: Map(1 -> IntAndString(1,a)) (interpreted path) *** FAILED *** (64 milliseconds)
[info] Encoded/Decoded data does not match input data
[info]
[info] in: Map(1 -> IntAndString(1,a))
[info] out: Map(1 -> [1,a])
[info] types: scala.collection.immutable.Map$Map1
[info]
[info] Encoded Data: [org.apache.spark.sql.catalyst.expressions.UnsafeMapData@5ecf5d9e]
[info] Schema: value#823
[info] root
[info] -- value: map (nullable = true)
[info] |-- key: integer
[info] |-- value: struct (valueContainsNull = true)
[info] | |-- i: integer (nullable = false)
[info] | |-- s: string (nullable = true)
[info]
[info]
[info] fromRow Expressions:
[info] catalysttoexternalmap(lambdavariable(CatalystToExternalMap_key, IntegerType, false, 178), lambdavariable(CatalystToExternalMap_key, IntegerType, false, 178), lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179), if (isnull(lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179))) null else newInstance(class org.apache.spark.sql.catalyst.encoders [...]
[info] :- lambdavariable(CatalystToExternalMap_key, IntegerType, false, 178)
[info] :- lambdavariable(CatalystToExternalMap_key, IntegerType, false, 178)
[info] :- lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179)
[info] :- if (isnull(lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179))) null else newInstance(class org.apache.spark.sql.catalyst.encoders.IntAndString)
[info] : :- isnull(lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179))
[info] : : +- lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179)
[info] : :- null
[info] : +- newInstance(class org.apache.spark.sql.catalyst.encoders.IntAndString)
[info] : :- assertnotnull(lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179).i)
[info] : : +- lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179).i
[info] : : +- lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179)
[info] : +- lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179).s.toString
[info] : +- lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179).s
[info] : +- lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179)
[info] +- input[0, map<int,struct<i:int,s:string>>, true] (ExpressionEncoderSuite.scala:627)
```
So using a map with case classes as keys or values on the interpreted
path would incorrectly deserialize data from the catalyst
representation.
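A rough way to see the bug outside the test suite (a sketch only: it assumes a top-level `IntAndString` case class, Spark 3.0's `createSerializer`/`createDeserializer` encoder API, and forces the interpreted path via the internal `spark.sql.codegen.factoryMode` config, the same switch `CodegenInterpretedPlanTest` flips):

```scala
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.internal.SQLConf

case class IntAndString(i: Int, s: String)

// Force interpreted (non-codegen) evaluation, as CodegenInterpretedPlanTest does.
SQLConf.get.setConfString("spark.sql.codegen.factoryMode", "NO_CODEGEN")

val enc = ExpressionEncoder[Map[Int, IntAndString]]().resolveAndBind()
val in  = Map(1 -> IntAndString(1, "a"))
val out = enc.createDeserializer()(enc.createSerializer()(in))

// Before this fix the round trip produced Map(1 -> [1,a]) (a generic row, not
// the case class); with the fix it equals the input.
assert(out == in)
```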
### Does this PR introduce _any_ user-facing change?
Yes, it fixes the bug.
### How was this patch tested?
Existing and new unit tests in the ExpressionEncoderSuite
Closes #32783 from eejbyfeldt/fix-interpreted-path-for-map-with-case-classes.
Authored-by: Emil Ejbyfeldt <[email protected]>
Signed-off-by: Liang-Chi Hsieh <[email protected]>
(cherry picked from commit e2e3fe77823387f6d4164eede05bf077b4235c87)
Signed-off-by: Liang-Chi Hsieh <[email protected]>
---
.../spark/sql/catalyst/expressions/objects/objects.scala | 14 ++++++--------
.../sql/catalyst/encoders/ExpressionEncoderSuite.scala | 5 +++++
2 files changed, 11 insertions(+), 8 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
index e5e9999..4d4ebc3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
@@ -27,7 +27,7 @@ import scala.util.Try
import org.apache.spark.{SparkConf, SparkEnv}
import org.apache.spark.serializer._
import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, ScalaReflection}
+import org.apache.spark.sql.catalyst.{InternalRow, ScalaReflection}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen._
@@ -1075,11 +1075,6 @@ case class CatalystToExternalMap private(
private lazy val inputMapType = inputData.dataType.asInstanceOf[MapType]
- private lazy val keyConverter =
- CatalystTypeConverters.createToScalaConverter(inputMapType.keyType)
- private lazy val valueConverter =
- CatalystTypeConverters.createToScalaConverter(inputMapType.valueType)
-
private lazy val (newMapBuilderMethod, moduleField) = {
val clazz = Utils.classForName(collClass.getCanonicalName + "$")
(clazz.getMethod("newBuilder"), clazz.getField("MODULE$").get(null))
@@ -1096,10 +1091,13 @@ case class CatalystToExternalMap private(
builder.sizeHint(result.numElements())
val keyArray = result.keyArray()
val valueArray = result.valueArray()
+ val row = new GenericInternalRow(1)
var i = 0
while (i < result.numElements()) {
- val key = keyConverter(keyArray.get(i, inputMapType.keyType))
- val value = valueConverter(valueArray.get(i, inputMapType.valueType))
+ row.update(0, keyArray.get(i, inputMapType.keyType))
+ val key = keyLambdaFunction.eval(row)
+ row.update(0, valueArray.get(i, inputMapType.valueType))
+ val value = valueLambdaFunction.eval(row)
builder += Tuple2(key, value)
i += 1
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala
index 67c8ccb..00418ff 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala
@@ -114,6 +114,7 @@ case class ReferenceValueClass(wrapped: ReferenceValueClass.Container) extends A
object ReferenceValueClass {
case class Container(data: Int)
}
+case class IntAndString(i: Int, s: String)
class ExpressionEncoderSuite extends CodegenInterpretedPlanTest with AnalysisTest {
OuterScopes.addOuterScope(this)
@@ -174,6 +175,10 @@ class ExpressionEncoderSuite extends CodegenInterpretedPlanTest with AnalysisTes
encodeDecodeTest(Map(1 -> "a", 2 -> "b"), "map")
encodeDecodeTest(Map(1 -> "a", 2 -> null), "map with null")
encodeDecodeTest(Map(1 -> Map("a" -> 1), 2 -> Map("b" -> 2)), "map of map")
+ encodeDecodeTest(Map(1 -> IntAndString(1, "a")), "map with case class as value")
+ encodeDecodeTest(Map(IntAndString(1, "a") -> 1), "map with case class as key")
+ encodeDecodeTest(Map(IntAndString(1, "a") -> IntAndString(2, "b")),
+ "map with case class as key and value")
encodeDecodeTest(Tuple1[Seq[Int]](null), "null seq in tuple")
encodeDecodeTest(Tuple1[Map[String, String]](null), "null map in tuple")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]