This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e39bfb6940 [HUDI-6947] Refactored HoodieSchemaUtils.deduceWriterSchema with many flags (#10810)
2e39bfb6940 is described below

commit 2e39bfb694099293b77eec9977e5e46af97af18b
Author: Geser Dugarov <[email protected]>
AuthorDate: Thu Mar 7 12:23:38 2024 +0700

    [HUDI-6947] Refactored HoodieSchemaUtils.deduceWriterSchema with many flags (#10810)
---
 .../scala/org/apache/hudi/HoodieSchemaUtils.scala  | 176 +++++++++++----------
 1 file changed, 93 insertions(+), 83 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala
index cfc43453e9c..9aeff64f237 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala
@@ -76,107 +76,117 @@ object HoodieSchemaUtils {
                          latestTableSchemaOpt: Option[Schema],
                          internalSchemaOpt: Option[InternalSchema],
                          opts: Map[String, String]): Schema = {
-    val setNullForMissingColumns = 
opts.getOrDefault(DataSourceWriteOptions.SET_NULL_FOR_MISSING_COLUMNS.key(),
-      
DataSourceWriteOptions.SET_NULL_FOR_MISSING_COLUMNS.defaultValue).toBoolean
-    val shouldReconcileSchema = 
opts.getOrDefault(DataSourceWriteOptions.RECONCILE_SCHEMA.key(),
-      
DataSourceWriteOptions.RECONCILE_SCHEMA.defaultValue().toString).toBoolean
-    val shouldValidateSchemasCompatibility = 
opts.getOrDefault(HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.key,
-      HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.defaultValue).toBoolean
-
     latestTableSchemaOpt match {
-      // In case table schema is empty we're just going to use the source 
schema as a
-      // writer's schema.
+      // If table schema is empty, then we use the source schema as a writer's schema.
       case None => AvroInternalSchemaConverter.fixNullOrdering(sourceSchema)
       // Otherwise, we need to make sure we reconcile incoming and latest 
table schemas
       case Some(latestTableSchemaWithMetaFields) =>
-        // NOTE: Meta-fields will be unconditionally injected by Hudi writing 
handles, for the sake of
-        //       deducing proper writer schema we're stripping them to make 
sure we can perform proper
-        //       analysis
-        //add call to fix null ordering to ensure backwards compatibility
+        // NOTE: Meta-fields will be unconditionally injected by Hudi writing handles, for the sake of deducing proper writer schema
+        //       we're stripping them to make sure we can perform proper analysis
+        // add call to fix null ordering to ensure backwards compatibility
         val latestTableSchema = 
AvroInternalSchemaConverter.fixNullOrdering(removeMetadataFields(latestTableSchemaWithMetaFields))
+
         // Before validating whether schemas are compatible, we need to 
"canonicalize" source's schema
         // relative to the table's one, by doing a (minor) reconciliation of 
the nullability constraints:
         // for ex, if in incoming schema column A is designated as non-null, 
but it's designated as nullable
         // in the table's one we want to proceed aligning nullability 
constraints w/ the table's schema
         // Also, we promote types to the latest table schema if possible.
-        val shouldCanonicalizeSchema = 
opts.getOrDefault(CANONICALIZE_SCHEMA.key,
-          CANONICALIZE_SCHEMA.defaultValue.toString).toBoolean
-        val mergeIntoWrites = opts.getOrDefault(SQL_MERGE_INTO_WRITES.key(),
-          SQL_MERGE_INTO_WRITES.defaultValue.toString).toBoolean
-
+        val shouldCanonicalizeSchema = 
opts.getOrDefault(CANONICALIZE_SCHEMA.key, 
CANONICALIZE_SCHEMA.defaultValue.toString).toBoolean
         val canonicalizedSourceSchema = if (shouldCanonicalizeSchema) {
           canonicalizeSchema(sourceSchema, latestTableSchema, opts)
         } else {
           AvroInternalSchemaConverter.fixNullOrdering(sourceSchema)
         }
 
-        val allowAutoEvolutionColumnDrop = 
opts.getOrDefault(HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.key,
-          
HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.defaultValue).toBoolean
-
+        val shouldReconcileSchema = 
opts.getOrDefault(DataSourceWriteOptions.RECONCILE_SCHEMA.key(),
+          
DataSourceWriteOptions.RECONCILE_SCHEMA.defaultValue().toString).toBoolean
         if (shouldReconcileSchema) {
-          internalSchemaOpt match {
-            case Some(internalSchema) =>
-              // Apply schema evolution, by auto-merging write schema and read 
schema
-              val mergedInternalSchema = 
AvroSchemaEvolutionUtils.reconcileSchema(canonicalizedSourceSchema, 
internalSchema)
-              val evolvedSchema = 
AvroInternalSchemaConverter.convert(mergedInternalSchema, 
latestTableSchema.getFullName)
-              val shouldRemoveMetaDataFromInternalSchema = 
sourceSchema.getFields().filter(f => 
f.name().equalsIgnoreCase(HoodieRecord.RECORD_KEY_METADATA_FIELD)).isEmpty
-              if (shouldRemoveMetaDataFromInternalSchema) 
HoodieAvroUtils.removeMetadataFields(evolvedSchema) else evolvedSchema
-            case None =>
-              // In case schema reconciliation is enabled we will employ 
(legacy) reconciliation
-              // strategy to produce target writer's schema (see definition 
below)
-              val (reconciledSchema, isCompatible) =
-                reconcileSchemasLegacy(latestTableSchema, 
canonicalizedSourceSchema)
-
-              // NOTE: In some cases we need to relax constraint of incoming 
dataset's schema to be compatible
-              //       w/ the table's one and allow schemas to diverge. This 
is required in cases where
-              //       partial updates will be performed (for ex, `MERGE INTO` 
Spark SQL statement) and as such
-              //       only incoming dataset's projection has to match the 
table's schema, and not the whole one
-              if (!shouldValidateSchemasCompatibility || isCompatible) {
-                reconciledSchema
-              } else {
-                log.error(
-                  s"""Failed to reconcile incoming batch schema with the 
table's one.
-                     |Incoming schema ${sourceSchema.toString(true)}
-                     |Incoming schema (canonicalized) 
${canonicalizedSourceSchema.toString(true)}
-                     |Table's schema ${latestTableSchema.toString(true)}
-                     |""".stripMargin)
-                throw new SchemaCompatibilityException("Failed to reconcile 
incoming schema with the table's one")
-              }
-          }
+          deduceWriterSchemaWithReconcile(sourceSchema, 
canonicalizedSourceSchema, latestTableSchema, internalSchemaOpt, opts)
+        } else {
+          deduceWriterSchemaWithoutReconcile(sourceSchema, 
canonicalizedSourceSchema, latestTableSchema, opts)
+        }
+    }
+  }
+
+  /**
+   * Deducing with disabled reconciliation.
+   * We have to validate that the source's schema is compatible w/ the table's latest schema,
+   * such that we're able to read existing table's records using [[sourceSchema]].
+   */
+  private def deduceWriterSchemaWithoutReconcile(sourceSchema: Schema,
+                                                 canonicalizedSourceSchema: 
Schema,
+                                                 latestTableSchema: Schema,
+                                                 opts: Map[String, String]): 
Schema = {
+    // NOTE: In some cases we need to relax constraint of incoming dataset's 
schema to be compatible
+    //       w/ the table's one and allow schemas to diverge. This is required 
in cases where
+    //       partial updates will be performed (for ex, `MERGE INTO` Spark SQL 
statement) and as such
+    //       only incoming dataset's projection has to match the table's 
schema, and not the whole one
+    val mergeIntoWrites = opts.getOrDefault(SQL_MERGE_INTO_WRITES.key(), 
SQL_MERGE_INTO_WRITES.defaultValue.toString).toBoolean
+    val shouldValidateSchemasCompatibility = 
opts.getOrDefault(HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.key,
+      HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.defaultValue).toBoolean
+    val allowAutoEvolutionColumnDrop = 
opts.getOrDefault(HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.key,
+      
HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.defaultValue).toBoolean
+    val setNullForMissingColumns = 
opts.getOrDefault(DataSourceWriteOptions.SET_NULL_FOR_MISSING_COLUMNS.key(),
+      
DataSourceWriteOptions.SET_NULL_FOR_MISSING_COLUMNS.defaultValue).toBoolean
+
+    if (!mergeIntoWrites && !shouldValidateSchemasCompatibility && 
!allowAutoEvolutionColumnDrop) {
+      // Default behaviour
+      val reconciledSchema = if (setNullForMissingColumns) {
+        AvroSchemaEvolutionUtils.reconcileSchema(canonicalizedSourceSchema, 
latestTableSchema)
+      } else {
+        canonicalizedSourceSchema
+      }
+      checkValidEvolution(reconciledSchema, latestTableSchema)
+      reconciledSchema
+    } else {
+      // If it's merge into writes, we don't check for projection nor schema compatibility. Writers down the line will take care of it.
+      // Or it's not merge into writes, and we don't validate schema, but we allow to drop columns automatically.
+      // Or it's not merge into writes, we validate schema, and schema is compatible.
+      if (shouldValidateSchemasCompatibility) {
+        checkSchemaCompatible(latestTableSchema, canonicalizedSourceSchema, 
true,
+          allowAutoEvolutionColumnDrop, java.util.Collections.emptySet())
+      }
+      canonicalizedSourceSchema
+    }
+  }
+
+  /**
+   * Deducing with enabled reconciliation.
+   * Marked as Deprecated.
+   */
+  private def deduceWriterSchemaWithReconcile(sourceSchema: Schema,
+                                              canonicalizedSourceSchema: 
Schema,
+                                              latestTableSchema: Schema,
+                                              internalSchemaOpt: 
Option[InternalSchema],
+                                              opts: Map[String, String]): 
Schema = {
+    internalSchemaOpt match {
+      case Some(internalSchema) =>
+        // Apply schema evolution, by auto-merging write schema and read schema
+        val mergedInternalSchema = 
AvroSchemaEvolutionUtils.reconcileSchema(canonicalizedSourceSchema, 
internalSchema)
+        val evolvedSchema = 
AvroInternalSchemaConverter.convert(mergedInternalSchema, 
latestTableSchema.getFullName)
+        val shouldRemoveMetaDataFromInternalSchema = 
sourceSchema.getFields().filter(f => 
f.name().equalsIgnoreCase(HoodieRecord.RECORD_KEY_METADATA_FIELD)).isEmpty
+        if (shouldRemoveMetaDataFromInternalSchema) 
HoodieAvroUtils.removeMetadataFields(evolvedSchema) else evolvedSchema
+      case None =>
+        // In case schema reconciliation is enabled we will employ (legacy) 
reconciliation
+        // strategy to produce target writer's schema (see definition below)
+        val (reconciledSchema, isCompatible) =
+          reconcileSchemasLegacy(latestTableSchema, canonicalizedSourceSchema)
+
+        // NOTE: In some cases we need to relax constraint of incoming 
dataset's schema to be compatible
+        //       w/ the table's one and allow schemas to diverge. This is 
required in cases where
+        //       partial updates will be performed (for ex, `MERGE INTO` Spark 
SQL statement) and as such
+        //       only incoming dataset's projection has to match the table's 
schema, and not the whole one
+        val shouldValidateSchemasCompatibility = 
opts.getOrDefault(HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.key, 
HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.defaultValue).toBoolean
+        if (!shouldValidateSchemasCompatibility || isCompatible) {
+          reconciledSchema
         } else {
-          // In case reconciliation is disabled, we have to validate that the 
source's schema
-          // is compatible w/ the table's latest schema, such that we're able 
to read existing table's
-          // records using [[sourceSchema]].
-          //
-          // NOTE: In some cases we need to relax constraint of incoming 
dataset's schema to be compatible
-          //       w/ the table's one and allow schemas to diverge. This is 
required in cases where
-          //       partial updates will be performed (for ex, `MERGE INTO` 
Spark SQL statement) and as such
-          //       only incoming dataset's projection has to match the table's 
schema, and not the whole one
-
-          if (mergeIntoWrites) {
-            // if its merge into writes, do not check for projection nor 
schema compatibility. Writers down the line will
-            // take care of it.
-            canonicalizedSourceSchema
-          } else {
-            if (!shouldValidateSchemasCompatibility) {
-              // if no validation is enabled, check for col drop
-              if (allowAutoEvolutionColumnDrop) {
-                canonicalizedSourceSchema
-              } else {
-                val reconciledSchema = if (setNullForMissingColumns) {
-                  
AvroSchemaEvolutionUtils.reconcileSchema(canonicalizedSourceSchema, 
latestTableSchema)
-                } else {
-                  canonicalizedSourceSchema
-                }
-                checkValidEvolution(reconciledSchema, latestTableSchema)
-                reconciledSchema
-              }
-            } else {
-              checkSchemaCompatible(latestTableSchema, 
canonicalizedSourceSchema, true,
-                allowAutoEvolutionColumnDrop, java.util.Collections.emptySet())
-              canonicalizedSourceSchema
-            }
-          }
+          log.error(
+            s"""Failed to reconcile incoming batch schema with the table's one.
+               |Incoming schema ${sourceSchema.toString(true)}
+               |Incoming schema (canonicalized) 
${canonicalizedSourceSchema.toString(true)}
+               |Table's schema ${latestTableSchema.toString(true)}
+               |""".stripMargin)
+          throw new SchemaCompatibilityException("Failed to reconcile incoming schema with the table's one")
         }
     }
   }

Reply via email to