the-other-tim-brown commented on code in PR #17763:
URL: https://github.com/apache/hudi/pull/17763#discussion_r2655529568


##########
hudi-hadoop-common/src/main/java/org/apache/parquet/avro/AvroSchemaConverterWithTimestampNTZ.java:
##########
@@ -116,96 +118,112 @@ public AvroSchemaConverterWithTimestampNTZ(Configuration conf) {
     this.pathsToInt96 = new HashSet<>(Arrays.asList(conf.getStrings("parquet.avro.writeFixedAsInt96", new String[0])));
   }
 
-  /**
-   * Given a schema, check to see if it is a union of a null type and a regular schema,
-   * and then return the non-null sub-schema. Otherwise, return the given schema.
-   *
-   * @param schema The schema to check
-   * @return The non-null portion of a union schema, or the given schema
-   */
-  public static Schema getNonNull(Schema schema) {
-    if (schema.getType().equals(Schema.Type.UNION)) {
-      List<Schema> schemas = schema.getTypes();
-      if (schemas.size() == 2) {
-        if (schemas.get(0).getType().equals(Schema.Type.NULL)) {
-          return schemas.get(1);
-        } else if (schemas.get(1).getType().equals(Schema.Type.NULL)) {
-          return schemas.get(0);
-        } else {
-          return schema;
-        }
-      } else {
-        return schema;
-      }
-    } else {
-      return schema;
-    }
-  }
-
   @Override
-  public MessageType convert(Schema avroSchema) {
-    if (!avroSchema.getType().equals(Schema.Type.RECORD)) {
-      throw new IllegalArgumentException("Avro schema must be a record.");
+  public MessageType convert(HoodieSchema schema) {
+    if (schema.getType() != HoodieSchemaType.RECORD) {
+      throw new IllegalArgumentException("Hoodie schema must be a record.");
     }
-    return new MessageType(avroSchema.getFullName(), convertFields(avroSchema.getFields(), ""));
+    return new MessageType(schema.getFullName(), convertFields(schema.getFields(), ""));
   }
 
-  private List<Type> convertFields(List<Schema.Field> fields, String schemaPath) {
+  private List<Type> convertFields(List<HoodieSchemaField> fields, String schemaPath) {
     List<Type> types = new ArrayList<Type>();

Review Comment:
   Nit: size this array to the incoming fields size
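
   For illustration, a minimal sketch of that change (assuming the surrounding method keeps the `fields` parameter and `types` local shown in the diff above):
   ```java
   // Pre-size the list to the number of incoming fields to avoid resizing its backing array
   List<Type> types = new ArrayList<>(fields.size());
   ```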



##########
hudi-hadoop-common/src/main/java/org/apache/parquet/avro/AvroSchemaConverterWithTimestampNTZ.java:
##########
@@ -267,121 +271,123 @@ private Type convertUnion(String fieldName, Schema schema, Type.Repetition repet
     }
   }
 
-  private Type convertUnionToGroupType(String fieldName, Type.Repetition repetition, List<Schema> nonNullSchemas,
+  private Type convertUnionToGroupType(String fieldName, Type.Repetition repetition, List<HoodieSchema> nonNullSchemas,
                                        String schemaPath) {
     List<Type> unionTypes = new ArrayList<Type>(nonNullSchemas.size());
     int index = 0;
-    for (Schema childSchema : nonNullSchemas) {
+    for (HoodieSchema childSchema : nonNullSchemas) {
       unionTypes.add( convertField("member" + index++, childSchema, Type.Repetition.OPTIONAL, schemaPath));
     }
     return new GroupType(repetition, fieldName, unionTypes);
   }
 
-  private Type convertField(Schema.Field field, String schemaPath) {
+  private Type convertField(HoodieSchemaField field, String schemaPath) {
     return convertField(field.name(), field.schema(), schemaPath);
   }
 
   @Override
-  public Schema convert(MessageType parquetSchema) {
+  public HoodieSchema convert(MessageType parquetSchema) {
     return convertFields(parquetSchema.getName(), parquetSchema.getFields(), new HashMap<>());
   }
 
-  Schema convert(GroupType parquetSchema) {
+  HoodieSchema convert(GroupType parquetSchema) {
     return convertFields(parquetSchema.getName(), parquetSchema.getFields(), new HashMap<>());
   }
 
-  private Schema convertFields(String name, List<Type> parquetFields, Map<String, Integer> names) {
-    List<Schema.Field> fields = new ArrayList<Schema.Field>();
+  private HoodieSchema convertFields(String name, List<Type> parquetFields, Map<String, Integer> names) {
+    List<HoodieSchemaField> fields = new ArrayList<>();

Review Comment:
   Similarly, size this list to the incoming list size
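
   As above, a minimal sketch (assuming the `parquetFields` parameter shown in the diff above):
   ```java
   // Pre-size to the number of incoming Parquet fields
   List<HoodieSchemaField> fields = new ArrayList<>(parquetFields.size());
   ```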



##########
hudi-hadoop-common/src/main/java/org/apache/parquet/avro/AvroSchemaConverterWithTimestampNTZ.java:
##########
@@ -116,96 +118,112 @@ public AvroSchemaConverterWithTimestampNTZ(Configuration conf) {
     this.pathsToInt96 = new HashSet<>(Arrays.asList(conf.getStrings("parquet.avro.writeFixedAsInt96", new String[0])));
   }
 
-  /**
-   * Given a schema, check to see if it is a union of a null type and a 
regular schema,
-   * and then return the non-null sub-schema. Otherwise, return the given 
schema.
-   *
-   * @param schema The schema to check
-   * @return The non-null portion of a union schema, or the given schema
-   */
-  public static Schema getNonNull(Schema schema) {
-    if (schema.getType().equals(Schema.Type.UNION)) {
-      List<Schema> schemas = schema.getTypes();
-      if (schemas.size() == 2) {
-        if (schemas.get(0).getType().equals(Schema.Type.NULL)) {
-          return schemas.get(1);
-        } else if (schemas.get(1).getType().equals(Schema.Type.NULL)) {
-          return schemas.get(0);
-        } else {
-          return schema;
-        }
-      } else {
-        return schema;
-      }
-    } else {
-      return schema;
-    }
-  }
-
   @Override
-  public MessageType convert(Schema avroSchema) {
-    if (!avroSchema.getType().equals(Schema.Type.RECORD)) {
-      throw new IllegalArgumentException("Avro schema must be a record.");
+  public MessageType convert(HoodieSchema schema) {
+    if (schema.getType() != HoodieSchemaType.RECORD) {
+      throw new IllegalArgumentException("Hoodie schema must be a record.");
     }
-    return new MessageType(avroSchema.getFullName(), convertFields(avroSchema.getFields(), ""));
+    return new MessageType(schema.getFullName(), convertFields(schema.getFields(), ""));
   }
 
-  private List<Type> convertFields(List<Schema.Field> fields, String schemaPath) {
+  private List<Type> convertFields(List<HoodieSchemaField> fields, String schemaPath) {
     List<Type> types = new ArrayList<Type>();
-    for (Schema.Field field : fields) {
-      if (field.schema().getType().equals(Schema.Type.NULL)) {
+    for (HoodieSchemaField field : fields) {
+      if (field.schema().getType() == HoodieSchemaType.NULL) {
         continue; // Avro nulls are not encoded, unless they are null unions

Review Comment:
   nit: update comment to remove `Avro`
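
   For example, the comment could read something like this (illustrative wording, not taken from the PR):
   ```java
   continue; // nulls are not encoded, unless they are null unions
   ```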



##########
hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieAvroParquetReader.java:
##########
@@ -181,21 +181,21 @@ private static StorageConfiguration<?> tryOverrideDefaultConfigs(StorageConfigur
     return conf;
   }
 
-  private ClosableIterator<IndexedRecord> getIndexedRecordIteratorInternal(Schema schema, Map<String, String> renamedColumns) throws IOException {
+  private ClosableIterator<IndexedRecord> getIndexedRecordIteratorInternal(HoodieSchema schema, Map<String, String> renamedColumns) throws IOException {
     // NOTE: We have to set both Avro read-schema and projection schema to make
     //       sure that in case the file-schema is not equal to read-schema we'd still
     //       be able to read that file (in case projection is a proper one)
     Configuration hadoopConf = storage.getConf().unwrapCopyAs(Configuration.class);
     //TODO boundary for now to revisit in later pr to use HoodieSchema
-    Schema repairedFileSchema = AvroSchemaRepair.repairLogicalTypes(getSchema().getAvroSchema(), schema);
+    Schema repairedFileSchema = AvroSchemaRepair.repairLogicalTypes(getSchema().toAvroSchema(), schema.toAvroSchema());
     Option<Schema> promotedSchema = Option.empty();
-    if (!renamedColumns.isEmpty() || HoodieAvroUtils.recordNeedsRewriteForExtendedAvroTypePromotion(repairedFileSchema, schema)) {
+    if (!renamedColumns.isEmpty() || HoodieAvroUtils.recordNeedsRewriteForExtendedAvroTypePromotion(repairedFileSchema, schema.toAvroSchema())) {

Review Comment:
   Will `recordNeedsRewriteForExtendedAvroTypePromotion` be handled in another 
PR?



##########
hudi-hadoop-common/src/main/java/org/apache/parquet/avro/AvroSchemaConverterWithTimestampNTZ.java:
##########
@@ -267,121 +271,123 @@ private Type convertUnion(String fieldName, Schema schema, Type.Repetition repet
     }
   }
 
-  private Type convertUnionToGroupType(String fieldName, Type.Repetition repetition, List<Schema> nonNullSchemas,
+  private Type convertUnionToGroupType(String fieldName, Type.Repetition repetition, List<HoodieSchema> nonNullSchemas,
                                        String schemaPath) {
     List<Type> unionTypes = new ArrayList<Type>(nonNullSchemas.size());
     int index = 0;
-    for (Schema childSchema : nonNullSchemas) {
+    for (HoodieSchema childSchema : nonNullSchemas) {
       unionTypes.add( convertField("member" + index++, childSchema, Type.Repetition.OPTIONAL, schemaPath));
     }
     return new GroupType(repetition, fieldName, unionTypes);
   }
 
-  private Type convertField(Schema.Field field, String schemaPath) {
+  private Type convertField(HoodieSchemaField field, String schemaPath) {
     return convertField(field.name(), field.schema(), schemaPath);
   }
 
   @Override
-  public Schema convert(MessageType parquetSchema) {
+  public HoodieSchema convert(MessageType parquetSchema) {
     return convertFields(parquetSchema.getName(), parquetSchema.getFields(), new HashMap<>());
   }
 
-  Schema convert(GroupType parquetSchema) {
+  HoodieSchema convert(GroupType parquetSchema) {
     return convertFields(parquetSchema.getName(), parquetSchema.getFields(), new HashMap<>());
   }
 
-  private Schema convertFields(String name, List<Type> parquetFields, Map<String, Integer> names) {
-    List<Schema.Field> fields = new ArrayList<Schema.Field>();
+  private HoodieSchema convertFields(String name, List<Type> parquetFields, Map<String, Integer> names) {
+    List<HoodieSchemaField> fields = new ArrayList<>();
     Integer nameCount = names.merge(name, 1, (oldValue, value) -> oldValue + 1);
     for (Type parquetType : parquetFields) {
-      Schema fieldSchema = convertField(parquetType, names);
+      HoodieSchema fieldSchema = convertField(parquetType, names);
       if (parquetType.isRepetition(REPEATED)) {
         throw new UnsupportedOperationException("REPEATED not supported outside LIST or MAP. Type: " + parquetType);
       } else if (parquetType.isRepetition(Type.Repetition.OPTIONAL)) {
-        fields.add(new Schema.Field(
+        fields.add(HoodieSchemaField.of(
             parquetType.getName(), optional(fieldSchema), null, NULL_VALUE));
       } else { // REQUIRED
-        fields.add(new Schema.Field(
+        fields.add(HoodieSchemaField.of(
             parquetType.getName(), fieldSchema, null, (Object) null));
       }
     }
-    Schema schema = Schema.createRecord(name, null, nameCount > 1 ? name + nameCount : null, false);
-    schema.setFields(fields);
+    HoodieSchema schema = HoodieSchema.createRecord(name, null, nameCount > 1 ? name + nameCount : null, false, fields);
     return schema;
   }
 
-  private Schema convertField(final Type parquetType, Map<String, Integer> names) {
+  private HoodieSchema convertField(final Type parquetType, Map<String, Integer> names) {
     if (parquetType.isPrimitive()) {
       final PrimitiveType asPrimitive = parquetType.asPrimitiveType();
       final PrimitiveTypeName parquetPrimitiveTypeName =
           asPrimitive.getPrimitiveTypeName();
       final LogicalTypeAnnotation annotation = parquetType.getLogicalTypeAnnotation();
-      Schema schema = parquetPrimitiveTypeName.convert(
-          new PrimitiveType.PrimitiveTypeNameConverter<Schema, RuntimeException>() {
+
+      // Handle logical type annotations directly with HoodieSchema creation methods
+      if (annotation != null) {
+        HoodieSchema logicalSchema = convertLogicalTypeAnnotationToHoodieSchema(annotation, parquetType);
+        if (logicalSchema != null) {
+          return logicalSchema;
+        }
+      }
+
+      // Fallback to basic type conversion if no logical type annotation
+      HoodieSchema schema = parquetPrimitiveTypeName.convert(
+          new PrimitiveType.PrimitiveTypeNameConverter<HoodieSchema, RuntimeException>() {
             @Override
-            public Schema convertBOOLEAN(PrimitiveTypeName primitiveTypeName) {
-              return Schema.create(Schema.Type.BOOLEAN);
+            public HoodieSchema convertBOOLEAN(PrimitiveTypeName primitiveTypeName) {
+              return HoodieSchema.create(HoodieSchemaType.BOOLEAN);
             }
             @Override
-            public Schema convertINT32(PrimitiveTypeName primitiveTypeName) {
-              return Schema.create(Schema.Type.INT);
+            public HoodieSchema convertINT32(PrimitiveTypeName primitiveTypeName) {
+              return HoodieSchema.create(HoodieSchemaType.INT);
             }
             @Override
-            public Schema convertINT64(PrimitiveTypeName primitiveTypeName) {
-              return Schema.create(Schema.Type.LONG);
+            public HoodieSchema convertINT64(PrimitiveTypeName primitiveTypeName) {
+              return HoodieSchema.create(HoodieSchemaType.LONG);
             }
             @Override
-            public Schema convertINT96(PrimitiveTypeName primitiveTypeName) {
+            public HoodieSchema convertINT96(PrimitiveTypeName primitiveTypeName) {
               if (readInt96AsFixed) {
-                return Schema.createFixed("INT96", "INT96 represented as byte[12]", null, 12);
+                return HoodieSchema.createFixed("INT96", "INT96 represented as byte[12]", null, 12);
               }
               throw new IllegalArgumentException(
                   "INT96 is deprecated. As interim enable READ_INT96_AS_FIXED 
flag to read as byte array.");
             }
             @Override
-            public Schema convertFLOAT(PrimitiveTypeName primitiveTypeName) {
-              return Schema.create(Schema.Type.FLOAT);
+            public HoodieSchema convertFLOAT(PrimitiveTypeName primitiveTypeName) {
+              return HoodieSchema.create(HoodieSchemaType.FLOAT);
             }
             @Override
-            public Schema convertDOUBLE(PrimitiveTypeName primitiveTypeName) {
-              return Schema.create(Schema.Type.DOUBLE);
+            public HoodieSchema convertDOUBLE(PrimitiveTypeName primitiveTypeName) {
+              return HoodieSchema.create(HoodieSchemaType.DOUBLE);
             }
             @Override
-            public Schema convertFIXED_LEN_BYTE_ARRAY(PrimitiveTypeName primitiveTypeName) {
+            public HoodieSchema convertFIXED_LEN_BYTE_ARRAY(PrimitiveTypeName primitiveTypeName) {
               if (annotation instanceof LogicalTypeAnnotation.UUIDLogicalTypeAnnotation) {
-                return Schema.create(Schema.Type.STRING);
+                return HoodieSchema.createUUID();
               } else {
                 int size = parquetType.asPrimitiveType().getTypeLength();
-                return Schema.createFixed(parquetType.getName(), null, null, size);
+                return HoodieSchema.createFixed(parquetType.getName(), null, null, size);
             }
             @Override
-            public Schema convertBINARY(PrimitiveTypeName primitiveTypeName) {
+            public HoodieSchema convertBINARY(PrimitiveTypeName primitiveTypeName) {
               if (annotation instanceof LogicalTypeAnnotation.StringLogicalTypeAnnotation ||
                   annotation instanceof  LogicalTypeAnnotation.EnumLogicalTypeAnnotation) {
-                return Schema.create(Schema.Type.STRING);
+                return HoodieSchema.create(HoodieSchemaType.STRING);
               } else {
-                return Schema.create(Schema.Type.BYTES);
+                return HoodieSchema.create(HoodieSchemaType.BYTES);
               }
             }
           });
 
-      LogicalType logicalType = convertLogicalType(annotation);
-      if (logicalType != null && (!(annotation instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) ||
-          parquetPrimitiveTypeName == BINARY ||
-          parquetPrimitiveTypeName == FIXED_LEN_BYTE_ARRAY)) {
-        schema = logicalType.addToSchema(schema);
-      }
-
       return schema;
 
     } else {
       GroupType parquetGroupType = parquetType.asGroupType();
       LogicalTypeAnnotation logicalTypeAnnotation = parquetGroupType.getLogicalTypeAnnotation();
       if (logicalTypeAnnotation != null) {
-        return logicalTypeAnnotation.accept(new LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<Schema>() {
+        return logicalTypeAnnotation.accept(new LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<HoodieSchema>() {
           @Override
-          public java.util.Optional<Schema> visit(LogicalTypeAnnotation.ListLogicalTypeAnnotation listLogicalType) {
+          public java.util.Optional<HoodieSchema> visit(LogicalTypeAnnotation.ListLogicalTypeAnnotation listLogicalType) {

Review Comment:
   While we're here, let's import `Optional` to shorten the line lengths in 
this file
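
   A rough sketch of the suggested cleanup (illustrative, not taken from the PR):
   ```java
   import java.util.Optional;

   // With the import in place, visitor overrides like the one above shorten from
   //   public java.util.Optional<HoodieSchema> visit(...)
   // to
   //   public Optional<HoodieSchema> visit(...)
   ```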



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
