mxm commented on code in PR #14729:
URL: https://github.com/apache/iceberg/pull/14729#discussion_r2704764134


##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java:
##########
@@ -43,23 +44,22 @@ public class CompareSchemasVisitor
     extends SchemaWithPartnerVisitor<Integer, CompareSchemasVisitor.Result> {
 
   private final Schema tableSchema;
+  private final boolean caseSensitive;
   private final boolean dropUnusedColumns;
 
-  private CompareSchemasVisitor(Schema tableSchema, boolean dropUnusedColumns) {
+  private CompareSchemasVisitor(
+      Schema tableSchema, boolean caseSensitive, boolean dropUnusedColumns) {
     this.tableSchema = tableSchema;
+    this.caseSensitive = caseSensitive;
     this.dropUnusedColumns = dropUnusedColumns;
   }
 
-  public static Result visit(Schema dataSchema, Schema tableSchema) {
-    return visit(dataSchema, tableSchema, true, false);
-  }

Review Comment:
   Ack, I re-added the two-argument `visit(Schema, Schema)` overload and marked it as deprecated.



##########
flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestEvolveSchemaVisitor.java:
##########
@@ -94,7 +94,7 @@ private static Types.NestedField[] primitiveFields(
   public void testAddTopLevelPrimitives() {
     Schema targetSchema = new Schema(primitiveFields(0, primitiveTypes()));
     UpdateSchema updateApi = loadUpdateApi(new Schema());
-    EvolveSchemaVisitor.visit(TABLE, updateApi, new Schema(), targetSchema, PRESERVE_COLUMNS);
+    EvolveSchemaVisitor.visit(TABLE, updateApi, new Schema(), targetSchema, true, PRESERVE_COLUMNS);

Review Comment:
   Yes, I added a constant.



##########
flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestEvolveSchemaVisitor.java:
##########
@@ -682,6 +705,78 @@ private static Schema getNestedSchemaWithOptionalModifier(boolean nestedIsOption
                                                             9, "s4", StringType.get()))))))))))))));
   }
 
+  @Test
+  public void testCaseInsensitiveAddField() {
+    Schema existingSchema =
+        new Schema(
+            Types.NestedField.optional(1, "id", Types.IntegerType.get()),
+            Types.NestedField.optional(2, "name", Types.StringType.get()));
+    Schema targetSchema =
+        new Schema(
+            Types.NestedField.optional(1, "ID", Types.IntegerType.get()),
+            Types.NestedField.optional(2, "NAME", Types.StringType.get()),
+            Types.NestedField.optional(3, "AGE", Types.IntegerType.get()));
+
+    UpdateSchema updateApi = loadUpdateApi(existingSchema);
+    EvolveSchemaVisitor.visit(TABLE, updateApi, existingSchema, targetSchema, false, false);
+    Schema result = updateApi.apply();
+    assertThat(result.columns()).hasSize(3);
+    assertThat(result.findField("AGE")).isNotNull();
+  }
+
+  @Test
+  public void testCaseInsensitiveMakeFieldOptional() {
+    Schema existingSchema =
+        new Schema(
+            Types.NestedField.optional(1, "id", Types.IntegerType.get()),
+            Types.NestedField.required(2, "name", Types.StringType.get()));
+    Schema targetSchema = new Schema(Types.NestedField.optional(1, "ID", Types.IntegerType.get()));
+
+    UpdateSchema updateApi = loadUpdateApi(existingSchema);
+    EvolveSchemaVisitor.visit(TABLE, updateApi, existingSchema, targetSchema, false, false);
+    Schema result = updateApi.apply();
+    assertThat(result.findField("name").isOptional()).isTrue();
+  }
+
+  @Test
+  public void testCaseInsensitiveNestedStructField() {
+    Schema existingSchema =
+        new Schema(
+            optional(1, "struct1", StructType.of(optional(2, "field1", Types.StringType.get()))));
+    Schema targetSchema =
+        new Schema(
+            optional(
+                1,
+                "STRUCT1",
+                StructType.of(
+                    optional(2, "FIELD1", Types.StringType.get()),
+                    optional(3, "FIELD2", Types.IntegerType.get()))));
+
+    UpdateSchema updateApi = loadUpdateApi(existingSchema);
+    EvolveSchemaVisitor.visit(TABLE, updateApi, existingSchema, targetSchema, false, false);
+    Schema result = updateApi.apply();
+    Types.StructType struct = result.findField("struct1").type().asStructType();
+    assertThat(struct.fields()).hasSize(2);
+    assertThat(struct.field("FIELD2")).isNotNull();
+  }
+
+  @Test
+  public void testCaseSensitiveDoesNotMatch() {
+    Schema existingSchema =
+        new Schema(Types.NestedField.optional(1, "id", Types.IntegerType.get()));
+    Schema targetSchema =
+        new Schema(
+            Types.NestedField.optional(1, "ID", Types.IntegerType.get()),
+            Types.NestedField.optional(2, "name", Types.StringType.get()));
+
+    UpdateSchema updateApi = loadUpdateApi(existingSchema);
+    EvolveSchemaVisitor.visit(TABLE, updateApi, existingSchema, targetSchema, true, false);

Review Comment:
   I agree, it can be confusing and error-prone. Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to