Repository: spark Updated Branches: refs/heads/branch-2.1 80f58510a -> 4424c901e
[SPARK-18370][SQL] Add table information to InsertIntoHadoopFsRelationCommand ## What changes were proposed in this pull request? `InsertIntoHadoopFsRelationCommand` does not keep track if it inserts into a table and what table it inserts to. This can make debugging these statements problematic. This PR adds table information the `InsertIntoHadoopFsRelationCommand`. Explaining this SQL command `insert into prq select * from range(0, 100000)` now yields the following executed plan: ``` == Physical Plan == ExecutedCommand +- InsertIntoHadoopFsRelationCommand file:/dev/assembly/spark-warehouse/prq, ParquetFormat, <function1>, Map(serialization.format -> 1, path -> file:/dev/assembly/spark-warehouse/prq), Append, CatalogTable( Table: `default`.`prq` Owner: hvanhovell Created: Wed Nov 09 17:42:30 CET 2016 Last Access: Thu Jan 01 01:00:00 CET 1970 Type: MANAGED Schema: [StructField(id,LongType,true)] Provider: parquet Properties: [transient_lastDdlTime=1478709750] Storage(Location: file:/dev/assembly/spark-warehouse/prq, InputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat, OutputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat, Serde: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, Properties: [serialization.format=1])) +- Project [id#7L] +- Range (0, 100000, step=1, splits=None) ``` ## How was this patch tested? Added extra checks to the `ParquetMetastoreSuite` Author: Herman van Hovell <hvanhov...@databricks.com> Closes #15832 from hvanhovell/SPARK-18370. (cherry picked from commit d8b81f778af8c3d7112ad37f691c49215b392836) Signed-off-by: Reynold Xin <r...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4424c901 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4424c901 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4424c901 Branch: refs/heads/branch-2.1 Commit: 4424c901e82ed4992d5568cbc5a5f524b88dc5eb Parents: 80f5851 Author: Herman van Hovell <hvanhov...@databricks.com> Authored: Wed Nov 9 12:26:09 2016 -0800 Committer: Reynold Xin <r...@databricks.com> Committed: Wed Nov 9 12:26:17 2016 -0800 ---------------------------------------------------------------------- .../apache/spark/sql/execution/datasources/DataSource.scala | 3 ++- .../spark/sql/execution/datasources/DataSourceStrategy.scala | 5 +++-- .../datasources/InsertIntoHadoopFsRelationCommand.scala | 5 +++-- .../test/scala/org/apache/spark/sql/hive/parquetSuites.scala | 6 ++++-- 4 files changed, 12 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/4424c901/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 5266611..5d66394 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -424,7 +424,8 @@ case class DataSource( _ => Unit, // No existing table needs to be refreshed. options, data.logicalPlan, - mode) + mode, + catalogTable) sparkSession.sessionState.executePlan(plan).toRdd // Replace the schema with that of the DataFrame we just wrote out to avoid re-inferring it. copy(userSpecifiedSchema = Some(data.schema.asNullable)).resolveRelation() http://git-wip-us.apache.org/repos/asf/spark/blob/4424c901/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index a548e88..2d43a6a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -162,7 +162,7 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] { case i @ logical.InsertIntoTable( - l @ LogicalRelation(t: HadoopFsRelation, _, _), part, query, overwrite, false) + l @ LogicalRelation(t: HadoopFsRelation, _, table), part, query, overwrite, false) if query.resolved && t.schema.asNullable == query.schema.asNullable => // Sanity checks @@ -222,7 +222,8 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] { refreshPartitionsCallback, t.options, query, - mode) + mode, + table) insertCmd } http://git-wip-us.apache.org/repos/asf/spark/blob/4424c901/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala index 9c75e2a..a0a8cb5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala @@ -23,7 +23,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.internal.io.FileCommitProtocol import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.catalog.BucketSpec +import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable} import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -41,7 +41,8 @@ case class InsertIntoHadoopFsRelationCommand( refreshFunction: (Seq[TablePartitionSpec]) => Unit, options: Map[String, String], @transient query: LogicalPlan, - mode: SaveMode) + mode: SaveMode, + catalogTable: Option[CatalogTable]) extends RunnableCommand { override protected def innerChildren: Seq[LogicalPlan] = query :: Nil http://git-wip-us.apache.org/repos/asf/spark/blob/4424c901/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala index 9fc62a3..3644ff9 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala @@ -307,7 +307,8 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt") df.queryExecution.sparkPlan match { - case ExecutedCommandExec(_: InsertIntoHadoopFsRelationCommand) => // OK + case ExecutedCommandExec(cmd: InsertIntoHadoopFsRelationCommand) => + assert(cmd.catalogTable.map(_.identifier.table) === Some("test_insert_parquet")) case o => fail("test_insert_parquet should be converted to a " + s"${classOf[HadoopFsRelation ].getCanonicalName} and " + s"${classOf[InsertIntoDataSourceCommand].getCanonicalName} should have been SparkPlan. " + @@ -337,7 +338,8 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt_array") df.queryExecution.sparkPlan match { - case ExecutedCommandExec(_: InsertIntoHadoopFsRelationCommand) => // OK + case ExecutedCommandExec(cmd: InsertIntoHadoopFsRelationCommand) => + assert(cmd.catalogTable.map(_.identifier.table) === Some("test_insert_parquet")) case o => fail("test_insert_parquet should be converted to a " + s"${classOf[HadoopFsRelation ].getCanonicalName} and " + s"${classOf[InsertIntoDataSourceCommand].getCanonicalName} should have been SparkPlan." + --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org