alexeykudinkin commented on code in PR #6213:
URL: https://github.com/apache/hudi/pull/6213#discussion_r932717646
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala:
##########
@@ -66,103 +79,139 @@ object InsertIntoHoodieTableCommand extends Logging with
ProvidesHoodieConfig {
* @param extraOptions Extra options for insert.
*/
def run(sparkSession: SparkSession,
- table: CatalogTable,
- query: LogicalPlan,
- insertPartitions: Map[String, Option[String]],
- overwrite: Boolean,
- refreshTable: Boolean = true,
- extraOptions: Map[String, String] = Map.empty): Boolean = {
-
- val hoodieCatalogTable = new HoodieCatalogTable(sparkSession, table)
- val config = buildHoodieInsertConfig(hoodieCatalogTable, sparkSession,
overwrite, insertPartitions, extraOptions)
-
- val mode = if (overwrite && hoodieCatalogTable.partitionFields.isEmpty) {
- // insert overwrite non-partition table
+ table: CatalogTable,
+ query: LogicalPlan,
+ partitionSpec: Map[String, Option[String]],
+ overwrite: Boolean,
+ refreshTable: Boolean = true,
+ extraOptions: Map[String, String] = Map.empty): Boolean = {
+ val catalogTable = new HoodieCatalogTable(sparkSession, table)
+ val config = buildHoodieInsertConfig(catalogTable, sparkSession,
overwrite, partitionSpec, extraOptions)
+
+ // NOTE: In case of partitioned table we override specified "overwrite"
parameter
+ // to instead append to the dataset
+ val mode = if (overwrite && catalogTable.partitionFields.isEmpty) {
SaveMode.Overwrite
} else {
- // for insert into or insert overwrite partition we use append mode.
SaveMode.Append
}
- val conf = sparkSession.sessionState.conf
- val alignedQuery = alignOutputFields(query, hoodieCatalogTable,
insertPartitions, conf)
- // If we create dataframe using the Dataset.ofRows(sparkSession,
alignedQuery),
- // The nullable attribute of fields will lost.
- // In order to pass the nullable attribute to the inputDF, we specify the
schema
- // of the rdd.
- val inputDF = sparkSession.createDataFrame(
- Dataset.ofRows(sparkSession, alignedQuery).rdd, alignedQuery.schema)
- val success =
- HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, config,
inputDF)._1
- if (success) {
- if (refreshTable) {
- sparkSession.catalog.refreshTable(table.identifier.unquotedString)
- }
- true
- } else {
- false
+
+ val alignedQuery = alignQueryOutput(query, catalogTable, partitionSpec,
sparkSession.sessionState.conf)
+
+ val (success, _, _, _, _, _) =
HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, config,
Dataset.ofRows(sparkSession, alignedQuery))
+
+ if (success && refreshTable) {
+ sparkSession.catalog.refreshTable(table.identifier.unquotedString)
}
+
+ success
}
/**
- * Aligned the type and name of query's output fields with the result
table's fields.
- * @param query The insert query which to aligned.
- * @param hoodieCatalogTable The result hoodie catalog table.
- * @param insertPartitions The insert partition map.
- * @param conf The SQLConf.
- * @return
+ * Align provided [[query]]'s output with the expected [[catalogTable]]
schema by
+ *
+ * <ul>
+ * <li>Performing type coercion (casting corresponding outputs, where
needed)</li>
+ * <li>Adding aliases (matching column names) to corresponding outputs
</li>
+ * </ul>
+ *
+ * @param query target query whose output is to be inserted
+ * @param catalogTable catalog table
+ * @param partitionsSpec partition spec specifying static/dynamic partition
values
+ * @param conf Spark's [[SQLConf]]
*/
- private def alignOutputFields(
- query: LogicalPlan,
- hoodieCatalogTable: HoodieCatalogTable,
- insertPartitions: Map[String, Option[String]],
- conf: SQLConf): LogicalPlan = {
-
- val targetPartitionSchema = hoodieCatalogTable.partitionSchema
-
- val staticPartitionValues = insertPartitions.filter(p =>
p._2.isDefined).mapValues(_.get)
- assert(staticPartitionValues.isEmpty ||
- insertPartitions.size == targetPartitionSchema.size,
- s"Required partition columns is: ${targetPartitionSchema.json}, Current
input partitions " +
- s"is: ${staticPartitionValues.mkString("," + "")}")
-
- val queryOutputWithoutMetaFields = removeMetaFields(query.output)
- assert(staticPartitionValues.size + queryOutputWithoutMetaFields.size
- == hoodieCatalogTable.tableSchemaWithoutMetaFields.size,
- s"Required select columns count:
${hoodieCatalogTable.tableSchemaWithoutMetaFields.size}, " +
- s"Current select columns(including static partition column) count: " +
- s"${staticPartitionValues.size +
queryOutputWithoutMetaFields.size},columns: " +
- s"(${(queryOutputWithoutMetaFields.map(_.name) ++
staticPartitionValues.keys).mkString(",")})")
-
- val dataAndDynamicPartitionSchemaWithoutMetaFields = StructType(
- hoodieCatalogTable.tableSchemaWithoutMetaFields.filterNot(f =>
staticPartitionValues.contains(f.name)))
- val dataProjectsWithoutMetaFields =
getTableFieldsAlias(queryOutputWithoutMetaFields,
- dataAndDynamicPartitionSchemaWithoutMetaFields.fields, conf)
-
- val partitionProjects = targetPartitionSchema.fields.filter(f =>
staticPartitionValues.contains(f.name))
- .map(f => {
- val staticPartitionValue = staticPartitionValues.getOrElse(f.name,
- s"Missing static partition value for: ${f.name}")
- val castAttr = castIfNeeded(Literal.create(staticPartitionValue),
f.dataType, conf)
- Alias(castAttr, f.name)()
- })
+ private def alignQueryOutput(query: LogicalPlan,
+ catalogTable: HoodieCatalogTable,
+ partitionsSpec: Map[String, Option[String]],
+ conf: SQLConf): LogicalPlan = {
+
+ val targetPartitionSchema = catalogTable.partitionSchema
+ val staticPartitionValues = filterStaticPartitionValues(partitionsSpec)
+
+ validate(removeMetaFields(query.schema), partitionsSpec, catalogTable)
+ // Make sure we strip out meta-fields from the incoming dataset (these
will have to be discarded anyway)
+ val cleanedQuery = stripMetaFields(query)
+ // To validate and align properly output of the query, we simply filter
out partition columns with already
+ // provided static values from the table's schema
+ //
+ // NOTE: This is a crucial step: since coercion might rely on either of a)
name-based or b) positional-based
+ // matching it's important to strip out partition columns, having
static values provided in the partition spec,
+ // since such columns wouldn't be otherwise specified w/in the query
itself and therefore couldn't be matched
+ // positionally for example
+ val expectedQueryColumns =
catalogTable.tableSchemaWithoutMetaFields.filterNot(f =>
staticPartitionValues.contains(f.name))
+ val coercedQueryOutput =
coerceQueryOutputColumns(StructType(expectedQueryColumns), cleanedQuery,
catalogTable, conf)
+
+ val staticPartitionValuesExprs =
createStaticPartitionValuesExpressions(staticPartitionValues,
targetPartitionSchema, conf)
+
+ Project(coercedQueryOutput.output ++ staticPartitionValuesExprs,
coercedQueryOutput)
+ }
+
+ private def coerceQueryOutputColumns(expectedSchema: StructType,
+ query: LogicalPlan,
+ catalogTable: HoodieCatalogTable,
+ conf: SQLConf): LogicalPlan = {
+ val planUtils = sparkAdapter.getCatalystPlanUtils
+ try {
+ planUtils.resolveOutputColumns(catalogTable.catalogTableName,
expectedSchema.toAttributes, query, byName = true, conf)
+ } catch {
+ // NOTE: In case matching by name didn't match the query output, we will
attempt positional matching
+ case ae: AnalysisException if ae.getMessage().startsWith("Cannot write
incompatible data to table") =>
Review Comment:
Agreed, not happy about the setup either but that's the best we can do at
this point: Spark uses single `AnalysisException` type w/o any collateral trait
that i can use here to distinguish (other than message).
I checked older versions of Spark and they are all consistent, and in
regards to the future ones if that changes it would fail loud in our tests
(whenever positional matching would need to be applied but won't) so am not
particularly worried about this going unnoticed.
> can't we just try match positions right away?
We have to do hybrid, since this is what Spark does
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]