Repository: spark
Updated Branches:
  refs/heads/master ffee4f1ce -> fb9beda54


[SPARK-19893][SQL] should not run DataFrame set operations with map type

## What changes were proposed in this pull request?

In Spark SQL, the map type cannot be used in equality tests or comparisons, and 
`Intersect`/`Except`/`Distinct` require equality tests on all columns, so we 
should not allow map type columns in `Intersect`/`Except`/`Distinct`.
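
For illustration, a minimal sketch of the new behavior, mirroring the regression test added below (it assumes a `SparkSession` named `spark`):

```scala
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.functions.{lit, map}
import spark.implicits._  // assumes a SparkSession named `spark`

// A DataFrame with a single map-typed column.
val df = spark.range(1).select(map(lit("key"), $"id").as("m"))

// After this change, set operations on map-typed columns fail at analysis time:
try df.intersect(df) catch {
  case e: AnalysisException => println(e.message)
  // Cannot have map type columns in DataFrame which calls set operations
  // (intersect, except, etc.), but the type of column m is map<string,bigint>
}
```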

## How was this patch tested?

A new regression test.

Author: Wenchen Fan <wenc...@databricks.com>

Closes #17236 from cloud-fan/map.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fb9beda5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fb9beda5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fb9beda5

Branch: refs/heads/master
Commit: fb9beda54622e0c3190c6504fc468fa4e50eeb45
Parents: ffee4f1
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Fri Mar 10 16:14:22 2017 -0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Fri Mar 10 16:14:22 2017 -0800

----------------------------------------------------------------------
 .../sql/catalyst/analysis/CheckAnalysis.scala   | 25 +++++++++++++++++---
 .../org/apache/spark/sql/DataFrameSuite.scala   | 19 +++++++++++++++
 .../columnar/InMemoryColumnarQuerySuite.scala   | 14 +++++------
 3 files changed, 47 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fb9beda5/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 7529f90..d32fbeb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -44,6 +44,18 @@ trait CheckAnalysis extends PredicateHelper {
     }).length > 1
   }
 
+  protected def hasMapType(dt: DataType): Boolean = {
+    dt.existsRecursively(_.isInstanceOf[MapType])
+  }
+
+  protected def mapColumnInSetOperation(plan: LogicalPlan): Option[Attribute] = plan match {
+    case _: Intersect | _: Except | _: Distinct =>
+      plan.output.find(a => hasMapType(a.dataType))
+    case d: Deduplicate =>
+      d.keys.find(a => hasMapType(a.dataType))
+    case _ => None
+  }
+
   private def checkLimitClause(limitExpr: Expression): Unit = {
     limitExpr match {
       case e if !e.foldable => failAnalysis(
@@ -121,8 +133,7 @@ trait CheckAnalysis extends PredicateHelper {
             if (conditions.isEmpty && query.output.size != 1) {
               failAnalysis(
                 s"Scalar subquery must return only one column, but got 
${query.output.size}")
-            }
-            else if (conditions.nonEmpty) {
+            } else if (conditions.nonEmpty) {
               // Collect the columns from the subquery for further checking.
              var subqueryColumns = conditions.flatMap(_.references).filter(query.output.contains)
 
@@ -200,7 +211,7 @@ trait CheckAnalysis extends PredicateHelper {
               s"filter expression '${f.condition.sql}' " +
                 s"of type ${f.condition.dataType.simpleString} is not a 
boolean.")
 
-          case f @ Filter(condition, child) =>
+          case Filter(condition, _) =>
             splitConjunctivePredicates(condition).foreach {
               case _: PredicateSubquery | Not(_: PredicateSubquery) =>
               case e if PredicateSubquery.hasNullAwarePredicateWithinNot(e) =>
@@ -374,6 +385,14 @@ trait CheckAnalysis extends PredicateHelper {
                 |Conflicting attributes: ${conflictingAttributes.mkString(",")}
                """.stripMargin)
 
+          // TODO: although map type is not orderable, technically map type should be able to be
+          // used in equality comparison, remove this type check once we support it.
+          case o if mapColumnInSetOperation(o).isDefined =>
+            val mapCol = mapColumnInSetOperation(o).get
+            failAnalysis("Cannot have map type columns in DataFrame which 
calls " +
+              s"set operations(intersect, except, etc.), but the type of 
column ${mapCol.name} " +
+              "is " + mapCol.dataType.simpleString)
+
           case o if o.expressions.exists(!_.deterministic) &&
             !o.isInstanceOf[Project] && !o.isInstanceOf[Filter] &&
             !o.isInstanceOf[Aggregate] && !o.isInstanceOf[Window] =>
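
For context, `hasMapType` relies on `DataType.existsRecursively`, so the check also catches maps nested inside arrays or structs. A standalone sketch of the same recursion (an illustrative re-implementation, since `existsRecursively` is internal to Spark):

```scala
import org.apache.spark.sql.types._

// Illustrative re-implementation of the recursive map-type check.
def hasMapType(dt: DataType): Boolean = dt match {
  case _: MapType         => true
  case ArrayType(et, _)   => hasMapType(et)
  case StructType(fields) => fields.exists(f => hasMapType(f.dataType))
  case _                  => false
}

hasMapType(MapType(StringType, LongType))                  // true
hasMapType(ArrayType(MapType(StringType, LongType)))       // true: nested map
hasMapType(StructType(Seq(StructField("a", IntegerType)))) // false
```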

http://git-wip-us.apache.org/repos/asf/spark/blob/fb9beda5/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 19c2d55..52bd4e1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -1703,4 +1703,23 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
     val df = spark.range(1).selectExpr("CAST(id as DECIMAL) as x").selectExpr("percentile(x, 0.5)")
     checkAnswer(df, Row(BigDecimal(0.0)) :: Nil)
   }
+
+  test("SPARK-19893: cannot run set operations with map type") {
+    val df = spark.range(1).select(map(lit("key"), $"id").as("m"))
+    val e = intercept[AnalysisException](df.intersect(df))
+    assert(e.message.contains(
+      "Cannot have map type columns in DataFrame which calls set operations"))
+    val e2 = intercept[AnalysisException](df.except(df))
+    assert(e2.message.contains(
+      "Cannot have map type columns in DataFrame which calls set operations"))
+    val e3 = intercept[AnalysisException](df.distinct())
+    assert(e3.message.contains(
+      "Cannot have map type columns in DataFrame which calls set operations"))
+    withTempView("v") {
+      df.createOrReplaceTempView("v")
+      val e4 = intercept[AnalysisException](sql("SELECT DISTINCT m FROM v"))
+      assert(e4.message.contains(
+        "Cannot have map type columns in DataFrame which calls set 
operations"))
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/fb9beda5/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
index f355a52..0250a53 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
@@ -234,8 +234,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
       Seq(StringType, BinaryType, NullType, BooleanType,
         ByteType, ShortType, IntegerType, LongType,
         FloatType, DoubleType, DecimalType(25, 5), DecimalType(6, 5),
-        DateType, TimestampType,
-        ArrayType(IntegerType), MapType(StringType, LongType), struct)
+        DateType, TimestampType, ArrayType(IntegerType), struct)
     val fields = dataTypes.zipWithIndex.map { case (dataType, index) =>
       StructField(s"col$index", dataType, true)
     }
@@ -244,10 +243,10 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
 
     // Create an RDD for the schema
     val rdd =
-      sparkContext.parallelize((1 to 10000), 10).map { i =>
+      sparkContext.parallelize(1 to 10000, 10).map { i =>
         Row(
-          s"str${i}: test cache.",
-          s"binary${i}: test cache.".getBytes(StandardCharsets.UTF_8),
+          s"str$i: test cache.",
+          s"binary$i: test cache.".getBytes(StandardCharsets.UTF_8),
           null,
           i % 2 == 0,
           i.toByte,
@@ -255,13 +254,12 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
           i,
           Long.MaxValue - i.toLong,
           (i + 0.25).toFloat,
-          (i + 0.75),
+          i + 0.75,
           BigDecimal(Long.MaxValue.toString + ".12345"),
           new java.math.BigDecimal(s"${i % 9 + 1}" + ".23456"),
           new Date(i),
           new Timestamp(i * 1000000L),
-          (i to i + 10).toSeq,
-          (i to i + 10).map(j => s"map_key_$j" -> (Long.MaxValue - j)).toMap,
+          i to i + 10,
           Row((i - 0.25).toFloat, Seq(true, false, null)))
       }
     spark.createDataFrame(rdd, schema).createOrReplaceTempView("InMemoryCache_different_data_types")

