This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 0c5a01a [SPARK-35378][SQL][FOLLOWUP] Restore the command execution name for DataFrameWriterV2
0c5a01a is described below
commit 0c5a01a78c067a78025878dff9d54843111287cc
Author: Wenchen Fan <[email protected]>
AuthorDate: Thu Jun 17 08:55:42 2021 +0000
[SPARK-35378][SQL][FOLLOWUP] Restore the command execution name for DataFrameWriterV2
### What changes were proposed in this pull request?
This is a followup of https://github.com/apache/spark/pull/32513
It's hard to keep the command execution name for `DataFrameWriter`, as the command logical plans are a bit messy (DS v1, file source, and Hive all use different command logical plans) and it's sometimes hard to distinguish "insert" from "save".

However, `DataFrameWriterV2` only produces v2 commands, which are pretty clean, so it's easy to keep the command execution name for them.
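As a rough illustration (not part of this patch), a `QueryExecutionListener` could observe the restored names as sketched below; the catalog name `demo_cat`, the table identifier `ns.t`, and the local session setup are assumptions for the example only:
```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

object CommandNameListenerSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("command-name-listener")
      .getOrCreate()
    import spark.implicits._

    // Print the execution name Spark reports for each successful query execution.
    spark.listenerManager.register(new QueryExecutionListener {
      override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit =
        println(s"execution name: $funcName")
      override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = ()
    })

    val df = Seq((1, "a"), (2, "b")).toDF("id", "value")

    // DataFrameWriterV2: with this follow-up the listener sees the v2 command
    // names again, e.g. "create" then "append". `demo_cat` is a placeholder;
    // a v2 catalog must be configured under that name for writeTo to resolve.
    df.writeTo("demo_cat.ns.t").create()
    df.writeTo("demo_cat.ns.t").append()

    // DataFrameWriter (v1): still reported as "command" in Spark 3.2.
    df.write.mode("overwrite").parquet("/tmp/v1_out")

    spark.stop()
  }
}
```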
### Why are the changes needed?
Fewer breaking changes.
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
N/A
Closes #32919 from cloud-fan/follow.
Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
docs/sql-migration-guide.md | 2 +-
.../scala/org/apache/spark/sql/DataFrameWriter.scala | 3 +--
.../scala/org/apache/spark/sql/DataFrameWriterV2.scala | 4 +---
.../apache/spark/sql/execution/QueryExecution.scala | 18 ++++++++++++++----
.../connector/WriteDistributionAndOrderingSuite.scala | 5 ++---
5 files changed, 19 insertions(+), 13 deletions(-)
diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md
index ce655af..dd654cd 100644
--- a/docs/sql-migration-guide.md
+++ b/docs/sql-migration-guide.md
@@ -95,7 +95,7 @@ license: |
- In Spark 3.2, `FloatType` is mapped to `FLOAT` in MySQL. Prior to this, it used to be mapped to `REAL`, which is by default a synonym to `DOUBLE PRECISION` in MySQL.
- - In Spark 3.2, the query executions triggered by `DataFrameWriter` are always named `command` when being sent to `QueryExecutionListener`. In Spark 3.1 and earlier, the name is one of `save`, `insertInto`, `saveAsTable`, `create`, `append`, `overwrite`, `overwritePartitions`, `replace`.
+ - In Spark 3.2, the query executions triggered by `DataFrameWriter` are always named `command` when being sent to `QueryExecutionListener`. In Spark 3.1 and earlier, the name is one of `save`, `insertInto`, `saveAsTable`.
## Upgrading from Spark SQL 3.0 to 3.1
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 5b68493a..47eb199 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -857,8 +857,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
*/
private def runCommand(session: SparkSession)(command: LogicalPlan): Unit = {
val qe = session.sessionState.executePlan(command)
- // call `QueryExecution.commandExecuted` to trigger the execution of commands.
- qe.commandExecuted
+ qe.assertCommandExecuted()
}
private def lookupV2Provider(): Option[TableProvider] = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala
index 7b13105..3931d1f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala
@@ -25,7 +25,6 @@ import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableExceptio
import org.apache.spark.sql.catalyst.expressions.{Attribute, Bucket, Days, Hours, Literal, Months, Years}
import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelectStatement, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelectStatement}
import org.apache.spark.sql.connector.expressions.{LogicalExpressions, NamedReference, Transform}
-import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.types.IntegerType
/**
@@ -191,8 +190,7 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T])
*/
private def runCommand(command: LogicalPlan): Unit = {
val qe = sparkSession.sessionState.executePlan(command)
- // call `QueryExecution.toRDD` to trigger the execution of commands.
- SQLExecution.withNewExecutionId(qe, Some("command"))(qe.toRdd)
+ qe.assertCommandExecuted()
}
private def internalReplace(orCreate: Boolean): Unit = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index a794a47..aaa87bd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.{InternalRow, QueryPlanningTracker}
import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
import org.apache.spark.sql.catalyst.expressions.codegen.ByteCodeStats
import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, ReturnAnswer}
+import org.apache.spark.sql.catalyst.plans.logical.{AppendData, Command, CreateTableAsSelect, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, ReturnAnswer}
import org.apache.spark.sql.catalyst.rules.{PlanChangeLogger, Rule}
import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat
import org.apache.spark.sql.catalyst.util.truncatedString
@@ -81,11 +81,21 @@ class QueryExecution(
case CommandExecutionMode.SKIP => analyzed
}
+ private def commandExecutionName(command: Command): String = command match {
+ case _: CreateTableAsSelect => "create"
+ case _: ReplaceTableAsSelect => "replace"
+ case _: AppendData => "append"
+ case _: OverwriteByExpression => "overwrite"
+ case _: OverwritePartitionsDynamic => "overwritePartitions"
+ case _ => "command"
+ }
+
private def eagerlyExecuteCommands(p: LogicalPlan) = p transformDown {
case c: Command =>
val qe = sparkSession.sessionState.executePlan(c, CommandExecutionMode.NON_ROOT)
- val result =
- SQLExecution.withNewExecutionId(qe, Some("command"))(qe.executedPlan.executeCollect())
+ val result = SQLExecution.withNewExecutionId(qe, Some(commandExecutionName(c))) {
+ qe.executedPlan.executeCollect()
+ }
CommandResult(
qe.analyzed.output,
qe.commandExecuted,
@@ -102,7 +112,7 @@ class QueryExecution(
sparkSession.sharedState.cacheManager.useCachedData(commandExecuted.clone())
}
- private def assertCommandExecuted(): Unit = commandExecuted
+ def assertCommandExecuted(): Unit = commandExecuted
lazy val optimizedPlan: LogicalPlan = {
// We need to materialize the commandExecuted here because optimizedPlan is also tracked under
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala
index 945a35a..db4a9c1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryTableCatalog}
import org.apache.spark.sql.connector.distributions.{Distribution, Distributions}
import org.apache.spark.sql.connector.expressions.{Expression, FieldReference, NullOrdering, SortDirection, SortOrder}
import org.apache.spark.sql.connector.expressions.LogicalExpressions._
-import org.apache.spark.sql.execution.{CommandResultExec, QueryExecution, SortExec, SparkPlan}
+import org.apache.spark.sql.execution.{QueryExecution, SortExec, SparkPlan}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec
import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
@@ -778,8 +778,7 @@ class WriteDistributionAndOrderingSuite
sparkContext.listenerBus.waitUntilEmpty()
- assert(executedPlan.isInstanceOf[CommandResultExec])
- executedPlan.asInstanceOf[CommandResultExec].commandPhysicalPlan match {
+ executedPlan match {
case w: V2TableWriteExec =>
stripAQEPlan(w.query)
case _ =>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]