This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 70c322a  [SPARK-35653][SQL] Fix CatalystToExternalMap interpreted path fails for Map with case classes as keys or values
70c322a is described below

commit 70c322ad041511ded6e531d92ffc64c11bfdc378
Author: Emil Ejbyfeldt <eejbyfe...@liveintent.com>
AuthorDate: Thu Jun 10 09:37:27 2021 -0700

    [SPARK-35653][SQL] Fix CatalystToExternalMap interpreted path fails for Map with case classes as keys or values
    
    ### What changes were proposed in this pull request?
    Use the key/value LambdaFunction to convert the elements instead of
    using CatalystTypeConverters.createToScalaConverter. This mirrors how
    MapObjects does it, which correctly handles Arrays with case classes.
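    
    A rough sketch of why the converter-based path loses the case class (this
    is illustrative only: CatalystTypeConverters is a Catalyst-internal helper,
    and the schema below is just that of the IntAndString test class added in
    this patch):
    
    ```scala
    import org.apache.spark.sql.catalyst.CatalystTypeConverters
    import org.apache.spark.sql.types._
    
    // Schema of IntAndString(i: Int, s: String) as Catalyst sees it.
    val valueType = StructType(Seq(
      StructField("i", IntegerType, nullable = false),
      StructField("s", StringType)))
    
    // The converter is derived purely from the DataType, so it cannot know
    // about IntAndString: a Catalyst struct comes back as a generic Row that
    // prints as [1,a], which is exactly the "out: Map(1 -> [1,a])" seen in the
    // failure below.
    val toScala = CatalystTypeConverters.createToScalaConverter(valueType)
    ```
    
    The key/value LambdaFunction, by contrast, is the deserializer expression
    built for the element type, so evaluating it per element reconstructs the
    case class instance.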
    
    ### Why are the changes needed?
    Before these changes the added test cases would fail with the following:
    ```
    [info] - encode/decode for map with case class as value: Map(1 -> IntAndString(1,a)) (interpreted path) *** FAILED *** (64 milliseconds)
    [info]   Encoded/Decoded data does not match input data
    [info]
    [info]   in:  Map(1 -> IntAndString(1,a))
    [info]   out: Map(1 -> [1,a])
    [info]   types: scala.collection.immutable.Map$Map1
    [info]
    [info]   Encoded Data: [org.apache.spark.sql.catalyst.expressions.UnsafeMapData5ecf5d9e]
    [info]   Schema: value#823
    [info]   root
    [info]   -- value: map (nullable = true)
    [info]       |-- key: integer
    [info]       |-- value: struct (valueContainsNull = true)
    [info]       |    |-- i: integer (nullable = false)
    [info]       |    |-- s: string (nullable = true)
    [info]
    [info]
    [info]   fromRow Expressions:
    [info]   catalysttoexternalmap(lambdavariable(CatalystToExternalMap_key, IntegerType, false, 178), lambdavariable(CatalystToExternalMap_key, IntegerType, false, 178), lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179), if (isnull(lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179))) null else newInstance(class org.apache.spark.sql.catalyst.encoders [...]
    [info]   :- lambdavariable(CatalystToExternalMap_key, IntegerType, false, 178)
    [info]   :- lambdavariable(CatalystToExternalMap_key, IntegerType, false, 178)
    [info]   :- lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179)
    [info]   :- if (isnull(lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179))) null else newInstance(class org.apache.spark.sql.catalyst.encoders.IntAndString)
    [info]   :  :- isnull(lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179))
    [info]   :  :  +- lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179)
    [info]   :  :- null
    [info]   :  +- newInstance(class org.apache.spark.sql.catalyst.encoders.IntAndString)
    [info]   :     :- assertnotnull(lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179).i)
    [info]   :     :  +- lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179).i
    [info]   :     :     +- lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179)
    [info]   :     +- lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179).s.toString
    [info]   :        +- lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179).s
    [info]   :           +- lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179)
    [info]   +- input[0, map<int,struct<i:int,s:string>>, true] (ExpressionEncoderSuite.scala:627)
    ```
    So, before this fix, using a Map with case classes as keys or values
    together with the interpreted path would incorrectly deserialize data from
    the Catalyst representation.
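    
    For context, the "(interpreted path)" suffix in the test name comes from
    CodegenInterpretedPlanTest, which runs each test both with generated code
    and with interpreted object expressions. Roughly (this is an approximation,
    not the suite's exact mechanism), the interpreted run corresponds to:
    
    ```scala
    import org.apache.spark.sql.internal.SQLConf
    
    // Disable codegen for object expressions so that CatalystToExternalMap is
    // evaluated through its interpreted eval() path, where the bug lived.
    SQLConf.get.setConfString(SQLConf.CODEGEN_FACTORY_MODE.key, "NO_CODEGEN")
    ```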
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, it fixes the bug.
    
    ### How was this patch tested?
    Existing and new unit tests in ExpressionEncoderSuite.
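    
    For reference, a stripped-down sketch of what the new encodeDecodeTest
    cases exercise (this is not the suite's exact helper; names and structure
    here are illustrative):
    
    ```scala
    import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
    
    // Top-level case class, as in the suite.
    case class IntAndString(i: Int, s: String)
    
    object RoundTripSketch {
      def main(args: Array[String]): Unit = {
        val encoder = ExpressionEncoder[Map[Int, IntAndString]]()
        val toRow = encoder.createSerializer()
        val fromRow = encoder.resolveAndBind().createDeserializer()
    
        val input = Map(1 -> IntAndString(1, "a"))
        val output = fromRow(toRow(input))
        // Before this fix the interpreted path returned Map(1 -> [1,a]) here.
        assert(output == input)
      }
    }
    ```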
    
    Closes #32783 from eejbyfeldt/fix-interpreted-path-for-map-with-case-classes.
    
    Authored-by: Emil Ejbyfeldt <eejbyfe...@liveintent.com>
    Signed-off-by: Liang-Chi Hsieh <vii...@gmail.com>
    (cherry picked from commit e2e3fe77823387f6d4164eede05bf077b4235c87)
    Signed-off-by: Liang-Chi Hsieh <vii...@gmail.com>
---
 .../spark/sql/catalyst/expressions/objects/objects.scala   | 14 ++++++--------
 .../sql/catalyst/encoders/ExpressionEncoderSuite.scala     |  5 +++++
 2 files changed, 11 insertions(+), 8 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
index f78e3f5..dc5bd02 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
@@ -27,7 +27,7 @@ import scala.util.{Properties, Try}
 import org.apache.spark.{SparkConf, SparkEnv}
 import org.apache.spark.serializer._
 import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, ScalaReflection}
+import org.apache.spark.sql.catalyst.{InternalRow, ScalaReflection}
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen._
@@ -1117,11 +1117,6 @@ case class CatalystToExternalMap private(
 
   private lazy val inputMapType = inputData.dataType.asInstanceOf[MapType]
 
-  private lazy val keyConverter =
-    CatalystTypeConverters.createToScalaConverter(inputMapType.keyType)
-  private lazy val valueConverter =
-    CatalystTypeConverters.createToScalaConverter(inputMapType.valueType)
-
   private lazy val (newMapBuilderMethod, moduleField) = {
     val clazz = Utils.classForName(collClass.getCanonicalName + "$")
     (clazz.getMethod("newBuilder"), clazz.getField("MODULE$").get(null))
@@ -1138,10 +1133,13 @@ case class CatalystToExternalMap private(
       builder.sizeHint(result.numElements())
       val keyArray = result.keyArray()
       val valueArray = result.valueArray()
+      val row = new GenericInternalRow(1)
       var i = 0
       while (i < result.numElements()) {
-        val key = keyConverter(keyArray.get(i, inputMapType.keyType))
-        val value = valueConverter(valueArray.get(i, inputMapType.valueType))
+        row.update(0, keyArray.get(i, inputMapType.keyType))
+        val key = keyLambdaFunction.eval(row)
+        row.update(0, valueArray.get(i, inputMapType.valueType))
+        val value = valueLambdaFunction.eval(row)
         builder += Tuple2(key, value)
         i += 1
       }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala
index 7faab4e..bf4afac 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala
@@ -114,6 +114,7 @@ case class ReferenceValueClass(wrapped: ReferenceValueClass.Container) extends A
 object ReferenceValueClass {
   case class Container(data: Int)
 }
+case class IntAndString(i: Int, s: String)
 
 class ExpressionEncoderSuite extends CodegenInterpretedPlanTest with AnalysisTest {
   OuterScopes.addOuterScope(this)
@@ -174,6 +175,10 @@ class ExpressionEncoderSuite extends CodegenInterpretedPlanTest with AnalysisTes
   encodeDecodeTest(Map(1 -> "a", 2 -> "b"), "map")
   encodeDecodeTest(Map(1 -> "a", 2 -> null), "map with null")
   encodeDecodeTest(Map(1 -> Map("a" -> 1), 2 -> Map("b" -> 2)), "map of map")
+  encodeDecodeTest(Map(1 -> IntAndString(1, "a")), "map with case class as value")
+  encodeDecodeTest(Map(IntAndString(1, "a") -> 1), "map with case class as key")
+  encodeDecodeTest(Map(IntAndString(1, "a") -> IntAndString(2, "b")),
+    "map with case class as key and value")
 
   encodeDecodeTest(Tuple1[Seq[Int]](null), "null seq in tuple")
   encodeDecodeTest(Tuple1[Map[String, String]](null), "null map in tuple")

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
