the-other-tim-brown commented on code in PR #13654:
URL: https://github.com/apache/hudi/pull/13654#discussion_r2252697499
##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HiveAvroSerializer.java:
##########
@@ -70,21 +76,153 @@ public class HiveAvroSerializer {
private final List<String> columnNames;
private final List<TypeInfo> columnTypes;
- private final ObjectInspector objectInspector;
+ private final ArrayWritableObjectInspector objectInspector;
+ private final Schema recordSchema;
private static final Logger LOG =
LoggerFactory.getLogger(HiveAvroSerializer.class);
- public HiveAvroSerializer(ObjectInspector objectInspector, List<String>
columnNames, List<TypeInfo> columnTypes) {
+ public HiveAvroSerializer(Schema schema) {
+ this.recordSchema = schema;
+ this.columnNames =
schema.getFields().stream().map(Schema.Field::name).map(String::toLowerCase).collect(Collectors.toList());
+ try {
+ this.columnTypes = HiveTypeUtils.generateColumnTypes(schema);
+ } catch (AvroSerdeException e) {
+ throw new HoodieAvroSchemaException(String.format("Failed to generate
hive column types from avro schema: %s, due to %s", schema, e));
+ }
+ StructTypeInfo rowTypeInfo = (StructTypeInfo)
TypeInfoFactory.getStructTypeInfo(this.columnNames, this.columnTypes);
+ this.objectInspector = new ArrayWritableObjectInspector(rowTypeInfo);
+ }
+
+ public HiveAvroSerializer(ArrayWritableObjectInspector objectInspector,
List<String> columnNames, List<TypeInfo> columnTypes) {
this.columnNames = columnNames;
this.columnTypes = columnTypes;
this.objectInspector = objectInspector;
+ this.recordSchema = null;
+ }
+
+ public Object getValue(ArrayWritable record, String fieldName) {
+ if (StringUtils.isNullOrEmpty(fieldName)) {
+ return null;
+ }
+ Object currentObject = record;
+ ObjectInspector currentOI = this.objectInspector;
Review Comment:
Nit: spell out `ObjectInspector` instead of `OI`. When I read it quickly I
think it is `IO` 😆
##########
hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java:
##########
@@ -376,6 +380,180 @@ public static Schema
createNewSchemaFromFieldsWithReference(Schema schema, List<
return newSchema;
}
+ /**
+ * If schemas are projection equivalent, then a record with schema1 does not
need to be projected to schema2
+ * because the projection will be the identity.
+ *
+ * Two schemas are considered projection equivalent if the field names and
types are equivalent.
+ * The names of records, namespaces, or docs do not need to match.
Nullability is ignored.
+ */
+ public static boolean areSchemasProjectionEquivalent(Schema schema1, Schema
schema2) {
+ if (Objects.equals(schema1, schema2)) {
+ return true;
+ }
+ if (schema1 == null || schema2 == null) {
+ return false;
+ }
+ return
areSchemasProjectionEquivalentInternal(resolveNullableSchema(schema1),
resolveNullableSchema(schema2));
+ }
+
+ @VisibleForTesting
+ static boolean areSchemasProjectionEquivalentInternal(Schema schema1, Schema
schema2) {
+ if (Objects.equals(schema1, schema2)) {
+ return true;
+ }
+ switch (schema1.getType()) {
+ case RECORD:
+ if (schema2.getType() != Schema.Type.RECORD) {
+ return false;
+ }
+ List<Schema.Field> fields1 = schema1.getFields();
+ List<Schema.Field> fields2 = schema2.getFields();
+ if (fields1.size() != fields2.size()) {
+ return false;
+ }
+ for (int i = 0; i < fields1.size(); i++) {
+ if (!fields1.get(i).name().equalsIgnoreCase(fields2.get(i).name())) {
+ return false;
+ }
+ if (!areSchemasProjectionEquivalent(fields1.get(i).schema(),
fields2.get(i).schema())) {
+ return false;
+ }
+ }
+ return true;
+
+ case ARRAY:
+ if (schema2.getType() != Schema.Type.ARRAY) {
+ return false;
+ }
+ return areSchemasProjectionEquivalent(schema1.getElementType(),
schema2.getElementType());
+
+ case MAP:
+ if (schema2.getType() != Schema.Type.MAP) {
+ return false;
+ }
+ return areSchemasProjectionEquivalent(schema1.getValueType(),
schema2.getValueType());
+ case UNION:
+ throw new IllegalArgumentException("Union schemas are not supported
besides nullable");
+ default:
+ return areSchemaPrimitivesProjectionEquivalent(schema1, schema2);
+ }
+ }
+
+ @VisibleForTesting
+ static boolean areSchemaPrimitivesProjectionEquivalent(Schema schema1,
Schema schema2) {
+ if (!areLogicalTypesProjectionEquivalent(schema1.getLogicalType(),
schema2.getLogicalType())) {
+ return false;
+ }
+ if (Objects.requireNonNull(schema1.getType()) == Schema.Type.FIXED) {
+ return schema2.getType() == Schema.Type.FIXED
+ && schema1.getFixedSize() == schema2.getFixedSize();
+ }
+ if (Objects.requireNonNull(schema1.getType()) == Schema.Type.ENUM) {
+ return schema2.getType() == Schema.Type.ENUM
+ && areEnumSymbolsProjectionEquivalent(schema1.getEnumSymbols(),
schema2.getEnumSymbols());
+ }
+ return Objects.equals(schema1.getType(), schema2.getType());
+ }
+
+ private static boolean areEnumSymbolsProjectionEquivalent(List<String>
enumSymbols1, List<String> enumSymbols2) {
+ Set<String> set1 = new HashSet<>(enumSymbols1);
+ Set<String> set2 = new HashSet<>(enumSymbols2);
+ return set2.containsAll(set1);
+ }
+
+ private static boolean areLogicalTypesProjectionEquivalent(LogicalType
logicalType1, LogicalType logicalType2) {
+ if (Objects.equals(logicalType1, logicalType2)) {
+ return true;
+ }
+ if (logicalType1 == null || logicalType2 == null) {
+ return false;
+ }
+ if (logicalType1 instanceof LogicalTypes.Decimal && logicalType2
instanceof LogicalTypes.Decimal) {
+ return ((LogicalTypes.Decimal) logicalType1).getScale() ==
((LogicalTypes.Decimal) logicalType2).getScale()
+ && ((LogicalTypes.Decimal) logicalType1).getPrecision() ==
((LogicalTypes.Decimal) logicalType2).getPrecision();
+ }
+ return false;
+ }
+
+ /**
+ * Prunes a data schema to match the structure of a required schema while
preserving
+ * original metadata where possible.
+ *
+ * <p>This method recursively traverses both schemas and creates a new
schema that:
+ * <ul>
+ * <li>Contains only fields present in the required schema</li>
+ * <li>Preserves field metadata (type, documentation, default values) from
the data schema</li>
+ * <li>Optionally includes fields from the required schema that are marked
for exclusion</li>
+ * </ul>
+ *
+ * @param dataSchema the source schema containing the original data
structure and metadata
+ * @param requiredSchema the target schema that defines the desired
structure and field requirements
+ * @param mandatoryFields a set of top level field names that should be
included from the required schema
+ *                        even if they don't exist in the data schema. This
allows for fields like the cdc operation
field that don't exist in the data schema
+ *
+ * @return a new pruned schema that matches the required schema structure
while preserving
+ * data schema metadata where possible
+ */
+ public static Schema pruneDataSchema(Schema dataSchema, Schema
requiredSchema, Set<String> mandatoryFields) {
+ Schema prunedDataSchema =
pruneDataSchemaInternal(resolveNullableSchema(dataSchema),
resolveNullableSchema(requiredSchema), mandatoryFields);
+ if (dataSchema.isNullable() && !prunedDataSchema.isNullable()) {
+ return createNullableSchema(prunedDataSchema);
+ }
+ return prunedDataSchema;
+ }
+
+ private static Schema pruneDataSchemaInternal(Schema dataSchema, Schema
requiredSchema, Set<String> MandatoryFields) {
Review Comment:
nit: make `Mandatory` lowercase
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]