This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 96b26a68dc [Feature][Connectors-v2]Paimon version upgrade to 1.1.1 (#8074)
96b26a68dc is described below

commit 96b26a68dc34986ccfc8cc72924c30337bbb1a2a
Author: zhangdonghao <[email protected]>
AuthorDate: Fri Aug 1 15:47:31 2025 +0800

    [Feature][Connectors-v2]Paimon version upgrade to 1.1.1 (#8074)
---
 docs/en/connector-v2/sink/Paimon.md                | 38 ++++++++++--
 docs/en/connector-v2/source/Paimon.md              | 27 ++++++++
 docs/zh/connector-v2/sink/Paimon.md                | 27 ++++++++
 docs/zh/connector-v2/source/Paimon.md              | 27 ++++++++
 seatunnel-connectors-v2/connector-paimon/pom.xml   |  2 +-
 .../seatunnel/paimon/catalog/PaimonCatalog.java    | 25 ++++++--
 .../seatunnel/paimon/sink/PaimonSink.java          |  1 +
 .../seatunnel/paimon/sink/PaimonSinkWriter.java    |  6 +-
 .../sink/bucket/FixedBucketRowKeyExtractor.java    | 72 ----------------------
 .../paimon/sink/bucket/PaimonBucketAssigner.java   |  5 +-
 .../paimon/source/PaimonSourceReader.java          |  6 +-
 .../sink/bucket/PaimonBucketAssignerTest.java      |  3 +-
 .../seatunnel/e2e/connector/paimon/PaimonIT.java   |  5 +-
 .../connector/paimon/PaimonRecordWithFullType.java |  2 +-
 .../paimon/PaimonSinkDynamicBucketIT.java          |  6 +-
 .../paimon/PaimonSinkWithSchemaEvolutionIT.java    |  8 ++-
 .../e2e/connector/paimon/PaimonStreamReadIT.java   |  4 +-
 .../test/resources/fake_cdc_sink_paimon_case9.conf |  2 +-
 .../mysql_cdc_to_paimon_with_schema_change.conf    |  4 +-
 .../src/test/resources/paimon_to_paimon.conf       |  1 +
 .../container/seatunnel/SeaTunnelContainer.java    |  5 +-
 21 files changed, 175 insertions(+), 101 deletions(-)

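Reading note on the `PaimonCatalog` hunks in this commit: the direct `catalog.databaseExists(...)` and `catalog.tableExists(...)` calls are replaced by membership checks over `listDatabases()` and `listTables(...)`, with a missing database treated as "table not found". The sketch below illustrates that pattern only. `MiniCatalog`, `NoSuchDatabaseException`, and `ExistenceCheckSketch` are hypothetical stand-ins invented for this note; they are not the Paimon `Catalog` API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration; none of these types belong to Paimon or SeaTunnel.
class ExistenceCheckSketch {

    /** Illustrative checked exception, analogous to a "database not found" error. */
    static class NoSuchDatabaseException extends Exception {
        NoSuchDatabaseException(String name) {
            super("Database not found: " + name);
        }
    }

    /** Minimal in-memory catalog that only supports listing. */
    static class MiniCatalog {
        private final Map<String, List<String>> tablesByDatabase = new HashMap<>();

        void addTable(String database, String table) {
            tablesByDatabase.computeIfAbsent(database, k -> new ArrayList<>()).add(table);
        }

        List<String> listDatabases() {
            return new ArrayList<>(tablesByDatabase.keySet());
        }

        List<String> listTables(String database) throws NoSuchDatabaseException {
            List<String> tables = tablesByDatabase.get(database);
            if (tables == null) {
                throw new NoSuchDatabaseException(database);
            }
            return Collections.unmodifiableList(tables);
        }
    }

    /** A database exists if it appears in the listing. */
    static boolean databaseExists(MiniCatalog catalog, String database) {
        return catalog.listDatabases().contains(database);
    }

    /** A table exists if its database exists and the table appears in that database's listing. */
    static boolean tableExists(MiniCatalog catalog, String database, String table) {
        try {
            if (!databaseExists(catalog, database)) {
                return false;
            }
            return catalog.listTables(database).contains(table);
        } catch (NoSuchDatabaseException e) {
            // The database disappeared between the two calls: report "not found".
            return false;
        }
    }

    public static void main(String[] args) {
        MiniCatalog catalog = new MiniCatalog();
        catalog.addTable("default", "st_test");
        System.out.println(tableExists(catalog, "default", "st_test"));    // true
        System.out.println(tableExists(catalog, "default", "missing"));    // false
        System.out.println(tableExists(catalog, "no_such_db", "st_test")); // false
    }
}
```

One trade-off of this approach is that every existence check lists all databases or tables, so its cost grows with the size of the catalog.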
diff --git a/docs/en/connector-v2/sink/Paimon.md b/docs/en/connector-v2/sink/Paimon.md
index 9f126895c9..91514365a6 100644
--- a/docs/en/connector-v2/sink/Paimon.md
+++ b/docs/en/connector-v2/sink/Paimon.md
@@ -8,6 +8,33 @@ import ChangeLog from '../changelog/connector-paimon.md';
 
 Sink connector for Apache Paimon. It can support cdc mode 、auto create table.
 
+### Comparison between SeaTunnel and Paimon versions
+
+| Seatunnel Version | Paimon Version   |
+|-------------------|------------------|
+| 2.3.2  -  2.3.3   | 0.4-SNAPSHOT     |
+| 2.3.4             | 0.6-SNAPSHOT     |
+| 2.3.5  -  2.3.11  | 0.7.0-incubating |
+| 2.3.12            | 1.1.1            |
+
+### Key Considerations for Upgrading Paimon from `0.7.0-incubating` to `1.1.1`
+
+1. **Backup Recommendations**
+   Although compatibility is ensured, it is strongly recommended to back up critical data, especially the metadata directory, before initiating the upgrade.
+2. **Gradual Upgrade Process**
+   - **Test Environment Validation**: First validate the upgrade process in a staging environment.
+   - **Update JAR Files**: Replace Paimon JAR files with version 1.1.1.
+   - **Automatic Format Upgrade**: The system will automatically detect and upgrade older file formats.
+3. **Configuration Check**
+   Review your configurations to ensure no deprecated options are in use. While most configurations remain backward-compatible, deprecated settings may require updates.
+4. **Post-Upgrade Validation**
+   Verify the following after upgrading:
+   - **Read/Write Operations**: Ensure data ingestion and retrieval workflows function normally.
+   - **Query Performance**: Confirm that query response times meet expectations.
+   - **New Feature Verification**: Test all newly introduced features (e.g., time travel, enhanced compaction) to ensure proper functionality.
+
+**Note**: These steps help minimize risks and ensure a smooth transition to the stable version 1.1.1.
+
 ## Supported DataSource Info
 
 | Datasource | Dependent |                                   Maven                                   |
@@ -82,11 +109,12 @@ Cdc Ingestion supports a limited number of schema changes. Currently supported s
 
 * Modify column. More specifically, If you modify the column type, the following changes are supported:
 
-    * altering from a string type (char, varchar, text) to another string type with longer length,
-    * altering from a binary type (binary, varbinary, blob) to another binary type with longer length,
-    * altering from an integer type (tinyint, smallint, int, bigint) to another integer type with wider range,
-    * altering from a floating-point type (float, double) to another floating-point type with wider range,
-  
+  * altering from a string type (char, varchar, text) to another string type with longer length,
+  * altering from a binary type (binary, varbinary, blob) to another binary type with longer length,
+  * altering from an integer type (tinyint, smallint, int, bigint) to another integer type with wider range,
+  * altering from a floating-point type (float, double) to another floating-point type with wider range,
+    
+
   are supported. 
   > Note:
   > 
diff --git a/docs/en/connector-v2/source/Paimon.md b/docs/en/connector-v2/source/Paimon.md
index 648be40dd2..f3b268044c 100644
--- a/docs/en/connector-v2/source/Paimon.md
+++ b/docs/en/connector-v2/source/Paimon.md
@@ -8,6 +8,33 @@ import ChangeLog from '../changelog/connector-paimon.md';
 
 Read data from Apache Paimon.
 
+### Comparison between SeaTunnel and Paimon versions
+
+| Seatunnel Version | Paimon Version   |
+|-------------------|------------------|
+| 2.3.2  -  2.3.3   | 0.4-SNAPSHOT     |
+| 2.3.4             | 0.6-SNAPSHOT     |
+| 2.3.5  -  2.3.11  | 0.7.0-incubating |
+| 2.3.12            | 1.1.1            |
+
+### Key Considerations for Upgrading Paimon from `0.7.0-incubating` to `1.1.1`
+
+1. **Backup Recommendations**
+   Although compatibility is ensured, it is strongly recommended to back up critical data, especially the metadata directory, before initiating the upgrade.
+2. **Gradual Upgrade Process**
+    - **Test Environment Validation**: First validate the upgrade process in a staging environment.
+    - **Update JAR Files**: Replace Paimon JAR files with version 1.1.1.
+    - **Automatic Format Upgrade**: The system will automatically detect and upgrade older file formats.
+3. **Configuration Check**
+   Review your configurations to ensure no deprecated options are in use. While most configurations remain backward-compatible, deprecated settings may require updates.
+4. **Post-Upgrade Validation**
+   Verify the following after upgrading:
+    - **Read/Write Operations**: Ensure data ingestion and retrieval workflows function normally.
+    - **Query Performance**: Confirm that query response times meet expectations.
+    - **New Feature Verification**: Test all newly introduced features (e.g., time travel, enhanced compaction) to ensure proper functionality.
+
+**Note**: These steps help minimize risks and ensure a smooth transition to the stable version 1.1.1.
+
 ## Key features
 
 - [x] [batch](../../concept/connector-v2-features.md)
diff --git a/docs/zh/connector-v2/sink/Paimon.md b/docs/zh/connector-v2/sink/Paimon.md
index a386c903dc..73b3556a44 100644
--- a/docs/zh/connector-v2/sink/Paimon.md
+++ b/docs/zh/connector-v2/sink/Paimon.md
@@ -8,6 +8,33 @@ import ChangeLog from '../changelog/connector-paimon.md';
 
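 Sink connector for Apache Paimon. Supports CDC writes and automatic table creation.
 
+### SeaTunnel and Paimon version comparison
+
+| Seatunnel Version | Paimon Version   |
+|-------------------|------------------|
+| 2.3.2  -  2.3.3   | 0.4-SNAPSHOT     |
+| 2.3.4             | 0.6-SNAPSHOT     |
+| 2.3.5  -  2.3.11  | 0.7.0-incubating |
+| 2.3.12            | 1.1.1            |
+
+### Considerations for upgrading from version 0.7 to version 1.1.1
+
+1. **Backup recommendations**
+   Although compatibility is guaranteed, it is still strongly recommended to back up critical data, especially the metadata directory, before upgrading from version 0.7.
+2. **Gradual upgrade process**
+   - **Test environment validation**: First validate the upgrade process (starting from version 0.7) in a test environment.
+   - **Update JAR files**: Replace the Paimon JAR files with version 1.1.1.
+   - **Automatic format upgrade**: The system automatically detects and upgrades the file formats used in version 0.7.
+3. **Configuration check**
+   Check the configuration for deprecated options that applied to version 0.7. Although most configurations remain backward-compatible, deprecated settings may need to be updated for version 1.1.1.
+4. **Post-upgrade validation**
+   After upgrading from version 0.7 to version 1.1.1, verify the following:
+   - **Read/write operations**: Ensure that data writes and reads work normally on the data structures inherited from version 0.7.
+   - **Query performance**: Given the changes in underlying mechanisms (such as bucket management) between 0.7 and 1.1.1, confirm that query response times meet expectations.
+   - **New feature verification**: Test all new features (such as enhanced compaction and time travel) to ensure they are compatible with data migrated from version 0.7 and work correctly.
+
+**Note**: Following these steps helps reduce risk and ensures a smooth transition from version 0.7 to the stable version 1.1.1.
+
 ## Supported DataSource Info
 
 | Datasource | Dependent |                                   Maven                                   |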
diff --git a/docs/zh/connector-v2/source/Paimon.md b/docs/zh/connector-v2/source/Paimon.md
index 8ce84cf533..df868cff63 100644
--- a/docs/zh/connector-v2/source/Paimon.md
+++ b/docs/zh/connector-v2/source/Paimon.md
@@ -8,6 +8,33 @@ import ChangeLog from '../changelog/connector-paimon.md';
 
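 Read data from `Apache Paimon`.
 
+### SeaTunnel and Paimon version comparison
+
+| Seatunnel Version | Paimon Version   |
+|-------------------|------------------|
+| 2.3.2  -  2.3.3   | 0.4-SNAPSHOT     |
+| 2.3.4             | 0.6-SNAPSHOT     |
+| 2.3.5  -  2.3.11  | 0.7.0-incubating |
+| 2.3.12            | 1.1.1            |
+
+### Considerations for upgrading from version 0.7 to version 1.1.1
+
+1. **Backup recommendations**
+   Although compatibility is guaranteed, it is still strongly recommended to back up critical data, especially the metadata directory, before upgrading from version 0.7.
+2. **Gradual upgrade process**
+    - **Test environment validation**: First validate the upgrade process (starting from version 0.7) in a test environment.
+    - **Update JAR files**: Replace the Paimon JAR files with version 1.1.1.
+    - **Automatic format upgrade**: The system automatically detects and upgrades the file formats used in version 0.7.
+3. **Configuration check**
+   Check the configuration for deprecated options that applied to version 0.7. Although most configurations remain backward-compatible, deprecated settings may need to be updated for version 1.1.1.
+4. **Post-upgrade validation**
+   After upgrading from version 0.7 to version 1.1.1, verify the following:
+    - **Read/write operations**: Ensure that data writes and reads work normally on the data structures inherited from version 0.7.
+    - **Query performance**: Given the changes in underlying mechanisms (such as bucket management) between 0.7 and 1.1.1, confirm that query response times meet expectations.
+    - **New feature verification**: Test all new features (such as enhanced compaction and time travel) to ensure they are compatible with data migrated from version 0.7 and work correctly.
+
+**Note**: Following these steps helps reduce risk and ensures a smooth transition from version 0.7 to the stable version 1.1.1.
+
 ## Key features
 
 - [x] [batch](../../concept/connector-v2-features.md)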
diff --git a/seatunnel-connectors-v2/connector-paimon/pom.xml b/seatunnel-connectors-v2/connector-paimon/pom.xml
index 0cd3f535d0..1a66880690 100644
--- a/seatunnel-connectors-v2/connector-paimon/pom.xml
+++ b/seatunnel-connectors-v2/connector-paimon/pom.xml
@@ -30,7 +30,7 @@
     <name>SeaTunnel : Connectors V2 : Paimon</name>
 
     <properties>
-        <paimon.version>0.7.0-incubating</paimon.version>
+        <paimon.version>1.1.1</paimon.version>
         <hive.version>2.3.9</hive.version>
         <connector.name>connector.paimon</connector.name>
     </properties>
diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java
index 63523f7c2a..5186ac955a 100644
--- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java
+++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java
@@ -39,6 +39,7 @@ import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnecto
 import org.apache.seatunnel.connectors.seatunnel.paimon.sink.PaimonSink;
 import org.apache.seatunnel.connectors.seatunnel.paimon.utils.SchemaUtil;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
@@ -52,6 +53,8 @@ import lombok.extern.slf4j.Slf4j;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -104,7 +107,8 @@ public class PaimonCatalog implements Catalog, PaimonTable {
 
     @Override
     public boolean databaseExists(String databaseName) throws CatalogException {
-        return catalog.databaseExists(databaseName);
+        List<String> listDatabases = catalog.listDatabases();
+        return listDatabases.contains(databaseName);
     }
 
     @Override
@@ -124,7 +128,16 @@ public class PaimonCatalog implements Catalog, PaimonTable {
 
     @Override
     public boolean tableExists(TablePath tablePath) throws CatalogException {
-        return catalog.tableExists(toIdentifier(tablePath));
+        Identifier identifier = toIdentifier(tablePath);
+        List<String> tables = new ArrayList<>();
+        try {
+            if (databaseExists(identifier.getDatabaseName())) {
+                tables = catalog.listTables(identifier.getDatabaseName());
+            }
+        } catch (org.apache.paimon.catalog.Catalog.DatabaseNotExistException e) {
+            return false;
+        }
+        return tables.contains(identifier.getTableName());
     }
 
     @Override
@@ -219,7 +232,9 @@ public class PaimonCatalog implements Catalog, PaimonTable {
         Schema.Builder builder = Schema.newBuilder();
         schema.fields()
                 .forEach(field -> builder.column(field.name(), field.type(), field.description()));
-        builder.options(schema.options());
+        Map<String, String> options = new HashMap<>(schema.options());
+        options.remove(CoreOptions.PATH.key());
+        builder.options(options);
         builder.primaryKey(schema.primaryKeys());
         builder.partitionKeys(schema.partitionKeys());
         builder.comment(schema.comment());
@@ -303,7 +318,9 @@ public class PaimonCatalog implements Catalog, PaimonTable {
             }
         } else if (cause instanceof RuntimeException) {
             String message = cause.getMessage();
-            if (message.contains("Cannot define 'bucket-key' in unaware or dynamic bucket mode.")) {
+            // https://github.com/apache/paimon/pull/3320/files#diff-d3e068ea8caf83d2371f0eaa1cbf3d02ff06e1c1cdceec5fab2e065cecd96230
+            if (message.contains(
+                    "Cannot define 'bucket-key' with bucket -1, please specify a bucket number.")) {
                 throw new PaimonConnectorException(
                         PaimonConnectorErrorCode.WRITE_PROPS_BUCKET_KEY_ERROR, message);
             }
diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
index 44869f5793..29db1001ab 100644
--- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
+++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
@@ -118,6 +118,7 @@ public class PaimonSink
                 readonlyConfig,
                 catalogTable,
                 paimonTable,
+                states,
                 jobContext,
                 paimonSinkConfig,
                 paimonHadoopConfiguration,
diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
index 08d60c0507..36c9010588 100644
--- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
@@ -161,13 +161,13 @@ public class PaimonSinkWriter
         // starting job. For tables with a large amount of data, there will be a significant loss in
         // performance. Moreover, initialization takes a long time. This mode is not supported at
         // this time.
-        if (BucketMode.GLOBAL_DYNAMIC == bucketMode) {
+        if (BucketMode.CROSS_PARTITION == bucketMode) {
             throw new UnsupportedOperationException(
                     "Cross Partitions Upsert Dynamic Bucket Mode is not supported.");
         }
-        this.dynamicBucket = BucketMode.DYNAMIC == bucketMode;
+        this.dynamicBucket = BucketMode.HASH_DYNAMIC == bucketMode;
         int bucket = ((FileStoreTable) paimonTable).coreOptions().bucket();
-        if (bucket == -1 && BucketMode.UNAWARE == bucketMode) {
+        if (bucket == -1 && BucketMode.BUCKET_UNAWARE == bucketMode) {
             log.warn("Append only table currently do not support dynamic bucket");
         }
         if (dynamicBucket) {
diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/bucket/FixedBucketRowKeyExtractor.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/bucket/FixedBucketRowKeyExtractor.java
deleted file mode 100644
index 8a2a453cb8..0000000000
--- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/bucket/FixedBucketRowKeyExtractor.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.seatunnel.connectors.seatunnel.paimon.sink.bucket;
-
-import org.apache.paimon.CoreOptions;
-import org.apache.paimon.codegen.CodeGenUtils;
-import org.apache.paimon.codegen.Projection;
-import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.sink.KeyAndBucketExtractor;
-import org.apache.paimon.table.sink.RowKeyExtractor;
-
-public class FixedBucketRowKeyExtractor extends RowKeyExtractor {
-    private final int numBuckets;
-    private final boolean sameBucketKeyAndTrimmedPrimaryKey;
-    private final Projection bucketKeyProjection;
-    private BinaryRow reuseBucketKey;
-    private Integer reuseBucket;
-
-    public FixedBucketRowKeyExtractor(TableSchema schema) {
-        super(schema);
-        this.numBuckets = (new CoreOptions(schema.options())).bucket();
-        this.sameBucketKeyAndTrimmedPrimaryKey =
-                schema.bucketKeys().equals(schema.trimmedPrimaryKeys());
-        this.bucketKeyProjection =
-                CodeGenUtils.newProjection(
-                        schema.logicalRowType(), schema.projection(schema.bucketKeys()));
-    }
-
-    public void setRecord(InternalRow record) {
-        super.setRecord(record);
-        this.reuseBucketKey = null;
-        this.reuseBucket = null;
-    }
-
-    private BinaryRow bucketKey() {
-        if (this.sameBucketKeyAndTrimmedPrimaryKey) {
-            return this.trimmedPrimaryKey();
-        } else {
-            if (this.reuseBucketKey == null) {
-                this.reuseBucketKey = this.bucketKeyProjection.apply(this.record);
-            }
-            return this.reuseBucketKey;
-        }
-    }
-
-    public int bucket() {
-        BinaryRow bucketKey = this.bucketKey();
-        if (this.reuseBucket == null) {
-            this.reuseBucket =
-                    KeyAndBucketExtractor.bucket(
-                            KeyAndBucketExtractor.bucketKeyHashCode(bucketKey), this.numBuckets);
-        }
-
-        return this.reuseBucket;
-    }
-}
diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/bucket/PaimonBucketAssigner.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/bucket/PaimonBucketAssigner.java
index f7919fc4b4..d4f4ffdaa2 100644
--- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/bucket/PaimonBucketAssigner.java
+++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/bucket/PaimonBucketAssigner.java
@@ -21,6 +21,7 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.index.HashBucketAssigner;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.FixedBucketRowKeyExtractor;
 
 public class PaimonBucketAssigner {
 
@@ -34,6 +35,7 @@ public class PaimonBucketAssigner {
         FileStoreTable fileStoreTable = (FileStoreTable) table;
         this.extractor = new FixedBucketRowKeyExtractor(fileStoreTable.schema());
         long dynamicBucketTargetRowNum = fileStoreTable.coreOptions().dynamicBucketTargetRowNum();
+        Integer maxBucketsNum = fileStoreTable.coreOptions().dynamicBucketMaxBuckets();
         this.hashBucketAssigner =
                 new HashBucketAssigner(
                         fileStoreTable.snapshotManager(),
@@ -42,7 +44,8 @@ public class PaimonBucketAssigner {
                         numAssigners,
                         numAssigners,
                         assignId,
-                        dynamicBucketTargetRowNum);
+                        dynamicBucketTargetRowNum,
+                        maxBucketsNum);
         this.isRunning = true;
     }
 
diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceReader.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceReader.java
index 50de479b4b..36ad1f68f3 100644
--- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceReader.java
+++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceReader.java
@@ -78,9 +78,9 @@ public class PaimonSourceReader implements SourceReader<SeaTunnelRow, PaimonSour
             if (Objects.nonNull(split)) {
                 // read logic
                 try (final RecordReader<InternalRow> reader =
-                        tableRead.executeFilter().createReader(split.getSplit())) {
-                    final RecordReaderIterator<InternalRow> rowIterator =
-                            new RecordReaderIterator<>(reader);
+                                tableRead.executeFilter().createReader(split.getSplit());
+                        final RecordReaderIterator<InternalRow> rowIterator =
+                                new RecordReaderIterator<>(reader)) {
                     while (rowIterator.hasNext()) {
                         final InternalRow row = rowIterator.next();
                         final SeaTunnelRow seaTunnelRow =
diff --git a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/bucket/PaimonBucketAssignerTest.java b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/bucket/PaimonBucketAssignerTest.java
index fd7fdef816..157d0c90ca 100644
--- a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/bucket/PaimonBucketAssignerTest.java
+++ b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/bucket/PaimonBucketAssignerTest.java
@@ -58,7 +58,8 @@ public class PaimonBucketAssignerTest {
         Catalog catalog = CatalogFactory.createCatalog(CatalogContext.create(options));
         catalog.createDatabase(DATABASE_NAME, true);
         Identifier identifier = Identifier.create(DATABASE_NAME, TABLE_NAME);
-        if (!catalog.tableExists(identifier)) {
+        List<String> tables = catalog.listTables(DATABASE_NAME);
+        if (!tables.contains(identifier.getTableName())) {
             Schema.Builder schemaBuilder = Schema.newBuilder();
             schemaBuilder.column("id", DataTypes.INT(), "primary Key");
             schemaBuilder.column("name", DataTypes.STRING(), "name");
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonIT.java
index 7ea21f3020..9d07787bca 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonIT.java
@@ -36,8 +36,9 @@ import java.io.IOException;
 import java.nio.file.Path;
 
 @DisabledOnContainer(
-        value = TestContainerId.FLINK_1_13,
-        disabledReason = "Paimon does not support flink 1.13")
+        value = {TestContainerId.FLINK_1_13, TestContainerId.SPARK_2_4},
+        disabledReason =
+                "Paimon does not support flink 1.13, Spark 2.4.6 has a jar package(zstd-jni-version.jar) version compatibility issue.")
 public class PaimonIT extends TestSuiteBase implements TestResource {
 
     @TestContainerExtension
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonRecordWithFullType.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonRecordWithFullType.java
index ae3609ee30..a8d85db8da 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonRecordWithFullType.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonRecordWithFullType.java
@@ -36,7 +36,7 @@ public class PaimonRecordWithFullType {
     public int[] c_array;
     public BinaryString c_string;
     public boolean c_boolean;
-    public short c_tinyint;
+    public byte c_tinyint;
     public short c_smallint;
     public int c_int;
     public long c_bigint;
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkDynamicBucketIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkDynamicBucketIT.java
index f247a6461e..6848c8d566 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkDynamicBucketIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkDynamicBucketIT.java
@@ -312,13 +312,15 @@ public class PaimonSinkDynamicBucketIT extends TestSuiteBase implements TestReso
     @Disabled(
             "Spark and Flink engine can not auto create paimon table on worker node in local file, this e2e case work on hdfs environment, please set up your own HDFS environment in the test case file and the below setup")
     public void testPaimonBucketCountOnSparkAndFlink(TestContainer container)
-            throws IOException, InterruptedException, Catalog.TableNotExistException {
+            throws IOException, InterruptedException, Catalog.TableNotExistException,
+                    Catalog.DatabaseNotExistException {
         PaimonSinkConfig paimonSinkConfig =
                 new PaimonSinkConfig(ReadonlyConfig.fromMap(PAIMON_SINK_PROPERTIES));
         PaimonCatalogLoader paimonCatalogLoader = new PaimonCatalogLoader(paimonSinkConfig);
         Catalog catalog = paimonCatalogLoader.loadCatalog();
         Identifier identifier = Identifier.create("default", "st_test_5");
-        if (catalog.tableExists(identifier)) {
+        List<String> tables = catalog.listTables(identifier.getDatabaseName());
+        if (tables.contains(identifier.getTableName())) {
             catalog.dropTable(identifier, true);
         }
         Container.ExecResult textWriteResult =
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkWithSchemaEvolutionIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkWithSchemaEvolutionIT.java
index 3a81eb5196..73bf50df42 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkWithSchemaEvolutionIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkWithSchemaEvolutionIT.java
@@ -70,6 +70,7 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Comparator;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
@@ -330,7 +331,8 @@ public class PaimonSinkWithSchemaEvolutionIT extends AbstractPaimonIT implements
     private void vertifySchemaAndData(
             TestContainer container,
             List<ImmutableTriple<String[], Integer, Integer>> idRangesWithFiledProjection) {
-        await().atMost(30, TimeUnit.SECONDS)
+        await().pollDelay(3, TimeUnit.SECONDS)
+                .atMost(30, TimeUnit.SECONDS)
                 .untilAsserted(
                         () -> {
                             // 1. Vertify the schema
@@ -443,7 +445,9 @@ public class PaimonSinkWithSchemaEvolutionIT extends AbstractPaimonIT implements
                         results.add(rowRecords);
                     });
         }
-        return results;
+        return results.stream()
+                .sorted(Comparator.comparing(o -> Integer.valueOf(o.get(0).toString())))
+                .collect(Collectors.toList());
     }
 
     private Predicate getPredicateWithBound(int lowerBound, int upperBound, FileStoreTable table) {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonStreamReadIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonStreamReadIT.java
index 985029a792..61bdfe2b5a 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonStreamReadIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonStreamReadIT.java
@@ -74,6 +74,7 @@ public class PaimonStreamReadIT extends AbstractPaimonIT implements TestResource
 
         given().ignoreExceptions()
                 .await()
+                .pollDelay(20L, TimeUnit.SECONDS)
                 .atLeast(100L, TimeUnit.MILLISECONDS)
                 .atMost(400L, TimeUnit.SECONDS)
                 .untilAsserted(
@@ -94,6 +95,7 @@ public class PaimonStreamReadIT extends AbstractPaimonIT implements TestResource
 
         given().ignoreExceptions()
                 .await()
+                .pollDelay(20L, TimeUnit.SECONDS)
                 .atLeast(100L, TimeUnit.MILLISECONDS)
                 .atMost(400L, TimeUnit.SECONDS)
                 .untilAsserted(
@@ -133,7 +135,7 @@ public class PaimonStreamReadIT extends AbstractPaimonIT implements TestResource
                                         intArray,
                                         row.getString(2),
                                         row.getBoolean(3),
-                                        row.getShort(4),
+                                        row.getByte(4),
                                         row.getShort(5),
                                         row.getInt(6),
                                         row.getLong(7),
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case9.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case9.conf
index 090948f656..58ef0a2932 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case9.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case9.conf
@@ -19,7 +19,7 @@
 ######
 
 env {
-  parallelism = 2
+  parallelism = 1
   job.mode = "BATCH"
 }
 
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/mysql_cdc_to_paimon_with_schema_change.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/mysql_cdc_to_paimon_with_schema_change.conf
index 61082e6e98..ccc500cd97 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/mysql_cdc_to_paimon_with_schema_change.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/mysql_cdc_to_paimon_with_schema_change.conf
@@ -34,7 +34,6 @@ source {
     password = "mysqlpw"
     table-names = ["shop.products"]
     url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
-
     schema-changes.enabled = true
   }
 }
@@ -44,5 +43,8 @@ sink {
     warehouse = "file:///tmp/seatunnel_mnt/paimon"
     database = "mysql_to_paimon"
     table = "products"
+    paimon.table.write-props = {
+      file.format = orc
+    }
   }
 }
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_paimon.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_paimon.conf
index 5daa2b8a68..47241437ec 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_paimon.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_paimon.conf
@@ -18,6 +18,7 @@
 env {
   parallelism = 1
   job.mode = "Streaming"
+  checkpoint.interval = 5000
 }
 
 source {
diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
index 6397773294..354a408745 100644
--- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
+++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
@@ -497,7 +497,10 @@ public class SeaTunnelContainer extends AbstractTestContainer {
                 // JNA Cleaner
                 || threadName.startsWith("JNA Cleaner")
                 // GRPC client
-                || threadName.startsWith("grpc");
+                || threadName.startsWith("grpc")
+                // Paimon
+                || threadName.startsWith("AsyncOutputStream")
+                || threadName.startsWith("MANIFEST-READ-THREAD-POOL");
     }
 
     @Override
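Reading note on the `PaimonSourceReader` hunk in this commit: `RecordReaderIterator` is moved into the same try-with-resources header as the `RecordReader`, so both are closed when the block exits. The sketch below shows the Java rule that makes this safe: resources declared in one header are closed in reverse declaration order. `Tracked` and `TryWithResourcesSketch` are hypothetical names invented for this note, and the sketch does not use any Paimon classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of try-with-resources close ordering.
class TryWithResourcesSketch {

    /** A resource that records when it is opened and closed. */
    static class Tracked implements AutoCloseable {
        private final String name;
        private final List<String> log;

        Tracked(String name, List<String> log) {
            this.name = name;
            this.log = log;
            log.add("open " + name);
        }

        @Override
        public void close() {
            log.add("close " + name);
        }
    }

    /** Opens two resources in one header and returns the recorded event order. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        try (Tracked reader = new Tracked("reader", log);
                Tracked iterator = new Tracked("iterator", log)) {
            log.add("read");
        }
        return log;
    }

    public static void main(String[] args) {
        // prints [open reader, open iterator, read, close iterator, close reader]
        System.out.println(run());
    }
}
```

Because the iterator is declared second, it is closed first, before the reader it wraps.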