the-other-tim-brown commented on code in PR #13654:
URL: https://github.com/apache/hudi/pull/13654#discussion_r2252697499


##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HiveAvroSerializer.java:
##########
@@ -70,21 +76,153 @@ public class HiveAvroSerializer {
 
   private final List<String> columnNames;
   private final List<TypeInfo> columnTypes;
-  private final ObjectInspector objectInspector;
+  private final ArrayWritableObjectInspector objectInspector;
+  private final Schema recordSchema;
 
   private static final Logger LOG = 
LoggerFactory.getLogger(HiveAvroSerializer.class);
 
-  public HiveAvroSerializer(ObjectInspector objectInspector, List<String> 
columnNames, List<TypeInfo> columnTypes) {
+  public HiveAvroSerializer(Schema schema) {
+    this.recordSchema = schema;
+    this.columnNames = 
schema.getFields().stream().map(Schema.Field::name).map(String::toLowerCase).collect(Collectors.toList());
+    try {
+      this.columnTypes = HiveTypeUtils.generateColumnTypes(schema);
+    } catch (AvroSerdeException e) {
+      throw new HoodieAvroSchemaException(String.format("Failed to generate 
hive column types from avro schema: %s, due to %s", schema, e));
+    }
+    StructTypeInfo rowTypeInfo = (StructTypeInfo) 
TypeInfoFactory.getStructTypeInfo(this.columnNames, this.columnTypes);
+    this.objectInspector = new ArrayWritableObjectInspector(rowTypeInfo);
+  }
+
+  public HiveAvroSerializer(ArrayWritableObjectInspector objectInspector, 
List<String> columnNames, List<TypeInfo> columnTypes) {
     this.columnNames = columnNames;
     this.columnTypes = columnTypes;
     this.objectInspector = objectInspector;
+    this.recordSchema = null;
+  }
+
+  public Object getValue(ArrayWritable record, String fieldName) {
+    if (StringUtils.isNullOrEmpty(fieldName)) {
+      return null;
+    }
+    Object currentObject = record;
+    ObjectInspector currentOI = this.objectInspector;

Review Comment:
   Nit: spell out `ObjectInspector` instead of `OI` in the variable name. Read 
quickly, `OI` is easy to misread as `IO` 😆 



##########
hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java:
##########
@@ -376,6 +380,180 @@ public static Schema 
createNewSchemaFromFieldsWithReference(Schema schema, List<
     return newSchema;
   }
 
+  /**
+   * If schemas are projection equivalent, then a record with schema1 does not 
need to be projected to schema2
+   * because the projection will be the identity.
+   *
+   *  Two schemas are considered projection equivalent if the field names and 
types are equivalent.
+   *  The names of records, namespaces, or docs do not need to match. 
Nullability is ignored.
+   */
+  public static boolean areSchemasProjectionEquivalent(Schema schema1, Schema 
schema2) {
+    if (Objects.equals(schema1, schema2)) {
+      return true;
+    }
+    if (schema1 == null || schema2 == null) {
+      return false;
+    }
+    return 
areSchemasProjectionEquivalentInternal(resolveNullableSchema(schema1), 
resolveNullableSchema(schema2));
+  }
+
+  @VisibleForTesting
+  static boolean areSchemasProjectionEquivalentInternal(Schema schema1, Schema 
schema2) {
+    if (Objects.equals(schema1, schema2)) {
+      return true;
+    }
+    switch (schema1.getType()) {
+      case RECORD:
+        if (schema2.getType() != Schema.Type.RECORD) {
+          return false;
+        }
+        List<Schema.Field> fields1 = schema1.getFields();
+        List<Schema.Field> fields2 = schema2.getFields();
+        if (fields1.size() != fields2.size()) {
+          return false;
+        }
+        for (int i = 0; i < fields1.size(); i++) {
+          if (!fields1.get(i).name().equalsIgnoreCase(fields2.get(i).name())) {
+            return false;
+          }
+          if (!areSchemasProjectionEquivalent(fields1.get(i).schema(), 
fields2.get(i).schema())) {
+            return false;
+          }
+        }
+        return true;
+
+      case ARRAY:
+        if (schema2.getType() != Schema.Type.ARRAY) {
+          return false;
+        }
+        return areSchemasProjectionEquivalent(schema1.getElementType(), 
schema2.getElementType());
+
+      case MAP:
+        if (schema2.getType() != Schema.Type.MAP) {
+          return false;
+        }
+        return areSchemasProjectionEquivalent(schema1.getValueType(), 
schema2.getValueType());
+      case UNION:
+        throw new IllegalArgumentException("Union schemas are not supported 
besides nullable");
+      default:
+        return areSchemaPrimitivesProjectionEquivalent(schema1, schema2);
+    }
+  }
+
+  @VisibleForTesting
+  static boolean areSchemaPrimitivesProjectionEquivalent(Schema schema1, 
Schema schema2) {
+    if (!areLogicalTypesProjectionEquivalent(schema1.getLogicalType(), 
schema2.getLogicalType())) {
+      return false;
+    }
+    if (Objects.requireNonNull(schema1.getType()) == Schema.Type.FIXED) {
+      return schema2.getType() == Schema.Type.FIXED
+          && schema1.getFixedSize() == schema2.getFixedSize();
+    }
+    if (Objects.requireNonNull(schema1.getType()) == Schema.Type.ENUM) {
+      return schema2.getType() == Schema.Type.ENUM
+          && areEnumSymbolsProjectionEquivalent(schema1.getEnumSymbols(), 
schema2.getEnumSymbols());
+    }
+    return Objects.equals(schema1.getType(), schema2.getType());
+  }
+
+  private static boolean areEnumSymbolsProjectionEquivalent(List<String> 
enumSymbols1, List<String> enumSymbols2) {
+    Set<String> set1 = new HashSet<>(enumSymbols1);
+    Set<String> set2 = new HashSet<>(enumSymbols2);
+    return set2.containsAll(set1);
+  }
+
+  private static boolean areLogicalTypesProjectionEquivalent(LogicalType 
logicalType1, LogicalType logicalType2) {
+    if (Objects.equals(logicalType1, logicalType2)) {
+      return true;
+    }
+    if (logicalType1 == null || logicalType2 == null) {
+      return false;
+    }
+    if (logicalType1 instanceof LogicalTypes.Decimal && logicalType2 
instanceof LogicalTypes.Decimal) {
+      return ((LogicalTypes.Decimal) logicalType1).getScale() == 
((LogicalTypes.Decimal) logicalType2).getScale()
+          && ((LogicalTypes.Decimal) logicalType1).getPrecision() == 
((LogicalTypes.Decimal) logicalType2).getPrecision();
+    }
+    return false;
+  }
+
+  /**
+   * Prunes a data schema to match the structure of a required schema while 
preserving
+   * original metadata where possible.
+   *
+   * <p>This method recursively traverses both schemas and creates a new 
schema that:
+   * <ul>
+   *   <li>Contains only fields present in the required schema</li>
+   *   <li>Preserves field metadata (type, documentation, default values) from 
the data schema</li>
+   *   <li>Optionally includes fields from the required schema that are marked 
for exclusion</li>
+   * </ul>
+   *
+   * @param dataSchema the source schema containing the original data 
structure and metadata
+   * @param requiredSchema the target schema that defines the desired 
structure and field requirements
+   * @param mandatoryFields a set of top-level field names that should be 
included from the required schema
+   *                     even if they don't exist in the data schema. This 
allows for fields,
+   *                     such as the CDC operation field, that do not exist in 
the data schema
+   *
+   * @return a new pruned schema that matches the required schema structure 
while preserving
+   *         data schema metadata where possible
+   */
+  public static Schema pruneDataSchema(Schema dataSchema, Schema 
requiredSchema, Set<String> mandatoryFields) {
+    Schema prunedDataSchema = 
pruneDataSchemaInternal(resolveNullableSchema(dataSchema), 
resolveNullableSchema(requiredSchema), mandatoryFields);
+    if (dataSchema.isNullable() && !prunedDataSchema.isNullable()) {
+      return createNullableSchema(prunedDataSchema);
+    }
+    return prunedDataSchema;
+  }
+
+  private static Schema pruneDataSchemaInternal(Schema dataSchema, Schema 
requiredSchema, Set<String> MandatoryFields) {

Review Comment:
   Nit: rename the parameter `MandatoryFields` to `mandatoryFields` — Java 
parameter names use lowerCamelCase.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to