This is an automated email from the ASF dual-hosted git repository.
dailai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new c0f27c2f76 [Feature][Connector-V2] Paimon Sink supports
changelog-producer in lookup and full-compaction modes (#7834)
c0f27c2f76 is described below
commit c0f27c2f76c6562b18a7b4c76d2c7ef4452f4b4a
Author: zhangdonghao <[email protected]>
AuthorDate: Mon Oct 21 10:41:38 2024 +0800
[Feature][Connector-V2] Paimon Sink supports changelog-producer in lookup
and full-compaction modes (#7834)
---
docs/en/connector-v2/sink/Paimon.md | 45 +++++-
docs/zh/connector-v2/sink/Paimon.md | 70 +++++++---
.../seatunnel/paimon/config/PaimonSinkConfig.java | 33 ++++-
.../seatunnel/paimon/sink/PaimonSink.java | 15 +-
.../seatunnel/paimon/sink/PaimonSinkWriter.java | 48 +++++--
.../seatunnel/paimon/utils/SchemaUtil.java | 5 +
.../e2e/connector/paimon/PaimonRecord.java | 30 ++++
.../e2e/connector/paimon/PaimonSinkCDCIT.java | 152 ++++++++++++++++++++-
.../changelog_fake_cdc_sink_paimon_case1_ddl.conf | 53 +++++++
...log_fake_cdc_sink_paimon_case1_insert_data.conf | 67 +++++++++
...log_fake_cdc_sink_paimon_case1_update_data.conf | 71 ++++++++++
.../changelog_fake_cdc_sink_paimon_case2.conf | 83 +++++++++++
.../test/resources/changelog_paimon_to_paimon.conf | 48 +++++++
.../common/container/AbstractTestContainer.java | 24 +++-
.../e2e/common/container/TestContainer.java | 13 +-
.../flink/AbstractTestFlinkContainer.java | 9 +-
.../ConnectorPackageServiceContainer.java | 9 +-
.../container/seatunnel/SeaTunnelContainer.java | 27 +++-
.../spark/AbstractTestSparkContainer.java | 10 +-
.../seatunnel/e2e/common/util/JobIdGenerator.java | 27 ++++
20 files changed, 786 insertions(+), 53 deletions(-)
diff --git a/docs/en/connector-v2/sink/Paimon.md
b/docs/en/connector-v2/sink/Paimon.md
index 8133b6e836..c9e4b3a9b6 100644
--- a/docs/en/connector-v2/sink/Paimon.md
+++ b/docs/en/connector-v2/sink/Paimon.md
@@ -31,7 +31,7 @@ libfb303-xxx.jar
## Options
-| name | type | required | default value
| Description
|
+| name | type | required | default value
| Description
|
|-----------------------------|--------|----------|------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| warehouse | String | Yes | -
| Paimon warehouse path
|
| catalog_type | String | No | filesystem
| Catalog type of Paimon, support filesystem and hive
|
@@ -43,7 +43,7 @@ libfb303-xxx.jar
| data_save_mode | Enum | No | APPEND_DATA
| The data save mode
|
| paimon.table.primary-keys | String | No | -
| Default comma-separated list of columns (primary key) that identify a row
in tables.(Notice: The partition field needs to be included in the primary key
fields) |
| paimon.table.partition-keys | String | No | -
| Default comma-separated list of partition fields to use when creating
tables.
|
-| paimon.table.write-props | Map | No | -
| Properties passed through to paimon table initialization,
[reference](https://paimon.apache.org/docs/master/maintenance/configurations/#coreoptions).
|
+| paimon.table.write-props | Map | No | -
| Properties passed through to paimon table initialization,
[reference](https://paimon.apache.org/docs/master/maintenance/configurations/#coreoptions).
|
| paimon.hadoop.conf | Map | No | -
| Properties in hadoop conf
|
| paimon.hadoop.conf-path | String | No | -
| The specified loading path for the 'core-site.xml', 'hdfs-site.xml',
'hive-site.xml' files
|
@@ -52,9 +52,14 @@ You must configure the `changelog-producer=input` option to
enable the changelog
The changelog producer mode of the paimon table has [four
modes](https://paimon.apache.org/docs/master/primary-key-table/changelog-producer/),
which are `none`, `input`, `lookup` and `full-compaction`.
-Currently, we only support the `none` and `input` mode. The default is `none`
which will not output the changelog file. The `input` mode will output the
changelog file in paimon table.
+All `changelog-producer` modes are currently supported. The default is `none`.
-When you use a streaming mode to read paimon table, these two mode will
produce [different
results](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/source/Paimon.md#changelog).
+*
[`none`](https://paimon.apache.org/docs/master/primary-key-table/changelog-producer/#none)
+*
[`input`](https://paimon.apache.org/docs/master/primary-key-table/changelog-producer/#input)
+*
[`lookup`](https://paimon.apache.org/docs/master/primary-key-table/changelog-producer/#lookup)
+*
[`full-compaction`](https://paimon.apache.org/docs/master/primary-key-table/changelog-producer/#full-compaction)
+> Note:
+> When you use streaming mode to read a paimon table, different modes will
produce [different
results](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/source/Paimon.md#changelog).
## Examples
@@ -250,6 +255,38 @@ sink {
}
```
+#### Write with the `changelog-producer` attribute
+
+```hocon
+env {
+ parallelism = 1
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000
+}
+
+source {
+ Mysql-CDC {
+ base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
+ username = "root"
+ password = "******"
+ table-names = ["seatunnel.role"]
+ }
+}
+
+sink {
+ Paimon {
+ catalog_name = "seatunnel_test"
+ warehouse = "file:///tmp/seatunnel/paimon/hadoop-sink/"
+ database = "seatunnel"
+ table = "role"
+ paimon.table.write-props = {
+ changelog-producer = full-compaction
+ changelog-tmp-path = /tmp/paimon/changelog
+ }
+ }
+}
+```
+
### Write to dynamic bucket table
A single dynamic bucket table with paimon write props operates on the primary
key table, and bucket is -1.
diff --git a/docs/zh/connector-v2/sink/Paimon.md
b/docs/zh/connector-v2/sink/Paimon.md
index 32d35a5e95..375c8c90ca 100644
--- a/docs/zh/connector-v2/sink/Paimon.md
+++ b/docs/zh/connector-v2/sink/Paimon.md
@@ -30,30 +30,35 @@ libfb303-xxx.jar
## 连接器选项
-| 名称 | 类型 | 是否必须 | 默认值 | 描述
|
-|-----------------------------|-------|----------|------------------------------|---------------------------------------------------------------------------------------------------|
-| warehouse | 字符串 | 是 | - |
Paimon warehouse路径
|
-| catalog_type | 字符串 | 否 | filesystem |
Paimon的catalog类型,目前支持filesystem和hive
|
-| catalog_uri | 字符串 | 否 | - |
Paimon catalog的uri,仅当catalog_type为hive时需要配置
|
-| database | 字符串 | 是 | - |
数据库名称
|
-| table | 字符串 | 是 | - |
表名
|
-| hdfs_site_path | 字符串 | 否 | - |
hdfs-site.xml文件路径
|
-| schema_save_mode | 枚举 | 否 | CREATE_SCHEMA_WHEN_NOT_EXIST
| Schema保存模式
|
-| data_save_mode | 枚举 | 否 | APPEND_DATA
| 数据保存模式
|
-| paimon.table.primary-keys | 字符串 | 否 | - |
主键字段列表,联合主键使用逗号分隔(注意:分区字段需要包含在主键字段中)
|
-| paimon.table.partition-keys | 字符串 | 否 | - |
分区字段列表,多字段使用逗号分隔
|
-| paimon.table.write-props | Map | 否 | -
| Paimon表初始化指定的属性,
[参考](https://paimon.apache.org/docs/master/maintenance/configurations/#coreoptions)
|
-| paimon.hadoop.conf | Map | 否 | -
| Hadoop配置文件属性信息
|
-| paimon.hadoop.conf-path | 字符串 | 否 | - |
Hadoop配置文件目录,用于加载'core-site.xml', 'hdfs-site.xml', 'hive-site.xml'文件配置
|
+| 名称 | 类型 | 是否必须 | 默认值 |
描述
|
+|-----------------------------|------|------|------------------------------|-------------------------------------------------------------------------------------------------------|
+| warehouse | 字符串 | 是 | - |
Paimon warehouse路径
|
+| catalog_type | 字符串 | 否 | filesystem |
Paimon的catalog类型,目前支持filesystem和hive
|
+| catalog_uri | 字符串 | 否 | - |
Paimon catalog的uri,仅当catalog_type为hive时需要配置
|
+| database | 字符串 | 是 | - |
数据库名称
|
+| table | 字符串 | 是 | - |
表名
|
+| hdfs_site_path | 字符串 | 否 | - |
hdfs-site.xml文件路径
|
+| schema_save_mode | 枚举 | 否 | CREATE_SCHEMA_WHEN_NOT_EXIST |
Schema保存模式
|
+| data_save_mode | 枚举 | 否 | APPEND_DATA |
数据保存模式
|
+| paimon.table.primary-keys | 字符串 | 否 | - |
主键字段列表,联合主键使用逗号分隔(注意:分区字段需要包含在主键字段中)
|
+| paimon.table.partition-keys | 字符串 | 否 | - |
分区字段列表,多字段使用逗号分隔
|
+| paimon.table.write-props | Map | 否 | - |
Paimon表初始化指定的属性,
[参考](https://paimon.apache.org/docs/master/maintenance/configurations/#coreoptions)
|
+| paimon.hadoop.conf | Map | 否 | - |
Hadoop配置文件属性信息
|
+| paimon.hadoop.conf-path | 字符串 | 否 | - |
Hadoop配置文件目录,用于加载'core-site.xml', 'hdfs-site.xml', 'hive-site.xml'文件配置
|
## 更新日志
你必须配置`changelog-producer=input`来启用paimon表的changelog产生模式。如果你使用了paimon
sink的自动建表功能,你可以在`paimon.table.write-props`中指定这个属性。
Paimon表的changelog产生模式有[四种](https://paimon.apache.org/docs/master/primary-key-table/changelog-producer/),分别是`none`、`input`、`lookup`
和 `full-compaction`。
-目前,我们只支持`none`和`input`模式。默认是`none`,这种模式将不会产生changelog文件。`input`模式将会在Paimon表下产生changelog文件。
+目前支持全部`changelog-producer`模式。默认是`none`模式。
-当你使用流模式去读paimon表的数据时,这两种模式将会产生[不同的结果](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/source/Paimon.md#changelog)。
+*
[`none`](https://paimon.apache.org/docs/master/primary-key-table/changelog-producer/#none)
+*
[`input`](https://paimon.apache.org/docs/master/primary-key-table/changelog-producer/#input)
+*
[`lookup`](https://paimon.apache.org/docs/master/primary-key-table/changelog-producer/#lookup)
+*
[`full-compaction`](https://paimon.apache.org/docs/master/primary-key-table/changelog-producer/#full-compaction)
+> 注意:
+> 当你使用流模式去读paimon表的数据时,不同模式将会产生[不同的结果](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/source/Paimon.md#changelog)。
## 示例
@@ -248,6 +253,37 @@ sink {
}
}
```
+#### 使用`changelog-producer`属性写入
+
+```hocon
+env {
+ parallelism = 1
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000
+}
+
+source {
+ Mysql-CDC {
+ base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
+ username = "root"
+ password = "******"
+ table-names = ["seatunnel.role"]
+ }
+}
+
+sink {
+ Paimon {
+ catalog_name = "seatunnel_test"
+ warehouse = "file:///tmp/seatunnel/paimon/hadoop-sink/"
+ database = "seatunnel"
+ table = "role"
+ paimon.table.write-props = {
+ changelog-producer = full-compaction
+ changelog-tmp-path = /tmp/paimon/changelog
+ }
+ }
+}
+```
### 动态分桶paimon单表
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java
index 9b358a2e8c..87766ff96b 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java
@@ -23,16 +23,22 @@ import
org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.sink.SchemaSaveMode;
+import org.apache.paimon.CoreOptions;
+
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Stream;
@Getter
@Slf4j
public class PaimonSinkConfig extends PaimonConfig {
+
+ public static final String CHANGELOG_TMP_PATH = "changelog-tmp-path";
+
public static final Option<SchemaSaveMode> SCHEMA_SAVE_MODE =
Options.key("schema_save_mode")
.enumType(SchemaSaveMode.class)
@@ -44,7 +50,6 @@ public class PaimonSinkConfig extends PaimonConfig {
.enumType(DataSaveMode.class)
.defaultValue(DataSaveMode.APPEND_DATA)
.withDescription("data_save_mode");
-
public static final Option<String> PRIMARY_KEYS =
Options.key("paimon.table.primary-keys")
.stringType()
@@ -66,11 +71,13 @@ public class PaimonSinkConfig extends PaimonConfig {
.withDescription(
"Properties passed through to paimon table
initialization, such as 'file.format',
'bucket'(org.apache.paimon.CoreOptions)");
- private SchemaSaveMode schemaSaveMode;
- private DataSaveMode dataSaveMode;
- private List<String> primaryKeys;
- private List<String> partitionKeys;
- private Map<String, String> writeProps;
+ private final SchemaSaveMode schemaSaveMode;
+ private final DataSaveMode dataSaveMode;
+ private final CoreOptions.ChangelogProducer changelogProducer;
+ private final String changelogTmpPath;
+ private final List<String> primaryKeys;
+ private final List<String> partitionKeys;
+ private final Map<String, String> writeProps;
public PaimonSinkConfig(ReadonlyConfig readonlyConfig) {
super(readonlyConfig);
@@ -79,6 +86,20 @@ public class PaimonSinkConfig extends PaimonConfig {
this.primaryKeys = stringToList(readonlyConfig.get(PRIMARY_KEYS), ",");
this.partitionKeys = stringToList(readonlyConfig.get(PARTITION_KEYS),
",");
this.writeProps = readonlyConfig.get(WRITE_PROPS);
+ this.changelogProducer =
+ Stream.of(CoreOptions.ChangelogProducer.values())
+ .filter(
+ cp ->
+ cp.toString()
+ .equalsIgnoreCase(
+
writeProps.getOrDefault(
+
CoreOptions.CHANGELOG_PRODUCER
+ .key(),
+ "")))
+ .findFirst()
+ .orElse(null);
+ this.changelogTmpPath =
+ writeProps.getOrDefault(CHANGELOG_TMP_PATH,
System.getProperty("java.io.tmpdir"));
checkConfig();
}
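The constructor change above resolves the user-supplied `changelog-producer` write prop against Paimon's enum, case-insensitively, falling back to `null` when unset. A minimal, self-contained sketch of that lookup follows; the nested enum is a stand-in for `org.apache.paimon.CoreOptions.ChangelogProducer`, and its hyphenated `toString` is an assumption made so that the user-facing value `full-compaction` matches `FULL_COMPACTION`:

```java
import java.util.Map;
import java.util.stream.Stream;

public class ChangelogProducerResolver {
    // Stand-in for org.apache.paimon.CoreOptions.ChangelogProducer.
    enum ChangelogProducer {
        NONE, INPUT, LOOKUP, FULL_COMPACTION;

        // Assumed rendering: FULL_COMPACTION -> "full-compaction",
        // i.e. the value users put in paimon.table.write-props.
        @Override
        public String toString() {
            return name().toLowerCase().replace('_', '-');
        }
    }

    // Mirrors the PaimonSinkConfig logic: returns null when the user did not
    // set 'changelog-producer', deferring to the table's own option.
    static ChangelogProducer resolve(Map<String, String> writeProps) {
        String configured = writeProps.getOrDefault("changelog-producer", "");
        return Stream.of(ChangelogProducer.values())
                .filter(cp -> cp.toString().equalsIgnoreCase(configured))
                .findFirst()
                .orElse(null);
    }

    public static void main(String[] args) {
        System.out.println(resolve(Map.of("changelog-producer", "full-compaction")));
        System.out.println(resolve(Map.of()));
    }
}
```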
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
index 73d2151b89..86828c9a58 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
@@ -94,7 +94,12 @@ public class PaimonSink
@Override
public PaimonSinkWriter createWriter(SinkWriter.Context context) throws
IOException {
return new PaimonSinkWriter(
- context, table, seaTunnelRowType, jobContext,
paimonHadoopConfiguration);
+ context,
+ table,
+ seaTunnelRowType,
+ jobContext,
+ paimonSinkConfig,
+ paimonHadoopConfiguration);
}
@Override
@@ -108,7 +113,13 @@ public class PaimonSink
public SinkWriter<SeaTunnelRow, PaimonCommitInfo, PaimonSinkState>
restoreWriter(
SinkWriter.Context context, List<PaimonSinkState> states) throws
IOException {
return new PaimonSinkWriter(
- context, table, seaTunnelRowType, states, jobContext,
paimonHadoopConfiguration);
+ context,
+ table,
+ seaTunnelRowType,
+ states,
+ jobContext,
+ paimonSinkConfig,
+ paimonHadoopConfiguration);
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
index ac0b1027d0..e57e62c981 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonHadoopConfiguration;
+import
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig;
import
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorErrorCode;
import
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;
import
org.apache.seatunnel.connectors.seatunnel.paimon.security.PaimonSecurityContext;
@@ -33,7 +34,9 @@ import
org.apache.seatunnel.connectors.seatunnel.paimon.sink.state.PaimonSinkSta
import org.apache.seatunnel.connectors.seatunnel.paimon.utils.JobContextUtil;
import org.apache.seatunnel.connectors.seatunnel.paimon.utils.RowConverter;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
@@ -58,6 +61,8 @@ import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
+import static org.apache.paimon.disk.IOManagerImpl.splitPaths;
+
@Slf4j
public class PaimonSinkWriter
implements SinkWriter<SeaTunnelRow, PaimonCommitInfo, PaimonSinkState>,
@@ -65,14 +70,14 @@ public class PaimonSinkWriter
private String commitUser = UUID.randomUUID().toString();
+ private final FileStoreTable table;
+
private final WriteBuilder tableWriteBuilder;
private final TableWrite tableWrite;
private List<CommitMessage> committables = new ArrayList<>();
- private final Table table;
-
private final SeaTunnelRowType seaTunnelRowType;
private final SinkWriter.Context context;
@@ -90,18 +95,30 @@ public class PaimonSinkWriter
Table table,
SeaTunnelRowType seaTunnelRowType,
JobContext jobContext,
+ PaimonSinkConfig paimonSinkConfig,
PaimonHadoopConfiguration paimonHadoopConfiguration) {
- this.table = table;
+ this.table = (FileStoreTable) table;
+ CoreOptions.ChangelogProducer changelogProducer =
+ this.table.coreOptions().changelogProducer();
+ if (Objects.nonNull(paimonSinkConfig.getChangelogProducer())
+ && changelogProducer !=
paimonSinkConfig.getChangelogProducer()) {
+ log.warn(
+                    "The configured 'changelog-producer' property is not
compatible with the table's existing option, so the table's
'changelog-producer' will be used");
+ }
+ String changelogTmpPath = paimonSinkConfig.getChangelogTmpPath();
this.tableWriteBuilder =
JobContextUtil.isBatchJob(jobContext)
? this.table.newBatchWriteBuilder()
: this.table.newStreamWriteBuilder();
- this.tableWrite = tableWriteBuilder.newWrite();
+ this.tableWrite =
+ tableWriteBuilder
+ .newWrite()
+
.withIOManager(IOManager.create(splitPaths(changelogTmpPath)));
this.seaTunnelRowType = seaTunnelRowType;
this.context = context;
this.jobContext = jobContext;
- this.tableSchema = ((FileStoreTable) table).schema();
- BucketMode bucketMode = ((FileStoreTable) table).bucketMode();
+ this.tableSchema = this.table.schema();
+ BucketMode bucketMode = this.table.bucketMode();
this.dynamicBucket =
BucketMode.DYNAMIC == bucketMode || BucketMode.GLOBAL_DYNAMIC
== bucketMode;
int bucket = ((FileStoreTable) table).coreOptions().bucket();
@@ -124,8 +141,15 @@ public class PaimonSinkWriter
SeaTunnelRowType seaTunnelRowType,
List<PaimonSinkState> states,
JobContext jobContext,
+ PaimonSinkConfig paimonSinkConfig,
PaimonHadoopConfiguration paimonHadoopConfiguration) {
- this(context, table, seaTunnelRowType, jobContext,
paimonHadoopConfiguration);
+ this(
+ context,
+ table,
+ seaTunnelRowType,
+ jobContext,
+ paimonSinkConfig,
+ paimonHadoopConfiguration);
if (Objects.isNull(states) || states.isEmpty()) {
return;
}
@@ -186,7 +210,8 @@ public class PaimonSinkWriter
fileCommittables = ((BatchTableWrite)
tableWrite).prepareCommit();
} else {
fileCommittables =
- ((StreamTableWrite) tableWrite).prepareCommit(false,
checkpointId);
+ ((StreamTableWrite) tableWrite)
+ .prepareCommit(waitCompaction(), checkpointId);
}
committables.addAll(fileCommittables);
return Optional.of(new PaimonCommitInfo(fileCommittables,
checkpointId));
@@ -224,4 +249,11 @@ public class PaimonSinkWriter
committables.clear();
}
}
+
+ private boolean waitCompaction() {
+ CoreOptions.ChangelogProducer changelogProducer =
+ this.table.coreOptions().changelogProducer();
+ return changelogProducer == CoreOptions.ChangelogProducer.LOOKUP
+ || changelogProducer ==
CoreOptions.ChangelogProducer.FULL_COMPACTION;
+ }
}
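The new `waitCompaction()` flag above is the crux of the lookup/full-compaction support: in those two modes the changelog is produced during compaction, so `prepareCommit` must wait for compaction to finish. A standalone sketch of that decision (the nested enum again stands in for `org.apache.paimon.CoreOptions.ChangelogProducer`):

```java
public class WaitCompactionSketch {
    // Stand-in for org.apache.paimon.CoreOptions.ChangelogProducer.
    enum ChangelogProducer { NONE, INPUT, LOOKUP, FULL_COMPACTION }

    // lookup and full-compaction emit changelog files as part of compaction,
    // so the stream writer must wait for compaction before committing;
    // none and input need no such wait.
    static boolean waitCompaction(ChangelogProducer producer) {
        return producer == ChangelogProducer.LOOKUP
                || producer == ChangelogProducer.FULL_COMPACTION;
    }

    public static void main(String[] args) {
        for (ChangelogProducer p : ChangelogProducer.values()) {
            System.out.println(p + " -> waitCompaction=" + waitCompaction(p));
        }
    }
}
```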
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/SchemaUtil.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/SchemaUtil.java
index fa8ed33820..ca825a269f 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/SchemaUtil.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/SchemaUtil.java
@@ -25,6 +25,7 @@ import
org.apache.seatunnel.connectors.seatunnel.paimon.data.PaimonTypeMapper;
import
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorErrorCode;
import
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
@@ -61,6 +62,10 @@ public class SchemaUtil {
paiSchemaBuilder.partitionKeys(partitionKeys);
}
Map<String, String> writeProps = paimonSinkConfig.getWriteProps();
+ CoreOptions.ChangelogProducer changelogProducer =
paimonSinkConfig.getChangelogProducer();
+ if (changelogProducer != null) {
+ writeProps.remove(PaimonSinkConfig.CHANGELOG_TMP_PATH);
+ }
if (!writeProps.isEmpty()) {
paiSchemaBuilder.options(writeProps);
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonRecord.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonRecord.java
index 700bf25f51..c17d8dbc14 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonRecord.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonRecord.java
@@ -21,18 +21,23 @@
package org.apache.seatunnel.e2e.connector.paimon;
import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.types.RowKind;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
+import java.util.Arrays;
+
@Data
@NoArgsConstructor
@AllArgsConstructor
public class PaimonRecord {
+ public RowKind rowKind;
public Long pkId;
public String name;
public Integer score;
+ public String op;
public String dt;
public Timestamp oneTime;
public Timestamp twoTime;
@@ -45,6 +50,12 @@ public class PaimonRecord {
this.name = name;
}
+    public PaimonRecord(RowKind rowKind, Long pkId, String name) {
+        this(pkId, name);
+        this.rowKind = rowKind;
+    }
+
public PaimonRecord(Long pkId, String name, String dt) {
this(pkId, name);
this.dt = dt;
@@ -68,4 +79,23 @@ public class PaimonRecord {
this.threeTime = threeTime;
this.fourTime = fourTime;
}
+
+ public String toChangeLogFull() {
+ Object[] objects = new Object[4];
+ objects[0] = rowKind.shortString();
+ objects[1] = pkId;
+ objects[2] = name;
+ objects[3] = score;
+ return Arrays.toString(objects);
+ }
+
+ public String toChangeLogLookUp() {
+ Object[] objects = new Object[5];
+ objects[0] = rowKind.shortString();
+ objects[1] = pkId;
+ objects[2] = name;
+ objects[3] = score;
+ objects[4] = op;
+ return Arrays.toString(objects);
+ }
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
index dc6bfc9eba..293cf6c76e 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.e2e.connector.paimon;
import org.apache.seatunnel.common.utils.FileUtils;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.core.starter.utils.CompressionUtils;
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
@@ -25,6 +26,7 @@ import
org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+import org.apache.seatunnel.e2e.common.util.JobIdGenerator;
import org.apache.commons.compress.archivers.ArchiveException;
import org.apache.commons.lang3.StringUtils;
@@ -58,7 +60,9 @@ import java.io.File;
import java.io.IOException;
import java.time.LocalDate;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -84,6 +88,7 @@ public class PaimonSinkCDCIT extends TestSuiteBase implements
TestResource {
private String CATALOG_ROOT_DIR_WIN = "C:/Users/";
private String CATALOG_DIR_WIN = CATALOG_ROOT_DIR_WIN + NAMESPACE + "/";
private boolean isWindows;
+ private boolean changeLogEnabled = false;
@BeforeAll
@Override
@@ -545,6 +550,129 @@ public class PaimonSinkCDCIT extends TestSuiteBase
implements TestResource {
});
}
+ @TestTemplate
+ public void testChangelogLookup(TestContainer container) throws Exception {
+        // create Paimon table (changelog-producer=lookup)
+ Container.ExecResult writeResult =
+
container.executeJob("/changelog_fake_cdc_sink_paimon_case1_ddl.conf");
+ Assertions.assertEquals(0, writeResult.getExitCode());
+ TimeUnit.SECONDS.sleep(20);
+ String[] jobIds =
+ new String[] {
+ JobIdGenerator.newJobId(), JobIdGenerator.newJobId(),
JobIdGenerator.newJobId()
+ };
+ log.info("jobIds: {}", Arrays.toString(jobIds));
+ List<CompletableFuture<Void>> futures = new ArrayList<>();
+ // read changelog and write to append only paimon table
+ futures.add(
+ CompletableFuture.runAsync(
+ () -> {
+ try {
+
container.executeJob("/changelog_paimon_to_paimon.conf", jobIds[0]);
+ } catch (Exception e) {
+ throw new SeaTunnelException(e);
+ }
+ }));
+ TimeUnit.SECONDS.sleep(10);
+ // dml: insert data
+ futures.add(
+ CompletableFuture.runAsync(
+ () -> {
+ try {
+ container.executeJob(
+
"/changelog_fake_cdc_sink_paimon_case1_insert_data.conf",
+ jobIds[1]);
+ } catch (Exception e) {
+ throw new SeaTunnelException(e);
+ }
+ }));
+ // dml: update and delete data
+ TimeUnit.SECONDS.sleep(10);
+ futures.add(
+ CompletableFuture.runAsync(
+ () -> {
+ try {
+ container.executeJob(
+
"/changelog_fake_cdc_sink_paimon_case1_update_data.conf",
+ jobIds[2]);
+ } catch (Exception e) {
+ throw new SeaTunnelException(e);
+ }
+ }));
+ // stream job running 30 seconds
+ TimeUnit.SECONDS.sleep(30);
+ // cancel stream job
+ container.cancelJob(jobIds[1]);
+ container.cancelJob(jobIds[2]);
+ container.cancelJob(jobIds[0]);
+ changeLogEnabled = true;
+ TimeUnit.SECONDS.sleep(10);
+ // copy paimon to local
+ container.executeExtraCommands(containerExtendedFactory);
+ List<PaimonRecord> paimonRecords1 =
loadPaimonData("seatunnel_namespace", "st_test_sink");
+ List<String> actual1 =
+ paimonRecords1.stream()
+ .map(PaimonRecord::toChangeLogLookUp)
+ .collect(Collectors.toList());
+ log.info("paimon records: {}", actual1);
+ Assertions.assertEquals(8, actual1.size());
+ Assertions.assertEquals(
+ Arrays.asList(
+ "[+I, 1, A, 100, +I]",
+ "[+I, 2, B, 100, +I]",
+ "[+I, 3, C, 100, +I]",
+ "[+I, 1, A, 100, -U]",
+ "[+I, 1, Aa, 200, +U]",
+ "[+I, 2, B, 100, -U]",
+ "[+I, 2, Bb, 90, +U]",
+ "[+I, 3, C, 100, -D]"),
+ actual1);
+ List<PaimonRecord> paimonRecords2 =
loadPaimonData("seatunnel_namespace", "st_test_lookup");
+ List<String> actual2 =
+ paimonRecords2.stream()
+ .map(PaimonRecord::toChangeLogFull)
+ .collect(Collectors.toList());
+ log.info("paimon records: {}", actual2);
+ Assertions.assertEquals(2, actual2.size());
+ Assertions.assertEquals(Arrays.asList("[+U, 1, Aa, 200]", "[+I, 2, Bb,
90]"), actual2);
+ changeLogEnabled = false;
+ futures.forEach(future -> future.cancel(true));
+ }
+
+ @TestTemplate
+ public void testChangelogFullCompaction(TestContainer container) throws
Exception {
+ String jobId = JobIdGenerator.newJobId();
+ log.info("jobId: {}", jobId);
+ CompletableFuture<Void> voidCompletableFuture =
+ CompletableFuture.runAsync(
+ () -> {
+ try {
+ container.executeJob(
+
"/changelog_fake_cdc_sink_paimon_case2.conf", jobId);
+ } catch (Exception e) {
+ throw new SeaTunnelException(e);
+ }
+ });
+ // stream job running 20 seconds
+ TimeUnit.SECONDS.sleep(20);
+ changeLogEnabled = true;
+ // cancel stream job
+ container.cancelJob(jobId);
+ TimeUnit.SECONDS.sleep(5);
+ // copy paimon to local
+ container.executeExtraCommands(containerExtendedFactory);
+ List<PaimonRecord> paimonRecords =
loadPaimonData("seatunnel_namespace", "st_test_full");
+ List<String> actual =
+ paimonRecords.stream()
+ .map(PaimonRecord::toChangeLogFull)
+ .collect(Collectors.toList());
+ log.info("paimon records: {}", actual);
+ Assertions.assertEquals(2, actual.size());
+ Assertions.assertEquals(Arrays.asList("[+U, 1, Aa, 200]", "[+I, 2, Bb,
90]"), actual);
+ changeLogEnabled = false;
+ voidCompletableFuture.cancel(true);
+ }
+
protected final ContainerExtendedFactory containerExtendedFactory =
container -> {
if (isWindows) {
@@ -619,13 +747,31 @@ public class PaimonSinkCDCIT extends TestSuiteBase
implements TestResource {
try (RecordReader<InternalRow> reader = tableRead.createReader(plan)) {
reader.forEachRemaining(
row -> {
- PaimonRecord paimonRecord =
- new PaimonRecord(row.getLong(0),
row.getString(1).toString());
+ PaimonRecord paimonRecord;
+ if (changeLogEnabled) {
+ paimonRecord =
+ new PaimonRecord(
+ row.getRowKind(),
+ row.getLong(0),
+ row.getString(1).toString());
+ } else {
+ paimonRecord =
+ new PaimonRecord(row.getLong(0),
row.getString(1).toString());
+ }
if (table.schema().fieldNames().contains("score")) {
paimonRecord.setScore(row.getInt(2));
}
+ if (table.schema().fieldNames().contains("op")) {
+ paimonRecord.setOp(row.getString(3).toString());
+ }
result.add(paimonRecord);
- log.info("key_id:" + row.getLong(0) + ", name:" +
row.getString(1));
+ log.info(
+ "rowKind:"
+ + row.getRowKind().shortString()
+ + ", key_id:"
+ + row.getLong(0)
+ + ", name:"
+ + row.getString(1));
});
}
log.info(
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case1_ddl.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case1_ddl.conf
new file mode 100644
index 0000000000..6a32727505
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case1_ddl.conf
@@ -0,0 +1,53 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in
seatunnel config
+######
+
+env {
+ parallelism = 1
+ job.mode = "batch"
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ pk_id = bigint
+ name = string
+ score = int
+ }
+ primaryKey {
+ name = "pk_id"
+ columnNames = [pk_id]
+ }
+ }
+ rows = []
+ }
+}
+
+sink {
+ Paimon {
+ warehouse = "file:///tmp/paimon"
+ database = "seatunnel_namespace"
+ table = "st_test_lookup"
+ paimon.table.write-props = {
+ changelog-producer = lookup
+ changelog-tmp-path = "/tmp/paimon/changelog"
+ }
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case1_insert_data.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case1_insert_data.conf
new file mode 100644
index 0000000000..9b7310177c
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case1_insert_data.conf
@@ -0,0 +1,67 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ parallelism = 1
+ job.mode = "Streaming"
+ checkpoint.interval = 2000
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ pk_id = bigint
+ name = string
+ score = int
+ }
+ primaryKey {
+ name = "pk_id"
+ columnNames = [pk_id]
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, "A", 100]
+ },
+ {
+ kind = INSERT
+ fields = [2, "B", 100]
+ },
+ {
+ kind = INSERT
+ fields = [3, "C", 100]
+ }
+ ]
+ }
+}
+
+sink {
+ Paimon {
+ warehouse = "file:///tmp/paimon"
+ database = "seatunnel_namespace"
+ table = "st_test_lookup"
+ paimon.table.write-props = {
+ changelog-producer = lookup
+ changelog-tmp-path = "/tmp/paimon/changelog"
+ }
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case1_update_data.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case1_update_data.conf
new file mode 100644
index 0000000000..271ad20bff
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case1_update_data.conf
@@ -0,0 +1,71 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ parallelism = 1
+ job.mode = "Streaming"
+ checkpoint.interval = 2000
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ pk_id = bigint
+ name = string
+ score = int
+ }
+ primaryKey {
+ name = "pk_id"
+ columnNames = [pk_id]
+ }
+ }
+ rows = [
+ {
+ kind = UPDATE_BEFORE
+ fields = [1, "A", 100]
+ },
+ {
+ kind = UPDATE_AFTER
+ fields = [1, "Aa", 200]
+ },
+ {
+ kind = INSERT
+ fields = [2, "Bb", 90]
+ },
+ {
+ kind = DELETE
+ fields = [3, "C", 100]
+ }
+ ]
+ }
+}
+
+sink {
+ Paimon {
+ warehouse = "file:///tmp/paimon"
+ database = "seatunnel_namespace"
+ table = "st_test_lookup"
+ paimon.table.write-props = {
+ changelog-producer = lookup
+ changelog-tmp-path = "/tmp/paimon/changelog"
+ }
+ }
+}
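As a sanity check on the update batch above: replayed against the three rows inserted by `changelog_fake_cdc_sink_paimon_case1_insert_data.conf`, the primary-key table should end with pk_id 1 updated to "Aa", pk_id 2 upserted to "Bb", and pk_id 3 deleted. A minimal plain-Java sketch of that upsert/delete resolution (the class and map-based model are illustrative, not connector code):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ChangelogReplay {
    // Replays the case1 batches against a pk_id -> name map. On a
    // primary-key table, INSERT and UPDATE_AFTER both act as upserts
    // and DELETE removes the key. (score column omitted for brevity)
    public static Map<Long, String> replay() {
        Map<Long, String> table = new LinkedHashMap<>();
        // changelog_fake_cdc_sink_paimon_case1_insert_data.conf
        table.put(1L, "A");
        table.put(2L, "B");
        table.put(3L, "C");
        // changelog_fake_cdc_sink_paimon_case1_update_data.conf
        table.put(1L, "Aa"); // UPDATE_BEFORE/UPDATE_AFTER pair on pk_id 1
        table.put(2L, "Bb"); // INSERT on an existing key resolves as an upsert
        table.remove(3L);    // DELETE on pk_id 3
        return table;
    }
}
```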
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case2.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case2.conf
new file mode 100644
index 0000000000..f7135e645f
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case2.conf
@@ -0,0 +1,83 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ parallelism = 1
+ job.mode = "Streaming"
+ checkpoint.interval = 2000
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ pk_id = bigint
+ name = string
+ score = int
+ }
+ primaryKey {
+ name = "pk_id"
+ columnNames = [pk_id]
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, "A", 100]
+ },
+ {
+ kind = INSERT
+ fields = [2, "B", 100]
+ },
+ {
+ kind = INSERT
+ fields = [3, "C", 100]
+ },
+ {
+ kind = UPDATE_BEFORE
+ fields = [1, "A", 100]
+ },
+ {
+ kind = UPDATE_AFTER
+ fields = [1, "Aa", 200]
+ },
+ {
+ kind = INSERT
+ fields = [2, "Bb", 90]
+ },
+ {
+ kind = DELETE
+ fields = [3, "C", 100]
+ },
+ ]
+ }
+}
+
+sink {
+ Paimon {
+ warehouse = "file:///tmp/paimon"
+ database = "seatunnel_namespace"
+ table = "st_test_full"
+ paimon.table.write-props = {
+ changelog-producer = full-compaction
+ changelog-tmp-path = "/tmp/paimon/changelog"
+ }
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_paimon_to_paimon.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_paimon_to_paimon.conf
new file mode 100644
index 0000000000..d23d11d9e0
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_paimon_to_paimon.conf
@@ -0,0 +1,48 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "Streaming"
+ checkpoint.interval = 2000
+}
+
+source {
+ Paimon {
+ warehouse = "/tmp/paimon"
+ database = "seatunnel_namespace"
+ table = "st_test_lookup"
+ }
+}
+
+transform {
+ RowKindExtractor {
+ custom_field_name = op
+ transform_type = SHORT
+ }
+}
+
+sink {
+ Paimon {
+ warehouse = "/tmp/paimon"
+ database = "seatunnel_namespace"
+ table = "st_test_sink"
+ paimon.table.write-props = {
+ write-only = true
+ }
+ }
+}
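The `RowKindExtractor` transform above writes each row's change kind into the `op` column; with `transform_type = SHORT` the values are the compact changelog markers that the E2E test also logs via `RowKind.shortString()`. An illustrative standalone enum mirroring that convention (not the actual Paimon/SeaTunnel class):

```java
public class RowKindShort {
    // Short changelog markers as used by the Flink/Paimon RowKind convention.
    public enum Kind {
        INSERT("+I"),
        UPDATE_BEFORE("-U"),
        UPDATE_AFTER("+U"),
        DELETE("-D");

        private final String shortString;

        Kind(String shortString) {
            this.shortString = shortString;
        }

        public String shortString() {
            return shortString;
        }
    }
}
```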
diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/AbstractTestContainer.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/AbstractTestContainer.java
index d7bd0f4d74..10d5685c6d 100644
--- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/AbstractTestContainer.java
+++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/AbstractTestContainer.java
@@ -19,6 +19,8 @@ package org.apache.seatunnel.e2e.common.container;
import org.apache.seatunnel.e2e.common.util.ContainerUtil;
+import org.apache.commons.lang3.StringUtils;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Container;
@@ -68,6 +70,8 @@ public abstract class AbstractTestContainer implements TestContainer {
protected abstract String getSavePointCommand();
+ protected abstract String getCancelJobCommand();
+
protected abstract String getRestoreCommand();
protected abstract String getConnectorNamePrefix();
@@ -95,11 +99,11 @@ public abstract class AbstractTestContainer implements TestContainer {
protected Container.ExecResult executeJob(GenericContainer<?> container, String confFile)
throws IOException, InterruptedException {
- return executeJob(container, confFile, null);
+ return executeJob(container, confFile, null, null);
}
protected Container.ExecResult executeJob(
- GenericContainer<?> container, String confFile, List<String> variables)
+ GenericContainer<?> container, String confFile, String jobId, List<String> variables)
throws IOException, InterruptedException {
final String confInContainerPath = copyConfigFileToContainer(container, confFile);
// copy connectors
@@ -118,6 +122,10 @@ public abstract class AbstractTestContainer implements TestContainer {
command.add(adaptPathForWin(confInContainerPath));
command.add("--name");
command.add(new File(confInContainerPath).getName());
+ if (StringUtils.isNoneEmpty(jobId)) {
+ command.add("--set-job-id");
+ command.add(jobId);
+ }
List<String> extraStartShellCommands = new ArrayList<>(getExtraStartShellCommands());
if (variables != null && !variables.isEmpty()) {
variables.forEach(
@@ -142,6 +150,18 @@ public abstract class AbstractTestContainer implements TestContainer {
return executeCommand(container, command);
}
+ protected Container.ExecResult cancelJob(GenericContainer<?> container, String jobId)
+ throws IOException, InterruptedException {
+ final List<String> command = new ArrayList<>();
+ String binPath = Paths.get(SEATUNNEL_HOME, "bin", getStartShellName()).toString();
+ // base command
+ command.add(adaptPathForWin(binPath));
+ command.add(getCancelJobCommand());
+ command.add(jobId);
+ command.addAll(getExtraStartShellCommands());
+ return executeCommand(container, command);
+ }
+
protected Container.ExecResult restoreJob(
GenericContainer<?> container, String confFile, String jobId)
throws IOException, InterruptedException {
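The `cancelJob` helper added above assembles a CLI invocation of the shape `<SEATUNNEL_HOME>/bin/<start-shell> -can <jobId>`, where `-can` is what the Zeta containers return from `getCancelJobCommand()`. A standalone sketch of that command assembly (class and method names here are illustrative, not part of the commit):

```java
import java.util.ArrayList;
import java.util.List;

public class CancelCommandSketch {
    // Mirrors AbstractTestContainer#cancelJob: shell path, cancel flag, job id.
    public static List<String> build(String seatunnelHome, String shellName, String jobId) {
        List<String> command = new ArrayList<>();
        command.add(seatunnelHome + "/bin/" + shellName); // adaptPathForWin omitted here
        command.add("-can"); // value of getCancelJobCommand() on the Zeta containers
        command.add(jobId);
        return command;
    }
}
```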
diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainer.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainer.java
index e83a3635e8..72584158f6 100644
--- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainer.java
+++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainer.java
@@ -39,15 +39,20 @@ public interface TestContainer extends TestResource {
Container.ExecResult executeJob(String confFile, List<String> variables)
throws IOException, InterruptedException;
+ default Container.ExecResult executeJob(String confFile, String jobId)
+ throws IOException, InterruptedException {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
default Container.ExecResult executeConnectorCheck(String[] args)
throws IOException, InterruptedException {
throw new UnsupportedOperationException("Not implemented");
- };
+ }
default Container.ExecResult executeBaseCommand(String[] args)
throws IOException, InterruptedException {
throw new UnsupportedOperationException("Not implemented");
- };
+ }
default Container.ExecResult savepointJob(String jobId)
throws IOException, InterruptedException {
@@ -59,6 +64,10 @@ public interface TestContainer extends TestResource {
throw new UnsupportedOperationException("Not implemented");
}
+ default Container.ExecResult cancelJob(String jobId) throws IOException, InterruptedException {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
String getServerLogs();
void copyFileToContainer(String path, String targetPath);
diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
index ff16c0c754..47b3de5ff5 100644
--- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
+++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
@@ -131,6 +131,11 @@ public abstract class AbstractTestFlinkContainer extends AbstractTestContainer {
throw new UnsupportedOperationException("Not implemented");
}
+ @Override
+ protected String getCancelJobCommand() {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
@Override
protected String getRestoreCommand() {
throw new UnsupportedOperationException("Not implemented");
@@ -150,14 +155,14 @@ public abstract class AbstractTestFlinkContainer extends AbstractTestContainer {
@Override
public Container.ExecResult executeJob(String confFile)
throws IOException, InterruptedException {
- return executeJob(confFile, null);
+ return executeJob(confFile, Collections.emptyList());
}
@Override
public Container.ExecResult executeJob(String confFile, List<String> variables)
throws IOException, InterruptedException {
log.info("test in container: {}", identifier());
- return executeJob(jobManager, confFile, variables);
+ return executeJob(jobManager, confFile, null, variables);
}
@Override
diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/ConnectorPackageServiceContainer.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/ConnectorPackageServiceContainer.java
index 3a27d78d42..ea8bcd8788 100644
--- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/ConnectorPackageServiceContainer.java
+++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/ConnectorPackageServiceContainer.java
@@ -189,6 +189,11 @@ public class ConnectorPackageServiceContainer extends AbstractTestContainer {
return "-s";
}
+ @Override
+ protected String getCancelJobCommand() {
+ return "-can";
+ }
+
@Override
protected String getRestoreCommand() {
return "-r";
@@ -220,14 +225,14 @@ public class ConnectorPackageServiceContainer extends AbstractTestContainer {
@Override
public Container.ExecResult executeJob(String confFile)
throws IOException, InterruptedException {
- return executeJob(confFile, null);
+ return executeJob(confFile, Collections.emptyList());
}
@Override
public Container.ExecResult executeJob(String confFile, List<String> variables)
throws IOException, InterruptedException {
log.info("test in container: {}", identifier());
- return executeJob(server1, confFile, variables);
+ return executeJob(server1, confFile, null, variables);
}
@Override
diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
index 96e5162d7a..14d89571b9 100644
--- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
+++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.container.TestContainerId;
import org.apache.seatunnel.e2e.common.util.ContainerUtil;
+import org.apache.commons.compress.utils.Lists;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
@@ -241,6 +242,11 @@ public class SeaTunnelContainer extends AbstractTestContainer {
return "-s";
}
+ @Override
+ protected String getCancelJobCommand() {
+ return "-can";
+ }
+
@Override
protected String getRestoreCommand() {
return "-r";
@@ -281,16 +287,27 @@ public class SeaTunnelContainer extends AbstractTestContainer {
@Override
public Container.ExecResult executeJob(String confFile)
throws IOException, InterruptedException {
- return executeJob(confFile, null);
+ return executeJob(confFile, Lists.newArrayList());
}
@Override
public Container.ExecResult executeJob(String confFile, List<String> variables)
throws IOException, InterruptedException {
+ return executeJob(confFile, null, variables);
+ }
+
+ @Override
+ public Container.ExecResult executeJob(String confFile, String jobId)
+ throws IOException, InterruptedException {
+ return executeJob(confFile, jobId, null);
+ }
+
+ private Container.ExecResult executeJob(String confFile, String jobId, List<String> variables)
+ throws IOException, InterruptedException {
log.info("test in container: {}", identifier());
List<String> beforeThreads = ContainerUtil.getJVMThreadNames(server);
runningCount.incrementAndGet();
- Container.ExecResult result = executeJob(server, confFile, variables);
+ Container.ExecResult result = executeJob(server, confFile, jobId, variables);
if (runningCount.decrementAndGet() > 0) {
// only check thread when job all finished.
return result;
@@ -322,7 +339,6 @@ public class SeaTunnelContainer extends AbstractTestContainer {
.collect(Collectors.joining()));
});
}
- // classLoaderObjectCheck(1);
return result;
}
@@ -467,6 +483,11 @@ public class SeaTunnelContainer extends AbstractTestContainer {
return result;
}
+ @Override
+ public Container.ExecResult cancelJob(String jobId) throws IOException, InterruptedException {
+ return cancelJob(server, jobId);
+ }
+
@Override
public String getServerLogs() {
return server.getLogs();
diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/spark/AbstractTestSparkContainer.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/spark/AbstractTestSparkContainer.java
index 9970ffb3aa..b13851582c 100644
--- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/spark/AbstractTestSparkContainer.java
+++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/spark/AbstractTestSparkContainer.java
@@ -33,6 +33,7 @@ import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;
@@ -87,6 +88,11 @@ public abstract class AbstractTestSparkContainer extends AbstractTestContainer {
throw new UnsupportedOperationException("Not implemented");
}
+ @Override
+ protected String getCancelJobCommand() {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
@Override
protected String getRestoreCommand() {
throw new UnsupportedOperationException("Not implemented");
@@ -105,14 +111,14 @@ public abstract class AbstractTestSparkContainer extends AbstractTestContainer {
@Override
public Container.ExecResult executeJob(String confFile)
throws IOException, InterruptedException {
- return executeJob(confFile, null);
+ return executeJob(confFile, Collections.emptyList());
}
@Override
public Container.ExecResult executeJob(String confFile, List<String> variables)
throws IOException, InterruptedException {
log.info("test in container: {}", identifier());
- return executeJob(master, confFile, variables);
+ return executeJob(master, confFile, null, variables);
}
@Override
diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/JobIdGenerator.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/JobIdGenerator.java
new file mode 100644
index 0000000000..6904593b24
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/JobIdGenerator.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.common.util;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+public class JobIdGenerator {
+
+ public static String newJobId() {
+ return String.valueOf(Math.abs(ThreadLocalRandom.current().nextLong()));
+ }
+}
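One caveat about the generator above: `Math.abs(Long.MIN_VALUE)` overflows and stays negative, so `newJobId()` can in principle (with probability 2^-64) return a negative id. A bounded draw avoids that edge case entirely; the variant below is a suggested alternative, not what this commit ships:

```java
import java.util.concurrent.ThreadLocalRandom;

public class SafeJobIdGenerator {
    // nextLong(bound) draws uniformly from [0, bound), so no Math.abs is
    // needed and the Long.MIN_VALUE overflow edge case cannot occur.
    public static String newJobId() {
        return String.valueOf(ThreadLocalRandom.current().nextLong(Long.MAX_VALUE));
    }
}
```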