This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch release-1.1.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit d13d8ae9c82e7a7d8a78fdf08ae25f55713954c2 Author: Jon Vexler <[email protected]> AuthorDate: Wed Oct 22 11:40:40 2025 -0400 fix(core): add table level validation for decimal evolution (#14089) Co-authored-by: Jonathan Vexler <=> --- .../apache/hudi/avro/AvroSchemaCompatibility.java | 40 +++++++++++-- .../org/apache/hudi/avro/TestAvroSchemaUtils.java | 70 ++++++++++++++++++++++ 2 files changed, 106 insertions(+), 4 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaCompatibility.java b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaCompatibility.java index 27e326fd2c5f..a1ad79236e2b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaCompatibility.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaCompatibility.java @@ -22,6 +22,7 @@ import org.apache.hudi.common.util.Either; import org.apache.hudi.common.util.Option; import org.apache.avro.AvroRuntimeException; +import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; import org.apache.avro.Schema.Field; import org.apache.avro.Schema.Type; @@ -522,15 +523,46 @@ public class AvroSchemaCompatibility { private SchemaCompatibilityResult checkFixedSize(final Schema reader, final Schema writer, final Deque<LocationInfo> locations) { - SchemaCompatibilityResult result = SchemaCompatibilityResult.compatible(); int actual = reader.getFixedSize(); int expected = writer.getFixedSize(); if (actual != expected) { String message = String.format("Fixed size field '%s' expected: %d, found: %d", getLocationName(locations, reader.getType()), expected, actual); - result = SchemaCompatibilityResult.incompatible(SchemaIncompatibilityType.FIXED_SIZE_MISMATCH, reader, writer, + return SchemaCompatibilityResult.incompatible(SchemaIncompatibilityType.FIXED_SIZE_MISMATCH, reader, writer, message, asList(locations)); } - return result; + return checkDecimalWidening(reader, writer, locations); + } + + private SchemaCompatibilityResult checkDecimalWidening(final Schema reader, final Schema writer, + final Deque<LocationInfo> locations) { + boolean isReaderDecimal = reader.getLogicalType() instanceof LogicalTypes.Decimal; + boolean isWriterDecimal = writer.getLogicalType() instanceof LogicalTypes.Decimal; + if (!isReaderDecimal && !isWriterDecimal) { + return SchemaCompatibilityResult.compatible(); + } + + if (!isReaderDecimal || !isWriterDecimal) { + String message = String.format("Decimal field '%s' expected: %s, found: %s", getLocationName(locations, reader.getType()), writer, reader); + return SchemaCompatibilityResult.incompatible(SchemaIncompatibilityType.DECIMAL_MISMATCH, reader, writer, + message, asList(locations)); + } + + int readerScale = ((LogicalTypes.Decimal) reader.getLogicalType()).getScale(); + int writerScale = ((LogicalTypes.Decimal) writer.getLogicalType()).getScale(); + int readerPrecision = ((LogicalTypes.Decimal) reader.getLogicalType()).getPrecision(); + int writerPrecision = ((LogicalTypes.Decimal) writer.getLogicalType()).getPrecision(); + if (readerScale == writerScale && readerPrecision == writerPrecision) { + return SchemaCompatibilityResult.compatible(); + } + + if (((readerPrecision - readerScale) < (writerPrecision - writerScale)) || (readerScale < writerScale)) { + String message = String.format("Decimal field '%s' evolution is lossy. Existing precision: %d, scale: %d, Incoming precision: %d, scale: %d", + getLocationName(locations, reader.getType()), writerPrecision, writerScale, readerPrecision, readerScale); + return SchemaCompatibilityResult.incompatible(SchemaIncompatibilityType.DECIMAL_MISMATCH, reader, writer, + message, asList(locations)); + } + + return SchemaCompatibilityResult.compatible(); } private SchemaCompatibilityResult checkSchemaNames(final Schema reader, final Schema writer, @@ -593,7 +625,7 @@ public class AvroSchemaCompatibility { public enum SchemaIncompatibilityType { NAME_MISMATCH, FIXED_SIZE_MISMATCH, MISSING_ENUM_SYMBOLS, READER_FIELD_MISSING_DEFAULT_VALUE, TYPE_MISMATCH, - MISSING_UNION_BRANCH + DECIMAL_MISMATCH, MISSING_UNION_BRANCH } /** diff --git a/hudi-common/src/test/java/org/apache/hudi/avro/TestAvroSchemaUtils.java b/hudi-common/src/test/java/org/apache/hudi/avro/TestAvroSchemaUtils.java index 41b2d1cf1be8..ccfdded4b592 100644 --- a/hudi-common/src/test/java/org/apache/hudi/avro/TestAvroSchemaUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/avro/TestAvroSchemaUtils.java @@ -27,8 +27,11 @@ import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; import org.apache.parquet.avro.AvroSchemaConverter; import org.apache.parquet.schema.MessageType; +import org.apache.avro.SchemaBuilder; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; import java.util.Arrays; @@ -36,7 +39,9 @@ import java.util.Collections; import java.util.HashSet; import java.util.Set; import java.util.stream.Collectors; +import java.util.stream.Stream; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNull; @@ -336,6 +341,71 @@ public class TestAvroSchemaUtils { AvroSchemaUtils.checkSchemaCompatible(FULL_SCHEMA, SHORT_SCHEMA, shouldValidate, true, Collections.emptySet()); } + @ParameterizedTest(name = "[{index}] oldSize={0}, oldPrecision={1}, oldScale={2} -> newSize={3}, newPrecision={4}, newScale={5}") + @MethodSource("provideCompatibleDecimalSchemas") + void testCompatibleDecimalSchemas(int oldSize, int oldPrecision, int oldScale, + int newSize, int newPrecision, int newScale) { + Schema oldSchema = createFixedDecimalSchema(oldSize, oldPrecision, oldScale); + Schema newSchema = createFixedDecimalSchema(newSize, newPrecision, newScale); + + assertDoesNotThrow(() -> + AvroSchemaUtils.checkSchemaCompatible(oldSchema, newSchema, true, false, Collections.emptySet()), + "Schemas should be compatible" + ); + } + + @ParameterizedTest(name = "[{index}] oldSize={0}, oldPrecision={1}, oldScale={2} -> newSize={3}, newPrecision={4}, newScale={5}") + @MethodSource("provideIncompatibleDecimalSchemas") + void testIncompatibleDecimalSchemas(int oldSize, int oldPrecision, int oldScale, + int newSize, int newPrecision, int newScale) { + Schema oldSchema = createFixedDecimalSchema(oldSize, oldPrecision, oldScale); + Schema newSchema = createFixedDecimalSchema(newSize, newPrecision, newScale); + + assertThrows(Exception.class, () -> + AvroSchemaUtils.checkSchemaCompatible(oldSchema, newSchema, true, false, Collections.emptySet()), + "Schemas should be incompatible" + ); + } + + private static Stream<Arguments> provideCompatibleDecimalSchemas() { + return Stream.of( + // Same size, same precision and scale + Arguments.of(8, 10, 2, 8, 10, 2), + + // Same size, increased precision, same scale + Arguments.of(8, 10, 2, 8, 15, 2), + + // Same size, increased precision and increased scale + Arguments.of(16, 20, 5, 16, 25, 10) + ); + } + + private static Stream<Arguments> provideIncompatibleDecimalSchemas() { + return Stream.of( + // Same size, decreased precision + Arguments.of(8, 15, 2, 8, 10, 2), + + // Same size, same precision, increased scale + Arguments.of(8, 10, 2, 8, 10, 5), + + // Same size, decreased precision, same scale + Arguments.of(16, 25, 3, 16, 20, 3), + + // Same size, both decreased precision and increased scale + Arguments.of(8, 18, 4, 8, 15, 6) + ); + } + + private Schema createFixedDecimalSchema(int size, int precision, int scale) { + Schema fixedSchema = SchemaBuilder.fixed("FixedDecimal").size(size); + Schema decimalSchema = LogicalTypes.decimal(precision, scale).addToSchema(fixedSchema); + + return SchemaBuilder.record("FixedDecimalSchema") + .fields() + .name("decimalField").type(decimalSchema).noDefault() + .endRecord(); + } + @ParameterizedTest @ValueSource(booleans = {false, true}) public void testIsCompatiblePartitionDropCols(boolean shouldValidate) {
