This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new b7be0cb4a1 [Fix][Connector-V2] Fixed iceberg sink can not handle
uppercase fields (#7660)
b7be0cb4a1 is described below
commit b7be0cb4a1ac45bb08c1a7c1bc69bba28ea87382
Author: Xiaojian Sun <[email protected]>
AuthorDate: Mon Sep 23 10:30:32 2024 +0800
[Fix][Connector-V2] Fixed iceberg sink can not handle uppercase fields
(#7660)
---
.../seatunnel/iceberg/sink/IcebergSink.java | 75 +---------------------
.../iceberg/sink/writer/IcebergWriterFactory.java | 8 ++-
.../e2e/connector/iceberg/IcebergSinkCDCIT.java | 4 +-
3 files changed, 10 insertions(+), 77 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSink.java
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSink.java
index a1d43d6acf..417d92ac49 100644
---
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSink.java
+++
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSink.java
@@ -30,10 +30,6 @@ import org.apache.seatunnel.api.sink.SupportMultiTableSink;
import org.apache.seatunnel.api.sink.SupportSaveMode;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.catalog.ConstraintKey;
-import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
-import org.apache.seatunnel.api.table.catalog.PrimaryKey;
-import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.factory.CatalogFactory;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.constants.PluginType;
@@ -48,7 +44,6 @@ import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
-import java.util.stream.Collectors;
import static
org.apache.seatunnel.api.table.factory.FactoryUtil.discoverFactory;
@@ -68,7 +63,7 @@ public class IcebergSink
public IcebergSink(ReadonlyConfig pluginConfig, CatalogTable catalogTable)
{
this.readonlyConfig = pluginConfig;
this.config = new SinkConfig(pluginConfig);
- this.catalogTable = convertLowerCaseCatalogTable(catalogTable);
+ this.catalogTable = catalogTable;
// Reset primary keys if need
if (config.getPrimaryKeys().isEmpty()
&&
Objects.nonNull(this.catalogTable.getTableSchema().getPrimaryKey())) {
@@ -138,72 +133,4 @@ public class IcebergSink
catalogTable,
null));
}
-
- private CatalogTable convertLowerCaseCatalogTable(CatalogTable
catalogTable) {
- TableSchema tableSchema = catalogTable.getTableSchema();
- TableSchema.Builder builder = TableSchema.builder();
- tableSchema
- .getColumns()
- .forEach(
- column -> {
- PhysicalColumn physicalColumn =
- PhysicalColumn.of(
- column.getName(),
- column.getDataType(),
- column.getColumnLength(),
- column.isNullable(),
- column.getDefaultValue(),
- column.getComment());
- builder.column(physicalColumn);
- });
- // set primary
- if (Objects.nonNull(tableSchema.getPrimaryKey())) {
- PrimaryKey newPrimaryKey =
- PrimaryKey.of(
- tableSchema.getPrimaryKey().getPrimaryKey(),
-
tableSchema.getPrimaryKey().getColumnNames().stream()
- .map(String::toLowerCase)
- .collect(Collectors.toList()));
- builder.primaryKey(newPrimaryKey);
- }
-
- if (Objects.nonNull(tableSchema.getConstraintKeys())) {
- tableSchema
- .getConstraintKeys()
- .forEach(
- constraintKey -> {
- ConstraintKey newConstraintKey =
- ConstraintKey.of(
-
constraintKey.getConstraintType(),
-
constraintKey.getConstraintName(),
- constraintKey.getColumnNames()
!= null
- ?
constraintKey.getColumnNames().stream()
- .map(
-
constraintKeyColumn ->
-
ConstraintKey
-
.ConstraintKeyColumn
-
.of(
-
constraintKeyColumn
-
.getColumnName()
-
!= null
-
? constraintKeyColumn
-
.getColumnName()
-
.toLowerCase()
-
: null,
-
constraintKeyColumn
-
.getSortType()))
-
.collect(Collectors.toList())
- : null);
- builder.constraintKey(newConstraintKey);
- });
- }
-
- return CatalogTable.of(
- catalogTable.getTableId(),
- builder.build(),
- catalogTable.getOptions(),
- catalogTable.getPartitionKeys(),
- catalogTable.getComment(),
- catalogTable.getCatalogName());
- }
}
diff --git
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/writer/IcebergWriterFactory.java
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/writer/IcebergWriterFactory.java
index 49e084e9f0..ea1495781d 100644
---
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/writer/IcebergWriterFactory.java
+++
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/writer/IcebergWriterFactory.java
@@ -110,7 +110,13 @@ public class IcebergWriterFactory {
if (!idCols.isEmpty()) {
identifierFieldIds =
idCols.stream()
- .map(colName ->
table.schema().findField(colName).fieldId())
+ .map(
+ colName ->
+ config.isCaseSensitive()
+ ? table.schema()
+
.caseInsensitiveFindField(colName)
+ .fieldId()
+ :
table.schema().findField(colName).fieldId())
.collect(toSet());
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSinkCDCIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSinkCDCIT.java
index 111c7e0dfb..a7cbba8b89 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSinkCDCIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSinkCDCIT.java
@@ -325,10 +325,10 @@ public class IcebergSinkCDCIT extends TestSuiteBase
implements TestResource {
results.add(record);
}
} catch (IOException e) {
- e.printStackTrace();
+ log.error(e.getMessage());
}
} catch (Exception ex) {
- ex.printStackTrace();
+ log.error(ex.getMessage());
}
return results;
}