the-other-tim-brown commented on code in PR #13654:
URL: https://github.com/apache/hudi/pull/13654#discussion_r2251878817


##########
hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java:
##########
@@ -376,6 +380,180 @@ public static Schema 
createNewSchemaFromFieldsWithReference(Schema schema, List<
     return newSchema;
   }
 
+  /**
+   * If schemas are projection equivalent, then a record with schema1 does not 
need to be projected to schema2
+   * because the projection will be the identity.
+   *
+   *  Two schemas are considered projection equivalent if the field names and 
types are equivalent.
+   *  The names of records, namespaces, or docs do not need to match. 
Nullability is ignored.
+   */
+  public static boolean areSchemasProjectionEquivalent(Schema schema1, Schema 
schema2) {
+    if (Objects.equals(schema1, schema2)) {
+      return true;
+    }
+    if (schema1 == null || schema2 == null) {
+      return false;
+    }
+    return 
areSchemasProjectionEquivalentInternal(resolveNullableSchema(schema1), 
resolveNullableSchema(schema2));
+  }
+
+  @VisibleForTesting
+  static boolean areSchemasProjectionEquivalentInternal(Schema schema1, Schema 
schema2) {
+    if (Objects.equals(schema1, schema2)) {
+      return true;
+    }
+    switch (schema1.getType()) {
+      case RECORD:
+        if (schema2.getType() != Schema.Type.RECORD) {
+          return false;
+        }
+        List<Schema.Field> fields1 = schema1.getFields();
+        List<Schema.Field> fields2 = schema2.getFields();
+        if (fields1.size() != fields2.size()) {
+          return false;
+        }
+        for (int i = 0; i < fields1.size(); i++) {
+          if (!fields1.get(i).name().equalsIgnoreCase(fields2.get(i).name())) {
+            return false;
+          }
+          if (!areSchemasProjectionEquivalent(fields1.get(i).schema(), 
fields2.get(i).schema())) {
+            return false;
+          }
+        }
+        return true;
+
+      case ARRAY:
+        if (schema2.getType() != Schema.Type.ARRAY) {
+          return false;
+        }
+        return areSchemasProjectionEquivalent(schema1.getElementType(), 
schema2.getElementType());
+
+      case MAP:
+        if (schema2.getType() != Schema.Type.MAP) {
+          return false;
+        }
+        return areSchemasProjectionEquivalent(schema1.getValueType(), 
schema2.getValueType());
+      case UNION:
+        throw new IllegalArgumentException("Union schemas are not supported 
besides nullable");
+      default:
+        return areSchemaPrimitivesProjectionEquivalent(schema1, schema2);
+    }
+  }
+
+  @VisibleForTesting
+  static boolean areSchemaPrimitivesProjectionEquivalent(Schema schema1, 
Schema schema2) {
+    if (!areLogicalTypesProjectionEquivalent(schema1.getLogicalType(), 
schema2.getLogicalType())) {
+      return false;
+    }
+    if (Objects.requireNonNull(schema1.getType()) == Schema.Type.FIXED) {
+      return schema2.getType() == Schema.Type.FIXED
+          && schema1.getFixedSize() == schema2.getFixedSize();
+    }
+    if (Objects.requireNonNull(schema1.getType()) == Schema.Type.ENUM) {
+      return schema2.getType() == Schema.Type.ENUM
+          && areEnumSymbolsProjectionEquivalent(schema1.getEnumSymbols(), 
schema2.getEnumSymbols());
+    }
+    return Objects.equals(schema1.getType(), schema2.getType());
+  }
+
+  private static boolean areEnumSymbolsProjectionEquivalent(List<String> 
enumSymbols1, List<String> enumSymbols2) {
+    Set<String> set1 = new HashSet<>(enumSymbols1);
+    Set<String> set2 = new HashSet<>(enumSymbols2);
+    return set2.containsAll(set1);
+  }
+
+  private static boolean areLogicalTypesProjectionEquivalent(LogicalType 
logicalType1, LogicalType logicalType2) {
+    if (Objects.equals(logicalType1, logicalType2)) {
+      return true;
+    }
+    if (logicalType1 == null || logicalType2 == null) {
+      return false;
+    }
+    if (logicalType1 instanceof LogicalTypes.Decimal && logicalType2 
instanceof LogicalTypes.Decimal) {
+      return ((LogicalTypes.Decimal) logicalType1).getScale() == 
((LogicalTypes.Decimal) logicalType2).getScale()
+          && ((LogicalTypes.Decimal) logicalType1).getPrecision() == 
((LogicalTypes.Decimal) logicalType2).getPrecision();
+    }
+    return false;
+  }
+
+  /**
+   * Prunes a data schema to match the structure of a required schema while 
preserving
+   * original metadata where possible.
+   *
+   * <p>This method recursively traverses both schemas and creates a new 
schema that:
+   * <ul>
+   *   <li>Contains only fields present in the required schema</li>
+   *   <li>Preserves field metadata (type, documentation, default values) from 
the data schema</li>
+   *   <li>Optionally includes fields from the required schema that are marked 
for exclusion</li>
+   * </ul>
+   *
+   * @param dataSchema the source schema containing the original data 
structure and metadata
+   * @param requiredSchema the target schema that defines the desired 
structure and field requirements
+   * @param excludeFields a set of field names that should be included from 
the required schema

Review Comment:
   The name `excludeFields` makes it sound like these fields will be excluded 
from the schema, but they are actually being included. Maybe something like 
mandatory fields would be a better name?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to