This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 3a16b4a4b5 [Feature][Connectors-v2] Support auto-increment id for 
FakeSource (#9505)
3a16b4a4b5 is described below

commit 3a16b4a4b56786445b16e179c2c1a66b579912a3
Author: zhangdonghao <[email protected]>
AuthorDate: Thu Jul 3 11:19:21 2025 +0800

    [Feature][Connectors-v2] Support auto-increment id for FakeSource (#9505)
---
 docs/en/connector-v2/source/FakeSource.md          |  31 +++++-
 docs/zh/connector-v2/source/FakeSource.md          | 112 +++++++++++++--------
 .../seatunnel/fake/config/FakeConfig.java          |  13 +++
 .../seatunnel/fake/config/FakeSourceOptions.java   |  12 +++
 .../seatunnel/fake/source/FakeDataGenerator.java   |   6 +-
 .../seatunnel/fake/source/FakeSource.java          |   6 +-
 .../seatunnel/fake/source/FakeSourceReader.java    |   7 +-
 .../fake/utils/AutoIncrementIdGenerator.java       |  36 +++++++
 .../seatunnel/fake/utils/FakeDataRandomUtils.java  |  29 +++++-
 .../seatunnel/fake/utils/IdGeneratorUtils.java     |  72 +++++++++++++
 .../fake/source/FakeDataGeneratorTest.java         |  50 ++++++++-
 .../src/test/resources/fake-auto-increment-id.conf |  35 +++++++
 .../e2e/connector/pulsar/PulsarBatchIT.java        |   2 +-
 13 files changed, 353 insertions(+), 58 deletions(-)

diff --git a/docs/en/connector-v2/source/FakeSource.md 
b/docs/en/connector-v2/source/FakeSource.md
index 016f9dd2bc..f9ce1d9ce8 100644
--- a/docs/en/connector-v2/source/FakeSource.md
+++ b/docs/en/connector-v2/source/FakeSource.md
@@ -26,10 +26,12 @@ just for some test cases such as type conversion or 
connector new feature testin
 
 ## Source Options
 
-|          Name           |   Type   | Required |         Default         |    
                                                                                
  Description                                                                   
                   |
+| Name                    |   Type   | Required | Default                 |    
                                                                                
  Description                                                                   
                   |
 
|-------------------------|----------|----------|-------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
 | tables_configs          | list     | no       | -                       | 
Define Multiple FakeSource, each item can contains the whole fake source config 
description below                                                               
                      |
 | schema                  | config   | yes      | -                       | 
Define Schema information                                                       
                                                                                
                      |
+| auto.increment.enabled  | boolean  | no       | false                   | 
Enable auto increment ID generation                                             
                                                                                
                                               |
+| auto.increment.start    | int      | no       |                         | 
Starting value for auto increment ID                                            
                                                                                
                                              |
 | rows                    | config   | no       | -                       | 
The row list of fake data output per degree of parallelism see title `Options 
rows Case`.                                                                     
                        |
 | row.num                 | int      | no       | 5                       | 
The total number of data generated per degree of parallelism                    
                                                                                
                      |
 | split.num               | int      | no       | 1                       | 
the number of splits generated by the enumerator for each degree of parallelism 
                                                                                
                      |
@@ -523,6 +525,33 @@ source {
 }
 
 
+```
+
+### Auto-increment primary key Example
+
+```hocon
+
+source {
+  # This is a example source plugin **only for test and demonstrate the 
feature source plugin**
+  FakeSource {
+    plugin_output = "fake"
+    auto.increment.enabled = true
+    auto.increment.start = 1000
+    row.num = 50000
+    schema = {
+      fields {
+        id = "int"
+        name = "string"
+        age = "int"
+      }
+      primaryKey {
+        name = "pk"
+        columnNames = [id]
+      }
+    }
+  }
+}
+
 ```
 
 ## Changelog
diff --git a/docs/zh/connector-v2/source/FakeSource.md 
b/docs/zh/connector-v2/source/FakeSource.md
index 41509358ab..028d1156eb 100644
--- a/docs/zh/connector-v2/source/FakeSource.md
+++ b/docs/zh/connector-v2/source/FakeSource.md
@@ -25,49 +25,50 @@ FakeSource 是一个虚拟数据源,它根据用户定义的 schema 数据结
 
 ## 数据源选项
 
-| 名称                      | 类型    | 必填 | 默认值  | 描述                             
                                                                                
                                                                                
    |
+| 名称                        | 类型       | 必填 | 默认值                    | 描述      
                                                                                
                                                                                
                        |
 
|---------------------------|---------|------|---------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| tables_configs            | list    | 否   | -       | 定义多个 
FakeSource,每个项可以包含完整的 FakeSource 配置描述                                           
                                                                                
              |
-| schema                    | config    | 是   | -       | 定义 Schema 信息         
                                                                                
                                                                                
            |
-| rows                      | config    | 否   | -       | 每个并行度输出的伪数据行列表,详见标题 
`Options rows Case`                                                             
                                                                               |
-| row.num                   | int    | 否   | 5       | 每个并行度生成的数据总行数           
                                                                                
                                                                                
|
-| split.num                 | int    | 否   | 1       | 枚举器为每个并行度生成的分片数量        
                                                                                
                                                                             |
-| split.read-interval       | long  | 否   | 1       | 读取器在两个分片读取之间的间隔时间(毫秒)    
                                                                                
                                                                       |
-| map.size                  | int    | 否   | 5       | 连接器生成的 `map` 类型的大小      
                                                                                
                                                                                
  |
-| array.size                | int    | 否   | 5       | 连接器生成的 `array` 类型的大小    
                                                                                
                                                                                
  |
-| bytes.length              | int    | 否   | 5       | 连接器生成的 `bytes` 类型的长度    
                                                                                
                                                                                
  |
-| string.length             | int    | 否   | 5       | 连接器生成的 `string` 类型的长度   
                                                                                
                                                                                
  |
-| string.fake.mode          | string  | 否   | range   | 生成字符串数据的伪数据模式,支持 
`range` 和 `template`,默认为 `range`,如果配置为 `template`,用户还需配置 `string.template` 选项   
                                                                |
-| string.template           | list    | 否   | -       | 
连接器生成的字符串类型的模板列表,如果用户配置了此选项,连接器将从模板列表中随机选择一个项                                   
                                                                          |
-| tinyint.fake.mode         | string  | 否   | range   | 生成 tinyint 数据的伪数据模式,支持 
`range` 和 `template`,默认为 `range`,如果配置为 `template`,用户还需配置 `tinyint.template` 选项  
                                                             |
-| tinyint.min               | tinyint | 否   | 0       | 连接器生成的 tinyint 数据的最小值  
                                                                                
                                                                                
  |
-| tinyint.max               | tinyint | 否   | 127     | 连接器生成的 tinyint 数据的最大值  
                                                                                
                                                                                
  |
-| tinyint.template          | list    | 否   | -       | 连接器生成的 tinyint 
类型的模板列表,如果用户配置了此选项,连接器将从模板列表中随机选择一个项                                            
                                                             |
-| smallint.fake.mode        | string  | 否   | range   | 生成 smallint 
数据的伪数据模式,支持 `range` 和 `template`,默认为 `range`,如果配置为 `template`,用户还需配置 
`smallint.template` 选项                                                          
   |
-| smallint.min              | smallint| 否   | 0       | 连接器生成的 smallint 数据的最小值 
                                                                                
                                                                                
  |
-| smallint.max              | smallint| 否   | 32767   | 连接器生成的 smallint 数据的最大值 
                                                                                
                                                                                
  |
-| smallint.template         | list    | 否   | -       | 连接器生成的 smallint 
类型的模板列表,如果用户配置了此选项,连接器将从模板列表中随机选择一个项                                            
                                                           |
-| int.fake.template         | string  | 否   | range   | 生成 int 数据的伪数据模式,支持 
`range` 和 `template`,默认为 `range`,如果配置为 `template`,用户还需配置 `int.template` 选项      
                                                                 |
-| int.min                   | smallint    | 否   | 0       | 连接器生成的 int 数据的最小值  
                                                                                
                                                                                
      |
-| int.max                   | smallint    | 否   | 0x7fffffff | 连接器生成的 int 
数据的最大值                                                                          
                                                                                
              |
-| int.template              | list    | 否   | -       | 连接器生成的 int 
类型的模板列表,如果用户配置了此选项,连接器将从模板列表中随机选择一个项                                            
                                                                 |
-| bigint.fake.mode          | string  | 否   | range   | 生成 bigint 数据的伪数据模式,支持 
`range` 和 `template`,默认为 `range`,如果配置为 `template`,用户还需配置 `bigint.template` 选项   
                                                              |
-| bigint.min                | bigint  | 否   | 0       | 连接器生成的 bigint 数据的最小值   
                                                                                
                                                                                
  |
-| bigint.max                | bigint  | 否   | 0x7fffffffffffffff | 连接器生成的 
bigint 数据的最大值                                                                   
                                                                                
                  |
-| bigint.template           | list    | 否   | -       | 连接器生成的 bigint 
类型的模板列表,如果用户配置了此选项,连接器将从模板列表中随机选择一个项                                            
                                                             |
-| float.fake.mode           | string  | 否   | range   | 生成 float 数据的伪数据模式,支持 
`range` 和 `template`,默认为 `range`,如果配置为 `template`,用户还需配置 `float.template` 选项    
                                                               |
-| float.min                 | float   | 否   | 0       | 连接器生成的 float 数据的最小值    
                                                                                
                                                                                
  |
-| float.max                 | float   | 否   | 0x1.fffffeP+127 | 连接器生成的 float 
数据的最大值                                                                          
                                                                                
            |
-| float.template            | list    | 否   | -       | 连接器生成的 float 
类型的模板列表,如果用户配置了此选项,连接器将从模板列表中随机选择一个项                                            
                                                               |
-| double.fake.mode          | string  | 否   | range   | 生成 double 数据的伪数据模式,支持 
`range` 和 `template`,默认为 `range`,如果配置为 `template`,用户还需配置 `double.template` 选项   
                                                              |
-| double.min                | double  | 否   | 0       | 连接器生成的 double 数据的最小值   
                                                                                
                                                                                
  |
-| double.max                | double  | 否   | 0x1.fffffffffffffP+1023 | 连接器生成的 
double 数据的最大值                                                                   
                                                                                
                  |
-| double.template           | list    | 否   | -       | 连接器生成的 double 
类型的模板列表,如果用户配置了此选项,连接器将从模板列表中随机选择一个项                                            
                                                             |
-| vector.dimension          | int    | 否   | 4       | 生成的向量的维度,不包括二进制向量       
                                                                                
                                                                            |
-| binary.vector.dimension   | int    | 否   | 8       | 生成的二进制向量的维度             
                                                                                
                                                                                
  |
-| vector.float.min          | float   | 否   | 0       | 连接器生成的向量中 float 数据的最小值 
                                                                                
                                                                               |
-| vector.float.max          | float   | 否   | 0x1.fffffeP+127 | 连接器生成的向量中 
float 数据的最大值                                                                    
                                                                                
            |
-| common-options            |         | 否   | -       | 数据源插件通用参数,详情请参考 
[Source Common Options](../source-common-options.md)                            
                                                                                
     |
+| tables_configs            | list     | 否   | -                      | 定义多个 
FakeSource,每个项可以包含完整的 FakeSource 配置描述                                           
                                                                                
              |
+| schema                    | config   | 是   | -                      | 定义 
Schema 信息                                                                       
                                                                                
                           |
+| auto.increment.enabled    | boolean  | 否   | false                  | 
启用自动递增ID                                                                        
                                                                                
                    |
+| auto.increment.start      | int      | 否   |                        | 
自动递增ID的起始值                                                                      
                                                                                
                    |
+| row.num                   | int      | 否   | 5                      | 
每个并行度生成的数据总行数                                                                   
                                                                                
                     |
+| split.num                 | int      | 否   | 1                      | 
枚举器为每个并行度生成的分片数量                                                                
                                                                                
                    |
+| split.read-interval       | long     | 否   | 1                      | 
读取器在两个分片读取之间的间隔时间(毫秒)                                                           
                                                                                
                |
+| map.size                  | int      | 否   | 5                      | 连接器生成的 
`map` 类型的大小                                                                     
                                                                                
                |
+| array.size                | int      | 否   | 5                      | 连接器生成的 
`array` 类型的大小                                                                   
                                                                                
                |
+| bytes.length              | int      | 否   | 5                      | 连接器生成的 
`bytes` 类型的长度                                                                   
                                                                                
                |
+| string.length             | int      | 否   | 5                      | 连接器生成的 
`string` 类型的长度                                                                  
                                                                                
                |
+| string.fake.mode          | string   | 否   | range                  | 
生成字符串数据的伪数据模式,支持 `range` 和 `template`,默认为 `range`,如果配置为 `template`,用户还需配置 
`string.template` 选项                                                            
       |
+| string.template           | list     | 否   | -                      | 
连接器生成的字符串类型的模板列表,如果用户配置了此选项,连接器将从模板列表中随机选择一个项                                   
                                                                          |
+| tinyint.fake.mode         | string   | 否   | range                  | 生成 
tinyint 数据的伪数据模式,支持 `range` 和 `template`,默认为 `range`,如果配置为 `template`,用户还需配置 
`tinyint.template` 选项                                                           
    |
+| tinyint.min               | tinyint  | 否   | 0                      | 连接器生成的 
tinyint 数据的最小值                                                                  
                                                                                
               |
+| tinyint.max               | tinyint  | 否   | 127                    | 连接器生成的 
tinyint 数据的最大值                                                                  
                                                                                
               |
+| tinyint.template          | list     | 否   | -                      | 连接器生成的 
tinyint 类型的模板列表,如果用户配置了此选项,连接器将从模板列表中随机选择一个项                                    
                                                                     |
+| smallint.fake.mode        | string   | 否   | range                  | 生成 
smallint 数据的伪数据模式,支持 `range` 和 `template`,默认为 `range`,如果配置为 `template`,用户还需配置 
`smallint.template` 选项                                                          
   |
+| smallint.min              | smallint | 否   | 0                      | 连接器生成的 
smallint 数据的最小值                                                                 
                                                                                
               |
+| smallint.max              | smallint | 否   | 32767                  | 连接器生成的 
smallint 数据的最大值                                                                 
                                                                                
               |
+| smallint.template         | list     | 否   | -                      | 连接器生成的 
smallint 类型的模板列表,如果用户配置了此选项,连接器将从模板列表中随机选择一个项                                   
                                                                    |
+| int.fake.template         | string   | 否   | range                  | 生成 int 
数据的伪数据模式,支持 `range` 和 `template`,默认为 `range`,如果配置为 `template`,用户还需配置 
`int.template` 选项                                                               
        |
+| int.min                   | smallint | 否   | 0                      | 连接器生成的 
int 数据的最小值                                                                      
                                                                                
               |
+| int.max                   | smallint | 否   | 0x7fffffff             | 连接器生成的 
int 数据的最大值                                                                      
                                                                                
               |
+| int.template              | list     | 否   | -                      | 连接器生成的 
int 类型的模板列表,如果用户配置了此选项,连接器将从模板列表中随机选择一个项                                        
                                                                     |
+| bigint.fake.mode          | string   | 否   | range                  | 生成 
bigint 数据的伪数据模式,支持 `range` 和 `template`,默认为 `range`,如果配置为 `template`,用户还需配置 
`bigint.template` 选项                                                            
     |
+| bigint.min                | bigint   | 否   | 0                      | 连接器生成的 
bigint 数据的最小值                                                                   
                                                                                
               |
+| bigint.max                | bigint   | 否   | 0x7fffffffffffffff     | 连接器生成的 
bigint 数据的最大值                                                                   
                                                                                
               |
+| bigint.template           | list     | 否   | -                      | 连接器生成的 
bigint 类型的模板列表,如果用户配置了此选项,连接器将从模板列表中随机选择一个项                                     
                                                                    |
+| float.fake.mode           | string   | 否   | range                  | 生成 
float 数据的伪数据模式,支持 `range` 和 `template`,默认为 `range`,如果配置为 `template`,用户还需配置 
`float.template` 选项                                                             
      |
+| float.min                 | float    | 否   | 0                      | 连接器生成的 
float 数据的最小值                                                                    
                                                                                
               |
+| float.max                 | float    | 否   | 0x1.fffffeP+127        | 连接器生成的 
float 数据的最大值                                                                    
                                                                                
               |
+| float.template            | list     | 否   | -                      | 连接器生成的 
float 类型的模板列表,如果用户配置了此选项,连接器将从模板列表中随机选择一个项                                      
                                                                     |
+| double.fake.mode          | string   | 否   | range                  | 生成 
double 数据的伪数据模式,支持 `range` 和 `template`,默认为 `range`,如果配置为 `template`,用户还需配置 
`double.template` 选项                                                            
     |
+| double.min                | double   | 否   | 0                      | 连接器生成的 
double 数据的最小值                                                                   
                                                                                
               |
+| double.max                | double   | 否   | 0x1.fffffffffffffP+1023 | 
连接器生成的 double 数据的最大值                                                            
                                                                                
                      |
+| double.template           | list     | 否   | -                      | 连接器生成的 
double 类型的模板列表,如果用户配置了此选项,连接器将从模板列表中随机选择一个项                                     
                                                                    |
+| vector.dimension          | int      | 否   | 4                      | 
生成的向量的维度,不包括二进制向量                                                               
                                                                                
                    |
+| binary.vector.dimension   | int      | 否   | 8                      | 
生成的二进制向量的维度                                                                     
                                                                                
                       |
+| vector.float.min          | float    | 否   | 0                      | 
连接器生成的向量中 float 数据的最小值                                                          
                                                                                
                    |
+| vector.float.max          | float    | 否   | 0x1.fffffeP+127        | 
连接器生成的向量中 float 数据的最大值                                                          
                                                                                
                    |
+| common-options            |          | 否   | -                      | 
数据源插件通用参数,详情请参考 [Source Common Options](../source-common-options.md)            
                                                                                
                    |
 
 ## 任务示例
 
@@ -520,6 +521,33 @@ source {
 }
 ```
 
+### 自增主键示例
+
+```hocon
+
+source {
+  # This is a example source plugin **only for test and demonstrate the 
feature source plugin**
+  FakeSource {
+    plugin_output = "fake"
+    auto.increment.enabled = true
+    auto.increment.start = 1000
+    row.num = 50000
+    schema = {
+      fields {
+        id = "int"
+        name = "string"
+        age = "int"
+      }
+      primaryKey {
+        name = "pk"
+        columnNames = [id]
+      }
+    }
+  }
+}
+
+```
+
 ## 变更日志
 
 <ChangeLog />
\ No newline at end of file
diff --git 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeConfig.java
 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeConfig.java
index 619cedeeba..209d81fd60 100644
--- 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeConfig.java
+++ 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeConfig.java
@@ -34,7 +34,10 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.seatunnel.api.options.EnvCommonOptions.PARALLELISM;
 import static 
org.apache.seatunnel.connectors.seatunnel.fake.config.FakeSourceOptions.ARRAY_SIZE;
+import static 
org.apache.seatunnel.connectors.seatunnel.fake.config.FakeSourceOptions.AUTO_INCREMENT_ENABLED;
+import static 
org.apache.seatunnel.connectors.seatunnel.fake.config.FakeSourceOptions.AUTO_INCREMENT_START;
 import static 
org.apache.seatunnel.connectors.seatunnel.fake.config.FakeSourceOptions.BIGINT_FAKE_MODE;
 import static 
org.apache.seatunnel.connectors.seatunnel.fake.config.FakeSourceOptions.BIGINT_MAX;
 import static 
org.apache.seatunnel.connectors.seatunnel.fake.config.FakeSourceOptions.BIGINT_MIN;
@@ -82,6 +85,9 @@ import static 
org.apache.seatunnel.connectors.seatunnel.fake.config.FakeSourceOp
 @Builder
 @Getter
 public class FakeConfig implements Serializable {
+
+    @Builder.Default private int parallelism = PARALLELISM.defaultValue();
+
     @Builder.Default private int rowNum = ROW_NUM.defaultValue();
 
     @Builder.Default private int splitNum = SPLIT_NUM.defaultValue();
@@ -148,6 +154,10 @@ public class FakeConfig implements Serializable {
     @Builder.Default
     private FakeSourceOptions.FakeMode doubleFakeMode = 
DOUBLE_FAKE_MODE.defaultValue();
 
+    @Builder.Default private Boolean autoIncrementEnabled = 
AUTO_INCREMENT_ENABLED.defaultValue();
+
+    @Builder.Default private Long autoIncrementStart = 
AUTO_INCREMENT_START.defaultValue();
+
     private List<String> stringTemplate;
     private List<Integer> tinyintTemplate;
     private List<Integer> smallintTemplate;
@@ -170,6 +180,7 @@ public class FakeConfig implements Serializable {
 
     public static FakeConfig buildWithConfig(ReadonlyConfig readonlyConfig) {
         FakeConfigBuilder builder = FakeConfig.builder();
+        
readonlyConfig.getOptional(PARALLELISM).ifPresent(builder::parallelism);
         builder.rowNum(readonlyConfig.get(ROW_NUM));
         builder.splitNum(readonlyConfig.get(SPLIT_NUM));
         builder.splitReadInterval(readonlyConfig.get(SPLIT_READ_INTERVAL));
@@ -204,6 +215,8 @@ public class FakeConfig implements Serializable {
         
readonlyConfig.getOptional(TIME_HOUR_TEMPLATE).ifPresent(builder::timeHourTemplate);
         
readonlyConfig.getOptional(TIME_MINUTE_TEMPLATE).ifPresent(builder::timeMinuteTemplate);
         
readonlyConfig.getOptional(TIME_SECOND_TEMPLATE).ifPresent(builder::timeSecondTemplate);
+        
readonlyConfig.getOptional(AUTO_INCREMENT_ENABLED).ifPresent(builder::autoIncrementEnabled);
+        
readonlyConfig.getOptional(AUTO_INCREMENT_START).ifPresent(builder::autoIncrementStart);
 
         readonlyConfig
                 .getOptional(TINYINT_MIN)
diff --git 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeSourceOptions.java
 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeSourceOptions.java
index 8b6c514b95..6de04050cc 100644
--- 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeSourceOptions.java
+++ 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeSourceOptions.java
@@ -299,6 +299,18 @@ public class FakeSourceOptions {
                     .defaultValue(FakeMode.RANGE)
                     .withDescription("The fake mode of generating double 
data");
 
+    public static final Option<Boolean> AUTO_INCREMENT_ENABLED =
+            Options.key("auto.increment.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Enable auto increment ID generation");
+
+    public static final Option<Long> AUTO_INCREMENT_START =
+            Options.key("auto.increment.start")
+                    .longType()
+                    .defaultValue(1L)
+                    .withDescription("Starting value for auto increment ID");
+
     public enum FakeMode {
         RANGE,
         TEMPLATE;
diff --git 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java
 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java
index 382795c5bb..859d52be60 100644
--- 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java
+++ 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java
@@ -57,8 +57,6 @@ import java.util.List;
 import java.util.function.Consumer;
 import java.util.function.Function;
 
-import static org.apache.seatunnel.api.table.type.SqlType.TIME;
-
 public class FakeDataGenerator {
     private static final String CURRENT_DATE = "CURRENT_DATE";
     private static final String CURRENT_TIME = "CURRENT_TIME";
@@ -72,7 +70,7 @@ public class FakeDataGenerator {
     private final FakeDataRandomUtils fakeDataRandomUtils;
     private String tableId;
 
-    public FakeDataGenerator(FakeConfig fakeConfig) {
+    public FakeDataGenerator(FakeConfig fakeConfig, String jobId) {
         this.catalogTable = fakeConfig.getCatalogTable();
         this.tableId = catalogTable.getTableId().toTablePath().toString();
         this.fakeConfig = fakeConfig;
@@ -80,7 +78,7 @@ public class FakeDataGenerator {
                 fakeConfig.getFakeRows() == null
                         ? null
                         : new JsonDeserializationSchema(catalogTable, false, 
false);
-        this.fakeDataRandomUtils = new FakeDataRandomUtils(fakeConfig);
+        this.fakeDataRandomUtils = new FakeDataRandomUtils(fakeConfig, jobId);
     }
 
     private SeaTunnelRow convertRow(FakeConfig.RowData rowData) {
diff --git 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
index dd312bed10..ba470dbf33 100644
--- 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
+++ 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
@@ -32,10 +32,13 @@ import 
org.apache.seatunnel.connectors.seatunnel.fake.config.FakeConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.fake.config.MultipleTableFakeSourceConfig;
 import org.apache.seatunnel.connectors.seatunnel.fake.state.FakeSourceState;
 
+import lombok.extern.slf4j.Slf4j;
+
 import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 
+@Slf4j
 public class FakeSource
         implements SeaTunnelSource<SeaTunnelRow, FakeSourceSplit, 
FakeSourceState>,
                 SupportParallelism,
@@ -82,7 +85,8 @@ public class FakeSource
     @Override
     public SourceReader<SeaTunnelRow, FakeSourceSplit> createReader(
             SourceReader.Context readerContext) {
-        return new FakeSourceReader(readerContext, 
multipleTableFakeSourceConfig);
+        return new FakeSourceReader(
+                readerContext, multipleTableFakeSourceConfig, 
jobContext.getJobId());
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
index e3309c6be6..c2861c7c7d 100644
--- 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
+++ 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
@@ -48,8 +48,9 @@ public class FakeSourceReader implements 
SourceReader<SeaTunnelRow, FakeSourceSp
     private volatile long latestTimestamp = 0;
 
     public FakeSourceReader(
-            SourceReader.Context context,
-            MultipleTableFakeSourceConfig multipleTableFakeSourceConfig) {
+            Context context,
+            MultipleTableFakeSourceConfig multipleTableFakeSourceConfig,
+            String jobId) {
         this.context = context;
         this.multipleTableFakeSourceConfig = multipleTableFakeSourceConfig;
         this.fakeDataGeneratorMap =
@@ -62,7 +63,7 @@ public class FakeSourceReader implements 
SourceReader<SeaTunnelRow, FakeSourceSp
                                                         .getTableId()
                                                         .toTablePath()
                                                         .toString(),
-                                        FakeDataGenerator::new));
+                                        fakeConfig -> new 
FakeDataGenerator(fakeConfig, jobId)));
         this.minSplitReadInterval =
                 multipleTableFakeSourceConfig.getFakeConfigs().stream()
                         .map(FakeConfig::getSplitReadInterval)
diff --git 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/utils/AutoIncrementIdGenerator.java
 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/utils/AutoIncrementIdGenerator.java
new file mode 100644
index 0000000000..e773363bcf
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/utils/AutoIncrementIdGenerator.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.fake.utils;
+
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class AutoIncrementIdGenerator implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final AtomicLong id;
+
+    public AutoIncrementIdGenerator(long start) {
+        this.id = new AtomicLong(start);
+    }
+
+    public Long getNextId() {
+        return id.getAndIncrement();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/utils/FakeDataRandomUtils.java
 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/utils/FakeDataRandomUtils.java
index c4a038ff1a..5e9f4e809c 100644
--- 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/utils/FakeDataRandomUtils.java
+++ 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/utils/FakeDataRandomUtils.java
@@ -38,9 +38,11 @@ import java.util.Map;
 
 public class FakeDataRandomUtils {
     private final FakeConfig fakeConfig;
+    private final String jobId;
 
-    public FakeDataRandomUtils(FakeConfig fakeConfig) {
+    public FakeDataRandomUtils(FakeConfig fakeConfig, String jobId) {
         this.fakeConfig = fakeConfig;
+        this.jobId = jobId;
     }
 
     private static <T> T randomFromList(List<T> list) {
@@ -93,6 +95,22 @@ public class FakeDataRandomUtils {
     }
 
     public Integer randomInt(Column column) {
+        if (fakeConfig.getAutoIncrementEnabled()
+                && IdGeneratorUtils.isPrimaryColumn(fakeConfig, 
column.getName())) {
+            if (fakeConfig.getAutoIncrementStart()
+                            + ((long) fakeConfig.getParallelism() * 
fakeConfig.getRowNum())
+                    > Integer.MAX_VALUE) {
+                throw new IllegalArgumentException(
+                        "The auto increment start value is too large, please 
check your configuration.");
+            }
+            return IdGeneratorUtils.getIdGenerator(jobId, fakeConfig, 
column.getName())
+                    .orElseThrow(
+                            () ->
+                                    new IllegalArgumentException(
+                                            "Auto increment is enabled, but no 
id generator found."))
+                    .getNextId()
+                    .intValue();
+        }
         List<Integer> intTemplate = fakeConfig.getIntTemplate();
         if (!CollectionUtils.isEmpty(intTemplate)) {
             return randomFromList(intTemplate);
@@ -101,6 +119,15 @@ public class FakeDataRandomUtils {
     }
 
     public Long randomBigint(Column column) {
+        if (fakeConfig.getAutoIncrementEnabled()
+                && IdGeneratorUtils.isPrimaryColumn(fakeConfig, 
column.getName())) {
+            return IdGeneratorUtils.getIdGenerator(jobId, fakeConfig, 
column.getName())
+                    .orElseThrow(
+                            () ->
+                                    new IllegalArgumentException(
+                                            "Auto increment is enabled, but no 
id generator found."))
+                    .getNextId();
+        }
         List<Long> bigTemplate = fakeConfig.getBigTemplate();
         if (!CollectionUtils.isEmpty(bigTemplate)) {
             return randomFromList(bigTemplate);
diff --git 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/utils/IdGeneratorUtils.java
 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/utils/IdGeneratorUtils.java
new file mode 100644
index 0000000000..ee4e5430bc
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/utils/IdGeneratorUtils.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.fake.utils;
+
+import org.apache.seatunnel.shade.com.google.common.cache.Cache;
+import org.apache.seatunnel.shade.com.google.common.cache.CacheBuilder;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.connectors.seatunnel.fake.config.FakeConfig;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+public class IdGeneratorUtils {
+
+    private static final Cache<String, AutoIncrementIdGenerator> idGenerators =
+            CacheBuilder.newBuilder()
+                    .maximumSize(1000)
+                    .expireAfterWrite(30, TimeUnit.MINUTES)
+                    .build();
+
+    public static synchronized Optional<AutoIncrementIdGenerator> 
getIdGenerator(
+            String jobId, FakeConfig fakeConfig, String columnName) {
+        CatalogTable catalogTable = fakeConfig.getCatalogTable();
+        String tableName = catalogTable.getTableId().getTableName();
+        String key = String.format("%s:%s_%s", jobId, tableName, columnName);
+        AutoIncrementIdGenerator idGenerator = null;
+        try {
+            idGenerator =
+                    idGenerators.get(
+                            key,
+                            () -> {
+                                if (isPrimaryColumn(fakeConfig, columnName)) {
+                                    return new AutoIncrementIdGenerator(
+                                            
fakeConfig.getAutoIncrementStart());
+                                } else {
+                                    return null;
+                                }
+                            });
+        } catch (ExecutionException e) {
+            throw new RuntimeException(e);
+        }
+        return Optional.ofNullable(idGenerator);
+    }
+
+    public static boolean isPrimaryColumn(FakeConfig fakeConfig, String 
columnName) {
+        PrimaryKey primaryKey = 
fakeConfig.getCatalogTable().getTableSchema().getPrimaryKey();
+        if (primaryKey == null) {
+            return false;
+        }
+        List<String> primaryColumns = primaryKey.getColumnNames();
+        return primaryColumns != null && primaryColumns.contains(columnName);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-fake/src/test/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGeneratorTest.java
 
b/seatunnel-connectors-v2/connector-fake/src/test/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGeneratorTest.java
index 81aae384bb..7971eab20f 100644
--- 
a/seatunnel-connectors-v2/connector-fake/src/test/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGeneratorTest.java
+++ 
b/seatunnel-connectors-v2/connector-fake/src/test/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGeneratorTest.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
 
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.options.EnvCommonOptions;
 import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.type.BasicType;
@@ -45,9 +46,13 @@ import java.nio.file.Paths;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
 
 public class FakeDataGeneratorTest {
 
@@ -59,7 +64,7 @@ public class FakeDataGeneratorTest {
         SeaTunnelRowType seaTunnelRowType =
                 
CatalogTableUtil.buildWithConfig(testConfig).getSeaTunnelRowType();
         FakeConfig fakeConfig = FakeConfig.buildWithConfig(testConfig);
-        FakeDataGenerator fakeDataGenerator = new 
FakeDataGenerator(fakeConfig);
+        FakeDataGenerator fakeDataGenerator = new 
FakeDataGenerator(fakeConfig, null);
         List<SeaTunnelRow> seaTunnelRows =
                 fakeDataGenerator.generateFakedRows(fakeConfig.getRowNum());
         Assertions.assertNotNull(seaTunnelRows);
@@ -114,7 +119,7 @@ public class FakeDataGeneratorTest {
 
         ReadonlyConfig testConfig = getTestConfigFile(conf);
         FakeConfig fakeConfig = FakeConfig.buildWithConfig(testConfig);
-        FakeDataGenerator fakeDataGenerator = new 
FakeDataGenerator(fakeConfig);
+        FakeDataGenerator fakeDataGenerator = new 
FakeDataGenerator(fakeConfig, null);
         List<SeaTunnelRow> seaTunnelRows =
                 fakeDataGenerator.generateFakedRows(fakeConfig.getRowNum());
         Assertions.assertIterableEquals(expected, seaTunnelRows);
@@ -125,7 +130,7 @@ public class FakeDataGeneratorTest {
     public void testVectorParse(String conf) throws FileNotFoundException, 
URISyntaxException {
         ReadonlyConfig testConfig = getTestConfigFile(conf);
         FakeConfig fakeConfig = FakeConfig.buildWithConfig(testConfig);
-        FakeDataGenerator fakeDataGenerator = new 
FakeDataGenerator(fakeConfig);
+        FakeDataGenerator fakeDataGenerator = new 
FakeDataGenerator(fakeConfig, null);
         List<SeaTunnelRow> seaTunnelRows =
                 fakeDataGenerator.generateFakedRows(fakeConfig.getRowNum());
         seaTunnelRows.forEach(
@@ -152,7 +157,7 @@ public class FakeDataGeneratorTest {
     public void testColumnDataParse(String conf) throws FileNotFoundException, 
URISyntaxException {
         ReadonlyConfig testConfig = getTestConfigFile(conf);
         FakeConfig fakeConfig = FakeConfig.buildWithConfig(testConfig);
-        FakeDataGenerator fakeDataGenerator = new 
FakeDataGenerator(fakeConfig);
+        FakeDataGenerator fakeDataGenerator = new 
FakeDataGenerator(fakeConfig, null);
         List<SeaTunnelRow> seaTunnelRows =
                 fakeDataGenerator.generateFakedRows(fakeConfig.getRowNum());
         seaTunnelRows.forEach(
@@ -215,7 +220,7 @@ public class FakeDataGeneratorTest {
     public void testDataParse(String conf) throws FileNotFoundException, 
URISyntaxException {
         ReadonlyConfig testConfig = getTestConfigFile(conf);
         FakeConfig fakeConfig = FakeConfig.buildWithConfig(testConfig);
-        FakeDataGenerator fakeDataGenerator = new 
FakeDataGenerator(fakeConfig);
+        FakeDataGenerator fakeDataGenerator = new 
FakeDataGenerator(fakeConfig, null);
         List<SeaTunnelRow> seaTunnelRows =
                 fakeDataGenerator.generateFakedRows(fakeConfig.getRowNum());
         seaTunnelRows.forEach(
@@ -229,6 +234,41 @@ public class FakeDataGeneratorTest {
                 });
     }
 
+    @ParameterizedTest
+    @ValueSource(strings = {"fake-auto-increment-id.conf", 
"fake-auto-increment-id.conf"})
+    public void testAutoIncrementId(String conf) throws FileNotFoundException, 
URISyntaxException {
+        ReadonlyConfig testConfig = getTestConfigFile(conf);
+        int parallelism = 
testConfig.getOptional(EnvCommonOptions.PARALLELISM).orElse(1);
+        FakeConfig fakeConfig = FakeConfig.buildWithConfig(testConfig);
+        List<CompletableFuture<List<SeaTunnelRow>>> futures = new 
ArrayList<>();
+        String jobId = UUID.randomUUID().toString();
+        for (int i = 0; i < parallelism; i++) {
+            CompletableFuture<List<SeaTunnelRow>> uCompletableFuture =
+                    CompletableFuture.supplyAsync(
+                            () -> {
+                                FakeDataGenerator fakeDataGenerator =
+                                        new FakeDataGenerator(fakeConfig, 
jobId);
+                                return 
fakeDataGenerator.generateFakedRows(fakeConfig.getRowNum());
+                            });
+            futures.add(uCompletableFuture);
+        }
+        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
+        List<SeaTunnelRow> seaTunnelRows =
+                futures.stream()
+                        .map(CompletableFuture::join)
+                        .flatMap(List::stream)
+                        .collect(Collectors.toList());
+        List<Integer> ids =
+                seaTunnelRows.stream()
+                        .map(seaTunnelRow -> (int) seaTunnelRow.getField(0))
+                        .distinct()
+                        .sorted(Integer::compareTo)
+                        .collect(Collectors.toList());
+        Assertions.assertEquals(200, ids.size());
+        ids.stream().min(Integer::compareTo).ifPresent(min -> 
Assertions.assertEquals(100, min));
+        ids.stream().max(Integer::compareTo).ifPresent(max -> 
Assertions.assertEquals(299, max));
+    }
+
     private ReadonlyConfig getTestConfigFile(String configFile)
             throws FileNotFoundException, URISyntaxException {
         if (!configFile.startsWith("/")) {
diff --git 
a/seatunnel-connectors-v2/connector-fake/src/test/resources/fake-auto-increment-id.conf
 
b/seatunnel-connectors-v2/connector-fake/src/test/resources/fake-auto-increment-id.conf
new file mode 100644
index 0000000000..56f83e02e9
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-fake/src/test/resources/fake-auto-increment-id.conf
@@ -0,0 +1,35 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+FakeSource {
+  plugin_output = "fake"
+  auto.increment.enabled = true
+  auto.increment.start = 100
+  parallelism = 4
+  row.num = 50
+  schema = {
+    fields {
+      id = "int"
+      name = "string"
+      age = "int"
+    }
+    primaryKey {
+      name = "pk"
+      columnNames = [id]
+    }
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarBatchIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarBatchIT.java
index 466eff946e..374571d0c4 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarBatchIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarBatchIT.java
@@ -151,7 +151,7 @@ public class PulsarBatchIT extends TestSuiteBase implements 
TestResource {
                     ConfigFactory.parseFile(new 
File(Paths.get(resource.toURI()).toString()));
 
             FakeConfig fakeConfig = 
FakeConfig.buildWithConfig(ReadonlyConfig.fromConfig(config));
-            FakeDataGenerator fakeDataGenerator = new 
FakeDataGenerator(fakeConfig);
+            FakeDataGenerator fakeDataGenerator = new 
FakeDataGenerator(fakeConfig, null);
             List<SeaTunnelRow> seaTunnelRows = 
fakeDataGenerator.generateFakedRows(100);
             JsonSerializationSchema jsonSerializationSchema =
                     new JsonSerializationSchema(SEATUNNEL_ROW_TYPE);

Reply via email to