JingsongLi commented on code in PR #7789:
URL: https://github.com/apache/paimon/pull/7789#discussion_r3212622974
##########
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SchemaHelper.scala:
##########
@@ -41,21 +41,66 @@ private[spark] trait SchemaHelper extends
WithFileStoreTable {
def mergeSchema(sparkSession: SparkSession, input: DataFrame, options:
Options): DataFrame = {
val dataSchema = SparkSystemColumns.filterSparkSystemColumns(input.schema)
- val newTableSchema = mergeSchema(input.schema, options)
- if (!PaimonUtils.sameType(newTableSchema, dataSchema)) {
+ val writeSchema = mergeSchema(dataSchema, options)
+ if (!PaimonUtils.sameType(writeSchema, dataSchema)) {
val resolve = sparkSession.sessionState.conf.resolver
- val cols = alignColumns(newTableSchema, dataSchema, resolve)
+ val cols = SchemaHelper.alignColumns(writeSchema, dataSchema, resolve)
input.select(cols: _*)
} else {
input
}
}
+ def mergeSchema(dataSchema: StructType, options: Options): StructType = {
+ val mergeSchemaEnabled =
+ options.get(SparkConnectorOptions.MERGE_SCHEMA) ||
OptionUtils.writeMergeSchemaEnabled()
+ if (!mergeSchemaEnabled) {
+ return dataSchema
+ }
+
+ val filteredDataSchema =
SparkSystemColumns.filterSparkSystemColumns(dataSchema)
+ val allowExplicitCast = options.get(SparkConnectorOptions.EXPLICIT_CAST)
|| OptionUtils
+ .writeMergeSchemaExplicitCastEnabled()
+ SchemaHelper.mergeAndCommitSchema(table, filteredDataSchema,
allowExplicitCast).foreach {
+ updatedTable => newTable = Some(updatedTable)
+ }
+
+ val writeSchema =
SparkTypeUtils.fromPaimonRowType(table.schema().logicalRowType())
+ if (!PaimonUtils.sameType(writeSchema, filteredDataSchema)) {
+ writeSchema
+ } else {
+ filteredDataSchema
+ }
+ }
+
+ def updateTableWithOptions(options: Map[String, String]): Unit = {
+ newTable = Some(table.copy(options.asJava))
+ }
+}
+
+private[spark] object SchemaHelper {
+
+ /**
+ * Merge the given dataSchema into the table's schema. If the schema
changed, commit the change
+ * and return the updated table; otherwise return None.
+ */
+ def mergeAndCommitSchema(
+ table: FileStoreTable,
+ dataSchema: StructType,
+ allowExplicitCast: Boolean): Option[FileStoreTable] = {
+ val dataRowType =
SparkTypeUtils.toPaimonType(dataSchema).asInstanceOf[RowType]
+ if (table.store().mergeSchema(dataRowType, allowExplicitCast)) {
+ Some(table.copyWithLatestSchema())
+ } else {
+ None
+ }
+ }
+
/**
* Recursively align columns from dataSchema to targetSchema by name. For
nested struct fields,
* reorder and fill nulls for missing sub-fields.
*/
- private def alignColumns(
+ def alignColumns(
Review Comment:
SchemaHelper.scala now handles ArrayType<StructType> alignment via
`transform`, but MapType with StructType values is not handled. Meanwhile,
AssignmentAlignmentHelper.reorderFieldsByName does handle MapType. Because of
this inconsistency, the DataFrame write path won't align map values while the
MERGE path will.
Is this a problem, or should the DataFrame write path handle MapType as well?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]