This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 16012afb3f Flink: Backport: DynamicSink: Convert existing required fields to optional when missing in the data schema (#13660)
16012afb3f is described below

commit 16012afb3f6fe822318315fe1475fc1acf6d4770
Author: Maximilian Michels <m...@apache.org>
AuthorDate: Thu Jul 24 17:11:15 2025 +0200

    Flink: Backport: DynamicSink: Convert existing required fields to optional when missing in the data schema (#13660)
    
    Backport of #13659 to Flink 1.19 and 1.20.
---
 .../flink/sink/dynamic/CompareSchemasVisitor.java  |  9 +++++
 .../sink/dynamic/TestCompareSchemasVisitor.java    | 22 +++++++++++-
 .../flink/sink/dynamic/TestDynamicIcebergSink.java | 41 +++++++++++++++-------
 .../flink/sink/dynamic/CompareSchemasVisitor.java  |  9 +++++
 .../sink/dynamic/TestCompareSchemasVisitor.java    | 22 +++++++++++-
 .../flink/sink/dynamic/TestDynamicIcebergSink.java | 41 +++++++++++++++-------
 6 files changed, 118 insertions(+), 26 deletions(-)

diff --git 
a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java
 
b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java
index bb0d32f8f6..41ffa60954 100644
--- 
a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java
+++ 
b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java
@@ -87,6 +87,15 @@ public class CompareSchemasVisitor
       return Result.SCHEMA_UPDATE_NEEDED;
     }
 
+    for (Types.NestedField tableField : 
tableSchemaType.asStructType().fields()) {
+      if (tableField.isRequired() && struct.field(tableField.name()) == null) {
+        // If a field from the table schema does not exist in the input 
schema, then we won't visit
+        // it and check for required/optional compatibility. The only choice 
is to make the table
+        // field optional.
+        return Result.SCHEMA_UPDATE_NEEDED;
+      }
+    }
+
     if (struct.fields().size() != 
tableSchemaType.asStructType().fields().size()) {
       return Result.DATA_CONVERSION_NEEDED;
     }
diff --git 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestCompareSchemasVisitor.java
 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestCompareSchemasVisitor.java
index 487b0ee7d9..385a354889 100644
--- 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestCompareSchemasVisitor.java
+++ 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestCompareSchemasVisitor.java
@@ -112,7 +112,7 @@ class TestCompareSchemasVisitor {
   }
 
   @Test
-  void testWithRequiredChange() {
+  void testRequiredChangeForMatchingField() {
     Schema dataSchema =
         new Schema(optional(1, "id", IntegerType.get()), optional(2, "extra", 
StringType.get()));
     Schema tableSchema =
@@ -123,6 +123,26 @@ class TestCompareSchemasVisitor {
         .isEqualTo(CompareSchemasVisitor.Result.SAME);
   }
 
+  @Test
+  void testRequiredChangeForNonMatchingField() {
+    Schema dataSchema = new Schema(optional(1, "id", IntegerType.get()));
+    Schema tableSchema =
+        new Schema(optional(1, "id", IntegerType.get()), required(2, "extra", 
StringType.get()));
+    assertThat(CompareSchemasVisitor.visit(dataSchema, tableSchema))
+        .isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED);
+    assertThat(CompareSchemasVisitor.visit(tableSchema, dataSchema))
+        .isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED);
+  }
+
+  @Test
+  void testNoRequiredChangeForNonMatchingField() {
+    Schema dataSchema = new Schema(required(1, "id", IntegerType.get()));
+    Schema tableSchema =
+        new Schema(required(1, "id", IntegerType.get()), optional(2, "extra", 
StringType.get()));
+    assertThat(CompareSchemasVisitor.visit(dataSchema, tableSchema))
+        .isEqualTo(CompareSchemasVisitor.Result.DATA_CONVERSION_NEEDED);
+  }
+
   @Test
   void testStructDifferentId() {
     assertThat(
diff --git 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
index 9765f2f057..b61e297cc1 100644
--- 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
+++ 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
@@ -354,19 +354,40 @@ class TestDynamicIcebergSink extends 
TestFlinkIcebergSinkBase {
   }
 
   @Test
-  void testSchemaEvolutionNonBackwardsCompatible() throws Exception {
-    Schema backwardsIncompatibleSchema =
+  void testRowEvolutionMakeMissingRequiredFieldOptional() throws Exception {
+    Schema existingSchemaWithRequiredField =
         new Schema(
-            Types.NestedField.required(1, "id", Types.IntegerType.get()),
+            Types.NestedField.optional(1, "id", Types.IntegerType.get()),
             Types.NestedField.required(2, "data", Types.StringType.get()));
-    // Required column is missing in this schema
-    Schema erroringSchema =
-        new Schema(Types.NestedField.required(1, "id", 
Types.IntegerType.get()));
+
+    CATALOG_EXTENSION
+        .catalog()
+        .createTable(TableIdentifier.of(DATABASE, "t1"), 
existingSchemaWithRequiredField);
+
+    Schema writeSchemaWithoutRequiredField =
+        new Schema(Types.NestedField.optional(1, "id", 
Types.IntegerType.get()));
 
     List<DynamicIcebergDataImpl> rows =
         Lists.newArrayList(
             new DynamicIcebergDataImpl(
-                backwardsIncompatibleSchema, "t1", "main", 
PartitionSpec.unpartitioned()),
+                writeSchemaWithoutRequiredField,
+                existingSchemaWithRequiredField,
+                "t1",
+                "main",
+                PartitionSpec.unpartitioned()));
+
+    runTest(rows, this.env, 1);
+  }
+
+  @Test
+  void testSchemaEvolutionNonBackwardsCompatible() throws Exception {
+    Schema initialSchema = new Schema(Types.NestedField.required(1, "id", 
Types.IntegerType.get()));
+    // Type change is not allowed
+    Schema erroringSchema = new Schema(Types.NestedField.required(1, "id", 
Types.StringType.get()));
+
+    List<DynamicIcebergDataImpl> rows =
+        Lists.newArrayList(
+            new DynamicIcebergDataImpl(initialSchema, "t1", "main", 
PartitionSpec.unpartitioned()),
             new DynamicIcebergDataImpl(
                 erroringSchema, "t1", "main", PartitionSpec.unpartitioned()));
 
@@ -376,11 +397,7 @@ class TestDynamicIcebergSink extends 
TestFlinkIcebergSinkBase {
     } catch (JobExecutionException e) {
       assertThat(
               ExceptionUtils.findThrowable(
-                  e,
-                  t ->
-                      t.getMessage()
-                          .contains(
-                              "Field 2 in target schema ROW<`id` INT NOT NULL, 
`data` STRING NOT NULL> is non-nullable but does not exist in source schema.")))
+                  e, t -> t.getMessage().contains("Cannot change column type: 
id: int -> string")))
           .isNotEmpty();
     }
   }
diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java
index bb0d32f8f6..41ffa60954 100644
--- 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java
@@ -87,6 +87,15 @@ public class CompareSchemasVisitor
       return Result.SCHEMA_UPDATE_NEEDED;
     }
 
+    for (Types.NestedField tableField : 
tableSchemaType.asStructType().fields()) {
+      if (tableField.isRequired() && struct.field(tableField.name()) == null) {
+        // If a field from the table schema does not exist in the input 
schema, then we won't visit
+        // it and check for required/optional compatibility. The only choice 
is to make the table
+        // field optional.
+        return Result.SCHEMA_UPDATE_NEEDED;
+      }
+    }
+
     if (struct.fields().size() != 
tableSchemaType.asStructType().fields().size()) {
       return Result.DATA_CONVERSION_NEEDED;
     }
diff --git 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestCompareSchemasVisitor.java
 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestCompareSchemasVisitor.java
index 487b0ee7d9..385a354889 100644
--- 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestCompareSchemasVisitor.java
+++ 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestCompareSchemasVisitor.java
@@ -112,7 +112,7 @@ class TestCompareSchemasVisitor {
   }
 
   @Test
-  void testWithRequiredChange() {
+  void testRequiredChangeForMatchingField() {
     Schema dataSchema =
         new Schema(optional(1, "id", IntegerType.get()), optional(2, "extra", 
StringType.get()));
     Schema tableSchema =
@@ -123,6 +123,26 @@ class TestCompareSchemasVisitor {
         .isEqualTo(CompareSchemasVisitor.Result.SAME);
   }
 
+  @Test
+  void testRequiredChangeForNonMatchingField() {
+    Schema dataSchema = new Schema(optional(1, "id", IntegerType.get()));
+    Schema tableSchema =
+        new Schema(optional(1, "id", IntegerType.get()), required(2, "extra", 
StringType.get()));
+    assertThat(CompareSchemasVisitor.visit(dataSchema, tableSchema))
+        .isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED);
+    assertThat(CompareSchemasVisitor.visit(tableSchema, dataSchema))
+        .isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED);
+  }
+
+  @Test
+  void testNoRequiredChangeForNonMatchingField() {
+    Schema dataSchema = new Schema(required(1, "id", IntegerType.get()));
+    Schema tableSchema =
+        new Schema(required(1, "id", IntegerType.get()), optional(2, "extra", 
StringType.get()));
+    assertThat(CompareSchemasVisitor.visit(dataSchema, tableSchema))
+        .isEqualTo(CompareSchemasVisitor.Result.DATA_CONVERSION_NEEDED);
+  }
+
   @Test
   void testStructDifferentId() {
     assertThat(
diff --git 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
index 9765f2f057..b61e297cc1 100644
--- 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
+++ 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
@@ -354,19 +354,40 @@ class TestDynamicIcebergSink extends 
TestFlinkIcebergSinkBase {
   }
 
   @Test
-  void testSchemaEvolutionNonBackwardsCompatible() throws Exception {
-    Schema backwardsIncompatibleSchema =
+  void testRowEvolutionMakeMissingRequiredFieldOptional() throws Exception {
+    Schema existingSchemaWithRequiredField =
         new Schema(
-            Types.NestedField.required(1, "id", Types.IntegerType.get()),
+            Types.NestedField.optional(1, "id", Types.IntegerType.get()),
             Types.NestedField.required(2, "data", Types.StringType.get()));
-    // Required column is missing in this schema
-    Schema erroringSchema =
-        new Schema(Types.NestedField.required(1, "id", 
Types.IntegerType.get()));
+
+    CATALOG_EXTENSION
+        .catalog()
+        .createTable(TableIdentifier.of(DATABASE, "t1"), 
existingSchemaWithRequiredField);
+
+    Schema writeSchemaWithoutRequiredField =
+        new Schema(Types.NestedField.optional(1, "id", 
Types.IntegerType.get()));
 
     List<DynamicIcebergDataImpl> rows =
         Lists.newArrayList(
             new DynamicIcebergDataImpl(
-                backwardsIncompatibleSchema, "t1", "main", 
PartitionSpec.unpartitioned()),
+                writeSchemaWithoutRequiredField,
+                existingSchemaWithRequiredField,
+                "t1",
+                "main",
+                PartitionSpec.unpartitioned()));
+
+    runTest(rows, this.env, 1);
+  }
+
+  @Test
+  void testSchemaEvolutionNonBackwardsCompatible() throws Exception {
+    Schema initialSchema = new Schema(Types.NestedField.required(1, "id", 
Types.IntegerType.get()));
+    // Type change is not allowed
+    Schema erroringSchema = new Schema(Types.NestedField.required(1, "id", 
Types.StringType.get()));
+
+    List<DynamicIcebergDataImpl> rows =
+        Lists.newArrayList(
+            new DynamicIcebergDataImpl(initialSchema, "t1", "main", 
PartitionSpec.unpartitioned()),
             new DynamicIcebergDataImpl(
                 erroringSchema, "t1", "main", PartitionSpec.unpartitioned()));
 
@@ -376,11 +397,7 @@ class TestDynamicIcebergSink extends 
TestFlinkIcebergSinkBase {
     } catch (JobExecutionException e) {
       assertThat(
               ExceptionUtils.findThrowable(
-                  e,
-                  t ->
-                      t.getMessage()
-                          .contains(
-                              "Field 2 in target schema ROW<`id` INT NOT NULL, 
`data` STRING NOT NULL> is non-nullable but does not exist in source schema.")))
+                  e, t -> t.getMessage().contains("Cannot change column type: 
id: int -> string")))
           .isNotEmpty();
     }
   }

Reply via email to