alexeykudinkin commented on code in PR #6358:
URL: https://github.com/apache/hudi/pull/6358#discussion_r971445275


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -169,23 +179,107 @@ object HoodieSparkSqlWriter {
       }
 
       val commitActionType = CommitUtils.getCommitActionType(operation, 
tableConfig.getTableType)
-      val dropPartitionColumns = 
hoodieConfig.getBoolean(DataSourceWriteOptions.DROP_PARTITION_COLUMNS)
+
+      // Register Avro classes ([[Schema]], [[GenericData]]) w/ Kryo
+      sparkContext.getConf.registerKryoClasses(
+        Array(classOf[org.apache.avro.generic.GenericData],
+          classOf[org.apache.avro.Schema]))
+
+      val (structName, nameSpace) = 
AvroConversionUtils.getAvroRecordNameAndNamespace(tblName)
+      val reconcileSchema = 
parameters(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean
+
+      val schemaEvolutionEnabled = 
parameters.getOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key(), 
"false").toBoolean
+      var internalSchemaOpt = getLatestTableInternalSchema(fs, basePath, 
sparkContext)
+
+      val sourceSchema = 
AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, 
nameSpace)
+      val latestTableSchemaOpt = getLatestTableSchema(spark, basePath, 
tableIdentifier, sparkContext.hadoopConfiguration)
+
+      val writerSchema: Schema = latestTableSchemaOpt match {
+        // In case table schema is empty we're just going to use the source 
schema as a
+        // writer's schema. No additional handling is required
+        case None => sourceSchema
+        // Otherwise, we need to make sure we reconcile incoming and latest 
table schemas
+        case Some(latestTableSchema) =>
+          if (reconcileSchema) {
+            // In case we need to reconcile the schema and schema evolution is 
enabled,
+            // we will force-apply schema evolution to the writer's schema
+            if (schemaEvolutionEnabled && internalSchemaOpt.isEmpty) {
+              internalSchemaOpt = 
Some(AvroInternalSchemaConverter.convert(sourceSchema))
+            }
+
+            if (internalSchemaOpt.isDefined) {
+              // Apply schema evolution, by auto-merging write schema and read 
schema
+              val mergedInternalSchema = 
AvroSchemaEvolutionUtils.reconcileSchema(sourceSchema, internalSchemaOpt.get)
+              AvroInternalSchemaConverter.convert(mergedInternalSchema, 
latestTableSchema.getName)
+            } else if (TableSchemaResolver.isSchemaCompatible(sourceSchema, 
latestTableSchema)) {
+              // In case schema reconciliation is enabled and source and 
latest table schemas
+              // are compatible (as defined by 
[[TableSchemaResolver#isSchemaCompatible]]), then we
+              // will rebase incoming batch onto the table's latest schema 
(ie, reconcile them)
+              //
+              // NOTE: Since we'll be converting incoming batch from 
[[sourceSchema]] into [[latestTableSchema]]
+              //       we're validating in that order (where [[sourceSchema]] 
is treated as a reader's schema,
+              //       and [[latestTableSchema]] is treated as a writer's 
schema)
+              latestTableSchema
+            } else {
+              log.error(
+                s"""
+                   |Failed to reconcile incoming batch schema with the table's 
one.
+                   |Incoming schema ${sourceSchema.toString(true)}
+
+                   |Table's schema ${latestTableSchema.toString(true)}
+
+                   |""".stripMargin)
+              throw new SchemaCompatibilityException("Failed to reconcile 
incoming schema with the table's one")
+            }
+          } else {
+            // Before validating whether schemas are compatible, we need to 
"canonicalize" source's schema
+            // relative to the table's one, by doing a (minor) reconciliation 
of the nullability constraints:
+            // for ex, if in incoming schema column A is designated as 
non-null, but it's designated as nullable
+            // in the table's one we want to proceed w/ such operation, simply 
relaxing such constraint in the
+            // source schema.
+            val canonicalizedSourceSchema = 
AvroSchemaEvolutionUtils.canonicalizeColumnNullability(sourceSchema, 
latestTableSchema)
+            // In case reconciliation is disabled, we have to validate that 
the source's schema
+            // is compatible w/ the table's latest schema, such that we're 
able to read existing table's
+            // records using [[sourceSchema]].
+            if (TableSchemaResolver.isSchemaCompatible(latestTableSchema, 
canonicalizedSourceSchema)) {
+              canonicalizedSourceSchema
+            } else {
+              log.error(
+                s"""
+                   |Incoming batch schema is not compatible with the table's 
one.
+                   |Incoming schema ${canonicalizedSourceSchema.toString(true)}
+                   |Table's schema ${latestTableSchema.toString(true)}
+                   |""".stripMargin)
+              throw new SchemaCompatibilityException("Incoming batch schema is 
not compatible with the table's one")
+            }
+          }
+      }
+
+      validateSchemaForHoodieIsDeleted(writerSchema)
+
+      // NOTE: PLEASE READ CAREFULLY BEFORE CHANGING THIS
+      //       We have to register w/ Kryo all of the Avro schemas that might 
potentially be used to decode
+      //       records into Avro format. Otherwise, Kryo wouldn't be able to 
apply an optimization allowing
+      //       it to avoid the need to ser/de the whole schema along _every_ 
Avro record
+      val targetAvroSchemas = sourceSchema +: writerSchema +: 
latestTableSchemaOpt.toSeq

Review Comment:
   Good call! Am going to extract it as a standalone method (we can take the 
unification as a follow-up)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to