Repository: spark Updated Branches: refs/heads/branch-1.1 35a585355 -> 5d981a49c
[SPARK-3063][SQL] ExistingRdd should convert Map to catalyst Map. Currently `ExistingRdd.convertToCatalyst` doesn't convert `Map` value. Author: Takuya UESHIN <[email protected]> Closes #1963 from ueshin/issues/SPARK-3063 and squashes the following commits: 3ba41f2 [Takuya UESHIN] Merge branch 'master' into issues/SPARK-3063 4d7bae2 [Takuya UESHIN] Merge branch 'master' into issues/SPARK-3063 9321379 [Takuya UESHIN] Merge branch 'master' into issues/SPARK-3063 d8a900a [Takuya UESHIN] Make ExistingRdd.convertToCatalyst be able to convert Map value. (cherry picked from commit 6b5584ef1c605cd30f25dbe7099ab32aea1746fb) Signed-off-by: Michael Armbrust <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5d981a49 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5d981a49 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5d981a49 Branch: refs/heads/branch-1.1 Commit: 5d981a49c846db8e08bba08d46cf4bba45609e76 Parents: 35a5853 Author: Takuya UESHIN <[email protected]> Authored: Tue Aug 26 15:04:08 2014 -0700 Committer: Michael Armbrust <[email protected]> Committed: Tue Aug 26 15:04:23 2014 -0700 ---------------------------------------------------------------------- .../spark/sql/execution/basicOperators.scala | 3 +- .../sql/ScalaReflectionRelationSuite.scala | 46 ++++++++++++++++++++ 2 files changed, 48 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/5d981a49/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index f9dfa3c..374af48 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -206,7 +206,8 @@ case class Sort( object ExistingRdd { def convertToCatalyst(a: Any): Any = a match { case o: Option[_] => o.orNull - case s: Seq[Any] => s.map(convertToCatalyst) + case s: Seq[_] => s.map(convertToCatalyst) + case m: Map[_, _] => m.map { case (k, v) => convertToCatalyst(k) -> convertToCatalyst(v) } case p: Product => new GenericRow(p.productIterator.map(convertToCatalyst).toArray) case other => other } http://git-wip-us.apache.org/repos/asf/spark/blob/5d981a49/sql/core/src/test/scala/org/apache/spark/sql/ScalaReflectionRelationSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ScalaReflectionRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ScalaReflectionRelationSuite.scala index 5b84c65..e24c521 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ScalaReflectionRelationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ScalaReflectionRelationSuite.scala @@ -21,6 +21,7 @@ import java.sql.Timestamp import org.scalatest.FunSuite +import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.test.TestSQLContext._ case class ReflectData( @@ -56,6 +57,22 @@ case class OptionalReflectData( case class ReflectBinary(data: Array[Byte]) +case class Nested(i: Option[Int], s: String) + +case class Data( + array: Seq[Int], + arrayContainsNull: Seq[Option[Int]], + map: Map[Int, Long], + mapContainsNul: Map[Int, Option[Long]], + nested: Nested) + +case class ComplexReflectData( + arrayField: Seq[Int], + arrayFieldContainsNull: Seq[Option[Int]], + mapField: Map[Int, Long], + mapFieldContainsNull: Map[Int, Option[Long]], + dataField: Data) + class ScalaReflectionRelationSuite extends FunSuite { test("query case class RDD") { val data = ReflectData("a", 1, 1L, 1.toFloat, 1.toDouble, 1.toShort, 1.toByte, true, @@ -90,4 +107,33 @@ class ScalaReflectionRelationSuite extends FunSuite { val result = sql("SELECT data FROM reflectBinary").collect().head(0).asInstanceOf[Array[Byte]] assert(result.toSeq === Seq[Byte](1)) } + + test("query complex data") { + val data = ComplexReflectData( + Seq(1, 2, 3), + Seq(Some(1), Some(2), None), + Map(1 -> 10L, 2 -> 20L), + Map(1 -> Some(10L), 2 -> Some(20L), 3 -> None), + Data( + Seq(10, 20, 30), + Seq(Some(10), Some(20), None), + Map(10 -> 100L, 20 -> 200L), + Map(10 -> Some(100L), 20 -> Some(200L), 30 -> None), + Nested(None, "abc"))) + val rdd = sparkContext.parallelize(data :: Nil) + rdd.registerTempTable("reflectComplexData") + + assert(sql("SELECT * FROM reflectComplexData").collect().head === + new GenericRow(Array[Any]( + Seq(1, 2, 3), + Seq(1, 2, null), + Map(1 -> 10L, 2 -> 20L), + Map(1 -> 10L, 2 -> 20L, 3 -> null), + new GenericRow(Array[Any]( + Seq(10, 20, 30), + Seq(10, 20, null), + Map(10 -> 100L, 20 -> 200L), + Map(10 -> 100L, 20 -> 200L, 30 -> null), + new GenericRow(Array[Any](null, "abc"))))))) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
