nsivabalan commented on code in PR #6358:
URL: https://github.com/apache/hudi/pull/6358#discussion_r1030756548


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -347,6 +385,91 @@ object HoodieSparkSqlWriter {
     }
   }
 
+  /**
+   * Deduces writer's schema based on
+   * <ul>
+   *   <li>Source's schema</li>
+   *   <li>Target table's schema (including Hudi's [[InternalSchema]] 
representation)</li>
+   * </ul>
+   */
+  def deduceWriterSchema(sourceSchema: Schema,
+                         latestTableSchemaOpt: Option[Schema],
+                         internalSchemaOpt: Option[InternalSchema],
+                         opts: Map[String, String]): Schema = {
+    val shouldReconcileSchema = 
opts(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean
+    val shouldValidateSchemasCompatibility = 
opts.getOrDefault(HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.key,
+      HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.defaultValue).toBoolean
+
+    latestTableSchemaOpt match {
+      // In case table schema is empty we're just going to use the source 
schema as a
+      // writer's schema. No additional handling is required
+      case None => sourceSchema
+      // Otherwise, we need to make sure we reconcile incoming and latest 
table schemas
+      case Some(latestTableSchema) =>
+        // Before validating whether schemas are compatible, we need to 
"canonicalize" source's schema
+        // relative to the table's one, by doing a (minor) reconciliation of 
the nullability constraints:
+        // for ex, if in incoming schema column A is designated as non-null, 
but it's designated as nullable
+        // in the table's one we want to proceed aligning nullability 
constraints w/ the table's schema
+        val shouldCanonicalizeSchema = 
opts.getOrDefault(DataSourceWriteOptions.CANONICALIZE_SCHEMA.key,
+          
DataSourceWriteOptions.CANONICALIZE_SCHEMA.defaultValue.toString).toBoolean
+        val canonicalizedSourceSchema = if (shouldCanonicalizeSchema) {
+          AvroSchemaEvolutionUtils.canonicalizeColumnNullability(sourceSchema, 
latestTableSchema)
+        } else {
+          sourceSchema
+        }
+
+        if (shouldReconcileSchema) {
+          internalSchemaOpt match {
+            case Some(internalSchema) =>
+              // Apply schema evolution, by auto-merging write schema and read 
schema
+              val mergedInternalSchema = 
AvroSchemaEvolutionUtils.reconcileSchema(canonicalizedSourceSchema, 
internalSchema)
+              AvroInternalSchemaConverter.convert(mergedInternalSchema, 
latestTableSchema.getFullName)
+
+            case None =>
+              // In case schema reconciliation is enabled we will employ 
(legacy) reconciliation
+              // strategy to produce target writer's schema (see definition 
below)
+              val (reconciledSchema, isCompatible) = 
reconcileSchemasLegacy(latestTableSchema, canonicalizedSourceSchema)

Review Comment:
   👍 



##########
hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java:
##########
@@ -44,31 +61,65 @@ public static String getAvroRecordQualifiedName(String 
tableName) {
     return "hoodie." + sanitizedTableName + "." + sanitizedTableName + 
"_record";
   }
 
+  // TODO java-doc
+  public static boolean isCompatibleProjectionOf(Schema sourceSchema, Schema 
targetSchema) {
+    return isProjectionOfInternal(sourceSchema, targetSchema, 
AvroSchemaUtils::isSchemaCompatible);
+  }
+
   /**
-   * Validate whether the {@code targetSchema} is a projection of {@code 
sourceSchema}.
+   * Validate whether the {@code targetSchema} is a strict projection of 
{@code sourceSchema}.
    *
-   * Schema B is considered a projection of schema A iff
+   * Schema B is considered a strict projection of schema A iff
    * <ol>
    *   <li>Schemas A and B are equal, or</li>
+   *   <li>Schemas A and B are array schemas and element-type of B is a strict 
projection
+   *   of the element-type of A, or</li>
+   *   <li>Schemas A and B are map schemas and value-type of B is a strict 
projection
+   *   of the value-type of A, or</li>
+   *   <li>Schemas A and B are union schemas (of the same size) and every 
element-type of B
+   *   is a strict projection of the corresponding element-type of A, or</li>
    *   <li>Schemas A and B are record schemas and every field of the record B 
has corresponding
    *   counterpart (w/ the same name) in the schema A, such that the schema of 
the field of the schema
-   *   B is also a projection of the A field's schema</li>
+   *   B is also a strict projection of the A field's schema</li>
    * </ol>
    */
-  public static boolean isProjectionOf(Schema sourceSchema, Schema 
targetSchema) {
-    if (sourceSchema.getType() != Schema.Type.RECORD
-        || targetSchema.getType() != Schema.Type.RECORD) {
-      return Objects.equals(sourceSchema, targetSchema);
-    }
+  public static boolean isStrictProjectionOf(Schema sourceSchema, Schema 
targetSchema) {
+    return isProjectionOfInternal(sourceSchema, targetSchema, Objects::equals);
+  }
 
-    for (Schema.Field targetField : targetSchema.getFields()) {
-      Schema.Field sourceField = sourceSchema.getField(targetField.name());
-      if (sourceField == null || !isProjectionOf(sourceField.schema(), 
targetField.schema())) {
-        return false;
+  private static boolean isProjectionOfInternal(Schema sourceSchema,
+                                                Schema targetSchema,
+                                                BiFunction<Schema, Schema, 
Boolean> atomicTypeEqualityPredicate) {
+    if (sourceSchema.getType() == targetSchema.getType()) {
+      if (sourceSchema.getType() == Schema.Type.RECORD) {
+        for (Schema.Field targetField : targetSchema.getFields()) {
+          Schema.Field sourceField = sourceSchema.getField(targetField.name());
+          if (sourceField == null || 
!isProjectionOfInternal(sourceField.schema(), targetField.schema(), 
atomicTypeEqualityPredicate)) {
+            return false;
+          }
+        }
+        return true;
+      } else if (sourceSchema.getType() == Schema.Type.ARRAY) {
+        return isProjectionOfInternal(sourceSchema.getElementType(), 
targetSchema.getElementType(), atomicTypeEqualityPredicate);
+      } else if (sourceSchema.getType() == Schema.Type.MAP) {
+        return isProjectionOfInternal(sourceSchema.getValueType(), 
targetSchema.getValueType(), atomicTypeEqualityPredicate);
+      } else if (sourceSchema.getType() == Schema.Type.UNION) {
+        List<Schema> sourceNestedSchemas = sourceSchema.getTypes();
+        List<Schema> targetNestedSchemas = targetSchema.getTypes();
+        if (sourceNestedSchemas.size() != targetNestedSchemas.size()) {
+          return false;
+        }
+
+        for (int i = 0; i < sourceNestedSchemas.size(); ++i) {

Review Comment:
   so, ordering of UNION also matters in equality/projection/compatibility is 
it ? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to