This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 93e539cc9f [Future][Connector-V2]Support the automatic creation of
non-primary key table (#9219)
93e539cc9f is described below
commit 93e539cc9f8912ac052df3a889e876dd5d5677a5
Author: zhangdonghao <[email protected]>
AuthorDate: Thu Jun 26 21:11:40 2025 +0800
[Future][Connector-V2]Support the automatic creation of non-primary key
table (#9219)
---
docs/en/connector-v2/sink/Paimon.md | 32 ++++----
docs/zh/connector-v2/sink/Paimon.md | 31 +++----
.../seatunnel/paimon/catalog/PaimonCatalog.java | 5 ++
.../seatunnel/paimon/config/PaimonConfig.java | 4 +-
.../seatunnel/paimon/config/PaimonSinkConfig.java | 14 ++++
.../seatunnel/paimon/config/PaimonSinkOptions.java | 8 ++
.../seatunnel/paimon/utils/SchemaUtil.java | 4 +
.../paimon/catalog/PaimonCatalogPrimaryTest.java | 94 ++++++++++++++++++++++
.../test/resources/changelog_paimon_to_paimon.conf | 1 +
9 files changed, 160 insertions(+), 33 deletions(-)
diff --git a/docs/en/connector-v2/sink/Paimon.md
b/docs/en/connector-v2/sink/Paimon.md
index 40e87d903c..7694eec535 100644
--- a/docs/en/connector-v2/sink/Paimon.md
+++ b/docs/en/connector-v2/sink/Paimon.md
@@ -33,21 +33,23 @@ libfb303-xxx.jar
## Options
-| name | type | required | default value
| Description
|
-|-----------------------------|--------|----------|------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| warehouse | String | Yes | -
| Paimon warehouse path
|
-| catalog_type | String | No | filesystem
| Catalog type of Paimon, support filesystem and hive
|
-| catalog_uri | String | No | -
| Catalog uri of Paimon, only needed when catalog_type is hive
|
-| database | String | Yes | -
| The database you want to access
|
-| table | String | Yes | -
| The table you want to access
|
-| hdfs_site_path | String | No | -
| The path of hdfs-site.xml
|
-| schema_save_mode | Enum | No |
CREATE_SCHEMA_WHEN_NOT_EXIST | The schema save mode
|
-| data_save_mode | Enum | No | APPEND_DATA
| The data save mode
|
-| paimon.table.primary-keys | String | No | -
| Default comma-separated list of columns (primary key) that identify a row
in tables.(Notice: The partition field needs to be included in the primary key
fields) |
-| paimon.table.partition-keys | String | No | -
| Default comma-separated list of partition fields to use when creating
tables.
|
-| paimon.table.write-props | Map | No | -
| Properties passed through to paimon table initialization,
[reference](https://paimon.apache.org/docs/master/maintenance/configurations/#coreoptions).
|
-| paimon.hadoop.conf | Map | No | -
| Properties in hadoop conf
|
-| paimon.hadoop.conf-path | String | No | -
| The specified loading path for the 'core-site.xml', 'hdfs-site.xml',
'hive-site.xml' files
|
+| name | type | required | default value
| Description
|
+|------------------------------|---------|----------|------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| warehouse | String | Yes | -
| Paimon warehouse path
|
+| catalog_type | String | No | filesystem
| Catalog type of Paimon, support filesystem and hive
|
+| catalog_uri | String | No | -
| Catalog uri of Paimon, only needed when catalog_type is hive
|
+| database | String | Yes | -
| The database you want to access
|
+| table | String | Yes | -
| The table you want to access
|
+| hdfs_site_path | String | No | -
| The path of hdfs-site.xml
|
+| schema_save_mode | Enum | No |
CREATE_SCHEMA_WHEN_NOT_EXIST | The schema save mode
|
+| data_save_mode | Enum | No | APPEND_DATA
| The data save mode
|
+| paimon.table.primary-keys | String | No | -
| Default comma-separated list of columns (primary key) that identify a row
in tables.(Notice: The partition field needs to be included in the primary key
fields) |
+| paimon.table.partition-keys | String | No | -
| Default comma-separated list of partition fields to use when creating
tables.
|
+| paimon.table.write-props | Map | No | -
| Properties passed through to paimon table initialization,
[reference](https://paimon.apache.org/docs/master/maintenance/configurations/#coreoptions).
|
+| paimon.hadoop.conf | Map | No | -
| Properties in hadoop conf
|
+| paimon.hadoop.conf-path | String | No | -
| The specified loading path for the 'core-site.xml', 'hdfs-site.xml',
'hive-site.xml' files
|
+| paimon.table.non-primary-key | Boolean | No | false | Whether to create the table without a primary key: `true` creates a table without a primary key, `false` creates a table with one. Must not be combined with `paimon.table.primary-keys` |
+
## Checkpoint in batch mode
diff --git a/docs/zh/connector-v2/sink/Paimon.md
b/docs/zh/connector-v2/sink/Paimon.md
index bf6cd3c90a..2bbddfd148 100644
--- a/docs/zh/connector-v2/sink/Paimon.md
+++ b/docs/zh/connector-v2/sink/Paimon.md
@@ -32,21 +32,22 @@ libfb303-xxx.jar
## 连接器选项
-| 名称 | 类型 | 是否必须 | 默认值 |
描述
|
-|-----------------------------|------|------|------------------------------|-------------------------------------------------------------------------------------------------------|
-| warehouse | 字符串 | 是 | - |
Paimon warehouse路径
|
-| catalog_type | 字符串 | 否 | filesystem |
Paimon的catalog类型,目前支持filesystem和hive
|
-| catalog_uri | 字符串 | 否 | - |
Paimon catalog的uri,仅当catalog_type为hive时需要配置
|
-| database | 字符串 | 是 | - |
数据库名称
|
-| table | 字符串 | 是 | - |
表名
|
-| hdfs_site_path | 字符串 | 否 | - |
hdfs-site.xml文件路径
|
-| schema_save_mode | 枚举 | 否 | CREATE_SCHEMA_WHEN_NOT_EXIST |
Schema保存模式
|
-| data_save_mode | 枚举 | 否 | APPEND_DATA |
数据保存模式
|
-| paimon.table.primary-keys | 字符串 | 否 | - |
主键字段列表,联合主键使用逗号分隔(注意:分区字段需要包含在主键字段中)
|
-| paimon.table.partition-keys | 字符串 | 否 | - |
分区字段列表,多字段使用逗号分隔
|
-| paimon.table.write-props | Map | 否 | - |
Paimon表初始化指定的属性,
[参考](https://paimon.apache.org/docs/master/maintenance/configurations/#coreoptions)
|
-| paimon.hadoop.conf | Map | 否 | - |
Hadoop配置文件属性信息
|
-| paimon.hadoop.conf-path | 字符串 | 否 | - |
Hadoop配置文件目录,用于加载'core-site.xml', 'hdfs-site.xml', 'hive-site.xml'文件配置
|
+| 名称 | 类型 | 是否必须 | 默认值 |
描述
|
+|-----------------------------|------|------|------------------------------|------------------------------------------------------------------------------------------------------|
+| warehouse | 字符串 | 是 | - |
Paimon warehouse路径
|
+| catalog_type | 字符串 | 否 | filesystem |
Paimon的catalog类型,目前支持filesystem和hive
|
+| catalog_uri | 字符串 | 否 | - |
Paimon catalog的uri,仅当catalog_type为hive时需要配置
|
+| database | 字符串 | 是 | - |
数据库名称
|
+| table | 字符串 | 是 | - |
表名
|
+| hdfs_site_path | 字符串 | 否 | - |
hdfs-site.xml文件路径
|
+| schema_save_mode | 枚举 | 否 | CREATE_SCHEMA_WHEN_NOT_EXIST |
Schema保存模式
|
+| data_save_mode | 枚举 | 否 | APPEND_DATA |
数据保存模式
|
+| paimon.table.primary-keys | 字符串 | 否 | - |
主键字段列表,联合主键使用逗号分隔(注意:分区字段需要包含在主键字段中)
|
+| paimon.table.partition-keys | 字符串 | 否 | - |
分区字段列表,多字段使用逗号分隔
|
+| paimon.table.write-props | Map | 否 | - |
Paimon表初始化指定的属性,
[参考](https://paimon.apache.org/docs/master/maintenance/configurations/#coreoptions)
|
+| paimon.hadoop.conf | Map | 否 | - |
Hadoop配置文件属性信息
|
+| paimon.hadoop.conf-path | 字符串 | 否 | - |
Hadoop配置文件目录,用于加载'core-site.xml', 'hdfs-site.xml', 'hive-site.xml'文件配置
|
+| paimon.table.non-primary-key | 布尔 | 否 | false | 是否创建非主键表:为true时创建非主键表,为false时创建主键表。为true时不能同时配置`paimon.table.primary-keys` |
## 批模式下的checkpoint
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java
index 00d162a237..63523f7c2a 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
@@ -271,6 +272,10 @@ public class PaimonCatalog implements Catalog, PaimonTable
{
});
List<String> partitionKeys = schema.partitionKeys();
+ List<String> primaryKyes = schema.primaryKeys();
+ if (!primaryKyes.isEmpty()) {
+ builder.primaryKey(PrimaryKey.of("pk", primaryKyes));
+ }
return CatalogTable.of(
org.apache.seatunnel.api.table.catalog.TableIdentifier.of(
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonConfig.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonConfig.java
index 053d73cdd4..403d56d8c9 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonConfig.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonConfig.java
@@ -17,7 +17,6 @@
package org.apache.seatunnel.connectors.seatunnel.paimon.config;
-import
org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.seatunnel.shade.com.google.common.collect.ImmutableList;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
@@ -91,8 +90,7 @@ public class PaimonConfig implements Serializable {
return propValue;
}
- @VisibleForTesting
- public static List<String> stringToList(String value, String regex) {
+ protected static List<String> stringToList(String value, String regex) {
if (value == null || value.isEmpty()) {
return ImmutableList.of();
}
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java
index dd756fb2f5..857820b6a8 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java
@@ -20,6 +20,8 @@ package
org.apache.seatunnel.connectors.seatunnel.paimon.config;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.sink.SchemaSaveMode;
+import
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;
import org.apache.paimon.CoreOptions;
@@ -38,6 +40,7 @@ public class PaimonSinkConfig extends PaimonConfig {
private final DataSaveMode dataSaveMode;
private final CoreOptions.ChangelogProducer changelogProducer;
private final String changelogTmpPath;
+ private final Boolean nonPrimaryKey;
private final List<String> primaryKeys;
private final List<String> partitionKeys;
private final Map<String, String> writeProps;
@@ -46,7 +49,18 @@ public class PaimonSinkConfig extends PaimonConfig {
super(readonlyConfig);
this.schemaSaveMode =
readonlyConfig.get(PaimonSinkOptions.SCHEMA_SAVE_MODE);
this.dataSaveMode =
readonlyConfig.get(PaimonSinkOptions.DATA_SAVE_MODE);
+ this.nonPrimaryKey =
readonlyConfig.get(PaimonSinkOptions.NON_PRIMARY_KEY);
this.primaryKeys =
stringToList(readonlyConfig.get(PaimonSinkOptions.PRIMARY_KEYS), ",");
+ if (this.nonPrimaryKey && !this.primaryKeys.isEmpty()) {
+ String message =
+ String.format(
+ " `%s` will is empty when `%s`is true, but is %s",
+ PaimonSinkOptions.PRIMARY_KEYS.key(),
+ PaimonSinkOptions.NON_PRIMARY_KEY.key(),
+ this.primaryKeys);
+ throw new PaimonConnectorException(
+ PaimonConnectorErrorCode.NON_PRIMARY_KEY_CHECK_ERROR,
message);
+ }
this.partitionKeys =
stringToList(readonlyConfig.get(PaimonSinkOptions.PARTITION_KEYS), ",");
this.writeProps = readonlyConfig.get(PaimonSinkOptions.WRITE_PROPS);
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkOptions.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkOptions.java
index 44a2764b4f..b56215e2d1 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkOptions.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkOptions.java
@@ -40,6 +40,14 @@ public class PaimonSinkOptions extends PaimonBaseOptions {
.enumType(DataSaveMode.class)
.defaultValue(DataSaveMode.APPEND_DATA)
.withDescription("data_save_mode");
+
+ public static final Option<Boolean> NON_PRIMARY_KEY =
+ Options.key("paimon.table.non-primary-key")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Switch to create table with PK or table without
PK, true is table without PK, false is table with PK");
+
public static final Option<String> PRIMARY_KEYS =
Options.key("paimon.table.primary-keys")
.stringType()
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/SchemaUtil.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/SchemaUtil.java
index 1ae84be765..7638990c98 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/SchemaUtil.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/SchemaUtil.java
@@ -34,6 +34,7 @@ import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypeJsonParser;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -68,6 +69,9 @@ public class SchemaUtil {
if (primaryKeys.isEmpty() &&
Objects.nonNull(tableSchema.getPrimaryKey())) {
primaryKeys = tableSchema.getPrimaryKey().getColumnNames();
}
+ if (paimonSinkConfig.getNonPrimaryKey()) {
+ primaryKeys = Collections.emptyList();
+ }
if (!primaryKeys.isEmpty()) {
paiSchemaBuilder.primaryKey(primaryKeys);
}
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogPrimaryTest.java
b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogPrimaryTest.java
new file mode 100644
index 0000000000..c32e2e1f11
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogPrimaryTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.paimon.catalog;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.types.DataTypes;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/** Checks that {@link PaimonCatalog#getTable} exposes the primary key of a Paimon table. */
+@Slf4j
+public class PaimonCatalogPrimaryTest {
+
+    private static final String DATABASE_NAME = "default";
+    private static final String CATALOG_NAME = "paimon_catalog";
+    private static final String TABLE_NAME = "test_table";
+    private static final String WAREHOUSE_PATH = "/tmp/paimon";
+    private final Identifier identifier = Identifier.create(DATABASE_NAME, TABLE_NAME);
+    private Catalog catalog; // native Paimon catalog that creates and drops the fixture table
+    private PaimonCatalog paimonCatalog; // SeaTunnel catalog under test, same warehouse
+
+    @BeforeEach
+    public void before() throws Exception {
+        CatalogContext catalogContext = CatalogContext.create(new Path(WAREHOUSE_PATH));
+        catalog = CatalogFactory.createCatalog(catalogContext);
+        catalog.createDatabase(DATABASE_NAME, true);
+        // Fixture table with a composite primary key (id, name).
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        schemaBuilder.column("id", DataTypes.SMALLINT());
+        schemaBuilder.column("name", DataTypes.STRING());
+        schemaBuilder.column("age", DataTypes.TINYINT());
+        schemaBuilder.primaryKey("id", "name");
+        catalog.createTable(identifier, schemaBuilder.build(), true);
+        // Open the SeaTunnel catalog on the same warehouse so it can read the fixture table.
+        Map<String, Object> properties = new HashMap<>();
+        properties.put("warehouse", WAREHOUSE_PATH);
+        properties.put("plugin_name", "Paimon");
+        properties.put("database", DATABASE_NAME);
+        properties.put("table", TABLE_NAME);
+        ReadonlyConfig config = ReadonlyConfig.fromMap(properties);
+        paimonCatalog = new PaimonCatalog(CATALOG_NAME, config);
+        paimonCatalog.open();
+    }
+
+    @Test
+    public void primaryKey() {
+        CatalogTable catalogTable = paimonCatalog.getTable(TablePath.of(DATABASE_NAME, TABLE_NAME));
+        TableSchema tableSchema = catalogTable.getTableSchema();
+        Assertions.assertEquals(
+                Arrays.asList("id", "name"), tableSchema.getPrimaryKey().getColumnNames());
+    }
+
+    @AfterEach
+    public void after() throws Exception {
+        paimonCatalog.close(); // release the SeaTunnel catalog opened in before()
+        catalog.dropTable(identifier, true);
+        catalog.dropDatabase(DATABASE_NAME, true, true);
+        catalog.close();
+    }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_paimon_to_paimon.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_paimon_to_paimon.conf
index d23d11d9e0..7f0798039a 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_paimon_to_paimon.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_paimon_to_paimon.conf
@@ -41,6 +41,7 @@ sink {
warehouse = "/tmp/paimon"
database = "seatunnel_namespace"
table = "st_test_sink"
+ paimon.table.non-primary-key = true
paimon.table.write-props = {
write-only = true
}