alexeykudinkin commented on code in PR #6358:
URL: https://github.com/apache/hudi/pull/6358#discussion_r971445275
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -169,23 +179,107 @@ object HoodieSparkSqlWriter {
}
val commitActionType = CommitUtils.getCommitActionType(operation,
tableConfig.getTableType)
- val dropPartitionColumns =
hoodieConfig.getBoolean(DataSourceWriteOptions.DROP_PARTITION_COLUMNS)
+
+ // Register Avro classes ([[Schema]], [[GenericData]]) w/ Kryo
+ sparkContext.getConf.registerKryoClasses(
+ Array(classOf[org.apache.avro.generic.GenericData],
+ classOf[org.apache.avro.Schema]))
+
+ val (structName, nameSpace) =
AvroConversionUtils.getAvroRecordNameAndNamespace(tblName)
+ val reconcileSchema =
parameters(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean
+
+ val schemaEvolutionEnabled =
parameters.getOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key(),
"false").toBoolean
+ var internalSchemaOpt = getLatestTableInternalSchema(fs, basePath,
sparkContext)
+
+ val sourceSchema =
AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName,
nameSpace)
+ val latestTableSchemaOpt = getLatestTableSchema(spark, basePath,
tableIdentifier, sparkContext.hadoopConfiguration)
+
+ val writerSchema: Schema = latestTableSchemaOpt match {
+ // In case table schema is empty we're just going to use the source
schema as a
+ // writer's schema. No additional handling is required
+ case None => sourceSchema
+ // Otherwise, we need to make sure we reconcile incoming and latest
table schemas
+ case Some(latestTableSchema) =>
+ if (reconcileSchema) {
+ // In case we need to reconcile the schema and schema evolution is
enabled,
+ // we will force-apply schema evolution to the writer's schema
+ if (schemaEvolutionEnabled && internalSchemaOpt.isEmpty) {
+ internalSchemaOpt =
Some(AvroInternalSchemaConverter.convert(sourceSchema))
+ }
+
+ if (internalSchemaOpt.isDefined) {
+ // Apply schema evolution, by auto-merging write schema and read
schema
+ val mergedInternalSchema =
AvroSchemaEvolutionUtils.reconcileSchema(sourceSchema, internalSchemaOpt.get)
+ AvroInternalSchemaConverter.convert(mergedInternalSchema,
latestTableSchema.getName)
+ } else if (TableSchemaResolver.isSchemaCompatible(sourceSchema,
latestTableSchema)) {
+ // In case schema reconciliation is enabled and source and
latest table schemas
+ // are compatible (as defined by
[[TableSchemaResolver#isSchemaCompatible]]), then we
+ // will rebase incoming batch onto the table's latest schema
(ie, reconcile them)
+ //
+ // NOTE: Since we'll be converting incoming batch from
[[sourceSchema]] into [[latestTableSchema]]
+ // we're validating in that order (where [[sourceSchema]]
is treated as a reader's schema,
+ // and [[latestTableSchema]] is treated as a writer's
schema)
+ latestTableSchema
+ } else {
+ log.error(
+ s"""
+ |Failed to reconcile incoming batch schema with the table's
one.
+ |Incoming schema ${sourceSchema.toString(true)}
+
+ |Table's schema ${latestTableSchema.toString(true)}
+
+ |""".stripMargin)
+ throw new SchemaCompatibilityException("Failed to reconcile
incoming schema with the table's one")
+ }
+ } else {
+ // Before validating whether schemas are compatible, we need to
"canonicalize" source's schema
+ // relative to the table's one, by doing a (minor) reconciliation
of the nullability constraints:
+ // for ex, if in incoming schema column A is designated as
non-null, but it's designated as nullable
+ // in the table's one we want to proceed w/ such operation, simply
relaxing such constraint in the
+ // source schema.
+ val canonicalizedSourceSchema =
AvroSchemaEvolutionUtils.canonicalizeColumnNullability(sourceSchema,
latestTableSchema)
+ // In case reconciliation is disabled, we have to validate that
the source's schema
+ // is compatible w/ the table's latest schema, such that we're
able to read existing table's
+ // records using [[sourceSchema]].
+ if (TableSchemaResolver.isSchemaCompatible(latestTableSchema,
canonicalizedSourceSchema)) {
+ canonicalizedSourceSchema
+ } else {
+ log.error(
+ s"""
+ |Incoming batch schema is not compatible with the table's
one.
+ |Incoming schema ${canonicalizedSourceSchema.toString(true)}
+ |Table's schema ${latestTableSchema.toString(true)}
+ |""".stripMargin)
+ throw new SchemaCompatibilityException("Incoming batch schema is
not compatible with the table's one")
+ }
+ }
+ }
+
+ validateSchemaForHoodieIsDeleted(writerSchema)
+
+ // NOTE: PLEASE READ CAREFULLY BEFORE CHANGING THIS
+ // We have to register w/ Kryo all of the Avro schemas that might
potentially be used to decode
+ // records into Avro format. Otherwise, Kryo wouldn't be able to
apply an optimization allowing
+ // it to avoid the need to ser/de the whole schema along _every_
Avro record
+ val targetAvroSchemas = sourceSchema +: writerSchema +:
latestTableSchemaOpt.toSeq
Review Comment:
   Good call! I'm going to extract it as a standalone method (we can take the
unification as a follow-up)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]