Repository: spark
Updated Branches:
  refs/heads/master d44a3362e -> 8c3b0052f


[SPARK-6450] [SQL] Fixes metastore Parquet table conversion

The `ParquetConversions` analysis rule generates a hash map, which maps from 
the original `MetastoreRelation` instances to the newly created 
`ParquetRelation2` instances. However, `MetastoreRelation.equals` doesn't 
compare output attributes. Thus, if a single metastore Parquet table appears 
multiple times in a query, only a single entry ends up in the hash map, and the 
conversion is not correctly performed.

The proper fix for this issue would be to override `equals` and `hashCode` in 
MetastoreRelation. Unfortunately, that breaks more tests than expected. It's 
possible that these tests are ill-formed from the very beginning. As 1.3.1 
release is approaching, we'd like to make the change more surgical to avoid 
potential regressions. The proposed fix here is to use both the metastore 
relations and their output attributes as keys in the hash map used in 
ParquetConversions.

<!-- Reviewable:start -->
[<img src="https://reviewable.io/review_button.png" height=40 alt="Review on 
Reviewable"/>](https://reviewable.io/reviews/apache/spark/5183)
<!-- Reviewable:end -->

Author: Cheng Lian <[email protected]>

Closes #5183 from liancheng/spark-6450 and squashes the following commits:

3536780 [Cheng Lian] Fixes metastore Parquet table conversion


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8c3b0052
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8c3b0052
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8c3b0052

Branch: refs/heads/master
Commit: 8c3b0052f4792d97d23244ade335676e37cb1fae
Parents: d44a336
Author: Cheng Lian <[email protected]>
Authored: Wed Mar 25 17:40:19 2015 -0700
Committer: Michael Armbrust <[email protected]>
Committed: Wed Mar 25 17:40:19 2015 -0700

----------------------------------------------------------------------
 .../spark/sql/hive/HiveMetastoreCatalog.scala   | 34 +++++++++++---------
 .../apache/spark/sql/hive/parquetSuites.scala   | 25 ++++++++++++++
 2 files changed, 43 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8c3b0052/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 4c5eb48..d1a9955 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -459,7 +459,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) 
extends Catalog with
               
relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
           val parquetRelation = convertToParquetRelation(relation)
           val attributedRewrites = relation.output.zip(parquetRelation.output)
-          (relation, parquetRelation, attributedRewrites)
+          (relation -> relation.output, parquetRelation, attributedRewrites)
 
         // Write path
         case InsertIntoHiveTable(relation: MetastoreRelation, _, _, _)
@@ -470,7 +470,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) 
extends Catalog with
             
relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
           val parquetRelation = convertToParquetRelation(relation)
           val attributedRewrites = relation.output.zip(parquetRelation.output)
-          (relation, parquetRelation, attributedRewrites)
+          (relation -> relation.output, parquetRelation, attributedRewrites)
 
         // Read path
         case p @ PhysicalOperation(_, _, relation: MetastoreRelation)
@@ -479,33 +479,35 @@ private[hive] class HiveMetastoreCatalog(hive: 
HiveContext) extends Catalog with
               
relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
           val parquetRelation = convertToParquetRelation(relation)
           val attributedRewrites = relation.output.zip(parquetRelation.output)
-          (relation, parquetRelation, attributedRewrites)
+          (relation -> relation.output, parquetRelation, attributedRewrites)
       }
 
+      // Quick fix for SPARK-6450: Notice that we're using both the 
MetastoreRelation instances and
+      // their output attributes as the key of the map. This is because 
MetastoreRelation.equals
+      // doesn't take output attributes into account, thus multiple 
MetastoreRelation instances
+      // pointing to the same table get collapsed into a single entry in the 
map. A proper fix for
+      // this should be overriding equals & hashCode in MetastoreRelation.
       val relationMap = toBeReplaced.map(r => (r._1, r._2)).toMap
       val attributedRewrites = AttributeMap(toBeReplaced.map(_._3).fold(Nil)(_ 
++: _))
 
       // Replaces all `MetastoreRelation`s with corresponding 
`ParquetRelation2`s, and fixes
       // attribute IDs referenced in other nodes.
       plan.transformUp {
-        case r: MetastoreRelation if relationMap.contains(r) => {
-          val parquetRelation = relationMap(r)
-          val withAlias =
-            r.alias.map(a => Subquery(a, parquetRelation)).getOrElse(
-              Subquery(r.tableName, parquetRelation))
+        case r: MetastoreRelation if relationMap.contains(r -> r.output) =>
+          val parquetRelation = relationMap(r -> r.output)
+          val alias = r.alias.getOrElse(r.tableName)
+          Subquery(alias, parquetRelation)
 
-          withAlias
-        }
         case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite)
-          if relationMap.contains(r) => {
-          val parquetRelation = relationMap(r)
+          if relationMap.contains(r -> r.output) =>
+          val parquetRelation = relationMap(r -> r.output)
           InsertIntoTable(parquetRelation, partition, child, overwrite)
-        }
+
         case InsertIntoHiveTable(r: MetastoreRelation, partition, child, 
overwrite)
-          if relationMap.contains(r) => {
-          val parquetRelation = relationMap(r)
+          if relationMap.contains(r -> r.output) =>
+          val parquetRelation = relationMap(r -> r.output)
           InsertIntoTable(parquetRelation, partition, child, overwrite)
-        }
+
         case other => other.transformExpressions {
           case a: Attribute if a.resolved => attributedRewrites.getOrElse(a, a)
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/8c3b0052/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 8a31bd0..432d65a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -365,6 +365,31 @@ class ParquetDataSourceOnMetastoreSuite extends 
ParquetMetastoreSuiteBase {
 
     sql("DROP TABLE IF EXISTS test_insert_parquet")
   }
+
+  test("SPARK-6450 regression test") {
+    sql(
+      """CREATE TABLE IF NOT EXISTS ms_convert (key INT)
+        |ROW FORMAT SERDE 
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+        |STORED AS
+        |  INPUTFORMAT 
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+        |  OUTPUTFORMAT 
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      """.stripMargin)
+
+    // This shouldn't throw AnalysisException
+    val analyzed = sql(
+      """SELECT key FROM ms_convert
+        |UNION ALL
+        |SELECT key FROM ms_convert
+      """.stripMargin).queryExecution.analyzed
+
+    assertResult(2) {
+      analyzed.collect {
+        case r @ LogicalRelation(_: ParquetRelation2) => r
+      }.size
+    }
+
+    sql("DROP TABLE ms_convert")
+  }
 }
 
 class ParquetDataSourceOffMetastoreSuite extends ParquetMetastoreSuiteBase {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to