This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new b7be0cb4a1 [Fix][Connector-V2] Fixed iceberg sink can not handle uppercase fields (#7660)
b7be0cb4a1 is described below

commit b7be0cb4a1ac45bb08c1a7c1bc69bba28ea87382
Author: Xiaojian Sun <[email protected]>
AuthorDate: Mon Sep 23 10:30:32 2024 +0800

    [Fix][Connector-V2] Fixed iceberg sink can not handle uppercase fields (#7660)
---
 .../seatunnel/iceberg/sink/IcebergSink.java        | 75 +---------------------
 .../iceberg/sink/writer/IcebergWriterFactory.java  |  8 ++-
 .../e2e/connector/iceberg/IcebergSinkCDCIT.java    |  4 +-
 3 files changed, 10 insertions(+), 77 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSink.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSink.java
index a1d43d6acf..417d92ac49 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSink.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSink.java
@@ -30,10 +30,6 @@ import org.apache.seatunnel.api.sink.SupportMultiTableSink;
 import org.apache.seatunnel.api.sink.SupportSaveMode;
 import org.apache.seatunnel.api.table.catalog.Catalog;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.catalog.ConstraintKey;
-import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
-import org.apache.seatunnel.api.table.catalog.PrimaryKey;
-import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.factory.CatalogFactory;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.common.constants.PluginType;
@@ -48,7 +44,6 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
-import java.util.stream.Collectors;
 
 import static org.apache.seatunnel.api.table.factory.FactoryUtil.discoverFactory;
 
@@ -68,7 +63,7 @@ public class IcebergSink
     public IcebergSink(ReadonlyConfig pluginConfig, CatalogTable catalogTable) {
         this.readonlyConfig = pluginConfig;
         this.config = new SinkConfig(pluginConfig);
-        this.catalogTable = convertLowerCaseCatalogTable(catalogTable);
+        this.catalogTable = catalogTable;
         // Reset primary keys if need
         if (config.getPrimaryKeys().isEmpty()
                 && Objects.nonNull(this.catalogTable.getTableSchema().getPrimaryKey())) {
@@ -138,72 +133,4 @@ public class IcebergSink
                         catalogTable,
                         null));
     }
-
-    private CatalogTable convertLowerCaseCatalogTable(CatalogTable catalogTable) {
-        TableSchema tableSchema = catalogTable.getTableSchema();
-        TableSchema.Builder builder = TableSchema.builder();
-        tableSchema
-                .getColumns()
-                .forEach(
-                        column -> {
-                            PhysicalColumn physicalColumn =
-                                    PhysicalColumn.of(
-                                            column.getName(),
-                                            column.getDataType(),
-                                            column.getColumnLength(),
-                                            column.isNullable(),
-                                            column.getDefaultValue(),
-                                            column.getComment());
-                            builder.column(physicalColumn);
-                        });
-        // set primary
-        if (Objects.nonNull(tableSchema.getPrimaryKey())) {
-            PrimaryKey newPrimaryKey =
-                    PrimaryKey.of(
-                            tableSchema.getPrimaryKey().getPrimaryKey(),
-                            tableSchema.getPrimaryKey().getColumnNames().stream()
-                                    .map(String::toLowerCase)
-                                    .collect(Collectors.toList()));
-            builder.primaryKey(newPrimaryKey);
-        }
-
-        if (Objects.nonNull(tableSchema.getConstraintKeys())) {
-            tableSchema
-                    .getConstraintKeys()
-                    .forEach(
-                            constraintKey -> {
-                                ConstraintKey newConstraintKey =
-                                        ConstraintKey.of(
-                                                constraintKey.getConstraintType(),
-                                                constraintKey.getConstraintName(),
-                                                constraintKey.getColumnNames() != null
-                                                        ? constraintKey.getColumnNames().stream()
-                                                                .map(
-                                                                        constraintKeyColumn ->
-                                                                                ConstraintKey
-                                                                                        .ConstraintKeyColumn
-                                                                                        .of(
-                                                                                                constraintKeyColumn
-                                                                                                                        .getColumnName()
-                                                                                                                != null
-                                                                                                        ? constraintKeyColumn
-                                                                                                                .getColumnName()
-                                                                                                                .toLowerCase()
-                                                                                                        : null,
-                                                                                                constraintKeyColumn
-                                                                                                        .getSortType()))
-                                                                .collect(Collectors.toList())
-                                                        : null);
-                                builder.constraintKey(newConstraintKey);
-                            });
-        }
-
-        return CatalogTable.of(
-                catalogTable.getTableId(),
-                builder.build(),
-                catalogTable.getOptions(),
-                catalogTable.getPartitionKeys(),
-                catalogTable.getComment(),
-                catalogTable.getCatalogName());
-    }
 }
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/writer/IcebergWriterFactory.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/writer/IcebergWriterFactory.java
index 49e084e9f0..ea1495781d 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/writer/IcebergWriterFactory.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/writer/IcebergWriterFactory.java
@@ -110,7 +110,13 @@ public class IcebergWriterFactory {
         if (!idCols.isEmpty()) {
             identifierFieldIds =
                     idCols.stream()
-                            .map(colName -> table.schema().findField(colName).fieldId())
+                            .map(
+                                    colName ->
+                                            config.isCaseSensitive()
+                                                    ? table.schema()
+                                                            .caseInsensitiveFindField(colName)
+                                                            .fieldId()
+                                                    : table.schema().findField(colName).fieldId())
                             .collect(toSet());
         }
 
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSinkCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSinkCDCIT.java
index 111c7e0dfb..a7cbba8b89 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSinkCDCIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSinkCDCIT.java
@@ -325,10 +325,10 @@ public class IcebergSinkCDCIT extends TestSuiteBase implements TestResource {
                     results.add(record);
                 }
             } catch (IOException e) {
-                e.printStackTrace();
+                log.error(e.getMessage());
             }
         } catch (Exception ex) {
-            ex.printStackTrace();
+            log.error(ex.getMessage());
         }
         return results;
     }

Reply via email to