the-other-tim-brown commented on code in PR #13654:
URL: https://github.com/apache/hudi/pull/13654#discussion_r2251878817
##########
hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java:
##########
@@ -376,6 +380,180 @@ public static Schema
createNewSchemaFromFieldsWithReference(Schema schema, List<
return newSchema;
}
+ /**
+ * If schemas are projection equivalent, then a record with schema1 does not
need to be projected to schema2
+ * because the projection will be the identity.
+ *
+ * Two schemas are considered projection equivalent if the field names and
types are equivalent.
+ * The names of records, namespaces, or docs do not need to match.
Nullability is ignored.
+ */
+ public static boolean areSchemasProjectionEquivalent(Schema schema1, Schema
schema2) {
+ if (Objects.equals(schema1, schema2)) {
+ return true;
+ }
+ if (schema1 == null || schema2 == null) {
+ return false;
+ }
+ return
areSchemasProjectionEquivalentInternal(resolveNullableSchema(schema1),
resolveNullableSchema(schema2));
+ }
+
+ @VisibleForTesting
+ static boolean areSchemasProjectionEquivalentInternal(Schema schema1, Schema
schema2) {
+ if (Objects.equals(schema1, schema2)) {
+ return true;
+ }
+ switch (schema1.getType()) {
+ case RECORD:
+ if (schema2.getType() != Schema.Type.RECORD) {
+ return false;
+ }
+ List<Schema.Field> fields1 = schema1.getFields();
+ List<Schema.Field> fields2 = schema2.getFields();
+ if (fields1.size() != fields2.size()) {
+ return false;
+ }
+ for (int i = 0; i < fields1.size(); i++) {
+ if (!fields1.get(i).name().equalsIgnoreCase(fields2.get(i).name())) {
+ return false;
+ }
+ if (!areSchemasProjectionEquivalent(fields1.get(i).schema(),
fields2.get(i).schema())) {
+ return false;
+ }
+ }
+ return true;
+
+ case ARRAY:
+ if (schema2.getType() != Schema.Type.ARRAY) {
+ return false;
+ }
+ return areSchemasProjectionEquivalent(schema1.getElementType(),
schema2.getElementType());
+
+ case MAP:
+ if (schema2.getType() != Schema.Type.MAP) {
+ return false;
+ }
+ return areSchemasProjectionEquivalent(schema1.getValueType(),
schema2.getValueType());
+ case UNION:
+ throw new IllegalArgumentException("Union schemas are not supported
besides nullable");
+ default:
+ return areSchemaPrimitivesProjectionEquivalent(schema1, schema2);
+ }
+ }
+
+ @VisibleForTesting
+ static boolean areSchemaPrimitivesProjectionEquivalent(Schema schema1,
Schema schema2) {
+ if (!areLogicalTypesProjectionEquivalent(schema1.getLogicalType(),
schema2.getLogicalType())) {
+ return false;
+ }
+ if (Objects.requireNonNull(schema1.getType()) == Schema.Type.FIXED) {
+ return schema2.getType() == Schema.Type.FIXED
+ && schema1.getFixedSize() == schema2.getFixedSize();
+ }
+ if (Objects.requireNonNull(schema1.getType()) == Schema.Type.ENUM) {
+ return schema2.getType() == Schema.Type.ENUM
+ && areEnumSymbolsProjectionEquivalent(schema1.getEnumSymbols(),
schema2.getEnumSymbols());
+ }
+ return Objects.equals(schema1.getType(), schema2.getType());
+ }
+
+ private static boolean areEnumSymbolsProjectionEquivalent(List<String>
enumSymbols1, List<String> enumSymbols2) {
+ Set<String> set1 = new HashSet<>(enumSymbols1);
+ Set<String> set2 = new HashSet<>(enumSymbols2);
+ return set2.containsAll(set1);
+ }
+
+ private static boolean areLogicalTypesProjectionEquivalent(LogicalType
logicalType1, LogicalType logicalType2) {
+ if (Objects.equals(logicalType1, logicalType2)) {
+ return true;
+ }
+ if (logicalType1 == null || logicalType2 == null) {
+ return false;
+ }
+ if (logicalType1 instanceof LogicalTypes.Decimal && logicalType2
instanceof LogicalTypes.Decimal) {
+ return ((LogicalTypes.Decimal) logicalType1).getScale() ==
((LogicalTypes.Decimal) logicalType2).getScale()
+ && ((LogicalTypes.Decimal) logicalType1).getPrecision() ==
((LogicalTypes.Decimal) logicalType2).getPrecision();
+ }
+ return false;
+ }
+
+ /**
+ * Prunes a data schema to match the structure of a required schema while
preserving
+ * original metadata where possible.
+ *
+ * <p>This method recursively traverses both schemas and creates a new
schema that:
+ * <ul>
+ * <li>Contains only fields present in the required schema</li>
+ * <li>Preserves field metadata (type, documentation, default values) from
the data schema</li>
+ * <li>Optionally includes fields from the required schema that are marked
for exclusion</li>
+ * </ul>
+ *
+ * @param dataSchema the source schema containing the original data
structure and metadata
+ * @param requiredSchema the target schema that defines the desired
structure and field requirements
+ * @param excludeFields a set of field names that should be included from
the required schema
Review Comment:
The name `excludeFields` makes it sound like these fields will be excluded from
the schema, but they are actually being included. Maybe something like
`mandatoryFields` would be a better name?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]