xushiyan commented on code in PR #6213:
URL: https://github.com/apache/hudi/pull/6213#discussion_r932713368


##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala:
##########
@@ -66,103 +79,139 @@ object InsertIntoHoodieTableCommand extends Logging with 
ProvidesHoodieConfig {
    * @param extraOptions Extra options for insert.
    */
   def run(sparkSession: SparkSession,
-      table: CatalogTable,
-      query: LogicalPlan,
-      insertPartitions: Map[String, Option[String]],
-      overwrite: Boolean,
-      refreshTable: Boolean = true,
-      extraOptions: Map[String, String] = Map.empty): Boolean = {
-
-    val hoodieCatalogTable = new HoodieCatalogTable(sparkSession, table)
-    val config = buildHoodieInsertConfig(hoodieCatalogTable, sparkSession, 
overwrite, insertPartitions, extraOptions)
-
-    val mode = if (overwrite && hoodieCatalogTable.partitionFields.isEmpty) {
-      // insert overwrite non-partition table
+          table: CatalogTable,
+          query: LogicalPlan,
+          partitionSpec: Map[String, Option[String]],
+          overwrite: Boolean,
+          refreshTable: Boolean = true,
+          extraOptions: Map[String, String] = Map.empty): Boolean = {
+    val catalogTable = new HoodieCatalogTable(sparkSession, table)
+    val config = buildHoodieInsertConfig(catalogTable, sparkSession, 
overwrite, partitionSpec, extraOptions)
+
+    // NOTE: In case of partitioned table we override specified "overwrite" 
parameter
+    //       to instead append to the dataset
+    val mode = if (overwrite && catalogTable.partitionFields.isEmpty) {
       SaveMode.Overwrite
     } else {
-      // for insert into or insert overwrite partition we use append mode.
       SaveMode.Append
     }
-    val conf = sparkSession.sessionState.conf
-    val alignedQuery = alignOutputFields(query, hoodieCatalogTable, 
insertPartitions, conf)
-    // If we create dataframe using the Dataset.ofRows(sparkSession, 
alignedQuery),
-    // The nullable attribute of fields will lost.
-    // In order to pass the nullable attribute to the inputDF, we specify the 
schema
-    // of the rdd.
-    val inputDF = sparkSession.createDataFrame(
-      Dataset.ofRows(sparkSession, alignedQuery).rdd, alignedQuery.schema)
-    val success =
-      HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, config, 
inputDF)._1
-    if (success) {
-      if (refreshTable) {
-        sparkSession.catalog.refreshTable(table.identifier.unquotedString)
-      }
-      true
-    } else {
-      false
+
+    val alignedQuery = alignQueryOutput(query, catalogTable, partitionSpec, 
sparkSession.sessionState.conf)
+
+    val (success, _, _, _, _, _) = 
HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, config, 
Dataset.ofRows(sparkSession, alignedQuery))
+
+    if (success && refreshTable) {
+      sparkSession.catalog.refreshTable(table.identifier.unquotedString)
     }
+
+    success
   }
 
   /**
-   * Aligned the type and name of query's output fields with the result 
table's fields.
-   * @param query The insert query which to aligned.
-   * @param hoodieCatalogTable The result hoodie catalog table.
-   * @param insertPartitions The insert partition map.
-   * @param conf The SQLConf.
-   * @return
+   * Align provided [[query]]'s output with the expected [[catalogTable]] 
schema by
+   *
+   * <ul>
+   *   <li>Performing type coercion (casting corresponding outputs, where 
needed)</li>
+   *   <li>Adding aliases (matching column names) to corresponding outputs 
</li>
+   * </ul>
+   *
+   * @param query target query whose output is to be inserted
+   * @param catalogTable catalog table
+   * @param partitionsSpec partition spec specifying static/dynamic partition 
values
+   * @param conf Spark's [[SQLConf]]
    */
-  private def alignOutputFields(
-    query: LogicalPlan,
-    hoodieCatalogTable: HoodieCatalogTable,
-    insertPartitions: Map[String, Option[String]],
-    conf: SQLConf): LogicalPlan = {
-
-    val targetPartitionSchema = hoodieCatalogTable.partitionSchema
-
-    val staticPartitionValues = insertPartitions.filter(p => 
p._2.isDefined).mapValues(_.get)
-    assert(staticPartitionValues.isEmpty ||
-      insertPartitions.size == targetPartitionSchema.size,
-      s"Required partition columns is: ${targetPartitionSchema.json}, Current 
input partitions " +
-        s"is: ${staticPartitionValues.mkString("," + "")}")
-
-    val queryOutputWithoutMetaFields = removeMetaFields(query.output)
-    assert(staticPartitionValues.size + queryOutputWithoutMetaFields.size
-      == hoodieCatalogTable.tableSchemaWithoutMetaFields.size,
-      s"Required select columns count: 
${hoodieCatalogTable.tableSchemaWithoutMetaFields.size}, " +
-        s"Current select columns(including static partition column) count: " +
-        s"${staticPartitionValues.size + 
queryOutputWithoutMetaFields.size},columns: " +
-        s"(${(queryOutputWithoutMetaFields.map(_.name) ++ 
staticPartitionValues.keys).mkString(",")})")
-
-    val dataAndDynamicPartitionSchemaWithoutMetaFields = StructType(
-      hoodieCatalogTable.tableSchemaWithoutMetaFields.filterNot(f => 
staticPartitionValues.contains(f.name)))
-    val dataProjectsWithoutMetaFields = 
getTableFieldsAlias(queryOutputWithoutMetaFields,
-      dataAndDynamicPartitionSchemaWithoutMetaFields.fields, conf)
-
-    val partitionProjects = targetPartitionSchema.fields.filter(f => 
staticPartitionValues.contains(f.name))
-      .map(f => {
-        val staticPartitionValue = staticPartitionValues.getOrElse(f.name,
-          s"Missing static partition value for: ${f.name}")
-        val castAttr = castIfNeeded(Literal.create(staticPartitionValue), 
f.dataType, conf)
-        Alias(castAttr, f.name)()
-      })
+  private def alignQueryOutput(query: LogicalPlan,
+                               catalogTable: HoodieCatalogTable,
+                               partitionsSpec: Map[String, Option[String]],
+                               conf: SQLConf): LogicalPlan = {
+
+    val targetPartitionSchema = catalogTable.partitionSchema
+    val staticPartitionValues = filterStaticPartitionValues(partitionsSpec)
+
+    validate(removeMetaFields(query.schema), partitionsSpec, catalogTable)
+    // Make sure we strip out meta-fields from the incoming dataset (these 
will have to be discarded anyway)
+    val cleanedQuery = stripMetaFields(query)
+    // To validate and align properly output of the query, we simply filter 
out partition columns with already
+    // provided static values from the table's schema
+    //
+    // NOTE: This is a crucial step: since coercion might rely on either of a) 
name-based or b) positional-based
+    //       matching it's important to strip out partition columns, having 
static values provided in the partition spec,
+    //       since such columns wouldn't be otherwise specified w/in the query 
itself and therefore couldn't be matched
+    //       positionally for example
+    val expectedQueryColumns = 
catalogTable.tableSchemaWithoutMetaFields.filterNot(f => 
staticPartitionValues.contains(f.name))
+    val coercedQueryOutput = 
coerceQueryOutputColumns(StructType(expectedQueryColumns), cleanedQuery, 
catalogTable, conf)
+
+    val staticPartitionValuesExprs = 
createStaticPartitionValuesExpressions(staticPartitionValues, 
targetPartitionSchema, conf)
+
+    Project(coercedQueryOutput.output ++ staticPartitionValuesExprs, 
coercedQueryOutput)
+  }
+
+  private def coerceQueryOutputColumns(expectedSchema: StructType,
+                                       query: LogicalPlan,
+                                       catalogTable: HoodieCatalogTable,
+                                       conf: SQLConf): LogicalPlan = {
+    val planUtils = sparkAdapter.getCatalystPlanUtils
+    try {
+      planUtils.resolveOutputColumns(catalogTable.catalogTableName, 
expectedSchema.toAttributes, query, byName = true, conf)
+    } catch {
+      // NOTE: In case matching by name didn't match the query output, we will 
attempt positional matching
+      case ae: AnalysisException if ae.getMessage().startsWith("Cannot write 
incompatible data to table") =>
+        planUtils.resolveOutputColumns(catalogTable.catalogTableName, 
expectedSchema.toAttributes, query, byName = false, conf)
+    }
+  }
 
-    Project(dataProjectsWithoutMetaFields ++ partitionProjects, query)
+  private def validate(queryOutputSchema: StructType, partitionsSpec: 
Map[String, Option[String]], catalogTable: HoodieCatalogTable): Unit = {
+    // Validate that partition-spec has proper format (it could be empty if 
all of the partition values are dynamic,
+    // ie there are no static partition-values specified)
+    if (partitionsSpec.nonEmpty && partitionsSpec.size != 
catalogTable.partitionSchema.size) {

Review Comment:
   /nit can use ValidationUtils here



##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala:
##########
@@ -66,103 +79,139 @@ object InsertIntoHoodieTableCommand extends Logging with 
ProvidesHoodieConfig {
    * @param extraOptions Extra options for insert.
    */
   def run(sparkSession: SparkSession,
-      table: CatalogTable,
-      query: LogicalPlan,
-      insertPartitions: Map[String, Option[String]],
-      overwrite: Boolean,
-      refreshTable: Boolean = true,
-      extraOptions: Map[String, String] = Map.empty): Boolean = {
-
-    val hoodieCatalogTable = new HoodieCatalogTable(sparkSession, table)
-    val config = buildHoodieInsertConfig(hoodieCatalogTable, sparkSession, 
overwrite, insertPartitions, extraOptions)
-
-    val mode = if (overwrite && hoodieCatalogTable.partitionFields.isEmpty) {
-      // insert overwrite non-partition table
+          table: CatalogTable,
+          query: LogicalPlan,
+          partitionSpec: Map[String, Option[String]],
+          overwrite: Boolean,
+          refreshTable: Boolean = true,
+          extraOptions: Map[String, String] = Map.empty): Boolean = {
+    val catalogTable = new HoodieCatalogTable(sparkSession, table)
+    val config = buildHoodieInsertConfig(catalogTable, sparkSession, 
overwrite, partitionSpec, extraOptions)
+
+    // NOTE: In case of partitioned table we override specified "overwrite" 
parameter
+    //       to instead append to the dataset
+    val mode = if (overwrite && catalogTable.partitionFields.isEmpty) {
       SaveMode.Overwrite
     } else {
-      // for insert into or insert overwrite partition we use append mode.
       SaveMode.Append
     }
-    val conf = sparkSession.sessionState.conf
-    val alignedQuery = alignOutputFields(query, hoodieCatalogTable, 
insertPartitions, conf)
-    // If we create dataframe using the Dataset.ofRows(sparkSession, 
alignedQuery),
-    // The nullable attribute of fields will lost.
-    // In order to pass the nullable attribute to the inputDF, we specify the 
schema
-    // of the rdd.
-    val inputDF = sparkSession.createDataFrame(
-      Dataset.ofRows(sparkSession, alignedQuery).rdd, alignedQuery.schema)
-    val success =
-      HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, config, 
inputDF)._1
-    if (success) {
-      if (refreshTable) {
-        sparkSession.catalog.refreshTable(table.identifier.unquotedString)
-      }
-      true
-    } else {
-      false
+
+    val alignedQuery = alignQueryOutput(query, catalogTable, partitionSpec, 
sparkSession.sessionState.conf)
+
+    val (success, _, _, _, _, _) = 
HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, config, 
Dataset.ofRows(sparkSession, alignedQuery))
+
+    if (success && refreshTable) {
+      sparkSession.catalog.refreshTable(table.identifier.unquotedString)
     }
+
+    success
   }
 
   /**
-   * Aligned the type and name of query's output fields with the result 
table's fields.
-   * @param query The insert query which to aligned.
-   * @param hoodieCatalogTable The result hoodie catalog table.
-   * @param insertPartitions The insert partition map.
-   * @param conf The SQLConf.
-   * @return
+   * Align provided [[query]]'s output with the expected [[catalogTable]] 
schema by
+   *
+   * <ul>
+   *   <li>Performing type coercion (casting corresponding outputs, where 
needed)</li>
+   *   <li>Adding aliases (matching column names) to corresponding outputs 
</li>
+   * </ul>
+   *
+   * @param query target query whose output is to be inserted
+   * @param catalogTable catalog table
+   * @param partitionsSpec partition spec specifying static/dynamic partition 
values
+   * @param conf Spark's [[SQLConf]]
    */
-  private def alignOutputFields(
-    query: LogicalPlan,
-    hoodieCatalogTable: HoodieCatalogTable,
-    insertPartitions: Map[String, Option[String]],
-    conf: SQLConf): LogicalPlan = {
-
-    val targetPartitionSchema = hoodieCatalogTable.partitionSchema
-
-    val staticPartitionValues = insertPartitions.filter(p => 
p._2.isDefined).mapValues(_.get)
-    assert(staticPartitionValues.isEmpty ||
-      insertPartitions.size == targetPartitionSchema.size,
-      s"Required partition columns is: ${targetPartitionSchema.json}, Current 
input partitions " +
-        s"is: ${staticPartitionValues.mkString("," + "")}")
-
-    val queryOutputWithoutMetaFields = removeMetaFields(query.output)
-    assert(staticPartitionValues.size + queryOutputWithoutMetaFields.size
-      == hoodieCatalogTable.tableSchemaWithoutMetaFields.size,
-      s"Required select columns count: 
${hoodieCatalogTable.tableSchemaWithoutMetaFields.size}, " +
-        s"Current select columns(including static partition column) count: " +
-        s"${staticPartitionValues.size + 
queryOutputWithoutMetaFields.size},columns: " +
-        s"(${(queryOutputWithoutMetaFields.map(_.name) ++ 
staticPartitionValues.keys).mkString(",")})")
-
-    val dataAndDynamicPartitionSchemaWithoutMetaFields = StructType(
-      hoodieCatalogTable.tableSchemaWithoutMetaFields.filterNot(f => 
staticPartitionValues.contains(f.name)))
-    val dataProjectsWithoutMetaFields = 
getTableFieldsAlias(queryOutputWithoutMetaFields,
-      dataAndDynamicPartitionSchemaWithoutMetaFields.fields, conf)
-
-    val partitionProjects = targetPartitionSchema.fields.filter(f => 
staticPartitionValues.contains(f.name))
-      .map(f => {
-        val staticPartitionValue = staticPartitionValues.getOrElse(f.name,
-          s"Missing static partition value for: ${f.name}")
-        val castAttr = castIfNeeded(Literal.create(staticPartitionValue), 
f.dataType, conf)
-        Alias(castAttr, f.name)()
-      })
+  private def alignQueryOutput(query: LogicalPlan,
+                               catalogTable: HoodieCatalogTable,
+                               partitionsSpec: Map[String, Option[String]],
+                               conf: SQLConf): LogicalPlan = {
+
+    val targetPartitionSchema = catalogTable.partitionSchema
+    val staticPartitionValues = filterStaticPartitionValues(partitionsSpec)
+
+    validate(removeMetaFields(query.schema), partitionsSpec, catalogTable)
+    // Make sure we strip out meta-fields from the incoming dataset (these 
will have to be discarded anyway)
+    val cleanedQuery = stripMetaFields(query)
+    // To validate and align properly output of the query, we simply filter 
out partition columns with already
+    // provided static values from the table's schema
+    //
+    // NOTE: This is a crucial step: since coercion might rely on either of a) 
name-based or b) positional-based
+    //       matching it's important to strip out partition columns, having 
static values provided in the partition spec,
+    //       since such columns wouldn't be otherwise specified w/in the query 
itself and therefore couldn't be matched
+    //       positionally for example
+    val expectedQueryColumns = 
catalogTable.tableSchemaWithoutMetaFields.filterNot(f => 
staticPartitionValues.contains(f.name))
+    val coercedQueryOutput = 
coerceQueryOutputColumns(StructType(expectedQueryColumns), cleanedQuery, 
catalogTable, conf)
+
+    val staticPartitionValuesExprs = 
createStaticPartitionValuesExpressions(staticPartitionValues, 
targetPartitionSchema, conf)
+
+    Project(coercedQueryOutput.output ++ staticPartitionValuesExprs, 
coercedQueryOutput)
+  }
+
+  private def coerceQueryOutputColumns(expectedSchema: StructType,
+                                       query: LogicalPlan,
+                                       catalogTable: HoodieCatalogTable,
+                                       conf: SQLConf): LogicalPlan = {
+    val planUtils = sparkAdapter.getCatalystPlanUtils
+    try {
+      planUtils.resolveOutputColumns(catalogTable.catalogTableName, 
expectedSchema.toAttributes, query, byName = true, conf)
+    } catch {
+      // NOTE: In case matching by name didn't match the query output, we will 
attempt positional matching
+      case ae: AnalysisException if ae.getMessage().startsWith("Cannot write 
incompatible data to table") =>
+        planUtils.resolveOutputColumns(catalogTable.catalogTableName, 
expectedSchema.toAttributes, query, byName = false, conf)
+    }
+  }
 
-    Project(dataProjectsWithoutMetaFields ++ partitionProjects, query)
+  private def validate(queryOutputSchema: StructType, partitionsSpec: 
Map[String, Option[String]], catalogTable: HoodieCatalogTable): Unit = {
+    // Validate that partition-spec has proper format (it could be empty if 
all of the partition values are dynamic,
+    // ie there are no static partition-values specified)
+    if (partitionsSpec.nonEmpty && partitionsSpec.size != 
catalogTable.partitionSchema.size) {
+      throw new HoodieException(s"Required partition schema is: 
${catalogTable.partitionSchema.fieldNames.mkString("[", ", ", "]")}, " +
+        s"partition spec is: ${partitionsSpec.mkString("[", ", ", "]")}")
+    }
+
+    val staticPartitionValues = filterStaticPartitionValues(partitionsSpec)
+    val fullQueryOutputSchema = StructType(queryOutputSchema.fields ++ 
staticPartitionValues.keys.map(StructField(_, StringType)))
+
+    // Assert that query provides all the required columns
+    if (!conforms(fullQueryOutputSchema, 
catalogTable.tableSchemaWithoutMetaFields)) {

Review Comment:
   ditto



##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala:
##########
@@ -66,100 +77,111 @@ object InsertIntoHoodieTableCommand extends Logging with 
ProvidesHoodieConfig {
    * @param extraOptions Extra options for insert.
    */
   def run(sparkSession: SparkSession,
-      table: CatalogTable,
-      query: LogicalPlan,
-      insertPartitions: Map[String, Option[String]],
-      overwrite: Boolean,
-      refreshTable: Boolean = true,
-      extraOptions: Map[String, String] = Map.empty): Boolean = {
-
-    val hoodieCatalogTable = new HoodieCatalogTable(sparkSession, table)
-    val config = buildHoodieInsertConfig(hoodieCatalogTable, sparkSession, 
overwrite, insertPartitions, extraOptions)
-
-    val mode = if (overwrite && hoodieCatalogTable.partitionFields.isEmpty) {
-      // insert overwrite non-partition table
+          table: CatalogTable,
+          query: LogicalPlan,
+          partitionSpec: Map[String, Option[String]],
+          overwrite: Boolean,
+          refreshTable: Boolean = true,
+          extraOptions: Map[String, String] = Map.empty): Boolean = {
+    val catalogTable = new HoodieCatalogTable(sparkSession, table)
+    val config = buildHoodieInsertConfig(catalogTable, sparkSession, 
overwrite, partitionSpec, extraOptions)
+
+    // NOTE: In case of partitioned table we override specified "overwrite" 
parameter
+    //       to instead append to the dataset
+    val mode = if (overwrite && catalogTable.partitionFields.isEmpty) {

Review Comment:
   i think this makes sense, because normally when people do INSERT OVERWRITE 
they meant it for partition overwrite. but if the table has no partition, it's 
meant for INSERT OVERWRITE TABLE, so we do the actual overwrite.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -241,39 +240,49 @@ object HoodieSparkSqlWriter {
             sparkContext.getConf.registerKryoClasses(
               Array(classOf[org.apache.avro.generic.GenericData],
                 classOf[org.apache.avro.Schema]))
-            var schema = 
AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, 
nameSpace)
-            val lastestSchema = getLatestTableSchema(fs, basePath, 
sparkContext, schema)
+
+            // TODO(HUDI-4472) revisit and simplify schema handling
+            val sourceSchema = 
AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, 
nameSpace)
+            val latestTableSchema = getLatestTableSchema(fs, basePath, 
sparkContext).getOrElse(sourceSchema)
+
+            val enabledSchemaEvolution = 
parameters.getOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key(), 
"false").toBoolean
             var internalSchemaOpt = getLatestTableInternalSchema(fs, basePath, 
sparkContext)
-            if (reconcileSchema && 
parameters.getOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key(), 
"false").toBoolean
-              && internalSchemaOpt.isEmpty) {
-              // force apply full schema evolution.
-              internalSchemaOpt = 
Some(AvroInternalSchemaConverter.convert(schema))
-            }
-            if (reconcileSchema) {
-              schema = lastestSchema
-            }
-            if (internalSchemaOpt.isDefined) {
-              // Apply schema evolution.
-              val mergedSparkSchema = if (!reconcileSchema) {
-                
AvroConversionUtils.convertAvroSchemaToStructType(AvroSchemaEvolutionUtils.canonicalizeColumnNullability(schema,
 lastestSchema))
+
+            val writerSchema: Schema =
+              if (reconcileSchema) {
+                // In case we need to reconcile the schema and schema 
evolution is enabled,
+                // we will force-apply schema evolution to the writer's schema
+                if (enabledSchemaEvolution && internalSchemaOpt.isEmpty) {
+                  internalSchemaOpt = 
Some(AvroInternalSchemaConverter.convert(sourceSchema))
+                }
+
+                if (internalSchemaOpt.isDefined) {
+                  // Apply schema evolution, by auto-merging write schema and 
read schema
+                  val mergedInternalSchema = 
AvroSchemaEvolutionUtils.reconcileSchema(sourceSchema, internalSchemaOpt.get)
+                  AvroInternalSchemaConverter.convert(mergedInternalSchema, 
latestTableSchema.getName)
+                } else if 
(TableSchemaResolver.isSchemaCompatible(sourceSchema, latestTableSchema)) {
+                  // In case schema reconciliation is enabled and source and 
latest table schemas
+                  // are compatible (as defined by 
[[TableSchemaResolver#isSchemaCompatible]], then we will
+                  // pick latest table's schema as the writer's schema
+                  latestTableSchema
+                } else {
+                  // Otherwise fallback to original source's schema
+                  sourceSchema
+                }
               } else {
-                // Auto merge write schema and read schema.
-                val mergedInternalSchema = 
AvroSchemaEvolutionUtils.reconcileSchema(schema, internalSchemaOpt.get)
-                
AvroConversionUtils.convertAvroSchemaToStructType(AvroInternalSchemaConverter.convert(mergedInternalSchema,
 lastestSchema.getName))
+                // In case reconciliation is disabled, we still have to do 
nullability attributes
+                // (minor) reconciliation, making sure schema of the incoming 
batch is in-line with
+                // the data already committed in the table
+                
AvroSchemaEvolutionUtils.canonicalizeColumnNullability(sourceSchema, 
latestTableSchema)

Review Comment:
   > In the original logic, we will eventually call 
AvroConversionUtils.convertStructTypeToAvroSchema to maintain namespace 
consistency
   
   @alexeykudinkin do we have a UT to ensure namespace consistent?



##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala:
##########
@@ -66,103 +79,139 @@ object InsertIntoHoodieTableCommand extends Logging with 
ProvidesHoodieConfig {
    * @param extraOptions Extra options for insert.
    */
   def run(sparkSession: SparkSession,
-      table: CatalogTable,
-      query: LogicalPlan,
-      insertPartitions: Map[String, Option[String]],
-      overwrite: Boolean,
-      refreshTable: Boolean = true,
-      extraOptions: Map[String, String] = Map.empty): Boolean = {
-
-    val hoodieCatalogTable = new HoodieCatalogTable(sparkSession, table)
-    val config = buildHoodieInsertConfig(hoodieCatalogTable, sparkSession, 
overwrite, insertPartitions, extraOptions)
-
-    val mode = if (overwrite && hoodieCatalogTable.partitionFields.isEmpty) {
-      // insert overwrite non-partition table
+          table: CatalogTable,
+          query: LogicalPlan,
+          partitionSpec: Map[String, Option[String]],
+          overwrite: Boolean,
+          refreshTable: Boolean = true,
+          extraOptions: Map[String, String] = Map.empty): Boolean = {
+    val catalogTable = new HoodieCatalogTable(sparkSession, table)
+    val config = buildHoodieInsertConfig(catalogTable, sparkSession, 
overwrite, partitionSpec, extraOptions)
+
+    // NOTE: In case of partitioned table we override specified "overwrite" 
parameter
+    //       to instead append to the dataset
+    val mode = if (overwrite && catalogTable.partitionFields.isEmpty) {
       SaveMode.Overwrite
     } else {
-      // for insert into or insert overwrite partition we use append mode.
       SaveMode.Append
     }
-    val conf = sparkSession.sessionState.conf
-    val alignedQuery = alignOutputFields(query, hoodieCatalogTable, 
insertPartitions, conf)
-    // If we create dataframe using the Dataset.ofRows(sparkSession, 
alignedQuery),
-    // The nullable attribute of fields will lost.
-    // In order to pass the nullable attribute to the inputDF, we specify the 
schema
-    // of the rdd.
-    val inputDF = sparkSession.createDataFrame(
-      Dataset.ofRows(sparkSession, alignedQuery).rdd, alignedQuery.schema)
-    val success =
-      HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, config, 
inputDF)._1
-    if (success) {
-      if (refreshTable) {
-        sparkSession.catalog.refreshTable(table.identifier.unquotedString)
-      }
-      true
-    } else {
-      false
+
+    val alignedQuery = alignQueryOutput(query, catalogTable, partitionSpec, 
sparkSession.sessionState.conf)
+
+    val (success, _, _, _, _, _) = 
HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, config, 
Dataset.ofRows(sparkSession, alignedQuery))
+
+    if (success && refreshTable) {
+      sparkSession.catalog.refreshTable(table.identifier.unquotedString)
     }
+
+    success
   }
 
   /**
-   * Aligned the type and name of query's output fields with the result 
table's fields.
-   * @param query The insert query which to aligned.
-   * @param hoodieCatalogTable The result hoodie catalog table.
-   * @param insertPartitions The insert partition map.
-   * @param conf The SQLConf.
-   * @return
+   * Align provided [[query]]'s output with the expected [[catalogTable]] 
schema by
+   *
+   * <ul>
+   *   <li>Performing type coercion (casting corresponding outputs, where 
needed)</li>
+   *   <li>Adding aliases (matching column names) to corresponding outputs 
</li>
+   * </ul>
+   *
+   * @param query target query whose output is to be inserted
+   * @param catalogTable catalog table
+   * @param partitionsSpec partition spec specifying static/dynamic partition 
values
+   * @param conf Spark's [[SQLConf]]
    */
-  private def alignOutputFields(
-    query: LogicalPlan,
-    hoodieCatalogTable: HoodieCatalogTable,
-    insertPartitions: Map[String, Option[String]],
-    conf: SQLConf): LogicalPlan = {
-
-    val targetPartitionSchema = hoodieCatalogTable.partitionSchema
-
-    val staticPartitionValues = insertPartitions.filter(p => 
p._2.isDefined).mapValues(_.get)
-    assert(staticPartitionValues.isEmpty ||
-      insertPartitions.size == targetPartitionSchema.size,
-      s"Required partition columns is: ${targetPartitionSchema.json}, Current 
input partitions " +
-        s"is: ${staticPartitionValues.mkString("," + "")}")
-
-    val queryOutputWithoutMetaFields = removeMetaFields(query.output)
-    assert(staticPartitionValues.size + queryOutputWithoutMetaFields.size
-      == hoodieCatalogTable.tableSchemaWithoutMetaFields.size,
-      s"Required select columns count: 
${hoodieCatalogTable.tableSchemaWithoutMetaFields.size}, " +
-        s"Current select columns(including static partition column) count: " +
-        s"${staticPartitionValues.size + 
queryOutputWithoutMetaFields.size},columns: " +
-        s"(${(queryOutputWithoutMetaFields.map(_.name) ++ 
staticPartitionValues.keys).mkString(",")})")
-
-    val dataAndDynamicPartitionSchemaWithoutMetaFields = StructType(
-      hoodieCatalogTable.tableSchemaWithoutMetaFields.filterNot(f => 
staticPartitionValues.contains(f.name)))
-    val dataProjectsWithoutMetaFields = 
getTableFieldsAlias(queryOutputWithoutMetaFields,
-      dataAndDynamicPartitionSchemaWithoutMetaFields.fields, conf)
-
-    val partitionProjects = targetPartitionSchema.fields.filter(f => 
staticPartitionValues.contains(f.name))
-      .map(f => {
-        val staticPartitionValue = staticPartitionValues.getOrElse(f.name,
-          s"Missing static partition value for: ${f.name}")
-        val castAttr = castIfNeeded(Literal.create(staticPartitionValue), 
f.dataType, conf)
-        Alias(castAttr, f.name)()
-      })
+  private def alignQueryOutput(query: LogicalPlan,
+                               catalogTable: HoodieCatalogTable,
+                               partitionsSpec: Map[String, Option[String]],
+                               conf: SQLConf): LogicalPlan = {
+
+    val targetPartitionSchema = catalogTable.partitionSchema
+    val staticPartitionValues = filterStaticPartitionValues(partitionsSpec)
+
+    validate(removeMetaFields(query.schema), partitionsSpec, catalogTable)
+    // Make sure we strip out meta-fields from the incoming dataset (these 
will have to be discarded anyway)
+    val cleanedQuery = stripMetaFields(query)
+    // To validate and align properly output of the query, we simply filter 
out partition columns with already
+    // provided static values from the table's schema
+    //
+    // NOTE: This is a crucial step: since coercion might rely on either of a) 
name-based or b) positional-based
+    //       matching it's important to strip out partition columns, having 
static values provided in the partition spec,
+    //       since such columns wouldn't be otherwise specified w/in the query 
itself and therefore couldn't be matched
+    //       positionally for example
+    val expectedQueryColumns = 
catalogTable.tableSchemaWithoutMetaFields.filterNot(f => 
staticPartitionValues.contains(f.name))
+    val coercedQueryOutput = 
coerceQueryOutputColumns(StructType(expectedQueryColumns), cleanedQuery, 
catalogTable, conf)
+
+    val staticPartitionValuesExprs = 
createStaticPartitionValuesExpressions(staticPartitionValues, 
targetPartitionSchema, conf)
+
+    Project(coercedQueryOutput.output ++ staticPartitionValuesExprs, 
coercedQueryOutput)
+  }
+
+  private def coerceQueryOutputColumns(expectedSchema: StructType,
+                                       query: LogicalPlan,
+                                       catalogTable: HoodieCatalogTable,
+                                       conf: SQLConf): LogicalPlan = {
+    val planUtils = sparkAdapter.getCatalystPlanUtils
+    try {
+      planUtils.resolveOutputColumns(catalogTable.catalogTableName, 
expectedSchema.toAttributes, query, byName = true, conf)
+    } catch {
+      // NOTE: In case matching by name didn't match the query output, we will 
attempt positional matching
+      case ae: AnalysisException if ae.getMessage().startsWith("Cannot write 
incompatible data to table") =>

Review Comment:
   relying on the error message is risky. can't we just try match positions 
right away?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to