This is an automated email from the ASF dual-hosted git repository.

akudinkin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 8de53571e0 [HUDI-5346][HUDI-5320] Fixing Create Table as Select (CTAS) 
performance gaps (#7370)
8de53571e0 is described below

commit 8de53571e0e1e7ea63c7ecf7b646da64eceaef8c
Author: Alexey Kudinkin <[email protected]>
AuthorDate: Thu Dec 8 18:52:47 2022 -0800

    [HUDI-5346][HUDI-5320] Fixing Create Table as Select (CTAS) performance 
gaps (#7370)
    
    This PR is addressing some of the performance traps detected while 
stress-testing Spark SQL's Create Table as Select command:
    
    Avoids reordering of the columns within CTAS (there's no need for it,
since InsertIntoHoodieTableCommand will be resolving the columns anyway)
    Fixes the validation sequence within InsertIntoHoodieTableCommand to first
resolve the columns and then run validation (previously it was done the other
way around)
    Propagates properties specified in CTAS to the HoodieSparkSqlWriter (for
example, previously there was no way to disable the metadata table (MT) when
using CTAS, precisely because these properties were not propagated)
    Additionally, the following improvements to HoodieDatasetBulkInsertHelper
were made:
    
    Now, if meta-fields are disabled, we no longer dereference the incoming
Dataset into an RDD and instead simply add stubbed-out meta-fields through an
additional Projection
---
 .../hudi/HoodieDatasetBulkInsertHelper.scala       | 68 ++++++++++---------
 .../org/apache/spark/sql/HoodieUnsafeUtils.scala   | 12 +++-
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |  7 +-
 .../command/CreateHoodieTableAsSelectCommand.scala | 76 +++++++++-------------
 .../command/InsertIntoHoodieTableCommand.scala     |  3 +-
 .../spark/sql/hudi/HoodieSparkSqlTestBase.scala    |  4 +-
 .../apache/spark/sql/hudi/TestInsertTable.scala    | 22 ++++---
 .../spark/sql/HoodieSpark2CatalystPlanUtils.scala  | 11 +++-
 8 files changed, 106 insertions(+), 97 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
index 296abaf4f5..79fa67acdb 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
@@ -33,7 +33,9 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.HoodieUnsafeRowUtils.{composeNestedFieldPath, 
getNestedInternalRowValue}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.plans.logical.Project
+import org.apache.spark.sql.types.{DataType, StringType, StructField, 
StructType}
 import org.apache.spark.sql.{DataFrame, Dataset, HoodieUnsafeUtils, Row}
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -58,31 +60,6 @@ object HoodieDatasetBulkInsertHelper extends Logging {
     val populateMetaFields = config.populateMetaFields()
     val schema = df.schema
 
-    val keyGeneratorClassName = 
config.getStringOrThrow(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME,
-      "Key-generator class name is required")
-
-    val prependedRdd: RDD[InternalRow] =
-      df.queryExecution.toRdd.mapPartitions { iter =>
-        val keyGenerator =
-          ReflectionUtils.loadClass(keyGeneratorClassName, new 
TypedProperties(config.getProps))
-            .asInstanceOf[SparkKeyGeneratorInterface]
-
-        iter.map { row =>
-          val (recordKey, partitionPath) =
-            if (populateMetaFields) {
-              (keyGenerator.getRecordKey(row, schema), 
keyGenerator.getPartitionPath(row, schema))
-            } else {
-              (UTF8String.EMPTY_UTF8, UTF8String.EMPTY_UTF8)
-            }
-          val commitTimestamp = UTF8String.EMPTY_UTF8
-          val commitSeqNo = UTF8String.EMPTY_UTF8
-          val filename = UTF8String.EMPTY_UTF8
-
-          // TODO use mutable row, avoid re-allocating
-          new HoodieInternalRow(commitTimestamp, commitSeqNo, recordKey, 
partitionPath, filename, row, false)
-        }
-      }
-
     val metaFields = Seq(
       StructField(HoodieRecord.COMMIT_TIME_METADATA_FIELD, StringType),
       StructField(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, StringType),
@@ -92,11 +69,44 @@ object HoodieDatasetBulkInsertHelper extends Logging {
 
     val updatedSchema = StructType(metaFields ++ schema.fields)
 
-    val updatedDF = if (populateMetaFields && 
config.shouldCombineBeforeInsert) {
-      val dedupedRdd = dedupeRows(prependedRdd, updatedSchema, 
config.getPreCombineField, SparkHoodieIndexFactory.isGlobalIndex(config))
+    val updatedDF = if (populateMetaFields) {
+      val keyGeneratorClassName = 
config.getStringOrThrow(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME,
+        "Key-generator class name is required")
+
+      val prependedRdd: RDD[InternalRow] =
+        df.queryExecution.toRdd.mapPartitions { iter =>
+          val keyGenerator =
+            ReflectionUtils.loadClass(keyGeneratorClassName, new 
TypedProperties(config.getProps))
+              .asInstanceOf[SparkKeyGeneratorInterface]
+
+          iter.map { row =>
+            val recordKey = keyGenerator.getRecordKey(row, schema)
+            val partitionPath = keyGenerator.getPartitionPath(row, schema)
+            val commitTimestamp = UTF8String.EMPTY_UTF8
+            val commitSeqNo = UTF8String.EMPTY_UTF8
+            val filename = UTF8String.EMPTY_UTF8
+
+            // TODO use mutable row, avoid re-allocating
+            new HoodieInternalRow(commitTimestamp, commitSeqNo, recordKey, 
partitionPath, filename, row, false)
+          }
+        }
+
+      val dedupedRdd = if (config.shouldCombineBeforeInsert) {
+        dedupeRows(prependedRdd, updatedSchema, config.getPreCombineField, 
SparkHoodieIndexFactory.isGlobalIndex(config))
+      } else {
+        prependedRdd
+      }
+
       HoodieUnsafeUtils.createDataFrameFromRDD(df.sparkSession, dedupedRdd, 
updatedSchema)
     } else {
-      HoodieUnsafeUtils.createDataFrameFromRDD(df.sparkSession, prependedRdd, 
updatedSchema)
+      // NOTE: In cases when we're not populating meta-fields we actually don't
+      //       need access to the [[InternalRow]] and therefore can avoid the 
need
+      //       to dereference [[DataFrame]] into [[RDD]]
+      val query = df.queryExecution.logical
+      val metaFieldsStubs = metaFields.map(f => 
Alias(Literal(UTF8String.EMPTY_UTF8, dataType = StringType), f.name)())
+      val prependedQuery = Project(metaFieldsStubs ++ query.output, query)
+
+      HoodieUnsafeUtils.createDataFrameFrom(df.sparkSession, prependedQuery)
     }
 
     val trimmedDF = if (shouldDropPartitionColumns) {
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieUnsafeUtils.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieUnsafeUtils.scala
index edf05f2db2..c981cd8113 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieUnsafeUtils.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieUnsafeUtils.scala
@@ -30,6 +30,15 @@ import org.apache.spark.util.MutablePair
  */
 object HoodieUnsafeUtils {
 
+  /**
+   * Creates [[DataFrame]] from provided [[plan]]
+   *
+   * @param spark spark's session
+   * @param plan given plan to wrap into [[DataFrame]]
+   */
+  def createDataFrameFrom(spark: SparkSession, plan: LogicalPlan): DataFrame =
+    Dataset.ofRows(spark, plan)
+
   /**
    * Creates [[DataFrame]] from the in-memory [[Seq]] of [[Row]]s with 
provided [[schema]]
    *
@@ -39,7 +48,6 @@ object HoodieUnsafeUtils {
    * @param spark spark's session
    * @param rows collection of rows to base [[DataFrame]] on
    * @param schema target [[DataFrame]]'s schema
-   * @return
    */
   def createDataFrameFromRows(spark: SparkSession, rows: Seq[Row], schema: 
StructType): DataFrame =
     Dataset.ofRows(spark, LocalRelation.fromExternalRows(schema.toAttributes, 
rows))
@@ -53,7 +61,6 @@ object HoodieUnsafeUtils {
    * @param spark spark's session
    * @param rows collection of rows to base [[DataFrame]] on
    * @param schema target [[DataFrame]]'s schema
-   * @return
    */
   def createDataFrameFromInternalRows(spark: SparkSession, rows: 
Seq[InternalRow], schema: StructType): DataFrame =
     Dataset.ofRows(spark, LocalRelation(schema.toAttributes, rows))
@@ -65,7 +72,6 @@ object HoodieUnsafeUtils {
    * @param spark spark's session
    * @param rdd RDD w/ [[Row]]s to base [[DataFrame]] on
    * @param schema target [[DataFrame]]'s schema
-   * @return
    */
   def createDataFrameFromRDD(spark: SparkSession, rdd: RDD[InternalRow], 
schema: StructType): DataFrame =
     spark.internalCreateDataFrame(rdd, schema)
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 97c2c805cc..faa6fada13 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -66,7 +66,6 @@ import scala.collection.JavaConversions._
 import scala.collection.JavaConverters.setAsJavaSetConverter
 import scala.collection.mutable
 import scala.collection.mutable.ListBuffer
-import scala.util.matching.Regex
 
 object HoodieSparkSqlWriter {
 
@@ -121,6 +120,7 @@ object HoodieSparkSqlWriter {
     }
     val tableType = HoodieTableType.valueOf(hoodieConfig.getString(TABLE_TYPE))
     var operation = 
WriteOperationType.fromValue(hoodieConfig.getString(OPERATION))
+    // TODO clean up
     // It does not make sense to allow upsert() operation if INSERT_DROP_DUPS 
is true
     // Auto-correct the operation to "insert" if OPERATION is set to "upsert" 
wrongly
     // or not set (in which case it will be set as "upsert" by 
parametersWithWriteDefaults()) .
@@ -749,8 +749,7 @@ object HoodieSparkSqlWriter {
       val userDefinedBulkInsertPartitionerOpt = 
DataSourceUtils.createUserDefinedBulkInsertPartitionerWithRows(writeConfig)
       if (userDefinedBulkInsertPartitionerOpt.isPresent) {
         userDefinedBulkInsertPartitionerOpt.get
-      }
-      else {
+      } else {
         
BulkInsertInternalPartitionerWithRowsFactory.get(writeConfig.getBulkInsertSortMode)
       }
     } else {
@@ -842,7 +841,7 @@ object HoodieSparkSqlWriter {
       properties.put(HoodieSyncConfig.META_SYNC_SPARK_VERSION.key, 
SPARK_VERSION)
       
properties.put(HoodieSyncConfig.META_SYNC_USE_FILE_LISTING_FROM_METADATA.key, 
hoodieConfig.getBoolean(HoodieMetadataConfig.ENABLE))
 
-      //Collect exceptions in list because we want all sync to run. Then we 
can throw
+      // Collect exceptions in list because we want all sync to run. Then we 
can throw
       val metaSyncExceptions = new ListBuffer[HoodieException]()
       syncClientToolClassSet.foreach(impl => {
         try {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala
index 6a93f0c7af..1f8d009530 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala
@@ -19,16 +19,15 @@ package org.apache.spark.sql.hudi.command
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
-
 import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.common.util.ValidationUtils.checkState
 import org.apache.hudi.hive.HiveSyncConfigHolder
 import org.apache.hudi.sql.InsertMode
 import org.apache.hudi.sync.common.util.ConfigUtils
-
 import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable.needFilterProps
-import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, 
CatalogTable, CatalogTableType, HoodieCatalogTable}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, 
HoodieCatalogTable}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession}
 
 import scala.collection.JavaConverters._
@@ -43,8 +42,8 @@ case class CreateHoodieTableAsSelectCommand(
   override def innerChildren: Seq[QueryPlan[_]] = Seq(query)
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
-    assert(table.tableType != CatalogTableType.VIEW)
-    assert(table.provider.isDefined)
+    checkState(table.tableType != CatalogTableType.VIEW)
+    checkState(table.provider.isDefined)
 
     val hasQueryAsProp = (table.storage.properties ++ 
table.properties).contains(ConfigUtils.IS_QUERY_AS_RO_TABLE)
     if (hasQueryAsProp) {
@@ -53,11 +52,11 @@ case class CreateHoodieTableAsSelectCommand(
 
     val sessionState = sparkSession.sessionState
     val db = 
table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase)
-    val tableIdentWithDB = table.identifier.copy(database = Some(db))
-    val tableName = tableIdentWithDB.unquotedString
+    val qualifiedTableIdentifier = table.identifier.copy(database = Some(db))
+    val tableName = qualifiedTableIdentifier.unquotedString
 
-    if (sessionState.catalog.tableExists(tableIdentWithDB)) {
-      assert(mode != SaveMode.Overwrite,
+    if (sessionState.catalog.tableExists(qualifiedTableIdentifier)) {
+      checkState(mode != SaveMode.Overwrite,
         s"Expect the table $tableName has been dropped when the save mode is 
Overwrite")
 
       if (mode == SaveMode.ErrorIfExists) {
@@ -72,47 +71,44 @@ case class CreateHoodieTableAsSelectCommand(
       }
     }
 
-    // ReOrder the query which move the partition columns to the last of the 
project list
-    val reOrderedQuery = reOrderPartitionColumn(query, 
table.partitionColumnNames)
     // Remove some properties should not be used
-    val newStorage = new CatalogStorageFormat(
-      table.storage.locationUri,
-      table.storage.inputFormat,
-      table.storage.outputFormat,
-      table.storage.serde,
-      table.storage.compressed,
-      table.storage.properties.--(needFilterProps))
-    val newTable = table.copy(
-      identifier = tableIdentWithDB,
-      storage = newStorage,
-      schema = reOrderedQuery.schema,
-      properties = table.properties.--(needFilterProps)
+    val updatedStorageFormat = table.storage.copy(
+      properties = table.storage.properties -- needFilterProps)
+
+    val updatedTable = table.copy(
+      identifier = qualifiedTableIdentifier,
+      storage = updatedStorageFormat,
+      // TODO need to add meta-fields here
+      schema = query.schema,
+      properties = table.properties -- needFilterProps
     )
 
-    val hoodieCatalogTable = HoodieCatalogTable(sparkSession, newTable)
+    val hoodieCatalogTable = HoodieCatalogTable(sparkSession, updatedTable)
     val tablePath = hoodieCatalogTable.tableLocation
     val hadoopConf = sparkSession.sessionState.newHadoopConf()
 
-    // Execute the insert query
     try {
-      // init hoodie table
+      // Init hoodie table
       hoodieCatalogTable.initHoodieTable()
 
-      val tblProperties = hoodieCatalogTable.catalogProperties
-      val options = Map(
+      val tableProperties = hoodieCatalogTable.catalogProperties
+      // NOTE: Users might be specifying write-configuration (inadvertently) 
as options or table properties
+      //       in CTAS, therefore we need to make sure that these are 
appropriately propagated to the
+      //       write operation
+      val options = tableProperties ++ Map(
         HiveSyncConfigHolder.HIVE_CREATE_MANAGED_TABLE.key -> (table.tableType 
== CatalogTableType.MANAGED).toString,
-        HiveSyncConfigHolder.HIVE_TABLE_SERDE_PROPERTIES.key -> 
ConfigUtils.configToString(tblProperties.asJava),
-        HiveSyncConfigHolder.HIVE_TABLE_PROPERTIES.key -> 
ConfigUtils.configToString(newTable.properties.asJava),
+        HiveSyncConfigHolder.HIVE_TABLE_SERDE_PROPERTIES.key -> 
ConfigUtils.configToString(tableProperties.asJava),
+        HiveSyncConfigHolder.HIVE_TABLE_PROPERTIES.key -> 
ConfigUtils.configToString(updatedTable.properties.asJava),
         DataSourceWriteOptions.SQL_INSERT_MODE.key -> 
InsertMode.NON_STRICT.value(),
         DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key -> "true"
       )
-      val partitionSpec = newTable.partitionColumnNames.map((_, None)).toMap
-      val success = InsertIntoHoodieTableCommand.run(sparkSession, newTable, 
reOrderedQuery, partitionSpec,
+      val partitionSpec = updatedTable.partitionColumnNames.map((_, 
None)).toMap
+      val success = InsertIntoHoodieTableCommand.run(sparkSession, 
updatedTable, query, partitionSpec,
         mode == SaveMode.Overwrite, refreshTable = false, extraOptions = 
options)
       if (success) {
         // If write success, create the table in catalog if it has not synced 
to the
         // catalog by the meta sync.
-        if (!sparkSession.sessionState.catalog.tableExists(tableIdentWithDB)) {
+        if 
(!sparkSession.sessionState.catalog.tableExists(qualifiedTableIdentifier)) {
           // create catalog table for this hoodie table
           CreateHoodieTableCommand.createTableInCatalog(sparkSession, 
hoodieCatalogTable, mode == SaveMode.Ignore)
         }
@@ -132,16 +128,4 @@ case class CreateHoodieTableAsSelectCommand(
     val fs = path.getFileSystem(conf)
     fs.delete(path, true)
   }
-
-  private def reOrderPartitionColumn(query: LogicalPlan,
-    partitionColumns: Seq[String]): LogicalPlan = {
-    if (partitionColumns.isEmpty) {
-      query
-    } else {
-      val nonPartitionAttrs = query.output.filter(p => 
!partitionColumns.contains(p.name))
-      val partitionAttrs = query.output.filter(p => 
partitionColumns.contains(p.name))
-      val reorderAttrs = nonPartitionAttrs ++ partitionAttrs
-      Project(reorderAttrs, query)
-    }
-  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
index 125e802802..0228e5ddcf 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
@@ -132,7 +132,6 @@ object InsertIntoHoodieTableCommand extends Logging with 
ProvidesHoodieConfig wi
     val targetPartitionSchema = catalogTable.partitionSchema
     val staticPartitionValues = filterStaticPartitionValues(partitionsSpec)
 
-    validate(removeMetaFields(query.schema), partitionsSpec, catalogTable)
     // Make sure we strip out meta-fields from the incoming dataset (these 
will have to be discarded anyway)
     val cleanedQuery = stripMetaFields(query)
     // To validate and align properly output of the query, we simply filter 
out partition columns with already
@@ -144,6 +143,8 @@ object InsertIntoHoodieTableCommand extends Logging with 
ProvidesHoodieConfig wi
     //       positionally for example
     val expectedQueryColumns = 
catalogTable.tableSchemaWithoutMetaFields.filterNot(f => 
staticPartitionValues.contains(f.name))
     val coercedQueryOutput = 
coerceQueryOutputColumns(StructType(expectedQueryColumns), cleanedQuery, 
catalogTable, conf)
+    // After potential reshaping validate that the output of the query 
conforms to the table's schema
+    validate(removeMetaFields(coercedQueryOutput.schema), partitionsSpec, 
catalogTable)
 
     val staticPartitionValuesExprs = 
createStaticPartitionValuesExpressions(staticPartitionValues, 
targetPartitionSchema, conf)
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala
index e7848320ff..13800e6c8b 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala
@@ -128,7 +128,7 @@ class HoodieSparkSqlTestBase extends FunSuite with 
BeforeAndAfterAll {
       spark.sql(sql)
     } catch {
       case e: Throwable =>
-        assertResult(errorMsg)(e.getMessage)
+        assertResult(errorMsg.trim)(e.getMessage.trim)
         hasException = true
     }
     assertResult(true)(hasException)
@@ -139,7 +139,7 @@ class HoodieSparkSqlTestBase extends FunSuite with 
BeforeAndAfterAll {
     try {
       spark.sql(sql)
     } catch {
-      case e: Throwable if e.getMessage.contains(errorMsg) => hasException = 
true
+      case e: Throwable if e.getMessage.trim.contains(errorMsg.trim) => 
hasException = true
       case f: Throwable => fail("Exception should contain: " + errorMsg + ", 
error message: " + f.getMessage, f)
     }
     assertResult(true)(hasException)
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
index 2fa6b939ac..4c2bec48a6 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
@@ -529,17 +529,19 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
          | tblproperties (primaryKey = 'id')
          | partitioned by (dt)
        """.stripMargin)
-    checkException(s"insert into $tableName partition(dt = '2021-06-20') 
select 1, 'a1', 10, '2021-06-20'") (
-      "Expected table's schema: " +
-        "[StructField(id,IntegerType,true), StructField(name,StringType,true), 
StructField(price,DoubleType,true), StructField(dt,StringType,true)], " +
-        "query's output (including static partition values): " +
-        "[StructField(1,IntegerType,false), StructField(a1,StringType,false), 
StructField(10,IntegerType,false), StructField(2021-06-20,StringType,false), 
StructField(dt,StringType,true)]"
+    checkExceptionContain(s"insert into $tableName partition(dt = 
'2021-06-20') select 1, 'a1', 10, '2021-06-20'") (
+      """
+        |too many data columns:
+        |Table columns: 'id', 'name', 'price'
+        |Data columns: '1', 'a1', '10', '2021-06-20'
+        |""".stripMargin
     )
-    checkException(s"insert into $tableName select 1, 'a1', 10")(
-      "Expected table's schema: " +
-        "[StructField(id,IntegerType,true), StructField(name,StringType,true), 
StructField(price,DoubleType,true), StructField(dt,StringType,true)], " +
-        "query's output (including static partition values): " +
-        "[StructField(1,IntegerType,false), StructField(a1,StringType,false), 
StructField(10,IntegerType,false)]"
+    checkExceptionContain(s"insert into $tableName select 1, 'a1', 10")(
+      """
+        |not enough data columns:
+        |Table columns: 'id', 'name', 'price', 'dt'
+        |Data columns: '1', 'a1', '10'
+        |""".stripMargin
     )
     spark.sql("set hoodie.sql.bulk.insert.enable = true")
     spark.sql("set hoodie.sql.insert.mode = strict")
diff --git 
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala
 
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala
index 2672e2c4cb..9be7198e6d 100644
--- 
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql
 import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{SimpleAnalyzer, 
UnresolvedRelation}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Like}
+import org.apache.spark.sql.catalyst.optimizer.SimplifyCasts
 import org.apache.spark.sql.catalyst.plans.JoinType
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Join, 
LogicalPlan}
 import 
org.apache.spark.sql.execution.command.{AlterTableRecoverPartitionsCommand, 
ExplainCommand}
@@ -31,8 +32,14 @@ object HoodieSpark2CatalystPlanUtils extends 
HoodieCatalystPlansUtils {
                            expected: Seq[Attribute],
                            query: LogicalPlan,
                            byName: Boolean,
-                           conf: SQLConf): LogicalPlan =
-    SimpleAnalyzer.ResolveOutputRelation.resolveOutputColumns(tableName, 
expected, query, byName)
+                           conf: SQLConf): LogicalPlan = {
+    // NOTE: We have to apply [[ResolveUpCast]] and [[SimplifyCasts]] rules 
since by default Spark 2.x will
+    //       always be wrapping matched attributes into [[UpCast]]s which 
aren't resolvable and render some
+    //       APIs like [[QueryPlan.schema]] unusable
+    SimplifyCasts.apply(
+      SimpleAnalyzer.ResolveUpCast.apply(
+        SimpleAnalyzer.ResolveOutputRelation.resolveOutputColumns(tableName, 
expected, query, byName)))
+  }
 
   def createExplainCommand(plan: LogicalPlan, extended: Boolean): LogicalPlan =
     ExplainCommand(plan, extended = extended)

Reply via email to