Repository: spark
Updated Branches:
refs/heads/branch-2.1 80f58510a -> 4424c901e
[SPARK-18370][SQL] Add table information to InsertIntoHadoopFsRelationCommand
## What changes were proposed in this pull request?
`InsertIntoHadoopFsRelationCommand` does not keep track of whether it inserts into a
table, or which table it inserts into. This can make debugging these statements
problematic. This PR adds table information to
`InsertIntoHadoopFsRelationCommand`. Explaining the SQL command `insert into
prq select * from range(0, 100000)` now yields the following executed plan:
```
== Physical Plan ==
ExecutedCommand
+- InsertIntoHadoopFsRelationCommand file:/dev/assembly/spark-warehouse/prq,
ParquetFormat, <function1>, Map(serialization.format -> 1, path ->
file:/dev/assembly/spark-warehouse/prq), Append, CatalogTable(
Table: `default`.`prq`
Owner: hvanhovell
Created: Wed Nov 09 17:42:30 CET 2016
Last Access: Thu Jan 01 01:00:00 CET 1970
Type: MANAGED
Schema: [StructField(id,LongType,true)]
Provider: parquet
Properties: [transient_lastDdlTime=1478709750]
Storage(Location: file:/dev/assembly/spark-warehouse/prq, InputFormat:
org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat, OutputFormat:
org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat, Serde:
org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, Properties:
[serialization.format=1]))
+- Project [id#7L]
+- Range (0, 100000, step=1, splits=None)
```
## How was this patch tested?
Added extra checks to `ParquetMetastoreSuite`.
Author: Herman van Hovell <[email protected]>
Closes #15832 from hvanhovell/SPARK-18370.
(cherry picked from commit d8b81f778af8c3d7112ad37f691c49215b392836)
Signed-off-by: Reynold Xin <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4424c901
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4424c901
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4424c901
Branch: refs/heads/branch-2.1
Commit: 4424c901e82ed4992d5568cbc5a5f524b88dc5eb
Parents: 80f5851
Author: Herman van Hovell <[email protected]>
Authored: Wed Nov 9 12:26:09 2016 -0800
Committer: Reynold Xin <[email protected]>
Committed: Wed Nov 9 12:26:17 2016 -0800
----------------------------------------------------------------------
.../apache/spark/sql/execution/datasources/DataSource.scala | 3 ++-
.../spark/sql/execution/datasources/DataSourceStrategy.scala | 5 +++--
.../datasources/InsertIntoHadoopFsRelationCommand.scala | 5 +++--
.../test/scala/org/apache/spark/sql/hive/parquetSuites.scala | 6 ++++--
4 files changed, 12 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/4424c901/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 5266611..5d66394 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -424,7 +424,8 @@ case class DataSource(
_ => Unit, // No existing table needs to be refreshed.
options,
data.logicalPlan,
- mode)
+ mode,
+ catalogTable)
sparkSession.sessionState.executePlan(plan).toRdd
// Replace the schema with that of the DataFrame we just wrote out to
avoid re-inferring it.
copy(userSpecifiedSchema =
Some(data.schema.asNullable)).resolveRelation()
http://git-wip-us.apache.org/repos/asf/spark/blob/4424c901/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index a548e88..2d43a6a 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -162,7 +162,7 @@ case class DataSourceAnalysis(conf: CatalystConf) extends
Rule[LogicalPlan] {
case i @ logical.InsertIntoTable(
- l @ LogicalRelation(t: HadoopFsRelation, _, _), part, query,
overwrite, false)
+ l @ LogicalRelation(t: HadoopFsRelation, _, table), part, query,
overwrite, false)
if query.resolved && t.schema.asNullable == query.schema.asNullable =>
// Sanity checks
@@ -222,7 +222,8 @@ case class DataSourceAnalysis(conf: CatalystConf) extends
Rule[LogicalPlan] {
refreshPartitionsCallback,
t.options,
query,
- mode)
+ mode,
+ table)
insertCmd
}
http://git-wip-us.apache.org/repos/asf/spark/blob/4424c901/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
----------------------------------------------------------------------
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index 9c75e2a..a0a8cb5 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -23,7 +23,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -41,7 +41,8 @@ case class InsertIntoHadoopFsRelationCommand(
refreshFunction: (Seq[TablePartitionSpec]) => Unit,
options: Map[String, String],
@transient query: LogicalPlan,
- mode: SaveMode)
+ mode: SaveMode,
+ catalogTable: Option[CatalogTable])
extends RunnableCommand {
override protected def innerChildren: Seq[LogicalPlan] = query :: Nil
http://git-wip-us.apache.org/repos/asf/spark/blob/4424c901/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
----------------------------------------------------------------------
diff --git
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 9fc62a3..3644ff9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -307,7 +307,8 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest
{
val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt")
df.queryExecution.sparkPlan match {
- case ExecutedCommandExec(_: InsertIntoHadoopFsRelationCommand) => // OK
+ case ExecutedCommandExec(cmd: InsertIntoHadoopFsRelationCommand) =>
+ assert(cmd.catalogTable.map(_.identifier.table) ===
Some("test_insert_parquet"))
case o => fail("test_insert_parquet should be converted to a " +
s"${classOf[HadoopFsRelation ].getCanonicalName} and " +
s"${classOf[InsertIntoDataSourceCommand].getCanonicalName} should
have been SparkPlan. " +
@@ -337,7 +338,8 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest
{
val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM
jt_array")
df.queryExecution.sparkPlan match {
- case ExecutedCommandExec(_: InsertIntoHadoopFsRelationCommand) => // OK
+ case ExecutedCommandExec(cmd: InsertIntoHadoopFsRelationCommand) =>
+ assert(cmd.catalogTable.map(_.identifier.table) ===
Some("test_insert_parquet"))
case o => fail("test_insert_parquet should be converted to a " +
s"${classOf[HadoopFsRelation ].getCanonicalName} and " +
s"${classOf[InsertIntoDataSourceCommand].getCanonicalName} should
have been SparkPlan." +
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]