lokeshj1703 commented on code in PR #17601:
URL: https://github.com/apache/hudi/pull/17601#discussion_r2652595215


##########
hudi-common/src/avro/test/java/org/apache/parquet/schema/TestAvroSchemaRepair.java:
##########
@@ -0,0 +1,984 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.schema;
+
+import org.apache.hudi.avro.AvroSchemaUtils;
+
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests {@link AvroSchemaRepair}.
+ */
+public class TestAvroSchemaRepair {
+
+  @Test
+  public void testNoRepairNeededIdenticalSchemas() {
+    Schema requestedSchema = Schema.create(Schema.Type.LONG);
+    Schema tableSchema = Schema.create(Schema.Type.LONG);
+
+    Schema result = AvroSchemaRepair.repairLogicalTypes(requestedSchema, 
tableSchema);
+
+    assertSame(requestedSchema, result, "When schemas are identical, should 
return same instance");
+
+  }
+
+  @Test
+  public void testNoRepairNeededDifferentPrimitiveTypes() {
+    Schema requestedSchema = Schema.create(Schema.Type.STRING);
+    Schema tableSchema = Schema.create(Schema.Type.INT);
+
+    Schema result = AvroSchemaRepair.repairLogicalTypes(requestedSchema, 
tableSchema);
+
+    assertSame(requestedSchema, result, "When types differ, should return 
original schema");
+  }
+
+  @Test
+  public void testRepairLongWithoutLogicalTypeToLocalTimestampMillis() {
+    Schema requestedSchema = Schema.create(Schema.Type.LONG);
+    Schema tableSchema = 
LogicalTypes.localTimestampMillis().addToSchema(Schema.create(Schema.Type.LONG));
+
+    Schema result = AvroSchemaRepair.repairLogicalTypes(requestedSchema, 
tableSchema);
+
+    assertNotSame(requestedSchema, result, "Should create a new schema with 
logical type");
+    assertEquals(Schema.Type.LONG, result.getType());
+    assertEquals(LogicalTypes.localTimestampMillis(), result.getLogicalType());
+  }
+
+  @Test
+  public void testRepairLongWithoutLogicalTypeToLocalTimestampMicros() {
+    Schema requestedSchema = Schema.create(Schema.Type.LONG);
+    Schema tableSchema = 
LogicalTypes.localTimestampMicros().addToSchema(Schema.create(Schema.Type.LONG));
+
+    Schema result = AvroSchemaRepair.repairLogicalTypes(requestedSchema, 
tableSchema);
+
+    assertNotSame(requestedSchema, result, "Should create a new schema with 
logical type");
+    assertEquals(Schema.Type.LONG, result.getType());
+    assertEquals(LogicalTypes.localTimestampMicros(), result.getLogicalType());
+  }

Review Comment:
   Should we be repairing in this scenario? What if the actual file logical type is
millis but it is not present in the schema?



##########
hudi-common/src/main/java/org/apache/hudi/avro/ConvertingGenericData.java:
##########
@@ -42,13 +43,12 @@ public class ConvertingGenericData extends GenericData {
   private static final TimeConversions.TimeMicrosConversion 
TIME_MICROS_CONVERSION = new TimeConversions.TimeMicrosConversion();
   private static final TimeConversions.TimestampMicrosConversion 
TIMESTAMP_MICROS_CONVERSION = new TimeConversions.TimestampMicrosConversion();
 
-  // NOTE: Those are not supported in Avro 1.8.2
-  // TODO re-enable upon upgrading to 1.10
-  // private static final TimeConversions.TimestampMillisConversion 
TIMESTAMP_MILLIS_CONVERSION = new TimeConversions.TimestampMillisConversion();
-  // private static final TimeConversions.TimeMillisConversion 
TIME_MILLIS_CONVERSION = new TimeConversions.TimeMillisConversion();
-  // private static final TimeConversions.LocalTimestampMillisConversion 
LOCAL_TIMESTAMP_MILLIS_CONVERSION = new 
TimeConversions.LocalTimestampMillisConversion();
-  // private static final TimeConversions.LocalTimestampMicrosConversion 
LOCAL_TIMESTAMP_MICROS_CONVERSION = new 
TimeConversions.LocalTimestampMicrosConversion();
-
+  // NOTE: Those are not supported in Avro 1.8.2 (used by Spark 2)

Review Comment:
   I think we might not need changes in this class since it is more about 
adding support for logical types?



##########
hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/AvroInternalSchemaConverter.java:
##########
@@ -260,15 +260,18 @@ private static Type 
visitAvroPrimitiveToBuildInternalType(Schema primitive) {
       } else if (logical instanceof LogicalTypes.Date) {
         return Types.DateType.get();
 
-      } else if (
-              logical instanceof LogicalTypes.TimeMillis
-                      || logical instanceof LogicalTypes.TimeMicros) {
+      } else if (logical instanceof LogicalTypes.TimeMillis) {

Review Comment:
   Changes in this class can also be revisited since these may not be required
for the 0.x branch.



##########
pom.xml:
##########
@@ -2396,6 +2398,7 @@
         <slf4j.version>2.0.7</slf4j.version>
         <skip.hudi-spark2.unit.tests>true</skip.hudi-spark2.unit.tests>
         <skipITs>true</skipITs>
+        <spark32orEarlier>false</spark32orEarlier>

Review Comment:
   The property is not used anywhere. We will also need to set it to false in
other profiles.



##########
hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java:
##########
@@ -1040,12 +1051,35 @@ private static Object rewritePrimaryType(Object 
oldValue, Schema oldSchema, Sche
         case NULL:
         case BOOLEAN:
         case INT:
-        case LONG:
         case FLOAT:
         case DOUBLE:
         case BYTES:
         case STRING:
           return oldValue;
+        case LONG:

Review Comment:
   Even these changes can be revisited. They may not be required.



##########
hudi-common/src/avro/java/org/apache/parquet/schema/AvroSchemaRepair.java:
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.schema;
+
+import org.apache.hudi.avro.AvroSchemaCache;
+import org.apache.hudi.avro.AvroSchemaUtils;
+import org.apache.hudi.avro.HoodieAvroUtils;
+
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class AvroSchemaRepair {
+  public static boolean isLocalTimestampSupported = 
isLocalTimestampMillisSupported();
+
+  public static Schema repairLogicalTypes(Schema fileSchema, Schema 
tableSchema) {
+    Schema repairedSchema = repairAvroSchema(fileSchema, tableSchema);
+    if (repairedSchema != fileSchema) {
+      return AvroSchemaCache.intern(repairedSchema);
+    }
+    return fileSchema;
+  }
+
+  /**
+   * Performs schema repair on a schema, handling nullable unions.
+   */
+  private static Schema repairAvroSchema(Schema fileSchema, Schema 
tableSchema) {
+    // Always resolve nullable schemas first (returns unchanged if not a union)
+    Schema nonNullFileSchema = 
AvroSchemaUtils.getNonNullTypeFromUnion(fileSchema);
+    Schema nonNullTableSchema = 
AvroSchemaUtils.getNonNullTypeFromUnion(tableSchema);
+
+    // Perform repair on the non-null types
+    Schema nonNullRepairedSchema = repairAvroSchemaNonNull(nonNullFileSchema, 
nonNullTableSchema);
+
+    // If nothing changed, return the original schema
+    if (nonNullRepairedSchema == nonNullFileSchema) {
+      return fileSchema;
+    }
+
+    // If the original was a union, wrap the repaired schema back in a 
nullable union
+    if (fileSchema.getType() == Schema.Type.UNION) {
+      return AvroSchemaUtils.createNullableSchema(nonNullRepairedSchema);
+    }
+
+    return nonNullRepairedSchema;
+  }
+
+  /**
+   * Repairs non-nullable schemas (after unions have been resolved).
+   */
+  private static Schema repairAvroSchemaNonNull(Schema fileSchema, Schema 
tableSchema) {
+    // If schemas are already equal, nothing to repair
+    if (fileSchema.equals(tableSchema)) {
+      return fileSchema;
+    }
+
+    // If types are different, no repair can be done
+    if (fileSchema.getType() != tableSchema.getType()) {
+      return fileSchema;
+    }
+
+    // Handle record types (nested structs)
+    if (fileSchema.getType() == Schema.Type.RECORD) {
+      return repairRecord(fileSchema, tableSchema);
+    }
+
+    // Handle array types
+    if (fileSchema.getType() == Schema.Type.ARRAY) {
+      Schema repairedElementSchema = 
repairAvroSchema(fileSchema.getElementType(), tableSchema.getElementType());
+      // If element didn't change, return original array schema
+      if (repairedElementSchema == fileSchema.getElementType()) {
+        return fileSchema;
+      }
+      return Schema.createArray(repairedElementSchema);
+    }
+
+    // Handle map types
+    if (fileSchema.getType() == Schema.Type.MAP) {
+      Schema repairedValueSchema = repairAvroSchema(fileSchema.getValueType(), 
tableSchema.getValueType());
+      // If value didn't change, return original map schema
+      if (repairedValueSchema == fileSchema.getValueType()) {
+        return fileSchema;
+      }
+      return Schema.createMap(repairedValueSchema);
+    }
+
+    // Check primitive if we need to repair
+    if (needsLogicalTypeRepair(fileSchema, tableSchema)) {
+      // If we need to repair, return the table schema
+      return tableSchema;
+    }
+
+    // Default: return file schema
+    return fileSchema;
+  }
+
+  /**
+   * Quick check if a logical type repair is needed (no allocations).
+   */
+  private static boolean needsLogicalTypeRepair(Schema fileSchema, Schema 
tableSchema) {
+    if (fileSchema.getType() != Schema.Type.LONG || tableSchema.getType() != 
Schema.Type.LONG) {
+      return false;
+    }
+
+    LogicalType fileSchemaLogicalType = fileSchema.getLogicalType();
+    LogicalType tableSchemaLogicalType = tableSchema.getLogicalType();
+
+    // if file schema has no logical type, and the table has a local 
timestamp, then we need to repair
+    if (fileSchemaLogicalType == null) {
+      try {
+        return tableSchemaLogicalType instanceof 
LogicalTypes.LocalTimestampMillis
+            || tableSchemaLogicalType instanceof 
LogicalTypes.LocalTimestampMicros;
+      } catch (Exception e) {
+        return false;
+      }
+    }
+
+    // if file schema is timestamp-micros, and the table is timestamp-millis, 
then we need to repair
+    return fileSchemaLogicalType instanceof LogicalTypes.TimestampMicros
+        && tableSchemaLogicalType instanceof LogicalTypes.TimestampMillis;
+  }
+
+  /**
+   * Performs record repair, returning the original schema if nothing changed.
+   */
+  private static Schema repairRecord(Schema fileSchema, Schema tableSchema) {
+    List<Schema.Field> fields = fileSchema.getFields();
+
+    // First pass: find the first field that changes
+    int firstChangedIndex = -1;
+    Schema firstRepairedSchema = null;
+
+    for (int i = 0; i < fields.size(); i++) {
+      Schema.Field requestedField = fields.get(i);
+      Schema.Field tableField = tableSchema.getField(requestedField.name());
+      if (tableField != null) {
+        Schema repairedSchema = repairAvroSchema(requestedField.schema(), 
tableField.schema());
+        if (repairedSchema != requestedField.schema()) {
+          firstChangedIndex = i;
+          firstRepairedSchema = repairedSchema;
+          break;
+        }
+      }
+    }
+
+    // If nothing changed, return the original schema
+    if (firstChangedIndex == -1) {
+      return fileSchema;
+    }
+
+    // Second pass: build the new schema with repaired fields
+    List<Schema.Field> repairedFields = new ArrayList<>(fields.size());
+
+    // Copy all fields before the first changed field
+    for (int i = 0; i < firstChangedIndex; i++) {
+      Schema.Field field = fields.get(i);
+      // Must create new Field since they cannot be reused
+      repairedFields.add(HoodieAvroUtils.createNewSchemaField(field));
+    }
+
+    // Add the first changed field (using cached repaired schema)
+    Schema.Field firstChangedField = fields.get(firstChangedIndex);
+    repairedFields.add(HoodieAvroUtils.createNewSchemaField(
+        firstChangedField.name(),
+        firstRepairedSchema,
+        firstChangedField.doc(),
+        firstChangedField.defaultVal()
+    ));
+
+    // Process remaining fields
+    for (int i = firstChangedIndex + 1; i < fields.size(); i++) {
+      Schema.Field requestedField = fields.get(i);
+      Schema.Field tableField = tableSchema.getField(requestedField.name());
+      Schema repairedSchema;
+
+      if (tableField != null) {
+        repairedSchema = repairAvroSchema(requestedField.schema(), 
tableField.schema());
+      } else {
+        repairedSchema = requestedField.schema();
+      }
+
+      // Must create new Field since they cannot be reused
+      repairedFields.add(HoodieAvroUtils.createNewSchemaField(
+          requestedField.name(),
+          repairedSchema,
+          requestedField.doc(),
+          requestedField.defaultVal()
+      ));
+    }
+
+    return Schema.createRecord(
+        fileSchema.getName(),
+        fileSchema.getDoc(),
+        fileSchema.getNamespace(),
+        fileSchema.isError(),
+        repairedFields
+    );
+  }
+
+  public static boolean hasTimestampMillisField(Schema tableSchema) {
+    switch (tableSchema.getType()) {
+      case RECORD:
+        for (Schema.Field field : tableSchema.getFields()) {
+          if (hasTimestampMillisField(field.schema())) {
+            return true;
+          }
+        }
+        return false;
+
+      case ARRAY:
+        return hasTimestampMillisField(tableSchema.getElementType());
+
+      case MAP:
+        return hasTimestampMillisField(tableSchema.getValueType());
+
+      case UNION:
+        return 
hasTimestampMillisField(AvroSchemaUtils.getNonNullTypeFromUnion(tableSchema));
+
+      default:
+        return tableSchema.getType() == Schema.Type.LONG
+            && (tableSchema.getLogicalType() instanceof 
LogicalTypes.TimestampMillis || tableSchema.getLogicalType() instanceof 
LogicalTypes.LocalTimestampMillis);
+    }
+  }
+
+  /**
+   * Check if LogicalTypes.LocalTimestampMillis is supported in the current 
Avro version
+   *
+   * @return true if LocalTimestampMillis is available, false otherwise
+   */
+  public static boolean isLocalTimestampMillisSupported() {
+    try {
+      return Arrays.stream(LogicalTypes.class.getDeclaredClasses())
+          .anyMatch(c -> c.getSimpleName().equals("LocalTimestampMillis"));
+    } catch (Exception e) {
+      return false;
+    }
+  }
+}

Review Comment:
   It seems like these APIs are not used. Should we remove these?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java:
##########
@@ -156,7 +157,7 @@ private static class RecordIterator implements 
ClosableIterator<IndexedRecord> {
     private int totalRecords = 0;
     private int readRecords = 0;
 
-    private RecordIterator(Schema readerSchema, Schema writerSchema, byte[] 
content) throws IOException {
+    private RecordIterator(Schema readerSchema, Schema writerSchema, byte[] 
content, boolean enableLogicalTimestampFieldRepair) throws IOException {

Review Comment:
   This is always true. Can be omitted.



##########
hudi-common/src/main/java/org/apache/hudi/avro/ConvertingGenericData.java:
##########
@@ -57,12 +57,20 @@ private ConvertingGenericData() {
     addLogicalTypeConversion(DATE_CONVERSION);
     addLogicalTypeConversion(TIME_MICROS_CONVERSION);
     addLogicalTypeConversion(TIMESTAMP_MICROS_CONVERSION);
-    // NOTE: Those are not supported in Avro 1.8.2
-    // TODO re-enable upon upgrading to 1.10
-    // addLogicalTypeConversion(TIME_MILLIS_CONVERSION);
-    // addLogicalTypeConversion(TIMESTAMP_MILLIS_CONVERSION);
-    // addLogicalTypeConversion(LOCAL_TIMESTAMP_MILLIS_CONVERSION);
-    // addLogicalTypeConversion(LOCAL_TIMESTAMP_MICROS_CONVERSION);
+    // NOTE: Those are not supported in Avro 1.8.2 (used by Spark 2)
+    // Only add conversions if they're available

Review Comment:
   Should we validate the fix and the added tests with Spark 2? I am not sure if CI
covers it by default.



##########
hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java:
##########
@@ -199,6 +203,25 @@ private static boolean isProjectionOfInternal(Schema 
sourceSchema,
     return atomicTypeEqualityPredicate.apply(sourceSchema, targetSchema);
   }
 
+  public static Option<Schema> findNestedFieldSchema(Schema schema, String 
fieldName) {
+    if (StringUtils.isNullOrEmpty(fieldName)) {
+      return Option.empty();
+    }
+    String[] parts = fieldName.split("\\.");
+    for (String part : parts) {
+      Schema.Field foundField = getNonNullTypeFromUnion(schema).getField(part);
+      if (foundField == null) {
+        throw new HoodieAvroSchemaException(fieldName + " not a field in " + 
schema);
+      }
+      schema = foundField.schema();
+    }
+    return Option.of(getNonNullTypeFromUnion(schema));
+  }
+
+  public static Option<Schema.Type> findNestedFieldType(Schema schema, String 
fieldName) {
+    return findNestedFieldSchema(schema, fieldName).map(Schema::getType);
+  }
+

Review Comment:
   These APIs are not used anywhere.



##########
hudi-common/src/main/java/org/apache/hudi/common/util/DateTimeUtils.java:
##########
@@ -52,6 +52,12 @@ public static Instant microsToInstant(long microsFromEpoch) {
     return Instant.ofEpochSecond(epochSeconds, nanoAdjustment);
   }
 
+  public static Instant nanosToInstant(long nanosFromEpoch) {

Review Comment:
   These are unused



##########
pom.xml:
##########
@@ -2396,6 +2398,7 @@
         <slf4j.version>2.0.7</slf4j.version>
         <skip.hudi-spark2.unit.tests>true</skip.hudi-spark2.unit.tests>
         <skipITs>true</skipITs>
+        <spark32orEarlier>false</spark32orEarlier>

Review Comment:
   Is the schema repair only done for Spark 3.4+ versions?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to