Repository: spark
Updated Branches:
  refs/heads/branch-1.3 5636c4a58 -> 62063b7a3


[SPARK-5862][SQL] Only transformUp the given plan once in HiveMetastoreCatalog

Current `ParquetConversions` in `HiveMetastoreCatalog` will transformUp the 
given plan multiple times if there are many Metastore Parquet tables. Since the 
transformUp operation is recursive, it would be better to perform it only once.

Author: Liang-Chi Hsieh <vii...@gmail.com>

Closes #4651 from viirya/parquet_atonce and squashes the following commits:

c1ed29d [Liang-Chi Hsieh] Fix bug.
e0f919b [Liang-Chi Hsieh] Only transformUp the given plan once.

(cherry picked from commit 4611de1cef7363bc71ec608560dfd866ae477747)
Signed-off-by: Michael Armbrust <mich...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/62063b7a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/62063b7a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/62063b7a

Branch: refs/heads/branch-1.3
Commit: 62063b7a3e2db9fc7320739d3b900a7840c2dee7
Parents: 5636c4a
Author: Liang-Chi Hsieh <vii...@gmail.com>
Authored: Tue Feb 17 12:23:18 2015 -0800
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Tue Feb 17 12:23:26 2015 -0800

----------------------------------------------------------------------
 .../spark/sql/hive/HiveMetastoreCatalog.scala   | 37 +++++++++++---------
 1 file changed, 20 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/62063b7a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 0e43faa..cfd6f27 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -430,33 +430,36 @@ private[hive] class HiveMetastoreCatalog(hive: 
HiveContext) extends Catalog with
               hive.convertMetastoreParquet &&
               hive.conf.parquetUseDataSourceApi &&
               
relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
-          relation
+          val parquetRelation = convertToParquetRelation(relation)
+          val attributedRewrites = relation.output.zip(parquetRelation.output)
+          (relation, parquetRelation, attributedRewrites)
 
         // Read path
         case p @ PhysicalOperation(_, _, relation: MetastoreRelation)
             if hive.convertMetastoreParquet &&
               hive.conf.parquetUseDataSourceApi &&
               
relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
-          relation
+          val parquetRelation = convertToParquetRelation(relation)
+          val attributedRewrites = relation.output.zip(parquetRelation.output)
+          (relation, parquetRelation, attributedRewrites)
       }
 
+      val relationMap = toBeReplaced.map(r => (r._1, r._2)).toMap
+      val attributedRewrites = AttributeMap(toBeReplaced.map(_._3).fold(Nil)(_ 
++: _))
+
       // Replaces all `MetastoreRelation`s with corresponding 
`ParquetRelation2`s, and fixes
       // attribute IDs referenced in other nodes.
-      toBeReplaced.distinct.foldLeft(plan) { (lastPlan, relation) =>
-        val parquetRelation = convertToParquetRelation(relation)
-        val attributedRewrites = 
AttributeMap(relation.output.zip(parquetRelation.output))
-
-        lastPlan.transformUp {
-          case r: MetastoreRelation if r == relation => {
-            val withAlias =
-              r.alias.map(a => Subquery(a, parquetRelation)).getOrElse(
-                Subquery(r.tableName, parquetRelation))
-
-            withAlias
-          }
-          case other => other.transformExpressions {
-            case a: Attribute if a.resolved => attributedRewrites.getOrElse(a, 
a)
-          }
+      plan.transformUp {
+        case r: MetastoreRelation if relationMap.contains(r) => {
+          val parquetRelation = relationMap(r)
+          val withAlias =
+            r.alias.map(a => Subquery(a, parquetRelation)).getOrElse(
+              Subquery(r.tableName, parquetRelation))
+
+          withAlias
+        }
+        case other => other.transformExpressions {
+          case a: Attribute if a.resolved => attributedRewrites.getOrElse(a, a)
         }
       }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to