[
https://issues.apache.org/jira/browse/SPARK-17670?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Hyukjin Kwon updated SPARK-17670:
---------------------------------
Labels: bulk-closed (was: )
> Spark DataFrame/Dataset no longer supports Option[Map] in case classes
> ----------------------------------------------------------------------
>
> Key: SPARK-17670
> URL: https://issues.apache.org/jira/browse/SPARK-17670
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.0.0
> Reporter: Daniel Williams
> Priority: Major
> Labels: bulk-closed
>
> Upon upgrading to Spark 2.0 I discovered that case classes containing members
> of type Option[Map] (with any key/value types, mutable or immutable), which
> were supported in earlier versions, are no longer supported and produce an
> exception similar to the one below. Further testing showed that Option is
> still supported for Seq, case classes, and primitives. The validating unit
> tests below are written using spark-testing-base.
> {code}
> org.apache.spark.sql.AnalysisException: cannot resolve
> 'wrapoption(staticinvoke(class
> org.apache.spark.sql.catalyst.util.ArrayBasedMapData$, ObjectType(interface
> scala.collection.Map), toScalaMap, mapobjects(MapObjects_loopValue32,
> MapObjects_loopIsNull33, StringType, lambdavariable(MapObjects_loopValue32,
> MapObjects_loopIsNull33, StringType).toString,
> cast(lambdavariable(MapObjects_loopValue30, MapObjects_loopIsNull31,
> StructField(uuid,StringType,true), StructField(timestamp,TimestampType,true),
> StructField(sourceSystem,StringType,true),
> StructField(input,MapType(StringType,StringType,true),true)).input as
> map<string,string>).keyArray).array, mapobjects(MapObjects_loopValue34,
> MapObjects_loopIsNull35, StringType, lambdavariable(MapObjects_loopValue34,
> MapObjects_loopIsNull35, StringType).toString,
> cast(lambdavariable(MapObjects_loopValue30, MapObjects_loopIsNull31,
> StructField(uuid,StringType,true), StructField(timestamp,TimestampType,true),
> StructField(sourceSystem,StringType,true),
> StructField(input,MapType(StringType,StringType,true),true)).input as
> map<string,string>).valueArray).array, true), ObjectType(interface
> scala.collection.immutable.Map))' due to data type mismatch: argument 1
> requires scala.collection.immutable.Map type, however, 'staticinvoke(class
> org.apache.spark.sql.catalyst.util.ArrayBasedMapData$, ObjectType(interface
> scala.collection.Map), toScalaMap, mapobjects(MapObjects_loopValue32,
> MapObjects_loopIsNull33, StringType, lambdavariable(MapObjects_loopValue32,
> MapObjects_loopIsNull33, StringType).toString,
> cast(lambdavariable(MapObjects_loopValue30, MapObjects_loopIsNull31,
> StructField(uuid,StringType,true), StructField(timestamp,TimestampType,true),
> StructField(sourceSystem,StringType,true),
> StructField(input,MapType(StringType,StringType,true),true)).input as
> map<string,string>).keyArray).array, mapobjects(MapObjects_loopValue34,
> MapObjects_loopIsNull35, StringType, lambdavariable(MapObjects_loopValue34,
> MapObjects_loopIsNull35, StringType).toString,
> cast(lambdavariable(MapObjects_loopValue30, MapObjects_loopIsNull31,
> StructField(uuid,StringType,true), StructField(timestamp,TimestampType,true),
> StructField(sourceSystem,StringType,true),
> StructField(input,MapType(StringType,StringType,true),true)).input as
> map<string,string>).valueArray).array, true)' is of scala.collection.Map
> type.;
> at
> org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
> at
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:82)
> at
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:74)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:301)
> {code}
> Unit tests:
> {code}
> import com.holdenkarau.spark.testing.{DataFrameSuiteBase, SharedSparkContext}
> import org.scalatest.{Matchers, FunSuite}
> import org.slf4j.LoggerFactory
> import scala.util.{Failure, Try, Success}
> case class ImmutableMapTest(data: Map[String, String])
> case class MapTest(data: scala.collection.mutable.Map[String, String])
> case class ImmtableWithOption(data: Option[Map[String, String]])
> case class MutableWithOption(data:
> Option[scala.collection.mutable.Map[String, String]])
> case class PrimWithOption(data: Option[String])
> case class ArrayWithOption(data: Option[Seq[String]])
> class TestOptionWithDataTypes
> extends FunSuite
> with Matchers
> with SharedSparkContext
> with DataFrameSuiteBase {
> val logger = LoggerFactory.getLogger(classOf[TestOptionWithDataTypes])
> test("test immutable map") {
> import sqlContext.implicits._
> val rdd = sc.parallelize(Seq(ImmutableMapTest(Map("1"->"2"))))
> val result = Try {
> rdd.toDF()
> } match {
> case Success(e) => Option(e)
> case Failure(e) => {
> logger.error(e.getMessage, e)
> None
> }
> }
> result should not be(None)
> result.get.count() should be(1)
> result.get.show()
> }
> test("test mutable Map") {
> import sqlContext.implicits._
> val rdd =
> sc.parallelize(Seq(MapTest(scala.collection.mutable.Map("1"->"2"))))
> val result = Try {
> rdd.toDF()
> } match {
> case Success(e) => Option(e)
> case Failure(e) => {
> logger.error(e.getMessage, e)
> None
> }
> }
> result should not be(None)
> result.get.count() should be(1)
> result.get.show()
> }
> test("test immutable option Map") {
> import sqlContext.implicits._
> val rdd = sc.parallelize(Seq(ImmtableWithOption(Option(Map("1"->"2")))))
> val result = Try {
> rdd.toDF()
> } match {
> case Success(e) => Option(e)
> case Failure(e) => {
> logger.error(e.getMessage, e)
> None
> }
> }
> result should not be(None)
> result.get.count() should be(1)
> result.get.show()
> }
> test("test mutable option Map") {
> import sqlContext.implicits._
> val rdd =
> sc.parallelize(Seq(MutableWithOption(Option(scala.collection.mutable.Map("1"->"2")))))
> val result = Try {
> rdd.toDF()
> } match {
> case Success(e) => Option(e)
> case Failure(e) => {
> logger.error(e.getMessage, e)
> None
> }
> }
> result should not be(None)
> result.get.count() should be(1)
> result.get.show()
> }
> test("test option with prim") {
> import sqlContext.implicits._
> val rdd = sc.parallelize(Seq(PrimWithOption(Option("foo"))))
> val result = Try {
> rdd.toDF()
> } match {
> case Success(e) => Option(e)
> case Failure(e) => {
> logger.error(e.getMessage, e)
> None
> }
> }
> result should not be(None)
> result.get.count() should be(1)
> result.get.show()
> }
> test("test option with array") {
> import sqlContext.implicits._
> val rdd = sc.parallelize(Seq(ArrayWithOption(Option(Seq("foo")))))
> val result = Try {
> rdd.toDF()
> } match {
> case Success(e) => Option(e)
> case Failure(e) => {
> logger.error(e.getMessage, e)
> None
> }
> }
> result should not be(None)
> result.get.count() should be(1)
> result.get.show()
> }
> }
> {code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]