xiarixiaoyao commented on code in PR #6361:
URL: https://github.com/apache/hudi/pull/6361#discussion_r1055253215


##########
hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaEvolutionUtils.java:
##########
@@ -109,40 +116,129 @@ public static InternalSchema reconcileSchema(Schema incomingSchema, InternalSche
   }
 
   /**
-   * Canonical the nullability.
-   * Do not allow change cols Nullability field from optional to required.
-   * If above problem occurs, try to correct it.
+   * Reconciles nullability requirements b/w {@code source} and {@code target} schemas,
+   * by adjusting these of the {@code source} schema to be in-line with the ones of the
+   * {@code target} one
    *
-   * @param writeSchema writeSchema hoodie used to write data.
-   * @param readSchema read schema
-   * @return canonical Schema
+   * @param sourceSchema source schema that needs reconciliation
+   * @param targetSchema target schema that source schema will be reconciled against
+   * @return schema (based off {@code source} one) that has nullability constraints reconciled
    */
-  public static Schema canonicalizeColumnNullability(Schema writeSchema, Schema readSchema) {
-    if (writeSchema.getFields().isEmpty() || readSchema.getFields().isEmpty()) {
-      return writeSchema;
+  public static Schema reconcileNullability(Schema sourceSchema, Schema targetSchema) {
+    if (sourceSchema.getFields().isEmpty() || targetSchema.getFields().isEmpty()) {
+      return sourceSchema;
     }
-    InternalSchema writeInternalSchema = AvroInternalSchemaConverter.convert(writeSchema);
-    InternalSchema readInternalSchema = AvroInternalSchemaConverter.convert(readSchema);
-    List<String> colNamesWriteSchema = writeInternalSchema.getAllColsFullName();
-    List<String> colNamesFromReadSchema = readInternalSchema.getAllColsFullName();
-    // try to deal with optional change. now when we use sparksql to update hudi table,
-    // sparksql Will change the col type from optional to required, this is a bug.
-    List<String> candidateUpdateCols = colNamesWriteSchema.stream().filter(f -> {
-      boolean exist = colNamesFromReadSchema.contains(f);
-      if (exist && (writeInternalSchema.findField(f).isOptional() != readInternalSchema.findField(f).isOptional())) {
-        return true;
-      } else {
-        return false;
-      }
-    }).collect(Collectors.toList());
+
+    InternalSchema sourceInternalSchema = convert(sourceSchema);
+    InternalSchema targetInternalSchema = convert(targetSchema);
+
+    List<String> colNamesSourceSchema = sourceInternalSchema.getAllColsFullName();
+    List<String> colNamesTargetSchema = targetInternalSchema.getAllColsFullName();
+    List<String> candidateUpdateCols = colNamesSourceSchema.stream()
+        .filter(f -> colNamesTargetSchema.contains(f)
+            && sourceInternalSchema.findField(f).isOptional() != targetInternalSchema.findField(f).isOptional())
+        .collect(Collectors.toList());
+
     if (candidateUpdateCols.isEmpty()) {
-      return writeSchema;
+      return sourceSchema;
+    }
+
+    // Reconcile nullability constraints (by executing phony schema change)
+    TableChanges.ColumnUpdateChange schemaChange =
+        reduce(candidateUpdateCols, TableChanges.ColumnUpdateChange.get(sourceInternalSchema),
+          (change, field) -> change.updateColumnNullability(field, true));
+
+    return convert(SchemaChangeUtils.applyTableChanges2Schema(sourceInternalSchema, schemaChange), sourceSchema.getName());

Review Comment:
   Should this be sourceSchema.getFullName() instead? getName() returns only the simple record name and drops the namespace.
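
   For illustration, here is a minimal, dependency-free sketch of the distinction behind this question. `RecordName` is a hypothetical stand-in that only mimics how Avro's `Schema.getName()` and `Schema.getFullName()` behave for a named record; it is not Hudi or Avro code, and whether `convert(...)` actually needs the namespace-qualified name is for the PR author to confirm.

```java
import java.util.Objects;

public class RecordNameDemo {

    // Hypothetical stand-in for the name parts of an Avro record schema.
    static final class RecordName {
        private final String namespace; // may be null or empty
        private final String name;

        RecordName(String namespace, String name) {
            this.namespace = namespace;
            this.name = Objects.requireNonNull(name);
        }

        // Simple name only; the namespace is not included.
        String getName() {
            return name;
        }

        // Namespace-qualified name; falls back to the simple name when no namespace is set.
        String getFullName() {
            return (namespace == null || namespace.isEmpty()) ? name : namespace + "." + name;
        }
    }

    public static void main(String[] args) {
        RecordName source = new RecordName("org.example", "User");
        System.out.println(source.getName());     // simple name
        System.out.println(source.getFullName()); // namespace-qualified name
    }
}
```

   If the reconciled schema is rebuilt from the simple name alone, the namespace would have to be restored some other way for the result to keep the same full name as the source.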



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
