[ https://issues.apache.org/jira/browse/SPARK-17670?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon updated SPARK-17670:
---------------------------------
    Labels: bulk-closed  (was: )

> Spark DataFrame/Dataset no longer supports Option[Map] in case classes
> ----------------------------------------------------------------------
>
>                 Key: SPARK-17670
>                 URL: https://issues.apache.org/jira/browse/SPARK-17670
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.0.0
>            Reporter: Daniel Williams
>            Priority: Major
>              Labels: bulk-closed
>
> Upon upgrading to Spark 2.0 I discovered that previously supported case 
> classes containing members of type Option[Map], for any key/value binding, 
> mutable or immutable, were no longer supported and produced an exception 
> similar to the following. Upon further testing I also noticed that Option is 
> still supported for Seq, case classes, and primitives. A minimal reproduction 
> and validating unit tests, written with spark-testing-base, follow the stack trace.
> {code}
> org.apache.spark.sql.AnalysisException: cannot resolve 'wrapoption(staticinvoke(class org.apache.spark.sql.catalyst.util.ArrayBasedMapData$, ObjectType(interface scala.collection.Map), toScalaMap, mapobjects(MapObjects_loopValue32, MapObjects_loopIsNull33, StringType, lambdavariable(MapObjects_loopValue32, MapObjects_loopIsNull33, StringType).toString, cast(lambdavariable(MapObjects_loopValue30, MapObjects_loopIsNull31, StructField(uuid,StringType,true), StructField(timestamp,TimestampType,true), StructField(sourceSystem,StringType,true), StructField(input,MapType(StringType,StringType,true),true)).input as map<string,string>).keyArray).array, mapobjects(MapObjects_loopValue34, MapObjects_loopIsNull35, StringType, lambdavariable(MapObjects_loopValue34, MapObjects_loopIsNull35, StringType).toString, cast(lambdavariable(MapObjects_loopValue30, MapObjects_loopIsNull31, StructField(uuid,StringType,true), StructField(timestamp,TimestampType,true), StructField(sourceSystem,StringType,true), StructField(input,MapType(StringType,StringType,true),true)).input as map<string,string>).valueArray).array, true), ObjectType(interface scala.collection.immutable.Map))' due to data type mismatch: argument 1 requires scala.collection.immutable.Map type, however, 'staticinvoke(class org.apache.spark.sql.catalyst.util.ArrayBasedMapData$, ObjectType(interface scala.collection.Map), toScalaMap, mapobjects(MapObjects_loopValue32, MapObjects_loopIsNull33, StringType, lambdavariable(MapObjects_loopValue32, MapObjects_loopIsNull33, StringType).toString, cast(lambdavariable(MapObjects_loopValue30, MapObjects_loopIsNull31, StructField(uuid,StringType,true), StructField(timestamp,TimestampType,true), StructField(sourceSystem,StringType,true), StructField(input,MapType(StringType,StringType,true),true)).input as map<string,string>).keyArray).array, mapobjects(MapObjects_loopValue34, MapObjects_loopIsNull35, StringType, lambdavariable(MapObjects_loopValue34, MapObjects_loopIsNull35, StringType).toString, cast(lambdavariable(MapObjects_loopValue30, MapObjects_loopIsNull31, StructField(uuid,StringType,true), StructField(timestamp,TimestampType,true), StructField(sourceSystem,StringType,true), StructField(input,MapType(StringType,StringType,true),true)).input as map<string,string>).valueArray).array, true)' is of scala.collection.Map type.;
> at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
> at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:82)
> at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:74)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:301)
> {code}
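> A minimal standalone reproduction, as a sketch assuming a stock Spark 2.0.0 
> spark-shell session (the case class name here is illustrative):
> {code}
> import spark.implicits._
> 
> // A case class with an Option[Map] member; encoder resolution fails as above.
> case class OptionMapRepro(data: Option[Map[String, String]])
> 
> val rdd = sc.parallelize(Seq(OptionMapRepro(Option(Map("1" -> "2")))))
> rdd.toDF()  // throws org.apache.spark.sql.AnalysisException on Spark 2.0.0
> {code}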
> Unit tests:
> {code}
> import com.holdenkarau.spark.testing.{DataFrameSuiteBase, SharedSparkContext}
> import org.scalatest.{Matchers, FunSuite}
> import org.slf4j.LoggerFactory
> import scala.util.{Failure, Try, Success}
> case class ImmutableMapTest(data: Map[String, String])
> case class MapTest(data: scala.collection.mutable.Map[String, String])
> case class ImmutableWithOption(data: Option[Map[String, String]])
> case class MutableWithOption(data: Option[scala.collection.mutable.Map[String, String]])
> case class PrimWithOption(data: Option[String])
> case class ArrayWithOption(data: Option[Seq[String]])
> class TestOptionWithDataTypes
>   extends FunSuite
>     with Matchers
>     with SharedSparkContext
>     with DataFrameSuiteBase {
>   val logger = LoggerFactory.getLogger(classOf[TestOptionWithDataTypes])
>   test("test immutable map") {
>     import sqlContext.implicits._
>     val rdd = sc.parallelize(Seq(ImmutableMapTest(Map("1"->"2"))))
>     val result = Try {
>       rdd.toDF()
>     } match {
>       case Success(e) => Option(e)
>       case Failure(e) => {
>         logger.error(e.getMessage, e)
>         None
>       }
>     }
>     result should not be(None)
>     result.get.count() should be(1)
>     result.get.show()
>   }
>   test("test mutable Map") {
>     import sqlContext.implicits._
>     val rdd = sc.parallelize(Seq(MapTest(scala.collection.mutable.Map("1"->"2"))))
>     val result = Try {
>       rdd.toDF()
>     } match {
>       case Success(e) => Option(e)
>       case Failure(e) => {
>         logger.error(e.getMessage, e)
>         None
>       }
>     }
>     result should not be(None)
>     result.get.count() should be(1)
>     result.get.show()
>   }
>   test("test immutable option Map") {
>     import sqlContext.implicits._
>     val rdd = sc.parallelize(Seq(ImmutableWithOption(Option(Map("1"->"2")))))
>     val result = Try {
>       rdd.toDF()
>     } match {
>       case Success(e) => Option(e)
>       case Failure(e) => {
>         logger.error(e.getMessage, e)
>         None
>       }
>     }
>     result should not be(None)
>     result.get.count() should be(1)
>     result.get.show()
>   }
>   test("test mutable option Map") {
>     import sqlContext.implicits._
>     val rdd = sc.parallelize(Seq(MutableWithOption(Option(scala.collection.mutable.Map("1"->"2")))))
>     val result = Try {
>       rdd.toDF()
>     } match {
>       case Success(e) => Option(e)
>       case Failure(e) => {
>         logger.error(e.getMessage, e)
>         None
>       }
>     }
>     result should not be(None)
>     result.get.count() should be(1)
>     result.get.show()
>   }
>   test("test option with prim") {
>     import sqlContext.implicits._
>     val rdd = sc.parallelize(Seq(PrimWithOption(Option("foo"))))
>     val result = Try {
>       rdd.toDF()
>     } match {
>       case Success(e) => Option(e)
>       case Failure(e) => {
>         logger.error(e.getMessage, e)
>         None
>       }
>     }
>     result should not be(None)
>     result.get.count() should be(1)
>     result.get.show()
>   }
>   test("test option with array") {
>     import sqlContext.implicits._
>     val rdd = sc.parallelize(Seq(ArrayWithOption(Option(Seq("foo")))))
>     val result = Try {
>       rdd.toDF()
>     } match {
>       case Success(e) => Option(e)
>       case Failure(e) => {
>         logger.error(e.getMessage, e)
>         None
>       }
>     }
>     result should not be(None)
>     result.get.count() should be(1)
>     result.get.show()
>   }
> }
> {code}
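> The error indicates the generated deserializer invokes 
> ArrayBasedMapData.toScalaMap, whose declared return type is 
> scala.collection.Map, while wrapoption expects the field's declared 
> scala.collection.immutable.Map. As an untested workaround sketch (an 
> assumption based on that mismatch, not a confirmed fix), declaring the member 
> with the broader scala.collection.Map type may avoid the failing check:
> {code}
> import spark.implicits._
> 
> // Hypothetical workaround: declare the field as scala.collection.Map so the
> // expected object type matches what toScalaMap is declared to return.
> case class CollectionMapWithOption(data: Option[scala.collection.Map[String, String]])
> 
> val rdd = sc.parallelize(Seq(CollectionMapWithOption(Option(Map("1" -> "2")))))
> rdd.toDF().show()
> {code}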


