This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 93e539cc9f [Future][Connector-V2]Support the automatic creation of 
non-primary key table (#9219)
93e539cc9f is described below

commit 93e539cc9f8912ac052df3a889e876dd5d5677a5
Author: zhangdonghao <[email protected]>
AuthorDate: Thu Jun 26 21:11:40 2025 +0800

    [Future][Connector-V2]Support the automatic creation of non-primary key 
table (#9219)
---
 docs/en/connector-v2/sink/Paimon.md                | 32 ++++----
 docs/zh/connector-v2/sink/Paimon.md                | 31 +++----
 .../seatunnel/paimon/catalog/PaimonCatalog.java    |  5 ++
 .../seatunnel/paimon/config/PaimonConfig.java      |  4 +-
 .../seatunnel/paimon/config/PaimonSinkConfig.java  | 14 ++++
 .../seatunnel/paimon/config/PaimonSinkOptions.java |  8 ++
 .../seatunnel/paimon/utils/SchemaUtil.java         |  4 +
 .../paimon/catalog/PaimonCatalogPrimaryTest.java   | 94 ++++++++++++++++++++++
 .../test/resources/changelog_paimon_to_paimon.conf |  1 +
 9 files changed, 160 insertions(+), 33 deletions(-)

diff --git a/docs/en/connector-v2/sink/Paimon.md 
b/docs/en/connector-v2/sink/Paimon.md
index 40e87d903c..7694eec535 100644
--- a/docs/en/connector-v2/sink/Paimon.md
+++ b/docs/en/connector-v2/sink/Paimon.md
@@ -33,21 +33,23 @@ libfb303-xxx.jar
 
 ## Options
 
-|            name             | type   | required | default value              
  | Description                                                                 
                                                                                
     |
-|-----------------------------|--------|----------|------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| warehouse                   | String | Yes      | -                          
  | Paimon warehouse path                                                       
                                                                                
     |
-| catalog_type                | String | No       | filesystem                 
  | Catalog type of Paimon, support filesystem and hive                         
                                                                                
     |
-| catalog_uri                 | String | No       | -                          
  | Catalog uri of Paimon, only needed when catalog_type is hive                
                                                                                
     |
-| database                    | String | Yes      | -                          
  | The database you want to access                                             
                                                                                
     |
-| table                       | String | Yes      | -                          
  | The table you want to access                                                
                                                                                
     |
-| hdfs_site_path              | String | No       | -                          
  | The path of hdfs-site.xml                                                   
                                                                                
     |
-| schema_save_mode            | Enum   | No       | 
CREATE_SCHEMA_WHEN_NOT_EXIST | The schema save mode                             
                                                                                
                                |
-| data_save_mode              | Enum   | No       | APPEND_DATA                
  | The data save mode                                                          
                                                                                
     |
-| paimon.table.primary-keys   | String | No       | -                          
  | Default comma-separated list of columns (primary key) that identify a row 
in tables.(Notice: The partition field needs to be included in the primary key 
fields) |
-| paimon.table.partition-keys | String | No       | -                          
  | Default comma-separated list of partition fields to use when creating 
tables.                                                                         
           |
-| paimon.table.write-props    | Map    | No       | -                          
  | Properties passed through to paimon table initialization, 
[reference](https://paimon.apache.org/docs/master/maintenance/configurations/#coreoptions).
            |
-| paimon.hadoop.conf          | Map    | No       | -                          
  | Properties in hadoop conf                                                   
                                                                                
     |
-| paimon.hadoop.conf-path     | String | No       | -                          
  | The specified loading path for the 'core-site.xml', 'hdfs-site.xml', 
'hive-site.xml' files                                                           
            |
+| name                         | type    | required | default value            
    | Description                                                               
                                                                                
       |
+|------------------------------|---------|----------|------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| warehouse                    | String  | Yes      | -                        
    | Paimon warehouse path                                                     
                                                                                
       |
+| catalog_type                 | String  | No       | filesystem               
    | Catalog type of Paimon, support filesystem and hive                       
                                                                                
       |
+| catalog_uri                  | String  | No       | -                        
    | Catalog uri of Paimon, only needed when catalog_type is hive              
                                                                                
       |
+| database                     | String  | Yes      | -                        
    | The database you want to access                                           
                                                                                
       |
+| table                        | String  | Yes      | -                        
    | The table you want to access                                              
                                                                                
       |
+| hdfs_site_path               | String  | No       | -                        
    | The path of hdfs-site.xml                                                 
                                                                                
       |
+| schema_save_mode             | Enum    | No       | 
CREATE_SCHEMA_WHEN_NOT_EXIST | The schema save mode                             
                                                                                
                                |
+| data_save_mode               | Enum    | No       | APPEND_DATA              
    | The data save mode                                                        
                                                                                
       |
+| paimon.table.primary-keys    | String  | No       | -                        
    | Default comma-separated list of columns (primary key) that identify a row 
in tables.(Notice: The partition field needs to be included in the primary key 
fields) |
+| paimon.table.partition-keys  | String  | No       | -                        
    | Default comma-separated list of partition fields to use when creating 
tables.                                                                         
           |
+| paimon.table.write-props     | Map     | No       | -                        
    | Properties passed through to paimon table initialization, 
[reference](https://paimon.apache.org/docs/master/maintenance/configurations/#coreoptions).
            |
+| paimon.hadoop.conf           | Map     | No       | -                        
    | Properties in hadoop conf                                                 
                                                                                
       |
+| paimon.hadoop.conf-path      | String  | No       | -                        
    | The specified loading path for the 'core-site.xml', 'hdfs-site.xml', 
'hive-site.xml' files                                                           
            |
+| paimon.table.non-primary-key | Boolean | false    | -                        
    | Switch to create `table with PK` or `table without PK`. true : `table 
without PK`, false : `table with PK`                                            
           |
+
 
 ## Checkpoint in batch mode
 
diff --git a/docs/zh/connector-v2/sink/Paimon.md 
b/docs/zh/connector-v2/sink/Paimon.md
index bf6cd3c90a..2bbddfd148 100644
--- a/docs/zh/connector-v2/sink/Paimon.md
+++ b/docs/zh/connector-v2/sink/Paimon.md
@@ -32,21 +32,22 @@ libfb303-xxx.jar
 
 ## 连接器选项
 
-| 名称                          | 类型   | 是否必须 | 默认值                          | 
描述                                                                              
                      |
-|-----------------------------|------|------|------------------------------|-------------------------------------------------------------------------------------------------------|
-| warehouse                   | 字符串  | 是    | -                            | 
Paimon warehouse路径                                                              
                      |
-| catalog_type                | 字符串  | 否    | filesystem                   | 
Paimon的catalog类型,目前支持filesystem和hive                                            
                      |
-| catalog_uri                 | 字符串  | 否    | -                            | 
Paimon catalog的uri,仅当catalog_type为hive时需要配置                                     
                      |
-| database                    | 字符串  | 是    | -                            | 
数据库名称                                                                           
                      |
-| table                       | 字符串  | 是    | -                            | 
表名                                                                              
                      |
-| hdfs_site_path              | 字符串  | 否    | -                            | 
hdfs-site.xml文件路径                                                               
                      |
-| schema_save_mode            | 枚举   | 否    | CREATE_SCHEMA_WHEN_NOT_EXIST | 
Schema保存模式                                                                      
                      |
-| data_save_mode              | 枚举   | 否    | APPEND_DATA                  | 
数据保存模式                                                                          
                      |
-| paimon.table.primary-keys   | 字符串  | 否    | -                            | 
主键字段列表,联合主键使用逗号分隔(注意:分区字段需要包含在主键字段中)                                            
                      |
-| paimon.table.partition-keys | 字符串  | 否    | -                            | 
分区字段列表,多字段使用逗号分隔                                                                
                      |
-| paimon.table.write-props    | Map  | 否    | -                            | 
Paimon表初始化指定的属性, 
[参考](https://paimon.apache.org/docs/master/maintenance/configurations/#coreoptions)
  |
-| paimon.hadoop.conf          | Map  | 否    | -                            | 
Hadoop配置文件属性信息                                                                  
                      |
-| paimon.hadoop.conf-path     | 字符串  | 否    | -                            | 
Hadoop配置文件目录,用于加载'core-site.xml', 'hdfs-site.xml', 'hive-site.xml'文件配置          
                      |
+| 名称                          | 类型   | 是否必须 | 默认值                          | 
描述                                                                              
                     |
+|-----------------------------|------|------|------------------------------|------------------------------------------------------------------------------------------------------|
+| warehouse                   | 字符串  | 是    | -                            | 
Paimon warehouse路径                                                              
                     |
+| catalog_type                | 字符串  | 否    | filesystem                   | 
Paimon的catalog类型,目前支持filesystem和hive                                            
                     |
+| catalog_uri                 | 字符串  | 否    | -                            | 
Paimon catalog的uri,仅当catalog_type为hive时需要配置                                     
                     |
+| database                    | 字符串  | 是    | -                            | 
数据库名称                                                                           
                     |
+| table                       | 字符串  | 是    | -                            | 
表名                                                                              
                     |
+| hdfs_site_path              | 字符串  | 否    | -                            | 
hdfs-site.xml文件路径                                                               
                     |
+| schema_save_mode            | 枚举   | 否    | CREATE_SCHEMA_WHEN_NOT_EXIST | 
Schema保存模式                                                                      
                     |
+| data_save_mode              | 枚举   | 否    | APPEND_DATA                  | 
数据保存模式                                                                          
                     |
+| paimon.table.primary-keys   | 字符串  | 否    | -                            | 
主键字段列表,联合主键使用逗号分隔(注意:分区字段需要包含在主键字段中)                                            
                     |
+| paimon.table.partition-keys | 字符串  | 否    | -                            | 
分区字段列表,多字段使用逗号分隔                                                                
                     |
+| paimon.table.write-props    | Map  | 否    | -                            | 
Paimon表初始化指定的属性, 
[参考](https://paimon.apache.org/docs/master/maintenance/configurations/#coreoptions)
 |
+| paimon.hadoop.conf          | Map  | 否    | -                            | 
Hadoop配置文件属性信息                                                                  
                     |
+| paimon.hadoop.conf-path     | 字符串  | 否    | -                            | 
Hadoop配置文件目录,用于加载'core-site.xml', 'hdfs-site.xml', 'hive-site.xml'文件配置          
                     |
+| paimon.table.non-primary-key | Boolean | false    | -                        
    | 控制创建主键表或者非主键表. 当为true时,创建非主键表, 为false时,创建主键表                              
                                     |
 
 ## 批模式下的checkpoint
 
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java
index 00d162a237..63523f7c2a 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.table.catalog.Catalog;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
@@ -271,6 +272,10 @@ public class PaimonCatalog implements Catalog, PaimonTable 
{
                 });
 
         List<String> partitionKeys = schema.partitionKeys();
+        List<String> primaryKyes = schema.primaryKeys();
+        if (!primaryKyes.isEmpty()) {
+            builder.primaryKey(PrimaryKey.of("pk", primaryKyes));
+        }
 
         return CatalogTable.of(
                 org.apache.seatunnel.api.table.catalog.TableIdentifier.of(
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonConfig.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonConfig.java
index 053d73cdd4..403d56d8c9 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonConfig.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonConfig.java
@@ -17,7 +17,6 @@
 
 package org.apache.seatunnel.connectors.seatunnel.paimon.config;
 
-import 
org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
 import org.apache.seatunnel.shade.com.google.common.collect.ImmutableList;
 
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
@@ -91,8 +90,7 @@ public class PaimonConfig implements Serializable {
         return propValue;
     }
 
-    @VisibleForTesting
-    public static List<String> stringToList(String value, String regex) {
+    protected static List<String> stringToList(String value, String regex) {
         if (value == null || value.isEmpty()) {
             return ImmutableList.of();
         }
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java
index dd756fb2f5..857820b6a8 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java
@@ -20,6 +20,8 @@ package 
org.apache.seatunnel.connectors.seatunnel.paimon.config;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.sink.DataSaveMode;
 import org.apache.seatunnel.api.sink.SchemaSaveMode;
+import 
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;
 
 import org.apache.paimon.CoreOptions;
 
@@ -38,6 +40,7 @@ public class PaimonSinkConfig extends PaimonConfig {
     private final DataSaveMode dataSaveMode;
     private final CoreOptions.ChangelogProducer changelogProducer;
     private final String changelogTmpPath;
+    private final Boolean nonPrimaryKey;
     private final List<String> primaryKeys;
     private final List<String> partitionKeys;
     private final Map<String, String> writeProps;
@@ -46,7 +49,18 @@ public class PaimonSinkConfig extends PaimonConfig {
         super(readonlyConfig);
         this.schemaSaveMode = 
readonlyConfig.get(PaimonSinkOptions.SCHEMA_SAVE_MODE);
         this.dataSaveMode = 
readonlyConfig.get(PaimonSinkOptions.DATA_SAVE_MODE);
+        this.nonPrimaryKey = 
readonlyConfig.get(PaimonSinkOptions.NON_PRIMARY_KEY);
         this.primaryKeys = 
stringToList(readonlyConfig.get(PaimonSinkOptions.PRIMARY_KEYS), ",");
+        if (this.nonPrimaryKey && !this.primaryKeys.isEmpty()) {
+            String message =
+                    String.format(
+                            " `%s` will is empty when `%s`is true, but is %s",
+                            PaimonSinkOptions.PRIMARY_KEYS.key(),
+                            PaimonSinkOptions.NON_PRIMARY_KEY.key(),
+                            this.primaryKeys);
+            throw new PaimonConnectorException(
+                    PaimonConnectorErrorCode.NON_PRIMARY_KEY_CHECK_ERROR, 
message);
+        }
         this.partitionKeys =
                 
stringToList(readonlyConfig.get(PaimonSinkOptions.PARTITION_KEYS), ",");
         this.writeProps = readonlyConfig.get(PaimonSinkOptions.WRITE_PROPS);
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkOptions.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkOptions.java
index 44a2764b4f..b56215e2d1 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkOptions.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkOptions.java
@@ -40,6 +40,14 @@ public class PaimonSinkOptions extends PaimonBaseOptions {
                     .enumType(DataSaveMode.class)
                     .defaultValue(DataSaveMode.APPEND_DATA)
                     .withDescription("data_save_mode");
+
+    public static final Option<Boolean> NON_PRIMARY_KEY =
+            Options.key("paimon.table.non-primary-key")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Switch to create table with PK or table without 
PK, true is table without PK, false is table with PK");
+
     public static final Option<String> PRIMARY_KEYS =
             Options.key("paimon.table.primary-keys")
                     .stringType()
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/SchemaUtil.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/SchemaUtil.java
index 1ae84be765..7638990c98 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/SchemaUtil.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/SchemaUtil.java
@@ -34,6 +34,7 @@ import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypeJsonParser;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -68,6 +69,9 @@ public class SchemaUtil {
         if (primaryKeys.isEmpty() && 
Objects.nonNull(tableSchema.getPrimaryKey())) {
             primaryKeys = tableSchema.getPrimaryKey().getColumnNames();
         }
+        if (paimonSinkConfig.getNonPrimaryKey()) {
+            primaryKeys = Collections.emptyList();
+        }
         if (!primaryKeys.isEmpty()) {
             paiSchemaBuilder.primaryKey(primaryKeys);
         }
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogPrimaryTest.java
 
b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogPrimaryTest.java
new file mode 100644
index 0000000000..c32e2e1f11
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogPrimaryTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.paimon.catalog;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.types.DataTypes;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+@Slf4j
+public class PaimonCatalogPrimaryTest {
+
+    private PaimonCatalog paimonCatalog;
+    private Catalog catalog;
+    private final String DATABASE_NAME = "default";
+    private final String CATALOG_NAME = "paimon_catalog";
+    private final String TABLE_NAME = "test_table";
+    private final String WAREHOUSE_PATH = "/tmp/paimon";
+    private final Identifier identifier = Identifier.create(DATABASE_NAME, 
TABLE_NAME);
+
+    @BeforeEach
+    public void before()
+            throws Catalog.DatabaseAlreadyExistException, 
Catalog.TableAlreadyExistException,
+                    Catalog.DatabaseNotExistException {
+        CatalogContext catalogContext = CatalogContext.create(new 
Path(WAREHOUSE_PATH));
+        catalog = CatalogFactory.createCatalog(catalogContext);
+        catalog.createDatabase(DATABASE_NAME, true);
+
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        schemaBuilder.column("id", DataTypes.SMALLINT());
+        schemaBuilder.column("name", DataTypes.STRING());
+        schemaBuilder.column("age", DataTypes.TINYINT());
+        schemaBuilder.primaryKey("id", "name");
+        catalog.createTable(identifier, schemaBuilder.build(), true);
+
+        Map<String, Object> properties = new HashMap<>();
+        properties.put("warehouse", "/tmp/paimon");
+        properties.put("plugin_name", "Paimon");
+        properties.put("database", DATABASE_NAME);
+        properties.put("table", TABLE_NAME);
+        ReadonlyConfig config = ReadonlyConfig.fromMap(properties);
+        paimonCatalog = new PaimonCatalog(CATALOG_NAME, config);
+        paimonCatalog.open();
+    }
+
+    @Test
+    public void primaryKey() {
+        CatalogTable catalogTable = 
paimonCatalog.getTable(TablePath.of(DATABASE_NAME, TABLE_NAME));
+        TableSchema tableSchema = catalogTable.getTableSchema();
+        Assertions.assertEquals(
+                tableSchema.getPrimaryKey().getColumnNames(), 
Arrays.asList("id", "name"));
+    }
+
+    @AfterEach
+    public void after() throws Exception {
+        catalog.dropTable(identifier, true);
+        catalog.dropDatabase(DATABASE_NAME, true, true);
+        catalog.close();
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_paimon_to_paimon.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_paimon_to_paimon.conf
index d23d11d9e0..7f0798039a 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_paimon_to_paimon.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_paimon_to_paimon.conf
@@ -41,6 +41,7 @@ sink {
     warehouse = "/tmp/paimon"
     database = "seatunnel_namespace"
     table = "st_test_sink"
+    paimon.table.non-primary-key = true
     paimon.table.write-props = {
       write-only = true
     }

Reply via email to