This is an automated email from the ASF dual-hosted git repository.

chl-wxp pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new fcdf96c52d [Fix][Connector-V2] Fix Paimon branch save mode DDL (#10991)
fcdf96c52d is described below

commit fcdf96c52d286a6ce6cb0c953eeb683c021a3f4b
Author: QuakeWang <[email protected]>
AuthorDate: Wed Jun 3 10:19:12 2026 +0800

    [Fix][Connector-V2] Fix Paimon branch save mode DDL (#10991)
---
 docs/en/connectors/sink/Paimon.md                  |   2 +-
 docs/zh/connectors/sink/Paimon.md                  |   2 +-
 .../paimon/exception/PaimonConnectorErrorCode.java |   4 +-
 .../paimon/handler/PaimonSaveModeHandler.java      |  57 +++-
 .../seatunnel/paimon/sink/PaimonSinkWriter.java    |   3 +-
 .../AlterPaimonTableSchemaEventHandler.java        |  20 +-
 .../handler/PaimonBranchSaveModeHandlerTest.java   | 314 +++++++++++++++++++++
 7 files changed, 393 insertions(+), 9 deletions(-)

diff --git a/docs/en/connectors/sink/Paimon.md 
b/docs/en/connectors/sink/Paimon.md
index 060bbcff9f..f915f68290 100644
--- a/docs/en/connectors/sink/Paimon.md
+++ b/docs/en/connectors/sink/Paimon.md
@@ -78,7 +78,7 @@ libfb303-xxx.jar
 | paimon.hadoop.conf           | Map     | No       | -                        
    | Properties in hadoop conf                                                 
                                                                                
       |
 | paimon.hadoop.conf-path      | String  | No       | -                        
    | The specified loading path for the 'core-site.xml', 'hdfs-site.xml', 
'hive-site.xml' files                                                           
            |
 | paimon.table.non-primary-key | Boolean | false    | -                        
    | Switch to create `table with PK` or `table without PK`. true : `table 
without PK`, false : `table with PK`                                            
           |
-| branch                       | String  | No       | main                     
    | The branch name of Paimon table to write data to. If the branch does not 
exist, an exception will be thrown.                                             
        |
+| branch                       | String  | No       | main                     
    | The branch name of the Paimon table to write data to. For non-main branches, 
the main table and the target branch must already exist; otherwise an exception 
is thrown. `schema_save_mode=RECREATE_SCHEMA` and `data_save_mode=DROP_DATA` 
are not supported for non-main branches. |
 
 
 ## Checkpoint in batch mode
diff --git a/docs/zh/connectors/sink/Paimon.md 
b/docs/zh/connectors/sink/Paimon.md
index 8c3799a80c..3aaa9e18d2 100644
--- a/docs/zh/connectors/sink/Paimon.md
+++ b/docs/zh/connectors/sink/Paimon.md
@@ -77,7 +77,7 @@ libfb303-xxx.jar
 | paimon.hadoop.conf           | Map  | 否    | -                            | 
Hadoop配置文件属性信息                                                                  
                     |
 | paimon.hadoop.conf-path      | 字符串  | 否    | -                            | 
Hadoop配置文件目录,用于加载'core-site.xml', 'hdfs-site.xml', 'hive-site.xml'文件配置          
                     |
 | paimon.table.non-primary-key | Boolean | false    | -                        
    | 控制创建主键表或者非主键表. 当为true时,创建非主键表, 为false时,创建主键表                              
                           |
-| branch                       | 字符串  | 否    | main                         | 
要写入数据的Paimon表分支名称。如果指定的分支不存在,将抛出异常。                                             
                    |
+| branch                       | 字符串  | 否    | main                         | 
要写入数据的Paimon表分支名称。非 main 分支要求 main 表和目标分支已存在,否则将抛出异常;同时不支持 
`schema_save_mode=RECREATE_SCHEMA` 和 `data_save_mode=DROP_DATA`。 |
 
 ## 批模式下的checkpoint
 
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/exception/PaimonConnectorErrorCode.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/exception/PaimonConnectorErrorCode.java
index f369b8cf66..6acd6e70fd 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/exception/PaimonConnectorErrorCode.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/exception/PaimonConnectorErrorCode.java
@@ -33,7 +33,9 @@ public enum PaimonConnectorErrorCode implements 
SeaTunnelErrorCode {
     NON_PRIMARY_KEY_CHECK_ERROR(
             "PAIMON-10", "Primary keys should be empty when nonPrimaryKey is 
true"),
     DECIMAL_PRECISION_INCOMPATIBLE("PAIMON-11", "decimal type precision is 
incompatible. "),
-    BRANCH_NOT_EXISTS("PAIMON-12", "Specified branch: %s does not exist. ");
+    BRANCH_NOT_EXISTS("PAIMON-12", "Specified branch: %s does not exist. "),
+    UNSUPPORTED_BRANCH_SAVE_MODE(
+            "PAIMON-13", "The save mode is not supported for non-main 
branches. ");
 
     private final String code;
     private final String description;
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/handler/PaimonSaveModeHandler.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/handler/PaimonSaveModeHandler.java
index a89edc02c5..1d93119041 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/handler/PaimonSaveModeHandler.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/handler/PaimonSaveModeHandler.java
@@ -26,10 +26,13 @@ import org.apache.seatunnel.api.table.catalog.Catalog;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.connectors.seatunnel.paimon.catalog.PaimonCatalog;
+import 
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.paimon.sink.SupportLoadTable;
 
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
+import org.apache.paimon.utils.BranchManager;
 
 public class PaimonSaveModeHandler extends DefaultSaveModeHandler {
 
@@ -55,15 +58,67 @@ public class PaimonSaveModeHandler extends 
DefaultSaveModeHandler {
 
     @Override
     public void handleSchemaSaveMode() {
+        checkBranchSaveMode();
         super.handleSchemaSaveMode();
         TablePath tablePath = catalogTable.getTablePath();
         Table paimonTable = ((PaimonCatalog) 
catalog).getPaimonTable(tablePath);
         Table loadTable = this.supportLoadTable.getLoadTable();
         if (loadTable == null || this.schemaSaveMode == 
SchemaSaveMode.RECREATE_SCHEMA) {
-            if (StringUtils.isNotEmpty(branch)) {
+            if (isNonMainBranch()) {
                 paimonTable = ((FileStoreTable) 
paimonTable).switchToBranch(branch);
             }
             this.supportLoadTable.setLoadTable(paimonTable);
         }
     }
+
+    @Override
+    public void handleDataSaveMode() {
+        checkBranchSaveMode();
+        super.handleDataSaveMode();
+    }
+
+    @Override
+    public void handleSchemaSaveModeWithRestore() {
+        checkBranchSaveMode();
+        super.handleSchemaSaveModeWithRestore();
+    }
+
+    private boolean isNonMainBranch() {
+        return StringUtils.isNotEmpty(branch)
+                && !BranchManager.DEFAULT_MAIN_BRANCH.equalsIgnoreCase(branch);
+    }
+
+    private void checkBranchSaveMode() {
+        if (!isNonMainBranch()) {
+            return;
+        }
+        if (!catalog.tableExists(tablePath)) {
+            throw unsupportedBranchSaveMode(
+                    "The main table must exist before writing to a non-main 
branch.");
+        }
+        Table paimonTable = ((PaimonCatalog) 
catalog).getPaimonTable(tablePath);
+        if (!((FileStoreTable) 
paimonTable).branchManager().branchExists(branch)) {
+            throw new PaimonConnectorException(
+                    PaimonConnectorErrorCode.BRANCH_NOT_EXISTS,
+                    String.format(
+                            "Specified branch '%s' of table '%s' does not 
exist.",
+                            branch, tablePath));
+        }
+        if (this.schemaSaveMode == SchemaSaveMode.RECREATE_SCHEMA) {
+            throw unsupportedBranchSaveMode(
+                    "schema_save_mode=RECREATE_SCHEMA would drop and recreate 
the main table.");
+        }
+        if (this.dataSaveMode == DataSaveMode.DROP_DATA) {
+            throw unsupportedBranchSaveMode(
+                    "data_save_mode=DROP_DATA would truncate the main table.");
+        }
+    }
+
+    private PaimonConnectorException unsupportedBranchSaveMode(String reason) {
+        return new PaimonConnectorException(
+                PaimonConnectorErrorCode.UNSUPPORTED_BRANCH_SAVE_MODE,
+                String.format(
+                        "Paimon branch '%s' does not support this save mode 
for table '%s'. %s",
+                        branch, tablePath, reason));
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
index 2d8fad38c0..076f7796ad 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
@@ -268,7 +268,8 @@ public class PaimonSinkWriter
                                 sourceTableSchema,
                                 paimonCatalog,
                                 sinkPaimonTableSchema,
-                                paimonTablePath)
+                                paimonTablePath,
+                                paimonSinkConfig.getBranch())
                         .apply(event);
         reOpenTableWrite();
     }
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/schema/handler/AlterPaimonTableSchemaEventHandler.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/schema/handler/AlterPaimonTableSchemaEventHandler.java
index d37ab21758..e0faa687b2 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/schema/handler/AlterPaimonTableSchemaEventHandler.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/schema/handler/AlterPaimonTableSchemaEventHandler.java
@@ -38,6 +38,7 @@ import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
+import org.apache.paimon.utils.BranchManager;
 import org.apache.paimon.utils.Preconditions;
 
 import lombok.extern.slf4j.Slf4j;
@@ -61,15 +62,19 @@ public class AlterPaimonTableSchemaEventHandler {
 
     private final TablePath paimonTablePath;
 
+    private final String branch;
+
     public AlterPaimonTableSchemaEventHandler(
             TableSchema sourceTableSchema,
             PaimonCatalog paimonCatalog,
             org.apache.paimon.schema.TableSchema sinkPaimonTableSchema,
-            TablePath paimonTablePath) {
+            TablePath paimonTablePath,
+            String branch) {
         this.sourceTableSchema = sourceTableSchema;
         this.paimonCatalog = paimonCatalog;
         this.sinkPaimonTableSchema = sinkPaimonTableSchema;
         this.paimonTablePath = paimonTablePath;
+        this.branch = branch;
     }
 
     public TableSchema apply(SchemaChangeEvent event) {
@@ -87,9 +92,7 @@ public class AlterPaimonTableSchemaEventHandler {
     }
 
     private void applySingleSchemaChangeEvent(SchemaChangeEvent event) {
-        Identifier identifier =
-                Identifier.create(
-                        paimonTablePath.getDatabaseName(), 
paimonTablePath.getTableName());
+        Identifier identifier = toIdentifier();
         if (event instanceof AlterTableAddColumnEvent) {
             AlterTableAddColumnEvent alterTableAddColumnEvent = 
(AlterTableAddColumnEvent) event;
             Column column = alterTableAddColumnEvent.getColumn();
@@ -130,6 +133,15 @@ public class AlterPaimonTableSchemaEventHandler {
         }
     }
 
+    private Identifier toIdentifier() {
+        if (StringUtils.isNotEmpty(branch)
+                && 
!BranchManager.DEFAULT_MAIN_BRANCH.equalsIgnoreCase(branch)) {
+            return new Identifier(
+                    paimonTablePath.getDatabaseName(), 
paimonTablePath.getTableName(), branch);
+        }
+        return Identifier.create(paimonTablePath.getDatabaseName(), 
paimonTablePath.getTableName());
+    }
+
     private void updateColumn(
             Column newColumn, String oldColumnName, Identifier identifier, 
String afterTheColumn) {
         BasicTypeDefine<DataType> reconvertColumn = 
PaimonTypeMapper.INSTANCE.reconvert(newColumn);
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/handler/PaimonBranchSaveModeHandlerTest.java
 
b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/handler/PaimonBranchSaveModeHandlerTest.java
new file mode 100644
index 0000000000..87540d7b8c
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/handler/PaimonBranchSaveModeHandlerTest.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.paimon.handler;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.sink.DataSaveMode;
+import org.apache.seatunnel.api.sink.SchemaSaveMode;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.schema.event.AlterTableAddColumnEvent;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.connectors.seatunnel.paimon.catalog.PaimonCatalog;
+import 
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonHadoopConfiguration;
+import 
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.paimon.sink.PaimonSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.paimon.sink.SupportLoadTable;
+import 
org.apache.seatunnel.connectors.seatunnel.paimon.sink.bucket.PaimonBucketAssignerFactory;
+
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+public class PaimonBranchSaveModeHandlerTest {
+
+    private static final String CATALOG_NAME = "paimon_catalog";
+    private static final String DATABASE_NAME = "default";
+    private static final String TABLE_NAME = "branch_table";
+    private static final String BRANCH_NAME = "test_branch";
+
+    @TempDir private Path temporaryFolder;
+
+    private PaimonCatalog paimonCatalog;
+    private ReadonlyConfig readonlyConfig;
+    private TablePath tablePath;
+    private CatalogTable catalogTable;
+
+    @BeforeEach
+    public void before() throws Exception {
+        Map<String, Object> properties = new HashMap<>();
+        properties.put("warehouse", temporaryFolder.toString());
+        properties.put("plugin_name", "Paimon");
+        properties.put("database", DATABASE_NAME);
+        properties.put("table", TABLE_NAME);
+        properties.put("branch", BRANCH_NAME);
+        properties.put("paimon.table.write-props", new HashMap<String, 
String>());
+        readonlyConfig = ReadonlyConfig.fromMap(properties);
+
+        tablePath = TablePath.of(DATABASE_NAME, TABLE_NAME);
+        catalogTable = createCatalogTable(TABLE_NAME);
+
+        paimonCatalog = new PaimonCatalog(CATALOG_NAME, readonlyConfig);
+        paimonCatalog.open();
+        paimonCatalog.createDatabase(tablePath, true);
+        paimonCatalog.createTable(tablePath, catalogTable, false);
+
+        FileStoreTable table = (FileStoreTable) 
paimonCatalog.getPaimonTable(tablePath);
+        if (!table.branchManager().branchExists(BRANCH_NAME)) {
+            table.createBranch(BRANCH_NAME);
+        }
+    }
+
+    @Test
+    public void 
createSchemaWithBranchShouldFailWithoutCreatingMainTableWhenTableMissing() {
+        String missingTableName = "missing_branch_table";
+        TablePath missingTablePath = TablePath.of(DATABASE_NAME, 
missingTableName);
+        CatalogTable missingCatalogTable = 
createCatalogTable(missingTableName);
+        PaimonSaveModeHandler handler =
+                new PaimonSaveModeHandler(
+                        new TestSupportLoadTable(),
+                        SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST,
+                        DataSaveMode.APPEND_DATA,
+                        paimonCatalog,
+                        missingCatalogTable,
+                        null,
+                        BRANCH_NAME);
+
+        PaimonConnectorException exception =
+                Assertions.assertThrows(
+                        PaimonConnectorException.class, 
handler::handleSchemaSaveMode);
+        Assertions.assertTrue(
+                exception.getMessage().contains("main table must exist"),
+                "The error message should explain that branch writes require 
an existing table.");
+        Assertions.assertFalse(
+                paimonCatalog.tableExists(missingTablePath),
+                "Branch save mode must not auto-create a main table.");
+    }
+
+    @Test
+    public void createSchemaWithBranchShouldFailWhenBranchMissing() {
+        String tableWithoutBranch = "table_without_branch";
+        TablePath tableWithoutBranchPath = TablePath.of(DATABASE_NAME, 
tableWithoutBranch);
+        CatalogTable tableWithoutBranchCatalogTable = 
createCatalogTable(tableWithoutBranch);
+        paimonCatalog.createTable(tableWithoutBranchPath, 
tableWithoutBranchCatalogTable, false);
+        PaimonSaveModeHandler handler =
+                new PaimonSaveModeHandler(
+                        new TestSupportLoadTable(),
+                        SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST,
+                        DataSaveMode.APPEND_DATA,
+                        paimonCatalog,
+                        tableWithoutBranchCatalogTable,
+                        null,
+                        BRANCH_NAME);
+
+        PaimonConnectorException exception =
+                Assertions.assertThrows(
+                        PaimonConnectorException.class, 
handler::handleSchemaSaveMode);
+        Assertions.assertTrue(
+                exception.getMessage().contains("does not exist"),
+                "The error message should explain that the branch does not 
exist.");
+        
Assertions.assertTrue(paimonCatalog.tableExists(tableWithoutBranchPath));
+        Assertions.assertFalse(
+                ((FileStoreTable) 
paimonCatalog.getPaimonTable(tableWithoutBranchPath))
+                        .branchManager()
+                        .branchExists(BRANCH_NAME),
+                "Branch save mode must not auto-create the branch.");
+    }
+
+    @Test
+    public void dropDataWithBranchShouldFailWithoutDroppingMainTableOrBranch() 
{
+        PaimonSaveModeHandler handler =
+                new PaimonSaveModeHandler(
+                        new TestSupportLoadTable(),
+                        SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST,
+                        DataSaveMode.DROP_DATA,
+                        paimonCatalog,
+                        catalogTable,
+                        null,
+                        BRANCH_NAME);
+
+        PaimonConnectorException exception =
+                Assertions.assertThrows(
+                        PaimonConnectorException.class, 
handler::handleSchemaSaveMode);
+        Assertions.assertTrue(
+                exception.getMessage().contains("data_save_mode=DROP_DATA"),
+                "The error message should explain the unsupported save mode.");
+
+        FileStoreTable mainTable = (FileStoreTable) 
paimonCatalog.getPaimonTable(tablePath);
+        Assertions.assertTrue(
+                mainTable.branchManager().branchExists(BRANCH_NAME),
+                "DROP_DATA on a branch must not delete branch metadata.");
+    }
+
+    @Test
+    public void 
recreateSchemaWithBranchShouldFailWithoutDroppingMainTableOrBranch() {
+        PaimonSaveModeHandler handler =
+                new PaimonSaveModeHandler(
+                        new TestSupportLoadTable(),
+                        SchemaSaveMode.RECREATE_SCHEMA,
+                        DataSaveMode.APPEND_DATA,
+                        paimonCatalog,
+                        catalogTable,
+                        null,
+                        BRANCH_NAME);
+
+        PaimonConnectorException exception =
+                Assertions.assertThrows(
+                        PaimonConnectorException.class, 
handler::handleSchemaSaveMode);
+        Assertions.assertTrue(
+                
exception.getMessage().contains("schema_save_mode=RECREATE_SCHEMA"),
+                "The error message should explain the unsupported save mode.");
+
+        FileStoreTable mainTable = (FileStoreTable) 
paimonCatalog.getPaimonTable(tablePath);
+        Assertions.assertTrue(
+                mainTable.branchManager().branchExists(BRANCH_NAME),
+                "RECREATE_SCHEMA on a branch must not delete branch 
metadata.");
+    }
+
+    @Test
+    public void 
restoreSchemaSaveModeWithBranchShouldFailForDestructiveSaveMode() {
+        PaimonSaveModeHandler handler =
+                new PaimonSaveModeHandler(
+                        new TestSupportLoadTable(),
+                        SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST,
+                        DataSaveMode.DROP_DATA,
+                        paimonCatalog,
+                        catalogTable,
+                        null,
+                        BRANCH_NAME);
+
+        PaimonConnectorException exception =
+                Assertions.assertThrows(
+                        PaimonConnectorException.class, 
handler::handleSchemaSaveModeWithRestore);
+        Assertions.assertTrue(
+                exception.getMessage().contains("data_save_mode=DROP_DATA"),
+                "The error message should explain the unsupported save mode.");
+    }
+
+    @Test
+    public void schemaEvolutionWithBranchShouldAlterBranchSchemaOnly() throws 
Exception {
+        PaimonSinkConfig sinkConfig = new PaimonSinkConfig(readonlyConfig);
+        PaimonSinkWriter writer =
+                new PaimonSinkWriter(
+                        new TestSinkWriterContext(),
+                        readonlyConfig,
+                        catalogTable,
+                        ((FileStoreTable) 
paimonCatalog.getPaimonTable(tablePath))
+                                .switchToBranch(BRANCH_NAME),
+                        UUID.randomUUID().toString(),
+                        null,
+                        sinkConfig,
+                        new PaimonHadoopConfiguration(),
+                        new PaimonBucketAssignerFactory());
+        try {
+            writer.applySchemaChange(
+                    AlterTableAddColumnEvent.add(
+                            catalogTable.getTableId(),
+                            PhysicalColumn.of(
+                                    "branch_only_column",
+                                    BasicType.STRING_TYPE,
+                                    (Long) null,
+                                    true,
+                                    null,
+                                    null)));
+        } finally {
+            writer.close();
+        }
+
+        FileStoreTable mainTable = (FileStoreTable) 
paimonCatalog.getPaimonTable(tablePath);
+        FileStoreTable branchTable = mainTable.switchToBranch(BRANCH_NAME);
+        Assertions.assertFalse(
+                mainTable.schema().fieldNames().contains("branch_only_column"),
+                "Schema evolution configured with a branch must not alter the 
main schema.");
+        Assertions.assertTrue(
+                
branchTable.schema().fieldNames().contains("branch_only_column"),
+                "Schema evolution configured with a branch must alter the 
branch schema.");
+    }
+
+    @AfterEach
+    public void after() {
+        if (paimonCatalog != null) {
+            paimonCatalog.close();
+        }
+    }
+
+    private static TableSchema baseSchema() {
+        return TableSchema.builder()
+                .column(PhysicalColumn.of("id", BasicType.INT_TYPE, (Long) 
null, false, null, null))
+                .primaryKey(
+                        org.apache.seatunnel.api.table.catalog.PrimaryKey.of(
+                                "pk", Collections.singletonList("id")))
+                .build();
+    }
+
+    private static CatalogTable createCatalogTable(String tableName) {
+        return CatalogTable.of(
+                TableIdentifier.of(CATALOG_NAME, DATABASE_NAME, tableName),
+                baseSchema(),
+                new HashMap<>(),
+                Collections.emptyList(),
+                "branch test table");
+    }
+
+    private static class TestSupportLoadTable implements 
SupportLoadTable<Table> {
+        private Table table;
+
+        @Override
+        public void setLoadTable(Table table) {
+            this.table = table;
+        }
+
+        @Override
+        public Table getLoadTable() {
+            return table;
+        }
+    }
+
+    private static class TestSinkWriterContext
+            implements org.apache.seatunnel.api.sink.SinkWriter.Context {
+        @Override
+        public int getIndexOfSubtask() {
+            return 0;
+        }
+
+        @Override
+        public org.apache.seatunnel.api.common.metrics.MetricsContext 
getMetricsContext() {
+            return null;
+        }
+
+        @Override
+        public org.apache.seatunnel.api.event.EventListener getEventListener() 
{
+            return null;
+        }
+    }
+}

Reply via email to