This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 40d2c1b213 [Bug][Connector-Iceberg] Fix creating an Iceberg v2 table with
primary keys (#6895)
40d2c1b213 is described below
commit 40d2c1b213480a67abda9a6c0d5ff82cea4d6389
Author: litiliu <[email protected]>
AuthorDate: Wed May 29 21:33:11 2024 +0800
[Bug][Connector-Iceberg] Fix creating an Iceberg v2 table with primary keys (#6895)
---
.../seatunnel/iceberg/data/IcebergTypeMapper.java | 27 +++---
.../seatunnel/iceberg/utils/SchemaUtils.java | 48 +++++++++-
.../seatunnel/iceberg/utils/SchemaUtilsTest.java | 101 +++++++++++++++++++++
3 files changed, 161 insertions(+), 15 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/data/IcebergTypeMapper.java
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/data/IcebergTypeMapper.java
index 0b6ed2ccc5..e1635919d6 100644
---
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/data/IcebergTypeMapper.java
+++
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/data/IcebergTypeMapper.java
@@ -34,9 +34,9 @@ import lombok.NonNull;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
public class IcebergTypeMapper {
- private static int fieldId = 1;
public static SeaTunnelDataType<?> mapping(String field, @NonNull Type
icebergType) {
switch (icebergType.typeId()) {
@@ -113,6 +113,10 @@ public class IcebergTypeMapper {
}
public static Type toIcebergType(SeaTunnelDataType dataType) {
+ return toIcebergType(dataType, new AtomicInteger(1));
+ }
+
+ private static Type toIcebergType(SeaTunnelDataType dataType,
AtomicInteger nextId) {
switch (dataType.getSqlType()) {
case BOOLEAN:
return Types.BooleanType.get();
@@ -134,14 +138,15 @@ public class IcebergTypeMapper {
case ARRAY:
ArrayType arrayType = (ArrayType) dataType;
// converter elementType
- Type elementType = toIcebergType(arrayType.getElementType());
- return Types.ListType.ofOptional(nextId(), elementType);
+ Type elementType = toIcebergType(arrayType.getElementType(),
nextId);
+ return Types.ListType.ofOptional(nextId.getAndIncrement(),
elementType);
case MAP:
org.apache.seatunnel.api.table.type.MapType mapType =
(org.apache.seatunnel.api.table.type.MapType) dataType;
- Type keyType = toIcebergType(mapType.getKeyType());
- Type valueType = toIcebergType(mapType.getValueType());
- return Types.MapType.ofOptional(nextId(), nextId(), keyType,
valueType);
+ Type keyType = toIcebergType(mapType.getKeyType(), nextId);
+ Type valueType = toIcebergType(mapType.getValueType(), nextId);
+ return Types.MapType.ofOptional(
+ nextId.getAndIncrement(), nextId.getAndIncrement(),
keyType, valueType);
case ROW:
SeaTunnelRowType seaTunnelRowType = (SeaTunnelRowType)
dataType;
List<Types.NestedField> structFields = new ArrayList<>();
@@ -149,7 +154,11 @@ public class IcebergTypeMapper {
String field = seaTunnelRowType.getFieldName(i);
SeaTunnelDataType fieldType =
seaTunnelRowType.getFieldType(i);
structFields.add(
- Types.NestedField.of(nextId(), true, field,
toIcebergType(fieldType)));
+ Types.NestedField.of(
+ nextId.getAndIncrement(),
+ true,
+ field,
+ toIcebergType(fieldType, nextId)));
}
return Types.StructType.of(structFields);
case DATE:
@@ -163,8 +172,4 @@ public class IcebergTypeMapper {
return Types.StringType.get();
}
}
-
- private static int nextId() {
- return fieldId++;
- }
}
diff --git
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SchemaUtils.java
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SchemaUtils.java
index fafac0eaae..9d56072c92 100644
---
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SchemaUtils.java
+++
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SchemaUtils.java
@@ -36,6 +36,7 @@ import
org.apache.seatunnel.connectors.seatunnel.iceberg.sink.schema.SchemaChang
import
org.apache.seatunnel.connectors.seatunnel.iceberg.sink.schema.SchemaDeleteColumn;
import
org.apache.seatunnel.connectors.seatunnel.iceberg.sink.schema.SchemaModifyColumn;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
@@ -50,11 +51,17 @@ import org.apache.iceberg.util.Tasks;
import org.jetbrains.annotations.NotNull;
+import com.google.common.annotations.VisibleForTesting;
import lombok.extern.slf4j.Slf4j;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -94,7 +101,7 @@ public class SchemaUtils {
throws TableAlreadyExistException, DatabaseNotExistException,
CatalogException {
TableSchema tableSchema = table.getTableSchema();
// Convert to iceberg schema
- Schema schema = toIcebergSchema(tableSchema.toPhysicalRowDataType());
+ Schema schema = toIcebergSchema(tableSchema.toPhysicalRowDataType(),
readonlyConfig);
// Convert sink config
SinkConfig config = new SinkConfig(readonlyConfig);
// build auto create table
@@ -120,7 +127,7 @@ public class SchemaUtils {
SinkConfig config,
SeaTunnelRowType rowType) {
// Generate struct type
- Schema schema = toIcebergSchema(rowType);
+ Schema schema = toIcebergSchema(rowType, config.getReadonlyConfig());
return createTable(catalog, tableIdentifier, config, schema,
config.getAutoCreateProps());
}
@@ -160,9 +167,42 @@ public class SchemaUtils {
return result.get();
}
- @NotNull private static Schema toIcebergSchema(SeaTunnelRowType rowType) {
+ @VisibleForTesting
+ @NotNull protected static Schema toIcebergSchema(
+ SeaTunnelRowType rowType, ReadonlyConfig readonlyConfig) {
Types.StructType structType =
SchemaUtils.toIcebergType(rowType).asStructType();
- return new Schema(structType.fields());
+ Set<Integer> identifierFieldIds = new HashSet<>();
+ if (Objects.nonNull(readonlyConfig)) {
+ List<String> pks =
+
SinkConfig.stringToList(readonlyConfig.get(SinkConfig.TABLE_PRIMARY_KEYS), ",");
+ if (CollectionUtils.isNotEmpty(pks)) {
+ for (String pk : pks) {
+ Optional<Integer> pkId =
+ structType.fields().stream()
+ .filter(nestedField ->
nestedField.name().equals(pk))
+ .map(nestedField -> nestedField.fieldId())
+ .findFirst();
+ if (!pkId.isPresent()) {
+ throw new IllegalArgumentException(
+ String.format(
+ "iceberg table pk:%s not present in
the incoming struct",
+ pk));
+ }
+ identifierFieldIds.add(pkId.get());
+ }
+ }
+ }
+ List<Types.NestedField> fields = new ArrayList<>();
+ structType
+ .fields()
+ .forEach(
+ field -> {
+ fields.add(
+
identifierFieldIds.contains(field.fieldId())
+ ? field.asRequired()
+ : field.asOptional());
+ });
+ return new Schema(fields, identifierFieldIds);
}
public static TableIdentifier toIcebergTableIdentifierFromCatalogTable(
diff --git
a/seatunnel-connectors-v2/connector-iceberg/src/test/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SchemaUtilsTest.java
b/seatunnel-connectors-v2/connector-iceberg/src/test/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SchemaUtilsTest.java
new file mode 100644
index 0000000000..170f83fecf
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-iceberg/src/test/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SchemaUtilsTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.iceberg.utils;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.iceberg.config.SinkConfig;
+
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.types.Types;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+
+class SchemaUtilsTest {
+
+ @Test
+ void testToIcebergSchemaWithPk() {
+ String[] fieldNames = new String[] {"id", "name", "description",
"weight"};
+ SeaTunnelDataType<?>[] dataTypes =
+ new SeaTunnelDataType[] {
+ BasicType.LONG_TYPE,
+ BasicType.STRING_TYPE,
+ BasicType.STRING_TYPE,
+ BasicType.STRING_TYPE
+ };
+ SeaTunnelRowType rowType = new SeaTunnelRowType(fieldNames, dataTypes);
+ List<String> pks = Arrays.asList("id", "name");
+ ReadonlyConfig readonlyConfig =
+ ReadonlyConfig.fromMap(
+ new HashMap<String, Object>() {
+ {
+ put(SinkConfig.TABLE_PRIMARY_KEYS.key(),
String.join(",", pks));
+ }
+ });
+ Schema schema = SchemaUtils.toIcebergSchema(rowType, readonlyConfig);
+ Assertions.assertNotNull(schema);
+ Assertions.assertEquals(fieldNames.length, schema.columns().size());
+ for (Types.NestedField column : schema.columns()) {
+ Assertions.assertEquals(fieldNames[column.fieldId() - 1],
column.name());
+ if (pks.contains(column.name())) {
+ Assertions.assertEquals(Boolean.TRUE, column.isRequired());
+ } else {
+ Assertions.assertEquals(Boolean.FALSE, column.isRequired());
+ }
+ }
+ Assertions.assertNotNull(schema.identifierFieldIds());
+ Assertions.assertEquals(pks.size(),
schema.identifierFieldIds().size());
+ for (Integer identifierFieldId : schema.identifierFieldIds()) {
+ Assertions.assertEquals(
+ pks.get(identifierFieldId - 1),
fieldNames[identifierFieldId - 1]);
+ }
+ }
+
+ @Test
+ void testToIcebergSchemaWithoutPk() {
+ String[] fieldNames = new String[] {"id", "name", "description",
"weight"};
+ SeaTunnelDataType<?>[] dataTypes =
+ new SeaTunnelDataType[] {
+ BasicType.LONG_TYPE,
+ BasicType.STRING_TYPE,
+ BasicType.STRING_TYPE,
+ BasicType.STRING_TYPE
+ };
+ SeaTunnelRowType rowType = new SeaTunnelRowType(fieldNames, dataTypes);
+ ReadonlyConfig readonlyConfig =
+ ReadonlyConfig.fromMap(
+ new HashMap<String, Object>() {
+ {
+ }
+ });
+ Schema schema = SchemaUtils.toIcebergSchema(rowType, readonlyConfig);
+ Assertions.assertNotNull(schema);
+ Assertions.assertEquals(fieldNames.length, schema.columns().size());
+ for (Types.NestedField column : schema.columns()) {
+ Assertions.assertEquals(fieldNames[column.fieldId() - 1],
column.name());
+ Assertions.assertEquals(Boolean.FALSE, column.isRequired());
+ }
+ }
+}