yihua commented on code in PR #6358:
URL: https://github.com/apache/hudi/pull/6358#discussion_r967193263


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -169,23 +179,107 @@ object HoodieSparkSqlWriter {
       }
 
       val commitActionType = CommitUtils.getCommitActionType(operation, 
tableConfig.getTableType)
-      val dropPartitionColumns = 
hoodieConfig.getBoolean(DataSourceWriteOptions.DROP_PARTITION_COLUMNS)
+
+      // Register Avro classes ([[Schema]], [[GenericData]]) w/ Kryo
+      sparkContext.getConf.registerKryoClasses(
+        Array(classOf[org.apache.avro.generic.GenericData],
+          classOf[org.apache.avro.Schema]))
+
+      val (structName, nameSpace) = 
AvroConversionUtils.getAvroRecordNameAndNamespace(tblName)
+      val reconcileSchema = 
parameters(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean
+
+      val schemaEvolutionEnabled = 
parameters.getOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key(), 
"false").toBoolean
+      var internalSchemaOpt = getLatestTableInternalSchema(fs, basePath, 
sparkContext)
+
+      val sourceSchema = 
AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, 
nameSpace)
+      val latestTableSchemaOpt = getLatestTableSchema(spark, basePath, 
tableIdentifier, sparkContext.hadoopConfiguration)
+
+      val writerSchema: Schema = latestTableSchemaOpt match {
+        // In case table schema is empty we're just going to use the source 
schema as a
+        // writer's schema. No additional handling is required
+        case None => sourceSchema
+        // Otherwise, we need to make sure we reconcile incoming and latest 
table schemas
+        case Some(latestTableSchema) =>
+          if (reconcileSchema) {
+            // In case we need to reconcile the schema and schema evolution is 
enabled,
+            // we will force-apply schema evolution to the writer's schema
+            if (schemaEvolutionEnabled && internalSchemaOpt.isEmpty) {
+              internalSchemaOpt = 
Some(AvroInternalSchemaConverter.convert(sourceSchema))
+            }
+
+            if (internalSchemaOpt.isDefined) {
+              // Apply schema evolution, by auto-merging write schema and read 
schema
+              val mergedInternalSchema = 
AvroSchemaEvolutionUtils.reconcileSchema(sourceSchema, internalSchemaOpt.get)
+              AvroInternalSchemaConverter.convert(mergedInternalSchema, 
latestTableSchema.getName)
+            } else if (TableSchemaResolver.isSchemaCompatible(sourceSchema, 
latestTableSchema)) {
+              // In case schema reconciliation is enabled and source and 
latest table schemas
+              // are compatible (as defined by 
[[TableSchemaResolver#isSchemaCompatible]]), then we
+              // will rebase incoming batch onto the table's latest schema 
(ie, reconcile them)
+              //
+              // NOTE: Since we'll be converting incoming batch from 
[[sourceSchema]] into [[latestTableSchema]]
+              //       we're validating in that order (where [[sourceSchema]] 
is treated as a reader's schema,
+              //       and [[latestTableSchema]] is treated as a writer's 
schema)
+              latestTableSchema
+            } else {
+              log.error(
+                s"""
+                   |Failed to reconcile incoming batch schema with the table's 
one.
+                   |Incoming schema ${sourceSchema.toString(true)}
+
+                   |Table's schema ${latestTableSchema.toString(true)}
+
+                   |""".stripMargin)
+              throw new SchemaCompatibilityException("Failed to reconcile 
incoming schema with the table's one")
+            }
+          } else {
+            // Before validating whether schemas are compatible, we need to 
"canonicalize" source's schema
+            // relative to the table's one, by doing a (minor) reconciliation 
of the nullability constraints:
+            // for ex, if in incoming schema column A is designated as 
non-null, but it's designated as nullable
+            // in the table's one we want to proceed w/ such operation, simply 
relaxing such constraint in the
+            // source schema.
+            val canonicalizedSourceSchema = 
AvroSchemaEvolutionUtils.canonicalizeColumnNullability(sourceSchema, 
latestTableSchema)
+            // In case reconciliation is disabled, we have to validate that 
the source's schema
+            // is compatible w/ the table's latest schema, such that we're 
able to read existing table's
+            // records using [[sourceSchema]].
+            if (TableSchemaResolver.isSchemaCompatible(latestTableSchema, 
canonicalizedSourceSchema)) {
+              canonicalizedSourceSchema
+            } else {
+              log.error(
+                s"""
+                   |Incoming batch schema is not compatible with the table's 
one.
+                   |Incoming schema ${canonicalizedSourceSchema.toString(true)}
+                   |Table's schema ${latestTableSchema.toString(true)}
+                   |""".stripMargin)
+              throw new SchemaCompatibilityException("Incoming batch schema is 
not compatible with the table's one")
+            }
+          }
+      }
+
+      validateSchemaForHoodieIsDeleted(writerSchema)
+
+      // NOTE: PLEASE READ CAREFULLY BEFORE CHANGING THIS
+      //       We have to register w/ Kryo all of the Avro schemas that might 
potentially be used to decode
+      //       records into Avro format. Otherwise, Kryo wouldn't be able to 
apply an optimization allowing
+      //       it to avoid the need to ser/de the whole schema along _every_ 
Avro record
+      val targetAvroSchemas = sourceSchema +: writerSchema +: 
latestTableSchemaOpt.toSeq

Review Comment:
   It looks like this schema reconciliation logic is not specific to Spark.  
Could we extract it so that the Flink and Java engines can leverage it as well?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java:
##########
@@ -295,91 +295,19 @@ private MessageType convertAvroSchemaToParquet(Schema 
schema) {
   }
 
   /**
-   * HUDI specific validation of schema evolution. Ensures that a newer schema 
can be used for the dataset by
-   * checking if the data written using the old schema can be read using the 
new schema.
+   * Establishes whether {@code prevSchema} is compatible w/ {@code 
newSchema}, as
+   * defined by Avro's {@link SchemaCompatibility}
    *
-   * HUDI requires a Schema to be specified in HoodieWriteConfig and is used 
by the HoodieWriteClient to
-   * create the records. The schema is also saved in the data files (parquet 
format) and log files (avro format).
-   * Since a schema is required each time new data is ingested into a HUDI 
dataset, schema can be evolved over time.
-   *
-   * New Schema is compatible only if:
-   * A1. There is no change in schema
-   * A2. A field has been added and it has a default value specified
-   *
-   * New Schema is incompatible if:
-   * B1. A field has been deleted
-   * B2. A field has been renamed (treated as delete + add)
-   * B3. A field's type has changed to be incompatible with the older type
-   *
-   * Issue with org.apache.avro.SchemaCompatibility:
-   *  org.apache.avro.SchemaCompatibility checks schema compatibility between 
a writer schema (which originally wrote
-   *  the AVRO record) and a readerSchema (with which we are reading the 
record). It ONLY guarantees that that each
-   *  field in the reader record can be populated from the writer record. 
Hence, if the reader schema is missing a
-   *  field, it is still compatible with the writer schema.
-   *
-   *  In other words, org.apache.avro.SchemaCompatibility was written to 
guarantee that we can read the data written
-   *  earlier. It does not guarantee schema evolution for HUDI (B1 above).
-   *
-   * Implementation: This function implements specific HUDI specific checks 
(listed below) and defers the remaining
-   * checks to the org.apache.avro.SchemaCompatibility code.
-   *
-   * Checks:
-   * C1. If there is no change in schema: success
-   * C2. If a field has been deleted in new schema: failure
-   * C3. If a field has been added in new schema: it should have default value 
specified
-   * C4. If a field has been renamed(treated as delete + add): failure
-   * C5. If a field type has changed: failure
-   *
-   * @param oldSchema Older schema to check.
-   * @param newSchema Newer schema to check.
-   * @return True if the schema validation is successful
-   *
-   * TODO revisit this method: it's implemented incorrectly as it might be 
applying different criteria
-   *      to top-level record and nested record (for ex, if that nested record 
is contained w/in an array)
+   * @param prevSchema previous instance of the schema
+   * @param newSchema new instance of the schema
    */
-  public static boolean isSchemaCompatible(Schema oldSchema, Schema newSchema) 
{
-    if (oldSchema.getType() == newSchema.getType() && newSchema.getType() == 
Schema.Type.RECORD) {
-      // record names must match:
-      if (!SchemaCompatibility.schemaNameEquals(newSchema, oldSchema)) {
-        return false;
-      }
-
-      // Check that each field in the oldSchema can populated the newSchema
-      for (final Field oldSchemaField : oldSchema.getFields()) {
-        final Field newSchemaField = 
SchemaCompatibility.lookupWriterField(newSchema, oldSchemaField);
-        if (newSchemaField == null) {
-          // C4 or C2: newSchema does not correspond to any field in the 
oldSchema
-          return false;
-        } else {
-          if (!isSchemaCompatible(oldSchemaField.schema(), 
newSchemaField.schema())) {
-            // C5: The fields do not have a compatible type
-            return false;
-          }
-        }
-      }
-
-      // Check that new fields added in newSchema have default values as they 
will not be
-      // present in oldSchema and hence cannot be populated on reading records 
from existing data.
-      for (final Field newSchemaField : newSchema.getFields()) {
-        final Field oldSchemaField = 
SchemaCompatibility.lookupWriterField(oldSchema, newSchemaField);
-        if (oldSchemaField == null) {
-          if (newSchemaField.defaultVal() == null) {
-            // C3: newly added field in newSchema does not have a default value
-            return false;
-          }
-        }
-      }
-
-      // All fields in the newSchema record can be populated from the 
oldSchema record
-      return true;
-    } else {
-      // Use the checks implemented by Avro
-      // newSchema is the schema which will be used to read the records 
written earlier using oldSchema. Hence, in the
-      // check below, use newSchema as the reader schema and oldSchema as the 
writer schema.
-      org.apache.avro.SchemaCompatibility.SchemaPairCompatibility compatResult 
=
-          
org.apache.avro.SchemaCompatibility.checkReaderWriterCompatibility(newSchema, 
oldSchema);
-      return compatResult.getType() == 
org.apache.avro.SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
-    }
+  public static boolean isSchemaCompatible(Schema prevSchema, Schema 
newSchema) {
+    // NOTE: We're establishing compatibility of the {@code prevSchema} and 
{@code newSchema}
+    //       as following: {@code newSchema} is considered compatible to 
{@code prevSchema},
+    //       iff data written using {@code prevSchema} could be read by {@code 
newSchema}
+    SchemaCompatibility.SchemaPairCompatibility result =
+        SchemaCompatibility.checkReaderWriterCompatibility(newSchema, 
prevSchema);
+    return result.getType() == 
SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;

Review Comment:
   Basically, the schema check is relaxed here: the field-by-field 
comparison is gone, and only a read-compatibility check remains.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -169,23 +179,107 @@ object HoodieSparkSqlWriter {
       }
 
       val commitActionType = CommitUtils.getCommitActionType(operation, 
tableConfig.getTableType)
-      val dropPartitionColumns = 
hoodieConfig.getBoolean(DataSourceWriteOptions.DROP_PARTITION_COLUMNS)
+
+      // Register Avro classes ([[Schema]], [[GenericData]]) w/ Kryo
+      sparkContext.getConf.registerKryoClasses(
+        Array(classOf[org.apache.avro.generic.GenericData],
+          classOf[org.apache.avro.Schema]))

Review Comment:
   Why do we need this now?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to