Repository: spark
Updated Branches:
  refs/heads/branch-2.1 80f58510a -> 4424c901e


[SPARK-18370][SQL] Add table information to InsertIntoHadoopFsRelationCommand

## What changes were proposed in this pull request?
`InsertIntoHadoopFsRelationCommand` does not keep track of whether it inserts into a table and, if so, which table it inserts into. This can make debugging these statements problematic. This PR adds table information to `InsertIntoHadoopFsRelationCommand`. Explaining the SQL command `insert into prq select * from range(0, 100000)` now yields the following physical plan:
```
== Physical Plan ==
ExecutedCommand
   +- InsertIntoHadoopFsRelationCommand file:/dev/assembly/spark-warehouse/prq, ParquetFormat, <function1>, Map(serialization.format -> 1, path -> file:/dev/assembly/spark-warehouse/prq), Append, CatalogTable(
        Table: `default`.`prq`
        Owner: hvanhovell
        Created: Wed Nov 09 17:42:30 CET 2016
        Last Access: Thu Jan 01 01:00:00 CET 1970
        Type: MANAGED
        Schema: [StructField(id,LongType,true)]
        Provider: parquet
        Properties: [transient_lastDdlTime=1478709750]
        Storage(Location: file:/dev/assembly/spark-warehouse/prq, InputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat, OutputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat, Serde: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, Properties: [serialization.format=1]))
         +- Project [id#7L]
            +- Range (0, 100000, step=1, splits=None)
```
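
For reference, a minimal sketch of reproducing this kind of output in `spark-shell`. The `CREATE TABLE` statement is an assumption chosen to match the single `id: bigint` column shown above (the plan's Hive serde properties suggest the original `prq` table was a Hive parquet table, but a plain data source table illustrates the same command):
```
// Sketch only: create a parquet table matching the example, then explain an insert into it.
spark.sql("CREATE TABLE prq (id BIGINT) USING parquet") // assumed table definition
spark.sql("EXPLAIN INSERT INTO prq SELECT * FROM range(0, 100000)")
  .show(truncate = false) // prints the plan, including the CatalogTable details
```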

## How was this patch tested?
Added extra checks to the `ParquetMetastoreSuite`.
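
The new assertions follow roughly this pattern (a condensed sketch of the checks added in the diff below, assuming a ScalaTest suite where `df` is the DataFrame returned by the INSERT statement):
```
import org.apache.spark.sql.execution.command.ExecutedCommandExec
import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand

df.queryExecution.sparkPlan match {
  case ExecutedCommandExec(cmd: InsertIntoHadoopFsRelationCommand) =>
    // The command now carries the catalog table, so the test can verify
    // which table the insert targets.
    assert(cmd.catalogTable.map(_.identifier.table) === Some("test_insert_parquet"))
  case o => fail(s"unexpected plan: $o")
}
```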

Author: Herman van Hovell <hvanhov...@databricks.com>

Closes #15832 from hvanhovell/SPARK-18370.

(cherry picked from commit d8b81f778af8c3d7112ad37f691c49215b392836)
Signed-off-by: Reynold Xin <r...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4424c901
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4424c901
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4424c901

Branch: refs/heads/branch-2.1
Commit: 4424c901e82ed4992d5568cbc5a5f524b88dc5eb
Parents: 80f5851
Author: Herman van Hovell <hvanhov...@databricks.com>
Authored: Wed Nov 9 12:26:09 2016 -0800
Committer: Reynold Xin <r...@databricks.com>
Committed: Wed Nov 9 12:26:17 2016 -0800

----------------------------------------------------------------------
 .../apache/spark/sql/execution/datasources/DataSource.scala    | 3 ++-
 .../spark/sql/execution/datasources/DataSourceStrategy.scala   | 5 +++--
 .../datasources/InsertIntoHadoopFsRelationCommand.scala        | 5 +++--
 .../test/scala/org/apache/spark/sql/hive/parquetSuites.scala   | 6 ++++--
 4 files changed, 12 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4424c901/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 5266611..5d66394 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -424,7 +424,8 @@ case class DataSource(
             _ => Unit, // No existing table needs to be refreshed.
             options,
             data.logicalPlan,
-            mode)
+            mode,
+            catalogTable)
         sparkSession.sessionState.executePlan(plan).toRdd
        // Replace the schema with that of the DataFrame we just wrote out to avoid re-inferring it.
        copy(userSpecifiedSchema = Some(data.schema.asNullable)).resolveRelation()

http://git-wip-us.apache.org/repos/asf/spark/blob/4424c901/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index a548e88..2d43a6a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -162,7 +162,7 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
 
 
     case i @ logical.InsertIntoTable(
-           l @ LogicalRelation(t: HadoopFsRelation, _, _), part, query, overwrite, false)
+           l @ LogicalRelation(t: HadoopFsRelation, _, table), part, query, overwrite, false)
         if query.resolved && t.schema.asNullable == query.schema.asNullable =>
 
       // Sanity checks
@@ -222,7 +222,8 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
         refreshPartitionsCallback,
         t.options,
         query,
-        mode)
+        mode,
+        table)
 
       insertCmd
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/4424c901/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index 9c75e2a..a0a8cb5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -23,7 +23,7 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -41,7 +41,8 @@ case class InsertIntoHadoopFsRelationCommand(
     refreshFunction: (Seq[TablePartitionSpec]) => Unit,
     options: Map[String, String],
     @transient query: LogicalPlan,
-    mode: SaveMode)
+    mode: SaveMode,
+    catalogTable: Option[CatalogTable])
   extends RunnableCommand {
 
   override protected def innerChildren: Seq[LogicalPlan] = query :: Nil

http://git-wip-us.apache.org/repos/asf/spark/blob/4424c901/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 9fc62a3..3644ff9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -307,7 +307,8 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
 
       val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt")
       df.queryExecution.sparkPlan match {
-        case ExecutedCommandExec(_: InsertIntoHadoopFsRelationCommand) => // OK
+        case ExecutedCommandExec(cmd: InsertIntoHadoopFsRelationCommand) =>
+          assert(cmd.catalogTable.map(_.identifier.table) === Some("test_insert_parquet"))
         case o => fail("test_insert_parquet should be converted to a " +
           s"${classOf[HadoopFsRelation ].getCanonicalName} and " +
           s"${classOf[InsertIntoDataSourceCommand].getCanonicalName} should 
have been SparkPlan. " +
@@ -337,7 +338,8 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
 
       val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM 
jt_array")
       df.queryExecution.sparkPlan match {
-        case ExecutedCommandExec(_: InsertIntoHadoopFsRelationCommand) => // OK
+        case ExecutedCommandExec(cmd: InsertIntoHadoopFsRelationCommand) =>
+          assert(cmd.catalogTable.map(_.identifier.table) === Some("test_insert_parquet"))
         case o => fail("test_insert_parquet should be converted to a " +
           s"${classOf[HadoopFsRelation ].getCanonicalName} and " +
           s"${classOf[InsertIntoDataSourceCommand].getCanonicalName} should 
have been SparkPlan." +

