nsivabalan commented on code in PR #6358:
URL: https://github.com/apache/hudi/pull/6358#discussion_r1027247902


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -347,6 +378,95 @@ object HoodieSparkSqlWriter {
     }
   }
 
+  /**
+   * Deduces writer's schema based on
+   * <ul>
+   *   <li>Source's schema</li>
+   *   <li>Target table's schema (including Hudi's [[InternalSchema]] representation)</li>
+   * </ul>
+   */
+  def deduceWriterSchema(sourceSchema: Schema,
+                         latestTableSchemaOpt: Option[Schema],
+                         internalSchemaOpt: Option[InternalSchema],
+                         opts: Map[String, String]): Schema = {
+    val shouldReconcileSchema = opts(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean
+    val shouldValidateSchemasCompatibility = opts.getOrDefault(HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.key,
+      HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.defaultValue).toBoolean
+
+    latestTableSchemaOpt match {
+      // In case table schema is empty we're just going to use the source schema as a
+      // writer's schema. No additional handling is required
+      case None => sourceSchema
+      // Otherwise, we need to make sure we reconcile incoming and latest table schemas
+      case Some(latestTableSchema) =>
+        // Before validating whether schemas are compatible, we need to "canonicalize" source's schema
+        // relative to the table's one, by doing a (minor) reconciliation of the nullability constraints:
+        // for ex, if in incoming schema column A is designated as non-null, but it's designated as nullable
+        // in the table's one we want to proceed aligning nullability constraints w/ the table's schema
+        val shouldCanonicalizeSchema = opts.getOrDefault(DataSourceWriteOptions.CANONICALIZE_SCHEMA.key,
+          DataSourceWriteOptions.CANONICALIZE_SCHEMA.defaultValue.toString).toBoolean
+        val canonicalizedSourceSchema = if (shouldCanonicalizeSchema) {
+          AvroSchemaEvolutionUtils.canonicalizeColumnNullability(sourceSchema, latestTableSchema)
+        } else {
+          sourceSchema
+        }
+
+        if (shouldReconcileSchema) {
+          internalSchemaOpt match {
+            case Some(internalSchema) =>
+              // Apply schema evolution, by auto-merging write schema and read schema
+              val mergedInternalSchema = AvroSchemaEvolutionUtils.reconcileSchema(canonicalizedSourceSchema, internalSchema)
+              AvroInternalSchemaConverter.convert(mergedInternalSchema, latestTableSchema.getFullName)
+
+            case None =>
+              // In case schema reconciliation is enabled and source and latest table schemas
+              // are compatible (as defined by [[TableSchemaResolver#isSchemaCompatible]]), then we
+              // will rebase incoming batch onto the table's latest schema (ie, reconcile them)
+              //
+              // NOTE: Since we'll be converting incoming batch from [[sourceSchema]] into [[latestTableSchema]]
+              //       we're validating in that order (where [[sourceSchema]] is treated as a reader's schema,
+              //       and [[latestTableSchema]] is treated as a writer's schema)
+              //
+              // NOTE: In some cases we need to relax constraint of incoming dataset's schema to be compatible
+              //       w/ the table's one and allow schemas to diverge. This is required in cases where
+              //       partial updates will be performed (for ex, `MERGE INTO` Spark SQL statement) and as such
+              //       only incoming dataset's projection has to match the table's schema, and not the whole one
+              if (!shouldValidateSchemasCompatibility || TableSchemaResolver.isSchemaCompatible(canonicalizedSourceSchema, latestTableSchema)) {

Review Comment:
   I was interested in scenario 2, i.e. the table schema has 4 columns and the
new incoming batch has 5 columns, but the reconcile schema config is also
enabled. Can you confirm we are good for this scenario?
   
   I ask because the reconcile config is just a hint. There could be multiple
producers: one of them could be lagging (still on the old schema) while the
others have caught up, or one of the producers could be evolving the schema.
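
   To make the producer cases concrete, here is a toy sketch. It models a
schema as a plain list of column names and classifies an incoming batch
relative to the table. Everything in it (`ReconcileScenarios`, `relate`, the
case names) is hypothetical and only for illustration; it does not use Hudi or
Avro APIs and says nothing about what `deduceWriterSchema` actually returns.

```scala
// Toy model only: a "schema" is just a list of column names.
// None of these names are Hudi APIs; they are assumptions for illustration.
object ReconcileScenarios {
  sealed trait Relation
  case object Same extends Relation             // incoming matches the table
  case object IncomingNarrower extends Relation // e.g. lagging producer on an old schema
  case object IncomingWider extends Relation    // e.g. producer that evolved the schema
  case object Diverged extends Relation         // each side has columns the other lacks

  // Compare the column sets of the table and the incoming batch.
  def relate(tableCols: Seq[String], incomingCols: Seq[String]): Relation = {
    val table = tableCols.toSet
    val incoming = incomingCols.toSet
    if (incoming == table) Same
    else if (incoming.subsetOf(table)) IncomingNarrower
    else if (table.subsetOf(incoming)) IncomingWider
    else Diverged
  }
}
```

   With a 4-column table `a, b, c, d`, a batch carrying `a, b, c, d, e` is the
`IncomingWider` case (the scenario asked about above), and a batch carrying
only `a, b, c` is the `IncomingNarrower` case. With multiple producers, both
can show up while the reconcile config is enabled.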



