This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 40d2c1b213 [Bug][Connector-Iceberg]fix create iceberg v2 table with pks (#6895)
40d2c1b213 is described below

commit 40d2c1b213480a67abda9a6c0d5ff82cea4d6389
Author: litiliu <[email protected]>
AuthorDate: Wed May 29 21:33:11 2024 +0800

    [Bug][Connector-Iceberg]fix create iceberg v2 table with pks (#6895)
---
 .../seatunnel/iceberg/data/IcebergTypeMapper.java  |  27 +++---
 .../seatunnel/iceberg/utils/SchemaUtils.java       |  48 +++++++++-
 .../seatunnel/iceberg/utils/SchemaUtilsTest.java   | 101 +++++++++++++++++++++
 3 files changed, 161 insertions(+), 15 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/data/IcebergTypeMapper.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/data/IcebergTypeMapper.java
index 0b6ed2ccc5..e1635919d6 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/data/IcebergTypeMapper.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/data/IcebergTypeMapper.java
@@ -34,9 +34,9 @@ import lombok.NonNull;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class IcebergTypeMapper {
-    private static int fieldId = 1;
 
     public static SeaTunnelDataType<?> mapping(String field, @NonNull Type 
icebergType) {
         switch (icebergType.typeId()) {
@@ -113,6 +113,10 @@ public class IcebergTypeMapper {
     }
 
     public static Type toIcebergType(SeaTunnelDataType dataType) {
+        return toIcebergType(dataType, new AtomicInteger(1));
+    }
+
+    private static Type toIcebergType(SeaTunnelDataType dataType, 
AtomicInteger nextId) {
         switch (dataType.getSqlType()) {
             case BOOLEAN:
                 return Types.BooleanType.get();
@@ -134,14 +138,15 @@ public class IcebergTypeMapper {
             case ARRAY:
                 ArrayType arrayType = (ArrayType) dataType;
                 // converter elementType
-                Type elementType = toIcebergType(arrayType.getElementType());
-                return Types.ListType.ofOptional(nextId(), elementType);
+                Type elementType = toIcebergType(arrayType.getElementType(), 
nextId);
+                return Types.ListType.ofOptional(nextId.getAndIncrement(), 
elementType);
             case MAP:
                 org.apache.seatunnel.api.table.type.MapType mapType =
                         (org.apache.seatunnel.api.table.type.MapType) dataType;
-                Type keyType = toIcebergType(mapType.getKeyType());
-                Type valueType = toIcebergType(mapType.getValueType());
-                return Types.MapType.ofOptional(nextId(), nextId(), keyType, 
valueType);
+                Type keyType = toIcebergType(mapType.getKeyType(), nextId);
+                Type valueType = toIcebergType(mapType.getValueType(), nextId);
+                return Types.MapType.ofOptional(
+                        nextId.getAndIncrement(), nextId.getAndIncrement(), 
keyType, valueType);
             case ROW:
                 SeaTunnelRowType seaTunnelRowType = (SeaTunnelRowType) 
dataType;
                 List<Types.NestedField> structFields = new ArrayList<>();
@@ -149,7 +154,11 @@ public class IcebergTypeMapper {
                     String field = seaTunnelRowType.getFieldName(i);
                     SeaTunnelDataType fieldType = 
seaTunnelRowType.getFieldType(i);
                     structFields.add(
-                            Types.NestedField.of(nextId(), true, field, 
toIcebergType(fieldType)));
+                            Types.NestedField.of(
+                                    nextId.getAndIncrement(),
+                                    true,
+                                    field,
+                                    toIcebergType(fieldType, nextId)));
                 }
                 return Types.StructType.of(structFields);
             case DATE:
@@ -163,8 +172,4 @@ public class IcebergTypeMapper {
                 return Types.StringType.get();
         }
     }
-
-    private static int nextId() {
-        return fieldId++;
-    }
 }
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SchemaUtils.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SchemaUtils.java
index fafac0eaae..9d56072c92 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SchemaUtils.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SchemaUtils.java
@@ -36,6 +36,7 @@ import org.apache.seatunnel.connectors.seatunnel.iceberg.sink.schema.SchemaChang
 import 
org.apache.seatunnel.connectors.seatunnel.iceberg.sink.schema.SchemaDeleteColumn;
 import 
org.apache.seatunnel.connectors.seatunnel.iceberg.sink.schema.SchemaModifyColumn;
 
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
@@ -50,11 +51,17 @@ import org.apache.iceberg.util.Tasks;
 
 import org.jetbrains.annotations.NotNull;
 
+import com.google.common.annotations.VisibleForTesting;
 import lombok.extern.slf4j.Slf4j;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -94,7 +101,7 @@ public class SchemaUtils {
             throws TableAlreadyExistException, DatabaseNotExistException, 
CatalogException {
         TableSchema tableSchema = table.getTableSchema();
         // Convert to iceberg schema
-        Schema schema = toIcebergSchema(tableSchema.toPhysicalRowDataType());
+        Schema schema = toIcebergSchema(tableSchema.toPhysicalRowDataType(), 
readonlyConfig);
         // Convert sink config
         SinkConfig config = new SinkConfig(readonlyConfig);
         // build auto create table
@@ -120,7 +127,7 @@ public class SchemaUtils {
             SinkConfig config,
             SeaTunnelRowType rowType) {
         // Generate struct type
-        Schema schema = toIcebergSchema(rowType);
+        Schema schema = toIcebergSchema(rowType, config.getReadonlyConfig());
         return createTable(catalog, tableIdentifier, config, schema, 
config.getAutoCreateProps());
     }
 
@@ -160,9 +167,42 @@ public class SchemaUtils {
         return result.get();
     }
 
-    @NotNull private static Schema toIcebergSchema(SeaTunnelRowType rowType) {
+    @VisibleForTesting
+    @NotNull protected static Schema toIcebergSchema(
+            SeaTunnelRowType rowType, ReadonlyConfig readonlyConfig) {
         Types.StructType structType = 
SchemaUtils.toIcebergType(rowType).asStructType();
-        return new Schema(structType.fields());
+        Set<Integer> identifierFieldIds = new HashSet<>();
+        if (Objects.nonNull(readonlyConfig)) {
+            List<String> pks =
+                    
SinkConfig.stringToList(readonlyConfig.get(SinkConfig.TABLE_PRIMARY_KEYS), ",");
+            if (CollectionUtils.isNotEmpty(pks)) {
+                for (String pk : pks) {
+                    Optional<Integer> pkId =
+                            structType.fields().stream()
+                                    .filter(nestedField -> 
nestedField.name().equals(pk))
+                                    .map(nestedField -> nestedField.fieldId())
+                                    .findFirst();
+                    if (!pkId.isPresent()) {
+                        throw new IllegalArgumentException(
+                                String.format(
+                                        "iceberg table pk:%s not present in 
the incoming struct",
+                                        pk));
+                    }
+                    identifierFieldIds.add(pkId.get());
+                }
+            }
+        }
+        List<Types.NestedField> fields = new ArrayList<>();
+        structType
+                .fields()
+                .forEach(
+                        field -> {
+                            fields.add(
+                                    
identifierFieldIds.contains(field.fieldId())
+                                            ? field.asRequired()
+                                            : field.asOptional());
+                        });
+        return new Schema(fields, identifierFieldIds);
     }
 
     public static TableIdentifier toIcebergTableIdentifierFromCatalogTable(
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/test/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SchemaUtilsTest.java b/seatunnel-connectors-v2/connector-iceberg/src/test/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SchemaUtilsTest.java
new file mode 100644
index 0000000000..170f83fecf
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-iceberg/src/test/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SchemaUtilsTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.iceberg.utils;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.iceberg.config.SinkConfig;
+
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.types.Types;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+
+class SchemaUtilsTest {
+
+    @Test
+    void testToIcebergSchemaWithPk() {
+        String[] fieldNames = new String[] {"id", "name", "description", 
"weight"};
+        SeaTunnelDataType<?>[] dataTypes =
+                new SeaTunnelDataType[] {
+                    BasicType.LONG_TYPE,
+                    BasicType.STRING_TYPE,
+                    BasicType.STRING_TYPE,
+                    BasicType.STRING_TYPE
+                };
+        SeaTunnelRowType rowType = new SeaTunnelRowType(fieldNames, dataTypes);
+        List<String> pks = Arrays.asList("id", "name");
+        ReadonlyConfig readonlyConfig =
+                ReadonlyConfig.fromMap(
+                        new HashMap<String, Object>() {
+                            {
+                                put(SinkConfig.TABLE_PRIMARY_KEYS.key(), 
String.join(",", pks));
+                            }
+                        });
+        Schema schema = SchemaUtils.toIcebergSchema(rowType, readonlyConfig);
+        Assertions.assertNotNull(schema);
+        Assertions.assertEquals(fieldNames.length, schema.columns().size());
+        for (Types.NestedField column : schema.columns()) {
+            Assertions.assertEquals(fieldNames[column.fieldId() - 1], 
column.name());
+            if (pks.contains(column.name())) {
+                Assertions.assertEquals(Boolean.TRUE, column.isRequired());
+            } else {
+                Assertions.assertEquals(Boolean.FALSE, column.isRequired());
+            }
+        }
+        Assertions.assertNotNull(schema.identifierFieldIds());
+        Assertions.assertEquals(pks.size(), 
schema.identifierFieldIds().size());
+        for (Integer identifierFieldId : schema.identifierFieldIds()) {
+            Assertions.assertEquals(
+                    pks.get(identifierFieldId - 1), 
fieldNames[identifierFieldId - 1]);
+        }
+    }
+
+    @Test
+    void testToIcebergSchemaWithoutPk() {
+        String[] fieldNames = new String[] {"id", "name", "description", 
"weight"};
+        SeaTunnelDataType<?>[] dataTypes =
+                new SeaTunnelDataType[] {
+                    BasicType.LONG_TYPE,
+                    BasicType.STRING_TYPE,
+                    BasicType.STRING_TYPE,
+                    BasicType.STRING_TYPE
+                };
+        SeaTunnelRowType rowType = new SeaTunnelRowType(fieldNames, dataTypes);
+        ReadonlyConfig readonlyConfig =
+                ReadonlyConfig.fromMap(
+                        new HashMap<String, Object>() {
+                            {
+                            }
+                        });
+        Schema schema = SchemaUtils.toIcebergSchema(rowType, readonlyConfig);
+        Assertions.assertNotNull(schema);
+        Assertions.assertEquals(fieldNames.length, schema.columns().size());
+        for (Types.NestedField column : schema.columns()) {
+            Assertions.assertEquals(fieldNames[column.fieldId() - 1], 
column.name());
+            Assertions.assertEquals(Boolean.FALSE, column.isRequired());
+        }
+    }
+}

Reply via email to