This is an automated email from the ASF dual-hosted git repository.

tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 74530a0461 [Improve][Docs] Refactor MySQL-CDC docs (#5302)
74530a0461 is described below

commit 74530a046194847cb61ca95b0eb691b0d6efd095
Author: hailin0 <[email protected]>
AuthorDate: Thu Aug 31 11:03:16 2023 +0800

    [Improve][Docs] Refactor MySQL-CDC docs (#5302)
    
    * [Improve][Docs] Refactor MySQL-CDC docs
    
    * Update MySqlTypeUtils.java
    
    * Update MySQL-CDC.md
---
 .../formats/cdc-compatible-debezium-json.md        |   1 -
 docs/en/connector-v2/source/MySQL-CDC.md           | 330 ++++++++++-----------
 .../source/MySqlIncrementalSourceFactory.java      |  16 +-
 .../cdc/mysql/source/MySqlSourceOptions.java       |  12 +-
 .../seatunnel/cdc/mysql/utils/MySqlTypeUtils.java  |  12 +
 5 files changed, 186 insertions(+), 185 deletions(-)

diff --git a/docs/en/connector-v2/formats/cdc-compatible-debezium-json.md 
b/docs/en/connector-v2/formats/cdc-compatible-debezium-json.md
index 002bd0c3be..e0751a2492 100644
--- a/docs/en/connector-v2/formats/cdc-compatible-debezium-json.md
+++ b/docs/en/connector-v2/formats/cdc-compatible-debezium-json.md
@@ -19,7 +19,6 @@ source {
   MySQL-CDC {
     result_table_name = "table1"
 
-    hostname = localhost
     base-url="jdbc:mysql://localhost:3306/test"
     "startup.mode"=INITIAL
     catalog {
diff --git a/docs/en/connector-v2/source/MySQL-CDC.md 
b/docs/en/connector-v2/source/MySQL-CDC.md
index caeeca0628..6740fd4b8b 100644
--- a/docs/en/connector-v2/source/MySQL-CDC.md
+++ b/docs/en/connector-v2/source/MySQL-CDC.md
@@ -2,10 +2,9 @@
 
 > MySQL CDC source connector
 
-## Description
+## Support Those Engines
 
-The MySQL CDC connector allows for reading snapshot data and incremental data 
from MySQL database. This document
-describes how to set up the MySQL CDC connector to run SQL queries against 
MySQL databases.
+> SeaTunnel Zeta<br/>
 
 ## Key features
 
@@ -16,207 +15,202 @@ describes how to set up the MySQL CDC connector to run 
SQL queries against MySQL
 - [x] [parallelism](../../concept/connector-v2-features.md)
 - [x] [support user-defined split](../../concept/connector-v2-features.md)
 
-## Options
-
-|                      name                      |   type   | required | 
default value |
-|------------------------------------------------|----------|----------|---------------|
-| username                                       | String   | Yes      | -     
        |
-| password                                       | String   | Yes      | -     
        |
-| database-names                                 | List     | No       | -     
        |
-| table-names                                    | List     | Yes      | -     
        |
-| base-url                                       | String   | Yes      | -     
        |
-| startup.mode                                   | Enum     | No       | 
INITIAL       |
-| startup.timestamp                              | Long     | No       | -     
        |
-| startup.specific-offset.file                   | String   | No       | -     
        |
-| startup.specific-offset.pos                    | Long     | No       | -     
        |
-| stop.mode                                      | Enum     | No       | NEVER 
        |
-| stop.timestamp                                 | Long     | No       | -     
        |
-| stop.specific-offset.file                      | String   | No       | -     
        |
-| stop.specific-offset.pos                       | Long     | No       | -     
        |
-| incremental.parallelism                        | Integer  | No       | 1     
        |
-| snapshot.split.size                            | Integer  | No       | 8096  
        |
-| snapshot.fetch.size                            | Integer  | No       | 1024  
        |
-| server-id                                      | String   | No       | -     
        |
-| server-time-zone                               | String   | No       | UTC   
        |
-| connect.timeout.ms                             | Duration | No       | 30000 
        |
-| connect.max-retries                            | Integer  | No       | 3     
        |
-| connection.pool.size                           | Integer  | No       | 20    
        |
-| chunk-key.even-distribution.factor.upper-bound | Double   | No       | 100   
        |
-| chunk-key.even-distribution.factor.lower-bound | Double   | No       | 0.05  
        |
-| sample-sharding.threshold                      | int      | No       | 1000  
        |
-| inverse-sampling.rate                          | int      | No       | 1000  
        |
-| exactly_once                                   | Boolean  | No       | true  
        |
-| debezium.*                                     | config   | No       | -     
        |
-| format                                         | Enum     | No       | 
DEFAULT       |
-| common-options                                 |          | no       | -     
        |
-
-### username [String]
-
-Name of the database to use when connecting to the database server.
-
-### password [String]
-
-Password to use when connecting to the database server.
-
-### database-names [List]
-
-Database name of the database to monitor.
-
-### table-names [List]
-
-Table name of the database to monitor. The table name needs to include the 
database name, for example: database_name.table_name
-
-### base-url [String]
-
-URL has to be with database, like "jdbc:mysql://localhost:5432/db" or 
"jdbc:mysql://localhost:5432/db?useSSL=true".
-
-### startup.mode [Enum]
-
-Optional startup mode for MySQL CDC consumer, valid enumerations are 
"initial", "earliest", "latest" and "specific".
-
-### startup.timestamp [Long]
-
-Start from the specified epoch timestamp (in milliseconds).
-
-**Note, This option is required when the "startup.mode" option used 
`'timestamp'`.**
-
-### startup.specific-offset.file [String]
-
-Start from the specified binlog file name.
-
-**Note, This option is required when the "startup.mode" option used 
`'specific'`.**
-
-### startup.specific-offset.pos [Long]
-
-Start from the specified binlog file position.
-
-**Note, This option is required when the "startup.mode" option used 
`'specific'`.**
-
-### stop.mode [Enum]
-
-Optional stop mode for MySQL CDC consumer, valid enumerations are "never".
-
-### stop.timestamp [Long]
-
-Stop from the specified epoch timestamp (in milliseconds).
-
-**Note, This option is required when the "stop.mode" option used 
`'timestamp'`.**
-
-### stop.specific-offset.file [String]
-
-Stop from the specified binlog file name.
-
-**Note, This option is required when the "stop.mode" option used 
`'specific'`.**
-
-### stop.specific-offset.pos [Long]
-
-Stop from the specified binlog file position.
-
-**Note, This option is required when the "stop.mode" option used 
`'specific'`.**
-
-### incremental.parallelism [Integer]
-
-The number of parallel readers in the incremental phase.
-
-### snapshot.split.size [Integer]
-
-The split size (number of rows) of table snapshot, captured tables are split 
into multiple splits when read the snapshot
-of table.
-
-### snapshot.fetch.size [Integer]
-
-The maximum fetch size for per poll when read table snapshot.
-
-### chunk-key.even-distribution.factor.upper-bound [Double]
-
-The upper bound of the chunk key distribution factor. This factor is used to 
determine whether the table data is evenly distributed. If the distribution 
factor is calculated to be less than or equal to this upper bound (i.e., 
(MAX(id) - MIN(id) + 1) / row count), the table chunks would be optimized for 
even distribution. Otherwise, if the distribution factor is greater, the table 
will be considered as unevenly distributed and the sampling-based sharding 
strategy will be used if the estim [...]
-
-### chunk-key.even-distribution.factor.lower-bound [Double]
+## Description
 
-The lower bound of the chunk key distribution factor. This factor is used to 
determine whether the table data is evenly distributed. If the distribution 
factor is calculated to be greater than or equal to this lower bound (i.e., 
(MAX(id) - MIN(id) + 1) / row count), the table chunks would be optimized for 
even distribution. Otherwise, if the distribution factor is less, the table 
will be considered as unevenly distributed and the sampling-based sharding 
strategy will be used if the estim [...]
+The MySQL CDC connector allows for reading snapshot data and incremental data 
from MySQL database. This document
+describes how to set up the MySQL CDC connector to run SQL queries against 
MySQL databases.
 
-### sample-sharding.threshold [Integer]
+## Supported DataSource Info
 
-This configuration specifies the threshold of estimated shard count to trigger 
the sample sharding strategy. When the distribution factor is outside the 
bounds specified by `chunk-key.even-distribution.factor.upper-bound` and 
`chunk-key.even-distribution.factor.lower-bound`, and the estimated shard count 
(calculated as approximate row count / chunk size) exceeds this threshold, the 
sample sharding strategy will be used. This can help to handle large datasets 
more efficiently. The default [...]
+| Datasource |                                                               
Supported versions                                                              
  |          Driver          |               Url                |               
                 Maven                                 |
+|------------|-------------------------------------------------------------------------------------------------------------------------------------------------|--------------------------|----------------------------------|----------------------------------------------------------------------|
+| MySQL      | <li> [MySQL](https://dev.mysql.com/doc): 5.6, 5.7, 8.0.x 
</li><li> [RDS MySQL](https://www.aliyun.com/product/rds/mysql): 5.6, 5.7, 
8.0.x </li> | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | 
https://mvnrepository.com/artifact/mysql/mysql-connector-java/8.0.28 |
 
-### inverse-sampling.rate [Integer]
+## Database Dependency
 
-The inverse of the sampling rate used in the sample sharding strategy. For 
example, if this value is set to 1000, it means a 1/1000 sampling rate is 
applied during the sampling process. This option provides flexibility in 
controlling the granularity of the sampling, thus affecting the final number of 
shards. It's especially useful when dealing with very large datasets where a 
lower sampling rate is preferred. The default value is 1000.
+### Install Jdbc Driver
 
-### server-id [String]
+Please download and put mysql driver in `${SEATUNNEL_HOME}/lib/` dir. For 
example: cp mysql-connector-java-xxx.jar `$SEATNUNNEL_HOME/lib/`
 
-A numeric ID or a numeric ID range of this database client, The numeric ID 
syntax is like '5400', the numeric ID range
-syntax is like '5400-5408'.
+### Creating MySQL user
 
-Every ID must be unique across all currently-running database processes in the 
MySQL cluster. This connector joins the
-MySQL cluster as another server (with this unique ID) so it can read the 
binlog.
+You have to define a MySQL user with appropriate permissions on all databases 
that the Debezium MySQL connector monitors.
 
-By default, a random number is generated between 5400 and 6400, though we 
recommend setting an explicit value.
+1. Create the MySQL user:
 
-### server-time-zone [String]
+```sql
+mysql> CREATE USER 'user'@'localhost' IDENTIFIED BY 'password';
+```
 
-The session time zone in database server. If not set, then 
ZoneId.systemDefault() is used to determine the server time zone.
+2. Grant the required permissions to the user:
 
-### connect.timeout.ms [long]
+```sql
+mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION 
CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';
+```
 
-The maximum time that the connector should wait after trying to connect to the 
database server before timing out.
+3. Finalize the user’s permissions:
 
-### connect.max-retries [Integer]
+```sql
+mysql> FLUSH PRIVILEGES;
+```
 
-The max retry times that the connector should retry to build database server 
connection.
+### Enabling the MySQL binlog
 
-### connection.pool.size [Integer]
+You must enable binary logging for MySQL replication. The binary logs record 
transaction updates for replication tools to propagate changes.
 
-The connection pool size.
+1. Check whether the `log-bin` option is already on:
 
-### exactly_once [Boolean]
+```sql
+mysql> show variables where variable_name in ('log_bin', 'binlog_format', 
'binlog_row_image', 'gtid_mode', 'enforce_gtid_consistency');
++--------------------------+----------------+
+| Variable_name            | Value          |
++--------------------------+----------------+
+| binlog_format            | ROW            |
+| binlog_row_image         | FULL           |
+| enforce_gtid_consistency | ON             |
+| gtid_mode                | ON             |
+| log_bin                  | ON             |
++--------------------------+----------------+
+5 rows in set (0.00 sec)
+```
 
-Enable exactly once semantic.
+2. If inconsistent with the above results, configure your MySQL server 
configuration file(`$MYSQL_HOME/mysql.cnf`) with the following properties, 
which are described in the table below:
 
-### debezium [Config]
+```
+# Enable binary replication log and set the prefix, expiration, and log format.
+# The prefix is arbitrary, expiration can be short for integration tests but 
would
+# be longer on a production system. Row-level info is required for ingest to 
work.
+# Server ID is required, but this will vary on production systems
+server-id         = 223344
+log_bin           = mysql-bin
+expire_logs_days  = 10
+binlog_format     = row
+binlog_row_image  = FULL
+
+# enable gtid mode
+gtid_mode = on
+enforce_gtid_consistency = on
+```
 
-Pass-through Debezium's properties to Debezium Embedded Engine which is used 
to capture data changes from MySQL server.
+3. Restart MySQL Server
 
-See more about
-the [Debezium's MySQL Connector 
properties](https://debezium.io/documentation/reference/1.6/connectors/mysql.html#mysql-connector-properties)
+```shell
+/etc/inint.d/mysqld restart
+```
 
-### format [Enum]
+4. Confirm your changes by checking the binlog status once more:
+
+```sql
+mysql> show variables where variable_name in ('log_bin', 'binlog_format', 
'binlog_row_image', 'gtid_mode', 'enforce_gtid_consistency');
++--------------------------+----------------+
+| Variable_name            | Value          |
++--------------------------+----------------+
+| binlog_format            | ROW            |
+| binlog_row_image         | FULL           |
+| enforce_gtid_consistency | ON             |
+| gtid_mode                | ON             |
+| log_bin                  | ON             |
++--------------------------+----------------+
+5 rows in set (0.00 sec)
+```
 
-Optional output format for MySQL CDC, valid enumerations are 
"DEFAULT"、"COMPATIBLE_DEBEZIUM_JSON".
+### Notes
+
+#### Setting up MySQL session timeouts
+
+When an initial consistent snapshot is made for large databases, your 
established connection could timeout while the tables are being read. You can 
prevent this behavior by configuring interactive_timeout and wait_timeout in 
your MySQL configuration file.
+- `interactive_timeout`: The number of seconds the server waits for activity 
on an interactive connection before closing it. See [MySQL’s 
documentation](https://dev.mysql.com/doc/refman/8.0/en/server-system-variables.html#sysvar_interactive_timeout)
 for more details.
+- `wait_timeout`: The number of seconds the server waits for activity on a 
non-interactive connection before closing it. See [MySQL’s 
documentation](https://dev.mysql.com/doc/refman/8.0/en/server-system-variables.html#sysvar_wait_timeout)
 for more details.
+
+*For more database settings see [Debezium MySQL 
Connector](https://debezium.io/documentation/reference/1.6/connectors/mysql.html#setting-up-mysql)*
+
+## Data Type Mapping
+
+|                                     Mysql Data type                          
            | SeaTunnel Data type |
+|------------------------------------------------------------------------------------------|---------------------|
+| BIT(1)<br/>TINYINT(1)                                                        
            | BOOLEAN             |
+| TINYINT                                                                      
            | TINYINT             |
+| TINYINT UNSIGNED<br/>SMALLINT                                                
            | SMALLINT            |
+| SMALLINT UNSIGNED<br/>MEDIUMINT<br/>MEDIUMINT 
UNSIGNED<br/>INT<br/>INTEGER<br/>YEAR      | INT                 |
+| INT UNSIGNED<br/>INTEGER UNSIGNED<br/>BIGINT                                 
            | BIGINT              |
+| BIGINT UNSIGNED                                                              
            | DECIMAL(20,0)       |
+| DECIMAL(p, s) <br/>DECIMAL(p, s) UNSIGNED <br/>NUMERIC(p, s) <br/>NUMERIC(p, 
s) UNSIGNED | DECIMAL(p,s)        |
+| FLOAT<br/>FLOAT UNSIGNED                                                     
            | FLOAT               |
+| DOUBLE<br/>DOUBLE UNSIGNED<br/>REAL<br/>REAL UNSIGNED                        
            | DOUBLE              |
+| 
CHAR<br/>VARCHAR<br/>TINYTEXT<br/>MEDIUMTEXT<br/>TEXT<br/>LONGTEXT<br/>ENUM<br/>JSON
     | STRING              |
+| DATE                                                                         
            | DATE                |
+| TIME                                                                         
            | TIME                |
+| DATETIME<br/>TIMESTAMP                                                       
            | TIMESTAMP           |
+| 
BINARY<br/>VARBINAR<br/>BIT(p)<br/>TINYBLOB<br/>MEDIUMBLOB<br/>BLOB<br/>LONGBLOB
         | BYTES               |
+
+## Source Options
+
+|                      Name                      |   Type   | Required | 
Default |                                                                       
                                                                                
                                                                                
                                                              Description       
                                                                                
                    [...]
+|------------------------------------------------|----------|----------|---------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
+| base-url                                       | String   | Yes      | -     
  | The URL of the JDBC connection. Refer to a case: 
`jdbc:mysql://localhost:3306:3306/test`.                                        
                                                                                
                                                                                
                                                                                
                                         [...]
+| username                                       | String   | Yes      | -     
  | Name of the database to use when connecting to the database server.         
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| password                                       | String   | Yes      | -     
  | Password to use when connecting to the database server.                     
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| database-names                                 | List     | No       | -     
  | Database name of the database to monitor.                                   
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| table-names                                    | List     | Yes      | -     
  | Table name of the database to monitor. The table name needs to include the 
database name, for example: `database_name.table_name`                          
                                                                                
                                                                                
                                                                                
               [...]
+| startup.mode                                   | Enum     | No       | 
INITIAL | Optional startup mode for MySQL CDC consumer, valid enumerations are 
`initial`, `earliest`, `latest` and `specific`. <br/> `initial`: Synchronize 
historical data at startup, and then synchronize incremental data.<br/> 
`earliest`: Startup from the earliest offset possible.<br/> `latest`: Startup 
from the latest offset.<br/> `specific`: Startup from user-supplied specific 
offsets.                             [...]
+| startup.specific-offset.file                   | String   | No       | -     
  | Start from the specified binlog file name. **Note, This option is required 
when the `startup.mode` option used `specific`.**                               
                                                                                
                                                                                
                                                                                
               [...]
+| startup.specific-offset.pos                    | Long     | No       | -     
  | Start from the specified binlog file position. **Note, This option is 
required when the `startup.mode` option used `specific`.**                      
                                                                                
                                                                                
                                                                                
                    [...]
+| stop.mode                                      | Enum     | No       | NEVER 
  | Optional stop mode for MySQL CDC consumer, valid enumerations are `never`, 
`latest` or `specific`. <br/> `never`: Real-time job don't stop the 
source.<br/> `latest`: Stop from the latest offset.<br/> `specific`: Stop from 
user-supplied specific offset.                                                  
                                                                                
                            [...]
+| stop.specific-offset.file                      | String   | No       | -     
  | Stop from the specified binlog file name. **Note, This option is required 
when the `stop.mode` option used `specific`.**                                  
                                                                                
                                                                                
                                                                                
                [...]
+| stop.specific-offset.pos                       | Long     | No       | -     
  | Stop from the specified binlog file position. **Note, This option is 
required when the `stop.mode` option used `specific`.**                         
                                                                                
                                                                                
                                                                                
                     [...]
+| snapshot.split.size                            | Integer  | No       | 8096  
  | The split size (number of rows) of table snapshot, captured tables are 
split into multiple splits when read the snapshot of table.                     
                                                                                
                                                                                
                                                                                
                   [...]
+| snapshot.fetch.size                            | Integer  | No       | 1024  
  | The maximum fetch size for per poll when read table snapshot.               
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| server-id                                      | String   | No       | -     
  | A numeric ID or a numeric ID range of this database client, The numeric ID 
syntax is like `5400`, the numeric ID range syntax is like '5400-5408'. <br/> 
Every ID must be unique across all currently-running database processes in the 
MySQL cluster. This connector joins the <br/> MySQL cluster as another server 
(with this unique ID) so it can read the binlog. <br/> By default, a random 
number is generated bet [...]
+| server-time-zone                               | String   | No       | UTC   
  | The session time zone in database server. If not set, then 
ZoneId.systemDefault() is used to determine the server time zone.               
                                                                                
                                                                                
                                                                                
                               [...]
+| connect.timeout.ms                             | Duration | No       | 30000 
  | The maximum time that the connector should wait after trying to connect to 
the database server before timing out.                                          
                                                                                
                                                                                
                                                                                
               [...]
+| connect.max-retries                            | Integer  | No       | 3     
  | The max retry times that the connector should retry to build database 
server connection.                                                              
                                                                                
                                                                                
                                                                                
                    [...]
+| connection.pool.size                           | Integer  | No       | 20    
  | The jdbc connection pool size.                                              
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| chunk-key.even-distribution.factor.upper-bound | Double   | No       | 100   
  | The upper bound of the chunk key distribution factor. This factor is used 
to determine whether the table data is evenly distributed. If the distribution 
factor is calculated to be less than or equal to this upper bound (i.e., 
(MAX(id) - MIN(id) + 1) / row count), the table chunks would be optimized for 
even distribution. Otherwise, if the distribution factor is greater, the table 
will be considered as unev [...]
+| chunk-key.even-distribution.factor.lower-bound | Double   | No       | 0.05  
  | The lower bound of the chunk key distribution factor. This factor is used 
to determine whether the table data is evenly distributed. If the distribution 
factor is calculated to be greater than or equal to this lower bound (i.e., 
(MAX(id) - MIN(id) + 1) / row count), the table chunks would be optimized for 
even distribution. Otherwise, if the distribution factor is less, the table 
will be considered as unev [...]
+| sample-sharding.threshold                      | Integer  | No       | 1000  
  | This configuration specifies the threshold of estimated shard count to 
trigger the sample sharding strategy. When the distribution factor is outside 
the bounds specified by `chunk-key.even-distribution.factor.upper-bound` and 
`chunk-key.even-distribution.factor.lower-bound`, and the estimated shard count 
(calculated as approximate row count / chunk size) exceeds this threshold, the 
sample sharding strategy [...]
+| inverse-sampling.rate                          | Integer  | No       | 1000  
  | The inverse of the sampling rate used in the sample sharding strategy. For 
example, if this value is set to 1000, it means a 1/1000 sampling rate is 
applied during the sampling process. This option provides flexibility in 
controlling the granularity of the sampling, thus affecting the final number of 
shards. It's especially useful when dealing with very large datasets where a 
lower sampling rate is preferr [...]
+| exactly_once                                   | Boolean  | No       | true  
  | Enable exactly once semantic.                                               
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| format                                         | Enum     | No       | 
DEFAULT | Optional output format for MySQL CDC, valid enumerations are 
`DEFAULT`、`COMPATIBLE_DEBEZIUM_JSON`.                                           
                                                                                
                                                                                
                                                                                
                             [...]
+| debezium                                       | Config   | No       | -     
  | Pass-through [Debezium's 
properties](https://debezium.io/documentation/reference/1.6/connectors/mysql.html#mysql-connector-properties)
 to Debezium Embedded Engine which is used to capture data changes from MySQL 
server.                                                                         
                                                                                
                                      [...]
+| common-options                                 |          | no       | -     
  | Source plugin common parameters, please refer to [Source Common 
Options](common-options.md) for details                                         
                                                                                
                                                                                
                                                                                
                          [...]
+
+## Task Example
+
+### Simple
+
+> Support multi-table reading
 
-#### example
+```
+env {
+  parallelism = 1
+  job.mode = "STREAMING"
+  checkpoint.interval = 10000
+}
 
-```conf
 source {
   MySQL-CDC {
-    debezium {
-        snapshot.mode = "never"
-        decimal.handling.mode = "double"
+    catalog = {
+      factory = MySQL
     }
+    base-url = "jdbc:mysql://localhost:3306/testdb"
+    username = "root"
+    password = "root@123"
+    table-names = ["testdb.table1", "testdb.table2"]
+    
+    startup.mode = "initial"
   }
 }
-```
-
-### common options
-
-Source plugin common parameters, please refer to [Source Common 
Options](common-options.md) for details.
-
-## Example
 
-```Jdbc {
-source {
-  MySQL-CDC {
-    result_table_name = "fake"
-    parallelism = 1
-    server-id = 5656
-    username = "mysqluser"
-    password = "mysqlpw"
-    table-names = ["inventory_vwyw0n.products"]
-    base-url = "jdbc:mysql://localhost:56725/inventory_vwyw0n"
+sink {
+  Console {
   }
 }
 ```
 
+### Support debezium-compatible format send to kafka
+
+> Must be used with kafka connector sink, see [compatible debezium 
format](../formats/cdc-compatible-debezium-json.md) for details
+
 ## Changelog
 
 - Add MySQL CDC Source Connector
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
index a84eb79be3..6429fa4b52 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
@@ -71,6 +71,10 @@ public class MySqlIncrementalSourceFactory implements 
TableSourceFactory, Suppor
                         JdbcSourceOptions.SAMPLE_SHARDING_THRESHOLD,
                         JdbcSourceOptions.INVERSE_SAMPLING_RATE)
                 .optional(MySqlSourceOptions.STARTUP_MODE, 
MySqlSourceOptions.STOP_MODE)
+                .conditional(
+                        MySqlSourceOptions.STARTUP_MODE,
+                        StartupMode.INITIAL,
+                        SourceOptions.EXACTLY_ONCE)
                 .conditional(
                         MySqlSourceOptions.STARTUP_MODE,
                         StartupMode.SPECIFIC,
@@ -81,18 +85,6 @@ public class MySqlIncrementalSourceFactory implements 
TableSourceFactory, Suppor
                         StopMode.SPECIFIC,
                         SourceOptions.STOP_SPECIFIC_OFFSET_FILE,
                         SourceOptions.STOP_SPECIFIC_OFFSET_POS)
-                .conditional(
-                        MySqlSourceOptions.STARTUP_MODE,
-                        StartupMode.TIMESTAMP,
-                        SourceOptions.STARTUP_TIMESTAMP)
-                .conditional(
-                        MySqlSourceOptions.STOP_MODE,
-                        StopMode.TIMESTAMP,
-                        SourceOptions.STOP_TIMESTAMP)
-                .conditional(
-                        MySqlSourceOptions.STARTUP_MODE,
-                        StartupMode.INITIAL,
-                        SourceOptions.EXACTLY_ONCE)
                 .build();
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlSourceOptions.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlSourceOptions.java
index 43f3f4c70c..bc59fd0f5c 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlSourceOptions.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlSourceOptions.java
@@ -34,18 +34,22 @@ public class MySqlSourceOptions {
                                     Arrays.asList(
                                             StartupMode.INITIAL,
                                             StartupMode.EARLIEST,
-                                            StartupMode.LATEST))
+                                            StartupMode.LATEST,
+                                            StartupMode.SPECIFIC))
                             .defaultValue(StartupMode.INITIAL)
                             .withDescription(
                                     "Optional startup mode for CDC source, 
valid enumerations are "
-                                            + "\"initial\", \"earliest\", 
\"latest\", \"timestamp\"\n or \"specific\"");
+                                            + "\"initial\", \"earliest\", 
\"latest\" or \"specific\"");
 
     public static final SingleChoiceOption<StopMode> STOP_MODE =
             (SingleChoiceOption)
                     Options.key(SourceOptions.STOP_MODE_KEY)
-                            .singleChoice(StopMode.class, 
Arrays.asList(StopMode.NEVER))
+                            .singleChoice(
+                                    StopMode.class,
+                                    Arrays.asList(
+                                            StopMode.LATEST, 
StopMode.SPECIFIC, StopMode.NEVER))
                             .defaultValue(StopMode.NEVER)
                             .withDescription(
                                     "Optional stop mode for CDC source, valid 
enumerations are "
-                                            + "\"never\", \"latest\", 
\"timestamp\"\n or \"specific\"");
+                                            + "\"never\", \"latest\" or 
\"specific\"");
 }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlTypeUtils.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlTypeUtils.java
index d30ba32d22..00c10f53cb 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlTypeUtils.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlTypeUtils.java
@@ -50,10 +50,14 @@ public class MySqlTypeUtils {
     private static final String MYSQL_BIGINT_UNSIGNED = "BIGINT UNSIGNED";
     private static final String MYSQL_DECIMAL = "DECIMAL";
     private static final String MYSQL_DECIMAL_UNSIGNED = "DECIMAL UNSIGNED";
+    private static final String MYSQL_NUMERIC = "NUMERIC";
+    private static final String MYSQL_NUMERIC_UNSIGNED = "NUMERIC UNSIGNED";
     private static final String MYSQL_FLOAT = "FLOAT";
     private static final String MYSQL_FLOAT_UNSIGNED = "FLOAT UNSIGNED";
     private static final String MYSQL_DOUBLE = "DOUBLE";
     private static final String MYSQL_DOUBLE_UNSIGNED = "DOUBLE UNSIGNED";
+    private static final String MYSQL_REAL = "REAL";
+    private static final String MYSQL_REAL_UNSIGNED = "REAL UNSIGNED";
 
     // -------------------------string----------------------------
     private static final String MYSQL_CHAR = "CHAR";
@@ -63,6 +67,7 @@ public class MySqlTypeUtils {
     private static final String MYSQL_TEXT = "TEXT";
     private static final String MYSQL_LONGTEXT = "LONGTEXT";
     private static final String MYSQL_JSON = "JSON";
+    private static final String MYSQL_ENUM = "ENUM";
 
     // ------------------------------time-------------------------
     private static final String MYSQL_DATE = "DATE";
@@ -89,6 +94,7 @@ public class MySqlTypeUtils {
                 return column.length() == 1 ? BasicType.BOOLEAN_TYPE : 
BasicType.INT_TYPE;
             case MYSQL_TINYINT_UNSIGNED:
             case MYSQL_SMALLINT:
+                return BasicType.SHORT_TYPE;
             case MYSQL_SMALLINT_UNSIGNED:
             case MYSQL_MEDIUMINT:
             case MYSQL_MEDIUMINT_UNSIGNED:
@@ -103,6 +109,9 @@ public class MySqlTypeUtils {
             case MYSQL_BIGINT_UNSIGNED:
                 return new DecimalType(20, 0);
             case MYSQL_DECIMAL:
+            case MYSQL_DECIMAL_UNSIGNED:
+            case MYSQL_NUMERIC:
+            case MYSQL_NUMERIC_UNSIGNED:
                 return new DecimalType(column.length(), 
column.scale().orElse(0));
             case MYSQL_FLOAT:
                 return BasicType.FLOAT_TYPE;
@@ -110,8 +119,10 @@ public class MySqlTypeUtils {
                 log.warn("{} will probably cause value overflow.", 
MYSQL_FLOAT_UNSIGNED);
                 return BasicType.FLOAT_TYPE;
             case MYSQL_DOUBLE:
+            case MYSQL_REAL:
                 return BasicType.DOUBLE_TYPE;
             case MYSQL_DOUBLE_UNSIGNED:
+            case MYSQL_REAL_UNSIGNED:
                 log.warn("{} will probably cause value overflow.", 
MYSQL_DOUBLE_UNSIGNED);
                 return BasicType.DOUBLE_TYPE;
             case MYSQL_CHAR:
@@ -120,6 +131,7 @@ public class MySqlTypeUtils {
             case MYSQL_TEXT:
             case MYSQL_VARCHAR:
             case MYSQL_JSON:
+            case MYSQL_ENUM:
                 return BasicType.STRING_TYPE;
             case MYSQL_LONGTEXT:
                 log.warn(


Reply via email to