[ https://issues.apache.org/jira/browse/SPARK-9813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14693468#comment-14693468 ]
Herman van Hovell commented on SPARK-9813:
------------------------------------------
It turns out that the columns of the {{UNION ALL}} operator get widened by
the {{WidenSetOperationTypes}} rule:
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala#L200-L242
This assumes that the {{left}} and {{right}} logical plans have the same number
of columns:
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala#L207
Somehow the rule that should prevent {{left}} and {{right}} from having
different input lengths (SPARK-8328) is not being triggered. This creates a
very interesting situation when no input widening happens at all (see the
sketch after the output below):
{noformat}
val data1 = (1 to 5).map(v => v -> v.toString).toDF("v_int", "v_str")
val data2 = (4 to 7).map(Tuple1.apply).toDF("w_int")
val data3 = data1.unionAll(data2)
scala> data3.printSchema
root
 |-- v_int: integer (nullable = false)
 |-- v_str: string (nullable = true)
scala> data3.show()
+-----+-----+
|v_int|v_str|
+-----+-----+
|    1|    1|
|    2|    2|
|    3|    3|
|    4|    4|
|    5|    5|
|    4|
|    5|
|    6|
|    7|
+-----+-----+
{noformat}
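The silent column drop follows directly from how the widening rule pairs the
two sides up. A minimal sketch in plain Scala (no Spark required) of the
{{zip}} behavior the rule relies on:
{code}
// Seq.zip silently truncates to the shorter list. When the rule pairs
// left.output with right.output and the arity check never ran, the extra
// left-hand column is simply never visited: it is neither widened nor
// reported as a mismatch.
val leftCols  = Seq("v_int" -> "integer", "v_str" -> "string")
val rightCols = Seq("w_int" -> "integer")

val paired = leftCols.zip(rightCols)
println(paired) // List(((v_int,integer),(w_int,integer))) -- v_str was dropped
{code}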
I would call that a bug.
[~joshrosen] is this a blocker for 1.5?
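For reference, the SPARK-8328 check boils down to something of this shape (a
paraphrase, not the actual source): {{Union}} should only count as resolved
when both sides agree on arity and column types, and {{CheckAnalysis}} turns
any unresolved operator into the generic error quoted at the end of the
description below.
{code}
// Paraphrased sketch of the arity/type check, not the actual Spark source.
case class Attr(name: String, dataType: String)

def unionResolved(left: Seq[Attr], right: Seq[Attr]): Boolean =
  left.length == right.length &&
    left.zip(right).forall { case (l, r) => l.dataType == r.dataType }

// The repro above should trip this check:
println(unionResolved(
  Seq(Attr("v_int", "integer"), Attr("v_str", "string")),
  Seq(Attr("w_int", "integer")))) // false
{code}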
> Incorrect UNION ALL behavior
> ----------------------------
>
> Key: SPARK-9813
> URL: https://issues.apache.org/jira/browse/SPARK-9813
> Project: Spark
> Issue Type: Bug
> Components: Spark Core, SQL
> Affects Versions: 1.4.1
> Environment: Ubuntu on AWS
> Reporter: Simeon Simeonov
> Labels: sql, union
>
> According to the [Hive Language
> Manual|https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Union]
> for UNION ALL:
> {quote}
> The number and names of columns returned by each select_statement have to be
> the same. Otherwise, a schema error is thrown.
> {quote}
> Spark SQL silently accepts queries where the tables combined with UNION
> ALL have the same number of columns but different column names, instead of
> raising a schema error.
> Reproducible example:
> {code}
> // This test is meant to run in spark-shell
> import java.io.File
> import java.io.PrintWriter
> import org.apache.spark.sql.hive.HiveContext
> import org.apache.spark.sql.SaveMode
>
> val ctx = sqlContext.asInstanceOf[HiveContext]
> import ctx.implicits._
>
> def dataPath(name: String) = sys.env("HOME") + "/" + name + ".jsonlines"
>
> def tempTable(name: String, json: String) = {
>   val path = dataPath(name)
>   new PrintWriter(path) { write(json); close() }
>   ctx.read.json("file://" + path).registerTempTable(name)
> }
>
> // Note category vs. cat names of first column
> tempTable("test_one", """{"category" : "A", "num" : 5}""")
> tempTable("test_another", """{"cat" : "A", "num" : 5}""")
>
> // The union below prints:
> // +--------+---+
> // |category|num|
> // +--------+---+
> // |       A|  5|
> // |       A|  5|
> // +--------+---+
> //
> // Instead, an error should have been generated due to the incompatible schema
> ctx.sql("select * from test_one union all select * from test_another").show
>
> // Cleanup
> new File(dataPath("test_one")).delete()
> new File(dataPath("test_another")).delete()
> {code}
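> Until that is fixed, a defensive check in user code makes the mismatch
> explicit. This is a hypothetical helper, not a Spark API, and it assumes
> both sides must match exactly by column name and order:
> {code}
> import org.apache.spark.sql.DataFrame
>
> // Hypothetical guard: refuse to union DataFrames whose column names differ.
> def safeUnionAll(a: DataFrame, b: DataFrame): DataFrame = {
>   require(a.columns.sameElements(b.columns),
>     s"UNION ALL schema mismatch: [${a.columns.mkString(", ")}] vs [${b.columns.mkString(", ")}]")
>   a.unionAll(b)
> }
>
> // safeUnionAll(ctx.table("test_one"), ctx.table("test_another"))
> // => java.lang.IllegalArgumentException: requirement failed: UNION ALL schema mismatch: ...
> {code}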
> When the number of columns differs, Spark can even mix values of different
> datatypes in a single result column.
> Reproducible example (requires a new spark-shell session):
> {code}
> // This test is meant to run in spark-shell
> import java.io.File
> import java.io.PrintWriter
> import org.apache.spark.sql.hive.HiveContext
> import org.apache.spark.sql.SaveMode
>
> val ctx = sqlContext.asInstanceOf[HiveContext]
> import ctx.implicits._
>
> def dataPath(name: String) = sys.env("HOME") + "/" + name + ".jsonlines"
>
> def tempTable(name: String, json: String) = {
>   val path = dataPath(name)
>   new PrintWriter(path) { write(json); close() }
>   ctx.read.json("file://" + path).registerTempTable(name)
> }
>
> // Note test_another is missing the category column
> tempTable("test_one", """{"category" : "A", "num" : 5}""")
> tempTable("test_another", """{"num" : 5}""")
>
> // The union below prints the string "A" and the number 5 in one column:
> // +--------+
> // |category|
> // +--------+
> // |       A|
> // |       5|
> // +--------+
> //
> // Instead, an error should have been generated due to the incompatible schema
> ctx.sql("select * from test_one union all select * from test_another").show
>
> // Cleanup
> new File(dataPath("test_one")).delete()
> new File(dataPath("test_another")).delete()
> {code}
> At other times, when the schemas are complex, Spark SQL produces a misleading
> error about an unresolved Union operator:
> {code}
> scala> ctx.sql("""select * from view_clicks
>      | union all
>      | select * from view_clicks_aug
>      | """)
> 15/08/11 02:40:25 INFO ParseDriver: Parsing command: select * from view_clicks
> union all
> select * from view_clicks_aug
> 15/08/11 02:40:25 INFO ParseDriver: Parse Completed
> 15/08/11 02:40:25 INFO HiveMetaStore: 0: get_table : db=default tbl=view_clicks
> 15/08/11 02:40:25 INFO audit: ugi=ubuntu ip=unknown-ip-addr cmd=get_table : db=default tbl=view_clicks
> 15/08/11 02:40:25 INFO HiveMetaStore: 0: get_table : db=default tbl=view_clicks
> 15/08/11 02:40:25 INFO audit: ugi=ubuntu ip=unknown-ip-addr cmd=get_table : db=default tbl=view_clicks
> 15/08/11 02:40:25 INFO HiveMetaStore: 0: get_table : db=default tbl=view_clicks_aug
> 15/08/11 02:40:25 INFO audit: ugi=ubuntu ip=unknown-ip-addr cmd=get_table : db=default tbl=view_clicks_aug
> 15/08/11 02:40:25 INFO HiveMetaStore: 0: get_table : db=default tbl=view_clicks_aug
> 15/08/11 02:40:25 INFO audit: ugi=ubuntu ip=unknown-ip-addr cmd=get_table : db=default tbl=view_clicks_aug
> org.apache.spark.sql.AnalysisException: unresolved operator 'Union;
>   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:42)
>   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:126)
>   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:50)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:98)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:97)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:97)
>   at scala.collection.immutable.List.foreach(List.scala:318)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:97)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:97)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:97)
>   at scala.collection.immutable.List.foreach(List.scala:318)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:97)
>   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:50)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:42)
>   at org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:931)
>   at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:131)
>   at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:51)
>   at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:755)
> {code}
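> Until the analyzer reports something more specific, diffing the two sides'
> schemas points at the actual mismatch. A hypothetical diagnostic snippet
> (user code, reusing the {{ctx}} from the earlier examples):
> {code}
> // Compare both schemas to see what the generic
> // "unresolved operator 'Union" message is hiding.
> val a = ctx.table("view_clicks").schema.fields.map(f => f.name -> f.dataType)
> val b = ctx.table("view_clicks_aug").schema.fields.map(f => f.name -> f.dataType)
> (a diff b).foreach { case (n, t) => println(s"only in view_clicks: $n: $t") }
> (b diff a).foreach { case (n, t) => println(s"only in view_clicks_aug: $n: $t") }
> {code}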