xiarixiaoyao commented on code in PR #6361:
URL: https://github.com/apache/hudi/pull/6361#discussion_r1055263282
##########
hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaEvolutionUtils.java:
##########
@@ -109,40 +116,129 @@ public static InternalSchema reconcileSchema(Schema
incomingSchema, InternalSche
}
/**
- * Canonical the nullability.
- * Do not allow change cols Nullability field from optional to required.
- * If above problem occurs, try to correct it.
+ * Reconciles nullability requirements between the {@code source} and {@code target}
schemas,
+ * by adjusting those of the {@code source} schema to align with the
ones of the
+ * {@code target} schema
*
- * @param writeSchema writeSchema hoodie used to write data.
- * @param readSchema read schema
- * @return canonical Schema
+ * @param sourceSchema source schema that needs reconciliation
+ * @param targetSchema target schema that source schema will be reconciled
against
+ * @return schema (based off {@code source} one) that has nullability
constraints reconciled
*/
- public static Schema canonicalizeColumnNullability(Schema writeSchema,
Schema readSchema) {
- if (writeSchema.getFields().isEmpty() || readSchema.getFields().isEmpty())
{
- return writeSchema;
+ public static Schema reconcileNullability(Schema sourceSchema, Schema
targetSchema) {
+ if (sourceSchema.getFields().isEmpty() ||
targetSchema.getFields().isEmpty()) {
+ return sourceSchema;
}
- InternalSchema writeInternalSchema =
AvroInternalSchemaConverter.convert(writeSchema);
- InternalSchema readInternalSchema =
AvroInternalSchemaConverter.convert(readSchema);
- List<String> colNamesWriteSchema =
writeInternalSchema.getAllColsFullName();
- List<String> colNamesFromReadSchema =
readInternalSchema.getAllColsFullName();
- // try to deal with optional change. now when we use sparksql to update
hudi table,
- // sparksql Will change the col type from optional to required, this is a
bug.
- List<String> candidateUpdateCols = colNamesWriteSchema.stream().filter(f
-> {
- boolean exist = colNamesFromReadSchema.contains(f);
- if (exist && (writeInternalSchema.findField(f).isOptional() !=
readInternalSchema.findField(f).isOptional())) {
- return true;
- } else {
- return false;
- }
- }).collect(Collectors.toList());
+
+ InternalSchema sourceInternalSchema = convert(sourceSchema);
+ InternalSchema targetInternalSchema = convert(targetSchema);
+
+ List<String> colNamesSourceSchema =
sourceInternalSchema.getAllColsFullName();
+ List<String> colNamesTargetSchema =
targetInternalSchema.getAllColsFullName();
+ List<String> candidateUpdateCols = colNamesSourceSchema.stream()
+ .filter(f -> colNamesTargetSchema.contains(f)
+ && sourceInternalSchema.findField(f).isOptional() !=
targetInternalSchema.findField(f).isOptional())
+ .collect(Collectors.toList());
+
if (candidateUpdateCols.isEmpty()) {
- return writeSchema;
+ return sourceSchema;
+ }
+
+ // Reconcile nullability constraints (by executing phony schema change)
+ TableChanges.ColumnUpdateChange schemaChange =
+ reduce(candidateUpdateCols,
TableChanges.ColumnUpdateChange.get(sourceInternalSchema),
+ (change, field) -> change.updateColumnNullability(field, true));
+
+ return
convert(SchemaChangeUtils.applyTableChanges2Schema(sourceInternalSchema,
schemaChange), sourceSchema.getName());
+ }
+
+ /**
+ * Reconciles field names of the {@code source} schema (produced in
a case-insensitive context, like Spark),
+ * with the field names of the {@code target} schema, such that the {@code source}
schema can
+ * be used in a case-sensitive context as well (like Avro)
+ *
+ * @param sourceSchema source schema that needs reconciliation
+ * @param targetSchema target schema that source schema will be reconciled
against
+ * @return schema (based off {@code source} one) that has overlapping
field-names in the same
+ * case as the target schema
+ */
+ public static Schema reconcileFieldNamesCasing(Schema sourceSchema, Schema
targetSchema) {
+ if (sourceSchema.getFields().isEmpty() ||
targetSchema.getFields().isEmpty()) {
+ return sourceSchema;
+ }
+
+ InternalSchema sourceInternalSchema = convert(sourceSchema);
+ InternalSchema targetInternalSchema = convert(targetSchema);
+
+ // Collect field's fully-qualified name pairs of the fields whose names
diverge only
+ // in its letters' casing
+ List<Pair<String, String>> reconciliationTargetColumnNamePairs =
+ resolveCaseMismatchingFieldNamePairs(sourceInternalSchema,
targetInternalSchema);
+
+ if (reconciliationTargetColumnNamePairs.isEmpty()) {
+ return sourceSchema;
}
- // try to correct all changes
- TableChanges.ColumnUpdateChange updateChange =
TableChanges.ColumnUpdateChange.get(writeInternalSchema);
- candidateUpdateCols.stream().forEach(f ->
updateChange.updateColumnNullability(f, true));
- InternalSchema updatedSchema =
SchemaChangeUtils.applyTableChanges2Schema(writeInternalSchema, updateChange);
- return AvroInternalSchemaConverter.convert(updatedSchema,
writeSchema.getFullName());
+
+ // Reconcile field names (by executing phony schema change)
+ TableChanges.ColumnUpdateChange schemaChange =
+ reduce(reconciliationTargetColumnNamePairs,
TableChanges.ColumnUpdateChange.get(sourceInternalSchema, true),
+ (change, sourceTargetPair) ->
change.renameColumn(sourceTargetPair.getLeft(), sourceTargetPair.getRight()));
+
+ return
convert(SchemaChangeUtils.applyTableChanges2Schema(sourceInternalSchema,
schemaChange), sourceSchema.getName());
Review Comment:
ditto
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]