alexeykudinkin commented on code in PR #6361:
URL: https://github.com/apache/hudi/pull/6361#discussion_r1055671181
##########
hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaEvolutionUtils.java:
##########
@@ -109,40 +116,129 @@ public static InternalSchema reconcileSchema(Schema
incomingSchema, InternalSche
}
/**
- * Canonical the nullability.
- * Do not allow change cols Nullability field from optional to required.
- * If above problem occurs, try to correct it.
+ * Reconciles nullability requirements b/w {@code source} and {@code target}
schemas,
+ * by adjusting these of the {@code source} schema to be in-line with the
ones of the
+ * {@code target} one
*
- * @param writeSchema writeSchema hoodie used to write data.
- * @param readSchema read schema
- * @return canonical Schema
+ * @param sourceSchema source schema that needs reconciliation
+ * @param targetSchema target schema that source schema will be reconciled
against
+ * @return schema (based off {@code source} one) that has nullability
constraints reconciled
*/
- public static Schema canonicalizeColumnNullability(Schema writeSchema,
Schema readSchema) {
- if (writeSchema.getFields().isEmpty() || readSchema.getFields().isEmpty())
{
- return writeSchema;
+ public static Schema reconcileNullability(Schema sourceSchema, Schema
targetSchema) {
+ if (sourceSchema.getFields().isEmpty() ||
targetSchema.getFields().isEmpty()) {
+ return sourceSchema;
}
- InternalSchema writeInternalSchema =
AvroInternalSchemaConverter.convert(writeSchema);
- InternalSchema readInternalSchema =
AvroInternalSchemaConverter.convert(readSchema);
- List<String> colNamesWriteSchema =
writeInternalSchema.getAllColsFullName();
- List<String> colNamesFromReadSchema =
readInternalSchema.getAllColsFullName();
- // try to deal with optional change. now when we use sparksql to update
hudi table,
- // sparksql Will change the col type from optional to required, this is a
bug.
- List<String> candidateUpdateCols = colNamesWriteSchema.stream().filter(f
-> {
- boolean exist = colNamesFromReadSchema.contains(f);
- if (exist && (writeInternalSchema.findField(f).isOptional() !=
readInternalSchema.findField(f).isOptional())) {
- return true;
- } else {
- return false;
- }
- }).collect(Collectors.toList());
+
+ InternalSchema sourceInternalSchema = convert(sourceSchema);
+ InternalSchema targetInternalSchema = convert(targetSchema);
+
+ List<String> colNamesSourceSchema =
sourceInternalSchema.getAllColsFullName();
+ List<String> colNamesTargetSchema =
targetInternalSchema.getAllColsFullName();
+ List<String> candidateUpdateCols = colNamesSourceSchema.stream()
+ .filter(f -> colNamesTargetSchema.contains(f)
+ && sourceInternalSchema.findField(f).isOptional() !=
targetInternalSchema.findField(f).isOptional())
+ .collect(Collectors.toList());
+
if (candidateUpdateCols.isEmpty()) {
- return writeSchema;
+ return sourceSchema;
+ }
+
+ // Reconcile nullability constraints (by executing phony schema change)
+ TableChanges.ColumnUpdateChange schemaChange =
+ reduce(candidateUpdateCols,
TableChanges.ColumnUpdateChange.get(sourceInternalSchema),
+ (change, field) -> change.updateColumnNullability(field, true));
+
+ return
convert(SchemaChangeUtils.applyTableChanges2Schema(sourceInternalSchema,
schemaChange), sourceSchema.getName());
+ }
+
+ /**
+ * Reconciles field names of {@code source} schema (produced in
case-insensitive context, like Spark),
+ * w/ the field names of the {@code target} schema such that {@code source}
schema could
+ * be used in case-sensitive context as well (like Avro)
+ *
+ * @param sourceSchema source schema that needs reconciliation
+ * @param targetSchema target schema that source schema will be reconciled
against
+ * @return schema (based off {@code source} one) that has overlapping
field-names in the same
+ * case as the target schema
+ */
+ public static Schema reconcileFieldNamesCasing(Schema sourceSchema, Schema
targetSchema) {
+ if (sourceSchema.getFields().isEmpty() ||
targetSchema.getFields().isEmpty()) {
+ return sourceSchema;
+ }
+
+ InternalSchema sourceInternalSchema = convert(sourceSchema);
+ InternalSchema targetInternalSchema = convert(targetSchema);
+
+ // Collect field's fully-qualified name pairs of the fields whose names
diverge only
+ // in its letters' casing
+ List<Pair<String, String>> reconciliationTargetColumnNamePairs =
+ resolveCaseMismatchingFieldNamePairs(sourceInternalSchema,
targetInternalSchema);
+
+ if (reconciliationTargetColumnNamePairs.isEmpty()) {
+ return sourceSchema;
}
- // try to correct all changes
- TableChanges.ColumnUpdateChange updateChange =
TableChanges.ColumnUpdateChange.get(writeInternalSchema);
- candidateUpdateCols.stream().forEach(f ->
updateChange.updateColumnNullability(f, true));
- InternalSchema updatedSchema =
SchemaChangeUtils.applyTableChanges2Schema(writeInternalSchema, updateChange);
- return AvroInternalSchemaConverter.convert(updatedSchema,
writeSchema.getFullName());
+
+ // Reconcile field names (by executing phony schema change)
+ TableChanges.ColumnUpdateChange schemaChange =
+ reduce(reconciliationTargetColumnNamePairs,
TableChanges.ColumnUpdateChange.get(sourceInternalSchema, true),
+ (change, sourceTargetPair) ->
change.renameColumn(sourceTargetPair.getLeft(), sourceTargetPair.getRight()));
+
+ return
convert(SchemaChangeUtils.applyTableChanges2Schema(sourceInternalSchema,
schemaChange), sourceSchema.getName());
+ }
+
+ /**
+ * Resolves fields in the {@code source} schema matching {@code target}
schema, even though
+ * their letter casing might be mismatched. Returns fully-qualified names
pairs for the fields from
+ * the {@code source} schema for which, there is correspondent field in the
{@code target} schema
+ * that only differs in its letters case.
+ *
+ * For example, provided two schemas {@code A} and {@code B}, such that:
+ * {@code struct A { fieldA: ... }}, while {@code struct B { FieldA: ... }},
this method will
+ * produce following list: {@code Map("fieldA" -> "FieldA")}
+ *
+ * NOTE: This method doesn't perform validation whether types of the
correspondent fields are
+ * conformant, and simply applies a case-insensitive resolution logic
+ */
+ public static Map<String, String>
resolveCaseMismatchingFieldNamePairs(Schema sourceSchema,
+
Schema targetSchema) {
+ if (sourceSchema.getFields().isEmpty() ||
targetSchema.getFields().isEmpty()) {
+ return Collections.emptyMap();
+ }
+
+ InternalSchema sourceInternalSchema = convert(sourceSchema);
+ InternalSchema targetInternalSchema = convert(targetSchema);
+
+ return resolveCaseMismatchingFieldNamePairs(sourceInternalSchema,
targetInternalSchema)
+ .stream()
+ .collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
+ }
+
+ private static List<Pair<String, String>>
resolveCaseMismatchingFieldNamePairs(InternalSchema sourceSchema,
+
InternalSchema targetSchema) {
+ List<String> colNamesSourceSchema = sourceSchema.getAllColsFullName();
+
+ // To reconcile field names of the source schema we enumerate all possible
fields (including
+ // nested ones) and produce a mapping convert them to lower-case to
correlate across
+ // source and target schemas
+ Map<String, String> colNamesMapTargetSchema =
+ targetSchema.getAllColsFullName().stream()
+ .collect(Collectors.toMap(String::toLowerCase,
Function.identity()));
+
+ return colNamesSourceSchema.stream()
+ .map(sourceFieldName -> {
+ // NOTE: Here we're only reconciling the fields that differ only in
casing of their
+ // corresponding names (therefore a) fields have to be present
in both schemas and
+ // b) they have to differ)
+ String lowerCaseSourceFieldName =
sourceFieldName.toLowerCase(Locale.ROOT);
Review Comment:
Good call. This code is actually a remnant of the original PR, from before it was
split into this one and #6358.
I'm going to apply your suggestion and extract this code into a separate change.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]