This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git

The following commit(s) were added to refs/heads/main by this push:
     new 16012afb3f Flink: Backport: DynamicSink: Convert existing required fields to optional when missing in the data schema (#13660)

16012afb3f is described below

commit 16012afb3f6fe822318315fe1475fc1acf6d4770
Author: Maximilian Michels <m...@apache.org>
AuthorDate: Thu Jul 24 17:11:15 2025 +0200

    Flink: Backport: DynamicSink: Convert existing required fields to optional when missing in the data schema (#13660)

    backport of #13659.
---
 .../flink/sink/dynamic/CompareSchemasVisitor.java | 9 +++++
 .../sink/dynamic/TestCompareSchemasVisitor.java | 22 +++++++++++-
 .../flink/sink/dynamic/TestDynamicIcebergSink.java | 41 +++++++++++++++-------
 .../flink/sink/dynamic/CompareSchemasVisitor.java | 9 +++++
 .../sink/dynamic/TestCompareSchemasVisitor.java | 22 +++++++++++-
 .../flink/sink/dynamic/TestDynamicIcebergSink.java | 41 +++++++++++++++-------
 6 files changed, 118 insertions(+), 26 deletions(-)

diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java
index bb0d32f8f6..41ffa60954 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java
@@ -87,6 +87,15 @@ public class CompareSchemasVisitor
 return Result.SCHEMA_UPDATE_NEEDED; } + for (Types.NestedField tableField : tableSchemaType.asStructType().fields()) { + if (tableField.isRequired() && struct.field(tableField.name()) == null) { + // If a field from the table schema does not exist in the input schema, then we won't visit + // it and check for required/optional compatibility. The only choice is to make the table + // field optional. 
+ return Result.SCHEMA_UPDATE_NEEDED; + } + } + if (struct.fields().size() != tableSchemaType.asStructType().fields().size()) { return Result.DATA_CONVERSION_NEEDED; } diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestCompareSchemasVisitor.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestCompareSchemasVisitor.java index 487b0ee7d9..385a354889 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestCompareSchemasVisitor.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestCompareSchemasVisitor.java @@ -112,7 +112,7 @@ class TestCompareSchemasVisitor { } @Test - void testWithRequiredChange() { + void testRequiredChangeForMatchingField() { Schema dataSchema = new Schema(optional(1, "id", IntegerType.get()), optional(2, "extra", StringType.get())); Schema tableSchema = @@ -123,6 +123,26 @@ class TestCompareSchemasVisitor { .isEqualTo(CompareSchemasVisitor.Result.SAME); } + @Test + void testRequiredChangeForNonMatchingField() { + Schema dataSchema = new Schema(optional(1, "id", IntegerType.get())); + Schema tableSchema = + new Schema(optional(1, "id", IntegerType.get()), required(2, "extra", StringType.get())); + assertThat(CompareSchemasVisitor.visit(dataSchema, tableSchema)) + .isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED); + assertThat(CompareSchemasVisitor.visit(tableSchema, dataSchema)) + .isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED); + } + + @Test + void testNoRequiredChangeForNonMatchingField() { + Schema dataSchema = new Schema(required(1, "id", IntegerType.get())); + Schema tableSchema = + new Schema(required(1, "id", IntegerType.get()), optional(2, "extra", StringType.get())); + assertThat(CompareSchemasVisitor.visit(dataSchema, tableSchema)) + .isEqualTo(CompareSchemasVisitor.Result.DATA_CONVERSION_NEEDED); + } + @Test void testStructDifferentId() { assertThat( diff --git 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java index 9765f2f057..b61e297cc1 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java @@ -354,19 +354,40 @@ class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase { } @Test - void testSchemaEvolutionNonBackwardsCompatible() throws Exception { - Schema backwardsIncompatibleSchema = + void testRowEvolutionMakeMissingRequiredFieldOptional() throws Exception { + Schema existingSchemaWithRequiredField = new Schema( - Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(1, "id", Types.IntegerType.get()), Types.NestedField.required(2, "data", Types.StringType.get())); - // Required column is missing in this schema - Schema erroringSchema = - new Schema(Types.NestedField.required(1, "id", Types.IntegerType.get())); + + CATALOG_EXTENSION + .catalog() + .createTable(TableIdentifier.of(DATABASE, "t1"), existingSchemaWithRequiredField); + + Schema writeSchemaWithoutRequiredField = + new Schema(Types.NestedField.optional(1, "id", Types.IntegerType.get())); List<DynamicIcebergDataImpl> rows = Lists.newArrayList( new DynamicIcebergDataImpl( - backwardsIncompatibleSchema, "t1", "main", PartitionSpec.unpartitioned()), + writeSchemaWithoutRequiredField, + existingSchemaWithRequiredField, + "t1", + "main", + PartitionSpec.unpartitioned())); + + runTest(rows, this.env, 1); + } + + @Test + void testSchemaEvolutionNonBackwardsCompatible() throws Exception { + Schema initialSchema = new Schema(Types.NestedField.required(1, "id", Types.IntegerType.get())); + // Type change is not allowed + Schema erroringSchema = new Schema(Types.NestedField.required(1, "id", Types.StringType.get())); + + 
List<DynamicIcebergDataImpl> rows = + Lists.newArrayList( + new DynamicIcebergDataImpl(initialSchema, "t1", "main", PartitionSpec.unpartitioned()), new DynamicIcebergDataImpl( erroringSchema, "t1", "main", PartitionSpec.unpartitioned())); @@ -376,11 +397,7 @@ class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase { } catch (JobExecutionException e) { assertThat( ExceptionUtils.findThrowable( - e, - t -> - t.getMessage() - .contains( - "Field 2 in target schema ROW<`id` INT NOT NULL, `data` STRING NOT NULL> is non-nullable but does not exist in source schema."))) + e, t -> t.getMessage().contains("Cannot change column type: id: int -> string"))) .isNotEmpty(); } } diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java index bb0d32f8f6..41ffa60954 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java @@ -87,6 +87,15 @@ public class CompareSchemasVisitor return Result.SCHEMA_UPDATE_NEEDED; } + for (Types.NestedField tableField : tableSchemaType.asStructType().fields()) { + if (tableField.isRequired() && struct.field(tableField.name()) == null) { + // If a field from the table schema does not exist in the input schema, then we won't visit + // it and check for required/optional compatibility. The only choice is to make the table + // field optional. 
+ return Result.SCHEMA_UPDATE_NEEDED; + } + } + if (struct.fields().size() != tableSchemaType.asStructType().fields().size()) { return Result.DATA_CONVERSION_NEEDED; } diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestCompareSchemasVisitor.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestCompareSchemasVisitor.java index 487b0ee7d9..385a354889 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestCompareSchemasVisitor.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestCompareSchemasVisitor.java @@ -112,7 +112,7 @@ class TestCompareSchemasVisitor { } @Test - void testWithRequiredChange() { + void testRequiredChangeForMatchingField() { Schema dataSchema = new Schema(optional(1, "id", IntegerType.get()), optional(2, "extra", StringType.get())); Schema tableSchema = @@ -123,6 +123,26 @@ class TestCompareSchemasVisitor { .isEqualTo(CompareSchemasVisitor.Result.SAME); } + @Test + void testRequiredChangeForNonMatchingField() { + Schema dataSchema = new Schema(optional(1, "id", IntegerType.get())); + Schema tableSchema = + new Schema(optional(1, "id", IntegerType.get()), required(2, "extra", StringType.get())); + assertThat(CompareSchemasVisitor.visit(dataSchema, tableSchema)) + .isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED); + assertThat(CompareSchemasVisitor.visit(tableSchema, dataSchema)) + .isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED); + } + + @Test + void testNoRequiredChangeForNonMatchingField() { + Schema dataSchema = new Schema(required(1, "id", IntegerType.get())); + Schema tableSchema = + new Schema(required(1, "id", IntegerType.get()), optional(2, "extra", StringType.get())); + assertThat(CompareSchemasVisitor.visit(dataSchema, tableSchema)) + .isEqualTo(CompareSchemasVisitor.Result.DATA_CONVERSION_NEEDED); + } + @Test void testStructDifferentId() { assertThat( diff --git 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java index 9765f2f057..b61e297cc1 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java @@ -354,19 +354,40 @@ class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase { } @Test - void testSchemaEvolutionNonBackwardsCompatible() throws Exception { - Schema backwardsIncompatibleSchema = + void testRowEvolutionMakeMissingRequiredFieldOptional() throws Exception { + Schema existingSchemaWithRequiredField = new Schema( - Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(1, "id", Types.IntegerType.get()), Types.NestedField.required(2, "data", Types.StringType.get())); - // Required column is missing in this schema - Schema erroringSchema = - new Schema(Types.NestedField.required(1, "id", Types.IntegerType.get())); + + CATALOG_EXTENSION + .catalog() + .createTable(TableIdentifier.of(DATABASE, "t1"), existingSchemaWithRequiredField); + + Schema writeSchemaWithoutRequiredField = + new Schema(Types.NestedField.optional(1, "id", Types.IntegerType.get())); List<DynamicIcebergDataImpl> rows = Lists.newArrayList( new DynamicIcebergDataImpl( - backwardsIncompatibleSchema, "t1", "main", PartitionSpec.unpartitioned()), + writeSchemaWithoutRequiredField, + existingSchemaWithRequiredField, + "t1", + "main", + PartitionSpec.unpartitioned())); + + runTest(rows, this.env, 1); + } + + @Test + void testSchemaEvolutionNonBackwardsCompatible() throws Exception { + Schema initialSchema = new Schema(Types.NestedField.required(1, "id", Types.IntegerType.get())); + // Type change is not allowed + Schema erroringSchema = new Schema(Types.NestedField.required(1, "id", Types.StringType.get())); + + 
List<DynamicIcebergDataImpl> rows = + Lists.newArrayList( + new DynamicIcebergDataImpl(initialSchema, "t1", "main", PartitionSpec.unpartitioned()), new DynamicIcebergDataImpl( erroringSchema, "t1", "main", PartitionSpec.unpartitioned())); @@ -376,11 +397,7 @@ class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase { } catch (JobExecutionException e) { assertThat( ExceptionUtils.findThrowable( - e, - t -> - t.getMessage() - .contains( - "Field 2 in target schema ROW<`id` INT NOT NULL, `data` STRING NOT NULL> is non-nullable but does not exist in source schema."))) + e, t -> t.getMessage().contains("Cannot change column type: id: int -> string"))) .isNotEmpty(); } }