This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-1.1.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit d13d8ae9c82e7a7d8a78fdf08ae25f55713954c2
Author: Jon Vexler <[email protected]>
AuthorDate: Wed Oct 22 11:40:40 2025 -0400

    fix(core): add table level validation for decimal evolution (#14089)
    
    Co-authored-by: Jonathan Vexler <=>
---
 .../apache/hudi/avro/AvroSchemaCompatibility.java  | 40 +++++++++++--
 .../org/apache/hudi/avro/TestAvroSchemaUtils.java  | 70 ++++++++++++++++++++++
 2 files changed, 106 insertions(+), 4 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaCompatibility.java b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaCompatibility.java
index 27e326fd2c5f..a1ad79236e2b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaCompatibility.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaCompatibility.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.util.Either;
 import org.apache.hudi.common.util.Option;
 
 import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.Schema.Type;
@@ -522,15 +523,46 @@ public class AvroSchemaCompatibility {

     private SchemaCompatibilityResult checkFixedSize(final Schema reader, final Schema writer,
                                                      final Deque<LocationInfo> locations) {
-      SchemaCompatibilityResult result = SchemaCompatibilityResult.compatible();
       int actual = reader.getFixedSize();
       int expected = writer.getFixedSize();
       if (actual != expected) {
         String message = String.format("Fixed size field '%s' expected: %d, found: %d", getLocationName(locations, reader.getType()), expected, actual);
-        result = SchemaCompatibilityResult.incompatible(SchemaIncompatibilityType.FIXED_SIZE_MISMATCH, reader, writer,
+        return SchemaCompatibilityResult.incompatible(SchemaIncompatibilityType.FIXED_SIZE_MISMATCH, reader, writer,
             message, asList(locations));
       }
-      return result;
+      return checkDecimalWidening(reader, writer, locations);
+    }
+
+    private SchemaCompatibilityResult checkDecimalWidening(final Schema reader, final Schema writer,
+                                                           final Deque<LocationInfo> locations) {
+      boolean isReaderDecimal = reader.getLogicalType() instanceof LogicalTypes.Decimal;
+      boolean isWriterDecimal = writer.getLogicalType() instanceof LogicalTypes.Decimal;
+      if (!isReaderDecimal && !isWriterDecimal) {
+        return SchemaCompatibilityResult.compatible();
+      }
+
+      if (!isReaderDecimal || !isWriterDecimal) {
+        String message = String.format("Decimal field '%s' expected: %s, found: %s", getLocationName(locations, reader.getType()), writer, reader);
+        return SchemaCompatibilityResult.incompatible(SchemaIncompatibilityType.DECIMAL_MISMATCH, reader, writer,
+            message, asList(locations));
+      }
+
+      int readerScale = ((LogicalTypes.Decimal) reader.getLogicalType()).getScale();
+      int writerScale = ((LogicalTypes.Decimal) writer.getLogicalType()).getScale();
+      int readerPrecision = ((LogicalTypes.Decimal) reader.getLogicalType()).getPrecision();
+      int writerPrecision = ((LogicalTypes.Decimal) writer.getLogicalType()).getPrecision();
+      if (readerScale == writerScale && readerPrecision == writerPrecision) {
+        return SchemaCompatibilityResult.compatible();
+      }
+
+      if (((readerPrecision - readerScale) < (writerPrecision - writerScale)) || (readerScale < writerScale)) {
+        String message = String.format("Decimal field '%s' evolution is lossy. Existing precision: %d, scale: %d, Incoming precision: %d, scale: %d",
+            getLocationName(locations, reader.getType()), writerPrecision, writerScale, readerPrecision, readerScale);
+        return SchemaCompatibilityResult.incompatible(SchemaIncompatibilityType.DECIMAL_MISMATCH, reader, writer,
+            message, asList(locations));
+      }
+
+      return SchemaCompatibilityResult.compatible();
     }

     private SchemaCompatibilityResult checkSchemaNames(final Schema reader, final Schema writer,
@@ -593,7 +625,7 @@ public class AvroSchemaCompatibility {

   public enum SchemaIncompatibilityType {
     NAME_MISMATCH, FIXED_SIZE_MISMATCH, MISSING_ENUM_SYMBOLS, READER_FIELD_MISSING_DEFAULT_VALUE, TYPE_MISMATCH,
-    MISSING_UNION_BRANCH
+    DECIMAL_MISMATCH, MISSING_UNION_BRANCH
   }

   /**
diff --git a/hudi-common/src/test/java/org/apache/hudi/avro/TestAvroSchemaUtils.java b/hudi-common/src/test/java/org/apache/hudi/avro/TestAvroSchemaUtils.java
index 41b2d1cf1be8..ccfdded4b592 100644
--- a/hudi-common/src/test/java/org/apache/hudi/avro/TestAvroSchemaUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/avro/TestAvroSchemaUtils.java
@@ -27,8 +27,11 @@ import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 import org.apache.parquet.avro.AvroSchemaConverter;
 import org.apache.parquet.schema.MessageType;
+import org.apache.avro.SchemaBuilder;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.util.Arrays;
@@ -36,7 +39,9 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
@@ -336,6 +341,71 @@ public class TestAvroSchemaUtils {
    AvroSchemaUtils.checkSchemaCompatible(FULL_SCHEMA, SHORT_SCHEMA, shouldValidate, true, Collections.emptySet());
   }

+  @ParameterizedTest(name = "[{index}] oldSize={0}, oldPrecision={1}, oldScale={2} -> newSize={3}, newPrecision={4}, newScale={5}")
+  @MethodSource("provideCompatibleDecimalSchemas")
+  void testCompatibleDecimalSchemas(int oldSize, int oldPrecision, int oldScale,
+                                    int newSize, int newPrecision, int newScale) {
+    Schema oldSchema = createFixedDecimalSchema(oldSize, oldPrecision, oldScale);
+    Schema newSchema = createFixedDecimalSchema(newSize, newPrecision, newScale);
+
+    assertDoesNotThrow(() ->
+            AvroSchemaUtils.checkSchemaCompatible(oldSchema, newSchema, true, false, Collections.emptySet()),
+        "Schemas should be compatible"
+    );
+  }
+
+  @ParameterizedTest(name = "[{index}] oldSize={0}, oldPrecision={1}, oldScale={2} -> newSize={3}, newPrecision={4}, newScale={5}")
+  @MethodSource("provideIncompatibleDecimalSchemas")
+  void testIncompatibleDecimalSchemas(int oldSize, int oldPrecision, int oldScale,
+                                      int newSize, int newPrecision, int newScale) {
+    Schema oldSchema = createFixedDecimalSchema(oldSize, oldPrecision, oldScale);
+    Schema newSchema = createFixedDecimalSchema(newSize, newPrecision, newScale);
+
+    assertThrows(Exception.class, () ->
+            AvroSchemaUtils.checkSchemaCompatible(oldSchema, newSchema, true, false, Collections.emptySet()),
+        "Schemas should be incompatible"
+    );
+  }
+
+  private static Stream<Arguments> provideCompatibleDecimalSchemas() {
+    return Stream.of(
+        // Same size, same precision and scale
+        Arguments.of(8, 10, 2, 8, 10, 2),
+
+        // Same size, increased precision, same scale
+        Arguments.of(8, 10, 2, 8, 15, 2),
+
+        // Same size, increased precision and increased scale
+        Arguments.of(16, 20, 5, 16, 25, 10)
+    );
+  }
+
+  private static Stream<Arguments> provideIncompatibleDecimalSchemas() {
+    return Stream.of(
+        // Same size, decreased precision
+        Arguments.of(8, 15, 2, 8, 10, 2),
+
+        // Same size, same precision, increased scale
+        Arguments.of(8, 10, 2, 8, 10, 5),
+
+        // Same size, decreased precision, same scale
+        Arguments.of(16, 25, 3, 16, 20, 3),
+
+        // Same size, both decreased precision and increased scale
+        Arguments.of(8, 18, 4, 8, 15, 6)
+    );
+  }
+
+  private Schema createFixedDecimalSchema(int size, int precision, int scale) {
+    Schema fixedSchema = SchemaBuilder.fixed("FixedDecimal").size(size);
+    Schema decimalSchema = LogicalTypes.decimal(precision, scale).addToSchema(fixedSchema);
+
+    return SchemaBuilder.record("FixedDecimalSchema")
+        .fields()
+        .name("decimalField").type(decimalSchema).noDefault()
+        .endRecord();
+  }
+
   @ParameterizedTest
   @ValueSource(booleans = {false, true})
   public void testIsCompatiblePartitionDropCols(boolean shouldValidate) {

Reply via email to