This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 3a16b4a4b5 [Feature][Connectors-v2] Support auto-increment id for FakeSource (#9505)
3a16b4a4b5 is described below
commit 3a16b4a4b56786445b16e179c2c1a66b579912a3
Author: zhangdonghao
AuthorDate: Thu Jul 3 11:19:21 2025 +0800
[Feature][Connectors-v2] Support auto-increment id for FakeSource (#9505)
---
docs/en/connector-v2/source/FakeSource.md | 31 +++++-
docs/zh/connector-v2/source/FakeSource.md | 112 +++++++++++++--------
.../seatunnel/fake/config/FakeConfig.java | 13 +++
.../seatunnel/fake/config/FakeSourceOptions.java | 12 +++
.../seatunnel/fake/source/FakeDataGenerator.java | 6 +-
.../seatunnel/fake/source/FakeSource.java | 6 +-
.../seatunnel/fake/source/FakeSourceReader.java | 7 +-
.../fake/utils/AutoIncrementIdGenerator.java | 36 +++++++
.../seatunnel/fake/utils/FakeDataRandomUtils.java | 29 +++++-
.../seatunnel/fake/utils/IdGeneratorUtils.java | 72 +++++++++++++
.../fake/source/FakeDataGeneratorTest.java | 50 ++++++++-
.../src/test/resources/fake-auto-increment-id.conf | 35 +++++++
.../e2e/connector/pulsar/PulsarBatchIT.java | 2 +-
13 files changed, 353 insertions(+), 58 deletions(-)
diff --git a/docs/en/connector-v2/source/FakeSource.md b/docs/en/connector-v2/source/FakeSource.md
index 016f9dd2bc..f9ce1d9ce8 100644
--- a/docs/en/connector-v2/source/FakeSource.md
+++ b/docs/en/connector-v2/source/FakeSource.md
@@ -26,10 +26,12 @@ just for some test cases such as type conversion or connector new feature testin
## Source Options
-| Name | Type | Required | Default | Description |
+| Name | Type | Required | Default | Description |
|-------------------------|----------|----------|-------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| tables_configs | list | no | - | Define multiple FakeSources; each item can contain the whole fake source config described below |
| schema | config | yes | - | Define schema information |
+| auto.increment.enabled | boolean | no | false | Enable auto-increment ID generation for `int`/`bigint` primary key columns |
+| auto.increment.start | long | no | 1 | Starting value for the auto-increment ID |
| rows | config | no | - | The row list of fake data output per degree of parallelism; see title `Options rows Case`. |
| row.num | int | no | 5 | The total number of rows generated per degree of parallelism |
| split.num | int | no | 1 | The number of splits generated by the enumerator for each degree of parallelism |
@@ -523,6 +525,33 @@ source {
}
+```
+
+### Auto-increment primary key Example
+
+```hocon
+
+source {
+ # This is an example source plugin **only for testing and demonstrating the feature source plugin**
+ FakeSource {
+ plugin_output = "fake"
+ auto.increment.enabled = true
+ auto.increment.start = 1000
+ row.num = 50000
+ schema = {
+ fields {
+ id = "int"
+ name = "string"
+ age = "int"
+ }
+ primaryKey {
+ name = "pk"
+ columnNames = [id]
+ }
+ }
+ }
+}
+
```
## Changelog
diff --git a/docs/zh/connector-v2/source/FakeSource.md b/docs/zh/connector-v2/source/FakeSource.md
index 41509358ab..028d1156eb 100644
--- a/docs/zh/connector-v2/source/FakeSource.md
+++ b/docs/zh/connector-v2/source/FakeSource.md
@@ -25,49 +25,50 @@ FakeSource is a virtual data source that, based on the user-defined schema data struc
## Source Options
-| Name | Type | Required | Default | Description |
+| Name | Type | Required | Default | Description |
|---------------------------|---------|------|---------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
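-| tables_configs | list | no | - | Define multiple FakeSources; each item can contain the complete FakeSource configuration description |
-| schema | config | yes | - | Define schema information |
-| rows | config | no | - | List of fake data rows output per degree of parallelism; see the section `Options rows Case` |
-| row.num | int | no | 5 | Total number of rows generated per degree of parallelism |
-| split.num | int | no | 1 | Number of splits generated by the enumerator for each degree of parallelism |
-| split.read-interval | long | no | 1 | Interval (in milliseconds) between the reader's reads of two splits |
-| map.size | int | no | 5 | Size of the `map` type generated by the connector |
-| array.size | int | no | 5 | Size of the `array` type generated by the connector |
-| bytes.length | int | no | 5 | Length of the `bytes` type generated by the connector |
-| string.length | int | no | 5 | Length of the `string` type generated by the connector |
-| string.fake.mode | string | no | range | Fake mode for generating string data; supports `range` and `template`, defaults to `range`; if set to `template`, the user must also configure the `string.template` option |
-| string.template | list | no | - | Template list for the string type generated by the connector; if this option is configured, the connector randomly selects an item from the template list |
-| tinyint.fake.mode | string | no | range | Fake mode for generating tinyint data; supports `range` and `template`, defaults to `range`; if set to `template`, the user must also configure the `tinyint.template` option |
-| tinyint.min | tinyint | no | 0 | Minimum value of tinyint data generated by the connector |
-| tinyint.max | tinyint | no | 127 | Maximum value of tinyint data generated by the connector |
-| tinyint.template | list | no | - | Template list for the tinyint type generated by the connector; if this option is configured, the connector randomly selects an item from the template list |
-| smallint.fake.mode | string | no | range | Fake mode for generating smallint data; supports `range` and `template`, defaults to `range`; if set to `template`, the user must also configure the `smallint.template` option |
-| smallint.min | smallint | no | 0 | Minimum value of smallint data generated by the connector |
-| smallint.max | smallint | no | 32767 | Maximum value of smallint data generated by the connector |
-| smallint.template | list | no | - | Template list for the smallint type generated by the connector; if this option is configured, the connector randomly selects an item from the template list |
-| int.fake.template | string | no | range | Fake mode for generating int data; supports `range` and `template`, defaults to `range`; if set to `template`, the user must also configure the `int.template` option |
-| int.min | smallint | no | 0 | Minimum value of int data generated by the connector |
-| int.max | smallint | no | 0x7fffffff | Maximum value of int data generated by the connector |
-| int.template | list | no | - | Template list for the int type generated by the connector; if this option is configured, the connector randomly selects an item from the template list |
-| bigint.fake.mode | string | no | range | Fake mode for generating bigint data; supports `range` and `template`, defaults to `range`; if set to `template`, the user must also configure the `bigint.template` option |
-| bigint.min | bigint | no | 0 | Minimum value of bigint data generated by the connector |
-| bigint.max | bigint | no | 0x7fffffffffffffff | Maximum value of bigint data generated by the connector |
-| bigint.template | list | no | - | Template list for the bigint type generated by the connector; if this option is configured, the connector randomly selects an item from the template list |
-| float.fake.mode | string | no | range | Fake mode for generating float data; supports `range` and `template`, defaults to `range`; if set to `template`, the user must also configure the `float.template` option |
-| float.min | float | no | 0 | Minimum value of float data generated by the connector |
-| float.max | float | no | 0x1.fffffeP+127 | Maximum value of float data generated by the connector |
-| float.template | list | no | - | Template list for the float type generated by the connector; if this option is configured, the connector randomly selects an item from the template list |
-| double.fake.mode | string | no | range | Fake mode for generating double data; supports `range` and `template`, defaults to `range`; if set to `template`, the user must also configure the `double.template` option |
-| double.min | double | no | 0 | Minimum value of double data generated by the connector |
-| double.max | double | no | 0x1.fffffffffffffP+1023 | Maximum value of double data generated by the connector |
-| double.template | list | no | - | Template list for the double type generated by the connector; if this option is configured, the connector randomly selects an item from the template list |
-| vector.dimension | int | no | 4 | Dimension of the generated vector, excluding binary vectors |
-| binary.vector.dimension | int | no | 8 | Dimension of the generated binary vector |
-| vector.float.min | float | no | 0 | Minimum value of float data in vectors generated by the connector |
-| vector.float.max | float | no | 0x1.fffffeP+127 | Maximum value of float data in vectors generated by the connector |
-| common-options | | no | - | Common parameters of source plugins; see [Source Common Options](../source-common-options.md) for details |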
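+| tables_configs | list | no | - | Define multiple FakeSources; each item can contain the complete FakeSource configuration description |
+| schema | config | yes | - | Define schema information |
+| auto.increment.enabled | boolean | no | false | Enable auto-increment ID generation |
+| auto.increment.start | long | no | 1 | Starting value of the auto-increment ID |
+| row.num | int | no | 5 | Total number of rows generated per degree of parallelism |
+| split.num | int | no | 1 | Number of splits generated by the enumerator for each degree of parallelism |
+| split.read-interval | long | no | 1 | Interval (in milliseconds) between the reader's reads of two splits |
+| map.size | int | no | 5 | Size of the `map` type generated by the connector |
+| array.size | int | no | 5 | Size of the `array` type generated by the connector |
+| bytes.length | int | no | 5 | Length of the `bytes` type generated by the connector |
+| string.length | int | no | 5 | Length of the `string` type generated by the connector |
+| string.fake.mode | string | no | range | Fake mode for generating string data; supports `range` and `template`, defaults to `range`; if set to `template`, the user must also configure the `string.template` option |
+| string.template | list | no | - | Template list for the string type generated by the connector; if this option is configured, the connector randomly selects an item from the template list |
+| tinyint.fake.mode | string | no | range | Fake mode for generating tinyint data; supports `range` and `template`, defaults to `range`; if set to `template`, the user must also configure the `tinyint.template` option |
+| tinyint.min | tinyint | no | 0 | Minimum value of tinyint data generated by the connector |
+| tinyint.max | tinyint | no | 127 | Maximum value of tinyint data generated by the connector |
+| tinyint.template | list | no | - | Template list for the tinyint type generated by the connector; if this option is configured, the connector randomly selects an item from the template list |
+| smallint.fake.mode | string | no | range | Fake mode for generating smallint data; supports `range` and `template`, defaults to `range`; if set to `template`, the user must also configure the `smallint.template` option |
+| smallint.min | smallint | no | 0 | Minimum value of smallint data generated by the connector |
+| smallint.max | smallint | no | 32767 | Maximum value of smallint data generated by the connector |
+| smallint.template | list | no | - | Template list for the smallint type generated by the connector; if this option is configured, the connector randomly selects an item from the template list |
+| int.fake.template | string | no | range | Fake mode for generating int data; supports `range` and `template`, defaults to `range`; if set to `template`, the user must also configure the `int.template` option |
+| int.min | smallint | no | 0 | Minimum value of int data generated by the connector |
+| int.max | smallint | no | 0x7fffffff | Maximum value of int data generated by the connector |
+| int.template | list | no | - | Template list for the int type generated by the connector; if this option is configured, the connector randomly selects an item from the template list |
+| bigint.fake.mode | string | no | range | Fake mode for generating bigint data; supports `range` and `template`, defaults to `range`; if set to `template`, the user must also configure the `bigint.template` option |
+| bigint.min | bigint | no | 0 | Minimum value of bigint data generated by the connector |
+| bigint.max | bigint | no | 0x7fffffffffffffff | Maximum value of bigint data generated by the connector |
+| bigint.template | list | no | - | Template list for the bigint type generated by the connector; if this option is configured, the connector randomly selects an item from the template list |
+| float.fake.mode | string | no | range | Fake mode for generating float data; supports `range` and `template`, defaults to `range`; if set to `template`, the user must also configure the `float.template` option |
+| float.min | float | no | 0 | Minimum value of float data generated by the connector |
+| float.max | float | no | 0x1.fffffeP+127 | Maximum value of float data generated by the connector |
+| float.template | list | no | - | Template list for the float type generated by the connector; if this option is configured, the connector randomly selects an item from the template list |
+| double.fake.mode | string | no | range | Fake mode for generating double data; supports `range` and `template`, defaults to `range`; if set to `template`, the user must also configure the `double.template` option |
+| double.min | double | no | 0 | Minimum value of double data generated by the connector |
+| double.max | double | no | 0x1.fffffffffffffP+1023 | Maximum value of double data generated by the connector |
+| double.template | list | no | - | Template list for the double type generated by the connector; if this option is configured, the connector randomly selects an item from the template list |
+| vector.dimension | int | no | 4 | Dimension of the generated vector, excluding binary vectors |
+| binary.vector.dimension | int | no | 8 | Dimension of the generated binary vector |
+| vector.float.min | float | no | 0 | Minimum value of float data in vectors generated by the connector |
+| vector.float.max | float | no | 0x1.fffffeP+127 | Maximum value of float data in vectors generated by the connector |
+| common-options | | no | - | Common parameters of source plugins; see [Source Common Options](../source-common-options.md) for details |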
## Task Example
@@ -520,6 +521,33 @@ source {
}
```
+### Auto-increment Primary Key Example
+
+```hocon
+
+source {
+ # This is an example source plugin **only for testing and demonstrating the feature source plugin**
+ FakeSource {
+ plugin_output = "fake"
+ auto.increment.enabled = true
+ auto.increment.start = 1000
+ row.num = 50000
+ schema = {
+ fields {
+ id = "int"
+ name = "string"
+ age = "int"
+ }
+ primaryKey {
+ name = "pk"
+ columnNames = [id]
+ }
+ }
+ }
+}
+
+```
+
## Changelog
<ChangeLog />
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeConfig.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeConfig.java
index 619cedeeba..209d81fd60 100644
--- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeConfig.java
+++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeConfig.java
@@ -34,7 +34,10 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import static org.apache.seatunnel.api.options.EnvCommonOptions.PARALLELISM;
import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeSourceOptions.ARRAY_SIZE;
+import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeSourceOptions.AUTO_INCREMENT_ENABLED;
+import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeSourceOptions.AUTO_INCREMENT_START;
import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeSourceOptions.BIGINT_FAKE_MODE;
import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeSourceOptions.BIGINT_MAX;
import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeSourceOptions.BIGINT_MIN;
@@ -82,6 +85,9 @@ import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeSourceOp
@Builder
@Getter
public class FakeConfig implements Serializable {
+
+ @Builder.Default private int parallelism = PARALLELISM.defaultValue();
+
@Builder.Default private int rowNum = ROW_NUM.defaultValue();
@Builder.Default private int splitNum = SPLIT_NUM.defaultValue();
@@ -148,6 +154,10 @@ public class FakeConfig implements Serializable {
@Builder.Default
private FakeSourceOptions.FakeMode doubleFakeMode = DOUBLE_FAKE_MODE.defaultValue();
+ @Builder.Default private Boolean autoIncrementEnabled = AUTO_INCREMENT_ENABLED.defaultValue();
+
+ @Builder.Default private Long autoIncrementStart = AUTO_INCREMENT_START.defaultValue();
+
private List<String> stringTemplate;
private List<Integer> tinyintTemplate;
private List<Integer> smallintTemplate;
@@ -170,6 +180,7 @@ public class FakeConfig implements Serializable {
public static FakeConfig buildWithConfig(ReadonlyConfig readonlyConfig) {
FakeConfigBuilder builder = FakeConfig.builder();
+ readonlyConfig.getOptional(PARALLELISM).ifPresent(builder::parallelism);
builder.rowNum(readonlyConfig.get(ROW_NUM));
builder.splitNum(readonlyConfig.get(SPLIT_NUM));
builder.splitReadInterval(readonlyConfig.get(SPLIT_READ_INTERVAL));
@@ -204,6 +215,8 @@ public class FakeConfig implements Serializable {
readonlyConfig.getOptional(TIME_HOUR_TEMPLATE).ifPresent(builder::timeHourTemplate);
readonlyConfig.getOptional(TIME_MINUTE_TEMPLATE).ifPresent(builder::timeMinuteTemplate);
readonlyConfig.getOptional(TIME_SECOND_TEMPLATE).ifPresent(builder::timeSecondTemplate);
+ readonlyConfig.getOptional(AUTO_INCREMENT_ENABLED).ifPresent(builder::autoIncrementEnabled);
+ readonlyConfig.getOptional(AUTO_INCREMENT_START).ifPresent(builder::autoIncrementStart);
readonlyConfig
.getOptional(TINYINT_MIN)
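`FakeConfig.buildWithConfig` starts every field at its `@Builder.Default` value (for the new options, `AUTO_INCREMENT_ENABLED.defaultValue()` is `false` and `AUTO_INCREMENT_START.defaultValue()` is `1L`) and only overrides a field when `readonlyConfig.getOptional(...)` finds the key. The sketch below reproduces that "default, then override if present" pattern in plain Java. `ConfigDefaultsSketch`, `getOptional` and `fromMap` are hypothetical stand-ins for `FakeConfig` and `ReadonlyConfig`, with a `Map` playing the role of the parsed config; they are not SeaTunnel APIs.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for FakeConfig; a Map plays the role of ReadonlyConfig.
class ConfigDefaultsSketch {
    // Defaults mirroring AUTO_INCREMENT_ENABLED / AUTO_INCREMENT_START in FakeSourceOptions.
    static final boolean AUTO_INCREMENT_ENABLED_DEFAULT = false;
    static final long AUTO_INCREMENT_START_DEFAULT = 1L;

    final boolean autoIncrementEnabled;
    final long autoIncrementStart;

    ConfigDefaultsSketch(boolean autoIncrementEnabled, long autoIncrementStart) {
        this.autoIncrementEnabled = autoIncrementEnabled;
        this.autoIncrementStart = autoIncrementStart;
    }

    // Returns the value only if the key is present, in the spirit of getOptional(...).
    static <T> Optional<T> getOptional(Map<String, Object> conf, String key, Class<T> type) {
        return Optional.ofNullable(conf.get(key)).map(type::cast);
    }

    // Keeps each default unless the key is explicitly configured.
    static ConfigDefaultsSketch fromMap(Map<String, Object> conf) {
        boolean enabled =
                getOptional(conf, "auto.increment.enabled", Boolean.class)
                        .orElse(AUTO_INCREMENT_ENABLED_DEFAULT);
        long start =
                getOptional(conf, "auto.increment.start", Long.class)
                        .orElse(AUTO_INCREMENT_START_DEFAULT);
        return new ConfigDefaultsSketch(enabled, start);
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        conf.put("auto.increment.enabled", true);
        ConfigDefaultsSketch c = fromMap(conf);
        // auto.increment.start was not configured, so it keeps its default
        System.out.println(c.autoIncrementEnabled + " " + c.autoIncrementStart); // prints "true 1"
    }
}
```

This is why, with only `auto.increment.enabled = true` set, the sequence still begins at 1.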
diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeSourceOptions.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeSourceOptions.java
index 8b6c514b95..6de04050cc 100644
--- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeSourceOptions.java
+++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeSourceOptions.java
@@ -299,6 +299,18 @@ public class FakeSourceOptions {
.defaultValue(FakeMode.RANGE)
.withDescription("The fake mode of generating double data");
+ public static final Option<Boolean> AUTO_INCREMENT_ENABLED =
+ Options.key("auto.increment.enabled")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Enable auto increment ID generation");
+
+ public static final Option<Long> AUTO_INCREMENT_START =
+ Options.key("auto.increment.start")
+ .longType()
+ .defaultValue(1L)
+ .withDescription("Starting value for auto increment ID");
+
public enum FakeMode {
RANGE,
TEMPLATE;
diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java
index 382795c5bb..859d52be60 100644
--- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java
+++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java
@@ -57,8 +57,6 @@ import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
-import static org.apache.seatunnel.api.table.type.SqlType.TIME;
-
public class FakeDataGenerator {
private static final String CURRENT_DATE = "CURRENT_DATE";
private static final String CURRENT_TIME = "CURRENT_TIME";
@@ -72,7 +70,7 @@ public class FakeDataGenerator {
private final FakeDataRandomUtils fakeDataRandomUtils;
private String tableId;
- public FakeDataGenerator(FakeConfig fakeConfig) {
+ public FakeDataGenerator(FakeConfig fakeConfig, String jobId) {
this.catalogTable = fakeConfig.getCatalogTable();
this.tableId = catalogTable.getTableId().toTablePath().toString();
this.fakeConfig = fakeConfig;
@@ -80,7 +78,7 @@ public class FakeDataGenerator {
fakeConfig.getFakeRows() == null
? null
: new JsonDeserializationSchema(catalogTable, false,
false);
- this.fakeDataRandomUtils = new FakeDataRandomUtils(fakeConfig);
+ this.fakeDataRandomUtils = new FakeDataRandomUtils(fakeConfig, jobId);
}
private SeaTunnelRow convertRow(FakeConfig.RowData rowData) {
diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
index dd312bed10..ba470dbf33 100644
--- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
+++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
@@ -32,10 +32,13 @@ import org.apache.seatunnel.connectors.seatunnel.fake.config.FakeConfig;
import org.apache.seatunnel.connectors.seatunnel.fake.config.MultipleTableFakeSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.fake.state.FakeSourceState;
+import lombok.extern.slf4j.Slf4j;
+
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
+@Slf4j
public class FakeSource
implements SeaTunnelSource<SeaTunnelRow, FakeSourceSplit,
FakeSourceState>,
SupportParallelism,
@@ -82,7 +85,8 @@ public class FakeSource
@Override
public SourceReader<SeaTunnelRow, FakeSourceSplit> createReader(
SourceReader.Context readerContext) {
- return new FakeSourceReader(readerContext, multipleTableFakeSourceConfig);
+ return new FakeSourceReader(
+ readerContext, multipleTableFakeSourceConfig, jobContext.getJobId());
}
@Override
diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
index e3309c6be6..c2861c7c7d 100644
--- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
+++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
@@ -48,8 +48,9 @@ public class FakeSourceReader implements SourceReader<SeaTunnelRow, FakeSourceSp
private volatile long latestTimestamp = 0;
public FakeSourceReader(
- SourceReader.Context context,
- MultipleTableFakeSourceConfig multipleTableFakeSourceConfig) {
+ Context context,
+ MultipleTableFakeSourceConfig multipleTableFakeSourceConfig,
+ String jobId) {
this.context = context;
this.multipleTableFakeSourceConfig = multipleTableFakeSourceConfig;
this.fakeDataGeneratorMap =
@@ -62,7 +63,7 @@ public class FakeSourceReader implements SourceReader<SeaTunnelRow, FakeSourceSp
.getTableId()
.toTablePath()
.toString(),
- FakeDataGenerator::new));
+ fakeConfig -> new FakeDataGenerator(fakeConfig, jobId)));
this.minSplitReadInterval =
multipleTableFakeSourceConfig.getFakeConfigs().stream()
.map(FakeConfig::getSplitReadInterval)
diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/utils/AutoIncrementIdGenerator.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/utils/AutoIncrementIdGenerator.java
new file mode 100644
index 0000000000..e773363bcf
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/utils/AutoIncrementIdGenerator.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.fake.utils;
+
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class AutoIncrementIdGenerator implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final AtomicLong id;
+
+ public AutoIncrementIdGenerator(long start) {
+ this.id = new AtomicLong(start);
+ }
+
+ public Long getNextId() {
+ return id.getAndIncrement();
+ }
+}
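`AutoIncrementIdGenerator` wraps an `AtomicLong`, and `getNextId()` calls `getAndIncrement()`, which atomically returns the current value and then adds one. As long as several readers hold the same generator instance, every call therefore yields a distinct id, and together the ids form a contiguous run starting at `start`. The sketch below checks that property with four threads drawing 50 ids each from one shared counter. `SharedCounterSketch`, `Counter` and `drawConcurrently` are hypothetical names for a stand-alone harness, not part of the commit.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical harness exercising the same AtomicLong#getAndIncrement idea
// as AutoIncrementIdGenerator, without any SeaTunnel dependencies.
class SharedCounterSketch {

    // Mirrors AutoIncrementIdGenerator: return the current value, then add one.
    static final class Counter {
        private final AtomicLong id;

        Counter(long start) {
            this.id = new AtomicLong(start);
        }

        long next() {
            return id.getAndIncrement();
        }
    }

    // Each of `threads` workers draws `perThread` ids from one shared Counter.
    static Set<Long> drawConcurrently(long start, int threads, int perThread) {
        Counter counter = new Counter(start);
        Set<Long> seen = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (int t = 0; t < threads; t++) {
                futures.add(
                        pool.submit(
                                () -> {
                                    for (int i = 0; i < perThread; i++) {
                                        seen.add(counter.next());
                                    }
                                }));
            }
            for (Future<?> f : futures) {
                f.get(); // wait for completion and surface any worker failure
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return seen;
    }

    public static void main(String[] args) {
        Set<Long> ids = drawConcurrently(100, 4, 50);
        // prints "200 100 299"
        System.out.println(ids.size() + " " + Collections.min(ids) + " " + Collections.max(ids));
    }
}
```

Thread scheduling decides which worker receives which id, so only the set of ids is deterministic, not how the ids are distributed across readers.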
diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/utils/FakeDataRandomUtils.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/utils/FakeDataRandomUtils.java
index c4a038ff1a..5e9f4e809c 100644
--- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/utils/FakeDataRandomUtils.java
+++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/utils/FakeDataRandomUtils.java
@@ -38,9 +38,11 @@ import java.util.Map;
public class FakeDataRandomUtils {
private final FakeConfig fakeConfig;
+ private final String jobId;
- public FakeDataRandomUtils(FakeConfig fakeConfig) {
+ public FakeDataRandomUtils(FakeConfig fakeConfig, String jobId) {
this.fakeConfig = fakeConfig;
+ this.jobId = jobId;
}
private static <T> T randomFromList(List<T> list) {
@@ -93,6 +95,22 @@ public class FakeDataRandomUtils {
}
public Integer randomInt(Column column) {
+ if (fakeConfig.getAutoIncrementEnabled()
+ && IdGeneratorUtils.isPrimaryColumn(fakeConfig, column.getName())) {
+ if (fakeConfig.getAutoIncrementStart()
+ + ((long) fakeConfig.getParallelism() * fakeConfig.getRowNum())
+ > Integer.MAX_VALUE) {
+ throw new IllegalArgumentException(
+ "The auto increment start value is too large, please check your configuration.");
+ }
+ return IdGeneratorUtils.getIdGenerator(jobId, fakeConfig, column.getName())
+ .orElseThrow(
+ () ->
+ new IllegalArgumentException(
+ "Auto increment is enabled, but no id generator found."))
+ .getNextId()
+ .intValue();
+ }
List<Integer> intTemplate = fakeConfig.getIntTemplate();
if (!CollectionUtils.isEmpty(intTemplate)) {
return randomFromList(intTemplate);
@@ -101,6 +119,15 @@ public class FakeDataRandomUtils {
}
public Long randomBigint(Column column) {
+ if (fakeConfig.getAutoIncrementEnabled()
+ && IdGeneratorUtils.isPrimaryColumn(fakeConfig, column.getName())) {
+ return IdGeneratorUtils.getIdGenerator(jobId, fakeConfig, column.getName())
+ .orElseThrow(
+ () ->
+ new IllegalArgumentException(
+ "Auto increment is enabled, but no id generator found."))
+ .getNextId();
+ }
List<Long> bigTemplate = fakeConfig.getBigTemplate();
if (!CollectionUtils.isEmpty(bigTemplate)) {
return randomFromList(bigTemplate);
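For `int` primary key columns, `randomInt` rejects the configuration when `start + parallelism * rowNum` exceeds `Integer.MAX_VALUE`, widening to `long` before multiplying so the product itself cannot overflow. Assuming each of `parallelism` readers draws `rowNum` ids from one shared counter, the largest id handed out is `start + parallelism * rowNum - 1`; the committed condition omits the `- 1`, so it is one value stricter than strictly necessary. The `bigint` branch has no equivalent check and returns the `Long` from `getNextId()` directly. The sketch below computes the bound; `IntIdRangeSketch`, `lastId` and `fitsInInt` are hypothetical names.

```java
// Hypothetical helper mirroring the range check in FakeDataRandomUtils#randomInt.
class IntIdRangeSketch {

    // Largest id handed out when `parallelism` readers each draw `rowNum` ids from one counter.
    static long lastId(long start, int parallelism, int rowNum) {
        // Widen to long before multiplying; multiplyExact/addExact throw
        // ArithmeticException instead of silently wrapping around.
        long total = Math.multiplyExact((long) parallelism, (long) rowNum);
        return Math.addExact(start, total) - 1;
    }

    // True if every generated id fits in a Java int, i.e. can back an INT column.
    static boolean fitsInInt(long start, int parallelism, int rowNum) {
        return start >= Integer.MIN_VALUE
                && lastId(start, parallelism, rowNum) <= Integer.MAX_VALUE;
    }

    public static void main(String[] args) {
        System.out.println(lastId(100, 4, 50)); // prints 299
        System.out.println(fitsInInt(1000, 1, 50000)); // prints true
        System.out.println(fitsInInt(Integer.MAX_VALUE, 1, 2)); // prints false
    }
}
```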
diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/utils/IdGeneratorUtils.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/utils/IdGeneratorUtils.java
new file mode 100644
index 0000000000..ee4e5430bc
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/utils/IdGeneratorUtils.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.fake.utils;
+
+import org.apache.seatunnel.shade.com.google.common.cache.Cache;
+import org.apache.seatunnel.shade.com.google.common.cache.CacheBuilder;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.connectors.seatunnel.fake.config.FakeConfig;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+public class IdGeneratorUtils {
+
+ private static final Cache<String, AutoIncrementIdGenerator> idGenerators =
+ CacheBuilder.newBuilder()
+ .maximumSize(1000)
+ .expireAfterWrite(30, TimeUnit.MINUTES)
+ .build();
+
+ public static synchronized Optional<AutoIncrementIdGenerator> getIdGenerator(
+ String jobId, FakeConfig fakeConfig, String columnName) {
+ CatalogTable catalogTable = fakeConfig.getCatalogTable();
+ String tableName = catalogTable.getTableId().getTableName();
+ String key = String.format("%s:%s_%s", jobId, tableName, columnName);
+ AutoIncrementIdGenerator idGenerator = null;
+ try {
+ idGenerator =
+ idGenerators.get(
+ key,
+ () -> {
+ if (isPrimaryColumn(fakeConfig, columnName)) {
+ return new AutoIncrementIdGenerator(
+ fakeConfig.getAutoIncrementStart());
+ } else {
+ return null;
+ }
+ });
+ } catch (ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ return Optional.ofNullable(idGenerator);
+ }
+
+ public static boolean isPrimaryColumn(FakeConfig fakeConfig, String columnName) {
+ PrimaryKey primaryKey = fakeConfig.getCatalogTable().getTableSchema().getPrimaryKey();
+ if (primaryKey == null) {
+ return false;
+ }
+ List<String> primaryColumns = primaryKey.getColumnNames();
+ return primaryColumns != null && primaryColumns.contains(columnName);
+ }
+}
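`IdGeneratorUtils` keeps one generator per `jobId:table_column` key in a static Guava cache, so readers in the same JVM that share a job id also share a counter. Two properties of that cache are worth keeping in mind. First, Guava documents that the loader passed to `Cache.get(key, Callable)` must not return `null`, so the `return null` branch would surface as an exception rather than an empty `Optional` (callers check `isPrimaryColumn` first, so it is not normally reached). Second, `expireAfterWrite(30, TimeUnit.MINUTES)` and `maximumSize(1000)` can evict an entry while a job is still generating rows, after which the next call would build a fresh generator at `autoIncrementStart` again. The sketch below shows one possible alternative under those assumptions: a `ConcurrentHashMap` whose `computeIfAbsent` creates each counter once and never expires it, paired with an explicit `release`. `IdRegistrySketch`, `nextId` and `release` are hypothetical and are not what the commit implements.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical alternative registry, not the committed IdGeneratorUtils.
class IdRegistrySketch {
    // One counter per key; entries never expire on their own.
    private static final Map<String, AtomicLong> COUNTERS = new ConcurrentHashMap<>();

    // Same key shape as the commit: "<jobId>:<table>_<column>".
    static String key(String jobId, String table, String column) {
        return String.format("%s:%s_%s", jobId, table, column);
    }

    // computeIfAbsent creates the counter at most once per key, so the
    // sequence cannot restart while the job is still running.
    static long nextId(String jobId, String table, String column, long start) {
        return COUNTERS.computeIfAbsent(key(jobId, table, column), k -> new AtomicLong(start))
                .getAndIncrement();
    }

    // Because nothing expires automatically, the caller frees a job's counters explicitly.
    static void release(String jobId) {
        COUNTERS.keySet().removeIf(k -> k.startsWith(jobId + ":"));
    }

    public static void main(String[] args) {
        System.out.println(nextId("job-1", "t", "id", 100)); // prints 100
        System.out.println(nextId("job-1", "t", "id", 100)); // prints 101
        System.out.println(nextId("job-2", "t", "id", 100)); // prints 100 (separate job)
        release("job-1");
        System.out.println(nextId("job-1", "t", "id", 100)); // prints 100 (counter recreated)
    }
}
```

Like the committed cache, this map is a static field, so sharing only happens within one JVM and class loader; readers placed in different worker processes would each start their own sequence at `start`.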
diff --git a/seatunnel-connectors-v2/connector-fake/src/test/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGeneratorTest.java b/seatunnel-connectors-v2/connector-fake/src/test/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGeneratorTest.java
index 81aae384bb..7971eab20f 100644
--- a/seatunnel-connectors-v2/connector-fake/src/test/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGeneratorTest.java
+++ b/seatunnel-connectors-v2/connector-fake/src/test/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGeneratorTest.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.options.EnvCommonOptions;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.BasicType;
@@ -45,9 +46,13 @@ import java.nio.file.Paths;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
public class FakeDataGeneratorTest {
@@ -59,7 +64,7 @@ public class FakeDataGeneratorTest {
SeaTunnelRowType seaTunnelRowType =
CatalogTableUtil.buildWithConfig(testConfig).getSeaTunnelRowType();
FakeConfig fakeConfig = FakeConfig.buildWithConfig(testConfig);
- FakeDataGenerator fakeDataGenerator = new FakeDataGenerator(fakeConfig);
+ FakeDataGenerator fakeDataGenerator = new FakeDataGenerator(fakeConfig, null);
List<SeaTunnelRow> seaTunnelRows =
fakeDataGenerator.generateFakedRows(fakeConfig.getRowNum());
Assertions.assertNotNull(seaTunnelRows);
@@ -114,7 +119,7 @@ public class FakeDataGeneratorTest {
ReadonlyConfig testConfig = getTestConfigFile(conf);
FakeConfig fakeConfig = FakeConfig.buildWithConfig(testConfig);
- FakeDataGenerator fakeDataGenerator = new FakeDataGenerator(fakeConfig);
+ FakeDataGenerator fakeDataGenerator = new FakeDataGenerator(fakeConfig, null);
List<SeaTunnelRow> seaTunnelRows =
fakeDataGenerator.generateFakedRows(fakeConfig.getRowNum());
Assertions.assertIterableEquals(expected, seaTunnelRows);
@@ -125,7 +130,7 @@ public class FakeDataGeneratorTest {
public void testVectorParse(String conf) throws FileNotFoundException,
URISyntaxException {
ReadonlyConfig testConfig = getTestConfigFile(conf);
FakeConfig fakeConfig = FakeConfig.buildWithConfig(testConfig);
- FakeDataGenerator fakeDataGenerator = new FakeDataGenerator(fakeConfig);
+ FakeDataGenerator fakeDataGenerator = new FakeDataGenerator(fakeConfig, null);
List<SeaTunnelRow> seaTunnelRows =
fakeDataGenerator.generateFakedRows(fakeConfig.getRowNum());
seaTunnelRows.forEach(
@@ -152,7 +157,7 @@ public class FakeDataGeneratorTest {
public void testColumnDataParse(String conf) throws FileNotFoundException,
URISyntaxException {
ReadonlyConfig testConfig = getTestConfigFile(conf);
FakeConfig fakeConfig = FakeConfig.buildWithConfig(testConfig);
- FakeDataGenerator fakeDataGenerator = new FakeDataGenerator(fakeConfig);
+ FakeDataGenerator fakeDataGenerator = new FakeDataGenerator(fakeConfig, null);
List<SeaTunnelRow> seaTunnelRows =
fakeDataGenerator.generateFakedRows(fakeConfig.getRowNum());
seaTunnelRows.forEach(
@@ -215,7 +220,7 @@ public class FakeDataGeneratorTest {
public void testDataParse(String conf) throws FileNotFoundException,
URISyntaxException {
ReadonlyConfig testConfig = getTestConfigFile(conf);
FakeConfig fakeConfig = FakeConfig.buildWithConfig(testConfig);
- FakeDataGenerator fakeDataGenerator = new FakeDataGenerator(fakeConfig);
+ FakeDataGenerator fakeDataGenerator = new FakeDataGenerator(fakeConfig, null);
List<SeaTunnelRow> seaTunnelRows =
fakeDataGenerator.generateFakedRows(fakeConfig.getRowNum());
seaTunnelRows.forEach(
@@ -229,6 +234,41 @@ public class FakeDataGeneratorTest {
});
}
+ @ParameterizedTest
+ @ValueSource(strings = {"fake-auto-increment-id.conf", "fake-auto-increment-id.conf"})
+ public void testAutoIncrementId(String conf) throws FileNotFoundException, URISyntaxException {
+ ReadonlyConfig testConfig = getTestConfigFile(conf);
+ int parallelism = testConfig.getOptional(EnvCommonOptions.PARALLELISM).orElse(1);
+ FakeConfig fakeConfig = FakeConfig.buildWithConfig(testConfig);
+ List<CompletableFuture<List<SeaTunnelRow>>> futures = new ArrayList<>();
+ String jobId = UUID.randomUUID().toString();
+ for (int i = 0; i < parallelism; i++) {
+ CompletableFuture<List<SeaTunnelRow>> uCompletableFuture =
+ CompletableFuture.supplyAsync(
+ () -> {
+ FakeDataGenerator fakeDataGenerator =
+ new FakeDataGenerator(fakeConfig, jobId);
+ return fakeDataGenerator.generateFakedRows(fakeConfig.getRowNum());
+ });
+ futures.add(uCompletableFuture);
+ }
+ CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
+ List<SeaTunnelRow> seaTunnelRows =
+ futures.stream()
+ .map(CompletableFuture::join)
+ .flatMap(List::stream)
+ .collect(Collectors.toList());
+ List<Integer> ids =
+ seaTunnelRows.stream()
+ .map(seaTunnelRow -> (int) seaTunnelRow.getField(0))
+ .distinct()
+ .sorted(Integer::compareTo)
+ .collect(Collectors.toList());
+ Assertions.assertEquals(200, ids.size());
+ ids.stream().min(Integer::compareTo).ifPresent(min -> Assertions.assertEquals(100, min));
+ ids.stream().max(Integer::compareTo).ifPresent(max -> Assertions.assertEquals(299, max));
+ }
+
private ReadonlyConfig getTestConfigFile(String configFile)
throws FileNotFoundException, URISyntaxException {
if (!configFile.startsWith("/")) {
diff --git a/seatunnel-connectors-v2/connector-fake/src/test/resources/fake-auto-increment-id.conf b/seatunnel-connectors-v2/connector-fake/src/test/resources/fake-auto-increment-id.conf
new file mode 100644
index 0000000000..56f83e02e9
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-fake/src/test/resources/fake-auto-increment-id.conf
@@ -0,0 +1,35 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+FakeSource {
+ plugin_output = "fake"
+ auto.increment.enabled = true
+ auto.increment.start = 100
+ parallelism = 4
+ row.num = 50
+ schema = {
+ fields {
+ id = "int"
+ name = "string"
+ age = "int"
+ }
+ primaryKey {
+ name = "pk"
+ columnNames = [id]
+ }
+ }
+}
\ No newline at end of file
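The test configuration sets `auto.increment.start` $s = 100$, `parallelism` $p = 4$ and `row.num` $n = 50$. Assuming all four generators in `testAutoIncrementId` draw from the single counter registered for the shared `jobId`, the ids form one contiguous range, which is what the assertions (200 distinct ids, minimum 100, maximum 299) encode:

```latex
\text{ids} = \{\, s,\ s+1,\ \dots,\ s + p\,n - 1 \,\}, \qquad
|\text{ids}| = p\,n = 4 \cdot 50 = 200, \qquad
\max \text{ids} = 100 + 200 - 1 = 299
```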
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarBatchIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarBatchIT.java
index 466eff946e..374571d0c4 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarBatchIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarBatchIT.java
@@ -151,7 +151,7 @@ public class PulsarBatchIT extends TestSuiteBase implements TestResource {
ConfigFactory.parseFile(new File(Paths.get(resource.toURI()).toString()));
FakeConfig fakeConfig = FakeConfig.buildWithConfig(ReadonlyConfig.fromConfig(config));
- FakeDataGenerator fakeDataGenerator = new FakeDataGenerator(fakeConfig);
+ FakeDataGenerator fakeDataGenerator = new FakeDataGenerator(fakeConfig, null);
List<SeaTunnelRow> seaTunnelRows = fakeDataGenerator.generateFakedRows(100);
JsonSerializationSchema jsonSerializationSchema =
new JsonSerializationSchema(SEATUNNEL_ROW_TYPE);