This is an automated email from the ASF dual-hosted git repository.
chl-wxp pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new fcdf96c52d [Fix][Connector-V2] Fix Paimon branch save mode DDL (#10991)
fcdf96c52d is described below
commit fcdf96c52d286a6ce6cb0c953eeb683c021a3f4b
Author: QuakeWang <[email protected]>
AuthorDate: Wed Jun 3 10:19:12 2026 +0800
[Fix][Connector-V2] Fix Paimon branch save mode DDL (#10991)
---
docs/en/connectors/sink/Paimon.md | 2 +-
docs/zh/connectors/sink/Paimon.md | 2 +-
.../paimon/exception/PaimonConnectorErrorCode.java | 4 +-
.../paimon/handler/PaimonSaveModeHandler.java | 57 +++-
.../seatunnel/paimon/sink/PaimonSinkWriter.java | 3 +-
.../AlterPaimonTableSchemaEventHandler.java | 20 +-
.../handler/PaimonBranchSaveModeHandlerTest.java | 314 +++++++++++++++++++++
7 files changed, 393 insertions(+), 9 deletions(-)
diff --git a/docs/en/connectors/sink/Paimon.md
b/docs/en/connectors/sink/Paimon.md
index 060bbcff9f..f915f68290 100644
--- a/docs/en/connectors/sink/Paimon.md
+++ b/docs/en/connectors/sink/Paimon.md
@@ -78,7 +78,7 @@ libfb303-xxx.jar
| paimon.hadoop.conf | Map | No | -
| Properties in hadoop conf
|
| paimon.hadoop.conf-path | String | No | -
| The specified loading path for the 'core-site.xml', 'hdfs-site.xml',
'hive-site.xml' files
|
| paimon.table.non-primary-key | Boolean | false | -
| Switch to create `table with PK` or `table without PK`. true : `table
without PK`, false : `table with PK`
|
-| branch | String | No | main
| The branch name of Paimon table to write data to. If the branch does not
exist, an exception will be thrown.
|
+| branch | String | No | main
| The branch name of Paimon table to write data to. For non-main branches,
the main table and target branch must already exist, and neither
`schema_save_mode=RECREATE_SCHEMA` nor `data_save_mode=DROP_DATA` is
supported. |
## Checkpoint in batch mode
diff --git a/docs/zh/connectors/sink/Paimon.md
b/docs/zh/connectors/sink/Paimon.md
index 8c3799a80c..3aaa9e18d2 100644
--- a/docs/zh/connectors/sink/Paimon.md
+++ b/docs/zh/connectors/sink/Paimon.md
@@ -77,7 +77,7 @@ libfb303-xxx.jar
| paimon.hadoop.conf | Map | 否 | - |
Hadoop配置文件属性信息
|
| paimon.hadoop.conf-path | 字符串 | 否 | - |
Hadoop配置文件目录,用于加载'core-site.xml', 'hdfs-site.xml', 'hive-site.xml'文件配置
|
| paimon.table.non-primary-key | Boolean | false | -
| 控制创建主键表或者非主键表. 当为true时,创建非主键表, 为false时,创建主键表
|
-| branch | 字符串 | 否 | main |
要写入数据的Paimon表分支名称。如果指定的分支不存在,将抛出异常。
|
+| branch | 字符串 | 否 | main |
要写入数据的Paimon表分支名称。非 main 分支要求 main 表和目标分支已存在,且不支持
`schema_save_mode=RECREATE_SCHEMA` 或 `data_save_mode=DROP_DATA`。 |
## 批模式下的checkpoint
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/exception/PaimonConnectorErrorCode.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/exception/PaimonConnectorErrorCode.java
index f369b8cf66..6acd6e70fd 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/exception/PaimonConnectorErrorCode.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/exception/PaimonConnectorErrorCode.java
@@ -33,7 +33,9 @@ public enum PaimonConnectorErrorCode implements
SeaTunnelErrorCode {
NON_PRIMARY_KEY_CHECK_ERROR(
"PAIMON-10", "Primary keys should be empty when nonPrimaryKey is
true"),
DECIMAL_PRECISION_INCOMPATIBLE("PAIMON-11", "decimal type precision is
incompatible. "),
- BRANCH_NOT_EXISTS("PAIMON-12", "Specified branch: %s does not exist. ");
+ BRANCH_NOT_EXISTS("PAIMON-12", "Specified branch: %s does not exist. "),
+ UNSUPPORTED_BRANCH_SAVE_MODE(
+ "PAIMON-13", "The save mode is not supported for non-main
branches. ");
private final String code;
private final String description;
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/handler/PaimonSaveModeHandler.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/handler/PaimonSaveModeHandler.java
index a89edc02c5..1d93119041 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/handler/PaimonSaveModeHandler.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/handler/PaimonSaveModeHandler.java
@@ -26,10 +26,13 @@ import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.connectors.seatunnel.paimon.catalog.PaimonCatalog;
+import
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;
import org.apache.seatunnel.connectors.seatunnel.paimon.sink.SupportLoadTable;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
+import org.apache.paimon.utils.BranchManager;
public class PaimonSaveModeHandler extends DefaultSaveModeHandler {
@@ -55,15 +58,67 @@ public class PaimonSaveModeHandler extends
DefaultSaveModeHandler {
@Override
public void handleSchemaSaveMode() {
+ checkBranchSaveMode();
super.handleSchemaSaveMode();
TablePath tablePath = catalogTable.getTablePath();
Table paimonTable = ((PaimonCatalog)
catalog).getPaimonTable(tablePath);
Table loadTable = this.supportLoadTable.getLoadTable();
if (loadTable == null || this.schemaSaveMode ==
SchemaSaveMode.RECREATE_SCHEMA) {
- if (StringUtils.isNotEmpty(branch)) {
+ if (isNonMainBranch()) {
paimonTable = ((FileStoreTable)
paimonTable).switchToBranch(branch);
}
this.supportLoadTable.setLoadTable(paimonTable);
}
}
+
+ @Override
+ public void handleDataSaveMode() {
+ checkBranchSaveMode();
+ super.handleDataSaveMode();
+ }
+
+ @Override
+ public void handleSchemaSaveModeWithRestore() {
+ checkBranchSaveMode();
+ super.handleSchemaSaveModeWithRestore();
+ }
+
+ private boolean isNonMainBranch() {
+ return StringUtils.isNotEmpty(branch)
+ && !BranchManager.DEFAULT_MAIN_BRANCH.equalsIgnoreCase(branch);
+ }
+
+ private void checkBranchSaveMode() {
+ if (!isNonMainBranch()) {
+ return;
+ }
+ if (!catalog.tableExists(tablePath)) {
+ throw unsupportedBranchSaveMode(
+ "The main table must exist before writing to a non-main
branch.");
+ }
+ Table paimonTable = ((PaimonCatalog)
catalog).getPaimonTable(tablePath);
+ if (!((FileStoreTable)
paimonTable).branchManager().branchExists(branch)) {
+ throw new PaimonConnectorException(
+ PaimonConnectorErrorCode.BRANCH_NOT_EXISTS,
+ String.format(
+ "Specified branch '%s' of table '%s' does not
exist.",
+ branch, tablePath));
+ }
+ if (this.schemaSaveMode == SchemaSaveMode.RECREATE_SCHEMA) {
+ throw unsupportedBranchSaveMode(
+ "schema_save_mode=RECREATE_SCHEMA would drop and recreate
the main table.");
+ }
+ if (this.dataSaveMode == DataSaveMode.DROP_DATA) {
+ throw unsupportedBranchSaveMode(
+ "data_save_mode=DROP_DATA would truncate the main table.");
+ }
+ }
+
+ private PaimonConnectorException unsupportedBranchSaveMode(String reason) {
+ return new PaimonConnectorException(
+ PaimonConnectorErrorCode.UNSUPPORTED_BRANCH_SAVE_MODE,
+ String.format(
+ "Paimon branch '%s' does not support this save mode
for table '%s'. %s",
+ branch, tablePath, reason));
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
index 2d8fad38c0..076f7796ad 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
@@ -268,7 +268,8 @@ public class PaimonSinkWriter
sourceTableSchema,
paimonCatalog,
sinkPaimonTableSchema,
- paimonTablePath)
+ paimonTablePath,
+ paimonSinkConfig.getBranch())
.apply(event);
reOpenTableWrite();
}
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/schema/handler/AlterPaimonTableSchemaEventHandler.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/schema/handler/AlterPaimonTableSchemaEventHandler.java
index d37ab21758..e0faa687b2 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/schema/handler/AlterPaimonTableSchemaEventHandler.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/schema/handler/AlterPaimonTableSchemaEventHandler.java
@@ -38,6 +38,7 @@ import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
+import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.Preconditions;
import lombok.extern.slf4j.Slf4j;
@@ -61,15 +62,19 @@ public class AlterPaimonTableSchemaEventHandler {
private final TablePath paimonTablePath;
+ private final String branch;
+
public AlterPaimonTableSchemaEventHandler(
TableSchema sourceTableSchema,
PaimonCatalog paimonCatalog,
org.apache.paimon.schema.TableSchema sinkPaimonTableSchema,
- TablePath paimonTablePath) {
+ TablePath paimonTablePath,
+ String branch) {
this.sourceTableSchema = sourceTableSchema;
this.paimonCatalog = paimonCatalog;
this.sinkPaimonTableSchema = sinkPaimonTableSchema;
this.paimonTablePath = paimonTablePath;
+ this.branch = branch;
}
public TableSchema apply(SchemaChangeEvent event) {
@@ -87,9 +92,7 @@ public class AlterPaimonTableSchemaEventHandler {
}
private void applySingleSchemaChangeEvent(SchemaChangeEvent event) {
- Identifier identifier =
- Identifier.create(
- paimonTablePath.getDatabaseName(),
paimonTablePath.getTableName());
+ Identifier identifier = toIdentifier();
if (event instanceof AlterTableAddColumnEvent) {
AlterTableAddColumnEvent alterTableAddColumnEvent =
(AlterTableAddColumnEvent) event;
Column column = alterTableAddColumnEvent.getColumn();
@@ -130,6 +133,15 @@ public class AlterPaimonTableSchemaEventHandler {
}
}
+ private Identifier toIdentifier() {
+ if (StringUtils.isNotEmpty(branch)
+ &&
!BranchManager.DEFAULT_MAIN_BRANCH.equalsIgnoreCase(branch)) {
+ return new Identifier(
+ paimonTablePath.getDatabaseName(),
paimonTablePath.getTableName(), branch);
+ }
+ return Identifier.create(paimonTablePath.getDatabaseName(),
paimonTablePath.getTableName());
+ }
+
private void updateColumn(
Column newColumn, String oldColumnName, Identifier identifier,
String afterTheColumn) {
BasicTypeDefine<DataType> reconvertColumn =
PaimonTypeMapper.INSTANCE.reconvert(newColumn);
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/handler/PaimonBranchSaveModeHandlerTest.java
b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/handler/PaimonBranchSaveModeHandlerTest.java
new file mode 100644
index 0000000000..87540d7b8c
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/handler/PaimonBranchSaveModeHandlerTest.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.paimon.handler;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.sink.DataSaveMode;
+import org.apache.seatunnel.api.sink.SchemaSaveMode;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.schema.event.AlterTableAddColumnEvent;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.connectors.seatunnel.paimon.catalog.PaimonCatalog;
+import
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonHadoopConfiguration;
+import
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig;
+import
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.paimon.sink.PaimonSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.paimon.sink.SupportLoadTable;
+import
org.apache.seatunnel.connectors.seatunnel.paimon.sink.bucket.PaimonBucketAssignerFactory;
+
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+public class PaimonBranchSaveModeHandlerTest {
+
+ private static final String CATALOG_NAME = "paimon_catalog";
+ private static final String DATABASE_NAME = "default";
+ private static final String TABLE_NAME = "branch_table";
+ private static final String BRANCH_NAME = "test_branch";
+
+ @TempDir private Path temporaryFolder;
+
+ private PaimonCatalog paimonCatalog;
+ private ReadonlyConfig readonlyConfig;
+ private TablePath tablePath;
+ private CatalogTable catalogTable;
+
+ @BeforeEach
+ public void before() throws Exception {
+ Map<String, Object> properties = new HashMap<>();
+ properties.put("warehouse", temporaryFolder.toString());
+ properties.put("plugin_name", "Paimon");
+ properties.put("database", DATABASE_NAME);
+ properties.put("table", TABLE_NAME);
+ properties.put("branch", BRANCH_NAME);
+ properties.put("paimon.table.write-props", new HashMap<String,
String>());
+ readonlyConfig = ReadonlyConfig.fromMap(properties);
+
+ tablePath = TablePath.of(DATABASE_NAME, TABLE_NAME);
+ catalogTable = createCatalogTable(TABLE_NAME);
+
+ paimonCatalog = new PaimonCatalog(CATALOG_NAME, readonlyConfig);
+ paimonCatalog.open();
+ paimonCatalog.createDatabase(tablePath, true);
+ paimonCatalog.createTable(tablePath, catalogTable, false);
+
+ FileStoreTable table = (FileStoreTable)
paimonCatalog.getPaimonTable(tablePath);
+ if (!table.branchManager().branchExists(BRANCH_NAME)) {
+ table.createBranch(BRANCH_NAME);
+ }
+ }
+
+ @Test
+ public void
createSchemaWithBranchShouldFailWithoutCreatingMainTableWhenTableMissing() {
+ String missingTableName = "missing_branch_table";
+ TablePath missingTablePath = TablePath.of(DATABASE_NAME,
missingTableName);
+ CatalogTable missingCatalogTable =
createCatalogTable(missingTableName);
+ PaimonSaveModeHandler handler =
+ new PaimonSaveModeHandler(
+ new TestSupportLoadTable(),
+ SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST,
+ DataSaveMode.APPEND_DATA,
+ paimonCatalog,
+ missingCatalogTable,
+ null,
+ BRANCH_NAME);
+
+ PaimonConnectorException exception =
+ Assertions.assertThrows(
+ PaimonConnectorException.class,
handler::handleSchemaSaveMode);
+ Assertions.assertTrue(
+ exception.getMessage().contains("main table must exist"),
+ "The error message should explain that branch writes require
an existing table.");
+ Assertions.assertFalse(
+ paimonCatalog.tableExists(missingTablePath),
+ "Branch save mode must not auto-create a main table.");
+ }
+
+ @Test
+ public void createSchemaWithBranchShouldFailWhenBranchMissing() {
+ String tableWithoutBranch = "table_without_branch";
+ TablePath tableWithoutBranchPath = TablePath.of(DATABASE_NAME,
tableWithoutBranch);
+ CatalogTable tableWithoutBranchCatalogTable =
createCatalogTable(tableWithoutBranch);
+ paimonCatalog.createTable(tableWithoutBranchPath,
tableWithoutBranchCatalogTable, false);
+ PaimonSaveModeHandler handler =
+ new PaimonSaveModeHandler(
+ new TestSupportLoadTable(),
+ SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST,
+ DataSaveMode.APPEND_DATA,
+ paimonCatalog,
+ tableWithoutBranchCatalogTable,
+ null,
+ BRANCH_NAME);
+
+ PaimonConnectorException exception =
+ Assertions.assertThrows(
+ PaimonConnectorException.class,
handler::handleSchemaSaveMode);
+ Assertions.assertTrue(
+ exception.getMessage().contains("does not exist"),
+ "The error message should explain that the branch does not
exist.");
+
Assertions.assertTrue(paimonCatalog.tableExists(tableWithoutBranchPath));
+ Assertions.assertFalse(
+ ((FileStoreTable)
paimonCatalog.getPaimonTable(tableWithoutBranchPath))
+ .branchManager()
+ .branchExists(BRANCH_NAME),
+ "Branch save mode must not auto-create the branch.");
+ }
+
+ @Test
+ public void dropDataWithBranchShouldFailWithoutDroppingMainTableOrBranch()
{
+ PaimonSaveModeHandler handler =
+ new PaimonSaveModeHandler(
+ new TestSupportLoadTable(),
+ SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST,
+ DataSaveMode.DROP_DATA,
+ paimonCatalog,
+ catalogTable,
+ null,
+ BRANCH_NAME);
+
+ PaimonConnectorException exception =
+ Assertions.assertThrows(
+ PaimonConnectorException.class,
handler::handleSchemaSaveMode);
+ Assertions.assertTrue(
+ exception.getMessage().contains("data_save_mode=DROP_DATA"),
+ "The error message should explain the unsupported save mode.");
+
+ FileStoreTable mainTable = (FileStoreTable)
paimonCatalog.getPaimonTable(tablePath);
+ Assertions.assertTrue(
+ mainTable.branchManager().branchExists(BRANCH_NAME),
+ "DROP_DATA on a branch must not delete branch metadata.");
+ }
+
+ @Test
+ public void
recreateSchemaWithBranchShouldFailWithoutDroppingMainTableOrBranch() {
+ PaimonSaveModeHandler handler =
+ new PaimonSaveModeHandler(
+ new TestSupportLoadTable(),
+ SchemaSaveMode.RECREATE_SCHEMA,
+ DataSaveMode.APPEND_DATA,
+ paimonCatalog,
+ catalogTable,
+ null,
+ BRANCH_NAME);
+
+ PaimonConnectorException exception =
+ Assertions.assertThrows(
+ PaimonConnectorException.class,
handler::handleSchemaSaveMode);
+ Assertions.assertTrue(
+
exception.getMessage().contains("schema_save_mode=RECREATE_SCHEMA"),
+ "The error message should explain the unsupported save mode.");
+
+ FileStoreTable mainTable = (FileStoreTable)
paimonCatalog.getPaimonTable(tablePath);
+ Assertions.assertTrue(
+ mainTable.branchManager().branchExists(BRANCH_NAME),
+ "RECREATE_SCHEMA on a branch must not delete branch
metadata.");
+ }
+
+ @Test
+ public void
restoreSchemaSaveModeWithBranchShouldFailForDestructiveSaveMode() {
+ PaimonSaveModeHandler handler =
+ new PaimonSaveModeHandler(
+ new TestSupportLoadTable(),
+ SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST,
+ DataSaveMode.DROP_DATA,
+ paimonCatalog,
+ catalogTable,
+ null,
+ BRANCH_NAME);
+
+ PaimonConnectorException exception =
+ Assertions.assertThrows(
+ PaimonConnectorException.class,
handler::handleSchemaSaveModeWithRestore);
+ Assertions.assertTrue(
+ exception.getMessage().contains("data_save_mode=DROP_DATA"),
+ "The error message should explain the unsupported save mode.");
+ }
+
+ @Test
+ public void schemaEvolutionWithBranchShouldAlterBranchSchemaOnly() throws
Exception {
+ PaimonSinkConfig sinkConfig = new PaimonSinkConfig(readonlyConfig);
+ PaimonSinkWriter writer =
+ new PaimonSinkWriter(
+ new TestSinkWriterContext(),
+ readonlyConfig,
+ catalogTable,
+ ((FileStoreTable)
paimonCatalog.getPaimonTable(tablePath))
+ .switchToBranch(BRANCH_NAME),
+ UUID.randomUUID().toString(),
+ null,
+ sinkConfig,
+ new PaimonHadoopConfiguration(),
+ new PaimonBucketAssignerFactory());
+ try {
+ writer.applySchemaChange(
+ AlterTableAddColumnEvent.add(
+ catalogTable.getTableId(),
+ PhysicalColumn.of(
+ "branch_only_column",
+ BasicType.STRING_TYPE,
+ (Long) null,
+ true,
+ null,
+ null)));
+ } finally {
+ writer.close();
+ }
+
+ FileStoreTable mainTable = (FileStoreTable)
paimonCatalog.getPaimonTable(tablePath);
+ FileStoreTable branchTable = mainTable.switchToBranch(BRANCH_NAME);
+ Assertions.assertFalse(
+ mainTable.schema().fieldNames().contains("branch_only_column"),
+ "Schema evolution configured with a branch must not alter the
main schema.");
+ Assertions.assertTrue(
+
branchTable.schema().fieldNames().contains("branch_only_column"),
+ "Schema evolution configured with a branch must alter the
branch schema.");
+ }
+
+ @AfterEach
+ public void after() {
+ if (paimonCatalog != null) {
+ paimonCatalog.close();
+ }
+ }
+
+ private static TableSchema baseSchema() {
+ return TableSchema.builder()
+ .column(PhysicalColumn.of("id", BasicType.INT_TYPE, (Long)
null, false, null, null))
+ .primaryKey(
+ org.apache.seatunnel.api.table.catalog.PrimaryKey.of(
+ "pk", Collections.singletonList("id")))
+ .build();
+ }
+
+ private static CatalogTable createCatalogTable(String tableName) {
+ return CatalogTable.of(
+ TableIdentifier.of(CATALOG_NAME, DATABASE_NAME, tableName),
+ baseSchema(),
+ new HashMap<>(),
+ Collections.emptyList(),
+ "branch test table");
+ }
+
+ private static class TestSupportLoadTable implements
SupportLoadTable<Table> {
+ private Table table;
+
+ @Override
+ public void setLoadTable(Table table) {
+ this.table = table;
+ }
+
+ @Override
+ public Table getLoadTable() {
+ return table;
+ }
+ }
+
+ private static class TestSinkWriterContext
+ implements org.apache.seatunnel.api.sink.SinkWriter.Context {
+ @Override
+ public int getIndexOfSubtask() {
+ return 0;
+ }
+
+ @Override
+ public org.apache.seatunnel.api.common.metrics.MetricsContext
getMetricsContext() {
+ return null;
+ }
+
+ @Override
+ public org.apache.seatunnel.api.event.EventListener getEventListener()
{
+ return null;
+ }
+ }
+}