This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 504640f78b Flink: Dynamic Sink: Add case-insensitive field matching 
(#14729)
504640f78b is described below

commit 504640f78bfde7338666d3451ac43fadbbc5627a
Author: Maximilian Michels <[email protected]>
AuthorDate: Mon Jan 19 17:45:29 2026 +0100

    Flink: Dynamic Sink: Add case-insensitive field matching (#14729)
---
 .../flink/sink/dynamic/CompareSchemasVisitor.java  |  47 ++++---
 .../flink/sink/dynamic/DynamicIcebergSink.java     |  20 ++-
 .../flink/sink/dynamic/DynamicRecordProcessor.java |  20 ++-
 .../sink/dynamic/DynamicTableUpdateOperator.java   |  17 ++-
 .../flink/sink/dynamic/EvolveSchemaVisitor.java    |  24 +++-
 .../flink/sink/dynamic/TableMetadataCache.java     |  35 +++--
 .../iceberg/flink/sink/dynamic/TableUpdater.java   |  18 +--
 .../sink/dynamic/TestCompareSchemasVisitor.java    | 143 +++++++++++++++----
 .../flink/sink/dynamic/TestDynamicIcebergSink.java |  85 ++++++++++++
 .../dynamic/TestDynamicTableUpdateOperator.java    |  82 +++++++++--
 .../sink/dynamic/TestEvolveSchemaVisitor.java      | 154 ++++++++++++++++++---
 .../flink/sink/dynamic/TestTableMetadataCache.java | 104 +++++++++++---
 .../flink/sink/dynamic/TestTableUpdater.java       | 109 +++++++++++----
 13 files changed, 698 insertions(+), 160 deletions(-)

diff --git 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java
 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java
index 60561b0f56..cb4fc62c01 100644
--- 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java
+++ 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java
@@ -20,6 +20,8 @@ package org.apache.iceberg.flink.sink.dynamic;
 
 import java.util.List;
 import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.schema.SchemaWithPartnerVisitor;
@@ -43,26 +45,31 @@ public class CompareSchemasVisitor
     extends SchemaWithPartnerVisitor<Integer, CompareSchemasVisitor.Result> {
 
   private final Schema tableSchema;
+  private final boolean caseSensitive;
   private final boolean dropUnusedColumns;
 
-  private CompareSchemasVisitor(Schema tableSchema, boolean dropUnusedColumns) 
{
+  private CompareSchemasVisitor(
+      Schema tableSchema, boolean caseSensitive, boolean dropUnusedColumns) {
     this.tableSchema = tableSchema;
+    this.caseSensitive = caseSensitive;
     this.dropUnusedColumns = dropUnusedColumns;
   }
 
-  public static Result visit(Schema dataSchema, Schema tableSchema) {
-    return visit(dataSchema, tableSchema, true, false);
-  }
-
   public static Result visit(
       Schema dataSchema, Schema tableSchema, boolean caseSensitive, boolean 
dropUnusedColumns) {
     return visit(
         dataSchema,
         -1,
-        new CompareSchemasVisitor(tableSchema, dropUnusedColumns),
+        new CompareSchemasVisitor(tableSchema, caseSensitive, 
dropUnusedColumns),
         new PartnerIdByNameAccessors(tableSchema, caseSensitive));
   }
 
+  @VisibleForTesting
+  @Deprecated
+  public static Result visit(Schema dataSchema, Schema tableSchema) {
+    return visit(dataSchema, tableSchema, true, false);
+  }
+
   @Override
   public Result schema(Schema dataSchema, Integer tableSchemaId, Result 
downstream) {
     if (tableSchemaId == null) {
@@ -92,7 +99,7 @@ public class CompareSchemasVisitor
     }
 
     for (Types.NestedField tableField : 
tableSchemaType.asStructType().fields()) {
-      if (struct.field(tableField.name()) == null
+      if (getFieldFromStruct(tableField.name(), struct, caseSensitive) == null
           && (tableField.isRequired() || dropUnusedColumns)) {
         // If a field from the table schema does not exist in the input 
schema, then we won't visit
         // it. The only choice is to make the table field optional or drop it.
@@ -105,11 +112,10 @@ public class CompareSchemasVisitor
     }
 
     for (int i = 0; i < struct.fields().size(); ++i) {
-      if (!struct
-          .fields()
-          .get(i)
-          .name()
-          .equals(tableSchemaType.asStructType().fields().get(i).name())) {
+      String fieldName = struct.fields().get(i).name();
+      String tableFieldName = 
tableSchemaType.asStructType().fields().get(i).name();
+      if ((caseSensitive && !fieldName.equals(tableFieldName))
+          || (!caseSensitive && !fieldName.equalsIgnoreCase(tableFieldName))) {
         return Result.DATA_CONVERSION_NEEDED;
       }
     }
@@ -117,6 +123,12 @@ public class CompareSchemasVisitor
     return result;
   }
 
+  @Nullable
+  static Types.NestedField getFieldFromStruct(
+      String fieldName, Types.StructType struct, boolean caseSensitive) {
+    return caseSensitive ? struct.field(fieldName) : 
struct.caseInsensitiveField(fieldName);
+  }
+
   @Override
   public Result field(Types.NestedField field, Integer tableSchemaId, Result 
typeResult) {
     if (tableSchemaId == null) {
@@ -191,14 +203,10 @@ public class CompareSchemasVisitor
 
   static class PartnerIdByNameAccessors implements PartnerAccessors<Integer> {
     private final Schema tableSchema;
-    private boolean caseSensitive = true;
+    private boolean caseSensitive;
 
-    PartnerIdByNameAccessors(Schema tableSchema) {
+    PartnerIdByNameAccessors(Schema tableSchema, boolean caseSensitive) {
       this.tableSchema = tableSchema;
-    }
-
-    private PartnerIdByNameAccessors(Schema tableSchema, boolean 
caseSensitive) {
-      this(tableSchema);
       this.caseSensitive = caseSensitive;
     }
 
@@ -211,8 +219,7 @@ public class CompareSchemasVisitor
         struct = 
tableSchema.findField(tableSchemaFieldId).type().asStructType();
       }
 
-      Types.NestedField field =
-          caseSensitive ? struct.field(name) : 
struct.caseInsensitiveField(name);
+      Types.NestedField field = getFieldFromStruct(name, struct, 
caseSensitive);
       if (field != null) {
         return field.fieldId();
       }
diff --git 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
index e1bc8deb9d..61b1f84a43 100644
--- 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
+++ 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
@@ -191,6 +191,7 @@ public class DynamicIcebergSink
     private int cacheMaximumSize = 100;
     private long cacheRefreshMs = 1_000;
     private int inputSchemasPerTableCacheMaximumSize = 10;
+    private boolean caseSensitive = true;
 
     Builder() {}
 
@@ -353,6 +354,15 @@ public class DynamicIcebergSink
       return this;
     }
 
+    /**
+     * Set whether schema field name matching should be case-sensitive. The 
default is to match the
+     * field names case-sensitively.
+     */
+    public Builder<T> caseSensitive(boolean newCaseSensitive) {
+      this.caseSensitive = newCaseSensitive;
+      return this;
+    }
+
     private String operatorName(String suffix) {
       return uidPrefix != null ? uidPrefix + "-" + suffix : suffix;
     }
@@ -399,11 +409,12 @@ public class DynamicIcebergSink
                       generator,
                       catalogLoader,
                       immediateUpdate,
-                      dropUnusedColumns,
                       cacheMaximumSize,
                       cacheRefreshMs,
                       inputSchemasPerTableCacheMaximumSize,
-                      tableCreator))
+                      tableCreator,
+                      caseSensitive,
+                      dropUnusedColumns))
               .uid(prefixIfNotNull(uidPrefix, "-generator"))
               .name(operatorName("generator"))
               .returns(type);
@@ -418,11 +429,12 @@ public class DynamicIcebergSink
               .map(
                   new DynamicTableUpdateOperator(
                       catalogLoader,
-                      dropUnusedColumns,
                       cacheMaximumSize,
                       cacheRefreshMs,
                       inputSchemasPerTableCacheMaximumSize,
-                      tableCreator))
+                      tableCreator,
+                      caseSensitive,
+                      dropUnusedColumns))
               .uid(prefixIfNotNull(uidPrefix, "-updater"))
               .name(operatorName("Updater"))
               .returns(type)
diff --git 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java
 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java
index 427aa6ceaf..07dfad2780 100644
--- 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java
+++ 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java
@@ -45,6 +45,7 @@ class DynamicRecordProcessor<T> extends ProcessFunction<T, 
DynamicRecordInternal
   private final long cacheRefreshMs;
   private final int inputSchemasPerTableCacheMaximumSize;
   private final TableCreator tableCreator;
+  private final boolean caseSensitive;
 
   private transient TableMetadataCache tableCache;
   private transient HashKeyGenerator hashKeyGenerator;
@@ -57,19 +58,21 @@ class DynamicRecordProcessor<T> extends ProcessFunction<T, 
DynamicRecordInternal
       DynamicRecordGenerator<T> generator,
       CatalogLoader catalogLoader,
       boolean immediateUpdate,
-      boolean dropUnusedColumns,
       int cacheMaximumSize,
       long cacheRefreshMs,
       int inputSchemasPerTableCacheMaximumSize,
-      TableCreator tableCreator) {
+      TableCreator tableCreator,
+      boolean caseSensitive,
+      boolean dropUnusedColumns) {
     this.generator = generator;
     this.catalogLoader = catalogLoader;
     this.immediateUpdate = immediateUpdate;
-    this.dropUnusedColumns = dropUnusedColumns;
     this.cacheMaximumSize = cacheMaximumSize;
     this.cacheRefreshMs = cacheRefreshMs;
     this.inputSchemasPerTableCacheMaximumSize = 
inputSchemasPerTableCacheMaximumSize;
     this.tableCreator = tableCreator;
+    this.caseSensitive = caseSensitive;
+    this.dropUnusedColumns = dropUnusedColumns;
   }
 
   @Override
@@ -78,12 +81,17 @@ class DynamicRecordProcessor<T> extends ProcessFunction<T, 
DynamicRecordInternal
     Catalog catalog = catalogLoader.loadCatalog();
     this.tableCache =
         new TableMetadataCache(
-            catalog, cacheMaximumSize, cacheRefreshMs, 
inputSchemasPerTableCacheMaximumSize);
+            catalog,
+            cacheMaximumSize,
+            cacheRefreshMs,
+            inputSchemasPerTableCacheMaximumSize,
+            caseSensitive,
+            dropUnusedColumns);
     this.hashKeyGenerator =
         new HashKeyGenerator(
             cacheMaximumSize, 
getRuntimeContext().getTaskInfo().getMaxNumberOfParallelSubtasks());
     if (immediateUpdate) {
-      updater = new TableUpdater(tableCache, catalog, dropUnusedColumns);
+      updater = new TableUpdater(tableCache, catalog, caseSensitive, 
dropUnusedColumns);
     } else {
       updateStream =
           new OutputTag<>(
@@ -109,7 +117,7 @@ class DynamicRecordProcessor<T> extends ProcessFunction<T, 
DynamicRecordInternal
 
     TableMetadataCache.ResolvedSchemaInfo foundSchema =
         exists
-            ? tableCache.schema(data.tableIdentifier(), data.schema(), 
dropUnusedColumns)
+            ? tableCache.schema(data.tableIdentifier(), data.schema())
             : TableMetadataCache.NOT_FOUND;
 
     PartitionSpec foundSpec = exists ? tableCache.spec(data.tableIdentifier(), 
data.spec()) : null;
diff --git 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicTableUpdateOperator.java
 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicTableUpdateOperator.java
index 8f38d4f8be..456f20adf5 100644
--- 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicTableUpdateOperator.java
+++ 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicTableUpdateOperator.java
@@ -43,22 +43,25 @@ class DynamicTableUpdateOperator
   private final long cacheRefreshMs;
   private final int inputSchemasPerTableCacheMaximumSize;
   private final TableCreator tableCreator;
+  private final boolean caseSensitive;
 
   private transient TableUpdater updater;
 
   DynamicTableUpdateOperator(
       CatalogLoader catalogLoader,
-      boolean dropUnusedColumns,
       int cacheMaximumSize,
       long cacheRefreshMs,
       int inputSchemasPerTableCacheMaximumSize,
-      TableCreator tableCreator) {
+      TableCreator tableCreator,
+      boolean caseSensitive,
+      boolean dropUnusedColumns) {
     this.catalogLoader = catalogLoader;
-    this.dropUnusedColumns = dropUnusedColumns;
     this.cacheMaximumSize = cacheMaximumSize;
     this.cacheRefreshMs = cacheRefreshMs;
     this.inputSchemasPerTableCacheMaximumSize = 
inputSchemasPerTableCacheMaximumSize;
     this.tableCreator = tableCreator;
+    this.caseSensitive = caseSensitive;
+    this.dropUnusedColumns = dropUnusedColumns;
   }
 
   @Override
@@ -68,8 +71,14 @@ class DynamicTableUpdateOperator
     this.updater =
         new TableUpdater(
             new TableMetadataCache(
-                catalog, cacheMaximumSize, cacheRefreshMs, 
inputSchemasPerTableCacheMaximumSize),
+                catalog,
+                cacheMaximumSize,
+                cacheRefreshMs,
+                inputSchemasPerTableCacheMaximumSize,
+                caseSensitive,
+                dropUnusedColumns),
             catalog,
+            caseSensitive,
             dropUnusedColumns);
   }
 
diff --git 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/EvolveSchemaVisitor.java
 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/EvolveSchemaVisitor.java
index e106cf5754..d9747d201e 100644
--- 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/EvolveSchemaVisitor.java
+++ 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/EvolveSchemaVisitor.java
@@ -59,6 +59,7 @@ public class EvolveSchemaVisitor extends 
SchemaWithPartnerVisitor<Integer, Boole
   private final UpdateSchema api;
   private final Schema existingSchema;
   private final Schema targetSchema;
+  private final boolean caseSensitive;
   private final boolean dropUnusedColumns;
 
   private EvolveSchemaVisitor(
@@ -66,11 +67,13 @@ public class EvolveSchemaVisitor extends 
SchemaWithPartnerVisitor<Integer, Boole
       UpdateSchema api,
       Schema existingSchema,
       Schema targetSchema,
+      boolean caseSensitive,
       boolean dropUnusedColumns) {
     this.identifier = identifier;
-    this.api = api;
+    this.api = api.caseSensitive(caseSensitive);
     this.existingSchema = existingSchema;
     this.targetSchema = targetSchema;
+    this.caseSensitive = caseSensitive;
     this.dropUnusedColumns = dropUnusedColumns;
   }
 
@@ -82,6 +85,7 @@ public class EvolveSchemaVisitor extends 
SchemaWithPartnerVisitor<Integer, Boole
    * @param api an UpdateSchema for adding changes
    * @param existingSchema an existing schema
    * @param targetSchema a new schema to compare with the existing
+   * @param caseSensitive whether field name matching should be case-sensitive
    * @param dropUnusedColumns whether to drop columns not present in target 
schema
    */
   public static void visit(
@@ -89,12 +93,14 @@ public class EvolveSchemaVisitor extends 
SchemaWithPartnerVisitor<Integer, Boole
       UpdateSchema api,
       Schema existingSchema,
       Schema targetSchema,
+      boolean caseSensitive,
       boolean dropUnusedColumns) {
     visit(
         targetSchema,
         -1,
-        new EvolveSchemaVisitor(identifier, api, existingSchema, targetSchema, 
dropUnusedColumns),
-        new CompareSchemasVisitor.PartnerIdByNameAccessors(existingSchema));
+        new EvolveSchemaVisitor(
+            identifier, api, existingSchema, targetSchema, caseSensitive, 
dropUnusedColumns),
+        new CompareSchemasVisitor.PartnerIdByNameAccessors(existingSchema, 
caseSensitive));
   }
 
   @Override
@@ -107,14 +113,16 @@ public class EvolveSchemaVisitor extends 
SchemaWithPartnerVisitor<Integer, Boole
     Types.StructType partnerStruct = findFieldType(partnerId).asStructType();
     String after = null;
     for (Types.NestedField targetField : struct.fields()) {
-      Types.NestedField nestedField = partnerStruct.field(targetField.name());
+      Types.NestedField nestedField =
+          CompareSchemasVisitor.getFieldFromStruct(
+              targetField.name(), partnerStruct, caseSensitive);
       final String columnName;
       if (nestedField != null) {
         updateColumn(nestedField, targetField);
         columnName = this.existingSchema.findColumnName(nestedField.fieldId());
       } else {
         addColumn(partnerId, targetField);
-        columnName = this.targetSchema.findColumnName(targetField.fieldId());
+        columnName = targetSchema.findColumnName(targetField.fieldId());
       }
 
       setPosition(columnName, after);
@@ -122,7 +130,11 @@ public class EvolveSchemaVisitor extends 
SchemaWithPartnerVisitor<Integer, Boole
     }
 
     for (Types.NestedField existingField : partnerStruct.fields()) {
-      if (struct.field(existingField.name()) == null) {
+      Types.NestedField targetField =
+          caseSensitive
+              ? struct.field(existingField.name())
+              : struct.caseInsensitiveField(existingField.name());
+      if (targetField == null) {
         String columnName = 
this.existingSchema.findColumnName(existingField.fieldId());
         if (dropUnusedColumns) {
           LOG.debug("{}: Dropping column: {}", identifier.name(), columnName);
diff --git 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java
 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java
index 3be8bbcd91..fdefc01402 100644
--- 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java
+++ 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java
@@ -55,10 +55,24 @@ class TableMetadataCache {
   private final Clock cacheRefreshClock;
   private final int inputSchemasPerTableCacheMaximumSize;
   private final Map<TableIdentifier, CacheItem> tableCache;
+  private final boolean caseSensitive;
+  private final boolean dropUnusedColumns;
 
   TableMetadataCache(
-      Catalog catalog, int maximumSize, long refreshMs, int 
inputSchemasPerTableCacheMaximumSize) {
-    this(catalog, maximumSize, refreshMs, 
inputSchemasPerTableCacheMaximumSize, Clock.systemUTC());
+      Catalog catalog,
+      int maximumSize,
+      long refreshMs,
+      int inputSchemasPerTableCacheMaximumSize,
+      boolean caseSensitive,
+      boolean dropUnusedColumns) {
+    this(
+        catalog,
+        maximumSize,
+        refreshMs,
+        inputSchemasPerTableCacheMaximumSize,
+        caseSensitive,
+        dropUnusedColumns,
+        Clock.systemUTC());
   }
 
   @VisibleForTesting
@@ -67,12 +81,16 @@ class TableMetadataCache {
       int maximumSize,
       long refreshMs,
       int inputSchemasPerTableCacheMaximumSize,
+      boolean caseSensitive,
+      boolean dropUnusedColumns,
       Clock cacheRefreshClock) {
     this.catalog = catalog;
     this.refreshMs = refreshMs;
-    this.cacheRefreshClock = cacheRefreshClock;
     this.inputSchemasPerTableCacheMaximumSize = 
inputSchemasPerTableCacheMaximumSize;
     this.tableCache = new LRUCache<>(maximumSize);
+    this.caseSensitive = caseSensitive;
+    this.dropUnusedColumns = dropUnusedColumns;
+    this.cacheRefreshClock = cacheRefreshClock;
   }
 
   Tuple2<Boolean, Exception> exists(TableIdentifier identifier) {
@@ -90,8 +108,8 @@ class TableMetadataCache {
     return branch(identifier, branch, true);
   }
 
-  ResolvedSchemaInfo schema(TableIdentifier identifier, Schema input, boolean 
dropUnusedColumns) {
-    return schema(identifier, input, true, dropUnusedColumns);
+  ResolvedSchemaInfo schema(TableIdentifier identifier, Schema input) {
+    return schema(identifier, input, true);
   }
 
   PartitionSpec spec(TableIdentifier identifier, PartitionSpec spec) {
@@ -125,7 +143,7 @@ class TableMetadataCache {
   }
 
   private ResolvedSchemaInfo schema(
-      TableIdentifier identifier, Schema input, boolean allowRefresh, boolean 
dropUnusedColumns) {
+      TableIdentifier identifier, Schema input, boolean allowRefresh) {
     CacheItem cached = tableCache.get(identifier);
     Schema compatible = null;
     if (cached != null && cached.tableExists) {
@@ -140,7 +158,8 @@ class TableMetadataCache {
 
       for (Map.Entry<Integer, Schema> tableSchema : 
cached.tableSchemas.entrySet()) {
         CompareSchemasVisitor.Result result =
-            CompareSchemasVisitor.visit(input, tableSchema.getValue(), true, 
dropUnusedColumns);
+            CompareSchemasVisitor.visit(
+                input, tableSchema.getValue(), caseSensitive, 
dropUnusedColumns);
         if (result == CompareSchemasVisitor.Result.SAME) {
           ResolvedSchemaInfo newResult =
               new ResolvedSchemaInfo(
@@ -158,7 +177,7 @@ class TableMetadataCache {
 
     if (needsRefresh(identifier, cached, allowRefresh)) {
       refreshTable(identifier);
-      return schema(identifier, input, false, dropUnusedColumns);
+      return schema(identifier, input, false);
     } else if (compatible != null) {
       ResolvedSchemaInfo newResult =
           new ResolvedSchemaInfo(
diff --git 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java
 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java
index d8809efbe5..b0bdad8ed1 100644
--- 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java
+++ 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java
@@ -43,11 +43,14 @@ class TableUpdater {
   private static final Logger LOG = 
LoggerFactory.getLogger(TableUpdater.class);
   private final TableMetadataCache cache;
   private final Catalog catalog;
+  private final boolean caseSensitive;
   private final boolean dropUnusedColumns;
 
-  TableUpdater(TableMetadataCache cache, Catalog catalog, boolean 
dropUnusedColumns) {
+  TableUpdater(
+      TableMetadataCache cache, Catalog catalog, boolean caseSensitive, 
boolean dropUnusedColumns) {
     this.cache = cache;
     this.catalog = catalog;
+    this.caseSensitive = caseSensitive;
     this.dropUnusedColumns = dropUnusedColumns;
   }
 
@@ -120,15 +123,14 @@ class TableUpdater {
 
   private TableMetadataCache.ResolvedSchemaInfo findOrCreateSchema(
       TableIdentifier identifier, Schema schema) {
-    TableMetadataCache.ResolvedSchemaInfo fromCache =
-        cache.schema(identifier, schema, dropUnusedColumns);
+    TableMetadataCache.ResolvedSchemaInfo fromCache = cache.schema(identifier, 
schema);
     if (fromCache.compareResult() != 
CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED) {
       return fromCache;
     } else {
       Table table = catalog.loadTable(identifier);
       Schema tableSchema = table.schema();
       CompareSchemasVisitor.Result result =
-          CompareSchemasVisitor.visit(schema, tableSchema, true, 
dropUnusedColumns);
+          CompareSchemasVisitor.visit(schema, tableSchema, caseSensitive, 
dropUnusedColumns);
       switch (result) {
         case SAME:
           cache.update(identifier, table);
@@ -145,20 +147,20 @@ class TableUpdater {
           LOG.info(
               "Triggering schema update for table {} {} to {}", identifier, 
tableSchema, schema);
           UpdateSchema updateApi = table.updateSchema();
-          EvolveSchemaVisitor.visit(identifier, updateApi, tableSchema, 
schema, dropUnusedColumns);
+          EvolveSchemaVisitor.visit(
+              identifier, updateApi, tableSchema, schema, caseSensitive, 
dropUnusedColumns);
 
           try {
             updateApi.commit();
             cache.update(identifier, table);
             TableMetadataCache.ResolvedSchemaInfo comparisonAfterMigration =
-                cache.schema(identifier, schema, dropUnusedColumns);
+                cache.schema(identifier, schema);
             Schema newSchema = comparisonAfterMigration.resolvedTableSchema();
             LOG.info("Table {} schema updated from {} to {}", identifier, 
tableSchema, newSchema);
             return comparisonAfterMigration;
           } catch (CommitFailedException e) {
             cache.invalidate(identifier);
-            TableMetadataCache.ResolvedSchemaInfo newSchema =
-                cache.schema(identifier, schema, dropUnusedColumns);
+            TableMetadataCache.ResolvedSchemaInfo newSchema = 
cache.schema(identifier, schema);
             if (newSchema.compareResult() != 
CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED) {
               LOG.debug("Table {} schema updated concurrently to {}", 
identifier, schema);
               return newSchema;
diff --git 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestCompareSchemasVisitor.java
 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestCompareSchemasVisitor.java
index cc8e6898d2..9e4d600f93 100644
--- 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestCompareSchemasVisitor.java
+++ 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestCompareSchemasVisitor.java
@@ -33,6 +33,9 @@ import org.junit.jupiter.api.Test;
 
 class TestCompareSchemasVisitor {
 
+  private static final boolean CASE_SENSITIVE = true;
+  private static final boolean CASE_INSENSITIVE = false;
+
   private static final boolean DROP_COLUMNS = true;
   private static final boolean PRESERVE_COLUMNS = false;
 
@@ -47,7 +50,9 @@ class TestCompareSchemasVisitor {
                 new Schema(
                     optional(1, "id", IntegerType.get(), "comment"),
                     optional(2, "data", StringType.get()),
-                    optional(3, "extra", StringType.get()))))
+                    optional(3, "extra", StringType.get())),
+                CASE_SENSITIVE,
+                PRESERVE_COLUMNS))
         .isEqualTo(CompareSchemasVisitor.Result.SAME);
   }
 
@@ -62,7 +67,9 @@ class TestCompareSchemasVisitor {
                 new Schema(
                     optional(1, "id", IntegerType.get()),
                     optional(2, "data", StringType.get()),
-                    optional(3, "extra", StringType.get()))))
+                    optional(3, "extra", StringType.get())),
+                CASE_SENSITIVE,
+                PRESERVE_COLUMNS))
         .isEqualTo(CompareSchemasVisitor.Result.SAME);
   }
 
@@ -75,7 +82,9 @@ class TestCompareSchemasVisitor {
                     optional(1, "data", StringType.get()),
                     optional(2, "extra", StringType.get())),
                 new Schema(
-                    optional(0, "id", IntegerType.get()), optional(1, "data", 
StringType.get()))))
+                    optional(0, "id", IntegerType.get()), optional(1, "data", 
StringType.get())),
+                CASE_SENSITIVE,
+                PRESERVE_COLUMNS))
         .isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED);
   }
 
@@ -88,7 +97,9 @@ class TestCompareSchemasVisitor {
                 new Schema(
                     optional(0, "id", IntegerType.get()),
                     optional(1, "data", StringType.get()),
-                    optional(2, "extra", StringType.get()))))
+                    optional(2, "extra", StringType.get())),
+                CASE_SENSITIVE,
+                PRESERVE_COLUMNS))
         .isEqualTo(CompareSchemasVisitor.Result.DATA_CONVERSION_NEEDED);
   }
 
@@ -99,7 +110,9 @@ class TestCompareSchemasVisitor {
                 new Schema(
                     optional(1, "id", LongType.get()), optional(2, "extra", 
StringType.get())),
                 new Schema(
-                    optional(1, "id", IntegerType.get()), optional(2, "extra", 
StringType.get()))))
+                    optional(1, "id", IntegerType.get()), optional(2, "extra", 
StringType.get())),
+                CASE_SENSITIVE,
+                PRESERVE_COLUMNS))
         .isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED);
   }
 
@@ -110,7 +123,9 @@ class TestCompareSchemasVisitor {
                 new Schema(
                     optional(1, "id", IntegerType.get()), optional(2, "extra", 
StringType.get())),
                 new Schema(
-                    optional(1, "id", LongType.get()), optional(2, "extra", 
StringType.get()))))
+                    optional(1, "id", LongType.get()), optional(2, "extra", 
StringType.get())),
+                CASE_SENSITIVE,
+                PRESERVE_COLUMNS))
         .isEqualTo(CompareSchemasVisitor.Result.DATA_CONVERSION_NEEDED);
   }
 
@@ -120,9 +135,11 @@ class TestCompareSchemasVisitor {
         new Schema(optional(1, "id", IntegerType.get()), optional(2, "extra", 
StringType.get()));
     Schema tableSchema =
         new Schema(required(1, "id", IntegerType.get()), optional(2, "extra", 
StringType.get()));
-    assertThat(CompareSchemasVisitor.visit(dataSchema, tableSchema))
+    assertThat(
+            CompareSchemasVisitor.visit(dataSchema, tableSchema, 
CASE_SENSITIVE, PRESERVE_COLUMNS))
         .isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED);
-    assertThat(CompareSchemasVisitor.visit(tableSchema, dataSchema))
+    assertThat(
+            CompareSchemasVisitor.visit(tableSchema, dataSchema, 
CASE_SENSITIVE, PRESERVE_COLUMNS))
         .isEqualTo(CompareSchemasVisitor.Result.SAME);
   }
 
@@ -131,9 +148,11 @@ class TestCompareSchemasVisitor {
     Schema dataSchema = new Schema(optional(1, "id", IntegerType.get()));
     Schema tableSchema =
         new Schema(optional(1, "id", IntegerType.get()), required(2, "extra", 
StringType.get()));
-    assertThat(CompareSchemasVisitor.visit(dataSchema, tableSchema))
+    assertThat(
+            CompareSchemasVisitor.visit(dataSchema, tableSchema, 
CASE_SENSITIVE, PRESERVE_COLUMNS))
         .isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED);
-    assertThat(CompareSchemasVisitor.visit(tableSchema, dataSchema))
+    assertThat(
+            CompareSchemasVisitor.visit(tableSchema, dataSchema, 
CASE_SENSITIVE, PRESERVE_COLUMNS))
         .isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED);
   }
 
@@ -142,7 +161,8 @@ class TestCompareSchemasVisitor {
     Schema dataSchema = new Schema(required(1, "id", IntegerType.get()));
     Schema tableSchema =
         new Schema(required(1, "id", IntegerType.get()), optional(2, "extra", 
StringType.get()));
-    assertThat(CompareSchemasVisitor.visit(dataSchema, tableSchema))
+    assertThat(
+            CompareSchemasVisitor.visit(dataSchema, tableSchema, 
CASE_SENSITIVE, PRESERVE_COLUMNS))
         .isEqualTo(CompareSchemasVisitor.Result.DATA_CONVERSION_NEEDED);
   }
 
@@ -155,8 +175,9 @@ class TestCompareSchemasVisitor {
                     optional(2, "struct1", StructType.of(optional(3, "extra", 
IntegerType.get())))),
                 new Schema(
                     optional(0, "id", IntegerType.get()),
-                    optional(
-                        1, "struct1", StructType.of(optional(2, "extra", 
IntegerType.get()))))))
+                    optional(1, "struct1", StructType.of(optional(2, "extra", 
IntegerType.get())))),
+                CASE_SENSITIVE,
+                PRESERVE_COLUMNS))
         .isEqualTo(CompareSchemasVisitor.Result.SAME);
   }
 
@@ -169,8 +190,9 @@ class TestCompareSchemasVisitor {
                     optional(1, "struct1", StructType.of(optional(2, "extra", 
LongType.get())))),
                 new Schema(
                     optional(1, "id", IntegerType.get()),
-                    optional(
-                        2, "struct1", StructType.of(optional(3, "extra", 
IntegerType.get()))))))
+                    optional(2, "struct1", StructType.of(optional(3, "extra", 
IntegerType.get())))),
+                CASE_SENSITIVE,
+                PRESERVE_COLUMNS))
         .isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED);
   }
 
@@ -185,7 +207,9 @@ class TestCompareSchemasVisitor {
                 new Schema(
                     optional(0, "id", IntegerType.get()),
                     optional(
-                        1, "map1", MapType.ofOptional(2, 3, IntegerType.get(), 
StringType.get())))))
+                        1, "map1", MapType.ofOptional(2, 3, IntegerType.get(), 
StringType.get()))),
+                CASE_SENSITIVE,
+                PRESERVE_COLUMNS))
         .isEqualTo(CompareSchemasVisitor.Result.SAME);
   }
 
@@ -200,7 +224,9 @@ class TestCompareSchemasVisitor {
                 new Schema(
                     optional(1, "id", IntegerType.get()),
                     optional(
-                        2, "map1", MapType.ofOptional(3, 4, IntegerType.get(), 
StringType.get())))))
+                        2, "map1", MapType.ofOptional(3, 4, IntegerType.get(), 
StringType.get()))),
+                CASE_SENSITIVE,
+                PRESERVE_COLUMNS))
         .isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED);
   }
 
@@ -213,7 +239,9 @@ class TestCompareSchemasVisitor {
                     optional(2, "list1", ListType.ofOptional(3, 
IntegerType.get()))),
                 new Schema(
                     optional(0, "id", IntegerType.get()),
-                    optional(1, "list1", ListType.ofOptional(2, 
IntegerType.get())))))
+                    optional(1, "list1", ListType.ofOptional(2, 
IntegerType.get()))),
+                CASE_SENSITIVE,
+                PRESERVE_COLUMNS))
         .isEqualTo(CompareSchemasVisitor.Result.SAME);
   }
 
@@ -226,10 +254,76 @@ class TestCompareSchemasVisitor {
                     optional(1, "list1", ListType.ofOptional(2, 
LongType.get()))),
                 new Schema(
                     optional(1, "id", IntegerType.get()),
-                    optional(2, "list1", ListType.ofOptional(3, 
IntegerType.get())))))
+                    optional(2, "list1", ListType.ofOptional(3, 
IntegerType.get()))),
+                CASE_SENSITIVE,
+                PRESERVE_COLUMNS))
+        .isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED);
+  }
+
+  @Test
+  void testCaseInsensitiveFieldMatching() {
+    assertThat(
+            CompareSchemasVisitor.visit(
+                new Schema(
+                    optional(1, "ID", IntegerType.get()),
+                    optional(2, "Data", StringType.get()),
+                    optional(3, "EXTRA", StringType.get())),
+                new Schema(
+                    optional(1, "id", IntegerType.get()),
+                    optional(2, "data", StringType.get()),
+                    optional(3, "extra", StringType.get())),
+                CASE_INSENSITIVE,
+                PRESERVE_COLUMNS))
+        .isEqualTo(CompareSchemasVisitor.Result.SAME);
+  }
+
+  @Test
+  void testCaseSensitiveFieldMatchingDefault() {
+    assertThat(
+            CompareSchemasVisitor.visit(
+                new Schema(
+                    optional(1, "ID", IntegerType.get()),
+                    optional(2, "Data", StringType.get()),
+                    optional(3, "EXTRA", StringType.get())),
+                new Schema(
+                    optional(1, "id", IntegerType.get()),
+                    optional(2, "data", StringType.get()),
+                    optional(3, "extra", StringType.get())),
+                CASE_SENSITIVE,
+                DROP_COLUMNS))
         .isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED);
   }
 
+  @Test
+  void testCaseInsensitiveNestedStruct() {
+    assertThat(
+            CompareSchemasVisitor.visit(
+                new Schema(
+                    optional(1, "ID", IntegerType.get()),
+                    optional(2, "STRUCT1", StructType.of(optional(3, "NESTED", 
StringType.get())))),
+                new Schema(
+                    optional(1, "id", IntegerType.get()),
+                    optional(2, "struct1", StructType.of(optional(3, "nested", 
StringType.get())))),
+                CASE_INSENSITIVE,
+                PRESERVE_COLUMNS))
+        .isEqualTo(CompareSchemasVisitor.Result.SAME);
+  }
+
+  @Test
+  void testCaseInsensitiveWithMoreColumns() {
+    assertThat(
+            CompareSchemasVisitor.visit(
+                new Schema(
+                    optional(0, "ID", IntegerType.get()), optional(1, "DATA", 
StringType.get())),
+                new Schema(
+                    optional(0, "id", IntegerType.get()),
+                    optional(1, "data", StringType.get()),
+                    optional(2, "extra", StringType.get())),
+                CASE_INSENSITIVE,
+                PRESERVE_COLUMNS))
+        .isEqualTo(CompareSchemasVisitor.Result.DATA_CONVERSION_NEEDED);
+  }
+
   @Test
   void testDropUnusedColumnsEnabled() {
     Schema dataSchema = new Schema(optional(1, "id", IntegerType.get()));
@@ -239,7 +333,7 @@ class TestCompareSchemasVisitor {
             optional(2, "data", StringType.get()),
             optional(3, "extra", StringType.get()));
 
-    assertThat(CompareSchemasVisitor.visit(dataSchema, tableSchema, true, 
DROP_COLUMNS))
+    assertThat(CompareSchemasVisitor.visit(dataSchema, tableSchema, 
CASE_SENSITIVE, DROP_COLUMNS))
         .isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED);
   }
 
@@ -249,7 +343,7 @@ class TestCompareSchemasVisitor {
     Schema tableSchema =
         new Schema(optional(1, "id", IntegerType.get()), required(2, "data", 
StringType.get()));
 
-    assertThat(CompareSchemasVisitor.visit(dataSchema, tableSchema, true, 
DROP_COLUMNS))
+    assertThat(CompareSchemasVisitor.visit(dataSchema, tableSchema, 
CASE_SENSITIVE, DROP_COLUMNS))
         .isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED);
   }
 
@@ -262,7 +356,7 @@ class TestCompareSchemasVisitor {
             optional(3, "extra", StringType.get()));
     Schema tableSchema = new Schema(optional(1, "id", IntegerType.get()));
 
-    assertThat(CompareSchemasVisitor.visit(dataSchema, tableSchema, true, 
DROP_COLUMNS))
+    assertThat(CompareSchemasVisitor.visit(dataSchema, tableSchema, 
CASE_SENSITIVE, DROP_COLUMNS))
         .isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED);
   }
 
@@ -282,10 +376,11 @@ class TestCompareSchemasVisitor {
                     optional(3, "field1", StringType.get()),
                     optional(4, "field2", IntegerType.get()))));
 
-    assertThat(CompareSchemasVisitor.visit(dataSchema, tableSchema, true, 
DROP_COLUMNS))
+    assertThat(CompareSchemasVisitor.visit(dataSchema, tableSchema, 
CASE_SENSITIVE, DROP_COLUMNS))
         .isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED);
 
-    assertThat(CompareSchemasVisitor.visit(dataSchema, tableSchema, true, 
PRESERVE_COLUMNS))
+    assertThat(
+            CompareSchemasVisitor.visit(dataSchema, tableSchema, 
CASE_SENSITIVE, PRESERVE_COLUMNS))
         .isEqualTo(CompareSchemasVisitor.Result.DATA_CONVERSION_NEEDED);
   }
 }
diff --git 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
index 0c07bc9461..d9602e12eb 100644
--- 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
+++ 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
@@ -977,6 +977,91 @@ class TestDynamicIcebergSink extends 
TestFlinkIcebergSinkBase {
     assertThat(records).hasSize(2);
   }
 
+  @Test
+  void testCaseInsensitiveSchemaMatching() throws Exception {
+    Schema lowerCaseSchema =
+        new Schema(
+            Types.NestedField.optional(1, "id", Types.IntegerType.get()),
+            Types.NestedField.optional(2, "data", Types.StringType.get()));
+
+    Schema upperCaseSchema =
+        new Schema(
+            Types.NestedField.optional(1, "ID", Types.IntegerType.get()),
+            Types.NestedField.optional(2, "DATA", Types.StringType.get()));
+
+    Schema mixedCaseSchema =
+        new Schema(
+            Types.NestedField.optional(1, "Id", Types.IntegerType.get()),
+            Types.NestedField.optional(2, "Data", Types.StringType.get()));
+
+    List<DynamicIcebergDataImpl> rows =
+        Lists.newArrayList(
+            new DynamicIcebergDataImpl(
+                lowerCaseSchema, "t1", "main", PartitionSpec.unpartitioned()),
+            new DynamicIcebergDataImpl(
+                upperCaseSchema, "t1", "main", PartitionSpec.unpartitioned()),
+            new DynamicIcebergDataImpl(
+                mixedCaseSchema, "t1", "main", PartitionSpec.unpartitioned()));
+
+    DataStream<DynamicIcebergDataImpl> dataStream =
+        env.fromData(rows, TypeInformation.of(new TypeHint<>() {}));
+    env.setParallelism(2);
+
+    DynamicIcebergSink.forInput(dataStream)
+        .generator(new Generator())
+        .catalogLoader(CATALOG_EXTENSION.catalogLoader())
+        .writeParallelism(2)
+        .immediateTableUpdate(true)
+        .caseSensitive(false)
+        .append();
+
+    env.execute("Test Case Insensitive Iceberg DataStream");
+
+    verifyResults(rows);
+  }
+
+  @Test
+  void testCaseSensitiveSchemaMatchingCreatesNewFields() throws Exception {
+    Schema lowerCaseSchema =
+        new Schema(
+            Types.NestedField.optional(1, "id", Types.IntegerType.get()),
+            Types.NestedField.optional(2, "data", Types.StringType.get()));
+
+    Schema upperCaseSchema =
+        new Schema(
+            Types.NestedField.optional(1, "ID", Types.IntegerType.get()),
+            Types.NestedField.optional(2, "DATA", Types.StringType.get()));
+
+    List<DynamicIcebergDataImpl> rows =
+        Lists.newArrayList(
+            new DynamicIcebergDataImpl(
+                lowerCaseSchema, "t1", "main", PartitionSpec.unpartitioned()),
+            new DynamicIcebergDataImpl(
+                upperCaseSchema, "t1", "main", PartitionSpec.unpartitioned()));
+
+    DataStream<DynamicIcebergDataImpl> dataStream =
+        env.fromData(rows, TypeInformation.of(new TypeHint<>() {}));
+    env.setParallelism(2);
+
+    DynamicIcebergSink.forInput(dataStream)
+        .generator(new Generator())
+        .catalogLoader(CATALOG_EXTENSION.catalogLoader())
+        .writeParallelism(2)
+        .immediateTableUpdate(true)
+        .caseSensitive(true)
+        .append();
+
+    env.execute("Test Case Sensitive Iceberg DataStream");
+
+    Table table = 
CATALOG_EXTENSION.catalog().loadTable(TableIdentifier.of(DATABASE, "t1"));
+    Schema resultSchema = table.schema();
+    assertThat(resultSchema.columns()).hasSize(4);
+    assertThat(resultSchema.findField("id")).isNotNull();
+    assertThat(resultSchema.findField("ID")).isNotNull();
+    assertThat(resultSchema.findField("data")).isNotNull();
+    assertThat(resultSchema.findField("DATA")).isNotNull();
+  }
+
   /**
    * Represents a concurrent duplicate commit during an ongoing commit 
operation, which can happen
    * in production scenarios when using REST catalog.
diff --git 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicTableUpdateOperator.java
 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicTableUpdateOperator.java
index d68dd58c08..1c8e6df859 100644
--- 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicTableUpdateOperator.java
+++ 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicTableUpdateOperator.java
@@ -32,9 +32,17 @@ import org.apache.iceberg.flink.HadoopCatalogExtension;
 import org.apache.iceberg.types.Types;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 class TestDynamicTableUpdateOperator {
 
+  private static final boolean CASE_SENSITIVE = true;
+  private static final boolean CASE_INSENSITIVE = false;
+
+  private static final boolean DROP_COLUMNS = true;
+  private static final boolean PRESERVE_COLUMNS = false;
+
   @RegisterExtension
   private static final HadoopCatalogExtension CATALOG_EXTENSION =
       new HadoopCatalogExtension(DATABASE, TABLE);
@@ -59,11 +67,12 @@ class TestDynamicTableUpdateOperator {
     DynamicTableUpdateOperator operator =
         new DynamicTableUpdateOperator(
             CATALOG_EXTENSION.catalogLoader(),
-            false,
             cacheMaximumSize,
             cacheRefreshMs,
             inputSchemaCacheMaximumSize,
-            TableCreator.DEFAULT);
+            TableCreator.DEFAULT,
+            CASE_SENSITIVE,
+            PRESERVE_COLUMNS);
     operator.open(null);
 
     DynamicRecordInternal input =
@@ -93,11 +102,12 @@ class TestDynamicTableUpdateOperator {
     DynamicTableUpdateOperator operator =
         new DynamicTableUpdateOperator(
             CATALOG_EXTENSION.catalogLoader(),
-            false,
             cacheMaximumSize,
             cacheRefreshMs,
             inputSchemaCacheMaximumSize,
-            TableCreator.DEFAULT);
+            TableCreator.DEFAULT,
+            CASE_SENSITIVE,
+            PRESERVE_COLUMNS);
     operator.open(null);
 
     catalog.createTable(table, SCHEMA1);
@@ -122,6 +132,59 @@ class TestDynamicTableUpdateOperator {
     
assertThat(catalog.loadTable(table).schema().schemaId()).isEqualTo(output.schema().schemaId());
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  void testCaseInSensitivity(boolean caseSensitive) throws Exception {
+    int cacheMaximumSize = 10;
+    int cacheRefreshMs = 1000;
+    int inputSchemaCacheMaximumSize = 10;
+    Catalog catalog = CATALOG_EXTENSION.catalog();
+    TableIdentifier table = TableIdentifier.of(TABLE);
+
+    Schema initialSchema = new Schema(Types.NestedField.required(1, "id", 
Types.IntegerType.get()));
+    Schema caseSensitiveSchema =
+        new Schema(Types.NestedField.required(1, "Id", 
Types.IntegerType.get()));
+
+    DynamicTableUpdateOperator operator =
+        new DynamicTableUpdateOperator(
+            CATALOG_EXTENSION.catalogLoader(),
+            cacheMaximumSize,
+            cacheRefreshMs,
+            inputSchemaCacheMaximumSize,
+            TableCreator.DEFAULT,
+            caseSensitive,
+            PRESERVE_COLUMNS);
+    operator.open(null);
+
+    catalog.createTable(table, initialSchema);
+    DynamicRecordInternal input =
+        new DynamicRecordInternal(
+            TABLE,
+            "branch",
+            caseSensitiveSchema,
+            GenericRowData.of(1, "test"),
+            PartitionSpec.unpartitioned(),
+            42,
+            false,
+            Collections.emptySet());
+    DynamicRecordInternal output = operator.map(input);
+
+    if (caseSensitive) {
+      // Schema changes due to case sensitivity
+      Schema expectedSchema =
+          new Schema(
+              Types.NestedField.optional(2, "Id", Types.IntegerType.get()),
+              Types.NestedField.optional(1, "id", Types.IntegerType.get()));
+      Schema tableSchema = catalog.loadTable(table).schema();
+      assertThat(tableSchema.sameSchema(expectedSchema)).isTrue();
+      assertThat(output.schema().sameSchema(expectedSchema)).isTrue();
+    } else {
+      // No schema change due to case insensitivity
+      
assertThat(catalog.loadTable(table).schema().sameSchema(initialSchema)).isTrue();
+      assertThat(output.schema().sameSchema(initialSchema)).isTrue();
+    }
+  }
+
   @Test
   void testDynamicTableUpdateOperatorPreserveUnusedColumns() throws Exception {
     int cacheMaximumSize = 10;
@@ -133,11 +196,12 @@ class TestDynamicTableUpdateOperator {
     DynamicTableUpdateOperator operator =
         new DynamicTableUpdateOperator(
             CATALOG_EXTENSION.catalogLoader(),
-            false, // dropUnusedColumns = false (default)
             cacheMaximumSize,
             cacheRefreshMs,
             inputSchemaCacheMaximumSize,
-            TableCreator.DEFAULT);
+            TableCreator.DEFAULT,
+            CASE_SENSITIVE,
+            PRESERVE_COLUMNS);
     operator.open(null);
 
     catalog.createTable(table, SCHEMA2);
@@ -173,12 +237,12 @@ class TestDynamicTableUpdateOperator {
     DynamicTableUpdateOperator operator =
         new DynamicTableUpdateOperator(
             CATALOG_EXTENSION.catalogLoader(),
-            // Drop unused columns
-            true,
             cacheMaximumSize,
             cacheRefreshMs,
             inputSchemaCacheMaximumSize,
-            TableCreator.DEFAULT);
+            TableCreator.DEFAULT,
+            CASE_INSENSITIVE,
+            DROP_COLUMNS);
     operator.open(null);
 
     catalog.createTable(table, SCHEMA2);
diff --git 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestEvolveSchemaVisitor.java
 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestEvolveSchemaVisitor.java
index 027adc4031..d2da73c669 100644
--- 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestEvolveSchemaVisitor.java
+++ 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestEvolveSchemaVisitor.java
@@ -50,6 +50,10 @@ import org.junit.jupiter.api.Test;
 public class TestEvolveSchemaVisitor {
 
   private static final TableIdentifier TABLE = TableIdentifier.of("table");
+
+  private static final boolean CASE_SENSITIVE = true;
+  private static final boolean CASE_INSENSITIVE = false;
+
   private static final boolean DROP_COLUMNS = true;
   private static final boolean PRESERVE_COLUMNS = false;
 
@@ -94,7 +98,8 @@ public class TestEvolveSchemaVisitor {
   public void testAddTopLevelPrimitives() {
     Schema targetSchema = new Schema(primitiveFields(0, primitiveTypes()));
     UpdateSchema updateApi = loadUpdateApi(new Schema());
-    EvolveSchemaVisitor.visit(TABLE, updateApi, new Schema(), targetSchema, 
PRESERVE_COLUMNS);
+    EvolveSchemaVisitor.visit(
+        TABLE, updateApi, new Schema(), targetSchema, CASE_SENSITIVE, 
PRESERVE_COLUMNS);
     
assertThat(targetSchema.asStruct()).isEqualTo(updateApi.apply().asStruct());
   }
 
@@ -104,7 +109,8 @@ public class TestEvolveSchemaVisitor {
     
assertThat(existingSchema.columns().stream().allMatch(Types.NestedField::isRequired)).isTrue();
 
     UpdateSchema updateApi = loadUpdateApi(existingSchema);
-    EvolveSchemaVisitor.visit(TABLE, updateApi, existingSchema, new Schema(), 
PRESERVE_COLUMNS);
+    EvolveSchemaVisitor.visit(
+        TABLE, updateApi, existingSchema, new Schema(), CASE_SENSITIVE, 
PRESERVE_COLUMNS);
     Schema newSchema = updateApi.apply();
     assertThat(newSchema.asStruct().fields()).hasSize(14);
     
assertThat(newSchema.columns().stream().allMatch(Types.NestedField::isOptional)).isTrue();
@@ -129,7 +135,8 @@ public class TestEvolveSchemaVisitor {
             optional(2, "b", StructType.of(optional(5, "nested2", 
StringType.get()))));
 
     UpdateSchema updateApi = loadUpdateApi(existingSchema);
-    EvolveSchemaVisitor.visit(TABLE, updateApi, existingSchema, targetSchema, 
DROP_COLUMNS);
+    EvolveSchemaVisitor.visit(
+        TABLE, updateApi, existingSchema, targetSchema, CASE_SENSITIVE, 
DROP_COLUMNS);
 
     Schema newSchema = updateApi.apply();
     assertThat(newSchema.sameSchema(targetSchema)).isTrue();
@@ -151,7 +158,8 @@ public class TestEvolveSchemaVisitor {
     Schema targetSchema = new Schema(optional(1, "a", StringType.get()));
 
     UpdateSchema updateApi = loadUpdateApi(existingSchema);
-    EvolveSchemaVisitor.visit(TABLE, updateApi, existingSchema, targetSchema, 
PRESERVE_COLUMNS);
+    EvolveSchemaVisitor.visit(
+        TABLE, updateApi, existingSchema, targetSchema, CASE_SENSITIVE, 
PRESERVE_COLUMNS);
 
     Schema newSchema = updateApi.apply();
     assertThat(newSchema.sameSchema(existingSchema)).isTrue();
@@ -164,7 +172,8 @@ public class TestEvolveSchemaVisitor {
     UpdateSchema updateApi = loadUpdateApi(existingSchema);
     Schema newSchema =
         new Schema(Arrays.asList(Types.NestedField.optional(-1, "myField", 
Types.LongType.get())));
-    EvolveSchemaVisitor.visit(TABLE, updateApi, existingSchema, newSchema, 
PRESERVE_COLUMNS);
+    EvolveSchemaVisitor.visit(
+        TABLE, updateApi, existingSchema, newSchema, CASE_SENSITIVE, 
PRESERVE_COLUMNS);
     assertThat(updateApi.apply().sameSchema(existingSchema)).isTrue();
   }
 
@@ -177,7 +186,8 @@ public class TestEvolveSchemaVisitor {
         new Schema(
             Arrays.asList(optional(2, "b", StringType.get()), optional(1, "a", 
StringType.get())));
     UpdateSchema updateApi = loadUpdateApi(existingSchema);
-    EvolveSchemaVisitor.visit(TABLE, updateApi, existingSchema, targetSchema, 
PRESERVE_COLUMNS);
+    EvolveSchemaVisitor.visit(
+        TABLE, updateApi, existingSchema, targetSchema, CASE_SENSITIVE, 
PRESERVE_COLUMNS);
     
assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct());
   }
 
@@ -186,7 +196,8 @@ public class TestEvolveSchemaVisitor {
     for (PrimitiveType primitiveType : primitiveTypes()) {
       Schema targetSchema = new Schema(optional(1, "aList", 
ListType.ofOptional(2, primitiveType)));
       UpdateSchema updateApi = loadUpdateApi(new Schema());
-      EvolveSchemaVisitor.visit(TABLE, updateApi, new Schema(), targetSchema, 
PRESERVE_COLUMNS);
+      EvolveSchemaVisitor.visit(
+          TABLE, updateApi, new Schema(), targetSchema, CASE_SENSITIVE, 
PRESERVE_COLUMNS);
       
assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct());
     }
   }
@@ -198,7 +209,8 @@ public class TestEvolveSchemaVisitor {
           new Schema(optional(1, "aList", ListType.ofRequired(2, 
primitiveType)));
       Schema targetSchema = new Schema();
       UpdateSchema updateApi = loadUpdateApi(existingSchema);
-      EvolveSchemaVisitor.visit(TABLE, updateApi, existingSchema, 
targetSchema, PRESERVE_COLUMNS);
+      EvolveSchemaVisitor.visit(
+          TABLE, updateApi, existingSchema, targetSchema, CASE_SENSITIVE, 
PRESERVE_COLUMNS);
       Schema expectedSchema =
           new Schema(optional(1, "aList", ListType.ofRequired(2, 
primitiveType)));
       
assertThat(updateApi.apply().asStruct()).isEqualTo(expectedSchema.asStruct());
@@ -211,7 +223,8 @@ public class TestEvolveSchemaVisitor {
       Schema targetSchema =
           new Schema(optional(1, "aMap", MapType.ofOptional(2, 3, 
primitiveType, primitiveType)));
       UpdateSchema updateApi = loadUpdateApi(new Schema());
-      EvolveSchemaVisitor.visit(TABLE, updateApi, new Schema(), targetSchema, 
PRESERVE_COLUMNS);
+      EvolveSchemaVisitor.visit(
+          TABLE, updateApi, new Schema(), targetSchema, CASE_SENSITIVE, 
PRESERVE_COLUMNS);
       
assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct());
     }
   }
@@ -223,7 +236,8 @@ public class TestEvolveSchemaVisitor {
           new Schema(
               optional(1, "aStruct", StructType.of(optional(2, "primitive", 
primitiveType))));
       UpdateSchema updateApi = loadUpdateApi(new Schema());
-      EvolveSchemaVisitor.visit(TABLE, updateApi, new Schema(), currentSchema, 
PRESERVE_COLUMNS);
+      EvolveSchemaVisitor.visit(
+          TABLE, updateApi, new Schema(), currentSchema, CASE_SENSITIVE, 
PRESERVE_COLUMNS);
       
assertThat(updateApi.apply().asStruct()).isEqualTo(currentSchema.asStruct());
     }
   }
@@ -236,7 +250,8 @@ public class TestEvolveSchemaVisitor {
           new Schema(
               optional(1, "aStruct", StructType.of(optional(2, "primitive", 
primitiveType))));
       UpdateSchema updateApi = loadUpdateApi(currentSchema);
-      EvolveSchemaVisitor.visit(TABLE, updateApi, currentSchema, targetSchema, 
PRESERVE_COLUMNS);
+      EvolveSchemaVisitor.visit(
+          TABLE, updateApi, currentSchema, targetSchema, CASE_SENSITIVE, 
PRESERVE_COLUMNS);
       
assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct());
     }
   }
@@ -251,7 +266,8 @@ public class TestEvolveSchemaVisitor {
           new Schema(
               optional(1, "aStruct", StructType.of(optional(2, "primitive", 
primitiveType))));
       UpdateSchema updateApi = loadUpdateApi(currentSchema);
-      EvolveSchemaVisitor.visit(TABLE, updateApi, currentSchema, targetSchema, 
PRESERVE_COLUMNS);
+      EvolveSchemaVisitor.visit(
+          TABLE, updateApi, currentSchema, targetSchema, CASE_SENSITIVE, 
PRESERVE_COLUMNS);
       
assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct());
     }
   }
@@ -262,7 +278,8 @@ public class TestEvolveSchemaVisitor {
     Schema targetSchema =
         new Schema(optional(1, "aStruct", StructType.of(primitiveFields(1, 
primitiveTypes()))));
     UpdateSchema updateApi = loadUpdateApi(currentSchema);
-    EvolveSchemaVisitor.visit(TABLE, updateApi, currentSchema, targetSchema, 
PRESERVE_COLUMNS);
+    EvolveSchemaVisitor.visit(
+        TABLE, updateApi, currentSchema, targetSchema, CASE_SENSITIVE, 
PRESERVE_COLUMNS);
     
assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct());
   }
 
@@ -292,7 +309,8 @@ public class TestEvolveSchemaVisitor {
                                                 ListType.ofOptional(
                                                     10, DecimalType.of(11, 
20))))))))))));
     UpdateSchema updateApi = loadUpdateApi(new Schema());
-    EvolveSchemaVisitor.visit(TABLE, updateApi, new Schema(), targetSchema, 
PRESERVE_COLUMNS);
+    EvolveSchemaVisitor.visit(
+        TABLE, updateApi, new Schema(), targetSchema, CASE_SENSITIVE, 
PRESERVE_COLUMNS);
     
assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct());
   }
 
@@ -331,7 +349,8 @@ public class TestEvolveSchemaVisitor {
                                                                 "aString",
                                                                 
StringType.get()))))))))))))));
     UpdateSchema updateApi = loadUpdateApi(currentSchema);
-    EvolveSchemaVisitor.visit(TABLE, updateApi, currentSchema, targetSchema, 
PRESERVE_COLUMNS);
+    EvolveSchemaVisitor.visit(
+        TABLE, updateApi, currentSchema, targetSchema, CASE_SENSITIVE, 
PRESERVE_COLUMNS);
     
assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct());
   }
 
@@ -366,7 +385,8 @@ public class TestEvolveSchemaVisitor {
                                         12, 13, StringType.get(), 
StringType.get()))))))));
 
     UpdateSchema updateApi = loadUpdateApi(new Schema());
-    EvolveSchemaVisitor.visit(TABLE, updateApi, new Schema(), targetSchema, 
PRESERVE_COLUMNS);
+    EvolveSchemaVisitor.visit(
+        TABLE, updateApi, new Schema(), targetSchema, CASE_SENSITIVE, 
PRESERVE_COLUMNS);
     
assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct());
   }
 
@@ -382,6 +402,7 @@ public class TestEvolveSchemaVisitor {
                     loadUpdateApi(currentSchema),
                     currentSchema,
                     targetSchema,
+                    true,
                     PRESERVE_COLUMNS))
         .hasMessage("Cannot change column type: aList.element: string -> long")
         .isInstanceOf(IllegalArgumentException.class);
@@ -403,6 +424,7 @@ public class TestEvolveSchemaVisitor {
                     loadUpdateApi(currentSchema),
                     currentSchema,
                     targetSchema,
+                    true,
                     PRESERVE_COLUMNS))
         .hasMessage("Cannot change column type: aMap.value: string -> long")
         .isInstanceOf(IllegalArgumentException.class);
@@ -422,6 +444,7 @@ public class TestEvolveSchemaVisitor {
                     loadUpdateApi(currentSchema),
                     currentSchema,
                     targetSchema,
+                    true,
                     PRESERVE_COLUMNS))
         .hasMessage("Cannot change column type: aMap.key: string -> uuid")
         .isInstanceOf(IllegalArgumentException.class);
@@ -434,7 +457,8 @@ public class TestEvolveSchemaVisitor {
     Schema targetSchema = new Schema(required(1, "aCol", LongType.get()));
 
     UpdateSchema updateApi = loadUpdateApi(currentSchema);
-    EvolveSchemaVisitor.visit(TABLE, updateApi, currentSchema, targetSchema, 
PRESERVE_COLUMNS);
+    EvolveSchemaVisitor.visit(
+        TABLE, updateApi, currentSchema, targetSchema, CASE_SENSITIVE, 
PRESERVE_COLUMNS);
     Schema applied = updateApi.apply();
     assertThat(applied.asStruct().fields()).hasSize(1);
     
assertThat(applied.asStruct().fields().get(0).type()).isEqualTo(LongType.get());
@@ -447,7 +471,8 @@ public class TestEvolveSchemaVisitor {
     Schema targetSchema = new Schema(required(1, "aCol", DoubleType.get()));
 
     UpdateSchema updateApi = loadUpdateApi(currentSchema);
-    EvolveSchemaVisitor.visit(TABLE, updateApi, currentSchema, targetSchema, 
PRESERVE_COLUMNS);
+    EvolveSchemaVisitor.visit(
+        TABLE, updateApi, currentSchema, targetSchema, CASE_SENSITIVE, 
PRESERVE_COLUMNS);
     Schema applied = updateApi.apply();
     assertThat(applied.asStruct().fields()).hasSize(1);
     
assertThat(applied.asStruct().fields().get(0).type()).isEqualTo(DoubleType.get());
@@ -464,6 +489,7 @@ public class TestEvolveSchemaVisitor {
                     loadUpdateApi(currentSchema),
                     currentSchema,
                     targetSchema,
+                    true,
                     PRESERVE_COLUMNS))
         .hasMessage("Cannot change column type: aCol: double -> float")
         .isInstanceOf(IllegalArgumentException.class);
@@ -477,7 +503,8 @@ public class TestEvolveSchemaVisitor {
     Schema targetSchema = new Schema(required(1, "aCol", DecimalType.of(22, 
1)));
 
     UpdateSchema updateApi = loadUpdateApi(currentSchema);
-    EvolveSchemaVisitor.visit(TABLE, updateApi, currentSchema, targetSchema, 
PRESERVE_COLUMNS);
+    EvolveSchemaVisitor.visit(
+        TABLE, updateApi, currentSchema, targetSchema, CASE_SENSITIVE, 
PRESERVE_COLUMNS);
     
assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct());
   }
 
@@ -520,7 +547,8 @@ public class TestEvolveSchemaVisitor {
                                         optional(6, "time", 
TimeType.get())))))))));
 
     UpdateSchema updateApi = loadUpdateApi(existingSchema);
-    EvolveSchemaVisitor.visit(TABLE, updateApi, existingSchema, targetSchema, 
PRESERVE_COLUMNS);
+    EvolveSchemaVisitor.visit(
+        TABLE, updateApi, existingSchema, targetSchema, CASE_SENSITIVE, 
PRESERVE_COLUMNS);
     
assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct());
   }
 
@@ -536,6 +564,7 @@ public class TestEvolveSchemaVisitor {
                     loadUpdateApi(currentSchema),
                     currentSchema,
                     targetSchema,
+                    true,
                     PRESERVE_COLUMNS))
         .hasMessage("Cannot change column type: aColumn: list<string> -> 
string")
         .isInstanceOf(IllegalArgumentException.class);
@@ -573,7 +602,8 @@ public class TestEvolveSchemaVisitor {
                     optional(7, "d1", StructType.of(optional(8, "d2", 
StringType.get()))))));
 
     UpdateSchema updateApi = loadUpdateApi(currentSchema);
-    EvolveSchemaVisitor.visit(TABLE, updateApi, currentSchema, targetSchema, PRESERVE_COLUMNS);
+    EvolveSchemaVisitor.visit(
+        TABLE, updateApi, currentSchema, targetSchema, CASE_SENSITIVE, PRESERVE_COLUMNS);
     assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct());
   }
 
@@ -625,7 +655,8 @@ public class TestEvolveSchemaVisitor {
                                                                 
StringType.get()))))))))))))));
 
     UpdateSchema updateApi = loadUpdateApi(currentSchema);
-    EvolveSchemaVisitor.visit(TABLE, updateApi, currentSchema, targetSchema, PRESERVE_COLUMNS);
+    EvolveSchemaVisitor.visit(
+        TABLE, updateApi, currentSchema, targetSchema, CASE_SENSITIVE, PRESERVE_COLUMNS);
     assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct());
   }
 
@@ -645,7 +676,8 @@ public class TestEvolveSchemaVisitor {
                             optional(
                                 3, "s3", StructType.of(optional(4, "s4", 
StringType.get()))))))));
     UpdateSchema updateApi = loadUpdateApi(currentSchema);
-    EvolveSchemaVisitor.visit(TABLE, updateApi, currentSchema, targetSchema, PRESERVE_COLUMNS);
+    EvolveSchemaVisitor.visit(
+        TABLE, updateApi, currentSchema, targetSchema, CASE_SENSITIVE, PRESERVE_COLUMNS);
     assertThat(getNestedSchemaWithOptionalModifier(true).asStruct())
         .isEqualTo(updateApi.apply().asStruct());
   }
@@ -682,6 +714,82 @@ public class TestEvolveSchemaVisitor {
                                                             9, "s4", 
StringType.get()))))))))))))));
   }
 
+  @Test
+  public void testCaseInsensitiveAddField() {
+    Schema existingSchema =
+        new Schema(
+            Types.NestedField.optional(1, "id", Types.IntegerType.get()),
+            Types.NestedField.optional(2, "name", Types.StringType.get()));
+    Schema targetSchema =
+        new Schema(
+            Types.NestedField.optional(1, "ID", Types.IntegerType.get()),
+            Types.NestedField.optional(2, "NAME", Types.StringType.get()),
+            Types.NestedField.optional(3, "AGE", Types.IntegerType.get()));
+
+    UpdateSchema updateApi = loadUpdateApi(existingSchema);
+    EvolveSchemaVisitor.visit(
+        TABLE, updateApi, existingSchema, targetSchema, CASE_INSENSITIVE, PRESERVE_COLUMNS);
+    Schema result = updateApi.apply();
+    assertThat(result.columns()).hasSize(3);
+    assertThat(result.findField("AGE")).isNotNull();
+  }
+
+  @Test
+  public void testCaseInsensitiveMakeFieldOptional() {
+    Schema existingSchema =
+        new Schema(
+            Types.NestedField.optional(1, "id", Types.IntegerType.get()),
+            Types.NestedField.required(2, "name", Types.StringType.get()));
+    Schema targetSchema = new Schema(Types.NestedField.optional(1, "ID", Types.IntegerType.get()));
+
+    UpdateSchema updateApi = loadUpdateApi(existingSchema);
+    EvolveSchemaVisitor.visit(
+        TABLE, updateApi, existingSchema, targetSchema, CASE_INSENSITIVE, PRESERVE_COLUMNS);
+    Schema result = updateApi.apply();
+    assertThat(result.findField("name").isOptional()).isTrue();
+  }
+
+  @Test
+  public void testCaseInsensitiveNestedStructField() {
+    Schema existingSchema =
+        new Schema(
+            optional(1, "struct1", StructType.of(optional(2, "field1", 
Types.StringType.get()))));
+    Schema targetSchema =
+        new Schema(
+            optional(
+                1,
+                "STRUCT1",
+                StructType.of(
+                    optional(2, "FIELD1", Types.StringType.get()),
+                    optional(3, "FIELD2", Types.IntegerType.get()))));
+
+    UpdateSchema updateApi = loadUpdateApi(existingSchema);
+    EvolveSchemaVisitor.visit(
+        TABLE, updateApi, existingSchema, targetSchema, CASE_INSENSITIVE, PRESERVE_COLUMNS);
+    Schema result = updateApi.apply();
+    Types.StructType struct = result.findField("struct1").type().asStructType();
+    assertThat(struct.fields()).hasSize(2);
+    assertThat(struct.field("FIELD2")).isNotNull();
+  }
+
+  @Test
+  public void testCaseSensitiveDoesNotMatch() {
+    Schema existingSchema =
+        new Schema(Types.NestedField.optional(1, "id", Types.IntegerType.get()));
+    Schema targetSchema =
+        new Schema(
+            Types.NestedField.optional(1, "ID", Types.IntegerType.get()),
+            Types.NestedField.optional(2, "name", Types.StringType.get()));
+
+    UpdateSchema updateApi = loadUpdateApi(existingSchema);
+    EvolveSchemaVisitor.visit(
+        TABLE, updateApi, existingSchema, targetSchema, CASE_SENSITIVE, PRESERVE_COLUMNS);
+    Schema result = updateApi.apply();
+    assertThat(result.columns()).hasSize(3);
+    assertThat(result.findField("ID")).isNotNull();
+    assertThat(result.findField("id")).isNotNull();
+  }
+
   private static UpdateSchema loadUpdateApi(Schema schema) {
     try {
       Constructor<?> constructor =
diff --git 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java
 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java
index d696059902..8a17c707f8 100644
--- 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java
+++ 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java
@@ -36,6 +36,12 @@ import org.junit.jupiter.api.Test;
 
 public class TestTableMetadataCache extends TestFlinkIcebergSinkBase {
 
+  private static final boolean CASE_SENSITIVE = true;
+  private static final boolean CASE_INSENSITIVE = false;
+
+  private static final boolean DROP_COLUMNS = true;
+  private static final boolean PRESERVE_COLUMNS = false;
+
   static final Schema SCHEMA =
       new Schema(
           Types.NestedField.optional(1, "id", Types.IntegerType.get()),
@@ -47,29 +53,35 @@ public class TestTableMetadataCache extends 
TestFlinkIcebergSinkBase {
           Types.NestedField.optional(2, "data", Types.StringType.get()),
           Types.NestedField.optional(3, "extra", Types.StringType.get()));
 
+  static final Schema SCHEMA_UPPERCASE =
+      new Schema(
+          Types.NestedField.optional(1, "ID", Types.IntegerType.get()),
+          Types.NestedField.optional(2, "DATA", Types.StringType.get()));
+
+  static final Schema SCHEMA_MIXEDCASE =
+      new Schema(
+          Types.NestedField.optional(1, "Id", Types.IntegerType.get()),
+          Types.NestedField.optional(2, "Data", Types.StringType.get()));
+
   @Test
   void testCaching() {
     Catalog catalog = CATALOG_EXTENSION.catalog();
     TableIdentifier tableIdentifier = TableIdentifier.parse("default.myTable");
     catalog.createTable(tableIdentifier, SCHEMA);
-    TableMetadataCache cache = new TableMetadataCache(catalog, 10, 
Long.MAX_VALUE, 10);
+    TableMetadataCache cache =
+        new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10, 
CASE_SENSITIVE, PRESERVE_COLUMNS);
 
-    Schema schema1 = cache.schema(tableIdentifier, SCHEMA, 
false).resolvedTableSchema();
+    Schema schema1 = cache.schema(tableIdentifier, 
SCHEMA).resolvedTableSchema();
     assertThat(schema1.sameSchema(SCHEMA)).isTrue();
     assertThat(
-            cache
-                .schema(tableIdentifier, SerializationUtils.clone(SCHEMA), 
false)
-                .resolvedTableSchema())
+            cache.schema(tableIdentifier, 
SerializationUtils.clone(SCHEMA)).resolvedTableSchema())
         .isEqualTo(schema1);
 
-    assertThat(cache.schema(tableIdentifier, SCHEMA2, false))
-        .isEqualTo(TableMetadataCache.NOT_FOUND);
+    assertThat(cache.schema(tableIdentifier, 
SCHEMA2)).isEqualTo(TableMetadataCache.NOT_FOUND);
 
-    schema1 = cache.schema(tableIdentifier, SCHEMA, 
false).resolvedTableSchema();
+    schema1 = cache.schema(tableIdentifier, SCHEMA).resolvedTableSchema();
     assertThat(
-            cache
-                .schema(tableIdentifier, SerializationUtils.clone(SCHEMA), 
false)
-                .resolvedTableSchema())
+            cache.schema(tableIdentifier, 
SerializationUtils.clone(SCHEMA)).resolvedTableSchema())
         .isEqualTo(schema1);
   }
 
@@ -78,10 +90,11 @@ public class TestTableMetadataCache extends 
TestFlinkIcebergSinkBase {
     Catalog catalog = CATALOG_EXTENSION.catalog();
     TableIdentifier tableIdentifier = TableIdentifier.parse("default.myTable");
     catalog.createTable(tableIdentifier, SCHEMA);
-    TableMetadataCache cache = new TableMetadataCache(catalog, 10, 
Long.MAX_VALUE, 10);
-    TableUpdater tableUpdater = new TableUpdater(cache, catalog, false);
+    TableMetadataCache cache =
+        new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10, 
CASE_SENSITIVE, PRESERVE_COLUMNS);
+    TableUpdater tableUpdater = new TableUpdater(cache, catalog, 
CASE_SENSITIVE, PRESERVE_COLUMNS);
 
-    Schema schema1 = cache.schema(tableIdentifier, SCHEMA, 
false).resolvedTableSchema();
+    Schema schema1 = cache.schema(tableIdentifier, 
SCHEMA).resolvedTableSchema();
     assertThat(schema1.sameSchema(SCHEMA)).isTrue();
 
     catalog.dropTable(tableIdentifier);
@@ -93,7 +106,7 @@ public class TestTableMetadataCache extends 
TestFlinkIcebergSinkBase {
         PartitionSpec.unpartitioned(),
         TableCreator.DEFAULT);
 
-    Schema schema2 = cache.schema(tableIdentifier, SCHEMA2, 
false).resolvedTableSchema();
+    Schema schema2 = cache.schema(tableIdentifier, 
SCHEMA2).resolvedTableSchema();
     assertThat(schema2.sameSchema(SCHEMA2)).isTrue();
   }
 
@@ -102,7 +115,8 @@ public class TestTableMetadataCache extends 
TestFlinkIcebergSinkBase {
     Catalog catalog = CATALOG_EXTENSION.catalog();
     TableIdentifier tableIdentifier = TableIdentifier.parse("default.myTable");
     catalog.createTable(tableIdentifier, SCHEMA);
-    TableMetadataCache cache = new TableMetadataCache(catalog, 0, 
Long.MAX_VALUE, 10);
+    TableMetadataCache cache =
+        new TableMetadataCache(catalog, 0, Long.MAX_VALUE, 10, CASE_SENSITIVE, 
PRESERVE_COLUMNS);
 
     assertThat(cache.getInternalCache()).isEmpty();
   }
@@ -117,15 +131,21 @@ public class TestTableMetadataCache extends 
TestFlinkIcebergSinkBase {
     // Init cache
     TableMetadataCache cache =
         new TableMetadataCache(
-            catalog, 10, 100L, 10, Clock.fixed(Instant.now(), 
ZoneId.systemDefault()));
+            catalog,
+            10,
+            100L,
+            10,
+            CASE_INSENSITIVE,
+            PRESERVE_COLUMNS,
+            Clock.fixed(Instant.now(), ZoneId.systemDefault()));
     cache.update(tableIdentifier, table);
 
     // Cache schema
-    Schema schema = cache.schema(tableIdentifier, SCHEMA2, 
false).resolvedTableSchema();
+    Schema schema = cache.schema(tableIdentifier, 
SCHEMA2).resolvedTableSchema();
     assertThat(schema.sameSchema(SCHEMA2)).isTrue();
 
     // Cache schema with fewer fields
-    TableMetadataCache.ResolvedSchemaInfo schemaInfo = 
cache.schema(tableIdentifier, SCHEMA, false);
+    TableMetadataCache.ResolvedSchemaInfo schemaInfo = 
cache.schema(tableIdentifier, SCHEMA);
     assertThat(schemaInfo.resolvedTableSchema().sameSchema(SCHEMA2)).isTrue();
     assertThat(schemaInfo.compareResult())
         .isEqualTo(CompareSchemasVisitor.Result.DATA_CONVERSION_NEEDED);
@@ -140,9 +160,10 @@ public class TestTableMetadataCache extends 
TestFlinkIcebergSinkBase {
   void testNoSuchNamespaceExceptionHandling() {
     Catalog catalog = CATALOG_EXTENSION.catalog();
     TableIdentifier tableIdentifier = 
TableIdentifier.of("nonexistent_namespace", "myTable");
-    TableMetadataCache cache = new TableMetadataCache(catalog, 10, 
Long.MAX_VALUE, 10);
+    TableMetadataCache cache =
+        new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10, 
CASE_SENSITIVE, PRESERVE_COLUMNS);
 
-    TableMetadataCache.ResolvedSchemaInfo result = 
cache.schema(tableIdentifier, SCHEMA, false);
+    TableMetadataCache.ResolvedSchemaInfo result = 
cache.schema(tableIdentifier, SCHEMA);
 
     assertThat(result).isEqualTo(TableMetadataCache.NOT_FOUND);
     assertThat(cache.getInternalCache().get(tableIdentifier)).isNotNull();
@@ -152,11 +173,48 @@ public class TestTableMetadataCache extends 
TestFlinkIcebergSinkBase {
   void testNoSuchTableExceptionHandling() {
     Catalog catalog = CATALOG_EXTENSION.catalog();
     TableIdentifier tableIdentifier = 
TableIdentifier.parse("default.nonexistent_table");
-    TableMetadataCache cache = new TableMetadataCache(catalog, 10, 
Long.MAX_VALUE, 10);
+    TableMetadataCache cache =
+        new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10, 
CASE_SENSITIVE, PRESERVE_COLUMNS);
 
-    TableMetadataCache.ResolvedSchemaInfo result = 
cache.schema(tableIdentifier, SCHEMA, false);
+    TableMetadataCache.ResolvedSchemaInfo result = 
cache.schema(tableIdentifier, SCHEMA);
 
     assertThat(result).isEqualTo(TableMetadataCache.NOT_FOUND);
     assertThat(cache.getInternalCache().get(tableIdentifier)).isNotNull();
   }
+
+  @Test
+  void testCaseInsensitiveCaching() {
+    Catalog catalog = CATALOG_EXTENSION.catalog();
+    TableIdentifier tableIdentifier = TableIdentifier.parse("default.myTable");
+    catalog.createTable(tableIdentifier, SCHEMA);
+    TableMetadataCache cache =
+        new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10, CASE_INSENSITIVE, PRESERVE_COLUMNS);
+
+    Schema schema1 = cache.schema(tableIdentifier, SCHEMA).resolvedTableSchema();
+    assertThat(schema1.sameSchema(SCHEMA)).isTrue();
+
+    Schema schemaUpperCase = cache.schema(tableIdentifier, SCHEMA_UPPERCASE).resolvedTableSchema();
+    assertThat(schemaUpperCase).isEqualTo(schema1);
+
+    Schema schemaMixedCase = cache.schema(tableIdentifier, SCHEMA_MIXEDCASE).resolvedTableSchema();
+    assertThat(schemaMixedCase).isEqualTo(schema1);
+  }
+
+  @Test
+  void testCaseSensitiveCachingDoesNotMatch() {
+    Catalog catalog = CATALOG_EXTENSION.catalog();
+    TableIdentifier tableIdentifier = TableIdentifier.parse("default.myTable");
+    catalog.createTable(tableIdentifier, SCHEMA);
+    TableMetadataCache cache =
+        new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10, CASE_SENSITIVE, PRESERVE_COLUMNS);
+
+    Schema schema1 = cache.schema(tableIdentifier, SCHEMA).resolvedTableSchema();
+    assertThat(schema1.sameSchema(SCHEMA)).isTrue();
+
+    assertThat(cache.schema(tableIdentifier, SCHEMA_UPPERCASE))
+        .isEqualTo(TableMetadataCache.NOT_FOUND);
+
+    assertThat(cache.schema(tableIdentifier, SCHEMA_MIXEDCASE))
+        .isEqualTo(TableMetadataCache.NOT_FOUND);
+  }
 }
diff --git 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java
 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java
index c0b376d30e..bdc825b44f 100644
--- 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java
+++ 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java
@@ -36,9 +36,17 @@ import org.apache.iceberg.inmemory.InMemoryCatalog;
 import org.apache.iceberg.types.Types;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 public class TestTableUpdater extends TestFlinkIcebergSinkBase {
 
+  private static final boolean CASE_SENSITIVE = true;
+  private static final boolean CASE_INSENSITIVE = false;
+
+  private static final boolean DROP_COLUMNS = true;
+  private static final boolean PRESERVE_COLUMNS = false;
+
   static final Schema SCHEMA =
       new Schema(
           Types.NestedField.optional(1, "id", Types.IntegerType.get()),
@@ -57,8 +65,9 @@ public class TestTableUpdater extends 
TestFlinkIcebergSinkBase {
     catalog.initialize("catalog", Map.of());
     catalog.createNamespace(Namespace.of("myNamespace"));
     TableIdentifier tableIdentifier = 
TableIdentifier.parse("myNamespace.myTable");
-    TableMetadataCache cache = new TableMetadataCache(catalog, 10, 
Long.MAX_VALUE, 10);
-    TableUpdater tableUpdater = new TableUpdater(cache, catalog, false);
+    TableMetadataCache cache =
+        new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10, 
CASE_SENSITIVE, PRESERVE_COLUMNS);
+    TableUpdater tableUpdater = new TableUpdater(cache, catalog, 
CASE_SENSITIVE, PRESERVE_COLUMNS);
 
     String locationOverride = tempDir.toString() + "/custom-path";
     Map<String, String> tableProperties = Map.of("key", "value");
@@ -75,8 +84,7 @@ public class TestTableUpdater extends 
TestFlinkIcebergSinkBase {
     assertThat(catalog.tableExists(tableIdentifier)).isTrue();
     
assertThat(catalog.loadTable(tableIdentifier).properties().get("key")).isEqualTo("value");
     
assertThat(catalog.loadTable(tableIdentifier).location()).isEqualTo(locationOverride);
-    TableMetadataCache.ResolvedSchemaInfo cachedSchema =
-        cache.schema(tableIdentifier, SCHEMA, false);
+    TableMetadataCache.ResolvedSchemaInfo cachedSchema = 
cache.schema(tableIdentifier, SCHEMA);
     assertThat(cachedSchema.resolvedTableSchema().sameSchema(SCHEMA)).isTrue();
   }
 
@@ -84,8 +92,9 @@ public class TestTableUpdater extends 
TestFlinkIcebergSinkBase {
   void testTableAlreadyExists() {
     Catalog catalog = CATALOG_EXTENSION.catalog();
     TableIdentifier tableIdentifier = TableIdentifier.parse("myTable");
-    TableMetadataCache cache = new TableMetadataCache(catalog, 10, 
Long.MAX_VALUE, 10);
-    TableUpdater tableUpdater = new TableUpdater(cache, catalog, false);
+    TableMetadataCache cache =
+        new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10, 
CASE_SENSITIVE, PRESERVE_COLUMNS);
+    TableUpdater tableUpdater = new TableUpdater(cache, catalog, 
CASE_SENSITIVE, PRESERVE_COLUMNS);
 
     // Make the table non-existent in cache
     cache.exists(tableIdentifier);
@@ -108,8 +117,9 @@ public class TestTableUpdater extends 
TestFlinkIcebergSinkBase {
   void testBranchCreationAndCaching() {
     Catalog catalog = CATALOG_EXTENSION.catalog();
     TableIdentifier tableIdentifier = TableIdentifier.parse("myTable");
-    TableMetadataCache cache = new TableMetadataCache(catalog, 10, 
Long.MAX_VALUE, 10);
-    TableUpdater tableUpdater = new TableUpdater(cache, catalog, false);
+    TableMetadataCache cache =
+        new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10, 
CASE_SENSITIVE, PRESERVE_COLUMNS);
+    TableUpdater tableUpdater = new TableUpdater(cache, catalog, 
CASE_SENSITIVE, PRESERVE_COLUMNS);
 
     catalog.createTable(tableIdentifier, SCHEMA);
     tableUpdater.update(
@@ -126,8 +136,9 @@ public class TestTableUpdater extends 
TestFlinkIcebergSinkBase {
   void testSpecCreation() {
     Catalog catalog = CATALOG_EXTENSION.catalog();
     TableIdentifier tableIdentifier = TableIdentifier.parse("myTable");
-    TableMetadataCache cache = new TableMetadataCache(catalog, 10, 
Long.MAX_VALUE, 10);
-    TableUpdater tableUpdater = new TableUpdater(cache, catalog, false);
+    TableMetadataCache cache =
+        new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10, 
CASE_SENSITIVE, PRESERVE_COLUMNS);
+    TableUpdater tableUpdater = new TableUpdater(cache, catalog, 
CASE_SENSITIVE, PRESERVE_COLUMNS);
 
     PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("data", 
10).build();
     tableUpdater.update(
@@ -143,9 +154,10 @@ public class TestTableUpdater extends 
TestFlinkIcebergSinkBase {
     Catalog catalog = CATALOG_EXTENSION.catalog();
     TableIdentifier tableIdentifier = TableIdentifier.parse("default.myTable");
     catalog.createTable(tableIdentifier, SCHEMA);
-    TableMetadataCache cache = new TableMetadataCache(catalog, 10, 
Long.MAX_VALUE, 10);
-    cache.schema(tableIdentifier, SCHEMA, false);
-    TableUpdater tableUpdater = new TableUpdater(cache, catalog, false);
+    TableMetadataCache cache =
+        new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10, 
CASE_SENSITIVE, PRESERVE_COLUMNS);
+    cache.schema(tableIdentifier, SCHEMA);
+    TableUpdater tableUpdater = new TableUpdater(cache, catalog, 
CASE_SENSITIVE, PRESERVE_COLUMNS);
 
     Schema updated =
         tableUpdater
@@ -158,8 +170,7 @@ public class TestTableUpdater extends 
TestFlinkIcebergSinkBase {
             .f0
             .resolvedTableSchema();
     assertThat(updated.sameSchema(SCHEMA2)).isTrue();
-    assertThat(
-            cache.schema(tableIdentifier, SCHEMA2, 
false).resolvedTableSchema().sameSchema(SCHEMA2))
+    assertThat(cache.schema(tableIdentifier, 
SCHEMA2).resolvedTableSchema().sameSchema(SCHEMA2))
         .isTrue();
   }
 
@@ -168,8 +179,9 @@ public class TestTableUpdater extends 
TestFlinkIcebergSinkBase {
     Catalog catalog = CATALOG_EXTENSION.catalog();
     TableIdentifier tableIdentifier = TableIdentifier.parse("default.myTable");
     catalog.createTable(tableIdentifier, SCHEMA);
-    TableMetadataCache cache = new TableMetadataCache(catalog, 10, 
Long.MAX_VALUE, 10);
-    TableUpdater tableUpdater = new TableUpdater(cache, catalog, false);
+    TableMetadataCache cache =
+        new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10, 
CASE_SENSITIVE, PRESERVE_COLUMNS);
+    TableUpdater tableUpdater = new TableUpdater(cache, catalog, 
CASE_SENSITIVE, PRESERVE_COLUMNS);
 
     // Initialize cache
     tableUpdater.update(
@@ -184,7 +196,7 @@ public class TestTableUpdater extends 
TestFlinkIcebergSinkBase {
     catalog.createTable(tableIdentifier, SCHEMA2);
 
     // Cache still stores the old information
-    assertThat(cache.schema(tableIdentifier, SCHEMA2, false).compareResult())
+    assertThat(cache.schema(tableIdentifier, SCHEMA2).compareResult())
         .isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED);
 
     assertThat(
@@ -204,14 +216,59 @@ public class TestTableUpdater extends 
TestFlinkIcebergSinkBase {
         .doesNotContainKey(SCHEMA2);
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  void testCaseSensitivity(boolean caseSensitive) {
+    Catalog catalog = CATALOG_EXTENSION.catalog();
+    TableIdentifier tableIdentifier = TableIdentifier.parse("myTable");
+    TableMetadataCache cache =
+        new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10, caseSensitive, DROP_COLUMNS);
+
+    TableUpdater tableUpdater = new TableUpdater(cache, catalog, caseSensitive, DROP_COLUMNS);
+
+    Schema schema =
+        new Schema(
+            Types.NestedField.optional(1, "id", Types.IntegerType.get()),
+            Types.NestedField.optional(2, "data", Types.StringType.get()),
+            Types.NestedField.optional(3, "extra", Types.StringType.get()));
+
+    catalog.createTable(tableIdentifier, schema);
+
+    Schema schemaWithUpperCase =
+        new Schema(
+            Types.NestedField.optional(1, "Id", Types.IntegerType.get()),
+            Types.NestedField.optional(2, "Data", Types.StringType.get()),
+            Types.NestedField.optional(3, "Extra", Types.StringType.get()));
+
+    Tuple2<TableMetadataCache.ResolvedSchemaInfo, PartitionSpec> result =
+        tableUpdater.update(
+            tableIdentifier,
+            SnapshotRef.MAIN_BRANCH,
+            schemaWithUpperCase,
+            PartitionSpec.unpartitioned(),
+            TableCreator.DEFAULT);
+
+    assertThat(result.f0.compareResult()).isEqualTo(CompareSchemasVisitor.Result.SAME);
+
+    Schema tableSchema = catalog.loadTable(tableIdentifier).schema();
+    if (caseSensitive) {
+      assertThat(tableSchema.columns()).hasSize(3);
+      assertThat(tableSchema.findField("Id")).isNotNull();
+      assertThat(tableSchema.findField("Data")).isNotNull();
+      assertThat(tableSchema.findField("Extra")).isNotNull();
+    } else {
+      assertThat(tableSchema.sameSchema(schema)).isTrue();
+    }
+  }
+
   @Test
   void testDropUnusedColumns() {
     Catalog catalog = CATALOG_EXTENSION.catalog();
     TableIdentifier tableIdentifier = TableIdentifier.parse("myTable");
-    TableMetadataCache cache = new TableMetadataCache(catalog, 10, 
Long.MAX_VALUE, 10);
+    TableMetadataCache cache =
+        new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10, 
CASE_SENSITIVE, DROP_COLUMNS);
 
-    final boolean dropUnusedColumns = true;
-    TableUpdater tableUpdater = new TableUpdater(cache, catalog, 
dropUnusedColumns);
+    TableUpdater tableUpdater = new TableUpdater(cache, catalog, 
CASE_SENSITIVE, DROP_COLUMNS);
 
     catalog.createTable(tableIdentifier, SCHEMA2);
 
@@ -236,8 +293,9 @@ public class TestTableUpdater extends 
TestFlinkIcebergSinkBase {
     Catalog catalog = CATALOG_EXTENSION.catalog();
     SupportsNamespaces namespaceCatalog = (SupportsNamespaces) catalog;
     TableIdentifier tableIdentifier = TableIdentifier.of("new_namespace", 
"myTable");
-    TableMetadataCache cache = new TableMetadataCache(catalog, 10, 
Long.MAX_VALUE, 10);
-    TableUpdater tableUpdater = new TableUpdater(cache, catalog, false);
+    TableMetadataCache cache =
+        new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10, 
CASE_SENSITIVE, PRESERVE_COLUMNS);
+    TableUpdater tableUpdater = new TableUpdater(cache, catalog, 
CASE_SENSITIVE, PRESERVE_COLUMNS);
 
     
assertThat(namespaceCatalog.namespaceExists(Namespace.of("new_namespace"))).isFalse();
     assertThat(catalog.tableExists(tableIdentifier)).isFalse();
@@ -265,8 +323,9 @@ public class TestTableUpdater extends 
TestFlinkIcebergSinkBase {
     namespaceCatalog.createNamespace(namespace);
 
     TableIdentifier tableIdentifier = TableIdentifier.of("existing_namespace", 
"myTable");
-    TableMetadataCache cache = new TableMetadataCache(catalog, 10, 
Long.MAX_VALUE, 10);
-    TableUpdater tableUpdater = new TableUpdater(cache, catalog, false);
+    TableMetadataCache cache =
+        new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10, 
CASE_SENSITIVE, PRESERVE_COLUMNS);
+    TableUpdater tableUpdater = new TableUpdater(cache, catalog, 
CASE_SENSITIVE, PRESERVE_COLUMNS);
 
     assertThat(namespaceCatalog.namespaceExists(namespace)).isTrue();
     assertThat(catalog.tableExists(tableIdentifier)).isFalse();


Reply via email to