lokeshj1703 commented on code in PR #17601: URL: https://github.com/apache/hudi/pull/17601#discussion_r2652595215
########## hudi-common/src/avro/test/java/org/apache/parquet/schema/TestAvroSchemaRepair.java: ########## @@ -0,0 +1,984 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.schema; + +import org.apache.hudi.avro.AvroSchemaUtils; + +import org.apache.avro.LogicalTypes; +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotSame; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Tests {@link AvroSchemaRepair}. 
+ */ +public class TestAvroSchemaRepair { + + @Test + public void testNoRepairNeededIdenticalSchemas() { + Schema requestedSchema = Schema.create(Schema.Type.LONG); + Schema tableSchema = Schema.create(Schema.Type.LONG); + + Schema result = AvroSchemaRepair.repairLogicalTypes(requestedSchema, tableSchema); + + assertSame(requestedSchema, result, "When schemas are identical, should return same instance"); + + } + + @Test + public void testNoRepairNeededDifferentPrimitiveTypes() { + Schema requestedSchema = Schema.create(Schema.Type.STRING); + Schema tableSchema = Schema.create(Schema.Type.INT); + + Schema result = AvroSchemaRepair.repairLogicalTypes(requestedSchema, tableSchema); + + assertSame(requestedSchema, result, "When types differ, should return original schema"); + } + + @Test + public void testRepairLongWithoutLogicalTypeToLocalTimestampMillis() { + Schema requestedSchema = Schema.create(Schema.Type.LONG); + Schema tableSchema = LogicalTypes.localTimestampMillis().addToSchema(Schema.create(Schema.Type.LONG)); + + Schema result = AvroSchemaRepair.repairLogicalTypes(requestedSchema, tableSchema); + + assertNotSame(requestedSchema, result, "Should create a new schema with logical type"); + assertEquals(Schema.Type.LONG, result.getType()); + assertEquals(LogicalTypes.localTimestampMillis(), result.getLogicalType()); + } + + @Test + public void testRepairLongWithoutLogicalTypeToLocalTimestampMicros() { + Schema requestedSchema = Schema.create(Schema.Type.LONG); + Schema tableSchema = LogicalTypes.localTimestampMicros().addToSchema(Schema.create(Schema.Type.LONG)); + + Schema result = AvroSchemaRepair.repairLogicalTypes(requestedSchema, tableSchema); + + assertNotSame(requestedSchema, result, "Should create a new schema with logical type"); + assertEquals(Schema.Type.LONG, result.getType()); + assertEquals(LogicalTypes.localTimestampMicros(), result.getLogicalType()); + } Review Comment: Should we be repairing in this scenario?
What if the actual file's logical type is millis but it is not present in the schema? ########## hudi-common/src/main/java/org/apache/hudi/avro/ConvertingGenericData.java: ########## @@ -42,13 +43,12 @@ public class ConvertingGenericData extends GenericData { private static final TimeConversions.TimeMicrosConversion TIME_MICROS_CONVERSION = new TimeConversions.TimeMicrosConversion(); private static final TimeConversions.TimestampMicrosConversion TIMESTAMP_MICROS_CONVERSION = new TimeConversions.TimestampMicrosConversion(); - // NOTE: Those are not supported in Avro 1.8.2 - // TODO re-enable upon upgrading to 1.10 - // private static final TimeConversions.TimestampMillisConversion TIMESTAMP_MILLIS_CONVERSION = new TimeConversions.TimestampMillisConversion(); - // private static final TimeConversions.TimeMillisConversion TIME_MILLIS_CONVERSION = new TimeConversions.TimeMillisConversion(); - // private static final TimeConversions.LocalTimestampMillisConversion LOCAL_TIMESTAMP_MILLIS_CONVERSION = new TimeConversions.LocalTimestampMillisConversion(); - // private static final TimeConversions.LocalTimestampMicrosConversion LOCAL_TIMESTAMP_MICROS_CONVERSION = new TimeConversions.LocalTimestampMicrosConversion(); - + // NOTE: Those are not supported in Avro 1.8.2 (used by Spark 2) Review Comment: I think we might not need the changes in this class, since they are more about adding support for logical types.
########## hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/AvroInternalSchemaConverter.java: ########## @@ -260,15 +260,18 @@ private static Type visitAvroPrimitiveToBuildInternalType(Schema primitive) { } else if (logical instanceof LogicalTypes.Date) { return Types.DateType.get(); - } else if ( - logical instanceof LogicalTypes.TimeMillis - || logical instanceof LogicalTypes.TimeMicros) { + } else if (logical instanceof LogicalTypes.TimeMillis) { Review Comment: Changes in this class can also be revisited since these may not be required for 0.x branch ########## pom.xml: ########## @@ -2396,6 +2398,7 @@ <slf4j.version>2.0.7</slf4j.version> <skip.hudi-spark2.unit.tests>true</skip.hudi-spark2.unit.tests> <skipITs>true</skipITs> + <spark32orEarlier>false</spark32orEarlier> Review Comment: The property is not used anywhere. We will also need to set it as false in other profiles. ########## hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java: ########## @@ -1040,12 +1051,35 @@ private static Object rewritePrimaryType(Object oldValue, Schema oldSchema, Sche case NULL: case BOOLEAN: case INT: - case LONG: case FLOAT: case DOUBLE: case BYTES: case STRING: return oldValue; + case LONG: Review Comment: Even these changes can be revisited. They may not be required. ########## hudi-common/src/avro/java/org/apache/parquet/schema/AvroSchemaRepair.java: ########## @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.schema; + +import org.apache.hudi.avro.AvroSchemaCache; +import org.apache.hudi.avro.AvroSchemaUtils; +import org.apache.hudi.avro.HoodieAvroUtils; + +import org.apache.avro.LogicalType; +import org.apache.avro.LogicalTypes; +import org.apache.avro.Schema; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class AvroSchemaRepair { + public static boolean isLocalTimestampSupported = isLocalTimestampMillisSupported(); + + public static Schema repairLogicalTypes(Schema fileSchema, Schema tableSchema) { + Schema repairedSchema = repairAvroSchema(fileSchema, tableSchema); + if (repairedSchema != fileSchema) { + return AvroSchemaCache.intern(repairedSchema); + } + return fileSchema; + } + + /** + * Performs schema repair on a schema, handling nullable unions. 
+ */ + private static Schema repairAvroSchema(Schema fileSchema, Schema tableSchema) { + // Always resolve nullable schemas first (returns unchanged if not a union) + Schema nonNullFileSchema = AvroSchemaUtils.getNonNullTypeFromUnion(fileSchema); + Schema nonNullTableSchema = AvroSchemaUtils.getNonNullTypeFromUnion(tableSchema); + + // Perform repair on the non-null types + Schema nonNullRepairedSchema = repairAvroSchemaNonNull(nonNullFileSchema, nonNullTableSchema); + + // If nothing changed, return the original schema + if (nonNullRepairedSchema == nonNullFileSchema) { + return fileSchema; + } + + // If the original was a union, wrap the repaired schema back in a nullable union + if (fileSchema.getType() == Schema.Type.UNION) { + return AvroSchemaUtils.createNullableSchema(nonNullRepairedSchema); + } + + return nonNullRepairedSchema; + } + + /** + * Repairs non-nullable schemas (after unions have been resolved). + */ + private static Schema repairAvroSchemaNonNull(Schema fileSchema, Schema tableSchema) { + // If schemas are already equal, nothing to repair + if (fileSchema.equals(tableSchema)) { + return fileSchema; + } + + // If types are different, no repair can be done + if (fileSchema.getType() != tableSchema.getType()) { + return fileSchema; + } + + // Handle record types (nested structs) + if (fileSchema.getType() == Schema.Type.RECORD) { + return repairRecord(fileSchema, tableSchema); + } + + // Handle array types + if (fileSchema.getType() == Schema.Type.ARRAY) { + Schema repairedElementSchema = repairAvroSchema(fileSchema.getElementType(), tableSchema.getElementType()); + // If element didn't change, return original array schema + if (repairedElementSchema == fileSchema.getElementType()) { + return fileSchema; + } + return Schema.createArray(repairedElementSchema); + } + + // Handle map types + if (fileSchema.getType() == Schema.Type.MAP) { + Schema repairedValueSchema = repairAvroSchema(fileSchema.getValueType(), tableSchema.getValueType()); + // If 
value didn't change, return original map schema + if (repairedValueSchema == fileSchema.getValueType()) { + return fileSchema; + } + return Schema.createMap(repairedValueSchema); + } + + // Check primitive if we need to repair + if (needsLogicalTypeRepair(fileSchema, tableSchema)) { + // If we need to repair, return the table schema + return tableSchema; + } + + // Default: return file schema + return fileSchema; + } + + /** + * Quick check if a logical type repair is needed (no allocations). + */ + private static boolean needsLogicalTypeRepair(Schema fileSchema, Schema tableSchema) { + if (fileSchema.getType() != Schema.Type.LONG || tableSchema.getType() != Schema.Type.LONG) { + return false; + } + + LogicalType fileSchemaLogicalType = fileSchema.getLogicalType(); + LogicalType tableSchemaLogicalType = tableSchema.getLogicalType(); + + // if file scheam has no logical type, and the table has a local timestamp, then we need to repair + if (fileSchemaLogicalType == null) { + try { + return tableSchemaLogicalType instanceof LogicalTypes.LocalTimestampMillis + || tableSchemaLogicalType instanceof LogicalTypes.LocalTimestampMicros; + } catch (Exception e) { + return false; + } + } + + // if file schema is timestamp-micros, and the table is timestamp-millis, then we need to repair + return fileSchemaLogicalType instanceof LogicalTypes.TimestampMicros + && tableSchemaLogicalType instanceof LogicalTypes.TimestampMillis; + } + + /** + * Performs record repair, returning the original schema if nothing changed. 
+ */ + private static Schema repairRecord(Schema fileSchema, Schema tableSchema) { + List<Schema.Field> fields = fileSchema.getFields(); + + // First pass: find the first field that changes + int firstChangedIndex = -1; + Schema firstRepairedSchema = null; + + for (int i = 0; i < fields.size(); i++) { + Schema.Field requestedField = fields.get(i); + Schema.Field tableField = tableSchema.getField(requestedField.name()); + if (tableField != null) { + Schema repairedSchema = repairAvroSchema(requestedField.schema(), tableField.schema()); + if (repairedSchema != requestedField.schema()) { + firstChangedIndex = i; + firstRepairedSchema = repairedSchema; + break; + } + } + } + + // If nothing changed, return the original schema + if (firstChangedIndex == -1) { + return fileSchema; + } + + // Second pass: build the new schema with repaired fields + List<Schema.Field> repairedFields = new ArrayList<>(fields.size()); + + // Copy all fields before the first changed field + for (int i = 0; i < firstChangedIndex; i++) { + Schema.Field field = fields.get(i); + // Must create new Field since they cannot be reused + repairedFields.add(HoodieAvroUtils.createNewSchemaField(field)); + } + + // Add the first changed field (using cached repaired schema) + Schema.Field firstChangedField = fields.get(firstChangedIndex); + repairedFields.add(HoodieAvroUtils.createNewSchemaField( + firstChangedField.name(), + firstRepairedSchema, + firstChangedField.doc(), + firstChangedField.defaultVal() + )); + + // Process remaining fields + for (int i = firstChangedIndex + 1; i < fields.size(); i++) { + Schema.Field requestedField = fields.get(i); + Schema.Field tableField = tableSchema.getField(requestedField.name()); + Schema repairedSchema; + + if (tableField != null) { + repairedSchema = repairAvroSchema(requestedField.schema(), tableField.schema()); + } else { + repairedSchema = requestedField.schema(); + } + + // Must create new Field since they cannot be reused + 
repairedFields.add(HoodieAvroUtils.createNewSchemaField( + requestedField.name(), + repairedSchema, + requestedField.doc(), + requestedField.defaultVal() + )); + } + + return Schema.createRecord( + fileSchema.getName(), + fileSchema.getDoc(), + fileSchema.getNamespace(), + fileSchema.isError(), + repairedFields + ); + } + + public static boolean hasTimestampMillisField(Schema tableSchema) { + switch (tableSchema.getType()) { + case RECORD: + for (Schema.Field field : tableSchema.getFields()) { + if (hasTimestampMillisField(field.schema())) { + return true; + } + } + return false; + + case ARRAY: + return hasTimestampMillisField(tableSchema.getElementType()); + + case MAP: + return hasTimestampMillisField(tableSchema.getValueType()); + + case UNION: + return hasTimestampMillisField(AvroSchemaUtils.getNonNullTypeFromUnion(tableSchema)); + + default: + return tableSchema.getType() == Schema.Type.LONG + && (tableSchema.getLogicalType() instanceof LogicalTypes.TimestampMillis || tableSchema.getLogicalType() instanceof LogicalTypes.LocalTimestampMillis); + } + } + + /** + * Check if LogicalTypes.LocalTimestampMillis is supported in the current Avro version + * + * @return true if LocalTimestampMillis is available, false otherwise + */ + public static boolean isLocalTimestampMillisSupported() { + try { + return Arrays.stream(LogicalTypes.class.getDeclaredClasses()) + .anyMatch(c -> c.getSimpleName().equals("LocalTimestampMillis")); + } catch (Exception e) { + return false; + } + } +} Review Comment: It seems like these APIs are not used. Should we remove these? 
########## hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java: ########## @@ -156,7 +157,7 @@ private static class RecordIterator implements ClosableIterator<IndexedRecord> { private int totalRecords = 0; private int readRecords = 0; - private RecordIterator(Schema readerSchema, Schema writerSchema, byte[] content) throws IOException { + private RecordIterator(Schema readerSchema, Schema writerSchema, byte[] content, boolean enableLogicalTimestampFieldRepair) throws IOException { Review Comment: This parameter is always true, so it can be omitted. ########## hudi-common/src/main/java/org/apache/hudi/avro/ConvertingGenericData.java: ########## @@ -57,12 +57,20 @@ private ConvertingGenericData() { addLogicalTypeConversion(DATE_CONVERSION); addLogicalTypeConversion(TIME_MICROS_CONVERSION); addLogicalTypeConversion(TIMESTAMP_MICROS_CONVERSION); - // NOTE: Those are not supported in Avro 1.8.2 - // TODO re-enable upon upgrading to 1.10 - // addLogicalTypeConversion(TIME_MILLIS_CONVERSION); - // addLogicalTypeConversion(TIMESTAMP_MILLIS_CONVERSION); - // addLogicalTypeConversion(LOCAL_TIMESTAMP_MILLIS_CONVERSION); - // addLogicalTypeConversion(LOCAL_TIMESTAMP_MICROS_CONVERSION); + // NOTE: Those are not supported in Avro 1.8.2 (used by Spark 2) + // Only add conversions if they're available Review Comment: Should we validate the fix and the added tests with Spark 2? I am not sure whether CI covers it by default.
########## hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java: ########## @@ -199,6 +203,25 @@ private static boolean isProjectionOfInternal(Schema sourceSchema, return atomicTypeEqualityPredicate.apply(sourceSchema, targetSchema); } + public static Option<Schema> findNestedFieldSchema(Schema schema, String fieldName) { + if (StringUtils.isNullOrEmpty(fieldName)) { + return Option.empty(); + } + String[] parts = fieldName.split("\\."); + for (String part : parts) { + Schema.Field foundField = getNonNullTypeFromUnion(schema).getField(part); + if (foundField == null) { + throw new HoodieAvroSchemaException(fieldName + " not a field in " + schema); + } + schema = foundField.schema(); + } + return Option.of(getNonNullTypeFromUnion(schema)); + } + + public static Option<Schema.Type> findNestedFieldType(Schema schema, String fieldName) { + return findNestedFieldSchema(schema, fieldName).map(Schema::getType); + } + Review Comment: These APIs are not used anywhere. ########## hudi-common/src/main/java/org/apache/hudi/common/util/DateTimeUtils.java: ########## @@ -52,6 +52,12 @@ public static Instant microsToInstant(long microsFromEpoch) { return Instant.ofEpochSecond(epochSeconds, nanoAdjustment); } + public static Instant nanosToInstant(long nanosFromEpoch) { Review Comment: These are unused ########## pom.xml: ########## @@ -2396,6 +2398,7 @@ <slf4j.version>2.0.7</slf4j.version> <skip.hudi-spark2.unit.tests>true</skip.hudi-spark2.unit.tests> <skipITs>true</skipITs> + <spark32orEarlier>false</spark32orEarlier> Review Comment: Is the schema repair only done for Spark 3.4+ versions? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
