the-other-tim-brown commented on code in PR #17535:
URL: https://github.com/apache/hudi/pull/17535#discussion_r2608866464


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java:
##########
@@ -1015,11 +1016,13 @@ static void validateSecondaryIndexSchemaEvolution(
       }
       
       // Use AvroSchemaCompatibility's field lookup logic to handle aliases
-      Schema.Field writerField = AvroSchemaCompatibility.lookupWriterField(writerSchema, tableField);
+      HoodieSchemaField writerField = HoodieSchemaCompatibility.lookupWriterField(writerSchema, tableField);
       
       if (writerField != null && !tableField.schema().equals(writerField.schema())) {
         // Check if this is just making the field nullable/non-nullable, which is safe from SI perspective
-        if (getNonNullTypeFromUnion(tableField.schema()).equals(getNonNullTypeFromUnion(writerField.schema()))) {
+        HoodieSchema nonNullTableField = HoodieSchemaUtils.getNonNullTypeFromUnion(tableField.schema());

Review Comment:
   Let's use the `getNonNullType` method on the `HoodieSchema` when possible



##########
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaCompatibility.java:
##########
@@ -191,4 +215,42 @@ public static boolean areSchemasProjectionEquivalent(HoodieSchema schema1, Hoodi
     }
     return AvroSchemaUtils.areSchemasProjectionEquivalent(schema1.toAvroSchema(), schema2.toAvroSchema());
   }
+
+  /**
+   * Identifies the writer field that corresponds to the specified reader field.
+   * This function is adapted from AvroSchemaCompatibility#lookupWriterField
+   *
+   * <p>
+   * Matching includes reader name aliases.
+   * </p>
+   *
+   * @param writerSchema Schema of the record where to look for the writer field.
+   * @param readerField  Reader field to identify the corresponding writer field of.
+   * @return the writer field, if any does correspond, or None.
+   */
+  public static HoodieSchemaField lookupWriterField(final HoodieSchema writerSchema, final HoodieSchemaField readerField) {
+    assert (writerSchema.getType() == HoodieSchemaType.RECORD);
+    final List<HoodieSchemaField> writerFields = new ArrayList<>();

Review Comment:
   It looks like the list is expected to have 0 or 1 elements, so an `Option` may fit this use case better. What do you think?
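
   For illustration, the Option-returning shape could look like this minimal, self-contained sketch (`java.util.Optional` standing in for Hudi's `Option`; the `Field` record and its alias handling are hypothetical stand-ins, not the actual `HoodieSchemaField` API):

   ```java
   import java.util.List;
   import java.util.Optional;

   public class FieldLookup {
       // Hypothetical stand-in for a schema field: a name plus its aliases.
       record Field(String name, List<String> aliases) {}

       // Returns the first writer field matching the reader field's name or one
       // of the reader's aliases -- no single-element List needed.
       static Optional<Field> lookupWriterField(List<Field> writerFields, Field readerField) {
           return writerFields.stream()
               .filter(w -> w.name().equals(readerField.name())
                   || readerField.aliases().contains(w.name()))
               .findFirst();
       }

       public static void main(String[] args) {
           List<Field> writer = List.of(new Field("uuid", List.of()));
           Field reader = new Field("id", List.of("uuid"));
           System.out.println(lookupWriterField(writer, reader).isPresent()); // true
       }
   }
   ```

   With `Optional`, the "0 or 1 results" contract is explicit in the return type, and callers cannot forget the empty case the way they can with a possibly-empty list.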



##########
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaCompatibility.java:
##########
@@ -191,4 +215,42 @@ public static boolean areSchemasProjectionEquivalent(HoodieSchema schema1, Hoodi
     }
     return AvroSchemaUtils.areSchemasProjectionEquivalent(schema1.toAvroSchema(), schema2.toAvroSchema());
   }
+
+  /**
+   * Identifies the writer field that corresponds to the specified reader field.
+   * This function is adapted from AvroSchemaCompatibility#lookupWriterField
+   *
+   * <p>
+   * Matching includes reader name aliases.
+   * </p>
+   *
+   * @param writerSchema Schema of the record where to look for the writer field.
+   * @param readerField  Reader field to identify the corresponding writer field of.
+   * @return the writer field, if any does correspond, or None.
+   */
+  public static HoodieSchemaField lookupWriterField(final HoodieSchema writerSchema, final HoodieSchemaField readerField) {
+    assert (writerSchema.getType() == HoodieSchemaType.RECORD);

Review Comment:
   Let's use the `ValidationUtils` here so we can return a more customized error message if the type is not `RECORD`.
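
   As a sketch of the suggested pattern, a `checkArgument`-style precondition (modeled on, not quoting, Hudi's `ValidationUtils`) replaces the bare `assert` with a descriptive error that is always enforced, not only when `-ea` is set:

   ```java
   public class SchemaChecks {
       // Hypothetical subset of schema types, for illustration only.
       enum SchemaType { RECORD, STRING, INT }

       // Minimal checkArgument in the style of ValidationUtils: throws with a
       // caller-supplied message instead of a bare AssertionError.
       static void checkArgument(boolean condition, String message) {
           if (!condition) {
               throw new IllegalArgumentException(message);
           }
       }

       static void requireRecord(SchemaType type) {
           checkArgument(type == SchemaType.RECORD,
               "lookupWriterField expects a RECORD schema but got: " + type);
       }
   }
   ```

   The caller then sees exactly which type was passed in, instead of an opaque `AssertionError` (or, worse, no check at all when assertions are disabled in production JVMs).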



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java:
##########
@@ -116,21 +115,21 @@ public ClosableIterator<HoodieRecord<InternalRow>> getRecordIterator(HoodieSchem
   @Override
   public ClosableIterator<HoodieRecord<InternalRow>> getRecordIterator(HoodieSchema schema) throws IOException {
     //TODO boundary to revisit in later pr to use HoodieSchema directly

Review Comment:
   Let's remove this TODO now



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java:
##########
@@ -1005,8 +1006,8 @@ static void validateSecondaryIndexSchemaEvolution(
     for (Map.Entry<String, String> entry : columnToIndexName.entrySet()) {
       String columnName = entry.getKey();
       String indexName = entry.getValue();
-      
-      Schema.Field tableField = tableSchema.getField(columnName);
+
+      HoodieSchemaField tableField = tableSchema.getField(columnName).get();
       
       if (tableField == null) {

Review Comment:
   Let's update this to first return an `Option` and then we can check if it is 
present instead of checking for `null` here.
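
   As a sketch of that null-to-Option refactor (`java.util.Optional` standing in for Hudi's `Option`; `getField` and `validateColumn` are hypothetical stand-ins for the table-schema lookup):

   ```java
   import java.util.Map;
   import java.util.Optional;

   public class OptionLookup {
       // Stand-in for tableSchema.getField(columnName): returns Optional instead
       // of null, so callers must handle absence explicitly.
       static Optional<String> getField(Map<String, String> schema, String name) {
           return Optional.ofNullable(schema.get(name));
       }

       static void validateColumn(Map<String, String> schema, String column) {
           Optional<String> field = getField(schema, column);
           // Presence check replaces the `tableField == null` branch.
           if (!field.isPresent()) {
               throw new IllegalStateException("Column not found in table schema: " + column);
           }
           // ... proceed with field.get() ...
       }
   }
   ```

   Note that calling `.get()` eagerly (as the current diff does) would throw before the presence check ever runs; checking `isPresent()` first keeps the "column missing" error path reachable.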



##########
hudi-common/src/main/java/org/apache/hudi/common/table/PartitionPathParser.java:
##########
@@ -158,19 +156,15 @@ private static Object inferDateValue(
         throw new IllegalArgumentException(
             "Unknown date format for partition path: " + partitionPath);
     }
-    if (fieldSchema.getLogicalType() instanceof LogicalTypes.Date) {
+    if (fieldSchema.getType() == HoodieSchemaType.DATE) {
       return Date.valueOf(time.toLocalDate());
     }
     return Timestamp.from(time.toInstant(ZoneOffset.UTC));
   }
 
-  private static boolean isTimeBasedLogicalType(LogicalType logicalType) {
-    return logicalType instanceof LogicalTypes.Date
-        || logicalType instanceof LogicalTypes.TimestampMillis
-        || logicalType instanceof LogicalTypes.TimestampMicros
-        || logicalType instanceof LogicalTypes.TimeMillis
-        || logicalType instanceof LogicalTypes.TimeMicros
-        || logicalType instanceof LogicalTypes.LocalTimestampMicros
-        || logicalType instanceof LogicalTypes.LocalTimestampMillis;
+  private static boolean isTimeBasedLogicalType(HoodieSchemaType logicalType) {

Review Comment:
   ```suggestion
     private static boolean isTimeBasedType(HoodieSchemaType type) {
   ```
   
   The logical type is an Avro concept. We have proper types for date, time, timestamp, etc. now.
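
   For illustration, the renamed helper could replace the `instanceof` chain with an `EnumSet` membership test; the `SchemaType` enum below is a hypothetical stand-in for `HoodieSchemaType`:

   ```java
   import java.util.EnumSet;

   public class TimeTypes {
       // Hypothetical subset of schema types, for illustration only.
       enum SchemaType {
           DATE, TIME_MILLIS, TIME_MICROS, TIMESTAMP_MILLIS, TIMESTAMP_MICROS,
           LOCAL_TIMESTAMP_MILLIS, LOCAL_TIMESTAMP_MICROS, STRING, INT
       }

       private static final EnumSet<SchemaType> TIME_BASED = EnumSet.of(
           SchemaType.DATE, SchemaType.TIME_MILLIS, SchemaType.TIME_MICROS,
           SchemaType.TIMESTAMP_MILLIS, SchemaType.TIMESTAMP_MICROS,
           SchemaType.LOCAL_TIMESTAMP_MILLIS, SchemaType.LOCAL_TIMESTAMP_MICROS);

       // One set-membership check instead of seven instanceof branches.
       static boolean isTimeBasedType(SchemaType type) {
           return TIME_BASED.contains(type);
       }
   }
   ```

   `EnumSet.contains` is a constant-time bit test, and adding a new time-based type only means extending the set rather than another `||` branch.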



##########
hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java:
##########
@@ -83,28 +84,27 @@ class TestTableSchemaResolver {
 
   @Test
   void testRecreateSchemaWhenDropPartitionColumns() {
-    Schema originSchema = new Schema.Parser().parse(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);
+    HoodieSchema originSchema = HoodieSchema.parse(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);

     // case2
     String[] pts1 = new String[0];
-    Schema s2 = TableSchemaResolver.appendPartitionColumns(originSchema, Option.of(pts1));
+    HoodieSchema s2 = TableSchemaResolver.appendPartitionColumns(originSchema, Option.of(pts1));
     assertEquals(originSchema, s2);

     // case3: partition_path is in originSchema
     String[] pts2 = {"partition_path"};
-    Schema s3 = TableSchemaResolver.appendPartitionColumns(originSchema, Option.of(pts2));
+    HoodieSchema s3 = TableSchemaResolver.appendPartitionColumns(originSchema, Option.of(pts2));
     assertEquals(originSchema, s3);

     // case4: user_partition is not in originSchema
     String[] pts3 = {"user_partition"};
-    Schema s4 = TableSchemaResolver.appendPartitionColumns(originSchema, Option.of(pts3));
+    HoodieSchema s4 = TableSchemaResolver.appendPartitionColumns(originSchema, Option.of(pts3));
     assertNotEquals(originSchema, s4);
     assertTrue(s4.getFields().stream().anyMatch(f -> f.name().equals("user_partition")));
-    Schema.Field f = s4.getField("user_partition");
-    assertEquals(f.schema(), AvroSchemaUtils.createNullableSchema(Schema.Type.STRING));
+    HoodieSchemaField f = s4.getField("user_partition").get();
+    assertEquals(f.schema(), HoodieSchema.createNullable(HoodieSchemaType.STRING));

     // case5: user_partition is in originSchema, but partition_path is in originSchema
-    String[] pts4 = {"user_partition", "partition_path"};

Review Comment:
   Do you think this is an error in the original test? It seems like this 
should be used below.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/PartitionPathParser.java:
##########
@@ -53,17 +51,17 @@ public Object[] getPartitionFieldVals(Option<String[]> partitionFields,
 
   private static Object[] getPartitionValues(String[] partitionFields,
                                              String partitionPath,
-                                             Schema schema) {
+                                             HoodieSchema schema) {
     String[] parts = partitionPath.split("/");
     int pathSegment = 0;
     boolean hasDateField = false;
     Object[] partitionValues = new Object[partitionFields.length];
     for (int i = 0; i < partitionFields.length; i++) {
       String partitionField = partitionFields[i];
-      Schema.Field field = schema.getField(partitionField);
+      Option<HoodieSchemaField> fieldOpt = schema.getField(partitionField);
       // if the field is not present in the schema, we assume it is a string
-      Schema fieldSchema = field == null ? Schema.create(Schema.Type.STRING) : getNonNullTypeFromUnion(field.schema());
-      LogicalType logicalType = fieldSchema.getLogicalType();
+      HoodieSchema fieldSchema = fieldOpt.isEmpty() ? HoodieSchema.create(HoodieSchemaType.STRING) : HoodieSchemaUtils.getNonNullTypeFromUnion(fieldOpt.get().schema());
+      HoodieSchemaType logicalType = fieldSchema.getType();

Review Comment:
   ```suggestion
         Option<HoodieSchemaField> field = schema.getField(partitionField);
         // if the field is not present in the schema, we assume it is a string
         HoodieSchema fieldSchema = field.map(f -> f.schema().getNonNullType()).orElseGet(() -> HoodieSchema.create(HoodieSchemaType.STRING));
   ```



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/KafkaOffsetPostProcessor.java:
##########
@@ -62,6 +66,38 @@ public KafkaOffsetPostProcessor(TypedProperties props, JavaSparkContext jssc) {
   }
 
   @Override
+  public HoodieSchema processSchema(HoodieSchema schema) {

Review Comment:
   This was not overridden on purpose in case a user is extending the 
deprecated method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
