Repository: spark
Updated Branches:
  refs/heads/master 51a6f9097 -> f02394d06


[SPARK-6023][SQL] ParquetConversions fails to replace the destination MetastoreRelation of an InsertIntoTable node with ParquetRelation2

JIRA: https://issues.apache.org/jira/browse/SPARK-6023

Author: Yin Huai <yh...@databricks.com>

Closes #4782 from yhuai/parquetInsertInto and squashes the following commits:

ae7e806 [Yin Huai] Convert MetastoreRelation in InsertIntoTable and InsertIntoHiveTable.
ba543cd [Yin Huai] More tests.
50b6d0f [Yin Huai] Update error messages.
346780c [Yin Huai] Failed test.
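
For readers who want to reproduce the issue outside the test suite, here is a
minimal sketch against the Spark 1.3-era HiveContext API. This is illustrative,
not code from the commit: the SparkContext value sc, the table name
insert_target, and the exact data source API conf key are assumptions, while
the CREATE TABLE statement and the convertMetastoreParquet flag mirror the new
tests in this diff.

// Hypothetical reproduction sketch (Spark 1.3-era API); assumes an existing
// SparkContext named sc. Table and column names are illustrative.
import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
import hiveContext._

// Enable the Metastore Parquet conversion and the Parquet data source API.
// The second key is assumed to be what SQLConf.PARQUET_USE_DATA_SOURCE_API
// (referenced in the tests below) resolves to.
setConf("spark.sql.hive.convertMetastoreParquet", "true")
setConf("spark.sql.parquet.useDataSourceApi", "true")

// A JSON-backed source table, mirroring the jt table registered in the tests.
val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i}"""))
jsonRDD(rdd).registerTempTable("jt")

// A non-partitioned Metastore Parquet table, created as in the new tests.
sql(
  """
    |CREATE TABLE insert_target (intField INT)
    |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
    |STORED AS
    |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
    |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
  """.stripMargin)

// Before this fix, only the read path was converted to ParquetRelation2, so
// the insert below was still planned as InsertIntoHiveTable. With the fix it
// is planned as InsertIntoDataSource over a ParquetRelation2.
sql("INSERT INTO TABLE insert_target SELECT a FROM jt")

Before the patch, the destination MetastoreRelation of the INSERT stayed
unconverted even though reads of the same table were converted; the new
"Write path" cases in ParquetConversions below close that gap.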


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f02394d0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f02394d0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f02394d0

Branch: refs/heads/master
Commit: f02394d06473889d0d7897c4583239e6e136ff46
Parents: 51a6f90
Author: Yin Huai <yh...@databricks.com>
Authored: Thu Feb 26 22:39:49 2015 +0800
Committer: Cheng Lian <l...@databricks.com>
Committed: Thu Feb 26 22:39:49 2015 +0800

----------------------------------------------------------------------
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |  21 +++
 .../spark/sql/parquet/parquetSuites.scala       | 138 ++++++++++++++++++-
 2 files changed, 152 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f02394d0/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 2cc8d65..8af5a48 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -440,6 +440,17 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
           val attributedRewrites = relation.output.zip(parquetRelation.output)
           (relation, parquetRelation, attributedRewrites)
 
+        // Write path
+        case InsertIntoHiveTable(relation: MetastoreRelation, _, _, _)
+          // Inserting into a partitioned table is not supported by the Parquet data source (yet).
+          if !relation.hiveQlTable.isPartitioned &&
+            hive.convertMetastoreParquet &&
+            hive.conf.parquetUseDataSourceApi &&
+            relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
+          val parquetRelation = convertToParquetRelation(relation)
+          val attributedRewrites = relation.output.zip(parquetRelation.output)
+          (relation, parquetRelation, attributedRewrites)
+
         // Read path
         case p @ PhysicalOperation(_, _, relation: MetastoreRelation)
             if hive.convertMetastoreParquet &&
@@ -464,6 +475,16 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
 
           withAlias
         }
+        case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite)
+          if relationMap.contains(r) => {
+          val parquetRelation = relationMap(r)
+          InsertIntoTable(parquetRelation, partition, child, overwrite)
+        }
+        case InsertIntoHiveTable(r: MetastoreRelation, partition, child, overwrite)
+          if relationMap.contains(r) => {
+          val parquetRelation = relationMap(r)
+          InsertIntoTable(parquetRelation, partition, child, overwrite)
+        }
         case other => other.transformExpressions {
           case a: Attribute if a.resolved => attributedRewrites.getOrElse(a, a)
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/f02394d0/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
index 653f4b4..80fd5cd 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
@@ -24,11 +24,11 @@ import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.sql.{SQLConf, QueryTest}
 import org.apache.spark.sql.catalyst.expressions.Row
-import org.apache.spark.sql.execution.PhysicalRDD
-import org.apache.spark.sql.hive.execution.HiveTableScan
+import org.apache.spark.sql.execution.{ExecutedCommand, PhysicalRDD}
+import org.apache.spark.sql.hive.execution.{InsertIntoHiveTable, HiveTableScan}
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
-import org.apache.spark.sql.sources.LogicalRelation
+import org.apache.spark.sql.sources.{InsertIntoDataSource, LogicalRelation}
 
 // The data where the partitioning key exists only in the directory structure.
 case class ParquetData(intField: Int, stringField: String)
@@ -93,6 +93,11 @@ class ParquetMetastoreSuiteBase extends ParquetPartitioningTest {
       sql(s"ALTER TABLE partitioned_parquet_with_key ADD PARTITION (p=$p)")
     }
 
+    val rdd1 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
+    jsonRDD(rdd1).registerTempTable("jt")
+    val rdd2 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":[$i, null]}"""))
+    jsonRDD(rdd2).registerTempTable("jt_array")
+
     setConf("spark.sql.hive.convertMetastoreParquet", "true")
   }
 
@@ -100,6 +105,8 @@ class ParquetMetastoreSuiteBase extends ParquetPartitioningTest {
     sql("DROP TABLE partitioned_parquet")
     sql("DROP TABLE partitioned_parquet_with_key")
     sql("DROP TABLE normal_parquet")
+    sql("DROP TABLE IF EXISTS jt")
+    sql("DROP TABLE IF EXISTS jt_array")
     setConf("spark.sql.hive.convertMetastoreParquet", "false")
   }
 
@@ -122,9 +129,6 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
   override def beforeAll(): Unit = {
     super.beforeAll()
 
-    val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
-    jsonRDD(rdd).registerTempTable("jt")
-
     sql(
       """
         |create table test_parquet
@@ -143,7 +147,6 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
 
   override def afterAll(): Unit = {
     super.afterAll()
-    sql("DROP TABLE IF EXISTS jt")
     sql("DROP TABLE IF EXISTS test_parquet")
 
     setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
@@ -238,6 +241,70 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
 
     sql("DROP TABLE IF EXISTS test_parquet_ctas")
   }
+
+  test("MetastoreRelation in InsertIntoTable will be converted") {
+    sql(
+      """
+        |create table test_insert_parquet
+        |(
+        |  intField INT
+        |)
+        |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+        |STORED AS
+        |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+        |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      """.stripMargin)
+
+    val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt")
+    df.queryExecution.executedPlan match {
+      case ExecutedCommand(
+        InsertIntoDataSource(
+          LogicalRelation(r: ParquetRelation2), query, overwrite)) => // OK
+      case o => fail("test_insert_parquet should be converted to a " +
+        s"${classOf[ParquetRelation2].getCanonicalName}, and " +
+        s"${classOf[InsertIntoDataSource].getCanonicalName} is expected as the SparkPlan. " +
+        s"However, found ${o.toString}.")
+    }
+
+    checkAnswer(
+      sql("SELECT intField FROM test_insert_parquet WHERE 
test_insert_parquet.intField > 5"),
+      sql("SELECT a FROM jt WHERE jt.a > 5").collect()
+    )
+
+    sql("DROP TABLE IF EXISTS test_insert_parquet")
+  }
+
+  test("MetastoreRelation in InsertIntoHiveTable will be converted") {
+    sql(
+      """
+        |create table test_insert_parquet
+        |(
+        |  int_array array<int>
+        |)
+        |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+        |STORED AS
+        |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+        |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      """.stripMargin)
+
+    val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM 
jt_array")
+    df.queryExecution.executedPlan match {
+      case ExecutedCommand(
+        InsertIntoDataSource(
+          LogicalRelation(r: ParquetRelation2), query, overwrite)) => // OK
+      case o => fail("test_insert_parquet should be converted to a " +
+        s"${classOf[ParquetRelation2].getCanonicalName}, and " +
+        s"${classOf[InsertIntoDataSource].getCanonicalName} is expected as the SparkPlan. " +
+        s"However, found ${o.toString}.")
+    }
+
+    checkAnswer(
+      sql("SELECT int_array FROM test_insert_parquet"),
+      sql("SELECT a FROM jt_array").collect()
+    )
+
+    sql("DROP TABLE IF EXISTS test_insert_parquet")
+  }
 }
 
 class ParquetDataSourceOffMetastoreSuite extends ParquetMetastoreSuiteBase {
@@ -252,6 +319,63 @@ class ParquetDataSourceOffMetastoreSuite extends ParquetMetastoreSuiteBase {
     super.afterAll()
     setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
   }
+
+  test("MetastoreRelation in InsertIntoTable will not be converted") {
+    sql(
+      """
+        |create table test_insert_parquet
+        |(
+        |  intField INT
+        |)
+        |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+        |STORED AS
+        |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+        |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      """.stripMargin)
+
+    val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt")
+    df.queryExecution.executedPlan match {
+      case insert: InsertIntoHiveTable => // OK
+      case o => fail(s"The SparkPlan should be 
${classOf[InsertIntoHiveTable].getCanonicalName}. " +
+        s"However, found ${o.toString}.")
+    }
+
+    checkAnswer(
+      sql("SELECT intField FROM test_insert_parquet WHERE 
test_insert_parquet.intField > 5"),
+      sql("SELECT a FROM jt WHERE jt.a > 5").collect()
+    )
+
+    sql("DROP TABLE IF EXISTS test_insert_parquet")
+  }
+
+  // TODO: enable this test once SPARK-5950 is fixed.
+  ignore("MetastoreRelation in InsertIntoHiveTable will not be converted") {
+    sql(
+      """
+        |create table test_insert_parquet
+        |(
+        |  int_array array<int>
+        |)
+        |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+        |STORED AS
+        |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+        |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      """.stripMargin)
+
+    val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM 
jt_array")
+    df.queryExecution.executedPlan match {
+      case insert: InsertIntoHiveTable => // OK
+      case o => fail(s"The SparkPlan should be 
${classOf[InsertIntoHiveTable].getCanonicalName}. " +
+        s"However, found ${o.toString}.")
+    }
+
+    checkAnswer(
+      sql("SELECT int_array FROM test_insert_parquet"),
+      sql("SELECT a FROM jt_array").collect()
+    )
+
+    sql("DROP TABLE IF EXISTS test_insert_parquet")
+  }
 }
 
 /**

