This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new b0abbd2d89 [Feature][Connector-V2] Support multi-table sink feature
for paimon #5652 (#6449)
b0abbd2d89 is described below
commit b0abbd2d89d07409c6c90f1b278e8cca65d1b9ed
Author: dailai <[email protected]>
AuthorDate: Tue Mar 19 10:48:32 2024 +0800
[Feature][Connector-V2] Support multi-table sink feature for paimon #5652
(#6449)
---
docs/en/connector-v2/sink/Paimon.md | 82 +++++--
docs/zh/connector-v2/sink/Paimon.md | 87 +++++++
seatunnel-connectors-v2/connector-paimon/pom.xml | 5 +
.../seatunnel/paimon/catalog/PaimonCatalog.java | 215 +++++++++++++++++
.../PaimonCatalogFactory.java} | 26 ++-
.../paimon/catalog/PaimonCatalogLoader.java | 61 +++++
.../seatunnel/paimon/catalog/PaimonTable.java | 28 +++
.../seatunnel/paimon/config/PaimonConfig.java | 9 +-
.../seatunnel/paimon/config/PaimonSinkConfig.java | 66 ++++++
.../seatunnel/paimon/data/PaimonTypeMapper.java | 50 ++++
.../paimon/handler/PaimonSaveModeHandler.java | 58 +++++
.../seatunnel/paimon/sink/PaimonSink.java | 146 ++++++------
.../seatunnel/paimon/sink/PaimonSinkFactory.java | 68 +++++-
.../seatunnel/paimon/sink/PaimonSinkWriter.java | 57 ++++-
...aimonSinkFactory.java => SupportLoadTable.java} | 26 +--
.../sink/commit/PaimonAggregatedCommitter.java | 34 ++-
.../seatunnel/paimon/utils/JobContextUtil.java | 32 +++
.../seatunnel/paimon/utils/RowConverter.java | 12 +-
.../seatunnel/paimon/utils/RowKindConverter.java | 51 ++++
.../seatunnel/paimon/utils/RowTypeConverter.java | 85 ++++++-
.../seatunnel/paimon/utils/SchemaUtil.java | 54 +++++
.../paimon/utils/RowTypeConverterTest.java | 2 +-
.../connector-paimon-e2e/pom.xml | 9 +
.../e2e/connector/paimon/PaimonSinkCDCIT.java | 260 +++++++++++++++++++++
.../test/resources/fake_cdc_sink_paimon_case1.conf | 86 +++++++
.../test/resources/fake_cdc_sink_paimon_case2.conf | 142 +++++++++++
.../seatunnel-hadoop3-3.1.4-uber/pom.xml | 5 +
27 files changed, 1596 insertions(+), 160 deletions(-)
diff --git a/docs/en/connector-v2/sink/Paimon.md
b/docs/en/connector-v2/sink/Paimon.md
index 6fa721a1e6..5e9d3c431f 100644
--- a/docs/en/connector-v2/sink/Paimon.md
+++ b/docs/en/connector-v2/sink/Paimon.md
@@ -4,7 +4,7 @@
## Description
-Write data to Apache Paimon.
+Sink connector for Apache Paimon. It supports CDC mode and automatic table creation.
## Key features
@@ -12,40 +12,76 @@ Write data to Apache Paimon.
## Options
-| name | type | required | default value |
-|----------------|--------|----------|---------------|
-| warehouse | String | Yes | - |
-| database | String | Yes | - |
-| table | String | Yes | - |
-| hdfs_site_path | String | No | - |
+| name | type | required | default value |
Description |
+|------------------|--------|----------|------------------------------|---------------------------------|
+| warehouse | String | Yes | - | Paimon
warehouse path |
+| database | String | Yes | - | The
database you want to access |
+| table | String | Yes | - | The
table you want to access |
+| hdfs_site_path | String | No | - |
|
+| schema_save_mode | Enum | No | CREATE_SCHEMA_WHEN_NOT_EXIST | The
schema save mode |
+| data_save_mode | Enum | No | APPEND_DATA | The
data save mode |
-### warehouse [string]
-
-Paimon warehouse path
-
-### database [string]
+## Examples
-The database you want to access
+### Single table
-### table [String]
+```hocon
+env {
+ parallelism = 1
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000
+}
-The table you want to access
+source {
+ Mysql-CDC {
+ base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
+ username = "root"
+ password = "******"
+ table-names = ["seatunnel.role"]
+ }
+}
-## Examples
+transform {
+}
-```hocon
sink {
Paimon {
- warehouse = "/tmp/paimon"
- database = "default"
- table = "st_test"
+ catalog_name="seatunnel_test"
+ warehouse="file:///tmp/seatunnel/paimon/hadoop-sink/"
+ database="seatunnel"
+ table="role"
}
}
```
-## Changelog
+### Multiple tables
+
+```hocon
+env {
+ parallelism = 1
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000
+}
+
+source {
+ Mysql-CDC {
+ base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
+ username = "root"
+ password = "******"
+ table-names = ["seatunnel.role","seatunnel.user","galileo.Bucket"]
+ }
+}
-### next version
+transform {
+}
-- Add Paimon Sink Connector
+sink {
+ Paimon {
+ catalog_name="seatunnel_test"
+ warehouse="file:///tmp/seatunnel/paimon/hadoop-sink/"
+ database="${database_name}"
+ table="${table_name}"
+ }
+}
+```
diff --git a/docs/zh/connector-v2/sink/Paimon.md
b/docs/zh/connector-v2/sink/Paimon.md
new file mode 100644
index 0000000000..b1b4baef9b
--- /dev/null
+++ b/docs/zh/connector-v2/sink/Paimon.md
@@ -0,0 +1,87 @@
+# Paimon
+
+> Paimon 数据连接器
+
+## 描述
+
+Apache Paimon数据连接器。支持cdc写以及自动建表。
+
+## 主要特性
+
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+
+## 连接器选项
+
+| 名称 | 类型 | 是否必须 | 默认值 | 描述
|
+|------------------|--------|------|------------------------------|--------------------|
+| warehouse | String | Yes | - | Paimon
warehouse路径 |
+| database | String | Yes | - | 数据库名称
|
+| table | String | Yes | - | 表名
|
+| hdfs_site_path | String | No | - |
|
+| schema_save_mode | Enum | No | CREATE_SCHEMA_WHEN_NOT_EXIST | schema保存模式
|
+| data_save_mode | Enum | No | APPEND_DATA | 数据保存模式
|
+
+## 示例
+
+### 单表
+
+```hocon
+env {
+ parallelism = 1
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000
+}
+
+source {
+ Mysql-CDC {
+ base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
+ username = "root"
+ password = "******"
+ table-names = ["seatunnel.role"]
+ }
+}
+
+transform {
+}
+
+sink {
+ Paimon {
+ catalog_name="seatunnel_test"
+ warehouse="file:///tmp/seatunnel/paimon/hadoop-sink/"
+ database="seatunnel"
+ table="role"
+ }
+}
+```
+
+### 多表
+
+```hocon
+env {
+ parallelism = 1
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000
+}
+
+source {
+ Mysql-CDC {
+ base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
+ username = "root"
+ password = "******"
+ table-names = ["seatunnel.role","seatunnel.user","galileo.Bucket"]
+ }
+}
+
+transform {
+}
+
+sink {
+ Paimon {
+ catalog_name="seatunnel_test"
+ warehouse="file:///tmp/seatunnel/paimon/hadoop-sink/"
+ database="${database_name}"
+ table="${table_name}"
+ }
+}
+```
+
diff --git a/seatunnel-connectors-v2/connector-paimon/pom.xml
b/seatunnel-connectors-v2/connector-paimon/pom.xml
index 8bcb1c3507..499165ea6f 100644
--- a/seatunnel-connectors-v2/connector-paimon/pom.xml
+++ b/seatunnel-connectors-v2/connector-paimon/pom.xml
@@ -34,6 +34,11 @@
</properties>
<dependencies>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.paimon</groupId>
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java
new file mode 100644
index 0000000000..7312ed28b0
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.paimon.catalog;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
+import
org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
+import
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
+import
org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
+import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig;
+import org.apache.seatunnel.connectors.seatunnel.paimon.utils.SchemaUtil;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.types.DataField;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+@Slf4j
+public class PaimonCatalog implements Catalog, PaimonTable {
+ private static final String DEFAULT_DATABASE = "default";
+
+ private String catalogName;
+ private ReadonlyConfig readonlyConfig;
+ private PaimonCatalogLoader paimonCatalogLoader;
+ private org.apache.paimon.catalog.Catalog catalog;
+
+ public PaimonCatalog(String catalogName, ReadonlyConfig readonlyConfig) {
+ this.readonlyConfig = readonlyConfig;
+ this.catalogName = catalogName;
+ this.paimonCatalogLoader = new PaimonCatalogLoader(new
PaimonSinkConfig(readonlyConfig));
+ }
+
+ @Override
+ public void open() throws CatalogException {
+ this.catalog = paimonCatalogLoader.loadCatalog();
+ }
+
+ @Override
+ public void close() throws CatalogException {
+ if (catalog != null && catalog instanceof Closeable) {
+ try {
+ ((Closeable) catalog).close();
+ } catch (IOException e) {
+ log.error("Error while closing IcebergCatalog.", e);
+ throw new CatalogException(e);
+ }
+ }
+ }
+
+ @Override
+ public String name() {
+ return this.catalogName;
+ }
+
+ @Override
+ public String getDefaultDatabase() throws CatalogException {
+ return DEFAULT_DATABASE;
+ }
+
+ @Override
+ public boolean databaseExists(String databaseName) throws CatalogException
{
+ return catalog.databaseExists(databaseName);
+ }
+
+ @Override
+ public List<String> listDatabases() throws CatalogException {
+ return catalog.listDatabases();
+ }
+
+ @Override
+ public List<String> listTables(String databaseName)
+ throws CatalogException, DatabaseNotExistException {
+ try {
+ return catalog.listTables(databaseName);
+ } catch (org.apache.paimon.catalog.Catalog.DatabaseNotExistException
e) {
+ throw new DatabaseNotExistException(this.catalogName,
databaseName);
+ }
+ }
+
+ @Override
+ public boolean tableExists(TablePath tablePath) throws CatalogException {
+ return catalog.tableExists(toIdentifier(tablePath));
+ }
+
+ @Override
+ public CatalogTable getTable(TablePath tablePath)
+ throws CatalogException, TableNotExistException {
+ try {
+ FileStoreTable paimonFileStoreTableTable = (FileStoreTable)
getPaimonTable(tablePath);
+ return toCatalogTable(paimonFileStoreTableTable, tablePath);
+ } catch (Exception e) {
+ throw new TableNotExistException(this.catalogName, tablePath);
+ }
+ }
+
+ @Override
+ public Table getPaimonTable(TablePath tablePath)
+ throws CatalogException, TableNotExistException {
+ try {
+ return catalog.getTable(toIdentifier(tablePath));
+ } catch (org.apache.paimon.catalog.Catalog.TableNotExistException e) {
+ throw new TableNotExistException(this.catalogName, tablePath);
+ }
+ }
+
+ @Override
+ public void createTable(TablePath tablePath, CatalogTable table, boolean
ignoreIfExists)
+ throws TableAlreadyExistException, DatabaseNotExistException,
CatalogException {
+ try {
+ catalog.createTable(
+ toIdentifier(tablePath),
+ SchemaUtil.toPaimonSchema(table.getTableSchema()),
+ ignoreIfExists);
+ } catch (org.apache.paimon.catalog.Catalog.TableAlreadyExistException
e) {
+ throw new TableAlreadyExistException(this.catalogName, tablePath);
+ } catch (org.apache.paimon.catalog.Catalog.DatabaseNotExistException
e) {
+ throw new DatabaseNotExistException(this.catalogName,
tablePath.getDatabaseName());
+ }
+ }
+
+ @Override
+ public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
+ throws TableNotExistException, CatalogException {
+ try {
+ catalog.dropTable(toIdentifier(tablePath), ignoreIfNotExists);
+ } catch (org.apache.paimon.catalog.Catalog.TableNotExistException e) {
+ throw new TableNotExistException(this.catalogName, tablePath);
+ }
+ }
+
+ @Override
+ public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
+ throws DatabaseAlreadyExistException, CatalogException {
+ try {
+ catalog.createDatabase(tablePath.getDatabaseName(),
ignoreIfExists);
+ } catch
(org.apache.paimon.catalog.Catalog.DatabaseAlreadyExistException e) {
+ throw new DatabaseAlreadyExistException(this.catalogName,
tablePath.getDatabaseName());
+ }
+ }
+
+ @Override
+ public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
+ throws DatabaseNotExistException, CatalogException {
+ try {
+ catalog.dropDatabase(tablePath.getDatabaseName(),
ignoreIfNotExists, true);
+ } catch (Exception e) {
+ throw new DatabaseNotExistException(this.catalogName,
tablePath.getDatabaseName());
+ }
+ }
+
+ private CatalogTable toCatalogTable(
+ FileStoreTable paimonFileStoreTableTable, TablePath tablePath) {
+ org.apache.paimon.schema.TableSchema schema =
paimonFileStoreTableTable.schema();
+ List<DataField> dataFields = schema.fields();
+ TableSchema.Builder builder = TableSchema.builder();
+ dataFields.forEach(
+ dataField -> {
+ String name = dataField.name();
+ SeaTunnelDataType<?> seaTunnelType =
+ SchemaUtil.toSeaTunnelType(dataField.type());
+ PhysicalColumn physicalColumn =
+ PhysicalColumn.of(
+ name,
+ seaTunnelType,
+ (Long) null,
+ true,
+ null,
+ dataField.description());
+ builder.column(physicalColumn);
+ });
+
+ List<String> partitionKeys = schema.partitionKeys();
+
+ return CatalogTable.of(
+ org.apache.seatunnel.api.table.catalog.TableIdentifier.of(
+ catalogName, tablePath.getDatabaseName(),
tablePath.getTableName()),
+ builder.build(),
+ paimonFileStoreTableTable.options(),
+ partitionKeys,
+ null,
+ catalogName);
+ }
+
+ private Identifier toIdentifier(TablePath tablePath) {
+ return Identifier.create(tablePath.getDatabaseName(),
tablePath.getTableName());
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogFactory.java
similarity index 59%
copy from
seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java
copy to
seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogFactory.java
index dfae43c482..4d94f385d9 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogFactory.java
@@ -15,17 +15,23 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.paimon.sink;
+package org.apache.seatunnel.connectors.seatunnel.paimon.catalog;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.factory.CatalogFactory;
import org.apache.seatunnel.api.table.factory.Factory;
-import org.apache.seatunnel.api.table.factory.TableSinkFactory;
-import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig;
+import
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig;
import com.google.auto.service.AutoService;
@AutoService(Factory.class)
-public class PaimonSinkFactory implements TableSinkFactory {
+public class PaimonCatalogFactory implements CatalogFactory {
+ @Override
+ public Catalog createCatalog(String catalogName, ReadonlyConfig
readonlyConfig) {
+ return new PaimonCatalog(catalogName, readonlyConfig);
+ }
@Override
public String factoryIdentifier() {
@@ -35,10 +41,14 @@ public class PaimonSinkFactory implements TableSinkFactory {
@Override
public OptionRule optionRule() {
return OptionRule.builder()
- .required(PaimonConfig.WAREHOUSE)
- .required(PaimonConfig.DATABASE)
- .required(PaimonConfig.TABLE)
- .optional(PaimonConfig.HDFS_SITE_PATH)
+ .required(
+ PaimonSinkConfig.WAREHOUSE,
+ PaimonSinkConfig.DATABASE,
+ PaimonSinkConfig.TABLE)
+ .optional(
+ PaimonSinkConfig.HDFS_SITE_PATH,
+ PaimonSinkConfig.SCHEMA_SAVE_MODE,
+ PaimonSinkConfig.DATA_SAVE_MODE)
.build();
}
}
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogLoader.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogLoader.java
new file mode 100644
index 0000000000..bec66dbe3f
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogLoader.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.paimon.catalog;
+
+import
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.options.Options;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+import static
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig.WAREHOUSE;
+
+@Slf4j
+public class PaimonCatalogLoader implements Serializable {
+ private PaimonSinkConfig config;
+
+ public PaimonCatalogLoader(PaimonSinkConfig config) {
+ this.config = config;
+ }
+
+ public Catalog loadCatalog() {
+ // When using the seatunel engine, set the current class loader to
prevent loading failures
+
Thread.currentThread().setContextClassLoader(PaimonCatalogLoader.class.getClassLoader());
+ final String warehouse = config.getWarehouse();
+ final Map<String, String> optionsMap = new HashMap<>();
+ optionsMap.put(WAREHOUSE.key(), warehouse);
+ final Options options = Options.fromMap(optionsMap);
+ final Configuration hadoopConf = new Configuration();
+ String hdfsSitePathOptional = config.getHdfsSitePath();
+ if (StringUtils.isNotBlank(hdfsSitePathOptional)) {
+ hadoopConf.addResource(new Path(hdfsSitePathOptional));
+ }
+ final CatalogContext catalogContext = CatalogContext.create(options,
hadoopConf);
+ return CatalogFactory.createCatalog(catalogContext);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonTable.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonTable.java
new file mode 100644
index 0000000000..55b18f79ab
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonTable.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.paimon.catalog;
+
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
+import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
+
+import org.apache.paimon.table.Table;
+
+public interface PaimonTable {
+ Table getPaimonTable(TablePath tablePath) throws CatalogException,
TableNotExistException;
+}
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonConfig.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonConfig.java
index b5299d8755..0396e6223a 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonConfig.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonConfig.java
@@ -22,13 +22,14 @@ import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.source.SeaTunnelSource;
+import java.io.Serializable;
import java.util.List;
/**
* Utility class to store configuration options, used by {@link
SeaTunnelSource} and {@link
* SeaTunnelSink}.
*/
-public class PaimonConfig {
+public class PaimonConfig implements Serializable {
public static final Option<String> WAREHOUSE =
Options.key("warehouse")
@@ -36,6 +37,12 @@ public class PaimonConfig {
.noDefaultValue()
.withDescription("The warehouse path of paimon");
+ public static final Option<String> CATALOG_NAME =
+ Options.key("catalog_name")
+ .stringType()
+ .defaultValue("paimon")
+ .withDescription(" the iceberg catalog name");
+
public static final Option<String> DATABASE =
Options.key("database")
.stringType()
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java
new file mode 100644
index 0000000000..589fd94816
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.paimon.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.sink.DataSaveMode;
+import org.apache.seatunnel.api.sink.SchemaSaveMode;
+
+import lombok.Getter;
+
+import static
org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;
+
+@Getter
+public class PaimonSinkConfig extends PaimonConfig {
+ public static final Option<SchemaSaveMode> SCHEMA_SAVE_MODE =
+ Options.key("schema_save_mode")
+ .enumType(SchemaSaveMode.class)
+ .defaultValue(SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST)
+ .withDescription("schema_save_mode");
+
+ public static final Option<DataSaveMode> DATA_SAVE_MODE =
+ Options.key("data_save_mode")
+ .enumType(DataSaveMode.class)
+ .defaultValue(DataSaveMode.APPEND_DATA)
+ .withDescription("data_save_mode");
+
+ private String catalogName;
+ private String warehouse;
+ private String namespace;
+ private String table;
+ private String hdfsSitePath;
+ private SchemaSaveMode schemaSaveMode;
+ private DataSaveMode dataSaveMode;
+
+ public PaimonSinkConfig(ReadonlyConfig readonlyConfig) {
+ this.catalogName =
checkArgumentNotNull(readonlyConfig.get(CATALOG_NAME));
+ this.warehouse = checkArgumentNotNull(readonlyConfig.get(WAREHOUSE));
+ this.namespace = checkArgumentNotNull(readonlyConfig.get(DATABASE));
+ this.table = checkArgumentNotNull(readonlyConfig.get(TABLE));
+ this.hdfsSitePath = readonlyConfig.get(HDFS_SITE_PATH);
+ this.schemaSaveMode = readonlyConfig.get(SCHEMA_SAVE_MODE);
+ this.dataSaveMode = readonlyConfig.get(DATA_SAVE_MODE);
+ }
+
+ protected <T> T checkArgumentNotNull(T argument) {
+ checkNotNull(argument);
+ return argument;
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/data/PaimonTypeMapper.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/data/PaimonTypeMapper.java
new file mode 100644
index 0000000000..1f8b1cff32
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/data/PaimonTypeMapper.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.paimon.data;
+
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.converter.TypeConverter;
+import org.apache.seatunnel.connectors.seatunnel.paimon.sink.PaimonSink;
+import org.apache.seatunnel.connectors.seatunnel.paimon.utils.RowTypeConverter;
+
+import org.apache.paimon.types.DataType;
+
+import com.google.auto.service.AutoService;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+@AutoService(TypeConverter.class)
+public class PaimonTypeMapper implements TypeConverter<DataType> {
+ public static final PaimonTypeMapper INSTANCE = new PaimonTypeMapper();
+
+ @Override
+ public String identifier() {
+ return PaimonSink.PLUGIN_NAME;
+ }
+
+ @Override
+ public Column convert(DataType dataType) {
+ return
PhysicalColumn.builder().dataType(RowTypeConverter.convert(dataType)).build();
+ }
+
+ @Override
+ public DataType reconvert(Column column) {
+ return RowTypeConverter.reconvert(column.getDataType());
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/handler/PaimonSaveModeHandler.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/handler/PaimonSaveModeHandler.java
new file mode 100644
index 0000000000..b479ebf14b
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/handler/PaimonSaveModeHandler.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.paimon.handler;
+
+import org.apache.seatunnel.api.sink.DataSaveMode;
+import org.apache.seatunnel.api.sink.DefaultSaveModeHandler;
+import org.apache.seatunnel.api.sink.SchemaSaveMode;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.connectors.seatunnel.paimon.catalog.PaimonCatalog;
+import org.apache.seatunnel.connectors.seatunnel.paimon.sink.SupportLoadTable;
+
+import org.apache.paimon.table.Table;
+
+public class PaimonSaveModeHandler extends DefaultSaveModeHandler {
+
+ private SupportLoadTable<Table> supportLoadTable;
+ private Catalog catalog;
+ private CatalogTable catalogTable;
+
+ public PaimonSaveModeHandler(
+ SupportLoadTable supportLoadTable,
+ SchemaSaveMode schemaSaveMode,
+ DataSaveMode dataSaveMode,
+ Catalog catalog,
+ CatalogTable catalogTable,
+ String customSql) {
+ super(schemaSaveMode, dataSaveMode, catalog, catalogTable, customSql);
+ this.supportLoadTable = supportLoadTable;
+ this.catalog = catalog;
+ this.catalogTable = catalogTable;
+ }
+
+ @Override
+ public void handleSchemaSaveMode() {
+ super.handleSchemaSaveMode();
+ TablePath tablePath = catalogTable.getTablePath();
+ Table paimonTable = ((PaimonCatalog)
catalog).getPaimonTable(tablePath);
+ // load paimon table and set it into paimon sink
+ this.supportLoadTable.setLoadTable(paimonTable);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
index ac1a0b97ed..cdec4b0c76 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
@@ -17,54 +17,46 @@
package org.apache.seatunnel.connectors.seatunnel.paimon.sink;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.serialization.DefaultSerializer;
import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.sink.SaveModeHandler;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.sink.SupportMultiTableSink;
+import org.apache.seatunnel.api.sink.SupportSaveMode;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
-import
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig;
import
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;
+import
org.apache.seatunnel.connectors.seatunnel.paimon.handler.PaimonSaveModeHandler;
import
org.apache.seatunnel.connectors.seatunnel.paimon.sink.commit.PaimonAggregatedCommitInfo;
import
org.apache.seatunnel.connectors.seatunnel.paimon.sink.commit.PaimonAggregatedCommitter;
import
org.apache.seatunnel.connectors.seatunnel.paimon.sink.commit.PaimonCommitInfo;
import
org.apache.seatunnel.connectors.seatunnel.paimon.sink.state.PaimonSinkState;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.catalog.CatalogFactory;
-import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.options.Options;
import org.apache.paimon.table.Table;
-import com.google.auto.service.AutoService;
-
import java.io.IOException;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.Optional;
-import static
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig.DATABASE;
-import static
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig.HDFS_SITE_PATH;
-import static
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig.TABLE;
-import static
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig.WAREHOUSE;
+import static
org.apache.seatunnel.api.table.factory.FactoryUtil.discoverFactory;
-@AutoService(SeaTunnelSink.class)
public class PaimonSink
implements SeaTunnelSink<
- SeaTunnelRow, PaimonSinkState, PaimonCommitInfo,
PaimonAggregatedCommitInfo> {
+ SeaTunnelRow,
+ PaimonSinkState,
+ PaimonCommitInfo,
+ PaimonAggregatedCommitInfo>,
+ SupportSaveMode,
+ SupportLoadTable<Table>,
+ SupportMultiTableSink {
private static final long serialVersionUID = 1L;
@@ -72,79 +64,44 @@ public class PaimonSink
private SeaTunnelRowType seaTunnelRowType;
- private Config pluginConfig;
-
private Table table;
- @Override
- public String getPluginName() {
- return PLUGIN_NAME;
- }
+ private JobContext jobContext;
- @Override
- public void prepare(Config pluginConfig) throws PrepareFailException {
- this.pluginConfig = pluginConfig;
- CheckResult result =
- CheckConfigUtil.checkAllExists(
- pluginConfig, WAREHOUSE.key(), DATABASE.key(),
TABLE.key());
- if (!result.isSuccess()) {
- throw new PaimonConnectorException(
- SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
- String.format(
- "PluginName: %s, PluginType: %s, Message: %s",
- getPluginName(), PluginType.SINK,
result.getMsg()));
- }
- // initialize paimon table
- final String warehouse = pluginConfig.getString(WAREHOUSE.key());
- final String database = pluginConfig.getString(DATABASE.key());
- final String table = pluginConfig.getString(TABLE.key());
- final Map<String, String> optionsMap = new HashMap<>();
- optionsMap.put(WAREHOUSE.key(), warehouse);
- final Options options = Options.fromMap(optionsMap);
- final Configuration hadoopConf = new Configuration();
- if (pluginConfig.hasPath(HDFS_SITE_PATH.key())) {
- hadoopConf.addResource(new
Path(pluginConfig.getString(HDFS_SITE_PATH.key())));
- }
- final CatalogContext catalogContext = CatalogContext.create(options,
hadoopConf);
- try (Catalog catalog = CatalogFactory.createCatalog(catalogContext)) {
- Identifier identifier = Identifier.create(database, table);
- this.table = catalog.getTable(identifier);
- } catch (Exception e) {
- String errorMsg =
- String.format(
- "Failed to get table [%s] from database [%s] on
warehouse [%s]",
- database, table, warehouse);
- throw new PaimonConnectorException(
- PaimonConnectorErrorCode.GET_TABLE_FAILED, errorMsg, e);
- }
- }
+ private ReadonlyConfig readonlyConfig;
- @Override
- public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
- this.seaTunnelRowType = seaTunnelRowType;
+ private PaimonSinkConfig paimonSinkConfig;
+
+ private CatalogTable catalogTable;
+
+ public PaimonSink(ReadonlyConfig readonlyConfig, CatalogTable
catalogTable) {
+ this.readonlyConfig = readonlyConfig;
+ this.paimonSinkConfig = new PaimonSinkConfig(readonlyConfig);
+ this.catalogTable = catalogTable;
+ this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
}
@Override
- public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
- return this.seaTunnelRowType;
+ public String getPluginName() {
+ return PLUGIN_NAME;
}
@Override
public SinkWriter<SeaTunnelRow, PaimonCommitInfo, PaimonSinkState>
createWriter(
SinkWriter.Context context) throws IOException {
- return new PaimonSinkWriter(context, table, seaTunnelRowType);
+ return new PaimonSinkWriter(context, table, seaTunnelRowType,
jobContext);
}
@Override
public Optional<SinkAggregatedCommitter<PaimonCommitInfo,
PaimonAggregatedCommitInfo>>
createAggregatedCommitter() throws IOException {
- return Optional.of(new PaimonAggregatedCommitter(table));
+ return Optional.of(new PaimonAggregatedCommitter(table, jobContext));
}
@Override
public SinkWriter<SeaTunnelRow, PaimonCommitInfo, PaimonSinkState>
restoreWriter(
SinkWriter.Context context, List<PaimonSinkState> states) throws
IOException {
- return new PaimonSinkWriter(context, table, seaTunnelRowType, states);
+ return new PaimonSinkWriter(context, table, seaTunnelRowType, states,
jobContext);
}
@Override
@@ -156,4 +113,43 @@ public class PaimonSink
public Optional<Serializer<PaimonCommitInfo>> getCommitInfoSerializer() {
return Optional.of(new DefaultSerializer<>());
}
+
+ @Override
+ public void setJobContext(JobContext jobContext) {
+ this.jobContext = jobContext;
+ }
+
+ @Override
+ public Optional<SaveModeHandler> getSaveModeHandler() {
+ org.apache.seatunnel.api.table.factory.CatalogFactory catalogFactory =
+ discoverFactory(
+ Thread.currentThread().getContextClassLoader(),
+
org.apache.seatunnel.api.table.factory.CatalogFactory.class,
+ "Paimon");
+ if (catalogFactory == null) {
+ throw new PaimonConnectorException(
+ SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+ String.format(
+ "PluginName: %s, PluginType: %s, Message: %s",
+ getPluginName(),
+ PluginType.SINK,
+ "Cannot find paimon catalog factory"));
+ }
+ org.apache.seatunnel.api.table.catalog.Catalog catalog =
+
catalogFactory.createCatalog(catalogFactory.factoryIdentifier(),
readonlyConfig);
+ catalog.open();
+ return Optional.of(
+ new PaimonSaveModeHandler(
+ this,
+ paimonSinkConfig.getSchemaSaveMode(),
+ paimonSinkConfig.getDataSaveMode(),
+ catalog,
+ catalogTable,
+ null));
+ }
+
+ @Override
+ public void setLoadTable(Table table) {
+ this.table = table;
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java
index dfae43c482..c0b4d997ea 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java
@@ -17,16 +17,30 @@
package org.apache.seatunnel.connectors.seatunnel.paimon.sink;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig;
+import
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig;
+
+import org.apache.commons.lang3.StringUtils;
import com.google.auto.service.AutoService;
@AutoService(Factory.class)
public class PaimonSinkFactory implements TableSinkFactory {
+ public static final String REPLACE_TABLE_NAME_KEY = "${table_name}";
+
+ public static final String REPLACE_SCHEMA_NAME_KEY = "${schema_name}";
+
+ public static final String REPLACE_DATABASE_NAME_KEY = "${database_name}";
+
@Override
public String factoryIdentifier() {
return "Paimon";
@@ -35,10 +49,56 @@ public class PaimonSinkFactory implements TableSinkFactory {
@Override
public OptionRule optionRule() {
return OptionRule.builder()
- .required(PaimonConfig.WAREHOUSE)
- .required(PaimonConfig.DATABASE)
- .required(PaimonConfig.TABLE)
- .optional(PaimonConfig.HDFS_SITE_PATH)
+ .required(PaimonConfig.WAREHOUSE, PaimonConfig.DATABASE,
PaimonConfig.TABLE)
+ .optional(
+ PaimonConfig.HDFS_SITE_PATH,
+ PaimonSinkConfig.SCHEMA_SAVE_MODE,
+ PaimonSinkConfig.DATA_SAVE_MODE)
.build();
}
+
+ @Override
+ public TableSink createSink(TableSinkFactoryContext context) {
+ ReadonlyConfig readonlyConfig = context.getOptions();
+ CatalogTable catalogTable =
+ renameCatalogTable(new PaimonSinkConfig(readonlyConfig),
context.getCatalogTable());
+ return () -> new PaimonSink(context.getOptions(), catalogTable);
+ }
+
+ private CatalogTable renameCatalogTable(
+ PaimonSinkConfig paimonSinkConfig, CatalogTable catalogTable) {
+ TableIdentifier tableId = catalogTable.getTableId();
+ String tableName;
+ String namespace;
+ if (StringUtils.isNotEmpty(paimonSinkConfig.getTable())) {
+ tableName = replaceName(paimonSinkConfig.getTable(), tableId);
+ } else {
+ tableName = tableId.getTableName();
+ }
+
+ if (StringUtils.isNotEmpty(paimonSinkConfig.getNamespace())) {
+ namespace = replaceName(paimonSinkConfig.getNamespace(), tableId);
+ } else {
+ namespace = tableId.getSchemaName();
+ }
+
+ TableIdentifier newTableId =
+ TableIdentifier.of(
+ tableId.getCatalogName(), namespace,
tableId.getSchemaName(), tableName);
+
+ return CatalogTable.of(newTableId, catalogTable);
+ }
+
+ private String replaceName(String original, TableIdentifier tableId) {
+ if (tableId.getTableName() != null) {
+ original = original.replace(REPLACE_TABLE_NAME_KEY,
tableId.getTableName());
+ }
+ if (tableId.getSchemaName() != null) {
+ original = original.replace(REPLACE_SCHEMA_NAME_KEY,
tableId.getSchemaName());
+ }
+ if (tableId.getDatabaseName() != null) {
+ original = original.replace(REPLACE_DATABASE_NAME_KEY,
tableId.getDatabaseName());
+ }
+ return original;
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
index 930f62045f..7b2e8327a9 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
@@ -17,21 +17,28 @@
package org.apache.seatunnel.connectors.seatunnel.paimon.sink;
+import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorErrorCode;
import
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;
import
org.apache.seatunnel.connectors.seatunnel.paimon.sink.commit.PaimonCommitInfo;
import
org.apache.seatunnel.connectors.seatunnel.paimon.sink.state.PaimonSinkState;
+import org.apache.seatunnel.connectors.seatunnel.paimon.utils.JobContextUtil;
import org.apache.seatunnel.connectors.seatunnel.paimon.utils.RowConverter;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.BatchTableWrite;
-import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.StreamTableCommit;
+import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.table.sink.TableCommit;
+import org.apache.paimon.table.sink.TableWrite;
+import org.apache.paimon.table.sink.WriteBuilder;
import lombok.extern.slf4j.Slf4j;
@@ -46,13 +53,14 @@ import java.util.stream.Collectors;
@Slf4j
public class PaimonSinkWriter
- implements SinkWriter<SeaTunnelRow, PaimonCommitInfo, PaimonSinkState>
{
+ implements SinkWriter<SeaTunnelRow, PaimonCommitInfo, PaimonSinkState>,
+ SupportMultiTableSinkWriter<Void> {
private String commitUser = UUID.randomUUID().toString();
- private final BatchWriteBuilder tableWriteBuilder;
+ private final WriteBuilder tableWriteBuilder;
- private final BatchTableWrite tableWrite;
+ private final TableWrite tableWrite;
private long checkpointId = 0;
@@ -64,37 +72,58 @@ public class PaimonSinkWriter
private final SinkWriter.Context context;
- public PaimonSinkWriter(Context context, Table table, SeaTunnelRowType
seaTunnelRowType) {
+ private final JobContext jobContext;
+
+ public PaimonSinkWriter(
+ Context context,
+ Table table,
+ SeaTunnelRowType seaTunnelRowType,
+ JobContext jobContext) {
this.table = table;
- this.tableWriteBuilder =
this.table.newBatchWriteBuilder().withOverwrite();
+ this.tableWriteBuilder =
+ JobContextUtil.isBatchJob(jobContext)
+ ? this.table.newBatchWriteBuilder().withOverwrite()
+ : this.table.newStreamWriteBuilder();
this.tableWrite = tableWriteBuilder.newWrite();
this.seaTunnelRowType = seaTunnelRowType;
this.context = context;
+ this.jobContext = jobContext;
}
public PaimonSinkWriter(
Context context,
Table table,
SeaTunnelRowType seaTunnelRowType,
- List<PaimonSinkState> states) {
+ List<PaimonSinkState> states,
+ JobContext jobContext) {
this.table = table;
- this.tableWriteBuilder =
this.table.newBatchWriteBuilder().withOverwrite();
+ this.tableWriteBuilder =
+ JobContextUtil.isBatchJob(jobContext)
+ ? this.table.newBatchWriteBuilder().withOverwrite()
+ : this.table.newStreamWriteBuilder();
this.tableWrite = tableWriteBuilder.newWrite();
this.seaTunnelRowType = seaTunnelRowType;
this.context = context;
+ this.jobContext = jobContext;
if (Objects.isNull(states) || states.isEmpty()) {
return;
}
this.commitUser = states.get(0).getCommitUser();
this.checkpointId = states.get(0).getCheckpointId();
- try (BatchTableCommit tableCommit = tableWriteBuilder.newCommit()) {
+ try (TableCommit tableCommit = tableWriteBuilder.newCommit()) {
List<CommitMessage> commitables =
states.stream()
.map(PaimonSinkState::getCommittables)
.flatMap(List::stream)
.collect(Collectors.toList());
log.info("Trying to recommit states {}", commitables);
- tableCommit.commit(commitables);
+ if (JobContextUtil.isBatchJob(jobContext)) {
+ log.debug("Trying to recommit states batch mode");
+ ((BatchTableCommit) tableCommit).commit(commitables);
+ } else {
+ log.debug("Trying to recommit states streaming mode");
+ ((StreamTableCommit)
tableCommit).commit(Objects.hash(commitables), commitables);
+ }
} catch (Exception e) {
throw new PaimonConnectorException(
PaimonConnectorErrorCode.TABLE_WRITE_COMMIT_FAILED, e);
@@ -117,7 +146,13 @@ public class PaimonSinkWriter
@Override
public Optional<PaimonCommitInfo> prepareCommit() throws IOException {
try {
- List<CommitMessage> fileCommittables = tableWrite.prepareCommit();
+ List<CommitMessage> fileCommittables;
+ if (JobContextUtil.isBatchJob(jobContext)) {
+ fileCommittables = ((BatchTableWrite)
tableWrite).prepareCommit();
+ } else {
+ fileCommittables =
+ ((StreamTableWrite) tableWrite).prepareCommit(false,
committables.size());
+ }
committables.addAll(fileCommittables);
return Optional.of(new PaimonCommitInfo(fileCommittables));
} catch (Exception e) {
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/SupportLoadTable.java
similarity index 52%
copy from
seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java
copy to
seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/SupportLoadTable.java
index dfae43c482..734762e23c 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/SupportLoadTable.java
@@ -17,28 +17,6 @@
package org.apache.seatunnel.connectors.seatunnel.paimon.sink;
-import org.apache.seatunnel.api.configuration.util.OptionRule;
-import org.apache.seatunnel.api.table.factory.Factory;
-import org.apache.seatunnel.api.table.factory.TableSinkFactory;
-import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig;
-
-import com.google.auto.service.AutoService;
-
-@AutoService(Factory.class)
-public class PaimonSinkFactory implements TableSinkFactory {
-
- @Override
- public String factoryIdentifier() {
- return "Paimon";
- }
-
- @Override
- public OptionRule optionRule() {
- return OptionRule.builder()
- .required(PaimonConfig.WAREHOUSE)
- .required(PaimonConfig.DATABASE)
- .required(PaimonConfig.TABLE)
- .optional(PaimonConfig.HDFS_SITE_PATH)
- .build();
- }
+public interface SupportLoadTable<T> {
+ void setLoadTable(T table);
}
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonAggregatedCommitter.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonAggregatedCommitter.java
index 987d8fbb80..2c0be5d424 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonAggregatedCommitter.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonAggregatedCommitter.java
@@ -17,14 +17,20 @@
package org.apache.seatunnel.connectors.seatunnel.paimon.sink.commit;
+import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
+import org.apache.seatunnel.api.sink.SupportMultiTableSinkAggregatedCommitter;
import
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorErrorCode;
import
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.paimon.utils.JobContextUtil;
import org.apache.paimon.operation.Lock;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.StreamTableCommit;
+import org.apache.paimon.table.sink.TableCommit;
+import org.apache.paimon.table.sink.WriteBuilder;
import lombok.extern.slf4j.Slf4j;
@@ -32,35 +38,49 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Objects;
import java.util.stream.Collectors;
/** Paimon connector aggregated committer class */
@Slf4j
public class PaimonAggregatedCommitter
- implements SinkAggregatedCommitter<PaimonCommitInfo,
PaimonAggregatedCommitInfo> {
+ implements SinkAggregatedCommitter<PaimonCommitInfo,
PaimonAggregatedCommitInfo>,
+ SupportMultiTableSinkAggregatedCommitter {
private static final long serialVersionUID = 1L;
private final Lock.Factory localFactory = Lock.emptyFactory();
- private final Table table;
+ private final WriteBuilder tableWriteBuilder;
- public PaimonAggregatedCommitter(Table table) {
- this.table = table;
+ private final JobContext jobContext;
+
+ public PaimonAggregatedCommitter(Table table, JobContext jobContext) {
+ this.jobContext = jobContext;
+ this.tableWriteBuilder =
+ JobContextUtil.isBatchJob(jobContext)
+ ? table.newBatchWriteBuilder()
+ : table.newStreamWriteBuilder();
}
@Override
public List<PaimonAggregatedCommitInfo> commit(
List<PaimonAggregatedCommitInfo> aggregatedCommitInfo) throws
IOException {
- try (BatchTableCommit tableCommit =
- table.newBatchWriteBuilder().withOverwrite().newCommit()) {
+ try (TableCommit tableCommit = tableWriteBuilder.newCommit()) {
List<CommitMessage> fileCommittables =
aggregatedCommitInfo.stream()
.map(PaimonAggregatedCommitInfo::getCommittables)
.flatMap(List::stream)
.flatMap(List::stream)
.collect(Collectors.toList());
- tableCommit.commit(fileCommittables);
+ if (JobContextUtil.isBatchJob(jobContext)) {
+ log.debug("Trying to commit states batch mode");
+ ((BatchTableCommit) tableCommit).commit(fileCommittables);
+ } else {
+ log.debug("Trying to commit states streaming mode");
+ ((StreamTableCommit) tableCommit)
+ .commit(Objects.hash(fileCommittables),
fileCommittables);
+ }
} catch (Exception e) {
throw new PaimonConnectorException(
PaimonConnectorErrorCode.TABLE_WRITE_COMMIT_FAILED,
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/JobContextUtil.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/JobContextUtil.java
new file mode 100644
index 0000000000..3a4d9b72d4
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/JobContextUtil.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.paimon.utils;
+
+import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.common.constants.JobMode;
+
+import lombok.extern.slf4j.Slf4j;
+
+/** Job env util */
+@Slf4j
+public class JobContextUtil {
+
+ public static boolean isBatchJob(JobContext jobContext) {
+ return jobContext.getJobMode().equals(JobMode.BATCH);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java
index 44f8fb2624..6b9a6bf01c 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java
@@ -333,6 +333,10 @@ public class RowConverter {
SeaTunnelRow seaTunnelRow, SeaTunnelRowType seaTunnelRowType) {
BinaryRow binaryRow = new BinaryRow(seaTunnelRowType.getTotalFields());
BinaryWriter binaryWriter = new BinaryRowWriter(binaryRow);
+ // Convert SeaTunnel RowKind to Paimon RowKind
+ org.apache.paimon.types.RowKind rowKind =
+
RowKindConverter.convertSeaTunnelRowKind2PaimonRowKind(seaTunnelRow.getRowKind());
+ binaryRow.setRowKind(rowKind);
SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
for (int i = 0; i < fieldTypes.length; i++) {
// judge the field is or not equals null
@@ -393,8 +397,8 @@ public class RowConverter {
MapType<?, ?> mapType = (MapType<?, ?>)
seaTunnelRowType.getFieldType(i);
SeaTunnelDataType<?> keyType = mapType.getKeyType();
SeaTunnelDataType<?> valueType = mapType.getValueType();
- DataType paimonKeyType = RowTypeConverter.convert(keyType);
- DataType paimonValueType =
RowTypeConverter.convert(valueType);
+ DataType paimonKeyType =
RowTypeConverter.reconvert(keyType);
+ DataType paimonValueType =
RowTypeConverter.reconvert(valueType);
Map<?, ?> field = (Map<?, ?>) seaTunnelRow.getField(i);
Object[] keys = field.keySet().toArray(new Object[0]);
Object[] values = field.values().toArray(new Object[0]);
@@ -411,13 +415,13 @@ public class RowConverter {
i,
paimonArray,
new InternalArraySerializer(
-
RowTypeConverter.convert(arrayType.getElementType())));
+
RowTypeConverter.reconvert(arrayType.getElementType())));
break;
case ROW:
SeaTunnelDataType<?> rowType =
seaTunnelRowType.getFieldType(i);
Object row = seaTunnelRow.getField(i);
InternalRow paimonRow = convert((SeaTunnelRow) row,
(SeaTunnelRowType) rowType);
- RowType paimonRowType =
RowTypeConverter.convert((SeaTunnelRowType) rowType);
+ RowType paimonRowType =
RowTypeConverter.reconvert((SeaTunnelRowType) rowType);
binaryWriter.writeRow(i, paimonRow, new
InternalRowSerializer(paimonRowType));
break;
default:
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowKindConverter.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowKindConverter.java
new file mode 100644
index 0000000000..ce6a172e43
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowKindConverter.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.paimon.utils;
+
+import org.apache.seatunnel.api.table.type.RowKind;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;
+
+import org.apache.paimon.data.InternalRow;
+
+public class RowKindConverter {
+
+ /**
+ * Convert SeaTunnel RowKind {@link RowKind} to Paimon RowKind {@link
InternalRow}
+ *
+ * @param seaTunnelRowInd
+ * @return
+ */
+ public static org.apache.paimon.types.RowKind
convertSeaTunnelRowKind2PaimonRowKind(
+ RowKind seaTunnelRowInd) {
+ switch (seaTunnelRowInd) {
+ case DELETE:
+ return org.apache.paimon.types.RowKind.DELETE;
+ case UPDATE_AFTER:
+ return org.apache.paimon.types.RowKind.UPDATE_AFTER;
+ case UPDATE_BEFORE:
+ return org.apache.paimon.types.RowKind.UPDATE_BEFORE;
+ case INSERT:
+ return org.apache.paimon.types.RowKind.INSERT;
+ default:
+ throw new PaimonConnectorException(
+ CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+ "Unsupported rowKind type " +
seaTunnelRowInd.shortString());
+ }
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverter.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverter.java
index 4dfd6b69fa..16863ebff5 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverter.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverter.java
@@ -43,6 +43,7 @@ import org.apache.paimon.types.LocalZonedTimestampType;
import org.apache.paimon.types.MapType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.SmallIntType;
+import org.apache.paimon.types.TimeType;
import org.apache.paimon.types.TimestampType;
import org.apache.paimon.types.TinyIntType;
import org.apache.paimon.types.VarBinaryType;
@@ -70,13 +71,93 @@ public class RowTypeConverter {
return new SeaTunnelRowType(fieldNames, dataTypes);
}
+ /**
+ * Convert a Paimon data type {@link DataType} to a SeaTunnel data type {@link
SeaTunnelDataType}
+ *
+ * @param dataType Paimon data type
+ * @return SeaTunnel data type {@link SeaTunnelDataType}
+ */
+ public static SeaTunnelDataType convert(DataType dataType) {
+ SeaTunnelDataType<?> seaTunnelDataType;
+ PaimonToSeaTunnelTypeVisitor paimonToSeaTunnelTypeVisitor =
+ PaimonToSeaTunnelTypeVisitor.INSTANCE;
+ switch (dataType.getTypeRoot()) {
+ case CHAR:
+ seaTunnelDataType =
paimonToSeaTunnelTypeVisitor.visit((CharType) dataType);
+ break;
+ case VARCHAR:
+ seaTunnelDataType =
paimonToSeaTunnelTypeVisitor.visit((VarCharType) dataType);
+ break;
+ case BOOLEAN:
+ seaTunnelDataType =
paimonToSeaTunnelTypeVisitor.visit((BooleanType) dataType);
+ break;
+ case BINARY:
+ seaTunnelDataType =
paimonToSeaTunnelTypeVisitor.visit((BinaryType) dataType);
+ break;
+ case VARBINARY:
+ seaTunnelDataType =
paimonToSeaTunnelTypeVisitor.visit((VarBinaryType) dataType);
+ break;
+ case DECIMAL:
+ seaTunnelDataType =
paimonToSeaTunnelTypeVisitor.visit((DecimalType) dataType);
+ break;
+ case TINYINT:
+ seaTunnelDataType =
paimonToSeaTunnelTypeVisitor.visit((TinyIntType) dataType);
+ break;
+ case SMALLINT:
+ seaTunnelDataType =
paimonToSeaTunnelTypeVisitor.visit((SmallIntType) dataType);
+ break;
+ case INTEGER:
+ seaTunnelDataType =
paimonToSeaTunnelTypeVisitor.visit((IntType) dataType);
+ break;
+ case BIGINT:
+ seaTunnelDataType =
paimonToSeaTunnelTypeVisitor.visit((BigIntType) dataType);
+ break;
+ case FLOAT:
+ seaTunnelDataType =
paimonToSeaTunnelTypeVisitor.visit((FloatType) dataType);
+ break;
+ case DOUBLE:
+ seaTunnelDataType =
paimonToSeaTunnelTypeVisitor.visit((DoubleType) dataType);
+ break;
+ case DATE:
+ seaTunnelDataType =
paimonToSeaTunnelTypeVisitor.visit((DateType) dataType);
+ break;
+ case TIME_WITHOUT_TIME_ZONE:
+ seaTunnelDataType =
paimonToSeaTunnelTypeVisitor.visit((TimeType) dataType);
+ break;
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ seaTunnelDataType =
paimonToSeaTunnelTypeVisitor.visit((TimestampType) dataType);
+ break;
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ seaTunnelDataType =
+
paimonToSeaTunnelTypeVisitor.visit((LocalZonedTimestampType) dataType);
+ break;
+ case ARRAY:
+ seaTunnelDataType =
paimonToSeaTunnelTypeVisitor.visit((ArrayType) dataType);
+ break;
+ case MAP:
+ seaTunnelDataType =
paimonToSeaTunnelTypeVisitor.visit((MapType) dataType);
+ break;
+ case ROW:
+ seaTunnelDataType =
paimonToSeaTunnelTypeVisitor.visit((RowType) dataType);
+ break;
+ default:
+ String errorMsg =
+ String.format(
+ "Paimon dataType not support this genericType
[%s]",
+ dataType.asSQLString());
+ throw new PaimonConnectorException(
+ CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
errorMsg);
+ }
+ return seaTunnelDataType;
+ }
+
/**
* Convert SeaTunnel row type {@link SeaTunnelRowType} to Paimon row type
{@link RowType}
*
* @param seaTunnelRowType SeaTunnel row type {@link SeaTunnelRowType}
* @return Paimon row type {@link RowType}
*/
- public static RowType convert(SeaTunnelRowType seaTunnelRowType) {
+ public static RowType reconvert(SeaTunnelRowType seaTunnelRowType) {
SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
DataType[] dataTypes =
Arrays.stream(fieldTypes)
@@ -96,7 +177,7 @@ public class RowTypeConverter {
* @param dataType SeaTunnel data type {@link SeaTunnelDataType}
* @return Paimon data type {@link DataType}
*/
- public static DataType convert(SeaTunnelDataType<?> dataType) {
+ public static DataType reconvert(SeaTunnelDataType<?> dataType) {
return SeaTunnelTypeToPaimonVisitor.INSTANCE.visit(dataType);
}
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/SchemaUtil.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/SchemaUtil.java
new file mode 100644
index 0000000000..c03a77149c
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/SchemaUtil.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.paimon.utils;
+
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.connectors.seatunnel.paimon.data.PaimonTypeMapper;
+
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.types.DataType;
+
+import java.util.Objects;
+
+/** Utility for converting between SeaTunnel schemas/types and Paimon schemas/types */
+public class SchemaUtil {
+
+ public static DataType toPaimonType(Column column) {
+ return PaimonTypeMapper.INSTANCE.reconvert(column);
+ }
+
+ public static Schema toPaimonSchema(TableSchema tableSchema) {
+ Schema.Builder paiSchemaBuilder = Schema.newBuilder();
+ for (int i = 0; i < tableSchema.getColumns().size(); i++) {
+ Column column = tableSchema.getColumns().get(i);
+ paiSchemaBuilder.column(column.getName(), toPaimonType(column));
+ }
+ PrimaryKey primaryKey = tableSchema.getPrimaryKey();
+ if (Objects.nonNull(primaryKey) && primaryKey.getColumnNames().size()
> 0) {
+ paiSchemaBuilder.primaryKey(primaryKey.getColumnNames());
+ }
+ return paiSchemaBuilder.build();
+ }
+
+ public static SeaTunnelDataType<?> toSeaTunnelType(DataType dataType) {
+ return PaimonTypeMapper.INSTANCE.convert(dataType).getDataType();
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverterTest.java
b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverterTest.java
index f32b87f007..f828be0650 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverterTest.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverterTest.java
@@ -103,7 +103,7 @@ public class RowTypeConverterTest {
@Test
public void seaTunnelToPaimon() {
- RowType convert = RowTypeConverter.convert(seaTunnelRowType);
+ RowType convert = RowTypeConverter.reconvert(seaTunnelRowType);
Assertions.assertEquals(convert, rowType);
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/pom.xml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/pom.xml
index 4af6e8436e..69ea9a9f74 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/pom.xml
@@ -30,16 +30,25 @@
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-fake</artifactId>
<version>${project.version}</version>
+ <scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-paimon</artifactId>
<version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-hadoop3-3.1.4-uber</artifactId>
+ <classifier>optional</classifier>
+ <scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-assert</artifactId>
<version>${project.version}</version>
+ <scope>test</scope>
</dependency>
</dependencies>
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
new file mode 100644
index 0000000000..a960f7d4d3
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.paimon;
+
+import org.apache.seatunnel.common.utils.FileUtils;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.table.source.TableScan;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.awaitility.Awaitility.given;
+
+@DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK, EngineType.FLINK},
+ disabledReason =
+ "Spark and Flink engine can not auto create paimon table on
worker node in local file(e.g flink tm) by savemode feature which can lead
error")
+@Slf4j
+public class PaimonSinkCDCIT extends TestSuiteBase implements TestResource {
+ private static final String CATALOG_ROOT_DIR = "/tmp/";
+ private static final String NAMESPACE = "paimon";
+ private static final String NAMESPACE_TAR = "paimon.tar.gz";
+ private static final String CATALOG_DIR = CATALOG_ROOT_DIR + NAMESPACE +
"/";
+ private static final String TARGET_TABLE = "st_test";
+ private static final String TARGET_DATABASE = "seatunnel_namespace";
+ private static final String FAKE_TABLE1 = "FakeTable1";
+ private static final String FAKE_DATABASE1 = "FakeDatabase1";
+ private static final String FAKE_TABLE2 = "FakeTable1";
+ private static final String FAKE_DATABASE2 = "FakeDatabase2";
+
+ @BeforeAll
+ @Override
+ public void startUp() throws Exception {}
+
+ @AfterAll
+ @Override
+ public void tearDown() throws Exception {}
+
+ @TestTemplate
+ public void testFakeCDCSinkPaimon(TestContainer container) throws
Exception {
+ Container.ExecResult execResult =
container.executeJob("/fake_cdc_sink_paimon_case1.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+
+ given().ignoreExceptions()
+ .await()
+ .atLeast(100L, TimeUnit.MILLISECONDS)
+ .atMost(30L, TimeUnit.SECONDS)
+ .untilAsserted(
+ () -> {
+ // copy paimon to local
+
container.executeExtraCommands(containerExtendedFactory);
+ List<PaimonRecord> paimonRecords =
+ loadPaimonData(TARGET_DATABASE,
TARGET_TABLE);
+ Assertions.assertEquals(2, paimonRecords.size());
+ paimonRecords.forEach(
+ paimonRecord -> {
+ if (paimonRecord.getPkId() == 1) {
+ Assertions.assertEquals("A_1",
paimonRecord.getName());
+ }
+ if (paimonRecord.getPkId() == 3) {
+ Assertions.assertEquals("C",
paimonRecord.getName());
+ }
+ });
+ });
+
+ cleanPaimonTable(container);
+ }
+
+ @TestTemplate
+ public void testFakeMultipleTableSinkPaimon(TestContainer container)
throws Exception {
+ Container.ExecResult execResult =
container.executeJob("/fake_cdc_sink_paimon_case2.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+
+ given().ignoreExceptions()
+ .await()
+ .atLeast(100L, TimeUnit.MILLISECONDS)
+ .atMost(30L, TimeUnit.SECONDS)
+ .untilAsserted(
+ () -> {
+ // copy paimon to local
+
container.executeExtraCommands(containerExtendedFactory);
+ // Check FakeDatabase1.FakeTable1
+ List<PaimonRecord> fake1PaimonRecords =
+ loadPaimonData(FAKE_DATABASE1,
FAKE_TABLE1);
+ Assertions.assertEquals(2,
fake1PaimonRecords.size());
+ fake1PaimonRecords.forEach(
+ paimonRecord -> {
+ if (paimonRecord.getPkId() == 1) {
+ Assertions.assertEquals("A_1",
paimonRecord.getName());
+ }
+ if (paimonRecord.getPkId() == 3) {
+ Assertions.assertEquals("C",
paimonRecord.getName());
+ }
+ });
+ // Check FakeDatabase2.FakeTable1
+ List<PaimonRecord> fake2PaimonRecords =
+ loadPaimonData(FAKE_DATABASE2,
FAKE_TABLE2);
+ Assertions.assertEquals(2,
fake2PaimonRecords.size());
+ fake2PaimonRecords.forEach(
+ paimonRecord -> {
+ if (paimonRecord.getPkId() == 100) {
+ Assertions.assertEquals(
+ "A_100",
paimonRecord.getName());
+ }
+ if (paimonRecord.getPkId() == 200) {
+ Assertions.assertEquals("C",
paimonRecord.getName());
+ }
+ });
+ });
+
+ cleanPaimonTable(container);
+ }
+
+ protected final ContainerExtendedFactory cleanContainerExtendedFactory =
+ genericContainer ->
+ genericContainer.execInContainer("sh", "-c", "rm -rf " +
CATALOG_DIR + "**");
+
+ private void cleanPaimonTable(TestContainer container)
+ throws IOException, InterruptedException {
+ // clean table
+ container.executeExtraCommands(cleanContainerExtendedFactory);
+ }
+
+ protected final ContainerExtendedFactory containerExtendedFactory =
+ container -> {
+ FileUtils.deleteFile(CATALOG_ROOT_DIR + NAMESPACE_TAR);
+ FileUtils.createNewDir(CATALOG_DIR);
+ container.execInContainer(
+ "sh",
+ "-c",
+ "cd "
+ + CATALOG_ROOT_DIR
+ + " && tar -czvf "
+ + NAMESPACE_TAR
+ + " "
+ + NAMESPACE);
+ container.copyFileFromContainer(
+ CATALOG_ROOT_DIR + NAMESPACE_TAR, CATALOG_ROOT_DIR +
NAMESPACE_TAR);
+ extractFiles();
+ };
+
+ private void extractFiles() {
+ ProcessBuilder processBuilder = new ProcessBuilder();
+ processBuilder.command(
+ "sh", "-c", "cd " + CATALOG_ROOT_DIR + " && tar -zxvf " +
NAMESPACE_TAR);
+ try {
+ Process process = processBuilder.start();
+ // wait command completed
+ int exitCode = process.waitFor();
+ if (exitCode == 0) {
+ log.info("Extract files successful.");
+ } else {
+ log.error("Extract files failed with exit code " + exitCode);
+ }
+ } catch (IOException | InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ private List<PaimonRecord> loadPaimonData(String dbName, String tbName)
throws Exception {
+ Table table = getTable(dbName, tbName);
+ ReadBuilder readBuilder = table.newReadBuilder();
+ TableScan.Plan plan = readBuilder.newScan().plan();
+ TableRead tableRead = readBuilder.newRead();
+ List<PaimonRecord> result = new ArrayList<>();
+ log.info(
+ "====================================Paimon
data===========================================");
+ log.info(
+
"==========================================================================================");
+ log.info(
+
"==========================================================================================");
+ try (RecordReader<InternalRow> reader = tableRead.createReader(plan)) {
+ reader.forEachRemaining(
+ row -> {
+ result.add(new PaimonRecord(row.getLong(0),
row.getString(1).toString()));
+ log.info("key_id:" + row.getLong(0) + ", name:" +
row.getString(1));
+ });
+ }
+ log.info(
+
"==========================================================================================");
+ log.info(
+
"==========================================================================================");
+ log.info(
+
"==========================================================================================");
+ return result;
+ }
+
+ private Table getTable(String dbName, String tbName) {
+ try {
+ return getCatalog().getTable(getIdentifier(dbName, tbName));
+ } catch (Catalog.TableNotExistException e) {
+ // the sink job is expected to have created the table; fail the test with a clear cause
+ throw new RuntimeException("table not exist");
+ }
+ }
+
+ private Identifier getIdentifier(String dbName, String tbName) {
+ return Identifier.create(dbName, tbName);
+ }
+
+ private Catalog getCatalog() {
+ Options options = new Options();
+ options.set("warehouse", "file://" + CATALOG_DIR);
+ Catalog catalog =
CatalogFactory.createCatalog(CatalogContext.create(options));
+ return catalog;
+ }
+
+ @Data
+ @NoArgsConstructor
+ @AllArgsConstructor
+ public class PaimonRecord {
+ private Long pkId;
+ private String name;
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1.conf
new file mode 100644
index 0000000000..59e3a0cf72
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1.conf
@@ -0,0 +1,86 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in
seatunnel config
+######
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ pk_id = bigint
+ name = string
+ score = int
+ }
+ primaryKey {
+ name = "pk_id"
+ columnNames = [pk_id]
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, "A", 100]
+ },
+ {
+ kind = INSERT
+ fields = [2, "B", 100]
+ },
+ {
+ kind = INSERT
+ fields = [3, "C", 100]
+ },
+ {
+ kind = INSERT
+ fields = [3, "C", 100]
+ },
+ {
+ kind = INSERT
+ fields = [3, "C", 100]
+ },
+ {
+ kind = INSERT
+ fields = [3, "C", 100]
+ }
+ {
+ kind = UPDATE_BEFORE
+ fields = [1, "A", 100]
+ },
+ {
+ kind = UPDATE_AFTER
+ fields = [1, "A_1", 100]
+ },
+ {
+ kind = DELETE
+ fields = [2, "B", 100]
+ }
+ ]
+ }
+}
+
+sink {
+ Paimon {
+ warehouse = "file:///tmp/paimon"
+ database = "seatunnel_namespace"
+ table = "st_test"
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case2.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case2.conf
new file mode 100644
index 0000000000..ddc9226871
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case2.conf
@@ -0,0 +1,142 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in
seatunnel config
+######
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ tables_configs = [
+ {
+ schema = {
+ table = "FakeDatabase1.FakeTable1"
+ fields {
+ pk_id = bigint
+ name = string
+ score = int
+ }
+ primaryKey {
+ name = "pk_id"
+ columnNames = [pk_id]
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, "A", 100]
+ },
+ {
+ kind = INSERT
+ fields = [2, "B", 100]
+ },
+ {
+ kind = INSERT
+ fields = [3, "C", 100]
+ },
+ {
+ kind = INSERT
+ fields = [3, "C", 100]
+ },
+ {
+ kind = INSERT
+ fields = [3, "C", 100]
+ },
+ {
+ kind = INSERT
+ fields = [3, "C", 100]
+ }
+ {
+ kind = UPDATE_BEFORE
+ fields = [1, "A", 100]
+ },
+ {
+ kind = UPDATE_AFTER
+ fields = [1, "A_1", 100]
+ },
+ {
+ kind = DELETE
+ fields = [2, "B", 100]
+ }
+ ]
+ },
+ {
+ schema = {
+ table = "FakeDatabase2.FakeTable1"
+ fields {
+ pk_id = bigint
+ name = string
+ }
+ primaryKey {
+ name = "pk_id"
+ columnNames = [pk_id]
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [100, "A"]
+ },
+ {
+ kind = INSERT
+ fields = [200, "B"]
+ },
+ {
+ kind = INSERT
+ fields = [300, "C"]
+ },
+ {
+ kind = INSERT
+ fields = [300, "C"]
+ },
+ {
+ kind = INSERT
+ fields = [300, "C"]
+ },
+ {
+ kind = INSERT
+ fields = [300, "C"]
+ }
+ {
+ kind = UPDATE_BEFORE
+ fields = [100, "A"]
+ },
+ {
+ kind = UPDATE_AFTER
+ fields = [100, "A_100"]
+ },
+ {
+ kind = DELETE
+ fields = [200, "B"]
+ }
+ ]
+ }
+ ]
+ }
+}
+
+sink {
+ Paimon {
+ warehouse = "file:///tmp/paimon"
+ database = "${database_name}"
+ table = "${table_name}"
+ }
+}
diff --git a/seatunnel-shade/seatunnel-hadoop3-3.1.4-uber/pom.xml
b/seatunnel-shade/seatunnel-hadoop3-3.1.4-uber/pom.xml
index 00b55265c4..be5ced9214 100644
--- a/seatunnel-shade/seatunnel-hadoop3-3.1.4-uber/pom.xml
+++ b/seatunnel-shade/seatunnel-hadoop3-3.1.4-uber/pom.xml
@@ -47,6 +47,11 @@
<artifactId>hadoop-client</artifactId>
<version>${hadoop3.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.xerial.snappy</groupId>
+ <artifactId>snappy-java</artifactId>
+ <version>1.1.10.4</version>
+ </dependency>
</dependencies>
<build>