This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new cfdbfb7349a [SPARK-41726][SQL] Remove `OptimizedCreateHiveTableAsSelectCommand`
cfdbfb7349a is described below
commit cfdbfb7349a6c7765b0172c23f133d39196354b0
Author: ulysses-you <[email protected]>
AuthorDate: Thu Dec 29 17:02:00 2022 -0800
[SPARK-41726][SQL] Remove `OptimizedCreateHiveTableAsSelectCommand`
### What changes were proposed in this pull request?
This PR removes `OptimizedCreateHiveTableAsSelectCommand` and moves the code that rewrites
`InsertIntoHiveTable` to `InsertIntoHadoopFsRelationCommand` into the `RelationConversions` rule.
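For context, a minimal sketch (not part of this patch; the session setup, app name and table name are illustrative) of the kind of CTAS statement whose nested write is affected, with the relevant conversion configs enabled:

```scala
// Sketch only: a Hive CTAS whose nested write is now converted by
// `RelationConversions` instead of `OptimizedCreateHiveTableAsSelectCommand`.
// Assumes a Hive-enabled Spark build; names below are made up for the example.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("ctas-conversion-sketch")
  .enableHiveSupport()
  .getOrCreate()

// These keys correspond to HiveUtils.CONVERT_METASTORE_CTAS and
// HiveUtils.CONVERT_METASTORE_PARQUET. With both enabled, the nested
// `InsertIntoHiveTable` produced by the CTAS is planned as
// `InsertIntoHadoopFsRelationCommand`.
spark.conf.set("spark.sql.hive.convertMetastoreCtas", "true")
spark.conf.set("spark.sql.hive.convertMetastoreParquet", "true")

spark.sql("CREATE TABLE ctas_demo STORED AS PARQUET AS SELECT id FROM range(10)")
```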
### Why are the changes needed?
CTAS uses a nested execution to do the data writing, so `OptimizedCreateHiveTableAsSelectCommand`
is unnecessary. The nested `InsertIntoHiveTable` is converted to
`InsertIntoHadoopFsRelationCommand` when possible.
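The updated test in `SQLQuerySuite` (see the diff below) verifies this by listening for SQL-execution start events; a condensed sketch of the same idea against public APIs, assuming a Hive-enabled `SparkSession` named `spark` with CTAS conversion enabled:

```scala
// Sketch only (not part of the patch): observing the nested execution that the
// description above refers to. The outer CTAS and the nested write each post a
// separate SparkListenerSQLExecutionStart event.
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart

var planNodeNames = Seq.empty[String]
val listener = new SparkListener {
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case start: SparkListenerSQLExecutionStart =>
      planNodeNames = planNodeNames :+ start.sparkPlanInfo.nodeName
    case _ => // ignore other events
  }
}

spark.sparkContext.addSparkListener(listener)
spark.sql("CREATE TABLE ctas_demo2 STORED AS PARQUET AS SELECT id FROM range(3)")
// Events arrive asynchronously; once they drain, `planNodeNames` should contain
// "Execute CreateHiveTableAsSelectCommand" for the outer command and, with CTAS
// conversion enabled, "Execute InsertIntoHadoopFsRelationCommand" for the nested
// write (otherwise "Execute InsertIntoHiveTable").
spark.sparkContext.removeSparkListener(listener)
```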
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
fixed the existing tests
Closes #39263 from ulysses-you/SPARK-41726.
Authored-by: ulysses-you <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../org/apache/spark/sql/hive/HiveStrategies.scala | 32 +++++-
.../execution/CreateHiveTableAsSelectCommand.scala | 114 ++++-----------------
.../sql/hive/execution/HiveExplainSuite.scala | 24 -----
.../spark/sql/hive/execution/SQLQuerySuite.scala | 98 ++++++++++--------
4 files changed, 104 insertions(+), 164 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 42bf1e31bb0..af727f966e5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -28,9 +28,10 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoStatement, LogicalPlan, ScriptTransformation, Statistics}
import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils, InsertIntoDataSourceDirCommand}
-import org.apache.spark.sql.execution.datasources.{CreateTable, DataSourceStrategy}
+import org.apache.spark.sql.execution.datasources.{CreateTable, DataSourceStrategy, HadoopFsRelation, InsertIntoHadoopFsRelationCommand, LogicalRelation}
import org.apache.spark.sql.hive.execution._
import org.apache.spark.sql.hive.execution.HiveScriptTransformationExec
import org.apache.spark.sql.internal.HiveSerDe
@@ -232,15 +233,36 @@ case class RelationConversions(
if DDLUtils.isHiveTable(relation.tableMeta) && isConvertible(relation) =>
metastoreCatalog.convert(relation, isWrite = false)
- // CTAS
- case CreateTable(tableDesc, mode, Some(query))
+ // CTAS path
+ // This `InsertIntoHiveTable` is derived from `CreateHiveTableAsSelectCommand`,
+ // that only matches table insertion inside Hive CTAS.
+ // This pattern would not cause conflicts because this rule is always applied before
+ // `HiveAnalysis` and both of these rules are running once.
+ case InsertIntoHiveTable(tableDesc, _, query, overwrite, ifPartitionNotExists, _)
if query.resolved && DDLUtils.isHiveTable(tableDesc) &&
tableDesc.partitionColumnNames.isEmpty && isConvertible(tableDesc) &&
conf.getConf(HiveUtils.CONVERT_METASTORE_CTAS) =>
// validation is required to be done here before relation conversion.
DDLUtils.checkTableColumns(tableDesc.copy(schema = query.schema))
- OptimizedCreateHiveTableAsSelectCommand(
- tableDesc, query, query.output.map(_.name), mode)
+ val hiveTable = DDLUtils.readHiveTable(tableDesc)
+ val hadoopRelation = metastoreCatalog.convert(hiveTable, isWrite = true) match {
+ case LogicalRelation(t: HadoopFsRelation, _, _, _) => t
+ case _ => throw QueryCompilationErrors.tableIdentifierNotConvertedToHadoopFsRelationError(
+ tableDesc.identifier)
+ }
+ InsertIntoHadoopFsRelationCommand(
+ hadoopRelation.location.rootPaths.head,
+ Map.empty, // We don't support to convert partitioned table.
+ ifPartitionNotExists,
+ Seq.empty, // We don't support to convert partitioned table.
+ hadoopRelation.bucketSpec,
+ hadoopRelation.fileFormat,
+ hadoopRelation.options,
+ query,
+ if (overwrite) SaveMode.Overwrite else SaveMode.Append,
+ Some(tableDesc),
+ Some(hadoopRelation.location),
+ query.output.map(_.name))
// INSERT HIVE DIR
case InsertIntoDir(_, storage, provider, query, overwrite)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
index 4dfb2cf65eb..a6d85b3f8b3 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
@@ -24,17 +24,21 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SessionCatalog}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.errors.QueryCompilationErrors
-import org.apache.spark.sql.execution.command.{DataWritingCommand, DDLUtils, LeafRunnableCommand}
-import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InsertIntoHadoopFsRelationCommand, LogicalRelation}
-import org.apache.spark.sql.hive.HiveSessionCatalog
-import org.apache.spark.util.Utils
-
-trait CreateHiveTableAsSelectBase extends LeafRunnableCommand {
- val tableDesc: CatalogTable
- val query: LogicalPlan
- val outputColumnNames: Seq[String]
- val mode: SaveMode
+import org.apache.spark.sql.execution.command.{DataWritingCommand, LeafRunnableCommand}
+/**
+ * Create table and insert the query result into it.
+ *
+ * @param tableDesc the table description, which may contain serde, storage handler etc.
+ * @param query the query whose result will be insert into the new relation
+ * @param mode SaveMode
+ */
+case class CreateHiveTableAsSelectCommand(
+ tableDesc: CatalogTable,
+ query: LogicalPlan,
+ outputColumnNames: Seq[String],
+ mode: SaveMode)
+ extends LeafRunnableCommand {
assert(query.resolved)
override def innerChildren: Seq[LogicalPlan] = query :: Nil
@@ -60,9 +64,9 @@ trait CreateHiveTableAsSelectBase extends LeafRunnableCommand {
val qe = sparkSession.sessionState.executePlan(command)
qe.assertCommandExecuted()
} else {
- tableDesc.storage.locationUri.foreach { p =>
- DataWritingCommand.assertEmptyRootPath(p, mode, sparkSession.sessionState.newHadoopConf)
- }
+ tableDesc.storage.locationUri.foreach { p =>
+ DataWritingCommand.assertEmptyRootPath(p, mode, sparkSession.sessionState.newHadoopConf)
+ }
// TODO ideally, we should get the output data ready first and then
// add the relation into catalog, just in case of failure occurs while data
// processing.
@@ -90,38 +94,7 @@ trait CreateHiveTableAsSelectBase extends LeafRunnableCommand {
Seq.empty[Row]
}
- // Returns `DataWritingCommand` which actually writes data into the table.
- def getWritingCommand(
- catalog: SessionCatalog,
- tableDesc: CatalogTable,
- tableExists: Boolean): DataWritingCommand
-
- // A subclass should override this with the Class name of the concrete type expected to be
- // returned from `getWritingCommand`.
- def writingCommandClassName: String
-
- override def argString(maxFields: Int): String = {
- s"[Database: ${tableDesc.database}, " +
- s"TableName: ${tableDesc.identifier.table}, " +
- s"${writingCommandClassName}]"
- }
-}
-
-/**
- * Create table and insert the query result into it.
- *
- * @param tableDesc the table description, which may contain serde, storage handler etc.
- * @param query the query whose result will be insert into the new relation
- * @param mode SaveMode
- */
-case class CreateHiveTableAsSelectCommand(
- tableDesc: CatalogTable,
- query: LogicalPlan,
- outputColumnNames: Seq[String],
- mode: SaveMode)
- extends CreateHiveTableAsSelectBase {
-
- override def getWritingCommand(
+ private def getWritingCommand(
catalog: SessionCatalog,
tableDesc: CatalogTable,
tableExists: Boolean): DataWritingCommand = {
@@ -136,53 +109,8 @@ case class CreateHiveTableAsSelectCommand(
outputColumnNames = outputColumnNames)
}
- override def writingCommandClassName: String =
- Utils.getSimpleName(classOf[InsertIntoHiveTable])
-}
-
-/**
- * Create table and insert the query result into it. This creates Hive table but inserts
- * the query result into it by using data source.
- *
- * @param tableDesc the table description, which may contain serde, storage handler etc.
- * @param query the query whose result will be insert into the new relation
- * @param mode SaveMode
- */
-case class OptimizedCreateHiveTableAsSelectCommand(
- tableDesc: CatalogTable,
- query: LogicalPlan,
- outputColumnNames: Seq[String],
- mode: SaveMode)
- extends CreateHiveTableAsSelectBase {
-
- override def getWritingCommand(
- catalog: SessionCatalog,
- tableDesc: CatalogTable,
- tableExists: Boolean): DataWritingCommand = {
- val metastoreCatalog = catalog.asInstanceOf[HiveSessionCatalog].metastoreCatalog
- val hiveTable = DDLUtils.readHiveTable(tableDesc)
-
- val hadoopRelation = metastoreCatalog.convert(hiveTable, isWrite = true) match {
- case LogicalRelation(t: HadoopFsRelation, _, _, _) => t
- case _ => throw QueryCompilationErrors.tableIdentifierNotConvertedToHadoopFsRelationError(
- tableIdentifier)
- }
-
- InsertIntoHadoopFsRelationCommand(
- hadoopRelation.location.rootPaths.head,
- Map.empty, // We don't support to convert partitioned table.
- false,
- Seq.empty, // We don't support to convert partitioned table.
- hadoopRelation.bucketSpec,
- hadoopRelation.fileFormat,
- hadoopRelation.options,
- query,
- if (tableExists) mode else SaveMode.Overwrite,
- Some(tableDesc),
- Some(hadoopRelation.location),
- query.output.map(_.name))
+ override def argString(maxFields: Int): String = {
+ s"[Database: ${tableDesc.database}, " +
+ s"TableName: ${tableDesc.identifier.table}]"
}
-
- override def writingCommandClassName: String =
- Utils.getSimpleName(classOf[InsertIntoHadoopFsRelationCommand])
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
index 258b101dd21..08ebcf3e4dc 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
@@ -23,13 +23,10 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecution
-import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand
-import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.tags.SlowHiveTest
-import org.apache.spark.util.Utils
/**
* A set of tests that validates support for Hive Explain command.
@@ -185,27 +182,6 @@ class HiveExplainSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
}
}
- test("SPARK-26661: Show actual class name of the writing command in CTAS
explain") {
- Seq(true, false).foreach { convertCTAS =>
- withSQLConf(
- HiveUtils.CONVERT_METASTORE_CTAS.key -> convertCTAS.toString,
- HiveUtils.CONVERT_METASTORE_PARQUET.key -> convertCTAS.toString) {
-
- val df = sql(s"EXPLAIN CREATE TABLE tab1 STORED AS PARQUET AS SELECT *
FROM range(2)")
- val keywords = if (convertCTAS) {
- Seq(
- s"Execute
${Utils.getSimpleName(classOf[OptimizedCreateHiveTableAsSelectCommand])}",
- Utils.getSimpleName(classOf[InsertIntoHadoopFsRelationCommand]))
- } else {
- Seq(
- s"Execute
${Utils.getSimpleName(classOf[CreateHiveTableAsSelectCommand])}",
- Utils.getSimpleName(classOf[InsertIntoHiveTable]))
- }
- checkKeywordsExist(df, keywords: _*)
- }
- }
- }
-
test("SPARK-28595: explain should not trigger partition listing") {
Seq(true, false).foreach { legacyBucketedScan =>
withSQLConf(
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 7976dab3c44..a902cb3a69e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -27,6 +27,7 @@ import com.google.common.io.Files
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.{SparkException, TestUtils}
+import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
@@ -34,10 +35,11 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, CatalogUtils, Hi
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
-import org.apache.spark.sql.execution.TestUncaughtExceptionHandler
+import org.apache.spark.sql.execution.{SparkPlanInfo, TestUncaughtExceptionHandler}
import org.apache.spark.sql.execution.adaptive.{DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite}
import org.apache.spark.sql.execution.command.{InsertIntoDataSourceDirCommand, LoadDataCommand}
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart
import org.apache.spark.sql.functions._
import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils}
import org.apache.spark.sql.hive.test.{HiveTestJars, TestHiveSingleton}
@@ -2296,47 +2298,6 @@ abstract class SQLQuerySuiteBase extends QueryTest with SQLTestUtils with TestHi
}
}
- test("SPARK-25271: Hive ctas commands should use data source if it is
convertible") {
- withTempView("p") {
- Seq(1, 2, 3).toDF("id").createOrReplaceTempView("p")
-
- Seq("orc", "parquet").foreach { format =>
- Seq(true, false).foreach { isConverted =>
- withSQLConf(
- HiveUtils.CONVERT_METASTORE_ORC.key -> s"$isConverted",
- HiveUtils.CONVERT_METASTORE_PARQUET.key -> s"$isConverted") {
- Seq(true, false).foreach { isConvertedCtas =>
- withSQLConf(HiveUtils.CONVERT_METASTORE_CTAS.key -> s"$isConvertedCtas") {
-
- val targetTable = "targetTable"
- withTable(targetTable) {
- val df = sql(s"CREATE TABLE $targetTable STORED AS $format
AS SELECT id FROM p")
- checkAnswer(sql(s"SELECT id FROM $targetTable"),
- Row(1) :: Row(2) :: Row(3) :: Nil)
-
- val ctasDSCommand = df.queryExecution.analyzed.collect {
- case _: OptimizedCreateHiveTableAsSelectCommand => true
- }.headOption
- val ctasCommand = df.queryExecution.analyzed.collect {
- case _: CreateHiveTableAsSelectCommand => true
- }.headOption
-
- if (isConverted && isConvertedCtas) {
- assert(ctasDSCommand.nonEmpty)
- assert(ctasCommand.isEmpty)
- } else {
- assert(ctasDSCommand.isEmpty)
- assert(ctasCommand.nonEmpty)
- }
- }
- }
- }
- }
- }
- }
- }
- }
-
test("SPARK-26181 hasMinMaxStats method of ColumnStatsMap is not correct") {
withSQLConf(SQLConf.CBO_ENABLED.key -> "true") {
withTable("all_null") {
@@ -2682,10 +2643,63 @@ abstract class SQLQuerySuiteBase extends QueryTest with SQLTestUtils with TestHi
@SlowHiveTest
class SQLQuerySuite extends SQLQuerySuiteBase with DisableAdaptiveExecutionSuite {
+ import spark.implicits._
+
test("SPARK-36421: Validate all SQL configs to prevent from wrong use for
ConfigEntry") {
val df = spark.sql("set -v").select("Meaning")
assert(df.collect().forall(!_.getString(0).contains("ConfigEntry")))
}
+
+ test("SPARK-25271: Hive ctas commands should use data source if it is
convertible") {
+ withTempView("p") {
+ Seq(1, 2, 3).toDF("id").createOrReplaceTempView("p")
+
+ Seq("orc", "parquet").foreach { format =>
+ Seq(true, false).foreach { isConverted =>
+ withSQLConf(
+ HiveUtils.CONVERT_METASTORE_ORC.key -> s"$isConverted",
+ HiveUtils.CONVERT_METASTORE_PARQUET.key -> s"$isConverted") {
+ Seq(true, false).foreach { isConvertedCtas =>
+ withSQLConf(HiveUtils.CONVERT_METASTORE_CTAS.key -> s"$isConvertedCtas") {
+
+ val targetTable = "targetTable"
+ withTable(targetTable) {
+ var commands: Seq[SparkPlanInfo] = Seq.empty
+ val listener = new SparkListener {
+ override def onOtherEvent(event: SparkListenerEvent): Unit = {
+ event match {
+ case start: SparkListenerSQLExecutionStart =>
+ commands = commands ++ Seq(start.sparkPlanInfo)
+ case _ => // ignore other events
+ }
+ }
+ }
+ spark.sparkContext.addSparkListener(listener)
+ try {
+ sql(s"CREATE TABLE $targetTable STORED AS $format AS
SELECT id FROM p")
+ checkAnswer(sql(s"SELECT id FROM $targetTable"),
+ Row(1) :: Row(2) :: Row(3) :: Nil)
+ spark.sparkContext.listenerBus.waitUntilEmpty()
+ assert(commands.size == 3)
+ assert(commands.head.nodeName == "Execute
CreateHiveTableAsSelectCommand")
+
+ val v1WriteCommand = commands(1)
+ if (isConverted && isConvertedCtas) {
+ assert(v1WriteCommand.nodeName == "Execute
InsertIntoHadoopFsRelationCommand")
+ } else {
+ assert(v1WriteCommand.nodeName == "Execute
InsertIntoHiveTable")
+ }
+ } finally {
+ spark.sparkContext.removeSparkListener(listener)
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
}
@SlowHiveTest
class SQLQuerySuiteAE extends SQLQuerySuiteBase with EnableAdaptiveExecutionSuite
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]