This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new e3cadc3  [SPARK-34567][SQL] CreateTableAsSelect should update metrics too
e3cadc3 is described below

commit e3cadc32102fb502e3b8ac5f6bacb39ec80df26a
Author: Angerszhuuuu <angers....@gmail.com>
AuthorDate: Thu Mar 4 20:42:47 2021 +0800

    [SPARK-34567][SQL] CreateTableAsSelect should update metrics too
    
    ### What changes were proposed in this pull request?
    For the `CreateTableAsSelect` command, Spark uses `InsertIntoHiveTable` or `InsertIntoHadoopFsRelationCommand` to insert the data.
    The metrics of `InsertIntoHiveTable` and `InsertIntoHadoopFsRelationCommand` are updated in `FileFormatWriter.write()`, but only `CreateTableAsSelectCommand` is shown in the WebUI SQL tab.
    We therefore need to update `CreateTableAsSelectCommand`'s metrics too; the idea is sketched below.
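
    As a rough sketch (not the patch itself), the fix boils down to copying the child write command's metric values into the CTAS command's own metrics and posting a driver-side metric update so the SQL tab refreshes. The names `CtasMetricsSketch` and `copyWriteMetrics` below are illustrative only; the actual patch adds `DataWritingCommand.propogateMetrics` (see the diff):

    ```scala
    import org.apache.spark.SparkContext
    import org.apache.spark.sql.execution.SQLExecution
    import org.apache.spark.sql.execution.command.DataWritingCommand
    import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

    object CtasMetricsSketch {
      // Copy the metrics of the child write command (InsertIntoHiveTable or
      // InsertIntoHadoopFsRelationCommand) onto the CTAS command's metrics,
      // then notify the driver so the SQL tab picks up the new values.
      def copyWriteMetrics(
          sparkContext: SparkContext,
          child: DataWritingCommand,
          ctasMetrics: Map[String, SQLMetric]): Unit = {
        child.metrics.foreach { case (name, metric) => ctasMetrics(name).set(metric.value) }
        SQLMetrics.postDriverMetricUpdates(
          sparkContext,
          sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY),
          ctasMetrics.values.toSeq)
      }
    }
    ```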
    
    Before this PR:
    
    ![image](https://user-images.githubusercontent.com/46485123/109411226-81f44480-79db-11eb-99cb-b9686b15bf61.png)
    
    After this PR:
    
    ![image](https://user-images.githubusercontent.com/46485123/109411232-8ae51600-79db-11eb-9111-3bea0bc2d475.png)
    
    
    ![image](https://user-images.githubusercontent.com/46485123/109905192-62aa2f80-7cd9-11eb-91f9-04b16c9238ae.png)
    
    ### Why are the changes needed?
    To make the SQL metrics shown for CTAS queries complete.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    MT
    
    Closes #31679 from AngersZhuuuu/SPARK-34567.
    
    Authored-by: Angerszhuuuu <angers....@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit 401e270c179021dd7bfd136b143ccc6d01c04755)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../sql/execution/command/DataWritingCommand.scala | 27 ++++++++++++++++++++--
 .../execution/command/createDataSourceTables.scala |  2 +-
 .../sql/execution/datasources/DataSource.scala     |  5 +++-
 .../sql/execution/metric/SQLMetricsSuite.scala     | 17 ++++++++++++++
 .../execution/CreateHiveTableAsSelectCommand.scala |  2 ++
 .../spark/sql/hive/execution/SQLMetricsSuite.scala | 27 ++++++++++++++++++++++
 6 files changed, 76 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
index a56007f..c9de8c7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
@@ -19,12 +19,13 @@ package org.apache.spark.sql.execution.command
 
 import org.apache.hadoop.conf.Configuration
 
+import org.apache.spark.SparkContext
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
 import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker
-import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.util.SerializableConfiguration
 
 /**
@@ -73,4 +74,26 @@ object DataWritingCommand {
       attr.withName(outputName)
     }
   }
+
+  /**
+   * When executing CTAS operators, Spark uses [[InsertIntoHadoopFsRelationCommand]]
+   * or [[InsertIntoHiveTable]] to write the data. Both inherit their metrics from
+   * [[DataWritingCommand]], but after running [[InsertIntoHadoopFsRelationCommand]]
+   * or [[InsertIntoHiveTable]], only the metrics of these two commands are updated,
+   * through [[BasicWriteJobStatsTracker]], so we also need to propagate the metrics
+   * to the command that actually invoked them.
+   *
+   * @param sparkContext Current SparkContext.
+   * @param command Command that performs the actual data write.
+   * @param metrics Metrics of the wrapping DataWritingCommand to be updated.
+   */
+  def propogateMetrics(
+      sparkContext: SparkContext,
+      command: DataWritingCommand,
+      metrics: Map[String, SQLMetric]): Unit = {
+    command.metrics.foreach { case (key, metric) => metrics(key).set(metric.value) }
+    SQLMetrics.postDriverMetricUpdates(sparkContext,
+      sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY),
+      metrics.values.toSeq)
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index be7fa7b..dc26e00 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -217,7 +217,7 @@ case class CreateDataSourceTableAsSelectCommand(
       catalogTable = if (tableExists) Some(table) else None)
 
     try {
-      dataSource.writeAndRead(mode, query, outputColumnNames, physicalPlan)
+      dataSource.writeAndRead(mode, query, outputColumnNames, physicalPlan, metrics)
     } catch {
       case ex: AnalysisException =>
         logError(s"Failed to write to table 
${table.identifier.unquotedString}", ex)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index e84f594..476174a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -43,6 +43,7 @@ import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
 import org.apache.spark.sql.execution.datasources.v2.orc.OrcDataSourceV2
+import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.sources.{RateStreamProvider, TextSocketSourceProvider}
 import org.apache.spark.sql.internal.SQLConf
@@ -518,7 +519,8 @@ case class DataSource(
       mode: SaveMode,
       data: LogicalPlan,
       outputColumnNames: Seq[String],
-      physicalPlan: SparkPlan): BaseRelation = {
+      physicalPlan: SparkPlan,
+      metrics: Map[String, SQLMetric]): BaseRelation = {
     val outputColumns = DataWritingCommand.logicalPlanOutputWithNames(data, outputColumnNames)
     if (outputColumns.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
       throw new AnalysisException("Cannot save interval data type into external storage.")
@@ -546,6 +548,7 @@ case class DataSource(
           partitionColumns = resolvedPartCols,
           outputColumnNames = outputColumnNames)
         resolved.run(sparkSession, physicalPlan)
+        DataWritingCommand.propogateMetrics(sparkSession.sparkContext, resolved, metrics)
         // Replace the schema with that of the DataFrame we just wrote out to avoid re-inferring
         copy(userSpecifiedSchema = Some(outputColumns.toStructType.asNullable)).resolveRelation()
       case _ =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 21d17f4..d5f9875 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
 import org.apache.spark.sql.execution.{FilterExec, RangeExec, SparkPlan, WholeStageCodegenExec}
 import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite
 import org.apache.spark.sql.execution.aggregate.HashAggregateExec
+import org.apache.spark.sql.execution.command.DataWritingCommandExec
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec}
 import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec
 import org.apache.spark.sql.functions._
@@ -755,4 +756,20 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
       }
     }
   }
+
+  test("SPARK-34567: Add metrics for CTAS operator") {
+    withTable("t") {
+      val df = sql("CREATE TABLE t USING PARQUET AS SELECT 1 as a")
+      val dataWritingCommandExec =
+        df.queryExecution.executedPlan.asInstanceOf[DataWritingCommandExec]
+      dataWritingCommandExec.executeCollect()
+      val createTableAsSelect = dataWritingCommandExec.cmd
+      assert(createTableAsSelect.metrics.contains("numFiles"))
+      assert(createTableAsSelect.metrics("numFiles").value == 1)
+      assert(createTableAsSelect.metrics.contains("numOutputBytes"))
+      assert(createTableAsSelect.metrics("numOutputBytes").value > 0)
+      assert(createTableAsSelect.metrics.contains("numOutputRows"))
+      assert(createTableAsSelect.metrics("numOutputRows").value == 1)
+    }
+  }
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
index ccaa450..283c254 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
@@ -55,6 +55,7 @@ trait CreateHiveTableAsSelectBase extends DataWritingCommand {
 
       val command = getWritingCommand(catalog, tableDesc, tableExists = true)
       command.run(sparkSession, child)
+      DataWritingCommand.propogateMetrics(sparkSession.sparkContext, command, metrics)
     } else {
       // TODO ideally, we should get the output data ready first and then
       // add the relation into catalog, just in case of failure occurs while data
@@ -69,6 +70,7 @@ trait CreateHiveTableAsSelectBase extends DataWritingCommand {
         val createdTableMeta = catalog.getTableMetadata(tableDesc.identifier)
         val command = getWritingCommand(catalog, createdTableMeta, tableExists = false)
         command.run(sparkSession, child)
+        DataWritingCommand.propogateMetrics(sparkSession.sparkContext, command, metrics)
       } catch {
         case NonFatal(e) =>
           // drop the created table.
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLMetricsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLMetricsSuite.scala
index 4d6dafd..a2de43d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLMetricsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLMetricsSuite.scala
@@ -18,7 +18,9 @@
 package org.apache.spark.sql.hive.execution
 
 import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite
+import org.apache.spark.sql.execution.command.DataWritingCommandExec
 import org.apache.spark.sql.execution.metric.SQLMetricsTestUtils
+import org.apache.spark.sql.hive.HiveUtils
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 
 // Disable AQE because metric info is different with AQE on/off
@@ -34,4 +36,29 @@ class SQLMetricsSuite extends SQLMetricsTestUtils with TestHiveSingleton
       testMetricsDynamicPartition("hive", "hive", "t1")
     }
   }
+
+  test("SPARK-34567: Add metrics for CTAS operator") {
+    Seq(false, true).foreach { canOptimized =>
+      withSQLConf(HiveUtils.CONVERT_METASTORE_CTAS.key -> canOptimized.toString) {
+        withTable("t") {
+          val df = sql(s"CREATE TABLE t STORED AS PARQUET AS SELECT 1 as a")
+          val dataWritingCommandExec =
+            df.queryExecution.executedPlan.asInstanceOf[DataWritingCommandExec]
+          dataWritingCommandExec.executeCollect()
+          val createTableAsSelect = dataWritingCommandExec.cmd
+          if (canOptimized) {
+            assert(createTableAsSelect.isInstanceOf[OptimizedCreateHiveTableAsSelectCommand])
+          } else {
+            assert(createTableAsSelect.isInstanceOf[CreateHiveTableAsSelectCommand])
+          }
+          assert(createTableAsSelect.metrics.contains("numFiles"))
+          assert(createTableAsSelect.metrics("numFiles").value == 1)
+          assert(createTableAsSelect.metrics.contains("numOutputBytes"))
+          assert(createTableAsSelect.metrics("numOutputBytes").value > 0)
+          assert(createTableAsSelect.metrics.contains("numOutputRows"))
+          assert(createTableAsSelect.metrics("numOutputRows").value == 1)
+        }
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
